fix: keep embedded run lanes from wedging

This commit is contained in:
Peter Steinberger
2026-04-29 21:37:12 +01:00
parent b83b639287
commit 470098bd26
10 changed files with 303 additions and 37 deletions

View File

@@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Agents/Codex: bound embedded-run cleanup, trajectory flushing, and command-lane task timeouts after runtime failures, so Discord and other chat sessions return to idle instead of staying stuck in processing. Thanks @vincentkoc.
- Web fetch: add a documented `tools.web.fetch.ssrfPolicy.allowIpv6UniqueLocalRange` opt-in and thread it through cache keys and DNS/IP checks so trusted fake-IP proxy stacks using `fc00::/7` can work without broad private-network access. Fixes #74351. Thanks @jeffrey701.
- OpenAI Codex: restore `/verbose full` persistence and app-server tool-output forwarding, and retry Gateway E2E temp-home cleanup so debug runs do not regress on stale validation or cleanup flakes. Thanks @vincentkoc.
- Anthropic/Meridian: preserve text and thinking content seeded on `content_block_start` in anthropic-messages streams, so `[thinking, text]` replies no longer persist as empty turns or trigger empty-response fallbacks. Fixes #74410. Thanks @vyctorbrzezowski.

View File

@@ -28,6 +28,7 @@ import {
registerNativeHookRelay,
setActiveEmbeddedRun,
supportsModelTools,
runAgentCleanupStep,
type AgentMessage,
type EmbeddedRunAttemptParams,
type EmbeddedRunAttemptResult,
@@ -682,7 +683,15 @@ export async function runCodexAppServerAttempt(
notificationCleanup();
requestCleanup();
nativeHookRelay?.unregister();
await trajectoryRecorder?.flush();
await runAgentCleanupStep({
runId: params.runId,
sessionId: params.sessionId,
step: "codex-trajectory-flush-startup-failure",
log: embeddedAgentLog,
cleanup: async () => {
await trajectoryRecorder?.flush();
},
});
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
throw error;
}
@@ -886,7 +895,15 @@ export async function runCodexAppServerAttempt(
aborted: runAbortController.signal.aborted,
});
}
await trajectoryRecorder?.flush();
await runAgentCleanupStep({
runId: params.runId,
sessionId: params.sessionId,
step: "codex-trajectory-flush",
log: embeddedAgentLog,
cleanup: async () => {
await trajectoryRecorder?.flush();
},
});
userInputBridge?.cancelPending();
clearTimeout(timeout);
clearTurnCompletionIdleTimer();

View File

