diff --git a/CHANGELOG.md b/CHANGELOG.md index c329cfdb6a7..54e7abee858 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai - fix(exec): replace TOCTOU check-then-read with atomic pinned-fd open in script preflight [AI]. (#62333) Thanks @pgondhi987. - WhatsApp/auto-reply: keep inbound reply, media, and composing sends on the current socket across reconnects, wait through reconnect gaps, and retry timeout-only send failures without dropping the active socket ref. (#62892) Thanks @mcaxtr. - Config/plugins: let config writes keep disabled plugin entries without forcing required plugin config schemas or crashing raw plugin validation, so slot switches and similar plugin-state updates persist cleanly. (#63296) Thanks @fuller-stack-dev. +- WhatsApp/outbound queue: drain queued WhatsApp deliveries when the listener reconnects without dropping reconnect-delayed sends after a special TTL or rewriting retry history, so disconnect-window outbound messages can recover once the channel is ready again. (#46299) Thanks @manuel-claw. 
## 2026.4.9 diff --git a/extensions/whatsapp/src/auto-reply/monitor.ts b/extensions/whatsapp/src/auto-reply/monitor.ts index 5dd584837ce..ffc1a84d564 100644 --- a/extensions/whatsapp/src/auto-reply/monitor.ts +++ b/extensions/whatsapp/src/auto-reply/monitor.ts @@ -3,6 +3,7 @@ import { resolveInboundDebounceMs } from "openclaw/plugin-sdk/channel-inbound"; import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime"; import { waitForever } from "openclaw/plugin-sdk/cli-runtime"; import { hasControlCommand } from "openclaw/plugin-sdk/command-detection"; +import { drainReconnectQueue } from "openclaw/plugin-sdk/infra-runtime"; import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime"; import { DEFAULT_GROUP_HISTORY_LIMIT } from "openclaw/plugin-sdk/reply-history"; import { resolveAgentRoute } from "openclaw/plugin-sdk/routing"; @@ -259,6 +260,19 @@ export async function monitorWebChannel( }); setActiveWebListener(account.accountId, listener); + + // Drain any messages that failed with "no listener" during the disconnect window. 
+    void drainReconnectQueue({
+      accountId: account.accountId,
+      cfg,
+      log: reconnectLogger,
+    }).catch((err) => {
+      reconnectLogger.warn(
+        { connectionId: active.connectionId, error: String(err) },
+        "reconnect drain failed",
+      );
+    });
+
     active.unregisterUnhandled = registerUnhandledRejectionHandler((reason) => {
       if (!isLikelyWhatsAppCryptoError(reason)) {
         return false;
diff --git a/src/infra/outbound/delivery-queue-recovery.ts b/src/infra/outbound/delivery-queue-recovery.ts
index 82dfc188f7b..bda76a1249d 100644
--- a/src/infra/outbound/delivery-queue-recovery.ts
+++ b/src/infra/outbound/delivery-queue-recovery.ts
@@ -54,6 +54,37 @@ const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
   /User .* not in room/i,
 ];
 
+/** Matches the `lastError` text recorded when a send found no active WhatsApp Web listener. */
+const NO_LISTENER_ERROR_RE = /No active WhatsApp Web listener/i;
+
+/** Account ids (normalized) that currently have a reconnect drain running. */
+const drainInProgress = new Map<string, boolean>();
+/** Ids of queue entries that a drain or recovery pass is working on right now. */
+const entriesInProgress = new Set<string>();
+
+type DeliverRuntimeModule = typeof import("./deliver-runtime.js");
+
+/** Memoized dynamic import of the delivery runtime; stays `null` until first needed. */
+let deliverRuntimePromise: Promise<DeliverRuntimeModule> | null = null;
+
+/** Loads `./deliver-runtime.js` on first call and returns the same promise afterwards. */
+function loadDeliverRuntime(): Promise<DeliverRuntimeModule> {
+  deliverRuntimePromise ??= import("./deliver-runtime.js");
+  return deliverRuntimePromise;
+}
+
+/** Trims an account id and maps a missing or blank id to `"default"`. */
+function normalizeQueueAccountId(accountId?: string): string {
+  return (accountId ?? "").trim() || "default";
+}
+
+/** Returns the stringified `code` property of an object-like error, or `null` if it has none. */
+function getErrnoCode(err: unknown): string | null {
+  return err && typeof err === "object" && "code" in err
+    ? String((err as { code?: unknown }).code)
+    : null;
+}
+
 function createEmptyRecoverySummary(): RecoverySummary {
   return {
     recovered: 0,
@@ -63,6 +94,20 @@ function createEmptyRecoverySummary(): RecoverySummary {
   };
 }
 
+/** Marks a queue entry as in-flight; returns `false` when another pass already holds it. */
+function claimRecoveryEntry(entryId: string): boolean {
+  if (entriesInProgress.has(entryId)) {
+    return false;
+  }
+  entriesInProgress.add(entryId);
+  return true;
+}
+
+/** Releases a claim taken with `claimRecoveryEntry`. */
+function releaseRecoveryEntry(entryId: string): void {
+  entriesInProgress.delete(entryId);
+}
+
 function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) {
   return {
     cfg,
@@ -143,6 +188,111 @@ export function isPermanentDeliveryError(error: string): boolean {
   return PERMANENT_ERROR_PATTERNS.some((re) => re.test(error));
 }
 
+/**
+ * Retries queued WhatsApp deliveries for one account right after its listener reconnects.
+ *
+ * Only entries whose `lastError` matches the "no active listener" error are retried, oldest
+ * first and without a backoff check. A failed retry is recorded through `failDelivery`
+ * (or `moveToFailed` for permanent errors); entries at `MAX_RETRIES` are moved to `failed/`.
+ * Delivery errors are logged rather than thrown; queue I/O errors can still reject.
+ *
+ * @param opts.accountId - Account whose listener reconnected; blank is treated as `"default"`.
+ * @param opts.cfg - Config forwarded to the deliver function.
+ * @param opts.log - Logger for progress and failures.
+ * @param opts.stateDir - Optional state directory override passed to the queue helpers.
+ * @param opts.deliver - Optional deliver function; defaults to the lazily loaded runtime.
+ */
+export async function drainReconnectQueue(opts: {
+  accountId: string;
+  cfg: OpenClawConfig;
+  log: RecoveryLogger;
+  stateDir?: string;
+  deliver?: DeliverFn;
+}): Promise<void> {
+  // Normalize once so the guard key and the entry filter agree on blank or padded ids.
+  const accountId = normalizeQueueAccountId(opts.accountId);
+
+  if (drainInProgress.get(accountId)) {
+    opts.log.info(
+      `WhatsApp reconnect drain: already in progress for account ${accountId}, skipping`,
+    );
+    return;
+  }
+
+  drainInProgress.set(accountId, true);
+  try {
+    const matchingEntries = (await loadPendingDeliveries(opts.stateDir))
+      .filter(
+        (entry) =>
+          entry.channel === "whatsapp" &&
+          normalizeQueueAccountId(entry.accountId) === accountId &&
+          typeof entry.lastError === "string" &&
+          NO_LISTENER_ERROR_RE.test(entry.lastError),
+      )
+      .toSorted((a, b) => a.enqueuedAt - b.enqueuedAt);
+
+    if (matchingEntries.length === 0) {
+      return;
+    }
+
+    opts.log.info(
+      `WhatsApp reconnect drain: ${matchingEntries.length} pending message(s) for account ${accountId}`,
+    );
+
+    const deliver = opts.deliver ?? (await loadDeliverRuntime()).deliverOutboundPayloads;
+
+    for (const entry of matchingEntries) {
+      // Startup recovery claims entries through the same set; never send one twice.
+      if (!claimRecoveryEntry(entry.id)) {
+        opts.log.info(`WhatsApp reconnect drain: entry ${entry.id} is already being recovered`);
+        continue;
+      }
+
+      if (entry.retryCount >= MAX_RETRIES) {
+        try {
+          await moveToFailed(entry.id, opts.stateDir);
+        } catch (err) {
+          if (getErrnoCode(err) === "ENOENT") {
+            opts.log.info(`reconnect drain: entry ${entry.id} already gone, skipping`);
+            continue;
+          }
+          throw err;
+        } finally {
+          releaseRecoveryEntry(entry.id);
+        }
+        opts.log.warn(
+          `WhatsApp reconnect drain: entry ${entry.id} exceeded max retries and was moved to failed/`,
+        );
+        continue;
+      }
+
+      try {
+        await deliver(buildRecoveryDeliverParams(entry, opts.cfg));
+        await ackDelivery(entry.id, opts.stateDir);
+      } catch (err) {
+        const errMsg = formatErrorMessage(err);
+        opts.log.warn(`WhatsApp reconnect drain: retry failed for entry ${entry.id}: ${errMsg}`);
+        try {
+          if (isPermanentDeliveryError(errMsg)) {
+            await moveToFailed(entry.id, opts.stateDir);
+          } else {
+            await failDelivery(entry.id, errMsg, opts.stateDir);
+          }
+        } catch (recordErr) {
+          // Best effort: a bookkeeping failure must not abort the rest of the drain.
+          opts.log.warn(
+            `WhatsApp reconnect drain: could not record failure for entry ${entry.id}: ${formatErrorMessage(recordErr)}`,
+          );
+        }
+      } finally {
+        releaseRecoveryEntry(entry.id);
+      }
+    }
+  } finally {
+    drainInProgress.delete(accountId);
+  }
+}
+
 /**
  * On gateway startup, scan the delivery queue and retry any pending entries.
  * Uses exponential backoff and moves entries that exceed MAX_RETRIES to failed/.
@@ -175,11 +290,20 @@ export async function recoverPendingDeliveries(opts: {
       break;
     }
     if (entry.retryCount >= MAX_RETRIES) {
-      opts.log.warn(
-        `Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
-      );
-      await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
-      summary.skippedMaxRetries += 1;
+      // Claim first so a concurrent reconnect drain cannot handle the same entry.
+      if (!claimRecoveryEntry(entry.id)) {
+        opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
+        continue;
+      }
+      try {
+        opts.log.warn(
+          `Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
+        );
+        await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
+        summary.skippedMaxRetries += 1;
+      } finally {
+        releaseRecoveryEntry(entry.id);
+      }
       continue;
     }
 
@@ -192,6 +316,12 @@ export async function recoverPendingDeliveries(opts: {
       continue;
     }
 
+    // Skip entries that a concurrent reconnect drain already holds.
+    if (!claimRecoveryEntry(entry.id)) {
+      opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
+      continue;
+    }
+
     try {
       await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg));
       await ackDelivery(entry.id, opts.stateDir);
@@ -212,6 +342,8 @@ export async function recoverPendingDeliveries(opts: {
       }
       summary.failed += 1;
       opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`);
+    } finally {
+      releaseRecoveryEntry(entry.id);
     }
   }
 
diff --git a/src/infra/outbound/delivery-queue-storage.ts b/src/infra/outbound/delivery-queue-storage.ts
index 601fc46beda..6b41484e6dc 100644
--- a/src/infra/outbound/delivery-queue-storage.ts
+++ b/src/infra/outbound/delivery-queue-storage.ts
@@ -38,7 +38,7 @@ export interface QueuedDelivery extends QueuedDeliveryPayload {
   lastError?: string;
 }
 
-function resolveQueueDir(stateDir?: string): string {
+export function resolveQueueDir(stateDir?: string): string {
   const base = stateDir ?? resolveStateDir();
   return path.join(base, QUEUE_DIRNAME);
 }
diff --git a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts
new file mode 100644
index 00000000000..d653d2efc77
--- /dev/null
+++ b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts
@@ -0,0 +1,286 @@
+import fs from "node:fs";
+import os from "node:os";
+import path from "node:path";
+import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
+import type { OpenClawConfig } from "../../config/config.js";
+import {
+  type DeliverFn,
+  drainReconnectQueue,
+  enqueueDelivery,
+  failDelivery,
+  MAX_RETRIES,
+  type RecoveryLogger,
+  recoverPendingDeliveries,
+} from "./delivery-queue.js";
+
+/** Logger whose methods are vitest spies, so tests can assert on emitted messages. */
+function createMockLogger(): RecoveryLogger {
+  return {
+    info: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+  };
+}
+
+const stubCfg = {} as OpenClawConfig;
+
+// Every test runs against its own state dir under one temp root, removed after the suite.
+describe("drainReconnectQueue", () => {
+  let fixtureRoot = "";
+  let tmpDir: string;
+  let fixtureCount = 0;
+
+  beforeAll(() => {
+    fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-drain-"));
+  });
+
+  beforeEach(() => {
+    tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`);
+    fs.mkdirSync(tmpDir, { recursive: true });
+  });
+
+  afterAll(() => {
+    if (!fixtureRoot) {
+      return;
+    }
+    fs.rmSync(fixtureRoot, { recursive: true, force: true });
+    fixtureRoot = "";
+  });
+
+  it("drains entries that failed with 'no listener' error", async () => {
+    const log = createMockLogger();
+    const deliver = vi.fn(async () => {});
+
+    const id = await enqueueDelivery(
+      { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
+      tmpDir,
+    );
+    await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
+
+    await drainReconnectQueue({
+      accountId: "acct1",
+      cfg: stubCfg,
+      log,
+      stateDir: tmpDir,
+      deliver,
+    });
+
+    expect(deliver).toHaveBeenCalledTimes(1);
+    expect(deliver).toHaveBeenCalledWith(
+      expect.objectContaining({ channel: "whatsapp", to: "+1555", skipQueue: true }),
+    );
+  });
+
+  it("skips entries from other accounts", async () => {
+    const log = createMockLogger();
+    const deliver = vi.fn(async () => {});
+
+    const id = await enqueueDelivery(
+      { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "other" },
+      tmpDir,
+    );
+    await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
+
+    await drainReconnectQueue({
+      accountId: "acct1",
+      cfg: stubCfg,
+      log,
+      stateDir: tmpDir,
+      deliver,
+    });
+
+    // deliver should not be called since no eligible entries for acct1
+    expect(deliver).not.toHaveBeenCalled();
+  });
+
+  it("retries immediately without resetting retry history", async () => {
+    const log = createMockLogger();
+    const deliver = vi.fn(async () => {
+      throw new Error("transient failure");
+    });
+
+    const id = await enqueueDelivery(
+      { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
+      tmpDir,
+    );
+    await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
+    const queueDir = path.join(tmpDir, "delivery-queue");
+    const filePath = path.join(queueDir, `${id}.json`);
+    const before = JSON.parse(fs.readFileSync(filePath, "utf-8")) as {
+      retryCount: number;
+      lastAttemptAt?: number;
+      lastError?: string;
+    };
+
+    await drainReconnectQueue({
+      accountId: "acct1",
+      cfg: stubCfg,
+      log,
+      stateDir: tmpDir,
+      deliver,
+    });
+
+    expect(deliver).toHaveBeenCalledTimes(1);
+
+    const after = JSON.parse(fs.readFileSync(filePath, "utf-8")) as {
+      retryCount: number;
+      lastAttemptAt?: number;
+      lastError?: string;
+    };
+    expect(after.retryCount).toBe(before.retryCount + 1);
+    expect(after.lastAttemptAt).toBeTypeOf("number");
+    expect(after.lastAttemptAt).toBeGreaterThanOrEqual(before.lastAttemptAt ?? 0);
+    expect(after.lastError).toBe("transient failure");
+  });
+
+  it("does not throw if delivery fails during drain", async () => {
+    const log = createMockLogger();
+    const deliver = vi.fn(async () => {
+      throw new Error("transient failure");
+    });
+
+    const id = await enqueueDelivery(
+      { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
+      tmpDir,
+    );
+    await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
+
+    // Should not throw
+    await expect(
+      drainReconnectQueue({
+        accountId: "acct1",
+        cfg: stubCfg,
+        log,
+        stateDir: tmpDir,
+        deliver,
+      }),
+    ).resolves.toBeUndefined();
+  });
+
+  it("skips entries where retryCount >= MAX_RETRIES", async () => {
+    const log = createMockLogger();
+    const deliver = vi.fn(async () => {});
+
+    const id = await enqueueDelivery(
+      { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
+      tmpDir,
+    );
+
+    // Bump retryCount to MAX_RETRIES
+    for (let i = 0; i < MAX_RETRIES; i++) {
+      await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
+    }
+
+    await drainReconnectQueue({
+      accountId: "acct1",
+      cfg: stubCfg,
+      log,
+      stateDir: tmpDir,
+      deliver,
+    });
+
+    // Should have moved to failed, not delivered
+    expect(deliver).not.toHaveBeenCalled();
+    const failedDir = path.join(tmpDir, "delivery-queue", "failed");
+    const failedFiles = fs.readdirSync(failedDir).filter((f) => f.endsWith(".json"));
+    expect(failedFiles).toHaveLength(1);
+  });
+
+  it("second concurrent call is skipped (concurrency guard)", async () => {
+    const log = createMockLogger();
+    // Deferred promise: keeps the first drain parked inside deliver until released.
+    let resolveDeliver: () => void = () => {};
+    const deliverPromise = new Promise<void>((resolve) => {
+      resolveDeliver = resolve;
+    });
+    const deliver = vi.fn(async () => {
+      await deliverPromise;
+    });
+
+    await enqueueDelivery(
+      { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
+      tmpDir,
+    );
+    // Fail it so it matches the "no listener" filter
+    const pending = fs
+      .readdirSync(path.join(tmpDir, "delivery-queue"))
+      .filter((f) => f.endsWith(".json"));
+    const entryPath = path.join(tmpDir, "delivery-queue", pending[0]);
+    const entry = JSON.parse(fs.readFileSync(entryPath, "utf-8"));
+    entry.lastError = "No active WhatsApp Web listener";
+    entry.retryCount = 1;
+    fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2));
+
+    const opts = { accountId: "acct1", cfg: stubCfg, log, stateDir: tmpDir, deliver };
+
+    // Start first drain (will block on deliver)
+    const first = drainReconnectQueue(opts);
+    // Start second drain immediately — should be skipped
+    const second = drainReconnectQueue(opts);
+    await second;
+
+    expect(log.info).toHaveBeenCalledWith(expect.stringContaining("already in progress"));
+
+    // Unblock first drain
+    resolveDeliver();
+    await first;
+  });
+
+  it("does not re-deliver an entry already being recovered at startup", async () => {
+    const log = createMockLogger();
+    const startupLog = createMockLogger();
+    // Deferred promise: keeps startup recovery parked inside deliver while the drain runs.
+    let resolveDeliver: () => void = () => {};
+    const deliverPromise = new Promise<void>((resolve) => {
+      resolveDeliver = resolve;
+    });
+    const deliver = vi.fn(async () => {
+      await deliverPromise;
+    });
+
+    const id = await enqueueDelivery(
+      { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
+      tmpDir,
+    );
+    const queuePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
+    const entry = JSON.parse(fs.readFileSync(queuePath, "utf-8")) as {
+      id: string;
+      enqueuedAt: number;
+      channel: string;
+      to: string;
+      accountId?: string;
+      payloads: Array<{ text: string }>;
+      retryCount: number;
+      lastError?: string;
+    };
+    entry.lastError = "No active WhatsApp Web listener";
+    fs.writeFileSync(queuePath, JSON.stringify(entry, null, 2));
+
+    const startupRecovery = recoverPendingDeliveries({
+      cfg: stubCfg,
+      deliver,
+      log: startupLog,
+      stateDir: tmpDir,
+    });
+
+    await vi.waitFor(() => {
+      expect(deliver).toHaveBeenCalledTimes(1);
+    });
+
+    await drainReconnectQueue({
+      accountId: "acct1",
+      cfg: stubCfg,
+      log,
+      stateDir: tmpDir,
+      deliver,
+    });
+
+    expect(deliver).toHaveBeenCalledTimes(1);
+    expect(log.info).toHaveBeenCalledWith(
+      expect.stringContaining(`entry ${id} is already being recovered`),
+    );
+
+    resolveDeliver();
+    await startupRecovery;
+  });
+});
diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts
index f4da3e991ed..3a4b4293316 100644
--- a/src/infra/outbound/delivery-queue.ts
+++ b/src/infra/outbound/delivery-queue.ts
@@ -9,6 +9,7 @@ export {
 export type { QueuedDelivery, QueuedDeliveryPayload } from "./delivery-queue-storage.js";
 export {
   computeBackoffMs,
+  drainReconnectQueue,
   isEntryEligibleForRecoveryRetry,
   isPermanentDeliveryError,
   MAX_RETRIES,
diff --git a/src/plugin-sdk/infra-runtime.ts b/src/plugin-sdk/infra-runtime.ts
index 950fa0af1e0..399ec70bc9d 100644
--- a/src/plugin-sdk/infra-runtime.ts
+++ b/src/plugin-sdk/infra-runtime.ts
@@ -32,6 +32,7 @@ export * from "../infra/net/proxy-env.js";
 export * from "../infra/net/proxy-fetch.js";
 export * from "../infra/net/undici-global-dispatcher.js";
 export * from "../infra/net/ssrf.js";
+export { drainReconnectQueue } from "../infra/outbound/delivery-queue.js";
 export * from "../infra/outbound/identity.js";
 export * from "../infra/outbound/sanitize-text.js";
 export * from "../infra/parse-finite-number.js";