fix(slack): fall back to chat.postMessage when stream finalize fails pre-flush

Address adversarial review finding on #70295: the prior swallow-on-benign
fix silently dropped short replies to Slack Connect users. The SDK's
ChatStreamer buffers text locally until buffer_size (256 default), so
short replies never trigger chat.startStream via append(). streamer.stop()
then issues startStream internally; on Slack Connect recipients this
throws user_not_found. With the prior fix that error was swallowed and
the dispatcher marked the turn delivered - user saw 'done' reaction but
no message.

SlackStreamSession now tracks delivered (true once any Slack API call
returned a response) and pendingText (accumulation of every append +
final-stop text). stopSlackStream:
  - swallows the benign code when delivered=true (prior append flushed;
    text is visible; same behavior as before)
  - throws a new SlackStreamNotDeliveredError carrying pendingText when
    delivered=false (nothing reached Slack)

dispatch.ts catches SlackStreamNotDeliveredError and posts pendingText
via a rename-bound chat.postMessage (to dodge the unicorn lint rule),
and flips streamFallbackDelivered so anyReplyDelivered stays correct.

Fixes #70295
This commit is contained in:
Matt Van Horn
2026-04-22 16:53:23 -07:00
committed by Peter Steinberger
parent 676ed34cbd
commit e55b932632
3 changed files with 252 additions and 60 deletions

View File

