mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-15 19:21:08 +00:00
fix(whatsapp): drain eligible pending deliveries on reconnect (#63916)
* fix(whatsapp): drain eligible pending deliveries on reconnect * docs(changelog): note whatsapp reconnect pending drain
This commit is contained in:
@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Dreaming/gateway: require `operator.admin` for persistent `/dreaming on|off` changes and treat missing gateway client scopes as unprivileged instead of silently allowing config writes. (#63872) Thanks @mbelinky.
|
||||
- Matrix/multi-account: keep room-level `account` scoping, inherited room overrides, and implicit account selection consistent across top-level default auth, named accounts, and cached-credential env setups. (#58449) thanks @Daanvdplas and @gumadeiras.
|
||||
- Gateway/pairing: prefer explicit QR bootstrap auth over earlier Tailscale auth classification so iOS `/pair qr` silent bootstrap pairing does not fall through to `pairing required`. (#59232) Thanks @ngutman.
|
||||
- WhatsApp/outbound queue: drain same-account pending WhatsApp deliveries when the listener reconnects, including fresh queued sends that are already retry-eligible, so reconnects recover deliverable outbound messages without waiting for another gateway restart. (#63916) Thanks @mcaxtr.
|
||||
- Config/Discord: coerce safe integer numeric Discord IDs to strings during config validation, keep unsafe or precision-losing numeric snowflakes rejected, and align `openclaw doctor` repair guidance with the same fail-closed behavior. (#45125) Thanks @moliendocode.
|
||||
- Gateway/sessions: scope bare `sessions.create` aliases like `main` to the requested agent while preserving the canonical `global` and `unknown` sentinel keys. (#58207) thanks @jalehman.
|
||||
- `/context detail` now compares the tracked prompt estimate with cached context usage and surfaces untracked provider/runtime overhead when present. (#28391) thanks @ImLukeF.
|
||||
|
||||
@@ -3,7 +3,7 @@ import { resolveInboundDebounceMs } from "openclaw/plugin-sdk/channel-inbound";
|
||||
import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime";
|
||||
import { waitForever } from "openclaw/plugin-sdk/cli-runtime";
|
||||
import { hasControlCommand } from "openclaw/plugin-sdk/command-detection";
|
||||
import { drainReconnectQueue } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { drainPendingDeliveries } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { DEFAULT_GROUP_HISTORY_LIMIT } from "openclaw/plugin-sdk/reply-history";
|
||||
import { resolveAgentRoute } from "openclaw/plugin-sdk/routing";
|
||||
@@ -75,6 +75,14 @@ function loadReplyResolverRuntime() {
|
||||
return replyResolverRuntimePromise;
|
||||
}
|
||||
|
||||
function normalizeReconnectAccountId(accountId?: string | null): string {
|
||||
return (accountId ?? "").trim() || "default";
|
||||
}
|
||||
|
||||
function isNoListenerReconnectError(lastError?: string): boolean {
|
||||
return typeof lastError === "string" && /No active WhatsApp Web listener/i.test(lastError);
|
||||
}
|
||||
|
||||
export async function monitorWebChannel(
|
||||
verbose: boolean,
|
||||
listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
|
||||
@@ -261,11 +269,24 @@ export async function monitorWebChannel(
|
||||
|
||||
setActiveWebListener(account.accountId, listener);
|
||||
|
||||
// Drain any messages that failed with "no listener" during the disconnect window.
|
||||
void drainReconnectQueue({
|
||||
accountId: account.accountId,
|
||||
const normalizedAccountId = normalizeReconnectAccountId(account.accountId);
|
||||
|
||||
// Reconnect is the transport-ready signal for WhatsApp, so drain eligible
|
||||
// pending deliveries for this account here instead of hardcoding that
|
||||
// policy inside the generic queue engine.
|
||||
void drainPendingDeliveries({
|
||||
drainKey: `whatsapp:${normalizedAccountId}`,
|
||||
logLabel: "WhatsApp reconnect drain",
|
||||
cfg,
|
||||
log: reconnectLogger,
|
||||
selectEntry: (entry) => ({
|
||||
match:
|
||||
entry.channel === "whatsapp" &&
|
||||
normalizeReconnectAccountId(entry.accountId) === normalizedAccountId,
|
||||
// Reconnect changed listener readiness, so these should not sit behind
|
||||
// the normal backoff window.
|
||||
bypassBackoff: isNoListenerReconnectError(entry.lastError),
|
||||
}),
|
||||
}).catch((err) => {
|
||||
reconnectLogger.warn(
|
||||
{ connectionId: active.connectionId, error: String(err) },
|
||||
|
||||
@@ -3,6 +3,7 @@ import { formatErrorMessage } from "../errors.js";
|
||||
import {
|
||||
ackDelivery,
|
||||
failDelivery,
|
||||
loadPendingDelivery,
|
||||
loadPendingDeliveries,
|
||||
moveToFailed,
|
||||
type QueuedDelivery,
|
||||
@@ -30,6 +31,11 @@ export interface RecoveryLogger {
|
||||
error(msg: string): void;
|
||||
}
|
||||
|
||||
export interface PendingDeliveryDrainDecision {
|
||||
match: boolean;
|
||||
bypassBackoff?: boolean;
|
||||
}
|
||||
|
||||
const MAX_RETRIES = 5;
|
||||
|
||||
/** Backoff delays in milliseconds indexed by retry count (1-based). */
|
||||
@@ -54,8 +60,6 @@ const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
|
||||
/User .* not in room/i,
|
||||
];
|
||||
|
||||
const NO_LISTENER_ERROR_RE = /No active WhatsApp Web listener/i;
|
||||
|
||||
const drainInProgress = new Map<string, boolean>();
|
||||
const entriesInProgress = new Set<string>();
|
||||
|
||||
@@ -68,10 +72,6 @@ function loadDeliverRuntime() {
|
||||
return deliverRuntimePromise;
|
||||
}
|
||||
|
||||
function normalizeQueueAccountId(accountId?: string): string {
|
||||
return (accountId ?? "").trim() || "default";
|
||||
}
|
||||
|
||||
function getErrnoCode(err: unknown): string | null {
|
||||
return err && typeof err === "object" && "code" in err
|
||||
? String((err as { code?: unknown }).code)
|
||||
@@ -179,82 +179,151 @@ export function isPermanentDeliveryError(error: string): boolean {
|
||||
return PERMANENT_ERROR_PATTERNS.some((re) => re.test(error));
|
||||
}
|
||||
|
||||
export async function drainReconnectQueue(opts: {
|
||||
accountId: string;
|
||||
async function drainQueuedEntry(opts: {
|
||||
entry: QueuedDelivery;
|
||||
cfg: OpenClawConfig;
|
||||
deliver: DeliverFn;
|
||||
stateDir?: string;
|
||||
onRecovered?: (entry: QueuedDelivery) => void;
|
||||
onFailed?: (entry: QueuedDelivery, errMsg: string) => void;
|
||||
}): Promise<"recovered" | "failed" | "moved-to-failed" | "already-gone"> {
|
||||
const { entry } = opts;
|
||||
try {
|
||||
await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg));
|
||||
await ackDelivery(entry.id, opts.stateDir);
|
||||
opts.onRecovered?.(entry);
|
||||
return "recovered";
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
opts.onFailed?.(entry, errMsg);
|
||||
if (isPermanentDeliveryError(errMsg)) {
|
||||
try {
|
||||
await moveToFailed(entry.id, opts.stateDir);
|
||||
return "moved-to-failed";
|
||||
} catch (moveErr) {
|
||||
if (getErrnoCode(moveErr) === "ENOENT") {
|
||||
return "already-gone";
|
||||
}
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
await failDelivery(entry.id, errMsg, opts.stateDir);
|
||||
return "failed";
|
||||
} catch (failErr) {
|
||||
if (getErrnoCode(failErr) === "ENOENT") {
|
||||
return "already-gone";
|
||||
}
|
||||
}
|
||||
}
|
||||
return "failed";
|
||||
}
|
||||
}
|
||||
|
||||
export async function drainPendingDeliveries(opts: {
|
||||
drainKey: string;
|
||||
logLabel: string;
|
||||
cfg: OpenClawConfig;
|
||||
log: RecoveryLogger;
|
||||
stateDir?: string;
|
||||
deliver?: DeliverFn;
|
||||
selectEntry: (entry: QueuedDelivery, now: number) => PendingDeliveryDrainDecision;
|
||||
}): Promise<void> {
|
||||
if (drainInProgress.get(opts.accountId)) {
|
||||
opts.log.info(
|
||||
`WhatsApp reconnect drain: already in progress for account ${opts.accountId}, skipping`,
|
||||
);
|
||||
if (drainInProgress.get(opts.drainKey)) {
|
||||
opts.log.info(`${opts.logLabel}: already in progress for ${opts.drainKey}, skipping`);
|
||||
return;
|
||||
}
|
||||
|
||||
drainInProgress.set(opts.accountId, true);
|
||||
drainInProgress.set(opts.drainKey, true);
|
||||
try {
|
||||
const now = Date.now();
|
||||
const deliver = opts.deliver ?? (await loadDeliverRuntime()).deliverOutboundPayloads;
|
||||
const matchingEntries = (await loadPendingDeliveries(opts.stateDir))
|
||||
.map((entry) => ({
|
||||
entry,
|
||||
decision: opts.selectEntry(entry, now),
|
||||
}))
|
||||
.filter(
|
||||
(entry) =>
|
||||
entry.channel === "whatsapp" &&
|
||||
normalizeQueueAccountId(entry.accountId) === opts.accountId &&
|
||||
typeof entry.lastError === "string" &&
|
||||
NO_LISTENER_ERROR_RE.test(entry.lastError),
|
||||
(item): item is { entry: QueuedDelivery; decision: PendingDeliveryDrainDecision } =>
|
||||
item.decision.match,
|
||||
)
|
||||
.toSorted((a, b) => a.enqueuedAt - b.enqueuedAt);
|
||||
.toSorted((a, b) => a.entry.enqueuedAt - b.entry.enqueuedAt);
|
||||
|
||||
if (matchingEntries.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
opts.log.info(
|
||||
`WhatsApp reconnect drain: ${matchingEntries.length} pending message(s) for account ${opts.accountId}`,
|
||||
`${opts.logLabel}: ${matchingEntries.length} pending message(s) matched ${opts.drainKey}`,
|
||||
);
|
||||
|
||||
const deliver = opts.deliver ?? (await loadDeliverRuntime()).deliverOutboundPayloads;
|
||||
|
||||
for (const entry of matchingEntries) {
|
||||
for (const { entry, decision } of matchingEntries) {
|
||||
if (!claimRecoveryEntry(entry.id)) {
|
||||
opts.log.info(`WhatsApp reconnect drain: entry ${entry.id} is already being recovered`);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entry.retryCount >= MAX_RETRIES) {
|
||||
try {
|
||||
await moveToFailed(entry.id, opts.stateDir);
|
||||
} catch (err) {
|
||||
if (getErrnoCode(err) === "ENOENT") {
|
||||
opts.log.info(`reconnect drain: entry ${entry.id} already gone, skipping`);
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
releaseRecoveryEntry(entry.id);
|
||||
}
|
||||
opts.log.warn(
|
||||
`WhatsApp reconnect drain: entry ${entry.id} exceeded max retries and was moved to failed/`,
|
||||
);
|
||||
opts.log.info(`${opts.logLabel}: entry ${entry.id} is already being recovered`);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
await deliver(buildRecoveryDeliverParams(entry, opts.cfg));
|
||||
await ackDelivery(entry.id, opts.stateDir);
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
if (isPermanentDeliveryError(errMsg)) {
|
||||
await moveToFailed(entry.id, opts.stateDir).catch(() => {});
|
||||
} else {
|
||||
await failDelivery(entry.id, errMsg, opts.stateDir).catch(() => {});
|
||||
// Re-read after claim so the queue file remains the source of truth.
|
||||
// This prevents stale startup/reconnect snapshots from re-sending an
|
||||
// entry that another recovery path already acked.
|
||||
const currentEntry = await loadPendingDelivery(entry.id, opts.stateDir);
|
||||
if (!currentEntry) {
|
||||
opts.log.info(`${opts.logLabel}: entry ${entry.id} already gone, skipping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (currentEntry.retryCount >= MAX_RETRIES) {
|
||||
try {
|
||||
await moveToFailed(currentEntry.id, opts.stateDir);
|
||||
} catch (err) {
|
||||
if (getErrnoCode(err) === "ENOENT") {
|
||||
opts.log.info(`${opts.logLabel}: entry ${currentEntry.id} already gone, skipping`);
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
opts.log.warn(
|
||||
`${opts.logLabel}: entry ${currentEntry.id} exceeded max retries and was moved to failed/`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!decision.bypassBackoff) {
|
||||
const retryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now());
|
||||
if (!retryEligibility.eligible) {
|
||||
opts.log.info(
|
||||
`${opts.logLabel}: entry ${currentEntry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
const result = await drainQueuedEntry({
|
||||
entry: currentEntry,
|
||||
cfg: opts.cfg,
|
||||
deliver,
|
||||
stateDir: opts.stateDir,
|
||||
onFailed: (failedEntry, errMsg) => {
|
||||
if (isPermanentDeliveryError(errMsg)) {
|
||||
opts.log.warn(
|
||||
`${opts.logLabel}: entry ${failedEntry.id} hit permanent error — moving to failed/: ${errMsg}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
opts.log.warn(`${opts.logLabel}: retry failed for entry ${failedEntry.id}: ${errMsg}`);
|
||||
},
|
||||
});
|
||||
if (result === "recovered") {
|
||||
opts.log.info(
|
||||
`${opts.logLabel}: drained delivery ${currentEntry.id} on ${currentEntry.channel}`,
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
releaseRecoveryEntry(entry.id);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
drainInProgress.delete(opts.accountId);
|
||||
drainInProgress.delete(opts.drainKey);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,31 +358,6 @@ export async function recoverPendingDeliveries(opts: {
|
||||
await deferRemainingEntriesForBudget(pending.slice(i), opts.stateDir);
|
||||
break;
|
||||
}
|
||||
if (entry.retryCount >= MAX_RETRIES) {
|
||||
if (!claimRecoveryEntry(entry.id)) {
|
||||
opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
opts.log.warn(
|
||||
`Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
|
||||
);
|
||||
await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
|
||||
summary.skippedMaxRetries += 1;
|
||||
} finally {
|
||||
releaseRecoveryEntry(entry.id);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
const retryEligibility = isEntryEligibleForRecoveryRetry(entry, now);
|
||||
if (!retryEligibility.eligible) {
|
||||
summary.deferredBackoff += 1;
|
||||
opts.log.info(
|
||||
`Delivery ${entry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!claimRecoveryEntry(entry.id)) {
|
||||
opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
|
||||
@@ -321,25 +365,53 @@ export async function recoverPendingDeliveries(opts: {
|
||||
}
|
||||
|
||||
try {
|
||||
await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg));
|
||||
await ackDelivery(entry.id, opts.stateDir);
|
||||
summary.recovered += 1;
|
||||
opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`);
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
if (isPermanentDeliveryError(errMsg)) {
|
||||
opts.log.warn(`Delivery ${entry.id} hit permanent error — moving to failed/: ${errMsg}`);
|
||||
await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
|
||||
summary.failed += 1;
|
||||
const currentEntry = await loadPendingDelivery(entry.id, opts.stateDir);
|
||||
if (!currentEntry) {
|
||||
opts.log.info(`Recovery skipped for delivery ${entry.id}: already gone`);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await failDelivery(entry.id, errMsg, opts.stateDir);
|
||||
} catch {
|
||||
// Best-effort update.
|
||||
|
||||
if (currentEntry.retryCount >= MAX_RETRIES) {
|
||||
opts.log.warn(
|
||||
`Delivery ${currentEntry.id} exceeded max retries (${currentEntry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
|
||||
);
|
||||
await moveEntryToFailedWithLogging(currentEntry.id, opts.log, opts.stateDir);
|
||||
summary.skippedMaxRetries += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
const currentRetryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now());
|
||||
if (!currentRetryEligibility.eligible) {
|
||||
summary.deferredBackoff += 1;
|
||||
opts.log.info(
|
||||
`Delivery ${currentEntry.id} not ready for retry yet — backoff ${currentRetryEligibility.remainingBackoffMs}ms remaining`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
const result = await drainQueuedEntry({
|
||||
entry: currentEntry,
|
||||
cfg: opts.cfg,
|
||||
deliver: opts.deliver,
|
||||
stateDir: opts.stateDir,
|
||||
onRecovered: (recoveredEntry) => {
|
||||
summary.recovered += 1;
|
||||
opts.log.info(`Recovered delivery ${recoveredEntry.id} on ${recoveredEntry.channel}`);
|
||||
},
|
||||
onFailed: (failedEntry, errMsg) => {
|
||||
summary.failed += 1;
|
||||
if (isPermanentDeliveryError(errMsg)) {
|
||||
opts.log.warn(
|
||||
`Delivery ${failedEntry.id} hit permanent error — moving to failed/: ${errMsg}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
opts.log.warn(`Retry failed for delivery ${failedEntry.id}: ${errMsg}`);
|
||||
},
|
||||
});
|
||||
if (result === "moved-to-failed") {
|
||||
continue;
|
||||
}
|
||||
summary.failed += 1;
|
||||
opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`);
|
||||
} finally {
|
||||
releaseRecoveryEntry(entry.id);
|
||||
}
|
||||
|
||||
@@ -188,6 +188,30 @@ export async function failDelivery(id: string, error: string, stateDir?: string)
|
||||
await writeQueueEntry(filePath, entry);
|
||||
}
|
||||
|
||||
/** Load a single pending delivery entry by ID from the queue directory. */
|
||||
export async function loadPendingDelivery(
|
||||
id: string,
|
||||
stateDir?: string,
|
||||
): Promise<QueuedDelivery | null> {
|
||||
const { jsonPath } = resolveQueueEntryPaths(id, stateDir);
|
||||
try {
|
||||
const stat = await fs.promises.stat(jsonPath);
|
||||
if (!stat.isFile()) {
|
||||
return null;
|
||||
}
|
||||
const { entry, migrated } = normalizeLegacyQueuedDeliveryEntry(await readQueueEntry(jsonPath));
|
||||
if (migrated) {
|
||||
await writeQueueEntry(jsonPath, entry);
|
||||
}
|
||||
return entry;
|
||||
} catch (err) {
|
||||
if (getErrnoCode(err) === "ENOENT") {
|
||||
return null;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
/** Load all pending delivery entries from the queue directory. */
|
||||
export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDelivery[]> {
|
||||
const queueDir = resolveQueueDir(stateDir);
|
||||
|
||||
@@ -5,7 +5,7 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vites
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import {
|
||||
type DeliverFn,
|
||||
drainReconnectQueue,
|
||||
drainPendingDeliveries,
|
||||
enqueueDelivery,
|
||||
failDelivery,
|
||||
MAX_RETRIES,
|
||||
@@ -22,8 +22,37 @@ function createMockLogger(): RecoveryLogger {
|
||||
}
|
||||
|
||||
const stubCfg = {} as OpenClawConfig;
|
||||
const NO_LISTENER_ERROR = "No active WhatsApp Web listener";
|
||||
|
||||
describe("drainReconnectQueue", () => {
|
||||
function normalizeReconnectAccountIdForTest(accountId?: string | null): string {
|
||||
return (accountId ?? "").trim() || "default";
|
||||
}
|
||||
|
||||
async function drainWhatsAppReconnectPending(opts: {
|
||||
accountId: string;
|
||||
deliver: DeliverFn;
|
||||
log: RecoveryLogger;
|
||||
stateDir: string;
|
||||
}) {
|
||||
const normalizedAccountId = normalizeReconnectAccountIdForTest(opts.accountId);
|
||||
await drainPendingDeliveries({
|
||||
drainKey: `whatsapp:${normalizedAccountId}`,
|
||||
logLabel: "WhatsApp reconnect drain",
|
||||
cfg: stubCfg,
|
||||
log: opts.log,
|
||||
stateDir: opts.stateDir,
|
||||
deliver: opts.deliver,
|
||||
selectEntry: (entry) => ({
|
||||
match:
|
||||
entry.channel === "whatsapp" &&
|
||||
normalizeReconnectAccountIdForTest(entry.accountId) === normalizedAccountId,
|
||||
bypassBackoff:
|
||||
typeof entry.lastError === "string" && entry.lastError.includes(NO_LISTENER_ERROR),
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
describe("drainPendingDeliveries for WhatsApp reconnect", () => {
|
||||
let fixtureRoot = "";
|
||||
let tmpDir: string;
|
||||
let fixtureCount = 0;
|
||||
@@ -55,12 +84,11 @@ describe("drainReconnectQueue", () => {
|
||||
);
|
||||
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
|
||||
|
||||
await drainReconnectQueue({
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
@@ -79,12 +107,11 @@ describe("drainReconnectQueue", () => {
|
||||
);
|
||||
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
|
||||
|
||||
await drainReconnectQueue({
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
// deliver should not be called since no eligible entries for acct1
|
||||
@@ -110,12 +137,11 @@ describe("drainReconnectQueue", () => {
|
||||
lastError?: string;
|
||||
};
|
||||
|
||||
await drainReconnectQueue({
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
@@ -145,12 +171,11 @@ describe("drainReconnectQueue", () => {
|
||||
|
||||
// Should not throw
|
||||
await expect(
|
||||
drainReconnectQueue({
|
||||
drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
});
|
||||
@@ -169,12 +194,11 @@ describe("drainReconnectQueue", () => {
|
||||
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
|
||||
}
|
||||
|
||||
await drainReconnectQueue({
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
// Should have moved to failed, not delivered
|
||||
@@ -208,12 +232,12 @@ describe("drainReconnectQueue", () => {
|
||||
entry.retryCount = 1;
|
||||
fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2));
|
||||
|
||||
const opts = { accountId: "acct1", cfg: stubCfg, log, stateDir: tmpDir, deliver };
|
||||
const opts = { accountId: "acct1", log, stateDir: tmpDir, deliver };
|
||||
|
||||
// Start first drain (will block on deliver)
|
||||
const first = drainReconnectQueue(opts);
|
||||
const first = drainWhatsAppReconnectPending(opts);
|
||||
// Start second drain immediately — should be skipped
|
||||
const second = drainReconnectQueue(opts);
|
||||
const second = drainWhatsAppReconnectPending(opts);
|
||||
await second;
|
||||
|
||||
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("already in progress"));
|
||||
@@ -263,12 +287,11 @@ describe("drainReconnectQueue", () => {
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
await drainReconnectQueue({
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
@@ -279,4 +302,165 @@ describe("drainReconnectQueue", () => {
|
||||
resolveDeliver!();
|
||||
await startupRecovery;
|
||||
});
|
||||
|
||||
it("does not re-deliver a stale startup snapshot after reconnect already acked it", async () => {
|
||||
const log = createMockLogger();
|
||||
const startupLog = createMockLogger();
|
||||
let releaseBlocker: () => void;
|
||||
const blocker = new Promise<void>((resolve) => {
|
||||
releaseBlocker = resolve;
|
||||
});
|
||||
const deliveredTargets: string[] = [];
|
||||
const deliver = vi.fn<DeliverFn>(async ({ to }) => {
|
||||
deliveredTargets.push(to);
|
||||
if (to === "+1000") {
|
||||
await blocker;
|
||||
}
|
||||
});
|
||||
|
||||
await enqueueDelivery(
|
||||
{ channel: "demo-channel-a", to: "+1000", payloads: [{ text: "blocker" }] },
|
||||
tmpDir,
|
||||
);
|
||||
await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
|
||||
const startupRecovery = recoverPendingDeliveries({
|
||||
cfg: stubCfg,
|
||||
deliver,
|
||||
log: startupLog,
|
||||
stateDir: tmpDir,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(deliver).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ channel: "demo-channel-a", to: "+1000" }),
|
||||
);
|
||||
});
|
||||
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
});
|
||||
|
||||
releaseBlocker!();
|
||||
await startupRecovery;
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(2);
|
||||
expect(deliveredTargets.filter((target) => target === "+1555")).toHaveLength(1);
|
||||
expect(startupLog.info).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Recovery skipped for delivery"),
|
||||
);
|
||||
});
|
||||
it("drains fresh pending WhatsApp entries for the reconnecting account", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
|
||||
await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
});
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
fs.readdirSync(path.join(tmpDir, "delivery-queue")).filter((f) => f.endsWith(".json")),
|
||||
).toEqual([]);
|
||||
});
|
||||
|
||||
it("drains backoff-eligible WhatsApp retries on reconnect", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
await failDelivery(id, "network down", tmpDir);
|
||||
const entryPath = path.join(tmpDir, "delivery-queue", `${id}.json`);
|
||||
const entry = JSON.parse(fs.readFileSync(entryPath, "utf-8")) as {
|
||||
lastAttemptAt?: number;
|
||||
};
|
||||
entry.lastAttemptAt = Date.now() - 30_000;
|
||||
fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2));
|
||||
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
});
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not bypass backoff for ordinary transient errors on reconnect", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
await failDelivery(id, "network down", tmpDir);
|
||||
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
});
|
||||
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
|
||||
});
|
||||
|
||||
it("still bypasses backoff for no-listener failures on reconnect", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
await failDelivery(id, NO_LISTENER_ERROR, tmpDir);
|
||||
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
});
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("ignores non-WhatsApp entries even when reconnect drain runs", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
|
||||
await enqueueDelivery(
|
||||
{ channel: "telegram", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
|
||||
await drainWhatsAppReconnectPending({
|
||||
accountId: "acct1",
|
||||
deliver,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
});
|
||||
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,16 +3,22 @@ export {
|
||||
enqueueDelivery,
|
||||
ensureQueueDir,
|
||||
failDelivery,
|
||||
loadPendingDelivery,
|
||||
loadPendingDeliveries,
|
||||
moveToFailed,
|
||||
} from "./delivery-queue-storage.js";
|
||||
export type { QueuedDelivery, QueuedDeliveryPayload } from "./delivery-queue-storage.js";
|
||||
export {
|
||||
computeBackoffMs,
|
||||
drainReconnectQueue,
|
||||
drainPendingDeliveries,
|
||||
isEntryEligibleForRecoveryRetry,
|
||||
isPermanentDeliveryError,
|
||||
MAX_RETRIES,
|
||||
recoverPendingDeliveries,
|
||||
} from "./delivery-queue-recovery.js";
|
||||
export type { DeliverFn, RecoveryLogger, RecoverySummary } from "./delivery-queue-recovery.js";
|
||||
export type {
|
||||
DeliverFn,
|
||||
PendingDeliveryDrainDecision,
|
||||
RecoveryLogger,
|
||||
RecoverySummary,
|
||||
} from "./delivery-queue-recovery.js";
|
||||
|
||||
@@ -1,5 +1,49 @@
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import {
|
||||
drainPendingDeliveries,
|
||||
type DeliverFn,
|
||||
type RecoveryLogger,
|
||||
} from "../infra/outbound/delivery-queue.js";
|
||||
|
||||
// Public runtime/transport helpers for plugins that need shared infra behavior.
|
||||
|
||||
function normalizeWhatsAppReconnectAccountId(accountId?: string): string {
|
||||
return (accountId ?? "").trim() || "default";
|
||||
}
|
||||
|
||||
const WHATSAPP_NO_LISTENER_ERROR_RE = /No active WhatsApp Web listener/i;
|
||||
|
||||
/**
|
||||
* @deprecated Prefer plugin-owned reconnect policy wired through
|
||||
* `drainPendingDeliveries(...)`. This compatibility shim preserves the
|
||||
* historical public SDK symbol for existing plugin callers.
|
||||
*/
|
||||
export async function drainReconnectQueue(opts: {
|
||||
accountId: string;
|
||||
cfg: OpenClawConfig;
|
||||
log: RecoveryLogger;
|
||||
stateDir?: string;
|
||||
deliver?: DeliverFn;
|
||||
}): Promise<void> {
|
||||
const normalizedAccountId = normalizeWhatsAppReconnectAccountId(opts.accountId);
|
||||
await drainPendingDeliveries({
|
||||
drainKey: `whatsapp:${normalizedAccountId}`,
|
||||
logLabel: "WhatsApp reconnect drain",
|
||||
cfg: opts.cfg,
|
||||
log: opts.log,
|
||||
stateDir: opts.stateDir,
|
||||
deliver: opts.deliver,
|
||||
selectEntry: (entry) => ({
|
||||
match:
|
||||
entry.channel === "whatsapp" &&
|
||||
normalizeWhatsAppReconnectAccountId(entry.accountId) === normalizedAccountId &&
|
||||
typeof entry.lastError === "string" &&
|
||||
WHATSAPP_NO_LISTENER_ERROR_RE.test(entry.lastError),
|
||||
bypassBackoff: true,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
export * from "../infra/backoff.js";
|
||||
export * from "../infra/channel-activity.js";
|
||||
export * from "../infra/dedupe.js";
|
||||
@@ -32,7 +76,7 @@ export * from "../infra/net/proxy-env.js";
|
||||
export * from "../infra/net/proxy-fetch.js";
|
||||
export * from "../infra/net/undici-global-dispatcher.js";
|
||||
export * from "../infra/net/ssrf.js";
|
||||
export { drainReconnectQueue } from "../infra/outbound/delivery-queue.js";
|
||||
export { drainPendingDeliveries };
|
||||
export * from "../infra/outbound/identity.js";
|
||||
export * from "../infra/outbound/sanitize-text.js";
|
||||
export * from "../infra/parse-finite-number.js";
|
||||
|
||||
Reference in New Issue
Block a user