diff --git a/CHANGELOG.md b/CHANGELOG.md index a83cca4aceb..ab3bcb00e8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,10 @@ Docs: https://docs.openclaw.ai - Agents/runtime: submit heartbeat, cron, and exec wakeups as transient runtime context instead of visible user prompts, keeping synthetic system work out of chat transcripts. Fixes #66496 and #66814. Thanks @jeades and @mandomaker. - Telegram: include native quote excerpts automatically for threaded replies and reply tags when the original Telegram text is available, without adding another config knob. Fixes #6975. Thanks @rex05ai. - Node/Linux: make `openclaw node install` enable and restart the `openclaw-node` systemd unit instead of the gateway unit on node-only VMs. Fixes #68287. Thanks @dlebee-agent. +- Browser/CDP: retry transient raw-CDP WebSocket handshake failures before any + browser command is sent, and reconnect stale persistent Playwright CDP + sessions for safe tab-list reads without replaying mutating browser actions. + Fixes #67728. - Telegram: preserve exact selected quote text when sending native quote replies, and retry with legacy replies if Telegram rejects quote parameters. (#71952) Thanks @rubencu. - Plugins/CLI: preserve manifest name, description, format, and source metadata in cold `openclaw plugins list` output without importing plugin runtime. Thanks @shakkernerd. - Security/audit: read channel exposure and plugin allowlist ownership from read-only plugin index metadata so cold audits do not depend on loaded channel runtime. Thanks @shakkernerd. diff --git a/extensions/browser/src/browser/cdp.helpers.internal.test.ts b/extensions/browser/src/browser/cdp.helpers.internal.test.ts index f1e716152aa..654bf6cefce 100644 --- a/extensions/browser/src/browser/cdp.helpers.internal.test.ts +++ b/extensions/browser/src/browser/cdp.helpers.internal.test.ts @@ -234,17 +234,90 @@ describe("cdp.helpers internal", () => { it("rejects in-flight pending calls when the socket closes mid-call", async () => { const server = await startWsServer(); wss = server.wss; + let callbackCount = 0; + let connectionCount = 0; server.wss.on("connection", (socket) => { + connectionCount += 1; socket.on("message", () => { // Defer close so the pending entry is definitely registered. setTimeout(() => socket.close(), 10); }); }); await expect( - withCdpSocket(server.url, async (send) => { - await send("Test.willClose"); - }), + withCdpSocket( + server.url, + async (send) => { + callbackCount += 1; + await send("Test.willClose"); + }, + { handshakeRetries: 2, handshakeRetryDelayMs: 1, handshakeMaxRetryDelayMs: 1 }, + ), ).rejects.toThrow(/CDP socket closed/); + expect(callbackCount).toBe(1); + expect(connectionCount).toBe(1); + }); + + it("retries websocket failures before any CDP command is sent", async () => { + let rejectedHandshakes = 0; + wss = new WebSocketServer({ + port: 0, + host: "127.0.0.1", + verifyClient: (_info, cb) => { + if (rejectedHandshakes === 0) { + rejectedHandshakes += 1; + cb(false, 503, "try later"); + return; + } + cb(true); + }, + }); + await new Promise((resolve) => wss?.once("listening", () => resolve())); + const port = (wss.address() as { port: number }).port; + let callbackCount = 0; + wss.on("connection", (socket) => { + socket.on("message", (raw) => { + const msg = JSON.parse(rawDataToString(raw)) as { id?: number; method?: string }; + socket.send(JSON.stringify({ id: msg.id, result: { echoed: msg.method } })); + }); + }); + + const result = await withCdpSocket<{ echoed?: string }>( + `ws://127.0.0.1:${port}/devtools/browser/TEST`, + async (send) => { + callbackCount += 1; + return (await send("Test.afterOpen")) as { echoed?: string }; + }, + { handshakeRetries: 2, handshakeRetryDelayMs: 1, handshakeMaxRetryDelayMs: 1 }, + ); + + expect(result.echoed).toBe("Test.afterOpen"); + expect(rejectedHandshakes).toBe(1); + expect(callbackCount).toBe(1); + }); + + it("does not retry rate-limited websocket handshakes", async () => { + let rejectedHandshakes = 0; + wss = new WebSocketServer({ + port: 0, + host: "127.0.0.1", + verifyClient: (_info, cb) => { + rejectedHandshakes += 1; + cb(false, 429, "too many requests"); + }, + }); + await new Promise((resolve) => wss?.once("listening", () => resolve())); + const port = (wss.address() as { port: number }).port; + + await expect( + withCdpSocket( + `ws://127.0.0.1:${port}/devtools/browser/TEST`, + async (send) => { + await send("Test.neverRuns"); + }, + { handshakeRetries: 2, handshakeRetryDelayMs: 1, handshakeMaxRetryDelayMs: 1 }, + ), + ).rejects.toThrow(/429/); + expect(rejectedHandshakes).toBe(1); }); it("rejects and closes the socket when a CDP command exceeds its timeout", async () => { diff --git a/extensions/browser/src/browser/cdp.helpers.ts b/extensions/browser/src/browser/cdp.helpers.ts index e22a18f6e16..0204680859c 100644 --- a/extensions/browser/src/browser/cdp.helpers.ts +++ b/extensions/browser/src/browser/cdp.helpers.ts @@ -409,46 +409,125 @@ export function openCdpWebSocket( }); } +type CdpSocketOptions = { + headers?: Record; + handshakeTimeoutMs?: number; + commandTimeoutMs?: number; + handshakeRetries?: number; + handshakeRetryDelayMs?: number; + handshakeMaxRetryDelayMs?: number; +}; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function normalizeRetryCount(value: number | undefined, fallback: number): number { + if (typeof value !== "number" || !Number.isFinite(value)) { + return fallback; + } + return Math.max(0, Math.floor(value)); +} + +function computeHandshakeRetryDelayMs(attempt: number, opts?: CdpSocketOptions): number { + const baseDelayMs = + typeof opts?.handshakeRetryDelayMs === "number" && Number.isFinite(opts.handshakeRetryDelayMs) + ? Math.max(1, Math.floor(opts.handshakeRetryDelayMs)) + : 200; + const maxDelayMs = + typeof opts?.handshakeMaxRetryDelayMs === "number" && + Number.isFinite(opts.handshakeMaxRetryDelayMs) + ? Math.max(baseDelayMs, Math.floor(opts.handshakeMaxRetryDelayMs)) + : 3000; + const raw = Math.min(maxDelayMs, baseDelayMs * 2 ** Math.max(0, attempt - 1)); + const jitterScale = 0.8 + Math.random() * 0.4; + return Math.max(1, Math.floor(raw * jitterScale)); +} + +function shouldRetryCdpHandshakeError(err: unknown): boolean { + if (!(err instanceof Error)) { + return false; + } + const msg = err.message.toLowerCase(); + if (!msg) { + return false; + } + if (msg.includes("rate limit")) { + return false; + } + const statusMatch = msg.match(/(?:unexpected server response|response):\s*(\d{3})/); + if (statusMatch?.[1]) { + return Number(statusMatch[1]) >= 500; + } + return ( + msg.includes("cdp socket closed") || + msg.includes("econnreset") || + msg.includes("econnrefused") || + msg.includes("econnaborted") || + msg.includes("ehostunreach") || + msg.includes("enetunreach") || + msg.includes("etimedout") || + msg.includes("socket hang up") || + msg.includes("websocket error") || + msg.includes("closed before") + ); +} + export async function withCdpSocket( wsUrl: string, fn: (send: CdpSendFn) => Promise, - opts?: { - headers?: Record; - handshakeTimeoutMs?: number; - commandTimeoutMs?: number; - }, + opts?: CdpSocketOptions, ): Promise { - const ws = openCdpWebSocket(wsUrl, opts); - const { send, closeWithError } = createCdpSender(ws, opts); + const maxHandshakeRetries = normalizeRetryCount(opts?.handshakeRetries, 2); + let lastHandshakeError: unknown; + for (let attempt = 0; attempt <= maxHandshakeRetries; attempt += 1) { + const ws = openCdpWebSocket(wsUrl, opts); + const { send, closeWithError } = createCdpSender(ws, opts); - const openPromise = new Promise((resolve, reject) => { - ws.once("open", () => resolve()); - ws.once("error", (err) => reject(err)); - ws.once("close", () => reject(new Error("CDP socket closed"))); - }); + const openPromise = new Promise((resolve, reject) => { + ws.once("open", () => resolve()); + ws.once("error", (err) => reject(err)); + ws.once("close", () => reject(new Error("CDP socket closed"))); + }); - try { - await openPromise; - } catch (err) { - // openPromise is only rejected via `ws.once('error', err => reject(err))` - // or the close event's `new Error(...)`; the former always carries an - // Error from Node's `ws` library, the latter is already an Error. The - // non-Error wrap is defensive and structurally unreachable. - /* c8 ignore next */ - closeWithError(err instanceof Error ? err : new Error(String(err))); - throw err; - } - - try { - return await fn(send); - } catch (err) { - closeWithError(err instanceof Error ? err : new Error(String(err))); - throw err; - } finally { try { - ws.close(); - } catch { - // ignore + await openPromise; + } catch (err) { + lastHandshakeError = err; + // openPromise is only rejected via `ws.once('error', err => reject(err))` + // or the close event's `new Error(...)`; the former always carries an + // Error from Node's `ws` library, the latter is already an Error. The + // non-Error wrap is defensive and structurally unreachable. + /* c8 ignore next */ + closeWithError(err instanceof Error ? err : new Error(String(err))); + try { + ws.close(); + } catch { + // ignore + } + if (attempt >= maxHandshakeRetries || !shouldRetryCdpHandshakeError(err)) { + throw err; + } + await sleep(computeHandshakeRetryDelayMs(attempt + 1, opts)); + continue; + } + + try { + return await fn(send); + } catch (err) { + closeWithError(err instanceof Error ? err : new Error(String(err))); + throw err; + } finally { + try { + ws.close(); + } catch { + // ignore + } } } + + if (lastHandshakeError instanceof Error) { + throw lastHandshakeError; + } + throw new Error("CDP socket failed to open"); } diff --git a/extensions/browser/src/browser/pw-session.connections.test.ts b/extensions/browser/src/browser/pw-session.connections.test.ts index f8771b983de..1320b46cfb7 100644 --- a/extensions/browser/src/browser/pw-session.connections.test.ts +++ b/extensions/browser/src/browser/pw-session.connections.test.ts @@ -3,6 +3,7 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import * as chromeModule from "./chrome.js"; import { closePlaywrightBrowserConnection, + createPageViaPlaywright, getPageForTargetId, listPagesViaPlaywright, } from "./pw-session.js"; @@ -64,6 +65,62 @@ function makeEmptyBrowser(): BrowserMockBundle { return { browser, browserClose }; } +function makeDisconnectedReadBrowser(): BrowserMockBundle { + let context: import("playwright-core").BrowserContext; + const browserClose = vi.fn(async () => {}); + const page = { + on: vi.fn(), + context: () => context, + title: vi.fn(async () => { + throw new Error("Target page, context or browser has been closed"); + }), + url: vi.fn(() => { + throw new Error("Target page, context or browser has been closed"); + }), + } as unknown as import("playwright-core").Page; + + context = { + pages: () => [page], + on: vi.fn(), + newCDPSession: vi.fn(async () => { + throw new Error("Target page, context or browser has been closed"); + }), + } as unknown as import("playwright-core").BrowserContext; + + const browser = { + contexts: () => [context], + on: vi.fn(), + off: vi.fn(), + close: browserClose, + } as unknown as import("playwright-core").Browser; + + return { browser, browserClose }; +} + +function makeMutatingDisconnectBrowser(): BrowserMockBundle & { + newPage: ReturnType; +} { + const browserClose = vi.fn(async () => {}); + const newPage = vi.fn(async () => { + throw new Error("Target page, context or browser has been closed"); + }); + const context = { + pages: () => [], + on: vi.fn(), + newCDPSession: vi.fn(), + newPage, + } as unknown as import("playwright-core").BrowserContext; + + const browser = { + contexts: () => [context], + on: vi.fn(), + off: vi.fn(), + close: browserClose, + } as unknown as import("playwright-core").Browser; + + return { browser, browserClose, newPage }; +} + afterEach(async () => { connectOverCdpSpy.mockReset(); getChromeWebSocketUrlSpy.mockReset(); @@ -171,4 +228,53 @@ describe("pw-session connection scoping", () => { expect(browserB.browserClose).not.toHaveBeenCalled(); expect(connectOverCdpSpy).toHaveBeenCalledTimes(3); }); + + it("reconnects listPagesViaPlaywright once after a cached transport disconnect", async () => { + const stale = makeDisconnectedReadBrowser(); + const refreshed = makeBrowser("A", "https://a.example/recovered"); + let connectCalls = 0; + + connectOverCdpSpy.mockImplementation((async (...args: unknown[]) => { + const endpointText = String(args[0]); + if (endpointText !== "http://127.0.0.1:9222") { + throw new Error(`unexpected endpoint: ${endpointText}`); + } + connectCalls += 1; + return connectCalls === 1 ? stale.browser : refreshed.browser; + }) as never); + getChromeWebSocketUrlSpy.mockResolvedValue(null); + + const pages = await listPagesViaPlaywright({ cdpUrl: "http://127.0.0.1:9222" }); + + expect(pages.map((page) => page.targetId)).toEqual(["A"]); + expect(connectOverCdpSpy).toHaveBeenCalledTimes(2); + await vi.waitFor(() => expect(stale.browserClose).toHaveBeenCalledTimes(1)); + expect(refreshed.browserClose).not.toHaveBeenCalled(); + }); + + it("does not replay mutating page creation after an ambiguous disconnect", async () => { + const stale = makeMutatingDisconnectBrowser(); + const refreshed = makeBrowser("A", "https://a.example/recovered"); + let connectCalls = 0; + + connectOverCdpSpy.mockImplementation((async (...args: unknown[]) => { + const endpointText = String(args[0]); + if (endpointText !== "http://127.0.0.1:9222") { + throw new Error(`unexpected endpoint: ${endpointText}`); + } + connectCalls += 1; + return connectCalls === 1 ? stale.browser : refreshed.browser; + }) as never); + getChromeWebSocketUrlSpy.mockResolvedValue(null); + + await expect( + createPageViaPlaywright({ + cdpUrl: "http://127.0.0.1:9222", + url: "about:blank", + }), + ).rejects.toThrow(/browser has been closed/); + + expect(stale.newPage).toHaveBeenCalledTimes(1); + expect(connectOverCdpSpy).toHaveBeenCalledTimes(1); + }); }); diff --git a/extensions/browser/src/browser/pw-session.ts b/extensions/browser/src/browser/pw-session.ts index 81d81817f97..b20ba6d05bf 100644 --- a/extensions/browser/src/browser/pw-session.ts +++ b/extensions/browser/src/browser/pw-session.ts @@ -140,6 +140,19 @@ function hasCachedPlaywrightBrowserConnection(cdpUrl: string): boolean { return cachedByCdpUrl.has(normalizeCdpUrl(cdpUrl)); } +function isRecoverablePlaywrightDisconnectError(err: unknown): boolean { + const message = formatErrorMessage(err).toLowerCase(); + return ( + message.includes("target page, context or browser has been closed") || + message.includes("browser has been closed") || + message.includes("browser disconnected") || + message.includes("target closed") || + message.includes("connection closed") || + message.includes("websocket closed") || + message.includes("cdp socket closed") + ); +} + function isRecoverableStalePageSelectionError(err: unknown, reusedCachedBrowser: boolean): boolean { if (!reusedCachedBrowser) { return false; @@ -243,6 +256,25 @@ function clearBlockedPageRef(cdpUrl: string, page: Page): void { blockedPageRefsByCdpUrl.get(normalizeCdpUrl(cdpUrl))?.delete(page); } +function takeCachedPlaywrightBrowserConnection(cdpUrl: string): ConnectedBrowser | null { + const normalized = normalizeCdpUrl(cdpUrl); + const cur = cachedByCdpUrl.get(normalized); + cachedByCdpUrl.delete(normalized); + connectingByCdpUrl.delete(normalized); + if (!cur) { + return null; + } + if (cur.onDisconnected && typeof cur.browser.off === "function") { + cur.browser.off("disconnected", cur.onDisconnected); + } + return cur; +} + +function evictStalePlaywrightBrowserConnection(cdpUrl: string): void { + const cur = takeCachedPlaywrightBrowserConnection(cdpUrl); + cur?.browser.close().catch(() => {}); +} + function hasBlockedTargetsForCdpUrl(cdpUrl: string): boolean { const prefix = `${normalizeCdpUrl(cdpUrl)}::`; for (const key of blockedTargetsByCdpUrl) { @@ -1018,15 +1050,10 @@ export async function closePlaywrightBrowserConnection(opts?: { cdpUrl?: string if (normalized) { clearBlockedTargetsForCdpUrl(normalized); clearBlockedPageRefsForCdpUrl(normalized); - const cur = cachedByCdpUrl.get(normalized); - cachedByCdpUrl.delete(normalized); - connectingByCdpUrl.delete(normalized); + const cur = takeCachedPlaywrightBrowserConnection(normalized); if (!cur) { return; } - if (cur.onDisconnected && typeof cur.browser.off === "function") { - cur.browser.off("disconnected", cur.onDisconnected); - } await cur.browser.close().catch(() => {}); return; } @@ -1152,19 +1179,10 @@ export async function forceDisconnectPlaywrightForTarget(opts: { ssrfPolicy?: SsrFPolicy; }): Promise { const normalized = normalizeCdpUrl(opts.cdpUrl); - const cur = cachedByCdpUrl.get(normalized); + const cur = takeCachedPlaywrightBrowserConnection(normalized); if (!cur) { return; } - cachedByCdpUrl.delete(normalized); - // Also clear the per-url in-flight connect so the next call does a fresh connectOverCDP - // rather than awaiting a stale promise. - connectingByCdpUrl.delete(normalized); - // Remove the "disconnected" listener to prevent the old browser's teardown - // from racing with a fresh connection and nulling the new cached entry. - if (cur.onDisconnected && typeof cur.browser.off === "function") { - cur.browser.off("disconnected", cur.onDisconnected); - } // Best-effort: kill any stuck JS to unblock the target's execution context before we // disconnect Playwright's CDP connection. @@ -1181,6 +1199,21 @@ export async function forceDisconnectPlaywrightForTarget(opts: { cur.browser.close().catch(() => {}); } +async function withPlaywrightSafeReadReconnect( + cdpUrl: string, + run: () => Promise, +): Promise { + try { + return await run(); + } catch (err) { + if (!isRecoverablePlaywrightDisconnectError(err)) { + throw err; + } + evictStalePlaywrightBrowserConnection(cdpUrl); + return await run(); + } +} + /** * List all pages/tabs from the persistent Playwright connection. * Used for remote profiles where HTTP-based /json/list is ephemeral. @@ -1196,30 +1229,56 @@ export async function listPagesViaPlaywright(opts: { type: string; }> > { - const { browser } = await connectBrowser(opts.cdpUrl, opts.ssrfPolicy); - const pages = await getAllPages(browser); - const results: Array<{ - targetId: string; - title: string; - url: string; - type: string; - }> = []; + return await withPlaywrightSafeReadReconnect(opts.cdpUrl, async () => { + const { browser } = await connectBrowser(opts.cdpUrl, opts.ssrfPolicy); + const pages = await getAllPages(browser); + const results: Array<{ + targetId: string; + title: string; + url: string; + type: string; + }> = []; - for (const page of pages) { - if (isBlockedPageRef(opts.cdpUrl, page)) { - continue; + for (const page of pages) { + if (isBlockedPageRef(opts.cdpUrl, page)) { + continue; + } + let tid: string | null; + try { + tid = await pageTargetId(page); + } catch (err) { + if (isRecoverablePlaywrightDisconnectError(err)) { + throw err; + } + tid = null; + } + if (tid && !isBlockedTarget(opts.cdpUrl, tid)) { + let title = ""; + try { + title = await page.title(); + } catch (err) { + if (isRecoverablePlaywrightDisconnectError(err)) { + throw err; + } + } + let url = ""; + try { + url = page.url(); + } catch (err) { + if (isRecoverablePlaywrightDisconnectError(err)) { + throw err; + } + } + results.push({ + targetId: tid, + title, + url, + type: "page", + }); + } } - const tid = await pageTargetId(page).catch(() => null); - if (tid && !isBlockedTarget(opts.cdpUrl, tid)) { - results.push({ - targetId: tid, - title: await page.title().catch(() => ""), - url: page.url(), - type: "page", - }); - } - } - return results; + return results; + }); } /**