diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dd8ba5adc8..fb7c3ba9402 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai - TUI/chat log: reuse the active assistant message component for the same streaming run so `openclaw tui` no longer renders duplicate assistant replies. (#35364) Thanks @lisitan. - macOS/Reminders: add the missing `NSRemindersUsageDescription` to the bundled app so `apple-reminders` can trigger the system permission prompt from OpenClaw.app. (#8559) Thanks @dinakars777. - iMessage/self-chat echo dedupe: drop reflected duplicate copies only when a matching `is_from_me` event was just seen for the same chat, text, and `created_at`, preventing self-chat loops without broad text-only suppression. Related to #32166. (#38440) Thanks @vincentkoc. +- Mattermost/block streaming: fix duplicate message delivery (one threaded, one top-level) when block streaming is active by excluding `replyToId` from the block reply dedup key and adding an explicit `threading` dock to the Mattermost plugin. (#41362) Thanks @mathiasnagler and @vincentkoc. - BlueBubbles/self-chat echo dedupe: drop reflected duplicate webhook copies only when a matching `fromMe` event was just seen for the same chat, body, and timestamp, preventing self-chat loops without broad webhook suppression. Related to #32166. (#38442) Thanks @vincentkoc. ## 2026.3.11 diff --git a/extensions/mattermost/src/channel.ts b/extensions/mattermost/src/channel.ts index 2dffaa6f3cf..42d167948a0 100644 --- a/extensions/mattermost/src/channel.ts +++ b/extensions/mattermost/src/channel.ts @@ -270,6 +270,16 @@ export const mattermostPlugin: ChannelPlugin = { streaming: { blockStreamingCoalesceDefaults: { minChars: 1500, idleMs: 1000 }, }, + threading: { + resolveReplyToMode: ({ cfg, accountId }) => { + const account = resolveMattermostAccount({ cfg, accountId: accountId ?? "default" }); + const mode = account.config.replyToMode; + if (mode === "off" || mode === "first") { + return mode; + } + return "all"; + }, + }, reload: { configPrefixes: ["channels.mattermost"] }, configSchema: buildChannelConfigSchema(MattermostConfigSchema), config: { diff --git a/extensions/mattermost/src/config-schema.ts b/extensions/mattermost/src/config-schema.ts index 51d9bdbe33a..43dd7ede8d2 100644 --- a/extensions/mattermost/src/config-schema.ts +++ b/extensions/mattermost/src/config-schema.ts @@ -43,6 +43,7 @@ const MattermostAccountSchemaBase = z chunkMode: z.enum(["length", "newline"]).optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), + replyToMode: z.enum(["off", "first", "all"]).optional(), responsePrefix: z.string().optional(), actions: z .object({ diff --git a/extensions/mattermost/src/mattermost/monitor.test.ts b/extensions/mattermost/src/mattermost/monitor.test.ts index 1bd871714c4..d479909ac05 100644 --- a/extensions/mattermost/src/mattermost/monitor.test.ts +++ b/extensions/mattermost/src/mattermost/monitor.test.ts @@ -109,6 +109,29 @@ describe("mattermost mention gating", () => { }); }); +describe("resolveMattermostReplyRootId with block streaming payloads", () => { + it("uses threadRootId for block-streamed payloads with replyToId", () => { + // When block streaming sends a payload with replyToId from the threading + // mode, the deliver callback should still use the existing threadRootId. + expect( + resolveMattermostReplyRootId({ + threadRootId: "thread-root-1", + replyToId: "streamed-reply-id", + }), + ).toBe("thread-root-1"); + }); + + it("falls back to payload replyToId when no threadRootId in block streaming", () => { + // Top-level channel message: no threadRootId, payload carries the + // inbound post id as replyToId from the "all" threading mode. + expect( + resolveMattermostReplyRootId({ + replyToId: "inbound-post-for-threading", + }), + ).toBe("inbound-post-for-threading"); + }); +}); + describe("resolveMattermostReplyRootId", () => { it("uses replyToId for top-level replies", () => { expect( diff --git a/extensions/mattermost/src/types.ts b/extensions/mattermost/src/types.ts index ba664baa894..86de9c1a714 100644 --- a/extensions/mattermost/src/types.ts +++ b/extensions/mattermost/src/types.ts @@ -52,6 +52,8 @@ export type MattermostAccountConfig = { blockStreaming?: boolean; /** Merge streamed block replies before sending. */ blockStreamingCoalesce?: BlockStreamingCoalesceConfig; + /** Control reply threading (off|first|all). Default: "all". */ + replyToMode?: "off" | "first" | "all"; /** Outbound response prefix override for this channel/account. */ responsePrefix?: string; /** Action toggles for this account. */ diff --git a/src/auto-reply/reply/agent-runner-payloads.test.ts b/src/auto-reply/reply/agent-runner-payloads.test.ts index 94088b2b5b8..26f23d7a42c 100644 --- a/src/auto-reply/reply/agent-runner-payloads.test.ts +++ b/src/auto-reply/reply/agent-runner-payloads.test.ts @@ -169,6 +169,50 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads).toHaveLength(0); }); + it("drops all final payloads when block pipeline streamed successfully", async () => { + const pipeline: Parameters[0]["blockReplyPipeline"] = { + didStream: () => true, + isAborted: () => false, + hasSentPayload: () => false, + enqueue: () => {}, + flush: async () => {}, + stop: () => {}, + hasBuffered: () => false, + }; + // shouldDropFinalPayloads short-circuits to [] when the pipeline streamed + // without aborting, so hasSentPayload is never reached. + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + blockStreamingEnabled: true, + blockReplyPipeline: pipeline, + replyToMode: "all", + payloads: [{ text: "response", replyToId: "post-123" }], + }); + + expect(replyPayloads).toHaveLength(0); + }); + + it("deduplicates final payloads against directly sent block keys regardless of replyToId", async () => { + // When block streaming is not active but directlySentBlockKeys has entries + // (e.g. from pre-tool flush), the key should match even if replyToId differs. + const { createBlockReplyContentKey } = await import("./block-reply-pipeline.js"); + const directlySentBlockKeys = new Set(); + directlySentBlockKeys.add( + createBlockReplyContentKey({ text: "response", replyToId: "post-1" }), + ); + + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + blockStreamingEnabled: false, + blockReplyPipeline: null, + directlySentBlockKeys, + replyToMode: "off", + payloads: [{ text: "response" }], + }); + + expect(replyPayloads).toHaveLength(0); + }); + it("does not suppress same-target replies when accountId differs", async () => { const { replyPayloads } = await buildReplyPayloads({ ...baseParams, diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index 263dea9fd54..9e89c921407 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -5,7 +5,7 @@ import type { OriginatingChannelType } from "../templating.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { ReplyPayload } from "../types.js"; import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js"; -import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js"; +import { createBlockReplyContentKey, type BlockReplyPipeline } from "./block-reply-pipeline.js"; import { resolveOriginAccountId, resolveOriginMessageProvider, @@ -213,7 +213,7 @@ export async function buildReplyPayloads(params: { ) : params.directlySentBlockKeys?.size ? mediaFilteredPayloads.filter( - (payload) => !params.directlySentBlockKeys!.has(createBlockReplyPayloadKey(payload)), + (payload) => !params.directlySentBlockKeys!.has(createBlockReplyContentKey(payload)), ) : mediaFilteredPayloads; const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads; diff --git a/src/auto-reply/reply/block-reply-pipeline.test.ts b/src/auto-reply/reply/block-reply-pipeline.test.ts new file mode 100644 index 00000000000..92564033df5 --- /dev/null +++ b/src/auto-reply/reply/block-reply-pipeline.test.ts @@ -0,0 +1,79 @@ +import { describe, expect, it } from "vitest"; +import { + createBlockReplyContentKey, + createBlockReplyPayloadKey, + createBlockReplyPipeline, +} from "./block-reply-pipeline.js"; + +describe("createBlockReplyPayloadKey", () => { + it("produces different keys for payloads differing only by replyToId", () => { + const a = createBlockReplyPayloadKey({ text: "hello world", replyToId: "post-1" }); + const b = createBlockReplyPayloadKey({ text: "hello world", replyToId: "post-2" }); + const c = createBlockReplyPayloadKey({ text: "hello world" }); + expect(a).not.toBe(b); + expect(a).not.toBe(c); + }); + + it("produces different keys for payloads with different text", () => { + const a = createBlockReplyPayloadKey({ text: "hello" }); + const b = createBlockReplyPayloadKey({ text: "world" }); + expect(a).not.toBe(b); + }); + + it("produces different keys for payloads with different media", () => { + const a = createBlockReplyPayloadKey({ text: "hello", mediaUrl: "file:///a.png" }); + const b = createBlockReplyPayloadKey({ text: "hello", mediaUrl: "file:///b.png" }); + expect(a).not.toBe(b); + }); + + it("trims whitespace from text for key comparison", () => { + const a = createBlockReplyPayloadKey({ text: " hello " }); + const b = createBlockReplyPayloadKey({ text: "hello" }); + expect(a).toBe(b); + }); +}); + +describe("createBlockReplyContentKey", () => { + it("produces the same key for payloads differing only by replyToId", () => { + const a = createBlockReplyContentKey({ text: "hello world", replyToId: "post-1" }); + const b = createBlockReplyContentKey({ text: "hello world", replyToId: "post-2" }); + const c = createBlockReplyContentKey({ text: "hello world" }); + expect(a).toBe(b); + expect(a).toBe(c); + }); +}); + +describe("createBlockReplyPipeline dedup with threading", () => { + it("keeps separate deliveries for same text with different replyToId", async () => { + const sent: Array<{ text?: string; replyToId?: string }> = []; + const pipeline = createBlockReplyPipeline({ + onBlockReply: async (payload) => { + sent.push({ text: payload.text, replyToId: payload.replyToId }); + }, + timeoutMs: 5000, + }); + + pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" }); + pipeline.enqueue({ text: "response text", replyToId: undefined }); + await pipeline.flush(); + + expect(sent).toEqual([ + { text: "response text", replyToId: "thread-root-1" }, + { text: "response text", replyToId: undefined }, + ]); + }); + + it("hasSentPayload matches regardless of replyToId", async () => { + const pipeline = createBlockReplyPipeline({ + onBlockReply: async () => {}, + timeoutMs: 5000, + }); + + pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" }); + await pipeline.flush(); + + // Final payload with no replyToId should be recognized as already sent + expect(pipeline.hasSentPayload({ text: "response text" })).toBe(true); + expect(pipeline.hasSentPayload({ text: "response text", replyToId: "other-id" })).toBe(true); + }); +}); diff --git a/src/auto-reply/reply/block-reply-pipeline.ts b/src/auto-reply/reply/block-reply-pipeline.ts index 752c70a1da2..9ce85334238 100644 --- a/src/auto-reply/reply/block-reply-pipeline.ts +++ b/src/auto-reply/reply/block-reply-pipeline.ts @@ -48,6 +48,19 @@ export function createBlockReplyPayloadKey(payload: ReplyPayload): string { }); } +export function createBlockReplyContentKey(payload: ReplyPayload): string { + const text = payload.text?.trim() ?? ""; + const mediaList = payload.mediaUrls?.length + ? payload.mediaUrls + : payload.mediaUrl + ? [payload.mediaUrl] + : []; + // Content-only key used for final-payload suppression after block streaming. + // This intentionally ignores replyToId so a streamed threaded payload and the + // later final payload still collapse when they carry the same content. + return JSON.stringify({ text, mediaList }); +} + const withTimeout = async ( promise: Promise, timeoutMs: number, @@ -80,6 +93,7 @@ export function createBlockReplyPipeline(params: { }): BlockReplyPipeline { const { onBlockReply, timeoutMs, coalescing, buffer } = params; const sentKeys = new Set(); + const sentContentKeys = new Set(); const pendingKeys = new Set(); const seenKeys = new Set(); const bufferedKeys = new Set(); @@ -95,6 +109,7 @@ export function createBlockReplyPipeline(params: { return; } const payloadKey = createBlockReplyPayloadKey(payload); + const contentKey = createBlockReplyContentKey(payload); if (!bypassSeenCheck) { if (seenKeys.has(payloadKey)) { return; @@ -130,6 +145,7 @@ export function createBlockReplyPipeline(params: { return; } sentKeys.add(payloadKey); + sentContentKeys.add(contentKey); didStream = true; }) .catch((err) => { @@ -238,8 +254,8 @@ export function createBlockReplyPipeline(params: { didStream: () => didStream, isAborted: () => aborted, hasSentPayload: (payload) => { - const payloadKey = createBlockReplyPayloadKey(payload); - return sentKeys.has(payloadKey); + const payloadKey = createBlockReplyContentKey(payload); + return sentContentKeys.has(payloadKey); }, }; } diff --git a/src/auto-reply/reply/reply-delivery.ts b/src/auto-reply/reply/reply-delivery.ts index acf04e73a3e..cacd6b083cb 100644 --- a/src/auto-reply/reply/reply-delivery.ts +++ b/src/auto-reply/reply/reply-delivery.ts @@ -2,7 +2,7 @@ import { logVerbose } from "../../globals.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { BlockReplyContext, ReplyPayload } from "../types.js"; import type { BlockReplyPipeline } from "./block-reply-pipeline.js"; -import { createBlockReplyPayloadKey } from "./block-reply-pipeline.js"; +import { createBlockReplyContentKey } from "./block-reply-pipeline.js"; import { parseReplyDirectives } from "./reply-directives.js"; import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js"; import type { TypingSignaler } from "./typing-mode.js"; @@ -128,7 +128,7 @@ export function createBlockReplyDeliveryHandler(params: { } else if (params.blockStreamingEnabled) { // Send directly when flushing before tool execution (no pipeline but streaming enabled). // Track sent key to avoid duplicate in final payloads. - params.directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); + params.directlySentBlockKeys.add(createBlockReplyContentKey(blockPayload)); await params.onBlockReply(blockPayload); } // When streaming is disabled entirely, blocks are accumulated in final text instead.