fix(discord): make inbound retries explicit

This commit is contained in:
Vincent Koc
2026-04-13 15:53:42 +01:00
parent 3792a39fd6
commit 01d49cf32f
6 changed files with 310 additions and 121 deletions

View File

@@ -0,0 +1,73 @@
import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
import type { DiscordMessageEvent } from "./listeners.js";
import { resolveDiscordMessageChannelId } from "./message-utils.js";
// `ttlMs` handed to the inbound replay guard: five minutes, in milliseconds.
const RECENT_DISCORD_MESSAGE_TTL_MS = 5 * 60 * 1_000;
// `memoryMaxSize` handed to the inbound replay guard.
const RECENT_DISCORD_MESSAGE_MAX = 5_000;
/**
 * Builds the dedupe guard used for inbound Discord messages.
 *
 * The guard is created through `createClaimableDedupe`, configured with this
 * module's TTL (`RECENT_DISCORD_MESSAGE_TTL_MS`) and in-memory size limit
 * (`RECENT_DISCORD_MESSAGE_MAX`).
 *
 * @returns A freshly created `ClaimableDedupe` instance.
 */
export function createDiscordInboundReplayGuard(): ClaimableDedupe {
  const guardOptions = {
    ttlMs: RECENT_DISCORD_MESSAGE_TTL_MS,
    memoryMaxSize: RECENT_DISCORD_MESSAGE_MAX,
  };
  return createClaimableDedupe(guardOptions);
}
/**
 * Error type that marks an inbound Discord failure as explicitly retryable.
 *
 * Callers distinguish it from other failures with `instanceof`; the `name`
 * property is set so that `String(error)` renders as
 * `DiscordRetryableInboundError: <message>`.
 */
export class DiscordRetryableInboundError extends Error {
  override name = "DiscordRetryableInboundError";

  /**
   * @param message Human-readable failure description.
   * @param options Standard `Error` options (for example `cause`), forwarded to `Error`.
   */
  constructor(message: string, options?: ErrorOptions) {
    super(message, options);
  }
}
/**
 * Derives the replay (dedupe) key for an inbound Discord message event.
 *
 * The key has the shape `<accountId>:<channelId>:<messageId>`, where the
 * message id is trimmed and the channel id comes from
 * `resolveDiscordMessageChannelId`.
 *
 * @param params.accountId Account the event was received for.
 * @param params.data The inbound message event.
 * @returns The key, or `null` when the message id is missing/blank or no
 *   channel id can be resolved.
 */
export function buildDiscordInboundReplayKey(params: {
  accountId: string;
  data: DiscordMessageEvent;
}): string | null {
  const { accountId, data } = params;

  const trimmedMessageId = data.message?.id?.trim();
  if (!trimmedMessageId) {
    // Without a message id there is nothing to key on; the channel is not resolved.
    return null;
  }

  const resolvedChannelId = resolveDiscordMessageChannelId({
    message: data.message,
    eventChannelId: data.channel_id,
  });

  return resolvedChannelId ? `${accountId}:${resolvedChannelId}:${trimmedMessageId}` : null;
}
/**
 * Attempts to claim a replay key on the guard before processing a message.
 *
 * @param params.replayKey Key to claim; trimmed before use.
 * @param params.replayGuard Guard whose `claim` method is consulted.
 * @returns `true` when there is no usable key (the guard is not consulted) or
 *   when the guard reports `kind === "claimed"`; `false` for any other kind.
 */
export async function claimDiscordInboundReplay(params: {
  replayKey?: string | null;
  replayGuard: ClaimableDedupe;
}): Promise<boolean> {
  const key = params.replayKey?.trim() ?? "";
  if (key.length === 0) {
    // Nothing to dedupe on, so let the caller proceed.
    return true;
  }

  const { kind } = await params.replayGuard.claim(key);
  return kind === "claimed";
}
/**
 * Commits replay keys on the guard.
 *
 * Keys are first normalized by `normalizeDiscordInboundReplayKeys`; `commit`
 * is then invoked for every remaining key and all results are awaited
 * together via `Promise.all`.
 *
 * @param params.replayKeys Keys to commit; may contain nullish entries.
 * @param params.replayGuard Guard whose `commit` method is called per key.
 */
