From 03addfe9ba59f368df91b48efaaabf1f46271103 Mon Sep 17 00:00:00 2001 From: FullerStackDev <263060202+fuller-stack-dev@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:23:31 -0600 Subject: [PATCH] fix(gateway): tighten session delivery recovery --- src/infra/session-delivery-queue-recovery.ts | 2 +- src/infra/session-delivery-queue-storage.ts | 23 ++++++++++++- .../session-delivery-queue.recovery.test.ts | 33 +++++++++++++++++++ .../session-delivery-queue.storage.test.ts | 14 ++++++++ 4 files changed, 70 insertions(+), 2 deletions(-) diff --git a/src/infra/session-delivery-queue-recovery.ts b/src/infra/session-delivery-queue-recovery.ts index 86902617858..5ac3121268a 100644 --- a/src/infra/session-delivery-queue-recovery.ts +++ b/src/infra/session-delivery-queue-recovery.ts @@ -72,7 +72,7 @@ export function isSessionDeliveryEligibleForRetry( entry: QueuedSessionDelivery, now: number, ): { eligible: true } | { eligible: false; remainingBackoffMs: number } { - const backoff = computeSessionDeliveryBackoffMs(entry.retryCount + 1); + const backoff = computeSessionDeliveryBackoffMs(entry.retryCount); if (backoff <= 0) { return { eligible: true }; } diff --git a/src/infra/session-delivery-queue-storage.ts b/src/infra/session-delivery-queue-storage.ts index 09760c7d8fc..ba4b928f75a 100644 --- a/src/infra/session-delivery-queue-storage.ts +++ b/src/infra/session-delivery-queue-storage.ts @@ -7,6 +7,7 @@ import { generateSecureUuid } from "./secure-random.js"; const QUEUE_DIRNAME = "session-delivery-queue"; const FAILED_DIRNAME = "failed"; +const TMP_SWEEP_MAX_AGE_MS = 5_000; export type SessionDeliveryContext = { channel?: string; @@ -71,6 +72,23 @@ async function unlinkBestEffort(filePath: string): Promise<void> { } } +async function unlinkStaleTmpBestEffort(filePath: string, now: number): Promise<void> { + try { + const stat = await fs.promises.stat(filePath); + if (!stat.isFile()) { + return; + } + if (now - stat.mtimeMs < TMP_SWEEP_MAX_AGE_MS) { + return; + } + await 
unlinkBestEffort(filePath); + } catch (err) { + if (getErrnoCode(err) !== "ENOENT") { + throw err; + } + } +} + async function writeQueueEntry(filePath: string, entry: QueuedSessionDelivery): Promise<void> { const tmp = `${filePath}.${process.pid}.tmp`; await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), { @@ -205,9 +223,12 @@ export async function loadPendingSessionDeliveries( throw err; } + const now = Date.now(); for (const file of files) { - if (file.endsWith(".delivered") || file.endsWith(".tmp")) { + if (file.endsWith(".delivered")) { await unlinkBestEffort(path.join(queueDir, file)); + } else if (file.endsWith(".tmp")) { + await unlinkStaleTmpBestEffort(path.join(queueDir, file), now); } } diff --git a/src/infra/session-delivery-queue.recovery.test.ts b/src/infra/session-delivery-queue.recovery.test.ts index 6d9662298a6..1f62dcf0e19 100644 --- a/src/infra/session-delivery-queue.recovery.test.ts +++ b/src/infra/session-delivery-queue.recovery.test.ts @@ -2,6 +2,8 @@ import { describe, expect, it, vi } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; import { enqueueSessionDelivery, + failSessionDelivery, + isSessionDeliveryEligibleForRetry, loadPendingSessionDeliveries, recoverPendingSessionDeliveries, } from "./session-delivery-queue.js"; @@ -115,4 +117,35 @@ describe("session-delivery queue recovery", () => { vi.useRealTimers(); }); + + it("uses the persisted retryCount for the first backoff tier", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-04-23T00:00:00.000Z")); + + await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { + const id = await enqueueSessionDelivery( + { + kind: "systemEvent", + sessionKey: "agent:main:main", + text: "retry me", + }, + tempDir, + ); + await failSessionDelivery(id, "transient failure", tempDir); + + const [failedEntry] = await loadPendingSessionDeliveries(tempDir); + expect(failedEntry).toBeDefined(); + expect(failedEntry?.retryCount).toBe(1); 
+ expect(failedEntry?.lastAttemptAt).toBeDefined(); + + const lastAttemptAt = failedEntry?.lastAttemptAt ?? 0; + const notReady = isSessionDeliveryEligibleForRetry(failedEntry, lastAttemptAt + 4_999); + expect(notReady).toEqual({ eligible: false, remainingBackoffMs: 1 }); + + const ready = isSessionDeliveryEligibleForRetry(failedEntry, lastAttemptAt + 5_000); + expect(ready).toEqual({ eligible: true }); + }); + + vi.useRealTimers(); + }); }); diff --git a/src/infra/session-delivery-queue.storage.test.ts b/src/infra/session-delivery-queue.storage.test.ts index 6bfdafa256a..38422e28ed9 100644 --- a/src/infra/session-delivery-queue.storage.test.ts +++ b/src/infra/session-delivery-queue.storage.test.ts @@ -72,10 +72,24 @@ describe("session-delivery queue storage", () => { ); const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "orphan-entry.tmp"); fs.writeFileSync(tmpPath, "stale tmp"); + const staleAt = new Date(Date.now() - 60_000); + fs.utimesSync(tmpPath, staleAt, staleAt); await loadPendingSessionDeliveries(tempDir); expect(fs.existsSync(tmpPath)).toBe(false); }); }); + + it("keeps fresh temporary queue files while a write may still be in flight", async () => { + await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { + const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "active-entry.tmp"); + fs.mkdirSync(path.dirname(tmpPath), { recursive: true }); + fs.writeFileSync(tmpPath, "active tmp"); + + await loadPendingSessionDeliveries(tempDir); + + expect(fs.existsSync(tmpPath)).toBe(true); + }); + }); });