From bd7801eefaa1929f787797a05e65916ef6e9bbf3 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Wed, 8 Apr 2026 18:15:35 -0400 Subject: [PATCH] Slack: key turn-local dedupe by dispatch kind Scope Slack turn-local delivery dedupe by reply dispatch kind so identical tool and final payloads on the same thread do not collapse into one send. Expose the existing dispatcher kind on the public reply-runtime seam and cover the Slack tracker and preview-fallback paths with regression tests. --- .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- .../dispatch.preview-fallback.test.ts | 49 ++++++++++- .../dispatch.streaming.test.ts | 20 ++++- .../src/monitor/message-handler/dispatch.ts | 88 +++++++++++++------ src/plugin-sdk/reply-runtime.ts | 1 + 5 files changed, 127 insertions(+), 35 deletions(-) diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index cafc6ff94c1..78fb166991c 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -763d2709dd26f4ec7d5807b2f1781b7f58cb115d2b0a9c9235a6c2c7b3788c1f plugin-sdk-api-baseline.json -87ab9ec219f037b13a8f42378d1fed02701d4035da0e5eca8a091626e8426523 plugin-sdk-api-baseline.jsonl +048efa89df3126388efa43e2d46508b755edc4a88c5cbeb3718273ae2b1758a6 plugin-sdk-api-baseline.json +3b0f8fe32f559266b805a1077820365e91bb8bfac519ae5d54ecfe6d6415fcc1 plugin-sdk-api-baseline.jsonl diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index 9369bf22ee7..5f30eb11b49 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -2,10 +2,15 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; const FINAL_REPLY_TEXT = "final answer"; const THREAD_TS = "thread-1"; +const SAME_TEXT = "same reply"; const createSlackDraftStreamMock = vi.fn(); const deliverRepliesMock = vi.fn(async () => {}); const finalizeSlackPreviewEditMock = vi.fn(async () => {}); +let mockedDispatchSequence: Array<{ + kind: "tool" | "block" | "final"; + payload: { text: string }; +}> = []; const noop = () => {}; const noopAsync = async () => {}; @@ -216,7 +221,9 @@ vi.mock("../replies.js", () => ({ })); vi.mock("../reply.runtime.js", () => ({ - createReplyDispatcherWithTyping: (params: { deliver: (payload: unknown) => Promise }) => ({ + createReplyDispatcherWithTyping: (params: { + deliver: (payload: unknown, info: { kind: "tool" | "block" | "final" }) => Promise; + }) => ({ dispatcher: { deliver: params.deliver, }, @@ -224,13 +231,20 @@ vi.mock("../reply.runtime.js", () => ({ markDispatchIdle: () => {}, }), dispatchInboundMessage: async (params: { - dispatcher: { deliver: (payload: { text: string }) => Promise }; + dispatcher: { + deliver: ( + payload: { text: string }, + info: { kind: "tool" | "block" | "final" }, + ) => Promise; + }; }) => { - await params.dispatcher.deliver({ text: FINAL_REPLY_TEXT }); + for (const entry of mockedDispatchSequence) { + await params.dispatcher.deliver(entry.payload, { kind: entry.kind }); + } return { queuedFinal: false, counts: { - final: 1, + final: mockedDispatchSequence.filter((entry) => entry.kind === "final").length, }, }; }, @@ -251,6 +265,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { createSlackDraftStreamMock.mockReset(); deliverRepliesMock.mockReset(); finalizeSlackPreviewEditMock.mockReset(); + mockedDispatchSequence = [{ kind: "final", payload: { text: FINAL_REPLY_TEXT } }]; createSlackDraftStreamMock.mockReturnValue(createDraftStreamStub()); finalizeSlackPreviewEditMock.mockRejectedValue(new Error("socket closed")); @@ -268,4 +283,30 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { }), ); }); + + it("keeps same-content tool and final payloads distinct after preview fallback", async () => { + mockedDispatchSequence = [ + { kind: "tool", payload: { text: SAME_TEXT } }, + { kind: "final", payload: { text: SAME_TEXT } }, + ]; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(finalizeSlackPreviewEditMock).toHaveBeenCalledTimes(2); + expect(deliverRepliesMock).toHaveBeenCalledTimes(2); + expect(deliverRepliesMock).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: SAME_TEXT })], + }), + ); + expect(deliverRepliesMock).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: SAME_TEXT })], + }), + ); + }); }); diff --git a/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts index af728d8af6f..445cf768241 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts @@ -25,27 +25,39 @@ describe("slack turn delivery tracker", () => { const tracker = createSlackTurnDeliveryTracker(); const payload = { text: "same reply" }; - expect(tracker.hasDelivered({ payload, threadTs: "123.456" })).toBe(false); - tracker.markDelivered({ payload, threadTs: "123.456" }); - expect(tracker.hasDelivered({ payload, threadTs: "123.456" })).toBe(true); - expect(tracker.hasDelivered({ payload, threadTs: "other-thread" })).toBe(false); + expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "123.456" })).toBe(false); + tracker.markDelivered({ kind: "final", payload, threadTs: "123.456" }); + expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "123.456" })).toBe(true); + expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "other-thread" })).toBe(false); }); it("keeps explicit reply targets distinct from the shared thread target", () => { const tracker = createSlackTurnDeliveryTracker(); tracker.markDelivered({ + kind: "final", payload: { text: "same reply", replyToId: "thread-A" }, threadTs: "123.456", }); expect( tracker.hasDelivered({ + kind: "final", payload: { text: "same reply", replyToId: "thread-B" }, threadTs: "123.456", }), ).toBe(false); }); + + it("keeps distinct dispatch kinds separate for identical payloads", () => { + const tracker = createSlackTurnDeliveryTracker(); + const payload = { text: "same reply" }; + + tracker.markDelivered({ kind: "tool", payload, threadTs: "123.456" }); + + expect(tracker.hasDelivered({ kind: "tool", payload, threadTs: "123.456" })).toBe(true); + expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "123.456" })).toBe(false); + }); }); describe("slack native streaming thread hint", () => { diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 322522149d9..ae73615e48b 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -15,7 +15,7 @@ import { import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime"; import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history"; import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; -import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; +import type { ReplyDispatchKind, ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import { danger, logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolvePinnedMainDmOwnerFromAllowlist } from "openclaw/plugin-sdk/security-runtime"; import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/text-runtime"; @@ -123,11 +123,14 @@ export function resolveSlackStreamingThreadHint(params: { }); } -function buildSlackTurnDeliveryKey(params: { +type SlackTurnDeliveryAttempt = { + kind: ReplyDispatchKind; payload: ReplyPayload; threadTs?: string; textOverride?: string; -}): string | null { +}; + +function buildSlackTurnDeliveryKey(params: SlackTurnDeliveryAttempt): string | null { const reply = resolveSendableOutboundReplyParts(params.payload, { text: params.textOverride, }); @@ -136,6 +139,7 @@ function buildSlackTurnDeliveryKey(params: { return null; } return JSON.stringify({ + kind: params.kind, threadTs: params.threadTs ?? "", replyToId: params.payload.replyToId ?? null, text: reply.trimmedText, @@ -147,11 +151,11 @@ function buildSlackTurnDeliveryKey(params: { export function createSlackTurnDeliveryTracker() { const deliveredKeys = new Set(); return { - hasDelivered(params: { payload: ReplyPayload; threadTs?: string; textOverride?: string }) { + hasDelivered(params: SlackTurnDeliveryAttempt) { const key = buildSlackTurnDeliveryKey(params); return key ? deliveredKeys.has(key) : false; }, - markDelivered(params: { payload: ReplyPayload; threadTs?: string; textOverride?: string }) { + markDelivered(params: SlackTurnDeliveryAttempt) { const key = buildSlackTurnDeliveryKey(params); if (key) { deliveredKeys.add(key); @@ -388,14 +392,24 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag let observedReplyDelivery = false; const deliveryTracker = createSlackTurnDeliveryTracker(); - const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise => { - const replyThreadTs = forcedThreadTs ?? replyPlan.nextThreadTs(); - if (deliveryTracker.hasDelivered({ payload, threadTs: replyThreadTs })) { + const deliverNormally = async (params: { + payload: ReplyPayload; + kind: ReplyDispatchKind; + forcedThreadTs?: string; + }): Promise => { + const replyThreadTs = params.forcedThreadTs ?? replyPlan.nextThreadTs(); + if ( + deliveryTracker.hasDelivered({ + kind: params.kind, + payload: params.payload, + threadTs: replyThreadTs, + }) + ) { logVerbose("slack: suppressed duplicate normal delivery within the same turn"); return; } await deliverReplies({ - replies: [payload], + replies: [params.payload], target: prepared.replyTarget, token: ctx.botToken, accountId: account.accountId, @@ -411,13 +425,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag usedReplyThreadTs ??= replyThreadTs; } replyPlan.markSent(); - deliveryTracker.markDelivered({ payload, threadTs: replyThreadTs }); + deliveryTracker.markDelivered({ + kind: params.kind, + payload: params.payload, + threadTs: replyThreadTs, + }); }; - const deliverWithStreaming = async (payload: ReplyPayload): Promise => { - const reply = resolveSendableOutboundReplyParts(payload); - if (streamFailed || reply.hasMedia || readSlackReplyBlocks(payload)?.length || !reply.hasText) { - await deliverNormally(payload, streamSession?.threadTs); + const deliverWithStreaming = async (params: { + payload: ReplyPayload; + kind: ReplyDispatchKind; + }): Promise => { + const reply = resolveSendableOutboundReplyParts(params.payload); + if ( + streamFailed || + reply.hasMedia || + readSlackReplyBlocks(params.payload)?.length || + !reply.hasText + ) { + await deliverNormally({ + payload: params.payload, + kind: params.kind, + forcedThreadTs: streamSession?.threadTs, + }); return; } @@ -432,12 +462,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag "slack-stream: no reply thread target for stream start, falling back to normal delivery", ); streamFailed = true; - await deliverNormally(payload); + await deliverNormally({ payload: params.payload, kind: params.kind }); return; } if ( deliveryTracker.hasDelivered({ - payload, + kind: params.kind, + payload: params.payload, threadTs: streamThreadTs, textOverride: text, }) @@ -458,7 +489,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag usedReplyThreadTs ??= streamThreadTs; replyPlan.markSent(); deliveryTracker.markDelivered({ - payload, + kind: params.kind, + payload: params.payload, threadTs: streamThreadTs, textOverride: text, }); @@ -466,7 +498,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag } if ( deliveryTracker.hasDelivered({ - payload, + kind: params.kind, + payload: params.payload, threadTs: streamSession.threadTs, textOverride: text, }) @@ -480,7 +513,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag text: "\n" + text, }); deliveryTracker.markDelivered({ - payload, + kind: params.kind, + payload: params.payload, threadTs: streamSession.threadTs, textOverride: text, }); @@ -489,16 +523,20 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`), ); streamFailed = true; - await deliverNormally(payload, streamSession?.threadTs ?? plannedThreadTs); + await deliverNormally({ + payload: params.payload, + kind: params.kind, + forcedThreadTs: streamSession?.threadTs ?? plannedThreadTs, + }); } }; const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...replyPipeline, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), - deliver: async (payload) => { + deliver: async (payload, info) => { if (useStreaming) { - await deliverWithStreaming(payload); + await deliverWithStreaming({ payload, kind: info.kind }); return; } @@ -518,7 +556,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (canFinalizeViaPreviewEdit) { const finalThreadTs = usedReplyThreadTs ?? statusThreadTs; - if (deliveryTracker.hasDelivered({ payload, threadTs: finalThreadTs })) { + if (deliveryTracker.hasDelivered({ kind: info.kind, payload, threadTs: finalThreadTs })) { observedReplyDelivery = true; return; } @@ -535,7 +573,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag threadTs: finalThreadTs, }); observedReplyDelivery = true; - deliveryTracker.markDelivered({ payload, threadTs: finalThreadTs }); + deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs }); return; } catch (err) { logVerbose( @@ -562,7 +600,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag hasStreamedMessage = false; } - await deliverNormally(payload); + await deliverNormally({ payload, kind: info.kind }); }, onError: (err, info) => { runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`)); diff --git a/src/plugin-sdk/reply-runtime.ts b/src/plugin-sdk/reply-runtime.ts index c6afedd6080..fda01e3c7f2 100644 --- a/src/plugin-sdk/reply-runtime.ts +++ b/src/plugin-sdk/reply-runtime.ts @@ -45,6 +45,7 @@ export { createReplyDispatcherWithTyping, } from "../auto-reply/reply/reply-dispatcher.js"; export type { + ReplyDispatchKind, ReplyDispatcher, ReplyDispatcherOptions, ReplyDispatcherWithTypingOptions,