From 150053bc86996cbc8343e213e285b275d1f03a9b Mon Sep 17 00:00:00 2001 From: martingarramon Date: Fri, 24 Apr 2026 18:50:18 -0300 Subject: [PATCH] fix(slack): route stream-fallback delivery through chunked sender (follow-up to #70370) (#71124) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(slack): route stream-fallback delivery through chunked sender deliverPendingStreamFallback was calling chat.postMessage directly for err.pendingText, which bypasses the chunked reply path used everywhere else. For Slack Connect cases where appendSlackStream throws SlackStreamNotDeliveredError with a large pending buffer, the single raw post could fail (msg_too_long) and drop the unsent tail. Two changes: 1. deliverPendingStreamFallback now routes through deliverReplies so long pendingText is chunked by the normal sender and the fallback honors the configured replyToMode / identity. 2. The non-benign streaming-error branch in deliverWithStreaming now clears the session via markSlackStreamFallbackDelivered before falling back to deliverNormally. Without this, pendingText stays populated and the post-loop finalize (stopSlackStream → SlackStreamNotDeliveredError → fallback) re-posts the same chunk that deliverNormally already sent. Addresses the three Codex P1 findings on #70370 about bypassing the chunked sender, and the related "avoid reposting buffered text after append fallback" P1 about duplicate delivery. Tests updated to assert deliverReplies routing (instead of raw postMessage) and a new case covers the non-benign-error dedup. Follow-up to #70370. * fix(slack): preserve pending buffered text on non-benign stream errors Address Codex P1 on #71124: `markSlackStreamFallbackDelivered` was clearing `pendingText` before `deliverNormally` ran, so any earlier buffered chunk was lost. E.g. chunk A buffered in the SDK, then appending chunk B throws a generic network error → previous fix dropped A+B and only sent B via `deliverNormally`, silently truncating the final reply. Route the full buffered `pendingText` through `deliverPendingStreamFallback` with a synthetic `SlackStreamNotDeliveredError`, then skip `deliverNormally` entirely (pendingText already contains this payload's text, per `appendSlackStream` accumulating before throw). If the chunked fallback fails, fall back to `deliverNormally` so at least the current payload lands. Test updated to assert the full pendingText ("first buffered\nsecond payload") gets routed through the chunked sender, not the chunk-B-only partial send. * fix(slack): harden stream fallback docs and chunking test (#71124) --------- Co-authored-by: Peter Steinberger --- CHANGELOG.md | 1 + .../dispatch.preview-fallback.test.ts | 107 +++++++++++++++--- .../src/monitor/message-handler/dispatch.ts | 51 +++++++-- extensions/slack/src/send.blocks.test.ts | 17 +++ extensions/slack/src/streaming.ts | 4 +- 5 files changed, 152 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98d58d6a8b0..99db8700713 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -94,6 +94,7 @@ Docs: https://docs.openclaw.ai - Matrix/CLI: pass resolved runtime config into verify commands, so `openclaw matrix verify status` and sibling verify subcommands no longer crash before acquiring the Matrix client. Fixes #70992. (#71102) Thanks @luyao618. - Gateway/startup: await startup sidecars before channel monitors report ready, reducing Discord and plugin startup races while still keeping gateway boot observability intact. - Plugins/Google Meet: report required manual actions for Chrome joins, use browser automation for Meet entry, and persist the private-WS node opt-in so paired-node realtime sessions keep their intended network policy. +- Slack: route native stream fallback replies through the normal chunked sender so long buffered Slack Connect responses are not dropped or duplicated. (#71124) Thanks @martingarramon. ## 2026.4.23 diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index bb43f08c628..922327808e0 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -409,7 +409,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { expect(deliverRepliesMock).toHaveBeenCalledTimes(1); }); - it("posts pending native stream text when finalize fails before the SDK buffer flushes", async () => { + it("routes pending native stream text through chunked sender when finalize fails before the SDK buffer flushes", async () => { mockedNativeStreaming = true; const session = { channel: "C123", @@ -425,17 +425,18 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { await dispatchPreparedSlackMessage(createPreparedSlackMessage()); - expect(deliverRepliesMock).not.toHaveBeenCalled(); - expect(postMessageMock).toHaveBeenCalledTimes(1); - expect(postMessageMock).toHaveBeenCalledWith({ - channel: "C123", - thread_ts: THREAD_TS, - text: FINAL_REPLY_TEXT, - }); + expect(postMessageMock).not.toHaveBeenCalled(); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: FINAL_REPLY_TEXT })], + }), + ); expect(session.stopped).toBe(true); }); - it("posts all pending native stream text when an append flush fails", async () => { + it("routes all pending native stream text through chunked sender when an append flush fails", async () => { mockedNativeStreaming = true; mockedDispatchSequence = [ { kind: "block", payload: { text: "first buffered" } }, @@ -456,13 +457,87 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { await dispatchPreparedSlackMessage(createPreparedSlackMessage()); - expect(deliverRepliesMock).not.toHaveBeenCalled(); - expect(postMessageMock).toHaveBeenCalledTimes(1); - expect(postMessageMock).toHaveBeenCalledWith({ - channel: "C123", - thread_ts: THREAD_TS, - text: "first buffered\nsecond flushes", - }); + expect(postMessageMock).not.toHaveBeenCalled(); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: "first buffered\nsecond flushes" })], + }), + ); expect(stopSlackStreamMock).not.toHaveBeenCalled(); }); + + it("forwards oversized pending stream text to the chunked sender intact (chunking is the sender's responsibility)", async () => { + mockedNativeStreaming = true; + // SLACK_TEXT_LIMIT mocks to 4000; use > 1 message worth of content. + const oversized = "x".repeat(8500); + const session = { + channel: "C123", + threadTs: THREAD_TS, + stopped: false, + delivered: false, + pendingText: oversized, + }; + startSlackStreamMock.mockResolvedValueOnce(session); + stopSlackStreamMock.mockRejectedValueOnce( + new TestSlackStreamNotDeliveredError(oversized, "team_not_found"), + ); + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(postMessageMock).not.toHaveBeenCalled(); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replyThreadTs: THREAD_TS, + textLimit: 4000, + replies: [expect.objectContaining({ text: oversized })], + }), + ); + expect(session.stopped).toBe(true); + }); + + it("routes full pendingText (earlier buffered + failing chunk) through chunked sender on non-benign append failure", async () => { + mockedNativeStreaming = true; + mockedDispatchSequence = [ + { kind: "block", payload: { text: "first buffered" } }, + { kind: "final", payload: { text: "second payload" } }, + ]; + const session = { + channel: "C123", + threadTs: THREAD_TS, + stopped: false, + delivered: false, + pendingText: "first buffered", + }; + startSlackStreamMock.mockResolvedValueOnce(session); + // Non-benign error (plain Error, NOT SlackStreamNotDeliveredError). + // appendSlackStream mutates pendingText BEFORE throwing so the full + // buffer (earlier chunk + current chunk) must be preserved and routed + // through the chunked fallback - not dropped or partially re-sent. + appendSlackStreamMock.mockImplementationOnce(async () => { + session.pendingText += "\nsecond payload"; + throw new Error("network socket closed"); + }); + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + // Chunked fallback sent the FULL pendingText, not just the failing + // payload (so the earlier buffered chunk is not dropped). + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: "first buffered\nsecond payload" })], + }), + ); + // Session was marked fallback-delivered by deliverPendingStreamFallback, + // so finalize skips stopSlackStream. + expect(session.pendingText).toBe(""); + expect(session.stopped).toBe(true); + expect(stopSlackStreamMock).not.toHaveBeenCalled(); + // No raw postMessage path was invoked. + expect(postMessageMock).not.toHaveBeenCalled(); + }); }); diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 01bb953ae39..b2c967fc13f 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -436,31 +436,39 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag err: SlackStreamNotDeliveredError, ): Promise => { // The Slack SDK still owns this text in-memory; no streaming API call has - // acknowledged it. Send it once through normal chat.postMessage. + // acknowledged it. Route through deliverReplies so pendingText that + // exceeds Slack's per-message text limit still lands (a single + // chat.postMessage would have failed with msg_too_long), and so the + // fallback respects the configured replyToMode/identity the same way + // normal replies do. const fallbackText = err.pendingText.trim(); if (!fallbackText) { return false; } try { - // Rename-bind to dodge eslint-plugin-unicorn/require-post-message-target-origin - // which cannot distinguish Slack chat.postMessage from window.postMessage. - const postChatMessage = ctx.app.client.chat.postMessage.bind(ctx.app.client.chat); - await postChatMessage({ - channel: session.channel, - thread_ts: session.threadTs, - text: fallbackText, + await deliverReplies({ + cfg: ctx.cfg, + replies: [{ text: fallbackText } as ReplyPayload], + target: prepared.replyTarget, + token: ctx.botToken, + accountId: account.accountId, + runtime, + textLimit: ctx.textLimit, + replyThreadTs: session.threadTs, + replyToMode: prepared.replyToMode, + ...(slackIdentity ? { identity: slackIdentity } : {}), }); markSlackStreamFallbackDelivered(session); observedReplyDelivery = true; usedReplyThreadTs ??= session.threadTs; logVerbose( - `slack-stream: streamed delivery failed (${err.slackCode}); delivered ${fallbackText.length} chars via chat.postMessage fallback`, + `slack-stream: streamed delivery failed (${err.slackCode}); delivered ${fallbackText.length} chars via deliverReplies fallback`, ); return true; } catch (postErr) { runtime.error?.( danger( - `slack-stream: fallback chat.postMessage failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`, + `slack-stream: fallback deliverReplies failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`, ), ); return false; @@ -636,6 +644,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag danger(`slack-stream: streaming API call failed: ${formatErrorMessage(err)}, falling back`), ); streamFailed = true; + // Non-benign streaming errors leave `pendingText` populated with every + // buffered chunk since the last flush (appendSlackStream accumulates + // into pendingText BEFORE the SDK call, so the failing chunk is + // included too). Route the full buffer through the chunked fallback so + // earlier chunks aren't lost, then skip deliverNormally - pendingText + // already contains this payload's text. + if (streamSession && streamSession.pendingText) { + const bufferedFallbackErr = new SlackStreamNotDeliveredError( + streamSession.pendingText, + "unknown", + ); + const delivered = await deliverPendingStreamFallback(streamSession, bufferedFallbackErr); + if (delivered) { + replyPlan.markSent(); + deliveryTracker.markDelivered({ + kind: params.kind, + payload: params.payload, + threadTs: streamSession.threadTs, + textOverride: text, + }); + return; + } + } await deliverNormally({ payload: params.payload, kind: params.kind, diff --git a/extensions/slack/src/send.blocks.test.ts b/extensions/slack/src/send.blocks.test.ts index d95406ea13b..391388e1396 100644 --- a/extensions/slack/src/send.blocks.test.ts +++ b/extensions/slack/src/send.blocks.test.ts @@ -74,6 +74,23 @@ describe("sendMessageSlack chunking", () => { }), ); }); + + it("splits oversized fallback text through the normal Slack sender", async () => { + const client = createSlackSendTestClient(); + const message = "a".repeat(8500); + + await sendMessageSlack("channel:C123", message, { + token: "xoxb-test", + cfg: SLACK_TEST_CFG, + client, + }); + + const postedTexts = client.chat.postMessage.mock.calls.map((call) => call[0].text); + + expect(postedTexts).toHaveLength(2); + expect(postedTexts.every((text) => typeof text === "string" && text.length <= 8000)).toBe(true); + expect(postedTexts.join("")).toBe(message); + }); }); describe("sendMessageSlack blocks", () => { diff --git a/extensions/slack/src/streaming.ts b/extensions/slack/src/streaming.ts index 44014a3785e..4a0d422cc63 100644 --- a/extensions/slack/src/streaming.ts +++ b/extensions/slack/src/streaming.ts @@ -74,7 +74,7 @@ export type StopSlackStreamParams = { * Thrown when Slack rejects a stream flush/finalize with a recipient-resolution * error (see {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) while text is still * only buffered locally by the Slack SDK. Carries the pending text so the - * caller can deliver it via a normal `chat.postMessage`. + * caller can deliver it via the normal Slack reply path. */ export class SlackStreamNotDeliveredError extends Error { readonly pendingText: string; @@ -205,7 +205,7 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis * If the same benign error fires while text is still only buffered locally * (e.g. short replies that never exceeded the SDK's buffer_size), this * function throws a {@link SlackStreamNotDeliveredError} carrying that pending - * text so the caller can deliver it via `chat.postMessage`. + * text so the caller can deliver it through the normal Slack reply path. * * All other errors propagate unchanged. */