diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index 4d71e82a66f..895a4023d9e 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -51,6 +51,7 @@ vi.mock("openclaw/plugin-sdk/runtime-env", () => ({ })); let TelegramPollingSession: typeof import("./polling-session.js").TelegramPollingSession; +let claimTelegramSpooledUpdate: typeof import("./telegram-ingress-spool.js").claimTelegramSpooledUpdate; let listTelegramSpooledUpdates: typeof import("./telegram-ingress-spool.js").listTelegramSpooledUpdates; let writeTelegramSpooledUpdate: typeof import("./telegram-ingress-spool.js").writeTelegramSpooledUpdate; @@ -310,10 +311,111 @@ async function waitForApiMiddleware( throw new Error("Telegram API middleware was not installed"); } +type TestTelegramUpdate = { + update_id: number; + message: { + text: string; + chat: { id: number; type: "supergroup" }; + message_thread_id?: number; + is_topic_message?: boolean; + }; +}; + +function topicUpdate(updateId: number, threadId: number, text: string): TestTelegramUpdate { + return { + update_id: updateId, + message: { + text, + message_thread_id: threadId, + is_topic_message: true, + chat: { id: -100, type: "supergroup" }, + }, + }; +} + +async function writeSpooledTestUpdates( + spoolDir: string, + updates: readonly TestTelegramUpdate[], +): Promise { + for (const update of updates) { + await writeTelegramSpooledUpdate({ spoolDir, update }); + } +} + +async function pendingUpdateIds(spoolDir: string, limit: number | "all" = 100): Promise { + return (await listTelegramSpooledUpdates({ spoolDir, limit })).map((update) => update.updateId); +} + +async function withTempSpool(fn: (spoolDir: string) => Promise): Promise { + const spoolDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); + try { + return await fn(spoolDir); + } finally { + await fs.rm(spoolDir, { recursive: true, force: true }); + } +} + +function createIdleIngressWorker() { + let stopWorker: (() => void) | undefined; + const workerDone = new Promise((resolve) => { + stopWorker = resolve; + }); + const createWorker = vi.fn(() => ({ + onMessage: vi.fn(() => () => undefined), + stop: vi.fn(async () => { + stopWorker?.(); + }), + task: vi.fn(async () => { + await workerDone; + }), + })); + return { + createWorker, + stop: () => stopWorker?.(), + }; +} + +function startIsolatedIngressSession(params: { + abort: AbortController; + spoolDir: string; + handleUpdate: (update: { update_id?: number }) => Promise; + drainIntervalMs?: number; + log?: (message: string) => void; + stop?: () => Promise; +}) { + const worker = createIdleIngressWorker(); + const bot = { + api: { + deleteWebhook: vi.fn(async () => true), + config: { use: vi.fn() }, + }, + init: vi.fn(async () => undefined), + handleUpdate: vi.fn(params.handleUpdate), + stop: vi.fn(params.stop ?? (async () => undefined)), + }; + createTelegramBotMock.mockReturnValueOnce(bot); + const session = createPollingSession({ + abortSignal: params.abort.signal, + log: params.log, + isolatedIngress: { + enabled: true, + spoolDir: params.spoolDir, + createWorker: worker.createWorker, + drainIntervalMs: params.drainIntervalMs ?? 10, + }, + }); + return { + bot, + createWorker: worker.createWorker, + runPromise: session.runUntilAbort(), + stopWorker: worker.stop, + }; +} + describe("TelegramPollingSession", () => { beforeAll(async () => { ({ TelegramPollingSession } = await import("./polling-session.js")); - ({ listTelegramSpooledUpdates, writeTelegramSpooledUpdate } = + ({ claimTelegramSpooledUpdate, listTelegramSpooledUpdates, writeTelegramSpooledUpdate } = await import("./telegram-ingress-spool.js")); }); @@ -538,6 +640,128 @@ describe("TelegramPollingSession", () => { await runPromise; }); + it("keeps failed lanes blocked for the rest of the drain pass", async () => { + await withTempSpool(async (tempDir) => { + const abort = new AbortController(); + const log = vi.fn(); + const events: string[] = []; + await writeSpooledTestUpdates(tempDir, [ + topicUpdate(42, 10, "first topic 10 turn"), + topicUpdate(43, 11, "topic 11 turn"), + topicUpdate(44, 10, "second topic 10 turn"), + ]); + + const { runPromise, stopWorker } = startIsolatedIngressSession({ + abort, + spoolDir: tempDir, + log, + drainIntervalMs: 500, + handleUpdate: async (update) => { + if (update.update_id === 42) { + events.push("topic10:first"); + throw new Error("handler boom"); + } + if (update.update_id === 43) { + events.push("topic11"); + return; + } + if (update.update_id === 44) { + events.push("topic10:second"); + } + }, + }); + + await vi.waitFor(() => expect(events).toEqual(["topic10:first", "topic11"])); + expect(await pendingUpdateIds(tempDir, "all")).toEqual([42, 44]); + expectLogIncludes(log, "spooled update 42 failed; keeping for retry"); + abort.abort(); + stopWorker(); + await runPromise; + }); + }); + + it("recovers restart processing claims before draining later same-lane updates", async () => { + await withTempSpool(async (tempDir) => { + const abort = new AbortController(); + const events: string[] = []; + await writeSpooledTestUpdates(tempDir, [ + topicUpdate(42, 10, "interrupted topic 10 turn"), + topicUpdate(43, 10, "later topic 10 turn"), + topicUpdate(44, 11, "topic 11 turn"), + ]); + const interrupted = (await listTelegramSpooledUpdates({ spoolDir: tempDir })).find( + (update) => update.updateId === 42, + ); + if (!interrupted) { + throw new Error("Expected interrupted update"); + } + await claimTelegramSpooledUpdate(interrupted); + + const { runPromise, stopWorker } = startIsolatedIngressSession({ + abort, + spoolDir: tempDir, + handleUpdate: async (update) => { + events.push(`handled:${update.update_id}`); + if (update.update_id === 44) { + abort.abort(); + } + }, + }); + + await runPromise; + expect(events).toEqual(["handled:42", "handled:44"]); + expect(await pendingUpdateIds(tempDir)).toEqual([43]); + expect((await fs.readdir(tempDir)).toSorted()).toEqual(["0000000000000043.json"]); + stopWorker(); + }); + }); + + it("scans past active-lane backlogs to start unrelated lanes", async () => { + await withTempSpool(async (tempDir) => { + const abort = new AbortController(); + const events: string[] = []; + let releaseTopicTenTurn: (() => void) | undefined; + const topicTenTurnDone = new Promise((resolve) => { + releaseTopicTenTurn = resolve; + }); + await writeSpooledTestUpdates(tempDir, [topicUpdate(0, 10, "active topic 10 turn")]); + for (let updateId = 1; updateId <= 100; updateId += 1) { + await writeTelegramSpooledUpdate({ + spoolDir: tempDir, + update: topicUpdate(updateId, 10, `blocked topic 10 turn ${updateId}`), + }); + } + await writeTelegramSpooledUpdate({ + spoolDir: tempDir, + update: topicUpdate(101, 11, "topic 11 turn"), + }); + + const { runPromise, stopWorker } = startIsolatedIngressSession({ + abort, + spoolDir: tempDir, + handleUpdate: async (update) => { + if (update.update_id === 0) { + events.push("topic10:start"); + await topicTenTurnDone; + events.push("topic10:end"); + return; + } + if (update.update_id === 101) { + events.push("handled:101"); + abort.abort(); + } + }, + }); + + await vi.waitFor(() => expect(events).toEqual(["topic10:start", "handled:101"])); + releaseTopicTenTurn?.(); + await runPromise; + expect(events).toEqual(["topic10:start", "handled:101", "topic10:end"]); + releaseTopicTenTurn?.(); + stopWorker(); + }); + }); + it("lets isolated ingress drain interleave different Telegram topic lanes", async () => { const abort = new AbortController(); const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); @@ -618,7 +842,7 @@ describe("TelegramPollingSession", () => { await vi.waitFor(() => expect(events).toEqual(["topic10:start", "topic11"])); expect( (await listTelegramSpooledUpdates({ spoolDir: tempDir })).map((update) => update.updateId), - ).toEqual([42, 44]); + ).toEqual([44]); releaseTopicTenTurn?.(); await vi.waitFor(() => @@ -837,7 +1061,7 @@ describe("TelegramPollingSession", () => { await vi.waitFor(() => expect(events).toEqual(["regular:start", "status", "stop"])); expect( (await listTelegramSpooledUpdates({ spoolDir: tempDir })).map((update) => update.updateId), - ).toEqual([42]); + ).toEqual([]); releaseRegularTurn?.(); await vi.waitFor(async () => diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index 953f8a3aff5..5859da92961 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -19,9 +19,14 @@ import { TelegramPollingTransportState } from "./polling-transport-state.js"; import { TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS } from "./request-timeouts.js"; import { getTelegramSequentialKey } from "./sequential-key.js"; import { + claimTelegramSpooledUpdate, deleteTelegramSpooledUpdate, + listTelegramSpooledUpdateClaims, listTelegramSpooledUpdates, + recoverStaleTelegramSpooledUpdateClaims, + releaseTelegramSpooledUpdateClaim, resolveTelegramIngressSpoolDir, + type ClaimedTelegramSpooledUpdate, type TelegramSpooledUpdate, } from "./telegram-ingress-spool.js"; import { @@ -42,6 +47,7 @@ const MAX_POLL_STALL_THRESHOLD_MS = 600_000; const POLL_WATCHDOG_INTERVAL_MS = 30_000; const POLL_STOP_GRACE_MS = 15_000; const ISOLATED_INGRESS_BACKLOG_STALL_MS = 25 * 60_000; +const TELEGRAM_SPOOLED_DRAIN_START_LIMIT = 100; const TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS = Math.ceil( TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS / 1000, ); @@ -131,6 +137,10 @@ function buildSpooledUpdateHandlerKey(params: { spoolDir: string; laneKey: strin return `${params.spoolDir}\0${params.laneKey}`; } +function isSpooledUpdateHandlerKeyForSpool(handlerKey: string, spoolDir: string): boolean { + return handlerKey.startsWith(`${spoolDir}\0`); +} + export class TelegramPollingSession { #restartAttempts = 0; #webhookCleared = false; @@ -138,6 +148,7 @@ export class TelegramPollingSession { #activeRunner: ReturnType | undefined; #activeFetchAbort: AbortController | undefined; #spooledUpdateHandlerKeys = new Set(); + #recoveredRestartSpooledClaims = false; #transportState: TelegramPollingTransportState; #status: ReturnType; #stallThresholdMs: number; @@ -322,24 +333,62 @@ export class TelegramPollingSession { } } - async #handleSpooledUpdate(params: { + async #claimSpooledUpdate( + update: TelegramSpooledUpdate, + ): Promise { + try { + return await claimTelegramSpooledUpdate(update); + } catch (err) { + this.opts.log( + `[telegram][diag] spooled update ${update.updateId} claim failed; keeping for retry: ${formatErrorMessage(err)}`, + ); + return null; + } + } + + async #handleClaimedSpooledUpdate(params: { bot: TelegramBot; - update: TelegramSpooledUpdate; + update: ClaimedTelegramSpooledUpdate; }): Promise { try { await params.bot.handleUpdate( params.update.update as Parameters[0], ); + } catch (err) { + await this.#releaseFailedSpooledUpdate({ + err, + update: params.update, + }); + return false; + } + try { await deleteTelegramSpooledUpdate(params.update); return true; } catch (err) { this.opts.log( - `[telegram][diag] spooled update ${params.update.updateId} failed; keeping for retry: ${formatErrorMessage(err)}`, + `[telegram][diag] spooled update ${params.update.updateId} completed but processing marker cleanup failed: ${formatErrorMessage(err)}`, ); return false; } } + async #releaseFailedSpooledUpdate(params: { + err: unknown; + update: ClaimedTelegramSpooledUpdate; + }): Promise { + try { + await releaseTelegramSpooledUpdateClaim(params.update); + } catch (releaseErr) { + this.opts.log( + `[telegram][diag] spooled update ${params.update.updateId} failed and could not be requeued: ${formatErrorMessage(releaseErr)}`, + ); + return; + } + this.opts.log( + `[telegram][diag] spooled update ${params.update.updateId} failed; keeping for retry: ${formatErrorMessage(params.err)}`, + ); + } + async #waitForSpooledUpdateHandlers(): Promise { await Promise.allSettled( [...this.#spooledUpdateHandlerKeys] @@ -348,18 +397,49 @@ export class TelegramPollingSession { ); } + #spooledUpdateLaneKey(update: TelegramSpooledUpdate): string { + return getTelegramSequentialKey({ + update: update.update as Parameters[0]["update"], + ...(this.opts.botInfo ? { me: this.opts.botInfo } : {}), + }); + } + + #activeSpooledUpdateLaneKeysForSpool(spoolDir: string): Set { + const laneKeys = new Set(); + for (const [handlerKey, handler] of activeSpooledUpdateHandlersByLane) { + if (isSpooledUpdateHandlerKeyForSpool(handlerKey, spoolDir)) { + laneKeys.add(handler.laneKey); + } + } + return laneKeys; + } + async #drainSpooledUpdates(params: { bot: TelegramBot; spoolDir: string; }): Promise { - const updates = await listTelegramSpooledUpdates({ spoolDir: params.spoolDir, limit: 100 }); + const activeLaneKeys = this.#activeSpooledUpdateLaneKeysForSpool(params.spoolDir); + await recoverStaleTelegramSpooledUpdateClaims({ + spoolDir: params.spoolDir, + staleMs: this.#recoveredRestartSpooledClaims ? undefined : 0, + shouldRecover: (claim) => !activeLaneKeys.has(this.#spooledUpdateLaneKey(claim)), + }); + this.#recoveredRestartSpooledClaims = true; + const claimedLaneKeys = new Set( + ( + await listTelegramSpooledUpdateClaims({ + spoolDir: params.spoolDir, + }) + ).map((claim) => this.#spooledUpdateLaneKey(claim)), + ); + const updates = await listTelegramSpooledUpdates({ + spoolDir: params.spoolDir, + limit: "all", + }); const blockedByLane = new Set(); let started = 0; for (const update of updates) { - const laneKey = getTelegramSequentialKey({ - update: update.update as Parameters[0]["update"], - ...(this.opts.botInfo ? { me: this.opts.botInfo } : {}), - }); + const laneKey = this.#spooledUpdateLaneKey(update); if (this.opts.abortSignal?.aborted) { break; } @@ -368,9 +448,17 @@ export class TelegramPollingSession { blockedByLane.add(handlerKey); continue; } - const handler = this.#handleSpooledUpdate({ + if (claimedLaneKeys.has(laneKey)) { + continue; + } + const claimedUpdate = await this.#claimSpooledUpdate(update); + if (!claimedUpdate) { + claimedLaneKeys.add(laneKey); + continue; + } + const handler = this.#handleClaimedSpooledUpdate({ bot: params.bot, - update, + update: claimedUpdate, }); const state: SpooledUpdateHandlerState = { handlerKey, @@ -381,6 +469,7 @@ export class TelegramPollingSession { }; activeSpooledUpdateHandlersByLane.set(handlerKey, state); this.#spooledUpdateHandlerKeys.add(handlerKey); + claimedLaneKeys.add(laneKey); void handler.finally(() => { if (activeSpooledUpdateHandlersByLane.get(handlerKey) === state) { activeSpooledUpdateHandlersByLane.delete(handlerKey); @@ -388,6 +477,9 @@ export class TelegramPollingSession { this.#spooledUpdateHandlerKeys.delete(handlerKey); }); started += 1; + if (started >= TELEGRAM_SPOOLED_DRAIN_START_LIMIT) { + break; + } } return { blockedByLane, started }; } diff --git a/extensions/telegram/src/telegram-ingress-spool.test.ts b/extensions/telegram/src/telegram-ingress-spool.test.ts index 78ea3069536..4c58da3fc39 100644 --- a/extensions/telegram/src/telegram-ingress-spool.test.ts +++ b/extensions/telegram/src/telegram-ingress-spool.test.ts @@ -3,15 +3,28 @@ import os from "node:os"; import path from "node:path"; import { describe, expect, it } from "vitest"; import { + claimTelegramSpooledUpdate, deleteTelegramSpooledUpdate, + listTelegramSpooledUpdateClaims, listTelegramSpooledUpdates, + recoverStaleTelegramSpooledUpdateClaims, + releaseTelegramSpooledUpdateClaim, + TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS, writeTelegramSpooledUpdate, } from "./telegram-ingress-spool.js"; +async function withTempSpool(fn: (spoolDir: string) => Promise): Promise { + const spoolDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); + try { + return await fn(spoolDir); + } finally { + await fs.rm(spoolDir, { recursive: true, force: true }); + } +} + describe("Telegram ingress spool", () => { it("persists updates durably in update_id order and deletes handled entries", async () => { - const spoolDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); - try { + await withTempSpool(async (spoolDir) => { await writeTelegramSpooledUpdate({ spoolDir, update: { update_id: 11, message: { text: "second" } }, @@ -37,8 +50,105 @@ describe("Telegram ingress spool", () => { expect( (await listTelegramSpooledUpdates({ spoolDir })).map((update) => update.updateId), ).toEqual([11]); - } finally { - await fs.rm(spoolDir, { recursive: true, force: true }); - } + }); + }); + + it("claims active updates so they are hidden from pending drain lists", async () => { + await withTempSpool(async (spoolDir) => { + await writeTelegramSpooledUpdate({ + spoolDir, + update: { update_id: 20, message: { text: "active" } }, + }); + const update = (await listTelegramSpooledUpdates({ spoolDir }))[0]; + if (!update) { + throw new Error("Expected a spooled update"); + } + + const claimed = await claimTelegramSpooledUpdate(update); + + expect(claimed?.updateId).toBe(20); + expect(claimed?.path.endsWith(".json.processing")).toBe(true); + expect(await listTelegramSpooledUpdates({ spoolDir })).toEqual([]); + expect( + (await listTelegramSpooledUpdateClaims({ spoolDir })).map((claim) => claim.updateId), + ).toEqual([20]); + + await writeTelegramSpooledUpdate({ + spoolDir, + update: { update_id: 20, message: { text: "duplicate" } }, + }); + expect(await listTelegramSpooledUpdates({ spoolDir })).toEqual([]); + + if (!claimed) { + throw new Error("Expected a claimed update"); + } + await fs.writeFile(claimed.pendingPath, "duplicate pending race", { mode: 0o600 }); + await deleteTelegramSpooledUpdate(claimed); + expect(await fs.readdir(spoolDir)).toEqual([]); + }); + }); + + it("releases failed claims back to the pending spool", async () => { + await withTempSpool(async (spoolDir) => { + await writeTelegramSpooledUpdate({ + spoolDir, + update: { update_id: 30, message: { text: "retry me" } }, + }); + const update = (await listTelegramSpooledUpdates({ spoolDir }))[0]; + if (!update) { + throw new Error("Expected a spooled update"); + } + const claimed = await claimTelegramSpooledUpdate(update); + if (!claimed) { + throw new Error("Expected a claimed update"); + } + + await releaseTelegramSpooledUpdateClaim(claimed); + + const updates = await listTelegramSpooledUpdates({ spoolDir }); + expect(updates.map((entry) => entry.updateId)).toEqual([30]); + expect(updates[0]?.path.endsWith(".json")).toBe(true); + }); + }); + + it("recovers stale processing claims without replaying fresh claims", async () => { + await withTempSpool(async (spoolDir) => { + await writeTelegramSpooledUpdate({ + spoolDir, + update: { update_id: 40, message: { text: "fresh" } }, + }); + await writeTelegramSpooledUpdate({ + spoolDir, + update: { update_id: 41, message: { text: "stale" } }, + }); + const updates = await listTelegramSpooledUpdates({ spoolDir }); + const fresh = updates.find((update) => update.updateId === 40); + const stale = updates.find((update) => update.updateId === 41); + if (!fresh || !stale) { + throw new Error("Expected spooled updates"); + } + const claimedFresh = await claimTelegramSpooledUpdate(fresh); + const claimedStale = await claimTelegramSpooledUpdate(stale); + if (!claimedFresh || !claimedStale) { + throw new Error("Expected claimed updates"); + } + const now = Date.now(); + const oldClaimTime = new Date(now - TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS - 1); + await fs.utimes(claimedStale.path, oldClaimTime, oldClaimTime); + + const recovered = await recoverStaleTelegramSpooledUpdateClaims({ + spoolDir, + now, + }); + + expect(recovered).toBe(1); + expect( + (await listTelegramSpooledUpdates({ spoolDir })).map((update) => update.updateId), + ).toEqual([41]); + expect((await fs.readdir(spoolDir)).toSorted()).toEqual([ + "0000000000000040.json.processing", + "0000000000000041.json", + ]); + }); }); }); diff --git a/extensions/telegram/src/telegram-ingress-spool.ts b/extensions/telegram/src/telegram-ingress-spool.ts index 5fb623d08bd..0a0d72ff809 100644 --- a/extensions/telegram/src/telegram-ingress-spool.ts +++ b/extensions/telegram/src/telegram-ingress-spool.ts @@ -6,6 +6,7 @@ import { readJsonFileWithFallback } from "openclaw/plugin-sdk/json-store"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; const SPOOL_VERSION = 1; +export const TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS = 6 * 60 * 60 * 1000; type TelegramSpooledUpdatePayload = { version: number; @@ -21,6 +22,10 @@ export type TelegramSpooledUpdate = { receivedAt: number; }; +export type ClaimedTelegramSpooledUpdate = TelegramSpooledUpdate & { + pendingPath: string; +}; + function normalizeAccountId(accountId?: string) { const trimmed = accountId?.trim(); if (!trimmed) { @@ -53,6 +58,45 @@ function spoolFileName(updateId: number): string { return `${String(updateId).padStart(16, "0")}.json`; } +function processingFileName(updateId: number): string { + return `${spoolFileName(updateId)}.processing`; +} + +function isProcessingFileName(fileName: string): boolean { + return fileName.endsWith(".json.processing"); +} + +function pendingFileNameFromProcessing(fileName: string): string { + return fileName.slice(0, -".processing".length); +} + +function processingPath(spoolDir: string, updateId: number): string { + return path.join(spoolDir, processingFileName(updateId)); +} + +async function pathExists(filePath: string): Promise { + try { + await fs.access(filePath); + return true; + } catch (err) { + if ((err as { code?: string }).code === "ENOENT") { + return false; + } + throw err; + } +} + +async function unlinkIfPresent(filePath: string): Promise { + try { + await fs.unlink(filePath); + } catch (err) { + if ((err as { code?: string }).code === "ENOENT") { + return; + } + throw err; + } +} + function parseSpooledUpdate(value: unknown, filePath: string): TelegramSpooledUpdate | null { if (!value || typeof value !== "object") { return null; @@ -80,6 +124,10 @@ export async function writeTelegramSpooledUpdate(params: { } await fs.mkdir(params.spoolDir, { recursive: true }); const targetPath = path.join(params.spoolDir, spoolFileName(updateId)); + const claimedPath = processingPath(params.spoolDir, updateId); + if (await pathExists(claimedPath)) { + return updateId; + } const tempPath = path.join(params.spoolDir, `${spoolFileName(updateId)}.${randomUUID()}.tmp`); const payload: TelegramSpooledUpdatePayload = { version: SPOOL_VERSION, @@ -88,13 +136,17 @@ export async function writeTelegramSpooledUpdate(params: { update: params.update, }; await fs.writeFile(tempPath, `${JSON.stringify(payload)}\n`, { mode: 0o600 }); + if (await pathExists(claimedPath)) { + await unlinkIfPresent(tempPath); + return updateId; + } await fs.rename(tempPath, targetPath); return updateId; } export async function listTelegramSpooledUpdates(params: { spoolDir: string; - limit?: number; + limit?: number | "all"; }): Promise { let entries: string[]; try { @@ -105,12 +157,11 @@ export async function listTelegramSpooledUpdates(params: { } throw err; } - const files = entries - .filter((entry) => entry.endsWith(".json")) - .toSorted() - .slice(0, Math.max(1, params.limit ?? 100)); + const files = entries.filter((entry) => entry.endsWith(".json")).toSorted(); + const limitedFiles = + params.limit === "all" ? files : files.slice(0, Math.max(1, params.limit ?? 100)); const updates: TelegramSpooledUpdate[] = []; - for (const file of files) { + for (const file of limitedFiles) { const filePath = path.join(params.spoolDir, file); const { value } = await readJsonFileWithFallback(filePath, null); const parsed = parseSpooledUpdate(value, filePath); @@ -122,12 +173,145 @@ export async function listTelegramSpooledUpdates(params: { } export async function deleteTelegramSpooledUpdate(update: TelegramSpooledUpdate): Promise { + await unlinkIfPresent(update.path); + if ("pendingPath" in update && typeof update.pendingPath === "string") { + await unlinkIfPresent(update.pendingPath); + } +} + +export async function claimTelegramSpooledUpdate( + update: TelegramSpooledUpdate, +): Promise { + const claimedPath = processingPath(path.dirname(update.path), update.updateId); try { - await fs.unlink(update.path); + // A hard link is an atomic non-overwriting claim in the same spool directory. + await fs.link(update.path, claimedPath); } catch (err) { - if ((err as { code?: string }).code === "ENOENT") { + const code = (err as { code?: string }).code; + if (code === "ENOENT") { + return null; + } + if (code === "EEXIST") { + await unlinkIfPresent(update.path); + return null; + } + throw err; + } + try { + const claimedAt = new Date(); + await fs.utimes(claimedPath, claimedAt, claimedAt); + await unlinkIfPresent(update.path); + } catch (err) { + await unlinkIfPresent(claimedPath); + throw err; + } + return { + ...update, + path: claimedPath, + pendingPath: update.path, + }; +} + +export async function releaseTelegramSpooledUpdateClaim( + update: ClaimedTelegramSpooledUpdate, +): Promise { + try { + await fs.rename(update.path, update.pendingPath); + } catch (err) { + const code = (err as { code?: string }).code; + if (code === "ENOENT") { + return; + } + if (code === "EEXIST") { + await unlinkIfPresent(update.path); return; } throw err; } } + +export async function listTelegramSpooledUpdateClaims(params: { + spoolDir: string; +}): Promise { + let entries: string[]; + try { + entries = await fs.readdir(params.spoolDir); + } catch (err) { + if ((err as { code?: string }).code === "ENOENT") { + return []; + } + throw err; + } + const claims: ClaimedTelegramSpooledUpdate[] = []; + for (const file of entries.filter(isProcessingFileName).toSorted()) { + const filePath = path.join(params.spoolDir, file); + const { value } = await readJsonFileWithFallback(filePath, null); + const parsed = parseSpooledUpdate(value, filePath); + if (parsed) { + claims.push({ + ...parsed, + pendingPath: path.join(params.spoolDir, pendingFileNameFromProcessing(file)), + }); + } + } + return claims; +} + +export async function recoverStaleTelegramSpooledUpdateClaims(params: { + spoolDir: string; + staleMs?: number; + now?: number; + shouldRecover?: (claim: ClaimedTelegramSpooledUpdate) => boolean | Promise; +}): Promise { + let entries: string[]; + try { + entries = await fs.readdir(params.spoolDir); + } catch (err) { + if ((err as { code?: string }).code === "ENOENT") { + return 0; + } + throw err; + } + const staleMs = Math.max( + 0, + Math.floor(params.staleMs ?? TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS), + ); + const now = params.now ?? Date.now(); + let recovered = 0; + for (const entry of entries.filter(isProcessingFileName).toSorted()) { + const claimedPath = path.join(params.spoolDir, entry); + let stat; + try { + stat = await fs.stat(claimedPath); + } catch (err) { + if ((err as { code?: string }).code === "ENOENT") { + continue; + } + throw err; + } + if (now - stat.mtimeMs < staleMs) { + continue; + } + const pendingPath = path.join(params.spoolDir, pendingFileNameFromProcessing(entry)); + if (params.shouldRecover) { + const { value } = await readJsonFileWithFallback(claimedPath, null); + const parsed = parseSpooledUpdate(value, claimedPath); + if ( + parsed && + !(await params.shouldRecover({ + ...parsed, + pendingPath, + })) + ) { + continue; + } + } + if (await pathExists(pendingPath)) { + await unlinkIfPresent(claimedPath); + } else { + await fs.rename(claimedPath, pendingPath); + } + recovered += 1; + } + return recovered; +}