diff --git a/CHANGELOG.md b/CHANGELOG.md index bbc7a433635..4d3f7304fbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,6 +67,7 @@ Docs: https://docs.openclaw.ai - Plugin SDK/tool-result transforms: bound middleware `details`, validate in-place result mutations, and mark fail-closed middleware fallbacks with canonical `error` status. Thanks @vincentkoc. - Discord/gateway: prevent startup from getting stuck at `awaiting gateway readiness` when Carbon gateway registration races with a lifecycle reconnect. Fixes #52372. (#68159) Thanks @IVY-AI-gif. - Discord/gateway: supervise Carbon's async gateway registration promise so fatal Discord metadata failures surface through startup instead of process-level unhandled rejections. (#62451) Thanks @safzanpirani. +- Slack/streaming: suppress block replies while native or draft preview streaming owns the turn, preventing duplicate Slack delivery when block streaming is also enabled. Addresses #56675. Thanks @hsiaoa. - Plugins/cache: restore plugin command and interactive handler registries on loader cache hits without resetting interactive callback dedupe, so cached external plugins keep slash commands and callback handlers available after reloads. Fixes #71100. Thanks @BomBastikDE. - Gateway/OpenAI-compatible: report non-zero token usage for `/v1/chat/completions` when the agent run has only last-call usage metadata available. Fixes #71118. (#71242) Thanks @RenzoMXD. - Plugin SDK/tool-result transforms: restrict harness tool-result middleware to bundled plugins, fail closed on middleware errors, validate rewritten result shapes, preserve Pi per-call ids, and keep Codex media trust checks anchored to raw tool provenance. Thanks @vincentkoc. diff --git a/docs/concepts/streaming.md b/docs/concepts/streaming.md index 1d00311b64d..1317dd7ee77 100644 --- a/docs/concepts/streaming.md +++ b/docs/concepts/streaming.md @@ -156,6 +156,7 @@ Slack: - `partial` can use Slack native streaming (`chat.startStream`/`append`/`stop`) when available. - `block` uses append-style draft previews. - `progress` uses status preview text, then final answer. +- Native and draft preview streaming suppress block replies for that turn, so a Slack reply is streamed by one delivery path only. - Final media/error payloads and progress finals do not create throwaway draft messages; only text/block finals that can edit the preview flush pending draft text. Mattermost: diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index 922327808e0..7f3f8342297 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -28,6 +28,8 @@ class TestSlackStreamNotDeliveredError extends Error { } } let mockedNativeStreaming = false; +let mockedBlockStreamingEnabled: boolean | undefined = false; +let capturedReplyOptions: { disableBlockStreaming?: boolean } | undefined; let mockedDispatchSequence: Array<{ kind: "tool" | "block" | "final"; payload: { text: string; isError?: boolean; mediaUrl?: string; mediaUrls?: string[] }; @@ -129,7 +131,7 @@ vi.mock("openclaw/plugin-sdk/channel-reply-pipeline", () => ({ })); vi.mock("openclaw/plugin-sdk/channel-streaming", () => ({ - resolveChannelStreamingBlockEnabled: () => false, + resolveChannelStreamingBlockEnabled: () => mockedBlockStreamingEnabled, resolveChannelStreamingNativeTransport: () => mockedNativeStreaming, resolveChannelStreamingPreviewToolProgress: () => true, })); @@ -266,6 +268,7 @@ vi.mock("../reply.runtime.js", () => ({ markDispatchIdle: () => {}, }), dispatchInboundMessage: async (params: { + replyOptions?: { disableBlockStreaming?: boolean }; dispatcher: { deliver: ( payload: { text: string; isError?: boolean; mediaUrl?: string; mediaUrls?: string[] }, @@ -273,6 +276,7 @@ vi.mock("../reply.runtime.js", () => ({ ) => Promise; }; }) => { + capturedReplyOptions = params.replyOptions; for (const entry of mockedDispatchSequence) { await params.dispatcher.deliver(entry.payload, { kind: entry.kind }); } @@ -305,6 +309,8 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { startSlackStreamMock.mockReset(); stopSlackStreamMock.mockReset(); mockedNativeStreaming = false; + mockedBlockStreamingEnabled = false; + capturedReplyOptions = undefined; mockedDispatchSequence = [{ kind: "final", payload: { text: FINAL_REPLY_TEXT } }]; createSlackDraftStreamMock.mockReturnValue(createDraftStreamStub()); @@ -333,6 +339,14 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { ); }); + it("suppresses block streaming when Slack draft preview streaming is active", async () => { + mockedBlockStreamingEnabled = true; + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(capturedReplyOptions?.disableBlockStreaming).toBe(true); + }); + it("keeps same-content tool and final payloads distinct after preview fallback", async () => { mockedDispatchSequence = [ { kind: "tool", payload: { text: SAME_TEXT } }, diff --git a/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts index 6ea4d14a7d0..c7d8ddfb078 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.streaming.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it, vi } from "vitest"; import { createSlackTurnDeliveryTracker, isSlackStreamingEnabled, + resolveSlackDisableBlockStreaming, resolveSlackStreamRecipientTeamId, resolveSlackStreamingThreadHint, shouldEnableSlackPreviewStreaming, @@ -242,3 +243,52 @@ describe("slack draft stream initialization", () => { ).toBe(true); }); }); + +describe("slack block streaming suppression", () => { + it("disables block streaming when native Slack streaming is active", () => { + expect( + resolveSlackDisableBlockStreaming({ + useStreaming: true, + shouldUseDraftStream: false, + blockStreamingEnabled: true, + }), + ).toBe(true); + }); + + it("disables block streaming when draft preview streaming is active", () => { + expect( + resolveSlackDisableBlockStreaming({ + useStreaming: false, + shouldUseDraftStream: true, + blockStreamingEnabled: true, + }), + ).toBe(true); + }); + + it("respects explicit block streaming config when preview streaming is inactive", () => { + expect( + resolveSlackDisableBlockStreaming({ + useStreaming: false, + shouldUseDraftStream: false, + blockStreamingEnabled: true, + }), + ).toBe(false); + expect( + resolveSlackDisableBlockStreaming({ + useStreaming: false, + shouldUseDraftStream: false, + blockStreamingEnabled: false, + }), + ).toBe(true); + }); + + it("leaves block streaming policy unset when no channel override exists", () => { + expect( + resolveSlackDisableBlockStreaming({ + useStreaming: false, + shouldUseDraftStream: false, + blockStreamingEnabled: undefined, + }), + ).toBeUndefined(); + }); +}); diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index ab9adbf19ac..86deec6e4a1 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -121,6 +121,19 @@ export function shouldInitializeSlackDraftStream(params: { return params.previewStreamingEnabled && !params.useStreaming; } +export function resolveSlackDisableBlockStreaming(params: { + useStreaming: boolean; + shouldUseDraftStream: boolean; + blockStreamingEnabled: boolean | undefined; +}): boolean | undefined { + if (params.useStreaming || params.shouldUseDraftStream) { + return true; + } + return typeof params.blockStreamingEnabled === "boolean" + ? !params.blockStreamingEnabled + : undefined; +} + export function resolveSlackStreamingThreadHint(params: { replyToMode: "off" | "first" | "all" | "batched"; incomingThreadTs: string | undefined; @@ -426,6 +439,12 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag previewStreamingEnabled, useStreaming, }); + const blockStreamingEnabled = resolveChannelStreamingBlockEnabled(account.config); + const disableBlockStreaming = resolveSlackDisableBlockStreaming({ + useStreaming, + shouldUseDraftStream, + blockStreamingEnabled, + }); let streamSession: SlackStreamSession | null = null; let streamFailed = false; let usedReplyThreadTs: string | undefined; @@ -909,11 +928,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag ...replyOptions, skillFilter: prepared.channelConfig?.skills, hasRepliedRef, - disableBlockStreaming: useStreaming - ? true - : typeof resolveChannelStreamingBlockEnabled(account.config) === "boolean" - ? !resolveChannelStreamingBlockEnabled(account.config) - : undefined, + disableBlockStreaming, onModelSelected, suppressDefaultToolProgressMessages: previewToolProgressEnabled ? true : undefined, onPartialReply: useStreaming