fix(gateway): tighten session delivery recovery

This commit is contained in:
FullerStackDev
2026-04-23 16:23:31 -06:00
committed by Ayaan Zaidi
parent 2261550633
commit 03addfe9ba
4 changed files with 70 additions and 2 deletions

View File

@@ -72,7 +72,7 @@ export function isSessionDeliveryEligibleForRetry(
entry: QueuedSessionDelivery,
now: number,
): { eligible: true } | { eligible: false; remainingBackoffMs: number } {
const backoff = computeSessionDeliveryBackoffMs(entry.retryCount + 1);
const backoff = computeSessionDeliveryBackoffMs(entry.retryCount);
if (backoff <= 0) {
return { eligible: true };
}

View File

@@ -7,6 +7,7 @@ import { generateSecureUuid } from "./secure-random.js";
const QUEUE_DIRNAME = "session-delivery-queue";
const FAILED_DIRNAME = "failed";
const TMP_SWEEP_MAX_AGE_MS = 5_000;
export type SessionDeliveryContext = {
channel?: string;
@@ -71,6 +72,23 @@ async function unlinkBestEffort(filePath: string): Promise<void> {
}
}
async function unlinkStaleTmpBestEffort(filePath: string, now: number): Promise<void> {
try {
const stat = await fs.promises.stat(filePath);
if (!stat.isFile()) {
return;
}
if (now - stat.mtimeMs < TMP_SWEEP_MAX_AGE_MS) {
return;
}
await unlinkBestEffort(filePath);
} catch (err) {
if (getErrnoCode(err) !== "ENOENT") {
throw err;
}
}
}
async function writeQueueEntry(filePath: string, entry: QueuedSessionDelivery): Promise<void> {
const tmp = `${filePath}.${process.pid}.tmp`;
await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
@@ -205,9 +223,12 @@ export async function loadPendingSessionDeliveries(
throw err;
}
const now = Date.now();
for (const file of files) {
if (file.endsWith(".delivered") || file.endsWith(".tmp")) {
if (file.endsWith(".delivered")) {
await unlinkBestEffort(path.join(queueDir, file));
} else if (file.endsWith(".tmp")) {
await unlinkStaleTmpBestEffort(path.join(queueDir, file), now);
}
}

View File

@@ -2,6 +2,8 @@ import { describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
enqueueSessionDelivery,
failSessionDelivery,
isSessionDeliveryEligibleForRetry,
loadPendingSessionDeliveries,
recoverPendingSessionDeliveries,
} from "./session-delivery-queue.js";
@@ -115,4 +117,35 @@ describe("session-delivery queue recovery", () => {
vi.useRealTimers();
});
it("uses the persisted retryCount for the first backoff tier", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-04-23T00:00:00.000Z"));
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
const id = await enqueueSessionDelivery(
{
kind: "systemEvent",
sessionKey: "agent:main:main",
text: "retry me",
},
tempDir,
);
await failSessionDelivery(id, "transient failure", tempDir);
const [failedEntry] = await loadPendingSessionDeliveries(tempDir);
expect(failedEntry).toBeDefined();
expect(failedEntry?.retryCount).toBe(1);
expect(failedEntry?.lastAttemptAt).toBeDefined();
const lastAttemptAt = failedEntry?.lastAttemptAt ?? 0;
const notReady = isSessionDeliveryEligibleForRetry(failedEntry, lastAttemptAt + 4_999);
expect(notReady).toEqual({ eligible: false, remainingBackoffMs: 1 });
const ready = isSessionDeliveryEligibleForRetry(failedEntry, lastAttemptAt + 5_000);
expect(ready).toEqual({ eligible: true });
});
vi.useRealTimers();
});
});

View File

@@ -72,10 +72,24 @@ describe("session-delivery queue storage", () => {
);
const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "orphan-entry.tmp");
fs.writeFileSync(tmpPath, "stale tmp");
const staleAt = new Date(Date.now() - 60_000);
fs.utimesSync(tmpPath, staleAt, staleAt);
await loadPendingSessionDeliveries(tempDir);
expect(fs.existsSync(tmpPath)).toBe(false);
});
});
it("keeps fresh temporary queue files while a write may still be in flight", async () => {
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "active-entry.tmp");
fs.mkdirSync(path.dirname(tmpPath), { recursive: true });
fs.writeFileSync(tmpPath, "active tmp");
await loadPendingSessionDeliveries(tempDir);
expect(fs.existsSync(tmpPath)).toBe(true);
});
});
});