diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 2936fa3bafd..70d26e95ec7 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -2124,7 +2124,6 @@ describe("dispatchTelegramMessage draft streaming", () => { secondStarted = resolve; }); let firstAbortSignal: AbortSignal | undefined; - let sideAbortSignal: AbortSignal | undefined; dispatchReplyWithBufferedBlockDispatcher .mockImplementationOnce(async ({ replyOptions }) => { firstAbortSignal = replyOptions?.abortSignal; @@ -2283,6 +2282,70 @@ describe("dispatchTelegramMessage draft streaming", () => { await Promise.all([firstPromise, sidePromise]); }); + it("lets authorized /stop abort active non-interrupting side dispatch", async () => { + const historyKey = "telegram:group:-100123"; + const groupHistories = new Map([[historyKey, []]]); + let sideStarted: (() => void) | undefined; + const sideStartGate = new Promise((resolve) => { + sideStarted = resolve; + }); + let releaseSide: (() => void) | undefined; + const sideGate = new Promise((resolve) => { + releaseSide = resolve; + }); + let sideAbortSignal: AbortSignal | undefined; + dispatchReplyWithBufferedBlockDispatcher.mockImplementationOnce(async ({ replyOptions }) => { + sideAbortSignal = replyOptions?.abortSignal; + sideStarted?.(); + await sideGate; + return { + queuedFinal: false, + counts: { block: 0, final: 0, tool: 0 }, + }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + const createGroupContext = (messageId: number, body: string) => + createContext({ + ctxPayload: { + SessionKey: "agent:main:telegram:group:-100123", + ChatType: "group", + MessageSid: String(messageId), + RawBody: body, + BodyForAgent: body, + CommandBody: body, + CommandAuthorized: true, + } as unknown as TelegramMessageContext["ctxPayload"], + msg: { + chat: { id: -100123, type: "supergroup" }, + message_id: messageId, + text: body, + } as unknown as TelegramMessageContext["msg"], + chatId: -100123, + isGroup: true, + historyKey, + historyLimit: 10, + groupHistories, + threadSpec: { id: undefined, scope: "none" }, + }); + + const sidePromise = dispatchWithContext({ + context: createGroupContext(100, "/btw what changed?"), + streamMode: "off", + }); + await sideStartGate; + expect(sideAbortSignal?.aborted).toBe(false); + + await dispatchWithContext({ + context: createGroupContext(101, "/stop"), + streamMode: "off", + }); + + expect(sideAbortSignal?.aborted).toBe(true); + releaseSide?.(); + await sidePromise; + }); + it("keeps queued room events abortable after their source dispatch returns", async () => { const historyKey = "telegram:group:-100123"; const groupHistories = new Map([[historyKey, []]]); diff --git a/extensions/telegram/src/telegram-reply-fence.test.ts b/extensions/telegram/src/telegram-reply-fence.test.ts index 93132707cad..6543927e312 100644 --- a/extensions/telegram/src/telegram-reply-fence.test.ts +++ b/extensions/telegram/src/telegram-reply-fence.test.ts @@ -1,5 +1,11 @@ import { describe, expect, it } from "vitest"; -import { shouldSupersedeTelegramReplyFence } from "./telegram-reply-fence.js"; +import { + beginTelegramReplyFence, + buildTelegramNonInterruptingReplyFenceKey, + resetTelegramReplyFenceForTests, + shouldSupersedeTelegramReplyFence, + supersedeTelegramReplyFence, +} from "./telegram-reply-fence.js"; describe("shouldSupersedeTelegramReplyFence", () => { it("keeps non-interrupting side and status commands from superseding active runs", () => { @@ -44,3 +50,30 @@ describe("shouldSupersedeTelegramReplyFence", () => { ).toBe(true); }); }); + +describe("telegram reply fence supersede", () => { + it("cascades base supersedes to non-interrupting child fences", () => { + resetTelegramReplyFenceForTests(); + const activeKey = "agent:main:telegram:group:-100123"; + const sideController = new AbortController(); + const mainController = new AbortController(); + beginTelegramReplyFence({ + key: activeKey, + supersede: true, + abortController: mainController, + }); + beginTelegramReplyFence({ + key: buildTelegramNonInterruptingReplyFenceKey({ + activeKey, + laneKey: "default\0telegram:-100123:btw:100", + }), + supersede: false, + abortController: sideController, + }); + + expect(supersedeTelegramReplyFence(activeKey)).toBe(true); + expect(mainController.signal.aborted).toBe(true); + expect(sideController.signal.aborted).toBe(true); + resetTelegramReplyFenceForTests(); + }); +}); diff --git a/extensions/telegram/src/telegram-reply-fence.ts b/extensions/telegram/src/telegram-reply-fence.ts index e21f0d72b37..57eb94f99e8 100644 --- a/extensions/telegram/src/telegram-reply-fence.ts +++ b/extensions/telegram/src/telegram-reply-fence.ts @@ -31,7 +31,11 @@ export function buildTelegramNonInterruptingReplyFenceKey(params: { activeKey: string; laneKey: string; }): string { - return `${params.activeKey}\0non-interrupting\0${params.laneKey}`; + return `${buildTelegramNonInterruptingReplyFenceKeyPrefix(params.activeKey)}${params.laneKey}`; +} + +function buildTelegramNonInterruptingReplyFenceKeyPrefix(activeKey: string): string { + return `${activeKey}\0non-interrupting\0`; } function normalizeTelegramFenceKey(value: unknown): string | undefined { @@ -98,6 +102,7 @@ export function beginTelegramReplyFence(params: { if (params.supersede) { state.generation += 1; abortTelegramReplyFenceControllers(state); + supersedeTelegramNonInterruptingReplyFenceChildren(params.key); } if (params.abortController) { (state.abortControllers ??= new Set()).add(params.abortController); @@ -114,7 +119,7 @@ export function beginTelegramReplyFence(params: { return state.generation; } -export function supersedeTelegramReplyFence(key: string): boolean { +function supersedeTelegramReplyFenceState(key: string): boolean { const state = telegramReplyFenceByKey.get(key); if (!state) { return false; @@ -125,6 +130,23 @@ export function supersedeTelegramReplyFence(key: string): boolean { return true; } +function supersedeTelegramNonInterruptingReplyFenceChildren(key: string): boolean { + let superseded = false; + const childPrefix = buildTelegramNonInterruptingReplyFenceKeyPrefix(key); + for (const childKey of [...telegramReplyFenceByKey.keys()]) { + if (childKey.startsWith(childPrefix)) { + superseded = supersedeTelegramReplyFenceState(childKey) || superseded; + } + } + return superseded; +} + +export function supersedeTelegramReplyFence(key: string): boolean { + let superseded = supersedeTelegramReplyFenceState(key); + superseded = supersedeTelegramNonInterruptingReplyFenceChildren(key) || superseded; + return superseded; +} + export function supersedeTelegramReplyFenceLane(laneKey: string): boolean { const keys = [...(telegramReplyFenceKeysByLane.get(laneKey) ?? [])]; let superseded = false;