From d77c4bbb2d2b216b1fe9bb0676983aba147cf15d Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 16 May 2026 18:51:51 +0100 Subject: [PATCH] fix(gateway): harden startup restart queue (#82660) (thanks @samzong) --- CHANGELOG.md | 1 + src/cli/gateway-cli/run-loop.test.ts | 51 +++++++++++++++++++++++++++- src/cli/gateway-cli/run-loop.ts | 50 +++++++++++++++++++++++++-- 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57567d5b0de..13f7d7eadd5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ Docs: https://docs.openclaw.ai - Codex app-server: mark native context compaction completion events as successful, preventing false "Compaction incomplete" notices after successful Codex-managed compaction. Fixes #82470. (#81593) Thanks @Kyzcreig. - Codex app-server: keep long-running turns alive while current-turn approvals, user input, dynamic tools, and notifications make progress, and carry that progress into the outer run timeout. (#82601) Thanks @100yenadmin. - Gateway/channels: hand off traced channel account startup outside the startup diagnostic phase so long-lived channel tasks do not keep liveness warnings pinned to channel startup. Refs #82398. +- Gateway/restart: queue restart and shutdown signals received while the gateway startup loop is still returning its server handle, so startup-time restarts are not dropped during update churn. (#82660) Thanks @samzong. - GitHub Copilot: route device-login requests through the plugin SSRF guard with a GitHub-only policy. - Group/channel replies: keep message-tool-preferred final replies private when the agent misses the message tool, and log suppressed payload metadata in the gateway debug log for quieter diagnosis. - Gateway/WebChat: route image attachments through a configured vision-capable `imageModel` plan before inlining images, and carry that image-model fallback chain through runtime retries. (#82524) Thanks @frankekn. 
diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index e8ba0327bed..0c926f8d567 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -635,6 +635,10 @@ describe("runGatewayLoop", () => { () => markGatewaySigusr1RestartHandled.mock.calls.length > 0, "expected SIGUSR1 handler to consume the restart before startup returned", ); + await waitForLoopCondition( + () => markGatewayDraining.mock.calls.length > 0, + "expected queued startup restart to mark gateway draining before startup returned", + ); return { close: closeFirst }; }); start.mockImplementationOnce(async () => { @@ -675,6 +679,51 @@ describe("runGatewayLoop", () => { }); }); + it("exits if a queued startup restart never reaches a close handle", async () => { + vi.clearAllMocks(); + peekGatewaySigusr1RestartReason.mockReturnValue(undefined); + vi.useFakeTimers(); + + try { + await withIsolatedSignals(async ({ captureSignal }) => { + const close = vi.fn(async () => {}); + const startupNeverReturns = new Promise(() => {}); + const { runtime, exited } = createRuntimeWithExitSignal(); + const start = vi.fn(async () => { + await startupNeverReturns; + return { close }; + }); + + const { runGatewayLoop } = await import("./run-loop.js"); + void runGatewayLoop({ + start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"], + runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"], + }); + await vi.advanceTimersByTimeAsync(0); + const sigusr1 = captureSignal("SIGUSR1"); + + sigusr1(); + await vi.advanceTimersByTimeAsync(0); + expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(1); + expect(markGatewayDraining).toHaveBeenCalledTimes(1); + expect(runtime.exit).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(24_999); + expect(runtime.exit).not.toHaveBeenCalled(); + await vi.advanceTimersByTimeAsync(1); + + await expect(exited).resolves.toBe(1); + expect(close).not.toHaveBeenCalled(); + expect(start).toHaveBeenCalledTimes(1); 
+ expect(gatewayLog.error).toHaveBeenCalledWith( + "startup restart request timed out before gateway returned a close handle; exiting for supervisor recovery", + ); + }); + } finally { + vi.useRealTimers(); + } + }); + it("processes SIGINT immediately before startup returns a server", async () => { vi.clearAllMocks(); @@ -736,7 +785,7 @@ describe("runGatewayLoop", () => { await expect(exited).resolves.toBe(0); expect(close).not.toHaveBeenCalled(); - expect(markGatewayDraining).not.toHaveBeenCalled(); + expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(start).toHaveBeenCalledTimes(1); expect(acquireGatewayLock).toHaveBeenCalledTimes(1); expect(gatewayLog.info).toHaveBeenCalledWith( diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 1f634ce480a..bea0cbf9907 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -112,6 +112,8 @@ export async function runGatewayLoop(params: { // The HTTP server can report ready before params.start returns its close handle. // Defer lifecycle signals from that window until the loop can close and advance. let pendingStartupRequest: GatewayRunSignalRequest | null = null; + let pendingStartupForceExitTimer: ReturnType<typeof setTimeout> | null = null; + let restartDrainingMarkPromise: Promise<void> | null = null; let startupFailedWithoutServerHandle = false; const processInstanceId = randomUUID(); const waitForHealthyChild = params.waitForHealthyChild ?? 
waitForHealthyGatewayChild; @@ -305,6 +307,32 @@ export async function runGatewayLoop(params: { const SUPERVISOR_STOP_TIMEOUT_MS = 30_000; const SHUTDOWN_TIMEOUT_MS = SUPERVISOR_STOP_TIMEOUT_MS - 5_000; + const clearPendingStartupForceExitTimer = () => { + if (!pendingStartupForceExitTimer) { + return; + } + clearTimeout(pendingStartupForceExitTimer); + pendingStartupForceExitTimer = null; + }; + const armPendingStartupForceExitTimer = () => { + if (pendingStartupForceExitTimer) { + return; + } + pendingStartupForceExitTimer = setTimeout(() => { + pendingStartupForceExitTimer = null; + gatewayLog.error( + "startup restart request timed out before gateway returned a close handle; exiting for supervisor recovery", + ); + void (async () => { + try { + await writeStabilityBundle("gateway.restart_startup_request_timeout"); + } finally { + exitProcess(1); + } + })(); + }, SHUTDOWN_TIMEOUT_MS); + pendingStartupForceExitTimer.unref?.(); + }; const resolveRestartDrainTimeoutMs = async ( restartIntent?: RestartIntentOptions, ): Promise => { @@ -323,6 +351,18 @@ export async function runGatewayLoop(params: { return DEFAULT_RESTART_DRAIN_TIMEOUT_MS; } }; + const markRestartDraining = async () => { + if (!restartDrainingMarkPromise) { + restartDrainingMarkPromise = (async () => { + const { markGatewayDraining } = await loadGatewayLifecycleRuntimeModule(); + markGatewayDraining(); + })().catch((err) => { + restartDrainingMarkPromise = null; + throw err; + }); + } + await restartDrainingMarkPromise; + }; const runAcceptedRequest = ({ action, @@ -404,7 +444,6 @@ export async function runGatewayLoop(params: { getInspectableActiveTaskRestartBlockers, getActiveEmbeddedRunCount, getActiveTaskCount, - markGatewayDraining, waitForActiveEmbeddedRuns, waitForActiveTasks, } = await loadGatewayLifecycleRuntimeModule(); @@ -439,7 +478,7 @@ export async function runGatewayLoop(params: { // Reject new enqueues immediately during the drain window so // sessions get an explicit restart error 
instead of silent task loss. - markGatewayDraining(); + await markRestartDraining(); const activeTasks = getActiveTaskCount(); const activeRuns = getActiveEmbeddedRunCount(); activeTasksAtDrainStart = activeTasks; @@ -540,6 +579,7 @@ export async function runGatewayLoop(params: { } const request = pendingStartupRequest; pendingStartupRequest = null; + clearPendingStartupForceExitTimer(); startupFailedWithoutServerHandle = false; runAcceptedRequest(request); }; @@ -554,6 +594,7 @@ export async function runGatewayLoop(params: { if (action === "stop" && pendingStartupRequest && !server) { gatewayLog.info(`received ${signal}; overriding pending startup restart with shutdown`); pendingStartupRequest = null; + clearPendingStartupForceExitTimer(); startupFailedWithoutServerHandle = false; runAcceptedRequest(acceptedRequest); return; @@ -583,6 +624,10 @@ export async function runGatewayLoop(params: { } if (!server || !restartResolver) { pendingStartupRequest = acceptedRequest; + void markRestartDraining().catch((err) => { + gatewayLog.warn(`failed to mark gateway draining for startup restart: ${String(err)}`); + }); + armPendingStartupForceExitTimer(); return; } runAcceptedRequest(acceptedRequest); @@ -668,6 +713,7 @@ export async function runGatewayLoop(params: { let isFirstStart = true; for (;;) { await onIteration(); + restartDrainingMarkPromise = null; let startupFailedBeforeServerHandle = false; try { server = await params.start({ startupStartedAt });