Files
openclaw/extensions/whatsapp/src/connection-controller.test.ts
2026-05-30 00:59:12 -03:00

403 lines
13 KiB
TypeScript

import { EventEmitter } from "node:events";
import { DisconnectReason } from "baileys";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { getRegisteredWhatsAppConnectionController } from "./connection-controller-registry.js";
import {
closeWaSocket,
waitForWhatsAppLoginResult,
WhatsAppConnectionController,
} from "./connection-controller.js";
import type { WhatsAppSendKind, WhatsAppSendResult } from "./inbound/send-result.js";
import { createWaSocket, waitForWaConnection } from "./session.js";
vi.mock("./session.js", async () => {
const actual = await vi.importActual<typeof import("./session.js")>("./session.js");
return {
...actual,
createWaSocket: vi.fn(),
waitForWaConnection: vi.fn(),
};
});
const createWaSocketMock = vi.mocked(createWaSocket);
const waitForWaConnectionMock = vi.mocked(waitForWaConnection);
function acceptedSendResult(kind: WhatsAppSendKind, id: string): WhatsAppSendResult {
return {
kind,
messageId: id,
keys: [{ id }],
providerAccepted: true,
};
}
function createListenerStub(messageId = "ok") {
return {
sendMessage: vi.fn(async () => acceptedSendResult("text", messageId)),
sendPoll: vi.fn(async () => acceptedSendResult("poll", messageId)),
sendReaction: vi.fn(async () => acceptedSendResult("reaction", messageId)),
sendComposingTo: vi.fn(async () => {}),
};
}
function createSocketWithTransportEmitter() {
const ws = new EventEmitter() as EventEmitter & { close: ReturnType<typeof vi.fn> };
ws.close = vi.fn();
return {
end: vi.fn(),
ws,
};
}
describe("WhatsAppConnectionController", () => {
let controller: WhatsAppConnectionController;
beforeEach(() => {
vi.clearAllMocks();
controller = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: false,
heartbeatSeconds: 30,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
});
afterEach(async () => {
await controller.shutdown();
});
it("closes the socket when open fails before listener creation", async () => {
const sock = {
end: vi.fn(),
ws: {
close: vi.fn(),
},
};
const createListener = vi.fn();
createWaSocketMock.mockResolvedValueOnce(sock as never);
waitForWaConnectionMock.mockRejectedValueOnce(new Error("handshake failed"));
await expect(
controller.openConnection({
connectionId: "conn-1",
createListener,
}),
).rejects.toThrow("handshake failed");
expect(createListener).not.toHaveBeenCalled();
expect(sock.end).toHaveBeenCalledOnce();
const closeError = sock.end.mock.calls[0]?.[0] as Error | undefined;
expect(closeError).toBeInstanceOf(Error);
expect(closeError?.message).toBe("OpenClaw WhatsApp socket close");
expect(sock.ws.close).not.toHaveBeenCalled();
expect(controller.socketRef.current).toBeNull();
expect(controller.getActiveListener()).toBeNull();
});
it("falls back to raw websocket close when Baileys end is unavailable", () => {
const sock = { ws: { close: vi.fn() } };
closeWaSocket(sock);
expect(sock.ws.close).toHaveBeenCalledOnce();
});
it("lets createWaSocket own the auth barrier before opening a socket", async () => {
const callOrder: string[] = [];
createWaSocketMock.mockImplementationOnce(async () => {
callOrder.push("create");
return { ws: { close: vi.fn() } } as never;
});
waitForWaConnectionMock.mockImplementationOnce(async () => {
callOrder.push("wait-for-connection");
});
await controller.openConnection({
connectionId: "conn-flush-first",
createListener: async () => createListenerStub() as never,
});
expect(callOrder).toEqual(["create", "wait-for-connection"]);
});
it("restarts login once on status 408 and preserves replacement socket options", async () => {
const initialSock = createSocketWithTransportEmitter();
const replacementSock = createSocketWithTransportEmitter();
const waitForConnection = vi
.fn()
.mockRejectedValueOnce({ output: { statusCode: DisconnectReason.timedOut } })
.mockResolvedValueOnce(undefined);
const onQr = vi.fn();
const onSocketReplaced = vi.fn();
const createSocket = vi.fn(
async (_printQr: boolean, _verbose: boolean, opts?: { onQr?: (qr: string) => void }) => {
opts?.onQr?.("qr-after-timeout");
return replacementSock;
},
);
const result = await waitForWhatsAppLoginResult({
sock: initialSock as never,
authDir: "/tmp/wa-auth",
isLegacyAuthDir: false,
verbose: true,
runtime: { log: vi.fn() } as never,
waitForConnection: waitForConnection as never,
createSocket: createSocket as never,
socketTiming: {
connectTimeoutMs: 10_000,
defaultQueryTimeoutMs: 20_000,
keepAliveIntervalMs: 30_000,
},
onQr,
onSocketReplaced,
});
expect(result).toEqual({
outcome: "connected",
restarted: true,
sock: replacementSock,
});
expect(initialSock.end).toHaveBeenCalledOnce();
expect(createSocket).toHaveBeenCalledWith(false, true, {
authDir: "/tmp/wa-auth",
connectTimeoutMs: 10_000,
defaultQueryTimeoutMs: 20_000,
keepAliveIntervalMs: 30_000,
onQr,
});
expect(onQr).toHaveBeenCalledWith("qr-after-timeout");
expect(onSocketReplaced).toHaveBeenCalledWith(replacementSock);
});
it("still honors the post-pairing 515 restart after a status 408 recovery", async () => {
const initialSock = createSocketWithTransportEmitter();
const afterTimeoutSock = createSocketWithTransportEmitter();
const afterPairingRestartSock = createSocketWithTransportEmitter();
const waitForConnection = vi
.fn()
.mockRejectedValueOnce({ output: { statusCode: DisconnectReason.timedOut } })
.mockRejectedValueOnce({ output: { statusCode: 515 } })
.mockResolvedValueOnce(undefined);
const createSocket = vi
.fn()
.mockResolvedValueOnce(afterTimeoutSock)
.mockResolvedValueOnce(afterPairingRestartSock);
const result = await waitForWhatsAppLoginResult({
sock: initialSock as never,
authDir: "/tmp/wa-auth",
isLegacyAuthDir: false,
verbose: false,
runtime: { log: vi.fn() } as never,
waitForConnection: waitForConnection as never,
createSocket: createSocket as never,
});
expect(result).toEqual({
outcome: "connected",
restarted: true,
sock: afterPairingRestartSock,
});
expect(createSocket).toHaveBeenCalledTimes(2);
expect(waitForConnection).toHaveBeenCalledTimes(3);
expect(initialSock.end).toHaveBeenCalledOnce();
expect(afterTimeoutSock.end).toHaveBeenCalledOnce();
});
it("does not keep recreating sockets when login status 408 persists", async () => {
const initialSock = createSocketWithTransportEmitter();
const replacementSock = createSocketWithTransportEmitter();
const timeoutError = { output: { statusCode: DisconnectReason.timedOut } };
const waitForConnection = vi
.fn()
.mockRejectedValueOnce(timeoutError)
.mockRejectedValueOnce(timeoutError);
const createSocket = vi.fn(async () => replacementSock);
const result = await waitForWhatsAppLoginResult({
sock: initialSock as never,
authDir: "/tmp/wa-auth",
isLegacyAuthDir: false,
verbose: false,
runtime: { log: vi.fn() } as never,
waitForConnection: waitForConnection as never,
createSocket: createSocket as never,
});
expect(result).toMatchObject({
outcome: "failed",
statusCode: DisconnectReason.timedOut,
error: timeoutError,
});
expect(createSocket).toHaveBeenCalledOnce();
expect(waitForConnection).toHaveBeenCalledTimes(2);
});
it("keeps the previous registered controller until a replacement listener is ready", async () => {
const liveController = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: false,
heartbeatSeconds: 30,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
const liveListener = createListenerStub("live");
createWaSocketMock.mockResolvedValueOnce({ ws: { close: vi.fn() } } as never);
waitForWaConnectionMock.mockResolvedValueOnce(undefined);
await liveController.openConnection({
connectionId: "live-conn",
createListener: async () => liveListener,
});
expect(getRegisteredWhatsAppConnectionController("work")).toBe(liveController);
const replacement = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth-2",
verbose: false,
keepAlive: false,
heartbeatSeconds: 30,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
try {
createWaSocketMock.mockResolvedValueOnce({ ws: { close: vi.fn() } } as never);
waitForWaConnectionMock.mockRejectedValueOnce(new Error("replacement failed"));
await expect(
replacement.openConnection({
connectionId: "replacement-conn",
createListener: async () => liveListener,
}),
).rejects.toThrow("replacement failed");
expect(getRegisteredWhatsAppConnectionController("work")).toBe(liveController);
} finally {
await replacement.shutdown();
await liveController.shutdown();
}
});
it("tracks real websocket frame activity in the connection snapshot", async () => {
vi.useFakeTimers();
const controller = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: true,
heartbeatSeconds: 1,
transportTimeoutMs: 60_000,
messageTimeoutMs: 60_000,
watchdogCheckMs: 5_000,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
try {
const sock = createSocketWithTransportEmitter();
createWaSocketMock.mockResolvedValueOnce(sock as never);
waitForWaConnectionMock.mockResolvedValueOnce(undefined);
const snapshots: Array<{ lastTransportActivityAt: number }> = [];
await controller.openConnection({
connectionId: "conn-frame-activity",
createListener: async () => createListenerStub() as never,
onHeartbeat: (snapshot) => snapshots.push(snapshot),
});
await vi.advanceTimersByTimeAsync(1_000);
const firstSnapshot = snapshots.at(-1);
expect(firstSnapshot?.lastTransportActivityAt).toBeTypeOf("number");
const firstTransportAt = firstSnapshot?.lastTransportActivityAt ?? 0;
await vi.advanceTimersByTimeAsync(250);
sock.ws.emit("frame");
await vi.advanceTimersByTimeAsync(1_000);
const lastSnapshot = snapshots.at(-1);
expect(lastSnapshot?.lastTransportActivityAt).toBeGreaterThan(firstTransportAt);
} finally {
await controller.shutdown();
vi.useRealTimers();
}
});
it("forces reconnect on transport stall before the long app-silence window", async () => {
vi.useFakeTimers();
const controller = new WhatsAppConnectionController({
accountId: "work",
authDir: "/tmp/wa-auth",
verbose: false,
keepAlive: true,
heartbeatSeconds: 1,
transportTimeoutMs: 30,
messageTimeoutMs: 3_000,
watchdogCheckMs: 5,
reconnectPolicy: {
initialMs: 250,
maxMs: 1_000,
factor: 2,
jitter: 0,
maxAttempts: 5,
},
});
try {
const sock = createSocketWithTransportEmitter();
createWaSocketMock.mockResolvedValueOnce(sock as never);
waitForWaConnectionMock.mockResolvedValueOnce(undefined);
const timeouts: string[] = [];
await controller.openConnection({
connectionId: "conn-transport-timeout",
createListener: async () => createListenerStub() as never,
onWatchdogTimeout: () => timeouts.push("timeout"),
});
await vi.advanceTimersByTimeAsync(40);
expect(timeouts.length).toBeGreaterThanOrEqual(1);
} finally {
await controller.shutdown();
vi.useRealTimers();
}
});
});