Handle deferred maintenance shutdown cleanly

This commit is contained in:
Eva
2026-04-12 15:00:38 +07:00
committed by Josh Lehman
parent 1a2e5e35cc
commit 131368b790
2 changed files with 104 additions and 2 deletions

View File

@@ -28,6 +28,7 @@ const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown
rewrittenEntries: 2,
}));
let buildContextEngineMaintenanceRuntimeContext: typeof import("./context-engine-maintenance.js").buildContextEngineMaintenanceRuntimeContext;
let createDeferredTurnMaintenanceAbortSignal: typeof import("./context-engine-maintenance.js").createDeferredTurnMaintenanceAbortSignal;
let runContextEngineMaintenance: typeof import("./context-engine-maintenance.js").runContextEngineMaintenance;
// Keep this literal aligned with the production module; tests use dynamic
// import reloading, so they cannot safely import the constant directly.
@@ -67,7 +68,11 @@ vi.mock("./transcript-rewrite.js", () => ({
}));
async function loadFreshContextEngineMaintenanceModuleForTest() {
({ buildContextEngineMaintenanceRuntimeContext, runContextEngineMaintenance } =
({
buildContextEngineMaintenanceRuntimeContext,
createDeferredTurnMaintenanceAbortSignal,
runContextEngineMaintenance,
} =
await import("./context-engine-maintenance.js"));
}
@@ -146,6 +151,44 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => {
});
});
describe("createDeferredTurnMaintenanceAbortSignal", () => {
beforeEach(async () => {
await loadFreshContextEngineMaintenanceModuleForTest();
});
it("aborts on termination signals and unregisters listeners", () => {
const listeners = new Map<string, Set<() => void>>();
const processLike = {
on(event: "SIGINT" | "SIGTERM", listener: () => void) {
const bucket = listeners.get(event) ?? new Set<() => void>();
bucket.add(listener);
listeners.set(event, bucket);
return this;
},
off(event: "SIGINT" | "SIGTERM", listener: () => void) {
listeners.get(event)?.delete(listener);
return this;
},
} as unknown as Pick<NodeJS.Process, "on" | "off">;
const { abortSignal, dispose } = createDeferredTurnMaintenanceAbortSignal({ processLike });
expect(listeners.get("SIGINT")?.size ?? 0).toBe(1);
expect(listeners.get("SIGTERM")?.size ?? 0).toBe(1);
const sigtermListeners = Array.from(listeners.get("SIGTERM") ?? []);
expect(sigtermListeners).toHaveLength(1);
sigtermListeners[0]?.();
expect(abortSignal?.aborted).toBe(true);
expect(listeners.get("SIGINT")?.size ?? 0).toBe(0);
expect(listeners.get("SIGTERM")?.size ?? 0).toBe(0);
dispose();
expect(listeners.get("SIGINT")?.size ?? 0).toBe(0);
expect(listeners.get("SIGTERM")?.size ?? 0).toBe(0);
});
});
describe("runContextEngineMaintenance", () => {
beforeEach(async () => {
rewriteTranscriptEntriesInSessionManagerMock.mockClear();

View File

@@ -45,6 +45,46 @@ function resolveDeferredTurnMaintenanceLane(sessionKey: string): string {
return `${TURN_MAINTENANCE_LANE_PREFIX}${sessionKey}`;
}
export function createDeferredTurnMaintenanceAbortSignal(params?: {
processLike?: Pick<NodeJS.Process, "on" | "off">;
}): {
abortSignal?: AbortSignal;
dispose: () => void;
} {
if (typeof AbortController === "undefined") {
return { abortSignal: undefined, dispose: () => {} };
}
const processLike = params?.processLike ?? process;
const controller = new AbortController();
let disposed = false;
const cleanup = () => {
if (disposed) {
return;
}
disposed = true;
processLike.off("SIGINT", onSigint);
processLike.off("SIGTERM", onSigterm);
};
const abortWith = (signalName: "SIGINT" | "SIGTERM") => {
if (!controller.signal.aborted) {
controller.abort(new Error(`received ${signalName} while waiting for deferred maintenance`));
}
cleanup();
};
const onSigint = () => abortWith("SIGINT");
const onSigterm = () => abortWith("SIGTERM");
processLike.on("SIGINT", onSigint);
processLike.on("SIGTERM", onSigterm);
return {
abortSignal: controller.signal,
dispose: cleanup,
};
}
function buildTurnMaintenanceTaskDescriptor(params: { sessionKey: string }) {
const runId = `turn-maint:${params.sessionKey}:${Date.now().toString(36)}:${randomUUID().slice(
0,
@@ -182,6 +222,7 @@ async function runDeferredTurnMaintenanceWorker(params: {
}): Promise<void> {
let surfacedUserNotice = false;
let longRunningTimer: ReturnType<typeof setTimeout> | null = null;
const shutdownAbort = createDeferredTurnMaintenanceAbortSignal();
const surfaceMaintenanceUpdate = (summary: string, eventSummary: string) => {
promoteTurnMaintenanceTaskVisibility({
sessionKey: params.sessionKey,
@@ -217,7 +258,7 @@ async function runDeferredTurnMaintenanceWorker(params: {
);
}
}
await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS);
await sleepWithAbort(TURN_MAINTENANCE_WAIT_POLL_MS, shutdownAbort.abortSignal);
}
const runningAt = Date.now();
@@ -271,6 +312,22 @@ async function runDeferredTurnMaintenanceWorker(params: {
: "No transcript changes were needed.",
});
} catch (err) {
if (shutdownAbort.abortSignal?.aborted) {
if (longRunningTimer) {
clearTimeout(longRunningTimer);
longRunningTimer = null;
}
const task = findTaskByRunId(params.runId);
if (task) {
markTaskTerminalById({
taskId: task.taskId,
status: "cancelled",
endedAt: Date.now(),
terminalSummary: "Deferred maintenance cancelled during shutdown.",
});
}
return;
}
if (longRunningTimer) {
clearTimeout(longRunningTimer);
longRunningTimer = null;
@@ -295,6 +352,8 @@ async function runDeferredTurnMaintenanceWorker(params: {
terminalSummary: reason,
});
log.warn(`deferred context engine maintenance failed: ${reason}`);
} finally {
shutdownAbort.dispose();
}
}