From e8e4b93a94d5e66bfeb3bf2db98a14e1d470c8ba Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 17 May 2026 10:33:10 +0100 Subject: [PATCH] fix: harden Codex rollout budget scanning --- .../codex/src/app-server/run-attempt.test.ts | 199 ++++++++++++++++++ .../codex/src/app-server/run-attempt.ts | 93 +++++--- 2 files changed, 258 insertions(+), 34 deletions(-) diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index dc1df1291fb..8c5d85062b6 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -6170,6 +6170,205 @@ describe("runCodexAppServerAttempt", () => { expect(savedBinding?.threadId).toBe("thread-existing"); }); + it("honors shorthand byte units for native rollout limits", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const agentDir = path.join(tempDir, "agent"); + await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" }); + await fs.writeFile( + path.join(path.dirname(sessionFile), "sessions.json"), + JSON.stringify({ + "agent:main:session-1": { + sessionFile, + totalTokens: 12_000, + }, + }), + ); + const rolloutDir = path.join(agentDir, "codex-home", "sessions"); + await fs.mkdir(rolloutDir, { recursive: true }); + await fs.writeFile(path.join(rolloutDir, "rollout-thread-existing.jsonl"), "x".repeat(2_000)); + + const binding = await __testing.rotateOversizedCodexAppServerStartupBinding({ + binding: { threadId: "thread-existing", workspaceDir }, + sessionFile, + agentDir, + config: { + agents: { + defaults: { + compaction: { + truncateAfterCompaction: true, + maxActiveTranscriptBytes: "1k", + }, + }, + }, + } as never, + }); + + expect(binding).toBeUndefined(); + const savedBinding = await readCodexAppServerBinding(sessionFile); + expect(savedBinding).toBeUndefined(); + }); + + it("uses current rollout token usage before cumulative usage", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const agentDir = path.join(tempDir, "agent"); + await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" }); + await fs.writeFile( + path.join(path.dirname(sessionFile), "sessions.json"), + JSON.stringify({ + "agent:main:session-1": { + sessionFile, + totalTokens: 12_000, + }, + }), + ); + const rolloutDir = path.join(agentDir, "codex-home", "sessions"); + await fs.mkdir(rolloutDir, { recursive: true }); + await fs.writeFile( + path.join(rolloutDir, "rollout-thread-existing.jsonl"), + `${JSON.stringify({ + payload: { + type: "token_count", + info: { + total_token_usage: { + total_tokens: 70_000, + }, + last_token_usage: { + total_tokens: 12_000, + }, + }, + }, + })}\n`, + ); + + const binding = await __testing.rotateOversizedCodexAppServerStartupBinding({ + binding: { threadId: "thread-existing", workspaceDir }, + sessionFile, + agentDir, + config: { + agents: { + defaults: { + compaction: { + truncateAfterCompaction: true, + maxActiveTranscriptBytes: "1mb", + }, + }, + }, + } as never, + }); + + expect(binding?.threadId).toBe("thread-existing"); + const savedBinding = await readCodexAppServerBinding(sessionFile); + expect(savedBinding?.threadId).toBe("thread-existing"); + }); + + it("ignores stale session token totals for native rollout rotation", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const agentDir = path.join(tempDir, "agent"); + await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" }); + await fs.writeFile( + path.join(path.dirname(sessionFile), "sessions.json"), + JSON.stringify({ + "agent:main:session-1": { + sessionFile, + totalTokens: 70_000, + totalTokensFresh: false, + }, + }), + ); + const rolloutDir = path.join(agentDir, "codex-home", "sessions"); + await fs.mkdir(rolloutDir, { recursive: true }); + await fs.writeFile( + path.join(rolloutDir, "rollout-thread-existing.jsonl"), + `${JSON.stringify({ + payload: { + type: "token_count", + info: { + last_token_usage: { + total_tokens: 12_000, + }, + }, + }, + })}\n`, + ); + + const binding = await __testing.rotateOversizedCodexAppServerStartupBinding({ + binding: { threadId: "thread-existing", workspaceDir }, + sessionFile, + agentDir, + config: { + agents: { + defaults: { + compaction: { + truncateAfterCompaction: true, + maxActiveTranscriptBytes: "1mb", + }, + }, + }, + } as never, + }); + + expect(binding?.threadId).toBe("thread-existing"); + const savedBinding = await readCodexAppServerBinding(sessionFile); + expect(savedBinding?.threadId).toBe("thread-existing"); + }); + + it("streams rollout token scans without reading the whole file", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const agentDir = path.join(tempDir, "agent"); + await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" }); + await fs.writeFile( + path.join(path.dirname(sessionFile), "sessions.json"), + JSON.stringify({ + "agent:main:session-1": { + sessionFile, + totalTokens: 12_000, + }, + }), + ); + const rolloutDir = path.join(agentDir, "codex-home", "sessions"); + await fs.mkdir(rolloutDir, { recursive: true }); + const rolloutFile = path.join(rolloutDir, "rollout-thread-existing.jsonl"); + await fs.writeFile( + rolloutFile, + `${JSON.stringify({ + payload: { + type: "token_count", + info: { + last_token_usage: { + total_tokens: 70_000, + }, + }, + }, + })}\n`, + ); + const readFileSpy = vi.spyOn(fs, "readFile"); + + const binding = await __testing.rotateOversizedCodexAppServerStartupBinding({ + binding: { threadId: "thread-existing", workspaceDir }, + sessionFile, + agentDir, + config: { + agents: { + defaults: { + compaction: { + truncateAfterCompaction: true, + maxActiveTranscriptBytes: "1mb", + }, + }, + }, + } as never, + }); + + expect(binding).toBeUndefined(); + expect(readFileSpy.mock.calls.some(([file]) => file === rolloutFile)).toBe(false); + const savedBinding = await readCodexAppServerBinding(sessionFile); + expect(savedBinding).toBeUndefined(); + }); + it("clears byte-oversized rollouts before reading their contents", async () => { const sessionFile = path.join(tempDir, "session.jsonl"); const workspaceDir = path.join(tempDir, "workspace"); diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 3ecc90d9ae7..1331ecee207 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -478,6 +478,21 @@ function isCodexAppServerApprovalPolicy(value: unknown): boolean { } const CODEX_APP_SERVER_NATIVE_THREAD_MAX_TOKENS = 70_000; +const CODEX_APP_SERVER_BYTE_UNITS: Record = { + b: 1, + k: 1024, + kb: 1024, + kib: 1024, + m: 1024 * 1024, + mb: 1024 * 1024, + mib: 1024 * 1024, + g: 1024 * 1024 * 1024, + gb: 1024 * 1024 * 1024, + gib: 1024 * 1024 * 1024, + t: 1024 * 1024 * 1024 * 1024, + tb: 1024 * 1024 * 1024 * 1024, + tib: 1024 * 1024 * 1024 * 1024, +}; function parseCodexAppServerByteLimit(value: unknown): number | undefined { if (typeof value === "number" && Number.isFinite(value) && value > 0) { @@ -486,7 +501,7 @@ function parseCodexAppServerByteLimit(value: unknown): number | undefined { if (typeof value !== "string") { return undefined; } - const match = value.trim().match(/^(\d+(?:\.\d+)?)\s*(b|kb|kib|mb|mib|gb|gib)?$/i); + const match = value.trim().match(/^(\d+(?:\.\d+)?)\s*([a-z]+)?$/i); if (!match) { return undefined; } @@ -495,15 +510,11 @@ function parseCodexAppServerByteLimit(value: unknown): number | undefined { return undefined; } const unit = (match[2] ?? "b").toLowerCase(); - const multiplier = - unit === "gb" || unit === "gib" - ? 1024 * 1024 * 1024 - : unit === "mb" || unit === "mib" - ? 1024 * 1024 - : unit === "kb" || unit === "kib" - ? 1024 - : 1; - return Math.max(1, Math.floor(amount * multiplier)); + const multiplier = CODEX_APP_SERVER_BYTE_UNITS[unit]; + if (multiplier === undefined) { + return undefined; + } + return Math.max(1, Math.round(amount * multiplier)); } async function listCodexAppServerRolloutFilesForThread( @@ -582,40 +593,52 @@ async function readCodexSessionRecordForSessionFile( } async function readCodexAppServerRolloutTokenUsage(file: string): Promise { - let raw: string; + let handle: Awaited>; try { - raw = await fs.readFile(file, "utf8"); + handle = await fs.open(file, "r"); } catch { return undefined; } let totalTokens: number | undefined; - for (const line of raw.split(/\r?\n/)) { - if (!line.trim()) { - continue; - } - try { - const parsed = JSON.parse(line) as JsonValue; - const payload = isJsonObject(parsed) ? parsed.payload : undefined; - const info = - isJsonObject(payload) && payload.type === "token_count" && isJsonObject(payload.info) - ? payload.info - : undefined; - const usage = isJsonObject(info?.total_token_usage) - ? info.total_token_usage - : isJsonObject(info?.last_token_usage) - ? info.last_token_usage - : undefined; - const value = usage?.total_tokens ?? usage?.totalTokens; - if (typeof value === "number" && Number.isFinite(value)) { - totalTokens = value; + try { + for await (const line of handle.readLines()) { + const lineTokens = readCodexAppServerRolloutTokenUsageLine(line); + if (lineTokens !== undefined) { + totalTokens = lineTokens; } - } catch { - continue; } + } finally { + await handle.close(); } return totalTokens; } +function readCodexAppServerRolloutTokenUsageLine(line: string): number | undefined { + if (!line.trim()) { + return undefined; + } + try { + const parsed = JSON.parse(line) as JsonValue; + const payload = isJsonObject(parsed) ? parsed.payload : undefined; + const info = + isJsonObject(payload) && payload.type === "token_count" && isJsonObject(payload.info) + ? payload.info + : undefined; + if (!info) { + return undefined; + } + const usage = isJsonObject(info.last_token_usage) + ? info.last_token_usage + : isJsonObject(info.total_token_usage) + ? info.total_token_usage + : undefined; + const value = usage?.total_tokens ?? usage?.totalTokens; + return typeof value === "number" && Number.isFinite(value) ? value : undefined; + } catch { + return undefined; + } +} + function maxFiniteNumber(values: Array): number | undefined { const nums = values.filter( (value): value is number => typeof value === "number" && Number.isFinite(value), @@ -668,7 +691,9 @@ async function rotateOversizedCodexAppServerStartupBinding(params: { ), ); const sessionTokens = - typeof sessionRecord?.totalTokens === "number" && Number.isFinite(sessionRecord.totalTokens) + sessionRecord?.totalTokensFresh !== false && + typeof sessionRecord?.totalTokens === "number" && + Number.isFinite(sessionRecord.totalTokens) ? sessionRecord.totalTokens : undefined; const tokenCount = maxFiniteNumber([sessionTokens, nativeTokens]);