fix(slack): preserve rapid send ordering

Co-authored-by: nightq <zengwei@nightq.cn>
Co-authored-by: xydt cqh <cui.qianhong@xydigit.com>
This commit is contained in:
Peter Steinberger
2026-04-25 01:11:01 +01:00
parent 5f81147c4d
commit 107d2b7a09
5 changed files with 122 additions and 2 deletions

View File

@@ -10,6 +10,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Slack/messages: serialize write-client requests and whole outbound sends per target so rapid multi-message Slack replies preserve send order. Fixes #69101. (#69105) Thanks @nightq and @ztexydt-cqh.
- Slack/exec approvals: resolve native approval button clicks over the Gateway instead of delivering `/approve ...` as plain agent text, preserving retry buttons if Gateway resolution fails. Fixes #71023. (#71025) Thanks @marusan03.
- Slack/files: return non-image `download-file` results as local file paths instead of image payloads, and include Slack file IDs in inbound file placeholders so agents can call `download-file`. Fixes #71212. Thanks @teamrazo.
- Browser control: scope standalone loopback auth to the resolved active gateway credential and fail closed when password mode lacks a resolved password, so inactive tokens or passwords no longer authorize browser routes. Fixes #65626. (#65639) Thanks @coygeek.

View File

@@ -91,5 +91,6 @@ export function resolveSlackWriteClientOptions(options: WebClientOptions = {}):
...options,
agent: options.agent ?? resolveSlackProxyAgent(),
retryConfig: options.retryConfig ?? SLACK_WRITE_RETRY_OPTIONS,
maxRequestConcurrency: options.maxRequestConcurrency ?? 1,
};
}

View File

