fix(codex): surface native compaction failures (#85160)

* fix(codex): surface native compaction failures

* docs: add changelog for codex compaction fix

* test: align compaction failure fixtures
This commit is contained in:
Josh Avant
2026-05-21 19:41:54 -07:00
committed by GitHub
parent c8a35c4645
commit b8e9ab9385
44 changed files with 3173 additions and 328 deletions

View File

@@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai
- Agents: cap heartbeat model bleed context hints by the stored session window when runtime model metadata is unavailable, so overflow recovery advice does not suggest a larger window than the active session actually has.
- Control UI/Web Push: use `https://openclaw.ai` as the generated default VAPID subject instead of the old localhost mailbox so iOS PWA push setup uses an Apple-acceptable subject when `OPENCLAW_VAPID_SUBJECT` is unset. Fixes #83134. (#83317) Thanks @IWhatsskill.
- Agents/Pi: keep embedded session transcript writes from tripping false takeover detection after packaged npm onboarding agent turns.
- Codex/TUI: surface Codex-native post-turn compaction failures instead of continuing uncompacted, and keep successful native compaction serialized before local idle/next-turn handling. Fixes #84305. (#85160) Thanks @joshavant.
- Memory/search: stop recall tracking from writing dreaming side-effect artifacts when `dreaming.enabled=false`, while preserving normal search results. Fixes #84436. (#84444) Thanks @NianJiuZst.
- Diffs: render viewer toolbar icons from a closed icon-name map instead of HTML strings, removing the toolbar icon XSS sink. (#83955) Thanks @tanshanshan.
- QA: keep `pnpm qa:e2e` self-check runs inside the private QA runtime envelope even when inherited shell env disables bundled plugins.

View File

@@ -110,16 +110,29 @@ describe("maybeCompactCodexAppServerSession", () => {
method: "thread/compacted",
params: { threadId: "thread-1", turnId: "turn-1" },
});
fake.emit({
method: "thread/tokenUsage/updated",
params: {
threadId: "thread-1",
tokenUsage: {
last_token_usage: {
total_tokens: 27_170,
},
},
},
});
const result = requireCompactResult(await pendingResult);
expect(result.ok).toBe(true);
expect(result.compacted).toBe(true);
expect(result.result?.tokensBefore).toBe(123);
expect(result.result?.tokensAfter).toBe(27_170);
const details = compactDetails(result);
expect(details.backend).toBe("codex-app-server");
expect(details.threadId).toBe("thread-1");
expect(details.signal).toBe("thread/compacted");
expect(details.turnId).toBe("turn-1");
expect(details.tokenUsageSource).toBe("thread/tokenUsage/updated");
});
it("blocks native app-server compaction when the current OpenClaw session is sandboxed", async () => {
@@ -137,7 +150,73 @@ describe("maybeCompactCodexAppServerSession", () => {
expect(fake.request).not.toHaveBeenCalled();
});
it("accepts native context-compaction item completion as success", async () => {
it("uses native token usage that arrives before compaction completion", async () => {
const fake = createFakeCodexClient();
setCodexAppServerClientFactoryForTest(async () => fake.client);
const sessionFile = await writeTestBinding();
const pendingResult = startCompaction(sessionFile, { currentTokenCount: 123 });
await vi.waitFor(() => {
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
});
fake.emit({
method: "thread/tokenUsage/updated",
params: {
threadId: "thread-1",
tokenUsage: {
last_token_usage: {
total_tokens: 18_004,
},
},
},
});
fake.emit({
method: "thread/compacted",
params: { threadId: "thread-1", turnId: "turn-1" },
});
const result = requireCompactResult(await pendingResult);
expect(result.ok).toBe(true);
expect(result.compacted).toBe(true);
expect(result.result?.tokensAfter).toBe(18_004);
expect(compactDetails(result).tokenUsageSource).toBe("thread/tokenUsage/updated");
});
it("accepts native current token usage with a total alias", async () => {
const fake = createFakeCodexClient();
setCodexAppServerClientFactoryForTest(async () => fake.client);
const sessionFile = await writeTestBinding();
const pendingResult = startCompaction(sessionFile, { currentTokenCount: 123 });
await vi.waitFor(() => {
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
});
fake.emit({
method: "thread/tokenUsage/updated",
params: {
threadId: "thread-1",
tokenUsage: {
last: {
total: 16_384,
},
},
},
});
fake.emit({
method: "thread/compacted",
params: { threadId: "thread-1", turnId: "turn-1" },
});
const result = requireCompactResult(await pendingResult);
expect(result.ok).toBe(true);
expect(result.compacted).toBe(true);
expect(result.result?.tokensAfter).toBe(16_384);
expect(compactDetails(result).tokenUsageSource).toBe("thread/tokenUsage/updated");
});
it("accepts native context-compaction item completion with unknown token count as success", async () => {
const fake = createFakeCodexClient();
setCodexAppServerClientFactoryForTest(async () => fake.client);
const sessionFile = await writeTestBinding();
@@ -158,11 +237,44 @@ describe("maybeCompactCodexAppServerSession", () => {
const result = requireCompactResult(await pendingResult);
expect(result.ok).toBe(true);
expect(result.compacted).toBe(true);
expect(result.result?.tokensAfter).toBeUndefined();
const details = compactDetails(result);
expect(details.signal).toBe("item/completed");
expect(details.itemId).toBe("compact-1");
});
it("does not treat zero native token usage as an authoritative post-compaction count", async () => {
const fake = createFakeCodexClient();
setCodexAppServerClientFactoryForTest(async () => fake.client);
const sessionFile = await writeTestBinding();
const pendingResult = startCompaction(sessionFile, { currentTokenCount: 123 });
await vi.waitFor(() => {
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
});
fake.emit({
method: "thread/compacted",
params: { threadId: "thread-1", turnId: "turn-1" },
});
fake.emit({
method: "thread/tokenUsage/updated",
params: {
threadId: "thread-1",
tokenUsage: {
last_token_usage: {
total_tokens: 0,
},
},
},
});
const result = requireCompactResult(await pendingResult);
expect(result.ok).toBe(true);
expect(result.compacted).toBe(true);
expect(result.result?.tokensAfter).toBeUndefined();
expect(compactDetails(result).tokenUsageSource).toBeUndefined();
});
it("reuses the bound auth profile for native compaction", async () => {
const fake = createFakeCodexClient();
let seenAuthProfileId: string | undefined;
@@ -185,6 +297,39 @@ describe("maybeCompactCodexAppServerSession", () => {
expect(seenAuthProfileId).toBe("openai-codex:work");
});
it("reports missing thread bindings as failed native compaction", async () => {
const sessionFile = path.join(tempDir, "missing-binding.jsonl");
const result = requireCompactResult(
await startCompaction(sessionFile, { currentTokenCount: 123 }),
);
expect(result.ok).toBe(false);
expect(result.compacted).toBe(false);
expect(result.reason).toBe("no codex app-server thread binding");
expect(result.failure?.reason).toBe("missing_thread_binding");
expect(result.result).toBeUndefined();
});
it("clears stale thread bindings and reports failed native compaction", async () => {
const fake = createFakeCodexClient();
fake.request.mockRejectedValueOnce(new Error("thread not found: thread-1"));
setCodexAppServerClientFactoryForTest(async () => fake.client);
const sessionFile = await writeTestBinding();
const result = requireCompactResult(
await startCompaction(sessionFile, { currentTokenCount: 456 }),
);
expect(fake.request).toHaveBeenCalledWith("thread/compact/start", { threadId: "thread-1" });
expect(await readCodexAppServerBinding(sessionFile)).toBeUndefined();
expect(result.ok).toBe(false);
expect(result.compacted).toBe(false);
expect(result.reason).toBe("thread not found: thread-1");
expect(result.failure?.reason).toBe("stale_thread_binding");
expect(result.result).toBeUndefined();
});
it("warns when stale OpenClaw compaction overrides are ignored", async () => {
const warn = vi.spyOn(embeddedAgentLog, "warn").mockImplementation(() => undefined);
const fake = createFakeCodexClient();
@@ -541,6 +686,58 @@ describe("maybeCompactCodexAppServerSession", () => {
);
});
it("honors explicit force for budget-triggered owning context-engine compaction", async () => {
const info = vi.spyOn(embeddedAgentLog, "info").mockImplementation(() => undefined);
const sessionFile = await writeTestBinding();
const compact = vi.fn(async () => ({
ok: true,
compacted: true,
result: {
summary: "engine summary",
firstKeptEntryId: "entry-1",
tokensBefore: 900,
tokensAfter: 100,
},
}));
const contextEngine: ContextEngine = {
info: { id: "lossless-claw", name: "Lossless Claw", ownsCompaction: true },
assemble: vi.fn() as never,
ingest: vi.fn() as never,
compact,
};
const result = requireCompactResult(
await maybeCompactCodexAppServerSession({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile,
workspaceDir: tempDir,
contextEngine,
contextTokenBudget: 777,
currentTokenCount: 900,
trigger: "budget",
force: true,
}),
);
expect(result.ok).toBe(true);
expect(result.compacted).toBe(true);
expect(compact).toHaveBeenCalledWith(
expect.objectContaining({
compactionTarget: "budget",
force: true,
}),
);
expect(info).toHaveBeenCalledWith(
"starting context-engine-owned Codex app-server compaction",
expect.objectContaining({
trigger: "budget",
compactionTarget: "budget",
force: true,
}),
);
});
it("adopts successor transcript handles after owning context-engine compaction", async () => {
const sessionFile = await writeTestBinding();
const successorFile = path.join(tempDir, "session.compacted.jsonl");

View File

@@ -22,6 +22,7 @@ type CodexNativeCompactionCompletion = {
signal: "thread/compacted" | "item/completed";
turnId?: string;
itemId?: string;
tokensAfter?: number;
};
type CodexNativeCompactionWaiter = {
promise: Promise<CodexNativeCompactionCompletion>;
@@ -30,6 +31,7 @@ type CodexNativeCompactionWaiter = {
};
const DEFAULT_CODEX_COMPACTION_WAIT_TIMEOUT_MS = 5 * 60 * 1000;
const CODEX_COMPACTION_TOKEN_USAGE_GRACE_MS = 250;
const warnedIgnoredCompactionOverrides = new Set<string>();
export async function maybeCompactCodexAppServerSession(
@@ -70,6 +72,8 @@ async function compactOwningContextEngine(
params: CompactEmbeddedPiSessionParams,
contextEngine: NonNullable<CompactEmbeddedPiSessionParams["contextEngine"]>,
): Promise<EmbeddedPiCompactResult> {
const compactionTarget = params.trigger === "manual" ? "threshold" : "budget";
const force = params.force === true || params.trigger === "manual";
embeddedAgentLog.info("starting context-engine-owned Codex app-server compaction", {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
@@ -77,8 +81,8 @@ async function compactOwningContextEngine(
tokenBudget: params.contextTokenBudget,
currentTokenCount: params.currentTokenCount,
trigger: params.trigger,
compactionTarget: params.trigger === "manual" ? "threshold" : "budget",
force: params.trigger === "manual",
compactionTarget,
force,
});
let result: Awaited<ReturnType<typeof contextEngine.compact>>;
try {
@@ -95,9 +99,9 @@ async function compactOwningContextEngine(
sessionFile: params.sessionFile,
tokenBudget: params.contextTokenBudget,
currentTokenCount: params.currentTokenCount,
compactionTarget: params.trigger === "manual" ? "threshold" : "budget",
compactionTarget,
customInstructions: params.customInstructions,
force: params.trigger === "manual",
force,
runtimeContext: params.contextEngineRuntimeContext,
},
resolveCompactionTimeoutMs(params.config),
@@ -137,9 +141,9 @@ async function compactOwningContextEngine(
error: formatErrorMessage(error),
});
}
await clearCodexAppServerBinding(params.sessionFile);
await clearCodexAppServerBinding(params.sessionFile, { config: params.config });
if (compactedSessionFile !== params.sessionFile) {
await clearCodexAppServerBinding(compactedSessionFile);
await clearCodexAppServerBinding(compactedSessionFile, { config: params.config });
}
}
@@ -335,7 +339,10 @@ async function compactCodexNativeThread(
const appServer = resolveCodexAppServerRuntimeOptions({ pluginConfig: options.pluginConfig });
const binding = await readCodexAppServerBinding(params.sessionFile, { config: params.config });
if (!binding?.threadId) {
return { ok: false, compacted: false, reason: "no codex app-server thread binding" };
return failedCodexThreadBindingCompactionResult(params, {
reason: "no codex app-server thread binding",
recovery: "missing_thread_binding",
});
}
const requestedAuthProfileId = params.authProfileId?.trim() || undefined;
if (
@@ -367,6 +374,14 @@ async function compactCodexNativeThread(
completion = await waiter.promise;
} catch (error) {
waiter.cancel();
if (isCodexThreadNotFoundError(error)) {
await clearCodexAppServerBinding(params.sessionFile, { config: params.config });
return failedCodexThreadBindingCompactionResult(params, {
threadId: binding.threadId,
reason: formatCompactionError(error),
recovery: "stale_thread_binding",
});
}
return {
ok: false,
compacted: false,
@@ -379,7 +394,22 @@ async function compactCodexNativeThread(
signal: completion.signal,
turnId: completion.turnId,
itemId: completion.itemId,
tokensAfter: completion.tokensAfter,
});
const resultDetails: JsonObject = {
backend: "codex-app-server",
threadId: binding.threadId,
signal: completion.signal,
};
if (completion.turnId) {
resultDetails.turnId = completion.turnId;
}
if (completion.itemId) {
resultDetails.itemId = completion.itemId;
}
if (completion.tokensAfter !== undefined) {
resultDetails.tokenUsageSource = "thread/tokenUsage/updated";
}
return {
ok: true,
compacted: true,
@@ -387,17 +417,42 @@ async function compactCodexNativeThread(
summary: "",
firstKeptEntryId: "",
tokensBefore: params.currentTokenCount ?? 0,
details: {
backend: "codex-app-server",
threadId: binding.threadId,
signal: completion.signal,
turnId: completion.turnId,
itemId: completion.itemId,
},
...(completion.tokensAfter !== undefined ? { tokensAfter: completion.tokensAfter } : {}),
details: resultDetails,
},
};
}
function failedCodexThreadBindingCompactionResult(
params: CompactEmbeddedPiSessionParams,
recovery: {
reason: string;
recovery: "missing_thread_binding" | "stale_thread_binding";
threadId?: string;
},
): EmbeddedPiCompactResult {
embeddedAgentLog.warn("codex app-server compaction could not use thread binding", {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
threadId: recovery.threadId,
reason: recovery.reason,
recovery: recovery.recovery,
});
return {
ok: false,
compacted: false,
reason: recovery.reason,
failure: {
reason: recovery.recovery,
rawError: recovery.reason,
},
};
}
function isCodexThreadNotFoundError(error: unknown): boolean {
return formatCompactionError(error).toLowerCase().includes("thread not found");
}
function createCodexNativeCompactionWaiter(
client: CodexAppServerClient,
threadId: string,
@@ -405,6 +460,7 @@ function createCodexNativeCompactionWaiter(
let settled = false;
let removeHandler: () => void = () => {};
let timeout: ReturnType<typeof setTimeout> | undefined;
let tokenUsageGraceTimeout: ReturnType<typeof setTimeout> | undefined;
let failWaiter: (error: Error) => void = () => {};
const promise = new Promise<CodexNativeCompactionCompletion>((resolve, reject) => {
@@ -413,6 +469,9 @@ function createCodexNativeCompactionWaiter(
if (timeout) {
clearTimeout(timeout);
}
if (tokenUsageGraceTimeout) {
clearTimeout(tokenUsageGraceTimeout);
}
};
const complete = (completion: CodexNativeCompactionCompletion): void => {
if (settled) {
@@ -430,11 +489,49 @@ function createCodexNativeCompactionWaiter(
cleanup();
reject(error);
};
let latestTokensAfter: number | undefined;
const completionWithLatestTokenUsage = (
completion: CodexNativeCompactionCompletion,
): CodexNativeCompactionCompletion =>
latestTokensAfter === undefined
? completion
: { ...completion, tokensAfter: latestTokensAfter };
const completeAfterTokenUsageGrace = (completion: CodexNativeCompactionCompletion): void => {
if (settled || tokenUsageGraceTimeout) {
return;
}
if (timeout) {
clearTimeout(timeout);
timeout = undefined;
}
tokenUsageGraceTimeout = setTimeout(
() => complete(completionWithLatestTokenUsage(observedCompletion ?? completion)),
CODEX_COMPACTION_TOKEN_USAGE_GRACE_MS,
);
tokenUsageGraceTimeout.unref?.();
};
failWaiter = fail;
let observedCompletion: CodexNativeCompactionCompletion | undefined;
const handler: CodexServerNotificationHandler = (notification) => {
const tokensAfter = readNativeCompactionTokenUsage(notification, threadId);
if (tokensAfter !== undefined) {
latestTokensAfter = tokensAfter;
if (observedCompletion) {
complete(completionWithLatestTokenUsage(observedCompletion));
return;
}
}
const completion = readNativeCompactionCompletion(notification, threadId);
if (completion) {
complete(completion);
observedCompletion = completionWithLatestTokenUsage({
...observedCompletion,
...completion,
});
if (latestTokensAfter !== undefined) {
complete(observedCompletion);
return;
}
completeAfterTokenUsageGrace(observedCompletion);
}
};
removeHandler = client.addNotificationHandler(handler);
@@ -464,6 +561,49 @@ function createCodexNativeCompactionWaiter(
};
}
function readNativeCompactionTokenUsage(
notification: CodexServerNotification,
threadId: string,
): number | undefined {
const params = notification.params;
if (!isJsonObject(params) || readString(params, "threadId", "thread_id") !== threadId) {
return undefined;
}
if (notification.method !== "thread/tokenUsage/updated") {
return undefined;
}
const tokenUsage = isJsonObject(params.tokenUsage) ? params.tokenUsage : undefined;
const currentUsage = readCodexCurrentTokenUsage(tokenUsage) ?? readCodexCurrentTokenUsage(params);
return readCodexTotalTokens(currentUsage);
}
function readCodexCurrentTokenUsage(value: JsonObject | undefined): JsonObject | undefined {
if (!value) {
return undefined;
}
for (const key of [
"last",
"current",
"lastCall",
"lastCallUsage",
"lastTokenUsage",
"last_token_usage",
]) {
const usage = value[key];
if (isJsonObject(usage)) {
return usage;
}
}
return undefined;
}
function readCodexTotalTokens(value: JsonObject | undefined): number | undefined {
const totalTokens = value?.total_tokens ?? value?.totalTokens ?? value?.total;
return typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0
? Math.floor(totalTokens)
: undefined;
}
function readNativeCompactionCompletion(
notification: CodexServerNotification,
threadId: string,

View File

@@ -280,7 +280,10 @@ function readPluginAppPolicyContext(value: unknown): PluginAppPolicyContext | un
};
}
export async function clearCodexAppServerBinding(sessionFile: string): Promise<void> {
export async function clearCodexAppServerBinding(
sessionFile: string,
_lookup: Omit<CodexAppServerAuthProfileLookup, "authProfileId"> = {},
): Promise<void> {
try {
await fs.unlink(resolveCodexAppServerBindingPath(sessionFile));
} catch (error) {

View File

@@ -242,6 +242,7 @@ vi.mock("../logging/subsystem.js", () => ({
}));
vi.mock("../routing/session-key.js", () => ({
isSubagentSessionKey: () => false,
normalizeAgentId: (id: string) => id,
normalizeMainKey: (key?: string | null) => key?.trim() || "main",
}));
@@ -673,6 +674,18 @@ function setupModelSwitchRetry(switchOptions: ModelSwitchOptions) {
});
}
function setupSingleAttemptFallback() {
state.runWithModelFallbackMock.mockImplementation(async (params: FallbackRunnerParams) => {
const result = await params.run(params.provider, params.model);
return {
result,
provider: params.provider,
model: params.model,
attempts: [],
};
});
}
function requireRecord(value: unknown, label: string): Record<string, unknown> {
if (!value || typeof value !== "object") {
throw new Error(`expected ${label} to be an object`);
@@ -816,6 +829,98 @@ describe("agentCommand LiveSessionModelSwitchError retry", () => {
return arg?.stream === "lifecycle" && arg?.data?.phase === "end";
});
expect(lifecycleEndCalls.length).toBeGreaterThanOrEqual(1);
const lifecycleFinishingCalls = state.emitAgentEventMock.mock.calls.filter(
(call: unknown[]) => {
const arg = call[0] as { stream?: string; data?: { phase?: string } };
return arg?.stream === "lifecycle" && arg?.data?.phase === "finishing";
},
);
expect(lifecycleFinishingCalls.length).toBeGreaterThanOrEqual(1);
expectRecordFields(mockCallArg(state.runAgentAttemptMock), {
deferTerminalLifecycleEnd: true,
});
const firstFinishingIndex = state.emitAgentEventMock.mock.calls.findIndex((call: unknown[]) => {
const arg = call[0] as { stream?: string; data?: { phase?: string } };
return arg?.stream === "lifecycle" && arg?.data?.phase === "finishing";
});
const lastEndIndex = state.emitAgentEventMock.mock.calls.findLastIndex((call: unknown[]) => {
const arg = call[0] as { stream?: string; data?: { phase?: string } };
return arg?.stream === "lifecycle" && arg?.data?.phase === "end";
});
expect(state.deliverAgentCommandResultMock).toHaveBeenCalledTimes(1);
const deliveryOrder = state.deliverAgentCommandResultMock.mock.invocationCallOrder[0] ?? 0;
expect(
state.emitAgentEventMock.mock.invocationCallOrder[firstFinishingIndex] ?? 0,
).toBeLessThan(deliveryOrder);
expect(deliveryOrder).toBeLessThan(
state.emitAgentEventMock.mock.invocationCallOrder[lastEndIndex] ?? 0,
);
});
it("clears stale flag-only pending final delivery when there is no final payload", async () => {
setupSingleAttemptFallback();
state.runAgentAttemptMock.mockResolvedValue(makeEmptyResult("openai", "gpt-5.4"));
const sessionEntry: SessionEntry = {
sessionId: "session-1",
updatedAt: 1,
pendingFinalDelivery: true,
pendingFinalDeliveryCreatedAt: 2,
pendingFinalDeliveryLastAttemptAt: 3,
pendingFinalDeliveryAttemptCount: 4,
pendingFinalDeliveryLastError: "previous failure",
pendingFinalDeliveryContext: { channel: "tui" },
pendingFinalDeliveryIntentId: "intent-1",
};
state.sessionEntryMock = sessionEntry;
state.sessionStoreMock = { "agent:main": sessionEntry };
state.storePathMock = "/tmp/openclaw-sessions.json";
state.deliverAgentCommandResultMock.mockResolvedValue(undefined);
await agentCommand({
message: "hello",
to: "+1234567890",
deliver: true,
});
const stored = (state.sessionStoreMock as Record<string, SessionEntry>)["agent:main"];
expect(stored?.pendingFinalDelivery).toBeUndefined();
expect(stored?.pendingFinalDeliveryText).toBeUndefined();
expect(stored?.pendingFinalDeliveryCreatedAt).toBeUndefined();
expect(stored?.pendingFinalDeliveryLastAttemptAt).toBeUndefined();
expect(stored?.pendingFinalDeliveryAttemptCount).toBeUndefined();
expect(stored?.pendingFinalDeliveryLastError).toBeUndefined();
expect(stored?.pendingFinalDeliveryContext).toBeUndefined();
expect(stored?.pendingFinalDeliveryIntentId).toBeUndefined();
});
it("does not duplicate finishing lifecycle when an attempt already emitted finishing", async () => {
setupModelSwitchRetry({
provider: "openai",
model: "gpt-5.4",
});
state.runAgentAttemptMock.mockImplementation(async (attemptParams: unknown) => {
state.emitAgentEventMock({
runId: "run-live-switch",
stream: "lifecycle",
data: { phase: "finishing" },
});
(attemptParams as { onAgentEvent?: (evt: unknown) => void }).onAgentEvent?.({
stream: "lifecycle",
data: { phase: "finishing" },
});
return makeSuccessResult("openai", "gpt-5.4");
});
await runBasicAgentCommand();
const lifecycleFinishingCalls = state.emitAgentEventMock.mock.calls.filter(
(call: unknown[]) => {
const arg = call[0] as { stream?: string; data?: { phase?: string } };
return arg?.stream === "lifecycle" && arg?.data?.phase === "finishing";
},
);
expect(lifecycleFinishingCalls).toHaveLength(1);
});
it("validates explicit thinking against configured model compat without an allowlist", async () => {

View File

@@ -273,6 +273,21 @@ async function persistSessionEntry(
});
}
function clearPendingFinalDeliveryFields(entry: SessionEntry, updatedAt: number): SessionEntry {
return {
...entry,
pendingFinalDelivery: undefined,
pendingFinalDeliveryText: undefined,
pendingFinalDeliveryCreatedAt: undefined,
pendingFinalDeliveryLastAttemptAt: undefined,
pendingFinalDeliveryAttemptCount: undefined,
pendingFinalDeliveryLastError: undefined,
pendingFinalDeliveryContext: undefined,
pendingFinalDeliveryIntentId: undefined,
updatedAt,
};
}
function containsControlCharacters(value: string): boolean {
for (const char of value) {
const code = char.codePointAt(0);
@@ -1086,9 +1101,70 @@ async function agentCommandInternal(
const startedAt = Date.now();
const attemptLifecycleState = {
currentTurnUserMessagePersisted: false,
lifecycleFinishing: false,
lifecycleEnded: false,
};
const attemptLifecycleCallbacks = createAgentAttemptLifecycleCallbacks(attemptLifecycleState);
let lifecycleFinishingEmitted = false;
const emitLifecycleFinishing = (runResult: AgentAttemptResult) => {
if (
attemptLifecycleState.lifecycleEnded ||
attemptLifecycleState.lifecycleFinishing ||
lifecycleFinishingEmitted
) {
return;
}
lifecycleFinishingEmitted = true;
attemptLifecycleState.lifecycleFinishing = true;
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "finishing",
startedAt,
endedAt: Date.now(),
aborted: runResult.meta.aborted ?? false,
stopReason: runResult.meta.stopReason,
},
});
};
const emitLifecycleEnd = (runResult: AgentAttemptResult) => {
if (attemptLifecycleState.lifecycleEnded) {
return;
}
attemptLifecycleState.lifecycleEnded = true;
const stopReason = runResult.meta.stopReason;
if (stopReason && stopReason !== "end_turn") {
console.error(`[agent] run ${runId} ended with stopReason=${stopReason}`);
}
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
startedAt,
endedAt: Date.now(),
aborted: runResult.meta.aborted ?? false,
stopReason,
},
});
};
const emitLifecyclePostTurnError = (error: unknown) => {
if (attemptLifecycleState.lifecycleEnded) {
return;
}
attemptLifecycleState.lifecycleEnded = true;
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "error",
startedAt,
endedAt: Date.now(),
error: error instanceof Error ? error.message : "Agent run failed",
},
});
};
const attemptExecutionRuntime = await loadAttemptExecutionRuntime();
const runContext = resolveAgentRunContext(opts);
const messageChannel = resolveMessageChannel(
@@ -1224,6 +1300,7 @@ async function agentCommandInternal(
(isFallbackRetry && attemptLifecycleState.currentTurnUserMessagePersisted),
onUserMessagePersisted: attemptLifecycleCallbacks.onUserMessagePersisted,
onAgentEvent: attemptLifecycleCallbacks.onAgentEvent,
deferTerminalLifecycleEnd: true,
});
},
});
@@ -1285,23 +1362,7 @@ async function agentCommandInternal(
},
};
}
if (!attemptLifecycleState.lifecycleEnded) {
const stopReason = result.meta.stopReason;
if (stopReason && stopReason !== "end_turn") {
console.error(`[agent] run ${runId} ended with stopReason=${stopReason}`);
}
emitAgentEvent({
runId,
stream: "lifecycle",
data: {
phase: "end",
startedAt,
endedAt: Date.now(),
aborted: result.meta.aborted ?? false,
stopReason,
},
});
}
emitLifecycleFinishing(result);
break;
} catch (err) {
if (err instanceof LiveSessionModelSwitchError) {
@@ -1401,181 +1462,190 @@ async function agentCommandInternal(
throw err;
}
}
await fallbackTrajectoryRecorder?.flush();
try {
await fallbackTrajectoryRecorder?.flush();
// Update token+model fields in the session store.
if (sessionStore && sessionKey) {
const { updateSessionStoreAfterAgentRun } = await loadSessionStoreRuntime();
await updateSessionStoreAfterAgentRun({
cfg,
contextTokensOverride: agentCfg?.contextTokens,
sessionId,
sessionKey,
storePath,
sessionStore,
defaultProvider: provider,
defaultModel: model,
fallbackProvider,
fallbackModel,
result,
touchInteraction:
opts.bootstrapContextRunKind !== "cron" &&
opts.bootstrapContextRunKind !== "heartbeat" &&
!opts.internalEvents?.length,
preserveRuntimeModel: opts.bootstrapContextRunKind === "heartbeat",
});
sessionEntry = sessionStore[sessionKey] ?? sessionEntry;
}
const transcriptPersistenceRunner = result.meta.executionTrace?.runner;
const embeddedAssistantGapFill =
transcriptPersistenceRunner === "embedded" ||
(transcriptPersistenceRunner === undefined &&
Boolean(result.meta.finalAssistantVisibleText?.trim()));
if (transcriptPersistenceRunner === "cli" || embeddedAssistantGapFill) {
try {
sessionEntry = await attemptExecutionRuntime.persistCliTurnTranscript({
body,
transcriptBody,
result,
sessionId,
sessionKey: sessionKey ?? sessionId,
sessionEntry,
sessionStore,
storePath,
sessionAgentId,
threadId: opts.threadId,
sessionCwd: workspaceDir,
config: cfg,
embeddedAssistantGapFill,
});
sessionEntry = await (
await loadCliCompactionRuntime()
).runCliTurnCompactionLifecycle({
// Update token+model fields in the session store.
if (sessionStore && sessionKey) {
const { updateSessionStoreAfterAgentRun } = await loadSessionStoreRuntime();
await updateSessionStoreAfterAgentRun({
cfg,
contextTokensOverride: agentCfg?.contextTokens,
sessionId,
sessionKey: sessionKey ?? sessionId,
sessionEntry,
sessionStore,
storePath,
sessionAgentId,
workspaceDir,
agentDir,
provider: result.meta.agentMeta?.provider ?? provider,
model: result.meta.agentMeta?.model ?? model,
skillsSnapshot,
messageChannel,
agentAccountId: runContext.accountId,
thinkLevel: resolvedThinkLevel,
extraSystemPrompt: opts.extraSystemPrompt,
});
} catch (error) {
log.warn(
`Turn transcript persistence failed for ${sessionKey ?? sessionId}: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
const payloads = result.payloads ?? [];
// Phase 2: Persist pending final delivery for main sessions before attempting delivery.
// This ensures that if the process restarts during delivery, the payload is durable.
if (
opts.deliver === true &&
sessionStore &&
sessionKey &&
payloads.length > 0 &&
!isSubagentSessionKey(sessionKey)
) {
const now = Date.now();
const combinedPayload = sanitizePendingFinalDeliveryText(
payloads
.map((p) => (typeof p.text === "string" ? p.text : ""))
.filter(Boolean)
.join("\n\n"),
);
if (combinedPayload) {
const entry = sessionStore[sessionKey] ?? sessionEntry;
const next: SessionEntry = {
...entry,
pendingFinalDelivery: true,
pendingFinalDeliveryText: combinedPayload,
pendingFinalDeliveryCreatedAt: now,
updatedAt: now,
};
await persistSessionEntry({
sessionStore,
sessionKey,
storePath,
entry: next,
sessionStore,
defaultProvider: provider,
defaultModel: model,
fallbackProvider,
fallbackModel,
result,
touchInteraction:
opts.bootstrapContextRunKind !== "cron" &&
opts.bootstrapContextRunKind !== "heartbeat" &&
!opts.internalEvents?.length,
preserveRuntimeModel: opts.bootstrapContextRunKind === "heartbeat",
});
sessionEntry = next;
sessionEntry = sessionStore[sessionKey] ?? sessionEntry;
}
}
const { deliverAgentCommandResult } = await loadDeliveryRuntime();
const resolveFreshSessionEntryForDelivery =
sessionStore && sessionKey
? async (): Promise<SessionEntry | undefined> => {
const { loadSessionStore } = await loadSessionStoreRuntime();
const freshStore = loadSessionStore(storePath, {
skipCache: true,
clone: false,
});
const freshEntry = freshStore[sessionKey];
if (!freshEntry || freshEntry.sessionId !== sessionId) {
return undefined;
const transcriptPersistenceRunner = result.meta.executionTrace?.runner;
const embeddedAssistantGapFill =
transcriptPersistenceRunner === "embedded" ||
(transcriptPersistenceRunner === undefined &&
Boolean(result.meta.finalAssistantVisibleText?.trim()));
if (transcriptPersistenceRunner === "cli" || embeddedAssistantGapFill) {
let persistedCliTurnTranscript = false;
try {
sessionEntry = await attemptExecutionRuntime.persistCliTurnTranscript({
body,
transcriptBody,
result,
sessionId,
sessionKey: sessionKey ?? sessionId,
sessionEntry,
sessionStore,
storePath,
sessionAgentId,
threadId: opts.threadId,
sessionCwd: workspaceDir,
config: cfg,
embeddedAssistantGapFill,
});
persistedCliTurnTranscript = true;
} catch (error) {
log.warn(
`Turn transcript persistence failed for ${sessionKey ?? sessionId}: ${error instanceof Error ? error.message : String(error)}`,
);
}
if (persistedCliTurnTranscript) {
sessionEntry = await (
await loadCliCompactionRuntime()
).runCliTurnCompactionLifecycle({
cfg,
sessionId,
sessionKey: sessionKey ?? sessionId,
sessionEntry,
sessionStore,
storePath,
sessionAgentId,
workspaceDir,
agentDir,
provider: result.meta.agentMeta?.provider ?? provider,
model: result.meta.agentMeta?.model ?? model,
skillsSnapshot,
messageChannel,
agentAccountId: runContext.accountId,
senderIsOwner: opts.senderIsOwner,
thinkLevel: resolvedThinkLevel,
extraSystemPrompt: opts.extraSystemPrompt,
});
}
}
const payloads = result.payloads ?? [];
let pendingFinalDeliveryTextForThisRun: string | undefined;
// Phase 2: Persist pending final delivery for main sessions before attempting delivery.
// This ensures that if the process restarts during delivery, the payload is durable.
if (
opts.deliver === true &&
sessionStore &&
sessionKey &&
payloads.length > 0 &&
!isSubagentSessionKey(sessionKey)
) {
const now = Date.now();
const combinedPayload = sanitizePendingFinalDeliveryText(
payloads
.map((p) => (typeof p.text === "string" ? p.text : ""))
.filter(Boolean)
.join("\n\n"),
);
pendingFinalDeliveryTextForThisRun = combinedPayload || undefined;
if (combinedPayload) {
const entry = sessionStore[sessionKey] ?? sessionEntry;
const next: SessionEntry = {
...entry,
pendingFinalDelivery: true,
pendingFinalDeliveryText: combinedPayload,
pendingFinalDeliveryCreatedAt: now,
updatedAt: now,
};
await persistSessionEntry({
sessionStore,
sessionKey,
storePath,
entry: next,
});
sessionEntry = next;
}
}
const { deliverAgentCommandResult } = await loadDeliveryRuntime();
const resolveFreshSessionEntryForDelivery =
sessionStore && sessionKey
? async (): Promise<SessionEntry | undefined> => {
const { loadSessionStore } = await loadSessionStoreRuntime();
const freshStore = loadSessionStore(storePath, {
skipCache: true,
clone: false,
});
const freshEntry = freshStore[sessionKey];
if (!freshEntry || freshEntry.sessionId !== sessionId) {
return undefined;
}
sessionStore[sessionKey] = freshEntry;
return freshEntry;
}
sessionStore[sessionKey] = freshEntry;
return freshEntry;
}
: undefined;
const deliveryParams = {
cfg,
deps: resolvedDeps,
runtime,
opts,
outboundSession,
sessionEntry,
result,
payloads,
};
const deliveryResult = await deliverAgentCommandResult(
resolveFreshSessionEntryForDelivery
? {
...deliveryParams,
expectedSessionIdForFreshDelivery: sessionId,
resolveFreshSessionEntryForDelivery,
}
: deliveryParams,
);
// Phase 2: Clear pending delivery payload after successful delivery.
if (
deliveryResult?.deliverySucceeded === true &&
sessionStore &&
sessionKey &&
!isSubagentSessionKey(sessionKey)
) {
const entry = sessionStore[sessionKey] ?? sessionEntry;
const next: SessionEntry = {
...entry,
pendingFinalDelivery: undefined,
pendingFinalDeliveryText: undefined,
pendingFinalDeliveryCreatedAt: undefined,
updatedAt: Date.now(),
: undefined;
const deliveryParams = {
cfg,
deps: resolvedDeps,
runtime,
opts,
outboundSession,
sessionEntry,
result,
payloads,
};
await persistSessionEntry({
sessionStore,
sessionKey,
storePath,
entry: next,
});
sessionEntry = next;
}
const deliveryResult = await deliverAgentCommandResult(
resolveFreshSessionEntryForDelivery
? {
...deliveryParams,
expectedSessionIdForFreshDelivery: sessionId,
resolveFreshSessionEntryForDelivery,
}
: deliveryParams,
);
return deliveryResult;
// Phase 2: Clear pending delivery payload after successful delivery.
if (sessionStore && sessionKey && !isSubagentSessionKey(sessionKey)) {
const entry = sessionStore[sessionKey] ?? sessionEntry;
const noPendingTextForThisRun =
opts.deliver === true &&
pendingFinalDeliveryTextForThisRun === undefined &&
entry.pendingFinalDelivery === true &&
!entry.pendingFinalDeliveryText;
if (deliveryResult?.deliverySucceeded === true || noPendingTextForThisRun) {
const next = clearPendingFinalDeliveryFields(entry, Date.now());
await persistSessionEntry({
sessionStore,
sessionKey,
storePath,
entry: next,
});
sessionEntry = next;
}
}
emitLifecycleEnd(result);
return deliveryResult;
} catch (error) {
emitLifecyclePostTurnError(error);
throw error;
}
} finally {
clearAgentRunContext(runId);
}

View File

@@ -3,7 +3,11 @@ import { createAgentAttemptLifecycleCallbacks } from "./attempt-callbacks.js";
describe("createAgentAttemptLifecycleCallbacks", () => {
it("tracks user-message persistence without closing over the agent command scope", () => {
const state = { currentTurnUserMessagePersisted: false, lifecycleEnded: false };
const state = {
currentTurnUserMessagePersisted: false,
lifecycleFinishing: false,
lifecycleEnded: false,
};
const callbacks = createAgentAttemptLifecycleCallbacks(state);
callbacks.onUserMessagePersisted?.({
@@ -17,7 +21,11 @@ describe("createAgentAttemptLifecycleCallbacks", () => {
});
it("tracks terminal lifecycle phases", () => {
const state = { currentTurnUserMessagePersisted: false, lifecycleEnded: false };
const state = {
currentTurnUserMessagePersisted: false,
lifecycleFinishing: false,
lifecycleEnded: false,
};
const callbacks = createAgentAttemptLifecycleCallbacks(state);
callbacks.onAgentEvent({ stream: "lifecycle", data: { phase: "start" } });

View File

@@ -2,6 +2,7 @@ import type { AgentMessage } from "@earendil-works/pi-agent-core";
export type AgentAttemptLifecycleState = {
currentTurnUserMessagePersisted: boolean;
lifecycleFinishing: boolean;
lifecycleEnded: boolean;
};
@@ -20,11 +21,14 @@ export function createAgentAttemptLifecycleCallbacks(state: AgentAttemptLifecycl
state.currentTurnUserMessagePersisted = true;
},
onAgentEvent: (evt) => {
if (
evt.stream === "lifecycle" &&
typeof evt.data?.phase === "string" &&
(evt.data.phase === "end" || evt.data.phase === "error")
) {
if (evt.stream !== "lifecycle" || typeof evt.data?.phase !== "string") {
return;
}
if (evt.data.phase === "finishing") {
state.lifecycleFinishing = true;
return;
}
if (evt.data.phase === "end" || evt.data.phase === "error") {
state.lifecycleEnded = true;
}
},

View File

@@ -393,6 +393,7 @@ export function runAgentAttempt(params: {
data?: Record<string, unknown>;
sessionKey?: string;
}) => void;
deferTerminalLifecycleEnd?: boolean;
authProfileProvider: string;
sessionStore?: Record<string, SessionEntry>;
storePath?: string;
@@ -684,6 +685,7 @@ export function runAgentAttempt(params: {
promptMode: params.opts.promptMode,
disableTools: params.opts.modelRun === true,
onAgentEvent: params.onAgentEvent,
deferTerminalLifecycleEnd: params.deferTerminalLifecycleEnd,
suppressNextUserMessagePersistence: params.suppressPromptPersistenceOnRetry === true,
onUserMessagePersisted: params.onUserMessagePersisted,
bootstrapPromptWarningSignaturesSeen,

View File

@@ -177,6 +177,665 @@ describe("runCliTurnCompactionLifecycle", () => {
expect(updatedEntry?.claudeCliSessionId).toBeUndefined();
});
it("routes OpenAI Codex harness CLI compaction through native harness compaction", async () => {
const sessionKey = "agent:main:codex";
const sessionId = "session-codex";
const sessionFile = path.join(tmpDir, "session-codex.jsonl");
const storePath = path.join(tmpDir, "sessions-codex.json");
await writeSessionFile({ sessionFile, sessionId });
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 950,
totalTokensFresh: true,
agentHarnessId: "codex",
};
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8");
const compactCalls: Array<Parameters<ContextEngine["compact"]>[0]> = [];
const contextEngine = buildContextEngine({ compactCalls });
const resolveContextEngine = vi.fn(async () => contextEngine);
const ensureSelectedAgentHarnessPlugin = vi.fn(async () => undefined);
const compactAgentHarnessSession = vi.fn(async () => ({
ok: true,
compacted: true,
result: { tokensBefore: 950, tokensAfter: 100 },
}));
const applyPiAutoCompactionGuard = vi.fn(async () => ({
supported: true,
disabled: false,
}));
const recordCliCompactionInStore = vi.fn(async () => ({
...sessionEntry,
compactionCount: 1,
}));
setCliCompactionTestDeps({
resolveContextEngine,
ensureSelectedAgentHarnessPlugin,
maybeCompactAgentHarnessSession: compactAgentHarnessSession as never,
createPreparedEmbeddedPiSettingsManager: async () => ({
getCompactionReserveTokens: () => 200,
getCompactionKeepRecentTokens: () => 0,
applyOverrides: () => {},
}),
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 600,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
effectiveReserveTokens: 200,
}),
resolveLiveToolResultMaxChars: () => 20_000,
applyPiAutoCompactionGuard,
recordCliCompactionInStore,
});
const updatedEntry = await runCliTurnCompactionLifecycle({
cfg: {} as OpenClawConfig,
sessionId,
sessionKey,
sessionEntry,
sessionStore,
storePath,
sessionAgentId: "main",
workspaceDir: tmpDir,
agentDir: tmpDir,
provider: "openai",
model: "gpt-5.5",
});
expect(resolveContextEngine).toHaveBeenCalledTimes(1);
expect(applyPiAutoCompactionGuard).toHaveBeenCalledWith(
expect.objectContaining({
contextEngineInfo: contextEngine.info,
}),
);
expect(ensureSelectedAgentHarnessPlugin).toHaveBeenCalledWith(
expect.objectContaining({
provider: "openai",
modelId: "gpt-5.5",
sessionKey,
agentHarnessRuntimeOverride: "codex",
}),
);
expect(applyPiAutoCompactionGuard.mock.invocationCallOrder[0] ?? 0).toBeLessThan(
compactAgentHarnessSession.mock.invocationCallOrder[0] ?? 0,
);
expect(compactAgentHarnessSession).toHaveBeenCalledTimes(1);
const compactAgentHarnessSessionCalls = compactAgentHarnessSession.mock
.calls as unknown as Array<[Record<string, unknown>]>;
expect(compactAgentHarnessSessionCalls[0]?.[0]).toMatchObject({
sessionId,
sessionKey,
sessionFile,
provider: "openai",
model: "gpt-5.5",
contextTokenBudget: 1_000,
currentTokenCount: 950,
contextEngine,
agentHarnessId: "codex",
trigger: "budget",
force: true,
});
expect(compactCalls).toHaveLength(0);
expect(recordCliCompactionInStore).toHaveBeenCalledTimes(1);
expect(recordCliCompactionInStore).toHaveBeenCalledWith(
expect.objectContaining({
provider: "openai",
sessionKey,
tokensAfter: 100,
}),
);
expect(updatedEntry?.compactionCount).toBe(1);
});
it("ignores stale native harness ids when the active provider no longer matches", async () => {
const sessionKey = "agent:main:pi-after-codex";
const sessionId = "session-pi-after-codex";
const sessionFile = path.join(tmpDir, "session-pi-after-codex.jsonl");
const storePath = path.join(tmpDir, "sessions-pi-after-codex.json");
await writeSessionFile({ sessionFile, sessionId });
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 950,
totalTokensFresh: true,
agentHarnessId: "codex",
};
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8");
const compactCalls: Array<Parameters<ContextEngine["compact"]>[0]> = [];
const compactAgentHarnessSession = vi.fn();
setCliCompactionTestDeps({
resolveContextEngine: async () => buildContextEngine({ compactCalls }),
maybeCompactAgentHarnessSession: compactAgentHarnessSession as never,
createPreparedEmbeddedPiSettingsManager: async () => ({
getCompactionReserveTokens: () => 200,
getCompactionKeepRecentTokens: () => 0,
applyOverrides: () => {},
}),
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 600,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
effectiveReserveTokens: 200,
}),
resolveLiveToolResultMaxChars: () => 20_000,
runContextEngineMaintenance: vi.fn(async () => ({
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
})),
});
await runCliTurnCompactionLifecycle({
cfg: {} as OpenClawConfig,
sessionId,
sessionKey,
sessionEntry,
sessionStore,
storePath,
sessionAgentId: "main",
workspaceDir: tmpDir,
agentDir: tmpDir,
provider: "pi",
model: "sonnet-4.6",
});
expect(compactAgentHarnessSession).not.toHaveBeenCalled();
expect(compactCalls).toHaveLength(1);
});
it("surfaces nonrecoverable native harness CLI compaction failures", async () => {
const sessionKey = "agent:main:codex-native-failure";
const sessionId = "session-codex-native-failure";
const sessionFile = path.join(tmpDir, "session-codex-native-failure.jsonl");
const storePath = path.join(tmpDir, "sessions-codex-native-failure.json");
await writeSessionFile({ sessionFile, sessionId });
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 950,
totalTokensFresh: true,
agentHarnessId: "codex",
};
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8");
const compactCalls: Array<Parameters<ContextEngine["compact"]>[0]> = [];
const ensureSelectedAgentHarnessPlugin = vi.fn(async () => undefined);
const compactAgentHarnessSession = vi.fn(async () => ({
ok: false,
compacted: false,
reason: "timed out waiting for codex app-server compaction",
}));
const recordCliCompactionInStore = vi.fn();
setCliCompactionTestDeps({
resolveContextEngine: async () => buildContextEngine({ compactCalls }),
ensureSelectedAgentHarnessPlugin,
maybeCompactAgentHarnessSession: compactAgentHarnessSession as never,
createPreparedEmbeddedPiSettingsManager: async () => ({
getCompactionReserveTokens: () => 200,
getCompactionKeepRecentTokens: () => 0,
applyOverrides: () => {},
}),
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 600,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
effectiveReserveTokens: 200,
}),
resolveLiveToolResultMaxChars: () => 20_000,
recordCliCompactionInStore,
});
await expect(
runCliTurnCompactionLifecycle({
cfg: {} as OpenClawConfig,
sessionId,
sessionKey,
sessionEntry,
sessionStore,
storePath,
sessionAgentId: "main",
workspaceDir: tmpDir,
agentDir: tmpDir,
provider: "codex",
model: "gpt-5.5",
}),
).rejects.toThrow(
"CLI native harness compaction failed for codex/gpt-5.5: timed out waiting for codex app-server compaction",
);
expect(compactAgentHarnessSession).toHaveBeenCalledTimes(1);
expect(compactCalls).toHaveLength(0);
expect(recordCliCompactionInStore).not.toHaveBeenCalled();
});
it("does not fall back when native harness compaction returns no result", async () => {
const sessionKey = "agent:main:codex-native-empty";
const sessionId = "session-codex-native-empty";
const sessionFile = path.join(tmpDir, "session-codex-native-empty.jsonl");
const storePath = path.join(tmpDir, "sessions-codex-native-empty.json");
await writeSessionFile({ sessionFile, sessionId });
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 950,
totalTokensFresh: true,
agentHarnessId: "codex",
};
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8");
const compactCalls: Array<Parameters<ContextEngine["compact"]>[0]> = [];
setCliCompactionTestDeps({
resolveContextEngine: async () => buildContextEngine({ compactCalls }),
ensureSelectedAgentHarnessPlugin: vi.fn(async () => undefined),
maybeCompactAgentHarnessSession: vi.fn(async () => undefined) as never,
createPreparedEmbeddedPiSettingsManager: async () => ({
getCompactionReserveTokens: () => 200,
getCompactionKeepRecentTokens: () => 0,
applyOverrides: () => {},
}),
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 600,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
effectiveReserveTokens: 200,
}),
resolveLiveToolResultMaxChars: () => 20_000,
});
await expect(
runCliTurnCompactionLifecycle({
cfg: {} as OpenClawConfig,
sessionId,
sessionKey,
sessionEntry,
sessionStore,
storePath,
sessionAgentId: "main",
workspaceDir: tmpDir,
agentDir: tmpDir,
provider: "codex",
model: "gpt-5.5",
}),
).rejects.toThrow(
"CLI native harness compaction failed for codex/gpt-5.5: native harness compaction did not reduce context",
);
expect(compactCalls).toHaveLength(0);
});
it("passes owning context engines into native harness CLI compaction", async () => {
const sessionKey = "agent:main:codex-owned-engine";
const sessionId = "session-codex-owned-engine";
const sessionFile = path.join(tmpDir, "session-codex-owned-engine.jsonl");
const storePath = path.join(tmpDir, "sessions-codex-owned-engine.json");
await writeSessionFile({ sessionFile, sessionId });
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 950,
totalTokensFresh: true,
agentHarnessId: "codex",
};
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8");
const compactCalls: Array<Parameters<ContextEngine["compact"]>[0]> = [];
const contextEngine = {
...buildContextEngine({ compactCalls }),
info: {
id: "lossless-claw",
name: "Lossless Claw",
ownsCompaction: true,
},
} satisfies ContextEngine;
const ensureSelectedAgentHarnessPlugin = vi.fn(async () => undefined);
const compactAgentHarnessSession = vi.fn(async (compactParams) => {
expect(compactParams.contextEngine).toBe(contextEngine);
expect(compactParams.contextEngineRuntimeContext).toMatchObject({
currentTokenCount: 950,
tokenBudget: 1_000,
trigger: "cli_native_budget",
});
return {
ok: true,
compacted: true,
result: {
summary: "engine-owned",
firstKeptEntryId: "entry-1",
tokensBefore: 950,
tokensAfter: 42,
sessionId: "session-codex-owned-engine-rotated",
sessionFile: path.join(tmpDir, "session-codex-owned-engine-rotated.jsonl"),
},
};
});
const recordCliCompactionInStore = vi.fn(async () => ({
...sessionEntry,
compactionCount: 1,
}));
setCliCompactionTestDeps({
resolveContextEngine: async () => contextEngine,
ensureSelectedAgentHarnessPlugin,
maybeCompactAgentHarnessSession: compactAgentHarnessSession as never,
createPreparedEmbeddedPiSettingsManager: async () => ({
getCompactionReserveTokens: () => 200,
getCompactionKeepRecentTokens: () => 0,
applyOverrides: () => {},
}),
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 600,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
effectiveReserveTokens: 200,
}),
resolveLiveToolResultMaxChars: () => 20_000,
recordCliCompactionInStore,
});
await runCliTurnCompactionLifecycle({
cfg: {} as OpenClawConfig,
sessionId,
sessionKey,
sessionEntry,
sessionStore,
storePath,
sessionAgentId: "main",
workspaceDir: tmpDir,
agentDir: tmpDir,
provider: "codex",
model: "gpt-5.5",
});
expect(compactAgentHarnessSession).toHaveBeenCalledTimes(1);
expect(recordCliCompactionInStore).toHaveBeenCalledWith(
expect.objectContaining({
provider: "codex",
sessionKey,
tokensAfter: 42,
newSessionId: "session-codex-owned-engine-rotated",
newSessionFile: path.join(tmpDir, "session-codex-owned-engine-rotated.jsonl"),
}),
);
});
it("falls back to context-engine compaction when a pinned harness has no native compactor", async () => {
const sessionKey = "agent:main:external-harness";
const sessionId = "session-external-harness";
const sessionFile = path.join(tmpDir, "session-external-harness.jsonl");
const storePath = path.join(tmpDir, "sessions-external-harness.json");
await writeSessionFile({ sessionFile, sessionId });
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 950,
totalTokensFresh: true,
agentHarnessId: "external-harness",
};
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8");
const compactCalls: Array<Parameters<ContextEngine["compact"]>[0]> = [];
const ensureSelectedAgentHarnessPlugin = vi.fn(async () => undefined);
const compactAgentHarnessSession = vi.fn(async () => ({
ok: false,
compacted: false,
reason: 'Agent harness "external-harness" does not support compaction.',
failure: { reason: "unsupported_harness_compaction" },
}));
const maintenance = vi.fn(async () => ({ changed: false, bytesFreed: 0, rewrittenEntries: 0 }));
const recordCliCompactionInStore = vi.fn(async () => ({
...sessionEntry,
compactionCount: 1,
}));
setCliCompactionTestDeps({
resolveContextEngine: async () => buildContextEngine({ compactCalls }),
ensureSelectedAgentHarnessPlugin,
maybeCompactAgentHarnessSession: compactAgentHarnessSession as never,
createPreparedEmbeddedPiSettingsManager: async () => ({
getCompactionReserveTokens: () => 200,
getCompactionKeepRecentTokens: () => 0,
applyOverrides: () => {},
}),
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 600,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
effectiveReserveTokens: 200,
}),
resolveLiveToolResultMaxChars: () => 20_000,
runContextEngineMaintenance: maintenance,
recordCliCompactionInStore,
});
const updatedEntry = await runCliTurnCompactionLifecycle({
cfg: {} as OpenClawConfig,
sessionId,
sessionKey,
sessionEntry,
sessionStore,
storePath,
sessionAgentId: "main",
workspaceDir: tmpDir,
agentDir: tmpDir,
provider: "external-harness",
model: "model",
});
expect(compactAgentHarnessSession).toHaveBeenCalledTimes(1);
expect(compactCalls).toHaveLength(1);
expect(maintenance).toHaveBeenCalledTimes(1);
expect(recordCliCompactionInStore).toHaveBeenCalledWith(
expect.objectContaining({
provider: "external-harness",
sessionKey,
tokensAfter: undefined,
}),
);
expect(updatedEntry?.compactionCount).toBe(1);
});
it("falls back to context-engine compaction when Codex native binding is stale", async () => {
const sessionKey = "agent:main:codex-stale-binding";
const sessionId = "session-codex-stale-binding";
const sessionFile = path.join(tmpDir, "session-codex-stale-binding.jsonl");
const storePath = path.join(tmpDir, "sessions-codex-stale-binding.json");
await writeSessionFile({ sessionFile, sessionId });
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 950,
totalTokensFresh: true,
agentHarnessId: "codex",
};
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8");
const compactCalls: Array<Parameters<ContextEngine["compact"]>[0]> = [];
const ensureSelectedAgentHarnessPlugin = vi.fn(async () => undefined);
const compactAgentHarnessSession = vi.fn(async () => ({
ok: false,
compacted: false,
reason: "thread not found: thread-1",
failure: {
reason: "stale_thread_binding",
},
}));
const maintenance = vi.fn(async () => ({ changed: false, bytesFreed: 0, rewrittenEntries: 0 }));
const recordCliCompactionInStore = vi.fn(async () => ({
...sessionEntry,
compactionCount: 1,
}));
setCliCompactionTestDeps({
resolveContextEngine: async () => buildContextEngine({ compactCalls }),
ensureSelectedAgentHarnessPlugin,
maybeCompactAgentHarnessSession: compactAgentHarnessSession as never,
createPreparedEmbeddedPiSettingsManager: async () => ({
getCompactionReserveTokens: () => 200,
getCompactionKeepRecentTokens: () => 0,
applyOverrides: () => {},
}),
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 600,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
effectiveReserveTokens: 200,
}),
resolveLiveToolResultMaxChars: () => 20_000,
runContextEngineMaintenance: maintenance,
recordCliCompactionInStore,
});
const updatedEntry = await runCliTurnCompactionLifecycle({
cfg: {} as OpenClawConfig,
sessionId,
sessionKey,
sessionEntry,
sessionStore,
storePath,
sessionAgentId: "main",
workspaceDir: tmpDir,
agentDir: tmpDir,
provider: "codex",
model: "gpt-5.5",
});
expect(compactAgentHarnessSession).toHaveBeenCalledTimes(1);
expect(compactCalls).toHaveLength(1);
expect(maintenance).toHaveBeenCalledTimes(1);
expect(recordCliCompactionInStore).toHaveBeenCalledWith(
expect.objectContaining({
provider: "codex",
sessionKey,
tokensAfter: undefined,
}),
);
expect(updatedEntry?.compactionCount).toBe(1);
});
it("keeps successful context-engine fallback when post-compaction maintenance fails", async () => {
const sessionKey = "agent:main:codex-stale-maintenance";
const sessionId = "session-codex-stale-maintenance";
const sessionFile = path.join(tmpDir, "session-codex-stale-maintenance.jsonl");
const storePath = path.join(tmpDir, "sessions-codex-stale-maintenance.json");
await writeSessionFile({ sessionFile, sessionId });
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 950,
totalTokensFresh: true,
agentHarnessId: "codex",
};
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf-8");
const compactCalls: Array<Parameters<ContextEngine["compact"]>[0]> = [];
const maintenance = vi.fn(async () => {
throw new Error("maintenance rotated stale binding");
});
const recordCliCompactionInStore = vi.fn(async () => ({
...sessionEntry,
compactionCount: 1,
}));
setCliCompactionTestDeps({
resolveContextEngine: async () => buildContextEngine({ compactCalls }),
ensureSelectedAgentHarnessPlugin: vi.fn(async () => undefined),
maybeCompactAgentHarnessSession: vi.fn(async () => ({
ok: false,
compacted: false,
reason: "thread not found: thread-1",
failure: { reason: "stale_thread_binding" },
})) as never,
createPreparedEmbeddedPiSettingsManager: async () => ({
getCompactionReserveTokens: () => 200,
getCompactionKeepRecentTokens: () => 0,
applyOverrides: () => {},
}),
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 600,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
effectiveReserveTokens: 200,
}),
resolveLiveToolResultMaxChars: () => 20_000,
runContextEngineMaintenance: maintenance,
recordCliCompactionInStore,
});
const updatedEntry = await runCliTurnCompactionLifecycle({
cfg: {} as OpenClawConfig,
sessionId,
sessionKey,
sessionEntry,
sessionStore,
storePath,
sessionAgentId: "main",
workspaceDir: tmpDir,
agentDir: tmpDir,
provider: "codex",
model: "gpt-5.5",
});
expect(compactCalls).toHaveLength(1);
expect(maintenance).toHaveBeenCalledTimes(1);
expect(recordCliCompactionInStore).toHaveBeenCalledWith(
expect.objectContaining({ provider: "codex", sessionKey }),
);
expect(updatedEntry?.compactionCount).toBe(1);
});
it("initializes built-in context engines before resolving CLI compaction engine", async () => {
const sessionKey = "agent:main:cli";
const sessionId = "session-cli-init";
@@ -188,7 +847,7 @@ describe("runCliTurnCompactionLifecycle", () => {
updatedAt: Date.now(),
sessionFile,
contextTokens: 1_000,
totalTokens: 100,
totalTokens: 950,
totalTokensFresh: true,
};
const calls: string[] = [];
@@ -208,7 +867,7 @@ describe("runCliTurnCompactionLifecycle", () => {
shouldPreemptivelyCompactBeforePrompt: () => ({
route: "fits",
shouldCompact: false,
estimatedPromptTokens: 100,
estimatedPromptTokens: 600,
promptBudgetBeforeReserve: 800,
overflowTokens: 0,
toolResultReducibleChars: 0,
@@ -302,8 +961,11 @@ describe("runCliTurnCompactionLifecycle", () => {
model: "opus",
});
const rejection = expect(pending).rejects.toThrow(
"CLI transcript compaction failed for claude-cli/opus: Compaction timed out",
);
await vi.advanceTimersByTimeAsync(1_000);
const updatedEntry = await pending;
await rejection;
vi.useRealTimers();
expect(compactCalls).toHaveLength(1);
@@ -311,7 +973,8 @@ describe("runCliTurnCompactionLifecycle", () => {
expect(compactCalls[0]?.abortSignal?.aborted).toBe(true);
expect(maintenance).not.toHaveBeenCalled();
expect(recordCliCompactionInStore).not.toHaveBeenCalled();
expect(updatedEntry).toBe(sessionEntry);
expect(updatedEntry?.cliSessionBindings?.["claude-cli"]?.sessionId).toBe("claude-session");
expect(sessionStore[sessionKey]?.cliSessionBindings?.["claude-cli"]?.sessionId).toBe(
"claude-session",
);
});
});

View File

@@ -7,14 +7,18 @@ import { ensureContextEnginesInitialized as ensureContextEnginesInitializedImpl
import { resolveContextEngine as resolveContextEngineImpl } from "../../context-engine/registry.js";
import type { ContextEngine } from "../../context-engine/types.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { ensureSelectedAgentHarnessPlugin as ensureSelectedAgentHarnessPluginImpl } from "../harness/runtime-plugin.js";
import { maybeCompactAgentHarnessSession as maybeCompactAgentHarnessSessionImpl } from "../harness/selection.js";
import { buildEmbeddedCompactionRuntimeContext } from "../pi-embedded-runner/compaction-runtime-context.js";
import {
compactContextEngineWithSafetyTimeout,
compactWithSafetyTimeout,
resolveCompactionTimeoutMs,
} from "../pi-embedded-runner/compaction-safety-timeout.js";
import { runContextEngineMaintenance as runContextEngineMaintenanceImpl } from "../pi-embedded-runner/context-engine-maintenance.js";
import { shouldPreemptivelyCompactBeforePrompt as shouldPreemptivelyCompactBeforePromptImpl } from "../pi-embedded-runner/run/preemptive-compaction.js";
import { resolveLiveToolResultMaxChars as resolveLiveToolResultMaxCharsImpl } from "../pi-embedded-runner/tool-result-truncation.js";
import type { EmbeddedPiCompactResult } from "../pi-embedded-runner/types.js";
import { createPreparedEmbeddedPiSettingsManager as createPreparedEmbeddedPiSettingsManagerImpl } from "../pi-project-settings.js";
import {
applyPiAutoCompactionGuard as applyPiAutoCompactionGuardImpl,
@@ -53,9 +57,39 @@ type CliCompactionDeps = {
shouldPreemptivelyCompactBeforePrompt: typeof shouldPreemptivelyCompactBeforePromptImpl;
resolveLiveToolResultMaxChars: typeof resolveLiveToolResultMaxCharsImpl;
runContextEngineMaintenance: typeof runContextEngineMaintenanceImpl;
ensureSelectedAgentHarnessPlugin: typeof ensureSelectedAgentHarnessPluginImpl;
maybeCompactAgentHarnessSession: typeof maybeCompactAgentHarnessSessionImpl;
recordCliCompactionInStore: typeof recordCliCompactionInStoreImpl;
};
type NativeHarnessCliCompactionOutcome = {
compacted: boolean;
result?: EmbeddedPiCompactResult;
fallbackToContextEngine?: boolean;
failureReason?: string;
};
type CliTranscriptCompactionOutcome = {
compacted: boolean;
failureReason?: string;
};
type CliCompactionRuntimeContextParams = {
sessionKey: string;
messageChannel?: string;
agentAccountId?: string;
workspaceDir: string;
agentDir: string;
cfg: OpenClawConfig;
skillsSnapshot?: SkillSnapshot;
senderIsOwner?: boolean;
provider: string;
model: string;
thinkLevel?: Parameters<typeof buildEmbeddedCompactionRuntimeContext>[0]["thinkLevel"];
extraSystemPrompt?: string;
currentTokenCount: number;
contextTokenBudget: number;
trigger: string;
};
const log = createSubsystemLogger("agents/cli-compaction");
const cliCompactionDeps: CliCompactionDeps = {
@@ -67,6 +101,8 @@ const cliCompactionDeps: CliCompactionDeps = {
shouldPreemptivelyCompactBeforePrompt: shouldPreemptivelyCompactBeforePromptImpl,
resolveLiveToolResultMaxChars: resolveLiveToolResultMaxCharsImpl,
runContextEngineMaintenance: runContextEngineMaintenanceImpl,
ensureSelectedAgentHarnessPlugin: ensureSelectedAgentHarnessPluginImpl,
maybeCompactAgentHarnessSession: maybeCompactAgentHarnessSessionImpl,
recordCliCompactionInStore: recordCliCompactionInStoreImpl,
};
@@ -84,6 +120,8 @@ export function resetCliCompactionTestDeps(): void {
shouldPreemptivelyCompactBeforePrompt: shouldPreemptivelyCompactBeforePromptImpl,
resolveLiveToolResultMaxChars: resolveLiveToolResultMaxCharsImpl,
runContextEngineMaintenance: runContextEngineMaintenanceImpl,
ensureSelectedAgentHarnessPlugin: ensureSelectedAgentHarnessPluginImpl,
maybeCompactAgentHarnessSession: maybeCompactAgentHarnessSessionImpl,
recordCliCompactionInStore: recordCliCompactionInStoreImpl,
});
}
@@ -111,6 +149,67 @@ function resolveSessionTokenSnapshot(sessionEntry: SessionEntry | undefined): nu
);
}
function isNativeHarnessCompactionSession(
sessionEntry: SessionEntry | undefined,
provider: string,
): sessionEntry is SessionEntry {
const harnessId = sessionEntry?.agentHarnessId?.trim().toLowerCase();
if (!harnessId || harnessId === "pi") {
return false;
}
const providerId = provider.trim().toLowerCase();
return (
harnessId === providerId ||
(harnessId === "codex" &&
(providerId === "codex" || providerId === "openai" || providerId === "openai-codex"))
);
}
function isUnsupportedNativeHarnessCompaction(
result: EmbeddedPiCompactResult | undefined,
): boolean {
return result?.ok === false && result.failure?.reason === "unsupported_harness_compaction";
}
function isRecoverableNativeHarnessCompactionFailure(
result: EmbeddedPiCompactResult | undefined,
): boolean {
return (
result?.ok === false &&
(result.failure?.reason === "missing_thread_binding" ||
result.failure?.reason === "stale_thread_binding")
);
}
function readAgentIdFromSessionKey(sessionKey: string): string | undefined {
const parts = sessionKey.trim().split(":");
return parts[0] === "agent" && parts[1]?.trim() ? parts[1].trim() : undefined;
}
function buildCliCompactionRuntimeContext(params: CliCompactionRuntimeContextParams) {
return {
...buildEmbeddedCompactionRuntimeContext({
sessionKey: params.sessionKey,
messageChannel: params.messageChannel,
messageProvider: params.messageChannel,
agentAccountId: params.agentAccountId,
authProfileId: undefined,
workspaceDir: params.workspaceDir,
agentDir: params.agentDir,
config: params.cfg,
skillsSnapshot: params.skillsSnapshot,
senderIsOwner: params.senderIsOwner,
provider: params.provider,
modelId: params.model,
thinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
}),
currentTokenCount: params.currentTokenCount,
tokenBudget: params.contextTokenBudget,
trigger: params.trigger,
};
}
async function compactCliTranscript(params: {
contextEngine: ContextEngine;
sessionId: string;
@@ -127,29 +226,28 @@ async function compactCliTranscript(params: {
skillsSnapshot?: SkillSnapshot;
messageChannel?: string;
agentAccountId?: string;
senderIsOwner?: boolean;
thinkLevel?: Parameters<typeof buildEmbeddedCompactionRuntimeContext>[0]["thinkLevel"];
extraSystemPrompt?: string;
}) {
const runtimeContext = {
...buildEmbeddedCompactionRuntimeContext({
sessionKey: params.sessionKey,
messageChannel: params.messageChannel,
messageProvider: params.messageChannel,
agentAccountId: params.agentAccountId,
authProfileId: undefined,
workspaceDir: params.workspaceDir,
agentDir: params.agentDir,
config: params.cfg,
skillsSnapshot: params.skillsSnapshot,
provider: params.provider,
modelId: params.model,
thinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
}),
bestEffortMaintenance?: boolean;
}): Promise<CliTranscriptCompactionOutcome> {
const runtimeContext = buildCliCompactionRuntimeContext({
sessionKey: params.sessionKey,
messageChannel: params.messageChannel,
agentAccountId: params.agentAccountId,
workspaceDir: params.workspaceDir,
agentDir: params.agentDir,
cfg: params.cfg,
skillsSnapshot: params.skillsSnapshot,
senderIsOwner: params.senderIsOwner,
provider: params.provider,
model: params.model,
thinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
currentTokenCount: params.currentTokenCount,
tokenBudget: params.contextTokenBudget,
contextTokenBudget: params.contextTokenBudget,
trigger: "cli_budget",
};
});
let compactResult: Awaited<ReturnType<typeof params.contextEngine.compact>>;
try {
@@ -171,27 +269,151 @@ async function compactCliTranscript(params: {
log.warn(
`CLI transcript compaction failed for ${params.provider}/${params.model}: ${error instanceof Error ? error.message : String(error)}`,
);
return false;
return {
compacted: false,
failureReason: error instanceof Error ? error.message : String(error),
};
}
if (!compactResult.compacted) {
log.warn(
`CLI transcript compaction did not reduce context for ${params.provider}/${params.model}: ${compactResult.reason ?? "nothing to compact"}`,
);
return false;
return {
compacted: false,
failureReason: compactResult.reason ?? "compaction did not reduce context",
};
}
await cliCompactionDeps.runContextEngineMaintenance({
contextEngine: params.contextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
reason: "compaction",
sessionManager: params.sessionManager,
runtimeContext,
config: params.cfg,
});
return true;
try {
await cliCompactionDeps.runContextEngineMaintenance({
contextEngine: params.contextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
reason: "compaction",
sessionManager: params.sessionManager,
runtimeContext,
config: params.cfg,
});
} catch (error) {
if (!params.bestEffortMaintenance) {
throw error;
}
log.warn(
`CLI transcript compaction maintenance failed after fallback for ${params.provider}/${params.model}: ${error instanceof Error ? error.message : String(error)}`,
);
}
return { compacted: true };
}
async function compactNativeHarnessCliTranscript(params: {
cfg: OpenClawConfig;
sessionId: string;
sessionKey: string;
sessionFile: string;
sessionEntry: SessionEntry;
workspaceDir: string;
agentDir: string;
provider: string;
model: string;
contextTokenBudget: number;
currentTokenCount: number;
contextEngine?: ContextEngine;
skillsSnapshot?: SkillSnapshot;
messageChannel?: string;
agentAccountId?: string;
senderIsOwner?: boolean;
thinkLevel?: Parameters<typeof buildEmbeddedCompactionRuntimeContext>[0]["thinkLevel"];
extraSystemPrompt?: string;
}): Promise<NativeHarnessCliCompactionOutcome> {
let result: EmbeddedPiCompactResult | undefined;
try {
const sessionAgentId = readAgentIdFromSessionKey(params.sessionKey);
const nativeHarnessId = params.sessionEntry.agentHarnessId?.trim();
await cliCompactionDeps.ensureSelectedAgentHarnessPlugin({
provider: params.provider,
modelId: params.model,
config: params.cfg,
sessionKey: params.sessionKey,
workspaceDir: params.workspaceDir,
...(sessionAgentId ? { agentId: sessionAgentId } : {}),
...(nativeHarnessId ? { agentHarnessRuntimeOverride: nativeHarnessId } : {}),
});
result = await compactWithSafetyTimeout(
(abortSignal) =>
cliCompactionDeps.maybeCompactAgentHarnessSession({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
workspaceDir: params.workspaceDir,
agentDir: params.agentDir,
config: params.cfg,
skillsSnapshot: params.skillsSnapshot,
provider: params.provider,
model: params.model,
contextTokenBudget: params.contextTokenBudget,
currentTokenCount: params.currentTokenCount,
trigger: "budget",
force: true,
messageChannel: params.messageChannel,
agentAccountId: params.agentAccountId,
senderIsOwner: params.senderIsOwner,
thinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
allowGatewaySubagentBinding: true,
...(params.contextEngine
? {
contextEngine: params.contextEngine,
contextEngineRuntimeContext: buildCliCompactionRuntimeContext({
sessionKey: params.sessionKey,
messageChannel: params.messageChannel,
agentAccountId: params.agentAccountId,
workspaceDir: params.workspaceDir,
agentDir: params.agentDir,
cfg: params.cfg,
skillsSnapshot: params.skillsSnapshot,
senderIsOwner: params.senderIsOwner,
provider: params.provider,
model: params.model,
thinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
currentTokenCount: params.currentTokenCount,
contextTokenBudget: params.contextTokenBudget,
trigger: "cli_native_budget",
}),
}
: {}),
...(nativeHarnessId ? { agentHarnessId: nativeHarnessId } : {}),
...(abortSignal ? { abortSignal } : {}),
}),
resolveCompactionTimeoutMs(params.cfg),
);
} catch (error) {
log.warn(
`CLI native harness compaction failed for ${params.provider}/${params.model}: ${error instanceof Error ? error.message : String(error)}`,
);
return {
compacted: false,
failureReason: error instanceof Error ? error.message : String(error),
};
}
if (!result?.compacted) {
const fallbackToContextEngine =
isUnsupportedNativeHarnessCompaction(result) ||
isRecoverableNativeHarnessCompactionFailure(result);
log.warn(
`CLI native harness compaction did not reduce context for ${params.provider}/${params.model}: ${result?.reason ?? "nothing to compact"}`,
);
return {
compacted: false,
fallbackToContextEngine,
failureReason: result?.reason ?? "native harness compaction did not reduce context",
};
}
return { compacted: true, result };
}
export async function runCliTurnCompactionLifecycle(params: {
@@ -209,6 +431,7 @@ export async function runCliTurnCompactionLifecycle(params: {
skillsSnapshot?: SkillSnapshot;
messageChannel?: string;
agentAccountId?: string;
senderIsOwner?: boolean;
thinkLevel?: Parameters<typeof buildEmbeddedCompactionRuntimeContext>[0]["thinkLevel"];
extraSystemPrompt?: string;
}): Promise<SessionEntry | undefined> {
@@ -218,8 +441,6 @@ export async function runCliTurnCompactionLifecycle(params: {
return params.sessionEntry;
}
cliCompactionDeps.ensureContextEnginesInitialized();
const contextEngine = await cliCompactionDeps.resolveContextEngine(params.cfg);
const sessionManager = cliCompactionDeps.openSessionManager(sessionFile);
const settingsManager = await cliCompactionDeps.createPreparedEmbeddedPiSettingsManager({
cwd: params.workspaceDir,
@@ -227,11 +448,6 @@ export async function runCliTurnCompactionLifecycle(params: {
cfg: params.cfg,
contextTokenBudget,
});
await cliCompactionDeps.applyPiAutoCompactionGuard({
settingsManager,
contextEngineInfo: contextEngine.info,
compactionMode: resolveEffectiveCompactionMode(params.cfg),
});
const preemptiveCompaction = cliCompactionDeps.shouldPreemptivelyCompactBeforePrompt({
messages: getSessionBranchMessages(sessionManager),
@@ -256,25 +472,101 @@ export async function runCliTurnCompactionLifecycle(params: {
return params.sessionEntry;
}
const compacted = await compactCliTranscript({
contextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile,
sessionManager,
cfg: params.cfg,
workspaceDir: params.workspaceDir,
agentDir: params.agentDir,
provider: params.provider,
model: params.model,
contextTokenBudget,
currentTokenCount,
skillsSnapshot: params.skillsSnapshot,
messageChannel: params.messageChannel,
agentAccountId: params.agentAccountId,
thinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
});
let compacted = false;
let nativeCompactionResult: EmbeddedPiCompactResult | undefined;
let useContextEngineCompaction = true;
let nativeFallbackToContextEngine = false;
let resolvedContextEngine: ContextEngine | undefined;
let autoCompactionGuardApplied = false;
const applyAutoCompactionGuard = async (contextEngine: ContextEngine): Promise<void> => {
if (autoCompactionGuardApplied) {
return;
}
autoCompactionGuardApplied = true;
await cliCompactionDeps.applyPiAutoCompactionGuard({
settingsManager,
contextEngineInfo: contextEngine.info,
compactionMode: resolveEffectiveCompactionMode(params.cfg),
});
};
if (isNativeHarnessCompactionSession(params.sessionEntry, params.provider)) {
cliCompactionDeps.ensureContextEnginesInitialized();
resolvedContextEngine = await cliCompactionDeps.resolveContextEngine(params.cfg);
await applyAutoCompactionGuard(resolvedContextEngine);
const nativeOutcome = await compactNativeHarnessCliTranscript({
cfg: params.cfg,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile,
sessionEntry: params.sessionEntry,
workspaceDir: params.workspaceDir,
agentDir: params.agentDir,
provider: params.provider,
model: params.model,
contextTokenBudget,
currentTokenCount,
contextEngine: resolvedContextEngine,
skillsSnapshot: params.skillsSnapshot,
messageChannel: params.messageChannel,
agentAccountId: params.agentAccountId,
senderIsOwner: params.senderIsOwner,
thinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
});
if (nativeOutcome.compacted) {
compacted = true;
nativeCompactionResult = nativeOutcome.result;
useContextEngineCompaction = false;
} else if (!nativeOutcome.fallbackToContextEngine) {
throw new Error(
`CLI native harness compaction failed for ${params.provider}/${params.model}: ${
nativeOutcome.failureReason ?? "compaction did not reduce context"
}`,
);
} else {
nativeFallbackToContextEngine = true;
}
}
if (useContextEngineCompaction) {
if (!resolvedContextEngine) {
cliCompactionDeps.ensureContextEnginesInitialized();
resolvedContextEngine = await cliCompactionDeps.resolveContextEngine(params.cfg);
}
const contextEngine = resolvedContextEngine;
await applyAutoCompactionGuard(contextEngine);
const contextOutcome = await compactCliTranscript({
contextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile,
sessionManager,
cfg: params.cfg,
workspaceDir: params.workspaceDir,
agentDir: params.agentDir,
provider: params.provider,
model: params.model,
contextTokenBudget,
currentTokenCount,
skillsSnapshot: params.skillsSnapshot,
messageChannel: params.messageChannel,
agentAccountId: params.agentAccountId,
senderIsOwner: params.senderIsOwner,
thinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
bestEffortMaintenance: nativeFallbackToContextEngine,
});
compacted = contextOutcome.compacted;
if (!compacted) {
throw new Error(
`CLI transcript compaction failed for ${params.provider}/${params.model}: ${
contextOutcome.failureReason ?? "compaction did not reduce context"
}`,
);
}
}
if (!compacted || !params.sessionStore || !params.storePath) {
return params.sessionEntry;
@@ -286,6 +578,9 @@ export async function runCliTurnCompactionLifecycle(params: {
sessionKey: params.sessionKey,
sessionStore: params.sessionStore,
storePath: params.storePath,
tokensAfter: nativeCompactionResult?.result?.tokensAfter,
newSessionId: nativeCompactionResult?.result?.sessionId,
newSessionFile: nativeCompactionResult?.result?.sessionFile,
})) ?? params.sessionEntry
);
}

View File

@@ -6,7 +6,11 @@ import type { OpenClawConfig } from "../../config/config.js";
import type { SessionEntry } from "../../config/sessions.js";
import { loadSessionStore } from "../../config/sessions.js";
import type { EmbeddedPiRunResult } from "../pi-embedded.js";
import { clearCliSessionInStore, updateSessionStoreAfterAgentRun } from "./session-store.js";
import {
clearCliSessionInStore,
recordCliCompactionInStore,
updateSessionStoreAfterAgentRun,
} from "./session-store.js";
import { resolveSession } from "./session.js";
vi.mock("../model-selection.js", () => ({
@@ -118,6 +122,15 @@ vi.mock("../../config/sessions.js", async () => {
return {};
}
},
canonicalizeAbsoluteSessionFilePath: (filePath: string) => path.resolve(filePath),
rewriteSessionFileForNewSessionId: (params: {
sessionFile?: string;
previousSessionId: string;
nextSessionId: string;
}) => params.sessionFile?.replace(params.previousSessionId, params.nextSessionId),
resolveSessionFilePathOptions: (params: unknown) => params,
resolveSessionFilePath: (sessionId: string, entry?: SessionEntry) =>
entry?.sessionFile ?? path.join("/tmp", `${sessionId}.jsonl`),
};
});
@@ -789,6 +802,49 @@ describe("updateSessionStoreAfterAgentRun", () => {
});
});
it("accepts zero compaction tokensAfter when provider usage is unavailable", async () => {
await withTempSessionStore(async ({ storePath }) => {
const cfg = {} as OpenClawConfig;
const sessionKey = "agent:main:explicit:test-zero-compaction-tokens-after";
const sessionId = "test-zero-compaction-tokens-after-session";
const sessionStore: Record<string, SessionEntry> = {
[sessionKey]: {
sessionId,
updatedAt: 1,
totalTokens: 12_000,
totalTokensFresh: true,
},
};
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
await updateSessionStoreAfterAgentRun({
cfg,
sessionId,
sessionKey,
storePath,
sessionStore,
defaultProvider: "minimax",
defaultModel: "MiniMax-M2.7",
result: {
meta: {
durationMs: 500,
agentMeta: {
sessionId,
provider: "minimax",
model: "MiniMax-M2.7",
compactionCount: 1,
compactionTokensAfter: 0,
},
},
} as EmbeddedPiRunResult,
});
expect(sessionStore[sessionKey]?.totalTokens).toBe(0);
expect(sessionStore[sessionKey]?.totalTokensFresh).toBe(true);
expect(sessionStore[sessionKey]?.compactionCount).toBe(1);
});
});
it("ignores non-finite compaction tokensAfter values", async () => {
await withTempSessionStore(async ({ storePath }) => {
const cfg = {} as OpenClawConfig;
@@ -1242,6 +1298,130 @@ describe("updateSessionStoreAfterAgentRun", () => {
});
});
describe("recordCliCompactionInStore", () => {
it("persists native compaction token counts and clears stale CLI usage breakdown", async () => {
await withTempSessionStore(async ({ storePath }) => {
const sessionKey = "agent:main:explicit:test-record-cli-compaction";
const sessionId = "test-record-cli-compaction-session";
const sessionStore: Record<string, SessionEntry> = {
[sessionKey]: {
sessionId,
updatedAt: 1,
totalTokens: 12_000,
totalTokensFresh: true,
inputTokens: 9_000,
outputTokens: 100,
cacheRead: 2_900,
cacheWrite: 0,
cliSessionBindings: {
codex: {
sessionId: "stale-cli-session",
},
},
cliSessionIds: {
codex: "stale-cli-session",
},
},
};
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
await recordCliCompactionInStore({
provider: "codex",
sessionKey,
sessionStore,
storePath,
tokensAfter: 0,
});
const persisted = loadSessionStore(storePath);
expect(sessionStore[sessionKey]?.compactionCount).toBe(1);
expect(sessionStore[sessionKey]?.totalTokens).toBe(0);
expect(sessionStore[sessionKey]?.totalTokensFresh).toBe(true);
expect(sessionStore[sessionKey]?.inputTokens).toBeUndefined();
expect(sessionStore[sessionKey]?.outputTokens).toBeUndefined();
expect(sessionStore[sessionKey]?.cacheRead).toBeUndefined();
expect(sessionStore[sessionKey]?.cacheWrite).toBeUndefined();
expect(sessionStore[sessionKey]?.cliSessionBindings?.codex).toBeUndefined();
expect(sessionStore[sessionKey]?.cliSessionIds?.codex).toBeUndefined();
expect(persisted[sessionKey]?.totalTokens).toBe(0);
expect(persisted[sessionKey]?.totalTokensFresh).toBe(true);
});
});
it("marks CLI token counts stale when native compaction returns no token count", async () => {
await withTempSessionStore(async ({ storePath }) => {
const sessionKey = "agent:main:explicit:test-record-cli-compaction-unknown";
const sessionId = "test-record-cli-compaction-unknown-session";
const sessionStore: Record<string, SessionEntry> = {
[sessionKey]: {
sessionId,
updatedAt: 1,
totalTokens: 37_000,
totalTokensFresh: true,
inputTokens: 30_000,
outputTokens: 100,
cacheRead: 6_900,
cacheWrite: 0,
},
};
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
await recordCliCompactionInStore({
provider: "codex",
sessionKey,
sessionStore,
storePath,
});
const persisted = loadSessionStore(storePath);
expect(sessionStore[sessionKey]?.compactionCount).toBe(1);
expect(sessionStore[sessionKey]?.totalTokens).toBe(37_000);
expect(sessionStore[sessionKey]?.totalTokensFresh).toBe(false);
expect(sessionStore[sessionKey]?.inputTokens).toBeUndefined();
expect(sessionStore[sessionKey]?.outputTokens).toBeUndefined();
expect(sessionStore[sessionKey]?.cacheRead).toBeUndefined();
expect(sessionStore[sessionKey]?.cacheWrite).toBeUndefined();
expect(persisted[sessionKey]?.totalTokens).toBe(37_000);
expect(persisted[sessionKey]?.totalTokensFresh).toBe(false);
});
});
it("persists successor session handles from native CLI compaction", async () => {
await withTempSessionStore(async ({ dir, storePath }) => {
const sessionKey = "agent:main:explicit:test-record-cli-compaction-rotate";
const sessionId = "test-record-cli-compaction-rotate-session";
const nextSessionId = "test-record-cli-compaction-rotate-next";
const nextSessionFile = path.join(dir, `${nextSessionId}.jsonl`);
const sessionStore: Record<string, SessionEntry> = {
[sessionKey]: {
sessionId,
updatedAt: 1,
sessionFile: path.join(dir, `${sessionId}.jsonl`),
},
};
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
await recordCliCompactionInStore({
provider: "codex",
sessionKey,
sessionStore,
storePath,
newSessionId: nextSessionId,
newSessionFile: nextSessionFile,
});
expect(sessionStore[sessionKey]?.sessionId).toBe(nextSessionId);
expect(sessionStore[sessionKey]?.sessionFile).toBe(nextSessionFile);
expect(sessionStore[sessionKey]?.usageFamilyKey).toBe(sessionKey);
expect(sessionStore[sessionKey]?.usageFamilySessionIds).toEqual([sessionId, nextSessionId]);
const persisted = loadSessionStore(storePath);
expect(persisted[sessionKey]?.sessionId).toBe(nextSessionId);
expect(persisted[sessionKey]?.sessionFile).toBe(nextSessionFile);
});
});
});
describe("clearCliSessionInStore", () => {
it("persists cleared Claude CLI bindings through session-store merge", async () => {
await withTempSessionStore(async ({ storePath }) => {

View File

@@ -1,10 +1,16 @@
import path from "node:path";
import {
canonicalizeAbsoluteSessionFilePath,
mergeSessionEntry,
resolveSessionFilePath,
resolveSessionFilePathOptions,
setSessionRuntimeModel,
type SessionEntry,
updateSessionStore,
rewriteSessionFileForNewSessionId,
} from "../../config/sessions.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { clearCliSession, setCliSessionBinding, setCliSessionId } from "../cli-session.js";
@@ -87,7 +93,7 @@ export async function updateSessionStoreAfterAgentRun(params: {
const compactionTokensAfter =
typeof result.meta.agentMeta?.compactionTokensAfter === "number" &&
Number.isFinite(result.meta.agentMeta.compactionTokensAfter) &&
result.meta.agentMeta.compactionTokensAfter > 0
result.meta.agentMeta.compactionTokensAfter >= 0
? Math.floor(result.meta.agentMeta.compactionTokensAfter)
: undefined;
const compactionsThisRun = Math.max(0, result.meta.agentMeta?.compactionCount ?? 0);
@@ -271,6 +277,9 @@ export async function recordCliCompactionInStore(params: {
sessionKey: string;
sessionStore: Record<string, SessionEntry>;
storePath: string;
tokensAfter?: number;
newSessionId?: string;
newSessionFile?: string;
}): Promise<SessionEntry | undefined> {
const { provider, sessionKey, sessionStore, storePath } = params;
const entry = sessionStore[sessionKey];
@@ -282,6 +291,44 @@ export async function recordCliCompactionInStore(params: {
clearCliSession(next, provider);
next.compactionCount = (entry.compactionCount ?? 0) + 1;
next.updatedAt = Date.now();
const newSessionId = normalizeOptionalString(params.newSessionId);
const explicitNewSessionFile = normalizeOptionalString(params.newSessionFile);
const sessionIdChanged = Boolean(newSessionId && newSessionId !== entry.sessionId);
const sessionFileChanged = Boolean(
explicitNewSessionFile && explicitNewSessionFile !== entry.sessionFile,
);
if (sessionIdChanged && newSessionId) {
next.sessionId = newSessionId;
next.sessionFile =
explicitNewSessionFile ??
resolveCompactionSessionFile({
entry,
sessionKey,
storePath,
newSessionId,
});
next.usageFamilyKey = entry.usageFamilyKey ?? sessionKey;
next.usageFamilySessionIds = Array.from(
new Set([...(entry.usageFamilySessionIds ?? []), entry.sessionId, newSessionId]),
);
} else if (sessionFileChanged && explicitNewSessionFile) {
next.sessionFile = explicitNewSessionFile;
}
const tokensAfterCompaction = resolveNonNegativeNumber(params.tokensAfter);
if (tokensAfterCompaction !== undefined) {
next.totalTokens = Math.floor(tokensAfterCompaction);
next.totalTokensFresh = true;
next.inputTokens = undefined;
next.outputTokens = undefined;
next.cacheRead = undefined;
next.cacheWrite = undefined;
} else {
next.totalTokensFresh = false;
next.inputTokens = undefined;
next.outputTokens = undefined;
next.cacheRead = undefined;
next.cacheWrite = undefined;
}
const persisted = await updateSessionStore(storePath, (store) => {
const merged = mergeSessionEntry(store[sessionKey], next);
@@ -291,3 +338,30 @@ export async function recordCliCompactionInStore(params: {
sessionStore[sessionKey] = persisted;
return persisted;
}
function resolveCompactionSessionFile(params: {
entry: SessionEntry;
sessionKey: string;
storePath?: string;
newSessionId: string;
}): string {
const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
const pathOpts = resolveSessionFilePathOptions({
agentId,
storePath: params.storePath,
});
const rewrittenSessionFile = rewriteSessionFileForNewSessionId({
sessionFile: params.entry.sessionFile,
previousSessionId: params.entry.sessionId,
nextSessionId: params.newSessionId,
});
const normalizedRewrittenSessionFile =
rewrittenSessionFile && path.isAbsolute(rewrittenSessionFile)
? canonicalizeAbsoluteSessionFilePath(rewrittenSessionFile)
: rewrittenSessionFile;
return resolveSessionFilePath(
params.newSessionId,
normalizedRewrittenSessionFile ? { sessionFile: normalizedRewrittenSessionFile } : undefined,
pathOpts,
);
}

View File

@@ -597,7 +597,7 @@ describe("selectAgentHarness", () => {
).toBe("codex");
});
it("does not compact a plugin-pinned session through PI when the plugin has no compactor", async () => {
it("ignores stale plugin pins during compaction when the provider no longer matches", async () => {
registerFailingCodexHarness();
await expect(
@@ -606,14 +606,31 @@ describe("selectAgentHarness", () => {
sessionKey: "agent:main:main",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp/workspace",
provider: "openai",
model: "gpt-5.4",
provider: "ollama",
model: "llama3.3",
agentHarnessId: "codex",
}),
).resolves.toBeUndefined();
});
it("does not compact a selected plugin harness through PI when the plugin has no compactor", async () => {
registerFailingCodexHarness();
await expect(
maybeCompactAgentHarnessSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp/workspace",
provider: "codex",
model: "gpt-5.5",
agentHarnessId: "codex",
}),
).resolves.toEqual({
ok: false,
compacted: false,
reason: 'Agent harness "codex" does not support compaction.',
failure: { reason: "unsupported_harness_compaction" },
});
});
});

View File

@@ -454,6 +454,7 @@ export async function maybeCompactAgentHarnessSession(
ok: false,
compacted: false,
reason: `Agent harness "${harness.id}" does not support compaction.`,
failure: { reason: "unsupported_harness_compaction" },
};
}
return undefined;

View File

@@ -1372,6 +1372,29 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
});
});
it("falls back to context-engine compaction when native harness binding is recoverable", async () => {
maybeCompactAgentHarnessSessionMock.mockResolvedValueOnce({
ok: false,
compacted: false,
reason: "no codex app-server thread binding",
failure: { reason: "missing_thread_binding" },
});
const result = await compactEmbeddedPiSession(
wrappedCompactionArgs({
provider: "openai-codex",
model: "gpt-5.4",
agentHarnessId: "codex",
currentTokenCount: 333,
}),
);
expect(result.ok).toBe(true);
expect(result.compacted).toBe(true);
expect(maybeCompactAgentHarnessSessionMock).toHaveBeenCalledTimes(1);
expect(contextEngineCompactMock).toHaveBeenCalledTimes(1);
});
it("does not fire after_compaction when compaction fails", async () => {
hookRunner.hasHooks.mockReturnValue(true);
const sync = vi.fn(async () => {});

View File

@@ -48,6 +48,16 @@ import { readPiModelContextTokens } from "./model-context-tokens.js";
import { resolveModelAsync } from "./model.js";
import type { EmbeddedPiCompactResult } from "./types.js";
function shouldFallbackAfterHarnessCompaction(
result: EmbeddedPiCompactResult | undefined,
): boolean {
return (
result?.ok === false &&
(result.failure?.reason === "missing_thread_binding" ||
result.failure?.reason === "stale_thread_binding")
);
}
/**
* Compacts a session with lane queueing (session lane + global lane).
* Use this from outside a lane context. If already inside a lane, use
@@ -123,8 +133,13 @@ export async function compactEmbeddedPiSession(
contextEngineRuntimeContext,
});
if (harnessResult) {
await contextEngine.dispose?.();
return harnessResult;
if (!shouldFallbackAfterHarnessCompaction(harnessResult)) {
await contextEngine.dispose?.();
return harnessResult;
}
log.warn(
`native harness compaction could not use its session binding; falling back to context engine: ${harnessResult.reason ?? "unknown"}`,
);
}
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
const globalLane = resolveGlobalLane(params.lane);
@@ -371,6 +386,7 @@ function buildCompactionContextEngineRuntimeContext(params: {
agentDir: params.agentDir,
config: params.params.config,
skillsSnapshot: params.params.skillsSnapshot,
senderIsOwner: params.params.senderIsOwner,
senderId: params.params.senderId,
provider: params.params.provider,
modelId: params.params.model,

View File

@@ -40,6 +40,7 @@ export type CompactEmbeddedPiSessionParams = {
agentDir?: string;
config?: OpenClawConfig;
skillsSnapshot?: SkillSnapshot;
senderIsOwner?: boolean;
provider?: string;
model?: string;
/** Effective model fallback chain for this session attempt. Undefined uses config defaults. */

View File

@@ -25,6 +25,7 @@ describe("buildEmbeddedCompactionRuntimeContext", () => {
workspaceDir: "/tmp/workspace",
agentDir: "/tmp/agent",
config: {} as OpenClawConfig,
senderIsOwner: true,
senderId: "user-123",
provider: "openai-codex",
modelId: "gpt-5.4",
@@ -43,6 +44,7 @@ describe("buildEmbeddedCompactionRuntimeContext", () => {
expect(result.authProfileId).toBe("openai:p1");
expect(result.workspaceDir).toBe("/tmp/workspace");
expect(result.agentDir).toBe("/tmp/agent");
expect(result.senderIsOwner).toBe(true);
expect(result.senderId).toBe("user-123");
expect(result.provider).toBe("openai-codex");
expect(result.model).toBe("gpt-5.4");

View File

@@ -21,6 +21,7 @@ export type EmbeddedCompactionRuntimeContext = {
agentDir: string;
config?: OpenClawConfig;
skillsSnapshot?: SkillSnapshot;
senderIsOwner?: boolean;
senderId?: string;
provider?: string;
model?: string;
@@ -88,6 +89,7 @@ export function buildEmbeddedCompactionRuntimeContext(params: {
agentDir: string;
config?: OpenClawConfig;
skillsSnapshot?: SkillSnapshot;
senderIsOwner?: boolean;
senderId?: string | null;
provider?: string | null;
modelId?: string | null;
@@ -125,6 +127,7 @@ export function buildEmbeddedCompactionRuntimeContext(params: {
agentDir: params.agentDir,
config: params.config,
skillsSnapshot: params.skillsSnapshot,
senderIsOwner: params.senderIsOwner,
senderId: params.senderId ?? undefined,
provider: resolved.provider,
model: resolved.model,

View File

@@ -1622,7 +1622,7 @@ export async function runEmbeddedPiAgent(
if (
typeof attempt.compactionTokensAfter === "number" &&
Number.isFinite(attempt.compactionTokensAfter) &&
attempt.compactionTokensAfter > 0
attempt.compactionTokensAfter >= 0
) {
lastCompactionTokensAfter = Math.floor(attempt.compactionTokensAfter);
}
@@ -1807,7 +1807,7 @@ export async function runEmbeddedPiAgent(
if (
typeof timeoutCompactResult.result?.tokensAfter === "number" &&
Number.isFinite(timeoutCompactResult.result.tokensAfter) &&
timeoutCompactResult.result.tokensAfter > 0
timeoutCompactResult.result.tokensAfter >= 0
) {
lastCompactionTokensAfter = Math.floor(timeoutCompactResult.result.tokensAfter);
}
@@ -2004,7 +2004,7 @@ export async function runEmbeddedPiAgent(
if (
typeof compactResult.result?.tokensAfter === "number" &&
Number.isFinite(compactResult.result.tokensAfter) &&
compactResult.result.tokensAfter > 0
compactResult.result.tokensAfter >= 0
) {
lastCompactionTokensAfter = Math.floor(compactResult.result.tokensAfter);
}

View File

@@ -3225,6 +3225,7 @@ export async function runEmbeddedAttempt(
onAssistantMessageStart: params.onAssistantMessageStart,
onExecutionPhase: params.onExecutionPhase,
onAgentEvent: params.onAgentEvent,
terminalLifecyclePhase: params.deferTerminalLifecycleEnd ? "finishing" : "end",
onBeforeLifecycleTerminal: () => {
// Clear embedded-run activity before emitting terminal lifecycle events so
// post-completion cleanup does not observe a logically finished run as active.

View File

@@ -195,6 +195,11 @@ export type RunEmbeddedPiAgentParams = {
data: Record<string, unknown>;
sessionKey?: string;
}) => void | Promise<void>;
/**
* Emit lifecycle "finishing" when the model turn ends; the caller owns the
* final lifecycle "end" after durable post-turn maintenance completes.
*/
deferTerminalLifecycleEnd?: boolean;
lane?: string;
enqueue?: CommandQueueEnqueueFn;
extraSystemPrompt?: string;

View File

@@ -146,11 +146,12 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
});
return;
}
const successPhase = ctx.params.terminalLifecyclePhase ?? "end";
emitAgentEvent({
runId: ctx.params.runId,
stream: "lifecycle",
data: {
phase: "end",
phase: successPhase,
...terminalMeta,
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),
@@ -160,7 +161,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
void ctx.params.onAgentEvent?.({
stream: "lifecycle",
data: {
phase: "end",
phase: successPhase,
...terminalMeta,
...(livenessState ? { livenessState } : {}),
...(replayInvalid ? { replayInvalid } : {}),

View File

@@ -510,7 +510,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
compactionCount += 1;
};
const noteCompactionTokensAfter = (value: unknown) => {
if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) {
if (typeof value !== "number" || !Number.isFinite(value) || value < 0) {
return;
}
state.lastCompactionTokensAfter = Math.floor(value);

View File

@@ -56,6 +56,7 @@ export type SubscribeEmbeddedPiSessionParams = {
data: Record<string, unknown>;
sessionKey?: string;
}) => void | Promise<void>;
terminalLifecyclePhase?: "end" | "finishing";
/** Best-effort hook invoked immediately before the terminal lifecycle event is emitted. */
onBeforeLifecycleTerminal?: () => void | Promise<void>;
enforceFinalTag?: boolean;

View File

@@ -855,6 +855,53 @@ describe("runMemoryFlushIfNeeded", () => {
expect(compactCall.currentTokenCount).toBe(347_000);
});
it("still compacts when a fresh persisted token total is over the threshold", async () => {
registerMemoryFlushPlanResolverForTest(() => ({
softThresholdTokens: 4_000,
forceFlushTranscriptBytes: 1_000_000_000,
reserveTokensFloor: 0,
prompt: "Pre-compaction memory flush.\nNO_REPLY",
systemPrompt: "Write memory to memory/YYYY-MM-DD.md.",
relativePath: "memory/2023-11-14.md",
}));
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 347_000,
totalTokensFresh: true,
agentHarnessId: "codex",
};
await runPreflightCompactionIfNeeded({
cfg: {
models: {
providers: {
openai: { models: [{ id: "gpt-5.5", contextWindow: 1_000_000 }] },
"openai-codex": { models: [{ id: "gpt-5.5", contextWindow: 350_000 }] },
},
},
agents: { defaults: { compaction: { memoryFlush: {} } } },
} as never,
followupRun: createTestFollowupRun({
provider: "openai",
model: "gpt-5.5",
sessionId: "session",
sessionKey: "main",
}),
defaultModel: "gpt-5.5",
sessionEntry,
sessionStore: { main: sessionEntry },
sessionKey: "main",
storePath: path.join(rootDir, "sessions.json"),
isHeartbeat: false,
replyOperation: createReplyOperation(),
});
expect(compactEmbeddedPiSessionMock).toHaveBeenCalledTimes(1);
const compactCall = requireCompactEmbeddedPiSessionCall();
expect(compactCall.currentTokenCount).toBe(347_000);
});
it("keeps the OpenAI API context window for persisted PI runtime overrides", async () => {
registerMemoryFlushPlanResolverForTest(() => ({
softThresholdTokens: 4_000,

View File

@@ -631,10 +631,6 @@ export async function runPreflightCompactionIfNeeded(params: {
typeof activeTranscriptBytes === "number" &&
typeof maxActiveTranscriptBytes === "number" &&
activeTranscriptBytes >= maxActiveTranscriptBytes;
const shouldUseTranscriptFallback = entry.totalTokensFresh === false || !hasPersistedTotalTokens;
if (!shouldUseTranscriptFallback && !shouldCompactByTranscriptBytes) {
return entry ?? params.sessionEntry;
}
const promptTokenEstimate = estimatePromptTokensForMemoryFlush(
params.promptForEstimate ?? params.followupRun.prompt,
);
@@ -743,10 +739,9 @@ export async function runPreflightCompactionIfNeeded(params: {
});
if (!result?.ok || !result.compacted) {
logVerbose(
`preflightCompaction skipped: sessionKey=${params.sessionKey} reason=${result?.reason ?? "not_compacted"}`,
);
return entry ?? params.sessionEntry;
const reason = result?.reason ?? "not_compacted";
logVerbose(`preflightCompaction failed: sessionKey=${params.sessionKey} reason=${reason}`);
throw new Error(`Preflight compaction required but failed: ${reason}`);
}
await incrementCompactionCount({

View File

@@ -2467,6 +2467,11 @@ describe("runReplyAgent fallback reasoning tags", () => {
provider: "google-gemini-cli",
model: "gemini-3",
}));
compactState.compactEmbeddedPiSessionMock.mockResolvedValueOnce({
ok: true,
compacted: true,
result: { tokensAfter: 1_000_000 },
});
await createRun({
sessionEntry: {

View File

@@ -529,6 +529,34 @@ describe("incrementCompactionCount", () => {
expect(stored[sessionKey].outputTokens).toBeUndefined();
});
it("accepts zero tokensAfter as a fresh post-compaction total", async () => {
const entry = {
sessionId: "s1",
updatedAt: Date.now(),
compactionCount: 0,
totalTokens: 180_000,
inputTokens: 170_000,
outputTokens: 10_000,
totalTokensFresh: true,
} as SessionEntry;
const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry);
await incrementCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
tokensAfter: 0,
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].compactionCount).toBe(1);
expect(stored[sessionKey].totalTokens).toBe(0);
expect(stored[sessionKey].totalTokensFresh).toBe(true);
expect(stored[sessionKey].inputTokens).toBeUndefined();
expect(stored[sessionKey].outputTokens).toBeUndefined();
});
it("prefers explicit compactionTokensAfter over last-call usage for run accounting", async () => {
const entry = {
sessionId: "s1",
@@ -557,6 +585,34 @@ describe("incrementCompactionCount", () => {
expect(stored[sessionKey].totalTokensFresh).toBe(true);
});
it("preserves zero compactionTokensAfter for run accounting", async () => {
const entry = {
sessionId: "s1",
updatedAt: Date.now(),
compactionCount: 0,
totalTokens: 180_000,
} as SessionEntry;
const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry);
await incrementRunCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
compactionTokensAfter: 0,
lastCallUsage: {
input: 90_000,
output: 1_000,
total: 91_000,
},
contextTokensUsed: 200_000,
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].totalTokens).toBe(0);
expect(stored[sessionKey].totalTokensFresh).toBe(true);
});
it("falls back to last-call usage when run compactionTokensAfter is non-finite", async () => {
const entry = {
sessionId: "s1",

View File

@@ -18,8 +18,8 @@ type IncrementRunCompactionCountParams = Omit<
newSessionFile?: string;
};
function resolvePositiveTokenCount(value: number | undefined): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value > 0
function resolveNonNegativeTokenCount(value: number | undefined): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value >= 0
? Math.floor(value)
: undefined;
}
@@ -32,7 +32,7 @@ export async function incrementRunCompactionCount(
params: IncrementRunCompactionCountParams,
): Promise<number | undefined> {
const tokensAfterCompaction =
resolvePositiveTokenCount(params.compactionTokensAfter) ??
resolveNonNegativeTokenCount(params.compactionTokensAfter) ??
(params.lastCallUsage
? deriveSessionTotalTokens({
usage: params.lastCallUsage,

View File

@@ -204,8 +204,8 @@ function emitCompactionSessionLifecycleHooks(params: {
}
}
function resolvePositiveTokenCount(value: number | undefined): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value > 0
function resolveNonNegativeTokenCount(value: number | undefined): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value >= 0
? Math.floor(value)
: undefined;
}
@@ -411,7 +411,7 @@ export async function incrementCompactionCount(params: {
updates.sessionFile = explicitNewSessionFile;
}
// If tokensAfter is provided, update the cached token counts to reflect post-compaction state
const tokensAfterCompaction = resolvePositiveTokenCount(tokensAfter);
const tokensAfterCompaction = resolveNonNegativeTokenCount(tokensAfter);
if (tokensAfterCompaction !== undefined) {
updates.totalTokens = tokensAfterCompaction;
updates.totalTokensFresh = true;

View File

@@ -240,6 +240,425 @@ describe("EmbeddedTuiBackend", () => {
]);
});
it("waits for local post-turn maintenance before emitting chat final", async () => {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
const pending = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
agentCommandFromIngressMock.mockReturnValueOnce(pending.promise);
const backend = new EmbeddedTuiBackend();
const events: Array<{ event: string; payload: unknown }> = [];
backend.onEvent = (evt) => {
events.push({ event: evt.event, payload: evt.payload });
};
backend.start();
await backend.sendChat({
sessionKey: "agent:main:main",
message: "compact after final",
runId: "run-local-maintenance",
});
registeredListener?.({
runId: "run-local-maintenance",
stream: "assistant",
data: { text: "done", delta: "done" },
});
registeredListener?.({
runId: "run-local-maintenance",
stream: "lifecycle",
data: { phase: "end", stopReason: "stop" },
});
await flushMicrotasks();
expect(
events.some(
(entry) =>
entry.event === "chat" && (entry.payload as { state?: string }).state === "final",
),
).toBe(false);
pending.resolve({ payloads: [{ text: "done" }], meta: {} });
await flushMicrotasks();
expect(
events
.filter((entry) => entry.event === "chat")
.map((entry) => (entry.payload as { state?: string }).state),
).toEqual(["delta", "final"]);
});
it("waits for local post-turn maintenance during stop", async () => {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
const pending = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
const abortListener = vi.fn();
agentCommandFromIngressMock.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => {
opts.abortSignal?.addEventListener("abort", abortListener);
return pending.promise;
});
const backend = new EmbeddedTuiBackend();
backend.start();
await backend.sendChat({
sessionKey: "agent:main:main",
message: "compact before shutdown",
runId: "run-local-stop-maintenance",
});
registeredListener?.({
runId: "run-local-stop-maintenance",
stream: "assistant",
data: { text: "done", delta: "done" },
});
registeredListener?.({
runId: "run-local-stop-maintenance",
stream: "lifecycle",
data: { phase: "end", stopReason: "stop" },
});
let stopped = false;
const stopPromise = backend.stop().then(() => {
stopped = true;
});
await flushMicrotasks();
expect(stopped).toBe(false);
expect(abortListener).not.toHaveBeenCalled();
expect(isEmbeddedMode()).toBe(true);
pending.resolve({ payloads: [{ text: "done" }], meta: {} });
await stopPromise;
expect(abortListener).not.toHaveBeenCalled();
expect(registeredListener).toBeUndefined();
expect(isEmbeddedMode()).toBe(false);
});
it("aborts local post-turn maintenance when stop grace elapses", async () => {
const previous = process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS;
process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS = "5";
try {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
const pending = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
const abortListener = vi.fn();
agentCommandFromIngressMock.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => {
opts.abortSignal?.addEventListener("abort", abortListener);
return pending.promise;
});
const backend = new EmbeddedTuiBackend();
backend.start();
await backend.sendChat({
sessionKey: "agent:main:main",
message: "compact before shutdown",
runId: "run-local-stop-timeout",
});
registeredListener?.({
runId: "run-local-stop-timeout",
stream: "lifecycle",
data: { phase: "end", stopReason: "stop" },
});
let stopped = false;
const stopPromise = backend.stop().then(() => {
stopped = true;
});
await flushMicrotasks();
expect(stopped).toBe(false);
expect(abortListener).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(5);
await stopPromise;
expect(abortListener).toHaveBeenCalledTimes(1);
expect(isEmbeddedMode()).toBe(false);
} finally {
if (previous === undefined) {
delete process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS;
} else {
process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS = previous;
}
}
});
it("queues same-session sends behind local post-turn maintenance", async () => {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
const first = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
const second = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
const firstAbortListener = vi.fn();
agentCommandFromIngressMock
.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => {
opts.abortSignal?.addEventListener("abort", firstAbortListener);
return first.promise;
})
.mockReturnValueOnce(second.promise);
const backend = new EmbeddedTuiBackend();
backend.start();
await backend.sendChat({
sessionKey: "agent:main:main",
message: "first",
runId: "run-local-first",
});
registeredListener?.({
runId: "run-local-first",
stream: "assistant",
data: { text: "first done", delta: "first done" },
});
registeredListener?.({
runId: "run-local-first",
stream: "lifecycle",
data: { phase: "finishing", stopReason: "stop" },
});
await backend.sendChat({
sessionKey: "agent:main:main",
message: "second",
runId: "run-local-second",
});
expect(firstAbortListener).not.toHaveBeenCalled();
expect(agentCommandFromIngressMock).toHaveBeenCalledTimes(1);
first.resolve({ payloads: [{ text: "first done" }], meta: {} });
await vi.waitFor(() => {
expect(agentCommandFromIngressMock).toHaveBeenCalledTimes(2);
});
second.resolve({ payloads: [{ text: "second done" }], meta: {} });
await flushMicrotasks();
});
it("queues same-session sends behind terminal local runs until maintenance settles", async () => {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
const first = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
const second = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
agentCommandFromIngressMock
.mockReturnValueOnce(first.promise)
.mockReturnValueOnce(second.promise);
const backend = new EmbeddedTuiBackend();
backend.start();
await backend.sendChat({
sessionKey: "agent:main:main",
message: "first",
runId: "run-local-first",
});
registeredListener?.({
runId: "run-local-first",
stream: "lifecycle",
data: { phase: "end", stopReason: "stop" },
});
await backend.sendChat({
sessionKey: "agent:main:main",
message: "second",
runId: "run-local-second",
});
expect(agentCommandFromIngressMock).toHaveBeenCalledTimes(1);
first.resolve({ payloads: [{ text: "first done" }], meta: {} });
await vi.waitFor(() => {
expect(agentCommandFromIngressMock).toHaveBeenCalledTimes(2);
});
second.resolve({ payloads: [{ text: "second done" }], meta: {} });
await flushMicrotasks();
});
it("fails a queued local send when the previous finishing run does not settle", async () => {
const previous = process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS;
process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS = "5";
try {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
const first = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
agentCommandFromIngressMock.mockReturnValueOnce(first.promise);
const backend = new EmbeddedTuiBackend();
const events: Array<{ event: string; payload: unknown }> = [];
backend.onEvent = (evt) => {
events.push({ event: evt.event, payload: evt.payload });
};
backend.start();
await backend.sendChat({
sessionKey: "agent:main:main",
message: "first",
runId: "run-local-first",
});
registeredListener?.({
runId: "run-local-first",
stream: "assistant",
data: { text: "first done", delta: "first done" },
});
registeredListener?.({
runId: "run-local-first",
stream: "lifecycle",
data: { phase: "finishing", stopReason: "stop" },
});
await backend.sendChat({
sessionKey: "agent:main:main",
message: "second",
runId: "run-local-second",
});
await vi.advanceTimersByTimeAsync(5);
await flushMicrotasks();
expect(agentCommandFromIngressMock).toHaveBeenCalledTimes(1);
expect(
events.some(
(entry) =>
entry.event === "chat" &&
(entry.payload as { runId?: string; state?: string; errorMessage?: string }).runId ===
"run-local-second" &&
(entry.payload as { state?: string }).state === "error" &&
((entry.payload as { errorMessage?: string }).errorMessage ?? "").includes(
"timed out waiting for previous local run",
),
),
).toBe(true);
} finally {
if (previous === undefined) {
delete process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS;
} else {
process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS = previous;
}
}
});
it("fails a queued local send immediately when shutdown grace is zero", async () => {
const previous = process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS;
process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS = "0";
try {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
const first = deferred<{
payloads: Array<{ text: string }>;
meta: Record<string, unknown>;
}>();
agentCommandFromIngressMock.mockReturnValueOnce(first.promise);
const backend = new EmbeddedTuiBackend();
const events: Array<{ event: string; payload: unknown }> = [];
backend.onEvent = (evt) => {
events.push({ event: evt.event, payload: evt.payload });
};
backend.start();
await backend.sendChat({
sessionKey: "agent:main:main",
message: "first",
runId: "run-local-first",
});
registeredListener?.({
runId: "run-local-first",
stream: "lifecycle",
data: { phase: "finishing", stopReason: "stop" },
});
await backend.sendChat({
sessionKey: "agent:main:main",
message: "second",
runId: "run-local-second",
});
await flushMicrotasks();
expect(agentCommandFromIngressMock).toHaveBeenCalledTimes(1);
expect(
events.some(
(entry) =>
entry.event === "chat" &&
(entry.payload as { runId?: string; state?: string; errorMessage?: string }).runId ===
"run-local-second" &&
(entry.payload as { state?: string }).state === "error" &&
((entry.payload as { errorMessage?: string }).errorMessage ?? "").includes(
"timed out waiting for previous local run",
),
),
).toBe(true);
} finally {
if (previous === undefined) {
delete process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS;
} else {
process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS = previous;
}
}
});
it("clears local finishing state before surfacing a post-turn failure", async () => {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
agentCommandFromIngressMock
.mockImplementationOnce(() => {
registeredListener?.({
runId: "run-local-first",
stream: "lifecycle",
data: { phase: "finishing", stopReason: "stop" },
});
throw new Error("post-turn compaction failed");
})
.mockResolvedValueOnce({ payloads: [{ text: "second done" }], meta: {} });
const backend = new EmbeddedTuiBackend();
let callsAfterSendDuringError = 0;
let sentDuringError: Promise<{ runId: string }> | undefined;
backend.onEvent = (evt) => {
const payload = evt.payload as { runId?: string; state?: string };
if (
evt.event === "chat" &&
payload.runId === "run-local-first" &&
payload.state === "error"
) {
sentDuringError = backend.sendChat({
sessionKey: "agent:main:main",
message: "second",
runId: "run-local-second",
});
callsAfterSendDuringError = agentCommandFromIngressMock.mock.calls.length;
}
};
backend.start();
await backend.sendChat({
sessionKey: "agent:main:main",
message: "first",
runId: "run-local-first",
});
await vi.waitFor(() => {
expect(sentDuringError).toBeDefined();
});
expect(callsAfterSendDuringError).toBe(2);
await sentDuringError;
await flushMicrotasks();
});
it("keeps final short replies like No after suppressing lead-fragment deltas", async () => {
const { EmbeddedTuiBackend } = await import("./embedded-backend.js");
const pending = deferred<{
@@ -642,7 +1061,7 @@ describe("EmbeddedTuiBackend", () => {
| undefined;
expect(ingressOptions?.timeout).toBe("300");
} finally {
backend.stop();
await backend.stop();
}
});
@@ -656,7 +1075,7 @@ describe("EmbeddedTuiBackend", () => {
expect(defaultRuntime.log).not.toBe(originalRuntimeLog);
expect(defaultRuntime.error).not.toBe(originalRuntimeError);
backend.stop();
await backend.stop();
expect(isEmbeddedMode()).toBe(false);
expect(defaultRuntime.log).toBe(originalRuntimeLog);

View File

@@ -48,6 +48,7 @@ import { type AgentEventPayload, onAgentEvent } from "../infra/agent-events.js";
import { setEmbeddedMode } from "../infra/embedded-mode.js";
import { defaultRuntime } from "../runtime.js";
import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js";
import { resolveLocalRunShutdownGraceMs } from "./local-run-shutdown.js";
import type {
ChatSendOptions,
TuiAgentsList,
@@ -64,6 +65,9 @@ type LocalRunState = {
lastBroadcastText?: string;
isBtw: boolean;
question?: string;
finishing: boolean;
lifecycleEnded: boolean;
lifecycleStopReason?: string;
finalSent: boolean;
registered: boolean;
};
@@ -118,6 +122,60 @@ function resolveDeltaPayload(text: string, previousText: string | undefined) {
return { deltaText: text.slice(previousText.length) };
}
async function waitForLocalRunShutdown(promises: Promise<void>[]): Promise<boolean> {
if (promises.length === 0) {
return true;
}
const timeoutMs = resolveLocalRunShutdownGraceMs();
if (timeoutMs <= 0) {
return false;
}
let timeout: ReturnType<typeof setTimeout> | undefined;
let completed = false;
await Promise.race([
Promise.allSettled(promises).then(() => {
completed = true;
}),
new Promise<void>((resolve) => {
timeout = setTimeout(resolve, timeoutMs);
timeout.unref?.();
}),
]);
if (timeout) {
clearTimeout(timeout);
}
return completed;
}
async function waitForQueuedLocalRun(previousRun: Promise<void>, runId: string): Promise<void> {
const timeoutMs = resolveLocalRunShutdownGraceMs();
if (timeoutMs <= 0) {
throw new Error(
`timed out waiting for previous local run to finish post-turn maintenance for ${runId}`,
);
}
let timeout: ReturnType<typeof setTimeout> | undefined;
try {
await Promise.race([
previousRun,
new Promise<void>((_, reject) => {
timeout = setTimeout(() => {
reject(
new Error(
`timed out waiting for previous local run to finish post-turn maintenance for ${runId}`,
),
);
}, timeoutMs);
timeout.unref?.();
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}
export class EmbeddedTuiBackend implements TuiBackend {
readonly connection = { url: "local embedded" };
@@ -128,6 +186,7 @@ export class EmbeddedTuiBackend implements TuiBackend {
private readonly deps = createDefaultDeps();
private readonly runs = new Map<string, LocalRunState>();
private readonly runPromises = new Map<string, Promise<void>>();
private unsubscribe?: () => void;
private previousRuntimeLog?: typeof defaultRuntime.log;
private previousRuntimeError?: typeof defaultRuntime.error;
@@ -154,14 +213,34 @@ export class EmbeddedTuiBackend implements TuiBackend {
});
}
stop() {
async stop() {
const maintenancePromises: Promise<void>[] = [];
for (const [runId, run] of this.runs) {
if (run.finishing || run.lifecycleEnded) {
const promise = this.runPromises.get(runId);
if (promise) {
maintenancePromises.push(promise);
}
continue;
}
run.controller.abort();
}
const maintenanceCompleted = await waitForLocalRunShutdown(maintenancePromises);
if (!maintenanceCompleted) {
for (const run of this.runs.values()) {
if (run.finishing || run.lifecycleEnded) {
run.controller.abort();
}
}
}
this.unsubscribe?.();
this.unsubscribe = undefined;
this.clearPendingLifecycleErrors();
for (const run of this.runs.values()) {
run.controller.abort();
}
this.clearPendingLifecycleErrors();
this.runs.clear();
this.runPromises.clear();
defaultRuntime.log = this.previousRuntimeLog ?? defaultRuntime.log;
defaultRuntime.error = this.previousRuntimeError ?? defaultRuntime.error;
this.previousRuntimeLog = undefined;
@@ -172,8 +251,11 @@ export class EmbeddedTuiBackend implements TuiBackend {
async sendChat(opts: ChatSendOptions): Promise<{ runId: string }> {
const runId = opts.runId ?? randomUUID();
const question = resolveBtwQuestion(opts.message);
const queuedAfter = question ? undefined : this.findPendingSessionRunPromise(opts.sessionKey);
if (!question) {
this.abortSessionRuns(opts.sessionKey);
if (!queuedAfter) {
this.abortSessionRuns(opts.sessionKey);
}
}
const controller = new AbortController();
this.runs.set(runId, {
@@ -182,11 +264,13 @@ export class EmbeddedTuiBackend implements TuiBackend {
buffer: "",
isBtw: Boolean(question),
question,
finishing: false,
lifecycleEnded: false,
finalSent: false,
registered: false,
});
void this.runTurn({
const runPromise = this.runTurn({
runId,
sessionKey: opts.sessionKey,
message: opts.message,
@@ -194,6 +278,11 @@ export class EmbeddedTuiBackend implements TuiBackend {
deliver: opts.deliver,
timeoutMs: opts.timeoutMs,
controller,
queuedAfter,
});
this.runPromises.set(runId, runPromise);
void runPromise.finally(() => {
this.runPromises.delete(runId);
});
return { runId };
@@ -204,6 +293,9 @@ export class EmbeddedTuiBackend implements TuiBackend {
if (!run || run.sessionKey !== opts.sessionKey) {
return { ok: true, aborted: false };
}
if (run.lifecycleEnded) {
return { ok: true, aborted: false };
}
run.controller.abort();
return { ok: true, aborted: true };
}
@@ -356,12 +448,21 @@ export class EmbeddedTuiBackend implements TuiBackend {
private abortSessionRuns(sessionKey: string) {
for (const run of this.runs.values()) {
if (run.sessionKey === sessionKey && !run.isBtw) {
if (run.sessionKey === sessionKey && !run.isBtw && !run.lifecycleEnded && !run.finishing) {
run.controller.abort();
}
}
}
private findPendingSessionRunPromise(sessionKey: string): Promise<void> | undefined {
for (const [runId, run] of this.runs) {
if (run.sessionKey === sessionKey && !run.isBtw && (run.finishing || run.lifecycleEnded)) {
return this.runPromises.get(runId);
}
}
return undefined;
}
private nextSeq() {
this.seq += 1;
return this.seq;
@@ -430,10 +531,13 @@ export class EmbeddedTuiBackend implements TuiBackend {
private emitChatFinal(runId: string, run: LocalRunState, stopReason?: string) {
this.clearPendingLifecycleError(runId);
if (run.finalSent) {
const alreadyFinal = run.finalSent;
run.finishing = false;
run.lifecycleEnded = true;
run.finalSent = true;
if (alreadyFinal) {
return;
}
run.finalSent = true;
run.registered = true;
run.lastBroadcastText = undefined;
const projected = projectLiveAssistantBufferedText(run.buffer.trim(), {
@@ -460,10 +564,13 @@ export class EmbeddedTuiBackend implements TuiBackend {
private emitChatAborted(runId: string, run: LocalRunState) {
this.clearPendingLifecycleError(runId);
if (run.finalSent) {
const alreadyFinal = run.finalSent;
run.finishing = false;
run.lifecycleEnded = true;
run.finalSent = true;
if (alreadyFinal) {
return;
}
run.finalSent = true;
run.registered = true;
run.lastBroadcastText = undefined;
this.emit("chat", {
@@ -475,10 +582,13 @@ export class EmbeddedTuiBackend implements TuiBackend {
private emitChatError(runId: string, run: LocalRunState, errorMessage?: string) {
this.clearPendingLifecycleError(runId);
if (run.finalSent) {
const alreadyFinal = run.finalSent;
run.finishing = false;
run.lifecycleEnded = true;
run.finalSent = true;
if (alreadyFinal) {
return;
}
run.finalSent = true;
run.registered = true;
run.lastBroadcastText = undefined;
this.emit("chat", {
@@ -555,20 +665,26 @@ export class EmbeddedTuiBackend implements TuiBackend {
const phase = lifecyclePhase;
const aborted = evt.data?.aborted === true || run.controller.signal.aborted;
if (phase === "finishing") {
run.finishing = true;
run.lifecycleStopReason =
typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined;
return;
}
if (phase === "end") {
run.finishing = false;
if (aborted) {
this.emitChatAborted(evt.runId, run);
return;
}
if (!run.isBtw) {
const stopReason =
typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined;
this.emitChatFinal(evt.runId, run, stopReason);
}
run.lifecycleEnded = true;
run.lifecycleStopReason =
typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined;
return;
}
if (phase === "error") {
run.finishing = false;
if (aborted) {
this.emitChatAborted(evt.runId, run);
return;
@@ -587,8 +703,32 @@ export class EmbeddedTuiBackend implements TuiBackend {
deliver?: boolean;
timeoutMs?: number;
controller: AbortController;
queuedAfter?: Promise<void>;
}) {
try {
if (params.queuedAfter) {
try {
await waitForQueuedLocalRun(params.queuedAfter, params.runId);
} catch (error) {
const run = this.runs.get(params.runId);
if (run) {
const errorMessage = error instanceof Error ? error.message : String(error);
this.emitChatError(
params.runId,
run,
`previous run did not finish cleanly: ${errorMessage}`,
);
}
return;
}
if (params.controller.signal.aborted) {
const run = this.runs.get(params.runId);
if (run) {
this.emitChatAborted(params.runId, run);
}
return;
}
}
const { cfg, canonicalKey, entry } = loadSessionEntry(params.sessionKey);
const result = await agentCommandFromIngress(
{
@@ -634,7 +774,10 @@ export class EmbeddedTuiBackend implements TuiBackend {
if (normalizedText && !run.buffer) {
run.buffer = normalizedText;
}
this.emitChatFinal(params.runId, run);
const stopReason =
run.lifecycleStopReason ??
(typeof result?.meta?.stopReason === "string" ? result.meta.stopReason : undefined);
this.emitChatFinal(params.runId, run, stopReason);
}
} catch (error) {
const run = this.runs.get(params.runId);

View File

@@ -0,0 +1,10 @@
const LOCAL_RUN_SHUTDOWN_GRACE_MS = 120_000;
export function resolveLocalRunShutdownGraceMs(): number {
const raw = process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS?.trim();
const parsed = raw ? Number.parseInt(raw, 10) : Number.NaN;
if (Number.isFinite(parsed) && parsed >= 0) {
return parsed;
}
return LOCAL_RUN_SHUTDOWN_GRACE_MS;
}

View File

@@ -108,7 +108,7 @@ export type TuiBackend = {
onDisconnected?: (reason: string) => void;
onGap?: (info: { expected: number; received: number }) => void;
start: () => void;
stop: () => void;
stop: () => void | Promise<void>;
sendChat: (opts: ChatSendOptions) => Promise<{ runId: string }>;
abortChat: (opts: {
sessionKey: string;

View File

@@ -63,6 +63,7 @@ function createHarness(params?: {
isConnected?: boolean;
activeChatRunId?: string | null;
pendingOptimisticUserMessage?: boolean;
activityStatus?: string;
opts?: { local?: boolean };
currentSessionId?: string | null;
}) {
@@ -98,6 +99,7 @@ function createHarness(params?: {
activeChatRunId: params?.activeChatRunId ?? null,
pendingOptimisticUserMessage: params?.pendingOptimisticUserMessage ?? false,
pendingChatRunId: null as string | null,
activityStatus: params?.activityStatus ?? "idle",
isConnected: params?.isConnected ?? true,
sessionInfo: {},
};
@@ -507,6 +509,37 @@ describe("tui command handlers", () => {
expect(state.activeChatRunId).toBe("run-active");
});
it("allows local sends to queue while the current run is finishing", async () => {
const { handleCommand, sendChat, addUser, addSystem } = createHarness({
opts: { local: true },
activeChatRunId: "run-active",
activityStatus: "finishing context",
});
await handleCommand("/context detail");
expect(sendChat).toHaveBeenCalledTimes(1);
expect(addUser).toHaveBeenCalledWith("/context detail");
expect(addSystem).not.toHaveBeenCalledWith(
"agent is busy — press Esc to abort before sending a new message",
);
});
it("rejects gateway sends while the current run is finishing", async () => {
const { handleCommand, sendChat, addUser, addSystem } = createHarness({
activeChatRunId: "run-active",
activityStatus: "finishing context",
});
await handleCommand("/context detail");
expect(sendChat).not.toHaveBeenCalled();
expect(addUser).not.toHaveBeenCalled();
expect(addSystem).toHaveBeenCalledWith(
"agent is busy — press Esc to abort before sending a new message",
);
});
it("runs /auth through the local auth flow and refreshes session info", async () => {
const refreshSessionInfo = vi.fn().mockResolvedValue(undefined);
const runAuthFlow = vi.fn().mockResolvedValue({ exitCode: 0, signal: null });

View File

@@ -632,8 +632,13 @@ export function createCommandHandlers(context: CommandHandlerContext) {
return;
}
const isBtw = isBtwCommand(text);
const canQueueBehindLocalFinishingTurn =
opts.local === true && state.activityStatus === "finishing context";
if (
!isBtw &&
(!canQueueBehindLocalFinishingTurn ||
state.pendingChatRunId ||
state.pendingOptimisticUserMessage) &&
(state.activeChatRunId || state.pendingChatRunId || state.pendingOptimisticUserMessage)
) {
chatLog.addSystem("agent is busy — press Esc to abort before sending a new message");

View File

@@ -270,6 +270,106 @@ describe("tui-event-handlers: handleAgentEvent", () => {
expect(tui.requestRender).toHaveBeenCalled();
});
it("shows finishing context for a pending run before chat registration", () => {
const { state, tui, setActivityStatus, handleAgentEvent, isLocalRunId } = createHandlersHarness(
{
localMode: true,
state: {
activeChatRunId: null,
pendingChatRunId: "run-pending",
pendingOptimisticUserMessage: true,
},
},
);
handleAgentEvent({
runId: "run-pending",
stream: "lifecycle",
data: { phase: "finishing" },
});
expect(state.activeChatRunId).toBe("run-pending");
expect(state.pendingChatRunId).toBeNull();
expect(state.pendingOptimisticUserMessage).toBe(false);
expect(isLocalRunId("run-pending")).toBe(true);
expect(setActivityStatus).toHaveBeenCalledWith("finishing context");
expect(tui.requestRender).toHaveBeenCalled();
});
it("shows finishing context for a known run after assistant final", () => {
const { state, tui, setActivityStatus, handleChatEvent, handleAgentEvent } =
createHandlersHarness({
state: { activeChatRunId: null },
});
handleChatEvent({
runId: "run-final",
sessionKey: state.currentSessionKey,
state: "final",
message: { content: [{ type: "text", text: "done" }] },
});
setActivityStatus.mockClear();
tui.requestRender.mockClear();
handleAgentEvent({
runId: "run-final",
stream: "lifecycle",
data: { phase: "finishing" },
});
expect(setActivityStatus).toHaveBeenCalledWith("finishing context");
expect(tui.requestRender).toHaveBeenCalled();
setActivityStatus.mockClear();
tui.requestRender.mockClear();
handleAgentEvent({
runId: "run-final",
stream: "lifecycle",
data: { phase: "end" },
});
expect(setActivityStatus).toHaveBeenCalledWith("idle");
expect(tui.requestRender).toHaveBeenCalled();
});
it("does not let delayed finalized-run lifecycle clobber a newer active run", () => {
const { state, tui, setActivityStatus, handleChatEvent, handleAgentEvent } =
createHandlersHarness({
state: { activeChatRunId: null },
});
handleChatEvent({
runId: "run-old",
sessionKey: state.currentSessionKey,
state: "final",
message: { content: [{ type: "text", text: "old done" }] },
});
handleChatEvent({
runId: "run-new",
sessionKey: state.currentSessionKey,
state: "delta",
message: { content: "new running" },
});
setActivityStatus.mockClear();
tui.requestRender.mockClear();
handleAgentEvent({
runId: "run-old",
stream: "lifecycle",
data: { phase: "finishing" },
});
handleAgentEvent({
runId: "run-old",
stream: "lifecycle",
data: { phase: "end" },
});
expect(state.activeChatRunId).toBe("run-new");
expect(setActivityStatus).not.toHaveBeenCalled();
expect(tui.requestRender).not.toHaveBeenCalled();
});
it("ignores fallback model updates for unrelated runs", () => {
const { state, tui, handleAgentEvent } = createHandlersHarness({
state: {

View File

@@ -72,6 +72,7 @@ export function createEventHandlers(context: EventHandlerContext) {
} = context;
const finalizedRuns = new Map<string, number>();
const sessionRuns = new Map<string, number>();
const postFinalizingRuns = new Map<string, number>();
let streamAssembler = new TuiStreamAssembler();
let lastSessionKey = state.currentSessionKey;
let pendingHistoryRefresh = false;
@@ -170,6 +171,7 @@ export function createEventHandlers(context: EventHandlerContext) {
lastSessionKey = state.currentSessionKey;
finalizedRuns.clear();
sessionRuns.clear();
postFinalizingRuns.clear();
streamAssembler = new TuiStreamAssembler();
pendingHistoryRefresh = false;
state.pendingOptimisticUserMessage = false;
@@ -233,6 +235,11 @@ export function createEventHandlers(context: EventHandlerContext) {
pruneRunMap(finalizedRuns);
};
const notePostFinalizingRun = (runId: string) => {
postFinalizingRuns.set(runId, Date.now());
pruneRunMap(postFinalizingRuns);
};
const clearActiveRunIfMatch = (runId: string) => {
if (state.activeChatRunId === runId) {
state.activeChatRunId = null;
@@ -510,7 +517,7 @@ export function createEventHandlers(context: EventHandlerContext) {
tui.requestRender();
return;
}
const isKnownRun = isActiveRun || isSessionRun || finalizedRuns.has(evt.runId);
const isKnownRun = isActiveRun || isPendingRun || isSessionRun || finalizedRuns.has(evt.runId);
if (!isKnownRun) {
return;
}
@@ -553,20 +560,54 @@ export function createEventHandlers(context: EventHandlerContext) {
return;
}
if (evt.stream === "lifecycle") {
if (!isActiveRun) {
return;
if (isPendingRun) {
noteSessionRun(evt.runId);
state.activeChatRunId = evt.runId;
state.pendingChatRunId = null;
if (state.pendingOptimisticUserMessage) {
if (localMode) {
noteLocalRunId?.(evt.runId);
}
state.pendingOptimisticUserMessage = false;
}
}
const phase = typeof evt.data?.phase === "string" ? evt.data.phase : "";
if (phase && phase !== "end" && phase !== "error") {
const isPostFinalizingRun = postFinalizingRuns.has(evt.runId);
const isPostFinalTerminalPhase =
isPostFinalizingRun && (phase === "end" || phase === "error");
if (!isActiveRun && !isPendingRun && phase !== "finishing" && !isPostFinalTerminalPhase) {
return;
}
const canUpdateActivityStatus = !hasConcurrentActiveRun(evt.runId);
if (phase && phase !== "end" && phase !== "error" && phase !== "finishing") {
armStreamingWatchdog(evt.runId);
}
if (phase === "start") {
if (!canUpdateActivityStatus) {
return;
}
setActivityStatus("running");
}
if (phase === "finishing") {
notePostFinalizingRun(evt.runId);
if (!canUpdateActivityStatus) {
return;
}
clearStreamingWatchdog();
setActivityStatus("finishing context");
}
if (phase === "end") {
postFinalizingRuns.delete(evt.runId);
if (!canUpdateActivityStatus) {
return;
}
setActivityStatus("idle");
}
if (phase === "error") {
postFinalizingRuns.delete(evt.runId);
if (!canUpdateActivityStatus) {
return;
}
setActivityStatus("error");
}
tui.requestRender();

View File

@@ -406,6 +406,63 @@ describe("tui session actions", () => {
expect(requestRender).toHaveBeenCalledOnce();
});
it("does not abort local post-turn maintenance while finishing context", async () => {
const abortChat = vi.fn().mockResolvedValue({ ok: true, aborted: true });
const addSystem = vi.fn();
const requestRender = vi.fn();
const state = createBaseState({
activeChatRunId: "run-finishing",
pendingChatRunId: null,
activityStatus: "finishing context",
});
const { abortActive } = createTestSessionActions({
client: { listSessions: vi.fn(), abortChat } as unknown as TuiBackend,
chatLog: {
addSystem,
clearAll: vi.fn(),
} as unknown as import("./components/chat-log.js").ChatLog,
tui: { requestRender } as unknown as import("@earendil-works/pi-tui").TUI,
opts: { local: true },
state,
});
await abortActive();
expect(abortChat).not.toHaveBeenCalled();
expect(addSystem).toHaveBeenCalledWith(
"agent is finishing context; wait for it to finish before aborting",
);
expect(requestRender).toHaveBeenCalled();
expect(state.activeChatRunId).toBe("run-finishing");
});
it("aborts the queued pending run after a local finishing turn accepts the next send", async () => {
const abortChat = vi.fn().mockResolvedValue({ ok: true, aborted: true });
const setActivityStatus = vi.fn();
const state = createBaseState({
activeChatRunId: "run-finishing",
pendingChatRunId: "run-queued",
activityStatus: "waiting",
});
const { abortActive } = createTestSessionActions({
client: { listSessions: vi.fn(), abortChat } as unknown as TuiBackend,
opts: { local: true },
state,
setActivityStatus,
});
await abortActive();
expect(abortChat).toHaveBeenCalledWith({
sessionKey: "agent:main:main",
runId: "run-queued",
});
expect(state.pendingChatRunId).toBeNull();
expect(setActivityStatus).toHaveBeenCalledWith("aborted");
});
it("remembers the selected session after history loads", async () => {
const listSessions = vi.fn().mockResolvedValue({
ts: Date.now(),

View File

@@ -395,7 +395,19 @@ export function createSessionActions(context: SessionActionContext) {
};
const abortActive = async () => {
const runId = state.activeChatRunId ?? state.pendingChatRunId ?? null;
if (
opts.local === true &&
state.activityStatus === "finishing context" &&
!state.pendingChatRunId
) {
chatLog.addSystem("agent is finishing context; wait for it to finish before aborting");
tui.requestRender();
return;
}
const runId =
opts.local === true && state.activeChatRunId && state.pendingChatRunId
? state.pendingChatRunId
: (state.activeChatRunId ?? state.pendingChatRunId ?? null);
if (!runId) {
chatLog.addSystem("no active run", { coalesceConsecutive: true });
tui.requestRender();

View File

@@ -5,6 +5,7 @@ import { MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE } from "../shared/assistant-
import { getSlashCommands, parseCommand } from "./commands.js";
import {
createBackspaceDeduper,
canSubmitTuiChatMessage,
createDeferredTuiFinish,
drainAndStopTuiSafely,
installTuiTerminalLossExitHandler,
@@ -15,10 +16,12 @@ import {
resolveFinalAssistantText,
resolveGatewayDisconnectState,
resolveInitialTuiAgentId,
isTuiBusyActivityStatus,
resolveLocalAuthCliInvocation,
resolveLocalAuthSpawnCwd,
resolveLocalAuthSpawnOptions,
resolveTuiCtrlCAction,
resolveTuiShutdownHardExitMs,
resolveTuiSessionKey,
scheduleProcessExitAfterTuiReturn,
stopTuiSafely,
@@ -84,6 +87,65 @@ describe("tui slash commands", () => {
});
});
describe("canSubmitTuiChatMessage", () => {
it("allows local queued submit while a run is finishing", () => {
expect(
canSubmitTuiChatMessage({
local: true,
activityStatus: "finishing context",
activeChatRunId: "run-active",
}),
).toBe(true);
});
it("does not allow gateway submit while a run is finishing", () => {
expect(
canSubmitTuiChatMessage({
local: false,
activityStatus: "finishing context",
activeChatRunId: "run-active",
}),
).toBe(false);
});
it("blocks submits with pending optimistic state", () => {
expect(
canSubmitTuiChatMessage({
local: true,
activityStatus: "finishing context",
activeChatRunId: "run-active",
pendingOptimisticUserMessage: true,
}),
).toBe(false);
});
});
describe("isTuiBusyActivityStatus", () => {
it("treats finishing context as a visible busy status", () => {
expect(isTuiBusyActivityStatus("finishing context")).toBe(true);
});
});
describe("resolveTuiShutdownHardExitMs", () => {
it("keeps gateway shutdown bounded by the hard-exit timer", () => {
expect(resolveTuiShutdownHardExitMs({ localMode: false })).toBe(2000);
});
it("adds local run shutdown grace before forcing embedded shutdown", () => {
const previous = process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS;
process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS = "3456";
try {
expect(resolveTuiShutdownHardExitMs({ localMode: true })).toBe(5456);
} finally {
if (previous === undefined) {
delete process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS;
} else {
process.env.OPENCLAW_TUI_LOCAL_RUN_SHUTDOWN_GRACE_MS = previous;
}
}
});
});
describe("resolveTuiSessionKey", () => {
it("uses global only as the default when scope is global", () => {
expect(

View File

@@ -29,6 +29,7 @@ import { getSlashCommands } from "./commands.js";
import { ChatLog } from "./components/chat-log.js";
import { CustomEditor } from "./components/custom-editor.js";
import { GatewayChatClient } from "./gateway-chat.js";
import { resolveLocalRunShutdownGraceMs } from "./local-run-shutdown.js";
import { editorTheme, theme } from "./theme/theme.js";
import type { TuiBackend } from "./tui-backend.js";
import { createCommandHandlers } from "./tui-command-handlers.js";
@@ -365,6 +366,36 @@ export async function drainAndStopTuiSafely(tui: DrainableTui): Promise<void> {
stopTuiSafely(() => tui.stop());
}
export function canSubmitTuiChatMessage(params: {
local?: boolean;
activityStatus: string;
activeChatRunId?: string | null;
pendingChatRunId?: string | null;
pendingOptimisticUserMessage?: boolean;
}): boolean {
const pending = Boolean(params.pendingChatRunId) || params.pendingOptimisticUserMessage === true;
if (params.activeChatRunId) {
return params.local === true && params.activityStatus === "finishing context" && !pending;
}
return !pending;
}
const TUI_BUSY_ACTIVITY_STATUSES = new Set([
"sending",
"waiting",
"streaming",
"running",
"finishing context",
]);
export function isTuiBusyActivityStatus(status: string): boolean {
return TUI_BUSY_ACTIVITY_STATUSES.has(status);
}
export function resolveTuiShutdownHardExitMs(params: { localMode?: boolean } = {}): number {
return TUI_SHUTDOWN_HARD_EXIT_MS + (params.localMode ? resolveLocalRunShutdownGraceMs() : 0);
}
export function scheduleProcessExitAfterTuiReturn(
params: {
delayMs?: number;
@@ -868,7 +899,6 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
);
};
const busyStates = new Set(["sending", "waiting", "streaming", "running"]);
let statusText: Text | null = null;
let statusLoader: Loader | null = null;
@@ -940,7 +970,7 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
return;
}
statusTimer = setInterval(() => {
if (!busyStates.has(activityStatus)) {
if (!isTuiBusyActivityStatus(activityStatus)) {
return;
}
updateBusyStatusMessage();
@@ -986,7 +1016,7 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
};
const renderStatus = () => {
const isBusy = busyStates.has(activityStatus);
const isBusy = isTuiBusyActivityStatus(activityStatus);
if (isBusy) {
if (!statusStartedAt || lastActivityStatus !== activityStatus) {
statusStartedAt = Date.now();
@@ -1218,10 +1248,17 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
exitReason: result?.exitReason ?? "exit",
...(result?.crestodianMessage ? { crestodianMessage: result.crestodianMessage } : {}),
};
const hardExitTimer = setTimeout(forceExit, TUI_SHUTDOWN_HARD_EXIT_MS);
const hardExitTimer = setTimeout(
forceExit,
resolveTuiShutdownHardExitMs({ localMode: isLocalMode }),
);
hardExitTimer.unref?.();
client.stop();
void drainAndStopTuiSafely(tui)
void Promise.resolve()
.then(() => client.stop())
.then(() => drainAndStopTuiSafely(tui))
.finally(() => {
clearTimeout(hardExitTimer);
})
.catch((err) => {
if (!isTuiTerminalLossError(err)) {
try {
@@ -1232,7 +1269,6 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
}
})
.finally(() => {
clearTimeout(hardExitTimer);
deferredFinish.requestFinish();
});
};
@@ -1275,7 +1311,13 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
});
updateAutocompleteProvider();
const canSubmitChatMessage = () =>
!state.activeChatRunId && !state.pendingChatRunId && !state.pendingOptimisticUserMessage;
canSubmitTuiChatMessage({
local: opts.local,
activityStatus: state.activityStatus,
activeChatRunId: state.activeChatRunId,
pendingChatRunId: state.pendingChatRunId,
pendingOptimisticUserMessage: state.pendingOptimisticUserMessage,
});
const notifyBlockedChatSubmit = () => {
chatLog.addSystem("agent is busy — press Esc to abort before sending a new message");
tui.requestRender();
@@ -1455,8 +1497,12 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
const sigtermHandler = () => {
requestExit();
};
const sighupHandler = () => {
requestExit();
};
process.on("SIGINT", sigintHandler);
process.on("SIGTERM", sigtermHandler);
process.on("SIGHUP", sighupHandler);
let cleanupTerminalLossHandler: (() => void) | null = installTuiTerminalLossExitHandler(() =>
requestExit(),
);
@@ -1471,6 +1517,7 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
cleanupTerminalLossHandler = null;
process.removeListener("SIGINT", sigintHandler);
process.removeListener("SIGTERM", sigtermHandler);
process.removeListener("SIGHUP", sighupHandler);
process.removeListener("exit", finish);
deferredFinish.clearFinish();
resolve();