fix(whatsapp): stop reconnecting quiet sockets

Fixes #70678.\n\nKeeps quiet but healthy WhatsApp linked-device sessions connected by tracking WhatsApp Web transport activity, while retaining a longer app-silence cap so frame activity cannot mask a stuck session forever. Also cleans up transport activity listeners on failed connection-open paths.\n\nCarries forward the focused #71466 approach and keeps #63939 as related configurable-timeout follow-up. Thanks @vincentkoc and @oromeis.\n\nValidation:\n- pnpm test:serial extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts extensions/whatsapp/src/connection-controller.test.ts\n- pnpm check:changed\n- codex review --base origin/main
This commit is contained in:
Vincent Koc
2026-04-26 09:51:41 -07:00
committed by Peter Steinberger
parent a188d486dd
commit 1f194f1d55
6 changed files with 200 additions and 16 deletions

View File

@@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai
- Plugins: fail `plugins update` when tracked plugin or hook updates error, keep bundled runtime-dependency repair behind restrictive allowlists, and reject package installs with unloadable extension entries. Thanks @codex.
- Gateway/chat: keep duplicate attachment-backed `chat.send` retries with the same idempotency key on the documented in-flight path so aborts still target the real active run. Fixes #70139. Thanks @Feelw00.
- Plugins: share package entrypoint resolution between install and discovery, reject mismatched `runtimeExtensions`, and cache bundled runtime-dependency manifest reads during scans. Thanks @codex.
- WhatsApp/Web: keep quiet but healthy linked-device sessions connected by basing the watchdog on WhatsApp Web transport activity, while retaining a longer app-silence cap so frame activity cannot mask a stuck session forever. Fixes #70678; carries forward the focused #71466 approach and keeps #63939 as related configurable-timeout follow-up. Thanks @vincentkoc and @oromeis.
## 2026.4.26

View File

