fix(telegram): stop noninterrupting reply fences

This commit is contained in:
Peter Steinberger
2026-05-18 12:01:41 +01:00
parent 1ba9f5ded3
commit 25aa72edbd
3 changed files with 122 additions and 4 deletions

View File

@@ -2124,7 +2124,6 @@ describe("dispatchTelegramMessage draft streaming", () => {
secondStarted = resolve;
});
let firstAbortSignal: AbortSignal | undefined;
let sideAbortSignal: AbortSignal | undefined;
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ replyOptions }) => {
firstAbortSignal = replyOptions?.abortSignal;
@@ -2283,6 +2282,70 @@ describe("dispatchTelegramMessage draft streaming", () => {
await Promise.all([firstPromise, sidePromise]);
});
it("lets authorized /stop abort active non-interrupting side dispatch", async () => {
const historyKey = "telegram:group:-100123";
const groupHistories = new Map([[historyKey, []]]);
let sideStarted: (() => void) | undefined;
const sideStartGate = new Promise<void>((resolve) => {
sideStarted = resolve;
});
let releaseSide: (() => void) | undefined;
const sideGate = new Promise<void>((resolve) => {
releaseSide = resolve;
});
let sideAbortSignal: AbortSignal | undefined;
dispatchReplyWithBufferedBlockDispatcher.mockImplementationOnce(async ({ replyOptions }) => {
sideAbortSignal = replyOptions?.abortSignal;
sideStarted?.();
await sideGate;
return {
queuedFinal: false,
counts: { block: 0, final: 0, tool: 0 },
};
});
deliverReplies.mockResolvedValue({ delivered: true });
const createGroupContext = (messageId: number, body: string) =>
createContext({
ctxPayload: {
SessionKey: "agent:main:telegram:group:-100123",
ChatType: "group",
MessageSid: String(messageId),
RawBody: body,
BodyForAgent: body,
CommandBody: body,
CommandAuthorized: true,
} as unknown as TelegramMessageContext["ctxPayload"],
msg: {
chat: { id: -100123, type: "supergroup" },
message_id: messageId,
text: body,
} as unknown as TelegramMessageContext["msg"],
chatId: -100123,
isGroup: true,
historyKey,
historyLimit: 10,
groupHistories,
threadSpec: { id: undefined, scope: "none" },
});
const sidePromise = dispatchWithContext({
context: createGroupContext(100, "/btw what changed?"),
streamMode: "off",
});
await sideStartGate;
expect(sideAbortSignal?.aborted).toBe(false);
await dispatchWithContext({
context: createGroupContext(101, "/stop"),
streamMode: "off",
});
expect(sideAbortSignal?.aborted).toBe(true);
releaseSide?.();
await sidePromise;
});
it("keeps queued room events abortable after their source dispatch returns", async () => {
const historyKey = "telegram:group:-100123";
const groupHistories = new Map([[historyKey, []]]);

View File

@@ -1,5 +1,11 @@
import { describe, expect, it } from "vitest";
import { shouldSupersedeTelegramReplyFence } from "./telegram-reply-fence.js";
import {
beginTelegramReplyFence,
buildTelegramNonInterruptingReplyFenceKey,
resetTelegramReplyFenceForTests,
shouldSupersedeTelegramReplyFence,
supersedeTelegramReplyFence,
} from "./telegram-reply-fence.js";
describe("shouldSupersedeTelegramReplyFence", () => {
it("keeps non-interrupting side and status commands from superseding active runs", () => {
@@ -44,3 +50,30 @@ describe("shouldSupersedeTelegramReplyFence", () => {
).toBe(true);
});
});
describe("telegram reply fence supersede", () => {
it("cascades base supersedes to non-interrupting child fences", () => {
resetTelegramReplyFenceForTests();
const activeKey = "agent:main:telegram:group:-100123";
const sideController = new AbortController();
const mainController = new AbortController();
beginTelegramReplyFence({
key: activeKey,
supersede: true,
abortController: mainController,
});
beginTelegramReplyFence({
key: buildTelegramNonInterruptingReplyFenceKey({
activeKey,
laneKey: "default\0telegram:-100123:btw:100",
}),
supersede: false,
abortController: sideController,
});
expect(supersedeTelegramReplyFence(activeKey)).toBe(true);
expect(mainController.signal.aborted).toBe(true);
expect(sideController.signal.aborted).toBe(true);
resetTelegramReplyFenceForTests();
});
});

View File

@@ -31,7 +31,11 @@ export function buildTelegramNonInterruptingReplyFenceKey(params: {
activeKey: string;
laneKey: string;
}): string {
return `${params.activeKey}\0non-interrupting\0${params.laneKey}`;
return `${buildTelegramNonInterruptingReplyFenceKeyPrefix(params.activeKey)}${params.laneKey}`;
}
function buildTelegramNonInterruptingReplyFenceKeyPrefix(activeKey: string): string {
return `${activeKey}\0non-interrupting\0`;
}
function normalizeTelegramFenceKey(value: unknown): string | undefined {
@@ -98,6 +102,7 @@ export function beginTelegramReplyFence(params: {
if (params.supersede) {
state.generation += 1;
abortTelegramReplyFenceControllers(state);
supersedeTelegramNonInterruptingReplyFenceChildren(params.key);
}
if (params.abortController) {
(state.abortControllers ??= new Set()).add(params.abortController);
@@ -114,7 +119,7 @@ export function beginTelegramReplyFence(params: {
return state.generation;
}
export function supersedeTelegramReplyFence(key: string): boolean {
function supersedeTelegramReplyFenceState(key: string): boolean {
const state = telegramReplyFenceByKey.get(key);
if (!state) {
return false;
@@ -125,6 +130,23 @@ export function supersedeTelegramReplyFence(key: string): boolean {
return true;
}
function supersedeTelegramNonInterruptingReplyFenceChildren(key: string): boolean {
let superseded = false;
const childPrefix = buildTelegramNonInterruptingReplyFenceKeyPrefix(key);
for (const childKey of [...telegramReplyFenceByKey.keys()]) {
if (childKey.startsWith(childPrefix)) {
superseded = supersedeTelegramReplyFenceState(childKey) || superseded;
}
}
return superseded;
}
export function supersedeTelegramReplyFence(key: string): boolean {
let superseded = supersedeTelegramReplyFenceState(key);
superseded = supersedeTelegramNonInterruptingReplyFenceChildren(key) || superseded;
return superseded;
}
export function supersedeTelegramReplyFenceLane(laneKey: string): boolean {
const keys = [...(telegramReplyFenceKeysByLane.get(laneKey) ?? [])];
let superseded = false;