fix(mattermost): prevent duplicate messages when block streaming + threading are active (#41362)

* fix(mattermost): prevent duplicate messages when block streaming + threading are active

Remove replyToId from createBlockReplyPayloadKey so identical content is
deduplicated regardless of threading target. Add explicit threading dock
to the Mattermost plugin with resolveReplyToMode reading from config
(default "all"), and add replyToMode to the Mattermost config schema.

Fixes #41219

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(mattermost): address PR review — per-account replyToMode and test clarity

Read replyToMode from the merged per-account config via
resolveMattermostAccount so account-level overrides are honored in
multi-account setups. Add replyToMode to MattermostAccountConfig type.
Rename misleading test to clarify it exercises shouldDropFinalPayloads
short-circuit, not payload key dedup.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Replies: keep block-pipeline reply targets distinct

* Tests: cover block reply target-aware dedupe

* Update CHANGELOG.md

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
Mathias Nagler
2026-03-12 08:15:17 +01:00
committed by GitHub
parent 241e8cc553
commit e8a162d3d8
10 changed files with 182 additions and 6 deletions

View File

@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
- TUI/chat log: reuse the active assistant message component for the same streaming run so `openclaw tui` no longer renders duplicate assistant replies. (#35364) Thanks @lisitan.
- macOS/Reminders: add the missing `NSRemindersUsageDescription` to the bundled app so `apple-reminders` can trigger the system permission prompt from OpenClaw.app. (#8559) Thanks @dinakars777.
- iMessage/self-chat echo dedupe: drop reflected duplicate copies only when a matching `is_from_me` event was just seen for the same chat, text, and `created_at`, preventing self-chat loops without broad text-only suppression. Related to #32166. (#38440) Thanks @vincentkoc.
- Mattermost/block streaming: fix duplicate message delivery (one threaded, one top-level) when block streaming is active by excluding `replyToId` from the block reply dedup key and adding an explicit `threading` dock to the Mattermost plugin. (#41362) Thanks @mathiasnagler and @vincentkoc.
- BlueBubbles/self-chat echo dedupe: drop reflected duplicate webhook copies only when a matching `fromMe` event was just seen for the same chat, body, and timestamp, preventing self-chat loops without broad webhook suppression. Related to #32166. (#38442) Thanks @vincentkoc.
## 2026.3.11

View File

@@ -270,6 +270,16 @@ export const mattermostPlugin: ChannelPlugin<ResolvedMattermostAccount> = {
streaming: {
blockStreamingCoalesceDefaults: { minChars: 1500, idleMs: 1000 },
},
threading: {
resolveReplyToMode: ({ cfg, accountId }) => {
const account = resolveMattermostAccount({ cfg, accountId: accountId ?? "default" });
const mode = account.config.replyToMode;
if (mode === "off" || mode === "first") {
return mode;
}
return "all";
},
},
reload: { configPrefixes: ["channels.mattermost"] },
configSchema: buildChannelConfigSchema(MattermostConfigSchema),
config: {

View File

@@ -43,6 +43,7 @@ const MattermostAccountSchemaBase = z
chunkMode: z.enum(["length", "newline"]).optional(),
blockStreaming: z.boolean().optional(),
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
replyToMode: z.enum(["off", "first", "all"]).optional(),
responsePrefix: z.string().optional(),
actions: z
.object({

View File

@@ -109,6 +109,29 @@ describe("mattermost mention gating", () => {
});
});
describe("resolveMattermostReplyRootId with block streaming payloads", () => {
it("uses threadRootId for block-streamed payloads with replyToId", () => {
// When block streaming sends a payload with replyToId from the threading
// mode, the deliver callback should still use the existing threadRootId.
expect(
resolveMattermostReplyRootId({
threadRootId: "thread-root-1",
replyToId: "streamed-reply-id",
}),
).toBe("thread-root-1");
});
it("falls back to payload replyToId when no threadRootId in block streaming", () => {
// Top-level channel message: no threadRootId, payload carries the
// inbound post id as replyToId from the "all" threading mode.
expect(
resolveMattermostReplyRootId({
replyToId: "inbound-post-for-threading",
}),
).toBe("inbound-post-for-threading");
});
});
describe("resolveMattermostReplyRootId", () => {
it("uses replyToId for top-level replies", () => {
expect(

View File

@@ -52,6 +52,8 @@ export type MattermostAccountConfig = {
blockStreaming?: boolean;
/** Merge streamed block replies before sending. */
blockStreamingCoalesce?: BlockStreamingCoalesceConfig;
/** Control reply threading (off|first|all). Default: "all". */
replyToMode?: "off" | "first" | "all";
/** Outbound response prefix override for this channel/account. */
responsePrefix?: string;
/** Action toggles for this account. */

View File

@@ -169,6 +169,50 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads).toHaveLength(0);
});
it("drops all final payloads when block pipeline streamed successfully", async () => {
const pipeline: Parameters<typeof buildReplyPayloads>[0]["blockReplyPipeline"] = {
didStream: () => true,
isAborted: () => false,
hasSentPayload: () => false,
enqueue: () => {},
flush: async () => {},
stop: () => {},
hasBuffered: () => false,
};
// shouldDropFinalPayloads short-circuits to [] when the pipeline streamed
// without aborting, so hasSentPayload is never reached.
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
blockStreamingEnabled: true,
blockReplyPipeline: pipeline,
replyToMode: "all",
payloads: [{ text: "response", replyToId: "post-123" }],
});
expect(replyPayloads).toHaveLength(0);
});
it("deduplicates final payloads against directly sent block keys regardless of replyToId", async () => {
// When block streaming is not active but directlySentBlockKeys has entries
// (e.g. from pre-tool flush), the key should match even if replyToId differs.
const { createBlockReplyContentKey } = await import("./block-reply-pipeline.js");
const directlySentBlockKeys = new Set<string>();
directlySentBlockKeys.add(
createBlockReplyContentKey({ text: "response", replyToId: "post-1" }),
);
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
blockStreamingEnabled: false,
blockReplyPipeline: null,
directlySentBlockKeys,
replyToMode: "off",
payloads: [{ text: "response" }],
});
expect(replyPayloads).toHaveLength(0);
});
it("does not suppress same-target replies when accountId differs", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,

View File

@@ -5,7 +5,7 @@ import type { OriginatingChannelType } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { ReplyPayload } from "../types.js";
import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js";
import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
import { createBlockReplyContentKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
import {
resolveOriginAccountId,
resolveOriginMessageProvider,
@@ -213,7 +213,7 @@ export async function buildReplyPayloads(params: {
)
: params.directlySentBlockKeys?.size
? mediaFilteredPayloads.filter(
(payload) => !params.directlySentBlockKeys!.has(createBlockReplyPayloadKey(payload)),
(payload) => !params.directlySentBlockKeys!.has(createBlockReplyContentKey(payload)),
)
: mediaFilteredPayloads;
const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads;

View File

@@ -0,0 +1,79 @@
import { describe, expect, it } from "vitest";
import {
createBlockReplyContentKey,
createBlockReplyPayloadKey,
createBlockReplyPipeline,
} from "./block-reply-pipeline.js";
describe("createBlockReplyPayloadKey", () => {
it("produces different keys for payloads differing only by replyToId", () => {
const a = createBlockReplyPayloadKey({ text: "hello world", replyToId: "post-1" });
const b = createBlockReplyPayloadKey({ text: "hello world", replyToId: "post-2" });
const c = createBlockReplyPayloadKey({ text: "hello world" });
expect(a).not.toBe(b);
expect(a).not.toBe(c);
});
it("produces different keys for payloads with different text", () => {
const a = createBlockReplyPayloadKey({ text: "hello" });
const b = createBlockReplyPayloadKey({ text: "world" });
expect(a).not.toBe(b);
});
it("produces different keys for payloads with different media", () => {
const a = createBlockReplyPayloadKey({ text: "hello", mediaUrl: "file:///a.png" });
const b = createBlockReplyPayloadKey({ text: "hello", mediaUrl: "file:///b.png" });
expect(a).not.toBe(b);
});
it("trims whitespace from text for key comparison", () => {
const a = createBlockReplyPayloadKey({ text: " hello " });
const b = createBlockReplyPayloadKey({ text: "hello" });
expect(a).toBe(b);
});
});
describe("createBlockReplyContentKey", () => {
it("produces the same key for payloads differing only by replyToId", () => {
const a = createBlockReplyContentKey({ text: "hello world", replyToId: "post-1" });
const b = createBlockReplyContentKey({ text: "hello world", replyToId: "post-2" });
const c = createBlockReplyContentKey({ text: "hello world" });
expect(a).toBe(b);
expect(a).toBe(c);
});
});
describe("createBlockReplyPipeline dedup with threading", () => {
it("keeps separate deliveries for same text with different replyToId", async () => {
const sent: Array<{ text?: string; replyToId?: string }> = [];
const pipeline = createBlockReplyPipeline({
onBlockReply: async (payload) => {
sent.push({ text: payload.text, replyToId: payload.replyToId });
},
timeoutMs: 5000,
});
pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" });
pipeline.enqueue({ text: "response text", replyToId: undefined });
await pipeline.flush();
expect(sent).toEqual([
{ text: "response text", replyToId: "thread-root-1" },
{ text: "response text", replyToId: undefined },
]);
});
it("hasSentPayload matches regardless of replyToId", async () => {
const pipeline = createBlockReplyPipeline({
onBlockReply: async () => {},
timeoutMs: 5000,
});
pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" });
await pipeline.flush();
// Final payload with no replyToId should be recognized as already sent
expect(pipeline.hasSentPayload({ text: "response text" })).toBe(true);
expect(pipeline.hasSentPayload({ text: "response text", replyToId: "other-id" })).toBe(true);
});
});

View File

@@ -48,6 +48,19 @@ export function createBlockReplyPayloadKey(payload: ReplyPayload): string {
});
}
export function createBlockReplyContentKey(payload: ReplyPayload): string {
const text = payload.text?.trim() ?? "";
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
: payload.mediaUrl
? [payload.mediaUrl]
: [];
// Content-only key used for final-payload suppression after block streaming.
// This intentionally ignores replyToId so a streamed threaded payload and the
// later final payload still collapse when they carry the same content.
return JSON.stringify({ text, mediaList });
}
const withTimeout = async <T>(
promise: Promise<T>,
timeoutMs: number,
@@ -80,6 +93,7 @@ export function createBlockReplyPipeline(params: {
}): BlockReplyPipeline {
const { onBlockReply, timeoutMs, coalescing, buffer } = params;
const sentKeys = new Set<string>();
const sentContentKeys = new Set<string>();
const pendingKeys = new Set<string>();
const seenKeys = new Set<string>();
const bufferedKeys = new Set<string>();
@@ -95,6 +109,7 @@ export function createBlockReplyPipeline(params: {
return;
}
const payloadKey = createBlockReplyPayloadKey(payload);
const contentKey = createBlockReplyContentKey(payload);
if (!bypassSeenCheck) {
if (seenKeys.has(payloadKey)) {
return;
@@ -130,6 +145,7 @@ export function createBlockReplyPipeline(params: {
return;
}
sentKeys.add(payloadKey);
sentContentKeys.add(contentKey);
didStream = true;
})
.catch((err) => {
@@ -238,8 +254,8 @@ export function createBlockReplyPipeline(params: {
didStream: () => didStream,
isAborted: () => aborted,
hasSentPayload: (payload) => {
const payloadKey = createBlockReplyPayloadKey(payload);
return sentKeys.has(payloadKey);
const payloadKey = createBlockReplyContentKey(payload);
return sentContentKeys.has(payloadKey);
},
};
}

View File

@@ -2,7 +2,7 @@ import { logVerbose } from "../../globals.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { BlockReplyContext, ReplyPayload } from "../types.js";
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
import { createBlockReplyPayloadKey } from "./block-reply-pipeline.js";
import { createBlockReplyContentKey } from "./block-reply-pipeline.js";
import { parseReplyDirectives } from "./reply-directives.js";
import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js";
import type { TypingSignaler } from "./typing-mode.js";
@@ -128,7 +128,7 @@ export function createBlockReplyDeliveryHandler(params: {
} else if (params.blockStreamingEnabled) {
// Send directly when flushing before tool execution (no pipeline but streaming enabled).
// Track sent key to avoid duplicate in final payloads.
params.directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));
params.directlySentBlockKeys.add(createBlockReplyContentKey(blockPayload));
await params.onBlockReply(blockPayload);
}
// When streaming is disabled entirely, blocks are accumulated in final text instead.