diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index 2a32a310d18..63de202012c 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -28,6 +28,7 @@ const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown rewrittenEntries: 2, })); let buildContextEngineMaintenanceRuntimeContext: typeof import("./context-engine-maintenance.js").buildContextEngineMaintenanceRuntimeContext; +let createDeferredTurnMaintenanceAbortSignal: typeof import("./context-engine-maintenance.js").createDeferredTurnMaintenanceAbortSignal; let runContextEngineMaintenance: typeof import("./context-engine-maintenance.js").runContextEngineMaintenance; // Keep this literal aligned with the production module; tests use dynamic // import reloading, so they cannot safely import the constant directly. @@ -67,7 +68,11 @@ vi.mock("./transcript-rewrite.js", () => ({ })); async function loadFreshContextEngineMaintenanceModuleForTest() { - ({ buildContextEngineMaintenanceRuntimeContext, runContextEngineMaintenance } = + ({ + buildContextEngineMaintenanceRuntimeContext, + createDeferredTurnMaintenanceAbortSignal, + runContextEngineMaintenance, + } = await import("./context-engine-maintenance.js")); } @@ -146,6 +151,44 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => { }); }); +describe("createDeferredTurnMaintenanceAbortSignal", () => { + beforeEach(async () => { + await loadFreshContextEngineMaintenanceModuleForTest(); + }); + + it("aborts on termination signals and unregisters listeners", () => { + const listeners = new Map void>>(); + const processLike = { + on(event: "SIGINT" | "SIGTERM", listener: () => void) { + const bucket = listeners.get(event) ?? new Set<() => void>(); + bucket.add(listener); + listeners.set(event, bucket); + return this; + }, + off(event: "SIGINT" | "SIGTERM", listener: () => void) { + listeners.get(event)?.delete(listener); + return this; + }, + } as unknown as Pick; + + const { abortSignal, dispose } = createDeferredTurnMaintenanceAbortSignal({ processLike }); + expect(listeners.get("SIGINT")?.size ?? 0).toBe(1); + expect(listeners.get("SIGTERM")?.size ?? 0).toBe(1); + + const sigtermListeners = Array.from(listeners.get("SIGTERM") ?? []); + expect(sigtermListeners).toHaveLength(1); + sigtermListeners[0]?.(); + + expect(abortSignal?.aborted).toBe(true); + expect(listeners.get("SIGINT")?.size ?? 0).toBe(0); + expect(listeners.get("SIGTERM")?.size ?? 0).toBe(0); + + dispose(); + expect(listeners.get("SIGINT")?.size ?? 0).toBe(0); + expect(listeners.get("SIGTERM")?.size ?? 0).toBe(0); + }); +}); + describe("runContextEngineMaintenance", () => { beforeEach(async () => { rewriteTranscriptEntriesInSessionManagerMock.mockClear(); diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index 3b176d35fe0..b1230ae3c76 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -45,6 +45,46 @@ function resolveDeferredTurnMaintenanceLane(sessionKey: string): string { return `${TURN_MAINTENANCE_LANE_PREFIX}${sessionKey}`; } +export function createDeferredTurnMaintenanceAbortSignal(params?: { + processLike?: Pick; +}): { + abortSignal?: AbortSignal; + dispose: () => void; +} { + if (typeof AbortController === "undefined") { + return { abortSignal: undefined, dispose: () => {} }; + } + + const processLike = params?.processLike ?? process; + const controller = new AbortController(); + let disposed = false; + + const cleanup = () => { + if (disposed) { + return; + } + disposed = true; + processLike.off("SIGINT", onSigint); + processLike.off("SIGTERM", onSigterm); + }; + const abortWith = (signalName: "SIGINT" | "SIGTERM") => { + if (!controller.signal.aborted) { + controller.abort(new Error(`received ${signalName} while waiting for deferred maintenance`)); + } + cleanup(); + }; + const onSigint = () => abortWith("SIGINT"); + const onSigterm = () => abortWith("SIGTERM"); + + processLike.on("SIGINT", onSigint); + processLike.on("SIGTERM", onSigterm); + + return { + abortSignal: controller.signal, + dispose: cleanup, + }; +} + function buildTurnMaintenanceTaskDescriptor(params: { sessionKey: string }) { const runId = `turn-maint:${params.sessionKey}:${Date.now().toString(36)}:${randomUUID().slice( 0, @@ -182,6 +222,7 @@ async function runDeferredTurnMaintenanceWorker(params: { }): Promise { let surfacedUserNotice = false; let longRunningTimer: ReturnType | null = null; + const shutdownAbort = createDeferredTurnMaintenanceAbortSignal(); const surfaceMaintenanceUpdate = (summary: string, eventSummary: string) => { promoteTurnMaintenanceTaskVisibility({ sessionKey: params.sessionKey, @@ -217,7 +258,7 @@ async function runDeferredTurnMaintenanceWorker(params: { ); } } - await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS); + await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS, shutdownAbort.abortSignal); } const runningAt = Date.now(); @@ -271,6 +312,22 @@ async function runDeferredTurnMaintenanceWorker(params: { : "No transcript changes were needed.", }); } catch (err) { + if (shutdownAbort.abortSignal?.aborted) { + if (longRunningTimer) { + clearTimeout(longRunningTimer); + longRunningTimer = null; + } + const task = findTaskByRunId(params.runId); + if (task) { + markTaskTerminalById({ + taskId: task.taskId, + status: "cancelled", + endedAt: Date.now(), + terminalSummary: "Deferred maintenance cancelled during shutdown.", + }); + } + return; + } if (longRunningTimer) { clearTimeout(longRunningTimer); longRunningTimer = null; @@ -295,6 +352,8 @@ async function runDeferredTurnMaintenanceWorker(params: { terminalSummary: reason, }); log.warn(`deferred context engine maintenance failed: ${reason}`); + } finally { + shutdownAbort.dispose(); } }