From f6f8d74419a14d2d25ea74c1352c57ffb216dd61 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sat, 2 May 2026 14:43:53 -0700 Subject: [PATCH] fix(gateway): expose restart drain controls --- src/cli/daemon-cli/lifecycle-core.test.ts | 21 +++ src/cli/daemon-cli/lifecycle-core.ts | 6 + src/cli/daemon-cli/lifecycle.ts | 34 ++++- .../register-service-commands.test.ts | 22 +++ .../daemon-cli/register-service-commands.ts | 19 ++- src/cli/daemon-cli/types.ts | 2 + src/cli/gateway-cli/lifecycle.runtime.ts | 2 + src/cli/gateway-cli/run-loop.test.ts | 85 ++++++++++- src/cli/gateway-cli/run-loop.ts | 98 +++++++++--- src/gateway/server-reload-handlers.test.ts | 141 ++++++++++++++++++ src/gateway/server-reload-handlers.ts | 40 ++++- src/gateway/server.reload.test.ts | 11 ++ src/infra/restart-intent.test.ts | 24 ++- src/infra/restart.ts | 54 +++++-- src/tasks/task-registry.maintenance.ts | 22 +++ 15 files changed, 540 insertions(+), 41 deletions(-) diff --git a/src/cli/daemon-cli/lifecycle-core.test.ts b/src/cli/daemon-cli/lifecycle-core.test.ts index 74a5c3d5604..d85235c156e 100644 --- a/src/cli/daemon-cli/lifecycle-core.test.ts +++ b/src/cli/daemon-cli/lifecycle-core.test.ts @@ -326,6 +326,27 @@ describe("runServiceRestart token drift", () => { expect(service.restart).toHaveBeenCalledTimes(1); }); + it("writes restart force and wait options into the service-manager intent", async () => { + service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 }); + + await runServiceRestart({ + ...createServiceRunArgs(), + opts: { + json: true, + restartIntent: { + waitMs: 2_500, + }, + }, + }); + + expect(writeGatewayRestartIntentSync).toHaveBeenCalledWith({ + targetPid: 1234, + intent: { + waitMs: 2_500, + }, + }); + }); + it("clears restart intent when service-manager restart fails before signaling", async () => { service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 }); writeGatewayRestartIntentSync.mockReturnValueOnce(true); diff --git 
a/src/cli/daemon-cli/lifecycle-core.ts b/src/cli/daemon-cli/lifecycle-core.ts index 9bd867f7ea4..e5dad463c57 100644 --- a/src/cli/daemon-cli/lifecycle-core.ts +++ b/src/cli/daemon-cli/lifecycle-core.ts @@ -13,6 +13,7 @@ import { isSystemdUserServiceAvailable } from "../../daemon/systemd.js"; import { isGatewaySecretRefUnavailableError } from "../../gateway/credentials.js"; import { clearGatewayRestartIntentSync, + type GatewayRestartIntent, writeGatewayRestartIntentSync, } from "../../infra/restart.js"; import { isWSL } from "../../infra/wsl.js"; @@ -28,6 +29,9 @@ import { filterContainerGenericHints } from "./shared.js"; type DaemonLifecycleOptions = { json?: boolean; + force?: boolean; + wait?: string; + restartIntent?: GatewayRestartIntent; }; type RestartPostCheckContext = { @@ -440,6 +444,7 @@ export async function runServiceRestart(params: { const json = Boolean(params.opts?.json); const { stdout, emit, fail } = createDaemonActionContext({ action: "restart", json }); const warnings: string[] = []; + const restartIntent = params.opts?.restartIntent; let handledRecovery: ServiceRecoveryResult | null = null; let recoveredLoadedState: boolean | null = null; const emitScheduledRestart = ( @@ -552,6 +557,7 @@ export async function runServiceRestart(params: { const runtime = await params.service.readRuntime(process.env).catch(() => null); wroteRestartIntent = writeGatewayRestartIntentSync({ targetPid: runtime?.pid, + ...(restartIntent ? 
{ intent: restartIntent } : {}), }); } try { diff --git a/src/cli/daemon-cli/lifecycle.ts b/src/cli/daemon-cli/lifecycle.ts index 39d24aacc33..954bd4deaef 100644 --- a/src/cli/daemon-cli/lifecycle.ts +++ b/src/cli/daemon-cli/lifecycle.ts @@ -7,10 +7,12 @@ import { formatGatewayPidList, signalVerifiedGatewayPidSync, } from "../../infra/gateway-processes.js"; +import { type GatewayRestartIntent, writeGatewayRestartIntentSync } from "../../infra/restart.js"; import { defaultRuntime } from "../../runtime.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { theme } from "../../terminal/theme.js"; import { formatCliCommand } from "../command-format.js"; +import { parseDurationMs } from "../parse-duration.js"; import { recoverInstalledLaunchAgent } from "./launchd-recovery.js"; import { runServiceRestart, @@ -122,7 +124,25 @@ async function stopGatewayWithoutServiceManager(port: number) { }; } -async function restartGatewayWithoutServiceManager(port: number) { +function resolveGatewayRestartIntentOptions( + opts: DaemonLifecycleOptions, +): GatewayRestartIntent | undefined { + if (opts.force && opts.wait !== undefined) { + throw new Error("--force cannot be combined with --wait"); + } + if (opts.force) { + return { force: true }; + } + if (opts.wait !== undefined) { + return { waitMs: parseDurationMs(String(opts.wait)) }; + } + return undefined; +} + +async function restartGatewayWithoutServiceManager( + port: number, + restartIntent?: GatewayRestartIntent, +) { await assertUnmanagedGatewayRestartEnabled(port); const pids = resolveVerifiedGatewayListenerPids(port); if (pids.length === 0) { @@ -133,6 +153,10 @@ async function restartGatewayWithoutServiceManager(port: number) { `multiple gateway processes are listening on port ${port}: ${formatGatewayPidList(pids)}; use "openclaw gateway status --deep" before retrying restart`, ); } + writeGatewayRestartIntentSync({ + targetPid: pids[0], + ...(restartIntent ? 
{ intent: restartIntent } : {}), + }); signalVerifiedGatewayPidSync(pids[0], "SIGUSR1"); return { result: "restarted" as const, @@ -197,6 +221,7 @@ export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promi const json = Boolean(opts.json); const service = resolveGatewayService(); let restartedWithoutServiceManager = false; + const restartIntent = resolveGatewayRestartIntentOptions(opts); const restartPort = await resolveGatewayLifecyclePort(service).catch(() => resolveGatewayPortFallback(), ); @@ -208,7 +233,10 @@ export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promi serviceNoun: "Gateway", service, renderStartHints: renderGatewayServiceStartHints, - opts, + opts: { + ...opts, + ...(restartIntent ? { restartIntent } : {}), + }, checkTokenDrift: true, onNotLoaded: async () => { if (process.platform === "darwin") { @@ -217,7 +245,7 @@ export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promi return recovered; } } - const handled = await restartGatewayWithoutServiceManager(restartPort); + const handled = await restartGatewayWithoutServiceManager(restartPort, restartIntent); if (handled) { restartedWithoutServiceManager = true; return handled; diff --git a/src/cli/daemon-cli/register-service-commands.test.ts b/src/cli/daemon-cli/register-service-commands.test.ts index 07ce0f8bdcc..0d33fe39bdc 100644 --- a/src/cli/daemon-cli/register-service-commands.test.ts +++ b/src/cli/daemon-cli/register-service-commands.test.ts @@ -59,6 +59,28 @@ describe("addGatewayServiceCommands", () => { ); }, }, + { + name: "forwards restart force and wait controls", + argv: ["restart", "--wait", "30s"], + assert: () => { + expect(runDaemonRestart).toHaveBeenCalledWith( + expect.objectContaining({ + wait: "30s", + }), + ); + }, + }, + { + name: "forwards restart force control", + argv: ["restart", "--force"], + assert: () => { + expect(runDaemonRestart).toHaveBeenCalledWith( + expect.objectContaining({ + force: true, + }), 
+ ); + }, + }, { name: "forwards status auth collisions from parent gateway command", argv: ["status", "--token", "tok_status", "--password", "pw_status"], diff --git a/src/cli/daemon-cli/register-service-commands.ts b/src/cli/daemon-cli/register-service-commands.ts index 33738f5a3d7..13081d38aff 100644 --- a/src/cli/daemon-cli/register-service-commands.ts +++ b/src/cli/daemon-cli/register-service-commands.ts @@ -1,7 +1,7 @@ import type { Command } from "commander"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; import { inheritOptionFromParent } from "../command-options.js"; -import type { DaemonInstallOptions, GatewayRpcOpts } from "./types.js"; +import type { DaemonInstallOptions, DaemonLifecycleOptions, GatewayRpcOpts } from "./types.js"; const daemonInstallModuleLoader = createLazyImportLoader(() => import("./install.runtime.js")); const daemonLifecycleModuleLoader = createLazyImportLoader(() => import("./lifecycle.runtime.js")); @@ -44,6 +44,14 @@ function resolveRpcOptions(cmdOpts: GatewayRpcOpts, command?: Command): GatewayR }; } +function resolveRestartOptions(cmdOpts: DaemonLifecycleOptions, command?: Command) { + const parentForce = inheritOptionFromParent(command, "force"); + return { + ...cmdOpts, + force: Boolean(cmdOpts.force || parentForce), + }; +} + export function addGatewayServiceCommands(parent: Command, opts?: { statusDescription?: string }) { parent .command("status") @@ -113,9 +121,14 @@ export function addGatewayServiceCommands(parent: Command, opts?: { statusDescri parent .command("restart") .description("Restart the Gateway service (launchd/systemd/schtasks)") + .option("--force", "Restart immediately without waiting for active gateway work", false) + .option( + "--wait ", + "Wait duration before forcing restart (ms, 10s, 5m; 0 waits indefinitely)", + ) .option("--json", "Output JSON", false) - .action(async (cmdOpts) => { + .action(async (cmdOpts, command) => { const { runDaemonRestart } = await 
loadDaemonLifecycleModule(); - await runDaemonRestart(cmdOpts); + await runDaemonRestart(resolveRestartOptions(cmdOpts, command)); }); } diff --git a/src/cli/daemon-cli/types.ts b/src/cli/daemon-cli/types.ts index 3ae79327f81..90df43a6612 100644 --- a/src/cli/daemon-cli/types.ts +++ b/src/cli/daemon-cli/types.ts @@ -26,4 +26,6 @@ export type DaemonInstallOptions = { export type DaemonLifecycleOptions = { json?: boolean; + force?: boolean; + wait?: string; }; diff --git a/src/cli/gateway-cli/lifecycle.runtime.ts b/src/cli/gateway-cli/lifecycle.runtime.ts index baf9c138dfe..c0a47b2a286 100644 --- a/src/cli/gateway-cli/lifecycle.runtime.ts +++ b/src/cli/gateway-cli/lifecycle.runtime.ts @@ -10,6 +10,7 @@ export { } from "../../infra/process-respawn.js"; export { resolveGatewayRestartDeferralTimeoutMs, + consumeGatewayRestartIntentPayloadSync, consumeGatewayRestartIntentSync, consumeGatewaySigusr1RestartAuthorization, isGatewaySigusr1RestartExternallyAllowed, @@ -27,4 +28,5 @@ export { resetAllLanes, waitForActiveTasks, } from "../../process/command-queue.js"; +export { getInspectableActiveTaskRestartBlockers } from "../../tasks/task-registry.maintenance.js"; export { reloadTaskRegistryFromStore } from "../../tasks/runtime-internal.js"; diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index cc593bad616..39c3effecd2 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -5,6 +5,9 @@ import { pickBeaconHost, pickGatewayPort } from "./discover.js"; const acquireGatewayLock = vi.fn(async (_opts?: { port?: number }) => ({ release: vi.fn(async () => {}), })); +const consumeGatewayRestartIntentPayloadSync = vi.fn< + () => { force?: boolean; waitMs?: number } | null +>(() => null); const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true); const consumeGatewayRestartIntentSync = vi.fn(() => false); const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false); @@ -21,6 +24,17 @@ 
const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason? cooldownMsApplied: 0, })); const getActiveTaskCount = vi.fn(() => 0); +const getInspectableActiveTaskRestartBlockers = vi.fn( + () => + [] as Array<{ + taskId: string; + status: "queued" | "running"; + runtime: "subagent" | "acp" | "cli" | "cron"; + runId?: string; + label?: string; + title?: string; + }>, +); const markGatewayDraining = vi.fn(); const waitForActiveTasks = vi.fn(async (_timeoutMs?: number) => ({ drained: true })); const resetAllLanes = vi.fn(); @@ -64,6 +78,7 @@ vi.mock("../../infra/gateway-lock.js", () => ({ })); vi.mock("../../infra/restart.js", () => ({ + consumeGatewayRestartIntentPayloadSync: () => consumeGatewayRestartIntentPayloadSync(), consumeGatewaySigusr1RestartAuthorization: () => consumeGatewaySigusr1RestartAuthorization(), consumeGatewayRestartIntentSync: () => consumeGatewayRestartIntentSync(), isGatewaySigusr1RestartExternallyAllowed: () => isGatewaySigusr1RestartExternallyAllowed(), @@ -103,6 +118,10 @@ vi.mock("../../tasks/runtime-internal.js", () => ({ reloadTaskRegistryFromStore: () => reloadTaskRegistryFromStore(), })); +vi.mock("../../tasks/task-registry.maintenance.js", () => ({ + getInspectableActiveTaskRestartBlockers: () => getInspectableActiveTaskRestartBlockers(), +})); + vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({ abortEmbeddedPiRun: (sessionId?: string, opts?: { mode?: "all" | "compacting" }) => abortEmbeddedPiRun(sessionId, opts), @@ -270,7 +289,7 @@ describe("runGatewayLoop", () => { it("treats SIGTERM with a restart intent as a draining restart", async () => { vi.clearAllMocks(); - consumeGatewayRestartIntentSync.mockReturnValueOnce(true); + consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({}); getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0); await withIsolatedSignals(async ({ captureSignal }) => { @@ -301,7 +320,7 @@ describe("runGatewayLoop", () => { await new Promise((resolve) => 
setImmediate(resolve)); await new Promise((resolve) => setImmediate(resolve)); - expect(consumeGatewayRestartIntentSync).toHaveBeenCalledOnce(); + expect(consumeGatewayRestartIntentPayloadSync).toHaveBeenCalledOnce(); expect(markGatewayDraining).toHaveBeenCalledOnce(); expect(waitForActiveTasks).toHaveBeenCalledWith(90_000); expect(closeFirst).toHaveBeenCalledWith({ @@ -321,6 +340,68 @@ describe("runGatewayLoop", () => { }); }); + it("uses restart intent wait overrides for SIGTERM drain", async () => { + vi.clearAllMocks(); + consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({ waitMs: 2_500 }); + getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0); + + await withIsolatedSignals(async ({ captureSignal }) => { + const { start, exited } = await createSignaledLoopHarness(); + const sigterm = captureSignal("SIGTERM"); + const sigint = captureSignal("SIGINT"); + + sigterm(); + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(waitForActiveTasks).toHaveBeenCalledWith(2_500); + expect(start).toHaveBeenCalledTimes(2); + + sigint(); + await expect(exited).resolves.toBe(0); + }); + }); + + it("forces SIGTERM restarts without waiting for active task drain", async () => { + vi.clearAllMocks(); + consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({ force: true }); + getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0); + getActiveEmbeddedRunCount.mockReturnValueOnce(1).mockReturnValue(0); + getInspectableActiveTaskRestartBlockers.mockReturnValueOnce([ + { + taskId: "task-force", + runId: "run-force", + status: "running", + runtime: "cron", + label: "forced", + }, + ]); + + await withIsolatedSignals(async ({ captureSignal }) => { + const { start, exited } = await createSignaledLoopHarness(); + const sigterm = captureSignal("SIGTERM"); + const sigint = captureSignal("SIGINT"); + + sigterm(); + await new Promise((resolve) => setImmediate(resolve)); + await new 
Promise((resolve) => setImmediate(resolve)); + + expect(waitForActiveTasks).not.toHaveBeenCalled(); + expect(waitForActiveEmbeddedRuns).not.toHaveBeenCalled(); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" }); + expect(gatewayLog.warn).toHaveBeenCalledWith( + expect.stringContaining("restart blocked by active task run(s): taskId=task-force"), + ); + expect(gatewayLog.warn).toHaveBeenCalledWith( + "forced restart requested; skipping active work drain", + ); + expect(start).toHaveBeenCalledTimes(2); + + sigint(); + await expect(exited).resolves.toBe(0); + }); + }); + it("restarts after SIGUSR1 even when drain times out, and resets runtime state for the new iteration", async () => { vi.clearAllMocks(); loadConfig.mockReturnValue({ diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 3aacff4eb2d..25e028219c7 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -15,6 +15,10 @@ const UPDATE_RESPAWN_HEALTH_POLL_MS = 200; type GatewayRunSignalAction = "stop" | "restart"; type RestartDrainTimeoutMs = number | undefined; +type RestartIntentOptions = { + force?: boolean; + waitMs?: number; +}; type GatewayLifecycleRuntimeModule = typeof import("./lifecycle.runtime.js"); @@ -245,7 +249,15 @@ export async function runGatewayLoop(params: { const SUPERVISOR_STOP_TIMEOUT_MS = 30_000; const SHUTDOWN_TIMEOUT_MS = SUPERVISOR_STOP_TIMEOUT_MS - 5_000; - const resolveRestartDrainTimeoutMs = async (): Promise => { + const resolveRestartDrainTimeoutMs = async ( + restartIntent?: RestartIntentOptions, + ): Promise => { + if (restartIntent?.force) { + return 0; + } + if (typeof restartIntent?.waitMs === "number" && Number.isFinite(restartIntent.waitMs)) { + return restartIntent.waitMs > 0 ? 
Math.floor(restartIntent.waitMs) : undefined; + } try { const { getRuntimeConfig, resolveGatewayRestartDeferralTimeoutMs } = await loadGatewayLifecycleRuntimeModule(); @@ -256,7 +268,12 @@ export async function runGatewayLoop(params: { } }; - const request = (action: GatewayRunSignalAction, signal: string, restartReason?: string) => { + const request = ( + action: GatewayRunSignalAction, + signal: string, + restartReason?: string, + restartIntent?: RestartIntentOptions, + ) => { if (shuttingDown) { gatewayLog.info(`received ${signal} during shutdown; ignoring`); return; @@ -295,7 +312,9 @@ export async function runGatewayLoop(params: { }; void (async () => { - const restartDrainTimeoutMs = isRestart ? await resolveRestartDrainTimeoutMs() : 0; + const restartDrainTimeoutMs = isRestart + ? await resolveRestartDrainTimeoutMs(restartIntent) + : 0; if (!isRestart) { armForceExitTimer(SHUTDOWN_TIMEOUT_MS); } else if (restartDrainTimeoutMs !== undefined) { @@ -319,12 +338,35 @@ export async function runGatewayLoop(params: { if (isRestart) { const { abortEmbeddedPiRun, + getInspectableActiveTaskRestartBlockers, getActiveEmbeddedRunCount, getActiveTaskCount, markGatewayDraining, waitForActiveEmbeddedRuns, waitForActiveTasks, } = await loadGatewayLifecycleRuntimeModule(); + const formatTaskBlockers = () => { + const blockers = getInspectableActiveTaskRestartBlockers(); + if (blockers.length === 0) { + return null; + } + const shown = blockers + .slice(0, 8) + .map((task) => + [ + `taskId=${task.taskId}`, + task.runId ? `runId=${task.runId}` : null, + `status=${task.status}`, + `runtime=${task.runtime}`, + task.label ? `label=${task.label}` : null, + task.title ? `title=${task.title.slice(0, 80)}` : null, + ] + .filter((value): value is string => Boolean(value)) + .join(" "), + ); + const omitted = blockers.length - shown.length; + return omitted > 0 ? 
`${shown.join("; ")}; +${omitted} more` : shown.join("; "); + }; const createStillPendingDrainLogger = () => setInterval(() => { gatewayLog.warn( @@ -345,25 +387,34 @@ export async function runGatewayLoop(params: { } if (activeTasks > 0 || activeRuns > 0) { + const taskBlockers = formatTaskBlockers(); gatewayLog.info( `draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart ${formatRestartDrainBudget()}`, ); - const stillPendingDrainLogger = createStillPendingDrainLogger(); - const [tasksDrain, runsDrain] = await Promise.all([ - activeTasks > 0 - ? waitForActiveTasks(restartDrainTimeoutMs) - : Promise.resolve({ drained: true }), - activeRuns > 0 - ? waitForActiveEmbeddedRuns(restartDrainTimeoutMs) - : Promise.resolve({ drained: true }), - ]).finally(() => clearInterval(stillPendingDrainLogger)); - if (tasksDrain.drained && runsDrain.drained) { - gatewayLog.info("all active work drained"); - } else { - gatewayLog.warn("drain timeout reached; proceeding with restart"); - // Final best-effort abort to avoid carrying active runs into the - // next lifecycle when drain time budget is exhausted. + if (taskBlockers) { + gatewayLog.warn(`restart blocked by active task run(s): ${taskBlockers}`); + } + if (restartIntent?.force) { + gatewayLog.warn("forced restart requested; skipping active work drain"); abortEmbeddedPiRun(undefined, { mode: "all" }); + } else { + const stillPendingDrainLogger = createStillPendingDrainLogger(); + const [tasksDrain, runsDrain] = await Promise.all([ + activeTasks > 0 + ? waitForActiveTasks(restartDrainTimeoutMs) + : Promise.resolve({ drained: true }), + activeRuns > 0 + ? 
waitForActiveEmbeddedRuns(restartDrainTimeoutMs) + : Promise.resolve({ drained: true }), + ]).finally(() => clearInterval(stillPendingDrainLogger)); + if (tasksDrain.drained && runsDrain.drained) { + gatewayLog.info("all active work drained"); + } else { + gatewayLog.warn("drain timeout reached; proceeding with restart"); + // Final best-effort abort to avoid carrying active runs into the + // next lifecycle when drain time budget is exhausted. + abortEmbeddedPiRun(undefined, { mode: "all" }); + } } } } @@ -390,8 +441,9 @@ export async function runGatewayLoop(params: { const onSigterm = () => { gatewayLog.info("signal SIGTERM received"); void (async () => { - const { consumeGatewayRestartIntentSync } = await loadGatewayLifecycleRuntimeModule(); - request(consumeGatewayRestartIntentSync() ? "restart" : "stop", "SIGTERM"); + const { consumeGatewayRestartIntentPayloadSync } = await loadGatewayLifecycleRuntimeModule(); + const restartIntent = consumeGatewayRestartIntentPayloadSync(); + request(restartIntent ? "restart" : "stop", "SIGTERM", undefined, restartIntent ?? 
undefined); })(); }; const onSigint = () => { @@ -402,12 +454,18 @@ export async function runGatewayLoop(params: { gatewayLog.info("signal SIGUSR1 received"); void (async () => { const { + consumeGatewayRestartIntentPayloadSync, consumeGatewaySigusr1RestartAuthorization, isGatewaySigusr1RestartExternallyAllowed, markGatewaySigusr1RestartHandled, peekGatewaySigusr1RestartReason, scheduleGatewaySigusr1Restart, } = await loadGatewayLifecycleRuntimeModule(); + const restartIntent = consumeGatewayRestartIntentPayloadSync(); + if (restartIntent) { + request("restart", "SIGUSR1", "gateway.restart", restartIntent); + return; + } const authorized = consumeGatewaySigusr1RestartAuthorization(); if (!authorized) { if (!isGatewaySigusr1RestartExternallyAllowed()) { diff --git a/src/gateway/server-reload-handlers.test.ts b/src/gateway/server-reload-handlers.test.ts index 8a539109082..05420e0e939 100644 --- a/src/gateway/server-reload-handlers.test.ts +++ b/src/gateway/server-reload-handlers.test.ts @@ -9,6 +9,87 @@ import type { ChannelKind } from "./config-reload-plan.js"; import type { GatewayPluginReloadResult } from "./server-reload-handlers.js"; import { __testing, createGatewayReloadHandlers } from "./server-reload-handlers.js"; +const hoisted = vi.hoisted(() => ({ + activeTaskCount: { value: 0 }, + activeTaskBlockers: [] as Array<{ + taskId: string; + status: "queued" | "running"; + runtime: "subagent" | "acp" | "cli" | "cron"; + runId?: string; + label?: string; + title?: string; + }>, +})); + +vi.mock("../tasks/task-registry.maintenance.js", async () => { + const actual = await vi.importActual( + "../tasks/task-registry.maintenance.js", + ); + return { + ...actual, + getInspectableActiveTaskRestartBlockers: () => hoisted.activeTaskBlockers, + getInspectableTaskRegistrySummary: () => ({ + total: hoisted.activeTaskCount.value, + active: hoisted.activeTaskCount.value, + terminal: 0, + failures: 0, + byStatus: { + queued: 0, + running: hoisted.activeTaskCount.value, + 
succeeded: 0, + failed: 0, + timed_out: 0, + cancelled: 0, + lost: 0, + }, + byRuntime: { + subagent: hoisted.activeTaskCount.value, + acp: 0, + cli: 0, + cron: 0, + }, + }), + }; +}); + +function createReloadHandlersForTest(logReload = { info: vi.fn(), warn: vi.fn() }) { + const cron = { start: vi.fn(async () => {}), stop: vi.fn() }; + const heartbeatRunner = { + stop: vi.fn(), + updateConfig: vi.fn(), + }; + return createGatewayReloadHandlers({ + deps: {} as never, + broadcast: vi.fn(), + getState: () => ({ + hooksConfig: {} as never, + hookClientIpConfig: {} as never, + heartbeatRunner: heartbeatRunner as never, + cronState: { cron, storePath: "/tmp/cron.json", cronEnabled: false } as never, + channelHealthMonitor: null, + }), + setState: vi.fn(), + startChannel: vi.fn(async () => {}), + stopChannel: vi.fn(async () => {}), + reloadPlugins: vi.fn( + async (): Promise => ({ + restartChannels: new Set(), + activeChannels: new Set(), + }), + ), + logHooks: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, + logChannels: { info: vi.fn(), error: vi.fn() }, + logCron: { error: vi.fn() }, + logReload, + createHealthMonitor: () => null, + }); +} + +afterEach(() => { + hoisted.activeTaskCount.value = 0; + hoisted.activeTaskBlockers.length = 0; +}); + describe("gateway reload recovery handlers", () => { afterEach(() => { embeddedRunTesting.resetActiveEmbeddedRuns(); @@ -52,6 +133,66 @@ describe("gateway reload recovery handlers", () => { }); }); +describe("gateway restart deferral preflight", () => { + it("logs active task run ids before waiting and when forcing after timeout", async () => { + const restartTesting = (await import("../infra/restart.js")).__testing; + restartTesting.resetSigusr1State(); + const logReload = { info: vi.fn(), warn: vi.fn() }; + const { requestGatewayRestart } = createReloadHandlersForTest(logReload); + hoisted.activeTaskCount.value = 1; + hoisted.activeTaskBlockers.push({ + taskId: "task-nightly", + runId: "run-nightly", + status: "running", + 
runtime: "cron", + label: "nightly sync", + title: "refresh all accounts", + }); + const signalSpy = vi.fn(); + process.once("SIGUSR1", signalSpy); + vi.useFakeTimers(); + + try { + requestGatewayRestart( + { + changedPaths: ["gateway.port"], + restartGateway: true, + restartReasons: ["gateway.port"], + hotReasons: [], + reloadHooks: false, + restartGmailWatcher: false, + restartCron: false, + restartHeartbeat: false, + restartHealthMonitor: false, + reloadPlugins: false, + restartChannels: new Set(), + disposeMcpRuntimes: false, + noopPaths: [], + }, + { + gateway: { reload: { deferralTimeoutMs: 1_000 } }, + }, + ); + + expect(logReload.warn).toHaveBeenCalledWith( + expect.stringContaining("restart blocked by active task run(s): taskId=task-nightly"), + ); + expect(logReload.warn).toHaveBeenCalledWith(expect.stringContaining("runId=run-nightly")); + + await vi.advanceTimersByTimeAsync(1_000); + await Promise.resolve(); + + expect(signalSpy).toHaveBeenCalledTimes(1); + expect(logReload.warn).toHaveBeenCalledWith(expect.stringContaining("; forcing restart")); + } finally { + hoisted.activeTaskCount.value = 0; + vi.useRealTimers(); + process.removeListener("SIGUSR1", signalSpy); + restartTesting.resetSigusr1State(); + } + }); +}); + describe("gateway plugin hot reload handlers", () => { it("stops removed channel plugins from broad activation before swapping plugin runtime", async () => { const previousSkipChannels = process.env.OPENCLAW_SKIP_CHANNELS; diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index 624d0355856..7dd9915a123 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -21,7 +21,11 @@ import { clearSecretsRuntimeSnapshot, getActiveSecretsRuntimeSnapshot, } from "../secrets/runtime.js"; -import { getInspectableTaskRegistrySummary } from "../tasks/task-registry.maintenance.js"; +import { + getInspectableActiveTaskRestartBlockers, + getInspectableTaskRegistrySummary, + 
type ActiveTaskRestartBlocker, +} from "../tasks/task-registry.maintenance.js"; import type { ChannelHealthMonitor } from "./channel-health-monitor.js"; import { enqueueConfigRecoveryNotice } from "./config-recovery-notice.js"; import type { ChannelKind } from "./config-reload-plan.js"; @@ -183,6 +187,26 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) } return details; }; + const formatTaskBlocker = (task: ActiveTaskRestartBlocker) => { + const details = [ + `taskId=${task.taskId}`, + task.runId ? `runId=${task.runId}` : null, + `status=${task.status}`, + `runtime=${task.runtime}`, + task.label ? `label=${task.label}` : null, + task.title ? `title=${task.title.slice(0, 80)}` : null, + ].filter((value): value is string => Boolean(value)); + return details.join(" "); + }; + const formatTaskBlockers = () => { + const blockers = getInspectableActiveTaskRestartBlockers(); + if (blockers.length === 0) { + return null; + } + const shown = blockers.slice(0, 8).map(formatTaskBlocker); + const omitted = blockers.length - shown.length; + return omitted > 0 ? 
`${shown.join("; ")}; +${omitted} more` : shown.join("; "); + }; const waitForActiveWorkBeforeChannelReload = async ( channels: Iterable, nextConfig: OpenClawConfig, @@ -412,6 +436,10 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) params.logReload.warn( `config change requires gateway restart (${reasons}) — deferring until ${initialDetails.join(", ")} complete`, ); + const taskBlockers = formatTaskBlockers(); + if (taskBlockers) { + params.logReload.warn(`restart blocked by active task run(s): ${taskBlockers}`); + } deferGatewayRestartUntilIdle({ getPendingCount: () => getActiveCounts().totalActive, @@ -425,15 +453,21 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) }, onStillPending: (_pending, elapsedMs) => { const remaining = formatActiveDetails(getActiveCounts()); + const taskBlockers = formatTaskBlockers(); params.logReload.warn( - `restart still deferred after ${elapsedMs}ms with ${remaining.join(", ")} active`, + `restart still deferred after ${elapsedMs}ms with ${remaining.join(", ")} active${ + taskBlockers ? ` (${taskBlockers})` : "" + }`, ); }, onTimeout: (_pending, elapsedMs) => { const remaining = formatActiveDetails(getActiveCounts()); + const taskBlockers = formatTaskBlockers(); restartPending = false; params.logReload.warn( - `restart timeout after ${elapsedMs}ms with ${remaining.join(", ")} still active; restarting anyway`, + `restart timeout after ${elapsedMs}ms with ${remaining.join(", ")} still active${ + taskBlockers ? 
` (${taskBlockers})` : "" + }; forcing restart`, ); }, onCheckError: (err) => { diff --git a/src/gateway/server.reload.test.ts b/src/gateway/server.reload.test.ts index 170c18ff7c4..bc9be7508d6 100644 --- a/src/gateway/server.reload.test.ts +++ b/src/gateway/server.reload.test.ts @@ -46,6 +46,14 @@ const hoisted = vi.hoisted(() => { const totalPendingReplies = { value: 0 }; const totalQueueSize = { value: 0 }; const activeTaskCount = { value: 0 }; + const activeTaskBlockers: Array<{ + taskId: string; + status: "queued" | "running"; + runtime: "subagent" | "acp" | "cli" | "cron"; + runId?: string; + label?: string; + title?: string; + }> = []; const startGmailWatcher = vi.fn(async () => ({ started: true })); const stopGmailWatcher = vi.fn(async () => {}); @@ -150,6 +158,7 @@ const hoisted = vi.hoisted(() => { totalPendingReplies, totalQueueSize, activeTaskCount, + activeTaskBlockers, startGmailWatcher, stopGmailWatcher, resetModelCatalogCache, @@ -258,6 +267,7 @@ vi.mock("../tasks/task-registry.maintenance.js", async () => { ); return { ...actual, + getInspectableActiveTaskRestartBlockers: () => hoisted.activeTaskBlockers, getInspectableTaskRegistrySummary: () => ({ active: hoisted.activeTaskCount.value, queued: 0, @@ -312,6 +322,7 @@ describe("gateway hot reload", () => { hoisted.totalPendingReplies.value = 0; hoisted.totalQueueSize.value = 0; hoisted.activeTaskCount.value = 0; + hoisted.activeTaskBlockers.length = 0; embeddedRunMock.activeIds.clear(); hoisted.resetModelCatalogCache.mockReset(); hoisted.disposeAllSessionMcpRuntimes.mockReset(); diff --git a/src/infra/restart-intent.test.ts b/src/infra/restart-intent.test.ts index 21d6d73bd80..7abccce03f7 100644 --- a/src/infra/restart-intent.test.ts +++ b/src/infra/restart-intent.test.ts @@ -2,7 +2,11 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; -import { consumeGatewayRestartIntentSync, writeGatewayRestartIntentSync } 
from "./restart.js"; +import { + consumeGatewayRestartIntentPayloadSync, + consumeGatewayRestartIntentSync, + writeGatewayRestartIntentSync, +} from "./restart.js"; const tempDirs: string[] = []; @@ -60,6 +64,24 @@ describe("gateway restart intent", () => { expect(fs.statSync(intentPath(env)).mode & 0o777).toBe(0o600); }); + it("round-trips restart force and wait options", () => { + const env = createIntentEnv(); + + expect( + writeGatewayRestartIntentSync({ + env, + targetPid: process.pid, + intent: { force: true, waitMs: 12_345 }, + }), + ).toBe(true); + + expect(consumeGatewayRestartIntentPayloadSync(env)).toEqual({ + force: true, + waitMs: 12_345, + }); + expect(fs.existsSync(intentPath(env))).toBe(false); + }); + it("does not follow an existing intent-path symlink when writing", () => { const env = createIntentEnv(); const targetPath = path.join(env.OPENCLAW_STATE_DIR ?? "", "attacker-target.txt"); diff --git a/src/infra/restart.ts b/src/infra/restart.ts index 7e508efecac..6fdcbd37580 100644 --- a/src/infra/restart.ts +++ b/src/infra/restart.ts @@ -91,6 +91,13 @@ type GatewayRestartIntentPayload = { kind: "gateway-restart"; pid: number; createdAt: number; + force?: boolean; + waitMs?: number; +}; + +export type GatewayRestartIntent = { + force?: boolean; + waitMs?: number; }; function resolveGatewayRestartIntentPath(env: NodeJS.ProcessEnv = process.env): string { @@ -117,6 +124,7 @@ function normalizeRestartIntentPid(pid: number | undefined): number | null { export function writeGatewayRestartIntentSync(opts: { env?: NodeJS.ProcessEnv; targetPid?: number; + intent?: GatewayRestartIntent; }): boolean { const targetPid = normalizeRestartIntentPid(opts.targetPid); if (targetPid === null) { @@ -131,6 +139,12 @@ export function writeGatewayRestartIntentSync(opts: { kind: "gateway-restart", pid: targetPid, createdAt: Date.now(), + ...(opts.intent?.force ? 
{ force: true } : {}), + ...(typeof opts.intent?.waitMs === "number" && + Number.isFinite(opts.intent.waitMs) && + opts.intent.waitMs >= 0 + ? { waitMs: Math.floor(opts.intent.waitMs) } + : {}), }; tmpPath = path.join( path.dirname(intentPath), @@ -168,9 +182,18 @@ function parseGatewayRestartIntent(raw: string): GatewayRestartIntentPayload | n typeof parsed.pid === "number" && Number.isFinite(parsed.pid) && typeof parsed.createdAt === "number" && - Number.isFinite(parsed.createdAt) + Number.isFinite(parsed.createdAt) && + (parsed.force === undefined || typeof parsed.force === "boolean") && + (parsed.waitMs === undefined || + (typeof parsed.waitMs === "number" && Number.isFinite(parsed.waitMs) && parsed.waitMs >= 0)) ) { - return parsed as GatewayRestartIntentPayload; + return { + kind: "gateway-restart", + pid: parsed.pid, + createdAt: parsed.createdAt, + ...(parsed.force ? { force: true } : {}), + ...(typeof parsed.waitMs === "number" ? { waitMs: Math.floor(parsed.waitMs) } : {}), + }; } } catch { return null; @@ -178,32 +201,45 @@ function parseGatewayRestartIntent(raw: string): GatewayRestartIntentPayload | n return null; } -export function consumeGatewayRestartIntentSync( +export function consumeGatewayRestartIntentPayloadSync( env: NodeJS.ProcessEnv = process.env, now = Date.now(), -): boolean { +): GatewayRestartIntent | null { const intentPath = resolveGatewayRestartIntentPath(env); let raw: string; try { const stat = fs.lstatSync(intentPath); if (!stat.isFile() || stat.size > GATEWAY_RESTART_INTENT_MAX_BYTES) { - return false; + return null; } raw = fs.readFileSync(intentPath, "utf8"); } catch { - return false; + return null; } finally { clearGatewayRestartIntentSync(env); } const payload = parseGatewayRestartIntent(raw); if (!payload) { - return false; + return null; } if (payload.pid !== process.pid) { - return false; + return null; } const ageMs = now - payload.createdAt; - return ageMs >= 0 && ageMs <= GATEWAY_RESTART_INTENT_TTL_MS; + if (ageMs < 0 || 
ageMs > GATEWAY_RESTART_INTENT_TTL_MS) { + return null; + } + return { + ...(payload.force ? { force: true } : {}), + ...(typeof payload.waitMs === "number" ? { waitMs: payload.waitMs } : {}), + }; +} + +export function consumeGatewayRestartIntentSync( + env: NodeJS.ProcessEnv = process.env, + now = Date.now(), +): boolean { + return consumeGatewayRestartIntentPayloadSync(env, now) !== null; } function summarizeChangedPaths(paths: string[] | undefined, maxPaths = 6): string | null { diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index 3fa3edd2fc5..bceadc78091 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -837,6 +837,28 @@ export function reconcileInspectableTasks(): TaskRecord[] { configureTaskAuditTaskProvider(reconcileInspectableTasks); +export type ActiveTaskRestartBlocker = { + taskId: string; + status: Extract<TaskRecord["status"], "queued" | "running">; + runtime: TaskRecord["runtime"]; + runId?: string; + label?: string; + title?: string; +}; + +export function getInspectableActiveTaskRestartBlockers(): ActiveTaskRestartBlocker[] { + return reconcileInspectableTasks() + .filter((task) => task.status === "queued" || task.status === "running") + .map((task) => ({ + taskId: task.taskId, + status: task.status as Extract<TaskRecord["status"], "queued" | "running">, + runtime: task.runtime, + ...(task.runId ? { runId: task.runId } : {}), + ...(task.label ? { label: task.label } : {}), + ...(task.task ? { title: task.task } : {}), + })); +} + export function getInspectableTaskRegistrySummary(): TaskRegistrySummary { return summarizeTaskRecords(reconcileInspectableTasks()); }