From 0f67dfd074e25cef641580750ccdd558d8c53cf0 Mon Sep 17 00:00:00 2001 From: Neerav Makwana <261249544+neeravmakwana@users.noreply.github.com> Date: Mon, 25 May 2026 17:17:39 -0400 Subject: [PATCH] fix(telegram): keep overlapping DM replies deliverable (#85361) (thanks @neeravmakwana) Behavior addressed: Telegram direct-message turns no longer drop an earlier overlapping normal reply, while authorized aborts and explicit/native/plugin/skill command turns still supersede active reply work. Real environment tested: local OpenClaw focused Telegram test shard plus existing contributor Telegram screenshot/log proof in the PR body. Exact steps or command run after this patch: pnpm test extensions/telegram/src/telegram-reply-fence.test.ts extensions/telegram/src/bot-message-dispatch.test.ts; .agents/skills/autoreview/scripts/autoreview --mode branch --base origin/main Evidence after fix: 2 test files passed, 93 tests passed; final autoreview clean with no accepted/actionable findings. Observed result after fix: overlapping normal Telegram DMs use non-interrupting reply fences and both final replies remain deliverable; direct /stop, authorized built-in commands, and explicit text/native command turns still supersede. What was not tested: fresh live Telegram Desktop rerun by this agent; PR retains contributor screenshot/log proof and the Real behavior proof bot remains red despite proof labels. Thanks @neeravmakwana. Co-authored-by: Neerav Makwana <261249544+neeravmakwana@users.noreply.github.com> --- .../telegram/src/bot-message-dispatch.test.ts | 82 +++++++++++++++++++ .../telegram/src/telegram-reply-fence.test.ts | 52 ++++++++++++ .../telegram/src/telegram-reply-fence.ts | 24 ++++++ 3 files changed, 158 insertions(+) diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 71e8c71c3ec..27f8d5aeeb4 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -3050,6 +3050,88 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(deliveredTexts).toContain("fresh request answer"); }); + it("keeps newer DM requests from aborting active same-session dispatch", async () => { + let firstStarted: (() => void) | undefined; + const firstStartGate = new Promise((resolve) => { + firstStarted = resolve; + }); + let releaseFirst: (() => void) | undefined; + const firstGate = new Promise((resolve) => { + releaseFirst = resolve; + }); + let secondStarted: (() => void) | undefined; + const secondStartGate = new Promise((resolve) => { + secondStarted = resolve; + }); + let firstAbortSignal: AbortSignal | undefined; + dispatchReplyWithBufferedBlockDispatcher + .mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => { + firstAbortSignal = replyOptions?.abortSignal; + firstStarted?.(); + await firstGate; + await dispatcherOptions.deliver({ text: "earlier DM answer" }, { kind: "final" }); + return { + queuedFinal: true, + counts: { block: 0, final: 1, tool: 0 }, + }; + }) + .mockImplementationOnce(async ({ dispatcherOptions }) => { + secondStarted?.(); + await dispatcherOptions.deliver({ text: "fresh DM answer" }, { kind: "final" }); + return { + queuedFinal: true, + counts: { block: 0, final: 1, tool: 0 }, + }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + const createDirectContext = (messageId: number, body: string) => + createContext({ + ctxPayload: { + SessionKey: "agent:main:main", + ChatType: "direct", + MessageSid: String(messageId), + RawBody: body, + BodyForAgent: body, + CommandBody: body, + CommandAuthorized: true, + } as unknown as TelegramMessageContext["ctxPayload"], + msg: { + chat: { id: 123, type: "private" }, + message_id: messageId, + } as unknown as TelegramMessageContext["msg"], + chatId: 123, + isGroup: false, + historyKey: "telegram:123", + historyLimit: 10, + groupHistories: new Map(), + threadSpec: { id: undefined, scope: "none" }, + }); + + const firstPromise = dispatchWithContext({ + context: createDirectContext(99, "first request"), + streamMode: "off", + }); + await firstStartGate; + const secondPromise = dispatchWithContext({ + context: createDirectContext(100, "second request"), + streamMode: "off", + }); + await secondStartGate; + + expect(firstAbortSignal?.aborted).toBe(false); + releaseFirst?.(); + await Promise.all([firstPromise, secondPromise]); + + const deliveredTexts = deliverReplies.mock.calls.flatMap((call) => + ((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map( + (reply) => reply.text, + ), + ); + expect(deliveredTexts).toContain("fresh DM answer"); + expect(deliveredTexts).toContain("earlier DM answer"); + }); + it("keeps /btw side questions from aborting an active same-session dispatch", async () => { const historyKey = "telegram:group:-100123"; const groupHistories = new Map([[historyKey, []]]); diff --git a/extensions/telegram/src/telegram-reply-fence.test.ts b/extensions/telegram/src/telegram-reply-fence.test.ts index 965e44667da..a8b32838df9 100644 --- a/extensions/telegram/src/telegram-reply-fence.test.ts +++ b/extensions/telegram/src/telegram-reply-fence.test.ts @@ -55,6 +55,58 @@ describe("shouldSupersedeTelegramReplyFence", () => { }), ).toBe(true); }); + + it("keeps normal direct turns deliverable while preserving direct aborts", () => { + expect( + shouldSupersedeTelegramReplyFence({ + ChatType: "direct", + CommandBody: "answer this", + CommandAuthorized: true, + }), + ).toBe(false); + expect( + shouldSupersedeTelegramReplyFence({ + ChatType: "direct", + CommandBody: "/stop", + CommandAuthorized: true, + }), + ).toBe(true); + expect( + shouldSupersedeTelegramReplyFence({ + ChatType: "direct", + CommandBody: "/diagnostics confirm abc123def456", + CommandAuthorized: true, + }), + ).toBe(true); + expect( + shouldSupersedeTelegramReplyFence({ + ChatType: "direct", + CommandBody: "/diagnostics confirm abc123def456", + CommandAuthorized: false, + }), + ).toBe(false); + expect( + shouldSupersedeTelegramReplyFence({ + ChatType: "direct", + CommandBody: "/var/log error", + CommandAuthorized: true, + }), + ).toBe(false); + expect( + shouldSupersedeTelegramReplyFence({ + ChatType: "direct", + CommandBody: "/plugin_command", + CommandAuthorized: true, + CommandTurn: { + kind: "text-slash", + source: "text", + authorized: true, + commandName: "plugin_command", + body: "/plugin_command", + }, + }), + ).toBe(true); + }); }); describe("telegram reply fence supersede", () => { diff --git a/extensions/telegram/src/telegram-reply-fence.ts b/extensions/telegram/src/telegram-reply-fence.ts index 817e908d0b5..424a5809b71 100644 --- a/extensions/telegram/src/telegram-reply-fence.ts +++ b/extensions/telegram/src/telegram-reply-fence.ts @@ -1,3 +1,11 @@ +import { + isExplicitCommandTurn, + type CommandTurnContext, +} from "openclaw/plugin-sdk/channel-inbound"; +import { + maybeResolveTextAlias, + normalizeCommandBody, +} from "openclaw/plugin-sdk/command-auth-native"; import { isAbortRequestText, isBtwRequestText, @@ -190,11 +198,17 @@ export function releaseTelegramReplyFenceAbortController( maybeDeleteTelegramReplyFenceState(key, state); } +function isRecognizedTelegramTextCommand(rawText: string): boolean { + return maybeResolveTextAlias(normalizeCommandBody(rawText)) != null; +} + export function shouldSupersedeTelegramReplyFence(ctxPayload: { Body?: string; + ChatType?: string; RawBody?: string; CommandBody?: string; CommandAuthorized: boolean; + CommandTurn?: CommandTurnContext; }): boolean { const dispatchText = ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? ""; if (isAbortRequestText(dispatchText)) { @@ -206,6 +220,16 @@ export function shouldSupersedeTelegramReplyFence(ctxPayload: { ) { return false; } + if (ctxPayload.ChatType === "direct") { + if ( + ctxPayload.CommandAuthorized && + (isExplicitCommandTurn(ctxPayload.CommandTurn) || + isRecognizedTelegramTextCommand(dispatchText)) + ) { + return true; + } + return false; + } return true; }