mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 18:54:48 +00:00
fix: harden Codex rollout budget scanning
This commit is contained in:
@@ -6170,6 +6170,205 @@ describe("runCodexAppServerAttempt", () => {
|
||||
expect(savedBinding?.threadId).toBe("thread-existing");
|
||||
});
|
||||
|
||||
it("honors shorthand byte units for native rollout limits", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
await fs.writeFile(
|
||||
path.join(path.dirname(sessionFile), "sessions.json"),
|
||||
JSON.stringify({
|
||||
"agent:main:session-1": {
|
||||
sessionFile,
|
||||
totalTokens: 12_000,
|
||||
},
|
||||
}),
|
||||
);
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(path.join(rolloutDir, "rollout-thread-existing.jsonl"), "x".repeat(2_000));
|
||||
|
||||
const binding = await __testing.rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: { threadId: "thread-existing", workspaceDir },
|
||||
sessionFile,
|
||||
agentDir,
|
||||
config: {
|
||||
agents: {
|
||||
defaults: {
|
||||
compaction: {
|
||||
truncateAfterCompaction: true,
|
||||
maxActiveTranscriptBytes: "1k",
|
||||
},
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
});
|
||||
|
||||
expect(binding).toBeUndefined();
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding).toBeUndefined();
|
||||
});
|
||||
|
||||
it("uses current rollout token usage before cumulative usage", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
await fs.writeFile(
|
||||
path.join(path.dirname(sessionFile), "sessions.json"),
|
||||
JSON.stringify({
|
||||
"agent:main:session-1": {
|
||||
sessionFile,
|
||||
totalTokens: 12_000,
|
||||
},
|
||||
}),
|
||||
);
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(rolloutDir, "rollout-thread-existing.jsonl"),
|
||||
`${JSON.stringify({
|
||||
payload: {
|
||||
type: "token_count",
|
||||
info: {
|
||||
total_token_usage: {
|
||||
total_tokens: 70_000,
|
||||
},
|
||||
last_token_usage: {
|
||||
total_tokens: 12_000,
|
||||
},
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
const binding = await __testing.rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: { threadId: "thread-existing", workspaceDir },
|
||||
sessionFile,
|
||||
agentDir,
|
||||
config: {
|
||||
agents: {
|
||||
defaults: {
|
||||
compaction: {
|
||||
truncateAfterCompaction: true,
|
||||
maxActiveTranscriptBytes: "1mb",
|
||||
},
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
});
|
||||
|
||||
expect(binding?.threadId).toBe("thread-existing");
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.threadId).toBe("thread-existing");
|
||||
});
|
||||
|
||||
it("ignores stale session token totals for native rollout rotation", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
await fs.writeFile(
|
||||
path.join(path.dirname(sessionFile), "sessions.json"),
|
||||
JSON.stringify({
|
||||
"agent:main:session-1": {
|
||||
sessionFile,
|
||||
totalTokens: 70_000,
|
||||
totalTokensFresh: false,
|
||||
},
|
||||
}),
|
||||
);
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(rolloutDir, "rollout-thread-existing.jsonl"),
|
||||
`${JSON.stringify({
|
||||
payload: {
|
||||
type: "token_count",
|
||||
info: {
|
||||
last_token_usage: {
|
||||
total_tokens: 12_000,
|
||||
},
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
const binding = await __testing.rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: { threadId: "thread-existing", workspaceDir },
|
||||
sessionFile,
|
||||
agentDir,
|
||||
config: {
|
||||
agents: {
|
||||
defaults: {
|
||||
compaction: {
|
||||
truncateAfterCompaction: true,
|
||||
maxActiveTranscriptBytes: "1mb",
|
||||
},
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
});
|
||||
|
||||
expect(binding?.threadId).toBe("thread-existing");
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.threadId).toBe("thread-existing");
|
||||
});
|
||||
|
||||
it("streams rollout token scans without reading the whole file", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
await fs.writeFile(
|
||||
path.join(path.dirname(sessionFile), "sessions.json"),
|
||||
JSON.stringify({
|
||||
"agent:main:session-1": {
|
||||
sessionFile,
|
||||
totalTokens: 12_000,
|
||||
},
|
||||
}),
|
||||
);
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
const rolloutFile = path.join(rolloutDir, "rollout-thread-existing.jsonl");
|
||||
await fs.writeFile(
|
||||
rolloutFile,
|
||||
`${JSON.stringify({
|
||||
payload: {
|
||||
type: "token_count",
|
||||
info: {
|
||||
last_token_usage: {
|
||||
total_tokens: 70_000,
|
||||
},
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
const readFileSpy = vi.spyOn(fs, "readFile");
|
||||
|
||||
const binding = await __testing.rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: { threadId: "thread-existing", workspaceDir },
|
||||
sessionFile,
|
||||
agentDir,
|
||||
config: {
|
||||
agents: {
|
||||
defaults: {
|
||||
compaction: {
|
||||
truncateAfterCompaction: true,
|
||||
maxActiveTranscriptBytes: "1mb",
|
||||
},
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
});
|
||||
|
||||
expect(binding).toBeUndefined();
|
||||
expect(readFileSpy.mock.calls.some(([file]) => file === rolloutFile)).toBe(false);
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding).toBeUndefined();
|
||||
});
|
||||
|
||||
it("clears byte-oversized rollouts before reading their contents", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
|
||||
@@ -478,6 +478,21 @@ function isCodexAppServerApprovalPolicy(value: unknown): boolean {
|
||||
}
|
||||
|
||||
const CODEX_APP_SERVER_NATIVE_THREAD_MAX_TOKENS = 70_000;
|
||||
const CODEX_APP_SERVER_BYTE_UNITS: Record<string, number> = {
|
||||
b: 1,
|
||||
k: 1024,
|
||||
kb: 1024,
|
||||
kib: 1024,
|
||||
m: 1024 * 1024,
|
||||
mb: 1024 * 1024,
|
||||
mib: 1024 * 1024,
|
||||
g: 1024 * 1024 * 1024,
|
||||
gb: 1024 * 1024 * 1024,
|
||||
gib: 1024 * 1024 * 1024,
|
||||
t: 1024 * 1024 * 1024 * 1024,
|
||||
tb: 1024 * 1024 * 1024 * 1024,
|
||||
tib: 1024 * 1024 * 1024 * 1024,
|
||||
};
|
||||
|
||||
function parseCodexAppServerByteLimit(value: unknown): number | undefined {
|
||||
if (typeof value === "number" && Number.isFinite(value) && value > 0) {
|
||||
@@ -486,7 +501,7 @@ function parseCodexAppServerByteLimit(value: unknown): number | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const match = value.trim().match(/^(\d+(?:\.\d+)?)\s*(b|kb|kib|mb|mib|gb|gib)?$/i);
|
||||
const match = value.trim().match(/^(\d+(?:\.\d+)?)\s*([a-z]+)?$/i);
|
||||
if (!match) {
|
||||
return undefined;
|
||||
}
|
||||
@@ -495,15 +510,11 @@ function parseCodexAppServerByteLimit(value: unknown): number | undefined {
|
||||
return undefined;
|
||||
}
|
||||
const unit = (match[2] ?? "b").toLowerCase();
|
||||
const multiplier =
|
||||
unit === "gb" || unit === "gib"
|
||||
? 1024 * 1024 * 1024
|
||||
: unit === "mb" || unit === "mib"
|
||||
? 1024 * 1024
|
||||
: unit === "kb" || unit === "kib"
|
||||
? 1024
|
||||
: 1;
|
||||
return Math.max(1, Math.floor(amount * multiplier));
|
||||
const multiplier = CODEX_APP_SERVER_BYTE_UNITS[unit];
|
||||
if (multiplier === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.max(1, Math.round(amount * multiplier));
|
||||
}
|
||||
|
||||
async function listCodexAppServerRolloutFilesForThread(
|
||||
@@ -582,40 +593,52 @@ async function readCodexSessionRecordForSessionFile(
|
||||
}
|
||||
|
||||
async function readCodexAppServerRolloutTokenUsage(file: string): Promise<number | undefined> {
|
||||
let raw: string;
|
||||
let handle: Awaited<ReturnType<typeof fs.open>>;
|
||||
try {
|
||||
raw = await fs.readFile(file, "utf8");
|
||||
handle = await fs.open(file, "r");
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
let totalTokens: number | undefined;
|
||||
for (const line of raw.split(/\r?\n/)) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(line) as JsonValue;
|
||||
const payload = isJsonObject(parsed) ? parsed.payload : undefined;
|
||||
const info =
|
||||
isJsonObject(payload) && payload.type === "token_count" && isJsonObject(payload.info)
|
||||
? payload.info
|
||||
: undefined;
|
||||
const usage = isJsonObject(info?.total_token_usage)
|
||||
? info.total_token_usage
|
||||
: isJsonObject(info?.last_token_usage)
|
||||
? info.last_token_usage
|
||||
: undefined;
|
||||
const value = usage?.total_tokens ?? usage?.totalTokens;
|
||||
if (typeof value === "number" && Number.isFinite(value)) {
|
||||
totalTokens = value;
|
||||
try {
|
||||
for await (const line of handle.readLines()) {
|
||||
const lineTokens = readCodexAppServerRolloutTokenUsageLine(line);
|
||||
if (lineTokens !== undefined) {
|
||||
totalTokens = lineTokens;
|
||||
}
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
} finally {
|
||||
await handle.close();
|
||||
}
|
||||
return totalTokens;
|
||||
}
|
||||
|
||||
function readCodexAppServerRolloutTokenUsageLine(line: string): number | undefined {
|
||||
if (!line.trim()) {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(line) as JsonValue;
|
||||
const payload = isJsonObject(parsed) ? parsed.payload : undefined;
|
||||
const info =
|
||||
isJsonObject(payload) && payload.type === "token_count" && isJsonObject(payload.info)
|
||||
? payload.info
|
||||
: undefined;
|
||||
if (!info) {
|
||||
return undefined;
|
||||
}
|
||||
const usage = isJsonObject(info.last_token_usage)
|
||||
? info.last_token_usage
|
||||
: isJsonObject(info.total_token_usage)
|
||||
? info.total_token_usage
|
||||
: undefined;
|
||||
const value = usage?.total_tokens ?? usage?.totalTokens;
|
||||
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function maxFiniteNumber(values: Array<number | undefined>): number | undefined {
|
||||
const nums = values.filter(
|
||||
(value): value is number => typeof value === "number" && Number.isFinite(value),
|
||||
@@ -668,7 +691,9 @@ async function rotateOversizedCodexAppServerStartupBinding(params: {
|
||||
),
|
||||
);
|
||||
const sessionTokens =
|
||||
typeof sessionRecord?.totalTokens === "number" && Number.isFinite(sessionRecord.totalTokens)
|
||||
sessionRecord?.totalTokensFresh !== false &&
|
||||
typeof sessionRecord?.totalTokens === "number" &&
|
||||
Number.isFinite(sessionRecord.totalTokens)
|
||||
? sessionRecord.totalTokens
|
||||
: undefined;
|
||||
const tokenCount = maxFiniteNumber([sessionTokens, nativeTokens]);
|
||||
|
||||
Reference in New Issue
Block a user