diff --git a/extensions/slack/src/monitor/context.ts b/extensions/slack/src/monitor/context.ts index b9cdddcbb0d..035c69eceff 100644 --- a/extensions/slack/src/monitor/context.ts +++ b/extensions/slack/src/monitor/context.ts @@ -68,6 +68,7 @@ export type SlackMonitorContext = { logger: ReturnType; markMessageSeen: (channelId: string | undefined, ts?: string) => boolean; + releaseSeenMessage: (channelId: string | undefined, ts?: string) => void; shouldDropMismatchedSlackEvent: (body: unknown) => boolean; resolveSlackSystemEventSessionKey: (params: { channelId?: string | null; @@ -160,6 +161,13 @@ export function createSlackMonitorContext(params: { return seenMessages.check(`${channelId}:${ts}`); }; + const releaseSeenMessage = (channelId: string | undefined, ts?: string) => { + if (!channelId || !ts) { + return; + } + seenMessages.delete(`${channelId}:${ts}`); + }; + const resolveSlackSystemEventSessionKey = (p: { channelId?: string | null; channelType?: string | null; @@ -433,6 +441,7 @@ export function createSlackMonitorContext(params: { removeAckAfterReply: params.removeAckAfterReply, logger, markMessageSeen, + releaseSeenMessage, shouldDropMismatchedSlackEvent, resolveSlackSystemEventSessionKey, isChannelAllowed, diff --git a/extensions/slack/src/monitor/message-handler.app-mention-race.test.ts b/extensions/slack/src/monitor/message-handler.app-mention-race.test.ts index a132f2fddc1..6b0e095caec 100644 --- a/extensions/slack/src/monitor/message-handler.app-mention-race.test.ts +++ b/extensions/slack/src/monitor/message-handler.app-mention-race.test.ts @@ -51,30 +51,41 @@ vi.mock("./message-handler/dispatch.js", () => ({ })); let createSlackMessageHandler: typeof import("./message-handler.js").createSlackMessageHandler; +let SlackRetryableInboundError: typeof import("./message-handler.js").SlackRetryableInboundError; function createMarkMessageSeen() { const seen = new Set(); - return (channel: string | undefined, ts: string | undefined) => { - if (!channel || !ts) { + return { + markMessageSeen(channel: string | undefined, ts: string | undefined) { + if (!channel || !ts) { + return false; + } + const key = `${channel}:${ts}`; + if (seen.has(key)) { + return true; + } + seen.add(key); return false; - } - const key = `${channel}:${ts}`; - if (seen.has(key)) { - return true; - } - seen.add(key); - return false; + }, + releaseSeenMessage(channel: string | undefined, ts: string | undefined) { + if (!channel || !ts) { + return; + } + seen.delete(`${channel}:${ts}`); + }, }; } function createTestHandler() { + const seenMessages = createMarkMessageSeen(); return createSlackMessageHandler({ ctx: { cfg: {}, accountId: "default", app: { client: {} }, runtime: {}, - markMessageSeen: createMarkMessageSeen(), + markMessageSeen: seenMessages.markMessageSeen, + releaseSeenMessage: seenMessages.releaseSeenMessage, } as Parameters[0]["ctx"], account: { accountId: "default" } as Parameters[0]["account"], }); @@ -118,7 +129,8 @@ async function createInFlightMessageScenario(ts: string) { describe("createSlackMessageHandler app_mention race handling", () => { beforeAll(async () => { - ({ createSlackMessageHandler } = await import("./message-handler.js")); + ({ createSlackMessageHandler, SlackRetryableInboundError } = + await import("./message-handler.js")); }); beforeEach(() => { @@ -183,4 +195,34 @@ describe("createSlackMessageHandler app_mention race handling", () => { expect(prepareSlackMessageMock).toHaveBeenCalledTimes(1); expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(1); }); + + it("retries message replay after an explicit retryable dispatch failure", async () => { + prepareSlackMessageMock.mockResolvedValue({ ctxPayload: {} }); + dispatchPreparedSlackMessageMock + .mockRejectedValueOnce(new SlackRetryableInboundError("retry me")) + .mockResolvedValueOnce(undefined); + + const handler = createTestHandler(); + + await expect(sendMessageEvent(handler, "1700000000.000250")).rejects.toThrow("retry me"); + await expect(sendMessageEvent(handler, "1700000000.000250")).resolves.toBeUndefined(); + + expect(prepareSlackMessageMock).toHaveBeenCalledTimes(2); + expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(2); + }); + + it("keeps message replay deduped after a non-retryable dispatch failure", async () => { + prepareSlackMessageMock.mockResolvedValue({ ctxPayload: {} }); + dispatchPreparedSlackMessageMock.mockRejectedValueOnce(new Error("post-send failure")); + + const handler = createTestHandler(); + + await expect(sendMessageEvent(handler, "1700000000.000300")).rejects.toThrow( + "post-send failure", + ); + await sendMessageEvent(handler, "1700000000.000300"); + + expect(prepareSlackMessageMock).toHaveBeenCalledTimes(1); + expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(1); + }); }); diff --git a/extensions/slack/src/monitor/message-handler.test.ts b/extensions/slack/src/monitor/message-handler.test.ts index 778f29a5586..f072d5136e2 100644 --- a/extensions/slack/src/monitor/message-handler.test.ts +++ b/extensions/slack/src/monitor/message-handler.test.ts @@ -32,6 +32,7 @@ vi.mock("./thread-resolution.js", () => ({ function createContext(overrides?: { markMessageSeen?: (channel: string | undefined, ts: string | undefined) => boolean; + releaseSeenMessage?: (channel: string | undefined, ts: string | undefined) => void; }) { return { cfg: {}, @@ -42,11 +43,14 @@ function createContext(overrides?: { runtime: {}, markMessageSeen: (channel: string | undefined, ts: string | undefined) => overrides?.markMessageSeen?.(channel, ts) ?? false, + releaseSeenMessage: (channel: string | undefined, ts: string | undefined) => + overrides?.releaseSeenMessage?.(channel, ts), } as Parameters[0]["ctx"]; } function createHandlerWithTracker(overrides?: { markMessageSeen?: (channel: string | undefined, ts: string | undefined) => boolean; + releaseSeenMessage?: (channel: string | undefined, ts: string | undefined) => void; }) { const trackEvent = vi.fn(); const handler = createSlackMessageHandler({ diff --git a/extensions/slack/src/monitor/message-handler.ts b/extensions/slack/src/monitor/message-handler.ts index fb700b78350..d5ecf1f7ac9 100644 --- a/extensions/slack/src/monitor/message-handler.ts +++ b/extensions/slack/src/monitor/message-handler.ts @@ -17,6 +17,13 @@ export type SlackMessageHandler = ( const APP_MENTION_RETRY_TTL_MS = 60_000; +export class SlackRetryableInboundError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "SlackRetryableInboundError"; + } +} + function resolveSlackSenderId(message: SlackMessageEvent): string | null { return message.user ?? message.bot_id ?? null; } @@ -133,40 +140,53 @@ export function createSlackMessageHandler(params: { ...last.message, text: combinedText, }; - const prepared = await prepareSlackMessage({ - ctx, - account, - message: syntheticMessage, - opts: { - ...last.opts, - wasMentioned: combinedMentioned || last.opts.wasMentioned, - }, - }); const seenMessageKey = buildSeenMessageKey(last.message.channel, last.message.ts); - if (!prepared) { - return; - } - if (seenMessageKey) { - pruneAppMentionRetryKeys(Date.now()); - if (last.opts.source === "app_mention") { - // If app_mention wins the race and dispatches first, drop the later message dispatch. - appMentionDispatchedKeys.set(seenMessageKey, Date.now() + APP_MENTION_RETRY_TTL_MS); - } else if (last.opts.source === "message" && appMentionDispatchedKeys.has(seenMessageKey)) { - appMentionDispatchedKeys.delete(seenMessageKey); - appMentionRetryKeys.delete(seenMessageKey); + try { + const prepared = await prepareSlackMessage({ + ctx, + account, + message: syntheticMessage, + opts: { + ...last.opts, + wasMentioned: combinedMentioned || last.opts.wasMentioned, + }, + }); + if (!prepared) { return; } - appMentionRetryKeys.delete(seenMessageKey); - } - if (entries.length > 1) { - const ids = entries.map((entry) => entry.message.ts).filter(Boolean) as string[]; - if (ids.length > 0) { - prepared.ctxPayload.MessageSids = ids; - prepared.ctxPayload.MessageSidFirst = ids[0]; - prepared.ctxPayload.MessageSidLast = ids[ids.length - 1]; + if (seenMessageKey) { + pruneAppMentionRetryKeys(Date.now()); + if (last.opts.source === "app_mention") { + // If app_mention wins the race and dispatches first, drop the later message dispatch. + appMentionDispatchedKeys.set(seenMessageKey, Date.now() + APP_MENTION_RETRY_TTL_MS); + } else if ( + last.opts.source === "message" && + appMentionDispatchedKeys.has(seenMessageKey) + ) { + appMentionDispatchedKeys.delete(seenMessageKey); + appMentionRetryKeys.delete(seenMessageKey); + return; + } + appMentionRetryKeys.delete(seenMessageKey); } + if (entries.length > 1) { + const ids = entries.map((entry) => entry.message.ts).filter(Boolean) as string[]; + if (ids.length > 0) { + prepared.ctxPayload.MessageSids = ids; + prepared.ctxPayload.MessageSidFirst = ids[0]; + prepared.ctxPayload.MessageSidLast = ids[ids.length - 1]; + } + } + await dispatchPreparedSlackMessage(prepared); + } catch (error) { + if (error instanceof SlackRetryableInboundError) { + if (seenMessageKey) { + appMentionDispatchedKeys.delete(seenMessageKey); + } + ctx.releaseSeenMessage(last.message.channel, last.message.ts); + } + throw error; } - await dispatchPreparedSlackMessage(prepared); }, onError: (err) => { ctx.runtime.error?.(`slack inbound debounce flush failed: ${String(err)}`); diff --git a/extensions/slack/src/monitor/message-handler/prepare.test.ts b/extensions/slack/src/monitor/message-handler/prepare.test.ts index 20ac6e4cf8a..e03a92fba2f 100644 --- a/extensions/slack/src/monitor/message-handler/prepare.test.ts +++ b/extensions/slack/src/monitor/message-handler/prepare.test.ts @@ -642,6 +642,7 @@ describe("prepareSlackMessage sender prefix", () => { removeAckAfterReply: false, logger: { info: vi.fn(), warn: vi.fn() }, markMessageSeen: () => false, + releaseSeenMessage: () => {}, shouldDropMismatchedSlackEvent: () => false, resolveSlackSystemEventSessionKey: () => "agent:main:slack:channel:c1", isChannelAllowed: () => true,