From e32600aa31d49f50432a40c031640465052dec5e Mon Sep 17 00:00:00 2001 From: Eva Date: Sun, 12 Apr 2026 14:54:21 +0700 Subject: [PATCH] Fix deferred maintenance review follow-ups --- .../context-engine-maintenance.test.ts | 192 ++++++++++++++++++ .../context-engine-maintenance.ts | 28 ++- src/infra/backoff.test.ts | 20 ++ src/infra/backoff.ts | 33 +-- 4 files changed, 251 insertions(+), 22 deletions(-) diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index 3e3a98e1e9d..48189eec0af 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -1,9 +1,11 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { ContextEngineRuntimeContext } from "../../context-engine/types.js"; import { peekSystemEvents, resetSystemEventsForTest } from "../../infra/system-events.js"; import { enqueueCommandInLane, resetCommandQueueStateForTest, } from "../../process/command-queue.js"; +import { createQueuedTaskRun } from "../../tasks/task-executor.js"; import { resetTaskFlowRegistryForTests } from "../../tasks/task-flow-registry.js"; import { getTaskById, @@ -27,6 +29,8 @@ const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown })); let buildContextEngineMaintenanceRuntimeContext: typeof import("./context-engine-maintenance.js").buildContextEngineMaintenanceRuntimeContext; let runContextEngineMaintenance: typeof import("./context-engine-maintenance.js").runContextEngineMaintenance; +// Keep this literal aligned with the production module; tests use dynamic +// import reloading, so they cannot safely import the constant directly. const TURN_MAINTENANCE_TASK_KIND = "context_engine_turn_maintenance"; async function flushAsyncWork(times = 4): Promise { @@ -196,6 +200,55 @@ describe("runContextEngineMaintenance", () => { expect(typeof runtimeContext?.rewriteTranscriptEntries).toBe("function"); }); + it("forces background maintenance rewrites through the session file even when a session manager exists", async () => { + const maintain = vi.fn(async (params?: unknown) => { + await (params as { runtimeContext?: ContextEngineRuntimeContext } | undefined)?.runtimeContext?.rewriteTranscriptEntries?.({ + replacements: [ + { entryId: "entry-1", message: { role: "assistant", content: "done", timestamp: 2 } }, + ], + }); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); + const sessionManager = { appendMessage: vi.fn() } as unknown as Parameters< + typeof buildContextEngineMaintenanceRuntimeContext + >[0]["sessionManager"]; + + await runContextEngineMaintenance({ + contextEngine: { + info: { id: "test", name: "Test Engine", turnMaintenanceMode: "background" }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }) => ({ messages, estimatedTokens: 0 }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + }, + sessionId: "session-background-file-rewrite", + sessionKey: "agent:main:session-background-file-rewrite", + sessionFile: "/tmp/session-background-file-rewrite.jsonl", + reason: "turn", + executionMode: "background", + sessionManager, + }); + + expect(rewriteTranscriptEntriesInSessionManagerMock).not.toHaveBeenCalled(); + expect(rewriteTranscriptEntriesInSessionFileMock).toHaveBeenCalledWith({ + sessionFile: "/tmp/session-background-file-rewrite.jsonl", + sessionId: "session-background-file-rewrite", + sessionKey: "agent:main:session-background-file-rewrite", + request: { + replacements: [ + { + entryId: "entry-1", + message: { role: "assistant", content: "done", timestamp: 2 }, + }, + ], + }, + }); + }); + it("defers turn maintenance to a hidden background task when enabled", async () => { await withStateDirEnv("openclaw-turn-maintenance-", async () => { vi.useFakeTimers(); @@ -360,6 +413,71 @@ describe("runContextEngineMaintenance", () => { }); }); + it("replaces legacy active maintenance tasks that are missing a runId", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-legacy"; + const legacyTask = createQueuedTaskRun({ + runtime: "acp", + taskKind: TURN_MAINTENANCE_TASK_KIND, + sourceId: TURN_MAINTENANCE_TASK_KIND, + requesterSessionKey: sessionKey, + ownerKey: sessionKey, + scopeKind: "session", + label: "Context engine turn maintenance", + task: "Deferred context-engine maintenance after turn.", + notifyPolicy: "silent", + deliveryStatus: "pending", + preferMetadata: true, + }); + + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ messages, estimatedTokens: 0 }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-legacy", + sessionKey, + sessionFile: "/tmp/session-legacy.jsonl", + reason: "turn", + }); + + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1)); + + const tasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(tasks).toHaveLength(2); + expect(getTaskById(legacyTask.taskId)).toMatchObject({ + status: "cancelled", + notifyPolicy: "silent", + }); + expect(tasks.some((task) => task.runId?.startsWith("turn-maint:"))).toBe(true); + } finally { + vi.useRealTimers(); + } + }); + }); + it("lets foreground turns win while deferred maintenance is waiting", async () => { await withStateDirEnv("openclaw-turn-maintenance-", async () => { vi.useFakeTimers(); @@ -559,6 +677,80 @@ describe("runContextEngineMaintenance", () => { }); }); + it("throttles deferred wait notices while the session lane stays busy", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetSystemEventsForTest(); + + const sessionKey = "agent:main:session-throttle"; + const sessionLane = resolveSessionLane(sessionKey); + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + await new Promise((resolve) => { + releaseForeground = resolve; + }); + }); + await Promise.resolve(); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ messages, estimatedTokens: 0 }), + compact: async () => ({ ok: true, compacted: false }), + maintain: vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })), + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-throttle", + sessionKey, + sessionFile: "/tmp/session-throttle.jsonl", + reason: "turn", + }); + + await vi.advanceTimersByTimeAsync(11_000); + await waitForAssertion(() => + expect( + peekSystemEvents(sessionKey).filter((event) => + event.includes("Background task update: Context engine turn maintenance."), + ), + ).toHaveLength(1), + ); + + await vi.advanceTimersByTimeAsync(9_000); + expect( + peekSystemEvents(sessionKey).filter((event) => + event.includes("Background task update: Context engine turn maintenance."), + ), + ).toHaveLength(2); + + await vi.advanceTimersByTimeAsync(1_000); + expect( + peekSystemEvents(sessionKey).filter((event) => + event.includes("Background task update: Context engine turn maintenance."), + ), + ).toHaveLength(2); + + releaseForeground(); + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + it("surfaces deferred maintenance failures even when they fail quickly", async () => { await withStateDirEnv("openclaw-turn-maintenance-", async () => { vi.useFakeTimers(); diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index 3fec2acd4c2..3b176d35fe0 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -17,6 +17,7 @@ import { } from "../../tasks/task-executor.js"; import { findTaskByRunId, + markTaskTerminalById, setTaskRunDeliveryStatusByRunId, updateTaskNotifyPolicyById, } from "../../tasks/task-registry.js"; @@ -155,7 +156,7 @@ async function executeContextEngineMaintenance(params: { sessionId: params.sessionId, sessionKey: params.sessionKey, sessionFile: params.sessionFile, - sessionManager: params.sessionManager, + sessionManager: params.executionMode === "background" ? undefined : params.sessionManager, runtimeContext: params.runtimeContext, allowDeferredCompactionExecution: params.executionMode === "background", }), @@ -205,11 +206,7 @@ async function runDeferredTurnMaintenanceWorker(params: { while (getQueueSize(sessionLane) > 0) { const now = Date.now(); - if ( - lastWaitNoticeAt === 0 || - now - lastWaitNoticeAt >= TURN_MAINTENANCE_LONG_WAIT_MS || - now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS - ) { + if (lastWaitNoticeAt === 0 || now - lastWaitNoticeAt >= TURN_MAINTENANCE_LONG_WAIT_MS) { lastWaitNoticeAt = now; if (now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS) { surfaceMaintenanceUpdate( @@ -322,13 +319,26 @@ function scheduleDeferredTurnMaintenance(params: { runtime: "acp", taskKind: TURN_MAINTENANCE_TASK_KIND, }); + const reusableTask = existingTask?.runId?.trim() ? existingTask : undefined; + if (existingTask && !reusableTask) { + updateTaskNotifyPolicyById({ + taskId: existingTask.taskId, + notifyPolicy: "silent", + }); + markTaskTerminalById({ + taskId: existingTask.taskId, + status: "cancelled", + endedAt: Date.now(), + terminalSummary: "Superseded by refreshed deferred maintenance task.", + }); + } const task = - existingTask ?? + reusableTask ?? buildTurnMaintenanceTaskDescriptor({ sessionKey, }); log.info( - `[context-engine] deferred turn maintenance ${existingTask ? "resuming" : "queued"} ` + + `[context-engine] deferred turn maintenance ${reusableTask ? "resuming" : "queued"} ` + `taskId=${task.taskId} sessionKey=${sessionKey} lane=${resolveDeferredTurnMaintenanceLane(sessionKey)}`, ); @@ -342,7 +352,7 @@ function scheduleDeferredTurnMaintenance(params: { sessionFile: params.sessionFile, sessionManager: params.sessionManager, runtimeContext: params.runtimeContext, - runId: task.runId ?? task.taskId, + runId: task.runId!, }), ); const trackedPromise = runPromise diff --git a/src/infra/backoff.test.ts b/src/infra/backoff.test.ts index 2022cf0b22d..5f55f5471f7 100644 --- a/src/infra/backoff.test.ts +++ b/src/infra/backoff.test.ts @@ -58,4 +58,24 @@ describe("backoff helpers", () => { vi.useRealTimers(); } }); + + it("rejects if the signal aborts during listener registration", async () => { + let aborted = false; + const signal = { + get aborted() { + return aborted; + }, + get reason() { + return new Error("listener-registration-race"); + }, + addEventListener(_event: string, _listener: EventListenerOrEventListenerObject) { + aborted = true; + }, + removeEventListener() {}, + } as unknown as AbortSignal; + + await expect(sleepWithAbort(50, signal)).rejects.toMatchObject({ + message: "aborted", + }); + }); }); diff --git a/src/infra/backoff.ts b/src/infra/backoff.ts index a9463414bb8..706ee229f61 100644 --- a/src/infra/backoff.ts +++ b/src/infra/backoff.ts @@ -15,21 +15,9 @@ export async function sleepWithAbort(ms: number, abortSignal?: AbortSignal) { if (ms <= 0) { return; } - if (abortSignal?.aborted) { - throw new Error("aborted", { cause: abortSignal.reason ?? new Error("aborted") }); - } - await new Promise((resolve, reject) => { let settled = false; - let timer: ReturnType | null = setTimeout(() => { - settled = true; - if (abortSignal) { - abortSignal.removeEventListener("abort", onAbort); - } - timer = null; - resolve(); - }, ms); - + let timer: ReturnType | null = null; const onAbort = () => { if (settled) { return; @@ -47,6 +35,25 @@ export async function sleepWithAbort(ms: number, abortSignal?: AbortSignal) { if (abortSignal) { abortSignal.addEventListener("abort", onAbort, { once: true }); + if (abortSignal.aborted) { + onAbort(); + return; + } + } + + timer = setTimeout(() => { + settled = true; + if (abortSignal) { + abortSignal.removeEventListener("abort", onAbort); + } + timer = null; + resolve(); + }, ms); + + if (abortSignal) { + if (abortSignal.aborted) { + onAbort(); + } } }); }