fix(test): bound codex media path log polling

This commit is contained in:
Vincent Koc
2026-05-27 10:50:44 +02:00
parent 98c0ad8b42
commit 40a2600544
4 changed files with 286 additions and 24 deletions

View File

@@ -1,9 +1,10 @@
import { createHash, randomBytes, randomUUID } from "node:crypto";
import fs from "node:fs";
import { setTimeout as delay } from "node:timers/promises";
import { WebSocket } from "ws";
import { PROTOCOL_VERSION } from "../../../../dist/gateway/protocol/index.js";
import { renderBitmapTextPngBase64 } from "../../../../test/helpers/live-image-probe.ts";
import { createJsonlRequestTailer } from "./jsonl-request-tail.mjs";
import { waitForWebSocketOpen } from "./open-websocket.mjs";
const port = process.env.PORT;
const token = process.env.OPENCLAW_GATEWAY_TOKEN;
@@ -14,6 +15,10 @@ const timeoutSeconds = Number.parseInt(
process.env.OPENCLAW_CODEX_MEDIA_PATH_TIMEOUT_SECONDS ?? "180",
10,
);
const logTailMaxBytes = Number.parseInt(
process.env.OPENCLAW_CODEX_MEDIA_PATH_LOG_TAIL_MAX_BYTES ?? `${2 * 1024 * 1024}`,
10,
);
if (!port || !token) {
throw new Error("missing PORT/OPENCLAW_GATEWAY_TOKEN");
@@ -29,16 +34,12 @@ function sha256Base64(data) {
return createHash("sha256").update(Buffer.from(data, "base64")).digest("hex");
}
function readLoggedRequests() {
if (!fs.existsSync(appServerLog)) {
return [];
}
return fs
.readFileSync(appServerLog, "utf8")
.split("\n")
.filter(Boolean)
.map((line) => JSON.parse(line));
}
const loggedRequests = createJsonlRequestTailer(appServerLog, {
maxReadBytes:
Number.isSafeInteger(logTailMaxBytes) && logTailMaxBytes > 0
? logTailMaxBytes
: 2 * 1024 * 1024,
});
async function waitFor(label, predicate, timeoutMs) {
const started = Date.now();
@@ -67,18 +68,7 @@ function wsDataToString(data) {
async function connectGateway() {
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise((resolve, reject) => {
const timer = setTimeout(() => reject(new Error("gateway ws open timeout")), 45_000);
timer.unref?.();
ws.once("open", () => {
clearTimeout(timer);
resolve();
});
ws.once("error", (error) => {
clearTimeout(timer);
reject(error);
});
});
await waitForWebSocketOpen(ws, 45_000, "gateway ws open timeout");
const events = [];
const pending = new Map();
@@ -220,7 +210,7 @@ try {
const turnRequest = await waitFor(
"Codex turn/start image input",
() =>
readLoggedRequests().find((request) => {
loggedRequests.read().find((request) => {
if (request.method !== "turn/start") {
return undefined;
}

View File

@@ -0,0 +1,105 @@
import fs from "node:fs";
const DEFAULT_MAX_READ_BYTES = 2 * 1024 * 1024;
const DEFAULT_HISTORY_LIMIT = 1024;
function positiveInteger(value, fallback) {
return Number.isSafeInteger(value) && value > 0 ? value : fallback;
}
function readSlice(filePath, start, length) {
if (length <= 0) {
return "";
}
const fd = fs.openSync(filePath, "r");
try {
const buffer = Buffer.allocUnsafe(length);
const bytesRead = fs.readSync(fd, buffer, 0, length, start);
return buffer.subarray(0, bytesRead).toString("utf8");
} finally {
fs.closeSync(fd);
}
}
export function createJsonlRequestTailer(filePath, options = {}) {
const maxReadBytes = positiveInteger(options.maxReadBytes, DEFAULT_MAX_READ_BYTES);
const historyLimit = positiveInteger(options.historyLimit, DEFAULT_HISTORY_LIMIT);
let offset = 0;
let pending = "";
let requests = [];
function parseLine(line) {
try {
return JSON.parse(line);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
throw new Error(`invalid app-server JSONL at ${filePath}: ${message}`);
}
}
return {
read() {
if (!fs.existsSync(filePath)) {
return requests;
}
const stats = fs.statSync(filePath);
if (!stats.isFile()) {
return requests;
}
if (stats.size < offset) {
offset = 0;
pending = "";
requests = [];
}
if (stats.size === offset) {
return requests;
}
let start = offset;
let discardFirstLine = false;
let clamped = false;
if (start === 0 && stats.size > maxReadBytes) {
start = stats.size - maxReadBytes;
pending = "";
clamped = true;
} else if (stats.size - start > maxReadBytes) {
start = stats.size - maxReadBytes;
pending = "";
clamped = true;
}
if (clamped && start > 0) {
discardFirstLine = readSlice(filePath, start - 1, 1) !== "\n";
}
const text = readSlice(filePath, start, stats.size - start);
offset = stats.size;
if (!text) {
return requests;
}
let chunk = pending + text;
if (discardFirstLine) {
const newlineIndex = chunk.indexOf("\n");
if (newlineIndex === -1) {
pending = "";
return requests;
}
chunk = chunk.slice(newlineIndex + 1);
}
const lines = chunk.split("\n");
pending = lines.pop() ?? "";
for (const line of lines) {
if (!line.trim()) {
continue;
}
requests.push(parseLine(line));
}
if (requests.length > historyLimit) {
requests = requests.slice(-historyLimit);
}
return requests;
},
};
}

View File

@@ -0,0 +1,40 @@
export function waitForWebSocketOpen(ws, timeoutMs, message = "gateway ws open timeout") {
return new Promise((resolve, reject) => {
let settled = false;
const settle = (fn, value) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
ws.off?.("open", onOpen);
ws.off?.("error", onError);
fn(value);
};
const onOpen = () => settle(resolve);
const onError = (error) => settle(reject, error);
const timer = setTimeout(() => {
const consumeAbortError = () => {};
const removeAbortErrorConsumer = () => {
ws.off?.("error", consumeAbortError);
ws.off?.("close", removeAbortErrorConsumer);
};
try {
ws.off?.("error", onError);
ws.on?.("error", consumeAbortError);
ws.once?.("close", removeAbortErrorConsumer);
ws.terminate?.();
if (typeof ws.terminate !== "function") {
ws.close?.();
}
} finally {
settle(reject, new Error(message));
}
}, timeoutMs);
timer.unref?.();
ws.once("open", onOpen);
ws.once("error", onError);
});
}

View File

@@ -0,0 +1,127 @@
import { EventEmitter } from "node:events";
import { appendFileSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { createJsonlRequestTailer } from "../../scripts/e2e/lib/codex-media-path/jsonl-request-tail.mjs";
import { waitForWebSocketOpen } from "../../scripts/e2e/lib/codex-media-path/open-websocket.mjs";
const tempRoots: string[] = [];
function makeTempRoot(): string {
const root = mkdtempSync(path.join(tmpdir(), "openclaw-codex-media-path-"));
tempRoots.push(root);
return root;
}
function jsonl(value: unknown): string {
return `${JSON.stringify(value)}\n`;
}
class FakeWebSocket extends EventEmitter {
terminated = false;
closed = false;
terminate(): void {
this.terminated = true;
queueMicrotask(() => {
this.emit("error", new Error("socket abort after terminate"));
this.emit("close");
});
}
close(): void {
this.closed = true;
}
}
afterEach(() => {
for (const root of tempRoots.splice(0)) {
rmSync(root, { recursive: true, force: true });
}
});
describe("codex media path JSONL tailer", () => {
it("keeps parsed app-server requests and reads only appended lines", () => {
const logPath = path.join(makeTempRoot(), "app-server.jsonl");
const tailer = createJsonlRequestTailer(logPath, { maxReadBytes: 1024, historyLimit: 10 });
expect(tailer.read()).toEqual([]);
writeFileSync(logPath, jsonl({ method: "initialize" }));
expect(tailer.read()).toEqual([{ method: "initialize" }]);
appendFileSync(logPath, JSON.stringify({ method: "turn/start" }));
expect(tailer.read()).toEqual([{ method: "initialize" }]);
appendFileSync(logPath, "\n");
expect(tailer.read()).toEqual([{ method: "initialize" }, { method: "turn/start" }]);
});
it("starts from a bounded tail of oversized logs", () => {
const logPath = path.join(makeTempRoot(), "app-server.jsonl");
const lastLine = jsonl({ method: "turn/start" });
writeFileSync(logPath, `${"x".repeat(256)}\n${jsonl({ method: "old" })}${lastLine}`);
const tailer = createJsonlRequestTailer(logPath, {
maxReadBytes: lastLine.length + 2,
historyLimit: 10,
});
expect(tailer.read()).toEqual([{ method: "turn/start" }]);
});
it("keeps a complete line when the bounded tail starts on its boundary", () => {
const logPath = path.join(makeTempRoot(), "app-server.jsonl");
const lastLine = jsonl({ method: "turn/start" });
writeFileSync(logPath, `${"x".repeat(256)}\n${lastLine}`);
const tailer = createJsonlRequestTailer(logPath, {
maxReadBytes: lastLine.length,
historyLimit: 10,
});
expect(tailer.read()).toEqual([{ method: "turn/start" }]);
});
it("resets request history when the app-server log is truncated", () => {
const logPath = path.join(makeTempRoot(), "app-server.jsonl");
const tailer = createJsonlRequestTailer(logPath, { maxReadBytes: 1024, historyLimit: 10 });
writeFileSync(logPath, jsonl({ method: "initialize", payload: "long enough to rotate" }));
expect(tailer.read()).toEqual([{ method: "initialize", payload: "long enough to rotate" }]);
writeFileSync(logPath, jsonl({ method: "turn/start" }));
expect(tailer.read()).toEqual([{ method: "turn/start" }]);
});
});
describe("codex media path WebSocket open guard", () => {
it("terminates sockets that never open", async () => {
const ws = new FakeWebSocket();
const keepAlive = setTimeout(() => {}, 100);
try {
await expect(waitForWebSocketOpen(ws, 1)).rejects.toThrow("gateway ws open timeout");
} finally {
clearTimeout(keepAlive);
}
expect(ws.terminated).toBe(true);
await new Promise((resolve) => setImmediate(resolve));
expect(ws.listenerCount("open")).toBe(0);
expect(ws.listenerCount("error")).toBe(0);
});
it("cleans listeners after successful opens", async () => {
const ws = new FakeWebSocket();
const opened = waitForWebSocketOpen(ws, 100);
ws.emit("open");
await expect(opened).resolves.toBeUndefined();
expect(ws.terminated).toBe(false);
expect(ws.listenerCount("open")).toBe(0);
expect(ws.listenerCount("error")).toBe(0);
});
});