fix: make slack socket health event-driven

This commit is contained in:
Bek
2026-04-21 16:15:11 -04:00
committed by Peter Steinberger
parent da86ce7887
commit cd1977bf16
16 changed files with 565 additions and 96 deletions

View File

@@ -284,6 +284,10 @@ describe("slackPlugin actions", () => {
});
describe("slackPlugin status", () => {
it("opts out of the generic stale socket health check", () => {
expect(slackPlugin.status?.skipStaleSocketHealthCheck).toBe(true);
});
it("uses the direct Slack probe helper when runtime is not initialized", async () => {
const probeSpy = vi.spyOn(probeModule, "probeSlack").mockResolvedValueOnce({
ok: true,
@@ -316,6 +320,26 @@ describe("slackPlugin status", () => {
team: { id: "T1", name: "OpenClaw" },
});
});
it("recovers thread routing from mixed-case Slack session keys", async () => {
const resolveRoute = slackPlugin.messaging?.resolveOutboundSessionRoute;
if (!resolveRoute) {
throw new Error("slack messaging.resolveOutboundSessionRoute unavailable");
}
const route = await resolveRoute({
cfg: {} as OpenClawConfig,
agentId: "main",
target: "channel:C1",
currentSessionKey: "agent:main:slack:channel:C1:thread:1712345678.123456",
});
expect(route).toMatchObject({
sessionKey: "agent:main:slack:channel:c1:thread:1712345678.123456",
baseSessionKey: "agent:main:slack:channel:c1",
threadId: "1712345678.123456",
});
});
});
describe("slackPlugin security", () => {

View File

@@ -254,6 +254,11 @@ vi.mock("@slack/bolt", () => {
const { handlers, client: slackClient } = ensureSlackTestRuntime();
class App {
client = slackClient;
receiver: unknown;
constructor(args?: { receiver?: unknown }) {
this.receiver = args?.receiver;
}
event(name: string, handler: SlackHandler) {
handlers.set(name, handler);
}
@@ -266,5 +271,17 @@ vi.mock("@slack/bolt", () => {
class HTTPReceiver {
requestListener = vi.fn();
}
return { App, HTTPReceiver, default: { App, HTTPReceiver } };
class SocketModeReceiver {
client = {
...slackClient,
on: vi.fn(),
off: vi.fn(),
};
}
return {
App,
HTTPReceiver,
SocketModeReceiver,
default: { App, HTTPReceiver, SocketModeReceiver },
};
});

View File

@@ -23,5 +23,5 @@ export function registerSlackMonitorEvents(params: {
registerSlackMemberEvents({ ctx: params.ctx, trackEvent: params.trackEvent });
registerSlackChannelEvents({ ctx: params.ctx, trackEvent: params.trackEvent });
registerSlackPinEvents({ ctx: params.ctx, trackEvent: params.trackEvent });
registerSlackInteractionEvents({ ctx: params.ctx });
registerSlackInteractionEvents({ ctx: params.ctx, trackEvent: params.trackEvent });
}

View File

@@ -720,6 +720,7 @@ async function updateSlackLegacyBlockAction(params: {
async function handleSlackBlockAction(params: {
ctx: SlackMonitorContext;
trackEvent?: () => void;
args: SlackActionMiddlewareArgs;
formatSystemEvent: (payload: Record<string, unknown>) => string;
}): Promise<void> {
@@ -737,6 +738,7 @@ async function handleSlackBlockAction(params: {
if (!parsed) {
return;
}
params.trackEvent?.();
const auth = await authorizeSlackBlockAction({
ctx: params.ctx,
parsed,
@@ -788,6 +790,7 @@ async function handleSlackBlockAction(params: {
export function registerSlackBlockActionHandler(params: {
ctx: SlackMonitorContext;
trackEvent?: () => void;
formatSystemEvent: (payload: Record<string, unknown>) => string;
}): void {
if (typeof params.ctx.app.action !== "function") {
@@ -796,6 +799,7 @@ export function registerSlackBlockActionHandler(params: {
params.ctx.app.action(/.+/, async (args: SlackActionMiddlewareArgs) => {
await handleSlackBlockAction({
ctx: params.ctx,
trackEvent: params.trackEvent,
args,
formatSystemEvent: params.formatSystemEvent,
});

View File

@@ -238,6 +238,7 @@ export function registerModalLifecycleHandler(params: {
register: RegisterSlackModalHandler;
matcher: RegExp;
ctx: SlackMonitorContext;
trackEvent?: () => void;
interactionType: SlackModalInteractionKind;
contextPrefix: SlackInteractionContextPrefix;
summarizeViewState: (values: unknown) => ModalInputSummary[];
@@ -251,6 +252,7 @@ export function registerModalLifecycleHandler(params: {
);
return;
}
params.trackEvent?.();
await emitSlackModalLifecycleEvent({
ctx: params.ctx,
body: body as SlackModalBody,

View File

@@ -225,7 +225,8 @@ describe("registerSlackInteractionEvents", () => {
it("enqueues structured events and updates button rows", async () => {
const { ctx, app, getHandler, resolveSessionKey } = createContext();
registerSlackInteractionEvents({ ctx: ctx as never });
const trackEvent = vi.fn();
registerSlackInteractionEvents({ ctx: ctx as never, trackEvent });
const handler = getHandler();
expect(handler).toBeTruthy();
@@ -296,6 +297,7 @@ describe("registerSlackInteractionEvents", () => {
channelType: "channel",
senderId: "U123",
});
expect(trackEvent).toHaveBeenCalledTimes(1);
expect(app.client.chat.update).toHaveBeenCalledTimes(1);
});
@@ -1414,7 +1416,8 @@ describe("registerSlackInteractionEvents", () => {
it("captures modal submissions and enqueues view submission event", async () => {
enqueueSystemEventMock.mockClear();
const { ctx, getViewHandler, resolveSessionKey } = createContext();
registerSlackInteractionEvents({ ctx: ctx as never });
const trackEvent = vi.fn();
registerSlackInteractionEvents({ ctx: ctx as never, trackEvent });
const viewHandler = getViewHandler();
expect(viewHandler).toBeTruthy();
@@ -1508,6 +1511,7 @@ describe("registerSlackInteractionEvents", () => {
expect.objectContaining({ actionId: "notes_input", inputValue: "ship now" }),
]),
);
expect(trackEvent).toHaveBeenCalledTimes(1);
});
it("blocks modal events when private metadata userId does not match submitter", async () => {
@@ -1857,7 +1861,8 @@ describe("registerSlackInteractionEvents", () => {
it("captures modal close events and enqueues view closed event", async () => {
enqueueSystemEventMock.mockClear();
const { ctx, getViewClosedHandler, resolveSessionKey } = createContext();
registerSlackInteractionEvents({ ctx: ctx as never });
const trackEvent = vi.fn();
registerSlackInteractionEvents({ ctx: ctx as never, trackEvent });
const viewClosedHandler = getViewClosedHandler();
expect(viewClosedHandler).toBeTruthy();
@@ -1937,6 +1942,7 @@ describe("registerSlackInteractionEvents", () => {
expect.objectContaining({ actionId: "env_select", selectedValues: ["canary"] }),
]),
);
expect(trackEvent).toHaveBeenCalledTimes(1);
expect(options.sessionKey).toBe("agent:main:slack:channel:C99");
});

View File

@@ -175,10 +175,14 @@ function summarizeViewState(values: unknown): ModalInputSummary[] {
return entries;
}
export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContext }) {
const { ctx } = params;
export function registerSlackInteractionEvents(params: {
ctx: SlackMonitorContext;
trackEvent?: () => void;
}) {
const { ctx, trackEvent } = params;
registerSlackBlockActionHandler({
ctx,
trackEvent,
formatSystemEvent: formatSlackInteractionSystemEvent,
});
@@ -192,6 +196,7 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex
register: (matcher, handler) => ctx.app.view(matcher, handler),
matcher: modalMatcher,
ctx,
trackEvent,
interactionType: "view_submission",
contextPrefix: "slack:interaction:view",
summarizeViewState,
@@ -212,6 +217,7 @@ export function registerSlackInteractionEvents(params: { ctx: SlackMonitorContex
register: viewClosed,
matcher: modalMatcher,
ctx,
trackEvent,
interactionType: "view_closed",
contextPrefix: "slack:interaction:view-closed",
summarizeViewState,

View File

@@ -4,12 +4,14 @@ import { __testing } from "./provider.js";
describe("resolveSlackBoltInterop", () => {
function FakeApp() {}
function FakeHTTPReceiver() {}
function FakeSocketModeReceiver() {}
it("uses the default import when it already exposes named exports", () => {
const resolved = __testing.resolveSlackBoltInterop({
defaultImport: {
App: FakeApp,
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
},
namespaceImport: {},
});
@@ -17,6 +19,7 @@ describe("resolveSlackBoltInterop", () => {
expect(resolved).toEqual({
App: FakeApp,
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
});
});
@@ -26,6 +29,7 @@ describe("resolveSlackBoltInterop", () => {
default: {
App: FakeApp,
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
},
},
namespaceImport: {},
@@ -34,6 +38,7 @@ describe("resolveSlackBoltInterop", () => {
expect(resolved).toEqual({
App: FakeApp,
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
});
});
@@ -42,12 +47,14 @@ describe("resolveSlackBoltInterop", () => {
defaultImport: FakeApp,
namespaceImport: {
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
},
});
expect(resolved).toEqual({
App: FakeApp,
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
});
});
@@ -58,6 +65,7 @@ describe("resolveSlackBoltInterop", () => {
default: {
App: FakeApp,
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
},
},
});
@@ -65,6 +73,7 @@ describe("resolveSlackBoltInterop", () => {
expect(resolved).toEqual({
App: FakeApp,
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
});
});
@@ -74,12 +83,14 @@ describe("resolveSlackBoltInterop", () => {
namespaceImport: {
App: FakeApp,
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
},
});
expect(resolved).toEqual({
App: FakeApp,
HTTPReceiver: FakeHTTPReceiver,
SocketModeReceiver: FakeSocketModeReceiver,
});
});
@@ -92,3 +103,88 @@ describe("resolveSlackBoltInterop", () => {
).toThrow("Unable to resolve @slack/bolt App/HTTPReceiver exports");
});
});
describe("createSlackBoltApp", () => {
class FakeApp {
args: Record<string, unknown>;
constructor(args: Record<string, unknown>) {
this.args = args;
}
}
class FakeHTTPReceiver {
args: Record<string, unknown>;
constructor(args: Record<string, unknown>) {
this.args = args;
}
}
class FakeSocketModeReceiver {
args: Record<string, unknown>;
constructor(args: Record<string, unknown>) {
this.args = args;
}
}
it("uses SocketModeReceiver with OpenClaw-owned reconnects and shared client options", () => {
const clientOptions = { teamId: "T1" };
const { app, receiver } = __testing.createSlackBoltApp({
interop: {
App: FakeApp as never,
HTTPReceiver: FakeHTTPReceiver as never,
SocketModeReceiver: FakeSocketModeReceiver as never,
},
slackMode: "socket",
botToken: "xoxb-test",
appToken: "xapp-test",
slackWebhookPath: "/slack/events",
clientOptions,
});
expect(receiver).toBeInstanceOf(FakeSocketModeReceiver);
expect((receiver as unknown as FakeSocketModeReceiver).args).toEqual({
appToken: "xapp-test",
autoReconnectEnabled: false,
installerOptions: {
clientOptions,
},
});
expect(app).toBeInstanceOf(FakeApp);
expect((app as unknown as FakeApp).args).toEqual({
token: "xoxb-test",
receiver,
clientOptions,
});
});
it("uses HTTPReceiver for webhook mode", () => {
const clientOptions = { teamId: "T1" };
const { app, receiver } = __testing.createSlackBoltApp({
interop: {
App: FakeApp as never,
HTTPReceiver: FakeHTTPReceiver as never,
SocketModeReceiver: FakeSocketModeReceiver as never,
},
slackMode: "http",
botToken: "xoxb-test",
signingSecret: "secret",
slackWebhookPath: "/slack/events",
clientOptions,
});
expect(receiver).toBeInstanceOf(FakeHTTPReceiver);
expect((receiver as unknown as FakeHTTPReceiver).args).toEqual({
signingSecret: "secret",
endpoints: "/slack/events",
});
expect(app).toBeInstanceOf(FakeApp);
expect((app as unknown as FakeApp).args).toEqual({
token: "xoxb-test",
receiver,
clientOptions,
});
});
});

View File

@@ -22,7 +22,7 @@ class FakeEmitter {
}
describe("slack socket reconnect helpers", () => {
it("seeds event liveness when socket mode connects", () => {
it("marks socket mode healthy without seeding event liveness on connect", () => {
const setStatus = vi.fn();
__testing.publishSlackConnectedStatus(setStatus);
@@ -32,13 +32,16 @@ describe("slack socket reconnect helpers", () => {
expect.objectContaining({
connected: true,
lastConnectedAt: expect.any(Number),
lastEventAt: expect.any(Number),
healthState: "healthy",
lastError: null,
}),
);
expect(setStatus).not.toHaveBeenCalledWith(
expect.objectContaining({ lastEventAt: expect.any(Number) }),
);
});
it("clears connected state when socket mode disconnects", () => {
it("marks socket mode disconnected when an error closes the socket", () => {
const setStatus = vi.fn();
const err = new Error("dns down");
@@ -47,6 +50,7 @@ describe("slack socket reconnect helpers", () => {
expect(setStatus).toHaveBeenCalledTimes(1);
expect(setStatus).toHaveBeenCalledWith({
connected: false,
healthState: "disconnected",
lastDisconnect: {
at: expect.any(Number),
error: "dns down",
@@ -55,7 +59,7 @@ describe("slack socket reconnect helpers", () => {
});
});
it("clears connected state without error when socket mode disconnects cleanly", () => {
it("marks socket mode disconnected without error when the socket closes cleanly", () => {
const setStatus = vi.fn();
__testing.publishSlackDisconnectedStatus(setStatus);
@@ -63,6 +67,7 @@ describe("slack socket reconnect helpers", () => {
expect(setStatus).toHaveBeenCalledTimes(1);
expect(setStatus).toHaveBeenCalledWith({
connected: false,
healthState: "disconnected",
lastDisconnect: {
at: expect.any(Number),
},
@@ -91,6 +96,27 @@ describe("slack socket reconnect helpers", () => {
await expect(waiter).resolves.toEqual({ event: "error", error: err });
});
it("installs the disconnect waiter before socket start completes", async () => {
const client = new FakeEmitter();
const app = {
receiver: { client },
start: vi.fn().mockImplementation(async () => {
client.emit("disconnected");
}),
};
const onStarted = vi.fn();
await expect(
__testing.startSlackSocketAndWaitForDisconnect({
app: app as never,
onStarted,
}),
).resolves.toEqual({ event: "disconnect" });
expect(app.start).toHaveBeenCalledTimes(1);
expect(onStarted).toHaveBeenCalledTimes(1);
});
it("preserves error payload from unable_to_socket_mode_start event", async () => {
const client = new FakeEmitter();
const app = { receiver: { client } };

View File

@@ -10,7 +10,6 @@ import {
import { CHANNEL_APPROVAL_NATIVE_RUNTIME_CONTEXT_CAPABILITY } from "openclaw/plugin-sdk/approval-handler-adapter-runtime";
import { registerChannelRuntimeContext } from "openclaw/plugin-sdk/channel-runtime-context";
import type { SessionScope } from "openclaw/plugin-sdk/config-runtime";
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
import { DEFAULT_GROUP_HISTORY_LIMIT } from "openclaw/plugin-sdk/reply-history";
import { normalizeMainKey } from "openclaw/plugin-sdk/routing";
import { warn } from "openclaw/plugin-sdk/runtime-env";
@@ -56,9 +55,11 @@ import type { MonitorSlackOpts } from "./types.js";
type SlackAppConstructor = typeof import("@slack/bolt").App;
type SlackHttpReceiverConstructor = typeof import("@slack/bolt").HTTPReceiver;
type SlackSocketModeReceiverConstructor = typeof import("@slack/bolt").SocketModeReceiver;
type SlackBoltResolvedExports = {
App: SlackAppConstructor;
HTTPReceiver: SlackHttpReceiverConstructor;
SocketModeReceiver: SlackSocketModeReceiverConstructor;
};
type SlackSocketShutdownClient = {
shuttingDown?: boolean;
@@ -78,15 +79,18 @@ function resolveSlackBoltModule(value: unknown): SlackBoltResolvedExports | null
}
const app = Reflect.get(value, "App");
const httpReceiver = Reflect.get(value, "HTTPReceiver");
const socketModeReceiver = Reflect.get(value, "SocketModeReceiver");
if (
!isConstructorFunction<SlackAppConstructor>(app) ||
!isConstructorFunction<SlackHttpReceiverConstructor>(httpReceiver)
!isConstructorFunction<SlackHttpReceiverConstructor>(httpReceiver) ||
!isConstructorFunction<SlackSocketModeReceiverConstructor>(socketModeReceiver)
) {
return null;
}
return {
App: app,
HTTPReceiver: httpReceiver,
SocketModeReceiver: socketModeReceiver,
};
}
@@ -107,6 +111,10 @@ function resolveSlackBoltInterop(params: {
namespaceImport && typeof namespaceImport === "object"
? Reflect.get(namespaceImport, "HTTPReceiver")
: undefined;
const namespaceSocketModeReceiver =
namespaceImport && typeof namespaceImport === "object"
? Reflect.get(namespaceImport, "SocketModeReceiver")
: undefined;
const directModule =
resolveSlackBoltModule(defaultImport) ??
resolveSlackBoltModule(nestedDefault) ??
@@ -117,11 +125,13 @@ function resolveSlackBoltInterop(params: {
}
if (
isConstructorFunction<SlackAppConstructor>(defaultImport) &&
isConstructorFunction<SlackHttpReceiverConstructor>(namespaceReceiver)
isConstructorFunction<SlackHttpReceiverConstructor>(namespaceReceiver) &&
isConstructorFunction<SlackSocketModeReceiverConstructor>(namespaceSocketModeReceiver)
) {
return {
App: defaultImport,
HTTPReceiver: namespaceReceiver,
SocketModeReceiver: namespaceSocketModeReceiver,
};
}
throw new TypeError("Unable to resolve @slack/bolt App/HTTPReceiver exports");
@@ -157,7 +167,9 @@ function publishSlackConnectedStatus(setStatus?: (next: Record<string, unknown>)
}
const now = Date.now();
setStatus({
...createConnectedChannelStatusPatch(now),
connected: true,
lastConnectedAt: now,
healthState: "healthy",
lastError: null,
});
}
@@ -173,11 +185,80 @@ function publishSlackDisconnectedStatus(
const message = error ? formatUnknownError(error) : undefined;
setStatus({
connected: false,
healthState: "disconnected",
lastDisconnect: message ? { at, error: message } : { at },
lastError: message ?? null,
});
}
function createSlackBoltApp(params: {
interop: SlackBoltResolvedExports;
slackMode: "socket" | "http";
botToken: string;
appToken?: string;
signingSecret?: string;
slackWebhookPath: string;
clientOptions: Record<string, unknown>;
}) {
const receiver =
params.slackMode === "socket"
? new params.interop.SocketModeReceiver({
appToken: params.appToken ?? "",
autoReconnectEnabled: false,
installerOptions: {
clientOptions: params.clientOptions,
},
})
: new params.interop.HTTPReceiver({
signingSecret: params.signingSecret ?? "",
endpoints: params.slackWebhookPath,
});
const app = new params.interop.App({
token: params.botToken,
receiver,
clientOptions: params.clientOptions,
});
return { app, receiver };
}
function createSlackSocketDisconnectWaiter(app: unknown, abortSignal?: AbortSignal) {
const waiterAbortController = new AbortController();
const relayAbort = () => waiterAbortController.abort();
abortSignal?.addEventListener("abort", relayAbort, { once: true });
return {
promise: waitForSlackSocketDisconnect(app, waiterAbortController.signal),
cancel: () => {
waiterAbortController.abort();
abortSignal?.removeEventListener("abort", relayAbort);
},
complete: () => {
abortSignal?.removeEventListener("abort", relayAbort);
},
};
}
async function startSlackSocketAndWaitForDisconnect(params: {
app: { start: () => unknown };
abortSignal?: AbortSignal;
onStarted?: () => void;
}) {
const disconnectWaiter = createSlackSocketDisconnectWaiter(params.app, params.abortSignal);
try {
await Promise.resolve(params.app.start());
} catch (err) {
disconnectWaiter.cancel();
throw err;
}
if (params.abortSignal?.aborted) {
disconnectWaiter.cancel();
return null;
}
params.onStarted?.();
const disconnect = await disconnectWaiter.promise;
disconnectWaiter.complete();
return disconnect;
}
function resolveSlackSocketShutdownClient(app: unknown): SlackSocketShutdownClient | undefined {
if (!app || typeof app !== "object") {
return undefined;
@@ -325,30 +406,16 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const typingReaction = slackCfg.typingReaction?.trim() ?? "";
const mediaMaxBytes = (opts.mediaMaxMb ?? slackCfg.mediaMaxMb ?? 20) * 1024 * 1024;
const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false;
const { App, HTTPReceiver } = getSlackBoltInterop();
const receiver =
slackMode === "http"
? new HTTPReceiver({
signingSecret: signingSecret ?? "",
endpoints: slackWebhookPath,
})
: null;
const clientOptions = resolveSlackWebClientOptions();
const app = new App(
slackMode === "socket"
? {
token: botToken,
appToken,
socketMode: true,
clientOptions,
}
: {
token: botToken,
receiver: receiver ?? undefined,
clientOptions,
},
);
const { app, receiver } = createSlackBoltApp({
interop: getSlackBoltInterop(),
slackMode,
botToken,
appToken: appToken ?? undefined,
signingSecret: signingSecret ?? undefined,
slackWebhookPath,
clientOptions: clientOptions as Record<string, unknown>,
});
// Pre-set shuttingDown on the SocketModeClient before app.stop() to prevent
// a race where the library's internal ping timeout fires disconnect() before
@@ -361,6 +428,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const slackHttpHandler =
slackMode === "http" && receiver
? async (req: IncomingMessage, res: ServerResponse) => {
const httpReceiver = receiver as InstanceType<SlackHttpReceiverConstructor>;
const guard = installRequestBodyLimitGuard(req, res, {
maxBytes: SLACK_WEBHOOK_MAX_BODY_BYTES,
timeoutMs: SLACK_WEBHOOK_BODY_TIMEOUT_MS,
@@ -370,7 +438,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
return;
}
try {
await Promise.resolve(receiver.requestListener(req, res));
await Promise.resolve(httpReceiver.requestListener(req, res));
} catch (err) {
if (!guard.isTripped()) {
throw err;
@@ -470,7 +538,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
}
registerSlackMonitorEvents({ ctx, account, handleSlackMessage, trackEvent });
await registerSlackMonitorSlashCommands({ ctx, account });
await registerSlackMonitorSlashCommands({ ctx, account, trackEvent });
if (slackMode === "http" && slackHttpHandler) {
unregisterHttpHandler = registerSlackHttpHandler({
path: slackWebhookPath,
@@ -592,10 +660,55 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
let reconnectAttempts = 0;
while (!opts.abortSignal?.aborted) {
try {
await app.start();
reconnectAttempts = 0;
publishSlackConnectedStatus(opts.setStatus);
runtime.log?.("slack socket mode connected");
const disconnect = await startSlackSocketAndWaitForDisconnect({
app,
abortSignal: opts.abortSignal,
onStarted: () => {
reconnectAttempts = 0;
publishSlackConnectedStatus(opts.setStatus);
runtime.log?.("slack socket mode connected");
},
});
if (!disconnect) {
break;
}
if (opts.abortSignal?.aborted) {
break;
}
publishSlackDisconnectedStatus(opts.setStatus, disconnect.error);
// Bail immediately on non-recoverable auth errors during reconnect too.
if (disconnect.error && isNonRecoverableSlackAuthError(disconnect.error)) {
runtime.error?.(
`slack socket mode disconnected due to non-recoverable auth error — skipping channel (${formatUnknownError(disconnect.error)})`,
);
throw disconnect.error instanceof Error
? disconnect.error
: new Error(formatUnknownError(disconnect.error));
}
reconnectAttempts += 1;
if (
SLACK_SOCKET_RECONNECT_POLICY.maxAttempts > 0 &&
reconnectAttempts >= SLACK_SOCKET_RECONNECT_POLICY.maxAttempts
) {
throw new Error(
`Slack socket mode reconnect max attempts reached (${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts}) after ${disconnect.event}`,
);
}
const delayMs = computeBackoff(SLACK_SOCKET_RECONNECT_POLICY, reconnectAttempts);
runtime.error?.(
`slack socket disconnected (${disconnect.event}). retry ${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts || "∞"} in ${Math.round(delayMs / 1000)}s${
disconnect.error ? ` (${formatUnknownError(disconnect.error)})` : ""
}`,
);
await gracefulStop();
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch {
break;
}
} catch (err) {
// Auth errors (account_inactive, invalid_auth, etc.) are permanent —
// retrying will never succeed and blocks the entire gateway. Fail fast.
@@ -623,49 +736,6 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
}
continue;
}
if (opts.abortSignal?.aborted) {
break;
}
const disconnect = await waitForSlackSocketDisconnect(app, opts.abortSignal);
if (opts.abortSignal?.aborted) {
break;
}
publishSlackDisconnectedStatus(opts.setStatus, disconnect.error);
// Bail immediately on non-recoverable auth errors during reconnect too.
if (disconnect.error && isNonRecoverableSlackAuthError(disconnect.error)) {
runtime.error?.(
`slack socket mode disconnected due to non-recoverable auth error — skipping channel (${formatUnknownError(disconnect.error)})`,
);
throw disconnect.error instanceof Error
? disconnect.error
: new Error(formatUnknownError(disconnect.error));
}
reconnectAttempts += 1;
if (
SLACK_SOCKET_RECONNECT_POLICY.maxAttempts > 0 &&
reconnectAttempts >= SLACK_SOCKET_RECONNECT_POLICY.maxAttempts
) {
throw new Error(
`Slack socket mode reconnect max attempts reached (${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts}) after ${disconnect.event}`,
);
}
const delayMs = computeBackoff(SLACK_SOCKET_RECONNECT_POLICY, reconnectAttempts);
runtime.error?.(
`slack socket disconnected (${disconnect.event}). retry ${reconnectAttempts}/${SLACK_SOCKET_RECONNECT_POLICY.maxAttempts || "∞"} in ${Math.round(delayMs / 1000)}s${
disconnect.error ? ` (${formatUnknownError(disconnect.error)})` : ""
}`,
);
await gracefulStop();
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch {
break;
}
}
} else {
runtime.log?.(`slack http mode listening at ${slackWebhookPath}`);
@@ -698,6 +768,9 @@ export const __testing = {
resolveSlackRuntimeGroupPolicy: resolveOpenProviderRuntimeGroupPolicy,
resolveDefaultGroupPolicy,
resolveSlackBoltInterop,
createSlackBoltApp,
createSlackSocketDisconnectWaiter,
startSlackSocketAndWaitForDisconnect,
getSocketEmitter,
waitForSlackSocketDisconnect,
};

View File

@@ -190,8 +190,12 @@ beforeEach(() => {
resetSlackSlashMocks();
});
async function registerCommands(ctx: unknown, account: unknown) {
await registerSlackMonitorSlashCommands({ ctx: ctx as never, account: account as never });
async function registerCommands(ctx: unknown, account: unknown, trackEvent?: () => void) {
await registerSlackMonitorSlashCommands({
ctx: ctx as never,
account: account as never,
trackEvent,
} as never);
}
function encodeValue(parts: { command: string; arg: string; value: string; userId: string }) {
@@ -569,6 +573,17 @@ describe("Slack native command argument menus", () => {
expect(call.ctx?.Body).toBe("/usage tokens");
});
it("tracks accepted slash command activity", async () => {
const trackingHarness = createArgMenusHarness();
const trackEvent = vi.fn();
await registerCommands(trackingHarness.ctx, trackingHarness.account, trackEvent);
const usageTrackingHandler = requireHandler(trackingHarness.commands, "/usage", "/usage");
await runCommandHandler(usageTrackingHandler);
expect(trackEvent).toHaveBeenCalledTimes(1);
});
it("maps /agentstatus to /status when dispatching", async () => {
await runCommandHandler(agentStatusHandler);
expectSingleDispatchedSlashBody("/status");
@@ -639,6 +654,36 @@ describe("Slack native command argument menus", () => {
expect(optionTexts.some((text) => text.includes("Period 12"))).toBe(true);
});
it("tracks accepted external_select option requests", async () => {
const trackingHarness = createArgMenusHarness();
const trackEvent = vi.fn();
await registerCommands(trackingHarness.ctx, trackingHarness.account, trackEvent);
const reportExternalTrackingHandler = requireHandler(
trackingHarness.commands,
"/reportexternal",
"/reportexternal",
);
const argMenuOptionsTrackingHandler = requireHandler(
trackingHarness.options,
"openclaw_cmdarg",
"arg-menu options",
);
const { blockId } = await runCommandAndResolveActionsBlock(reportExternalTrackingHandler);
const ackOptions = vi.fn().mockResolvedValue(undefined);
trackEvent.mockClear();
await argMenuOptionsTrackingHandler({
ack: ackOptions,
body: {
user: { id: "U1" },
value: "period 12",
actions: [{ block_id: blockId }],
},
});
expect(trackEvent).toHaveBeenCalledTimes(1);
});
it("rejects external_select option requests without user identity", async () => {
const { blockId } = await runCommandAndResolveActionsBlock(reportExternalHandler);
expect(blockId).toContain("openclaw_cmdarg_ext:");
@@ -672,6 +717,25 @@ describe("Slack native command argument menus", () => {
});
});
it("tracks accepted arg-menu actions", async () => {
const trackingHarness = createArgMenusHarness();
const trackEvent = vi.fn();
await registerCommands(trackingHarness.ctx, trackingHarness.account, trackEvent);
const argMenuTrackingHandler = requireHandler(
trackingHarness.actions,
/^openclaw_cmdarg/,
"arg-menu action",
);
await runArgMenuAction(argMenuTrackingHandler, {
action: {
value: encodeValue({ command: "usage", arg: "mode", value: "tokens", userId: "U1" }),
},
});
expect(trackEvent).toHaveBeenCalledTimes(1);
});
it("falls back to postEphemeral with token when respond is unavailable", async () => {
await runArgMenuAction(argMenuHandler, {
action: { value: "garbage" },

View File

@@ -282,8 +282,9 @@ function buildSlackCommandArgMenuBlocks(params: {
export async function registerSlackMonitorSlashCommands(params: {
ctx: SlackMonitorContext;
account: ResolvedSlackAccount;
trackEvent?: () => void;
}): Promise<void> {
const { ctx, account } = params;
const { ctx, account, trackEvent } = params;
const cfg = ctx.cfg;
const runtime = ctx.runtime;
@@ -313,6 +314,7 @@ export async function registerSlackMonitorSlashCommands(params: {
);
return;
}
trackEvent?.();
if (!prompt.trim()) {
await ack({
text: "Message required.",
@@ -768,6 +770,7 @@ export async function registerSlackMonitorSlashCommands(params: {
runtime.log?.("slack: drop slash arg options payload (mismatched app/team)");
return;
}
trackEvent?.();
const typedBody = body as {
value?: string;
user?: { id?: string };

View File

@@ -817,6 +817,135 @@ describe("gateway send mirroring", () => {
);
});
it("updates mirror session keys and delivery thread ids when Slack routing derives a thread", async () => {
mockDeliverySuccess("m-thread-derived");
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({
sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999",
baseSessionKey: "agent:main:slack:channel:c1",
peer: { kind: "channel", id: "c1" },
chatType: "channel",
from: "slack:channel:C1",
to: "channel:C1",
threadId: "1710000000.9999",
});
await runSend({
to: "channel:C1",
message: "threaded",
channel: "slack",
sessionKey: "agent:main:slack:channel:c1",
idempotencyKey: "idem-thread-derived",
});
expect(mocks.ensureOutboundSessionEntry).toHaveBeenCalledWith(
expect.objectContaining({
route: expect.objectContaining({
sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999",
baseSessionKey: "agent:main:slack:channel:c1",
threadId: "1710000000.9999",
}),
}),
);
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
threadId: "1710000000.9999",
mirror: expect.objectContaining({
sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999",
}),
}),
);
});
it("preserves the provided session when Slack derives a thread for a different base session", async () => {
mockDeliverySuccess("m-thread-mismatch");
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({
sessionKey: "agent:main:slack:channel:c2:thread:1710000000.9999",
baseSessionKey: "agent:main:slack:channel:c2",
peer: { kind: "channel", id: "c2" },
chatType: "channel",
from: "slack:channel:C2",
to: "channel:C2",
threadId: "1710000000.9999",
});
await runSend({
to: "channel:C2",
message: "threaded",
channel: "slack",
sessionKey: "agent:main:slack:channel:c1",
threadId: "1710000000.9999",
idempotencyKey: "idem-thread-mismatch",
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
threadId: "1710000000.9999",
session: expect.objectContaining({
key: "agent:main:slack:channel:c1",
}),
mirror: expect.objectContaining({
sessionKey: "agent:main:slack:channel:c1",
}),
}),
);
});
it("preserves derived thread delivery for existing thread-scoped Slack session keys", async () => {
mockDeliverySuccess("m-thread-session");
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({
sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999",
baseSessionKey: "agent:main:slack:channel:c1",
peer: { kind: "channel", id: "c1" },
chatType: "channel",
from: "slack:channel:C1",
to: "channel:C1",
threadId: "1710000000.9999",
});
await runSend({
to: "channel:C1",
message: "threaded",
channel: "slack",
sessionKey: "agent:main:slack:channel:c1:thread:1710000000.9999",
idempotencyKey: "idem-thread-session",
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
threadId: "1710000000.9999",
session: expect.objectContaining({
key: "agent:main:slack:channel:c1:thread:1710000000.9999",
}),
}),
);
});
it("preserves numeric derived thread ids for non-Slack channels", async () => {
mockDeliverySuccess("m-topic-derived");
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({
sessionKey: "agent:main:telegram:group:-100123:thread:77",
baseSessionKey: "agent:main:telegram:group:-100123",
peer: { kind: "group", id: "-100123" },
chatType: "group",
from: "telegram:group:-100123",
to: "channel:-100123",
threadId: 77,
});
await runSend({
to: "-100123:topic:77",
message: "topic message",
channel: "telegram",
idempotencyKey: "idem-topic-derived",
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
threadId: 77,
}),
);
});
it("returns invalid request when outbound target resolution fails", async () => {
mocks.resolveOutboundTarget.mockReturnValue({
ok: false,

View File

@@ -21,6 +21,7 @@ import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.j
import { resolveOutboundTarget } from "../../infra/outbound/targets.js";
import { extractToolPayload } from "../../infra/outbound/tool-payload.js";
import { normalizePollInput } from "../../polls.js";
import { parseThreadSessionSuffix } from "../../sessions/session-key-utils.js";
import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
@@ -486,13 +487,27 @@ export const sendHandlers: GatewayRequestHandlers = {
resolvedTarget: idLikeTarget,
threadId,
});
const providedSessionBaseKey =
parseThreadSessionSuffix(providedSessionKey).baseSessionKey ?? providedSessionKey;
const shouldUseDerivedThreadSessionKey =
channel === "slack" &&
!!providedSessionKey &&
!!normalizeOptionalString(derivedRoute?.threadId) &&
normalizeOptionalLowercaseString(derivedRoute?.baseSessionKey) ===
normalizeOptionalLowercaseString(providedSessionBaseKey) &&
normalizeOptionalLowercaseString(derivedRoute?.sessionKey) !== providedSessionKey;
const outboundRoute = derivedRoute
? providedSessionKey
? {
...derivedRoute,
sessionKey: providedSessionKey,
baseSessionKey: providedSessionKey,
}
? shouldUseDerivedThreadSessionKey
? {
...derivedRoute,
baseSessionKey: derivedRoute.baseSessionKey ?? providedSessionKey,
}
: {
...derivedRoute,
sessionKey: providedSessionKey,
baseSessionKey: providedSessionKey,
}
: derivedRoute
: null;
if (outboundRoute) {
@@ -517,7 +532,7 @@ export const sendHandlers: GatewayRequestHandlers = {
payloads: outboundPayloads,
session: outboundSession,
gifPlayback: request.gifPlayback,
threadId: threadId ?? null,
threadId: outboundRoute?.threadId ?? threadId ?? null,
deps: outboundDeps,
gatewayClientScopes: client?.connect?.scopes ?? [],
mirror: outboundSessionKey

View File

@@ -99,6 +99,7 @@ function createComputedStatusAdapter() {
{ ok: boolean }
>({
defaultRuntime: createDefaultChannelRuntimeState("default"),
skipStaleSocketHealthCheck: true,
resolveAccountSnapshot: ({ account, runtime, probe }) => ({
accountId: account.accountId,
enabled: account.enabled,
@@ -118,6 +119,7 @@ function createAsyncStatusAdapter() {
{ ok: boolean }
>({
defaultRuntime: createDefaultChannelRuntimeState("default"),
skipStaleSocketHealthCheck: true,
resolveAccountSnapshot: async ({ account, runtime, probe }) => ({
accountId: account.accountId,
enabled: account.enabled,
@@ -283,6 +285,7 @@ describe("computed account status adapters", () => {
"builds account snapshots from $name computed account metadata and extras",
async ({ createStatus }) => {
const status = createStatus();
expect(status.skipStaleSocketHealthCheck).toBe(true);
await expect(
Promise.resolve(
status.buildAccountSnapshot?.({

View File

@@ -68,6 +68,7 @@ function buildComputedAccountStatusAdapterBase<ResolvedAccount, Probe, Audit>(
): Omit<ChannelStatusAdapter<ResolvedAccount, Probe, Audit>, "buildAccountSnapshot"> {
return {
defaultRuntime: options.defaultRuntime,
skipStaleSocketHealthCheck: options.skipStaleSocketHealthCheck,
buildChannelSummary: options.buildChannelSummary,
probeAccount: options.probeAccount,
formatCapabilitiesProbe: options.formatCapabilitiesProbe,