From 470098bd26f3a12d88d50b2d9ffc0219b3a69643 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 29 Apr 2026 21:37:12 +0100 Subject: [PATCH] fix: keep embedded run lanes from wedging --- CHANGELOG.md | 1 + .../codex/src/app-server/run-attempt.ts | 21 ++++- src/agents/pi-embedded-runner/run.ts | 83 ++++++++++++++----- src/agents/pi-embedded-runner/run/attempt.ts | 11 ++- src/agents/run-cleanup-timeout.test.ts | 53 ++++++++++++ src/agents/run-cleanup-timeout.ts | 52 ++++++++++++ src/plugin-sdk/agent-harness-runtime.ts | 1 + src/process/command-queue.test.ts | 38 +++++++++ src/process/command-queue.ts | 69 +++++++++++++-- src/process/command-queue.types.ts | 11 ++- 10 files changed, 303 insertions(+), 37 deletions(-) create mode 100644 src/agents/run-cleanup-timeout.test.ts create mode 100644 src/agents/run-cleanup-timeout.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c137525d66d..fc1d66bb0d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/Codex: bound embedded-run cleanup, trajectory flushing, and command-lane task timeouts after runtime failures, so Discord and other chat sessions return to idle instead of staying stuck in processing. Thanks @vincentkoc. - Web fetch: add a documented `tools.web.fetch.ssrfPolicy.allowIpv6UniqueLocalRange` opt-in and thread it through cache keys and DNS/IP checks so trusted fake-IP proxy stacks using `fc00::/7` can work without broad private-network access. Fixes #74351. Thanks @jeffrey701. - OpenAI Codex: restore `/verbose full` persistence and app-server tool-output forwarding, and retry Gateway E2E temp-home cleanup so debug runs do not regress on stale validation or cleanup flakes. Thanks @vincentkoc. - Anthropic/Meridian: preserve text and thinking content seeded on `content_block_start` in anthropic-messages streams, so `[thinking, text]` replies no longer persist as empty turns or trigger empty-response fallbacks. Fixes #74410. Thanks @vyctorbrzezowski. diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 536878381e7..ce0d0f895f1 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -28,6 +28,7 @@ import { registerNativeHookRelay, setActiveEmbeddedRun, supportsModelTools, + runAgentCleanupStep, type AgentMessage, type EmbeddedRunAttemptParams, type EmbeddedRunAttemptResult, @@ -682,7 +683,15 @@ export async function runCodexAppServerAttempt( notificationCleanup(); requestCleanup(); nativeHookRelay?.unregister(); - await trajectoryRecorder?.flush(); + await runAgentCleanupStep({ + runId: params.runId, + sessionId: params.sessionId, + step: "codex-trajectory-flush-startup-failure", + log: embeddedAgentLog, + cleanup: async () => { + await trajectoryRecorder?.flush(); + }, + }); params.abortSignal?.removeEventListener("abort", abortFromUpstream); throw error; } @@ -886,7 +895,15 @@ export async function runCodexAppServerAttempt( aborted: runAbortController.signal.aborted, }); } - await trajectoryRecorder?.flush(); + await runAgentCleanupStep({ + runId: params.runId, + sessionId: params.sessionId, + step: "codex-trajectory-flush", + log: embeddedAgentLog, + cleanup: async () => { + await trajectoryRecorder?.flush(); + }, + }); userInputBridge?.cancelPending(); clearTimeout(timeout); clearTurnCompletionIdleTimer(); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 04c869a697a..80f67f93cb7 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -12,6 +12,7 @@ import { formatErrorMessage } from "../../infra/errors.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { resolveProviderAuthProfileId } from "../../plugins/provider-runtime.js"; import { enqueueCommandInLane } from "../../process/command-queue.js"; +import type { CommandQueueEnqueueOptions } from "../../process/command-queue.types.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { sanitizeForLog } from "../../terminal/ansi.js"; import { resolveUserPath } from "../../utils.js"; @@ -76,6 +77,7 @@ import { pickFallbackThinkingLevel, } from "../pi-embedded-helpers.js"; import { resolveProviderIdForAuth } from "../provider-auth-aliases.js"; +import { runAgentCleanupStep } from "../run-cleanup-timeout.js"; import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js"; import { buildAgentRuntimePlan } from "../runtime-plan/build.js"; import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js"; @@ -159,8 +161,26 @@ import { createUsageAccumulator, mergeUsageIntoAccumulator } from "./usage-accum type ApiKeyInfo = ResolvedProviderAuth; const MAX_SAME_MODEL_IDLE_TIMEOUT_RETRIES = 1; +const EMBEDDED_RUN_LANE_TIMEOUT_GRACE_MS = 30_000; type EmbeddedRunAttemptForRunner = Awaited>; +function resolveEmbeddedRunLaneTimeoutMs(timeoutMs: number): number | undefined { + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + return undefined; + } + return Math.floor(timeoutMs) + EMBEDDED_RUN_LANE_TIMEOUT_GRACE_MS; +} + +function withEmbeddedRunLaneTimeout( + opts: CommandQueueEnqueueOptions | undefined, + laneTaskTimeoutMs: number | undefined, +): CommandQueueEnqueueOptions | undefined { + if (laneTaskTimeoutMs === undefined || opts?.taskTimeoutMs !== undefined) { + return opts; + } + return { ...opts, taskTimeoutMs: laneTaskTimeoutMs }; +} + function normalizeEmbeddedRunAttemptResult( attempt: EmbeddedRunAttemptForRunner, ): EmbeddedRunAttemptForRunner { @@ -292,10 +312,15 @@ export async function runEmbeddedPiAgent( } const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId); const globalLane = resolveGlobalLane(params.lane); - const enqueueGlobal = - params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts)); - const enqueueSession = - params.enqueue ?? ((task, opts) => enqueueCommandInLane(sessionLane, task, opts)); + const laneTaskTimeoutMs = resolveEmbeddedRunLaneTimeoutMs(params.timeoutMs); + const withLaneTimeout = (opts?: CommandQueueEnqueueOptions) => + withEmbeddedRunLaneTimeout(opts, laneTaskTimeoutMs); + const enqueueGlobal = (task: () => Promise, opts?: CommandQueueEnqueueOptions) => + params.enqueue + ? params.enqueue(task, withLaneTimeout(opts)) + : enqueueCommandInLane(globalLane, task, withLaneTimeout(opts)); + const enqueueSession = (task: () => Promise, opts?: CommandQueueEnqueueOptions) => + params.enqueue ? params.enqueue(task, opts) : enqueueCommandInLane(sessionLane, task, opts); const channelHint = params.messageChannel ?? params.messageProvider; const resolvedToolResultFormat = params.toolResultFormat ?? @@ -2489,26 +2514,42 @@ export async function runEmbeddedPiAgent( } } finally { forgetPromptBuildDrainCacheForRun(params.runId); - await contextEngine.dispose?.(); stopRuntimeAuthRefreshTimer(); + await runAgentCleanupStep({ + runId: params.runId, + sessionId: params.sessionId, + step: "context-engine-dispose", + log, + cleanup: async () => { + await contextEngine.dispose?.(); + }, + }); if (params.cleanupBundleMcpOnRunEnd === true) { - const onError = (error: unknown, sessionId: string) => { - log.warn( - `bundle-mcp cleanup failed after run for ${sessionId}: ${formatErrorMessage(error)}`, - ); - }; - const retiredBySessionKey = await retireSessionMcpRuntimeForSessionKey({ - sessionKey: params.sessionKey, - reason: "embedded-run-end", - onError, + await runAgentCleanupStep({ + runId: params.runId, + sessionId: params.sessionId, + step: "bundle-mcp-retire", + log, + cleanup: async () => { + const onError = (error: unknown, sessionId: string) => { + log.warn( + `bundle-mcp cleanup failed after run for ${sessionId}: ${formatErrorMessage(error)}`, + ); + }; + const retiredBySessionKey = await retireSessionMcpRuntimeForSessionKey({ + sessionKey: params.sessionKey, + reason: "embedded-run-end", + onError, + }); + if (!retiredBySessionKey) { + await retireSessionMcpRuntime({ + sessionId: params.sessionId, + reason: "embedded-run-end", + onError, + }); + } + }, }); - if (!retiredBySessionKey) { - await retireSessionMcpRuntime({ - sessionId: params.sessionId, - reason: "embedded-run-end", - onError, - }); - } } } }); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 545aa1c5acb..6fcfeec8cd7 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -119,6 +119,7 @@ import { import { wrapStreamFnTextTransforms } from "../../plugin-text-transforms.js"; import { describeProviderRequestRoutingSummary } from "../../provider-attribution.js"; import { registerProviderStreamForModel } from "../../provider-stream.js"; +import { runAgentCleanupStep } from "../../run-cleanup-timeout.js"; import { collectRuntimeChannelCapabilities } from "../../runtime-capabilities.js"; import { logAgentRuntimeToolDiagnostics, @@ -3299,7 +3300,15 @@ export async function runEmbeddedAttempt( promptError: promptError ? formatErrorMessage(promptError) : undefined, }); } - await trajectoryRecorder?.flush(); + await runAgentCleanupStep({ + runId: params.runId, + sessionId: params.sessionId, + step: "pi-trajectory-flush", + log, + cleanup: async () => { + await trajectoryRecorder?.flush(); + }, + }); // Always tear down the session (and release the lock) before we leave this attempt. // // BUGFIX: Wait for the agent to be truly idle before flushing pending tool results. diff --git a/src/agents/run-cleanup-timeout.test.ts b/src/agents/run-cleanup-timeout.test.ts new file mode 100644 index 00000000000..f9080f71ae6 --- /dev/null +++ b/src/agents/run-cleanup-timeout.test.ts @@ -0,0 +1,53 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { AGENT_CLEANUP_STEP_TIMEOUT_MS, runAgentCleanupStep } from "./run-cleanup-timeout.js"; + +describe("agent cleanup timeout", () => { + const log = { + warn: vi.fn(), + }; + + beforeEach(() => { + vi.useFakeTimers(); + log.warn.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("returns after the cleanup timeout when a cleanup step stalls", async () => { + const cleanup = vi.fn(async () => new Promise(() => {})); + + const result = runAgentCleanupStep({ + runId: "run-1", + sessionId: "session-1", + step: "bundle-mcp-retire", + cleanup, + log, + }); + + await vi.advanceTimersByTimeAsync(AGENT_CLEANUP_STEP_TIMEOUT_MS); + await expect(result).resolves.toBeUndefined(); + + expect(cleanup).toHaveBeenCalledTimes(1); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("agent cleanup timed out:")); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("step=bundle-mcp-retire")); + }); + + it("logs cleanup rejection without throwing", async () => { + await expect( + runAgentCleanupStep({ + runId: "run-2", + sessionId: "session-2", + step: "context-engine-dispose", + cleanup: async () => { + throw new Error("dispose failed"); + }, + log, + }), + ).resolves.toBeUndefined(); + + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("agent cleanup failed:")); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("dispose failed")); + }); +}); diff --git a/src/agents/run-cleanup-timeout.ts b/src/agents/run-cleanup-timeout.ts new file mode 100644 index 00000000000..37b4797b0a5 --- /dev/null +++ b/src/agents/run-cleanup-timeout.ts @@ -0,0 +1,52 @@ +import { formatErrorMessage } from "../infra/errors.js"; + +export const AGENT_CLEANUP_STEP_TIMEOUT_MS = 10_000; + +export type AgentCleanupLogger = { + warn: (message: string) => void; +}; + +export async function runAgentCleanupStep(params: { + runId: string; + sessionId: string; + step: string; + cleanup: () => Promise; + log: AgentCleanupLogger; + timeoutMs?: number; +}): Promise { + const timeoutMs = Math.max(1, Math.floor(params.timeoutMs ?? AGENT_CLEANUP_STEP_TIMEOUT_MS)); + let timeoutHandle: ReturnType | undefined; + let timedOut = false; + const cleanupPromise = Promise.resolve().then(params.cleanup); + const observedCleanupPromise = cleanupPromise.catch((error) => { + if (!timedOut) { + params.log.warn( + `agent cleanup failed: runId=${params.runId} sessionId=${params.sessionId} step=${params.step} error=${formatErrorMessage(error)}`, + ); + } + }); + const timeoutPromise = new Promise<"timeout">((resolve) => { + timeoutHandle = setTimeout(() => { + timedOut = true; + resolve("timeout"); + }, timeoutMs); + timeoutHandle.unref?.(); + }); + const result = await Promise.race([ + observedCleanupPromise.then(() => "done" as const), + timeoutPromise, + ]); + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + if (result === "timeout") { + params.log.warn( + `agent cleanup timed out: runId=${params.runId} sessionId=${params.sessionId} step=${params.step} timeoutMs=${timeoutMs}`, + ); + void cleanupPromise.catch((error) => { + params.log.warn( + `agent cleanup rejected after timeout: runId=${params.runId} sessionId=${params.sessionId} step=${params.step} error=${formatErrorMessage(error)}`, + ); + }); + } +} diff --git a/src/plugin-sdk/agent-harness-runtime.ts b/src/plugin-sdk/agent-harness-runtime.ts index 9c01c1c251f..8202ebd73a7 100644 --- a/src/plugin-sdk/agent-harness-runtime.ts +++ b/src/plugin-sdk/agent-harness-runtime.ts @@ -60,6 +60,7 @@ export { VERSION as OPENCLAW_VERSION } from "../version.js"; export { formatErrorMessage } from "../infra/errors.js"; export { formatApprovalDisplayPath } from "../infra/approval-display-paths.js"; export { emitAgentEvent, onAgentEvent, resetAgentEventsForTest } from "../infra/agent-events.js"; +export { runAgentCleanupStep } from "../agents/run-cleanup-timeout.js"; export { log as embeddedAgentLog } from "../agents/pi-embedded-runner/logger.js"; export { buildAgentRuntimePlan } from "../agents/runtime-plan/build.js"; export { classifyEmbeddedPiRunResultForModelFallback } from "../agents/pi-embedded-runner/result-fallback-classifier.js"; diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index 149d12cee16..ca55e430c80 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -22,6 +22,7 @@ type CommandQueueModule = typeof import("./command-queue.js"); let clearCommandLane: CommandQueueModule["clearCommandLane"]; let CommandLaneClearedError: CommandQueueModule["CommandLaneClearedError"]; +let CommandLaneTaskTimeoutError: CommandQueueModule["CommandLaneTaskTimeoutError"]; let enqueueCommand: CommandQueueModule["enqueueCommand"]; let enqueueCommandInLane: CommandQueueModule["enqueueCommandInLane"]; let GatewayDrainingError: CommandQueueModule["GatewayDrainingError"]; @@ -63,6 +64,7 @@ describe("command queue", () => { ({ clearCommandLane, CommandLaneClearedError, + CommandLaneTaskTimeoutError, enqueueCommand, enqueueCommandInLane, GatewayDrainingError, @@ -326,6 +328,42 @@ describe("command queue", () => { await expect(other).resolves.toBe("other"); }); + it("task timeout releases a stuck lane and drains queued work", async () => { + const lane = `timeout-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(lane, 1); + + vi.useFakeTimers(); + try { + const first = enqueueCommandInLane(lane, async () => new Promise(() => {}), { + taskTimeoutMs: 25, + }); + const firstRejected = expect(first).rejects.toBeInstanceOf(CommandLaneTaskTimeoutError); + let secondRan = false; + const second = enqueueCommandInLane(lane, async () => { + secondRan = true; + return "second"; + }); + + expect(secondRan).toBe(false); + expect(getCommandLaneSnapshot(lane)).toMatchObject({ + activeCount: 1, + queuedCount: 1, + }); + + await vi.advanceTimersByTimeAsync(25); + + await firstRejected; + await expect(second).resolves.toBe("second"); + expect(secondRan).toBe(true); + expect(getCommandLaneSnapshot(lane)).toMatchObject({ + activeCount: 0, + queuedCount: 0, + }); + } finally { + vi.useRealTimers(); + } + }); + it("getCommandLaneSnapshot reports active and queued work for one lane", async () => { const lane = `snapshot-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`; setCommandLaneConcurrency(lane, 1); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index c640f36b939..da9ef7d2a0a 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -4,6 +4,7 @@ import { logLaneEnqueue, } from "../logging/diagnostic-runtime.js"; import { resolveGlobalSingleton } from "../shared/global-singleton.js"; +import type { CommandQueueEnqueueOptions } from "./command-queue.types.js"; import { CommandLane } from "./lanes.js"; /** * Dedicated error type thrown when a queued command is rejected because @@ -17,6 +18,18 @@ export class CommandLaneClearedError extends Error { } } +/** + * Dedicated error type thrown when an active command exceeds its caller-owned + * lane timeout. The underlying task may still be unwinding, but the lane is + * released so queued work is not blocked forever. + */ +export class CommandLaneTaskTimeoutError extends Error { + constructor(lane: string, timeoutMs: number) { + super(`Command lane "${lane}" task timed out after ${timeoutMs}ms`); + this.name = "CommandLaneTaskTimeoutError"; + } +} + /** * Dedicated error type thrown when a new command is rejected because the * gateway is currently draining for restart. @@ -39,6 +52,7 @@ type QueueEntry = { reject: (reason?: unknown) => void; enqueuedAt: number; warnAfterMs: number; + taskTimeoutMs?: number; onWait?: (waitMs: number, queuedAhead: number) => void; }; @@ -172,6 +186,48 @@ function notifyActiveTaskWaiters(): void { } } +function normalizeTaskTimeoutMs(value: number | undefined): number | undefined { + if (value === undefined || !Number.isFinite(value) || value <= 0) { + return undefined; + } + return Math.max(1, Math.floor(value)); +} + +async function runQueueEntryTask(lane: string, entry: QueueEntry): Promise { + const taskPromise = Promise.resolve().then(entry.task); + const taskTimeoutMs = normalizeTaskTimeoutMs(entry.taskTimeoutMs); + if (taskTimeoutMs === undefined) { + return await taskPromise; + } + + let timeoutHandle: ReturnType | undefined; + let timedOut = false; + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + timedOut = true; + reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs)); + }, taskTimeoutMs); + timeoutHandle.unref?.(); + }); + + try { + return await Promise.race([taskPromise, timeoutPromise]); + } catch (err) { + if (timedOut) { + void taskPromise.catch((lateErr) => { + diag.warn( + `lane task rejected after timeout: lane=${lane} timeoutMs=${taskTimeoutMs} error="${String(lateErr)}"`, + ); + }); + } + throw err; + } finally { + if (!timedOut && timeoutHandle) { + clearTimeout(timeoutHandle); + } + } +} + function drainLane(lane: string) { const state = getLaneState(lane); if (state.draining) { @@ -206,7 +262,7 @@ function drainLane(lane: string) { void (async () => { const startTime = Date.now(); try { - const result = await entry.task(); + const result = await runQueueEntryTask(lane, entry); const completedCurrentGeneration = completeTask(state, taskId, taskGeneration); if (completedCurrentGeneration) { notifyActiveTaskWaiters(); @@ -262,10 +318,7 @@ export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) { export function enqueueCommandInLane( lane: string, task: () => Promise, - opts?: { - warnAfterMs?: number; - onWait?: (waitMs: number, queuedAhead: number) => void; - }, + opts?: CommandQueueEnqueueOptions, ): Promise { const queueState = getQueueState(); if (queueState.gatewayDraining) { @@ -281,6 +334,7 @@ export function enqueueCommandInLane( reject, enqueuedAt: Date.now(), warnAfterMs, + taskTimeoutMs: normalizeTaskTimeoutMs(opts?.taskTimeoutMs), onWait: opts?.onWait, }); logLaneEnqueue(cleaned, getLaneDepth(state)); @@ -290,10 +344,7 @@ export function enqueueCommandInLane( export function enqueueCommand( task: () => Promise, - opts?: { - warnAfterMs?: number; - onWait?: (waitMs: number, queuedAhead: number) => void; - }, + opts?: CommandQueueEnqueueOptions, ): Promise { return enqueueCommandInLane(CommandLane.Main, task, opts); } diff --git a/src/process/command-queue.types.ts b/src/process/command-queue.types.ts index 5e600554e02..2e6e6d450c9 100644 --- a/src/process/command-queue.types.ts +++ b/src/process/command-queue.types.ts @@ -1,7 +1,10 @@ +export type CommandQueueEnqueueOptions = { + warnAfterMs?: number; + onWait?: (waitMs: number, queuedAhead: number) => void; + taskTimeoutMs?: number; +}; + export type CommandQueueEnqueueFn = ( task: () => Promise, - opts?: { - warnAfterMs?: number; - onWait?: (waitMs: number, queuedAhead: number) => void; - }, + opts?: CommandQueueEnqueueOptions, ) => Promise;