mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:30:42 +00:00
fix(gateway): add safe restart coordinator (#76923)
Add a safe restart coordinator that preflights active Gateway work before restart. - expose gateway.restart.preflight and gateway.restart.request RPC methods - add explicit openclaw gateway restart --safe / openclaw daemon restart --safe path - narrow restart blockers to running non-ended tasks so queued records no longer block indefinitely - keep existing restart behavior unchanged; --force remains the immediate override Co-authored-by: NikolaFC <54186359+NikolaFC@users.noreply.github.com> Co-authored-by: galiniliev <5711535+galiniliev@users.noreply.github.com>
This commit is contained in:
@@ -457,6 +457,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Status/update: resolve beta update-channel checks from the installed version when config still says `stable`, and let `status --deep` reuse live gateway channel credential state instead of warning on command-path-only token misses.
|
||||
- Doctor/plugins: preserve unmanaged third-party plugin `node_modules` during `doctor --fix`, while still pruning OpenClaw-managed runtime dependency caches.
|
||||
- Gateway/restart: add `openclaw gateway restart --force` and `--wait <duration>`, log active task run IDs before restart deferral timers, and report timeout restarts as explicit forced restarts.
|
||||
- Gateway/restart: align `gateway.restart.safe` preflight with scheduled restart deferral by counting only active restart blockers (running non-ended tasks), so queued task records no longer keep "safe" restarts deferred indefinitely.
|
||||
- Discord: persist slash-command deploy hashes across process restarts so unchanged command sets skip redeploy and avoid restart-loop 429s.
|
||||
- Providers/LM Studio: normalize binary `off`/`on` reasoning metadata from Gemma 4 and other local models to LM Studio's accepted OpenAI-compatible `reasoning_effort` values.
|
||||
- Plugins/externalization: keep official external install docs, update examples, and live Codex npm checks on default npm tags instead of `@beta`. Thanks @vincentkoc.
|
||||
|
||||
@@ -36,7 +36,7 @@ openclaw daemon uninstall
|
||||
|
||||
- `status`: `--url`, `--token`, `--password`, `--timeout`, `--no-probe`, `--require-rpc`, `--deep`, `--json`
|
||||
- `install`: `--port`, `--runtime <node|bun>`, `--token`, `--force`, `--json`
|
||||
- `restart`: `--force`, `--wait <duration>`, `--json`
|
||||
- `restart`: `--safe`, `--force`, `--wait <duration>`, `--json`
|
||||
- lifecycle (`uninstall|start|stop`): `--json`
|
||||
|
||||
Notes:
|
||||
@@ -53,6 +53,7 @@ Notes:
|
||||
- If both `gateway.auth.token` and `gateway.auth.password` are configured and `gateway.auth.mode` is unset, install is blocked until mode is set explicitly.
|
||||
- On macOS, `install` keeps LaunchAgent plists owner-only and loads managed service environment values through an owner-only file and wrapper instead of serializing API keys or auth-profile env refs into `EnvironmentVariables`.
|
||||
- If you intentionally run multiple gateways on one host, isolate ports, config/state, and workspaces; see [/gateway#multiple-gateways-same-host](/gateway#multiple-gateways-same-host).
|
||||
- `restart --safe` asks the running Gateway to preflight active work and schedule one coalesced restart after active work drains. Plain `restart` keeps the existing service-manager behavior; `--force` remains the immediate override path.
|
||||
|
||||
## Prefer
|
||||
|
||||
|
||||
@@ -105,6 +105,16 @@ openclaw gateway run
|
||||
Raw stream jsonl path.
|
||||
</ParamField>
|
||||
|
||||
## Restart the Gateway
|
||||
|
||||
```bash
|
||||
openclaw gateway restart
|
||||
openclaw gateway restart --safe
|
||||
openclaw gateway restart --force
|
||||
```
|
||||
|
||||
`openclaw gateway restart --safe` asks the running Gateway to preflight active OpenClaw work before restarting. If queued operations, reply delivery, embedded runs, or task runs are active, the Gateway reports the blockers, coalesces duplicate safe restart requests, and restarts once the active work drains. Plain `restart` keeps the existing service-manager behavior for compatibility. Use `--force` only when you explicitly want the immediate override path.
|
||||
|
||||
<Warning>
|
||||
Inline `--password` can be exposed in local process listings. Prefer `--password-file`, env, or a SecretRef-backed `gateway.auth.password`.
|
||||
</Warning>
|
||||
|
||||
@@ -49,6 +49,7 @@ const probeGateway = vi.fn<
|
||||
configSnapshot: unknown;
|
||||
}>
|
||||
>();
|
||||
const callGatewayCli = vi.fn();
|
||||
const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true);
|
||||
const loadConfig = vi.hoisted(() => vi.fn(() => ({})));
|
||||
const recoverInstalledLaunchAgent = vi.hoisted(() => vi.fn());
|
||||
@@ -77,6 +78,10 @@ vi.mock("../../gateway/probe.js", () => ({
|
||||
}) => probeGateway(opts),
|
||||
}));
|
||||
|
||||
vi.mock("../../gateway/call.js", () => ({
|
||||
callGatewayCli: (opts: unknown) => callGatewayCli(opts),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/commands.js", () => ({
|
||||
isRestartEnabled: (config?: { commands?: unknown }) => isRestartEnabled(config),
|
||||
}));
|
||||
@@ -113,7 +118,11 @@ vi.mock("./lifecycle-core.js", () => ({
|
||||
|
||||
describe("runDaemonRestart health checks", () => {
|
||||
let runDaemonStart: (opts?: { json?: boolean }) => Promise<void>;
|
||||
let runDaemonRestart: (opts?: { json?: boolean }) => Promise<boolean>;
|
||||
let runDaemonRestart: (opts?: {
|
||||
json?: boolean;
|
||||
safe?: boolean;
|
||||
force?: boolean;
|
||||
}) => Promise<boolean>;
|
||||
let runDaemonStop: (opts?: { json?: boolean }) => Promise<void>;
|
||||
let envSnapshot: ReturnType<typeof captureEnv>;
|
||||
|
||||
@@ -162,6 +171,7 @@ describe("runDaemonRestart health checks", () => {
|
||||
signalVerifiedGatewayPidSync.mockReset();
|
||||
formatGatewayPidList.mockReset();
|
||||
probeGateway.mockReset();
|
||||
callGatewayCli.mockReset();
|
||||
isRestartEnabled.mockReset();
|
||||
loadConfig.mockReset();
|
||||
recoverInstalledLaunchAgent.mockReset();
|
||||
@@ -204,6 +214,31 @@ describe("runDaemonRestart health checks", () => {
|
||||
ok: true,
|
||||
configSnapshot: { commands: { restart: true } },
|
||||
});
|
||||
callGatewayCli.mockResolvedValue({
|
||||
ok: true,
|
||||
status: "deferred",
|
||||
preflight: {
|
||||
safe: false,
|
||||
counts: {
|
||||
queueSize: 1,
|
||||
pendingReplies: 0,
|
||||
embeddedRuns: 0,
|
||||
activeTasks: 0,
|
||||
totalActive: 1,
|
||||
},
|
||||
blockers: [{ kind: "queue", count: 1, message: "1 queued or active operation(s)" }],
|
||||
summary: "restart deferred: 1 queued or active operation(s)",
|
||||
},
|
||||
restart: {
|
||||
ok: true,
|
||||
pid: 123,
|
||||
signal: "SIGUSR1",
|
||||
delayMs: 0,
|
||||
mode: "emit",
|
||||
coalesced: false,
|
||||
cooldownMsApplied: 0,
|
||||
},
|
||||
});
|
||||
isRestartEnabled.mockReturnValue(true);
|
||||
signalVerifiedGatewayPidSync.mockImplementation(() => {});
|
||||
formatGatewayPidList.mockImplementation((pids) => pids.join(", "));
|
||||
@@ -230,6 +265,24 @@ describe("runDaemonRestart health checks", () => {
|
||||
expect(recoverInstalledLaunchAgent).toHaveBeenCalledWith({ result: "started" });
|
||||
});
|
||||
|
||||
it("requests a safe gateway restart over RPC without touching the service manager", async () => {
|
||||
await runDaemonRestart({ json: true, safe: true });
|
||||
|
||||
expect(callGatewayCli).toHaveBeenCalledWith({
|
||||
method: "gateway.restart.request",
|
||||
params: { reason: "gateway.restart.safe" },
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
expect(runServiceRestart).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps force restart on the existing non-safe path", async () => {
|
||||
await runDaemonRestart({ json: true, force: true });
|
||||
|
||||
expect(callGatewayCli).not.toHaveBeenCalled();
|
||||
expect(runServiceRestart).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("repairs stale loaded service definitions from gateway start", async () => {
|
||||
repairLoadedGatewayServiceForStart.mockResolvedValue({
|
||||
result: "started",
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
import { isRestartEnabled } from "../../config/commands.flags.js";
|
||||
import { readBestEffortConfig, resolveGatewayPort } from "../../config/config.js";
|
||||
import { resolveGatewayService } from "../../daemon/service.js";
|
||||
import { callGatewayCli } from "../../gateway/call.js";
|
||||
import { probeGateway } from "../../gateway/probe.js";
|
||||
import {
|
||||
findVerifiedGatewayListenerPidsOnPortSync,
|
||||
formatGatewayPidList,
|
||||
signalVerifiedGatewayPidSync,
|
||||
} from "../../infra/gateway-processes.js";
|
||||
import type { SafeGatewayRestartRequestResult } from "../../infra/restart-coordinator.js";
|
||||
import { type GatewayRestartIntent, writeGatewayRestartIntentSync } from "../../infra/restart.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
@@ -139,6 +141,50 @@ function resolveGatewayRestartIntentOptions(
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function formatSafeRestartWarnings(result: SafeGatewayRestartRequestResult): string[] | undefined {
|
||||
if (result.preflight.blockers.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
return [result.preflight.summary];
|
||||
}
|
||||
|
||||
async function requestSafeGatewayRestart(opts: DaemonLifecycleOptions): Promise<boolean> {
|
||||
if (opts.force) {
|
||||
throw new Error("--safe cannot be combined with --force; omit --safe to force restart now");
|
||||
}
|
||||
if (opts.wait !== undefined) {
|
||||
throw new Error("--safe cannot be combined with --wait; safe restart uses gateway deferral");
|
||||
}
|
||||
const result = await callGatewayCli<SafeGatewayRestartRequestResult>({
|
||||
method: "gateway.restart.request",
|
||||
params: { reason: "gateway.restart.safe" },
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
const message =
|
||||
result.status === "coalesced"
|
||||
? "safe restart request joined an existing pending gateway restart"
|
||||
: result.status === "deferred"
|
||||
? "safe restart requested; gateway will restart after active work drains"
|
||||
: "safe restart requested; gateway will restart momentarily";
|
||||
const payload = {
|
||||
ok: true,
|
||||
result: result.status,
|
||||
message,
|
||||
preflight: result.preflight,
|
||||
restart: result.restart,
|
||||
warnings: formatSafeRestartWarnings(result),
|
||||
};
|
||||
if (opts.json) {
|
||||
defaultRuntime.log(JSON.stringify(payload, null, 2));
|
||||
} else {
|
||||
defaultRuntime.log(message);
|
||||
if (result.preflight.blockers.length > 0) {
|
||||
defaultRuntime.log(theme.warn(result.preflight.summary));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
async function restartGatewayWithoutServiceManager(
|
||||
port: number,
|
||||
restartIntent?: GatewayRestartIntent,
|
||||
@@ -218,6 +264,9 @@ export async function runDaemonStop(opts: DaemonLifecycleOptions = {}) {
|
||||
* Throws/exits on check or restart failures.
|
||||
*/
|
||||
export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promise<boolean> {
|
||||
if (opts.safe) {
|
||||
return await requestSafeGatewayRestart(opts);
|
||||
}
|
||||
const json = Boolean(opts.json);
|
||||
const service = resolveGatewayService();
|
||||
let restartedWithoutServiceManager = false;
|
||||
|
||||
@@ -70,6 +70,17 @@ describe("addGatewayServiceCommands", () => {
|
||||
);
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "forwards restart safe control",
|
||||
argv: ["restart", "--safe"],
|
||||
assert: () => {
|
||||
expect(runDaemonRestart).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
safe: true,
|
||||
}),
|
||||
);
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "forwards restart force control",
|
||||
argv: ["restart", "--force"],
|
||||
|
||||
@@ -49,6 +49,7 @@ function resolveRestartOptions(cmdOpts: DaemonLifecycleOptions, command?: Comman
|
||||
return {
|
||||
...cmdOpts,
|
||||
force: Boolean(cmdOpts.force || parentForce),
|
||||
safe: Boolean(cmdOpts.safe),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -122,6 +123,7 @@ export function addGatewayServiceCommands(parent: Command, opts?: { statusDescri
|
||||
.command("restart")
|
||||
.description("Restart the Gateway service (launchd/systemd/schtasks)")
|
||||
.option("--force", "Restart immediately without waiting for active gateway work", false)
|
||||
.option("--safe", "Request an OpenClaw-aware restart after active work drains", false)
|
||||
.option(
|
||||
"--wait <duration>",
|
||||
"Wait duration before forcing restart (ms, 10s, 5m; 0 waits indefinitely)",
|
||||
|
||||
@@ -27,5 +27,6 @@ export type DaemonInstallOptions = {
|
||||
export type DaemonLifecycleOptions = {
|
||||
json?: boolean;
|
||||
force?: boolean;
|
||||
safe?: boolean;
|
||||
wait?: string;
|
||||
};
|
||||
|
||||
@@ -390,7 +390,9 @@ describe("runGatewayLoop", () => {
|
||||
expect(waitForActiveEmbeddedRuns).not.toHaveBeenCalled();
|
||||
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" });
|
||||
expect(gatewayLog.warn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("restart blocked by active task run(s): taskId=task-force"),
|
||||
expect.stringContaining(
|
||||
"restart blocked by active background task run(s): taskId=task-force",
|
||||
),
|
||||
);
|
||||
expect(gatewayLog.warn).toHaveBeenCalledWith(
|
||||
"forced restart requested; skipping active work drain",
|
||||
|
||||
@@ -392,7 +392,7 @@ export async function runGatewayLoop(params: {
|
||||
`draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart ${formatRestartDrainBudget()}`,
|
||||
);
|
||||
if (taskBlockers) {
|
||||
gatewayLog.warn(`restart blocked by active task run(s): ${taskBlockers}`);
|
||||
gatewayLog.warn(`restart blocked by active background task run(s): ${taskBlockers}`);
|
||||
}
|
||||
if (restartIntent?.force) {
|
||||
gatewayLog.warn("forced restart requested; skipping active work drain");
|
||||
|
||||
@@ -112,6 +112,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"cron.status",
|
||||
"cron.runs",
|
||||
"gateway.identity.get",
|
||||
"gateway.restart.preflight",
|
||||
"system-presence",
|
||||
"last-heartbeat",
|
||||
"node.list",
|
||||
@@ -199,6 +200,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"system-event",
|
||||
"agents.files.set",
|
||||
"update.status",
|
||||
"gateway.restart.request",
|
||||
],
|
||||
[TALK_SECRETS_SCOPE]: [],
|
||||
};
|
||||
|
||||
@@ -148,6 +148,8 @@ const BASE_METHODS = [
|
||||
"cron.run",
|
||||
"cron.runs",
|
||||
"gateway.identity.get",
|
||||
"gateway.restart.preflight",
|
||||
"gateway.restart.request",
|
||||
"system-presence",
|
||||
"system-event",
|
||||
"message.action",
|
||||
|
||||
@@ -30,6 +30,7 @@ import { nodePendingHandlers } from "./server-methods/nodes-pending.js";
|
||||
import { nodeHandlers } from "./server-methods/nodes.js";
|
||||
import { pluginHostHookHandlers } from "./server-methods/plugin-host-hooks.js";
|
||||
import { pushHandlers } from "./server-methods/push.js";
|
||||
import { restartHandlers } from "./server-methods/restart.js";
|
||||
import { sendHandlers } from "./server-methods/send.js";
|
||||
import { sessionsHandlers } from "./server-methods/sessions.js";
|
||||
import { skillsHandlers } from "./server-methods/skills.js";
|
||||
@@ -47,7 +48,12 @@ import { voicewakeHandlers } from "./server-methods/voicewake.js";
|
||||
import { webHandlers } from "./server-methods/web.js";
|
||||
import { wizardHandlers } from "./server-methods/wizard.js";
|
||||
|
||||
const CONTROL_PLANE_WRITE_METHODS = new Set(["config.apply", "config.patch", "update.run"]);
|
||||
const CONTROL_PLANE_WRITE_METHODS = new Set([
|
||||
"config.apply",
|
||||
"config.patch",
|
||||
"gateway.restart.request",
|
||||
"update.run",
|
||||
]);
|
||||
function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["client"]) {
|
||||
if (!client?.connect) {
|
||||
return null;
|
||||
@@ -110,6 +116,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = {
|
||||
...nodeHandlers,
|
||||
...nodePendingHandlers,
|
||||
...pushHandlers,
|
||||
...restartHandlers,
|
||||
...sendHandlers,
|
||||
...usageHandlers,
|
||||
...agentHandlers,
|
||||
|
||||
22
src/gateway/server-methods/restart.ts
Normal file
22
src/gateway/server-methods/restart.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
import {
|
||||
createSafeGatewayRestartPreflight,
|
||||
requestSafeGatewayRestart,
|
||||
} from "../../infra/restart-coordinator.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
function normalizeReason(value: unknown): string | undefined {
|
||||
return typeof value === "string" && value.trim() ? value.trim().slice(0, 200) : undefined;
|
||||
}
|
||||
|
||||
export const restartHandlers: GatewayRequestHandlers = {
|
||||
"gateway.restart.request": async ({ respond, params }) => {
|
||||
const result = requestSafeGatewayRestart({
|
||||
reason: normalizeReason(params.reason),
|
||||
delayMs: 0,
|
||||
});
|
||||
respond(true, result);
|
||||
},
|
||||
"gateway.restart.preflight": async ({ respond }) => {
|
||||
respond(true, createSafeGatewayRestartPreflight());
|
||||
},
|
||||
};
|
||||
@@ -126,7 +126,9 @@ describe("gateway restart deferral preflight", () => {
|
||||
);
|
||||
|
||||
expect(logReload.warn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("restart blocked by active task run(s): taskId=task-nightly"),
|
||||
expect.stringContaining(
|
||||
"restart blocked by active background task run(s): taskId=task-nightly",
|
||||
),
|
||||
);
|
||||
expect(logReload.warn).toHaveBeenCalledWith(expect.stringContaining("runId=run-nightly"));
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ import {
|
||||
} from "../secrets/runtime.js";
|
||||
import {
|
||||
getInspectableActiveTaskRestartBlockers,
|
||||
getInspectableTaskRegistrySummary,
|
||||
type ActiveTaskRestartBlocker,
|
||||
} from "../tasks/task-registry.maintenance.js";
|
||||
import type { ChannelHealthMonitor } from "./channel-health-monitor.js";
|
||||
@@ -143,7 +142,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
|
||||
const queueSize = getTotalQueueSize();
|
||||
const pendingReplies = getTotalPendingReplies();
|
||||
const embeddedRuns = getActiveEmbeddedRunCount();
|
||||
const activeTasks = getInspectableTaskRegistrySummary().active;
|
||||
const activeTasks = getInspectableActiveTaskRestartBlockers().length;
|
||||
return {
|
||||
queueSize,
|
||||
pendingReplies,
|
||||
@@ -164,7 +163,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
|
||||
details.push(`${counts.embeddedRuns} embedded run(s)`);
|
||||
}
|
||||
if (counts.activeTasks > 0) {
|
||||
details.push(`${counts.activeTasks} task run(s)`);
|
||||
details.push(`${counts.activeTasks} background task run(s)`);
|
||||
}
|
||||
return details;
|
||||
};
|
||||
@@ -420,7 +419,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
|
||||
);
|
||||
const taskBlockers = formatTaskBlockers();
|
||||
if (taskBlockers) {
|
||||
params.logReload.warn(`restart blocked by active task run(s): ${taskBlockers}`);
|
||||
params.logReload.warn(`restart blocked by active background task run(s): ${taskBlockers}`);
|
||||
}
|
||||
|
||||
deferGatewayRestartUntilIdle({
|
||||
|
||||
@@ -119,7 +119,8 @@ export async function startGatewayEarlyRuntime(params: {
|
||||
cronRuntimeAuthoritative: true,
|
||||
});
|
||||
taskRegistryMaintenance.startTaskRegistryMaintenance();
|
||||
getActiveTaskCount = () => taskRegistryMaintenance.getInspectableTaskRegistrySummary().active;
|
||||
getActiveTaskCount = () =>
|
||||
taskRegistryMaintenance.getInspectableActiveTaskRestartBlockers().length;
|
||||
}
|
||||
|
||||
const skillsChangeUnsub = params.minimalTestGateway
|
||||
|
||||
@@ -763,7 +763,11 @@ describe("gateway hot reload", () => {
|
||||
|
||||
const restartTesting = (await import("../infra/restart.js")).__testing;
|
||||
restartTesting.resetSigusr1State();
|
||||
hoisted.activeTaskCount.value = 1;
|
||||
hoisted.activeTaskBlockers.push({
|
||||
taskId: "task-running-1",
|
||||
status: "running",
|
||||
runtime: "subagent",
|
||||
});
|
||||
const signalSpy = vi.fn();
|
||||
process.once("SIGUSR1", signalSpy);
|
||||
vi.useFakeTimers();
|
||||
@@ -792,7 +796,7 @@ describe("gateway hot reload", () => {
|
||||
await Promise.resolve();
|
||||
expect(signalSpy).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
hoisted.activeTaskCount.value = 0;
|
||||
hoisted.activeTaskBlockers.length = 0;
|
||||
vi.useRealTimers();
|
||||
process.removeListener("SIGUSR1", signalSpy);
|
||||
restartTesting.resetSigusr1State();
|
||||
|
||||
119
src/infra/restart-coordinator.test.ts
Normal file
119
src/infra/restart-coordinator.test.ts
Normal file
@@ -0,0 +1,119 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
createSafeGatewayRestartPreflight,
|
||||
requestSafeGatewayRestart,
|
||||
} from "./restart-coordinator.js";
|
||||
|
||||
const scheduleGatewaySigusr1Restart = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("./restart.js", () => ({
|
||||
scheduleGatewaySigusr1Restart: (opts: unknown) => scheduleGatewaySigusr1Restart(opts),
|
||||
}));
|
||||
|
||||
describe("safe gateway restart coordinator", () => {
|
||||
it("reports safe when no restart blockers are active", () => {
|
||||
const preflight = createSafeGatewayRestartPreflight({
|
||||
getQueueSize: () => 0,
|
||||
getPendingReplies: () => 0,
|
||||
getEmbeddedRuns: () => 0,
|
||||
getActiveTasks: () => 0,
|
||||
getTaskBlockers: () => [],
|
||||
});
|
||||
|
||||
expect(preflight).toEqual({
|
||||
safe: true,
|
||||
counts: {
|
||||
queueSize: 0,
|
||||
pendingReplies: 0,
|
||||
embeddedRuns: 0,
|
||||
activeTasks: 0,
|
||||
totalActive: 0,
|
||||
},
|
||||
blockers: [],
|
||||
summary: "safe to restart now",
|
||||
});
|
||||
});
|
||||
|
||||
it("returns structured blockers for active work", () => {
|
||||
const preflight = createSafeGatewayRestartPreflight({
|
||||
getQueueSize: () => 2,
|
||||
getPendingReplies: () => 1,
|
||||
getEmbeddedRuns: () => 1,
|
||||
getActiveTasks: () => 1,
|
||||
getTaskBlockers: () => [
|
||||
{
|
||||
taskId: "task-1",
|
||||
runId: "run-1",
|
||||
status: "running",
|
||||
runtime: "acp",
|
||||
label: "build",
|
||||
title: "Build branch",
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
expect(preflight.safe).toBe(false);
|
||||
expect(preflight.counts.totalActive).toBe(5);
|
||||
expect(preflight.blockers.map((blocker) => blocker.kind)).toEqual([
|
||||
"queue",
|
||||
"reply",
|
||||
"embedded-run",
|
||||
"task",
|
||||
]);
|
||||
expect(preflight.summary).toContain("restart deferred");
|
||||
expect(preflight.summary).toContain("taskId=task-1");
|
||||
});
|
||||
|
||||
it("schedules one restart request and marks active work as deferred", () => {
|
||||
scheduleGatewaySigusr1Restart.mockReturnValueOnce({
|
||||
ok: true,
|
||||
pid: 123,
|
||||
signal: "SIGUSR1",
|
||||
delayMs: 0,
|
||||
mode: "emit",
|
||||
coalesced: false,
|
||||
cooldownMsApplied: 0,
|
||||
});
|
||||
|
||||
const result = requestSafeGatewayRestart({
|
||||
reason: "test.safe",
|
||||
inspect: {
|
||||
getQueueSize: () => 1,
|
||||
getPendingReplies: () => 0,
|
||||
getEmbeddedRuns: () => 0,
|
||||
getActiveTasks: () => 0,
|
||||
getTaskBlockers: () => [],
|
||||
},
|
||||
});
|
||||
|
||||
expect(result.status).toBe("deferred");
|
||||
expect(scheduleGatewaySigusr1Restart).toHaveBeenCalledWith({
|
||||
delayMs: 0,
|
||||
reason: "test.safe",
|
||||
});
|
||||
});
|
||||
|
||||
it("surfaces coalesced restart requests", () => {
|
||||
scheduleGatewaySigusr1Restart.mockReturnValueOnce({
|
||||
ok: true,
|
||||
pid: 123,
|
||||
signal: "SIGUSR1",
|
||||
delayMs: 500,
|
||||
mode: "emit",
|
||||
coalesced: true,
|
||||
cooldownMsApplied: 0,
|
||||
});
|
||||
|
||||
const result = requestSafeGatewayRestart({
|
||||
inspect: {
|
||||
getQueueSize: () => 0,
|
||||
getPendingReplies: () => 0,
|
||||
getEmbeddedRuns: () => 0,
|
||||
getActiveTasks: () => 0,
|
||||
getTaskBlockers: () => [],
|
||||
},
|
||||
});
|
||||
|
||||
expect(result.status).toBe("coalesced");
|
||||
});
|
||||
});
|
||||
166
src/infra/restart-coordinator.ts
Normal file
166
src/infra/restart-coordinator.ts
Normal file
@@ -0,0 +1,166 @@
|
||||
import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-state.js";
|
||||
import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js";
|
||||
import { getTotalQueueSize } from "../process/command-queue.js";
|
||||
import {
|
||||
getInspectableActiveTaskRestartBlockers,
|
||||
type ActiveTaskRestartBlocker,
|
||||
} from "../tasks/task-registry.maintenance.js";
|
||||
import { scheduleGatewaySigusr1Restart, type ScheduledRestart } from "./restart.js";
|
||||
|
||||
export type SafeGatewayRestartCounts = {
|
||||
queueSize: number;
|
||||
pendingReplies: number;
|
||||
embeddedRuns: number;
|
||||
activeTasks: number;
|
||||
totalActive: number;
|
||||
};
|
||||
|
||||
export type SafeGatewayRestartBlocker = {
|
||||
kind: "queue" | "reply" | "embedded-run" | "task";
|
||||
count: number;
|
||||
message: string;
|
||||
task?: ActiveTaskRestartBlocker;
|
||||
};
|
||||
|
||||
export type SafeGatewayRestartPreflight = {
|
||||
safe: boolean;
|
||||
counts: SafeGatewayRestartCounts;
|
||||
blockers: SafeGatewayRestartBlocker[];
|
||||
summary: string;
|
||||
};
|
||||
|
||||
export type SafeGatewayRestartRequestResult = {
|
||||
ok: true;
|
||||
status: "scheduled" | "deferred" | "coalesced";
|
||||
preflight: SafeGatewayRestartPreflight;
|
||||
restart: ScheduledRestart;
|
||||
};
|
||||
|
||||
type SafeRestartInspectors = {
|
||||
getQueueSize: () => number;
|
||||
getPendingReplies: () => number;
|
||||
getEmbeddedRuns: () => number;
|
||||
getActiveTasks: () => number;
|
||||
getTaskBlockers: () => ActiveTaskRestartBlocker[];
|
||||
};
|
||||
|
||||
const defaultInspectors: SafeRestartInspectors = {
|
||||
getQueueSize: getTotalQueueSize,
|
||||
getPendingReplies: getTotalPendingReplies,
|
||||
getEmbeddedRuns: getActiveEmbeddedRunCount,
|
||||
getActiveTasks: () => getInspectableActiveTaskRestartBlockers().length,
|
||||
getTaskBlockers: getInspectableActiveTaskRestartBlockers,
|
||||
};
|
||||
|
||||
function normalizeCount(value: number): number {
|
||||
return Number.isFinite(value) && value > 0 ? Math.floor(value) : 0;
|
||||
}
|
||||
|
||||
function formatTaskBlocker(task: ActiveTaskRestartBlocker): string {
|
||||
return [
|
||||
`taskId=${task.taskId}`,
|
||||
task.runId ? `runId=${task.runId}` : null,
|
||||
`status=${task.status}`,
|
||||
`runtime=${task.runtime}`,
|
||||
task.label ? `label=${task.label}` : null,
|
||||
task.title ? `title=${task.title.slice(0, 80)}` : null,
|
||||
]
|
||||
.filter((value): value is string => Boolean(value))
|
||||
.join(" ");
|
||||
}
|
||||
|
||||
function createFallbackTaskBlocker(count: number): SafeGatewayRestartBlocker {
|
||||
return {
|
||||
kind: "task",
|
||||
count,
|
||||
message: `${count} active background task run(s)`,
|
||||
};
|
||||
}
|
||||
|
||||
export function createSafeGatewayRestartPreflight(
|
||||
inspectors: Partial<SafeRestartInspectors> = {},
|
||||
): SafeGatewayRestartPreflight {
|
||||
const resolved = { ...defaultInspectors, ...inspectors };
|
||||
const counts: SafeGatewayRestartCounts = {
|
||||
queueSize: normalizeCount(resolved.getQueueSize()),
|
||||
pendingReplies: normalizeCount(resolved.getPendingReplies()),
|
||||
embeddedRuns: normalizeCount(resolved.getEmbeddedRuns()),
|
||||
activeTasks: normalizeCount(resolved.getActiveTasks()),
|
||||
totalActive: 0,
|
||||
};
|
||||
counts.totalActive =
|
||||
counts.queueSize + counts.pendingReplies + counts.embeddedRuns + counts.activeTasks;
|
||||
|
||||
const blockers: SafeGatewayRestartBlocker[] = [];
|
||||
if (counts.queueSize > 0) {
|
||||
blockers.push({
|
||||
kind: "queue",
|
||||
count: counts.queueSize,
|
||||
message: `${counts.queueSize} queued or active operation(s)`,
|
||||
});
|
||||
}
|
||||
if (counts.pendingReplies > 0) {
|
||||
blockers.push({
|
||||
kind: "reply",
|
||||
count: counts.pendingReplies,
|
||||
message: `${counts.pendingReplies} pending reply delivery operation(s)`,
|
||||
});
|
||||
}
|
||||
if (counts.embeddedRuns > 0) {
|
||||
blockers.push({
|
||||
kind: "embedded-run",
|
||||
count: counts.embeddedRuns,
|
||||
message: `${counts.embeddedRuns} active embedded run(s)`,
|
||||
});
|
||||
}
|
||||
if (counts.activeTasks > 0) {
|
||||
const taskBlockers = resolved.getTaskBlockers();
|
||||
if (taskBlockers.length === 0) {
|
||||
blockers.push(createFallbackTaskBlocker(counts.activeTasks));
|
||||
} else {
|
||||
for (const task of taskBlockers.slice(0, 8)) {
|
||||
blockers.push({
|
||||
kind: "task",
|
||||
count: 1,
|
||||
message: formatTaskBlocker(task),
|
||||
task,
|
||||
});
|
||||
}
|
||||
const omitted = counts.activeTasks - taskBlockers.length;
|
||||
if (omitted > 0) {
|
||||
blockers.push(createFallbackTaskBlocker(omitted));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const summary =
|
||||
blockers.length === 0
|
||||
? "safe to restart now"
|
||||
: `restart deferred: ${blockers.map((blocker) => blocker.message).join("; ")}`;
|
||||
return {
|
||||
safe: counts.totalActive === 0,
|
||||
counts,
|
||||
blockers,
|
||||
summary,
|
||||
};
|
||||
}
|
||||
|
||||
export function requestSafeGatewayRestart(
|
||||
opts: {
|
||||
reason?: string;
|
||||
delayMs?: number;
|
||||
inspect?: Partial<SafeRestartInspectors>;
|
||||
} = {},
|
||||
): SafeGatewayRestartRequestResult {
|
||||
const preflight = createSafeGatewayRestartPreflight(opts.inspect);
|
||||
const restart = scheduleGatewaySigusr1Restart({
|
||||
delayMs: opts.delayMs ?? 0,
|
||||
reason: opts.reason ?? "gateway.restart.safe",
|
||||
});
|
||||
return {
|
||||
ok: true,
|
||||
status: restart.coalesced ? "coalesced" : preflight.safe ? "scheduled" : "deferred",
|
||||
preflight,
|
||||
restart,
|
||||
};
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
getDetachedTaskLifecycleRuntime,
|
||||
} from "./detached-task-runtime.js";
|
||||
import {
|
||||
getInspectableActiveTaskRestartBlockers,
|
||||
previewTaskRegistryMaintenance,
|
||||
reconcileInspectableTasks,
|
||||
resetTaskRegistryMaintenanceRuntimeForTests,
|
||||
@@ -250,6 +251,44 @@ describe("task-registry maintenance issue #60299", () => {
|
||||
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
|
||||
});
|
||||
|
||||
it("only treats started non-ended running tasks as restart blockers", () => {
|
||||
const now = Date.now();
|
||||
const activeRunning = makeStaleTask({
|
||||
taskId: "task-running-live",
|
||||
runtime: "cli",
|
||||
status: "running",
|
||||
createdAt: now,
|
||||
startedAt: now,
|
||||
lastEventAt: now,
|
||||
runId: "run-running-live",
|
||||
});
|
||||
const queued = makeStaleTask({
|
||||
taskId: "task-queued-durable",
|
||||
runtime: "acp",
|
||||
status: "queued",
|
||||
createdAt: now,
|
||||
startedAt: undefined,
|
||||
lastEventAt: now,
|
||||
});
|
||||
const staleInconsistent = makeStaleTask({
|
||||
taskId: "task-running-ended",
|
||||
runtime: "subagent",
|
||||
status: "running",
|
||||
endedAt: now - 1_000,
|
||||
});
|
||||
|
||||
createTaskRegistryMaintenanceHarness({ tasks: [activeRunning, queued, staleInconsistent] });
|
||||
|
||||
expect(getInspectableActiveTaskRestartBlockers()).toEqual([
|
||||
expect.objectContaining({
|
||||
taskId: "task-running-live",
|
||||
status: "running",
|
||||
runtime: "cli",
|
||||
runId: "run-running-live",
|
||||
}),
|
||||
]);
|
||||
});
|
||||
|
||||
it("marks subagent tasks lost when their child session recovery is tombstoned", async () => {
|
||||
const childSessionKey = "agent:main:subagent:wedged-child";
|
||||
const task = makeStaleTask({
|
||||
|
||||
@@ -839,7 +839,7 @@ configureTaskAuditTaskProvider(reconcileInspectableTasks);
|
||||
|
||||
export type ActiveTaskRestartBlocker = {
|
||||
taskId: string;
|
||||
status: Extract<TaskStatus, "queued" | "running">;
|
||||
status: Extract<TaskStatus, "running">;
|
||||
runtime: TaskRecord["runtime"];
|
||||
runId?: string;
|
||||
label?: string;
|
||||
@@ -849,13 +849,23 @@ export type ActiveTaskRestartBlocker = {
|
||||
function isActiveTaskRestartBlockerStatus(
|
||||
status: TaskStatus,
|
||||
): status is ActiveTaskRestartBlocker["status"] {
|
||||
return status === "queued" || status === "running";
|
||||
return status === "running";
|
||||
}
|
||||
|
||||
function isTaskRestartBlocker(task: TaskRecord): task is TaskRecord & {
|
||||
status: ActiveTaskRestartBlocker["status"];
|
||||
} {
|
||||
// A task that is merely queued has not started user work yet; durable queued
|
||||
// work can survive a gateway restart and should not indefinitely block one.
|
||||
// Likewise, stale records that still say "running" but already have endedAt
|
||||
// are registry inconsistencies, not live restart blockers.
|
||||
return isActiveTaskRestartBlockerStatus(task.status) && !task.endedAt;
|
||||
}
|
||||
|
||||
export function getInspectableActiveTaskRestartBlockers(): ActiveTaskRestartBlocker[] {
|
||||
const blockers: ActiveTaskRestartBlocker[] = [];
|
||||
for (const task of reconcileInspectableTasks()) {
|
||||
if (!isActiveTaskRestartBlockerStatus(task.status)) {
|
||||
if (!isTaskRestartBlocker(task)) {
|
||||
continue;
|
||||
}
|
||||
const blocker: ActiveTaskRestartBlocker = {
|
||||
|
||||
Reference in New Issue
Block a user