mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-05 22:30:21 +00:00
fix(slack): trim DM reply overhead and restore Codex auto transport (#53957)
* perf(slack): instrument runtime and trim DM overhead * perf(slack): lazy-init draft previews * perf(slack): add turn summary diagnostics * perf(core): trim repeated runtime setup noise * perf(core): preselect default web search providers * perf(agent): restore OpenAI auto transport defaults * refactor(slack): drop temporary perf wiring * fix(slack): address follow-up review notes * fix(security): tighten slack and runtime defaults * style(web-search): fix import ordering * style(agent): remove useless spread fallback * docs(changelog): note slack runtime hardening
This commit is contained in:
@@ -1,5 +1,10 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { isSlackStreamingEnabled, resolveSlackStreamingThreadHint } from "./dispatch.js";
|
||||
import {
|
||||
isSlackStreamingEnabled,
|
||||
resolveSlackStreamingThreadHint,
|
||||
shouldEnableSlackPreviewStreaming,
|
||||
shouldInitializeSlackDraftStream,
|
||||
} from "./dispatch.js";
|
||||
|
||||
describe("slack native streaming defaults", () => {
|
||||
it("is enabled for partial mode when native streaming is on", () => {
|
||||
@@ -45,3 +50,80 @@ describe("slack native streaming thread hint", () => {
|
||||
).toBe("2000.1");
|
||||
});
|
||||
});
|
||||
|
||||
describe("slack preview streaming eligibility", () => {
|
||||
it("stays on for room messages when streaming mode is enabled", () => {
|
||||
expect(
|
||||
shouldEnableSlackPreviewStreaming({
|
||||
mode: "partial",
|
||||
isDirectMessage: false,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("stays off for top-level DMs without a reply thread", () => {
|
||||
expect(
|
||||
shouldEnableSlackPreviewStreaming({
|
||||
mode: "partial",
|
||||
isDirectMessage: true,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("allows DM preview when the reply is threaded", () => {
|
||||
expect(
|
||||
shouldEnableSlackPreviewStreaming({
|
||||
mode: "partial",
|
||||
isDirectMessage: true,
|
||||
threadTs: "1000.1",
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps top-level DMs off even when replyToMode would create a reply thread", () => {
|
||||
const streamThreadHint = resolveSlackStreamingThreadHint({
|
||||
replyToMode: "all",
|
||||
incomingThreadTs: undefined,
|
||||
messageTs: "1000.4",
|
||||
isThreadReply: false,
|
||||
});
|
||||
|
||||
expect(
|
||||
shouldEnableSlackPreviewStreaming({
|
||||
mode: "partial",
|
||||
isDirectMessage: true,
|
||||
threadTs: undefined,
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(streamThreadHint).toBe("1000.4");
|
||||
});
|
||||
});
|
||||
|
||||
describe("slack draft stream initialization", () => {
|
||||
it("stays off when preview streaming is disabled", () => {
|
||||
expect(
|
||||
shouldInitializeSlackDraftStream({
|
||||
previewStreamingEnabled: false,
|
||||
useStreaming: false,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("stays off when native streaming is active", () => {
|
||||
expect(
|
||||
shouldInitializeSlackDraftStream({
|
||||
previewStreamingEnabled: true,
|
||||
useStreaming: true,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("turns on only for preview-only paths", () => {
|
||||
expect(
|
||||
shouldInitializeSlackDraftStream({
|
||||
previewStreamingEnabled: true,
|
||||
useStreaming: false,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -50,6 +50,27 @@ export function isSlackStreamingEnabled(params: {
|
||||
return params.nativeStreaming;
|
||||
}
|
||||
|
||||
export function shouldEnableSlackPreviewStreaming(params: {
|
||||
mode: "off" | "partial" | "block" | "progress";
|
||||
isDirectMessage: boolean;
|
||||
threadTs?: string;
|
||||
}): boolean {
|
||||
if (params.mode === "off") {
|
||||
return false;
|
||||
}
|
||||
if (!params.isDirectMessage) {
|
||||
return true;
|
||||
}
|
||||
return Boolean(params.threadTs);
|
||||
}
|
||||
|
||||
export function shouldInitializeSlackDraftStream(params: {
|
||||
previewStreamingEnabled: boolean;
|
||||
useStreaming: boolean;
|
||||
}): boolean {
|
||||
return params.previewStreamingEnabled && !params.useStreaming;
|
||||
}
|
||||
|
||||
export function resolveSlackStreamingThreadHint(params: {
|
||||
replyToMode: "off" | "first" | "all";
|
||||
incomingThreadTs: string | undefined;
|
||||
@@ -213,21 +234,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
streamMode: account.config.streamMode,
|
||||
nativeStreaming: account.config.nativeStreaming,
|
||||
});
|
||||
const previewStreamingEnabled = slackStreaming.mode !== "off";
|
||||
const streamingEnabled = isSlackStreamingEnabled({
|
||||
mode: slackStreaming.mode,
|
||||
nativeStreaming: slackStreaming.nativeStreaming,
|
||||
});
|
||||
const streamThreadHint = resolveSlackStreamingThreadHint({
|
||||
replyToMode: prepared.replyToMode,
|
||||
incomingThreadTs,
|
||||
messageTs,
|
||||
isThreadReply,
|
||||
});
|
||||
const previewStreamingEnabled = shouldEnableSlackPreviewStreaming({
|
||||
mode: slackStreaming.mode,
|
||||
isDirectMessage: prepared.isDirectMessage,
|
||||
threadTs: streamThreadHint,
|
||||
});
|
||||
const streamingEnabled = isSlackStreamingEnabled({
|
||||
mode: slackStreaming.mode,
|
||||
nativeStreaming: slackStreaming.nativeStreaming,
|
||||
});
|
||||
const useStreaming = shouldUseStreaming({
|
||||
streamingEnabled,
|
||||
threadTs: streamThreadHint,
|
||||
});
|
||||
const shouldUseDraftStream = shouldInitializeSlackDraftStream({
|
||||
previewStreamingEnabled,
|
||||
useStreaming,
|
||||
});
|
||||
let streamSession: SlackStreamSession | null = null;
|
||||
let streamFailed = false;
|
||||
let usedReplyThreadTs: string | undefined;
|
||||
@@ -372,22 +401,24 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
},
|
||||
});
|
||||
|
||||
const draftStream = createSlackDraftStream({
|
||||
target: prepared.replyTarget,
|
||||
token: ctx.botToken,
|
||||
accountId: account.accountId,
|
||||
maxChars: Math.min(ctx.textLimit, SLACK_TEXT_LIMIT),
|
||||
resolveThreadTs: () => {
|
||||
const ts = replyPlan.nextThreadTs();
|
||||
if (ts) {
|
||||
usedReplyThreadTs ??= ts;
|
||||
}
|
||||
return ts;
|
||||
},
|
||||
onMessageSent: () => replyPlan.markSent(),
|
||||
log: logVerbose,
|
||||
warn: logVerbose,
|
||||
});
|
||||
const draftStream = shouldUseDraftStream
|
||||
? createSlackDraftStream({
|
||||
target: prepared.replyTarget,
|
||||
token: ctx.botToken,
|
||||
accountId: account.accountId,
|
||||
maxChars: Math.min(ctx.textLimit, SLACK_TEXT_LIMIT),
|
||||
resolveThreadTs: () => {
|
||||
const ts = replyPlan.nextThreadTs();
|
||||
if (ts) {
|
||||
usedReplyThreadTs ??= ts;
|
||||
}
|
||||
return ts;
|
||||
},
|
||||
onMessageSent: () => replyPlan.markSent(),
|
||||
log: logVerbose,
|
||||
warn: logVerbose,
|
||||
})
|
||||
: undefined;
|
||||
let hasStreamedMessage = false;
|
||||
const streamMode = slackStreaming.draftMode;
|
||||
let appendRenderedText = "";
|
||||
@@ -410,7 +441,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
if (!next.changed) {
|
||||
return;
|
||||
}
|
||||
draftStream.update(next.rendered);
|
||||
draftStream?.update(next.rendered);
|
||||
hasStreamedMessage = true;
|
||||
return;
|
||||
}
|
||||
@@ -420,26 +451,25 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
if (statusUpdateCount > 1 && statusUpdateCount % 4 !== 0) {
|
||||
return;
|
||||
}
|
||||
draftStream.update(buildStatusFinalPreviewText(statusUpdateCount));
|
||||
draftStream?.update(buildStatusFinalPreviewText(statusUpdateCount));
|
||||
hasStreamedMessage = true;
|
||||
return;
|
||||
}
|
||||
|
||||
draftStream.update(trimmed);
|
||||
draftStream?.update(trimmed);
|
||||
hasStreamedMessage = true;
|
||||
};
|
||||
const onDraftBoundary =
|
||||
useStreaming || !previewStreamingEnabled
|
||||
? undefined
|
||||
: async () => {
|
||||
if (hasStreamedMessage) {
|
||||
draftStream.forceNewMessage();
|
||||
hasStreamedMessage = false;
|
||||
appendRenderedText = "";
|
||||
appendSourceText = "";
|
||||
statusUpdateCount = 0;
|
||||
}
|
||||
};
|
||||
const onDraftBoundary = !shouldUseDraftStream
|
||||
? undefined
|
||||
: async () => {
|
||||
if (hasStreamedMessage) {
|
||||
draftStream?.forceNewMessage();
|
||||
hasStreamedMessage = false;
|
||||
appendRenderedText = "";
|
||||
appendSourceText = "";
|
||||
statusUpdateCount = 0;
|
||||
}
|
||||
};
|
||||
|
||||
const { queuedFinal, counts } = await dispatchInboundMessage({
|
||||
ctx: prepared.ctxPayload,
|
||||
@@ -466,8 +496,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
onReasoningEnd: onDraftBoundary,
|
||||
},
|
||||
});
|
||||
await draftStream.flush();
|
||||
draftStream.stop();
|
||||
await draftStream?.flush();
|
||||
draftStream?.stop();
|
||||
markDispatchIdle();
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
@@ -493,7 +523,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
}
|
||||
|
||||
if (!anyReplyDelivered) {
|
||||
await draftStream.clear();
|
||||
await draftStream?.clear();
|
||||
if (prepared.isRoomish) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: ctx.channelHistories,
|
||||
|
||||
@@ -27,6 +27,8 @@ const SLACK_UPLOAD_SSRF_POLICY = {
|
||||
allowedHostnames: ["*.slack.com", "*.slack-edge.com", "*.slack-files.com"],
|
||||
allowRfc2544BenchmarkRange: true,
|
||||
};
|
||||
const SLACK_DM_CHANNEL_CACHE_MAX = 1024;
|
||||
const slackDmChannelCache = new Map<string, string>();
|
||||
|
||||
type SlackRecipient =
|
||||
| {
|
||||
@@ -167,10 +169,31 @@ function parseRecipient(raw: string): SlackRecipient {
|
||||
return { kind: target.kind, id: target.id };
|
||||
}
|
||||
|
||||
function createSlackDmCacheKey(params: {
|
||||
accountId?: string;
|
||||
token: string;
|
||||
recipientId: string;
|
||||
}): string {
|
||||
return `${params.accountId ?? "default"}:${params.token}:${params.recipientId}`;
|
||||
}
|
||||
|
||||
function setSlackDmChannelCache(key: string, channelId: string): void {
|
||||
if (slackDmChannelCache.has(key)) {
|
||||
slackDmChannelCache.delete(key);
|
||||
} else if (slackDmChannelCache.size >= SLACK_DM_CHANNEL_CACHE_MAX) {
|
||||
const oldest = slackDmChannelCache.keys().next().value;
|
||||
if (oldest) {
|
||||
slackDmChannelCache.delete(oldest);
|
||||
}
|
||||
}
|
||||
slackDmChannelCache.set(key, channelId);
|
||||
}
|
||||
|
||||
async function resolveChannelId(
|
||||
client: WebClient,
|
||||
recipient: SlackRecipient,
|
||||
): Promise<{ channelId: string; isDm?: boolean }> {
|
||||
params: { accountId?: string; token: string },
|
||||
): Promise<{ channelId: string; isDm?: boolean; cacheHit?: boolean }> {
|
||||
// Bare Slack user IDs (U-prefix) may arrive with kind="channel" when the
|
||||
// target string had no explicit prefix (parseSlackTarget defaults bare IDs
|
||||
// to "channel"). chat.postMessage tolerates user IDs directly, but
|
||||
@@ -181,12 +204,26 @@ async function resolveChannelId(
|
||||
if (!isUserId) {
|
||||
return { channelId: recipient.id };
|
||||
}
|
||||
const cacheKey = createSlackDmCacheKey({
|
||||
accountId: params.accountId,
|
||||
token: params.token,
|
||||
recipientId: recipient.id,
|
||||
});
|
||||
const cachedChannelId = slackDmChannelCache.get(cacheKey);
|
||||
if (cachedChannelId) {
|
||||
return { channelId: cachedChannelId, isDm: true, cacheHit: true };
|
||||
}
|
||||
const response = await client.conversations.open({ users: recipient.id });
|
||||
const channelId = response.channel?.id;
|
||||
if (!channelId) {
|
||||
throw new Error("Failed to open Slack DM channel");
|
||||
}
|
||||
return { channelId, isDm: true };
|
||||
setSlackDmChannelCache(cacheKey, channelId);
|
||||
return { channelId, isDm: true, cacheHit: false };
|
||||
}
|
||||
|
||||
export function clearSlackDmChannelCache(): void {
|
||||
slackDmChannelCache.clear();
|
||||
}
|
||||
|
||||
async function uploadSlackFile(params: {
|
||||
@@ -276,7 +313,10 @@ export async function sendMessageSlack(
|
||||
});
|
||||
const client = opts.client ?? createSlackWebClient(token);
|
||||
const recipient = parseRecipient(to);
|
||||
const { channelId } = await resolveChannelId(client, recipient);
|
||||
const { channelId } = await resolveChannelId(client, recipient, {
|
||||
accountId: account.accountId,
|
||||
token,
|
||||
});
|
||||
if (blocks) {
|
||||
if (opts.mediaUrl) {
|
||||
throw new Error("Slack send does not support blocks with mediaUrl");
|
||||
|
||||
@@ -32,6 +32,7 @@ vi.mock("openclaw/plugin-sdk/web-media", () => ({
|
||||
}));
|
||||
|
||||
let sendMessageSlack: typeof import("./send.js").sendMessageSlack;
|
||||
let clearSlackDmChannelCache: typeof import("./send.js").clearSlackDmChannelCache;
|
||||
|
||||
type UploadTestClient = WebClient & {
|
||||
conversations: { open: ReturnType<typeof vi.fn> };
|
||||
@@ -66,7 +67,7 @@ describe("sendMessageSlack file upload with user IDs", () => {
|
||||
|
||||
beforeAll(async () => {
|
||||
vi.resetModules();
|
||||
({ sendMessageSlack } = await import("./send.js"));
|
||||
({ sendMessageSlack, clearSlackDmChannelCache } = await import("./send.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
@@ -74,6 +75,7 @@ describe("sendMessageSlack file upload with user IDs", () => {
|
||||
async () => new Response("ok", { status: 200 }),
|
||||
) as unknown as typeof fetch;
|
||||
fetchWithSsrFGuard.mockClear();
|
||||
clearSlackDmChannelCache();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -121,6 +123,44 @@ describe("sendMessageSlack file upload with user IDs", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("caches DM channel resolution per account", async () => {
|
||||
const client = createUploadTestClient();
|
||||
|
||||
await sendMessageSlack("user:UABC123", "first", {
|
||||
token: "xoxb-test",
|
||||
client,
|
||||
});
|
||||
await sendMessageSlack("user:UABC123", "second", {
|
||||
token: "xoxb-test",
|
||||
client,
|
||||
});
|
||||
|
||||
expect(client.conversations.open).toHaveBeenCalledTimes(1);
|
||||
expect(client.chat.postMessage).toHaveBeenCalledTimes(2);
|
||||
expect(client.chat.postMessage).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
expect.objectContaining({
|
||||
channel: "D99RESOLVED",
|
||||
text: "second",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("scopes DM channel resolution cache by token identity", async () => {
|
||||
const client = createUploadTestClient();
|
||||
|
||||
await sendMessageSlack("user:UABC123", "first", {
|
||||
token: "xoxb-test-a",
|
||||
client,
|
||||
});
|
||||
await sendMessageSlack("user:UABC123", "second", {
|
||||
token: "xoxb-test-b",
|
||||
client,
|
||||
});
|
||||
|
||||
expect(client.conversations.open).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("sends file directly to channel without conversations.open", async () => {
|
||||
const client = createUploadTestClient();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user