diff --git a/CHANGELOG.md b/CHANGELOG.md index 3eaf4e39e7c..c34c89e193f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -457,6 +457,7 @@ Docs: https://docs.openclaw.ai - Status/update: resolve beta update-channel checks from the installed version when config still says `stable`, and let `status --deep` reuse live gateway channel credential state instead of warning on command-path-only token misses. - Doctor/plugins: preserve unmanaged third-party plugin `node_modules` during `doctor --fix`, while still pruning OpenClaw-managed runtime dependency caches. - Gateway/restart: add `openclaw gateway restart --force` and `--wait `, log active task run IDs before restart deferral timers, and report timeout restarts as explicit forced restarts. +- Gateway/restart: add `openclaw gateway restart --safe`, backed by the `gateway.restart.request` and `gateway.restart.preflight` gateway methods, and align its preflight with scheduled restart deferral by counting only active restart blockers (running tasks that have not ended), so queued task records no longer keep safe restarts deferred indefinitely. - Discord: persist slash-command deploy hashes across process restarts so unchanged command sets skip redeploy and avoid restart-loop 429s. - Providers/LM Studio: normalize binary `off`/`on` reasoning metadata from Gemma 4 and other local models to LM Studio's accepted OpenAI-compatible `reasoning_effort` values. - Plugins/externalization: keep official external install docs, update examples, and live Codex npm checks on default npm tags instead of `@beta`. Thanks @vincentkoc. 
diff --git a/docs/cli/daemon.md b/docs/cli/daemon.md index 4581398c4bc..69c5fa9f3fd 100644 --- a/docs/cli/daemon.md +++ b/docs/cli/daemon.md @@ -36,7 +36,7 @@ openclaw daemon uninstall - `status`: `--url`, `--token`, `--password`, `--timeout`, `--no-probe`, `--require-rpc`, `--deep`, `--json` - `install`: `--port`, `--runtime `, `--token`, `--force`, `--json` -- `restart`: `--force`, `--wait `, `--json` +- `restart`: `--safe`, `--force`, `--wait `, `--json` - lifecycle (`uninstall|start|stop`): `--json` Notes: @@ -53,6 +53,7 @@ Notes: - If both `gateway.auth.token` and `gateway.auth.password` are configured and `gateway.auth.mode` is unset, install is blocked until mode is set explicitly. - On macOS, `install` keeps LaunchAgent plists owner-only and loads managed service environment values through an owner-only file and wrapper instead of serializing API keys or auth-profile env refs into `EnvironmentVariables`. - If you intentionally run multiple gateways on one host, isolate ports, config/state, and workspaces; see [/gateway#multiple-gateways-same-host](/gateway#multiple-gateways-same-host). +- `restart --safe` asks the running Gateway to preflight active work and schedule one coalesced restart after active work drains. Plain `restart` keeps the existing service-manager behavior; `--force` remains the immediate override path. `--safe` cannot be combined with `--force` or `--wait`. ## Prefer diff --git a/docs/cli/gateway.md b/docs/cli/gateway.md index af8a34cf7a8..7f159238833 100644 --- a/docs/cli/gateway.md +++ b/docs/cli/gateway.md @@ -105,6 +105,16 @@ openclaw gateway run Raw stream jsonl path. +## Restart the Gateway + +```bash +openclaw gateway restart +openclaw gateway restart --safe +openclaw gateway restart --force +``` + +`openclaw gateway restart --safe` asks the running Gateway to preflight active OpenClaw work before restarting. 
If queued operations, reply delivery, embedded runs, or task runs are active, the Gateway reports the blockers, coalesces duplicate safe restart requests, and restarts once the active work drains. Plain `restart` keeps the existing service-manager behavior for compatibility. Use `--force` only when you explicitly want the immediate override path. + Inline `--password` can be exposed in local process listings. Prefer `--password-file`, env, or a SecretRef-backed `gateway.auth.password`. diff --git a/src/cli/daemon-cli/lifecycle.test.ts b/src/cli/daemon-cli/lifecycle.test.ts index bd5cb7f3507..0603c738830 100644 --- a/src/cli/daemon-cli/lifecycle.test.ts +++ b/src/cli/daemon-cli/lifecycle.test.ts @@ -49,6 +49,7 @@ const probeGateway = vi.fn< configSnapshot: unknown; }> >(); +const callGatewayCli = vi.fn(); const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true); const loadConfig = vi.hoisted(() => vi.fn(() => ({}))); const recoverInstalledLaunchAgent = vi.hoisted(() => vi.fn()); @@ -77,6 +78,10 @@ vi.mock("../../gateway/probe.js", () => ({ }) => probeGateway(opts), })); +vi.mock("../../gateway/call.js", () => ({ + callGatewayCli: (opts: unknown) => callGatewayCli(opts), +})); + vi.mock("../../config/commands.js", () => ({ isRestartEnabled: (config?: { commands?: unknown }) => isRestartEnabled(config), })); @@ -113,7 +118,11 @@ vi.mock("./lifecycle-core.js", () => ({ describe("runDaemonRestart health checks", () => { let runDaemonStart: (opts?: { json?: boolean }) => Promise; - let runDaemonRestart: (opts?: { json?: boolean }) => Promise; + let runDaemonRestart: (opts?: { + json?: boolean; + safe?: boolean; + force?: boolean; + }) => Promise; let runDaemonStop: (opts?: { json?: boolean }) => Promise; let envSnapshot: ReturnType; @@ -162,6 +171,7 @@ describe("runDaemonRestart health checks", () => { signalVerifiedGatewayPidSync.mockReset(); formatGatewayPidList.mockReset(); probeGateway.mockReset(); + callGatewayCli.mockReset(); 
isRestartEnabled.mockReset(); loadConfig.mockReset(); recoverInstalledLaunchAgent.mockReset(); @@ -204,6 +214,31 @@ describe("runDaemonRestart health checks", () => { ok: true, configSnapshot: { commands: { restart: true } }, }); + callGatewayCli.mockResolvedValue({ + ok: true, + status: "deferred", + preflight: { + safe: false, + counts: { + queueSize: 1, + pendingReplies: 0, + embeddedRuns: 0, + activeTasks: 0, + totalActive: 1, + }, + blockers: [{ kind: "queue", count: 1, message: "1 queued or active operation(s)" }], + summary: "restart deferred: 1 queued or active operation(s)", + }, + restart: { + ok: true, + pid: 123, + signal: "SIGUSR1", + delayMs: 0, + mode: "emit", + coalesced: false, + cooldownMsApplied: 0, + }, + }); isRestartEnabled.mockReturnValue(true); signalVerifiedGatewayPidSync.mockImplementation(() => {}); formatGatewayPidList.mockImplementation((pids) => pids.join(", ")); @@ -230,6 +265,24 @@ describe("runDaemonRestart health checks", () => { expect(recoverInstalledLaunchAgent).toHaveBeenCalledWith({ result: "started" }); }); + it("requests a safe gateway restart over RPC without touching the service manager", async () => { + await runDaemonRestart({ json: true, safe: true }); + + expect(callGatewayCli).toHaveBeenCalledWith({ + method: "gateway.restart.request", + params: { reason: "gateway.restart.safe" }, + timeoutMs: 10_000, + }); + expect(runServiceRestart).not.toHaveBeenCalled(); + }); + + it("keeps force restart on the existing non-safe path", async () => { + await runDaemonRestart({ json: true, force: true }); + + expect(callGatewayCli).not.toHaveBeenCalled(); + expect(runServiceRestart).toHaveBeenCalled(); + }); + it("repairs stale loaded service definitions from gateway start", async () => { repairLoadedGatewayServiceForStart.mockResolvedValue({ result: "started", diff --git a/src/cli/daemon-cli/lifecycle.ts b/src/cli/daemon-cli/lifecycle.ts index cea631423c4..572530b82f1 100644 --- a/src/cli/daemon-cli/lifecycle.ts +++ 
b/src/cli/daemon-cli/lifecycle.ts @@ -1,12 +1,14 @@ import { isRestartEnabled } from "../../config/commands.flags.js"; import { readBestEffortConfig, resolveGatewayPort } from "../../config/config.js"; import { resolveGatewayService } from "../../daemon/service.js"; +import { callGatewayCli } from "../../gateway/call.js"; import { probeGateway } from "../../gateway/probe.js"; import { findVerifiedGatewayListenerPidsOnPortSync, formatGatewayPidList, signalVerifiedGatewayPidSync, } from "../../infra/gateway-processes.js"; +import type { SafeGatewayRestartRequestResult } from "../../infra/restart-coordinator.js"; import { type GatewayRestartIntent, writeGatewayRestartIntentSync } from "../../infra/restart.js"; import { defaultRuntime } from "../../runtime.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; @@ -139,6 +141,50 @@ function resolveGatewayRestartIntentOptions( return undefined; } +function formatSafeRestartWarnings(result: SafeGatewayRestartRequestResult): string[] | undefined { + if (result.preflight.blockers.length === 0) { + return undefined; + } + return [result.preflight.summary]; +} + +async function requestSafeGatewayRestart(opts: DaemonLifecycleOptions): Promise { + if (opts.force) { + throw new Error("--safe cannot be combined with --force; omit --safe to force restart now"); + } + if (opts.wait !== undefined) { + throw new Error("--safe cannot be combined with --wait; safe restart uses gateway deferral"); + } + const result = await callGatewayCli({ + method: "gateway.restart.request", + params: { reason: "gateway.restart.safe" }, + timeoutMs: 10_000, + }); + const message = + result.status === "coalesced" + ? "safe restart request joined an existing pending gateway restart" + : result.status === "deferred" + ? 
"safe restart requested; gateway will restart after active work drains" + : "safe restart requested; gateway will restart momentarily"; + const payload = { + ok: true, + result: result.status, + message, + preflight: result.preflight, + restart: result.restart, + warnings: formatSafeRestartWarnings(result), + }; + if (opts.json) { + defaultRuntime.log(JSON.stringify(payload, null, 2)); + } else { + defaultRuntime.log(message); + if (result.preflight.blockers.length > 0) { + defaultRuntime.log(theme.warn(result.preflight.summary)); + } + } + return true; +} + async function restartGatewayWithoutServiceManager( port: number, restartIntent?: GatewayRestartIntent, @@ -218,6 +264,9 @@ export async function runDaemonStop(opts: DaemonLifecycleOptions = {}) { * Throws/exits on check or restart failures. */ export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promise { + if (opts.safe) { + return await requestSafeGatewayRestart(opts); + } const json = Boolean(opts.json); const service = resolveGatewayService(); let restartedWithoutServiceManager = false; diff --git a/src/cli/daemon-cli/register-service-commands.test.ts b/src/cli/daemon-cli/register-service-commands.test.ts index 0d33fe39bdc..983b4d90c93 100644 --- a/src/cli/daemon-cli/register-service-commands.test.ts +++ b/src/cli/daemon-cli/register-service-commands.test.ts @@ -70,6 +70,17 @@ describe("addGatewayServiceCommands", () => { ); }, }, + { + name: "forwards restart safe control", + argv: ["restart", "--safe"], + assert: () => { + expect(runDaemonRestart).toHaveBeenCalledWith( + expect.objectContaining({ + safe: true, + }), + ); + }, + }, { name: "forwards restart force control", argv: ["restart", "--force"], diff --git a/src/cli/daemon-cli/register-service-commands.ts b/src/cli/daemon-cli/register-service-commands.ts index 13081d38aff..335865cad6f 100644 --- a/src/cli/daemon-cli/register-service-commands.ts +++ b/src/cli/daemon-cli/register-service-commands.ts @@ -49,6 +49,7 @@ function 
resolveRestartOptions(cmdOpts: DaemonLifecycleOptions, command?: Comman return { ...cmdOpts, force: Boolean(cmdOpts.force || parentForce), + safe: Boolean(cmdOpts.safe), }; } @@ -122,6 +123,7 @@ export function addGatewayServiceCommands(parent: Command, opts?: { statusDescri .command("restart") .description("Restart the Gateway service (launchd/systemd/schtasks)") .option("--force", "Restart immediately without waiting for active gateway work", false) + .option("--safe", "Request an OpenClaw-aware restart after active work drains", false) .option( "--wait ", "Wait duration before forcing restart (ms, 10s, 5m; 0 waits indefinitely)", diff --git a/src/cli/daemon-cli/types.ts b/src/cli/daemon-cli/types.ts index 90df43a6612..5d50d24fa34 100644 --- a/src/cli/daemon-cli/types.ts +++ b/src/cli/daemon-cli/types.ts @@ -27,5 +27,6 @@ export type DaemonInstallOptions = { export type DaemonLifecycleOptions = { json?: boolean; force?: boolean; + safe?: boolean; wait?: string; }; diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 39c3effecd2..a73f9cf566f 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -390,7 +390,9 @@ describe("runGatewayLoop", () => { expect(waitForActiveEmbeddedRuns).not.toHaveBeenCalled(); expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" }); expect(gatewayLog.warn).toHaveBeenCalledWith( - expect.stringContaining("restart blocked by active task run(s): taskId=task-force"), + expect.stringContaining( + "restart blocked by active background task run(s): taskId=task-force", + ), ); expect(gatewayLog.warn).toHaveBeenCalledWith( "forced restart requested; skipping active work drain", diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 25e028219c7..d671b30521e 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -392,7 +392,7 @@ export async function runGatewayLoop(params: { `draining 
${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart ${formatRestartDrainBudget()}`, ); if (taskBlockers) { - gatewayLog.warn(`restart blocked by active task run(s): ${taskBlockers}`); + gatewayLog.warn(`restart blocked by active background task run(s): ${taskBlockers}`); } if (restartIntent?.force) { gatewayLog.warn("forced restart requested; skipping active work drain"); diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index af96bc0f09b..6cf956d3138 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -112,6 +112,7 @@ const METHOD_SCOPE_GROUPS: Record = { "cron.status", "cron.runs", "gateway.identity.get", + "gateway.restart.preflight", "system-presence", "last-heartbeat", "node.list", @@ -199,6 +200,7 @@ const METHOD_SCOPE_GROUPS: Record = { "system-event", "agents.files.set", "update.status", + "gateway.restart.request", ], [TALK_SECRETS_SCOPE]: [], }; diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts index c89d34e2b6a..48ade335a2c 100644 --- a/src/gateway/server-methods-list.ts +++ b/src/gateway/server-methods-list.ts @@ -148,6 +148,8 @@ const BASE_METHODS = [ "cron.run", "cron.runs", "gateway.identity.get", + "gateway.restart.preflight", + "gateway.restart.request", "system-presence", "system-event", "message.action", diff --git a/src/gateway/server-methods.ts b/src/gateway/server-methods.ts index 78a613a9188..dcb42620a5d 100644 --- a/src/gateway/server-methods.ts +++ b/src/gateway/server-methods.ts @@ -30,6 +30,7 @@ import { nodePendingHandlers } from "./server-methods/nodes-pending.js"; import { nodeHandlers } from "./server-methods/nodes.js"; import { pluginHostHookHandlers } from "./server-methods/plugin-host-hooks.js"; import { pushHandlers } from "./server-methods/push.js"; +import { restartHandlers } from "./server-methods/restart.js"; import { sendHandlers } from "./server-methods/send.js"; import { sessionsHandlers } from 
"./server-methods/sessions.js"; import { skillsHandlers } from "./server-methods/skills.js"; @@ -47,7 +48,12 @@ import { voicewakeHandlers } from "./server-methods/voicewake.js"; import { webHandlers } from "./server-methods/web.js"; import { wizardHandlers } from "./server-methods/wizard.js"; -const CONTROL_PLANE_WRITE_METHODS = new Set(["config.apply", "config.patch", "update.run"]); +const CONTROL_PLANE_WRITE_METHODS = new Set([ + "config.apply", + "config.patch", + "gateway.restart.request", + "update.run", +]); function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["client"]) { if (!client?.connect) { return null; @@ -110,6 +116,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = { ...nodeHandlers, ...nodePendingHandlers, ...pushHandlers, + ...restartHandlers, ...sendHandlers, ...usageHandlers, ...agentHandlers, diff --git a/src/gateway/server-methods/restart.ts b/src/gateway/server-methods/restart.ts new file mode 100644 index 00000000000..7fdcabf15bf --- /dev/null +++ b/src/gateway/server-methods/restart.ts @@ -0,0 +1,22 @@ +import { + createSafeGatewayRestartPreflight, + requestSafeGatewayRestart, +} from "../../infra/restart-coordinator.js"; +import type { GatewayRequestHandlers } from "./types.js"; + +function normalizeReason(value: unknown): string | undefined { + return typeof value === "string" && value.trim() ? 
value.trim().slice(0, 200) : undefined; +} + +export const restartHandlers: GatewayRequestHandlers = { + "gateway.restart.request": async ({ respond, params }) => { + const result = requestSafeGatewayRestart({ + reason: normalizeReason(params.reason), + delayMs: 0, + }); + respond(true, result); + }, + "gateway.restart.preflight": async ({ respond }) => { + respond(true, createSafeGatewayRestartPreflight()); + }, +}; diff --git a/src/gateway/server-reload-handlers.test.ts b/src/gateway/server-reload-handlers.test.ts index 9e038f29996..f5ea25d6ce3 100644 --- a/src/gateway/server-reload-handlers.test.ts +++ b/src/gateway/server-reload-handlers.test.ts @@ -126,7 +126,9 @@ describe("gateway restart deferral preflight", () => { ); expect(logReload.warn).toHaveBeenCalledWith( - expect.stringContaining("restart blocked by active task run(s): taskId=task-nightly"), + expect.stringContaining( + "restart blocked by active background task run(s): taskId=task-nightly", + ), ); expect(logReload.warn).toHaveBeenCalledWith(expect.stringContaining("runId=run-nightly")); diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index d04f9caa139..92754844cc8 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -22,7 +22,6 @@ import { } from "../secrets/runtime.js"; import { getInspectableActiveTaskRestartBlockers, - getInspectableTaskRegistrySummary, type ActiveTaskRestartBlocker, } from "../tasks/task-registry.maintenance.js"; import type { ChannelHealthMonitor } from "./channel-health-monitor.js"; @@ -143,7 +142,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) const queueSize = getTotalQueueSize(); const pendingReplies = getTotalPendingReplies(); const embeddedRuns = getActiveEmbeddedRunCount(); - const activeTasks = getInspectableTaskRegistrySummary().active; + const activeTasks = getInspectableActiveTaskRestartBlockers().length; return { queueSize, pendingReplies, 
@@ -164,7 +163,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) details.push(`${counts.embeddedRuns} embedded run(s)`); } if (counts.activeTasks > 0) { - details.push(`${counts.activeTasks} task run(s)`); + details.push(`${counts.activeTasks} background task run(s)`); } return details; }; @@ -420,7 +419,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) ); const taskBlockers = formatTaskBlockers(); if (taskBlockers) { - params.logReload.warn(`restart blocked by active task run(s): ${taskBlockers}`); + params.logReload.warn(`restart blocked by active background task run(s): ${taskBlockers}`); } deferGatewayRestartUntilIdle({ diff --git a/src/gateway/server-startup-early.ts b/src/gateway/server-startup-early.ts index 7153839b5cb..6d96316bcce 100644 --- a/src/gateway/server-startup-early.ts +++ b/src/gateway/server-startup-early.ts @@ -119,7 +119,8 @@ export async function startGatewayEarlyRuntime(params: { cronRuntimeAuthoritative: true, }); taskRegistryMaintenance.startTaskRegistryMaintenance(); - getActiveTaskCount = () => taskRegistryMaintenance.getInspectableTaskRegistrySummary().active; + getActiveTaskCount = () => + taskRegistryMaintenance.getInspectableActiveTaskRestartBlockers().length; } const skillsChangeUnsub = params.minimalTestGateway diff --git a/src/gateway/server.reload.test.ts b/src/gateway/server.reload.test.ts index f5f87acd370..e2296c94df4 100644 --- a/src/gateway/server.reload.test.ts +++ b/src/gateway/server.reload.test.ts @@ -763,7 +763,11 @@ describe("gateway hot reload", () => { const restartTesting = (await import("../infra/restart.js")).__testing; restartTesting.resetSigusr1State(); - hoisted.activeTaskCount.value = 1; + hoisted.activeTaskBlockers.push({ + taskId: "task-running-1", + status: "running", + runtime: "subagent", + }); const signalSpy = vi.fn(); process.once("SIGUSR1", signalSpy); vi.useFakeTimers(); @@ -792,7 +796,7 @@ describe("gateway hot reload", () => { 
await Promise.resolve(); expect(signalSpy).toHaveBeenCalledTimes(1); } finally { - hoisted.activeTaskCount.value = 0; + hoisted.activeTaskBlockers.length = 0; vi.useRealTimers(); process.removeListener("SIGUSR1", signalSpy); restartTesting.resetSigusr1State(); diff --git a/src/infra/restart-coordinator.test.ts b/src/infra/restart-coordinator.test.ts new file mode 100644 index 00000000000..c96567965fc --- /dev/null +++ b/src/infra/restart-coordinator.test.ts @@ -0,0 +1,119 @@ +import { describe, expect, it, vi } from "vitest"; +import { + createSafeGatewayRestartPreflight, + requestSafeGatewayRestart, +} from "./restart-coordinator.js"; + +const scheduleGatewaySigusr1Restart = vi.hoisted(() => vi.fn()); + +vi.mock("./restart.js", () => ({ + scheduleGatewaySigusr1Restart: (opts: unknown) => scheduleGatewaySigusr1Restart(opts), +})); + +describe("safe gateway restart coordinator", () => { + it("reports safe when no restart blockers are active", () => { + const preflight = createSafeGatewayRestartPreflight({ + getQueueSize: () => 0, + getPendingReplies: () => 0, + getEmbeddedRuns: () => 0, + getActiveTasks: () => 0, + getTaskBlockers: () => [], + }); + + expect(preflight).toEqual({ + safe: true, + counts: { + queueSize: 0, + pendingReplies: 0, + embeddedRuns: 0, + activeTasks: 0, + totalActive: 0, + }, + blockers: [], + summary: "safe to restart now", + }); + }); + + it("returns structured blockers for active work", () => { + const preflight = createSafeGatewayRestartPreflight({ + getQueueSize: () => 2, + getPendingReplies: () => 1, + getEmbeddedRuns: () => 1, + getActiveTasks: () => 1, + getTaskBlockers: () => [ + { + taskId: "task-1", + runId: "run-1", + status: "running", + runtime: "acp", + label: "build", + title: "Build branch", + }, + ], + }); + + expect(preflight.safe).toBe(false); + expect(preflight.counts.totalActive).toBe(5); + expect(preflight.blockers.map((blocker) => blocker.kind)).toEqual([ + "queue", + "reply", + "embedded-run", + "task", + ]); + 
expect(preflight.summary).toContain("restart deferred"); + expect(preflight.summary).toContain("taskId=task-1"); + }); + + it("schedules one restart request and marks active work as deferred", () => { + scheduleGatewaySigusr1Restart.mockReturnValueOnce({ + ok: true, + pid: 123, + signal: "SIGUSR1", + delayMs: 0, + mode: "emit", + coalesced: false, + cooldownMsApplied: 0, + }); + + const result = requestSafeGatewayRestart({ + reason: "test.safe", + inspect: { + getQueueSize: () => 1, + getPendingReplies: () => 0, + getEmbeddedRuns: () => 0, + getActiveTasks: () => 0, + getTaskBlockers: () => [], + }, + }); + + expect(result.status).toBe("deferred"); + expect(scheduleGatewaySigusr1Restart).toHaveBeenCalledWith({ + delayMs: 0, + reason: "test.safe", + }); + }); + + it("surfaces coalesced restart requests", () => { + scheduleGatewaySigusr1Restart.mockReturnValueOnce({ + ok: true, + pid: 123, + signal: "SIGUSR1", + delayMs: 500, + mode: "emit", + coalesced: true, + cooldownMsApplied: 0, + }); + + const result = requestSafeGatewayRestart({ + inspect: { + getQueueSize: () => 0, + getPendingReplies: () => 0, + getEmbeddedRuns: () => 0, + getActiveTasks: () => 0, + getTaskBlockers: () => [], + }, + }); + + expect(result.status).toBe("coalesced"); + }); +}); diff --git a/src/infra/restart-coordinator.ts b/src/infra/restart-coordinator.ts new file mode 100644 index 00000000000..aa2310c7ed3 --- /dev/null +++ b/src/infra/restart-coordinator.ts @@ -0,0 +1,166 @@ +import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-state.js"; +import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js"; +import { getTotalQueueSize } from "../process/command-queue.js"; +import { + getInspectableActiveTaskRestartBlockers, + type ActiveTaskRestartBlocker, +} from "../tasks/task-registry.maintenance.js"; +import { scheduleGatewaySigusr1Restart, type ScheduledRestart } from "./restart.js"; + +export type SafeGatewayRestartCounts = { + queueSize: number; 
+ pendingReplies: number; + embeddedRuns: number; + activeTasks: number; + totalActive: number; +}; + +export type SafeGatewayRestartBlocker = { + kind: "queue" | "reply" | "embedded-run" | "task"; + count: number; + message: string; + task?: ActiveTaskRestartBlocker; +}; + +export type SafeGatewayRestartPreflight = { + safe: boolean; + counts: SafeGatewayRestartCounts; + blockers: SafeGatewayRestartBlocker[]; + summary: string; +}; + +export type SafeGatewayRestartRequestResult = { + ok: true; + status: "scheduled" | "deferred" | "coalesced"; + preflight: SafeGatewayRestartPreflight; + restart: ScheduledRestart; +}; + +type SafeRestartInspectors = { + getQueueSize: () => number; + getPendingReplies: () => number; + getEmbeddedRuns: () => number; + getActiveTasks: () => number; + getTaskBlockers: () => ActiveTaskRestartBlocker[]; +}; + +const defaultInspectors: SafeRestartInspectors = { + getQueueSize: getTotalQueueSize, + getPendingReplies: getTotalPendingReplies, + getEmbeddedRuns: getActiveEmbeddedRunCount, + getActiveTasks: () => getInspectableActiveTaskRestartBlockers().length, + getTaskBlockers: getInspectableActiveTaskRestartBlockers, +}; + +function normalizeCount(value: number): number { + return Number.isFinite(value) && value > 0 ? Math.floor(value) : 0; +} + +function formatTaskBlocker(task: ActiveTaskRestartBlocker): string { + return [ + `taskId=${task.taskId}`, + task.runId ? `runId=${task.runId}` : null, + `status=${task.status}`, + `runtime=${task.runtime}`, + task.label ? `label=${task.label}` : null, + task.title ? 
`title=${task.title.slice(0, 80)}` : null, + ] + .filter((value): value is string => Boolean(value)) + .join(" "); +} + +function createFallbackTaskBlocker(count: number): SafeGatewayRestartBlocker { + return { + kind: "task", + count, + message: `${count} active background task run(s)`, + }; +} + +export function createSafeGatewayRestartPreflight( + inspectors: Partial = {}, +): SafeGatewayRestartPreflight { + const resolved = { ...defaultInspectors, ...inspectors }; + const counts: SafeGatewayRestartCounts = { + queueSize: normalizeCount(resolved.getQueueSize()), + pendingReplies: normalizeCount(resolved.getPendingReplies()), + embeddedRuns: normalizeCount(resolved.getEmbeddedRuns()), + activeTasks: normalizeCount(resolved.getActiveTasks()), + totalActive: 0, + }; + counts.totalActive = + counts.queueSize + counts.pendingReplies + counts.embeddedRuns + counts.activeTasks; + + const blockers: SafeGatewayRestartBlocker[] = []; + if (counts.queueSize > 0) { + blockers.push({ + kind: "queue", + count: counts.queueSize, + message: `${counts.queueSize} queued or active operation(s)`, + }); + } + if (counts.pendingReplies > 0) { + blockers.push({ + kind: "reply", + count: counts.pendingReplies, + message: `${counts.pendingReplies} pending reply delivery operation(s)`, + }); + } + if (counts.embeddedRuns > 0) { + blockers.push({ + kind: "embedded-run", + count: counts.embeddedRuns, + message: `${counts.embeddedRuns} active embedded run(s)`, + }); + } + if (counts.activeTasks > 0) { + const taskBlockers = resolved.getTaskBlockers(); + if (taskBlockers.length === 0) { + blockers.push(createFallbackTaskBlocker(counts.activeTasks)); + } else { + for (const task of taskBlockers.slice(0, 8)) { + blockers.push({ + kind: "task", + count: 1, + message: formatTaskBlocker(task), + task, + }); + } + const omitted = counts.activeTasks - taskBlockers.length; + if (omitted > 0) { + blockers.push(createFallbackTaskBlocker(omitted)); + } + } + } + + const summary = + blockers.length === 
0 + ? "safe to restart now" + : `restart deferred: ${blockers.map((blocker) => blocker.message).join("; ")}`; + return { + safe: counts.totalActive === 0, + counts, + blockers, + summary, + }; +} + +export function requestSafeGatewayRestart( + opts: { + reason?: string; + delayMs?: number; + inspect?: Partial; + } = {}, +): SafeGatewayRestartRequestResult { + const preflight = createSafeGatewayRestartPreflight(opts.inspect); + const restart = scheduleGatewaySigusr1Restart({ + delayMs: opts.delayMs ?? 0, + reason: opts.reason ?? "gateway.restart.safe", + }); + return { + ok: true, + status: restart.coalesced ? "coalesced" : preflight.safe ? "scheduled" : "deferred", + preflight, + restart, + }; +} diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts index 9996e776898..fd010a6c4c8 100644 --- a/src/tasks/task-registry.maintenance.issue-60299.test.ts +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -10,6 +10,7 @@ import { getDetachedTaskLifecycleRuntime, } from "./detached-task-runtime.js"; import { + getInspectableActiveTaskRestartBlockers, previewTaskRegistryMaintenance, reconcileInspectableTasks, resetTaskRegistryMaintenanceRuntimeForTests, @@ -250,6 +251,44 @@ describe("task-registry maintenance issue #60299", () => { expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); }); + it("only treats started non-ended running tasks as restart blockers", () => { + const now = Date.now(); + const activeRunning = makeStaleTask({ + taskId: "task-running-live", + runtime: "cli", + status: "running", + createdAt: now, + startedAt: now, + lastEventAt: now, + runId: "run-running-live", + }); + const queued = makeStaleTask({ + taskId: "task-queued-durable", + runtime: "acp", + status: "queued", + createdAt: now, + startedAt: undefined, + lastEventAt: now, + }); + const staleInconsistent = makeStaleTask({ + taskId: "task-running-ended", + runtime: "subagent", + status: 
"running", + endedAt: now - 1_000, + }); + + createTaskRegistryMaintenanceHarness({ tasks: [activeRunning, queued, staleInconsistent] }); + + expect(getInspectableActiveTaskRestartBlockers()).toEqual([ + expect.objectContaining({ + taskId: "task-running-live", + status: "running", + runtime: "cli", + runId: "run-running-live", + }), + ]); + }); + it("marks subagent tasks lost when their child session recovery is tombstoned", async () => { const childSessionKey = "agent:main:subagent:wedged-child"; const task = makeStaleTask({ diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index 7b437baf3b6..11f66456584 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -839,7 +839,7 @@ configureTaskAuditTaskProvider(reconcileInspectableTasks); export type ActiveTaskRestartBlocker = { taskId: string; - status: Extract; + status: Extract; runtime: TaskRecord["runtime"]; runId?: string; label?: string; @@ -849,13 +849,23 @@ export type ActiveTaskRestartBlocker = { function isActiveTaskRestartBlockerStatus( status: TaskStatus, ): status is ActiveTaskRestartBlocker["status"] { - return status === "queued" || status === "running"; + return status === "running"; +} + +function isTaskRestartBlocker(task: TaskRecord): task is TaskRecord & { + status: ActiveTaskRestartBlocker["status"]; +} { + // A task that is merely queued has not started user work yet; durable queued + // work can survive a gateway restart and should not indefinitely block one. + // Likewise, stale records that still say "running" but already have endedAt + // are registry inconsistencies, not live restart blockers. 
+ return isActiveTaskRestartBlockerStatus(task.status) && !task.endedAt; } export function getInspectableActiveTaskRestartBlockers(): ActiveTaskRestartBlocker[] { const blockers: ActiveTaskRestartBlocker[] = []; for (const task of reconcileInspectableTasks()) { - if (!isActiveTaskRestartBlockerStatus(task.status)) { + if (!isTaskRestartBlocker(task)) { continue; } const blocker: ActiveTaskRestartBlocker = {