fix(slack): route stream-fallback delivery through chunked sender (follow-up to #70370) (#71124)

* fix(slack): route stream-fallback delivery through chunked sender

deliverPendingStreamFallback was calling chat.postMessage directly for
err.pendingText, which bypasses the chunked reply path used everywhere
else. For Slack Connect cases where appendSlackStream throws
SlackStreamNotDeliveredError with a large pending buffer, the single
raw post could fail (msg_too_long) and drop the unsent tail.

Two changes:

1. deliverPendingStreamFallback now routes through deliverReplies so
   long pendingText is chunked by the normal sender and the fallback
   honors the configured replyToMode / identity.

2. The non-benign streaming-error branch in deliverWithStreaming now
   clears the session via markSlackStreamFallbackDelivered before
   falling back to deliverNormally. Without this, pendingText stays
   populated and the post-loop finalize (stopSlackStream →
   SlackStreamNotDeliveredError → fallback) re-posts the same chunk
   that deliverNormally already sent.

Addresses the three Codex P1 findings on #70370 about bypassing the
chunked sender, and the related "avoid reposting buffered text after
append fallback" P1 about duplicate delivery. Tests updated to assert
deliverReplies routing (instead of raw postMessage) and a new case
covers the non-benign-error dedup.

Follow-up to #70370.

* fix(slack): preserve pending buffered text on non-benign stream errors

Address Codex P1 on #71124: `markSlackStreamFallbackDelivered` was
clearing `pendingText` before `deliverNormally` ran, so any earlier
buffered chunk was lost. E.g. chunk A buffered in the SDK, then
appending chunk B throws a generic network error → previous fix
dropped A+B and only sent B via `deliverNormally`, silently truncating
the final reply.

Route the full buffered `pendingText` through
`deliverPendingStreamFallback` with a synthetic
`SlackStreamNotDeliveredError`, then skip `deliverNormally` entirely
(pendingText already contains this payload's text, per
`appendSlackStream` accumulating before throw). If the chunked
fallback fails, fall back to `deliverNormally` so at least the current
payload lands.

Test updated to assert the full pendingText ("first buffered\nsecond
payload") gets routed through the chunked sender, not the
chunk-B-only partial send.

* fix(slack): harden stream fallback docs and chunking test (#71124)

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
martingarramon
2026-04-24 18:50:18 -03:00
committed by GitHub
parent b0c9810b0f
commit 150053bc86
5 changed files with 152 additions and 28 deletions

View File

@@ -94,6 +94,7 @@ Docs: https://docs.openclaw.ai
- Matrix/CLI: pass resolved runtime config into verify commands, so `openclaw matrix verify status` and sibling verify subcommands no longer crash before acquiring the Matrix client. Fixes #70992. (#71102) Thanks @luyao618.
- Gateway/startup: await startup sidecars before channel monitors report ready, reducing Discord and plugin startup races while still keeping gateway boot observability intact.
- Plugins/Google Meet: report required manual actions for Chrome joins, use browser automation for Meet entry, and persist the private-WS node opt-in so paired-node realtime sessions keep their intended network policy.
- Slack: route native stream fallback replies through the normal chunked sender so long buffered Slack Connect responses are not dropped or duplicated. (#71124) Thanks @martingarramon.
## 2026.4.23

View File

@@ -409,7 +409,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
});
it("posts pending native stream text when finalize fails before the SDK buffer flushes", async () => {
it("routes pending native stream text through chunked sender when finalize fails before the SDK buffer flushes", async () => {
mockedNativeStreaming = true;
const session = {
channel: "C123",
@@ -425,17 +425,18 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(deliverRepliesMock).not.toHaveBeenCalled();
expect(postMessageMock).toHaveBeenCalledTimes(1);
expect(postMessageMock).toHaveBeenCalledWith({
channel: "C123",
thread_ts: THREAD_TS,
text: FINAL_REPLY_TEXT,
});
expect(postMessageMock).not.toHaveBeenCalled();
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
expect(deliverRepliesMock).toHaveBeenCalledWith(
expect.objectContaining({
replyThreadTs: THREAD_TS,
replies: [expect.objectContaining({ text: FINAL_REPLY_TEXT })],
}),
);
expect(session.stopped).toBe(true);
});
it("posts all pending native stream text when an append flush fails", async () => {
it("routes all pending native stream text through chunked sender when an append flush fails", async () => {
mockedNativeStreaming = true;
mockedDispatchSequence = [
{ kind: "block", payload: { text: "first buffered" } },
@@ -456,13 +457,87 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(deliverRepliesMock).not.toHaveBeenCalled();
expect(postMessageMock).toHaveBeenCalledTimes(1);
expect(postMessageMock).toHaveBeenCalledWith({
channel: "C123",
thread_ts: THREAD_TS,
text: "first buffered\nsecond flushes",
});
expect(postMessageMock).not.toHaveBeenCalled();
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
expect(deliverRepliesMock).toHaveBeenCalledWith(
expect.objectContaining({
replyThreadTs: THREAD_TS,
replies: [expect.objectContaining({ text: "first buffered\nsecond flushes" })],
}),
);
expect(stopSlackStreamMock).not.toHaveBeenCalled();
});
it("forwards oversized pending stream text to the chunked sender intact (chunking is the sender's responsibility)", async () => {
mockedNativeStreaming = true;
// SLACK_TEXT_LIMIT mocks to 4000; use > 1 message worth of content.
const oversized = "x".repeat(8500);
const session = {
channel: "C123",
threadTs: THREAD_TS,
stopped: false,
delivered: false,
pendingText: oversized,
};
startSlackStreamMock.mockResolvedValueOnce(session);
stopSlackStreamMock.mockRejectedValueOnce(
new TestSlackStreamNotDeliveredError(oversized, "team_not_found"),
);
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(postMessageMock).not.toHaveBeenCalled();
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
expect(deliverRepliesMock).toHaveBeenCalledWith(
expect.objectContaining({
replyThreadTs: THREAD_TS,
textLimit: 4000,
replies: [expect.objectContaining({ text: oversized })],
}),
);
expect(session.stopped).toBe(true);
});
it("routes full pendingText (earlier buffered + failing chunk) through chunked sender on non-benign append failure", async () => {
mockedNativeStreaming = true;
mockedDispatchSequence = [
{ kind: "block", payload: { text: "first buffered" } },
{ kind: "final", payload: { text: "second payload" } },
];
const session = {
channel: "C123",
threadTs: THREAD_TS,
stopped: false,
delivered: false,
pendingText: "first buffered",
};
startSlackStreamMock.mockResolvedValueOnce(session);
// Non-benign error (plain Error, NOT SlackStreamNotDeliveredError).
// appendSlackStream mutates pendingText BEFORE throwing so the full
// buffer (earlier chunk + current chunk) must be preserved and routed
// through the chunked fallback - not dropped or partially re-sent.
appendSlackStreamMock.mockImplementationOnce(async () => {
session.pendingText += "\nsecond payload";
throw new Error("network socket closed");
});
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
// Chunked fallback sent the FULL pendingText, not just the failing
// payload (so the earlier buffered chunk is not dropped).
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
expect(deliverRepliesMock).toHaveBeenCalledWith(
expect.objectContaining({
replyThreadTs: THREAD_TS,
replies: [expect.objectContaining({ text: "first buffered\nsecond payload" })],
}),
);
// Session was marked fallback-delivered by deliverPendingStreamFallback,
// so finalize skips stopSlackStream.
expect(session.pendingText).toBe("");
expect(session.stopped).toBe(true);
expect(stopSlackStreamMock).not.toHaveBeenCalled();
// No raw postMessage path was invoked.
expect(postMessageMock).not.toHaveBeenCalled();
});
});

View File

@@ -436,31 +436,39 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
err: SlackStreamNotDeliveredError,
): Promise<boolean> => {
// The Slack SDK still owns this text in-memory; no streaming API call has
// acknowledged it. Send it once through normal chat.postMessage.
// acknowledged it. Route through deliverReplies so pendingText that
// exceeds Slack's per-message text limit still lands (a single
// chat.postMessage would have failed with msg_too_long), and so the
// fallback respects the configured replyToMode/identity the same way
// normal replies do.
const fallbackText = err.pendingText.trim();
if (!fallbackText) {
return false;
}
try {
// Rename-bind to dodge eslint-plugin-unicorn/require-post-message-target-origin
// which cannot distinguish Slack chat.postMessage from window.postMessage.
const postChatMessage = ctx.app.client.chat.postMessage.bind(ctx.app.client.chat);
await postChatMessage({
channel: session.channel,
thread_ts: session.threadTs,
text: fallbackText,
await deliverReplies({
cfg: ctx.cfg,
replies: [{ text: fallbackText } as ReplyPayload],
target: prepared.replyTarget,
token: ctx.botToken,
accountId: account.accountId,
runtime,
textLimit: ctx.textLimit,
replyThreadTs: session.threadTs,
replyToMode: prepared.replyToMode,
...(slackIdentity ? { identity: slackIdentity } : {}),
});
markSlackStreamFallbackDelivered(session);
observedReplyDelivery = true;
usedReplyThreadTs ??= session.threadTs;
logVerbose(
`slack-stream: streamed delivery failed (${err.slackCode}); delivered ${fallbackText.length} chars via chat.postMessage fallback`,
`slack-stream: streamed delivery failed (${err.slackCode}); delivered ${fallbackText.length} chars via deliverReplies fallback`,
);
return true;
} catch (postErr) {
runtime.error?.(
danger(
`slack-stream: fallback chat.postMessage failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`,
`slack-stream: fallback deliverReplies failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`,
),
);
return false;
@@ -636,6 +644,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
danger(`slack-stream: streaming API call failed: ${formatErrorMessage(err)}, falling back`),
);
streamFailed = true;
// Non-benign streaming errors leave `pendingText` populated with every
// buffered chunk since the last flush (appendSlackStream accumulates
// into pendingText BEFORE the SDK call, so the failing chunk is
// included too). Route the full buffer through the chunked fallback so
// earlier chunks aren't lost, then skip deliverNormally - pendingText
// already contains this payload's text.
if (streamSession && streamSession.pendingText) {
const bufferedFallbackErr = new SlackStreamNotDeliveredError(
streamSession.pendingText,
"unknown",
);
const delivered = await deliverPendingStreamFallback(streamSession, bufferedFallbackErr);
if (delivered) {
replyPlan.markSent();
deliveryTracker.markDelivered({
kind: params.kind,
payload: params.payload,
threadTs: streamSession.threadTs,
textOverride: text,
});
return;
}
}
await deliverNormally({
payload: params.payload,
kind: params.kind,

View File

@@ -74,6 +74,23 @@ describe("sendMessageSlack chunking", () => {
}),
);
});
it("splits oversized fallback text through the normal Slack sender", async () => {
const client = createSlackSendTestClient();
const message = "a".repeat(8500);
await sendMessageSlack("channel:C123", message, {
token: "xoxb-test",
cfg: SLACK_TEST_CFG,
client,
});
const postedTexts = client.chat.postMessage.mock.calls.map((call) => call[0].text);
expect(postedTexts).toHaveLength(2);
expect(postedTexts.every((text) => typeof text === "string" && text.length <= 8000)).toBe(true);
expect(postedTexts.join("")).toBe(message);
});
});
describe("sendMessageSlack blocks", () => {

View File

@@ -74,7 +74,7 @@ export type StopSlackStreamParams = {
* Thrown when Slack rejects a stream flush/finalize with a recipient-resolution
* error (see {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) while text is still
* only buffered locally by the Slack SDK. Carries the pending text so the
* caller can deliver it via a normal `chat.postMessage`.
* caller can deliver it via the normal Slack reply path.
*/
export class SlackStreamNotDeliveredError extends Error {
readonly pendingText: string;
@@ -205,7 +205,7 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
* If the same benign error fires while text is still only buffered locally
* (e.g. short replies that never exceeded the SDK's buffer_size), this
* function throws a {@link SlackStreamNotDeliveredError} carrying that pending
* text so the caller can deliver it via `chat.postMessage`.
* text so the caller can deliver it through the normal Slack reply path.
*
* All other errors propagate unchanged.
*/