refactor(whatsapp): centralize account connection lifecycle (#65427)

* refactor(whatsapp): centralize account connection lifecycle

* fix(whatsapp): harden controller open failure cleanup

* refactor(whatsapp): remove active listener fallback path

* fix(whatsapp): isolate controller registry state

* debug(whatsapp): trace typing presence updates

* docs(changelog): add whatsapp lifecycle fix note

* debug(whatsapp): log global presence mode

* chore(whatsapp): remove debug presence logs

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
Marcus Castro
2026-04-12 15:24:49 -03:00
committed by GitHub
parent c37e49f275
commit aa023e4283
19 changed files with 1354 additions and 730 deletions

View File

@@ -51,6 +51,7 @@ Docs: https://docs.openclaw.ai
- Memory/QMD: allow channel sessions in the shipped default QMD scope, while still denying groups.
- Memory/QMD: stop registering the legacy lowercase root memory file as a separate default collection, so QMD now prefers `MEMORY.md` and the `memory/` tree without duplicate collection-add warnings.
- Memory/memory-core: watch the `memory` directory directly and ignore non-markdown churn so nested note changes still sync on macOS + Node 25 environments where recursive `memory/**/*.md` glob watching fails. (#64711) Thanks @jasonxargs-boop and @vincentkoc.
- WhatsApp: centralize per-account connection ownership so reconnects, login recovery, and outbound readiness stay attached to the live socket instead of drifting across monitor and login paths. (#65290) Thanks @mcaxtr and @vincentkoc.
## 2026.4.11

View File

@@ -1,8 +1,5 @@
import { afterEach, describe, expect, it, vi } from "vitest";
// Mock loadConfig so the single-arg setActiveWebListener overload resolves
// the configured default account as "work" (matching the regression test).
// All other tests pass explicit accountIds and are unaffected by this mock.
vi.mock("openclaw/plugin-sdk/config-runtime", () => ({
loadConfig: () => ({
channels: { whatsapp: { accounts: { work: { enabled: true } }, defaultAccount: "work" } },
@@ -17,14 +14,6 @@ async function importActiveListenerModule(cacheBust: string): Promise<ActiveList
return (await import(`${activeListenerModuleUrl}?t=${cacheBust}`)) as ActiveListenerModule;
}
afterEach(async () => {
const mod = await importActiveListenerModule(`cleanup-${Date.now()}`);
mod.setActiveWebListener(null);
mod.setActiveWebListener("work", null);
mod.setActiveWebListener("default", null);
});
/** Minimal listener stub */
function makeListener() {
return {
sendMessage: vi.fn(async () => ({ messageId: "msg-1" })),
@@ -34,92 +23,53 @@ function makeListener() {
};
}
describe("active WhatsApp listener singleton", () => {
it("shares listeners across duplicate module instances (bundle-fragmentation fix)", async () => {
// Simulates the scenario where two bundled copies of active-listener.ts are loaded
// (e.g. channel-web-*.js calls setActiveWebListener, outbound-*.js calls
// requireActiveWebListener). Without resolveGlobalSingleton they would each hold
// their own Map and the listener would never be found by the outbound path.
afterEach(() => {
vi.doUnmock("./connection-controller-registry.js");
});
describe("active WhatsApp listener view", () => {
it("reads controller-backed state across duplicate module instances", async () => {
const listener = makeListener();
vi.doMock("./connection-controller-registry.js", () => ({
getRegisteredWhatsAppConnectionController: (accountId: string) =>
accountId === "work"
? {
getActiveListener: () => listener,
}
: null,
}));
const first = await importActiveListenerModule(`first-${Date.now()}`);
const second = await importActiveListenerModule(`second-${Date.now()}`);
const listener = makeListener();
first.setActiveWebListener("work", listener);
expect(first.getActiveWebListener("work")).toBe(listener);
expect(second.getActiveWebListener("work")).toBe(listener);
expect(second.requireActiveWebListener("work")).toEqual({
accountId: "work",
listener,
});
});
it("single-arg overload registers under configured default account, not always 'default'", async () => {
// Regression: setActiveWebListener(listener) used DEFAULT_ACCOUNT_ID ("default")
// even when the configured default account is named "work". This caused
// requireActiveWebListener("work") to throw while the listener was silently
// registered under the wrong key.
const mod = await importActiveListenerModule(`named-account-${Date.now()}`);
it("resolves the configured default account when accountId is omitted", async () => {
const listener = makeListener();
vi.doMock("./connection-controller-registry.js", () => ({
getRegisteredWhatsAppConnectionController: (accountId: string) =>
accountId === "work"
? {
getActiveListener: () => listener,
}
: null,
}));
// Single-arg call — should resolve accountId from loadConfig() default, which
// vitest config maps to "work" (see mock below).
mod.setActiveWebListener(listener);
const mod = await importActiveListenerModule(`default-${Date.now()}`);
// "work" must be resolvable — previously this threw
expect(mod.requireActiveWebListener("work")).toEqual({
accountId: "work",
listener,
});
expect(mod.resolveWebAccountId()).toBe("work");
expect(mod.getActiveWebListener()).toBe(listener);
});
it("single-arg overload still works when default account is 'default'", async () => {
// Backward-compat: configs that rely on the "default" account name must
// continue to work after the fix. Use single-arg overload with a temporary
// spy that returns "default" as the configured default account.
const configRuntime = await import("openclaw/plugin-sdk/config-runtime");
const spy = vi.spyOn(configRuntime, "loadConfig").mockReturnValue({
channels: {
whatsapp: { accounts: { default: { enabled: true } }, defaultAccount: "default" },
},
} as ReturnType<typeof configRuntime.loadConfig>);
it("returns null when the controller has no active listener for the account", async () => {
vi.doMock("./connection-controller-registry.js", () => ({
getRegisteredWhatsAppConnectionController: () => null,
}));
try {
const mod = await importActiveListenerModule(`default-account-${Date.now()}`);
const listener = makeListener();
// Single-arg call — should resolve to "default" via the spy
mod.setActiveWebListener(listener);
expect(mod.requireActiveWebListener("default")).toEqual({
accountId: "default",
listener,
});
// The legacy no-arg lookup (undefined → "default") must also work
expect(mod.requireActiveWebListener()).toEqual({
accountId: "default",
listener,
});
} finally {
spy.mockRestore();
}
});
it("requireActiveWebListener throws a clear error when listener is missing", async () => {
const mod = await importActiveListenerModule(`missing-${Date.now()}`);
expect(() => mod.requireActiveWebListener("work")).toThrowError(
/No active WhatsApp Web listener \(account: work\)/,
);
});
it("setActiveWebListener with null removes the listener", async () => {
const mod = await importActiveListenerModule(`remove-${Date.now()}`);
const listener = makeListener();
mod.setActiveWebListener("work", listener);
expect(mod.getActiveWebListener("work")).toBe(listener);
mod.setActiveWebListener("work", null);
expect(mod.getActiveWebListener("work")).toBeNull();
});
});

View File

@@ -1,109 +1,15 @@
import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime";
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import type { PollInput } from "openclaw/plugin-sdk/media-runtime";
import { DEFAULT_ACCOUNT_ID } from "openclaw/plugin-sdk/routing";
import { resolveDefaultWhatsAppAccountId } from "./accounts.js";
import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js";
import type { ActiveWebListener, ActiveWebSendOptions } from "./inbound/types.js";
export type ActiveWebSendOptions = {
gifPlayback?: boolean;
accountId?: string;
fileName?: string;
};
export type ActiveWebListener = {
sendMessage: (
to: string,
text: string,
mediaBuffer?: Buffer,
mediaType?: string,
options?: ActiveWebSendOptions,
) => Promise<{ messageId: string }>;
sendPoll: (to: string, poll: PollInput) => Promise<{ messageId: string }>;
sendReaction: (
chatJid: string,
messageId: string,
emoji: string,
fromMe: boolean,
participant?: string,
) => Promise<void>;
sendComposingTo: (to: string) => Promise<void>;
close?: () => Promise<void>;
};
// WhatsApp shares a live Baileys socket between inbound and outbound runtime
// chunks. Keep this on a direct globalThis symbol lookup; the generic
// singleton helper was previously inlined during code-splitting and split the
// listener state back into per-chunk Maps.
const WHATSAPP_ACTIVE_LISTENER_STATE_KEY = Symbol.for("openclaw.whatsapp.activeListenerState");
type ActiveListenerState = {
listeners: Map<string, ActiveWebListener>;
current: ActiveWebListener | null;
};
const g = globalThis as unknown as Record<symbol, ActiveListenerState | undefined>;
if (!g[WHATSAPP_ACTIVE_LISTENER_STATE_KEY]) {
g[WHATSAPP_ACTIVE_LISTENER_STATE_KEY] = {
listeners: new Map<string, ActiveWebListener>(),
current: null,
};
}
const state = g[WHATSAPP_ACTIVE_LISTENER_STATE_KEY];
function setCurrentListener(listener: ActiveWebListener | null): void {
state.current = listener;
}
export type { ActiveWebListener, ActiveWebSendOptions } from "./inbound/types.js";
export function resolveWebAccountId(accountId?: string | null): string {
return (accountId ?? "").trim() || resolveDefaultWhatsAppAccountId(loadConfig());
}
export function requireActiveWebListener(accountId?: string | null): {
accountId: string;
listener: ActiveWebListener;
} {
const id = resolveWebAccountId(accountId);
const listener = state.listeners.get(id) ?? null;
if (!listener) {
throw new Error(
`No active WhatsApp Web listener (account: ${id}). Start the gateway, then link WhatsApp with: ${formatCliCommand(`openclaw channels login --channel whatsapp --account ${id}`)}.`,
);
}
return { accountId: id, listener };
}
export function setActiveWebListener(listener: ActiveWebListener | null): void;
export function setActiveWebListener(
accountId: string | null | undefined,
listener: ActiveWebListener | null,
): void;
export function setActiveWebListener(
accountIdOrListener: string | ActiveWebListener | null | undefined,
maybeListener?: ActiveWebListener | null,
): void {
const { accountId, listener } =
typeof accountIdOrListener === "string"
? { accountId: accountIdOrListener, listener: maybeListener ?? null }
: {
// Resolve the configured default account name so that callers using the
// single-arg overload register under the right key (e.g. "work"), not
// always under DEFAULT_ACCOUNT_ID ("default").
accountId: resolveDefaultWhatsAppAccountId(loadConfig()),
listener: accountIdOrListener ?? null,
};
const id = resolveWebAccountId(accountId);
if (!listener) {
state.listeners.delete(id);
} else {
state.listeners.set(id, listener);
}
if (id === DEFAULT_ACCOUNT_ID) {
setCurrentListener(listener);
}
}
export function getActiveWebListener(accountId?: string | null): ActiveWebListener | null {
const id = resolveWebAccountId(accountId);
return state.listeners.get(id) ?? null;
return getRegisteredWhatsAppConnectionController(id)?.getActiveListener() ?? null;
}

View File

@@ -40,6 +40,22 @@ type WebAutoReplyMonitorHarness = {
export const TEST_NET_IP = "93.184.216.34";
vi.mock("./session.js", async () => {
const actual = await vi.importActual<typeof import("./session.js")>("./session.js");
return {
...actual,
createWaSocket: vi.fn(async () => ({
ev: {
on: vi.fn(),
off: vi.fn(),
},
ws: { close: vi.fn() },
user: { id: "123@s.whatsapp.net" },
})),
waitForWaConnection: vi.fn().mockResolvedValue(undefined),
};
});
vi.mock("openclaw/plugin-sdk/agent-runtime", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
appendCronStyleCurrentTimeLine: (text: string) => text,

View File

@@ -36,8 +36,12 @@ async function startWatchdogScenario(params: {
watchdogCheckMs: 5,
});
await Promise.resolve();
expect(scripted.getListenerCount()).toBe(1);
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
await vi.waitFor(
() => {
expect(scripted.getOnMessage()).toBeTypeOf("function");
@@ -95,8 +99,12 @@ describe("web auto-reply connection", () => {
reconnect: scenario.reconnect,
});
await Promise.resolve();
expect(scripted.getListenerCount()).toBe(1);
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
scripted.resolveClose(0);
await vi.waitFor(
@@ -130,8 +138,12 @@ describe("web auto-reply connection", () => {
reconnect: { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 },
});
await Promise.resolve();
expect(scripted.getListenerCount()).toBe(1);
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
scripted.resolveClose(0, {
status: 440,
isLoggedOut: false,

View File

@@ -1,7 +1,5 @@
import type { WASocket } from "@whiskeysockets/baileys";
import { resolveInboundDebounceMs } from "openclaw/plugin-sdk/channel-inbound";
import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime";
import { waitForever } from "openclaw/plugin-sdk/cli-runtime";
import { hasControlCommand } from "openclaw/plugin-sdk/command-detection";
import { drainPendingDeliveries } from "openclaw/plugin-sdk/infra-runtime";
import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime";
@@ -16,10 +14,12 @@ import {
type RuntimeEnv,
} from "openclaw/plugin-sdk/runtime-env";
import { resolveWhatsAppAccount, resolveWhatsAppMediaMaxBytes } from "../accounts.js";
import { setActiveWebListener } from "../active-listener.js";
import { monitorWebInbox } from "../inbound.js";
import {
computeBackoff,
WhatsAppConnectionController,
type ManagedWhatsAppListener,
} from "../connection-controller.js";
import { attachWebInboxToSocket } from "../inbound/monitor.js";
import {
newConnectionId,
resolveHeartbeatSeconds,
resolveReconnectPolicy,
@@ -41,30 +41,6 @@ function isNonRetryableWebCloseStatus(statusCode: unknown): boolean {
return statusCode === 440;
}
type ActiveConnectionRun = {
connectionId: string;
startedAt: number;
heartbeat: NodeJS.Timeout | null;
watchdogTimer: NodeJS.Timeout | null;
lastInboundAt: number | null;
handledMessages: number;
unregisterUnhandled: (() => void) | null;
backgroundTasks: Set<Promise<unknown>>;
};
function createActiveConnectionRun(): ActiveConnectionRun {
return {
connectionId: newConnectionId(),
startedAt: Date.now(),
heartbeat: null,
watchdogTimer: null,
lastInboundAt: null,
handledMessages: 0,
unregisterUnhandled: null,
backgroundTasks: new Set<Promise<unknown>>(),
};
}
type ReplyResolver = typeof import("./reply-resolver.runtime.js").getReplyFromConfig;
let replyResolverRuntimePromise: Promise<typeof import("./reply-resolver.runtime.js")> | null =
@@ -85,7 +61,7 @@ function isNoListenerReconnectError(lastError?: string): boolean {
export async function monitorWebChannel(
verbose: boolean,
listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
listenerFactory: typeof attachWebInboxToSocket | undefined = attachWebInboxToSocket,
keepAlive = true,
replyResolver?: ReplyResolver,
runtime: RuntimeEnv = defaultRuntime,
@@ -153,13 +129,6 @@ export async function monitorWebChannel(
tuning.sleep ??
((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal ?? abortSignal));
const stopRequested = () => abortSignal?.aborted === true;
const abortPromise =
abortSignal &&
new Promise<"aborted">((resolve) =>
abortSignal.addEventListener("abort", () => resolve("aborted"), {
once: true,
}),
);
// Avoid noisy MaxListenersExceeded warnings in test environments where
// multiple gateway instances may be constructed.
@@ -174,378 +143,290 @@ export async function monitorWebChannel(
};
process.once("SIGINT", handleSigint);
let reconnectAttempts = 0;
const socketRef: { current: WASocket | null } = { current: null };
const disconnectRetryController = new AbortController();
const stopDisconnectRetries = () => {
if (!disconnectRetryController.signal.aborted) {
disconnectRetryController.abort();
}
};
if (abortSignal) {
if (abortSignal.aborted) {
stopDisconnectRetries();
} else {
abortSignal.addEventListener("abort", stopDisconnectRetries, { once: true });
}
}
const messageTimeoutMs = tuning.messageTimeoutMs ?? 30 * 60 * 1000;
const watchdogCheckMs = tuning.watchdogCheckMs ?? 60 * 1000;
const controller = new WhatsAppConnectionController({
accountId: account.accountId,
authDir: account.authDir,
verbose,
keepAlive,
heartbeatSeconds,
messageTimeoutMs,
watchdogCheckMs,
reconnectPolicy,
abortSignal,
sleep,
isNonRetryableStatus: isNonRetryableWebCloseStatus,
});
while (true) {
if (stopRequested()) {
break;
}
const active = createActiveConnectionRun();
// Watchdog to detect stuck message processing (e.g., event emitter died).
// Tuning overrides are test-oriented; production defaults remain unchanged.
const MESSAGE_TIMEOUT_MS = tuning.messageTimeoutMs ?? 30 * 60 * 1000; // 30m default
const WATCHDOG_CHECK_MS = tuning.watchdogCheckMs ?? 60 * 1000; // 1m default
const onMessage = createWebOnMessageHandler({
cfg,
verbose,
connectionId: active.connectionId,
maxMediaBytes,
groupHistoryLimit,
groupHistories,
groupMemberNames,
echoTracker,
backgroundTasks: active.backgroundTasks,
replyResolver: activeReplyResolver,
replyLogger,
baseMentionConfig,
account,
});
const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "whatsapp" });
const shouldDebounce = (msg: WebInboundMsg) => {
if (msg.mediaPath || msg.mediaType) {
return false;
try {
while (true) {
if (stopRequested()) {
break;
}
if (msg.location) {
return false;
}
if (msg.replyToId || msg.replyToBody) {
return false;
}
return !hasControlCommand(msg.body, cfg);
};
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
accountId: account.accountId,
authDir: account.authDir,
mediaMaxMb: account.mediaMaxMb,
selfChatMode: account.selfChatMode,
sendReadReceipts: account.sendReadReceipts,
debounceMs: inboundDebounceMs,
shouldDebounce,
socketRef,
shouldRetryDisconnect: () =>
keepAlive && !sigintStop && !stopRequested() && !disconnectRetryController.signal.aborted,
disconnectRetryPolicy: reconnectPolicy,
disconnectRetryAbortSignal: disconnectRetryController.signal,
onMessage: async (msg: WebInboundMsg) => {
active.handledMessages += 1;
active.lastInboundAt = Date.now();
statusController.noteInbound(active.lastInboundAt);
await onMessage(msg);
},
});
const connectionId = newConnectionId();
const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "whatsapp" });
const shouldDebounce = (msg: WebInboundMsg) => {
if (msg.mediaPath || msg.mediaType) {
return false;
}
if (msg.location) {
return false;
}
if (msg.replyToId || msg.replyToBody) {
return false;
}
return !hasControlCommand(msg.body, cfg);
};
statusController.noteConnected();
const connection = await controller.openConnection({
connectionId,
createListener: async ({ sock, connection }) => {
const onMessage = createWebOnMessageHandler({
cfg,
verbose,
connectionId,
maxMediaBytes,
groupHistoryLimit,
groupHistories,
groupMemberNames,
echoTracker,
backgroundTasks: connection.backgroundTasks,
replyResolver: activeReplyResolver,
replyLogger,
baseMentionConfig,
account,
});
// Surface a concise connection event for the next main-session turn/heartbeat.
const { e164: selfE164 } = readWebSelfId(account.authDir);
const connectRoute = resolveAgentRoute({
cfg,
channel: "whatsapp",
accountId: account.accountId,
});
enqueueSystemEvent(`WhatsApp gateway connected${selfE164 ? ` as ${selfE164}` : ""}.`, {
sessionKey: connectRoute.sessionKey,
});
return (await (listenerFactory ?? attachWebInboxToSocket)({
verbose,
accountId: account.accountId,
authDir: account.authDir,
mediaMaxMb: account.mediaMaxMb,
selfChatMode: account.selfChatMode,
sendReadReceipts: account.sendReadReceipts,
debounceMs: inboundDebounceMs,
shouldDebounce,
socketRef: controller.socketRef,
shouldRetryDisconnect: () => !sigintStop && controller.shouldRetryDisconnect(),
disconnectRetryPolicy: reconnectPolicy,
disconnectRetryAbortSignal: controller.getDisconnectRetryAbortSignal(),
onMessage: async (msg: WebInboundMsg) => {
const inboundAt = Date.now();
controller.noteInbound(inboundAt);
statusController.noteInbound(inboundAt);
await onMessage(msg);
},
sock,
})) as ManagedWhatsAppListener;
},
onHeartbeat: (snapshot) => {
const authAgeMs = getWebAuthAgeMs(account.authDir);
const minutesSinceLastMessage = snapshot.lastInboundAt
? Math.floor((Date.now() - snapshot.lastInboundAt) / 60000)
: null;
setActiveWebListener(account.accountId, listener);
const logData = {
connectionId: snapshot.connectionId,
reconnectAttempts: snapshot.reconnectAttempts,
messagesHandled: snapshot.handledMessages,
lastInboundAt: snapshot.lastInboundAt,
authAgeMs,
uptimeMs: snapshot.uptimeMs,
...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
? { minutesSinceLastMessage }
: {}),
};
const normalizedAccountId = normalizeReconnectAccountId(account.accountId);
// Reconnect is the transport-ready signal for WhatsApp, so drain eligible
// pending deliveries for this account here instead of hardcoding that
// policy inside the generic queue engine.
void drainPendingDeliveries({
drainKey: `whatsapp:${normalizedAccountId}`,
logLabel: "WhatsApp reconnect drain",
cfg,
log: reconnectLogger,
selectEntry: (entry) => ({
match:
entry.channel === "whatsapp" &&
normalizeReconnectAccountId(entry.accountId) === normalizedAccountId,
// Reconnect changed listener readiness, so these should not sit behind
// the normal backoff window.
bypassBackoff: isNoListenerReconnectError(entry.lastError),
}),
}).catch((err) => {
reconnectLogger.warn(
{ connectionId: active.connectionId, error: String(err) },
"reconnect drain failed",
);
});
active.unregisterUnhandled = registerUnhandledRejectionHandler((reason) => {
if (!isLikelyWhatsAppCryptoError(reason)) {
return false;
}
const errorStr = formatError(reason);
reconnectLogger.warn(
{ connectionId: active.connectionId, error: errorStr },
"web reconnect: unhandled rejection from WhatsApp socket; forcing reconnect",
);
listener.signalClose?.({
status: 499,
isLoggedOut: false,
error: reason,
if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
heartbeatLogger.warn(logData, "⚠️ web gateway heartbeat - no messages in 30+ minutes");
} else {
heartbeatLogger.info(logData, "web gateway heartbeat");
}
},
onWatchdogTimeout: (snapshot) => {
const watchdogBaselineAt = snapshot.lastInboundAt ?? snapshot.startedAt;
const minutesSinceLastMessage = Math.floor((Date.now() - watchdogBaselineAt) / 60000);
statusController.noteWatchdogStale();
heartbeatLogger.warn(
{
connectionId: snapshot.connectionId,
minutesSinceLastMessage,
lastInboundAt: snapshot.lastInboundAt ? new Date(snapshot.lastInboundAt) : null,
messagesHandled: snapshot.handledMessages,
},
"Message timeout detected - forcing reconnect",
);
whatsappHeartbeatLog.warn(
`No messages received in ${minutesSinceLastMessage}m - restarting connection`,
);
},
});
return true;
});
const closeListener = async () => {
socketRef.current = null;
setActiveWebListener(account.accountId, null);
if (active.unregisterUnhandled) {
active.unregisterUnhandled();
active.unregisterUnhandled = null;
}
if (active.heartbeat) {
clearInterval(active.heartbeat);
}
if (active.watchdogTimer) {
clearInterval(active.watchdogTimer);
}
if (active.backgroundTasks.size > 0) {
await Promise.allSettled(active.backgroundTasks);
active.backgroundTasks.clear();
}
try {
await listener.close();
} catch (err) {
logVerbose(`Socket close failed: ${formatError(err)}`);
}
};
statusController.noteConnected();
controller.setUnhandledRejectionCleanup(
registerUnhandledRejectionHandler((reason) => {
if (!isLikelyWhatsAppCryptoError(reason)) {
return false;
}
const errorStr = formatError(reason);
reconnectLogger.warn(
{ connectionId: connection.connectionId, error: errorStr },
"web reconnect: unhandled rejection from WhatsApp socket; forcing reconnect",
);
controller.forceClose({
status: 499,
isLoggedOut: false,
error: reason,
});
return true;
}),
);
if (keepAlive) {
active.heartbeat = setInterval(() => {
const authAgeMs = getWebAuthAgeMs(account.authDir);
const minutesSinceLastMessage = active.lastInboundAt
? Math.floor((Date.now() - active.lastInboundAt) / 60000)
: null;
const { e164: selfE164 } = readWebSelfId(account.authDir);
const connectRoute = resolveAgentRoute({
cfg,
channel: "whatsapp",
accountId: account.accountId,
});
enqueueSystemEvent(`WhatsApp gateway connected${selfE164 ? ` as ${selfE164}` : ""}.`, {
sessionKey: connectRoute.sessionKey,
});
const logData = {
connectionId: active.connectionId,
reconnectAttempts,
messagesHandled: active.handledMessages,
lastInboundAt: active.lastInboundAt,
authAgeMs,
uptimeMs: Date.now() - active.startedAt,
...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
? { minutesSinceLastMessage }
: {}),
};
const normalizedAccountId = normalizeReconnectAccountId(account.accountId);
void drainPendingDeliveries({
drainKey: `whatsapp:${normalizedAccountId}`,
logLabel: "WhatsApp reconnect drain",
cfg,
log: reconnectLogger,
selectEntry: (entry) => ({
match:
entry.channel === "whatsapp" &&
normalizeReconnectAccountId(entry.accountId) === normalizedAccountId,
bypassBackoff: isNoListenerReconnectError(entry.lastError),
}),
}).catch((err) => {
reconnectLogger.warn(
{ connectionId: connection.connectionId, error: String(err) },
"reconnect drain failed",
);
});
if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
heartbeatLogger.warn(logData, "⚠️ web gateway heartbeat - no messages in 30+ minutes");
whatsappLog.info("Listening for personal WhatsApp inbound messages.");
if (process.stdout.isTTY || process.stderr.isTTY) {
whatsappLog.raw("Ctrl+C to stop.");
}
if (!keepAlive) {
await controller.shutdown();
return;
}
const reason = await controller.waitForClose();
if (stopRequested() || sigintStop || reason === "aborted") {
await controller.shutdown();
break;
}
const decision = controller.resolveCloseDecision(reason);
if (decision === "aborted") {
await controller.shutdown();
break;
}
statusController.noteReconnectAttempts(controller.getReconnectAttempts());
reconnectLogger.info(
{
connectionId: connection.connectionId,
status: decision.normalized.statusLabel,
loggedOut: decision.normalized.isLoggedOut,
reconnectAttempts: decision.reconnectAttempts,
error: decision.normalized.errorText,
},
"web reconnect: connection closed",
);
enqueueSystemEvent(
`WhatsApp gateway disconnected (status ${decision.normalized.statusLabel})`,
{
sessionKey: connectRoute.sessionKey,
},
);
if (decision.action === "stop") {
statusController.noteClose({
statusCode: decision.normalized.statusCode,
loggedOut: decision.normalized.isLoggedOut,
error: decision.normalized.errorText,
reconnectAttempts: decision.reconnectAttempts,
healthState: decision.healthState,
});
if (decision.healthState === "logged-out") {
runtime.error(
`WhatsApp session logged out. Run \`${formatCliCommand("openclaw channels login --channel web")}\` to relink.`,
);
} else if (decision.healthState === "conflict") {
reconnectLogger.warn(
{
connectionId: connection.connectionId,
status: decision.normalized.statusLabel,
error: decision.normalized.errorText,
},
"web reconnect: non-retryable close status; stopping monitor",
);
runtime.error(
`WhatsApp Web connection closed (status ${decision.normalized.statusLabel}: session conflict). Resolve conflicting WhatsApp Web sessions, then relink with \`${formatCliCommand("openclaw channels login --channel web")}\`. Stopping web monitoring.`,
);
} else {
heartbeatLogger.info(logData, "web gateway heartbeat");
reconnectLogger.warn(
{
connectionId: connection.connectionId,
status: decision.normalized.statusLabel,
reconnectAttempts: decision.reconnectAttempts,
maxAttempts: reconnectPolicy.maxAttempts,
},
"web reconnect: max attempts reached; continuing in degraded mode",
);
runtime.error(
`WhatsApp Web reconnect: max attempts reached (${decision.reconnectAttempts}/${reconnectPolicy.maxAttempts}). Stopping web monitoring.`,
);
}
}, heartbeatSeconds * 1000);
active.watchdogTimer = setInterval(() => {
// A reconnect should get a fresh watchdog window even before the next inbound arrives.
const watchdogBaselineAt = active.lastInboundAt ?? active.startedAt;
const timeSinceLastMessage = Date.now() - watchdogBaselineAt;
if (timeSinceLastMessage <= MESSAGE_TIMEOUT_MS) {
return;
}
const minutesSinceLastMessage = Math.floor(timeSinceLastMessage / 60000);
statusController.noteWatchdogStale();
heartbeatLogger.warn(
{
connectionId: active.connectionId,
minutesSinceLastMessage,
lastInboundAt: active.lastInboundAt ? new Date(active.lastInboundAt) : null,
messagesHandled: active.handledMessages,
},
"Message timeout detected - forcing reconnect",
);
whatsappHeartbeatLog.warn(
`No messages received in ${minutesSinceLastMessage}m - restarting connection`,
);
void closeListener().catch((err) => {
logVerbose(`Close listener failed: ${formatError(err)}`);
});
listener.signalClose?.({
status: 499,
isLoggedOut: false,
error: "watchdog-timeout",
});
}, WATCHDOG_CHECK_MS);
}
await controller.shutdown();
break;
}
whatsappLog.info("Listening for personal WhatsApp inbound messages.");
if (process.stdout.isTTY || process.stderr.isTTY) {
whatsappLog.raw("Ctrl+C to stop.");
}
if (!keepAlive) {
stopDisconnectRetries();
await closeListener();
process.removeListener("SIGINT", handleSigint);
return;
}
const reason = await Promise.race([
listener.onClose?.catch((err) => {
reconnectLogger.error({ error: formatError(err) }, "listener.onClose rejected");
return { status: 500, isLoggedOut: false, error: err };
}) ?? waitForever(),
abortPromise ?? waitForever(),
]);
const uptimeMs = Date.now() - active.startedAt;
if (uptimeMs > heartbeatSeconds * 1000) {
reconnectAttempts = 0; // Healthy stretch; reset the backoff.
}
statusController.noteReconnectAttempts(reconnectAttempts);
if (stopRequested() || sigintStop || reason === "aborted") {
stopDisconnectRetries();
await closeListener();
break;
}
const statusCode =
(typeof reason === "object" && reason && "status" in reason
? (reason as { status?: number }).status
: undefined) ?? "unknown";
const loggedOut =
typeof reason === "object" &&
reason &&
"isLoggedOut" in reason &&
(reason as { isLoggedOut?: boolean }).isLoggedOut;
const errorStr = formatError(reason);
const numericStatusCode = typeof statusCode === "number" ? statusCode : undefined;
reconnectLogger.info(
{
connectionId: active.connectionId,
status: statusCode,
loggedOut,
reconnectAttempts,
error: errorStr,
},
"web reconnect: connection closed",
);
enqueueSystemEvent(`WhatsApp gateway disconnected (status ${statusCode ?? "unknown"})`, {
sessionKey: connectRoute.sessionKey,
});
if (loggedOut) {
stopDisconnectRetries();
statusController.noteClose({
statusCode: numericStatusCode,
loggedOut: true,
error: errorStr,
reconnectAttempts,
healthState: "logged-out",
statusCode: decision.normalized.statusCode,
error: decision.normalized.errorText,
reconnectAttempts: decision.reconnectAttempts,
healthState: decision.healthState,
});
runtime.error(
`WhatsApp session logged out. Run \`${formatCliCommand("openclaw channels login --channel web")}\` to relink.`,
);
await closeListener();
break;
}
if (isNonRetryableWebCloseStatus(statusCode)) {
stopDisconnectRetries();
statusController.noteClose({
statusCode: numericStatusCode,
error: errorStr,
reconnectAttempts,
healthState: "conflict",
});
reconnectLogger.warn(
reconnectLogger.info(
{
connectionId: active.connectionId,
status: statusCode,
error: errorStr,
connectionId: connection.connectionId,
status: decision.normalized.statusLabel,
reconnectAttempts: decision.reconnectAttempts,
maxAttempts: reconnectPolicy.maxAttempts || "unlimited",
delayMs: decision.delayMs,
},
"web reconnect: non-retryable close status; stopping monitor",
"web reconnect: scheduling retry",
);
runtime.error(
`WhatsApp Web connection closed (status ${statusCode}: session conflict). Resolve conflicting WhatsApp Web sessions, then relink with \`${formatCliCommand("openclaw channels login --channel web")}\`. Stopping web monitoring.`,
`WhatsApp Web connection closed (status ${decision.normalized.statusLabel}). Retry ${decision.reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDurationPrecise(decision.delayMs ?? 0)}… (${decision.normalized.errorText})`,
);
await closeListener();
break;
}
reconnectAttempts += 1;
if (reconnectPolicy.maxAttempts > 0 && reconnectAttempts >= reconnectPolicy.maxAttempts) {
stopDisconnectRetries();
statusController.noteClose({
statusCode: numericStatusCode,
error: errorStr,
reconnectAttempts,
healthState: "stopped",
});
reconnectLogger.warn(
{
connectionId: active.connectionId,
status: statusCode,
reconnectAttempts,
maxAttempts: reconnectPolicy.maxAttempts,
},
"web reconnect: max attempts reached; continuing in degraded mode",
);
runtime.error(
`WhatsApp Web reconnect: max attempts reached (${reconnectAttempts}/${reconnectPolicy.maxAttempts}). Stopping web monitoring.`,
);
await closeListener();
break;
}
statusController.noteClose({
statusCode: numericStatusCode,
error: errorStr,
reconnectAttempts,
healthState: "reconnecting",
});
const delay = computeBackoff(reconnectPolicy, reconnectAttempts);
reconnectLogger.info(
{
connectionId: active.connectionId,
status: statusCode,
reconnectAttempts,
maxAttempts: reconnectPolicy.maxAttempts || "unlimited",
delayMs: delay,
},
"web reconnect: scheduling retry",
);
runtime.error(
`WhatsApp Web connection closed (status ${statusCode}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDurationPrecise(delay)}… (${errorStr})`,
);
await closeListener();
try {
await sleep(delay, abortSignal);
} catch {
break;
await controller.closeCurrentConnection();
try {
await controller.waitBeforeRetry(decision.delayMs ?? 0);
} catch {
break;
}
}
} finally {
statusController.markStopped();
process.removeListener("SIGINT", handleSigint);
await controller.shutdown();
}
statusController.markStopped();
process.removeListener("SIGINT", handleSigint);
}

View File

@@ -0,0 +1,49 @@
import type { ActiveWebListener } from "./inbound/types.js";
export type WhatsAppConnectionControllerHandle = {
getActiveListener(): ActiveWebListener | null;
};
type ConnectionRegistryState = {
controllers: Map<string, WhatsAppConnectionControllerHandle>;
};
const CONNECTION_REGISTRY_KEY = Symbol.for("openclaw.whatsapp.connectionControllerRegistry");
function getConnectionRegistryState(): ConnectionRegistryState {
const globalState = globalThis as typeof globalThis & {
[CONNECTION_REGISTRY_KEY]?: ConnectionRegistryState;
};
const existing = globalState[CONNECTION_REGISTRY_KEY];
if (existing) {
return existing;
}
const created: ConnectionRegistryState = {
controllers: new Map<string, WhatsAppConnectionControllerHandle>(),
};
globalState[CONNECTION_REGISTRY_KEY] = created;
return created;
}
export function getRegisteredWhatsAppConnectionController(
accountId: string,
): WhatsAppConnectionControllerHandle | null {
return getConnectionRegistryState().controllers.get(accountId) ?? null;
}
export function registerWhatsAppConnectionController(
accountId: string,
controller: WhatsAppConnectionControllerHandle,
): void {
getConnectionRegistryState().controllers.set(accountId, controller);
}
export function unregisterWhatsAppConnectionController(
accountId: string,
controller: WhatsAppConnectionControllerHandle,
): void {
const controllers = getConnectionRegistryState().controllers;
if (controllers.get(accountId) === controller) {
controllers.delete(accountId);
}
}

View File

@@ -0,0 +1,135 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js";
import { WhatsAppConnectionController } from "./connection-controller.js";
import { createWaSocket, waitForWaConnection } from "./session.js";
vi.mock("./session.js", async () => {
const actual = await vi.importActual<typeof import("./session.js")>("./session.js");
return {
...actual,
createWaSocket: vi.fn(),
waitForWaConnection: vi.fn(),
};
});
const createWaSocketMock = vi.mocked(createWaSocket);
const waitForWaConnectionMock = vi.mocked(waitForWaConnection);
describe("WhatsAppConnectionController", () => {
let controller: WhatsAppConnectionController;
beforeEach(() => {
vi.clearAllMocks();
controller = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: false,
heartbeatSeconds: 30,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
});
afterEach(async () => {
await controller.shutdown();
});
it("closes the socket when open fails before listener creation", async () => {
const sock = {
ws: {
close: vi.fn(),
},
};
const createListener = vi.fn();
createWaSocketMock.mockResolvedValueOnce(sock as never);
waitForWaConnectionMock.mockRejectedValueOnce(new Error("handshake failed"));
await expect(
controller.openConnection({
connectionId: "conn-1",
createListener,
}),
).rejects.toThrow("handshake failed");
expect(createListener).not.toHaveBeenCalled();
expect(sock.ws.close).toHaveBeenCalledOnce();
expect(controller.socketRef.current).toBeNull();
expect(controller.getActiveListener()).toBeNull();
});
it("keeps the previous registered controller until a replacement listener is ready", async () => {
const liveController = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: false,
heartbeatSeconds: 30,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
const liveListener = {
sendMessage: vi.fn(async () => ({ messageId: "live-msg" })),
sendPoll: vi.fn(async () => ({ messageId: "live-poll" })),
sendReaction: vi.fn(async () => {}),
sendComposingTo: vi.fn(async () => {}),
};
createWaSocketMock.mockResolvedValueOnce({ ws: { close: vi.fn() } } as never);
waitForWaConnectionMock.mockResolvedValueOnce(undefined);
await liveController.openConnection({
connectionId: "live-conn",
createListener: async () => liveListener,
});
expect(getRegisteredWhatsAppConnectionController("work")).toBe(liveController);
const replacement = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth-2",
verbose: false,
keepAlive: false,
heartbeatSeconds: 30,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
try {
createWaSocketMock.mockResolvedValueOnce({ ws: { close: vi.fn() } } as never);
waitForWaConnectionMock.mockRejectedValueOnce(new Error("replacement failed"));
await expect(
replacement.openConnection({
connectionId: "replacement-conn",
createListener: async () => liveListener,
}),
).rejects.toThrow("replacement failed");
expect(getRegisteredWhatsAppConnectionController("work")).toBe(liveController);
} finally {
await replacement.shutdown();
await liveController.shutdown();
}
});
});

View File

@@ -0,0 +1,564 @@
import { DisconnectReason, type WASocket } from "@whiskeysockets/baileys";
import { info } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import {
registerWhatsAppConnectionController,
unregisterWhatsAppConnectionController,
} from "./connection-controller-registry.js";
import type { ActiveWebListener, WebListenerCloseReason } from "./inbound/types.js";
import { computeBackoff, sleepWithAbort, type ReconnectPolicy } from "./reconnect.js";
import {
createWaSocket,
formatError,
getStatusCode,
logoutWeb,
waitForCredsSaveQueueWithTimeout,
waitForWaConnection,
} from "./session.js";
const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401;
const WHATSAPP_LOGIN_RESTART_MESSAGE =
"WhatsApp asked for a restart after pairing (code 515); waiting for creds to save…";
export const WHATSAPP_LOGGED_OUT_RELINK_MESSAGE =
"WhatsApp reported the session is logged out. Cleared cached web session; please rerun openclaw channels login and scan the QR again.";
export const WHATSAPP_LOGGED_OUT_QR_MESSAGE =
"WhatsApp reported the session is logged out. Cleared cached web session; please scan a new QR.";
type TimerHandle = ReturnType<typeof setInterval>;
type WaSocket = Awaited<ReturnType<typeof createWaSocket>>;
export type ManagedWhatsAppListener = ActiveWebListener & {
close?: () => Promise<void>;
onClose?: Promise<WebListenerCloseReason>;
signalClose?: (reason?: WebListenerCloseReason) => void;
};
export type WhatsAppLiveConnection = {
connectionId: string;
startedAt: number;
sock: WASocket;
listener: ManagedWhatsAppListener;
heartbeat: TimerHandle | null;
watchdogTimer: TimerHandle | null;
lastInboundAt: number | null;
handledMessages: number;
unregisterUnhandled: (() => void) | null;
backgroundTasks: Set<Promise<unknown>>;
closePromise: Promise<WebListenerCloseReason>;
resolveClose: (reason: WebListenerCloseReason) => void;
};
export type WhatsAppConnectionSnapshot = {
connectionId: string;
startedAt: number;
lastInboundAt: number | null;
handledMessages: number;
reconnectAttempts: number;
uptimeMs: number;
};
export type NormalizedConnectionCloseReason = {
statusCode?: number;
statusLabel: number | "unknown";
isLoggedOut: boolean;
error?: unknown;
errorText: string;
};
export type WhatsAppConnectionCloseDecision = {
action: "stop" | "retry";
delayMs?: number;
reconnectAttempts: number;
healthState: "logged-out" | "conflict" | "stopped" | "reconnecting";
normalized: NormalizedConnectionCloseReason;
};
function createNeverResolvePromise<T>(): Promise<T> {
return new Promise<T>(() => {});
}
function createLiveConnection(params: {
connectionId: string;
sock: WASocket;
listener: ManagedWhatsAppListener;
}): WhatsAppLiveConnection {
let closeResolved = false;
let resolveClosePromise = (_reason: WebListenerCloseReason) => {};
const closePromise = new Promise<WebListenerCloseReason>((resolve) => {
resolveClosePromise = (reason: WebListenerCloseReason) => {
if (closeResolved) {
return;
}
closeResolved = true;
resolve(reason);
};
});
return {
connectionId: params.connectionId,
startedAt: Date.now(),
sock: params.sock,
listener: params.listener,
heartbeat: null,
watchdogTimer: null,
lastInboundAt: null,
handledMessages: 0,
unregisterUnhandled: null,
backgroundTasks: new Set<Promise<unknown>>(),
closePromise,
resolveClose: resolveClosePromise,
};
}
export function closeWaSocket(sock: { ws?: { close?: () => void } } | null | undefined): void {
try {
sock?.ws?.close?.();
} catch {
// ignore best-effort shutdown failures
}
}
export function closeWaSocketSoon(
sock: { ws?: { close?: () => void } } | null | undefined,
delayMs = 500,
): void {
setTimeout(() => {
closeWaSocket(sock);
}, delayMs);
}
export type WhatsAppLoginWaitResult =
| {
outcome: "connected";
restarted: boolean;
sock: WaSocket;
}
| {
outcome: "logged-out";
message: string;
statusCode: number;
error: unknown;
}
| {
outcome: "failed";
message: string;
statusCode?: number;
error: unknown;
};
export async function waitForWhatsAppLoginResult(params: {
sock: WaSocket;
authDir: string;
isLegacyAuthDir: boolean;
verbose: boolean;
runtime: RuntimeEnv;
waitForConnection?: typeof waitForWaConnection;
createSocket?: typeof createWaSocket;
onSocketReplaced?: (sock: WaSocket) => void;
}): Promise<WhatsAppLoginWaitResult> {
const wait = params.waitForConnection ?? waitForWaConnection;
const createSocket = params.createSocket ?? createWaSocket;
let currentSock = params.sock;
let restarted = false;
while (true) {
try {
await wait(currentSock);
return {
outcome: "connected",
restarted,
sock: currentSock,
};
} catch (err) {
const statusCode = getStatusCode(err);
if (statusCode === 515 && !restarted) {
restarted = true;
params.runtime.log(info(WHATSAPP_LOGIN_RESTART_MESSAGE));
closeWaSocket(currentSock);
await waitForCredsSaveQueueWithTimeout(params.authDir);
try {
currentSock = await createSocket(false, params.verbose, {
authDir: params.authDir,
});
params.onSocketReplaced?.(currentSock);
continue;
} catch (createErr) {
return {
outcome: "failed",
message: formatError(createErr),
statusCode: getStatusCode(createErr),
error: createErr,
};
}
}
if (statusCode === LOGGED_OUT_STATUS) {
await logoutWeb({
authDir: params.authDir,
isLegacyAuthDir: params.isLegacyAuthDir,
runtime: params.runtime,
});
return {
outcome: "logged-out",
message: WHATSAPP_LOGGED_OUT_RELINK_MESSAGE,
statusCode: LOGGED_OUT_STATUS,
error: err,
};
}
return {
outcome: "failed",
message: formatError(err),
statusCode,
error: err,
};
}
}
}
export class WhatsAppConnectionController {
readonly accountId: string;
readonly authDir: string;
readonly socketRef: { current: WASocket | null };
private readonly reconnectPolicy: ReconnectPolicy;
private readonly heartbeatSeconds: number;
private readonly keepAlive: boolean;
private readonly messageTimeoutMs: number;
private readonly watchdogCheckMs: number;
private readonly verbose: boolean;
private readonly abortSignal?: AbortSignal;
private readonly sleep: (ms: number, signal?: AbortSignal) => Promise<void>;
private readonly isNonRetryableStatus: (statusCode: unknown) => boolean;
private readonly abortPromise?: Promise<"aborted">;
private readonly disconnectRetryController = new AbortController();
private current: WhatsAppLiveConnection | null = null;
private reconnectAttempts = 0;
constructor(params: {
accountId: string;
authDir: string;
verbose: boolean;
keepAlive: boolean;
heartbeatSeconds: number;
messageTimeoutMs: number;
watchdogCheckMs: number;
reconnectPolicy: ReconnectPolicy;
abortSignal?: AbortSignal;
sleep?: (ms: number, signal?: AbortSignal) => Promise<void>;
isNonRetryableStatus?: (statusCode: unknown) => boolean;
}) {
this.accountId = params.accountId;
this.authDir = params.authDir;
this.verbose = params.verbose;
this.keepAlive = params.keepAlive;
this.heartbeatSeconds = params.heartbeatSeconds;
this.messageTimeoutMs = params.messageTimeoutMs;
this.watchdogCheckMs = params.watchdogCheckMs;
this.reconnectPolicy = params.reconnectPolicy;
this.abortSignal = params.abortSignal;
this.sleep = params.sleep ?? ((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal));
this.isNonRetryableStatus = params.isNonRetryableStatus ?? (() => false);
this.socketRef = { current: null };
this.abortPromise =
params.abortSignal &&
new Promise<"aborted">((resolve) => {
params.abortSignal?.addEventListener("abort", () => resolve("aborted"), { once: true });
});
if (params.abortSignal?.aborted) {
this.stopDisconnectRetries();
} else {
params.abortSignal?.addEventListener("abort", () => this.stopDisconnectRetries(), {
once: true,
});
}
}
getActiveListener(): ActiveWebListener | null {
return this.current?.listener ?? null;
}
getReconnectAttempts(): number {
return this.reconnectAttempts;
}
isStopRequested(): boolean {
return this.abortSignal?.aborted === true;
}
shouldRetryDisconnect(): boolean {
return (
this.keepAlive && !this.isStopRequested() && !this.disconnectRetryController.signal.aborted
);
}
getDisconnectRetryAbortSignal(): AbortSignal {
return this.disconnectRetryController.signal;
}
noteInbound(timestamp = Date.now()): void {
if (!this.current) {
return;
}
this.current.handledMessages += 1;
this.current.lastInboundAt = timestamp;
}
getCurrentSnapshot(
connection: WhatsAppLiveConnection | null = this.current,
): WhatsAppConnectionSnapshot | null {
if (!connection) {
return null;
}
return {
connectionId: connection.connectionId,
startedAt: connection.startedAt,
lastInboundAt: connection.lastInboundAt,
handledMessages: connection.handledMessages,
reconnectAttempts: this.reconnectAttempts,
uptimeMs: Date.now() - connection.startedAt,
};
}
setUnhandledRejectionCleanup(unregister: (() => void) | null): void {
if (!this.current) {
unregister?.();
return;
}
this.current.unregisterUnhandled?.();
this.current.unregisterUnhandled = unregister;
}
async openConnection(params: {
connectionId: string;
createListener: (context: {
sock: WASocket;
connection: WhatsAppLiveConnection;
}) => Promise<ManagedWhatsAppListener>;
onHeartbeat?: (snapshot: WhatsAppConnectionSnapshot) => void;
onWatchdogTimeout?: (snapshot: WhatsAppConnectionSnapshot) => void;
}): Promise<WhatsAppLiveConnection> {
if (this.current) {
await this.closeCurrentConnection();
}
let sock: WaSocket | null = null;
let connection: WhatsAppLiveConnection | null = null;
try {
sock = await createWaSocket(false, this.verbose, {
authDir: this.authDir,
});
await waitForWaConnection(sock);
this.socketRef.current = sock;
const placeholderListener = {} as ManagedWhatsAppListener;
connection = createLiveConnection({
connectionId: params.connectionId,
sock,
listener: placeholderListener,
});
const listener = await params.createListener({ sock, connection });
connection.listener = listener;
this.current = connection;
registerWhatsAppConnectionController(this.accountId, this);
this.startTimers(connection, {
onHeartbeat: params.onHeartbeat,
onWatchdogTimeout: params.onWatchdogTimeout,
});
return connection;
} catch (err) {
if (this.socketRef.current === sock) {
this.socketRef.current = null;
}
closeWaSocket(sock);
if (connection?.unregisterUnhandled) {
connection.unregisterUnhandled();
}
throw err;
}
}
async waitForClose(): Promise<WebListenerCloseReason | "aborted"> {
const connection = this.current;
if (!connection) {
return "aborted";
}
const listenerClose =
connection.listener.onClose?.catch((err) => ({
status: 500,
isLoggedOut: false,
error: err,
})) ?? createNeverResolvePromise<WebListenerCloseReason>();
return await Promise.race([
connection.closePromise,
listenerClose,
this.abortPromise ?? createNeverResolvePromise<"aborted">(),
]);
}
normalizeCloseReason(reason: WebListenerCloseReason): NormalizedConnectionCloseReason {
const statusCode =
(typeof reason === "object" && reason && "status" in reason
? (reason as { status?: number }).status
: undefined) ?? undefined;
return {
statusCode,
statusLabel: typeof statusCode === "number" ? statusCode : "unknown",
isLoggedOut:
typeof reason === "object" &&
reason !== null &&
"isLoggedOut" in reason &&
(reason as { isLoggedOut?: boolean }).isLoggedOut === true,
error: reason?.error,
errorText: formatError(reason),
};
}
resolveCloseDecision(
reason: WebListenerCloseReason | "aborted",
): WhatsAppConnectionCloseDecision | "aborted" {
if (reason === "aborted" || this.isStopRequested()) {
return "aborted";
}
const current = this.current;
if (current && Date.now() - current.startedAt > this.heartbeatSeconds * 1000) {
this.reconnectAttempts = 0;
}
const normalized = this.normalizeCloseReason(reason);
if (normalized.isLoggedOut) {
return {
action: "stop",
reconnectAttempts: this.reconnectAttempts,
healthState: "logged-out",
normalized,
};
}
if (this.isNonRetryableStatus(normalized.statusCode)) {
return {
action: "stop",
reconnectAttempts: this.reconnectAttempts,
healthState: "conflict",
normalized,
};
}
this.reconnectAttempts += 1;
if (
this.reconnectPolicy.maxAttempts > 0 &&
this.reconnectAttempts >= this.reconnectPolicy.maxAttempts
) {
return {
action: "stop",
reconnectAttempts: this.reconnectAttempts,
healthState: "stopped",
normalized,
};
}
return {
action: "retry",
delayMs: computeBackoff(this.reconnectPolicy, this.reconnectAttempts),
reconnectAttempts: this.reconnectAttempts,
healthState: "reconnecting",
normalized,
};
}
forceClose(reason: WebListenerCloseReason): void {
const connection = this.current;
if (!connection) {
return;
}
connection.resolveClose(reason);
connection.listener.signalClose?.(reason);
}
async closeCurrentConnection(): Promise<void> {
const connection = this.current;
if (!connection) {
return;
}
this.current = null;
if (this.socketRef.current === connection.sock) {
this.socketRef.current = null;
}
connection.unregisterUnhandled?.();
if (connection.heartbeat) {
clearInterval(connection.heartbeat);
}
if (connection.watchdogTimer) {
clearInterval(connection.watchdogTimer);
}
if (connection.backgroundTasks.size > 0) {
await Promise.allSettled(connection.backgroundTasks);
connection.backgroundTasks.clear();
}
try {
await connection.listener.close?.();
} catch {
// best-effort close
}
closeWaSocket(connection.sock);
}
async waitBeforeRetry(delayMs: number): Promise<void> {
await this.sleep(delayMs, this.abortSignal);
}
async shutdown(): Promise<void> {
this.stopDisconnectRetries();
await this.closeCurrentConnection();
unregisterWhatsAppConnectionController(this.accountId, this);
}
private startTimers(
connection: WhatsAppLiveConnection,
hooks: {
onHeartbeat?: (snapshot: WhatsAppConnectionSnapshot) => void;
onWatchdogTimeout?: (snapshot: WhatsAppConnectionSnapshot) => void;
},
): void {
if (!this.keepAlive) {
return;
}
connection.heartbeat = setInterval(() => {
const snapshot = this.getCurrentSnapshot(connection);
if (!snapshot) {
return;
}
hooks.onHeartbeat?.(snapshot);
}, this.heartbeatSeconds * 1000);
connection.watchdogTimer = setInterval(() => {
const baselineAt = connection.lastInboundAt ?? connection.startedAt;
const staleForMs = Date.now() - baselineAt;
if (staleForMs <= this.messageTimeoutMs) {
return;
}
const snapshot = this.getCurrentSnapshot(connection);
if (!snapshot) {
return;
}
hooks.onWatchdogTimeout?.(snapshot);
this.forceClose({
status: 499,
isLoggedOut: false,
error: "watchdog-timeout",
});
}, this.watchdogCheckMs);
}
private stopDisconnectRetries(): void {
if (!this.disconnectRetryController.signal.aborted) {
this.disconnectRetryController.abort();
}
}
}

View File

@@ -43,7 +43,7 @@ function shouldClearSocketRefAfterSendFailure(err: unknown): boolean {
return /closed|reset|disconnect|no active socket/i.test(formatError(err));
}
export async function monitorWebInbox(options: {
export type MonitorWebInboxOptions = {
verbose: boolean;
accountId: string;
authDir: string;
@@ -71,13 +71,16 @@ export async function monitorWebInbox(options: {
};
/** Abort in-flight reconnect waits when shutdown becomes terminal. */
disconnectRetryAbortSignal?: AbortSignal;
}) {
};
export async function attachWebInboxToSocket(
options: MonitorWebInboxOptions & {
sock: WASocket;
},
) {
const inboundLogger = getChildLogger({ module: "web-inbound" });
const inboundConsoleLog = createSubsystemLogger("gateway/channels/whatsapp").child("inbound");
const sock = await createWaSocket(false, options.verbose, {
authDir: options.authDir,
});
await waitForWaConnection(sock);
const sock = options.sock;
const connectedAtMs = Date.now();
if (options.socketRef) {
options.socketRef.current = sock;
@@ -654,3 +657,14 @@ export async function monitorWebInbox(options: {
...sendApi,
} as const;
}
export async function monitorWebInbox(options: MonitorWebInboxOptions) {
const sock = await createWaSocket(false, options.verbose, {
authDir: options.authDir,
});
await waitForWaConnection(sock);
return attachWebInboxToSocket({
...options,
sock,
});
}

View File

@@ -1,7 +1,7 @@
import type { AnyMessageContent, WAPresence } from "@whiskeysockets/baileys";
import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime";
import type { ActiveWebSendOptions } from "../active-listener.js";
import { toWhatsappJid } from "../text-runtime.js";
import type { ActiveWebSendOptions } from "./types.js";
function recordWhatsAppOutbound(accountId: string) {
recordChannelActivity({

View File

@@ -1,5 +1,6 @@
import type { AnyMessageContent } from "@whiskeysockets/baileys";
import type { NormalizedLocation } from "openclaw/plugin-sdk/channel-inbound";
import type { PollInput } from "openclaw/plugin-sdk/media-runtime";
import type { WhatsAppIdentity, WhatsAppReplyContext, WhatsAppSelfIdentity } from "../identity.js";
export type WebListenerCloseReason = {
@@ -8,6 +9,32 @@ export type WebListenerCloseReason = {
error?: unknown;
};
export type ActiveWebSendOptions = {
gifPlayback?: boolean;
accountId?: string;
fileName?: string;
};
export type ActiveWebListener = {
sendMessage: (
to: string,
text: string,
mediaBuffer?: Buffer,
mediaType?: string,
options?: ActiveWebSendOptions,
) => Promise<{ messageId: string }>;
sendPoll: (to: string, poll: PollInput) => Promise<{ messageId: string }>;
sendReaction: (
chatJid: string,
messageId: string,
emoji: string,
fromMe: boolean,
participant?: string,
) => Promise<void>;
sendComposingTo: (to: string) => Promise<void>;
close?: () => Promise<void>;
};
export type WebInboundMessage = {
id?: string;
from: string; // conversation id: E.164 for direct chats, group JID for groups

View File

@@ -91,4 +91,39 @@ describe("login-qr", () => {
expect(createWaSocketMock).toHaveBeenCalledTimes(2);
expect(logoutWebMock).not.toHaveBeenCalled();
});
it("clears auth and reports a relink message when WhatsApp is logged out", async () => {
waitForWaConnectionMock.mockRejectedValueOnce({
output: { statusCode: 401 },
});
const start = await startWebLoginWithQr({ timeoutMs: 5000 });
expect(start.qrDataUrl).toBe("data:image/png;base64,base64");
const result = await waitForWebLogin({ timeoutMs: 5000 });
expect(result).toEqual({
connected: false,
message:
"WhatsApp reported the session is logged out. Cleared cached web session; please scan a new QR.",
});
expect(logoutWebMock).toHaveBeenCalledOnce();
});
it("turns unexpected login cleanup failures into a normal login error", async () => {
waitForWaConnectionMock.mockRejectedValueOnce({
output: { statusCode: 401 },
});
logoutWebMock.mockRejectedValueOnce(new Error("cleanup failed"));
const start = await startWebLoginWithQr({ timeoutMs: 5000 });
expect(start.qrDataUrl).toBe("data:image/png;base64,base64");
const result = await waitForWebLogin({ timeoutMs: 5000 });
expect(result).toEqual({
connected: false,
message: "WhatsApp login failed: cleanup failed",
});
});
});

View File

@@ -1,23 +1,16 @@
import { randomUUID } from "node:crypto";
import { DisconnectReason } from "@whiskeysockets/baileys";
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import { danger, info, success } from "openclaw/plugin-sdk/runtime-env";
import { defaultRuntime, type RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { logInfo } from "openclaw/plugin-sdk/text-runtime";
import { resolveWhatsAppAccount } from "./accounts.js";
import { renderQrPngBase64 } from "./qr-image.js";
import {
createWaSocket,
formatError,
getStatusCode,
logoutWeb,
readWebSelfId,
waitForCredsSaveQueueWithTimeout,
waitForWaConnection,
webAuthExists,
} from "./session.js";
const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401;
closeWaSocket,
waitForWhatsAppLoginResult,
WHATSAPP_LOGGED_OUT_QR_MESSAGE,
} from "./connection-controller.js";
import { renderQrPngBase64 } from "./qr-image.js";
import { createWaSocket, readWebSelfId, webAuthExists } from "./session.js";
type WaSocket = Awaited<ReturnType<typeof createWaSocket>>;
@@ -34,19 +27,15 @@ type ActiveLogin = {
error?: string;
errorStatus?: number;
waitPromise: Promise<void>;
restartAttempted: boolean;
verbose: boolean;
runtime: RuntimeEnv;
};
const ACTIVE_LOGIN_TTL_MS = 3 * 60_000;
const activeLogins = new Map<string, ActiveLogin>();
function closeSocket(sock: WaSocket) {
try {
sock.ws?.close();
} catch {
// ignore
}
closeWaSocket(sock);
}
async function resetActiveLogin(accountId: string, reason?: string) {
@@ -65,50 +54,45 @@ function isLoginFresh(login: ActiveLogin) {
}
function attachLoginWaiter(accountId: string, login: ActiveLogin) {
login.waitPromise = waitForWaConnection(login.sock)
.then(() => {
login.waitPromise = waitForWhatsAppLoginResult({
sock: login.sock,
authDir: login.authDir,
isLegacyAuthDir: login.isLegacyAuthDir,
verbose: login.verbose,
runtime: login.runtime,
onSocketReplaced: (sock) => {
const current = activeLogins.get(accountId);
if (current?.id === login.id) {
current.connected = true;
current.sock = sock;
current.connected = false;
current.error = undefined;
current.errorStatus = undefined;
}
},
})
.then((result) => {
const current = activeLogins.get(accountId);
if (current?.id !== login.id) {
return;
}
if (result.outcome === "connected") {
current.sock = result.sock;
current.connected = true;
return;
}
current.error = result.message;
current.errorStatus = result.statusCode;
})
.catch((err) => {
const current = activeLogins.get(accountId);
if (current?.id !== login.id) {
return;
}
current.error = formatError(err);
current.errorStatus = getStatusCode(err);
current.error = err instanceof Error ? err.message : String(err);
current.errorStatus = undefined;
});
}
async function restartLoginSocket(login: ActiveLogin, runtime: RuntimeEnv) {
if (login.restartAttempted) {
return false;
}
login.restartAttempted = true;
runtime.log(
info("WhatsApp asked for a restart after pairing (code 515); waiting for creds to save…"),
);
closeSocket(login.sock);
await waitForCredsSaveQueueWithTimeout(login.authDir);
try {
const sock = await createWaSocket(false, login.verbose, {
authDir: login.authDir,
});
login.sock = sock;
login.connected = false;
login.error = undefined;
login.errorStatus = undefined;
attachLoginWaiter(login.accountId, login);
return true;
} catch (err) {
login.error = formatError(err);
login.errorStatus = getStatusCode(err);
return false;
}
}
export async function startWebLoginWithQr(
opts: {
verbose?: boolean;
@@ -189,8 +173,8 @@ export async function startWebLoginWithQr(
startedAt: Date.now(),
connected: false,
waitPromise: Promise.resolve(),
restartAttempted: false,
verbose: Boolean(opts.verbose),
runtime,
};
activeLogins.set(account.accountId, login);
if (pendingQr && !login.qr) {
@@ -263,24 +247,12 @@ export async function waitForWebLogin(
}
if (login.error) {
if (login.errorStatus === LOGGED_OUT_STATUS) {
await logoutWeb({
authDir: login.authDir,
isLegacyAuthDir: login.isLegacyAuthDir,
runtime,
});
const message =
"WhatsApp reported the session is logged out. Cleared cached web session; please scan a new QR.";
if (login.errorStatus === 401) {
const message = WHATSAPP_LOGGED_OUT_QR_MESSAGE;
await resetActiveLogin(account.accountId, message);
runtime.log(danger(message));
return { connected: false, message };
}
if (login.errorStatus === 515) {
const restarted = await restartLoginSocket(login, runtime);
if (restarted && isLoginFresh(login)) {
continue;
}
}
const message = `WhatsApp login failed: ${login.error}`;
await resetActiveLogin(account.accountId, message);
runtime.log(danger(message));

View File

@@ -1,20 +1,11 @@
import { DisconnectReason } from "@whiskeysockets/baileys";
import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime";
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import { danger, info, success } from "openclaw/plugin-sdk/runtime-env";
import { danger, success } from "openclaw/plugin-sdk/runtime-env";
import { defaultRuntime, type RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { logInfo } from "openclaw/plugin-sdk/text-runtime";
import { resolveWhatsAppAccount } from "./accounts.js";
import {
createWaSocket,
formatError,
getStatusCode,
logoutWeb,
waitForCredsSaveQueueWithTimeout,
waitForWaConnection,
} from "./session.js";
const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401;
import { closeWaSocketSoon, waitForWhatsAppLoginResult } from "./connection-controller.js";
import { createWaSocket, waitForWaConnection } from "./session.js";
export async function loginWeb(
verbose: boolean,
@@ -22,63 +13,50 @@ export async function loginWeb(
runtime: RuntimeEnv = defaultRuntime,
accountId?: string,
) {
const wait = waitForConnection ?? waitForWaConnection;
const cfg = loadConfig();
const account = resolveWhatsAppAccount({ cfg, accountId });
const sock = await createWaSocket(true, verbose, {
let sock = await createWaSocket(true, verbose, {
authDir: account.authDir,
});
logInfo("Waiting for WhatsApp connection...", runtime);
try {
await wait(sock);
console.log(success("✅ Linked! Credentials saved for future sends."));
} catch (err) {
const code = getStatusCode(err);
if (code === 515) {
const result = await waitForWhatsAppLoginResult({
sock,
authDir: account.authDir,
isLegacyAuthDir: account.isLegacyAuthDir,
verbose,
runtime,
waitForConnection,
onSocketReplaced: (replacementSock) => {
sock = replacementSock;
},
});
if (result.outcome === "connected") {
console.log(
info("WhatsApp asked for a restart after pairing (code 515); waiting for creds to save…"),
success(
result.restarted
? "✅ Linked after restart; web session ready."
: "✅ Linked! Credentials saved for future sends.",
),
);
try {
sock.ws?.close();
} catch {
// ignore
}
await waitForCredsSaveQueueWithTimeout(account.authDir);
const retry = await createWaSocket(false, verbose, {
authDir: account.authDir,
});
try {
await wait(retry);
console.log(success("✅ Linked after restart; web session ready."));
return;
} finally {
setTimeout(() => retry.ws?.close(), 500);
}
return;
}
if (code === LOGGED_OUT_STATUS) {
await logoutWeb({
authDir: account.authDir,
isLegacyAuthDir: account.isLegacyAuthDir,
runtime,
});
if (result.outcome === "logged-out") {
console.error(
danger(
`WhatsApp reported the session is logged out. Cleared cached web session; please rerun ${formatCliCommand("openclaw channels login")} and scan the QR again.`,
),
);
throw new Error("Session logged out; cache cleared. Re-run login.", { cause: err });
throw new Error("Session logged out; cache cleared. Re-run login.", {
cause: result.error,
});
}
const formatted = formatError(err);
console.error(danger(`WhatsApp Web connection ended before fully opening. ${formatted}`));
throw new Error(formatted, { cause: err });
console.error(danger(`WhatsApp Web connection ended before fully opening. ${result.message}`));
throw new Error(result.message, { cause: result.error });
} finally {
// Let Baileys flush any final events before closing the socket.
setTimeout(() => {
try {
sock.ws?.close();
} catch {
// ignore
}
}, 500);
closeWaSocketSoon(sock);
}
}

View File

@@ -333,6 +333,47 @@ describe("web monitor inbox", () => {
await listener.close();
});
it("flushes pending debounced inbound batches after close", async () => {
vi.useFakeTimers();
try {
const onMessage = vi.fn(async () => undefined);
const { listener, sock } = await startInboxMonitor(onMessage as InboxOnMessage, {
debounceMs: 50,
});
sock.ev.emit(
"messages.upsert",
buildNotifyMessageUpsert({
id: nextMessageId("debounce-close-1"),
remoteJid: "999@s.whatsapp.net",
text: "first",
timestamp: 1_700_000_000,
pushName: "Tester",
}),
);
sock.ev.emit(
"messages.upsert",
buildNotifyMessageUpsert({
id: nextMessageId("debounce-close-2"),
remoteJid: "999@s.whatsapp.net",
text: "second",
timestamp: 1_700_000_001,
pushName: "Tester",
}),
);
await listener.close();
await vi.advanceTimersByTimeAsync(50);
await waitForMessageCalls(onMessage, 1);
expect(onMessage).toHaveBeenCalledWith(
expect.objectContaining({
body: "first\nsecond",
}),
);
} finally {
vi.useRealTimers();
}
});
it("retries timed-out sends on the same socket without clearing the socket ref", async () => {
const onMessage = vi.fn(async () => undefined);
const socketRef = createSocketRef();

View File

@@ -5,18 +5,36 @@ import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { redactIdentifier } from "openclaw/plugin-sdk/logging-core";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import type { ActiveWebListener } from "./inbound/types.js";
const hoisted = vi.hoisted(() => ({
loadOutboundMediaFromUrl: vi.fn(),
controllerListeners: new Map<string, ActiveWebListener>(),
}));
const loadWebMediaMock = vi.fn();
let sendMessageWhatsApp: typeof import("./send.js").sendMessageWhatsApp;
let sendPollWhatsApp: typeof import("./send.js").sendPollWhatsApp;
let sendReactionWhatsApp: typeof import("./send.js").sendReactionWhatsApp;
let setActiveWebListener: typeof import("./active-listener.js").setActiveWebListener;
let resetLogger: typeof import("openclaw/plugin-sdk/runtime-env").resetLogger;
let setLoggerOverride: typeof import("openclaw/plugin-sdk/runtime-env").setLoggerOverride;
vi.mock("./connection-controller-registry.js", async () => {
const actual = await vi.importActual<typeof import("./connection-controller-registry.js")>(
"./connection-controller-registry.js",
);
return {
...actual,
getRegisteredWhatsAppConnectionController: vi.fn((accountId: string) => {
const listener = hoisted.controllerListeners.get(accountId) ?? null;
return listener
? {
getActiveListener: () => listener,
}
: null;
}),
};
});
vi.mock("./outbound-media.runtime.js", async () => {
const actual = await vi.importActual<typeof import("./outbound-media.runtime.js")>(
"./outbound-media.runtime.js",
@@ -35,7 +53,6 @@ describe("web outbound", () => {
beforeAll(async () => {
({ sendMessageWhatsApp, sendPollWhatsApp, sendReactionWhatsApp } = await import("./send.js"));
({ setActiveWebListener } = await import("./active-listener.js"));
({ resetLogger, setLoggerOverride } = await import("openclaw/plugin-sdk/runtime-env"));
});
@@ -61,7 +78,8 @@ describe("web outbound", () => {
hostReadCapability: Boolean(options?.mediaAccess?.readFile ?? options?.mediaReadFile),
}),
);
setActiveWebListener({
hoisted.controllerListeners.clear();
hoisted.controllerListeners.set("default", {
sendComposingTo,
sendMessage,
sendPoll,
@@ -72,8 +90,7 @@ describe("web outbound", () => {
afterEach(() => {
resetLogger();
setLoggerOverride(null);
setActiveWebListener(null);
setActiveWebListener("work", null);
hoisted.controllerListeners.clear();
});
it("sends message via active listener", async () => {
@@ -87,8 +104,8 @@ describe("web outbound", () => {
});
it("uses configured defaultAccount when outbound accountId is omitted", async () => {
setActiveWebListener(null);
setActiveWebListener("work", {
hoisted.controllerListeners.clear();
hoisted.controllerListeners.set("work", {
sendComposingTo,
sendMessage,
sendPoll,
@@ -145,7 +162,7 @@ describe("web outbound", () => {
});
it("throws a helpful error when no active listener exists", async () => {
setActiveWebListener(null);
hoisted.controllerListeners.clear();
await expect(
sendMessageWhatsApp("+1555", "hi", { verbose: false, accountId: "work" }),
).rejects.toThrow(/No active WhatsApp Web listener/);
@@ -259,7 +276,7 @@ describe("web outbound", () => {
});
it("uses account-aware WhatsApp media caps for outbound uploads", async () => {
setActiveWebListener("work", {
hoisted.controllerListeners.set("work", {
sendComposingTo,
sendMessage,
sendPoll,

View File

@@ -1,3 +1,4 @@
import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime";
import { loadConfig, type OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/config-runtime";
import { generateSecureUuid } from "openclaw/plugin-sdk/core";
@@ -11,7 +12,8 @@ import {
resolveWhatsAppAccount,
resolveWhatsAppMediaMaxBytes,
} from "./accounts.js";
import { type ActiveWebSendOptions, requireActiveWebListener } from "./active-listener.js";
import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js";
import type { ActiveWebListener, ActiveWebSendOptions } from "./inbound/types.js";
import { loadOutboundMediaFromUrl } from "./outbound-media.runtime.js";
import { markdownToWhatsApp, toWhatsappJid } from "./text-runtime.js";
@@ -28,6 +30,22 @@ function resolveOutboundWhatsAppAccountId(params: {
return resolveDefaultWhatsAppAccountId(params.cfg);
}
function requireOutboundActiveWebListener(params: { cfg: OpenClawConfig; accountId?: string }): {
accountId: string;
listener: ActiveWebListener;
} {
const accountId = resolveOutboundWhatsAppAccountId(params);
const resolvedAccountId = accountId ?? resolveDefaultWhatsAppAccountId(params.cfg);
const listener =
getRegisteredWhatsAppConnectionController(resolvedAccountId)?.getActiveListener() ?? null;
if (!listener) {
throw new Error(
`No active WhatsApp Web listener (account: ${resolvedAccountId}). Start the gateway, then link WhatsApp with: ${formatCliCommand(`openclaw channels login --channel whatsapp --account ${resolvedAccountId}`)}.`,
);
}
return { accountId: resolvedAccountId, listener };
}
export async function sendMessageWhatsApp(
to: string,
body: string,
@@ -60,12 +78,10 @@ export async function sendMessageWhatsApp(
const correlationId = generateSecureUuid();
const startedAt = Date.now();
const cfg = options.cfg ?? loadConfig();
const effectiveAccountId = resolveOutboundWhatsAppAccountId({
const { listener: active, accountId: resolvedAccountId } = requireOutboundActiveWebListener({
cfg,
accountId: options.accountId,
});
const { listener: active, accountId: resolvedAccountId } =
requireActiveWebListener(effectiveAccountId);
const account = resolveWhatsAppAccount({
cfg,
accountId: resolvedAccountId ?? options.accountId,
@@ -158,11 +174,10 @@ export async function sendReactionWhatsApp(
): Promise<void> {
const correlationId = generateSecureUuid();
const cfg = loadConfig();
const effectiveAccountId = resolveOutboundWhatsAppAccountId({
const { listener: active } = requireOutboundActiveWebListener({
cfg,
accountId: options.accountId,
});
const { listener: active } = requireActiveWebListener(effectiveAccountId);
const redactedChatJid = redactIdentifier(chatJid);
const logger = getChildLogger({
module: "web-outbound",
@@ -201,11 +216,10 @@ export async function sendPollWhatsApp(
const correlationId = generateSecureUuid();
const startedAt = Date.now();
const cfg = options.cfg ?? loadConfig();
const effectiveAccountId = resolveOutboundWhatsAppAccountId({
const { listener: active } = requireOutboundActiveWebListener({
cfg,
accountId: options.accountId,
});
const { listener: active } = requireActiveWebListener(effectiveAccountId);
const redactedTo = redactIdentifier(to);
const logger = getChildLogger({
module: "web-outbound",

View File

@@ -11,7 +11,7 @@ type LightModule = {
};
type HeavyModule = {
setActiveWebListener: (
registerControllerForTest: (
accountId: string | null | undefined,
listener: { sendMessage: () => Promise<{ messageId: string }> } | null,
) => void;
@@ -48,23 +48,35 @@ function createBundledWhatsAppRuntimeFixture() {
[bundledDistPluginFile("whatsapp", "light-runtime-api.js")]:
'export { getActiveWebListener } from "../../active-listener.js";\n',
[bundledDistPluginFile("whatsapp", "runtime-api.js")]:
'export { getActiveWebListener, setActiveWebListener } from "../../active-listener.js";\n',
"dist/active-listener.js": [
'const key = Symbol.for("openclaw.whatsapp.activeListenerState");',
'export { registerControllerForTest } from "../../connection-controller-registry.js";\n',
"dist/connection-controller-registry.js": [
'const key = Symbol.for("openclaw.whatsapp.connectionControllerRegistry");',
"const g = globalThis;",
"if (!g[key]) {",
" g[key] = { listeners: new Map(), current: null };",
" g[key] = { controllers: new Map() };",
"}",
"const state = g[key];",
"export function setActiveWebListener(accountIdOrListener, maybeListener) {",
' const accountId = typeof accountIdOrListener === "string" ? accountIdOrListener : "default";',
' const listener = typeof accountIdOrListener === "string" ? (maybeListener ?? null) : (accountIdOrListener ?? null);',
" if (!listener) state.listeners.delete(accountId);",
" else state.listeners.set(accountId, listener);",
' if (accountId === "default") state.current = listener;',
"export function getRegisteredWhatsAppConnectionController(accountId) {",
" return state.controllers.get(accountId) ?? null;",
"}",
"export function registerControllerForTest(accountId, listener) {",
' const id = accountId ?? "default";',
" if (!listener) {",
" state.controllers.delete(id);",
" return;",
" }",
" state.controllers.set(id, {",
" getActiveListener() {",
" return listener;",
" },",
" });",
"}",
"",
].join("\n"),
"dist/active-listener.js": [
'import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js";',
"export function getActiveWebListener(accountId) {",
' return state.listeners.get(accountId ?? "default") ?? null;',
' return getRegisteredWhatsAppConnectionController(accountId ?? "default")?.getActiveListener() ?? null;',
"}",
"",
].join("\n"),
@@ -100,9 +112,9 @@ function expectSharedWhatsAppListenerState(runtimePluginDir: string, accountId:
const { light, heavy } = loadWhatsAppBoundaryModules(runtimePluginDir);
const listener = createListener();
heavy.setActiveWebListener(accountId, listener);
heavy.registerControllerForTest(accountId, listener);
expect(light.getActiveWebListener(accountId)).toBe(listener);
heavy.setActiveWebListener(accountId, null);
heavy.registerControllerForTest(accountId, null);
}
afterEach(() => {