diff --git a/CHANGELOG.md b/CHANGELOG.md index 692bb00fe42..2ec64e19c44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -132,6 +132,7 @@ Docs: https://docs.openclaw.ai - Local model prompt caching: keep stable Project Context above volatile channel/session prompt guidance and stop embedding current channel names in the message tool description, so Ollama, MLX, llama.cpp, and other prefix-cache backends avoid avoidable full prompt reprocessing across channel turns. Fixes #40256; supersedes #40296. Thanks @rhclaw and @sriram369. - Gateway/OpenAI-compatible API: guard provider policy lookup against runtime providers with non-array `models` values, so `/v1/chat/completions` no longer fails with `provider?.models?.some is not a function`. Fixes #66744; carries forward #66761. Thanks @MightyMoud, @MukundaKatta. - WhatsApp/Web: pass explicit Baileys socket timings into every WhatsApp Web socket and expose `web.whatsapp.*` keepalive, connect, and query timeout settings so unstable networks can avoid repeated 408 disconnect and opening-handshake timeout loops. Fixes #56365. (#73580) Thanks @velvet-shark. +- WhatsApp/Web: recover recently active listeners when a post-408 reconnect keeps receiving transport frames but stops delivering app messages, while keeping group metadata fallback off Baileys sends. Fixes #63855 and #66920; refs #7433, #67986, #70856, #60007, and #72621. Thanks @legonhilltech-jpg, @octopuslabs-fl, @Kanorin-chan, and @stuswan. - Channels/Telegram: persist native command metadata on target sessions so topic, helper, and ACP-bound slash commands keep their session metadata attached to the routed conversation. (#57548) Thanks @GaosCode. - Channels/native commands: keep validated native slash command replies visible in group chats while preserving explicit owner allowlists for command authorization. (#73672) Thanks @obviyus. - Pairing/doctor: bootstrap `commands.ownerAllowFrom` from the first approved DM pairing when no command owner exists, and have doctor explain missing owners so privileged slash commands are not accidentally unusable after onboarding. Thanks @pashpashpash. diff --git a/docs/channels/whatsapp.md b/docs/channels/whatsapp.md index ee055ccf20b..18289cfa88d 100644 --- a/docs/channels/whatsapp.md +++ b/docs/channels/whatsapp.md @@ -151,7 +151,7 @@ OpenClaw recommends running WhatsApp on a separate number when possible. (The ch ## Runtime model - Gateway owns the WhatsApp socket and reconnect loop. -- The reconnect watchdog uses WhatsApp Web transport activity, not only inbound app-message volume, so a quiet linked-device session is not restarted solely because nobody has sent a message recently. A longer application-silence cap still forces a reconnect if transport frames keep arriving but no application messages are handled for the watchdog window. +- The reconnect watchdog uses WhatsApp Web transport activity, not only inbound app-message volume, so a quiet linked-device session is not restarted solely because nobody has sent a message recently. A longer application-silence cap still forces a reconnect if transport frames keep arriving but no application messages are handled for the watchdog window; after a transient reconnect for a recently active session, that application-silence check uses the normal message timeout for the first recovery window. - Baileys socket timings are explicit under `web.whatsapp.*`: `keepAliveIntervalMs` controls WhatsApp Web application pings, `connectTimeoutMs` controls the opening handshake timeout, and `defaultQueryTimeoutMs` controls Baileys query timeouts. - Outbound sends require an active WhatsApp listener for the target account. - Status and broadcast chats are ignored (`@status`, `@broadcast`). diff --git a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts index 4ab0c9029ea..5ab9a3c0846 100644 --- a/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts +++ b/extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts @@ -513,6 +513,51 @@ describe("web auto-reply connection", () => { } }); + it("recovers a post-408 listener when transport frames continue but app delivery stays silent", async () => { + vi.useFakeTimers(); + try { + const { scripted, controller, run } = await startWatchdogScenario({ + monitorWebChannel, + }); + + scripted.resolveClose(0, { + status: 408, + isLoggedOut: false, + error: "status=408 Request Time-out", + }); + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBe(2); + }, + { timeout: 250, interval: 2 }, + ); + + const reconnectedSocket = getLastWebAutoReplySessionSocket(); + for (let elapsedMs = 0; elapsedMs < 45; elapsedMs += 5) { + reconnectedSocket.ws.emit("frame"); + await vi.advanceTimersByTimeAsync(5); + } + + await vi.waitFor( + () => { + expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(3); + }, + { timeout: 250, interval: 2 }, + ); + + controller.abort(); + scripted.resolveClose(scripted.getListenerCount() - 1, { + status: 499, + isLoggedOut: false, + error: "aborted", + }); + await Promise.resolve(); + await run; + } finally { + vi.useRealTimers(); + } + }); + it("gives a reconnected listener a fresh watchdog window", async () => { vi.useFakeTimers(); try { diff --git a/extensions/whatsapp/src/auto-reply/monitor.ts b/extensions/whatsapp/src/auto-reply/monitor.ts index 662b793280f..595b52ff534 100644 --- a/extensions/whatsapp/src/auto-reply/monitor.ts +++ b/extensions/whatsapp/src/auto-reply/monitor.ts @@ -20,7 +20,7 @@ import { WhatsAppConnectionController, type ManagedWhatsAppListener, } from "../connection-controller.js"; -import { attachWebInboxToSocket } from "../inbound/monitor.js"; +import { attachWebInboxToSocket, type WhatsAppGroupMetadataCache } from "../inbound/monitor.js"; import { newConnectionId, resolveHeartbeatSeconds, @@ -202,6 +202,7 @@ export async function monitorWebChannel( }> >(); const groupMemberNames = new Map>(); + const groupMetadataCache: WhatsAppGroupMetadataCache = new Map(); const echoTracker = createEchoTracker({ maxItems: 100, logVerbose }); const sleep = @@ -305,6 +306,7 @@ export async function monitorWebChannel( shouldRetryDisconnect: () => !sigintStop && controller.shouldRetryDisconnect(), disconnectRetryPolicy: reconnectPolicy, disconnectRetryAbortSignal: controller.getDisconnectRetryAbortSignal(), + groupMetadataCache, onMessage: async (msg: WebInboundMsg) => { const inboundAt = Date.now(); controller.noteInbound(inboundAt); diff --git a/extensions/whatsapp/src/connection-controller.ts b/extensions/whatsapp/src/connection-controller.ts index 7fa383b8fe0..10762947403 100644 --- a/extensions/whatsapp/src/connection-controller.ts +++ b/extensions/whatsapp/src/connection-controller.ts @@ -45,6 +45,7 @@ export type WhatsAppLiveConnection = { handledMessages: number; unregisterUnhandled: (() => void) | null; unregisterTransportActivity: (() => void) | null; + openedAfterRecentInbound: boolean; backgroundTasks: Set>; closePromise: Promise; resolveClose: (reason: WebListenerCloseReason) => void; @@ -97,6 +98,7 @@ function createLiveConnection(params: { connectionId: string; sock: WASocket; listener: ManagedWhatsAppListener; + openedAfterRecentInbound: boolean; }): WhatsAppLiveConnection { let closeResolved = false; let resolveClosePromise = (_reason: WebListenerCloseReason) => {}; @@ -122,6 +124,7 @@ function createLiveConnection(params: { handledMessages: 0, unregisterUnhandled: null, unregisterTransportActivity: null, + openedAfterRecentInbound: params.openedAfterRecentInbound, backgroundTasks: new Set>(), closePromise, resolveClose: resolveClosePromise, @@ -259,6 +262,7 @@ export class WhatsAppConnectionController { private current: WhatsAppLiveConnection | null = null; private reconnectAttempts = 0; + private lastHandledInboundAt: number | null = null; constructor(params: { accountId: string; @@ -334,6 +338,8 @@ export class WhatsAppConnectionController { this.current.handledMessages += 1; this.current.lastInboundAt = timestamp; this.current.lastTransportActivityAt = timestamp; + this.current.openedAfterRecentInbound = false; + this.lastHandledInboundAt = timestamp; } noteTransportActivity(timestamp = Date.now()): void { @@ -397,6 +403,7 @@ export class WhatsAppConnectionController { connectionId: params.connectionId, sock, listener: placeholderListener, + openedAfterRecentInbound: this.isOpeningAfterRecentInbound(), }); const listener = await params.createListener({ sock, connection }); connection.listener = listener; @@ -602,10 +609,10 @@ export class WhatsAppConnectionController { const transportStaleForMs = now - connection.lastTransportActivityAt; const appBaselineAt = connection.lastInboundAt ?? connection.startedAt; const appSilentForMs = now - appBaselineAt; - if ( - transportStaleForMs <= this.transportTimeoutMs && - appSilentForMs <= this.appSilenceTimeoutMs - ) { + const appSilenceTimeoutMs = connection.openedAfterRecentInbound + ? this.messageTimeoutMs + : this.appSilenceTimeoutMs; + if (transportStaleForMs <= this.transportTimeoutMs && appSilentForMs <= appSilenceTimeoutMs) { return; } const snapshot = this.getCurrentSnapshot(connection); @@ -639,6 +646,13 @@ export class WhatsAppConnectionController { }; } + private isOpeningAfterRecentInbound(): boolean { + if (this.reconnectAttempts <= 0 || this.lastHandledInboundAt === null) { + return false; + } + return Date.now() - this.lastHandledInboundAt <= this.appSilenceTimeoutMs; + } + private stopDisconnectRetries(): void { if (!this.disconnectRetryController.signal.aborted) { this.disconnectRetryController.abort(); diff --git a/extensions/whatsapp/src/inbound/monitor.ts b/extensions/whatsapp/src/inbound/monitor.ts index a2107afd0f2..497a5e4afc0 100644 --- a/extensions/whatsapp/src/inbound/monitor.ts +++ b/extensions/whatsapp/src/inbound/monitor.ts @@ -2,6 +2,7 @@ import type { AnyMessageContent, MiscMessageGenerationOptions, proto, + GroupMetadata, WAMessage, WASocket, } from "@whiskeysockets/baileys"; @@ -45,6 +46,53 @@ import type { WebInboundMessage, WebListenerCloseReason } from "./types.js"; const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401; const RECONNECT_IN_PROGRESS_ERROR = "no active socket - reconnection in progress"; +const GROUP_META_TTL_MS = 5 * 60 * 1000; // 5 minutes +export const WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES = 500; + +export type WhatsAppGroupMetadataCacheEntry = { + subject?: string; + expires: number; +}; +export type WhatsAppGroupMetadataCache = Map; +type LocalGroupMetadataCacheEntry = WhatsAppGroupMetadataCacheEntry & { + participants?: string[]; +}; + +function rememberGroupMetadataCacheEntry( + cache: Map, + jid: string, + entry: T, +): void { + if (cache.has(jid)) { + cache.delete(jid); + } + cache.set(jid, entry); + + while (cache.size > WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES) { + const oldest = cache.keys().next(); + if (oldest.done) { + break; + } + cache.delete(oldest.value); + } +} + +function readGroupMetadataCacheEntry( + cache: Map, + jid: string, +): T | null { + const entry = cache.get(jid); + if (!entry) { + return null; + } + if (entry.expires <= Date.now()) { + cache.delete(jid); + return null; + } + cache.delete(jid); + cache.set(jid, entry); + return entry; +} function logWhatsAppVerbose(enabled: boolean | undefined, message: string) { if (!enabled) { @@ -98,6 +146,8 @@ export type MonitorWebInboxOptions = { }; /** Abort in-flight reconnect waits when shutdown becomes terminal. */ disconnectRetryAbortSignal?: AbortSignal; + /** Shared group metadata cache used only for inbound metadata fallback after fetch failures. */ + groupMetadataCache?: WhatsAppGroupMetadataCache; }; export async function attachWebInboxToSocket( @@ -234,11 +284,8 @@ export async function attachWebInboxToSocket( inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`); }, }); - const groupMetaCache = new Map< - string, - { subject?: string; participants?: string[]; expires: number } - >(); - const GROUP_META_TTL_MS = 5 * 60 * 1000; // 5 minutes + const groupMetadataCache = options.groupMetadataCache ?? new Map(); + const groupMetaCache = new Map(); const lidLookup = sock.signalRepository?.lidMapping; const resolveInboundJid = async (jid: string | null | undefined): Promise => @@ -306,30 +353,54 @@ export async function attachWebInboxToSocket( } }; + const summarizeGroupMeta = async (meta: GroupMetadata) => { + const participants = + ( + await Promise.all( + meta.participants?.map(async (p) => { + const mapped = await resolveInboundJid(p.id); + return mapped ?? p.id; + }) ?? [], + ) + ).filter(Boolean) ?? []; + return { + subject: meta.subject, + participants, + expires: Date.now() + GROUP_META_TTL_MS, + }; + }; + + const summarizeGroupMetaForReconnectCache = ( + meta: GroupMetadata, + ): WhatsAppGroupMetadataCacheEntry => ({ + subject: meta.subject, + expires: Date.now() + GROUP_META_TTL_MS, + }); + const getGroupMeta = async (jid: string) => { - const cached = groupMetaCache.get(jid); - if (cached && cached.expires > Date.now()) { + const cached = readGroupMetadataCacheEntry(groupMetaCache, jid); + if (cached) { return cached; } try { const meta = await sock.groupMetadata(jid); - const participants = - ( - await Promise.all( - meta.participants?.map(async (p) => { - const mapped = await resolveInboundJid(p.id); - return mapped ?? p.id; - }) ?? [], - ) - ).filter(Boolean) ?? []; - const entry = { - subject: meta.subject, - participants, - expires: Date.now() + GROUP_META_TTL_MS, - }; - groupMetaCache.set(jid, entry); + const entry = await summarizeGroupMeta(meta); + rememberGroupMetadataCacheEntry(groupMetadataCache, jid, { + subject: entry.subject, + expires: entry.expires, + }); + rememberGroupMetadataCacheEntry(groupMetaCache, jid, entry); return entry; } catch (err) { + const hydrated = readGroupMetadataCacheEntry(groupMetadataCache, jid); + if (hydrated) { + rememberGroupMetadataCacheEntry(groupMetaCache, jid, hydrated); + logWhatsAppVerbose( + options.verbose, + `Using cached group metadata for ${jid} after fetch failure: ${String(err)}`, + ); + return hydrated; + } logWhatsAppVerbose( options.verbose, `Failed to fetch group metadata for ${jid}: ${String(err)}`, @@ -733,6 +804,15 @@ export async function attachWebInboxToSocket( void (async () => { try { const groups = await sock.groupFetchAllParticipating(); + for (const [jid, meta] of Object.entries(groups ?? {})) { + if (meta) { + rememberGroupMetadataCacheEntry( + groupMetadataCache, + jid, + summarizeGroupMetaForReconnectCache(meta), + ); + } + } logWhatsAppVerbose( options.verbose, `Hydrated ${Object.keys(groups ?? {}).length} participating groups on connect`, diff --git a/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test-support.ts b/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test-support.ts index fa40c36b39f..47c5ad05dfe 100644 --- a/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test-support.ts +++ b/extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test-support.ts @@ -3,6 +3,7 @@ import path from "node:path"; import "./monitor-inbox.test-harness.js"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { WhatsAppRetryableInboundError } from "./inbound/dedupe.js"; +import { WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES } from "./inbound/monitor.js"; import { type InboxMonitorOptions, InboxOnMessage, @@ -215,6 +216,93 @@ describe("web monitor inbox", () => { await listener.close(); }); + it("keeps group inbound alive with cached metadata after reconnect-time metadata fetch failures", async () => { + const groupMetadataCache: NonNullable = new Map(); + const onMessage = vi.fn(async (_msg: Parameters[0]) => { + return; + }); + + const firstSock = getSock(); + firstSock.groupFetchAllParticipating.mockResolvedValueOnce({ + "123@g.us": { + id: "123@g.us", + subject: "Recovered Group", + owner: undefined, + participants: [{ id: "444@s.whatsapp.net" }], + }, + }); + const first = await startInboxMonitor(onMessage as InboxOnMessage, { + groupMetadataCache, + }); + await vi.waitFor(() => { + expect(groupMetadataCache.get("123@g.us")?.subject).toBe("Recovered Group"); + }); + expect( + (groupMetadataCache.get("123@g.us") as Record)?.participants, + ).toBeUndefined(); + await first.listener.close(); + + const second = await startInboxMonitor(onMessage as InboxOnMessage, { + groupMetadataCache, + }); + second.sock.groupMetadata.mockRejectedValueOnce(new Error("408 timed out")); + second.sock.ev.emit( + "messages.upsert", + buildNotifyMessageUpsert({ + id: nextMessageId("group-reconnect-cache"), + remoteJid: "123@g.us", + participant: "444@s.whatsapp.net", + text: "ping", + timestamp: 1_700_000_000, + }), + ); + + await waitForMessageCalls(onMessage, 1); + expect(onMessage).toHaveBeenCalledWith( + expect.objectContaining({ + body: "ping", + from: "123@g.us", + groupSubject: "Recovered Group", + senderE164: "+444", + chatType: "group", + }), + ); + expect(onMessage.mock.calls[0]?.[0].groupParticipants).toBeUndefined(); + + await second.listener.close(); + }); + + it("bounds cached group metadata kept across reconnects", async () => { + const groupMetadataCache: NonNullable = new Map(); + const groups = Object.fromEntries( + Array.from({ length: WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES + 2 }, (_, index) => [ + `${index}@g.us`, + { + id: `${index}@g.us`, + subject: `Group ${index}`, + owner: undefined, + participants: [], + }, + ]), + ); + const sock = getSock(); + sock.groupFetchAllParticipating.mockResolvedValueOnce(groups); + + const { listener } = await startInboxMonitor(vi.fn(async () => {}) as InboxOnMessage, { + groupMetadataCache, + }); + + await vi.waitFor(() => { + expect(groupMetadataCache.size).toBe(WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES); + }); + expect(groupMetadataCache.has("0@g.us")).toBe(false); + expect(groupMetadataCache.has(`${WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES + 1}@g.us`)).toBe( + true, + ); + + await listener.close(); + }); + it("does not block inbound listeners while group hydration is pending", async () => { let resolveHydration!: () => void; const sock = getSock(); diff --git a/extensions/whatsapp/src/monitor-inbox.test-harness.ts b/extensions/whatsapp/src/monitor-inbox.test-harness.ts index 12f7b0e9a83..5c8cc5a4a1a 100644 --- a/extensions/whatsapp/src/monitor-inbox.test-harness.ts +++ b/extensions/whatsapp/src/monitor-inbox.test-harness.ts @@ -38,6 +38,7 @@ export type MockSock = { sendPresenceUpdate: AnyMockFn; sendMessage: AnyMockFn; readMessages: AnyMockFn; + groupMetadata: AnyMockFn; groupFetchAllParticipating: AnyMockFn; updateMediaMessage: AnyMockFn; logger: Record; @@ -110,6 +111,12 @@ function createMockSock(): MockSock { sendPresenceUpdate: createResolvedMock(), sendMessage: createResolvedMock(), readMessages: createResolvedMock(), + groupMetadata: vi.fn().mockImplementation(async (jid: string) => ({ + id: jid, + subject: "Test Group", + owner: undefined, + participants: [], + })), groupFetchAllParticipating: vi.fn().mockResolvedValue({}), updateMediaMessage: vi.fn(), logger: {}, diff --git a/extensions/whatsapp/src/session.ts b/extensions/whatsapp/src/session.ts index b421620aea8..e7a569ba0c3 100644 --- a/extensions/whatsapp/src/session.ts +++ b/extensions/whatsapp/src/session.ts @@ -130,7 +130,10 @@ async function printTerminalQr(qr: string): Promise { export async function createWaSocket( printQr: boolean, verbose: boolean, - opts: { authDir?: string; onQr?: (qr: string) => void } & WhatsAppSocketTimingOptions = {}, + opts: { + authDir?: string; + onQr?: (qr: string) => void; + } & WhatsAppSocketTimingOptions = {}, ): Promise> { const baseLogger = getChildLogger( { module: "baileys" },