fix: abort superseded telegram room events

This commit is contained in:
Peter Steinberger
2026-05-15 16:43:44 +01:00
parent ad29f089e4
commit fa9c7ddadf
4 changed files with 83 additions and 10 deletions

View File

@@ -444,6 +444,48 @@ describe("handleTelegramAction", () => {
endUserRequest();
});
it.each([
{
name: "poll",
params: {
action: "poll",
to: "@testchannel",
question: "Ready?",
answers: ["Yes", "No"],
},
cfg: telegramConfig(),
},
{
name: "sticker",
params: {
action: "sendSticker",
to: "@testchannel",
fileId: "sticker-1",
},
cfg: telegramConfig({ actions: { sticker: true } }),
},
])("marks room-event delivery after successful $name actions", async ({ params, cfg }) => {
let count = 0;
const end = beginTelegramInboundTurnDeliveryCorrelation(
"telegram-session",
{
outboundTo: "@testchannel",
markInboundTurnDelivered: () => {
count += 1;
},
},
{ inboundTurnKind: "room_event" },
);
await handleTelegramAction(params, cfg, {
sessionKey: "telegram-session",
inboundTurnKind: "room_event",
});
expect(count).toBe(1);
end();
});
it("accepts shared send action aliases", async () => {
await handleTelegramAction(
{

View File

@@ -244,6 +244,14 @@ export async function handleTelegramAction(
cfg,
accountId,
});
const notifyVisibleOutboundSuccess = (to: string) => {
notifyTelegramInboundTurnOutboundSuccess({
sessionKey: options?.sessionKey ?? undefined,
to,
accountId,
inboundTurnKind: options?.inboundTurnKind,
});
};
if (action === "react") {
// All react failures return soft results (jsonResult with ok:false) instead
@@ -400,12 +408,7 @@ export async function handleTelegramAction(
readBooleanParam(params, "asDocument") ??
false,
});
notifyTelegramInboundTurnOutboundSuccess({
sessionKey: options?.sessionKey ?? undefined,
to,
accountId,
inboundTurnKind: options?.inboundTurnKind,
});
notifyVisibleOutboundSuccess(to);
await maybePinTelegramActionSend({
args: params,
cfg,
@@ -483,6 +486,7 @@ export async function handleTelegramAction(
silent: silent ?? undefined,
},
);
notifyVisibleOutboundSuccess(to);
return jsonResult({
ok: true,
messageId: result.messageId,
@@ -593,6 +597,7 @@ export async function handleTelegramAction(
replyToMessageId: replyToMessageId ?? undefined,
messageThreadId: messageThreadId ?? undefined,
});
notifyVisibleOutboundSuccess(to);
return jsonResult({
ok: true,
messageId: result.messageId,

View File

@@ -1821,8 +1821,10 @@ describe("dispatchTelegramMessage draft streaming", () => {
const userRequestStartGate = new Promise<void>((resolve) => {
userRequestStarted = resolve;
});
let roomEventAbortSignal: AbortSignal | undefined;
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ dispatcherOptions }) => {
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
roomEventAbortSignal = replyOptions?.abortSignal;
roomEventStarted?.();
await roomEventGate;
await dispatcherOptions.deliver({ text: "stale ambient answer" }, { kind: "final" });
@@ -1879,6 +1881,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
streamMode: "off",
});
await userRequestStartGate;
expect(roomEventAbortSignal?.aborted).toBe(true);
releaseRoomEvent?.();
await Promise.all([roomEventPromise, userRequestPromise]);

View File

@@ -166,6 +166,7 @@ type TelegramTranscriptMirrorPayload = { text?: string; mediaUrls?: string[] };
type TelegramReplyFenceState = {
generation: number;
activeDispatches: number;
abortControllers?: Set<AbortController>;
};
type TelegramReplyFenceKey = {
@@ -200,7 +201,18 @@ function resolveTelegramReplyFenceKey(params: {
};
}
function beginTelegramReplyFence(params: { key: string; supersede: boolean }): number {
function abortTelegramReplyFenceControllers(state: TelegramReplyFenceState): void {
for (const controller of state.abortControllers ?? []) {
controller.abort();
}
state.abortControllers?.clear();
}
function beginTelegramReplyFence(params: {
key: string;
supersede: boolean;
abortController?: AbortController;
}): number {
const existing = telegramReplyFenceByKey.get(params.key);
const state: TelegramReplyFenceState = existing ?? {
generation: 0,
@@ -208,6 +220,10 @@ function beginTelegramReplyFence(params: { key: string; supersede: boolean }): n
};
if (params.supersede) {
state.generation += 1;
abortTelegramReplyFenceControllers(state);
}
if (params.abortController) {
(state.abortControllers ??= new Set()).add(params.abortController);
}
state.activeDispatches += 1;
telegramReplyFenceByKey.set(params.key, state);
@@ -220,6 +236,7 @@ function supersedeTelegramReplyFence(key: string): void {
return;
}
state.generation += 1;
abortTelegramReplyFenceControllers(state);
telegramReplyFenceByKey.set(key, state);
}
@@ -227,11 +244,14 @@ function isTelegramReplyFenceSuperseded(params: { key: string; generation: numbe
return (telegramReplyFenceByKey.get(params.key)?.generation ?? 0) !== params.generation;
}
function endTelegramReplyFence(key: string): void {
function endTelegramReplyFence(key: string, abortController?: AbortController): void {
const state = telegramReplyFenceByKey.get(key);
if (!state) {
return;
}
if (abortController) {
state.abortControllers?.delete(abortController);
}
state.activeDispatches -= 1;
if (state.activeDispatches <= 0) {
telegramReplyFenceByKey.delete(key);
@@ -473,6 +493,7 @@ export const dispatchTelegramMessage = async ({
threadSpec,
});
let replyFenceGeneration: number | undefined;
const roomEventAbortController = isRoomEvent ? new AbortController() : undefined;
let dispatchWasSuperseded = false;
const isDispatchSuperseded = () =>
replyFenceGeneration !== undefined &&
@@ -484,7 +505,7 @@ export const dispatchTelegramMessage = async ({
if (replyFenceGeneration === undefined) {
return;
}
endTelegramReplyFence(replyFenceKey.activeKey);
endTelegramReplyFence(replyFenceKey.activeKey, roomEventAbortController);
replyFenceGeneration = undefined;
};
const draftMaxChars = Math.min(textLimit, 4096);
@@ -876,6 +897,7 @@ export const dispatchTelegramMessage = async ({
replyFenceGeneration = beginTelegramReplyFence({
key: replyFenceKey.activeKey,
supersede: supersedeReplyFence,
abortController: roomEventAbortController,
});
const implicitQuoteReplyTargetId =
@@ -1459,6 +1481,7 @@ export const dispatchTelegramMessage = async ({
replyOptions: {
skillFilter,
disableBlockStreaming,
abortSignal: roomEventAbortController?.signal,
sourceReplyDeliveryMode: isRoomEvent ? "message_tool_only" : undefined,
suppressTyping: isRoomEvent,
onPartialReply: