From cd1977bf164ca908bb287ea283be25525c3db03f Mon Sep 17 00:00:00 2001 From: Bek Date: Tue, 21 Apr 2026 16:15:11 -0400 Subject: [PATCH] fix: make slack socket health event-driven --- extensions/slack/src/channel.test.ts | 24 ++ extensions/slack/src/monitor.test-helpers.ts | 19 +- extensions/slack/src/monitor/events.ts | 2 +- .../events/interactions.block-actions.ts | 4 + .../src/monitor/events/interactions.modal.ts | 2 + .../src/monitor/events/interactions.test.ts | 12 +- .../slack/src/monitor/events/interactions.ts | 10 +- .../src/monitor/provider.interop.test.ts | 96 ++++++++ .../src/monitor/provider.reconnect.test.ts | 34 ++- extensions/slack/src/monitor/provider.ts | 225 ++++++++++++------ extensions/slack/src/monitor/slash.test.ts | 68 +++++- extensions/slack/src/monitor/slash.ts | 5 +- src/gateway/server-methods/send.test.ts | 129 ++++++++++ src/gateway/server-methods/send.ts | 27 ++- src/plugin-sdk/status-helpers.test.ts | 3 + src/plugin-sdk/status-helpers.ts | 1 + 16 files changed, 565 insertions(+), 96 deletions(-) diff --git a/extensions/slack/src/channel.test.ts b/extensions/slack/src/channel.test.ts index 32eb23814c2..5f967d0e416 100644 --- a/extensions/slack/src/channel.test.ts +++ b/extensions/slack/src/channel.test.ts @@ -284,6 +284,10 @@ describe("slackPlugin actions", () => { }); describe("slackPlugin status", () => { + it("opts out of the generic stale socket health check", () => { + expect(slackPlugin.status?.skipStaleSocketHealthCheck).toBe(true); + }); + it("uses the direct Slack probe helper when runtime is not initialized", async () => { const probeSpy = vi.spyOn(probeModule, "probeSlack").mockResolvedValueOnce({ ok: true, @@ -316,6 +320,26 @@ describe("slackPlugin status", () => { team: { id: "T1", name: "OpenClaw" }, }); }); + + it("recovers thread routing from mixed-case Slack session keys", async () => { + const resolveRoute = slackPlugin.messaging?.resolveOutboundSessionRoute; + if (!resolveRoute) { + throw new Error("slack messaging.resolveOutboundSessionRoute unavailable"); + } + + const route = await resolveRoute({ + cfg: {} as OpenClawConfig, + agentId: "main", + target: "channel:C1", + currentSessionKey: "agent:main:slack:channel:C1:thread:1712345678.123456", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:slack:channel:c1:thread:1712345678.123456", + baseSessionKey: "agent:main:slack:channel:c1", + threadId: "1712345678.123456", + }); + }); }); describe("slackPlugin security", () => { diff --git a/extensions/slack/src/monitor.test-helpers.ts b/extensions/slack/src/monitor.test-helpers.ts index 9be0005aecd..3825d70f7e6 100644 --- a/extensions/slack/src/monitor.test-helpers.ts +++ b/extensions/slack/src/monitor.test-helpers.ts @@ -254,6 +254,11 @@ vi.mock("@slack/bolt", () => { const { handlers, client: slackClient } = ensureSlackTestRuntime(); class App { client = slackClient; + receiver: unknown; + + constructor(args?: { receiver?: unknown }) { + this.receiver = args?.receiver; + } event(name: string, handler: SlackHandler) { handlers.set(name, handler); } @@ -266,5 +271,17 @@ vi.mock("@slack/bolt", () => { class HTTPReceiver { requestListener = vi.fn(); } - return { App, HTTPReceiver, default: { App, HTTPReceiver } }; + class SocketModeReceiver { + client = { + ...slackClient, + on: vi.fn(), + off: vi.fn(), + }; + } + return { + App, + HTTPReceiver, + SocketModeReceiver, + default: { App, HTTPReceiver, SocketModeReceiver }, + }; }); diff --git a/extensions/slack/src/monitor/events.ts b/extensions/slack/src/monitor/events.ts index 778ca9d83ca..7c637588204 100644 --- a/extensions/slack/src/monitor/events.ts +++ b/extensions/slack/src/monitor/events.ts @@ -23,5 +23,5 @@ export function registerSlackMonitorEvents(params: { registerSlackMemberEvents({ ctx: params.ctx, trackEvent: params.trackEvent }); registerSlackChannelEvents({ ctx: params.ctx, trackEvent: params.trackEvent }); registerSlackPinEvents({ ctx: params.ctx, trackEvent: params.trackEvent }); - registerSlackInteractionEvents({ ctx: params.ctx }); + registerSlackInteractionEvents({ ctx: params.ctx, trackEvent: params.trackEvent }); } diff --git a/extensions/slack/src/monitor/events/interactions.block-actions.ts b/extensions/slack/src/monitor/events/interactions.block-actions.ts index c337c42ffaf..82f9561659f 100644 --- a/extensions/slack/src/monitor/events/interactions.block-actions.ts +++ b/extensions/slack/src/monitor/events/interactions.block-actions.ts @@ -720,6 +720,7 @@ async function updateSlackLegacyBlockAction(params: { async function handleSlackBlockAction(params: { ctx: SlackMonitorContext; + trackEvent?: () => void; args: SlackActionMiddlewareArgs; formatSystemEvent: (payload: Record) => string; }): Promise { @@ -737,6 +738,7 @@ async function handleSlackBlockAction(params: { if (!parsed) { return; } + params.trackEvent?.(); const auth = await authorizeSlackBlockAction({ ctx: params.ctx, parsed, @@ -788,6 +790,7 @@ async function handleSlackBlockAction(params: { export function registerSlackBlockActionHandler(params: { ctx: SlackMonitorContext; + trackEvent?: () => void; formatSystemEvent: (payload: Record) => string; }): void { if (typeof params.ctx.app.action !== "function") { @@ -796,6 +799,7 @@ export function registerSlackBlockActionHandler(params: { params.ctx.app.action(/.+/, async (args: SlackActionMiddlewareArgs) => { await handleSlackBlockAction({ ctx: params.ctx, + trackEvent: params.trackEvent, args, formatSystemEvent: params.formatSystemEvent, }); diff --git a/extensions/slack/src/monitor/events/interactions.modal.ts b/extensions/slack/src/monitor/events/interactions.modal.ts index 9e579be94b9..fab8a7aa5ba 100644 --- a/extensions/slack/src/monitor/events/interactions.modal.ts +++ b/extensions/slack/src/monitor/events/interactions.modal.ts @@ -238,6 +238,7 @@ export function registerModalLifecycleHandler(params: { register: RegisterSlackModalHandler; matcher: RegExp; ctx: SlackMonitorContext; + trackEvent?: () => void; interactionType: SlackModalInteractionKind; contextPrefix: SlackInteractionContextPrefix; summarizeViewState: (values: unknown) => ModalInputSummary[]; @@ -251,6 +252,7 @@ export function registerModalLifecycleHandler(params: { ); return; } + params.trackEvent?.(); await emitSlackModalLifecycleEvent({ ctx: params.ctx, body: body as SlackModalBody, diff --git a/extensions/slack/src/monitor/events/interactions.test.ts b/extensions/slack/src/monitor/events/interactions.test.ts index add1c41fa81..a9b8236e852 100644 --- a/extensions/slack/src/monitor/events/interactions.test.ts +++ b/extensions/slack/src/monitor/events/interactions.test.ts @@ -225,7 +225,8 @@ describe("registerSlackInteractionEvents", () => { it("enqueues structured events and updates button rows", async () => { const { ctx, app, getHandler, resolveSessionKey } = createContext(); - registerSlackInteractionEvents({ ctx: ctx as never }); + const trackEvent = vi.fn(); + registerSlackInteractionEvents({ ctx: ctx as never, trackEvent }); const handler = getHandler(); expect(handler).toBeTruthy(); @@ -296,6 +297,7 @@ describe("registerSlackInteractionEvents", () => { channelType: "channel", senderId: "U123", }); + expect(trackEvent).toHaveBeenCalledTimes(1); expect(app.client.chat.update).toHaveBeenCalledTimes(1); }); @@ -1414,7 +1416,8 @@ describe("registerSlackInteractionEvents", () => { it("captures modal submissions and enqueues view submission event", async () => { enqueueSystemEventMock.mockClear(); const { ctx, getViewHandler, resolveSessionKey } = createContext(); - registerSlackInteractionEvents({ ctx: ctx as never }); + const trackEvent = vi.fn(); + registerSlackInteractionEvents({ ctx: ctx as never, trackEvent }); const viewHandler = getViewHandler(); expect(viewHandler).toBeTruthy(); @@ -1508,6 +1511,7 @@ describe("registerSlackInteractionEvents", () => { expect.objectContaining({ actionId: "notes_input", inputValue: "ship now" }), ]), ); + expect(trackEvent).toHaveBeenCalledTimes(1); }); it("blocks modal events when private metadata userId does not match submitter", async () => { @@ -1857,7 +1861,8 @@ describe("registerSlackInteractionEvents", () => { it("captures modal close events and enqueues view closed event", async () => { enqueueSystemEventMock.mockClear(); const { ctx, getViewClosedHandler, resolveSessionKey } = createContext(); - registerSlackInteractionEvents({ ctx: ctx as never }); + const trackEvent = vi.fn(); + registerSlackInteractionEvents({ ctx: ctx as never, trackEvent }); const viewClosedHandler = getViewClosedHandler(); expect(viewClosedHandler).toBeTruthy(); @@ -1937,6 +1942,7 @@ describe("registerSlackInteractionEvents", () => { expect.objectContaining({ actionId: "env_select", selectedValues: ["canary"] }), ]), ); + expect(trackEvent).toHaveBeenCalledTimes(1); expect(options.sessionKey).toBe("agent:main:slack:channel:C99"); }); diff --git a/extensions/slack/src/monitor/events/interactions.ts b/extensions/slack/src/monitor/events/interactions.ts index bcce09d42a2..6cdfa3f7c66 100644 --- a/extensions/slack/src/monitor/events/interactions.ts +++ b/extensions/slack/src/monitor/events/interactions.ts @@ -175,10 +175,14 @@ function summarizeViewState(values: unknown): ModalInputSummary[] { return entries; } -export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContext }) { - const { ctx } = params; +export function registerSlackInteractionEvents(params: { + ctx: SlackMonitorContext; + trackEvent?: () => void; +}) { + const { ctx, trackEvent } = params; registerSlackBlockActionHandler({ ctx, + trackEvent, formatSystemEvent: formatSlackInteractionSystemEvent, }); @@ -192,6 +196,7 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex register: (matcher, handler) => ctx.app.view(matcher, handler), matcher: modalMatcher, ctx, + trackEvent, interactionType: "view_submission", contextPrefix: "slack:interaction:view", summarizeViewState, @@ -212,6 +217,7 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex register: viewClosed, matcher: modalMatcher, ctx, + trackEvent, interactionType: "view_closed", contextPrefix: "slack:interaction:view-closed", summarizeViewState, diff --git a/extensions/slack/src/monitor/provider.interop.test.ts b/extensions/slack/src/monitor/provider.interop.test.ts index 9cb3ace4cea..59754d30978 100644 --- a/extensions/slack/src/monitor/provider.interop.test.ts +++ b/extensions/slack/src/monitor/provider.interop.test.ts @@ -4,12 +4,14 @@ import { __testing } from "./provider.js"; describe("resolveSlackBoltInterop", () => { function FakeApp() {} function FakeHTTPReceiver() {} + function FakeSocketModeReceiver() {} it("uses the default import when it already exposes named exports", () => { const resolved = __testing.resolveSlackBoltInterop({ defaultImport: { App: FakeApp, HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }, namespaceImport: {}, }); @@ -17,6 +19,7 @@ describe("resolveSlackBoltInterop", () => { expect(resolved).toEqual({ App: FakeApp, HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }); }); @@ -26,6 +29,7 @@ describe("resolveSlackBoltInterop", () => { default: { App: FakeApp, HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }, }, namespaceImport: {}, @@ -34,6 +38,7 @@ describe("resolveSlackBoltInterop", () => { expect(resolved).toEqual({ App: FakeApp, HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }); }); @@ -42,12 +47,14 @@ describe("resolveSlackBoltInterop", () => { defaultImport: FakeApp, namespaceImport: { HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }, }); expect(resolved).toEqual({ App: FakeApp, HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }); }); @@ -58,6 +65,7 @@ describe("resolveSlackBoltInterop", () => { default: { App: FakeApp, HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }, }, }); @@ -65,6 +73,7 @@ describe("resolveSlackBoltInterop", () => { expect(resolved).toEqual({ App: FakeApp, HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }); }); @@ -74,12 +83,14 @@ describe("resolveSlackBoltInterop", () => { namespaceImport: { App: FakeApp, HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }, }); expect(resolved).toEqual({ App: FakeApp, HTTPReceiver: FakeHTTPReceiver, + SocketModeReceiver: FakeSocketModeReceiver, }); }); @@ -92,3 +103,88 @@ describe("resolveSlackBoltInterop", () => { ).toThrow("Unable to resolve @slack/bolt App/HTTPReceiver exports"); }); }); + +describe("createSlackBoltApp", () => { + class FakeApp { + args: Record; + + constructor(args: Record) { + this.args = args; + } + } + + class FakeHTTPReceiver { + args: Record; + + constructor(args: Record) { + this.args = args; + } + } + + class FakeSocketModeReceiver { + args: Record; + + constructor(args: Record) { + this.args = args; + } + } + + it("uses SocketModeReceiver with OpenClaw-owned reconnects and shared client options", () => { + const clientOptions = { teamId: "T1" }; + const { app, receiver } = __testing.createSlackBoltApp({ + interop: { + App: FakeApp as never, + HTTPReceiver: FakeHTTPReceiver as never, + SocketModeReceiver: FakeSocketModeReceiver as never, + }, + slackMode: "socket", + botToken: "xoxb-test", + appToken: "xapp-test", + slackWebhookPath: "/slack/events", + clientOptions, + }); + + expect(receiver).toBeInstanceOf(FakeSocketModeReceiver); + expect((receiver as unknown as FakeSocketModeReceiver).args).toEqual({ + appToken: "xapp-test", + autoReconnectEnabled: false, + installerOptions: { + clientOptions, + }, + }); + expect(app).toBeInstanceOf(FakeApp); + expect((app as unknown as FakeApp).args).toEqual({ + token: "xoxb-test", + receiver, + clientOptions, + }); + }); + + it("uses HTTPReceiver for webhook mode", () => { + const clientOptions = { teamId: "T1" }; + const { app, receiver } = __testing.createSlackBoltApp({ + interop: { + App: FakeApp as never, + HTTPReceiver: FakeHTTPReceiver as never, + SocketModeReceiver: FakeSocketModeReceiver as never, + }, + slackMode: "http", + botToken: "xoxb-test", + signingSecret: "secret", + slackWebhookPath: "/slack/events", + clientOptions, + }); + + expect(receiver).toBeInstanceOf(FakeHTTPReceiver); + expect((receiver as unknown as FakeHTTPReceiver).args).toEqual({ + signingSecret: "secret", + endpoints: "/slack/events", + }); + expect(app).toBeInstanceOf(FakeApp); + expect((app as unknown as FakeApp).args).toEqual({ + token: "xoxb-test", + receiver, + clientOptions, + }); + }); +}); diff --git a/extensions/slack/src/monitor/provider.reconnect.test.ts b/extensions/slack/src/monitor/provider.reconnect.test.ts index f43c657edfc..517a66fa3a9 100644 --- a/extensions/slack/src/monitor/provider.reconnect.test.ts +++ b/extensions/slack/src/monitor/provider.reconnect.test.ts @@ -22,7 +22,7 @@ class FakeEmitter { } describe("slack socket reconnect helpers", () => { - it("seeds event liveness when socket mode connects", () => { + it("marks socket mode healthy without seeding event liveness on connect", () => { const setStatus = vi.fn(); __testing.publishSlackConnectedStatus(setStatus); @@ -32,13 +32,16 @@ describe("slack socket reconnect helpers", () => { expect.objectContaining({ connected: true, lastConnectedAt: expect.any(Number), - lastEventAt: expect.any(Number), + healthState: "healthy", lastError: null, }), ); + expect(setStatus).not.toHaveBeenCalledWith( + expect.objectContaining({ lastEventAt: expect.any(Number) }), + ); }); - it("clears connected state when socket mode disconnects", () => { + it("marks socket mode disconnected when an error closes the socket", () => { const setStatus = vi.fn(); const err = new Error("dns down"); @@ -47,6 +50,7 @@ describe("slack socket reconnect helpers", () => { expect(setStatus).toHaveBeenCalledTimes(1); expect(setStatus).toHaveBeenCalledWith({ connected: false, + healthState: "disconnected", lastDisconnect: { at: expect.any(Number), error: "dns down", @@ -55,7 +59,7 @@ describe("slack socket reconnect helpers", () => { }); }); - it("clears connected state without error when socket mode disconnects cleanly", () => { + it("marks socket mode disconnected without error when the socket closes cleanly", () => { const setStatus = vi.fn(); __testing.publishSlackDisconnectedStatus(setStatus); @@ -63,6 +67,7 @@ describe("slack socket reconnect helpers", () => { expect(setStatus).toHaveBeenCalledTimes(1); expect(setStatus).toHaveBeenCalledWith({ connected: false, + healthState: "disconnected", lastDisconnect: { at: expect.any(Number), }, @@ -91,6 +96,27 @@ describe("slack socket reconnect helpers", () => { await expect(waiter).resolves.toEqual({ event: "error", error: err }); }); + it("installs the disconnect waiter before socket start completes", async () => { + const client = new FakeEmitter(); + const app = { + receiver: { client }, + start: vi.fn().mockImplementation(async () => { + client.emit("disconnected"); + }), + }; + const onStarted = vi.fn(); + + await expect( + __testing.startSlackSocketAndWaitForDisconnect({ + app: app as never, + onStarted, + }), + ).resolves.toEqual({ event: "disconnect" }); + + expect(app.start).toHaveBeenCalledTimes(1); + expect(onStarted).toHaveBeenCalledTimes(1); + }); + it("preserves error payload from unable_to_socket_mode_start event", async () => { const client = new FakeEmitter(); const app = { receiver: { client } }; diff --git a/extensions/slack/src/monitor/provider.ts b/extensions/slack/src/monitor/provider.ts index 52324ebd428..db0161b8295 100644 --- a/extensions/slack/src/monitor/provider.ts +++ b/extensions/slack/src/monitor/provider.ts @@ -10,7 +10,6 @@ import { import { CHANNEL_APPROVAL_NATIVE_RUNTIME_CONTEXT_CAPABILITY } from "openclaw/plugin-sdk/approval-handler-adapter-runtime"; import { registerChannelRuntimeContext } from "openclaw/plugin-sdk/channel-runtime-context"; import type { SessionScope } from "openclaw/plugin-sdk/config-runtime"; -import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime"; import { DEFAULT_GROUP_HISTORY_LIMIT } from "openclaw/plugin-sdk/reply-history"; import { normalizeMainKey } from "openclaw/plugin-sdk/routing"; import { warn } from "openclaw/plugin-sdk/runtime-env"; @@ -56,9 +55,11 @@ import type { MonitorSlackOpts } from "./types.js"; type SlackAppConstructor = typeof import("@slack/bolt").App; type SlackHttpReceiverConstructor = typeof import("@slack/bolt").HTTPReceiver; +type SlackSocketModeReceiverConstructor = typeof import("@slack/bolt").SocketModeReceiver; type SlackBoltResolvedExports = { App: SlackAppConstructor; HTTPReceiver: SlackHttpReceiverConstructor; + SocketModeReceiver: SlackSocketModeReceiverConstructor; }; type SlackSocketShutdownClient = { shuttingDown?: boolean; @@ -78,15 +79,18 @@ function resolveSlackBoltModule(value: unknown): SlackBoltResolvedExports | null } const app = Reflect.get(value, "App"); const httpReceiver = Reflect.get(value, "HTTPReceiver"); + const socketModeReceiver = Reflect.get(value, "SocketModeReceiver"); if ( !isConstructorFunction(app) || - !isConstructorFunction(httpReceiver) + !isConstructorFunction(httpReceiver) || + !isConstructorFunction(socketModeReceiver) ) { return null; } return { App: app, HTTPReceiver: httpReceiver, + SocketModeReceiver: socketModeReceiver, }; } @@ -107,6 +111,10 @@ function resolveSlackBoltInterop(params: { namespaceImport && typeof namespaceImport === "object" ? Reflect.get(namespaceImport, "HTTPReceiver") : undefined; + const namespaceSocketModeReceiver = + namespaceImport && typeof namespaceImport === "object" + ? Reflect.get(namespaceImport, "SocketModeReceiver") + : undefined; const directModule = resolveSlackBoltModule(defaultImport) ?? resolveSlackBoltModule(nestedDefault) ?? @@ -117,11 +125,13 @@ function resolveSlackBoltInterop(params: { } if ( isConstructorFunction(defaultImport) && - isConstructorFunction(namespaceReceiver) + isConstructorFunction(namespaceReceiver) && + isConstructorFunction(namespaceSocketModeReceiver) ) { return { App: defaultImport, HTTPReceiver: namespaceReceiver, + SocketModeReceiver: namespaceSocketModeReceiver, }; } throw new TypeError("Unable to resolve @slack/bolt App/HTTPReceiver exports"); @@ -157,7 +167,9 @@ function publishSlackConnectedStatus(setStatus?: (next: Record) } const now = Date.now(); setStatus({ - ...createConnectedChannelStatusPatch(now), + connected: true, + lastConnectedAt: now, + healthState: "healthy", lastError: null, }); } @@ -173,11 +185,80 @@ function publishSlackDisconnectedStatus( const message = error ? formatUnknownError(error) : undefined; setStatus({ connected: false, + healthState: "disconnected", lastDisconnect: message ? { at, error: message } : { at }, lastError: message ?? null, }); } +function createSlackBoltApp(params: { + interop: SlackBoltResolvedExports; + slackMode: "socket" | "http"; + botToken: string; + appToken?: string; + signingSecret?: string; + slackWebhookPath: string; + clientOptions: Record; +}) { + const receiver = + params.slackMode === "socket" + ? new params.interop.SocketModeReceiver({ + appToken: params.appToken ?? "", + autoReconnectEnabled: false, + installerOptions: { + clientOptions: params.clientOptions, + }, + }) + : new params.interop.HTTPReceiver({ + signingSecret: params.signingSecret ?? "", + endpoints: params.slackWebhookPath, + }); + const app = new params.interop.App({ + token: params.botToken, + receiver, + clientOptions: params.clientOptions, + }); + return { app, receiver }; +} + +function createSlackSocketDisconnectWaiter(app: unknown, abortSignal?: AbortSignal) { + const waiterAbortController = new AbortController(); + const relayAbort = () => waiterAbortController.abort(); + abortSignal?.addEventListener("abort", relayAbort, { once: true }); + return { + promise: waitForSlackSocketDisconnect(app, waiterAbortController.signal), + cancel: () => { + waiterAbortController.abort(); + abortSignal?.removeEventListener("abort", relayAbort); + }, + complete: () => { + abortSignal?.removeEventListener("abort", relayAbort); + }, + }; +} + +async function startSlackSocketAndWaitForDisconnect(params: { + app: { start: () => unknown }; + abortSignal?: AbortSignal; + onStarted?: () => void; +}) { + const disconnectWaiter = createSlackSocketDisconnectWaiter(params.app, params.abortSignal); + try { + await Promise.resolve(params.app.start()); + } catch (err) { + disconnectWaiter.cancel(); + throw err; + } + if (params.abortSignal?.aborted) { + disconnectWaiter.cancel(); + return null; + } + params.onStarted?.(); + const disconnect = await disconnectWaiter.promise; + disconnectWaiter.complete(); + return disconnect; +} + function resolveSlackSocketShutdownClient(app: unknown): SlackSocketShutdownClient | undefined { if (!app || typeof app !== "object") { return undefined; @@ -325,30 +406,16 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const typingReaction = slackCfg.typingReaction?.trim() ?? ""; const mediaMaxBytes = (opts.mediaMaxMb ?? slackCfg.mediaMaxMb ?? 20) * 1024 * 1024; const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false; - const { App, HTTPReceiver } = getSlackBoltInterop(); - - const receiver = - slackMode === "http" - ? new HTTPReceiver({ - signingSecret: signingSecret ?? "", - endpoints: slackWebhookPath, - }) - : null; const clientOptions = resolveSlackWebClientOptions(); - const app = new App( - slackMode === "socket" - ? { - token: botToken, - appToken, - socketMode: true, - clientOptions, - } - : { - token: botToken, - receiver: receiver ?? undefined, - clientOptions, - }, - ); + const { app, receiver } = createSlackBoltApp({ + interop: getSlackBoltInterop(), + slackMode, + botToken, + appToken: appToken ?? undefined, + signingSecret: signingSecret ?? undefined, + slackWebhookPath, + clientOptions: clientOptions as Record, + }); // Pre-set shuttingDown on the SocketModeClient before app.stop() to prevent // a race where the library's internal ping timeout fires disconnect() before @@ -361,6 +428,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const slackHttpHandler = slackMode === "http" && receiver ? async (req: IncomingMessage, res: ServerResponse) => { + const httpReceiver = receiver as InstanceType; const guard = installRequestBodyLimitGuard(req, res, { maxBytes: SLACK_WEBHOOK_MAX_BODY_BYTES, timeoutMs: SLACK_WEBHOOK_BODY_TIMEOUT_MS, @@ -370,7 +438,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { return; } try { - await Promise.resolve(receiver.requestListener(req, res)); + await Promise.resolve(httpReceiver.requestListener(req, res)); } catch (err) { if (!guard.isTripped()) { throw err; @@ -470,7 +538,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { } registerSlackMonitorEvents({ ctx, account, handleSlackMessage, trackEvent }); - await registerSlackMonitorSlashCommands({ ctx, account }); + await registerSlackMonitorSlashCommands({ ctx, account, trackEvent }); if (slackMode === "http" && slackHttpHandler) { unregisterHttpHandler = registerSlackHttpHandler({ path: slackWebhookPath, @@ -592,10 +660,55 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { let reconnectAttempts = 0; while (!opts.abortSignal?.aborted) { try { - await app.start(); - reconnectAttempts = 0; - publishSlackConnectedStatus(opts.setStatus); - runtime.log?.("slack socket mode connected"); + const disconnect = await startSlackSocketAndWaitForDisconnect({ + app, + abortSignal: opts.abortSignal, + onStarted: () => { + reconnectAttempts = 0; + publishSlackConnectedStatus(opts.setStatus); + runtime.log?.("slack socket mode connected"); + }, + }); + if (!disconnect) { + break; + } + if (opts.abortSignal?.aborted) { + break; + } + publishSlackDisconnectedStatus(opts.setStatus, disconnect.error); + + // Bail immediately on non-recoverable auth errors during reconnect too. + if (disconnect.error && isNonRecoverableSlackAuthError(disconnect.error)) { + runtime.error?.( + `slack socket mode disconnected due to non-recoverable auth error — skipping channel (${formatUnknownError(disconnect.error)})`, + ); + throw disconnect.error instanceof Error + ? disconnect.error + : new Error(formatUnknownError(disconnect.error)); + } + + reconnectAttempts += 1; + if ( + SLACK_SOCKET_RECONNECT_POLICY.maxAttempts > 0 && + reconnectAttempts >= SLACK_SOCKET_RECONNECT_POLICY.maxAttempts + ) { + throw new Error( + `Slack socket mode reconnect max attempts reached (${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts}) after ${disconnect.event}`, + ); + } + + const delayMs = computeBackoff(SLACK_SOCKET_RECONNECT_POLICY, reconnectAttempts); + runtime.error?.( + `slack socket disconnected (${disconnect.event}). retry ${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts || "∞"} in ${Math.round(delayMs / 1000)}s${ + disconnect.error ? ` (${formatUnknownError(disconnect.error)})` : "" + }`, + ); + await gracefulStop(); + try { + await sleepWithAbort(delayMs, opts.abortSignal); + } catch { + break; + } } catch (err) { // Auth errors (account_inactive, invalid_auth, etc.) are permanent — // retrying will never succeed and blocks the entire gateway. Fail fast. @@ -623,49 +736,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { } continue; } - - if (opts.abortSignal?.aborted) { - break; - } - - const disconnect = await waitForSlackSocketDisconnect(app, opts.abortSignal); - if (opts.abortSignal?.aborted) { - break; - } - publishSlackDisconnectedStatus(opts.setStatus, disconnect.error); - - // Bail immediately on non-recoverable auth errors during reconnect too. - if (disconnect.error && isNonRecoverableSlackAuthError(disconnect.error)) { - runtime.error?.( - `slack socket mode disconnected due to non-recoverable auth error — skipping channel (${formatUnknownError(disconnect.error)})`, - ); - throw disconnect.error instanceof Error - ? disconnect.error - : new Error(formatUnknownError(disconnect.error)); - } - - reconnectAttempts += 1; - if ( - SLACK_SOCKET_RECONNECT_POLICY.maxAttempts > 0 && - reconnectAttempts >= SLACK_SOCKET_RECONNECT_POLICY.maxAttempts - ) { - throw new Error( - `Slack socket mode reconnect max attempts reached (${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts}) after ${disconnect.event}`, - ); - } - - const delayMs = computeBackoff(SLACK_SOCKET_RECONNECT_POLICY, reconnectAttempts); - runtime.error?.( - `slack socket disconnected (${disconnect.event}). retry ${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts || "∞"} in ${Math.round(delayMs / 1000)}s${ - disconnect.error ? ` (${formatUnknownError(disconnect.error)})` : "" - }`, - ); - await gracefulStop(); - try { - await sleepWithAbort(delayMs, opts.abortSignal); - } catch { - break; - } } } else { runtime.log?.(`slack http mode listening at ${slackWebhookPath}`); @@ -698,6 +768,9 @@ export const __testing = { resolveSlackRuntimeGroupPolicy: resolveOpenProviderRuntimeGroupPolicy, resolveDefaultGroupPolicy, resolveSlackBoltInterop, + createSlackBoltApp, + createSlackSocketDisconnectWaiter, + startSlackSocketAndWaitForDisconnect, getSocketEmitter, waitForSlackSocketDisconnect, }; diff --git a/extensions/slack/src/monitor/slash.test.ts b/extensions/slack/src/monitor/slash.test.ts index b4f8eb77ee0..f57ff470863 100644 --- a/extensions/slack/src/monitor/slash.test.ts +++ b/extensions/slack/src/monitor/slash.test.ts @@ -190,8 +190,12 @@ beforeEach(() => { resetSlackSlashMocks(); }); -async function registerCommands(ctx: unknown, account: unknown) { - await registerSlackMonitorSlashCommands({ ctx: ctx as never, account: account as never }); +async function registerCommands(ctx: unknown, account: unknown, trackEvent?: () => void) { + await registerSlackMonitorSlashCommands({ + ctx: ctx as never, + account: account as never, + trackEvent, + } as never); } function encodeValue(parts: { command: string; arg: string; value: string; userId: string }) { @@ -569,6 +573,17 @@ describe("Slack native command argument menus", () => { expect(call.ctx?.Body).toBe("/usage tokens"); }); + it("tracks accepted slash command activity", async () => { + const trackingHarness = createArgMenusHarness(); + const trackEvent = vi.fn(); + await registerCommands(trackingHarness.ctx, trackingHarness.account, trackEvent); + const usageTrackingHandler = requireHandler(trackingHarness.commands, "/usage", "/usage"); + + await runCommandHandler(usageTrackingHandler); + + expect(trackEvent).toHaveBeenCalledTimes(1); + }); + it("maps /agentstatus to /status when dispatching", async () => { await runCommandHandler(agentStatusHandler); expectSingleDispatchedSlashBody("/status"); @@ -639,6 +654,36 @@ describe("Slack native command argument menus", () => { expect(optionTexts.some((text) => text.includes("Period 12"))).toBe(true); }); + it("tracks accepted external_select option requests", async () => { + const trackingHarness = createArgMenusHarness(); + const trackEvent = vi.fn(); + await registerCommands(trackingHarness.ctx, trackingHarness.account, trackEvent); + const reportExternalTrackingHandler = requireHandler( + trackingHarness.commands, + "/reportexternal", + "/reportexternal", + ); + const argMenuOptionsTrackingHandler = requireHandler( + trackingHarness.options, + "openclaw_cmdarg", + "arg-menu options", + ); + const { blockId } = await runCommandAndResolveActionsBlock(reportExternalTrackingHandler); + const ackOptions = vi.fn().mockResolvedValue(undefined); + trackEvent.mockClear(); + + await argMenuOptionsTrackingHandler({ + ack: ackOptions, + body: { + user: { id: "U1" }, + value: "period 12", + actions: [{ block_id: blockId }], + }, + }); + + expect(trackEvent).toHaveBeenCalledTimes(1); + }); + it("rejects external_select option requests without user identity", async () => { const { blockId } = await runCommandAndResolveActionsBlock(reportExternalHandler); expect(blockId).toContain("openclaw_cmdarg_ext:"); @@ -672,6 +717,25 @@ describe("Slack native command argument menus", () => { }); }); + it("tracks accepted arg-menu actions", async () => { + const trackingHarness = createArgMenusHarness(); + const trackEvent = vi.fn(); + await registerCommands(trackingHarness.ctx, trackingHarness.account, trackEvent); + const argMenuTrackingHandler = requireHandler( + trackingHarness.actions, + /^openclaw_cmdarg/, + "arg-menu action", + ); + + await runArgMenuAction(argMenuTrackingHandler, { + action: { + value: encodeValue({ command: "usage", arg: "mode", value: "tokens", userId: "U1" }), + }, + }); + + expect(trackEvent).toHaveBeenCalledTimes(1); + }); + it("falls back to postEphemeral with token when respond is unavailable", async () => { await runArgMenuAction(argMenuHandler, { action: { value: "garbage" }, diff --git a/extensions/slack/src/monitor/slash.ts b/extensions/slack/src/monitor/slash.ts index e29689f6e8b..a9449432b0e 100644 --- a/extensions/slack/src/monitor/slash.ts +++ b/extensions/slack/src/monitor/slash.ts @@ -282,8 +282,9 @@ function buildSlackCommandArgMenuBlocks(params: { export async function registerSlackMonitorSlashCommands(params: { ctx: SlackMonitorContext; account: ResolvedSlackAccount; + trackEvent?: () => void; }): Promise { - const { ctx, account } = params; + const { ctx, account, trackEvent } = params; const cfg = ctx.cfg; const runtime = ctx.runtime; @@ -313,6 +314,7 @@ export async function registerSlackMonitorSlashCommands(params: { ); return; } + trackEvent?.(); if (!prompt.trim()) { await ack({ text: "Message required.", @@ -768,6 +770,7 @@ export async function registerSlackMonitorSlashCommands(params: { runtime.log?.("slack: drop slash arg options payload (mismatched app/team)"); return; } + trackEvent?.(); const typedBody = body as { value?: string; user?: { id?: string }; diff --git a/src/gateway/server-methods/send.test.ts b/src/gateway/server-methods/send.test.ts index 81eb243e40c..1e3ab7ae6a9 100644 --- a/src/gateway/server-methods/send.test.ts +++ b/src/gateway/server-methods/send.test.ts @@ -817,6 +817,135 @@ describe("gateway send mirroring", () => { ); }); + it("updates mirror session keys and delivery thread ids when Slack routing derives a thread", async () => { + mockDeliverySuccess("m-thread-derived"); + mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({ + sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999", + baseSessionKey: "agent:main:slack:channel:c1", + peer: { kind: "channel", id: "c1" }, + chatType: "channel", + from: "slack:channel:C1", + to: "channel:C1", + threadId: "1710000000.9999", + }); + + await runSend({ + to: "channel:C1", + message: "threaded", + channel: "slack", + sessionKey: "agent:main:slack:channel:c1", + idempotencyKey: "idem-thread-derived", + }); + + expect(mocks.ensureOutboundSessionEntry).toHaveBeenCalledWith( + expect.objectContaining({ + route: expect.objectContaining({ + sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999", + baseSessionKey: "agent:main:slack:channel:c1", + threadId: "1710000000.9999", + }), + }), + ); + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + threadId: "1710000000.9999", + mirror: expect.objectContaining({ + sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999", + }), + }), + ); + }); + + it("preserves the provided session when Slack derives a thread for a different base session", async () => { + mockDeliverySuccess("m-thread-mismatch"); + mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({ + sessionKey: "agent:main:slack:channel:c2:thread:1710000000.9999", + baseSessionKey: "agent:main:slack:channel:c2", + peer: { kind: "channel", id: "c2" }, + chatType: "channel", + from: "slack:channel:C2", + to: "channel:C2", + threadId: "1710000000.9999", + }); + + await runSend({ + to: "channel:C2", + message: "threaded", + channel: "slack", + sessionKey: "agent:main:slack:channel:c1", + threadId: "1710000000.9999", + idempotencyKey: "idem-thread-mismatch", + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + threadId: "1710000000.9999", + session: expect.objectContaining({ + key: "agent:main:slack:channel:c1", + }), + mirror: expect.objectContaining({ + sessionKey: "agent:main:slack:channel:c1", + }), + }), + ); + }); + + it("preserves derived thread delivery for existing thread-scoped Slack session keys", async () => { + mockDeliverySuccess("m-thread-session"); + mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({ + sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999", + baseSessionKey: "agent:main:slack:channel:c1", + peer: { kind: "channel", id: "c1" }, + chatType: "channel", + from: "slack:channel:C1", + to: "channel:C1", + threadId: "1710000000.9999", + }); + + await runSend({ + to: "channel:C1", + message: "threaded", + channel: "slack", + sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999", + idempotencyKey: "idem-thread-session", + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + threadId: "1710000000.9999", + session: expect.objectContaining({ + key: "agent:main:slack:channel:c1:thread:1710000000.9999", + }), + }), + ); + }); + + it("preserves numeric derived thread ids for non-Slack channels", async () => { + mockDeliverySuccess("m-topic-derived"); + mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({ + sessionKey: "agent:main:telegram:group:-100123:thread:77", + baseSessionKey: "agent:main:telegram:group:-100123", + peer: { kind: "group", id: "-100123" }, + chatType: "group", + from: "telegram:group:-100123", + to: "channel:-100123", + threadId: 77, + }); + + await runSend({ + to: "-100123:topic:77", + message: "topic message", + channel: "telegram", + idempotencyKey: "idem-topic-derived", + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + threadId: 77, + }), + ); + }); + it("returns invalid request when outbound target resolution fails", async () => { mocks.resolveOutboundTarget.mockReturnValue({ ok: false, diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 330ee489585..6b3ef493e42 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -21,6 +21,7 @@ import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.j import { resolveOutboundTarget } from "../../infra/outbound/targets.js"; import { extractToolPayload } from "../../infra/outbound/tool-payload.js"; import { normalizePollInput } from "../../polls.js"; +import { parseThreadSessionSuffix } from "../../sessions/session-key-utils.js"; import { normalizeOptionalLowercaseString, normalizeOptionalString, @@ -486,13 +487,27 @@ export const sendHandlers: GatewayRequestHandlers = { resolvedTarget: idLikeTarget, threadId, }); + const providedSessionBaseKey = + parseThreadSessionSuffix(providedSessionKey).baseSessionKey ?? providedSessionKey; + const shouldUseDerivedThreadSessionKey = + channel === "slack" && + !!providedSessionKey && + !!normalizeOptionalString(derivedRoute?.threadId) && + normalizeOptionalLowercaseString(derivedRoute?.baseSessionKey) === + normalizeOptionalLowercaseString(providedSessionBaseKey) && + normalizeOptionalLowercaseString(derivedRoute?.sessionKey) !== providedSessionKey; const outboundRoute = derivedRoute ? providedSessionKey - ? { - ...derivedRoute, - sessionKey: providedSessionKey, - baseSessionKey: providedSessionKey, - } + ? shouldUseDerivedThreadSessionKey + ? { + ...derivedRoute, + baseSessionKey: derivedRoute.baseSessionKey ?? providedSessionKey, + } + : { + ...derivedRoute, + sessionKey: providedSessionKey, + baseSessionKey: providedSessionKey, + } : derivedRoute : null; if (outboundRoute) { @@ -517,7 +532,7 @@ export const sendHandlers: GatewayRequestHandlers = { payloads: outboundPayloads, session: outboundSession, gifPlayback: request.gifPlayback, - threadId: threadId ?? null, + threadId: outboundRoute?.threadId ?? threadId ?? null, deps: outboundDeps, gatewayClientScopes: client?.connect?.scopes ?? [], mirror: outboundSessionKey diff --git a/src/plugin-sdk/status-helpers.test.ts b/src/plugin-sdk/status-helpers.test.ts index 45698e51380..ec1335656c5 100644 --- a/src/plugin-sdk/status-helpers.test.ts +++ b/src/plugin-sdk/status-helpers.test.ts @@ -99,6 +99,7 @@ function createComputedStatusAdapter() { { ok: boolean } >({ defaultRuntime: createDefaultChannelRuntimeState("default"), + skipStaleSocketHealthCheck: true, resolveAccountSnapshot: ({ account, runtime, probe }) => ({ accountId: account.accountId, enabled: account.enabled, @@ -118,6 +119,7 @@ function createAsyncStatusAdapter() { { ok: boolean } >({ defaultRuntime: createDefaultChannelRuntimeState("default"), + skipStaleSocketHealthCheck: true, resolveAccountSnapshot: async ({ account, runtime, probe }) => ({ accountId: account.accountId, enabled: account.enabled, @@ -283,6 +285,7 @@ describe("computed account status adapters", () => { "builds account snapshots from $name computed account metadata and extras", async ({ createStatus }) => { const status = createStatus(); + expect(status.skipStaleSocketHealthCheck).toBe(true); await expect( Promise.resolve( status.buildAccountSnapshot?.({ diff --git a/src/plugin-sdk/status-helpers.ts b/src/plugin-sdk/status-helpers.ts index 3d8b4c1a951..54091341eb1 100644 --- a/src/plugin-sdk/status-helpers.ts +++ b/src/plugin-sdk/status-helpers.ts @@ -68,6 +68,7 @@ function buildComputedAccountStatusAdapterBase( ): Omit, "buildAccountSnapshot"> { return { defaultRuntime: options.defaultRuntime, + skipStaleSocketHealthCheck: options.skipStaleSocketHealthCheck, buildChannelSummary: options.buildChannelSummary, probeAccount: options.probeAccount, formatCapabilitiesProbe: options.formatCapabilitiesProbe,