diff --git a/CHANGELOG.md b/CHANGELOG.md index eb82e904761..0fbde953a42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -360,6 +360,7 @@ Docs: https://docs.openclaw.ai - Approvals/startup: let native approval handlers report ready after gateway authentication while replaying pending approvals in the background, so slow or failing replay delivery no longer blocks handler startup or amplifies reconnect storms. Thanks @steipete. - WhatsApp/security: keep contact/vCard/location structured-object free text out of the inline message body and render it through fenced untrusted metadata JSON, limiting hidden prompt-injection payloads in names, phone fields, and location labels/comments. Thanks @steipete. - Group-chat/security: keep channel-sourced group names and participant labels out of inline group system prompts and render them through fenced untrusted metadata JSON. Thanks @steipete. +- Gateway/restart continuation: durably hand restart continuations to a session-delivery queue before deleting the restart sentinel, recover queued continuation work after crashy restarts, and fall back to a session-only wake when no channel route survives reboot. (#70780) Thanks @fuller-stack-dev. - Agents/replay: preserve Kimi-style `functions.:` tool-call IDs during strict replay sanitization so custom OpenAI-compatible Kimi routes keep multi-turn tool use intact. (#70693) Thanks @geri4. - Discord/replies: preserve final reply permission context through outbound delivery so Discord replies keep the same channel/member routing rules at send time. Thanks @steipete. - Plugins/startup: restore bundled plugin `openclaw/plugin-sdk/*` resolution from packaged installs and external runtime-deps stage roots, so Telegram/Discord no longer crash-loop with `Cannot find package 'openclaw'` after missing dependency repair. (#70852) Thanks @simonemacario. diff --git a/src/gateway/server-restart-sentinel.test.ts b/src/gateway/server-restart-sentinel.test.ts index 46e1d18cef9..8fc9ea98f76 100644 --- a/src/gateway/server-restart-sentinel.test.ts +++ b/src/gateway/server-restart-sentinel.test.ts @@ -7,79 +7,139 @@ type RecordInboundSessionAndDispatchReplyParams = Parameters< typeof import("../plugin-sdk/inbound-reply-dispatch.js").recordInboundSessionAndDispatchReply >[0]; -const mocks = vi.hoisted(() => ({ - resolveSessionAgentId: vi.fn(() => "agent-from-key"), - consumeRestartSentinel: vi.fn(async () => ({ - payload: { - sessionKey: "agent:main:main", - deliveryContext: { - channel: "whatsapp", - to: "+15550002", - accountId: "acct-2", - }, +const mocks = vi.hoisted(() => { + const state = { + queuedSessionDelivery: null as Record | null, + }; + + return { + resolveSessionAgentId: vi.fn(() => "agent-from-key"), + get queuedSessionDelivery() { + return state.queuedSessionDelivery; }, - })), - formatRestartSentinelMessage: vi.fn(() => "restart message"), - summarizeRestartSentinel: vi.fn(() => "restart summary"), - resolveMainSessionKeyFromConfig: vi.fn(() => "agent:main:main"), - parseSessionThreadInfo: vi.fn( - (): { baseSessionKey: string | null | undefined; threadId: string | undefined } => ({ - baseSessionKey: null, - threadId: undefined, - }), - ), - loadSessionEntry: vi.fn( - (): LoadedSessionEntry => ({ - cfg: {}, - entry: { - sessionId: "agent:main:main", - updatedAt: 0, + set queuedSessionDelivery(value: Record | null) { + state.queuedSessionDelivery = value; + }, + readRestartSentinel: vi.fn(async () => ({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, }, - store: {}, - storePath: "/tmp/sessions.json", - canonicalKey: "agent:main:main", - legacyKey: undefined, + })), + removeRestartSentinelFile: vi.fn(async () => undefined), + resolveRestartSentinelPath: vi.fn(() => "/tmp/restart-sentinel.json"), + formatRestartSentinelMessage: vi.fn(() => "restart message"), + summarizeRestartSentinel: vi.fn(() => "restart summary"), + resolveMainSessionKeyFromConfig: vi.fn(() => "agent:main:main"), + parseSessionThreadInfo: vi.fn( + (): { baseSessionKey: string | null | undefined; threadId: string | undefined } => ({ + baseSessionKey: null, + threadId: undefined, + }), + ), + loadSessionEntry: vi.fn( + (): LoadedSessionEntry => ({ + cfg: {}, + entry: { + sessionId: "agent:main:main", + updatedAt: 0, + }, + store: {}, + storePath: "/tmp/sessions.json", + canonicalKey: "agent:main:main", + legacyKey: undefined, + }), + ), + deliveryContextFromSession: vi.fn( + (): + | { channel?: string; to?: string; accountId?: string; threadId?: string | number } + | undefined => undefined, + ), + mergeDeliveryContext: vi.fn((a?: Record, b?: Record) => ({ + ...b, + ...a, + })), + getChannelPlugin: vi.fn((): ChannelPlugin | undefined => undefined), + normalizeChannelId: vi.fn<(channel?: string | null) => string | null>(), + resolveOutboundTarget: vi.fn(((_params?: { to?: string }) => ({ + ok: true as const, + to: "+15550002", + })) as (params?: { to?: string }) => { ok: true; to: string } | { ok: false; error: Error }), + deliverOutboundPayloads: vi.fn(async () => [{ channel: "whatsapp", messageId: "msg-1" }]), + enqueueDelivery: vi.fn(async () => "queue-1"), + ackDelivery: vi.fn(async () => {}), + failDelivery: vi.fn(async () => {}), + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + enqueueSessionDelivery: vi.fn(async (payload: Record) => { + state.queuedSessionDelivery = payload; + return "session-delivery-1"; }), - ), - deliveryContextFromSession: vi.fn( - (): - | { channel?: string; to?: string; accountId?: string; threadId?: string | number } - | undefined => undefined, - ), - mergeDeliveryContext: vi.fn((a?: Record, b?: Record) => ({ - ...b, - ...a, - })), - getChannelPlugin: vi.fn((): ChannelPlugin | undefined => undefined), - normalizeChannelId: vi.fn<(channel?: string | null) => string | null>(), - resolveOutboundTarget: vi.fn(((_params?: { to?: string }) => ({ - ok: true as const, - to: "+15550002", - })) as (params?: { to?: string }) => { ok: true; to: string } | { ok: false; error: Error }), - deliverOutboundPayloads: vi.fn(async () => [{ channel: "whatsapp", messageId: "msg-1" }]), - enqueueDelivery: vi.fn(async () => "queue-1"), - ackDelivery: vi.fn(async () => {}), - failDelivery: vi.fn(async () => {}), - enqueueSystemEvent: vi.fn(), - requestHeartbeatNow: vi.fn(), - injectTimestamp: vi.fn((message: string) => `stamped:${message}`), - timestampOptsFromConfig: vi.fn(() => ({})), - recordInboundSessionAndDispatchReply: vi.fn( - async (_params: RecordInboundSessionAndDispatchReplyParams) => {}, - ), - logWarn: vi.fn(), -})); + drainPendingSessionDeliveries: vi.fn( + async (params: { + logLabel: string; + log: { warn: (message: string) => void }; + selectEntry: (entry: Record, now: number) => { match: boolean }; + deliver: (entry: Record) => Promise; + }) => { + if (!state.queuedSessionDelivery) { + return; + } + const entry = { + id: "session-delivery-1", + enqueuedAt: 1, + retryCount: 0, + ...state.queuedSessionDelivery, + }; + if (!params.selectEntry(entry, Date.now()).match) { + return; + } + try { + await params.deliver(entry); + } catch (err) { + params.log.warn(`${params.logLabel}: retry failed for entry ${entry.id}: ${String(err)}`); + } + }, + ), + recoverPendingSessionDeliveries: vi.fn(async () => ({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + })), + injectTimestamp: vi.fn((message: string) => `stamped:${message}`), + timestampOptsFromConfig: vi.fn(() => ({})), + recordInboundSessionAndDispatchReply: vi.fn( + async (_params: RecordInboundSessionAndDispatchReplyParams) => {}, + ), + logInfo: vi.fn(), + logWarn: vi.fn(), + logError: vi.fn(), + }; +}); vi.mock("../agents/agent-scope.js", () => ({ resolveSessionAgentId: mocks.resolveSessionAgentId, })); vi.mock("../infra/restart-sentinel.js", () => ({ - consumeRestartSentinel: mocks.consumeRestartSentinel, + readRestartSentinel: mocks.readRestartSentinel, + removeRestartSentinelFile: mocks.removeRestartSentinelFile, + resolveRestartSentinelPath: mocks.resolveRestartSentinelPath, formatRestartSentinelMessage: mocks.formatRestartSentinelMessage, summarizeRestartSentinel: mocks.summarizeRestartSentinel, })); +vi.mock("../infra/session-delivery-queue.js", () => ({ + enqueueSessionDelivery: mocks.enqueueSessionDelivery, + drainPendingSessionDeliveries: mocks.drainPendingSessionDeliveries, + recoverPendingSessionDeliveries: mocks.recoverPendingSessionDeliveries, +})); + vi.mock("../config/sessions.js", () => ({ resolveMainSessionKeyFromConfig: mocks.resolveMainSessionKeyFromConfig, })); @@ -150,7 +210,9 @@ vi.mock("../infra/heartbeat-wake.js", async () => { vi.mock("../logging/subsystem.js", () => ({ createSubsystemLogger: vi.fn(() => ({ + info: mocks.logInfo, warn: mocks.logWarn, + error: mocks.logError, })), })); @@ -168,7 +230,8 @@ describe("scheduleRestartSentinelWake", () => { beforeEach(() => { vi.useRealTimers(); - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.queuedSessionDelivery = null; + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -207,11 +270,17 @@ describe("scheduleRestartSentinelWake", () => { mocks.failDelivery.mockClear(); mocks.enqueueSystemEvent.mockClear(); mocks.requestHeartbeatNow.mockClear(); + mocks.enqueueSessionDelivery.mockClear(); + mocks.drainPendingSessionDeliveries.mockClear(); + mocks.recoverPendingSessionDeliveries.mockClear(); + mocks.removeRestartSentinelFile.mockClear(); mocks.injectTimestamp.mockClear(); mocks.timestampOptsFromConfig.mockClear(); mocks.recordInboundSessionAndDispatchReply.mockReset(); mocks.recordInboundSessionAndDispatchReply.mockResolvedValue(undefined); + mocks.logInfo.mockClear(); mocks.logWarn.mockClear(); + mocks.logError.mockClear(); }); it("enqueues the sentinel note and wakes the session even when outbound delivery succeeds", async () => { @@ -316,7 +385,7 @@ describe("scheduleRestartSentinelWake", () => { it("still dispatches continuation after restart notice retries are exhausted", async () => { vi.useFakeTimers(); mocks.deliverOutboundPayloads.mockRejectedValue(new Error("transport still not ready")); - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -330,7 +399,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as unknown as Awaited>); + } as unknown as Awaited>); const wakePromise = scheduleRestartSentinelWake({ deps: {} as never }); await Promise.resolve(); @@ -354,7 +423,7 @@ describe("scheduleRestartSentinelWake", () => { it("prefers top-level sentinel threadId for wake routing context", async () => { // Legacy or malformed sentinel JSON can still carry a nested threadId. - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -365,7 +434,7 @@ describe("scheduleRestartSentinelWake", () => { } as never, threadId: "fresh-thread", }, - } as unknown as Awaited>); + } as unknown as Awaited>); await scheduleRestartSentinelWake({ deps: {} as never }); @@ -381,7 +450,7 @@ describe("scheduleRestartSentinelWake", () => { }); it("dispatches agentTurn continuation after the restart notice in the same routed thread", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -396,7 +465,7 @@ describe("scheduleRestartSentinelWake", () => { message: "Reply with exactly: Yay! I did it!", }, }, - } as Awaited>); + } as Awaited>); mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => { await params.deliver({ text: "done", @@ -436,7 +505,7 @@ describe("scheduleRestartSentinelWake", () => { }); it("preserves the session chat type for agentTurn continuations", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:group", deliveryContext: { @@ -450,7 +519,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as Awaited>); + } as Awaited>); mocks.loadSessionEntry.mockReturnValue({ cfg: {}, entry: { @@ -502,7 +571,7 @@ describe("scheduleRestartSentinelWake", () => { }), }, }); - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -517,7 +586,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as Awaited>); + } as Awaited>); mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => { await params.deliver({ text: "done", @@ -548,7 +617,7 @@ describe("scheduleRestartSentinelWake", () => { }); it("strips synthetic reply transport ids when no real reply target exists", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -562,7 +631,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as Awaited>); + } as Awaited>); mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => { await params.deliver({ text: "done", @@ -580,7 +649,7 @@ describe("scheduleRestartSentinelWake", () => { }); it("preserves non-synthetic reply transport ids from continuation payloads", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -594,7 +663,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as Awaited>); + } as Awaited>); mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce(async (params) => { await params.deliver({ text: "done", @@ -617,7 +686,7 @@ describe("scheduleRestartSentinelWake", () => { }); it("dispatches agentTurn continuation from session delivery context when sentinel routing is empty", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", ts: 123, @@ -626,7 +695,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as unknown as Awaited>); + } as unknown as Awaited>); mocks.deliveryContextFromSession.mockReturnValue({ channel: "telegram", to: "telegram:200482621", @@ -653,7 +722,7 @@ describe("scheduleRestartSentinelWake", () => { }); it("requests another wake after enqueueing a systemEvent continuation", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -668,7 +737,7 @@ describe("scheduleRestartSentinelWake", () => { text: "continue after restart", }, }, - } as Awaited>); + } as Awaited>); await scheduleRestartSentinelWake({ deps: {} as never }); @@ -696,7 +765,7 @@ describe("scheduleRestartSentinelWake", () => { }); it("enqueues systemEvent continuation without stale partial delivery context", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -711,7 +780,7 @@ describe("scheduleRestartSentinelWake", () => { text: "continue after restart", }, }, - } as Awaited>); + } as Awaited>); mocks.resolveOutboundTarget.mockReturnValueOnce({ ok: false, error: new Error("missing route"), @@ -719,13 +788,23 @@ describe("scheduleRestartSentinelWake", () => { await scheduleRestartSentinelWake({ deps: {} as never }); - expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith(2, "continue after restart", { - sessionKey: "agent:main:main", - }); + expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith( + 2, + "continue after restart", + expect.objectContaining({ + sessionKey: "agent:main:main", + deliveryContext: expect.objectContaining({ + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + threadId: "thread-42", + }), + }), + ); }); it("logs and continues when continuation delivery fails", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -739,7 +818,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as Awaited>); + } as Awaited>); mocks.recordInboundSessionAndDispatchReply.mockRejectedValueOnce(new Error("dispatch failed")); await scheduleRestartSentinelWake({ deps: {} as never }); @@ -751,16 +830,12 @@ describe("scheduleRestartSentinelWake", () => { }), ); expect(mocks.logWarn).toHaveBeenCalledWith( - expect.stringContaining("continuation delivery failed"), - expect.objectContaining({ - sessionKey: "agent:main:main", - continuationKind: "agentTurn", - }), + expect.stringContaining("retry failed for entry session-delivery-1: Error: dispatch failed"), ); }); it("logs and continues when continuation dispatch reports a delivery error", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -774,7 +849,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as Awaited>); + } as Awaited>); mocks.recordInboundSessionAndDispatchReply.mockImplementationOnce( async (params: { onDispatchError: (err: unknown, info: { kind: string }) => void }) => { params.onDispatchError(new Error("route failed"), { kind: "final" }); @@ -784,16 +859,12 @@ describe("scheduleRestartSentinelWake", () => { await scheduleRestartSentinelWake({ deps: {} as never }); expect(mocks.logWarn).toHaveBeenCalledWith( - expect.stringContaining("continuation delivery failed"), - expect.objectContaining({ - sessionKey: "agent:main:main", - continuationKind: "agentTurn", - }), + expect.stringContaining("retry failed for entry session-delivery-1: Error: route failed"), ); }); - it("warns and skips agentTurn continuation when restart routing cannot resolve a destination", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + it("falls back to a session wake when restart routing cannot resolve a destination", async () => { + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:main", deliveryContext: { @@ -807,7 +878,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as Awaited>); + } as Awaited>); mocks.resolveOutboundTarget.mockReturnValueOnce({ ok: false, error: new Error("missing route"), @@ -816,17 +887,51 @@ describe("scheduleRestartSentinelWake", () => { await scheduleRestartSentinelWake({ deps: {} as never }); expect(mocks.recordInboundSessionAndDispatchReply).not.toHaveBeenCalled(); - expect(mocks.logWarn).toHaveBeenCalledWith( - expect.stringContaining("restart continuation route unavailable"), + expect(mocks.enqueueSystemEvent).toHaveBeenNthCalledWith( + 2, + "continue", expect.objectContaining({ sessionKey: "agent:main:main", - continuationKind: "agentTurn", + }), + ); + expect(mocks.requestHeartbeatNow).toHaveBeenCalledTimes(2); + expect(mocks.logWarn).not.toHaveBeenCalled(); + }); + + it("keeps the sentinel file when durable continuation handoff fails", async () => { + mocks.readRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as Awaited>); + mocks.enqueueSessionDelivery.mockRejectedValueOnce(new Error("queue write failed")); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.removeRestartSentinelFile).not.toHaveBeenCalled(); + expect(mocks.drainPendingSessionDeliveries).not.toHaveBeenCalled(); + expect(mocks.logWarn).toHaveBeenCalledWith( + "startup task failed", + expect.objectContaining({ + source: "restart-sentinel", + sessionKey: "agent:main:main", + reason: "queue write failed", }), ); }); it("consumes continuation once and does not replay it on later startup cycles", async () => { - mocks.consumeRestartSentinel + mocks.readRestartSentinel .mockResolvedValueOnce({ payload: { sessionKey: "agent:main:main", @@ -841,9 +946,9 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as Awaited>) + } as Awaited>) .mockResolvedValueOnce( - null as unknown as Awaited>, + null as unknown as Awaited>, ); await scheduleRestartSentinelWake({ deps: {} as never }); @@ -853,11 +958,11 @@ describe("scheduleRestartSentinelWake", () => { }); it("does not wake the main session when the sentinel has no sessionKey", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { message: "restart message", }, - } as unknown as Awaited>); + } as unknown as Awaited>); await scheduleRestartSentinelWake({ deps: {} as never }); @@ -869,7 +974,7 @@ describe("scheduleRestartSentinelWake", () => { }); it("warns when continuation cannot run because the restart sentinel has no sessionKey", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { message: "restart message", continuation: { @@ -877,7 +982,7 @@ describe("scheduleRestartSentinelWake", () => { message: "continue", }, }, - } as unknown as Awaited>); + } as unknown as Awaited>); await scheduleRestartSentinelWake({ deps: {} as never }); @@ -894,11 +999,11 @@ describe("scheduleRestartSentinelWake", () => { ); }); it("skips outbound restart notice when no canonical delivery context survives restart", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:matrix:channel:!lowercased:example.org", }, - } as Awaited>); + } as Awaited>); mocks.parseSessionThreadInfo.mockReturnValue({ baseSessionKey: "agent:main:matrix:channel:!lowercased:example.org", threadId: undefined, @@ -919,11 +1024,11 @@ describe("scheduleRestartSentinelWake", () => { }); it("resolves session routing before queueing the heartbeat wake", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:qa-channel:channel:qa-room", }, - } as Awaited>); + } as Awaited>); mocks.parseSessionThreadInfo.mockReturnValue({ baseSessionKey: "agent:main:qa-channel:channel:qa-room", threadId: undefined, @@ -961,11 +1066,11 @@ describe("scheduleRestartSentinelWake", () => { }); it("merges base session routing into partial thread metadata", async () => { - mocks.consumeRestartSentinel.mockResolvedValue({ + mocks.readRestartSentinel.mockResolvedValue({ payload: { sessionKey: "agent:main:matrix:channel:!lowercased:example.org:thread:$thread-event", }, - } as Awaited>); + } as Awaited>); mocks.parseSessionThreadInfo.mockReturnValue({ baseSessionKey: "agent:main:matrix:channel:!lowercased:example.org", threadId: "$thread-event", diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index 55cbd06a076..19811f3c934 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -14,11 +14,22 @@ import { ackDelivery, enqueueDelivery, failDelivery } from "../infra/outbound/de import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js"; import { - consumeRestartSentinel, formatRestartSentinelMessage, + readRestartSentinel, + removeRestartSentinelFile, type RestartSentinelContinuation, + resolveRestartSentinelPath, summarizeRestartSentinel, } from "../infra/restart-sentinel.js"; +import { + drainPendingSessionDeliveries, + enqueueSessionDelivery, + recoverPendingSessionDeliveries, + type QueuedSessionDelivery, + type QueuedSessionDeliveryPayload, + type SessionDeliveryRecoveryLogger, + type SessionDeliveryRoute, +} from "../infra/session-delivery-queue.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { recordInboundSessionAndDispatchReply } from "../plugin-sdk/inbound-reply-dispatch.js"; @@ -144,15 +155,6 @@ function buildRestartContinuationMessageId(params: { return `restart-sentinel:${params.sessionKey}:${params.kind}:${params.ts}`; } -type RestartContinuationRoute = { - channel: string; - to: string; - accountId?: string; - replyToId?: string; - threadId?: string; - chatType: ChatType; -}; - function resolveRestartContinuationRoute(params: { channel?: string; to?: string; @@ -160,7 +162,7 @@ function resolveRestartContinuationRoute(params: { replyToId?: string; threadId?: string; chatType: ChatType; -}): RestartContinuationRoute | undefined { +}): SessionDeliveryRoute | undefined { if (!params.channel || !params.to) { return undefined; } @@ -187,64 +189,84 @@ function resolveRestartContinuationOutboundPayload(params: { return params.replyToId ? { ...payload, replyToId: params.replyToId } : payload; } -async function dispatchRestartSentinelContinuation(params: { +function resolveQueuedSessionDeliveryContext(entry: QueuedSessionDelivery): + | { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; + } + | undefined { + if (entry.kind === "agentTurn" && entry.route) { + return { + channel: entry.route.channel, + to: entry.route.to, + ...(entry.route.accountId ? { accountId: entry.route.accountId } : {}), + ...(entry.route.threadId ? { threadId: entry.route.threadId } : {}), + }; + } + return entry.deliveryContext; +} + +async function deliverQueuedSessionDelivery(params: { deps: CliDeps; - cfg: ReturnType["cfg"]; - storePath: string; - sessionKey: string; - continuation: RestartSentinelContinuation; - ts: number; - route?: RestartContinuationRoute; + entry: QueuedSessionDelivery; }) { - if (params.continuation.kind === "systemEvent") { - enqueueSystemEvent(params.continuation.text, { - sessionKey: params.sessionKey, - ...(params.route + const { cfg, storePath, canonicalKey } = loadSessionEntry(params.entry.sessionKey); + + if (params.entry.kind === "systemEvent") { + enqueueSystemEvent(params.entry.text, { + sessionKey: canonicalKey, + ...(resolveQueuedSessionDeliveryContext(params.entry) ? { deliveryContext: { - channel: params.route.channel, - to: params.route.to, - ...(params.route.accountId ? { accountId: params.route.accountId } : {}), - ...(params.route.threadId ? { threadId: params.route.threadId } : {}), + ...resolveQueuedSessionDeliveryContext(params.entry), }, } : {}), }); - requestHeartbeatNow({ reason: "wake", sessionKey: params.sessionKey }); + requestHeartbeatNow({ reason: "wake", sessionKey: canonicalKey }); return; } - if (!params.route) { - throw new Error("restart continuation route unavailable"); + if (!params.entry.route) { + enqueueSystemEvent(params.entry.message, { + sessionKey: canonicalKey, + ...(resolveQueuedSessionDeliveryContext(params.entry) + ? { + deliveryContext: { + ...resolveQueuedSessionDeliveryContext(params.entry), + }, + } + : {}), + }); + requestHeartbeatNow({ reason: "wake", sessionKey: canonicalKey }); + return; } - const route = params.route; - const messageId = buildRestartContinuationMessageId({ - sessionKey: params.sessionKey, - kind: params.continuation.kind, - ts: params.ts, - }); - const userMessage = params.continuation.message.trim(); + const route = params.entry.route; + const messageId = params.entry.messageId; + const userMessage = params.entry.message.trim(); const agentId = resolveSessionAgentId({ - sessionKey: params.sessionKey, - config: params.cfg, + sessionKey: canonicalKey, + config: cfg, }); let dispatchError: unknown; await recordInboundSessionAndDispatchReply({ - cfg: params.cfg, + cfg, channel: route.channel, accountId: route.accountId, agentId, - routeSessionKey: params.sessionKey, - storePath: params.storePath, + routeSessionKey: canonicalKey, + storePath, ctxPayload: finalizeInboundContext( { Body: userMessage, - BodyForAgent: injectTimestamp(userMessage, timestampOptsFromConfig(params.cfg)), + BodyForAgent: injectTimestamp(userMessage, timestampOptsFromConfig(cfg)), BodyForCommands: "", RawBody: userMessage, CommandBody: "", - SessionKey: params.sessionKey, + SessionKey: canonicalKey, AccountId: route.accountId, MessageSid: messageId, Timestamp: Date.now(), @@ -272,7 +294,7 @@ async function dispatchRestartSentinelContinuation(params: { replyToId: route.replyToId, }); const results = await deliverOutboundPayloads({ - cfg: params.cfg, + cfg, channel: route.channel, to: route.to, accountId: route.accountId, @@ -280,8 +302,8 @@ async function dispatchRestartSentinelContinuation(params: { threadId: route.threadId, payloads: [outboundPayload], session: buildOutboundSessionContext({ - cfg: params.cfg, - sessionKey: params.sessionKey, + cfg, + sessionKey: canonicalKey, }), deps: params.deps, bestEffort: false, @@ -292,7 +314,7 @@ async function dispatchRestartSentinelContinuation(params: { }, onRecordError: (err) => { log.warn(`restart continuation failed to record inbound session metadata: ${String(err)}`, { - sessionKey: params.sessionKey, + sessionKey: canonicalKey, }); }, onDispatchError: (err) => { @@ -304,13 +326,78 @@ async function dispatchRestartSentinelContinuation(params: { } } +function buildQueuedRestartContinuation(params: { + sessionKey: string; + continuation: RestartSentinelContinuation; + route?: SessionDeliveryRoute; + ts: number; + deliveryContext?: { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; + }; +}): QueuedSessionDeliveryPayload { + const idempotencyKey = buildRestartContinuationMessageId({ + sessionKey: params.sessionKey, + kind: params.continuation.kind, + ts: params.ts, + }); + if (params.continuation.kind === "systemEvent") { + return { + kind: "systemEvent", + sessionKey: params.sessionKey, + text: params.continuation.text, + ...(params.deliveryContext ? { deliveryContext: params.deliveryContext } : {}), + idempotencyKey, + }; + } + return { + kind: "agentTurn", + sessionKey: params.sessionKey, + message: params.continuation.message, + messageId: idempotencyKey, + ...(params.route ? { route: params.route } : {}), + ...(params.deliveryContext ? { deliveryContext: params.deliveryContext } : {}), + idempotencyKey, + }; +} + +async function drainRestartContinuationQueue(params: { + deps: CliDeps; + entryId: string; + log: SessionDeliveryRecoveryLogger; +}) { + await drainPendingSessionDeliveries({ + drainKey: `restart-continuation:${params.entryId}`, + logLabel: "restart continuation", + log: params.log, + deliver: (entry) => deliverQueuedSessionDelivery({ deps: params.deps, entry }), + selectEntry: (entry) => ({ + match: entry.id === params.entryId, + bypassBackoff: true, + }), + }); +} + +export async function recoverPendingRestartContinuationDeliveries(params: { + deps: CliDeps; + log?: SessionDeliveryRecoveryLogger; +}) { + await recoverPendingSessionDeliveries({ + deliver: (entry) => deliverQueuedSessionDelivery({ deps: params.deps, entry }), + log: params.log ?? log, + }); +} + async function loadRestartSentinelStartupTask(params: { deps: CliDeps; }): Promise { - const sentinel = await consumeRestartSentinel(); + const sentinel = await readRestartSentinel(); if (!sentinel) { return null; } + const sentinelPath = resolveRestartSentinelPath(); const payload = sentinel.payload; const sessionKey = payload.sessionKey?.trim(); const message = formatRestartSentinelMessage(payload); @@ -332,12 +419,13 @@ async function loadRestartSentinelStartupTask(params: { continuationKind: payload.continuation.kind, }); } + await removeRestartSentinelFile(sentinelPath); return { status: "ran" as const }; } const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey); - const { cfg, entry, canonicalKey, storePath } = loadSessionEntry(sessionKey); + const { cfg, entry, canonicalKey } = loadSessionEntry(sessionKey); const sentinelContext = payload.deliveryContext; let sessionDeliveryContext = deliveryContextFromSession(entry); @@ -357,8 +445,6 @@ async function loadRestartSentinelStartupTask(params: { const origin = mergeDeliveryContext(sentinelContext, sessionDeliveryContext); - enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext); - const channelRaw = origin?.channel; const channel = channelRaw ? normalizeChannelId(channelRaw) : null; const to = origin?.to; @@ -369,6 +455,7 @@ async function loadRestartSentinelStartupTask(params: { let resolvedTo: string | undefined; let replyToId: string | undefined; let resolvedThreadId = threadId; + let continuationQueueId: string | undefined; if (channel && to) { const resolved = resolveOutboundTarget({ @@ -393,54 +480,69 @@ async function loadRestartSentinelStartupTask(params: { ? String(replyTransport.threadId) : undefined : threadId; - const outboundSession = buildOutboundSessionContext({ - cfg, - sessionKey: canonicalKey, - }); - - await deliverRestartSentinelNotice({ - deps: params.deps, - cfg, - sessionKey: canonicalKey, - summary, - message, - channel, - to: resolvedTo, - accountId: origin?.accountId, - replyToId, - threadId: resolvedThreadId, - session: outboundSession, - }); + // Keep the resolved route for the queued continuation and restart notice. } } - if (!payload.continuation) { - return { status: "ran" as const }; + if (payload.continuation) { + continuationQueueId = await enqueueSessionDelivery( + buildQueuedRestartContinuation({ + sessionKey: canonicalKey, + continuation: payload.continuation, + ts: payload.ts, + route: resolveRestartContinuationRoute({ + channel: channel ?? undefined, + to: resolvedTo, + accountId: origin?.accountId, + replyToId, + threadId: resolvedThreadId, + chatType, + }), + deliveryContext: + resolvedTo && channel + ? { + channel, + to: resolvedTo, + ...(origin?.accountId ? { accountId: origin.accountId } : {}), + ...(resolvedThreadId ? { threadId: resolvedThreadId } : {}), + } + : wakeDeliveryContext, + }), + ); } - try { - await dispatchRestartSentinelContinuation({ + await removeRestartSentinelFile(sentinelPath); + enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext); + + if (resolvedTo && channel) { + const outboundSession = buildOutboundSessionContext({ + cfg, + sessionKey: canonicalKey, + }); + + await deliverRestartSentinelNotice({ deps: params.deps, cfg, - storePath, sessionKey: canonicalKey, - continuation: payload.continuation, - ts: payload.ts, - route: resolveRestartContinuationRoute({ - channel: channel ?? undefined, - to: resolvedTo, - accountId: origin?.accountId, - replyToId, - threadId: resolvedThreadId, - chatType, - }), - }); - } catch (err) { - log.warn(`${summary}: continuation delivery failed: ${String(err)}`, { - sessionKey: canonicalKey, - continuationKind: payload.continuation.kind, + summary, + message, + channel, + to: resolvedTo, + accountId: origin?.accountId, + replyToId, + threadId: resolvedThreadId, + session: outboundSession, }); } + + if (continuationQueueId) { + await drainRestartContinuationQueue({ + deps: params.deps, + entryId: continuationQueueId, + log, + }); + } + return { status: "ran" as const }; }; diff --git a/src/gateway/server-runtime-services.test.ts b/src/gateway/server-runtime-services.test.ts index 729ca45caaf..6105891b670 100644 --- a/src/gateway/server-runtime-services.test.ts +++ b/src/gateway/server-runtime-services.test.ts @@ -11,6 +11,7 @@ const hoisted = vi.hoisted(() => { startChannelHealthMonitor: vi.fn(() => ({ stop: vi.fn() })), startGatewayModelPricingRefresh: vi.fn(() => vi.fn()), recoverPendingDeliveries: vi.fn(async () => undefined), + recoverPendingRestartContinuationDeliveries: vi.fn(async () => undefined), deliverOutboundPayloads: vi.fn(), }; }); @@ -27,6 +28,10 @@ vi.mock("../infra/outbound/delivery-queue.js", () => ({ recoverPendingDeliveries: hoisted.recoverPendingDeliveries, })); +vi.mock("./server-restart-sentinel.js", () => ({ + recoverPendingRestartContinuationDeliveries: hoisted.recoverPendingRestartContinuationDeliveries, +})); + vi.mock("./channel-health-monitor.js", () => ({ startChannelHealthMonitor: hoisted.startChannelHealthMonitor, })); @@ -47,6 +52,7 @@ describe("server-runtime-services", () => { hoisted.startChannelHealthMonitor.mockClear(); hoisted.startGatewayModelPricingRefresh.mockClear(); hoisted.recoverPendingDeliveries.mockClear(); + hoisted.recoverPendingRestartContinuationDeliveries.mockClear(); hoisted.deliverOutboundPayloads.mockClear(); }); @@ -71,12 +77,14 @@ describe("server-runtime-services", () => { }); it("activates heartbeat, cron, and delivery recovery after sidecars are ready", async () => { + vi.useFakeTimers(); const cron = { start: vi.fn(async () => undefined) }; const log = createLog(); const services = activateGatewayScheduledServices({ minimalTestGateway: false, cfgAtStart: {} as never, + deps: {} as never, cron, logCron: { error: vi.fn() }, log, @@ -85,6 +93,7 @@ describe("server-runtime-services", () => { expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1); expect(cron.start).toHaveBeenCalledTimes(1); expect(services.heartbeatRunner).toBe(hoisted.heartbeatRunner); + await vi.advanceTimersByTimeAsync(1_250); await vi.dynamicImportSettled(); expect(hoisted.recoverPendingDeliveries).toHaveBeenCalledWith( expect.objectContaining({ @@ -92,6 +101,11 @@ describe("server-runtime-services", () => { cfg: {}, }), ); + expect(hoisted.recoverPendingRestartContinuationDeliveries).toHaveBeenCalledWith( + expect.objectContaining({ + deps: {}, + }), + ); }); it("keeps scheduled services disabled for minimal test gateways", () => { @@ -100,6 +114,7 @@ describe("server-runtime-services", () => { const services = activateGatewayScheduledServices({ minimalTestGateway: true, cfgAtStart: {} as never, + deps: {} as never, cron, logCron: { error: vi.fn() }, log: createLog(), @@ -108,6 +123,7 @@ describe("server-runtime-services", () => { expect(hoisted.startHeartbeatRunner).not.toHaveBeenCalled(); expect(cron.start).not.toHaveBeenCalled(); expect(hoisted.recoverPendingDeliveries).not.toHaveBeenCalled(); + expect(hoisted.recoverPendingRestartContinuationDeliveries).not.toHaveBeenCalled(); services.heartbeatRunner.stop(); expect(hoisted.heartbeatRunner.stop).not.toHaveBeenCalled(); diff --git a/src/gateway/server-runtime-services.ts b/src/gateway/server-runtime-services.ts index dd5cf2460df..0f62c66f326 100644 --- a/src/gateway/server-runtime-services.ts +++ b/src/gateway/server-runtime-services.ts @@ -68,6 +68,24 @@ function recoverPendingOutboundDeliveries(params: { })().catch((err) => params.log.error(`Delivery recovery failed: ${String(err)}`)); } +function recoverPendingSessionDeliveries(params: { + deps: import("../cli/deps.types.js").CliDeps; + log: GatewayRuntimeServiceLogger; +}): void { + const timer = setTimeout(() => { + void (async () => { + const { recoverPendingRestartContinuationDeliveries } = + await import("./server-restart-sentinel.js"); + const logRecovery = params.log.child("session-delivery-recovery"); + await recoverPendingRestartContinuationDeliveries({ + deps: params.deps, + log: logRecovery, + }); + })().catch((err) => params.log.error(`Session delivery recovery failed: ${String(err)}`)); + }, 1_250); + timer.unref?.(); +} + export function startGatewayRuntimeServices(params: { minimalTestGateway: boolean; cfgAtStart: OpenClawConfig; @@ -101,6 +119,7 @@ export function startGatewayRuntimeServices(params: { export function activateGatewayScheduledServices(params: { minimalTestGateway: boolean; cfgAtStart: OpenClawConfig; + deps: import("../cli/deps.types.js").CliDeps; cron: { start: () => Promise }; logCron: { error: (message: string) => void }; log: GatewayRuntimeServiceLogger; @@ -117,5 +136,9 @@ export function activateGatewayScheduledServices(params: { cfg: params.cfgAtStart, log: params.log, }); + recoverPendingSessionDeliveries({ + deps: params.deps, + log: params.log, + }); return { heartbeatRunner }; } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index d35ae5be2ba..81578434e7a 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -855,6 +855,7 @@ export async function startGatewayServer( const activated = activateGatewayScheduledServices({ minimalTestGateway, cfgAtStart, + deps, cron: runtimeState.cronState.cron, logCron, log, diff --git a/src/infra/session-delivery-queue-recovery.ts b/src/infra/session-delivery-queue-recovery.ts new file mode 100644 index 00000000000..803b6c90de2 --- /dev/null +++ b/src/infra/session-delivery-queue-recovery.ts @@ -0,0 +1,259 @@ +import { formatErrorMessage } from "./errors.js"; +import { + ackSessionDelivery, + failSessionDelivery, + loadPendingSessionDelivery, + loadPendingSessionDeliveries, + moveSessionDeliveryToFailed, + type QueuedSessionDelivery, +} from "./session-delivery-queue-storage.js"; + +export type SessionDeliveryRecoverySummary = { + recovered: number; + failed: number; + skippedMaxRetries: number; + deferredBackoff: number; +}; + +export type DeliverSessionDeliveryFn = (entry: QueuedSessionDelivery) => Promise; + +export interface SessionDeliveryRecoveryLogger { + info(msg: string): void; + warn(msg: string): void; + error(msg: string): void; +} + +export interface PendingSessionDeliveryDrainDecision { + match: boolean; + bypassBackoff?: boolean; +} + +export const MAX_SESSION_DELIVERY_RETRIES = 5; + +const BACKOFF_MS: readonly number[] = [5_000, 25_000, 120_000, 600_000]; +const drainInProgress = new Map(); +const entriesInProgress = new Set(); + +function getErrnoCode(err: unknown): string | null { + return err && typeof err === "object" && "code" in err + ? String((err as { code?: unknown }).code) + : null; +} + +function createEmptyRecoverySummary(): SessionDeliveryRecoverySummary { + return { + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }; +} + +function claimRecoveryEntry(entryId: string): boolean { + if (entriesInProgress.has(entryId)) { + return false; + } + entriesInProgress.add(entryId); + return true; +} + +function releaseRecoveryEntry(entryId: string): void { + entriesInProgress.delete(entryId); +} + +export function computeSessionDeliveryBackoffMs(retryCount: number): number { + if (retryCount <= 0) { + return 0; + } + return BACKOFF_MS[Math.min(retryCount - 1, BACKOFF_MS.length - 1)] ?? BACKOFF_MS.at(-1) ?? 0; +} + +export function isSessionDeliveryEligibleForRetry( + entry: QueuedSessionDelivery, + now: number, +): { eligible: true } | { eligible: false; remainingBackoffMs: number } { + const backoff = computeSessionDeliveryBackoffMs(entry.retryCount + 1); + if (backoff <= 0) { + return { eligible: true }; + } + const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined; + if (firstReplayAfterCrash) { + return { eligible: true }; + } + const baseAttemptAt = + typeof entry.lastAttemptAt === "number" && entry.lastAttemptAt > 0 + ? entry.lastAttemptAt + : entry.enqueuedAt; + const nextEligibleAt = baseAttemptAt + backoff; + if (now >= nextEligibleAt) { + return { eligible: true }; + } + return { eligible: false, remainingBackoffMs: nextEligibleAt - now }; +} + +async function drainQueuedEntry(opts: { + entry: QueuedSessionDelivery; + deliver: DeliverSessionDeliveryFn; + stateDir?: string; + onRecovered?: (entry: QueuedSessionDelivery) => void; + onFailed?: (entry: QueuedSessionDelivery, errMsg: string) => void; +}): Promise<"recovered" | "failed" | "moved-to-failed" | "already-gone"> { + const { entry } = opts; + try { + await opts.deliver(entry); + await ackSessionDelivery(entry.id, opts.stateDir); + opts.onRecovered?.(entry); + return "recovered"; + } catch (err) { + const errMsg = formatErrorMessage(err); + opts.onFailed?.(entry, errMsg); + try { + await failSessionDelivery(entry.id, errMsg, opts.stateDir); + return "failed"; + } catch (failErr) { + if (getErrnoCode(failErr) === "ENOENT") { + return "already-gone"; + } + return "failed"; + } + } +} + +export async function drainPendingSessionDeliveries(opts: { + drainKey: string; + logLabel: string; + log: SessionDeliveryRecoveryLogger; + stateDir?: string; + deliver: DeliverSessionDeliveryFn; + selectEntry: (entry: QueuedSessionDelivery, now: number) => PendingSessionDeliveryDrainDecision; +}): Promise { + if (drainInProgress.get(opts.drainKey)) { + opts.log.info(`${opts.logLabel}: already in progress for ${opts.drainKey}, skipping`); + return; + } + + drainInProgress.set(opts.drainKey, true); + try { + const matchingEntries = (await loadPendingSessionDeliveries(opts.stateDir)) + .filter((entry) => opts.selectEntry(entry, Date.now()).match) + .toSorted((a, b) => a.enqueuedAt - b.enqueuedAt); + + for (const entry of matchingEntries) { + if (!claimRecoveryEntry(entry.id)) { + opts.log.info(`${opts.logLabel}: entry ${entry.id} is already being recovered`); + continue; + } + + try { + const currentEntry = await loadPendingSessionDelivery(entry.id, opts.stateDir); + if (!currentEntry) { + continue; + } + const currentDecision = opts.selectEntry(currentEntry, Date.now()); + if (!currentDecision.match) { + continue; + } + if (currentEntry.retryCount >= MAX_SESSION_DELIVERY_RETRIES) { + try { + await moveSessionDeliveryToFailed(currentEntry.id, opts.stateDir); + } catch (err) { + if (getErrnoCode(err) !== "ENOENT") { + throw err; + } + } + opts.log.warn( + `${opts.logLabel}: entry ${currentEntry.id} exceeded max retries and was moved to failed/`, + ); + continue; + } + + if (!currentDecision.bypassBackoff) { + const retryEligibility = isSessionDeliveryEligibleForRetry(currentEntry, Date.now()); + if (!retryEligibility.eligible) { + opts.log.info( + `${opts.logLabel}: entry ${currentEntry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`, + ); + continue; + } + } + + await drainQueuedEntry({ + entry: currentEntry, + deliver: opts.deliver, + stateDir: opts.stateDir, + onFailed: (failedEntry, errMsg) => { + opts.log.warn(`${opts.logLabel}: retry failed for entry ${failedEntry.id}: ${errMsg}`); + }, + }); + } finally { + releaseRecoveryEntry(entry.id); + } + } + } finally { + drainInProgress.delete(opts.drainKey); + } +} + +export async function recoverPendingSessionDeliveries(opts: { + deliver: DeliverSessionDeliveryFn; + log: SessionDeliveryRecoveryLogger; + stateDir?: string; + maxRecoveryMs?: number; +}): Promise { + const pending = await loadPendingSessionDeliveries(opts.stateDir); + if (pending.length === 0) { + return createEmptyRecoverySummary(); + } + + pending.sort((a, b) => a.enqueuedAt - b.enqueuedAt); + const summary = createEmptyRecoverySummary(); + const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000); + + for (const entry of pending) { + if (Date.now() >= deadline) { + opts.log.warn("Session delivery recovery time budget exceeded — remaining entries deferred"); + break; + } + if (!claimRecoveryEntry(entry.id)) { + continue; + } + + try { + const currentEntry = await loadPendingSessionDelivery(entry.id, opts.stateDir); + if (!currentEntry) { + continue; + } + if (currentEntry.retryCount >= MAX_SESSION_DELIVERY_RETRIES) { + summary.skippedMaxRetries += 1; + await moveSessionDeliveryToFailed(currentEntry.id, opts.stateDir).catch(() => {}); + continue; + } + + const retryEligibility = isSessionDeliveryEligibleForRetry(currentEntry, Date.now()); + if (!retryEligibility.eligible) { + summary.deferredBackoff += 1; + continue; + } + + const result = await drainQueuedEntry({ + entry: currentEntry, + deliver: opts.deliver, + stateDir: opts.stateDir, + onRecovered: () => { + summary.recovered += 1; + }, + onFailed: (_failedEntry, errMsg) => { + summary.failed += 1; + opts.log.warn(`Session delivery retry failed: ${errMsg}`); + }, + }); + if (result === "recovered") { + opts.log.info(`Recovered session delivery ${currentEntry.id}`); + } + } finally { + releaseRecoveryEntry(entry.id); + } + } + + return summary; +} diff --git a/src/infra/session-delivery-queue-storage.ts b/src/infra/session-delivery-queue-storage.ts new file mode 100644 index 00000000000..59839625c23 --- /dev/null +++ b/src/infra/session-delivery-queue-storage.ts @@ -0,0 +1,238 @@ +import { createHash } from "node:crypto"; +import fs from "node:fs"; +import path from "node:path"; +import type { ChatType } from "../channels/chat-type.js"; +import { resolveStateDir } from "../config/paths.js"; +import { generateSecureUuid } from "./secure-random.js"; + +const QUEUE_DIRNAME = "session-delivery-queue"; +const FAILED_DIRNAME = "failed"; + +export type SessionDeliveryContext = { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; +}; + +export type SessionDeliveryRoute = { + channel: string; + to: string; + accountId?: string; + replyToId?: string; + threadId?: string; + chatType: ChatType; +}; + +export type QueuedSessionDeliveryPayload = + | { + kind: "systemEvent"; + sessionKey: string; + text: string; + deliveryContext?: SessionDeliveryContext; + idempotencyKey?: string; + } + | { + kind: "agentTurn"; + sessionKey: string; + message: string; + messageId: string; + route?: SessionDeliveryRoute; + deliveryContext?: SessionDeliveryContext; + idempotencyKey?: string; + }; + +export type QueuedSessionDelivery = QueuedSessionDeliveryPayload & { + id: string; + enqueuedAt: number; + retryCount: number; + lastAttemptAt?: number; + lastError?: string; +}; + +function getErrnoCode(err: unknown): string | null { + return err && typeof err === "object" && "code" in err + ? String((err as { code?: unknown }).code) + : null; +} + +function buildEntryId(idempotencyKey?: string): string { + if (!idempotencyKey) { + return generateSecureUuid(); + } + return createHash("sha256").update(idempotencyKey).digest("hex"); +} + +async function unlinkBestEffort(filePath: string): Promise { + try { + await fs.promises.unlink(filePath); + } catch { + // Best-effort cleanup. + } +} + +async function writeQueueEntry(filePath: string, entry: QueuedSessionDelivery): Promise { + const tmp = `${filePath}.${process.pid}.tmp`; + await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), { + encoding: "utf-8", + mode: 0o600, + }); + await fs.promises.rename(tmp, filePath); +} + +async function readQueueEntry(filePath: string): Promise { + return JSON.parse(await fs.promises.readFile(filePath, "utf-8")) as QueuedSessionDelivery; +} + +export function resolveSessionDeliveryQueueDir(stateDir?: string): string { + const base = stateDir ?? resolveStateDir(); + return path.join(base, QUEUE_DIRNAME); +} + +function resolveFailedDir(stateDir?: string): string { + return path.join(resolveSessionDeliveryQueueDir(stateDir), FAILED_DIRNAME); +} + +function resolveQueueEntryPaths( + id: string, + stateDir?: string, +): { + jsonPath: string; + deliveredPath: string; +} { + const queueDir = resolveSessionDeliveryQueueDir(stateDir); + return { + jsonPath: path.join(queueDir, `${id}.json`), + deliveredPath: path.join(queueDir, `${id}.delivered`), + }; +} + +export async function ensureSessionDeliveryQueueDir(stateDir?: string): Promise { + const queueDir = resolveSessionDeliveryQueueDir(stateDir); + await fs.promises.mkdir(queueDir, { recursive: true, mode: 0o700 }); + await fs.promises.mkdir(resolveFailedDir(stateDir), { recursive: true, mode: 0o700 }); + return queueDir; +} + +export async function enqueueSessionDelivery( + params: QueuedSessionDeliveryPayload, + stateDir?: string, +): Promise { + const queueDir = await ensureSessionDeliveryQueueDir(stateDir); + const id = buildEntryId(params.idempotencyKey); + const filePath = path.join(queueDir, `${id}.json`); + + if (params.idempotencyKey) { + try { + const stat = await fs.promises.stat(filePath); + if (stat.isFile()) { + return id; + } + } catch (err) { + if (getErrnoCode(err) !== "ENOENT") { + throw err; + } + } + } + + await writeQueueEntry(filePath, { + ...params, + id, + enqueuedAt: Date.now(), + retryCount: 0, + }); + return id; +} + +export async function ackSessionDelivery(id: string, stateDir?: string): Promise { + const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir); + try { + await fs.promises.rename(jsonPath, deliveredPath); + } catch (err) { + const code = getErrnoCode(err); + if (code === "ENOENT") { + await unlinkBestEffort(deliveredPath); + return; + } + throw err; + } + await unlinkBestEffort(deliveredPath); +} + +export async function failSessionDelivery( + id: string, + error: string, + stateDir?: string, +): Promise { + const filePath = path.join(resolveSessionDeliveryQueueDir(stateDir), `${id}.json`); + const entry = await readQueueEntry(filePath); + entry.retryCount += 1; + entry.lastAttemptAt = Date.now(); + entry.lastError = error; + await writeQueueEntry(filePath, entry); +} + +export async function loadPendingSessionDelivery( + id: string, + stateDir?: string, +): Promise { + const { jsonPath } = resolveQueueEntryPaths(id, stateDir); + try { + const stat = await fs.promises.stat(jsonPath); + if (!stat.isFile()) { + return null; + } + return await readQueueEntry(jsonPath); + } catch (err) { + if (getErrnoCode(err) === "ENOENT") { + return null; + } + throw err; + } +} + +export async function loadPendingSessionDeliveries( + stateDir?: string, +): Promise { + const queueDir = resolveSessionDeliveryQueueDir(stateDir); + let files: string[]; + try { + files = await fs.promises.readdir(queueDir); + } catch (err) { + if (getErrnoCode(err) === "ENOENT") { + return []; + } + throw err; + } + + for (const file of files) { + if (file.endsWith(".delivered")) { + await unlinkBestEffort(path.join(queueDir, file)); + } + } + + const entries: QueuedSessionDelivery[] = []; + for (const file of files) { + if (!file.endsWith(".json")) { + continue; + } + const filePath = path.join(queueDir, file); + try { + const stat = await fs.promises.stat(filePath); + if (!stat.isFile()) { + continue; + } + entries.push(await readQueueEntry(filePath)); + } catch { + // Skip malformed or inaccessible entries. + } + } + return entries; +} + +export async function moveSessionDeliveryToFailed(id: string, stateDir?: string): Promise { + const queueDir = resolveSessionDeliveryQueueDir(stateDir); + const failedDir = resolveFailedDir(stateDir); + await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 }); + await fs.promises.rename(path.join(queueDir, `${id}.json`), path.join(failedDir, `${id}.json`)); +} diff --git a/src/infra/session-delivery-queue.recovery.test.ts b/src/infra/session-delivery-queue.recovery.test.ts new file mode 100644 index 00000000000..8aef5194f55 --- /dev/null +++ b/src/infra/session-delivery-queue.recovery.test.ts @@ -0,0 +1,68 @@ +import { describe, expect, it, vi } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { + enqueueSessionDelivery, + loadPendingSessionDeliveries, + recoverPendingSessionDeliveries, +} from "./session-delivery-queue.js"; + +describe("session-delivery queue recovery", () => { + it("replays and acks pending entries on recovery", async () => { + await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { + await enqueueSessionDelivery( + { + kind: "systemEvent", + sessionKey: "agent:main:main", + text: "restart complete", + }, + tempDir, + ); + + const deliver = vi.fn(async () => undefined); + const summary = await recoverPendingSessionDeliveries({ + deliver, + stateDir: tempDir, + log: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, + }); + + expect(deliver).toHaveBeenCalledTimes(1); + expect(summary.recovered).toBe(1); + expect(await loadPendingSessionDeliveries(tempDir)).toEqual([]); + }); + }); + + it("keeps failed entries queued with retry metadata for later recovery", async () => { + await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { + await enqueueSessionDelivery( + { + kind: "agentTurn", + sessionKey: "agent:main:main", + message: "continue", + messageId: "restart-sentinel:agent:main:main:agentTurn:123", + }, + tempDir, + ); + + const summary = await recoverPendingSessionDeliveries({ + deliver: vi.fn(async () => { + throw new Error("transient failure"); + }), + stateDir: tempDir, + log: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, + }); + + const [failedEntry] = await loadPendingSessionDeliveries(tempDir); + expect(summary.failed).toBe(1); + expect(failedEntry?.retryCount).toBe(1); + expect(failedEntry?.lastError).toBe("transient failure"); + }); + }); +}); diff --git a/src/infra/session-delivery-queue.storage.test.ts b/src/infra/session-delivery-queue.storage.test.ts new file mode 100644 index 00000000000..59ecbf3a6bd --- /dev/null +++ b/src/infra/session-delivery-queue.storage.test.ts @@ -0,0 +1,59 @@ +import { describe, expect, it } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { + ackSessionDelivery, + enqueueSessionDelivery, + failSessionDelivery, + loadPendingSessionDeliveries, +} from "./session-delivery-queue.js"; + +describe("session-delivery queue storage", () => { + it("dedupes entries when an idempotency key is reused", async () => { + await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { + const firstId = await enqueueSessionDelivery( + { + kind: "agentTurn", + sessionKey: "agent:main:main", + message: "continue after restart", + messageId: "restart-sentinel:agent:main:main:agentTurn:123", + idempotencyKey: "restart-sentinel:agent:main:main:agentTurn:123", + }, + tempDir, + ); + const secondId = await enqueueSessionDelivery( + { + kind: "agentTurn", + sessionKey: "agent:main:main", + message: "continue after restart", + messageId: "restart-sentinel:agent:main:main:agentTurn:123", + idempotencyKey: "restart-sentinel:agent:main:main:agentTurn:123", + }, + tempDir, + ); + + expect(secondId).toBe(firstId); + expect(await loadPendingSessionDeliveries(tempDir)).toHaveLength(1); + }); + }); + + it("persists retry metadata and removes acked entries", async () => { + await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { + const id = await enqueueSessionDelivery( + { + kind: "systemEvent", + sessionKey: "agent:main:main", + text: "restart complete", + }, + tempDir, + ); + + await failSessionDelivery(id, "dispatch failed", tempDir); + const [failedEntry] = await loadPendingSessionDeliveries(tempDir); + expect(failedEntry?.retryCount).toBe(1); + expect(failedEntry?.lastError).toBe("dispatch failed"); + + await ackSessionDelivery(id, tempDir); + expect(await loadPendingSessionDeliveries(tempDir)).toEqual([]); + }); + }); +}); diff --git a/src/infra/session-delivery-queue.ts b/src/infra/session-delivery-queue.ts new file mode 100644 index 00000000000..784857798ef --- /dev/null +++ b/src/infra/session-delivery-queue.ts @@ -0,0 +1,29 @@ +export { + ackSessionDelivery, + enqueueSessionDelivery, + ensureSessionDeliveryQueueDir, + failSessionDelivery, + loadPendingSessionDelivery, + loadPendingSessionDeliveries, + moveSessionDeliveryToFailed, + resolveSessionDeliveryQueueDir, +} from "./session-delivery-queue-storage.js"; +export type { + QueuedSessionDelivery, + QueuedSessionDeliveryPayload, + SessionDeliveryContext, + SessionDeliveryRoute, +} from "./session-delivery-queue-storage.js"; +export { + computeSessionDeliveryBackoffMs, + drainPendingSessionDeliveries, + isSessionDeliveryEligibleForRetry, + MAX_SESSION_DELIVERY_RETRIES, + recoverPendingSessionDeliveries, +} from "./session-delivery-queue-recovery.js"; +export type { + DeliverSessionDeliveryFn, + PendingSessionDeliveryDrainDecision, + SessionDeliveryRecoveryLogger, + SessionDeliveryRecoverySummary, +} from "./session-delivery-queue-recovery.js";