diff --git a/CHANGELOG.md b/CHANGELOG.md index e70093a53a0..f8cbdfeecc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ Docs: https://docs.openclaw.ai ## Unreleased +### Changes + +### Fixes + +- Agents/context engines: run opt-in turn maintenance as idle-aware background work so the next foreground turn no longer waits on proactive maintenance. (#65233) Thanks @100yenadmin. + ## 2026.4.12 ### Changes @@ -44,6 +50,7 @@ Docs: https://docs.openclaw.ai - WhatsApp/outbound: fall back to the first `mediaUrls` entry when `mediaUrl` is empty so gateway media sends stop silently dropping attachments that already have a resolved media list. (#64394) Thanks @eric-fr4 and @vincentkoc. - Doctor/Discord: stop `openclaw doctor --fix` from rewriting legacy Discord preview-streaming config into the nested modern shape, so downgrades can still recover without hand-editing `channels.discord.streaming`. (#65035) Thanks @vincentkoc. - Gateway/auth: blank the shipped example gateway credential in `.env.example` and fail startup when a copied placeholder token or password is still configured, so operators cannot accidentally launch with a publicly known secret. (#64586) Thanks @navarrotech and @vincentkoc. + - Memory/active-memory+dreaming: keep active-memory recall runs on the strongest resolved channel, consume managed dreaming heartbeat events exactly once, stop dreaming from re-ingesting its own narrative transcripts, and add explicit repair/dedupe recovery flows in CLI, doctor, and the Dreams UI. - Agents/queueing: carry orphaned active-turn user text into the next prompt before repairing transcript ordering, so follow-up messages that arrive mid-run are no longer silently dropped. (#65388) Thanks @adminfedres and @vincentkoc. - Gateway/keepalive: stop marking WebSocket tick broadcasts as droppable so slow or backpressured clients do not self-disconnect with `tick timeout` while long-running work is still alive. (#65256) Thanks @100yenadmin and @vincentkoc. 
diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 4b0b5e0eec6..24fb0fd41ee 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -fd679707dd78dbf63460876ea137ada61c536d7815ff8f6eb02e4c4b40a765cb plugin-sdk-api-baseline.json -bd52b020f75ef21f49b8934bc142a7cf877844791d9dfcda8577281e99a753f2 plugin-sdk-api-baseline.jsonl +600f05b14825fa01eb9d63ab6cab5f33c74ff44a48cab5c65457ab08e5b0e91a plugin-sdk-api-baseline.json +99d649a86a30756b18b91686f3683e6e829c5e316e1370266ec4fee344bc55cb plugin-sdk-api-baseline.jsonl diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index a49ff63842d..addd583e7ef 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -1,4 +1,23 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { ContextEngineRuntimeContext } from "../../context-engine/types.js"; +import { peekSystemEvents, resetSystemEventsForTest } from "../../infra/system-events.js"; +import { + enqueueCommandInLane, + resetCommandQueueStateForTest, +} from "../../process/command-queue.js"; +import * as commandQueueModule from "../../process/command-queue.js"; +import { createQueuedTaskRun } from "../../tasks/task-executor.js"; +import { resetTaskFlowRegistryForTests } from "../../tasks/task-flow-registry.js"; +import { + getTaskById, + listTasksForOwnerKey, + resetTaskRegistryDeliveryRuntimeForTests, + resetTaskRegistryForTests, + setTaskRegistryDeliveryRuntimeForTests, +} from "../../tasks/task-registry.js"; +import { withStateDirEnv } from "../../test-helpers/state-dir-env.js"; +import { castAgentMessage } from "../test-helpers/agent-message-fixtures.js"; +import { resolveSessionLane } from "./lanes.js"; const 
rewriteTranscriptEntriesInSessionManagerMock = vi.fn((_params?: unknown) => ({ changed: true, @@ -11,7 +30,38 @@ const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown rewrittenEntries: 2, })); let buildContextEngineMaintenanceRuntimeContext: typeof import("./context-engine-maintenance.js").buildContextEngineMaintenanceRuntimeContext; +let createDeferredTurnMaintenanceAbortSignal: typeof import("./context-engine-maintenance.js").createDeferredTurnMaintenanceAbortSignal; +let resetDeferredTurnMaintenanceStateForTest: typeof import("./context-engine-maintenance.js").resetDeferredTurnMaintenanceStateForTest; let runContextEngineMaintenance: typeof import("./context-engine-maintenance.js").runContextEngineMaintenance; +// Keep this literal aligned with the production module; tests use dynamic +// import reloading, so they cannot safely import the constant directly. +const TURN_MAINTENANCE_TASK_KIND = "context_engine_turn_maintenance"; + +async function flushAsyncWork(times = 4): Promise { + for (let index = 0; index < times; index += 1) { + await Promise.resolve(); + } +} + +async function waitForAssertion( + assertion: () => void, + timeoutMs = 2_000, + stepMs = 5, +): Promise { + const startedAt = Date.now(); + for (;;) { + try { + assertion(); + return; + } catch (error) { + if (Date.now() - startedAt >= timeoutMs) { + throw error; + } + await vi.advanceTimersByTimeAsync(stepMs); + await flushAsyncWork(); + } + } +} vi.mock("./transcript-rewrite.js", () => ({ rewriteTranscriptEntriesInSessionManager: (params: unknown) => @@ -21,15 +71,21 @@ vi.mock("./transcript-rewrite.js", () => ({ })); async function loadFreshContextEngineMaintenanceModuleForTest() { - vi.resetModules(); - ({ buildContextEngineMaintenanceRuntimeContext, runContextEngineMaintenance } = - await import("./context-engine-maintenance.js")); + ({ + buildContextEngineMaintenanceRuntimeContext, + createDeferredTurnMaintenanceAbortSignal, + 
resetDeferredTurnMaintenanceStateForTest, + runContextEngineMaintenance, + } = await import("./context-engine-maintenance.js")); + resetDeferredTurnMaintenanceStateForTest(); } describe("buildContextEngineMaintenanceRuntimeContext", () => { beforeEach(async () => { rewriteTranscriptEntriesInSessionManagerMock.mockClear(); rewriteTranscriptEntriesInSessionFileMock.mockClear(); + resetSystemEventsForTest(); + resetTaskRegistryDeliveryRuntimeForTests(); await loadFreshContextEngineMaintenanceModuleForTest(); }); @@ -97,6 +153,112 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => { }); expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled(); }); + + it("defers file rewrites onto the session lane when requested", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + const sessionKey = "agent:main:session-rewrite-handoff"; + const sessionLane = resolveSessionLane(sessionKey); + const events: string[] = []; + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + events.push("foreground-start"); + await new Promise((resolve) => { + releaseForeground = resolve; + }); + events.push("foreground-end"); + }); + await Promise.resolve(); + + rewriteTranscriptEntriesInSessionFileMock.mockImplementationOnce(async (_params?: unknown) => { + events.push("rewrite"); + return { + changed: true, + bytesFreed: 123, + rewrittenEntries: 2, + }; + }); + + const runtimeContext = buildContextEngineMaintenanceRuntimeContext({ + sessionId: "session-rewrite-handoff", + sessionKey, + sessionFile: "/tmp/session-rewrite-handoff.jsonl", + deferTranscriptRewriteToSessionLane: true, + }); + + const rewritePromise = runtimeContext.rewriteTranscriptEntries?.({ + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }); + expect(rewritePromise).toBeDefined(); + + await flushAsyncWork(); + 
expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled(); + + releaseForeground(); + await expect(rewritePromise!).resolves.toEqual({ + changed: true, + bytesFreed: 123, + rewrittenEntries: 2, + }); + expect(events).toEqual(["foreground-start", "foreground-end", "rewrite"]); + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); +}); + +describe("createDeferredTurnMaintenanceAbortSignal", () => { + beforeEach(async () => { + await loadFreshContextEngineMaintenanceModuleForTest(); + }); + + it("aborts on termination signals and unregisters listeners", () => { + const listeners = new Map void>>(); + const kill = vi.fn(); + const processLike = { + on(event: "SIGINT" | "SIGTERM", listener: () => void) { + const bucket = listeners.get(event) ?? new Set<() => void>(); + bucket.add(listener); + listeners.set(event, bucket); + return this; + }, + off(event: "SIGINT" | "SIGTERM", listener: () => void) { + listeners.get(event)?.delete(listener); + return this; + }, + listenerCount(event: "SIGINT" | "SIGTERM") { + return listeners.get(event)?.size ?? 0; + }, + kill, + pid: 4242, + } as unknown as NonNullable< + Parameters[0] + >["processLike"]; + + const { abortSignal, dispose } = createDeferredTurnMaintenanceAbortSignal({ processLike }); + const second = createDeferredTurnMaintenanceAbortSignal({ processLike }); + expect(listeners.get("SIGINT")?.size ?? 0).toBe(1); + expect(listeners.get("SIGTERM")?.size ?? 0).toBe(1); + + const sigtermListeners = Array.from(listeners.get("SIGTERM") ?? []); + expect(sigtermListeners).toHaveLength(1); + sigtermListeners[0]?.(); + + expect(abortSignal?.aborted).toBe(true); + expect(second.abortSignal?.aborted).toBe(true); + expect(kill).toHaveBeenCalledWith(4242, "SIGTERM"); + expect(listeners.get("SIGINT")?.size ?? 0).toBe(0); + expect(listeners.get("SIGTERM")?.size ?? 0).toBe(0); + + dispose(); + second.dispose(); + expect(listeners.get("SIGINT")?.size ?? 0).toBe(0); + expect(listeners.get("SIGTERM")?.size ?? 
0).toBe(0); + }); }); describe("runContextEngineMaintenance", () => { @@ -152,4 +314,852 @@ describe("runContextEngineMaintenance", () => { | undefined; expect(typeof runtimeContext?.rewriteTranscriptEntries).toBe("function"); }); + + it("forces background maintenance rewrites through the session file even when a session manager exists", async () => { + const maintain = vi.fn(async (params?: unknown) => { + await ( + params as { runtimeContext?: ContextEngineRuntimeContext } | undefined + )?.runtimeContext?.rewriteTranscriptEntries?.({ + replacements: [ + { + entryId: "entry-1", + message: castAgentMessage({ + role: "assistant", + content: [{ type: "text", text: "done" }], + timestamp: 2, + }), + }, + ], + }); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); + const sessionManager = { appendMessage: vi.fn() } as unknown as Parameters< + typeof buildContextEngineMaintenanceRuntimeContext + >[0]["sessionManager"]; + + await runContextEngineMaintenance({ + contextEngine: { + info: { id: "test", name: "Test Engine", turnMaintenanceMode: "background" }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }) => ({ messages, estimatedTokens: 0 }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + }, + sessionId: "session-background-file-rewrite", + sessionKey: "agent:main:session-background-file-rewrite", + sessionFile: "/tmp/session-background-file-rewrite.jsonl", + reason: "turn", + executionMode: "background", + sessionManager, + }); + + expect(rewriteTranscriptEntriesInSessionManagerMock).not.toHaveBeenCalled(); + expect(rewriteTranscriptEntriesInSessionFileMock).toHaveBeenCalledWith({ + sessionFile: "/tmp/session-background-file-rewrite.jsonl", + sessionId: "session-background-file-rewrite", + sessionKey: "agent:main:session-background-file-rewrite", + request: { + replacements: [ + { + entryId: "entry-1", + message: castAgentMessage({ + role: "assistant", + content: [{ type: "text", text: 
"done" }], + timestamp: 2, + }), + }, + ], + }, + }); + }); + + it("defers turn maintenance to a hidden background task when enabled", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-1"; + const sessionLane = resolveSessionLane(sessionKey); + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + await new Promise((resolve) => { + releaseForeground = resolve; + }); + }); + await Promise.resolve(); + + const maintain = vi.fn(async (_params?: unknown) => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + const result = await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-1", + sessionKey, + sessionFile: "/tmp/session.jsonl", + reason: "turn", + runtimeContext: { workspaceDir: "/tmp/workspace" }, + }); + + expect(result).toBeUndefined(); + expect(maintain).not.toHaveBeenCalled(); + + const queuedTasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(queuedTasks).toHaveLength(1); + expect(queuedTasks[0]).toMatchObject({ + runtime: "acp", + scopeKind: "session", + ownerKey: sessionKey, + requesterSessionKey: sessionKey, + taskKind: TURN_MAINTENANCE_TASK_KIND, + notifyPolicy: "silent", + deliveryStatus: "pending", + }); + + releaseForeground(); + await waitForAssertion(() 
=> expect(maintain).toHaveBeenCalledTimes(1)); + expect(maintain.mock.calls[0]?.[0]).toMatchObject({ + sessionId: "session-1", + sessionKey, + sessionFile: "/tmp/session.jsonl", + runtimeContext: expect.objectContaining({ + workspaceDir: "/tmp/workspace", + allowDeferredCompactionExecution: true, + }), + }); + + const completedTask = getTaskById(queuedTasks[0].taskId); + expect(completedTask).toMatchObject({ + status: "succeeded", + progressSummary: expect.stringContaining("Deferred maintenance completed"), + }); + + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + + it("coalesces repeated requests into one active run plus one follow-up run for the same session", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-2"; + const sessionLane = resolveSessionLane(sessionKey); + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + await new Promise((resolve) => { + releaseForeground = resolve; + }); + }); + await Promise.resolve(); + + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await Promise.all([ + runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-2", + sessionKey, + sessionFile: "/tmp/session-2.jsonl", + reason: "turn", + }), + runContextEngineMaintenance({ + 
contextEngine: backgroundEngine, + sessionId: "session-2", + sessionKey, + sessionFile: "/tmp/session-2.jsonl", + reason: "turn", + }), + ]); + + const queuedTasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(queuedTasks).toHaveLength(1); + + releaseForeground(); + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(2)); + const completedTasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(completedTasks).toHaveLength(2); + expect(completedTasks.every((task) => task.status === "succeeded")).toBe(true); + + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + + it("queues a follow-up maintenance run when a new turn finishes during an active deferred run", async () => { + await withStateDirEnv("openclaw-turn-maintenance-rerun-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-rerun"; + let releaseFirstMaintenance!: () => void; + let maintenanceCalls = 0; + const maintain = vi.fn(async () => { + maintenanceCalls += 1; + if (maintenanceCalls === 1) { + await new Promise((resolve) => { + releaseFirstMaintenance = resolve; + }); + } + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-rerun", + sessionKey, + sessionFile: 
"/tmp/session-rerun.jsonl", + reason: "turn", + }); + + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1)); + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-rerun", + sessionKey, + sessionFile: "/tmp/session-rerun.jsonl", + reason: "turn", + }); + + releaseFirstMaintenance(); + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(2)); + + const tasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(tasks).toHaveLength(2); + expect(tasks.every((task) => task.status === "succeeded")).toBe(true); + } finally { + vi.useRealTimers(); + } + }); + }); + + it("replaces legacy active maintenance tasks that are missing a runId", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-legacy"; + const legacyTask = createQueuedTaskRun({ + runtime: "acp", + taskKind: TURN_MAINTENANCE_TASK_KIND, + sourceId: TURN_MAINTENANCE_TASK_KIND, + requesterSessionKey: sessionKey, + ownerKey: sessionKey, + scopeKind: "session", + label: "Context engine turn maintenance", + task: "Deferred context-engine maintenance after turn.", + notifyPolicy: "silent", + deliveryStatus: "pending", + preferMetadata: true, + }); + + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await 
runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-legacy", + sessionKey, + sessionFile: "/tmp/session-legacy.jsonl", + reason: "turn", + }); + + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1)); + + const tasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(tasks).toHaveLength(2); + expect(getTaskById(legacyTask.taskId)).toMatchObject({ + status: "cancelled", + notifyPolicy: "silent", + }); + expect(tasks.some((task) => task.runId?.startsWith("turn-maint:"))).toBe(true); + } finally { + vi.useRealTimers(); + } + }); + }); + + it("cancels the queued task when deferred scheduling is rejected", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + const scheduleError = new Error("gateway draining"); + const enqueueSpy = vi + .spyOn(commandQueueModule, "enqueueCommandInLane") + .mockRejectedValue(scheduleError); + try { + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetCommandQueueStateForTest(); + + const sessionKey = "agent:main:session-enqueue-reject"; + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-enqueue-reject", + sessionKey, + sessionFile: "/tmp/session-enqueue-reject.jsonl", + reason: "turn", + }); + await flushAsyncWork(); + + const tasks = listTasksForOwnerKey(sessionKey).filter( + (task) => 
task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(tasks).toHaveLength(1); + expect(tasks[0]).toMatchObject({ + status: "cancelled", + terminalSummary: expect.stringContaining("gateway draining"), + }); + expect(maintain).not.toHaveBeenCalled(); + } finally { + enqueueSpy.mockRestore(); + vi.useRealTimers(); + } + }); + }); + + it("lets foreground turns win while deferred maintenance is waiting", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-3"; + const sessionLane = resolveSessionLane(sessionKey); + const events: string[] = []; + let releaseFirstForeground!: () => void; + const firstForeground = enqueueCommandInLane(sessionLane, async () => { + events.push("foreground-1-start"); + await new Promise((resolve) => { + releaseFirstForeground = resolve; + }); + events.push("foreground-1-end"); + }); + await Promise.resolve(); + + const maintain = vi.fn(async () => { + events.push("maintenance-start"); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-3", + sessionKey, + sessionFile: "/tmp/session-3.jsonl", + reason: "turn", + }); + + const secondForeground = enqueueCommandInLane(sessionLane, async () => { + events.push("foreground-2-start"); + events.push("foreground-2-end"); + }); + + releaseFirstForeground(); + 
await waitForAssertion(() => + expect(events).toEqual([ + "foreground-1-start", + "foreground-1-end", + "foreground-2-start", + "foreground-2-end", + "maintenance-start", + ]), + ); + expect(maintain).toHaveBeenCalledTimes(1); + + await Promise.all([firstForeground, secondForeground]); + } finally { + vi.useRealTimers(); + } + }); + }); + + it("lets a foreground turn run before a deferred maintenance transcript rewrite", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + + const sessionKey = "agent:main:session-rewrite-priority"; + const sessionLane = resolveSessionLane(sessionKey); + const events: string[] = []; + let allowRewrite!: () => void; + const maintain = vi.fn(async (params?: unknown) => { + events.push("maintenance-start"); + await new Promise((resolve) => { + allowRewrite = resolve; + }); + events.push("maintenance-before-rewrite"); + await (params as { runtimeContext?: ContextEngineRuntimeContext }).runtimeContext + ?.rewriteTranscriptEntries?.({ + replacements: [ + { + entryId: "entry-1", + message: castAgentMessage({ + role: "assistant", + content: [{ type: "text", text: "done" }], + timestamp: 2, + }), + }, + ], + }); + events.push("maintenance-after-rewrite"); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); + + rewriteTranscriptEntriesInSessionFileMock.mockImplementationOnce(async (_params?: unknown) => { + events.push("rewrite"); + return { + changed: true, + bytesFreed: 123, + rewrittenEntries: 2, + }; + }); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, 
compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-rewrite-priority", + sessionKey, + sessionFile: "/tmp/session-rewrite-priority.jsonl", + reason: "turn", + }); + + await waitForAssertion(() => expect(events).toContain("maintenance-start")); + + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + events.push("foreground-start"); + events.push("foreground-end"); + }); + + allowRewrite(); + + await waitForAssertion(() => + expect(events).toEqual([ + "maintenance-start", + "foreground-start", + "foreground-end", + "maintenance-before-rewrite", + "rewrite", + "maintenance-after-rewrite", + ]), + ); + + expect(maintain).toHaveBeenCalledTimes(1); + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + + it("keeps fast deferred maintenance silent for the user", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetSystemEventsForTest(); + const sendMessageMock = vi.fn(); + setTaskRegistryDeliveryRuntimeForTests({ + sendMessage: sendMessageMock, + }); + + const sessionKey = "agent:main:session-fast"; + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-fast", + sessionKey, + sessionFile: 
"/tmp/session-fast.jsonl", + reason: "turn", + }); + await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1)); + expect(sendMessageMock).not.toHaveBeenCalled(); + expect(peekSystemEvents(sessionKey)).toEqual([]); + } finally { + vi.useRealTimers(); + } + }); + }); + + it("surfaces long-running deferred maintenance and completion via task updates", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetSystemEventsForTest(); + + const sessionKey = "agent:main:session-long"; + const sessionLane = resolveSessionLane(sessionKey); + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + await new Promise((resolve) => { + releaseForeground = resolve; + }); + }); + await Promise.resolve(); + + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-long", + sessionKey, + sessionFile: "/tmp/session-long.jsonl", + reason: "turn", + }); + + await vi.advanceTimersByTimeAsync(11_000); + await waitForAssertion(() => + expect(peekSystemEvents(sessionKey)).toEqual( + expect.arrayContaining([ + expect.stringContaining("Background task update: Context engine turn maintenance."), + ]), + ), + ); + + releaseForeground(); + await waitForAssertion(() => + expect(peekSystemEvents(sessionKey)).toEqual( 
+ expect.arrayContaining([ + expect.stringContaining("Background task done: Context engine turn maintenance"), + ]), + ), + ); + + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + + it("throttles deferred wait notices while the session lane stays busy", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetSystemEventsForTest(); + + const sessionKey = "agent:main:session-throttle"; + const sessionLane = resolveSessionLane(sessionKey); + let releaseForeground!: () => void; + const foregroundTurn = enqueueCommandInLane(sessionLane, async () => { + await new Promise((resolve) => { + releaseForeground = resolve; + }); + }); + await Promise.resolve(); + + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain: vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })), + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-throttle", + sessionKey, + sessionFile: "/tmp/session-throttle.jsonl", + reason: "turn", + }); + + await vi.advanceTimersByTimeAsync(11_000); + await waitForAssertion(() => + expect( + peekSystemEvents(sessionKey).filter((event) => + event.includes("Background task update: Context engine turn maintenance."), + ), + ).toHaveLength(1), + ); + + await vi.advanceTimersByTimeAsync(9_000); + expect( + peekSystemEvents(sessionKey).filter((event) => + event.includes("Background task update: Context engine turn maintenance."), + ), + 
).toHaveLength(2); + + await vi.advanceTimersByTimeAsync(1_000); + expect( + peekSystemEvents(sessionKey).filter((event) => + event.includes("Background task update: Context engine turn maintenance."), + ), + ).toHaveLength(2); + + releaseForeground(); + await foregroundTurn; + } finally { + vi.useRealTimers(); + } + }); + }); + + it("surfaces deferred maintenance failures even when they fail quickly", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + try { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetSystemEventsForTest(); + + const sessionKey = "agent:main:session-fail"; + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain: vi.fn(async () => { + throw new Error("maintenance exploded"); + }), + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-fail", + sessionKey, + sessionFile: "/tmp/session-fail.jsonl", + reason: "turn", + }); + await waitForAssertion(() => + expect(peekSystemEvents(sessionKey)).toEqual( + expect.arrayContaining([ + expect.stringContaining("Background task failed: Context engine turn maintenance"), + ]), + ), + ); + } finally { + vi.useRealTimers(); + } + }); + }); }); diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index 88e417f5757..639b26e282e 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -1,14 +1,268 @@ +import { randomUUID } from 
"node:crypto"; import type { ContextEngine, ContextEngineMaintenanceResult, ContextEngineRuntimeContext, } from "../../context-engine/types.js"; +import { sleepWithAbort } from "../../infra/backoff.js"; +import { formatErrorMessage } from "../../infra/errors.js"; +import { enqueueCommandInLane, getQueueSize } from "../../process/command-queue.js"; +import { normalizeOptionalString } from "../../shared/string-coerce.js"; +import { + completeTaskRunByRunId, + createQueuedTaskRun, + failTaskRunByRunId, + recordTaskRunProgressByRunId, + setDetachedTaskDeliveryStatusByRunId, + startTaskRunByRunId, +} from "../../tasks/task-executor.js"; +import { + cancelTaskByIdForOwner, + findTaskByRunIdForOwner, + updateTaskNotifyPolicyForOwner, +} from "../../tasks/task-owner-access.js"; +import { findActiveSessionTask } from "../session-async-task-status.js"; +import { resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { rewriteTranscriptEntriesInSessionFile, rewriteTranscriptEntriesInSessionManager, } from "./transcript-rewrite.js"; +const TURN_MAINTENANCE_TASK_KIND = "context_engine_turn_maintenance"; +const TURN_MAINTENANCE_TASK_LABEL = "Context engine turn maintenance"; +const TURN_MAINTENANCE_TASK_TASK = "Deferred context-engine maintenance after turn."; +const TURN_MAINTENANCE_LANE_PREFIX = "context-engine-turn-maintenance:"; +const TURN_MAINTENANCE_WAIT_POLL_MS = 100; +const TURN_MAINTENANCE_LONG_WAIT_MS = 10_000; +const DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY = Symbol.for( + "openclaw.contextEngineTurnMaintenanceAbortState", +); +type DeferredTurnMaintenanceScheduleParams = { + contextEngine: ContextEngine; + sessionId: string; + sessionKey: string; + sessionFile: string; + sessionManager?: Parameters[0]["sessionManager"]; + runtimeContext?: ContextEngineRuntimeContext; +}; + +type DeferredTurnMaintenanceRunState = { + promise: Promise; + rerunRequested: boolean; + latestParams: DeferredTurnMaintenanceScheduleParams; +}; + +const 
activeDeferredTurnMaintenanceRuns = new Map(); + +type DeferredTurnMaintenanceSignal = "SIGINT" | "SIGTERM"; +type DeferredTurnMaintenanceProcessLike = Pick & + Partial> & { + [DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]?: DeferredTurnMaintenanceAbortState; + }; +type DeferredTurnMaintenanceAbortState = { + registered: boolean; + controllers: Set; + cleanupHandlers: Map void>; +}; + +function resolveDeferredTurnMaintenanceAbortState( + processLike: DeferredTurnMaintenanceProcessLike, +): DeferredTurnMaintenanceAbortState { + const existing = processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]; + if (existing) { + return existing; + } + const created: DeferredTurnMaintenanceAbortState = { + registered: false, + controllers: new Set(), + cleanupHandlers: new Map void>(), + }; + processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY] = created; + return created; +} + +function unregisterDeferredTurnMaintenanceAbortSignalHandlers( + processLike: DeferredTurnMaintenanceProcessLike, + state: DeferredTurnMaintenanceAbortState, +): void { + if (!state.registered) { + return; + } + for (const [signal, handler] of state.cleanupHandlers) { + processLike.off(signal, handler); + } + state.cleanupHandlers.clear(); + state.registered = false; +} + +function normalizeSessionKey(sessionKey?: string): string | undefined { + return normalizeOptionalString(sessionKey) || undefined; +} + +function resolveDeferredTurnMaintenanceLane(sessionKey: string): string { + return `${TURN_MAINTENANCE_LANE_PREFIX}${sessionKey}`; +} + +export function createDeferredTurnMaintenanceAbortSignal(params?: { + processLike?: DeferredTurnMaintenanceProcessLike; +}): { + abortSignal?: AbortSignal; + dispose: () => void; +} { + if (typeof AbortController === "undefined") { + return { abortSignal: undefined, dispose: () => {} }; + } + + const processLike = (params?.processLike ?? 
process) as DeferredTurnMaintenanceProcessLike; + const state = resolveDeferredTurnMaintenanceAbortState(processLike); + const handleTerminationSignal = (signalName: DeferredTurnMaintenanceSignal) => { + const shouldReraise = + typeof processLike.listenerCount === "function" + ? processLike.listenerCount(signalName) === 1 + : false; + for (const activeController of state.controllers) { + if (!activeController.signal.aborted) { + activeController.abort( + new Error(`received ${signalName} while waiting for deferred maintenance`), + ); + } + } + state.controllers.clear(); + unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state); + if (shouldReraise && typeof processLike.kill === "function") { + try { + processLike.kill(processLike.pid ?? process.pid, signalName); + } catch { + // Ignore shutdown-path failures. + } + } + }; + if (!state.registered) { + state.registered = true; + const onSigint = () => handleTerminationSignal("SIGINT"); + const onSigterm = () => handleTerminationSignal("SIGTERM"); + state.cleanupHandlers.set("SIGINT", onSigint); + state.cleanupHandlers.set("SIGTERM", onSigterm); + processLike.on("SIGINT", onSigint); + processLike.on("SIGTERM", onSigterm); + } + + const controller = new AbortController(); + state.controllers.add(controller); + let disposed = false; + + const cleanup = () => { + if (disposed) { + return; + } + disposed = true; + state.controllers.delete(controller); + if (state.controllers.size === 0) { + unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state); + } + }; + + return { + abortSignal: controller.signal, + dispose: cleanup, + }; +} + +export function resetDeferredTurnMaintenanceStateForTest(): void { + activeDeferredTurnMaintenanceRuns.clear(); + const processLike = process as DeferredTurnMaintenanceProcessLike; + const state = processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]; + if (!state) { + return; + } + state.controllers.clear(); + 
unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state); + delete processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]; +} + +function markDeferredTurnMaintenanceTaskScheduleFailure(params: { + sessionKey: string; + taskId: string; + error: unknown; +}): void { + const errorMessage = formatErrorMessage(params.error); + log.warn(`failed to schedule deferred context engine maintenance: ${errorMessage}`); + cancelTaskByIdForOwner({ + taskId: params.taskId, + callerOwnerKey: params.sessionKey, + endedAt: Date.now(), + terminalSummary: `Deferred maintenance could not be scheduled: ${errorMessage}`, + }); +} + +function buildTurnMaintenanceTaskDescriptor(params: { sessionKey: string }) { + const runId = `turn-maint:${params.sessionKey}:${Date.now().toString(36)}:${randomUUID().slice( + 0, + 8, + )}`; + return createQueuedTaskRun({ + runtime: "acp", + taskKind: TURN_MAINTENANCE_TASK_KIND, + sourceId: TURN_MAINTENANCE_TASK_KIND, + requesterSessionKey: params.sessionKey, + ownerKey: params.sessionKey, + scopeKind: "session", + runId, + label: TURN_MAINTENANCE_TASK_LABEL, + task: TURN_MAINTENANCE_TASK_TASK, + notifyPolicy: "silent", + deliveryStatus: "pending", + preferMetadata: true, + }); +} + +function promoteTurnMaintenanceTaskVisibility(params: { + sessionKey: string; + runId: string; + notifyPolicy: "done_only" | "state_changes"; +}) { + const task = findTaskByRunIdForOwner({ + runId: params.runId, + callerOwnerKey: params.sessionKey, + }); + if (!task) { + return createQueuedTaskRun({ + runtime: "acp", + taskKind: TURN_MAINTENANCE_TASK_KIND, + sourceId: TURN_MAINTENANCE_TASK_KIND, + requesterSessionKey: params.sessionKey, + ownerKey: params.sessionKey, + scopeKind: "session", + runId: params.runId, + label: TURN_MAINTENANCE_TASK_LABEL, + task: TURN_MAINTENANCE_TASK_TASK, + notifyPolicy: params.notifyPolicy, + deliveryStatus: "pending", + preferMetadata: true, + }); + } + setDetachedTaskDeliveryStatusByRunId({ + runId: params.runId, + runtime: "acp", 
+ sessionKey: params.sessionKey, + deliveryStatus: "pending", + }); + if (task.notifyPolicy !== params.notifyPolicy) { + updateTaskNotifyPolicyForOwner({ + taskId: task.taskId, + callerOwnerKey: params.sessionKey, + notifyPolicy: params.notifyPolicy, + }); + } + return ( + findTaskByRunIdForOwner({ + runId: params.runId, + callerOwnerKey: params.sessionKey, + }) ?? task + ); +} + /** * Attach runtime-owned transcript rewrite helpers to an existing * context-engine runtime context payload. @@ -19,9 +273,12 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { sessionFile: string; sessionManager?: Parameters[0]["sessionManager"]; runtimeContext?: ContextEngineRuntimeContext; + allowDeferredCompactionExecution?: boolean; + deferTranscriptRewriteToSessionLane?: boolean; }): ContextEngineRuntimeContext { return { ...params.runtimeContext, + ...(params.allowDeferredCompactionExecution ? { allowDeferredCompactionExecution: true } : {}), rewriteTranscriptEntries: async (request) => { if (params.sessionManager) { return rewriteTranscriptEntriesInSessionManager({ @@ -29,16 +286,314 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { replacements: request.replacements, }); } - return await rewriteTranscriptEntriesInSessionFile({ - sessionFile: params.sessionFile, - sessionId: params.sessionId, - sessionKey: params.sessionKey, - request, - }); + const rewriteTranscriptEntriesInFile = async () => + await rewriteTranscriptEntriesInSessionFile({ + sessionFile: params.sessionFile, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + request, + }); + const rewriteSessionKey = normalizeSessionKey(params.sessionKey ?? 
params.sessionId); + if (params.deferTranscriptRewriteToSessionLane && rewriteSessionKey) { + return await enqueueCommandInLane( + resolveSessionLane(rewriteSessionKey), + async () => await rewriteTranscriptEntriesInFile(), + ); + } + return await rewriteTranscriptEntriesInFile(); }, }; } +async function executeContextEngineMaintenance(params: { + contextEngine: ContextEngine; + sessionId: string; + sessionKey?: string; + sessionFile: string; + reason: "bootstrap" | "compaction" | "turn"; + sessionManager?: Parameters[0]["sessionManager"]; + runtimeContext?: ContextEngineRuntimeContext; + executionMode: "foreground" | "background"; +}): Promise { + if (typeof params.contextEngine.maintain !== "function") { + return undefined; + } + const result = await params.contextEngine.maintain({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + runtimeContext: buildContextEngineMaintenanceRuntimeContext({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + sessionManager: params.executionMode === "background" ? undefined : params.sessionManager, + runtimeContext: params.runtimeContext, + allowDeferredCompactionExecution: params.executionMode === "background", + deferTranscriptRewriteToSessionLane: params.executionMode === "background", + }), + }); + if (result.changed) { + log.info( + `[context-engine] maintenance(${params.reason}) changed transcript ` + + `rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` + + `sessionKey=${params.sessionKey ?? params.sessionId ?? 
"unknown"}`, + ); + } + return result; +} + +async function runDeferredTurnMaintenanceWorker(params: { + contextEngine: ContextEngine; + sessionId: string; + sessionKey: string; + sessionFile: string; + sessionManager?: Parameters[0]["sessionManager"]; + runtimeContext?: ContextEngineRuntimeContext; + runId: string; +}): Promise { + let surfacedUserNotice = false; + let longRunningTimer: ReturnType | null = null; + const shutdownAbort = createDeferredTurnMaintenanceAbortSignal(); + const surfaceMaintenanceUpdate = (summary: string, eventSummary: string) => { + promoteTurnMaintenanceTaskVisibility({ + sessionKey: params.sessionKey, + runId: params.runId, + notifyPolicy: "state_changes", + }); + surfacedUserNotice = true; + recordTaskRunProgressByRunId({ + runId: params.runId, + runtime: "acp", + sessionKey: params.sessionKey, + lastEventAt: Date.now(), + progressSummary: summary, + eventSummary, + }); + }; + + try { + const sessionLane = resolveSessionLane(params.sessionKey); + const startedWaitingAt = Date.now(); + let lastWaitNoticeAt = 0; + + for (;;) { + while (getQueueSize(sessionLane) > 0) { + const now = Date.now(); + if ( + now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS && + now - lastWaitNoticeAt >= TURN_MAINTENANCE_LONG_WAIT_MS + ) { + lastWaitNoticeAt = now; + surfaceMaintenanceUpdate( + "Waiting for the session lane to go idle.", + surfacedUserNotice + ? "Still waiting for the session lane to go idle." 
+ : "Deferred maintenance is waiting for the session lane to go idle.", + ); + } + await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS, shutdownAbort.abortSignal); + } + await Promise.resolve(); + if (getQueueSize(sessionLane) === 0) { + break; + } + } + + const runningAt = Date.now(); + startTaskRunByRunId({ + runId: params.runId, + runtime: "acp", + sessionKey: params.sessionKey, + startedAt: runningAt, + lastEventAt: runningAt, + progressSummary: "Running deferred maintenance.", + eventSummary: "Starting deferred maintenance.", + }); + longRunningTimer = setTimeout(() => { + try { + surfaceMaintenanceUpdate( + "Deferred maintenance is still running.", + "Deferred maintenance is still running.", + ); + } catch (error) { + log.warn(`failed to surface deferred maintenance progress: ${String(error)}`); + } + }, TURN_MAINTENANCE_LONG_WAIT_MS); + + const result = await executeContextEngineMaintenance({ + contextEngine: params.contextEngine, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + reason: "turn", + sessionManager: params.sessionManager, + runtimeContext: params.runtimeContext, + executionMode: "background", + }); + if (longRunningTimer) { + clearTimeout(longRunningTimer); + longRunningTimer = null; + } + + const endedAt = Date.now(); + completeTaskRunByRunId({ + runId: params.runId, + runtime: "acp", + sessionKey: params.sessionKey, + endedAt, + lastEventAt: endedAt, + progressSummary: result?.changed + ? "Deferred maintenance completed with transcript changes." + : "Deferred maintenance completed.", + terminalSummary: result?.changed + ? `Rewrote ${result.rewrittenEntries} transcript entr${result.rewrittenEntries === 1 ? 
"y" : "ies"} and freed ${result.bytesFreed} bytes.` + : "No transcript changes were needed.", + }); + } catch (err) { + if (shutdownAbort.abortSignal?.aborted) { + if (longRunningTimer) { + clearTimeout(longRunningTimer); + longRunningTimer = null; + } + const task = findTaskByRunIdForOwner({ + runId: params.runId, + callerOwnerKey: params.sessionKey, + }); + if (task) { + cancelTaskByIdForOwner({ + taskId: task.taskId, + callerOwnerKey: params.sessionKey, + endedAt: Date.now(), + terminalSummary: "Deferred maintenance cancelled during shutdown.", + }); + } + return; + } + if (longRunningTimer) { + clearTimeout(longRunningTimer); + longRunningTimer = null; + } + const endedAt = Date.now(); + const reason = formatErrorMessage(err); + if (!surfacedUserNotice) { + promoteTurnMaintenanceTaskVisibility({ + sessionKey: params.sessionKey, + runId: params.runId, + notifyPolicy: "done_only", + }); + } + failTaskRunByRunId({ + runId: params.runId, + runtime: "acp", + sessionKey: params.sessionKey, + endedAt, + lastEventAt: endedAt, + error: reason, + progressSummary: "Deferred maintenance failed.", + terminalSummary: reason, + }); + log.warn(`deferred context engine maintenance failed: ${reason}`); + } finally { + shutdownAbort.dispose(); + } +} + +function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceScheduleParams): void { + const sessionKey = normalizeSessionKey(params.sessionKey); + if (!sessionKey) { + return; + } + const activeRun = activeDeferredTurnMaintenanceRuns.get(sessionKey); + if (activeRun) { + activeRun.rerunRequested = true; + activeRun.latestParams = { ...params, sessionKey }; + return; + } + + const existingTask = findActiveSessionTask({ + sessionKey, + runtime: "acp", + taskKind: TURN_MAINTENANCE_TASK_KIND, + }); + const reusableTask = existingTask?.runId?.trim() ? 
existingTask : undefined; + if (existingTask && !reusableTask) { + updateTaskNotifyPolicyForOwner({ + taskId: existingTask.taskId, + callerOwnerKey: sessionKey, + notifyPolicy: "silent", + }); + cancelTaskByIdForOwner({ + taskId: existingTask.taskId, + callerOwnerKey: sessionKey, + endedAt: Date.now(), + terminalSummary: "Superseded by refreshed deferred maintenance task.", + }); + } + const task = + reusableTask ?? + buildTurnMaintenanceTaskDescriptor({ + sessionKey, + }); + log.info( + `[context-engine] deferred turn maintenance ${reusableTask ? "resuming" : "queued"} ` + + `taskId=${task.taskId} sessionKey=${sessionKey} lane=${resolveDeferredTurnMaintenanceLane(sessionKey)}`, + ); + + const schedulerAbort = createDeferredTurnMaintenanceAbortSignal(); + let runPromise: Promise; + try { + runPromise = enqueueCommandInLane(resolveDeferredTurnMaintenanceLane(sessionKey), async () => + runDeferredTurnMaintenanceWorker({ + contextEngine: params.contextEngine, + sessionId: params.sessionId, + sessionKey, + sessionFile: params.sessionFile, + sessionManager: params.sessionManager, + runtimeContext: params.runtimeContext, + runId: task.runId!, + }), + ); + } catch (err) { + schedulerAbort.dispose(); + markDeferredTurnMaintenanceTaskScheduleFailure({ + sessionKey, + taskId: task.taskId, + error: err, + }); + return; + } + let state!: DeferredTurnMaintenanceRunState; + const trackedPromise = runPromise + .catch((err) => { + markDeferredTurnMaintenanceTaskScheduleFailure({ + sessionKey, + taskId: task.taskId, + error: err, + }); + }) + .finally(() => { + schedulerAbort.dispose(); + const current = activeDeferredTurnMaintenanceRuns.get(sessionKey); + if (current !== state) { + return; + } + const shutdownTriggered = schedulerAbort.abortSignal?.aborted === true; + const rerunParams = + current.rerunRequested && !shutdownTriggered ? 
current.latestParams : undefined; + activeDeferredTurnMaintenanceRuns.delete(sessionKey); + if (rerunParams) { + scheduleDeferredTurnMaintenance(rerunParams); + } + }); + state = { + promise: trackedPromise, + rerunRequested: false, + latestParams: { ...params, sessionKey }, + }; + activeDeferredTurnMaintenanceRuns.set(sessionKey, state); + void trackedPromise; +} + /** * Run optional context-engine transcript maintenance and normalize the result. */ @@ -50,32 +605,45 @@ export async function runContextEngineMaintenance(params: { reason: "bootstrap" | "compaction" | "turn"; sessionManager?: Parameters[0]["sessionManager"]; runtimeContext?: ContextEngineRuntimeContext; + executionMode?: "foreground" | "background"; }): Promise { if (typeof params.contextEngine?.maintain !== "function") { return undefined; } - try { - const result = await params.contextEngine.maintain({ - sessionId: params.sessionId, - sessionKey: params.sessionKey, - sessionFile: params.sessionFile, - runtimeContext: buildContextEngineMaintenanceRuntimeContext({ + const executionMode = params.executionMode ?? "foreground"; + const shouldDefer = + params.reason === "turn" && + executionMode !== "background" && + params.contextEngine.info.turnMaintenanceMode === "background"; + + if (shouldDefer) { + try { + scheduleDeferredTurnMaintenance({ + contextEngine: params.contextEngine, sessionId: params.sessionId, - sessionKey: params.sessionKey, + sessionKey: params.sessionKey ?? params.sessionId, sessionFile: params.sessionFile, sessionManager: params.sessionManager, runtimeContext: params.runtimeContext, - }), - }); - if (result.changed) { - log.info( - `[context-engine] maintenance(${params.reason}) changed transcript ` + - `rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` + - `sessionKey=${params.sessionKey ?? params.sessionId ?? 
"unknown"}`, - ); + }); + } catch (err) { + log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`); } - return result; + return undefined; + } + + try { + return await executeContextEngineMaintenance({ + contextEngine: params.contextEngine, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + reason: params.reason, + sessionManager: params.sessionManager, + runtimeContext: params.runtimeContext, + executionMode, + }); } catch (err) { log.warn(`context engine maintain failed (${params.reason}): ${String(err)}`); return undefined; diff --git a/src/context-engine/types.ts b/src/context-engine/types.ts index e1120db8ade..594703bc864 100644 --- a/src/context-engine/types.ts +++ b/src/context-engine/types.ts @@ -50,6 +50,13 @@ export type ContextEngineInfo = { version?: string; /** True when the engine manages its own compaction lifecycle. */ ownsCompaction?: boolean; + /** + * Controls how turn-triggered maintenance should be executed. + * + * Engines remain compatible by default unless the host explicitly opts into + * background turn maintenance. + */ + turnMaintenanceMode?: "foreground" | "background"; }; export type SubagentSpawnPreparation = { @@ -128,6 +135,11 @@ export type ContextEnginePromptCacheInfo = { }; export type ContextEngineRuntimeContext = Record & { + /** + * True when the host has explicitly opted this maintenance run into + * consuming deferred compaction debt. + */ + allowDeferredCompactionExecution?: boolean; /** Optional prompt-cache telemetry for cache-aware engines. 
*/ promptCache?: ContextEnginePromptCacheInfo; /** diff --git a/src/infra/backoff.test.ts b/src/infra/backoff.test.ts index 34108b815c8..5f55f5471f7 100644 --- a/src/infra/backoff.test.ts +++ b/src/infra/backoff.test.ts @@ -43,4 +43,39 @@ describe("backoff helpers", () => { cause: expect.anything(), }); }); + + it("advances with fake timers", async () => { + vi.useFakeTimers(); + try { + const sleeper = sleepWithAbort(50); + await vi.advanceTimersByTimeAsync(49); + await expect( + Promise.race([sleeper.then(() => "done"), Promise.resolve("pending")]), + ).resolves.toBe("pending"); + await vi.advanceTimersByTimeAsync(1); + await expect(sleeper).resolves.toBeUndefined(); + } finally { + vi.useRealTimers(); + } + }); + + it("rejects if the signal aborts during listener registration", async () => { + let aborted = false; + const signal = { + get aborted() { + return aborted; + }, + get reason() { + return new Error("listener-registration-race"); + }, + addEventListener(_event: string, _listener: EventListenerOrEventListenerObject) { + aborted = true; + }, + removeEventListener() {}, + } as unknown as AbortSignal; + + await expect(sleepWithAbort(50, signal)).rejects.toMatchObject({ + message: "aborted", + }); + }); }); diff --git a/src/infra/backoff.ts b/src/infra/backoff.ts index 153eca16203..706ee229f61 100644 --- a/src/infra/backoff.ts +++ b/src/infra/backoff.ts @@ -1,5 +1,3 @@ -import { setTimeout as delay } from "node:timers/promises"; - export type BackoffPolicy = { initialMs: number; maxMs: number; @@ -17,12 +15,45 @@ export async function sleepWithAbort(ms: number, abortSignal?: AbortSignal) { if (ms <= 0) { return; } - try { - await delay(ms, undefined, { signal: abortSignal }); - } catch (err) { - if (abortSignal?.aborted) { - throw new Error("aborted", { cause: err }); + await new Promise((resolve, reject) => { + let settled = false; + let timer: ReturnType | null = null; + const onAbort = () => { + if (settled) { + return; + } + settled = true; + if (timer) 
{ + clearTimeout(timer); + timer = null; + } + if (abortSignal) { + abortSignal.removeEventListener("abort", onAbort); + } + reject(new Error("aborted", { cause: abortSignal?.reason ?? new Error("aborted") })); + }; + + if (abortSignal) { + abortSignal.addEventListener("abort", onAbort, { once: true }); + if (abortSignal.aborted) { + onAbort(); + return; + } } - throw err; - } + + timer = setTimeout(() => { + settled = true; + if (abortSignal) { + abortSignal.removeEventListener("abort", onAbort); + } + timer = null; + resolve(); + }, ms); + + if (abortSignal) { + if (abortSignal.aborted) { + onAbort(); + } + } + }); } diff --git a/src/tasks/task-owner-access.ts b/src/tasks/task-owner-access.ts index 5b26d7dbb62..041d2d2f365 100644 --- a/src/tasks/task-owner-access.ts +++ b/src/tasks/task-owner-access.ts @@ -3,9 +3,11 @@ import { findTaskByRunId, getTaskById, listTasksForRelatedSessionKey, + markTaskTerminalById as markTaskTerminalRecordById, resolveTaskForLookupToken, + updateTaskNotifyPolicyById, } from "./task-registry.js"; -import type { TaskRecord } from "./task-registry.types.js"; +import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js"; import { buildTaskStatusSnapshot } from "./task-status.js"; function canOwnerAccessTask(task: TaskRecord, callerOwnerKey: string): boolean { @@ -31,6 +33,47 @@ export function findTaskByRunIdForOwner(params: { return task && canOwnerAccessTask(task, params.callerOwnerKey) ? task : undefined; } +/** Update an owner-visible task's notification policy. 
*/ +export function updateTaskNotifyPolicyForOwner(params: { + taskId: string; + callerOwnerKey: string; + notifyPolicy: TaskNotifyPolicy; +}): TaskRecord | null { + const task = getTaskByIdForOwner({ + taskId: params.taskId, + callerOwnerKey: params.callerOwnerKey, + }); + if (!task) { + return null; + } + return updateTaskNotifyPolicyById({ + taskId: task.taskId, + notifyPolicy: params.notifyPolicy, + }); +} + +/** Mark an owner-visible task as cancelled with a caller-provided summary. */ +export function cancelTaskByIdForOwner(params: { + taskId: string; + callerOwnerKey: string; + endedAt: number; + terminalSummary?: string | null; +}): TaskRecord | null { + const task = getTaskByIdForOwner({ + taskId: params.taskId, + callerOwnerKey: params.callerOwnerKey, + }); + if (!task) { + return null; + } + return markTaskTerminalRecordById({ + taskId: task.taskId, + status: "cancelled", + endedAt: params.endedAt, + terminalSummary: params.terminalSummary, + }); +} + export function listTasksForRelatedSessionKeyForOwner(params: { relatedSessionKey: string; callerOwnerKey: string;