fix: preserve restart continuations after reboot (#63406) (thanks @VACInc)

* gateway: add restart continuation sentinel

* gateway: address restart continuation review

* gateway: handle restart continuation edge cases

* gateway: keep restart continuations on threaded delivery path

* fix(gateway): harden restart continuation routing

* test(gateway): cover restart continuation edge cases

* docs(agent): clarify restart continuation usage

* fix: preserve restart continuations after reboot (#63406) (thanks @VACInc)

---------

Co-authored-by: VACInc <3279061+VACInc@users.noreply.github.com>
Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
VACInc
2026-04-22 10:39:07 -04:00
committed by GitHub
parent a43be09dca
commit 962b25b4a6
7 changed files with 988 additions and 53 deletions

View File

@@ -53,6 +53,7 @@ Docs: https://docs.openclaw.ai
- fix(config): accept truncateAfterCompaction (#68395). Thanks @MonkeyLeeT
- CLI/Claude: keep Claude CLI session bindings stable across OAuth access-token refreshes, so gateway restarts continue the same Claude conversation instead of minting a fresh one. (#70132) Thanks @obviyus.
- QQBot: add `INTERACTION` intent (`1 << 26`) to the gateway constants and include it in the `FULL_INTENTS` mask so interaction events are received. (#70143) Thanks @cxyhhhhh.
- Gateway/restart: preserve one-shot continuation instructions across gateway restarts so agents can resume and reply back to the original chat after reboot. (#63406) Thanks @VACInc.
## 2026.4.21

View File

@@ -0,0 +1,144 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { RestartSentinelPayload } from "../../infra/restart-sentinel.js";
const isRestartEnabledMock = vi.fn(() => true);
const extractDeliveryInfoMock = vi.fn(() => ({
deliveryContext: {
channel: "slack",
to: "slack:C123",
accountId: "workspace-1",
},
threadId: "thread-42",
}));
const formatDoctorNonInteractiveHintMock = vi.fn(() => "Run: openclaw doctor --non-interactive");
const writeRestartSentinelMock = vi.fn(async (_payload: RestartSentinelPayload) => "/tmp/restart");
const scheduleGatewaySigusr1RestartMock = vi.fn(() => ({ scheduled: true, delayMs: 250 }));
vi.mock("../../config/commands.js", () => ({
isRestartEnabled: isRestartEnabledMock,
}));
vi.mock("../../config/sessions.js", () => ({
extractDeliveryInfo: extractDeliveryInfoMock,
}));
vi.mock("../../infra/restart-sentinel.js", async () => {
const actual = await vi.importActual<typeof import("../../infra/restart-sentinel.js")>(
"../../infra/restart-sentinel.js",
);
return {
...actual,
formatDoctorNonInteractiveHint: formatDoctorNonInteractiveHintMock,
writeRestartSentinel: writeRestartSentinelMock,
};
});
vi.mock("../../infra/restart.js", () => ({
scheduleGatewaySigusr1Restart: scheduleGatewaySigusr1RestartMock,
}));
vi.mock("../../logging/subsystem.js", () => ({
createSubsystemLogger: vi.fn(() => ({
info: vi.fn(),
})),
}));
vi.mock("./gateway.js", () => ({
callGatewayTool: vi.fn(),
readGatewayCallOptions: vi.fn(() => ({})),
}));
describe("gateway tool restart continuation", () => {
beforeEach(() => {
isRestartEnabledMock.mockReset();
isRestartEnabledMock.mockReturnValue(true);
extractDeliveryInfoMock.mockReset();
extractDeliveryInfoMock.mockReturnValue({
deliveryContext: {
channel: "slack",
to: "slack:C123",
accountId: "workspace-1",
},
threadId: "thread-42",
});
formatDoctorNonInteractiveHintMock.mockReset();
formatDoctorNonInteractiveHintMock.mockReturnValue("Run: openclaw doctor --non-interactive");
writeRestartSentinelMock.mockReset();
writeRestartSentinelMock.mockResolvedValue("/tmp/restart");
scheduleGatewaySigusr1RestartMock.mockReset();
scheduleGatewaySigusr1RestartMock.mockReturnValue({ scheduled: true, delayMs: 250 });
});
it("uses a flat enum for continuationKind in the tool schema", async () => {
const { createGatewayTool } = await import("./gateway-tool.js");
const tool = createGatewayTool();
const continuationKind = (
tool.parameters as {
properties?: {
continuationKind?: {
type?: string;
enum?: string[];
anyOf?: unknown[];
};
};
}
).properties?.continuationKind;
expect(continuationKind).toEqual(
expect.objectContaining({
type: "string",
enum: ["systemEvent", "agentTurn"],
}),
);
expect(continuationKind).not.toHaveProperty("anyOf");
});
it("instructs agents to use continuationMessage when a restart still needs a reply", async () => {
const { createGatewayTool } = await import("./gateway-tool.js");
const tool = createGatewayTool();
expect(tool.description).toContain("still owe the user a reply");
expect(tool.description).toContain("continuationMessage");
expect(tool.description).toContain("do not write restart sentinel files directly");
});
it("writes an agentTurn continuation into the restart sentinel", async () => {
const { createGatewayTool } = await import("./gateway-tool.js");
const tool = createGatewayTool({
agentSessionKey: "agent:main:main",
config: {},
});
const result = await tool.execute?.("tool-call-1", {
action: "restart",
delayMs: 250,
reason: "continue after reboot",
note: "Gateway restarting now",
continuationMessage: "Reply with exactly: Yay! I did it!",
});
expect(writeRestartSentinelMock).toHaveBeenCalledWith(
expect.objectContaining({
kind: "restart",
status: "ok",
sessionKey: "agent:main:main",
deliveryContext: {
channel: "slack",
to: "slack:C123",
accountId: "workspace-1",
},
threadId: "thread-42",
message: "Gateway restarting now",
continuation: {
kind: "agentTurn",
message: "Reply with exactly: Yay! I did it!",
},
}),
);
expect(scheduleGatewaySigusr1RestartMock).toHaveBeenCalledWith({
delayMs: 250,
reason: "continue after reboot",
});
expect(result?.details).toEqual({ scheduled: true, delayMs: 250 });
});
});

View File

@@ -14,7 +14,7 @@ import { scheduleGatewaySigusr1Restart } from "../../infra/restart.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { collectEnabledInsecureOrDangerousFlags } from "../../security/dangerous-config-flags.js";
import { normalizeOptionalString, readStringValue } from "../../shared/string-coerce.js";
import { stringEnum } from "../schema/typebox.js";
import { optionalStringEnum, stringEnum } from "../schema/typebox.js";
import { type AnyAgentTool, jsonResult, readStringParam } from "./common.js";
import { callGatewayTool, readGatewayCallOptions } from "./gateway.js";
import { isOpenClawOwnerOnlyCoreToolName } from "./owner-only-tools.js";
@@ -289,6 +289,8 @@ const GatewayToolSchema = Type.Object({
// restart
delayMs: Type.Optional(Type.Number()),
reason: Type.Optional(Type.String()),
continuationKind: optionalStringEnum(["systemEvent", "agentTurn"] as const),
continuationMessage: Type.Optional(Type.String()),
// config.get, config.schema.lookup, config.apply, update.run
gatewayUrl: Type.Optional(Type.String()),
gatewayToken: Type.Optional(Type.String()),
@@ -317,7 +319,7 @@ export function createGatewayTool(opts?: {
name: "gateway",
ownerOnly: isOpenClawOwnerOnlyCoreToolName("gateway"),
description:
"Restart, inspect a specific config schema path, apply config, or update the gateway in-place (SIGUSR1). Use config.schema.lookup with a targeted dot path before config edits. Use config.patch for safe partial config updates (merges with existing). Use config.apply only when replacing entire config. Config writes hot-reload when possible and restart when required. Always pass a human-readable completion message via the `note` parameter so the system can deliver it to the user after restart.",
"Restart, inspect a specific config schema path, apply config, or update the gateway in-place (SIGUSR1). Use config.schema.lookup with a targeted dot path before config edits. Use config.patch for safe partial config updates (merges with existing). Use config.apply only when replacing entire config. Config writes hot-reload when possible and restart when required. Always pass a human-readable completion message via the `note` parameter so the system can deliver it to the user after restart. If restarting during a user task and you still owe the user a reply, pass a specific one-shot `continuationMessage` for what to verify or report after boot; do not write restart sentinel files directly. Use `continuationKind` only when it should be a system event instead of a normal agent turn.",
parameters: GatewayToolSchema,
execute: async (_toolCallId, args) => {
const params = args as Record<string, unknown>;
@@ -335,6 +337,8 @@ export function createGatewayTool(opts?: {
: undefined;
const reason = normalizeOptionalString(params.reason)?.slice(0, 200);
const note = normalizeOptionalString(params.note);
const continuationMessage = normalizeOptionalString(params.continuationMessage);
const continuationKind = normalizeOptionalString(params.continuationKind);
// Extract channel + threadId for routing after restart.
// Uses generic :thread: parsing plus plugin-owned session grammars.
const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey);
@@ -346,6 +350,17 @@ export function createGatewayTool(opts?: {
deliveryContext,
threadId,
message: note ?? reason ?? null,
continuation: continuationMessage
? continuationKind === "systemEvent"
? {
kind: "systemEvent",
text: continuationMessage,
}
: {
kind: "agentTurn",
message: continuationMessage,
}
: null,
doctorHint: formatDoctorNonInteractiveHint(),
stats: {
mode: "gateway.restart",

View File

@@ -1,6 +1,12 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
import { mergeMockedModule } from "../test-utils/vitest-module-mocks.js";
type LoadedSessionEntry = ReturnType<typeof import("./session-utils.js").loadSessionEntry>;
type RecordInboundSessionAndDispatchReplyParams = Parameters<
typeof import("../plugin-sdk/inbound-reply-dispatch.js").recordInboundSessionAndDispatchReply
>[0];
const mocks = vi.hoisted(() => ({
resolveSessionAgentId: vi.fn(() => "agent-from-key"),
consumeRestartSentinel: vi.fn(async () => ({
@@ -22,7 +28,19 @@ const mocks = vi.hoisted(() => ({
threadId: undefined,
}),
),
loadSessionEntry: vi.fn(() => ({ cfg: {}, entry: {} })),
loadSessionEntry: vi.fn(
(): LoadedSessionEntry => ({
cfg: {},
entry: {
sessionId: "agent:main:main",
updatedAt: 0,
},
store: {},
storePath: "/tmp/sessions.json",
canonicalKey: "agent:main:main",
legacyKey: undefined,
}),
),
deliveryContextFromSession: vi.fn(
():
| { channel?: string; to?: string; accountId?: string; threadId?: string | number }
@@ -32,18 +50,23 @@ const mocks = vi.hoisted(() => ({
...b,
...a,
})),
getChannelPlugin: vi.fn(() => undefined),
getChannelPlugin: vi.fn((): ChannelPlugin | undefined => undefined),
normalizeChannelId: vi.fn<(channel?: string | null) => string | null>(),
resolveOutboundTarget: vi.fn((_params?: { to?: string }) => ({
resolveOutboundTarget: vi.fn(((_params?: { to?: string }) => ({
ok: true as const,
to: "+15550002",
})),
})) as (params?: { to?: string }) => { ok: true; to: string } | { ok: false; error: Error }),
deliverOutboundPayloads: vi.fn(async () => [{ channel: "whatsapp", messageId: "msg-1" }]),
enqueueDelivery: vi.fn(async () => "queue-1"),
ackDelivery: vi.fn(async () => {}),
failDelivery: vi.fn(async () => {}),
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
injectTimestamp: vi.fn((message: string) => `stamped:${message}`),
timestampOptsFromConfig: vi.fn(() => ({})),
recordInboundSessionAndDispatchReply: vi.fn(
async (_params: RecordInboundSessionAndDispatchReplyParams) => {},
),
logWarn: vi.fn(),
}));
@@ -110,6 +133,10 @@ vi.mock("../infra/system-events.js", () => ({
enqueueSystemEvent: mocks.enqueueSystemEvent,
}));
vi.mock("../plugin-sdk/inbound-reply-dispatch.js", () => ({
recordInboundSessionAndDispatchReply: mocks.recordInboundSessionAndDispatchReply,
}));
vi.mock("../infra/heartbeat-wake.js", async () => {
return await mergeMockedModule(
await vi.importActual<typeof import("../infra/heartbeat-wake.js")>(
@@ -127,6 +154,11 @@ vi.mock("../logging/subsystem.js", () => ({
})),
}));
vi.mock("./server-methods/agent-timestamp.js", () => ({
injectTimestamp: mocks.injectTimestamp,
timestampOptsFromConfig: mocks.timestampOptsFromConfig,
}));
const { scheduleRestartSentinelWake } = await import("./server-restart-sentinel.js");
describe("scheduleRestartSentinelWake", () => {
@@ -149,9 +181,21 @@ describe("scheduleRestartSentinelWake", () => {
mocks.parseSessionThreadInfo.mockReset();
mocks.parseSessionThreadInfo.mockReturnValue({ baseSessionKey: null, threadId: undefined });
mocks.loadSessionEntry.mockReset();
mocks.loadSessionEntry.mockReturnValue({ cfg: {}, entry: {} });
mocks.loadSessionEntry.mockReturnValue({
cfg: {},
entry: {
sessionId: "agent:main:main",
updatedAt: 0,
},
store: {},
storePath: "/tmp/sessions.json",
canonicalKey: "agent:main:main",
legacyKey: undefined,
});
mocks.deliveryContextFromSession.mockReset();
mocks.deliveryContextFromSession.mockReturnValue(undefined);
mocks.getChannelPlugin.mockReset();
mocks.getChannelPlugin.mockReturnValue(undefined);
mocks.normalizeChannelId.mockClear();
mocks.resolveOutboundTarget.mockReset();
mocks.resolveOutboundTarget.mockReturnValue({ ok: true as const, to: "+15550002" });
@@ -163,6 +207,10 @@ describe("scheduleRestartSentinelWake", () => {
mocks.failDelivery.mockClear();
mocks.enqueueSystemEvent.mockClear();
mocks.requestHeartbeatNow.mockClear();
mocks.injectTimestamp.mockClear();
mocks.timestampOptsFromConfig.mockClear();
mocks.recordInboundSessionAndDispatchReply.mockReset();
mocks.recordInboundSessionAndDispatchReply.mockResolvedValue(undefined);
mocks.logWarn.mockClear();
});
@@ -201,6 +249,7 @@ describe("scheduleRestartSentinelWake", () => {
reason: "wake",
sessionKey: "agent:main:main",
});
expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled();
expect(mocks.logWarn).not.toHaveBeenCalled();
});
@@ -264,6 +313,45 @@ describe("scheduleRestartSentinelWake", () => {
expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready");
});
it("still dispatches continuation after restart notice retries are exhausted", async () => {
vi.useFakeTimers();
mocks.deliverOutboundPayloads.mockRejectedValue(new Error("transport still not ready"));
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
ts: 123,
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as unknown as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
const wakePromise = scheduleRestartSentinelWake({ deps: {} as never });
await Promise.resolve();
await Promise.resolve();
for (let attempt = 1; attempt < 45; attempt += 1) {
await vi.advanceTimersByTimeAsync(1_000);
}
await wakePromise;
expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready");
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1);
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
routeSessionKey: "agent:main:main",
ctxPayload: expect.objectContaining({
Body: "continue",
}),
}),
);
});
it("prefers top-level sentinel threadId for wake routing context", async () => {
// Legacy or malformed sentinel JSON can still carry a nested threadId.
mocks.consumeRestartSentinel.mockResolvedValue({
@@ -277,7 +365,7 @@ describe("scheduleRestartSentinelWake", () => {
} as never,
threadId: "fresh-thread",
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
} as unknown as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
await scheduleRestartSentinelWake({ deps: {} as never });
@@ -292,6 +380,430 @@ describe("scheduleRestartSentinelWake", () => {
);
});
it("dispatches agentTurn continuation after the restart notice in the same routed thread", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
threadId: "thread-42",
ts: 123,
continuation: {
kind: "agentTurn",
message: "Reply with exactly: Yay! I did it!",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => {
await params.deliver({
text: "done",
replyToId: "restart-sentinel:agent:main:main:agentTurn:123",
});
});
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueDelivery).toHaveBeenCalledWith(
expect.objectContaining({
payloads: [{ text: "restart message" }],
threadId: "thread-42",
}),
);
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1);
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
channel: "whatsapp",
accountId: "acct-2",
routeSessionKey: "agent:main:main",
ctxPayload: expect.objectContaining({
Body: "Reply with exactly: Yay! I did it!",
BodyForAgent: "stamped:Reply with exactly: Yay! I did it!",
SessionKey: "agent:main:main",
Provider: "whatsapp",
Surface: "whatsapp",
OriginatingChannel: "whatsapp",
OriginatingTo: "+15550002",
MessageThreadId: "thread-42",
}),
}),
);
});
it("preserves derived reply transport ids in continuation context", async () => {
mocks.getChannelPlugin.mockReturnValue({
id: "whatsapp",
meta: {
id: "whatsapp",
label: "WhatsApp",
selectionLabel: "WhatsApp",
docsPath: "/channels/whatsapp",
blurb: "WhatsApp",
},
capabilities: { chatTypes: ["direct"] },
config: {
listAccountIds: () => [],
resolveAccount: () => ({}),
},
threading: {
resolveReplyTransport: ({ threadId }: { threadId?: string | number | null }) => ({
replyToId: threadId != null ? `reply:${String(threadId)}` : undefined,
threadId: null,
}),
},
});
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
threadId: "thread-42",
ts: 123,
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => {
await params.deliver({
text: "done",
replyToId: "restart-sentinel:agent:main:main:agentTurn:123",
});
});
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
ctxPayload: expect.objectContaining({
ReplyToId: "reply:thread-42",
MessageThreadId: undefined,
}),
}),
);
expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
expect.objectContaining({
payloads: [
{
text: "done",
replyToId: "reply:thread-42",
},
],
}),
);
});
it("strips synthetic reply transport ids when no real reply target exists", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
ts: 123,
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => {
await params.deliver({
text: "done",
replyToId: "restart-sentinel:agent:main:main:agentTurn:123",
});
});
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
expect.objectContaining({
payloads: [{ text: "done" }],
}),
);
});
it("preserves non-synthetic reply transport ids from continuation payloads", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
ts: 123,
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => {
await params.deliver({
text: "done",
replyToId: "provider-reply-id",
});
});
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
expect.objectContaining({
payloads: [
{
text: "done",
replyToId: "provider-reply-id",
},
],
}),
);
});
it("dispatches agentTurn continuation from session delivery context when sentinel routing is empty", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
ts: 123,
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as unknown as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.deliveryContextFromSession.mockReturnValue({
channel: "telegram",
to: "telegram:200482621",
accountId: "default",
});
mocks.resolveOutboundTarget.mockReturnValue({
ok: true as const,
to: "telegram:200482621",
});
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
accountId: "default",
ctxPayload: expect.objectContaining({
Body: "continue",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:200482621",
}),
}),
);
});
it("requests another wake after enqueueing a systemEvent continuation", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
threadId: "thread-42",
ts: 123,
continuation: {
kind: "systemEvent",
text: "continue after restart",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(
2,
"continue after restart",
expect.objectContaining({
sessionKey: "agent:main:main",
deliveryContext: expect.objectContaining({
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
threadId: "thread-42",
}),
}),
);
expect(mocks.requestHeartbeatNow).toHaveBeenNthCalledWith(1, {
reason: "wake",
sessionKey: "agent:main:main",
});
expect(mocks.requestHeartbeatNow).toHaveBeenNthCalledWith(2, {
reason: "wake",
sessionKey: "agent:main:main",
});
});
it("enqueues systemEvent continuation without stale partial delivery context", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
threadId: "thread-42",
ts: 123,
continuation: {
kind: "systemEvent",
text: "continue after restart",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.resolveOutboundTarget.mockReturnValueOnce({
ok: false,
error: new Error("missing route"),
});
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(2, "continue after restart", {
sessionKey: "agent:main:main",
});
});
it("logs and continues when continuation delivery fails", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
ts: 123,
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.recordInboundSessionAndDispatchReply.mockRejectedValueOnce(new Error("dispatch failed"));
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
"restart message",
expect.objectContaining({
sessionKey: "agent:main:main",
}),
);
expect(mocks.logWarn).toHaveBeenCalledWith(
expect.stringContaining("continuation delivery failed"),
expect.objectContaining({
sessionKey: "agent:main:main",
continuationKind: "agentTurn",
}),
);
});
it("logs and continues when continuation dispatch reports a delivery error", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
ts: 123,
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(
async (params: { onDispatchError: (err: unknown, info: { kind: string }) => void }) => {
params.onDispatchError(new Error("route failed"), { kind: "final" });
},
);
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.logWarn).toHaveBeenCalledWith(
expect.stringContaining("continuation delivery failed"),
expect.objectContaining({
sessionKey: "agent:main:main",
continuationKind: "agentTurn",
}),
);
});
it("warns and skips agentTurn continuation when restart routing cannot resolve a destination", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
ts: 123,
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
mocks.resolveOutboundTarget.mockReturnValueOnce({
ok: false,
error: new Error("missing route"),
});
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled();
expect(mocks.logWarn).toHaveBeenCalledWith(
expect.stringContaining("restart continuation route unavailable"),
expect.objectContaining({
sessionKey: "agent:main:main",
continuationKind: "agentTurn",
}),
);
});
it("consumes continuation once and does not replay it on later startup cycles", async () => {
mocks.consumeRestartSentinel
.mockResolvedValueOnce({
payload: {
sessionKey: "agent:main:main",
deliveryContext: {
channel: "whatsapp",
to: "+15550002",
accountId: "acct-2",
},
ts: 123,
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>)
.mockResolvedValueOnce(
null as unknown as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>,
);
await scheduleRestartSentinelWake({ deps: {} as never });
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1);
});
it("does not wake the main session when the sentinel has no sessionKey", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
@@ -308,6 +820,31 @@ describe("scheduleRestartSentinelWake", () => {
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
});
it("warns when continuation cannot run because the restart sentinel has no sessionKey", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
message: "restart message",
continuation: {
kind: "agentTurn",
message: "continue",
},
},
} as unknown as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
await scheduleRestartSentinelWake({ deps: {} as never });
expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith("restart message", {
sessionKey: "agent:main:main",
});
expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled();
expect(mocks.logWarn).toHaveBeenCalledWith(
expect.stringContaining("continuation skipped"),
expect.objectContaining({
sessionKey: "agent:main:main",
continuationKind: "agentTurn",
}),
);
});
it("skips outbound restart notice when no canonical delivery context survives restart", async () => {
mocks.consumeRestartSentinel.mockResolvedValue({
payload: {
@@ -389,12 +926,27 @@ describe("scheduleRestartSentinelWake", () => {
.mockReturnValueOnce({
cfg: {},
entry: {
sessionId: "agent:main:matrix:channel:!lowercased:example.org:thread:$thread-event",
updatedAt: 0,
origin: { provider: "matrix", accountId: "acct-thread", threadId: "$thread-event" },
},
store: {},
storePath: "/tmp/sessions.json",
canonicalKey: "agent:main:matrix:channel:!lowercased:example.org:thread:$thread-event",
legacyKey: undefined,
})
.mockReturnValueOnce({
cfg: {},
entry: { lastChannel: "matrix", lastTo: "room:!MixedCase:example.org" },
entry: {
sessionId: "agent:main:matrix:channel:!lowercased:example.org",
updatedAt: 0,
lastChannel: "matrix",
lastTo: "room:!MixedCase:example.org",
},
store: {},
storePath: "/tmp/sessions.json",
canonicalKey: "agent:main:matrix:channel:!lowercased:example.org",
legacyKey: undefined,
});
mocks.deliveryContextFromSession
.mockReturnValueOnce({

View File

@@ -1,4 +1,8 @@
import { resolveSessionAgentId } from "../agents/agent-scope.js";
import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js";
import { recordInboundSession } from "../channels/session.js";
import type { CliDeps } from "../cli/deps.types.js";
import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
import { parseSessionThreadInfo } from "../config/sessions/thread-info.js";
@@ -11,14 +15,18 @@ import { resolveOutboundTarget } from "../infra/outbound/targets.js";
import {
consumeRestartSentinel,
formatRestartSentinelMessage,
type RestartSentinelContinuation,
summarizeRestartSentinel,
} from "../infra/restart-sentinel.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { recordInboundSessionAndDispatchReply } from "../plugin-sdk/inbound-reply-dispatch.js";
import type { OutboundReplyPayload } from "../plugin-sdk/reply-payload.js";
import {
deliveryContextFromSession,
mergeDeliveryContext,
} from "../utils/delivery-context.shared.js";
import { injectTimestamp, timestampOptsFromConfig } from "./server-methods/agent-timestamp.js";
import { loadSessionEntry } from "./session-utils.js";
const log = createSubsystemLogger("gateway/restart-sentinel");
@@ -126,6 +134,171 @@ async function deliverRestartSentinelNotice(params: {
}
}
function buildRestartContinuationMessageId(params: {
sessionKey: string;
kind: RestartSentinelContinuation["kind"];
ts: number;
}) {
return `restart-sentinel:${params.sessionKey}:${params.kind}:${params.ts}`;
}
type RestartContinuationRoute = {
channel: string;
to: string;
accountId?: string;
replyToId?: string;
threadId?: string;
};
function resolveRestartContinuationRoute(params: {
channel?: string;
to?: string;
accountId?: string;
replyToId?: string;
threadId?: string;
}): RestartContinuationRoute | undefined {
if (!params.channel || !params.to) {
return undefined;
}
return {
channel: params.channel,
to: params.to,
...(params.accountId ? { accountId: params.accountId } : {}),
...(params.replyToId ? { replyToId: params.replyToId } : {}),
...(params.threadId ? { threadId: params.threadId } : {}),
};
}
function resolveRestartContinuationOutboundPayload(params: {
payload: OutboundReplyPayload;
messageId: string;
replyToId?: string;
}): OutboundReplyPayload {
if (params.payload.replyToId !== params.messageId) {
return params.payload;
}
const payload: OutboundReplyPayload = { ...params.payload };
delete payload.replyToId;
return params.replyToId ? { ...payload, replyToId: params.replyToId } : payload;
}
async function dispatchRestartSentinelContinuation(params: {
deps: CliDeps;
cfg: ReturnType<typeof loadSessionEntry>["cfg"];
storePath: string;
sessionKey: string;
continuation: RestartSentinelContinuation;
ts: number;
route?: RestartContinuationRoute;
}) {
if (params.continuation.kind === "systemEvent") {
enqueueSystemEvent(params.continuation.text, {
sessionKey: params.sessionKey,
...(params.route
? {
deliveryContext: {
channel: params.route.channel,
to: params.route.to,
...(params.route.accountId ? { accountId: params.route.accountId } : {}),
...(params.route.threadId ? { threadId: params.route.threadId } : {}),
},
}
: {}),
});
requestHeartbeatNow({ reason: "wake", sessionKey: params.sessionKey });
return;
}
if (!params.route) {
throw new Error("restart continuation route unavailable");
}
const route = params.route;
const messageId = buildRestartContinuationMessageId({
sessionKey: params.sessionKey,
kind: params.continuation.kind,
ts: params.ts,
});
const userMessage = params.continuation.message.trim();
const agentId = resolveSessionAgentId({
sessionKey: params.sessionKey,
config: params.cfg,
});
let dispatchError: unknown;
await recordInboundSessionAndDispatchReply({
cfg: params.cfg,
channel: route.channel,
accountId: route.accountId,
agentId,
routeSessionKey: params.sessionKey,
storePath: params.storePath,
ctxPayload: finalizeInboundContext(
{
Body: userMessage,
BodyForAgent: injectTimestamp(userMessage, timestampOptsFromConfig(params.cfg)),
BodyForCommands: userMessage,
RawBody: userMessage,
CommandBody: userMessage,
SessionKey: params.sessionKey,
AccountId: route.accountId,
MessageSid: messageId,
Timestamp: Date.now(),
Provider: route.channel,
Surface: route.channel,
ChatType: "direct",
CommandAuthorized: true,
ReplyToId: route.replyToId,
OriginatingChannel: route.channel,
OriginatingTo: route.to,
ExplicitDeliverRoute: true,
MessageThreadId: route.threadId,
},
{
forceBodyForCommands: true,
forceChatType: true,
},
),
recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher,
deliver: async (payload) => {
const outboundPayload = resolveRestartContinuationOutboundPayload({
payload,
messageId,
replyToId: route.replyToId,
});
const results = await deliverOutboundPayloads({
cfg: params.cfg,
channel: route.channel,
to: route.to,
accountId: route.accountId,
replyToId: route.replyToId,
threadId: route.threadId,
payloads: [outboundPayload],
session: buildOutboundSessionContext({
cfg: params.cfg,
sessionKey: params.sessionKey,
}),
deps: params.deps,
bestEffort: false,
});
if (results.length === 0) {
throw new Error("restart continuation delivery returned no results");
}
},
onRecordError: (err) => {
log.warn(`restart continuation failed to record inbound session metadata: ${String(err)}`, {
sessionKey: params.sessionKey,
});
},
onDispatchError: (err) => {
dispatchError ??= err;
},
});
if (dispatchError) {
throw dispatchError;
}
}
export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
const sentinel = await consumeRestartSentinel();
if (!sentinel) {
@@ -145,12 +318,18 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
if (!sessionKey) {
const mainSessionKey = resolveMainSessionKeyFromConfig();
enqueueSystemEvent(message, { sessionKey: mainSessionKey });
if (payload.continuation) {
log.warn(`${summary}: continuation skipped: restart sentinel sessionKey unavailable`, {
sessionKey: mainSessionKey,
continuationKind: payload.continuation.kind,
});
}
return;
}
const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey);
const { cfg, entry } = loadSessionEntry(sessionKey);
const { cfg, entry, canonicalKey, storePath } = loadSessionEntry(sessionKey);
// Prefer delivery context from sentinel (captured at restart) over session store
// Handles race condition where store wasn't flushed before restart
@@ -175,57 +354,84 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
const channelRaw = origin?.channel;
const channel = channelRaw ? normalizeChannelId(channelRaw) : null;
const to = origin?.to;
if (!channel || !to) {
return;
}
const resolved = resolveOutboundTarget({
channel,
to,
cfg,
accountId: origin?.accountId,
mode: "implicit",
});
if (!resolved.ok) {
return;
}
const threadId =
payload.threadId ??
sessionThreadId ??
(origin?.threadId != null ? String(origin.threadId) : undefined);
let resolvedTo: string | undefined;
let replyToId: string | undefined;
let resolvedThreadId = threadId;
const replyTransport =
getChannelPlugin(channel)?.threading?.resolveReplyTransport?.({
if (channel && to) {
const resolved = resolveOutboundTarget({
channel,
to,
cfg,
accountId: origin?.accountId,
threadId,
}) ?? null;
const replyToId = replyTransport?.replyToId ?? undefined;
const resolvedThreadId =
replyTransport && Object.hasOwn(replyTransport, "threadId")
? replyTransport.threadId != null
? String(replyTransport.threadId)
: undefined
: threadId;
const outboundSession = buildOutboundSessionContext({
cfg,
sessionKey,
});
mode: "implicit",
});
if (resolved.ok) {
resolvedTo = resolved.to;
const replyTransport =
getChannelPlugin(channel)?.threading?.resolveReplyTransport?.({
cfg,
accountId: origin?.accountId,
threadId,
}) ?? null;
replyToId = replyTransport?.replyToId ?? undefined;
resolvedThreadId =
replyTransport && Object.hasOwn(replyTransport, "threadId")
? replyTransport.threadId != null
? String(replyTransport.threadId)
: undefined
: threadId;
const outboundSession = buildOutboundSessionContext({
cfg,
sessionKey: canonicalKey,
});
await deliverRestartSentinelNotice({
deps: params.deps,
cfg,
sessionKey,
summary,
message,
channel,
to: resolved.to,
accountId: origin?.accountId,
replyToId,
threadId: resolvedThreadId,
session: outboundSession,
});
await deliverRestartSentinelNotice({
deps: params.deps,
cfg,
sessionKey: canonicalKey,
summary,
message,
channel,
to: resolvedTo,
accountId: origin?.accountId,
replyToId,
threadId: resolvedThreadId,
session: outboundSession,
});
}
}
if (!payload.continuation) {
return;
}
try {
await dispatchRestartSentinelContinuation({
deps: params.deps,
cfg,
storePath,
sessionKey: canonicalKey,
continuation: payload.continuation,
ts: payload.ts,
route: resolveRestartContinuationRoute({
channel: channel ?? undefined,
to: resolvedTo,
accountId: origin?.accountId,
replyToId,
threadId: resolvedThreadId,
}),
});
} catch (err) {
log.warn(`${summary}: continuation delivery failed: ${String(err)}`, {
sessionKey: canonicalKey,
continuationKind: payload.continuation.kind,
});
}
}
export function shouldWakeFromRestartSentinel() {

View File

@@ -34,6 +34,10 @@ describe("restart sentinel", () => {
status: "ok" as const,
ts: Date.now(),
sessionKey: "agent:main:mobilechat:dm:+15555550123",
continuation: {
kind: "agentTurn" as const,
message: "Reply with exactly: Yay! I did it!",
},
stats: { mode: "git" },
};
const filePath = await writeRestartSentinel(payload);
@@ -41,9 +45,11 @@ describe("restart sentinel", () => {
const read = await readRestartSentinel();
expect(read?.payload.kind).toBe("update");
expect(read?.payload.continuation).toEqual(payload.continuation);
const consumed = await consumeRestartSentinel();
expect(consumed?.payload.sessionKey).toBe(payload.sessionKey);
expect(consumed?.payload.continuation).toEqual(payload.continuation);
const empty = await readRestartSentinel();
expect(empty).toBeNull();

View File

@@ -27,6 +27,16 @@ export type RestartSentinelStats = {
durationMs?: number | null;
};
export type RestartSentinelContinuation =
| {
kind: "systemEvent";
text: string;
}
| {
kind: "agentTurn";
message: string;
};
export type RestartSentinelPayload = {
kind: "config-apply" | "config-patch" | "update" | "restart";
status: "ok" | "error" | "skipped";
@@ -41,6 +51,7 @@ export type RestartSentinelPayload = {
/** Thread ID for reply threading (e.g., Slack thread_ts). */
threadId?: string;
message?: string | null;
continuation?: RestartSentinelContinuation | null;
doctorHint?: string | null;
stats?: RestartSentinelStats | null;
};