From fa9c7ddadfd201db6a1bcc146d99aa76fa979907 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 16:43:44 +0100 Subject: [PATCH] fix: abort superseded telegram room events --- .../telegram/src/action-runtime.test.ts | 42 +++++++++++++++++++ extensions/telegram/src/action-runtime.ts | 17 +++++--- .../telegram/src/bot-message-dispatch.test.ts | 5 ++- .../telegram/src/bot-message-dispatch.ts | 29 +++++++++++-- 4 files changed, 83 insertions(+), 10 deletions(-) diff --git a/extensions/telegram/src/action-runtime.test.ts b/extensions/telegram/src/action-runtime.test.ts index a936505a9a0..03bfadb0d1e 100644 --- a/extensions/telegram/src/action-runtime.test.ts +++ b/extensions/telegram/src/action-runtime.test.ts @@ -444,6 +444,48 @@ describe("handleTelegramAction", () => { endUserRequest(); }); + it.each([ + { + name: "poll", + params: { + action: "poll", + to: "@testchannel", + question: "Ready?", + answers: ["Yes", "No"], + }, + cfg: telegramConfig(), + }, + { + name: "sticker", + params: { + action: "sendSticker", + to: "@testchannel", + fileId: "sticker-1", + }, + cfg: telegramConfig({ actions: { sticker: true } }), + }, + ])("marks room-event delivery after successful $name actions", async ({ params, cfg }) => { + let count = 0; + const end = beginTelegramInboundTurnDeliveryCorrelation( + "telegram-session", + { + outboundTo: "@testchannel", + markInboundTurnDelivered: () => { + count += 1; + }, + }, + { inboundTurnKind: "room_event" }, + ); + + await handleTelegramAction(params, cfg, { + sessionKey: "telegram-session", + inboundTurnKind: "room_event", + }); + + expect(count).toBe(1); + end(); + }); + it("accepts shared send action aliases", async () => { await handleTelegramAction( { diff --git a/extensions/telegram/src/action-runtime.ts b/extensions/telegram/src/action-runtime.ts index 7791add854f..8154cef5705 100644 --- a/extensions/telegram/src/action-runtime.ts +++ b/extensions/telegram/src/action-runtime.ts @@ -244,6 +244,14 @@ export async function handleTelegramAction( cfg, accountId, }); + const notifyVisibleOutboundSuccess = (to: string) => { + notifyTelegramInboundTurnOutboundSuccess({ + sessionKey: options?.sessionKey ?? undefined, + to, + accountId, + inboundTurnKind: options?.inboundTurnKind, + }); + }; if (action === "react") { // All react failures return soft results (jsonResult with ok:false) instead @@ -400,12 +408,7 @@ export async function handleTelegramAction( readBooleanParam(params, "asDocument") ?? false, }); - notifyTelegramInboundTurnOutboundSuccess({ - sessionKey: options?.sessionKey ?? undefined, - to, - accountId, - inboundTurnKind: options?.inboundTurnKind, - }); + notifyVisibleOutboundSuccess(to); await maybePinTelegramActionSend({ args: params, cfg, @@ -483,6 +486,7 @@ export async function handleTelegramAction( silent: silent ?? undefined, }, ); + notifyVisibleOutboundSuccess(to); return jsonResult({ ok: true, messageId: result.messageId, @@ -593,6 +597,7 @@ export async function handleTelegramAction( replyToMessageId: replyToMessageId ?? undefined, messageThreadId: messageThreadId ?? undefined, }); + notifyVisibleOutboundSuccess(to); return jsonResult({ ok: true, messageId: result.messageId, diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 63a4f2e09aa..96403880b67 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -1821,8 +1821,10 @@ describe("dispatchTelegramMessage draft streaming", () => { const userRequestStartGate = new Promise((resolve) => { userRequestStarted = resolve; }); + let roomEventAbortSignal: AbortSignal | undefined; dispatchReplyWithBufferedBlockDispatcher - .mockImplementationOnce(async ({ dispatcherOptions }) => { + .mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => { + roomEventAbortSignal = replyOptions?.abortSignal; roomEventStarted?.(); await roomEventGate; await dispatcherOptions.deliver({ text: "stale ambient answer" }, { kind: "final" }); @@ -1879,6 +1881,7 @@ describe("dispatchTelegramMessage draft streaming", () => { streamMode: "off", }); await userRequestStartGate; + expect(roomEventAbortSignal?.aborted).toBe(true); releaseRoomEvent?.(); await Promise.all([roomEventPromise, userRequestPromise]); diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index a93e70fb5fb..bb1dc23ccbe 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -166,6 +166,7 @@ type TelegramTranscriptMirrorPayload = { text?: string; mediaUrls?: string[] }; type TelegramReplyFenceState = { generation: number; activeDispatches: number; + abortControllers?: Set; }; type TelegramReplyFenceKey = { @@ -200,7 +201,18 @@ function resolveTelegramReplyFenceKey(params: { }; } -function beginTelegramReplyFence(params: { key: string; supersede: boolean }): number { +function abortTelegramReplyFenceControllers(state: TelegramReplyFenceState): void { + for (const controller of state.abortControllers ?? []) { + controller.abort(); + } + state.abortControllers?.clear(); +} + +function beginTelegramReplyFence(params: { + key: string; + supersede: boolean; + abortController?: AbortController; +}): number { const existing = telegramReplyFenceByKey.get(params.key); const state: TelegramReplyFenceState = existing ?? { generation: 0, @@ -208,6 +220,10 @@ function beginTelegramReplyFence(params: { key: string; supersede: boolean }): n }; if (params.supersede) { state.generation += 1; + abortTelegramReplyFenceControllers(state); + } + if (params.abortController) { + (state.abortControllers ??= new Set()).add(params.abortController); } state.activeDispatches += 1; telegramReplyFenceByKey.set(params.key, state); @@ -220,6 +236,7 @@ function supersedeTelegramReplyFence(key: string): void { return; } state.generation += 1; + abortTelegramReplyFenceControllers(state); telegramReplyFenceByKey.set(key, state); } @@ -227,11 +244,14 @@ function isTelegramReplyFenceSuperseded(params: { key: string; generation: numbe return (telegramReplyFenceByKey.get(params.key)?.generation ?? 0) !== params.generation; } -function endTelegramReplyFence(key: string): void { +function endTelegramReplyFence(key: string, abortController?: AbortController): void { const state = telegramReplyFenceByKey.get(key); if (!state) { return; } + if (abortController) { + state.abortControllers?.delete(abortController); + } state.activeDispatches -= 1; if (state.activeDispatches <= 0) { telegramReplyFenceByKey.delete(key); @@ -473,6 +493,7 @@ export const dispatchTelegramMessage = async ({ threadSpec, }); let replyFenceGeneration: number | undefined; + const roomEventAbortController = isRoomEvent ? new AbortController() : undefined; let dispatchWasSuperseded = false; const isDispatchSuperseded = () => replyFenceGeneration !== undefined && @@ -484,7 +505,7 @@ export const dispatchTelegramMessage = async ({ if (replyFenceGeneration === undefined) { return; } - endTelegramReplyFence(replyFenceKey.activeKey); + endTelegramReplyFence(replyFenceKey.activeKey, roomEventAbortController); replyFenceGeneration = undefined; }; const draftMaxChars = Math.min(textLimit, 4096); @@ -876,6 +897,7 @@ export const dispatchTelegramMessage = async ({ replyFenceGeneration = beginTelegramReplyFence({ key: replyFenceKey.activeKey, supersede: supersedeReplyFence, + abortController: roomEventAbortController, }); const implicitQuoteReplyTargetId = @@ -1459,6 +1481,7 @@ export const dispatchTelegramMessage = async ({ replyOptions: { skillFilter, disableBlockStreaming, + abortSignal: roomEventAbortController?.signal, sourceReplyDeliveryMode: isRoomEvent ? "message_tool_only" : undefined, suppressTyping: isRoomEvent, onPartialReply: