diff --git a/extensions/slack/src/monitor/message-handler.test.ts b/extensions/slack/src/monitor/message-handler.test.ts index 361d6a67ba08..d6c22e5e5d50 100644 --- a/extensions/slack/src/monitor/message-handler.test.ts +++ b/extensions/slack/src/monitor/message-handler.test.ts @@ -6,6 +6,8 @@ const flushKeyMock = vi.fn(async (_key: string) => {}); const onFlushCallbacks: Array<(entries: Array>) => Promise> = []; const prepareSlackMessageMock = vi.fn(async () => ({ ctxPayload: {} })); const dispatchPreparedSlackMessageMock = vi.fn(async () => {}); +const hasSlackInboundMessageDeliveryMock = vi.fn(async () => false); +const recordSlackInboundMessageDeliveriesMock = vi.fn(async () => {}); const resolveThreadTsMock = vi.fn(async ({ message }: { message: Record }) => ({ ...message, })); @@ -45,8 +47,8 @@ vi.mock("./message-handler/pipeline.runtime.js", () => ({ })); vi.mock("./inbound-delivery-state.js", () => ({ - hasSlackInboundMessageDelivery: vi.fn(async () => false), - recordSlackInboundMessageDeliveries: vi.fn(async () => {}), + hasSlackInboundMessageDelivery: hasSlackInboundMessageDeliveryMock, + recordSlackInboundMessageDeliveries: recordSlackInboundMessageDeliveriesMock, })); function createContext(overrides?: { @@ -101,6 +103,9 @@ describe("createSlackMessageHandler", () => { onFlushCallbacks.length = 0; prepareSlackMessageMock.mockClear(); dispatchPreparedSlackMessageMock.mockClear(); + hasSlackInboundMessageDeliveryMock.mockReset(); + hasSlackInboundMessageDeliveryMock.mockResolvedValue(false); + recordSlackInboundMessageDeliveriesMock.mockClear(); resolveThreadTsMock.mockClear(); }); @@ -270,4 +275,138 @@ describe("createSlackMessageHandler", () => { const flushFailure = expect(onFlushCallbacks[0]?.([entry])).rejects.toThrow("dispatch failed"); await Promise.all([handledFailure, flushFailure]); }); + + it("retries native session initialization conflicts through the delivery gates", async () => { + const releaseSeenMessage = vi.fn(); + dispatchPreparedSlackMessageMock.mockRejectedValueOnce( + new Error("Slack dispatch failed", { + cause: new Error( + "reply session initialization conflicted for agent:main:main:thread:123.456", + ), + }), + ); + const { handler } = createHandlerWithTracker({ releaseSeenMessage }); + await handler( + { + type: "message", + channel: "C111", + user: "U111", + ts: "1709000000.000700", + text: "native message", + } as never, + { source: "message" }, + ); + + const entry = enqueueMock.mock.calls[0]?.[0] as Record; + vi.useFakeTimers(); + try { + await expect(onFlushCallbacks[0]?.([entry])).rejects.toThrow("Slack dispatch failed"); + await vi.advanceTimersByTimeAsync(1000); + + expect(releaseSeenMessage).toHaveBeenCalledWith("C111", "1709000000.000700"); + expect(recordSlackInboundMessageDeliveriesMock).not.toHaveBeenCalled(); + expect(hasSlackInboundMessageDeliveryMock).toHaveBeenCalledTimes(2); + expect(enqueueMock).toHaveBeenCalledTimes(2); + expect(enqueueMock.mock.calls[1]?.[0]).toMatchObject({ + opts: { + retryAttempt: 1, + }, + }); + expect(enqueueMock.mock.calls[1]?.[0]).not.toHaveProperty("opts.dispatchCompletion"); + } finally { + vi.useRealTimers(); + } + }); + + it("leaves relay session conflict retries to unacknowledged redelivery", async () => { + const releaseSeenMessage = vi.fn(); + dispatchPreparedSlackMessageMock.mockRejectedValueOnce( + new Error("Slack dispatch failed", { + cause: new Error( + "reply session initialization conflicted for agent:main:main:thread:123.456", + ), + }), + ); + const { handler } = createHandlerWithTracker({ releaseSeenMessage }); + const handled = handler( + { + type: "message", + channel: "C111", + user: "U111", + ts: "1709000000.000800", + text: "relay message", + } as never, + { source: "message", awaitDispatch: true }, + ); + + await vi.waitFor(() => expect(enqueueMock).toHaveBeenCalledTimes(1)); + const entry = enqueueMock.mock.calls[0]?.[0] as Record; + vi.useFakeTimers(); + try { + const handledFailure = expect(handled).rejects.toThrow("Slack dispatch failed"); + const flushFailure = expect(onFlushCallbacks[0]?.([entry])).rejects.toThrow( + "Slack dispatch failed", + ); + await Promise.all([handledFailure, flushFailure]); + await vi.advanceTimersByTimeAsync(1000); + + expect(releaseSeenMessage).toHaveBeenCalledWith("C111", "1709000000.000800"); + expect(recordSlackInboundMessageDeliveriesMock).not.toHaveBeenCalled(); + expect(enqueueMock).toHaveBeenCalledTimes(1); + } finally { + vi.useRealTimers(); + } + }); + + it("settles an already-delivered relay event without enqueueing", async () => { + hasSlackInboundMessageDeliveryMock.mockResolvedValueOnce(true); + const { handler } = createHandlerWithTracker(); + + await expect( + handler( + { + type: "message", + channel: "C111", + user: "U111", + ts: "1709000000.000850", + text: "relay replay", + } as never, + { source: "message", awaitDispatch: true }, + ), + ).resolves.toBeUndefined(); + + expect(enqueueMock).not.toHaveBeenCalled(); + }); + + it("skips a native retry when another delivery already succeeded", async () => { + dispatchPreparedSlackMessageMock.mockRejectedValueOnce( + new Error("reply session initialization conflicted for agent:main:main:thread:123.456"), + ); + hasSlackInboundMessageDeliveryMock.mockResolvedValueOnce(false).mockResolvedValueOnce(true); + const { handler } = createHandlerWithTracker(); + await handler( + { + type: "message", + channel: "C111", + user: "U111", + ts: "1709000000.000900", + text: "native message", + } as never, + { source: "message" }, + ); + + const entry = enqueueMock.mock.calls[0]?.[0] as Record; + vi.useFakeTimers(); + try { + await expect(onFlushCallbacks[0]?.([entry])).rejects.toThrow( + "reply session initialization conflicted", + ); + await vi.advanceTimersByTimeAsync(1000); + + expect(hasSlackInboundMessageDeliveryMock).toHaveBeenCalledTimes(2); + expect(enqueueMock).toHaveBeenCalledTimes(1); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/extensions/slack/src/monitor/message-handler.ts b/extensions/slack/src/monitor/message-handler.ts index 38eb4aa8153d..87abe225c379 100644 --- a/extensions/slack/src/monitor/message-handler.ts +++ b/extensions/slack/src/monitor/message-handler.ts @@ -3,7 +3,7 @@ import { createChannelInboundDebouncer, shouldDebounceTextInbound, } from "openclaw/plugin-sdk/channel-inbound"; -import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import { collectErrorGraphCandidates, formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { createLazyRuntimeModule } from "openclaw/plugin-sdk/lazy-runtime"; import { asDateTimestampMs, @@ -45,7 +45,11 @@ type SlackDispatchCompletion = { reject: (error: unknown) => void; }; -type QueuedSlackMessageOptions = Parameters[1] & { +type IngressSlackMessageOptions = Parameters[1] & { + retryAttempt?: number; +}; + +type QueuedSlackMessageOptions = IngressSlackMessageOptions & { dispatchCompletion?: Omit; }; @@ -60,6 +64,9 @@ function createSlackDispatchCompletion(): SlackDispatchCompletion { } const APP_MENTION_RETRY_TTL_MS = 60_000; +const RETRYABLE_FLUSH_MAX_ATTEMPTS = 3; +const RETRYABLE_FLUSH_RETRY_DELAY_MS = 1_000; +const REPLY_SESSION_INIT_CONFLICT_MESSAGE_RE = /reply session initialization conflicted for \S+/u; export class SlackRetryableInboundError extends Error { constructor(message: string, options?: ErrorOptions) { @@ -68,6 +75,15 @@ export class SlackRetryableInboundError extends Error { } } +function isRetryableSlackInboundError(error: unknown): boolean { + if (error instanceof SlackRetryableInboundError) { + return true; + } + return collectErrorGraphCandidates(error, (current) => [current.cause, current.error]).some( + (candidate) => REPLY_SESSION_INIT_CONFLICT_MESSAGE_RE.test(formatErrorMessage(candidate)), + ); +} + function shouldDebounceSlackMessage(message: SlackMessageEvent, cfg: SlackMonitorContext["cfg"]) { const text = message.text ?? ""; const textForCommandDetection = stripSlackMentionsForCommandDetection(text); @@ -101,6 +117,46 @@ export function createSlackMessageHandler(params: { buildKey: (entry) => buildSlackDebounceKey(entry.message, ctx.accountId), shouldDebounce: (entry) => shouldDebounceSlackMessage(entry.message, ctx.cfg), onFlush: async (entries) => { + const retryEntries = (sourceError: unknown): boolean => { + if (!isRetryableSlackInboundError(sourceError)) { + return false; + } + const nextEntries = entries + .map((entry) => { + // Relay delivery owns retry until its dispatch completion is acknowledged. + // Scheduling here as well can race the router redelivery and duplicate a reply. + if (entry.opts.dispatchCompletion) { + return null; + } + const retryAttempt = entry.opts.retryAttempt ?? 0; + if (retryAttempt >= RETRYABLE_FLUSH_MAX_ATTEMPTS) { + return null; + } + const { dispatchCompletion: _dispatchCompletion, ...retryOpts } = entry.opts; + return { + ...entry, + opts: { + ...retryOpts, + retryAttempt: retryAttempt + 1, + }, + }; + }) + .filter((entry) => entry !== null); + if (nextEntries.length === 0) { + return false; + } + const retryTimer = setTimeout(() => { + for (const entry of nextEntries) { + // Re-enter ingress so a relay replay or another successful attempt wins + // through the normal delivery and seen-message gates before dispatch. + void enqueueSlackMessage(entry.message, entry.opts).catch((err: unknown) => { + ctx.runtime.error?.(`slack inbound retry enqueue failed: ${formatErrorMessage(err)}`); + }); + } + }, RETRYABLE_FLUSH_RETRY_DELAY_MS); + retryTimer.unref?.(); + return true; + }; const completions = entries .map((entry) => entry.opts.dispatchCompletion) .filter((completion) => completion !== undefined); @@ -187,7 +243,7 @@ export function createSlackMessageHandler(params: { messages: entries.map((entry) => entry.message), }); } catch (error) { - if (!(error instanceof SlackRetryableInboundError)) { + if (!isRetryableSlackInboundError(error)) { await recordSlackInboundMessageDeliveries({ accountId: ctx.accountId, messages: entries.map((entry) => entry.message), @@ -196,11 +252,16 @@ export function createSlackMessageHandler(params: { throw error; } } catch (error) { - if (error instanceof SlackRetryableInboundError) { - if (seenMessageKey) { - appMentionDispatchedKeys.delete(seenMessageKey); + if (isRetryableSlackInboundError(error)) { + // Every buffered event passed the seen gate before this combined dispatch. + // Release all of them so the retry can rebuild the same batch. + for (const entry of entries) { + const entrySeenKey = buildSeenMessageKey(entry.message.channel, entry.message.ts); + if (entrySeenKey) { + appMentionDispatchedKeys.delete(entrySeenKey); + } + ctx.releaseSeenMessage(entry.message.channel, entry.message.ts); } - ctx.releaseSeenMessage(last.message.channel, last.message.ts); } throw error; } @@ -209,6 +270,7 @@ export function createSlackMessageHandler(params: { completion.resolve(); } } catch (error) { + retryEntries(error); for (const completion of completions) { completion.reject(error); } @@ -271,9 +333,12 @@ export function createSlackMessageHandler(params: { return true; }; - return async (message, opts) => { + async function enqueueSlackMessage( + message: SlackMessageEvent, + opts: IngressSlackMessageOptions, + ): Promise { if (opts.source === "message" && message.type !== "message") { - return; + return undefined; } if ( opts.source === "message" && @@ -282,7 +347,7 @@ export function createSlackMessageHandler(params: { message.subtype !== "bot_message" && message.subtype !== "thread_broadcast" ) { - return; + return undefined; } const seenMessageKey = buildSeenMessageKey(message.channel, message.ts); if ( @@ -293,7 +358,7 @@ export function createSlackMessageHandler(params: { ts: message.ts, })) ) { - return; + return undefined; } const wasSeen = seenMessageKey ? ctx.markMessageSeen(message.channel, message.ts) : false; if (seenMessageKey && opts.source === "message" && !wasSeen) { @@ -305,7 +370,7 @@ export function createSlackMessageHandler(params: { // Allow exactly one app_mention retry if the same ts was previously dropped // from the message stream before it reached dispatch. if (opts.source !== "app_mention" || !consumeAppMentionRetryKey(seenMessageKey)) { - return; + return undefined; } } trackEvent?.(); @@ -342,6 +407,11 @@ export function createSlackMessageHandler(params: { : {}), }, }); + return dispatchCompletion; + } + + return async (message, opts) => { + const dispatchCompletion = await enqueueSlackMessage(message, opts); await dispatchCompletion?.promise; }; }