From db255b1154c11fe8c3c19c62e27e0ff726a07edb Mon Sep 17 00:00:00 2001 From: Josh Avant <830519+joshavant@users.noreply.github.com> Date: Fri, 26 Jun 2026 01:30:57 -0500 Subject: [PATCH] Fix Telegram spooled claim refresh (#96962) --- .../telegram/src/polling-session.test.ts | 230 ++++++++++++++++++ extensions/telegram/src/polling-session.ts | 86 +++++++ .../src/telegram-ingress-spool.test.ts | 27 ++ .../telegram/src/telegram-ingress-spool.ts | 17 ++ src/channels/message/ingress-queue.test.ts | 99 ++++++++ src/channels/message/ingress-queue.ts | 108 ++++++-- 6 files changed, 547 insertions(+), 20 deletions(-) diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index e5cb1d8148d..944768a2a18 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -486,6 +486,49 @@ async function pendingUpdateIds(spoolDir: string, limit: number | "all" = 100): return (await listTelegramSpooledUpdates({ spoolDir, limit })).map((update) => update.updateId); } +async function claimedAtForUpdate(spoolDir: string, updateId: number): Promise { + const claim = (await listTelegramSpooledUpdateClaims({ spoolDir })).find( + (entry) => entry.updateId === updateId, + ); + if (!claim?.claim) { + throw new Error(`Expected claimed spooled update ${updateId}`); + } + return claim.claim.claimedAt; +} + +function installSpooledClaimRefreshHarness(): { + restore: () => void; + triggerRefresh: () => void; +} { + let refresh: (() => void) | undefined; + const realSetInterval = globalThis.setInterval.bind(globalThis); + const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((( + handler: Parameters[0], + timeout?: number, + ) => { + if (timeout === pollingSessionTesting.spooledClaimRefreshIntervalMs) { + refresh = () => { + if (typeof handler === "function") { + handler(); + } + }; + const timer = realSetInterval(() => undefined, 2_147_483_647); + timer.unref?.(); + return timer; + } + return realSetInterval(handler, timeout); + }) as typeof setInterval); + return { + restore: () => setIntervalSpy.mockRestore(), + triggerRefresh: () => { + if (!refresh) { + throw new Error("Expected spooled claim refresh interval to be registered"); + } + refresh(); + }, + }; +} + function normalizeTelegramTestAccountId(spoolDir: string): string { const trimmed = path.basename(spoolDir).trim(); return trimmed ? trimmed.replace(/[^a-z0-9._-]+/gi, "_") : "default"; @@ -1575,6 +1618,49 @@ describe("TelegramPollingSession", () => { }); }); + it("refreshes active spooled claims while the handler is still running", async () => { + const refreshHarness = installSpooledClaimRefreshHarness(); + await withTempSpool(async (tempDir) => { + const abort = new AbortController(); + const events: string[] = []; + let releaseHandler: (() => void) | undefined; + const handlerDone = new Promise((resolve) => { + releaseHandler = resolve; + }); + await writeSpooledTestUpdates(tempDir, [topicUpdate(42, 10, "long topic 10 turn")]); + + const { runPromise, stopWorker } = startIsolatedIngressSession({ + abort, + spoolDir: tempDir, + handleUpdate: async (update) => { + events.push(`topic10:${update.update_id}`); + await handlerDone; + }, + }); + + try { + await vi.waitFor(() => expect(events).toEqual(["topic10:42"])); + const before = await claimedAtForUpdate(tempDir, 42); + + refreshHarness.triggerRefresh(); + await vi.waitFor(async () => + expect(await claimedAtForUpdate(tempDir, 42)).toBeGreaterThan(before), + ); + + releaseHandler?.(); + await vi.waitFor(async () => + expect(await listTelegramSpooledUpdateClaims({ spoolDir: tempDir })).toEqual([]), + ); + } finally { + releaseHandler?.(); + abort.abort(); + stopWorker(); + refreshHarness.restore(); + await runPromise; + } + }); + }); + it("holds buffered spooled claims until deferred processing settles without blocking same-lane buffering", async () => { await withTempSpool(async (tempDir) => { const abort = new AbortController(); @@ -1625,6 +1711,50 @@ describe("TelegramPollingSession", () => { }); }); + it("refreshes deferred spooled claims after the active handler hands off", async () => { + const refreshHarness = installSpooledClaimRefreshHarness(); + await withTempSpool(async (tempDir) => { + const abort = new AbortController(); + const participants: TelegramSpooledReplayDeferredParticipant[] = []; + await writeSpooledTestUpdates(tempDir, [topicUpdate(42, 10, "buffered topic 10 turn")]); + + const { runPromise, stopWorker } = startIsolatedIngressSession({ + abort, + spoolDir: tempDir, + handleUpdate: async (update) => { + const participant = createTelegramSpooledReplayDeferredParticipant( + `test-buffer:${update.update_id}`, + ); + if (!participant) { + throw new Error("expected spooled replay participant"); + } + participants.push(participant); + }, + }); + + try { + await vi.waitFor(() => expect(participants).toHaveLength(1)); + const before = await claimedAtForUpdate(tempDir, 42); + + refreshHarness.triggerRefresh(); + await vi.waitFor(async () => + expect(await claimedAtForUpdate(tempDir, 42)).toBeGreaterThan(before), + ); + + participants[0]?.settle({ kind: "completed" }); + await vi.waitFor(async () => + expect(await listTelegramSpooledUpdateClaims({ spoolDir: tempDir })).toEqual([]), + ); + } finally { + participants[0]?.settle({ kind: "completed" }); + abort.abort(); + stopWorker(); + refreshHarness.restore(); + await runPromise; + } + }); + }); + it("releases buffered spooled claims for retry when deferred processing fails", async () => { await withTempSpool(async (tempDir) => { const abort = new AbortController(); @@ -3585,6 +3715,106 @@ describe("TelegramPollingSession", () => { } }); + it("marks isolated ingress unhealthy when a spooled backlog stalls before handler timeout", async () => { + vi.useFakeTimers({ now: 1_000, shouldAdvanceTime: true }); + const abort = new AbortController(); + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); + const setStatus = vi.fn(); + let releaseRegularTurn: (() => void) | undefined; + const regularTurnDone = new Promise((resolve) => { + releaseRegularTurn = resolve; + }); + const handleUpdate = vi.fn(async () => { + await regularTurnDone; + }); + createTelegramBotMock.mockReturnValueOnce({ + api: { + deleteWebhook: vi.fn(async () => true), + config: { use: vi.fn() }, + }, + init: vi.fn(async () => undefined), + handleUpdate, + stop: vi.fn(async () => undefined), + }); + await writeSpooledTestUpdates(tempDir, [ + topicUpdate(42, 10, "active topic 10 turn"), + topicUpdate(43, 10, "later topic 10 turn"), + ]); + + const workerListeners: WorkerMessageListener[] = []; + let stopWorker: (() => void) | undefined; + const workerDone = new Promise((resolve) => { + stopWorker = resolve; + }); + const createWorker = vi.fn(() => ({ + onMessage: vi.fn((listener: WorkerMessageListener) => { + workerListeners.push(listener); + return () => undefined; + }), + stop: vi.fn(async () => { + stopWorker?.(); + }), + task: vi.fn(async () => { + await workerDone; + }), + })); + + try { + const session = createPollingSession({ + abortSignal: abort.signal, + setStatus, + isolatedIngress: { + enabled: true, + spoolDir: tempDir, + createWorker, + drainIntervalMs: pollingSessionTesting.isolatedIngressBacklogStallMs * 2, + spooledUpdateHandlerTimeoutMs: pollingSessionTesting.isolatedIngressBacklogStallMs * 2, + }, + }); + + const runPromise = session.runUntilAbort(); + await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1)); + workerListeners[0]?.({ + type: "poll-success", + offset: null, + count: 0, + finishedAt: Date.now(), + }); + expect(statusPatches(setStatus).some((patch) => patch.connected === true)).toBe(true); + + vi.setSystemTime(1_000 + pollingSessionTesting.isolatedIngressBacklogStallMs + 1); + workerListeners[0]?.({ type: "spooled", updateId: 43, queued: 1 }); + await vi.waitFor(() => + expect( + statusPatches(setStatus).some( + (patch) => + patch.connected === false && + String(patch.lastError).includes("isolated polling spool backlog stalled"), + ), + ).toBe(true), + ); + expect(await failedUpdateIds(tempDir)).toEqual([]); + expect(await pendingUpdateIds(tempDir, "all")).toEqual([43]); + expect( + (await listTelegramSpooledUpdateClaims({ spoolDir: tempDir })).map( + (claim) => claim.updateId, + ), + ).toEqual([42]); + + releaseRegularTurn?.(); + abort.abort(); + stopWorker?.(); + await vi.advanceTimersByTimeAsync(20_000); + await runPromise; + } finally { + releaseRegularTurn?.(); + abort.abort(); + stopWorker?.(); + vi.useRealTimers(); + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + it("marks isolated ingress unhealthy when a spooled backlog handler times out", async () => { vi.useFakeTimers({ shouldAdvanceTime: true }); const abort = new AbortController(); diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index f6086eb9afc..9c41a0f9ae5 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -41,6 +41,7 @@ import { listTelegramSpooledUpdateClaims, listTelegramSpooledUpdates, recoverStaleTelegramSpooledUpdateClaims, + refreshTelegramSpooledUpdateClaim, releaseTelegramSpooledUpdateClaim, resolveTelegramIngressSpoolDir, writeTelegramSpooledUpdate, @@ -131,6 +132,7 @@ const TELEGRAM_SPOOLED_HANDLER_ABORT_GRACE_MS = 5_000; const TELEGRAM_SPOOLED_HANDLER_TIMEOUT_ENV = "OPENCLAW_TELEGRAM_SPOOLED_HANDLER_TIMEOUT_MS"; const TELEGRAM_SPOOLED_DRAIN_START_LIMIT = 100; const TELEGRAM_SPOOLED_DRAIN_SCAN_LIMIT = TELEGRAM_SPOOLED_DRAIN_START_LIMIT * 10; +const TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS = 5 * 60 * 1000; const TELEGRAM_SPOOLED_SESSION_INIT_CONFLICT_RETRY_BASE_MS = 5_000; const TELEGRAM_SPOOLED_SESSION_INIT_CONFLICT_RETRY_MAX_MS = 60_000; const TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS = Math.ceil( @@ -291,6 +293,8 @@ type SpooledUpdateHandlerState = { update: ClaimedTelegramSpooledUpdate; updateId: number; startedAt: number; + stopClaimRefresh: () => void; + backlogStatusMessage?: string; timedOutAt?: number; timeoutMessage?: string; }; @@ -303,6 +307,7 @@ type DeferredSpooledUpdateClaimState = { timedOutMessage?: string; update: ClaimedTelegramSpooledUpdate; updateId: number; + stopClaimRefresh: () => void; }; const deferredSpooledUpdateClaimsByKey = new Map(); @@ -572,8 +577,46 @@ export class TelegramPollingSession { } } + #startSpooledUpdateClaimRefresh(update: ClaimedTelegramSpooledUpdate): () => void { + // Refresh only while this process still owns useful work for this claim token. + // Stopping before release/fail/delete lets stale recovery take over if work stalls. + let stopped = false; + let refreshing = false; + const refresh = async (): Promise => { + if (stopped || refreshing) { + return; + } + refreshing = true; + try { + const refreshed = await refreshTelegramSpooledUpdateClaim(update); + if (!refreshed && !stopped) { + stopped = true; + clearInterval(timer); + } + } catch (err) { + this.opts.log( + `[telegram][diag] spooled update ${update.updateId} claim refresh failed: ${formatErrorMessage(err)}`, + ); + } finally { + refreshing = false; + } + }; + const timer = setInterval(() => { + void refresh(); + }, TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS); + timer.unref?.(); + return () => { + if (stopped) { + return; + } + stopped = true; + clearInterval(timer); + }; + } + async #handleClaimedSpooledUpdate(params: { bot: TelegramBot; + stopClaimRefresh: () => void; update: ClaimedTelegramSpooledUpdate; }): Promise { let replay: { deferredWork?: TelegramSpooledReplayDeferredParticipant }; @@ -583,6 +626,7 @@ export class TelegramPollingSession { await params.bot.handleUpdate(update); }); } catch (err) { + params.stopClaimRefresh(); await this.#releaseFailedSpooledUpdate({ err, update: params.update, @@ -593,11 +637,13 @@ export class TelegramPollingSession { this.#registerDeferredSpooledUpdate({ deferredWork: replay.deferredWork, laneKey: this.#spooledUpdateLaneKey(params.update), + stopClaimRefresh: params.stopClaimRefresh, update: params.update, }); return true; } try { + params.stopClaimRefresh(); await deleteTelegramSpooledUpdate(params.update); return true; } catch (err) { @@ -611,6 +657,7 @@ export class TelegramPollingSession { #registerDeferredSpooledUpdate(params: { deferredWork: TelegramSpooledReplayDeferredParticipant; laneKey: string; + stopClaimRefresh: () => void; update: ClaimedTelegramSpooledUpdate; }): void { const claimKey = buildDeferredSpooledUpdateClaimKey(params.update); @@ -619,6 +666,7 @@ export class TelegramPollingSession { if (previous.timer) { clearTimeout(previous.timer); } + previous.stopClaimRefresh(); deferredSpooledUpdateClaimsByKey.delete(claimKey); } let settled = false; @@ -630,6 +678,7 @@ export class TelegramPollingSession { if (state.timer) { clearTimeout(state.timer); } + state.stopClaimRefresh(); if (deferredSpooledUpdateClaimsByKey.get(claimKey) === state) { deferredSpooledUpdateClaimsByKey.delete(claimKey); } @@ -661,10 +710,12 @@ export class TelegramPollingSession { }), update: params.update, updateId: params.update.updateId, + stopClaimRefresh: params.stopClaimRefresh, }; state.timer = setTimeout(() => { const age = formatDurationPrecise(this.#spooledUpdateHandlerTimeoutMs); state.timedOutMessage = `Telegram isolated polling spool buffered processing timed out behind update ${params.update.updateId} on lane ${params.laneKey} after ${age}; marking the update failed, aborting active reply work, and keeping the claim out of retry while the buffered task settles.`; + state.stopClaimRefresh(); params.deferredWork.settle({ kind: "failed-retryable", error: new Error(state.timedOutMessage), @@ -905,8 +956,10 @@ export class TelegramPollingSession { claimedLaneKeys.add(laneKey); continue; } + const stopClaimRefresh = this.#startSpooledUpdateClaimRefresh(claimedUpdate); const handler = this.#handleClaimedSpooledUpdate({ bot: params.bot, + stopClaimRefresh, update: claimedUpdate, }); const state: SpooledUpdateHandlerState = { @@ -916,11 +969,17 @@ export class TelegramPollingSession { update: claimedUpdate, updateId: update.updateId, startedAt: Date.now(), + stopClaimRefresh, }; activeSpooledUpdateHandlersByLane.set(handlerKey, state); this.#spooledUpdateHandlerKeys.add(handlerKey); claimedLaneKeys.add(laneKey); void handler.finally(() => { + if ( + !deferredSpooledUpdateClaimsByKey.has(buildDeferredSpooledUpdateClaimKey(claimedUpdate)) + ) { + state.stopClaimRefresh(); + } if (activeSpooledUpdateHandlersByLane.get(handlerKey) === state) { activeSpooledUpdateHandlersByLane.delete(handlerKey); } @@ -969,6 +1028,7 @@ export class TelegramPollingSession { } const age = formatDurationPrecise(timedOutHandler.ageMs); activeHandler.timedOutAt = Date.now(); + activeHandler.stopClaimRefresh(); const message = `Telegram isolated polling spool handler timed out behind update ${handler.updateId} on lane ${handler.laneKey} after ${age}; marking the update failed, aborting active reply work, and restarting isolated ingress so later updates can drain.`; activeHandler.timeoutMessage = message; try { @@ -1025,6 +1085,27 @@ export class TelegramPollingSession { return { handlerKey: handler.handlerKey, restart: true }; } + #noteSpooledBacklogStalls(blockedHandlerKeys: Set): Set { + const stalled = new Set(); + const now = Date.now(); + for (const handlerKey of blockedHandlerKeys) { + const handler = activeSpooledUpdateHandlersByLane.get(handlerKey); + if (!handler || handler.timedOutAt !== undefined) { + continue; + } + const ageMs = now - handler.startedAt; + if (ageMs < ISOLATED_INGRESS_BACKLOG_STALL_MS) { + continue; + } + stalled.add(handlerKey); + if (!handler.backlogStatusMessage) { + handler.backlogStatusMessage = `Telegram isolated polling spool backlog stalled behind update ${handler.updateId} on lane ${handler.laneKey} for ${formatDurationPrecise(ageMs)}; marking polling unhealthy until the backlog drains.`; + this.#status.notePollingError(handler.backlogStatusMessage); + } + } + return stalled; + } + async #runIsolatedIngressCycle(bot: TelegramBot): Promise<"continue" | "exit"> { const ingress = this.opts.isolatedIngress; if (!ingress?.enabled) { @@ -1222,6 +1303,9 @@ export class TelegramPollingSession { this.#status.notePollingError(handler.timeoutMessage); } } + for (const handlerKey of this.#noteSpooledBacklogStalls(drain.blockedByLane)) { + stalledBacklogKeys.add(handlerKey); + } // Active handlers can outlive their owning session after shutdown grace. // Recover every handler for this spool, including lone handlers with no backlog. const timeoutCandidateHandlerKeys = this.#activeSpooledUpdateHandlerKeysForSpool(spoolDir); @@ -1561,6 +1645,8 @@ export const testing = { resetTelegramRestartBackoffState, resolveTelegramRestartDelayMs, resolveSpooledUpdateRetryDelayMs, + isolatedIngressBacklogStallMs: ISOLATED_INGRESS_BACKLOG_STALL_MS, + spooledClaimRefreshIntervalMs: TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS, resolveSpooledUpdateHandlerAbortGraceMs: (valueMs: unknown): number => resolvePositiveTimerTimeoutMs(valueMs, TELEGRAM_SPOOLED_HANDLER_ABORT_GRACE_MS), }; diff --git a/extensions/telegram/src/telegram-ingress-spool.test.ts b/extensions/telegram/src/telegram-ingress-spool.test.ts index 5e9d8413d76..ffe8eb44292 100644 --- a/extensions/telegram/src/telegram-ingress-spool.test.ts +++ b/extensions/telegram/src/telegram-ingress-spool.test.ts @@ -17,6 +17,7 @@ import { listTelegramSpooledUpdateClaims, listTelegramSpooledUpdates, recoverStaleTelegramSpooledUpdateClaims, + refreshTelegramSpooledUpdateClaim, releaseTelegramSpooledUpdateClaim, TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS, writeTelegramSpooledUpdate, @@ -140,6 +141,32 @@ describe("Telegram ingress spool", () => { }); }); + it("refreshes active claim timestamps through the Telegram spool queue", async () => { + await withTempSpool(async (spoolDir) => { + await writeTelegramSpooledUpdate({ + spoolDir, + update: { update_id: 31, message: { text: "refresh me" } }, + }); + const update = (await listTelegramSpooledUpdates({ spoolDir }))[0]; + if (!update) { + throw new Error("Expected a spooled update"); + } + const claimed = await claimTelegramSpooledUpdate(update); + if (!claimed) { + throw new Error("Expected a claimed update"); + } + + await expect(refreshTelegramSpooledUpdateClaim(claimed, { refreshedAt: 123 })).resolves.toBe( + true, + ); + + const claims = await listTelegramSpooledUpdateClaims({ spoolDir }); + expect(claims).toHaveLength(1); + expect(claims[0]?.updateId).toBe(31); + expect(claims[0]?.claim?.claimedAt).toBe(123); + }); + }); + it("marks timed out claims failed without requeueing them", async () => { await withTempSpool(async (spoolDir) => { await writeTelegramSpooledUpdate({ diff --git a/extensions/telegram/src/telegram-ingress-spool.ts b/extensions/telegram/src/telegram-ingress-spool.ts index 27e890416df..ed28ba74697 100644 --- a/extensions/telegram/src/telegram-ingress-spool.ts +++ b/extensions/telegram/src/telegram-ingress-spool.ts @@ -281,6 +281,23 @@ export async function releaseTelegramSpooledUpdateClaim( ); } +export async function refreshTelegramSpooledUpdateClaim( + update: ClaimedTelegramSpooledUpdate, + options?: { refreshedAt?: number }, +): Promise { + const claimToken = update.claim?.claimToken; + if (!claimToken) { + return false; + } + const queue = createTelegramIngressQueue(path.dirname(update.pendingPath)); + return ( + (await queue.refreshClaim?.( + { id: queueEventId(update.updateId), claim: { token: claimToken } }, + options, + )) ?? false + ); +} + export async function failTelegramSpooledUpdateClaim(params: { update: ClaimedTelegramSpooledUpdate; reason: string; diff --git a/src/channels/message/ingress-queue.test.ts b/src/channels/message/ingress-queue.test.ts index ae3c3ec0e67..b68cd7f024f 100644 --- a/src/channels/message/ingress-queue.test.ts +++ b/src/channels/message/ingress-queue.test.ts @@ -277,6 +277,105 @@ describe("channel ingress queue", () => { }); }); + it("refreshes claimed rows only with the active claim token", async () => { + await withTempState(async (stateDir) => { + const queue = createChannelIngressQueue<{ text: string }>({ + channelId: "test", + accountId: "account", + stateDir, + now: () => 10, + }); + + await queue.enqueue("event-1", { text: "claimed" }); + const claimed = await queue.claim("event-1", { ownerId: "worker" }); + if (!claimed) { + throw new Error("Expected a claimed ingress event"); + } + + expect(await queue.refreshClaim?.(claimed, { refreshedAt: 20 })).toBe(true); + expect( + (await queue.listClaims()).map((claim) => ({ + id: claim.id, + claimedAt: claim.claim.claimedAt, + updatedAt: claim.updatedAt, + })), + ).toEqual([{ id: "event-1", claimedAt: 20, updatedAt: 20 }]); + + expect( + await queue.refreshClaim?.( + { id: "event-1", claim: { token: "wrong" } }, + { + refreshedAt: 30, + }, + ), + ).toBe(false); + expect((await queue.listClaims())[0]?.claim.claimedAt).toBe(20); + }); + }); + + it("does not let old claim tokens refresh recovered and reclaimed rows", async () => { + await withTempState(async (stateDir) => { + const queue = createChannelIngressQueue<{ text: string }>({ + channelId: "test", + accountId: "account", + stateDir, + now: () => 10, + }); + + await queue.enqueue("event-1", { text: "claimed" }); + const oldClaim = await queue.claim("event-1", { ownerId: "worker-1" }); + if (!oldClaim) { + throw new Error("Expected a claimed ingress event"); + } + expect(await queue.recoverStaleClaims({ staleMs: 5, now: 20 })).toBe(1); + const newClaim = await queue.claim("event-1", { ownerId: "worker-2" }); + if (!newClaim) { + throw new Error("Expected reclaimed ingress event"); + } + + expect(await queue.refreshClaim?.(oldClaim, { refreshedAt: 30 })).toBe(false); + expect(await queue.refreshClaim?.(newClaim, { refreshedAt: 40 })).toBe(true); + expect((await queue.listClaims())[0]?.claim).toMatchObject({ + ownerId: "worker-2", + claimedAt: 40, + }); + }); + }); + + it("does not recover a claim refreshed after stale recovery snapshots it", async () => { + await withTempState(async (stateDir) => { + const queue = createChannelIngressQueue<{ text: string }>({ + channelId: "test", + accountId: "account", + stateDir, + now: () => 10, + }); + + await queue.enqueue("event-1", { text: "claimed" }); + const claimed = await queue.claim("event-1", { ownerId: "worker" }); + if (!claimed) { + throw new Error("Expected a claimed ingress event"); + } + + expect( + await queue.recoverStaleClaims({ + staleMs: 5, + now: 20, + shouldRecover: async (claim) => { + expect(claim.id).toBe("event-1"); + expect(await queue.refreshClaim?.(claim, { refreshedAt: 20 })).toBe(true); + return true; + }, + }), + ).toBe(0); + expect((await queue.listPending()).map((record) => record.id)).toEqual([]); + expect((await queue.listClaims())[0]?.claim).toMatchObject({ + ownerId: "worker", + claimedAt: 20, + }); + }); + }); + it("recovers stale claims and prunes completed or failed rows", async () => { await withTempState(async (stateDir) => { const queue = createChannelIngressQueue<{ text: string }>({ diff --git a/src/channels/message/ingress-queue.ts b/src/channels/message/ingress-queue.ts index c125cd35d06..49afa7a3bd6 100644 --- a/src/channels/message/ingress-queue.ts +++ b/src/channels/message/ingress-queue.ts @@ -142,6 +142,10 @@ export type ChannelIngressQueue | null>; + refreshClaim?( + claim: ChannelIngressQueueClaimRef, + options?: { refreshedAt?: number }, + ): Promise; complete( idOrClaim: string | ChannelIngressQueueClaimRef, options?: { metadata?: TCompletedMetadata; completedAt?: number }, @@ -440,26 +444,6 @@ export function createChannelIngressQueue< return rows.map((row) => claimedRecord(row)); }; - const recoverStaleClaims: ChannelIngressQueue< - TPayload, - TMetadata, - TCompletedMetadata - >["recoverStaleClaims"] = async (recoverOptions) => { - const staleMs = Math.max(0, Math.floor(recoverOptions?.staleMs ?? 0)); - const cutoff = (recoverOptions?.now ?? now()) - staleMs; - const claims = (await listClaims()).filter((claim) => claim.claim.claimedAt <= cutoff); - let recovered = 0; - for (const claim of claims) { - if (recoverOptions?.shouldRecover && !(await recoverOptions.shouldRecover(claim))) { - continue; - } - if (await release(claim, { releasedAt: recoverOptions?.now ?? now() })) { - recovered += 1; - } - } - return recovered; - }; - const claimNext: ChannelIngressQueue< TPayload, TMetadata, @@ -561,6 +545,89 @@ export function createChannelIngressQueue< ); }; + const refreshClaim: NonNullable< + ChannelIngressQueue["refreshClaim"] + > = async (claimRef, refreshOptions) => { + const eventId = idFrom(claimRef); + const refreshedAt = refreshOptions?.refreshedAt ?? now(); + const database = openStateDatabase(options.stateDir); + return runOpenClawStateWriteTransaction( + (tx) => { + const kysely = getChannelIngressKysely(tx.db); + const result = executeSqliteQuerySync( + tx.db, + kysely + .updateTable("channel_ingress_events") + .set({ + claimed_at: refreshedAt, + updated_at: refreshedAt, + }) + .where("queue_name", "=", queueName) + .where("event_id", "=", eventId) + .where("status", "=", "claimed") + .where("claim_token", "=", claimRef.claim.token), + ); + return affectedRows(result) > 0; + }, + { path: database.path }, + ); + }; + + const releaseClaimIfStillStale = async ( + claimRef: ChannelIngressQueueClaimRef, + releaseOptions: { cutoff: number; releasedAt: number }, + ): Promise => { + const eventId = idFrom(claimRef); + const database = openStateDatabase(options.stateDir); + return runOpenClawStateWriteTransaction( + (tx) => { + const kysely = getChannelIngressKysely(tx.db); + const result = executeSqliteQuerySync( + tx.db, + kysely + .updateTable("channel_ingress_events") + .set((eb) => ({ + status: "pending", + claim_token: null, + claim_owner: null, + claimed_at: null, + attempts: eb("attempts", "+", 1), + last_attempt_at: releaseOptions.releasedAt, + updated_at: releaseOptions.releasedAt, + })) + .where("queue_name", "=", queueName) + .where("event_id", "=", eventId) + .where("status", "=", "claimed") + .where("claim_token", "=", claimRef.claim.token) + .where("claimed_at", "<=", releaseOptions.cutoff), + ); + return affectedRows(result) > 0; + }, + { path: database.path }, + ); + }; + + const recoverStaleClaims: ChannelIngressQueue< + TPayload, + TMetadata, + TCompletedMetadata + >["recoverStaleClaims"] = async (recoverOptions) => { + const current = recoverOptions?.now ?? now(); + const staleMs = Math.max(0, Math.floor(recoverOptions?.staleMs ?? 0)); + const cutoff = current - staleMs; + const staleClaims = (await listClaims()).filter((claimed) => claimed.claim.claimedAt <= cutoff); + let recovered = 0; + for (const staleClaim of staleClaims) { + if (recoverOptions?.shouldRecover && !(await recoverOptions.shouldRecover(staleClaim))) { + continue; + } + if (await releaseClaimIfStillStale(staleClaim, { cutoff, releasedAt: current })) { + recovered += 1; + } + } + return recovered; + }; + const complete: ChannelIngressQueue["complete"] = async ( idOrClaim, completeOptions, @@ -845,6 +912,7 @@ export function createChannelIngressQueue< listClaims, claimNext, claim, + refreshClaim, complete, release, fail,