diff --git a/CHANGELOG.md b/CHANGELOG.md index bd08c9a9d50..f0fa69853b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ Docs: https://docs.openclaw.ai - fix(config): accept truncateAfterCompaction (#68395). Thanks @MonkeyLeeT - CLI/Claude: keep Claude CLI session bindings stable across OAuth access-token refreshes, so gateway restarts continue the same Claude conversation instead of minting a fresh one. (#70132) Thanks @obviyus. - QQBot: add `INTERACTION` intent (`1 << 26`) to the gateway constants and include it in the `FULL_INTENTS` mask so interaction events are received. (#70143) Thanks @cxyhhhhh. +- Gateway/restart: preserve one-shot continuation instructions across gateway restarts so agents can resume and reply back to the original chat after reboot. (#63406) Thanks @VACInc. ## 2026.4.21 diff --git a/src/agents/tools/gateway-tool.test.ts b/src/agents/tools/gateway-tool.test.ts new file mode 100644 index 00000000000..07fe109642b --- /dev/null +++ b/src/agents/tools/gateway-tool.test.ts @@ -0,0 +1,144 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { RestartSentinelPayload } from "../../infra/restart-sentinel.js"; + +const isRestartEnabledMock = vi.fn(() => true); +const extractDeliveryInfoMock = vi.fn(() => ({ + deliveryContext: { + channel: "slack", + to: "slack:C123", + accountId: "workspace-1", + }, + threadId: "thread-42", +})); +const formatDoctorNonInteractiveHintMock = vi.fn(() => "Run: openclaw doctor --non-interactive"); +const writeRestartSentinelMock = vi.fn(async (_payload: RestartSentinelPayload) => "/tmp/restart"); +const scheduleGatewaySigusr1RestartMock = vi.fn(() => ({ scheduled: true, delayMs: 250 })); + +vi.mock("../../config/commands.js", () => ({ + isRestartEnabled: isRestartEnabledMock, +})); + +vi.mock("../../config/sessions.js", () => ({ + extractDeliveryInfo: extractDeliveryInfoMock, +})); + +vi.mock("../../infra/restart-sentinel.js", async () => { + const actual = await vi.importActual( + "../../infra/restart-sentinel.js", + ); + return { + ...actual, + formatDoctorNonInteractiveHint: formatDoctorNonInteractiveHintMock, + writeRestartSentinel: writeRestartSentinelMock, + }; +}); + +vi.mock("../../infra/restart.js", () => ({ + scheduleGatewaySigusr1Restart: scheduleGatewaySigusr1RestartMock, +})); + +vi.mock("../../logging/subsystem.js", () => ({ + createSubsystemLogger: vi.fn(() => ({ + info: vi.fn(), + })), +})); + +vi.mock("./gateway.js", () => ({ + callGatewayTool: vi.fn(), + readGatewayCallOptions: vi.fn(() => ({})), +})); + +describe("gateway tool restart continuation", () => { + beforeEach(() => { + isRestartEnabledMock.mockReset(); + isRestartEnabledMock.mockReturnValue(true); + extractDeliveryInfoMock.mockReset(); + extractDeliveryInfoMock.mockReturnValue({ + deliveryContext: { + channel: "slack", + to: "slack:C123", + accountId: "workspace-1", + }, + threadId: "thread-42", + }); + formatDoctorNonInteractiveHintMock.mockReset(); + formatDoctorNonInteractiveHintMock.mockReturnValue("Run: openclaw doctor --non-interactive"); + writeRestartSentinelMock.mockReset(); + writeRestartSentinelMock.mockResolvedValue("/tmp/restart"); + scheduleGatewaySigusr1RestartMock.mockReset(); + scheduleGatewaySigusr1RestartMock.mockReturnValue({ scheduled: true, delayMs: 250 }); + }); + + it("uses a flat enum for continuationKind in the tool schema", async () => { + const { createGatewayTool } = await import("./gateway-tool.js"); + const tool = createGatewayTool(); + const continuationKind = ( + tool.parameters as { + properties?: { + continuationKind?: { + type?: string; + enum?: string[]; + anyOf?: unknown[]; + }; + }; + } + ).properties?.continuationKind; + + expect(continuationKind).toEqual( + expect.objectContaining({ + type: "string", + enum: ["systemEvent", "agentTurn"], + }), + ); + expect(continuationKind).not.toHaveProperty("anyOf"); + }); + + it("instructs agents to use continuationMessage when a restart still needs a reply", async () => { + const { createGatewayTool } = await import("./gateway-tool.js"); + const tool = createGatewayTool(); + + expect(tool.description).toContain("still owe the user a reply"); + expect(tool.description).toContain("continuationMessage"); + expect(tool.description).toContain("do not write restart sentinel files directly"); + }); + + it("writes an agentTurn continuation into the restart sentinel", async () => { + const { createGatewayTool } = await import("./gateway-tool.js"); + const tool = createGatewayTool({ + agentSessionKey: "agent:main:main", + config: {}, + }); + + const result = await tool.execute?.("tool-call-1", { + action: "restart", + delayMs: 250, + reason: "continue after reboot", + note: "Gateway restarting now", + continuationMessage: "Reply with exactly: Yay! I did it!", + }); + + expect(writeRestartSentinelMock).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "restart", + status: "ok", + sessionKey: "agent:main:main", + deliveryContext: { + channel: "slack", + to: "slack:C123", + accountId: "workspace-1", + }, + threadId: "thread-42", + message: "Gateway restarting now", + continuation: { + kind: "agentTurn", + message: "Reply with exactly: Yay! I did it!", + }, + }), + ); + expect(scheduleGatewaySigusr1RestartMock).toHaveBeenCalledWith({ + delayMs: 250, + reason: "continue after reboot", + }); + expect(result?.details).toEqual({ scheduled: true, delayMs: 250 }); + }); +}); diff --git a/src/agents/tools/gateway-tool.ts b/src/agents/tools/gateway-tool.ts index ee6e7300d5f..66a6f3a157d 100644 --- a/src/agents/tools/gateway-tool.ts +++ b/src/agents/tools/gateway-tool.ts @@ -14,7 +14,7 @@ import { scheduleGatewaySigusr1Restart } from "../../infra/restart.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import { collectEnabledInsecureOrDangerousFlags } from "../../security/dangerous-config-flags.js"; import { normalizeOptionalString, readStringValue } from "../../shared/string-coerce.js"; -import { stringEnum } from "../schema/typebox.js"; +import { optionalStringEnum, stringEnum } from "../schema/typebox.js"; import { type AnyAgentTool, jsonResult, readStringParam } from "./common.js"; import { callGatewayTool, readGatewayCallOptions } from "./gateway.js"; import { isOpenClawOwnerOnlyCoreToolName } from "./owner-only-tools.js"; @@ -289,6 +289,8 @@ const GatewayToolSchema = Type.Object({ // restart delayMs: Type.Optional(Type.Number()), reason: Type.Optional(Type.String()), + continuationKind: optionalStringEnum(["systemEvent", "agentTurn"] as const), + continuationMessage: Type.Optional(Type.String()), // config.get, config.schema.lookup, config.apply, update.run gatewayUrl: Type.Optional(Type.String()), gatewayToken: Type.Optional(Type.String()), @@ -317,7 +319,7 @@ export function createGatewayTool(opts?: { name: "gateway", ownerOnly: isOpenClawOwnerOnlyCoreToolName("gateway"), description: - "Restart, inspect a specific config schema path, apply config, or update the gateway in-place (SIGUSR1). Use config.schema.lookup with a targeted dot path before config edits. Use config.patch for safe partial config updates (merges with existing). Use config.apply only when replacing entire config. Config writes hot-reload when possible and restart when required. Always pass a human-readable completion message via the `note` parameter so the system can deliver it to the user after restart.", + "Restart, inspect a specific config schema path, apply config, or update the gateway in-place (SIGUSR1). Use config.schema.lookup with a targeted dot path before config edits. Use config.patch for safe partial config updates (merges with existing). Use config.apply only when replacing entire config. Config writes hot-reload when possible and restart when required. Always pass a human-readable completion message via the `note` parameter so the system can deliver it to the user after restart. If restarting during a user task and you still owe the user a reply, pass a specific one-shot `continuationMessage` for what to verify or report after boot; do not write restart sentinel files directly. Use `continuationKind` only when it should be a system event instead of a normal agent turn.", parameters: GatewayToolSchema, execute: async (_toolCallId, args) => { const params = args as Record; @@ -335,6 +337,8 @@ export function createGatewayTool(opts?: { : undefined; const reason = normalizeOptionalString(params.reason)?.slice(0, 200); const note = normalizeOptionalString(params.note); + const continuationMessage = normalizeOptionalString(params.continuationMessage); + const continuationKind = normalizeOptionalString(params.continuationKind); // Extract channel + threadId for routing after restart. // Uses generic :thread: parsing plus plugin-owned session grammars. const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey); @@ -346,6 +350,17 @@ export function createGatewayTool(opts?: { deliveryContext, threadId, message: note ?? reason ?? null, + continuation: continuationMessage + ? continuationKind === "systemEvent" + ? { + kind: "systemEvent", + text: continuationMessage, + } + : { + kind: "agentTurn", + message: continuationMessage, + } + : null, doctorHint: formatDoctorNonInteractiveHint(), stats: { mode: "gateway.restart", diff --git a/src/gateway/server-restart-sentinel.test.ts b/src/gateway/server-restart-sentinel.test.ts index 632ea857a69..006038327c4 100644 --- a/src/gateway/server-restart-sentinel.test.ts +++ b/src/gateway/server-restart-sentinel.test.ts @@ -1,6 +1,12 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ChannelPlugin } from "../channels/plugins/types.plugin.js"; import { mergeMockedModule } from "../test-utils/vitest-module-mocks.js"; +type LoadedSessionEntry = ReturnType; +type RecordInboundSessionAndDispatchReplyParams = Parameters< + typeof import("../plugin-sdk/inbound-reply-dispatch.js").recordInboundSessionAndDispatchReply +>[0]; + const mocks = vi.hoisted(() => ({ resolveSessionAgentId: vi.fn(() => "agent-from-key"), consumeRestartSentinel: vi.fn(async () => ({ @@ -22,7 +28,19 @@ const mocks = vi.hoisted(() => ({ threadId: undefined, }), ), - loadSessionEntry: vi.fn(() => ({ cfg: {}, entry: {} })), + loadSessionEntry: vi.fn( + (): LoadedSessionEntry => ({ + cfg: {}, + entry: { + sessionId: "agent:main:main", + updatedAt: 0, + }, + store: {}, + storePath: "/tmp/sessions.json", + canonicalKey: "agent:main:main", + legacyKey: undefined, + }), + ), deliveryContextFromSession: vi.fn( (): | { channel?: string; to?: string; accountId?: string; threadId?: string | number } @@ -32,18 +50,23 @@ const mocks = vi.hoisted(() => ({ ...b, ...a, })), - getChannelPlugin: vi.fn(() => undefined), + getChannelPlugin: vi.fn((): ChannelPlugin | undefined => undefined), normalizeChannelId: vi.fn<(channel?: string | null) => string | null>(), - resolveOutboundTarget: vi.fn((_params?: { to?: string }) => ({ + resolveOutboundTarget: vi.fn(((_params?: { to?: string }) => ({ ok: true as const, to: "+15550002", - })), + })) as (params?: { to?: string }) => { ok: true; to: string } | { ok: false; error: Error }), deliverOutboundPayloads: vi.fn(async () => [{ channel: "whatsapp", messageId: "msg-1" }]), enqueueDelivery: vi.fn(async () => "queue-1"), ackDelivery: vi.fn(async () => {}), failDelivery: vi.fn(async () => {}), enqueueSystemEvent: vi.fn(), requestHeartbeatNow: vi.fn(), + injectTimestamp: vi.fn((message: string) => `stamped:${message}`), + timestampOptsFromConfig: vi.fn(() => ({})), + recordInboundSessionAndDispatchReply: vi.fn( + async (_params: RecordInboundSessionAndDispatchReplyParams) => {}, + ), logWarn: vi.fn(), })); @@ -110,6 +133,10 @@ vi.mock("../infra/system-events.js", () => ({ enqueueSystemEvent: mocks.enqueueSystemEvent, })); +vi.mock("../plugin-sdk/inbound-reply-dispatch.js", () => ({ + recordInboundSessionAndDispatchReply: mocks.recordInboundSessionAndDispatchReply, +})); + vi.mock("../infra/heartbeat-wake.js", async () => { return await mergeMockedModule( await vi.importActual( @@ -127,6 +154,11 @@ vi.mock("../logging/subsystem.js", () => ({ })), })); +vi.mock("./server-methods/agent-timestamp.js", () => ({ + injectTimestamp: mocks.injectTimestamp, + timestampOptsFromConfig: mocks.timestampOptsFromConfig, +})); + const { scheduleRestartSentinelWake } = await import("./server-restart-sentinel.js"); describe("scheduleRestartSentinelWake", () => { @@ -149,9 +181,21 @@ describe("scheduleRestartSentinelWake", () => { mocks.parseSessionThreadInfo.mockReset(); mocks.parseSessionThreadInfo.mockReturnValue({ baseSessionKey: null, threadId: undefined }); mocks.loadSessionEntry.mockReset(); - mocks.loadSessionEntry.mockReturnValue({ cfg: {}, entry: {} }); + mocks.loadSessionEntry.mockReturnValue({ + cfg: {}, + entry: { + sessionId: "agent:main:main", + updatedAt: 0, + }, + store: {}, + storePath: "/tmp/sessions.json", + canonicalKey: "agent:main:main", + legacyKey: undefined, + }); mocks.deliveryContextFromSession.mockReset(); mocks.deliveryContextFromSession.mockReturnValue(undefined); + mocks.getChannelPlugin.mockReset(); + mocks.getChannelPlugin.mockReturnValue(undefined); mocks.normalizeChannelId.mockClear(); mocks.resolveOutboundTarget.mockReset(); mocks.resolveOutboundTarget.mockReturnValue({ ok: true as const, to: "+15550002" }); @@ -163,6 +207,10 @@ describe("scheduleRestartSentinelWake", () => { mocks.failDelivery.mockClear(); mocks.enqueueSystemEvent.mockClear(); mocks.requestHeartbeatNow.mockClear(); + mocks.injectTimestamp.mockClear(); + mocks.timestampOptsFromConfig.mockClear(); + mocks.recordInboundSessionAndDispatchReply.mockReset(); + mocks.recordInboundSessionAndDispatchReply.mockResolvedValue(undefined); mocks.logWarn.mockClear(); }); @@ -201,6 +249,7 @@ describe("scheduleRestartSentinelWake", () => { reason: "wake", sessionKey: "agent:main:main", }); + expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled(); expect(mocks.logWarn).not.toHaveBeenCalled(); }); @@ -264,6 +313,45 @@ describe("scheduleRestartSentinelWake", () => { expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready"); }); + it("still dispatches continuation after restart notice retries are exhausted", async () => { + vi.useFakeTimers(); + mocks.deliverOutboundPayloads.mockRejectedValue(new Error("transport still not ready")); + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as unknown as Awaited>); + + const wakePromise = scheduleRestartSentinelWake({ deps: {} as never }); + await Promise.resolve(); + await Promise.resolve(); + for (let attempt = 1; attempt < 45; attempt += 1) { + await vi.advanceTimersByTimeAsync(1_000); + } + await wakePromise; + + expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready"); + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1); + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith( + expect.objectContaining({ + routeSessionKey: "agent:main:main", + ctxPayload: expect.objectContaining({ + Body: "continue", + }), + }), + ); + }); + it("prefers top-level sentinel threadId for wake routing context", async () => { // Legacy or malformed sentinel JSON can still carry a nested threadId. mocks.consumeRestartSentinel.mockResolvedValue({ @@ -277,7 +365,7 @@ describe("scheduleRestartSentinelWake", () => { } as never, threadId: "fresh-thread", }, - } as Awaited>); + } as unknown as Awaited>); await scheduleRestartSentinelWake({ deps: {} as never }); @@ -292,6 +380,430 @@ describe("scheduleRestartSentinelWake", () => { ); }); + it("dispatches agentTurn continuation after the restart notice in the same routed thread", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + threadId: "thread-42", + ts: 123, + continuation: { + kind: "agentTurn", + message: "Reply with exactly: Yay! I did it!", + }, + }, + } as Awaited>); + mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => { + await params.deliver({ + text: "done", + replyToId: "restart-sentinel:agent:main:main:agentTurn:123", + }); + }); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.enqueueDelivery).toHaveBeenCalledWith( + expect.objectContaining({ + payloads: [{ text: "restart message" }], + threadId: "thread-42", + }), + ); + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1); + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "whatsapp", + accountId: "acct-2", + routeSessionKey: "agent:main:main", + ctxPayload: expect.objectContaining({ + Body: "Reply with exactly: Yay! I did it!", + BodyForAgent: "stamped:Reply with exactly: Yay! I did it!", + SessionKey: "agent:main:main", + Provider: "whatsapp", + Surface: "whatsapp", + OriginatingChannel: "whatsapp", + OriginatingTo: "+15550002", + MessageThreadId: "thread-42", + }), + }), + ); + }); + + it("preserves derived reply transport ids in continuation context", async () => { + mocks.getChannelPlugin.mockReturnValue({ + id: "whatsapp", + meta: { + id: "whatsapp", + label: "WhatsApp", + selectionLabel: "WhatsApp", + docsPath: "/channels/whatsapp", + blurb: "WhatsApp", + }, + capabilities: { chatTypes: ["direct"] }, + config: { + listAccountIds: () => [], + resolveAccount: () => ({}), + }, + threading: { + resolveReplyTransport: ({ threadId }: { threadId?: string | number | null }) => ({ + replyToId: threadId != null ? `reply:${String(threadId)}` : undefined, + threadId: null, + }), + }, + }); + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + threadId: "thread-42", + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as Awaited>); + mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => { + await params.deliver({ + text: "done", + replyToId: "restart-sentinel:agent:main:main:agentTurn:123", + }); + }); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith( + expect.objectContaining({ + ctxPayload: expect.objectContaining({ + ReplyToId: "reply:thread-42", + MessageThreadId: undefined, + }), + }), + ); + expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith( + expect.objectContaining({ + payloads: [ + { + text: "done", + replyToId: "reply:thread-42", + }, + ], + }), + ); + }); + + it("strips synthetic reply transport ids when no real reply target exists", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as Awaited>); + mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => { + await params.deliver({ + text: "done", + replyToId: "restart-sentinel:agent:main:main:agentTurn:123", + }); + }); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith( + expect.objectContaining({ + payloads: [{ text: "done" }], + }), + ); + }); + + it("preserves non-synthetic reply transport ids from continuation payloads", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as Awaited>); + mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => { + await params.deliver({ + text: "done", + replyToId: "provider-reply-id", + }); + }); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith( + expect.objectContaining({ + payloads: [ + { + text: "done", + replyToId: "provider-reply-id", + }, + ], + }), + ); + }); + + it("dispatches agentTurn continuation from session delivery context when sentinel routing is empty", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as unknown as Awaited>); + mocks.deliveryContextFromSession.mockReturnValue({ + channel: "telegram", + to: "telegram:200482621", + accountId: "default", + }); + mocks.resolveOutboundTarget.mockReturnValue({ + ok: true as const, + to: "telegram:200482621", + }); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + accountId: "default", + ctxPayload: expect.objectContaining({ + Body: "continue", + OriginatingChannel: "telegram", + OriginatingTo: "telegram:200482621", + }), + }), + ); + }); + + it("requests another wake after enqueueing a systemEvent continuation", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + threadId: "thread-42", + ts: 123, + continuation: { + kind: "systemEvent", + text: "continue after restart", + }, + }, + } as Awaited>); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith( + 2, + "continue after restart", + expect.objectContaining({ + sessionKey: "agent:main:main", + deliveryContext: expect.objectContaining({ + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + threadId: "thread-42", + }), + }), + ); + expect(mocks.requestHeartbeatNow).toHaveBeenNthCalledWith(1, { + reason: "wake", + sessionKey: "agent:main:main", + }); + expect(mocks.requestHeartbeatNow).toHaveBeenNthCalledWith(2, { + reason: "wake", + sessionKey: "agent:main:main", + }); + }); + + it("enqueues systemEvent continuation without stale partial delivery context", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + threadId: "thread-42", + ts: 123, + continuation: { + kind: "systemEvent", + text: "continue after restart", + }, + }, + } as Awaited>); + mocks.resolveOutboundTarget.mockReturnValueOnce({ + ok: false, + error: new Error("missing route"), + }); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(2, "continue after restart", { + sessionKey: "agent:main:main", + }); + }); + + it("logs and continues when continuation delivery fails", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as Awaited>); + mocks.recordInboundSessionAndDispatchReply.mockRejectedValueOnce(new Error("dispatch failed")); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith( + "restart message", + expect.objectContaining({ + sessionKey: "agent:main:main", + }), + ); + expect(mocks.logWarn).toHaveBeenCalledWith( + expect.stringContaining("continuation delivery failed"), + expect.objectContaining({ + sessionKey: "agent:main:main", + continuationKind: "agentTurn", + }), + ); + }); + + it("logs and continues when continuation dispatch reports a delivery error", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as Awaited>); + mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce( + async (params: { onDispatchError: (err: unknown, info: { kind: string }) => void }) => { + params.onDispatchError(new Error("route failed"), { kind: "final" }); + }, + ); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.logWarn).toHaveBeenCalledWith( + expect.stringContaining("continuation delivery failed"), + expect.objectContaining({ + sessionKey: "agent:main:main", + continuationKind: "agentTurn", + }), + ); + }); + + it("warns and skips agentTurn continuation when restart routing cannot resolve a destination", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as Awaited>); + mocks.resolveOutboundTarget.mockReturnValueOnce({ + ok: false, + error: new Error("missing route"), + }); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled(); + expect(mocks.logWarn).toHaveBeenCalledWith( + expect.stringContaining("restart continuation route unavailable"), + expect.objectContaining({ + sessionKey: "agent:main:main", + continuationKind: "agentTurn", + }), + ); + }); + + it("consumes continuation once and does not replay it on later startup cycles", async () => { + mocks.consumeRestartSentinel + .mockResolvedValueOnce({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as Awaited>) + .mockResolvedValueOnce( + null as unknown as Awaited>, + ); + + await scheduleRestartSentinelWake({ deps: {} as never }); + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(1); + }); + it("does not wake the main session when the sentinel has no sessionKey", async () => { mocks.consumeRestartSentinel.mockResolvedValue({ payload: { @@ -308,6 +820,31 @@ describe("scheduleRestartSentinelWake", () => { expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled(); }); + it("warns when continuation cannot run because the restart sentinel has no sessionKey", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + message: "restart message", + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as unknown as Awaited>); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith("restart message", { + sessionKey: "agent:main:main", + }); + expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled(); + expect(mocks.logWarn).toHaveBeenCalledWith( + expect.stringContaining("continuation skipped"), + expect.objectContaining({ + sessionKey: "agent:main:main", + continuationKind: "agentTurn", + }), + ); + }); it("skips outbound restart notice when no canonical delivery context survives restart", async () => { mocks.consumeRestartSentinel.mockResolvedValue({ payload: { @@ -389,12 +926,27 @@ describe("scheduleRestartSentinelWake", () => { .mockReturnValueOnce({ cfg: {}, entry: { + sessionId: "agent:main:matrix:channel:!lowercased:example.org:thread:$thread-event", + updatedAt: 0, origin: { provider: "matrix", accountId: "acct-thread", threadId: "$thread-event" }, }, + store: {}, + storePath: "/tmp/sessions.json", + canonicalKey: "agent:main:matrix:channel:!lowercased:example.org:thread:$thread-event", + legacyKey: undefined, }) .mockReturnValueOnce({ cfg: {}, - entry: { lastChannel: "matrix", lastTo: "room:!MixedCase:example.org" }, + entry: { + sessionId: "agent:main:matrix:channel:!lowercased:example.org", + updatedAt: 0, + lastChannel: "matrix", + lastTo: "room:!MixedCase:example.org", + }, + store: {}, + storePath: "/tmp/sessions.json", + canonicalKey: "agent:main:matrix:channel:!lowercased:example.org", + legacyKey: undefined, }); mocks.deliveryContextFromSession .mockReturnValueOnce({ diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index 20541517091..cb39204c840 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -1,4 +1,8 @@ +import { resolveSessionAgentId } from "../agents/agent-scope.js"; +import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js"; +import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js"; +import { recordInboundSession } from "../channels/session.js"; import type { CliDeps } from "../cli/deps.types.js"; import { resolveMainSessionKeyFromConfig } from "../config/sessions.js"; import { parseSessionThreadInfo } from "../config/sessions/thread-info.js"; @@ -11,14 +15,18 @@ import { resolveOutboundTarget } from "../infra/outbound/targets.js"; import { consumeRestartSentinel, formatRestartSentinelMessage, + type RestartSentinelContinuation, summarizeRestartSentinel, } from "../infra/restart-sentinel.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { recordInboundSessionAndDispatchReply } from "../plugin-sdk/inbound-reply-dispatch.js"; +import type { OutboundReplyPayload } from "../plugin-sdk/reply-payload.js"; import { deliveryContextFromSession, mergeDeliveryContext, } from "../utils/delivery-context.shared.js"; +import { injectTimestamp, timestampOptsFromConfig } from "./server-methods/agent-timestamp.js"; import { loadSessionEntry } from "./session-utils.js"; const log = createSubsystemLogger("gateway/restart-sentinel"); @@ -126,6 +134,171 @@ async function deliverRestartSentinelNotice(params: { } } +function buildRestartContinuationMessageId(params: { + sessionKey: string; + kind: RestartSentinelContinuation["kind"]; + ts: number; +}) { + return `restart-sentinel:${params.sessionKey}:${params.kind}:${params.ts}`; +} + +type RestartContinuationRoute = { + channel: string; + to: string; + accountId?: string; + replyToId?: string; + threadId?: string; +}; + +function resolveRestartContinuationRoute(params: { + channel?: string; + to?: string; + accountId?: string; + replyToId?: string; + threadId?: string; +}): RestartContinuationRoute | undefined { + if (!params.channel || !params.to) { + return undefined; + } + return { + channel: params.channel, + to: params.to, + ...(params.accountId ? { accountId: params.accountId } : {}), + ...(params.replyToId ? { replyToId: params.replyToId } : {}), + ...(params.threadId ? { threadId: params.threadId } : {}), + }; +} + +function resolveRestartContinuationOutboundPayload(params: { + payload: OutboundReplyPayload; + messageId: string; + replyToId?: string; +}): OutboundReplyPayload { + if (params.payload.replyToId !== params.messageId) { + return params.payload; + } + const payload: OutboundReplyPayload = { ...params.payload }; + delete payload.replyToId; + return params.replyToId ? { ...payload, replyToId: params.replyToId } : payload; +} + +async function dispatchRestartSentinelContinuation(params: { + deps: CliDeps; + cfg: ReturnType["cfg"]; + storePath: string; + sessionKey: string; + continuation: RestartSentinelContinuation; + ts: number; + route?: RestartContinuationRoute; +}) { + if (params.continuation.kind === "systemEvent") { + enqueueSystemEvent(params.continuation.text, { + sessionKey: params.sessionKey, + ...(params.route + ? { + deliveryContext: { + channel: params.route.channel, + to: params.route.to, + ...(params.route.accountId ? { accountId: params.route.accountId } : {}), + ...(params.route.threadId ? { threadId: params.route.threadId } : {}), + }, + } + : {}), + }); + requestHeartbeatNow({ reason: "wake", sessionKey: params.sessionKey }); + return; + } + + if (!params.route) { + throw new Error("restart continuation route unavailable"); + } + + const route = params.route; + const messageId = buildRestartContinuationMessageId({ + sessionKey: params.sessionKey, + kind: params.continuation.kind, + ts: params.ts, + }); + const userMessage = params.continuation.message.trim(); + const agentId = resolveSessionAgentId({ + sessionKey: params.sessionKey, + config: params.cfg, + }); + let dispatchError: unknown; + await recordInboundSessionAndDispatchReply({ + cfg: params.cfg, + channel: route.channel, + accountId: route.accountId, + agentId, + routeSessionKey: params.sessionKey, + storePath: params.storePath, + ctxPayload: finalizeInboundContext( + { + Body: userMessage, + BodyForAgent: injectTimestamp(userMessage, timestampOptsFromConfig(params.cfg)), + BodyForCommands: userMessage, + RawBody: userMessage, + CommandBody: userMessage, + SessionKey: params.sessionKey, + AccountId: route.accountId, + MessageSid: messageId, + Timestamp: Date.now(), + Provider: route.channel, + Surface: route.channel, + ChatType: "direct", + CommandAuthorized: true, + ReplyToId: route.replyToId, + OriginatingChannel: route.channel, + OriginatingTo: route.to, + ExplicitDeliverRoute: true, + MessageThreadId: route.threadId, + }, + { + forceBodyForCommands: true, + forceChatType: true, + }, + ), + recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher, + deliver: async (payload) => { + const outboundPayload = resolveRestartContinuationOutboundPayload({ + payload, + messageId, + replyToId: route.replyToId, + }); + const results = await deliverOutboundPayloads({ + cfg: params.cfg, + channel: route.channel, + to: route.to, + accountId: route.accountId, + replyToId: route.replyToId, + threadId: route.threadId, + payloads: [outboundPayload], + session: buildOutboundSessionContext({ + cfg: params.cfg, + sessionKey: params.sessionKey, + }), + deps: params.deps, + bestEffort: false, + }); + if (results.length === 0) { + throw new Error("restart continuation delivery returned no results"); + } + }, + onRecordError: (err) => { + log.warn(`restart continuation failed to record inbound session metadata: ${String(err)}`, { + sessionKey: params.sessionKey, + }); + }, + onDispatchError: (err) => { + dispatchError ??= err; + }, + }); + if (dispatchError) { + throw dispatchError; + } +} + export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { const sentinel = await consumeRestartSentinel(); if (!sentinel) { @@ -145,12 +318,18 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { if (!sessionKey) { const mainSessionKey = resolveMainSessionKeyFromConfig(); enqueueSystemEvent(message, { sessionKey: mainSessionKey }); + if (payload.continuation) { + log.warn(`${summary}: continuation skipped: restart sentinel sessionKey unavailable`, { + sessionKey: mainSessionKey, + continuationKind: payload.continuation.kind, + }); + } return; } const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey); - const { cfg, entry } = loadSessionEntry(sessionKey); + const { cfg, entry, canonicalKey, storePath } = loadSessionEntry(sessionKey); // Prefer delivery context from sentinel (captured at restart) over session store // Handles race condition where store wasn't flushed before restart @@ -175,57 +354,84 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { const channelRaw = origin?.channel; const channel = channelRaw ? normalizeChannelId(channelRaw) : null; const to = origin?.to; - if (!channel || !to) { - return; - } - - const resolved = resolveOutboundTarget({ - channel, - to, - cfg, - accountId: origin?.accountId, - mode: "implicit", - }); - if (!resolved.ok) { - return; - } - const threadId = payload.threadId ?? sessionThreadId ?? (origin?.threadId != null ? String(origin.threadId) : undefined); + let resolvedTo: string | undefined; + let replyToId: string | undefined; + let resolvedThreadId = threadId; - const replyTransport = - getChannelPlugin(channel)?.threading?.resolveReplyTransport?.({ + if (channel && to) { + const resolved = resolveOutboundTarget({ + channel, + to, cfg, accountId: origin?.accountId, - threadId, - }) ?? null; - const replyToId = replyTransport?.replyToId ?? undefined; - const resolvedThreadId = - replyTransport && Object.hasOwn(replyTransport, "threadId") - ? replyTransport.threadId != null - ? String(replyTransport.threadId) - : undefined - : threadId; - const outboundSession = buildOutboundSessionContext({ - cfg, - sessionKey, - }); + mode: "implicit", + }); + if (resolved.ok) { + resolvedTo = resolved.to; + const replyTransport = + getChannelPlugin(channel)?.threading?.resolveReplyTransport?.({ + cfg, + accountId: origin?.accountId, + threadId, + }) ?? null; + replyToId = replyTransport?.replyToId ?? undefined; + resolvedThreadId = + replyTransport && Object.hasOwn(replyTransport, "threadId") + ? replyTransport.threadId != null + ? String(replyTransport.threadId) + : undefined + : threadId; + const outboundSession = buildOutboundSessionContext({ + cfg, + sessionKey: canonicalKey, + }); - await deliverRestartSentinelNotice({ - deps: params.deps, - cfg, - sessionKey, - summary, - message, - channel, - to: resolved.to, - accountId: origin?.accountId, - replyToId, - threadId: resolvedThreadId, - session: outboundSession, - }); + await deliverRestartSentinelNotice({ + deps: params.deps, + cfg, + sessionKey: canonicalKey, + summary, + message, + channel, + to: resolvedTo, + accountId: origin?.accountId, + replyToId, + threadId: resolvedThreadId, + session: outboundSession, + }); + } + } + + if (!payload.continuation) { + return; + } + + try { + await dispatchRestartSentinelContinuation({ + deps: params.deps, + cfg, + storePath, + sessionKey: canonicalKey, + continuation: payload.continuation, + ts: payload.ts, + route: resolveRestartContinuationRoute({ + channel: channel ?? undefined, + to: resolvedTo, + accountId: origin?.accountId, + replyToId, + threadId: resolvedThreadId, + }), + }); + } catch (err) { + log.warn(`${summary}: continuation delivery failed: ${String(err)}`, { + sessionKey: canonicalKey, + continuationKind: payload.continuation.kind, + }); + } } export function shouldWakeFromRestartSentinel() { diff --git a/src/infra/restart-sentinel.test.ts b/src/infra/restart-sentinel.test.ts index 631f5f4fde2..aca1d7e6668 100644 --- a/src/infra/restart-sentinel.test.ts +++ b/src/infra/restart-sentinel.test.ts @@ -34,6 +34,10 @@ describe("restart sentinel", () => { status: "ok" as const, ts: Date.now(), sessionKey: "agent:main:mobilechat:dm:+15555550123", + continuation: { + kind: "agentTurn" as const, + message: "Reply with exactly: Yay! I did it!", + }, stats: { mode: "git" }, }; const filePath = await writeRestartSentinel(payload); @@ -41,9 +45,11 @@ describe("restart sentinel", () => { const read = await readRestartSentinel(); expect(read?.payload.kind).toBe("update"); + expect(read?.payload.continuation).toEqual(payload.continuation); const consumed = await consumeRestartSentinel(); expect(consumed?.payload.sessionKey).toBe(payload.sessionKey); + expect(consumed?.payload.continuation).toEqual(payload.continuation); const empty = await readRestartSentinel(); expect(empty).toBeNull(); diff --git a/src/infra/restart-sentinel.ts b/src/infra/restart-sentinel.ts index 60cd97a613c..7c35125c6fa 100644 --- a/src/infra/restart-sentinel.ts +++ b/src/infra/restart-sentinel.ts @@ -27,6 +27,16 @@ export type RestartSentinelStats = { durationMs?: number | null; }; +export type RestartSentinelContinuation = + | { + kind: "systemEvent"; + text: string; + } + | { + kind: "agentTurn"; + message: string; + }; + export type RestartSentinelPayload = { kind: "config-apply" | "config-patch" | "update" | "restart"; status: "ok" | "error" | "skipped"; @@ -41,6 +51,7 @@ export type RestartSentinelPayload = { /** Thread ID for reply threading (e.g., Slack thread_ts). */ threadId?: string; message?: string | null; + continuation?: RestartSentinelContinuation | null; doctorHint?: string | null; stats?: RestartSentinelStats | null; };