Slack: key turn-local dedupe by dispatch kind

Scope Slack turn-local delivery dedupe by reply dispatch kind so identical tool and final payloads on the same thread do not collapse into one send.

Expose the existing dispatcher kind on the public reply-runtime seam and cover the Slack tracker and preview-fallback paths with regression tests.
This commit is contained in:
Gustavo Madeira Santana
2026-04-08 18:15:35 -04:00
parent 68630a9e6d
commit bd7801eefa
5 changed files with 127 additions and 35 deletions

View File

@@ -1,2 +1,2 @@
763d2709dd26f4ec7d5807b2f1781b7f58cb115d2b0a9c9235a6c2c7b3788c1f plugin-sdk-api-baseline.json
87ab9ec219f037b13a8f42378d1fed02701d4035da0e5eca8a091626e8426523 plugin-sdk-api-baseline.jsonl
048efa89df3126388efa43e2d46508b755edc4a88c5cbeb3718273ae2b1758a6 plugin-sdk-api-baseline.json
3b0f8fe32f559266b805a1077820365e91bb8bfac519ae5d54ecfe6d6415fcc1 plugin-sdk-api-baseline.jsonl

View File

@@ -2,10 +2,15 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
const FINAL_REPLY_TEXT = "final answer";
const THREAD_TS = "thread-1";
const SAME_TEXT = "same reply";
const createSlackDraftStreamMock = vi.fn();
const deliverRepliesMock = vi.fn(async () => {});
const finalizeSlackPreviewEditMock = vi.fn(async () => {});
let mockedDispatchSequence: Array<{
kind: "tool" | "block" | "final";
payload: { text: string };
}> = [];
const noop = () => {};
const noopAsync = async () => {};
@@ -216,7 +221,9 @@ vi.mock("../replies.js", () => ({
}));
vi.mock("../reply.runtime.js", () => ({
createReplyDispatcherWithTyping: (params: { deliver: (payload: unknown) => Promise<void> }) => ({
createReplyDispatcherWithTyping: (params: {
deliver: (payload: unknown, info: { kind: "tool" | "block" | "final" }) => Promise<void>;
}) => ({
dispatcher: {
deliver: params.deliver,
},
@@ -224,13 +231,20 @@ vi.mock("../reply.runtime.js", () => ({
markDispatchIdle: () => {},
}),
dispatchInboundMessage: async (params: {
dispatcher: { deliver: (payload: { text: string }) => Promise<void> };
dispatcher: {
deliver: (
payload: { text: string },
info: { kind: "tool" | "block" | "final" },
) => Promise<void>;
};
}) => {
await params.dispatcher.deliver({ text: FINAL_REPLY_TEXT });
for (const entry of mockedDispatchSequence) {
await params.dispatcher.deliver(entry.payload, { kind: entry.kind });
}
return {
queuedFinal: false,
counts: {
final: 1,
final: mockedDispatchSequence.filter((entry) => entry.kind === "final").length,
},
};
},
@@ -251,6 +265,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
createSlackDraftStreamMock.mockReset();
deliverRepliesMock.mockReset();
finalizeSlackPreviewEditMock.mockReset();
mockedDispatchSequence = [{ kind: "final", payload: { text: FINAL_REPLY_TEXT } }];
createSlackDraftStreamMock.mockReturnValue(createDraftStreamStub());
finalizeSlackPreviewEditMock.mockRejectedValue(new Error("socket closed"));
@@ -268,4 +283,30 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
}),
);
});
it("keeps same-content tool and final payloads distinct after preview fallback", async () => {
mockedDispatchSequence = [
{ kind: "tool", payload: { text: SAME_TEXT } },
{ kind: "final", payload: { text: SAME_TEXT } },
];
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(finalizeSlackPreviewEditMock).toHaveBeenCalledTimes(2);
expect(deliverRepliesMock).toHaveBeenCalledTimes(2);
expect(deliverRepliesMock).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
replyThreadTs: THREAD_TS,
replies: [expect.objectContaining({ text: SAME_TEXT })],
}),
);
expect(deliverRepliesMock).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
replyThreadTs: THREAD_TS,
replies: [expect.objectContaining({ text: SAME_TEXT })],
}),
);
});
});

View File