@@ -69,12 +69,25 @@ describe("slack web client config", () => {
expect(options.retryConfig).toEqual(SLACK_WRITE_RETRY_OPTIONS);
});
it("serializes write client requests by default", () => {
const options = resolveSlackWriteClientOptions();
expect(options.maxRequestConcurrency).toBe(1);
});
it("respects explicit write client concurrency overrides", () => {
const options = resolveSlackWriteClientOptions({ maxRequestConcurrency: 5 });
expect(options.maxRequestConcurrency).toBe(5);
});
it("passes no-retry config into the write client by default", () => {
createSlackWriteClient("xoxb-test", { timeout: 4321 });
expect(WebClient).toHaveBeenCalledWith(
"xoxb-test",
expect.objectContaining({
maxRequestConcurrency: 1,
timeout: 4321,
retryConfig: SLACK_WRITE_RETRY_OPTIONS,
}),

View File

@@ -31,6 +31,7 @@ const SLACK_UPLOAD_SSRF_POLICY = {
};
const SLACK_DM_CHANNEL_CACHE_MAX = 1024;
const slackDmChannelCache = new Map<string, string>();
const slackSendQueues = new Map<string, Promise<void>>();
type SlackRecipient =
| {
@@ -179,6 +180,36 @@ function parseRecipient(raw: string): SlackRecipient {
return { kind: target.kind, id: target.id };
}
function createSlackSendQueueKey(params: {
accountId: string;
token: string;
recipient: SlackRecipient;
threadTs?: string;
}): string {
const isUserId = params.recipient.kind === "user" || /^U[A-Z0-9]+$/i.test(params.recipient.id);
const recipientKey = `${isUserId ? "user" : params.recipient.kind}:${params.recipient.id}`;
return `${params.accountId}:${params.token}:${recipientKey}:${params.threadTs ?? ""}`;
}
async function runQueuedSlackSend<T>(key: string, task: () => Promise<T>): Promise<T> {
const previous = slackSendQueues.get(key) ?? Promise.resolve();
let releaseCurrent!: () => void;
const current = new Promise<void>((resolve) => {
releaseCurrent = resolve;
});
const queuedCurrent = previous.catch(() => undefined).then(() => current);
slackSendQueues.set(key, queuedCurrent);
await previous.catch(() => undefined);
try {
return await task();
} finally {
releaseCurrent();
if (slackSendQueues.get(key) === queuedCurrent) {
slackSendQueues.delete(key);
}
}
}
function createSlackDmCacheKey(params: {
accountId?: string;
token: string;
@@ -236,6 +267,10 @@ export function clearSlackDmChannelCache(): void {
slackDmChannelCache.clear();
}
export function clearSlackSendQueuesForTest(): void {
slackSendQueues.clear();
}
async function uploadSlackFile(params: {
client: WebClient;
channelId: string;
@@ -332,8 +367,37 @@ export async function sendMessageSlack(
fallbackToken: account.botToken,
fallbackSource: account.botTokenSource,
});
const client = opts.client ?? createSlackWriteClient(token);
const recipient = parseRecipient(to);
const queueKey = createSlackSendQueueKey({
accountId: account.accountId,
token,
recipient,
threadTs: opts.threadTs,
});
return await runQueuedSlackSend(queueKey, () =>
sendMessageSlackQueued({
trimmedMessage,
opts,
cfg,
account,
token,
recipient,
blocks,
}),
);
}
async function sendMessageSlackQueued(params: {
trimmedMessage: string;
opts: SlackSendOpts;
cfg: OpenClawConfig;
account: ReturnType<typeof resolveSlackAccount>;
token: string;
recipient: SlackRecipient;
blocks?: (Block | KnownBlock)[];
}): Promise<SlackSendResult> {
const { opts, cfg, account, token, recipient, blocks, trimmedMessage } = params;
const client = opts.client ?? createSlackWriteClient(token);
const { channelId } = await resolveChannelId(client, recipient, {
accountId: account.accountId,
token,

View File

@@ -43,7 +43,9 @@ vi.mock("./runtime-api.js", async () => {
let sendMessageSlack: typeof import("./send.js").sendMessageSlack;
let clearSlackDmChannelCache: typeof import("./send.js").clearSlackDmChannelCache;
({ sendMessageSlack, clearSlackDmChannelCache } = await import("./send.js"));
let clearSlackSendQueuesForTest: typeof import("./send.js").clearSlackSendQueuesForTest;
({ sendMessageSlack, clearSlackDmChannelCache, clearSlackSendQueuesForTest } =
await import("./send.js"));
const SLACK_TEST_CFG = { channels: { slack: { botToken: "xoxb-test" } } };
type UploadTestClient = WebClient & {
@@ -84,6 +86,7 @@ describe("sendMessageSlack file upload with user IDs", () => {
fetchWithSsrFGuard.mockClear();
loadOutboundMediaFromUrlMock.mockClear();
clearSlackDmChannelCache();
clearSlackSendQueuesForTest();
});
afterEach(() => {
@@ -158,6 +161,44 @@ describe("sendMessageSlack file upload with user IDs", () => {
);
});
it("serializes concurrent sends to the same Slack target", async () => {
const client = createUploadTestClient();
let resolveFirst!: () => void;
client.chat.postMessage.mockImplementation(async (payload: { text?: string }) => {
if (payload.text === "first") {
await new Promise<void>((resolve) => {
resolveFirst = resolve;
});
return { ts: "1.000" };
}
return { ts: "2.000" };
});
const first = sendMessageSlack("channel:C123CHAN", "first", {
token: "xoxb-test",
cfg: SLACK_TEST_CFG,
client,
});
await vi.waitFor(() => expect(client.chat.postMessage).toHaveBeenCalledTimes(1));
const second = sendMessageSlack("channel:C123CHAN", "second", {
token: "xoxb-test",
cfg: SLACK_TEST_CFG,
client,
});
await Promise.resolve();
expect(client.chat.postMessage).toHaveBeenCalledTimes(1);
resolveFirst();
await expect(first).resolves.toEqual({ channelId: "C123CHAN", messageId: "1.000" });
await expect(second).resolves.toEqual({ channelId: "C123CHAN", messageId: "2.000" });
expect(client.chat.postMessage).toHaveBeenNthCalledWith(
2,
expect.objectContaining({ text: "second" }),
);
});
it("scopes DM channel resolution cache by token identity", async () => {
const client = createUploadTestClient();