From eb66def65604f7dffce25d3a07142bf8e2abfdd0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 4 May 2026 01:35:14 +0100 Subject: [PATCH] fix: scope messaging tool final reply dedupe Co-authored-by: HCL --- CHANGELOG.md | 1 + .../src/app-server/dynamic-tools.test.ts | 2 + .../codex/src/app-server/dynamic-tools.ts | 5 +- ...enclaw-owned-tool-runtime-contract.test.ts | 2 + ...enclaw-owned-tool-runtime-contract.test.ts | 2 + ...ded-helpers.sanitizeuserfacingtext.test.ts | 7 + .../pi-embedded-helpers/messaging-dedupe.ts | 9 +- src/agents/pi-embedded-messaging.types.ts | 2 + ...-embedded-subscribe.handlers.tools.test.ts | 14 ++ .../pi-embedded-subscribe.handlers.tools.ts | 32 ++-- .../reply/agent-runner-payloads.test.ts | 142 +++++++++++++++++- src/auto-reply/reply/agent-runner-payloads.ts | 35 ++++- .../agent-runner.misc.runreplyagent.test.ts | 17 ++- .../reply/followup-delivery.test.ts | 98 +++++++++++- src/auto-reply/reply/followup-delivery.ts | 26 +++- src/auto-reply/reply/followup-runner.test.ts | 2 +- .../reply/reply-payloads-dedupe.runtime.ts | 2 +- src/auto-reply/reply/reply-payloads-dedupe.ts | 66 ++++++-- src/auto-reply/reply/reply-payloads.test.ts | 120 +++++++++++---- src/auto-reply/reply/reply-payloads.ts | 2 +- 20 files changed, 502 insertions(+), 84 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d488e1d5153..0b40a6aa18d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ Docs: https://docs.openclaw.ai - Gateway/restart: verify listener PIDs by argv when `lsof` reports only the Node process name, so stale gateway cleanup can find macOS `cnode` listeners. Fixes #70664. - Gateway/logging: expand leading `~` in `logging.file` before creating the file logger, preventing startup crash loops for home-relative log paths. Fixes #73587. - Channels/CLI: keep `openclaw channels list --json` usable when provider usage fetching fails, and report per-provider usage errors without aborting the channel list. Refs #67595. +- Agents/messaging: deliver distinct final commentary after same-target `message` tool sends while still deduping text/media already sent by the tool, so short closing remarks are no longer silently dropped. Fixes #76915. Thanks @hclsys. - Gateway/systemd: preserve operator-added secrets in the Gateway env file across re-stage while clearing OpenClaw-managed keys (such as `OPENCLAW_GATEWAY_TOKEN`) so a fresh staging value is never shadowed by a stale env-file copy; operator secrets are also retained when the state-dir `.env` is empty. Fixes #76860. Thanks @hclsys. - Plugin updates: do not short-circuit trusted official npm updates as unchanged when the default/latest spec still resolves to an already-installed prerelease that the installer should replace with a stable fallback. Thanks @vincentkoc. - Plugin tools: keep auth-unavailable optional tools hidden even when another default tool from the same plugin is available and `tools.alsoAllow` names the optional tool. Thanks @vincentkoc. diff --git a/extensions/codex/src/app-server/dynamic-tools.test.ts b/extensions/codex/src/app-server/dynamic-tools.test.ts index f15d21a96ff..7519d981932 100644 --- a/extensions/codex/src/app-server/dynamic-tools.test.ts +++ b/extensions/codex/src/app-server/dynamic-tools.test.ts @@ -179,6 +179,8 @@ describe("createCodexDynamicToolBridge", () => { provider: "telegram", to: "chat-1", threadId: "thread-ts-1", + text: "hello from Codex", + mediaUrls: ["/tmp/reply.png"], }, ], }); diff --git a/extensions/codex/src/app-server/dynamic-tools.ts b/extensions/codex/src/app-server/dynamic-tools.ts index 58953391fea..64a36d500a4 100644 --- a/extensions/codex/src/app-server/dynamic-tools.ts +++ b/extensions/codex/src/app-server/dynamic-tools.ts @@ -231,13 +231,16 @@ function collectToolTelemetry(params: { if (text) { params.telemetry.messagingToolSentTexts.push(text); } - params.telemetry.messagingToolSentMediaUrls.push(...collectMediaUrls(params.args)); + const mediaUrls = collectMediaUrls(params.args); + params.telemetry.messagingToolSentMediaUrls.push(...mediaUrls); params.telemetry.messagingToolSentTargets.push({ tool: params.toolName, provider: readFirstString(params.args, ["provider", "channel"]) ?? params.toolName, accountId: readFirstString(params.args, ["accountId", "account_id"]), to: readFirstString(params.args, ["to", "target", "recipient"]), threadId: readFirstString(params.args, ["threadId", "thread_id", "messageThreadId"]), + ...(text ? { text } : {}), + ...(mediaUrls.length > 0 ? { mediaUrls } : {}), }); } diff --git a/extensions/codex/src/app-server/openclaw-owned-tool-runtime-contract.test.ts b/extensions/codex/src/app-server/openclaw-owned-tool-runtime-contract.test.ts index b63fd13527b..ad9bcbc6379 100644 --- a/extensions/codex/src/app-server/openclaw-owned-tool-runtime-contract.test.ts +++ b/extensions/codex/src/app-server/openclaw-owned-tool-runtime-contract.test.ts @@ -308,6 +308,8 @@ describe("OpenClaw-owned tool runtime contract — Codex app-server adapter", () provider: "telegram", to: "chat-1", threadId: "thread-ts-1", + text: "hello from Codex", + mediaUrls: ["/tmp/codex-reply.png"], }, ], }); diff --git a/src/agents/openclaw-owned-tool-runtime-contract.test.ts b/src/agents/openclaw-owned-tool-runtime-contract.test.ts index 15b63344d83..ebd1dd5fe3d 100644 --- a/src/agents/openclaw-owned-tool-runtime-contract.test.ts +++ b/src/agents/openclaw-owned-tool-runtime-contract.test.ts @@ -291,6 +291,8 @@ describe("OpenClaw-owned tool runtime contract — Pi adapter", () => { tool: "message", provider: "telegram", to: "chat-1", + text: "hello from Pi", + mediaUrls: ["/tmp/pi-reply.png"], }), ]); await vi.waitFor(() => { diff --git a/src/agents/pi-embedded-helpers.sanitizeuserfacingtext.test.ts b/src/agents/pi-embedded-helpers.sanitizeuserfacingtext.test.ts index 8dfd83f7f4b..7ee36b3b7b0 100644 --- a/src/agents/pi-embedded-helpers.sanitizeuserfacingtext.test.ts +++ b/src/agents/pi-embedded-helpers.sanitizeuserfacingtext.test.ts @@ -798,6 +798,13 @@ describe("isMessagingToolDuplicate", () => { sentTexts: ['I sent the message: "Hello, this is a test message!"'], expected: true, }, + { + input: "v2ex hot topics delivered to telegram", + sentTexts: [ + "1. some article title\n2. another title\nv2ex hot topics delivered to telegram\n3. yet another", + ], + expected: false, + }, { input: "This is completely different content.", sentTexts: ["Hello, this is a test message!"], diff --git a/src/agents/pi-embedded-helpers/messaging-dedupe.ts b/src/agents/pi-embedded-helpers/messaging-dedupe.ts index 819e8eb24d7..6435c6e9fe9 100644 --- a/src/agents/pi-embedded-helpers/messaging-dedupe.ts +++ b/src/agents/pi-embedded-helpers/messaging-dedupe.ts @@ -1,6 +1,7 @@ import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js"; const MIN_DUPLICATE_TEXT_LENGTH = 10; +const MIN_REVERSE_SUBSTRING_DUPLICATE_RATIO = 0.5; /** * Normalize text for duplicate comparison. @@ -30,7 +31,13 @@ export function isMessagingToolDuplicateNormalized( if (!normalizedSent || normalizedSent.length < MIN_DUPLICATE_TEXT_LENGTH) { return false; } - return normalized.includes(normalizedSent) || normalizedSent.includes(normalized); + if (normalized.includes(normalizedSent)) { + return true; + } + return ( + normalizedSent.includes(normalized) && + normalized.length >= normalizedSent.length * MIN_REVERSE_SUBSTRING_DUPLICATE_RATIO + ); }); } diff --git a/src/agents/pi-embedded-messaging.types.ts b/src/agents/pi-embedded-messaging.types.ts index 9040ef920fb..5cc64fdecd7 100644 --- a/src/agents/pi-embedded-messaging.types.ts +++ b/src/agents/pi-embedded-messaging.types.ts @@ -4,4 +4,6 @@ export type MessagingToolSend = { accountId?: string; to?: string; threadId?: string; + text?: string; + mediaUrls?: string[]; }; diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts index 8d11b2d6946..d263d05b07c 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts @@ -847,6 +847,13 @@ describe("messaging tool media URL tracking", () => { await handleToolExecutionEnd(ctx, endEvt); expect(ctx.state.messagingToolSentMediaUrls).toContain("file:///img.jpg"); + expect(ctx.state.messagingToolSentTargets).toEqual([ + expect.objectContaining({ + to: "channel:123", + text: "hi", + mediaUrls: ["file:///img.jpg"], + }), + ]); expect(ctx.state.pendingMessagingMediaUrls.has("tool-m2")).toBe(false); }); @@ -883,6 +890,13 @@ describe("messaging tool media URL tracking", () => { "file:///img-a.jpg", "file:///img-b.jpg", ]); + expect(ctx.state.messagingToolSentTargets).toEqual([ + expect.objectContaining({ + to: "channel:123", + text: "hi", + mediaUrls: ["file:///img-a.jpg", "file:///img-b.jpg"], + }), + ]); }); it("trims messagingToolSentMediaUrls to 200 on commit (FIFO)", async () => { diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index 8ef520fe41c..ecc1445f287 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -867,9 +867,21 @@ export async function handleToolExecutionEnd( }); } - // Commit messaging tool text on success, discard on error. + // Commit messaging tool evidence on success, discard on error. const pendingText = ctx.state.pendingMessagingTexts.get(toolCallId); const pendingTarget = ctx.state.pendingMessagingTargets.get(toolCallId); + const pendingMediaUrls = ctx.state.pendingMessagingMediaUrls.get(toolCallId) ?? []; + const startArgs = + startData?.args && typeof startData.args === "object" + ? (startData.args as Record) + : {}; + const isMessagingSend = + pendingMediaUrls.length > 0 || + (isMessagingTool(toolName) && isMessagingToolSendAction(toolName, startArgs)); + const committedMediaUrls = + !isToolError && isMessagingSend + ? [...pendingMediaUrls, ...collectMessagingMediaUrlsFromToolResult(result)] + : []; if (pendingText) { ctx.state.pendingMessagingTexts.delete(toolCallId); if (!isToolError) { @@ -882,24 +894,16 @@ export async function handleToolExecutionEnd( if (pendingTarget) { ctx.state.pendingMessagingTargets.delete(toolCallId); if (!isToolError) { - ctx.state.messagingToolSentTargets.push(pendingTarget); + ctx.state.messagingToolSentTargets.push({ + ...pendingTarget, + ...(pendingText ? { text: pendingText } : {}), + ...(committedMediaUrls.length > 0 ? { mediaUrls: committedMediaUrls.slice() } : {}), + }); ctx.trimMessagingToolSent(); } } - const pendingMediaUrls = ctx.state.pendingMessagingMediaUrls.get(toolCallId) ?? []; ctx.state.pendingMessagingMediaUrls.delete(toolCallId); - const startArgs = - startData?.args && typeof startData.args === "object" - ? (startData.args as Record) - : {}; - const isMessagingSend = - pendingMediaUrls.length > 0 || - (isMessagingTool(toolName) && isMessagingToolSendAction(toolName, startArgs)); if (!isToolError && isMessagingSend) { - const committedMediaUrls = [ - ...pendingMediaUrls, - ...collectMessagingMediaUrlsFromToolResult(result), - ]; if (committedMediaUrls.length > 0) { ctx.state.messagingToolSentMediaUrls.push(...committedMediaUrls); ctx.trimMessagingToolSent(); diff --git a/src/auto-reply/reply/agent-runner-payloads.test.ts b/src/auto-reply/reply/agent-runner-payloads.test.ts index fd98c5a70ad..2ca5c1f99c0 100644 --- a/src/auto-reply/reply/agent-runner-payloads.test.ts +++ b/src/auto-reply/reply/agent-runner-payloads.test.ts @@ -11,7 +11,7 @@ const baseParams = { replyToMode: "off" as const, }; -async function expectSameTargetRepliesSuppressed(params: { provider: string; to: string }) { +async function expectSameTargetRepliesDelivered(params: { provider: string; to: string }) { const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello world!" }], @@ -22,7 +22,8 @@ async function expectSameTargetRepliesSuppressed(params: { provider: string; to: messagingToolSentTargets: [{ tool: "message", provider: params.provider, to: params.to }], }); - expect(replyPayloads).toHaveLength(0); + expect(replyPayloads).toHaveLength(1); + expect(replyPayloads[0]?.text).toBe("hello world!"); } describe("buildReplyPayloads media filter integration", () => { @@ -177,7 +178,93 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads[0]?.mediaUrl).toBe("file:///tmp/photo.jpg"); }); - it("suppresses same-target replies when messageProvider is synthetic but originatingChannel is set", async () => { + it("dedupes final text only against message-tool text sent to the same route", async () => { + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + payloads: [{ text: "discord-only text" }], + messageProvider: "slack", + originatingTo: "channel:C1", + messagingToolSentTexts: ["slack text", "discord-only text"], + messagingToolSentTargets: [ + { tool: "slack", provider: "slack", to: "channel:C1", text: "slack text" }, + { + tool: "discord", + provider: "discord", + to: "channel:C2", + text: "discord-only text", + }, + ], + }); + + expect(replyPayloads).toHaveLength(1); + expect(replyPayloads[0]?.text).toBe("discord-only text"); + }); + + it("falls back to global text dedupe for legacy multi-target messaging telemetry", async () => { + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + payloads: [{ text: "hello world!" }], + messageProvider: "slack", + originatingTo: "channel:C1", + messagingToolSentTexts: ["hello world!"], + messagingToolSentTargets: [ + { tool: "slack", provider: "slack", to: "channel:C1" }, + { tool: "discord", provider: "discord", to: "channel:C2" }, + ], + }); + + expect(replyPayloads).toHaveLength(0); + }); + + it("dedupes final media only against message-tool media sent to the same route", async () => { + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + payloads: [{ text: "photo", mediaUrl: "file:///tmp/discord-photo.jpg" }], + messageProvider: "slack", + originatingTo: "channel:C1", + messagingToolSentMediaUrls: ["file:///tmp/slack-photo.jpg", "file:///tmp/discord-photo.jpg"], + messagingToolSentTargets: [ + { + tool: "slack", + provider: "slack", + to: "channel:C1", + mediaUrls: ["file:///tmp/slack-photo.jpg"], + }, + { + tool: "discord", + provider: "discord", + to: "channel:C2", + mediaUrls: ["file:///tmp/discord-photo.jpg"], + }, + ], + }); + + expect(replyPayloads).toHaveLength(1); + expect(replyPayloads[0]?.mediaUrl).toBe("file:///tmp/discord-photo.jpg"); + }); + + it("falls back to global media dedupe for legacy multi-target messaging telemetry", async () => { + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + payloads: [{ text: "photo", mediaUrl: "file:///tmp/photo.jpg" }], + messageProvider: "slack", + originatingTo: "channel:C1", + messagingToolSentMediaUrls: ["file:///tmp/photo.jpg"], + messagingToolSentTargets: [ + { tool: "slack", provider: "slack", to: "channel:C1" }, + { tool: "discord", provider: "discord", to: "channel:C2" }, + ], + }); + + expect(replyPayloads).toHaveLength(1); + expect(replyPayloads[0]).toMatchObject({ + text: "photo", + mediaUrl: undefined, + mediaUrls: undefined, + }); + }); + + it("delivers distinct same-target replies when messageProvider is synthetic but originatingChannel is set", async () => { const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello world!" }], @@ -188,14 +275,15 @@ describe("buildReplyPayloads media filter integration", () => { messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], }); - expect(replyPayloads).toHaveLength(0); + expect(replyPayloads).toHaveLength(1); + expect(replyPayloads[0]?.text).toBe("hello world!"); }); - it("suppresses same-target replies when message tool target provider is generic", async () => { - await expectSameTargetRepliesSuppressed({ provider: "message", to: "ou_abc123" }); + it("delivers distinct same-target replies when message tool target provider is generic", async () => { + await expectSameTargetRepliesDelivered({ provider: "message", to: "ou_abc123" }); }); - it("suppresses same-target replies when target provider is channel alias", async () => { + it("delivers distinct same-target replies when target provider is channel alias", async () => { resetPluginRuntimeStateForTest(); setActivePluginRegistry( createTestRegistry([ @@ -218,7 +306,45 @@ describe("buildReplyPayloads media filter integration", () => { }, ]), ); - await expectSameTargetRepliesSuppressed({ provider: "lark", to: "ou_abc123" }); + await expectSameTargetRepliesDelivered({ provider: "lark", to: "ou_abc123" }); + }); + + it("dedupes duplicate same-target reply text without suppressing unrelated finals", async () => { + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + payloads: [{ text: "hello world!" }], + messageProvider: "telegram", + originatingTo: "268300329", + messagingToolSentTexts: ["hello world!"], + messagingToolSentTargets: [ + { tool: "telegram", provider: "telegram", to: "268300329", text: "hello world!" }, + ], + }); + + expect(replyPayloads).toHaveLength(0); + }); + + it("does not dedupe short commentary that appears inside a longer same-target message", async () => { + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + payloads: [{ text: "v2ex hot topics delivered to telegram" }], + messageProvider: "telegram", + originatingTo: "268300329", + messagingToolSentTexts: [ + "1. some article title\n2. another title\nv2ex hot topics delivered to telegram\n3. yet another", + ], + messagingToolSentTargets: [ + { + tool: "telegram", + provider: "telegram", + to: "268300329", + text: "1. some article title\n2. another title\nv2ex hot topics delivered to telegram\n3. yet another", + }, + ], + }); + + expect(replyPayloads).toHaveLength(1); + expect(replyPayloads[0]?.text).toBe("v2ex hot topics delivered to telegram"); }); it("strips media already sent by the block pipeline after normalizing both paths", async () => { diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index eb929b77d6f..43ac234731e 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -216,15 +216,38 @@ export async function buildReplyPayloads(params: { }), }) ?? { shouldDedupePayloads: shouldCheckMessagingToolDedupe && messagingToolSentTargets.length === 0, - suppressReplies: false, + matchingRoute: false, + routeSentTexts: [], + routeSentMediaUrls: [], + useGlobalSentTextEvidenceFallback: false, + useGlobalSentMediaUrlEvidenceFallback: false, }; const dedupeMessagingToolPayloads = messagingToolPayloadDedupe.shouldDedupePayloads; + const sentMediaUrlFallback = params.messagingToolSentMediaUrls ?? []; + const shouldUseGlobalSentMediaUrlEvidence = + messagingToolPayloadDedupe.matchingRoute && + messagingToolPayloadDedupe.routeSentMediaUrls.length === 0 && + messagingToolPayloadDedupe.useGlobalSentMediaUrlEvidenceFallback; + const shouldUseGlobalSentTextEvidence = + messagingToolPayloadDedupe.matchingRoute && + messagingToolPayloadDedupe.routeSentTexts.length === 0 && + messagingToolPayloadDedupe.useGlobalSentTextEvidenceFallback; + const sentMediaUrlsForDedupe = messagingToolPayloadDedupe.matchingRoute + ? shouldUseGlobalSentMediaUrlEvidence + ? sentMediaUrlFallback + : messagingToolPayloadDedupe.routeSentMediaUrls + : sentMediaUrlFallback; + const sentTextsForDedupe = messagingToolPayloadDedupe.matchingRoute + ? shouldUseGlobalSentTextEvidence + ? messagingToolSentTexts + : messagingToolPayloadDedupe.routeSentTexts + : messagingToolSentTexts; const messagingToolSentMediaUrls = dedupeMessagingToolPayloads ? await normalizeSentMediaUrlsForDedupe({ - sentMediaUrls: params.messagingToolSentMediaUrls ?? [], + sentMediaUrls: sentMediaUrlsForDedupe, normalizeMediaPaths: params.normalizeMediaPaths, }) - : (params.messagingToolSentMediaUrls ?? []); + : sentMediaUrlsForDedupe; const mediaFilteredPayloads = dedupeMessagingToolPayloads ? ( dedupeRuntime ?? (await loadReplyPayloadsDedupeRuntime()) @@ -236,7 +259,7 @@ export async function buildReplyPayloads(params: { const dedupedPayloads = dedupeMessagingToolPayloads ? (dedupeRuntime ?? (await loadReplyPayloadsDedupeRuntime())).filterMessagingToolDuplicates({ payloads: mediaFilteredPayloads, - sentTexts: messagingToolSentTexts, + sentTexts: sentTextsForDedupe, }) : mediaFilteredPayloads; const isDirectlySentBlockPayload = (payload: ReplyPayload) => @@ -295,9 +318,7 @@ export async function buildReplyPayloads(params: { sentMediaUrls: blockSentMediaUrls, }) : contentSuppressedPayloads; - const replyPayloads = messagingToolPayloadDedupe.suppressReplies - ? [] - : filteredPayloads.filter(isRenderablePayload); + const replyPayloads = filteredPayloads.filter(isRenderablePayload); return { replyPayloads, diff --git a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts index 77a87282ccb..3fce0ad29a7 100644 --- a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts @@ -1718,7 +1718,7 @@ describe("runReplyAgent claude-cli routing", () => { }); }); -describe("runReplyAgent messaging tool suppression", () => { +describe("runReplyAgent messaging tool dedupe", () => { function createRun( messageProvider = "slack", opts: { storePath?: string; sessionKey?: string } = {}, @@ -1782,7 +1782,7 @@ describe("runReplyAgent messaging tool suppression", () => { }); } - it("drops replies when a messaging tool sent via the same provider + target", async () => { + it("delivers distinct replies when a messaging tool sent via the same provider + target", async () => { runEmbeddedPiAgentMock.mockResolvedValueOnce({ payloads: [{ text: "hello world!" }], messagingToolSentTexts: ["different message"], @@ -1792,6 +1792,19 @@ describe("runReplyAgent messaging tool suppression", () => { const result = await createRun("slack"); + expect(result).toMatchObject({ text: "hello world!" }); + }); + + it("drops duplicate replies when a messaging tool sent the same text via the same provider + target", async () => { + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["hello world!"], + messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], + meta: {}, + }); + + const result = await createRun("slack"); + expect(result).toBeUndefined(); }); diff --git a/src/auto-reply/reply/followup-delivery.test.ts b/src/auto-reply/reply/followup-delivery.test.ts index df61cb10054..59cf07f42a9 100644 --- a/src/auto-reply/reply/followup-delivery.test.ts +++ b/src/auto-reply/reply/followup-delivery.test.ts @@ -69,7 +69,86 @@ describe("resolveFollowupDeliveryPayloads", () => { ).toEqual([{ text: "photo", mediaUrl: "file:///tmp/photo.jpg" }]); }); - it("suppresses replies when a messaging tool already sent to the same provider and target", () => { + it("dedupes final text only against message-tool text sent to the same route", () => { + expect( + resolveFollowupDeliveryPayloads({ + cfg: baseConfig, + payloads: [{ text: "discord-only text" }], + messageProvider: "slack", + originatingTo: "channel:C1", + sentTexts: ["slack text", "discord-only text"], + sentTargets: [ + { tool: "slack", provider: "slack", to: "channel:C1", text: "slack text" }, + { + tool: "discord", + provider: "discord", + to: "channel:C2", + text: "discord-only text", + }, + ], + }), + ).toEqual([{ text: "discord-only text" }]); + }); + + it("falls back to global text dedupe for legacy multi-target messaging telemetry", () => { + expect( + resolveFollowupDeliveryPayloads({ + cfg: baseConfig, + payloads: [{ text: "hello world!" }], + messageProvider: "slack", + originatingTo: "channel:C1", + sentTexts: ["hello world!"], + sentTargets: [ + { tool: "slack", provider: "slack", to: "channel:C1" }, + { tool: "discord", provider: "discord", to: "channel:C2" }, + ], + }), + ).toEqual([]); + }); + + it("dedupes final media only against message-tool media sent to the same route", () => { + expect( + resolveFollowupDeliveryPayloads({ + cfg: baseConfig, + payloads: [{ text: "photo", mediaUrl: "file:///tmp/discord-photo.jpg" }], + messageProvider: "slack", + originatingTo: "channel:C1", + sentMediaUrls: ["file:///tmp/slack-photo.jpg", "file:///tmp/discord-photo.jpg"], + sentTargets: [ + { + tool: "slack", + provider: "slack", + to: "channel:C1", + mediaUrls: ["file:///tmp/slack-photo.jpg"], + }, + { + tool: "discord", + provider: "discord", + to: "channel:C2", + mediaUrls: ["file:///tmp/discord-photo.jpg"], + }, + ], + }), + ).toEqual([{ text: "photo", mediaUrl: "file:///tmp/discord-photo.jpg" }]); + }); + + it("falls back to global media dedupe for legacy multi-target messaging telemetry", () => { + expect( + resolveFollowupDeliveryPayloads({ + cfg: baseConfig, + payloads: [{ text: "photo", mediaUrl: "file:///tmp/photo.jpg" }], + messageProvider: "slack", + originatingTo: "channel:C1", + sentMediaUrls: ["file:///tmp/photo.jpg"], + sentTargets: [ + { tool: "slack", provider: "slack", to: "channel:C1" }, + { tool: "discord", provider: "discord", to: "channel:C2" }, + ], + }), + ).toEqual([{ text: "photo", mediaUrl: undefined, mediaUrls: undefined }]); + }); + + it("delivers distinct replies when a messaging tool already sent to the same provider and target", () => { expect( resolveFollowupDeliveryPayloads({ cfg: baseConfig, @@ -78,10 +157,23 @@ describe("resolveFollowupDeliveryPayloads", () => { originatingTo: "channel:C1", sentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], }), + ).toEqual([{ text: "hello world!" }]); + }); + + it("dedupes duplicate replies when a messaging tool already sent to the same provider and target", () => { + expect( + resolveFollowupDeliveryPayloads({ + cfg: baseConfig, + payloads: [{ text: "hello world!" }], + messageProvider: "slack", + originatingTo: "channel:C1", + sentTexts: ["hello world!"], + sentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1", text: "hello world!" }], + }), ).toEqual([]); }); - it("suppresses replies when originating channel resolves the provider", () => { + it("delivers distinct replies when originating channel resolves the provider", () => { expect( resolveFollowupDeliveryPayloads({ cfg: baseConfig, @@ -91,6 +183,6 @@ describe("resolveFollowupDeliveryPayloads", () => { originatingTo: "268300329", sentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], }), - ).toEqual([]); + ).toEqual([{ text: "hello world!" }]); }); }); diff --git a/src/auto-reply/reply/followup-delivery.ts b/src/auto-reply/reply/followup-delivery.ts index 3b09c8e8027..1fde4532aa4 100644 --- a/src/auto-reply/reply/followup-delivery.ts +++ b/src/auto-reply/reply/followup-delivery.ts @@ -73,17 +73,37 @@ export function resolveFollowupDeliveryPayloads(params: { originatingAccountId: params.originatingAccountId, }), }); + const sentMediaUrlFallback = params.sentMediaUrls ?? []; + const sentTextFallback = params.sentTexts ?? []; + const shouldUseGlobalSentMediaUrlEvidence = + messagingToolPayloadDedupe.matchingRoute && + messagingToolPayloadDedupe.routeSentMediaUrls.length === 0 && + messagingToolPayloadDedupe.useGlobalSentMediaUrlEvidenceFallback; + const shouldUseGlobalSentTextEvidence = + messagingToolPayloadDedupe.matchingRoute && + messagingToolPayloadDedupe.routeSentTexts.length === 0 && + messagingToolPayloadDedupe.useGlobalSentTextEvidenceFallback; + const sentMediaUrlsForDedupe = messagingToolPayloadDedupe.matchingRoute + ? shouldUseGlobalSentMediaUrlEvidence + ? sentMediaUrlFallback + : messagingToolPayloadDedupe.routeSentMediaUrls + : sentMediaUrlFallback; + const sentTextsForDedupe = messagingToolPayloadDedupe.matchingRoute + ? shouldUseGlobalSentTextEvidence + ? sentTextFallback + : messagingToolPayloadDedupe.routeSentTexts + : sentTextFallback; const mediaFilteredPayloads = messagingToolPayloadDedupe.shouldDedupePayloads ? filterMessagingToolMediaDuplicates({ payloads: replyTaggedPayloads, - sentMediaUrls: params.sentMediaUrls ?? [], + sentMediaUrls: sentMediaUrlsForDedupe, }) : replyTaggedPayloads; const dedupedPayloads = messagingToolPayloadDedupe.shouldDedupePayloads ? filterMessagingToolDuplicates({ payloads: mediaFilteredPayloads, - sentTexts: params.sentTexts ?? [], + sentTexts: sentTextsForDedupe, }) : mediaFilteredPayloads; - return messagingToolPayloadDedupe.suppressReplies ? [] : dedupedPayloads; + return dedupedPayloads; } diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index ed56582bc8f..7f77a3d48f4 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -1145,7 +1145,7 @@ describe("createFollowupRunner messaging delivery and dedupe", () => { const { onBlockReply } = await runMessagingCase({ agentResult: { - ...makeTextReplyDedupeResult(), + ...makeTextReplyDedupeResult({ messagingToolSentTexts: ["hello world!"] }), messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], meta: { agentMeta: { diff --git a/src/auto-reply/reply/reply-payloads-dedupe.runtime.ts b/src/auto-reply/reply/reply-payloads-dedupe.runtime.ts index 0ed11bde7ab..809c6b1862d 100644 --- a/src/auto-reply/reply/reply-payloads-dedupe.runtime.ts +++ b/src/auto-reply/reply/reply-payloads-dedupe.runtime.ts @@ -2,6 +2,6 @@ export { filterMessagingToolDuplicates, filterMessagingToolMediaDuplicates, resolveMessagingToolPayloadDedupe, - shouldSuppressMessagingToolReplies, + shouldDedupeMessagingToolRepliesForRoute, type MessagingToolPayloadDedupeDecision, } from "./reply-payloads-dedupe.js"; diff --git a/src/auto-reply/reply/reply-payloads-dedupe.ts b/src/auto-reply/reply/reply-payloads-dedupe.ts index d1f59b2ff0d..66cb4d36a63 100644 --- a/src/auto-reply/reply/reply-payloads-dedupe.ts +++ b/src/auto-reply/reply/reply-payloads-dedupe.ts @@ -106,17 +106,17 @@ function resolveTargetProviderForComparison(params: { return targetProvider; } -type SuppressionRouteTarget = ChannelRouteTargetInput & { +type MessagingToolDedupeRouteTarget = ChannelRouteTargetInput & { channel: string; to: string; }; -function normalizeRouteTargetForSuppression(params: { +function normalizeRouteTargetForDedupe(params: { provider: string; rawTarget?: string; accountId?: string; threadId?: string; -}): SuppressionRouteTarget | null { +}): MessagingToolDedupeRouteTarget | null { const to = normalizeTargetForProvider(params.provider, params.rawTarget); if (!to) { return null; @@ -129,7 +129,7 @@ function normalizeRouteTargetForSuppression(params: { }; } -function targetsMatchForSuppression(params: { +function targetsMatchForDedupe(params: { provider: string; originTarget: string; targetKey: string; @@ -146,23 +146,32 @@ function targetsMatchForSuppression(params: { return params.targetKey === params.originTarget; } -export function shouldSuppressMessagingToolReplies(params: { +export function shouldDedupeMessagingToolRepliesForRoute(params: { messageProvider?: string; messagingToolSentTargets?: MessagingToolSend[]; originatingTo?: string; accountId?: string; }): boolean { + return getMatchingMessagingToolReplyTargets(params).length > 0; +} + +export function getMatchingMessagingToolReplyTargets(params: { + messageProvider?: string; + messagingToolSentTargets?: MessagingToolSend[]; + originatingTo?: string; + accountId?: string; +}): MessagingToolSend[] { const provider = normalizeProviderForComparison(params.messageProvider); if (!provider) { - return false; + return []; } const originRawTarget = normalizeOptionalString(params.originatingTo); const originAccount = normalizeOptionalAccountId(params.accountId); const sentTargets = params.messagingToolSentTargets ?? []; if (sentTargets.length === 0) { - return false; + return []; } - return sentTargets.some((target) => { + return sentTargets.filter((target) => { const targetProvider = resolveTargetProviderForComparison({ currentProvider: provider, targetProvider: target?.provider, @@ -176,7 +185,7 @@ export function shouldSuppressMessagingToolReplies(params: { } const targetRaw = normalizeOptionalString(target.to); const routeAccount = originAccount ?? targetAccount; - const originRoute = normalizeRouteTargetForSuppression({ + const originRoute = normalizeRouteTargetForDedupe({ provider, rawTarget: originRawTarget, accountId: routeAccount, @@ -184,7 +193,7 @@ export function shouldSuppressMessagingToolReplies(params: { if (!originRoute) { return false; } - const targetRoute = normalizeRouteTargetForSuppression({ + const targetRoute = normalizeRouteTargetForDedupe({ provider: targetProvider, rawTarget: targetRaw, accountId: routeAccount, @@ -196,7 +205,7 @@ export function shouldSuppressMessagingToolReplies(params: { if (channelRouteTargetsMatchExact({ left: originRoute, right: targetRoute })) { return true; } - return targetsMatchForSuppression({ + return targetsMatchForDedupe({ provider, originTarget: originRoute.to, targetKey: targetRoute.to, @@ -207,7 +216,11 @@ export function shouldSuppressMessagingToolReplies(params: { export type MessagingToolPayloadDedupeDecision = { shouldDedupePayloads: boolean; - suppressReplies: boolean; + matchingRoute: boolean; + routeSentTexts: string[]; + routeSentMediaUrls: string[]; + useGlobalSentTextEvidenceFallback: boolean; + useGlobalSentMediaUrlEvidenceFallback: boolean; }; export function resolveMessagingToolPayloadDedupe(params: { @@ -217,15 +230,38 @@ export function resolveMessagingToolPayloadDedupe(params: { accountId?: string; }): MessagingToolPayloadDedupeDecision { const sentTargets = params.messagingToolSentTargets ?? []; - const suppressReplies = shouldSuppressMessagingToolReplies({ + const matchingTargets = getMatchingMessagingToolReplyTargets({ messageProvider: params.messageProvider, messagingToolSentTargets: sentTargets, originatingTo: params.originatingTo, accountId: params.accountId, }); + const matchingRoute = matchingTargets.length > 0; + const routeSentTexts = matchingTargets.flatMap((target) => + typeof target.text === "string" && target.text.trim() ? [target.text] : [], + ); + const routeSentMediaUrls = matchingTargets.flatMap((target) => + Array.isArray(target.mediaUrls) + ? target.mediaUrls.filter( + (url): url is string => typeof url === "string" && Boolean(url.trim()), + ) + : [], + ); + const hasTargetTextEvidence = sentTargets.some( + (target) => typeof target.text === "string" && Boolean(target.text.trim()), + ); + const hasTargetMediaUrlEvidence = sentTargets.some( + (target) => + Array.isArray(target.mediaUrls) && + target.mediaUrls.some((url) => typeof url === "string" && Boolean(url.trim())), + ); return { - shouldDedupePayloads: suppressReplies || sentTargets.length === 0, - suppressReplies, + shouldDedupePayloads: matchingRoute || sentTargets.length === 0, + matchingRoute, + routeSentTexts, + routeSentMediaUrls, + useGlobalSentTextEvidenceFallback: matchingRoute && !hasTargetTextEvidence, + useGlobalSentMediaUrlEvidenceFallback: matchingRoute && !hasTargetMediaUrlEvidence, }; } diff --git a/src/auto-reply/reply/reply-payloads.test.ts b/src/auto-reply/reply/reply-payloads.test.ts index 0fb2cd9dd0a..1a7cbe257c2 100644 --- a/src/auto-reply/reply/reply-payloads.test.ts +++ b/src/auto-reply/reply/reply-payloads.test.ts @@ -4,7 +4,7 @@ import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/c import { filterMessagingToolMediaDuplicates, resolveMessagingToolPayloadDedupe, - shouldSuppressMessagingToolReplies, + shouldDedupeMessagingToolRepliesForRoute, } from "./reply-payloads.js"; function targetsMatchTelegramReplySuppression(params: { @@ -110,7 +110,7 @@ describe("filterMessagingToolMediaDuplicates", () => { }); }); -describe("shouldSuppressMessagingToolReplies", () => { +describe("shouldDedupeMessagingToolRepliesForRoute", () => { const installTelegramSuppressionRegistry = () => { resetPluginRuntimeStateForTest(); setActivePluginRegistry( @@ -130,9 +130,9 @@ describe("shouldSuppressMessagingToolReplies", () => { ); }; - it("suppresses when target provider is missing but target matches current provider route", () => { + it("matches when target provider is missing but target matches current provider route", () => { expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "123", messagingToolSentTargets: [{ tool: "message", provider: "", to: "123" }], @@ -140,9 +140,9 @@ describe("shouldSuppressMessagingToolReplies", () => { ).toBe(true); }); - it('suppresses when target provider uses "message" placeholder and target matches', () => { + it('matches when target provider uses "message" placeholder and target matches', () => { expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "123", messagingToolSentTargets: [{ tool: "message", provider: "message", to: "123" }], @@ -150,9 +150,9 @@ describe("shouldSuppressMessagingToolReplies", () => { ).toBe(true); }); - it("does not suppress when providerless target does not match origin route", () => { + it("does not match when providerless target does not match origin route", () => { expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "123", messagingToolSentTargets: [{ tool: "message", provider: "", to: "456" }], @@ -160,9 +160,9 @@ describe("shouldSuppressMessagingToolReplies", () => { ).toBe(false); }); - it("suppresses when only one side carries the account id", () => { + it("matches when only one side carries the account id", () => { expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "123", accountId: "work", @@ -171,9 +171,9 @@ describe("shouldSuppressMessagingToolReplies", () => { ).toBe(true); }); - it("does not suppress when route accounts differ", () => { + it("does not match when route accounts differ", () => { expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "123", accountId: "work", @@ -184,10 +184,10 @@ describe("shouldSuppressMessagingToolReplies", () => { ).toBe(false); }); - it("suppresses telegram topic-origin replies when explicit threadId matches", () => { + it("matches telegram topic-origin replies when explicit threadId matches", () => { installTelegramSuppressionRegistry(); expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "telegram:group:-100123:topic:77", messagingToolSentTargets: [ @@ -197,9 +197,9 @@ describe("shouldSuppressMessagingToolReplies", () => { ).toBe(true); }); - it("does not suppress telegram topic-origin replies when explicit threadId differs", () => { + it("does not match telegram topic-origin replies when explicit threadId differs", () => { expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "telegram:group:-100123:topic:77", messagingToolSentTargets: [ @@ -209,9 +209,9 @@ describe("shouldSuppressMessagingToolReplies", () => { ).toBe(false); }); - it("does not suppress telegram topic-origin replies when target omits topic metadata", () => { + it("does not match telegram topic-origin replies when target omits topic metadata", () => { expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "telegram:group:-100123:topic:77", messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "-100123" }], @@ -219,10 +219,10 @@ describe("shouldSuppressMessagingToolReplies", () => { ).toBe(false); }); - it("suppresses telegram replies when chatId matches but target forms differ", () => { + it("matches telegram replies when chatId matches but target forms differ", () => { installTelegramSuppressionRegistry(); expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "telegram:group:-100123", messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "-100123" }], @@ -230,12 +230,12 @@ describe("shouldSuppressMessagingToolReplies", () => { ).toBe(true); }); - it("suppresses telegram replies even when the active plugin registry omits telegram", () => { + it("matches telegram replies even when the active plugin registry omits telegram", () => { resetPluginRuntimeStateForTest(); setActivePluginRegistry(createTestRegistry([])); expect( - shouldSuppressMessagingToolReplies({ + shouldDedupeMessagingToolRepliesForRoute({ messageProvider: "telegram", originatingTo: "telegram:group:-100123:topic:77", messagingToolSentTargets: [ @@ -255,20 +255,82 @@ describe("resolveMessagingToolPayloadDedupe", () => { }), ).toEqual({ shouldDedupePayloads: true, - suppressReplies: false, + matchingRoute: false, + routeSentTexts: [], + routeSentMediaUrls: [], + useGlobalSentTextEvidenceFallback: false, + useGlobalSentMediaUrlEvidenceFallback: false, }); }); - it("suppresses final replies when a messaging tool sent to the same route", () => { + it("dedupes final replies by content when a messaging tool sent to the same route", () => { expect( resolveMessagingToolPayloadDedupe({ messageProvider: "telegram", originatingTo: "123", - messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }], + messagingToolSentTargets: [ + { + tool: "message", + provider: "telegram", + to: "123", + text: "sent text", + mediaUrls: ["file:///tmp/sent.png"], + }, + ], }), ).toEqual({ shouldDedupePayloads: true, - suppressReplies: true, + matchingRoute: true, + routeSentTexts: ["sent text"], + routeSentMediaUrls: ["file:///tmp/sent.png"], + useGlobalSentTextEvidenceFallback: false, + useGlobalSentMediaUrlEvidenceFallback: false, + }); + }); + + it("preserves global evidence fallback for legacy multi-target records", () => { + expect( + resolveMessagingToolPayloadDedupe({ + messageProvider: "slack", + originatingTo: "channel:C1", + messagingToolSentTargets: [ + { tool: "slack", provider: "slack", to: "channel:C1" }, + { tool: "discord", provider: "discord", to: "channel:C2" }, + ], + }), + ).toEqual({ + shouldDedupePayloads: true, + matchingRoute: true, + routeSentTexts: [], + routeSentMediaUrls: [], + useGlobalSentTextEvidenceFallback: true, + useGlobalSentMediaUrlEvidenceFallback: true, + }); + }); + + it("scopes matching-route evidence to the matched target", () => { + expect( + resolveMessagingToolPayloadDedupe({ + messageProvider: "slack", + originatingTo: "channel:C1", + messagingToolSentTargets: [ + { tool: "slack", provider: "slack", to: "channel:C1", text: "slack text" }, + { + tool: "discord", + provider: "discord", + to: "channel:C2", + text: "discord text", + mediaUrls: ["file:///tmp/discord.png"], + }, + ], + }), + ).toEqual({ + shouldDedupePayloads: true, + matchingRoute: true, + routeSentTexts: ["slack text"], + routeSentMediaUrls: [], + useGlobalSentTextEvidenceFallback: false, + useGlobalSentMediaUrlEvidenceFallback: false, }); }); @@ -281,7 +343,11 @@ describe("resolveMessagingToolPayloadDedupe", () => { }), ).toEqual({ shouldDedupePayloads: false, - suppressReplies: false, + matchingRoute: false, + routeSentTexts: [], + routeSentMediaUrls: [], + useGlobalSentTextEvidenceFallback: false, + useGlobalSentMediaUrlEvidenceFallback: false, }); }); }); diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts index 2a36f632464..52e532c9b48 100644 --- a/src/auto-reply/reply/reply-payloads.ts +++ b/src/auto-reply/reply/reply-payloads.ts @@ -9,5 +9,5 @@ export { filterMessagingToolDuplicates, filterMessagingToolMediaDuplicates, resolveMessagingToolPayloadDedupe, - shouldSuppressMessagingToolReplies, + shouldDedupeMessagingToolRepliesForRoute, } from "./reply-payloads-dedupe.js";