mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 15:34:46 +00:00
feat(codex): bind context-engine projections to codex threads (#82351)
* feat(codex): bind context-engine projections to codex threads * fix: harden Codex context-engine projection * fix: remove unused Codex projection helper * fix(codex): adopt compacted context-engine transcripts
This commit is contained in:
@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
|
||||
- CLI/onboarding: localize the setup wizard and bundled channel setup flows for English, Simplified Chinese, and Traditional Chinese. (#80645) Thanks @GaosCode.
|
||||
- Agents/skills: cache hydrated `resolvedSkills` across warm gateway turns while keying reuse by the redacted effective config, reducing redundant skill snapshot rebuilds without crossing config-gated skill boundaries. (#81451) Thanks @solodmd.
|
||||
- Telegram/group chat: add opt-in `messages.groupChat.ambientTurns: "room_event"` handling so always-on ambient chatter can run as quiet room context and speak visibly only via the message tool. (#81317) Thanks @obviyus.
|
||||
- Codex/context engines: bind thread-bootstrap projection epochs to Codex app-server threads, carry redacted tool-result context into fresh threads, and rotate backend threads when projection state changes. (#82351) Thanks @jalehman.
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
eed14a427f34d1531d63e5f1065ef2325b46f77e5a16ce809e5e845cd6700769 plugin-sdk-api-baseline.json
|
||||
d8d090e4858f8d619b2151d69b8dc992132bc8930e04b990e4a69a433fb19d41 plugin-sdk-api-baseline.jsonl
|
||||
ae6b95dffe88496aadee03e49b6b6db2db74d4bbd9b984be94a39d81df449f93 plugin-sdk-api-baseline.json
|
||||
08da4f6d26afff58fc1accb2f1b12c2d0ef740a0abf60cbdef43a32f422a4382 plugin-sdk-api-baseline.jsonl
|
||||
|
||||
@@ -1057,6 +1057,16 @@ export default function (api) {
|
||||
The factory `ctx` exposes optional `config`, `agentDir`, and `workspaceDir`
|
||||
values for construction-time initialization.
|
||||
|
||||
`assemble()` may return `contextProjection` when the active harness has a
|
||||
persistent backend thread. Omit it for legacy per-turn projection. Return
|
||||
`{ mode: "thread_bootstrap", epoch }` when the assembled context should be
|
||||
injected once into a backend thread and reused until the epoch changes. Change
|
||||
the epoch after the engine's semantic context changes, such as after an
|
||||
engine-owned compaction pass. Hosts may preserve tool-call metadata, input
|
||||
shape, and redacted tool results in a thread-bootstrap projection so fresh
|
||||
backend threads retain tool continuity without copying raw secret-bearing
|
||||
payloads.
|
||||
|
||||
If your engine does **not** own the compaction algorithm, keep `compact()`
|
||||
implemented and delegate it explicitly:
|
||||
|
||||
|
||||
@@ -20,7 +20,8 @@ diagnostic surfaces around that boundary.
|
||||
OpenClaw still owns channel routing, session files, visible message delivery,
|
||||
OpenClaw dynamic tools, approvals, media delivery, and a transcript mirror.
|
||||
Codex owns the canonical native thread, native model loop, native tool
|
||||
continuation, and native compaction.
|
||||
continuation, and native compaction unless the active OpenClaw context engine
|
||||
declares that it owns compaction.
|
||||
|
||||
## Thread bindings and model changes
|
||||
|
||||
@@ -184,8 +185,17 @@ diagnostics bundle.
|
||||
## Compaction and transcript mirror
|
||||
|
||||
When the selected model uses the Codex harness, native thread compaction is
|
||||
delegated to Codex app-server. OpenClaw keeps a transcript mirror for channel
|
||||
history, search, `/new`, `/reset`, and future model or harness switching.
|
||||
delegated to Codex app-server unless an active context engine declares
|
||||
`ownsCompaction: true`. Owning context engines compact first and cause OpenClaw
|
||||
to abandon the old Codex backend thread so the next turn can rehydrate a fresh
|
||||
thread from engine-managed context. OpenClaw keeps a transcript mirror for
|
||||
channel history, search, `/new`, `/reset`, and future model or harness
|
||||
switching.
|
||||
|
||||
When a context engine requests Codex thread-bootstrap projection, OpenClaw
|
||||
projects tool-call names and ids, input shapes, and redacted tool-result content
|
||||
into the fresh Codex thread. It does not copy raw tool-call argument values into
|
||||
that projection.
|
||||
|
||||
The mirror includes the user prompt, final assistant text, and lightweight Codex
|
||||
reasoning or plan records when the app-server emits them. Today, OpenClaw only
|
||||
|
||||
@@ -119,8 +119,9 @@ Use `openai/gpt-*` model refs for Codex-backed OpenAI agent turns. Prefer
|
||||
`openai-codex:*` auth profiles and `auth.order.openai-codex` remain valid, but
|
||||
do not write new `openai-codex/gpt-*` model refs.
|
||||
|
||||
Do not set `compaction.model` or `compaction.provider` on Codex-backed agents.
|
||||
Codex owns compaction through its native app-server thread state, so OpenClaw
|
||||
Do not set `compaction.model` or `compaction.provider` on Codex-backed agents
|
||||
unless a selected context engine owns compaction. Without an owning context
|
||||
engine, Codex compacts through its native app-server thread state, so OpenClaw
|
||||
ignores those local summarizer overrides at runtime and `openclaw doctor --fix`
|
||||
removes them when the agent uses Codex.
|
||||
|
||||
@@ -131,6 +132,12 @@ Lossless remains supported as a context engine. Configure it through
|
||||
`compaction.provider: "lossless-claw"` shape to the Lossless context-engine slot
|
||||
when Codex is the active runtime.
|
||||
|
||||
When the active context engine reports `ownsCompaction: true`, `/compact` runs
|
||||
that engine's compaction lifecycle and invalidates the bound Codex app-server
|
||||
thread. The next Codex turn starts a fresh backend thread and rehydrates it from
|
||||
the context engine instead of layering Codex native compaction on top of the
|
||||
engine-owned semantic summary.
|
||||
|
||||
```json5
|
||||
{
|
||||
auth: {
|
||||
@@ -625,8 +632,10 @@ The Codex harness changes the low-level embedded agent executor only.
|
||||
- Codex-native shell, patch, MCP, and native app tools are owned by Codex.
|
||||
OpenClaw can observe or block selected native events through the supported
|
||||
relay, but it does not rewrite native tool arguments.
|
||||
- Codex owns native compaction. OpenClaw keeps a transcript mirror for channel
|
||||
history, search, `/new`, `/reset`, and future model or harness switching.
|
||||
- Codex owns native compaction unless the active OpenClaw context engine
|
||||
declares `ownsCompaction: true`. OpenClaw keeps a transcript mirror for
|
||||
channel history, search, `/new`, `/reset`, and future model or harness
|
||||
switching.
|
||||
- Media generation, media understanding, TTS, approvals, and messaging-tool
|
||||
output continue through the matching OpenClaw provider/model settings.
|
||||
- `tool_result_persist` applies to OpenClaw-owned transcript tool results, not
|
||||
|
||||
@@ -10,7 +10,7 @@ import type { CodexAppServerClientFactory } from "./client-factory.js";
|
||||
import type { CodexAppServerClient } from "./client.js";
|
||||
import { maybeCompactCodexAppServerSession as maybeCompactCodexAppServerSessionImpl } from "./compact.js";
|
||||
import type { CodexServerNotification } from "./protocol.js";
|
||||
import { writeCodexAppServerBinding } from "./session-binding.js";
|
||||
import { readCodexAppServerBinding, writeCodexAppServerBinding } from "./session-binding.js";
|
||||
|
||||
let tempDir: string;
|
||||
let codexAppServerClientFactoryForTest: CodexAppServerClientFactory | undefined;
|
||||
@@ -298,17 +298,15 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
|
||||
it("does not warn for legacy Lossless config when the Lossless context engine slot is active", async () => {
|
||||
const warn = vi.spyOn(embeddedAgentLog, "warn").mockImplementation(() => undefined);
|
||||
const fake = createFakeCodexClient();
|
||||
setCodexAppServerClientFactoryForTest(async () => fake.client);
|
||||
const sessionFile = await writeTestBinding();
|
||||
const contextEngine: ContextEngine = {
|
||||
info: { id: "lcm", name: "Lossless Context Manager", ownsCompaction: true },
|
||||
assemble: vi.fn() as never,
|
||||
ingest: vi.fn() as never,
|
||||
compact: vi.fn() as never,
|
||||
compact: vi.fn(async () => ({ ok: true, compacted: false, reason: "below threshold" })),
|
||||
};
|
||||
|
||||
const pendingResult = maybeCompactCodexAppServerSession({
|
||||
await maybeCompactCodexAppServerSession({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionFile,
|
||||
@@ -330,14 +328,6 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
},
|
||||
},
|
||||
});
|
||||
await vi.waitFor(() => {
|
||||
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
|
||||
});
|
||||
fake.emit({
|
||||
method: "thread/compacted",
|
||||
params: { threadId: "thread-1", turnId: "turn-1" },
|
||||
});
|
||||
await pendingResult;
|
||||
|
||||
expect(warn).not.toHaveBeenCalledWith(
|
||||
"ignoring OpenClaw compaction overrides for Codex app-server compaction; Codex uses native server-side compaction",
|
||||
@@ -348,17 +338,15 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
|
||||
it("does not warn for inherited legacy Lossless provider when the Lossless slot is active", async () => {
|
||||
const warn = vi.spyOn(embeddedAgentLog, "warn").mockImplementation(() => undefined);
|
||||
const fake = createFakeCodexClient();
|
||||
setCodexAppServerClientFactoryForTest(async () => fake.client);
|
||||
const sessionFile = await writeTestBinding();
|
||||
const contextEngine: ContextEngine = {
|
||||
info: { id: "lcm", name: "Lossless Context Manager", ownsCompaction: true },
|
||||
assemble: vi.fn() as never,
|
||||
ingest: vi.fn() as never,
|
||||
compact: vi.fn() as never,
|
||||
compact: vi.fn(async () => ({ ok: true, compacted: false, reason: "below threshold" })),
|
||||
};
|
||||
|
||||
const pendingResult = maybeCompactCodexAppServerSession({
|
||||
await maybeCompactCodexAppServerSession({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:nik:session-1",
|
||||
sessionFile,
|
||||
@@ -387,14 +375,6 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
},
|
||||
},
|
||||
});
|
||||
await vi.waitFor(() => {
|
||||
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
|
||||
});
|
||||
fake.emit({
|
||||
method: "thread/compacted",
|
||||
params: { threadId: "thread-1", turnId: "turn-1" },
|
||||
});
|
||||
await pendingResult;
|
||||
|
||||
expect(warn).not.toHaveBeenCalledWith(
|
||||
"ignoring OpenClaw compaction overrides for Codex app-server compaction; Codex uses native server-side compaction",
|
||||
@@ -430,9 +410,8 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
expect(factory).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps context engines in maintenance mode after native compaction", async () => {
|
||||
const fake = createFakeCodexClient();
|
||||
setCodexAppServerClientFactoryForTest(async () => fake.client);
|
||||
it("runs owning context-engine compaction and invalidates the Codex thread binding", async () => {
|
||||
const info = vi.spyOn(embeddedAgentLog, "info").mockImplementation(() => undefined);
|
||||
const sessionFile = await writeTestBinding();
|
||||
const compact = vi.fn(async () => ({
|
||||
ok: true,
|
||||
@@ -470,26 +449,29 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
currentTokenCount: 123,
|
||||
trigger: "manual",
|
||||
});
|
||||
await vi.waitFor(() => {
|
||||
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
|
||||
});
|
||||
fake.emit({
|
||||
method: "thread/compacted",
|
||||
params: { threadId: "thread-1", turnId: "turn-1" },
|
||||
});
|
||||
|
||||
const result = requireCompactResult(await pendingResult);
|
||||
expect(result.ok).toBe(true);
|
||||
expect(result.compacted).toBe(true);
|
||||
expect(result.result?.summary).toBe("");
|
||||
expect(result.result?.firstKeptEntryId).toBe("");
|
||||
expect(result.result?.tokensBefore).toBe(123);
|
||||
expect(result.result?.summary).toBe("engine summary");
|
||||
expect(result.result?.firstKeptEntryId).toBe("entry-1");
|
||||
expect(result.result?.tokensBefore).toBe(55);
|
||||
const details = compactDetails(result);
|
||||
expect(details.backend).toBe("codex-app-server");
|
||||
expect(details.threadId).toBe("thread-1");
|
||||
expect(details.signal).toBe("thread/compacted");
|
||||
expect(details.turnId).toBe("turn-1");
|
||||
expect(compact).not.toHaveBeenCalled();
|
||||
expect(details.engine).toBe("lossless-claw");
|
||||
expect(details.codexThreadBindingInvalidated).toBe(true);
|
||||
expect(await readCodexAppServerBinding(sessionFile)).toBeUndefined();
|
||||
expect(compact).toHaveBeenCalledTimes(1);
|
||||
expect(compact).toHaveBeenCalledWith({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionFile,
|
||||
tokenBudget: 777,
|
||||
currentTokenCount: 123,
|
||||
compactionTarget: "threshold",
|
||||
customInstructions: undefined,
|
||||
force: true,
|
||||
runtimeContext: { workspaceDir: tempDir, provider: "codex" },
|
||||
});
|
||||
expect(maintain).toHaveBeenCalledTimes(1);
|
||||
const [maintainCall] = maintain.mock.calls[0] ?? [];
|
||||
const maintainParams = maintainCall as
|
||||
@@ -505,11 +487,94 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
expect(maintainParams?.sessionFile).toBe(sessionFile);
|
||||
expect(maintainParams?.runtimeContext?.workspaceDir).toBe(tempDir);
|
||||
expect(maintainParams?.runtimeContext?.provider).toBe("codex");
|
||||
expect(info).toHaveBeenCalledWith(
|
||||
"starting context-engine-owned Codex app-server compaction",
|
||||
expect.objectContaining({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
engineId: "lossless-claw",
|
||||
tokenBudget: 777,
|
||||
currentTokenCount: 123,
|
||||
trigger: "manual",
|
||||
compactionTarget: "threshold",
|
||||
force: true,
|
||||
}),
|
||||
);
|
||||
expect(info).toHaveBeenCalledWith(
|
||||
"completed context-engine-owned Codex app-server compaction",
|
||||
expect.objectContaining({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
engineId: "lossless-claw",
|
||||
ok: true,
|
||||
compacted: true,
|
||||
codexThreadBindingInvalidated: true,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns native compaction success when context-engine maintenance fails", async () => {
|
||||
const fake = createFakeCodexClient();
|
||||
setCodexAppServerClientFactoryForTest(async () => fake.client);
|
||||
it("adopts successor transcript handles after owning context-engine compaction", async () => {
|
||||
const sessionFile = await writeTestBinding();
|
||||
const successorFile = path.join(tempDir, "session.compacted.jsonl");
|
||||
await writeCodexAppServerBinding(successorFile, {
|
||||
threadId: "thread-successor",
|
||||
cwd: tempDir,
|
||||
});
|
||||
const compact = vi.fn(async () => ({
|
||||
ok: true,
|
||||
compacted: true,
|
||||
result: {
|
||||
summary: "engine summary",
|
||||
firstKeptEntryId: "entry-1",
|
||||
tokensBefore: 55,
|
||||
sessionId: "session-1-compacted",
|
||||
sessionFile: successorFile,
|
||||
},
|
||||
}));
|
||||
const maintain = vi.fn(
|
||||
async (_params: Parameters<NonNullable<ContextEngine["maintain"]>>[0]) => ({
|
||||
changed: false,
|
||||
bytesFreed: 0,
|
||||
rewrittenEntries: 0,
|
||||
}),
|
||||
);
|
||||
const contextEngine: ContextEngine = {
|
||||
info: { id: "lossless-claw", name: "Lossless Claw", ownsCompaction: true },
|
||||
assemble: vi.fn() as never,
|
||||
ingest: vi.fn() as never,
|
||||
compact,
|
||||
maintain,
|
||||
};
|
||||
|
||||
const result = requireCompactResult(
|
||||
await maybeCompactCodexAppServerSession({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionFile,
|
||||
workspaceDir: tempDir,
|
||||
contextEngine,
|
||||
}),
|
||||
);
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
expect(result.compacted).toBe(true);
|
||||
expect(result.result?.sessionId).toBe("session-1-compacted");
|
||||
expect(result.result?.sessionFile).toBe(successorFile);
|
||||
expect(await readCodexAppServerBinding(sessionFile)).toBeUndefined();
|
||||
expect(await readCodexAppServerBinding(successorFile)).toBeUndefined();
|
||||
expect(maintain).toHaveBeenCalledTimes(1);
|
||||
const [maintainCall] = maintain.mock.calls[0] ?? [];
|
||||
const maintainParams = maintainCall as
|
||||
| {
|
||||
sessionId?: string;
|
||||
sessionFile?: string;
|
||||
}
|
||||
| undefined;
|
||||
expect(maintainParams?.sessionId).toBe("session-1-compacted");
|
||||
expect(maintainParams?.sessionFile).toBe(successorFile);
|
||||
});
|
||||
|
||||
it("returns context-engine compaction success when maintenance fails", async () => {
|
||||
const sessionFile = await writeTestBinding();
|
||||
const compact = vi.fn(async () => ({
|
||||
ok: true,
|
||||
@@ -537,24 +602,17 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
workspaceDir: tempDir,
|
||||
contextEngine,
|
||||
});
|
||||
await vi.waitFor(() => {
|
||||
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
|
||||
});
|
||||
fake.emit({
|
||||
method: "thread/compacted",
|
||||
params: { threadId: "thread-1", turnId: "turn-1" },
|
||||
});
|
||||
|
||||
const result = requireCompactResult(await pendingResult);
|
||||
expect(result.ok).toBe(true);
|
||||
expect(result.compacted).toBe(true);
|
||||
expect(result.result?.summary).toBe("engine summary");
|
||||
const details = compactDetails(result);
|
||||
expect(details.backend).toBe("codex-app-server");
|
||||
expect(details.threadId).toBe("thread-1");
|
||||
expect(compact).not.toHaveBeenCalled();
|
||||
expect(details.codexThreadBindingInvalidated).toBe(true);
|
||||
expect(compact).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not fall back to context-engine compaction when native compaction cannot run", async () => {
|
||||
it("does not require a Codex binding when the owning context engine compacts", async () => {
|
||||
const compact = vi.fn(async () => ({
|
||||
ok: true,
|
||||
compacted: true,
|
||||
@@ -586,14 +644,14 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
});
|
||||
|
||||
const compactResult = requireCompactResult(result);
|
||||
expect(compactResult.ok).toBe(false);
|
||||
expect(compactResult.compacted).toBe(false);
|
||||
expect(compactResult.reason).toBe("no codex app-server thread binding");
|
||||
expect(compact).not.toHaveBeenCalled();
|
||||
expect(maintain).not.toHaveBeenCalled();
|
||||
expect(compactResult.ok).toBe(true);
|
||||
expect(compactResult.compacted).toBe(true);
|
||||
expect(compactResult.result?.summary).toBe("engine summary");
|
||||
expect(compact).toHaveBeenCalledTimes(1);
|
||||
expect(maintain).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not run context-engine maintenance when native compaction is skipped", async () => {
|
||||
it("does not run context-engine maintenance when owning compaction does not compact", async () => {
|
||||
const maintain = vi.fn(async () => ({
|
||||
changed: false,
|
||||
bytesFreed: 0,
|
||||
@@ -605,7 +663,8 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
ingest: vi.fn() as never,
|
||||
compact: vi.fn(async () => ({
|
||||
ok: true,
|
||||
compacted: true,
|
||||
compacted: false,
|
||||
reason: "below threshold",
|
||||
})),
|
||||
maintain,
|
||||
};
|
||||
@@ -619,8 +678,9 @@ describe("maybeCompactCodexAppServerSession", () => {
|
||||
});
|
||||
|
||||
const compactResult = requireCompactResult(result);
|
||||
expect(compactResult.ok).toBe(false);
|
||||
expect(compactResult.ok).toBe(true);
|
||||
expect(compactResult.compacted).toBe(false);
|
||||
expect(compactResult.reason).toBe("below threshold");
|
||||
expect(maintain).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -14,7 +14,7 @@ import {
|
||||
import type { CodexAppServerClient, CodexServerNotificationHandler } from "./client.js";
|
||||
import { resolveCodexAppServerRuntimeOptions } from "./config.js";
|
||||
import { isJsonObject, type CodexServerNotification, type JsonObject } from "./protocol.js";
|
||||
import { readCodexAppServerBinding } from "./session-binding.js";
|
||||
import { clearCodexAppServerBinding, readCodexAppServerBinding } from "./session-binding.js";
|
||||
type CodexNativeCompactionCompletion = {
|
||||
signal: "thread/compacted" | "item/completed";
|
||||
turnId?: string;
|
||||
@@ -36,6 +36,9 @@ export async function maybeCompactCodexAppServerSession(
|
||||
const activeContextEngine = isActiveHarnessContextEngine(params.contextEngine)
|
||||
? params.contextEngine
|
||||
: undefined;
|
||||
if (activeContextEngine?.info.ownsCompaction) {
|
||||
return await compactOwningContextEngine(params, activeContextEngine);
|
||||
}
|
||||
warnIfIgnoringOpenClawCompactionOverrides(params);
|
||||
const nativeResult = await compactCodexNativeThread(params, options);
|
||||
if (activeContextEngine && nativeResult?.ok && nativeResult.compacted) {
|
||||
@@ -60,6 +63,119 @@ export async function maybeCompactCodexAppServerSession(
|
||||
return nativeResult;
|
||||
}
|
||||
|
||||
async function compactOwningContextEngine(
|
||||
params: CompactEmbeddedPiSessionParams,
|
||||
contextEngine: NonNullable<CompactEmbeddedPiSessionParams["contextEngine"]>,
|
||||
): Promise<EmbeddedPiCompactResult> {
|
||||
embeddedAgentLog.info("starting context-engine-owned Codex app-server compaction", {
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
engineId: contextEngine.info.id,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
currentTokenCount: params.currentTokenCount,
|
||||
trigger: params.trigger,
|
||||
compactionTarget: params.trigger === "manual" ? "threshold" : "budget",
|
||||
force: params.trigger === "manual",
|
||||
});
|
||||
let result: Awaited<ReturnType<typeof contextEngine.compact>>;
|
||||
try {
|
||||
result = await contextEngine.compact({
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
currentTokenCount: params.currentTokenCount,
|
||||
compactionTarget: params.trigger === "manual" ? "threshold" : "budget",
|
||||
customInstructions: params.customInstructions,
|
||||
force: params.trigger === "manual",
|
||||
runtimeContext: params.contextEngineRuntimeContext,
|
||||
});
|
||||
} catch (error) {
|
||||
embeddedAgentLog.warn("context-engine-owned Codex app-server compaction failed", {
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
engineId: contextEngine.info.id,
|
||||
error: formatErrorMessage(error),
|
||||
});
|
||||
return {
|
||||
ok: false,
|
||||
compacted: false,
|
||||
reason: `context engine compaction failed: ${formatErrorMessage(error)}`,
|
||||
};
|
||||
}
|
||||
|
||||
if (result.ok && result.compacted) {
|
||||
const compactedSessionId = result.result?.sessionId ?? params.sessionId;
|
||||
const compactedSessionFile = result.result?.sessionFile ?? params.sessionFile;
|
||||
try {
|
||||
await runHarnessContextEngineMaintenance({
|
||||
contextEngine,
|
||||
sessionId: compactedSessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: compactedSessionFile,
|
||||
reason: "compaction",
|
||||
runtimeContext: params.contextEngineRuntimeContext,
|
||||
config: params.config,
|
||||
});
|
||||
} catch (error) {
|
||||
embeddedAgentLog.warn("context engine compaction maintenance failed", {
|
||||
sessionId: compactedSessionId,
|
||||
engineId: contextEngine.info.id,
|
||||
error: formatErrorMessage(error),
|
||||
});
|
||||
}
|
||||
await clearCodexAppServerBinding(params.sessionFile);
|
||||
if (compactedSessionFile !== params.sessionFile) {
|
||||
await clearCodexAppServerBinding(compactedSessionFile);
|
||||
}
|
||||
}
|
||||
|
||||
embeddedAgentLog.info("completed context-engine-owned Codex app-server compaction", {
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
engineId: contextEngine.info.id,
|
||||
ok: result.ok,
|
||||
compacted: result.compacted,
|
||||
reason: result.reason,
|
||||
codexThreadBindingInvalidated: result.ok && result.compacted,
|
||||
});
|
||||
return {
|
||||
ok: result.ok,
|
||||
compacted: result.compacted,
|
||||
reason: result.reason,
|
||||
result: result.result
|
||||
? {
|
||||
...result.result,
|
||||
summary: result.result.summary ?? "",
|
||||
firstKeptEntryId: result.result.firstKeptEntryId ?? "",
|
||||
details: mergeContextEngineCompactionDetails(result.result.details, {
|
||||
codexThreadBindingInvalidated: result.ok && result.compacted,
|
||||
}),
|
||||
}
|
||||
: result.ok && result.compacted
|
||||
? {
|
||||
summary: "",
|
||||
firstKeptEntryId: "",
|
||||
tokensBefore: params.currentTokenCount ?? 0,
|
||||
details: { codexThreadBindingInvalidated: true },
|
||||
}
|
||||
: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function mergeContextEngineCompactionDetails(
|
||||
details: unknown,
|
||||
extra: Record<string, unknown>,
|
||||
): unknown {
|
||||
if (details && typeof details === "object" && !Array.isArray(details)) {
|
||||
return {
|
||||
...(details as Record<string, unknown>),
|
||||
...extra,
|
||||
};
|
||||
}
|
||||
return extra;
|
||||
}
|
||||
|
||||
function warnIfIgnoringOpenClawCompactionOverrides(params: CompactEmbeddedPiSessionParams): void {
|
||||
const activeContextEngine = isActiveHarnessContextEngine(params.contextEngine)
|
||||
? params.contextEngine
|
||||
|
||||
@@ -95,6 +95,54 @@ describe("projectContextEngineAssemblyForCodex", () => {
|
||||
expect(result.promptText).not.toContain("cat .env");
|
||||
});
|
||||
|
||||
it("preserves redacted tool payload context for thread bootstrap projections", () => {
|
||||
const result = projectContextEngineAssemblyForCodex({
|
||||
assembledMessages: [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [
|
||||
{
|
||||
type: "toolCall",
|
||||
name: "exec",
|
||||
input: {
|
||||
token: "sk-1234567890abcdef",
|
||||
cmd: "cat .env",
|
||||
options: { recursive: true },
|
||||
},
|
||||
},
|
||||
],
|
||||
timestamp: 1,
|
||||
} as unknown as AgentMessage,
|
||||
{
|
||||
role: "toolResult",
|
||||
content: [
|
||||
{
|
||||
type: "toolResult",
|
||||
toolUseId: "call-1",
|
||||
content: "OPENAI_API_KEY=sk-1234567890abcdef\nstatus ok",
|
||||
},
|
||||
],
|
||||
timestamp: 2,
|
||||
} as unknown as AgentMessage,
|
||||
],
|
||||
originalHistoryMessages: [],
|
||||
prompt: "continue",
|
||||
toolPayloadMode: "preserve",
|
||||
});
|
||||
|
||||
expect(result.promptText).toContain("tool call: exec");
|
||||
expect(result.promptText).toContain('"inputShape"');
|
||||
expect(result.promptText).toContain('"token": "[string]"');
|
||||
expect(result.promptText).toContain('"cmd": "[string]"');
|
||||
expect(result.promptText).toContain('"recursive": "[boolean]"');
|
||||
expect(result.promptText).toContain("tool result: call-1");
|
||||
expect(result.promptText).toContain('"content"');
|
||||
expect(result.promptText).toContain("OPENAI_API_KEY=");
|
||||
expect(result.promptText).toContain("status ok");
|
||||
expect(result.promptText).not.toContain("cat .env");
|
||||
expect(result.promptText).not.toContain("sk-1234567890abcdef");
|
||||
});
|
||||
|
||||
it("bounds oversized text context", () => {
|
||||
const result = projectContextEngineAssemblyForCodex({
|
||||
assembledMessages: [textMessage("assistant", "x".repeat(30_000))],
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { AgentMessage } from "openclaw/plugin-sdk/agent-harness-runtime";
|
||||
import { redactSensitiveFieldValue, redactToolPayloadText } from "openclaw/plugin-sdk/logging-core";
|
||||
|
||||
type CodexContextProjection = {
|
||||
developerInstructionAddition?: string;
|
||||
@@ -31,12 +32,14 @@ export function projectContextEngineAssemblyForCodex(params: {
|
||||
prompt: string;
|
||||
systemPromptAddition?: string;
|
||||
maxRenderedContextChars?: number;
|
||||
toolPayloadMode?: "elide" | "preserve";
|
||||
}): CodexContextProjection {
|
||||
const prompt = params.prompt.trim();
|
||||
const contextMessages = dropDuplicateTrailingPrompt(params.assembledMessages, prompt);
|
||||
const maxRenderedContextChars = normalizeRenderedContextMaxChars(params.maxRenderedContextChars);
|
||||
const renderedContext = renderMessagesForCodexContext(contextMessages, {
|
||||
maxTextPartChars: resolveTextPartMaxChars(maxRenderedContextChars),
|
||||
toolPayloadMode: params.toolPayloadMode ?? "elide",
|
||||
});
|
||||
const promptText = renderedContext
|
||||
? [
|
||||
@@ -145,7 +148,7 @@ function dropDuplicateTrailingPrompt(messages: AgentMessage[], prompt: string):
|
||||
|
||||
function renderMessagesForCodexContext(
|
||||
messages: AgentMessage[],
|
||||
options: { maxTextPartChars: number },
|
||||
options: { maxTextPartChars: number; toolPayloadMode: "elide" | "preserve" },
|
||||
): string {
|
||||
return messages
|
||||
.map((message) => {
|
||||
@@ -156,7 +159,10 @@ function renderMessagesForCodexContext(
|
||||
.join("\n\n");
|
||||
}
|
||||
|
||||
function renderMessageBody(message: AgentMessage, options: { maxTextPartChars: number }): string {
|
||||
function renderMessageBody(
|
||||
message: AgentMessage,
|
||||
options: { maxTextPartChars: number; toolPayloadMode: "elide" | "preserve" },
|
||||
): string {
|
||||
if (!hasMessageContent(message)) {
|
||||
return "";
|
||||
}
|
||||
@@ -173,7 +179,10 @@ function renderMessageBody(message: AgentMessage, options: { maxTextPartChars: n
|
||||
.trim();
|
||||
}
|
||||
|
||||
function renderMessagePart(part: unknown, options: { maxTextPartChars: number }): string {
|
||||
function renderMessagePart(
|
||||
part: unknown,
|
||||
options: { maxTextPartChars: number; toolPayloadMode: "elide" | "preserve" },
|
||||
): string {
|
||||
if (!part || typeof part !== "object") {
|
||||
return "";
|
||||
}
|
||||
@@ -188,16 +197,143 @@ function renderMessagePart(part: unknown, options: { maxTextPartChars: number })
|
||||
return "[image omitted]";
|
||||
}
|
||||
if (type === "toolCall" || type === "tool_use") {
|
||||
return `tool call${typeof record.name === "string" ? `: ${record.name}` : ""} [input omitted]`;
|
||||
const label = `tool call${typeof record.name === "string" ? `: ${record.name}` : ""}`;
|
||||
if (options.toolPayloadMode === "preserve") {
|
||||
return truncateText(
|
||||
`${label}\n${stableJson(renderToolCallPayload(record))}`,
|
||||
options.maxTextPartChars,
|
||||
);
|
||||
}
|
||||
return `${label} [input omitted]`;
|
||||
}
|
||||
if (type === "toolResult" || type === "tool_result") {
|
||||
const label =
|
||||
typeof record.toolUseId === "string" ? `tool result: ${record.toolUseId}` : "tool result";
|
||||
if (options.toolPayloadMode === "preserve") {
|
||||
return truncateText(
|
||||
`${label}\n${stableJson(renderToolResultPayload(record))}`,
|
||||
options.maxTextPartChars,
|
||||
);
|
||||
}
|
||||
return `${label} [content omitted]`;
|
||||
}
|
||||
return `[${type ?? "non-text"} content omitted]`;
|
||||
}
|
||||
|
||||
function renderToolCallPayload(record: Record<string, unknown>): Record<string, unknown> {
|
||||
const payload: Record<string, unknown> = pickToolPayloadMetadata(record);
|
||||
const input = record.input ?? record.arguments;
|
||||
if (input !== undefined) {
|
||||
payload.inputShape = summarizeToolInputShape(input);
|
||||
}
|
||||
return payload;
|
||||
}
|
||||
|
||||
function renderToolResultPayload(record: Record<string, unknown>): Record<string, unknown> {
|
||||
const payload: Record<string, unknown> = pickToolPayloadMetadata(record);
|
||||
for (const [key, value] of Object.entries(record)) {
|
||||
if (TOOL_PAYLOAD_METADATA_KEYS.has(key)) {
|
||||
continue;
|
||||
}
|
||||
payload[key] = redactPreservedToolValue(key, value);
|
||||
}
|
||||
return payload;
|
||||
}
|
||||
|
||||
const TOOL_PAYLOAD_METADATA_KEYS = new Set([
|
||||
"type",
|
||||
"name",
|
||||
"id",
|
||||
"callId",
|
||||
"toolCallId",
|
||||
"toolUseId",
|
||||
]);
|
||||
|
||||
function pickToolPayloadMetadata(record: Record<string, unknown>): Record<string, unknown> {
|
||||
const payload: Record<string, unknown> = {};
|
||||
for (const key of TOOL_PAYLOAD_METADATA_KEYS) {
|
||||
const value = record[key];
|
||||
if (typeof value === "string" && value.trim()) {
|
||||
payload[key] = redactSensitiveFieldValue(key, value);
|
||||
}
|
||||
}
|
||||
return payload;
|
||||
}
|
||||
|
||||
// Tool-call inputs can contain shell commands and credentials. For bootstrap
|
||||
// continuity, retain object structure and primitive types instead of values.
|
||||
function summarizeToolInputShape(value: unknown, seen = new WeakSet<object>()): unknown {
|
||||
if (value === null) {
|
||||
return null;
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
if (seen.has(value)) {
|
||||
return "[Circular]";
|
||||
}
|
||||
seen.add(value);
|
||||
return value.map((entry) => summarizeToolInputShape(entry, seen));
|
||||
}
|
||||
if (value && typeof value === "object") {
|
||||
if (seen.has(value)) {
|
||||
return "[Circular]";
|
||||
}
|
||||
seen.add(value);
|
||||
const out: Record<string, unknown> = {};
|
||||
for (const [key, child] of Object.entries(value as Record<string, unknown>)) {
|
||||
out[key] = summarizeToolInputShape(child, seen);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
return `[${typeof value}]`;
|
||||
}
|
||||
|
||||
// Tool results are the useful carried context for a fresh Codex thread, so keep
|
||||
// their content while applying the same text/field redaction used for tool logs.
|
||||
function redactPreservedToolValue(
|
||||
key: string,
|
||||
value: unknown,
|
||||
seen = new WeakSet<object>(),
|
||||
): unknown {
|
||||
if (typeof value === "string") {
|
||||
return redactSensitiveFieldValue(key, redactToolPayloadText(value));
|
||||
}
|
||||
if (
|
||||
value === null ||
|
||||
value === undefined ||
|
||||
typeof value === "number" ||
|
||||
typeof value === "boolean"
|
||||
) {
|
||||
return value;
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
if (seen.has(value)) {
|
||||
return "[Circular]";
|
||||
}
|
||||
seen.add(value);
|
||||
return value.map((entry) => redactPreservedToolValue(key, entry, seen));
|
||||
}
|
||||
if (value && typeof value === "object") {
|
||||
if (seen.has(value)) {
|
||||
return "[Circular]";
|
||||
}
|
||||
seen.add(value);
|
||||
const out: Record<string, unknown> = {};
|
||||
for (const [childKey, child] of Object.entries(value as Record<string, unknown>)) {
|
||||
out[childKey] = redactPreservedToolValue(childKey, child, seen);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
return `[${typeof value}]`;
|
||||
}
|
||||
|
||||
function stableJson(value: unknown): string {
|
||||
try {
|
||||
return JSON.stringify(value, null, 2) ?? "";
|
||||
} catch {
|
||||
return "[unserializable payload omitted]";
|
||||
}
|
||||
}
|
||||
|
||||
function extractMessageText(message: AgentMessage): string {
|
||||
if (!hasMessageContent(message)) {
|
||||
return "";
|
||||
|
||||
@@ -267,7 +267,15 @@ function expectRequestInputTextContains(
|
||||
}
|
||||
|
||||
function getRequestInputText(harness: ReturnType<typeof createStartedThreadHarness>): string {
|
||||
const params = requireRequestParams(harness, "turn/start");
|
||||
return getRequestInputTextAt(harness, 0);
|
||||
}
|
||||
|
||||
function getRequestInputTextAt(
|
||||
harness: ReturnType<typeof createStartedThreadHarness>,
|
||||
index: number,
|
||||
): string {
|
||||
const request = harness.requests.filter((entry) => entry.method === "turn/start").at(index);
|
||||
const params = requireRecord(request?.params, "turn/start params");
|
||||
const input = requireArray(params.input, "turn/start input");
|
||||
return input
|
||||
.map((entry) => {
|
||||
@@ -399,7 +407,95 @@ describe("runCodexAppServerAttempt context-engine lifecycle", () => {
|
||||
await run;
|
||||
});
|
||||
|
||||
it("retries a resumed context-engine thread on a fresh Codex thread after early context overflow", async () => {
|
||||
it("projects thread-bootstrap context only once for a matching context-engine epoch", async () => {
|
||||
const info = vi.spyOn(embeddedAgentLog, "info").mockImplementation(() => undefined);
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
SessionManager.open(sessionFile).appendMessage(
|
||||
assistantMessage("bootstrap-only context", Date.now()) as never,
|
||||
);
|
||||
const contextEngine = createContextEngine({
|
||||
assemble: vi.fn(async ({ messages, prompt }) => ({
|
||||
messages: [...messages, userMessage(prompt ?? "", 10)],
|
||||
estimatedTokens: 42,
|
||||
systemPromptAddition: "context-engine system",
|
||||
contextProjection: { mode: "thread_bootstrap" as const, epoch: "epoch-1" },
|
||||
})),
|
||||
});
|
||||
const firstHarness = createStartedThreadHarness();
|
||||
const firstParams = createParams(sessionFile, workspaceDir);
|
||||
firstParams.contextEngine = contextEngine;
|
||||
|
||||
const firstRun = runCodexAppServerAttempt(firstParams);
|
||||
await firstHarness.waitForMethod("turn/start");
|
||||
expectRequestInputTextContains(firstHarness, "OpenClaw assembled context for this turn:");
|
||||
expectRequestInputTextContains(firstHarness, "bootstrap-only context");
|
||||
await firstHarness.completeTurn();
|
||||
await firstRun;
|
||||
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.contextEngine?.projection).toEqual({
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-1",
|
||||
fingerprint: undefined,
|
||||
});
|
||||
|
||||
const secondHarness = createStartedThreadHarness(async (method) => {
|
||||
if (method === "thread/resume") {
|
||||
return threadStartResult("thread-1");
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
const secondRun = runCodexAppServerAttempt(firstParams);
|
||||
await secondHarness.waitForMethod("turn/start");
|
||||
|
||||
expect(secondHarness.requests.map((request) => request.method)).toEqual([
|
||||
"thread/resume",
|
||||
"turn/start",
|
||||
]);
|
||||
const secondInputText = getRequestInputText(secondHarness);
|
||||
expect(secondInputText).not.toContain("OpenClaw assembled context for this turn:");
|
||||
expect(secondInputText).not.toContain("bootstrap-only context");
|
||||
expect(secondInputText).toBe("hello");
|
||||
const projectionLogs = info.mock.calls.filter(
|
||||
([message]) => message === "codex app-server context-engine projection decision",
|
||||
);
|
||||
expect(projectionLogs).toEqual([
|
||||
[
|
||||
"codex app-server context-engine projection decision",
|
||||
expect.objectContaining({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
engineId: "lossless-claw",
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-1",
|
||||
projected: true,
|
||||
reason: "missing-thread-binding",
|
||||
}),
|
||||
],
|
||||
[
|
||||
"codex app-server context-engine projection decision",
|
||||
expect.objectContaining({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
engineId: "lossless-claw",
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-1",
|
||||
previousThreadId: "thread-1",
|
||||
previousEpoch: "epoch-1",
|
||||
projected: false,
|
||||
reason: "matching-thread-bootstrap-binding",
|
||||
}),
|
||||
],
|
||||
]);
|
||||
|
||||
await secondHarness.completeTurn();
|
||||
await secondRun;
|
||||
});
|
||||
|
||||
it("starts a fresh Codex thread and reprojects when context-engine epoch changes", async () => {
|
||||
const info = vi.spyOn(embeddedAgentLog, "info").mockImplementation(() => undefined);
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
@@ -410,10 +506,265 @@ describe("runCodexAppServerAttempt context-engine lifecycle", () => {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"contextTokenBudget":400000,"projectionMaxChars":1000000}',
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"projectionMaxChars":24000}',
|
||||
projection: {
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-old",
|
||||
},
|
||||
},
|
||||
});
|
||||
const contextEngine = createContextEngine();
|
||||
const contextEngine = createContextEngine({
|
||||
assemble: vi.fn(async ({ prompt }) => ({
|
||||
messages: [assistantMessage("new epoch context", 10), userMessage(prompt ?? "", 11)],
|
||||
estimatedTokens: 42,
|
||||
systemPromptAddition: "context-engine system",
|
||||
contextProjection: { mode: "thread_bootstrap" as const, epoch: "epoch-new" },
|
||||
})),
|
||||
});
|
||||
const harness = createStartedThreadHarness(async (method) => {
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-new");
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.contextEngine = contextEngine;
|
||||
|
||||
const run = runCodexAppServerAttempt(params);
|
||||
await harness.waitForMethod("turn/start");
|
||||
|
||||
expect(harness.requests.map((request) => request.method)).toEqual([
|
||||
"thread/start",
|
||||
"turn/start",
|
||||
]);
|
||||
expectRequestInputTextContains(harness, "OpenClaw assembled context for this turn:");
|
||||
expectRequestInputTextContains(harness, "new epoch context");
|
||||
|
||||
await harness.notify({
|
||||
method: "turn/completed",
|
||||
params: {
|
||||
threadId: "thread-new",
|
||||
turnId: "turn-1",
|
||||
turn: {
|
||||
id: "turn-1",
|
||||
status: "completed",
|
||||
items: [{ type: "agentMessage", id: "msg-1", text: "fresh answer" }],
|
||||
},
|
||||
},
|
||||
});
|
||||
await run;
|
||||
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.threadId).toBe("thread-new");
|
||||
expect(savedBinding?.contextEngine?.projection?.epoch).toBe("epoch-new");
|
||||
expect(info).toHaveBeenCalledWith(
|
||||
"codex app-server context-engine projection decision",
|
||||
expect.objectContaining({
|
||||
sessionId: "session-1",
|
||||
engineId: "lossless-claw",
|
||||
epoch: "epoch-new",
|
||||
previousThreadId: "thread-old",
|
||||
previousEpoch: "epoch-old",
|
||||
projected: true,
|
||||
reason: "context-engine-binding-mismatch",
|
||||
}),
|
||||
);
|
||||
expect(info).toHaveBeenCalledWith(
|
||||
"codex app-server wrote context-engine thread binding",
|
||||
expect.objectContaining({
|
||||
sessionId: "session-1",
|
||||
threadId: "thread-new",
|
||||
engineId: "lossless-claw",
|
||||
epoch: "epoch-new",
|
||||
action: "rotated",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("reprojects thread-bootstrap context when context-engine policy changes", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
threadId: "thread-old",
|
||||
cwd: workspaceDir,
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"projectionMaxChars":24000}',
|
||||
projection: {
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-1",
|
||||
},
|
||||
},
|
||||
});
|
||||
const contextEngine = createContextEngine({
|
||||
assemble: vi.fn(async ({ prompt }) => ({
|
||||
messages: [assistantMessage("policy changed context", 10), userMessage(prompt ?? "", 11)],
|
||||
estimatedTokens: 42,
|
||||
systemPromptAddition: "context-engine system",
|
||||
contextProjection: { mode: "thread_bootstrap" as const, epoch: "epoch-1" },
|
||||
})),
|
||||
});
|
||||
const harness = createStartedThreadHarness(async (method) => {
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-new");
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.contextEngine = contextEngine;
|
||||
params.contextTokenBudget = 80_000;
|
||||
|
||||
const run = runCodexAppServerAttempt(params);
|
||||
await harness.waitForMethod("turn/start");
|
||||
|
||||
expect(harness.requests.map((request) => request.method)).toEqual([
|
||||
"thread/start",
|
||||
"turn/start",
|
||||
]);
|
||||
expectRequestInputTextContains(harness, "OpenClaw assembled context for this turn:");
|
||||
expectRequestInputTextContains(harness, "policy changed context");
|
||||
|
||||
await harness.notify({
|
||||
method: "turn/completed",
|
||||
params: {
|
||||
threadId: "thread-new",
|
||||
turnId: "turn-1",
|
||||
turn: {
|
||||
id: "turn-1",
|
||||
status: "completed",
|
||||
items: [{ type: "agentMessage", id: "msg-1", text: "fresh answer" }],
|
||||
},
|
||||
},
|
||||
});
|
||||
await run;
|
||||
});
|
||||
|
||||
it("starts a fresh Codex thread when thread-bootstrap projection falls back to per-turn projection", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
threadId: "thread-old",
|
||||
cwd: workspaceDir,
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"projectionMaxChars":24000}',
|
||||
projection: {
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-1",
|
||||
},
|
||||
},
|
||||
});
|
||||
const contextEngine = createContextEngine({
|
||||
assemble: vi.fn(async ({ prompt }) => ({
|
||||
messages: [assistantMessage("per-turn context", 10), userMessage(prompt ?? "", 11)],
|
||||
estimatedTokens: 42,
|
||||
systemPromptAddition: "context-engine system",
|
||||
})),
|
||||
});
|
||||
const harness = createStartedThreadHarness(async (method) => {
|
||||
if (method === "thread/resume") {
|
||||
return threadStartResult("thread-old");
|
||||
}
|
||||
if (method === "thread/start") {
|
||||
return threadStartResult("thread-new");
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
const params = createParams(sessionFile, workspaceDir);
|
||||
params.contextEngine = contextEngine;
|
||||
|
||||
const run = runCodexAppServerAttempt(params);
|
||||
await harness.waitForMethod("turn/start");
|
||||
|
||||
expect(harness.requests.map((request) => request.method)).toEqual([
|
||||
"thread/start",
|
||||
"turn/start",
|
||||
]);
|
||||
expectRequestInputTextContains(harness, "OpenClaw assembled context for this turn:");
|
||||
expectRequestInputTextContains(harness, "per-turn context");
|
||||
|
||||
await harness.notify({
|
||||
method: "turn/completed",
|
||||
params: {
|
||||
threadId: "thread-new",
|
||||
turnId: "turn-1",
|
||||
turn: {
|
||||
id: "turn-1",
|
||||
status: "completed",
|
||||
items: [{ type: "agentMessage", id: "msg-1", text: "fresh answer" }],
|
||||
},
|
||||
},
|
||||
});
|
||||
await run;
|
||||
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(savedBinding?.threadId).toBe("thread-new");
|
||||
expect(savedBinding?.contextEngine?.projection).toBeUndefined();
|
||||
});
|
||||
|
||||
it("retries a resumed context-engine thread on a fresh Codex thread after early context overflow", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const successorFile = path.join(tempDir, "session.compacted.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
SessionManager.open(sessionFile).appendMessage(
|
||||
assistantMessage("pre-compaction context", Date.now()) as never,
|
||||
);
|
||||
await writeCodexAppServerBinding(sessionFile, {
|
||||
threadId: "thread-old",
|
||||
cwd: workspaceDir,
|
||||
dynamicToolsFingerprint: "[]",
|
||||
contextEngine: {
|
||||
schemaVersion: 1,
|
||||
engineId: "lossless-claw",
|
||||
policyFingerprint:
|
||||
'{"schemaVersion":1,"engineId":"lossless-claw","ownsCompaction":true,"contextTokenBudget":400000,"projectionMaxChars":1000000}',
|
||||
projection: {
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: "epoch-before",
|
||||
},
|
||||
},
|
||||
});
|
||||
let epoch = "epoch-before";
|
||||
const compact = vi.fn(async () => {
|
||||
epoch = "epoch-after";
|
||||
SessionManager.open(successorFile).appendMessage(
|
||||
assistantMessage("successor compacted context", Date.now()) as never,
|
||||
);
|
||||
return {
|
||||
ok: true,
|
||||
compacted: true,
|
||||
result: {
|
||||
summary: "summary",
|
||||
firstKeptEntryId: "entry-1",
|
||||
tokensBefore: 10,
|
||||
sessionId: "session-1-compacted",
|
||||
sessionFile: successorFile,
|
||||
},
|
||||
};
|
||||
});
|
||||
const assemble = vi.fn(
|
||||
async ({ messages, prompt }: Parameters<ContextEngine["assemble"]>[0]) => ({
|
||||
messages: [
|
||||
...messages,
|
||||
assistantMessage(`context ${epoch}`, 10),
|
||||
userMessage(prompt ?? "", 11),
|
||||
],
|
||||
estimatedTokens: 42,
|
||||
systemPromptAddition: "context-engine system",
|
||||
contextProjection: { mode: "thread_bootstrap" as const, epoch },
|
||||
}),
|
||||
);
|
||||
const contextEngine = createContextEngine({ assemble, compact });
|
||||
const harness = createStartedThreadHarness(async (method, requestParams) => {
|
||||
const request = requireRecord(requestParams, `${method} params`);
|
||||
if (method === "thread/resume") {
|
||||
@@ -458,9 +809,30 @@ describe("runCodexAppServerAttempt context-engine lifecycle", () => {
|
||||
const result = await run;
|
||||
|
||||
expect(result.assistantTexts).toContain("fresh answer");
|
||||
const savedBinding = await readCodexAppServerBinding(sessionFile);
|
||||
expect(compact).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionFile,
|
||||
tokenBudget: 400_000,
|
||||
currentTokenCount: 400_000,
|
||||
compactionTarget: "threshold",
|
||||
force: true,
|
||||
}),
|
||||
);
|
||||
expect(assemble).toHaveBeenCalledTimes(2);
|
||||
const retryAssembleParams = assemble.mock.calls[1]?.[0];
|
||||
expect(retryAssembleParams?.messages.map((message) => message.role)).toEqual(["assistant"]);
|
||||
expect(
|
||||
retryAssembleParams?.messages.map((message) => String(message.content?.[0]?.text)),
|
||||
).toEqual(["successor compacted context"]);
|
||||
const retryInputText = getRequestInputTextAt(harness, -1);
|
||||
expect(retryInputText).toContain("successor compacted context");
|
||||
expect(retryInputText).not.toContain("pre-compaction context");
|
||||
const savedBinding = await readCodexAppServerBinding(successorFile);
|
||||
expect(savedBinding?.threadId).toBe("thread-fresh");
|
||||
expect(savedBinding?.contextEngine?.engineId).toBe("lossless-claw");
|
||||
expect(savedBinding?.contextEngine?.projection?.epoch).toBe("epoch-after");
|
||||
});
|
||||
|
||||
it("keeps current-turn context at the front of the Codex context-engine prompt", async () => {
|
||||
|
||||
@@ -36,6 +36,7 @@ import {
|
||||
type EmbeddedRunAttemptParams,
|
||||
type EmbeddedRunAttemptResult,
|
||||
type EmbeddedContextFile,
|
||||
type ContextEngineProjection,
|
||||
type NativeHookRelayEvent,
|
||||
type NativeHookRelayRegistrationHandle,
|
||||
} from "openclaw/plugin-sdk/agent-harness-runtime";
|
||||
@@ -123,10 +124,13 @@ import { clearSharedCodexAppServerClientIfCurrent } from "./shared-client.js";
|
||||
import {
|
||||
areCodexDynamicToolFingerprintsCompatible,
|
||||
buildDeveloperInstructions,
|
||||
buildContextEngineBinding,
|
||||
buildTurnStartParams,
|
||||
codexDynamicToolsFingerprint,
|
||||
isContextEngineBindingCompatible,
|
||||
startOrResumeThread,
|
||||
type CodexAppServerThreadLifecycleBinding,
|
||||
type CodexContextEngineThreadBootstrapProjection,
|
||||
} from "./thread-lifecycle.js";
|
||||
import {
|
||||
inferCodexDynamicToolMeta,
|
||||
@@ -506,6 +510,23 @@ export async function runCodexAppServerAttempt(
|
||||
sessionKey: sandboxSessionKey,
|
||||
...(startupAuthProfileId ? { authProfileId: startupAuthProfileId } : {}),
|
||||
};
|
||||
let activeSessionId = params.sessionId;
|
||||
let activeSessionFile = params.sessionFile;
|
||||
const buildActiveRunAttemptParams = (): EmbeddedRunAttemptParams => ({
|
||||
...runtimeParams,
|
||||
sessionId: activeSessionId,
|
||||
sessionFile: activeSessionFile,
|
||||
});
|
||||
const adoptContextEngineCompactionTranscript = (compactResult: {
|
||||
result?: { sessionId?: string; sessionFile?: string };
|
||||
}): void => {
|
||||
if (compactResult.result?.sessionId) {
|
||||
activeSessionId = compactResult.result.sessionId;
|
||||
}
|
||||
if (compactResult.result?.sessionFile) {
|
||||
activeSessionFile = compactResult.result.sessionFile;
|
||||
}
|
||||
};
|
||||
const startupAuthAccountCacheKey = await resolveCodexAppServerAuthAccountCacheKey({
|
||||
authProfileId: startupAuthProfileId,
|
||||
authProfileStore: params.authProfileStore,
|
||||
@@ -557,8 +578,8 @@ export async function runCodexAppServerAttempt(
|
||||
runId: params.runId,
|
||||
},
|
||||
});
|
||||
const hadSessionFile = await pathExists(params.sessionFile);
|
||||
let historyMessages = (await readMirroredSessionHistoryMessages(params.sessionFile)) ?? [];
|
||||
const hadSessionFile = await pathExists(activeSessionFile);
|
||||
let historyMessages = (await readMirroredSessionHistoryMessages(activeSessionFile)) ?? [];
|
||||
const hookContextWindowFields = {
|
||||
...(params.contextWindowInfo?.tokens
|
||||
? { contextTokenBudget: params.contextWindowInfo.tokens }
|
||||
@@ -583,28 +604,32 @@ export async function runCodexAppServerAttempt(
|
||||
channelId: params.messageChannel ?? params.messageProvider ?? undefined,
|
||||
...hookContextWindowFields,
|
||||
};
|
||||
const activeContextEnginePluginId = activeContextEngine
|
||||
? resolveContextEngineOwnerPluginId(activeContextEngine)
|
||||
: undefined;
|
||||
const buildActiveContextEngineRuntimeContext = () =>
|
||||
buildHarnessContextEngineRuntimeContext({
|
||||
attempt: buildActiveRunAttemptParams(),
|
||||
workspaceDir: effectiveWorkspace,
|
||||
agentDir,
|
||||
activeAgentId: sessionAgentId,
|
||||
contextEnginePluginId: activeContextEnginePluginId,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
});
|
||||
if (activeContextEngine) {
|
||||
const activeContextEnginePluginId = resolveContextEngineOwnerPluginId(activeContextEngine);
|
||||
await bootstrapHarnessContextEngine({
|
||||
hadSessionFile,
|
||||
contextEngine: activeContextEngine,
|
||||
sessionId: params.sessionId,
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
runtimeContext: buildHarnessContextEngineRuntimeContext({
|
||||
attempt: runtimeParams,
|
||||
workspaceDir: effectiveWorkspace,
|
||||
agentDir,
|
||||
activeAgentId: sessionAgentId,
|
||||
contextEnginePluginId: activeContextEnginePluginId,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
}),
|
||||
sessionFile: activeSessionFile,
|
||||
runtimeContext: buildActiveContextEngineRuntimeContext(),
|
||||
runMaintenance: runHarnessContextEngineMaintenance,
|
||||
config: params.config,
|
||||
warn: (message) => embeddedAgentLog.warn(message),
|
||||
});
|
||||
historyMessages =
|
||||
(await readMirroredSessionHistoryMessages(params.sessionFile)) ?? historyMessages;
|
||||
(await readMirroredSessionHistoryMessages(activeSessionFile)) ?? historyMessages;
|
||||
}
|
||||
const baseDeveloperInstructions = buildDeveloperInstructions(params);
|
||||
// Build the workspace bootstrap block before finalizing developer
|
||||
@@ -624,41 +649,91 @@ export async function runCodexAppServerAttempt(
|
||||
workspaceBootstrapInstructions,
|
||||
);
|
||||
let prePromptMessageCount = historyMessages.length;
|
||||
let contextEngineProjection: CodexContextEngineThreadBootstrapProjection | undefined;
|
||||
const resetCodexPromptInputs = () => {
|
||||
promptText = params.prompt;
|
||||
developerInstructions = joinPresentSections(
|
||||
baseDeveloperInstructions,
|
||||
workspaceBootstrapInstructions,
|
||||
);
|
||||
prePromptMessageCount = historyMessages.length;
|
||||
contextEngineProjection = undefined;
|
||||
};
|
||||
const applyActiveContextEngineProjection = async (
|
||||
decisionStartupBinding: CodexAppServerThreadBinding | undefined,
|
||||
) => {
|
||||
if (!activeContextEngine) {
|
||||
return;
|
||||
}
|
||||
const assembled = await assembleHarnessContextEngine({
|
||||
contextEngine: activeContextEngine,
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
messages: historyMessages,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
availableTools: new Set(toolBridge.specs.map((tool) => tool.name).filter(isNonEmptyString)),
|
||||
citationsMode: params.config?.memory?.citations,
|
||||
modelId: params.modelId,
|
||||
prompt: params.prompt,
|
||||
});
|
||||
if (!assembled) {
|
||||
throw new Error("context engine assemble returned no result");
|
||||
}
|
||||
contextEngineProjection = readContextEngineThreadBootstrapProjection(
|
||||
assembled.contextProjection,
|
||||
);
|
||||
const projection = projectContextEngineAssemblyForCodex({
|
||||
assembledMessages: assembled.messages,
|
||||
originalHistoryMessages: historyMessages,
|
||||
prompt: params.prompt,
|
||||
systemPromptAddition: assembled.systemPromptAddition,
|
||||
maxRenderedContextChars: resolveCodexContextEngineProjectionMaxChars({
|
||||
contextTokenBudget: params.contextTokenBudget,
|
||||
reserveTokens: resolveCodexContextEngineProjectionReserveTokens({
|
||||
config: params.config,
|
||||
}),
|
||||
}),
|
||||
toolPayloadMode: contextEngineProjection ? "preserve" : "elide",
|
||||
});
|
||||
const projectionDecision = contextEngineProjection
|
||||
? resolveContextEngineBootstrapProjectionDecision({
|
||||
startupBinding: decisionStartupBinding,
|
||||
expectedBinding: buildContextEngineBinding(
|
||||
buildActiveRunAttemptParams(),
|
||||
contextEngineProjection,
|
||||
),
|
||||
projection: contextEngineProjection,
|
||||
dynamicToolsFingerprint: codexDynamicToolsFingerprint(toolBridge.specs),
|
||||
})
|
||||
: { project: true, reason: "per-turn-projection" };
|
||||
embeddedAgentLog.info("codex app-server context-engine projection decision", {
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
engineId: activeContextEngine.info.id,
|
||||
mode: contextEngineProjection?.mode ?? assembled.contextProjection?.mode ?? "per_turn",
|
||||
epoch: contextEngineProjection?.epoch,
|
||||
fingerprint: contextEngineProjection?.fingerprint,
|
||||
previousThreadId: decisionStartupBinding?.threadId,
|
||||
previousEpoch: decisionStartupBinding?.contextEngine?.projection?.epoch,
|
||||
previousFingerprint: decisionStartupBinding?.contextEngine?.projection?.fingerprint,
|
||||
projected: projectionDecision.project,
|
||||
reason: projectionDecision.reason,
|
||||
assembledMessages: assembled.messages.length,
|
||||
originalHistoryMessages: historyMessages.length,
|
||||
projectedPromptChars: projection.promptText.length,
|
||||
developerInstructionAdditionChars: projection.developerInstructionAddition?.length ?? 0,
|
||||
});
|
||||
promptText = projectionDecision.project ? projection.promptText : params.prompt;
|
||||
developerInstructions = joinPresentSections(
|
||||
baseDeveloperInstructions,
|
||||
workspaceBootstrapInstructions,
|
||||
projection.developerInstructionAddition,
|
||||
);
|
||||
prePromptMessageCount = projection.prePromptMessageCount;
|
||||
};
|
||||
if (activeContextEngine) {
|
||||
try {
|
||||
const assembled = await assembleHarnessContextEngine({
|
||||
contextEngine: activeContextEngine,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
messages: historyMessages,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
availableTools: new Set(toolBridge.specs.map((tool) => tool.name).filter(isNonEmptyString)),
|
||||
citationsMode: params.config?.memory?.citations,
|
||||
modelId: params.modelId,
|
||||
prompt: params.prompt,
|
||||
});
|
||||
if (!assembled) {
|
||||
throw new Error("context engine assemble returned no result");
|
||||
}
|
||||
const projection = projectContextEngineAssemblyForCodex({
|
||||
assembledMessages: assembled.messages,
|
||||
originalHistoryMessages: historyMessages,
|
||||
prompt: params.prompt,
|
||||
systemPromptAddition: assembled.systemPromptAddition,
|
||||
maxRenderedContextChars: resolveCodexContextEngineProjectionMaxChars({
|
||||
contextTokenBudget: params.contextTokenBudget,
|
||||
reserveTokens: resolveCodexContextEngineProjectionReserveTokens({
|
||||
config: params.config,
|
||||
}),
|
||||
}),
|
||||
});
|
||||
promptText = projection.promptText;
|
||||
developerInstructions = joinPresentSections(
|
||||
baseDeveloperInstructions,
|
||||
workspaceBootstrapInstructions,
|
||||
projection.developerInstructionAddition,
|
||||
);
|
||||
prePromptMessageCount = projection.prePromptMessageCount;
|
||||
await applyActiveContextEngineProjection(startupBinding);
|
||||
} catch (assembleErr) {
|
||||
embeddedAgentLog.warn("context engine assemble failed; using Codex baseline prompt", {
|
||||
error: formatErrorMessage(assembleErr),
|
||||
@@ -679,13 +754,14 @@ export async function runCodexAppServerAttempt(
|
||||
promptText = projection.promptText;
|
||||
prePromptMessageCount = projection.prePromptMessageCount;
|
||||
}
|
||||
promptText = prependCurrentTurnContext(promptText, params.currentTurnContext);
|
||||
const promptBuild = await resolveAgentHarnessBeforePromptBuildResult({
|
||||
prompt: promptText,
|
||||
developerInstructions,
|
||||
messages: historyMessages,
|
||||
ctx: hookContext,
|
||||
});
|
||||
const buildPromptFromCurrentInputs = () =>
|
||||
resolveAgentHarnessBeforePromptBuildResult({
|
||||
prompt: prependCurrentTurnContext(promptText, params.currentTurnContext),
|
||||
developerInstructions,
|
||||
messages: historyMessages,
|
||||
ctx: hookContext,
|
||||
});
|
||||
let promptBuild = await buildPromptFromCurrentInputs();
|
||||
const systemPromptReport = buildCodexSystemPromptReport({
|
||||
attempt: params,
|
||||
sessionKey: sandboxSessionKey,
|
||||
@@ -794,38 +870,40 @@ export async function runCodexAppServerAttempt(
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
});
|
||||
const threadLifecycleParams = {
|
||||
client: startupClient,
|
||||
params: runtimeParams,
|
||||
agentId: sessionAgentId,
|
||||
cwd: effectiveWorkspace,
|
||||
dynamicTools: toolBridge.specs,
|
||||
appServer: pluginAppServer,
|
||||
developerInstructions: promptBuild.developerInstructions,
|
||||
config: threadConfig,
|
||||
mcpServersFingerprint: bundleMcpThreadConfig.fingerprint,
|
||||
mcpServersFingerprintEvaluated: bundleMcpThreadConfig.evaluated,
|
||||
pluginThreadConfig: pluginThreadConfigEnabled
|
||||
? {
|
||||
enabled: true,
|
||||
inputFingerprint: pluginThreadConfigInputFingerprint,
|
||||
enabledPluginConfigKeys,
|
||||
build: () =>
|
||||
buildCodexPluginThreadConfig({
|
||||
pluginConfig,
|
||||
request: (method, requestParams) =>
|
||||
startupClient.request(method, requestParams, {
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
}),
|
||||
appCache: defaultCodexAppInventoryCache,
|
||||
appCacheKey: pluginAppCacheKey,
|
||||
}),
|
||||
}
|
||||
: undefined,
|
||||
} satisfies Parameters<typeof startOrResumeThread>[0];
|
||||
restartContextEngineCodexThread = () => startOrResumeThread(threadLifecycleParams);
|
||||
const startupThread = await startOrResumeThread(threadLifecycleParams);
|
||||
const buildThreadLifecycleParams = () =>
|
||||
({
|
||||
client: startupClient,
|
||||
params: buildActiveRunAttemptParams(),
|
||||
agentId: sessionAgentId,
|
||||
cwd: effectiveWorkspace,
|
||||
dynamicTools: toolBridge.specs,
|
||||
appServer: pluginAppServer,
|
||||
developerInstructions: promptBuild.developerInstructions,
|
||||
config: threadConfig,
|
||||
mcpServersFingerprint: bundleMcpThreadConfig.fingerprint,
|
||||
mcpServersFingerprintEvaluated: bundleMcpThreadConfig.evaluated,
|
||||
contextEngineProjection,
|
||||
pluginThreadConfig: pluginThreadConfigEnabled
|
||||
? {
|
||||
enabled: true,
|
||||
inputFingerprint: pluginThreadConfigInputFingerprint,
|
||||
enabledPluginConfigKeys,
|
||||
build: () =>
|
||||
buildCodexPluginThreadConfig({
|
||||
pluginConfig,
|
||||
request: (method, requestParams) =>
|
||||
startupClient.request(method, requestParams, {
|
||||
timeoutMs: appServer.requestTimeoutMs,
|
||||
signal: runAbortController.signal,
|
||||
}),
|
||||
appCache: defaultCodexAppInventoryCache,
|
||||
appCacheKey: pluginAppCacheKey,
|
||||
}),
|
||||
}
|
||||
: undefined,
|
||||
}) satisfies Parameters<typeof startOrResumeThread>[0];
|
||||
restartContextEngineCodexThread = () => startOrResumeThread(buildThreadLifecycleParams());
|
||||
const startupThread = await startOrResumeThread(buildThreadLifecycleParams());
|
||||
return { client: startupClient, thread: startupThread };
|
||||
};
|
||||
for (
|
||||
@@ -1500,7 +1578,91 @@ export async function runCodexAppServerAttempt(
|
||||
}
|
||||
});
|
||||
|
||||
const llmInputEvent = {
|
||||
const forceContextEngineCompactionForCodexOverflow = async (error: unknown): Promise<boolean> => {
|
||||
if (!activeContextEngine?.info.ownsCompaction) {
|
||||
return false;
|
||||
}
|
||||
embeddedAgentLog.warn(
|
||||
"codex app-server context-engine turn overflowed; forcing context-engine compaction",
|
||||
{
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
threadId: thread.threadId,
|
||||
engineId: activeContextEngine.info.id,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
error: formatErrorMessage(error),
|
||||
},
|
||||
);
|
||||
try {
|
||||
const runtimeContext = buildActiveContextEngineRuntimeContext();
|
||||
const overflowTokenCount = params.contextTokenBudget ?? params.contextWindowInfo?.tokens;
|
||||
const compactResult = await activeContextEngine.compact({
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
sessionFile: activeSessionFile,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
force: true,
|
||||
...(overflowTokenCount ? { currentTokenCount: overflowTokenCount } : {}),
|
||||
compactionTarget: "threshold",
|
||||
runtimeContext: overflowTokenCount
|
||||
? {
|
||||
...runtimeContext,
|
||||
currentTokenCount: overflowTokenCount,
|
||||
}
|
||||
: runtimeContext,
|
||||
});
|
||||
embeddedAgentLog.info("codex app-server context-engine forced compaction result", {
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
engineId: activeContextEngine.info.id,
|
||||
ok: compactResult.ok,
|
||||
compacted: compactResult.compacted,
|
||||
reason: compactResult.reason,
|
||||
tokensBefore: compactResult.result?.tokensBefore,
|
||||
tokensAfter: compactResult.result?.tokensAfter,
|
||||
});
|
||||
if (!compactResult.ok || !compactResult.compacted) {
|
||||
return false;
|
||||
}
|
||||
adoptContextEngineCompactionTranscript(compactResult);
|
||||
const maintenanceRuntimeContext = buildActiveContextEngineRuntimeContext();
|
||||
await runHarnessContextEngineMaintenance({
|
||||
contextEngine: activeContextEngine,
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
sessionFile: activeSessionFile,
|
||||
reason: "compaction",
|
||||
runtimeContext: maintenanceRuntimeContext,
|
||||
config: params.config,
|
||||
});
|
||||
return true;
|
||||
} catch (compactErr) {
|
||||
embeddedAgentLog.warn("codex app-server context-engine forced compaction failed", {
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
engineId: activeContextEngine.info.id,
|
||||
error: formatErrorMessage(compactErr),
|
||||
});
|
||||
return false;
|
||||
}
|
||||
};
|
||||
const rebuildPromptAfterContextEngineCompaction = async () => {
|
||||
historyMessages =
|
||||
(await readMirroredSessionHistoryMessages(activeSessionFile)) ?? historyMessages;
|
||||
resetCodexPromptInputs();
|
||||
try {
|
||||
await applyActiveContextEngineProjection(undefined);
|
||||
} catch (assembleErr) {
|
||||
embeddedAgentLog.warn(
|
||||
"context engine assemble failed after forced compaction; using Codex baseline prompt",
|
||||
{
|
||||
error: formatErrorMessage(assembleErr),
|
||||
},
|
||||
);
|
||||
}
|
||||
promptBuild = await buildPromptFromCurrentInputs();
|
||||
};
|
||||
const buildLlmInputEvent = () => ({
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
provider: params.provider,
|
||||
@@ -1509,8 +1671,8 @@ export async function runCodexAppServerAttempt(
|
||||
prompt: promptBuild.prompt,
|
||||
historyMessages,
|
||||
imagesCount: params.images?.length ?? 0,
|
||||
};
|
||||
const turnStartFailureMessages = [
|
||||
});
|
||||
const buildTurnStartFailureMessages = () => [
|
||||
...historyMessages,
|
||||
buildCodexUserPromptMessage({ ...params, prompt: promptBuild.prompt }),
|
||||
];
|
||||
@@ -1531,7 +1693,7 @@ export async function runCodexAppServerAttempt(
|
||||
);
|
||||
try {
|
||||
runAgentHarnessLlmInputHook({
|
||||
event: llmInputEvent,
|
||||
event: buildLlmInputEvent(),
|
||||
ctx: hookContext,
|
||||
});
|
||||
emitCodexAppServerEvent(params, {
|
||||
@@ -1556,7 +1718,15 @@ export async function runCodexAppServerAttempt(
|
||||
error: formatErrorMessage(turnStartError),
|
||||
},
|
||||
);
|
||||
await clearCodexAppServerBinding(params.sessionFile);
|
||||
const preRetrySessionFile = activeSessionFile;
|
||||
const compactedForRetry = await forceContextEngineCompactionForCodexOverflow(turnStartError);
|
||||
await clearCodexAppServerBinding(preRetrySessionFile);
|
||||
if (activeSessionFile !== preRetrySessionFile) {
|
||||
await clearCodexAppServerBinding(activeSessionFile);
|
||||
}
|
||||
if (compactedForRetry) {
|
||||
await rebuildPromptAfterContextEngineCompaction();
|
||||
}
|
||||
thread = await restartContextEngineCodexThread();
|
||||
emitCodexAppServerEvent(params, {
|
||||
stream: "codex_app_server.lifecycle",
|
||||
@@ -1607,7 +1777,7 @@ export async function runCodexAppServerAttempt(
|
||||
});
|
||||
runAgentHarnessAgentEndHook({
|
||||
event: {
|
||||
messages: turnStartFailureMessages,
|
||||
messages: buildTurnStartFailureMessages(),
|
||||
success: false,
|
||||
error: turnStartErrorMessage,
|
||||
durationMs: Date.now() - attemptStartedAt,
|
||||
@@ -1636,7 +1806,7 @@ export async function runCodexAppServerAttempt(
|
||||
return buildCodexTurnStartFailureResult({
|
||||
params,
|
||||
message: usageLimitError.message,
|
||||
messagesSnapshot: turnStartFailureMessages,
|
||||
messagesSnapshot: buildTurnStartFailureMessages(),
|
||||
systemPromptReport,
|
||||
});
|
||||
}
|
||||
@@ -1806,21 +1976,21 @@ export async function runCodexAppServerAttempt(
|
||||
if (activeContextEngine) {
|
||||
const activeContextEnginePluginId = resolveContextEngineOwnerPluginId(activeContextEngine);
|
||||
const finalMessages =
|
||||
(await readMirroredSessionHistoryMessages(params.sessionFile)) ??
|
||||
(await readMirroredSessionHistoryMessages(activeSessionFile)) ??
|
||||
historyMessages.concat(result.messagesSnapshot);
|
||||
await finalizeHarnessContextEngineTurn({
|
||||
contextEngine: activeContextEngine,
|
||||
promptError: Boolean(finalPromptError),
|
||||
aborted: finalAborted,
|
||||
yieldAborted: Boolean(result.yieldDetected),
|
||||
sessionIdUsed: params.sessionId,
|
||||
sessionIdUsed: activeSessionId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
sessionFile: activeSessionFile,
|
||||
messagesSnapshot: finalMessages,
|
||||
prePromptMessageCount,
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
runtimeContext: buildHarnessContextEngineRuntimeContextFromUsage({
|
||||
attempt: runtimeParams,
|
||||
attempt: buildActiveRunAttemptParams(),
|
||||
workspaceDir: effectiveWorkspace,
|
||||
agentDir,
|
||||
activeAgentId: sessionAgentId,
|
||||
@@ -2421,6 +2591,65 @@ function shouldProjectMirroredHistoryForCodexStart(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function readContextEngineThreadBootstrapProjection(
|
||||
projection: ContextEngineProjection | undefined,
|
||||
): CodexContextEngineThreadBootstrapProjection | undefined {
|
||||
if (projection?.mode !== "thread_bootstrap") {
|
||||
return undefined;
|
||||
}
|
||||
const epoch = projection.epoch?.trim();
|
||||
if (!epoch) {
|
||||
embeddedAgentLog.warn(
|
||||
"context engine requested Codex thread-bootstrap projection without an epoch; using per-turn projection",
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
const fingerprint = projection.fingerprint?.trim();
|
||||
return {
|
||||
mode: "thread_bootstrap",
|
||||
epoch,
|
||||
...(fingerprint ? { fingerprint } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function resolveContextEngineBootstrapProjectionDecision(params: {
|
||||
startupBinding: CodexAppServerThreadBinding | undefined;
|
||||
expectedBinding: ReturnType<typeof buildContextEngineBinding>;
|
||||
projection: CodexContextEngineThreadBootstrapProjection;
|
||||
dynamicToolsFingerprint: string;
|
||||
}): { project: boolean; reason: string } {
|
||||
const bindingProjection = params.startupBinding?.contextEngine?.projection;
|
||||
if (!params.startupBinding?.threadId || !bindingProjection) {
|
||||
return {
|
||||
project: true,
|
||||
reason: !params.startupBinding?.threadId
|
||||
? "missing-thread-binding"
|
||||
: "missing-projection-binding",
|
||||
};
|
||||
}
|
||||
if (
|
||||
!params.expectedBinding ||
|
||||
!isContextEngineBindingCompatible(params.startupBinding.contextEngine, params.expectedBinding)
|
||||
) {
|
||||
return { project: true, reason: "context-engine-binding-mismatch" };
|
||||
}
|
||||
if (
|
||||
!areCodexDynamicToolFingerprintsCompatible({
|
||||
previous: params.startupBinding.dynamicToolsFingerprint,
|
||||
next: params.dynamicToolsFingerprint,
|
||||
})
|
||||
) {
|
||||
return { project: true, reason: "dynamic-tools-mismatch" };
|
||||
}
|
||||
const projectionChanged =
|
||||
bindingProjection.mode !== "thread_bootstrap" ||
|
||||
bindingProjection.epoch !== params.projection.epoch ||
|
||||
bindingProjection.fingerprint !== params.projection.fingerprint;
|
||||
return projectionChanged
|
||||
? { project: true, reason: "projection-mismatch" }
|
||||
: { project: false, reason: "matching-thread-bootstrap-binding" };
|
||||
}
|
||||
|
||||
async function withCodexStartupTimeout<T>(params: {
|
||||
timeoutMs: number;
|
||||
signal: AbortSignal;
|
||||
|
||||
@@ -54,6 +54,14 @@ export type CodexAppServerContextEngineBinding = {
|
||||
schemaVersion: 1;
|
||||
engineId: string;
|
||||
policyFingerprint: string;
|
||||
projection?: CodexAppServerContextEngineProjectionBinding;
|
||||
};
|
||||
|
||||
export type CodexAppServerContextEngineProjectionBinding = {
|
||||
schemaVersion: 1;
|
||||
mode: "thread_bootstrap";
|
||||
epoch: string;
|
||||
fingerprint?: string;
|
||||
};
|
||||
|
||||
export function resolveCodexAppServerBindingPath(sessionFile: string): string {
|
||||
@@ -182,6 +190,30 @@ function readContextEngineBinding(value: unknown): CodexAppServerContextEngineBi
|
||||
schemaVersion: 1,
|
||||
engineId: record.engineId,
|
||||
policyFingerprint: record.policyFingerprint,
|
||||
projection: readContextEngineProjectionBinding(record.projection),
|
||||
};
|
||||
}
|
||||
|
||||
function readContextEngineProjectionBinding(
|
||||
value: unknown,
|
||||
): CodexAppServerContextEngineProjectionBinding | undefined {
|
||||
if (!value || typeof value !== "object" || Array.isArray(value)) {
|
||||
return undefined;
|
||||
}
|
||||
const record = value as Record<string, unknown>;
|
||||
if (
|
||||
record.schemaVersion !== 1 ||
|
||||
record.mode !== "thread_bootstrap" ||
|
||||
typeof record.epoch !== "string" ||
|
||||
!record.epoch.trim()
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: record.epoch,
|
||||
fingerprint: typeof record.fingerprint === "string" ? record.fingerprint : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@ import {
|
||||
writeCodexAppServerBinding,
|
||||
type CodexAppServerAuthProfileLookup,
|
||||
type CodexAppServerContextEngineBinding,
|
||||
type CodexAppServerContextEngineProjectionBinding,
|
||||
type CodexAppServerThreadBinding,
|
||||
} from "./session-binding.js";
|
||||
|
||||
@@ -53,6 +54,12 @@ export type CodexAppServerThreadLifecycleBinding = CodexAppServerThreadBinding &
|
||||
lifecycle: CodexAppServerThreadLifecycle;
|
||||
};
|
||||
|
||||
export type CodexContextEngineThreadBootstrapProjection = {
|
||||
mode: "thread_bootstrap";
|
||||
epoch: string;
|
||||
fingerprint?: string;
|
||||
};
|
||||
|
||||
export type CodexPluginThreadConfigProvider = {
|
||||
enabled: boolean;
|
||||
inputFingerprint?: string;
|
||||
@@ -81,9 +88,13 @@ export async function startOrResumeThread(params: {
|
||||
mcpServersFingerprint?: string;
|
||||
mcpServersFingerprintEvaluated?: boolean;
|
||||
pluginThreadConfig?: CodexPluginThreadConfigProvider;
|
||||
contextEngineProjection?: CodexContextEngineThreadBootstrapProjection;
|
||||
}): Promise<CodexAppServerThreadLifecycleBinding> {
|
||||
const dynamicToolsFingerprint = fingerprintDynamicTools(params.dynamicTools);
|
||||
const contextEngineBinding = buildContextEngineBinding(params.params);
|
||||
const contextEngineBinding = buildContextEngineBinding(
|
||||
params.params,
|
||||
params.contextEngineProjection,
|
||||
);
|
||||
const userMcpServersConfigPatch = buildCodexUserMcpServersThreadConfigPatch(
|
||||
params.params.config,
|
||||
{
|
||||
@@ -110,6 +121,12 @@ export async function startOrResumeThread(params: {
|
||||
threadId: binding.threadId,
|
||||
engineId: contextEngineBinding?.engineId,
|
||||
previousEngineId: binding.contextEngine?.engineId,
|
||||
epoch: contextEngineBinding?.projection?.epoch,
|
||||
previousEpoch: binding.contextEngine?.projection?.epoch,
|
||||
fingerprint: contextEngineBinding?.projection?.fingerprint,
|
||||
previousFingerprint: binding.contextEngine?.projection?.fingerprint,
|
||||
policyFingerprint: contextEngineBinding?.policyFingerprint,
|
||||
previousPolicyFingerprint: binding.contextEngine?.policyFingerprint,
|
||||
},
|
||||
);
|
||||
await clearCodexAppServerBinding(params.params.sessionFile);
|
||||
@@ -261,6 +278,17 @@ export async function startOrResumeThread(params: {
|
||||
config: params.params.config,
|
||||
},
|
||||
);
|
||||
if (contextEngineBinding) {
|
||||
embeddedAgentLog.info("codex app-server wrote context-engine thread binding", {
|
||||
sessionId: params.params.sessionId,
|
||||
sessionKey: params.params.sessionKey,
|
||||
threadId: response.thread.id,
|
||||
engineId: contextEngineBinding.engineId,
|
||||
epoch: contextEngineBinding.projection?.epoch,
|
||||
fingerprint: contextEngineBinding.projection?.fingerprint,
|
||||
action: "resumed",
|
||||
});
|
||||
}
|
||||
return {
|
||||
...binding,
|
||||
threadId: response.thread.id,
|
||||
@@ -343,6 +371,17 @@ export async function startOrResumeThread(params: {
|
||||
config: params.params.config,
|
||||
},
|
||||
);
|
||||
if (contextEngineBinding) {
|
||||
embeddedAgentLog.info("codex app-server wrote context-engine thread binding", {
|
||||
sessionId: params.params.sessionId,
|
||||
sessionKey: params.params.sessionKey,
|
||||
threadId: response.thread.id,
|
||||
engineId: contextEngineBinding.engineId,
|
||||
epoch: contextEngineBinding.projection?.epoch,
|
||||
fingerprint: contextEngineBinding.projection?.fingerprint,
|
||||
action: rotatedContextEngineBinding ? "rotated" : "started",
|
||||
});
|
||||
}
|
||||
}
|
||||
return {
|
||||
schemaVersion: 1,
|
||||
@@ -368,8 +407,9 @@ export async function startOrResumeThread(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function buildContextEngineBinding(
|
||||
export function buildContextEngineBinding(
|
||||
params: EmbeddedRunAttemptParams,
|
||||
projection?: CodexContextEngineThreadBootstrapProjection,
|
||||
): CodexAppServerContextEngineBinding | undefined {
|
||||
const contextEngine = isActiveHarnessContextEngine(params.contextEngine)
|
||||
? params.contextEngine
|
||||
@@ -396,17 +436,45 @@ function buildContextEngineBinding(
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
projection: projection ? buildContextEngineProjectionBinding(projection) : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function isContextEngineBindingCompatible(
|
||||
function buildContextEngineProjectionBinding(
|
||||
projection: CodexContextEngineThreadBootstrapProjection,
|
||||
): CodexAppServerContextEngineProjectionBinding {
|
||||
return {
|
||||
schemaVersion: 1,
|
||||
mode: "thread_bootstrap",
|
||||
epoch: projection.epoch,
|
||||
fingerprint: projection.fingerprint,
|
||||
};
|
||||
}
|
||||
|
||||
export function isContextEngineBindingCompatible(
|
||||
previous: CodexAppServerContextEngineBinding | undefined,
|
||||
next: CodexAppServerContextEngineBinding,
|
||||
): boolean {
|
||||
return (
|
||||
previous?.schemaVersion === next.schemaVersion &&
|
||||
previous.engineId === next.engineId &&
|
||||
previous.policyFingerprint === next.policyFingerprint
|
||||
previous.policyFingerprint === next.policyFingerprint &&
|
||||
areContextEngineProjectionBindingsCompatible(previous.projection, next.projection)
|
||||
);
|
||||
}
|
||||
|
||||
function areContextEngineProjectionBindingsCompatible(
|
||||
previous: CodexAppServerContextEngineProjectionBinding | undefined,
|
||||
next: CodexAppServerContextEngineProjectionBinding | undefined,
|
||||
): boolean {
|
||||
if (!next) {
|
||||
return previous === undefined;
|
||||
}
|
||||
return (
|
||||
previous?.schemaVersion === next.schemaVersion &&
|
||||
previous.mode === next.mode &&
|
||||
previous.epoch === next.epoch &&
|
||||
previous.fingerprint === next.fingerprint
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,24 @@ export type AssembleResult = {
|
||||
promptAuthority?: "assembled" | "preassembly_may_overflow";
|
||||
/** Optional context-engine-provided instructions prepended to the runtime system prompt */
|
||||
systemPromptAddition?: string;
|
||||
/**
|
||||
* Optional projection lifecycle for hosts with persistent backend threads.
|
||||
*
|
||||
* Context engines that return `thread_bootstrap` ask the host to inject the
|
||||
* assembled context once for the supplied epoch, then reuse the backend
|
||||
* thread until the epoch changes. Engines that omit this field retain the
|
||||
* legacy per-turn projection behavior.
|
||||
*/
|
||||
contextProjection?: ContextEngineProjection;
|
||||
};
|
||||
|
||||
export type ContextEngineProjection = {
|
||||
/** How the assembled context should be projected into the backend runtime. */
|
||||
mode: "per_turn" | "thread_bootstrap";
|
||||
/** Stable context epoch. Changing this tells persistent backends to rotate. */
|
||||
epoch?: string;
|
||||
/** Optional diagnostic fingerprint for the projected context payload. */
|
||||
fingerprint?: string;
|
||||
};
|
||||
|
||||
export type CompactResult = {
|
||||
|
||||
@@ -40,7 +40,10 @@ export type {
|
||||
EmbeddedRunAttemptParams,
|
||||
EmbeddedRunAttemptResult,
|
||||
} from "../agents/pi-embedded-runner/run/types.js";
|
||||
export type { ContextEngine as HarnessContextEngine } from "../context-engine/types.js";
|
||||
export type {
|
||||
ContextEngine as HarnessContextEngine,
|
||||
ContextEngineProjection,
|
||||
} from "../context-engine/types.js";
|
||||
export type { CompactEmbeddedPiSessionParams } from "../agents/pi-embedded-runner/compact.js";
|
||||
export type { EmbeddedPiCompactResult } from "../agents/pi-embedded-runner/types.js";
|
||||
export type { AnyAgentTool } from "../agents/tools/common.js";
|
||||
|
||||
Reference in New Issue
Block a user