diff --git a/CHANGELOG.md b/CHANGELOG.md index b57b0a0c49c..6c2fbd0df9b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai - Dreaming/gateway: require `operator.admin` for persistent `/dreaming on|off` changes and treat missing gateway client scopes as unprivileged instead of silently allowing config writes. (#63872) Thanks @mbelinky. - Matrix/multi-account: keep room-level `account` scoping, inherited room overrides, and implicit account selection consistent across top-level default auth, named accounts, and cached-credential env setups. (#58449) thanks @Daanvdplas and @gumadeiras. - Gateway/pairing: prefer explicit QR bootstrap auth over earlier Tailscale auth classification so iOS `/pair qr` silent bootstrap pairing does not fall through to `pairing required`. (#59232) Thanks @ngutman. +- WhatsApp/outbound queue: drain same-account pending WhatsApp deliveries when the listener reconnects, including fresh queued sends that are already retry-eligible, so reconnects recover deliverable outbound messages without waiting for another gateway restart. (#63916) Thanks @mcaxtr. - Config/Discord: coerce safe integer numeric Discord IDs to strings during config validation, keep unsafe or precision-losing numeric snowflakes rejected, and align `openclaw doctor` repair guidance with the same fail-closed behavior. (#45125) Thanks @moliendocode. - Gateway/sessions: scope bare `sessions.create` aliases like `main` to the requested agent while preserving the canonical `global` and `unknown` sentinel keys. (#58207) thanks @jalehman. - `/context detail` now compares the tracked prompt estimate with cached context usage and surfaces untracked provider/runtime overhead when present. (#28391) thanks @ImLukeF. diff --git a/extensions/whatsapp/src/auto-reply/monitor.ts b/extensions/whatsapp/src/auto-reply/monitor.ts index ffc1a84d564..1b3cb07d209 100644 --- a/extensions/whatsapp/src/auto-reply/monitor.ts +++ b/extensions/whatsapp/src/auto-reply/monitor.ts @@ -3,7 +3,7 @@ import { resolveInboundDebounceMs } from "openclaw/plugin-sdk/channel-inbound"; import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime"; import { waitForever } from "openclaw/plugin-sdk/cli-runtime"; import { hasControlCommand } from "openclaw/plugin-sdk/command-detection"; -import { drainReconnectQueue } from "openclaw/plugin-sdk/infra-runtime"; +import { drainPendingDeliveries } from "openclaw/plugin-sdk/infra-runtime"; import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime"; import { DEFAULT_GROUP_HISTORY_LIMIT } from "openclaw/plugin-sdk/reply-history"; import { resolveAgentRoute } from "openclaw/plugin-sdk/routing"; @@ -75,6 +75,14 @@ function loadReplyResolverRuntime() { return replyResolverRuntimePromise; } +function normalizeReconnectAccountId(accountId?: string | null): string { + return (accountId ?? "").trim() || "default"; +} + +function isNoListenerReconnectError(lastError?: string): boolean { + return typeof lastError === "string" && /No active WhatsApp Web listener/i.test(lastError); +} + export async function monitorWebChannel( verbose: boolean, listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox, @@ -261,11 +269,24 @@ export async function monitorWebChannel( setActiveWebListener(account.accountId, listener); - // Drain any messages that failed with "no listener" during the disconnect window. - void drainReconnectQueue({ - accountId: account.accountId, + const normalizedAccountId = normalizeReconnectAccountId(account.accountId); + + // Reconnect is the transport-ready signal for WhatsApp, so drain eligible + // pending deliveries for this account here instead of hardcoding that + // policy inside the generic queue engine. + void drainPendingDeliveries({ + drainKey: `whatsapp:${normalizedAccountId}`, + logLabel: "WhatsApp reconnect drain", cfg, log: reconnectLogger, + selectEntry: (entry) => ({ + match: + entry.channel === "whatsapp" && + normalizeReconnectAccountId(entry.accountId) === normalizedAccountId, + // Reconnect changed listener readiness, so these should not sit behind + // the normal backoff window. + bypassBackoff: isNoListenerReconnectError(entry.lastError), + }), }).catch((err) => { reconnectLogger.warn( { connectionId: active.connectionId, error: String(err) }, diff --git a/src/infra/outbound/delivery-queue-recovery.ts b/src/infra/outbound/delivery-queue-recovery.ts index bda76a1249d..08fccf4bd82 100644 --- a/src/infra/outbound/delivery-queue-recovery.ts +++ b/src/infra/outbound/delivery-queue-recovery.ts @@ -3,6 +3,7 @@ import { formatErrorMessage } from "../errors.js"; import { ackDelivery, failDelivery, + loadPendingDelivery, loadPendingDeliveries, moveToFailed, type QueuedDelivery, @@ -30,6 +31,11 @@ export interface RecoveryLogger { error(msg: string): void; } +export interface PendingDeliveryDrainDecision { + match: boolean; + bypassBackoff?: boolean; +} + const MAX_RETRIES = 5; /** Backoff delays in milliseconds indexed by retry count (1-based). */ @@ -54,8 +60,6 @@ const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [ /User .* not in room/i, ]; -const NO_LISTENER_ERROR_RE = /No active WhatsApp Web listener/i; - const drainInProgress = new Map(); const entriesInProgress = new Set(); @@ -68,10 +72,6 @@ function loadDeliverRuntime() { return deliverRuntimePromise; } -function normalizeQueueAccountId(accountId?: string): string { - return (accountId ?? "").trim() || "default"; -} - function getErrnoCode(err: unknown): string | null { return err && typeof err === "object" && "code" in err ? String((err as { code?: unknown }).code) @@ -179,82 +179,151 @@ export function isPermanentDeliveryError(error: string): boolean { return PERMANENT_ERROR_PATTERNS.some((re) => re.test(error)); } -export async function drainReconnectQueue(opts: { - accountId: string; +async function drainQueuedEntry(opts: { + entry: QueuedDelivery; + cfg: OpenClawConfig; + deliver: DeliverFn; + stateDir?: string; + onRecovered?: (entry: QueuedDelivery) => void; + onFailed?: (entry: QueuedDelivery, errMsg: string) => void; +}): Promise<"recovered" | "failed" | "moved-to-failed" | "already-gone"> { + const { entry } = opts; + try { + await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg)); + await ackDelivery(entry.id, opts.stateDir); + opts.onRecovered?.(entry); + return "recovered"; + } catch (err) { + const errMsg = formatErrorMessage(err); + opts.onFailed?.(entry, errMsg); + if (isPermanentDeliveryError(errMsg)) { + try { + await moveToFailed(entry.id, opts.stateDir); + return "moved-to-failed"; + } catch (moveErr) { + if (getErrnoCode(moveErr) === "ENOENT") { + return "already-gone"; + } + } + } else { + try { + await failDelivery(entry.id, errMsg, opts.stateDir); + return "failed"; + } catch (failErr) { + if (getErrnoCode(failErr) === "ENOENT") { + return "already-gone"; + } + } + } + return "failed"; + } +} + +export async function drainPendingDeliveries(opts: { + drainKey: string; + logLabel: string; cfg: OpenClawConfig; log: RecoveryLogger; stateDir?: string; deliver?: DeliverFn; + selectEntry: (entry: QueuedDelivery, now: number) => PendingDeliveryDrainDecision; }): Promise { - if (drainInProgress.get(opts.accountId)) { - opts.log.info( - `WhatsApp reconnect drain: already in progress for account ${opts.accountId}, skipping`, - ); + if (drainInProgress.get(opts.drainKey)) { + opts.log.info(`${opts.logLabel}: already in progress for ${opts.drainKey}, skipping`); return; } - drainInProgress.set(opts.accountId, true); + drainInProgress.set(opts.drainKey, true); try { + const now = Date.now(); + const deliver = opts.deliver ?? (await loadDeliverRuntime()).deliverOutboundPayloads; const matchingEntries = (await loadPendingDeliveries(opts.stateDir)) + .map((entry) => ({ + entry, + decision: opts.selectEntry(entry, now), + })) .filter( - (entry) => - entry.channel === "whatsapp" && - normalizeQueueAccountId(entry.accountId) === opts.accountId && - typeof entry.lastError === "string" && - NO_LISTENER_ERROR_RE.test(entry.lastError), + (item): item is { entry: QueuedDelivery; decision: PendingDeliveryDrainDecision } => + item.decision.match, ) - .toSorted((a, b) => a.enqueuedAt - b.enqueuedAt); + .toSorted((a, b) => a.entry.enqueuedAt - b.entry.enqueuedAt); if (matchingEntries.length === 0) { return; } opts.log.info( - `WhatsApp reconnect drain: ${matchingEntries.length} pending message(s) for account ${opts.accountId}`, + `${opts.logLabel}: ${matchingEntries.length} pending message(s) matched ${opts.drainKey}`, ); - const deliver = opts.deliver ?? (await loadDeliverRuntime()).deliverOutboundPayloads; - - for (const entry of matchingEntries) { + for (const { entry, decision } of matchingEntries) { if (!claimRecoveryEntry(entry.id)) { - opts.log.info(`WhatsApp reconnect drain: entry ${entry.id} is already being recovered`); - continue; - } - - if (entry.retryCount >= MAX_RETRIES) { - try { - await moveToFailed(entry.id, opts.stateDir); - } catch (err) { - if (getErrnoCode(err) === "ENOENT") { - opts.log.info(`reconnect drain: entry ${entry.id} already gone, skipping`); - continue; - } - throw err; - } finally { - releaseRecoveryEntry(entry.id); - } - opts.log.warn( - `WhatsApp reconnect drain: entry ${entry.id} exceeded max retries and was moved to failed/`, - ); + opts.log.info(`${opts.logLabel}: entry ${entry.id} is already being recovered`); continue; } try { - await deliver(buildRecoveryDeliverParams(entry, opts.cfg)); - await ackDelivery(entry.id, opts.stateDir); - } catch (err) { - const errMsg = formatErrorMessage(err); - if (isPermanentDeliveryError(errMsg)) { - await moveToFailed(entry.id, opts.stateDir).catch(() => {}); - } else { - await failDelivery(entry.id, errMsg, opts.stateDir).catch(() => {}); + // Re-read after claim so the queue file remains the source of truth. + // This prevents stale startup/reconnect snapshots from re-sending an + // entry that another recovery path already acked. + const currentEntry = await loadPendingDelivery(entry.id, opts.stateDir); + if (!currentEntry) { + opts.log.info(`${opts.logLabel}: entry ${entry.id} already gone, skipping`); + continue; + } + + if (currentEntry.retryCount >= MAX_RETRIES) { + try { + await moveToFailed(currentEntry.id, opts.stateDir); + } catch (err) { + if (getErrnoCode(err) === "ENOENT") { + opts.log.info(`${opts.logLabel}: entry ${currentEntry.id} already gone, skipping`); + continue; + } + throw err; + } + opts.log.warn( + `${opts.logLabel}: entry ${currentEntry.id} exceeded max retries and was moved to failed/`, + ); + continue; + } + + if (!decision.bypassBackoff) { + const retryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now()); + if (!retryEligibility.eligible) { + opts.log.info( + `${opts.logLabel}: entry ${currentEntry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`, + ); + continue; + } + } + + const result = await drainQueuedEntry({ + entry: currentEntry, + cfg: opts.cfg, + deliver, + stateDir: opts.stateDir, + onFailed: (failedEntry, errMsg) => { + if (isPermanentDeliveryError(errMsg)) { + opts.log.warn( + `${opts.logLabel}: entry ${failedEntry.id} hit permanent error — moving to failed/: ${errMsg}`, + ); + return; + } + opts.log.warn(`${opts.logLabel}: retry failed for entry ${failedEntry.id}: ${errMsg}`); + }, + }); + if (result === "recovered") { + opts.log.info( + `${opts.logLabel}: drained delivery ${currentEntry.id} on ${currentEntry.channel}`, + ); } } finally { releaseRecoveryEntry(entry.id); } } } finally { - drainInProgress.delete(opts.accountId); + drainInProgress.delete(opts.drainKey); } } @@ -289,31 +358,6 @@ export async function recoverPendingDeliveries(opts: { await deferRemainingEntriesForBudget(pending.slice(i), opts.stateDir); break; } - if (entry.retryCount >= MAX_RETRIES) { - if (!claimRecoveryEntry(entry.id)) { - opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`); - continue; - } - try { - opts.log.warn( - `Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`, - ); - await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir); - summary.skippedMaxRetries += 1; - } finally { - releaseRecoveryEntry(entry.id); - } - continue; - } - - const retryEligibility = isEntryEligibleForRecoveryRetry(entry, now); - if (!retryEligibility.eligible) { - summary.deferredBackoff += 1; - opts.log.info( - `Delivery ${entry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`, - ); - continue; - } if (!claimRecoveryEntry(entry.id)) { opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`); @@ -321,25 +365,53 @@ export async function recoverPendingDeliveries(opts: { } try { - await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg)); - await ackDelivery(entry.id, opts.stateDir); - summary.recovered += 1; - opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`); - } catch (err) { - const errMsg = formatErrorMessage(err); - if (isPermanentDeliveryError(errMsg)) { - opts.log.warn(`Delivery ${entry.id} hit permanent error — moving to failed/: ${errMsg}`); - await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir); - summary.failed += 1; + const currentEntry = await loadPendingDelivery(entry.id, opts.stateDir); + if (!currentEntry) { + opts.log.info(`Recovery skipped for delivery ${entry.id}: already gone`); continue; } - try { - await failDelivery(entry.id, errMsg, opts.stateDir); - } catch { - // Best-effort update. + + if (currentEntry.retryCount >= MAX_RETRIES) { + opts.log.warn( + `Delivery ${currentEntry.id} exceeded max retries (${currentEntry.retryCount}/${MAX_RETRIES}) — moving to failed/`, + ); + await moveEntryToFailedWithLogging(currentEntry.id, opts.log, opts.stateDir); + summary.skippedMaxRetries += 1; + continue; + } + + const currentRetryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now()); + if (!currentRetryEligibility.eligible) { + summary.deferredBackoff += 1; + opts.log.info( + `Delivery ${currentEntry.id} not ready for retry yet — backoff ${currentRetryEligibility.remainingBackoffMs}ms remaining`, + ); + continue; + } + + const result = await drainQueuedEntry({ + entry: currentEntry, + cfg: opts.cfg, + deliver: opts.deliver, + stateDir: opts.stateDir, + onRecovered: (recoveredEntry) => { + summary.recovered += 1; + opts.log.info(`Recovered delivery ${recoveredEntry.id} on ${recoveredEntry.channel}`); + }, + onFailed: (failedEntry, errMsg) => { + summary.failed += 1; + if (isPermanentDeliveryError(errMsg)) { + opts.log.warn( + `Delivery ${failedEntry.id} hit permanent error — moving to failed/: ${errMsg}`, + ); + return; + } + opts.log.warn(`Retry failed for delivery ${failedEntry.id}: ${errMsg}`); + }, + }); + if (result === "moved-to-failed") { + continue; } - summary.failed += 1; - opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`); } finally { releaseRecoveryEntry(entry.id); } diff --git a/src/infra/outbound/delivery-queue-storage.ts b/src/infra/outbound/delivery-queue-storage.ts index 6b41484e6dc..59a5ed0b890 100644 --- a/src/infra/outbound/delivery-queue-storage.ts +++ b/src/infra/outbound/delivery-queue-storage.ts @@ -188,6 +188,30 @@ export async function failDelivery(id: string, error: string, stateDir?: string) await writeQueueEntry(filePath, entry); } +/** Load a single pending delivery entry by ID from the queue directory. */ +export async function loadPendingDelivery( + id: string, + stateDir?: string, +): Promise { + const { jsonPath } = resolveQueueEntryPaths(id, stateDir); + try { + const stat = await fs.promises.stat(jsonPath); + if (!stat.isFile()) { + return null; + } + const { entry, migrated } = normalizeLegacyQueuedDeliveryEntry(await readQueueEntry(jsonPath)); + if (migrated) { + await writeQueueEntry(jsonPath, entry); + } + return entry; + } catch (err) { + if (getErrnoCode(err) === "ENOENT") { + return null; + } + throw err; + } +} + /** Load all pending delivery entries from the queue directory. */ export async function loadPendingDeliveries(stateDir?: string): Promise { const queueDir = resolveQueueDir(stateDir); diff --git a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts index d653d2efc77..2387eecf15c 100644 --- a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts +++ b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts @@ -5,7 +5,7 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vites import type { OpenClawConfig } from "../../config/config.js"; import { type DeliverFn, - drainReconnectQueue, + drainPendingDeliveries, enqueueDelivery, failDelivery, MAX_RETRIES, @@ -22,8 +22,37 @@ function createMockLogger(): RecoveryLogger { } const stubCfg = {} as OpenClawConfig; +const NO_LISTENER_ERROR = "No active WhatsApp Web listener"; -describe("drainReconnectQueue", () => { +function normalizeReconnectAccountIdForTest(accountId?: string | null): string { + return (accountId ?? "").trim() || "default"; +} + +async function drainWhatsAppReconnectPending(opts: { + accountId: string; + deliver: DeliverFn; + log: RecoveryLogger; + stateDir: string; +}) { + const normalizedAccountId = normalizeReconnectAccountIdForTest(opts.accountId); + await drainPendingDeliveries({ + drainKey: `whatsapp:${normalizedAccountId}`, + logLabel: "WhatsApp reconnect drain", + cfg: stubCfg, + log: opts.log, + stateDir: opts.stateDir, + deliver: opts.deliver, + selectEntry: (entry) => ({ + match: + entry.channel === "whatsapp" && + normalizeReconnectAccountIdForTest(entry.accountId) === normalizedAccountId, + bypassBackoff: + typeof entry.lastError === "string" && entry.lastError.includes(NO_LISTENER_ERROR), + }), + }); +} + +describe("drainPendingDeliveries for WhatsApp reconnect", () => { let fixtureRoot = ""; let tmpDir: string; let fixtureCount = 0; @@ -55,12 +84,11 @@ describe("drainReconnectQueue", () => { ); await failDelivery(id, "No active WhatsApp Web listener", tmpDir); - await drainReconnectQueue({ + await drainWhatsAppReconnectPending({ accountId: "acct1", - cfg: stubCfg, + deliver, log, stateDir: tmpDir, - deliver, }); expect(deliver).toHaveBeenCalledTimes(1); @@ -79,12 +107,11 @@ describe("drainReconnectQueue", () => { ); await failDelivery(id, "No active WhatsApp Web listener", tmpDir); - await drainReconnectQueue({ + await drainWhatsAppReconnectPending({ accountId: "acct1", - cfg: stubCfg, + deliver, log, stateDir: tmpDir, - deliver, }); // deliver should not be called since no eligible entries for acct1 @@ -110,12 +137,11 @@ describe("drainReconnectQueue", () => { lastError?: string; }; - await drainReconnectQueue({ + await drainWhatsAppReconnectPending({ accountId: "acct1", - cfg: stubCfg, + deliver, log, stateDir: tmpDir, - deliver, }); expect(deliver).toHaveBeenCalledTimes(1); @@ -145,12 +171,11 @@ describe("drainReconnectQueue", () => { // Should not throw await expect( - drainReconnectQueue({ + drainWhatsAppReconnectPending({ accountId: "acct1", - cfg: stubCfg, + deliver, log, stateDir: tmpDir, - deliver, }), ).resolves.toBeUndefined(); }); @@ -169,12 +194,11 @@ describe("drainReconnectQueue", () => { await failDelivery(id, "No active WhatsApp Web listener", tmpDir); } - await drainReconnectQueue({ + await drainWhatsAppReconnectPending({ accountId: "acct1", - cfg: stubCfg, + deliver, log, stateDir: tmpDir, - deliver, }); // Should have moved to failed, not delivered @@ -208,12 +232,12 @@ describe("drainReconnectQueue", () => { entry.retryCount = 1; fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2)); - const opts = { accountId: "acct1", cfg: stubCfg, log, stateDir: tmpDir, deliver }; + const opts = { accountId: "acct1", log, stateDir: tmpDir, deliver }; // Start first drain (will block on deliver) - const first = drainReconnectQueue(opts); + const first = drainWhatsAppReconnectPending(opts); // Start second drain immediately — should be skipped - const second = drainReconnectQueue(opts); + const second = drainWhatsAppReconnectPending(opts); await second; expect(log.info).toHaveBeenCalledWith(expect.stringContaining("already in progress")); @@ -263,12 +287,11 @@ describe("drainReconnectQueue", () => { expect(deliver).toHaveBeenCalledTimes(1); }); - await drainReconnectQueue({ + await drainWhatsAppReconnectPending({ accountId: "acct1", - cfg: stubCfg, + deliver, log, stateDir: tmpDir, - deliver, }); expect(deliver).toHaveBeenCalledTimes(1); @@ -279,4 +302,165 @@ describe("drainReconnectQueue", () => { resolveDeliver!(); await startupRecovery; }); + + it("does not re-deliver a stale startup snapshot after reconnect already acked it", async () => { + const log = createMockLogger(); + const startupLog = createMockLogger(); + let releaseBlocker: () => void; + const blocker = new Promise((resolve) => { + releaseBlocker = resolve; + }); + const deliveredTargets: string[] = []; + const deliver = vi.fn(async ({ to }) => { + deliveredTargets.push(to); + if (to === "+1000") { + await blocker; + } + }); + + await enqueueDelivery( + { channel: "demo-channel-a", to: "+1000", payloads: [{ text: "blocker" }] }, + tmpDir, + ); + await enqueueDelivery( + { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, + tmpDir, + ); + + const startupRecovery = recoverPendingDeliveries({ + cfg: stubCfg, + deliver, + log: startupLog, + stateDir: tmpDir, + }); + + await vi.waitFor(() => { + expect(deliver).toHaveBeenCalledWith( + expect.objectContaining({ channel: "demo-channel-a", to: "+1000" }), + ); + }); + + await drainWhatsAppReconnectPending({ + accountId: "acct1", + deliver, + log, + stateDir: tmpDir, + }); + + releaseBlocker!(); + await startupRecovery; + + expect(deliver).toHaveBeenCalledTimes(2); + expect(deliveredTargets.filter((target) => target === "+1555")).toHaveLength(1); + expect(startupLog.info).toHaveBeenCalledWith( + expect.stringContaining("Recovery skipped for delivery"), + ); + }); + it("drains fresh pending WhatsApp entries for the reconnecting account", async () => { + const log = createMockLogger(); + const deliver = vi.fn(async () => {}); + + await enqueueDelivery( + { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, + tmpDir, + ); + + await drainWhatsAppReconnectPending({ + accountId: "acct1", + deliver, + log, + stateDir: tmpDir, + }); + + expect(deliver).toHaveBeenCalledTimes(1); + expect( + fs.readdirSync(path.join(tmpDir, "delivery-queue")).filter((f) => f.endsWith(".json")), + ).toEqual([]); + }); + + it("drains backoff-eligible WhatsApp retries on reconnect", async () => { + const log = createMockLogger(); + const deliver = vi.fn(async () => {}); + + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, + tmpDir, + ); + await failDelivery(id, "network down", tmpDir); + const entryPath = path.join(tmpDir, "delivery-queue", `${id}.json`); + const entry = JSON.parse(fs.readFileSync(entryPath, "utf-8")) as { + lastAttemptAt?: number; + }; + entry.lastAttemptAt = Date.now() - 30_000; + fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2)); + + await drainWhatsAppReconnectPending({ + accountId: "acct1", + deliver, + log, + stateDir: tmpDir, + }); + + expect(deliver).toHaveBeenCalledTimes(1); + }); + + it("does not bypass backoff for ordinary transient errors on reconnect", async () => { + const log = createMockLogger(); + const deliver = vi.fn(async () => {}); + + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, + tmpDir, + ); + await failDelivery(id, "network down", tmpDir); + + await drainWhatsAppReconnectPending({ + accountId: "acct1", + deliver, + log, + stateDir: tmpDir, + }); + + expect(deliver).not.toHaveBeenCalled(); + expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet")); + }); + + it("still bypasses backoff for no-listener failures on reconnect", async () => { + const log = createMockLogger(); + const deliver = vi.fn(async () => {}); + + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, + tmpDir, + ); + await failDelivery(id, NO_LISTENER_ERROR, tmpDir); + + await drainWhatsAppReconnectPending({ + accountId: "acct1", + deliver, + log, + stateDir: tmpDir, + }); + + expect(deliver).toHaveBeenCalledTimes(1); + }); + + it("ignores non-WhatsApp entries even when reconnect drain runs", async () => { + const log = createMockLogger(); + const deliver = vi.fn(async () => {}); + + await enqueueDelivery( + { channel: "telegram", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, + tmpDir, + ); + + await drainWhatsAppReconnectPending({ + accountId: "acct1", + deliver, + log, + stateDir: tmpDir, + }); + + expect(deliver).not.toHaveBeenCalled(); + }); }); diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts index 3a4b4293316..78cfd1bff87 100644 --- a/src/infra/outbound/delivery-queue.ts +++ b/src/infra/outbound/delivery-queue.ts @@ -3,16 +3,22 @@ export { enqueueDelivery, ensureQueueDir, failDelivery, + loadPendingDelivery, loadPendingDeliveries, moveToFailed, } from "./delivery-queue-storage.js"; export type { QueuedDelivery, QueuedDeliveryPayload } from "./delivery-queue-storage.js"; export { computeBackoffMs, - drainReconnectQueue, + drainPendingDeliveries, isEntryEligibleForRecoveryRetry, isPermanentDeliveryError, MAX_RETRIES, recoverPendingDeliveries, } from "./delivery-queue-recovery.js"; -export type { DeliverFn, RecoveryLogger, RecoverySummary } from "./delivery-queue-recovery.js"; +export type { + DeliverFn, + PendingDeliveryDrainDecision, + RecoveryLogger, + RecoverySummary, +} from "./delivery-queue-recovery.js"; diff --git a/src/plugin-sdk/infra-runtime.ts b/src/plugin-sdk/infra-runtime.ts index 399ec70bc9d..e03a10f342c 100644 --- a/src/plugin-sdk/infra-runtime.ts +++ b/src/plugin-sdk/infra-runtime.ts @@ -1,5 +1,49 @@ +import type { OpenClawConfig } from "../config/config.js"; +import { + drainPendingDeliveries, + type DeliverFn, + type RecoveryLogger, +} from "../infra/outbound/delivery-queue.js"; + // Public runtime/transport helpers for plugins that need shared infra behavior. +function normalizeWhatsAppReconnectAccountId(accountId?: string): string { + return (accountId ?? "").trim() || "default"; +} + +const WHATSAPP_NO_LISTENER_ERROR_RE = /No active WhatsApp Web listener/i; + +/** + * @deprecated Prefer plugin-owned reconnect policy wired through + * `drainPendingDeliveries(...)`. This compatibility shim preserves the + * historical public SDK symbol for existing plugin callers. + */ +export async function drainReconnectQueue(opts: { + accountId: string; + cfg: OpenClawConfig; + log: RecoveryLogger; + stateDir?: string; + deliver?: DeliverFn; +}): Promise { + const normalizedAccountId = normalizeWhatsAppReconnectAccountId(opts.accountId); + await drainPendingDeliveries({ + drainKey: `whatsapp:${normalizedAccountId}`, + logLabel: "WhatsApp reconnect drain", + cfg: opts.cfg, + log: opts.log, + stateDir: opts.stateDir, + deliver: opts.deliver, + selectEntry: (entry) => ({ + match: + entry.channel === "whatsapp" && + normalizeWhatsAppReconnectAccountId(entry.accountId) === normalizedAccountId && + typeof entry.lastError === "string" && + WHATSAPP_NO_LISTENER_ERROR_RE.test(entry.lastError), + bypassBackoff: true, + }), + }); +} + export * from "../infra/backoff.js"; export * from "../infra/channel-activity.js"; export * from "../infra/dedupe.js"; @@ -32,7 +76,7 @@ export * from "../infra/net/proxy-env.js"; export * from "../infra/net/proxy-fetch.js"; export * from "../infra/net/undici-global-dispatcher.js"; export * from "../infra/net/ssrf.js"; -export { drainReconnectQueue } from "../infra/outbound/delivery-queue.js"; +export { drainPendingDeliveries }; export * from "../infra/outbound/identity.js"; export * from "../infra/outbound/sanitize-text.js"; export * from "../infra/parse-finite-number.js";