From 809cd146336b91a3fd2776ef8a64156f1e24a70c Mon Sep 17 00:00:00 2001 From: Eva Date: Sun, 12 Apr 2026 17:56:42 +0700 Subject: [PATCH] Harden deferred maintenance scheduling edges --- .../context-engine-maintenance.test.ts | 95 +++++++++++- .../context-engine-maintenance.ts | 143 +++++++++++++++--- 2 files changed, 212 insertions(+), 26 deletions(-) diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index 6b91419dc28..be2829e5d76 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -5,6 +5,7 @@ import { enqueueCommandInLane, resetCommandQueueStateForTest, } from "../../process/command-queue.js"; +import * as commandQueueModule from "../../process/command-queue.js"; import { createQueuedTaskRun } from "../../tasks/task-executor.js"; import { resetTaskFlowRegistryForTests } from "../../tasks/task-flow-registry.js"; import { @@ -14,8 +15,8 @@ import { resetTaskRegistryForTests, setTaskRegistryDeliveryRuntimeForTests, } from "../../tasks/task-registry.js"; -import { castAgentMessage } from "../test-helpers/agent-message-fixtures.js"; import { withStateDirEnv } from "../../test-helpers/state-dir-env.js"; +import { castAgentMessage } from "../test-helpers/agent-message-fixtures.js"; import { resolveSessionLane } from "./lanes.js"; const rewriteTranscriptEntriesInSessionManagerMock = vi.fn((_params?: unknown) => ({ @@ -30,6 +31,7 @@ const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown })); let buildContextEngineMaintenanceRuntimeContext: typeof import("./context-engine-maintenance.js").buildContextEngineMaintenanceRuntimeContext; let createDeferredTurnMaintenanceAbortSignal: typeof import("./context-engine-maintenance.js").createDeferredTurnMaintenanceAbortSignal; +let resetDeferredTurnMaintenanceStateForTest: typeof import("./context-engine-maintenance.js").resetDeferredTurnMaintenanceStateForTest; let runContextEngineMaintenance: typeof import("./context-engine-maintenance.js").runContextEngineMaintenance; // Keep this literal aligned with the production module; tests use dynamic // import reloading, so they cannot safely import the constant directly. @@ -72,9 +74,10 @@ async function loadFreshContextEngineMaintenanceModuleForTest() { ({ buildContextEngineMaintenanceRuntimeContext, createDeferredTurnMaintenanceAbortSignal, + resetDeferredTurnMaintenanceStateForTest, runContextEngineMaintenance, - } = - await import("./context-engine-maintenance.js")); + } = await import("./context-engine-maintenance.js")); + resetDeferredTurnMaintenanceStateForTest(); } describe("buildContextEngineMaintenanceRuntimeContext", () => { @@ -159,6 +162,7 @@ describe("createDeferredTurnMaintenanceAbortSignal", () => { it("aborts on termination signals and unregisters listeners", () => { const listeners = new Map void>>(); + const kill = vi.fn(); const processLike = { on(event: "SIGINT" | "SIGTERM", listener: () => void) { const bucket = listeners.get(event) ?? new Set<() => void>(); @@ -170,9 +174,17 @@ describe("createDeferredTurnMaintenanceAbortSignal", () => { listeners.get(event)?.delete(listener); return this; }, - } as unknown as Pick; + listenerCount(event: "SIGINT" | "SIGTERM") { + return listeners.get(event)?.size ?? 0; + }, + kill, + pid: 4242, + } as unknown as NonNullable< + Parameters[0] + >["processLike"]; const { abortSignal, dispose } = createDeferredTurnMaintenanceAbortSignal({ processLike }); + const second = createDeferredTurnMaintenanceAbortSignal({ processLike }); expect(listeners.get("SIGINT")?.size ?? 0).toBe(1); expect(listeners.get("SIGTERM")?.size ?? 0).toBe(1); @@ -181,10 +193,13 @@ describe("createDeferredTurnMaintenanceAbortSignal", () => { sigtermListeners[0]?.(); expect(abortSignal?.aborted).toBe(true); + expect(second.abortSignal?.aborted).toBe(true); + expect(kill).toHaveBeenCalledWith(4242, "SIGTERM"); expect(listeners.get("SIGINT")?.size ?? 0).toBe(0); expect(listeners.get("SIGTERM")?.size ?? 0).toBe(0); dispose(); + second.dispose(); expect(listeners.get("SIGINT")?.size ?? 0).toBe(0); expect(listeners.get("SIGTERM")?.size ?? 0).toBe(0); }); @@ -246,7 +261,9 @@ describe("runContextEngineMaintenance", () => { it("forces background maintenance rewrites through the session file even when a session manager exists", async () => { const maintain = vi.fn(async (params?: unknown) => { - await (params as { runtimeContext?: ContextEngineRuntimeContext } | undefined)?.runtimeContext?.rewriteTranscriptEntries?.({ + await ( + params as { runtimeContext?: ContextEngineRuntimeContext } | undefined + )?.runtimeContext?.rewriteTranscriptEntries?.({ replacements: [ { entryId: "entry-1", @@ -577,7 +594,10 @@ describe("runContextEngineMaintenance", () => { turnMaintenanceMode: "background" as const, }, ingest: async () => ({ ingested: true }), - assemble: async ({ messages }: { messages: unknown[] }) => ({ messages, estimatedTokens: 0 }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), compact: async () => ({ ok: true, compacted: false }), maintain, } as NonNullable[0]["contextEngine"]>; @@ -607,6 +627,64 @@ describe("runContextEngineMaintenance", () => { }); }); + it("cancels the queued task when deferred scheduling is rejected", async () => { + await withStateDirEnv("openclaw-turn-maintenance-", async () => { + vi.useFakeTimers(); + const scheduleError = new Error("gateway draining"); + const enqueueSpy = vi + .spyOn(commandQueueModule, "enqueueCommandInLane") + .mockRejectedValue(scheduleError); + try { + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + resetCommandQueueStateForTest(); + + const sessionKey = "agent:main:session-enqueue-reject"; + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + const backgroundEngine = { + info: { + id: "test", + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + } as NonNullable[0]["contextEngine"]>; + + await runContextEngineMaintenance({ + contextEngine: backgroundEngine, + sessionId: "session-enqueue-reject", + sessionKey, + sessionFile: "/tmp/session-enqueue-reject.jsonl", + reason: "turn", + }); + await flushAsyncWork(); + + const tasks = listTasksForOwnerKey(sessionKey).filter( + (task) => task.taskKind === TURN_MAINTENANCE_TASK_KIND, + ); + expect(tasks).toHaveLength(1); + expect(tasks[0]).toMatchObject({ + status: "cancelled", + terminalSummary: expect.stringContaining("gateway draining"), + }); + expect(maintain).not.toHaveBeenCalled(); + } finally { + enqueueSpy.mockRestore(); + vi.useRealTimers(); + } + }); + }); + it("lets foreground turns win while deferred maintenance is waiting", async () => { await withStateDirEnv("openclaw-turn-maintenance-", async () => { vi.useFakeTimers(); @@ -832,7 +910,10 @@ describe("runContextEngineMaintenance", () => { turnMaintenanceMode: "background" as const, }, ingest: async () => ({ ingested: true }), - assemble: async ({ messages }: { messages: unknown[] }) => ({ messages, estimatedTokens: 0 }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), compact: async () => ({ ok: true, compacted: false }), maintain: vi.fn(async () => ({ changed: false, diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index 2709bc64117..e7a13f06ad8 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -35,6 +35,9 @@ const TURN_MAINTENANCE_TASK_TASK = "Deferred context-engine maintenance after tu const TURN_MAINTENANCE_LANE_PREFIX = "context-engine-turn-maintenance:"; const TURN_MAINTENANCE_WAIT_POLL_MS = 100; const TURN_MAINTENANCE_LONG_WAIT_MS = 10_000; +const DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY = Symbol.for( + "openclaw.contextEngineTurnMaintenanceAbortState", +); type DeferredTurnMaintenanceScheduleParams = { contextEngine: ContextEngine; sessionId: string; @@ -52,6 +55,47 @@ type DeferredTurnMaintenanceRunState = { const activeDeferredTurnMaintenanceRuns = new Map(); +type DeferredTurnMaintenanceSignal = "SIGINT" | "SIGTERM"; +type DeferredTurnMaintenanceProcessLike = Pick & + Partial> & { + [DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]?: DeferredTurnMaintenanceAbortState; + }; +type DeferredTurnMaintenanceAbortState = { + registered: boolean; + controllers: Set; + cleanupHandlers: Map void>; +}; + +function resolveDeferredTurnMaintenanceAbortState( + processLike: DeferredTurnMaintenanceProcessLike, +): DeferredTurnMaintenanceAbortState { + const existing = processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]; + if (existing) { + return existing; + } + const created: DeferredTurnMaintenanceAbortState = { + registered: false, + controllers: new Set(), + cleanupHandlers: new Map void>(), + }; + processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY] = created; + return created; +} + +function unregisterDeferredTurnMaintenanceAbortSignalHandlers( + processLike: DeferredTurnMaintenanceProcessLike, + state: DeferredTurnMaintenanceAbortState, +): void { + if (!state.registered) { + return; + } + for (const [signal, handler] of state.cleanupHandlers) { + processLike.off(signal, handler); + } + state.cleanupHandlers.clear(); + state.registered = false; +} + function normalizeSessionKey(sessionKey?: string): string | undefined { return normalizeOptionalString(sessionKey) || undefined; } @@ -61,7 +105,7 @@ function resolveDeferredTurnMaintenanceLane(sessionKey: string): string { } export function createDeferredTurnMaintenanceAbortSignal(params?: { - processLike?: Pick; + processLike?: DeferredTurnMaintenanceProcessLike; }): { abortSignal?: AbortSignal; dispose: () => void; @@ -70,8 +114,42 @@ export function createDeferredTurnMaintenanceAbortSignal(params?: { return { abortSignal: undefined, dispose: () => {} }; } - const processLike = params?.processLike ?? process; + const processLike = (params?.processLike ?? process) as DeferredTurnMaintenanceProcessLike; + const state = resolveDeferredTurnMaintenanceAbortState(processLike); + const handleTerminationSignal = (signalName: DeferredTurnMaintenanceSignal) => { + const shouldReraise = + typeof processLike.listenerCount === "function" + ? processLike.listenerCount(signalName) === 1 + : false; + for (const activeController of state.controllers) { + if (!activeController.signal.aborted) { + activeController.abort( + new Error(`received ${signalName} while waiting for deferred maintenance`), + ); + } + } + state.controllers.clear(); + unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state); + if (shouldReraise && typeof processLike.kill === "function") { + try { + processLike.kill(processLike.pid ?? process.pid, signalName); + } catch { + // Ignore shutdown-path failures. + } + } + }; + if (!state.registered) { + state.registered = true; + const onSigint = () => handleTerminationSignal("SIGINT"); + const onSigterm = () => handleTerminationSignal("SIGTERM"); + state.cleanupHandlers.set("SIGINT", onSigint); + state.cleanupHandlers.set("SIGTERM", onSigterm); + processLike.on("SIGINT", onSigint); + processLike.on("SIGTERM", onSigterm); + } + const controller = new AbortController(); + state.controllers.add(controller); let disposed = false; const cleanup = () => { @@ -79,20 +157,11 @@ export function createDeferredTurnMaintenanceAbortSignal(params?: { return; } disposed = true; - processLike.off("SIGINT", onSigint); - processLike.off("SIGTERM", onSigterm); - }; - const abortWith = (signalName: "SIGINT" | "SIGTERM") => { - if (!controller.signal.aborted) { - controller.abort(new Error(`received ${signalName} while waiting for deferred maintenance`)); + state.controllers.delete(controller); + if (state.controllers.size === 0) { + unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state); } - cleanup(); }; - const onSigint = () => abortWith("SIGINT"); - const onSigterm = () => abortWith("SIGTERM"); - - processLike.on("SIGINT", onSigint); - processLike.on("SIGTERM", onSigterm); return { abortSignal: controller.signal, @@ -100,6 +169,32 @@ export function createDeferredTurnMaintenanceAbortSignal(params?: { }; } +export function resetDeferredTurnMaintenanceStateForTest(): void { + activeDeferredTurnMaintenanceRuns.clear(); + const processLike = process as DeferredTurnMaintenanceProcessLike; + const state = processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]; + if (!state) { + return; + } + state.controllers.clear(); + unregisterDeferredTurnMaintenanceAbortSignalHandlers(processLike, state); + delete processLike[DEFERRED_TURN_MAINTENANCE_ABORT_STATE_KEY]; +} + +function markDeferredTurnMaintenanceTaskScheduleFailure(params: { + taskId: string; + error: unknown; +}): void { + const errorMessage = formatErrorMessage(params.error); + log.warn(`failed to schedule deferred context engine maintenance: ${errorMessage}`); + markTaskTerminalById({ + taskId: params.taskId, + status: "cancelled", + endedAt: Date.now(), + terminalSummary: `Deferred maintenance could not be scheduled: ${errorMessage}`, + }); +} + function buildTurnMaintenanceTaskDescriptor(params: { sessionKey: string }) { const runId = `turn-maint:${params.sessionKey}:${Date.now().toString(36)}:${randomUUID().slice( 0, @@ -412,9 +507,9 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule `taskId=${task.taskId} sessionKey=${sessionKey} lane=${resolveDeferredTurnMaintenanceLane(sessionKey)}`, ); - const runPromise = enqueueCommandInLane( - resolveDeferredTurnMaintenanceLane(sessionKey), - async () => + let runPromise: Promise; + try { + runPromise = enqueueCommandInLane(resolveDeferredTurnMaintenanceLane(sessionKey), async () => runDeferredTurnMaintenanceWorker({ contextEngine: params.contextEngine, sessionId: params.sessionId, @@ -424,11 +519,21 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule runtimeContext: params.runtimeContext, runId: task.runId!, }), - ); + ); + } catch (err) { + markDeferredTurnMaintenanceTaskScheduleFailure({ + taskId: task.taskId, + error: err, + }); + return; + } let state!: DeferredTurnMaintenanceRunState; const trackedPromise = runPromise .catch((err) => { - log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`); + markDeferredTurnMaintenanceTaskScheduleFailure({ + taskId: task.taskId, + error: err, + }); }) .finally(() => { const current = activeDeferredTurnMaintenanceRuns.get(sessionKey);