From ec1f72b6c58f05432e6d5578fef4884b6adc2fee Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sat, 25 Apr 2026 01:35:47 -0700 Subject: [PATCH] fix(gateway): preserve restart drain for active runs Fixes https://github.com/openclaw/openclaw/issues/65485 --- CHANGELOG.md | 1 + docs/.generated/config-baseline.sha256 | 4 +- docs/gateway/configuration-reference.md | 4 +- src/agents/pi-embedded-runner/runs.ts | 16 ++- src/cli/daemon-cli/lifecycle-core.test.ts | 31 +++++ src/cli/daemon-cli/lifecycle-core.ts | 20 ++- src/cli/gateway-cli/run-loop.test.ts | 54 +++++++- src/cli/gateway-cli/run-loop.ts | 80 ++++++++--- src/config/schema.base.generated.ts | 4 +- src/config/schema.help.ts | 2 +- src/config/types.gateway.ts | 9 +- src/gateway/server-reload-handlers.ts | 6 + src/infra/infra-runtime.test.ts | 33 ++++- src/infra/restart-intent.test.ts | 79 +++++++++++ src/infra/restart.deferral-timeout.test.ts | 12 +- src/infra/restart.ts | 147 ++++++++++++++++++++- src/process/command-queue.ts | 15 ++- 17 files changed, 453 insertions(+), 64 deletions(-) create mode 100644 src/infra/restart-intent.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 51ca957ede1..d2b4e116108 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -131,6 +131,7 @@ Docs: https://docs.openclaw.ai - macOS Gateway: wait for launchd to reload the exited Gateway LaunchAgent before bootstrapping repair fallback, preventing config-triggered restarts from leaving the service not loaded. Fixes #45178. Thanks @vincentkoc. - macOS Gateway: tolerate launchctl bootstrap's already-loaded exit during restart fallback and use non-killing kickstart after bootstrap, avoiding a second race that can unload the LaunchAgent. Fixes #41934. Thanks @zerone0x. - macOS Gateway: rewrite stale LaunchAgent plists before restart fallback bootstrap, matching install repair behavior when `gateway restart` has to re-register launchd. Thanks @maybegeeker. 
+- Gateway/reload: wait indefinitely by default for active operations before restart-required reloads and preserve that drain path when service-manager restarts arrive as SIGTERM, preventing in-flight agent runs from being killed mid-turn. Fixes #65485. Thanks @rijhsinghani. - TTS/hooks: preserve audio-only TTS transcripts for `message_sending` and `message_sent` hooks without rendering the transcript as a media caption. Thanks @zqchris. - WhatsApp/TTS: preserve `audioAsVoice` through shared media payload sends and the WhatsApp outbound adapter, so `[[audio_as_voice]]` reply payloads keep their voice-note intent when routed through `sendPayload`. Fixes #66053. Thanks @masatohoshino. - Control UI/WebChat: hide heartbeat prompts, `HEARTBEAT_OK` acknowledgments, and internal-only runtime context turns from visible chat history while leaving the underlying transcript intact. Fixes #71381. Thanks @gerald1950ggg-ai. diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index 7b6f5e3262b..7ba99141fb7 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -83677b2666da2169511e5372f26c20c794001ec8acc7e9c2e1935043010c05d6 config-baseline.json -fa38a1bde88d8858ae0a11e7e17fa42fe107c34268b568f51877afbde81922e8 config-baseline.core.json +dae9ece3ac683a0bed2835d96d4373f65ab955b8b901df0bcdeedc565ade6ed6 config-baseline.json +7cd52f77b1e0ecb50d2119b4c21d6d51d336a0c752a44cbaf8df1efa9ef538c0 config-baseline.core.json d72032762ab46b99480b57deb81130a0ab5b1401189cfbaf4f7fef4a063a7f6c config-baseline.channel.json 0504c4f38d4c753fffeb465c93540d829df6b0fcef921eb0e2226ac16bdbbe07 config-baseline.plugin.json diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index a61a228d1ba..7ed3704c76f 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -472,7 +472,7 @@ See [Multiple 
Gateways](/gateway/multiple-gateways). reload: { mode: "hybrid", // off | restart | hot | hybrid debounceMs: 500, - deferralTimeoutMs: 300000, + deferralTimeoutMs: 0, }, }, } @@ -484,7 +484,7 @@ See [Multiple Gateways](/gateway/multiple-gateways). - `"hot"`: apply changes in-process without restarting. - `"hybrid"` (default): try hot reload first; fall back to restart if required. - `debounceMs`: debounce window in ms before config changes are applied (non-negative integer). -- `deferralTimeoutMs`: maximum time in ms to wait for in-flight operations before forcing a restart (default: `300000` = 5 minutes). +- `deferralTimeoutMs`: optional maximum time in ms to wait for in-flight operations before forcing a restart. Omit it or set `0` to wait indefinitely and log periodic still-pending warnings. --- diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index 0a266285e8b..4931c77e7a7 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -273,16 +273,22 @@ export function consumeEmbeddedRunModelSwitch( /** * Wait for active embedded runs to drain. * - * Used during restarts so in-flight compaction runs can release session write - * locks before the next lifecycle starts. + * Used during restarts so in-flight runs can release session write locks before + * the next lifecycle starts. If no timeout is passed, waits indefinitely. */ export async function waitForActiveEmbeddedRuns( - timeoutMs = 15_000, + timeoutMs?: number, opts?: { pollMs?: number }, ): Promise<{ drained: boolean }> { const pollMsRaw = opts?.pollMs ?? 250; const pollMs = Math.max(10, Math.floor(pollMsRaw)); - const maxWaitMs = Math.max(pollMs, Math.floor(timeoutMs)); + if (timeoutMs !== undefined && timeoutMs <= 0) { + return { drained: getActiveEmbeddedRunCount() === 0 }; + } + const maxWaitMs = + typeof timeoutMs === "number" && Number.isFinite(timeoutMs) + ? 
Math.max(pollMs, Math.floor(timeoutMs)) + : undefined; const startedAt = Date.now(); while (true) { @@ -290,7 +296,7 @@ export async function waitForActiveEmbeddedRuns( return { drained: true }; } const elapsedMs = Date.now() - startedAt; - if (elapsedMs >= maxWaitMs) { + if (maxWaitMs !== undefined && elapsedMs >= maxWaitMs) { diag.warn( `wait for active embedded runs timed out: activeRuns=${getActiveEmbeddedRunCount()} timeoutMs=${maxWaitMs}`, ); diff --git a/src/cli/daemon-cli/lifecycle-core.test.ts b/src/cli/daemon-cli/lifecycle-core.test.ts index 9d51a360f74..3fcdd4f9648 100644 --- a/src/cli/daemon-cli/lifecycle-core.test.ts +++ b/src/cli/daemon-cli/lifecycle-core.test.ts @@ -16,6 +16,8 @@ const loadConfig = vi.fn<() => OpenClawConfig>(() => ({ }, }, })); +const writeGatewayRestartIntentSync = vi.fn(); +const clearGatewayRestartIntentSync = vi.fn(); vi.mock("../../config/config.js", () => ({ loadConfig: () => loadConfig(), @@ -26,6 +28,11 @@ vi.mock("../../runtime.js", () => ({ defaultRuntime, })); +vi.mock("../../infra/restart.js", () => ({ + clearGatewayRestartIntentSync: () => clearGatewayRestartIntentSync(), + writeGatewayRestartIntentSync: (opts: unknown) => writeGatewayRestartIntentSync(opts), +})); + let runServiceRestart: typeof import("./lifecycle-core.js").runServiceRestart; let runServiceStart: typeof import("./lifecycle-core.js").runServiceStart; let runServiceStop: typeof import("./lifecycle-core.js").runServiceStop; @@ -92,6 +99,8 @@ describe("runServiceRestart token drift", () => { }, }); resetLifecycleServiceMocks(); + writeGatewayRestartIntentSync.mockClear(); + clearGatewayRestartIntentSync.mockClear(); service.readCommand.mockResolvedValue({ programArguments: [], environment: { OPENCLAW_GATEWAY_TOKEN: "service-token" }, @@ -182,6 +191,7 @@ describe("runServiceRestart token drift", () => { expect(loadConfig).not.toHaveBeenCalled(); expect(service.readCommand).not.toHaveBeenCalled(); + 
expect(writeGatewayRestartIntentSync).not.toHaveBeenCalled(); const payload = readJsonLog<{ warnings?: string[] }>(); expect(payload.warnings).toBeUndefined(); }); @@ -305,6 +315,27 @@ describe("runServiceRestart token drift", () => { expect(payload.message).toBe("restart scheduled, gateway will restart momentarily"); }); + it("writes a restart intent before service-manager restart", async () => { + service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 }); + + await runServiceRestart(createServiceRunArgs()); + + expect(writeGatewayRestartIntentSync).toHaveBeenCalledWith({ targetPid: 1234 }); + expect(clearGatewayRestartIntentSync).not.toHaveBeenCalled(); + expect(service.restart).toHaveBeenCalledTimes(1); + }); + + it("clears restart intent when service-manager restart fails before signaling", async () => { + service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 }); + writeGatewayRestartIntentSync.mockReturnValueOnce(true); + service.restart.mockRejectedValueOnce(new Error("launchctl failed before signaling")); + + await expect(runServiceRestart(createServiceRunArgs())).rejects.toThrow("__exit__:1"); + + expect(writeGatewayRestartIntentSync).toHaveBeenCalledWith({ targetPid: 1234 }); + expect(clearGatewayRestartIntentSync).toHaveBeenCalledOnce(); + }); + it("emits scheduled when service start routes through a scheduled restart", async () => { service.restart.mockResolvedValue({ outcome: "scheduled" }); diff --git a/src/cli/daemon-cli/lifecycle-core.ts b/src/cli/daemon-cli/lifecycle-core.ts index c8be0c8dd01..65f194d7746 100644 --- a/src/cli/daemon-cli/lifecycle-core.ts +++ b/src/cli/daemon-cli/lifecycle-core.ts @@ -9,6 +9,10 @@ import type { GatewayService } from "../../daemon/service.js"; import { renderSystemdUnavailableHints } from "../../daemon/systemd-hints.js"; import { isSystemdUserServiceAvailable } from "../../daemon/systemd.js"; import { isGatewaySecretRefUnavailableError } from "../../gateway/credentials.js"; +import { + 
clearGatewayRestartIntentSync, + writeGatewayRestartIntentSync, +} from "../../infra/restart.js"; import { isWSL } from "../../infra/wsl.js"; import { defaultRuntime } from "../../runtime.js"; import { resolveGatewayTokenForDriftCheck } from "./gateway-token-drift.js"; @@ -458,7 +462,21 @@ export async function runServiceRestart(params: { try { let restartResult: GatewayServiceRestartResult = { outcome: "completed" }; if (loaded) { - restartResult = await params.service.restart({ env: process.env, stdout }); + let wroteRestartIntent = false; + if (params.serviceNoun === "Gateway") { + const runtime = await params.service.readRuntime(process.env).catch(() => null); + wroteRestartIntent = writeGatewayRestartIntentSync({ + targetPid: runtime?.pid, + }); + } + try { + restartResult = await params.service.restart({ env: process.env, stdout }); + } catch (err) { + if (wroteRestartIntent) { + clearGatewayRestartIntentSync(); + } + throw err; + } } let restartStatus = describeGatewayServiceRestart(params.serviceNoun, restartResult); if (restartStatus.scheduled) { diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 73a07429561..49ab1f53fed 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -6,6 +6,7 @@ const acquireGatewayLock = vi.fn(async (_opts?: { port?: number }) => ({ release: vi.fn(async () => {}), })); const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true); +const consumeGatewayRestartIntentSync = vi.fn(() => false); const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false); const markGatewaySigusr1RestartHandled = vi.fn(); const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?: string }) => ({ @@ -19,7 +20,7 @@ const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason? 
})); const getActiveTaskCount = vi.fn(() => 0); const markGatewayDraining = vi.fn(); -const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true })); +const waitForActiveTasks = vi.fn(async (_timeoutMs?: number) => ({ drained: true })); const resetAllLanes = vi.fn(); const restartGatewayProcessWithFreshPid = vi.fn< () => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string } @@ -28,7 +29,7 @@ const abortEmbeddedPiRun = vi.fn( (_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false, ); const getActiveEmbeddedRunCount = vi.fn(() => 0); -const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true })); +const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs?: number) => ({ drained: true })); const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart"; const loadConfig = vi.fn(() => ({ gateway: { @@ -49,6 +50,7 @@ vi.mock("../../infra/gateway-lock.js", () => ({ vi.mock("../../infra/restart.js", () => ({ consumeGatewaySigusr1RestartAuthorization: () => consumeGatewaySigusr1RestartAuthorization(), + consumeGatewayRestartIntentSync: () => consumeGatewayRestartIntentSync(), isGatewaySigusr1RestartExternallyAllowed: () => isGatewaySigusr1RestartExternallyAllowed(), markGatewaySigusr1RestartHandled: () => markGatewaySigusr1RestartHandled(), scheduleGatewaySigusr1Restart: (opts?: { delayMs?: number; reason?: string }) => @@ -62,7 +64,7 @@ vi.mock("../../infra/process-respawn.js", () => ({ vi.mock("../../process/command-queue.js", () => ({ getActiveTaskCount: () => getActiveTaskCount(), markGatewayDraining: () => markGatewayDraining(), - waitForActiveTasks: (timeoutMs: number) => waitForActiveTasks(timeoutMs), + waitForActiveTasks: (timeoutMs?: number) => waitForActiveTasks(timeoutMs), resetAllLanes: () => resetAllLanes(), })); @@ -70,7 +72,7 @@ vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({ abortEmbeddedPiRun: (sessionId?: string, opts?: { mode?: 
"all" | "compacting" }) => abortEmbeddedPiRun(sessionId, opts), getActiveEmbeddedRunCount: () => getActiveEmbeddedRunCount(), - waitForActiveEmbeddedRuns: (timeoutMs: number) => waitForActiveEmbeddedRuns(timeoutMs), + waitForActiveEmbeddedRuns: (timeoutMs?: number) => waitForActiveEmbeddedRuns(timeoutMs), })); vi.mock("../../config/config.js", () => ({ @@ -226,6 +228,50 @@ describe("runGatewayLoop", () => { }); }); + it("treats SIGTERM with a restart intent as a draining restart", async () => { + vi.clearAllMocks(); + consumeGatewayRestartIntentSync.mockReturnValueOnce(true); + getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0); + + await withIsolatedSignals(async ({ captureSignal }) => { + const closeFirst = vi.fn(async () => {}); + const closeSecond = vi.fn(async () => {}); + const { runtime, exited } = createRuntimeWithExitSignal(); + const start = vi + .fn() + .mockResolvedValueOnce({ close: closeFirst }) + .mockResolvedValueOnce({ close: closeSecond }); + const { runGatewayLoop } = await import("./run-loop.js"); + void runGatewayLoop({ + start: start as unknown as Parameters[0]["start"], + runtime: runtime as unknown as Parameters[0]["runtime"], + }); + await new Promise((resolve) => setImmediate(resolve)); + const sigterm = captureSignal("SIGTERM"); + const sigint = captureSignal("SIGINT"); + + sigterm(); + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(consumeGatewayRestartIntentSync).toHaveBeenCalledOnce(); + expect(markGatewayDraining).toHaveBeenCalledOnce(); + expect(waitForActiveTasks).toHaveBeenCalledWith(90_000); + expect(closeFirst).toHaveBeenCalledWith({ + reason: "gateway restarting", + restartExpectedMs: 1500, + }); + expect(start).toHaveBeenCalledTimes(2); + + sigint(); + await expect(exited).resolves.toBe(0); + expect(closeSecond).toHaveBeenCalledWith({ + reason: "gateway stopping", + restartExpectedMs: null, + }); + }); + }); + it("restarts after SIGUSR1 even 
when drain times out, and resets lanes for the new iteration", async () => { vi.clearAllMocks(); loadConfig.mockReturnValue({ diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 2223b018a19..6922c061d5c 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -10,6 +10,7 @@ import { acquireGatewayLock } from "../../infra/gateway-lock.js"; import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js"; import { consumeGatewaySigusr1RestartAuthorization, + consumeGatewayRestartIntentSync, isGatewaySigusr1RestartExternallyAllowed, markGatewaySigusr1RestartHandled, scheduleGatewaySigusr1Restart, @@ -29,8 +30,10 @@ import type { RuntimeEnv } from "../../runtime.js"; const gatewayLog = createSubsystemLogger("gateway"); const LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS = 1500; const DEFAULT_RESTART_DRAIN_TIMEOUT_MS = 300_000; +const RESTART_DRAIN_STILL_PENDING_WARN_MS = 30_000; type GatewayRunSignalAction = "stop" | "restart"; +type RestartDrainTimeoutMs = number | undefined; export async function runGatewayLoop(params: { start: (params?: { @@ -125,12 +128,12 @@ export async function runGatewayLoop(params: { const SUPERVISOR_STOP_TIMEOUT_MS = 30_000; const SHUTDOWN_TIMEOUT_MS = SUPERVISOR_STOP_TIMEOUT_MS - 5_000; - const resolveRestartDrainTimeoutMs = () => { + const resolveRestartDrainTimeoutMs = (): RestartDrainTimeoutMs => { try { const timeoutMs = loadConfig().gateway?.reload?.deferralTimeoutMs; - return typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs >= 0 + return typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs > 0 ? timeoutMs - : DEFAULT_RESTART_DRAIN_TIMEOUT_MS; + : undefined; } catch { return DEFAULT_RESTART_DRAIN_TIMEOUT_MS; } @@ -146,20 +149,53 @@ export async function runGatewayLoop(params: { const restartDrainTimeoutMs = isRestart ? resolveRestartDrainTimeoutMs() : 0; gatewayLog.info(`received ${signal}; ${isRestart ? 
"restarting" : "shutting down"}`); - // Allow extra time for draining active turns on restart. - const forceExitMs = isRestart - ? restartDrainTimeoutMs + SHUTDOWN_TIMEOUT_MS - : SHUTDOWN_TIMEOUT_MS; - const forceExitTimer = setTimeout(() => { - gatewayLog.error("shutdown timed out; exiting without full cleanup"); - writeStabilityBundle( - isRestart ? "gateway.restart_shutdown_timeout" : "gateway.stop_shutdown_timeout", - ); - // Keep the in-process watchdog below the supervisor stop budget so this - // path wins before launchd/systemd escalates to a hard kill. Exit - // non-zero on any timeout so supervised installs restart cleanly. - exitProcess(1); - }, forceExitMs); + let forceExitTimer: ReturnType | null = null; + const armForceExitTimer = (forceExitMs: number) => { + if (forceExitTimer) { + return; + } + forceExitTimer = setTimeout(() => { + gatewayLog.error("shutdown timed out; exiting without full cleanup"); + writeStabilityBundle( + isRestart ? "gateway.restart_shutdown_timeout" : "gateway.stop_shutdown_timeout", + ); + // Keep the in-process watchdog below the supervisor stop budget so this + // path wins before launchd/systemd escalates to a hard kill. Exit + // non-zero on any timeout so supervised installs restart cleanly. + exitProcess(1); + }, forceExitMs); + }; + const clearForceExitTimer = () => { + if (!forceExitTimer) { + return; + } + clearTimeout(forceExitTimer); + forceExitTimer = null; + }; + + if (!isRestart) { + armForceExitTimer(SHUTDOWN_TIMEOUT_MS); + } else if (restartDrainTimeoutMs !== undefined) { + // Allow extra time for draining active turns on explicitly capped restarts. + armForceExitTimer(restartDrainTimeoutMs + SHUTDOWN_TIMEOUT_MS); + } + + const formatRestartDrainBudget = () => + restartDrainTimeoutMs === undefined + ? 
"without a timeout" + : `with timeout ${restartDrainTimeoutMs}ms`; + const createStillPendingDrainLogger = () => + setInterval(() => { + gatewayLog.warn( + `still draining ${getActiveTaskCount()} active task(s) and ${getActiveEmbeddedRunCount()} active embedded run(s) before restart`, + ); + }, RESTART_DRAIN_STILL_PENDING_WARN_MS); + + const armCloseForceExitTimerForIndefiniteRestart = () => { + if (isRestart && restartDrainTimeoutMs === undefined) { + armForceExitTimer(SHUTDOWN_TIMEOUT_MS); + } + }; void (async () => { try { @@ -180,8 +216,9 @@ export async function runGatewayLoop(params: { if (activeTasks > 0 || activeRuns > 0) { gatewayLog.info( - `draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart (timeout ${restartDrainTimeoutMs}ms)`, + `draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart ${formatRestartDrainBudget()}`, ); + const stillPendingDrainLogger = createStillPendingDrainLogger(); const [tasksDrain, runsDrain] = await Promise.all([ activeTasks > 0 ? waitForActiveTasks(restartDrainTimeoutMs) @@ -189,7 +226,7 @@ export async function runGatewayLoop(params: { activeRuns > 0 ? waitForActiveEmbeddedRuns(restartDrainTimeoutMs) : Promise.resolve({ drained: true }), - ]); + ]).finally(() => clearInterval(stillPendingDrainLogger)); if (tasksDrain.drained && runsDrain.drained) { gatewayLog.info("all active work drained"); } else { @@ -201,6 +238,7 @@ export async function runGatewayLoop(params: { } } + armCloseForceExitTimerForIndefiniteRestart(); await server?.close({ reason: isRestart ? "gateway restarting" : "gateway stopping", restartExpectedMs: isRestart ? 
1500 : null, @@ -208,7 +246,7 @@ export async function runGatewayLoop(params: { } catch (err) { gatewayLog.error(`shutdown error: ${String(err)}`); } finally { - clearTimeout(forceExitTimer); + clearForceExitTimer(); server = null; if (isRestart) { await handleRestartAfterServerClose(); @@ -221,7 +259,7 @@ export async function runGatewayLoop(params: { const onSigterm = () => { gatewayLog.info("signal SIGTERM received"); - request("stop", "SIGTERM"); + request(consumeGatewayRestartIntentSync() ? "restart" : "stop", "SIGTERM"); }; const onSigint = () => { gatewayLog.info("signal SIGINT received"); diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index 47f7571e591..baa381dc059 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -21711,7 +21711,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { maximum: 9007199254740991, title: "Restart Deferral Timeout (ms)", description: - "Maximum time (ms) to wait for in-flight operations to complete before forcing a SIGUSR1 restart. Default: 300000 (5 minutes). Lower values risk aborting active subagent LLM calls.", + "Optional maximum time (ms) to wait for in-flight operations before forcing a restart. Omit or set 0 to wait indefinitely with periodic still-pending warnings. Lower positive values risk aborting active subagent LLM calls.", }, }, additionalProperties: false, @@ -24940,7 +24940,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { }, "gateway.reload.deferralTimeoutMs": { label: "Restart Deferral Timeout (ms)", - help: "Maximum time (ms) to wait for in-flight operations to complete before forcing a SIGUSR1 restart. Default: 300000 (5 minutes). Lower values risk aborting active subagent LLM calls.", + help: "Optional maximum time (ms) to wait for in-flight operations before forcing a restart. Omit or set 0 to wait indefinitely with periodic still-pending warnings. 
Lower positive values risk aborting active subagent LLM calls.", tags: ["network", "reliability", "performance"], }, "gateway.nodes.browser.mode": { diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index f144255a01f..5664f2d83d2 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -466,7 +466,7 @@ export const FIELD_HELP: Record = { 'Controls how config edits are applied: "off" ignores live edits, "restart" always restarts, "hot" applies in-process, and "hybrid" tries hot then restarts if required. Keep "hybrid" for safest routine updates.', "gateway.reload.debounceMs": "Debounce window (ms) before applying config changes.", "gateway.reload.deferralTimeoutMs": - "Maximum time (ms) to wait for in-flight operations to complete before forcing a SIGUSR1 restart. Default: 300000 (5 minutes). Lower values risk aborting active subagent LLM calls.", + "Optional maximum time (ms) to wait for in-flight operations before forcing a restart. Omit or set 0 to wait indefinitely with periodic still-pending warnings. Lower positive values risk aborting active subagent LLM calls.", "gateway.nodes.browser.mode": 'Node browser routing ("auto" = pick single connected browser node, "manual" = require node param, "off" = disable).', "gateway.nodes.browser.node": "Pin browser routing to a specific node id or name (optional).", diff --git a/src/config/types.gateway.ts b/src/config/types.gateway.ts index f767d1edb5e..2e7527ffdc3 100644 --- a/src/config/types.gateway.ts +++ b/src/config/types.gateway.ts @@ -206,10 +206,11 @@ export type GatewayReloadConfig = { /** Debounce window for config reloads (ms). Default: 300. */ debounceMs?: number; /** - * Maximum time (ms) to wait for in-flight operations to complete before - * forcing a SIGUSR1 restart. Default: 300000 (5 minutes). - * Lower values risk aborting active subagent LLM calls. 
- * @see https://github.com/openclaw/openclaw/issues/47711 + * Optional maximum time (ms) to wait for in-flight operations to complete + * before forcing a restart. Absent or 0 waits indefinitely and logs periodic + * still-pending warnings. + * Lower positive values risk aborting active subagent LLM calls. + * @see https://github.com/openclaw/openclaw/issues/65485 */ deferralTimeoutMs?: number; }; diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index b0c88f031c8..34cde076626 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -297,6 +297,12 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) restartPending = false; params.logReload.info("all operations and replies completed; restarting gateway now"); }, + onStillPending: (_pending, elapsedMs) => { + const remaining = formatActiveDetails(getActiveCounts()); + params.logReload.warn( + `restart still deferred after ${elapsedMs}ms with ${remaining.join(", ")} active`, + ); + }, onTimeout: (_pending, elapsedMs) => { const remaining = formatActiveDetails(getActiveCounts()); restartPending = false; diff --git a/src/infra/infra-runtime.test.ts b/src/infra/infra-runtime.test.ts index 88e0dc7f5d2..8d015ed6fba 100644 --- a/src/infra/infra-runtime.test.ts +++ b/src/infra/infra-runtime.test.ts @@ -1,5 +1,10 @@ import os from "node:os"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + clearConfigCache, + clearRuntimeConfigSnapshot, + setRuntimeConfigSnapshot, +} from "../config/config.js"; import { makeNetworkInterfacesSnapshot } from "../test-helpers/network-interfaces.js"; import { __testing, @@ -22,10 +27,12 @@ describe("infra runtime", () => { }); afterEach(async () => { + __testing.resetSigusr1State(); + clearRuntimeConfigSnapshot(); + clearConfigCache(); await vi.runOnlyPendingTimersAsync(); vi.useRealTimers(); vi.restoreAllMocks(); - __testing.resetSigusr1State(); 
}); } @@ -341,7 +348,7 @@ describe("infra runtime", () => { } }); - it("emits SIGUSR1 after deferral timeout even if still pending", async () => { + it("keeps SIGUSR1 deferred by default while work is still pending", async () => { const emitSpy = vi.spyOn(process, "emit"); const handler = () => {}; process.on("SIGUSR1", handler); @@ -353,8 +360,28 @@ describe("infra runtime", () => { await vi.advanceTimersByTimeAsync(0); expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1"); - // Advance past the 5-minute max deferral wait + // No default max deferral wait; active turns should not be killed just + // because a config-triggered restart has been pending for 5 minutes. await vi.advanceTimersByTimeAsync(300_000); + expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1"); + } finally { + process.removeListener("SIGUSR1", handler); + } + }); + + it("emits SIGUSR1 after explicit deferral timeout even if still pending", async () => { + const emitSpy = vi.spyOn(process, "emit"); + const handler = () => {}; + process.on("SIGUSR1", handler); + try { + setRuntimeConfigSnapshot({ gateway: { reload: { deferralTimeoutMs: 1_000 } } }); + setPreRestartDeferralCheck(() => 5); // always pending + scheduleGatewaySigusr1Restart({ delayMs: 0 }); + + await vi.advanceTimersByTimeAsync(0); + expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1"); + + await vi.advanceTimersByTimeAsync(1_000); expect(emitSpy).toHaveBeenCalledWith("SIGUSR1"); } finally { process.removeListener("SIGUSR1", handler); diff --git a/src/infra/restart-intent.test.ts b/src/infra/restart-intent.test.ts new file mode 100644 index 00000000000..21d6d73bd80 --- /dev/null +++ b/src/infra/restart-intent.test.ts @@ -0,0 +1,79 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { consumeGatewayRestartIntentSync, writeGatewayRestartIntentSync } from "./restart.js"; + +const tempDirs: string[] = []; + +function createIntentEnv(): 
NodeJS.ProcessEnv { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-restart-intent-")); + tempDirs.push(dir); + return { + ...process.env, + OPENCLAW_STATE_DIR: dir, + }; +} + +function intentPath(env: NodeJS.ProcessEnv): string { + return path.join(env.OPENCLAW_STATE_DIR ?? "", "gateway-restart-intent.json"); +} + +describe("gateway restart intent", () => { + afterEach(() => { + for (const dir of tempDirs.splice(0)) { + fs.rmSync(dir, { force: true, recursive: true }); + } + }); + + it("consumes a fresh intent for the current process", () => { + const env = createIntentEnv(); + + expect(writeGatewayRestartIntentSync({ env, targetPid: process.pid })).toBe(true); + + expect(consumeGatewayRestartIntentSync(env)).toBe(true); + expect(fs.existsSync(intentPath(env))).toBe(false); + }); + + it("rejects an intent for a different process", () => { + const env = createIntentEnv(); + + expect(writeGatewayRestartIntentSync({ env, targetPid: process.pid + 1 })).toBe(true); + + expect(consumeGatewayRestartIntentSync(env)).toBe(false); + expect(fs.existsSync(intentPath(env))).toBe(false); + }); + + it("rejects oversized intent files before parsing", () => { + const env = createIntentEnv(); + fs.writeFileSync(intentPath(env), "x".repeat(2048), { encoding: "utf8", mode: 0o600 }); + + expect(consumeGatewayRestartIntentSync(env)).toBe(false); + expect(fs.existsSync(intentPath(env))).toBe(false); + }); + + it("writes intent files with owner-only permissions", () => { + const env = createIntentEnv(); + + expect(writeGatewayRestartIntentSync({ env, targetPid: process.pid })).toBe(true); + + expect(fs.statSync(intentPath(env)).mode & 0o777).toBe(0o600); + }); + + it("does not follow an existing intent-path symlink when writing", () => { + const env = createIntentEnv(); + const targetPath = path.join(env.OPENCLAW_STATE_DIR ?? 
"", "attacker-target.txt"); + fs.writeFileSync(targetPath, "keep", "utf8"); + try { + fs.symlinkSync(targetPath, intentPath(env)); + } catch { + return; + } + + expect(writeGatewayRestartIntentSync({ env, targetPid: process.pid })).toBe(true); + + expect(fs.readFileSync(targetPath, "utf8")).toBe("keep"); + expect(fs.lstatSync(intentPath(env)).isSymbolicLink()).toBe(false); + expect(consumeGatewayRestartIntentSync(env)).toBe(true); + }); +}); diff --git a/src/infra/restart.deferral-timeout.test.ts b/src/infra/restart.deferral-timeout.test.ts index 167fe95ccdc..fb898bae548 100644 --- a/src/infra/restart.deferral-timeout.test.ts +++ b/src/infra/restart.deferral-timeout.test.ts @@ -16,10 +16,11 @@ describe("deferGatewayRestartUntilIdle timeout", () => { process.removeAllListeners("SIGUSR1"); }); - it("uses default 5-minute timeout when maxWaitMs is not specified", () => { + it("waits indefinitely when maxWaitMs is not specified", () => { const hooks: RestartDeferralHooks = { onTimeout: vi.fn(), onReady: vi.fn(), + onStillPending: vi.fn(), }; // Always return 1 pending item to prevent draining @@ -28,13 +29,12 @@ describe("deferGatewayRestartUntilIdle timeout", () => { hooks, }); - // Advance to just before 5 minutes — should NOT have timed out yet - vi.advanceTimersByTime(299_999); + vi.advanceTimersByTime(300_000); expect(hooks.onTimeout).not.toHaveBeenCalled(); + expect(hooks.onStillPending).toHaveBeenCalled(); - // Advance past 5 minutes — should time out - vi.advanceTimersByTime(1); - expect(hooks.onTimeout).toHaveBeenCalledOnce(); + vi.advanceTimersByTime(300_000); + expect(hooks.onTimeout).not.toHaveBeenCalled(); expect(hooks.onReady).not.toHaveBeenCalled(); }); diff --git a/src/infra/restart.ts b/src/infra/restart.ts index fa80eb211e0..0155e53aa32 100644 --- a/src/infra/restart.ts +++ b/src/infra/restart.ts @@ -1,7 +1,9 @@ import { spawnSync } from "node:child_process"; +import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { 
getRuntimeConfig } from "../config/config.js"; +import { resolveStateDir } from "../config/paths.js"; import { resolveGatewayLaunchAgentLabel, resolveGatewaySystemdServiceName, @@ -16,11 +18,12 @@ export type { RestartAttempt } from "./restart.types.js"; const SPAWN_TIMEOUT_MS = 2000; const SIGUSR1_AUTH_GRACE_MS = 5000; const DEFAULT_DEFERRAL_POLL_MS = 500; -// Default to 5 minutes to avoid aborting in-flight subagent LLM calls. -// Configurable via gateway.reload.deferralTimeoutMs. -const DEFAULT_DEFERRAL_MAX_WAIT_MS = 300_000; +const DEFAULT_DEFERRAL_STILL_PENDING_WARN_MS = 30_000; const RESTART_COOLDOWN_MS = 30_000; const LAUNCHCTL_ALREADY_LOADED_EXIT_CODE = 37; +const GATEWAY_RESTART_INTENT_FILENAME = "gateway-restart-intent.json"; +const GATEWAY_RESTART_INTENT_TTL_MS = 60_000; +const GATEWAY_RESTART_INTENT_MAX_BYTES = 1024; const restartLog = createSubsystemLogger("restart"); @@ -70,6 +73,127 @@ export type RestartAuditInfo = { changedPaths?: string[]; }; +type GatewayRestartIntentPayload = { + kind: "gateway-restart"; + pid: number; + createdAt: number; +}; + +function resolveGatewayRestartIntentPath(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolveStateDir(env), GATEWAY_RESTART_INTENT_FILENAME); +} + +function unlinkGatewayRestartIntentFileSync(intentPath: string): boolean { + try { + const stat = fs.lstatSync(intentPath); + if (!stat.isFile() || stat.nlink > 1) { + return false; + } + fs.unlinkSync(intentPath); + return true; + } catch { + return false; + } +} + +function normalizeRestartIntentPid(pid: number | undefined): number | null { + return typeof pid === "number" && Number.isSafeInteger(pid) && pid > 0 ? pid : null; +} + +export function writeGatewayRestartIntentSync(opts: { + env?: NodeJS.ProcessEnv; + targetPid?: number; +}): boolean { + const targetPid = normalizeRestartIntentPid(opts.targetPid); + if (targetPid === null) { + return false; + } + const env = opts.env ?? 
process.env; + let tmpPath: string | undefined; + try { + const intentPath = resolveGatewayRestartIntentPath(env); + fs.mkdirSync(path.dirname(intentPath), { recursive: true }); + const payload: GatewayRestartIntentPayload = { + kind: "gateway-restart", + pid: targetPid, + createdAt: Date.now(), + }; + tmpPath = path.join( + path.dirname(intentPath), + `.${path.basename(intentPath)}.${process.pid}.${Date.now()}.${Math.random() + .toString(16) + .slice(2)}.tmp`, + ); + let fd: number | undefined; + try { + fd = fs.openSync(tmpPath, "wx", 0o600); + fs.writeFileSync(fd, `${JSON.stringify(payload)}\n`, "utf8"); + } finally { + if (fd !== undefined) { + fs.closeSync(fd); + } + } + fs.renameSync(tmpPath, intentPath); + return true; + } catch (err) { + if (tmpPath) { + unlinkGatewayRestartIntentFileSync(tmpPath); + } + restartLog.warn(`failed to write gateway restart intent: ${String(err)}`); + return false; + } +} + +export function clearGatewayRestartIntentSync(env: NodeJS.ProcessEnv = process.env): void { + unlinkGatewayRestartIntentFileSync(resolveGatewayRestartIntentPath(env)); +} + +function parseGatewayRestartIntent(raw: string): GatewayRestartIntentPayload | null { + try { + const parsed = JSON.parse(raw) as Partial<GatewayRestartIntentPayload>; + if ( + parsed.kind === "gateway-restart" && + typeof parsed.pid === "number" && + Number.isFinite(parsed.pid) && + typeof parsed.createdAt === "number" && + Number.isFinite(parsed.createdAt) + ) { + return parsed as GatewayRestartIntentPayload; + } + } catch { + return null; + } + return null; +} + +export function consumeGatewayRestartIntentSync( + env: NodeJS.ProcessEnv = process.env, + now = Date.now(), +): boolean { + const intentPath = resolveGatewayRestartIntentPath(env); + let raw: string; + try { + const stat = fs.lstatSync(intentPath); + if (!stat.isFile() || stat.size > GATEWAY_RESTART_INTENT_MAX_BYTES) { + return false; + } + raw = fs.readFileSync(intentPath, "utf8"); + } catch { + return false; + } finally { + 
clearGatewayRestartIntentSync(env); + } + const payload = parseGatewayRestartIntent(raw); + if (!payload) { + return false; + } + if (payload.pid !== process.pid) { + return false; + } + const ageMs = now - payload.createdAt; + return ageMs >= 0 && ageMs <= GATEWAY_RESTART_INTENT_TTL_MS; +} + function summarizeChangedPaths(paths: string[] | undefined, maxPaths = 6): string | null { if (!Array.isArray(paths) || paths.length === 0) { return null; } @@ -197,6 +321,7 @@ export function markGatewaySigusr1RestartHandled(): void { export type RestartDeferralHooks = { onDeferring?: (pending: number) => void; + onStillPending?: (pending: number, elapsedMs: number) => void; onReady?: () => void; onTimeout?: (pending: number, elapsedMs: number) => void; onCheckError?: (err: unknown) => void; @@ -246,7 +371,8 @@ async function emitPreparedGatewayRestart(hooks?: RestartEmitHooks): Promise<void> { - const maxWaitMs = - Math.max(pollMs, Math.floor(opts.maxWaitMs ?? DEFAULT_DEFERRAL_MAX_WAIT_MS)); + const maxWaitMs = + typeof opts.maxWaitMs === "number" && opts.maxWaitMs > 0 + ? Math.max(pollMs, Math.floor(opts.maxWaitMs)) + : undefined; let pending: number; try { @@ -277,6 +405,7 @@ export function deferGatewayRestartUntilIdle(opts: { opts.hooks?.onDeferring?.(pending); const startedAt = Date.now(); + let nextStillPendingAt = startedAt + DEFAULT_DEFERRAL_STILL_PENDING_WARN_MS; const poll = setInterval(() => { let current: number; try { @@ -296,7 +425,11 @@ export function deferGatewayRestartUntilIdle(opts: { return; } const elapsedMs = Date.now() - startedAt; - if (elapsedMs >= maxWaitMs) { + if (Date.now() >= nextStillPendingAt) { + opts.hooks?.onStillPending?.(current, elapsedMs); + nextStillPendingAt = Date.now() + DEFAULT_DEFERRAL_STILL_PENDING_WARN_MS; + } + if (maxWaitMs !== undefined && elapsedMs >= maxWaitMs) { clearInterval(poll); activeDeferralPolls.delete(poll); opts.hooks?.onTimeout?.(current, elapsedMs); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index b35dff84212..fd547f1f13d 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -373,12 +373,13 @@ export function getActiveTaskCount(): number { 
/** * Wait for all currently active tasks across all lanes to finish. * Polls at a short interval; resolves when no tasks are active or - * when `timeoutMs` elapses (whichever comes first). + * when `timeoutMs` elapses (whichever comes first). If no timeout is passed, + * waits indefinitely for the active set captured at call time. * * New tasks enqueued after this call are ignored — only tasks that are * already executing are waited on. */ -export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolean }> { +export function waitForActiveTasks(timeoutMs?: number): Promise<{ drained: boolean }> { const queueState = getQueueState(); const activeAtStart = new Set(); for (const state of queueState.lanes.values()) { @@ -390,7 +391,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea if (activeAtStart.size === 0) { return Promise.resolve({ drained: true }); } - if (timeoutMs <= 0) { + if (timeoutMs !== undefined && timeoutMs <= 0) { return Promise.resolve({ drained: false }); } @@ -399,9 +400,11 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea activeTaskIds: activeAtStart, resolve, }; - waiter.timeout = setTimeout(() => { - resolveActiveTaskWaiter(waiter, { drained: false }); - }, timeoutMs); + if (timeoutMs !== undefined) { + waiter.timeout = setTimeout(() => { + resolveActiveTaskWaiter(waiter, { drained: false }); + }, timeoutMs); + } queueState.activeTaskWaiters.add(waiter); notifyActiveTaskWaiters(); });