@@ -12,6 +12,7 @@ import { formatErrorMessage } from "../../infra/errors.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { resolveProviderAuthProfileId } from "../../plugins/provider-runtime.js";
import { enqueueCommandInLane } from "../../process/command-queue.js";
import type { CommandQueueEnqueueOptions } from "../../process/command-queue.types.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { sanitizeForLog } from "../../terminal/ansi.js";
import { resolveUserPath } from "../../utils.js";
@@ -76,6 +77,7 @@ import {
pickFallbackThinkingLevel,
} from "../pi-embedded-helpers.js";
import { resolveProviderIdForAuth } from "../provider-auth-aliases.js";
import { runAgentCleanupStep } from "../run-cleanup-timeout.js";
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
import { buildAgentRuntimePlan } from "../runtime-plan/build.js";
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
@@ -159,8 +161,26 @@ import { createUsageAccumulator, mergeUsageIntoAccumulator } from "./usage-accum
type ApiKeyInfo = ResolvedProviderAuth;
const MAX_SAME_MODEL_IDLE_TIMEOUT_RETRIES = 1;
const EMBEDDED_RUN_LANE_TIMEOUT_GRACE_MS = 30_000;
type EmbeddedRunAttemptForRunner = Awaited<ReturnType<typeof runEmbeddedAttemptWithBackend>>;
function resolveEmbeddedRunLaneTimeoutMs(timeoutMs: number): number | undefined {
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
return undefined;
}
return Math.floor(timeoutMs) + EMBEDDED_RUN_LANE_TIMEOUT_GRACE_MS;
}
function withEmbeddedRunLaneTimeout(
opts: CommandQueueEnqueueOptions | undefined,
laneTaskTimeoutMs: number | undefined,
): CommandQueueEnqueueOptions | undefined {
if (laneTaskTimeoutMs === undefined || opts?.taskTimeoutMs !== undefined) {
return opts;
}
return { ...opts, taskTimeoutMs: laneTaskTimeoutMs };
}
function normalizeEmbeddedRunAttemptResult(
attempt: EmbeddedRunAttemptForRunner,
): EmbeddedRunAttemptForRunner {
@@ -292,10 +312,15 @@ export async function runEmbeddedPiAgent(
}
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
const globalLane = resolveGlobalLane(params.lane);
const enqueueGlobal =
params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts));
const enqueueSession =
params.enqueue ?? ((task, opts) => enqueueCommandInLane(sessionLane, task, opts));
const laneTaskTimeoutMs = resolveEmbeddedRunLaneTimeoutMs(params.timeoutMs);
const withLaneTimeout = (opts?: CommandQueueEnqueueOptions) =>
withEmbeddedRunLaneTimeout(opts, laneTaskTimeoutMs);
const enqueueGlobal = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) =>
params.enqueue
? params.enqueue(task, withLaneTimeout(opts))
: enqueueCommandInLane(globalLane, task, withLaneTimeout(opts));
const enqueueSession = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) =>
params.enqueue ? params.enqueue(task, opts) : enqueueCommandInLane(sessionLane, task, opts);
const channelHint = params.messageChannel ?? params.messageProvider;
const resolvedToolResultFormat =
params.toolResultFormat ??
@@ -2489,26 +2514,42 @@ export async function runEmbeddedPiAgent(
}
} finally {
forgetPromptBuildDrainCacheForRun(params.runId);
await contextEngine.dispose?.();
stopRuntimeAuthRefreshTimer();
await runAgentCleanupStep({
runId: params.runId,
sessionId: params.sessionId,
step: "context-engine-dispose",
log,
cleanup: async () => {
await contextEngine.dispose?.();
},
});
if (params.cleanupBundleMcpOnRunEnd === true) {
const onError = (error: unknown, sessionId: string) => {
log.warn(
`bundle-mcp cleanup failed after run for ${sessionId}: ${formatErrorMessage(error)}`,
);
};
const retiredBySessionKey = await retireSessionMcpRuntimeForSessionKey({
sessionKey: params.sessionKey,
reason: "embedded-run-end",
onError,
await runAgentCleanupStep({
runId: params.runId,
sessionId: params.sessionId,
step: "bundle-mcp-retire",
log,
cleanup: async () => {
const onError = (error: unknown, sessionId: string) => {
log.warn(
`bundle-mcp cleanup failed after run for ${sessionId}: ${formatErrorMessage(error)}`,
);
};
const retiredBySessionKey = await retireSessionMcpRuntimeForSessionKey({
sessionKey: params.sessionKey,
reason: "embedded-run-end",
onError,
});
if (!retiredBySessionKey) {
await retireSessionMcpRuntime({
sessionId: params.sessionId,
reason: "embedded-run-end",
onError,
});
}
},
});
if (!retiredBySessionKey) {
await retireSessionMcpRuntime({
sessionId: params.sessionId,
reason: "embedded-run-end",
onError,
});
}
}
}
});

View File

@@ -119,6 +119,7 @@ import {
import { wrapStreamFnTextTransforms } from "../../plugin-text-transforms.js";
import { describeProviderRequestRoutingSummary } from "../../provider-attribution.js";
import { registerProviderStreamForModel } from "../../provider-stream.js";
import { runAgentCleanupStep } from "../../run-cleanup-timeout.js";
import { collectRuntimeChannelCapabilities } from "../../runtime-capabilities.js";
import {
logAgentRuntimeToolDiagnostics,
@@ -3299,7 +3300,15 @@ export async function runEmbeddedAttempt(
promptError: promptError ? formatErrorMessage(promptError) : undefined,
});
}
await trajectoryRecorder?.flush();
await runAgentCleanupStep({
runId: params.runId,
sessionId: params.sessionId,
step: "pi-trajectory-flush",
log,
cleanup: async () => {
await trajectoryRecorder?.flush();
},
});
// Always tear down the session (and release the lock) before we leave this attempt.
//
// BUGFIX: Wait for the agent to be truly idle before flushing pending tool results.

View File

@@ -0,0 +1,53 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { AGENT_CLEANUP_STEP_TIMEOUT_MS, runAgentCleanupStep } from "./run-cleanup-timeout.js";
describe("agent cleanup timeout", () => {
const log = {
warn: vi.fn(),
};
beforeEach(() => {
vi.useFakeTimers();
log.warn.mockClear();
});
afterEach(() => {
vi.useRealTimers();
});
it("returns after the cleanup timeout when a cleanup step stalls", async () => {
const cleanup = vi.fn(async () => new Promise<never>(() => {}));
const result = runAgentCleanupStep({
runId: "run-1",
sessionId: "session-1",
step: "bundle-mcp-retire",
cleanup,
log,
});
await vi.advanceTimersByTimeAsync(AGENT_CLEANUP_STEP_TIMEOUT_MS);
await expect(result).resolves.toBeUndefined();
expect(cleanup).toHaveBeenCalledTimes(1);
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("agent cleanup timed out:"));
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("step=bundle-mcp-retire"));
});
it("logs cleanup rejection without throwing", async () => {
await expect(
runAgentCleanupStep({
runId: "run-2",
sessionId: "session-2",
step: "context-engine-dispose",
cleanup: async () => {
throw new Error("dispose failed");
},
log,
}),
).resolves.toBeUndefined();
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("agent cleanup failed:"));
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("dispose failed"));
});
});

