diff --git a/src/gateway/server-import-boundary.test.ts b/src/gateway/server-import-boundary.test.ts index f7654e745af..2da1e5db2df 100644 --- a/src/gateway/server-import-boundary.test.ts +++ b/src/gateway/server-import-boundary.test.ts @@ -50,4 +50,15 @@ describe("gateway startup import boundaries", () => { expect(validation).not.toContain("legacy-secretref-env-marker"); expect(validation).not.toContain("commands/doctor"); }); + + it("marks gateway close before awaiting gateway_stop hooks", () => { + const serverImpl = readSource("src/gateway/server.impl.ts"); + const closeStart = serverImpl.indexOf("close: async (opts)"); + const hookStart = serverImpl.indexOf("runGlobalGatewayStopSafely", closeStart); + const markStart = serverImpl.indexOf("markClosePreludeStarted();", closeStart); + + expect(closeStart).toBeGreaterThan(-1); + expect(markStart).toBeGreaterThan(closeStart); + expect(markStart).toBeLessThan(hookStart); + }); }); diff --git a/src/gateway/server-runtime-services.test.ts b/src/gateway/server-runtime-services.test.ts index d9e5843429a..72dca20d540 100644 --- a/src/gateway/server-runtime-services.test.ts +++ b/src/gateway/server-runtime-services.test.ts @@ -55,6 +55,7 @@ vi.mock("./model-pricing-cache.js", () => ({ const { activateGatewayScheduledServices, runGatewayPostReadyMaintenance, + scheduleGatewayPostReadyMaintenance, startGatewayRuntimeServices, } = await import("./server-runtime-services.js"); @@ -245,6 +246,64 @@ describe("server-runtime-services", () => { expect(recordPostReadyMemory).toHaveBeenCalledTimes(1); }); + it("returns a cancellable post-ready maintenance timer", async () => { + vi.useFakeTimers(); + const startMaintenance = vi.fn(async () => null); + const onStarted = vi.fn(); + const handle = scheduleGatewayPostReadyMaintenance( + createPostReadyMaintenanceScheduleParams({ + delayMs: 25, + onStarted, + startMaintenance, + }), + ); + + clearTimeout(handle); + await vi.advanceTimersByTimeAsync(25); + + expect(onStarted).not.toHaveBeenCalled(); + expect(startMaintenance).not.toHaveBeenCalled(); + }); + + it("clears delayed maintenance handles when close starts during maintenance startup", async () => { + vi.useFakeTimers(); + let closing = false; + let resolveMaintenance!: (maintenance: ReturnType) => void; + const startMaintenance = vi.fn( + () => + new Promise>((resolve) => { + resolveMaintenance = resolve; + }), + ); + const applyMaintenance = vi.fn(); + const cron = { start: vi.fn(async () => undefined) }; + const recordPostReadyMemory = vi.fn(); + + scheduleGatewayPostReadyMaintenance( + createPostReadyMaintenanceScheduleParams({ + delayMs: 25, + isClosing: () => closing, + startMaintenance, + applyMaintenance, + cron, + recordPostReadyMemory, + }), + ); + + await vi.advanceTimersByTimeAsync(25); + expect(startMaintenance).toHaveBeenCalledTimes(1); + + closing = true; + resolveMaintenance(createMaintenanceHandles()); + await Promise.resolve(); + await Promise.resolve(); + + expect(applyMaintenance).not.toHaveBeenCalled(); + expect(cron.start).not.toHaveBeenCalled(); + expect(recordPostReadyMemory).not.toHaveBeenCalled(); + expect(vi.getTimerCount()).toBe(0); + }); + it("keeps scheduled services disabled for minimal test gateways", () => { const cron = { start: vi.fn(async () => undefined) }; @@ -279,3 +338,30 @@ function createLog() { error: vi.fn(), }; } + +function createPostReadyMaintenanceScheduleParams( + overrides: Partial[0]> = {}, +): Parameters[0] { + return { + delayMs: 1, + isClosing: () => false, + startMaintenance: vi.fn(async () => null), + applyMaintenance: vi.fn(), + shouldStartCron: () => true, + markCronStartHandled: vi.fn(), + cron: { start: vi.fn(async () => undefined) }, + logCron: { error: vi.fn() }, + log: createLog(), + recordPostReadyMemory: vi.fn(), + ...overrides, + }; +} + +function createMaintenanceHandles() { + return { + tickInterval: setInterval(() => undefined, 60_000), + healthInterval: setInterval(() => undefined, 60_000), + dedupeCleanup: setInterval(() => undefined, 60_000), + mediaCleanup: setInterval(() => undefined, 60_000), + }; +} diff --git a/src/gateway/server-runtime-services.ts b/src/gateway/server-runtime-services.ts index b01efad65d4..d5d0847e28f 100644 --- a/src/gateway/server-runtime-services.ts +++ b/src/gateway/server-runtime-services.ts @@ -18,7 +18,7 @@ type GatewayRuntimeServiceLogger = { type GatewayPostReadyLogger = { warn: (message: string) => void; }; -type GatewayMaintenanceHandles = NonNullable< +export type GatewayMaintenanceHandles = NonNullable< Awaited> >; @@ -60,6 +60,18 @@ export function startGatewayCronWithLogging(params: { void params.cron.start().catch((err) => params.logCron.error(`failed to start: ${String(err)}`)); } +function clearGatewayMaintenanceHandles(maintenance: GatewayMaintenanceHandles | null): void { + if (!maintenance) { + return; + } + clearInterval(maintenance.tickInterval); + clearInterval(maintenance.healthInterval); + clearInterval(maintenance.dedupeCleanup); + if (maintenance.mediaCleanup) { + clearInterval(maintenance.mediaCleanup); + } +} + export async function runGatewayPostReadyMaintenance(params: { startMaintenance: () => Promise; applyMaintenance: (maintenance: GatewayMaintenanceHandles) => void; @@ -88,6 +100,59 @@ export async function runGatewayPostReadyMaintenance(params: { params.recordPostReadyMemory(); } +export function scheduleGatewayPostReadyMaintenance(params: { + delayMs: number; + isClosing: () => boolean; + onStarted?: () => void; + startMaintenance: () => Promise; + applyMaintenance: (maintenance: GatewayMaintenanceHandles) => void; + shouldStartCron: () => boolean; + markCronStartHandled: () => void; + cron: { start: () => Promise }; + logCron: { error: (message: string) => void }; + log: GatewayPostReadyLogger; + recordPostReadyMemory: () => void; +}): ReturnType { + const timer = setTimeout(() => { + params.onStarted?.(); + if (params.isClosing()) { + return; + } + void runGatewayPostReadyMaintenance({ + startMaintenance: async () => { + if (params.isClosing()) { + return null; + } + const maintenance = await params.startMaintenance(); + if (params.isClosing()) { + clearGatewayMaintenanceHandles(maintenance); + return null; + } + return maintenance; + }, + applyMaintenance: (maintenance) => { + if (params.isClosing()) { + clearGatewayMaintenanceHandles(maintenance); + return; + } + params.applyMaintenance(maintenance); + }, + shouldStartCron: () => !params.isClosing() && params.shouldStartCron(), + markCronStartHandled: params.markCronStartHandled, + cron: params.cron, + logCron: params.logCron, + log: params.log, + recordPostReadyMemory: () => { + if (!params.isClosing()) { + params.recordPostReadyMemory(); + } + }, + }); + }, params.delayMs); + timer.unref?.(); + return timer; +} + function recoverPendingOutboundDeliveries(params: { cfg: OpenClawConfig; log: GatewayRuntimeServiceLogger; diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 5f5104208fc..d4ecdad7128 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -894,8 +894,20 @@ export async function startGatewayServer( deps.cron = runtimeState.cronState.cron; let closePreludeStarted = false; - const runClosePrelude = async () => { + let postReadyMaintenanceTimer: ReturnType | null = null; + const clearPostReadyMaintenanceTimer = () => { + if (!postReadyMaintenanceTimer) { + return; + } + clearTimeout(postReadyMaintenanceTimer); + postReadyMaintenanceTimer = null; + }; + const markClosePreludeStarted = () => { closePreludeStarted = true; + clearPostReadyMaintenanceTimer(); + }; + const runClosePrelude = async () => { + markClosePreludeStarted(); clearCurrentPluginMetadataSnapshot(); const { runGatewayClosePrelude } = await loadGatewayCloseModule(); await runGatewayClosePrelude({ @@ -1492,28 +1504,30 @@ export async function startGatewayServer( log.warn(`gateway: failed to promote config last-known-good backup: ${String(err)}`); }); if (!minimalTestGateway) { - const handle = setTimeout(() => { - void gatewayRuntimeServices.runGatewayPostReadyMaintenance({ - startMaintenance: earlyRuntime.startMaintenance, - applyMaintenance: (maintenance) => { - runtimeState.tickInterval = maintenance.tickInterval; - runtimeState.healthInterval = maintenance.healthInterval; - runtimeState.dedupeCleanup = maintenance.dedupeCleanup; - runtimeState.mediaCleanup = maintenance.mediaCleanup; - }, - shouldStartCron: () => !gatewayCronStartHandled, - markCronStartHandled: () => { - gatewayCronStartHandled = true; - }, - cron: runtimeState.cronState.cron, - logCron, - log, - recordPostReadyMemory: () => { - startupTrace.detail("memory.post-ready", collectProcessMemoryUsageMb()); - }, - }); - }, POST_READY_MAINTENANCE_DELAY_MS); - handle.unref?.(); + postReadyMaintenanceTimer = gatewayRuntimeServices.scheduleGatewayPostReadyMaintenance({ + delayMs: POST_READY_MAINTENANCE_DELAY_MS, + isClosing: () => closePreludeStarted, + onStarted: () => { + postReadyMaintenanceTimer = null; + }, + startMaintenance: earlyRuntime.startMaintenance, + applyMaintenance: (maintenance) => { + runtimeState.tickInterval = maintenance.tickInterval; + runtimeState.healthInterval = maintenance.healthInterval; + runtimeState.dedupeCleanup = maintenance.dedupeCleanup; + runtimeState.mediaCleanup = maintenance.mediaCleanup; + }, + shouldStartCron: () => !gatewayCronStartHandled, + markCronStartHandled: () => { + gatewayCronStartHandled = true; + }, + cron: runtimeState.cronState.cron, + logCron, + log, + recordPostReadyMemory: () => { + startupTrace.detail("memory.post-ready", collectProcessMemoryUsageMb()); + }, + }); } else { startupTrace.detail("memory.post-ready", collectProcessMemoryUsageMb()); } @@ -1527,6 +1541,7 @@ export async function startGatewayServer( return { close: async (opts) => { try { + markClosePreludeStarted(); // Run gateway_stop plugin hook before shutdown const { runGlobalGatewayStopSafely } = await import("../plugins/hook-runner-global.js"); await runGlobalGatewayStopSafely({