mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-15 13:10:43 +00:00
Fixes the post-ready maintenance shutdown race by marking close before gateway_stop hooks, clearing delayed timers, and suppressing already-fired maintenance work during shutdown. Verification: - pnpm test:serial src/gateway/server-runtime-services.test.ts src/gateway/server-import-boundary.test.ts - pnpm exec oxfmt --check --threads=1 src/gateway/server.impl.ts src/gateway/server-import-boundary.test.ts src/gateway/server-runtime-services.ts src/gateway/server-runtime-services.test.ts - git diff --check - crabbox blacksmith-testbox tbx_01kqrw87d527jwcfxbp6qk1wc3: pnpm check:changed (exit 0)
368 lines
12 KiB
TypeScript
368 lines
12 KiB
TypeScript
import { beforeEach, describe, expect, it, vi } from "vitest";
|
|
|
|
const hoisted = vi.hoisted(() => {
|
|
const heartbeatRunner = {
|
|
stop: vi.fn(),
|
|
updateConfig: vi.fn(),
|
|
};
|
|
const stopModelPricingRefresh = vi.fn();
|
|
return {
|
|
heartbeatRunner,
|
|
startHeartbeatRunner: vi.fn(() => heartbeatRunner),
|
|
startChannelHealthMonitor: vi.fn(() => ({ stop: vi.fn() })),
|
|
stopModelPricingRefresh,
|
|
startGatewayModelPricingRefresh: vi.fn(() => stopModelPricingRefresh),
|
|
loadModelPricingCacheModule: vi.fn(),
|
|
isVitestRuntimeEnv: vi.fn(() => false),
|
|
recoverPendingDeliveries: vi.fn(async () => undefined),
|
|
recoverPendingRestartContinuationDeliveries: vi.fn(async () => undefined),
|
|
deliverOutboundPayloads: vi.fn(),
|
|
};
|
|
});
|
|
|
|
vi.mock("../infra/heartbeat-runner.js", () => ({
|
|
startHeartbeatRunner: hoisted.startHeartbeatRunner,
|
|
}));
|
|
|
|
vi.mock("../infra/env.js", () => ({
|
|
isVitestRuntimeEnv: hoisted.isVitestRuntimeEnv,
|
|
}));
|
|
|
|
vi.mock("../infra/outbound/deliver.js", () => ({
|
|
deliverOutboundPayloads: hoisted.deliverOutboundPayloads,
|
|
}));
|
|
|
|
vi.mock("../infra/outbound/delivery-queue.js", () => ({
|
|
recoverPendingDeliveries: hoisted.recoverPendingDeliveries,
|
|
}));
|
|
|
|
vi.mock("./server-restart-sentinel.js", () => ({
|
|
recoverPendingRestartContinuationDeliveries: hoisted.recoverPendingRestartContinuationDeliveries,
|
|
}));
|
|
|
|
vi.mock("./channel-health-monitor.js", () => ({
|
|
startChannelHealthMonitor: hoisted.startChannelHealthMonitor,
|
|
}));
|
|
|
|
vi.mock("./model-pricing-cache.js", () => ({
|
|
...(() => {
|
|
hoisted.loadModelPricingCacheModule();
|
|
return {};
|
|
})(),
|
|
startGatewayModelPricingRefresh: hoisted.startGatewayModelPricingRefresh,
|
|
}));
|
|
|
|
const {
|
|
activateGatewayScheduledServices,
|
|
runGatewayPostReadyMaintenance,
|
|
scheduleGatewayPostReadyMaintenance,
|
|
startGatewayRuntimeServices,
|
|
} = await import("./server-runtime-services.js");
|
|
|
|
describe("server-runtime-services", () => {
|
|
beforeEach(() => {
|
|
vi.useRealTimers();
|
|
hoisted.heartbeatRunner.stop.mockClear();
|
|
hoisted.heartbeatRunner.updateConfig.mockClear();
|
|
hoisted.startHeartbeatRunner.mockClear();
|
|
hoisted.startChannelHealthMonitor.mockClear();
|
|
hoisted.startGatewayModelPricingRefresh.mockClear();
|
|
hoisted.stopModelPricingRefresh.mockClear();
|
|
hoisted.loadModelPricingCacheModule.mockClear();
|
|
hoisted.isVitestRuntimeEnv.mockReset().mockReturnValue(false);
|
|
hoisted.recoverPendingDeliveries.mockClear();
|
|
hoisted.recoverPendingRestartContinuationDeliveries.mockClear();
|
|
hoisted.deliverOutboundPayloads.mockClear();
|
|
});
|
|
|
|
it("skips model pricing bootstrap import when pricing is disabled", async () => {
|
|
activateGatewayScheduledServices({
|
|
minimalTestGateway: false,
|
|
cfgAtStart: { models: { pricing: { enabled: false } } } as never,
|
|
deps: {} as never,
|
|
sessionDeliveryRecoveryMaxEnqueuedAt: 123,
|
|
cron: { start: vi.fn(async () => undefined) },
|
|
logCron: { error: vi.fn() },
|
|
log: createLog(),
|
|
});
|
|
|
|
await vi.dynamicImportSettled();
|
|
|
|
expect(hoisted.loadModelPricingCacheModule).not.toHaveBeenCalled();
|
|
expect(hoisted.startGatewayModelPricingRefresh).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("keeps scheduled services and pricing refresh inert during initial runtime setup", async () => {
|
|
const services = startGatewayRuntimeServices({
|
|
minimalTestGateway: false,
|
|
cfgAtStart: {} as never,
|
|
channelManager: {
|
|
getRuntimeSnapshot: vi.fn(),
|
|
isHealthMonitorEnabled: vi.fn(),
|
|
isManuallyStopped: vi.fn(),
|
|
} as never,
|
|
log: createLog(),
|
|
});
|
|
|
|
expect(hoisted.startChannelHealthMonitor).toHaveBeenCalledTimes(1);
|
|
await vi.dynamicImportSettled();
|
|
expect(hoisted.loadModelPricingCacheModule).not.toHaveBeenCalled();
|
|
expect(hoisted.startGatewayModelPricingRefresh).not.toHaveBeenCalled();
|
|
expect(hoisted.startHeartbeatRunner).not.toHaveBeenCalled();
|
|
expect(hoisted.recoverPendingDeliveries).not.toHaveBeenCalled();
|
|
|
|
services.heartbeatRunner.stop();
|
|
expect(hoisted.heartbeatRunner.stop).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("starts model pricing refresh after scheduled services activate", async () => {
|
|
const pluginLookUpTable = {
|
|
index: { plugins: [] },
|
|
manifestRegistry: { plugins: [], diagnostics: [] },
|
|
};
|
|
const cron = { start: vi.fn(async () => undefined) };
|
|
const log = createLog();
|
|
|
|
const services = activateGatewayScheduledServices({
|
|
minimalTestGateway: false,
|
|
cfgAtStart: {} as never,
|
|
deps: {} as never,
|
|
sessionDeliveryRecoveryMaxEnqueuedAt: 123,
|
|
cron,
|
|
logCron: { error: vi.fn() },
|
|
log,
|
|
pluginLookUpTable: pluginLookUpTable as never,
|
|
});
|
|
|
|
expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1);
|
|
expect(cron.start).toHaveBeenCalledTimes(1);
|
|
await vi.dynamicImportSettled();
|
|
expect(hoisted.startGatewayModelPricingRefresh).toHaveBeenCalledWith({
|
|
config: {},
|
|
pluginLookUpTable,
|
|
});
|
|
services.stopModelPricingRefresh();
|
|
expect(hoisted.stopModelPricingRefresh).toHaveBeenCalledTimes(1);
|
|
});
|
|
|
|
it("does not start model pricing refresh after scheduled services stop before import settles", async () => {
|
|
const cron = { start: vi.fn(async () => undefined) };
|
|
const services = activateGatewayScheduledServices({
|
|
minimalTestGateway: false,
|
|
cfgAtStart: {} as never,
|
|
deps: {} as never,
|
|
sessionDeliveryRecoveryMaxEnqueuedAt: 123,
|
|
cron,
|
|
logCron: { error: vi.fn() },
|
|
log: createLog(),
|
|
});
|
|
|
|
services.stopModelPricingRefresh();
|
|
await vi.dynamicImportSettled();
|
|
|
|
expect(hoisted.startGatewayModelPricingRefresh).not.toHaveBeenCalled();
|
|
expect(hoisted.stopModelPricingRefresh).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("activates heartbeat, cron, and delivery recovery after sidecars are ready", async () => {
|
|
vi.useFakeTimers();
|
|
const cron = { start: vi.fn(async () => undefined) };
|
|
const log = createLog();
|
|
|
|
const services = activateGatewayScheduledServices({
|
|
minimalTestGateway: false,
|
|
cfgAtStart: {} as never,
|
|
deps: {} as never,
|
|
sessionDeliveryRecoveryMaxEnqueuedAt: 123,
|
|
cron,
|
|
logCron: { error: vi.fn() },
|
|
log,
|
|
});
|
|
|
|
expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1);
|
|
expect(cron.start).toHaveBeenCalledTimes(1);
|
|
expect(services.heartbeatRunner).toBe(hoisted.heartbeatRunner);
|
|
await vi.advanceTimersByTimeAsync(1_250);
|
|
await vi.dynamicImportSettled();
|
|
expect(hoisted.recoverPendingDeliveries).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
deliver: hoisted.deliverOutboundPayloads,
|
|
cfg: {},
|
|
}),
|
|
);
|
|
expect(hoisted.recoverPendingRestartContinuationDeliveries).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
deps: {},
|
|
maxEnqueuedAt: 123,
|
|
}),
|
|
);
|
|
});
|
|
|
|
it("can defer cron startup while activating other scheduled services", async () => {
|
|
vi.useFakeTimers();
|
|
const cron = { start: vi.fn(async () => undefined) };
|
|
const log = createLog();
|
|
|
|
activateGatewayScheduledServices({
|
|
minimalTestGateway: false,
|
|
cfgAtStart: {} as never,
|
|
deps: {} as never,
|
|
sessionDeliveryRecoveryMaxEnqueuedAt: 123,
|
|
cron,
|
|
startCron: false,
|
|
logCron: { error: vi.fn() },
|
|
log,
|
|
});
|
|
|
|
expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1);
|
|
expect(cron.start).not.toHaveBeenCalled();
|
|
await vi.advanceTimersByTimeAsync(1_250);
|
|
await vi.dynamicImportSettled();
|
|
expect(hoisted.recoverPendingDeliveries).toHaveBeenCalledTimes(1);
|
|
});
|
|
|
|
it("starts cron and records memory when post-ready maintenance fails", async () => {
|
|
const cron = { start: vi.fn(async () => undefined) };
|
|
const log = createLog();
|
|
const recordPostReadyMemory = vi.fn();
|
|
|
|
await runGatewayPostReadyMaintenance({
|
|
startMaintenance: vi.fn(async () => {
|
|
throw new Error("timers unavailable");
|
|
}),
|
|
applyMaintenance: vi.fn(),
|
|
shouldStartCron: () => true,
|
|
markCronStartHandled: vi.fn(),
|
|
cron,
|
|
logCron: { error: vi.fn() },
|
|
log,
|
|
recordPostReadyMemory,
|
|
});
|
|
|
|
expect(log.warn).toHaveBeenCalledWith(
|
|
"gateway post-ready maintenance startup failed: Error: timers unavailable",
|
|
);
|
|
expect(cron.start).toHaveBeenCalledTimes(1);
|
|
expect(recordPostReadyMemory).toHaveBeenCalledTimes(1);
|
|
});
|
|
|
|
it("returns a cancellable post-ready maintenance timer", async () => {
|
|
vi.useFakeTimers();
|
|
const startMaintenance = vi.fn(async () => null);
|
|
const onStarted = vi.fn();
|
|
const handle = scheduleGatewayPostReadyMaintenance(
|
|
createPostReadyMaintenanceScheduleParams({
|
|
delayMs: 25,
|
|
onStarted,
|
|
startMaintenance,
|
|
}),
|
|
);
|
|
|
|
clearTimeout(handle);
|
|
await vi.advanceTimersByTimeAsync(25);
|
|
|
|
expect(onStarted).not.toHaveBeenCalled();
|
|
expect(startMaintenance).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("clears delayed maintenance handles when close starts during maintenance startup", async () => {
|
|
vi.useFakeTimers();
|
|
let closing = false;
|
|
let resolveMaintenance!: (maintenance: ReturnType<typeof createMaintenanceHandles>) => void;
|
|
const startMaintenance = vi.fn(
|
|
() =>
|
|
new Promise<ReturnType<typeof createMaintenanceHandles>>((resolve) => {
|
|
resolveMaintenance = resolve;
|
|
}),
|
|
);
|
|
const applyMaintenance = vi.fn();
|
|
const cron = { start: vi.fn(async () => undefined) };
|
|
const recordPostReadyMemory = vi.fn();
|
|
|
|
scheduleGatewayPostReadyMaintenance(
|
|
createPostReadyMaintenanceScheduleParams({
|
|
delayMs: 25,
|
|
isClosing: () => closing,
|
|
startMaintenance,
|
|
applyMaintenance,
|
|
cron,
|
|
recordPostReadyMemory,
|
|
}),
|
|
);
|
|
|
|
await vi.advanceTimersByTimeAsync(25);
|
|
expect(startMaintenance).toHaveBeenCalledTimes(1);
|
|
|
|
closing = true;
|
|
resolveMaintenance(createMaintenanceHandles());
|
|
await Promise.resolve();
|
|
await Promise.resolve();
|
|
|
|
expect(applyMaintenance).not.toHaveBeenCalled();
|
|
expect(cron.start).not.toHaveBeenCalled();
|
|
expect(recordPostReadyMemory).not.toHaveBeenCalled();
|
|
expect(vi.getTimerCount()).toBe(0);
|
|
});
|
|
|
|
it("keeps scheduled services disabled for minimal test gateways", () => {
|
|
const cron = { start: vi.fn(async () => undefined) };
|
|
|
|
const services = activateGatewayScheduledServices({
|
|
minimalTestGateway: true,
|
|
cfgAtStart: {} as never,
|
|
deps: {} as never,
|
|
sessionDeliveryRecoveryMaxEnqueuedAt: 123,
|
|
cron,
|
|
logCron: { error: vi.fn() },
|
|
log: createLog(),
|
|
});
|
|
|
|
expect(hoisted.startHeartbeatRunner).not.toHaveBeenCalled();
|
|
expect(cron.start).not.toHaveBeenCalled();
|
|
expect(hoisted.recoverPendingDeliveries).not.toHaveBeenCalled();
|
|
expect(hoisted.recoverPendingRestartContinuationDeliveries).not.toHaveBeenCalled();
|
|
|
|
services.heartbeatRunner.stop();
|
|
expect(hoisted.heartbeatRunner.stop).not.toHaveBeenCalled();
|
|
});
|
|
});
|
|
|
|
function createLog() {
|
|
return {
|
|
child: vi.fn(() => ({
|
|
info: vi.fn(),
|
|
warn: vi.fn(),
|
|
error: vi.fn(),
|
|
})),
|
|
warn: vi.fn(),
|
|
error: vi.fn(),
|
|
};
|
|
}
|
|
|
|
function createPostReadyMaintenanceScheduleParams(
|
|
overrides: Partial<Parameters<typeof scheduleGatewayPostReadyMaintenance>[0]> = {},
|
|
): Parameters<typeof scheduleGatewayPostReadyMaintenance>[0] {
|
|
return {
|
|
delayMs: 1,
|
|
isClosing: () => false,
|
|
startMaintenance: vi.fn(async () => null),
|
|
applyMaintenance: vi.fn(),
|
|
shouldStartCron: () => true,
|
|
markCronStartHandled: vi.fn(),
|
|
cron: { start: vi.fn(async () => undefined) },
|
|
logCron: { error: vi.fn() },
|
|
log: createLog(),
|
|
recordPostReadyMemory: vi.fn(),
|
|
...overrides,
|
|
};
|
|
}
|
|
|
|
function createMaintenanceHandles() {
|
|
return {
|
|
tickInterval: setInterval(() => undefined, 60_000),
|
|
healthInterval: setInterval(() => undefined, 60_000),
|
|
dedupeCleanup: setInterval(() => undefined, 60_000),
|
|
mediaCleanup: setInterval(() => undefined, 60_000),
|
|
};
|
|
}
|