diff --git a/extensions/mattermost/src/mattermost/client.ts b/extensions/mattermost/src/mattermost/client.ts index 6f4a14f1e46..3e1c3c93139 100644 --- a/extensions/mattermost/src/mattermost/client.ts +++ b/extensions/mattermost/src/mattermost/client.ts @@ -545,6 +545,15 @@ export async function updateMattermostPost( }); } +export async function deleteMattermostPost( + client: MattermostClient, + postId: string, +): Promise { + await client.request(`/posts/${postId}`, { + method: "DELETE", + }); +} + export async function uploadMattermostFile( client: MattermostClient, params: { diff --git a/extensions/mattermost/src/mattermost/draft-stream.test.ts b/extensions/mattermost/src/mattermost/draft-stream.test.ts index 0b9c6de368f..37d82c2425a 100644 --- a/extensions/mattermost/src/mattermost/draft-stream.test.ts +++ b/extensions/mattermost/src/mattermost/draft-stream.test.ts @@ -10,11 +10,14 @@ type RequestRecord = { function createMockClient(): { client: MattermostClient; calls: RequestRecord[]; - request: ReturnType; + requestMock: ReturnType; } { const calls: RequestRecord[] = []; let nextId = 1; - const request = vi.fn(async (path: string, init?: RequestInit): Promise => { + const requestImpl: MattermostClient["request"] = async ( + path: string, + init?: RequestInit, + ): Promise => { calls.push({ path, init }); if (path === "/posts") { return { id: `post-${nextId++}` } as T; @@ -23,15 +26,16 @@ function createMockClient(): { return { id: "patched" } as T; } return {} as T; - }); + }; + const requestMock = vi.fn(requestImpl); const client: MattermostClient = { baseUrl: "https://chat.example.com", apiBaseUrl: "https://chat.example.com/api/v4", token: "token", - request: request as MattermostClient["request"], - fetchImpl: vi.fn(async () => new Response(null, { status: 204 })), + request: requestMock as MattermostClient["request"], + fetchImpl: vi.fn() as MattermostClient["fetchImpl"], }; - return { client, calls, request }; + return { client, calls, requestMock }; } describe("createMattermostDraftStream", () => { @@ -52,7 +56,7 @@ describe("createMattermostDraftStream", () => { expect(calls).toHaveLength(1); expect(calls[0]?.path).toBe("/posts"); - const createBody = JSON.parse(calls[0]?.init?.body as string); + const createBody = JSON.parse((calls[0]?.init?.body as string | undefined) ?? "{}"); expect(createBody).toMatchObject({ channel_id: "channel-1", root_id: "root-1", @@ -77,17 +81,61 @@ describe("createMattermostDraftStream", () => { expect(calls).toHaveLength(1); }); + it("clears the preview post when no final reply is delivered", async () => { + const { client, calls } = createMockClient(); + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + rootId: "root-1", + throttleMs: 0, + }); + + stream.update("Working..."); + await stream.flush(); + await stream.clear(); + + expect(calls).toHaveLength(2); + expect(calls[1]?.path).toBe("/posts/post-1"); + expect(calls[1]?.init?.method).toBe("DELETE"); + expect(stream.postId()).toBeUndefined(); + }); + + it("stop flushes the last pending update and ignores later ones", async () => { + const { client, calls } = createMockClient(); + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + rootId: "root-1", + throttleMs: 1000, + }); + + stream.update("Working..."); + await stream.flush(); + stream.update("Stale partial"); + await stream.stop(); + stream.update("Late partial"); + await stream.flush(); + + expect(calls).toHaveLength(2); + expect(calls[0]?.path).toBe("/posts"); + expect(calls[1]?.path).toBe("/posts/post-1"); + expect(JSON.parse((calls[1]?.init?.body as string | undefined) ?? "{}")).toMatchObject({ + message: "Stale partial", + }); + }); + it("warns and stops when preview creation fails", async () => { const warn = vi.fn(); - const request = vi.fn(async () => { + const requestImpl: MattermostClient["request"] = async () => { throw new Error("boom"); - }); + }; + const requestMock = vi.fn(requestImpl); const client: MattermostClient = { baseUrl: "https://chat.example.com", apiBaseUrl: "https://chat.example.com/api/v4", token: "token", - request: request as MattermostClient["request"], - fetchImpl: vi.fn(async () => new Response(null, { status: 204 })), + request: requestMock as MattermostClient["request"], + fetchImpl: vi.fn() as MattermostClient["fetchImpl"], }; const stream = createMattermostDraftStream({ client, @@ -102,19 +150,65 @@ describe("createMattermostDraftStream", () => { await stream.flush(); expect(warn).toHaveBeenCalled(); - expect(request).toHaveBeenCalledTimes(1); + expect(requestMock).toHaveBeenCalledTimes(1); expect(stream.postId()).toBeUndefined(); }); + + it("does not resend after an update failure followed by stop", async () => { + const warn = vi.fn(); + const calls: RequestRecord[] = []; + let failNextPatch = true; + const requestImpl: MattermostClient["request"] = async ( + path: string, + init?: RequestInit, + ): Promise => { + calls.push({ path, init }); + if (path === "/posts") { + return { id: "post-1" } as T; + } + if (path === "/posts/post-1") { + if (failNextPatch) { + failNextPatch = false; + throw new Error("patch failed"); + } + return { id: "patched" } as T; + } + return {} as T; + }; + const requestMock = vi.fn(requestImpl); + const client: MattermostClient = { + baseUrl: "https://chat.example.com", + apiBaseUrl: "https://chat.example.com/api/v4", + token: "token", + request: requestMock as MattermostClient["request"], + fetchImpl: vi.fn() as MattermostClient["fetchImpl"], + }; + const stream = createMattermostDraftStream({ + client, + channelId: "channel-1", + throttleMs: 1000, + warn, + }); + + stream.update("Working..."); + await stream.flush(); + stream.update("Will fail"); + await stream.flush(); + await stream.stop(); + + expect(warn).toHaveBeenCalledWith("mattermost stream preview failed: patch failed"); + expect(calls).toHaveLength(2); + expect(calls[0]?.path).toBe("/posts"); + expect(calls[1]?.path).toBe("/posts/post-1"); + }); }); describe("buildMattermostToolStatusText", () => { - it("renders a start status when phase is absent", () => { + it("renders a status with the tool name", () => { expect(buildMattermostToolStatusText({ name: "read" })).toBe("Running `read`…"); }); - it("renders an update status when phase is update", () => { - expect(buildMattermostToolStatusText({ name: "exec", phase: "update" })).toBe( - "Running `exec`…", - ); + it("falls back to a generic running tool status", () => { + expect(buildMattermostToolStatusText({ name: "exec" })).toBe("Running `exec`…"); }); }); diff --git a/extensions/mattermost/src/mattermost/draft-stream.ts b/extensions/mattermost/src/mattermost/draft-stream.ts index bcfe27e9692..3b928448022 100644 --- a/extensions/mattermost/src/mattermost/draft-stream.ts +++ b/extensions/mattermost/src/mattermost/draft-stream.ts @@ -1,5 +1,10 @@ -import { createFinalizableDraftStreamControlsForState } from "openclaw/plugin-sdk/channel-lifecycle"; -import { createMattermostPost, updateMattermostPost, type MattermostClient } from "./client.js"; +import { createFinalizableDraftLifecycle } from "openclaw/plugin-sdk/channel-lifecycle"; +import { + createMattermostPost, + deleteMattermostPost, + updateMattermostPost, + type MattermostClient, +} from "./client.js"; const MATTERMOST_STREAM_MAX_CHARS = 4000; const DEFAULT_THROTTLE_MS = 1000; @@ -8,6 +13,7 @@ export type MattermostDraftStream = { update: (text: string) => void; flush: () => Promise; postId: () => string | undefined; + clear: () => Promise; stop: () => Promise; forceNewMessage: () => void; }; @@ -89,10 +95,20 @@ export function createMattermostDraftStream(params: { } }; - const { loop, update, stop } = createFinalizableDraftStreamControlsForState({ + const { loop, update, stop, clear } = createFinalizableDraftLifecycle({ throttleMs, state: streamState, sendOrEditStreamMessage, + readMessageId: () => streamPostId, + clearMessageId: () => { + streamPostId = undefined; + }, + isValidMessageId: (value): value is string => typeof value === "string" && value.length > 0, + deleteMessage: async (postId) => { + await deleteMattermostPost(params.client, postId); + }, + warn: params.warn, + warnPrefix: "mattermost stream preview cleanup failed", }); const forceNewMessage = () => { @@ -108,6 +124,7 @@ export function createMattermostDraftStream(params: { update, flush: loop.flush, postId: () => streamPostId, + clear, stop, forceNewMessage, }; diff --git a/extensions/mattermost/src/mattermost/monitor.test.ts b/extensions/mattermost/src/mattermost/monitor.test.ts index 5aba5ef2fa6..fabd2c77e89 100644 --- a/extensions/mattermost/src/mattermost/monitor.test.ts +++ b/extensions/mattermost/src/mattermost/monitor.test.ts @@ -1,10 +1,13 @@ import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; -import { describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../runtime-api.js"; import { resolveMattermostAccount } from "./accounts.js"; +import * as clientModule from "./client.js"; +import type { MattermostClient } from "./client.js"; import { buildMattermostModelPickerSelectMessageSid, - didDeliverAllMattermostDeferredFinalReplies, + canFinalizeMattermostPreviewInPlace, + deliverMattermostReplyWithDraftPreview, evaluateMattermostMentionGate, MattermostRetryableInboundError, processMattermostReplayGuardedPost, @@ -12,6 +15,8 @@ import { resolveMattermostEffectiveReplyToId, resolveMattermostReplyRootId, resolveMattermostThreadSessionContext, + shouldFinalizeMattermostPreviewAfterDispatch, + shouldClearMattermostDraftPreview, type MattermostMentionGateInput, type MattermostRequireMentionResolverInput, } from "./monitor.js"; @@ -42,6 +47,33 @@ function resolveRequireMentionForTest(params: MattermostRequireMentionResolverIn return true; } +const updateMattermostPostSpy = vi.spyOn(clientModule, "updateMattermostPost"); + +function createMattermostClientMock(): MattermostClient { + return { + baseUrl: "https://chat.example.com", + apiBaseUrl: "https://chat.example.com/api/v4", + token: "token", + request: vi.fn(async () => ({})) as MattermostClient["request"], + fetchImpl: vi.fn( + async () => new Response(null, { status: 200 }), + ) as MattermostClient["fetchImpl"], + }; +} + +function createDraftStreamMock(postId: string | undefined = "preview-post-1") { + return { + flush: vi.fn(async () => {}), + postId: vi.fn(() => postId), + clear: vi.fn(async () => {}), + }; +} + +beforeEach(() => { + vi.clearAllMocks(); + updateMattermostPostSpy.mockResolvedValue({ id: "patched" } as never); +}); + function evaluateMentionGateForMessage(params: { cfg: OpenClawConfig; threadRootId?: string }) { const account = resolveMattermostAccount({ cfg: params.cfg, accountId: "default" }); const resolver = vi.fn(resolveRequireMentionForTest); @@ -168,6 +200,182 @@ describe("resolveMattermostReplyRootId", () => { }); }); +describe("canFinalizeMattermostPreviewInPlace", () => { + it("allows in-place finalization when the final reply target matches the preview thread", () => { + expect( + canFinalizeMattermostPreviewInPlace({ + previewRootId: "thread-root-456", + threadRootId: "thread-root-456", + replyToId: "child-post-789", + }), + ).toBe(true); + }); + + it("prevents in-place finalization when a top-level preview would become a threaded reply", () => { + expect( + canFinalizeMattermostPreviewInPlace({ + replyToId: "child-post-789", + }), + ).toBe(false); + }); +}); + +describe("shouldClearMattermostDraftPreview", () => { + it("deletes the preview after successful normal final delivery", () => { + expect( + shouldClearMattermostDraftPreview({ + finalizedViaPreviewPost: false, + finalReplyDelivered: true, + }), + ).toBe(true); + }); + + it("keeps the preview when final delivery failed", () => { + expect( + shouldClearMattermostDraftPreview({ + finalizedViaPreviewPost: false, + finalReplyDelivered: false, + }), + ).toBe(false); + }); + + it("keeps the preview when it already became the final reply", () => { + expect( + shouldClearMattermostDraftPreview({ + finalizedViaPreviewPost: true, + finalReplyDelivered: true, + }), + ).toBe(false); + }); +}); + +describe("deliverMattermostReplyWithDraftPreview", () => { + it("deletes the preview after a successful normal final send", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + + await deliverMattermostReplyWithDraftPreview({ + payload: { text: "All good", replyToId: "reply-1" } as never, + info: { kind: "final" }, + client: createMattermostClientMock(), + draftStream, + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverFinal, + }); + + expect(deliverFinal).toHaveBeenCalledTimes(1); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + expect(updateMattermostPostSpy).not.toHaveBeenCalled(); + }); + + it("deletes the preview after a successful non-finalizable media final", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + + await deliverMattermostReplyWithDraftPreview({ + payload: { + text: "Photo", + replyToId: "reply-1", + mediaUrl: "https://example.com/a.png", + } as never, + info: { kind: "final" }, + client: createMattermostClientMock(), + draftStream, + effectiveReplyToId: "thread-root-1", + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverFinal, + }); + + expect(deliverFinal).toHaveBeenCalledTimes(1); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("finalizes the preview in place when the final targets the same thread", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => {}); + + await deliverMattermostReplyWithDraftPreview({ + payload: { text: "Final answer", replyToId: "child-post-789" } as never, + info: { kind: "final" }, + client: createMattermostClientMock(), + draftStream, + effectiveReplyToId: "thread-root-456", + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverFinal, + }); + + expect(updateMattermostPostSpy).toHaveBeenCalledWith( + expect.anything(), + "preview-post-1", + expect.objectContaining({ message: "Final answer" }), + ); + expect(deliverFinal).not.toHaveBeenCalled(); + expect(draftStream.clear).not.toHaveBeenCalled(); + }); + + it("keeps the existing preview unchanged when final delivery fails", async () => { + const draftStream = createDraftStreamMock(); + const deliverFinal = vi.fn(async () => { + throw new Error("send failed"); + }); + + await expect( + deliverMattermostReplyWithDraftPreview({ + payload: { text: "Broken", replyToId: "reply-1" } as never, + info: { kind: "final" }, + client: createMattermostClientMock(), + draftStream, + resolvePreviewFinalText: (text) => text?.trim(), + previewState: { finalizedViaPreviewPost: false }, + logVerboseMessage: vi.fn(), + deliverFinal, + }), + ).rejects.toThrow("send failed"); + + expect(draftStream.clear).not.toHaveBeenCalled(); + expect(updateMattermostPostSpy).not.toHaveBeenCalledWith( + expect.anything(), + "preview-post-1", + expect.objectContaining({ message: "↓ See below." }), + ); + }); +}); + +describe("shouldFinalizeMattermostPreviewAfterDispatch", () => { + it("reuses the preview only for a single eligible final payload", () => { + expect( + shouldFinalizeMattermostPreviewAfterDispatch({ + finalCount: 1, + canFinalizeInPlace: true, + }), + ).toBe(true); + }); + + it("falls back to normal sends for multi-payload finals", () => { + expect( + shouldFinalizeMattermostPreviewAfterDispatch({ + finalCount: 2, + canFinalizeInPlace: true, + }), + ).toBe(false); + }); + + it("falls back to normal sends when the final cannot be edited into the preview", () => { + expect( + shouldFinalizeMattermostPreviewAfterDispatch({ + finalCount: 1, + canFinalizeInPlace: false, + }), + ).toBe(false); + }); +}); + describe("resolveMattermostEffectiveReplyToId", () => { it("keeps an existing thread root", () => { expect( @@ -437,23 +645,3 @@ describe("resolveMattermostReactionChannelId", () => { expect(resolveMattermostReactionChannelId({})).toBeUndefined(); }); }); - -describe("didDeliverAllMattermostDeferredFinalReplies", () => { - it("returns true when all deferred finals were delivered", () => { - expect( - didDeliverAllMattermostDeferredFinalReplies({ - deliveredCount: 2, - deferredCount: 2, - }), - ).toBe(true); - }); - - it("returns false when a later deferred final failed", () => { - expect( - didDeliverAllMattermostDeferredFinalReplies({ - deliveredCount: 1, - deferredCount: 2, - }), - ).toBe(false); - }); -}); diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index a657b80f69f..90826dc49b0 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -11,6 +11,7 @@ import { fetchMattermostMe, normalizeMattermostBaseUrl, updateMattermostPost, + type MattermostClient, type MattermostPost, type MattermostUser, } from "./client.js"; @@ -238,11 +239,112 @@ export function resolveMattermostReplyRootId(params: { return normalizeOptionalString(params.replyToId); } -export function didDeliverAllMattermostDeferredFinalReplies(params: { - deliveredCount: number; - deferredCount: number; +export function canFinalizeMattermostPreviewInPlace(params: { + previewRootId?: string; + threadRootId?: string; + replyToId?: string; }): boolean { - return params.deferredCount > 0 && params.deliveredCount === params.deferredCount; + return ( + resolveMattermostReplyRootId({ + threadRootId: params.threadRootId, + replyToId: params.replyToId, + }) === params.previewRootId?.trim() + ); +} + +export function shouldClearMattermostDraftPreview(params: { + finalizedViaPreviewPost: boolean; + finalReplyDelivered: boolean; +}): boolean { + return params.finalReplyDelivered && !params.finalizedViaPreviewPost; +} + +export function shouldFinalizeMattermostPreviewAfterDispatch(params: { + finalCount: number; + canFinalizeInPlace: boolean; +}): boolean { + return params.finalCount === 1 && params.canFinalizeInPlace; +} + +type MattermostDraftPreviewState = { + finalizedViaPreviewPost: boolean; +}; + +type MattermostDraftPreviewDeliverParams = { + payload: ReplyPayload; + info: { kind: "tool" | "block" | "final" }; + client: MattermostClient; + draftStream: Pick, "flush" | "postId" | "clear">; + effectiveReplyToId?: string; + resolvePreviewFinalText: (text?: string) => string | undefined; + previewState: MattermostDraftPreviewState; + logVerboseMessage: (message: string) => void; + deliverFinal: () => Promise; +}; + +export async function deliverMattermostReplyWithDraftPreview( + params: MattermostDraftPreviewDeliverParams, +): Promise { + if (params.payload.isReasoning) { + return; + } + + const isFinal = params.info.kind === "final"; + let previewPostId: string | undefined; + if (isFinal) { + await params.draftStream.flush(); + const hasMedia = + Boolean(params.payload.mediaUrl) || (params.payload.mediaUrls?.length ?? 0) > 0; + const previewFinalText = params.resolvePreviewFinalText(params.payload.text); + previewPostId = params.draftStream.postId(); + + if ( + typeof previewPostId === "string" && + !hasMedia && + typeof previewFinalText === "string" && + !params.payload.isError && + canFinalizeMattermostPreviewInPlace({ + previewRootId: params.effectiveReplyToId, + threadRootId: params.effectiveReplyToId, + replyToId: params.payload.replyToId, + }) + ) { + try { + await updateMattermostPost(params.client, previewPostId, { + message: previewFinalText, + }); + params.previewState.finalizedViaPreviewPost = true; + return; + } catch (err) { + params.logVerboseMessage( + `mattermost preview final edit failed; falling back to normal send (${String(err)})`, + ); + } + } + } + + let finalReplyDelivered = false; + try { + await params.deliverFinal(); + finalReplyDelivered = true; + } finally { + if ( + isFinal && + typeof previewPostId === "string" && + shouldClearMattermostDraftPreview({ + finalizedViaPreviewPost: params.previewState.finalizedViaPreviewPost, + finalReplyDelivered, + }) + ) { + try { + await params.draftStream.clear(); + } catch (err) { + params.logVerboseMessage( + `mattermost draft preview clear failed after successful final delivery (${String(err)})`, + ); + } + } + } } export function resolveMattermostEffectiveReplyToId(params: { @@ -1525,52 +1627,156 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }, }, }); - const { dispatcher, replyOptions, markDispatchIdle } = + const draftStream = createMattermostDraftStream({ + client, + channelId, + rootId: effectiveReplyToId, + throttleMs: 1200, + log: logVerboseMessage, + warn: logVerboseMessage, + }); + let lastPartialText = ""; + const previewState: MattermostDraftPreviewState = { + finalizedViaPreviewPost: false, + }; + + const resolvePreviewFinalText = (text?: string) => { + if (typeof text !== "string") { + return undefined; + } + const formatted = core.channel.text.convertMarkdownTables(text, tableMode); + const chunkMode = core.channel.text.resolveChunkMode( + cfg, + "mattermost", + account.accountId, + ); + const chunks = core.channel.text.chunkMarkdownTextWithMode( + formatted, + textLimit, + chunkMode, + ); + if (!chunks.length && formatted) { + chunks.push(formatted); + } + if (chunks.length != 1) { + return undefined; + } + const trimmed = chunks[0]?.trim(); + if (!trimmed) { + return undefined; + } + if ( + lastPartialText && + lastPartialText.startsWith(trimmed) && + trimmed.length < lastPartialText.length + ) { + return undefined; + } + return trimmed; + }; + + const updateDraftFromPartial = (text?: string) => { + const cleaned = text?.trim(); + if (!cleaned) { + return; + } + if (cleaned === lastPartialText) { + return; + } + if ( + lastPartialText && + lastPartialText.startsWith(cleaned) && + cleaned.length < lastPartialText.length + ) { + return; + } + lastPartialText = cleaned; + draftStream.update(cleaned); + }; + + const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } = core.channel.reply.createReplyDispatcherWithTyping({ ...replyPipeline, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), typingCallbacks, - deliver: async (payload: ReplyPayload) => { - await deliverMattermostReplyPayload({ - core, - cfg, + deliver: async (payload: ReplyPayload, info) => { + await deliverMattermostReplyWithDraftPreview({ payload, - to, - accountId: account.accountId, - agentId: route.agentId, - replyToId: resolveMattermostReplyRootId({ - threadRootId: effectiveReplyToId, - replyToId: payload.replyToId, - }), - textLimit, - tableMode, - sendMessage: sendMessageMattermost, + info, + client, + draftStream, + effectiveReplyToId, + resolvePreviewFinalText, + previewState, + logVerboseMessage, + deliverFinal: async () => { + await deliverMattermostReplyPayload({ + core, + cfg, + payload, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }), + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + runtime.log?.(`delivered reply to ${to}`); + }, }); - runtime.log?.(`delivered reply to ${to}`); }, onError: (err, info) => { runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`); }, }); - await core.channel.reply.withReplyDispatcher({ - dispatcher, - onSettled: () => { - markDispatchIdle(); - }, - run: () => - core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, - onModelSelected, - }, - }), - }); + try { + await core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { + markDispatchIdle(); + }, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + disableBlockStreaming: true, + onModelSelected, + onPartialReply: (payload) => { + updateDraftFromPartial(payload.text); + }, + onAssistantMessageStart: () => { + lastPartialText = ""; + }, + onReasoningEnd: () => { + lastPartialText = ""; + }, + onReasoningStream: async () => { + if (!lastPartialText) { + draftStream.update("Thinking…"); + } + }, + onToolStart: async (payload) => { + draftStream.update(buildMattermostToolStatusText(payload)); + }, + }, + }), + }); + } finally { + try { + await draftStream.stop(); + } catch (err) { + logVerboseMessage(`mattermost draft preview cleanup failed: ${String(err)}`); + } + markRunComplete(); + } if (historyKey) { clearHistoryEntriesIfEnabled({ historyMap: channelHistories, @@ -1586,675 +1792,6 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} ); return; } - const senderId = post.user_id ?? payload.broadcast?.user_id; - if (!senderId) { - logVerboseMessage("mattermost: drop post (missing sender id)"); - return; - } - if (senderId === botUserId) { - logVerboseMessage(`mattermost: drop post (self sender=${senderId})`); - return; - } - if (isSystemPost(post)) { - logVerboseMessage(`mattermost: drop post (system post type=${post.type ?? "unknown"})`); - return; - } - - const channelInfo = await resolveChannelInfo(channelId); - const channelType = payload.data?.channel_type ?? channelInfo?.type ?? undefined; - const kind = mapMattermostChannelTypeToChatType(channelType); - const chatType = channelChatType(kind); - - const senderName = - payload.data?.sender_name?.trim() || - (await resolveUserInfo(senderId))?.username?.trim() || - senderId; - const rawText = post.message?.trim() || ""; - const dmPolicy = account.config.dmPolicy ?? "pairing"; - const normalizedAllowFrom = normalizeMattermostAllowList(account.config.allowFrom ?? []); - const normalizedGroupAllowFrom = normalizeMattermostAllowList( - account.config.groupAllowFrom ?? [], - ); - const storeAllowFrom = normalizeMattermostAllowList( - await readStoreAllowFromForDmPolicy({ - provider: "mattermost", - accountId: account.accountId, - dmPolicy, - readStore: pairing.readStoreForDmPolicy, - }), - ); - const accessDecision = resolveDmGroupAccessWithLists({ - isGroup: kind !== "direct", - dmPolicy, - groupPolicy, - allowFrom: normalizedAllowFrom, - groupAllowFrom: normalizedGroupAllowFrom, - storeAllowFrom, - isSenderAllowed: (allowFrom) => - isMattermostSenderAllowed({ - senderId, - senderName, - allowFrom, - allowNameMatching, - }), - }); - const effectiveAllowFrom = accessDecision.effectiveAllowFrom; - const effectiveGroupAllowFrom = accessDecision.effectiveGroupAllowFrom; - const allowTextCommands = core.channel.commands.shouldHandleTextCommands({ - cfg, - surface: "mattermost", - }); - const hasControlCommand = core.channel.text.hasControlCommand(rawText, cfg); - const isControlCommand = allowTextCommands && hasControlCommand; - const useAccessGroups = cfg.commands?.useAccessGroups !== false; - const commandDmAllowFrom = kind === "direct" ? effectiveAllowFrom : normalizedAllowFrom; - const senderAllowedForCommands = isMattermostSenderAllowed({ - senderId, - senderName, - allowFrom: commandDmAllowFrom, - allowNameMatching, - }); - const groupAllowedForCommands = isMattermostSenderAllowed({ - senderId, - senderName, - allowFrom: effectiveGroupAllowFrom, - allowNameMatching, - }); - const commandGate = resolveControlCommandGate({ - useAccessGroups, - authorizers: [ - { configured: commandDmAllowFrom.length > 0, allowed: senderAllowedForCommands }, - { - configured: effectiveGroupAllowFrom.length > 0, - allowed: groupAllowedForCommands, - }, - ], - allowTextCommands, - hasControlCommand, - }); - const commandAuthorized = commandGate.commandAuthorized; - - if (accessDecision.decision !== "allow") { - if (kind === "direct") { - if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) { - logVerboseMessage(`mattermost: drop dm (dmPolicy=disabled sender=${senderId})`); - return; - } - if (accessDecision.decision === "pairing") { - const { code, created } = await pairing.upsertPairingRequest({ - id: senderId, - meta: { name: senderName }, - }); - logVerboseMessage(`mattermost: pairing request sender=${senderId} created=${created}`); - if (created) { - try { - await sendMessageMattermost( - `user:${senderId}`, - core.channel.pairing.buildPairingReply({ - channel: "mattermost", - idLine: `Your Mattermost user id: ${senderId}`, - code, - }), - { cfg, accountId: account.accountId }, - ); - opts.statusSink?.({ lastOutboundAt: Date.now() }); - } catch (err) { - logVerboseMessage(`mattermost: pairing reply failed for ${senderId}: ${String(err)}`); - } - } - return; - } - logVerboseMessage(`mattermost: drop dm sender=${senderId} (dmPolicy=${dmPolicy})`); - return; - } - if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_DISABLED) { - logVerboseMessage("mattermost: drop group message (groupPolicy=disabled)"); - return; - } - if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) { - logVerboseMessage("mattermost: drop group message (no group allowlist)"); - return; - } - if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) { - logVerboseMessage(`mattermost: drop group sender=${senderId} (not in groupAllowFrom)`); - return; - } - logVerboseMessage( - `mattermost: drop group message (groupPolicy=${groupPolicy} reason=${accessDecision.reason})`, - ); - return; - } - - if (kind !== "direct" && commandGate.shouldBlock) { - logInboundDrop({ - log: logVerboseMessage, - channel: "mattermost", - reason: "control command (unauthorized)", - target: senderId, - }); - return; - } - - const teamId = payload.data?.team_id ?? channelInfo?.team_id ?? undefined; - const channelName = payload.data?.channel_name ?? channelInfo?.name ?? ""; - const channelDisplay = - payload.data?.channel_display_name ?? channelInfo?.display_name ?? channelName; - const roomLabel = channelName ? `#${channelName}` : channelDisplay || `#${channelId}`; - - const route = core.channel.routing.resolveAgentRoute({ - cfg, - channel: "mattermost", - accountId: account.accountId, - teamId, - peer: { - kind, - id: kind === "direct" ? senderId : channelId, - }, - }); - - const baseSessionKey = route.sessionKey; - const threadRootId = post.root_id?.trim() || undefined; - const replyToMode = resolveMattermostReplyToMode(account, kind); - const threadContext = resolveMattermostThreadSessionContext({ - baseSessionKey, - kind, - postId: post.id, - replyToMode, - threadRootId, - }); - const { effectiveReplyToId, sessionKey, parentSessionKey } = threadContext; - const historyKey = kind === "direct" ? null : sessionKey; - - const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg, route.agentId); - const wasMentioned = - kind !== "direct" && - ((botUsername ? rawText.toLowerCase().includes(`@${botUsername.toLowerCase()}`) : false) || - core.channel.mentions.matchesMentionPatterns(rawText, mentionRegexes)); - const pendingBody = - rawText || - (post.file_ids?.length - ? `[Mattermost ${post.file_ids.length === 1 ? "file" : "files"}]` - : ""); - const pendingSender = senderName; - const recordPendingHistory = () => { - const trimmed = pendingBody.trim(); - recordPendingHistoryEntryIfEnabled({ - historyMap: channelHistories, - limit: historyLimit, - historyKey: historyKey ?? "", - entry: - historyKey && trimmed - ? { - sender: pendingSender, - body: trimmed, - timestamp: typeof post.create_at === "number" ? post.create_at : undefined, - messageId: post.id ?? undefined, - } - : null, - }); - }; - - const oncharEnabled = account.chatmode === "onchar" && kind !== "direct"; - const oncharPrefixes = oncharEnabled ? resolveOncharPrefixes(account.oncharPrefixes) : []; - const oncharResult = oncharEnabled - ? stripOncharPrefix(rawText, oncharPrefixes) - : { triggered: false, stripped: rawText }; - const oncharTriggered = oncharResult.triggered; - const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0; - const mentionDecision = evaluateMattermostMentionGate({ - kind, - cfg, - accountId: account.accountId, - channelId, - threadRootId, - requireMentionOverride: account.requireMention, - resolveRequireMention: core.channel.groups.resolveRequireMention, - wasMentioned, - isControlCommand, - commandAuthorized, - oncharEnabled, - oncharTriggered, - canDetectMention, - }); - const { shouldRequireMention, shouldBypassMention } = mentionDecision; - - if (mentionDecision.dropReason === "onchar-not-triggered") { - logVerboseMessage( - `mattermost: drop group message (onchar not triggered channel=${channelId} sender=${senderId})`, - ); - recordPendingHistory(); - return; - } - - if (mentionDecision.dropReason === "missing-mention") { - logVerboseMessage( - `mattermost: drop group message (missing mention channel=${channelId} sender=${senderId} requireMention=${shouldRequireMention} bypass=${shouldBypassMention} canDetectMention=${canDetectMention})`, - ); - recordPendingHistory(); - return; - } - const mediaList = await resolveMattermostMedia(post.file_ids); - const mediaPlaceholder = buildMattermostAttachmentPlaceholder(mediaList); - const bodySource = oncharTriggered ? oncharResult.stripped : rawText; - const baseText = [bodySource, mediaPlaceholder].filter(Boolean).join("\n").trim(); - const bodyText = normalizeMention(baseText, botUsername); - if (!bodyText) { - logVerboseMessage( - `mattermost: drop group message (empty body after normalization channel=${channelId} sender=${senderId})`, - ); - return; - } - - core.channel.activity.record({ - channel: "mattermost", - accountId: account.accountId, - direction: "inbound", - }); - - const fromLabel = formatInboundFromLabel({ - isGroup: kind !== "direct", - groupLabel: channelDisplay || roomLabel, - groupId: channelId, - groupFallback: roomLabel || "Channel", - directLabel: senderName, - directId: senderId, - }); - - const preview = bodyText.replace(/\s+/g, " ").slice(0, 160); - const inboundLabel = - kind === "direct" - ? `Mattermost DM from ${senderName}` - : `Mattermost message in ${roomLabel} from ${senderName}`; - core.system.enqueueSystemEvent(`${inboundLabel}: ${preview}`, { - sessionKey, - contextKey: `mattermost:message:${channelId}:${post.id ?? "unknown"}`, - }); - - const textWithId = `${bodyText}\n[mattermost message id: ${post.id ?? "unknown"} channel: ${channelId}]`; - const body = core.channel.reply.formatInboundEnvelope({ - channel: "Mattermost", - from: fromLabel, - timestamp: typeof post.create_at === "number" ? post.create_at : undefined, - body: textWithId, - chatType, - sender: { name: senderName, id: senderId }, - }); - let combinedBody = body; - if (historyKey) { - combinedBody = buildPendingHistoryContextFromMap({ - historyMap: channelHistories, - historyKey, - limit: historyLimit, - currentMessage: combinedBody, - formatEntry: (entry) => - core.channel.reply.formatInboundEnvelope({ - channel: "Mattermost", - from: fromLabel, - timestamp: entry.timestamp, - body: `${entry.body}${ - entry.messageId ? ` [id:${entry.messageId} channel:${channelId}]` : "" - }`, - chatType, - senderLabel: entry.sender, - }), - }); - } - - const to = kind === "direct" ? `user:${senderId}` : `channel:${channelId}`; - const mediaPayload = buildAgentMediaPayload(mediaList); - const commandBody = rawText.trim(); - const inboundHistory = - historyKey && historyLimit > 0 - ? (channelHistories.get(historyKey) ?? []).map((entry) => ({ - sender: entry.sender, - body: entry.body, - timestamp: entry.timestamp, - })) - : undefined; - const ctxPayload = core.channel.reply.finalizeInboundContext({ - Body: combinedBody, - BodyForAgent: bodyText, - InboundHistory: inboundHistory, - RawBody: bodyText, - CommandBody: commandBody, - BodyForCommands: commandBody, - From: - kind === "direct" - ? `mattermost:${senderId}` - : kind === "group" - ? `mattermost:group:${channelId}` - : `mattermost:channel:${channelId}`, - To: to, - SessionKey: sessionKey, - ParentSessionKey: parentSessionKey, - AccountId: route.accountId, - ChatType: chatType, - ConversationLabel: fromLabel, - GroupSubject: kind !== "direct" ? channelDisplay || roomLabel : undefined, - GroupChannel: channelName ? `#${channelName}` : undefined, - GroupSpace: teamId, - SenderName: senderName, - SenderId: senderId, - Provider: "mattermost" as const, - Surface: "mattermost" as const, - MessageSid: post.id ?? undefined, - MessageSids: allMessageIds.length > 1 ? allMessageIds : undefined, - MessageSidFirst: allMessageIds.length > 1 ? allMessageIds[0] : undefined, - MessageSidLast: - allMessageIds.length > 1 ? allMessageIds[allMessageIds.length - 1] : undefined, - ReplyToId: effectiveReplyToId, - MessageThreadId: effectiveReplyToId, - Timestamp: typeof post.create_at === "number" ? post.create_at : undefined, - WasMentioned: kind !== "direct" ? mentionDecision.effectiveWasMentioned : undefined, - CommandAuthorized: commandAuthorized, - OriginatingChannel: "mattermost" as const, - OriginatingTo: to, - ...mediaPayload, - }); - - if (kind === "direct") { - const sessionCfg = cfg.session; - const storePath = core.channel.session.resolveStorePath(sessionCfg?.store, { - agentId: route.agentId, - }); - await core.channel.session.updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - deliveryContext: { - channel: "mattermost", - to, - accountId: route.accountId, - }, - }); - } - - const previewLine = bodyText.slice(0, 200).replace(/\n/g, "\\n"); - logVerboseMessage( - `mattermost inbound: from=${ctxPayload.From} len=${bodyText.length} preview="${previewLine}"`, - ); - - const textLimit = core.channel.text.resolveTextChunkLimit( - cfg, - "mattermost", - account.accountId, - { - fallbackLimit: account.textChunkLimit ?? 4000, - }, - ); - const tableMode = core.channel.text.resolveMarkdownTableMode({ - cfg, - channel: "mattermost", - accountId: account.accountId, - }); - - const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({ - cfg, - agentId: route.agentId, - channel: "mattermost", - accountId: account.accountId, - typing: { - start: () => sendTypingIndicator(channelId, effectiveReplyToId), - onStartError: (err) => { - logTypingFailure({ - log: (message) => logger.debug?.(message), - channel: "mattermost", - target: channelId, - error: err, - }); - }, - }, - }); - const draftStream = createMattermostDraftStream({ - client, - channelId, - rootId: effectiveReplyToId, - throttleMs: 1200, - log: logVerboseMessage, - warn: logVerboseMessage, - }); - let lastPartialText = ""; - let finalizedViaPreviewPost = false; - let previewCompletionNotePosted = false; - type DeferredMattermostFinal = { - payload: ReplyPayload; - replyRootId?: string; - previewPostId?: string; - previewFinalText?: string; - canFinalizeInPlace: boolean; - }; - const deferredFinalReplies: DeferredMattermostFinal[] = []; - - const resolvePreviewFinalText = (text?: string) => { - if (typeof text !== "string") { - return undefined; - } - const formatted = core.channel.text.convertMarkdownTables(text, tableMode); - const chunkMode = core.channel.text.resolveChunkMode(cfg, "mattermost", account.accountId); - const chunks = core.channel.text.chunkMarkdownTextWithMode(formatted, textLimit, chunkMode); - if (!chunks.length && formatted) { - chunks.push(formatted); - } - if (chunks.length !== 1) { - return undefined; - } - const trimmed = chunks[0]?.trim(); - if (!trimmed) { - return undefined; - } - if ( - lastPartialText && - lastPartialText.startsWith(trimmed) && - trimmed.length < lastPartialText.length - ) { - return undefined; - } - return trimmed; - }; - - const updateDraftPreviewToNormalSend = async (previewPostId?: string) => { - if ( - previewCompletionNotePosted || - finalizedViaPreviewPost || - typeof previewPostId !== "string" - ) { - return; - } - try { - await updateMattermostPost(client, previewPostId, { - message: "↓ See below.", - }); - previewCompletionNotePosted = true; - } catch (err) { - logVerboseMessage( - `mattermost preview completion update failed; continuing with normal send (${String(err)})`, - ); - } - }; - - const deliverDeferredFinalReply = async (entry: DeferredMattermostFinal) => { - await deliverMattermostReplyPayload({ - core, - cfg, - payload: entry.payload, - to, - accountId: account.accountId, - agentId: route.agentId, - replyToId: entry.replyRootId, - textLimit, - tableMode, - sendMessage: sendMessageMattermost, - }); - runtime.log?.(`delivered reply to ${to}`); - }; - - const finalizeOrDeliverDeferredFinalReplies = async (finalCount: number) => { - if (!deferredFinalReplies.length) { - return; - } - await draftStream.flush(); - const pendingFinalReplies = deferredFinalReplies.splice(0); - const firstFinal = pendingFinalReplies[0]; - if ( - finalCount === 1 && - firstFinal?.canFinalizeInPlace === true && - typeof firstFinal.previewPostId === "string" && - typeof firstFinal.previewFinalText === "string" - ) { - try { - await updateMattermostPost(client, firstFinal.previewPostId, { - message: firstFinal.previewFinalText, - }); - finalizedViaPreviewPost = true; - return; - } catch (err) { - logVerboseMessage( - `mattermost preview final edit failed; falling back to normal send (${String(err)})`, - ); - } - } - - const previewPostId = pendingFinalReplies.find( - (entry) => typeof entry.previewPostId === "string", - )?.previewPostId; - await updateDraftPreviewToNormalSend(previewPostId); - let deliveredDeferredCount = 0; - for (const entry of pendingFinalReplies) { - await deliverDeferredFinalReply(entry); - deliveredDeferredCount += 1; - } - if ( - didDeliverAllMattermostDeferredFinalReplies({ - deliveredCount: deliveredDeferredCount, - deferredCount: pendingFinalReplies.length, - }) - ) { - } - }; - - const updateDraftFromPartial = (text?: string) => { - const cleaned = text?.trim(); - if (!cleaned) { - return; - } - if (cleaned === lastPartialText) { - return; - } - if ( - lastPartialText && - lastPartialText.startsWith(cleaned) && - cleaned.length < lastPartialText.length - ) { - return; - } - lastPartialText = cleaned; - draftStream.update(cleaned); - }; - - const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } = - core.channel.reply.createReplyDispatcherWithTyping({ - ...replyPipeline, - humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), - typingCallbacks, - deliver: async (payload: ReplyPayload, info) => { - if (payload.isReasoning) { - return; - } - const isFinal = info.kind === "final"; - if (isFinal) { - await draftStream.flush(); - const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - const previewFinalText = resolvePreviewFinalText(payload.text); - const previewPostId = draftStream.postId(); - deferredFinalReplies.push({ - payload, - replyRootId: resolveMattermostReplyRootId({ - threadRootId: effectiveReplyToId, - replyToId: payload.replyToId, - }), - previewPostId, - previewFinalText, - canFinalizeInPlace: - typeof previewPostId === "string" && - !hasMedia && - typeof previewFinalText === "string" && - !payload.isError, - }); - return; - } - - await deliverMattermostReplyPayload({ - core, - cfg, - payload, - to, - accountId: account.accountId, - agentId: route.agentId, - replyToId: resolveMattermostReplyRootId({ - threadRootId: effectiveReplyToId, - replyToId: payload.replyToId, - }), - textLimit, - tableMode, - sendMessage: sendMessageMattermost, - }); - runtime.log?.(`delivered reply to ${to}`); - }, - onError: (err, info) => { - runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`); - }, - }); - - try { - const dispatchResult = await core.channel.reply.withReplyDispatcher({ - dispatcher, - onSettled: () => { - markDispatchIdle(); - }, - run: () => - core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - disableBlockStreaming: true, - onModelSelected, - onPartialReply: (payload) => { - updateDraftFromPartial(payload.text); - }, - onAssistantMessageStart: () => { - lastPartialText = ""; - }, - onReasoningEnd: () => { - lastPartialText = ""; - }, - onReasoningStream: async () => { - if (!lastPartialText) { - draftStream.update("Thinking…"); - } - }, - onToolStart: async (payload) => { - draftStream.update(buildMattermostToolStatusText(payload)); - }, - }, - }), - }); - await finalizeOrDeliverDeferredFinalReplies(dispatchResult.counts.final); - } finally { - try { - await draftStream.stop(); - } catch (err) { - logVerboseMessage(`mattermost draft preview cleanup failed: ${String(err)}`); - } - markRunComplete(); - } - if (historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: channelHistories, - historyKey, - limit: historyLimit, - }); - } }; const handleReactionEvent = async (payload: MattermostEventPayload) => {