View File

@@ -0,0 +1,52 @@
import { formatErrorMessage } from "../infra/errors.js";
export const AGENT_CLEANUP_STEP_TIMEOUT_MS = 10_000;
export type AgentCleanupLogger = {
warn: (message: string) => void;
};
export async function runAgentCleanupStep(params: {
runId: string;
sessionId: string;
step: string;
cleanup: () => Promise<void>;
log: AgentCleanupLogger;
timeoutMs?: number;
}): Promise<void> {
const timeoutMs = Math.max(1, Math.floor(params.timeoutMs ?? AGENT_CLEANUP_STEP_TIMEOUT_MS));
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
let timedOut = false;
const cleanupPromise = Promise.resolve().then(params.cleanup);
const observedCleanupPromise = cleanupPromise.catch((error) => {
if (!timedOut) {
params.log.warn(
`agent cleanup failed: runId=${params.runId} sessionId=${params.sessionId} step=${params.step} error=${formatErrorMessage(error)}`,
);
}
});
const timeoutPromise = new Promise<"timeout">((resolve) => {
timeoutHandle = setTimeout(() => {
timedOut = true;
resolve("timeout");
}, timeoutMs);
timeoutHandle.unref?.();
});
const result = await Promise.race([
observedCleanupPromise.then(() => "done" as const),
timeoutPromise,
]);
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
if (result === "timeout") {
params.log.warn(
`agent cleanup timed out: runId=${params.runId} sessionId=${params.sessionId} step=${params.step} timeoutMs=${timeoutMs}`,
);
void cleanupPromise.catch((error) => {
params.log.warn(
`agent cleanup rejected after timeout: runId=${params.runId} sessionId=${params.sessionId} step=${params.step} error=${formatErrorMessage(error)}`,
);
});
}
}

View File

@@ -60,6 +60,7 @@ export { VERSION as OPENCLAW_VERSION } from "../version.js";
export { formatErrorMessage } from "../infra/errors.js";
export { formatApprovalDisplayPath } from "../infra/approval-display-paths.js";
export { emitAgentEvent, onAgentEvent, resetAgentEventsForTest } from "../infra/agent-events.js";
export { runAgentCleanupStep } from "../agents/run-cleanup-timeout.js";
export { log as embeddedAgentLog } from "../agents/pi-embedded-runner/logger.js";
export { buildAgentRuntimePlan } from "../agents/runtime-plan/build.js";
export { classifyEmbeddedPiRunResultForModelFallback } from "../agents/pi-embedded-runner/result-fallback-classifier.js";

View File

