diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index be2829e5d76..addd583e7ef 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -153,6 +153,62 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => { }); expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled(); }); + + it("defers file rewrites onto the session lane when requested", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + const sessionKey = "agent:main:session-rewrite-handoff"; + const sessionLane = resolveSessionLane(sessionKey); + const events: string[] = []; + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + events.push("foreground-start"); + await new Promise((resolve) => { + releaseForeground = resolve; + }); + events.push("foreground-end"); + }); + await Promise.resolve(); + + rewriteTranscriptEntriesInSessionFileMock.mockImplementationOnce(async (_params?: unknown) => { + events.push("rewrite"); + return { + changed: true, + bytesFreed: 123, + rewrittenEntries: 2, + }; + }); + + const runtimeContext = buildContextEngineMaintenanceRuntimeContext({ + sessionId: "session-rewrite-handoff", + sessionKey, + sessionFile: "/tmp/session-rewrite-handoff.jsonl", + deferTranscriptRewriteToSessionLane: true, + }); + + const rewritePromise = runtimeContext.rewriteTranscriptEntries?.({ + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }); + expect(rewritePromise).toBeDefined(); + + await flushAsyncWork(); + expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled(); + + releaseForeground(); + await expect(rewritePromise!).resolves.toEqual({ + changed: true, + bytesFreed: 123, + rewrittenEntries: 2, + }); + expect(events).toEqual(["foreground-start", "foreground-end", "rewrite"]); + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); }); describe("createDeferredTurnMaintenanceAbortSignal", () => { @@ -762,6 +818,105 @@ describe("runContextEngineMaintenance", () => { }); }); + it("lets a foreground turn run before a deferred maintenance transcript rewrite", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-rewrite-priority"; + const sessionLane = resolveSessionLane(sessionKey); + const events: string[] = []; + let allowRewrite!: () => void; + const maintain = vi.fn(async (params?: unknown) => { + events.push("maintenance-start"); + await new Promise((resolve) => { + allowRewrite = resolve; + }); + events.push("maintenance-before-rewrite"); + await (params as { runtimeContext?: ContextEngineRuntimeContext }).runtimeContext + ?.rewriteTranscriptEntries?.({ + replacements: [ + { + entryId: "entry-1", + message: castAgentMessage({ + role: "assistant", + content: [{ type: "text", text: "done" }], + timestamp: 2, + }), + }, + ], + }); + events.push("maintenance-after-rewrite"); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); + + rewriteTranscriptEntriesInSessionFileMock.mockImplementationOnce(async (_params?: unknown) => { + events.push("rewrite"); + return { + changed: true, + bytesFreed: 123, + rewrittenEntries: 2, + }; + }); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-rewrite-priority", + sessionKey, + sessionFile: "/tmp/session-rewrite-priority.jsonl", + reason: "turn", + }); + + await waitForAssertion(() => expect(events).toContain("maintenance-start")); + + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + events.push("foreground-start"); + events.push("foreground-end"); + }); + + allowRewrite(); + + await waitForAssertion(() => + expect(events).toEqual([ + "maintenance-start", + "foreground-start", + "foreground-end", + "maintenance-before-rewrite", + "rewrite", + "maintenance-after-rewrite", + ]), + ); + + expect(maintain).toHaveBeenCalledTimes(1); + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + it("keeps fast deferred maintenance silent for the user", async () => { await withStateDirEnv("openclaw-turn-maintenance-", async () => { vi.useFakeTimers(); diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index e7a13f06ad8..a0d01c447ca 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -264,6 +264,7 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { sessionManager?: Parameters[0]["sessionManager"]; runtimeContext?: ContextEngineRuntimeContext; allowDeferredCompactionExecution?: boolean; + deferTranscriptRewriteToSessionLane?: boolean; }): ContextEngineRuntimeContext { return { ...params.runtimeContext, @@ -275,12 +276,20 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { replacements: request.replacements, }); } - return await rewriteTranscriptEntriesInSessionFile({ - sessionFile: params.sessionFile, - sessionId: params.sessionId, - sessionKey: params.sessionKey, - request, - }); + const rewriteTranscriptEntriesInFile = async () => + await rewriteTranscriptEntriesInSessionFile({ + sessionFile: params.sessionFile, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + request, + }); + const rewriteSessionKey = normalizeSessionKey(params.sessionKey ?? params.sessionId); + if (params.deferTranscriptRewriteToSessionLane && rewriteSessionKey) { + return await enqueueCommandInLane(resolveSessionLane(rewriteSessionKey), async () => + await rewriteTranscriptEntriesInFile(), + ); + } + return await rewriteTranscriptEntriesInFile(); }, }; } @@ -309,6 +318,7 @@ async function executeContextEngineMaintenance(params: { sessionManager: params.executionMode === "background" ? undefined : params.sessionManager, runtimeContext: params.runtimeContext, allowDeferredCompactionExecution: params.executionMode === "background", + deferTranscriptRewriteToSessionLane: params.executionMode === "background", }), }); if (result.changed) { @@ -355,20 +365,26 @@ async function runDeferredTurnMaintenanceWorker(params: { const startedWaitingAt = Date.now(); let lastWaitNoticeAt = 0; - while (getQueueSize(sessionLane) > 0) { - const now = Date.now(); - if (lastWaitNoticeAt === 0 || now - lastWaitNoticeAt >= TURN_MAINTENANCE_LONG_WAIT_MS) { - lastWaitNoticeAt = now; - if (now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS) { - surfaceMaintenanceUpdate( - "Waiting for the session lane to go idle.", - surfacedUserNotice - ? "Still waiting for the session lane to go idle." - : "Deferred maintenance is waiting for the session lane to go idle.", - ); + for (;;) { + while (getQueueSize(sessionLane) > 0) { + const now = Date.now(); + if (lastWaitNoticeAt === 0 || now - lastWaitNoticeAt >= TURN_MAINTENANCE_LONG_WAIT_MS) { + lastWaitNoticeAt = now; + if (now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS) { + surfaceMaintenanceUpdate( + "Waiting for the session lane to go idle.", + surfacedUserNotice + ? "Still waiting for the session lane to go idle." + : "Deferred maintenance is waiting for the session lane to go idle.", + ); + } } + await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS, shutdownAbort.abortSignal); + } + await Promise.resolve(); + if (getQueueSize(sessionLane) === 0) { + break; } - await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS, shutdownAbort.abortSignal); } const runningAt = Date.now();