From d11730e800447ffe7897a9d4a0541e374b93eee9 Mon Sep 17 00:00:00 2001 From: Eva Date: Sun, 12 Apr 2026 14:35:13 +0700 Subject: [PATCH] feat: background turn maintenance for context engines --- .../context-engine-maintenance.test.ts | 455 +++++++++++++++++- .../context-engine-maintenance.ts | 362 +++++++++++++- src/context-engine/types.ts | 12 + src/infra/backoff.test.ts | 15 + src/infra/backoff.ts | 42 +- 5 files changed, 860 insertions(+), 26 deletions(-) diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index a49ff63842d..3e3a98e1e9d 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -1,4 +1,19 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { peekSystemEvents, resetSystemEventsForTest } from "../../infra/system-events.js"; +import { + enqueueCommandInLane, + resetCommandQueueStateForTest, +} from "../../process/command-queue.js"; +import { resetTaskFlowRegistryForTests } from "../../tasks/task-flow-registry.js"; +import { + getTaskById, + listTasksForOwnerKey, + resetTaskRegistryDeliveryRuntimeForTests, + resetTaskRegistryForTests, + setTaskRegistryDeliveryRuntimeForTests, +} from "../../tasks/task-registry.js"; +import { withStateDirEnv } from "../../test-helpers/state-dir-env.js"; +import { resolveSessionLane } from "./lanes.js"; const rewriteTranscriptEntriesInSessionManagerMock = vi.fn((_params?: unknown) => ({ changed: true, @@ -12,6 +27,33 @@ const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown })); let buildContextEngineMaintenanceRuntimeContext: typeof import("./context-engine-maintenance.js").buildContextEngineMaintenanceRuntimeContext; let runContextEngineMaintenance: typeof import("./context-engine-maintenance.js").runContextEngineMaintenance; +const TURN_MAINTENANCE_TASK_KIND = "context_engine_turn_maintenance"; + +async function flushAsyncWork(times = 4): Promise { + for (let index = 0; index < times; index += 1) { + await Promise.resolve(); + } +} + +async function waitForAssertion( + assertion: () => void, + timeoutMs = 2_000, + stepMs = 5, +): Promise { + const startedAt = Date.now(); + for (;;) { + try { + assertion(); + return; + } catch (error) { + if (Date.now() - startedAt >= timeoutMs) { + throw error; + } + await vi.advanceTimersByTimeAsync(stepMs); + await flushAsyncWork(); + } + } +} vi.mock("./transcript-rewrite.js", () => ({ rewriteTranscriptEntriesInSessionManager: (params: unknown) => @@ -21,7 +63,6 @@ vi.mock("./transcript-rewrite.js", () => ({ })); async function loadFreshContextEngineMaintenanceModuleForTest() { - vi.resetModules(); ({ buildContextEngineMaintenanceRuntimeContext, runContextEngineMaintenance } = await import("./context-engine-maintenance.js")); } @@ -30,6 +71,8 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => { beforeEach(async () => { rewriteTranscriptEntriesInSessionManagerMock.mockClear(); rewriteTranscriptEntriesInSessionFileMock.mockClear(); + resetSystemEventsForTest(); + resetTaskRegistryDeliveryRuntimeForTests(); await loadFreshContextEngineMaintenanceModuleForTest(); }); @@ -152,4 +195,414 @@ describe("runContextEngineMaintenance", () => { | undefined; expect(typeof runtimeContext?.rewriteTranscriptEntries).toBe("function"); }); + + it("defers turn maintenance to a hidden background task when enabled", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-1"; + const sessionLane = resolveSessionLane(sessionKey); + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + await new Promise((resolve) => { + releaseForeground = resolve; + }); + }); + await Promise.resolve(); + + const maintain = vi.fn(async (_params?: unknown) => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + const result = await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-1", + sessionKey, + sessionFile: "/tmp/session.jsonl", + reason: "turn", + runtimeContext: { workspaceDir: "/tmp/workspace" }, + }); + + expect(result).toBeUndefined(); + expect(maintain).not.toHaveBeenCalled(); + + const queuedTasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(queuedTasks).toHaveLength(1); + expect(queuedTasks[0]).toMatchObject({ + runtime: "acp", + scopeKind: "session", + ownerKey: sessionKey, + requesterSessionKey: sessionKey, + taskKind: TURN_MAINTENANCE_TASK_KIND, + notifyPolicy: "silent", + deliveryStatus: "pending", + }); + + releaseForeground(); + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1)); + expect(maintain.mock.calls[0]?.[0]).toMatchObject({ + sessionId: "session-1", + sessionKey, + sessionFile: "/tmp/session.jsonl", + runtimeContext: expect.objectContaining({ + workspaceDir: "/tmp/workspace", + allowDeferredCompactionExecution: true, + }), + }); + + const completedTask = getTaskById(queuedTasks[0].taskId); + expect(completedTask).toMatchObject({ + status: "succeeded", + progressSummary: expect.stringContaining("Deferred maintenance completed"), + }); + + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + + it("coalesces repeated turn maintenance requests for the same session", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-2"; + const sessionLane = resolveSessionLane(sessionKey); + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + await new Promise((resolve) => { + releaseForeground = resolve; + }); + }); + await Promise.resolve(); + + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await Promise.all([ + runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-2", + sessionKey, + sessionFile: "/tmp/session-2.jsonl", + reason: "turn", + }), + runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-2", + sessionKey, + sessionFile: "/tmp/session-2.jsonl", + reason: "turn", + }), + ]); + + const queuedTasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(queuedTasks).toHaveLength(1); + + releaseForeground(); + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1)); + expect(getTaskById(queuedTasks[0].taskId)).toMatchObject({ + status: "succeeded", + }); + + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + + it("lets foreground turns win while deferred maintenance is waiting", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-3"; + const sessionLane = resolveSessionLane(sessionKey); + const events: string[] = []; + let releaseFirstForeground!: () => void; + const firstForeground = enqueueCommandInLane(sessionLane, async () => { + events.push("foreground-1-start"); + await new Promise((resolve) => { + releaseFirstForeground = resolve; + }); + events.push("foreground-1-end"); + }); + await Promise.resolve(); + + const maintain = vi.fn(async () => { + events.push("maintenance-start"); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-3", + sessionKey, + sessionFile: "/tmp/session-3.jsonl", + reason: "turn", + }); + + const secondForeground = enqueueCommandInLane(sessionLane, async () => { + events.push("foreground-2-start"); + events.push("foreground-2-end"); + }); + + releaseFirstForeground(); + await waitForAssertion(() => + expect(events).toEqual([ + "foreground-1-start", + "foreground-1-end", + "foreground-2-start", + "foreground-2-end", + "maintenance-start", + ]), + ); + expect(maintain).toHaveBeenCalledTimes(1); + + await Promise.all([firstForeground, secondForeground]); + } finally { + vi.useRealTimers(); + } + }); + }); + + it("keeps fast deferred maintenance silent for the user", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetSystemEventsForTest(); + const sendMessageMock = vi.fn(); + setTaskRegistryDeliveryRuntimeForTests({ + sendMessage: sendMessageMock, + }); + + const sessionKey = "agent:main:session-fast"; + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-fast", + sessionKey, + sessionFile: "/tmp/session-fast.jsonl", + reason: "turn", + }); + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1)); + expect(sendMessageMock).not.toHaveBeenCalled(); + expect(peekSystemEvents(sessionKey)).toEqual([]); + } finally { + vi.useRealTimers(); + } + }); + }); + + it("surfaces long-running deferred maintenance and completion via task updates", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetSystemEventsForTest(); + + const sessionKey = "agent:main:session-long"; + const sessionLane = resolveSessionLane(sessionKey); + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + await new Promise((resolve) => { + releaseForeground = resolve; + }); + }); + await Promise.resolve(); + + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-long", + sessionKey, + sessionFile: "/tmp/session-long.jsonl", + reason: "turn", + }); + + await vi.advanceTimersByTimeAsync(11_000); + await waitForAssertion(() => + expect(peekSystemEvents(sessionKey)).toEqual( + expect.arrayContaining([ + expect.stringContaining("Background task update: Context engine turn maintenance."), + ]), + ), + ); + + releaseForeground(); + await waitForAssertion(() => + expect(peekSystemEvents(sessionKey)).toEqual( + expect.arrayContaining([ + expect.stringContaining("Background task done: Context engine turn maintenance"), + ]), + ), + ); + + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + + it("surfaces deferred maintenance failures even when they fail quickly", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetSystemEventsForTest(); + + const sessionKey = "agent:main:session-fail"; + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain: vi.fn(async () => { + throw new Error("maintenance exploded"); + }), + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-fail", + sessionKey, + sessionFile: "/tmp/session-fail.jsonl", + reason: "turn", + }); + await waitForAssertion(() => + expect(peekSystemEvents(sessionKey)).toEqual( + expect.arrayContaining([ + expect.stringContaining("Background task failed: Context engine turn maintenance"), + ]), + ), + ); + } finally { + vi.useRealTimers(); + } + }); + }); }); diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index 88e417f5757..3fec2acd4c2 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -1,14 +1,107 @@ +import { randomUUID } from "node:crypto"; import type { ContextEngine, ContextEngineMaintenanceResult, ContextEngineRuntimeContext, } from "../../context-engine/types.js"; +import { sleepWithAbort } from "../../infra/backoff.js"; +import { formatErrorMessage } from "../../infra/errors.js"; +import { enqueueCommandInLane, getQueueSize } from "../../process/command-queue.js"; +import { normalizeOptionalString } from "../../shared/string-coerce.js"; +import { + completeTaskRunByRunId, + createQueuedTaskRun, + failTaskRunByRunId, + recordTaskRunProgressByRunId, + startTaskRunByRunId, +} from "../../tasks/task-executor.js"; +import { + findTaskByRunId, + setTaskRunDeliveryStatusByRunId, + updateTaskNotifyPolicyById, +} from "../../tasks/task-registry.js"; +import { findActiveSessionTask } from "../session-async-task-status.js"; +import { resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { rewriteTranscriptEntriesInSessionFile, rewriteTranscriptEntriesInSessionManager, } from "./transcript-rewrite.js"; +const TURN_MAINTENANCE_TASK_KIND = "context_engine_turn_maintenance"; +const TURN_MAINTENANCE_TASK_LABEL = "Context engine turn maintenance"; +const TURN_MAINTENANCE_TASK_TASK = "Deferred context-engine maintenance after turn."; +const TURN_MAINTENANCE_LANE_PREFIX = "context-engine-turn-maintenance:"; +const TURN_MAINTENANCE_WAIT_POLL_MS = 100; +const TURN_MAINTENANCE_LONG_WAIT_MS = 10_000; +const activeDeferredTurnMaintenanceRuns = new Map>(); + +function normalizeSessionKey(sessionKey?: string): string | undefined { + return normalizeOptionalString(sessionKey) || undefined; +} + +function resolveDeferredTurnMaintenanceLane(sessionKey: string): string { + return `${TURN_MAINTENANCE_LANE_PREFIX}${sessionKey}`; +} + +function buildTurnMaintenanceTaskDescriptor(params: { sessionKey: string }) { + const runId = `turn-maint:${params.sessionKey}:${Date.now().toString(36)}:${randomUUID().slice( + 0, + 8, + )}`; + return createQueuedTaskRun({ + runtime: "acp", + taskKind: TURN_MAINTENANCE_TASK_KIND, + sourceId: TURN_MAINTENANCE_TASK_KIND, + requesterSessionKey: params.sessionKey, + ownerKey: params.sessionKey, + scopeKind: "session", + runId, + label: TURN_MAINTENANCE_TASK_LABEL, + task: TURN_MAINTENANCE_TASK_TASK, + notifyPolicy: "silent", + deliveryStatus: "pending", + preferMetadata: true, + }); +} + +function promoteTurnMaintenanceTaskVisibility(params: { + sessionKey: string; + runId: string; + notifyPolicy: "done_only" | "state_changes"; +}) { + const task = findTaskByRunId(params.runId); + if (!task) { + return createQueuedTaskRun({ + runtime: "acp", + taskKind: TURN_MAINTENANCE_TASK_KIND, + sourceId: TURN_MAINTENANCE_TASK_KIND, + requesterSessionKey: params.sessionKey, + ownerKey: params.sessionKey, + scopeKind: "session", + runId: params.runId, + label: TURN_MAINTENANCE_TASK_LABEL, + task: TURN_MAINTENANCE_TASK_TASK, + notifyPolicy: params.notifyPolicy, + deliveryStatus: "pending", + preferMetadata: true, + }); + } + setTaskRunDeliveryStatusByRunId({ + runId: params.runId, + runtime: "acp", + sessionKey: params.sessionKey, + deliveryStatus: "pending", + }); + if (task.notifyPolicy !== params.notifyPolicy) { + updateTaskNotifyPolicyById({ + taskId: task.taskId, + notifyPolicy: params.notifyPolicy, + }); + } + return findTaskByRunId(params.runId) ?? task; +} + /** * Attach runtime-owned transcript rewrite helpers to an existing * context-engine runtime context payload. @@ -19,9 +112,11 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { sessionFile: string; sessionManager?: Parameters[0]["sessionManager"]; runtimeContext?: ContextEngineRuntimeContext; + allowDeferredCompactionExecution?: boolean; }): ContextEngineRuntimeContext { return { ...params.runtimeContext, + ...(params.allowDeferredCompactionExecution ? { allowDeferredCompactionExecution: true } : {}), rewriteTranscriptEntries: async (request) => { if (params.sessionManager) { return rewriteTranscriptEntriesInSessionManager({ @@ -39,6 +134,228 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { }; } +async function executeContextEngineMaintenance(params: { + contextEngine: ContextEngine; + sessionId: string; + sessionKey?: string; + sessionFile: string; + reason: "bootstrap" | "compaction" | "turn"; + sessionManager?: Parameters[0]["sessionManager"]; + runtimeContext?: ContextEngineRuntimeContext; + executionMode: "foreground" | "background"; +}): Promise { + if (typeof params.contextEngine.maintain !== "function") { + return undefined; + } + const result = await params.contextEngine.maintain({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + runtimeContext: buildContextEngineMaintenanceRuntimeContext({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + sessionManager: params.sessionManager, + runtimeContext: params.runtimeContext, + allowDeferredCompactionExecution: params.executionMode === "background", + }), + }); + if (result.changed) { + log.info( + `[context-engine] maintenance(${params.reason}) changed transcript ` + + `rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` + + `sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`, + ); + } + return result; +} + +async function runDeferredTurnMaintenanceWorker(params: { + contextEngine: ContextEngine; + sessionId: string; + sessionKey: string; + sessionFile: string; + sessionManager?: Parameters[0]["sessionManager"]; + runtimeContext?: ContextEngineRuntimeContext; + runId: string; +}): Promise { + let surfacedUserNotice = false; + let longRunningTimer: ReturnType | null = null; + const surfaceMaintenanceUpdate = (summary: string, eventSummary: string) => { + promoteTurnMaintenanceTaskVisibility({ + sessionKey: params.sessionKey, + runId: params.runId, + notifyPolicy: "state_changes", + }); + surfacedUserNotice = true; + recordTaskRunProgressByRunId({ + runId: params.runId, + runtime: "acp", + sessionKey: params.sessionKey, + lastEventAt: Date.now(), + progressSummary: summary, + eventSummary, + }); + }; + + try { + const sessionLane = resolveSessionLane(params.sessionKey); + const startedWaitingAt = Date.now(); + let lastWaitNoticeAt = 0; + + while (getQueueSize(sessionLane) > 0) { + const now = Date.now(); + if ( + lastWaitNoticeAt === 0 || + now - lastWaitNoticeAt >= TURN_MAINTENANCE_LONG_WAIT_MS || + now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS + ) { + lastWaitNoticeAt = now; + if (now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS) { + surfaceMaintenanceUpdate( + "Waiting for the session lane to go idle.", + surfacedUserNotice + ? "Still waiting for the session lane to go idle." + : "Deferred maintenance is waiting for the session lane to go idle.", + ); + } + } + await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS); + } + + const runningAt = Date.now(); + startTaskRunByRunId({ + runId: params.runId, + runtime: "acp", + sessionKey: params.sessionKey, + startedAt: runningAt, + lastEventAt: runningAt, + progressSummary: "Running deferred maintenance.", + eventSummary: "Starting deferred maintenance.", + }); + longRunningTimer = setTimeout(() => { + try { + surfaceMaintenanceUpdate( + "Deferred maintenance is still running.", + "Deferred maintenance is still running.", + ); + } catch (error) { + log.warn(`failed to surface deferred maintenance progress: ${String(error)}`); + } + }, TURN_MAINTENANCE_LONG_WAIT_MS); + + const result = await executeContextEngineMaintenance({ + contextEngine: params.contextEngine, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + reason: "turn", + sessionManager: params.sessionManager, + runtimeContext: params.runtimeContext, + executionMode: "background", + }); + if (longRunningTimer) { + clearTimeout(longRunningTimer); + longRunningTimer = null; + } + + const endedAt = Date.now(); + completeTaskRunByRunId({ + runId: params.runId, + runtime: "acp", + sessionKey: params.sessionKey, + endedAt, + lastEventAt: endedAt, + progressSummary: result?.changed + ? "Deferred maintenance completed with transcript changes." + : "Deferred maintenance completed.", + terminalSummary: result?.changed + ? `Rewrote ${result.rewrittenEntries} transcript entr${result.rewrittenEntries === 1 ? "y" : "ies"} and freed ${result.bytesFreed} bytes.` + : "No transcript changes were needed.", + }); + } catch (err) { + if (longRunningTimer) { + clearTimeout(longRunningTimer); + longRunningTimer = null; + } + const endedAt = Date.now(); + const reason = formatErrorMessage(err); + if (!surfacedUserNotice) { + promoteTurnMaintenanceTaskVisibility({ + sessionKey: params.sessionKey, + runId: params.runId, + notifyPolicy: "done_only", + }); + } + failTaskRunByRunId({ + runId: params.runId, + runtime: "acp", + sessionKey: params.sessionKey, + endedAt, + lastEventAt: endedAt, + error: reason, + progressSummary: "Deferred maintenance failed.", + terminalSummary: reason, + }); + log.warn(`deferred context engine maintenance failed: ${reason}`); + } +} + +function scheduleDeferredTurnMaintenance(params: { + contextEngine: ContextEngine; + sessionId: string; + sessionKey: string; + sessionFile: string; + sessionManager?: Parameters[0]["sessionManager"]; + runtimeContext?: ContextEngineRuntimeContext; +}): void { + const sessionKey = normalizeSessionKey(params.sessionKey); + if (!sessionKey) { + return; + } + if (activeDeferredTurnMaintenanceRuns.has(sessionKey)) { + return; + } + + const existingTask = findActiveSessionTask({ + sessionKey, + runtime: "acp", + taskKind: TURN_MAINTENANCE_TASK_KIND, + }); + const task = + existingTask ?? + buildTurnMaintenanceTaskDescriptor({ + sessionKey, + }); + log.info( + `[context-engine] deferred turn maintenance ${existingTask ? "resuming" : "queued"} ` + + `taskId=${task.taskId} sessionKey=${sessionKey} lane=${resolveDeferredTurnMaintenanceLane(sessionKey)}`, + ); + + const runPromise = enqueueCommandInLane( + resolveDeferredTurnMaintenanceLane(sessionKey), + async () => + runDeferredTurnMaintenanceWorker({ + contextEngine: params.contextEngine, + sessionId: params.sessionId, + sessionKey, + sessionFile: params.sessionFile, + sessionManager: params.sessionManager, + runtimeContext: params.runtimeContext, + runId: task.runId ?? task.taskId, + }), + ); + const trackedPromise = runPromise + .catch((err) => { + log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`); + }) + .finally(() => { + activeDeferredTurnMaintenanceRuns.delete(sessionKey); + }); + activeDeferredTurnMaintenanceRuns.set(sessionKey, trackedPromise); + void trackedPromise; +} + /** * Run optional context-engine transcript maintenance and normalize the result. */ @@ -50,32 +367,45 @@ export async function runContextEngineMaintenance(params: { reason: "bootstrap" | "compaction" | "turn"; sessionManager?: Parameters[0]["sessionManager"]; runtimeContext?: ContextEngineRuntimeContext; + executionMode?: "foreground" | "background"; }): Promise { if (typeof params.contextEngine?.maintain !== "function") { return undefined; } - try { - const result = await params.contextEngine.maintain({ - sessionId: params.sessionId, - sessionKey: params.sessionKey, - sessionFile: params.sessionFile, - runtimeContext: buildContextEngineMaintenanceRuntimeContext({ + const executionMode = params.executionMode ?? "foreground"; + const shouldDefer = + params.reason === "turn" && + executionMode !== "background" && + params.contextEngine.info.turnMaintenanceMode === "background"; + + if (shouldDefer) { + try { + scheduleDeferredTurnMaintenance({ + contextEngine: params.contextEngine, sessionId: params.sessionId, - sessionKey: params.sessionKey, + sessionKey: params.sessionKey ?? params.sessionId, sessionFile: params.sessionFile, sessionManager: params.sessionManager, runtimeContext: params.runtimeContext, - }), - }); - if (result.changed) { - log.info( - `[context-engine] maintenance(${params.reason}) changed transcript ` + - `rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` + - `sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`, - ); + }); + } catch (err) { + log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`); } - return result; + return undefined; + } + + try { + return await executeContextEngineMaintenance({ + contextEngine: params.contextEngine, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + reason: params.reason, + sessionManager: params.sessionManager, + runtimeContext: params.runtimeContext, + executionMode, + }); } catch (err) { log.warn(`context engine maintain failed (${params.reason}): ${String(err)}`); return undefined; diff --git a/src/context-engine/types.ts b/src/context-engine/types.ts index e1120db8ade..594703bc864 100644 --- a/src/context-engine/types.ts +++ b/src/context-engine/types.ts @@ -50,6 +50,13 @@ export type ContextEngineInfo = { version?: string; /** True when the engine manages its own compaction lifecycle. */ ownsCompaction?: boolean; + /** + * Controls how turn-triggered maintenance should be executed. + * + * Engines remain compatible by default unless the host explicitly opts into + * background turn maintenance. + */ + turnMaintenanceMode?: "foreground" | "background"; }; export type SubagentSpawnPreparation = { @@ -128,6 +135,11 @@ export type ContextEnginePromptCacheInfo = { }; export type ContextEngineRuntimeContext = Record & { + /** + * True when the host has explicitly opted this maintenance run into + * consuming deferred compaction debt. + */ + allowDeferredCompactionExecution?: boolean; /** Optional prompt-cache telemetry for cache-aware engines. */ promptCache?: ContextEnginePromptCacheInfo; /** diff --git a/src/infra/backoff.test.ts b/src/infra/backoff.test.ts index 34108b815c8..2022cf0b22d 100644 --- a/src/infra/backoff.test.ts +++ b/src/infra/backoff.test.ts @@ -43,4 +43,19 @@ describe("backoff helpers", () => { cause: expect.anything(), }); }); + + it("advances with fake timers", async () => { + vi.useFakeTimers(); + try { + const sleeper = sleepWithAbort(50); + await vi.advanceTimersByTimeAsync(49); + await expect( + Promise.race([sleeper.then(() => "done"), Promise.resolve("pending")]), + ).resolves.toBe("pending"); + await vi.advanceTimersByTimeAsync(1); + await expect(sleeper).resolves.toBeUndefined(); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/src/infra/backoff.ts b/src/infra/backoff.ts index 153eca16203..a9463414bb8 100644 --- a/src/infra/backoff.ts +++ b/src/infra/backoff.ts @@ -1,5 +1,3 @@ -import { setTimeout as delay } from "node:timers/promises"; - export type BackoffPolicy = { initialMs: number; maxMs: number; @@ -17,12 +15,38 @@ export async function sleepWithAbort(ms: number, abortSignal?: AbortSignal) { if (ms <= 0) { return; } - try { - await delay(ms, undefined, { signal: abortSignal }); - } catch (err) { - if (abortSignal?.aborted) { - throw new Error("aborted", { cause: err }); - } - throw err; + if (abortSignal?.aborted) { + throw new Error("aborted", { cause: abortSignal.reason ?? new Error("aborted") }); } + + await new Promise((resolve, reject) => { + let settled = false; + let timer: ReturnType | null = setTimeout(() => { + settled = true; + if (abortSignal) { + abortSignal.removeEventListener("abort", onAbort); + } + timer = null; + resolve(); + }, ms); + + const onAbort = () => { + if (settled) { + return; + } + settled = true; + if (timer) { + clearTimeout(timer); + timer = null; + } + if (abortSignal) { + abortSignal.removeEventListener("abort", onAbort); + } + reject(new Error("aborted", { cause: abortSignal?.reason ?? new Error("aborted") })); + }; + + if (abortSignal) { + abortSignal.addEventListener("abort", onAbort, { once: true }); + } + }); }