fix(whatsapp): make inbound retries explicit

This commit is contained in:
Vincent Koc
2026-04-13 15:46:01 +01:00
parent fad06f7c21
commit d70e6b13d7
3 changed files with 110 additions and 30 deletions

View File

@@ -1,4 +1,5 @@
import { createDedupeCache } from "openclaw/plugin-sdk/core";
import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
const RECENT_WEB_MESSAGE_TTL_MS = 20 * 60_000;
const RECENT_WEB_MESSAGE_MAX = 5000;
@@ -9,11 +10,22 @@ const recentInboundMessages = createDedupeCache({
ttlMs: RECENT_WEB_MESSAGE_TTL_MS,
maxSize: RECENT_WEB_MESSAGE_MAX,
});
const claimableInboundMessages = createClaimableDedupe({
ttlMs: RECENT_WEB_MESSAGE_TTL_MS,
memoryMaxSize: RECENT_WEB_MESSAGE_MAX,
});
const recentOutboundMessages = createDedupeCache({
ttlMs: RECENT_OUTBOUND_MESSAGE_TTL_MS,
maxSize: RECENT_OUTBOUND_MESSAGE_MAX,
});
export class WhatsAppRetryableInboundError extends Error {
constructor(message: string, options?: ErrorOptions) {
super(message, options);
this.name = "WhatsAppRetryableInboundError";
}
}
function buildMessageKey(params: {
accountId: string;
remoteJid: string;
@@ -30,11 +42,22 @@ function buildMessageKey(params: {
export function resetWebInboundDedupe(): void {
recentInboundMessages.clear();
claimableInboundMessages.clearMemory();
recentOutboundMessages.clear();
}
export function isRecentInboundMessage(key: string): boolean {
return recentInboundMessages.check(key);
export async function claimRecentInboundMessage(key: string): Promise<boolean> {
const claim = await claimableInboundMessages.claim(key);
return claim.kind === "claimed";
}
export async function commitRecentInboundMessage(key: string): Promise<void> {
await claimableInboundMessages.commit(key);
recentInboundMessages.check(key);
}
export function releaseRecentInboundMessage(key: string, error?: unknown): void {
claimableInboundMessages.release(key, { error });
}
export function rememberRecentOutboundMessage(params: {

View File

@@ -11,9 +11,12 @@ import { createWaSocket, formatError, getStatusCode, waitForWaConnection } from
import { resolveJidToE164 } from "../text-runtime.js";
import { checkInboundAccessControl } from "./access-control.js";
import {
isRecentInboundMessage,
claimRecentInboundMessage,
commitRecentInboundMessage,
isRecentOutboundMessage,
releaseRecentInboundMessage,
rememberRecentOutboundMessage,
WhatsAppRetryableInboundError,
} from "./dedupe.js";
import {
describeReplyContext,
@@ -120,7 +123,26 @@ export async function attachWebInboxToSocket(
options.authDir,
sock.user as { id?: string | null; lid?: string | null } | undefined,
);
const debouncer = createInboundDebouncer<WebInboundMessage>({
type QueuedInboundMessage = WebInboundMessage & {
dedupeKey?: string;
};
const finalizeInboundDedupe = async (
entries: QueuedInboundMessage[],
error?: unknown,
): Promise<void> => {
const dedupeKeys = [...new Set(entries.map((entry) => entry.dedupeKey).filter(Boolean))];
if (dedupeKeys.length === 0) {
return;
}
if (error instanceof WhatsAppRetryableInboundError) {
dedupeKeys.forEach((dedupeKey) => releaseRecentInboundMessage(dedupeKey, error));
return;
}
await Promise.all(dedupeKeys.map((dedupeKey) => commitRecentInboundMessage(dedupeKey)));
};
const debouncer = createInboundDebouncer<QueuedInboundMessage>({
debounceMs: options.debounceMs ?? 0,
buildKey: (msg) => {
const sender = msg.sender;
@@ -144,27 +166,34 @@ export async function attachWebInboxToSocket(
if (!last) {
return;
}
if (entries.length === 1) {
await options.onMessage(last);
return;
}
const mentioned = new Set<string>();
for (const entry of entries) {
for (const jid of entry.mentions ?? entry.mentionedJids ?? []) {
mentioned.add(jid);
try {
if (entries.length === 1) {
await options.onMessage(last);
await finalizeInboundDedupe(entries);
return;
}
const mentioned = new Set<string>();
for (const entry of entries) {
for (const jid of entry.mentions ?? entry.mentionedJids ?? []) {
mentioned.add(jid);
}
}
const combinedBody = entries
.map((entry) => entry.body)
.filter(Boolean)
.join("\n");
const combinedMessage: WebInboundMessage = {
...last,
body: combinedBody,
mentions: mentioned.size > 0 ? Array.from(mentioned) : undefined,
mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined,
};
await options.onMessage(combinedMessage);
await finalizeInboundDedupe(entries);
} catch (error) {
await finalizeInboundDedupe(entries, error);
throw error;
}
const combinedBody = entries
.map((entry) => entry.body)
.filter(Boolean)
.join("\n");
const combinedMessage: WebInboundMessage = {
...last,
body: combinedBody,
mentions: mentioned.size > 0 ? Array.from(mentioned) : undefined,
mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined,
};
await options.onMessage(combinedMessage);
},
onError: (err) => {
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
@@ -306,12 +335,6 @@ export async function attachWebInboxToSocket(
logVerbose(`Skipping recent outbound WhatsApp echo ${id} for ${remoteJid}`);
return null;
}
if (id) {
const dedupeKey = `${options.accountId}:${remoteJid}:${id}`;
if (isRecentInboundMessage(dedupeKey)) {
return null;
}
}
const participantJid = msg.key?.participant ?? undefined;
const from = group ? remoteJid : await resolveInboundJid(remoteJid);
if (!from) {
@@ -482,7 +505,7 @@ export async function attachWebInboxToSocket(
},
"inbound message",
);
const inboundMessage: WebInboundMessage = {
const inboundMessage: QueuedInboundMessage = {
id: inbound.id,
from: inbound.from,
conversationId: inbound.from,
@@ -523,6 +546,7 @@ export async function attachWebInboxToSocket(
mediaPath: enriched.mediaPath,
mediaType: enriched.mediaType,
mediaFileName: enriched.mediaFileName,
dedupeKey: inbound.id ? `${options.accountId}:${inbound.remoteJid}:${inbound.id}` : undefined,
};
try {
const task = Promise.resolve(debouncer.enqueue(inboundMessage));
@@ -569,6 +593,11 @@ export async function attachWebInboxToSocket(
continue;
}
const dedupeKey = inbound.id ? `${options.accountId}:${inbound.remoteJid}:${inbound.id}` : "";
if (dedupeKey && !(await claimRecentInboundMessage(dedupeKey))) {
continue;
}
await enqueueInboundMessage(msg, inbound, enriched);
}
};

View File

@@ -2,6 +2,7 @@ import fsSync from "node:fs";
import path from "node:path";
import "./monitor-inbox.test-harness.js";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { WhatsAppRetryableInboundError } from "./inbound/dedupe.js";
import {
type InboxMonitorOptions,
InboxOnMessage,
@@ -458,6 +459,33 @@ describe("web monitor inbox", () => {
await listener.close();
});
it("retries redelivered messages after an explicit retryable inbound failure", async () => {
let attempts = 0;
const onMessage = vi.fn(async () => {
attempts += 1;
if (attempts === 1) {
throw new WhatsAppRetryableInboundError("retry me");
}
});
const { listener, sock } = await startInboxMonitor(onMessage as InboxOnMessage);
const upsert = buildNotifyMessageUpsert({
id: nextMessageId("retryable-dedupe"),
remoteJid: "999@s.whatsapp.net",
text: "ping",
timestamp: 1_700_000_000,
pushName: "Tester",
});
sock.ev.emit("messages.upsert", upsert);
await waitForMessageCalls(onMessage, 1);
sock.ev.emit("messages.upsert", upsert);
await waitForMessageCalls(onMessage, 2);
await listener.close();
});
it("resolves LID JIDs using Baileys LID mapping store", async () => {
const onMessage = vi.fn(async () => {
return;