From 107d2b7a0962e0cf2692edc238e09aedc65829c7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 25 Apr 2026 01:11:01 +0100 Subject: [PATCH] fix(slack): preserve rapid send ordering Co-authored-by: nightq Co-authored-by: xydt cqh --- CHANGELOG.md | 1 + extensions/slack/src/client-options.ts | 1 + extensions/slack/src/client.test.ts | 13 +++++ extensions/slack/src/send.ts | 66 +++++++++++++++++++++++- extensions/slack/src/send.upload.test.ts | 43 ++++++++++++++- 5 files changed, 122 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 86f4eb093ef..42904fcc1ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Slack/messages: serialize write-client requests and whole outbound sends per target so rapid multi-message Slack replies preserve send order. Fixes #69101. (#69105) Thanks @nightq and @ztexydt-cqh. - Slack/exec approvals: resolve native approval button clicks over the Gateway instead of delivering `/approve ...` as plain agent text, preserving retry buttons if Gateway resolution fails. Fixes #71023. (#71025) Thanks @marusan03. - Slack/files: return non-image `download-file` results as local file paths instead of image payloads, and include Slack file IDs in inbound file placeholders so agents can call `download-file`. Fixes #71212. Thanks @teamrazo. - Browser control: scope standalone loopback auth to the resolved active gateway credential and fail closed when password mode lacks a resolved password, so inactive tokens or passwords no longer authorize browser routes. Fixes #65626. (#65639) Thanks @coygeek. diff --git a/extensions/slack/src/client-options.ts b/extensions/slack/src/client-options.ts index 519733c6b5c..b69245bb63e 100644 --- a/extensions/slack/src/client-options.ts +++ b/extensions/slack/src/client-options.ts @@ -91,5 +91,6 @@ export function resolveSlackWriteClientOptions(options: WebClientOptions = {}): ...options, agent: options.agent ?? resolveSlackProxyAgent(), retryConfig: options.retryConfig ?? SLACK_WRITE_RETRY_OPTIONS, + maxRequestConcurrency: options.maxRequestConcurrency ?? 1, }; } diff --git a/extensions/slack/src/client.test.ts b/extensions/slack/src/client.test.ts index 5d070d74856..e4368b45598 100644 --- a/extensions/slack/src/client.test.ts +++ b/extensions/slack/src/client.test.ts @@ -69,12 +69,25 @@ describe("slack web client config", () => { expect(options.retryConfig).toEqual(SLACK_WRITE_RETRY_OPTIONS); }); + it("serializes write client requests by default", () => { + const options = resolveSlackWriteClientOptions(); + + expect(options.maxRequestConcurrency).toBe(1); + }); + + it("respects explicit write client concurrency overrides", () => { + const options = resolveSlackWriteClientOptions({ maxRequestConcurrency: 5 }); + + expect(options.maxRequestConcurrency).toBe(5); + }); + it("passes no-retry config into the write client by default", () => { createSlackWriteClient("xoxb-test", { timeout: 4321 }); expect(WebClient).toHaveBeenCalledWith( "xoxb-test", expect.objectContaining({ + maxRequestConcurrency: 1, timeout: 4321, retryConfig: SLACK_WRITE_RETRY_OPTIONS, }), diff --git a/extensions/slack/src/send.ts b/extensions/slack/src/send.ts index d338def34d9..9a4ec0db4e6 100644 --- a/extensions/slack/src/send.ts +++ b/extensions/slack/src/send.ts @@ -31,6 +31,7 @@ const SLACK_UPLOAD_SSRF_POLICY = { }; const SLACK_DM_CHANNEL_CACHE_MAX = 1024; const slackDmChannelCache = new Map(); +const slackSendQueues = new Map>(); type SlackRecipient = | { @@ -179,6 +180,36 @@ function parseRecipient(raw: string): SlackRecipient { return { kind: target.kind, id: target.id }; } +function createSlackSendQueueKey(params: { + accountId: string; + token: string; + recipient: SlackRecipient; + threadTs?: string; +}): string { + const isUserId = params.recipient.kind === "user" || /^U[A-Z0-9]+$/i.test(params.recipient.id); + const recipientKey = `${isUserId ? "user" : params.recipient.kind}:${params.recipient.id}`; + return `${params.accountId}:${params.token}:${recipientKey}:${params.threadTs ?? ""}`; +} + +async function runQueuedSlackSend(key: string, task: () => Promise): Promise { + const previous = slackSendQueues.get(key) ?? Promise.resolve(); + let releaseCurrent!: () => void; + const current = new Promise((resolve) => { + releaseCurrent = resolve; + }); + const queuedCurrent = previous.catch(() => undefined).then(() => current); + slackSendQueues.set(key, queuedCurrent); + await previous.catch(() => undefined); + try { + return await task(); + } finally { + releaseCurrent(); + if (slackSendQueues.get(key) === queuedCurrent) { + slackSendQueues.delete(key); + } + } +} + function createSlackDmCacheKey(params: { accountId?: string; token: string; @@ -236,6 +267,10 @@ export function clearSlackDmChannelCache(): void { slackDmChannelCache.clear(); } +export function clearSlackSendQueuesForTest(): void { + slackSendQueues.clear(); +} + async function uploadSlackFile(params: { client: WebClient; channelId: string; @@ -332,8 +367,37 @@ export async function sendMessageSlack( fallbackToken: account.botToken, fallbackSource: account.botTokenSource, }); - const client = opts.client ?? createSlackWriteClient(token); const recipient = parseRecipient(to); + const queueKey = createSlackSendQueueKey({ + accountId: account.accountId, + token, + recipient, + threadTs: opts.threadTs, + }); + return await runQueuedSlackSend(queueKey, () => + sendMessageSlackQueued({ + trimmedMessage, + opts, + cfg, + account, + token, + recipient, + blocks, + }), + ); +} + +async function sendMessageSlackQueued(params: { + trimmedMessage: string; + opts: SlackSendOpts; + cfg: OpenClawConfig; + account: ReturnType; + token: string; + recipient: SlackRecipient; + blocks?: (Block | KnownBlock)[]; +}): Promise { + const { opts, cfg, account, token, recipient, blocks, trimmedMessage } = params; + const client = opts.client ?? createSlackWriteClient(token); const { channelId } = await resolveChannelId(client, recipient, { accountId: account.accountId, token, diff --git a/extensions/slack/src/send.upload.test.ts b/extensions/slack/src/send.upload.test.ts index 1b502fc5c46..214a90a7320 100644 --- a/extensions/slack/src/send.upload.test.ts +++ b/extensions/slack/src/send.upload.test.ts @@ -43,7 +43,9 @@ vi.mock("./runtime-api.js", async () => { let sendMessageSlack: typeof import("./send.js").sendMessageSlack; let clearSlackDmChannelCache: typeof import("./send.js").clearSlackDmChannelCache; -({ sendMessageSlack, clearSlackDmChannelCache } = await import("./send.js")); +let clearSlackSendQueuesForTest: typeof import("./send.js").clearSlackSendQueuesForTest; +({ sendMessageSlack, clearSlackDmChannelCache, clearSlackSendQueuesForTest } = + await import("./send.js")); const SLACK_TEST_CFG = { channels: { slack: { botToken: "xoxb-test" } } }; type UploadTestClient = WebClient & { @@ -84,6 +86,7 @@ describe("sendMessageSlack file upload with user IDs", () => { fetchWithSsrFGuard.mockClear(); loadOutboundMediaFromUrlMock.mockClear(); clearSlackDmChannelCache(); + clearSlackSendQueuesForTest(); }); afterEach(() => { @@ -158,6 +161,44 @@ describe("sendMessageSlack file upload with user IDs", () => { ); }); + it("serializes concurrent sends to the same Slack target", async () => { + const client = createUploadTestClient(); + let resolveFirst!: () => void; + client.chat.postMessage.mockImplementation(async (payload: { text?: string }) => { + if (payload.text === "first") { + await new Promise((resolve) => { + resolveFirst = resolve; + }); + return { ts: "1.000" }; + } + return { ts: "2.000" }; + }); + + const first = sendMessageSlack("channel:C123CHAN", "first", { + token: "xoxb-test", + cfg: SLACK_TEST_CFG, + client, + }); + await vi.waitFor(() => expect(client.chat.postMessage).toHaveBeenCalledTimes(1)); + + const second = sendMessageSlack("channel:C123CHAN", "second", { + token: "xoxb-test", + cfg: SLACK_TEST_CFG, + client, + }); + await Promise.resolve(); + + expect(client.chat.postMessage).toHaveBeenCalledTimes(1); + resolveFirst(); + + await expect(first).resolves.toEqual({ channelId: "C123CHAN", messageId: "1.000" }); + await expect(second).resolves.toEqual({ channelId: "C123CHAN", messageId: "2.000" }); + expect(client.chat.postMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ text: "second" }), + ); + }); + it("scopes DM channel resolution cache by token identity", async () => { const client = createUploadTestClient();