From e55b932632127c3af05909375a47d5feb727f11c Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:53:23 -0700 Subject: [PATCH] fix(slack): fall back to chat.postMessage when stream finalize fails pre-flush Address adversarial review finding on #70295: the prior swallow-on-benign fix silently dropped short replies to Slack Connect users. The SDK's ChatStreamer buffers text locally until buffer_size (256 default), so short replies never trigger chat.startStream via append(). streamer.stop() then issues startStream internally; on Slack Connect recipients this throws user_not_found. With the prior fix that error was swallowed and the dispatcher marked the turn delivered - user saw 'done' reaction but no message. SlackStreamSession now tracks delivered (true once any Slack API call returned a response) and pendingText (accumulation of every append + final-stop text). stopSlackStream: - swallows the benign code when delivered=true (prior append flushed; text is visible; same behavior as before) - throws a new SlackStreamNotDeliveredError carrying pendingText when delivered=false (nothing reached Slack) dispatch.ts catches SlackStreamNotDeliveredError and posts pendingText via a rename-bound chat.postMessage (to dodge the unicorn lint rule), and flips streamFallbackDelivered so anyReplyDelivered stays correct. Fixes #70295 --- .../src/monitor/message-handler/dispatch.ts | 44 +++++- extensions/slack/src/streaming.test.ts | 149 +++++++++++++++--- extensions/slack/src/streaming.ts | 119 ++++++++++---- 3 files changed, 252 insertions(+), 60 deletions(-) diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 715e2bd7b4c..2980a8f9820 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -37,7 +37,12 @@ import { resolveSlackStreamingConfig, } from "../../stream-mode.js"; import type { SlackStreamSession } from "../../streaming.js"; -import { appendSlackStream, startSlackStream, stopSlackStream } from "../../streaming.js"; +import { + appendSlackStream, + SlackStreamNotDeliveredError, + startSlackStream, + stopSlackStream, +} from "../../streaming.js"; import { resolveSlackThreadTargets } from "../../threading.js"; import { normalizeSlackAllowOwnerEntry } from "../allow-list.js"; import { resolveStorePath, updateLastRoute } from "../config.runtime.js"; @@ -862,17 +867,50 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag // ----------------------------------------------------------------------- // Finalize the stream if one was started // ----------------------------------------------------------------------- + let streamFallbackDelivered = false; const finalStream = streamSession as SlackStreamSession | null; if (finalStream && !finalStream.stopped) { try { await stopSlackStream({ session: finalStream }); } catch (err) { - runtime.error?.(danger(`slack-stream: failed to stop stream: ${formatErrorMessage(err)}`)); + if (err instanceof SlackStreamNotDeliveredError) { + // Slack rejected the stream before any text reached the recipient + // (common for short replies to Slack Connect users - the SDK buffers + // under 256 chars and the internal chat.startStream inside stop() + // is the first call to Slack). Fall back to a plain chat.postMessage + // so the reply is not lost. + try { + // Rename-bind to dodge eslint-plugin-unicorn/require-post-message-target-origin + // which cannot distinguish Slack chat.postMessage from window.postMessage. + const postChatMessage = ctx.app.client.chat.postMessage.bind(ctx.app.client.chat); + await postChatMessage({ + channel: finalStream.channel, + thread_ts: finalStream.threadTs, + text: err.pendingText, + }); + streamFallbackDelivered = true; + logVerbose( + `slack-stream: streamed finalize failed (${err.slackCode}); delivered ${err.pendingText.length} chars via chat.postMessage fallback`, + ); + } catch (postErr) { + runtime.error?.( + danger( + `slack-stream: fallback chat.postMessage failed after ${err.slackCode}: ${formatErrorMessage(postErr)}`, + ), + ); + } + } else { + runtime.error?.(danger(`slack-stream: failed to stop stream: ${formatErrorMessage(err)}`)); + } } } const anyReplyDelivered = - observedReplyDelivery || queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0; + observedReplyDelivery || + queuedFinal || + streamFallbackDelivered || + (counts.block ?? 0) > 0 || + (counts.final ?? 0) > 0; if (statusReactionsEnabled) { if (dispatchError) { diff --git a/extensions/slack/src/streaming.test.ts b/extensions/slack/src/streaming.test.ts index 179c75b5877..efd27ce4761 100644 --- a/extensions/slack/src/streaming.test.ts +++ b/extensions/slack/src/streaming.test.ts @@ -1,16 +1,28 @@ import type { ChatStreamer } from "@slack/web-api/dist/chat-stream.js"; import { describe, expect, it, vi } from "vitest"; -import { stopSlackStream, type SlackStreamSession } from "./streaming.js"; +import { + appendSlackStream, + extractSlackErrorCode, + isBenignSlackFinalizeError, + SlackStreamNotDeliveredError, + stopSlackStream, + type SlackStreamSession, +} from "./streaming.js"; -function makeSession(stopImpl: () => Promise): SlackStreamSession { +type AppendImpl = () => Promise; +type StopImpl = () => Promise; + +function makeSession(params: { appendImpl?: AppendImpl; stopImpl?: StopImpl }): SlackStreamSession { return { streamer: { - append: vi.fn(async () => {}), - stop: vi.fn(stopImpl), + append: vi.fn(params.appendImpl ?? (async () => null)), + stop: vi.fn(params.stopImpl ?? (async () => {})), } as unknown as ChatStreamer, channel: "C123", threadTs: "1700000000.000100", stopped: false, + delivered: false, + pendingText: "", }; } @@ -21,42 +33,84 @@ function slackApiError(code: string): Error { } describe("stopSlackStream finalize error handling", () => { - it("swallows user_not_found (Slack Connect DMs) and marks the session stopped", async () => { - const session = makeSession(async () => { - throw slackApiError("user_not_found"); + it("swallows user_not_found after prior append flushed (delivered=true)", async () => { + const session = makeSession({ + appendImpl: async () => ({ ts: "1700000000.100200" }), // non-null => flushed + stopImpl: async () => { + throw slackApiError("user_not_found"); + }, }); + await appendSlackStream({ session, text: "some text that Slack saw" }); + expect(session.delivered).toBe(true); + await expect(stopSlackStream({ session })).resolves.toBeUndefined(); expect(session.stopped).toBe(true); }); - it("swallows team_not_found (Slack Connect cross-workspace) and marks stopped", async () => { - const session = makeSession(async () => { - throw slackApiError("team_not_found"); + it("throws SlackStreamNotDeliveredError when user_not_found fires before any flush", async () => { + const session = makeSession({ + appendImpl: async () => null, // null => buffered, never hit Slack + stopImpl: async () => { + throw slackApiError("user_not_found"); + }, }); + await appendSlackStream({ session, text: "short reply under buffer size" }); + expect(session.delivered).toBe(false); + + const thrown = await stopSlackStream({ session }).catch((err: unknown) => err); + expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError); + expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe("user_not_found"); + expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe( + "short reply under buffer size", + ); + expect(session.stopped).toBe(true); + }); + + it("throws SlackStreamNotDeliveredError carrying stop()'s final text too", async () => { + const session = makeSession({ + appendImpl: async () => null, + stopImpl: async () => { + throw slackApiError("team_not_found"); + }, + }); + await appendSlackStream({ session, text: "hello " }); + + const thrown = await stopSlackStream({ session, text: "world" }).catch((err: unknown) => err); + expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError); + expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe("team_not_found"); + expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("hello world"); + }); + + it("swallows missing_recipient_user_id when delivered", async () => { + const session = makeSession({ + appendImpl: async () => ({ ts: "1700000000.100201" }), + stopImpl: async () => { + throw slackApiError("missing_recipient_user_id"); + }, + }); + await appendSlackStream({ session, text: "chars" }); await expect(stopSlackStream({ session })).resolves.toBeUndefined(); expect(session.stopped).toBe(true); }); - it("swallows missing_recipient_user_id (DM closed mid-stream) and marks stopped", async () => { - const session = makeSession(async () => { - throw slackApiError("missing_recipient_user_id"); - }); - await expect(stopSlackStream({ session })).resolves.toBeUndefined(); - expect(session.stopped).toBe(true); - }); - - it("re-throws unexpected Slack API errors so callers can log them", async () => { - const session = makeSession(async () => { - throw slackApiError("not_authed"); + it("re-throws unexpected Slack API errors even when delivered", async () => { + const session = makeSession({ + appendImpl: async () => ({ ts: "1700000000.100202" }), + stopImpl: async () => { + throw slackApiError("not_authed"); + }, }); + await appendSlackStream({ session, text: "some text" }); await expect(stopSlackStream({ session })).rejects.toThrow(/not_authed/); // Session is still marked stopped so retries do not re-enter streamer.stop. expect(session.stopped).toBe(true); }); it("re-throws non-Slack-shaped errors unchanged", async () => { - const session = makeSession(async () => { - throw new Error("socket reset"); + const session = makeSession({ + stopImpl: async () => { + throw new Error("socket reset"); + }, }); await expect(stopSlackStream({ session })).rejects.toThrow(/socket reset/); expect(session.stopped).toBe(true); @@ -65,12 +119,59 @@ describe("stopSlackStream finalize error handling", () => { it("returns a no-op on an already-stopped session", async () => { const stop = vi.fn(async () => {}); const session: SlackStreamSession = { - streamer: { append: vi.fn(async () => {}), stop } as unknown as ChatStreamer, + streamer: { append: vi.fn(async () => null), stop } as unknown as ChatStreamer, channel: "C123", threadTs: "1700000000.000100", stopped: true, + delivered: false, + pendingText: "", }; await expect(stopSlackStream({ session })).resolves.toBeUndefined(); expect(stop).not.toHaveBeenCalled(); }); + + it("marks delivered=true on successful stop() without prior flush", async () => { + const session = makeSession({ + appendImpl: async () => null, + stopImpl: async () => {}, + }); + await appendSlackStream({ session, text: "short" }); + expect(session.delivered).toBe(false); + await stopSlackStream({ session }); + expect(session.delivered).toBe(true); + }); +}); + +describe("error classification", () => { + it("isBenignSlackFinalizeError matches each allowlisted code", () => { + for (const code of ["user_not_found", "team_not_found", "missing_recipient_user_id"]) { + expect(isBenignSlackFinalizeError(slackApiError(code))).toBe(true); + } + }); + + it("isBenignSlackFinalizeError rejects non-listed codes", () => { + for (const code of ["not_authed", "ratelimited", "channel_not_found"]) { + expect(isBenignSlackFinalizeError(slackApiError(code))).toBe(false); + } + }); + + it("extractSlackErrorCode handles data.error, message fallback, and junk shapes", () => { + // Canonical SDK shape + expect(extractSlackErrorCode(slackApiError("user_not_found"))).toBe("user_not_found"); + // message-regex fallback when data is absent + expect(extractSlackErrorCode(new Error("An API error occurred: rate_limited"))).toBe( + "rate_limited", + ); + // data.error not a string - falls through to message parse + const wrongShape = new Error("plain message"); + (wrongShape as unknown as { data: unknown }).data = { error: 42 }; + expect(extractSlackErrorCode(wrongShape)).toBeUndefined(); + // data.error null - falls through + (wrongShape as unknown as { data: unknown }).data = null; + expect(extractSlackErrorCode(wrongShape)).toBeUndefined(); + // Non-object error + expect(extractSlackErrorCode("raw string")).toBeUndefined(); + expect(extractSlackErrorCode(null)).toBeUndefined(); + expect(extractSlackErrorCode(undefined)).toBeUndefined(); + }); }); diff --git a/extensions/slack/src/streaming.ts b/extensions/slack/src/streaming.ts index 1b2b478cb9f..9aab4c649c4 100644 --- a/extensions/slack/src/streaming.ts +++ b/extensions/slack/src/streaming.ts @@ -28,6 +28,19 @@ export type SlackStreamSession = { threadTs: string; /** True once stop() has been called. */ stopped: boolean; + /** + * True once any Slack API call (startStream / appendStream) has succeeded. + * The SDK buffers appended text locally until the buffer exceeds + * `buffer_size` (default 256 chars); only then does it issue a network + * call. Until `delivered` flips, nothing has actually reached Slack. + */ + delivered: boolean; + /** + * Concatenation of every `text` passed to the session. Used by the + * caller to fall back to a normal `chat.postMessage` when finalize fails + * before any append flushed the buffer. + */ + pendingText: string; }; export type StartSlackStreamParams = { @@ -61,6 +74,27 @@ export type StopSlackStreamParams = { text?: string; }; +/** + * Thrown by {@link stopSlackStream} when Slack's `chat.stopStream` rejects + * with a recipient-resolution error (see + * {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) and no prior `append` had + * flushed the buffer, so no text ever reached Slack. Carries the pending + * text so the caller can deliver it via a normal `chat.postMessage`. + */ +export class SlackStreamNotDeliveredError extends Error { + readonly pendingText: string; + readonly slackCode: string; + constructor(pendingText: string, slackCode: string) { + super( + `slack-stream: finalize failed with ${slackCode} before any text reached Slack ` + + `(${pendingText.length} chars pending)`, + ); + this.name = "SlackStreamNotDeliveredError"; + this.pendingText = pendingText; + this.slackCode = slackCode; + } +} + // --------------------------------------------------------------------------- // Stream lifecycle // --------------------------------------------------------------------------- @@ -94,13 +128,22 @@ export async function startSlackStream( channel, threadTs, stopped: false, + delivered: false, + pendingText: "", }; - // If initial text is provided, send it as the first append which will - // trigger the ChatStreamer to call chat.startStream under the hood. if (text) { - await streamer.append({ markdown_text: text }); - logVerbose(`slack-stream: appended initial text (${text.length} chars)`); + session.pendingText += text; + // `append` returns the Slack response when it actually hits the network, + // null when the buffer is still under `buffer_size` (see chat-stream.js). + // Flip `delivered` only when Slack acknowledged. + const result = await streamer.append({ markdown_text: text }); + if (result) { + session.delivered = true; + } + logVerbose( + `slack-stream: appended initial text (${text.length} chars, ${result ? "flushed" : "buffered"})`, + ); } return session; @@ -121,8 +164,12 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis return; } - await session.streamer.append({ markdown_text: text }); - logVerbose(`slack-stream: appended ${text.length} chars`); + session.pendingText += text; + const result = await session.streamer.append({ markdown_text: text }); + if (result) { + session.delivered = true; + } + logVerbose(`slack-stream: appended ${text.length} chars (${result ? "flushed" : "buffered"})`); } /** @@ -132,10 +179,16 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis * Optionally include final text to append before stopping. * * If Slack's `chat.stopStream` responds with a known benign finalize error - * (e.g. `user_not_found` for Slack Connect recipients - see issue #70295), - * any text already delivered via `append()` stays visible and the session - * is marked stopped. Other Slack API errors still propagate so the caller - * can record them. + * (see {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) AND any prior `append` + * has already landed on Slack, the error is swallowed and the session is + * marked stopped - the already-delivered text stays visible. + * + * If the same benign error fires before any append flushed (e.g. short + * replies that never exceeded the SDK's buffer_size), this function throws + * a {@link SlackStreamNotDeliveredError} carrying the pending text so the + * caller can deliver it via `chat.postMessage`. + * + * All other errors propagate unchanged. */ export async function stopSlackStream(params: StopSlackStreamParams): Promise { const { session, text } = params; @@ -146,6 +199,9 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise([ // Slack Connect recipients: finalize fails because the external user id @@ -190,12 +254,12 @@ const BENIGN_SLACK_FINALIZE_ERROR_CODES = new Set([ "missing_recipient_user_id", ]); -function isBenignSlackFinalizeError(err: unknown): boolean { +export function isBenignSlackFinalizeError(err: unknown): boolean { const code = extractSlackErrorCode(err); return code !== undefined && BENIGN_SLACK_FINALIZE_ERROR_CODES.has(code); } -function extractSlackErrorCode(err: unknown): string | undefined { +export function extractSlackErrorCode(err: unknown): string | undefined { if (!err || typeof err !== "object") { return undefined; } @@ -212,14 +276,3 @@ function extractSlackErrorCode(err: unknown): string | undefined { const match = message.match(/An API error occurred:\s*([a-z_][a-z0-9_]*)/i); return match?.[1]; } - -function formatSlackError(err: unknown): string { - const code = extractSlackErrorCode(err); - if (code) { - return code; - } - if (err instanceof Error) { - return err.message; - } - return String(err); -}