@@ -37,7 +37,12 @@ import {
resolveSlackStreamingConfig,
} from "../../stream-mode.js";
import type { SlackStreamSession } from "../../streaming.js";
import { appendSlackStream, startSlackStream, stopSlackStream } from "../../streaming.js";
import {
appendSlackStream,
SlackStreamNotDeliveredError,
startSlackStream,
stopSlackStream,
} from "../../streaming.js";
import { resolveSlackThreadTargets } from "../../threading.js";
import { normalizeSlackAllowOwnerEntry } from "../allow-list.js";
import { resolveStorePath, updateLastRoute } from "../config.runtime.js";
@@ -862,17 +867,50 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
// -----------------------------------------------------------------------
// Finalize the stream if one was started
// -----------------------------------------------------------------------
let streamFallbackDelivered = false;
const finalStream = streamSession as SlackStreamSession | null;
if (finalStream && !finalStream.stopped) {
try {
await stopSlackStream({ session: finalStream });
} catch (err) {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${formatErrorMessage(err)}`));
if (err instanceof SlackStreamNotDeliveredError) {
// Slack rejected the stream before any text reached the recipient
// (common for short replies to Slack Connect users - the SDK buffers
// under 256 chars and the internal chat.startStream inside stop()
// is the first call to Slack). Fall back to a plain chat.postMessage
// so the reply is not lost.
try {
// Rename-bind to dodge eslint-plugin-unicorn/require-post-message-target-origin
// which cannot distinguish Slack chat.postMessage from window.postMessage.
const postChatMessage = ctx.app.client.chat.postMessage.bind(ctx.app.client.chat);
await postChatMessage({
channel: finalStream.channel,
thread_ts: finalStream.threadTs,
text: err.pendingText,
});
streamFallbackDelivered = true;
logVerbose(
`slack-stream: streamed finalize failed (${err.slackCode}); delivered ${err.pendingText.length} chars via chat.postMessage fallback`,
);
} catch (postErr) {
runtime.error?.(
danger(
`slack-stream: fallback chat.postMessage failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`,
),
);
}
} else {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${formatErrorMessage(err)}`));
}
}
}
const anyReplyDelivered =
observedReplyDelivery || queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0;
observedReplyDelivery ||
queuedFinal ||
streamFallbackDelivered ||
(counts.block ?? 0) > 0 ||
(counts.final ?? 0) > 0;
if (statusReactionsEnabled) {
if (dispatchError) {

View File

@@ -1,16 +1,28 @@
import type { ChatStreamer } from "@slack/web-api/dist/chat-stream.js";
import { describe, expect, it, vi } from "vitest";
import { stopSlackStream, type SlackStreamSession } from "./streaming.js";
import {
appendSlackStream,
extractSlackErrorCode,
isBenignSlackFinalizeError,
SlackStreamNotDeliveredError,
stopSlackStream,
type SlackStreamSession,
} from "./streaming.js";
function makeSession(stopImpl: () => Promise<void>): SlackStreamSession {
type AppendImpl = () => Promise<unknown>;
type StopImpl = () => Promise<void>;
function makeSession(params: { appendImpl?: AppendImpl; stopImpl?: StopImpl }): SlackStreamSession {
return {
streamer: {
append: vi.fn(async () => {}),
stop: vi.fn(stopImpl),
append: vi.fn(params.appendImpl ?? (async () => null)),
stop: vi.fn(params.stopImpl ?? (async () => {})),
} as unknown as ChatStreamer,
channel: "C123",
threadTs: "1700000000.000100",
stopped: false,
delivered: false,
pendingText: "",
};
}
@@ -21,42 +33,84 @@ function slackApiError(code: string): Error {
}
describe("stopSlackStream finalize error handling", () => {
it("swallows user_not_found (Slack Connect DMs) and marks the session stopped", async () => {
const session = makeSession(async () => {
throw slackApiError("user_not_found");
it("swallows user_not_found after prior append flushed (delivered=true)", async () => {
const session = makeSession({
appendImpl: async () => ({ ts: "1700000000.100200" }), // non-null => flushed
stopImpl: async () => {
throw slackApiError("user_not_found");
},
});
await appendSlackStream({ session, text: "some text that Slack saw" });
expect(session.delivered).toBe(true);
await expect(stopSlackStream({ session })).resolves.toBeUndefined();
expect(session.stopped).toBe(true);
});
it("swallows team_not_found (Slack Connect cross-workspace) and marks stopped", async () => {
const session = makeSession(async () => {
throw slackApiError("team_not_found");
it("throws SlackStreamNotDeliveredError when user_not_found fires before any flush", async () => {
const session = makeSession({
appendImpl: async () => null, // null => buffered, never hit Slack
stopImpl: async () => {
throw slackApiError("user_not_found");
},
});
await appendSlackStream({ session, text: "short reply under buffer size" });
expect(session.delivered).toBe(false);
const thrown = await stopSlackStream({ session }).catch((err: unknown) => err);
expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError);
expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe("user_not_found");
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe(
"short reply under buffer size",
);
expect(session.stopped).toBe(true);
});
it("throws SlackStreamNotDeliveredError carrying stop()'s final text too", async () => {
const session = makeSession({
appendImpl: async () => null,
stopImpl: async () => {
throw slackApiError("team_not_found");
},
});
await appendSlackStream({ session, text: "hello " });
const thrown = await stopSlackStream({ session, text: "world" }).catch((err: unknown) => err);
expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError);
expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe("team_not_found");
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("hello world");
});
it("swallows missing_recipient_user_id when delivered", async () => {
const session = makeSession({
appendImpl: async () => ({ ts: "1700000000.100201" }),
stopImpl: async () => {
throw slackApiError("missing_recipient_user_id");
},
});
await appendSlackStream({ session, text: "chars" });
await expect(stopSlackStream({ session })).resolves.toBeUndefined();
expect(session.stopped).toBe(true);
});
it("swallows missing_recipient_user_id (DM closed mid-stream) and marks stopped", async () => {
const session = makeSession(async () => {
throw slackApiError("missing_recipient_user_id");
});
await expect(stopSlackStream({ session })).resolves.toBeUndefined();
expect(session.stopped).toBe(true);
});
it("re-throws unexpected Slack API errors so callers can log them", async () => {
const session = makeSession(async () => {
throw slackApiError("not_authed");
it("re-throws unexpected Slack API errors even when delivered", async () => {
const session = makeSession({
appendImpl: async () => ({ ts: "1700000000.100202" }),
stopImpl: async () => {
throw slackApiError("not_authed");
},
});
await appendSlackStream({ session, text: "some text" });
await expect(stopSlackStream({ session })).rejects.toThrow(/not_authed/);
// Session is still marked stopped so retries do not re-enter streamer.stop.
expect(session.stopped).toBe(true);
});
it("re-throws non-Slack-shaped errors unchanged", async () => {
const session = makeSession(async () => {
throw new Error("socket reset");
const session = makeSession({
stopImpl: async () => {
throw new Error("socket reset");
},
});
await expect(stopSlackStream({ session })).rejects.toThrow(/socket reset/);
expect(session.stopped).toBe(true);
@@ -65,12 +119,59 @@ describe("stopSlackStream finalize error handling", () => {
it("returns a no-op on an already-stopped session", async () => {
const stop = vi.fn(async () => {});
const session: SlackStreamSession = {
streamer: { append: vi.fn(async () => {}), stop } as unknown as ChatStreamer,
streamer: { append: vi.fn(async () => null), stop } as unknown as ChatStreamer,
channel: "C123",
threadTs: "1700000000.000100",
stopped: true,
delivered: false,
pendingText: "",
};
await expect(stopSlackStream({ session })).resolves.toBeUndefined();
expect(stop).not.toHaveBeenCalled();
});
it("marks delivered=true on successful stop() without prior flush", async () => {
const session = makeSession({
appendImpl: async () => null,
stopImpl: async () => {},
});
await appendSlackStream({ session, text: "short" });
expect(session.delivered).toBe(false);
await stopSlackStream({ session });
expect(session.delivered).toBe(true);
});
});
describe("error classification", () => {
it("isBenignSlackFinalizeError matches each allowlisted code", () => {
for (const code of ["user_not_found", "team_not_found", "missing_recipient_user_id"]) {
expect(isBenignSlackFinalizeError(slackApiError(code))).toBe(true);
}
});
it("isBenignSlackFinalizeError rejects non-listed codes", () => {
for (const code of ["not_authed", "ratelimited", "channel_not_found"]) {
expect(isBenignSlackFinalizeError(slackApiError(code))).toBe(false);
}
});
it("extractSlackErrorCode handles data.error, message fallback, and junk shapes", () => {
// Canonical SDK shape
expect(extractSlackErrorCode(slackApiError("user_not_found"))).toBe("user_not_found");
// message-regex fallback when data is absent
expect(extractSlackErrorCode(new Error("An API error occurred: rate_limited"))).toBe(
"rate_limited",
);
// data.error not a string - falls through to message parse
const wrongShape = new Error("plain message");
(wrongShape as unknown as { data: unknown }).data = { error: 42 };
expect(extractSlackErrorCode(wrongShape)).toBeUndefined();
// data.error null - falls through
(wrongShape as unknown as { data: unknown }).data = null;
expect(extractSlackErrorCode(wrongShape)).toBeUndefined();
// Non-object error
expect(extractSlackErrorCode("raw string")).toBeUndefined();
expect(extractSlackErrorCode(null)).toBeUndefined();
expect(extractSlackErrorCode(undefined)).toBeUndefined();
});
});

View File

@@ -28,6 +28,19 @@ export type SlackStreamSession = {
threadTs: string;
/** True once stop() has been called. */
stopped: boolean;
/**
* True once any Slack API call (startStream / appendStream) has succeeded.
* The SDK buffers appended text locally until the buffer exceeds
* `buffer_size` (default 256 chars); only then does it issue a network
* call. Until `delivered` flips, nothing has actually reached Slack.
*/
delivered: boolean;
/**
* Concatenation of every `text` passed to the session. Used by the
* caller to fall back to a normal `chat.postMessage` when finalize fails
* before any append flushed the buffer.
*/
pendingText: string;
};
export type StartSlackStreamParams = {
@@ -61,6 +74,27 @@ export type StopSlackStreamParams = {
text?: string;
};
/**
* Thrown by {@link stopSlackStream} when Slack's `chat.stopStream` rejects
* with a recipient-resolution error (see
* {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) and no prior `append` had
* flushed the buffer, so no text ever reached Slack. Carries the pending
* text so the caller can deliver it via a normal `chat.postMessage`.
*/
export class SlackStreamNotDeliveredError extends Error {
readonly pendingText: string;
readonly slackCode: string;
constructor(pendingText: string, slackCode: string) {
super(
`slack-stream: finalize failed with ${slackCode} before any text reached Slack ` +
`(${pendingText.length} chars pending)`,
);
this.name = "SlackStreamNotDeliveredError";
this.pendingText = pendingText;
this.slackCode = slackCode;
}
}
// ---------------------------------------------------------------------------
// Stream lifecycle
// ---------------------------------------------------------------------------
@@ -94,13 +128,22 @@ export async function startSlackStream(
channel,
threadTs,
stopped: false,
delivered: false,
pendingText: "",
};
// If initial text is provided, send it as the first append which will
// trigger the ChatStreamer to call chat.startStream under the hood.
if (text) {
await streamer.append({ markdown_text: text });
logVerbose(`slack-stream: appended initial text (${text.length} chars)`);
session.pendingText += text;
// `append` returns the Slack response when it actually hits the network,
// null when the buffer is still under `buffer_size` (see chat-stream.js).
// Flip `delivered` only when Slack acknowledged.
const result = await streamer.append({ markdown_text: text });
if (result) {
session.delivered = true;
}
logVerbose(
`slack-stream: appended initial text (${text.length} chars, ${result ? "flushed" : "buffered"})`,
);
}
return session;
@@ -121,8 +164,12 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
return;
}
await session.streamer.append({ markdown_text: text });
logVerbose(`slack-stream: appended ${text.length} chars`);
session.pendingText += text;
const result = await session.streamer.append({ markdown_text: text });
if (result) {
session.delivered = true;
}
logVerbose(`slack-stream: appended ${text.length} chars (${result ? "flushed" : "buffered"})`);
}
/**
@@ -132,10 +179,16 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
* Optionally include final text to append before stopping.
*
* If Slack's `chat.stopStream` responds with a known benign finalize error
* (e.g. `user_not_found` for Slack Connect recipients - see issue #70295),
* any text already delivered via `append()` stays visible and the session
* is marked stopped. Other Slack API errors still propagate so the caller
* can record them.
* (see {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) AND any prior `append`
* has already landed on Slack, the error is swallowed and the session is
* marked stopped - the already-delivered text stays visible.
*
* If the same benign error fires before any append flushed (e.g. short
* replies that never exceeded the SDK's buffer_size), this function throws
* a {@link SlackStreamNotDeliveredError} carrying the pending text so the
* caller can deliver it via `chat.postMessage`.
*
* All other errors propagate unchanged.
*/
export async function stopSlackStream(params: StopSlackStreamParams): Promise<void> {
const { session, text } = params;
@@ -146,6 +199,9 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise<vo
}
session.stopped = true;
if (text) {
session.pendingText += text;
}
logVerbose(
`slack-stream: stopping stream in ${session.channel} thread=${session.threadTs}${
@@ -155,13 +211,20 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise<vo
try {
await session.streamer.stop(text ? { markdown_text: text } : undefined);
session.delivered = true;
} catch (err) {
if (isBenignSlackFinalizeError(err)) {
logVerbose(
`slack-stream: finalize rejected by Slack (${formatSlackError(err)}); ` +
"appended text remains visible, treating stream as stopped",
);
return;
const code = extractSlackErrorCode(err) ?? "unknown";
if (session.delivered) {
logVerbose(
`slack-stream: finalize rejected by Slack (${code}); prior appends delivered, treating stream as stopped`,
);
return;
}
// No append ever flushed; the ChatStreamer's stop() runs chat.startStream
// internally and that call failed. Surface the pending text so the
// caller can post a normal message via chat.postMessage.
throw new SlackStreamNotDeliveredError(session.pendingText, code);
}
throw err;
}
@@ -174,11 +237,12 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise<vo
// ---------------------------------------------------------------------------
/**
* Slack API error codes that indicate `chat.stopStream` cannot finalize the
* stream for the current recipient/team, but any `chat.appendStream` calls
* that already landed are still visible to the user. Treat these as benign
* at the dispatch layer so the reply is not reported as an error when text
* did get through.
* Slack API error codes that indicate `chat.stopStream` (or the
* `chat.startStream` call the SDK issues inside `stop()` when the buffer
* never flushed) cannot finalize the stream for the current recipient or
* team. Either the caller falls back to a normal message (see
* {@link SlackStreamNotDeliveredError}) or, if prior appends already
* delivered text, the error is logged verbosely and swallowed.
*/
const BENIGN_SLACK_FINALIZE_ERROR_CODES = new Set<string>([
// Slack Connect recipients: finalize fails because the external user id
@@ -190,12 +254,12 @@ const BENIGN_SLACK_FINALIZE_ERROR_CODES = new Set<string>([
"missing_recipient_user_id",
]);
function isBenignSlackFinalizeError(err: unknown): boolean {
export function isBenignSlackFinalizeError(err: unknown): boolean {
const code = extractSlackErrorCode(err);
return code !== undefined && BENIGN_SLACK_FINALIZE_ERROR_CODES.has(code);
}
function extractSlackErrorCode(err: unknown): string | undefined {
export function extractSlackErrorCode(err: unknown): string | undefined {
if (!err || typeof err !== "object") {
return undefined;
}
@@ -212,14 +276,3 @@ function extractSlackErrorCode(err: unknown): string | undefined {
const match = message.match(/An API error occurred:\s*([a-z_][a-z0-9_]*)/i);
return match?.[1];
}
function formatSlackError(err: unknown): string {
const code = extractSlackErrorCode(err);
if (code) {
return code;
}
if (err instanceof Error) {
return err.message;
}
return String(err);
}