fix(gateway): expose restart drain controls

This commit is contained in:
Vincent Koc
2026-05-02 14:43:53 -07:00
parent 624eaf5d4a
commit f6f8d74419
15 changed files with 540 additions and 41 deletions

View File

@@ -326,6 +326,27 @@ describe("runServiceRestart token drift", () => {
expect(service.restart).toHaveBeenCalledTimes(1);
});
it("writes restart force and wait options into the service-manager intent", async () => {
service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 });
await runServiceRestart({
...createServiceRunArgs(),
opts: {
json: true,
restartIntent: {
waitMs: 2_500,
},
},
});
expect(writeGatewayRestartIntentSync).toHaveBeenCalledWith({
targetPid: 1234,
intent: {
waitMs: 2_500,
},
});
});
it("clears restart intent when service-manager restart fails before signaling", async () => {
service.readRuntime.mockResolvedValue({ status: "running", pid: 1234 });
writeGatewayRestartIntentSync.mockReturnValueOnce(true);

View File

@@ -13,6 +13,7 @@ import { isSystemdUserServiceAvailable } from "../../daemon/systemd.js";
import { isGatewaySecretRefUnavailableError } from "../../gateway/credentials.js";
import {
clearGatewayRestartIntentSync,
type GatewayRestartIntent,
writeGatewayRestartIntentSync,
} from "../../infra/restart.js";
import { isWSL } from "../../infra/wsl.js";
@@ -28,6 +29,9 @@ import { filterContainerGenericHints } from "./shared.js";
type DaemonLifecycleOptions = {
json?: boolean;
force?: boolean;
wait?: string;
restartIntent?: GatewayRestartIntent;
};
type RestartPostCheckContext = {
@@ -440,6 +444,7 @@ export async function runServiceRestart(params: {
const json = Boolean(params.opts?.json);
const { stdout, emit, fail } = createDaemonActionContext({ action: "restart", json });
const warnings: string[] = [];
const restartIntent = params.opts?.restartIntent;
let handledRecovery: ServiceRecoveryResult | null = null;
let recoveredLoadedState: boolean | null = null;
const emitScheduledRestart = (
@@ -552,6 +557,7 @@ export async function runServiceRestart(params: {
const runtime = await params.service.readRuntime(process.env).catch(() => null);
wroteRestartIntent = writeGatewayRestartIntentSync({
targetPid: runtime?.pid,
...(restartIntent ? { intent: restartIntent } : {}),
});
}
try {

View File

@@ -7,10 +7,12 @@ import {
formatGatewayPidList,
signalVerifiedGatewayPidSync,
} from "../../infra/gateway-processes.js";
import { type GatewayRestartIntent, writeGatewayRestartIntentSync } from "../../infra/restart.js";
import { defaultRuntime } from "../../runtime.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { theme } from "../../terminal/theme.js";
import { formatCliCommand } from "../command-format.js";
import { parseDurationMs } from "../parse-duration.js";
import { recoverInstalledLaunchAgent } from "./launchd-recovery.js";
import {
runServiceRestart,
@@ -122,7 +124,25 @@ async function stopGatewayWithoutServiceManager(port: number) {
};
}
async function restartGatewayWithoutServiceManager(port: number) {
function resolveGatewayRestartIntentOptions(
opts: DaemonLifecycleOptions,
): GatewayRestartIntent | undefined {
if (opts.force && opts.wait !== undefined) {
throw new Error("--force cannot be combined with --wait");
}
if (opts.force) {
return { force: true };
}
if (opts.wait !== undefined) {
return { waitMs: parseDurationMs(String(opts.wait)) };
}
return undefined;
}
async function restartGatewayWithoutServiceManager(
port: number,
restartIntent?: GatewayRestartIntent,
) {
await assertUnmanagedGatewayRestartEnabled(port);
const pids = resolveVerifiedGatewayListenerPids(port);
if (pids.length === 0) {
@@ -133,6 +153,10 @@ async function restartGatewayWithoutServiceManager(port: number) {
`multiple gateway processes are listening on port ${port}: ${formatGatewayPidList(pids)}; use "openclaw gateway status --deep" before retrying restart`,
);
}
writeGatewayRestartIntentSync({
targetPid: pids[0],
...(restartIntent ? { intent: restartIntent } : {}),
});
signalVerifiedGatewayPidSync(pids[0], "SIGUSR1");
return {
result: "restarted" as const,
@@ -197,6 +221,7 @@ export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promi
const json = Boolean(opts.json);
const service = resolveGatewayService();
let restartedWithoutServiceManager = false;
const restartIntent = resolveGatewayRestartIntentOptions(opts);
const restartPort = await resolveGatewayLifecyclePort(service).catch(() =>
resolveGatewayPortFallback(),
);
@@ -208,7 +233,10 @@ export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promi
serviceNoun: "Gateway",
service,
renderStartHints: renderGatewayServiceStartHints,
opts,
opts: {
...opts,
...(restartIntent ? { restartIntent } : {}),
},
checkTokenDrift: true,
onNotLoaded: async () => {
if (process.platform === "darwin") {
@@ -217,7 +245,7 @@ export async function runDaemonRestart(opts: DaemonLifecycleOptions = {}): Promi
return recovered;
}
}
const handled = await restartGatewayWithoutServiceManager(restartPort);
const handled = await restartGatewayWithoutServiceManager(restartPort, restartIntent);
if (handled) {
restartedWithoutServiceManager = true;
return handled;

View File

@@ -59,6 +59,28 @@ describe("addGatewayServiceCommands", () => {
);
},
},
{
name: "forwards restart force and wait controls",
argv: ["restart", "--wait", "30s"],
assert: () => {
expect(runDaemonRestart).toHaveBeenCalledWith(
expect.objectContaining({
wait: "30s",
}),
);
},
},
{
name: "forwards restart force control",
argv: ["restart", "--force"],
assert: () => {
expect(runDaemonRestart).toHaveBeenCalledWith(
expect.objectContaining({
force: true,
}),
);
},
},
{
name: "forwards status auth collisions from parent gateway command",
argv: ["status", "--token", "tok_status", "--password", "pw_status"],

View File

@@ -1,7 +1,7 @@
import type { Command } from "commander";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
import { inheritOptionFromParent } from "../command-options.js";
import type { DaemonInstallOptions, GatewayRpcOpts } from "./types.js";
import type { DaemonInstallOptions, DaemonLifecycleOptions, GatewayRpcOpts } from "./types.js";
const daemonInstallModuleLoader = createLazyImportLoader(() => import("./install.runtime.js"));
const daemonLifecycleModuleLoader = createLazyImportLoader(() => import("./lifecycle.runtime.js"));
@@ -44,6 +44,14 @@ function resolveRpcOptions(cmdOpts: GatewayRpcOpts, command?: Command): GatewayR
};
}
function resolveRestartOptions(cmdOpts: DaemonLifecycleOptions, command?: Command) {
const parentForce = inheritOptionFromParent<boolean>(command, "force");
return {
...cmdOpts,
force: Boolean(cmdOpts.force || parentForce),
};
}
export function addGatewayServiceCommands(parent: Command, opts?: { statusDescription?: string }) {
parent
.command("status")
@@ -113,9 +121,14 @@ export function addGatewayServiceCommands(parent: Command, opts?: { statusDescri
parent
.command("restart")
.description("Restart the Gateway service (launchd/systemd/schtasks)")
.option("--force", "Restart immediately without waiting for active gateway work", false)
.option(
"--wait <duration>",
"Wait duration before forcing restart (ms, 10s, 5m; 0 waits indefinitely)",
)
.option("--json", "Output JSON", false)
.action(async (cmdOpts) => {
.action(async (cmdOpts, command) => {
const { runDaemonRestart } = await loadDaemonLifecycleModule();
await runDaemonRestart(cmdOpts);
await runDaemonRestart(resolveRestartOptions(cmdOpts, command));
});
}

View File

@@ -26,4 +26,6 @@ export type DaemonInstallOptions = {
export type DaemonLifecycleOptions = {
json?: boolean;
force?: boolean;
wait?: string;
};

View File

@@ -10,6 +10,7 @@ export {
} from "../../infra/process-respawn.js";
export {
resolveGatewayRestartDeferralTimeoutMs,
consumeGatewayRestartIntentPayloadSync,
consumeGatewayRestartIntentSync,
consumeGatewaySigusr1RestartAuthorization,
isGatewaySigusr1RestartExternallyAllowed,
@@ -27,4 +28,5 @@ export {
resetAllLanes,
waitForActiveTasks,
} from "../../process/command-queue.js";
export { getInspectableActiveTaskRestartBlockers } from "../../tasks/task-registry.maintenance.js";
export { reloadTaskRegistryFromStore } from "../../tasks/runtime-internal.js";

View File

@@ -5,6 +5,9 @@ import { pickBeaconHost, pickGatewayPort } from "./discover.js";
const acquireGatewayLock = vi.fn(async (_opts?: { port?: number }) => ({
release: vi.fn(async () => {}),
}));
const consumeGatewayRestartIntentPayloadSync = vi.fn<
() => { force?: boolean; waitMs?: number } | null
>(() => null);
const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true);
const consumeGatewayRestartIntentSync = vi.fn(() => false);
const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false);
@@ -21,6 +24,17 @@ const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?
cooldownMsApplied: 0,
}));
const getActiveTaskCount = vi.fn(() => 0);
const getInspectableActiveTaskRestartBlockers = vi.fn(
() =>
[] as Array<{
taskId: string;
status: "queued" | "running";
runtime: "subagent" | "acp" | "cli" | "cron";
runId?: string;
label?: string;
title?: string;
}>,
);
const markGatewayDraining = vi.fn();
const waitForActiveTasks = vi.fn(async (_timeoutMs?: number) => ({ drained: true }));
const resetAllLanes = vi.fn();
@@ -64,6 +78,7 @@ vi.mock("../../infra/gateway-lock.js", () => ({
}));
vi.mock("../../infra/restart.js", () => ({
consumeGatewayRestartIntentPayloadSync: () => consumeGatewayRestartIntentPayloadSync(),
consumeGatewaySigusr1RestartAuthorization: () => consumeGatewaySigusr1RestartAuthorization(),
consumeGatewayRestartIntentSync: () => consumeGatewayRestartIntentSync(),
isGatewaySigusr1RestartExternallyAllowed: () => isGatewaySigusr1RestartExternallyAllowed(),
@@ -103,6 +118,10 @@ vi.mock("../../tasks/runtime-internal.js", () => ({
reloadTaskRegistryFromStore: () => reloadTaskRegistryFromStore(),
}));
vi.mock("../../tasks/task-registry.maintenance.js", () => ({
getInspectableActiveTaskRestartBlockers: () => getInspectableActiveTaskRestartBlockers(),
}));
vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({
abortEmbeddedPiRun: (sessionId?: string, opts?: { mode?: "all" | "compacting" }) =>
abortEmbeddedPiRun(sessionId, opts),
@@ -270,7 +289,7 @@ describe("runGatewayLoop", () => {
it("treats SIGTERM with a restart intent as a draining restart", async () => {
vi.clearAllMocks();
consumeGatewayRestartIntentSync.mockReturnValueOnce(true);
consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({});
getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0);
await withIsolatedSignals(async ({ captureSignal }) => {
@@ -301,7 +320,7 @@ describe("runGatewayLoop", () => {
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(consumeGatewayRestartIntentSync).toHaveBeenCalledOnce();
expect(consumeGatewayRestartIntentPayloadSync).toHaveBeenCalledOnce();
expect(markGatewayDraining).toHaveBeenCalledOnce();
expect(waitForActiveTasks).toHaveBeenCalledWith(90_000);
expect(closeFirst).toHaveBeenCalledWith({
@@ -321,6 +340,68 @@ describe("runGatewayLoop", () => {
});
});
it("uses restart intent wait overrides for SIGTERM drain", async () => {
vi.clearAllMocks();
consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({ waitMs: 2_500 });
getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0);
await withIsolatedSignals(async ({ captureSignal }) => {
const { start, exited } = await createSignaledLoopHarness();
const sigterm = captureSignal("SIGTERM");
const sigint = captureSignal("SIGINT");
sigterm();
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(waitForActiveTasks).toHaveBeenCalledWith(2_500);
expect(start).toHaveBeenCalledTimes(2);
sigint();
await expect(exited).resolves.toBe(0);
});
});
it("forces SIGTERM restarts without waiting for active task drain", async () => {
vi.clearAllMocks();
consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({ force: true });
getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0);
getActiveEmbeddedRunCount.mockReturnValueOnce(1).mockReturnValue(0);
getInspectableActiveTaskRestartBlockers.mockReturnValueOnce([
{
taskId: "task-force",
runId: "run-force",
status: "running",
runtime: "cron",
label: "forced",
},
]);
await withIsolatedSignals(async ({ captureSignal }) => {
const { start, exited } = await createSignaledLoopHarness();
const sigterm = captureSignal("SIGTERM");
const sigint = captureSignal("SIGINT");
sigterm();
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(waitForActiveTasks).not.toHaveBeenCalled();
expect(waitForActiveEmbeddedRuns).not.toHaveBeenCalled();
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" });
expect(gatewayLog.warn).toHaveBeenCalledWith(
expect.stringContaining("restart blocked by active task run(s): taskId=task-force"),
);
expect(gatewayLog.warn).toHaveBeenCalledWith(
"forced restart requested; skipping active work drain",
);
expect(start).toHaveBeenCalledTimes(2);
sigint();
await expect(exited).resolves.toBe(0);
});
});
it("restarts after SIGUSR1 even when drain times out, and resets runtime state for the new iteration", async () => {
vi.clearAllMocks();
loadConfig.mockReturnValue({

View File

@@ -15,6 +15,10 @@ const UPDATE_RESPAWN_HEALTH_POLL_MS = 200;
type GatewayRunSignalAction = "stop" | "restart";
type RestartDrainTimeoutMs = number | undefined;
type RestartIntentOptions = {
force?: boolean;
waitMs?: number;
};
type GatewayLifecycleRuntimeModule = typeof import("./lifecycle.runtime.js");
@@ -245,7 +249,15 @@ export async function runGatewayLoop(params: {
const SUPERVISOR_STOP_TIMEOUT_MS = 30_000;
const SHUTDOWN_TIMEOUT_MS = SUPERVISOR_STOP_TIMEOUT_MS - 5_000;
const resolveRestartDrainTimeoutMs = async (): Promise<RestartDrainTimeoutMs> => {
const resolveRestartDrainTimeoutMs = async (
restartIntent?: RestartIntentOptions,
): Promise<RestartDrainTimeoutMs> => {
if (restartIntent?.force) {
return 0;
}
if (typeof restartIntent?.waitMs === "number" && Number.isFinite(restartIntent.waitMs)) {
return restartIntent.waitMs > 0 ? Math.floor(restartIntent.waitMs) : undefined;
}
try {
const { getRuntimeConfig, resolveGatewayRestartDeferralTimeoutMs } =
await loadGatewayLifecycleRuntimeModule();
@@ -256,7 +268,12 @@ export async function runGatewayLoop(params: {
}
};
const request = (action: GatewayRunSignalAction, signal: string, restartReason?: string) => {
const request = (
action: GatewayRunSignalAction,
signal: string,
restartReason?: string,
restartIntent?: RestartIntentOptions,
) => {
if (shuttingDown) {
gatewayLog.info(`received ${signal} during shutdown; ignoring`);
return;
@@ -295,7 +312,9 @@ export async function runGatewayLoop(params: {
};
void (async () => {
const restartDrainTimeoutMs = isRestart ? await resolveRestartDrainTimeoutMs() : 0;
const restartDrainTimeoutMs = isRestart
? await resolveRestartDrainTimeoutMs(restartIntent)
: 0;
if (!isRestart) {
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
} else if (restartDrainTimeoutMs !== undefined) {
@@ -319,12 +338,35 @@ export async function runGatewayLoop(params: {
if (isRestart) {
const {
abortEmbeddedPiRun,
getInspectableActiveTaskRestartBlockers,
getActiveEmbeddedRunCount,
getActiveTaskCount,
markGatewayDraining,
waitForActiveEmbeddedRuns,
waitForActiveTasks,
} = await loadGatewayLifecycleRuntimeModule();
const formatTaskBlockers = () => {
const blockers = getInspectableActiveTaskRestartBlockers();
if (blockers.length === 0) {
return null;
}
const shown = blockers
.slice(0, 8)
.map((task) =>
[
`taskId=${task.taskId}`,
task.runId ? `runId=${task.runId}` : null,
`status=${task.status}`,
`runtime=${task.runtime}`,
task.label ? `label=${task.label}` : null,
task.title ? `title=${task.title.slice(0, 80)}` : null,
]
.filter((value): value is string => Boolean(value))
.join(" "),
);
const omitted = blockers.length - shown.length;
return omitted > 0 ? `${shown.join("; ")}; +${omitted} more` : shown.join("; ");
};
const createStillPendingDrainLogger = () =>
setInterval(() => {
gatewayLog.warn(
@@ -345,25 +387,34 @@ export async function runGatewayLoop(params: {
}
if (activeTasks > 0 || activeRuns > 0) {
const taskBlockers = formatTaskBlockers();
gatewayLog.info(
`draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart ${formatRestartDrainBudget()}`,
);
const stillPendingDrainLogger = createStillPendingDrainLogger();
const [tasksDrain, runsDrain] = await Promise.all([
activeTasks > 0
? waitForActiveTasks(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
activeRuns > 0
? waitForActiveEmbeddedRuns(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
]).finally(() => clearInterval(stillPendingDrainLogger));
if (tasksDrain.drained && runsDrain.drained) {
gatewayLog.info("all active work drained");
} else {
gatewayLog.warn("drain timeout reached; proceeding with restart");
// Final best-effort abort to avoid carrying active runs into the
// next lifecycle when drain time budget is exhausted.
if (taskBlockers) {
gatewayLog.warn(`restart blocked by active task run(s): ${taskBlockers}`);
}
if (restartIntent?.force) {
gatewayLog.warn("forced restart requested; skipping active work drain");
abortEmbeddedPiRun(undefined, { mode: "all" });
} else {
const stillPendingDrainLogger = createStillPendingDrainLogger();
const [tasksDrain, runsDrain] = await Promise.all([
activeTasks > 0
? waitForActiveTasks(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
activeRuns > 0
? waitForActiveEmbeddedRuns(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
]).finally(() => clearInterval(stillPendingDrainLogger));
if (tasksDrain.drained && runsDrain.drained) {
gatewayLog.info("all active work drained");
} else {
gatewayLog.warn("drain timeout reached; proceeding with restart");
// Final best-effort abort to avoid carrying active runs into the
// next lifecycle when drain time budget is exhausted.
abortEmbeddedPiRun(undefined, { mode: "all" });
}
}
}
}
@@ -390,8 +441,9 @@ export async function runGatewayLoop(params: {
const onSigterm = () => {
gatewayLog.info("signal SIGTERM received");
void (async () => {
const { consumeGatewayRestartIntentSync } = await loadGatewayLifecycleRuntimeModule();
request(consumeGatewayRestartIntentSync() ? "restart" : "stop", "SIGTERM");
const { consumeGatewayRestartIntentPayloadSync } = await loadGatewayLifecycleRuntimeModule();
const restartIntent = consumeGatewayRestartIntentPayloadSync();
request(restartIntent ? "restart" : "stop", "SIGTERM", undefined, restartIntent ?? undefined);
})();
};
const onSigint = () => {
@@ -402,12 +454,18 @@ export async function runGatewayLoop(params: {
gatewayLog.info("signal SIGUSR1 received");
void (async () => {
const {
consumeGatewayRestartIntentPayloadSync,
consumeGatewaySigusr1RestartAuthorization,
isGatewaySigusr1RestartExternallyAllowed,
markGatewaySigusr1RestartHandled,
peekGatewaySigusr1RestartReason,
scheduleGatewaySigusr1Restart,
} = await loadGatewayLifecycleRuntimeModule();
const restartIntent = consumeGatewayRestartIntentPayloadSync();
if (restartIntent) {
request("restart", "SIGUSR1", "gateway.restart", restartIntent);
return;
}
const authorized = consumeGatewaySigusr1RestartAuthorization();
if (!authorized) {
if (!isGatewaySigusr1RestartExternallyAllowed()) {

View File

@@ -9,6 +9,87 @@ import type { ChannelKind } from "./config-reload-plan.js";
import type { GatewayPluginReloadResult } from "./server-reload-handlers.js";
import { __testing, createGatewayReloadHandlers } from "./server-reload-handlers.js";
const hoisted = vi.hoisted(() => ({
activeTaskCount: { value: 0 },
activeTaskBlockers: [] as Array<{
taskId: string;
status: "queued" | "running";
runtime: "subagent" | "acp" | "cli" | "cron";
runId?: string;
label?: string;
title?: string;
}>,
}));
vi.mock("../tasks/task-registry.maintenance.js", async () => {
const actual = await vi.importActual<typeof import("../tasks/task-registry.maintenance.js")>(
"../tasks/task-registry.maintenance.js",
);
return {
...actual,
getInspectableActiveTaskRestartBlockers: () => hoisted.activeTaskBlockers,
getInspectableTaskRegistrySummary: () => ({
total: hoisted.activeTaskCount.value,
active: hoisted.activeTaskCount.value,
terminal: 0,
failures: 0,
byStatus: {
queued: 0,
running: hoisted.activeTaskCount.value,
succeeded: 0,
failed: 0,
timed_out: 0,
cancelled: 0,
lost: 0,
},
byRuntime: {
subagent: hoisted.activeTaskCount.value,
acp: 0,
cli: 0,
cron: 0,
},
}),
};
});
function createReloadHandlersForTest(logReload = { info: vi.fn(), warn: vi.fn() }) {
const cron = { start: vi.fn(async () => {}), stop: vi.fn() };
const heartbeatRunner = {
stop: vi.fn(),
updateConfig: vi.fn(),
};
return createGatewayReloadHandlers({
deps: {} as never,
broadcast: vi.fn(),
getState: () => ({
hooksConfig: {} as never,
hookClientIpConfig: {} as never,
heartbeatRunner: heartbeatRunner as never,
cronState: { cron, storePath: "/tmp/cron.json", cronEnabled: false } as never,
channelHealthMonitor: null,
}),
setState: vi.fn(),
startChannel: vi.fn(async () => {}),
stopChannel: vi.fn(async () => {}),
reloadPlugins: vi.fn(
async (): Promise<GatewayPluginReloadResult> => ({
restartChannels: new Set(),
activeChannels: new Set(),
}),
),
logHooks: { info: vi.fn(), warn: vi.fn(), error: vi.fn() },
logChannels: { info: vi.fn(), error: vi.fn() },
logCron: { error: vi.fn() },
logReload,
createHealthMonitor: () => null,
});
}
afterEach(() => {
hoisted.activeTaskCount.value = 0;
hoisted.activeTaskBlockers.length = 0;
});
describe("gateway reload recovery handlers", () => {
afterEach(() => {
embeddedRunTesting.resetActiveEmbeddedRuns();
@@ -52,6 +133,66 @@ describe("gateway reload recovery handlers", () => {
});
});
describe("gateway restart deferral preflight", () => {
it("logs active task run ids before waiting and when forcing after timeout", async () => {
const restartTesting = (await import("../infra/restart.js")).__testing;
restartTesting.resetSigusr1State();
const logReload = { info: vi.fn(), warn: vi.fn() };
const { requestGatewayRestart } = createReloadHandlersForTest(logReload);
hoisted.activeTaskCount.value = 1;
hoisted.activeTaskBlockers.push({
taskId: "task-nightly",
runId: "run-nightly",
status: "running",
runtime: "cron",
label: "nightly sync",
title: "refresh all accounts",
});
const signalSpy = vi.fn();
process.once("SIGUSR1", signalSpy);
vi.useFakeTimers();
try {
requestGatewayRestart(
{
changedPaths: ["gateway.port"],
restartGateway: true,
restartReasons: ["gateway.port"],
hotReasons: [],
reloadHooks: false,
restartGmailWatcher: false,
restartCron: false,
restartHeartbeat: false,
restartHealthMonitor: false,
reloadPlugins: false,
restartChannels: new Set(),
disposeMcpRuntimes: false,
noopPaths: [],
},
{
gateway: { reload: { deferralTimeoutMs: 1_000 } },
},
);
expect(logReload.warn).toHaveBeenCalledWith(
expect.stringContaining("restart blocked by active task run(s): taskId=task-nightly"),
);
expect(logReload.warn).toHaveBeenCalledWith(expect.stringContaining("runId=run-nightly"));
await vi.advanceTimersByTimeAsync(1_000);
await Promise.resolve();
expect(signalSpy).toHaveBeenCalledTimes(1);
expect(logReload.warn).toHaveBeenCalledWith(expect.stringContaining("; forcing restart"));
} finally {
hoisted.activeTaskCount.value = 0;
vi.useRealTimers();
process.removeListener("SIGUSR1", signalSpy);
restartTesting.resetSigusr1State();
}
});
});
describe("gateway plugin hot reload handlers", () => {
it("stops removed channel plugins from broad activation before swapping plugin runtime", async () => {
const previousSkipChannels = process.env.OPENCLAW_SKIP_CHANNELS;

View File

@@ -21,7 +21,11 @@ import {
clearSecretsRuntimeSnapshot,
getActiveSecretsRuntimeSnapshot,
} from "../secrets/runtime.js";
import { getInspectableTaskRegistrySummary } from "../tasks/task-registry.maintenance.js";
import {
getInspectableActiveTaskRestartBlockers,
getInspectableTaskRegistrySummary,
type ActiveTaskRestartBlocker,
} from "../tasks/task-registry.maintenance.js";
import type { ChannelHealthMonitor } from "./channel-health-monitor.js";
import { enqueueConfigRecoveryNotice } from "./config-recovery-notice.js";
import type { ChannelKind } from "./config-reload-plan.js";
@@ -183,6 +187,26 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
}
return details;
};
const formatTaskBlocker = (task: ActiveTaskRestartBlocker) => {
const details = [
`taskId=${task.taskId}`,
task.runId ? `runId=${task.runId}` : null,
`status=${task.status}`,
`runtime=${task.runtime}`,
task.label ? `label=${task.label}` : null,
task.title ? `title=${task.title.slice(0, 80)}` : null,
].filter((value): value is string => Boolean(value));
return details.join(" ");
};
const formatTaskBlockers = () => {
const blockers = getInspectableActiveTaskRestartBlockers();
if (blockers.length === 0) {
return null;
}
const shown = blockers.slice(0, 8).map(formatTaskBlocker);
const omitted = blockers.length - shown.length;
return omitted > 0 ? `${shown.join("; ")}; +${omitted} more` : shown.join("; ");
};
const waitForActiveWorkBeforeChannelReload = async (
channels: Iterable<ChannelKind>,
nextConfig: OpenClawConfig,
@@ -412,6 +436,10 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
params.logReload.warn(
`config change requires gateway restart (${reasons}) — deferring until ${initialDetails.join(", ")} complete`,
);
const taskBlockers = formatTaskBlockers();
if (taskBlockers) {
params.logReload.warn(`restart blocked by active task run(s): ${taskBlockers}`);
}
deferGatewayRestartUntilIdle({
getPendingCount: () => getActiveCounts().totalActive,
@@ -425,15 +453,21 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
},
onStillPending: (_pending, elapsedMs) => {
const remaining = formatActiveDetails(getActiveCounts());
const taskBlockers = formatTaskBlockers();
params.logReload.warn(
`restart still deferred after ${elapsedMs}ms with ${remaining.join(", ")} active`,
`restart still deferred after ${elapsedMs}ms with ${remaining.join(", ")} active${
taskBlockers ? ` (${taskBlockers})` : ""
}`,
);
},
onTimeout: (_pending, elapsedMs) => {
const remaining = formatActiveDetails(getActiveCounts());
const taskBlockers = formatTaskBlockers();
restartPending = false;
params.logReload.warn(
`restart timeout after ${elapsedMs}ms with ${remaining.join(", ")} still active; restarting anyway`,
`restart timeout after ${elapsedMs}ms with ${remaining.join(", ")} still active${
taskBlockers ? ` (${taskBlockers})` : ""
}; forcing restart`,
);
},
onCheckError: (err) => {

View File

@@ -46,6 +46,14 @@ const hoisted = vi.hoisted(() => {
const totalPendingReplies = { value: 0 };
const totalQueueSize = { value: 0 };
const activeTaskCount = { value: 0 };
const activeTaskBlockers: Array<{
taskId: string;
status: "queued" | "running";
runtime: "subagent" | "acp" | "cli" | "cron";
runId?: string;
label?: string;
title?: string;
}> = [];
const startGmailWatcher = vi.fn(async () => ({ started: true }));
const stopGmailWatcher = vi.fn(async () => {});
@@ -150,6 +158,7 @@ const hoisted = vi.hoisted(() => {
totalPendingReplies,
totalQueueSize,
activeTaskCount,
activeTaskBlockers,
startGmailWatcher,
stopGmailWatcher,
resetModelCatalogCache,
@@ -258,6 +267,7 @@ vi.mock("../tasks/task-registry.maintenance.js", async () => {
);
return {
...actual,
getInspectableActiveTaskRestartBlockers: () => hoisted.activeTaskBlockers,
getInspectableTaskRegistrySummary: () => ({
active: hoisted.activeTaskCount.value,
queued: 0,
@@ -312,6 +322,7 @@ describe("gateway hot reload", () => {
hoisted.totalPendingReplies.value = 0;
hoisted.totalQueueSize.value = 0;
hoisted.activeTaskCount.value = 0;
hoisted.activeTaskBlockers.length = 0;
embeddedRunMock.activeIds.clear();
hoisted.resetModelCatalogCache.mockReset();
hoisted.disposeAllSessionMcpRuntimes.mockReset();

View File

@@ -2,7 +2,11 @@ import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { consumeGatewayRestartIntentSync, writeGatewayRestartIntentSync } from "./restart.js";
import {
consumeGatewayRestartIntentPayloadSync,
consumeGatewayRestartIntentSync,
writeGatewayRestartIntentSync,
} from "./restart.js";
const tempDirs: string[] = [];
@@ -60,6 +64,24 @@ describe("gateway restart intent", () => {
expect(fs.statSync(intentPath(env)).mode & 0o777).toBe(0o600);
});
it("round-trips restart force and wait options", () => {
const env = createIntentEnv();
expect(
writeGatewayRestartIntentSync({
env,
targetPid: process.pid,
intent: { force: true, waitMs: 12_345 },
}),
).toBe(true);
expect(consumeGatewayRestartIntentPayloadSync(env)).toEqual({
force: true,
waitMs: 12_345,
});
expect(fs.existsSync(intentPath(env))).toBe(false);
});
it("does not follow an existing intent-path symlink when writing", () => {
const env = createIntentEnv();
const targetPath = path.join(env.OPENCLAW_STATE_DIR ?? "", "attacker-target.txt");

View File

@@ -91,6 +91,13 @@ type GatewayRestartIntentPayload = {
kind: "gateway-restart";
pid: number;
createdAt: number;
force?: boolean;
waitMs?: number;
};
export type GatewayRestartIntent = {
force?: boolean;
waitMs?: number;
};
function resolveGatewayRestartIntentPath(env: NodeJS.ProcessEnv = process.env): string {
@@ -117,6 +124,7 @@ function normalizeRestartIntentPid(pid: number | undefined): number | null {
export function writeGatewayRestartIntentSync(opts: {
env?: NodeJS.ProcessEnv;
targetPid?: number;
intent?: GatewayRestartIntent;
}): boolean {
const targetPid = normalizeRestartIntentPid(opts.targetPid);
if (targetPid === null) {
@@ -131,6 +139,12 @@ export function writeGatewayRestartIntentSync(opts: {
kind: "gateway-restart",
pid: targetPid,
createdAt: Date.now(),
...(opts.intent?.force ? { force: true } : {}),
...(typeof opts.intent?.waitMs === "number" &&
Number.isFinite(opts.intent.waitMs) &&
opts.intent.waitMs >= 0
? { waitMs: Math.floor(opts.intent.waitMs) }
: {}),
};
tmpPath = path.join(
path.dirname(intentPath),
@@ -168,9 +182,18 @@ function parseGatewayRestartIntent(raw: string): GatewayRestartIntentPayload | n
typeof parsed.pid === "number" &&
Number.isFinite(parsed.pid) &&
typeof parsed.createdAt === "number" &&
Number.isFinite(parsed.createdAt)
Number.isFinite(parsed.createdAt) &&
(parsed.force === undefined || typeof parsed.force === "boolean") &&
(parsed.waitMs === undefined ||
(typeof parsed.waitMs === "number" && Number.isFinite(parsed.waitMs) && parsed.waitMs >= 0))
) {
return parsed as GatewayRestartIntentPayload;
return {
kind: "gateway-restart",
pid: parsed.pid,
createdAt: parsed.createdAt,
...(parsed.force ? { force: true } : {}),
...(typeof parsed.waitMs === "number" ? { waitMs: Math.floor(parsed.waitMs) } : {}),
};
}
} catch {
return null;
@@ -178,32 +201,45 @@ function parseGatewayRestartIntent(raw: string): GatewayRestartIntentPayload | n
return null;
}
export function consumeGatewayRestartIntentSync(
export function consumeGatewayRestartIntentPayloadSync(
env: NodeJS.ProcessEnv = process.env,
now = Date.now(),
): boolean {
): GatewayRestartIntent | null {
const intentPath = resolveGatewayRestartIntentPath(env);
let raw: string;
try {
const stat = fs.lstatSync(intentPath);
if (!stat.isFile() || stat.size > GATEWAY_RESTART_INTENT_MAX_BYTES) {
return false;
return null;
}
raw = fs.readFileSync(intentPath, "utf8");
} catch {
return false;
return null;
} finally {
clearGatewayRestartIntentSync(env);
}
const payload = parseGatewayRestartIntent(raw);
if (!payload) {
return false;
return null;
}
if (payload.pid !== process.pid) {
return false;
return null;
}
const ageMs = now - payload.createdAt;
return ageMs >= 0 && ageMs <= GATEWAY_RESTART_INTENT_TTL_MS;
if (ageMs < 0 || ageMs > GATEWAY_RESTART_INTENT_TTL_MS) {
return null;
}
return {
...(payload.force ? { force: true } : {}),
...(typeof payload.waitMs === "number" ? { waitMs: payload.waitMs } : {}),
};
}
export function consumeGatewayRestartIntentSync(
env: NodeJS.ProcessEnv = process.env,
now = Date.now(),
): boolean {
return consumeGatewayRestartIntentPayloadSync(env, now) !== null;
}
function summarizeChangedPaths(paths: string[] | undefined, maxPaths = 6): string | null {

View File

@@ -837,6 +837,28 @@ export function reconcileInspectableTasks(): TaskRecord[] {
configureTaskAuditTaskProvider(reconcileInspectableTasks);
export type ActiveTaskRestartBlocker = {
taskId: string;
status: Extract<TaskStatus, "queued" | "running">;
runtime: TaskRecord["runtime"];
runId?: string;
label?: string;
title?: string;
};
export function getInspectableActiveTaskRestartBlockers(): ActiveTaskRestartBlocker[] {
return reconcileInspectableTasks()
.filter((task) => task.status === "queued" || task.status === "running")
.map((task) => ({
taskId: task.taskId,
status: task.status as Extract<TaskStatus, "queued" | "running">,
runtime: task.runtime,
...(task.runId ? { runId: task.runId } : {}),
...(task.label ? { label: task.label } : {}),
...(task.task ? { title: task.task } : {}),
}));
}
export function getInspectableTaskRegistrySummary(): TaskRegistrySummary {
return summarizeTaskRecords(reconcileInspectableTasks());
}