From 3fe4c19305883abf246312c0587ece47422d185a Mon Sep 17 00:00:00 2001 From: OpenCils <114985039+OpenCils@users.noreply.github.com> Date: Tue, 3 Mar 2026 20:04:46 +0800 Subject: [PATCH] fix(telegram): prevent duplicate messages in DM draft streaming mode (#32118) * fix(telegram): prevent duplicate messages in DM draft streaming mode When using sendMessageDraft for DM streaming (streaming: 'partial'), the draft bubble auto-converts to the final message. The code was incorrectly falling through to sendPayload() after the draft was finalized, causing a duplicate message. This fix checks if we're in draft preview mode with hasStreamedMessage and skips the sendPayload call, returning "preview-finalized" directly. Key changes: - Use hasStreamedMessage flag instead of previewRevision comparison - Avoids double stopDraftLane calls by returning early - Prevents duplicate messages when final text equals last streamed text Root cause: In lane-delivery.ts, the final message handling logic did not properly handle the DM draft flow where sendMessageDraft creates a transient bubble that doesn't need a separate final send. * fix(telegram): harden DM draft finalization path * fix(telegram): require emitted draft preview for unchanged finals * fix(telegram): require final draft text emission before finalize * fix: update changelog for telegram draft finalization (#32118) (thanks @OpenCils) --------- Co-authored-by: Ayaan Zaidi --- CHANGELOG.md | 1 + src/telegram/bot-message-dispatch.ts | 2 + src/telegram/draft-stream.test-helpers.ts | 9 +- src/telegram/draft-stream.ts | 4 + src/telegram/lane-delivery.test.ts | 183 +++++++++++++++++++++- src/telegram/lane-delivery.ts | 44 +++++- 6 files changed, 233 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7619d2ef91..5b975c2882f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Telegram/DM draft finalization reliability: require verified final-text draft emission before treating preview finalization as delivered, and fall back to normal payload send when final draft delivery is not confirmed (preventing missing final responses and preserving media/button delivery). (#32118) Thanks @OpenCils. - Exec heartbeat routing: scope exec-triggered heartbeat wakes to agent session keys so unrelated agents are no longer awakened by exec events, while preserving legacy unscoped behavior for non-canonical session keys. (#32724) thanks @altaywtf - macOS/Tailscale remote gateway discovery: add a Tailscale Serve fallback peer probe path (`wss://.ts.net`) when Bonjour and wide-area DNS-SD discovery return no gateways, and refresh both discovery paths from macOS onboarding. (#32860) Thanks @ngutman. - Telegram/multi-account default routing clarity: warn only for ambiguous (2+) account setups without an explicit default, add `openclaw doctor` warnings for missing/invalid multi-account defaults across channels, and document explicit-default guidance for channel routing and Telegram config. (#32544) thanks @Sid-Qin. diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 7dfcc80e6a8..addad5a48c8 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -225,6 +225,7 @@ export const dispatchTelegramMessage = async ({ stream, lastPartialText: "", hasStreamedMessage: false, + previewRevisionBaseline: stream?.previewRevision?.() ?? 0, }; }; const lanes: Record = { @@ -259,6 +260,7 @@ export const dispatchTelegramMessage = async ({ const resetDraftLaneState = (lane: DraftLaneState) => { lane.lastPartialText = ""; lane.hasStreamedMessage = false; + lane.previewRevisionBaseline = lane.stream?.previewRevision?.() ?? lane.previewRevisionBaseline; }; const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => { const laneStream = lane.stream; diff --git a/src/telegram/draft-stream.test-helpers.ts b/src/telegram/draft-stream.test-helpers.ts index abb958e36f7..120204ecb01 100644 --- a/src/telegram/draft-stream.test-helpers.ts +++ b/src/telegram/draft-stream.test-helpers.ts @@ -8,6 +8,7 @@ export type TestDraftStream = { messageId: ReturnType number | undefined>>; previewMode: ReturnType DraftPreviewMode>>; previewRevision: ReturnType number>>; + lastDeliveredText: ReturnType string>>; clear: ReturnType Promise>>; stop: ReturnType Promise>>; forceNewMessage: ReturnType void>>; @@ -23,15 +24,18 @@ export function createTestDraftStream(params?: { }): TestDraftStream { let messageId = params?.messageId; let previewRevision = 0; + let lastDeliveredText = ""; return { update: vi.fn().mockImplementation((text: string) => { previewRevision += 1; + lastDeliveredText = text.trimEnd(); params?.onUpdate?.(text); }), flush: vi.fn().mockResolvedValue(undefined), messageId: vi.fn().mockImplementation(() => messageId), previewMode: vi.fn().mockReturnValue(params?.previewMode ?? "message"), previewRevision: vi.fn().mockImplementation(() => previewRevision), + lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn().mockImplementation(async () => { await params?.onStop?.(); @@ -51,17 +55,20 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft let activeMessageId: number | undefined; let nextMessageId = startMessageId; let previewRevision = 0; + let lastDeliveredText = ""; return { - update: vi.fn().mockImplementation(() => { + update: vi.fn().mockImplementation((text: string) => { if (activeMessageId == null) { activeMessageId = nextMessageId++; } previewRevision += 1; + lastDeliveredText = text.trimEnd(); }), flush: vi.fn().mockResolvedValue(undefined), messageId: vi.fn().mockImplementation(() => activeMessageId), previewMode: vi.fn().mockReturnValue("message"), previewRevision: vi.fn().mockImplementation(() => previewRevision), + lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn().mockResolvedValue(undefined), forceNewMessage: vi.fn().mockImplementation(() => { diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index e0f44f98451..1a578ad46ec 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -59,6 +59,7 @@ export type TelegramDraftStream = { messageId: () => number | undefined; previewMode?: () => "message" | "draft"; previewRevision?: () => number; + lastDeliveredText?: () => string; clear: () => Promise; stop: () => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ @@ -127,6 +128,7 @@ export function createTelegramDraftStream(params: { let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined; let previewTransport: "message" | "draft" = usesDraftTransport ? "draft" : "message"; let lastSentText = ""; + let lastDeliveredText = ""; let lastSentParseMode: "HTML" | undefined; let previewRevision = 0; let generation = 0; @@ -289,6 +291,7 @@ export function createTelegramDraftStream(params: { } if (sent) { previewRevision += 1; + lastDeliveredText = trimmed; } return sent; } catch (err) { @@ -340,6 +343,7 @@ export function createTelegramDraftStream(params: { messageId: () => streamMessageId, previewMode: () => previewTransport, previewRevision: () => previewRevision, + lastDeliveredText: () => lastDeliveredText, clear, stop, forceNewMessage, diff --git a/src/telegram/lane-delivery.test.ts b/src/telegram/lane-delivery.test.ts index 155fa7b63eb..5ead4a065af 100644 --- a/src/telegram/lane-delivery.test.ts +++ b/src/telegram/lane-delivery.test.ts @@ -7,19 +7,26 @@ function createHarness(params?: { answerMessageId?: number; draftMaxChars?: number; answerMessageIdAfterStop?: number; + answerStream?: DraftLaneState["stream"]; + answerHasStreamedMessage?: boolean; + answerLastPartialText?: string; + answerPreviewRevisionBaseline?: number; }) { - const answer = createTestDraftStream({ messageId: params?.answerMessageId }); + const answer = + params?.answerStream ?? createTestDraftStream({ messageId: params?.answerMessageId }); const reasoning = createTestDraftStream(); const lanes: Record = { answer: { - stream: answer as DraftLaneState["stream"], - lastPartialText: "", - hasStreamedMessage: false, + stream: answer, + lastPartialText: params?.answerLastPartialText ?? "", + hasStreamedMessage: params?.answerHasStreamedMessage ?? false, + previewRevisionBaseline: params?.answerPreviewRevisionBaseline ?? 0, }, reasoning: { stream: reasoning as DraftLaneState["stream"], lastPartialText: "", hasStreamedMessage: false, + previewRevisionBaseline: 0, }, }; const sendPayload = vi.fn().mockResolvedValue(true); @@ -28,7 +35,9 @@ function createHarness(params?: { }); const stopDraftLane = vi.fn().mockImplementation(async (lane: DraftLaneState) => { if (lane === lanes.answer && params?.answerMessageIdAfterStop !== undefined) { - answer.setMessageId(params.answerMessageIdAfterStop); + (answer as { setMessageId?: (value: number | undefined) => void }).setMessageId?.( + params.answerMessageIdAfterStop, + ); } await lane.stream?.stop(); }); @@ -59,7 +68,7 @@ function createHarness(params?: { lanes, answer: { stream: answer, - setMessageId: answer.setMessageId, + setMessageId: (answer as { setMessageId?: (value: number | undefined) => void }).setMessageId, }, sendPayload, flushDraftLane, @@ -106,7 +115,7 @@ describe("createLaneTextDeliverer", () => { }); expect(result).toBe("preview-finalized"); - expect(harness.answer.stream.update).toHaveBeenCalledWith("no problem"); + expect(harness.answer.stream?.update).toHaveBeenCalledWith("no problem"); expect(harness.editPreview).toHaveBeenCalledWith( expect.objectContaining({ laneName: "answer", @@ -202,4 +211,164 @@ describe("createLaneTextDeliverer", () => { expect(harness.sendPayload).toHaveBeenCalledWith(expect.objectContaining({ text: longText })); expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("preview final too long")); }); + + it("treats unchanged DM draft final text as already finalized", async () => { + const answerStream = createTestDraftStream({ previewMode: "draft" }); + answerStream.previewRevision.mockReturnValue(7); + answerStream.lastDeliveredText.mockReturnValue("Hello final"); + answerStream.update.mockImplementation(() => {}); + const harness = createHarness({ + answerStream: answerStream as DraftLaneState["stream"], + answerHasStreamedMessage: true, + answerLastPartialText: "Hello final", + }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Hello final", + payload: { text: "Hello final" }, + infoKind: "final", + }); + + expect(result).toBe("preview-finalized"); + expect(harness.flushDraftLane).toHaveBeenCalledTimes(1); + expect(harness.stopDraftLane).toHaveBeenCalledTimes(1); + expect(harness.sendPayload).not.toHaveBeenCalled(); + expect(harness.markDelivered).toHaveBeenCalledTimes(1); + }); + + it("falls back once when DM draft finalization emits no update", async () => { + const answerStream = createTestDraftStream({ previewMode: "draft" }); + answerStream.previewRevision.mockReturnValue(3); + answerStream.update.mockImplementation(() => {}); + const harness = createHarness({ + answerStream: answerStream as DraftLaneState["stream"], + answerHasStreamedMessage: true, + answerLastPartialText: "Partial", + }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Final answer", + payload: { text: "Final answer" }, + infoKind: "final", + }); + + expect(result).toBe("sent"); + expect(harness.flushDraftLane).toHaveBeenCalledTimes(1); + expect(harness.stopDraftLane).toHaveBeenCalledTimes(1); + expect(harness.sendPayload).toHaveBeenCalledWith( + expect.objectContaining({ text: "Final answer" }), + ); + expect(harness.markDelivered).not.toHaveBeenCalled(); + expect(harness.log).toHaveBeenCalledWith( + expect.stringContaining("draft final text not emitted"), + ); + }); + + it("falls back when unchanged final text has no emitted draft preview in current lane", async () => { + const answerStream = createTestDraftStream({ previewMode: "draft" }); + answerStream.previewRevision.mockReturnValue(7); + answerStream.update.mockImplementation(() => {}); + const harness = createHarness({ + answerStream: answerStream as DraftLaneState["stream"], + answerHasStreamedMessage: true, + answerLastPartialText: "Hello final", + answerPreviewRevisionBaseline: 7, + }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Hello final", + payload: { text: "Hello final" }, + infoKind: "final", + }); + + expect(result).toBe("sent"); + expect(harness.stopDraftLane).toHaveBeenCalledTimes(1); + expect(harness.sendPayload).toHaveBeenCalledWith( + expect.objectContaining({ text: "Hello final" }), + ); + expect(harness.markDelivered).not.toHaveBeenCalled(); + expect(harness.log).toHaveBeenCalledWith( + expect.stringContaining("draft final text not emitted"), + ); + }); + + it("falls back when revision advances but final text was not emitted", async () => { + let previewRevision = 7; + const answerStream = createTestDraftStream({ previewMode: "draft" }); + answerStream.previewRevision.mockImplementation(() => previewRevision); + answerStream.lastDeliveredText.mockReturnValue("Older partial"); + answerStream.update.mockImplementation(() => {}); + answerStream.flush.mockImplementation(async () => { + previewRevision += 1; + }); + const harness = createHarness({ + answerStream: answerStream as DraftLaneState["stream"], + answerHasStreamedMessage: true, + answerLastPartialText: "Final answer", + }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Final answer", + payload: { text: "Final answer" }, + infoKind: "final", + }); + + expect(result).toBe("sent"); + expect(harness.sendPayload).toHaveBeenCalledWith( + expect.objectContaining({ text: "Final answer" }), + ); + expect(harness.markDelivered).not.toHaveBeenCalled(); + expect(harness.log).toHaveBeenCalledWith( + expect.stringContaining("draft final text not emitted"), + ); + }); + + it("does not use DM draft final shortcut for media payloads", async () => { + const answerStream = createTestDraftStream({ previewMode: "draft" }); + const harness = createHarness({ + answerStream: answerStream as DraftLaneState["stream"], + answerHasStreamedMessage: true, + answerLastPartialText: "Image incoming", + }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Image incoming", + payload: { text: "Image incoming", mediaUrl: "file:///tmp/example.png" }, + infoKind: "final", + }); + + expect(result).toBe("sent"); + expect(harness.sendPayload).toHaveBeenCalledWith( + expect.objectContaining({ text: "Image incoming", mediaUrl: "file:///tmp/example.png" }), + ); + expect(harness.markDelivered).not.toHaveBeenCalled(); + }); + + it("does not use DM draft final shortcut when inline buttons are present", async () => { + const answerStream = createTestDraftStream({ previewMode: "draft" }); + const harness = createHarness({ + answerStream: answerStream as DraftLaneState["stream"], + answerHasStreamedMessage: true, + answerLastPartialText: "Choose one", + }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Choose one", + payload: { text: "Choose one" }, + previewButtons: [[{ text: "OK", callback_data: "ok" }]], + infoKind: "final", + }); + + expect(result).toBe("sent"); + expect(harness.sendPayload).toHaveBeenCalledWith( + expect.objectContaining({ text: "Choose one" }), + ); + expect(harness.markDelivered).not.toHaveBeenCalled(); + }); }); diff --git a/src/telegram/lane-delivery.ts b/src/telegram/lane-delivery.ts index 7ae70fbe9f3..f80eff7b32c 100644 --- a/src/telegram/lane-delivery.ts +++ b/src/telegram/lane-delivery.ts @@ -8,6 +8,7 @@ export type DraftLaneState = { stream: TelegramDraftStream | undefined; lastPartialText: string; hasStreamedMessage: boolean; + previewRevisionBaseline: number; }; export type ArchivedPreview = { @@ -328,6 +329,43 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { !hasMedia && text.length > 0 && text.length <= params.draftMaxChars && !payload.isError; if (infoKind === "final") { + const hasPreviewButtons = Boolean(previewButtons?.some((row) => row.length > 0)); + const canFinalizeDraftPreviewDirectly = + isDraftPreviewLane(lane) && + lane.hasStreamedMessage && + canEditViaPreview && + !hasPreviewButtons; + let draftPreviewStopped = false; + if (canFinalizeDraftPreviewDirectly) { + const previewRevisionBeforeFlush = lane.stream?.previewRevision?.() ?? 0; + const finalTextSnapshot = text.trimEnd(); + const hasEmittedPreviewInCurrentLane = + previewRevisionBeforeFlush > lane.previewRevisionBaseline; + const deliveredPreviewTextBeforeFinal = lane.stream?.lastDeliveredText?.() ?? ""; + const finalTextAlreadyDelivered = + deliveredPreviewTextBeforeFinal === finalTextSnapshot && hasEmittedPreviewInCurrentLane; + const unchangedFinalText = text === lane.lastPartialText; + lane.stream?.update(text); + await params.flushDraftLane(lane); + await params.stopDraftLane(lane); + draftPreviewStopped = true; + const previewUpdated = (lane.stream?.previewRevision?.() ?? 0) > previewRevisionBeforeFlush; + const deliveredPreviewTextAfterFinal = + lane.stream?.lastDeliveredText?.() ?? deliveredPreviewTextBeforeFinal; + if ( + (previewUpdated && deliveredPreviewTextAfterFinal === finalTextSnapshot) || + (unchangedFinalText && finalTextAlreadyDelivered) + ) { + lane.lastPartialText = text; + params.finalizedPreviewByLane[laneName] = true; + params.markDelivered(); + return "preview-finalized"; + } + params.log( + `telegram: ${laneName} draft final text not emitted; falling back to standard send`, + ); + } + if (laneName === "answer") { const archivedResult = await consumeArchivedAnswerPreviewForFinal({ lane, @@ -340,7 +378,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { return archivedResult; } } - if (canEditViaPreview && !params.finalizedPreviewByLane[laneName]) { + if (canEditViaPreview && !params.finalizedPreviewByLane[laneName] && !draftPreviewStopped) { await params.flushDraftLane(lane); if (laneName === "answer") { const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({ @@ -372,7 +410,9 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { `telegram: preview final too long for edit (${text.length} > ${params.draftMaxChars}); falling back to standard send`, ); } - await params.stopDraftLane(lane); + if (!draftPreviewStopped) { + await params.stopDraftLane(lane); + } const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); return delivered ? "sent" : "skipped"; }