@@ -22,6 +22,7 @@ type CommandQueueModule = typeof import("./command-queue.js");
let clearCommandLane: CommandQueueModule["clearCommandLane"];
let CommandLaneClearedError: CommandQueueModule["CommandLaneClearedError"];
let CommandLaneTaskTimeoutError: CommandQueueModule["CommandLaneTaskTimeoutError"];
let enqueueCommand: CommandQueueModule["enqueueCommand"];
let enqueueCommandInLane: CommandQueueModule["enqueueCommandInLane"];
let GatewayDrainingError: CommandQueueModule["GatewayDrainingError"];
@@ -63,6 +64,7 @@ describe("command queue", () => {
({
clearCommandLane,
CommandLaneClearedError,
CommandLaneTaskTimeoutError,
enqueueCommand,
enqueueCommandInLane,
GatewayDrainingError,
@@ -326,6 +328,42 @@ describe("command queue", () => {
await expect(other).resolves.toBe("other");
});
it("task timeout releases a stuck lane and drains queued work", async () => {
const lane = `timeout-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 1);
vi.useFakeTimers();
try {
const first = enqueueCommandInLane(lane, async () => new Promise<never>(() => {}), {
taskTimeoutMs: 25,
});
const firstRejected = expect(first).rejects.toBeInstanceOf(CommandLaneTaskTimeoutError);
let secondRan = false;
const second = enqueueCommandInLane(lane, async () => {
secondRan = true;
return "second";
});
expect(secondRan).toBe(false);
expect(getCommandLaneSnapshot(lane)).toMatchObject({
activeCount: 1,
queuedCount: 1,
});
await vi.advanceTimersByTimeAsync(25);
await firstRejected;
await expect(second).resolves.toBe("second");
expect(secondRan).toBe(true);
expect(getCommandLaneSnapshot(lane)).toMatchObject({
activeCount: 0,
queuedCount: 0,
});
} finally {
vi.useRealTimers();
}
});
it("getCommandLaneSnapshot reports active and queued work for one lane", async () => {
const lane = `snapshot-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 1);

View File

@@ -4,6 +4,7 @@ import {
logLaneEnqueue,
} from "../logging/diagnostic-runtime.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
import type { CommandQueueEnqueueOptions } from "./command-queue.types.js";
import { CommandLane } from "./lanes.js";
/**
* Dedicated error type thrown when a queued command is rejected because
@@ -17,6 +18,18 @@ export class CommandLaneClearedError extends Error {
}
}
/**
* Dedicated error type thrown when an active command exceeds its caller-owned
* lane timeout. The underlying task may still be unwinding, but the lane is
* released so queued work is not blocked forever.
*/
export class CommandLaneTaskTimeoutError extends Error {
constructor(lane: string, timeoutMs: number) {
super(`Command lane "${lane}" task timed out after ${timeoutMs}ms`);
this.name = "CommandLaneTaskTimeoutError";
}
}
/**
* Dedicated error type thrown when a new command is rejected because the
* gateway is currently draining for restart.
@@ -39,6 +52,7 @@ type QueueEntry = {
reject: (reason?: unknown) => void;
enqueuedAt: number;
warnAfterMs: number;
taskTimeoutMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
};
@@ -172,6 +186,48 @@ function notifyActiveTaskWaiters(): void {
}
}
function normalizeTaskTimeoutMs(value: number | undefined): number | undefined {
if (value === undefined || !Number.isFinite(value) || value <= 0) {
return undefined;
}
return Math.max(1, Math.floor(value));
}
async function runQueueEntryTask(lane: string, entry: QueueEntry): Promise<unknown> {
const taskPromise = Promise.resolve().then(entry.task);
const taskTimeoutMs = normalizeTaskTimeoutMs(entry.taskTimeoutMs);
if (taskTimeoutMs === undefined) {
return await taskPromise;
}
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
let timedOut = false;
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutHandle = setTimeout(() => {
timedOut = true;
reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs));
}, taskTimeoutMs);
timeoutHandle.unref?.();
});
try {
return await Promise.race([taskPromise, timeoutPromise]);
} catch (err) {
if (timedOut) {
void taskPromise.catch((lateErr) => {
diag.warn(
`lane task rejected after timeout: lane=${lane} timeoutMs=${taskTimeoutMs} error="${String(lateErr)}"`,
);
});
}
throw err;
} finally {
if (!timedOut && timeoutHandle) {
clearTimeout(timeoutHandle);
}
}
}
function drainLane(lane: string) {
const state = getLaneState(lane);
if (state.draining) {
@@ -206,7 +262,7 @@ function drainLane(lane: string) {
void (async () => {
const startTime = Date.now();
try {
const result = await entry.task();
const result = await runQueueEntryTask(lane, entry);
const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
if (completedCurrentGeneration) {
notifyActiveTaskWaiters();
@@ -262,10 +318,7 @@ export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) {
export function enqueueCommandInLane<T>(
lane: string,
task: () => Promise<T>,
opts?: {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
},
opts?: CommandQueueEnqueueOptions,
): Promise<T> {
const queueState = getQueueState();
if (queueState.gatewayDraining) {
@@ -281,6 +334,7 @@ export function enqueueCommandInLane<T>(
reject,
enqueuedAt: Date.now(),
warnAfterMs,
taskTimeoutMs: normalizeTaskTimeoutMs(opts?.taskTimeoutMs),
onWait: opts?.onWait,
});
logLaneEnqueue(cleaned, getLaneDepth(state));
@@ -290,10 +344,7 @@ export function enqueueCommandInLane<T>(
export function enqueueCommand<T>(
task: () => Promise<T>,
opts?: {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
},
opts?: CommandQueueEnqueueOptions,
): Promise<T> {
return enqueueCommandInLane(CommandLane.Main, task, opts);
}

View File

@@ -1,7 +1,10 @@
export type CommandQueueEnqueueOptions = {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
taskTimeoutMs?: number;
};
export type CommandQueueEnqueueFn = <T>(
task: () => Promise<T>,
opts?: {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
},
opts?: CommandQueueEnqueueOptions,
) => Promise<T>;