mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 09:50:42 +00:00
fix(gateway): bind restart continuation to emitted restart
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { RestartSentinelPayload } from "../../infra/restart-sentinel.js";
|
||||
import type { scheduleGatewaySigusr1Restart } from "../../infra/restart.js";
|
||||
|
||||
type ScheduleGatewayRestartArgs = Parameters<typeof scheduleGatewaySigusr1Restart>[0];
|
||||
|
||||
const isRestartEnabledMock = vi.fn(() => true);
|
||||
const extractDeliveryInfoMock = vi.fn(() => ({
|
||||
@@ -12,7 +15,11 @@ const extractDeliveryInfoMock = vi.fn(() => ({
|
||||
}));
|
||||
const formatDoctorNonInteractiveHintMock = vi.fn(() => "Run: openclaw doctor --non-interactive");
|
||||
const writeRestartSentinelMock = vi.fn(async (_payload: RestartSentinelPayload) => "/tmp/restart");
|
||||
const scheduleGatewaySigusr1RestartMock = vi.fn(() => ({ scheduled: true, delayMs: 250 }));
|
||||
const removeRestartSentinelFileMock = vi.fn(async (_path: string | null | undefined) => undefined);
|
||||
const scheduleGatewaySigusr1RestartMock = vi.fn((_opts?: ScheduleGatewayRestartArgs) => ({
|
||||
scheduled: true,
|
||||
delayMs: 250,
|
||||
}));
|
||||
|
||||
vi.mock("../../config/commands.js", () => ({
|
||||
isRestartEnabled: isRestartEnabledMock,
|
||||
@@ -29,6 +36,7 @@ vi.mock("../../infra/restart-sentinel.js", async () => {
|
||||
return {
|
||||
...actual,
|
||||
formatDoctorNonInteractiveHint: formatDoctorNonInteractiveHintMock,
|
||||
removeRestartSentinelFile: removeRestartSentinelFileMock,
|
||||
writeRestartSentinel: writeRestartSentinelMock,
|
||||
};
|
||||
});
|
||||
@@ -65,6 +73,7 @@ describe("gateway tool restart continuation", () => {
|
||||
formatDoctorNonInteractiveHintMock.mockReturnValue("Run: openclaw doctor --non-interactive");
|
||||
writeRestartSentinelMock.mockReset();
|
||||
writeRestartSentinelMock.mockResolvedValue("/tmp/restart");
|
||||
removeRestartSentinelFileMock.mockClear();
|
||||
scheduleGatewaySigusr1RestartMock.mockReset();
|
||||
scheduleGatewaySigusr1RestartMock.mockReturnValue({ scheduled: true, delayMs: 250 });
|
||||
});
|
||||
@@ -105,6 +114,10 @@ describe("gateway tool restart continuation", () => {
|
||||
continuationMessage: "Reply with exactly: Yay! I did it!",
|
||||
});
|
||||
|
||||
expect(writeRestartSentinelMock).not.toHaveBeenCalled();
|
||||
const scheduledArgs = scheduleGatewaySigusr1RestartMock.mock.calls.at(-1)?.[0];
|
||||
await scheduledArgs?.emitHooks?.beforeEmit?.();
|
||||
|
||||
expect(writeRestartSentinelMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
kind: "restart",
|
||||
@@ -126,6 +139,10 @@ describe("gateway tool restart continuation", () => {
|
||||
expect(scheduleGatewaySigusr1RestartMock).toHaveBeenCalledWith({
|
||||
delayMs: 250,
|
||||
reason: "continue after reboot",
|
||||
emitHooks: expect.objectContaining({
|
||||
beforeEmit: expect.any(Function),
|
||||
afterEmitRejected: expect.any(Function),
|
||||
}),
|
||||
});
|
||||
expect(result?.details).toEqual({ scheduled: true, delayMs: 250 });
|
||||
});
|
||||
@@ -143,6 +160,9 @@ describe("gateway tool restart continuation", () => {
|
||||
continuationMessage: "Reply after restart",
|
||||
});
|
||||
|
||||
const scheduledArgs = scheduleGatewaySigusr1RestartMock.mock.calls.at(-1)?.[0];
|
||||
await scheduledArgs?.emitHooks?.beforeEmit?.();
|
||||
|
||||
expect(writeRestartSentinelMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
continuation: {
|
||||
@@ -168,6 +188,9 @@ describe("gateway tool restart continuation", () => {
|
||||
reason: "restart requested",
|
||||
});
|
||||
|
||||
const scheduledArgs = scheduleGatewaySigusr1RestartMock.mock.calls.at(-1)?.[0];
|
||||
await scheduledArgs?.emitHooks?.beforeEmit?.();
|
||||
|
||||
expect(writeRestartSentinelMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:main:main",
|
||||
@@ -178,4 +201,22 @@ describe("gateway tool restart continuation", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("removes the prepared sentinel when restart emission is rejected", async () => {
|
||||
const { createGatewayTool } = await import("./gateway-tool.js");
|
||||
const tool = createGatewayTool({
|
||||
agentSessionKey: "agent:main:main",
|
||||
config: {},
|
||||
});
|
||||
|
||||
await tool.execute?.("tool-call-1", {
|
||||
action: "restart",
|
||||
});
|
||||
|
||||
const scheduledArgs = scheduleGatewaySigusr1RestartMock.mock.calls.at(-1)?.[0];
|
||||
await scheduledArgs?.emitHooks?.beforeEmit?.();
|
||||
await scheduledArgs?.emitHooks?.afterEmitRejected?.();
|
||||
|
||||
expect(removeRestartSentinelFileMock).toHaveBeenCalledWith("/tmp/restart");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -8,6 +8,7 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
buildRestartSuccessContinuation,
|
||||
formatDoctorNonInteractiveHint,
|
||||
removeRestartSentinelFile,
|
||||
type RestartSentinelPayload,
|
||||
writeRestartSentinel,
|
||||
} from "../../infra/restart-sentinel.js";
|
||||
@@ -359,17 +360,21 @@ export function createGatewayTool(opts?: {
|
||||
reason,
|
||||
},
|
||||
};
|
||||
try {
|
||||
await writeRestartSentinel(payload);
|
||||
} catch {
|
||||
// ignore: sentinel is best-effort
|
||||
}
|
||||
log.info(
|
||||
`gateway tool: restart requested (delayMs=${delayMs ?? "default"}, reason=${reason ?? "none"})`,
|
||||
);
|
||||
let sentinelPath: string | null = null;
|
||||
const scheduled = scheduleGatewaySigusr1Restart({
|
||||
delayMs,
|
||||
reason,
|
||||
emitHooks: {
|
||||
beforeEmit: async () => {
|
||||
sentinelPath = await writeRestartSentinel(payload);
|
||||
},
|
||||
afterEmitRejected: async () => {
|
||||
await removeRestartSentinelFile(sentinelPath);
|
||||
},
|
||||
},
|
||||
});
|
||||
return jsonResult(scheduled);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { RestartSentinelPayload } from "../../infra/restart-sentinel.js";
|
||||
import type { scheduleGatewaySigusr1Restart } from "../../infra/restart.js";
|
||||
import type { HandleCommandsParams } from "./commands-types.js";
|
||||
|
||||
type ScheduleGatewayRestartArgs = Parameters<typeof scheduleGatewaySigusr1Restart>[0];
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
unlink: vi.fn(async (_path: string) => undefined),
|
||||
isRestartEnabled: vi.fn(() => true),
|
||||
@@ -15,7 +18,9 @@ const mocks = vi.hoisted(() => ({
|
||||
})),
|
||||
formatDoctorNonInteractiveHint: vi.fn(() => "Run: openclaw doctor --non-interactive"),
|
||||
writeRestartSentinel: vi.fn(async (_payload: RestartSentinelPayload) => "/tmp/sentinel.json"),
|
||||
scheduleGatewaySigusr1Restart: vi.fn(() => ({ scheduled: true })),
|
||||
scheduleGatewaySigusr1Restart: vi.fn((_opts?: ScheduleGatewayRestartArgs) => ({
|
||||
scheduled: true,
|
||||
})),
|
||||
triggerOpenClawRestart: vi.fn(() => ({ ok: true, method: "launchctl" })),
|
||||
}));
|
||||
|
||||
@@ -143,6 +148,34 @@ describe("handleRestartCommand", () => {
|
||||
expect(mocks.triggerOpenClawRestart).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("prepares the routed sentinel only when SIGUSR1 restart emits", async () => {
|
||||
const handler = () => {};
|
||||
process.on("SIGUSR1", handler);
|
||||
try {
|
||||
const result = await handleRestartCommand(restartCommandParams(), true);
|
||||
|
||||
expect(result?.reply?.text).toContain("SIGUSR1");
|
||||
expect(mocks.writeRestartSentinel).not.toHaveBeenCalled();
|
||||
expect(mocks.triggerOpenClawRestart).not.toHaveBeenCalled();
|
||||
|
||||
const scheduledArgs = mocks.scheduleGatewaySigusr1Restart.mock.calls.at(-1)?.[0];
|
||||
await scheduledArgs?.emitHooks?.beforeEmit?.();
|
||||
|
||||
expect(mocks.writeRestartSentinel).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
kind: "restart",
|
||||
status: "ok",
|
||||
sessionKey: "agent:main:telegram:direct:123:thread:thread-1",
|
||||
continuation: expect.objectContaining({
|
||||
kind: "agentTurn",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
process.removeListener("SIGUSR1", handler);
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects authorized non-owner restart commands", async () => {
|
||||
const result = await handleRestartCommand(
|
||||
restartCommandParams({
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
||||
import { resolveFastModeState } from "../../agents/fast-mode.js";
|
||||
import {
|
||||
@@ -16,6 +15,7 @@ import type { SessionBindingRecord } from "../../infra/outbound/session-binding-
|
||||
import {
|
||||
buildRestartSuccessContinuation,
|
||||
formatDoctorNonInteractiveHint,
|
||||
removeRestartSentinelFile,
|
||||
type RestartSentinelPayload,
|
||||
writeRestartSentinel,
|
||||
} from "../../infra/restart-sentinel.js";
|
||||
@@ -42,7 +42,7 @@ const SESSION_DURATION_OFF_VALUES = new Set(["off", "disable", "disabled", "none
|
||||
const SESSION_ACTION_IDLE = "idle";
|
||||
const SESSION_ACTION_MAX_AGE = "max-age";
|
||||
|
||||
async function writeRestartCommandSentinel(params: HandleCommandsParams): Promise<string | null> {
|
||||
function buildRestartCommandSentinel(params: HandleCommandsParams): RestartSentinelPayload | null {
|
||||
const sessionKey = normalizeOptionalString(params.sessionKey);
|
||||
if (!sessionKey) {
|
||||
return null;
|
||||
@@ -63,14 +63,7 @@ async function writeRestartCommandSentinel(params: HandleCommandsParams): Promis
|
||||
reason: "/restart",
|
||||
},
|
||||
};
|
||||
return await writeRestartSentinel(payload);
|
||||
}
|
||||
|
||||
async function removeRestartCommandSentinel(filePath: string | null) {
|
||||
if (!filePath) {
|
||||
return;
|
||||
}
|
||||
await fs.unlink(filePath).catch(() => {});
|
||||
return payload;
|
||||
}
|
||||
|
||||
function resolveSessionCommandUsage() {
|
||||
@@ -683,9 +676,34 @@ export const handleRestartCommand: CommandHandler = async (params, allowTextComm
|
||||
};
|
||||
}
|
||||
const hasSigusr1Listener = process.listenerCount("SIGUSR1") > 0;
|
||||
const sentinelPayload = buildRestartCommandSentinel(params);
|
||||
if (hasSigusr1Listener) {
|
||||
let sentinelPath: string | null = null;
|
||||
scheduleGatewaySigusr1Restart({
|
||||
reason: "/restart",
|
||||
emitHooks: sentinelPayload
|
||||
? {
|
||||
beforeEmit: async () => {
|
||||
sentinelPath = await writeRestartSentinel(sentinelPayload);
|
||||
},
|
||||
afterEmitRejected: async () => {
|
||||
await removeRestartSentinelFile(sentinelPath);
|
||||
},
|
||||
}
|
||||
: undefined,
|
||||
});
|
||||
return {
|
||||
shouldContinue: false,
|
||||
reply: {
|
||||
text: "⚙️ Restarting OpenClaw in-process (SIGUSR1); back in a few seconds.",
|
||||
},
|
||||
};
|
||||
}
|
||||
let sentinelPath: string | null = null;
|
||||
try {
|
||||
sentinelPath = await writeRestartCommandSentinel(params);
|
||||
if (sentinelPayload) {
|
||||
sentinelPath = await writeRestartSentinel(sentinelPayload);
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(`failed to write /restart sentinel: ${String(err)}`);
|
||||
return {
|
||||
@@ -695,18 +713,9 @@ export const handleRestartCommand: CommandHandler = async (params, allowTextComm
|
||||
},
|
||||
};
|
||||
}
|
||||
if (hasSigusr1Listener) {
|
||||
scheduleGatewaySigusr1Restart({ reason: "/restart" });
|
||||
return {
|
||||
shouldContinue: false,
|
||||
reply: {
|
||||
text: "⚙️ Restarting OpenClaw in-process (SIGUSR1); back in a few seconds.",
|
||||
},
|
||||
};
|
||||
}
|
||||
const restartMethod = triggerOpenClawRestart();
|
||||
if (!restartMethod.ok) {
|
||||
await removeRestartCommandSentinel(sentinelPath);
|
||||
await removeRestartSentinelFile(sentinelPath);
|
||||
const detail = restartMethod.detail ? ` Details: ${restartMethod.detail}` : "";
|
||||
return {
|
||||
shouldContinue: false,
|
||||
|
||||
@@ -421,6 +421,9 @@ describe("scheduleRestartSentinelWake", () => {
|
||||
ctxPayload: expect.objectContaining({
|
||||
Body: "Reply with exactly: Yay! I did it!",
|
||||
BodyForAgent: "stamped:Reply with exactly: Yay! I did it!",
|
||||
BodyForCommands: "",
|
||||
CommandBody: "",
|
||||
CommandAuthorized: false,
|
||||
SessionKey: "agent:main:main",
|
||||
Provider: "whatsapp",
|
||||
Surface: "whatsapp",
|
||||
|
||||
@@ -241,9 +241,9 @@ async function dispatchRestartSentinelContinuation(params: {
|
||||
{
|
||||
Body: userMessage,
|
||||
BodyForAgent: injectTimestamp(userMessage, timestampOptsFromConfig(params.cfg)),
|
||||
BodyForCommands: userMessage,
|
||||
BodyForCommands: "",
|
||||
RawBody: userMessage,
|
||||
CommandBody: userMessage,
|
||||
CommandBody: "",
|
||||
SessionKey: params.sessionKey,
|
||||
AccountId: route.accountId,
|
||||
MessageSid: messageId,
|
||||
@@ -251,7 +251,7 @@ async function dispatchRestartSentinelContinuation(params: {
|
||||
Provider: route.channel,
|
||||
Surface: route.channel,
|
||||
ChatType: route.chatType,
|
||||
CommandAuthorized: true,
|
||||
CommandAuthorized: false,
|
||||
ReplyToId: route.replyToId,
|
||||
OriginatingChannel: route.channel,
|
||||
OriginatingTo: route.to,
|
||||
|
||||
@@ -94,6 +94,45 @@ describe("infra runtime", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("runs restart preparation only when the scheduled restart emits", async () => {
|
||||
const beforeEmit = vi.fn(async () => {});
|
||||
const emitSpy = vi.spyOn(process, "emit");
|
||||
const handler = () => {};
|
||||
process.on("SIGUSR1", handler);
|
||||
try {
|
||||
scheduleGatewaySigusr1Restart({
|
||||
delayMs: 1_000,
|
||||
emitHooks: { beforeEmit },
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(999);
|
||||
expect(beforeEmit).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(beforeEmit).toHaveBeenCalledTimes(1);
|
||||
expect(emitSpy).toHaveBeenCalledWith("SIGUSR1");
|
||||
} finally {
|
||||
process.removeListener("SIGUSR1", handler);
|
||||
}
|
||||
});
|
||||
|
||||
it("rolls back prepared restart state when emission is rejected", async () => {
|
||||
const beforeEmit = vi.fn(async () => {});
|
||||
const afterEmitRejected = vi.fn(async () => {});
|
||||
vi.spyOn(process, "kill").mockImplementation(() => {
|
||||
throw new Error("no signal");
|
||||
});
|
||||
|
||||
scheduleGatewaySigusr1Restart({
|
||||
delayMs: 0,
|
||||
emitHooks: { beforeEmit, afterEmitRejected },
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
|
||||
expect(beforeEmit).toHaveBeenCalledTimes(1);
|
||||
expect(afterEmitRejected).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("applies restart cooldown between emitted restart cycles", async () => {
|
||||
const emitSpy = vi.spyOn(process, "emit");
|
||||
const handler = () => {};
|
||||
|
||||
@@ -87,6 +87,13 @@ export async function writeRestartSentinel(
|
||||
return filePath;
|
||||
}
|
||||
|
||||
export async function removeRestartSentinelFile(filePath: string | null | undefined) {
|
||||
if (!filePath) {
|
||||
return;
|
||||
}
|
||||
await fs.unlink(filePath).catch(() => {});
|
||||
}
|
||||
|
||||
export function buildRestartSuccessContinuation(params: {
|
||||
sessionKey?: string;
|
||||
continuationMessage?: string | null;
|
||||
@@ -140,7 +147,7 @@ export async function consumeRestartSentinel(
|
||||
if (!parsed) {
|
||||
return null;
|
||||
}
|
||||
await fs.unlink(filePath).catch(() => {});
|
||||
await removeRestartSentinelFile(filePath);
|
||||
return parsed;
|
||||
}
|
||||
|
||||
|
||||
@@ -197,6 +197,25 @@ export type RestartDeferralHooks = {
|
||||
onCheckError?: (err: unknown) => void;
|
||||
};
|
||||
|
||||
export type RestartEmitHooks = {
|
||||
beforeEmit?: () => Promise<void>;
|
||||
afterEmitRejected?: () => Promise<void>;
|
||||
};
|
||||
|
||||
async function emitPreparedGatewayRestart(hooks?: RestartEmitHooks): Promise<void> {
|
||||
try {
|
||||
await hooks?.beforeEmit?.();
|
||||
} catch (err) {
|
||||
restartLog.warn(`restart preparation failed; restart not emitted: ${String(err)}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const emitted = emitGatewayRestart();
|
||||
if (!emitted) {
|
||||
await hooks?.afterEmitRejected?.().catch(() => undefined);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll pending work until it drains (or times out), then emit one restart signal.
|
||||
* Shared by both the direct RPC restart path and the config watcher path.
|
||||
@@ -204,6 +223,7 @@ export type RestartDeferralHooks = {
|
||||
export function deferGatewayRestartUntilIdle(opts: {
|
||||
getPendingCount: () => number;
|
||||
hooks?: RestartDeferralHooks;
|
||||
emitHooks?: RestartEmitHooks;
|
||||
pollMs?: number;
|
||||
maxWaitMs?: number;
|
||||
}): void {
|
||||
@@ -217,12 +237,12 @@ export function deferGatewayRestartUntilIdle(opts: {
|
||||
pending = opts.getPendingCount();
|
||||
} catch (err) {
|
||||
opts.hooks?.onCheckError?.(err);
|
||||
emitGatewayRestart();
|
||||
void emitPreparedGatewayRestart(opts.emitHooks);
|
||||
return;
|
||||
}
|
||||
if (pending <= 0) {
|
||||
opts.hooks?.onReady?.();
|
||||
emitGatewayRestart();
|
||||
void emitPreparedGatewayRestart(opts.emitHooks);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -236,14 +256,14 @@ export function deferGatewayRestartUntilIdle(opts: {
|
||||
clearInterval(poll);
|
||||
activeDeferralPolls.delete(poll);
|
||||
opts.hooks?.onCheckError?.(err);
|
||||
emitGatewayRestart();
|
||||
void emitPreparedGatewayRestart(opts.emitHooks);
|
||||
return;
|
||||
}
|
||||
if (current <= 0) {
|
||||
clearInterval(poll);
|
||||
activeDeferralPolls.delete(poll);
|
||||
opts.hooks?.onReady?.();
|
||||
emitGatewayRestart();
|
||||
void emitPreparedGatewayRestart(opts.emitHooks);
|
||||
return;
|
||||
}
|
||||
const elapsedMs = Date.now() - startedAt;
|
||||
@@ -251,7 +271,7 @@ export function deferGatewayRestartUntilIdle(opts: {
|
||||
clearInterval(poll);
|
||||
activeDeferralPolls.delete(poll);
|
||||
opts.hooks?.onTimeout?.(current, elapsedMs);
|
||||
emitGatewayRestart();
|
||||
void emitPreparedGatewayRestart(opts.emitHooks);
|
||||
}
|
||||
}, pollMs);
|
||||
activeDeferralPolls.add(poll);
|
||||
@@ -419,6 +439,7 @@ export function scheduleGatewaySigusr1Restart(opts?: {
|
||||
delayMs?: number;
|
||||
reason?: string;
|
||||
audit?: RestartAuditInfo;
|
||||
emitHooks?: RestartEmitHooks;
|
||||
}): ScheduledRestart {
|
||||
const delayMsRaw =
|
||||
typeof opts?.delayMs === "number" && Number.isFinite(opts.delayMs)
|
||||
@@ -484,13 +505,14 @@ export function scheduleGatewaySigusr1Restart(opts?: {
|
||||
pendingRestartReason = undefined;
|
||||
const pendingCheck = preRestartCheck;
|
||||
if (!pendingCheck) {
|
||||
emitGatewayRestart();
|
||||
void emitPreparedGatewayRestart(opts?.emitHooks);
|
||||
return;
|
||||
}
|
||||
const cfg = getRuntimeConfig();
|
||||
deferGatewayRestartUntilIdle({
|
||||
getPendingCount: pendingCheck,
|
||||
maxWaitMs: cfg.gateway?.reload?.deferralTimeoutMs,
|
||||
emitHooks: opts?.emitHooks,
|
||||
});
|
||||
},
|
||||
Math.max(0, requestedDueAt - nowMs),
|
||||
|
||||
Reference in New Issue
Block a user