feat: background turn maintenance for context engines

This commit is contained in:
Eva
2026-04-12 14:35:13 +07:00
committed by Josh Lehman
parent aee2681ab1
commit d11730e800
5 changed files with 860 additions and 26 deletions

View File

@@ -1,4 +1,19 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { peekSystemEvents, resetSystemEventsForTest } from "../../infra/system-events.js";
import {
enqueueCommandInLane,
resetCommandQueueStateForTest,
} from "../../process/command-queue.js";
import { resetTaskFlowRegistryForTests } from "../../tasks/task-flow-registry.js";
import {
getTaskById,
listTasksForOwnerKey,
resetTaskRegistryDeliveryRuntimeForTests,
resetTaskRegistryForTests,
setTaskRegistryDeliveryRuntimeForTests,
} from "../../tasks/task-registry.js";
import { withStateDirEnv } from "../../test-helpers/state-dir-env.js";
import { resolveSessionLane } from "./lanes.js";
const rewriteTranscriptEntriesInSessionManagerMock = vi.fn((_params?: unknown) => ({
changed: true,
@@ -12,6 +27,33 @@ const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown
}));
let buildContextEngineMaintenanceRuntimeContext: typeof import("./context-engine-maintenance.js").buildContextEngineMaintenanceRuntimeContext;
let runContextEngineMaintenance: typeof import("./context-engine-maintenance.js").runContextEngineMaintenance;
const TURN_MAINTENANCE_TASK_KIND = "context_engine_turn_maintenance";
async function flushAsyncWork(times = 4): Promise<void> {
for (let index = 0; index < times; index += 1) {
await Promise.resolve();
}
}
async function waitForAssertion(
assertion: () => void,
timeoutMs = 2_000,
stepMs = 5,
): Promise<void> {
const startedAt = Date.now();
for (;;) {
try {
assertion();
return;
} catch (error) {
if (Date.now() - startedAt >= timeoutMs) {
throw error;
}
await vi.advanceTimersByTimeAsync(stepMs);
await flushAsyncWork();
}
}
}
vi.mock("./transcript-rewrite.js", () => ({
rewriteTranscriptEntriesInSessionManager: (params: unknown) =>
@@ -21,7 +63,6 @@ vi.mock("./transcript-rewrite.js", () => ({
}));
async function loadFreshContextEngineMaintenanceModuleForTest() {
vi.resetModules();
({ buildContextEngineMaintenanceRuntimeContext, runContextEngineMaintenance } =
await import("./context-engine-maintenance.js"));
}
@@ -30,6 +71,8 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => {
beforeEach(async () => {
rewriteTranscriptEntriesInSessionManagerMock.mockClear();
rewriteTranscriptEntriesInSessionFileMock.mockClear();
resetSystemEventsForTest();
resetTaskRegistryDeliveryRuntimeForTests();
await loadFreshContextEngineMaintenanceModuleForTest();
});
@@ -152,4 +195,414 @@ describe("runContextEngineMaintenance", () => {
| undefined;
expect(typeof runtimeContext?.rewriteTranscriptEntries).toBe("function");
});
it("defers turn maintenance to a hidden background task when enabled", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();
try {
resetCommandQueueStateForTest();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
const sessionKey = "agent:main:session-1";
const sessionLane = resolveSessionLane(sessionKey);
let releaseForeground!: () => void;
const foregroundTurn = enqueueCommandInLane(sessionLane, async () => {
await new Promise<void>((resolve) => {
releaseForeground = resolve;
});
});
await Promise.resolve();
const maintain = vi.fn(async (_params?: unknown) => ({
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
}));
const backgroundEngine = {
info: {
id: "test",
name: "Test Engine",
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain,
} as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
const result = await runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-1",
sessionKey,
sessionFile: "/tmp/session.jsonl",
reason: "turn",
runtimeContext: { workspaceDir: "/tmp/workspace" },
});
expect(result).toBeUndefined();
expect(maintain).not.toHaveBeenCalled();
const queuedTasks = listTasksForOwnerKey(sessionKey).filter(
(task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND,
);
expect(queuedTasks).toHaveLength(1);
expect(queuedTasks[0]).toMatchObject({
runtime: "acp",
scopeKind: "session",
ownerKey: sessionKey,
requesterSessionKey: sessionKey,
taskKind: TURN_MAINTENANCE_TASK_KIND,
notifyPolicy: "silent",
deliveryStatus: "pending",
});
releaseForeground();
await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1));
expect(maintain.mock.calls[0]?.[0]).toMatchObject({
sessionId: "session-1",
sessionKey,
sessionFile: "/tmp/session.jsonl",
runtimeContext: expect.objectContaining({
workspaceDir: "/tmp/workspace",
allowDeferredCompactionExecution: true,
}),
});
const completedTask = getTaskById(queuedTasks[0].taskId);
expect(completedTask).toMatchObject({
status: "succeeded",
progressSummary: expect.stringContaining("Deferred maintenance completed"),
});
await foregroundTurn;
} finally {
vi.useRealTimers();
}
});
});
it("coalesces repeated turn maintenance requests for the same session", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();
try {
resetCommandQueueStateForTest();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
const sessionKey = "agent:main:session-2";
const sessionLane = resolveSessionLane(sessionKey);
let releaseForeground!: () => void;
const foregroundTurn = enqueueCommandInLane(sessionLane, async () => {
await new Promise<void>((resolve) => {
releaseForeground = resolve;
});
});
await Promise.resolve();
const maintain = vi.fn(async () => ({
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
}));
const backgroundEngine = {
info: {
id: "test",
name: "Test Engine",
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain,
} as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
await Promise.all([
runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-2",
sessionKey,
sessionFile: "/tmp/session-2.jsonl",
reason: "turn",
}),
runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-2",
sessionKey,
sessionFile: "/tmp/session-2.jsonl",
reason: "turn",
}),
]);
const queuedTasks = listTasksForOwnerKey(sessionKey).filter(
(task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND,
);
expect(queuedTasks).toHaveLength(1);
releaseForeground();
await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1));
expect(getTaskById(queuedTasks[0].taskId)).toMatchObject({
status: "succeeded",
});
await foregroundTurn;
} finally {
vi.useRealTimers();
}
});
});
it("lets foreground turns win while deferred maintenance is waiting", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();
try {
resetCommandQueueStateForTest();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
const sessionKey = "agent:main:session-3";
const sessionLane = resolveSessionLane(sessionKey);
const events: string[] = [];
let releaseFirstForeground!: () => void;
const firstForeground = enqueueCommandInLane(sessionLane, async () => {
events.push("foreground-1-start");
await new Promise<void>((resolve) => {
releaseFirstForeground = resolve;
});
events.push("foreground-1-end");
});
await Promise.resolve();
const maintain = vi.fn(async () => {
events.push("maintenance-start");
return {
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
};
});
const backgroundEngine = {
info: {
id: "test",
name: "Test Engine",
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain,
} as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
await runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-3",
sessionKey,
sessionFile: "/tmp/session-3.jsonl",
reason: "turn",
});
const secondForeground = enqueueCommandInLane(sessionLane, async () => {
events.push("foreground-2-start");
events.push("foreground-2-end");
});
releaseFirstForeground();
await waitForAssertion(() =>
expect(events).toEqual([
"foreground-1-start",
"foreground-1-end",
"foreground-2-start",
"foreground-2-end",
"maintenance-start",
]),
);
expect(maintain).toHaveBeenCalledTimes(1);
await Promise.all([firstForeground, secondForeground]);
} finally {
vi.useRealTimers();
}
});
});
it("keeps fast deferred maintenance silent for the user", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();
try {
resetCommandQueueStateForTest();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
resetSystemEventsForTest();
const sendMessageMock = vi.fn();
setTaskRegistryDeliveryRuntimeForTests({
sendMessage: sendMessageMock,
});
const sessionKey = "agent:main:session-fast";
const maintain = vi.fn(async () => ({
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
}));
const backgroundEngine = {
info: {
id: "test",
name: "Test Engine",
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain,
} as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
await runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-fast",
sessionKey,
sessionFile: "/tmp/session-fast.jsonl",
reason: "turn",
});
await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1));
expect(sendMessageMock).not.toHaveBeenCalled();
expect(peekSystemEvents(sessionKey)).toEqual([]);
} finally {
vi.useRealTimers();
}
});
});
it("surfaces long-running deferred maintenance and completion via task updates", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();
try {
resetCommandQueueStateForTest();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
resetSystemEventsForTest();
const sessionKey = "agent:main:session-long";
const sessionLane = resolveSessionLane(sessionKey);
let releaseForeground!: () => void;
const foregroundTurn = enqueueCommandInLane(sessionLane, async () => {
await new Promise<void>((resolve) => {
releaseForeground = resolve;
});
});
await Promise.resolve();
const maintain = vi.fn(async () => ({
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
}));
const backgroundEngine = {
info: {
id: "test",
name: "Test Engine",
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain,
} as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
await runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-long",
sessionKey,
sessionFile: "/tmp/session-long.jsonl",
reason: "turn",
});
await vi.advanceTimersByTimeAsync(11_000);
await waitForAssertion(() =>
expect(peekSystemEvents(sessionKey)).toEqual(
expect.arrayContaining([
expect.stringContaining("Background task update: Context engine turn maintenance."),
]),
),
);
releaseForeground();
await waitForAssertion(() =>
expect(peekSystemEvents(sessionKey)).toEqual(
expect.arrayContaining([
expect.stringContaining("Background task done: Context engine turn maintenance"),
]),
),
);
await foregroundTurn;
} finally {
vi.useRealTimers();
}
});
});
it("surfaces deferred maintenance failures even when they fail quickly", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();
try {
resetCommandQueueStateForTest();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
resetSystemEventsForTest();
const sessionKey = "agent:main:session-fail";
const backgroundEngine = {
info: {
id: "test",
name: "Test Engine",
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain: vi.fn(async () => {
throw new Error("maintenance exploded");
}),
} as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
await runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-fail",
sessionKey,
sessionFile: "/tmp/session-fail.jsonl",
reason: "turn",
});
await waitForAssertion(() =>
expect(peekSystemEvents(sessionKey)).toEqual(
expect.arrayContaining([
expect.stringContaining("Background task failed: Context engine turn maintenance"),
]),
),
);
} finally {
vi.useRealTimers();
}
});
});
});

