From 7ddd815e469e5f5a9407d67cd8dc306eff771b84 Mon Sep 17 00:00:00 2001 From: Sathvik Gilakamsetty Date: Wed, 29 Apr 2026 09:16:55 +0530 Subject: [PATCH] fix(whatsapp): report transport activity so stale-socket health detection works (#72656) Merged via squash. Prepared head SHA: 1b1920742c94a09f5d1645f077c7be5785603876 Co-authored-by: Sathvik-1007 <195685832+Sathvik-1007@users.noreply.github.com> Co-authored-by: mcaxtr <7562095+mcaxtr@users.noreply.github.com> Reviewed-by: @mcaxtr --- CHANGELOG.md | 1 + docs/channels/whatsapp.md | 1 + .../whatsapp/src/auto-reply.test-harness.ts | 17 ++- ...o-reply.connection-and-logging.e2e.test.ts | 85 +++++++++++++++ .../src/auto-reply/monitor-state.test.ts | 64 +++++++++++ .../whatsapp/src/auto-reply/monitor-state.ts | 14 ++- extensions/whatsapp/src/auto-reply/monitor.ts | 6 +- extensions/whatsapp/src/auto-reply/types.ts | 2 + .../src/connection-controller.test.ts | 102 ++++++++++++++++++ .../whatsapp/src/connection-controller.ts | 5 +- 10 files changed, 289 insertions(+), 8 deletions(-) create mode 100644 extensions/whatsapp/src/auto-reply/monitor-state.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 90447e9b3ad..49661bbd2a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ Docs: https://docs.openclaw.ai - Channels/WhatsApp: restrict pairing verification replies to real inbound user content, preventing unsolicited prompts from receipts, typing indicators, presence updates, and other non-message Baileys upserts. Fixes #73797. (#73823) Thanks @hclsys. - Configure/Ollama: show the configured Ollama model allowlist after Cloud only or Cloud + Local setup and skip slow per-model cloud metadata fetches. (#73995) Thanks @obviyus. - Channels/WhatsApp: detect explicit group `@mentions` again when the bot's own E.164 is in `allowFrom`, so shared-number setups no longer skip group pings that directly mention the bot. Fixes #49317. (#73453) Thanks @juan-flores077. +- WhatsApp/reliability: publish real transport-liveness into WhatsApp channel status and force earlier reconnects on silent transport stalls, so quiet healthy sessions stay connected while wedged sockets recover before the later remote 408 path. (#72656) Thanks @Sathvik-1007. ## 2026.4.27 diff --git a/docs/channels/whatsapp.md b/docs/channels/whatsapp.md index 08af114f43b..da9835ed12c 100644 --- a/docs/channels/whatsapp.md +++ b/docs/channels/whatsapp.md @@ -150,6 +150,7 @@ OpenClaw recommends running WhatsApp on a separate number when possible. (The ch - Baileys socket timings are explicit under `web.whatsapp.*`: `keepAliveIntervalMs` controls WhatsApp Web application pings, `connectTimeoutMs` controls the opening handshake timeout, and `defaultQueryTimeoutMs` controls Baileys query timeouts. - Outbound sends require an active WhatsApp listener for the target account. - Status and broadcast chats are ignored (`@status`, `@broadcast`). +- The reconnect watchdog follows WhatsApp Web transport activity, not only inbound app-message volume: quiet linked-device sessions stay up while transport frames continue, but a transport stall forces reconnect well before the later remote disconnect path. - Direct chats use DM session rules (`session.dmScope`; default `main` collapses DMs to the agent main session). - Group sessions are isolated (`agent::whatsapp:group:`). - WhatsApp Web transport honors standard proxy environment variables on the gateway host (`HTTPS_PROXY`, `HTTP_PROXY`, `NO_PROXY` / lowercase variants). Prefer host-level proxy config over channel-specific WhatsApp proxy settings. diff --git a/extensions/whatsapp/src/auto-reply.test-harness.ts b/extensions/whatsapp/src/auto-reply.test-harness.ts index d6d0da90f43..e5fc5b5f228 100644 --- a/extensions/whatsapp/src/auto-reply.test-harness.ts +++ b/extensions/whatsapp/src/auto-reply.test-harness.ts @@ -45,8 +45,13 @@ type WebAutoReplyMonitorHarness = { run: Promise; }; type MockSessionSocket = { - ev: { on: ReturnType; off: ReturnType }; - ws: EventEmitter & { close: ReturnType }; + ev: { + on: ReturnType; + off: ReturnType; + }; + ws: EventEmitter & { + close: ReturnType; + }; user: { id: string }; }; @@ -68,7 +73,7 @@ vi.mock("./session.js", async () => { createWaSocket: vi.fn(async () => { const ws = new EventEmitter() as MockSessionSocket["ws"]; ws.close = vi.fn(); - const sock: MockSessionSocket = { + const socket: MockSessionSocket = { ev: { on: vi.fn(), off: vi.fn(), @@ -76,8 +81,8 @@ vi.mock("./session.js", async () => { ws, user: { id: "123@s.whatsapp.net" }, }; - getSessionSockets().push(sock); - return sock; + getSessionSockets().push(socket); + return socket; }), waitForWaConnection: vi.fn().mockResolvedValue(undefined), }; @@ -309,6 +314,7 @@ export function startWebAutoReplyMonitor(params: { sleep: UnknownMock | AsyncUnknownMock; signal?: AbortSignal; heartbeatSeconds?: number; + transportTimeoutMs?: number; messageTimeoutMs?: number; watchdogCheckMs?: number; reconnect?: { initialMs: number; maxMs: number; maxAttempts: number; factor: number }; @@ -326,6 +332,7 @@ export function startWebAutoReplyMonitor(params: { params.signal ?? controller.signal, { heartbeatSeconds: params.heartbeatSeconds ?? 1, + transportTimeoutMs: params.transportTimeoutMs, messageTimeoutMs: params.messageTimeoutMs, watchdogCheckMs: params.watchdogCheckMs, reconnect: params.reconnect ?? { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 }, diff --git a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts index 94c6e362e7f..4ab0c9029ea 100644 --- a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts +++ b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts @@ -407,7 +407,92 @@ describe("web auto-reply connection", () => { socket.ws.emit("frame"); await vi.advanceTimersByTimeAsync(20); } + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(2); + }, + { timeout: 250, interval: 2 }, + ); + controller.abort(); + scripted.resolveClose(scripted.getListenerCount() - 1, { + status: 499, + isLoggedOut: false, + error: "aborted", + }); + await Promise.resolve(); + await run; + } finally { + vi.useRealTimers(); + } + }); + + it("publishes frame-driven transport activity for quiet sessions", async () => { + vi.useFakeTimers(); + try { + const sleep = vi.fn(async () => {}); + const statuses: Array> = []; + const scripted = createScriptedWebListenerFactory(); + const { controller, run } = startWebAutoReplyMonitor({ + monitorWebChannelFn: monitorWebChannel as never, + listenerFactory: scripted.listenerFactory, + sleep, + heartbeatSeconds: 1, + transportTimeoutMs: 60_000, + messageTimeoutMs: 60_000, + watchdogCheckMs: 5, + statusSink: (next) => statuses.push({ ...next }), + }); + + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBe(1); + }, + { timeout: 250, interval: 2 }, + ); + + const initialTransportAt = Number(statuses.at(-1)?.lastTransportActivityAt ?? 0); + const socket = getLastWebAutoReplySessionSocket(); + await vi.advanceTimersByTimeAsync(250); + socket.ws.emit("frame"); + await vi.advanceTimersByTimeAsync(1_000); + + const lastTransportAt = Number(statuses.at(-1)?.lastTransportActivityAt ?? 0); + expect(lastTransportAt).toBeGreaterThan(initialTransportAt); + + controller.abort(); + scripted.resolveClose(0, { status: 499, isLoggedOut: false, error: "aborted" }); + await Promise.resolve(); + await run; + } finally { + vi.useRealTimers(); + } + }); + + it("reconnects on transport stall before the long app-silence window", async () => { + vi.useFakeTimers(); + try { + const sleep = vi.fn(async () => {}); + const scripted = createScriptedWebListenerFactory(); + const { controller, run } = startWebAutoReplyMonitor({ + monitorWebChannelFn: monitorWebChannel as never, + listenerFactory: scripted.listenerFactory, + sleep, + heartbeatSeconds: 1, + transportTimeoutMs: 30, + messageTimeoutMs: 3_000, + watchdogCheckMs: 5, + }); + + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBe(1); + }, + { timeout: 250, interval: 2 }, + ); + + await vi.advanceTimersByTimeAsync(36); + await Promise.resolve(); await vi.waitFor( () => { expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(2); diff --git a/extensions/whatsapp/src/auto-reply/monitor-state.test.ts b/extensions/whatsapp/src/auto-reply/monitor-state.test.ts new file mode 100644 index 00000000000..a7ec43941e5 --- /dev/null +++ b/extensions/whatsapp/src/auto-reply/monitor-state.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, it } from "vitest"; +import { createWebChannelStatusController } from "./monitor-state.js"; + +describe("createWebChannelStatusController", () => { + it("sets lastTransportActivityAt on noteConnected", () => { + const patches: Record[] = []; + const controller = createWebChannelStatusController((s) => patches.push({ ...s })); + + controller.noteConnected(1000); + + const last = patches.at(-1)!; + expect(last.connected).toBe(true); + expect(last.lastTransportActivityAt).toBe(1000); + }); + + it("updates lastTransportActivityAt on noteInbound", () => { + const patches: Record[] = []; + const controller = createWebChannelStatusController((s) => patches.push({ ...s })); + + controller.noteConnected(1000); + controller.noteInbound(2000); + + const last = patches.at(-1)!; + expect(last.lastTransportActivityAt).toBe(2000); + }); + + it("updates lastTransportActivityAt from explicit transport activity", () => { + const patches: Record[] = []; + const controller = createWebChannelStatusController((s) => patches.push({ ...s })); + + controller.noteConnected(1000); + controller.noteTransportActivity(3000); + + const last = patches.at(-1)!; + expect(last.lastTransportActivityAt).toBe(3000); + }); + + it("does not set lastTransportActivityAt on noteWatchdogStale", () => { + const patches: Record[] = []; + const controller = createWebChannelStatusController((s) => patches.push({ ...s })); + + controller.noteConnected(1000); + controller.noteWatchdogStale(5000); + + const last = patches.at(-1)!; + // Watchdog staleness should not refresh transport activity — it means + // the check loop is running but the socket itself is idle/stale. + expect(last.lastTransportActivityAt).toBe(1000); + }); + + it("produces snapshots that enable stale-socket health detection", () => { + const patches: Record[] = []; + const controller = createWebChannelStatusController((s) => patches.push({ ...s })); + + controller.noteConnected(1000); + + const last = patches.at(-1)!; + // The gateway health policy checks `connected === true && lastTransportActivityAt != null` + // to decide whether to run stale-socket detection. Both must be present. + expect(last.connected).toBe(true); + expect(last.lastTransportActivityAt).not.toBeNull(); + expect(typeof last.lastTransportActivityAt).toBe("number"); + }); +}); diff --git a/extensions/whatsapp/src/auto-reply/monitor-state.ts b/extensions/whatsapp/src/auto-reply/monitor-state.ts index b03ed80f39b..b54136f528b 100644 --- a/extensions/whatsapp/src/auto-reply/monitor-state.ts +++ b/extensions/whatsapp/src/auto-reply/monitor-state.ts @@ -1,4 +1,7 @@ -import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime"; +import { + createConnectedChannelStatusPatch, + createTransportActivityStatusPatch, +} from "openclaw/plugin-sdk/gateway-runtime"; import type { WebChannelHealthState, WebChannelStatus } from "./types.js"; function cloneStatus(status: WebChannelStatus): WebChannelStatus { @@ -35,6 +38,7 @@ export function createWebChannelStatusController(statusSink?: (status: WebChanne snapshot: () => status, noteConnected(at = Date.now()) { Object.assign(status, createConnectedChannelStatusPatch(at)); + Object.assign(status, createTransportActivityStatusPatch(at)); status.lastError = null; status.healthState = "healthy"; emit(); @@ -43,11 +47,19 @@ export function createWebChannelStatusController(statusSink?: (status: WebChanne status.lastInboundAt = at; status.lastMessageAt = at; status.lastEventAt = at; + Object.assign(status, createTransportActivityStatusPatch(at)); if (status.connected) { status.healthState = "healthy"; } emit(); }, + noteTransportActivity(at = Date.now()) { + if (status.lastTransportActivityAt === at) { + return; + } + Object.assign(status, createTransportActivityStatusPatch(at)); + emit(); + }, noteWatchdogStale(at = Date.now()) { status.lastEventAt = at; if (status.connected) { diff --git a/extensions/whatsapp/src/auto-reply/monitor.ts b/extensions/whatsapp/src/auto-reply/monitor.ts index 712850bbc7e..f4a2f3a4674 100644 --- a/extensions/whatsapp/src/auto-reply/monitor.ts +++ b/extensions/whatsapp/src/auto-reply/monitor.ts @@ -134,6 +134,7 @@ async function clearTerminalWebAuthState(params: { ); } } +const DEFAULT_TRANSPORT_TIMEOUT_MS = 5 * 60 * 1000; export async function monitorWebChannel( verbose: boolean, @@ -220,6 +221,7 @@ export async function monitorWebChannel( }; process.once("SIGINT", handleSigint); + const transportTimeoutMs = tuning.transportTimeoutMs ?? DEFAULT_TRANSPORT_TIMEOUT_MS; const messageTimeoutMs = tuning.messageTimeoutMs ?? 30 * 60 * 1000; const watchdogCheckMs = tuning.watchdogCheckMs ?? 60 * 1000; const controller = new WhatsAppConnectionController({ @@ -228,6 +230,7 @@ export async function monitorWebChannel( verbose, keepAlive, heartbeatSeconds, + transportTimeoutMs, messageTimeoutMs, watchdogCheckMs, reconnectPolicy, @@ -328,6 +331,7 @@ export async function monitorWebChannel( ? { minutesSinceLastMessage } : {}), }; + statusController.noteTransportActivity(snapshot.lastTransportActivityAt); if (minutesSinceLastMessage && minutesSinceLastMessage > 30) { heartbeatLogger.warn( @@ -345,7 +349,7 @@ export async function monitorWebChannel( const minutesSinceTransportActivity = Math.floor(transportSilentMs / 60000); const minutesSinceAppActivity = Math.floor((now - appBaselineAt) / 60000); const watchdogReason = - transportSilentMs > messageTimeoutMs ? "transport-inactive" : "app-silent"; + transportSilentMs > transportTimeoutMs ? "transport-inactive" : "app-silent"; statusController.noteWatchdogStale(); heartbeatLogger.warn( { diff --git a/extensions/whatsapp/src/auto-reply/types.ts b/extensions/whatsapp/src/auto-reply/types.ts index 68c3ee545f6..ce07f59fc77 100644 --- a/extensions/whatsapp/src/auto-reply/types.ts +++ b/extensions/whatsapp/src/auto-reply/types.ts @@ -27,6 +27,7 @@ export type WebChannelStatus = { lastInboundAt?: number | null; lastMessageAt?: number | null; lastEventAt?: number | null; + lastTransportActivityAt?: number | null; lastError?: string | null; healthState?: WebChannelHealthState; }; @@ -35,6 +36,7 @@ export type WebMonitorTuning = { reconnect?: Partial; socketTiming?: WhatsAppSocketTimingOptions; heartbeatSeconds?: number; + transportTimeoutMs?: number; messageTimeoutMs?: number; watchdogCheckMs?: number; sleep?: (ms: number, signal?: AbortSignal) => Promise; diff --git a/extensions/whatsapp/src/connection-controller.test.ts b/extensions/whatsapp/src/connection-controller.test.ts index cf53f42e989..7d017a0feb1 100644 --- a/extensions/whatsapp/src/connection-controller.test.ts +++ b/extensions/whatsapp/src/connection-controller.test.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from "node:events"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js"; import { WhatsAppConnectionController } from "./connection-controller.js"; @@ -24,6 +25,14 @@ function createListenerStub(messageId = "ok") { }; } +function createSocketWithTransportEmitter() { + const ws = new EventEmitter() as EventEmitter & { close: ReturnType }; + ws.close = vi.fn(); + return { + ws, + }; +} + describe("WhatsAppConnectionController", () => { let controller: WhatsAppConnectionController; @@ -35,6 +44,7 @@ describe("WhatsAppConnectionController", () => { verbose: false, keepAlive: false, heartbeatSeconds: 30, + transportTimeoutMs: 60_000, messageTimeoutMs: 60_000, watchdogCheckMs: 5_000, reconnectPolicy: { @@ -100,6 +110,7 @@ describe("WhatsAppConnectionController", () => { verbose: false, keepAlive: false, heartbeatSeconds: 30, + transportTimeoutMs: 60_000, messageTimeoutMs: 60_000, watchdogCheckMs: 5_000, reconnectPolicy: { @@ -126,6 +137,7 @@ describe("WhatsAppConnectionController", () => { verbose: false, keepAlive: false, heartbeatSeconds: 30, + transportTimeoutMs: 60_000, messageTimeoutMs: 60_000, watchdogCheckMs: 5_000, reconnectPolicy: { @@ -154,4 +166,94 @@ describe("WhatsAppConnectionController", () => { await liveController.shutdown(); } }); + + it("tracks real websocket frame activity in the connection snapshot", async () => { + vi.useFakeTimers(); + const controller = new WhatsAppConnectionController({ + accountId: "work", + authDir: "/tmp/wa-auth", + verbose: false, + keepAlive: true, + heartbeatSeconds: 1, + transportTimeoutMs: 60_000, + messageTimeoutMs: 60_000, + watchdogCheckMs: 5_000, + reconnectPolicy: { + initialMs: 250, + maxMs: 1_000, + factor: 2, + jitter: 0, + maxAttempts: 5, + }, + }); + + try { + const sock = createSocketWithTransportEmitter(); + createWaSocketMock.mockResolvedValueOnce(sock as never); + waitForWaConnectionMock.mockResolvedValueOnce(undefined); + + const snapshots: Array<{ lastTransportActivityAt: number }> = []; + await controller.openConnection({ + connectionId: "conn-frame-activity", + createListener: async () => createListenerStub() as never, + onHeartbeat: (snapshot) => snapshots.push(snapshot), + }); + + await vi.advanceTimersByTimeAsync(1_000); + const firstSnapshot = snapshots.at(-1); + expect(firstSnapshot?.lastTransportActivityAt).toBeTypeOf("number"); + + const firstTransportAt = firstSnapshot?.lastTransportActivityAt ?? 0; + await vi.advanceTimersByTimeAsync(250); + sock.ws.emit("frame"); + await vi.advanceTimersByTimeAsync(1_000); + + const lastSnapshot = snapshots.at(-1); + expect(lastSnapshot?.lastTransportActivityAt).toBeGreaterThan(firstTransportAt); + } finally { + await controller.shutdown(); + vi.useRealTimers(); + } + }); + + it("forces reconnect on transport stall before the long app-silence window", async () => { + vi.useFakeTimers(); + const controller = new WhatsAppConnectionController({ + accountId: "work", + authDir: "/tmp/wa-auth", + verbose: false, + keepAlive: true, + heartbeatSeconds: 1, + transportTimeoutMs: 30, + messageTimeoutMs: 3_000, + watchdogCheckMs: 5, + reconnectPolicy: { + initialMs: 250, + maxMs: 1_000, + factor: 2, + jitter: 0, + maxAttempts: 5, + }, + }); + + try { + const sock = createSocketWithTransportEmitter(); + createWaSocketMock.mockResolvedValueOnce(sock as never); + waitForWaConnectionMock.mockResolvedValueOnce(undefined); + + const timeouts: string[] = []; + await controller.openConnection({ + connectionId: "conn-transport-timeout", + createListener: async () => createListenerStub() as never, + onWatchdogTimeout: () => timeouts.push("timeout"), + }); + + await vi.advanceTimersByTimeAsync(40); + + expect(timeouts.length).toBeGreaterThanOrEqual(1); + } finally { + await controller.shutdown(); + vi.useRealTimers(); + } + }); }); diff --git a/extensions/whatsapp/src/connection-controller.ts b/extensions/whatsapp/src/connection-controller.ts index 56ab1caa72b..7fa383b8fe0 100644 --- a/extensions/whatsapp/src/connection-controller.ts +++ b/extensions/whatsapp/src/connection-controller.ts @@ -245,6 +245,7 @@ export class WhatsAppConnectionController { private readonly reconnectPolicy: ReconnectPolicy; private readonly heartbeatSeconds: number; private readonly keepAlive: boolean; + private readonly transportTimeoutMs: number; private readonly messageTimeoutMs: number; private readonly appSilenceTimeoutMs: number; private readonly watchdogCheckMs: number; @@ -265,6 +266,7 @@ export class WhatsAppConnectionController { verbose: boolean; keepAlive: boolean; heartbeatSeconds: number; + transportTimeoutMs: number; messageTimeoutMs: number; watchdogCheckMs: number; reconnectPolicy: ReconnectPolicy; @@ -278,6 +280,7 @@ export class WhatsAppConnectionController { this.verbose = params.verbose; this.keepAlive = params.keepAlive; this.heartbeatSeconds = params.heartbeatSeconds; + this.transportTimeoutMs = params.transportTimeoutMs; this.messageTimeoutMs = params.messageTimeoutMs; this.appSilenceTimeoutMs = Math.max(params.messageTimeoutMs, params.messageTimeoutMs * 4); this.watchdogCheckMs = params.watchdogCheckMs; @@ -600,7 +603,7 @@ export class WhatsAppConnectionController { const appBaselineAt = connection.lastInboundAt ?? connection.startedAt; const appSilentForMs = now - appBaselineAt; if ( - transportStaleForMs <= this.messageTimeoutMs && + transportStaleForMs <= this.transportTimeoutMs && appSilentForMs <= this.appSilenceTimeoutMs ) { return;