diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index cd8535529a8..6b91419dc28 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -394,7 +394,7 @@ describe("runContextEngineMaintenance", () => { }); }); - it("coalesces repeated turn maintenance requests for the same session", async () => { + it("coalesces repeated requests into one active run plus one follow-up run for the same session", async () => { await withStateDirEnv("openclaw-turn-maintenance-", async () => { vi.useFakeTimers(); try { @@ -456,10 +456,12 @@ describe("runContextEngineMaintenance", () => { expect(queuedTasks).toHaveLength(1); releaseForeground(); - await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1)); - expect(getTaskById(queuedTasks[0].taskId)).toMatchObject({ - status: "succeeded", - }); + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(2)); + const completedTasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(completedTasks).toHaveLength(2); + expect(completedTasks.every((task) => task.status === "succeeded")).toBe(true); await foregroundTurn; } finally { @@ -468,6 +470,78 @@ describe("runContextEngineMaintenance", () => { }); }); + it("queues a follow-up maintenance run when a new turn finishes during an active deferred run", async () => { + await withStateDirEnv("openclaw-turn-maintenance-rerun-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-rerun"; + let releaseFirstMaintenance!: () => void; + let maintenanceCalls = 0; + const maintain = vi.fn(async () => { + maintenanceCalls += 1; + if (maintenanceCalls === 1) { + await new Promise((resolve) => { + releaseFirstMaintenance = resolve; + }); + } + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-rerun", + sessionKey, + sessionFile: "/tmp/session-rerun.jsonl", + reason: "turn", + }); + + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1)); + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-rerun", + sessionKey, + sessionFile: "/tmp/session-rerun.jsonl", + reason: "turn", + }); + + releaseFirstMaintenance(); + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(2)); + + const tasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(tasks).toHaveLength(2); + expect(tasks.every((task) => task.status === "succeeded")).toBe(true); + } finally { + vi.useRealTimers(); + } + }); + }); + it("replaces legacy active maintenance tasks that are missing a runId", async () => { await withStateDirEnv("openclaw-turn-maintenance-", async () => { vi.useFakeTimers(); diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index b1230ae3c76..2709bc64117 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -35,7 +35,22 @@ const TURN_MAINTENANCE_TASK_TASK = "Deferred context-engine maintenance after tu const TURN_MAINTENANCE_LANE_PREFIX = "context-engine-turn-maintenance:"; const TURN_MAINTENANCE_WAIT_POLL_MS = 100; const TURN_MAINTENANCE_LONG_WAIT_MS = 10_000; -const activeDeferredTurnMaintenanceRuns = new Map>(); +type DeferredTurnMaintenanceScheduleParams = { + contextEngine: ContextEngine; + sessionId: string; + sessionKey: string; + sessionFile: string; + sessionManager?: Parameters[0]["sessionManager"]; + runtimeContext?: ContextEngineRuntimeContext; +}; + +type DeferredTurnMaintenanceRunState = { + promise: Promise; + rerunRequested: boolean; + latestParams: DeferredTurnMaintenanceScheduleParams; +}; + +const activeDeferredTurnMaintenanceRuns = new Map(); function normalizeSessionKey(sessionKey?: string): string | undefined { return normalizeOptionalString(sessionKey) || undefined; @@ -357,19 +372,15 @@ async function runDeferredTurnMaintenanceWorker(params: { } } -function scheduleDeferredTurnMaintenance(params: { - contextEngine: ContextEngine; - sessionId: string; - sessionKey: string; - sessionFile: string; - sessionManager?: Parameters[0]["sessionManager"]; - runtimeContext?: ContextEngineRuntimeContext; -}): void { +function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceScheduleParams): void { const sessionKey = normalizeSessionKey(params.sessionKey); if (!sessionKey) { return; } - if (activeDeferredTurnMaintenanceRuns.has(sessionKey)) { + const activeRun = activeDeferredTurnMaintenanceRuns.get(sessionKey); + if (activeRun) { + activeRun.rerunRequested = true; + activeRun.latestParams = { ...params, sessionKey }; return; } @@ -414,14 +425,28 @@ function scheduleDeferredTurnMaintenance(params: { runId: task.runId!, }), ); + let state!: DeferredTurnMaintenanceRunState; const trackedPromise = runPromise .catch((err) => { log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`); }) .finally(() => { + const current = activeDeferredTurnMaintenanceRuns.get(sessionKey); + if (current !== state) { + return; + } + const rerunParams = current.rerunRequested ? current.latestParams : undefined; activeDeferredTurnMaintenanceRuns.delete(sessionKey); + if (rerunParams) { + scheduleDeferredTurnMaintenance(rerunParams); + } }); - activeDeferredTurnMaintenanceRuns.set(sessionKey, trackedPromise); + state = { + promise: trackedPromise, + rerunRequested: false, + latestParams: { ...params, sessionKey }, + }; + activeDeferredTurnMaintenanceRuns.set(sessionKey, state); void trackedPromise; }