fix(agents): bound compaction retry wait and drain embedded runs on restart (#40324)

Merged via squash.

Prepared head SHA: cfd99562d6
Co-authored-by: cgdusek <38732970+cgdusek@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
Charles Dusek
2026-03-09 10:27:29 -05:00
committed by Vincent Koc
parent d0df6f3a4c
commit fce77e2d45
10 changed files with 478 additions and 19 deletions

View File

@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
- Gateway/Control UI: keep dashboard auth tokens in session-scoped browser storage so same-tab refreshes preserve remote token auth without restoring long-lived localStorage token persistence, while scoping tokens to the selected gateway URL and fragment-only bootstrap flow. (#40892) thanks @velvet-shark.
- Models/Kimi Coding: send `anthropic-messages` tools in native Anthropic format again so `kimi-coding` stops degrading tool calls into XML/plain-text pseudo invocations instead of real `tool_use` blocks. (#38669, #39907, #40552) Thanks @opriz.
- Context engine/tests: add bundled-registry regression coverage for cross-chunk resolution, plugin-sdk re-exports, and concurrent chunk registration. (#40460) thanks @dsantoreis.
- Agents/embedded runner: bound compaction retry waiting and drain embedded runs during SIGUSR1 restart so session lanes recover instead of staying blocked behind compaction. (#40324) thanks @cgdusek.
## 2026.3.8

View File

@@ -124,6 +124,7 @@ import { installToolResultContextGuard } from "../tool-result-context-guard.js";
import { splitSdkTools } from "../tool-split.js";
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js";
import {
selectCompactionTimeoutSnapshot,
shouldFlagCompactionTimeout,
@@ -1537,6 +1538,7 @@ export async function runEmbeddedAttempt(
toolMetas,
unsubscribe,
waitForCompactionRetry,
isCompactionInFlight,
getMessagingToolSentTexts,
getMessagingToolSentMediaUrls,
getMessagingToolSentTargets,
@@ -1798,6 +1800,7 @@ export async function runEmbeddedAttempt(
// Only trust snapshot if compaction wasn't running before or after capture
const preCompactionSnapshot = wasCompactingBefore || wasCompactingAfter ? null : snapshot;
const preCompactionSessionId = activeSession.sessionId;
const COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS = 60_000;
try {
// Flush buffered block replies before waiting for compaction so the
@@ -1808,7 +1811,21 @@ export async function runEmbeddedAttempt(
await params.onBlockReplyFlush();
}
await abortable(waitForCompactionRetry());
const compactionRetryWait = await waitForCompactionRetryWithAggregateTimeout({
waitForCompactionRetry,
abortable,
aggregateTimeoutMs: COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS,
isCompactionStillInFlight: isCompactionInFlight,
});
if (compactionRetryWait.timedOut) {
timedOutDuringCompaction = true;
if (!isProbeSession) {
log.warn(
`compaction retry aggregate timeout (${COMPACTION_RETRY_AGGREGATE_TIMEOUT_MS}ms): ` +
`proceeding with pre-compaction state runId=${params.runId} sessionId=${params.sessionId}`,
);
}
}
} catch (err) {
if (isRunnerAbortError(err)) {
if (!promptError) {

View File

@@ -0,0 +1,143 @@
import { describe, expect, it, vi } from "vitest";
import { waitForCompactionRetryWithAggregateTimeout } from "./compaction-retry-aggregate-timeout.js";
describe("waitForCompactionRetryWithAggregateTimeout", () => {
it("times out and fires callback when compaction retry never resolves", async () => {
vi.useFakeTimers();
try {
const onTimeout = vi.fn();
const waitForCompactionRetry = vi.fn(async () => await new Promise<void>(() => {}));
const resultPromise = waitForCompactionRetryWithAggregateTimeout({
waitForCompactionRetry,
abortable: async (promise) => await promise,
aggregateTimeoutMs: 60_000,
onTimeout,
});
await vi.advanceTimersByTimeAsync(60_000);
const result = await resultPromise;
expect(result.timedOut).toBe(true);
expect(onTimeout).toHaveBeenCalledTimes(1);
expect(vi.getTimerCount()).toBe(0);
} finally {
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
}
});
it("keeps waiting while compaction remains in flight", async () => {
vi.useFakeTimers();
try {
const onTimeout = vi.fn();
let compactionInFlight = true;
const waitForCompactionRetry = vi.fn(
async () =>
await new Promise<void>((resolve) => {
setTimeout(() => {
compactionInFlight = false;
resolve();
}, 170_000);
}),
);
const resultPromise = waitForCompactionRetryWithAggregateTimeout({
waitForCompactionRetry,
abortable: async (promise) => await promise,
aggregateTimeoutMs: 60_000,
onTimeout,
isCompactionStillInFlight: () => compactionInFlight,
});
await vi.advanceTimersByTimeAsync(170_000);
const result = await resultPromise;
expect(result.timedOut).toBe(false);
expect(onTimeout).not.toHaveBeenCalled();
expect(vi.getTimerCount()).toBe(0);
} finally {
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
}
});
it("times out after an idle timeout window", async () => {
vi.useFakeTimers();
try {
const onTimeout = vi.fn();
let compactionInFlight = true;
const waitForCompactionRetry = vi.fn(async () => await new Promise<void>(() => {}));
setTimeout(() => {
compactionInFlight = false;
}, 90_000);
const resultPromise = waitForCompactionRetryWithAggregateTimeout({
waitForCompactionRetry,
abortable: async (promise) => await promise,
aggregateTimeoutMs: 60_000,
onTimeout,
isCompactionStillInFlight: () => compactionInFlight,
});
await vi.advanceTimersByTimeAsync(120_000);
const result = await resultPromise;
expect(result.timedOut).toBe(true);
expect(onTimeout).toHaveBeenCalledTimes(1);
expect(vi.getTimerCount()).toBe(0);
} finally {
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
}
});
it("does not time out when compaction retry resolves", async () => {
vi.useFakeTimers();
try {
const onTimeout = vi.fn();
const waitForCompactionRetry = vi.fn(async () => {});
const result = await waitForCompactionRetryWithAggregateTimeout({
waitForCompactionRetry,
abortable: async (promise) => await promise,
aggregateTimeoutMs: 60_000,
onTimeout,
});
expect(result.timedOut).toBe(false);
expect(onTimeout).not.toHaveBeenCalled();
expect(vi.getTimerCount()).toBe(0);
} finally {
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
}
});
it("propagates abort errors from abortable and clears timer", async () => {
vi.useFakeTimers();
try {
const abortError = new Error("aborted");
abortError.name = "AbortError";
const onTimeout = vi.fn();
const waitForCompactionRetry = vi.fn(async () => await new Promise<void>(() => {}));
await expect(
waitForCompactionRetryWithAggregateTimeout({
waitForCompactionRetry,
abortable: async () => {
throw abortError;
},
aggregateTimeoutMs: 60_000,
onTimeout,
}),
).rejects.toThrow("aborted");
expect(onTimeout).not.toHaveBeenCalled();
expect(vi.getTimerCount()).toBe(0);
} finally {
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
}
});
});

View File

@@ -0,0 +1,51 @@
/**
* Wait for compaction retry completion with an aggregate timeout to avoid
* holding a session lane indefinitely when retry resolution is lost.
*/
export async function waitForCompactionRetryWithAggregateTimeout(params: {
waitForCompactionRetry: () => Promise<void>;
abortable: <T>(promise: Promise<T>) => Promise<T>;
aggregateTimeoutMs: number;
onTimeout?: () => void;
isCompactionStillInFlight?: () => boolean;
}): Promise<{ timedOut: boolean }> {
const timeoutMsRaw = params.aggregateTimeoutMs;
const timeoutMs = Number.isFinite(timeoutMsRaw) ? Math.max(1, Math.floor(timeoutMsRaw)) : 1;
let timedOut = false;
const waitPromise = params.waitForCompactionRetry().then(() => "done" as const);
while (true) {
let timer: ReturnType<typeof setTimeout> | undefined;
try {
const result = await params.abortable(
Promise.race([
waitPromise,
new Promise<"timeout">((resolve) => {
timer = setTimeout(() => resolve("timeout"), timeoutMs);
}),
]),
);
if (result === "done") {
break;
}
// Keep extending the timeout window while compaction is actively running.
// We only trigger the fallback timeout once compaction appears idle.
if (params.isCompactionStillInFlight?.()) {
continue;
}
timedOut = true;
params.onTimeout?.();
break;
} finally {
if (timer !== undefined) {
clearTimeout(timer);
}
}
}
return { timedOut };
}

View File

@@ -0,0 +1,108 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
__testing,
abortEmbeddedPiRun,
clearActiveEmbeddedRun,
setActiveEmbeddedRun,
waitForActiveEmbeddedRuns,
} from "./runs.js";
describe("pi-embedded runner run registry", () => {
afterEach(() => {
__testing.resetActiveEmbeddedRuns();
vi.restoreAllMocks();
});
it("aborts only compacting runs in compacting mode", () => {
const abortCompacting = vi.fn();
const abortNormal = vi.fn();
setActiveEmbeddedRun("session-compacting", {
queueMessage: async () => {},
isStreaming: () => true,
isCompacting: () => true,
abort: abortCompacting,
});
setActiveEmbeddedRun("session-normal", {
queueMessage: async () => {},
isStreaming: () => true,
isCompacting: () => false,
abort: abortNormal,
});
const aborted = abortEmbeddedPiRun(undefined, { mode: "compacting" });
expect(aborted).toBe(true);
expect(abortCompacting).toHaveBeenCalledTimes(1);
expect(abortNormal).not.toHaveBeenCalled();
});
it("aborts every active run in all mode", () => {
const abortA = vi.fn();
const abortB = vi.fn();
setActiveEmbeddedRun("session-a", {
queueMessage: async () => {},
isStreaming: () => true,
isCompacting: () => true,
abort: abortA,
});
setActiveEmbeddedRun("session-b", {
queueMessage: async () => {},
isStreaming: () => true,
isCompacting: () => false,
abort: abortB,
});
const aborted = abortEmbeddedPiRun(undefined, { mode: "all" });
expect(aborted).toBe(true);
expect(abortA).toHaveBeenCalledTimes(1);
expect(abortB).toHaveBeenCalledTimes(1);
});
it("waits for active runs to drain", async () => {
vi.useFakeTimers();
try {
const handle = {
queueMessage: async () => {},
isStreaming: () => true,
isCompacting: () => false,
abort: vi.fn(),
};
setActiveEmbeddedRun("session-a", handle);
setTimeout(() => {
clearActiveEmbeddedRun("session-a", handle);
}, 500);
const waitPromise = waitForActiveEmbeddedRuns(1_000, { pollMs: 100 });
await vi.advanceTimersByTimeAsync(500);
const result = await waitPromise;
expect(result.drained).toBe(true);
} finally {
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
}
});
it("returns drained=false when timeout elapses", async () => {
vi.useFakeTimers();
try {
setActiveEmbeddedRun("session-a", {
queueMessage: async () => {},
isStreaming: () => true,
isCompacting: () => false,
abort: vi.fn(),
});
const waitPromise = waitForActiveEmbeddedRuns(1_000, { pollMs: 100 });
await vi.advanceTimersByTimeAsync(1_000);
const result = await waitPromise;
expect(result.drained).toBe(false);
} finally {
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
}
});
});

View File

@@ -37,15 +37,70 @@ export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean
return true;
}
export function abortEmbeddedPiRun(sessionId: string): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) {
diag.debug(`abort failed: sessionId=${sessionId} reason=no_active_run`);
return false;
/**
* Abort embedded PI runs.
*
* - With a sessionId, aborts that single run.
* - With no sessionId, supports targeted abort modes (for example, compacting runs only).
*/
export function abortEmbeddedPiRun(sessionId: string): boolean;
export function abortEmbeddedPiRun(
sessionId: undefined,
opts: { mode: "all" | "compacting" },
): boolean;
export function abortEmbeddedPiRun(
sessionId?: string,
opts?: { mode?: "all" | "compacting" },
): boolean {
if (typeof sessionId === "string" && sessionId.length > 0) {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) {
diag.debug(`abort failed: sessionId=${sessionId} reason=no_active_run`);
return false;
}
diag.debug(`aborting run: sessionId=${sessionId}`);
try {
handle.abort();
} catch (err) {
diag.warn(`abort failed: sessionId=${sessionId} err=${String(err)}`);
return false;
}
return true;
}
diag.debug(`aborting run: sessionId=${sessionId}`);
handle.abort();
return true;
const mode = opts?.mode;
if (mode === "compacting") {
let aborted = false;
for (const [id, handle] of ACTIVE_EMBEDDED_RUNS) {
if (!handle.isCompacting()) {
continue;
}
diag.debug(`aborting compacting run: sessionId=${id}`);
try {
handle.abort();
aborted = true;
} catch (err) {
diag.warn(`abort failed: sessionId=${id} err=${String(err)}`);
}
}
return aborted;
}
if (mode === "all") {
let aborted = false;
for (const [id, handle] of ACTIVE_EMBEDDED_RUNS) {
diag.debug(`aborting run: sessionId=${id}`);
try {
handle.abort();
aborted = true;
} catch (err) {
diag.warn(`abort failed: sessionId=${id} err=${String(err)}`);
}
}
return aborted;
}
return false;
}
export function isEmbeddedPiRunActive(sessionId: string): boolean {
@@ -68,6 +123,36 @@ export function getActiveEmbeddedRunCount(): number {
return ACTIVE_EMBEDDED_RUNS.size;
}
/**
* Wait for active embedded runs to drain.
*
* Used during restarts so in-flight compaction runs can release session write
* locks before the next lifecycle starts.
*/
export async function waitForActiveEmbeddedRuns(
timeoutMs = 15_000,
opts?: { pollMs?: number },
): Promise<{ drained: boolean }> {
const pollMsRaw = opts?.pollMs ?? 250;
const pollMs = Math.max(10, Math.floor(pollMsRaw));
const maxWaitMs = Math.max(pollMs, Math.floor(timeoutMs));
const startedAt = Date.now();
while (true) {
if (ACTIVE_EMBEDDED_RUNS.size === 0) {
return { drained: true };
}
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= maxWaitMs) {
diag.warn(
`wait for active embedded runs timed out: activeRuns=${ACTIVE_EMBEDDED_RUNS.size} timeoutMs=${maxWaitMs}`,
);
return { drained: false };
}
await new Promise<void>((resolve) => setTimeout(resolve, pollMs));
}
}
export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000): Promise<boolean> {
if (!sessionId || !ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
return Promise.resolve(true);
@@ -150,4 +235,17 @@ export function clearActiveEmbeddedRun(
}
}
export const __testing = {
resetActiveEmbeddedRuns() {
for (const waiters of EMBEDDED_RUN_WAITERS.values()) {
for (const waiter of waiters) {
clearTimeout(waiter.timer);
waiter.resolve(true);
}
}
EMBEDDED_RUN_WAITERS.clear();
ACTIVE_EMBEDDED_RUNS.clear();
},
};
export type { EmbeddedPiQueueHandle };

View File

@@ -15,6 +15,11 @@ const resetAllLanes = vi.fn();
const restartGatewayProcessWithFreshPid = vi.fn<
() => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string }
>(() => ({ mode: "disabled" }));
const abortEmbeddedPiRun = vi.fn(
(_sessionId?: string, _opts?: { mode?: "all" | "compacting" }) => false,
);
const getActiveEmbeddedRunCount = vi.fn(() => 0);
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart";
const gatewayLog = {
info: vi.fn(),
@@ -43,6 +48,13 @@ vi.mock("../../process/command-queue.js", () => ({
resetAllLanes: () => resetAllLanes(),
}));
vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({
abortEmbeddedPiRun: (sessionId?: string, opts?: { mode?: "all" | "compacting" }) =>
abortEmbeddedPiRun(sessionId, opts),
getActiveEmbeddedRunCount: () => getActiveEmbeddedRunCount(),
waitForActiveEmbeddedRuns: (timeoutMs: number) => waitForActiveEmbeddedRuns(timeoutMs),
}));
vi.mock("../../logging/subsystem.js", () => ({
createSubsystemLogger: () => gatewayLog,
}));
@@ -186,7 +198,9 @@ describe("runGatewayLoop", () => {
await withIsolatedSignals(async ({ captureSignal }) => {
getActiveTaskCount.mockReturnValueOnce(2).mockReturnValueOnce(0);
getActiveEmbeddedRunCount.mockReturnValueOnce(1).mockReturnValueOnce(0);
waitForActiveTasks.mockResolvedValueOnce({ drained: false });
waitForActiveEmbeddedRuns.mockResolvedValueOnce({ drained: true });
type StartServer = () => Promise<{
close: (opts: { reason: string; restartExpectedMs: number | null }) => Promise<void>;
@@ -243,7 +257,10 @@ describe("runGatewayLoop", () => {
expect(start).toHaveBeenCalledTimes(2);
await new Promise<void>((resolve) => setImmediate(resolve));
expect(waitForActiveTasks).toHaveBeenCalledWith(30_000);
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" });
expect(waitForActiveTasks).toHaveBeenCalledWith(90_000);
expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(90_000);
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" });
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG);
expect(closeFirst).toHaveBeenCalledWith({

View File

@@ -1,3 +1,8 @@
import {
abortEmbeddedPiRun,
getActiveEmbeddedRunCount,
waitForActiveEmbeddedRuns,
} from "../../agents/pi-embedded-runner/runs.js";
import type { startGatewayServer } from "../../gateway/server.js";
import { acquireGatewayLock } from "../../infra/gateway-lock.js";
import { restartGatewayProcessWithFreshPid } from "../../infra/process-respawn.js";
@@ -90,7 +95,7 @@ export async function runGatewayLoop(params: {
exitProcess(0);
};
const DRAIN_TIMEOUT_MS = 30_000;
const DRAIN_TIMEOUT_MS = 90_000;
const SHUTDOWN_TIMEOUT_MS = 5_000;
const request = (action: GatewayRunSignalAction, signal: string) => {
@@ -121,15 +126,33 @@ export async function runGatewayLoop(params: {
// sessions get an explicit restart error instead of silent task loss.
markGatewayDraining();
const activeTasks = getActiveTaskCount();
if (activeTasks > 0) {
const activeRuns = getActiveEmbeddedRunCount();
// Best-effort abort for compacting runs so long compaction operations
// don't hold session write locks across restart boundaries.
if (activeRuns > 0) {
abortEmbeddedPiRun(undefined, { mode: "compacting" });
}
if (activeTasks > 0 || activeRuns > 0) {
gatewayLog.info(
`draining ${activeTasks} active task(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`,
`draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`,
);
const { drained } = await waitForActiveTasks(DRAIN_TIMEOUT_MS);
if (drained) {
gatewayLog.info("all active tasks drained");
const [tasksDrain, runsDrain] = await Promise.all([
activeTasks > 0
? waitForActiveTasks(DRAIN_TIMEOUT_MS)
: Promise.resolve({ drained: true }),
activeRuns > 0
? waitForActiveEmbeddedRuns(DRAIN_TIMEOUT_MS)
: Promise.resolve({ drained: true }),
]);
if (tasksDrain.drained && runsDrain.drained) {
gatewayLog.info("all active work drained");
} else {
gatewayLog.warn("drain timeout reached; proceeding with restart");
// Final best-effort abort to avoid carrying active runs into the
// next lifecycle when drain time budget is exhausted.
abortEmbeddedPiRun(undefined, { mode: "all" });
}
}
}

View File

@@ -244,8 +244,8 @@ describe("infra runtime", () => {
await vi.advanceTimersByTimeAsync(0);
expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1");
// Advance past the 30s max deferral wait
await vi.advanceTimersByTimeAsync(30_000);
// Advance past the 90s max deferral wait
await vi.advanceTimersByTimeAsync(90_000);
expect(emitSpy).toHaveBeenCalledWith("SIGUSR1");
} finally {
process.removeListener("SIGUSR1", handler);

View File

@@ -19,7 +19,8 @@ export type RestartAttempt = {
const SPAWN_TIMEOUT_MS = 2000;
const SIGUSR1_AUTH_GRACE_MS = 5000;
const DEFAULT_DEFERRAL_POLL_MS = 500;
const DEFAULT_DEFERRAL_MAX_WAIT_MS = 30_000;
// Cover slow in-flight embedded compaction work before forcing restart.
const DEFAULT_DEFERRAL_MAX_WAIT_MS = 90_000;
const RESTART_COOLDOWN_MS = 30_000;
const restartLog = createSubsystemLogger("restart");