Queue follow-up deferred maintenance runs

This commit is contained in:
Eva
2026-04-12 17:28:38 +07:00
committed by Josh Lehman
parent 47eedd0983
commit cdafbd3899
2 changed files with 115 additions and 16 deletions

View File

@@ -394,7 +394,7 @@ describe("runContextEngineMaintenance", () => {
});
});
it("coalesces repeated turn maintenance requests for the same session", async () => {
it("coalesces repeated requests into one active run plus one follow-up run for the same session", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();
try {
@@ -456,10 +456,12 @@ describe("runContextEngineMaintenance", () => {
expect(queuedTasks).toHaveLength(1);
releaseForeground();
await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1));
expect(getTaskById(queuedTasks[0].taskId)).toMatchObject({
status: "succeeded",
});
await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(2));
const completedTasks = listTasksForOwnerKey(sessionKey).filter(
(task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND,
);
expect(completedTasks).toHaveLength(2);
expect(completedTasks.every((task) => task.status === "succeeded")).toBe(true);
await foregroundTurn;
} finally {
@@ -468,6 +470,78 @@ describe("runContextEngineMaintenance", () => {
});
});
it("queues a follow-up maintenance run when a new turn finishes during an active deferred run", async () => {
await withStateDirEnv("openclaw-turn-maintenance-rerun-", async () => {
vi.useFakeTimers();
try {
resetCommandQueueStateForTest();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
const sessionKey = "agent:main:session-rerun";
let releaseFirstMaintenance!: () => void;
let maintenanceCalls = 0;
const maintain = vi.fn(async () => {
maintenanceCalls += 1;
if (maintenanceCalls === 1) {
await new Promise<void>((resolve) => {
releaseFirstMaintenance = resolve;
});
}
return {
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
};
});
const backgroundEngine = {
info: {
id: "test",
name: "Test Engine",
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain,
} as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
await runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-rerun",
sessionKey,
sessionFile: "/tmp/session-rerun.jsonl",
reason: "turn",
});
await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(1));
await runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-rerun",
sessionKey,
sessionFile: "/tmp/session-rerun.jsonl",
reason: "turn",
});
releaseFirstMaintenance();
await waitForAssertion(() => expect(maintain).toHaveBeenCalledTimes(2));
const tasks = listTasksForOwnerKey(sessionKey).filter(
(task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND,
);
expect(tasks).toHaveLength(2);
expect(tasks.every((task) => task.status === "succeeded")).toBe(true);
} finally {
vi.useRealTimers();
}
});
});
it("replaces legacy active maintenance tasks that are missing a runId", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();

View File

@@ -35,7 +35,22 @@ const TURN_MAINTENANCE_TASK_TASK = "Deferred context-engine maintenance after tu
const TURN_MAINTENANCE_LANE_PREFIX = "context-engine-turn-maintenance:";
const TURN_MAINTENANCE_WAIT_POLL_MS = 100;
const TURN_MAINTENANCE_LONG_WAIT_MS = 10_000;
const activeDeferredTurnMaintenanceRuns = new Map<string, Promise<void>>();
type DeferredTurnMaintenanceScheduleParams = {
contextEngine: ContextEngine;
sessionId: string;
sessionKey: string;
sessionFile: string;
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
runtimeContext?: ContextEngineRuntimeContext;
};
type DeferredTurnMaintenanceRunState = {
promise: Promise<void>;
rerunRequested: boolean;
latestParams: DeferredTurnMaintenanceScheduleParams;
};
const activeDeferredTurnMaintenanceRuns = new Map<string, DeferredTurnMaintenanceRunState>();
function normalizeSessionKey(sessionKey?: string): string | undefined {
return normalizeOptionalString(sessionKey) || undefined;
@@ -357,19 +372,15 @@ async function runDeferredTurnMaintenanceWorker(params: {
}
}
function scheduleDeferredTurnMaintenance(params: {
contextEngine: ContextEngine;
sessionId: string;
sessionKey: string;
sessionFile: string;
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
runtimeContext?: ContextEngineRuntimeContext;
}): void {
function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceScheduleParams): void {
const sessionKey = normalizeSessionKey(params.sessionKey);
if (!sessionKey) {
return;
}
if (activeDeferredTurnMaintenanceRuns.has(sessionKey)) {
const activeRun = activeDeferredTurnMaintenanceRuns.get(sessionKey);
if (activeRun) {
activeRun.rerunRequested = true;
activeRun.latestParams = { ...params, sessionKey };
return;
}
@@ -414,14 +425,28 @@ function scheduleDeferredTurnMaintenance(params: {
runId: task.runId!,
}),
);
let state!: DeferredTurnMaintenanceRunState;
const trackedPromise = runPromise
.catch((err) => {
log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`);
})
.finally(() => {
const current = activeDeferredTurnMaintenanceRuns.get(sessionKey);
if (current !== state) {
return;
}
const rerunParams = current.rerunRequested ? current.latestParams : undefined;
activeDeferredTurnMaintenanceRuns.delete(sessionKey);
if (rerunParams) {
scheduleDeferredTurnMaintenance(rerunParams);
}
});
activeDeferredTurnMaintenanceRuns.set(sessionKey, trackedPromise);
state = {
promise: trackedPromise,
rerunRequested: false,
latestParams: { ...params, sessionKey },
};
activeDeferredTurnMaintenanceRuns.set(sessionKey, state);
void trackedPromise;
}