diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a4cbf8ed2b..ce05d981340 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
 
 ### Fixes
 
+- Delivery queue/recovery backoff: prevent retry starvation by persisting `lastAttemptAt` on failed sends and deferring recovery retries until each entry's `lastAttemptAt + backoff` window has elapsed, while continuing to recover ready entries queued behind deferred ones. Landed from contributor PR #27710 by @Jimmy-xuzimo.
 - Microsoft Teams/File uploads: acknowledge `fileConsent/invoke` immediately (`invokeResponse` before upload + file card send) so Teams no longer shows false "Something went wrong" timeout banners while upload completion continues asynchronously; includes updated async regression coverage. Landed from contributor PR #27641 by @scz2011.
 - Queue/Drain/Cron reliability: harden lane draining with guaranteed `draining` flag reset on synchronous pump failures, reject new queue enqueues during gateway restart drain windows (instead of silently killing accepted tasks), add `/stop` queued-backlog cutoff metadata with stale-message skipping (while avoiding cross-session native-stop cutoff bleed), and raise isolated cron `agentTurn` outer safety timeout to avoid false 10-minute timeout races against longer agent session timeouts. (#27407, #27332, #27427)
 - Typing/Main reply pipeline: always mark dispatch idle in `agent-runner` finalization so typing cleanup runs even when dispatcher `onIdle` does not fire, preventing stuck typing indicators after run completion. (#27250) Thanks @Sid-Qin.
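Reviewer note: the recovery patch below keys everything off `computeBackoffMs`, which is not part of this diff. A minimal sketch of a compatible helper, purely as a reading aid; the exponential shape and both constants are assumptions (the fake-timer test only implies that the fourth attempt's backoff is at most ten minutes):

```ts
// Hypothetical stand-in for computeBackoffMs; the real helper lives outside
// this diff. Exponential backoff with a cap. Both constants are invented:
// BASE_BACKOFF_MS is chosen only so attempt 4 lands on the 600_000ms jump
// used by the fake-timer test below.
const BASE_BACKOFF_MS = 75_000;
const MAX_BACKOFF_MS = 600_000;

export function computeBackoffMs(attempt: number): number {
  // Attempt 0 or below waits nothing.
  if (attempt <= 0) return 0;
  return Math.min(BASE_BACKOFF_MS * 2 ** (attempt - 1), MAX_BACKOFF_MS);
}
```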
diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts
index bd195d94a46..4928b0ff62f 100644
--- a/src/infra/outbound/delivery-queue.ts
+++ b/src/infra/outbound/delivery-queue.ts
@@ -47,6 +47,7 @@ export interface QueuedDelivery extends QueuedDeliveryPayload {
   id: string;
   enqueuedAt: number;
   retryCount: number;
+  lastAttemptAt?: number;
   lastError?: string;
 }
 
@@ -122,6 +123,7 @@ export async function failDelivery(id: string, error: string, stateDir?: string)
   const raw = await fs.promises.readFile(filePath, "utf-8");
   const entry: QueuedDelivery = JSON.parse(raw);
   entry.retryCount += 1;
+  entry.lastAttemptAt = Date.now();
   entry.lastError = error;
   const tmp = `${filePath}.${process.pid}.tmp`;
   await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
@@ -208,8 +210,6 @@ export async function recoverPendingDeliveries(opts: {
   log: RecoveryLogger;
   cfg: OpenClawConfig;
   stateDir?: string;
-  /** Override for testing — resolves instead of using real setTimeout. */
-  delay?: (ms: number) => Promise<void>;
   /** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next restart. Default: 60 000. */
   maxRecoveryMs?: number;
 }): Promise<{ recovered: number; failed: number; skipped: number }> {
@@ -223,12 +223,12 @@ export async function recoverPendingDeliveries(opts: {
 
   opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`);
 
-  const delayFn = opts.delay ?? ((ms: number) => new Promise<void>((r) => setTimeout(r, ms)));
   const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000);
 
   let recovered = 0;
   let failed = 0;
   let skipped = 0;
+  let deferred = 0;
 
   for (const entry of pending) {
     const now = Date.now();
@@ -252,15 +252,18 @@ export async function recoverPendingDeliveries(opts: {
 
     const backoff = computeBackoffMs(entry.retryCount + 1);
     if (backoff > 0) {
-      if (now + backoff >= deadline) {
-        opts.log.info(
-          `Backoff ${backoff}ms exceeds budget for ${entry.id} — skipping to next entry`,
-        );
-        skipped += 1;
-        continue;
+      const firstReplayAfterCrash = entry.retryCount === 0 && entry.lastAttemptAt === undefined;
+      if (!firstReplayAfterCrash) {
+        const baseAttemptAt = entry.lastAttemptAt ?? entry.enqueuedAt;
+        const nextEligibleAt = baseAttemptAt + backoff;
+        if (now < nextEligibleAt) {
+          deferred += 1;
+          opts.log.info(
+            `Delivery ${entry.id} not ready for retry yet — backoff ${nextEligibleAt - now}ms remaining`,
+          );
+          continue;
+        }
       }
-      opts.log.info(`Waiting ${backoff}ms before retrying delivery ${entry.id}`);
-      await delayFn(backoff);
     }
 
     try {
@@ -304,7 +307,7 @@ export async function recoverPendingDeliveries(opts: {
   }
 
   opts.log.info(
-    `Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries)`,
+    `Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries), ${deferred} deferred (backoff)`,
   );
   return { recovered, failed, skipped };
 }
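Reviewer note: the core of the change is the eligibility rule. A distilled, self-contained restatement of the logic the patch adds, for reference; the type is trimmed to the three fields the check reads and the function name is invented, so this is not the project's API:

```ts
// Mirrors the per-entry check added to recoverPendingDeliveries above.
interface RetryState {
  enqueuedAt: number;
  retryCount: number;
  lastAttemptAt?: number; // persisted by failDelivery() as of this patch
}

/** Milliseconds until the entry may be retried; 0 means retry now. */
function backoffRemainingMs(entry: RetryState, backoffMs: number, now: number): number {
  // A crash before the first send leaves retryCount at 0 and no
  // lastAttemptAt; that first replay proceeds immediately.
  if (entry.retryCount === 0 && entry.lastAttemptAt === undefined) return 0;
  // Otherwise wait out lastAttemptAt + backoff, falling back to enqueuedAt
  // when no attempt timestamp was ever recorded.
  const base = entry.lastAttemptAt ?? entry.enqueuedAt;
  return Math.max(0, base + backoffMs - now);
}
```

A positive remainder now means "defer and move on" rather than "await a timer", which is what lets a ready entry queued behind a high-backoff one still recover within the same restart.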
diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts
index 7793f070748..bce1c246147 100644
--- a/src/infra/outbound/outbound.test.ts
+++ b/src/infra/outbound/outbound.test.ts
@@ -104,7 +104,7 @@ describe("delivery-queue", () => {
   });
 
   describe("failDelivery", () => {
-    it("increments retryCount and sets lastError", async () => {
+    it("increments retryCount, records attempt time, and sets lastError", async () => {
       const id = await enqueueDelivery(
         {
           channel: "telegram",
@@ -119,6 +119,8 @@
       const queueDir = path.join(tmpDir, "delivery-queue");
       const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8"));
       expect(entry.retryCount).toBe(1);
+      expect(typeof entry.lastAttemptAt).toBe("number");
+      expect(entry.lastAttemptAt).toBeGreaterThan(0);
       expect(entry.lastError).toBe("connection refused");
     });
   });
@@ -204,28 +206,36 @@ describe("delivery-queue", () => {
   });
 
   describe("recoverPendingDeliveries", () => {
-    const noopDelay = async () => {};
     const baseCfg = {};
     const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() });
     const enqueueCrashRecoveryEntries = async () => {
       await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
       await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
     };
-    const setEntryRetryCount = (id: string, retryCount: number) => {
+    const setEntryState = (
+      id: string,
+      state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number },
+    ) => {
       const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
       const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
-      entry.retryCount = retryCount;
+      entry.retryCount = state.retryCount;
+      if (state.lastAttemptAt === undefined) {
+        delete entry.lastAttemptAt;
+      } else {
+        entry.lastAttemptAt = state.lastAttemptAt;
+      }
+      if (state.enqueuedAt !== undefined) {
+        entry.enqueuedAt = state.enqueuedAt;
+      }
       fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
     };
     const runRecovery = async ({
       deliver,
       log = createLog(),
-      delay = noopDelay,
       maxRecoveryMs,
     }: {
       deliver: ReturnType<typeof vi.fn>;
       log?: ReturnType<typeof createLog>;
-      delay?: (ms: number) => Promise<void>;
       maxRecoveryMs?: number;
     }) => {
       const result = await recoverPendingDeliveries({
@@ -233,7 +243,6 @@ describe("delivery-queue", () => {
         log,
         cfg: baseCfg,
         stateDir: tmpDir,
-        delay,
         ...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
       });
       return { result, log };
@@ -261,7 +270,7 @@ describe("delivery-queue", () => {
        { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
        tmpDir,
      );
-      setEntryRetryCount(id, MAX_RETRIES);
+      setEntryState(id, { retryCount: MAX_RETRIES });
 
       const deliver = vi.fn();
       const { result } = await runRecovery({ deliver });
@@ -377,29 +386,82 @@ describe("delivery-queue", () => {
       expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart"));
     });
 
-    it("defers entries when backoff exceeds the recovery budget", async () => {
+    it("defers entries until backoff becomes eligible", async () => {
       const id = await enqueueDelivery(
         { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
         tmpDir,
       );
-      setEntryRetryCount(id, 3);
+      setEntryState(id, { retryCount: 3, lastAttemptAt: Date.now() });
 
       const deliver = vi.fn().mockResolvedValue([]);
-      const delay = vi.fn(async () => {});
       const { result, log } = await runRecovery({
         deliver,
-        delay,
-        maxRecoveryMs: 1000,
+        maxRecoveryMs: 60_000,
       });
 
       expect(deliver).not.toHaveBeenCalled();
-      expect(delay).not.toHaveBeenCalled();
-      expect(result).toEqual({ recovered: 0, failed: 0, skipped: 1 });
+      expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
 
       const remaining = await loadPendingDeliveries(tmpDir);
       expect(remaining).toHaveLength(1);
-      expect(log.info).toHaveBeenCalledWith(expect.stringContaining("Backoff"));
+      expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
+    });
+
+    it("continues past high-backoff entries and recovers ready entries behind them", async () => {
+      const now = Date.now();
+      const blockedId = await enqueueDelivery(
+        { channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] },
+        tmpDir,
+      );
+      const readyId = await enqueueDelivery(
+        { channel: "telegram", to: "2", payloads: [{ text: "ready" }] },
+        tmpDir,
+      );
+
+      setEntryState(blockedId, { retryCount: 3, lastAttemptAt: now, enqueuedAt: now - 30_000 });
+      setEntryState(readyId, { retryCount: 0, enqueuedAt: now - 10_000 });
+
+      const deliver = vi.fn().mockResolvedValue([]);
+      const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 });
+
+      expect(result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
+      expect(deliver).toHaveBeenCalledTimes(1);
+      expect(deliver).toHaveBeenCalledWith(
+        expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }),
+      );
+
+      const remaining = await loadPendingDeliveries(tmpDir);
+      expect(remaining).toHaveLength(1);
+      expect(remaining[0]?.id).toBe(blockedId);
+    });
+
+    it("recovers deferred entries on a later restart once backoff elapsed", async () => {
+      vi.useFakeTimers();
+      const start = new Date("2026-01-01T00:00:00.000Z");
+      vi.setSystemTime(start);
+
+      const id = await enqueueDelivery(
+        { channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] },
+        tmpDir,
+      );
+      setEntryState(id, { retryCount: 3, lastAttemptAt: start.getTime() });
+
+      const firstDeliver = vi.fn().mockResolvedValue([]);
+      const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 });
+      expect(firstRun.result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
+      expect(firstDeliver).not.toHaveBeenCalled();
+
+      vi.setSystemTime(new Date(start.getTime() + 600_000 + 1));
+      const secondDeliver = vi.fn().mockResolvedValue([]);
+      const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 });
+      expect(secondRun.result).toEqual({ recovered: 1, failed: 0, skipped: 0 });
+      expect(secondDeliver).toHaveBeenCalledTimes(1);
+
+      const remaining = await loadPendingDeliveries(tmpDir);
+      expect(remaining).toHaveLength(0);
+
+      vi.useRealTimers();
     });
 
     it("returns zeros when queue is empty", async () => {
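Reviewer note: for orientation, a hedged sketch of how a startup path might drive this recovery. `recoverPendingDeliveries` and its options come from the diff above, but the wrapper function, the import paths, and the console-as-logger shortcut are assumptions, not code from this repo:

```ts
import { recoverPendingDeliveries } from "./src/infra/outbound/delivery-queue";
import type { OpenClawConfig } from "./src/config"; // assumed path

// Hypothetical bootstrap hook: replay whatever the previous process left in
// the on-disk delivery queue before accepting new outbound work.
export async function replayPendingDeliveries(cfg: OpenClawConfig): Promise<void> {
  const { recovered, failed, skipped } = await recoverPendingDeliveries({
    // deliver receives the queued payload (with skipQueue: true, per the
    // tests) and resolves with any follow-up entries; wiring it to the real
    // outbound sender is left out of this sketch.
    deliver: async () => [],
    log: console, // assumes RecoveryLogger is satisfied by info/warn/error
    cfg,
    maxRecoveryMs: 60_000, // the default budget shown in the diff
  });
  console.info(`startup replay: ${recovered} ok, ${failed} failed, ${skipped} skipped`);
}
```

Note that deferred entries are intentionally left on disk; they become eligible on a later restart, as the fake-timer test above exercises.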