fix(browser): recover stale remote CDP reads safely

This commit is contained in:
Peter Steinberger
2026-04-26 06:35:12 +01:00
parent eca9f46824
commit 142577d9b2
5 changed files with 395 additions and 74 deletions

View File

@@ -75,6 +75,10 @@ Docs: https://docs.openclaw.ai
- Agents/runtime: submit heartbeat, cron, and exec wakeups as transient runtime context instead of visible user prompts, keeping synthetic system work out of chat transcripts. Fixes #66496 and #66814. Thanks @jeades and @mandomaker.
- Telegram: include native quote excerpts automatically for threaded replies and reply tags when the original Telegram text is available, without adding another config knob. Fixes #6975. Thanks @rex05ai.
- Node/Linux: make `openclaw node install` enable and restart the `openclaw-node` systemd unit instead of the gateway unit on node-only VMs. Fixes #68287. Thanks @dlebee-agent.
- Browser/CDP: retry transient raw-CDP WebSocket handshake failures before any
browser command is sent, and reconnect stale persistent Playwright CDP
sessions for safe tab-list reads without replaying mutating browser actions.
Fixes #67728.
- Telegram: preserve exact selected quote text when sending native quote replies, and retry with legacy replies if Telegram rejects quote parameters. (#71952) Thanks @rubencu.
- Plugins/CLI: preserve manifest name, description, format, and source metadata in cold `openclaw plugins list` output without importing plugin runtime. Thanks @shakkernerd.
- Security/audit: read channel exposure and plugin allowlist ownership from read-only plugin index metadata so cold audits do not depend on loaded channel runtime. Thanks @shakkernerd.

View File

@@ -234,17 +234,90 @@ describe("cdp.helpers internal", () => {
it("rejects in-flight pending calls when the socket closes mid-call", async () => {
const server = await startWsServer();
wss = server.wss;
let callbackCount = 0;
let connectionCount = 0;
server.wss.on("connection", (socket) => {
connectionCount += 1;
socket.on("message", () => {
// Defer close so the pending entry is definitely registered.
setTimeout(() => socket.close(), 10);
});
});
await expect(
withCdpSocket(server.url, async (send) => {
await send("Test.willClose");
}),
withCdpSocket(
server.url,
async (send) => {
callbackCount += 1;
await send("Test.willClose");
},
{ handshakeRetries: 2, handshakeRetryDelayMs: 1, handshakeMaxRetryDelayMs: 1 },
),
).rejects.toThrow(/CDP socket closed/);
expect(callbackCount).toBe(1);
expect(connectionCount).toBe(1);
});
it("retries websocket failures before any CDP command is sent", async () => {
let rejectedHandshakes = 0;
wss = new WebSocketServer({
port: 0,
host: "127.0.0.1",
verifyClient: (_info, cb) => {
if (rejectedHandshakes === 0) {
rejectedHandshakes += 1;
cb(false, 503, "try later");
return;
}
cb(true);
},
});
await new Promise<void>((resolve) => wss?.once("listening", () => resolve()));
const port = (wss.address() as { port: number }).port;
let callbackCount = 0;
wss.on("connection", (socket) => {
socket.on("message", (raw) => {
const msg = JSON.parse(rawDataToString(raw)) as { id?: number; method?: string };
socket.send(JSON.stringify({ id: msg.id, result: { echoed: msg.method } }));
});
});
const result = await withCdpSocket<{ echoed?: string }>(
`ws://127.0.0.1:${port}/devtools/browser/TEST`,
async (send) => {
callbackCount += 1;
return (await send("Test.afterOpen")) as { echoed?: string };
},
{ handshakeRetries: 2, handshakeRetryDelayMs: 1, handshakeMaxRetryDelayMs: 1 },
);
expect(result.echoed).toBe("Test.afterOpen");
expect(rejectedHandshakes).toBe(1);
expect(callbackCount).toBe(1);
});
it("does not retry rate-limited websocket handshakes", async () => {
let rejectedHandshakes = 0;
wss = new WebSocketServer({
port: 0,
host: "127.0.0.1",
verifyClient: (_info, cb) => {
rejectedHandshakes += 1;
cb(false, 429, "too many requests");
},
});
await new Promise<void>((resolve) => wss?.once("listening", () => resolve()));
const port = (wss.address() as { port: number }).port;
await expect(
withCdpSocket(
`ws://127.0.0.1:${port}/devtools/browser/TEST`,
async (send) => {
await send("Test.neverRuns");
},
{ handshakeRetries: 2, handshakeRetryDelayMs: 1, handshakeMaxRetryDelayMs: 1 },
),
).rejects.toThrow(/429/);
expect(rejectedHandshakes).toBe(1);
});
it("rejects and closes the socket when a CDP command exceeds its timeout", async () => {

View File

@@ -409,46 +409,125 @@ export function openCdpWebSocket(
});
}
type CdpSocketOptions = {
headers?: Record<string, string>;
handshakeTimeoutMs?: number;
commandTimeoutMs?: number;
handshakeRetries?: number;
handshakeRetryDelayMs?: number;
handshakeMaxRetryDelayMs?: number;
};
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function normalizeRetryCount(value: number | undefined, fallback: number): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
return Math.max(0, Math.floor(value));
}
function computeHandshakeRetryDelayMs(attempt: number, opts?: CdpSocketOptions): number {
const baseDelayMs =
typeof opts?.handshakeRetryDelayMs === "number" && Number.isFinite(opts.handshakeRetryDelayMs)
? Math.max(1, Math.floor(opts.handshakeRetryDelayMs))
: 200;
const maxDelayMs =
typeof opts?.handshakeMaxRetryDelayMs === "number" &&
Number.isFinite(opts.handshakeMaxRetryDelayMs)
? Math.max(baseDelayMs, Math.floor(opts.handshakeMaxRetryDelayMs))
: 3000;
const raw = Math.min(maxDelayMs, baseDelayMs * 2 ** Math.max(0, attempt - 1));
const jitterScale = 0.8 + Math.random() * 0.4;
return Math.max(1, Math.floor(raw * jitterScale));
}
function shouldRetryCdpHandshakeError(err: unknown): boolean {
if (!(err instanceof Error)) {
return false;
}
const msg = err.message.toLowerCase();
if (!msg) {
return false;
}
if (msg.includes("rate limit")) {
return false;
}
const statusMatch = msg.match(/(?:unexpected server response|response):\s*(\d{3})/);
if (statusMatch?.[1]) {
return Number(statusMatch[1]) >= 500;
}
return (
msg.includes("cdp socket closed") ||
msg.includes("econnreset") ||
msg.includes("econnrefused") ||
msg.includes("econnaborted") ||
msg.includes("ehostunreach") ||
msg.includes("enetunreach") ||
msg.includes("etimedout") ||
msg.includes("socket hang up") ||
msg.includes("websocket error") ||
msg.includes("closed before")
);
}
export async function withCdpSocket<T>(
wsUrl: string,
fn: (send: CdpSendFn) => Promise<T>,
opts?: {
headers?: Record<string, string>;
handshakeTimeoutMs?: number;
commandTimeoutMs?: number;
},
opts?: CdpSocketOptions,
): Promise<T> {
const ws = openCdpWebSocket(wsUrl, opts);
const { send, closeWithError } = createCdpSender(ws, opts);
const maxHandshakeRetries = normalizeRetryCount(opts?.handshakeRetries, 2);
let lastHandshakeError: unknown;
for (let attempt = 0; attempt <= maxHandshakeRetries; attempt += 1) {
const ws = openCdpWebSocket(wsUrl, opts);
const { send, closeWithError } = createCdpSender(ws, opts);
const openPromise = new Promise<void>((resolve, reject) => {
ws.once("open", () => resolve());
ws.once("error", (err) => reject(err));
ws.once("close", () => reject(new Error("CDP socket closed")));
});
const openPromise = new Promise<void>((resolve, reject) => {
ws.once("open", () => resolve());
ws.once("error", (err) => reject(err));
ws.once("close", () => reject(new Error("CDP socket closed")));
});
try {
await openPromise;
} catch (err) {
// openPromise is only rejected via `ws.once('error', err => reject(err))`
// or the close event's `new Error(...)`; the former always carries an
// Error from Node's `ws` library, the latter is already an Error. The
// non-Error wrap is defensive and structurally unreachable.
/* c8 ignore next */
closeWithError(err instanceof Error ? err : new Error(String(err)));
throw err;
}
try {
return await fn(send);
} catch (err) {
closeWithError(err instanceof Error ? err : new Error(String(err)));
throw err;
} finally {
try {
ws.close();
} catch {
// ignore
await openPromise;
} catch (err) {
lastHandshakeError = err;
// openPromise is only rejected via `ws.once('error', err => reject(err))`
// or the close event's `new Error(...)`; the former always carries an
// Error from Node's `ws` library, the latter is already an Error. The
// non-Error wrap is defensive and structurally unreachable.
/* c8 ignore next */
closeWithError(err instanceof Error ? err : new Error(String(err)));
try {
ws.close();
} catch {
// ignore
}
if (attempt >= maxHandshakeRetries || !shouldRetryCdpHandshakeError(err)) {
throw err;
}
await sleep(computeHandshakeRetryDelayMs(attempt + 1, opts));
continue;
}
try {
return await fn(send);
} catch (err) {
closeWithError(err instanceof Error ? err : new Error(String(err)));
throw err;
} finally {
try {
ws.close();
} catch {
// ignore
}
}
}
if (lastHandshakeError instanceof Error) {
throw lastHandshakeError;
}
throw new Error("CDP socket failed to open");
}

View File

@@ -3,6 +3,7 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import * as chromeModule from "./chrome.js";
import {
closePlaywrightBrowserConnection,
createPageViaPlaywright,
getPageForTargetId,
listPagesViaPlaywright,
} from "./pw-session.js";
@@ -64,6 +65,62 @@ function makeEmptyBrowser(): BrowserMockBundle {
return { browser, browserClose };
}
function makeDisconnectedReadBrowser(): BrowserMockBundle {
let context: import("playwright-core").BrowserContext;
const browserClose = vi.fn(async () => {});
const page = {
on: vi.fn(),
context: () => context,
title: vi.fn(async () => {
throw new Error("Target page, context or browser has been closed");
}),
url: vi.fn(() => {
throw new Error("Target page, context or browser has been closed");
}),
} as unknown as import("playwright-core").Page;
context = {
pages: () => [page],
on: vi.fn(),
newCDPSession: vi.fn(async () => {
throw new Error("Target page, context or browser has been closed");
}),
} as unknown as import("playwright-core").BrowserContext;
const browser = {
contexts: () => [context],
on: vi.fn(),
off: vi.fn(),
close: browserClose,
} as unknown as import("playwright-core").Browser;
return { browser, browserClose };
}
function makeMutatingDisconnectBrowser(): BrowserMockBundle & {
newPage: ReturnType<typeof vi.fn>;
} {
const browserClose = vi.fn(async () => {});
const newPage = vi.fn(async () => {
throw new Error("Target page, context or browser has been closed");
});
const context = {
pages: () => [],
on: vi.fn(),
newCDPSession: vi.fn(),
newPage,
} as unknown as import("playwright-core").BrowserContext;
const browser = {
contexts: () => [context],
on: vi.fn(),
off: vi.fn(),
close: browserClose,
} as unknown as import("playwright-core").Browser;
return { browser, browserClose, newPage };
}
afterEach(async () => {
connectOverCdpSpy.mockReset();
getChromeWebSocketUrlSpy.mockReset();
@@ -171,4 +228,53 @@ describe("pw-session connection scoping", () => {
expect(browserB.browserClose).not.toHaveBeenCalled();
expect(connectOverCdpSpy).toHaveBeenCalledTimes(3);
});
it("reconnects listPagesViaPlaywright once after a cached transport disconnect", async () => {
const stale = makeDisconnectedReadBrowser();
const refreshed = makeBrowser("A", "https://a.example/recovered");
let connectCalls = 0;
connectOverCdpSpy.mockImplementation((async (...args: unknown[]) => {
const endpointText = String(args[0]);
if (endpointText !== "http://127.0.0.1:9222") {
throw new Error(`unexpected endpoint: ${endpointText}`);
}
connectCalls += 1;
return connectCalls === 1 ? stale.browser : refreshed.browser;
}) as never);
getChromeWebSocketUrlSpy.mockResolvedValue(null);
const pages = await listPagesViaPlaywright({ cdpUrl: "http://127.0.0.1:9222" });
expect(pages.map((page) => page.targetId)).toEqual(["A"]);
expect(connectOverCdpSpy).toHaveBeenCalledTimes(2);
await vi.waitFor(() => expect(stale.browserClose).toHaveBeenCalledTimes(1));
expect(refreshed.browserClose).not.toHaveBeenCalled();
});
it("does not replay mutating page creation after an ambiguous disconnect", async () => {
const stale = makeMutatingDisconnectBrowser();
const refreshed = makeBrowser("A", "https://a.example/recovered");
let connectCalls = 0;
connectOverCdpSpy.mockImplementation((async (...args: unknown[]) => {
const endpointText = String(args[0]);
if (endpointText !== "http://127.0.0.1:9222") {
throw new Error(`unexpected endpoint: ${endpointText}`);
}
connectCalls += 1;
return connectCalls === 1 ? stale.browser : refreshed.browser;
}) as never);
getChromeWebSocketUrlSpy.mockResolvedValue(null);
await expect(
createPageViaPlaywright({
cdpUrl: "http://127.0.0.1:9222",
url: "about:blank",
}),
).rejects.toThrow(/browser has been closed/);
expect(stale.newPage).toHaveBeenCalledTimes(1);
expect(connectOverCdpSpy).toHaveBeenCalledTimes(1);
});
});

View File

@@ -140,6 +140,19 @@ function hasCachedPlaywrightBrowserConnection(cdpUrl: string): boolean {
return cachedByCdpUrl.has(normalizeCdpUrl(cdpUrl));
}
function isRecoverablePlaywrightDisconnectError(err: unknown): boolean {
const message = formatErrorMessage(err).toLowerCase();
return (
message.includes("target page, context or browser has been closed") ||
message.includes("browser has been closed") ||
message.includes("browser disconnected") ||
message.includes("target closed") ||
message.includes("connection closed") ||
message.includes("websocket closed") ||
message.includes("cdp socket closed")
);
}
function isRecoverableStalePageSelectionError(err: unknown, reusedCachedBrowser: boolean): boolean {
if (!reusedCachedBrowser) {
return false;
@@ -243,6 +256,25 @@ function clearBlockedPageRef(cdpUrl: string, page: Page): void {
blockedPageRefsByCdpUrl.get(normalizeCdpUrl(cdpUrl))?.delete(page);
}
function takeCachedPlaywrightBrowserConnection(cdpUrl: string): ConnectedBrowser | null {
const normalized = normalizeCdpUrl(cdpUrl);
const cur = cachedByCdpUrl.get(normalized);
cachedByCdpUrl.delete(normalized);
connectingByCdpUrl.delete(normalized);
if (!cur) {
return null;
}
if (cur.onDisconnected && typeof cur.browser.off === "function") {
cur.browser.off("disconnected", cur.onDisconnected);
}
return cur;
}
function evictStalePlaywrightBrowserConnection(cdpUrl: string): void {
const cur = takeCachedPlaywrightBrowserConnection(cdpUrl);
cur?.browser.close().catch(() => {});
}
function hasBlockedTargetsForCdpUrl(cdpUrl: string): boolean {
const prefix = `${normalizeCdpUrl(cdpUrl)}::`;
for (const key of blockedTargetsByCdpUrl) {
@@ -1018,15 +1050,10 @@ export async function closePlaywrightBrowserConnection(opts?: { cdpUrl?: string
if (normalized) {
clearBlockedTargetsForCdpUrl(normalized);
clearBlockedPageRefsForCdpUrl(normalized);
const cur = cachedByCdpUrl.get(normalized);
cachedByCdpUrl.delete(normalized);
connectingByCdpUrl.delete(normalized);
const cur = takeCachedPlaywrightBrowserConnection(normalized);
if (!cur) {
return;
}
if (cur.onDisconnected && typeof cur.browser.off === "function") {
cur.browser.off("disconnected", cur.onDisconnected);
}
await cur.browser.close().catch(() => {});
return;
}
@@ -1152,19 +1179,10 @@ export async function forceDisconnectPlaywrightForTarget(opts: {
ssrfPolicy?: SsrFPolicy;
}): Promise<void> {
const normalized = normalizeCdpUrl(opts.cdpUrl);
const cur = cachedByCdpUrl.get(normalized);
const cur = takeCachedPlaywrightBrowserConnection(normalized);
if (!cur) {
return;
}
cachedByCdpUrl.delete(normalized);
// Also clear the per-url in-flight connect so the next call does a fresh connectOverCDP
// rather than awaiting a stale promise.
connectingByCdpUrl.delete(normalized);
// Remove the "disconnected" listener to prevent the old browser's teardown
// from racing with a fresh connection and nulling the new cached entry.
if (cur.onDisconnected && typeof cur.browser.off === "function") {
cur.browser.off("disconnected", cur.onDisconnected);
}
// Best-effort: kill any stuck JS to unblock the target's execution context before we
// disconnect Playwright's CDP connection.
@@ -1181,6 +1199,21 @@ export async function forceDisconnectPlaywrightForTarget(opts: {
cur.browser.close().catch(() => {});
}
async function withPlaywrightSafeReadReconnect<T>(
cdpUrl: string,
run: () => Promise<T>,
): Promise<T> {
try {
return await run();
} catch (err) {
if (!isRecoverablePlaywrightDisconnectError(err)) {
throw err;
}
evictStalePlaywrightBrowserConnection(cdpUrl);
return await run();
}
}
/**
* List all pages/tabs from the persistent Playwright connection.
* Used for remote profiles where HTTP-based /json/list is ephemeral.
@@ -1196,30 +1229,56 @@ export async function listPagesViaPlaywright(opts: {
type: string;
}>
> {
const { browser } = await connectBrowser(opts.cdpUrl, opts.ssrfPolicy);
const pages = await getAllPages(browser);
const results: Array<{
targetId: string;
title: string;
url: string;
type: string;
}> = [];
return await withPlaywrightSafeReadReconnect(opts.cdpUrl, async () => {
const { browser } = await connectBrowser(opts.cdpUrl, opts.ssrfPolicy);
const pages = await getAllPages(browser);
const results: Array<{
targetId: string;
title: string;
url: string;
type: string;
}> = [];
for (const page of pages) {
if (isBlockedPageRef(opts.cdpUrl, page)) {
continue;
for (const page of pages) {
if (isBlockedPageRef(opts.cdpUrl, page)) {
continue;
}
let tid: string | null;
try {
tid = await pageTargetId(page);
} catch (err) {
if (isRecoverablePlaywrightDisconnectError(err)) {
throw err;
}
tid = null;
}
if (tid && !isBlockedTarget(opts.cdpUrl, tid)) {
let title = "";
try {
title = await page.title();
} catch (err) {
if (isRecoverablePlaywrightDisconnectError(err)) {
throw err;
}
}
let url = "";
try {
url = page.url();
} catch (err) {
if (isRecoverablePlaywrightDisconnectError(err)) {
throw err;
}
}
results.push({
targetId: tid,
title,
url,
type: "page",
});
}
}
const tid = await pageTargetId(page).catch(() => null);
if (tid && !isBlockedTarget(opts.cdpUrl, tid)) {
results.push({
targetId: tid,
title: await page.title().catch(() => ""),
url: page.url(),
type: "page",
});
}
}
return results;
return results;
});
}
/**