mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 16:44:45 +00:00
fix(codex): rotate incompatible context-engine threads (#81223)
* fix(codex): rotate incompatible context-engine threads * fix(codex): tighten context-engine sidecar policy * fix: type context-engine binding policy config --------- Co-authored-by: Josh Lehman <phaedrus@Mac.hsd1.ca.comcast.net>
This commit is contained in:
@@ -43,6 +43,7 @@ Docs: https://docs.openclaw.ai
|
||||
- browser: enforce navigation checks for act interactions [AI]. (#81070) Thanks @pgondhi987.
|
||||
- Validate node exec event provenance [AI]. (#81071) Thanks @pgondhi987.
|
||||
- Gateway: keep active reply runs visible to stuck-session diagnostics and clear no-active-work recovery state, preventing stale queued lanes after compaction or tool failures. Fixes #80677. (#81302)
|
||||
- Codex app-server: rotate incompatible context-engine-managed native threads so Lossless-managed sessions do not resume stale hidden Codex history. (#81223) Thanks @jalehman.
|
||||
- Gateway/OpenAI HTTP: return OpenAI-compatible 400 errors for invalid sampling params and provider validation failures instead of collapsing them to 500s. (#81275) Thanks @Lellansin.
|
||||
- Telegram: publish plugin and skill command description localizations to native command menus while filtering unsupported locale codes and preserving Telegram command limits. (#81351) Thanks @jzakirov.
|
||||
- Limit hook CLI tool authority [AI]. (#81065) Thanks @pgondhi987.
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { CodexServerNotification } from "./protocol.js";
|
||||
import { runCodexAppServerAttempt, __testing } from "./run-attempt.js";
|
||||
import { readCodexAppServerBinding, writeCodexAppServerBinding } from "./session-binding.js";
|
||||
import { createCodexTestModel } from "./test-support.js";
|
||||
|
||||
let tempDir: string;
|
||||
@@ -373,6 +374,70 @@ describe("runCodexAppServerAttempt context-engine lifecycle", () => {
|
||||
await run;
|
||||
});
|
||||
|
||||
it("retries a resumed context-engine thread on a fresh Codex thread after early context overflow", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
threadId: "thread-old",
|
||||
cwd: workspaceDir,
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"contextTokenBudget":400000,"projectionMaxChars":1000000}',
|
||||
},
|
||||
});
|
||||
const contextEngine = createContextEngine();
|
||||
const harness = createStartedThreadHarness(async (method, requestParams) => {
|
||||
const request = requireRecord(requestParams, `${method} params`);
|
||||
if (method === "thread/resume") {
|
||||
return threadStartResult("thread-old");
|
||||
}
|
||||
if (method === "turn/start" && request.threadId === "thread-old") {
|
||||
throw new Error("Codex ran out of room in the model's context window");
|
||||
}
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-fresh");
|
||||
}
|
||||
if (method === "turn/start" && request.threadId === "thread-fresh") {
|
||||
return turnStartResult("turn-fresh");
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.contextEngine = contextEngine;
|
||||
params.contextTokenBudget = 400_000;
|
||||
|
||||
const run = runCodexAppServerAttempt(params);
|
||||
await vi.waitFor(() =>
|
||||
expect(harness.requests.map((request) => request.method)).toEqual([
|
||||
"thread/resume",
|
||||
"turn/start",
|
||||
"thread/start",
|
||||
"turn/start",
|
||||
]),
|
||||
);
|
||||
await harness.notify({
|
||||
method: "turn/completed",
|
||||
params: {
|
||||
threadId: "thread-fresh",
|
||||
turnId: "turn-fresh",
|
||||
turn: {
|
||||
id: "turn-fresh",
|
||||
status: "completed",
|
||||
items: [{ type: "agentMessage", id: "msg-1", text: "fresh answer" }],
|
||||
},
|
||||
},
|
||||
});
|
||||
const result = await run;
|
||||
|
||||
expect(result.assistantTexts).toContain("fresh answer");
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.threadId).toBe("thread-fresh");
|
||||
expect(savedBinding?.contextEngine?.engineId).toBe("lossless-claw");
|
||||
});
|
||||
|
||||
it("keeps current-turn context at the front of the Codex context-engine prompt", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
|
||||
@@ -4470,6 +4470,180 @@ describe("runCodexAppServerAttempt", () => {
|
||||
expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/start", "thread/resume"]);
|
||||
});
|
||||
|
||||
it("starts a fresh Codex thread for legacy context-engine sidecars without metadata", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.contextEngine = {
|
||||
info: { id: "lossless-claw", name: "Lossless Claw", ownsCompaction: true },
|
||||
assemble: vi.fn(),
|
||||
compact: vi.fn(),
|
||||
} as never;
|
||||
params.contextTokenBudget = 400_000;
|
||||
const appServer = createThreadLifecycleAppServerOptions();
|
||||
const request = vi.fn(async (method: string) => {
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-fresh");
|
||||
}
|
||||
throw new Error(`unexpected method: ${method}`);
|
||||
});
|
||||
|
||||
const binding = await startOrResumeThread({
|
||||
client: { request } as never,
|
||||
params,
|
||||
cwd: workspaceDir,
|
||||
dynamicTools: [],
|
||||
appServer,
|
||||
});
|
||||
|
||||
expect(binding.threadId).toBe("thread-fresh");
|
||||
expect(binding.lifecycle).toEqual({
|
||||
action: "started",
|
||||
rotatedContextEngineBinding: true,
|
||||
});
|
||||
expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/start"]);
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.contextEngine?.engineId).toBe("lossless-claw");
|
||||
expect(savedBinding?.contextEngine?.policyFingerprint).toContain('"contextTokenBudget":400000');
|
||||
});
|
||||
|
||||
it("resumes a Codex thread when context-engine sidecar metadata is compatible", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const contextEngine = {
|
||||
schemaVersion: 1 as const,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"contextTokenBudget":400000,"projectionMaxChars":1000000}',
|
||||
};
|
||||
await writeExistingBinding(sessionFile, workspaceDir, {
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine,
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.contextEngine = {
|
||||
info: { id: "lossless-claw", name: "Lossless Claw", ownsCompaction: true },
|
||||
assemble: vi.fn(),
|
||||
compact: vi.fn(),
|
||||
} as never;
|
||||
params.contextTokenBudget = 400_000;
|
||||
const appServer = createThreadLifecycleAppServerOptions();
|
||||
const request = vi.fn(async (method: string) => {
|
||||
if (method === "thread/resume") {
|
||||
return threadStartResult("thread-existing");
|
||||
}
|
||||
throw new Error(`unexpected method: ${method}`);
|
||||
});
|
||||
|
||||
const binding = await startOrResumeThread({
|
||||
client: { request } as never,
|
||||
params,
|
||||
cwd: workspaceDir,
|
||||
dynamicTools: [],
|
||||
appServer,
|
||||
});
|
||||
|
||||
expect(binding.threadId).toBe("thread-existing");
|
||||
expect(binding.lifecycle).toEqual({ action: "resumed" });
|
||||
expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/resume"]);
|
||||
});
|
||||
|
||||
it("starts a fresh Codex thread when context-engine sidecar metadata is no longer active", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, {
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"contextTokenBudget":400000,"projectionMaxChars":1000000}',
|
||||
},
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
const appServer = createThreadLifecycleAppServerOptions();
|
||||
const request = vi.fn(async (method: string) => {
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-fresh");
|
||||
}
|
||||
throw new Error(`unexpected method: ${method}`);
|
||||
});
|
||||
|
||||
const binding = await startOrResumeThread({
|
||||
client: { request } as never,
|
||||
params,
|
||||
cwd: workspaceDir,
|
||||
dynamicTools: [],
|
||||
appServer,
|
||||
});
|
||||
|
||||
expect(binding.threadId).toBe("thread-fresh");
|
||||
expect(binding.lifecycle).toEqual({
|
||||
action: "started",
|
||||
rotatedContextEngineBinding: true,
|
||||
});
|
||||
expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/start"]);
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.contextEngine).toBeUndefined();
|
||||
});
|
||||
|
||||
it("starts a fresh Codex thread when context-engine policy metadata changes", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, {
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","engineVersion":"1.0.0","ownsCompaction":true,"turnMaintenanceMode":"foreground","citationsMode":"inline","contextTokenBudget":400000,"projectionMaxChars":1000000}',
|
||||
},
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.contextEngine = {
|
||||
info: {
|
||||
id: "lossless-claw",
|
||||
name: "Lossless Claw",
|
||||
version: "1.0.1",
|
||||
ownsCompaction: true,
|
||||
turnMaintenanceMode: "foreground",
|
||||
},
|
||||
assemble: vi.fn(),
|
||||
compact: vi.fn(),
|
||||
} as never;
|
||||
params.config = { memory: { citations: "inline" } } as never;
|
||||
params.contextTokenBudget = 400_000;
|
||||
const appServer = createThreadLifecycleAppServerOptions();
|
||||
const request = vi.fn(async (method: string) => {
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-fresh");
|
||||
}
|
||||
throw new Error(`unexpected method: ${method}`);
|
||||
});
|
||||
|
||||
const binding = await startOrResumeThread({
|
||||
client: { request } as never,
|
||||
params,
|
||||
cwd: workspaceDir,
|
||||
dynamicTools: [],
|
||||
appServer,
|
||||
});
|
||||
|
||||
expect(binding.threadId).toBe("thread-fresh");
|
||||
expect(binding.lifecycle).toEqual({
|
||||
action: "started",
|
||||
rotatedContextEngineBinding: true,
|
||||
});
|
||||
expect(request.mock.calls.map(([method]) => method)).toEqual(["thread/start"]);
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.contextEngine?.policyFingerprint).toContain('"engineVersion":"1.0.1"');
|
||||
expect(savedBinding?.contextEngine?.policyFingerprint).toContain(
|
||||
'"turnMaintenanceMode":"foreground"',
|
||||
);
|
||||
expect(savedBinding?.contextEngine?.policyFingerprint).toContain('"citationsMode":"inline"');
|
||||
});
|
||||
|
||||
it("keeps the previous dynamic tool fingerprint for transient no-tool maintenance turns", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
|
||||
@@ -111,7 +111,11 @@ import {
|
||||
resolveCodexUsageLimitResetAtMs,
|
||||
shouldRefreshCodexRateLimitsForUsageLimitMessage,
|
||||
} from "./rate-limits.js";
|
||||
import { readCodexAppServerBinding, type CodexAppServerThreadBinding } from "./session-binding.js";
|
||||
import {
|
||||
clearCodexAppServerBinding,
|
||||
readCodexAppServerBinding,
|
||||
type CodexAppServerThreadBinding,
|
||||
} from "./session-binding.js";
|
||||
import { readCodexMirroredSessionHistoryMessages } from "./session-history.js";
|
||||
import { clearSharedCodexAppServerClientIfCurrent } from "./shared-client.js";
|
||||
import {
|
||||
@@ -120,6 +124,7 @@ import {
|
||||
buildTurnStartParams,
|
||||
codexDynamicToolsFingerprint,
|
||||
startOrResumeThread,
|
||||
type CodexAppServerThreadLifecycleBinding,
|
||||
} from "./thread-lifecycle.js";
|
||||
import {
|
||||
inferCodexDynamicToolMeta,
|
||||
@@ -670,10 +675,13 @@ export async function runCodexAppServerAttempt(
|
||||
tools: toolBridge.specs,
|
||||
});
|
||||
let client: CodexAppServerClient;
|
||||
let thread: CodexAppServerThreadBinding;
|
||||
let thread: CodexAppServerThreadLifecycleBinding;
|
||||
let trajectoryEndRecorded = false;
|
||||
let nativeHookRelay: NativeHookRelayRegistrationHandle | undefined;
|
||||
let startupClientForCleanup: CodexAppServerClient | undefined;
|
||||
let restartContextEngineCodexThread:
|
||||
| (() => Promise<CodexAppServerThreadLifecycleBinding>)
|
||||
| undefined;
|
||||
const startupTimeoutMs = resolveCodexStartupTimeoutMs({
|
||||
timeoutMs: params.timeoutMs,
|
||||
timeoutFloorMs: options.startupTimeoutFloorMs,
|
||||
@@ -756,7 +764,7 @@ export async function runCodexAppServerAttempt(
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
});
|
||||
const startupThread = await startOrResumeThread({
|
||||
const threadLifecycleParams = {
|
||||
client: startupClient,
|
||||
params: runtimeParams,
|
||||
cwd: effectiveWorkspace,
|
||||
@@ -782,7 +790,9 @@ export async function runCodexAppServerAttempt(
|
||||
}),
|
||||
}
|
||||
: undefined,
|
||||
});
|
||||
} satisfies Parameters<typeof startOrResumeThread>[0];
|
||||
restartContextEngineCodexThread = () => startOrResumeThread(threadLifecycleParams);
|
||||
const startupThread = await startOrResumeThread(threadLifecycleParams);
|
||||
return { client: startupClient, thread: startupThread };
|
||||
};
|
||||
for (
|
||||
@@ -1444,17 +1454,9 @@ export async function runCodexAppServerAttempt(
|
||||
},
|
||||
];
|
||||
|
||||
let turn: CodexTurnStartResponse;
|
||||
try {
|
||||
runAgentHarnessLlmInputHook({
|
||||
event: llmInputEvent,
|
||||
ctx: hookContext,
|
||||
});
|
||||
emitCodexAppServerEvent(params, {
|
||||
stream: "codex_app_server.lifecycle",
|
||||
data: { phase: "turn_starting", threadId: thread.threadId },
|
||||
});
|
||||
turn = assertCodexTurnStartResponse(
|
||||
let turn: CodexTurnStartResponse | undefined;
|
||||
const startCodexTurn = async (): Promise<CodexTurnStartResponse> =>
|
||||
assertCodexTurnStartResponse(
|
||||
await client.request(
|
||||
"turn/start",
|
||||
buildTurnStartParams(params, {
|
||||
@@ -1466,78 +1468,121 @@ export async function runCodexAppServerAttempt(
|
||||
{ timeoutMs: params.timeoutMs, signal: runAbortController.signal },
|
||||
),
|
||||
);
|
||||
} catch (error) {
|
||||
const usageLimitError = await formatCodexTurnStartUsageLimitError({
|
||||
client,
|
||||
error,
|
||||
pendingNotifications,
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
try {
|
||||
runAgentHarnessLlmInputHook({
|
||||
event: llmInputEvent,
|
||||
ctx: hookContext,
|
||||
});
|
||||
const turnStartErrorMessage = usageLimitError?.message ?? formatErrorMessage(error);
|
||||
emitCodexAppServerEvent(params, {
|
||||
stream: "codex_app_server.lifecycle",
|
||||
data: { phase: "turn_start_failed", error: turnStartErrorMessage },
|
||||
data: { phase: "turn_starting", threadId: thread.threadId },
|
||||
});
|
||||
trajectoryRecorder?.recordEvent("session.ended", {
|
||||
status: "error",
|
||||
threadId: thread.threadId,
|
||||
timedOut,
|
||||
aborted: runAbortController.signal.aborted,
|
||||
promptError: turnStartErrorMessage,
|
||||
});
|
||||
trajectoryEndRecorded = true;
|
||||
runAgentHarnessLlmOutputHook({
|
||||
event: {
|
||||
turn = await startCodexTurn();
|
||||
} catch (error) {
|
||||
let turnStartError = error;
|
||||
if (
|
||||
shouldRetryContextEngineTurnOnFreshCodexThread({
|
||||
error: turnStartError,
|
||||
contextEngineActive: Boolean(activeContextEngine),
|
||||
thread,
|
||||
}) &&
|
||||
restartContextEngineCodexThread
|
||||
) {
|
||||
embeddedAgentLog.warn(
|
||||
"codex app-server context-engine turn overflowed on resume; retrying with fresh thread",
|
||||
{
|
||||
threadId: thread.threadId,
|
||||
error: formatErrorMessage(turnStartError),
|
||||
},
|
||||
);
|
||||
await clearCodexAppServerBinding(params.sessionFile);
|
||||
thread = await restartContextEngineCodexThread();
|
||||
emitCodexAppServerEvent(params, {
|
||||
stream: "codex_app_server.lifecycle",
|
||||
data: { phase: "thread_ready_retry", threadId: thread.threadId },
|
||||
});
|
||||
try {
|
||||
turn = await startCodexTurn();
|
||||
} catch (retryError) {
|
||||
turnStartError = retryError;
|
||||
}
|
||||
}
|
||||
if (turn === undefined) {
|
||||
const usageLimitError = await formatCodexTurnStartUsageLimitError({
|
||||
client,
|
||||
error: turnStartError,
|
||||
pendingNotifications,
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
});
|
||||
const turnStartErrorMessage = usageLimitError?.message ?? formatErrorMessage(turnStartError);
|
||||
emitCodexAppServerEvent(params, {
|
||||
stream: "codex_app_server.lifecycle",
|
||||
data: { phase: "turn_start_failed", error: turnStartErrorMessage },
|
||||
});
|
||||
trajectoryRecorder?.recordEvent("session.ended", {
|
||||
status: "error",
|
||||
threadId: thread.threadId,
|
||||
timedOut,
|
||||
aborted: runAbortController.signal.aborted,
|
||||
promptError: turnStartErrorMessage,
|
||||
});
|
||||
trajectoryEndRecorded = true;
|
||||
runAgentHarnessLlmOutputHook({
|
||||
event: {
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
provider: params.provider,
|
||||
model: params.modelId,
|
||||
resolvedRef:
|
||||
params.runtimePlan?.observability.resolvedRef ?? `${params.provider}/${params.modelId}`,
|
||||
...(params.runtimePlan?.observability.harnessId
|
||||
? { harnessId: params.runtimePlan.observability.harnessId }
|
||||
: {}),
|
||||
assistantTexts: [],
|
||||
},
|
||||
ctx: hookContext,
|
||||
});
|
||||
runAgentHarnessAgentEndHook({
|
||||
event: {
|
||||
messages: turnStartFailureMessages,
|
||||
success: false,
|
||||
error: turnStartErrorMessage,
|
||||
durationMs: Date.now() - attemptStartedAt,
|
||||
},
|
||||
ctx: hookContext,
|
||||
});
|
||||
notificationCleanup();
|
||||
requestCleanup();
|
||||
nativeHookRelay?.unregister();
|
||||
await runAgentCleanupStep({
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
provider: params.provider,
|
||||
model: params.modelId,
|
||||
resolvedRef:
|
||||
params.runtimePlan?.observability.resolvedRef ?? `${params.provider}/${params.modelId}`,
|
||||
...(params.runtimePlan?.observability.harnessId
|
||||
? { harnessId: params.runtimePlan.observability.harnessId }
|
||||
: {}),
|
||||
assistantTexts: [],
|
||||
},
|
||||
ctx: hookContext,
|
||||
});
|
||||
runAgentHarnessAgentEndHook({
|
||||
event: {
|
||||
messages: turnStartFailureMessages,
|
||||
success: false,
|
||||
error: turnStartErrorMessage,
|
||||
durationMs: Date.now() - attemptStartedAt,
|
||||
},
|
||||
ctx: hookContext,
|
||||
});
|
||||
notificationCleanup();
|
||||
requestCleanup();
|
||||
nativeHookRelay?.unregister();
|
||||
await runAgentCleanupStep({
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
step: "codex-trajectory-flush-startup-failure",
|
||||
log: embeddedAgentLog,
|
||||
cleanup: async () => {
|
||||
await trajectoryRecorder?.flush();
|
||||
},
|
||||
});
|
||||
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
|
||||
if (usageLimitError) {
|
||||
await markCodexAuthProfileBlockedFromRateLimits({
|
||||
params,
|
||||
authProfileId: startupAuthProfileId,
|
||||
rateLimits: usageLimitError.rateLimitsForProfile,
|
||||
});
|
||||
return buildCodexTurnStartFailureResult({
|
||||
params,
|
||||
message: usageLimitError.message,
|
||||
messagesSnapshot: turnStartFailureMessages,
|
||||
systemPromptReport,
|
||||
step: "codex-trajectory-flush-startup-failure",
|
||||
log: embeddedAgentLog,
|
||||
cleanup: async () => {
|
||||
await trajectoryRecorder?.flush();
|
||||
},
|
||||
});
|
||||
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
|
||||
if (usageLimitError) {
|
||||
await markCodexAuthProfileBlockedFromRateLimits({
|
||||
params,
|
||||
authProfileId: startupAuthProfileId,
|
||||
rateLimits: usageLimitError.rateLimitsForProfile,
|
||||
});
|
||||
return buildCodexTurnStartFailureResult({
|
||||
params,
|
||||
message: usageLimitError.message,
|
||||
messagesSnapshot: turnStartFailureMessages,
|
||||
systemPromptReport,
|
||||
});
|
||||
}
|
||||
throw turnStartError;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
if (!turn) {
|
||||
throw new Error("codex app-server turn/start failed without an error");
|
||||
}
|
||||
turnId = turn.turn.id;
|
||||
const activeTurnId = turn.turn.id;
|
||||
@@ -3056,6 +3101,28 @@ function isNonEmptyString(value: unknown): value is string {
|
||||
return typeof value === "string" && value.length > 0;
|
||||
}
|
||||
|
||||
function shouldRetryContextEngineTurnOnFreshCodexThread(params: {
|
||||
error: unknown;
|
||||
contextEngineActive: boolean;
|
||||
thread: CodexAppServerThreadLifecycleBinding;
|
||||
}): boolean {
|
||||
if (!params.contextEngineActive || params.thread.lifecycle.action !== "resumed") {
|
||||
return false;
|
||||
}
|
||||
return isCodexContextWindowError(params.error);
|
||||
}
|
||||
|
||||
function isCodexContextWindowError(error: unknown): boolean {
|
||||
const message = formatErrorMessage(error);
|
||||
return (
|
||||
/ran out of room in the model'?s context window/iu.test(message) ||
|
||||
/context window/iu.test(message) ||
|
||||
/context length/iu.test(message) ||
|
||||
/maximum context/iu.test(message) ||
|
||||
/too many tokens/iu.test(message)
|
||||
);
|
||||
}
|
||||
|
||||
function readCodexNotificationItem(params: JsonValue | undefined): CodexThreadItem | undefined {
|
||||
if (!isJsonObject(params) || !isJsonObject(params.item)) {
|
||||
return undefined;
|
||||
|
||||
@@ -102,6 +102,27 @@ describe("codex app-server session binding", () => {
|
||||
expect(binding?.pluginAppPolicyContext).toEqual(pluginAppPolicyContext);
|
||||
});
|
||||
|
||||
it("round-trips context-engine binding metadata", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.json");
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
threadId: "thread-123",
|
||||
cwd: tempDir,
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint: "lossless-policy-1",
|
||||
},
|
||||
});
|
||||
|
||||
const binding = await readCodexAppServerBinding(sessionFile);
|
||||
|
||||
expect(binding?.contextEngine).toEqual({
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint: "lossless-policy-1",
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects old plugin app policy entries that duplicate the app id", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.json");
|
||||
await fs.writeFile(
|
||||
|
||||
@@ -43,10 +43,17 @@ export type CodexAppServerThreadBinding = {
|
||||
pluginAppsFingerprint?: string;
|
||||
pluginAppsInputFingerprint?: string;
|
||||
pluginAppPolicyContext?: PluginAppPolicyContext;
|
||||
contextEngine?: CodexAppServerContextEngineBinding;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
};
|
||||
|
||||
export type CodexAppServerContextEngineBinding = {
|
||||
schemaVersion: 1;
|
||||
engineId: string;
|
||||
policyFingerprint: string;
|
||||
};
|
||||
|
||||
export function resolveCodexAppServerBindingPath(sessionFile: string): string {
|
||||
return `${sessionFile}.codex-app-server.json`;
|
||||
}
|
||||
@@ -99,6 +106,7 @@ export async function readCodexAppServerBinding(
|
||||
? parsed.pluginAppsInputFingerprint
|
||||
: undefined,
|
||||
pluginAppPolicyContext: readPluginAppPolicyContext(parsed.pluginAppPolicyContext),
|
||||
contextEngine: readContextEngineBinding(parsed.contextEngine),
|
||||
createdAt: typeof parsed.createdAt === "string" ? parsed.createdAt : new Date().toISOString(),
|
||||
updatedAt: typeof parsed.updatedAt === "string" ? parsed.updatedAt : new Date().toISOString(),
|
||||
};
|
||||
@@ -138,6 +146,7 @@ export async function writeCodexAppServerBinding(
|
||||
pluginAppsFingerprint: binding.pluginAppsFingerprint,
|
||||
pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint,
|
||||
pluginAppPolicyContext: binding.pluginAppPolicyContext,
|
||||
contextEngine: binding.contextEngine,
|
||||
createdAt: binding.createdAt ?? now,
|
||||
updatedAt: now,
|
||||
};
|
||||
@@ -147,6 +156,25 @@ export async function writeCodexAppServerBinding(
|
||||
);
|
||||
}
|
||||
|
||||
function readContextEngineBinding(value: unknown): CodexAppServerContextEngineBinding | undefined {
|
||||
if (!value || typeof value !== "object" || Array.isArray(value)) {
|
||||
return undefined;
|
||||
}
|
||||
const record = value as Record<string, unknown>;
|
||||
if (
|
||||
record.schemaVersion !== 1 ||
|
||||
typeof record.engineId !== "string" ||
|
||||
typeof record.policyFingerprint !== "string"
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
schemaVersion: 1,
|
||||
engineId: record.engineId,
|
||||
policyFingerprint: record.policyFingerprint,
|
||||
};
|
||||
}
|
||||
|
||||
function readPluginAppPolicyContext(value: unknown): PluginAppPolicyContext | undefined {
|
||||
if (!value || typeof value !== "object" || Array.isArray(value)) {
|
||||
return undefined;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import {
|
||||
embeddedAgentLog,
|
||||
isActiveHarnessContextEngine,
|
||||
type EmbeddedRunAttemptParams,
|
||||
} from "openclaw/plugin-sdk/agent-harness-runtime";
|
||||
import {
|
||||
@@ -9,6 +10,10 @@ import {
|
||||
import { isModernCodexModel } from "../../provider.js";
|
||||
import { isCodexAppServerConnectionClosedError, type CodexAppServerClient } from "./client.js";
|
||||
import { codexSandboxPolicyForTurn, type CodexAppServerRuntimeOptions } from "./config.js";
|
||||
import {
|
||||
resolveCodexContextEngineProjectionMaxChars,
|
||||
resolveCodexContextEngineProjectionReserveTokens,
|
||||
} from "./context-engine-projection.js";
|
||||
import {
|
||||
isCodexPluginThreadBindingStale,
|
||||
mergeCodexThreadConfigs,
|
||||
@@ -34,9 +39,19 @@ import {
|
||||
readCodexAppServerBinding,
|
||||
writeCodexAppServerBinding,
|
||||
type CodexAppServerAuthProfileLookup,
|
||||
type CodexAppServerContextEngineBinding,
|
||||
type CodexAppServerThreadBinding,
|
||||
} from "./session-binding.js";
|
||||
|
||||
export type CodexAppServerThreadLifecycle = {
|
||||
action: "started" | "resumed";
|
||||
rotatedContextEngineBinding?: boolean;
|
||||
};
|
||||
|
||||
export type CodexAppServerThreadLifecycleBinding = CodexAppServerThreadBinding & {
|
||||
lifecycle: CodexAppServerThreadLifecycle;
|
||||
};
|
||||
|
||||
export type CodexPluginThreadConfigProvider = {
|
||||
enabled: boolean;
|
||||
inputFingerprint?: string;
|
||||
@@ -58,15 +73,35 @@ export async function startOrResumeThread(params: {
|
||||
developerInstructions?: string;
|
||||
config?: JsonObject;
|
||||
pluginThreadConfig?: CodexPluginThreadConfigProvider;
|
||||
}): Promise<CodexAppServerThreadBinding> {
|
||||
}): Promise<CodexAppServerThreadLifecycleBinding> {
|
||||
const dynamicToolsFingerprint = fingerprintDynamicTools(params.dynamicTools);
|
||||
const contextEngineBinding = buildContextEngineBinding(params.params);
|
||||
let binding = await readCodexAppServerBinding(params.params.sessionFile, {
|
||||
authProfileStore: params.params.authProfileStore,
|
||||
agentDir: params.params.agentDir,
|
||||
config: params.params.config,
|
||||
});
|
||||
let preserveExistingBinding = false;
|
||||
let rotatedContextEngineBinding = false;
|
||||
let prebuiltPluginThreadConfig: CodexPluginThreadConfig | undefined;
|
||||
if (binding?.threadId && (binding.contextEngine || contextEngineBinding)) {
|
||||
if (
|
||||
!contextEngineBinding ||
|
||||
!isContextEngineBindingCompatible(binding.contextEngine, contextEngineBinding)
|
||||
) {
|
||||
embeddedAgentLog.debug(
|
||||
"codex app-server context-engine binding changed; starting a new thread",
|
||||
{
|
||||
threadId: binding.threadId,
|
||||
engineId: contextEngineBinding?.engineId,
|
||||
previousEngineId: binding.contextEngine?.engineId,
|
||||
},
|
||||
);
|
||||
await clearCodexAppServerBinding(params.params.sessionFile);
|
||||
binding = undefined;
|
||||
rotatedContextEngineBinding = true;
|
||||
}
|
||||
}
|
||||
if (binding?.threadId) {
|
||||
let pluginBindingStale = isCodexPluginThreadBindingStale({
|
||||
codexPluginsEnabled: params.pluginThreadConfig?.enabled ?? false,
|
||||
@@ -166,6 +201,7 @@ export async function startOrResumeThread(params: {
|
||||
pluginAppsFingerprint: binding.pluginAppsFingerprint,
|
||||
pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint,
|
||||
pluginAppPolicyContext: binding.pluginAppPolicyContext,
|
||||
contextEngine: contextEngineBinding,
|
||||
createdAt: binding.createdAt,
|
||||
},
|
||||
{
|
||||
@@ -185,6 +221,8 @@ export async function startOrResumeThread(params: {
|
||||
pluginAppsFingerprint: binding.pluginAppsFingerprint,
|
||||
pluginAppsInputFingerprint: binding.pluginAppsInputFingerprint,
|
||||
pluginAppPolicyContext: binding.pluginAppPolicyContext,
|
||||
contextEngine: contextEngineBinding,
|
||||
lifecycle: { action: "resumed" },
|
||||
};
|
||||
} catch (error) {
|
||||
if (isCodexAppServerConnectionClosedError(error)) {
|
||||
@@ -235,6 +273,7 @@ export async function startOrResumeThread(params: {
|
||||
pluginAppsFingerprint: pluginThreadConfig?.fingerprint,
|
||||
pluginAppsInputFingerprint: pluginThreadConfig?.inputFingerprint,
|
||||
pluginAppPolicyContext: pluginThreadConfig?.policyContext,
|
||||
contextEngine: contextEngineBinding,
|
||||
createdAt,
|
||||
},
|
||||
{
|
||||
@@ -256,11 +295,82 @@ export async function startOrResumeThread(params: {
|
||||
pluginAppsFingerprint: pluginThreadConfig?.fingerprint,
|
||||
pluginAppsInputFingerprint: pluginThreadConfig?.inputFingerprint,
|
||||
pluginAppPolicyContext: pluginThreadConfig?.policyContext,
|
||||
contextEngine: contextEngineBinding,
|
||||
createdAt,
|
||||
updatedAt: createdAt,
|
||||
lifecycle: {
|
||||
action: "started",
|
||||
...(rotatedContextEngineBinding ? { rotatedContextEngineBinding } : {}),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function buildContextEngineBinding(
|
||||
params: EmbeddedRunAttemptParams,
|
||||
): CodexAppServerContextEngineBinding | undefined {
|
||||
const contextEngine = isActiveHarnessContextEngine(params.contextEngine)
|
||||
? params.contextEngine
|
||||
: undefined;
|
||||
const engineId = contextEngine?.info?.id?.trim();
|
||||
if (!contextEngine || !engineId) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
schemaVersion: 1,
|
||||
engineId,
|
||||
policyFingerprint: JSON.stringify({
|
||||
schemaVersion: 1,
|
||||
engineId,
|
||||
engineVersion: contextEngine.info.version,
|
||||
ownsCompaction: contextEngine.info.ownsCompaction === true,
|
||||
turnMaintenanceMode: contextEngine.info.turnMaintenanceMode,
|
||||
citationsMode: resolveContextEngineCitationsMode(params.config),
|
||||
contextTokenBudget: params.contextTokenBudget,
|
||||
projectionMaxChars: resolveCodexContextEngineProjectionMaxChars({
|
||||
contextTokenBudget: params.contextTokenBudget,
|
||||
reserveTokens: resolveCodexContextEngineProjectionReserveTokens({
|
||||
config: params.config,
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
function isContextEngineBindingCompatible(
|
||||
previous: CodexAppServerContextEngineBinding | undefined,
|
||||
next: CodexAppServerContextEngineBinding,
|
||||
): boolean {
|
||||
return (
|
||||
previous?.schemaVersion === next.schemaVersion &&
|
||||
previous.engineId === next.engineId &&
|
||||
previous.policyFingerprint === next.policyFingerprint
|
||||
);
|
||||
}
|
||||
|
||||
function resolveContextEngineCitationsMode(config: unknown): JsonValue | undefined {
|
||||
const rootConfig = isUnknownRecord(config) ? config : undefined;
|
||||
const memoryConfig = isUnknownRecord(rootConfig?.memory) ? rootConfig.memory : undefined;
|
||||
const citations = memoryConfig?.citations;
|
||||
return isJsonConfigValue(citations) ? citations : undefined;
|
||||
}
|
||||
|
||||
function isUnknownRecord(value: unknown): value is Record<string, unknown> {
|
||||
return Boolean(value && typeof value === "object" && !Array.isArray(value));
|
||||
}
|
||||
|
||||
function isJsonConfigValue(value: unknown): value is JsonValue {
|
||||
if (value === null || typeof value === "string" || typeof value === "boolean") {
|
||||
return true;
|
||||
}
|
||||
if (typeof value === "number") {
|
||||
return Number.isFinite(value);
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
return value.every(isJsonConfigValue);
|
||||
}
|
||||
return isUnknownRecord(value) && Object.values(value).every(isJsonConfigValue);
|
||||
}
|
||||
|
||||
function shouldRecheckRecoverablePluginBinding(params: {
|
||||
binding: CodexAppServerThreadBinding;
|
||||
pluginThreadConfig?: CodexPluginThreadConfigProvider;
|
||||
|
||||
Reference in New Issue
Block a user