fix: add gateway supervisor restart handoff

This commit is contained in:
Shakker
2026-05-05 05:31:47 +01:00
parent f9da484365
commit acb0acd8dd
5 changed files with 627 additions and 5 deletions

View File

@@ -19,6 +19,7 @@ export {
resetGatewayRestartStateForInProcessRestart,
scheduleGatewaySigusr1Restart,
} from "../../infra/restart.js";
export { writeGatewayRestartHandoffSync } from "../../infra/restart-handoff.js";
export { markUpdateRestartSentinelFailure } from "../../infra/restart-sentinel.js";
export { detectRespawnSupervisor } from "../../infra/supervisor-markers.js";
export { writeDiagnosticStabilityBundleForFailureSync } from "../../logging/diagnostic-stability-bundle.js";

View File

@@ -14,6 +14,17 @@ const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false);
const markGatewaySigusr1RestartHandled = vi.fn();
const peekGatewaySigusr1RestartReason = vi.fn<() => string | undefined>(() => undefined);
const resetGatewayRestartStateForInProcessRestart = vi.fn();
const writeGatewayRestartHandoffSync = vi.fn((_opts: unknown) => ({
kind: "gateway-supervisor-restart-handoff" as const,
version: 1 as const,
intentId: "test-intent",
pid: process.pid,
createdAt: Date.now(),
expiresAt: Date.now() + 60_000,
source: "unknown" as const,
restartKind: "full-process" as const,
supervisorMode: "external" as const,
}));
const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?: string }) => ({
ok: true,
pid: process.pid,
@@ -107,6 +118,10 @@ vi.mock("../../infra/restart-sentinel.js", () => ({
markUpdateRestartSentinelFailure: (reason: string) => markUpdateRestartSentinelFailure(reason),
}));
vi.mock("../../infra/restart-handoff.js", () => ({
writeGatewayRestartHandoffSync: (opts: unknown) => writeGatewayRestartHandoffSync(opts),
}));
vi.mock("../../process/command-queue.js", () => ({
getActiveTaskCount: () => getActiveTaskCount(),
markGatewayDraining: () => markGatewayDraining(),
@@ -595,6 +610,7 @@ describe("runGatewayLoop", () => {
expect(lockRelease).toHaveBeenCalled();
expect(runtime.exit).toHaveBeenCalledWith(0);
expect(exitCallOrder).toEqual(["lockRelease", "exit"]);
expect(writeGatewayRestartHandoffSync).not.toHaveBeenCalled();
});
});
@@ -616,6 +632,12 @@ describe("runGatewayLoop", () => {
sigusr1();
await expect(exited).resolves.toBe(0);
expect(runtime.exit).toHaveBeenCalledWith(0);
expect(writeGatewayRestartHandoffSync).toHaveBeenCalledWith({
restartKind: "full-process",
reason: undefined,
processInstanceId: expect.any(String),
supervisorMode: "launchd",
});
expect(Date.now() - startedAt).toBeGreaterThanOrEqual(1400);
});
} finally {
@@ -719,9 +741,40 @@ describe("runGatewayLoop", () => {
expect(respawnGatewayProcessForUpdate).toHaveBeenCalledTimes(1);
expect(start).toHaveBeenCalledTimes(1);
expect(markUpdateRestartSentinelFailure).not.toHaveBeenCalled();
expect(writeGatewayRestartHandoffSync).not.toHaveBeenCalled();
});
});
it("writes a handoff before exiting for supervised update restarts", async () => {
vi.clearAllMocks();
peekGatewaySigusr1RestartReason.mockReturnValue("update.run");
respawnGatewayProcessForUpdate.mockReturnValueOnce({
mode: "supervised",
});
try {
setPlatform("freebsd");
await withIsolatedSignals(async ({ captureSignal }) => {
const { runtime, exited } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
sigusr1();
await expect(exited).resolves.toBe(0);
expect(runtime.exit).toHaveBeenCalledWith(0);
expect(writeGatewayRestartHandoffSync).toHaveBeenCalledWith({
restartKind: "update-process",
reason: "update.run",
processInstanceId: expect.any(String),
supervisorMode: "external",
});
});
} finally {
if (originalPlatformDescriptor) {
Object.defineProperty(process, "platform", originalPlatformDescriptor);
}
}
});
it("probes the configured gateway host for update respawn health", async () => {
vi.clearAllMocks();
peekGatewaySigusr1RestartReason.mockReturnValue("update.run");

View File

@@ -1,3 +1,4 @@
import { randomUUID } from "node:crypto";
import net from "node:net";
import type { startGatewayServer } from "../../gateway/server.js";
import { formatErrorMessage } from "../../infra/errors.js";
@@ -94,6 +95,7 @@ export async function runGatewayLoop(params: {
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
let shuttingDown = false;
let restartResolver: (() => void) | null = null;
const processInstanceId = randomUUID();
const waitForHealthyChild = params.waitForHealthyChild ?? waitForHealthyGatewayChild;
const cleanupSignals = () => {
@@ -140,6 +142,7 @@ export async function runGatewayLoop(params: {
markUpdateRestartSentinelFailure,
respawnGatewayProcessForUpdate,
restartGatewayProcessWithFreshPid,
writeGatewayRestartHandoffSync,
} = await loadGatewayLifecycleRuntimeModule();
if (isUpdateRestart) {
@@ -176,8 +179,15 @@ export async function runGatewayLoop(params: {
return;
}
if (respawn.mode === "supervised") {
const supervisorMode = detectRespawnSupervisor(process.env, process.platform);
writeGatewayRestartHandoffSync({
restartKind: "update-process",
reason: restartReason,
processInstanceId,
supervisorMode: supervisorMode ?? "external",
});
gatewayLog.info("restart mode: update process respawn (supervisor restart)");
if (detectRespawnSupervisor(process.env, process.platform) === "launchd") {
if (supervisorMode === "launchd") {
await new Promise((resolve) => {
setTimeout(resolve, LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS);
});
@@ -208,15 +218,24 @@ export async function runGatewayLoop(params: {
// Release the lock BEFORE spawning so the child can acquire it immediately.
const respawn = restartGatewayProcessWithFreshPid();
if (respawn.mode === "spawned" || respawn.mode === "supervised") {
const supervisorMode =
respawn.mode === "supervised"
? detectRespawnSupervisor(process.env, process.platform)
: null;
const modeLabel =
respawn.mode === "spawned"
? `spawned pid ${respawn.pid ?? "unknown"}`
: "supervisor restart";
if (respawn.mode === "supervised") {
writeGatewayRestartHandoffSync({
restartKind: "full-process",
reason: restartReason,
processInstanceId,
supervisorMode: supervisorMode ?? "external",
});
}
gatewayLog.info(`restart mode: full process restart (${modeLabel})`);
if (
respawn.mode === "supervised" &&
detectRespawnSupervisor(process.env, process.platform) === "launchd"
) {
if (supervisorMode === "launchd") {
// A short clean-exit pause keeps rapid SIGUSR1/config restarts from
// tripping launchd crash-loop throttling before KeepAlive relaunches.
await new Promise((resolve) => {

View File

@@ -0,0 +1,221 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import {
consumeGatewayRestartHandoffForExitedProcessSync,
GATEWAY_SUPERVISOR_RESTART_HANDOFF_FILENAME,
GATEWAY_SUPERVISOR_RESTART_HANDOFF_KIND,
readGatewayRestartHandoffSync,
writeGatewayRestartHandoffSync,
} from "./restart-handoff.js";
const tempDirs: string[] = [];
function createHandoffEnv(): NodeJS.ProcessEnv {
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-restart-handoff-"));
tempDirs.push(dir);
return {
...process.env,
OPENCLAW_STATE_DIR: dir,
};
}
function handoffPath(env: NodeJS.ProcessEnv): string {
return path.join(env.OPENCLAW_STATE_DIR ?? "", GATEWAY_SUPERVISOR_RESTART_HANDOFF_FILENAME);
}
describe("gateway restart handoff", () => {
afterEach(() => {
for (const dir of tempDirs.splice(0)) {
fs.rmSync(dir, { force: true, recursive: true });
}
});
it("writes a supervisor handoff for an exited gateway process", () => {
const env = createHandoffEnv();
const handoff = writeGatewayRestartHandoffSync({
env,
pid: 12_345,
processInstanceId: "gateway-instance-1",
reason: "plugin source changed",
restartKind: "full-process",
supervisorMode: "launchd",
createdAt: 1_000,
});
expect(handoff).toMatchObject({
kind: GATEWAY_SUPERVISOR_RESTART_HANDOFF_KIND,
version: 1,
pid: 12_345,
processInstanceId: "gateway-instance-1",
reason: "plugin source changed",
source: "plugin-change",
restartKind: "full-process",
supervisorMode: "launchd",
createdAt: 1_000,
expiresAt: 61_000,
});
expect(fs.statSync(handoffPath(env)).mode & 0o777).toBe(0o600);
expect(readGatewayRestartHandoffSync(env, 1_500)).toMatchObject({
pid: 12_345,
reason: "plugin source changed",
});
});
it("consumes a fresh handoff by exited pid instead of current process pid", () => {
const env = createHandoffEnv();
expect(
writeGatewayRestartHandoffSync({
env,
pid: process.pid + 1,
reason: "update.run",
restartKind: "update-process",
supervisorMode: "systemd",
createdAt: 2_000,
}),
).not.toBeNull();
expect(
consumeGatewayRestartHandoffForExitedProcessSync({
env,
exitedPid: process.pid + 1,
now: 2_001,
}),
).toMatchObject({
pid: process.pid + 1,
source: "gateway-update",
restartKind: "update-process",
supervisorMode: "systemd",
});
expect(fs.existsSync(handoffPath(env))).toBe(false);
});
it("rejects handoffs for a different exited pid and clears them", () => {
const env = createHandoffEnv();
expect(
writeGatewayRestartHandoffSync({
env,
pid: 111,
restartKind: "full-process",
supervisorMode: "external",
createdAt: 1_000,
}),
).not.toBeNull();
expect(
consumeGatewayRestartHandoffForExitedProcessSync({
env,
exitedPid: 222,
now: 1_001,
}),
).toBeNull();
expect(fs.existsSync(handoffPath(env))).toBe(false);
});
it("rejects a handoff when the supplied process instance does not match", () => {
const env = createHandoffEnv();
expect(
writeGatewayRestartHandoffSync({
env,
pid: 111,
processInstanceId: "gateway-instance-1",
restartKind: "full-process",
supervisorMode: "external",
createdAt: 1_000,
}),
).not.toBeNull();
expect(
consumeGatewayRestartHandoffForExitedProcessSync({
env,
exitedPid: 111,
processInstanceId: "gateway-instance-2",
now: 1_001,
}),
).toBeNull();
expect(fs.existsSync(handoffPath(env))).toBe(false);
});
it("rejects malformed handoff payloads", () => {
const env = createHandoffEnv();
fs.writeFileSync(
handoffPath(env),
`${JSON.stringify({
kind: GATEWAY_SUPERVISOR_RESTART_HANDOFF_KIND,
version: 1,
intentId: "bad",
pid: 111,
createdAt: 1_000,
expiresAt: 61_000,
reason: 123,
source: "bad-source",
restartKind: "full-process",
supervisorMode: "external",
})}\n`,
{ encoding: "utf8", mode: 0o600 },
);
expect(readGatewayRestartHandoffSync(env, 1_001)).toBeNull();
});
it("rejects expired and oversized handoff files", () => {
const env = createHandoffEnv();
expect(
writeGatewayRestartHandoffSync({
env,
pid: 111,
restartKind: "full-process",
supervisorMode: "external",
createdAt: 1_000,
ttlMs: 1_000,
}),
).not.toBeNull();
expect(readGatewayRestartHandoffSync(env, 2_001)).toBeNull();
fs.writeFileSync(handoffPath(env), "x".repeat(8192), { encoding: "utf8", mode: 0o600 });
expect(
consumeGatewayRestartHandoffForExitedProcessSync({
env,
exitedPid: 111,
now: 2_001,
}),
).toBeNull();
expect(fs.existsSync(handoffPath(env))).toBe(false);
});
it("does not follow an existing handoff-path symlink when writing", () => {
const env = createHandoffEnv();
const targetPath = path.join(env.OPENCLAW_STATE_DIR ?? "", "attacker-target.txt");
fs.writeFileSync(targetPath, "keep", "utf8");
try {
fs.symlinkSync(targetPath, handoffPath(env));
} catch {
return;
}
expect(
writeGatewayRestartHandoffSync({
env,
pid: 12_345,
restartKind: "full-process",
supervisorMode: "external",
}),
).not.toBeNull();
expect(fs.readFileSync(targetPath, "utf8")).toBe("keep");
expect(fs.lstatSync(handoffPath(env)).isSymbolicLink()).toBe(false);
expect(
consumeGatewayRestartHandoffForExitedProcessSync({
env,
exitedPid: 12_345,
}),
).toMatchObject({ pid: 12_345 });
});
});

View File

@@ -0,0 +1,328 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
export const GATEWAY_SUPERVISOR_RESTART_HANDOFF_FILENAME =
"gateway-supervisor-restart-handoff.json";
export const GATEWAY_SUPERVISOR_RESTART_HANDOFF_KIND = "gateway-supervisor-restart-handoff";
const GATEWAY_RESTART_HANDOFF_TTL_MS = 60_000;
const GATEWAY_RESTART_HANDOFF_MAX_BYTES = 4096;
const MAX_INTENT_ID_LENGTH = 120;
const MAX_PROCESS_INSTANCE_ID_LENGTH = 120;
const MAX_REASON_LENGTH = 200;
const handoffLog = createSubsystemLogger("restart-handoff");
export type GatewayRestartHandoffRestartKind = "full-process" | "update-process";
export type GatewayRestartHandoffSource =
| "config-write"
| "gateway-update"
| "operator-restart"
| "plugin-change"
| "signal"
| "unknown";
export type GatewayRestartHandoffSupervisorMode = "launchd" | "systemd" | "schtasks" | "external";
export type GatewayRestartHandoff = {
kind: typeof GATEWAY_SUPERVISOR_RESTART_HANDOFF_KIND;
version: 1;
intentId: string;
pid: number;
processInstanceId?: string;
createdAt: number;
expiresAt: number;
reason?: string;
source: GatewayRestartHandoffSource;
restartKind: GatewayRestartHandoffRestartKind;
supervisorMode: GatewayRestartHandoffSupervisorMode;
};
function resolveGatewayRestartHandoffPath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveStateDir(env), GATEWAY_SUPERVISOR_RESTART_HANDOFF_FILENAME);
}
function unlinkRegularFileSync(filePath: string): boolean {
try {
const stat = fs.lstatSync(filePath);
if (!stat.isFile() || stat.nlink > 1) {
return false;
}
fs.unlinkSync(filePath);
return true;
} catch {
return false;
}
}
export function clearGatewayRestartHandoffSync(env: NodeJS.ProcessEnv = process.env): void {
unlinkRegularFileSync(resolveGatewayRestartHandoffPath(env));
}
function normalizePid(pid: number | undefined): number | null {
return typeof pid === "number" && Number.isSafeInteger(pid) && pid > 0 ? pid : null;
}
function normalizeText(value: unknown, maxLength: number): string | undefined {
const text = typeof value === "string" ? value.trim() : "";
return text ? text.slice(0, maxLength) : undefined;
}
function normalizeCreatedAt(value: number | undefined): number {
return typeof value === "number" && Number.isFinite(value) && value > 0
? Math.floor(value)
: Date.now();
}
function normalizeTtlMs(value: number | undefined): number {
if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) {
return GATEWAY_RESTART_HANDOFF_TTL_MS;
}
return Math.min(Math.floor(value), GATEWAY_RESTART_HANDOFF_TTL_MS);
}
function normalizeSource(
source: GatewayRestartHandoffSource | undefined,
reason: string | undefined,
): GatewayRestartHandoffSource {
if (source) {
return source;
}
if (!reason) {
return "unknown";
}
const normalized = reason.toLowerCase();
if (normalized === "update.run") {
return "gateway-update";
}
if (normalized === "sigusr1") {
return "signal";
}
if (normalized === "gateway.restart") {
return "operator-restart";
}
if (normalized.includes("plugin")) {
return "plugin-change";
}
if (normalized.includes("config") || normalized.includes("include")) {
return "config-write";
}
return "unknown";
}
function isSource(value: unknown): value is GatewayRestartHandoffSource {
return (
value === "config-write" ||
value === "gateway-update" ||
value === "operator-restart" ||
value === "plugin-change" ||
value === "signal" ||
value === "unknown"
);
}
function isRestartKind(value: unknown): value is GatewayRestartHandoffRestartKind {
return value === "full-process" || value === "update-process";
}
function isSupervisorMode(value: unknown): value is GatewayRestartHandoffSupervisorMode {
return value === "launchd" || value === "systemd" || value === "schtasks" || value === "external";
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function parseGatewayRestartHandoff(raw: string): GatewayRestartHandoff | null {
let parsed: unknown;
try {
parsed = JSON.parse(raw);
} catch {
return null;
}
if (!isRecord(parsed)) {
return null;
}
if (
parsed.kind !== GATEWAY_SUPERVISOR_RESTART_HANDOFF_KIND ||
parsed.version !== 1 ||
typeof parsed.intentId !== "string" ||
parsed.intentId.trim().length === 0 ||
typeof parsed.pid !== "number" ||
!Number.isSafeInteger(parsed.pid) ||
parsed.pid <= 0 ||
typeof parsed.createdAt !== "number" ||
!Number.isFinite(parsed.createdAt) ||
typeof parsed.expiresAt !== "number" ||
!Number.isFinite(parsed.expiresAt) ||
parsed.expiresAt <= parsed.createdAt ||
!isSource(parsed.source) ||
!isRestartKind(parsed.restartKind) ||
!isSupervisorMode(parsed.supervisorMode)
) {
return null;
}
if (parsed.reason !== undefined && typeof parsed.reason !== "string") {
return null;
}
if (parsed.processInstanceId !== undefined && typeof parsed.processInstanceId !== "string") {
return null;
}
const processInstanceId = normalizeText(parsed.processInstanceId, MAX_PROCESS_INSTANCE_ID_LENGTH);
const reason = normalizeText(parsed.reason, MAX_REASON_LENGTH);
return {
kind: GATEWAY_SUPERVISOR_RESTART_HANDOFF_KIND,
version: 1,
intentId: parsed.intentId.trim().slice(0, MAX_INTENT_ID_LENGTH),
pid: parsed.pid,
...(processInstanceId ? { processInstanceId } : {}),
createdAt: Math.floor(parsed.createdAt),
expiresAt: Math.floor(parsed.expiresAt),
...(reason ? { reason } : {}),
source: parsed.source,
restartKind: parsed.restartKind,
supervisorMode: parsed.supervisorMode,
};
}
function readGatewayRestartHandoffRawSync(env: NodeJS.ProcessEnv): string | null {
const handoffPath = resolveGatewayRestartHandoffPath(env);
try {
const stat = fs.lstatSync(handoffPath);
if (!stat.isFile() || stat.nlink > 1 || stat.size > GATEWAY_RESTART_HANDOFF_MAX_BYTES) {
return null;
}
return fs.readFileSync(handoffPath, "utf8");
} catch {
return null;
}
}
export function writeGatewayRestartHandoffSync(opts: {
env?: NodeJS.ProcessEnv;
pid?: number;
processInstanceId?: string;
reason?: string;
source?: GatewayRestartHandoffSource;
restartKind: GatewayRestartHandoffRestartKind;
supervisorMode?: GatewayRestartHandoffSupervisorMode | null;
ttlMs?: number;
createdAt?: number;
}): GatewayRestartHandoff | null {
const pid = normalizePid(opts.pid ?? process.pid);
if (pid === null || !isRestartKind(opts.restartKind)) {
return null;
}
if (opts.source !== undefined && !isSource(opts.source)) {
return null;
}
const supervisorMode = opts.supervisorMode ?? "external";
if (!isSupervisorMode(supervisorMode)) {
return null;
}
const env = opts.env ?? process.env;
const createdAt = normalizeCreatedAt(opts.createdAt);
const ttlMs = normalizeTtlMs(opts.ttlMs);
const reason = normalizeText(opts.reason, MAX_REASON_LENGTH);
const processInstanceId = normalizeText(opts.processInstanceId, MAX_PROCESS_INSTANCE_ID_LENGTH);
const payload: GatewayRestartHandoff = {
kind: GATEWAY_SUPERVISOR_RESTART_HANDOFF_KIND,
version: 1,
intentId: randomUUID(),
pid,
...(processInstanceId ? { processInstanceId } : {}),
createdAt,
expiresAt: createdAt + ttlMs,
...(reason ? { reason } : {}),
source: normalizeSource(opts.source, reason),
restartKind: opts.restartKind,
supervisorMode,
};
let tmpPath: string | undefined;
try {
const handoffPath = resolveGatewayRestartHandoffPath(env);
fs.mkdirSync(path.dirname(handoffPath), { recursive: true });
tmpPath = path.join(
path.dirname(handoffPath),
`.${path.basename(handoffPath)}.${process.pid}.${Date.now()}.${randomUUID()}.tmp`,
);
let fd: number | undefined;
try {
fd = fs.openSync(tmpPath, "wx", 0o600);
fs.writeFileSync(fd, `${JSON.stringify(payload)}\n`, "utf8");
} finally {
if (fd !== undefined) {
fs.closeSync(fd);
}
}
fs.renameSync(tmpPath, handoffPath);
return payload;
} catch (err) {
if (tmpPath) {
unlinkRegularFileSync(tmpPath);
}
handoffLog.warn(`failed to write gateway restart handoff: ${String(err)}`);
return null;
}
}
export function readGatewayRestartHandoffSync(
env: NodeJS.ProcessEnv = process.env,
now = Date.now(),
): GatewayRestartHandoff | null {
const raw = readGatewayRestartHandoffRawSync(env);
if (!raw) {
return null;
}
const payload = parseGatewayRestartHandoff(raw);
if (!payload || now < payload.createdAt || now > payload.expiresAt) {
return null;
}
return payload;
}
export function consumeGatewayRestartHandoffForExitedProcessSync(opts: {
env?: NodeJS.ProcessEnv;
exitedPid?: number;
processInstanceId?: string;
now?: number;
}): GatewayRestartHandoff | null {
const env = opts.env ?? process.env;
const handoffPath = resolveGatewayRestartHandoffPath(env);
let raw: string | null = null;
try {
const stat = fs.lstatSync(handoffPath);
if (!stat.isFile() || stat.nlink > 1 || stat.size > GATEWAY_RESTART_HANDOFF_MAX_BYTES) {
return null;
}
raw = fs.readFileSync(handoffPath, "utf8");
} catch {
return null;
} finally {
clearGatewayRestartHandoffSync(env);
}
const payload = raw ? parseGatewayRestartHandoff(raw) : null;
const exitedPid = normalizePid(opts.exitedPid);
if (!payload || exitedPid === null || payload.pid !== exitedPid) {
return null;
}
const expectedProcessInstanceId = normalizeText(
opts.processInstanceId,
MAX_PROCESS_INSTANCE_ID_LENGTH,
);
if (expectedProcessInstanceId && payload.processInstanceId !== expectedProcessInstanceId) {
return null;
}
const now = opts.now ?? Date.now();
if (now < payload.createdAt || now > payload.expiresAt) {
return null;
}
return payload;
}