View File

@@ -1,14 +1,107 @@
import { randomUUID } from "node:crypto";
import type {
ContextEngine,
ContextEngineMaintenanceResult,
ContextEngineRuntimeContext,
} from "../../context-engine/types.js";
import { sleepWithAbort } from "../../infra/backoff.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { enqueueCommandInLane, getQueueSize } from "../../process/command-queue.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import {
completeTaskRunByRunId,
createQueuedTaskRun,
failTaskRunByRunId,
recordTaskRunProgressByRunId,
startTaskRunByRunId,
} from "../../tasks/task-executor.js";
import {
findTaskByRunId,
setTaskRunDeliveryStatusByRunId,
updateTaskNotifyPolicyById,
} from "../../tasks/task-registry.js";
import { findActiveSessionTask } from "../session-async-task-status.js";
import { resolveSessionLane } from "./lanes.js";
import { log } from "./logger.js";
import {
rewriteTranscriptEntriesInSessionFile,
rewriteTranscriptEntriesInSessionManager,
} from "./transcript-rewrite.js";
const TURN_MAINTENANCE_TASK_KIND = "context_engine_turn_maintenance";
const TURN_MAINTENANCE_TASK_LABEL = "Context engine turn maintenance";
const TURN_MAINTENANCE_TASK_TASK = "Deferred context-engine maintenance after turn.";
const TURN_MAINTENANCE_LANE_PREFIX = "context-engine-turn-maintenance:";
const TURN_MAINTENANCE_WAIT_POLL_MS = 100;
const TURN_MAINTENANCE_LONG_WAIT_MS = 10_000;
const activeDeferredTurnMaintenanceRuns = new Map<string, Promise<void>>();
function normalizeSessionKey(sessionKey?: string): string | undefined {
return normalizeOptionalString(sessionKey) || undefined;
}
function resolveDeferredTurnMaintenanceLane(sessionKey: string): string {
return `${TURN_MAINTENANCE_LANE_PREFIX}${sessionKey}`;
}
function buildTurnMaintenanceTaskDescriptor(params: { sessionKey: string }) {
const runId = `turn-maint:${params.sessionKey}:${Date.now().toString(36)}:${randomUUID().slice(
0,
8,
)}`;
return createQueuedTaskRun({
runtime: "acp",
taskKind: TURN_MAINTENANCE_TASK_KIND,
sourceId: TURN_MAINTENANCE_TASK_KIND,
requesterSessionKey: params.sessionKey,
ownerKey: params.sessionKey,
scopeKind: "session",
runId,
label: TURN_MAINTENANCE_TASK_LABEL,
task: TURN_MAINTENANCE_TASK_TASK,
notifyPolicy: "silent",
deliveryStatus: "pending",
preferMetadata: true,
});
}
function promoteTurnMaintenanceTaskVisibility(params: {
sessionKey: string;
runId: string;
notifyPolicy: "done_only" | "state_changes";
}) {
const task = findTaskByRunId(params.runId);
if (!task) {
return createQueuedTaskRun({
runtime: "acp",
taskKind: TURN_MAINTENANCE_TASK_KIND,
sourceId: TURN_MAINTENANCE_TASK_KIND,
requesterSessionKey: params.sessionKey,
ownerKey: params.sessionKey,
scopeKind: "session",
runId: params.runId,
label: TURN_MAINTENANCE_TASK_LABEL,
task: TURN_MAINTENANCE_TASK_TASK,
notifyPolicy: params.notifyPolicy,
deliveryStatus: "pending",
preferMetadata: true,
});
}
setTaskRunDeliveryStatusByRunId({
runId: params.runId,
runtime: "acp",
sessionKey: params.sessionKey,
deliveryStatus: "pending",
});
if (task.notifyPolicy !== params.notifyPolicy) {
updateTaskNotifyPolicyById({
taskId: task.taskId,
notifyPolicy: params.notifyPolicy,
});
}
return findTaskByRunId(params.runId) ?? task;
}
/**
* Attach runtime-owned transcript rewrite helpers to an existing
* context-engine runtime context payload.
@@ -19,9 +112,11 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
sessionFile: string;
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
runtimeContext?: ContextEngineRuntimeContext;
allowDeferredCompactionExecution?: boolean;
}): ContextEngineRuntimeContext {
return {
...params.runtimeContext,
...(params.allowDeferredCompactionExecution ? { allowDeferredCompactionExecution: true } : {}),
rewriteTranscriptEntries: async (request) => {
if (params.sessionManager) {
return rewriteTranscriptEntriesInSessionManager({
@@ -39,6 +134,228 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
};
}
async function executeContextEngineMaintenance(params: {
contextEngine: ContextEngine;
sessionId: string;
sessionKey?: string;
sessionFile: string;
reason: "bootstrap" | "compaction" | "turn";
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
runtimeContext?: ContextEngineRuntimeContext;
executionMode: "foreground" | "background";
}): Promise<ContextEngineMaintenanceResult | undefined> {
if (typeof params.contextEngine.maintain !== "function") {
return undefined;
}
const result = await params.contextEngine.maintain({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
runtimeContext: buildContextEngineMaintenanceRuntimeContext({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
sessionManager: params.sessionManager,
runtimeContext: params.runtimeContext,
allowDeferredCompactionExecution: params.executionMode === "background",
}),
});
if (result.changed) {
log.info(
`[context-engine] maintenance(${params.reason}) changed transcript ` +
`rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` +
`sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`,
);
}
return result;
}
async function runDeferredTurnMaintenanceWorker(params: {
contextEngine: ContextEngine;
sessionId: string;
sessionKey: string;
sessionFile: string;
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
runtimeContext?: ContextEngineRuntimeContext;
runId: string;
}): Promise<void> {
let surfacedUserNotice = false;
let longRunningTimer: ReturnType<typeof setTimeout> | null = null;
const surfaceMaintenanceUpdate = (summary: string, eventSummary: string) => {
promoteTurnMaintenanceTaskVisibility({
sessionKey: params.sessionKey,
runId: params.runId,
notifyPolicy: "state_changes",
});
surfacedUserNotice = true;
recordTaskRunProgressByRunId({
runId: params.runId,
runtime: "acp",
sessionKey: params.sessionKey,
lastEventAt: Date.now(),
progressSummary: summary,
eventSummary,
});
};
try {
const sessionLane = resolveSessionLane(params.sessionKey);
const startedWaitingAt = Date.now();
let lastWaitNoticeAt = 0;
while (getQueueSize(sessionLane) > 0) {
const now = Date.now();
if (
lastWaitNoticeAt === 0 ||
now - lastWaitNoticeAt >= TURN_MAINTENANCE_LONG_WAIT_MS ||
now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS
) {
lastWaitNoticeAt = now;
if (now - startedWaitingAt >= TURN_MAINTENANCE_LONG_WAIT_MS) {
surfaceMaintenanceUpdate(
"Waiting for the session lane to go idle.",
surfacedUserNotice
? "Still waiting for the session lane to go idle."
: "Deferred maintenance is waiting for the session lane to go idle.",
);
}
}
await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS);
}
const runningAt = Date.now();
startTaskRunByRunId({
runId: params.runId,
runtime: "acp",
sessionKey: params.sessionKey,
startedAt: runningAt,
lastEventAt: runningAt,
progressSummary: "Running deferred maintenance.",
eventSummary: "Starting deferred maintenance.",
});
longRunningTimer = setTimeout(() => {
try {
surfaceMaintenanceUpdate(
"Deferred maintenance is still running.",
"Deferred maintenance is still running.",
);
} catch (error) {
log.warn(`failed to surface deferred maintenance progress: ${String(error)}`);
}
}, TURN_MAINTENANCE_LONG_WAIT_MS);
const result = await executeContextEngineMaintenance({
contextEngine: params.contextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
reason: "turn",
sessionManager: params.sessionManager,
runtimeContext: params.runtimeContext,
executionMode: "background",
});
if (longRunningTimer) {
clearTimeout(longRunningTimer);
longRunningTimer = null;
}
const endedAt = Date.now();
completeTaskRunByRunId({
runId: params.runId,
runtime: "acp",
sessionKey: params.sessionKey,
endedAt,
lastEventAt: endedAt,
progressSummary: result?.changed
? "Deferred maintenance completed with transcript changes."
: "Deferred maintenance completed.",
terminalSummary: result?.changed
? `Rewrote ${result.rewrittenEntries} transcript entr${result.rewrittenEntries === 1 ? "y" : "ies"} and freed ${result.bytesFreed} bytes.`
: "No transcript changes were needed.",
});
} catch (err) {
if (longRunningTimer) {
clearTimeout(longRunningTimer);
longRunningTimer = null;
}
const endedAt = Date.now();
const reason = formatErrorMessage(err);
if (!surfacedUserNotice) {
promoteTurnMaintenanceTaskVisibility({
sessionKey: params.sessionKey,
runId: params.runId,
notifyPolicy: "done_only",
});
}
failTaskRunByRunId({
runId: params.runId,
runtime: "acp",
sessionKey: params.sessionKey,
endedAt,
lastEventAt: endedAt,
error: reason,
progressSummary: "Deferred maintenance failed.",
terminalSummary: reason,
});
log.warn(`deferred context engine maintenance failed: ${reason}`);
}
}
function scheduleDeferredTurnMaintenance(params: {
contextEngine: ContextEngine;
sessionId: string;
sessionKey: string;
sessionFile: string;
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
runtimeContext?: ContextEngineRuntimeContext;
}): void {
const sessionKey = normalizeSessionKey(params.sessionKey);
if (!sessionKey) {
return;
}
if (activeDeferredTurnMaintenanceRuns.has(sessionKey)) {
return;
}
const existingTask = findActiveSessionTask({
sessionKey,
runtime: "acp",
taskKind: TURN_MAINTENANCE_TASK_KIND,
});
const task =
existingTask ??
buildTurnMaintenanceTaskDescriptor({
sessionKey,
});
log.info(
`[context-engine] deferred turn maintenance ${existingTask ? "resuming" : "queued"} ` +
`taskId=${task.taskId} sessionKey=${sessionKey} lane=${resolveDeferredTurnMaintenanceLane(sessionKey)}`,
);
const runPromise = enqueueCommandInLane(
resolveDeferredTurnMaintenanceLane(sessionKey),
async () =>
runDeferredTurnMaintenanceWorker({
contextEngine: params.contextEngine,
sessionId: params.sessionId,
sessionKey,
sessionFile: params.sessionFile,
sessionManager: params.sessionManager,
runtimeContext: params.runtimeContext,
runId: task.runId ?? task.taskId,
}),
);
const trackedPromise = runPromise
.catch((err) => {
log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`);
})
.finally(() => {
activeDeferredTurnMaintenanceRuns.delete(sessionKey);
});
activeDeferredTurnMaintenanceRuns.set(sessionKey, trackedPromise);
void trackedPromise;
}
/**
* Run optional context-engine transcript maintenance and normalize the result.
*/
@@ -50,32 +367,45 @@ export async function runContextEngineMaintenance(params: {
reason: "bootstrap" | "compaction" | "turn";
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
runtimeContext?: ContextEngineRuntimeContext;
executionMode?: "foreground" | "background";
}): Promise<ContextEngineMaintenanceResult | undefined> {
if (typeof params.contextEngine?.maintain !== "function") {
return undefined;
}
try {
const result = await params.contextEngine.maintain({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
runtimeContext: buildContextEngineMaintenanceRuntimeContext({
const executionMode = params.executionMode ?? "foreground";
const shouldDefer =
params.reason === "turn" &&
executionMode !== "background" &&
params.contextEngine.info.turnMaintenanceMode === "background";
if (shouldDefer) {
try {
scheduleDeferredTurnMaintenance({
contextEngine: params.contextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionKey: params.sessionKey ?? params.sessionId,
sessionFile: params.sessionFile,
sessionManager: params.sessionManager,
runtimeContext: params.runtimeContext,
}),
});
if (result.changed) {
log.info(
`[context-engine] maintenance(${params.reason}) changed transcript ` +
`rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` +
`sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`,
);
});
} catch (err) {
log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`);
}
return result;
return undefined;
}
try {
return await executeContextEngineMaintenance({
contextEngine: params.contextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
reason: params.reason,
sessionManager: params.sessionManager,
runtimeContext: params.runtimeContext,
executionMode,
});
} catch (err) {
log.warn(`context engine maintain failed (${params.reason}): ${String(err)}`);
return undefined;

View File

@@ -50,6 +50,13 @@ export type ContextEngineInfo = {
version?: string;
/** True when the engine manages its own compaction lifecycle. */
ownsCompaction?: boolean;
/**
* Controls how turn-triggered maintenance should be executed.
*
* Engines remain compatible by default unless the host explicitly opts into
* background turn maintenance.
*/
turnMaintenanceMode?: "foreground" | "background";
};
export type SubagentSpawnPreparation = {
@@ -128,6 +135,11 @@ export type ContextEnginePromptCacheInfo = {
};
export type ContextEngineRuntimeContext = Record<string, unknown> & {
/**
* True when the host has explicitly opted this maintenance run into
* consuming deferred compaction debt.
*/
allowDeferredCompactionExecution?: boolean;
/** Optional prompt-cache telemetry for cache-aware engines. */
promptCache?: ContextEnginePromptCacheInfo;
/**

View File

@@ -43,4 +43,19 @@ describe("backoff helpers", () => {
cause: expect.anything(),
});
});
it("advances with fake timers", async () => {
vi.useFakeTimers();
try {
const sleeper = sleepWithAbort(50);
await vi.advanceTimersByTimeAsync(49);
await expect(
Promise.race([sleeper.then(() => "done"), Promise.resolve("pending")]),
).resolves.toBe("pending");
await vi.advanceTimersByTimeAsync(1);
await expect(sleeper).resolves.toBeUndefined();
} finally {
vi.useRealTimers();
}
});
});

View File

@@ -1,5 +1,3 @@
import { setTimeout as delay } from "node:timers/promises";
export type BackoffPolicy = {
initialMs: number;
maxMs: number;
@@ -17,12 +15,38 @@ export async function sleepWithAbort(ms: number, abortSignal?: AbortSignal) {
if (ms <= 0) {
return;
}
try {
await delay(ms, undefined, { signal: abortSignal });
} catch (err) {
if (abortSignal?.aborted) {
throw new Error("aborted", { cause: err });
}
throw err;
if (abortSignal?.aborted) {
throw new Error("aborted", { cause: abortSignal.reason ?? new Error("aborted") });
}
await new Promise<void>((resolve, reject) => {
let settled = false;
let timer: ReturnType<typeof setTimeout> | null = setTimeout(() => {
settled = true;
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
timer = null;
resolve();
}, ms);
const onAbort = () => {
if (settled) {
return;
}
settled = true;
if (timer) {
clearTimeout(timer);
timer = null;
}
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
reject(new Error("aborted", { cause: abortSignal?.reason ?? new Error("aborted") }));
};
if (abortSignal) {
abortSignal.addEventListener("abort", onAbort, { once: true });
}
});
}