diff --git a/extensions/discord/src/monitor/inbound-dedupe.ts b/extensions/discord/src/monitor/inbound-dedupe.ts new file mode 100644 index 00000000000..5f7ab4a2c45 --- /dev/null +++ b/extensions/discord/src/monitor/inbound-dedupe.ts @@ -0,0 +1,73 @@ +import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; +import type { DiscordMessageEvent } from "./listeners.js"; +import { resolveDiscordMessageChannelId } from "./message-utils.js"; + +const RECENT_DISCORD_MESSAGE_TTL_MS = 5 * 60_000; +const RECENT_DISCORD_MESSAGE_MAX = 5000; + +export function createDiscordInboundReplayGuard(): ClaimableDedupe { + return createClaimableDedupe({ + ttlMs: RECENT_DISCORD_MESSAGE_TTL_MS, + memoryMaxSize: RECENT_DISCORD_MESSAGE_MAX, + }); +} + +export class DiscordRetryableInboundError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "DiscordRetryableInboundError"; + } +} + +export function buildDiscordInboundReplayKey(params: { + accountId: string; + data: DiscordMessageEvent; +}): string | null { + const messageId = params.data.message?.id?.trim(); + if (!messageId) { + return null; + } + const channelId = resolveDiscordMessageChannelId({ + message: params.data.message, + eventChannelId: params.data.channel_id, + }); + if (!channelId) { + return null; + } + return `${params.accountId}:${channelId}:${messageId}`; +} + +export async function claimDiscordInboundReplay(params: { + replayKey?: string | null; + replayGuard: ClaimableDedupe; +}): Promise { + const replayKey = params.replayKey?.trim(); + if (!replayKey) { + return true; + } + const claim = await params.replayGuard.claim(replayKey); + return claim.kind === "claimed"; +} + +export async function commitDiscordInboundReplay(params: { + replayKeys?: readonly (string | null | undefined)[]; + replayGuard: ClaimableDedupe; +}): Promise { + const replayKeys = normalizeDiscordInboundReplayKeys(params.replayKeys); + await Promise.all(replayKeys.map((replayKey) => params.replayGuard.commit(replayKey))); +} + +export function releaseDiscordInboundReplay(params: { + replayKeys?: readonly (string | null | undefined)[]; + replayGuard: ClaimableDedupe; + error?: unknown; +}): void { + const replayKeys = normalizeDiscordInboundReplayKeys(params.replayKeys); + replayKeys.forEach((replayKey) => params.replayGuard.release(replayKey, { error: params.error })); +} + +function normalizeDiscordInboundReplayKeys( + replayKeys?: readonly (string | null | undefined)[], +): string[] { + return [...new Set((replayKeys ?? []).map((replayKey) => replayKey?.trim()).filter(Boolean))]; +} diff --git a/extensions/discord/src/monitor/inbound-job.test.ts b/extensions/discord/src/monitor/inbound-job.test.ts index 756602336d9..8cf7983792a 100644 --- a/extensions/discord/src/monitor/inbound-job.test.ts +++ b/extensions/discord/src/monitor/inbound-job.test.ts @@ -92,7 +92,7 @@ describe("buildDiscordInboundJob", () => { it("re-materializes the process context with an overridden abort signal", async () => { const ctx = await createBaseDiscordMessageContext(); - const job = buildDiscordInboundJob(ctx); + const job = buildDiscordInboundJob(ctx, { replayKeys: ["default:ch-1:m-1"] }); const overrideAbortController = new AbortController(); const rematerialized = materializeDiscordInboundJob(job, overrideAbortController.signal); @@ -103,6 +103,7 @@ describe("buildDiscordInboundJob", () => { expect(rematerialized.abortSignal).toBe(overrideAbortController.signal); expect(rematerialized.message).toEqual(job.payload.message); expect(rematerialized.data).toEqual(job.payload.data); + expect(job.replayKeys).toEqual(["default:ch-1:m-1"]); }); it("preserves Carbon message getters across queued jobs", async () => { diff --git a/extensions/discord/src/monitor/inbound-job.ts b/extensions/discord/src/monitor/inbound-job.ts index 2f8c9520f12..087ca41b6cb 100644 --- a/extensions/discord/src/monitor/inbound-job.ts +++ b/extensions/discord/src/monitor/inbound-job.ts @@ -22,6 +22,7 @@ export type DiscordInboundJob = { queueKey: string; payload: DiscordInboundJobPayload; runtime: DiscordInboundJobRuntime; + replayKeys?: string[]; }; export function resolveDiscordInboundJobQueueKey(ctx: DiscordMessagePreflightContext): string { @@ -36,7 +37,10 @@ export function resolveDiscordInboundJobQueueKey(ctx: DiscordMessagePreflightCon return ctx.messageChannelId; } -export function buildDiscordInboundJob(ctx: DiscordMessagePreflightContext): DiscordInboundJob { +export function buildDiscordInboundJob( + ctx: DiscordMessagePreflightContext, + options?: { replayKeys?: readonly string[] }, +): DiscordInboundJob { const { runtime, abortSignal, @@ -70,6 +74,7 @@ export function buildDiscordInboundJob(ctx: DiscordMessagePreflightContext): Dis threadBindings, discordRestFetch, }, + replayKeys: options?.replayKeys ? [...options.replayKeys] : undefined, }; } diff --git a/extensions/discord/src/monitor/inbound-worker.ts b/extensions/discord/src/monitor/inbound-worker.ts index 9340534ffdd..fb34e7e87d8 100644 --- a/extensions/discord/src/monitor/inbound-worker.ts +++ b/extensions/discord/src/monitor/inbound-worker.ts @@ -1,7 +1,14 @@ import { createRunStateMachine } from "openclaw/plugin-sdk/channel-lifecycle"; import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue"; +import type { ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; import { danger, formatDurationSeconds } from "openclaw/plugin-sdk/runtime-env"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; +import { + commitDiscordInboundReplay, + createDiscordInboundReplayGuard, + DiscordRetryableInboundError, + releaseDiscordInboundReplay, +} from "./inbound-dedupe.js"; import { materializeDiscordInboundJob, type DiscordInboundJob } from "./inbound-job.js"; import type { RuntimeEnv } from "./message-handler.preflight.types.js"; import { processDiscordMessage } from "./message-handler.process.js"; @@ -15,6 +22,7 @@ type DiscordInboundWorkerParams = { setStatus?: DiscordMonitorStatusSink; abortSignal?: AbortSignal; runTimeoutMs?: number; + replayGuard?: ClaimableDedupe; __testing?: DiscordInboundWorkerTestingHooks; }; @@ -46,6 +54,7 @@ async function processDiscordInboundJob(params: { runtime: RuntimeEnv; lifecycleSignal?: AbortSignal; runTimeoutMs?: number; + replayGuard: ClaimableDedupe; testing?: DiscordInboundWorkerTestingHooks; }) { const timeoutMs = normalizeDiscordInboundWorkerTimeoutMs(params.runTimeoutMs); @@ -54,50 +63,70 @@ async function processDiscordInboundJob(params: { let createdThreadId: string | undefined; let sessionKey: string | undefined; const processDiscordMessageImpl = params.testing?.processDiscordMessage ?? processDiscordMessage; - await runDiscordTaskWithTimeout({ - run: async (abortSignal) => { - await processDiscordMessageImpl(materializeDiscordInboundJob(params.job, abortSignal), { - onFinalReplyStart: () => { - finalReplyStarted = true; - }, - onFinalReplyDelivered: () => { - finalReplyStarted = true; - }, - onReplyPlanResolved: (resolved) => { - createdThreadId = normalizeOptionalString(resolved.createdThreadId); - sessionKey = normalizeOptionalString(resolved.sessionKey); - }, + try { + await runDiscordTaskWithTimeout({ + run: async (abortSignal) => { + await processDiscordMessageImpl(materializeDiscordInboundJob(params.job, abortSignal), { + onFinalReplyStart: () => { + finalReplyStarted = true; + }, + onFinalReplyDelivered: () => { + finalReplyStarted = true; + }, + onReplyPlanResolved: (resolved) => { + createdThreadId = normalizeOptionalString(resolved.createdThreadId); + sessionKey = normalizeOptionalString(resolved.sessionKey); + }, + }); + }, + timeoutMs, + abortSignals: [params.job.runtime.abortSignal, params.lifecycleSignal], + onTimeout: async (resolvedTimeoutMs) => { + params.runtime.error?.( + danger( + `discord inbound worker timed out after ${formatDurationSeconds(resolvedTimeoutMs, { + decimals: 1, + unit: "seconds", + })}${contextSuffix}`, + ), + ); + if (finalReplyStarted) { + return; + } + await sendDiscordInboundWorkerTimeoutReply({ + job: params.job, + runtime: params.runtime, + contextSuffix, + createdThreadId, + sessionKey, + deliverDiscordReplyImpl: params.testing?.deliverDiscordReply, + }); + }, + onErrorAfterTimeout: (error) => { + params.runtime.error?.( + danger(`discord inbound worker failed after timeout: ${String(error)}${contextSuffix}`), + ); + }, + }); + await commitDiscordInboundReplay({ + replayKeys: params.job.replayKeys, + replayGuard: params.replayGuard, + }); + } catch (error) { + if (error instanceof DiscordRetryableInboundError) { + releaseDiscordInboundReplay({ + replayKeys: params.job.replayKeys, + error, + replayGuard: params.replayGuard, }); - }, - timeoutMs, - abortSignals: [params.job.runtime.abortSignal, params.lifecycleSignal], - onTimeout: async (resolvedTimeoutMs) => { - params.runtime.error?.( - danger( - `discord inbound worker timed out after ${formatDurationSeconds(resolvedTimeoutMs, { - decimals: 1, - unit: "seconds", - })}${contextSuffix}`, - ), - ); - if (finalReplyStarted) { - return; - } - await sendDiscordInboundWorkerTimeoutReply({ - job: params.job, - runtime: params.runtime, - contextSuffix, - createdThreadId, - sessionKey, - deliverDiscordReplyImpl: params.testing?.deliverDiscordReply, + } else { + await commitDiscordInboundReplay({ + replayKeys: params.job.replayKeys, + replayGuard: params.replayGuard, }); - }, - onErrorAfterTimeout: (error) => { - params.runtime.error?.( - danger(`discord inbound worker failed after timeout: ${String(error)}${contextSuffix}`), - ); - }, - }); + } + throw error; + } } async function sendDiscordInboundWorkerTimeoutReply(params: { @@ -163,6 +192,7 @@ export function createDiscordInboundWorker( setStatus: params.setStatus, abortSignal: params.abortSignal, }); + const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard(); return { enqueue(job) { @@ -181,6 +211,7 @@ export function createDiscordInboundWorker( runtime: params.runtime, lifecycleSignal: params.abortSignal, runTimeoutMs: params.runTimeoutMs, + replayGuard, testing: params.__testing, }); } finally { diff --git a/extensions/discord/src/monitor/message-handler.queue.test.ts b/extensions/discord/src/monitor/message-handler.queue.test.ts index 61f02ab5c6a..81bfed66913 100644 --- a/extensions/discord/src/monitor/message-handler.queue.test.ts +++ b/extensions/discord/src/monitor/message-handler.queue.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it, vi } from "vitest"; +import { DiscordRetryableInboundError } from "./inbound-dedupe.js"; import { createDiscordMessageHandler, preflightDiscordMessageMock, @@ -252,6 +253,69 @@ describe("createDiscordMessageHandler queue behavior", () => { expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1); }); + it("retries duplicate deliveries after an explicit retryable worker failure", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + processDiscordMessageMock + .mockRejectedValueOnce(new DiscordRetryableInboundError("retry me")) + .mockResolvedValueOnce(undefined); + const params = createDiscordHandlerParams(); + const handler = createDiscordMessageHandler(params); + installDefaultDiscordPreflight(); + const duplicate = createMessageData("m-retry"); + + await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined(); + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + await vi.waitFor(() => { + expect(params.runtime.error).toHaveBeenCalledWith( + expect.stringContaining( + "discord inbound worker failed: DiscordRetryableInboundError: retry me", + ), + ); + }); + + await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined(); + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); + }); + expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(2); + }); + + it("keeps replay committed after a non-retryable worker failure", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const visibleSideEffect = vi.fn(); + processDiscordMessageMock.mockImplementationOnce(async () => { + visibleSideEffect(); + throw new Error("post-send failure"); + }); + const params = createDiscordHandlerParams(); + const handler = createDiscordMessageHandler(params); + installDefaultDiscordPreflight(); + const duplicate = createMessageData("m-fail"); + + await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined(); + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + await vi.waitFor(() => { + expect(params.runtime.error).toHaveBeenCalledWith( + expect.stringContaining("discord inbound worker failed: Error: post-send failure"), + ); + }); + + await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined(); + await Promise.resolve(); + + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1); + expect(visibleSideEffect).toHaveBeenCalledTimes(1); + }); + it("applies explicit inbound worker timeout to queued runs so stalled runs do not block the queue", async () => { vi.useFakeTimers(); try { diff --git a/extensions/discord/src/monitor/message-handler.ts b/extensions/discord/src/monitor/message-handler.ts index f4ede4bcabe..ab168faa622 100644 --- a/extensions/discord/src/monitor/message-handler.ts +++ b/extensions/discord/src/monitor/message-handler.ts @@ -4,8 +4,15 @@ import { shouldDebounceTextInbound, } from "openclaw/plugin-sdk/channel-inbound"; import { resolveOpenProviderRuntimeGroupPolicy } from "openclaw/plugin-sdk/config-runtime"; -import { createDedupeCache } from "openclaw/plugin-sdk/infra-runtime"; import { danger } from "openclaw/plugin-sdk/runtime-env"; +import { + buildDiscordInboundReplayKey, + claimDiscordInboundReplay, + commitDiscordInboundReplay, + createDiscordInboundReplayGuard, + DiscordRetryableInboundError, + releaseDiscordInboundReplay, +} from "./inbound-dedupe.js"; import { buildDiscordInboundJob } from "./inbound-job.js"; import { createDiscordInboundWorker, @@ -40,27 +47,6 @@ export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & { deactivate: () => void; }; -const RECENT_DISCORD_MESSAGE_TTL_MS = 5 * 60_000; -const RECENT_DISCORD_MESSAGE_MAX = 5000; - -function buildDiscordInboundDedupeKey(params: { - accountId: string; - data: DiscordMessageEvent; -}): string | null { - const messageId = params.data.message?.id?.trim(); - if (!messageId) { - return null; - } - const channelId = resolveDiscordMessageChannelId({ - message: params.data.message, - eventChannelId: params.data.channel_id, - }); - if (!channelId) { - return null; - } - return `${params.accountId}:${channelId}:${messageId}`; -} - export function createDiscordMessageHandler( params: DiscordMessageHandlerParams, ): DiscordMessageHandlerWithLifecycle { @@ -75,22 +61,21 @@ export function createDiscordMessageHandler( "group-mentions"; const preflightDiscordMessageImpl = params.__testing?.preflightDiscordMessage ?? preflightDiscordMessage; + const replayGuard = createDiscordInboundReplayGuard(); const inboundWorker = createDiscordInboundWorker({ runtime: params.runtime, setStatus: params.setStatus, abortSignal: params.abortSignal, runTimeoutMs: params.workerRunTimeoutMs, + replayGuard, __testing: params.__testing, }); - const recentInboundMessages = createDedupeCache({ - ttlMs: RECENT_DISCORD_MESSAGE_TTL_MS, - maxSize: RECENT_DISCORD_MESSAGE_MAX, - }); const { debouncer } = createChannelInboundDebouncer<{ data: DiscordMessageEvent; client: Client; abortSignal?: AbortSignal; + replayKey?: string; }>({ cfg: params.cfg, channel: "discord", @@ -129,70 +114,90 @@ export function createDiscordMessageHandler( if (!last) { return; } + const replayKeys = entries.map((entry) => entry.replayKey).filter(Boolean); const abortSignal = last.abortSignal; if (abortSignal?.aborted) { + releaseDiscordInboundReplay({ + replayKeys, + error: abortSignal.reason, + replayGuard, + }); return; } - if (entries.length === 1) { + try { + if (entries.length === 1) { + const ctx = await preflightDiscordMessageImpl({ + ...params, + ackReactionScope, + groupPolicy, + abortSignal, + data: last.data, + client: last.client, + }); + if (!ctx) { + await commitDiscordInboundReplay({ replayKeys, replayGuard }); + return; + } + applyImplicitReplyBatchGate(ctx, params.replyToMode, false); + inboundWorker.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); + return; + } + const combinedBaseText = entries + .map((entry) => + resolveDiscordMessageText(entry.data.message, { includeForwarded: false }), + ) + .filter(Boolean) + .join("\n"); + const syntheticMessage = { + ...last.data.message, + content: combinedBaseText, + attachments: [], + message_snapshots: (last.data.message as { message_snapshots?: unknown }) + .message_snapshots, + messageSnapshots: (last.data.message as { messageSnapshots?: unknown }).messageSnapshots, + rawData: { + ...(last.data.message as { rawData?: Record }).rawData, + }, + }; + const syntheticData: DiscordMessageEvent = { + ...last.data, + message: syntheticMessage, + }; const ctx = await preflightDiscordMessageImpl({ ...params, ackReactionScope, groupPolicy, abortSignal, - data: last.data, + data: syntheticData, client: last.client, }); if (!ctx) { + await commitDiscordInboundReplay({ replayKeys, replayGuard }); return; } - applyImplicitReplyBatchGate(ctx, params.replyToMode, false); - inboundWorker.enqueue(buildDiscordInboundJob(ctx)); - return; - } - const combinedBaseText = entries - .map((entry) => resolveDiscordMessageText(entry.data.message, { includeForwarded: false })) - .filter(Boolean) - .join("\n"); - const syntheticMessage = { - ...last.data.message, - content: combinedBaseText, - attachments: [], - message_snapshots: (last.data.message as { message_snapshots?: unknown }).message_snapshots, - messageSnapshots: (last.data.message as { messageSnapshots?: unknown }).messageSnapshots, - rawData: { - ...(last.data.message as { rawData?: Record }).rawData, - }, - }; - const syntheticData: DiscordMessageEvent = { - ...last.data, - message: syntheticMessage, - }; - const ctx = await preflightDiscordMessageImpl({ - ...params, - ackReactionScope, - groupPolicy, - abortSignal, - data: syntheticData, - client: last.client, - }); - if (!ctx) { - return; - } - applyImplicitReplyBatchGate(ctx, params.replyToMode, true); - if (entries.length > 1) { - const ids = entries.map((entry) => entry.data.message?.id).filter(Boolean) as string[]; - if (ids.length > 0) { - const ctxBatch = ctx as typeof ctx & { - MessageSids?: string[]; - MessageSidFirst?: string; - MessageSidLast?: string; - }; - ctxBatch.MessageSids = ids; - ctxBatch.MessageSidFirst = ids[0]; - ctxBatch.MessageSidLast = ids[ids.length - 1]; + applyImplicitReplyBatchGate(ctx, params.replyToMode, true); + if (entries.length > 1) { + const ids = entries.map((entry) => entry.data.message?.id).filter(Boolean) as string[]; + if (ids.length > 0) { + const ctxBatch = ctx as typeof ctx & { + MessageSids?: string[]; + MessageSidFirst?: string; + MessageSidLast?: string; + }; + ctxBatch.MessageSids = ids; + ctxBatch.MessageSidFirst = ids[0]; + ctxBatch.MessageSidLast = ids[ids.length - 1]; + } } + inboundWorker.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); + } catch (error) { + if (error instanceof DiscordRetryableInboundError) { + releaseDiscordInboundReplay({ replayKeys, error, replayGuard }); + } else { + await commitDiscordInboundReplay({ replayKeys, replayGuard }); + } + throw error; } - inboundWorker.enqueue(buildDiscordInboundJob(ctx)); }, onError: (err) => { params.runtime.error?.(danger(`discord debounce flush failed: ${String(err)}`)); @@ -213,15 +218,25 @@ export function createDiscordMessageHandler( if (params.botUserId && msgAuthorId === params.botUserId) { return; } - const dedupeKey = buildDiscordInboundDedupeKey({ + const replayKey = buildDiscordInboundReplayKey({ accountId: params.accountId, data, }); - if (dedupeKey && recentInboundMessages.check(dedupeKey)) { + if ( + !(await claimDiscordInboundReplay({ + replayKey, + replayGuard, + })) + ) { return; } - await debouncer.enqueue({ data, client, abortSignal: options?.abortSignal }); + await debouncer.enqueue({ + data, + client, + abortSignal: options?.abortSignal, + replayKey: replayKey ?? undefined, + }); } catch (err) { params.runtime.error?.(danger(`handler failed: ${String(err)}`)); }