diff --git a/src/infra/session-delivery-queue-recovery.ts b/src/infra/session-delivery-queue-recovery.ts index b7e6c52d5d5..f37a6a2fc35 100644 --- a/src/infra/session-delivery-queue-recovery.ts +++ b/src/infra/session-delivery-queue-recovery.ts @@ -1,3 +1,8 @@ +import { + resolveDateTimestampMs, + resolveExpiresAtMsFromDurationMs, + resolveNonNegativeIntegerOption, +} from "../shared/number-coercion.js"; import { formatErrorMessage } from "./errors.js"; import { ackSessionDelivery, @@ -72,6 +77,14 @@ function resolveSessionDeliveryMaxRetries(entry: QueuedSessionDelivery): number return entry.maxRetries ?? MAX_SESSION_DELIVERY_RETRIES; } +function resolveSessionDeliveryRecoveryDeadlineMs(maxRecoveryMs: number | undefined): number { + const durationMs = resolveNonNegativeIntegerOption(maxRecoveryMs, 60_000); + if (durationMs <= 0) { + return resolveDateTimestampMs(Date.now()); + } + return resolveExpiresAtMsFromDurationMs(durationMs) ?? resolveDateTimestampMs(Date.now()); +} + export function isSessionDeliveryEligibleForRetry( entry: QueuedSessionDelivery, now: number, @@ -214,7 +227,7 @@ export async function recoverPendingSessionDeliveries(opts: { pending.sort((a, b) => a.enqueuedAt - b.enqueuedAt); const summary = createEmptyRecoverySummary(); - const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000); + const deadline = resolveSessionDeliveryRecoveryDeadlineMs(opts.maxRecoveryMs); for (const entry of pending) { if (Date.now() >= deadline) { diff --git a/src/infra/session-delivery-queue.recovery.test.ts b/src/infra/session-delivery-queue.recovery.test.ts index 43ea17c320b..d48742c8903 100644 --- a/src/infra/session-delivery-queue.recovery.test.ts +++ b/src/infra/session-delivery-queue.recovery.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it, vi } from "vitest"; +import { MAX_DATE_TIMESTAMP_MS } from "../shared/number-coercion.js"; import { withTempDir } from "../test-helpers/temp-dir.js"; import { drainPendingSessionDeliveries, @@ -38,6 +39,44 @@ describe("session-delivery queue recovery", () => { }); }); + it("defers recovery when the recovery budget would exceed the date range", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(MAX_DATE_TIMESTAMP_MS)); + + await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { + await enqueueSessionDelivery( + { + kind: "systemEvent", + sessionKey: "agent:main:main", + text: "leave queued", + }, + tempDir, + ); + + const deliver = vi.fn(async () => undefined); + const warn = vi.fn(); + const summary = await recoverPendingSessionDeliveries({ + deliver, + stateDir: tempDir, + maxRecoveryMs: 1, + log: { + info: vi.fn(), + warn, + error: vi.fn(), + }, + }); + + expect(deliver).not.toHaveBeenCalled(); + expect(warn).toHaveBeenCalledWith( + "Session delivery recovery time budget exceeded — remaining entries deferred", + ); + expect(summary.recovered).toBe(0); + expect(await loadPendingSessionDeliveries(tempDir)).toHaveLength(1); + }); + + vi.useRealTimers(); + }); + it("keeps failed entries queued with retry metadata for later recovery", async () => { await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { await enqueueSessionDelivery(