diff --git a/CHANGELOG.md b/CHANGELOG.md index 85916a30d9e..b0f6c76c617 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ Docs: https://docs.openclaw.ai +## Unreleased + +### Fixes + +- WhatsApp: drain pending outbound deliveries on a 30s periodic timer in addition to the reconnect handler, so messages enqueued while the provider is already connected no longer wait for the next reconnect to send. (#79083) Thanks @Oviemudiaga. + ## 2026.5.19 ### Changes diff --git a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts index 0e9f54dd66d..76a54a20d01 100644 --- a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts +++ b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts @@ -27,6 +27,25 @@ import { startWebAutoReplyMonitor, } from "./auto-reply.test-harness.js"; +type DrainSelectionEntry = { + channel: string; + accountId?: string | null; + lastError?: string; +}; +type DrainPendingDeliveriesCall = { + drainKey: string; + logLabel: string; + selectEntry: (entry: DrainSelectionEntry) => { match: boolean; bypassBackoff: boolean }; +}; + +const deliveryQueueMocks = vi.hoisted(() => ({ + drainPendingDeliveries: vi.fn(async (_opts: unknown) => undefined), +})); + +vi.mock("openclaw/plugin-sdk/delivery-queue-runtime", () => ({ + drainPendingDeliveries: deliveryQueueMocks.drainPendingDeliveries, +})); + installWebAutoReplyTestHomeHooks(); function requireOnMessage( @@ -247,6 +266,78 @@ describe("web auto-reply connection", () => { expect(sleep).toHaveBeenCalled(); }); + it("drains pending deliveries while connected and stops after close", async () => { + vi.useFakeTimers(); + try { + const sleep = vi.fn(async () => {}); + const scripted = createScriptedWebListenerFactory(); + const { controller, run } = startWebAutoReplyMonitor({ + monitorWebChannelFn: monitorWebChannel as never, + listenerFactory: scripted.listenerFactory, + sleep, + accountId: "work", + }); + + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBe(1); + }, + { timeout: 250, interval: 2 }, + ); + expect(deliveryQueueMocks.drainPendingDeliveries).toHaveBeenCalledWith( + expect.objectContaining({ + drainKey: "whatsapp:work", + logLabel: "WhatsApp reconnect drain", + }), + ); + + deliveryQueueMocks.drainPendingDeliveries.mockClear(); + await vi.advanceTimersByTimeAsync(30_000); + await vi.waitFor(() => { + expect(deliveryQueueMocks.drainPendingDeliveries).toHaveBeenCalledTimes(1); + }); + + const periodicCall = deliveryQueueMocks.drainPendingDeliveries.mock.calls.at(-1)?.[0] as + | DrainPendingDeliveriesCall + | undefined; + expect(periodicCall).toBeDefined(); + if (!periodicCall) { + throw new Error("Expected WhatsApp periodic drain call"); + } + expect(periodicCall.drainKey).toBe("whatsapp:work"); + expect(periodicCall.logLabel).toBe("WhatsApp periodic drain"); + expect( + periodicCall.selectEntry({ + channel: "whatsapp", + accountId: "work", + }), + ).toEqual({ match: true, bypassBackoff: false }); + expect( + periodicCall.selectEntry({ + channel: "whatsapp", + accountId: "default", + }), + ).toEqual({ match: false, bypassBackoff: false }); + expect( + periodicCall.selectEntry({ + channel: "telegram", + accountId: "work", + }), + ).toEqual({ match: false, bypassBackoff: false }); + + controller.abort(); + scripted.resolveClose(0, { status: 499, isLoggedOut: false, error: "aborted" }); + await Promise.resolve(); + await run; + + deliveryQueueMocks.drainPendingDeliveries.mockClear(); + await vi.advanceTimersByTimeAsync(30_000); + expect(deliveryQueueMocks.drainPendingDeliveries).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + it("treats status 440 as non-retryable and stops without retrying", async () => { const sleep = vi.fn(async () => {}); const scripted = createScriptedWebListenerFactory(); diff --git a/extensions/whatsapp/src/auto-reply/monitor.ts b/extensions/whatsapp/src/auto-reply/monitor.ts index 1990fd89bf1..795112c65bb 100644 --- a/extensions/whatsapp/src/auto-reply/monitor.ts +++ b/extensions/whatsapp/src/auto-reply/monitor.ts @@ -541,17 +541,40 @@ export async function monitorWebChannel( ); }); + const periodicDrainInterval = setInterval(() => { + void drainPendingDeliveries({ + drainKey: `whatsapp:${normalizedAccountId}`, + logLabel: "WhatsApp periodic drain", + cfg, + log: reconnectLogger, + selectEntry: (entry) => ({ + match: + entry.channel === "whatsapp" && + normalizeReconnectAccountId(entry.accountId) === normalizedAccountId, + bypassBackoff: false, + }), + }).catch((err) => { + reconnectLogger.warn( + { connectionId: connection.connectionId, error: String(err) }, + "periodic drain failed", + ); + }); + }, 30_000); + whatsappLog.info("Listening for personal WhatsApp inbound messages."); if (process.stdout.isTTY || process.stderr.isTTY) { whatsappLog.raw("Ctrl+C to stop."); } if (!keepAlive) { + clearInterval(periodicDrainInterval); await controller.shutdown(); return; } - const reason = await controller.waitForClose(); + const reason = await controller + .waitForClose() + .finally(() => clearInterval(periodicDrainInterval)); if (stopRequested() || sigintStop || reason === "aborted") { await controller.shutdown(); break;