fix(telegram): recover restart spool claims

This commit is contained in:
VACInc
2026-05-15 17:22:00 -04:00
committed by Ayaan Zaidi
parent c5b3352326
commit 59d2f88e41
4 changed files with 636 additions and 26 deletions

View File

@@ -51,6 +51,7 @@ vi.mock("openclaw/plugin-sdk/runtime-env", () => ({
}));
let TelegramPollingSession: typeof import("./polling-session.js").TelegramPollingSession;
let claimTelegramSpooledUpdate: typeof import("./telegram-ingress-spool.js").claimTelegramSpooledUpdate;
let listTelegramSpooledUpdates: typeof import("./telegram-ingress-spool.js").listTelegramSpooledUpdates;
let writeTelegramSpooledUpdate: typeof import("./telegram-ingress-spool.js").writeTelegramSpooledUpdate;
@@ -310,10 +311,111 @@ async function waitForApiMiddleware(
throw new Error("Telegram API middleware was not installed");
}
type TestTelegramUpdate = {
update_id: number;
message: {
text: string;
chat: { id: number; type: "supergroup" };
message_thread_id?: number;
is_topic_message?: boolean;
};
};
function topicUpdate(updateId: number, threadId: number, text: string): TestTelegramUpdate {
return {
update_id: updateId,
message: {
text,
message_thread_id: threadId,
is_topic_message: true,
chat: { id: -100, type: "supergroup" },
},
};
}
async function writeSpooledTestUpdates(
spoolDir: string,
updates: readonly TestTelegramUpdate[],
): Promise<void> {
for (const update of updates) {
await writeTelegramSpooledUpdate({ spoolDir, update });
}
}
async function pendingUpdateIds(spoolDir: string, limit: number | "all" = 100): Promise<number[]> {
return (await listTelegramSpooledUpdates({ spoolDir, limit })).map((update) => update.updateId);
}
async function withTempSpool<T>(fn: (spoolDir: string) => Promise<T>): Promise<T> {
const spoolDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
try {
return await fn(spoolDir);
} finally {
await fs.rm(spoolDir, { recursive: true, force: true });
}
}
function createIdleIngressWorker() {
let stopWorker: (() => void) | undefined;
const workerDone = new Promise<void>((resolve) => {
stopWorker = resolve;
});
const createWorker = vi.fn(() => ({
onMessage: vi.fn(() => () => undefined),
stop: vi.fn(async () => {
stopWorker?.();
}),
task: vi.fn(async () => {
await workerDone;
}),
}));
return {
createWorker,
stop: () => stopWorker?.(),
};
}
function startIsolatedIngressSession(params: {
abort: AbortController;
spoolDir: string;
handleUpdate: (update: { update_id?: number }) => Promise<void>;
drainIntervalMs?: number;
log?: (message: string) => void;
stop?: () => Promise<void>;
}) {
const worker = createIdleIngressWorker();
const bot = {
api: {
deleteWebhook: vi.fn(async () => true),
config: { use: vi.fn() },
},
init: vi.fn(async () => undefined),
handleUpdate: vi.fn(params.handleUpdate),
stop: vi.fn(params.stop ?? (async () => undefined)),
};
createTelegramBotMock.mockReturnValueOnce(bot);
const session = createPollingSession({
abortSignal: params.abort.signal,
log: params.log,
isolatedIngress: {
enabled: true,
spoolDir: params.spoolDir,
createWorker: worker.createWorker,
drainIntervalMs: params.drainIntervalMs ?? 10,
},
});
return {
bot,
createWorker: worker.createWorker,
runPromise: session.runUntilAbort(),
stopWorker: worker.stop,
};
}
describe("TelegramPollingSession", () => {
beforeAll(async () => {
({ TelegramPollingSession } = await import("./polling-session.js"));
({ listTelegramSpooledUpdates, writeTelegramSpooledUpdate } =
({ claimTelegramSpooledUpdate, listTelegramSpooledUpdates, writeTelegramSpooledUpdate } =
await import("./telegram-ingress-spool.js"));
});
@@ -538,6 +640,128 @@ describe("TelegramPollingSession", () => {
await runPromise;
});
it("keeps failed lanes blocked for the rest of the drain pass", async () => {
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
const log = vi.fn();
const events: string[] = [];
await writeSpooledTestUpdates(tempDir, [
topicUpdate(42, 10, "first topic 10 turn"),
topicUpdate(43, 11, "topic 11 turn"),
topicUpdate(44, 10, "second topic 10 turn"),
]);
const { runPromise, stopWorker } = startIsolatedIngressSession({
abort,
spoolDir: tempDir,
log,
drainIntervalMs: 500,
handleUpdate: async (update) => {
if (update.update_id === 42) {
events.push("topic10:first");
throw new Error("handler boom");
}
if (update.update_id === 43) {
events.push("topic11");
return;
}
if (update.update_id === 44) {
events.push("topic10:second");
}
},
});
await vi.waitFor(() => expect(events).toEqual(["topic10:first", "topic11"]));
expect(await pendingUpdateIds(tempDir, "all")).toEqual([42, 44]);
expectLogIncludes(log, "spooled update 42 failed; keeping for retry");
abort.abort();
stopWorker();
await runPromise;
});
});
it("recovers restart processing claims before draining later same-lane updates", async () => {
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
const events: string[] = [];
await writeSpooledTestUpdates(tempDir, [
topicUpdate(42, 10, "interrupted topic 10 turn"),
topicUpdate(43, 10, "later topic 10 turn"),
topicUpdate(44, 11, "topic 11 turn"),
]);
const interrupted = (await listTelegramSpooledUpdates({ spoolDir: tempDir })).find(
(update) => update.updateId === 42,
);
if (!interrupted) {
throw new Error("Expected interrupted update");
}
await claimTelegramSpooledUpdate(interrupted);
const { runPromise, stopWorker } = startIsolatedIngressSession({
abort,
spoolDir: tempDir,
handleUpdate: async (update) => {
events.push(`handled:${update.update_id}`);
if (update.update_id === 44) {
abort.abort();
}
},
});
await runPromise;
expect(events).toEqual(["handled:42", "handled:44"]);
expect(await pendingUpdateIds(tempDir)).toEqual([43]);
expect((await fs.readdir(tempDir)).toSorted()).toEqual(["0000000000000043.json"]);
stopWorker();
});
});
it("scans past active-lane backlogs to start unrelated lanes", async () => {
await withTempSpool(async (tempDir) => {
const abort = new AbortController();
const events: string[] = [];
let releaseTopicTenTurn: (() => void) | undefined;
const topicTenTurnDone = new Promise<void>((resolve) => {
releaseTopicTenTurn = resolve;
});
await writeSpooledTestUpdates(tempDir, [topicUpdate(0, 10, "active topic 10 turn")]);
for (let updateId = 1; updateId <= 100; updateId += 1) {
await writeTelegramSpooledUpdate({
spoolDir: tempDir,
update: topicUpdate(updateId, 10, `blocked topic 10 turn ${updateId}`),
});
}
await writeTelegramSpooledUpdate({
spoolDir: tempDir,
update: topicUpdate(101, 11, "topic 11 turn"),
});
const { runPromise, stopWorker } = startIsolatedIngressSession({
abort,
spoolDir: tempDir,
handleUpdate: async (update) => {
if (update.update_id === 0) {
events.push("topic10:start");
await topicTenTurnDone;
events.push("topic10:end");
return;
}
if (update.update_id === 101) {
events.push("handled:101");
abort.abort();
}
},
});
await vi.waitFor(() => expect(events).toEqual(["topic10:start", "handled:101"]));
releaseTopicTenTurn?.();
await runPromise;
expect(events).toEqual(["topic10:start", "handled:101", "topic10:end"]);
releaseTopicTenTurn?.();
stopWorker();
});
});
it("lets isolated ingress drain interleave different Telegram topic lanes", async () => {
const abort = new AbortController();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
@@ -618,7 +842,7 @@ describe("TelegramPollingSession", () => {
await vi.waitFor(() => expect(events).toEqual(["topic10:start", "topic11"]));
expect(
(await listTelegramSpooledUpdates({ spoolDir: tempDir })).map((update) => update.updateId),
).toEqual([42, 44]);
).toEqual([44]);
releaseTopicTenTurn?.();
await vi.waitFor(() =>
@@ -837,7 +1061,7 @@ describe("TelegramPollingSession", () => {
await vi.waitFor(() => expect(events).toEqual(["regular:start", "status", "stop"]));
expect(
(await listTelegramSpooledUpdates({ spoolDir: tempDir })).map((update) => update.updateId),
).toEqual([42]);
).toEqual([]);
releaseRegularTurn?.();
await vi.waitFor(async () =>

View File

@@ -19,9 +19,14 @@ import { TelegramPollingTransportState } from "./polling-transport-state.js";
import { TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS } from "./request-timeouts.js";
import { getTelegramSequentialKey } from "./sequential-key.js";
import {
claimTelegramSpooledUpdate,
deleteTelegramSpooledUpdate,
listTelegramSpooledUpdateClaims,
listTelegramSpooledUpdates,
recoverStaleTelegramSpooledUpdateClaims,
releaseTelegramSpooledUpdateClaim,
resolveTelegramIngressSpoolDir,
type ClaimedTelegramSpooledUpdate,
type TelegramSpooledUpdate,
} from "./telegram-ingress-spool.js";
import {
@@ -42,6 +47,7 @@ const MAX_POLL_STALL_THRESHOLD_MS = 600_000;
const POLL_WATCHDOG_INTERVAL_MS = 30_000;
const POLL_STOP_GRACE_MS = 15_000;
const ISOLATED_INGRESS_BACKLOG_STALL_MS = 25 * 60_000;
const TELEGRAM_SPOOLED_DRAIN_START_LIMIT = 100;
const TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS = Math.ceil(
TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS / 1000,
);
@@ -131,6 +137,10 @@ function buildSpooledUpdateHandlerKey(params: { spoolDir: string; laneKey: strin
return `${params.spoolDir}\0${params.laneKey}`;
}
function isSpooledUpdateHandlerKeyForSpool(handlerKey: string, spoolDir: string): boolean {
return handlerKey.startsWith(`${spoolDir}\0`);
}
export class TelegramPollingSession {
#restartAttempts = 0;
#webhookCleared = false;
@@ -138,6 +148,7 @@ export class TelegramPollingSession {
#activeRunner: ReturnType<typeof run> | undefined;
#activeFetchAbort: AbortController | undefined;
#spooledUpdateHandlerKeys = new Set<string>();
#recoveredRestartSpooledClaims = false;
#transportState: TelegramPollingTransportState;
#status: ReturnType<typeof createTelegramPollingStatusPublisher>;
#stallThresholdMs: number;
@@ -322,24 +333,62 @@ export class TelegramPollingSession {
}
}
async #handleSpooledUpdate(params: {
async #claimSpooledUpdate(
update: TelegramSpooledUpdate,
): Promise<ClaimedTelegramSpooledUpdate | null> {
try {
return await claimTelegramSpooledUpdate(update);
} catch (err) {
this.opts.log(
`[telegram][diag] spooled update ${update.updateId} claim failed; keeping for retry: ${formatErrorMessage(err)}`,
);
return null;
}
}
async #handleClaimedSpooledUpdate(params: {
bot: TelegramBot;
update: TelegramSpooledUpdate;
update: ClaimedTelegramSpooledUpdate;
}): Promise<boolean> {
try {
await params.bot.handleUpdate(
params.update.update as Parameters<typeof params.bot.handleUpdate>[0],
);
} catch (err) {
await this.#releaseFailedSpooledUpdate({
err,
update: params.update,
});
return false;
}
try {
await deleteTelegramSpooledUpdate(params.update);
return true;
} catch (err) {
this.opts.log(
`[telegram][diag] spooled update ${params.update.updateId} failed; keeping for retry: ${formatErrorMessage(err)}`,
`[telegram][diag] spooled update ${params.update.updateId} completed but processing marker cleanup failed: ${formatErrorMessage(err)}`,
);
return false;
}
}
async #releaseFailedSpooledUpdate(params: {
err: unknown;
update: ClaimedTelegramSpooledUpdate;
}): Promise<void> {
try {
await releaseTelegramSpooledUpdateClaim(params.update);
} catch (releaseErr) {
this.opts.log(
`[telegram][diag] spooled update ${params.update.updateId} failed and could not be requeued: ${formatErrorMessage(releaseErr)}`,
);
return;
}
this.opts.log(
`[telegram][diag] spooled update ${params.update.updateId} failed; keeping for retry: ${formatErrorMessage(params.err)}`,
);
}
async #waitForSpooledUpdateHandlers(): Promise<void> {
await Promise.allSettled(
[...this.#spooledUpdateHandlerKeys]
@@ -348,18 +397,49 @@ export class TelegramPollingSession {
);
}
#spooledUpdateLaneKey(update: TelegramSpooledUpdate): string {
return getTelegramSequentialKey({
update: update.update as Parameters<typeof getTelegramSequentialKey>[0]["update"],
...(this.opts.botInfo ? { me: this.opts.botInfo } : {}),
});
}
#activeSpooledUpdateLaneKeysForSpool(spoolDir: string): Set<string> {
const laneKeys = new Set<string>();
for (const [handlerKey, handler] of activeSpooledUpdateHandlersByLane) {
if (isSpooledUpdateHandlerKeyForSpool(handlerKey, spoolDir)) {
laneKeys.add(handler.laneKey);
}
}
return laneKeys;
}
async #drainSpooledUpdates(params: {
bot: TelegramBot;
spoolDir: string;
}): Promise<SpooledUpdateDrainResult> {
const updates = await listTelegramSpooledUpdates({ spoolDir: params.spoolDir, limit: 100 });
const activeLaneKeys = this.#activeSpooledUpdateLaneKeysForSpool(params.spoolDir);
await recoverStaleTelegramSpooledUpdateClaims({
spoolDir: params.spoolDir,
staleMs: this.#recoveredRestartSpooledClaims ? undefined : 0,
shouldRecover: (claim) => !activeLaneKeys.has(this.#spooledUpdateLaneKey(claim)),
});
this.#recoveredRestartSpooledClaims = true;
const claimedLaneKeys = new Set(
(
await listTelegramSpooledUpdateClaims({
spoolDir: params.spoolDir,
})
).map((claim) => this.#spooledUpdateLaneKey(claim)),
);
const updates = await listTelegramSpooledUpdates({
spoolDir: params.spoolDir,
limit: "all",
});
const blockedByLane = new Set<string>();
let started = 0;
for (const update of updates) {
const laneKey = getTelegramSequentialKey({
update: update.update as Parameters<typeof getTelegramSequentialKey>[0]["update"],
...(this.opts.botInfo ? { me: this.opts.botInfo } : {}),
});
const laneKey = this.#spooledUpdateLaneKey(update);
if (this.opts.abortSignal?.aborted) {
break;
}
@@ -368,9 +448,17 @@ export class TelegramPollingSession {
blockedByLane.add(handlerKey);
continue;
}
const handler = this.#handleSpooledUpdate({
if (claimedLaneKeys.has(laneKey)) {
continue;
}
const claimedUpdate = await this.#claimSpooledUpdate(update);
if (!claimedUpdate) {
claimedLaneKeys.add(laneKey);
continue;
}
const handler = this.#handleClaimedSpooledUpdate({
bot: params.bot,
update,
update: claimedUpdate,
});
const state: SpooledUpdateHandlerState = {
handlerKey,
@@ -381,6 +469,7 @@ export class TelegramPollingSession {
};
activeSpooledUpdateHandlersByLane.set(handlerKey, state);
this.#spooledUpdateHandlerKeys.add(handlerKey);
claimedLaneKeys.add(laneKey);
void handler.finally(() => {
if (activeSpooledUpdateHandlersByLane.get(handlerKey) === state) {
activeSpooledUpdateHandlersByLane.delete(handlerKey);
@@ -388,6 +477,9 @@ export class TelegramPollingSession {
this.#spooledUpdateHandlerKeys.delete(handlerKey);
});
started += 1;
if (started >= TELEGRAM_SPOOLED_DRAIN_START_LIMIT) {
break;
}
}
return { blockedByLane, started };
}

View File

@@ -3,15 +3,28 @@ import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import {
claimTelegramSpooledUpdate,
deleteTelegramSpooledUpdate,
listTelegramSpooledUpdateClaims,
listTelegramSpooledUpdates,
recoverStaleTelegramSpooledUpdateClaims,
releaseTelegramSpooledUpdateClaim,
TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS,
writeTelegramSpooledUpdate,
} from "./telegram-ingress-spool.js";
async function withTempSpool<T>(fn: (spoolDir: string) => Promise<T>): Promise<T> {
const spoolDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
try {
return await fn(spoolDir);
} finally {
await fs.rm(spoolDir, { recursive: true, force: true });
}
}
describe("Telegram ingress spool", () => {
it("persists updates durably in update_id order and deletes handled entries", async () => {
const spoolDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
try {
await withTempSpool(async (spoolDir) => {
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 11, message: { text: "second" } },
@@ -37,8 +50,105 @@ describe("Telegram ingress spool", () => {
expect(
(await listTelegramSpooledUpdates({ spoolDir })).map((update) => update.updateId),
).toEqual([11]);
} finally {
await fs.rm(spoolDir, { recursive: true, force: true });
}
});
});
it("claims active updates so they are hidden from pending drain lists", async () => {
await withTempSpool(async (spoolDir) => {
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 20, message: { text: "active" } },
});
const update = (await listTelegramSpooledUpdates({ spoolDir }))[0];
if (!update) {
throw new Error("Expected a spooled update");
}
const claimed = await claimTelegramSpooledUpdate(update);
expect(claimed?.updateId).toBe(20);
expect(claimed?.path.endsWith(".json.processing")).toBe(true);
expect(await listTelegramSpooledUpdates({ spoolDir })).toEqual([]);
expect(
(await listTelegramSpooledUpdateClaims({ spoolDir })).map((claim) => claim.updateId),
).toEqual([20]);
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 20, message: { text: "duplicate" } },
});
expect(await listTelegramSpooledUpdates({ spoolDir })).toEqual([]);
if (!claimed) {
throw new Error("Expected a claimed update");
}
await fs.writeFile(claimed.pendingPath, "duplicate pending race", { mode: 0o600 });
await deleteTelegramSpooledUpdate(claimed);
expect(await fs.readdir(spoolDir)).toEqual([]);
});
});
it("releases failed claims back to the pending spool", async () => {
await withTempSpool(async (spoolDir) => {
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 30, message: { text: "retry me" } },
});
const update = (await listTelegramSpooledUpdates({ spoolDir }))[0];
if (!update) {
throw new Error("Expected a spooled update");
}
const claimed = await claimTelegramSpooledUpdate(update);
if (!claimed) {
throw new Error("Expected a claimed update");
}
await releaseTelegramSpooledUpdateClaim(claimed);
const updates = await listTelegramSpooledUpdates({ spoolDir });
expect(updates.map((entry) => entry.updateId)).toEqual([30]);
expect(updates[0]?.path.endsWith(".json")).toBe(true);
});
});
it("recovers stale processing claims without replaying fresh claims", async () => {
await withTempSpool(async (spoolDir) => {
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 40, message: { text: "fresh" } },
});
await writeTelegramSpooledUpdate({
spoolDir,
update: { update_id: 41, message: { text: "stale" } },
});
const updates = await listTelegramSpooledUpdates({ spoolDir });
const fresh = updates.find((update) => update.updateId === 40);
const stale = updates.find((update) => update.updateId === 41);
if (!fresh || !stale) {
throw new Error("Expected spooled updates");
}
const claimedFresh = await claimTelegramSpooledUpdate(fresh);
const claimedStale = await claimTelegramSpooledUpdate(stale);
if (!claimedFresh || !claimedStale) {
throw new Error("Expected claimed updates");
}
const now = Date.now();
const oldClaimTime = new Date(now - TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS - 1);
await fs.utimes(claimedStale.path, oldClaimTime, oldClaimTime);
const recovered = await recoverStaleTelegramSpooledUpdateClaims({
spoolDir,
now,
});
expect(recovered).toBe(1);
expect(
(await listTelegramSpooledUpdates({ spoolDir })).map((update) => update.updateId),
).toEqual([41]);
expect((await fs.readdir(spoolDir)).toSorted()).toEqual([
"0000000000000040.json.processing",
"0000000000000041.json",
]);
});
});
});

