diff --git a/CHANGELOG.md b/CHANGELOG.md index b3d65e6122c..9601ec56a73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai - Plugins/tasks: add a detached runtime registration contract so plugin executors can own detached task lifecycle and cancellation without reaching into core task internals. (#68915) Thanks @mbelinky. - Terminal/logging: optimize `sanitizeForLog()` by replacing the iterative control-character stripping loop with a single regex pass while preserving the existing ANSI-first sanitization behavior. (#67205) Thanks @bulutmuf. - QA/CI: make `openclaw qa suite` and `openclaw qa telegram` fail by default when scenarios fail, add `--allow-failures` for artifact-only runs, and tighten live-lane defaults for CI automation. (#69122) Thanks @joshavant. +- Mattermost: stream thinking, tool activity, and partial reply text into a single draft preview post that finalizes in place when safe. (#47838) thanks @ninjaa. ### Fixes diff --git a/extensions/mattermost/src/mattermost/client.ts b/extensions/mattermost/src/mattermost/client.ts index 6f4a14f1e46..3e1c3c93139 100644 --- a/extensions/mattermost/src/mattermost/client.ts +++ b/extensions/mattermost/src/mattermost/client.ts @@ -545,6 +545,15 @@ export async function updateMattermostPost( }); } +export async function deleteMattermostPost( + client: MattermostClient, + postId: string, +): Promise { + await client.request(`/posts/${postId}`, { + method: "DELETE", + }); +} + export async function uploadMattermostFile( client: MattermostClient, params: { diff --git a/extensions/mattermost/src/mattermost/draft-stream.test.ts b/extensions/mattermost/src/mattermost/draft-stream.test.ts new file mode 100644 index 00000000000..37d82c2425a --- /dev/null +++ b/extensions/mattermost/src/mattermost/draft-stream.test.ts @@ -0,0 +1,214 @@ +import { describe, expect, it, vi } from "vitest"; +import type { MattermostClient } from "./client.js"; +import { buildMattermostToolStatusText, createMattermostDraftStream } from "./draft-stream.js"; + +type RequestRecord = { + path: string; + init?: RequestInit; +}; + +function createMockClient(): { + client: MattermostClient; + calls: RequestRecord[]; + requestMock: ReturnType; +} { + const calls: RequestRecord[] = []; + let nextId = 1; + const requestImpl: MattermostClient["request"] = async ( + path: string, + init?: RequestInit, + ): Promise => { + calls.push({ path, init }); + if (path === "/posts") { + return { id: `post-${nextId++}` } as T; + } + if (path.startsWith("/posts/")) { + return { id: "patched" } as T; + } + return {} as T; + }; + const requestMock = vi.fn(requestImpl); + const client: MattermostClient = { + baseUrl: "https://chat.example.com", + apiBaseUrl: "https://chat.example.com/api/v4", + token: "token", + request: requestMock as MattermostClient["request"], + fetchImpl: vi.fn() as MattermostClient["fetchImpl"], + }; + return { client, calls, requestMock }; +} + +describe("createMattermostDraftStream", () => { + it("creates a preview post and updates it on later changes", async () => { + const { client, calls } = createMockClient(); + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + rootId: "root-1", + throttleMs: 0, + }); + + stream.update("Running `read`…"); + await stream.flush(); + stream.update("Running `read`…"); + await stream.flush(); + + expect(calls).toHaveLength(1); + expect(calls[0]?.path).toBe("/posts"); + + const createBody = JSON.parse((calls[0]?.init?.body as string | undefined) ?? "{}"); + expect(createBody).toMatchObject({ + channel_id: "channel-1", + root_id: "root-1", + message: "Running `read`…", + }); + expect(stream.postId()).toBe("post-1"); + }); + + it("does not resend identical updates", async () => { + const { client, calls } = createMockClient(); + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + throttleMs: 0, + }); + + stream.update("Working..."); + await stream.flush(); + stream.update("Working..."); + await stream.flush(); + + expect(calls).toHaveLength(1); + }); + + it("clears the preview post when no final reply is delivered", async () => { + const { client, calls } = createMockClient(); + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + rootId: "root-1", + throttleMs: 0, + }); + + stream.update("Working..."); + await stream.flush(); + await stream.clear(); + + expect(calls).toHaveLength(2); + expect(calls[1]?.path).toBe("/posts/post-1"); + expect(calls[1]?.init?.method).toBe("DELETE"); + expect(stream.postId()).toBeUndefined(); + }); + + it("stop flushes the last pending update and ignores later ones", async () => { + const { client, calls } = createMockClient(); + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + rootId: "root-1", + throttleMs: 1000, + }); + + stream.update("Working..."); + await stream.flush(); + stream.update("Stale partial"); + await stream.stop(); + stream.update("Late partial"); + await stream.flush(); + + expect(calls).toHaveLength(2); + expect(calls[0]?.path).toBe("/posts"); + expect(calls[1]?.path).toBe("/posts/post-1"); + expect(JSON.parse((calls[1]?.init?.body as string | undefined) ?? "{}")).toMatchObject({ + message: "Stale partial", + }); + }); + + it("warns and stops when preview creation fails", async () => { + const warn = vi.fn(); + const requestImpl: MattermostClient["request"] = async () => { + throw new Error("boom"); + }; + const requestMock = vi.fn(requestImpl); + const client: MattermostClient = { + baseUrl: "https://chat.example.com", + apiBaseUrl: "https://chat.example.com/api/v4", + token: "token", + request: requestMock as MattermostClient["request"], + fetchImpl: vi.fn() as MattermostClient["fetchImpl"], + }; + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + throttleMs: 0, + warn, + }); + + stream.update("Working..."); + await stream.flush(); + stream.update("Still working..."); + await stream.flush(); + + expect(warn).toHaveBeenCalled(); + expect(requestMock).toHaveBeenCalledTimes(1); + expect(stream.postId()).toBeUndefined(); + }); + + it("does not resend after an update failure followed by stop", async () => { + const warn = vi.fn(); + const calls: RequestRecord[] = []; + let failNextPatch = true; + const requestImpl: MattermostClient["request"] = async ( + path: string, + init?: RequestInit, + ): Promise => { + calls.push({ path, init }); + if (path === "/posts") { + return { id: "post-1" } as T; + } + if (path === "/posts/post-1") { + if (failNextPatch) { + failNextPatch = false; + throw new Error("patch failed"); + } + return { id: "patched" } as T; + } + return {} as T; + }; + const requestMock = vi.fn(requestImpl); + const client: MattermostClient = { + baseUrl: "https://chat.example.com", + apiBaseUrl: "https://chat.example.com/api/v4", + token: "token", + request: requestMock as MattermostClient["request"], + fetchImpl: vi.fn() as MattermostClient["fetchImpl"], + }; + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + throttleMs: 1000, + warn, + }); + + stream.update("Working..."); + await stream.flush(); + stream.update("Will fail"); + await stream.flush(); + await stream.stop(); + + expect(warn).toHaveBeenCalledWith("mattermost stream preview failed: patch failed"); + expect(calls).toHaveLength(2); + expect(calls[0]?.path).toBe("/posts"); + expect(calls[1]?.path).toBe("/posts/post-1"); + }); +}); + +describe("buildMattermostToolStatusText", () => { + it("renders a status with the tool name", () => { + expect(buildMattermostToolStatusText({ name: "read" })).toBe("Running `read`…"); + }); + + it("falls back to a generic running tool status", () => { + expect(buildMattermostToolStatusText({ name: "exec" })).toBe("Running `exec`…"); + }); +}); diff --git a/extensions/mattermost/src/mattermost/draft-stream.ts b/extensions/mattermost/src/mattermost/draft-stream.ts new file mode 100644 index 00000000000..3b928448022 --- /dev/null +++ b/extensions/mattermost/src/mattermost/draft-stream.ts @@ -0,0 +1,131 @@ +import { createFinalizableDraftLifecycle } from "openclaw/plugin-sdk/channel-lifecycle"; +import { + createMattermostPost, + deleteMattermostPost, + updateMattermostPost, + type MattermostClient, +} from "./client.js"; + +const MATTERMOST_STREAM_MAX_CHARS = 4000; +const DEFAULT_THROTTLE_MS = 1000; + +export type MattermostDraftStream = { + update: (text: string) => void; + flush: () => Promise; + postId: () => string | undefined; + clear: () => Promise; + stop: () => Promise; + forceNewMessage: () => void; +}; + +export function normalizeMattermostDraftText(text: string, maxChars: number): string { + const trimmed = text.trim(); + if (!trimmed) { + return ""; + } + if (trimmed.length <= maxChars) { + return trimmed; + } + return `${trimmed.slice(0, Math.max(0, maxChars - 3)).trimEnd()}...`; +} + +export function buildMattermostToolStatusText(params: { name?: string; phase?: string }): string { + const tool = params.name?.trim() ? ` \`${params.name.trim()}\`` : " tool"; + return `Running${tool}…`; +} + +export function createMattermostDraftStream(params: { + client: MattermostClient; + channelId: string; + rootId?: string; + maxChars?: number; + throttleMs?: number; + renderText?: (text: string) => string; + log?: (message: string) => void; + warn?: (message: string) => void; +}): MattermostDraftStream { + const maxChars = Math.min( + params.maxChars ?? MATTERMOST_STREAM_MAX_CHARS, + MATTERMOST_STREAM_MAX_CHARS, + ); + const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS); + const streamState = { stopped: false, final: false }; + let streamPostId: string | undefined; + let lastSentText = ""; + + const sendOrEditStreamMessage = async (text: string): Promise => { + if (streamState.stopped && !streamState.final) { + return false; + } + const rendered = params.renderText?.(text) ?? text; + const normalized = normalizeMattermostDraftText(rendered, maxChars); + if (!normalized) { + return false; + } + if (normalized === lastSentText) { + return true; + } + try { + if (streamPostId) { + await updateMattermostPost(params.client, streamPostId, { + message: normalized, + }); + } else { + const sent = await createMattermostPost(params.client, { + channelId: params.channelId, + message: normalized, + rootId: params.rootId, + }); + const postId = sent.id?.trim(); + if (!postId) { + streamState.stopped = true; + params.warn?.("mattermost stream preview stopped (missing post id from create)"); + return false; + } + streamPostId = postId; + } + lastSentText = normalized; + return true; + } catch (err) { + streamState.stopped = true; + params.warn?.( + `mattermost stream preview failed: ${err instanceof Error ? err.message : String(err)}`, + ); + return false; + } + }; + + const { loop, update, stop, clear } = createFinalizableDraftLifecycle({ + throttleMs, + state: streamState, + sendOrEditStreamMessage, + readMessageId: () => streamPostId, + clearMessageId: () => { + streamPostId = undefined; + }, + isValidMessageId: (value): value is string => typeof value === "string" && value.length > 0, + deleteMessage: async (postId) => { + await deleteMattermostPost(params.client, postId); + }, + warn: params.warn, + warnPrefix: "mattermost stream preview cleanup failed", + }); + + const forceNewMessage = () => { + streamPostId = undefined; + lastSentText = ""; + loop.resetPending(); + loop.resetThrottleWindow(); + }; + + params.log?.(`mattermost stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); + + return { + update, + flush: loop.flush, + postId: () => streamPostId, + clear, + stop, + forceNewMessage, + }; +} diff --git a/extensions/mattermost/src/mattermost/monitor.test.ts b/extensions/mattermost/src/mattermost/monitor.test.ts index 49f71e46d9a..4369437c15c 100644 --- a/extensions/mattermost/src/mattermost/monitor.test.ts +++ b/extensions/mattermost/src/mattermost/monitor.test.ts @@ -1,9 +1,13 @@ import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; -import { describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../runtime-api.js"; import { resolveMattermostAccount } from "./accounts.js"; +import * as clientModule from "./client.js"; +import type { MattermostClient } from "./client.js"; import { buildMattermostModelPickerSelectMessageSid, + canFinalizeMattermostPreviewInPlace, + deliverMattermostReplyWithDraftPreview, evaluateMattermostMentionGate, MattermostRetryableInboundError, processMattermostReplayGuardedPost, @@ -11,6 +15,8 @@ import { resolveMattermostEffectiveReplyToId, resolveMattermostReplyRootId, resolveMattermostThreadSessionContext, + shouldFinalizeMattermostPreviewAfterDispatch, + shouldClearMattermostDraftPreview, type MattermostMentionGateInput, type MattermostRequireMentionResolverInput, } from "./monitor.js"; @@ -41,6 +47,34 @@ function resolveRequireMentionForTest(params: MattermostRequireMentionResolverIn return true; } +const updateMattermostPostSpy = vi.spyOn(clientModule, "updateMattermostPost"); + +function createMattermostClientMock(): MattermostClient { + return { + baseUrl: "https://chat.example.com", + apiBaseUrl: "https://chat.example.com/api/v4", + token: "token", + request: vi.fn(async () => ({})) as MattermostClient["request"], + fetchImpl: vi.fn( + async () => new Response(null, { status: 200 }), + ) as MattermostClient["fetchImpl"], + }; +} + +function createDraftStreamMock(postId: string | undefined = "preview-post-1") { + return { + flush: vi.fn(async () => {}), + postId: vi.fn(() => postId), + clear: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + }; +} + +beforeEach(() => { + vi.clearAllMocks(); + updateMattermostPostSpy.mockResolvedValue({ id: "patched" } as never); +}); + function evaluateMentionGateForMessage(params: { cfg: OpenClawConfig; threadRootId?: string }) { const account = resolveMattermostAccount({ cfg: params.cfg, accountId: "default" }); const resolver = vi.fn(resolveRequireMentionForTest); @@ -167,6 +201,186 @@ describe("resolveMattermostReplyRootId", () => { }); }); +describe("canFinalizeMattermostPreviewInPlace", () => { + it("allows in-place finalization when the final reply target matches the preview thread", () => { + expect( + canFinalizeMattermostPreviewInPlace({ + previewRootId: "thread-root-456", + threadRootId: "thread-root-456", + replyToId: "child-post-789", + }), + ).toBe(true); + }); + + it("prevents in-place finalization when a top-level preview would become a threaded reply", () => { + expect( + canFinalizeMattermostPreviewInPlace({ + replyToId: "child-post-789", + }), + ).toBe(false); + }); +}); + +describe("shouldClearMattermostDraftPreview", () => { + it("deletes the preview after successful normal final delivery", () => { + expect( + shouldClearMattermostDraftPreview({ + finalizedViaPreviewPost: false, + finalReplyDelivered: true, + }), + ).toBe(true); + }); + + it("keeps the preview when final delivery failed", () => { + expect( + shouldClearMattermostDraftPreview({ + finalizedViaPreviewPost: false, + finalReplyDelivered: false, + }), + ).toBe(false); + }); + + it("keeps the preview when it already became the final reply", () => { + expect( + shouldClearMattermostDraftPreview({ + finalizedViaPreviewPost: true, + finalReplyDelivered: true, + }), + ).toBe(false); + }); +}); + +describe("deliverMattermostReplyWithDraftPreview", () => { + it("deletes the preview after a successful normal final send", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + + await deliverMattermostReplyWithDraftPreview({ + payload: { text: "All good", replyToId: "reply-1" } as never, + info: { kind: "final" }, + client: createMattermostClientMock(), + draftStream, + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverFinal, + }); + + expect(deliverFinal).toHaveBeenCalledTimes(1); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + expect(updateMattermostPostSpy).not.toHaveBeenCalled(); + }); + + it("deletes the preview after a successful non-finalizable media final", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + + await deliverMattermostReplyWithDraftPreview({ + payload: { + text: "Photo", + replyToId: "reply-1", + mediaUrl: "https://example.com/a.png", + } as never, + info: { kind: "final" }, + client: createMattermostClientMock(), + draftStream, + effectiveReplyToId: "thread-root-1", + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverFinal, + }); + + expect(deliverFinal).toHaveBeenCalledTimes(1); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("finalizes the preview in place when the final targets the same thread", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + + await deliverMattermostReplyWithDraftPreview({ + payload: { text: "Final answer", replyToId: "child-post-789" } as never, + info: { kind: "final" }, + client: createMattermostClientMock(), + draftStream, + effectiveReplyToId: "thread-root-456", + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverFinal, + }); + + expect(updateMattermostPostSpy).toHaveBeenCalledWith( + expect.anything(), + "preview-post-1", + expect.objectContaining({ message: "Final answer" }), + ); + expect(draftStream.stop).toHaveBeenCalledTimes(1); + expect(draftStream.stop.mock.invocationCallOrder[0]).toBeLessThan( + updateMattermostPostSpy.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY, + ); + expect(deliverFinal).not.toHaveBeenCalled(); + expect(draftStream.clear).not.toHaveBeenCalled(); + }); + + it("keeps the existing preview unchanged when final delivery fails", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => { + throw new Error("send failed"); + }); + + await expect( + deliverMattermostReplyWithDraftPreview({ + payload: { text: "Broken", replyToId: "reply-1" } as never, + info: { kind: "final" }, + client: createMattermostClientMock(), + draftStream, + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverFinal, + }), + ).rejects.toThrow("send failed"); + + expect(draftStream.clear).not.toHaveBeenCalled(); + expect(updateMattermostPostSpy).not.toHaveBeenCalledWith( + expect.anything(), + "preview-post-1", + expect.objectContaining({ message: "↓ See below." }), + ); + }); +}); + +describe("shouldFinalizeMattermostPreviewAfterDispatch", () => { + it("reuses the preview only for a single eligible final payload", () => { + expect( + shouldFinalizeMattermostPreviewAfterDispatch({ + finalCount: 1, + canFinalizeInPlace: true, + }), + ).toBe(true); + }); + + it("falls back to normal sends for multi-payload finals", () => { + expect( + shouldFinalizeMattermostPreviewAfterDispatch({ + finalCount: 2, + canFinalizeInPlace: true, + }), + ).toBe(false); + }); + + it("falls back to normal sends when the final cannot be edited into the preview", () => { + expect( + shouldFinalizeMattermostPreviewAfterDispatch({ + finalCount: 1, + canFinalizeInPlace: false, + }), + ).toBe(false); + }); +}); + describe("resolveMattermostEffectiveReplyToId", () => { it("keeps an existing thread root", () => { expect( diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 811db94ded9..8696ec337ae 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -10,9 +10,12 @@ import { createMattermostClient, fetchMattermostMe, normalizeMattermostBaseUrl, + updateMattermostPost, + type MattermostClient, type MattermostPost, type MattermostUser, } from "./client.js"; +import { buildMattermostToolStatusText, createMattermostDraftStream } from "./draft-stream.js"; import { computeInteractionCallbackUrl, createMattermostInteractionHandler, @@ -236,6 +239,120 @@ export function resolveMattermostReplyRootId(params: { return normalizeOptionalString(params.replyToId); } +export function canFinalizeMattermostPreviewInPlace(params: { + previewRootId?: string; + threadRootId?: string; + replyToId?: string; +}): boolean { + return ( + resolveMattermostReplyRootId({ + threadRootId: params.threadRootId, + replyToId: params.replyToId, + }) === params.previewRootId?.trim() + ); +} + +export function shouldClearMattermostDraftPreview(params: { + finalizedViaPreviewPost: boolean; + finalReplyDelivered: boolean; +}): boolean { + return params.finalReplyDelivered && !params.finalizedViaPreviewPost; +} + +export function shouldFinalizeMattermostPreviewAfterDispatch(params: { + finalCount: number; + canFinalizeInPlace: boolean; +}): boolean { + return params.finalCount === 1 && params.canFinalizeInPlace; +} + +type MattermostDraftPreviewState = { + finalizedViaPreviewPost: boolean; +}; + +type MattermostDraftPreviewDeliverParams = { + payload: ReplyPayload; + info: { kind: "tool" | "block" | "final" }; + client: MattermostClient; + draftStream: Pick< + ReturnType, + "flush" | "postId" | "clear" | "stop" + >; + effectiveReplyToId?: string; + resolvePreviewFinalText: (text?: string) => string | undefined; + previewState: MattermostDraftPreviewState; + logVerboseMessage: (message: string) => void; + deliverFinal: () => Promise; +}; + +export async function deliverMattermostReplyWithDraftPreview( + params: MattermostDraftPreviewDeliverParams, +): Promise { + if (params.payload.isReasoning) { + return; + } + + const isFinal = params.info.kind === "final"; + let previewPostId: string | undefined; + if (isFinal) { + await params.draftStream.flush(); + const hasMedia = + Boolean(params.payload.mediaUrl) || (params.payload.mediaUrls?.length ?? 0) > 0; + const previewFinalText = params.resolvePreviewFinalText(params.payload.text); + previewPostId = params.draftStream.postId(); + + if ( + typeof previewPostId === "string" && + !hasMedia && + typeof previewFinalText === "string" && + !params.payload.isError && + canFinalizeMattermostPreviewInPlace({ + previewRootId: params.effectiveReplyToId, + threadRootId: params.effectiveReplyToId, + replyToId: params.payload.replyToId, + }) + ) { + try { + // Seal the preview before the final edit so late draft events cannot + // patch over the finalized visible message. + await params.draftStream.stop(); + await updateMattermostPost(params.client, previewPostId, { + message: previewFinalText, + }); + params.previewState.finalizedViaPreviewPost = true; + return; + } catch (err) { + params.logVerboseMessage( + `mattermost preview final edit failed; falling back to normal send (${String(err)})`, + ); + } + } + } + + let finalReplyDelivered = false; + try { + await params.deliverFinal(); + finalReplyDelivered = true; + } finally { + if ( + isFinal && + typeof previewPostId === "string" && + shouldClearMattermostDraftPreview({ + finalizedViaPreviewPost: params.previewState.finalizedViaPreviewPost, + finalReplyDelivered, + }) + ) { + try { + await params.draftStream.clear(); + } catch (err) { + params.logVerboseMessage( + `mattermost draft preview clear failed after successful final delivery (${String(err)})`, + ); + } + } + } +} + export function resolveMattermostEffectiveReplyToId(params: { kind: ChatType; postId?: string | null; @@ -1516,52 +1633,156 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }, }, }); - const { dispatcher, replyOptions, markDispatchIdle } = + const draftStream = createMattermostDraftStream({ + client, + channelId, + rootId: effectiveReplyToId, + throttleMs: 1200, + log: logVerboseMessage, + warn: logVerboseMessage, + }); + let lastPartialText = ""; + const previewState: MattermostDraftPreviewState = { + finalizedViaPreviewPost: false, + }; + + const resolvePreviewFinalText = (text?: string) => { + if (typeof text !== "string") { + return undefined; + } + const formatted = core.channel.text.convertMarkdownTables(text, tableMode); + const chunkMode = core.channel.text.resolveChunkMode( + cfg, + "mattermost", + account.accountId, + ); + const chunks = core.channel.text.chunkMarkdownTextWithMode( + formatted, + textLimit, + chunkMode, + ); + if (!chunks.length && formatted) { + chunks.push(formatted); + } + if (chunks.length != 1) { + return undefined; + } + const trimmed = chunks[0]?.trim(); + if (!trimmed) { + return undefined; + } + if ( + lastPartialText && + lastPartialText.startsWith(trimmed) && + trimmed.length < lastPartialText.length + ) { + return undefined; + } + return trimmed; + }; + + const updateDraftFromPartial = (text?: string) => { + const cleaned = text?.trim(); + if (!cleaned) { + return; + } + if (cleaned === lastPartialText) { + return; + } + if ( + lastPartialText && + lastPartialText.startsWith(cleaned) && + cleaned.length < lastPartialText.length + ) { + return; + } + lastPartialText = cleaned; + draftStream.update(cleaned); + }; + + const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } = core.channel.reply.createReplyDispatcherWithTyping({ ...replyPipeline, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), typingCallbacks, - deliver: async (payload: ReplyPayload) => { - await deliverMattermostReplyPayload({ - core, - cfg, + deliver: async (payload: ReplyPayload, info) => { + await deliverMattermostReplyWithDraftPreview({ payload, - to, - accountId: account.accountId, - agentId: route.agentId, - replyToId: resolveMattermostReplyRootId({ - threadRootId: effectiveReplyToId, - replyToId: payload.replyToId, - }), - textLimit, - tableMode, - sendMessage: sendMessageMattermost, + info, + client, + draftStream, + effectiveReplyToId, + resolvePreviewFinalText, + previewState, + logVerboseMessage, + deliverFinal: async () => { + await deliverMattermostReplyPayload({ + core, + cfg, + payload, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }), + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + runtime.log?.(`delivered reply to ${to}`); + }, }); - runtime.log?.(`delivered reply to ${to}`); }, onError: (err, info) => { runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`); }, }); - await core.channel.reply.withReplyDispatcher({ - dispatcher, - onSettled: () => { - markDispatchIdle(); - }, - run: () => - core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, - onModelSelected, - }, - }), - }); + try { + await core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { + markDispatchIdle(); + }, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + disableBlockStreaming: true, + onModelSelected, + onPartialReply: (payload) => { + updateDraftFromPartial(payload.text); + }, + onAssistantMessageStart: () => { + lastPartialText = ""; + }, + onReasoningEnd: () => { + lastPartialText = ""; + }, + onReasoningStream: async () => { + if (!lastPartialText) { + draftStream.update("Thinking…"); + } + }, + onToolStart: async (payload) => { + draftStream.update(buildMattermostToolStatusText(payload)); + }, + }, + }), + }); + } finally { + try { + await draftStream.stop(); + } catch (err) { + logVerboseMessage(`mattermost draft preview cleanup failed: ${String(err)}`); + } + markRunComplete(); + } if (historyKey) { clearHistoryEntriesIfEnabled({ historyMap: channelHistories,