diff --git a/extensions/slack/src/channel.ts b/extensions/slack/src/channel.ts index 2f6555cf6ca..0720a71a338 100644 --- a/extensions/slack/src/channel.ts +++ b/extensions/slack/src/channel.ts @@ -427,6 +427,8 @@ export const slackPlugin: ChannelPlugin = { abortSignal: ctx.abortSignal, mediaMaxMb: account.config.mediaMaxMb, slashCommand: account.config.slashCommand, + setStatus: ctx.setStatus as (next: Record) => void, + getStatus: ctx.getStatus as () => Record, }); }, }, diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts index 6a919efa653..22f1e565f8c 100644 --- a/src/gateway/channel-health-monitor.test.ts +++ b/src/gateway/channel-health-monitor.test.ts @@ -306,4 +306,110 @@ describe("channel-health-monitor", () => { expect(manager.stopChannel).not.toHaveBeenCalled(); monitor.stop(); }); + + describe("stale socket detection", () => { + const STALE_THRESHOLD = 30 * 60_000; + + it("restarts a channel with no events past the stale threshold", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + slack: { + default: { + running: true, + connected: true, + enabled: true, + configured: true, + lastStartAt: now - STALE_THRESHOLD - 60_000, + lastEventAt: now - STALE_THRESHOLD - 30_000, + }, + }, + }); + const monitor = await startAndRunCheck(manager); + expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("slack", "default"); + monitor.stop(); + }); + + it("skips channels with recent events", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + slack: { + default: { + running: true, + connected: true, + enabled: true, + configured: true, + lastStartAt: now - STALE_THRESHOLD - 60_000, + lastEventAt: now - 5_000, + }, + }, + }); + const monitor = await startAndRunCheck(manager); + expect(manager.stopChannel).not.toHaveBeenCalled(); + expect(manager.startChannel).not.toHaveBeenCalled(); + monitor.stop(); + }); + + it("skips channels still within the startup grace window for stale detection", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + slack: { + default: { + running: true, + connected: true, + enabled: true, + configured: true, + lastStartAt: now - 5_000, + lastEventAt: null, + }, + }, + }); + const monitor = await startAndRunCheck(manager); + expect(manager.stopChannel).not.toHaveBeenCalled(); + expect(manager.startChannel).not.toHaveBeenCalled(); + monitor.stop(); + }); + + it("restarts a channel that never received any event past the stale threshold", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + slack: { + default: { + running: true, + connected: true, + enabled: true, + configured: true, + lastStartAt: now - STALE_THRESHOLD - 60_000, + }, + }, + }); + const monitor = await startAndRunCheck(manager); + expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("slack", "default"); + monitor.stop(); + }); + + it("respects custom staleEventThresholdMs", async () => { + const customThreshold = 10 * 60_000; + const now = Date.now(); + const manager = createSnapshotManager({ + slack: { + default: { + running: true, + connected: true, + enabled: true, + configured: true, + lastStartAt: now - customThreshold - 60_000, + lastEventAt: now - customThreshold - 30_000, + }, + }, + }); + const monitor = await startAndRunCheck(manager, { + staleEventThresholdMs: customThreshold, + }); + expect(manager.stopChannel).toHaveBeenCalledWith("slack", "default"); + expect(manager.startChannel).toHaveBeenCalledWith("slack", "default"); + monitor.stop(); + }); + }); }); diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts index 980f652ea39..5d841f73688 100644 --- a/src/gateway/channel-health-monitor.ts +++ b/src/gateway/channel-health-monitor.ts @@ -10,12 +10,21 @@ const DEFAULT_COOLDOWN_CYCLES = 2; const DEFAULT_MAX_RESTARTS_PER_HOUR = 3; const ONE_HOUR_MS = 60 * 60_000; +/** + * How long a connected channel can go without receiving any event before + * the health monitor treats it as a "stale socket" and triggers a restart. + * This catches the half-dead WebSocket scenario where the connection appears + * alive (health checks pass) but Slack silently stops delivering events. + */ +const DEFAULT_STALE_EVENT_THRESHOLD_MS = 30 * 60_000; + export type ChannelHealthMonitorDeps = { channelManager: ChannelManager; checkIntervalMs?: number; startupGraceMs?: number; cooldownCycles?: number; maxRestartsPerHour?: number; + staleEventThresholdMs?: number; abortSignal?: AbortSignal; }; @@ -32,12 +41,17 @@ function isManagedAccount(snapshot: { enabled?: boolean; configured?: boolean }) return snapshot.enabled !== false && snapshot.configured !== false; } -function isChannelHealthy(snapshot: { - running?: boolean; - connected?: boolean; - enabled?: boolean; - configured?: boolean; -}): boolean { +function isChannelHealthy( + snapshot: { + running?: boolean; + connected?: boolean; + enabled?: boolean; + configured?: boolean; + lastEventAt?: number | null; + lastStartAt?: number | null; + }, + opts: { now: number; staleEventThresholdMs: number }, +): boolean { if (!isManagedAccount(snapshot)) { return true; } @@ -47,6 +61,22 @@ function isChannelHealthy(snapshot: { if (snapshot.connected === false) { return false; } + + // Stale socket detection: if the channel has been running long enough + // (past the stale threshold) and we have never received an event, or the + // last event was received longer ago than the threshold, treat as unhealthy. + if (snapshot.lastEventAt != null || snapshot.lastStartAt != null) { + const upSince = snapshot.lastStartAt ?? 0; + const upDuration = opts.now - upSince; + if (upDuration > opts.staleEventThresholdMs) { + const lastEvent = snapshot.lastEventAt ?? 0; + const eventAge = opts.now - lastEvent; + if (eventAge > opts.staleEventThresholdMs) { + return false; + } + } + } + return true; } @@ -57,6 +87,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann startupGraceMs = DEFAULT_STARTUP_GRACE_MS, cooldownCycles = DEFAULT_COOLDOWN_CYCLES, maxRestartsPerHour = DEFAULT_MAX_RESTARTS_PER_HOUR, + staleEventThresholdMs = DEFAULT_STALE_EVENT_THRESHOLD_MS, abortSignal, } = deps; @@ -101,7 +132,7 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) { continue; } - if (isChannelHealthy(status)) { + if (isChannelHealthy(status, { now, staleEventThresholdMs })) { continue; } @@ -123,11 +154,19 @@ export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): Chann continue; } + const isStaleSocket = + status.running && + status.connected !== false && + status.lastEventAt != null && + now - (status.lastEventAt ?? 0) > staleEventThresholdMs; + const reason = !status.running ? status.reconnectAttempts && status.reconnectAttempts >= 10 ? "gave-up" : "stopped" - : "stuck"; + : isStaleSocket + ? "stale-socket" + : "stuck"; log.info?.(`[${channelId}:${accountId}] health-monitor: restarting (reason: ${reason})`); diff --git a/src/slack/monitor/events.ts b/src/slack/monitor/events.ts index 851028e6461..778ca9d83ca 100644 --- a/src/slack/monitor/events.ts +++ b/src/slack/monitor/events.ts @@ -12,14 +12,16 @@ export function registerSlackMonitorEvents(params: { ctx: SlackMonitorContext; account: ResolvedSlackAccount; handleSlackMessage: SlackMessageHandler; + /** Called on each inbound event to update liveness tracking. */ + trackEvent?: () => void; }) { registerSlackMessageEvents({ ctx: params.ctx, handleSlackMessage: params.handleSlackMessage, }); - registerSlackReactionEvents({ ctx: params.ctx }); - registerSlackMemberEvents({ ctx: params.ctx }); - registerSlackChannelEvents({ ctx: params.ctx }); - registerSlackPinEvents({ ctx: params.ctx }); + registerSlackReactionEvents({ ctx: params.ctx, trackEvent: params.trackEvent }); + registerSlackMemberEvents({ ctx: params.ctx, trackEvent: params.trackEvent }); + registerSlackChannelEvents({ ctx: params.ctx, trackEvent: params.trackEvent }); + registerSlackPinEvents({ ctx: params.ctx, trackEvent: params.trackEvent }); registerSlackInteractionEvents({ ctx: params.ctx }); } diff --git a/src/slack/monitor/events/channels.test.ts b/src/slack/monitor/events/channels.test.ts new file mode 100644 index 00000000000..1c4bec094d2 --- /dev/null +++ b/src/slack/monitor/events/channels.test.ts @@ -0,0 +1,67 @@ +import { describe, expect, it, vi } from "vitest"; +import { registerSlackChannelEvents } from "./channels.js"; +import { createSlackSystemEventTestHarness } from "./system-event-test-harness.js"; + +const enqueueSystemEventMock = vi.fn(); + +vi.mock("../../../infra/system-events.js", () => ({ + enqueueSystemEvent: (...args: unknown[]) => enqueueSystemEventMock(...args), +})); + +type SlackChannelHandler = (args: { + event: Record; + body: unknown; +}) => Promise; + +function createChannelContext(params?: { + trackEvent?: () => void; + shouldDropMismatchedSlackEvent?: (body: unknown) => boolean; +}) { + const harness = createSlackSystemEventTestHarness(); + if (params?.shouldDropMismatchedSlackEvent) { + harness.ctx.shouldDropMismatchedSlackEvent = params.shouldDropMismatchedSlackEvent; + } + registerSlackChannelEvents({ ctx: harness.ctx, trackEvent: params?.trackEvent }); + return { + getCreatedHandler: () => harness.getHandler("channel_created") as SlackChannelHandler | null, + }; +} + +describe("registerSlackChannelEvents", () => { + it("does not track mismatched events", async () => { + const trackEvent = vi.fn(); + const { getCreatedHandler } = createChannelContext({ + trackEvent, + shouldDropMismatchedSlackEvent: () => true, + }); + const createdHandler = getCreatedHandler(); + expect(createdHandler).toBeTruthy(); + + await createdHandler!({ + event: { + channel: { id: "C1", name: "general" }, + }, + body: { api_app_id: "A_OTHER" }, + }); + + expect(trackEvent).not.toHaveBeenCalled(); + expect(enqueueSystemEventMock).not.toHaveBeenCalled(); + }); + + it("tracks accepted events", async () => { + const trackEvent = vi.fn(); + const { getCreatedHandler } = createChannelContext({ trackEvent }); + const createdHandler = getCreatedHandler(); + expect(createdHandler).toBeTruthy(); + + await createdHandler!({ + event: { + channel: { id: "C1", name: "general" }, + }, + body: {}, + }); + + expect(trackEvent).toHaveBeenCalledTimes(1); + expect(enqueueSystemEventMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/slack/monitor/events/channels.ts b/src/slack/monitor/events/channels.ts index 962f2655b77..3241eda41fd 100644 --- a/src/slack/monitor/events/channels.ts +++ b/src/slack/monitor/events/channels.ts @@ -12,8 +12,11 @@ import type { SlackChannelRenamedEvent, } from "../types.js"; -export function registerSlackChannelEvents(params: { ctx: SlackMonitorContext }) { - const { ctx } = params; +export function registerSlackChannelEvents(params: { + ctx: SlackMonitorContext; + trackEvent?: () => void; +}) { + const { ctx, trackEvent } = params; const enqueueChannelSystemEvent = (params: { kind: "created" | "renamed"; @@ -51,6 +54,7 @@ export function registerSlackChannelEvents(params: { ctx: SlackMonitorContext }) if (ctx.shouldDropMismatchedSlackEvent(body)) { return; } + trackEvent?.(); const payload = event as SlackChannelCreatedEvent; const channelId = payload.channel?.id; @@ -69,6 +73,7 @@ export function registerSlackChannelEvents(params: { ctx: SlackMonitorContext }) if (ctx.shouldDropMismatchedSlackEvent(body)) { return; } + trackEvent?.(); const payload = event as SlackChannelRenamedEvent; const channelId = payload.channel?.id; @@ -87,6 +92,7 @@ export function registerSlackChannelEvents(params: { ctx: SlackMonitorContext }) if (ctx.shouldDropMismatchedSlackEvent(body)) { return; } + trackEvent?.(); const payload = event as SlackChannelIdChangedEvent; const oldChannelId = payload.old_channel_id; diff --git a/src/slack/monitor/events/members.test.ts b/src/slack/monitor/events/members.test.ts index bc9c6805aaa..d476a492e6e 100644 --- a/src/slack/monitor/events/members.test.ts +++ b/src/slack/monitor/events/members.test.ts @@ -21,9 +21,16 @@ type SlackMemberHandler = (args: { body: unknown; }) => Promise; -function createMembersContext(overrides?: SlackSystemEventTestOverrides) { - const harness = createSlackSystemEventTestHarness(overrides); - registerSlackMemberEvents({ ctx: harness.ctx }); +function createMembersContext(params?: { + overrides?: SlackSystemEventTestOverrides; + trackEvent?: () => void; + shouldDropMismatchedSlackEvent?: (body: unknown) => boolean; +}) { + const harness = createSlackSystemEventTestHarness(params?.overrides); + if (params?.shouldDropMismatchedSlackEvent) { + harness.ctx.shouldDropMismatchedSlackEvent = params.shouldDropMismatchedSlackEvent; + } + registerSlackMemberEvents({ ctx: harness.ctx, trackEvent: params?.trackEvent }); return { getJoinedHandler: () => harness.getHandler("member_joined_channel") as SlackMemberHandler | null, @@ -44,7 +51,7 @@ describe("registerSlackMemberEvents", () => { it("enqueues DM member events when dmPolicy is open", async () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); - const { getJoinedHandler } = createMembersContext({ dmPolicy: "open" }); + const { getJoinedHandler } = createMembersContext({ overrides: { dmPolicy: "open" } }); const joinedHandler = getJoinedHandler(); expect(joinedHandler).toBeTruthy(); @@ -59,7 +66,7 @@ describe("registerSlackMemberEvents", () => { it("blocks DM member events when dmPolicy is disabled", async () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); - const { getJoinedHandler } = createMembersContext({ dmPolicy: "disabled" }); + const { getJoinedHandler } = createMembersContext({ overrides: { dmPolicy: "disabled" } }); const joinedHandler = getJoinedHandler(); expect(joinedHandler).toBeTruthy(); @@ -75,8 +82,7 @@ describe("registerSlackMemberEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getJoinedHandler } = createMembersContext({ - dmPolicy: "allowlist", - allowFrom: ["U2"], + overrides: { dmPolicy: "allowlist", allowFrom: ["U2"] }, }); const joinedHandler = getJoinedHandler(); expect(joinedHandler).toBeTruthy(); @@ -93,8 +99,7 @@ describe("registerSlackMemberEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getLeftHandler } = createMembersContext({ - dmPolicy: "allowlist", - allowFrom: ["U1"], + overrides: { dmPolicy: "allowlist", allowFrom: ["U1"] }, }); const leftHandler = getLeftHandler(); expect(leftHandler).toBeTruthy(); @@ -114,9 +119,11 @@ describe("registerSlackMemberEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getJoinedHandler } = createMembersContext({ - dmPolicy: "open", - channelType: "channel", - channelUsers: ["U_OWNER"], + overrides: { + dmPolicy: "open", + channelType: "channel", + channelUsers: ["U_OWNER"], + }, }); const joinedHandler = getJoinedHandler(); expect(joinedHandler).toBeTruthy(); @@ -128,4 +135,35 @@ describe("registerSlackMemberEvents", () => { expect(enqueueSystemEventMock).not.toHaveBeenCalled(); }); + + it("does not track mismatched events", async () => { + const trackEvent = vi.fn(); + const { getJoinedHandler } = createMembersContext({ + trackEvent, + shouldDropMismatchedSlackEvent: () => true, + }); + const joinedHandler = getJoinedHandler(); + expect(joinedHandler).toBeTruthy(); + + await joinedHandler!({ + event: makeMemberEvent(), + body: { api_app_id: "A_OTHER" }, + }); + + expect(trackEvent).not.toHaveBeenCalled(); + }); + + it("tracks accepted member events", async () => { + const trackEvent = vi.fn(); + const { getJoinedHandler } = createMembersContext({ trackEvent }); + const joinedHandler = getJoinedHandler(); + expect(joinedHandler).toBeTruthy(); + + await joinedHandler!({ + event: makeMemberEvent(), + body: {}, + }); + + expect(trackEvent).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/slack/monitor/events/members.ts b/src/slack/monitor/events/members.ts index ca7907706d2..27dd2968a66 100644 --- a/src/slack/monitor/events/members.ts +++ b/src/slack/monitor/events/members.ts @@ -5,8 +5,11 @@ import type { SlackMonitorContext } from "../context.js"; import type { SlackMemberChannelEvent } from "../types.js"; import { authorizeAndResolveSlackSystemEventContext } from "./system-event-context.js"; -export function registerSlackMemberEvents(params: { ctx: SlackMonitorContext }) { - const { ctx } = params; +export function registerSlackMemberEvents(params: { + ctx: SlackMonitorContext; + trackEvent?: () => void; +}) { + const { ctx, trackEvent } = params; const handleMemberChannelEvent = async (params: { verb: "joined" | "left"; @@ -17,6 +20,7 @@ export function registerSlackMemberEvents(params: { ctx: SlackMonitorContext }) if (ctx.shouldDropMismatchedSlackEvent(params.body)) { return; } + trackEvent?.(); const payload = params.event; const channelId = payload.channel; const channelInfo = channelId ? await ctx.resolveChannelName(channelId) : {}; diff --git a/src/slack/monitor/events/pins.test.ts b/src/slack/monitor/events/pins.test.ts index 00c2528bbdb..17b5e50d62e 100644 --- a/src/slack/monitor/events/pins.test.ts +++ b/src/slack/monitor/events/pins.test.ts @@ -18,9 +18,16 @@ vi.mock("../../../pairing/pairing-store.js", () => ({ type SlackPinHandler = (args: { event: Record; body: unknown }) => Promise; -function createPinContext(overrides?: SlackSystemEventTestOverrides) { - const harness = createSlackSystemEventTestHarness(overrides); - registerSlackPinEvents({ ctx: harness.ctx }); +function createPinContext(params?: { + overrides?: SlackSystemEventTestOverrides; + trackEvent?: () => void; + shouldDropMismatchedSlackEvent?: (body: unknown) => boolean; +}) { + const harness = createSlackSystemEventTestHarness(params?.overrides); + if (params?.shouldDropMismatchedSlackEvent) { + harness.ctx.shouldDropMismatchedSlackEvent = params.shouldDropMismatchedSlackEvent; + } + registerSlackPinEvents({ ctx: harness.ctx, trackEvent: params?.trackEvent }); return { getAddedHandler: () => harness.getHandler("pin_added") as SlackPinHandler | null, getRemovedHandler: () => harness.getHandler("pin_removed") as SlackPinHandler | null, @@ -46,7 +53,7 @@ describe("registerSlackPinEvents", () => { it("enqueues DM pin system events when dmPolicy is open", async () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); - const { getAddedHandler } = createPinContext({ dmPolicy: "open" }); + const { getAddedHandler } = createPinContext({ overrides: { dmPolicy: "open" } }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -61,7 +68,7 @@ describe("registerSlackPinEvents", () => { it("blocks DM pin system events when dmPolicy is disabled", async () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); - const { getAddedHandler } = createPinContext({ dmPolicy: "disabled" }); + const { getAddedHandler } = createPinContext({ overrides: { dmPolicy: "disabled" } }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -77,8 +84,7 @@ describe("registerSlackPinEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getAddedHandler } = createPinContext({ - dmPolicy: "allowlist", - allowFrom: ["U2"], + overrides: { dmPolicy: "allowlist", allowFrom: ["U2"] }, }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -95,8 +101,7 @@ describe("registerSlackPinEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getAddedHandler } = createPinContext({ - dmPolicy: "allowlist", - allowFrom: ["U1"], + overrides: { dmPolicy: "allowlist", allowFrom: ["U1"] }, }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -113,9 +118,11 @@ describe("registerSlackPinEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getAddedHandler } = createPinContext({ - dmPolicy: "open", - channelType: "channel", - channelUsers: ["U_OWNER"], + overrides: { + dmPolicy: "open", + channelType: "channel", + channelUsers: ["U_OWNER"], + }, }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -127,4 +134,35 @@ describe("registerSlackPinEvents", () => { expect(enqueueSystemEventMock).not.toHaveBeenCalled(); }); + + it("does not track mismatched events", async () => { + const trackEvent = vi.fn(); + const { getAddedHandler } = createPinContext({ + trackEvent, + shouldDropMismatchedSlackEvent: () => true, + }); + const addedHandler = getAddedHandler(); + expect(addedHandler).toBeTruthy(); + + await addedHandler!({ + event: makePinEvent(), + body: { api_app_id: "A_OTHER" }, + }); + + expect(trackEvent).not.toHaveBeenCalled(); + }); + + it("tracks accepted pin events", async () => { + const trackEvent = vi.fn(); + const { getAddedHandler } = createPinContext({ trackEvent }); + const addedHandler = getAddedHandler(); + expect(addedHandler).toBeTruthy(); + + await addedHandler!({ + event: makePinEvent(), + body: {}, + }); + + expect(trackEvent).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/slack/monitor/events/pins.ts b/src/slack/monitor/events/pins.ts index 9a63aa4a972..e3d076d8d7f 100644 --- a/src/slack/monitor/events/pins.ts +++ b/src/slack/monitor/events/pins.ts @@ -7,18 +7,20 @@ import { authorizeAndResolveSlackSystemEventContext } from "./system-event-conte async function handleSlackPinEvent(params: { ctx: SlackMonitorContext; + trackEvent?: () => void; body: unknown; event: unknown; action: "pinned" | "unpinned"; contextKeySuffix: "added" | "removed"; errorLabel: string; }): Promise { - const { ctx, body, event, action, contextKeySuffix, errorLabel } = params; + const { ctx, trackEvent, body, event, action, contextKeySuffix, errorLabel } = params; try { if (ctx.shouldDropMismatchedSlackEvent(body)) { return; } + trackEvent?.(); const payload = event as SlackPinEvent; const channelId = payload.channel_id; @@ -47,12 +49,16 @@ async function handleSlackPinEvent(params: { } } -export function registerSlackPinEvents(params: { ctx: SlackMonitorContext }) { - const { ctx } = params; +export function registerSlackPinEvents(params: { + ctx: SlackMonitorContext; + trackEvent?: () => void; +}) { + const { ctx, trackEvent } = params; ctx.app.event("pin_added", async ({ event, body }: SlackEventMiddlewareArgs<"pin_added">) => { await handleSlackPinEvent({ ctx, + trackEvent, body, event, action: "pinned", @@ -64,6 +70,7 @@ export function registerSlackPinEvents(params: { ctx: SlackMonitorContext }) { ctx.app.event("pin_removed", async ({ event, body }: SlackEventMiddlewareArgs<"pin_removed">) => { await handleSlackPinEvent({ ctx, + trackEvent, body, event, action: "unpinned", diff --git a/src/slack/monitor/events/reactions.test.ts b/src/slack/monitor/events/reactions.test.ts index e95a1ec5a8c..84269c73e5d 100644 --- a/src/slack/monitor/events/reactions.test.ts +++ b/src/slack/monitor/events/reactions.test.ts @@ -21,9 +21,16 @@ type SlackReactionHandler = (args: { body: unknown; }) => Promise; -function createReactionContext(overrides?: SlackSystemEventTestOverrides) { - const harness = createSlackSystemEventTestHarness(overrides); - registerSlackReactionEvents({ ctx: harness.ctx }); +function createReactionContext(params?: { + overrides?: SlackSystemEventTestOverrides; + trackEvent?: () => void; + shouldDropMismatchedSlackEvent?: (body: unknown) => boolean; +}) { + const harness = createSlackSystemEventTestHarness(params?.overrides); + if (params?.shouldDropMismatchedSlackEvent) { + harness.ctx.shouldDropMismatchedSlackEvent = params.shouldDropMismatchedSlackEvent; + } + registerSlackReactionEvents({ ctx: harness.ctx, trackEvent: params?.trackEvent }); return { getAddedHandler: () => harness.getHandler("reaction_added") as SlackReactionHandler | null, getRemovedHandler: () => harness.getHandler("reaction_removed") as SlackReactionHandler | null, @@ -48,7 +55,7 @@ describe("registerSlackReactionEvents", () => { it("enqueues DM reaction system events when dmPolicy is open", async () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); - const { getAddedHandler } = createReactionContext({ dmPolicy: "open" }); + const { getAddedHandler } = createReactionContext({ overrides: { dmPolicy: "open" } }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -63,7 +70,7 @@ describe("registerSlackReactionEvents", () => { it("blocks DM reaction system events when dmPolicy is disabled", async () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); - const { getAddedHandler } = createReactionContext({ dmPolicy: "disabled" }); + const { getAddedHandler } = createReactionContext({ overrides: { dmPolicy: "disabled" } }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -79,8 +86,7 @@ describe("registerSlackReactionEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getAddedHandler } = createReactionContext({ - dmPolicy: "allowlist", - allowFrom: ["U2"], + overrides: { dmPolicy: "allowlist", allowFrom: ["U2"] }, }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -97,8 +103,7 @@ describe("registerSlackReactionEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getAddedHandler } = createReactionContext({ - dmPolicy: "allowlist", - allowFrom: ["U1"], + overrides: { dmPolicy: "allowlist", allowFrom: ["U1"] }, }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -115,8 +120,7 @@ describe("registerSlackReactionEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getRemovedHandler } = createReactionContext({ - dmPolicy: "disabled", - channelType: "channel", + overrides: { dmPolicy: "disabled", channelType: "channel" }, }); const removedHandler = getRemovedHandler(); expect(removedHandler).toBeTruthy(); @@ -136,9 +140,11 @@ describe("registerSlackReactionEvents", () => { enqueueSystemEventMock.mockClear(); readAllowFromStoreMock.mockReset().mockResolvedValue([]); const { getAddedHandler } = createReactionContext({ - dmPolicy: "open", - channelType: "channel", - channelUsers: ["U_OWNER"], + overrides: { + dmPolicy: "open", + channelType: "channel", + channelUsers: ["U_OWNER"], + }, }); const addedHandler = getAddedHandler(); expect(addedHandler).toBeTruthy(); @@ -150,4 +156,35 @@ describe("registerSlackReactionEvents", () => { expect(enqueueSystemEventMock).not.toHaveBeenCalled(); }); + + it("does not track mismatched events", async () => { + const trackEvent = vi.fn(); + const { getAddedHandler } = createReactionContext({ + trackEvent, + shouldDropMismatchedSlackEvent: () => true, + }); + const addedHandler = getAddedHandler(); + expect(addedHandler).toBeTruthy(); + + await addedHandler!({ + event: makeReactionEvent(), + body: { api_app_id: "A_OTHER" }, + }); + + expect(trackEvent).not.toHaveBeenCalled(); + }); + + it("tracks accepted message reactions", async () => { + const trackEvent = vi.fn(); + const { getAddedHandler } = createReactionContext({ trackEvent }); + const addedHandler = getAddedHandler(); + expect(addedHandler).toBeTruthy(); + + await addedHandler!({ + event: makeReactionEvent(), + body: {}, + }); + + expect(trackEvent).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/slack/monitor/events/reactions.ts b/src/slack/monitor/events/reactions.ts index 07dcf0f8be3..b3633ce33d3 100644 --- a/src/slack/monitor/events/reactions.ts +++ b/src/slack/monitor/events/reactions.ts @@ -5,8 +5,11 @@ import type { SlackMonitorContext } from "../context.js"; import type { SlackReactionEvent } from "../types.js"; import { authorizeAndResolveSlackSystemEventContext } from "./system-event-context.js"; -export function registerSlackReactionEvents(params: { ctx: SlackMonitorContext }) { - const { ctx } = params; +export function registerSlackReactionEvents(params: { + ctx: SlackMonitorContext; + trackEvent?: () => void; +}) { + const { ctx, trackEvent } = params; const handleReactionEvent = async (event: SlackReactionEvent, action: string) => { try { @@ -14,6 +17,7 @@ export function registerSlackReactionEvents(params: { ctx: SlackMonitorContext } if (!item || item.type !== "message") { return; } + trackEvent?.(); const ingressContext = await authorizeAndResolveSlackSystemEventContext({ ctx, diff --git a/src/slack/monitor/message-handler.test.ts b/src/slack/monitor/message-handler.test.ts new file mode 100644 index 00000000000..c40254ec93d --- /dev/null +++ b/src/slack/monitor/message-handler.test.ts @@ -0,0 +1,116 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { createSlackMessageHandler } from "./message-handler.js"; + +const enqueueMock = vi.fn(async (_entry: unknown) => {}); +const resolveThreadTsMock = vi.fn(async ({ message }: { message: Record }) => ({ + ...message, +})); + +vi.mock("../../auto-reply/inbound-debounce.js", () => ({ + resolveInboundDebounceMs: () => 10, + createInboundDebouncer: () => ({ + enqueue: (entry: unknown) => enqueueMock(entry), + }), +})); + +vi.mock("./thread-resolution.js", () => ({ + createSlackThreadTsResolver: () => ({ + resolve: (entry: { message: Record }) => resolveThreadTsMock(entry), + }), +})); + +function createContext(overrides?: { + markMessageSeen?: (channel: string | undefined, ts: string | undefined) => boolean; +}) { + return { + cfg: {}, + accountId: "default", + app: { + client: {}, + }, + runtime: {}, + markMessageSeen: (channel: string | undefined, ts: string | undefined) => + overrides?.markMessageSeen?.(channel, ts) ?? false, + } as Parameters[0]["ctx"]; +} + +describe("createSlackMessageHandler", () => { + beforeEach(() => { + enqueueMock.mockClear(); + resolveThreadTsMock.mockClear(); + }); + + it("does not track invalid non-message events from the message stream", async () => { + const trackEvent = vi.fn(); + const handler = createSlackMessageHandler({ + ctx: createContext(), + account: { accountId: "default" } as Parameters< + typeof createSlackMessageHandler + >[0]["account"], + trackEvent, + }); + + await handler( + { + type: "reaction_added", + channel: "D1", + ts: "123.456", + } as never, + { source: "message" }, + ); + + expect(trackEvent).not.toHaveBeenCalled(); + expect(resolveThreadTsMock).not.toHaveBeenCalled(); + expect(enqueueMock).not.toHaveBeenCalled(); + }); + + it("does not track duplicate messages that are already seen", async () => { + const trackEvent = vi.fn(); + const handler = createSlackMessageHandler({ + ctx: createContext({ markMessageSeen: () => true }), + account: { accountId: "default" } as Parameters< + typeof createSlackMessageHandler + >[0]["account"], + trackEvent, + }); + + await handler( + { + type: "message", + channel: "D1", + ts: "123.456", + text: "hello", + } as never, + { source: "message" }, + ); + + expect(trackEvent).not.toHaveBeenCalled(); + expect(resolveThreadTsMock).not.toHaveBeenCalled(); + expect(enqueueMock).not.toHaveBeenCalled(); + }); + + it("tracks accepted non-duplicate messages", async () => { + const trackEvent = vi.fn(); + const handler = createSlackMessageHandler({ + ctx: createContext(), + account: { accountId: "default" } as Parameters< + typeof createSlackMessageHandler + >[0]["account"], + trackEvent, + }); + + await handler( + { + type: "message", + channel: "D1", + ts: "123.456", + text: "hello", + } as never, + { source: "message" }, + ); + + expect(trackEvent).toHaveBeenCalledTimes(1); + expect(resolveThreadTsMock).toHaveBeenCalledTimes(1); + expect(enqueueMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/slack/monitor/message-handler.ts b/src/slack/monitor/message-handler.ts index ec537dfcd65..e763bfb0cc2 100644 --- a/src/slack/monitor/message-handler.ts +++ b/src/slack/monitor/message-handler.ts @@ -19,8 +19,10 @@ export type SlackMessageHandler = ( export function createSlackMessageHandler(params: { ctx: SlackMonitorContext; account: ResolvedSlackAccount; + /** Called on each inbound event to update liveness tracking. */ + trackEvent?: () => void; }): SlackMessageHandler { - const { ctx, account } = params; + const { ctx, account, trackEvent } = params; const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" }); const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client }); @@ -113,6 +115,7 @@ export function createSlackMessageHandler(params: { if (ctx.markMessageSeen(message.channel, message.ts)) { return; } + trackEvent?.(); const resolvedMessage = await threadTsResolver.resolve({ message, source: opts.source }); await debouncer.enqueue({ message: resolvedMessage, opts }); }; diff --git a/src/slack/monitor/provider.ts b/src/slack/monitor/provider.ts index 316791460f9..4263ecb3415 100644 --- a/src/slack/monitor/provider.ts +++ b/src/slack/monitor/provider.ts @@ -337,9 +337,18 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { removeAckAfterReply, }); - const handleSlackMessage = createSlackMessageHandler({ ctx, account }); + // Wire up event liveness tracking: update lastEventAt on every inbound event + // so the health monitor can detect "half-dead" sockets that pass health checks + // but silently stop delivering events. + const trackEvent = opts.setStatus + ? () => { + opts.setStatus!({ lastEventAt: Date.now(), lastInboundAt: Date.now() }); + } + : undefined; - registerSlackMonitorEvents({ ctx, account, handleSlackMessage }); + const handleSlackMessage = createSlackMessageHandler({ ctx, account, trackEvent }); + + registerSlackMonitorEvents({ ctx, account, handleSlackMessage, trackEvent }); await registerSlackMonitorSlashCommands({ ctx, account }); if (slackMode === "http" && slackHttpHandler) { unregisterHttpHandler = registerSlackHttpHandler({ diff --git a/src/slack/monitor/types.ts b/src/slack/monitor/types.ts index 58c103e04a5..7aa27b5a4e1 100644 --- a/src/slack/monitor/types.ts +++ b/src/slack/monitor/types.ts @@ -12,6 +12,10 @@ export type MonitorSlackOpts = { abortSignal?: AbortSignal; mediaMaxMb?: number; slashCommand?: SlackSlashCommandConfig; + /** Callback to update the channel account status snapshot (e.g. lastEventAt). */ + setStatus?: (next: Record) => void; + /** Callback to read the current channel account status snapshot. */ + getStatus?: () => Record; }; export type SlackReactionEvent = {