@@ -146,6 +146,7 @@ OpenClaw recommends running WhatsApp on a separate number when possible. (The ch
## Runtime model
- Gateway owns the WhatsApp socket and reconnect loop.
- The reconnect watchdog uses WhatsApp Web transport activity, not only inbound app-message volume, so a quiet linked-device session is not restarted solely because nobody has sent a message recently. A longer application-silence cap still forces a reconnect if transport frames keep arriving but no application messages are handled for the watchdog window.
- Outbound sends require an active WhatsApp listener for the target account.
- Status and broadcast chats are ignored (`@status`, `@broadcast`).
- Direct chats use DM session rules (`session.dmScope`; default `main` collapses DMs to the agent main session).
@@ -510,6 +511,10 @@ Behavior notes:
<Accordion title="Linked but disconnected / reconnect loop">
Symptom: linked account with repeated disconnects or reconnect attempts.
Quiet accounts can stay connected past the normal message timeout; the watchdog
restarts when WhatsApp Web transport activity stops, the socket closes, or
application-level activity stays silent beyond the longer safety window.
Fix:
```bash

View File

@@ -1,4 +1,5 @@
import "./test-helpers.js";
import { EventEmitter } from "node:events";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
@@ -42,25 +43,57 @@ type WebAutoReplyMonitorHarness = {
controller: AbortController;
run: Promise<unknown>;
};
type MockSessionSocket = {
ev: { on: ReturnType<typeof vi.fn>; off: ReturnType<typeof vi.fn> };
ws: EventEmitter & { close: ReturnType<typeof vi.fn> };
user: { id: string };
};
export const TEST_NET_IP = "93.184.216.34";
const WEB_AUTO_REPLY_SOCKETS_KEY = Symbol.for("openclaw:webAutoReplySessionSockets");
function getSessionSockets(): MockSessionSocket[] {
const store = globalThis as Record<PropertyKey, unknown>;
if (!Array.isArray(store[WEB_AUTO_REPLY_SOCKETS_KEY])) {
store[WEB_AUTO_REPLY_SOCKETS_KEY] = [];
}
return store[WEB_AUTO_REPLY_SOCKETS_KEY] as MockSessionSocket[];
}
vi.mock("./session.js", async () => {
const actual = await vi.importActual<typeof import("./session.js")>("./session.js");
return {
...actual,
createWaSocket: vi.fn(async () => ({
ev: {
on: vi.fn(),
off: vi.fn(),
},
ws: { close: vi.fn() },
user: { id: "123@s.whatsapp.net" },
})),
createWaSocket: vi.fn(async () => {
const ws = new EventEmitter() as MockSessionSocket["ws"];
ws.close = vi.fn();
const sock: MockSessionSocket = {
ev: {
on: vi.fn(),
off: vi.fn(),
},
ws,
user: { id: "123@s.whatsapp.net" },
};
getSessionSockets().push(sock);
return sock;
}),
waitForWaConnection: vi.fn().mockResolvedValue(undefined),
};
});
export function getLastWebAutoReplySessionSocket(): MockSessionSocket {
const last = getSessionSockets().at(-1);
if (!last) {
throw new Error("No WhatsApp Web auto-reply test socket created");
}
return last;
}
export function resetWebAutoReplySessionSockets() {
getSessionSockets().length = 0;
}
vi.mock("openclaw/plugin-sdk/agent-runtime", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
appendCronStyleCurrentTimeLine: (text: string) => text,
@@ -166,6 +199,7 @@ export function installWebAutoReplyUnitTestHooks(opts?: { pinDns?: boolean }) {
beforeEach(async () => {
vi.clearAllMocks();
resetWebAutoReplySessionSockets();
_resetBaileysMocks();
_resetLoadConfigMock();
if (opts?.pinDns) {

View File

@@ -12,6 +12,7 @@ import {
createMockWebListener,
createScriptedWebListenerFactory,
createWebListenerFactoryCapture,
getLastWebAutoReplySessionSocket,
installWebAutoReplyTestHomeHooks,
installWebAutoReplyUnitTestHooks,
makeSessionStore,
@@ -255,6 +256,92 @@ describe("web auto-reply connection", () => {
}
});
it("keeps quiet linked-device sessions open when transport frames keep arriving", async () => {
vi.useFakeTimers();
try {
const sleep = vi.fn(async () => {});
const scripted = createScriptedWebListenerFactory();
const { controller, run } = startWebAutoReplyMonitor({
monitorWebChannelFn: monitorWebChannel as never,
listenerFactory: scripted.listenerFactory,
sleep,
heartbeatSeconds: 60,
messageTimeoutMs: 30,
watchdogCheckMs: 5,
});
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
const socket = getLastWebAutoReplySessionSocket();
await vi.advanceTimersByTimeAsync(20);
socket.ws.emit("frame");
await vi.advanceTimersByTimeAsync(20);
socket.ws.emit("frame");
await vi.advanceTimersByTimeAsync(20);
expect(scripted.getListenerCount()).toBe(1);
controller.abort();
scripted.resolveClose(0, { status: 499, isLoggedOut: false });
await Promise.resolve();
await run;
} finally {
vi.useRealTimers();
}
});
it("does not let transport frames mask application silence forever", async () => {
vi.useFakeTimers();
try {
const sleep = vi.fn(async () => {});
const scripted = createScriptedWebListenerFactory();
const { controller, run } = startWebAutoReplyMonitor({
monitorWebChannelFn: monitorWebChannel as never,
listenerFactory: scripted.listenerFactory,
sleep,
heartbeatSeconds: 60,
messageTimeoutMs: 30,
watchdogCheckMs: 5,
});
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
const socket = getLastWebAutoReplySessionSocket();
for (let elapsedMs = 0; elapsedMs < 140; elapsedMs += 20) {
socket.ws.emit("frame");
await vi.advanceTimersByTimeAsync(20);
}
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(2);
},
{ timeout: 250, interval: 2 },
);
controller.abort();
scripted.resolveClose(scripted.getListenerCount() - 1, {
status: 499,
isLoggedOut: false,
error: "aborted",
});
await Promise.resolve();
await run;
} finally {
vi.useRealTimers();
}
});
it("gives a reconnected listener a fresh watchdog window", async () => {
vi.useFakeTimers();
try {

View File

@@ -280,6 +280,7 @@ export async function monitorWebChannel(
reconnectAttempts: snapshot.reconnectAttempts,
messagesHandled: snapshot.handledMessages,
lastInboundAt: snapshot.lastInboundAt,
lastTransportActivityAt: snapshot.lastTransportActivityAt,
authAgeMs,
uptimeMs: snapshot.uptimeMs,
...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
@@ -297,20 +298,28 @@ export async function monitorWebChannel(
}
},
onWatchdogTimeout: (snapshot) => {
const watchdogBaselineAt = snapshot.lastInboundAt ?? snapshot.startedAt;
const minutesSinceLastMessage = Math.floor((Date.now() - watchdogBaselineAt) / 60000);
const now = Date.now();
const transportSilentMs = now - snapshot.lastTransportActivityAt;
const appBaselineAt = snapshot.lastInboundAt ?? snapshot.startedAt;
const minutesSinceTransportActivity = Math.floor(transportSilentMs / 60000);
const minutesSinceAppActivity = Math.floor((now - appBaselineAt) / 60000);
const watchdogReason =
transportSilentMs > messageTimeoutMs ? "transport-inactive" : "app-silent";
statusController.noteWatchdogStale();
heartbeatLogger.warn(
{
connectionId: snapshot.connectionId,
minutesSinceLastMessage,
watchdogReason,
minutesSinceTransportActivity,
minutesSinceAppActivity,
lastInboundAt: snapshot.lastInboundAt ? new Date(snapshot.lastInboundAt) : null,
lastTransportActivityAt: new Date(snapshot.lastTransportActivityAt),
messagesHandled: snapshot.handledMessages,
},
"Message timeout detected - forcing reconnect",
"WhatsApp watchdog timeout detected - forcing reconnect",
);
whatsappHeartbeatLog.warn(
`No messages received in ${minutesSinceLastMessage}m - restarting connection`,
`WhatsApp watchdog timeout (${watchdogReason}) - restarting connection`,
);
},
});

View File

@@ -40,8 +40,10 @@ export type WhatsAppLiveConnection = {
heartbeat: TimerHandle | null;
watchdogTimer: TimerHandle | null;
lastInboundAt: number | null;
lastTransportActivityAt: number;
handledMessages: number;
unregisterUnhandled: (() => void) | null;
unregisterTransportActivity: (() => void) | null;
backgroundTasks: Set<Promise<unknown>>;
closePromise: Promise<WebListenerCloseReason>;
resolveClose: (reason: WebListenerCloseReason) => void;
@@ -51,6 +53,7 @@ export type WhatsAppConnectionSnapshot = {
connectionId: string;
startedAt: number;
lastInboundAt: number | null;
lastTransportActivityAt: number;
handledMessages: number;
reconnectAttempts: number;
uptimeMs: number;
@@ -83,6 +86,12 @@ function createNeverResolvePromise<T>(): Promise<T> {
return new Promise<T>(() => {});
}
type SocketActivityEmitter = {
on?: (event: string, listener: (...args: unknown[]) => void) => void;
off?: (event: string, listener: (...args: unknown[]) => void) => void;
removeListener?: (event: string, listener: (...args: unknown[]) => void) => void;
};
function createLiveConnection(params: {
connectionId: string;
sock: WASocket;
@@ -108,8 +117,10 @@ function createLiveConnection(params: {
heartbeat: null,
watchdogTimer: null,
lastInboundAt: null,
lastTransportActivityAt: Date.now(),
handledMessages: 0,
unregisterUnhandled: null,
unregisterTransportActivity: null,
backgroundTasks: new Set<Promise<unknown>>(),
closePromise,
resolveClose: resolveClosePromise,
@@ -232,6 +243,7 @@ export class WhatsAppConnectionController {
private readonly heartbeatSeconds: number;
private readonly keepAlive: boolean;
private readonly messageTimeoutMs: number;
private readonly appSilenceTimeoutMs: number;
private readonly watchdogCheckMs: number;
private readonly verbose: boolean;
private readonly abortSignal?: AbortSignal;
@@ -262,6 +274,7 @@ export class WhatsAppConnectionController {
this.keepAlive = params.keepAlive;
this.heartbeatSeconds = params.heartbeatSeconds;
this.messageTimeoutMs = params.messageTimeoutMs;
this.appSilenceTimeoutMs = Math.max(params.messageTimeoutMs, params.messageTimeoutMs * 4);
this.watchdogCheckMs = params.watchdogCheckMs;
this.reconnectPolicy = params.reconnectPolicy;
this.abortSignal = params.abortSignal;
@@ -311,6 +324,14 @@ export class WhatsAppConnectionController {
}
this.current.handledMessages += 1;
this.current.lastInboundAt = timestamp;
this.current.lastTransportActivityAt = timestamp;
}
noteTransportActivity(timestamp = Date.now()): void {
if (!this.current) {
return;
}
this.current.lastTransportActivityAt = timestamp;
}
getCurrentSnapshot(
@@ -323,6 +344,7 @@ export class WhatsAppConnectionController {
connectionId: connection.connectionId,
startedAt: connection.startedAt,
lastInboundAt: connection.lastInboundAt,
lastTransportActivityAt: connection.lastTransportActivityAt,
handledMessages: connection.handledMessages,
reconnectAttempts: this.reconnectAttempts,
uptimeMs: Date.now() - connection.startedAt,
@@ -369,6 +391,7 @@ export class WhatsAppConnectionController {
const listener = await params.createListener({ sock, connection });
connection.listener = listener;
this.current = connection;
connection.unregisterTransportActivity = this.attachTransportActivityListener(sock);
registerWhatsAppConnectionController(this.accountId, this);
this.startTimers(connection, {
onHeartbeat: params.onHeartbeat,
@@ -383,6 +406,7 @@ export class WhatsAppConnectionController {
if (connection?.unregisterUnhandled) {
connection.unregisterUnhandled();
}
connection?.unregisterTransportActivity?.();
throw err;
}
}
@@ -515,6 +539,7 @@ export class WhatsAppConnectionController {
this.socketRef.current = null;
}
connection.unregisterUnhandled?.();
connection.unregisterTransportActivity?.();
if (connection.heartbeat) {
clearInterval(connection.heartbeat);
}
@@ -563,9 +588,14 @@ export class WhatsAppConnectionController {
}, this.heartbeatSeconds * 1000);
connection.watchdogTimer = setInterval(() => {
const baselineAt = connection.lastInboundAt ?? connection.startedAt;
const staleForMs = Date.now() - baselineAt;
if (staleForMs <= this.messageTimeoutMs) {
const now = Date.now();
const transportStaleForMs = now - connection.lastTransportActivityAt;
const appBaselineAt = connection.lastInboundAt ?? connection.startedAt;
const appSilentForMs = now - appBaselineAt;
if (
transportStaleForMs <= this.messageTimeoutMs &&
appSilentForMs <= this.appSilenceTimeoutMs
) {
return;
}
const snapshot = this.getCurrentSnapshot(connection);
@@ -581,6 +611,24 @@ export class WhatsAppConnectionController {
}, this.watchdogCheckMs);
}
private attachTransportActivityListener(sock: WASocket): (() => void) | null {
const ws = sock.ws as SocketActivityEmitter | undefined;
if (!ws || typeof ws.on !== "function") {
return null;
}
const noteActivity = () => this.noteTransportActivity();
ws.on("frame", noteActivity);
return () => {
if (typeof ws.off === "function") {
ws.off("frame", noteActivity);
return;
}
ws.removeListener?.("frame", noteActivity);
};
}
private stopDisconnectRetries(): void {
if (!this.disconnectRetryController.signal.aborted) {
this.disconnectRetryController.abort();