fix: harden whatsapp transport liveness

This commit is contained in:
Marcus Castro
2026-04-28 23:50:22 -03:00
parent cd2e3e0444
commit 38738f83e2
10 changed files with 226 additions and 7 deletions

View File

@@ -1394,6 +1394,7 @@ Docs: https://docs.openclaw.ai
- Plugins/QR: replace legacy `qrcode-terminal` QR rendering with bounded `qrcode-tui` helpers for plugin login/setup flows. (#65969) Thanks @vincentkoc.
- Voice-call/realtime: wait for OpenAI session configuration before greeting or forwarding buffered audio, and reject non-allowlisted Twilio callers before stream setup. (#43501) Thanks @forrestblount.
- ACPX/Codex: stop materializing `auth.json` bridge files for Codex ACP, Codex app-server, and Codex CLI runs; Codex-owned runtimes now use their normal `CODEX_HOME`/`~/.codex` auth path directly.
- WhatsApp/reliability: publish real transport-liveness into WhatsApp channel status and force earlier reconnects on silent transport stalls, so quiet healthy sessions stay connected while wedged sockets recover before the later remote 408 path. (#72656) Thanks @Sathvik-1007.
- Auto-reply/system events: route async exec-event completion replies through the persisted session delivery context, so long-running command results return to the originating channel instead of being dropped when live origin metadata is missing. (#70258) Thanks @wzfukui.
- Gateway/sessions: extend the webchat session-mutation guard to `sessions.compact` and `sessions.compaction.restore`, so `WEBCHAT_UI` clients are rejected from compaction-side session mutations consistently with the existing patch/delete guards. (#70716) Thanks @drobison00.
- QA channel/security: reject non-HTTP(S) inbound attachment URLs before media fetch, and log rejected schemes so suspicious or misconfigured payloads are visible during debugging. (#70708) Thanks @vincentkoc.

View File

@@ -150,6 +150,7 @@ OpenClaw recommends running WhatsApp on a separate number when possible. (The ch
- Baileys socket timings are explicit under `web.whatsapp.*`: `keepAliveIntervalMs` controls WhatsApp Web application pings, `connectTimeoutMs` controls the opening handshake timeout, and `defaultQueryTimeoutMs` controls Baileys query timeouts.
- Outbound sends require an active WhatsApp listener for the target account.
- Status and broadcast chats are ignored (`@status`, `@broadcast`).
- The reconnect watchdog follows WhatsApp Web transport activity, not only inbound app-message volume: quiet linked-device sessions stay up while transport frames continue, but a transport stall forces reconnect well before the later remote disconnect path.
- Direct chats use DM session rules (`session.dmScope`; default `main` collapses DMs to the agent main session).
- Group sessions are isolated (`agent:<agentId>:whatsapp:group:<jid>`).
- WhatsApp Web transport honors standard proxy environment variables on the gateway host (`HTTPS_PROXY`, `HTTP_PROXY`, `NO_PROXY` / lowercase variants). Prefer host-level proxy config over channel-specific WhatsApp proxy settings.

View File

@@ -45,8 +45,13 @@ type WebAutoReplyMonitorHarness = {
run: Promise<unknown>;
};
type MockSessionSocket = {
ev: { on: ReturnType<typeof vi.fn>; off: ReturnType<typeof vi.fn> };
ws: EventEmitter & { close: ReturnType<typeof vi.fn> };
ev: {
on: ReturnType<typeof vi.fn>;
off: ReturnType<typeof vi.fn>;
};
ws: EventEmitter & {
close: ReturnType<typeof vi.fn>;
};
user: { id: string };
};
@@ -68,7 +73,7 @@ vi.mock("./session.js", async () => {
createWaSocket: vi.fn(async () => {
const ws = new EventEmitter() as MockSessionSocket["ws"];
ws.close = vi.fn();
const sock: MockSessionSocket = {
const socket: MockSessionSocket = {
ev: {
on: vi.fn(),
off: vi.fn(),
@@ -76,8 +81,8 @@ vi.mock("./session.js", async () => {
ws,
user: { id: "123@s.whatsapp.net" },
};
getSessionSockets().push(sock);
return sock;
getSessionSockets().push(socket);
return socket;
}),
waitForWaConnection: vi.fn().mockResolvedValue(undefined),
};
@@ -309,6 +314,7 @@ export function startWebAutoReplyMonitor(params: {
sleep: UnknownMock | AsyncUnknownMock;
signal?: AbortSignal;
heartbeatSeconds?: number;
transportTimeoutMs?: number;
messageTimeoutMs?: number;
watchdogCheckMs?: number;
reconnect?: { initialMs: number; maxMs: number; maxAttempts: number; factor: number };
@@ -326,6 +332,7 @@ export function startWebAutoReplyMonitor(params: {
params.signal ?? controller.signal,
{
heartbeatSeconds: params.heartbeatSeconds ?? 1,
transportTimeoutMs: params.transportTimeoutMs,
messageTimeoutMs: params.messageTimeoutMs,
watchdogCheckMs: params.watchdogCheckMs,
reconnect: params.reconnect ?? { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 },

View File

@@ -407,7 +407,92 @@ describe("web auto-reply connection", () => {
socket.ws.emit("frame");
await vi.advanceTimersByTimeAsync(20);
}
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(2);
},
{ timeout: 250, interval: 2 },
);
controller.abort();
scripted.resolveClose(scripted.getListenerCount() - 1, {
status: 499,
isLoggedOut: false,
error: "aborted",
});
await Promise.resolve();
await run;
} finally {
vi.useRealTimers();
}
});
it("publishes frame-driven transport activity for quiet sessions", async () => {
vi.useFakeTimers();
try {
const sleep = vi.fn(async () => {});
const statuses: Array<Record<string, unknown>> = [];
const scripted = createScriptedWebListenerFactory();
const { controller, run } = startWebAutoReplyMonitor({
monitorWebChannelFn: monitorWebChannel as never,
listenerFactory: scripted.listenerFactory,
sleep,
heartbeatSeconds: 1,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5,
statusSink: (next) => statuses.push({ ...next }),
});
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
const initialTransportAt = Number(statuses.at(-1)?.lastTransportActivityAt ?? 0);
const socket = getLastWebAutoReplySessionSocket();
await vi.advanceTimersByTimeAsync(250);
socket.ws.emit("frame");
await vi.advanceTimersByTimeAsync(1_000);
const lastTransportAt = Number(statuses.at(-1)?.lastTransportActivityAt ?? 0);
expect(lastTransportAt).toBeGreaterThan(initialTransportAt);
controller.abort();
scripted.resolveClose(0, { status: 499, isLoggedOut: false, error: "aborted" });
await Promise.resolve();
await run;
} finally {
vi.useRealTimers();
}
});
it("reconnects on transport stall before the long app-silence window", async () => {
vi.useFakeTimers();
try {
const sleep = vi.fn(async () => {});
const scripted = createScriptedWebListenerFactory();
const { controller, run } = startWebAutoReplyMonitor({
monitorWebChannelFn: monitorWebChannel as never,
listenerFactory: scripted.listenerFactory,
sleep,
heartbeatSeconds: 1,
transportTimeoutMs: 30,
messageTimeoutMs: 3_000,
watchdogCheckMs: 5,
});
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
await vi.advanceTimersByTimeAsync(36);
await Promise.resolve();
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(2);

View File

@@ -24,6 +24,17 @@ describe("createWebChannelStatusController", () => {
expect(last.lastTransportActivityAt).toBe(2000);
});
it("updates lastTransportActivityAt from explicit transport activity", () => {
const patches: Record<string, unknown>[] = [];
const controller = createWebChannelStatusController((s) => patches.push({ ...s }));
controller.noteConnected(1000);
controller.noteTransportActivity(3000);
const last = patches.at(-1)!;
expect(last.lastTransportActivityAt).toBe(3000);
});
it("does not set lastTransportActivityAt on noteWatchdogStale", () => {
const patches: Record<string, unknown>[] = [];
const controller = createWebChannelStatusController((s) => patches.push({ ...s }));

View File

@@ -53,6 +53,13 @@ export function createWebChannelStatusController(statusSink?: (status: WebChanne
}
emit();
},
noteTransportActivity(at = Date.now()) {
if (status.lastTransportActivityAt === at) {
return;
}
Object.assign(status, createTransportActivityStatusPatch(at));
emit();
},
noteWatchdogStale(at = Date.now()) {
status.lastEventAt = at;
if (status.connected) {

View File

@@ -134,6 +134,7 @@ async function clearTerminalWebAuthState(params: {
);
}
}
const DEFAULT_TRANSPORT_TIMEOUT_MS = 5 * 60 * 1000;
export async function monitorWebChannel(
verbose: boolean,
@@ -220,6 +221,7 @@ export async function monitorWebChannel(
};
process.once("SIGINT", handleSigint);
const transportTimeoutMs = tuning.transportTimeoutMs ?? DEFAULT_TRANSPORT_TIMEOUT_MS;
const messageTimeoutMs = tuning.messageTimeoutMs ?? 30 * 60 * 1000;
const watchdogCheckMs = tuning.watchdogCheckMs ?? 60 * 1000;
const controller = new WhatsAppConnectionController({
@@ -228,6 +230,7 @@ export async function monitorWebChannel(
verbose,
keepAlive,
heartbeatSeconds,
transportTimeoutMs,
messageTimeoutMs,
watchdogCheckMs,
reconnectPolicy,
@@ -328,6 +331,7 @@ export async function monitorWebChannel(
? { minutesSinceLastMessage }
: {}),
};
statusController.noteTransportActivity(snapshot.lastTransportActivityAt);
if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
heartbeatLogger.warn(
@@ -345,7 +349,7 @@ export async function monitorWebChannel(
const minutesSinceTransportActivity = Math.floor(transportSilentMs / 60000);
const minutesSinceAppActivity = Math.floor((now - appBaselineAt) / 60000);
const watchdogReason =
transportSilentMs > messageTimeoutMs ? "transport-inactive" : "app-silent";
transportSilentMs > transportTimeoutMs ? "transport-inactive" : "app-silent";
statusController.noteWatchdogStale();
heartbeatLogger.warn(
{

View File

@@ -36,6 +36,7 @@ export type WebMonitorTuning = {
reconnect?: Partial<ReconnectPolicy>;
socketTiming?: WhatsAppSocketTimingOptions;
heartbeatSeconds?: number;
transportTimeoutMs?: number;
messageTimeoutMs?: number;
watchdogCheckMs?: number;
sleep?: (ms: number, signal?: AbortSignal) => Promise<void>;

View File

@@ -1,3 +1,4 @@
import { EventEmitter } from "node:events";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js";
import { WhatsAppConnectionController } from "./connection-controller.js";
@@ -24,6 +25,14 @@ function createListenerStub(messageId = "ok") {
};
}
function createSocketWithTransportEmitter() {
const ws = new EventEmitter() as EventEmitter & { close: ReturnType<typeof vi.fn> };
ws.close = vi.fn();
return {
ws,
};
}
describe("WhatsAppConnectionController", () => {
let controller: WhatsAppConnectionController;
@@ -154,4 +163,94 @@ describe("WhatsAppConnectionController", () => {
await liveController.shutdown();
}
});
it("tracks real websocket frame activity in the connection snapshot", async () => {
vi.useFakeTimers();
const controller = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: true,
heartbeatSeconds: 1,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
try {
const sock = createSocketWithTransportEmitter();
createWaSocketMock.mockResolvedValueOnce(sock as never);
waitForWaConnectionMock.mockResolvedValueOnce(undefined);
const snapshots: Array<{ lastTransportActivityAt: number }> = [];
await controller.openConnection({
connectionId: "conn-frame-activity",
createListener: async () => createListenerStub() as never,
onHeartbeat: (snapshot) => snapshots.push(snapshot),
});
await vi.advanceTimersByTimeAsync(1_000);
const firstSnapshot = snapshots.at(-1);
expect(firstSnapshot?.lastTransportActivityAt).toBeTypeOf("number");
const firstTransportAt = firstSnapshot?.lastTransportActivityAt ?? 0;
await vi.advanceTimersByTimeAsync(250);
sock.ws.emit("frame");
await vi.advanceTimersByTimeAsync(1_000);
const lastSnapshot = snapshots.at(-1);
expect(lastSnapshot?.lastTransportActivityAt).toBeGreaterThan(firstTransportAt);
} finally {
await controller.shutdown();
vi.useRealTimers();
}
});
it("forces reconnect on transport stall before the long app-silence window", async () => {
vi.useFakeTimers();
const controller = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: true,
heartbeatSeconds: 1,
transportTimeoutMs: 30,
messageTimeoutMs: 3_000,
watchdogCheckMs: 5,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
try {
const sock = createSocketWithTransportEmitter();
createWaSocketMock.mockResolvedValueOnce(sock as never);
waitForWaConnectionMock.mockResolvedValueOnce(undefined);
const timeouts: string[] = [];
await controller.openConnection({
connectionId: "conn-transport-timeout",
createListener: async () => createListenerStub() as never,
onWatchdogTimeout: () => timeouts.push("timeout"),
});
await vi.advanceTimersByTimeAsync(40);
expect(timeouts.length).toBeGreaterThanOrEqual(1);
} finally {
await controller.shutdown();
vi.useRealTimers();
}
});
});

View File

@@ -245,6 +245,7 @@ export class WhatsAppConnectionController {
private readonly reconnectPolicy: ReconnectPolicy;
private readonly heartbeatSeconds: number;
private readonly keepAlive: boolean;
private readonly transportTimeoutMs: number;
private readonly messageTimeoutMs: number;
private readonly appSilenceTimeoutMs: number;
private readonly watchdogCheckMs: number;
@@ -265,6 +266,7 @@ export class WhatsAppConnectionController {
verbose: boolean;
keepAlive: boolean;
heartbeatSeconds: number;
transportTimeoutMs: number;
messageTimeoutMs: number;
watchdogCheckMs: number;
reconnectPolicy: ReconnectPolicy;
@@ -278,6 +280,7 @@ export class WhatsAppConnectionController {
this.verbose = params.verbose;
this.keepAlive = params.keepAlive;
this.heartbeatSeconds = params.heartbeatSeconds;
this.transportTimeoutMs = params.transportTimeoutMs;
this.messageTimeoutMs = params.messageTimeoutMs;
this.appSilenceTimeoutMs = Math.max(params.messageTimeoutMs, params.messageTimeoutMs * 4);
this.watchdogCheckMs = params.watchdogCheckMs;
@@ -600,7 +603,7 @@ export class WhatsAppConnectionController {
const appBaselineAt = connection.lastInboundAt ?? connection.startedAt;
const appSilentForMs = now - appBaselineAt;
if (
transportStaleForMs <= this.messageTimeoutMs &&
transportStaleForMs <= this.transportTimeoutMs &&
appSilentForMs <= this.appSilenceTimeoutMs
) {
return;