From fce77e2d459212bc1581beeb6620dd34f9ff3816 Mon Sep 17 00:00:00 2001 From: Charles Dusek <38732970+cgdusek@users.noreply.github.com> Date: Mon, 9 Mar 2026 10:27:29 -0500 Subject: [PATCH] fix(agents): bound compaction retry wait and drain embedded runs on restart (#40324) Merged via squash. Prepared head SHA: cfd99562d686b21b9239d3d536c6f6aadc518138 Co-authored-by: cgdusek <38732970+cgdusek@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 1 + src/agents/pi-embedded-runner/run/attempt.ts | 19 ++- ...compaction-retry-aggregate-timeout.test.ts | 143 ++++++++++++++++++ .../run/compaction-retry-aggregate-timeout.ts | 51 +++++++ src/agents/pi-embedded-runner/runs.test.ts | 108 +++++++++++++ src/agents/pi-embedded-runner/runs.ts | 114 +++++++++++++- src/cli/gateway-cli/run-loop.test.ts | 19 ++- src/cli/gateway-cli/run-loop.ts | 35 ++++- src/infra/infra-runtime.test.ts | 4 +- src/infra/restart.ts | 3 +- 10 files changed, 478 insertions(+), 19 deletions(-) create mode 100644 src/agents/pi-embedded-runner/run/compaction-retry-aggregate-timeout.test.ts create mode 100644 src/agents/pi-embedded-runner/run/compaction-retry-aggregate-timeout.ts create mode 100644 src/agents/pi-embedded-runner/runs.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 15b57e181ff..7fa48053c6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai - Gateway/Control UI: keep dashboard auth tokens in session-scoped browser storage so same-tab refreshes preserve remote token auth without restoring long-lived localStorage token persistence, while scoping tokens to the selected gateway URL and fragment-only bootstrap flow. (#40892) thanks @velvet-shark. - Models/Kimi Coding: send `anthropic-messages` tools in native Anthropic format again so `kimi-coding` stops degrading tool calls into XML/plain-text pseudo invocations instead of real `tool_use` blocks. (#38669, #39907, #40552) Thanks @opriz. - Context engine/tests: add bundled-registry regression coverage for cross-chunk resolution, plugin-sdk re-exports, and concurrent chunk registration. (#40460) thanks @dsantoreis. +- Agents/embedded runner: bound compaction retry waiting and drain embedded runs during SIGUSR1 restart so session lanes recover instead of staying blocked behind compaction. (#40324) thanks @cgdusek. ## 2026.3.8 diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index b8dc464e51c..d7fa541c2be 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -124,6 +124,7 @@ import { installToolResultContextGuard } from "../tool-result-context-guard.js"; import { splitSdkTools } from "../tool-split.js"; import { describeUnknownError, mapThinkingLevel } from "../utils.js"; import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js"; +import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js"; import { selectCompactionTimeoutSnapshot, shouldFlagCompactionTimeout, @@ -1537,6 +1538,7 @@ export async function runEmbeddedAttempt( toolMetas, unsubscribe, waitForCompactionRetry, + isCompactionInFlight, getMessagingToolSentTexts, getMessagingToolSentMediaUrls, getMessagingToolSentTargets, @@ -1798,6 +1800,7 @@ export async function runEmbeddedAttempt( // Only trust snapshot if compaction wasn't running before or after capture const preCompactionSnapshot = wasCompactingBefore || wasCompactingAfter ? null : snapshot; const preCompactionSessionId = activeSession.sessionId; + const COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS = 60_000; try { // Flush buffered block replies before waiting for compaction so the @@ -1808,7 +1811,21 @@ export async function runEmbeddedAttempt( await params.onBlockReplyFlush(); } - await abortable(waitForCompactionRetry()); + const compactionRetryWait = await waitForCompactionRetryWithAggregateTimeout({ + waitForCompactionRetry, + abortable, + aggregateTimeoutMs: COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS, + isCompactionStillInFlight: isCompactionInFlight, + }); + if (compactionRetryWait.timedOut) { + timedOutDuringCompaction = true; + if (!isProbeSession) { + log.warn( + `compaction retry aggregate timeout (${COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS}ms): ` + + `proceeding with pre-compaction state runId=${params.runId} sessionId=${params.sessionId}`, + ); + } + } } catch (err) { if (isRunnerAbortError(err)) { if (!promptError) { diff --git a/src/agents/pi-embedded-runner/run/compaction-retry-aggregate-timeout.test.ts b/src/agents/pi-embedded-runner/run/compaction-retry-aggregate-timeout.test.ts new file mode 100644 index 00000000000..9a38127c84a --- /dev/null +++ b/src/agents/pi-embedded-runner/run/compaction-retry-aggregate-timeout.test.ts @@ -0,0 +1,143 @@ +import { describe, expect, it, vi } from "vitest"; +import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js"; + +describe("waitForCompactionRetryWithAggregateTimeout", () => { + it("times out and fires callback when compaction retry never resolves", async () => { + vi.useFakeTimers(); + try { + const onTimeout = vi.fn(); + const waitForCompactionRetry = vi.fn(async () => await new Promise(() => {})); + + const resultPromise = waitForCompactionRetryWithAggregateTimeout({ + waitForCompactionRetry, + abortable: async (promise) => await promise, + aggregateTimeoutMs: 60_000, + onTimeout, + }); + + await vi.advanceTimersByTimeAsync(60_000); + const result = await resultPromise; + + expect(result.timedOut).toBe(true); + expect(onTimeout).toHaveBeenCalledTimes(1); + expect(vi.getTimerCount()).toBe(0); + } finally { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + } + }); + + it("keeps waiting while compaction remains in flight", async () => { + vi.useFakeTimers(); + try { + const onTimeout = vi.fn(); + let compactionInFlight = true; + const waitForCompactionRetry = vi.fn( + async () => + await new Promise((resolve) => { + setTimeout(() => { + compactionInFlight = false; + resolve(); + }, 170_000); + }), + ); + + const resultPromise = waitForCompactionRetryWithAggregateTimeout({ + waitForCompactionRetry, + abortable: async (promise) => await promise, + aggregateTimeoutMs: 60_000, + onTimeout, + isCompactionStillInFlight: () => compactionInFlight, + }); + + await vi.advanceTimersByTimeAsync(170_000); + const result = await resultPromise; + + expect(result.timedOut).toBe(false); + expect(onTimeout).not.toHaveBeenCalled(); + expect(vi.getTimerCount()).toBe(0); + } finally { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + } + }); + + it("times out after an idle timeout window", async () => { + vi.useFakeTimers(); + try { + const onTimeout = vi.fn(); + let compactionInFlight = true; + const waitForCompactionRetry = vi.fn(async () => await new Promise(() => {})); + setTimeout(() => { + compactionInFlight = false; + }, 90_000); + + const resultPromise = waitForCompactionRetryWithAggregateTimeout({ + waitForCompactionRetry, + abortable: async (promise) => await promise, + aggregateTimeoutMs: 60_000, + onTimeout, + isCompactionStillInFlight: () => compactionInFlight, + }); + + await vi.advanceTimersByTimeAsync(120_000); + const result = await resultPromise; + + expect(result.timedOut).toBe(true); + expect(onTimeout).toHaveBeenCalledTimes(1); + expect(vi.getTimerCount()).toBe(0); + } finally { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + } + }); + + it("does not time out when compaction retry resolves", async () => { + vi.useFakeTimers(); + try { + const onTimeout = vi.fn(); + const waitForCompactionRetry = vi.fn(async () => {}); + + const result = await waitForCompactionRetryWithAggregateTimeout({ + waitForCompactionRetry, + abortable: async (promise) => await promise, + aggregateTimeoutMs: 60_000, + onTimeout, + }); + + expect(result.timedOut).toBe(false); + expect(onTimeout).not.toHaveBeenCalled(); + expect(vi.getTimerCount()).toBe(0); + } finally { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + } + }); + + it("propagates abort errors from abortable and clears timer", async () => { + vi.useFakeTimers(); + try { + const abortError = new Error("aborted"); + abortError.name = "AbortError"; + const onTimeout = vi.fn(); + const waitForCompactionRetry = vi.fn(async () => await new Promise(() => {})); + + await expect( + waitForCompactionRetryWithAggregateTimeout({ + waitForCompactionRetry, + abortable: async () => { + throw abortError; + }, + aggregateTimeoutMs: 60_000, + onTimeout, + }), + ).rejects.toThrow("aborted"); + + expect(onTimeout).not.toHaveBeenCalled(); + expect(vi.getTimerCount()).toBe(0); + } finally { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + } + }); +}); diff --git a/src/agents/pi-embedded-runner/run/compaction-retry-aggregate-timeout.ts b/src/agents/pi-embedded-runner/run/compaction-retry-aggregate-timeout.ts new file mode 100644 index 00000000000..464e3cfcf7f --- /dev/null +++ b/src/agents/pi-embedded-runner/run/compaction-retry-aggregate-timeout.ts @@ -0,0 +1,51 @@ +/** + * Wait for compaction retry completion with an aggregate timeout to avoid + * holding a session lane indefinitely when retry resolution is lost. + */ +export async function waitForCompactionRetryWithAggregateTimeout(params: { + waitForCompactionRetry: () => Promise; + abortable: (promise: Promise) => Promise; + aggregateTimeoutMs: number; + onTimeout?: () => void; + isCompactionStillInFlight?: () => boolean; +}): Promise<{ timedOut: boolean }> { + const timeoutMsRaw = params.aggregateTimeoutMs; + const timeoutMs = Number.isFinite(timeoutMsRaw) ? Math.max(1, Math.floor(timeoutMsRaw)) : 1; + + let timedOut = false; + const waitPromise = params.waitForCompactionRetry().then(() => "done" as const); + + while (true) { + let timer: ReturnType | undefined; + try { + const result = await params.abortable( + Promise.race([ + waitPromise, + new Promise<"timeout">((resolve) => { + timer = setTimeout(() => resolve("timeout"), timeoutMs); + }), + ]), + ); + + if (result === "done") { + break; + } + + // Keep extending the timeout window while compaction is actively running. + // We only trigger the fallback timeout once compaction appears idle. + if (params.isCompactionStillInFlight?.()) { + continue; + } + + timedOut = true; + params.onTimeout?.(); + break; + } finally { + if (timer !== undefined) { + clearTimeout(timer); + } + } + } + + return { timedOut }; +} diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts new file mode 100644 index 00000000000..73201749317 --- /dev/null +++ b/src/agents/pi-embedded-runner/runs.test.ts @@ -0,0 +1,108 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + __testing, + abortEmbeddedPiRun, + clearActiveEmbeddedRun, + setActiveEmbeddedRun, + waitForActiveEmbeddedRuns, +} from "./runs.js"; + +describe("pi-embedded runner run registry", () => { + afterEach(() => { + __testing.resetActiveEmbeddedRuns(); + vi.restoreAllMocks(); + }); + + it("aborts only compacting runs in compacting mode", () => { + const abortCompacting = vi.fn(); + const abortNormal = vi.fn(); + + setActiveEmbeddedRun("session-compacting", { + queueMessage: async () => {}, + isStreaming: () => true, + isCompacting: () => true, + abort: abortCompacting, + }); + + setActiveEmbeddedRun("session-normal", { + queueMessage: async () => {}, + isStreaming: () => true, + isCompacting: () => false, + abort: abortNormal, + }); + + const aborted = abortEmbeddedPiRun(undefined, { mode: "compacting" }); + expect(aborted).toBe(true); + expect(abortCompacting).toHaveBeenCalledTimes(1); + expect(abortNormal).not.toHaveBeenCalled(); + }); + + it("aborts every active run in all mode", () => { + const abortA = vi.fn(); + const abortB = vi.fn(); + + setActiveEmbeddedRun("session-a", { + queueMessage: async () => {}, + isStreaming: () => true, + isCompacting: () => true, + abort: abortA, + }); + + setActiveEmbeddedRun("session-b", { + queueMessage: async () => {}, + isStreaming: () => true, + isCompacting: () => false, + abort: abortB, + }); + + const aborted = abortEmbeddedPiRun(undefined, { mode: "all" }); + expect(aborted).toBe(true); + expect(abortA).toHaveBeenCalledTimes(1); + expect(abortB).toHaveBeenCalledTimes(1); + }); + + it("waits for active runs to drain", async () => { + vi.useFakeTimers(); + try { + const handle = { + queueMessage: async () => {}, + isStreaming: () => true, + isCompacting: () => false, + abort: vi.fn(), + }; + setActiveEmbeddedRun("session-a", handle); + setTimeout(() => { + clearActiveEmbeddedRun("session-a", handle); + }, 500); + + const waitPromise = waitForActiveEmbeddedRuns(1_000, { pollMs: 100 }); + await vi.advanceTimersByTimeAsync(500); + const result = await waitPromise; + + expect(result.drained).toBe(true); + } finally { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + } + }); + + it("returns drained=false when timeout elapses", async () => { + vi.useFakeTimers(); + try { + setActiveEmbeddedRun("session-a", { + queueMessage: async () => {}, + isStreaming: () => true, + isCompacting: () => false, + abort: vi.fn(), + }); + + const waitPromise = waitForActiveEmbeddedRuns(1_000, { pollMs: 100 }); + await vi.advanceTimersByTimeAsync(1_000); + const result = await waitPromise; + expect(result.drained).toBe(false); + } finally { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + } + }); +}); diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index 41dad4df582..6b62b9b59ed 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -37,15 +37,70 @@ export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean return true; } -export function abortEmbeddedPiRun(sessionId: string): boolean { - const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); - if (!handle) { - diag.debug(`abort failed: sessionId=${sessionId} reason=no_active_run`); - return false; +/** + * Abort embedded PI runs. + * + * - With a sessionId, aborts that single run. + * - With no sessionId, supports targeted abort modes (for example, compacting runs only). + */ +export function abortEmbeddedPiRun(sessionId: string): boolean; +export function abortEmbeddedPiRun( + sessionId: undefined, + opts: { mode: "all" | "compacting" }, +): boolean; +export function abortEmbeddedPiRun( + sessionId?: string, + opts?: { mode?: "all" | "compacting" }, +): boolean { + if (typeof sessionId === "string" && sessionId.length > 0) { + const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); + if (!handle) { + diag.debug(`abort failed: sessionId=${sessionId} reason=no_active_run`); + return false; + } + diag.debug(`aborting run: sessionId=${sessionId}`); + try { + handle.abort(); + } catch (err) { + diag.warn(`abort failed: sessionId=${sessionId} err=${String(err)}`); + return false; + } + return true; } - diag.debug(`aborting run: sessionId=${sessionId}`); - handle.abort(); - return true; + + const mode = opts?.mode; + if (mode === "compacting") { + let aborted = false; + for (const [id, handle] of ACTIVE_EMBEDDED_RUNS) { + if (!handle.isCompacting()) { + continue; + } + diag.debug(`aborting compacting run: sessionId=${id}`); + try { + handle.abort(); + aborted = true; + } catch (err) { + diag.warn(`abort failed: sessionId=${id} err=${String(err)}`); + } + } + return aborted; + } + + if (mode === "all") { + let aborted = false; + for (const [id, handle] of ACTIVE_EMBEDDED_RUNS) { + diag.debug(`aborting run: sessionId=${id}`); + try { + handle.abort(); + aborted = true; + } catch (err) { + diag.warn(`abort failed: sessionId=${id} err=${String(err)}`); + } + } + return aborted; + } + + return false; } export function isEmbeddedPiRunActive(sessionId: string): boolean { @@ -68,6 +123,36 @@ export function getActiveEmbeddedRunCount(): number { return ACTIVE_EMBEDDED_RUNS.size; } +/** + * Wait for active embedded runs to drain. + * + * Used during restarts so in-flight compaction runs can release session write + * locks before the next lifecycle starts. + */ +export async function waitForActiveEmbeddedRuns( + timeoutMs = 15_000, + opts?: { pollMs?: number }, +): Promise<{ drained: boolean }> { + const pollMsRaw = opts?.pollMs ?? 250; + const pollMs = Math.max(10, Math.floor(pollMsRaw)); + const maxWaitMs = Math.max(pollMs, Math.floor(timeoutMs)); + + const startedAt = Date.now(); + while (true) { + if (ACTIVE_EMBEDDED_RUNS.size === 0) { + return { drained: true }; + } + const elapsedMs = Date.now() - startedAt; + if (elapsedMs >= maxWaitMs) { + diag.warn( + `wait for active embedded runs timed out: activeRuns=${ACTIVE_EMBEDDED_RUNS.size} timeoutMs=${maxWaitMs}`, + ); + return { drained: false }; + } + await new Promise((resolve) => setTimeout(resolve, pollMs)); + } +} + export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000): Promise { if (!sessionId || !ACTIVE_EMBEDDED_RUNS.has(sessionId)) { return Promise.resolve(true); @@ -150,4 +235,17 @@ export function clearActiveEmbeddedRun( } } +export const __testing = { + resetActiveEmbeddedRuns() { + for (const waiters of EMBEDDED_RUN_WAITERS.values()) { + for (const waiter of waiters) { + clearTimeout(waiter.timer); + waiter.resolve(true); + } + } + EMBEDDED_RUN_WAITERS.clear(); + ACTIVE_EMBEDDED_RUNS.clear(); + }, +}; + export type { EmbeddedPiQueueHandle }; diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 9e44d67c59b..bff37742254 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -15,6 +15,11 @@ const resetAllLanes = vi.fn(); const restartGatewayProcessWithFreshPid = vi.fn< () => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string } >(() => ({ mode: "disabled" })); +const abortEmbeddedPiRun = vi.fn( + (_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false, +); +const getActiveEmbeddedRunCount = vi.fn(() => 0); +const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true })); const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart"; const gatewayLog = { info: vi.fn(), @@ -43,6 +48,13 @@ vi.mock("../../process/command-queue.js", () => ({ resetAllLanes: () => resetAllLanes(), })); +vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({ + abortEmbeddedPiRun: (sessionId?: string, opts?: { mode?: "all" | "compacting" }) => + abortEmbeddedPiRun(sessionId, opts), + getActiveEmbeddedRunCount: () => getActiveEmbeddedRunCount(), + waitForActiveEmbeddedRuns: (timeoutMs: number) => waitForActiveEmbeddedRuns(timeoutMs), +})); + vi.mock("../../logging/subsystem.js", () => ({ createSubsystemLogger: () => gatewayLog, })); @@ -186,7 +198,9 @@ describe("runGatewayLoop", () => { await withIsolatedSignals(async ({ captureSignal }) => { getActiveTaskCount.mockReturnValueOnce(2).mockReturnValueOnce(0); + getActiveEmbeddedRunCount.mockReturnValueOnce(1).mockReturnValueOnce(0); waitForActiveTasks.mockResolvedValueOnce({ drained: false }); + waitForActiveEmbeddedRuns.mockResolvedValueOnce({ drained: true }); type StartServer = () => Promise<{ close: (opts: { reason: string; restartExpectedMs: number | null }) => Promise; @@ -243,7 +257,10 @@ describe("runGatewayLoop", () => { expect(start).toHaveBeenCalledTimes(2); await new Promise((resolve) => setImmediate(resolve)); - expect(waitForActiveTasks).toHaveBeenCalledWith(30_000); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" }); + expect(waitForActiveTasks).toHaveBeenCalledWith(90_000); + expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(90_000); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" }); expect(markGatewayDraining).toHaveBeenCalledTimes(1); expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG); expect(closeFirst).toHaveBeenCalledWith({ diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 4fbb4807264..13ef073a80d 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -1,3 +1,8 @@ +import { + abortEmbeddedPiRun, + getActiveEmbeddedRunCount, + waitForActiveEmbeddedRuns, +} from "../../agents/pi-embedded-runner/runs.js"; import type { startGatewayServer } from "../../gateway/server.js"; import { acquireGatewayLock } from "../../infra/gateway-lock.js"; import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js"; @@ -90,7 +95,7 @@ export async function runGatewayLoop(params: { exitProcess(0); }; - const DRAIN_TIMEOUT_MS = 30_000; + const DRAIN_TIMEOUT_MS = 90_000; const SHUTDOWN_TIMEOUT_MS = 5_000; const request = (action: GatewayRunSignalAction, signal: string) => { @@ -121,15 +126,33 @@ export async function runGatewayLoop(params: { // sessions get an explicit restart error instead of silent task loss. markGatewayDraining(); const activeTasks = getActiveTaskCount(); - if (activeTasks > 0) { + const activeRuns = getActiveEmbeddedRunCount(); + + // Best-effort abort for compacting runs so long compaction operations + // don't hold session write locks across restart boundaries. + if (activeRuns > 0) { + abortEmbeddedPiRun(undefined, { mode: "compacting" }); + } + + if (activeTasks > 0 || activeRuns > 0) { gatewayLog.info( - `draining ${activeTasks} active task(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`, + `draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`, ); - const { drained } = await waitForActiveTasks(DRAIN_TIMEOUT_MS); - if (drained) { - gatewayLog.info("all active tasks drained"); + const [tasksDrain, runsDrain] = await Promise.all([ + activeTasks > 0 + ? waitForActiveTasks(DRAIN_TIMEOUT_MS) + : Promise.resolve({ drained: true }), + activeRuns > 0 + ? waitForActiveEmbeddedRuns(DRAIN_TIMEOUT_MS) + : Promise.resolve({ drained: true }), + ]); + if (tasksDrain.drained && runsDrain.drained) { + gatewayLog.info("all active work drained"); } else { gatewayLog.warn("drain timeout reached; proceeding with restart"); + // Final best-effort abort to avoid carrying active runs into the + // next lifecycle when drain time budget is exhausted. + abortEmbeddedPiRun(undefined, { mode: "all" }); } } } diff --git a/src/infra/infra-runtime.test.ts b/src/infra/infra-runtime.test.ts index 6a406e8113b..e7656de974f 100644 --- a/src/infra/infra-runtime.test.ts +++ b/src/infra/infra-runtime.test.ts @@ -244,8 +244,8 @@ describe("infra runtime", () => { await vi.advanceTimersByTimeAsync(0); expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1"); - // Advance past the 30s max deferral wait - await vi.advanceTimersByTimeAsync(30_000); + // Advance past the 90s max deferral wait + await vi.advanceTimersByTimeAsync(90_000); expect(emitSpy).toHaveBeenCalledWith("SIGUSR1"); } finally { process.removeListener("SIGUSR1", handler); diff --git a/src/infra/restart.ts b/src/infra/restart.ts index ddb4352e5ca..3e0379f25f2 100644 --- a/src/infra/restart.ts +++ b/src/infra/restart.ts @@ -19,7 +19,8 @@ export type RestartAttempt = { const SPAWN_TIMEOUT_MS = 2000; const SIGUSR1_AUTH_GRACE_MS = 5000; const DEFAULT_DEFERRAL_POLL_MS = 500; -const DEFAULT_DEFERRAL_MAX_WAIT_MS = 30_000; +// Cover slow in-flight embedded compaction work before forcing restart. +const DEFAULT_DEFERRAL_MAX_WAIT_MS = 90_000; const RESTART_COOLDOWN_MS = 30_000; const restartLog = createSubsystemLogger("restart");