From 996eb9a024d03ad68cc2a34f7f1df423aa47e652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Cuevas?= Date: Sat, 18 Apr 2026 00:32:38 -0400 Subject: [PATCH] fix: fence Telegram stale reply delivery after abort (#68100) (thanks @rubencu) * fix(telegram): fence stale reply delivery after abort * refactor(telegram): narrow abort fence scope * fix(telegram): ignore stale reply finalization after abort * fix(telegram): close abort supersession races * fix(telegram): release abort fences on setup errors * fix(telegram): discard superseded draft cleanup * refactor(telegram): distill abort fence cleanup * fix: fence Telegram stale reply delivery after abort (#68100) (thanks @rubencu) --------- Co-authored-by: Ayaan Zaidi --- CHANGELOG.md | 1 + .../telegram/src/bot-message-dispatch.test.ts | 644 +++++++++- .../telegram/src/bot-message-dispatch.ts | 1095 +++++++++-------- .../telegram/src/draft-stream.test-helpers.ts | 6 + extensions/telegram/src/draft-stream.test.ts | 22 +- extensions/telegram/src/draft-stream.ts | 72 +- 6 files changed, 1301 insertions(+), 539 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7094e0a0e4d..19284603d1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai - Agents/tools: filter bundled MCP/LSP tools through the final owner-only and tool-policy pipeline after merging them into the effective tool list, so existing allowlists, deny rules, sandbox policy, subagent policy, and owner-only restrictions apply to bundled tools the same way they apply to core tools. (#68195) - Gateway/assistant media: require `operator.read` scope for assistant-media file and metadata requests on identity-bearing HTTP auth paths so callers without a read scope can no longer access assistant media. (#68175) Thanks @eleqtrizit. - Exec approvals/display: escape raw control characters (including newline and carriage return) in the shared and macOS approval-prompt command sanitizers, so trailing command payloads no longer render on hidden extra lines in the approval UI. (#68198) +- Telegram/streaming: fence same-session stale preview and finalization work after aborts so Telegram no longer replays an older reply or flushes a hidden short preview after the abort confirmation lands. (#68100) Thanks @rubencu. - OpenAI Codex/OAuth + Pi: keep imported Codex CLI OAuth bootstrap, Pi auth export, and runtime overlay handling aligned so Codex sessions survive refresh and health checks without leaking transient CLI state into saved auth files. Thanks @vincentkoc. - Agents/TTS: report failed speech synthesis as a real tool error so unconfigured providers no longer feed successful TTS failure output back into agent loops. (#67980) Thanks @lawrence3699. - Gateway/wake: allow unknown properties on wake payloads so external senders like Paperclip can attach opaque metadata without failing schema validation. (#68355) Thanks @kagura-agent. diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index fdf011b0f5c..3adc3198a4b 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -58,6 +58,14 @@ const wasSentByBot = vi.hoisted(() => vi.fn(() => false)); const loadSessionStore = vi.hoisted(() => vi.fn()); const resolveStorePath = vi.hoisted(() => vi.fn(() => "/tmp/sessions.json")); const generateTopicLabel = vi.hoisted(() => vi.fn()); +const describeStickerImage = vi.hoisted(() => vi.fn(async () => null)); +const loadModelCatalog = vi.hoisted(() => vi.fn(async () => ({}))); +const findModelInCatalog = vi.hoisted(() => vi.fn(() => null)); +const modelSupportsVision = vi.hoisted(() => vi.fn(() => false)); +const resolveAgentDir = vi.hoisted(() => vi.fn(() => "/tmp/agent")); +const resolveDefaultModelForAgent = vi.hoisted(() => + vi.fn(() => ({ provider: "openai", model: "gpt-test" })), +); vi.mock("./draft-stream.js", () => ({ createTelegramDraftStream, @@ -95,16 +103,26 @@ vi.mock("./bot-message-dispatch.runtime.js", () => ({ resolveStorePath, })); +vi.mock("./bot-message-dispatch.agent.runtime.js", () => ({ + findModelInCatalog, + loadModelCatalog, + modelSupportsVision, + resolveAgentDir, + resolveDefaultModelForAgent, +})); + vi.mock("./sticker-cache.js", () => ({ cacheSticker: vi.fn(), getCachedSticker: () => null, getCacheStats: () => ({ count: 0 }), searchStickers: () => [], getAllCachedStickers: () => [], - describeStickerImage: vi.fn(), + describeStickerImage, })); let dispatchTelegramMessage: typeof import("./bot-message-dispatch.js").dispatchTelegramMessage; +let getTelegramAbortFenceSizeForTests: typeof import("./bot-message-dispatch.js").getTelegramAbortFenceSizeForTests; +let resetTelegramAbortFenceForTests: typeof import("./bot-message-dispatch.js").resetTelegramAbortFenceForTests; const telegramDepsForTest: TelegramBotDeps = { loadConfig: loadConfig as TelegramBotDeps["loadConfig"], @@ -135,10 +153,15 @@ describe("dispatchTelegramMessage draft streaming", () => { type TelegramMessageContext = Parameters[0]["context"]; beforeAll(async () => { - ({ dispatchTelegramMessage } = await import("./bot-message-dispatch.js")); + ({ + dispatchTelegramMessage, + getTelegramAbortFenceSizeForTests, + resetTelegramAbortFenceForTests, + } = await import("./bot-message-dispatch.js")); }); beforeEach(() => { + resetTelegramAbortFenceForTests(); createTelegramDraftStream.mockReset(); dispatchReplyWithBufferedBlockDispatcher.mockReset(); deliverReplies.mockReset(); @@ -162,6 +185,12 @@ describe("dispatchTelegramMessage draft streaming", () => { loadSessionStore.mockReset(); resolveStorePath.mockReset(); generateTopicLabel.mockReset(); + describeStickerImage.mockReset(); + loadModelCatalog.mockReset(); + findModelInCatalog.mockReset(); + modelSupportsVision.mockReset(); + resolveAgentDir.mockReset(); + resolveDefaultModelForAgent.mockReset(); loadConfig.mockReturnValue({}); dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: false, @@ -199,6 +228,15 @@ describe("dispatchTelegramMessage draft streaming", () => { resolveStorePath.mockReturnValue("/tmp/sessions.json"); loadSessionStore.mockReturnValue({}); generateTopicLabel.mockResolvedValue("Topic label"); + describeStickerImage.mockResolvedValue(null); + loadModelCatalog.mockResolvedValue({}); + findModelInCatalog.mockReturnValue(null); + modelSupportsVision.mockReturnValue(false); + resolveAgentDir.mockReturnValue("/tmp/agent"); + resolveDefaultModelForAgent.mockReturnValue({ + provider: "openai", + model: "gpt-test", + }); }); const createDraftStream = (messageId?: number) => createTestDraftStream({ messageId }); @@ -2683,6 +2721,608 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftB.clear).toHaveBeenCalledTimes(1); }); + it("ignores stale answer finalization after an abort dispatch supersedes the same session", async () => { + let releaseFirstFinal!: () => void; + const firstFinalGate = new Promise((resolve) => { + releaseFirstFinal = resolve; + }); + let resolvePreviewVisible!: () => void; + const previewVisible = new Promise((resolve) => { + resolvePreviewVisible = resolve; + }); + + const firstAnswerDraft = createTestDraftStream({ + messageId: 1001, + onUpdate: (text) => { + if (text === "Old reply partial") { + resolvePreviewVisible(); + } + }, + }); + const firstReasoningDraft = createDraftStream(); + const abortAnswerDraft = createDraftStream(); + const abortReasoningDraft = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => firstAnswerDraft) + .mockImplementationOnce(() => firstReasoningDraft) + .mockImplementationOnce(() => abortAnswerDraft) + .mockImplementationOnce(() => abortReasoningDraft); + dispatchReplyWithBufferedBlockDispatcher + .mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Old reply partial" }); + await firstFinalGate; + await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" }); + return { queuedFinal: true }; + }) + .mockImplementationOnce(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + const firstPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "earlier request", + RawBody: "earlier request", + } as never, + }), + }); + + await previewVisible; + + const abortPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "abort", + RawBody: "abort", + CommandBody: "abort", + } as never, + }), + }); + + await vi.waitFor(() => { + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [{ text: "⚙️ Agent was aborted." }], + }), + ); + }); + + releaseFirstFinal(); + await Promise.all([firstPromise, abortPromise]); + + expect(editMessageTelegram).not.toHaveBeenCalledWith( + 123, + 1001, + "Old reply final", + expect.any(Object), + ); + expect(firstAnswerDraft.clear).not.toHaveBeenCalled(); + }); + + it("discards hidden short partials instead of flushing a stale preview after abort", async () => { + let releaseFirstCleanup!: () => void; + const firstCleanupGate = new Promise((resolve) => { + releaseFirstCleanup = resolve; + }); + let resolveShortPartialQueued!: () => void; + const shortPartialQueued = new Promise((resolve) => { + resolveShortPartialQueued = resolve; + }); + + const firstAnswerDraft = createTestDraftStream({ + onUpdate: (text) => { + if (text === "tiny") { + resolveShortPartialQueued(); + } + }, + onStop: () => { + throw new Error("superseded cleanup should discard instead of stop"); + }, + }); + const firstReasoningDraft = createDraftStream(); + const abortAnswerDraft = createDraftStream(); + const abortReasoningDraft = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => firstAnswerDraft) + .mockImplementationOnce(() => firstReasoningDraft) + .mockImplementationOnce(() => abortAnswerDraft) + .mockImplementationOnce(() => abortReasoningDraft); + dispatchReplyWithBufferedBlockDispatcher + .mockImplementationOnce(async ({ replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "tiny" }); + await firstCleanupGate; + return { queuedFinal: false }; + }) + .mockImplementationOnce(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + const firstPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "earlier request", + RawBody: "earlier request", + } as never, + }), + }); + + await shortPartialQueued; + + const abortPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "abort", + RawBody: "abort", + CommandBody: "abort", + } as never, + }), + }); + + await vi.waitFor(() => { + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [{ text: "⚙️ Agent was aborted." }], + }), + ); + }); + + releaseFirstCleanup(); + await Promise.all([firstPromise, abortPromise]); + + expect(firstAnswerDraft.discard).toHaveBeenCalledTimes(1); + expect(firstAnswerDraft.stop).not.toHaveBeenCalled(); + expect(firstAnswerDraft.clear).not.toHaveBeenCalled(); + }); + + it("suppresses stale replies when abort lands during async pre-dispatch work", async () => { + let releaseCatalogLoad!: () => void; + const catalogLoadGate = new Promise>((resolve) => { + releaseCatalogLoad = () => resolve({}); + }); + let resolveCatalogLoadStarted!: () => void; + const catalogLoadStarted = new Promise((resolve) => { + resolveCatalogLoadStarted = resolve; + }); + + loadModelCatalog.mockImplementationOnce(async () => { + resolveCatalogLoadStarted(); + return await catalogLoadGate; + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ ctx, dispatcherOptions }) => { + if (ctx.CommandBody === "abort") { + await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" }); + return { queuedFinal: true }; + } + await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + const firstPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "earlier request", + RawBody: "earlier request", + MediaPath: "/tmp/sticker.png", + Sticker: { + fileId: "file-id", + fileUniqueId: "file-unique-id", + }, + } as never, + }), + }); + + await catalogLoadStarted; + + const abortPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "abort", + RawBody: "abort", + CommandBody: "abort", + } as never, + }), + }); + + await vi.waitFor(() => { + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [{ text: "⚙️ Agent was aborted." }], + }), + ); + }); + + releaseCatalogLoad(); + await Promise.all([firstPromise, abortPromise]); + + expect(deliverReplies).not.toHaveBeenCalledWith( + expect.objectContaining({ + replies: [{ text: "Old reply final" }], + }), + ); + }); + + it("releases the abort fence when pre-dispatch setup throws", async () => { + describeStickerImage.mockRejectedValueOnce(new Error("sticker setup failed")); + + await expect( + dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "earlier request", + RawBody: "earlier request", + MediaPath: "/tmp/sticker.png", + Sticker: { + fileId: "file-id", + fileUniqueId: "file-unique-id", + }, + } as never, + }), + }), + ).rejects.toThrow("sticker setup failed"); + + expect(getTelegramAbortFenceSizeForTests()).toBe(0); + }); + + it("keeps older answer finalization when abort targets a different session", async () => { + let releaseFirstFinal!: () => void; + const firstFinalGate = new Promise((resolve) => { + releaseFirstFinal = resolve; + }); + let resolvePreviewVisible!: () => void; + const previewVisible = new Promise((resolve) => { + resolvePreviewVisible = resolve; + }); + + const firstAnswerDraft = createTestDraftStream({ + messageId: 1001, + onUpdate: (text) => { + if (text === "Old reply partial") { + resolvePreviewVisible(); + } + }, + }); + const firstReasoningDraft = createDraftStream(); + const abortAnswerDraft = createDraftStream(); + const abortReasoningDraft = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => firstAnswerDraft) + .mockImplementationOnce(() => firstReasoningDraft) + .mockImplementationOnce(() => abortAnswerDraft) + .mockImplementationOnce(() => abortReasoningDraft); + dispatchReplyWithBufferedBlockDispatcher + .mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Old reply partial" }); + await firstFinalGate; + await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" }); + return { queuedFinal: true }; + }) + .mockImplementationOnce(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + const firstPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "earlier request", + RawBody: "earlier request", + } as never, + }), + }); + + await previewVisible; + + const abortPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s2", + CommandTargetSessionKey: "s2", + Body: "abort", + RawBody: "abort", + CommandBody: "abort", + } as never, + }), + }); + + await vi.waitFor(() => { + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [{ text: "⚙️ Agent was aborted." }], + }), + ); + }); + + releaseFirstFinal(); + await Promise.all([firstPromise, abortPromise]); + + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 1001, + "Old reply final", + expect.any(Object), + ); + }); + + it("finalizes stale status reactions when an abort supersedes the same session", async () => { + let releaseFirstFinal!: () => void; + const firstFinalGate = new Promise((resolve) => { + releaseFirstFinal = resolve; + }); + let resolvePreviewVisible!: () => void; + const previewVisible = new Promise((resolve) => { + resolvePreviewVisible = resolve; + }); + + const statusReactionController = { + setQueued: vi.fn(), + setThinking: vi.fn(async () => {}), + setTool: vi.fn(async () => {}), + setCompacting: vi.fn(async () => {}), + cancelPending: vi.fn(), + setError: vi.fn(async () => {}), + setDone: vi.fn(async () => {}), + }; + const firstAnswerDraft = createTestDraftStream({ + messageId: 1001, + onUpdate: (text) => { + if (text === "Old reply partial") { + resolvePreviewVisible(); + } + }, + }); + const firstReasoningDraft = createDraftStream(); + const abortAnswerDraft = createDraftStream(); + const abortReasoningDraft = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => firstAnswerDraft) + .mockImplementationOnce(() => firstReasoningDraft) + .mockImplementationOnce(() => abortAnswerDraft) + .mockImplementationOnce(() => abortReasoningDraft); + dispatchReplyWithBufferedBlockDispatcher + .mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Old reply partial" }); + await firstFinalGate; + await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" }); + return { queuedFinal: true }; + }) + .mockImplementationOnce(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + const firstPromise = dispatchWithContext({ + context: createContext({ + statusReactionController: statusReactionController as never, + ctxPayload: { + SessionKey: "s1", + Body: "earlier request", + RawBody: "earlier request", + } as never, + }), + }); + + await previewVisible; + + const abortPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "abort", + RawBody: "abort", + CommandBody: "abort", + } as never, + }), + }); + + await vi.waitFor(() => { + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [{ text: "⚙️ Agent was aborted." }], + }), + ); + }); + + releaseFirstFinal(); + await Promise.all([firstPromise, abortPromise]); + + expect(statusReactionController.setDone).toHaveBeenCalledTimes(1); + expect(statusReactionController.setError).not.toHaveBeenCalled(); + }); + + it("keeps an existing preview when abort arrives during queued draft-lane cleanup", async () => { + let releaseMaterialize!: () => void; + const materializeGate = new Promise((resolve) => { + releaseMaterialize = resolve; + }); + let resolveMaterializeStarted!: () => void; + const materializeStarted = new Promise((resolve) => { + resolveMaterializeStarted = resolve; + }); + let resolvePreviewVisible!: () => void; + const previewVisible = new Promise((resolve) => { + resolvePreviewVisible = resolve; + }); + + const firstAnswerDraft = createTestDraftStream({ + messageId: 1001, + clearMessageIdOnForceNew: true, + onUpdate: (text) => { + if (text === "Old reply partial") { + resolvePreviewVisible(); + } + }, + }); + firstAnswerDraft.materialize.mockImplementation(async () => { + resolveMaterializeStarted(); + await materializeGate; + return 1001; + }); + const firstReasoningDraft = createDraftStream(); + const abortAnswerDraft = createDraftStream(); + const abortReasoningDraft = createDraftStream(); + const bot = createBot(); + createTelegramDraftStream + .mockImplementationOnce(() => firstAnswerDraft) + .mockImplementationOnce(() => firstReasoningDraft) + .mockImplementationOnce(() => abortAnswerDraft) + .mockImplementationOnce(() => abortReasoningDraft); + dispatchReplyWithBufferedBlockDispatcher + .mockImplementationOnce(async ({ replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Old reply partial" }); + void replyOptions?.onAssistantMessageStart?.(); + return { queuedFinal: false }; + }) + .mockImplementationOnce(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + const firstPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "earlier request", + RawBody: "earlier request", + } as never, + }), + bot, + }); + + await previewVisible; + await materializeStarted; + + const abortPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "abort", + RawBody: "abort", + CommandBody: "abort", + } as never, + }), + bot, + }); + + await vi.waitFor(() => { + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [{ text: "⚙️ Agent was aborted." }], + }), + ); + }); + + releaseMaterialize(); + await Promise.all([firstPromise, abortPromise]); + + expect(firstAnswerDraft.clear).not.toHaveBeenCalled(); + expect(bot.api.deleteMessage as ReturnType).not.toHaveBeenCalledWith(123, 1001); + }); + + it("ignores stale answer finalization when abort targets the session via CommandTargetSessionKey", async () => { + let releaseFirstFinal!: () => void; + const firstFinalGate = new Promise((resolve) => { + releaseFirstFinal = resolve; + }); + let resolvePreviewVisible!: () => void; + const previewVisible = new Promise((resolve) => { + resolvePreviewVisible = resolve; + }); + + const firstAnswerDraft = createTestDraftStream({ + messageId: 1001, + onUpdate: (text) => { + if (text === "Old reply partial") { + resolvePreviewVisible(); + } + }, + }); + const firstReasoningDraft = createDraftStream(); + const abortAnswerDraft = createDraftStream(); + const abortReasoningDraft = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => firstAnswerDraft) + .mockImplementationOnce(() => firstReasoningDraft) + .mockImplementationOnce(() => abortAnswerDraft) + .mockImplementationOnce(() => abortReasoningDraft); + dispatchReplyWithBufferedBlockDispatcher + .mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Old reply partial" }); + await firstFinalGate; + await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" }); + return { queuedFinal: true }; + }) + .mockImplementationOnce(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "⚙️ Agent was aborted." }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + const firstPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "s1", + Body: "earlier request", + RawBody: "earlier request", + } as never, + }), + }); + + await previewVisible; + + const abortPromise = dispatchWithContext({ + context: createContext({ + ctxPayload: { + SessionKey: "telegram:123:control", + CommandTargetSessionKey: "s1", + Body: "abort", + RawBody: "abort", + CommandBody: "abort", + } as never, + }), + }); + + await vi.waitFor(() => { + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [{ text: "⚙️ Agent was aborted." }], + }), + ); + }); + + releaseFirstFinal(); + await Promise.all([firstPromise, abortPromise]); + + expect(editMessageTelegram).not.toHaveBeenCalledWith( + 123, + 1001, + "Old reply final", + expect.any(Object), + ); + expect(firstAnswerDraft.clear).not.toHaveBeenCalled(); + }); + it("swallows post-connect network timeout on preview edit to prevent duplicate messages", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 9d80424da8b..0abb9a6d03d 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -14,7 +14,7 @@ import type { import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history"; import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; -import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; +import { isAbortRequestText, type ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; import { danger, logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { defaultTelegramBotDeps, type TelegramBotDeps } from "./bot-deps.js"; @@ -129,6 +129,71 @@ type DispatchTelegramMessageParams = { type TelegramReasoningLevel = "off" | "on" | "stream"; +type TelegramAbortFenceState = { + generation: number; + activeDispatches: number; +}; + +// Abort can arrive on Telegram's control lane ahead of older same-session reply work. +const telegramAbortFenceByKey = new Map(); + +function normalizeTelegramFenceKey(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} + +function resolveTelegramAbortFenceKey(params: { + ctxPayload: { SessionKey?: string; CommandTargetSessionKey?: string }; + chatId: number | string; + threadSpec: { id?: number | string | null; scope?: string }; +}): string { + return ( + normalizeTelegramFenceKey(params.ctxPayload.CommandTargetSessionKey) ?? + normalizeTelegramFenceKey(params.ctxPayload.SessionKey) ?? + `telegram:${String(params.chatId)}:${params.threadSpec.scope ?? "default"}:${params.threadSpec.id ?? "root"}` + ); +} + +function beginTelegramAbortFence(params: { key: string; supersede: boolean }): number { + const existing = telegramAbortFenceByKey.get(params.key); + const state: TelegramAbortFenceState = existing ?? { + generation: 0, + activeDispatches: 0, + }; + if (params.supersede) { + state.generation += 1; + } + state.activeDispatches += 1; + telegramAbortFenceByKey.set(params.key, state); + return state.generation; +} + +function isTelegramAbortFenceSuperseded(params: { key: string; generation: number }): boolean { + return (telegramAbortFenceByKey.get(params.key)?.generation ?? 0) !== params.generation; +} + +function endTelegramAbortFence(key: string): void { + const state = telegramAbortFenceByKey.get(key); + if (!state) { + return; + } + state.activeDispatches -= 1; + if (state.activeDispatches <= 0) { + telegramAbortFenceByKey.delete(key); + } +} + +export function getTelegramAbortFenceSizeForTests(): number { + return telegramAbortFenceByKey.size; +} + +export function resetTelegramAbortFenceForTests(): void { + telegramAbortFenceByKey.clear(); +} + function resolveTelegramReasoningLevel(params: { cfg: OpenClawConfig; sessionKey?: string; @@ -187,7 +252,26 @@ export const dispatchTelegramMessage = async ({ removeAckAfterReply, statusReactionController, } = context; - + const dispatchFenceKey = resolveTelegramAbortFenceKey({ + ctxPayload, + chatId, + threadSpec, + }); + let abortFenceGeneration: number | undefined; + let dispatchWasSuperseded = false; + const isDispatchSuperseded = () => + abortFenceGeneration !== undefined && + isTelegramAbortFenceSuperseded({ + key: dispatchFenceKey, + generation: abortFenceGeneration, + }); + const releaseAbortFence = () => { + if (abortFenceGeneration === undefined) { + return; + } + endTelegramAbortFence(dispatchFenceKey); + abortFenceGeneration = undefined; + }; const draftMaxChars = Math.min(textLimit, 4096); const tableMode = resolveMarkdownTableMode({ cfg, @@ -216,9 +300,7 @@ export const dispatchTelegramMessage = async ({ const draftReplyToMessageId = replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined; const draftMinInitialChars = DRAFT_MIN_INITIAL_CHARS; - // Keep DM preview lanes on real message transport. Native draft previews still - // require a draft->message materialize hop, and that overlap keeps reintroducing - // a visible duplicate flash at finalize time. + // DM draft previews still duplicate briefly at materialize time. const useMessagePreviewTransportForDm = threadSpec?.scope === "dm" && canStreamAnswerDraft; const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); const archivedAnswerPreviews: ArchivedPreview[] = []; @@ -264,9 +346,6 @@ export const dispatchTelegramMessage = async ({ answer: createDraftLane("answer", canStreamAnswerDraft), reasoning: createDraftLane("reasoning", canStreamReasoningDraft), }; - // Active preview lifecycle answers "can this current preview still be - // finalized?" Cleanup retention is separate so archived-preview decisions do - // not poison the active lane. const activePreviewLifecycleByLane: Record = { answer: "transient", reasoning: "transient", @@ -279,14 +358,16 @@ export const dispatchTelegramMessage = async ({ const reasoningLane = lanes.reasoning; let splitReasoningOnNextStream = false; let skipNextAnswerMessageStartRotation = false; - // If compaction interrupts a still-transient answer preview, keep the next - // assistant-message boundary on that same preview instead of materializing a - // duplicate retry message. let pendingCompactionReplayBoundary = false; let draftLaneEventQueue = Promise.resolve(); const reasoningStepState = createTelegramReasoningStepState(); const enqueueDraftLaneEvent = (task: () => Promise): Promise => { - const next = draftLaneEventQueue.then(task); + const next = draftLaneEventQueue.then(async () => { + if (isDispatchSuperseded()) { + return; + } + await task(); + }); draftLaneEventQueue = next.catch((err) => { logVerbose(`telegram: draft lane callback failed: ${String(err)}`); }); @@ -320,8 +401,6 @@ export const dispatchTelegramMessage = async ({ const rotateAnswerLaneForNewAssistantMessage = async () => { let didForceNewMessage = false; if (answerLane.hasStreamedMessage) { - // Materialize the current streamed draft into a permanent message - // so it remains visible across tool boundaries. const materializedId = await answerLane.stream?.materialize?.(); const previewMessageId = materializedId ?? answerLane.stream?.messageId(); if ( @@ -339,7 +418,6 @@ export const dispatchTelegramMessage = async ({ } resetDraftLaneState(answerLane); if (didForceNewMessage) { - // New assistant message boundary: this lane now tracks a fresh preview lifecycle. activePreviewLifecycleByLane.answer = "transient"; retainPreviewOnCleanupByLane.answer = false; } @@ -353,11 +431,7 @@ export const dispatchTelegramMessage = async ({ if (text === lane.lastPartialText) { return; } - // Mark that we've received streaming content (for forceNewMessage decision). lane.hasStreamedMessage = true; - // Some providers briefly emit a shorter prefix snapshot (for example - // "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid - // visible punctuation flicker. if ( lane.lastPartialText && lane.lastPartialText.startsWith(text) && @@ -372,9 +446,6 @@ export const dispatchTelegramMessage = async ({ const split = splitTextIntoLaneSegments(text); const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer"); if (hasAnswerSegment && activePreviewLifecycleByLane.answer !== "transient") { - // Some providers can emit the first partial of a new assistant message before - // onAssistantMessageStart() arrives. Rotate preemptively so we do not edit - // the previously finalized preview message with the next message's text. skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage(); } for (const segment of split.segments) { @@ -405,56 +476,12 @@ export const dispatchTelegramMessage = async ({ const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); - // Handle uncached stickers: get a dedicated vision description before dispatch - // This ensures we cache a raw description rather than a conversational response - const sticker = ctxPayload.Sticker; - if (sticker?.fileId && sticker.fileUniqueId && ctxPayload.MediaPath) { - const agentDir = resolveAgentDir(cfg, route.agentId); - const stickerSupportsVision = await resolveStickerVisionSupport(cfg, route.agentId); - let description = sticker.cachedDescription ?? null; - if (!description) { - description = await describeStickerImage({ - imagePath: ctxPayload.MediaPath, - cfg, - agentDir, - agentId: route.agentId, - }); - } - if (description) { - // Format the description with sticker context - const stickerContext = [sticker.emoji, sticker.setName ? `from "${sticker.setName}"` : null] - .filter(Boolean) - .join(" "); - const formattedDesc = `[Sticker${stickerContext ? ` ${stickerContext}` : ""}] ${description}`; - - sticker.cachedDescription = description; - if (!stickerSupportsVision) { - // Update context to use description instead of image - ctxPayload.Body = formattedDesc; - ctxPayload.BodyForAgent = formattedDesc; - // Drop only the sticker attachment; keep replied media context if present. - pruneStickerMediaFromContext(ctxPayload, { - stickerMediaIncluded: ctxPayload.StickerMediaIncluded, - }); - } - - // Cache the description for future encounters - if (sticker.fileId) { - cacheSticker({ - fileId: sticker.fileId, - fileUniqueId: sticker.fileUniqueId, - emoji: sticker.emoji, - setName: sticker.setName, - description, - cachedAt: new Date().toISOString(), - receivedFrom: ctxPayload.From, - }); - logVerbose(`telegram: cached sticker description for ${sticker.fileUniqueId}`); - } else { - logVerbose(`telegram: skipped sticker cache (missing fileId)`); - } - } - } + abortFenceGeneration = beginTelegramAbortFence({ + key: dispatchFenceKey, + supersede: isAbortRequestText( + ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? "", + ), + }); const replyQuoteText = ctxPayload.ReplyToIsQuote && ctxPayload.ReplyToBody @@ -463,7 +490,11 @@ export const dispatchTelegramMessage = async ({ const deliveryState = createLaneDeliveryStateTracker(); const clearGroupHistory = () => { if (isGroup && historyKey) { - clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); + clearHistoryEntriesIfEnabled({ + historyMap: groupHistories, + historyKey, + limit: historyLimit, + }); } }; const deliveryBaseOptions = { @@ -485,449 +516,531 @@ export const dispatchTelegramMessage = async ({ replyQuoteText, }; const silentErrorReplies = telegramCfg.silentErrorReplies === true; - const applyTextToPayload = (payload: ReplyPayload, text: string): ReplyPayload => { - if (payload.text === text) { - return payload; - } - return { ...payload, text }; - }; - const sendPayload = async (payload: ReplyPayload) => { - const result = await (telegramDeps.deliverReplies ?? deliverReplies)({ - ...deliveryBaseOptions, - replies: [payload], - onVoiceRecording: sendRecordVoice, - silent: silentErrorReplies && payload.isError === true, - mediaLoader: telegramDeps.loadWebMedia, - }); - if (result.delivered) { - deliveryState.markDelivered(); - } - return result.delivered; - }; - const emitPreviewFinalizedHook = (result: LaneDeliveryResult) => { - if (result.kind !== "preview-finalized") { - return; - } - (telegramDeps.emitInternalMessageSentHook ?? emitInternalMessageSentHook)({ - sessionKeyForInternalHooks: deliveryBaseOptions.sessionKeyForInternalHooks, - chatId: deliveryBaseOptions.chatId, - accountId: deliveryBaseOptions.accountId, - content: result.delivery.content, - success: true, - messageId: result.delivery.messageId, - isGroup: deliveryBaseOptions.mirrorIsGroup, - groupId: deliveryBaseOptions.mirrorGroupId, - }); - }; - const deliverLaneText = createLaneTextDeliverer({ - lanes, - archivedAnswerPreviews, - activePreviewLifecycleByLane, - retainPreviewOnCleanupByLane, - draftMaxChars, - applyTextToPayload, - sendPayload, - flushDraftLane, - stopDraftLane: async (lane) => { - await lane.stream?.stop(); - }, - editPreview: async ({ messageId, text, previewButtons }) => { - await (telegramDeps.editMessageTelegram ?? editMessageTelegram)(chatId, messageId, text, { - api: bot.api, - cfg, - accountId: route.accountId, - linkPreview: telegramCfg.linkPreview, - buttons: previewButtons, - }); - }, - deletePreviewMessage: async (messageId) => { - await bot.api.deleteMessage(chatId, messageId); - }, - log: logVerbose, - markDelivered: () => { - deliveryState.markDelivered(); - }, - }); - + const isDmTopic = !isGroup && threadSpec.scope === "dm" && threadSpec.id != null; let queuedFinal = false; let hadErrorReplyFailureOrSkip = false; - - // Determine if this is the first turn in session (for auto-topic-label). - const isDmTopic = !isGroup && threadSpec.scope === "dm" && threadSpec.id != null; - let isFirstTurnInSession = false; - if (isDmTopic) { - try { - const storePath = telegramDeps.resolveStorePath(cfg.session?.store, { - agentId: route.agentId, - }); - const store = (telegramDeps.loadSessionStore ?? loadSessionStore)(storePath, { - skipCache: true, - }); - const sessionKey = ctxPayload.SessionKey; - if (sessionKey) { - const entry = resolveSessionStoreEntry({ store, sessionKey }).existing; - isFirstTurnInSession = !entry?.systemSent; - } else { - logVerbose("auto-topic-label: SessionKey is absent, skipping first-turn detection"); + let dispatchError: unknown; + + try { + const sticker = ctxPayload.Sticker; + if (sticker?.fileId && sticker.fileUniqueId && ctxPayload.MediaPath) { + const agentDir = resolveAgentDir(cfg, route.agentId); + const stickerSupportsVision = await resolveStickerVisionSupport(cfg, route.agentId); + let description = sticker.cachedDescription ?? null; + if (!description) { + description = await describeStickerImage({ + imagePath: ctxPayload.MediaPath, + cfg, + agentDir, + agentId: route.agentId, + }); + } + if (description) { + const stickerContext = [sticker.emoji, sticker.setName ? `from "${sticker.setName}"` : null] + .filter(Boolean) + .join(" "); + const formattedDesc = `[Sticker${stickerContext ? ` ${stickerContext}` : ""}] ${description}`; + + sticker.cachedDescription = description; + if (!stickerSupportsVision) { + ctxPayload.Body = formattedDesc; + ctxPayload.BodyForAgent = formattedDesc; + pruneStickerMediaFromContext(ctxPayload, { + stickerMediaIncluded: ctxPayload.StickerMediaIncluded, + }); + } + cacheSticker({ + fileId: sticker.fileId, + fileUniqueId: sticker.fileUniqueId, + emoji: sticker.emoji, + setName: sticker.setName, + description, + cachedAt: new Date().toISOString(), + receivedFrom: ctxPayload.From, + }); + logVerbose(`telegram: cached sticker description for ${sticker.fileUniqueId}`); } - } catch (err) { - logVerbose(`auto-topic-label: session store error: ${formatErrorMessage(err)}`); } - } - if (statusReactionController) { - void statusReactionController.setThinking(); - } - - const { onModelSelected, ...replyPipeline } = ( - telegramDeps.createChannelReplyPipeline ?? createChannelReplyPipeline - )({ - cfg, - agentId: route.agentId, - channel: "telegram", - accountId: route.accountId, - typing: { - start: sendTyping, - onStartError: (err) => { - logTypingFailure({ - log: logVerbose, - channel: "telegram", - target: String(chatId), - error: err, + const applyTextToPayload = (payload: ReplyPayload, text: string): ReplyPayload => { + if (payload.text === text) { + return payload; + } + return { ...payload, text }; + }; + const sendPayload = async (payload: ReplyPayload) => { + if (isDispatchSuperseded()) { + return false; + } + const result = await (telegramDeps.deliverReplies ?? deliverReplies)({ + ...deliveryBaseOptions, + replies: [payload], + onVoiceRecording: sendRecordVoice, + silent: silentErrorReplies && payload.isError === true, + mediaLoader: telegramDeps.loadWebMedia, + }); + if (result.delivered) { + deliveryState.markDelivered(); + } + return result.delivered; + }; + const emitPreviewFinalizedHook = (result: LaneDeliveryResult) => { + if (isDispatchSuperseded() || result.kind !== "preview-finalized") { + return; + } + (telegramDeps.emitInternalMessageSentHook ?? emitInternalMessageSentHook)({ + sessionKeyForInternalHooks: deliveryBaseOptions.sessionKeyForInternalHooks, + chatId: deliveryBaseOptions.chatId, + accountId: deliveryBaseOptions.accountId, + content: result.delivery.content, + success: true, + messageId: result.delivery.messageId, + isGroup: deliveryBaseOptions.mirrorIsGroup, + groupId: deliveryBaseOptions.mirrorGroupId, + }); + }; + const deliverLaneText = createLaneTextDeliverer({ + lanes, + archivedAnswerPreviews, + activePreviewLifecycleByLane, + retainPreviewOnCleanupByLane, + draftMaxChars, + applyTextToPayload, + sendPayload, + flushDraftLane, + stopDraftLane: async (lane) => { + await lane.stream?.stop(); + }, + editPreview: async ({ messageId, text, previewButtons }) => { + if (isDispatchSuperseded()) { + return; + } + await (telegramDeps.editMessageTelegram ?? editMessageTelegram)(chatId, messageId, text, { + api: bot.api, + cfg, + accountId: route.accountId, + linkPreview: telegramCfg.linkPreview, + buttons: previewButtons, }); }, - }, - }); + deletePreviewMessage: async (messageId) => { + if (isDispatchSuperseded()) { + return; + } + await bot.api.deleteMessage(chatId, messageId); + }, + log: logVerbose, + markDelivered: () => { + deliveryState.markDelivered(); + }, + }); - let dispatchError: unknown; - try { - ({ queuedFinal } = await telegramDeps.dispatchReplyWithBufferedBlockDispatcher({ - ctx: ctxPayload, + if (isDmTopic) { + try { + const storePath = telegramDeps.resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + const store = (telegramDeps.loadSessionStore ?? loadSessionStore)(storePath, { + skipCache: true, + }); + const sessionKey = ctxPayload.SessionKey; + if (sessionKey) { + const entry = resolveSessionStoreEntry({ store, sessionKey }).existing; + isFirstTurnInSession = !entry?.systemSent; + } else { + logVerbose("auto-topic-label: SessionKey is absent, skipping first-turn detection"); + } + } catch (err) { + logVerbose(`auto-topic-label: session store error: ${formatErrorMessage(err)}`); + } + } + + if (statusReactionController) { + void statusReactionController.setThinking(); + } + + const { onModelSelected, ...replyPipeline } = ( + telegramDeps.createChannelReplyPipeline ?? createChannelReplyPipeline + )({ cfg, - dispatcherOptions: { - ...replyPipeline, - deliver: async (payload, info) => { - const clearPendingCompactionReplayBoundaryOnVisibleBoundary = (didDeliver: boolean) => { - if (didDeliver && info.kind !== "final") { - pendingCompactionReplayBoundary = false; - } - }; - if (payload.isError === true) { - hadErrorReplyFailureOrSkip = true; - } - if (info.kind === "final") { - // Assistant callbacks are fire-and-forget; ensure queued boundary - // rotations/partials are applied before final delivery mapping. - await enqueueDraftLaneEvent(async () => {}); - } - if ( - shouldSuppressLocalTelegramExecApprovalPrompt({ - cfg, - accountId: route.accountId, - payload, - }) - ) { - queuedFinal = true; - return; - } - const previewButtons = ( - payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined - )?.buttons; - const split = splitTextIntoLaneSegments(payload.text); - const segments = split.segments; - const reply = resolveSendableOutboundReplyParts(payload); - const _hasMedia = reply.hasMedia; - - const flushBufferedFinalAnswer = async () => { - const buffered = reasoningStepState.takeBufferedFinalAnswer(); - if (!buffered) { - return; - } - const bufferedButtons = ( - buffered.payload.channelData?.telegram as - | { buttons?: TelegramInlineButtons } - | undefined - )?.buttons; - await deliverLaneText({ - laneName: "answer", - text: buffered.text, - payload: buffered.payload, - infoKind: "final", - previewButtons: bufferedButtons, - }); - reasoningStepState.resetForNextStep(); - }; - - for (const segment of segments) { - if ( - segment.lane === "answer" && - info.kind === "final" && - reasoningStepState.shouldBufferFinalAnswer() - ) { - reasoningStepState.bufferFinalAnswer({ - payload, - text: segment.text, - }); - continue; - } - if (segment.lane === "reasoning") { - reasoningStepState.noteReasoningHint(); - } - const result = await deliverLaneText({ - laneName: segment.lane, - text: segment.text, - payload, - infoKind: info.kind, - previewButtons, - allowPreviewUpdateForNonFinal: segment.lane === "reasoning", - }); - if (info.kind === "final") { - emitPreviewFinalizedHook(result); - } - if (segment.lane === "reasoning") { - if (result.kind !== "skipped") { - reasoningStepState.noteReasoningDelivered(); - await flushBufferedFinalAnswer(); - } - continue; - } - if (info.kind === "final") { - if (reasoningLane.hasStreamedMessage) { - activePreviewLifecycleByLane.reasoning = "complete"; - retainPreviewOnCleanupByLane.reasoning = true; - } - reasoningStepState.resetForNextStep(); - } - } - if (segments.length > 0) { - if (info.kind === "final") { - pendingCompactionReplayBoundary = false; - } - return; - } - if (split.suppressedReasoningOnly) { - if (reply.hasMedia) { - const payloadWithoutSuppressedReasoning = - typeof payload.text === "string" ? { ...payload, text: "" } : payload; - clearPendingCompactionReplayBoundaryOnVisibleBoundary( - await sendPayload(payloadWithoutSuppressedReasoning), - ); - } - if (info.kind === "final") { - await flushBufferedFinalAnswer(); - pendingCompactionReplayBoundary = false; - } - return; - } - - if (info.kind === "final") { - await answerLane.stream?.stop(); - await reasoningLane.stream?.stop(); - reasoningStepState.resetForNextStep(); - } - const canSendAsIs = reply.hasMedia || reply.text.length > 0; - if (!canSendAsIs) { - if (info.kind === "final") { - await flushBufferedFinalAnswer(); - pendingCompactionReplayBoundary = false; - } - return; - } - clearPendingCompactionReplayBoundaryOnVisibleBoundary(await sendPayload(payload)); - if (info.kind === "final") { - await flushBufferedFinalAnswer(); - pendingCompactionReplayBoundary = false; - } - }, - onSkip: (payload, info) => { - if (payload.isError === true) { - hadErrorReplyFailureOrSkip = true; - } - if (info.reason !== "silent") { - deliveryState.markNonSilentSkip(); - } - }, - onError: (err, info) => { - const errorPolicy = resolveTelegramErrorPolicy({ - accountConfig: telegramCfg, - groupConfig, - topicConfig, + agentId: route.agentId, + channel: "telegram", + accountId: route.accountId, + typing: { + start: sendTyping, + onStartError: (err) => { + logTypingFailure({ + log: logVerbose, + channel: "telegram", + target: String(chatId), + error: err, }); - if (isSilentErrorPolicy(errorPolicy.policy)) { - return; - } - if ( - errorPolicy.policy === "once" && - shouldSuppressTelegramError({ - scopeKey: buildTelegramErrorScopeKey({ - accountId: route.accountId, - chatId, - threadId: threadSpec.id, - }), - cooldownMs: errorPolicy.cooldownMs, - errorMessage: String(err), - }) - ) { - return; - } - deliveryState.markNonSilentFailure(); - runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); }, }, - replyOptions: { - skillFilter, - disableBlockStreaming, - onPartialReply: - answerLane.stream || reasoningLane.stream + }); + + try { + ({ queuedFinal } = await telegramDeps.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg, + dispatcherOptions: { + ...replyPipeline, + deliver: async (payload, info) => { + if (isDispatchSuperseded()) { + return; + } + const clearPendingCompactionReplayBoundaryOnVisibleBoundary = (didDeliver: boolean) => { + if (didDeliver && info.kind !== "final") { + pendingCompactionReplayBoundary = false; + } + }; + if (payload.isError === true) { + hadErrorReplyFailureOrSkip = true; + } + if (info.kind === "final") { + await enqueueDraftLaneEvent(async () => {}); + } + if ( + shouldSuppressLocalTelegramExecApprovalPrompt({ + cfg, + accountId: route.accountId, + payload, + }) + ) { + queuedFinal = true; + return; + } + const previewButtons = ( + payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined + )?.buttons; + const split = splitTextIntoLaneSegments(payload.text); + const segments = split.segments; + const reply = resolveSendableOutboundReplyParts(payload); + const _hasMedia = reply.hasMedia; + + const flushBufferedFinalAnswer = async () => { + const buffered = reasoningStepState.takeBufferedFinalAnswer(); + if (!buffered) { + return; + } + const bufferedButtons = ( + buffered.payload.channelData?.telegram as + | { buttons?: TelegramInlineButtons } + | undefined + )?.buttons; + await deliverLaneText({ + laneName: "answer", + text: buffered.text, + payload: buffered.payload, + infoKind: "final", + previewButtons: bufferedButtons, + }); + reasoningStepState.resetForNextStep(); + }; + + for (const segment of segments) { + if ( + segment.lane === "answer" && + info.kind === "final" && + reasoningStepState.shouldBufferFinalAnswer() + ) { + reasoningStepState.bufferFinalAnswer({ + payload, + text: segment.text, + }); + continue; + } + if (segment.lane === "reasoning") { + reasoningStepState.noteReasoningHint(); + } + const result = await deliverLaneText({ + laneName: segment.lane, + text: segment.text, + payload, + infoKind: info.kind, + previewButtons, + allowPreviewUpdateForNonFinal: segment.lane === "reasoning", + }); + if (info.kind === "final") { + emitPreviewFinalizedHook(result); + } + if (segment.lane === "reasoning") { + if (result.kind !== "skipped") { + reasoningStepState.noteReasoningDelivered(); + await flushBufferedFinalAnswer(); + } + continue; + } + if (info.kind === "final") { + if (reasoningLane.hasStreamedMessage) { + activePreviewLifecycleByLane.reasoning = "complete"; + retainPreviewOnCleanupByLane.reasoning = true; + } + reasoningStepState.resetForNextStep(); + } + } + if (segments.length > 0) { + if (info.kind === "final") { + pendingCompactionReplayBoundary = false; + } + return; + } + if (split.suppressedReasoningOnly) { + if (reply.hasMedia) { + const payloadWithoutSuppressedReasoning = + typeof payload.text === "string" ? { ...payload, text: "" } : payload; + clearPendingCompactionReplayBoundaryOnVisibleBoundary( + await sendPayload(payloadWithoutSuppressedReasoning), + ); + } + if (info.kind === "final") { + await flushBufferedFinalAnswer(); + pendingCompactionReplayBoundary = false; + } + return; + } + + if (info.kind === "final") { + await answerLane.stream?.stop(); + await reasoningLane.stream?.stop(); + reasoningStepState.resetForNextStep(); + } + const canSendAsIs = reply.hasMedia || reply.text.length > 0; + if (!canSendAsIs) { + if (info.kind === "final") { + await flushBufferedFinalAnswer(); + pendingCompactionReplayBoundary = false; + } + return; + } + clearPendingCompactionReplayBoundaryOnVisibleBoundary(await sendPayload(payload)); + if (info.kind === "final") { + await flushBufferedFinalAnswer(); + pendingCompactionReplayBoundary = false; + } + }, + onSkip: (payload, info) => { + if (payload.isError === true) { + hadErrorReplyFailureOrSkip = true; + } + if (info.reason !== "silent") { + deliveryState.markNonSilentSkip(); + } + }, + onError: (err, info) => { + const errorPolicy = resolveTelegramErrorPolicy({ + accountConfig: telegramCfg, + groupConfig, + topicConfig, + }); + if (isSilentErrorPolicy(errorPolicy.policy)) { + return; + } + if ( + errorPolicy.policy === "once" && + shouldSuppressTelegramError({ + scopeKey: buildTelegramErrorScopeKey({ + accountId: route.accountId, + chatId, + threadId: threadSpec.id, + }), + cooldownMs: errorPolicy.cooldownMs, + errorMessage: String(err), + }) + ) { + return; + } + deliveryState.markNonSilentFailure(); + runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); + }, + }, + replyOptions: { + skillFilter, + disableBlockStreaming, + onPartialReply: + answerLane.stream || reasoningLane.stream + ? (payload) => + enqueueDraftLaneEvent(async () => { + await ingestDraftLaneSegments(payload.text); + }) + : undefined, + onReasoningStream: reasoningLane.stream ? (payload) => enqueueDraftLaneEvent(async () => { + if (splitReasoningOnNextStream) { + reasoningLane.stream?.forceNewMessage(); + resetDraftLaneState(reasoningLane); + splitReasoningOnNextStream = false; + } await ingestDraftLaneSegments(payload.text); }) : undefined, - onReasoningStream: reasoningLane.stream - ? (payload) => - enqueueDraftLaneEvent(async () => { - // Split between reasoning blocks only when the next reasoning - // stream starts. Splitting at reasoning-end can orphan the active - // preview and cause duplicate reasoning sends on reasoning final. - if (splitReasoningOnNextStream) { - reasoningLane.stream?.forceNewMessage(); - resetDraftLaneState(reasoningLane); - splitReasoningOnNextStream = false; - } - await ingestDraftLaneSegments(payload.text); - }) - : undefined, - onAssistantMessageStart: answerLane.stream - ? () => - enqueueDraftLaneEvent(async () => { - reasoningStepState.resetForNextStep(); - if (skipNextAnswerMessageStartRotation) { - skipNextAnswerMessageStartRotation = false; + onAssistantMessageStart: answerLane.stream + ? () => + enqueueDraftLaneEvent(async () => { + reasoningStepState.resetForNextStep(); + if (skipNextAnswerMessageStartRotation) { + skipNextAnswerMessageStartRotation = false; + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; + return; + } + if (pendingCompactionReplayBoundary) { + pendingCompactionReplayBoundary = false; + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; + return; + } + await rotateAnswerLaneForNewAssistantMessage(); activePreviewLifecycleByLane.answer = "transient"; retainPreviewOnCleanupByLane.answer = false; - return; - } - if (pendingCompactionReplayBoundary) { - pendingCompactionReplayBoundary = false; - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; - return; - } - await rotateAnswerLaneForNewAssistantMessage(); - // Message-start is an explicit assistant-message boundary. - // Even when no forceNewMessage happened (e.g. prior answer had no - // streamed partials), the next partial belongs to a fresh lifecycle - // and must not trigger late pre-rotation mid-message. - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; - }) - : undefined, - onReasoningEnd: reasoningLane.stream - ? () => - enqueueDraftLaneEvent(async () => { - // Split when/if a later reasoning block begins. - splitReasoningOnNextStream = reasoningLane.hasStreamedMessage; - }) - : undefined, - onToolStart: statusReactionController - ? async (payload) => { - const toolName = payload.name?.trim(); - if (toolName) { - await statusReactionController.setTool(toolName); - } - } - : undefined, - onCompactionStart: - statusReactionController || answerLane.stream - ? async () => { - if ( - answerLane.hasStreamedMessage && - activePreviewLifecycleByLane.answer === "transient" - ) { - pendingCompactionReplayBoundary = true; - } - if (statusReactionController) { - await statusReactionController.setCompacting(); + }) + : undefined, + onReasoningEnd: reasoningLane.stream + ? () => + enqueueDraftLaneEvent(async () => { + splitReasoningOnNextStream = reasoningLane.hasStreamedMessage; + }) + : undefined, + onToolStart: statusReactionController + ? async (payload) => { + const toolName = payload.name?.trim(); + if (toolName) { + await statusReactionController.setTool(toolName); } } : undefined, - onCompactionEnd: statusReactionController - ? async () => { - statusReactionController.cancelPending(); - await statusReactionController.setThinking(); - } - : undefined, - onModelSelected, - }, - })); - } catch (err) { - dispatchError = err; - runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`)); + onCompactionStart: + statusReactionController || answerLane.stream + ? async () => { + if ( + answerLane.hasStreamedMessage && + activePreviewLifecycleByLane.answer === "transient" + ) { + pendingCompactionReplayBoundary = true; + } + if (statusReactionController) { + await statusReactionController.setCompacting(); + } + } + : undefined, + onCompactionEnd: statusReactionController + ? async () => { + statusReactionController.cancelPending(); + await statusReactionController.setThinking(); + } + : undefined, + onModelSelected, + }, + })); + } catch (err) { + dispatchError = err; + runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`)); + } finally { + await draftLaneEventQueue; + if (isDispatchSuperseded()) { + if (answerLane.hasStreamedMessage || typeof answerLane.stream?.messageId() === "number") { + retainPreviewOnCleanupByLane.answer = true; + } + for (const archivedPreview of archivedAnswerPreviews) { + archivedPreview.deleteIfUnused = false; + } + } + const streamCleanupStates = new Map< + NonNullable, + { shouldClear: boolean } + >(); + const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [ + { laneName: "answer", lane: answerLane }, + { laneName: "reasoning", lane: reasoningLane }, + ]; + for (const laneState of lanesToCleanup) { + const stream = laneState.lane.stream; + if (!stream) { + continue; + } + const activePreviewMessageId = stream.messageId(); + const hasBoundaryFinalizedActivePreview = + laneState.laneName === "answer" && + typeof activePreviewMessageId === "number" && + archivedAnswerPreviews.some( + (p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId, + ); + const shouldClear = + !retainPreviewOnCleanupByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview; + const existing = streamCleanupStates.get(stream); + if (!existing) { + streamCleanupStates.set(stream, { shouldClear }); + continue; + } + existing.shouldClear = existing.shouldClear && shouldClear; + } + for (const [stream, cleanupState] of streamCleanupStates) { + if (isDispatchSuperseded()) { + await (typeof stream.discard === "function" ? stream.discard() : stream.stop()); + continue; + } + await stream.stop(); + if (cleanupState.shouldClear) { + await stream.clear(); + } + } + if (!isDispatchSuperseded()) { + for (const archivedPreview of archivedAnswerPreviews) { + if (archivedPreview.deleteIfUnused === false) { + continue; + } + try { + await bot.api.deleteMessage(chatId, archivedPreview.messageId); + } catch (err) { + logVerbose( + `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, + ); + } + } + for (const messageId of archivedReasoningPreviewIds) { + try { + await bot.api.deleteMessage(chatId, messageId); + } catch (err) { + logVerbose( + `telegram: archived reasoning preview cleanup failed (${messageId}): ${String(err)}`, + ); + } + } + } + } } finally { - // Upstream assistant callbacks are fire-and-forget; drain queued lane work - // before stream cleanup so boundary rotations/materialization complete first. - await draftLaneEventQueue; - // Must stop() first to flush debounced content before clear() wipes state. - const streamCleanupStates = new Map< - NonNullable, - { shouldClear: boolean } - >(); - const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [ - { laneName: "answer", lane: answerLane }, - { laneName: "reasoning", lane: reasoningLane }, - ]; - for (const laneState of lanesToCleanup) { - const stream = laneState.lane.stream; - if (!stream) { - continue; - } - // Don't clear (delete) the stream if: (a) it was finalized, or - // (b) the active stream message is itself a boundary-finalized archive. - const activePreviewMessageId = stream.messageId(); - const hasBoundaryFinalizedActivePreview = - laneState.laneName === "answer" && - typeof activePreviewMessageId === "number" && - archivedAnswerPreviews.some( - (p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId, - ); - const shouldClear = - !retainPreviewOnCleanupByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview; - const existing = streamCleanupStates.get(stream); - if (!existing) { - streamCleanupStates.set(stream, { shouldClear }); - continue; - } - existing.shouldClear = existing.shouldClear && shouldClear; - } - for (const [stream, cleanupState] of streamCleanupStates) { - await stream.stop(); - if (cleanupState.shouldClear) { - await stream.clear(); - } - } - for (const archivedPreview of archivedAnswerPreviews) { - if (archivedPreview.deleteIfUnused === false) { - continue; - } - try { - await bot.api.deleteMessage(chatId, archivedPreview.messageId); - } catch (err) { - logVerbose( - `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, - ); - } - } - for (const messageId of archivedReasoningPreviewIds) { - try { - await bot.api.deleteMessage(chatId, messageId); - } catch (err) { - logVerbose( - `telegram: archived reasoning preview cleanup failed (${messageId}): ${String(err)}`, - ); - } + dispatchWasSuperseded = isDispatchSuperseded(); + releaseAbortFence(); + } + if (dispatchWasSuperseded) { + if (statusReactionController) { + void Promise.resolve(statusReactionController.setDone()).catch((err: unknown) => { + logVerbose(`telegram: status reaction finalize failed: ${String(err)}`); + }); + } else { + removeAckReactionAfterReply({ + removeAfterReply: removeAckAfterReply, + ackReactionPromise, + ackReactionValue: ackReactionPromise ? "ack" : null, + remove: () => + (reactionApi?.(chatId, msg.message_id ?? 0, []) ?? Promise.resolve()).then(() => {}), + onError: (err) => { + if (!msg.message_id) { + return; + } + logAckFailure({ + log: logVerbose, + channel: "telegram", + target: `${chatId}/${msg.message_id}`, + error: err, + }); + }, + }); } + clearGroupHistory(); + return; } let sentFallback = false; const deliverySummary = deliveryState.snapshot(); diff --git a/extensions/telegram/src/draft-stream.test-helpers.ts b/extensions/telegram/src/draft-stream.test-helpers.ts index b68a3c226b1..428b296efc9 100644 --- a/extensions/telegram/src/draft-stream.test-helpers.ts +++ b/extensions/telegram/src/draft-stream.test-helpers.ts @@ -11,6 +11,7 @@ export type TestDraftStream = { lastDeliveredText: ReturnType string>>; clear: ReturnType Promise>>; stop: ReturnType Promise>>; + discard: ReturnType Promise>>; materialize: ReturnType Promise>>; forceNewMessage: ReturnType void>>; sendMayHaveLanded: ReturnType boolean>>; @@ -22,6 +23,7 @@ export function createTestDraftStream(params?: { previewMode?: DraftPreviewMode; onUpdate?: (text: string) => void; onStop?: () => void | Promise; + onDiscard?: () => void | Promise; clearMessageIdOnForceNew?: boolean; }): TestDraftStream { let messageId = params?.messageId; @@ -42,6 +44,9 @@ export function createTestDraftStream(params?: { stop: vi.fn().mockImplementation(async () => { await params?.onStop?.(); }), + discard: vi.fn().mockImplementation(async () => { + await params?.onDiscard?.(); + }), materialize: vi.fn().mockImplementation(async () => messageId), forceNewMessage: vi.fn().mockImplementation(() => { if (params?.clearMessageIdOnForceNew) { @@ -75,6 +80,7 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn().mockResolvedValue(undefined), + discard: vi.fn().mockResolvedValue(undefined), materialize: vi.fn().mockImplementation(async () => activeMessageId), forceNewMessage: vi.fn().mockImplementation(() => { activeMessageId = undefined; diff --git a/extensions/telegram/src/draft-stream.test.ts b/extensions/telegram/src/draft-stream.test.ts index f4152a2184c..64d7245fe4e 100644 --- a/extensions/telegram/src/draft-stream.test.ts +++ b/extensions/telegram/src/draft-stream.test.ts @@ -624,17 +624,28 @@ describe("draft stream initial message debounce", () => { const api = createMockApi(); const stream = createDebouncedStream(api); - stream.update("Processing"); // 10 chars, below 30 + stream.update("Processing"); await stream.flush(); expect(api.sendMessage).not.toHaveBeenCalled(); }); + it("does not send a first message when discard() supersedes a short partial", async () => { + const api = createMockApi(); + const stream = createDebouncedStream(api); + + stream.update("Processing"); + await stream.discard?.(); + await stream.flush(); + + expect(api.sendMessage).not.toHaveBeenCalled(); + expect(api.editMessageText).not.toHaveBeenCalled(); + }); + it("sends first message when reaching threshold", async () => { const api = createMockApi(); const stream = createDebouncedStream(api); - // Exactly 30 chars stream.update("I am processing your request.."); await stream.flush(); @@ -645,7 +656,7 @@ describe("draft stream initial message debounce", () => { const api = createMockApi(); const stream = createDebouncedStream(api); - stream.update("I am processing your request, please wait a moment"); // 50 chars + stream.update("I am processing your request, please wait a moment"); await stream.flush(); expect(api.sendMessage).toHaveBeenCalled(); @@ -657,17 +668,15 @@ describe("draft stream initial message debounce", () => { const api = createMockApi(); const stream = createDebouncedStream(api); - // First message at threshold (30 chars) stream.update("I am processing your request.."); await stream.flush(); expect(api.sendMessage).toHaveBeenCalledTimes(1); - // Subsequent updates should edit, not wait for threshold stream.update("I am processing your request.. and summarizing"); await stream.flush(); expect(api.editMessageText).toHaveBeenCalled(); - expect(api.sendMessage).toHaveBeenCalledTimes(1); // still only 1 send + expect(api.sendMessage).toHaveBeenCalledTimes(1); }); }); @@ -677,7 +686,6 @@ describe("draft stream initial message debounce", () => { const stream = createTelegramDraftStream({ api: api as unknown as Bot["api"], chatId: 123, - // no minInitialChars (backward-compatible behavior) }); stream.update("Hi"); diff --git a/extensions/telegram/src/draft-stream.ts b/extensions/telegram/src/draft-stream.ts index ddffa165495..a2f88aae216 100644 --- a/extensions/telegram/src/draft-stream.ts +++ b/extensions/telegram/src/draft-stream.ts @@ -1,5 +1,8 @@ import type { Bot } from "grammy"; -import { createFinalizableDraftLifecycle } from "openclaw/plugin-sdk/channel-lifecycle"; +import { + clearFinalizableDraftMessage, + createFinalizableDraftStreamControlsForState, +} from "openclaw/plugin-sdk/channel-lifecycle"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js"; import { isSafeToRetrySendError, isTelegramClientRejection } from "./network-errors.js"; @@ -96,6 +99,8 @@ export type TelegramDraftStream = { lastDeliveredText?: () => string; clear: () => Promise; stop: () => Promise; + /** Stop without a final flush or delete. */ + discard?: () => Promise; /** Convert the current draft preview into a permanent message (sendMessage). */ materialize?: () => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ @@ -240,9 +245,6 @@ export function createTelegramDraftStream(params: { "telegram stream preview send failed with message_thread_id, retrying without thread", })); } catch (err) { - // Pre-connect failures (DNS, refused) and explicit Telegram rejections (4xx) - // guarantee the message was never delivered — clear the flag so - // sendMayHaveLanded() doesn't suppress fallback. if (isSafeToRetrySendError(err) || isTelegramClientRejection(err)) { messageSendAttempted = false; } @@ -288,7 +290,6 @@ export function createTelegramDraftStream(params: { }; const sendOrEditStreamMessage = async (text: string): Promise => { - // Allow final flush even if stopped (e.g., after clear()). if (streamState.stopped && !streamState.final) { return false; } @@ -303,8 +304,6 @@ export function createTelegramDraftStream(params: { return false; } if (renderedText.length > maxChars) { - // Telegram text messages/edits cap at 4096 chars. - // Stop streaming once we exceed the cap to avoid repeated API failures. streamState.stopped = true; params.warn?.( `telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`, @@ -316,7 +315,6 @@ export function createTelegramDraftStream(params: { } const sendGeneration = generation; - // Debounce first preview send for better push notification quality. if (typeof streamMessageId !== "number" && minInitialChars != null && !streamState.final) { if (renderedText.length < minInitialChars) { return false; @@ -368,29 +366,37 @@ export function createTelegramDraftStream(params: { } }; - const { loop, update, stop, clear } = createFinalizableDraftLifecycle({ + const { loop, update, stop, stopForClear } = createFinalizableDraftStreamControlsForState({ throttleMs, state: streamState, sendOrEditStreamMessage, - readMessageId: () => streamMessageId, - clearMessageId: () => { - streamMessageId = undefined; - }, - isValidMessageId: (value): value is number => - typeof value === "number" && Number.isFinite(value), - deleteMessage: async (messageId) => { - await params.api.deleteMessage(chatId, messageId); - }, - onDeleteSuccess: (messageId) => { - params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`); - }, - warn: params.warn, - warnPrefix: "telegram stream preview cleanup failed", }); + const clear = async () => { + await clearFinalizableDraftMessage({ + stopForClear, + readMessageId: () => streamMessageId, + clearMessageId: () => { + streamMessageId = undefined; + }, + isValidMessageId: (value): value is number => + typeof value === "number" && Number.isFinite(value), + deleteMessage: async (messageId) => { + await params.api.deleteMessage(chatId, messageId); + }, + onDeleteSuccess: (messageId) => { + params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`); + }, + warn: params.warn, + warnPrefix: "telegram stream preview cleanup failed", + }); + }; + + const discard = async () => { + await stopForClear(); + }; + const forceNewMessage = () => { - // Boundary rotation may call stop() to finalize the previous draft. - // Re-open the stream lifecycle for the next assistant segment. streamState.final = false; generation += 1; messageSendAttempted = false; @@ -404,20 +410,11 @@ export function createTelegramDraftStream(params: { loop.resetThrottleWindow(); }; - /** - * Materialize the current draft into a permanent message. - * For draft transport: sends the accumulated text as a real sendMessage. - * For message transport: the message is already permanent (noop). - * Returns the permanent message id, or undefined if nothing to materialize. - */ const materialize = async (): Promise => { await stop(); - // If using message transport, the streamMessageId is already a real message. if (previewTransport === "message" && typeof streamMessageId === "number") { return streamMessageId; } - // For draft transport, use the rendered snapshot first so parse_mode stays - // aligned with the text being materialized. const renderedText = lastSentText || lastDeliveredText; if (!renderedText) { return undefined; @@ -433,8 +430,6 @@ export function createTelegramDraftStream(params: { const sentId = sent?.message_id; if (typeof sentId === "number" && Number.isFinite(sentId)) { streamMessageId = Math.trunc(sentId); - // Clear the draft so Telegram's input area doesn't briefly show a - // stale copy alongside the newly materialized real message. if (resolvedDraftApi != null && streamDraftId != null) { const clearDraftId = streamDraftId; const clearThreadParams = @@ -443,9 +438,7 @@ export function createTelegramDraftStream(params: { : undefined; try { await resolvedDraftApi(chatId, clearDraftId, "", clearThreadParams); - } catch { - // Best-effort cleanup; draft clear failure is cosmetic. - } + } catch {} } return streamMessageId; } @@ -466,6 +459,7 @@ export function createTelegramDraftStream(params: { lastDeliveredText: () => lastDeliveredText, clear, stop, + discard, materialize, forceNewMessage, sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number",