fix(gateway): preserve restart drain for active runs

Fixes https://github.com/openclaw/openclaw/issues/65485
Vincent Koc
2026-04-25 01:35:47 -07:00
committed by GitHub
parent 734748d4f4
commit ec1f72b6c5
17 changed files with 453 additions and 64 deletions


@@ -131,6 +131,7 @@ Docs: https://docs.openclaw.ai
- macOS Gateway: wait for launchd to reload the exited Gateway LaunchAgent before bootstrapping repair fallback, preventing config-triggered restarts from leaving the service not loaded. Fixes #45178. Thanks @vincentkoc.
- macOS Gateway: tolerate launchctl bootstrap's already-loaded exit during restart fallback and use non-killing kickstart after bootstrap, avoiding a second race that can unload the LaunchAgent. Fixes #41934. Thanks @zerone0x.
- macOS Gateway: rewrite stale LaunchAgent plists before restart fallback bootstrap, matching install repair behavior when `gateway restart` has to re-register launchd. Thanks @maybegeeker.
- Gateway/reload: wait indefinitely by default for active operations before restart-required reloads and preserve that drain path when service-manager restarts arrive as SIGTERM, preventing in-flight agent runs from being killed mid-turn. Fixes #65485. Thanks @rijhsinghani.
- TTS/hooks: preserve audio-only TTS transcripts for `message_sending` and `message_sent` hooks without rendering the transcript as a media caption. Thanks @zqchris.
- WhatsApp/TTS: preserve `audioAsVoice` through shared media payload sends and the WhatsApp outbound adapter, so `[[audio_as_voice]]` reply payloads keep their voice-note intent when routed through `sendPayload`. Fixes #66053. Thanks @masatohoshino.
- Control UI/WebChat: hide heartbeat prompts, `HEARTBEAT_OK` acknowledgments, and internal-only runtime context turns from visible chat history while leaving the underlying transcript intact. Fixes #71381. Thanks @gerald1950ggg-ai.


@@ -1,4 +1,4 @@
83677b2666da2169511e5372f26c20c794001ec8acc7e9c2e1935043010c05d6 config-baseline.json
fa38a1bde88d8858ae0a11e7e17fa42fe107c34268b568f51877afbde81922e8 config-baseline.core.json
dae9ece3ac683a0bed2835d96d4373f65ab955b8b901df0bcdeedc565ade6ed6 config-baseline.json
7cd52f77b1e0ecb50d2119b4c21d6d51d336a0c752a44cbaf8df1efa9ef538c0 config-baseline.core.json
d72032762ab46b99480b57deb81130a0ab5b1401189cfbaf4f7fef4a063a7f6c config-baseline.channel.json
0504c4f38d4c753fffeb465c93540d829df6b0fcef921eb0e2226ac16bdbbe07 config-baseline.plugin.json


@@ -472,7 +472,7 @@ See [Multiple Gateways](/gateway/multiple-gateways).
reload: {
mode: "hybrid", // off | restart | hot | hybrid
debounceMs: 500,
deferralTimeoutMs: 300000,
deferralTimeoutMs: 0,
},
},
}
@@ -484,7 +484,7 @@ See [Multiple Gateways](/gateway/multiple-gateways).
- `"hot"`: apply changes in-process without restarting.
- `"hybrid"` (default): try hot reload first; fall back to restart if required.
- `debounceMs`: debounce window in ms before config changes are applied (non-negative integer).
- `deferralTimeoutMs`: maximum time in ms to wait for in-flight operations before forcing a restart (default: `300000` = 5 minutes).
- `deferralTimeoutMs`: optional maximum time in ms to wait for in-flight operations before forcing a restart. Omit it or set it to `0` to wait indefinitely and log periodic still-pending warnings; a capped example follows.
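A minimal sketch of a capped configuration (only `deferralTimeoutMs` differs from the default block above):

reload: {
  mode: "hybrid", // off | restart | hot | hybrid
  debounceMs: 500,
  deferralTimeoutMs: 60000, // cap the drain: force the restart after 60s of pending work
},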
---


@@ -273,16 +273,22 @@ export function consumeEmbeddedRunModelSwitch(
/**
* Wait for active embedded runs to drain.
*
* Used during restarts so in-flight compaction runs can release session write
* locks before the next lifecycle starts.
* Used during restarts so in-flight runs can release session write locks before
* the next lifecycle starts. If no timeout is passed, waits indefinitely.
*/
export async function waitForActiveEmbeddedRuns(
timeoutMs = 15_000,
timeoutMs?: number,
opts?: { pollMs?: number },
): Promise<{ drained: boolean }> {
const pollMsRaw = opts?.pollMs ?? 250;
const pollMs = Math.max(10, Math.floor(pollMsRaw));
const maxWaitMs = Math.max(pollMs, Math.floor(timeoutMs));
if (timeoutMs !== undefined && timeoutMs <= 0) {
return { drained: getActiveEmbeddedRunCount() === 0 };
}
const maxWaitMs =
typeof timeoutMs === "number" && Number.isFinite(timeoutMs)
? Math.max(pollMs, Math.floor(timeoutMs))
: undefined;
const startedAt = Date.now();
while (true) {
@@ -290,7 +296,7 @@ export async function waitForActiveEmbeddedRuns(
return { drained: true };
}
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= maxWaitMs) {
if (maxWaitMs !== undefined && elapsedMs >= maxWaitMs) {
diag.warn(
`wait for active embedded runs timed out: activeRuns=${getActiveEmbeddedRunCount()} timeoutMs=${maxWaitMs}`,
);
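
A usage sketch of the widened signature, covering the three behaviors visible in this hunk (call sites are illustrative):

// Default (no timeout): poll until every active embedded run has drained.
const indefinite = await waitForActiveEmbeddedRuns();

// Explicit cap: poll every 250ms; report drained=false after 60s if runs remain.
const capped = await waitForActiveEmbeddedRuns(60_000, { pollMs: 250 });

// timeoutMs <= 0: no waiting; report whether the run count is already zero.
const immediate = await waitForActiveEmbeddedRuns(0);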


@@ -16,6 +16,8 @@ const loadConfig = vi.fn<() => OpenClawConfig>(() => ({
},
},
}));
const writeGatewayRestartIntentSync = vi.fn();
const clearGatewayRestartIntentSync = vi.fn();
vi.mock("../../config/config.js", () => ({
loadConfig: () => loadConfig(),
@@ -26,6 +28,11 @@ vi.mock("../../runtime.js", () => ({
defaultRuntime,
}));
vi.mock("../../infra/restart.js", () => ({
clearGatewayRestartIntentSync: () => clearGatewayRestartIntentSync(),
writeGatewayRestartIntentSync: (opts: unknown) => writeGatewayRestartIntentSync(opts),
}));
let runServiceRestart: typeof import("./lifecycle-core.js").runServiceRestart;
let runServiceStart: typeof import("./lifecycle-core.js").runServiceStart;
let runServiceStop: typeof import("./lifecycle-core.js").runServiceStop;
@@ -92,6 +99,8 @@ describe("runServiceRestart token drift", () => {
},
});
resetLifecycleServiceMocks();
writeGatewayRestartIntentSync.mockClear();
clearGatewayRestartIntentSync.mockClear();
service.readCommand.mockResolvedValue({
programArguments: [],
environment: { OPENCLAW_GATEWAY_TOKEN: "service-token" },
@@ -182,6 +191,7 @@ describe("runServiceRestart token drift", () => {
expect(loadConfig).not.toHaveBeenCalled();
expect(service.readCommand).not.toHaveBeenCalled();
expect(writeGatewayRestartIntentSync).not.toHaveBeenCalled();
const payload = readJsonLog<{ warnings?: string[] }>();
expect(payload.warnings).toBeUndefined();
});
@@ -305,6 +315,27 @@ describe("runServiceRestart token drift", () => {
expect(payload.message).toBe("restart scheduled, gateway will restart momentarily");
});
it("writes a restart intent before service-manager restart", async () => {
service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 });
await runServiceRestart(createServiceRunArgs());
expect(writeGatewayRestartIntentSync).toHaveBeenCalledWith({ targetPid: 1234 });
expect(clearGatewayRestartIntentSync).not.toHaveBeenCalled();
expect(service.restart).toHaveBeenCalledTimes(1);
});
it("clears restart intent when service-manager restart fails before signaling", async () => {
service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 });
writeGatewayRestartIntentSync.mockReturnValueOnce(true);
service.restart.mockRejectedValueOnce(new Error("launchctl failed before signaling"));
await expect(runServiceRestart(createServiceRunArgs())).rejects.toThrow("__exit__:1");
expect(writeGatewayRestartIntentSync).toHaveBeenCalledWith({ targetPid: 1234 });
expect(clearGatewayRestartIntentSync).toHaveBeenCalledOnce();
});
it("emits scheduled when service start routes through a scheduled restart", async () => {
service.restart.mockResolvedValue({ outcome: "scheduled" });


@@ -9,6 +9,10 @@ import type { GatewayService } from "../../daemon/service.js";
import { renderSystemdUnavailableHints } from "../../daemon/systemd-hints.js";
import { isSystemdUserServiceAvailable } from "../../daemon/systemd.js";
import { isGatewaySecretRefUnavailableError } from "../../gateway/credentials.js";
import {
clearGatewayRestartIntentSync,
writeGatewayRestartIntentSync,
} from "../../infra/restart.js";
import { isWSL } from "../../infra/wsl.js";
import { defaultRuntime } from "../../runtime.js";
import { resolveGatewayTokenForDriftCheck } from "./gateway-token-drift.js";
@@ -458,7 +462,21 @@ export async function runServiceRestart(params: {
try {
let restartResult: GatewayServiceRestartResult = { outcome: "completed" };
if (loaded) {
restartResult = await params.service.restart({ env: process.env, stdout });
let wroteRestartIntent = false;
if (params.serviceNoun === "Gateway") {
const runtime = await params.service.readRuntime(process.env).catch(() => null);
wroteRestartIntent = writeGatewayRestartIntentSync({
targetPid: runtime?.pid,
});
}
try {
restartResult = await params.service.restart({ env: process.env, stdout });
} catch (err) {
if (wroteRestartIntent) {
clearGatewayRestartIntentSync();
}
throw err;
}
}
let restartStatus = describeGatewayServiceRestart(params.serviceNoun, restartResult);
if (restartStatus.scheduled) {


@@ -6,6 +6,7 @@ const acquireGatewayLock = vi.fn(async (_opts?: { port?: number }) => ({
release: vi.fn(async () => {}),
}));
const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true);
const consumeGatewayRestartIntentSync = vi.fn(() => false);
const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false);
const markGatewaySigusr1RestartHandled = vi.fn();
const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?: string }) => ({
@@ -19,7 +20,7 @@ const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?
}));
const getActiveTaskCount = vi.fn(() => 0);
const markGatewayDraining = vi.fn();
const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
const waitForActiveTasks = vi.fn(async (_timeoutMs?: number) => ({ drained: true }));
const resetAllLanes = vi.fn();
const restartGatewayProcessWithFreshPid = vi.fn<
() => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string }
@@ -28,7 +29,7 @@ const abortEmbeddedPiRun = vi.fn(
(_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false,
);
const getActiveEmbeddedRunCount = vi.fn(() => 0);
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs?: number) => ({ drained: true }));
const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart";
const loadConfig = vi.fn(() => ({
gateway: {
@@ -49,6 +50,7 @@ vi.mock("../../infra/gateway-lock.js", () => ({
vi.mock("../../infra/restart.js", () => ({
consumeGatewaySigusr1RestartAuthorization: () => consumeGatewaySigusr1RestartAuthorization(),
consumeGatewayRestartIntentSync: () => consumeGatewayRestartIntentSync(),
isGatewaySigusr1RestartExternallyAllowed: () => isGatewaySigusr1RestartExternallyAllowed(),
markGatewaySigusr1RestartHandled: () => markGatewaySigusr1RestartHandled(),
scheduleGatewaySigusr1Restart: (opts?: { delayMs?: number; reason?: string }) =>
@@ -62,7 +64,7 @@ vi.mock("../../infra/process-respawn.js", () => ({
vi.mock("../../process/command-queue.js", () => ({
getActiveTaskCount: () => getActiveTaskCount(),
markGatewayDraining: () => markGatewayDraining(),
waitForActiveTasks: (timeoutMs: number) => waitForActiveTasks(timeoutMs),
waitForActiveTasks: (timeoutMs?: number) => waitForActiveTasks(timeoutMs),
resetAllLanes: () => resetAllLanes(),
}));
@@ -70,7 +72,7 @@ vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({
abortEmbeddedPiRun: (sessionId?: string, opts?: { mode?: "all" | "compacting" }) =>
abortEmbeddedPiRun(sessionId, opts),
getActiveEmbeddedRunCount: () => getActiveEmbeddedRunCount(),
waitForActiveEmbeddedRuns: (timeoutMs: number) => waitForActiveEmbeddedRuns(timeoutMs),
waitForActiveEmbeddedRuns: (timeoutMs?: number) => waitForActiveEmbeddedRuns(timeoutMs),
}));
vi.mock("../../config/config.js", () => ({
@@ -226,6 +228,50 @@ describe("runGatewayLoop", () => {
});
});
it("treats SIGTERM with a restart intent as a draining restart", async () => {
vi.clearAllMocks();
consumeGatewayRestartIntentSync.mockReturnValueOnce(true);
getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0);
await withIsolatedSignals(async ({ captureSignal }) => {
const closeFirst = vi.fn(async () => {});
const closeSecond = vi.fn(async () => {});
const { runtime, exited } = createRuntimeWithExitSignal();
const start = vi
.fn()
.mockResolvedValueOnce({ close: closeFirst })
.mockResolvedValueOnce({ close: closeSecond });
const { runGatewayLoop } = await import("./run-loop.js");
void runGatewayLoop({
start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"],
});
await new Promise<void>((resolve) => setImmediate(resolve));
const sigterm = captureSignal("SIGTERM");
const sigint = captureSignal("SIGINT");
sigterm();
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(consumeGatewayRestartIntentSync).toHaveBeenCalledOnce();
expect(markGatewayDraining).toHaveBeenCalledOnce();
expect(waitForActiveTasks).toHaveBeenCalledWith(90_000);
expect(closeFirst).toHaveBeenCalledWith({
reason: "gateway restarting",
restartExpectedMs: 1500,
});
expect(start).toHaveBeenCalledTimes(2);
sigint();
await expect(exited).resolves.toBe(0);
expect(closeSecond).toHaveBeenCalledWith({
reason: "gateway stopping",
restartExpectedMs: null,
});
});
});
it("restarts after SIGUSR1 even when drain times out, and resets lanes for the new iteration", async () => {
vi.clearAllMocks();
loadConfig.mockReturnValue({


@@ -10,6 +10,7 @@ import { acquireGatewayLock } from "../../infra/gateway-lock.js";
import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js";
import {
consumeGatewaySigusr1RestartAuthorization,
consumeGatewayRestartIntentSync,
isGatewaySigusr1RestartExternallyAllowed,
markGatewaySigusr1RestartHandled,
scheduleGatewaySigusr1Restart,
@@ -29,8 +30,10 @@ import type { RuntimeEnv } from "../../runtime.js";
const gatewayLog = createSubsystemLogger("gateway");
const LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS = 1500;
const DEFAULT_RESTART_DRAIN_TIMEOUT_MS = 300_000;
const RESTART_DRAIN_STILL_PENDING_WARN_MS = 30_000;
type GatewayRunSignalAction = "stop" | "restart";
type RestartDrainTimeoutMs = number | undefined;
export async function runGatewayLoop(params: {
start: (params?: {
@@ -125,12 +128,12 @@ export async function runGatewayLoop(params: {
const SUPERVISOR_STOP_TIMEOUT_MS = 30_000;
const SHUTDOWN_TIMEOUT_MS = SUPERVISOR_STOP_TIMEOUT_MS - 5_000;
const resolveRestartDrainTimeoutMs = () => {
const resolveRestartDrainTimeoutMs = (): RestartDrainTimeoutMs => {
try {
const timeoutMs = loadConfig().gateway?.reload?.deferralTimeoutMs;
return typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs >= 0
return typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs > 0
? timeoutMs
: DEFAULT_RESTART_DRAIN_TIMEOUT_MS;
: undefined;
} catch {
return DEFAULT_RESTART_DRAIN_TIMEOUT_MS;
}
@@ -146,20 +149,53 @@ export async function runGatewayLoop(params: {
const restartDrainTimeoutMs = isRestart ? resolveRestartDrainTimeoutMs() : 0;
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
// Allow extra time for draining active turns on restart.
const forceExitMs = isRestart
? restartDrainTimeoutMs + SHUTDOWN_TIMEOUT_MS
: SHUTDOWN_TIMEOUT_MS;
const forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
writeStabilityBundle(
isRestart ? "gateway.restart_shutdown_timeout" : "gateway.stop_shutdown_timeout",
);
// Keep the in-process watchdog below the supervisor stop budget so this
// path wins before launchd/systemd escalates to a hard kill. Exit
// non-zero on any timeout so supervised installs restart cleanly.
exitProcess(1);
}, forceExitMs);
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
const armForceExitTimer = (forceExitMs: number) => {
if (forceExitTimer) {
return;
}
forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
writeStabilityBundle(
isRestart ? "gateway.restart_shutdown_timeout" : "gateway.stop_shutdown_timeout",
);
// Keep the in-process watchdog below the supervisor stop budget so this
// path wins before launchd/systemd escalates to a hard kill. Exit
// non-zero on any timeout so supervised installs restart cleanly.
exitProcess(1);
}, forceExitMs);
};
const clearForceExitTimer = () => {
if (!forceExitTimer) {
return;
}
clearTimeout(forceExitTimer);
forceExitTimer = null;
};
if (!isRestart) {
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
} else if (restartDrainTimeoutMs !== undefined) {
// Allow extra time for draining active turns on explicitly capped restarts.
armForceExitTimer(restartDrainTimeoutMs + SHUTDOWN_TIMEOUT_MS);
}
const formatRestartDrainBudget = () =>
restartDrainTimeoutMs === undefined
? "without a timeout"
: `with timeout ${restartDrainTimeoutMs}ms`;
const createStillPendingDrainLogger = () =>
setInterval(() => {
gatewayLog.warn(
`still draining ${getActiveTaskCount()} active task(s) and ${getActiveEmbeddedRunCount()} active embedded run(s) before restart`,
);
}, RESTART_DRAIN_STILL_PENDING_WARN_MS);
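// With no drain cap configured, keep the watchdog unarmed while work drains;
// it is armed only after the drain completes, when the server close begins.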
const armCloseForceExitTimerForIndefiniteRestart = () => {
if (isRestart && restartDrainTimeoutMs === undefined) {
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
}
};
void (async () => {
try {
@@ -180,8 +216,9 @@ export async function runGatewayLoop(params: {
if (activeTasks > 0 || activeRuns > 0) {
gatewayLog.info(
`draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart (timeout ${restartDrainTimeoutMs}ms)`,
`draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart ${formatRestartDrainBudget()}`,
);
const stillPendingDrainLogger = createStillPendingDrainLogger();
const [tasksDrain, runsDrain] = await Promise.all([
activeTasks > 0
? waitForActiveTasks(restartDrainTimeoutMs)
@@ -189,7 +226,7 @@ export async function runGatewayLoop(params: {
activeRuns > 0
? waitForActiveEmbeddedRuns(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
]);
]).finally(() => clearInterval(stillPendingDrainLogger));
if (tasksDrain.drained && runsDrain.drained) {
gatewayLog.info("all active work drained");
} else {
@@ -201,6 +238,7 @@ export async function runGatewayLoop(params: {
}
}
armCloseForceExitTimerForIndefiniteRestart();
await server?.close({
reason: isRestart ? "gateway restarting" : "gateway stopping",
restartExpectedMs: isRestart ? 1500 : null,
@@ -208,7 +246,7 @@ export async function runGatewayLoop(params: {
} catch (err) {
gatewayLog.error(`shutdown error: ${String(err)}`);
} finally {
clearTimeout(forceExitTimer);
clearForceExitTimer();
server = null;
if (isRestart) {
await handleRestartAfterServerClose();
@@ -221,7 +259,7 @@ export async function runGatewayLoop(params: {
const onSigterm = () => {
gatewayLog.info("signal SIGTERM received");
request("stop", "SIGTERM");
request(consumeGatewayRestartIntentSync() ? "restart" : "stop", "SIGTERM");
};
const onSigint = () => {
gatewayLog.info("signal SIGINT received");


@@ -21711,7 +21711,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
maximum: 9007199254740991,
title: "Restart Deferral Timeout (ms)",
description:
"Maximum time (ms) to wait for in-flight operations to complete before forcing a SIGUSR1 restart. Default: 300000 (5 minutes). Lower values risk aborting active subagent LLM calls.",
"Optional maximum time (ms) to wait for in-flight operations before forcing a restart. Omit or set 0 to wait indefinitely with periodic still-pending warnings. Lower positive values risk aborting active subagent LLM calls.",
},
},
additionalProperties: false,
@@ -24940,7 +24940,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
},
"gateway.reload.deferralTimeoutMs": {
label: "Restart Deferral Timeout (ms)",
help: "Maximum time (ms) to wait for in-flight operations to complete before forcing a SIGUSR1 restart. Default: 300000 (5 minutes). Lower values risk aborting active subagent LLM calls.",
help: "Optional maximum time (ms) to wait for in-flight operations before forcing a restart. Omit or set 0 to wait indefinitely with periodic still-pending warnings. Lower positive values risk aborting active subagent LLM calls.",
tags: ["network", "reliability", "performance"],
},
"gateway.nodes.browser.mode": {

View File

@@ -466,7 +466,7 @@ export const FIELD_HELP: Record<string, string> = {
'Controls how config edits are applied: "off" ignores live edits, "restart" always restarts, "hot" applies in-process, and "hybrid" tries hot then restarts if required. Keep "hybrid" for safest routine updates.',
"gateway.reload.debounceMs": "Debounce window (ms) before applying config changes.",
"gateway.reload.deferralTimeoutMs":
"Maximum time (ms) to wait for in-flight operations to complete before forcing a SIGUSR1 restart. Default: 300000 (5 minutes). Lower values risk aborting active subagent LLM calls.",
"Optional maximum time (ms) to wait for in-flight operations before forcing a restart. Omit or set 0 to wait indefinitely with periodic still-pending warnings. Lower positive values risk aborting active subagent LLM calls.",
"gateway.nodes.browser.mode":
'Node browser routing ("auto" = pick single connected browser node, "manual" = require node param, "off" = disable).',
"gateway.nodes.browser.node": "Pin browser routing to a specific node id or name (optional).",

View File

@@ -206,10 +206,11 @@ export type GatewayReloadConfig = {
/** Debounce window for config reloads (ms). Default: 300. */
debounceMs?: number;
/**
* Maximum time (ms) to wait for in-flight operations to complete before
* forcing a SIGUSR1 restart. Default: 300000 (5 minutes).
* Lower values risk aborting active subagent LLM calls.
* @see https://github.com/openclaw/openclaw/issues/47711
* Optional maximum time (ms) to wait for in-flight operations to complete
* before forcing a restart. When absent or 0, the gateway waits indefinitely
* and logs periodic still-pending warnings.
* Lower positive values risk aborting active subagent LLM calls.
* @see https://github.com/openclaw/openclaw/issues/65485
*/
deferralTimeoutMs?: number;
};


@@ -297,6 +297,12 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
restartPending = false;
params.logReload.info("all operations and replies completed; restarting gateway now");
},
onStillPending: (_pending, elapsedMs) => {
const remaining = formatActiveDetails(getActiveCounts());
params.logReload.warn(
`restart still deferred after ${elapsedMs}ms with ${remaining.join(", ")} active`,
);
},
onTimeout: (_pending, elapsedMs) => {
const remaining = formatActiveDetails(getActiveCounts());
restartPending = false;


@@ -1,5 +1,10 @@
import os from "node:os";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
clearConfigCache,
clearRuntimeConfigSnapshot,
setRuntimeConfigSnapshot,
} from "../config/config.js";
import { makeNetworkInterfacesSnapshot } from "../test-helpers/network-interfaces.js";
import {
__testing,
@@ -22,10 +27,12 @@ describe("infra runtime", () => {
});
afterEach(async () => {
__testing.resetSigusr1State();
clearRuntimeConfigSnapshot();
clearConfigCache();
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
vi.restoreAllMocks();
__testing.resetSigusr1State();
});
}
@@ -341,7 +348,7 @@ describe("infra runtime", () => {
}
});
it("emits SIGUSR1 after deferral timeout even if still pending", async () => {
it("keeps SIGUSR1 deferred by default while work is still pending", async () => {
const emitSpy = vi.spyOn(process, "emit");
const handler = () => {};
process.on("SIGUSR1", handler);
@@ -353,8 +360,28 @@ describe("infra runtime", () => {
await vi.advanceTimersByTimeAsync(0);
expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1");
// Advance past the 5-minute max deferral wait
// No default max deferral wait; active turns should not be killed just
// because a config-triggered restart has been pending for 5 minutes.
await vi.advanceTimersByTimeAsync(300_000);
expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1");
} finally {
process.removeListener("SIGUSR1", handler);
}
});
it("emits SIGUSR1 after explicit deferral timeout even if still pending", async () => {
const emitSpy = vi.spyOn(process, "emit");
const handler = () => {};
process.on("SIGUSR1", handler);
try {
setRuntimeConfigSnapshot({ gateway: { reload: { deferralTimeoutMs: 1_000 } } });
setPreRestartDeferralCheck(() => 5); // always pending
scheduleGatewaySigusr1Restart({ delayMs: 0 });
await vi.advanceTimersByTimeAsync(0);
expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1");
await vi.advanceTimersByTimeAsync(1_000);
expect(emitSpy).toHaveBeenCalledWith("SIGUSR1");
} finally {
process.removeListener("SIGUSR1", handler);


@@ -0,0 +1,79 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { consumeGatewayRestartIntentSync, writeGatewayRestartIntentSync } from "./restart.js";
const tempDirs: string[] = [];
function createIntentEnv(): NodeJS.ProcessEnv {
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-restart-intent-"));
tempDirs.push(dir);
return {
...process.env,
OPENCLAW_STATE_DIR: dir,
};
}
function intentPath(env: NodeJS.ProcessEnv): string {
return path.join(env.OPENCLAW_STATE_DIR ?? "", "gateway-restart-intent.json");
}
describe("gateway restart intent", () => {
afterEach(() => {
for (const dir of tempDirs.splice(0)) {
fs.rmSync(dir, { force: true, recursive: true });
}
});
it("consumes a fresh intent for the current process", () => {
const env = createIntentEnv();
expect(writeGatewayRestartIntentSync({ env, targetPid: process.pid })).toBe(true);
expect(consumeGatewayRestartIntentSync(env)).toBe(true);
expect(fs.existsSync(intentPath(env))).toBe(false);
});
it("rejects an intent for a different process", () => {
const env = createIntentEnv();
expect(writeGatewayRestartIntentSync({ env, targetPid: process.pid + 1 })).toBe(true);
expect(consumeGatewayRestartIntentSync(env)).toBe(false);
expect(fs.existsSync(intentPath(env))).toBe(false);
});
it("rejects oversized intent files before parsing", () => {
const env = createIntentEnv();
fs.writeFileSync(intentPath(env), "x".repeat(2048), { encoding: "utf8", mode: 0o600 });
expect(consumeGatewayRestartIntentSync(env)).toBe(false);
expect(fs.existsSync(intentPath(env))).toBe(false);
});
it("writes intent files with owner-only permissions", () => {
const env = createIntentEnv();
expect(writeGatewayRestartIntentSync({ env, targetPid: process.pid })).toBe(true);
expect(fs.statSync(intentPath(env)).mode & 0o777).toBe(0o600);
});
it("does not follow an existing intent-path symlink when writing", () => {
const env = createIntentEnv();
const targetPath = path.join(env.OPENCLAW_STATE_DIR ?? "", "attacker-target.txt");
fs.writeFileSync(targetPath, "keep", "utf8");
try {
fs.symlinkSync(targetPath, intentPath(env));
} catch {
return;
}
expect(writeGatewayRestartIntentSync({ env, targetPid: process.pid })).toBe(true);
expect(fs.readFileSync(targetPath, "utf8")).toBe("keep");
expect(fs.lstatSync(intentPath(env)).isSymbolicLink()).toBe(false);
expect(consumeGatewayRestartIntentSync(env)).toBe(true);
});
});
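
For reference, the intent file these tests exercise is a single JSON line matching the payload type added later in this commit (values illustrative):

{"kind":"gateway-restart","pid":1234,"createdAt":1777075200000}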

View File

@@ -16,10 +16,11 @@ describe("deferGatewayRestartUntilIdle timeout", () => {
process.removeAllListeners("SIGUSR1");
});
it("uses default 5-minute timeout when maxWaitMs is not specified", () => {
it("waits indefinitely when maxWaitMs is not specified", () => {
const hooks: RestartDeferralHooks = {
onTimeout: vi.fn(),
onReady: vi.fn(),
onStillPending: vi.fn(),
};
// Always return 1 pending item to prevent draining
@@ -28,13 +29,12 @@ describe("deferGatewayRestartUntilIdle timeout", () => {
hooks,
});
// Advance to just before 5 minutes — should NOT have timed out yet
vi.advanceTimersByTime(299_999);
vi.advanceTimersByTime(300_000);
expect(hooks.onTimeout).not.toHaveBeenCalled();
expect(hooks.onStillPending).toHaveBeenCalled();
// Advance past 5 minutes — should time out
vi.advanceTimersByTime(1);
expect(hooks.onTimeout).toHaveBeenCalledOnce();
vi.advanceTimersByTime(300_000);
expect(hooks.onTimeout).not.toHaveBeenCalled();
expect(hooks.onReady).not.toHaveBeenCalled();
});


@@ -1,7 +1,9 @@
import { spawnSync } from "node:child_process";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { getRuntimeConfig } from "../config/config.js";
import { resolveStateDir } from "../config/paths.js";
import {
resolveGatewayLaunchAgentLabel,
resolveGatewaySystemdServiceName,
@@ -16,11 +18,12 @@ export type { RestartAttempt } from "./restart.types.js";
const SPAWN_TIMEOUT_MS = 2000;
const SIGUSR1_AUTH_GRACE_MS = 5000;
const DEFAULT_DEFERRAL_POLL_MS = 500;
// Default to 5 minutes to avoid aborting in-flight subagent LLM calls.
// Configurable via gateway.reload.deferralTimeoutMs.
const DEFAULT_DEFERRAL_MAX_WAIT_MS = 300_000;
const DEFAULT_DEFERRAL_STILL_PENDING_WARN_MS = 30_000;
const RESTART_COOLDOWN_MS = 30_000;
const LAUNCHCTL_ALREADY_LOADED_EXIT_CODE = 37;
const GATEWAY_RESTART_INTENT_FILENAME = "gateway-restart-intent.json";
const GATEWAY_RESTART_INTENT_TTL_MS = 60_000;
const GATEWAY_RESTART_INTENT_MAX_BYTES = 1024;
const restartLog = createSubsystemLogger("restart");
@@ -70,6 +73,127 @@ export type RestartAuditInfo = {
changedPaths?: string[];
};
type GatewayRestartIntentPayload = {
kind: "gateway-restart";
pid: number;
createdAt: number;
};
function resolveGatewayRestartIntentPath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveStateDir(env), GATEWAY_RESTART_INTENT_FILENAME);
}
function unlinkGatewayRestartIntentFileSync(intentPath: string): boolean {
try {
const stat = fs.lstatSync(intentPath);
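// Only unlink plain, single-link files; skip symlinks and hardlinked paths.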
if (!stat.isFile() || stat.nlink > 1) {
return false;
}
fs.unlinkSync(intentPath);
return true;
} catch {
return false;
}
}
function normalizeRestartIntentPid(pid: number | undefined): number | null {
return typeof pid === "number" && Number.isSafeInteger(pid) && pid > 0 ? pid : null;
}
export function writeGatewayRestartIntentSync(opts: {
env?: NodeJS.ProcessEnv;
targetPid?: number;
}): boolean {
const targetPid = normalizeRestartIntentPid(opts.targetPid);
if (targetPid === null) {
return false;
}
const env = opts.env ?? process.env;
let tmpPath: string | undefined;
try {
const intentPath = resolveGatewayRestartIntentPath(env);
fs.mkdirSync(path.dirname(intentPath), { recursive: true });
const payload: GatewayRestartIntentPayload = {
kind: "gateway-restart",
pid: targetPid,
createdAt: Date.now(),
};
tmpPath = path.join(
path.dirname(intentPath),
`.${path.basename(intentPath)}.${process.pid}.${Date.now()}.${Math.random()
.toString(16)
.slice(2)}.tmp`,
);
let fd: number | undefined;
try {
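// "wx" (O_CREAT | O_EXCL) fails if the temp path already exists, so the write
// cannot follow a pre-planted symlink; renameSync then publishes it atomically.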
fd = fs.openSync(tmpPath, "wx", 0o600);
fs.writeFileSync(fd, `${JSON.stringify(payload)}\n`, "utf8");
} finally {
if (fd !== undefined) {
fs.closeSync(fd);
}
}
fs.renameSync(tmpPath, intentPath);
return true;
} catch (err) {
if (tmpPath) {
unlinkGatewayRestartIntentFileSync(tmpPath);
}
restartLog.warn(`failed to write gateway restart intent: ${String(err)}`);
return false;
}
}
export function clearGatewayRestartIntentSync(env: NodeJS.ProcessEnv = process.env): void {
unlinkGatewayRestartIntentFileSync(resolveGatewayRestartIntentPath(env));
}
function parseGatewayRestartIntent(raw: string): GatewayRestartIntentPayload | null {
try {
const parsed = JSON.parse(raw) as Partial<GatewayRestartIntentPayload>;
if (
parsed.kind === "gateway-restart" &&
typeof parsed.pid === "number" &&
Number.isFinite(parsed.pid) &&
typeof parsed.createdAt === "number" &&
Number.isFinite(parsed.createdAt)
) {
return parsed as GatewayRestartIntentPayload;
}
} catch {
return null;
}
return null;
}
export function consumeGatewayRestartIntentSync(
env: NodeJS.ProcessEnv = process.env,
now = Date.now(),
): boolean {
const intentPath = resolveGatewayRestartIntentPath(env);
let raw: string;
try {
const stat = fs.lstatSync(intentPath);
if (!stat.isFile() || stat.size > GATEWAY_RESTART_INTENT_MAX_BYTES) {
return false;
}
raw = fs.readFileSync(intentPath, "utf8");
} catch {
return false;
} finally {
clearGatewayRestartIntentSync(env);
}
const payload = parseGatewayRestartIntent(raw);
if (!payload) {
return false;
}
if (payload.pid !== process.pid) {
return false;
}
const ageMs = now - payload.createdAt;
return ageMs >= 0 && ageMs <= GATEWAY_RESTART_INTENT_TTL_MS;
}
function summarizeChangedPaths(paths: string[] | undefined, maxPaths = 6): string | null {
if (!Array.isArray(paths) || paths.length === 0) {
return null;
@@ -197,6 +321,7 @@ export function markGatewaySigusr1RestartHandled(): void {
export type RestartDeferralHooks = {
onDeferring?: (pending: number) => void;
onStillPending?: (pending: number, elapsedMs: number) => void;
onReady?: () => void;
onTimeout?: (pending: number, elapsedMs: number) => void;
onCheckError?: (err: unknown) => void;
@@ -246,7 +371,8 @@ async function emitPreparedGatewayRestart(hooks?: RestartEmitHooks): Promise<voi
}
/**
* Poll pending work until it drains (or times out), then emit one restart signal.
* Poll pending work until it drains, then emit one restart signal.
* A positive maxWaitMs keeps the old capped behavior for explicit configs.
* Shared by both the direct RPC restart path and the config watcher path.
*/
export function deferGatewayRestartUntilIdle(opts: {
@@ -258,8 +384,10 @@ export function deferGatewayRestartUntilIdle(opts: {
}): void {
const pollMsRaw = opts.pollMs ?? DEFAULT_DEFERRAL_POLL_MS;
const pollMs = Math.max(10, Math.floor(pollMsRaw));
const maxWaitMsRaw = opts.maxWaitMs ?? DEFAULT_DEFERRAL_MAX_WAIT_MS;
const maxWaitMs = Math.max(pollMs, Math.floor(maxWaitMsRaw));
const maxWaitMs =
typeof opts.maxWaitMs === "number" && Number.isFinite(opts.maxWaitMs) && opts.maxWaitMs > 0
? Math.max(pollMs, Math.floor(opts.maxWaitMs))
: undefined;
let pending: number;
try {
@@ -277,6 +405,7 @@ export function deferGatewayRestartUntilIdle(opts: {
opts.hooks?.onDeferring?.(pending);
const startedAt = Date.now();
let nextStillPendingAt = startedAt + DEFAULT_DEFERRAL_STILL_PENDING_WARN_MS;
const poll = setInterval(() => {
let current: number;
try {
@@ -296,7 +425,11 @@ export function deferGatewayRestartUntilIdle(opts: {
return;
}
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= maxWaitMs) {
if (Date.now() >= nextStillPendingAt) {
opts.hooks?.onStillPending?.(current, elapsedMs);
nextStillPendingAt = Date.now() + DEFAULT_DEFERRAL_STILL_PENDING_WARN_MS;
}
if (maxWaitMs !== undefined && elapsedMs >= maxWaitMs) {
clearInterval(poll);
activeDeferralPolls.delete(poll);
opts.hooks?.onTimeout?.(current, elapsedMs);
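
A sketch of a caller wiring the new `onStillPending` hook (the `check` option name and the pending counter are assumptions; only `pollMs`, `maxWaitMs`, and `hooks` are visible in this diff):

deferGatewayRestartUntilIdle({
  check: () => countPendingOperations(), // assumed pending-work probe
  pollMs: 500,
  // maxWaitMs omitted: defer indefinitely, warning every 30s via onStillPending.
  hooks: {
    onDeferring: (pending) => restartLog.info(`deferring restart: ${pending} pending`),
    onStillPending: (pending, elapsedMs) =>
      restartLog.warn(`restart still deferred after ${elapsedMs}ms: ${pending} pending`),
    onReady: () => restartLog.info("pending work drained; restarting"),
    onTimeout: (pending, elapsedMs) =>
      restartLog.warn(`deferral timed out after ${elapsedMs}ms: ${pending} pending`),
  },
});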


@@ -373,12 +373,13 @@ export function getActiveTaskCount(): number {
/**
* Wait for all currently active tasks across all lanes to finish.
* Polls at a short interval; resolves when no tasks are active or
* when `timeoutMs` elapses (whichever comes first).
* when `timeoutMs` elapses (whichever comes first). If no timeout is passed,
* waits indefinitely for the active set captured at call time.
*
* New tasks enqueued after this call are ignored — only tasks that are
* already executing are waited on.
*/
export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolean }> {
export function waitForActiveTasks(timeoutMs?: number): Promise<{ drained: boolean }> {
const queueState = getQueueState();
const activeAtStart = new Set<number>();
for (const state of queueState.lanes.values()) {
@@ -390,7 +391,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea
if (activeAtStart.size === 0) {
return Promise.resolve({ drained: true });
}
if (timeoutMs <= 0) {
if (timeoutMs !== undefined && timeoutMs <= 0) {
return Promise.resolve({ drained: false });
}
@@ -399,9 +400,11 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea
activeTaskIds: activeAtStart,
resolve,
};
waiter.timeout = setTimeout(() => {
resolveActiveTaskWaiter(waiter, { drained: false });
}, timeoutMs);
if (timeoutMs !== undefined) {
waiter.timeout = setTimeout(() => {
resolveActiveTaskWaiter(waiter, { drained: false });
}, timeoutMs);
}
queueState.activeTaskWaiters.add(waiter);
notifyActiveTaskWaiters();
});
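
A matching usage sketch for the task-queue helper (callers are illustrative):

// No deadline: wait for the tasks executing right now, however long they take.
const { drained } = await waitForActiveTasks();

// timeoutMs <= 0 with tasks still active resolves immediately with drained=false.
const probe = await waitForActiveTasks(0);

// Capped wait: resolves drained=false if the captured tasks outlive 30s.
const capped = await waitForActiveTasks(30_000);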