fix: harden Slack stream fallback delivery (#70370) (thanks @mvanhorn)

This commit is contained in:
Peter Steinberger
2026-04-23 01:55:18 +01:00
parent e55b932632
commit 2e90a2247e
5 changed files with 333 additions and 66 deletions

View File

@@ -7,6 +7,27 @@ const SAME_TEXT = "same reply";
const createSlackDraftStreamMock = vi.fn();
const deliverRepliesMock = vi.fn(async () => {});
const finalizeSlackPreviewEditMock = vi.fn(async () => {});
const postMessageMock = vi.fn(async () => ({ ok: true, ts: "171234.999" }));
const appendSlackStreamMock = vi.fn(async () => {});
const startSlackStreamMock = vi.fn(async () => ({
channel: "C123",
threadTs: THREAD_TS,
stopped: false,
delivered: true,
pendingText: "",
}));
const stopSlackStreamMock = vi.fn(async () => {});
class TestSlackStreamNotDeliveredError extends Error {
readonly pendingText: string;
readonly slackCode: string;
constructor(pendingText: string, slackCode: string) {
super(`slack-stream not delivered: ${slackCode}`);
this.name = "SlackStreamNotDeliveredError";
this.pendingText = pendingText;
this.slackCode = slackCode;
}
}
let mockedNativeStreaming = false;
let mockedDispatchSequence: Array<{
kind: "tool" | "block" | "final";
payload: { text: string; isError?: boolean; mediaUrl?: string; mediaUrls?: string[] };
@@ -35,7 +56,7 @@ function createPreparedSlackMessage() {
cfg: {},
runtime: {},
botToken: "xoxb-test",
app: { client: {} },
app: { client: { chat: { postMessage: postMessageMock } } },
teamId: "T1",
textLimit: 4000,
typingReaction: "",
@@ -109,7 +130,7 @@ vi.mock("openclaw/plugin-sdk/channel-reply-pipeline", () => ({
vi.mock("openclaw/plugin-sdk/channel-streaming", () => ({
resolveChannelStreamingBlockEnabled: () => false,
resolveChannelStreamingNativeTransport: () => false,
resolveChannelStreamingNativeTransport: () => mockedNativeStreaming,
resolveChannelStreamingPreviewToolProgress: () => true,
}));
@@ -183,18 +204,28 @@ vi.mock("../../stream-mode.js", () => ({
buildStatusFinalPreviewText: () => "status",
resolveSlackStreamingConfig: () => ({
mode: "partial",
nativeStreaming: false,
nativeStreaming: mockedNativeStreaming,
draftMode: "append",
}),
}));
vi.mock("../../streaming.js", () => ({
appendSlackStream: async () => {},
startSlackStream: async () => ({
threadTs: THREAD_TS,
stopped: false,
}),
stopSlackStream: async () => {},
appendSlackStream: appendSlackStreamMock,
markSlackStreamFallbackDelivered: (session: {
delivered: boolean;
pendingText: string;
stopped: boolean;
}) => {
const hadNativeDelivery = session.delivered;
session.delivered = true;
session.pendingText = "";
if (!hadNativeDelivery) {
session.stopped = true;
}
},
SlackStreamNotDeliveredError: TestSlackStreamNotDeliveredError,
startSlackStream: startSlackStreamMock,
stopSlackStream: stopSlackStreamMock,
}));
vi.mock("../../threading.js", () => ({
@@ -269,10 +300,24 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
createSlackDraftStreamMock.mockReset();
deliverRepliesMock.mockReset();
finalizeSlackPreviewEditMock.mockReset();
postMessageMock.mockClear();
appendSlackStreamMock.mockReset();
startSlackStreamMock.mockReset();
stopSlackStreamMock.mockReset();
mockedNativeStreaming = false;
mockedDispatchSequence = [{ kind: "final", payload: { text: FINAL_REPLY_TEXT } }];
createSlackDraftStreamMock.mockReturnValue(createDraftStreamStub());
finalizeSlackPreviewEditMock.mockRejectedValue(new Error("socket closed"));
startSlackStreamMock.mockResolvedValue({
channel: "C123",
threadTs: THREAD_TS,
stopped: false,
delivered: true,
pendingText: "",
});
appendSlackStreamMock.mockResolvedValue(undefined);
stopSlackStreamMock.mockResolvedValue(undefined);
});
it("falls back to normal delivery when preview finalize fails", async () => {
@@ -363,4 +408,61 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
expect(finalizeSlackPreviewEditMock).not.toHaveBeenCalled();
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
});
it("posts pending native stream text when finalize fails before the SDK buffer flushes", async () => {
mockedNativeStreaming = true;
const session = {
channel: "C123",
threadTs: THREAD_TS,
stopped: false,
delivered: false,
pendingText: FINAL_REPLY_TEXT,
};
startSlackStreamMock.mockResolvedValueOnce(session);
stopSlackStreamMock.mockRejectedValueOnce(
new TestSlackStreamNotDeliveredError(FINAL_REPLY_TEXT, "user_not_found"),
);
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(deliverRepliesMock).not.toHaveBeenCalled();
expect(postMessageMock).toHaveBeenCalledTimes(1);
expect(postMessageMock).toHaveBeenCalledWith({
channel: "C123",
thread_ts: THREAD_TS,
text: FINAL_REPLY_TEXT,
});
expect(session.stopped).toBe(true);
});
it("posts all pending native stream text when an append flush fails", async () => {
mockedNativeStreaming = true;
mockedDispatchSequence = [
{ kind: "block", payload: { text: "first buffered" } },
{ kind: "final", payload: { text: "second flushes" } },
];
const session = {
channel: "C123",
threadTs: THREAD_TS,
stopped: false,
delivered: false,
pendingText: "first buffered",
};
startSlackStreamMock.mockResolvedValueOnce(session);
appendSlackStreamMock.mockImplementationOnce(async () => {
session.pendingText += "\nsecond flushes";
throw new TestSlackStreamNotDeliveredError(session.pendingText, "user_not_found");
});
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(deliverRepliesMock).not.toHaveBeenCalled();
expect(postMessageMock).toHaveBeenCalledTimes(1);
expect(postMessageMock).toHaveBeenCalledWith({
channel: "C123",
thread_ts: THREAD_TS,
text: "first buffered\nsecond flushes",
});
expect(stopSlackStreamMock).not.toHaveBeenCalled();
});
});

View File

@@ -39,6 +39,7 @@ import {
import type { SlackStreamSession } from "../../streaming.js";
import {
appendSlackStream,
markSlackStreamFallbackDelivered,
SlackStreamNotDeliveredError,
startSlackStream,
stopSlackStream,
@@ -430,6 +431,41 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
let usedReplyThreadTs: string | undefined;
let observedReplyDelivery = false;
const deliveryTracker = createSlackTurnDeliveryTracker();
const deliverPendingStreamFallback = async (
session: SlackStreamSession,
err: SlackStreamNotDeliveredError,
): Promise<boolean> => {
// The Slack SDK still owns this text in-memory; no streaming API call has
// acknowledged it. Send it once through normal chat.postMessage.
const fallbackText = err.pendingText.trim();
if (!fallbackText) {
return false;
}
try {
// Rename-bind to dodge eslint-plugin-unicorn/require-post-message-target-origin
// which cannot distinguish Slack chat.postMessage from window.postMessage.
const postChatMessage = ctx.app.client.chat.postMessage.bind(ctx.app.client.chat);
await postChatMessage({
channel: session.channel,
thread_ts: session.threadTs,
text: fallbackText,
});
markSlackStreamFallbackDelivered(session);
observedReplyDelivery = true;
usedReplyThreadTs ??= session.threadTs;
logVerbose(
`slack-stream: streamed delivery failed (${err.slackCode}); delivered ${fallbackText.length} chars via chat.postMessage fallback`,
);
return true;
} catch (postErr) {
runtime.error?.(
danger(
`slack-stream: fallback chat.postMessage failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`,
),
);
return false;
}
};
const deliverNormally = async (params: {
payload: ReplyPayload;
@@ -530,7 +566,11 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
}),
userId: message.user,
});
observedReplyDelivery = true;
// startSlackStream may only buffer locally. Count delivery only after
// the SDK reports a real Slack response.
if (streamSession.delivered) {
observedReplyDelivery = true;
}
usedReplyThreadTs ??= streamThreadTs;
replyPlan.markSent();
deliveryTracker.markDelivered({
@@ -557,6 +597,11 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
session: streamSession,
text: "\n" + text,
});
// appendSlackStream also buffers locally below the SDK threshold; avoid
// optimistic "done" status until Slack acknowledges a flush.
if (streamSession.delivered) {
observedReplyDelivery = true;
}
deliveryTracker.markDelivered({
kind: params.kind,
payload: params.payload,
@@ -564,6 +609,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
textOverride: text,
});
} catch (err) {
if (err instanceof SlackStreamNotDeliveredError) {
streamFailed = true;
if (streamSession) {
const delivered = await deliverPendingStreamFallback(streamSession, err);
if (delivered) {
replyPlan.markSent();
deliveryTracker.markDelivered({
kind: params.kind,
payload: params.payload,
threadTs: streamSession.threadTs,
textOverride: text,
});
return;
}
throw err;
}
await deliverNormally({
payload: params.payload,
kind: params.kind,
forcedThreadTs: plannedThreadTs,
});
return;
}
runtime.error?.(
danger(`slack-stream: streaming API call failed: ${formatErrorMessage(err)}, falling back`),
);
@@ -874,31 +942,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
await stopSlackStream({ session: finalStream });
} catch (err) {
if (err instanceof SlackStreamNotDeliveredError) {
// Slack rejected the stream before any text reached the recipient
// (common for short replies to Slack Connect users - the SDK buffers
// under 256 chars and the internal chat.startStream inside stop()
// is the first call to Slack). Fall back to a plain chat.postMessage
// so the reply is not lost.
try {
// Rename-bind to dodge eslint-plugin-unicorn/require-post-message-target-origin
// which cannot distinguish Slack chat.postMessage from window.postMessage.
const postChatMessage = ctx.app.client.chat.postMessage.bind(ctx.app.client.chat);
await postChatMessage({
channel: finalStream.channel,
thread_ts: finalStream.threadTs,
text: err.pendingText,
});
streamFallbackDelivered = true;
logVerbose(
`slack-stream: streamed finalize failed (${err.slackCode}); delivered ${err.pendingText.length} chars via chat.postMessage fallback`,
);
} catch (postErr) {
runtime.error?.(
danger(
`slack-stream: fallback chat.postMessage failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`,
),
);
}
streamFallbackDelivered = await deliverPendingStreamFallback(finalStream, err);
} else {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${formatErrorMessage(err)}`));
}

View File

@@ -4,7 +4,9 @@ import {
appendSlackStream,
extractSlackErrorCode,
isBenignSlackFinalizeError,
markSlackStreamFallbackDelivered,
SlackStreamNotDeliveredError,
startSlackStream,
stopSlackStream,
type SlackStreamSession,
} from "./streaming.js";
@@ -81,6 +83,55 @@ describe("stopSlackStream finalize error handling", () => {
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("hello world");
});
it("clears pendingText after an append flush is acknowledged by Slack", async () => {
const session = makeSession({
appendImpl: async () => ({ ts: "1700000000.100203" }),
});
await appendSlackStream({ session, text: "flushed text" });
expect(session.delivered).toBe(true);
expect(session.pendingText).toBe("");
});
it("throws SlackStreamNotDeliveredError with buffered text when append flush fails", async () => {
const session = makeSession({
appendImpl: vi
.fn()
.mockResolvedValueOnce(null)
.mockRejectedValueOnce(slackApiError("user_not_found")),
});
await appendSlackStream({ session, text: "first buffered" });
const thrown = await appendSlackStream({ session, text: "\nsecond flushes" }).catch(
(err: unknown) => err,
);
expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError);
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe(
"first buffered\nsecond flushes",
);
});
it("falls back only still-pending tail text after a prior flush succeeded", async () => {
const session = makeSession({
appendImpl: vi
.fn()
.mockResolvedValueOnce({ ts: "1700000000.100204" })
.mockResolvedValue(null),
stopImpl: async () => {
throw slackApiError("team_not_found");
},
});
await appendSlackStream({ session, text: "already visible" });
await appendSlackStream({ session, text: "\npending tail" });
const thrown = await stopSlackStream({ session }).catch((err: unknown) => err);
expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError);
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("\npending tail");
});
it("swallows missing_recipient_user_id when delivered", async () => {
const session = makeSession({
appendImpl: async () => ({ ts: "1700000000.100201" }),
@@ -139,6 +190,45 @@ describe("stopSlackStream finalize error handling", () => {
expect(session.delivered).toBe(false);
await stopSlackStream({ session });
expect(session.delivered).toBe(true);
expect(session.pendingText).toBe("");
});
it("converts a start-time flush rejection into a pending-text fallback error", async () => {
const client = {
chatStream: () => ({
append: async () => {
throw slackApiError("user_not_found");
},
stop: async () => {},
}),
};
const thrown = await startSlackStream({
client: client as never,
channel: "C123",
threadTs: "1700000000.000100",
text: "initial chunk that flushes immediately",
}).catch((err: unknown) => err);
expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError);
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe(
"initial chunk that flushes immediately",
);
});
it("marks fallback-delivered sessions stopped only when no native stream exists", () => {
const neverDelivered = makeSession({});
markSlackStreamFallbackDelivered(neverDelivered);
expect(neverDelivered.delivered).toBe(true);
expect(neverDelivered.pendingText).toBe("");
expect(neverDelivered.stopped).toBe(true);
const alreadyDelivered = makeSession({});
alreadyDelivered.delivered = true;
markSlackStreamFallbackDelivered(alreadyDelivered);
expect(alreadyDelivered.delivered).toBe(true);
expect(alreadyDelivered.pendingText).toBe("");
expect(alreadyDelivered.stopped).toBe(false);
});
});

View File

@@ -35,11 +35,7 @@ export type SlackStreamSession = {
* call. Until `delivered` flips, nothing has actually reached Slack.
*/
delivered: boolean;
/**
* Concatenation of every `text` passed to the session. Used by the
* caller to fall back to a normal `chat.postMessage` when finalize fails
* before any append flushed the buffer.
*/
/** Text accepted by the SDK but not yet acknowledged by Slack. */
pendingText: string;
};
@@ -75,11 +71,10 @@ export type StopSlackStreamParams = {
};
/**
* Thrown by {@link stopSlackStream} when Slack's `chat.stopStream` rejects
* with a recipient-resolution error (see
* {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) and no prior `append` had
* flushed the buffer, so no text ever reached Slack. Carries the pending
* text so the caller can deliver it via a normal `chat.postMessage`.
* Thrown when Slack rejects a stream flush/finalize with a recipient-resolution
* error (see {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) while text is still
* only buffered locally by the Slack SDK. Carries the pending text so the
* caller can deliver it via a normal `chat.postMessage`.
*/
export class SlackStreamNotDeliveredError extends Error {
readonly pendingText: string;
@@ -134,16 +129,27 @@ export async function startSlackStream(
if (text) {
session.pendingText += text;
// `append` returns the Slack response when it actually hits the network,
// null when the buffer is still under `buffer_size` (see chat-stream.js).
// Flip `delivered` only when Slack acknowledged.
const result = await streamer.append({ markdown_text: text });
if (result) {
session.delivered = true;
// Slack SDK ChatStreamer keeps short markdown_text chunks in a local buffer
// and returns null until buffer_size is reached. Only a non-null response
// means Slack acknowledged startStream/appendStream.
try {
const result = await streamer.append({ markdown_text: text });
if (result) {
session.delivered = true;
session.pendingText = "";
}
logVerbose(
`slack-stream: appended initial text (${text.length} chars, ${result ? "flushed" : "buffered"})`,
);
} catch (err) {
if (isBenignSlackFinalizeError(err) && session.pendingText) {
throw new SlackStreamNotDeliveredError(
session.pendingText,
extractSlackErrorCode(err) ?? "unknown",
);
}
throw err;
}
logVerbose(
`slack-stream: appended initial text (${text.length} chars, ${result ? "flushed" : "buffered"})`,
);
}
return session;
@@ -165,11 +171,24 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
}
session.pendingText += text;
const result = await session.streamer.append({ markdown_text: text });
if (result) {
session.delivered = true;
try {
// Same SDK contract as startSlackStream: null means local-only buffer,
// non-null means Slack accepted the pending buffer and it is visible.
const result = await session.streamer.append({ markdown_text: text });
if (result) {
session.delivered = true;
session.pendingText = "";
}
logVerbose(`slack-stream: appended ${text.length} chars (${result ? "flushed" : "buffered"})`);
} catch (err) {
if (isBenignSlackFinalizeError(err) && session.pendingText) {
throw new SlackStreamNotDeliveredError(
session.pendingText,
extractSlackErrorCode(err) ?? "unknown",
);
}
throw err;
}
logVerbose(`slack-stream: appended ${text.length} chars (${result ? "flushed" : "buffered"})`);
}
/**
@@ -183,10 +202,10 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
* has already landed on Slack, the error is swallowed and the session is
* marked stopped - the already-delivered text stays visible.
*
* If the same benign error fires before any append flushed (e.g. short
* replies that never exceeded the SDK's buffer_size), this function throws
* a {@link SlackStreamNotDeliveredError} carrying the pending text so the
* caller can deliver it via `chat.postMessage`.
* If the same benign error fires while text is still only buffered locally
* (e.g. short replies that never exceeded the SDK's buffer_size), this
* function throws a {@link SlackStreamNotDeliveredError} carrying that pending
* text so the caller can deliver it via `chat.postMessage`.
*
* All other errors propagate unchanged.
*/
@@ -212,19 +231,21 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise<vo
try {
await session.streamer.stop(text ? { markdown_text: text } : undefined);
session.delivered = true;
session.pendingText = "";
} catch (err) {
if (isBenignSlackFinalizeError(err)) {
const code = extractSlackErrorCode(err) ?? "unknown";
if (session.pendingText) {
// stop() can be the first network call for short replies. If Slack
// Connect rejects it, the user has not seen the SDK-buffered text yet.
throw new SlackStreamNotDeliveredError(session.pendingText, code);
}
if (session.delivered) {
logVerbose(
`slack-stream: finalize rejected by Slack (${code}); prior appends delivered, treating stream as stopped`,
);
return;
}
// No append ever flushed; the ChatStreamer's stop() runs chat.startStream
// internally and that call failed. Surface the pending text so the
// caller can post a normal message via chat.postMessage.
throw new SlackStreamNotDeliveredError(session.pendingText, code);
}
throw err;
}
@@ -276,3 +297,12 @@ export function extractSlackErrorCode(err: unknown): string | undefined {
const match = message.match(/An API error occurred:\s*([a-z_][a-z0-9_]*)/i);
return match?.[1];
}
export function markSlackStreamFallbackDelivered(session: SlackStreamSession): void {
const hadNativeDelivery = session.delivered;
session.pendingText = "";
session.delivered = true;
if (!hadNativeDelivery) {
session.stopped = true;
}
}