refactor(telegram): simplify spooled lane tracking

This commit is contained in:
Ayaan Zaidi
2026-05-14 21:11:05 +05:30
parent 3f132370f4
commit 52c9860bde

View File

@@ -110,9 +110,7 @@ export class TelegramPollingSession {
#forceRestarted = false;
#activeRunner: ReturnType<typeof run> | undefined;
#activeFetchAbort: AbortController | undefined;
#spooledUpdatesInFlight = new Set<number>();
#spooledUpdateLanesInFlight = new Set<string>();
#spooledUpdateHandlerPromises = new Set<Promise<boolean>>();
#spooledUpdateHandlersByLane = new Map<string, Promise<boolean>>();
#transportState: TelegramPollingTransportState;
#status: ReturnType<typeof createTelegramPollingStatusPublisher>;
#stallThresholdMs: number;
@@ -266,7 +264,6 @@ export class TelegramPollingSession {
async #handleSpooledUpdate(params: {
bot: TelegramBot;
update: TelegramSpooledUpdate;
laneKey: string;
}): Promise<boolean> {
try {
await params.bot.handleUpdate(
@@ -279,18 +276,11 @@ export class TelegramPollingSession {
`[telegram][diag] spooled update ${params.update.updateId} failed; keeping for retry: ${formatErrorMessage(err)}`,
);
return false;
} finally {
this.#spooledUpdatesInFlight.delete(params.update.updateId);
this.#spooledUpdateLanesInFlight.delete(params.laneKey);
}
}
async #waitForSpooledUpdateHandlers(): Promise<void> {
const pending = [...this.#spooledUpdateHandlerPromises];
if (pending.length === 0) {
return;
}
await Promise.allSettled(pending);
await Promise.allSettled(this.#spooledUpdateHandlersByLane.values());
}
async #drainSpooledUpdates(params: { bot: TelegramBot; spoolDir: string }): Promise<number> {
@@ -304,22 +294,16 @@ export class TelegramPollingSession {
if (this.opts.abortSignal?.aborted) {
break;
}
if (this.#spooledUpdatesInFlight.has(update.updateId)) {
if (this.#spooledUpdateHandlersByLane.has(laneKey)) {
continue;
}
if (this.#spooledUpdateLanesInFlight.has(laneKey)) {
continue;
}
this.#spooledUpdatesInFlight.add(update.updateId);
this.#spooledUpdateLanesInFlight.add(laneKey);
const handler = this.#handleSpooledUpdate({
bot: params.bot,
update,
laneKey,
});
this.#spooledUpdateHandlerPromises.add(handler);
this.#spooledUpdateHandlersByLane.set(laneKey, handler);
void handler.finally(() => {
this.#spooledUpdateHandlerPromises.delete(handler);
this.#spooledUpdateHandlersByLane.delete(laneKey);
});
started += 1;
}