refactor(telegram): clarify reply supersession fence

This commit is contained in:
Peter Steinberger
2026-05-03 17:17:26 +01:00
parent 2696baba81
commit 5330cbb25d
2 changed files with 48 additions and 41 deletions

View File

@@ -129,8 +129,8 @@ vi.mock("./sticker-cache.js", () => ({
}));
let dispatchTelegramMessage: typeof import("./bot-message-dispatch.js").dispatchTelegramMessage;
let getTelegramAbortFenceSizeForTests: typeof import("./bot-message-dispatch.js").getTelegramAbortFenceSizeForTests;
let resetTelegramAbortFenceForTests: typeof import("./bot-message-dispatch.js").resetTelegramAbortFenceForTests;
let getTelegramReplyFenceSizeForTests: typeof import("./bot-message-dispatch.js").getTelegramReplyFenceSizeForTests;
let resetTelegramReplyFenceForTests: typeof import("./bot-message-dispatch.js").resetTelegramReplyFenceForTests;
const telegramDepsForTest: TelegramBotDeps = {
getRuntimeConfig: loadConfig as TelegramBotDeps["getRuntimeConfig"],
@@ -163,13 +163,13 @@ describe("dispatchTelegramMessage draft streaming", () => {
beforeAll(async () => {
({
dispatchTelegramMessage,
getTelegramAbortFenceSizeForTests,
resetTelegramAbortFenceForTests,
getTelegramReplyFenceSizeForTests,
resetTelegramReplyFenceForTests,
} = await import("./bot-message-dispatch.js"));
});
beforeEach(() => {
resetTelegramAbortFenceForTests();
resetTelegramReplyFenceForTests();
createTelegramDraftStream.mockReset();
dispatchReplyWithBufferedBlockDispatcher.mockReset();
deliverReplies.mockReset();
@@ -3707,7 +3707,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
}),
).rejects.toThrow("sticker setup failed");
expect(getTelegramAbortFenceSizeForTests()).toBe(0);
expect(getTelegramReplyFenceSizeForTests()).toBe(0);
});
it("keeps older answer finalization when abort targets a different session", async () => {

View File

@@ -123,13 +123,13 @@ type DispatchTelegramMessageParams = {
type TelegramReasoningLevel = "off" | "on" | "stream";
type TelegramAbortFenceState = {
type TelegramReplyFenceState = {
generation: number;
activeDispatches: number;
};
// Newer accepted turns and authorized aborts can arrive ahead of older same-session reply work.
const telegramAbortFenceByKey = new Map<string, TelegramAbortFenceState>();
const telegramReplyFenceByKey = new Map<string, TelegramReplyFenceState>();
function normalizeTelegramFenceKey(value: unknown): string | undefined {
if (typeof value !== "string") {
@@ -139,7 +139,7 @@ function normalizeTelegramFenceKey(value: unknown): string | undefined {
return trimmed.length > 0 ? trimmed : undefined;
}
function resolveTelegramAbortFenceKey(params: {
function resolveTelegramReplyFenceKey(params: {
ctxPayload: { SessionKey?: string; CommandTargetSessionKey?: string };
chatId: number | string;
threadSpec: { id?: number | string | null; scope?: string };
@@ -151,9 +151,9 @@ function resolveTelegramAbortFenceKey(params: {
);
}
function beginTelegramAbortFence(params: { key: string; supersede: boolean }): number {
const existing = telegramAbortFenceByKey.get(params.key);
const state: TelegramAbortFenceState = existing ?? {
function beginTelegramReplyFence(params: { key: string; supersede: boolean }): number {
const existing = telegramReplyFenceByKey.get(params.key);
const state: TelegramReplyFenceState = existing ?? {
generation: 0,
activeDispatches: 0,
};
@@ -161,31 +161,41 @@ function beginTelegramAbortFence(params: { key: string; supersede: boolean }): n
state.generation += 1;
}
state.activeDispatches += 1;
telegramAbortFenceByKey.set(params.key, state);
telegramReplyFenceByKey.set(params.key, state);
return state.generation;
}
function isTelegramAbortFenceSuperseded(params: { key: string; generation: number }): boolean {
return (telegramAbortFenceByKey.get(params.key)?.generation ?? 0) !== params.generation;
function isTelegramReplyFenceSuperseded(params: { key: string; generation: number }): boolean {
return (telegramReplyFenceByKey.get(params.key)?.generation ?? 0) !== params.generation;
}
function endTelegramAbortFence(key: string): void {
const state = telegramAbortFenceByKey.get(key);
function endTelegramReplyFence(key: string): void {
const state = telegramReplyFenceByKey.get(key);
if (!state) {
return;
}
state.activeDispatches -= 1;
if (state.activeDispatches <= 0) {
telegramAbortFenceByKey.delete(key);
telegramReplyFenceByKey.delete(key);
}
}
export function getTelegramAbortFenceSizeForTests(): number {
return telegramAbortFenceByKey.size;
function shouldSupersedeTelegramReplyFence(ctxPayload: {
Body?: string;
RawBody?: string;
CommandBody?: string;
CommandAuthorized: boolean;
}): boolean {
const dispatchText = ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? "";
return !isAbortRequestText(dispatchText) || ctxPayload.CommandAuthorized;
}
export function resetTelegramAbortFenceForTests(): void {
telegramAbortFenceByKey.clear();
export function getTelegramReplyFenceSizeForTests(): number {
return telegramReplyFenceByKey.size;
}
export function resetTelegramReplyFenceForTests(): void {
telegramReplyFenceByKey.clear();
}
function resolveTelegramReasoningLevel(params: {
@@ -305,25 +315,25 @@ export const dispatchTelegramMessage = async ({
}
await statusReactionController.restoreInitial();
};
const dispatchFenceKey = resolveTelegramAbortFenceKey({
const replyFenceKey = resolveTelegramReplyFenceKey({
ctxPayload,
chatId,
threadSpec,
});
let abortFenceGeneration: number | undefined;
let replyFenceGeneration: number | undefined;
let dispatchWasSuperseded = false;
const isDispatchSuperseded = () =>
abortFenceGeneration !== undefined &&
isTelegramAbortFenceSuperseded({
key: dispatchFenceKey,
generation: abortFenceGeneration,
replyFenceGeneration !== undefined &&
isTelegramReplyFenceSuperseded({
key: replyFenceKey,
generation: replyFenceGeneration,
});
const releaseAbortFence = () => {
if (abortFenceGeneration === undefined) {
const releaseReplyFence = () => {
if (replyFenceGeneration === undefined) {
return;
}
endTelegramAbortFence(dispatchFenceKey);
abortFenceGeneration = undefined;
endTelegramReplyFence(replyFenceKey);
replyFenceGeneration = undefined;
};
const draftMaxChars = Math.min(textLimit, 4096);
const tableMode = resolveMarkdownTableMode({
@@ -607,13 +617,10 @@ export const dispatchTelegramMessage = async ({
: undefined;
const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId);
const dispatchText = ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? "";
const isAbortRequest = isAbortRequestText(dispatchText);
const shouldSupersedeAbortFence = isAbortRequest ? ctxPayload.CommandAuthorized : true;
abortFenceGeneration = beginTelegramAbortFence({
key: dispatchFenceKey,
supersede: shouldSupersedeAbortFence,
replyFenceGeneration = beginTelegramReplyFence({
key: replyFenceKey,
supersede: shouldSupersedeTelegramReplyFence(ctxPayload),
});
const implicitQuoteReplyTargetId =
@@ -908,7 +915,7 @@ export const dispatchTelegramMessage = async ({
const flushBufferedFinalAnswer = async () => {
const buffered =
reasoningStepState.takeBufferedFinalAnswer(abortFenceGeneration);
reasoningStepState.takeBufferedFinalAnswer(replyFenceGeneration);
if (!buffered) {
return;
}
@@ -936,7 +943,7 @@ export const dispatchTelegramMessage = async ({
reasoningStepState.bufferFinalAnswer({
payload,
text: segment.text,
bufferedGeneration: abortFenceGeneration,
bufferedGeneration: replyFenceGeneration,
});
continue;
}
@@ -1254,7 +1261,7 @@ export const dispatchTelegramMessage = async ({
}
} finally {
dispatchWasSuperseded = isDispatchSuperseded();
releaseAbortFence();
releaseReplyFence();
}
if (dispatchWasSuperseded) {
if (statusReactionController) {