diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index cafc6ff94c1..78fb166991c 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -763d2709dd26f4ec7d5807b2f1781b7f58cb115d2b0a9c9235a6c2c7b3788c1f plugin-sdk-api-baseline.json -87ab9ec219f037b13a8f42378d1fed02701d4035da0e5eca8a091626e8426523 plugin-sdk-api-baseline.jsonl +048efa89df3126388efa43e2d46508b755edc4a88c5cbeb3718273ae2b1758a6 plugin-sdk-api-baseline.json +3b0f8fe32f559266b805a1077820365e91bb8bfac519ae5d54ecfe6d6415fcc1 plugin-sdk-api-baseline.jsonl diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index 9369bf22ee7..5f30eb11b49 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -2,10 +2,15 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; const FINAL_REPLY_TEXT = "final answer"; const THREAD_TS = "thread-1"; +const SAME_TEXT = "same reply"; const createSlackDraftStreamMock = vi.fn(); const deliverRepliesMock = vi.fn(async () => {}); const finalizeSlackPreviewEditMock = vi.fn(async () => {}); +let mockedDispatchSequence: Array<{ + kind: "tool" | "block" | "final"; + payload: { text: string }; +}> = []; const noop = () => {}; const noopAsync = async () => {}; @@ -216,7 +221,9 @@ vi.mock("../replies.js", () => ({ })); vi.mock("../reply.runtime.js", () => ({ - createReplyDispatcherWithTyping: (params: { deliver: (payload: unknown) => Promise }) => ({ + createReplyDispatcherWithTyping: (params: { + deliver: (payload: unknown, info: { kind: "tool" | "block" | "final" }) => Promise; + }) => ({ dispatcher: { deliver: params.deliver, }, @@ -224,13 +231,20 @@ vi.mock("../reply.runtime.js", () => ({ markDispatchIdle: () => {}, }), dispatchInboundMessage: async (params: { - dispatcher: { deliver: (payload: { text: string }) => Promise }; + dispatcher: { + deliver: ( + payload: { text: string }, + info: { kind: "tool" | "block" | "final" }, + ) => Promise; + }; }) => { - await params.dispatcher.deliver({ text: FINAL_REPLY_TEXT }); + for (const entry of mockedDispatchSequence) { + await params.dispatcher.deliver(entry.payload, { kind: entry.kind }); + } return { queuedFinal: false, counts: { - final: 1, + final: mockedDispatchSequence.filter((entry) => entry.kind === "final").length, }, }; }, @@ -251,6 +265,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { createSlackDraftStreamMock.mockReset(); deliverRepliesMock.mockReset(); finalizeSlackPreviewEditMock.mockReset(); + mockedDispatchSequence = [{ kind: "final", payload: { text: FINAL_REPLY_TEXT } }]; createSlackDraftStreamMock.mockReturnValue(createDraftStreamStub()); finalizeSlackPreviewEditMock.mockRejectedValue(new Error("socket closed")); @@ -268,4 +283,30 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { }), ); }); + + it("keeps same-content tool and final payloads distinct after preview fallback", async () => { + mockedDispatchSequence = [ + { kind: "tool", payload: { text: SAME_TEXT } }, + { kind: "final", payload: { text: SAME_TEXT } }, + ]; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(finalizeSlackPreviewEditMock).toHaveBeenCalledTimes(2); + expect(deliverRepliesMock).toHaveBeenCalledTimes(2); + expect(deliverRepliesMock).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: SAME_TEXT })], + }), + ); + expect(deliverRepliesMock).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: SAME_TEXT })], + }), + ); + }); }); diff --git a/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts index af728d8af6f..445cf768241 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts @@ -25,27 +25,39 @@ describe("slack turn delivery tracker", () => { const tracker = createSlackTurnDeliveryTracker(); const payload = { text: "same reply" }; - expect(tracker.hasDelivered({ payload, threadTs: "123.456" })).toBe(false); - tracker.markDelivered({ payload, threadTs: "123.456" }); - expect(tracker.hasDelivered({ payload, threadTs: "123.456" })).toBe(true); - expect(tracker.hasDelivered({ payload, threadTs: "other-thread" })).toBe(false); + expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "123.456" })).toBe(false); + tracker.markDelivered({ kind: "final", payload, threadTs: "123.456" }); + expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "123.456" })).toBe(true); + expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "other-thread" })).toBe(false); }); it("keeps explicit reply targets distinct from the shared thread target", () => { const tracker = createSlackTurnDeliveryTracker(); tracker.markDelivered({ + kind: "final", payload: { text: "same reply", replyToId: "thread-A" }, threadTs: "123.456", }); expect( tracker.hasDelivered({ + kind: "final", payload: { text: "same reply", replyToId: "thread-B" }, threadTs: "123.456", }), ).toBe(false); }); + + it("keeps distinct dispatch kinds separate for identical payloads", () => { + const tracker = createSlackTurnDeliveryTracker(); + const payload = { text: "same reply" }; + + tracker.markDelivered({ kind: "tool", payload, threadTs: "123.456" }); + + expect(tracker.hasDelivered({ kind: "tool", payload, threadTs: "123.456" })).toBe(true); + expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "123.456" })).toBe(false); + }); }); describe("slack native streaming thread hint", () => { diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 322522149d9..ae73615e48b 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -15,7 +15,7 @@ import { import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime"; import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history"; import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; -import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; +import type { ReplyDispatchKind, ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import { danger, logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolvePinnedMainDmOwnerFromAllowlist } from "openclaw/plugin-sdk/security-runtime"; import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/text-runtime"; @@ -123,11 +123,14 @@ export function resolveSlackStreamingThreadHint(params: { }); } -function buildSlackTurnDeliveryKey(params: { +type SlackTurnDeliveryAttempt = { + kind: ReplyDispatchKind; payload: ReplyPayload; threadTs?: string; textOverride?: string; -}): string | null { +}; + +function buildSlackTurnDeliveryKey(params: SlackTurnDeliveryAttempt): string | null { const reply = resolveSendableOutboundReplyParts(params.payload, { text: params.textOverride, }); @@ -136,6 +139,7 @@ function buildSlackTurnDeliveryKey(params: { return null; } return JSON.stringify({ + kind: params.kind, threadTs: params.threadTs ?? "", replyToId: params.payload.replyToId ?? null, text: reply.trimmedText, @@ -147,11 +151,11 @@ function buildSlackTurnDeliveryKey(params: { export function createSlackTurnDeliveryTracker() { const deliveredKeys = new Set(); return { - hasDelivered(params: { payload: ReplyPayload; threadTs?: string; textOverride?: string }) { + hasDelivered(params: SlackTurnDeliveryAttempt) { const key = buildSlackTurnDeliveryKey(params); return key ? deliveredKeys.has(key) : false; }, - markDelivered(params: { payload: ReplyPayload; threadTs?: string; textOverride?: string }) { + markDelivered(params: SlackTurnDeliveryAttempt) { const key = buildSlackTurnDeliveryKey(params); if (key) { deliveredKeys.add(key); @@ -388,14 +392,24 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag let observedReplyDelivery = false; const deliveryTracker = createSlackTurnDeliveryTracker(); - const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise => { - const replyThreadTs = forcedThreadTs ?? replyPlan.nextThreadTs(); - if (deliveryTracker.hasDelivered({ payload, threadTs: replyThreadTs })) { + const deliverNormally = async (params: { + payload: ReplyPayload; + kind: ReplyDispatchKind; + forcedThreadTs?: string; + }): Promise => { + const replyThreadTs = params.forcedThreadTs ?? replyPlan.nextThreadTs(); + if ( + deliveryTracker.hasDelivered({ + kind: params.kind, + payload: params.payload, + threadTs: replyThreadTs, + }) + ) { logVerbose("slack: suppressed duplicate normal delivery within the same turn"); return; } await deliverReplies({ - replies: [payload], + replies: [params.payload], target: prepared.replyTarget, token: ctx.botToken, accountId: account.accountId, @@ -411,13 +425,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag usedReplyThreadTs ??= replyThreadTs; } replyPlan.markSent(); - deliveryTracker.markDelivered({ payload, threadTs: replyThreadTs }); + deliveryTracker.markDelivered({ + kind: params.kind, + payload: params.payload, + threadTs: replyThreadTs, + }); }; - const deliverWithStreaming = async (payload: ReplyPayload): Promise => { - const reply = resolveSendableOutboundReplyParts(payload); - if (streamFailed || reply.hasMedia || readSlackReplyBlocks(payload)?.length || !reply.hasText) { - await deliverNormally(payload, streamSession?.threadTs); + const deliverWithStreaming = async (params: { + payload: ReplyPayload; + kind: ReplyDispatchKind; + }): Promise => { + const reply = resolveSendableOutboundReplyParts(params.payload); + if ( + streamFailed || + reply.hasMedia || + readSlackReplyBlocks(params.payload)?.length || + !reply.hasText + ) { + await deliverNormally({ + payload: params.payload, + kind: params.kind, + forcedThreadTs: streamSession?.threadTs, + }); return; } @@ -432,12 +462,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag "slack-stream: no reply thread target for stream start, falling back to normal delivery", ); streamFailed = true; - await deliverNormally(payload); + await deliverNormally({ payload: params.payload, kind: params.kind }); return; } if ( deliveryTracker.hasDelivered({ - payload, + kind: params.kind, + payload: params.payload, threadTs: streamThreadTs, textOverride: text, }) @@ -458,7 +489,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag usedReplyThreadTs ??= streamThreadTs; replyPlan.markSent(); deliveryTracker.markDelivered({ - payload, + kind: params.kind, + payload: params.payload, threadTs: streamThreadTs, textOverride: text, }); @@ -466,7 +498,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag } if ( deliveryTracker.hasDelivered({ - payload, + kind: params.kind, + payload: params.payload, threadTs: streamSession.threadTs, textOverride: text, }) @@ -480,7 +513,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag text: "\n" + text, }); deliveryTracker.markDelivered({ - payload, + kind: params.kind, + payload: params.payload, threadTs: streamSession.threadTs, textOverride: text, }); @@ -489,16 +523,20 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`), ); streamFailed = true; - await deliverNormally(payload, streamSession?.threadTs ?? plannedThreadTs); + await deliverNormally({ + payload: params.payload, + kind: params.kind, + forcedThreadTs: streamSession?.threadTs ?? plannedThreadTs, + }); } }; const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...replyPipeline, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), - deliver: async (payload) => { + deliver: async (payload, info) => { if (useStreaming) { - await deliverWithStreaming(payload); + await deliverWithStreaming({ payload, kind: info.kind }); return; } @@ -518,7 +556,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (canFinalizeViaPreviewEdit) { const finalThreadTs = usedReplyThreadTs ?? statusThreadTs; - if (deliveryTracker.hasDelivered({ payload, threadTs: finalThreadTs })) { + if (deliveryTracker.hasDelivered({ kind: info.kind, payload, threadTs: finalThreadTs })) { observedReplyDelivery = true; return; } @@ -535,7 +573,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag threadTs: finalThreadTs, }); observedReplyDelivery = true; - deliveryTracker.markDelivered({ payload, threadTs: finalThreadTs }); + deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs }); return; } catch (err) { logVerbose( @@ -562,7 +600,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag hasStreamedMessage = false; } - await deliverNormally(payload); + await deliverNormally({ payload, kind: info.kind }); }, onError: (err, info) => { runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`)); diff --git a/src/plugin-sdk/reply-runtime.ts b/src/plugin-sdk/reply-runtime.ts index c6afedd6080..fda01e3c7f2 100644 --- a/src/plugin-sdk/reply-runtime.ts +++ b/src/plugin-sdk/reply-runtime.ts @@ -45,6 +45,7 @@ export { createReplyDispatcherWithTyping, } from "../auto-reply/reply/reply-dispatcher.js"; export type { + ReplyDispatchKind, ReplyDispatcher, ReplyDispatcherOptions, ReplyDispatcherWithTypingOptions,