fix(whatsapp): report transport activity so stale-socket health detection works (#72656)

Merged via squash.

Prepared head SHA: 1b1920742c
Co-authored-by: Sathvik-1007 <195685832+Sathvik-1007@users.noreply.github.com>
Co-authored-by: mcaxtr <7562095+mcaxtr@users.noreply.github.com>
Reviewed-by: @mcaxtr
This commit is contained in:
Sathvik Gilakamsetty
2026-04-29 09:16:55 +05:30
committed by GitHub
parent 8edb99f0e3
commit 7ddd815e46
10 changed files with 289 additions and 8 deletions

View File

@@ -91,6 +91,7 @@ Docs: https://docs.openclaw.ai
- Channels/WhatsApp: restrict pairing verification replies to real inbound user content, preventing unsolicited prompts from receipts, typing indicators, presence updates, and other non-message Baileys upserts. Fixes #73797. (#73823) Thanks @hclsys.
- Configure/Ollama: show the configured Ollama model allowlist after Cloud only or Cloud + Local setup and skip slow per-model cloud metadata fetches. (#73995) Thanks @obviyus.
- Channels/WhatsApp: detect explicit group `@mentions` again when the bot's own E.164 is in `allowFrom`, so shared-number setups no longer skip group pings that directly mention the bot. Fixes #49317. (#73453) Thanks @juan-flores077.
- WhatsApp/reliability: publish real transport-liveness into WhatsApp channel status and force earlier reconnects on silent transport stalls, so quiet healthy sessions stay connected while wedged sockets recover before the later remote 408 path. (#72656) Thanks @Sathvik-1007.
## 2026.4.27

View File

@@ -150,6 +150,7 @@ OpenClaw recommends running WhatsApp on a separate number when possible. (The ch
- Baileys socket timings are explicit under `web.whatsapp.*`: `keepAliveIntervalMs` controls WhatsApp Web application pings, `connectTimeoutMs` controls the opening handshake timeout, and `defaultQueryTimeoutMs` controls Baileys query timeouts.
- Outbound sends require an active WhatsApp listener for the target account.
- Status and broadcast chats are ignored (`@status`, `@broadcast`).
- The reconnect watchdog follows WhatsApp Web transport activity, not only inbound app-message volume: quiet linked-device sessions stay up while transport frames continue, but a transport stall forces reconnect well before the later remote disconnect path.
- Direct chats use DM session rules (`session.dmScope`; default `main` collapses DMs to the agent main session).
- Group sessions are isolated (`agent:<agentId>:whatsapp:group:<jid>`).
- WhatsApp Web transport honors standard proxy environment variables on the gateway host (`HTTPS_PROXY`, `HTTP_PROXY`, `NO_PROXY` / lowercase variants). Prefer host-level proxy config over channel-specific WhatsApp proxy settings.

View File

@@ -45,8 +45,13 @@ type WebAutoReplyMonitorHarness = {
run: Promise<unknown>;
};
type MockSessionSocket = {
ev: { on: ReturnType<typeof vi.fn>; off: ReturnType<typeof vi.fn> };
ws: EventEmitter & { close: ReturnType<typeof vi.fn> };
ev: {
on: ReturnType<typeof vi.fn>;
off: ReturnType<typeof vi.fn>;
};
ws: EventEmitter & {
close: ReturnType<typeof vi.fn>;
};
user: { id: string };
};
@@ -68,7 +73,7 @@ vi.mock("./session.js", async () => {
createWaSocket: vi.fn(async () => {
const ws = new EventEmitter() as MockSessionSocket["ws"];
ws.close = vi.fn();
const sock: MockSessionSocket = {
const socket: MockSessionSocket = {
ev: {
on: vi.fn(),
off: vi.fn(),
@@ -76,8 +81,8 @@ vi.mock("./session.js", async () => {
ws,
user: { id: "123@s.whatsapp.net" },
};
getSessionSockets().push(sock);
return sock;
getSessionSockets().push(socket);
return socket;
}),
waitForWaConnection: vi.fn().mockResolvedValue(undefined),
};
@@ -309,6 +314,7 @@ export function startWebAutoReplyMonitor(params: {
sleep: UnknownMock | AsyncUnknownMock;
signal?: AbortSignal;
heartbeatSeconds?: number;
transportTimeoutMs?: number;
messageTimeoutMs?: number;
watchdogCheckMs?: number;
reconnect?: { initialMs: number; maxMs: number; maxAttempts: number; factor: number };
@@ -326,6 +332,7 @@ export function startWebAutoReplyMonitor(params: {
params.signal ?? controller.signal,
{
heartbeatSeconds: params.heartbeatSeconds ?? 1,
transportTimeoutMs: params.transportTimeoutMs,
messageTimeoutMs: params.messageTimeoutMs,
watchdogCheckMs: params.watchdogCheckMs,
reconnect: params.reconnect ?? { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 },

View File

@@ -407,7 +407,92 @@ describe("web auto-reply connection", () => {
socket.ws.emit("frame");
await vi.advanceTimersByTimeAsync(20);
}
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(2);
},
{ timeout: 250, interval: 2 },
);
controller.abort();
scripted.resolveClose(scripted.getListenerCount() - 1, {
status: 499,
isLoggedOut: false,
error: "aborted",
});
await Promise.resolve();
await run;
} finally {
vi.useRealTimers();
}
});
it("publishes frame-driven transport activity for quiet sessions", async () => {
vi.useFakeTimers();
try {
const sleep = vi.fn(async () => {});
const statuses: Array<Record<string, unknown>> = [];
const scripted = createScriptedWebListenerFactory();
const { controller, run } = startWebAutoReplyMonitor({
monitorWebChannelFn: monitorWebChannel as never,
listenerFactory: scripted.listenerFactory,
sleep,
heartbeatSeconds: 1,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5,
statusSink: (next) => statuses.push({ ...next }),
});
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
const initialTransportAt = Number(statuses.at(-1)?.lastTransportActivityAt ?? 0);
const socket = getLastWebAutoReplySessionSocket();
await vi.advanceTimersByTimeAsync(250);
socket.ws.emit("frame");
await vi.advanceTimersByTimeAsync(1_000);
const lastTransportAt = Number(statuses.at(-1)?.lastTransportActivityAt ?? 0);
expect(lastTransportAt).toBeGreaterThan(initialTransportAt);
controller.abort();
scripted.resolveClose(0, { status: 499, isLoggedOut: false, error: "aborted" });
await Promise.resolve();
await run;
} finally {
vi.useRealTimers();
}
});
it("reconnects on transport stall before the long app-silence window", async () => {
vi.useFakeTimers();
try {
const sleep = vi.fn(async () => {});
const scripted = createScriptedWebListenerFactory();
const { controller, run } = startWebAutoReplyMonitor({
monitorWebChannelFn: monitorWebChannel as never,
listenerFactory: scripted.listenerFactory,
sleep,
heartbeatSeconds: 1,
transportTimeoutMs: 30,
messageTimeoutMs: 3_000,
watchdogCheckMs: 5,
});
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
await vi.advanceTimersByTimeAsync(36);
await Promise.resolve();
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(2);

View File

@@ -0,0 +1,64 @@
import { describe, expect, it } from "vitest";
import { createWebChannelStatusController } from "./monitor-state.js";
describe("createWebChannelStatusController", () => {
it("sets lastTransportActivityAt on noteConnected", () => {
const patches: Record<string, unknown>[] = [];
const controller = createWebChannelStatusController((s) => patches.push({ ...s }));
controller.noteConnected(1000);
const last = patches.at(-1)!;
expect(last.connected).toBe(true);
expect(last.lastTransportActivityAt).toBe(1000);
});
it("updates lastTransportActivityAt on noteInbound", () => {
const patches: Record<string, unknown>[] = [];
const controller = createWebChannelStatusController((s) => patches.push({ ...s }));
controller.noteConnected(1000);
controller.noteInbound(2000);
const last = patches.at(-1)!;
expect(last.lastTransportActivityAt).toBe(2000);
});
it("updates lastTransportActivityAt from explicit transport activity", () => {
const patches: Record<string, unknown>[] = [];
const controller = createWebChannelStatusController((s) => patches.push({ ...s }));
controller.noteConnected(1000);
controller.noteTransportActivity(3000);
const last = patches.at(-1)!;
expect(last.lastTransportActivityAt).toBe(3000);
});
it("does not set lastTransportActivityAt on noteWatchdogStale", () => {
const patches: Record<string, unknown>[] = [];
const controller = createWebChannelStatusController((s) => patches.push({ ...s }));
controller.noteConnected(1000);
controller.noteWatchdogStale(5000);
const last = patches.at(-1)!;
// Watchdog staleness should not refresh transport activity — it means
// the check loop is running but the socket itself is idle/stale.
expect(last.lastTransportActivityAt).toBe(1000);
});
it("produces snapshots that enable stale-socket health detection", () => {
const patches: Record<string, unknown>[] = [];
const controller = createWebChannelStatusController((s) => patches.push({ ...s }));
controller.noteConnected(1000);
const last = patches.at(-1)!;
// The gateway health policy checks `connected === true && lastTransportActivityAt != null`
// to decide whether to run stale-socket detection. Both must be present.
expect(last.connected).toBe(true);
expect(last.lastTransportActivityAt).not.toBeNull();
expect(typeof last.lastTransportActivityAt).toBe("number");
});
});

View File

@@ -1,4 +1,7 @@
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
import {
createConnectedChannelStatusPatch,
createTransportActivityStatusPatch,
} from "openclaw/plugin-sdk/gateway-runtime";
import type { WebChannelHealthState, WebChannelStatus } from "./types.js";
function cloneStatus(status: WebChannelStatus): WebChannelStatus {
@@ -35,6 +38,7 @@ export function createWebChannelStatusController(statusSink?: (status: WebChanne
snapshot: () => status,
noteConnected(at = Date.now()) {
Object.assign(status, createConnectedChannelStatusPatch(at));
Object.assign(status, createTransportActivityStatusPatch(at));
status.lastError = null;
status.healthState = "healthy";
emit();
@@ -43,11 +47,19 @@ export function createWebChannelStatusController(statusSink?: (status: WebChanne
status.lastInboundAt = at;
status.lastMessageAt = at;
status.lastEventAt = at;
Object.assign(status, createTransportActivityStatusPatch(at));
if (status.connected) {
status.healthState = "healthy";
}
emit();
},
noteTransportActivity(at = Date.now()) {
if (status.lastTransportActivityAt === at) {
return;
}
Object.assign(status, createTransportActivityStatusPatch(at));
emit();
},
noteWatchdogStale(at = Date.now()) {
status.lastEventAt = at;
if (status.connected) {

View File

@@ -134,6 +134,7 @@ async function clearTerminalWebAuthState(params: {
);
}
}
const DEFAULT_TRANSPORT_TIMEOUT_MS = 5 * 60 * 1000;
export async function monitorWebChannel(
verbose: boolean,
@@ -220,6 +221,7 @@ export async function monitorWebChannel(
};
process.once("SIGINT", handleSigint);
const transportTimeoutMs = tuning.transportTimeoutMs ?? DEFAULT_TRANSPORT_TIMEOUT_MS;
const messageTimeoutMs = tuning.messageTimeoutMs ?? 30 * 60 * 1000;
const watchdogCheckMs = tuning.watchdogCheckMs ?? 60 * 1000;
const controller = new WhatsAppConnectionController({
@@ -228,6 +230,7 @@ export async function monitorWebChannel(
verbose,
keepAlive,
heartbeatSeconds,
transportTimeoutMs,
messageTimeoutMs,
watchdogCheckMs,
reconnectPolicy,
@@ -328,6 +331,7 @@ export async function monitorWebChannel(
? { minutesSinceLastMessage }
: {}),
};
statusController.noteTransportActivity(snapshot.lastTransportActivityAt);
if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
heartbeatLogger.warn(
@@ -345,7 +349,7 @@ export async function monitorWebChannel(
const minutesSinceTransportActivity = Math.floor(transportSilentMs / 60000);
const minutesSinceAppActivity = Math.floor((now - appBaselineAt) / 60000);
const watchdogReason =
transportSilentMs > messageTimeoutMs ? "transport-inactive" : "app-silent";
transportSilentMs > transportTimeoutMs ? "transport-inactive" : "app-silent";
statusController.noteWatchdogStale();
heartbeatLogger.warn(
{

View File

@@ -27,6 +27,7 @@ export type WebChannelStatus = {
lastInboundAt?: number | null;
lastMessageAt?: number | null;
lastEventAt?: number | null;
lastTransportActivityAt?: number | null;
lastError?: string | null;
healthState?: WebChannelHealthState;
};
@@ -35,6 +36,7 @@ export type WebMonitorTuning = {
reconnect?: Partial<ReconnectPolicy>;
socketTiming?: WhatsAppSocketTimingOptions;
heartbeatSeconds?: number;
transportTimeoutMs?: number;
messageTimeoutMs?: number;
watchdogCheckMs?: number;
sleep?: (ms: number, signal?: AbortSignal) => Promise<void>;

View File

@@ -1,3 +1,4 @@
import { EventEmitter } from "node:events";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js";
import { WhatsAppConnectionController } from "./connection-controller.js";
@@ -24,6 +25,14 @@ function createListenerStub(messageId = "ok") {
};
}
function createSocketWithTransportEmitter() {
const ws = new EventEmitter() as EventEmitter & { close: ReturnType<typeof vi.fn> };
ws.close = vi.fn();
return {
ws,
};
}
describe("WhatsAppConnectionController", () => {
let controller: WhatsAppConnectionController;
@@ -35,6 +44,7 @@ describe("WhatsAppConnectionController", () => {
verbose: false,
keepAlive: false,
heartbeatSeconds: 30,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
@@ -100,6 +110,7 @@ describe("WhatsAppConnectionController", () => {
verbose: false,
keepAlive: false,
heartbeatSeconds: 30,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
@@ -126,6 +137,7 @@ describe("WhatsAppConnectionController", () => {
verbose: false,
keepAlive: false,
heartbeatSeconds: 30,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
@@ -154,4 +166,94 @@ describe("WhatsAppConnectionController", () => {
await liveController.shutdown();
}
});
it("tracks real websocket frame activity in the connection snapshot", async () => {
vi.useFakeTimers();
const controller = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: true,
heartbeatSeconds: 1,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
try {
const sock = createSocketWithTransportEmitter();
createWaSocketMock.mockResolvedValueOnce(sock as never);
waitForWaConnectionMock.mockResolvedValueOnce(undefined);
const snapshots: Array<{ lastTransportActivityAt: number }> = [];
await controller.openConnection({
connectionId: "conn-frame-activity",
createListener: async () => createListenerStub() as never,
onHeartbeat: (snapshot) => snapshots.push(snapshot),
});
await vi.advanceTimersByTimeAsync(1_000);
const firstSnapshot = snapshots.at(-1);
expect(firstSnapshot?.lastTransportActivityAt).toBeTypeOf("number");
const firstTransportAt = firstSnapshot?.lastTransportActivityAt ?? 0;
await vi.advanceTimersByTimeAsync(250);
sock.ws.emit("frame");
await vi.advanceTimersByTimeAsync(1_000);
const lastSnapshot = snapshots.at(-1);
expect(lastSnapshot?.lastTransportActivityAt).toBeGreaterThan(firstTransportAt);
} finally {
await controller.shutdown();
vi.useRealTimers();
}
});
it("forces reconnect on transport stall before the long app-silence window", async () => {
vi.useFakeTimers();
const controller = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: true,
heartbeatSeconds: 1,
transportTimeoutMs: 30,
messageTimeoutMs: 3_000,
watchdogCheckMs: 5,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
try {
const sock = createSocketWithTransportEmitter();
createWaSocketMock.mockResolvedValueOnce(sock as never);
waitForWaConnectionMock.mockResolvedValueOnce(undefined);
const timeouts: string[] = [];
await controller.openConnection({
connectionId: "conn-transport-timeout",
createListener: async () => createListenerStub() as never,
onWatchdogTimeout: () => timeouts.push("timeout"),
});
await vi.advanceTimersByTimeAsync(40);
expect(timeouts.length).toBeGreaterThanOrEqual(1);
} finally {
await controller.shutdown();
vi.useRealTimers();
}
});
});

View File

@@ -245,6 +245,7 @@ export class WhatsAppConnectionController {
private readonly reconnectPolicy: ReconnectPolicy;
private readonly heartbeatSeconds: number;
private readonly keepAlive: boolean;
private readonly transportTimeoutMs: number;
private readonly messageTimeoutMs: number;
private readonly appSilenceTimeoutMs: number;
private readonly watchdogCheckMs: number;
@@ -265,6 +266,7 @@ export class WhatsAppConnectionController {
verbose: boolean;
keepAlive: boolean;
heartbeatSeconds: number;
transportTimeoutMs: number;
messageTimeoutMs: number;
watchdogCheckMs: number;
reconnectPolicy: ReconnectPolicy;
@@ -278,6 +280,7 @@ export class WhatsAppConnectionController {
this.verbose = params.verbose;
this.keepAlive = params.keepAlive;
this.heartbeatSeconds = params.heartbeatSeconds;
this.transportTimeoutMs = params.transportTimeoutMs;
this.messageTimeoutMs = params.messageTimeoutMs;
this.appSilenceTimeoutMs = Math.max(params.messageTimeoutMs, params.messageTimeoutMs * 4);
this.watchdogCheckMs = params.watchdogCheckMs;
@@ -600,7 +603,7 @@ export class WhatsAppConnectionController {
const appBaselineAt = connection.lastInboundAt ?? connection.startedAt;
const appSilentForMs = now - appBaselineAt;
if (
transportStaleForMs <= this.messageTimeoutMs &&
transportStaleForMs <= this.transportTimeoutMs &&
appSilentForMs <= this.appSilenceTimeoutMs
) {
return;