mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 09:41:11 +00:00
Merged via squash.
Prepared head SHA: 5ce763406e
Co-authored-by: manuel-claw <268194568+manuel-claw@users.noreply.github.com>
Co-authored-by: mcaxtr <7562095+mcaxtr@users.noreply.github.com>
Reviewed-by: @mcaxtr
This commit is contained in:
@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
|
||||
- fix(exec): replace TOCTOU check-then-read with atomic pinned-fd open in script preflight [AI]. (#62333) Thanks @pgondhi987.
|
||||
- WhatsApp/auto-reply: keep inbound reply, media, and composing sends on the current socket across reconnects, wait through reconnect gaps, and retry timeout-only send failures without dropping the active socket ref. (#62892) Thanks @mcaxtr.
|
||||
- Config/plugins: let config writes keep disabled plugin entries without forcing required plugin config schemas or crashing raw plugin validation, so slot switches and similar plugin-state updates persist cleanly. (#63296) Thanks @fuller-stack-dev.
|
||||
- WhatsApp/outbound queue: drain queued WhatsApp deliveries when the listener reconnects without dropping reconnect-delayed sends after a special TTL or rewriting retry history, so disconnect-window outbound messages can recover once the channel is ready again. (#46299) Thanks @manuel-claw.
|
||||
|
||||
## 2026.4.9
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ import { resolveInboundDebounceMs } from "openclaw/plugin-sdk/channel-inbound";
|
||||
import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime";
|
||||
import { waitForever } from "openclaw/plugin-sdk/cli-runtime";
|
||||
import { hasControlCommand } from "openclaw/plugin-sdk/command-detection";
|
||||
import { drainReconnectQueue } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { DEFAULT_GROUP_HISTORY_LIMIT } from "openclaw/plugin-sdk/reply-history";
|
||||
import { resolveAgentRoute } from "openclaw/plugin-sdk/routing";
|
||||
@@ -259,6 +260,19 @@ export async function monitorWebChannel(
|
||||
});
|
||||
|
||||
setActiveWebListener(account.accountId, listener);
|
||||
|
||||
// Drain any messages that failed with "no listener" during the disconnect window.
|
||||
void drainReconnectQueue({
|
||||
accountId: account.accountId,
|
||||
cfg,
|
||||
log: reconnectLogger,
|
||||
}).catch((err) => {
|
||||
reconnectLogger.warn(
|
||||
{ connectionId: active.connectionId, error: String(err) },
|
||||
"reconnect drain failed",
|
||||
);
|
||||
});
|
||||
|
||||
active.unregisterUnhandled = registerUnhandledRejectionHandler((reason) => {
|
||||
if (!isLikelyWhatsAppCryptoError(reason)) {
|
||||
return false;
|
||||
|
||||
@@ -54,6 +54,30 @@ const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
|
||||
/User .* not in room/i,
|
||||
];
|
||||
|
||||
const NO_LISTENER_ERROR_RE = /No active WhatsApp Web listener/i;
|
||||
|
||||
const drainInProgress = new Map<string, boolean>();
|
||||
const entriesInProgress = new Set<string>();
|
||||
|
||||
type DeliverRuntimeModule = typeof import("./deliver-runtime.js");
|
||||
|
||||
let deliverRuntimePromise: Promise<DeliverRuntimeModule> | null = null;
|
||||
|
||||
function loadDeliverRuntime() {
|
||||
deliverRuntimePromise ??= import("./deliver-runtime.js");
|
||||
return deliverRuntimePromise;
|
||||
}
|
||||
|
||||
function normalizeQueueAccountId(accountId?: string): string {
|
||||
return (accountId ?? "").trim() || "default";
|
||||
}
|
||||
|
||||
function getErrnoCode(err: unknown): string | null {
|
||||
return err && typeof err === "object" && "code" in err
|
||||
? String((err as { code?: unknown }).code)
|
||||
: null;
|
||||
}
|
||||
|
||||
function createEmptyRecoverySummary(): RecoverySummary {
|
||||
return {
|
||||
recovered: 0,
|
||||
@@ -63,6 +87,18 @@ function createEmptyRecoverySummary(): RecoverySummary {
|
||||
};
|
||||
}
|
||||
|
||||
function claimRecoveryEntry(entryId: string): boolean {
|
||||
if (entriesInProgress.has(entryId)) {
|
||||
return false;
|
||||
}
|
||||
entriesInProgress.add(entryId);
|
||||
return true;
|
||||
}
|
||||
|
||||
function releaseRecoveryEntry(entryId: string): void {
|
||||
entriesInProgress.delete(entryId);
|
||||
}
|
||||
|
||||
function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) {
|
||||
return {
|
||||
cfg,
|
||||
@@ -143,6 +179,85 @@ export function isPermanentDeliveryError(error: string): boolean {
|
||||
return PERMANENT_ERROR_PATTERNS.some((re) => re.test(error));
|
||||
}
|
||||
|
||||
export async function drainReconnectQueue(opts: {
|
||||
accountId: string;
|
||||
cfg: OpenClawConfig;
|
||||
log: RecoveryLogger;
|
||||
stateDir?: string;
|
||||
deliver?: DeliverFn;
|
||||
}): Promise<void> {
|
||||
if (drainInProgress.get(opts.accountId)) {
|
||||
opts.log.info(
|
||||
`WhatsApp reconnect drain: already in progress for account ${opts.accountId}, skipping`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
drainInProgress.set(opts.accountId, true);
|
||||
try {
|
||||
const matchingEntries = (await loadPendingDeliveries(opts.stateDir))
|
||||
.filter(
|
||||
(entry) =>
|
||||
entry.channel === "whatsapp" &&
|
||||
normalizeQueueAccountId(entry.accountId) === opts.accountId &&
|
||||
typeof entry.lastError === "string" &&
|
||||
NO_LISTENER_ERROR_RE.test(entry.lastError),
|
||||
)
|
||||
.toSorted((a, b) => a.enqueuedAt - b.enqueuedAt);
|
||||
|
||||
if (matchingEntries.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
opts.log.info(
|
||||
`WhatsApp reconnect drain: ${matchingEntries.length} pending message(s) for account ${opts.accountId}`,
|
||||
);
|
||||
|
||||
const deliver = opts.deliver ?? (await loadDeliverRuntime()).deliverOutboundPayloads;
|
||||
|
||||
for (const entry of matchingEntries) {
|
||||
if (!claimRecoveryEntry(entry.id)) {
|
||||
opts.log.info(`WhatsApp reconnect drain: entry ${entry.id} is already being recovered`);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entry.retryCount >= MAX_RETRIES) {
|
||||
try {
|
||||
await moveToFailed(entry.id, opts.stateDir);
|
||||
} catch (err) {
|
||||
if (getErrnoCode(err) === "ENOENT") {
|
||||
opts.log.info(`reconnect drain: entry ${entry.id} already gone, skipping`);
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
releaseRecoveryEntry(entry.id);
|
||||
}
|
||||
opts.log.warn(
|
||||
`WhatsApp reconnect drain: entry ${entry.id} exceeded max retries and was moved to failed/`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
await deliver(buildRecoveryDeliverParams(entry, opts.cfg));
|
||||
await ackDelivery(entry.id, opts.stateDir);
|
||||
} catch (err) {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
if (isPermanentDeliveryError(errMsg)) {
|
||||
await moveToFailed(entry.id, opts.stateDir).catch(() => {});
|
||||
} else {
|
||||
await failDelivery(entry.id, errMsg, opts.stateDir).catch(() => {});
|
||||
}
|
||||
} finally {
|
||||
releaseRecoveryEntry(entry.id);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
drainInProgress.delete(opts.accountId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* On gateway startup, scan the delivery queue and retry any pending entries.
|
||||
* Uses exponential backoff and moves entries that exceed MAX_RETRIES to failed/.
|
||||
@@ -175,11 +290,19 @@ export async function recoverPendingDeliveries(opts: {
|
||||
break;
|
||||
}
|
||||
if (entry.retryCount >= MAX_RETRIES) {
|
||||
opts.log.warn(
|
||||
`Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
|
||||
);
|
||||
await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
|
||||
summary.skippedMaxRetries += 1;
|
||||
if (!claimRecoveryEntry(entry.id)) {
|
||||
opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
opts.log.warn(
|
||||
`Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
|
||||
);
|
||||
await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
|
||||
summary.skippedMaxRetries += 1;
|
||||
} finally {
|
||||
releaseRecoveryEntry(entry.id);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -192,6 +315,11 @@ export async function recoverPendingDeliveries(opts: {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!claimRecoveryEntry(entry.id)) {
|
||||
opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg));
|
||||
await ackDelivery(entry.id, opts.stateDir);
|
||||
@@ -212,6 +340,8 @@ export async function recoverPendingDeliveries(opts: {
|
||||
}
|
||||
summary.failed += 1;
|
||||
opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`);
|
||||
} finally {
|
||||
releaseRecoveryEntry(entry.id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ export interface QueuedDelivery extends QueuedDeliveryPayload {
|
||||
lastError?: string;
|
||||
}
|
||||
|
||||
function resolveQueueDir(stateDir?: string): string {
|
||||
export function resolveQueueDir(stateDir?: string): string {
|
||||
const base = stateDir ?? resolveStateDir();
|
||||
return path.join(base, QUEUE_DIRNAME);
|
||||
}
|
||||
|
||||
282
src/infra/outbound/delivery-queue.reconnect-drain.test.ts
Normal file
282
src/infra/outbound/delivery-queue.reconnect-drain.test.ts
Normal file
@@ -0,0 +1,282 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import {
|
||||
type DeliverFn,
|
||||
drainReconnectQueue,
|
||||
enqueueDelivery,
|
||||
failDelivery,
|
||||
MAX_RETRIES,
|
||||
type RecoveryLogger,
|
||||
recoverPendingDeliveries,
|
||||
} from "./delivery-queue.js";
|
||||
|
||||
function createMockLogger(): RecoveryLogger {
|
||||
return {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
const stubCfg = {} as OpenClawConfig;
|
||||
|
||||
describe("drainReconnectQueue", () => {
|
||||
let fixtureRoot = "";
|
||||
let tmpDir: string;
|
||||
let fixtureCount = 0;
|
||||
|
||||
beforeAll(() => {
|
||||
fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-drain-"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`);
|
||||
fs.mkdirSync(tmpDir, { recursive: true });
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
if (!fixtureRoot) {
|
||||
return;
|
||||
}
|
||||
fs.rmSync(fixtureRoot, { recursive: true, force: true });
|
||||
fixtureRoot = "";
|
||||
});
|
||||
|
||||
it("drains entries that failed with 'no listener' error", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
|
||||
|
||||
await drainReconnectQueue({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
expect(deliver).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ channel: "whatsapp", to: "+1555", skipQueue: true }),
|
||||
);
|
||||
});
|
||||
|
||||
it("skips entries from other accounts", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "other" },
|
||||
tmpDir,
|
||||
);
|
||||
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
|
||||
|
||||
await drainReconnectQueue({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
// deliver should not be called since no eligible entries for acct1
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("retries immediately without resetting retry history", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {
|
||||
throw new Error("transient failure");
|
||||
});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
|
||||
const queueDir = path.join(tmpDir, "delivery-queue");
|
||||
const filePath = path.join(queueDir, `${id}.json`);
|
||||
const before = JSON.parse(fs.readFileSync(filePath, "utf-8")) as {
|
||||
retryCount: number;
|
||||
lastAttemptAt?: number;
|
||||
lastError?: string;
|
||||
};
|
||||
|
||||
await drainReconnectQueue({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
|
||||
const after = JSON.parse(fs.readFileSync(filePath, "utf-8")) as {
|
||||
retryCount: number;
|
||||
lastAttemptAt?: number;
|
||||
lastError?: string;
|
||||
};
|
||||
expect(after.retryCount).toBe(before.retryCount + 1);
|
||||
expect(after.lastAttemptAt).toBeTypeOf("number");
|
||||
expect(after.lastAttemptAt).toBeGreaterThanOrEqual(before.lastAttemptAt ?? 0);
|
||||
expect(after.lastError).toBe("transient failure");
|
||||
});
|
||||
|
||||
it("does not throw if delivery fails during drain", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {
|
||||
throw new Error("transient failure");
|
||||
});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
|
||||
|
||||
// Should not throw
|
||||
await expect(
|
||||
drainReconnectQueue({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("skips entries where retryCount >= MAX_RETRIES", async () => {
|
||||
const log = createMockLogger();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
|
||||
// Bump retryCount to MAX_RETRIES
|
||||
for (let i = 0; i < MAX_RETRIES; i++) {
|
||||
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
|
||||
}
|
||||
|
||||
await drainReconnectQueue({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
// Should have moved to failed, not delivered
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
const failedDir = path.join(tmpDir, "delivery-queue", "failed");
|
||||
const failedFiles = fs.readdirSync(failedDir).filter((f) => f.endsWith(".json"));
|
||||
expect(failedFiles).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("second concurrent call is skipped (concurrency guard)", async () => {
|
||||
const log = createMockLogger();
|
||||
let resolveDeliver: () => void;
|
||||
const deliverPromise = new Promise<void>((resolve) => {
|
||||
resolveDeliver = resolve;
|
||||
});
|
||||
const deliver = vi.fn<DeliverFn>(async () => {
|
||||
await deliverPromise;
|
||||
});
|
||||
|
||||
await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
// Fail it so it matches the "no listener" filter
|
||||
const pending = fs
|
||||
.readdirSync(path.join(tmpDir, "delivery-queue"))
|
||||
.filter((f) => f.endsWith(".json"));
|
||||
const entryPath = path.join(tmpDir, "delivery-queue", pending[0]);
|
||||
const entry = JSON.parse(fs.readFileSync(entryPath, "utf-8"));
|
||||
entry.lastError = "No active WhatsApp Web listener";
|
||||
entry.retryCount = 1;
|
||||
fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2));
|
||||
|
||||
const opts = { accountId: "acct1", cfg: stubCfg, log, stateDir: tmpDir, deliver };
|
||||
|
||||
// Start first drain (will block on deliver)
|
||||
const first = drainReconnectQueue(opts);
|
||||
// Start second drain immediately — should be skipped
|
||||
const second = drainReconnectQueue(opts);
|
||||
await second;
|
||||
|
||||
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("already in progress"));
|
||||
|
||||
// Unblock first drain
|
||||
resolveDeliver!();
|
||||
await first;
|
||||
});
|
||||
|
||||
it("does not re-deliver an entry already being recovered at startup", async () => {
|
||||
const log = createMockLogger();
|
||||
const startupLog = createMockLogger();
|
||||
let resolveDeliver: () => void;
|
||||
const deliverPromise = new Promise<void>((resolve) => {
|
||||
resolveDeliver = resolve;
|
||||
});
|
||||
const deliver = vi.fn<DeliverFn>(async () => {
|
||||
await deliverPromise;
|
||||
});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
const queuePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
|
||||
const entry = JSON.parse(fs.readFileSync(queuePath, "utf-8")) as {
|
||||
id: string;
|
||||
enqueuedAt: number;
|
||||
channel: string;
|
||||
to: string;
|
||||
accountId?: string;
|
||||
payloads: Array<{ text: string }>;
|
||||
retryCount: number;
|
||||
lastError?: string;
|
||||
};
|
||||
entry.lastError = "No active WhatsApp Web listener";
|
||||
fs.writeFileSync(queuePath, JSON.stringify(entry, null, 2));
|
||||
|
||||
const startupRecovery = recoverPendingDeliveries({
|
||||
cfg: stubCfg,
|
||||
deliver,
|
||||
log: startupLog,
|
||||
stateDir: tmpDir,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
await drainReconnectQueue({
|
||||
accountId: "acct1",
|
||||
cfg: stubCfg,
|
||||
log,
|
||||
stateDir: tmpDir,
|
||||
deliver,
|
||||
});
|
||||
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
expect(log.info).toHaveBeenCalledWith(
|
||||
expect.stringContaining(`entry ${id} is already being recovered`),
|
||||
);
|
||||
|
||||
resolveDeliver!();
|
||||
await startupRecovery;
|
||||
});
|
||||
});
|
||||
@@ -9,6 +9,7 @@ export {
|
||||
export type { QueuedDelivery, QueuedDeliveryPayload } from "./delivery-queue-storage.js";
|
||||
export {
|
||||
computeBackoffMs,
|
||||
drainReconnectQueue,
|
||||
isEntryEligibleForRecoveryRetry,
|
||||
isPermanentDeliveryError,
|
||||
MAX_RETRIES,
|
||||
|
||||
@@ -32,6 +32,7 @@ export * from "../infra/net/proxy-env.js";
|
||||
export * from "../infra/net/proxy-fetch.js";
|
||||
export * from "../infra/net/undici-global-dispatcher.js";
|
||||
export * from "../infra/net/ssrf.js";
|
||||
export { drainReconnectQueue } from "../infra/outbound/delivery-queue.js";
|
||||
export * from "../infra/outbound/identity.js";
|
||||
export * from "../infra/outbound/sanitize-text.js";
|
||||
export * from "../infra/parse-finite-number.js";
|
||||
|
||||
Reference in New Issue
Block a user