From d70e6b13d7e15ce7cf2e8ca22e28e7c1b9d38ab7 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 15:46:01 +0100 Subject: [PATCH] fix(whatsapp): make inbound retries explicit --- extensions/whatsapp/src/inbound/dedupe.ts | 27 +++++- extensions/whatsapp/src/inbound/monitor.ts | 85 +++++++++++++------ ...tor-inbox.streams-inbound-messages.test.ts | 28 ++++++ 3 files changed, 110 insertions(+), 30 deletions(-) diff --git a/extensions/whatsapp/src/inbound/dedupe.ts b/extensions/whatsapp/src/inbound/dedupe.ts index d558513734e..676717fa137 100644 --- a/extensions/whatsapp/src/inbound/dedupe.ts +++ b/extensions/whatsapp/src/inbound/dedupe.ts @@ -1,4 +1,5 @@ import { createDedupeCache } from "openclaw/plugin-sdk/core"; +import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; const RECENT_WEB_MESSAGE_TTL_MS = 20 * 60_000; const RECENT_WEB_MESSAGE_MAX = 5000; @@ -9,11 +10,22 @@ const recentInboundMessages = createDedupeCache({ ttlMs: RECENT_WEB_MESSAGE_TTL_MS, maxSize: RECENT_WEB_MESSAGE_MAX, }); +const claimableInboundMessages = createClaimableDedupe({ + ttlMs: RECENT_WEB_MESSAGE_TTL_MS, + memoryMaxSize: RECENT_WEB_MESSAGE_MAX, +}); const recentOutboundMessages = createDedupeCache({ ttlMs: RECENT_OUTBOUND_MESSAGE_TTL_MS, maxSize: RECENT_OUTBOUND_MESSAGE_MAX, }); +export class WhatsAppRetryableInboundError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "WhatsAppRetryableInboundError"; + } +} + function buildMessageKey(params: { accountId: string; remoteJid: string; @@ -30,11 +42,22 @@ function buildMessageKey(params: { export function resetWebInboundDedupe(): void { recentInboundMessages.clear(); + claimableInboundMessages.clearMemory(); recentOutboundMessages.clear(); } -export function isRecentInboundMessage(key: string): boolean { - return recentInboundMessages.check(key); +export async function claimRecentInboundMessage(key: string): Promise { + const claim = await claimableInboundMessages.claim(key); + return claim.kind === "claimed"; +} + +export async function commitRecentInboundMessage(key: string): Promise { + await claimableInboundMessages.commit(key); + recentInboundMessages.check(key); +} + +export function releaseRecentInboundMessage(key: string, error?: unknown): void { + claimableInboundMessages.release(key, { error }); } export function rememberRecentOutboundMessage(params: { diff --git a/extensions/whatsapp/src/inbound/monitor.ts b/extensions/whatsapp/src/inbound/monitor.ts index 50985db772e..69757f5631d 100644 --- a/extensions/whatsapp/src/inbound/monitor.ts +++ b/extensions/whatsapp/src/inbound/monitor.ts @@ -11,9 +11,12 @@ import { createWaSocket, formatError, getStatusCode, waitForWaConnection } from import { resolveJidToE164 } from "../text-runtime.js"; import { checkInboundAccessControl } from "./access-control.js"; import { - isRecentInboundMessage, + claimRecentInboundMessage, + commitRecentInboundMessage, isRecentOutboundMessage, + releaseRecentInboundMessage, rememberRecentOutboundMessage, + WhatsAppRetryableInboundError, } from "./dedupe.js"; import { describeReplyContext, @@ -120,7 +123,26 @@ export async function attachWebInboxToSocket( options.authDir, sock.user as { id?: string | null; lid?: string | null } | undefined, ); - const debouncer = createInboundDebouncer({ + type QueuedInboundMessage = WebInboundMessage & { + dedupeKey?: string; + }; + + const finalizeInboundDedupe = async ( + entries: QueuedInboundMessage[], + error?: unknown, + ): Promise => { + const dedupeKeys = [...new Set(entries.map((entry) => entry.dedupeKey).filter(Boolean))]; + if (dedupeKeys.length === 0) { + return; + } + if (error instanceof WhatsAppRetryableInboundError) { + dedupeKeys.forEach((dedupeKey) => releaseRecentInboundMessage(dedupeKey, error)); + return; + } + await Promise.all(dedupeKeys.map((dedupeKey) => commitRecentInboundMessage(dedupeKey))); + }; + + const debouncer = createInboundDebouncer({ debounceMs: options.debounceMs ?? 0, buildKey: (msg) => { const sender = msg.sender; @@ -144,27 +166,34 @@ export async function attachWebInboxToSocket( if (!last) { return; } - if (entries.length === 1) { - await options.onMessage(last); - return; - } - const mentioned = new Set(); - for (const entry of entries) { - for (const jid of entry.mentions ?? entry.mentionedJids ?? []) { - mentioned.add(jid); + try { + if (entries.length === 1) { + await options.onMessage(last); + await finalizeInboundDedupe(entries); + return; } + const mentioned = new Set(); + for (const entry of entries) { + for (const jid of entry.mentions ?? entry.mentionedJids ?? []) { + mentioned.add(jid); + } + } + const combinedBody = entries + .map((entry) => entry.body) + .filter(Boolean) + .join("\n"); + const combinedMessage: WebInboundMessage = { + ...last, + body: combinedBody, + mentions: mentioned.size > 0 ? Array.from(mentioned) : undefined, + mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined, + }; + await options.onMessage(combinedMessage); + await finalizeInboundDedupe(entries); + } catch (error) { + await finalizeInboundDedupe(entries, error); + throw error; } - const combinedBody = entries - .map((entry) => entry.body) - .filter(Boolean) - .join("\n"); - const combinedMessage: WebInboundMessage = { - ...last, - body: combinedBody, - mentions: mentioned.size > 0 ? Array.from(mentioned) : undefined, - mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined, - }; - await options.onMessage(combinedMessage); }, onError: (err) => { inboundLogger.error({ error: String(err) }, "failed handling inbound web message"); @@ -306,12 +335,6 @@ export async function attachWebInboxToSocket( logVerbose(`Skipping recent outbound WhatsApp echo ${id} for ${remoteJid}`); return null; } - if (id) { - const dedupeKey = `${options.accountId}:${remoteJid}:${id}`; - if (isRecentInboundMessage(dedupeKey)) { - return null; - } - } const participantJid = msg.key?.participant ?? undefined; const from = group ? remoteJid : await resolveInboundJid(remoteJid); if (!from) { @@ -482,7 +505,7 @@ export async function attachWebInboxToSocket( }, "inbound message", ); - const inboundMessage: WebInboundMessage = { + const inboundMessage: QueuedInboundMessage = { id: inbound.id, from: inbound.from, conversationId: inbound.from, @@ -523,6 +546,7 @@ export async function attachWebInboxToSocket( mediaPath: enriched.mediaPath, mediaType: enriched.mediaType, mediaFileName: enriched.mediaFileName, + dedupeKey: inbound.id ? `${options.accountId}:${inbound.remoteJid}:${inbound.id}` : undefined, }; try { const task = Promise.resolve(debouncer.enqueue(inboundMessage)); @@ -569,6 +593,11 @@ export async function attachWebInboxToSocket( continue; } + const dedupeKey = inbound.id ? `${options.accountId}:${inbound.remoteJid}:${inbound.id}` : ""; + if (dedupeKey && !(await claimRecentInboundMessage(dedupeKey))) { + continue; + } + await enqueueInboundMessage(msg, inbound, enriched); } }; diff --git a/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test.ts b/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test.ts index c39c755c771..fa40c36b39f 100644 --- a/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test.ts +++ b/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test.ts @@ -2,6 +2,7 @@ import fsSync from "node:fs"; import path from "node:path"; import "./monitor-inbox.test-harness.js"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { WhatsAppRetryableInboundError } from "./inbound/dedupe.js"; import { type InboxMonitorOptions, InboxOnMessage, @@ -458,6 +459,33 @@ describe("web monitor inbox", () => { await listener.close(); }); + it("retries redelivered messages after an explicit retryable inbound failure", async () => { + let attempts = 0; + const onMessage = vi.fn(async () => { + attempts += 1; + if (attempts === 1) { + throw new WhatsAppRetryableInboundError("retry me"); + } + }); + + const { listener, sock } = await startInboxMonitor(onMessage as InboxOnMessage); + const upsert = buildNotifyMessageUpsert({ + id: nextMessageId("retryable-dedupe"), + remoteJid: "999@s.whatsapp.net", + text: "ping", + timestamp: 1_700_000_000, + pushName: "Tester", + }); + + sock.ev.emit("messages.upsert", upsert); + await waitForMessageCalls(onMessage, 1); + + sock.ev.emit("messages.upsert", upsert); + await waitForMessageCalls(onMessage, 2); + + await listener.close(); + }); + it("resolves LID JIDs using Baileys LID mapping store", async () => { const onMessage = vi.fn(async () => { return;