diff --git a/CHANGELOG.md b/CHANGELOG.md index 98d58d6a8b0..99db8700713 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -94,6 +94,7 @@ Docs: https://docs.openclaw.ai - Matrix/CLI: pass resolved runtime config into verify commands, so `openclaw matrix verify status` and sibling verify subcommands no longer crash before acquiring the Matrix client. Fixes #70992. (#71102) Thanks @luyao618. - Gateway/startup: await startup sidecars before channel monitors report ready, reducing Discord and plugin startup races while still keeping gateway boot observability intact. - Plugins/Google Meet: report required manual actions for Chrome joins, use browser automation for Meet entry, and persist the private-WS node opt-in so paired-node realtime sessions keep their intended network policy. +- Slack: route native stream fallback replies through the normal chunked sender so long buffered Slack Connect responses are not dropped or duplicated. (#71124) Thanks @martingarramon. ## 2026.4.23 diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index bb43f08c628..922327808e0 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -409,7 +409,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { expect(deliverRepliesMock).toHaveBeenCalledTimes(1); }); - it("posts pending native stream text when finalize fails before the SDK buffer flushes", async () => { + it("routes pending native stream text through chunked sender when finalize fails before the SDK buffer flushes", async () => { mockedNativeStreaming = true; const session = { channel: "C123", @@ -425,17 +425,18 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { await dispatchPreparedSlackMessage(createPreparedSlackMessage()); - expect(deliverRepliesMock).not.toHaveBeenCalled(); - expect(postMessageMock).toHaveBeenCalledTimes(1); - expect(postMessageMock).toHaveBeenCalledWith({ - channel: "C123", - thread_ts: THREAD_TS, - text: FINAL_REPLY_TEXT, - }); + expect(postMessageMock).not.toHaveBeenCalled(); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: FINAL_REPLY_TEXT })], + }), + ); expect(session.stopped).toBe(true); }); - it("posts all pending native stream text when an append flush fails", async () => { + it("routes all pending native stream text through chunked sender when an append flush fails", async () => { mockedNativeStreaming = true; mockedDispatchSequence = [ { kind: "block", payload: { text: "first buffered" } }, @@ -456,13 +457,87 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { await dispatchPreparedSlackMessage(createPreparedSlackMessage()); - expect(deliverRepliesMock).not.toHaveBeenCalled(); - expect(postMessageMock).toHaveBeenCalledTimes(1); - expect(postMessageMock).toHaveBeenCalledWith({ - channel: "C123", - thread_ts: THREAD_TS, - text: "first buffered\nsecond flushes", - }); + expect(postMessageMock).not.toHaveBeenCalled(); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: "first buffered\nsecond flushes" })], + }), + ); expect(stopSlackStreamMock).not.toHaveBeenCalled(); }); + + it("forwards oversized pending stream text to the chunked sender intact (chunking is the sender's responsibility)", async () => { + mockedNativeStreaming = true; + // SLACK_TEXT_LIMIT mocks to 4000; use > 1 message worth of content. + const oversized = "x".repeat(8500); + const session = { + channel: "C123", + threadTs: THREAD_TS, + stopped: false, + delivered: false, + pendingText: oversized, + }; + startSlackStreamMock.mockResolvedValueOnce(session); + stopSlackStreamMock.mockRejectedValueOnce( + new TestSlackStreamNotDeliveredError(oversized, "team_not_found"), + ); + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(postMessageMock).not.toHaveBeenCalled(); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replyThreadTs: THREAD_TS, + textLimit: 4000, + replies: [expect.objectContaining({ text: oversized })], + }), + ); + expect(session.stopped).toBe(true); + }); + + it("routes full pendingText (earlier buffered + failing chunk) through chunked sender on non-benign append failure", async () => { + mockedNativeStreaming = true; + mockedDispatchSequence = [ + { kind: "block", payload: { text: "first buffered" } }, + { kind: "final", payload: { text: "second payload" } }, + ]; + const session = { + channel: "C123", + threadTs: THREAD_TS, + stopped: false, + delivered: false, + pendingText: "first buffered", + }; + startSlackStreamMock.mockResolvedValueOnce(session); + // Non-benign error (plain Error, NOT SlackStreamNotDeliveredError). + // appendSlackStream mutates pendingText BEFORE throwing so the full + // buffer (earlier chunk + current chunk) must be preserved and routed + // through the chunked fallback - not dropped or partially re-sent. + appendSlackStreamMock.mockImplementationOnce(async () => { + session.pendingText += "\nsecond payload"; + throw new Error("network socket closed"); + }); + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + // Chunked fallback sent the FULL pendingText, not just the failing + // payload (so the earlier buffered chunk is not dropped). + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: "first buffered\nsecond payload" })], + }), + ); + // Session was marked fallback-delivered by deliverPendingStreamFallback, + // so finalize skips stopSlackStream. + expect(session.pendingText).toBe(""); + expect(session.stopped).toBe(true); + expect(stopSlackStreamMock).not.toHaveBeenCalled(); + // No raw postMessage path was invoked. + expect(postMessageMock).not.toHaveBeenCalled(); + }); }); diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 01bb953ae39..b2c967fc13f 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -436,31 +436,39 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag err: SlackStreamNotDeliveredError, ): Promise => { // The Slack SDK still owns this text in-memory; no streaming API call has - // acknowledged it. Send it once through normal chat.postMessage. + // acknowledged it. Route through deliverReplies so pendingText that + // exceeds Slack's per-message text limit still lands (a single + // chat.postMessage would have failed with msg_too_long), and so the + // fallback respects the configured replyToMode/identity the same way + // normal replies do. const fallbackText = err.pendingText.trim(); if (!fallbackText) { return false; } try { - // Rename-bind to dodge eslint-plugin-unicorn/require-post-message-target-origin - // which cannot distinguish Slack chat.postMessage from window.postMessage. - const postChatMessage = ctx.app.client.chat.postMessage.bind(ctx.app.client.chat); - await postChatMessage({ - channel: session.channel, - thread_ts: session.threadTs, - text: fallbackText, + await deliverReplies({ + cfg: ctx.cfg, + replies: [{ text: fallbackText } as ReplyPayload], + target: prepared.replyTarget, + token: ctx.botToken, + accountId: account.accountId, + runtime, + textLimit: ctx.textLimit, + replyThreadTs: session.threadTs, + replyToMode: prepared.replyToMode, + ...(slackIdentity ? { identity: slackIdentity } : {}), }); markSlackStreamFallbackDelivered(session); observedReplyDelivery = true; usedReplyThreadTs ??= session.threadTs; logVerbose( - `slack-stream: streamed delivery failed (${err.slackCode}); delivered ${fallbackText.length} chars via chat.postMessage fallback`, + `slack-stream: streamed delivery failed (${err.slackCode}); delivered ${fallbackText.length} chars via deliverReplies fallback`, ); return true; } catch (postErr) { runtime.error?.( danger( - `slack-stream: fallback chat.postMessage failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`, + `slack-stream: fallback deliverReplies failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`, ), ); return false; @@ -636,6 +644,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag danger(`slack-stream: streaming API call failed: ${formatErrorMessage(err)}, falling back`), ); streamFailed = true; + // Non-benign streaming errors leave `pendingText` populated with every + // buffered chunk since the last flush (appendSlackStream accumulates + // into pendingText BEFORE the SDK call, so the failing chunk is + // included too). Route the full buffer through the chunked fallback so + // earlier chunks aren't lost, then skip deliverNormally - pendingText + // already contains this payload's text. + if (streamSession && streamSession.pendingText) { + const bufferedFallbackErr = new SlackStreamNotDeliveredError( + streamSession.pendingText, + "unknown", + ); + const delivered = await deliverPendingStreamFallback(streamSession, bufferedFallbackErr); + if (delivered) { + replyPlan.markSent(); + deliveryTracker.markDelivered({ + kind: params.kind, + payload: params.payload, + threadTs: streamSession.threadTs, + textOverride: text, + }); + return; + } + } await deliverNormally({ payload: params.payload, kind: params.kind, diff --git a/extensions/slack/src/send.blocks.test.ts b/extensions/slack/src/send.blocks.test.ts index d95406ea13b..391388e1396 100644 --- a/extensions/slack/src/send.blocks.test.ts +++ b/extensions/slack/src/send.blocks.test.ts @@ -74,6 +74,23 @@ describe("sendMessageSlack chunking", () => { }), ); }); + + it("splits oversized fallback text through the normal Slack sender", async () => { + const client = createSlackSendTestClient(); + const message = "a".repeat(8500); + + await sendMessageSlack("channel:C123", message, { + token: "xoxb-test", + cfg: SLACK_TEST_CFG, + client, + }); + + const postedTexts = client.chat.postMessage.mock.calls.map((call) => call[0].text); + + expect(postedTexts).toHaveLength(2); + expect(postedTexts.every((text) => typeof text === "string" && text.length <= 8000)).toBe(true); + expect(postedTexts.join("")).toBe(message); + }); }); describe("sendMessageSlack blocks", () => { diff --git a/extensions/slack/src/streaming.ts b/extensions/slack/src/streaming.ts index 44014a3785e..4a0d422cc63 100644 --- a/extensions/slack/src/streaming.ts +++ b/extensions/slack/src/streaming.ts @@ -74,7 +74,7 @@ export type StopSlackStreamParams = { * Thrown when Slack rejects a stream flush/finalize with a recipient-resolution * error (see {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) while text is still * only buffered locally by the Slack SDK. Carries the pending text so the - * caller can deliver it via a normal `chat.postMessage`. + * caller can deliver it via the normal Slack reply path. */ export class SlackStreamNotDeliveredError extends Error { readonly pendingText: string; @@ -205,7 +205,7 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis * If the same benign error fires while text is still only buffered locally * (e.g. short replies that never exceeded the SDK's buffer_size), this * function throws a {@link SlackStreamNotDeliveredError} carrying that pending - * text so the caller can deliver it via `chat.postMessage`. + * text so the caller can deliver it through the normal Slack reply path. * * All other errors propagate unchanged. */