From 40a2600544d379ec2c13aeb0de7d2fdb0b5ceb83 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 27 May 2026 10:50:44 +0200 Subject: [PATCH] fix(test): bound codex media path log polling --- scripts/e2e/lib/codex-media-path/client.mjs | 38 ++---- .../codex-media-path/jsonl-request-tail.mjs | 105 +++++++++++++++ .../lib/codex-media-path/open-websocket.mjs | 40 ++++++ test/scripts/codex-media-path-client.test.ts | 127 ++++++++++++++++++ 4 files changed, 286 insertions(+), 24 deletions(-) create mode 100644 scripts/e2e/lib/codex-media-path/jsonl-request-tail.mjs create mode 100644 scripts/e2e/lib/codex-media-path/open-websocket.mjs create mode 100644 test/scripts/codex-media-path-client.test.ts diff --git a/scripts/e2e/lib/codex-media-path/client.mjs b/scripts/e2e/lib/codex-media-path/client.mjs index 124e47fbf85..c6b52061afc 100644 --- a/scripts/e2e/lib/codex-media-path/client.mjs +++ b/scripts/e2e/lib/codex-media-path/client.mjs @@ -1,9 +1,10 @@ import { createHash, randomBytes, randomUUID } from "node:crypto"; -import fs from "node:fs"; import { setTimeout as delay } from "node:timers/promises"; import { WebSocket } from "ws"; import { PROTOCOL_VERSION } from "../../../../dist/gateway/protocol/index.js"; import { renderBitmapTextPngBase64 } from "../../../../test/helpers/live-image-probe.ts"; +import { createJsonlRequestTailer } from "./jsonl-request-tail.mjs"; +import { waitForWebSocketOpen } from "./open-websocket.mjs"; const port = process.env.PORT; const token = process.env.OPENCLAW_GATEWAY_TOKEN; @@ -14,6 +15,10 @@ const timeoutSeconds = Number.parseInt( process.env.OPENCLAW_CODEX_MEDIA_PATH_TIMEOUT_SECONDS ?? "180", 10, ); +const logTailMaxBytes = Number.parseInt( + process.env.OPENCLAW_CODEX_MEDIA_PATH_LOG_TAIL_MAX_BYTES ?? `${2 * 1024 * 1024}`, + 10, +); if (!port || !token) { throw new Error("missing PORT/OPENCLAW_GATEWAY_TOKEN"); @@ -29,16 +34,12 @@ function sha256Base64(data) { return createHash("sha256").update(Buffer.from(data, "base64")).digest("hex"); } -function readLoggedRequests() { - if (!fs.existsSync(appServerLog)) { - return []; - } - return fs - .readFileSync(appServerLog, "utf8") - .split("\n") - .filter(Boolean) - .map((line) => JSON.parse(line)); -} +const loggedRequests = createJsonlRequestTailer(appServerLog, { + maxReadBytes: + Number.isSafeInteger(logTailMaxBytes) && logTailMaxBytes > 0 + ? logTailMaxBytes + : 2 * 1024 * 1024, +}); async function waitFor(label, predicate, timeoutMs) { const started = Date.now(); @@ -67,18 +68,7 @@ function wsDataToString(data) { async function connectGateway() { const ws = new WebSocket(`ws://127.0.0.1:${port}`); - await new Promise((resolve, reject) => { - const timer = setTimeout(() => reject(new Error("gateway ws open timeout")), 45_000); - timer.unref?.(); - ws.once("open", () => { - clearTimeout(timer); - resolve(); - }); - ws.once("error", (error) => { - clearTimeout(timer); - reject(error); - }); - }); + await waitForWebSocketOpen(ws, 45_000, "gateway ws open timeout"); const events = []; const pending = new Map(); @@ -220,7 +210,7 @@ try { const turnRequest = await waitFor( "Codex turn/start image input", () => - readLoggedRequests().find((request) => { + loggedRequests.read().find((request) => { if (request.method !== "turn/start") { return undefined; } diff --git a/scripts/e2e/lib/codex-media-path/jsonl-request-tail.mjs b/scripts/e2e/lib/codex-media-path/jsonl-request-tail.mjs new file mode 100644 index 00000000000..a4ff17199b5 --- /dev/null +++ b/scripts/e2e/lib/codex-media-path/jsonl-request-tail.mjs @@ -0,0 +1,105 @@ +import fs from "node:fs"; + +const DEFAULT_MAX_READ_BYTES = 2 * 1024 * 1024; +const DEFAULT_HISTORY_LIMIT = 1024; + +function positiveInteger(value, fallback) { + return Number.isSafeInteger(value) && value > 0 ? value : fallback; +} + +function readSlice(filePath, start, length) { + if (length <= 0) { + return ""; + } + const fd = fs.openSync(filePath, "r"); + try { + const buffer = Buffer.allocUnsafe(length); + const bytesRead = fs.readSync(fd, buffer, 0, length, start); + return buffer.subarray(0, bytesRead).toString("utf8"); + } finally { + fs.closeSync(fd); + } +} + +export function createJsonlRequestTailer(filePath, options = {}) { + const maxReadBytes = positiveInteger(options.maxReadBytes, DEFAULT_MAX_READ_BYTES); + const historyLimit = positiveInteger(options.historyLimit, DEFAULT_HISTORY_LIMIT); + let offset = 0; + let pending = ""; + let requests = []; + + function parseLine(line) { + try { + return JSON.parse(line); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + throw new Error(`invalid app-server JSONL at ${filePath}: ${message}`); + } + } + + return { + read() { + if (!fs.existsSync(filePath)) { + return requests; + } + + const stats = fs.statSync(filePath); + if (!stats.isFile()) { + return requests; + } + if (stats.size < offset) { + offset = 0; + pending = ""; + requests = []; + } + if (stats.size === offset) { + return requests; + } + + let start = offset; + let discardFirstLine = false; + let clamped = false; + if (start === 0 && stats.size > maxReadBytes) { + start = stats.size - maxReadBytes; + pending = ""; + clamped = true; + } else if (stats.size - start > maxReadBytes) { + start = stats.size - maxReadBytes; + pending = ""; + clamped = true; + } + if (clamped && start > 0) { + discardFirstLine = readSlice(filePath, start - 1, 1) !== "\n"; + } + + const text = readSlice(filePath, start, stats.size - start); + offset = stats.size; + if (!text) { + return requests; + } + + let chunk = pending + text; + if (discardFirstLine) { + const newlineIndex = chunk.indexOf("\n"); + if (newlineIndex === -1) { + pending = ""; + return requests; + } + chunk = chunk.slice(newlineIndex + 1); + } + + const lines = chunk.split("\n"); + pending = lines.pop() ?? ""; + for (const line of lines) { + if (!line.trim()) { + continue; + } + requests.push(parseLine(line)); + } + if (requests.length > historyLimit) { + requests = requests.slice(-historyLimit); + } + return requests; + }, + }; +} diff --git a/scripts/e2e/lib/codex-media-path/open-websocket.mjs b/scripts/e2e/lib/codex-media-path/open-websocket.mjs new file mode 100644 index 00000000000..956ea3d6cbb --- /dev/null +++ b/scripts/e2e/lib/codex-media-path/open-websocket.mjs @@ -0,0 +1,40 @@ +export function waitForWebSocketOpen(ws, timeoutMs, message = "gateway ws open timeout") { + return new Promise((resolve, reject) => { + let settled = false; + + const settle = (fn, value) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + ws.off?.("open", onOpen); + ws.off?.("error", onError); + fn(value); + }; + const onOpen = () => settle(resolve); + const onError = (error) => settle(reject, error); + const timer = setTimeout(() => { + const consumeAbortError = () => {}; + const removeAbortErrorConsumer = () => { + ws.off?.("error", consumeAbortError); + ws.off?.("close", removeAbortErrorConsumer); + }; + try { + ws.off?.("error", onError); + ws.on?.("error", consumeAbortError); + ws.once?.("close", removeAbortErrorConsumer); + ws.terminate?.(); + if (typeof ws.terminate !== "function") { + ws.close?.(); + } + } finally { + settle(reject, new Error(message)); + } + }, timeoutMs); + + timer.unref?.(); + ws.once("open", onOpen); + ws.once("error", onError); + }); +} diff --git a/test/scripts/codex-media-path-client.test.ts b/test/scripts/codex-media-path-client.test.ts new file mode 100644 index 00000000000..3430dde62db --- /dev/null +++ b/test/scripts/codex-media-path-client.test.ts @@ -0,0 +1,127 @@ +import { EventEmitter } from "node:events"; +import { appendFileSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { createJsonlRequestTailer } from "../../scripts/e2e/lib/codex-media-path/jsonl-request-tail.mjs"; +import { waitForWebSocketOpen } from "../../scripts/e2e/lib/codex-media-path/open-websocket.mjs"; + +const tempRoots: string[] = []; + +function makeTempRoot(): string { + const root = mkdtempSync(path.join(tmpdir(), "openclaw-codex-media-path-")); + tempRoots.push(root); + return root; +} + +function jsonl(value: unknown): string { + return `${JSON.stringify(value)}\n`; +} + +class FakeWebSocket extends EventEmitter { + terminated = false; + closed = false; + + terminate(): void { + this.terminated = true; + queueMicrotask(() => { + this.emit("error", new Error("socket abort after terminate")); + this.emit("close"); + }); + } + + close(): void { + this.closed = true; + } +} + +afterEach(() => { + for (const root of tempRoots.splice(0)) { + rmSync(root, { recursive: true, force: true }); + } +}); + +describe("codex media path JSONL tailer", () => { + it("keeps parsed app-server requests and reads only appended lines", () => { + const logPath = path.join(makeTempRoot(), "app-server.jsonl"); + const tailer = createJsonlRequestTailer(logPath, { maxReadBytes: 1024, historyLimit: 10 }); + + expect(tailer.read()).toEqual([]); + + writeFileSync(logPath, jsonl({ method: "initialize" })); + expect(tailer.read()).toEqual([{ method: "initialize" }]); + + appendFileSync(logPath, JSON.stringify({ method: "turn/start" })); + expect(tailer.read()).toEqual([{ method: "initialize" }]); + + appendFileSync(logPath, "\n"); + expect(tailer.read()).toEqual([{ method: "initialize" }, { method: "turn/start" }]); + }); + + it("starts from a bounded tail of oversized logs", () => { + const logPath = path.join(makeTempRoot(), "app-server.jsonl"); + const lastLine = jsonl({ method: "turn/start" }); + writeFileSync(logPath, `${"x".repeat(256)}\n${jsonl({ method: "old" })}${lastLine}`); + + const tailer = createJsonlRequestTailer(logPath, { + maxReadBytes: lastLine.length + 2, + historyLimit: 10, + }); + + expect(tailer.read()).toEqual([{ method: "turn/start" }]); + }); + + it("keeps a complete line when the bounded tail starts on its boundary", () => { + const logPath = path.join(makeTempRoot(), "app-server.jsonl"); + const lastLine = jsonl({ method: "turn/start" }); + writeFileSync(logPath, `${"x".repeat(256)}\n${lastLine}`); + + const tailer = createJsonlRequestTailer(logPath, { + maxReadBytes: lastLine.length, + historyLimit: 10, + }); + + expect(tailer.read()).toEqual([{ method: "turn/start" }]); + }); + + it("resets request history when the app-server log is truncated", () => { + const logPath = path.join(makeTempRoot(), "app-server.jsonl"); + const tailer = createJsonlRequestTailer(logPath, { maxReadBytes: 1024, historyLimit: 10 }); + + writeFileSync(logPath, jsonl({ method: "initialize", payload: "long enough to rotate" })); + expect(tailer.read()).toEqual([{ method: "initialize", payload: "long enough to rotate" }]); + + writeFileSync(logPath, jsonl({ method: "turn/start" })); + expect(tailer.read()).toEqual([{ method: "turn/start" }]); + }); +}); + +describe("codex media path WebSocket open guard", () => { + it("terminates sockets that never open", async () => { + const ws = new FakeWebSocket(); + const keepAlive = setTimeout(() => {}, 100); + + try { + await expect(waitForWebSocketOpen(ws, 1)).rejects.toThrow("gateway ws open timeout"); + } finally { + clearTimeout(keepAlive); + } + + expect(ws.terminated).toBe(true); + await new Promise((resolve) => setImmediate(resolve)); + expect(ws.listenerCount("open")).toBe(0); + expect(ws.listenerCount("error")).toBe(0); + }); + + it("cleans listeners after successful opens", async () => { + const ws = new FakeWebSocket(); + const opened = waitForWebSocketOpen(ws, 100); + + ws.emit("open"); + + await expect(opened).resolves.toBeUndefined(); + expect(ws.terminated).toBe(false); + expect(ws.listenerCount("open")).toBe(0); + expect(ws.listenerCount("error")).toBe(0); + }); +});