From 5330cbb25d3db060cc3e374cc8789b9c762bdf3a Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 3 May 2026 17:17:26 +0100 Subject: [PATCH] refactor(telegram): clarify reply supersession fence --- .../telegram/src/bot-message-dispatch.test.ts | 12 +-- .../telegram/src/bot-message-dispatch.ts | 77 ++++++++++--------- 2 files changed, 48 insertions(+), 41 deletions(-) diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 2d0b0341661..30ff8dcc104 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -129,8 +129,8 @@ vi.mock("./sticker-cache.js", () => ({ })); let dispatchTelegramMessage: typeof import("./bot-message-dispatch.js").dispatchTelegramMessage; -let getTelegramAbortFenceSizeForTests: typeof import("./bot-message-dispatch.js").getTelegramAbortFenceSizeForTests; -let resetTelegramAbortFenceForTests: typeof import("./bot-message-dispatch.js").resetTelegramAbortFenceForTests; +let getTelegramReplyFenceSizeForTests: typeof import("./bot-message-dispatch.js").getTelegramReplyFenceSizeForTests; +let resetTelegramReplyFenceForTests: typeof import("./bot-message-dispatch.js").resetTelegramReplyFenceForTests; const telegramDepsForTest: TelegramBotDeps = { getRuntimeConfig: loadConfig as TelegramBotDeps["getRuntimeConfig"], @@ -163,13 +163,13 @@ describe("dispatchTelegramMessage draft streaming", () => { beforeAll(async () => { ({ dispatchTelegramMessage, - getTelegramAbortFenceSizeForTests, - resetTelegramAbortFenceForTests, + getTelegramReplyFenceSizeForTests, + resetTelegramReplyFenceForTests, } = await import("./bot-message-dispatch.js")); }); beforeEach(() => { - resetTelegramAbortFenceForTests(); + resetTelegramReplyFenceForTests(); createTelegramDraftStream.mockReset(); dispatchReplyWithBufferedBlockDispatcher.mockReset(); deliverReplies.mockReset(); @@ -3707,7 +3707,7 @@ describe("dispatchTelegramMessage draft streaming", () => { }), ).rejects.toThrow("sticker setup failed"); - expect(getTelegramAbortFenceSizeForTests()).toBe(0); + expect(getTelegramReplyFenceSizeForTests()).toBe(0); }); it("keeps older answer finalization when abort targets a different session", async () => { diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index cb8cb6cfd6c..ded094ba960 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -123,13 +123,13 @@ type DispatchTelegramMessageParams = { type TelegramReasoningLevel = "off" | "on" | "stream"; -type TelegramAbortFenceState = { +type TelegramReplyFenceState = { generation: number; activeDispatches: number; }; // Newer accepted turns and authorized aborts can arrive ahead of older same-session reply work. -const telegramAbortFenceByKey = new Map(); +const telegramReplyFenceByKey = new Map(); function normalizeTelegramFenceKey(value: unknown): string | undefined { if (typeof value !== "string") { @@ -139,7 +139,7 @@ function normalizeTelegramFenceKey(value: unknown): string | undefined { return trimmed.length > 0 ? trimmed : undefined; } -function resolveTelegramAbortFenceKey(params: { +function resolveTelegramReplyFenceKey(params: { ctxPayload: { SessionKey?: string; CommandTargetSessionKey?: string }; chatId: number | string; threadSpec: { id?: number | string | null; scope?: string }; @@ -151,9 +151,9 @@ function resolveTelegramAbortFenceKey(params: { ); } -function beginTelegramAbortFence(params: { key: string; supersede: boolean }): number { - const existing = telegramAbortFenceByKey.get(params.key); - const state: TelegramAbortFenceState = existing ?? { +function beginTelegramReplyFence(params: { key: string; supersede: boolean }): number { + const existing = telegramReplyFenceByKey.get(params.key); + const state: TelegramReplyFenceState = existing ?? { generation: 0, activeDispatches: 0, }; @@ -161,31 +161,41 @@ function beginTelegramAbortFence(params: { key: string; supersede: boolean }): n state.generation += 1; } state.activeDispatches += 1; - telegramAbortFenceByKey.set(params.key, state); + telegramReplyFenceByKey.set(params.key, state); return state.generation; } -function isTelegramAbortFenceSuperseded(params: { key: string; generation: number }): boolean { - return (telegramAbortFenceByKey.get(params.key)?.generation ?? 0) !== params.generation; +function isTelegramReplyFenceSuperseded(params: { key: string; generation: number }): boolean { + return (telegramReplyFenceByKey.get(params.key)?.generation ?? 0) !== params.generation; } -function endTelegramAbortFence(key: string): void { - const state = telegramAbortFenceByKey.get(key); +function endTelegramReplyFence(key: string): void { + const state = telegramReplyFenceByKey.get(key); if (!state) { return; } state.activeDispatches -= 1; if (state.activeDispatches <= 0) { - telegramAbortFenceByKey.delete(key); + telegramReplyFenceByKey.delete(key); } } -export function getTelegramAbortFenceSizeForTests(): number { - return telegramAbortFenceByKey.size; +function shouldSupersedeTelegramReplyFence(ctxPayload: { + Body?: string; + RawBody?: string; + CommandBody?: string; + CommandAuthorized: boolean; +}): boolean { + const dispatchText = ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? ""; + return !isAbortRequestText(dispatchText) || ctxPayload.CommandAuthorized; } -export function resetTelegramAbortFenceForTests(): void { - telegramAbortFenceByKey.clear(); +export function getTelegramReplyFenceSizeForTests(): number { + return telegramReplyFenceByKey.size; +} + +export function resetTelegramReplyFenceForTests(): void { + telegramReplyFenceByKey.clear(); } function resolveTelegramReasoningLevel(params: { @@ -305,25 +315,25 @@ export const dispatchTelegramMessage = async ({ } await statusReactionController.restoreInitial(); }; - const dispatchFenceKey = resolveTelegramAbortFenceKey({ + const replyFenceKey = resolveTelegramReplyFenceKey({ ctxPayload, chatId, threadSpec, }); - let abortFenceGeneration: number | undefined; + let replyFenceGeneration: number | undefined; let dispatchWasSuperseded = false; const isDispatchSuperseded = () => - abortFenceGeneration !== undefined && - isTelegramAbortFenceSuperseded({ - key: dispatchFenceKey, - generation: abortFenceGeneration, + replyFenceGeneration !== undefined && + isTelegramReplyFenceSuperseded({ + key: replyFenceKey, + generation: replyFenceGeneration, }); - const releaseAbortFence = () => { - if (abortFenceGeneration === undefined) { + const releaseReplyFence = () => { + if (replyFenceGeneration === undefined) { return; } - endTelegramAbortFence(dispatchFenceKey); - abortFenceGeneration = undefined; + endTelegramReplyFence(replyFenceKey); + replyFenceGeneration = undefined; }; const draftMaxChars = Math.min(textLimit, 4096); const tableMode = resolveMarkdownTableMode({ @@ -607,13 +617,10 @@ export const dispatchTelegramMessage = async ({ : undefined; const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); - const dispatchText = ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? ""; - const isAbortRequest = isAbortRequestText(dispatchText); - const shouldSupersedeAbortFence = isAbortRequest ? ctxPayload.CommandAuthorized : true; - abortFenceGeneration = beginTelegramAbortFence({ - key: dispatchFenceKey, - supersede: shouldSupersedeAbortFence, + replyFenceGeneration = beginTelegramReplyFence({ + key: replyFenceKey, + supersede: shouldSupersedeTelegramReplyFence(ctxPayload), }); const implicitQuoteReplyTargetId = @@ -908,7 +915,7 @@ export const dispatchTelegramMessage = async ({ const flushBufferedFinalAnswer = async () => { const buffered = - reasoningStepState.takeBufferedFinalAnswer(abortFenceGeneration); + reasoningStepState.takeBufferedFinalAnswer(replyFenceGeneration); if (!buffered) { return; } @@ -936,7 +943,7 @@ export const dispatchTelegramMessage = async ({ reasoningStepState.bufferFinalAnswer({ payload, text: segment.text, - bufferedGeneration: abortFenceGeneration, + bufferedGeneration: replyFenceGeneration, }); continue; } @@ -1254,7 +1261,7 @@ export const dispatchTelegramMessage = async ({ } } finally { dispatchWasSuperseded = isDispatchSuperseded(); - releaseAbortFence(); + releaseReplyFence(); } if (dispatchWasSuperseded) { if (statusReactionController) {