mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-05 11:12:54 +00:00
fix(codex): rotate native threads before overflow
This commit is contained in:
committed by
Peter Steinberger
parent
c275064878
commit
ef94eb0c31
@@ -545,109 +545,171 @@ describe("runCodexAppServerAttempt context-engine lifecycle", () => {
|
||||
await secondRun;
|
||||
});
|
||||
|
||||
it.each([
|
||||
[
|
||||
"token",
|
||||
it("resumes a matching thread-bootstrap binding even when the bootstrap turn exceeded the opt-in native byte guard", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
threadId: "thread-bootstrapped",
|
||||
cwd: workspaceDir,
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"projectionMaxChars":24000}',
|
||||
projection: {
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-1",
|
||||
},
|
||||
},
|
||||
});
|
||||
await fs.writeFile(
|
||||
path.join(path.dirname(sessionFile), "sessions.json"),
|
||||
JSON.stringify({
|
||||
"agent:main:session-1": {
|
||||
sessionFile,
|
||||
totalTokens: 12_000,
|
||||
},
|
||||
}),
|
||||
);
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(rolloutDir, "rollout-thread-bootstrapped.jsonl"),
|
||||
"x".repeat(2_000),
|
||||
);
|
||||
const contextEngine = createContextEngine({
|
||||
assemble: vi.fn(async ({ prompt }) => ({
|
||||
messages: [
|
||||
assistantMessage("already bootstrapped context", 10),
|
||||
userMessage(prompt ?? "", 11),
|
||||
],
|
||||
estimatedTokens: 42,
|
||||
systemPromptAddition: "context-engine system",
|
||||
contextProjection: { mode: "thread_bootstrap" as const, epoch: "epoch-1" },
|
||||
})),
|
||||
});
|
||||
const harness = createStartedThreadHarness(async (method) => {
|
||||
if (method === "thread/resume") {
|
||||
return threadStartResult("thread-bootstrapped");
|
||||
}
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-fresh");
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.agentDir = agentDir;
|
||||
params.contextEngine = contextEngine;
|
||||
params.config = {
|
||||
agents: {
|
||||
defaults: {
|
||||
compaction: {
|
||||
truncateAfterCompaction: true,
|
||||
maxActiveTranscriptBytes: 1_000,
|
||||
},
|
||||
},
|
||||
},
|
||||
} as EmbeddedRunAttemptParams["config"];
|
||||
|
||||
const run = runCodexAppServerAttempt(params);
|
||||
await harness.waitForMethod("turn/start");
|
||||
|
||||
expect(harness.requests.map((request) => request.method)).toEqual([
|
||||
"thread/resume",
|
||||
"turn/start",
|
||||
]);
|
||||
const inputText = getRequestInputText(harness);
|
||||
expect(inputText).not.toContain("OpenClaw assembled context for this turn:");
|
||||
expect(inputText).not.toContain("already bootstrapped context");
|
||||
expect(inputText).toBe("hello");
|
||||
|
||||
await harness.completeTurn("completed", "thread-bootstrapped");
|
||||
await run;
|
||||
});
|
||||
|
||||
it("starts a fresh thread instead of resuming a token-pressured thread-bootstrap binding", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
threadId: "thread-bootstrapped",
|
||||
cwd: workspaceDir,
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"projectionMaxChars":24000}',
|
||||
projection: {
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-1",
|
||||
},
|
||||
},
|
||||
});
|
||||
await fs.writeFile(
|
||||
path.join(path.dirname(sessionFile), "sessions.json"),
|
||||
JSON.stringify({
|
||||
"agent:main:session-1": {
|
||||
sessionFile,
|
||||
totalTokens: 12_000,
|
||||
},
|
||||
}),
|
||||
);
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(rolloutDir, "rollout-thread-bootstrapped.jsonl"),
|
||||
`${JSON.stringify({
|
||||
payload: {
|
||||
type: "token_count",
|
||||
info: {
|
||||
last_token_usage: {
|
||||
total_tokens: 300_000,
|
||||
total_tokens: 241_198,
|
||||
},
|
||||
model_context_window: 258_400,
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
"1mb",
|
||||
],
|
||||
["byte", "x".repeat(2_000), 1_000],
|
||||
] as const)(
|
||||
"resumes a matching thread-bootstrap binding even when the bootstrap turn exceeded the native %s guard",
|
||||
async (_guard, rolloutContent, maxActiveTranscriptBytes) => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
threadId: "thread-bootstrapped",
|
||||
cwd: workspaceDir,
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"projectionMaxChars":24000}',
|
||||
projection: {
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-1",
|
||||
},
|
||||
},
|
||||
});
|
||||
await fs.writeFile(
|
||||
path.join(path.dirname(sessionFile), "sessions.json"),
|
||||
JSON.stringify({
|
||||
"agent:main:session-1": {
|
||||
sessionFile,
|
||||
totalTokens: 12_000,
|
||||
},
|
||||
}),
|
||||
);
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(rolloutDir, "rollout-thread-bootstrapped.jsonl"),
|
||||
rolloutContent,
|
||||
);
|
||||
const contextEngine = createContextEngine({
|
||||
assemble: vi.fn(async ({ prompt }) => ({
|
||||
messages: [
|
||||
assistantMessage("already bootstrapped context", 10),
|
||||
userMessage(prompt ?? "", 11),
|
||||
],
|
||||
estimatedTokens: 42,
|
||||
systemPromptAddition: "context-engine system",
|
||||
contextProjection: { mode: "thread_bootstrap" as const, epoch: "epoch-1" },
|
||||
})),
|
||||
});
|
||||
const harness = createStartedThreadHarness(async (method) => {
|
||||
if (method === "thread/resume") {
|
||||
return threadStartResult("thread-bootstrapped");
|
||||
}
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-fresh");
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.agentDir = agentDir;
|
||||
params.contextEngine = contextEngine;
|
||||
params.config = {
|
||||
agents: {
|
||||
defaults: {
|
||||
compaction: {
|
||||
truncateAfterCompaction: true,
|
||||
maxActiveTranscriptBytes,
|
||||
},
|
||||
},
|
||||
},
|
||||
} as EmbeddedRunAttemptParams["config"];
|
||||
);
|
||||
const contextEngine = createContextEngine({
|
||||
assemble: vi.fn(async ({ prompt }) => ({
|
||||
messages: [assistantMessage("reprojected context", 10), userMessage(prompt ?? "", 11)],
|
||||
estimatedTokens: 42,
|
||||
systemPromptAddition: "context-engine system",
|
||||
contextProjection: { mode: "thread_bootstrap" as const, epoch: "epoch-1" },
|
||||
})),
|
||||
});
|
||||
const harness = createStartedThreadHarness(async (method) => {
|
||||
if (method === "thread/resume") {
|
||||
return threadStartResult("thread-bootstrapped");
|
||||
}
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-fresh");
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.agentDir = agentDir;
|
||||
params.contextEngine = contextEngine;
|
||||
|
||||
const run = runCodexAppServerAttempt(params);
|
||||
await harness.waitForMethod("turn/start");
|
||||
const run = runCodexAppServerAttempt(params);
|
||||
await harness.waitForMethod("turn/start");
|
||||
|
||||
expect(harness.requests.map((request) => request.method)).toEqual([
|
||||
"thread/resume",
|
||||
"turn/start",
|
||||
]);
|
||||
const inputText = getRequestInputText(harness);
|
||||
expect(inputText).not.toContain("OpenClaw assembled context for this turn:");
|
||||
expect(inputText).not.toContain("already bootstrapped context");
|
||||
expect(inputText).toBe("hello");
|
||||
expect(harness.requests.map((request) => request.method)).toEqual([
|
||||
"thread/start",
|
||||
"turn/start",
|
||||
]);
|
||||
const inputText = getRequestInputText(harness);
|
||||
expect(inputText).toContain("OpenClaw assembled context for this turn:");
|
||||
expect(inputText).toContain("reprojected context");
|
||||
|
||||
await harness.completeTurn("completed", "thread-bootstrapped");
|
||||
await run;
|
||||
},
|
||||
);
|
||||
await harness.completeTurn("completed", "thread-fresh");
|
||||
await run;
|
||||
});
|
||||
|
||||
it("projects mirrored history when an oversized thread-bootstrap binding has no active context engine", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
|
||||
@@ -3763,6 +3763,54 @@ describe("runCodexAppServerAttempt", () => {
|
||||
expect(savedBinding?.threadId).toBe("thread-1");
|
||||
});
|
||||
|
||||
it("starts a fresh Codex thread before turn/start when the next prompt would exhaust native headroom", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
await fs.writeFile(
|
||||
path.join(path.dirname(sessionFile), "sessions.json"),
|
||||
JSON.stringify({
|
||||
"agent:main:session-1": {
|
||||
sessionFile,
|
||||
totalTokens: 12_000,
|
||||
},
|
||||
}),
|
||||
);
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(rolloutDir, "rollout-thread-existing.jsonl"),
|
||||
`${JSON.stringify({
|
||||
payload: {
|
||||
type: "token_count",
|
||||
info: {
|
||||
last_token_usage: {
|
||||
total_tokens: 220_000,
|
||||
},
|
||||
model_context_window: 258_400,
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
const { requests, waitForMethod, completeTurn } = createStartedThreadHarness();
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.agentDir = agentDir;
|
||||
params.prompt = "large prompt ".repeat(12_000);
|
||||
|
||||
const run = runCodexAppServerAttempt(params, {
|
||||
pluginConfig: { appServer: { mode: "yolo" } },
|
||||
});
|
||||
await waitForMethod("turn/start");
|
||||
await completeTurn({ threadId: "thread-1", turnId: "turn-1" });
|
||||
await run;
|
||||
|
||||
expect(requests.map((entry) => entry.method)).toContain("thread/start");
|
||||
expect(requests.map((entry) => entry.method)).not.toContain("thread/resume");
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.threadId).toBe("thread-1");
|
||||
});
|
||||
|
||||
it("preserves bound auth when rotating a fallback-fuse native rollout", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
|
||||
@@ -246,8 +246,17 @@ import {
|
||||
import { createCodexUserInputBridge } from "./user-input-bridge.js";
|
||||
|
||||
const CODEX_NATIVE_HOOK_RELAY_RENEW_INTERVAL_MS = 60_000;
|
||||
const CODEX_APP_SERVER_PROJECTED_CHARS_PER_TOKEN = 4;
|
||||
const ensuredCodexWorkspaceDirs = new Set<string>();
|
||||
|
||||
function estimateCodexAppServerProjectedTurnTokens(params: {
|
||||
prompt: string;
|
||||
developerInstructions?: string;
|
||||
}): number {
|
||||
const inputChars = params.prompt.length + (params.developerInstructions?.length ?? 0);
|
||||
return Math.max(1, Math.ceil(inputChars / CODEX_APP_SERVER_PROJECTED_CHARS_PER_TOKEN));
|
||||
}
|
||||
|
||||
async function ensureCodexWorkspaceDirOnce(workspaceDir: string): Promise<void> {
|
||||
const normalized = path.resolve(workspaceDir);
|
||||
if (ensuredCodexWorkspaceDirs.has(normalized)) {
|
||||
@@ -673,6 +682,15 @@ export async function runCodexAppServerAttempt(
|
||||
let developerInstructions = baseDeveloperInstructions;
|
||||
let prePromptMessageCount = historyMessages.length;
|
||||
let contextEngineProjection: CodexContextEngineThreadBootstrapProjection | undefined;
|
||||
const applyMirroredHistoryProjectionForFreshThread = () => {
|
||||
const projection = projectContextEngineAssemblyForCodex({
|
||||
assembledMessages: historyMessages,
|
||||
originalHistoryMessages: historyMessages,
|
||||
prompt: params.prompt,
|
||||
});
|
||||
promptText = projection.promptText;
|
||||
prePromptMessageCount = projection.prePromptMessageCount;
|
||||
};
|
||||
const applyActiveContextEngineProjection = async (
|
||||
decisionStartupBinding: CodexAppServerThreadBinding | undefined,
|
||||
) => {
|
||||
@@ -764,13 +782,7 @@ export async function runCodexAppServerAttempt(
|
||||
forceProject: !nativeToolSurfaceEnabled,
|
||||
})
|
||||
) {
|
||||
const projection = projectContextEngineAssemblyForCodex({
|
||||
assembledMessages: historyMessages,
|
||||
originalHistoryMessages: historyMessages,
|
||||
prompt: params.prompt,
|
||||
});
|
||||
promptText = projection.promptText;
|
||||
prePromptMessageCount = projection.prePromptMessageCount;
|
||||
applyMirroredHistoryProjectionForFreshThread();
|
||||
}
|
||||
const buildPromptFromCurrentInputs = () =>
|
||||
resolveAgentHarnessBeforePromptBuildResult({
|
||||
@@ -794,6 +806,60 @@ export async function runCodexAppServerAttempt(
|
||||
promptBuild.developerInstructions,
|
||||
buildCodexTurnCollaborationDeveloperInstructions(),
|
||||
);
|
||||
const rebuildCodexTurnPromptFromCurrentProjection = async () => {
|
||||
promptBuild = await buildPromptFromCurrentInputs();
|
||||
codexTurnPromptText = decorateCodexTurnPromptText(promptBuild.prompt);
|
||||
};
|
||||
const rotateStartupBindingForProjectedTurn = async () => {
|
||||
if (!startupBinding?.threadId) {
|
||||
return;
|
||||
}
|
||||
const previousThreadId = startupBinding.threadId;
|
||||
const projectedTurnTokens = estimateCodexAppServerProjectedTurnTokens({
|
||||
prompt: codexTurnPromptText,
|
||||
developerInstructions: buildRenderedCodexDeveloperInstructions(),
|
||||
});
|
||||
startupBinding = await rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: startupBinding,
|
||||
sessionFile: params.sessionFile,
|
||||
agentDir,
|
||||
codexHome: appServer.start.env?.CODEX_HOME,
|
||||
config: params.config,
|
||||
contextEngineActive: Boolean(activeContextEngine),
|
||||
projectedTurnTokens,
|
||||
});
|
||||
if (startupBinding?.threadId) {
|
||||
return;
|
||||
}
|
||||
if (activeContextEngine) {
|
||||
contextEngineProjection = undefined;
|
||||
try {
|
||||
await applyActiveContextEngineProjection(undefined);
|
||||
} catch (assembleErr) {
|
||||
embeddedAgentLog.warn("context engine assemble failed; using Codex baseline prompt", {
|
||||
error: formatErrorMessage(assembleErr),
|
||||
});
|
||||
}
|
||||
} else if (
|
||||
shouldProjectMirroredHistoryForCodexStart({
|
||||
startupBinding,
|
||||
dynamicToolsFingerprint: codexDynamicToolsFingerprint(toolBridge.specs),
|
||||
historyMessages,
|
||||
forceProject: !nativeToolSurfaceEnabled,
|
||||
})
|
||||
) {
|
||||
applyMirroredHistoryProjectionForFreshThread();
|
||||
}
|
||||
await rebuildCodexTurnPromptFromCurrentProjection();
|
||||
embeddedAgentLog.info("codex app-server rebuilt turn prompt after native thread rotation", {
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: contextSessionKey,
|
||||
previousThreadId,
|
||||
promptChars: codexTurnPromptText.length,
|
||||
developerInstructionChars: buildRenderedCodexDeveloperInstructions()?.length ?? 0,
|
||||
});
|
||||
};
|
||||
await rotateStartupBindingForProjectedTurn();
|
||||
const systemPromptReport = buildCodexSystemPromptReport({
|
||||
attempt: params,
|
||||
sessionKey: contextSessionKey,
|
||||
|
||||
@@ -77,6 +77,41 @@ describe("Codex app-server startup binding", () => {
|
||||
expect(savedBinding?.threadId).toBe("thread-existing");
|
||||
});
|
||||
|
||||
it("checks native rollout token pressure under default compaction config", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
await writeSessionRecord(sessionFile, { totalTokens: 12_000 });
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(rolloutDir, "rollout-thread-existing.jsonl"),
|
||||
`${JSON.stringify({
|
||||
payload: {
|
||||
type: "token_count",
|
||||
info: {
|
||||
last_token_usage: {
|
||||
total_tokens: 241_198,
|
||||
},
|
||||
model_context_window: 258_400,
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
const binding = await rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: await readCodexAppServerBinding(sessionFile),
|
||||
sessionFile,
|
||||
agentDir,
|
||||
config: undefined,
|
||||
});
|
||||
|
||||
expect(binding).toBeUndefined();
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding).toBeUndefined();
|
||||
});
|
||||
|
||||
it("honors shorthand byte units for native rollout limits", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
@@ -333,6 +368,76 @@ describe("Codex app-server startup binding", () => {
|
||||
expect(savedBinding?.threadId).toBe("thread-existing");
|
||||
});
|
||||
|
||||
it("includes projected turn tokens in the native rollout pressure check", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
await writeSessionRecord(sessionFile, { totalTokens: 12_000 });
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(rolloutDir, "rollout-thread-existing.jsonl"),
|
||||
`${JSON.stringify({
|
||||
payload: {
|
||||
type: "token_count",
|
||||
info: {
|
||||
last_token_usage: {
|
||||
total_tokens: 220_000,
|
||||
},
|
||||
model_context_window: 258_400,
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
const binding = await rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: await readCodexAppServerBinding(sessionFile),
|
||||
sessionFile,
|
||||
agentDir,
|
||||
config: undefined,
|
||||
projectedTurnTokens: 30_000,
|
||||
});
|
||||
|
||||
expect(binding).toBeUndefined();
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding).toBeUndefined();
|
||||
});
|
||||
|
||||
it("uses the session context window when the native rollout omits its model window", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
await writeSessionRecord(sessionFile, { totalTokens: 12_000, contextTokens: 258_400 });
|
||||
const rolloutDir = path.join(agentDir, "codex-home", "sessions");
|
||||
await fs.mkdir(rolloutDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
path.join(rolloutDir, "rollout-thread-existing.jsonl"),
|
||||
`${JSON.stringify({
|
||||
payload: {
|
||||
type: "token_count",
|
||||
info: {
|
||||
last_token_usage: {
|
||||
total_tokens: 241_198,
|
||||
},
|
||||
},
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
const binding = await rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: await readCodexAppServerBinding(sessionFile),
|
||||
sessionFile,
|
||||
agentDir,
|
||||
config: undefined,
|
||||
});
|
||||
|
||||
expect(binding).toBeUndefined();
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding).toBeUndefined();
|
||||
});
|
||||
|
||||
it("clears byte-oversized rollouts before reading their contents", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
|
||||
@@ -9,10 +9,10 @@ import { resolveCodexAppServerHomeDir } from "./auth-bridge.js";
|
||||
import { isJsonObject, type JsonValue } from "./protocol.js";
|
||||
import { clearCodexAppServerBinding, type CodexAppServerThreadBinding } from "./session-binding.js";
|
||||
|
||||
// Codex owns proactive auto-compaction and derives its limit from the active model context
|
||||
// window. OpenClaw only clears a bound native thread as a recovery fuse when Codex does
|
||||
// not report that window, so the fallback stays well above normal compaction pressure.
|
||||
// Codex owns proactive auto-compaction, but OpenClaw must not resume a native
|
||||
// thread that is already too close to the server-side window for the next turn.
|
||||
const CODEX_APP_SERVER_NATIVE_THREAD_FALLBACK_MAX_TOKENS = 300_000;
|
||||
const CODEX_APP_SERVER_NATIVE_THREAD_DEFAULT_RESERVE_TOKENS = 20_000;
|
||||
const CODEX_APP_SERVER_BYTE_UNITS: Record<string, number> = {
|
||||
b: 1,
|
||||
k: 1024,
|
||||
@@ -209,10 +209,48 @@ function readCodexAppServerRolloutTokenSnapshotLine(
|
||||
}
|
||||
}
|
||||
|
||||
function resolveCodexAppServerNativeThreadTokenFuse(
|
||||
modelContextWindow: number | undefined,
|
||||
function toNonNegativeInt(value: unknown): number | undefined {
|
||||
if (typeof value !== "number" || !Number.isFinite(value) || value < 0) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.floor(value);
|
||||
}
|
||||
|
||||
function readCompactionConfig(config: EmbeddedRunAttemptParams["config"] | undefined) {
|
||||
return isJsonObject(config?.agents?.defaults?.compaction)
|
||||
? config.agents.defaults.compaction
|
||||
: undefined;
|
||||
}
|
||||
|
||||
function resolveCodexAppServerNativeThreadReserveTokens(
|
||||
config: EmbeddedRunAttemptParams["config"] | undefined,
|
||||
): number {
|
||||
return modelContextWindow ?? CODEX_APP_SERVER_NATIVE_THREAD_FALLBACK_MAX_TOKENS;
|
||||
const compaction = readCompactionConfig(config);
|
||||
const reserveTokens = toNonNegativeInt(compaction?.reserveTokens);
|
||||
const reserveTokensFloor = toNonNegativeInt(compaction?.reserveTokensFloor);
|
||||
if (reserveTokens !== undefined) {
|
||||
return Math.max(
|
||||
reserveTokens,
|
||||
reserveTokensFloor ?? CODEX_APP_SERVER_NATIVE_THREAD_DEFAULT_RESERVE_TOKENS,
|
||||
);
|
||||
}
|
||||
return reserveTokensFloor ?? CODEX_APP_SERVER_NATIVE_THREAD_DEFAULT_RESERVE_TOKENS;
|
||||
}
|
||||
|
||||
function resolveCodexAppServerNativeThreadTokenFuse(params: {
|
||||
modelContextWindow: number | undefined;
|
||||
reserveTokens: number;
|
||||
projectedTurnTokens?: number;
|
||||
}): number {
|
||||
const projectedTurnTokens =
|
||||
typeof params.projectedTurnTokens === "number" &&
|
||||
Number.isFinite(params.projectedTurnTokens) &&
|
||||
params.projectedTurnTokens > 0
|
||||
? Math.floor(params.projectedTurnTokens)
|
||||
: 0;
|
||||
const contextWindow =
|
||||
params.modelContextWindow ?? CODEX_APP_SERVER_NATIVE_THREAD_FALLBACK_MAX_TOKENS;
|
||||
return Math.max(1, contextWindow - params.reserveTokens - projectedTurnTokens);
|
||||
}
|
||||
|
||||
function maxFiniteNumber(values: Array<number | undefined>): number | undefined {
|
||||
@@ -225,6 +263,16 @@ function maxFiniteNumber(values: Array<number | undefined>): number | undefined
|
||||
return Math.max(...nums);
|
||||
}
|
||||
|
||||
function minFiniteNumber(values: Array<number | undefined>): number | undefined {
|
||||
const nums = values.filter(
|
||||
(value): value is number => typeof value === "number" && Number.isFinite(value),
|
||||
);
|
||||
if (nums.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.min(...nums);
|
||||
}
|
||||
|
||||
function hasContextEngineThreadBootstrapProjection(binding: CodexAppServerThreadBinding): boolean {
|
||||
return binding.contextEngine?.projection?.mode === "thread_bootstrap";
|
||||
}
|
||||
@@ -236,50 +284,18 @@ export async function rotateOversizedCodexAppServerStartupBinding(params: {
|
||||
codexHome?: string;
|
||||
config: EmbeddedRunAttemptParams["config"] | undefined;
|
||||
contextEngineActive?: boolean;
|
||||
projectedTurnTokens?: number;
|
||||
}): Promise<CodexAppServerThreadBinding | undefined> {
|
||||
const binding = params.binding;
|
||||
if (!binding?.threadId) {
|
||||
return binding;
|
||||
}
|
||||
if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction !== true) {
|
||||
return binding;
|
||||
}
|
||||
if (params.contextEngineActive === true && hasContextEngineThreadBootstrapProjection(binding)) {
|
||||
embeddedAgentLog.debug(
|
||||
"codex app-server deferring native transcript size guard for context-engine thread bootstrap",
|
||||
{
|
||||
threadId: binding.threadId,
|
||||
engineId: binding.contextEngine?.engineId,
|
||||
epoch: binding.contextEngine?.projection?.epoch,
|
||||
fingerprint: binding.contextEngine?.projection?.fingerprint,
|
||||
},
|
||||
);
|
||||
return binding;
|
||||
}
|
||||
const sessionRecord = await readCodexSessionRecordForSessionFile(params.sessionFile);
|
||||
const maxBytes = parseCodexAppServerByteLimit(
|
||||
params.config?.agents?.defaults?.compaction?.maxActiveTranscriptBytes,
|
||||
);
|
||||
const rolloutFiles = await listCodexAppServerRolloutFilesForThread(
|
||||
params.agentDir,
|
||||
binding.threadId,
|
||||
params.codexHome,
|
||||
);
|
||||
if (maxBytes !== undefined) {
|
||||
const oversizedFiles = rolloutFiles.filter((file) => file.bytes >= maxBytes);
|
||||
if (oversizedFiles.length > 0) {
|
||||
embeddedAgentLog.warn(
|
||||
"codex app-server native transcript exceeded active byte limit; starting a fresh thread",
|
||||
{
|
||||
threadId: binding.threadId,
|
||||
maxBytes,
|
||||
files: oversizedFiles.map((file) => ({ path: file.path, bytes: file.bytes })),
|
||||
},
|
||||
);
|
||||
await clearCodexAppServerBinding(params.sessionFile);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
const nativeTokenSnapshots = await Promise.all(
|
||||
rolloutFiles.map(async (file) => readCodexAppServerRolloutTokenSnapshot(file.path)),
|
||||
);
|
||||
@@ -289,7 +305,18 @@ export async function rotateOversizedCodexAppServerStartupBinding(params: {
|
||||
const nativeModelContextWindow = maxFiniteNumber(
|
||||
nativeTokenSnapshots.map((snapshot) => snapshot?.modelContextWindow),
|
||||
);
|
||||
const maxTokens = resolveCodexAppServerNativeThreadTokenFuse(nativeModelContextWindow);
|
||||
const sessionModelContextWindow =
|
||||
typeof sessionRecord?.contextTokens === "number" &&
|
||||
Number.isFinite(sessionRecord.contextTokens) &&
|
||||
sessionRecord.contextTokens > 0
|
||||
? Math.floor(sessionRecord.contextTokens)
|
||||
: undefined;
|
||||
const reserveTokens = resolveCodexAppServerNativeThreadReserveTokens(params.config);
|
||||
const maxTokens = resolveCodexAppServerNativeThreadTokenFuse({
|
||||
modelContextWindow: minFiniteNumber([nativeModelContextWindow, sessionModelContextWindow]),
|
||||
reserveTokens,
|
||||
projectedTurnTokens: params.projectedTurnTokens,
|
||||
});
|
||||
const sessionTokens =
|
||||
sessionRecord?.totalTokensFresh !== false &&
|
||||
typeof sessionRecord?.totalTokens === "number" &&
|
||||
@@ -307,11 +334,46 @@ export async function rotateOversizedCodexAppServerStartupBinding(params: {
|
||||
sessionTokens,
|
||||
nativeTokens,
|
||||
nativeModelContextWindow,
|
||||
sessionModelContextWindow,
|
||||
reserveTokens,
|
||||
projectedTurnTokens: params.projectedTurnTokens,
|
||||
},
|
||||
);
|
||||
await clearCodexAppServerBinding(params.sessionFile);
|
||||
return undefined;
|
||||
}
|
||||
const compaction = readCompactionConfig(params.config);
|
||||
if (compaction?.truncateAfterCompaction !== true) {
|
||||
return binding;
|
||||
}
|
||||
if (params.contextEngineActive === true && hasContextEngineThreadBootstrapProjection(binding)) {
|
||||
embeddedAgentLog.debug(
|
||||
"codex app-server deferring native transcript byte guard for context-engine thread bootstrap",
|
||||
{
|
||||
threadId: binding.threadId,
|
||||
engineId: binding.contextEngine?.engineId,
|
||||
epoch: binding.contextEngine?.projection?.epoch,
|
||||
fingerprint: binding.contextEngine?.projection?.fingerprint,
|
||||
},
|
||||
);
|
||||
return binding;
|
||||
}
|
||||
const maxBytes = parseCodexAppServerByteLimit(compaction.maxActiveTranscriptBytes);
|
||||
if (maxBytes !== undefined) {
|
||||
const oversizedFiles = rolloutFiles.filter((file) => file.bytes >= maxBytes);
|
||||
if (oversizedFiles.length > 0) {
|
||||
embeddedAgentLog.warn(
|
||||
"codex app-server native transcript exceeded active byte limit; starting a fresh thread",
|
||||
{
|
||||
threadId: binding.threadId,
|
||||
maxBytes,
|
||||
files: oversizedFiles.map((file) => ({ path: file.path, bytes: file.bytes })),
|
||||
},
|
||||
);
|
||||
await clearCodexAppServerBinding(params.sessionFile);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
return binding;
|
||||
}
|
||||
|
||||
@@ -319,4 +381,5 @@ export const testing = {
|
||||
parseCodexAppServerByteLimit,
|
||||
readCodexAppServerRolloutTokenSnapshotLine,
|
||||
resolveCodexAppServerNativeThreadTokenFuse,
|
||||
resolveCodexAppServerNativeThreadReserveTokens,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user