From 4693d20cad81f31495f29b7e62e26b87f2cccd8e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 25 Apr 2026 00:17:29 +0100 Subject: [PATCH] fix(slack): keep block replies in first thread --- CHANGELOG.md | 1 + .../dispatch.preview-fallback.test.ts | 43 +++++++++++++++- .../src/monitor/message-handler/dispatch.ts | 49 ++++++++++++++++--- extensions/slack/src/monitor/replies.test.ts | 44 ++++++++++++++++- extensions/slack/src/monitor/replies.ts | 20 ++++++-- 5 files changed, 142 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c68aac80e13..94ea35123cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai - Providers/GitHub Copilot: keep the plugin stream wrapper from claiming transport selection before OpenClaw picks a boundary-aware stream path, avoiding Pi's stale fallback Copilot headers on normal model turns. Thanks @steipete. - Discord/subagents: pass runtime config into thread-bound native subagent binding and require it at the helper boundary so Discord channel resolution keeps account-aware config. Fixes #71054. (#70945) Thanks @jai. - Slack/Assistant: accept Slack Assistant DM `message_changed` events when their metadata identifies the human sender, while continuing to drop self-authored bot edits. Fixes #55445. Thanks @AlfredPros. +- Slack/block replies: keep multi-part block deliveries in the first Slack reply thread when `replyToMode` is `first`, matching text reply threading instead of leaking later blocks into the channel. Fixes #49341. Thanks @pholmstr and @xiwuqi. - Agents/failover: stop body-less HTTP 400/422 proxy failures from defaulting to `"format"` classification, so embedded retries surface the opaque provider failure instead of falling into a compaction loop. Fixes #66462. (#67024) Thanks @altaywtf and @HongzhuLiu. - Plugins/loader: use cached discovery-mode snapshot loads for read-only plugin capability lookups, keep snapshot caches isolated from active Gateway registries, and make same-plugin channel/HTTP route re-registration idempotent so repeated snapshot or hot-reload paths no longer rerun full plugin side effects or accumulate duplicate surfaces. Fixes #51781, #52031, #54181, and #57514. Thanks @livingghost, @okuyam2y, @ShionEria, and @bbshih. - Plugins/loader: reuse the compatible active Gateway registry for broad runtime plugin ensure calls after a gateway-bindable boot load, so non-bundled plugins no longer re-run `register()` during the same boot path. Fixes #69250. Thanks @markthebest12. diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index e75951452ed..5dd8163037a 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -31,6 +31,7 @@ let mockedNativeStreaming = false; let mockedBlockStreamingEnabled: boolean | undefined = false; let capturedReplyOptions: { disableBlockStreaming?: boolean } | undefined; let mockedReplyThreadTs: string | undefined = THREAD_TS; +let mockedReplyThreadTsSequence: Array | undefined; let mockedDispatchSequence: Array<{ kind: "tool" | "block" | "final"; payload: { text: string; isError?: boolean; mediaUrl?: string; mediaUrls?: string[] }; @@ -258,12 +259,19 @@ vi.mock("../config.runtime.js", () => ({ vi.mock("../replies.js", () => ({ createSlackReplyDeliveryPlan: () => ({ - peekThreadTs: () => mockedReplyThreadTs, - nextThreadTs: () => mockedReplyThreadTs, + peekThreadTs: () => mockedReplyThreadTsSequence?.[0] ?? mockedReplyThreadTs, + nextThreadTs: () => + mockedReplyThreadTsSequence ? mockedReplyThreadTsSequence.shift() : mockedReplyThreadTs, markSent: () => {}, }), deliverReplies: deliverRepliesMock, readSlackReplyBlocks: () => undefined, + resolveDeliveredSlackReplyThreadTs: (params: { + replyToMode: "off" | "first" | "all" | "batched"; + payloadReplyToId?: string; + replyThreadTs?: string; + }) => + (params.replyToMode === "off" ? undefined : params.payloadReplyToId) ?? params.replyThreadTs, resolveSlackThreadTs: () => mockedReplyThreadTs, })); @@ -322,6 +330,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { mockedBlockStreamingEnabled = false; capturedReplyOptions = undefined; mockedReplyThreadTs = THREAD_TS; + mockedReplyThreadTsSequence = undefined; mockedDispatchSequence = [{ kind: "final", payload: { text: FINAL_REPLY_TEXT } }]; createSlackDraftStreamMock.mockReturnValue(createDraftStreamStub()); @@ -406,6 +415,36 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { ); }); + it("keeps multi-part block replies in the first reply thread after the plan is consumed", async () => { + mockedReplyThreadTsSequence = [THREAD_TS, undefined]; + mockedDispatchSequence = [ + { kind: "block", payload: { text: "first block" } }, + { kind: "block", payload: { text: "second block" } }, + ]; + + await dispatchPreparedSlackMessage( + createPreparedSlackMessage({ + replyToMode: "first", + }), + ); + + expect(deliverRepliesMock).toHaveBeenCalledTimes(2); + expect(deliverRepliesMock).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: "first block" })], + }), + ); + expect(deliverRepliesMock).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: "second block" })], + }), + ); + }); + it("does not flush draft previews for media finals before normal delivery", async () => { const draftStream = { ...createDraftStreamStub(), diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 86deec6e4a1..9509c7228d5 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -51,6 +51,7 @@ import { createSlackReplyDeliveryPlan, deliverReplies, readSlackReplyBlocks, + resolveDeliveredSlackReplyThreadTs, resolveSlackThreadTs, } from "../replies.js"; import { createReplyDispatcherWithTyping, dispatchInboundMessage } from "../reply.runtime.js"; @@ -448,8 +449,32 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag let streamSession: SlackStreamSession | null = null; let streamFailed = false; let usedReplyThreadTs: string | undefined; + let usedBlockReplyThreadTs: string | undefined; let observedReplyDelivery = false; const deliveryTracker = createSlackTurnDeliveryTracker(); + const resolveDeliveryThreadTs = (params: { + kind: ReplyDispatchKind; + forcedThreadTs?: string; + }): string | undefined => { + const plannedThreadTs = params.forcedThreadTs ? undefined : replyPlan.nextThreadTs(); + return ( + params.forcedThreadTs ?? + plannedThreadTs ?? + (params.kind === "block" ? usedBlockReplyThreadTs : undefined) + ); + }; + const rememberDeliveredThreadTs = ( + kind: ReplyDispatchKind, + deliveredThreadTs: string | undefined, + ) => { + if (!deliveredThreadTs) { + return; + } + usedReplyThreadTs ??= deliveredThreadTs; + if (kind === "block") { + usedBlockReplyThreadTs = deliveredThreadTs; + } + }; const deliverPendingStreamFallback = async ( session: SlackStreamSession, err: SlackStreamNotDeliveredError, @@ -499,7 +524,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag kind: ReplyDispatchKind; forcedThreadTs?: string; }): Promise => { - const replyThreadTs = params.forcedThreadTs ?? replyPlan.nextThreadTs(); + const replyThreadTs = resolveDeliveryThreadTs(params); if ( deliveryTracker.hasDelivered({ kind: params.kind, @@ -523,10 +548,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag ...(slackIdentity ? { identity: slackIdentity } : {}), }); observedReplyDelivery = true; + const deliveredThreadTs = resolveDeliveredSlackReplyThreadTs({ + replyToMode: prepared.replyToMode, + payloadReplyToId: params.payload.replyToId, + replyThreadTs, + }); // Record the thread ts only after confirmed delivery success. - if (replyThreadTs) { - usedReplyThreadTs ??= replyThreadTs; - } + rememberDeliveredThreadTs(params.kind, deliveredThreadTs); replyPlan.markSent(); deliveryTracker.markDelivered({ kind: params.kind, @@ -553,6 +581,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag threadTs: params.session.threadTs, textOverride: params.textOverride, }); + rememberDeliveredThreadTs(params.kind, params.session.threadTs); return true; }; @@ -586,7 +615,10 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag "slack-stream: no reply thread target for stream start, falling back to normal delivery", ); streamFailed = true; - await deliverNormally({ payload: params.payload, kind: params.kind }); + await deliverNormally({ + payload: params.payload, + kind: params.kind, + }); return; } if ( @@ -619,7 +651,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (streamSession.delivered) { observedReplyDelivery = true; } - usedReplyThreadTs ??= streamThreadTs; + rememberDeliveredThreadTs(params.kind, streamThreadTs); replyPlan.markSent(); deliveryTracker.markDelivered({ kind: params.kind, @@ -792,7 +824,10 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag }); }, deliverNormally: async () => { - await deliverNormally({ payload, kind: info.kind }); + await deliverNormally({ + payload, + kind: info.kind, + }); }, onPreviewFinalized: (_preview) => { const finalThreadTs = usedReplyThreadTs ?? statusThreadTs; diff --git a/extensions/slack/src/monitor/replies.test.ts b/extensions/slack/src/monitor/replies.test.ts index ce3430a82e1..4ad6f25fe59 100644 --- a/extensions/slack/src/monitor/replies.test.ts +++ b/extensions/slack/src/monitor/replies.test.ts @@ -7,6 +7,7 @@ vi.mock("../send.js", () => ({ let deliverReplies: typeof import("./replies.js").deliverReplies; let createSlackReplyDeliveryPlan: typeof import("./replies.js").createSlackReplyDeliveryPlan; +let resolveDeliveredSlackReplyThreadTs: typeof import("./replies.js").resolveDeliveredSlackReplyThreadTs; let resolveSlackThreadTs: typeof import("./replies.js").resolveSlackThreadTs; import { deliverSlackSlashReplies } from "./replies.js"; @@ -27,8 +28,12 @@ function baseParams(overrides?: Record) { describe("deliverReplies identity passthrough", () => { beforeAll(async () => { - ({ createSlackReplyDeliveryPlan, deliverReplies, resolveSlackThreadTs } = - await import("./replies.js")); + ({ + createSlackReplyDeliveryPlan, + deliverReplies, + resolveDeliveredSlackReplyThreadTs, + resolveSlackThreadTs, + } = await import("./replies.js")); }); beforeEach(() => { @@ -171,6 +176,41 @@ describe("deliverReplies identity passthrough", () => { }); }); +describe("resolveDeliveredSlackReplyThreadTs", () => { + beforeAll(async () => { + ({ resolveDeliveredSlackReplyThreadTs } = await import("./replies.js")); + }); + + it("prefers explicit reply targets when reply tags are enabled", () => { + expect( + resolveDeliveredSlackReplyThreadTs({ + replyToMode: "first", + payloadReplyToId: "explicit-thread", + replyThreadTs: "planned-thread", + }), + ).toBe("explicit-thread"); + }); + + it("ignores explicit reply tags when replyToMode is off", () => { + expect( + resolveDeliveredSlackReplyThreadTs({ + replyToMode: "off", + payloadReplyToId: "explicit-thread", + replyThreadTs: "planned-thread", + }), + ).toBe("planned-thread"); + }); + + it("falls back to the planned reply thread when no explicit reply tag exists", () => { + expect( + resolveDeliveredSlackReplyThreadTs({ + replyToMode: "batched", + replyThreadTs: "planned-thread", + }), + ).toBe("planned-thread"); + }); +}); + describe("resolveSlackThreadTs fallback classification", () => { const threadTs = "1234567890.123456"; const messageTs = "9999999999.999999"; diff --git a/extensions/slack/src/monitor/replies.ts b/extensions/slack/src/monitor/replies.ts index 8c5ef919064..ff64ee9566a 100644 --- a/extensions/slack/src/monitor/replies.ts +++ b/extensions/slack/src/monitor/replies.ts @@ -21,6 +21,17 @@ export function readSlackReplyBlocks(payload: ReplyPayload) { return resolveSlackReplyBlocks(payload); } +export function resolveDeliveredSlackReplyThreadTs(params: { + replyToMode: "off" | "first" | "all" | "batched"; + payloadReplyToId?: string; + replyThreadTs?: string; +}): string | undefined { + // Keep reply tags opt-in: when replyToMode is off, explicit reply tags + // must not force threading. + const inlineReplyToId = params.replyToMode === "off" ? undefined : params.payloadReplyToId; + return inlineReplyToId ?? params.replyThreadTs; +} + export async function deliverReplies(params: { cfg: OpenClawConfig; replies: ReplyPayload[]; @@ -34,10 +45,11 @@ export async function deliverReplies(params: { identity?: SlackSendIdentity; }) { for (const payload of params.replies) { - // Keep reply tags opt-in: when replyToMode is off, explicit reply tags - // must not force threading. - const inlineReplyToId = params.replyToMode === "off" ? undefined : payload.replyToId; - const threadTs = inlineReplyToId ?? params.replyThreadTs; + const threadTs = resolveDeliveredSlackReplyThreadTs({ + replyToMode: params.replyToMode, + payloadReplyToId: payload.replyToId, + replyThreadTs: params.replyThreadTs, + }); const reply = resolveSendableOutboundReplyParts(payload); const slackBlocks = readSlackReplyBlocks(payload); if (!reply.hasContent && !slackBlocks?.length) {