export async function commitDiscordInboundReplay(params: {
  replayKeys?: readonly (string | null | undefined)[];
  replayGuard: ClaimableDedupe;
}): Promise<void> {
  const pendingCommits: unknown[] = [];
  for (const key of normalizeDiscordInboundReplayKeys(params.replayKeys)) {
    // Start every commit before awaiting any of them.
    pendingCommits.push(params.replayGuard.commit(key));
  }
  await Promise.all(pendingCommits);
}
/**
 * Releases replay keys on the guard.
 *
 * Keys are first normalized by `normalizeDiscordInboundReplayKeys`; `release`
 * is then called once per remaining key with `{ error }` as its second
 * argument. The call is synchronous and any return value of `release` is
 * ignored.
 *
 * @param params.replayKeys Keys to release; may contain nullish entries.
 * @param params.replayGuard Guard whose `release` method is called per key.
 * @param params.error Optional failure forwarded to the guard.
 */
export function releaseDiscordInboundReplay(params: {
  replayKeys?: readonly (string | null | undefined)[];
  replayGuard: ClaimableDedupe;
  error?: unknown;
}): void {
  for (const key of normalizeDiscordInboundReplayKeys(params.replayKeys)) {
    params.replayGuard.release(key, { error: params.error });
  }
}
/**
 * Normalizes a list of replay keys.
 *
 * Each entry is trimmed; nullish and blank entries are dropped; duplicates
 * are removed while keeping first-seen order.
 *
 * The result is accumulated in an explicitly typed `Set<string>` rather than
 * via `.filter(Boolean)`, because `filter(Boolean)` does not narrow
 * `string | undefined` to `string` with the standard TypeScript library
 * typings, leaving the declared `string[]` return type unsatisfied under
 * strict checking.
 *
 * @param replayKeys Raw keys; may be omitted or contain nullish entries.
 * @returns Distinct, trimmed, non-empty keys.
 */
function normalizeDiscordInboundReplayKeys(
  replayKeys?: readonly (string | null | undefined)[],
): string[] {
  const distinctKeys = new Set<string>();
  for (const candidate of replayKeys ?? []) {
    const trimmed = candidate?.trim();
    if (trimmed) {
      distinctKeys.add(trimmed);
    }
  }
  return [...distinctKeys];
}

View File

