Harden deferred maintenance scheduling edges

This commit is contained in:
Eva
2026-04-12 17:56:42 +07:00
committed by Josh Lehman
parent cdafbd3899
commit 809cd14633
2 changed files with 212 additions and 26 deletions

View File

@@ -5,6 +5,7 @@ import {
enqueueCommandInLane,
resetCommandQueueStateForTest,
} from "../../process/command-queue.js";
import * as commandQueueModule from "../../process/command-queue.js";
import { createQueuedTaskRun } from "../../tasks/task-executor.js";
import { resetTaskFlowRegistryForTests } from "../../tasks/task-flow-registry.js";
import {
@@ -14,8 +15,8 @@ import {
resetTaskRegistryForTests,
setTaskRegistryDeliveryRuntimeForTests,
} from "../../tasks/task-registry.js";
import { castAgentMessage } from "../test-helpers/agent-message-fixtures.js";
import { withStateDirEnv } from "../../test-helpers/state-dir-env.js";
import { castAgentMessage } from "../test-helpers/agent-message-fixtures.js";
import { resolveSessionLane } from "./lanes.js";
const rewriteTranscriptEntriesInSessionManagerMock = vi.fn((_params?: unknown) => ({
@@ -30,6 +31,7 @@ const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown
}));
let buildContextEngineMaintenanceRuntimeContext: typeof import("./context-engine-maintenance.js").buildContextEngineMaintenanceRuntimeContext;
let createDeferredTurnMaintenanceAbortSignal: typeof import("./context-engine-maintenance.js").createDeferredTurnMaintenanceAbortSignal;
let resetDeferredTurnMaintenanceStateForTest: typeof import("./context-engine-maintenance.js").resetDeferredTurnMaintenanceStateForTest;
let runContextEngineMaintenance: typeof import("./context-engine-maintenance.js").runContextEngineMaintenance;
// Keep this literal aligned with the production module; tests use dynamic
// import reloading, so they cannot safely import the constant directly.
@@ -72,9 +74,10 @@ async function loadFreshContextEngineMaintenanceModuleForTest() {
({
buildContextEngineMaintenanceRuntimeContext,
createDeferredTurnMaintenanceAbortSignal,
resetDeferredTurnMaintenanceStateForTest,
runContextEngineMaintenance,
} =
await import("./context-engine-maintenance.js"));
} = await import("./context-engine-maintenance.js"));
resetDeferredTurnMaintenanceStateForTest();
}
describe("buildContextEngineMaintenanceRuntimeContext", () => {
@@ -159,6 +162,7 @@ describe("createDeferredTurnMaintenanceAbortSignal", () => {
it("aborts on termination signals and unregisters listeners", () => {
const listeners = new Map<string, Set<() => void>>();
const kill = vi.fn();
const processLike = {
on(event: "SIGINT" | "SIGTERM", listener: () => void) {
const bucket = listeners.get(event) ?? new Set<() => void>();
@@ -170,9 +174,17 @@ describe("createDeferredTurnMaintenanceAbortSignal", () => {
listeners.get(event)?.delete(listener);
return this;
},
} as unknown as Pick<NodeJS.Process, "on" | "off">;
listenerCount(event: "SIGINT" | "SIGTERM") {
return listeners.get(event)?.size ?? 0;
},
kill,
pid: 4242,
} as unknown as NonNullable<
Parameters<typeof createDeferredTurnMaintenanceAbortSignal>[0]
>["processLike"];
const { abortSignal, dispose } = createDeferredTurnMaintenanceAbortSignal({ processLike });
const second = createDeferredTurnMaintenanceAbortSignal({ processLike });
expect(listeners.get("SIGINT")?.size ?? 0).toBe(1);
expect(listeners.get("SIGTERM")?.size ?? 0).toBe(1);
@@ -181,10 +193,13 @@ describe("createDeferredTurnMaintenanceAbortSignal", () => {
sigtermListeners[0]?.();
expect(abortSignal?.aborted).toBe(true);
expect(second.abortSignal?.aborted).toBe(true);
expect(kill).toHaveBeenCalledWith(4242, "SIGTERM");
expect(listeners.get("SIGINT")?.size ?? 0).toBe(0);
expect(listeners.get("SIGTERM")?.size ?? 0).toBe(0);
dispose();
second.dispose();
expect(listeners.get("SIGINT")?.size ?? 0).toBe(0);
expect(listeners.get("SIGTERM")?.size ?? 0).toBe(0);
});
@@ -246,7 +261,9 @@ describe("runContextEngineMaintenance", () => {
it("forces background maintenance rewrites through the session file even when a session manager exists", async () => {
const maintain = vi.fn(async (params?: unknown) => {
await (params as { runtimeContext?: ContextEngineRuntimeContext } | undefined)?.runtimeContext?.rewriteTranscriptEntries?.({
await (
params as { runtimeContext?: ContextEngineRuntimeContext } | undefined
)?.runtimeContext?.rewriteTranscriptEntries?.({
replacements: [
{
entryId: "entry-1",
@@ -577,7 +594,10 @@ describe("runContextEngineMaintenance", () => {
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({ messages, estimatedTokens: 0 }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain,
} as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
@@ -607,6 +627,64 @@ describe("runContextEngineMaintenance", () => {
});
});
// Covers the failure path where the lane queue rejects the deferred
// maintenance command: the tracked task must end up "cancelled" with the
// rejection message in its summary, and the engine's maintain() must not run.
it("cancels the queued task when deferred scheduling is rejected", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();
const scheduleError = new Error("gateway draining");
// Make every enqueueCommandInLane call reject with scheduleError.
const enqueueSpy = vi
.spyOn(commandQueueModule, "enqueueCommandInLane")
.mockRejectedValue(scheduleError);
try {
// Reset the task, task-flow and command-queue state before the run.
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
resetCommandQueueStateForTest();
const sessionKey = "agent:main:session-enqueue-reject";
// Spy maintain(); the final assertion checks it was never invoked.
const maintain = vi.fn(async () => ({
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
}));
// Engine stub whose info declares turnMaintenanceMode "background".
const backgroundEngine = {
info: {
id: "test",
name: "Test Engine",
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain,
} as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
await runContextEngineMaintenance({
contextEngine: backgroundEngine,
sessionId: "session-enqueue-reject",
sessionKey,
sessionFile: "/tmp/session-enqueue-reject.jsonl",
reason: "turn",
});
// Let pending async work settle before inspecting the task registry.
await flushAsyncWork();
// Only turn-maintenance tasks owned by this session are relevant.
const tasks = listTasksForOwnerKey(sessionKey).filter(
(task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND,
);
expect(tasks).toHaveLength(1);
// The single task must be cancelled and carry the scheduling error text.
expect(tasks[0]).toMatchObject({
status: "cancelled",
terminalSummary: expect.stringContaining("gateway draining"),
});
expect(maintain).not.toHaveBeenCalled();
} finally {
// Always undo the spy and fake timers so later tests are unaffected.
enqueueSpy.mockRestore();
vi.useRealTimers();
}
});
});
it("lets foreground turns win while deferred maintenance is waiting", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();
@@ -832,7 +910,10 @@ describe("runContextEngineMaintenance", () => {
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({ messages, estimatedTokens: 0 }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain: vi.fn(async () => ({
changed: false,

View File

@@ -35,6 +35,9 @@ const TURN_MAINTENANCE_TASK_TASK = "Deferred context-engine maintenance after tu
const TURN_MAINTENANCE_LANE_PREFIX = "context-engine-turn-maintenance:";
const TURN_MAINTENANCE_WAIT_POLL_MS = 100;
const TURN_MAINTENANCE_LONG_WAIT_MS = 10_000;
// Property key under which the shared abort state is stored on the
// process-like object. Symbol.for() reads the global symbol registry, so the
// same description always yields the same symbol.
// NOTE(review): presumably chosen so that re-evaluated copies of this module
// resolve the same state slot on `process` — confirm.
const DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY = Symbol.for(
"openclaw.contextEngineTurnMaintenanceAbortState",
);
type DeferredTurnMaintenanceScheduleParams = {
contextEngine: ContextEngine;
sessionId: string;
@@ -52,6 +55,47 @@ type DeferredTurnMaintenanceRunState = {
const activeDeferredTurnMaintenanceRuns = new Map<string, DeferredTurnMaintenanceRunState>();
/** Signals whose delivery aborts controllers waiting on deferred maintenance. */
type DeferredTurnMaintenanceSignal = "SIGINT" | "SIGTERM";
/**
 * Process surface used for signal handling: `on`/`off` are required, while
 * `listenerCount`, `kill` and `pid` are optional. The symbol-keyed slot holds
 * the abort state attached to this object, when one has been created.
 */
type DeferredTurnMaintenanceProcessLike = Pick<NodeJS.Process, "on" | "off"> &
Partial<Pick<NodeJS.Process, "listenerCount" | "kill" | "pid">> & {
[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]?: DeferredTurnMaintenanceAbortState;
};
/** Signal-handling bookkeeping stored on a process-like object. */
type DeferredTurnMaintenanceAbortState = {
// True while the signal listeners are attached to the process-like object.
registered: boolean;
// Controllers to abort when a termination signal arrives.
controllers: Set<AbortController>;
// Listener registered per signal, kept so it can be passed to `off` later.
cleanupHandlers: Map<DeferredTurnMaintenanceSignal, () => void>;
};
/**
 * Looks up the abort state attached to `processLike`, attaching a fresh one
 * (unregistered, with no controllers or handlers) when none is present.
 *
 * @param processLike Object that carries the state under the shared symbol key.
 * @returns The existing state, or the newly attached one.
 */
function resolveDeferredTurnMaintenanceAbortState(
  processLike: DeferredTurnMaintenanceProcessLike,
): DeferredTurnMaintenanceAbortState {
  let abortState = processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY];
  if (!abortState) {
    // First caller for this object: create the state and publish it on the
    // object so later callers reuse the same instance.
    abortState = {
      registered: false,
      controllers: new Set<AbortController>(),
      cleanupHandlers: new Map<DeferredTurnMaintenanceSignal, () => void>(),
    };
    processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY] = abortState;
  }
  return abortState;
}
/**
 * Detaches every recorded signal listener from `processLike` and marks the
 * state as unregistered. Does nothing when the state is not registered.
 *
 * @param processLike Object the listeners were attached to.
 * @param state Abort state holding the recorded listeners.
 */
function unregisterDeferredTurnMaintenanceAbortSignalHandlers(
  processLike: DeferredTurnMaintenanceProcessLike,
  state: DeferredTurnMaintenanceAbortState,
): void {
  if (state.registered) {
    // Map#forEach passes (value, key): the listener first, then its signal.
    state.cleanupHandlers.forEach((listener, signalName) => {
      processLike.off(signalName, listener);
    });
    state.cleanupHandlers.clear();
    state.registered = false;
  }
}
/**
 * Runs `sessionKey` through `normalizeOptionalString` and collapses any falsy
 * result to `undefined`.
 *
 * @param sessionKey Optional raw session key.
 * @returns The normalized key, or `undefined` when normalization yields a
 *   falsy value.
 */
function normalizeSessionKey(sessionKey?: string): string | undefined {
  const normalized = normalizeOptionalString(sessionKey);
  return normalized ? normalized : undefined;
}
@@ -61,7 +105,7 @@ function resolveDeferredTurnMaintenanceLane(sessionKey: string): string {
}
export function createDeferredTurnMaintenanceAbortSignal(params?: {
processLike?: Pick<NodeJS.Process, "on" | "off">;
processLike?: DeferredTurnMaintenanceProcessLike;
}): {
abortSignal?: AbortSignal;
dispose: () => void;
@@ -70,8 +114,42 @@ export function createDeferredTurnMaintenanceAbortSignal(params?: {
return { abortSignal: undefined, dispose: () => {} };
}
const processLike = params?.processLike ?? process;
const processLike = (params?.processLike ?? process) as DeferredTurnMaintenanceProcessLike;
const state = resolveDeferredTurnMaintenanceAbortState(processLike);
const handleTerminationSignal = (signalName: DeferredTurnMaintenanceSignal) => {
const shouldReraise =
typeof processLike.listenerCount === "function"
? processLike.listenerCount(signalName) === 1
: false;
for (const activeController of state.controllers) {
if (!activeController.signal.aborted) {
activeController.abort(
new Error(`received ${signalName} while waiting for deferred maintenance`),
);
}
}
state.controllers.clear();
unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state);
if (shouldReraise && typeof processLike.kill === "function") {
try {
processLike.kill(processLike.pid ?? process.pid, signalName);
} catch {
// Ignore shutdown-path failures.
}
}
};
if (!state.registered) {
state.registered = true;
const onSigint = () => handleTerminationSignal("SIGINT");
const onSigterm = () => handleTerminationSignal("SIGTERM");
state.cleanupHandlers.set("SIGINT", onSigint);
state.cleanupHandlers.set("SIGTERM", onSigterm);
processLike.on("SIGINT", onSigint);
processLike.on("SIGTERM", onSigterm);
}
const controller = new AbortController();
state.controllers.add(controller);
let disposed = false;
const cleanup = () => {
@@ -79,20 +157,11 @@ export function createDeferredTurnMaintenanceAbortSignal(params?: {
return;
}
disposed = true;
processLike.off("SIGINT", onSigint);
processLike.off("SIGTERM", onSigterm);
};
const abortWith = (signalName: "SIGINT" | "SIGTERM") => {
if (!controller.signal.aborted) {
controller.abort(new Error(`received ${signalName} while waiting for deferred maintenance`));
state.controllers.delete(controller);
if (state.controllers.size === 0) {
unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state);
}
cleanup();
};
const onSigint = () => abortWith("SIGINT");
const onSigterm = () => abortWith("SIGTERM");
processLike.on("SIGINT", onSigint);
processLike.on("SIGTERM", onSigterm);
return {
abortSignal: controller.signal,
@@ -100,6 +169,32 @@ export function createDeferredTurnMaintenanceAbortSignal(params?: {
};
}
/**
 * Test-only reset: clears the active deferred-run map and, if abort state is
 * attached to the real `process`, drops its controllers, detaches its signal
 * listeners and removes the state from `process`.
 */
export function resetDeferredTurnMaintenanceStateForTest(): void {
  activeDeferredTurnMaintenanceRuns.clear();
  const host = process as DeferredTurnMaintenanceProcessLike;
  const abortState = host[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY];
  if (abortState) {
    // Controllers are discarded without being aborted.
    abortState.controllers.clear();
    unregisterDeferredTurnMaintenanceAbortSignalHandlers(host, abortState);
    delete host[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY];
  }
}
/**
 * Records that deferred maintenance could not be scheduled: logs a warning and
 * marks the task as terminal with status "cancelled" and a summary containing
 * the formatted error.
 *
 * @param params.taskId Id of the task to mark terminal.
 * @param params.error The scheduling failure; formatted via `formatErrorMessage`.
 */
function markDeferredTurnMaintenanceTaskScheduleFailure(params: {
  taskId: string;
  error: unknown;
}): void {
  const { taskId, error } = params;
  const reason = formatErrorMessage(error);
  log.warn(`failed to schedule deferred context engine maintenance: ${reason}`);
  const terminalSummary = `Deferred maintenance could not be scheduled: ${reason}`;
  markTaskTerminalById({
    taskId,
    status: "cancelled",
    endedAt: Date.now(),
    terminalSummary,
  });
}
function buildTurnMaintenanceTaskDescriptor(params: { sessionKey: string }) {
const runId = `turn-maint:${params.sessionKey}:${Date.now().toString(36)}:${randomUUID().slice(
0,
@@ -412,9 +507,9 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule
`taskId=${task.taskId} sessionKey=${sessionKey} lane=${resolveDeferredTurnMaintenanceLane(sessionKey)}`,
);
const runPromise = enqueueCommandInLane(
resolveDeferredTurnMaintenanceLane(sessionKey),
async () =>
let runPromise: Promise<void>;
try {
runPromise = enqueueCommandInLane(resolveDeferredTurnMaintenanceLane(sessionKey), async () =>
runDeferredTurnMaintenanceWorker({
contextEngine: params.contextEngine,
sessionId: params.sessionId,
@@ -424,11 +519,21 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule
runtimeContext: params.runtimeContext,
runId: task.runId!,
}),
);
);
} catch (err) {
markDeferredTurnMaintenanceTaskScheduleFailure({
taskId: task.taskId,
error: err,
});
return;
}
let state!: DeferredTurnMaintenanceRunState;
const trackedPromise = runPromise
.catch((err) => {
log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`);
markDeferredTurnMaintenanceTaskScheduleFailure({
taskId: task.taskId,
error: err,
});
})
.finally(() => {
const current = activeDeferredTurnMaintenanceRuns.get(sessionKey);