Fix Telegram spooled claim refresh (#96962)

This commit is contained in:
Josh Avant
2026-06-26 01:30:57 -05:00
committed by GitHub
parent 4fc504d321
commit db255b1154
6 changed files with 547 additions and 20 deletions

View File

@@ -486,6 +486,49 @@ async function pendingUpdateIds(spoolDir: string, limit: number | "all" = 100):
return (await listTelegramSpooledUpdates({ spoolDir, limit })).map((update) => update.updateId);
}
async function claimedAtForUpdate(spoolDir: string, updateId: number): Promise<number> {
const claim = (await listTelegramSpooledUpdateClaims({ spoolDir })).find(
(entry) => entry.updateId === updateId,
);
if (!claim?.claim) {
throw new Error(`Expected claimed spooled update ${updateId}`);
}
return claim.claim.claimedAt;
}
function installSpooledClaimRefreshHarness(): {
restore: () => void;
triggerRefresh: () => void;
} {
let refresh: (() => void) | undefined;
const realSetInterval = globalThis.setInterval.bind(globalThis);
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation(((
handler: Parameters<typeof setInterval>[0],
timeout?: number,
) => {
if (timeout === pollingSessionTesting.spooledClaimRefreshIntervalMs) {
refresh = () => {
if (typeof handler === "function") {
handler();
}
};
const timer = realSetInterval(() => undefined, 2_147_483_647);
timer.unref?.();
return timer;
}
return realSetInterval(handler, timeout);
}) as typeof setInterval);
return {
restore: () => setIntervalSpy.mockRestore(),
triggerRefresh: () => {
if (!refresh) {
throw new Error("Expected spooled claim refresh interval to be registered");
}
refresh();
},
};
}
function normalizeTelegramTestAccountId(spoolDir: string): string {
const trimmed = path.basename(spoolDir).trim();
return trimmed ? trimmed.replace(/[^a-z0-9._-]+/gi, "_") : "default";
@@ -1575,6 +1618,49 @@ describe("TelegramPollingSession", () => {
});
});
it("refreshes active spooled claims while the handler is still running", async () => {
const refreshHarness = installSpooledClaimRefreshHarness();
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
const events: string[] = [];
let releaseHandler: (() => void) | undefined;
const handlerDone = new Promise<void>((resolve) => {
releaseHandler = resolve;
});
await writeSpooledTestUpdates(tempDir, [topicUpdate(42, 10, "long topic 10 turn")]);
const { runPromise, stopWorker } = startIsolatedIngressSession({
abort,
spoolDir: tempDir,
handleUpdate: async (update) => {
events.push(`topic10:${update.update_id}`);
await handlerDone;
},
});
try {
await vi.waitFor(() => expect(events).toEqual(["topic10:42"]));
const before = await claimedAtForUpdate(tempDir, 42);
refreshHarness.triggerRefresh();
await vi.waitFor(async () =>
expect(await claimedAtForUpdate(tempDir, 42)).toBeGreaterThan(before),
);
releaseHandler?.();
await vi.waitFor(async () =>
expect(await listTelegramSpooledUpdateClaims({ spoolDir: tempDir })).toEqual([]),
);
} finally {
releaseHandler?.();
abort.abort();
stopWorker();
refreshHarness.restore();
await runPromise;
}
});
});
it("holds buffered spooled claims until deferred processing settles without blocking same-lane buffering", async () => {
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
@@ -1625,6 +1711,50 @@ describe("TelegramPollingSession", () => {
});
});
it("refreshes deferred spooled claims after the active handler hands off", async () => {
const refreshHarness = installSpooledClaimRefreshHarness();
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
const participants: TelegramSpooledReplayDeferredParticipant[] = [];
await writeSpooledTestUpdates(tempDir, [topicUpdate(42, 10, "buffered topic 10 turn")]);
const { runPromise, stopWorker } = startIsolatedIngressSession({
abort,
spoolDir: tempDir,
handleUpdate: async (update) => {
const participant = createTelegramSpooledReplayDeferredParticipant(
`test-buffer:${update.update_id}`,
);
if (!participant) {
throw new Error("expected spooled replay participant");
}
participants.push(participant);
},
});
try {
await vi.waitFor(() => expect(participants).toHaveLength(1));
const before = await claimedAtForUpdate(tempDir, 42);
refreshHarness.triggerRefresh();
await vi.waitFor(async () =>
expect(await claimedAtForUpdate(tempDir, 42)).toBeGreaterThan(before),
);
participants[0]?.settle({ kind: "completed" });
await vi.waitFor(async () =>
expect(await listTelegramSpooledUpdateClaims({ spoolDir: tempDir })).toEqual([]),
);
} finally {
participants[0]?.settle({ kind: "completed" });
abort.abort();
stopWorker();
refreshHarness.restore();
await runPromise;
}
});
});
it("releases buffered spooled claims for retry when deferred processing fails", async () => {
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
@@ -3585,6 +3715,106 @@ describe("TelegramPollingSession", () => {
}
});
it("marks isolated ingress unhealthy when a spooled backlog stalls before handler timeout", async () => {
vi.useFakeTimers({ now: 1_000, shouldAdvanceTime: true });
const abort = new AbortController();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
const setStatus = vi.fn();
let releaseRegularTurn: (() => void) | undefined;
const regularTurnDone = new Promise<void>((resolve) => {
releaseRegularTurn = resolve;
});
const handleUpdate = vi.fn(async () => {
await regularTurnDone;
});
createTelegramBotMock.mockReturnValueOnce({
api: {
deleteWebhook: vi.fn(async () => true),
config: { use: vi.fn() },
},
init: vi.fn(async () => undefined),
handleUpdate,
stop: vi.fn(async () => undefined),
});
await writeSpooledTestUpdates(tempDir, [
topicUpdate(42, 10, "active topic 10 turn"),
topicUpdate(43, 10, "later topic 10 turn"),
]);
const workerListeners: WorkerMessageListener[] = [];
let stopWorker: (() => void) | undefined;
const workerDone = new Promise<void>((resolve) => {
stopWorker = resolve;
});
const createWorker = vi.fn(() => ({
onMessage: vi.fn((listener: WorkerMessageListener) => {
workerListeners.push(listener);
return () => undefined;
}),
stop: vi.fn(async () => {
stopWorker?.();
}),
task: vi.fn(async () => {
await workerDone;
}),
}));
try {
const session = createPollingSession({
abortSignal: abort.signal,
setStatus,
isolatedIngress: {
enabled: true,
spoolDir: tempDir,
createWorker,
drainIntervalMs: pollingSessionTesting.isolatedIngressBacklogStallMs * 2,
spooledUpdateHandlerTimeoutMs: pollingSessionTesting.isolatedIngressBacklogStallMs * 2,
},
});
const runPromise = session.runUntilAbort();
await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1));
workerListeners[0]?.({
type: "poll-success",
offset: null,
count: 0,
finishedAt: Date.now(),
});
expect(statusPatches(setStatus).some((patch) => patch.connected === true)).toBe(true);
vi.setSystemTime(1_000 + pollingSessionTesting.isolatedIngressBacklogStallMs + 1);
workerListeners[0]?.({ type: "spooled", updateId: 43, queued: 1 });
await vi.waitFor(() =>
expect(
statusPatches(setStatus).some(
(patch) =>
patch.connected === false &&
String(patch.lastError).includes("isolated polling spool backlog stalled"),
),
).toBe(true),
);
expect(await failedUpdateIds(tempDir)).toEqual([]);
expect(await pendingUpdateIds(tempDir, "all")).toEqual([43]);
expect(
(await listTelegramSpooledUpdateClaims({ spoolDir: tempDir })).map(
(claim) => claim.updateId,
),
).toEqual([42]);
releaseRegularTurn?.();
abort.abort();
stopWorker?.();
await vi.advanceTimersByTimeAsync(20_000);
await runPromise;
} finally {
releaseRegularTurn?.();
abort.abort();
stopWorker?.();
vi.useRealTimers();
await fs.rm(tempDir, { recursive: true, force: true });
}
});
it("marks isolated ingress unhealthy when a spooled backlog handler times out", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
const abort = new AbortController();

View File

@@ -41,6 +41,7 @@ import {
listTelegramSpooledUpdateClaims,
listTelegramSpooledUpdates,
recoverStaleTelegramSpooledUpdateClaims,
refreshTelegramSpooledUpdateClaim,
releaseTelegramSpooledUpdateClaim,
resolveTelegramIngressSpoolDir,
writeTelegramSpooledUpdate,
@@ -131,6 +132,7 @@ const TELEGRAM_SPOOLED_HANDLER_ABORT_GRACE_MS = 5_000;
const TELEGRAM_SPOOLED_HANDLER_TIMEOUT_ENV = "OPENCLAW_TELEGRAM_SPOOLED_HANDLER_TIMEOUT_MS";
const TELEGRAM_SPOOLED_DRAIN_START_LIMIT = 100;
const TELEGRAM_SPOOLED_DRAIN_SCAN_LIMIT = TELEGRAM_SPOOLED_DRAIN_START_LIMIT * 10;
const TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS = 5 * 60 * 1000;
const TELEGRAM_SPOOLED_SESSION_INIT_CONFLICT_RETRY_BASE_MS = 5_000;
const TELEGRAM_SPOOLED_SESSION_INIT_CONFLICT_RETRY_MAX_MS = 60_000;
const TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS = Math.ceil(
@@ -291,6 +293,8 @@ type SpooledUpdateHandlerState = {
update: ClaimedTelegramSpooledUpdate;
updateId: number;
startedAt: number;
stopClaimRefresh: () => void;
backlogStatusMessage?: string;
timedOutAt?: number;
timeoutMessage?: string;
};
@@ -303,6 +307,7 @@ type DeferredSpooledUpdateClaimState = {
timedOutMessage?: string;
update: ClaimedTelegramSpooledUpdate;
updateId: number;
stopClaimRefresh: () => void;
};
const deferredSpooledUpdateClaimsByKey = new Map<string, DeferredSpooledUpdateClaimState>();
@@ -572,8 +577,46 @@ export class TelegramPollingSession {
}
}
#startSpooledUpdateClaimRefresh(update: ClaimedTelegramSpooledUpdate): () => void {
// Refresh only while this process still owns useful work for this claim token.
// Stopping before release/fail/delete lets stale recovery take over if work stalls.
let stopped = false;
let refreshing = false;
const refresh = async (): Promise<void> => {
if (stopped || refreshing) {
return;
}
refreshing = true;
try {
const refreshed = await refreshTelegramSpooledUpdateClaim(update);
if (!refreshed && !stopped) {
stopped = true;
clearInterval(timer);
}
} catch (err) {
this.opts.log(
`[telegram][diag] spooled update ${update.updateId} claim refresh failed: ${formatErrorMessage(err)}`,
);
} finally {
refreshing = false;
}
};
const timer = setInterval(() => {
void refresh();
}, TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS);
timer.unref?.();
return () => {
if (stopped) {
return;
}
stopped = true;
clearInterval(timer);
};
}
async #handleClaimedSpooledUpdate(params: {
bot: TelegramBot;
stopClaimRefresh: () => void;
update: ClaimedTelegramSpooledUpdate;
}): Promise<boolean> {
let replay: { deferredWork?: TelegramSpooledReplayDeferredParticipant };
@@ -583,6 +626,7 @@ export class TelegramPollingSession {
await params.bot.handleUpdate(update);
});
} catch (err) {
params.stopClaimRefresh();
await this.#releaseFailedSpooledUpdate({
err,
update: params.update,
@@ -593,11 +637,13 @@ export class TelegramPollingSession {
this.#registerDeferredSpooledUpdate({
deferredWork: replay.deferredWork,
laneKey: this.#spooledUpdateLaneKey(params.update),
stopClaimRefresh: params.stopClaimRefresh,
update: params.update,
});
return true;
}
try {
params.stopClaimRefresh();
await deleteTelegramSpooledUpdate(params.update);
return true;
} catch (err) {
@@ -611,6 +657,7 @@ export class TelegramPollingSession {
#registerDeferredSpooledUpdate(params: {
deferredWork: TelegramSpooledReplayDeferredParticipant;
laneKey: string;
stopClaimRefresh: () => void;
update: ClaimedTelegramSpooledUpdate;
}): void {
const claimKey = buildDeferredSpooledUpdateClaimKey(params.update);
@@ -619,6 +666,7 @@ export class TelegramPollingSession {
if (previous.timer) {
clearTimeout(previous.timer);
}
previous.stopClaimRefresh();
deferredSpooledUpdateClaimsByKey.delete(claimKey);
}
let settled = false;
@@ -630,6 +678,7 @@ export class TelegramPollingSession {
if (state.timer) {
clearTimeout(state.timer);
}
state.stopClaimRefresh();
if (deferredSpooledUpdateClaimsByKey.get(claimKey) === state) {
deferredSpooledUpdateClaimsByKey.delete(claimKey);
}
@@ -661,10 +710,12 @@ export class TelegramPollingSession {
}),
update: params.update,
updateId: params.update.updateId,
stopClaimRefresh: params.stopClaimRefresh,
};
state.timer = setTimeout(() => {
const age = formatDurationPrecise(this.#spooledUpdateHandlerTimeoutMs);
state.timedOutMessage = `Telegram isolated polling spool buffered processing timed out behind update ${params.update.updateId} on lane ${params.laneKey} after ${age}; marking the update failed, aborting active reply work, and keeping the claim out of retry while the buffered task settles.`;
state.stopClaimRefresh();
params.deferredWork.settle({
kind: "failed-retryable",
error: new Error(state.timedOutMessage),
@@ -905,8 +956,10 @@ export class TelegramPollingSession {
claimedLaneKeys.add(laneKey);
continue;
}
const stopClaimRefresh = this.#startSpooledUpdateClaimRefresh(claimedUpdate);
const handler = this.#handleClaimedSpooledUpdate({
bot: params.bot,
stopClaimRefresh,
update: claimedUpdate,
});
const state: SpooledUpdateHandlerState = {
@@ -916,11 +969,17 @@ export class TelegramPollingSession {
update: claimedUpdate,
updateId: update.updateId,
startedAt: Date.now(),
stopClaimRefresh,
};
activeSpooledUpdateHandlersByLane.set(handlerKey, state);
this.#spooledUpdateHandlerKeys.add(handlerKey);
claimedLaneKeys.add(laneKey);
void handler.finally(() => {
if (
!deferredSpooledUpdateClaimsByKey.has(buildDeferredSpooledUpdateClaimKey(claimedUpdate))
) {
state.stopClaimRefresh();
}
if (activeSpooledUpdateHandlersByLane.get(handlerKey) === state) {
activeSpooledUpdateHandlersByLane.delete(handlerKey);
}
@@ -969,6 +1028,7 @@ export class TelegramPollingSession {
}
const age = formatDurationPrecise(timedOutHandler.ageMs);
activeHandler.timedOutAt = Date.now();
activeHandler.stopClaimRefresh();
const message = `Telegram isolated polling spool handler timed out behind update ${handler.updateId} on lane ${handler.laneKey} after ${age}; marking the update failed, aborting active reply work, and restarting isolated ingress so later updates can drain.`;
activeHandler.timeoutMessage = message;
try {
@@ -1025,6 +1085,27 @@ export class TelegramPollingSession {
return { handlerKey: handler.handlerKey, restart: true };
}
#noteSpooledBacklogStalls(blockedHandlerKeys: Set<string>): Set<string> {
const stalled = new Set<string>();
const now = Date.now();
for (const handlerKey of blockedHandlerKeys) {
const handler = activeSpooledUpdateHandlersByLane.get(handlerKey);
if (!handler || handler.timedOutAt !== undefined) {
continue;
}
const ageMs = now - handler.startedAt;
if (ageMs < ISOLATED_INGRESS_BACKLOG_STALL_MS) {
continue;
}
stalled.add(handlerKey);
if (!handler.backlogStatusMessage) {
handler.backlogStatusMessage = `Telegram isolated polling spool backlog stalled behind update ${handler.updateId} on lane ${handler.laneKey} for ${formatDurationPrecise(ageMs)}; marking polling unhealthy until the backlog drains.`;
this.#status.notePollingError(handler.backlogStatusMessage);
}
}
return stalled;
}
async #runIsolatedIngressCycle(bot: TelegramBot): Promise<"continue" | "exit"> {
const ingress = this.opts.isolatedIngress;
if (!ingress?.enabled) {
@@ -1222,6 +1303,9 @@ export class TelegramPollingSession {
this.#status.notePollingError(handler.timeoutMessage);
}
}
for (const handlerKey of this.#noteSpooledBacklogStalls(drain.blockedByLane)) {
stalledBacklogKeys.add(handlerKey);
}
// Active handlers can outlive their owning session after shutdown grace.
// Recover every handler for this spool, including lone handlers with no backlog.
const timeoutCandidateHandlerKeys = this.#activeSpooledUpdateHandlerKeysForSpool(spoolDir);
@@ -1561,6 +1645,8 @@ export const testing = {
resetTelegramRestartBackoffState,
resolveTelegramRestartDelayMs,
resolveSpooledUpdateRetryDelayMs,
isolatedIngressBacklogStallMs: ISOLATED_INGRESS_BACKLOG_STALL_MS,
spooledClaimRefreshIntervalMs: TELEGRAM_SPOOLED_CLAIM_REFRESH_INTERVAL_MS,
resolveSpooledUpdateHandlerAbortGraceMs: (valueMs: unknown): number =>
resolvePositiveTimerTimeoutMs(valueMs, TELEGRAM_SPOOLED_HANDLER_ABORT_GRACE_MS),
};

View File

@@ -17,6 +17,7 @@ import {
listTelegramSpooledUpdateClaims,
listTelegramSpooledUpdates,
recoverStaleTelegramSpooledUpdateClaims,
refreshTelegramSpooledUpdateClaim,
releaseTelegramSpooledUpdateClaim,
TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS,
writeTelegramSpooledUpdate,
@@ -140,6 +141,32 @@ describe("Telegram ingress spool", () => {
});
});
it("refreshes active claim timestamps through the Telegram spool queue", async () => {
await withTempSpool(async (spoolDir) => {
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 31, message: { text: "refresh me" } },
});
const update = (await listTelegramSpooledUpdates({ spoolDir }))[0];
if (!update) {
throw new Error("Expected a spooled update");
}
const claimed = await claimTelegramSpooledUpdate(update);
if (!claimed) {
throw new Error("Expected a claimed update");
}
await expect(refreshTelegramSpooledUpdateClaim(claimed, { refreshedAt: 123 })).resolves.toBe(
true,
);
const claims = await listTelegramSpooledUpdateClaims({ spoolDir });
expect(claims).toHaveLength(1);
expect(claims[0]?.updateId).toBe(31);
expect(claims[0]?.claim?.claimedAt).toBe(123);
});
});
it("marks timed out claims failed without requeueing them", async () => {
await withTempSpool(async (spoolDir) => {
await writeTelegramSpooledUpdate({

View File

@@ -281,6 +281,23 @@ export async function releaseTelegramSpooledUpdateClaim(
);
}
export async function refreshTelegramSpooledUpdateClaim(
update: ClaimedTelegramSpooledUpdate,
options?: { refreshedAt?: number },
): Promise<boolean> {
const claimToken = update.claim?.claimToken;
if (!claimToken) {
return false;
}
const queue = createTelegramIngressQueue(path.dirname(update.pendingPath));
return (
(await queue.refreshClaim?.(
{ id: queueEventId(update.updateId), claim: { token: claimToken } },
options,
)) ?? false
);
}
export async function failTelegramSpooledUpdateClaim(params: {
update: ClaimedTelegramSpooledUpdate;
reason: string;

View File

@@ -277,6 +277,105 @@ describe("channel ingress queue", () => {
});
});
it("refreshes claimed rows only with the active claim token", async () => {
await withTempState(async (stateDir) => {
const queue = createChannelIngressQueue<{ text: string }>({
channelId: "test",
accountId: "account",
stateDir,
now: () => 10,
});
await queue.enqueue("event-1", { text: "claimed" });
const claimed = await queue.claim("event-1", { ownerId: "worker" });
if (!claimed) {
throw new Error("Expected a claimed ingress event");
}
expect(await queue.refreshClaim?.(claimed, { refreshedAt: 20 })).toBe(true);
expect(
(await queue.listClaims()).map((claim) => ({
id: claim.id,
claimedAt: claim.claim.claimedAt,
updatedAt: claim.updatedAt,
})),
).toEqual([{ id: "event-1", claimedAt: 20, updatedAt: 20 }]);
expect(
await queue.refreshClaim?.(
{ id: "event-1", claim: { token: "wrong" } },
{
refreshedAt: 30,
},
),
).toBe(false);
expect((await queue.listClaims())[0]?.claim.claimedAt).toBe(20);
});
});
it("does not let old claim tokens refresh recovered and reclaimed rows", async () => {
await withTempState(async (stateDir) => {
const queue = createChannelIngressQueue<{ text: string }>({
channelId: "test",
accountId: "account",
stateDir,
now: () => 10,
});
await queue.enqueue("event-1", { text: "claimed" });
const oldClaim = await queue.claim("event-1", { ownerId: "worker-1" });
if (!oldClaim) {
throw new Error("Expected a claimed ingress event");
}
expect(await queue.recoverStaleClaims({ staleMs: 5, now: 20 })).toBe(1);
const newClaim = await queue.claim("event-1", { ownerId: "worker-2" });
if (!newClaim) {
throw new Error("Expected reclaimed ingress event");
}
expect(await queue.refreshClaim?.(oldClaim, { refreshedAt: 30 })).toBe(false);
expect(await queue.refreshClaim?.(newClaim, { refreshedAt: 40 })).toBe(true);
expect((await queue.listClaims())[0]?.claim).toMatchObject({
ownerId: "worker-2",
claimedAt: 40,
});
});
});
it("does not recover a claim refreshed after stale recovery snapshots it", async () => {
await withTempState(async (stateDir) => {
const queue = createChannelIngressQueue<{ text: string }>({
channelId: "test",
accountId: "account",
stateDir,
now: () => 10,
});
await queue.enqueue("event-1", { text: "claimed" });
const claimed = await queue.claim("event-1", { ownerId: "worker" });
if (!claimed) {
throw new Error("Expected a claimed ingress event");
}
expect(
await queue.recoverStaleClaims({
staleMs: 5,
now: 20,
shouldRecover: async (claim) => {
expect(claim.id).toBe("event-1");
expect(await queue.refreshClaim?.(claim, { refreshedAt: 20 })).toBe(true);
return true;
},
}),
).toBe(0);
expect((await queue.listPending()).map((record) => record.id)).toEqual([]);
expect((await queue.listClaims())[0]?.claim).toMatchObject({
ownerId: "worker",
claimedAt: 20,
});
});
});
it("recovers stale claims and prunes completed or failed rows", async () => {
await withTempState(async (stateDir) => {
const queue = createChannelIngressQueue<{ text: string }>({

View File

@@ -142,6 +142,10 @@ export type ChannelIngressQueue<TPayload, TMetadata = unknown, TCompletedMetadat
id: string,
options?: { ownerId?: string },
): Promise<ChannelIngressQueueClaim<TPayload, TMetadata> | null>;
refreshClaim?(
claim: ChannelIngressQueueClaimRef,
options?: { refreshedAt?: number },
): Promise<boolean>;
complete(
idOrClaim: string | ChannelIngressQueueClaimRef,
options?: { metadata?: TCompletedMetadata; completedAt?: number },
@@ -440,26 +444,6 @@ export function createChannelIngressQueue<
return rows.map((row) => claimedRecord<TPayload, TMetadata>(row));
};
const recoverStaleClaims: ChannelIngressQueue<
TPayload,
TMetadata,
TCompletedMetadata
>["recoverStaleClaims"] = async (recoverOptions) => {
const staleMs = Math.max(0, Math.floor(recoverOptions?.staleMs ?? 0));
const cutoff = (recoverOptions?.now ?? now()) - staleMs;
const claims = (await listClaims()).filter((claim) => claim.claim.claimedAt <= cutoff);
let recovered = 0;
for (const claim of claims) {
if (recoverOptions?.shouldRecover && !(await recoverOptions.shouldRecover(claim))) {
continue;
}
if (await release(claim, { releasedAt: recoverOptions?.now ?? now() })) {
recovered += 1;
}
}
return recovered;
};
const claimNext: ChannelIngressQueue<
TPayload,
TMetadata,
@@ -561,6 +545,89 @@ export function createChannelIngressQueue<
);
};
const refreshClaim: NonNullable<
ChannelIngressQueue<TPayload, TMetadata, TCompletedMetadata>["refreshClaim"]
> = async (claimRef, refreshOptions) => {
const eventId = idFrom(claimRef);
const refreshedAt = refreshOptions?.refreshedAt ?? now();
const database = openStateDatabase(options.stateDir);
return runOpenClawStateWriteTransaction(
(tx) => {
const kysely = getChannelIngressKysely(tx.db);
const result = executeSqliteQuerySync(
tx.db,
kysely
.updateTable("channel_ingress_events")
.set({
claimed_at: refreshedAt,
updated_at: refreshedAt,
})
.where("queue_name", "=", queueName)
.where("event_id", "=", eventId)
.where("status", "=", "claimed")
.where("claim_token", "=", claimRef.claim.token),
);
return affectedRows(result) > 0;
},
{ path: database.path },
);
};
const releaseClaimIfStillStale = async (
claimRef: ChannelIngressQueueClaimRef,
releaseOptions: { cutoff: number; releasedAt: number },
): Promise<boolean> => {
const eventId = idFrom(claimRef);
const database = openStateDatabase(options.stateDir);
return runOpenClawStateWriteTransaction(
(tx) => {
const kysely = getChannelIngressKysely(tx.db);
const result = executeSqliteQuerySync(
tx.db,
kysely
.updateTable("channel_ingress_events")
.set((eb) => ({
status: "pending",
claim_token: null,
claim_owner: null,
claimed_at: null,
attempts: eb("attempts", "+", 1),
last_attempt_at: releaseOptions.releasedAt,
updated_at: releaseOptions.releasedAt,
}))
.where("queue_name", "=", queueName)
.where("event_id", "=", eventId)
.where("status", "=", "claimed")
.where("claim_token", "=", claimRef.claim.token)
.where("claimed_at", "<=", releaseOptions.cutoff),
);
return affectedRows(result) > 0;
},
{ path: database.path },
);
};
const recoverStaleClaims: ChannelIngressQueue<
TPayload,
TMetadata,
TCompletedMetadata
>["recoverStaleClaims"] = async (recoverOptions) => {
const current = recoverOptions?.now ?? now();
const staleMs = Math.max(0, Math.floor(recoverOptions?.staleMs ?? 0));
const cutoff = current - staleMs;
const staleClaims = (await listClaims()).filter((claimed) => claimed.claim.claimedAt <= cutoff);
let recovered = 0;
for (const staleClaim of staleClaims) {
if (recoverOptions?.shouldRecover && !(await recoverOptions.shouldRecover(staleClaim))) {
continue;
}
if (await releaseClaimIfStillStale(staleClaim, { cutoff, releasedAt: current })) {
recovered += 1;
}
}
return recovered;
};
const complete: ChannelIngressQueue<TPayload, TMetadata, TCompletedMetadata>["complete"] = async (
idOrClaim,
completeOptions,
@@ -845,6 +912,7 @@ export function createChannelIngressQueue<
listClaims,
claimNext,
claim,
refreshClaim,
complete,
release,
fail,