@@ -92,7 +92,7 @@ describe("buildDiscordInboundJob", () => {
it("re-materializes the process context with an overridden abort signal", async () => {
const ctx = await createBaseDiscordMessageContext();
const job = buildDiscordInboundJob(ctx);
const job = buildDiscordInboundJob(ctx, { replayKeys: ["default:ch-1:m-1"] });
const overrideAbortController = new AbortController();
const rematerialized = materializeDiscordInboundJob(job, overrideAbortController.signal);
@@ -103,6 +103,7 @@ describe("buildDiscordInboundJob", () => {
expect(rematerialized.abortSignal).toBe(overrideAbortController.signal);
expect(rematerialized.message).toEqual(job.payload.message);
expect(rematerialized.data).toEqual(job.payload.data);
expect(job.replayKeys).toEqual(["default:ch-1:m-1"]);
});
it("preserves Carbon message getters across queued jobs", async () => {

View File

@@ -22,6 +22,7 @@ export type DiscordInboundJob = {
queueKey: string;
payload: DiscordInboundJobPayload;
runtime: DiscordInboundJobRuntime;
replayKeys?: string[];
};
export function resolveDiscordInboundJobQueueKey(ctx: DiscordMessagePreflightContext): string {
@@ -36,7 +37,10 @@ export function resolveDiscordInboundJobQueueKey(ctx: DiscordMessagePreflightCon
return ctx.messageChannelId;
}
export function buildDiscordInboundJob(ctx: DiscordMessagePreflightContext): DiscordInboundJob {
export function buildDiscordInboundJob(
ctx: DiscordMessagePreflightContext,
options?: { replayKeys?: readonly string[] },
): DiscordInboundJob {
const {
runtime,
abortSignal,
@@ -70,6 +74,7 @@ export function buildDiscordInboundJob(ctx: DiscordMessagePreflightContext): Dis
threadBindings,
discordRestFetch,
},
replayKeys: options?.replayKeys ? [...options.replayKeys] : undefined,
};
}

View File

@@ -1,7 +1,14 @@
import { createRunStateMachine } from "openclaw/plugin-sdk/channel-lifecycle";
import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue";
import type { ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
import { danger, formatDurationSeconds } from "openclaw/plugin-sdk/runtime-env";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import {
commitDiscordInboundReplay,
createDiscordInboundReplayGuard,
DiscordRetryableInboundError,
releaseDiscordInboundReplay,
} from "./inbound-dedupe.js";
import { materializeDiscordInboundJob, type DiscordInboundJob } from "./inbound-job.js";
import type { RuntimeEnv } from "./message-handler.preflight.types.js";
import { processDiscordMessage } from "./message-handler.process.js";
@@ -15,6 +22,7 @@ type DiscordInboundWorkerParams = {
setStatus?: DiscordMonitorStatusSink;
abortSignal?: AbortSignal;
runTimeoutMs?: number;
replayGuard?: ClaimableDedupe;
__testing?: DiscordInboundWorkerTestingHooks;
};
@@ -46,6 +54,7 @@ async function processDiscordInboundJob(params: {
runtime: RuntimeEnv;
lifecycleSignal?: AbortSignal;
runTimeoutMs?: number;
replayGuard: ClaimableDedupe;
testing?: DiscordInboundWorkerTestingHooks;
}) {
const timeoutMs = normalizeDiscordInboundWorkerTimeoutMs(params.runTimeoutMs);
@@ -54,50 +63,70 @@ async function processDiscordInboundJob(params: {
let createdThreadId: string | undefined;
let sessionKey: string | undefined;
const processDiscordMessageImpl = params.testing?.processDiscordMessage ?? processDiscordMessage;
await runDiscordTaskWithTimeout({
run: async (abortSignal) => {
await processDiscordMessageImpl(materializeDiscordInboundJob(params.job, abortSignal), {
onFinalReplyStart: () => {
finalReplyStarted = true;
},
onFinalReplyDelivered: () => {
finalReplyStarted = true;
},
onReplyPlanResolved: (resolved) => {
createdThreadId = normalizeOptionalString(resolved.createdThreadId);
sessionKey = normalizeOptionalString(resolved.sessionKey);
},
try {
await runDiscordTaskWithTimeout({
run: async (abortSignal) => {
await processDiscordMessageImpl(materializeDiscordInboundJob(params.job, abortSignal), {
onFinalReplyStart: () => {
finalReplyStarted = true;
},
onFinalReplyDelivered: () => {
finalReplyStarted = true;
},
onReplyPlanResolved: (resolved) => {
createdThreadId = normalizeOptionalString(resolved.createdThreadId);
sessionKey = normalizeOptionalString(resolved.sessionKey);
},
});
},
timeoutMs,
abortSignals: [params.job.runtime.abortSignal, params.lifecycleSignal],
onTimeout: async (resolvedTimeoutMs) => {
params.runtime.error?.(
danger(
`discord inbound worker timed out after ${formatDurationSeconds(resolvedTimeoutMs, {
decimals: 1,
unit: "seconds",
})}${contextSuffix}`,
),
);
if (finalReplyStarted) {
return;
}
await sendDiscordInboundWorkerTimeoutReply({
job: params.job,
runtime: params.runtime,
contextSuffix,
createdThreadId,
sessionKey,
deliverDiscordReplyImpl: params.testing?.deliverDiscordReply,
});
},
onErrorAfterTimeout: (error) => {
params.runtime.error?.(
danger(`discord inbound worker failed after timeout: ${String(error)}${contextSuffix}`),
);
},
});
await commitDiscordInboundReplay({
replayKeys: params.job.replayKeys,
replayGuard: params.replayGuard,
});
} catch (error) {
if (error instanceof DiscordRetryableInboundError) {
releaseDiscordInboundReplay({
replayKeys: params.job.replayKeys,
error,
replayGuard: params.replayGuard,
});
},
timeoutMs,
abortSignals: [params.job.runtime.abortSignal, params.lifecycleSignal],
onTimeout: async (resolvedTimeoutMs) => {
params.runtime.error?.(
danger(
`discord inbound worker timed out after ${formatDurationSeconds(resolvedTimeoutMs, {
decimals: 1,
unit: "seconds",
})}${contextSuffix}`,
),
);
if (finalReplyStarted) {
return;
}
await sendDiscordInboundWorkerTimeoutReply({
job: params.job,
runtime: params.runtime,
contextSuffix,
createdThreadId,
sessionKey,
deliverDiscordReplyImpl: params.testing?.deliverDiscordReply,
} else {
await commitDiscordInboundReplay({
replayKeys: params.job.replayKeys,
replayGuard: params.replayGuard,
});
},
onErrorAfterTimeout: (error) => {
params.runtime.error?.(
danger(`discord inbound worker failed after timeout: ${String(error)}${contextSuffix}`),
);
},
});
}
throw error;
}
}
async function sendDiscordInboundWorkerTimeoutReply(params: {
@@ -163,6 +192,7 @@ export function createDiscordInboundWorker(
setStatus: params.setStatus,
abortSignal: params.abortSignal,
});
const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard();
return {
enqueue(job) {
@@ -181,6 +211,7 @@ export function createDiscordInboundWorker(
runtime: params.runtime,
lifecycleSignal: params.abortSignal,
runTimeoutMs: params.runTimeoutMs,
replayGuard,
testing: params.__testing,
});
} finally {

View File

@@ -1,4 +1,5 @@
import { describe, expect, it, vi } from "vitest";
import { DiscordRetryableInboundError } from "./inbound-dedupe.js";
import {
createDiscordMessageHandler,
preflightDiscordMessageMock,
@@ -252,6 +253,69 @@ describe("createDiscordMessageHandler queue behavior", () => {
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1);
});
// Verifies that the same message event, delivered twice, is preflighted and
// processed a second time when the first processing attempt fails with
// DiscordRetryableInboundError.
it("retries duplicate deliveries after an explicit retryable worker failure", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
// First processing call rejects with the retryable error; the second resolves.
processDiscordMessageMock
.mockRejectedValueOnce(new DiscordRetryableInboundError("retry me"))
.mockResolvedValueOnce(undefined);
const params = createDiscordHandlerParams();
const handler = createDiscordMessageHandler(params);
installDefaultDiscordPreflight();
const duplicate = createMessageData("m-retry");
// First delivery: the handler itself resolves even though processing will fail.
await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
// Wait until the failure has been reported through runtime.error before redelivering.
await vi.waitFor(() => {
expect(params.runtime.error).toHaveBeenCalledWith(
expect.stringContaining(
"discord inbound worker failed: DiscordRetryableInboundError: retry me",
),
);
});
// Second delivery of the identical event: expected to be processed again, not dropped.
await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(2);
});
// Verifies that the same message event, delivered twice, is NOT preflighted or
// processed again when the first processing attempt fails with a plain Error
// (i.e. not a DiscordRetryableInboundError).
it("keeps replay committed after a non-retryable worker failure", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
// Stands in for an externally visible effect that happens before the failure.
const visibleSideEffect = vi.fn();
processDiscordMessageMock.mockImplementationOnce(async () => {
visibleSideEffect();
throw new Error("post-send failure");
});
const params = createDiscordHandlerParams();
const handler = createDiscordMessageHandler(params);
installDefaultDiscordPreflight();
const duplicate = createMessageData("m-fail");
await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
// Wait until the failure has been reported through runtime.error before redelivering.
await vi.waitFor(() => {
expect(params.runtime.error).toHaveBeenCalledWith(
expect.stringContaining("discord inbound worker failed: Error: post-send failure"),
);
});
// Second delivery of the identical event: expected to be dropped as a duplicate.
await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined();
// NOTE(review): only one microtask tick is awaited before the negative
// assertions below — confirm that is enough to observe a late enqueue.
await Promise.resolve();
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1);
expect(visibleSideEffect).toHaveBeenCalledTimes(1);
});
it("applies explicit inbound worker timeout to queued runs so stalled runs do not block the queue", async () => {
vi.useFakeTimers();
try {

View File

@@ -4,8 +4,15 @@ import {
shouldDebounceTextInbound,
} from "openclaw/plugin-sdk/channel-inbound";
import { resolveOpenProviderRuntimeGroupPolicy } from "openclaw/plugin-sdk/config-runtime";
import { createDedupeCache } from "openclaw/plugin-sdk/infra-runtime";
import { danger } from "openclaw/plugin-sdk/runtime-env";
import {
buildDiscordInboundReplayKey,
claimDiscordInboundReplay,
commitDiscordInboundReplay,
createDiscordInboundReplayGuard,
DiscordRetryableInboundError,
releaseDiscordInboundReplay,
} from "./inbound-dedupe.js";
import { buildDiscordInboundJob } from "./inbound-job.js";
import {
createDiscordInboundWorker,
@@ -40,27 +47,6 @@ export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & {
deactivate: () => void;
};
const RECENT_DISCORD_MESSAGE_TTL_MS = 5 * 60_000;
const RECENT_DISCORD_MESSAGE_MAX = 5000;
function buildDiscordInboundDedupeKey(params: {
accountId: string;
data: DiscordMessageEvent;
}): string | null {
const messageId = params.data.message?.id?.trim();
if (!messageId) {
return null;
}
const channelId = resolveDiscordMessageChannelId({
message: params.data.message,
eventChannelId: params.data.channel_id,
});
if (!channelId) {
return null;
}
return `${params.accountId}:${channelId}:${messageId}`;
}
export function createDiscordMessageHandler(
params: DiscordMessageHandlerParams,
): DiscordMessageHandlerWithLifecycle {
@@ -75,22 +61,21 @@ export function createDiscordMessageHandler(
"group-mentions";
const preflightDiscordMessageImpl =
params.__testing?.preflightDiscordMessage ?? preflightDiscordMessage;
const replayGuard = createDiscordInboundReplayGuard();
const inboundWorker = createDiscordInboundWorker({
runtime: params.runtime,
setStatus: params.setStatus,
abortSignal: params.abortSignal,
runTimeoutMs: params.workerRunTimeoutMs,
replayGuard,
__testing: params.__testing,
});
const recentInboundMessages = createDedupeCache({
ttlMs: RECENT_DISCORD_MESSAGE_TTL_MS,
maxSize: RECENT_DISCORD_MESSAGE_MAX,
});
const { debouncer } = createChannelInboundDebouncer<{
data: DiscordMessageEvent;
client: Client;
abortSignal?: AbortSignal;
replayKey?: string;
}>({
cfg: params.cfg,
channel: "discord",
@@ -129,70 +114,90 @@ export function createDiscordMessageHandler(
if (!last) {
return;
}
const replayKeys = entries.map((entry) => entry.replayKey).filter(Boolean);
const abortSignal = last.abortSignal;
if (abortSignal?.aborted) {
releaseDiscordInboundReplay({
replayKeys,
error: abortSignal.reason,
replayGuard,
});
return;
}
if (entries.length === 1) {
try {
if (entries.length === 1) {
const ctx = await preflightDiscordMessageImpl({
...params,
ackReactionScope,
groupPolicy,
abortSignal,
data: last.data,
client: last.client,
});
if (!ctx) {
await commitDiscordInboundReplay({ replayKeys, replayGuard });
return;
}
applyImplicitReplyBatchGate(ctx, params.replyToMode, false);
inboundWorker.enqueue(buildDiscordInboundJob(ctx, { replayKeys }));
return;
}
const combinedBaseText = entries
.map((entry) =>
resolveDiscordMessageText(entry.data.message, { includeForwarded: false }),
)
.filter(Boolean)
.join("\n");
const syntheticMessage = {
...last.data.message,
content: combinedBaseText,
attachments: [],
message_snapshots: (last.data.message as { message_snapshots?: unknown })
.message_snapshots,
messageSnapshots: (last.data.message as { messageSnapshots?: unknown }).messageSnapshots,
rawData: {
...(last.data.message as { rawData?: Record<string, unknown> }).rawData,
},
};
const syntheticData: DiscordMessageEvent = {
...last.data,
message: syntheticMessage,
};
const ctx = await preflightDiscordMessageImpl({
...params,
ackReactionScope,
groupPolicy,
abortSignal,
data: last.data,
data: syntheticData,
client: last.client,
});
if (!ctx) {
await commitDiscordInboundReplay({ replayKeys, replayGuard });
return;
}
applyImplicitReplyBatchGate(ctx, params.replyToMode, false);
inboundWorker.enqueue(buildDiscordInboundJob(ctx));
return;
}
const combinedBaseText = entries
.map((entry) => resolveDiscordMessageText(entry.data.message, { includeForwarded: false }))
.filter(Boolean)
.join("\n");
const syntheticMessage = {
...last.data.message,
content: combinedBaseText,
attachments: [],
message_snapshots: (last.data.message as { message_snapshots?: unknown }).message_snapshots,
messageSnapshots: (last.data.message as { messageSnapshots?: unknown }).messageSnapshots,
rawData: {
...(last.data.message as { rawData?: Record<string, unknown> }).rawData,
},
};
const syntheticData: DiscordMessageEvent = {
...last.data,
message: syntheticMessage,
};
const ctx = await preflightDiscordMessageImpl({
...params,
ackReactionScope,
groupPolicy,
abortSignal,
data: syntheticData,
client: last.client,
});
if (!ctx) {
return;
}
applyImplicitReplyBatchGate(ctx, params.replyToMode, true);
if (entries.length > 1) {
const ids = entries.map((entry) => entry.data.message?.id).filter(Boolean) as string[];
if (ids.length > 0) {
const ctxBatch = ctx as typeof ctx & {
MessageSids?: string[];
MessageSidFirst?: string;
MessageSidLast?: string;
};
ctxBatch.MessageSids = ids;
ctxBatch.MessageSidFirst = ids[0];
ctxBatch.MessageSidLast = ids[ids.length - 1];
applyImplicitReplyBatchGate(ctx, params.replyToMode, true);
if (entries.length > 1) {
const ids = entries.map((entry) => entry.data.message?.id).filter(Boolean) as string[];
if (ids.length > 0) {
const ctxBatch = ctx as typeof ctx & {
MessageSids?: string[];
MessageSidFirst?: string;
MessageSidLast?: string;
};
ctxBatch.MessageSids = ids;
ctxBatch.MessageSidFirst = ids[0];
ctxBatch.MessageSidLast = ids[ids.length - 1];
}
}
inboundWorker.enqueue(buildDiscordInboundJob(ctx, { replayKeys }));
} catch (error) {
if (error instanceof DiscordRetryableInboundError) {
releaseDiscordInboundReplay({ replayKeys, error, replayGuard });
} else {
await commitDiscordInboundReplay({ replayKeys, replayGuard });
}
throw error;
}
inboundWorker.enqueue(buildDiscordInboundJob(ctx));
},
onError: (err) => {
params.runtime.error?.(danger(`discord debounce flush failed: ${String(err)}`));
@@ -213,15 +218,25 @@ export function createDiscordMessageHandler(
if (params.botUserId && msgAuthorId === params.botUserId) {
return;
}
const dedupeKey = buildDiscordInboundDedupeKey({
const replayKey = buildDiscordInboundReplayKey({
accountId: params.accountId,
data,
});
if (dedupeKey && recentInboundMessages.check(dedupeKey)) {
if (
!(await claimDiscordInboundReplay({
replayKey,
replayGuard,
}))
) {
return;
}
await debouncer.enqueue({ data, client, abortSignal: options?.abortSignal });
await debouncer.enqueue({
data,
client,
abortSignal: options?.abortSignal,
replayKey: replayKey ?? undefined,
});
} catch (err) {
params.runtime.error?.(danger(`handler failed: ${String(err)}`));
}