View File

@@ -6,6 +6,7 @@ import { readJsonFileWithFallback } from "openclaw/plugin-sdk/json-store";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
const SPOOL_VERSION = 1;
export const TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS = 6 * 60 * 60 * 1000;
type TelegramSpooledUpdatePayload = {
version: number;
@@ -21,6 +22,10 @@ export type TelegramSpooledUpdate = {
receivedAt: number;
};
export type ClaimedTelegramSpooledUpdate = TelegramSpooledUpdate & {
pendingPath: string;
};
function normalizeAccountId(accountId?: string) {
const trimmed = accountId?.trim();
if (!trimmed) {
@@ -53,6 +58,45 @@ function spoolFileName(updateId: number): string {
return `${String(updateId).padStart(16, "0")}.json`;
}
function processingFileName(updateId: number): string {
return `${spoolFileName(updateId)}.processing`;
}
function isProcessingFileName(fileName: string): boolean {
return fileName.endsWith(".json.processing");
}
function pendingFileNameFromProcessing(fileName: string): string {
return fileName.slice(0, -".processing".length);
}
function processingPath(spoolDir: string, updateId: number): string {
return path.join(spoolDir, processingFileName(updateId));
}
async function pathExists(filePath: string): Promise<boolean> {
try {
await fs.access(filePath);
return true;
} catch (err) {
if ((err as { code?: string }).code === "ENOENT") {
return false;
}
throw err;
}
}
async function unlinkIfPresent(filePath: string): Promise<void> {
try {
await fs.unlink(filePath);
} catch (err) {
if ((err as { code?: string }).code === "ENOENT") {
return;
}
throw err;
}
}
function parseSpooledUpdate(value: unknown, filePath: string): TelegramSpooledUpdate | null {
if (!value || typeof value !== "object") {
return null;
@@ -80,6 +124,10 @@ export async function writeTelegramSpooledUpdate(params: {
}
await fs.mkdir(params.spoolDir, { recursive: true });
const targetPath = path.join(params.spoolDir, spoolFileName(updateId));
const claimedPath = processingPath(params.spoolDir, updateId);
if (await pathExists(claimedPath)) {
return updateId;
}
const tempPath = path.join(params.spoolDir, `${spoolFileName(updateId)}.${randomUUID()}.tmp`);
const payload: TelegramSpooledUpdatePayload = {
version: SPOOL_VERSION,
@@ -88,13 +136,17 @@ export async function writeTelegramSpooledUpdate(params: {
update: params.update,
};
await fs.writeFile(tempPath, `${JSON.stringify(payload)}\n`, { mode: 0o600 });
if (await pathExists(claimedPath)) {
await unlinkIfPresent(tempPath);
return updateId;
}
await fs.rename(tempPath, targetPath);
return updateId;
}
export async function listTelegramSpooledUpdates(params: {
spoolDir: string;
limit?: number;
limit?: number | "all";
}): Promise<TelegramSpooledUpdate[]> {
let entries: string[];
try {
@@ -105,12 +157,11 @@ export async function listTelegramSpooledUpdates(params: {
}
throw err;
}
const files = entries
.filter((entry) => entry.endsWith(".json"))
.toSorted()
.slice(0, Math.max(1, params.limit ?? 100));
const files = entries.filter((entry) => entry.endsWith(".json")).toSorted();
const limitedFiles =
params.limit === "all" ? files : files.slice(0, Math.max(1, params.limit ?? 100));
const updates: TelegramSpooledUpdate[] = [];
for (const file of files) {
for (const file of limitedFiles) {
const filePath = path.join(params.spoolDir, file);
const { value } = await readJsonFileWithFallback<unknown>(filePath, null);
const parsed = parseSpooledUpdate(value, filePath);
@@ -122,12 +173,145 @@ export async function listTelegramSpooledUpdates(params: {
}
export async function deleteTelegramSpooledUpdate(update: TelegramSpooledUpdate): Promise<void> {
await unlinkIfPresent(update.path);
if ("pendingPath" in update && typeof update.pendingPath === "string") {
await unlinkIfPresent(update.pendingPath);
}
}
export async function claimTelegramSpooledUpdate(
update: TelegramSpooledUpdate,
): Promise<ClaimedTelegramSpooledUpdate | null> {
const claimedPath = processingPath(path.dirname(update.path), update.updateId);
try {
await fs.unlink(update.path);
// A hard link is an atomic non-overwriting claim in the same spool directory.
await fs.link(update.path, claimedPath);
} catch (err) {
if ((err as { code?: string }).code === "ENOENT") {
const code = (err as { code?: string }).code;
if (code === "ENOENT") {
return null;
}
if (code === "EEXIST") {
await unlinkIfPresent(update.path);
return null;
}
throw err;
}
try {
const claimedAt = new Date();
await fs.utimes(claimedPath, claimedAt, claimedAt);
await unlinkIfPresent(update.path);
} catch (err) {
await unlinkIfPresent(claimedPath);
throw err;
}
return {
...update,
path: claimedPath,
pendingPath: update.path,
};
}
export async function releaseTelegramSpooledUpdateClaim(
update: ClaimedTelegramSpooledUpdate,
): Promise<void> {
try {
await fs.rename(update.path, update.pendingPath);
} catch (err) {
const code = (err as { code?: string }).code;
if (code === "ENOENT") {
return;
}
if (code === "EEXIST") {
await unlinkIfPresent(update.path);
return;
}
throw err;
}
}
export async function listTelegramSpooledUpdateClaims(params: {
spoolDir: string;
}): Promise<ClaimedTelegramSpooledUpdate[]> {
let entries: string[];
try {
entries = await fs.readdir(params.spoolDir);
} catch (err) {
if ((err as { code?: string }).code === "ENOENT") {
return [];
}
throw err;
}
const claims: ClaimedTelegramSpooledUpdate[] = [];
for (const file of entries.filter(isProcessingFileName).toSorted()) {
const filePath = path.join(params.spoolDir, file);
const { value } = await readJsonFileWithFallback<unknown>(filePath, null);
const parsed = parseSpooledUpdate(value, filePath);
if (parsed) {
claims.push({
...parsed,
pendingPath: path.join(params.spoolDir, pendingFileNameFromProcessing(file)),
});
}
}
return claims;
}
export async function recoverStaleTelegramSpooledUpdateClaims(params: {
spoolDir: string;
staleMs?: number;
now?: number;
shouldRecover?: (claim: ClaimedTelegramSpooledUpdate) => boolean | Promise<boolean>;
}): Promise<number> {
let entries: string[];
try {
entries = await fs.readdir(params.spoolDir);
} catch (err) {
if ((err as { code?: string }).code === "ENOENT") {
return 0;
}
throw err;
}
const staleMs = Math.max(
0,
Math.floor(params.staleMs ?? TELEGRAM_SPOOLED_UPDATE_PROCESSING_STALE_MS),
);
const now = params.now ?? Date.now();
let recovered = 0;
for (const entry of entries.filter(isProcessingFileName).toSorted()) {
const claimedPath = path.join(params.spoolDir, entry);
let stat;
try {
stat = await fs.stat(claimedPath);
} catch (err) {
if ((err as { code?: string }).code === "ENOENT") {
continue;
}
throw err;
}
if (now - stat.mtimeMs < staleMs) {
continue;
}
const pendingPath = path.join(params.spoolDir, pendingFileNameFromProcessing(entry));
if (params.shouldRecover) {
const { value } = await readJsonFileWithFallback<unknown>(claimedPath, null);
const parsed = parseSpooledUpdate(value, claimedPath);
if (
parsed &&
!(await params.shouldRecover({
...parsed,
pendingPath,
}))
) {
continue;
}
}
if (await pathExists(pendingPath)) {
await unlinkIfPresent(claimedPath);
} else {
await fs.rename(claimedPath, pendingPath);
}
recovered += 1;
}
return recovered;
}