diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b9038a26d6..229548961e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Docs: https://docs.openclaw.ai ### Changes - Telegram/forum topics: surface human topic names in agent context, prompt metadata, and plugin hook metadata by learning names from Telegram forum service messages. (#65973) Thanks @ptahdunbar. +- Mattermost: stream thinking, tool activity, and partial reply text into a single draft preview post that finalizes in place when safe. (#47838) thanks @ninjaa. ### Fixes diff --git a/extensions/mattermost/src/mattermost/draft-stream.test.ts b/extensions/mattermost/src/mattermost/draft-stream.test.ts index d219881062f..0b9c6de368f 100644 --- a/extensions/mattermost/src/mattermost/draft-stream.test.ts +++ b/extensions/mattermost/src/mattermost/draft-stream.test.ts @@ -7,25 +7,31 @@ type RequestRecord = { init?: RequestInit; }; -function createMockClient(): { client: MattermostClient; calls: RequestRecord[] } { +function createMockClient(): { + client: MattermostClient; + calls: RequestRecord[]; + request: ReturnType; +} { const calls: RequestRecord[] = []; let nextId = 1; + const request = vi.fn(async (path: string, init?: RequestInit): Promise => { + calls.push({ path, init }); + if (path === "/posts") { + return { id: `post-${nextId++}` } as T; + } + if (path.startsWith("/posts/")) { + return { id: "patched" } as T; + } + return {} as T; + }); const client: MattermostClient = { baseUrl: "https://chat.example.com", apiBaseUrl: "https://chat.example.com/api/v4", token: "token", - request: vi.fn(async (path: string, init?: RequestInit) => { - calls.push({ path, init }); - if (path === "/posts") { - return { id: `post-${nextId++}` }; - } - if (path.startsWith("/posts/")) { - return { id: "patched" }; - } - return {}; - }), + request: request as MattermostClient["request"], + fetchImpl: vi.fn(async () => new Response(null, { status: 204 })), }; - return { client, calls }; + return { client, calls, request }; } describe("createMattermostDraftStream", () => { @@ -73,13 +79,15 @@ describe("createMattermostDraftStream", () => { it("warns and stops when preview creation fails", async () => { const warn = vi.fn(); + const request = vi.fn(async () => { + throw new Error("boom"); + }); const client: MattermostClient = { baseUrl: "https://chat.example.com", apiBaseUrl: "https://chat.example.com/api/v4", token: "token", - request: vi.fn(async () => { - throw new Error("boom"); - }), + request: request as MattermostClient["request"], + fetchImpl: vi.fn(async () => new Response(null, { status: 204 })), }; const stream = createMattermostDraftStream({ client, @@ -94,7 +102,7 @@ describe("createMattermostDraftStream", () => { await stream.flush(); expect(warn).toHaveBeenCalled(); - expect(client.request).toHaveBeenCalledTimes(1); + expect(request).toHaveBeenCalledTimes(1); expect(stream.postId()).toBeUndefined(); }); }); diff --git a/extensions/mattermost/src/mattermost/draft-stream.ts b/extensions/mattermost/src/mattermost/draft-stream.ts index d25394345e3..bcfe27e9692 100644 --- a/extensions/mattermost/src/mattermost/draft-stream.ts +++ b/extensions/mattermost/src/mattermost/draft-stream.ts @@ -1,4 +1,4 @@ -import { createFinalizableDraftStreamControlsForState } from "../../../../src/channels/draft-stream-controls.js"; +import { createFinalizableDraftStreamControlsForState } from "openclaw/plugin-sdk/channel-lifecycle"; import { createMattermostPost, updateMattermostPost, type MattermostClient } from "./client.js"; const MATTERMOST_STREAM_MAX_CHARS = 4000; diff --git a/extensions/mattermost/src/mattermost/monitor.test.ts b/extensions/mattermost/src/mattermost/monitor.test.ts index 49f71e46d9a..5aba5ef2fa6 100644 --- a/extensions/mattermost/src/mattermost/monitor.test.ts +++ b/extensions/mattermost/src/mattermost/monitor.test.ts @@ -4,6 +4,7 @@ import type { OpenClawConfig } from "../../runtime-api.js"; import { resolveMattermostAccount } from "./accounts.js"; import { buildMattermostModelPickerSelectMessageSid, + didDeliverAllMattermostDeferredFinalReplies, evaluateMattermostMentionGate, MattermostRetryableInboundError, processMattermostReplayGuardedPost, @@ -436,3 +437,23 @@ describe("resolveMattermostReactionChannelId", () => { expect(resolveMattermostReactionChannelId({})).toBeUndefined(); }); }); + +describe("didDeliverAllMattermostDeferredFinalReplies", () => { + it("returns true when all deferred finals were delivered", () => { + expect( + didDeliverAllMattermostDeferredFinalReplies({ + deliveredCount: 2, + deferredCount: 2, + }), + ).toBe(true); + }); + + it("returns false when a later deferred final failed", () => { + expect( + didDeliverAllMattermostDeferredFinalReplies({ + deliveredCount: 1, + deferredCount: 2, + }), + ).toBe(false); + }); +}); diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 47c7225e3b8..a657b80f69f 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -10,6 +10,7 @@ import { createMattermostClient, fetchMattermostMe, normalizeMattermostBaseUrl, + updateMattermostPost, type MattermostPost, type MattermostUser, } from "./client.js"; @@ -237,6 +238,13 @@ export function resolveMattermostReplyRootId(params: { return normalizeOptionalString(params.replyToId); } +export function didDeliverAllMattermostDeferredFinalReplies(params: { + deliveredCount: number; + deferredCount: number; +}): boolean { + return params.deferredCount > 0 && params.deliveredCount === params.deferredCount; +} + export function resolveMattermostEffectiveReplyToId(params: { kind: ChatType; postId?: string | null; @@ -2006,6 +2014,15 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }); let lastPartialText = ""; let finalizedViaPreviewPost = false; + let previewCompletionNotePosted = false; + type DeferredMattermostFinal = { + payload: ReplyPayload; + replyRootId?: string; + previewPostId?: string; + previewFinalText?: string; + canFinalizeInPlace: boolean; + }; + const deferredFinalReplies: DeferredMattermostFinal[] = []; const resolvePreviewFinalText = (text?: string) => { if (typeof text !== "string") { @@ -2034,6 +2051,86 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} return trimmed; }; + const updateDraftPreviewToNormalSend = async (previewPostId?: string) => { + if ( + previewCompletionNotePosted || + finalizedViaPreviewPost || + typeof previewPostId !== "string" + ) { + return; + } + try { + await updateMattermostPost(client, previewPostId, { + message: "↓ See below.", + }); + previewCompletionNotePosted = true; + } catch (err) { + logVerboseMessage( + `mattermost preview completion update failed; continuing with normal send (${String(err)})`, + ); + } + }; + + const deliverDeferredFinalReply = async (entry: DeferredMattermostFinal) => { + await deliverMattermostReplyPayload({ + core, + cfg, + payload: entry.payload, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: entry.replyRootId, + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + runtime.log?.(`delivered reply to ${to}`); + }; + + const finalizeOrDeliverDeferredFinalReplies = async (finalCount: number) => { + if (!deferredFinalReplies.length) { + return; + } + await draftStream.flush(); + const pendingFinalReplies = deferredFinalReplies.splice(0); + const firstFinal = pendingFinalReplies[0]; + if ( + finalCount === 1 && + firstFinal?.canFinalizeInPlace === true && + typeof firstFinal.previewPostId === "string" && + typeof firstFinal.previewFinalText === "string" + ) { + try { + await updateMattermostPost(client, firstFinal.previewPostId, { + message: firstFinal.previewFinalText, + }); + finalizedViaPreviewPost = true; + return; + } catch (err) { + logVerboseMessage( + `mattermost preview final edit failed; falling back to normal send (${String(err)})`, + ); + } + } + + const previewPostId = pendingFinalReplies.find( + (entry) => typeof entry.previewPostId === "string", + )?.previewPostId; + await updateDraftPreviewToNormalSend(previewPostId); + let deliveredDeferredCount = 0; + for (const entry of pendingFinalReplies) { + await deliverDeferredFinalReply(entry); + deliveredDeferredCount += 1; + } + if ( + didDeliverAllMattermostDeferredFinalReplies({ + deliveredCount: deliveredDeferredCount, + deferredCount: pendingFinalReplies.length, + }) + ) { + } + }; + const updateDraftFromPartial = (text?: string) => { const cleaned = text?.trim(); if (!cleaned) { @@ -2068,37 +2165,21 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; const previewFinalText = resolvePreviewFinalText(payload.text); const previewPostId = draftStream.postId(); - - if ( - typeof previewPostId === "string" && - !hasMedia && - typeof previewFinalText === "string" && - !payload.isError - ) { - try { - await updateMattermostPost(client, previewPostId, { - message: previewFinalText, - }); - finalizedViaPreviewPost = true; - return; - } catch (err) { - logVerboseMessage( - `mattermost preview final edit failed; falling back to normal send (${String(err)})`, - ); - } - } - - if (typeof previewPostId === "string" && !finalizedViaPreviewPost) { - try { - await updateMattermostPost(client, previewPostId, { - message: "↓ See below.", - }); - } catch (err) { - logVerboseMessage( - `mattermost preview completion update failed; continuing with normal send (${String(err)})`, - ); - } - } + deferredFinalReplies.push({ + payload, + replyRootId: resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }), + previewPostId, + previewFinalText, + canFinalizeInPlace: + typeof previewPostId === "string" && + !hasMedia && + typeof previewFinalText === "string" && + !payload.isError, + }); + return; } await deliverMattermostReplyPayload({ @@ -2124,7 +2205,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }); try { - await core.channel.reply.withReplyDispatcher({ + const dispatchResult = await core.channel.reply.withReplyDispatcher({ dispatcher, onSettled: () => { markDispatchIdle(); @@ -2158,6 +2239,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }, }), }); + await finalizeOrDeliverDeferredFinalReplies(dispatchResult.counts.final); } finally { try { await draftStream.stop();