diff --git a/CHANGELOG.md b/CHANGELOG.md index c8984bd64d0..07c1ed098d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ Docs: https://docs.openclaw.ai - Memory/QMD: allow channel sessions in the shipped default QMD scope, while still denying groups. - Memory/QMD: stop registering the legacy lowercase root memory file as a separate default collection, so QMD now prefers `MEMORY.md` and the `memory/` tree without duplicate collection-add warnings. - Memory/memory-core: watch the `memory` directory directly and ignore non-markdown churn so nested note changes still sync on macOS + Node 25 environments where recursive `memory/**/*.md` glob watching fails. (#64711) Thanks @jasonxargs-boop and @vincentkoc. +- WhatsApp: centralize per-account connection ownership so reconnects, login recovery, and outbound readiness stay attached to the live socket instead of drifting across monitor and login paths. (#65290) Thanks @mcaxtr and @vincentkoc. ## 2026.4.11 diff --git a/extensions/whatsapp/src/active-listener.test.ts b/extensions/whatsapp/src/active-listener.test.ts index c5c6d103db5..ed79bcdc20d 100644 --- a/extensions/whatsapp/src/active-listener.test.ts +++ b/extensions/whatsapp/src/active-listener.test.ts @@ -1,8 +1,5 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -// Mock loadConfig so the single-arg setActiveWebListener overload resolves -// the configured default account as "work" (matching the regression test). -// All other tests pass explicit accountIds and are unaffected by this mock. vi.mock("openclaw/plugin-sdk/config-runtime", () => ({ loadConfig: () => ({ channels: { whatsapp: { accounts: { work: { enabled: true } }, defaultAccount: "work" } }, @@ -17,14 +14,6 @@ async function importActiveListenerModule(cacheBust: string): Promise { - const mod = await importActiveListenerModule(`cleanup-${Date.now()}`); - mod.setActiveWebListener(null); - mod.setActiveWebListener("work", null); - mod.setActiveWebListener("default", null); -}); - -/** Minimal listener stub */ function makeListener() { return { sendMessage: vi.fn(async () => ({ messageId: "msg-1" })), @@ -34,92 +23,53 @@ function makeListener() { }; } -describe("active WhatsApp listener singleton", () => { - it("shares listeners across duplicate module instances (bundle-fragmentation fix)", async () => { - // Simulates the scenario where two bundled copies of active-listener.ts are loaded - // (e.g. channel-web-*.js calls setActiveWebListener, outbound-*.js calls - // requireActiveWebListener). Without resolveGlobalSingleton they would each hold - // their own Map and the listener would never be found by the outbound path. +afterEach(() => { + vi.doUnmock("./connection-controller-registry.js"); +}); + +describe("active WhatsApp listener view", () => { + it("reads controller-backed state across duplicate module instances", async () => { + const listener = makeListener(); + vi.doMock("./connection-controller-registry.js", () => ({ + getRegisteredWhatsAppConnectionController: (accountId: string) => + accountId === "work" + ? { + getActiveListener: () => listener, + } + : null, + })); + const first = await importActiveListenerModule(`first-${Date.now()}`); const second = await importActiveListenerModule(`second-${Date.now()}`); - const listener = makeListener(); - - first.setActiveWebListener("work", listener); + expect(first.getActiveWebListener("work")).toBe(listener); expect(second.getActiveWebListener("work")).toBe(listener); - expect(second.requireActiveWebListener("work")).toEqual({ - accountId: "work", - listener, - }); }); - it("single-arg overload registers under configured default account, not always 'default'", async () => { - // Regression: setActiveWebListener(listener) used DEFAULT_ACCOUNT_ID ("default") - // even when the configured default account is named "work". This caused - // requireActiveWebListener("work") to throw while the listener was silently - // registered under the wrong key. - const mod = await importActiveListenerModule(`named-account-${Date.now()}`); + it("resolves the configured default account when accountId is omitted", async () => { const listener = makeListener(); + vi.doMock("./connection-controller-registry.js", () => ({ + getRegisteredWhatsAppConnectionController: (accountId: string) => + accountId === "work" + ? { + getActiveListener: () => listener, + } + : null, + })); - // Single-arg call — should resolve accountId from loadConfig() default, which - // vitest config maps to "work" (see mock below). - mod.setActiveWebListener(listener); + const mod = await importActiveListenerModule(`default-${Date.now()}`); - // "work" must be resolvable — previously this threw - expect(mod.requireActiveWebListener("work")).toEqual({ - accountId: "work", - listener, - }); + expect(mod.resolveWebAccountId()).toBe("work"); + expect(mod.getActiveWebListener()).toBe(listener); }); - it("single-arg overload still works when default account is 'default'", async () => { - // Backward-compat: configs that rely on the "default" account name must - // continue to work after the fix. Use single-arg overload with a temporary - // spy that returns "default" as the configured default account. - const configRuntime = await import("openclaw/plugin-sdk/config-runtime"); - const spy = vi.spyOn(configRuntime, "loadConfig").mockReturnValue({ - channels: { - whatsapp: { accounts: { default: { enabled: true } }, defaultAccount: "default" }, - }, - } as ReturnType); + it("returns null when the controller has no active listener for the account", async () => { + vi.doMock("./connection-controller-registry.js", () => ({ + getRegisteredWhatsAppConnectionController: () => null, + })); - try { - const mod = await importActiveListenerModule(`default-account-${Date.now()}`); - const listener = makeListener(); - - // Single-arg call — should resolve to "default" via the spy - mod.setActiveWebListener(listener); - - expect(mod.requireActiveWebListener("default")).toEqual({ - accountId: "default", - listener, - }); - // The legacy no-arg lookup (undefined → "default") must also work - expect(mod.requireActiveWebListener()).toEqual({ - accountId: "default", - listener, - }); - } finally { - spy.mockRestore(); - } - }); - - it("requireActiveWebListener throws a clear error when listener is missing", async () => { const mod = await importActiveListenerModule(`missing-${Date.now()}`); - expect(() => mod.requireActiveWebListener("work")).toThrowError( - /No active WhatsApp Web listener \(account: work\)/, - ); - }); - - it("setActiveWebListener with null removes the listener", async () => { - const mod = await importActiveListenerModule(`remove-${Date.now()}`); - const listener = makeListener(); - - mod.setActiveWebListener("work", listener); - expect(mod.getActiveWebListener("work")).toBe(listener); - - mod.setActiveWebListener("work", null); expect(mod.getActiveWebListener("work")).toBeNull(); }); }); diff --git a/extensions/whatsapp/src/active-listener.ts b/extensions/whatsapp/src/active-listener.ts index c1e68a20c41..8dbf727a40b 100644 --- a/extensions/whatsapp/src/active-listener.ts +++ b/extensions/whatsapp/src/active-listener.ts @@ -1,109 +1,15 @@ -import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime"; import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; -import type { PollInput } from "openclaw/plugin-sdk/media-runtime"; -import { DEFAULT_ACCOUNT_ID } from "openclaw/plugin-sdk/routing"; import { resolveDefaultWhatsAppAccountId } from "./accounts.js"; +import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js"; +import type { ActiveWebListener, ActiveWebSendOptions } from "./inbound/types.js"; -export type ActiveWebSendOptions = { - gifPlayback?: boolean; - accountId?: string; - fileName?: string; -}; - -export type ActiveWebListener = { - sendMessage: ( - to: string, - text: string, - mediaBuffer?: Buffer, - mediaType?: string, - options?: ActiveWebSendOptions, - ) => Promise<{ messageId: string }>; - sendPoll: (to: string, poll: PollInput) => Promise<{ messageId: string }>; - sendReaction: ( - chatJid: string, - messageId: string, - emoji: string, - fromMe: boolean, - participant?: string, - ) => Promise; - sendComposingTo: (to: string) => Promise; - close?: () => Promise; -}; - -// WhatsApp shares a live Baileys socket between inbound and outbound runtime -// chunks. Keep this on a direct globalThis symbol lookup; the generic -// singleton helper was previously inlined during code-splitting and split the -// listener state back into per-chunk Maps. -const WHATSAPP_ACTIVE_LISTENER_STATE_KEY = Symbol.for("openclaw.whatsapp.activeListenerState"); - -type ActiveListenerState = { - listeners: Map; - current: ActiveWebListener | null; -}; - -const g = globalThis as unknown as Record; -if (!g[WHATSAPP_ACTIVE_LISTENER_STATE_KEY]) { - g[WHATSAPP_ACTIVE_LISTENER_STATE_KEY] = { - listeners: new Map(), - current: null, - }; -} -const state = g[WHATSAPP_ACTIVE_LISTENER_STATE_KEY]; - -function setCurrentListener(listener: ActiveWebListener | null): void { - state.current = listener; -} +export type { ActiveWebListener, ActiveWebSendOptions } from "./inbound/types.js"; export function resolveWebAccountId(accountId?: string | null): string { return (accountId ?? "").trim() || resolveDefaultWhatsAppAccountId(loadConfig()); } -export function requireActiveWebListener(accountId?: string | null): { - accountId: string; - listener: ActiveWebListener; -} { - const id = resolveWebAccountId(accountId); - const listener = state.listeners.get(id) ?? null; - if (!listener) { - throw new Error( - `No active WhatsApp Web listener (account: ${id}). Start the gateway, then link WhatsApp with: ${formatCliCommand(`openclaw channels login --channel whatsapp --account ${id}`)}.`, - ); - } - return { accountId: id, listener }; -} - -export function setActiveWebListener(listener: ActiveWebListener | null): void; -export function setActiveWebListener( - accountId: string | null | undefined, - listener: ActiveWebListener | null, -): void; -export function setActiveWebListener( - accountIdOrListener: string | ActiveWebListener | null | undefined, - maybeListener?: ActiveWebListener | null, -): void { - const { accountId, listener } = - typeof accountIdOrListener === "string" - ? { accountId: accountIdOrListener, listener: maybeListener ?? null } - : { - // Resolve the configured default account name so that callers using the - // single-arg overload register under the right key (e.g. "work"), not - // always under DEFAULT_ACCOUNT_ID ("default"). - accountId: resolveDefaultWhatsAppAccountId(loadConfig()), - listener: accountIdOrListener ?? null, - }; - - const id = resolveWebAccountId(accountId); - if (!listener) { - state.listeners.delete(id); - } else { - state.listeners.set(id, listener); - } - if (id === DEFAULT_ACCOUNT_ID) { - setCurrentListener(listener); - } -} - export function getActiveWebListener(accountId?: string | null): ActiveWebListener | null { const id = resolveWebAccountId(accountId); - return state.listeners.get(id) ?? null; + return getRegisteredWhatsAppConnectionController(id)?.getActiveListener() ?? null; } diff --git a/extensions/whatsapp/src/auto-reply.test-harness.ts b/extensions/whatsapp/src/auto-reply.test-harness.ts index ab57b9ac38f..6f419736720 100644 --- a/extensions/whatsapp/src/auto-reply.test-harness.ts +++ b/extensions/whatsapp/src/auto-reply.test-harness.ts @@ -40,6 +40,22 @@ type WebAutoReplyMonitorHarness = { export const TEST_NET_IP = "93.184.216.34"; +vi.mock("./session.js", async () => { + const actual = await vi.importActual("./session.js"); + return { + ...actual, + createWaSocket: vi.fn(async () => ({ + ev: { + on: vi.fn(), + off: vi.fn(), + }, + ws: { close: vi.fn() }, + user: { id: "123@s.whatsapp.net" }, + })), + waitForWaConnection: vi.fn().mockResolvedValue(undefined), + }; +}); + vi.mock("openclaw/plugin-sdk/agent-runtime", () => ({ abortEmbeddedPiRun: vi.fn().mockReturnValue(false), appendCronStyleCurrentTimeLine: (text: string) => text, diff --git a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts index cdbcd3080f1..743202f9027 100644 --- a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts +++ b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts @@ -36,8 +36,12 @@ async function startWatchdogScenario(params: { watchdogCheckMs: 5, }); - await Promise.resolve(); - expect(scripted.getListenerCount()).toBe(1); + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBe(1); + }, + { timeout: 250, interval: 2 }, + ); await vi.waitFor( () => { expect(scripted.getOnMessage()).toBeTypeOf("function"); @@ -95,8 +99,12 @@ describe("web auto-reply connection", () => { reconnect: scenario.reconnect, }); - await Promise.resolve(); - expect(scripted.getListenerCount()).toBe(1); + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBe(1); + }, + { timeout: 250, interval: 2 }, + ); scripted.resolveClose(0); await vi.waitFor( @@ -130,8 +138,12 @@ describe("web auto-reply connection", () => { reconnect: { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 }, }); - await Promise.resolve(); - expect(scripted.getListenerCount()).toBe(1); + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBe(1); + }, + { timeout: 250, interval: 2 }, + ); scripted.resolveClose(0, { status: 440, isLoggedOut: false, diff --git a/extensions/whatsapp/src/auto-reply/monitor.ts b/extensions/whatsapp/src/auto-reply/monitor.ts index 1b3cb07d209..801f39ba26c 100644 --- a/extensions/whatsapp/src/auto-reply/monitor.ts +++ b/extensions/whatsapp/src/auto-reply/monitor.ts @@ -1,7 +1,5 @@ -import type { WASocket } from "@whiskeysockets/baileys"; import { resolveInboundDebounceMs } from "openclaw/plugin-sdk/channel-inbound"; import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime"; -import { waitForever } from "openclaw/plugin-sdk/cli-runtime"; import { hasControlCommand } from "openclaw/plugin-sdk/command-detection"; import { drainPendingDeliveries } from "openclaw/plugin-sdk/infra-runtime"; import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime"; @@ -16,10 +14,12 @@ import { type RuntimeEnv, } from "openclaw/plugin-sdk/runtime-env"; import { resolveWhatsAppAccount, resolveWhatsAppMediaMaxBytes } from "../accounts.js"; -import { setActiveWebListener } from "../active-listener.js"; -import { monitorWebInbox } from "../inbound.js"; import { - computeBackoff, + WhatsAppConnectionController, + type ManagedWhatsAppListener, +} from "../connection-controller.js"; +import { attachWebInboxToSocket } from "../inbound/monitor.js"; +import { newConnectionId, resolveHeartbeatSeconds, resolveReconnectPolicy, @@ -41,30 +41,6 @@ function isNonRetryableWebCloseStatus(statusCode: unknown): boolean { return statusCode === 440; } -type ActiveConnectionRun = { - connectionId: string; - startedAt: number; - heartbeat: NodeJS.Timeout | null; - watchdogTimer: NodeJS.Timeout | null; - lastInboundAt: number | null; - handledMessages: number; - unregisterUnhandled: (() => void) | null; - backgroundTasks: Set>; -}; - -function createActiveConnectionRun(): ActiveConnectionRun { - return { - connectionId: newConnectionId(), - startedAt: Date.now(), - heartbeat: null, - watchdogTimer: null, - lastInboundAt: null, - handledMessages: 0, - unregisterUnhandled: null, - backgroundTasks: new Set>(), - }; -} - type ReplyResolver = typeof import("./reply-resolver.runtime.js").getReplyFromConfig; let replyResolverRuntimePromise: Promise | null = @@ -85,7 +61,7 @@ function isNoListenerReconnectError(lastError?: string): boolean { export async function monitorWebChannel( verbose: boolean, - listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox, + listenerFactory: typeof attachWebInboxToSocket | undefined = attachWebInboxToSocket, keepAlive = true, replyResolver?: ReplyResolver, runtime: RuntimeEnv = defaultRuntime, @@ -153,13 +129,6 @@ export async function monitorWebChannel( tuning.sleep ?? ((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal ?? abortSignal)); const stopRequested = () => abortSignal?.aborted === true; - const abortPromise = - abortSignal && - new Promise<"aborted">((resolve) => - abortSignal.addEventListener("abort", () => resolve("aborted"), { - once: true, - }), - ); // Avoid noisy MaxListenersExceeded warnings in test environments where // multiple gateway instances may be constructed. @@ -174,378 +143,290 @@ export async function monitorWebChannel( }; process.once("SIGINT", handleSigint); - let reconnectAttempts = 0; - const socketRef: { current: WASocket | null } = { current: null }; - const disconnectRetryController = new AbortController(); - const stopDisconnectRetries = () => { - if (!disconnectRetryController.signal.aborted) { - disconnectRetryController.abort(); - } - }; - if (abortSignal) { - if (abortSignal.aborted) { - stopDisconnectRetries(); - } else { - abortSignal.addEventListener("abort", stopDisconnectRetries, { once: true }); - } - } + const messageTimeoutMs = tuning.messageTimeoutMs ?? 30 * 60 * 1000; + const watchdogCheckMs = tuning.watchdogCheckMs ?? 60 * 1000; + const controller = new WhatsAppConnectionController({ + accountId: account.accountId, + authDir: account.authDir, + verbose, + keepAlive, + heartbeatSeconds, + messageTimeoutMs, + watchdogCheckMs, + reconnectPolicy, + abortSignal, + sleep, + isNonRetryableStatus: isNonRetryableWebCloseStatus, + }); - while (true) { - if (stopRequested()) { - break; - } - - const active = createActiveConnectionRun(); - - // Watchdog to detect stuck message processing (e.g., event emitter died). - // Tuning overrides are test-oriented; production defaults remain unchanged. - const MESSAGE_TIMEOUT_MS = tuning.messageTimeoutMs ?? 30 * 60 * 1000; // 30m default - const WATCHDOG_CHECK_MS = tuning.watchdogCheckMs ?? 60 * 1000; // 1m default - - const onMessage = createWebOnMessageHandler({ - cfg, - verbose, - connectionId: active.connectionId, - maxMediaBytes, - groupHistoryLimit, - groupHistories, - groupMemberNames, - echoTracker, - backgroundTasks: active.backgroundTasks, - replyResolver: activeReplyResolver, - replyLogger, - baseMentionConfig, - account, - }); - - const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "whatsapp" }); - const shouldDebounce = (msg: WebInboundMsg) => { - if (msg.mediaPath || msg.mediaType) { - return false; + try { + while (true) { + if (stopRequested()) { + break; } - if (msg.location) { - return false; - } - if (msg.replyToId || msg.replyToBody) { - return false; - } - return !hasControlCommand(msg.body, cfg); - }; - const listener = await (listenerFactory ?? monitorWebInbox)({ - verbose, - accountId: account.accountId, - authDir: account.authDir, - mediaMaxMb: account.mediaMaxMb, - selfChatMode: account.selfChatMode, - sendReadReceipts: account.sendReadReceipts, - debounceMs: inboundDebounceMs, - shouldDebounce, - socketRef, - shouldRetryDisconnect: () => - keepAlive && !sigintStop && !stopRequested() && !disconnectRetryController.signal.aborted, - disconnectRetryPolicy: reconnectPolicy, - disconnectRetryAbortSignal: disconnectRetryController.signal, - onMessage: async (msg: WebInboundMsg) => { - active.handledMessages += 1; - active.lastInboundAt = Date.now(); - statusController.noteInbound(active.lastInboundAt); - await onMessage(msg); - }, - }); + const connectionId = newConnectionId(); + const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "whatsapp" }); + const shouldDebounce = (msg: WebInboundMsg) => { + if (msg.mediaPath || msg.mediaType) { + return false; + } + if (msg.location) { + return false; + } + if (msg.replyToId || msg.replyToBody) { + return false; + } + return !hasControlCommand(msg.body, cfg); + }; - statusController.noteConnected(); + const connection = await controller.openConnection({ + connectionId, + createListener: async ({ sock, connection }) => { + const onMessage = createWebOnMessageHandler({ + cfg, + verbose, + connectionId, + maxMediaBytes, + groupHistoryLimit, + groupHistories, + groupMemberNames, + echoTracker, + backgroundTasks: connection.backgroundTasks, + replyResolver: activeReplyResolver, + replyLogger, + baseMentionConfig, + account, + }); - // Surface a concise connection event for the next main-session turn/heartbeat. - const { e164: selfE164 } = readWebSelfId(account.authDir); - const connectRoute = resolveAgentRoute({ - cfg, - channel: "whatsapp", - accountId: account.accountId, - }); - enqueueSystemEvent(`WhatsApp gateway connected${selfE164 ? ` as ${selfE164}` : ""}.`, { - sessionKey: connectRoute.sessionKey, - }); + return (await (listenerFactory ?? attachWebInboxToSocket)({ + verbose, + accountId: account.accountId, + authDir: account.authDir, + mediaMaxMb: account.mediaMaxMb, + selfChatMode: account.selfChatMode, + sendReadReceipts: account.sendReadReceipts, + debounceMs: inboundDebounceMs, + shouldDebounce, + socketRef: controller.socketRef, + shouldRetryDisconnect: () => !sigintStop && controller.shouldRetryDisconnect(), + disconnectRetryPolicy: reconnectPolicy, + disconnectRetryAbortSignal: controller.getDisconnectRetryAbortSignal(), + onMessage: async (msg: WebInboundMsg) => { + const inboundAt = Date.now(); + controller.noteInbound(inboundAt); + statusController.noteInbound(inboundAt); + await onMessage(msg); + }, + sock, + })) as ManagedWhatsAppListener; + }, + onHeartbeat: (snapshot) => { + const authAgeMs = getWebAuthAgeMs(account.authDir); + const minutesSinceLastMessage = snapshot.lastInboundAt + ? Math.floor((Date.now() - snapshot.lastInboundAt) / 60000) + : null; - setActiveWebListener(account.accountId, listener); + const logData = { + connectionId: snapshot.connectionId, + reconnectAttempts: snapshot.reconnectAttempts, + messagesHandled: snapshot.handledMessages, + lastInboundAt: snapshot.lastInboundAt, + authAgeMs, + uptimeMs: snapshot.uptimeMs, + ...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30 + ? { minutesSinceLastMessage } + : {}), + }; - const normalizedAccountId = normalizeReconnectAccountId(account.accountId); - - // Reconnect is the transport-ready signal for WhatsApp, so drain eligible - // pending deliveries for this account here instead of hardcoding that - // policy inside the generic queue engine. - void drainPendingDeliveries({ - drainKey: `whatsapp:${normalizedAccountId}`, - logLabel: "WhatsApp reconnect drain", - cfg, - log: reconnectLogger, - selectEntry: (entry) => ({ - match: - entry.channel === "whatsapp" && - normalizeReconnectAccountId(entry.accountId) === normalizedAccountId, - // Reconnect changed listener readiness, so these should not sit behind - // the normal backoff window. - bypassBackoff: isNoListenerReconnectError(entry.lastError), - }), - }).catch((err) => { - reconnectLogger.warn( - { connectionId: active.connectionId, error: String(err) }, - "reconnect drain failed", - ); - }); - - active.unregisterUnhandled = registerUnhandledRejectionHandler((reason) => { - if (!isLikelyWhatsAppCryptoError(reason)) { - return false; - } - const errorStr = formatError(reason); - reconnectLogger.warn( - { connectionId: active.connectionId, error: errorStr }, - "web reconnect: unhandled rejection from WhatsApp socket; forcing reconnect", - ); - listener.signalClose?.({ - status: 499, - isLoggedOut: false, - error: reason, + if (minutesSinceLastMessage && minutesSinceLastMessage > 30) { + heartbeatLogger.warn(logData, "⚠️ web gateway heartbeat - no messages in 30+ minutes"); + } else { + heartbeatLogger.info(logData, "web gateway heartbeat"); + } + }, + onWatchdogTimeout: (snapshot) => { + const watchdogBaselineAt = snapshot.lastInboundAt ?? snapshot.startedAt; + const minutesSinceLastMessage = Math.floor((Date.now() - watchdogBaselineAt) / 60000); + statusController.noteWatchdogStale(); + heartbeatLogger.warn( + { + connectionId: snapshot.connectionId, + minutesSinceLastMessage, + lastInboundAt: snapshot.lastInboundAt ? new Date(snapshot.lastInboundAt) : null, + messagesHandled: snapshot.handledMessages, + }, + "Message timeout detected - forcing reconnect", + ); + whatsappHeartbeatLog.warn( + `No messages received in ${minutesSinceLastMessage}m - restarting connection`, + ); + }, }); - return true; - }); - const closeListener = async () => { - socketRef.current = null; - setActiveWebListener(account.accountId, null); - if (active.unregisterUnhandled) { - active.unregisterUnhandled(); - active.unregisterUnhandled = null; - } - if (active.heartbeat) { - clearInterval(active.heartbeat); - } - if (active.watchdogTimer) { - clearInterval(active.watchdogTimer); - } - if (active.backgroundTasks.size > 0) { - await Promise.allSettled(active.backgroundTasks); - active.backgroundTasks.clear(); - } - try { - await listener.close(); - } catch (err) { - logVerbose(`Socket close failed: ${formatError(err)}`); - } - }; + statusController.noteConnected(); + controller.setUnhandledRejectionCleanup( + registerUnhandledRejectionHandler((reason) => { + if (!isLikelyWhatsAppCryptoError(reason)) { + return false; + } + const errorStr = formatError(reason); + reconnectLogger.warn( + { connectionId: connection.connectionId, error: errorStr }, + "web reconnect: unhandled rejection from WhatsApp socket; forcing reconnect", + ); + controller.forceClose({ + status: 499, + isLoggedOut: false, + error: reason, + }); + return true; + }), + ); - if (keepAlive) { - active.heartbeat = setInterval(() => { - const authAgeMs = getWebAuthAgeMs(account.authDir); - const minutesSinceLastMessage = active.lastInboundAt - ? Math.floor((Date.now() - active.lastInboundAt) / 60000) - : null; + const { e164: selfE164 } = readWebSelfId(account.authDir); + const connectRoute = resolveAgentRoute({ + cfg, + channel: "whatsapp", + accountId: account.accountId, + }); + enqueueSystemEvent(`WhatsApp gateway connected${selfE164 ? ` as ${selfE164}` : ""}.`, { + sessionKey: connectRoute.sessionKey, + }); - const logData = { - connectionId: active.connectionId, - reconnectAttempts, - messagesHandled: active.handledMessages, - lastInboundAt: active.lastInboundAt, - authAgeMs, - uptimeMs: Date.now() - active.startedAt, - ...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30 - ? { minutesSinceLastMessage } - : {}), - }; + const normalizedAccountId = normalizeReconnectAccountId(account.accountId); + void drainPendingDeliveries({ + drainKey: `whatsapp:${normalizedAccountId}`, + logLabel: "WhatsApp reconnect drain", + cfg, + log: reconnectLogger, + selectEntry: (entry) => ({ + match: + entry.channel === "whatsapp" && + normalizeReconnectAccountId(entry.accountId) === normalizedAccountId, + bypassBackoff: isNoListenerReconnectError(entry.lastError), + }), + }).catch((err) => { + reconnectLogger.warn( + { connectionId: connection.connectionId, error: String(err) }, + "reconnect drain failed", + ); + }); - if (minutesSinceLastMessage && minutesSinceLastMessage > 30) { - heartbeatLogger.warn(logData, "⚠️ web gateway heartbeat - no messages in 30+ minutes"); + whatsappLog.info("Listening for personal WhatsApp inbound messages."); + if (process.stdout.isTTY || process.stderr.isTTY) { + whatsappLog.raw("Ctrl+C to stop."); + } + + if (!keepAlive) { + await controller.shutdown(); + return; + } + + const reason = await controller.waitForClose(); + if (stopRequested() || sigintStop || reason === "aborted") { + await controller.shutdown(); + break; + } + + const decision = controller.resolveCloseDecision(reason); + if (decision === "aborted") { + await controller.shutdown(); + break; + } + statusController.noteReconnectAttempts(controller.getReconnectAttempts()); + + reconnectLogger.info( + { + connectionId: connection.connectionId, + status: decision.normalized.statusLabel, + loggedOut: decision.normalized.isLoggedOut, + reconnectAttempts: decision.reconnectAttempts, + error: decision.normalized.errorText, + }, + "web reconnect: connection closed", + ); + + enqueueSystemEvent( + `WhatsApp gateway disconnected (status ${decision.normalized.statusLabel})`, + { + sessionKey: connectRoute.sessionKey, + }, + ); + + if (decision.action === "stop") { + statusController.noteClose({ + statusCode: decision.normalized.statusCode, + loggedOut: decision.normalized.isLoggedOut, + error: decision.normalized.errorText, + reconnectAttempts: decision.reconnectAttempts, + healthState: decision.healthState, + }); + + if (decision.healthState === "logged-out") { + runtime.error( + `WhatsApp session logged out. Run \`${formatCliCommand("openclaw channels login --channel web")}\` to relink.`, + ); + } else if (decision.healthState === "conflict") { + reconnectLogger.warn( + { + connectionId: connection.connectionId, + status: decision.normalized.statusLabel, + error: decision.normalized.errorText, + }, + "web reconnect: non-retryable close status; stopping monitor", + ); + runtime.error( + `WhatsApp Web connection closed (status ${decision.normalized.statusLabel}: session conflict). Resolve conflicting WhatsApp Web sessions, then relink with \`${formatCliCommand("openclaw channels login --channel web")}\`. Stopping web monitoring.`, + ); } else { - heartbeatLogger.info(logData, "web gateway heartbeat"); + reconnectLogger.warn( + { + connectionId: connection.connectionId, + status: decision.normalized.statusLabel, + reconnectAttempts: decision.reconnectAttempts, + maxAttempts: reconnectPolicy.maxAttempts, + }, + "web reconnect: max attempts reached; continuing in degraded mode", + ); + runtime.error( + `WhatsApp Web reconnect: max attempts reached (${decision.reconnectAttempts}/${reconnectPolicy.maxAttempts}). Stopping web monitoring.`, + ); } - }, heartbeatSeconds * 1000); - active.watchdogTimer = setInterval(() => { - // A reconnect should get a fresh watchdog window even before the next inbound arrives. - const watchdogBaselineAt = active.lastInboundAt ?? active.startedAt; - const timeSinceLastMessage = Date.now() - watchdogBaselineAt; - if (timeSinceLastMessage <= MESSAGE_TIMEOUT_MS) { - return; - } - const minutesSinceLastMessage = Math.floor(timeSinceLastMessage / 60000); - statusController.noteWatchdogStale(); - heartbeatLogger.warn( - { - connectionId: active.connectionId, - minutesSinceLastMessage, - lastInboundAt: active.lastInboundAt ? new Date(active.lastInboundAt) : null, - messagesHandled: active.handledMessages, - }, - "Message timeout detected - forcing reconnect", - ); - whatsappHeartbeatLog.warn( - `No messages received in ${minutesSinceLastMessage}m - restarting connection`, - ); - void closeListener().catch((err) => { - logVerbose(`Close listener failed: ${formatError(err)}`); - }); - listener.signalClose?.({ - status: 499, - isLoggedOut: false, - error: "watchdog-timeout", - }); - }, WATCHDOG_CHECK_MS); - } + await controller.shutdown(); + break; + } - whatsappLog.info("Listening for personal WhatsApp inbound messages."); - if (process.stdout.isTTY || process.stderr.isTTY) { - whatsappLog.raw("Ctrl+C to stop."); - } - - if (!keepAlive) { - stopDisconnectRetries(); - await closeListener(); - process.removeListener("SIGINT", handleSigint); - return; - } - - const reason = await Promise.race([ - listener.onClose?.catch((err) => { - reconnectLogger.error({ error: formatError(err) }, "listener.onClose rejected"); - return { status: 500, isLoggedOut: false, error: err }; - }) ?? waitForever(), - abortPromise ?? waitForever(), - ]); - - const uptimeMs = Date.now() - active.startedAt; - if (uptimeMs > heartbeatSeconds * 1000) { - reconnectAttempts = 0; // Healthy stretch; reset the backoff. - } - statusController.noteReconnectAttempts(reconnectAttempts); - - if (stopRequested() || sigintStop || reason === "aborted") { - stopDisconnectRetries(); - await closeListener(); - break; - } - - const statusCode = - (typeof reason === "object" && reason && "status" in reason - ? (reason as { status?: number }).status - : undefined) ?? "unknown"; - const loggedOut = - typeof reason === "object" && - reason && - "isLoggedOut" in reason && - (reason as { isLoggedOut?: boolean }).isLoggedOut; - - const errorStr = formatError(reason); - const numericStatusCode = typeof statusCode === "number" ? statusCode : undefined; - - reconnectLogger.info( - { - connectionId: active.connectionId, - status: statusCode, - loggedOut, - reconnectAttempts, - error: errorStr, - }, - "web reconnect: connection closed", - ); - - enqueueSystemEvent(`WhatsApp gateway disconnected (status ${statusCode ?? "unknown"})`, { - sessionKey: connectRoute.sessionKey, - }); - - if (loggedOut) { - stopDisconnectRetries(); statusController.noteClose({ - statusCode: numericStatusCode, - loggedOut: true, - error: errorStr, - reconnectAttempts, - healthState: "logged-out", + statusCode: decision.normalized.statusCode, + error: decision.normalized.errorText, + reconnectAttempts: decision.reconnectAttempts, + healthState: decision.healthState, }); - runtime.error( - `WhatsApp session logged out. Run \`${formatCliCommand("openclaw channels login --channel web")}\` to relink.`, - ); - await closeListener(); - break; - } - - if (isNonRetryableWebCloseStatus(statusCode)) { - stopDisconnectRetries(); - statusController.noteClose({ - statusCode: numericStatusCode, - error: errorStr, - reconnectAttempts, - healthState: "conflict", - }); - reconnectLogger.warn( + reconnectLogger.info( { - connectionId: active.connectionId, - status: statusCode, - error: errorStr, + connectionId: connection.connectionId, + status: decision.normalized.statusLabel, + reconnectAttempts: decision.reconnectAttempts, + maxAttempts: reconnectPolicy.maxAttempts || "unlimited", + delayMs: decision.delayMs, }, - "web reconnect: non-retryable close status; stopping monitor", + "web reconnect: scheduling retry", ); runtime.error( - `WhatsApp Web connection closed (status ${statusCode}: session conflict). Resolve conflicting WhatsApp Web sessions, then relink with \`${formatCliCommand("openclaw channels login --channel web")}\`. Stopping web monitoring.`, + `WhatsApp Web connection closed (status ${decision.normalized.statusLabel}). Retry ${decision.reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDurationPrecise(decision.delayMs ?? 0)}… (${decision.normalized.errorText})`, ); - await closeListener(); - break; - } - - reconnectAttempts += 1; - if (reconnectPolicy.maxAttempts > 0 && reconnectAttempts >= reconnectPolicy.maxAttempts) { - stopDisconnectRetries(); - statusController.noteClose({ - statusCode: numericStatusCode, - error: errorStr, - reconnectAttempts, - healthState: "stopped", - }); - reconnectLogger.warn( - { - connectionId: active.connectionId, - status: statusCode, - reconnectAttempts, - maxAttempts: reconnectPolicy.maxAttempts, - }, - "web reconnect: max attempts reached; continuing in degraded mode", - ); - runtime.error( - `WhatsApp Web reconnect: max attempts reached (${reconnectAttempts}/${reconnectPolicy.maxAttempts}). Stopping web monitoring.`, - ); - await closeListener(); - break; - } - - statusController.noteClose({ - statusCode: numericStatusCode, - error: errorStr, - reconnectAttempts, - healthState: "reconnecting", - }); - const delay = computeBackoff(reconnectPolicy, reconnectAttempts); - reconnectLogger.info( - { - connectionId: active.connectionId, - status: statusCode, - reconnectAttempts, - maxAttempts: reconnectPolicy.maxAttempts || "unlimited", - delayMs: delay, - }, - "web reconnect: scheduling retry", - ); - runtime.error( - `WhatsApp Web connection closed (status ${statusCode}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDurationPrecise(delay)}… (${errorStr})`, - ); - await closeListener(); - try { - await sleep(delay, abortSignal); - } catch { - break; + await controller.closeCurrentConnection(); + try { + await controller.waitBeforeRetry(decision.delayMs ?? 0); + } catch { + break; + } } + } finally { + statusController.markStopped(); + process.removeListener("SIGINT", handleSigint); + await controller.shutdown(); } - - statusController.markStopped(); - - process.removeListener("SIGINT", handleSigint); } diff --git a/extensions/whatsapp/src/connection-controller-registry.ts b/extensions/whatsapp/src/connection-controller-registry.ts new file mode 100644 index 00000000000..f1e3bab0bef --- /dev/null +++ b/extensions/whatsapp/src/connection-controller-registry.ts @@ -0,0 +1,49 @@ +import type { ActiveWebListener } from "./inbound/types.js"; + +export type WhatsAppConnectionControllerHandle = { + getActiveListener(): ActiveWebListener | null; +}; + +type ConnectionRegistryState = { + controllers: Map; +}; + +const CONNECTION_REGISTRY_KEY = Symbol.for("openclaw.whatsapp.connectionControllerRegistry"); + +function getConnectionRegistryState(): ConnectionRegistryState { + const globalState = globalThis as typeof globalThis & { + [CONNECTION_REGISTRY_KEY]?: ConnectionRegistryState; + }; + const existing = globalState[CONNECTION_REGISTRY_KEY]; + if (existing) { + return existing; + } + const created: ConnectionRegistryState = { + controllers: new Map(), + }; + globalState[CONNECTION_REGISTRY_KEY] = created; + return created; +} + +export function getRegisteredWhatsAppConnectionController( + accountId: string, +): WhatsAppConnectionControllerHandle | null { + return getConnectionRegistryState().controllers.get(accountId) ?? null; +} + +export function registerWhatsAppConnectionController( + accountId: string, + controller: WhatsAppConnectionControllerHandle, +): void { + getConnectionRegistryState().controllers.set(accountId, controller); +} + +export function unregisterWhatsAppConnectionController( + accountId: string, + controller: WhatsAppConnectionControllerHandle, +): void { + const controllers = getConnectionRegistryState().controllers; + if (controllers.get(accountId) === controller) { + controllers.delete(accountId); + } +} diff --git a/extensions/whatsapp/src/connection-controller.test.ts b/extensions/whatsapp/src/connection-controller.test.ts new file mode 100644 index 00000000000..cf5c4c73767 --- /dev/null +++ b/extensions/whatsapp/src/connection-controller.test.ts @@ -0,0 +1,135 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js"; +import { WhatsAppConnectionController } from "./connection-controller.js"; +import { createWaSocket, waitForWaConnection } from "./session.js"; + +vi.mock("./session.js", async () => { + const actual = await vi.importActual("./session.js"); + return { + ...actual, + createWaSocket: vi.fn(), + waitForWaConnection: vi.fn(), + }; +}); + +const createWaSocketMock = vi.mocked(createWaSocket); +const waitForWaConnectionMock = vi.mocked(waitForWaConnection); + +describe("WhatsAppConnectionController", () => { + let controller: WhatsAppConnectionController; + + beforeEach(() => { + vi.clearAllMocks(); + controller = new WhatsAppConnectionController({ + accountId: "work", + authDir: "/tmp/wa-auth", + verbose: false, + keepAlive: false, + heartbeatSeconds: 30, + messageTimeoutMs: 60_000, + watchdogCheckMs: 5_000, + reconnectPolicy: { + initialMs: 250, + maxMs: 1_000, + factor: 2, + jitter: 0, + maxAttempts: 5, + }, + }); + }); + + afterEach(async () => { + await controller.shutdown(); + }); + + it("closes the socket when open fails before listener creation", async () => { + const sock = { + ws: { + close: vi.fn(), + }, + }; + const createListener = vi.fn(); + + createWaSocketMock.mockResolvedValueOnce(sock as never); + waitForWaConnectionMock.mockRejectedValueOnce(new Error("handshake failed")); + + await expect( + controller.openConnection({ + connectionId: "conn-1", + createListener, + }), + ).rejects.toThrow("handshake failed"); + + expect(createListener).not.toHaveBeenCalled(); + expect(sock.ws.close).toHaveBeenCalledOnce(); + expect(controller.socketRef.current).toBeNull(); + expect(controller.getActiveListener()).toBeNull(); + }); + + it("keeps the previous registered controller until a replacement listener is ready", async () => { + const liveController = new WhatsAppConnectionController({ + accountId: "work", + authDir: "/tmp/wa-auth", + verbose: false, + keepAlive: false, + heartbeatSeconds: 30, + messageTimeoutMs: 60_000, + watchdogCheckMs: 5_000, + reconnectPolicy: { + initialMs: 250, + maxMs: 1_000, + factor: 2, + jitter: 0, + maxAttempts: 5, + }, + }); + const liveListener = { + sendMessage: vi.fn(async () => ({ messageId: "live-msg" })), + sendPoll: vi.fn(async () => ({ messageId: "live-poll" })), + sendReaction: vi.fn(async () => {}), + sendComposingTo: vi.fn(async () => {}), + }; + createWaSocketMock.mockResolvedValueOnce({ ws: { close: vi.fn() } } as never); + waitForWaConnectionMock.mockResolvedValueOnce(undefined); + await liveController.openConnection({ + connectionId: "live-conn", + createListener: async () => liveListener, + }); + + expect(getRegisteredWhatsAppConnectionController("work")).toBe(liveController); + + const replacement = new WhatsAppConnectionController({ + accountId: "work", + authDir: "/tmp/wa-auth-2", + verbose: false, + keepAlive: false, + heartbeatSeconds: 30, + messageTimeoutMs: 60_000, + watchdogCheckMs: 5_000, + reconnectPolicy: { + initialMs: 250, + maxMs: 1_000, + factor: 2, + jitter: 0, + maxAttempts: 5, + }, + }); + + try { + createWaSocketMock.mockResolvedValueOnce({ ws: { close: vi.fn() } } as never); + waitForWaConnectionMock.mockRejectedValueOnce(new Error("replacement failed")); + + await expect( + replacement.openConnection({ + connectionId: "replacement-conn", + createListener: async () => liveListener, + }), + ).rejects.toThrow("replacement failed"); + + expect(getRegisteredWhatsAppConnectionController("work")).toBe(liveController); + } finally { + await replacement.shutdown(); + await liveController.shutdown(); + } + }); +}); diff --git a/extensions/whatsapp/src/connection-controller.ts b/extensions/whatsapp/src/connection-controller.ts new file mode 100644 index 00000000000..fa37bbf7a95 --- /dev/null +++ b/extensions/whatsapp/src/connection-controller.ts @@ -0,0 +1,564 @@ +import { DisconnectReason, type WASocket } from "@whiskeysockets/baileys"; +import { info } from "openclaw/plugin-sdk/runtime-env"; +import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; +import { + registerWhatsAppConnectionController, + unregisterWhatsAppConnectionController, +} from "./connection-controller-registry.js"; +import type { ActiveWebListener, WebListenerCloseReason } from "./inbound/types.js"; +import { computeBackoff, sleepWithAbort, type ReconnectPolicy } from "./reconnect.js"; +import { + createWaSocket, + formatError, + getStatusCode, + logoutWeb, + waitForCredsSaveQueueWithTimeout, + waitForWaConnection, +} from "./session.js"; + +const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401; +const WHATSAPP_LOGIN_RESTART_MESSAGE = + "WhatsApp asked for a restart after pairing (code 515); waiting for creds to save…"; +export const WHATSAPP_LOGGED_OUT_RELINK_MESSAGE = + "WhatsApp reported the session is logged out. Cleared cached web session; please rerun openclaw channels login and scan the QR again."; +export const WHATSAPP_LOGGED_OUT_QR_MESSAGE = + "WhatsApp reported the session is logged out. Cleared cached web session; please scan a new QR."; + +type TimerHandle = ReturnType; +type WaSocket = Awaited>; + +export type ManagedWhatsAppListener = ActiveWebListener & { + close?: () => Promise; + onClose?: Promise; + signalClose?: (reason?: WebListenerCloseReason) => void; +}; + +export type WhatsAppLiveConnection = { + connectionId: string; + startedAt: number; + sock: WASocket; + listener: ManagedWhatsAppListener; + heartbeat: TimerHandle | null; + watchdogTimer: TimerHandle | null; + lastInboundAt: number | null; + handledMessages: number; + unregisterUnhandled: (() => void) | null; + backgroundTasks: Set>; + closePromise: Promise; + resolveClose: (reason: WebListenerCloseReason) => void; +}; + +export type WhatsAppConnectionSnapshot = { + connectionId: string; + startedAt: number; + lastInboundAt: number | null; + handledMessages: number; + reconnectAttempts: number; + uptimeMs: number; +}; + +export type NormalizedConnectionCloseReason = { + statusCode?: number; + statusLabel: number | "unknown"; + isLoggedOut: boolean; + error?: unknown; + errorText: string; +}; + +export type WhatsAppConnectionCloseDecision = { + action: "stop" | "retry"; + delayMs?: number; + reconnectAttempts: number; + healthState: "logged-out" | "conflict" | "stopped" | "reconnecting"; + normalized: NormalizedConnectionCloseReason; +}; + +function createNeverResolvePromise(): Promise { + return new Promise(() => {}); +} + +function createLiveConnection(params: { + connectionId: string; + sock: WASocket; + listener: ManagedWhatsAppListener; +}): WhatsAppLiveConnection { + let closeResolved = false; + let resolveClosePromise = (_reason: WebListenerCloseReason) => {}; + const closePromise = new Promise((resolve) => { + resolveClosePromise = (reason: WebListenerCloseReason) => { + if (closeResolved) { + return; + } + closeResolved = true; + resolve(reason); + }; + }); + + return { + connectionId: params.connectionId, + startedAt: Date.now(), + sock: params.sock, + listener: params.listener, + heartbeat: null, + watchdogTimer: null, + lastInboundAt: null, + handledMessages: 0, + unregisterUnhandled: null, + backgroundTasks: new Set>(), + closePromise, + resolveClose: resolveClosePromise, + }; +} + +export function closeWaSocket(sock: { ws?: { close?: () => void } } | null | undefined): void { + try { + sock?.ws?.close?.(); + } catch { + // ignore best-effort shutdown failures + } +} + +export function closeWaSocketSoon( + sock: { ws?: { close?: () => void } } | null | undefined, + delayMs = 500, +): void { + setTimeout(() => { + closeWaSocket(sock); + }, delayMs); +} + +export type WhatsAppLoginWaitResult = + | { + outcome: "connected"; + restarted: boolean; + sock: WaSocket; + } + | { + outcome: "logged-out"; + message: string; + statusCode: number; + error: unknown; + } + | { + outcome: "failed"; + message: string; + statusCode?: number; + error: unknown; + }; + +export async function waitForWhatsAppLoginResult(params: { + sock: WaSocket; + authDir: string; + isLegacyAuthDir: boolean; + verbose: boolean; + runtime: RuntimeEnv; + waitForConnection?: typeof waitForWaConnection; + createSocket?: typeof createWaSocket; + onSocketReplaced?: (sock: WaSocket) => void; +}): Promise { + const wait = params.waitForConnection ?? waitForWaConnection; + const createSocket = params.createSocket ?? createWaSocket; + let currentSock = params.sock; + let restarted = false; + + while (true) { + try { + await wait(currentSock); + return { + outcome: "connected", + restarted, + sock: currentSock, + }; + } catch (err) { + const statusCode = getStatusCode(err); + if (statusCode === 515 && !restarted) { + restarted = true; + params.runtime.log(info(WHATSAPP_LOGIN_RESTART_MESSAGE)); + closeWaSocket(currentSock); + await waitForCredsSaveQueueWithTimeout(params.authDir); + try { + currentSock = await createSocket(false, params.verbose, { + authDir: params.authDir, + }); + params.onSocketReplaced?.(currentSock); + continue; + } catch (createErr) { + return { + outcome: "failed", + message: formatError(createErr), + statusCode: getStatusCode(createErr), + error: createErr, + }; + } + } + + if (statusCode === LOGGED_OUT_STATUS) { + await logoutWeb({ + authDir: params.authDir, + isLegacyAuthDir: params.isLegacyAuthDir, + runtime: params.runtime, + }); + return { + outcome: "logged-out", + message: WHATSAPP_LOGGED_OUT_RELINK_MESSAGE, + statusCode: LOGGED_OUT_STATUS, + error: err, + }; + } + + return { + outcome: "failed", + message: formatError(err), + statusCode, + error: err, + }; + } + } +} + +export class WhatsAppConnectionController { + readonly accountId: string; + readonly authDir: string; + readonly socketRef: { current: WASocket | null }; + + private readonly reconnectPolicy: ReconnectPolicy; + private readonly heartbeatSeconds: number; + private readonly keepAlive: boolean; + private readonly messageTimeoutMs: number; + private readonly watchdogCheckMs: number; + private readonly verbose: boolean; + private readonly abortSignal?: AbortSignal; + private readonly sleep: (ms: number, signal?: AbortSignal) => Promise; + private readonly isNonRetryableStatus: (statusCode: unknown) => boolean; + private readonly abortPromise?: Promise<"aborted">; + private readonly disconnectRetryController = new AbortController(); + + private current: WhatsAppLiveConnection | null = null; + private reconnectAttempts = 0; + + constructor(params: { + accountId: string; + authDir: string; + verbose: boolean; + keepAlive: boolean; + heartbeatSeconds: number; + messageTimeoutMs: number; + watchdogCheckMs: number; + reconnectPolicy: ReconnectPolicy; + abortSignal?: AbortSignal; + sleep?: (ms: number, signal?: AbortSignal) => Promise; + isNonRetryableStatus?: (statusCode: unknown) => boolean; + }) { + this.accountId = params.accountId; + this.authDir = params.authDir; + this.verbose = params.verbose; + this.keepAlive = params.keepAlive; + this.heartbeatSeconds = params.heartbeatSeconds; + this.messageTimeoutMs = params.messageTimeoutMs; + this.watchdogCheckMs = params.watchdogCheckMs; + this.reconnectPolicy = params.reconnectPolicy; + this.abortSignal = params.abortSignal; + this.sleep = params.sleep ?? ((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal)); + this.isNonRetryableStatus = params.isNonRetryableStatus ?? (() => false); + this.socketRef = { current: null }; + this.abortPromise = + params.abortSignal && + new Promise<"aborted">((resolve) => { + params.abortSignal?.addEventListener("abort", () => resolve("aborted"), { once: true }); + }); + + if (params.abortSignal?.aborted) { + this.stopDisconnectRetries(); + } else { + params.abortSignal?.addEventListener("abort", () => this.stopDisconnectRetries(), { + once: true, + }); + } + } + + getActiveListener(): ActiveWebListener | null { + return this.current?.listener ?? null; + } + + getReconnectAttempts(): number { + return this.reconnectAttempts; + } + + isStopRequested(): boolean { + return this.abortSignal?.aborted === true; + } + + shouldRetryDisconnect(): boolean { + return ( + this.keepAlive && !this.isStopRequested() && !this.disconnectRetryController.signal.aborted + ); + } + + getDisconnectRetryAbortSignal(): AbortSignal { + return this.disconnectRetryController.signal; + } + + noteInbound(timestamp = Date.now()): void { + if (!this.current) { + return; + } + this.current.handledMessages += 1; + this.current.lastInboundAt = timestamp; + } + + getCurrentSnapshot( + connection: WhatsAppLiveConnection | null = this.current, + ): WhatsAppConnectionSnapshot | null { + if (!connection) { + return null; + } + return { + connectionId: connection.connectionId, + startedAt: connection.startedAt, + lastInboundAt: connection.lastInboundAt, + handledMessages: connection.handledMessages, + reconnectAttempts: this.reconnectAttempts, + uptimeMs: Date.now() - connection.startedAt, + }; + } + + setUnhandledRejectionCleanup(unregister: (() => void) | null): void { + if (!this.current) { + unregister?.(); + return; + } + this.current.unregisterUnhandled?.(); + this.current.unregisterUnhandled = unregister; + } + + async openConnection(params: { + connectionId: string; + createListener: (context: { + sock: WASocket; + connection: WhatsAppLiveConnection; + }) => Promise; + onHeartbeat?: (snapshot: WhatsAppConnectionSnapshot) => void; + onWatchdogTimeout?: (snapshot: WhatsAppConnectionSnapshot) => void; + }): Promise { + if (this.current) { + await this.closeCurrentConnection(); + } + + let sock: WaSocket | null = null; + let connection: WhatsAppLiveConnection | null = null; + try { + sock = await createWaSocket(false, this.verbose, { + authDir: this.authDir, + }); + await waitForWaConnection(sock); + + this.socketRef.current = sock; + const placeholderListener = {} as ManagedWhatsAppListener; + connection = createLiveConnection({ + connectionId: params.connectionId, + sock, + listener: placeholderListener, + }); + const listener = await params.createListener({ sock, connection }); + connection.listener = listener; + this.current = connection; + registerWhatsAppConnectionController(this.accountId, this); + this.startTimers(connection, { + onHeartbeat: params.onHeartbeat, + onWatchdogTimeout: params.onWatchdogTimeout, + }); + return connection; + } catch (err) { + if (this.socketRef.current === sock) { + this.socketRef.current = null; + } + closeWaSocket(sock); + if (connection?.unregisterUnhandled) { + connection.unregisterUnhandled(); + } + throw err; + } + } + + async waitForClose(): Promise { + const connection = this.current; + if (!connection) { + return "aborted"; + } + const listenerClose = + connection.listener.onClose?.catch((err) => ({ + status: 500, + isLoggedOut: false, + error: err, + })) ?? createNeverResolvePromise(); + + return await Promise.race([ + connection.closePromise, + listenerClose, + this.abortPromise ?? createNeverResolvePromise<"aborted">(), + ]); + } + + normalizeCloseReason(reason: WebListenerCloseReason): NormalizedConnectionCloseReason { + const statusCode = + (typeof reason === "object" && reason && "status" in reason + ? (reason as { status?: number }).status + : undefined) ?? undefined; + return { + statusCode, + statusLabel: typeof statusCode === "number" ? statusCode : "unknown", + isLoggedOut: + typeof reason === "object" && + reason !== null && + "isLoggedOut" in reason && + (reason as { isLoggedOut?: boolean }).isLoggedOut === true, + error: reason?.error, + errorText: formatError(reason), + }; + } + + resolveCloseDecision( + reason: WebListenerCloseReason | "aborted", + ): WhatsAppConnectionCloseDecision | "aborted" { + if (reason === "aborted" || this.isStopRequested()) { + return "aborted"; + } + + const current = this.current; + if (current && Date.now() - current.startedAt > this.heartbeatSeconds * 1000) { + this.reconnectAttempts = 0; + } + + const normalized = this.normalizeCloseReason(reason); + if (normalized.isLoggedOut) { + return { + action: "stop", + reconnectAttempts: this.reconnectAttempts, + healthState: "logged-out", + normalized, + }; + } + + if (this.isNonRetryableStatus(normalized.statusCode)) { + return { + action: "stop", + reconnectAttempts: this.reconnectAttempts, + healthState: "conflict", + normalized, + }; + } + + this.reconnectAttempts += 1; + if ( + this.reconnectPolicy.maxAttempts > 0 && + this.reconnectAttempts >= this.reconnectPolicy.maxAttempts + ) { + return { + action: "stop", + reconnectAttempts: this.reconnectAttempts, + healthState: "stopped", + normalized, + }; + } + + return { + action: "retry", + delayMs: computeBackoff(this.reconnectPolicy, this.reconnectAttempts), + reconnectAttempts: this.reconnectAttempts, + healthState: "reconnecting", + normalized, + }; + } + + forceClose(reason: WebListenerCloseReason): void { + const connection = this.current; + if (!connection) { + return; + } + connection.resolveClose(reason); + connection.listener.signalClose?.(reason); + } + + async closeCurrentConnection(): Promise { + const connection = this.current; + if (!connection) { + return; + } + this.current = null; + + if (this.socketRef.current === connection.sock) { + this.socketRef.current = null; + } + connection.unregisterUnhandled?.(); + if (connection.heartbeat) { + clearInterval(connection.heartbeat); + } + if (connection.watchdogTimer) { + clearInterval(connection.watchdogTimer); + } + if (connection.backgroundTasks.size > 0) { + await Promise.allSettled(connection.backgroundTasks); + connection.backgroundTasks.clear(); + } + try { + await connection.listener.close?.(); + } catch { + // best-effort close + } + closeWaSocket(connection.sock); + } + + async waitBeforeRetry(delayMs: number): Promise { + await this.sleep(delayMs, this.abortSignal); + } + + async shutdown(): Promise { + this.stopDisconnectRetries(); + await this.closeCurrentConnection(); + unregisterWhatsAppConnectionController(this.accountId, this); + } + + private startTimers( + connection: WhatsAppLiveConnection, + hooks: { + onHeartbeat?: (snapshot: WhatsAppConnectionSnapshot) => void; + onWatchdogTimeout?: (snapshot: WhatsAppConnectionSnapshot) => void; + }, + ): void { + if (!this.keepAlive) { + return; + } + + connection.heartbeat = setInterval(() => { + const snapshot = this.getCurrentSnapshot(connection); + if (!snapshot) { + return; + } + hooks.onHeartbeat?.(snapshot); + }, this.heartbeatSeconds * 1000); + + connection.watchdogTimer = setInterval(() => { + const baselineAt = connection.lastInboundAt ?? connection.startedAt; + const staleForMs = Date.now() - baselineAt; + if (staleForMs <= this.messageTimeoutMs) { + return; + } + const snapshot = this.getCurrentSnapshot(connection); + if (!snapshot) { + return; + } + hooks.onWatchdogTimeout?.(snapshot); + this.forceClose({ + status: 499, + isLoggedOut: false, + error: "watchdog-timeout", + }); + }, this.watchdogCheckMs); + } + + private stopDisconnectRetries(): void { + if (!this.disconnectRetryController.signal.aborted) { + this.disconnectRetryController.abort(); + } + } +} diff --git a/extensions/whatsapp/src/inbound/monitor.ts b/extensions/whatsapp/src/inbound/monitor.ts index 3696c54ff33..50985db772e 100644 --- a/extensions/whatsapp/src/inbound/monitor.ts +++ b/extensions/whatsapp/src/inbound/monitor.ts @@ -43,7 +43,7 @@ function shouldClearSocketRefAfterSendFailure(err: unknown): boolean { return /closed|reset|disconnect|no active socket/i.test(formatError(err)); } -export async function monitorWebInbox(options: { +export type MonitorWebInboxOptions = { verbose: boolean; accountId: string; authDir: string; @@ -71,13 +71,16 @@ export async function monitorWebInbox(options: { }; /** Abort in-flight reconnect waits when shutdown becomes terminal. */ disconnectRetryAbortSignal?: AbortSignal; -}) { +}; + +export async function attachWebInboxToSocket( + options: MonitorWebInboxOptions & { + sock: WASocket; + }, +) { const inboundLogger = getChildLogger({ module: "web-inbound" }); const inboundConsoleLog = createSubsystemLogger("gateway/channels/whatsapp").child("inbound"); - const sock = await createWaSocket(false, options.verbose, { - authDir: options.authDir, - }); - await waitForWaConnection(sock); + const sock = options.sock; const connectedAtMs = Date.now(); if (options.socketRef) { options.socketRef.current = sock; @@ -654,3 +657,14 @@ export async function monitorWebInbox(options: { ...sendApi, } as const; } + +export async function monitorWebInbox(options: MonitorWebInboxOptions) { + const sock = await createWaSocket(false, options.verbose, { + authDir: options.authDir, + }); + await waitForWaConnection(sock); + return attachWebInboxToSocket({ + ...options, + sock, + }); +} diff --git a/extensions/whatsapp/src/inbound/send-api.ts b/extensions/whatsapp/src/inbound/send-api.ts index d8a55c56c21..2b043c3f266 100644 --- a/extensions/whatsapp/src/inbound/send-api.ts +++ b/extensions/whatsapp/src/inbound/send-api.ts @@ -1,7 +1,7 @@ import type { AnyMessageContent, WAPresence } from "@whiskeysockets/baileys"; import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime"; -import type { ActiveWebSendOptions } from "../active-listener.js"; import { toWhatsappJid } from "../text-runtime.js"; +import type { ActiveWebSendOptions } from "./types.js"; function recordWhatsAppOutbound(accountId: string) { recordChannelActivity({ diff --git a/extensions/whatsapp/src/inbound/types.ts b/extensions/whatsapp/src/inbound/types.ts index f252304e442..7e83c6fa5cd 100644 --- a/extensions/whatsapp/src/inbound/types.ts +++ b/extensions/whatsapp/src/inbound/types.ts @@ -1,5 +1,6 @@ import type { AnyMessageContent } from "@whiskeysockets/baileys"; import type { NormalizedLocation } from "openclaw/plugin-sdk/channel-inbound"; +import type { PollInput } from "openclaw/plugin-sdk/media-runtime"; import type { WhatsAppIdentity, WhatsAppReplyContext, WhatsAppSelfIdentity } from "../identity.js"; export type WebListenerCloseReason = { @@ -8,6 +9,32 @@ export type WebListenerCloseReason = { error?: unknown; }; +export type ActiveWebSendOptions = { + gifPlayback?: boolean; + accountId?: string; + fileName?: string; +}; + +export type ActiveWebListener = { + sendMessage: ( + to: string, + text: string, + mediaBuffer?: Buffer, + mediaType?: string, + options?: ActiveWebSendOptions, + ) => Promise<{ messageId: string }>; + sendPoll: (to: string, poll: PollInput) => Promise<{ messageId: string }>; + sendReaction: ( + chatJid: string, + messageId: string, + emoji: string, + fromMe: boolean, + participant?: string, + ) => Promise; + sendComposingTo: (to: string) => Promise; + close?: () => Promise; +}; + export type WebInboundMessage = { id?: string; from: string; // conversation id: E.164 for direct chats, group JID for groups diff --git a/extensions/whatsapp/src/login-qr.test.ts b/extensions/whatsapp/src/login-qr.test.ts index fe6ff19e846..b8d9298ed63 100644 --- a/extensions/whatsapp/src/login-qr.test.ts +++ b/extensions/whatsapp/src/login-qr.test.ts @@ -91,4 +91,39 @@ describe("login-qr", () => { expect(createWaSocketMock).toHaveBeenCalledTimes(2); expect(logoutWebMock).not.toHaveBeenCalled(); }); + + it("clears auth and reports a relink message when WhatsApp is logged out", async () => { + waitForWaConnectionMock.mockRejectedValueOnce({ + output: { statusCode: 401 }, + }); + + const start = await startWebLoginWithQr({ timeoutMs: 5000 }); + expect(start.qrDataUrl).toBe("data:image/png;base64,base64"); + + const result = await waitForWebLogin({ timeoutMs: 5000 }); + + expect(result).toEqual({ + connected: false, + message: + "WhatsApp reported the session is logged out. Cleared cached web session; please scan a new QR.", + }); + expect(logoutWebMock).toHaveBeenCalledOnce(); + }); + + it("turns unexpected login cleanup failures into a normal login error", async () => { + waitForWaConnectionMock.mockRejectedValueOnce({ + output: { statusCode: 401 }, + }); + logoutWebMock.mockRejectedValueOnce(new Error("cleanup failed")); + + const start = await startWebLoginWithQr({ timeoutMs: 5000 }); + expect(start.qrDataUrl).toBe("data:image/png;base64,base64"); + + const result = await waitForWebLogin({ timeoutMs: 5000 }); + + expect(result).toEqual({ + connected: false, + message: "WhatsApp login failed: cleanup failed", + }); + }); }); diff --git a/extensions/whatsapp/src/login-qr.ts b/extensions/whatsapp/src/login-qr.ts index e93390b5479..c5448b278b0 100644 --- a/extensions/whatsapp/src/login-qr.ts +++ b/extensions/whatsapp/src/login-qr.ts @@ -1,23 +1,16 @@ import { randomUUID } from "node:crypto"; -import { DisconnectReason } from "@whiskeysockets/baileys"; import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; import { danger, info, success } from "openclaw/plugin-sdk/runtime-env"; import { defaultRuntime, type RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; import { logInfo } from "openclaw/plugin-sdk/text-runtime"; import { resolveWhatsAppAccount } from "./accounts.js"; -import { renderQrPngBase64 } from "./qr-image.js"; import { - createWaSocket, - formatError, - getStatusCode, - logoutWeb, - readWebSelfId, - waitForCredsSaveQueueWithTimeout, - waitForWaConnection, - webAuthExists, -} from "./session.js"; - -const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401; + closeWaSocket, + waitForWhatsAppLoginResult, + WHATSAPP_LOGGED_OUT_QR_MESSAGE, +} from "./connection-controller.js"; +import { renderQrPngBase64 } from "./qr-image.js"; +import { createWaSocket, readWebSelfId, webAuthExists } from "./session.js"; type WaSocket = Awaited>; @@ -34,19 +27,15 @@ type ActiveLogin = { error?: string; errorStatus?: number; waitPromise: Promise; - restartAttempted: boolean; verbose: boolean; + runtime: RuntimeEnv; }; const ACTIVE_LOGIN_TTL_MS = 3 * 60_000; const activeLogins = new Map(); function closeSocket(sock: WaSocket) { - try { - sock.ws?.close(); - } catch { - // ignore - } + closeWaSocket(sock); } async function resetActiveLogin(accountId: string, reason?: string) { @@ -65,50 +54,45 @@ function isLoginFresh(login: ActiveLogin) { } function attachLoginWaiter(accountId: string, login: ActiveLogin) { - login.waitPromise = waitForWaConnection(login.sock) - .then(() => { + login.waitPromise = waitForWhatsAppLoginResult({ + sock: login.sock, + authDir: login.authDir, + isLegacyAuthDir: login.isLegacyAuthDir, + verbose: login.verbose, + runtime: login.runtime, + onSocketReplaced: (sock) => { const current = activeLogins.get(accountId); if (current?.id === login.id) { - current.connected = true; + current.sock = sock; + current.connected = false; + current.error = undefined; + current.errorStatus = undefined; } + }, + }) + .then((result) => { + const current = activeLogins.get(accountId); + if (current?.id !== login.id) { + return; + } + if (result.outcome === "connected") { + current.sock = result.sock; + current.connected = true; + return; + } + current.error = result.message; + current.errorStatus = result.statusCode; }) .catch((err) => { const current = activeLogins.get(accountId); if (current?.id !== login.id) { return; } - current.error = formatError(err); - current.errorStatus = getStatusCode(err); + current.error = err instanceof Error ? err.message : String(err); + current.errorStatus = undefined; }); } -async function restartLoginSocket(login: ActiveLogin, runtime: RuntimeEnv) { - if (login.restartAttempted) { - return false; - } - login.restartAttempted = true; - runtime.log( - info("WhatsApp asked for a restart after pairing (code 515); waiting for creds to save…"), - ); - closeSocket(login.sock); - await waitForCredsSaveQueueWithTimeout(login.authDir); - try { - const sock = await createWaSocket(false, login.verbose, { - authDir: login.authDir, - }); - login.sock = sock; - login.connected = false; - login.error = undefined; - login.errorStatus = undefined; - attachLoginWaiter(login.accountId, login); - return true; - } catch (err) { - login.error = formatError(err); - login.errorStatus = getStatusCode(err); - return false; - } -} - export async function startWebLoginWithQr( opts: { verbose?: boolean; @@ -189,8 +173,8 @@ export async function startWebLoginWithQr( startedAt: Date.now(), connected: false, waitPromise: Promise.resolve(), - restartAttempted: false, verbose: Boolean(opts.verbose), + runtime, }; activeLogins.set(account.accountId, login); if (pendingQr && !login.qr) { @@ -263,24 +247,12 @@ export async function waitForWebLogin( } if (login.error) { - if (login.errorStatus === LOGGED_OUT_STATUS) { - await logoutWeb({ - authDir: login.authDir, - isLegacyAuthDir: login.isLegacyAuthDir, - runtime, - }); - const message = - "WhatsApp reported the session is logged out. Cleared cached web session; please scan a new QR."; + if (login.errorStatus === 401) { + const message = WHATSAPP_LOGGED_OUT_QR_MESSAGE; await resetActiveLogin(account.accountId, message); runtime.log(danger(message)); return { connected: false, message }; } - if (login.errorStatus === 515) { - const restarted = await restartLoginSocket(login, runtime); - if (restarted && isLoginFresh(login)) { - continue; - } - } const message = `WhatsApp login failed: ${login.error}`; await resetActiveLogin(account.accountId, message); runtime.log(danger(message)); diff --git a/extensions/whatsapp/src/login.ts b/extensions/whatsapp/src/login.ts index 985190405f2..50010f0c411 100644 --- a/extensions/whatsapp/src/login.ts +++ b/extensions/whatsapp/src/login.ts @@ -1,20 +1,11 @@ -import { DisconnectReason } from "@whiskeysockets/baileys"; import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime"; import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; -import { danger, info, success } from "openclaw/plugin-sdk/runtime-env"; +import { danger, success } from "openclaw/plugin-sdk/runtime-env"; import { defaultRuntime, type RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; import { logInfo } from "openclaw/plugin-sdk/text-runtime"; import { resolveWhatsAppAccount } from "./accounts.js"; -import { - createWaSocket, - formatError, - getStatusCode, - logoutWeb, - waitForCredsSaveQueueWithTimeout, - waitForWaConnection, -} from "./session.js"; - -const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401; +import { closeWaSocketSoon, waitForWhatsAppLoginResult } from "./connection-controller.js"; +import { createWaSocket, waitForWaConnection } from "./session.js"; export async function loginWeb( verbose: boolean, @@ -22,63 +13,50 @@ export async function loginWeb( runtime: RuntimeEnv = defaultRuntime, accountId?: string, ) { - const wait = waitForConnection ?? waitForWaConnection; const cfg = loadConfig(); const account = resolveWhatsAppAccount({ cfg, accountId }); - const sock = await createWaSocket(true, verbose, { + let sock = await createWaSocket(true, verbose, { authDir: account.authDir, }); logInfo("Waiting for WhatsApp connection...", runtime); try { - await wait(sock); - console.log(success("✅ Linked! Credentials saved for future sends.")); - } catch (err) { - const code = getStatusCode(err); - if (code === 515) { + const result = await waitForWhatsAppLoginResult({ + sock, + authDir: account.authDir, + isLegacyAuthDir: account.isLegacyAuthDir, + verbose, + runtime, + waitForConnection, + onSocketReplaced: (replacementSock) => { + sock = replacementSock; + }, + }); + if (result.outcome === "connected") { console.log( - info("WhatsApp asked for a restart after pairing (code 515); waiting for creds to save…"), + success( + result.restarted + ? "✅ Linked after restart; web session ready." + : "✅ Linked! Credentials saved for future sends.", + ), ); - try { - sock.ws?.close(); - } catch { - // ignore - } - await waitForCredsSaveQueueWithTimeout(account.authDir); - const retry = await createWaSocket(false, verbose, { - authDir: account.authDir, - }); - try { - await wait(retry); - console.log(success("✅ Linked after restart; web session ready.")); - return; - } finally { - setTimeout(() => retry.ws?.close(), 500); - } + return; } - if (code === LOGGED_OUT_STATUS) { - await logoutWeb({ - authDir: account.authDir, - isLegacyAuthDir: account.isLegacyAuthDir, - runtime, - }); + + if (result.outcome === "logged-out") { console.error( danger( `WhatsApp reported the session is logged out. Cleared cached web session; please rerun ${formatCliCommand("openclaw channels login")} and scan the QR again.`, ), ); - throw new Error("Session logged out; cache cleared. Re-run login.", { cause: err }); + throw new Error("Session logged out; cache cleared. Re-run login.", { + cause: result.error, + }); } - const formatted = formatError(err); - console.error(danger(`WhatsApp Web connection ended before fully opening. ${formatted}`)); - throw new Error(formatted, { cause: err }); + + console.error(danger(`WhatsApp Web connection ended before fully opening. ${result.message}`)); + throw new Error(result.message, { cause: result.error }); } finally { // Let Baileys flush any final events before closing the socket. - setTimeout(() => { - try { - sock.ws?.close(); - } catch { - // ignore - } - }, 500); + closeWaSocketSoon(sock); } } diff --git a/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test.ts b/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test.ts index 541c3943739..c39c755c771 100644 --- a/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test.ts +++ b/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test.ts @@ -333,6 +333,47 @@ describe("web monitor inbox", () => { await listener.close(); }); + it("flushes pending debounced inbound batches after close", async () => { + vi.useFakeTimers(); + try { + const onMessage = vi.fn(async () => undefined); + const { listener, sock } = await startInboxMonitor(onMessage as InboxOnMessage, { + debounceMs: 50, + }); + sock.ev.emit( + "messages.upsert", + buildNotifyMessageUpsert({ + id: nextMessageId("debounce-close-1"), + remoteJid: "999@s.whatsapp.net", + text: "first", + timestamp: 1_700_000_000, + pushName: "Tester", + }), + ); + sock.ev.emit( + "messages.upsert", + buildNotifyMessageUpsert({ + id: nextMessageId("debounce-close-2"), + remoteJid: "999@s.whatsapp.net", + text: "second", + timestamp: 1_700_000_001, + pushName: "Tester", + }), + ); + + await listener.close(); + await vi.advanceTimersByTimeAsync(50); + await waitForMessageCalls(onMessage, 1); + expect(onMessage).toHaveBeenCalledWith( + expect.objectContaining({ + body: "first\nsecond", + }), + ); + } finally { + vi.useRealTimers(); + } + }); + it("retries timed-out sends on the same socket without clearing the socket ref", async () => { const onMessage = vi.fn(async () => undefined); const socketRef = createSocketRef(); diff --git a/extensions/whatsapp/src/send.test.ts b/extensions/whatsapp/src/send.test.ts index c135543d483..df84e6b9ce8 100644 --- a/extensions/whatsapp/src/send.test.ts +++ b/extensions/whatsapp/src/send.test.ts @@ -5,18 +5,36 @@ import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { redactIdentifier } from "openclaw/plugin-sdk/logging-core"; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ActiveWebListener } from "./inbound/types.js"; const hoisted = vi.hoisted(() => ({ loadOutboundMediaFromUrl: vi.fn(), + controllerListeners: new Map(), })); const loadWebMediaMock = vi.fn(); let sendMessageWhatsApp: typeof import("./send.js").sendMessageWhatsApp; let sendPollWhatsApp: typeof import("./send.js").sendPollWhatsApp; let sendReactionWhatsApp: typeof import("./send.js").sendReactionWhatsApp; -let setActiveWebListener: typeof import("./active-listener.js").setActiveWebListener; let resetLogger: typeof import("openclaw/plugin-sdk/runtime-env").resetLogger; let setLoggerOverride: typeof import("openclaw/plugin-sdk/runtime-env").setLoggerOverride; +vi.mock("./connection-controller-registry.js", async () => { + const actual = await vi.importActual( + "./connection-controller-registry.js", + ); + return { + ...actual, + getRegisteredWhatsAppConnectionController: vi.fn((accountId: string) => { + const listener = hoisted.controllerListeners.get(accountId) ?? null; + return listener + ? { + getActiveListener: () => listener, + } + : null; + }), + }; +}); + vi.mock("./outbound-media.runtime.js", async () => { const actual = await vi.importActual( "./outbound-media.runtime.js", @@ -35,7 +53,6 @@ describe("web outbound", () => { beforeAll(async () => { ({ sendMessageWhatsApp, sendPollWhatsApp, sendReactionWhatsApp } = await import("./send.js")); - ({ setActiveWebListener } = await import("./active-listener.js")); ({ resetLogger, setLoggerOverride } = await import("openclaw/plugin-sdk/runtime-env")); }); @@ -61,7 +78,8 @@ describe("web outbound", () => { hostReadCapability: Boolean(options?.mediaAccess?.readFile ?? options?.mediaReadFile), }), ); - setActiveWebListener({ + hoisted.controllerListeners.clear(); + hoisted.controllerListeners.set("default", { sendComposingTo, sendMessage, sendPoll, @@ -72,8 +90,7 @@ describe("web outbound", () => { afterEach(() => { resetLogger(); setLoggerOverride(null); - setActiveWebListener(null); - setActiveWebListener("work", null); + hoisted.controllerListeners.clear(); }); it("sends message via active listener", async () => { @@ -87,8 +104,8 @@ describe("web outbound", () => { }); it("uses configured defaultAccount when outbound accountId is omitted", async () => { - setActiveWebListener(null); - setActiveWebListener("work", { + hoisted.controllerListeners.clear(); + hoisted.controllerListeners.set("work", { sendComposingTo, sendMessage, sendPoll, @@ -145,7 +162,7 @@ describe("web outbound", () => { }); it("throws a helpful error when no active listener exists", async () => { - setActiveWebListener(null); + hoisted.controllerListeners.clear(); await expect( sendMessageWhatsApp("+1555", "hi", { verbose: false, accountId: "work" }), ).rejects.toThrow(/No active WhatsApp Web listener/); @@ -259,7 +276,7 @@ describe("web outbound", () => { }); it("uses account-aware WhatsApp media caps for outbound uploads", async () => { - setActiveWebListener("work", { + hoisted.controllerListeners.set("work", { sendComposingTo, sendMessage, sendPoll, diff --git a/extensions/whatsapp/src/send.ts b/extensions/whatsapp/src/send.ts index 50c71b9370b..c397c89c397 100644 --- a/extensions/whatsapp/src/send.ts +++ b/extensions/whatsapp/src/send.ts @@ -1,3 +1,4 @@ +import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime"; import { loadConfig, type OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/config-runtime"; import { generateSecureUuid } from "openclaw/plugin-sdk/core"; @@ -11,7 +12,8 @@ import { resolveWhatsAppAccount, resolveWhatsAppMediaMaxBytes, } from "./accounts.js"; -import { type ActiveWebSendOptions, requireActiveWebListener } from "./active-listener.js"; +import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js"; +import type { ActiveWebListener, ActiveWebSendOptions } from "./inbound/types.js"; import { loadOutboundMediaFromUrl } from "./outbound-media.runtime.js"; import { markdownToWhatsApp, toWhatsappJid } from "./text-runtime.js"; @@ -28,6 +30,22 @@ function resolveOutboundWhatsAppAccountId(params: { return resolveDefaultWhatsAppAccountId(params.cfg); } +function requireOutboundActiveWebListener(params: { cfg: OpenClawConfig; accountId?: string }): { + accountId: string; + listener: ActiveWebListener; +} { + const accountId = resolveOutboundWhatsAppAccountId(params); + const resolvedAccountId = accountId ?? resolveDefaultWhatsAppAccountId(params.cfg); + const listener = + getRegisteredWhatsAppConnectionController(resolvedAccountId)?.getActiveListener() ?? null; + if (!listener) { + throw new Error( + `No active WhatsApp Web listener (account: ${resolvedAccountId}). Start the gateway, then link WhatsApp with: ${formatCliCommand(`openclaw channels login --channel whatsapp --account ${resolvedAccountId}`)}.`, + ); + } + return { accountId: resolvedAccountId, listener }; +} + export async function sendMessageWhatsApp( to: string, body: string, @@ -60,12 +78,10 @@ export async function sendMessageWhatsApp( const correlationId = generateSecureUuid(); const startedAt = Date.now(); const cfg = options.cfg ?? loadConfig(); - const effectiveAccountId = resolveOutboundWhatsAppAccountId({ + const { listener: active, accountId: resolvedAccountId } = requireOutboundActiveWebListener({ cfg, accountId: options.accountId, }); - const { listener: active, accountId: resolvedAccountId } = - requireActiveWebListener(effectiveAccountId); const account = resolveWhatsAppAccount({ cfg, accountId: resolvedAccountId ?? options.accountId, @@ -158,11 +174,10 @@ export async function sendReactionWhatsApp( ): Promise { const correlationId = generateSecureUuid(); const cfg = loadConfig(); - const effectiveAccountId = resolveOutboundWhatsAppAccountId({ + const { listener: active } = requireOutboundActiveWebListener({ cfg, accountId: options.accountId, }); - const { listener: active } = requireActiveWebListener(effectiveAccountId); const redactedChatJid = redactIdentifier(chatJid); const logger = getChildLogger({ module: "web-outbound", @@ -201,11 +216,10 @@ export async function sendPollWhatsApp( const correlationId = generateSecureUuid(); const startedAt = Date.now(); const cfg = options.cfg ?? loadConfig(); - const effectiveAccountId = resolveOutboundWhatsAppAccountId({ + const { listener: active } = requireOutboundActiveWebListener({ cfg, accountId: options.accountId, }); - const { listener: active } = requireActiveWebListener(effectiveAccountId); const redactedTo = redactIdentifier(to); const logger = getChildLogger({ module: "web-outbound", diff --git a/src/plugins/runtime-plugin-boundary.whatsapp.test.ts b/src/plugins/runtime-plugin-boundary.whatsapp.test.ts index 05ad7c655ae..ef6e00448bc 100644 --- a/src/plugins/runtime-plugin-boundary.whatsapp.test.ts +++ b/src/plugins/runtime-plugin-boundary.whatsapp.test.ts @@ -11,7 +11,7 @@ type LightModule = { }; type HeavyModule = { - setActiveWebListener: ( + registerControllerForTest: ( accountId: string | null | undefined, listener: { sendMessage: () => Promise<{ messageId: string }> } | null, ) => void; @@ -48,23 +48,35 @@ function createBundledWhatsAppRuntimeFixture() { [bundledDistPluginFile("whatsapp", "light-runtime-api.js")]: 'export { getActiveWebListener } from "../../active-listener.js";\n', [bundledDistPluginFile("whatsapp", "runtime-api.js")]: - 'export { getActiveWebListener, setActiveWebListener } from "../../active-listener.js";\n', - "dist/active-listener.js": [ - 'const key = Symbol.for("openclaw.whatsapp.activeListenerState");', + 'export { registerControllerForTest } from "../../connection-controller-registry.js";\n', + "dist/connection-controller-registry.js": [ + 'const key = Symbol.for("openclaw.whatsapp.connectionControllerRegistry");', "const g = globalThis;", "if (!g[key]) {", - " g[key] = { listeners: new Map(), current: null };", + " g[key] = { controllers: new Map() };", "}", "const state = g[key];", - "export function setActiveWebListener(accountIdOrListener, maybeListener) {", - ' const accountId = typeof accountIdOrListener === "string" ? accountIdOrListener : "default";', - ' const listener = typeof accountIdOrListener === "string" ? (maybeListener ?? null) : (accountIdOrListener ?? null);', - " if (!listener) state.listeners.delete(accountId);", - " else state.listeners.set(accountId, listener);", - ' if (accountId === "default") state.current = listener;', + "export function getRegisteredWhatsAppConnectionController(accountId) {", + " return state.controllers.get(accountId) ?? null;", "}", + "export function registerControllerForTest(accountId, listener) {", + ' const id = accountId ?? "default";', + " if (!listener) {", + " state.controllers.delete(id);", + " return;", + " }", + " state.controllers.set(id, {", + " getActiveListener() {", + " return listener;", + " },", + " });", + "}", + "", + ].join("\n"), + "dist/active-listener.js": [ + 'import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js";', "export function getActiveWebListener(accountId) {", - ' return state.listeners.get(accountId ?? "default") ?? null;', + ' return getRegisteredWhatsAppConnectionController(accountId ?? "default")?.getActiveListener() ?? null;', "}", "", ].join("\n"), @@ -100,9 +112,9 @@ function expectSharedWhatsAppListenerState(runtimePluginDir: string, accountId: const { light, heavy } = loadWhatsAppBoundaryModules(runtimePluginDir); const listener = createListener(); - heavy.setActiveWebListener(accountId, listener); + heavy.registerControllerForTest(accountId, listener); expect(light.getActiveWebListener(accountId)).toBe(listener); - heavy.setActiveWebListener(accountId, null); + heavy.registerControllerForTest(accountId, null); } afterEach(() => {