From b55d9fa4660bb684e7c6691032cd627ff495f84c Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Wed, 13 May 2026 11:50:03 -0700 Subject: [PATCH] fix(codex): rotate incompatible context-engine threads (#81223) * fix(codex): rotate incompatible context-engine threads * fix(codex): tighten context-engine sidecar policy * fix: type context-engine binding policy config --------- Co-authored-by: Josh Lehman --- CHANGELOG.md | 1 + .../run-attempt.context-engine.test.ts | 65 +++++ .../codex/src/app-server/run-attempt.test.ts | 174 ++++++++++++++ .../codex/src/app-server/run-attempt.ts | 225 ++++++++++++------ .../src/app-server/session-binding.test.ts | 21 ++ .../codex/src/app-server/session-binding.ts | 28 +++ .../codex/src/app-server/thread-lifecycle.ts | 112 ++++++++- 7 files changed, 546 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 430e2ce99a9..c918e9b1664 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ Docs: https://docs.openclaw.ai - browser: enforce navigation checks for act interactions [AI]. (#81070) Thanks @pgondhi987. - Validate node exec event provenance [AI]. (#81071) Thanks @pgondhi987. - Gateway: keep active reply runs visible to stuck-session diagnostics and clear no-active-work recovery state, preventing stale queued lanes after compaction or tool failures. Fixes #80677. (#81302) +- Codex app-server: rotate incompatible context-engine-managed native threads so Lossless-managed sessions do not resume stale hidden Codex history. (#81223) Thanks @jalehman. - Gateway/OpenAI HTTP: return OpenAI-compatible 400 errors for invalid sampling params and provider validation failures instead of collapsing them to 500s. (#81275) Thanks @Lellansin. - Telegram: publish plugin and skill command description localizations to native command menus while filtering unsupported locale codes and preserving Telegram command limits. (#81351) Thanks @jzakirov. - Limit hook CLI tool authority [AI]. (#81065) Thanks @pgondhi987. diff --git a/extensions/codex/src/app-server/run-attempt.context-engine.test.ts b/extensions/codex/src/app-server/run-attempt.context-engine.test.ts index f49edb99ec9..13f3cbd1bed 100644 --- a/extensions/codex/src/app-server/run-attempt.context-engine.test.ts +++ b/extensions/codex/src/app-server/run-attempt.context-engine.test.ts @@ -11,6 +11,7 @@ import { import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { CodexServerNotification } from "./protocol.js"; import { runCodexAppServerAttempt, __testing } from "./run-attempt.js"; +import { readCodexAppServerBinding, writeCodexAppServerBinding } from "./session-binding.js"; import { createCodexTestModel } from "./test-support.js"; let tempDir: string; @@ -373,6 +374,70 @@ describe("runCodexAppServerAttempt context-engine lifecycle", () => { await run; }); + it("retries a resumed context-engine thread on a fresh Codex thread after early context overflow", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + await writeCodexAppServerBinding(sessionFile, { + threadId: "thread-old", + cwd: workspaceDir, + dynamicToolsFingerprint: "[]", + contextEngine: { + schemaVersion: 1, + engineId: "lossless-claw", + policyFingerprint: + '{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"contextTokenBudget":400000,"projectionMaxChars":1000000}', + }, + }); + const contextEngine = createContextEngine(); + const harness = createStartedThreadHarness(async (method, requestParams) => { + const request = requireRecord(requestParams, `${method} params`); + if (method === "thread/resume") { + return threadStartResult("thread-old"); + } + if (method === "turn/start" && request.threadId === "thread-old") { + throw new Error("Codex ran out of room in the model's context window"); + } + if (method === "thread/start") { + return threadStartResult("thread-fresh"); + } + if (method === "turn/start" && request.threadId === "thread-fresh") { + return turnStartResult("turn-fresh"); + } + return undefined; + }); + const params = createParams(sessionFile, workspaceDir); + params.contextEngine = contextEngine; + params.contextTokenBudget = 400_000; + + const run = runCodexAppServerAttempt(params); + await vi.waitFor(() => + expect(harness.requests.map((request) => request.method)).toEqual([ + "thread/resume", + "turn/start", + "thread/start", + "turn/start", + ]), + ); + await harness.notify({ + method: "turn/completed", + params: { + threadId: "thread-fresh", + turnId: "turn-fresh", + turn: { + id: "turn-fresh", + status: "completed", + items: [{ type: "agentMessage", id: "msg-1", text: "fresh answer" }], + }, + }, + }); + const result = await run; + + expect(result.assistantTexts).toContain("fresh answer"); + const savedBinding = await readCodexAppServerBinding(sessionFile); + expect(savedBinding?.threadId).toBe("thread-fresh"); + expect(savedBinding?.contextEngine?.engineId).toBe("lossless-claw"); + }); + it("keeps current-turn context at the front of the Codex context-engine prompt", async () => { const sessionFile = path.join(tempDir, "session.jsonl"); const workspaceDir = path.join(tempDir, "workspace"); diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index 0e336e057b2..1ba818d556f 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -4470,6 +4470,180 @@ describe("runCodexAppServerAttempt", () => { expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/start", "thread/resume"]); }); + it("starts a fresh Codex thread for legacy context-engine sidecars without metadata", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" }); + const params = createParams(sessionFile, workspaceDir); + params.contextEngine = { + info: { id: "lossless-claw", name: "Lossless Claw", ownsCompaction: true }, + assemble: vi.fn(), + compact: vi.fn(), + } as never; + params.contextTokenBudget = 400_000; + const appServer = createThreadLifecycleAppServerOptions(); + const request = vi.fn(async (method: string) => { + if (method === "thread/start") { + return threadStartResult("thread-fresh"); + } + throw new Error(`unexpected method: ${method}`); + }); + + const binding = await startOrResumeThread({ + client: { request } as never, + params, + cwd: workspaceDir, + dynamicTools: [], + appServer, + }); + + expect(binding.threadId).toBe("thread-fresh"); + expect(binding.lifecycle).toEqual({ + action: "started", + rotatedContextEngineBinding: true, + }); + expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/start"]); + const savedBinding = await readCodexAppServerBinding(sessionFile); + expect(savedBinding?.contextEngine?.engineId).toBe("lossless-claw"); + expect(savedBinding?.contextEngine?.policyFingerprint).toContain('"contextTokenBudget":400000'); + }); + + it("resumes a Codex thread when context-engine sidecar metadata is compatible", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const contextEngine = { + schemaVersion: 1 as const, + engineId: "lossless-claw", + policyFingerprint: + '{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"contextTokenBudget":400000,"projectionMaxChars":1000000}', + }; + await writeExistingBinding(sessionFile, workspaceDir, { + dynamicToolsFingerprint: "[]", + contextEngine, + }); + const params = createParams(sessionFile, workspaceDir); + params.contextEngine = { + info: { id: "lossless-claw", name: "Lossless Claw", ownsCompaction: true }, + assemble: vi.fn(), + compact: vi.fn(), + } as never; + params.contextTokenBudget = 400_000; + const appServer = createThreadLifecycleAppServerOptions(); + const request = vi.fn(async (method: string) => { + if (method === "thread/resume") { + return threadStartResult("thread-existing"); + } + throw new Error(`unexpected method: ${method}`); + }); + + const binding = await startOrResumeThread({ + client: { request } as never, + params, + cwd: workspaceDir, + dynamicTools: [], + appServer, + }); + + expect(binding.threadId).toBe("thread-existing"); + expect(binding.lifecycle).toEqual({ action: "resumed" }); + expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/resume"]); + }); + + it("starts a fresh Codex thread when context-engine sidecar metadata is no longer active", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + await writeExistingBinding(sessionFile, workspaceDir, { + dynamicToolsFingerprint: "[]", + contextEngine: { + schemaVersion: 1, + engineId: "lossless-claw", + policyFingerprint: + '{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"contextTokenBudget":400000,"projectionMaxChars":1000000}', + }, + }); + const params = createParams(sessionFile, workspaceDir); + const appServer = createThreadLifecycleAppServerOptions(); + const request = vi.fn(async (method: string) => { + if (method === "thread/start") { + return threadStartResult("thread-fresh"); + } + throw new Error(`unexpected method: ${method}`); + }); + + const binding = await startOrResumeThread({ + client: { request } as never, + params, + cwd: workspaceDir, + dynamicTools: [], + appServer, + }); + + expect(binding.threadId).toBe("thread-fresh"); + expect(binding.lifecycle).toEqual({ + action: "started", + rotatedContextEngineBinding: true, + }); + expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/start"]); + const savedBinding = await readCodexAppServerBinding(sessionFile); + expect(savedBinding?.contextEngine).toBeUndefined(); + }); + + it("starts a fresh Codex thread when context-engine policy metadata changes", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + await writeExistingBinding(sessionFile, workspaceDir, { + dynamicToolsFingerprint: "[]", + contextEngine: { + schemaVersion: 1, + engineId: "lossless-claw", + policyFingerprint: + '{"schemaVersion":1,"engineId":"lossless-claw","engineVersion":"1.0.0","ownsCompaction":true,"turnMaintenanceMode":"foreground","citationsMode":"inline","contextTokenBudget":400000,"projectionMaxChars":1000000}', + }, + }); + const params = createParams(sessionFile, workspaceDir); + params.contextEngine = { + info: { + id: "lossless-claw", + name: "Lossless Claw", + version: "1.0.1", + ownsCompaction: true, + turnMaintenanceMode: "foreground", + }, + assemble: vi.fn(), + compact: vi.fn(), + } as never; + params.config = { memory: { citations: "inline" } } as never; + params.contextTokenBudget = 400_000; + const appServer = createThreadLifecycleAppServerOptions(); + const request = vi.fn(async (method: string) => { + if (method === "thread/start") { + return threadStartResult("thread-fresh"); + } + throw new Error(`unexpected method: ${method}`); + }); + + const binding = await startOrResumeThread({ + client: { request } as never, + params, + cwd: workspaceDir, + dynamicTools: [], + appServer, + }); + + expect(binding.threadId).toBe("thread-fresh"); + expect(binding.lifecycle).toEqual({ + action: "started", + rotatedContextEngineBinding: true, + }); + expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/start"]); + const savedBinding = await readCodexAppServerBinding(sessionFile); + expect(savedBinding?.contextEngine?.policyFingerprint).toContain('"engineVersion":"1.0.1"'); + expect(savedBinding?.contextEngine?.policyFingerprint).toContain( + '"turnMaintenanceMode":"foreground"', + ); + expect(savedBinding?.contextEngine?.policyFingerprint).toContain('"citationsMode":"inline"'); + }); + it("keeps the previous dynamic tool fingerprint for transient no-tool maintenance turns", async () => { const sessionFile = path.join(tempDir, "session.jsonl"); const workspaceDir = path.join(tempDir, "workspace"); diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 774b683856f..3672c1be21a 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -111,7 +111,11 @@ import { resolveCodexUsageLimitResetAtMs, shouldRefreshCodexRateLimitsForUsageLimitMessage, } from "./rate-limits.js"; -import { readCodexAppServerBinding, type CodexAppServerThreadBinding } from "./session-binding.js"; +import { + clearCodexAppServerBinding, + readCodexAppServerBinding, + type CodexAppServerThreadBinding, +} from "./session-binding.js"; import { readCodexMirroredSessionHistoryMessages } from "./session-history.js"; import { clearSharedCodexAppServerClientIfCurrent } from "./shared-client.js"; import { @@ -120,6 +124,7 @@ import { buildTurnStartParams, codexDynamicToolsFingerprint, startOrResumeThread, + type CodexAppServerThreadLifecycleBinding, } from "./thread-lifecycle.js"; import { inferCodexDynamicToolMeta, @@ -670,10 +675,13 @@ export async function runCodexAppServerAttempt( tools: toolBridge.specs, }); let client: CodexAppServerClient; - let thread: CodexAppServerThreadBinding; + let thread: CodexAppServerThreadLifecycleBinding; let trajectoryEndRecorded = false; let nativeHookRelay: NativeHookRelayRegistrationHandle | undefined; let startupClientForCleanup: CodexAppServerClient | undefined; + let restartContextEngineCodexThread: + | (() => Promise) + | undefined; const startupTimeoutMs = resolveCodexStartupTimeoutMs({ timeoutMs: params.timeoutMs, timeoutFloorMs: options.startupTimeoutFloorMs, @@ -756,7 +764,7 @@ export async function runCodexAppServerAttempt( timeoutMs: appServer.requestTimeoutMs, signal: runAbortController.signal, }); - const startupThread = await startOrResumeThread({ + const threadLifecycleParams = { client: startupClient, params: runtimeParams, cwd: effectiveWorkspace, @@ -782,7 +790,9 @@ export async function runCodexAppServerAttempt( }), } : undefined, - }); + } satisfies Parameters[0]; + restartContextEngineCodexThread = () => startOrResumeThread(threadLifecycleParams); + const startupThread = await startOrResumeThread(threadLifecycleParams); return { client: startupClient, thread: startupThread }; }; for ( @@ -1444,17 +1454,9 @@ export async function runCodexAppServerAttempt( }, ]; - let turn: CodexTurnStartResponse; - try { - runAgentHarnessLlmInputHook({ - event: llmInputEvent, - ctx: hookContext, - }); - emitCodexAppServerEvent(params, { - stream: "codex_app_server.lifecycle", - data: { phase: "turn_starting", threadId: thread.threadId }, - }); - turn = assertCodexTurnStartResponse( + let turn: CodexTurnStartResponse | undefined; + const startCodexTurn = async (): Promise => + assertCodexTurnStartResponse( await client.request( "turn/start", buildTurnStartParams(params, { @@ -1466,78 +1468,121 @@ export async function runCodexAppServerAttempt( { timeoutMs: params.timeoutMs, signal: runAbortController.signal }, ), ); - } catch (error) { - const usageLimitError = await formatCodexTurnStartUsageLimitError({ - client, - error, - pendingNotifications, - timeoutMs: appServer.requestTimeoutMs, - signal: runAbortController.signal, + try { + runAgentHarnessLlmInputHook({ + event: llmInputEvent, + ctx: hookContext, }); - const turnStartErrorMessage = usageLimitError?.message ?? formatErrorMessage(error); emitCodexAppServerEvent(params, { stream: "codex_app_server.lifecycle", - data: { phase: "turn_start_failed", error: turnStartErrorMessage }, + data: { phase: "turn_starting", threadId: thread.threadId }, }); - trajectoryRecorder?.recordEvent("session.ended", { - status: "error", - threadId: thread.threadId, - timedOut, - aborted: runAbortController.signal.aborted, - promptError: turnStartErrorMessage, - }); - trajectoryEndRecorded = true; - runAgentHarnessLlmOutputHook({ - event: { + turn = await startCodexTurn(); + } catch (error) { + let turnStartError = error; + if ( + shouldRetryContextEngineTurnOnFreshCodexThread({ + error: turnStartError, + contextEngineActive: Boolean(activeContextEngine), + thread, + }) && + restartContextEngineCodexThread + ) { + embeddedAgentLog.warn( + "codex app-server context-engine turn overflowed on resume; retrying with fresh thread", + { + threadId: thread.threadId, + error: formatErrorMessage(turnStartError), + }, + ); + await clearCodexAppServerBinding(params.sessionFile); + thread = await restartContextEngineCodexThread(); + emitCodexAppServerEvent(params, { + stream: "codex_app_server.lifecycle", + data: { phase: "thread_ready_retry", threadId: thread.threadId }, + }); + try { + turn = await startCodexTurn(); + } catch (retryError) { + turnStartError = retryError; + } + } + if (turn === undefined) { + const usageLimitError = await formatCodexTurnStartUsageLimitError({ + client, + error: turnStartError, + pendingNotifications, + timeoutMs: appServer.requestTimeoutMs, + signal: runAbortController.signal, + }); + const turnStartErrorMessage = usageLimitError?.message ?? formatErrorMessage(turnStartError); + emitCodexAppServerEvent(params, { + stream: "codex_app_server.lifecycle", + data: { phase: "turn_start_failed", error: turnStartErrorMessage }, + }); + trajectoryRecorder?.recordEvent("session.ended", { + status: "error", + threadId: thread.threadId, + timedOut, + aborted: runAbortController.signal.aborted, + promptError: turnStartErrorMessage, + }); + trajectoryEndRecorded = true; + runAgentHarnessLlmOutputHook({ + event: { + runId: params.runId, + sessionId: params.sessionId, + provider: params.provider, + model: params.modelId, + resolvedRef: + params.runtimePlan?.observability.resolvedRef ?? `${params.provider}/${params.modelId}`, + ...(params.runtimePlan?.observability.harnessId + ? { harnessId: params.runtimePlan.observability.harnessId } + : {}), + assistantTexts: [], + }, + ctx: hookContext, + }); + runAgentHarnessAgentEndHook({ + event: { + messages: turnStartFailureMessages, + success: false, + error: turnStartErrorMessage, + durationMs: Date.now() - attemptStartedAt, + }, + ctx: hookContext, + }); + notificationCleanup(); + requestCleanup(); + nativeHookRelay?.unregister(); + await runAgentCleanupStep({ runId: params.runId, sessionId: params.sessionId, - provider: params.provider, - model: params.modelId, - resolvedRef: - params.runtimePlan?.observability.resolvedRef ?? `${params.provider}/${params.modelId}`, - ...(params.runtimePlan?.observability.harnessId - ? { harnessId: params.runtimePlan.observability.harnessId } - : {}), - assistantTexts: [], - }, - ctx: hookContext, - }); - runAgentHarnessAgentEndHook({ - event: { - messages: turnStartFailureMessages, - success: false, - error: turnStartErrorMessage, - durationMs: Date.now() - attemptStartedAt, - }, - ctx: hookContext, - }); - notificationCleanup(); - requestCleanup(); - nativeHookRelay?.unregister(); - await runAgentCleanupStep({ - runId: params.runId, - sessionId: params.sessionId, - step: "codex-trajectory-flush-startup-failure", - log: embeddedAgentLog, - cleanup: async () => { - await trajectoryRecorder?.flush(); - }, - }); - params.abortSignal?.removeEventListener("abort", abortFromUpstream); - if (usageLimitError) { - await markCodexAuthProfileBlockedFromRateLimits({ - params, - authProfileId: startupAuthProfileId, - rateLimits: usageLimitError.rateLimitsForProfile, - }); - return buildCodexTurnStartFailureResult({ - params, - message: usageLimitError.message, - messagesSnapshot: turnStartFailureMessages, - systemPromptReport, + step: "codex-trajectory-flush-startup-failure", + log: embeddedAgentLog, + cleanup: async () => { + await trajectoryRecorder?.flush(); + }, }); + params.abortSignal?.removeEventListener("abort", abortFromUpstream); + if (usageLimitError) { + await markCodexAuthProfileBlockedFromRateLimits({ + params, + authProfileId: startupAuthProfileId, + rateLimits: usageLimitError.rateLimitsForProfile, + }); + return buildCodexTurnStartFailureResult({ + params, + message: usageLimitError.message, + messagesSnapshot: turnStartFailureMessages, + systemPromptReport, + }); + } + throw turnStartError; } - throw error; + } + if (!turn) { + throw new Error("codex app-server turn/start failed without an error"); } turnId = turn.turn.id; const activeTurnId = turn.turn.id; @@ -3056,6 +3101,28 @@ function isNonEmptyString(value: unknown): value is string { return typeof value === "string" && value.length > 0; } +function shouldRetryContextEngineTurnOnFreshCodexThread(params: { + error: unknown; + contextEngineActive: boolean; + thread: CodexAppServerThreadLifecycleBinding; +}): boolean { + if (!params.contextEngineActive || params.thread.lifecycle.action !== "resumed") { + return false; + } + return isCodexContextWindowError(params.error); +} + +function isCodexContextWindowError(error: unknown): boolean { + const message = formatErrorMessage(error); + return ( + /ran out of room in the model'?s context window/iu.test(message) || + /context window/iu.test(message) || + /context length/iu.test(message) || + /maximum context/iu.test(message) || + /too many tokens/iu.test(message) + ); +} + function readCodexNotificationItem(params: JsonValue | undefined): CodexThreadItem | undefined { if (!isJsonObject(params) || !isJsonObject(params.item)) { return undefined; diff --git a/extensions/codex/src/app-server/session-binding.test.ts b/extensions/codex/src/app-server/session-binding.test.ts index 130a6d81ac4..44d55e4d8cd 100644 --- a/extensions/codex/src/app-server/session-binding.test.ts +++ b/extensions/codex/src/app-server/session-binding.test.ts @@ -102,6 +102,27 @@ describe("codex app-server session binding", () => { expect(binding?.pluginAppPolicyContext).toEqual(pluginAppPolicyContext); }); + it("round-trips context-engine binding metadata", async () => { + const sessionFile = path.join(tempDir, "session.json"); + await writeCodexAppServerBinding(sessionFile, { + threadId: "thread-123", + cwd: tempDir, + contextEngine: { + schemaVersion: 1, + engineId: "lossless-claw", + policyFingerprint: "lossless-policy-1", + }, + }); + + const binding = await readCodexAppServerBinding(sessionFile); + + expect(binding?.contextEngine).toEqual({ + schemaVersion: 1, + engineId: "lossless-claw", + policyFingerprint: "lossless-policy-1", + }); + }); + it("rejects old plugin app policy entries that duplicate the app id", async () => { const sessionFile = path.join(tempDir, "session.json"); await fs.writeFile( diff --git a/extensions/codex/src/app-server/session-binding.ts b/extensions/codex/src/app-server/session-binding.ts index 4ac8fba53db..039f3e1f0a1 100644 --- a/extensions/codex/src/app-server/session-binding.ts +++ b/extensions/codex/src/app-server/session-binding.ts @@ -43,10 +43,17 @@ export type CodexAppServerThreadBinding = { pluginAppsFingerprint?: string; pluginAppsInputFingerprint?: string; pluginAppPolicyContext?: PluginAppPolicyContext; + contextEngine?: CodexAppServerContextEngineBinding; createdAt: string; updatedAt: string; }; +export type CodexAppServerContextEngineBinding = { + schemaVersion: 1; + engineId: string; + policyFingerprint: string; +}; + export function resolveCodexAppServerBindingPath(sessionFile: string): string { return `${sessionFile}.codex-app-server.json`; } @@ -99,6 +106,7 @@ export async function readCodexAppServerBinding( ? parsed.pluginAppsInputFingerprint : undefined, pluginAppPolicyContext: readPluginAppPolicyContext(parsed.pluginAppPolicyContext), + contextEngine: readContextEngineBinding(parsed.contextEngine), createdAt: typeof parsed.createdAt === "string" ? parsed.createdAt : new Date().toISOString(), updatedAt: typeof parsed.updatedAt === "string" ? parsed.updatedAt : new Date().toISOString(), }; @@ -138,6 +146,7 @@ export async function writeCodexAppServerBinding( pluginAppsFingerprint: binding.pluginAppsFingerprint, pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint, pluginAppPolicyContext: binding.pluginAppPolicyContext, + contextEngine: binding.contextEngine, createdAt: binding.createdAt ?? now, updatedAt: now, }; @@ -147,6 +156,25 @@ export async function writeCodexAppServerBinding( ); } +function readContextEngineBinding(value: unknown): CodexAppServerContextEngineBinding | undefined { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return undefined; + } + const record = value as Record; + if ( + record.schemaVersion !== 1 || + typeof record.engineId !== "string" || + typeof record.policyFingerprint !== "string" + ) { + return undefined; + } + return { + schemaVersion: 1, + engineId: record.engineId, + policyFingerprint: record.policyFingerprint, + }; +} + function readPluginAppPolicyContext(value: unknown): PluginAppPolicyContext | undefined { if (!value || typeof value !== "object" || Array.isArray(value)) { return undefined; diff --git a/extensions/codex/src/app-server/thread-lifecycle.ts b/extensions/codex/src/app-server/thread-lifecycle.ts index fc18109f36f..2ebef8423ea 100644 --- a/extensions/codex/src/app-server/thread-lifecycle.ts +++ b/extensions/codex/src/app-server/thread-lifecycle.ts @@ -1,5 +1,6 @@ import { embeddedAgentLog, + isActiveHarnessContextEngine, type EmbeddedRunAttemptParams, } from "openclaw/plugin-sdk/agent-harness-runtime"; import { @@ -9,6 +10,10 @@ import { import { isModernCodexModel } from "../../provider.js"; import { isCodexAppServerConnectionClosedError, type CodexAppServerClient } from "./client.js"; import { codexSandboxPolicyForTurn, type CodexAppServerRuntimeOptions } from "./config.js"; +import { + resolveCodexContextEngineProjectionMaxChars, + resolveCodexContextEngineProjectionReserveTokens, +} from "./context-engine-projection.js"; import { isCodexPluginThreadBindingStale, mergeCodexThreadConfigs, @@ -34,9 +39,19 @@ import { readCodexAppServerBinding, writeCodexAppServerBinding, type CodexAppServerAuthProfileLookup, + type CodexAppServerContextEngineBinding, type CodexAppServerThreadBinding, } from "./session-binding.js"; +export type CodexAppServerThreadLifecycle = { + action: "started" | "resumed"; + rotatedContextEngineBinding?: boolean; +}; + +export type CodexAppServerThreadLifecycleBinding = CodexAppServerThreadBinding & { + lifecycle: CodexAppServerThreadLifecycle; +}; + export type CodexPluginThreadConfigProvider = { enabled: boolean; inputFingerprint?: string; @@ -58,15 +73,35 @@ export async function startOrResumeThread(params: { developerInstructions?: string; config?: JsonObject; pluginThreadConfig?: CodexPluginThreadConfigProvider; -}): Promise { +}): Promise { const dynamicToolsFingerprint = fingerprintDynamicTools(params.dynamicTools); + const contextEngineBinding = buildContextEngineBinding(params.params); let binding = await readCodexAppServerBinding(params.params.sessionFile, { authProfileStore: params.params.authProfileStore, agentDir: params.params.agentDir, config: params.params.config, }); let preserveExistingBinding = false; + let rotatedContextEngineBinding = false; let prebuiltPluginThreadConfig: CodexPluginThreadConfig | undefined; + if (binding?.threadId && (binding.contextEngine || contextEngineBinding)) { + if ( + !contextEngineBinding || + !isContextEngineBindingCompatible(binding.contextEngine, contextEngineBinding) + ) { + embeddedAgentLog.debug( + "codex app-server context-engine binding changed; starting a new thread", + { + threadId: binding.threadId, + engineId: contextEngineBinding?.engineId, + previousEngineId: binding.contextEngine?.engineId, + }, + ); + await clearCodexAppServerBinding(params.params.sessionFile); + binding = undefined; + rotatedContextEngineBinding = true; + } + } if (binding?.threadId) { let pluginBindingStale = isCodexPluginThreadBindingStale({ codexPluginsEnabled: params.pluginThreadConfig?.enabled ?? false, @@ -166,6 +201,7 @@ export async function startOrResumeThread(params: { pluginAppsFingerprint: binding.pluginAppsFingerprint, pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint, pluginAppPolicyContext: binding.pluginAppPolicyContext, + contextEngine: contextEngineBinding, createdAt: binding.createdAt, }, { @@ -185,6 +221,8 @@ export async function startOrResumeThread(params: { pluginAppsFingerprint: binding.pluginAppsFingerprint, pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint, pluginAppPolicyContext: binding.pluginAppPolicyContext, + contextEngine: contextEngineBinding, + lifecycle: { action: "resumed" }, }; } catch (error) { if (isCodexAppServerConnectionClosedError(error)) { @@ -235,6 +273,7 @@ export async function startOrResumeThread(params: { pluginAppsFingerprint: pluginThreadConfig?.fingerprint, pluginAppsInputFingerprint: pluginThreadConfig?.inputFingerprint, pluginAppPolicyContext: pluginThreadConfig?.policyContext, + contextEngine: contextEngineBinding, createdAt, }, { @@ -256,11 +295,82 @@ export async function startOrResumeThread(params: { pluginAppsFingerprint: pluginThreadConfig?.fingerprint, pluginAppsInputFingerprint: pluginThreadConfig?.inputFingerprint, pluginAppPolicyContext: pluginThreadConfig?.policyContext, + contextEngine: contextEngineBinding, createdAt, updatedAt: createdAt, + lifecycle: { + action: "started", + ...(rotatedContextEngineBinding ? { rotatedContextEngineBinding } : {}), + }, }; } +function buildContextEngineBinding( + params: EmbeddedRunAttemptParams, +): CodexAppServerContextEngineBinding | undefined { + const contextEngine = isActiveHarnessContextEngine(params.contextEngine) + ? params.contextEngine + : undefined; + const engineId = contextEngine?.info?.id?.trim(); + if (!contextEngine || !engineId) { + return undefined; + } + return { + schemaVersion: 1, + engineId, + policyFingerprint: JSON.stringify({ + schemaVersion: 1, + engineId, + engineVersion: contextEngine.info.version, + ownsCompaction: contextEngine.info.ownsCompaction === true, + turnMaintenanceMode: contextEngine.info.turnMaintenanceMode, + citationsMode: resolveContextEngineCitationsMode(params.config), + contextTokenBudget: params.contextTokenBudget, + projectionMaxChars: resolveCodexContextEngineProjectionMaxChars({ + contextTokenBudget: params.contextTokenBudget, + reserveTokens: resolveCodexContextEngineProjectionReserveTokens({ + config: params.config, + }), + }), + }), + }; +} + +function isContextEngineBindingCompatible( + previous: CodexAppServerContextEngineBinding | undefined, + next: CodexAppServerContextEngineBinding, +): boolean { + return ( + previous?.schemaVersion === next.schemaVersion && + previous.engineId === next.engineId && + previous.policyFingerprint === next.policyFingerprint + ); +} + +function resolveContextEngineCitationsMode(config: unknown): JsonValue | undefined { + const rootConfig = isUnknownRecord(config) ? config : undefined; + const memoryConfig = isUnknownRecord(rootConfig?.memory) ? rootConfig.memory : undefined; + const citations = memoryConfig?.citations; + return isJsonConfigValue(citations) ? citations : undefined; +} + +function isUnknownRecord(value: unknown): value is Record { + return Boolean(value && typeof value === "object" && !Array.isArray(value)); +} + +function isJsonConfigValue(value: unknown): value is JsonValue { + if (value === null || typeof value === "string" || typeof value === "boolean") { + return true; + } + if (typeof value === "number") { + return Number.isFinite(value); + } + if (Array.isArray(value)) { + return value.every(isJsonConfigValue); + } + return isUnknownRecord(value) && Object.values(value).every(isJsonConfigValue); +} + function shouldRecheckRecoverablePluginBinding(params: { binding: CodexAppServerThreadBinding; pluginThreadConfig?: CodexPluginThreadConfigProvider;