@@ -25,27 +25,39 @@ describe("slack turn delivery tracker", () => {
const tracker = createSlackTurnDeliveryTracker();
const payload = { text: "same reply" };
expect(tracker.hasDelivered({ payload, threadTs: "123.456" })).toBe(false);
tracker.markDelivered({ payload, threadTs: "123.456" });
expect(tracker.hasDelivered({ payload, threadTs: "123.456" })).toBe(true);
expect(tracker.hasDelivered({ payload, threadTs: "other-thread" })).toBe(false);
expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "123.456" })).toBe(false);
tracker.markDelivered({ kind: "final", payload, threadTs: "123.456" });
expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "123.456" })).toBe(true);
expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "other-thread" })).toBe(false);
});
it("keeps explicit reply targets distinct from the shared thread target", () => {
const tracker = createSlackTurnDeliveryTracker();
tracker.markDelivered({
kind: "final",
payload: { text: "same reply", replyToId: "thread-A" },
threadTs: "123.456",
});
expect(
tracker.hasDelivered({
kind: "final",
payload: { text: "same reply", replyToId: "thread-B" },
threadTs: "123.456",
}),
).toBe(false);
});
it("keeps distinct dispatch kinds separate for identical payloads", () => {
const tracker = createSlackTurnDeliveryTracker();
const payload = { text: "same reply" };
tracker.markDelivered({ kind: "tool", payload, threadTs: "123.456" });
expect(tracker.hasDelivered({ kind: "tool", payload, threadTs: "123.456" })).toBe(true);
expect(tracker.hasDelivered({ kind: "final", payload, threadTs: "123.456" })).toBe(false);
});
});
describe("slack native streaming thread hint", () => {

View File

@@ -15,7 +15,7 @@ import {
import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime";
import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import type { ReplyDispatchKind, ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import { danger, logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env";
import { resolvePinnedMainDmOwnerFromAllowlist } from "openclaw/plugin-sdk/security-runtime";
import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/text-runtime";
@@ -123,11 +123,14 @@ export function resolveSlackStreamingThreadHint(params: {
});
}
function buildSlackTurnDeliveryKey(params: {
type SlackTurnDeliveryAttempt = {
kind: ReplyDispatchKind;
payload: ReplyPayload;
threadTs?: string;
textOverride?: string;
}): string | null {
};
function buildSlackTurnDeliveryKey(params: SlackTurnDeliveryAttempt): string | null {
const reply = resolveSendableOutboundReplyParts(params.payload, {
text: params.textOverride,
});
@@ -136,6 +139,7 @@ function buildSlackTurnDeliveryKey(params: {
return null;
}
return JSON.stringify({
kind: params.kind,
threadTs: params.threadTs ?? "",
replyToId: params.payload.replyToId ?? null,
text: reply.trimmedText,
@@ -147,11 +151,11 @@ function buildSlackTurnDeliveryKey(params: {
export function createSlackTurnDeliveryTracker() {
const deliveredKeys = new Set<string>();
return {
hasDelivered(params: { payload: ReplyPayload; threadTs?: string; textOverride?: string }) {
hasDelivered(params: SlackTurnDeliveryAttempt) {
const key = buildSlackTurnDeliveryKey(params);
return key ? deliveredKeys.has(key) : false;
},
markDelivered(params: { payload: ReplyPayload; threadTs?: string; textOverride?: string }) {
markDelivered(params: SlackTurnDeliveryAttempt) {
const key = buildSlackTurnDeliveryKey(params);
if (key) {
deliveredKeys.add(key);
@@ -388,14 +392,24 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
let observedReplyDelivery = false;
const deliveryTracker = createSlackTurnDeliveryTracker();
const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise<void> => {
const replyThreadTs = forcedThreadTs ?? replyPlan.nextThreadTs();
if (deliveryTracker.hasDelivered({ payload, threadTs: replyThreadTs })) {
const deliverNormally = async (params: {
payload: ReplyPayload;
kind: ReplyDispatchKind;
forcedThreadTs?: string;
}): Promise<void> => {
const replyThreadTs = params.forcedThreadTs ?? replyPlan.nextThreadTs();
if (
deliveryTracker.hasDelivered({
kind: params.kind,
payload: params.payload,
threadTs: replyThreadTs,
})
) {
logVerbose("slack: suppressed duplicate normal delivery within the same turn");
return;
}
await deliverReplies({
replies: [payload],
replies: [params.payload],
target: prepared.replyTarget,
token: ctx.botToken,
accountId: account.accountId,
@@ -411,13 +425,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
usedReplyThreadTs ??= replyThreadTs;
}
replyPlan.markSent();
deliveryTracker.markDelivered({ payload, threadTs: replyThreadTs });
deliveryTracker.markDelivered({
kind: params.kind,
payload: params.payload,
threadTs: replyThreadTs,
});
};
const deliverWithStreaming = async (payload: ReplyPayload): Promise<void> => {
const reply = resolveSendableOutboundReplyParts(payload);
if (streamFailed || reply.hasMedia || readSlackReplyBlocks(payload)?.length || !reply.hasText) {
await deliverNormally(payload, streamSession?.threadTs);
const deliverWithStreaming = async (params: {
payload: ReplyPayload;
kind: ReplyDispatchKind;
}): Promise<void> => {
const reply = resolveSendableOutboundReplyParts(params.payload);
if (
streamFailed ||
reply.hasMedia ||
readSlackReplyBlocks(params.payload)?.length ||
!reply.hasText
) {
await deliverNormally({
payload: params.payload,
kind: params.kind,
forcedThreadTs: streamSession?.threadTs,
});
return;
}
@@ -432,12 +462,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
"slack-stream: no reply thread target for stream start, falling back to normal delivery",
);
streamFailed = true;
await deliverNormally(payload);
await deliverNormally({ payload: params.payload, kind: params.kind });
return;
}
if (
deliveryTracker.hasDelivered({
payload,
kind: params.kind,
payload: params.payload,
threadTs: streamThreadTs,
textOverride: text,
})
@@ -458,7 +489,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
usedReplyThreadTs ??= streamThreadTs;
replyPlan.markSent();
deliveryTracker.markDelivered({
payload,
kind: params.kind,
payload: params.payload,
threadTs: streamThreadTs,
textOverride: text,
});
@@ -466,7 +498,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
}
if (
deliveryTracker.hasDelivered({
payload,
kind: params.kind,
payload: params.payload,
threadTs: streamSession.threadTs,
textOverride: text,
})
@@ -480,7 +513,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
text: "\n" + text,
});
deliveryTracker.markDelivered({
payload,
kind: params.kind,
payload: params.payload,
threadTs: streamSession.threadTs,
textOverride: text,
});
@@ -489,16 +523,20 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`),
);
streamFailed = true;
await deliverNormally(payload, streamSession?.threadTs ?? plannedThreadTs);
await deliverNormally({
payload: params.payload,
kind: params.kind,
forcedThreadTs: streamSession?.threadTs ?? plannedThreadTs,
});
}
};
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
...replyPipeline,
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload) => {
deliver: async (payload, info) => {
if (useStreaming) {
await deliverWithStreaming(payload);
await deliverWithStreaming({ payload, kind: info.kind });
return;
}
@@ -518,7 +556,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
if (canFinalizeViaPreviewEdit) {
const finalThreadTs = usedReplyThreadTs ?? statusThreadTs;
if (deliveryTracker.hasDelivered({ payload, threadTs: finalThreadTs })) {
if (deliveryTracker.hasDelivered({ kind: info.kind, payload, threadTs: finalThreadTs })) {
observedReplyDelivery = true;
return;
}
@@ -535,7 +573,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
threadTs: finalThreadTs,
});
observedReplyDelivery = true;
deliveryTracker.markDelivered({ payload, threadTs: finalThreadTs });
deliveryTracker.markDelivered({ kind: info.kind, payload, threadTs: finalThreadTs });
return;
} catch (err) {
logVerbose(
@@ -562,7 +600,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
hasStreamedMessage = false;
}
await deliverNormally(payload);
await deliverNormally({ payload, kind: info.kind });
},
onError: (err, info) => {
runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`));

View File

@@ -45,6 +45,7 @@ export {
createReplyDispatcherWithTyping,
} from "../auto-reply/reply/reply-dispatcher.js";
export type {
ReplyDispatchKind,
ReplyDispatcher,
ReplyDispatcherOptions,
ReplyDispatcherWithTypingOptions,