From fb9a21ae8fca75ad78b82ce184670b7a0abefe44 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 22 Apr 2026 02:28:58 +0100 Subject: [PATCH] fix: centralize draft preview finalization --- CHANGELOG.md | 1 + .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/channels/discord.md | 1 + docs/channels/mattermost.md | 2 +- docs/channels/slack.md | 1 + docs/concepts/streaming.md | 8 + docs/plugins/sdk-migration.md | 2 +- docs/plugins/sdk-overview.md | 2 +- extensions/discord/src/draft-stream.test.ts | 46 ++++++ extensions/discord/src/draft-stream.ts | 6 +- .../monitor/message-handler.process.test.ts | 55 ++++++- .../src/monitor/message-handler.process.ts | 142 +++++++++--------- .../matrix/src/matrix/draft-stream.test.ts | 19 +++ extensions/matrix/src/matrix/draft-stream.ts | 9 ++ .../matrix/src/matrix/monitor/handler.test.ts | 44 ++++++ .../matrix/src/matrix/monitor/handler.ts | 24 ++- .../src/mattermost/draft-stream.test.ts | 39 +++++ .../mattermost/src/mattermost/draft-stream.ts | 6 +- .../mattermost/src/mattermost/monitor.test.ts | 34 ++++- .../mattermost/src/mattermost/monitor.ts | 101 ++++++------- extensions/slack/src/draft-stream.test.ts | 16 ++ extensions/slack/src/draft-stream.ts | 10 +- .../dispatch.preview-fallback.test.ts | 59 +++++++- .../src/monitor/message-handler/dispatch.ts | 114 ++++++++------ extensions/slack/src/monitor/replies.test.ts | 27 +++- extensions/slack/src/monitor/replies.ts | 2 + src/auto-reply/reply/reply-reference.ts | 14 +- src/auto-reply/reply/reply-utils.test.ts | 23 +++ src/channels/draft-preview-finalizer.test.ts | 98 ++++++++++++ src/channels/draft-preview-finalizer.ts | 70 +++++++++ src/channels/draft-stream-controls.test.ts | 31 ++++ src/channels/draft-stream-controls.ts | 8 + src/plugin-sdk/channel-lifecycle.ts | 1 + 33 files changed, 824 insertions(+), 195 deletions(-) create mode 100644 src/channels/draft-preview-finalizer.test.ts create mode 100644 src/channels/draft-preview-finalizer.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ce28a28b89e..7b113bc2f32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Channels/preview streaming: centralize draft-preview finalization so Slack, Discord, Mattermost, and Matrix no longer flush temporary preview messages for media/error finals, and preserve first-reply threading for normal fallback delivery. - Discord: keep slash command follow-up chunks ephemeral when the command is configured for ephemeral replies, so long `/status` output no longer leaks fallback model or runtime details into the public channel. (#69869) thanks @gumadeiras. - Plugins/discovery: reject package plugin source entries that escape the package directory before explicit runtime entries or inferred built JavaScript peers can be used. (#69868) thanks @gumadeiras. - CLI/channels: resolve channel presence through a shared policy that keeps ambient env vars and stale persisted auth from surfacing disabled bundled plugins in status, doctor, security audit, and cron delivery validation unless the channel or plugin is effectively enabled or explicitly configured. (#69862) Thanks @gumadeiras. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 1bcbad9175d..6a5f7e49f1f 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -cfeee4630cb43ffc4d702f207d28d35962c6458aa8fd2b1671c35e0be158bb35 plugin-sdk-api-baseline.json -af4fbf19861c6ec000b41ac5a3ded597700e45bb15f8b1d74bb2d1f550bd09b6 plugin-sdk-api-baseline.jsonl +3a2cde4b15041b5456420b2052b572f9968a93690814d2cf924382fd2f54d1d3 plugin-sdk-api-baseline.json +38cd9086be93fc9531a8036812c197118c7830d52d40424be08dc9c6d51092e2 plugin-sdk-api-baseline.jsonl diff --git a/docs/channels/discord.md b/docs/channels/discord.md index da9264409b6..2c2f58905c6 100644 --- a/docs/channels/discord.md +++ b/docs/channels/discord.md @@ -593,6 +593,7 @@ Default slash command settings: - `channels.discord.streamMode` is a legacy alias and is auto-migrated. - `partial` edits a single preview message as tokens arrive. - `block` emits draft-sized chunks (use `draftChunk` to tune size and breakpoints). + - Media, error, and explicit-reply finals cancel pending preview edits without flushing a temporary draft before normal delivery. - `streaming.preview.toolProgress` controls whether tool/progress updates reuse the same draft preview message (default: `true`). Set `false` to keep separate tool/progress messages. Example: diff --git a/docs/channels/mattermost.md b/docs/channels/mattermost.md index 2ee0c52492e..918de5c2747 100644 --- a/docs/channels/mattermost.md +++ b/docs/channels/mattermost.md @@ -246,7 +246,7 @@ Notes: ## Preview streaming -Mattermost streams thinking, tool activity, and partial reply text into a single **draft preview post** that finalizes in place when the final answer is safe to send. The preview updates on the same post id instead of spamming the channel with per-chunk messages. +Mattermost streams thinking, tool activity, and partial reply text into a single **draft preview post** that finalizes in place when the final answer is safe to send. The preview updates on the same post id instead of spamming the channel with per-chunk messages. Media/error finals cancel pending preview edits and use normal delivery instead of flushing a throwaway preview post. Enable via `channels.mattermost.streaming`: diff --git a/docs/channels/slack.md b/docs/channels/slack.md index 629df72c524..39486e8f398 100644 --- a/docs/channels/slack.md +++ b/docs/channels/slack.md @@ -742,6 +742,7 @@ Notes: - Channel and group-chat roots can still use the normal draft preview when native streaming is unavailable. - Top-level Slack DMs stay off-thread by default, so they do not show the thread-style preview; use thread replies or `typingReaction` if you want visible progress there. - Media and non-text payloads fall back to normal delivery. +- Media/error finals cancel pending preview edits without flushing a temporary draft; eligible text/block finals flush only when they can edit the preview in place. - If streaming fails mid-reply, OpenClaw falls back to normal delivery for remaining payloads. Use draft preview instead of Slack native text streaming: diff --git a/docs/concepts/streaming.md b/docs/concepts/streaming.md index d047154def6..9658dfdbaa0 100644 --- a/docs/concepts/streaming.md +++ b/docs/concepts/streaming.md @@ -149,17 +149,25 @@ Discord: - Uses send + edit preview messages. - `block` mode uses draft chunking (`draftChunk`). - Preview streaming is skipped when Discord block streaming is explicitly enabled. +- Final media, error, and explicit-reply payloads cancel pending previews without flushing a new draft, then use normal delivery. Slack: - `partial` can use Slack native streaming (`chat.startStream`/`append`/`stop`) when available. - `block` uses append-style draft previews. - `progress` uses status preview text, then final answer. +- Final media/error payloads and progress finals do not create throwaway draft messages; only text/block finals that can edit the preview flush pending draft text. Mattermost: - Streams thinking, tool activity, and partial reply text into a single draft preview post that finalizes in place when the final answer is safe to send. - Falls back to sending a fresh final post if the preview post was deleted or is otherwise unavailable at finalize time. +- Final media/error payloads cancel pending preview updates before normal delivery instead of flushing a temporary preview post. + +Matrix: + +- Draft previews finalize in place when the final text can reuse the preview event. +- Media-only, error, and reply-target-mismatch finals cancel pending preview updates before normal delivery; an already-visible stale preview is redacted. ### Tool-progress preview updates diff --git a/docs/plugins/sdk-migration.md b/docs/plugins/sdk-migration.md index cc03ff7aba8..aae3e2f4b0e 100644 --- a/docs/plugins/sdk-migration.md +++ b/docs/plugins/sdk-migration.md @@ -202,7 +202,7 @@ Current bundled provider examples: | `plugin-sdk/channel-config-schema` | Config schema builders | Channel config schema types | | `plugin-sdk/telegram-command-config` | Telegram command config helpers | Command-name normalization, description trimming, duplicate/conflict validation | | `plugin-sdk/channel-policy` | Group/DM policy resolution | `resolveChannelGroupRequireMention` | - | `plugin-sdk/channel-lifecycle` | Account status tracking | `createAccountStatusSink` | + | `plugin-sdk/channel-lifecycle` | Account status and draft stream lifecycle helpers | `createAccountStatusSink`, draft preview finalization helpers | | `plugin-sdk/inbound-envelope` | Inbound envelope helpers | Shared route + envelope builder helpers | | `plugin-sdk/inbound-reply-dispatch` | Inbound reply helpers | Shared record-and-dispatch helpers | | `plugin-sdk/messaging-targets` | Messaging target parsing | Target parsing/matching helpers | diff --git a/docs/plugins/sdk-overview.md b/docs/plugins/sdk-overview.md index 32101687845..a435e54497c 100644 --- a/docs/plugins/sdk-overview.md +++ b/docs/plugins/sdk-overview.md @@ -90,7 +90,7 @@ explicitly promotes one as public. | `plugin-sdk/telegram-command-config` | Telegram custom-command normalization/validation helpers with bundled-contract fallback | | `plugin-sdk/command-gating` | Narrow command authorization gate helpers | | `plugin-sdk/channel-policy` | `resolveChannelGroupRequireMention` | - | `plugin-sdk/channel-lifecycle` | `createAccountStatusSink` | + | `plugin-sdk/channel-lifecycle` | `createAccountStatusSink`, draft stream lifecycle/finalization helpers | | `plugin-sdk/inbound-envelope` | Shared inbound route + envelope builder helpers | | `plugin-sdk/inbound-reply-dispatch` | Shared inbound record-and-dispatch helpers | | `plugin-sdk/messaging-targets` | Target parsing/matching helpers | diff --git a/extensions/discord/src/draft-stream.test.ts b/extensions/discord/src/draft-stream.test.ts index ee4febef744..110395b4007 100644 --- a/extensions/discord/src/draft-stream.test.ts +++ b/extensions/discord/src/draft-stream.test.ts @@ -78,4 +78,50 @@ describe("createDiscordDraftStream", () => { expect(warn).toHaveBeenCalledWith(expect.stringContaining("discord stream preview stopped")); expect(stream.messageId()).toBeUndefined(); }); + + it("discardPending keeps an existing preview but ignores later updates", async () => { + const rest = { + post: vi.fn(async () => ({ id: "m1" })), + patch: vi.fn(async () => undefined), + delete: vi.fn(async () => undefined), + }; + const stream = createDiscordDraftStream({ + rest: rest as never, + channelId: "c1", + throttleMs: 250, + }); + + stream.update("first draft"); + await stream.flush(); + await stream.discardPending(); + stream.update("late draft"); + await stream.flush(); + + expect(rest.post).toHaveBeenCalledTimes(1); + expect(rest.patch).not.toHaveBeenCalled(); + expect(rest.delete).not.toHaveBeenCalled(); + expect(stream.messageId()).toBe("m1"); + }); + + it("seal keeps an existing preview and cancels pending final overwrites", async () => { + const rest = { + post: vi.fn(async () => ({ id: "m1" })), + patch: vi.fn(async () => undefined), + delete: vi.fn(async () => undefined), + }; + const stream = createDiscordDraftStream({ + rest: rest as never, + channelId: "c1", + throttleMs: 250, + }); + + stream.update("first draft"); + await stream.flush(); + stream.update("stale final draft"); + await stream.seal(); + + expect(rest.post).toHaveBeenCalledTimes(1); + expect(rest.patch).not.toHaveBeenCalled(); + expect(stream.messageId()).toBe("m1"); + }); }); diff --git a/extensions/discord/src/draft-stream.ts b/extensions/discord/src/draft-stream.ts index 417e6857bf6..7cc74eb78e9 100644 --- a/extensions/discord/src/draft-stream.ts +++ b/extensions/discord/src/draft-stream.ts @@ -12,6 +12,8 @@ export type DiscordDraftStream = { flush: () => Promise; messageId: () => string | undefined; clear: () => Promise; + discardPending: () => Promise; + seal: () => Promise; stop: () => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ forceNewMessage: () => void; @@ -113,7 +115,7 @@ export function createDiscordDraftStream(params: { await rest.delete(Routes.channelMessage(channelId, messageId)); }; - const { loop, update, stop, clear } = createFinalizableDraftLifecycle({ + const { loop, update, stop, clear, discardPending, seal } = createFinalizableDraftLifecycle({ throttleMs, state: streamState, sendOrEditStreamMessage, @@ -138,6 +140,8 @@ export function createDiscordDraftStream(params: { flush: loop.flush, messageId: () => streamMessageId, clear, + discardPending, + seal, stop, forceNewMessage, }; diff --git a/extensions/discord/src/monitor/message-handler.process.test.ts b/extensions/discord/src/monitor/message-handler.process.test.ts index 2c5ee607e3a..017270464f1 100644 --- a/extensions/discord/src/monitor/message-handler.process.test.ts +++ b/extensions/discord/src/monitor/message-handler.process.test.ts @@ -11,11 +11,16 @@ const sendMocks = vi.hoisted(() => ({ >(async () => {}), })); function createMockDraftStream() { + let messageId: string | undefined = "preview-1"; return { update: vi.fn<(text: string) => void>(() => {}), flush: vi.fn(async () => {}), - messageId: vi.fn(() => "preview-1"), - clear: vi.fn(async () => {}), + messageId: vi.fn(() => messageId), + clear: vi.fn(async () => { + messageId = undefined; + }), + discardPending: vi.fn(async () => {}), + seal: vi.fn(async () => {}), stop: vi.fn(async () => {}), forceNewMessage: vi.fn(() => {}), }; @@ -820,6 +825,52 @@ describe("processDiscordMessage draft streaming", () => { expect(deliverDiscordReply).toHaveBeenCalledTimes(1); }); + it("does not flush draft previews for media finals before normal delivery", async () => { + const draftStream = createMockDraftStreamForTest(); + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendFinalReply({ + text: "Photo", + mediaUrl: "https://example.com/a.png", + } as never); + return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; + }); + + const ctx = await createBaseContext({ + discordConfig: { streamMode: "partial", maxLinesPerMessage: 5 }, + }); + + await processDiscordMessage(ctx as any); + + expect(draftStream.flush).not.toHaveBeenCalled(); + expect(draftStream.discardPending).toHaveBeenCalledTimes(1); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + expect(editMessageDiscord).not.toHaveBeenCalled(); + expect(deliverDiscordReply).toHaveBeenCalledTimes(1); + }); + + it("does not flush draft previews for error finals before normal delivery", async () => { + const draftStream = createMockDraftStreamForTest(); + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendFinalReply({ + text: "Something failed", + isError: true, + } as never); + return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; + }); + + const ctx = await createBaseContext({ + discordConfig: { streamMode: "partial", maxLinesPerMessage: 5 }, + }); + + await processDiscordMessage(ctx as any); + + expect(draftStream.flush).not.toHaveBeenCalled(); + expect(draftStream.discardPending).toHaveBeenCalledTimes(1); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + expect(editMessageDiscord).not.toHaveBeenCalled(); + expect(deliverDiscordReply).toHaveBeenCalledTimes(1); + }); + it("suppresses reasoning payload delivery to Discord", async () => { mockDispatchSingleBlockReply({ text: "thinking...", isReasoning: true }); await processStreamOffDiscordMessage(); diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index a0d1584dc6b..97d11e54362 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -15,6 +15,7 @@ import { formatInboundEnvelope, resolveEnvelopeFormatOptions, } from "openclaw/plugin-sdk/channel-inbound"; +import { deliverFinalizableDraftPreview } from "openclaw/plugin-sdk/channel-lifecycle"; import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline"; import { resolveChannelStreamingBlockEnabled, @@ -579,7 +580,7 @@ export async function processDiscordMessage( resolveChannelStreamingBlockEnabled(discordConfig) ?? cfg.agents?.defaults?.blockStreamingDefault === "on"; const canStreamDraft = discordStreamMode !== "off" && !accountBlockStreamingEnabled; - const draftReplyToMessageId = () => replyReference.use(); + const draftReplyToMessageId = () => replyReference.peek(); const deliverChannelId = deliverTarget.startsWith("channel:") ? deliverTarget.slice("channel:".length) : messageChannelId; @@ -605,6 +606,7 @@ export async function processDiscordMessage( let draftText = ""; let hasStreamedMessage = false; let finalizedViaPreviewMessage = false; + let draftFinalDeliveryHandled = false; const previewToolProgressEnabled = Boolean(draftStream) && resolveChannelStreamingPreviewToolProgress(discordConfig); let previewToolProgressSuppressed = false; @@ -770,7 +772,7 @@ export async function processDiscordMessage( return; } if (draftStream && isFinal) { - await flushDraft(); + draftFinalDeliveryHandled = true; const reply = resolveSendableOutboundReplyParts(payload); const hasMedia = reply.hasMedia; const finalText = payload.text; @@ -778,78 +780,79 @@ export async function processDiscordMessage( const hasExplicitReplyDirective = Boolean(payload.replyToTag || payload.replyToCurrent) || (typeof finalText === "string" && /\[\[\s*reply_to(?:_current|\s*:)/i.test(finalText)); - const previewMessageId = draftStream.messageId(); - // Try to finalize via preview edit (text-only, fits in 2000 chars, not an error) - const canFinalizeViaPreviewEdit = - !finalizedViaPreviewMessage && - !hasMedia && - typeof previewFinalText === "string" && - typeof previewMessageId === "string" && - !hasExplicitReplyDirective && - !payload.isError; - - if (canFinalizeViaPreviewEdit) { - await draftStream.stop(); - if (isProcessAborted(abortSignal)) { - return; - } - try { + const result = await deliverFinalizableDraftPreview({ + kind: info.kind, + payload, + draft: { + flush: flushDraft, + clear: draftStream.clear, + discardPending: draftStream.discardPending, + seal: draftStream.seal, + id: draftStream.messageId, + }, + buildFinalEdit: () => { + if ( + finalizedViaPreviewMessage || + hasMedia || + typeof previewFinalText !== "string" || + hasExplicitReplyDirective || + payload.isError + ) { + return undefined; + } + return { content: previewFinalText }; + }, + editFinal: async (previewMessageId, edit) => { + if (isProcessAborted(abortSignal)) { + throw new Error("process aborted"); + } notifyFinalReplyStart(); - await editMessageDiscord( - deliverChannelId, - previewMessageId, - { content: previewFinalText }, - { rest: deliveryRest }, - ); + await editMessageDiscord(deliverChannelId, previewMessageId, edit, { + rest: deliveryRest, + }); + }, + deliverNormally: async () => { + if (isProcessAborted(abortSignal)) { + return false; + } + const replyToId = replyReference.use(); + notifyFinalReplyStart(); + await deliverDiscordReply({ + cfg, + replies: [payload], + target: deliverTarget, + token, + accountId, + rest: deliveryRest, + runtime, + replyToId, + replyToMode, + textLimit, + maxLinesPerMessage, + tableMode, + chunkMode, + sessionKey: ctxPayload.SessionKey, + threadBindings, + mediaLocalRoots, + }); + replyReference.markSent(); + observer?.onFinalReplyDelivered?.(); + return true; + }, + onPreviewFinalized: () => { finalizedViaPreviewMessage = true; replyReference.markSent(); observer?.onFinalReplyDelivered?.(); - return; - } catch (err) { + }, + logPreviewEditFailure: (err) => { logVerbose( `discord: preview final edit failed; falling back to standard send (${String(err)})`, ); - } - } - - // Check if stop() flushed a message we can edit - if (!finalizedViaPreviewMessage) { - await draftStream.stop(); - if (isProcessAborted(abortSignal)) { - return; - } - const messageIdAfterStop = draftStream.messageId(); - if ( - typeof messageIdAfterStop === "string" && - typeof previewFinalText === "string" && - !hasMedia && - !hasExplicitReplyDirective && - !payload.isError - ) { - try { - notifyFinalReplyStart(); - await editMessageDiscord( - deliverChannelId, - messageIdAfterStop, - { content: previewFinalText }, - { rest: deliveryRest }, - ); - finalizedViaPreviewMessage = true; - replyReference.markSent(); - observer?.onFinalReplyDelivered?.(); - return; - } catch (err) { - logVerbose( - `discord: post-stop preview edit failed; falling back to standard send (${String(err)})`, - ); - } - } - } - - // Clear the preview and fall through to standard delivery - if (!finalizedViaPreviewMessage) { - await draftStream.clear(); + }, + }); + if (result !== "normal-skipped") { + return; } } if (isProcessAborted(abortSignal)) { @@ -1019,9 +1022,10 @@ export async function processDiscordMessage( throw err; } finally { try { - // Must stop() first to flush debounced content before clear() wipes state. - await draftStream?.stop(); - if (!finalizedViaPreviewMessage) { + if (!draftFinalDeliveryHandled) { + await draftStream?.discardPending(); + } + if (!draftFinalDeliveryHandled && !finalizedViaPreviewMessage && draftStream?.messageId()) { await draftStream?.clear(); } } catch (err) { diff --git a/extensions/matrix/src/matrix/draft-stream.test.ts b/extensions/matrix/src/matrix/draft-stream.test.ts index 559dddacd41..113f3247f99 100644 --- a/extensions/matrix/src/matrix/draft-stream.test.ts +++ b/extensions/matrix/src/matrix/draft-stream.test.ts @@ -182,6 +182,7 @@ describe("createMatrixDraftStream", () => { .mockReset() .mockImplementation((text: string) => (text ? [text] : [])); convertMarkdownTablesMock.mockReset().mockImplementation((text: string) => text); + sendModuleMocks.editMessageMatrix.mockClear(); }); afterEach(() => { @@ -503,6 +504,24 @@ describe("createMatrixDraftStream", () => { ); }); + it("discardPending cancels pending updates without creating another preview event", async () => { + const stream = createMatrixDraftStream({ + roomId: "!room:test", + client, + cfg: {} as import("../types.js").CoreConfig, + }); + + stream.update("First draft"); + await stream.flush(); + stream.update("Pending draft"); + await stream.discardPending(); + await stream.flush(); + + expect(sendMessageMock).toHaveBeenCalledTimes(1); + expect(sendModuleMocks.editMessageMatrix).not.toHaveBeenCalled(); + expect(stream.eventId()).toBe("$evt1"); + }); + it("uses converted Matrix text when checking the single-event preview limit", async () => { const log = vi.fn(); resolveTextChunkLimitMock.mockReturnValue(5); diff --git a/extensions/matrix/src/matrix/draft-stream.ts b/extensions/matrix/src/matrix/draft-stream.ts index 1d18c4c3e0e..8ef7dacef8b 100644 --- a/extensions/matrix/src/matrix/draft-stream.ts +++ b/extensions/matrix/src/matrix/draft-stream.ts @@ -29,6 +29,8 @@ export type MatrixDraftStream = { flush: () => Promise; /** Flush and mark this block as done. Returns the event ID if a message was sent. */ stop: () => Promise; + /** Cancel pending draft updates without creating a new preview event. */ + discardPending: () => Promise; /** Clear the MSC4357 live marker in place when the draft is kept as final text. */ finalizeLive: () => Promise; /** Reset state for the next text block (after tool calls). */ @@ -180,6 +182,12 @@ export function createMatrixDraftStream(params: { return currentEventId; }; + const discardPending = async (): Promise => { + stopped = true; + loop.stop(); + await loop.waitForInFlight(); + }; + const reset = (): void => { // Clear reply context unless preserveReplyId is set (replyToMode "all"), // in which case subsequent blocks should keep replying to the original. @@ -203,6 +211,7 @@ export function createMatrixDraftStream(params: { }, flush: loop.flush, stop, + discardPending, finalizeLive, reset, eventId: () => currentEventId, diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 1f719226bfb..b46b6ed60a4 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -3231,6 +3231,50 @@ describe("matrix monitor handler draft streaming", () => { await finish(); }); + it("does not create a throwaway draft for fast media-only finals", async () => { + const { dispatch, redactEventMock } = createStreamingHarness(); + const { deliver, finish } = await dispatch(); + + await deliver({ mediaUrl: "https://example.com/image.png" }, { kind: "final" }); + + expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled(); + expect(editMessageMatrixMock).not.toHaveBeenCalled(); + expect(redactEventMock).not.toHaveBeenCalled(); + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + await finish(); + }); + + it("does not create a throwaway draft for fast error finals", async () => { + const { dispatch, redactEventMock } = createStreamingHarness(); + const { deliver, finish } = await dispatch(); + + await deliver({ text: "Something failed", isError: true } as never, { kind: "final" }); + + expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled(); + expect(editMessageMatrixMock).not.toHaveBeenCalled(); + expect(redactEventMock).not.toHaveBeenCalled(); + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + await finish(); + }); + + it("redacts existing drafts for text error finals and uses normal delivery", async () => { + const { dispatch, redactEventMock } = createStreamingHarness(); + const { deliver, opts, finish } = await dispatch(); + + opts.onPartialReply?.({ text: "Partial reply" }); + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + + deliverMatrixRepliesMock.mockClear(); + await deliver({ text: "Something failed", isError: true } as never, { kind: "final" }); + + expect(editMessageMatrixMock).not.toHaveBeenCalled(); + expect(redactEventMock).toHaveBeenCalledWith("!room:example.org", "$draft1"); + expect(deliverMatrixRepliesMock).toHaveBeenCalledTimes(1); + await finish(); + }); + it("finalizes partial drafts before reusing unchanged media captions", async () => { const { dispatch, redactEventMock } = createStreamingHarness({ streaming: "partial" }); const { deliver, opts, finish } = await dispatch(); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index b41d378daf5..867808306cb 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -121,6 +121,7 @@ type MatrixAllowBotsMode = "off" | "mentions" | "all"; type MatrixDraftStreamHandle = { update: (text: string) => void; stop: () => Promise; + discardPending: () => Promise; eventId: () => string | undefined; mustDeliverFinalNormally: () => boolean; matchesPreparedText: (text: string) => boolean; @@ -1547,10 +1548,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam if (draftStream && info.kind !== "tool" && !payload.isCompactionNotice) { const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - await draftStream.stop(); - const draftEventId = draftStream.eventId(); - if (draftConsumed) { + await draftStream.discardPending(); await deliverMatrixReplies({ cfg, replies: [payload], @@ -1572,11 +1571,25 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam replyToMode !== "off" && !threadTarget && payloadReplyToId !== currentDraftReplyToId; - const mustDeliverFinalNormally = draftStream.mustDeliverFinalNormally(); + let mustDeliverFinalNormally = draftStream.mustDeliverFinalNormally(); + const canPotentiallyFinalizeDraft = + Boolean(payload.text?.trim()) && + !payload.isError && + !payloadReplyMismatch && + !mustDeliverFinalNormally; + + if (canPotentiallyFinalizeDraft) { + await draftStream.stop(); + mustDeliverFinalNormally = draftStream.mustDeliverFinalNormally(); + } else { + await draftStream.discardPending(); + } + const draftEventId = draftStream.eventId(); if ( draftEventId && payload.text && + !payload.isError && !hasMedia && !payloadReplyMismatch && !mustDeliverFinalNormally @@ -1666,7 +1679,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam draftConsumed = true; } else { const draftRedacted = - Boolean(draftEventId) && (payloadReplyMismatch || mustDeliverFinalNormally); + Boolean(draftEventId) && + (payload.isError || payloadReplyMismatch || mustDeliverFinalNormally); if (draftRedacted && draftEventId) { await redactMatrixDraftEvent(client, roomId, draftEventId); } diff --git a/extensions/mattermost/src/mattermost/draft-stream.test.ts b/extensions/mattermost/src/mattermost/draft-stream.test.ts index 37d82c2425a..c922b8d760c 100644 --- a/extensions/mattermost/src/mattermost/draft-stream.test.ts +++ b/extensions/mattermost/src/mattermost/draft-stream.test.ts @@ -100,6 +100,45 @@ describe("createMattermostDraftStream", () => { expect(stream.postId()).toBeUndefined(); }); + it("discardPending keeps the preview post but ignores later updates", async () => { + const { client, calls } = createMockClient(); + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + rootId: "root-1", + throttleMs: 0, + }); + + stream.update("Working..."); + await stream.flush(); + await stream.discardPending(); + stream.update("Late update"); + await stream.flush(); + + expect(calls).toHaveLength(1); + expect(calls[0]?.path).toBe("/posts"); + expect(stream.postId()).toBe("post-1"); + }); + + it("seal keeps the preview post and cancels pending final overwrites", async () => { + const { client, calls } = createMockClient(); + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + rootId: "root-1", + throttleMs: 0, + }); + + stream.update("Working..."); + await stream.flush(); + stream.update("Stale final draft"); + await stream.seal(); + + expect(calls).toHaveLength(1); + expect(calls[0]?.path).toBe("/posts"); + expect(stream.postId()).toBe("post-1"); + }); + it("stop flushes the last pending update and ignores later ones", async () => { const { client, calls } = createMockClient(); const stream = createMattermostDraftStream({ diff --git a/extensions/mattermost/src/mattermost/draft-stream.ts b/extensions/mattermost/src/mattermost/draft-stream.ts index 3b928448022..c022fccc845 100644 --- a/extensions/mattermost/src/mattermost/draft-stream.ts +++ b/extensions/mattermost/src/mattermost/draft-stream.ts @@ -14,6 +14,8 @@ export type MattermostDraftStream = { flush: () => Promise; postId: () => string | undefined; clear: () => Promise; + discardPending: () => Promise; + seal: () => Promise; stop: () => Promise; forceNewMessage: () => void; }; @@ -95,7 +97,7 @@ export function createMattermostDraftStream(params: { } }; - const { loop, update, stop, clear } = createFinalizableDraftLifecycle({ + const { loop, update, stop, clear, discardPending, seal } = createFinalizableDraftLifecycle({ throttleMs, state: streamState, sendOrEditStreamMessage, @@ -125,6 +127,8 @@ export function createMattermostDraftStream(params: { flush: loop.flush, postId: () => streamPostId, clear, + discardPending, + seal, stop, forceNewMessage, }; diff --git a/extensions/mattermost/src/mattermost/monitor.test.ts b/extensions/mattermost/src/mattermost/monitor.test.ts index 4369437c15c..663d7f0a95e 100644 --- a/extensions/mattermost/src/mattermost/monitor.test.ts +++ b/extensions/mattermost/src/mattermost/monitor.test.ts @@ -66,7 +66,8 @@ function createDraftStreamMock(postId: string | undefined = "preview-post-1") { flush: vi.fn(async () => {}), postId: vi.fn(() => postId), clear: vi.fn(async () => {}), - stop: vi.fn(async () => {}), + discardPending: vi.fn(async () => {}), + seal: vi.fn(async () => {}), }; } @@ -267,6 +268,8 @@ describe("deliverMattermostReplyWithDraftPreview", () => { }); expect(deliverFinal).toHaveBeenCalledTimes(1); + expect(draftStream.flush).not.toHaveBeenCalled(); + expect(draftStream.discardPending).toHaveBeenCalledTimes(1); expect(draftStream.clear).toHaveBeenCalledTimes(1); expect(updateMattermostPostSpy).not.toHaveBeenCalled(); }); @@ -291,6 +294,29 @@ describe("deliverMattermostReplyWithDraftPreview", () => { deliverFinal, }); + expect(deliverFinal).toHaveBeenCalledTimes(1); + expect(draftStream.flush).not.toHaveBeenCalled(); + expect(draftStream.discardPending).toHaveBeenCalledTimes(1); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("does not flush error finals before normal delivery", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + + await deliverMattermostReplyWithDraftPreview({ + payload: { text: "Error", isError: true } as never, + info: { kind: "final" }, + client: createMattermostClientMock(), + draftStream, + effectiveReplyToId: "thread-root-1", + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverFinal, + }); + + expect(draftStream.flush).not.toHaveBeenCalled(); expect(deliverFinal).toHaveBeenCalledTimes(1); expect(draftStream.clear).toHaveBeenCalledTimes(1); }); @@ -316,8 +342,9 @@ describe("deliverMattermostReplyWithDraftPreview", () => { "preview-post-1", expect.objectContaining({ message: "Final answer" }), ); - expect(draftStream.stop).toHaveBeenCalledTimes(1); - expect(draftStream.stop.mock.invocationCallOrder[0]).toBeLessThan( + expect(draftStream.flush).toHaveBeenCalledTimes(1); + expect(draftStream.seal).toHaveBeenCalledTimes(1); + expect(draftStream.seal.mock.invocationCallOrder[0]).toBeLessThan( updateMattermostPostSpy.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, ); expect(deliverFinal).not.toHaveBeenCalled(); @@ -343,6 +370,7 @@ describe("deliverMattermostReplyWithDraftPreview", () => { }), ).rejects.toThrow("send failed"); + expect(draftStream.discardPending).toHaveBeenCalledTimes(1); expect(draftStream.clear).not.toHaveBeenCalled(); expect(updateMattermostPostSpy).not.toHaveBeenCalledWith( expect.anything(), diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 8696ec337ae..83ea786ad98 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1,3 +1,4 @@ +import { deliverFinalizableDraftPreview } from "openclaw/plugin-sdk/channel-lifecycle"; import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; import { isPrivateNetworkOptInEnabled } from "openclaw/plugin-sdk/ssrf-runtime"; import { @@ -276,7 +277,7 @@ type MattermostDraftPreviewDeliverParams = { client: MattermostClient; draftStream: Pick< ReturnType, - "flush" | "postId" | "clear" | "stop" + "flush" | "postId" | "clear" | "discardPending" | "seal" >; effectiveReplyToId?: string; resolvePreviewFinalText: (text?: string) => string | undefined; @@ -292,65 +293,49 @@ export async function deliverMattermostReplyWithDraftPreview( return; } - const isFinal = params.info.kind === "final"; - let previewPostId: string | undefined; - if (isFinal) { - await params.draftStream.flush(); - const hasMedia = - Boolean(params.payload.mediaUrl) || (params.payload.mediaUrls?.length ?? 0) > 0; - const previewFinalText = params.resolvePreviewFinalText(params.payload.text); - previewPostId = params.draftStream.postId(); + await deliverFinalizableDraftPreview({ + kind: params.info.kind, + payload: params.payload, + draft: { + flush: params.draftStream.flush, + clear: params.draftStream.clear, + discardPending: params.draftStream.discardPending, + seal: params.draftStream.seal, + id: params.draftStream.postId, + }, + buildFinalEdit: (payload) => { + const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + const previewFinalText = params.resolvePreviewFinalText(payload.text); - if ( - typeof previewPostId === "string" && - !hasMedia && - typeof previewFinalText === "string" && - !params.payload.isError && - canFinalizeMattermostPreviewInPlace({ - previewRootId: params.effectiveReplyToId, - threadRootId: params.effectiveReplyToId, - replyToId: params.payload.replyToId, - }) - ) { - try { - // Seal the preview before the final edit so late draft events cannot - // patch over the finalized visible message. - await params.draftStream.stop(); - await updateMattermostPost(params.client, previewPostId, { - message: previewFinalText, - }); - params.previewState.finalizedViaPreviewPost = true; - return; - } catch (err) { - params.logVerboseMessage( - `mattermost preview final edit failed; falling back to normal send (${String(err)})`, - ); + if ( + hasMedia || + typeof previewFinalText !== "string" || + payload.isError || + !canFinalizeMattermostPreviewInPlace({ + previewRootId: params.effectiveReplyToId, + threadRootId: params.effectiveReplyToId, + replyToId: payload.replyToId, + }) + ) { + return undefined; } - } - } - - let finalReplyDelivered = false; - try { - await params.deliverFinal(); - finalReplyDelivered = true; - } finally { - if ( - isFinal && - typeof previewPostId === "string" && - shouldClearMattermostDraftPreview({ - finalizedViaPreviewPost: params.previewState.finalizedViaPreviewPost, - finalReplyDelivered, - }) - ) { - try { - await params.draftStream.clear(); - } catch (err) { - params.logVerboseMessage( - `mattermost draft preview clear failed after successful final delivery (${String(err)})`, - ); - } - } - } + return { message: previewFinalText }; + }, + editFinal: async (previewPostId, edit) => { + await updateMattermostPost(params.client, previewPostId, edit); + }, + deliverNormally: async () => { + await params.deliverFinal(); + }, + onPreviewFinalized: () => { + params.previewState.finalizedViaPreviewPost = true; + }, + logPreviewEditFailure: (err) => { + params.logVerboseMessage( + `mattermost preview final edit failed; falling back to normal send (${String(err)})`, + ); + }, + }); } export function resolveMattermostEffectiveReplyToId(params: { diff --git a/extensions/slack/src/draft-stream.test.ts b/extensions/slack/src/draft-stream.test.ts index 9f41ab03294..9ee95622b5f 100644 --- a/extensions/slack/src/draft-stream.test.ts +++ b/extensions/slack/src/draft-stream.test.ts @@ -132,6 +132,22 @@ describe("createSlackDraftStream", () => { expect(stream.channelId()).toBeUndefined(); }); + it("discardPending stops late updates without deleting the visible preview", async () => { + const { stream, send, edit, remove } = createDraftStreamHarness(); + + stream.update("hello"); + await stream.flush(); + await stream.discardPending(); + stream.update("late"); + await stream.flush(); + + expect(send).toHaveBeenCalledTimes(1); + expect(edit).not.toHaveBeenCalled(); + expect(remove).not.toHaveBeenCalled(); + expect(stream.messageId()).toBe("111.222"); + expect(stream.channelId()).toBe("C123"); + }); + it("clear is a no-op when no preview message exists", async () => { const { stream, remove } = createDraftStreamHarness(); diff --git a/extensions/slack/src/draft-stream.ts b/extensions/slack/src/draft-stream.ts index 52a19f1bb90..f52ef14169a 100644 --- a/extensions/slack/src/draft-stream.ts +++ b/extensions/slack/src/draft-stream.ts @@ -10,6 +10,8 @@ export type SlackDraftStream = { update: (text: string) => void; flush: () => Promise; clear: () => Promise; + discardPending: () => Promise; + seal: () => Promise; stop: () => void; forceNewMessage: () => void; messageId: () => string | undefined; @@ -95,9 +97,13 @@ export function createSlackDraftStream(params: { loop.stop(); }; - const clear = async () => { + const discardPending = async () => { stop(); await loop.waitForInFlight(); + }; + + const clear = async () => { + await discardPending(); const channelId = streamChannelId; const messageId = streamMessageId; streamChannelId = undefined; @@ -129,6 +135,8 @@ export function createSlackDraftStream(params: { update: loop.update, flush: loop.flush, clear, + discardPending, + seal: discardPending, stop, forceNewMessage, messageId: () => streamMessageId, diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index 942e766cd0a..a9c096e39b3 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -9,7 +9,7 @@ const deliverRepliesMock = vi.fn(async () => {}); const finalizeSlackPreviewEditMock = vi.fn(async () => {}); let mockedDispatchSequence: Array<{ kind: "tool" | "block" | "final"; - payload: { text: string }; + payload: { text: string; isError?: boolean; mediaUrl?: string; mediaUrls?: string[] }; }> = []; const noop = () => {}; @@ -20,6 +20,8 @@ function createDraftStreamStub() { update: noop, flush: noopAsync, clear: noopAsync, + discardPending: noopAsync, + seal: noopAsync, stop: noop, forceNewMessage: noop, messageId: () => "171234.567", @@ -213,6 +215,7 @@ vi.mock("../config.runtime.js", () => ({ vi.mock("../replies.js", () => ({ createSlackReplyDeliveryPlan: () => ({ + peekThreadTs: () => THREAD_TS, nextThreadTs: () => THREAD_TS, markSent: () => {}, }), @@ -234,7 +237,7 @@ vi.mock("../reply.runtime.js", () => ({ dispatchInboundMessage: async (params: { dispatcher: { deliver: ( - payload: { text: string }, + payload: { text: string; isError?: boolean; mediaUrl?: string; mediaUrls?: string[] }, info: { kind: "tool" | "block" | "final" }, ) => Promise; }; @@ -293,7 +296,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { await dispatchPreparedSlackMessage(createPreparedSlackMessage()); - expect(finalizeSlackPreviewEditMock).toHaveBeenCalledTimes(2); + expect(finalizeSlackPreviewEditMock).toHaveBeenCalledTimes(1); expect(deliverRepliesMock).toHaveBeenCalledTimes(2); expect(deliverRepliesMock).toHaveBeenNthCalledWith( 1, @@ -310,4 +313,54 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { }), ); }); + + it("does not flush draft previews for media finals before normal delivery", async () => { + const draftStream = { + ...createDraftStreamStub(), + flush: vi.fn(noopAsync), + clear: vi.fn(noopAsync), + discardPending: vi.fn(noopAsync), + seal: vi.fn(noopAsync), + }; + createSlackDraftStreamMock.mockReturnValueOnce(draftStream); + mockedDispatchSequence = [ + { + kind: "final", + payload: { text: "Photo", mediaUrl: "https://example.com/a.png" }, + }, + ]; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(draftStream.flush).not.toHaveBeenCalled(); + expect(draftStream.discardPending).toHaveBeenCalled(); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + expect(finalizeSlackPreviewEditMock).not.toHaveBeenCalled(); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + }); + + it("does not flush draft previews for error finals before normal delivery", async () => { + const draftStream = { + ...createDraftStreamStub(), + flush: vi.fn(noopAsync), + clear: vi.fn(noopAsync), + discardPending: vi.fn(noopAsync), + seal: vi.fn(noopAsync), + }; + createSlackDraftStreamMock.mockReturnValueOnce(draftStream); + mockedDispatchSequence = [ + { + kind: "final", + payload: { text: "Something failed", isError: true }, + }, + ]; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(draftStream.flush).not.toHaveBeenCalled(); + expect(draftStream.discardPending).toHaveBeenCalled(); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + expect(finalizeSlackPreviewEditMock).not.toHaveBeenCalled(); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + }); }); diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 4c5b8525b3b..5bd5afa6525 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -7,6 +7,7 @@ import { removeAckReactionAfterReply, type StatusReactionAdapter, } from "openclaw/plugin-sdk/channel-feedback"; +import { deliverFinalizableDraftPreview } from "openclaw/plugin-sdk/channel-lifecycle"; import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline"; import { resolveChannelStreamingBlockEnabled, @@ -580,45 +581,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const reply = resolveSendableOutboundReplyParts(payload); const slackBlocks = readSlackReplyBlocks(payload); - const draftMessageId = draftStream?.messageId(); - const draftChannelId = draftStream?.channelId(); const trimmedFinalText = reply.trimmedText; - const canFinalizeViaPreviewEdit = - previewStreamingEnabled && - streamMode !== "status_final" && - !reply.hasMedia && - !payload.isError && - (trimmedFinalText.length > 0 || Boolean(slackBlocks?.length)) && - typeof draftMessageId === "string" && - typeof draftChannelId === "string"; - if (canFinalizeViaPreviewEdit) { - const finalThreadTs = usedReplyThreadTs ?? statusThreadTs; - if (deliveryTracker.hasDelivered({ kind: info.kind, payload, threadTs: finalThreadTs })) { - observedReplyDelivery = true; - return; - } - draftStream?.stop(); - try { - await finalizeSlackPreviewEdit({ - client: ctx.app.client, - token: ctx.botToken, - accountId: account.accountId, - channelId: draftChannelId, - messageId: draftMessageId, - text: normalizeSlackOutboundText(trimmedFinalText), - ...(slackBlocks?.length ? { blocks: slackBlocks } : {}), - threadTs: finalThreadTs, - }); - observedReplyDelivery = true; - deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs }); - return; - } catch (err) { - logVerbose( - `slack: preview final edit failed; falling back to standard send (${formatErrorMessage(err)})`, - ); - } - } else if (previewStreamingEnabled && streamMode === "status_final" && hasStreamedMessage) { + if (previewStreamingEnabled && streamMode === "status_final" && hasStreamedMessage) { try { const statusChannelId = draftStream?.channelId(); const statusMessageId = draftStream?.messageId(); @@ -633,12 +598,75 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag } catch (err) { logVerbose(`slack: status_final completion update failed (${formatErrorMessage(err)})`); } - } else if (reply.hasMedia) { - await draftStream?.clear(); hasStreamedMessage = false; } - await deliverNormally({ payload, kind: info.kind }); + const result = await deliverFinalizableDraftPreview({ + kind: info.kind, + payload, + draft: draftStream + ? { + flush: draftStream.flush, + clear: draftStream.clear, + discardPending: draftStream.discardPending, + seal: draftStream.seal, + id: () => { + const channelId = draftStream.channelId(); + const messageId = draftStream.messageId(); + return channelId && messageId ? { channelId, messageId } : undefined; + }, + } + : undefined, + buildFinalEdit: () => { + if ( + !previewStreamingEnabled || + streamMode === "status_final" || + reply.hasMedia || + payload.isError || + (trimmedFinalText.length === 0 && !slackBlocks?.length) + ) { + return undefined; + } + return { + text: normalizeSlackOutboundText(trimmedFinalText), + blocks: slackBlocks, + threadTs: usedReplyThreadTs ?? statusThreadTs, + }; + }, + editFinal: async (preview, edit) => { + if (deliveryTracker.hasDelivered({ kind: info.kind, payload, threadTs: edit.threadTs })) { + return; + } + await finalizeSlackPreviewEdit({ + client: ctx.app.client, + token: ctx.botToken, + accountId: account.accountId, + channelId: preview.channelId, + messageId: preview.messageId, + text: edit.text, + ...(edit.blocks?.length ? { blocks: edit.blocks } : {}), + threadTs: edit.threadTs, + }); + }, + deliverNormally: async () => { + await deliverNormally({ payload, kind: info.kind }); + }, + onPreviewFinalized: (_preview) => { + const finalThreadTs = usedReplyThreadTs ?? statusThreadTs; + observedReplyDelivery = true; + replyPlan.markSent(); + deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs }); + }, + logPreviewEditFailure: (err) => { + logVerbose( + `slack: preview final edit failed; falling back to standard send (${formatErrorMessage(err)})`, + ); + }, + }); + + if (result === "preview-finalized") { + return; + } }, onError: (err, info) => { runtime.error?.(danger(`slack ${info.kind} reply failed: ${formatErrorMessage(err)}`)); @@ -653,13 +681,12 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag accountId: account.accountId, maxChars: Math.min(ctx.textLimit, SLACK_TEXT_LIMIT), resolveThreadTs: () => { - const ts = replyPlan.nextThreadTs(); + const ts = replyPlan.peekThreadTs(); if (ts) { usedReplyThreadTs ??= ts; } return ts; }, - onMessageSent: () => replyPlan.markSent(), log: logVerbose, warn: logVerbose, }) @@ -826,8 +853,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag } catch (err) { dispatchError = err; } finally { - await draftStream?.flush(); - draftStream?.stop(); + await draftStream?.discardPending(); markDispatchIdle(); } diff --git a/extensions/slack/src/monitor/replies.test.ts b/extensions/slack/src/monitor/replies.test.ts index a8e460b7662..7d3d877c427 100644 --- a/extensions/slack/src/monitor/replies.test.ts +++ b/extensions/slack/src/monitor/replies.test.ts @@ -6,6 +6,7 @@ vi.mock("../send.js", () => ({ })); let deliverReplies: typeof import("./replies.js").deliverReplies; +let createSlackReplyDeliveryPlan: typeof import("./replies.js").createSlackReplyDeliveryPlan; let resolveSlackThreadTs: typeof import("./replies.js").resolveSlackThreadTs; import { deliverSlackSlashReplies } from "./replies.js"; @@ -23,7 +24,8 @@ function baseParams(overrides?: Record) { describe("deliverReplies identity passthrough", () => { beforeAll(async () => { - ({ deliverReplies, resolveSlackThreadTs } = await import("./replies.js")); + ({ createSlackReplyDeliveryPlan, deliverReplies, resolveSlackThreadTs } = + await import("./replies.js")); }); beforeEach(() => { @@ -211,6 +213,29 @@ describe("resolveSlackThreadTs fallback classification", () => { }); }); +describe("createSlackReplyDeliveryPlan", () => { + it("lets draft previews inspect first thread targets without consuming them", () => { + const hasRepliedRef = { value: false }; + const plan = createSlackReplyDeliveryPlan({ + replyToMode: "first", + incomingThreadTs: undefined, + messageTs: "9999999999.999999", + hasRepliedRef, + isThreadReply: false, + }); + + expect(plan.peekThreadTs()).toBe("9999999999.999999"); + expect(plan.peekThreadTs()).toBe("9999999999.999999"); + expect(hasRepliedRef.value).toBe(false); + + plan.markSent(); + + expect(hasRepliedRef.value).toBe(true); + expect(plan.peekThreadTs()).toBeUndefined(); + expect(plan.nextThreadTs()).toBeUndefined(); + }); +}); + describe("deliverSlackSlashReplies chunking", () => { it("keeps a 4205-character reply in a single slash response by default", async () => { const respond = vi.fn(async () => undefined); diff --git a/extensions/slack/src/monitor/replies.ts b/extensions/slack/src/monitor/replies.ts index dbd2d7ead8f..8ed01a76cf8 100644 --- a/extensions/slack/src/monitor/replies.ts +++ b/extensions/slack/src/monitor/replies.ts @@ -127,6 +127,7 @@ export function resolveSlackThreadTs(params: { } type SlackReplyDeliveryPlan = { + peekThreadTs: () => string | undefined; nextThreadTs: () => string | undefined; markSent: () => void; }; @@ -168,6 +169,7 @@ export function createSlackReplyDeliveryPlan(params: { isThreadReply: params.isThreadReply, }); return { + peekThreadTs: () => replyReference.peek(), nextThreadTs: () => replyReference.use(), markSent: () => { replyReference.markSent(); diff --git a/src/auto-reply/reply/reply-reference.ts b/src/auto-reply/reply/reply-reference.ts index bd4d1c601f5..87e8669a3b1 100644 --- a/src/auto-reply/reply/reply-reference.ts +++ b/src/auto-reply/reply/reply-reference.ts @@ -2,6 +2,8 @@ import type { ReplyToMode } from "../../config/types.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; export type ReplyReferencePlanner = { + /** Returns the effective reply/thread id for the next send without updating state. */ + peek(): string | undefined; /** Returns the effective reply/thread id for the next send and updates state. */ use(): string | undefined; /** Mark that a reply was sent (needed when no reference is used). */ @@ -30,7 +32,7 @@ export function createReplyReferencePlanner(options: { const existingId = normalizeOptionalString(options.existingId); const startId = normalizeOptionalString(options.startId); - const use = (): string | undefined => { + const resolve = (): string | undefined => { if (!allowReference) { return undefined; } @@ -42,12 +44,19 @@ export function createReplyReferencePlanner(options: { return undefined; } if (options.replyToMode === "all") { - hasReplied = true; return id; } if (isSingleUseReplyToMode(options.replyToMode) && hasReplied) { return undefined; } + return id; + }; + + const use = (): string | undefined => { + const id = resolve(); + if (!id) { + return undefined; + } hasReplied = true; return id; }; @@ -57,6 +66,7 @@ export function createReplyReferencePlanner(options: { }; return { + peek: resolve, use, markSent, hasReplied: () => hasReplied, diff --git a/src/auto-reply/reply/reply-utils.test.ts b/src/auto-reply/reply/reply-utils.test.ts index 76a877fdae4..09a008b74f0 100644 --- a/src/auto-reply/reply/reply-utils.test.ts +++ b/src/auto-reply/reply/reply-utils.test.ts @@ -895,15 +895,20 @@ describe("createReplyReferencePlanner", () => { replyToMode: "first", startId: "parent", }); + expect(firstPlanner.peek()).toBe("parent"); + expect(firstPlanner.hasReplied()).toBe(false); expect(firstPlanner.use()).toBe("parent"); expect(firstPlanner.hasReplied()).toBe(true); firstPlanner.markSent(); + expect(firstPlanner.peek()).toBeUndefined(); expect(firstPlanner.use()).toBeUndefined(); const allPlanner = createReplyReferencePlanner({ replyToMode: "all", startId: "parent", }); + expect(allPlanner.peek()).toBe("parent"); + expect(allPlanner.hasReplied()).toBe(false); expect(allPlanner.use()).toBe("parent"); expect(allPlanner.use()).toBe("parent"); @@ -919,10 +924,28 @@ describe("createReplyReferencePlanner", () => { replyToMode: "batched", startId: "parent", }); + expect(batchedPlanner.peek()).toBe("parent"); expect(batchedPlanner.use()).toBe("parent"); + expect(batchedPlanner.peek()).toBeUndefined(); expect(batchedPlanner.use()).toBeUndefined(); }); + it("lets transient previews inspect first references without consuming them", () => { + const planner = createReplyReferencePlanner({ + replyToMode: "first", + startId: "parent", + }); + + expect(planner.peek()).toBe("parent"); + expect(planner.peek()).toBe("parent"); + expect(planner.hasReplied()).toBe(false); + + planner.markSent(); + + expect(planner.peek()).toBeUndefined(); + expect(planner.use()).toBeUndefined(); + }); + it("honors allowReference=false", () => { const planner = createReplyReferencePlanner({ replyToMode: "all", diff --git a/src/channels/draft-preview-finalizer.test.ts b/src/channels/draft-preview-finalizer.test.ts new file mode 100644 index 00000000000..2104f3de0e9 --- /dev/null +++ b/src/channels/draft-preview-finalizer.test.ts @@ -0,0 +1,98 @@ +import { describe, expect, it, vi } from "vitest"; +import { deliverFinalizableDraftPreview } from "./draft-preview-finalizer.js"; + +function createDraft(id: string | undefined = "preview-1") { + return { + flush: vi.fn(async () => {}), + id: vi.fn(() => id), + seal: vi.fn(async () => {}), + discardPending: vi.fn(async () => {}), + clear: vi.fn(async () => {}), + }; +} + +describe("deliverFinalizableDraftPreview", () => { + it("does not flush non-finalizable finals before normal delivery", async () => { + const draft = createDraft("preview-1"); + const deliverNormally = vi.fn(async () => {}); + + await deliverFinalizableDraftPreview({ + kind: "final", + payload: { text: "image", mediaUrl: "https://example.com/a.png" }, + draft, + buildFinalEdit: () => undefined, + editFinal: vi.fn(async () => {}), + deliverNormally, + }); + + expect(draft.flush).not.toHaveBeenCalled(); + expect(draft.discardPending).toHaveBeenCalledTimes(1); + expect(deliverNormally).toHaveBeenCalledTimes(1); + expect(draft.clear).toHaveBeenCalledTimes(1); + }); + + it("flushes only eligible finals and edits the preview in place", async () => { + const draft = createDraft("preview-1"); + const editFinal = vi.fn(async () => {}); + const deliverNormally = vi.fn(async () => {}); + + const result = await deliverFinalizableDraftPreview({ + kind: "final", + payload: { text: "final" }, + draft, + buildFinalEdit: (payload) => payload.text, + editFinal, + deliverNormally, + }); + + expect(result).toBe("preview-finalized"); + expect(draft.flush).toHaveBeenCalledTimes(1); + expect(draft.seal).toHaveBeenCalledTimes(1); + expect(editFinal).toHaveBeenCalledWith("preview-1", "final"); + expect(deliverNormally).not.toHaveBeenCalled(); + expect(draft.clear).not.toHaveBeenCalled(); + }); + + it("falls back to normal delivery and clears only after success when edit fails", async () => { + const draft = createDraft("preview-1"); + const editFinal = vi.fn(async () => { + throw new Error("gone"); + }); + const deliverNormally = vi.fn(async () => {}); + + await deliverFinalizableDraftPreview({ + kind: "final", + payload: { text: "final" }, + draft, + buildFinalEdit: (payload) => payload.text, + editFinal, + deliverNormally, + logPreviewEditFailure: vi.fn(), + }); + + expect(draft.flush).toHaveBeenCalledTimes(1); + expect(draft.discardPending).toHaveBeenCalledTimes(1); + expect(deliverNormally).toHaveBeenCalledTimes(1); + expect(draft.clear).toHaveBeenCalledTimes(1); + }); + + it("keeps an existing preview if normal fallback delivery throws", async () => { + const draft = createDraft("preview-1"); + + await expect( + deliverFinalizableDraftPreview({ + kind: "final", + payload: { text: "image" }, + draft, + buildFinalEdit: () => undefined, + editFinal: vi.fn(async () => {}), + deliverNormally: vi.fn(async () => { + throw new Error("send failed"); + }), + }), + ).rejects.toThrow("send failed"); + + expect(draft.discardPending).toHaveBeenCalledTimes(1); + expect(draft.clear).not.toHaveBeenCalled(); + }); +}); diff --git a/src/channels/draft-preview-finalizer.ts b/src/channels/draft-preview-finalizer.ts new file mode 100644 index 00000000000..bfd2d65cf58 --- /dev/null +++ b/src/channels/draft-preview-finalizer.ts @@ -0,0 +1,70 @@ +export type DraftPreviewFinalizerDraft = { + flush: () => Promise; + id: () => TId | undefined; + seal?: () => Promise; + discardPending?: () => Promise; + clear: () => Promise; +}; + +export type DraftPreviewFinalizerResult = + | "normal-delivered" + | "normal-skipped" + | "preview-finalized"; + +export async function deliverFinalizableDraftPreview(params: { + kind: "tool" | "block" | "final"; + payload: TPayload; + draft?: DraftPreviewFinalizerDraft; + buildFinalEdit: (payload: TPayload) => TEdit | undefined; + editFinal: (id: TId, edit: TEdit) => Promise; + deliverNormally: (payload: TPayload) => Promise; + onPreviewFinalized?: (id: TId) => Promise | void; + onNormalDelivered?: () => Promise | void; + logPreviewEditFailure?: (error: unknown) => void; +}): Promise { + if (params.kind !== "final" || !params.draft) { + const delivered = await params.deliverNormally(params.payload); + if (delivered === false) { + return "normal-skipped"; + } + await params.onNormalDelivered?.(); + return "normal-delivered"; + } + + const edit = params.buildFinalEdit(params.payload); + if (edit !== undefined) { + await params.draft.flush(); + const previewId = params.draft.id(); + if (previewId !== undefined) { + await params.draft.seal?.(); + try { + await params.editFinal(previewId, edit); + await params.onPreviewFinalized?.(previewId); + return "preview-finalized"; + } catch (err) { + params.logPreviewEditFailure?.(err); + } + } + } + + if (params.draft.discardPending) { + await params.draft.discardPending(); + } else { + await params.draft.clear(); + } + + let delivered = false; + try { + const result = await params.deliverNormally(params.payload); + delivered = result !== false; + if (delivered) { + await params.onNormalDelivered?.(); + } + } finally { + if (delivered) { + await params.draft.clear(); + } + } + + return delivered ? "normal-delivered" : "normal-skipped"; +} diff --git a/src/channels/draft-stream-controls.test.ts b/src/channels/draft-stream-controls.test.ts index aafae33bd7c..4c627aff636 100644 --- a/src/channels/draft-stream-controls.test.ts +++ b/src/channels/draft-stream-controls.test.ts @@ -119,4 +119,35 @@ describe("draft-stream-controls", () => { expect(messageId).toBeUndefined(); expect(deleteMessage).toHaveBeenCalledWith("m-4"); }); + + it("lifecycle seal ignores late updates without clearing the preview id", async () => { + const state = { stopped: false, final: false }; + let messageId: string | undefined = "m-5"; + const sendOrEditStreamMessage = vi.fn(async () => true); + const deleteMessage = vi.fn(async () => {}); + + const lifecycle = createFinalizableDraftLifecycle({ + throttleMs: 250, + state, + sendOrEditStreamMessage, + readMessageId: () => messageId, + clearMessageId: () => { + messageId = undefined; + }, + isValidMessageId: (value): value is string => typeof value === "string", + deleteMessage, + warnPrefix: "cleanup failed", + }); + + lifecycle.update("stale"); + await lifecycle.seal(); + lifecycle.update("late"); + await lifecycle.loop.flush(); + + expect(state.final).toBe(true); + expect(messageId).toBe("m-5"); + expect(sendOrEditStreamMessage).toHaveBeenCalledTimes(1); + expect(sendOrEditStreamMessage).toHaveBeenCalledWith("stale"); + expect(deleteMessage).not.toHaveBeenCalled(); + }); }); diff --git a/src/channels/draft-stream-controls.ts b/src/channels/draft-stream-controls.ts index 2ae1efaa6ee..689b86fc077 100644 --- a/src/channels/draft-stream-controls.ts +++ b/src/channels/draft-stream-controls.ts @@ -61,10 +61,18 @@ export function createFinalizableDraftStreamControls(params: { await loop.waitForInFlight(); }; + const seal = async (): Promise => { + params.markFinal(); + loop.stop(); + await loop.waitForInFlight(); + }; + return { loop, update, stop, + seal, + discardPending: stopForClear, stopForClear, }; } diff --git a/src/plugin-sdk/channel-lifecycle.ts b/src/plugin-sdk/channel-lifecycle.ts index f078925e5aa..9c3ffc64d48 100644 --- a/src/plugin-sdk/channel-lifecycle.ts +++ b/src/plugin-sdk/channel-lifecycle.ts @@ -1,4 +1,5 @@ export * from "./channel-lifecycle.core.js"; +export * from "../channels/draft-preview-finalizer.js"; export * from "../channels/draft-stream-controls.js"; export * from "../channels/draft-stream-loop.js"; export { createRunStateMachine } from "../channels/run-state-machine.js";