From e672b61417af5c45b0431df6d9109a1f4b618ef5 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 26 Apr 2026 09:51:41 -0700 Subject: [PATCH] fix(whatsapp): stop reconnecting quiet sockets Fixes #70678.\n\nKeeps quiet but healthy WhatsApp linked-device sessions connected by tracking WhatsApp Web transport activity, while retaining a longer app-silence cap so frame activity cannot mask a stuck session forever. Also cleans up transport activity listeners on failed connection-open paths.\n\nCarries forward the focused #71466 approach and keeps #63939 as related configurable-timeout follow-up. Thanks @vincentkoc and @oromeis.\n\nValidation:\n- pnpm test:serial extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts extensions/whatsapp/src/connection-controller.test.ts\n- pnpm check:changed\n- codex review --base origin/main --- CHANGELOG.md | 1 + docs/channels/whatsapp.md | 5 ++ .../whatsapp/src/auto-reply.test-harness.ts | 50 +++++++++-- ...o-reply.connection-and-logging.e2e.test.ts | 87 +++++++++++++++++++ extensions/whatsapp/src/auto-reply/monitor.ts | 19 ++-- .../whatsapp/src/connection-controller.ts | 54 +++++++++++- 6 files changed, 200 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index de08cba1708..52bd60a09f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai - Plugins: fail `plugins update` when tracked plugin or hook updates error, keep bundled runtime-dependency repair behind restrictive allowlists, and reject package installs with unloadable extension entries. Thanks @codex. - Gateway/chat: keep duplicate attachment-backed `chat.send` retries with the same idempotency key on the documented in-flight path so aborts still target the real active run. Fixes #70139. Thanks @Feelw00. - Plugins: share package entrypoint resolution between install and discovery, reject mismatched `runtimeExtensions`, and cache bundled runtime-dependency manifest reads during scans. Thanks @codex. +- WhatsApp/Web: keep quiet but healthy linked-device sessions connected by basing the watchdog on WhatsApp Web transport activity, while retaining a longer app-silence cap so frame activity cannot mask a stuck session forever. Fixes #70678; carries forward the focused #71466 approach and keeps #63939 as related configurable-timeout follow-up. Thanks @vincentkoc and @oromeis. ## 2026.4.26 diff --git a/docs/channels/whatsapp.md b/docs/channels/whatsapp.md index 13092af45c8..194f15b1e74 100644 --- a/docs/channels/whatsapp.md +++ b/docs/channels/whatsapp.md @@ -146,6 +146,7 @@ OpenClaw recommends running WhatsApp on a separate number when possible. (The ch ## Runtime model - Gateway owns the WhatsApp socket and reconnect loop. +- The reconnect watchdog uses WhatsApp Web transport activity, not only inbound app-message volume, so a quiet linked-device session is not restarted solely because nobody has sent a message recently. A longer application-silence cap still forces a reconnect if transport frames keep arriving but no application messages are handled for the watchdog window. - Outbound sends require an active WhatsApp listener for the target account. - Status and broadcast chats are ignored (`@status`, `@broadcast`). - Direct chats use DM session rules (`session.dmScope`; default `main` collapses DMs to the agent main session). @@ -510,6 +511,10 @@ Behavior notes: Symptom: linked account with repeated disconnects or reconnect attempts. + Quiet accounts can stay connected past the normal message timeout; the watchdog + restarts when WhatsApp Web transport activity stops, the socket closes, or + application-level activity stays silent beyond the longer safety window. + Fix: ```bash diff --git a/extensions/whatsapp/src/auto-reply.test-harness.ts b/extensions/whatsapp/src/auto-reply.test-harness.ts index 3dd8b9c646a..efa957d8396 100644 --- a/extensions/whatsapp/src/auto-reply.test-harness.ts +++ b/extensions/whatsapp/src/auto-reply.test-harness.ts @@ -1,4 +1,5 @@ import "./test-helpers.js"; +import { EventEmitter } from "node:events"; import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; @@ -42,25 +43,57 @@ type WebAutoReplyMonitorHarness = { controller: AbortController; run: Promise; }; +type MockSessionSocket = { + ev: { on: ReturnType; off: ReturnType }; + ws: EventEmitter & { close: ReturnType }; + user: { id: string }; +}; export const TEST_NET_IP = "93.184.216.34"; +const WEB_AUTO_REPLY_SOCKETS_KEY = Symbol.for("openclaw:webAutoReplySessionSockets"); + +function getSessionSockets(): MockSessionSocket[] { + const store = globalThis as Record; + if (!Array.isArray(store[WEB_AUTO_REPLY_SOCKETS_KEY])) { + store[WEB_AUTO_REPLY_SOCKETS_KEY] = []; + } + return store[WEB_AUTO_REPLY_SOCKETS_KEY] as MockSessionSocket[]; +} vi.mock("./session.js", async () => { const actual = await vi.importActual("./session.js"); return { ...actual, - createWaSocket: vi.fn(async () => ({ - ev: { - on: vi.fn(), - off: vi.fn(), - }, - ws: { close: vi.fn() }, - user: { id: "123@s.whatsapp.net" }, - })), + createWaSocket: vi.fn(async () => { + const ws = new EventEmitter() as MockSessionSocket["ws"]; + ws.close = vi.fn(); + const sock: MockSessionSocket = { + ev: { + on: vi.fn(), + off: vi.fn(), + }, + ws, + user: { id: "123@s.whatsapp.net" }, + }; + getSessionSockets().push(sock); + return sock; + }), waitForWaConnection: vi.fn().mockResolvedValue(undefined), }; }); +export function getLastWebAutoReplySessionSocket(): MockSessionSocket { + const last = getSessionSockets().at(-1); + if (!last) { + throw new Error("No WhatsApp Web auto-reply test socket created"); + } + return last; +} + +export function resetWebAutoReplySessionSockets() { + getSessionSockets().length = 0; +} + vi.mock("openclaw/plugin-sdk/agent-runtime", () => ({ abortEmbeddedPiRun: vi.fn().mockReturnValue(false), appendCronStyleCurrentTimeLine: (text: string) => text, @@ -166,6 +199,7 @@ export function installWebAutoReplyUnitTestHooks(opts?: { pinDns?: boolean }) { beforeEach(async () => { vi.clearAllMocks(); + resetWebAutoReplySessionSockets(); _resetBaileysMocks(); _resetLoadConfigMock(); if (opts?.pinDns) { diff --git a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts index ce97327b6bc..6585de0c481 100644 --- a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts +++ b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts @@ -12,6 +12,7 @@ import { createMockWebListener, createScriptedWebListenerFactory, createWebListenerFactoryCapture, + getLastWebAutoReplySessionSocket, installWebAutoReplyTestHomeHooks, installWebAutoReplyUnitTestHooks, makeSessionStore, @@ -255,6 +256,92 @@ describe("web auto-reply connection", () => { } }); + it("keeps quiet linked-device sessions open when transport frames keep arriving", async () => { + vi.useFakeTimers(); + try { + const sleep = vi.fn(async () => {}); + const scripted = createScriptedWebListenerFactory(); + const { controller, run } = startWebAutoReplyMonitor({ + monitorWebChannelFn: monitorWebChannel as never, + listenerFactory: scripted.listenerFactory, + sleep, + heartbeatSeconds: 60, + messageTimeoutMs: 30, + watchdogCheckMs: 5, + }); + + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBe(1); + }, + { timeout: 250, interval: 2 }, + ); + + const socket = getLastWebAutoReplySessionSocket(); + await vi.advanceTimersByTimeAsync(20); + socket.ws.emit("frame"); + await vi.advanceTimersByTimeAsync(20); + socket.ws.emit("frame"); + await vi.advanceTimersByTimeAsync(20); + + expect(scripted.getListenerCount()).toBe(1); + + controller.abort(); + scripted.resolveClose(0, { status: 499, isLoggedOut: false }); + await Promise.resolve(); + await run; + } finally { + vi.useRealTimers(); + } + }); + + it("does not let transport frames mask application silence forever", async () => { + vi.useFakeTimers(); + try { + const sleep = vi.fn(async () => {}); + const scripted = createScriptedWebListenerFactory(); + const { controller, run } = startWebAutoReplyMonitor({ + monitorWebChannelFn: monitorWebChannel as never, + listenerFactory: scripted.listenerFactory, + sleep, + heartbeatSeconds: 60, + messageTimeoutMs: 30, + watchdogCheckMs: 5, + }); + + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBe(1); + }, + { timeout: 250, interval: 2 }, + ); + + const socket = getLastWebAutoReplySessionSocket(); + for (let elapsedMs = 0; elapsedMs < 140; elapsedMs += 20) { + socket.ws.emit("frame"); + await vi.advanceTimersByTimeAsync(20); + } + + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(2); + }, + { timeout: 250, interval: 2 }, + ); + + controller.abort(); + scripted.resolveClose(scripted.getListenerCount() - 1, { + status: 499, + isLoggedOut: false, + error: "aborted", + }); + await Promise.resolve(); + await run; + } finally { + vi.useRealTimers(); + } + }); + it("gives a reconnected listener a fresh watchdog window", async () => { vi.useFakeTimers(); try { diff --git a/extensions/whatsapp/src/auto-reply/monitor.ts b/extensions/whatsapp/src/auto-reply/monitor.ts index 44135be09c4..9980fdf02b1 100644 --- a/extensions/whatsapp/src/auto-reply/monitor.ts +++ b/extensions/whatsapp/src/auto-reply/monitor.ts @@ -280,6 +280,7 @@ export async function monitorWebChannel( reconnectAttempts: snapshot.reconnectAttempts, messagesHandled: snapshot.handledMessages, lastInboundAt: snapshot.lastInboundAt, + lastTransportActivityAt: snapshot.lastTransportActivityAt, authAgeMs, uptimeMs: snapshot.uptimeMs, ...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30 @@ -297,20 +298,28 @@ export async function monitorWebChannel( } }, onWatchdogTimeout: (snapshot) => { - const watchdogBaselineAt = snapshot.lastInboundAt ?? snapshot.startedAt; - const minutesSinceLastMessage = Math.floor((Date.now() - watchdogBaselineAt) / 60000); + const now = Date.now(); + const transportSilentMs = now - snapshot.lastTransportActivityAt; + const appBaselineAt = snapshot.lastInboundAt ?? snapshot.startedAt; + const minutesSinceTransportActivity = Math.floor(transportSilentMs / 60000); + const minutesSinceAppActivity = Math.floor((now - appBaselineAt) / 60000); + const watchdogReason = + transportSilentMs > messageTimeoutMs ? "transport-inactive" : "app-silent"; statusController.noteWatchdogStale(); heartbeatLogger.warn( { connectionId: snapshot.connectionId, - minutesSinceLastMessage, + watchdogReason, + minutesSinceTransportActivity, + minutesSinceAppActivity, lastInboundAt: snapshot.lastInboundAt ? new Date(snapshot.lastInboundAt) : null, + lastTransportActivityAt: new Date(snapshot.lastTransportActivityAt), messagesHandled: snapshot.handledMessages, }, - "Message timeout detected - forcing reconnect", + "WhatsApp watchdog timeout detected - forcing reconnect", ); whatsappHeartbeatLog.warn( - `No messages received in ${minutesSinceLastMessage}m - restarting connection`, + `WhatsApp watchdog timeout (${watchdogReason}) - restarting connection`, ); }, }); diff --git a/extensions/whatsapp/src/connection-controller.ts b/extensions/whatsapp/src/connection-controller.ts index 6142dacfc6e..a0fc5f04b28 100644 --- a/extensions/whatsapp/src/connection-controller.ts +++ b/extensions/whatsapp/src/connection-controller.ts @@ -40,8 +40,10 @@ export type WhatsAppLiveConnection = { heartbeat: TimerHandle | null; watchdogTimer: TimerHandle | null; lastInboundAt: number | null; + lastTransportActivityAt: number; handledMessages: number; unregisterUnhandled: (() => void) | null; + unregisterTransportActivity: (() => void) | null; backgroundTasks: Set>; closePromise: Promise; resolveClose: (reason: WebListenerCloseReason) => void; @@ -51,6 +53,7 @@ export type WhatsAppConnectionSnapshot = { connectionId: string; startedAt: number; lastInboundAt: number | null; + lastTransportActivityAt: number; handledMessages: number; reconnectAttempts: number; uptimeMs: number; @@ -83,6 +86,12 @@ function createNeverResolvePromise(): Promise { return new Promise(() => {}); } +type SocketActivityEmitter = { + on?: (event: string, listener: (...args: unknown[]) => void) => void; + off?: (event: string, listener: (...args: unknown[]) => void) => void; + removeListener?: (event: string, listener: (...args: unknown[]) => void) => void; +}; + function createLiveConnection(params: { connectionId: string; sock: WASocket; @@ -108,8 +117,10 @@ function createLiveConnection(params: { heartbeat: null, watchdogTimer: null, lastInboundAt: null, + lastTransportActivityAt: Date.now(), handledMessages: 0, unregisterUnhandled: null, + unregisterTransportActivity: null, backgroundTasks: new Set>(), closePromise, resolveClose: resolveClosePromise, @@ -232,6 +243,7 @@ export class WhatsAppConnectionController { private readonly heartbeatSeconds: number; private readonly keepAlive: boolean; private readonly messageTimeoutMs: number; + private readonly appSilenceTimeoutMs: number; private readonly watchdogCheckMs: number; private readonly verbose: boolean; private readonly abortSignal?: AbortSignal; @@ -262,6 +274,7 @@ export class WhatsAppConnectionController { this.keepAlive = params.keepAlive; this.heartbeatSeconds = params.heartbeatSeconds; this.messageTimeoutMs = params.messageTimeoutMs; + this.appSilenceTimeoutMs = Math.max(params.messageTimeoutMs, params.messageTimeoutMs * 4); this.watchdogCheckMs = params.watchdogCheckMs; this.reconnectPolicy = params.reconnectPolicy; this.abortSignal = params.abortSignal; @@ -311,6 +324,14 @@ export class WhatsAppConnectionController { } this.current.handledMessages += 1; this.current.lastInboundAt = timestamp; + this.current.lastTransportActivityAt = timestamp; + } + + noteTransportActivity(timestamp = Date.now()): void { + if (!this.current) { + return; + } + this.current.lastTransportActivityAt = timestamp; } getCurrentSnapshot( @@ -323,6 +344,7 @@ export class WhatsAppConnectionController { connectionId: connection.connectionId, startedAt: connection.startedAt, lastInboundAt: connection.lastInboundAt, + lastTransportActivityAt: connection.lastTransportActivityAt, handledMessages: connection.handledMessages, reconnectAttempts: this.reconnectAttempts, uptimeMs: Date.now() - connection.startedAt, @@ -369,6 +391,7 @@ export class WhatsAppConnectionController { const listener = await params.createListener({ sock, connection }); connection.listener = listener; this.current = connection; + connection.unregisterTransportActivity = this.attachTransportActivityListener(sock); registerWhatsAppConnectionController(this.accountId, this); this.startTimers(connection, { onHeartbeat: params.onHeartbeat, @@ -383,6 +406,7 @@ export class WhatsAppConnectionController { if (connection?.unregisterUnhandled) { connection.unregisterUnhandled(); } + connection?.unregisterTransportActivity?.(); throw err; } } @@ -515,6 +539,7 @@ export class WhatsAppConnectionController { this.socketRef.current = null; } connection.unregisterUnhandled?.(); + connection.unregisterTransportActivity?.(); if (connection.heartbeat) { clearInterval(connection.heartbeat); } @@ -563,9 +588,14 @@ export class WhatsAppConnectionController { }, this.heartbeatSeconds * 1000); connection.watchdogTimer = setInterval(() => { - const baselineAt = connection.lastInboundAt ?? connection.startedAt; - const staleForMs = Date.now() - baselineAt; - if (staleForMs <= this.messageTimeoutMs) { + const now = Date.now(); + const transportStaleForMs = now - connection.lastTransportActivityAt; + const appBaselineAt = connection.lastInboundAt ?? connection.startedAt; + const appSilentForMs = now - appBaselineAt; + if ( + transportStaleForMs <= this.messageTimeoutMs && + appSilentForMs <= this.appSilenceTimeoutMs + ) { return; } const snapshot = this.getCurrentSnapshot(connection); @@ -581,6 +611,24 @@ export class WhatsAppConnectionController { }, this.watchdogCheckMs); } + private attachTransportActivityListener(sock: WASocket): (() => void) | null { + const ws = sock.ws as SocketActivityEmitter | undefined; + if (!ws || typeof ws.on !== "function") { + return null; + } + + const noteActivity = () => this.noteTransportActivity(); + ws.on("frame", noteActivity); + + return () => { + if (typeof ws.off === "function") { + ws.off("frame", noteActivity); + return; + } + ws.removeListener?.("frame", noteActivity); + }; + } + private stopDisconnectRetries(): void { if (!this.disconnectRetryController.signal.aborted) { this.disconnectRetryController.abort();