From 84b72e66b918c04c286a5db84a5f4e103054cf23 Mon Sep 17 00:00:00 2001
From: Liu Yuan
Date: Fri, 27 Mar 2026 15:52:10 +0800
Subject: [PATCH] feat: add LLM idle timeout for streaming responses

Problem: When the LLM stops responding, the agent hangs for ~5 minutes
with no feedback. Users had to use /stop to recover.

Solution: Add idle timeout detection for LLM streaming responses.
---
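Reviewer note (this region, like the diffstat, is dropped by git am): a
minimal sketch of the resolution semantics implemented below.
resolveLlmIdleTimeoutMs and the agents.defaults.llm.idleTimeoutSeconds
key path are from this patch; the cfg() shorthand and relative import
paths are illustrative only.

    import type { OpenClawConfig } from "./src/config/config.js";
    import { resolveLlmIdleTimeoutMs } from "./src/agents/pi-embedded-runner/run/llm-idle-timeout.js";

    // Illustrative fixture builder (mirrors the casts used in the tests below).
    const cfg = (s?: number) =>
      ({ agents: { defaults: { llm: { idleTimeoutSeconds: s } } } }) as OpenClawConfig;

    resolveLlmIdleTimeoutMs(undefined); // 60_000 - default (60s)
    resolveLlmIdleTimeoutMs(cfg(0));    // 0      - disabled, never times out
    resolveLlmIdleTimeoutMs(cfg(30));   // 30_000 - seconds are converted to ms
    resolveLlmIdleTimeoutMs(cfg(-10));  // 60_000 - invalid values fall back to the default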
new Error("aborted", { cause: reason }) : new Error("aborted"); err.name = "AbortError"; return err; @@ -1187,6 +1206,9 @@ export async function runEmbeddedAttempt( abortCompaction(); void activeSession.abort(); }; + idleTimeoutTrigger = (error) => { + abortRun(true, error); + }; const abortable = (promise: Promise): Promise => { const signal = runAbortController.signal; if (signal.aborted) { diff --git a/src/agents/pi-embedded-runner/run/llm-idle-timeout.test.ts b/src/agents/pi-embedded-runner/run/llm-idle-timeout.test.ts new file mode 100644 index 00000000000..290e7d83de9 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/llm-idle-timeout.test.ts @@ -0,0 +1,219 @@ +import { describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../../config/config.js"; +import { + DEFAULT_LLM_IDLE_TIMEOUT_MS, + resolveLlmIdleTimeoutMs, + streamWithIdleTimeout, +} from "./llm-idle-timeout.js"; + +describe("resolveLlmIdleTimeoutMs", () => { + it("returns default when config is undefined", () => { + expect(resolveLlmIdleTimeoutMs(undefined)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS); + }); + + it("returns default when llm config is missing", () => { + const cfg = { agents: {} } as OpenClawConfig; + expect(resolveLlmIdleTimeoutMs(cfg)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS); + }); + + it("returns default when idleTimeoutSeconds is not set", () => { + const cfg = { agents: { defaults: { llm: {} } } } as OpenClawConfig; + expect(resolveLlmIdleTimeoutMs(cfg)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS); + }); + + it("returns 0 when idleTimeoutSeconds is 0 (disabled)", () => { + const cfg = { agents: { defaults: { llm: { idleTimeoutSeconds: 0 } } } } as OpenClawConfig; + expect(resolveLlmIdleTimeoutMs(cfg)).toBe(0); + }); + + it("returns configured value in milliseconds", () => { + const cfg = { agents: { defaults: { llm: { idleTimeoutSeconds: 30 } } } } as OpenClawConfig; + expect(resolveLlmIdleTimeoutMs(cfg)).toBe(30_000); + }); + + it("caps at max safe timeout", () => { + const cfg = { + agents: { defaults: { llm: { idleTimeoutSeconds: 10_000_000 } } }, + } as OpenClawConfig; + expect(resolveLlmIdleTimeoutMs(cfg)).toBe(2_147_000_000); + }); + + it("ignores negative values", () => { + const cfg = { agents: { defaults: { llm: { idleTimeoutSeconds: -10 } } } } as OpenClawConfig; + expect(resolveLlmIdleTimeoutMs(cfg)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS); + }); + + it("ignores non-finite values", () => { + const cfg = { + agents: { defaults: { llm: { idleTimeoutSeconds: Infinity } } }, + } as OpenClawConfig; + expect(resolveLlmIdleTimeoutMs(cfg)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS); + }); +}); + +describe("streamWithIdleTimeout", () => { + // Helper to create a mock async iterable + function createMockAsyncIterable(chunks: T[]): AsyncIterable { + return { + [Symbol.asyncIterator]() { + let index = 0; + return { + async next() { + if (index < chunks.length) { + return { done: false, value: chunks[index++] }; + } + return { done: true, value: undefined }; + }, + async return() { + return { done: true, value: undefined }; + }, + }; + }, + }; + } + + it("wraps stream function", () => { + const mockStream = createMockAsyncIterable([]); + const baseFn = vi.fn().mockReturnValue(mockStream); + const wrapped = streamWithIdleTimeout(baseFn, 1000); + expect(typeof wrapped).toBe("function"); + }); + + it("passes through model, context, and options", async () => { + const mockStream = createMockAsyncIterable([]); + const baseFn = vi.fn().mockReturnValue(mockStream); + const wrapped = streamWithIdleTimeout(baseFn, 1000); + 
+    const model = { api: "openai" } as Parameters<typeof wrapped>[0];
+    const context = {} as Parameters<typeof wrapped>[1];
+    const options = {} as Parameters<typeof wrapped>[2];
+
+    void wrapped(model, context, options);
+
+    expect(baseFn).toHaveBeenCalledWith(model, context, options);
+  });
+
+  it("throws on idle timeout", async () => {
+    // Create a stream that never yields
+    const slowStream: AsyncIterable<unknown> = {
+      [Symbol.asyncIterator]() {
+        return {
+          async next() {
+            // Never resolves - simulates hung LLM
+            return new Promise<IteratorResult<unknown>>(() => {});
+          },
+        };
+      },
+    };
+
+    const baseFn = vi.fn().mockReturnValue(slowStream);
+    const wrapped = streamWithIdleTimeout(baseFn, 50); // 50ms timeout
+
+    const model = {} as Parameters<typeof wrapped>[0];
+    const context = {} as Parameters<typeof wrapped>[1];
+    const options = {} as Parameters<typeof wrapped>[2];
+
+    const stream = wrapped(model, context, options) as AsyncIterable<unknown>;
+    const iterator = stream[Symbol.asyncIterator]();
+
+    await expect(iterator.next()).rejects.toThrow(/LLM idle timeout/);
+  });
+
+  it("resets timer on each chunk", async () => {
+    const chunks = [{ text: "a" }, { text: "b" }, { text: "c" }];
+    const mockStream = createMockAsyncIterable(chunks);
+    const baseFn = vi.fn().mockReturnValue(mockStream);
+    const wrapped = streamWithIdleTimeout(baseFn, 1000);
+
+    const model = {} as Parameters<typeof wrapped>[0];
+    const context = {} as Parameters<typeof wrapped>[1];
+    const options = {} as Parameters<typeof wrapped>[2];
+
+    const stream = wrapped(model, context, options) as AsyncIterable<unknown>;
+    const results: unknown[] = [];
+
+    for await (const chunk of stream) {
+      results.push(chunk);
+    }
+
+    expect(results).toHaveLength(3);
+    expect(results).toEqual(chunks);
+  });
+
+  it("handles stream with delays between chunks", async () => {
+    // Create a stream with small delays
+    const delayedStream: AsyncIterable<{ text: string }> = {
+      [Symbol.asyncIterator]() {
+        let count = 0;
+        return {
+          async next() {
+            if (count < 3) {
+              await new Promise((r) => setTimeout(r, 10)); // 10ms delay
+              return { done: false, value: { text: String(count++) } };
+            }
+            return { done: true, value: undefined };
+          },
+        };
+      },
+    };
+
+    const baseFn = vi.fn().mockReturnValue(delayedStream);
+    const wrapped = streamWithIdleTimeout(baseFn, 100); // 100ms timeout - should be enough
+
+    const model = {} as Parameters<typeof wrapped>[0];
+    const context = {} as Parameters<typeof wrapped>[1];
+    const options = {} as Parameters<typeof wrapped>[2];
+
+    const stream = wrapped(model, context, options) as AsyncIterable<{ text: string }>;
+    const results: { text: string }[] = [];
+
+    for await (const chunk of stream) {
+      results.push(chunk);
+    }
+
+    expect(results).toHaveLength(3);
+  });
+
+  it("aborts controller on idle timeout", async () => {
+    // Create a stream that never yields
+    const slowStream: AsyncIterable<unknown> = {
+      [Symbol.asyncIterator]() {
+        return {
+          async next() {
+            // Never resolves - simulates hung LLM
+            return new Promise<IteratorResult<unknown>>(() => {});
+          },
+        };
+      },
+    };
+
+    const baseFn = vi.fn().mockReturnValue(slowStream);
+    const controller = new AbortController();
+    const wrapped = streamWithIdleTimeout(baseFn, 50, controller); // 50ms timeout
+
+    const model = {} as Parameters<typeof wrapped>[0];
+    const context = {} as Parameters<typeof wrapped>[1];
+    const options = {} as Parameters<typeof wrapped>[2];
+
+    const stream = wrapped(model, context, options) as AsyncIterable<unknown>;
+    const iterator = stream[Symbol.asyncIterator]();
+
+    try {
+      await iterator.next();
+      // Should not reach here
+      expect.fail("Expected timeout error");
+    } catch (error) {
+      // Verify the error message is preserved
+      expect(error).toBeInstanceOf(Error);
+      expect((error as Error).message).toMatch(/LLM idle timeout/);
+
+      // Verify the controller was aborted
+      expect(controller.signal.aborted).toBe(true);
+
+      // Verify the abort reason is the same error
+      const reason = controller.signal.reason;
+      expect(reason).toBeInstanceOf(Error);
+      expect((reason as Error).message).toMatch(/LLM idle timeout/);
+    }
+  });
+});
diff --git a/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts b/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts
new file mode 100644
index 00000000000..b9fe1ec5267
--- /dev/null
+++ b/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts
@@ -0,0 +1,121 @@
+import type { StreamFn } from "@mariozechner/pi-agent-core";
+import { streamSimple } from "@mariozechner/pi-ai";
+import type { OpenClawConfig } from "../../../config/config.js";
+
+/**
+ * Default idle timeout for LLM streaming responses in milliseconds.
+ * If no token is received within this time, the request is aborted.
+ * Set to 0 to disable (never timeout).
+ * Default: 60 seconds.
+ */
+export const DEFAULT_LLM_IDLE_TIMEOUT_MS = 60_000;
+
+/**
+ * Maximum safe timeout value (approximately 24.8 days).
+ */
+const MAX_SAFE_TIMEOUT_MS = 2_147_000_000;
+
+/**
+ * Resolves the LLM idle timeout from configuration.
+ * @param cfg - OpenClaw configuration
+ * @returns Idle timeout in milliseconds, or 0 to disable
+ */
+export function resolveLlmIdleTimeoutMs(cfg?: OpenClawConfig): number {
+  const raw = cfg?.agents?.defaults?.llm?.idleTimeoutSeconds;
+  // 0 means disabled (no timeout)
+  if (raw === 0) {
+    return 0;
+  }
+  if (typeof raw === "number" && Number.isFinite(raw) && raw > 0) {
+    return Math.min(Math.floor(raw) * 1000, MAX_SAFE_TIMEOUT_MS);
+  }
+  return DEFAULT_LLM_IDLE_TIMEOUT_MS;
+}
+
+/**
+ * Wraps a stream function with idle timeout detection.
+ * If no token is received within the specified timeout, the request is aborted.
+ *
+ * @param baseFn - The base stream function to wrap
+ * @param timeoutMs - Idle timeout in milliseconds
+ * @param controller - Optional abort controller to abort on timeout
+ * @returns A wrapped stream function with idle timeout detection
+ */
+export function streamWithIdleTimeout(
+  baseFn: StreamFn,
+  timeoutMs: number,
+  controller?: AbortController,
+): StreamFn {
+  return (model, context, options) => {
+    const maybeStream = baseFn(model, context, options);
+
+    const wrapStream = (stream: ReturnType<typeof streamSimple>) => {
+      const originalAsyncIterator = stream[Symbol.asyncIterator].bind(stream);
+      (stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] =
+        function () {
+          const iterator = originalAsyncIterator();
+          let idleTimer: NodeJS.Timeout | null = null;
+
+          const createTimeoutPromise = (): Promise<never> => {
+            return new Promise<never>((_, reject) => {
+              idleTimer = setTimeout(() => {
+                const error = new Error(
+                  `LLM idle timeout (${Math.floor(timeoutMs / 1000)}s): no response from model`,
+                );
+                if (controller && !controller.signal.aborted) {
+                  controller.abort(error);
+                }
+                reject(error);
+              }, timeoutMs);
+            });
+          };
+
+          const clearTimer = () => {
+            if (idleTimer) {
+              clearTimeout(idleTimer);
+              idleTimer = null;
+            }
+          };
+
+          return {
+            async next() {
+              clearTimer();
+
+              try {
+                // Race between the actual next() and the timeout
+                const result = await Promise.race([iterator.next(), createTimeoutPromise()]);
+
+                if (result.done) {
+                  clearTimer();
+                  return result;
+                }
+
+                clearTimer();
+                return result;
+              } catch (error) {
+                clearTimer();
+                throw error;
+              }
+            },
+
+            return() {
+              clearTimer();
+              return iterator.return?.() ?? Promise.resolve({ done: true, value: undefined });
+            },
+
+            throw(error?: unknown) {
+              clearTimer();
+              return iterator.throw?.(error) ?? Promise.reject(error);
+            },
+          };
+        };
+
+      return stream;
+    };
+
+    if (maybeStream && typeof maybeStream === "object" && "then" in maybeStream) {
+      return Promise.resolve(maybeStream).then(wrapStream);
+    }
+    return wrapStream(maybeStream);
+  };
+}
diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts
index 5c7a694a382..3b6005c768f 100644
--- a/src/config/schema.base.generated.ts
+++ b/src/config/schema.base.generated.ts
@@ -2303,6 +2303,19 @@
         },
         additionalProperties: false,
       },
+      llm: {
+        type: "object",
+        properties: {
+          idleTimeoutSeconds: {
+            description:
+              "Idle timeout for LLM streaming responses in seconds. If no token is received within this time, the request is aborted. Set to 0 to disable. Default: 60 seconds.",
+            type: "integer",
+            minimum: 0,
+            maximum: 9007199254740991,
+          },
+        },
+        additionalProperties: false,
+      },
       compaction: {
         type: "object",
         properties: {
diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts
index ecaaecb69b9..6366cd064f3 100644
--- a/src/config/types.agent-defaults.ts
+++ b/src/config/types.agent-defaults.ts
@@ -171,6 +171,8 @@
   cliBackends?: Record<string, AgentCliBackendConfig>;
   /** Opt-in: prune old tool results from the LLM context to reduce token usage. */
   contextPruning?: AgentContextPruningConfig;
+  /** LLM timeout configuration. */
+  llm?: AgentLlmConfig;
   /** Compaction tuning and pre-compaction memory flush behavior. */
   compaction?: AgentCompactionConfig;
   /** Embedded Pi runner hardening and compatibility controls. */
@@ -365,3 +367,16 @@ export type AgentCompactionMemoryFlushConfig = {
   /** System prompt appended for the memory flush turn. */
   systemPrompt?: string;
 };
+
+/**
+ * LLM timeout configuration.
+ */
+export type AgentLlmConfig = {
+  /**
+   * Idle timeout for LLM streaming responses in seconds.
+   * If no token is received within this time, the request is aborted.
+   * Set to 0 to disable (never timeout).
+   * Default: 60 seconds.
+   */
+  idleTimeoutSeconds?: number;
+};
diff --git a/src/config/zod-schema.agent-defaults.ts b/src/config/zod-schema.agent-defaults.ts
index 836a1fdae91..abf8be615d4 100644
--- a/src/config/zod-schema.agent-defaults.ts
+++ b/src/config/zod-schema.agent-defaults.ts
@@ -85,6 +85,19 @@
       })
       .strict()
      .optional(),
+    llm: z
+      .object({
+        idleTimeoutSeconds: z
+          .number()
+          .int()
+          .nonnegative()
+          .optional()
+          .describe(
+            "Idle timeout for LLM streaming responses in seconds. If no token is received within this time, the request is aborted. Set to 0 to disable. Default: 60 seconds.",
+          ),
+      })
+      .strict()
+      .optional(),
     compaction: z
       .object({
        mode: z.union([z.literal("default"), z.literal("safeguard")]).optional(),
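Reviewer note (outside the diff, which is truncated above): a minimal
standalone sketch of how the timeout error surfaces through the abort path
in attempt.ts. The names and messages come from this patch; the wiring
below is illustrative only, not part of the commit.

    // streamWithIdleTimeout aborts the controller with the timeout Error as the
    // abort reason; makeAbortError then preserves that message instead of "aborted".
    const controller = new AbortController();
    controller.abort(new Error("LLM idle timeout (60s): no response from model"));

    const reason = controller.signal.reason;
    let err: Error;
    if (reason instanceof Error) {
      // Same branch shape as the new makeAbortError code above
      err = new Error(reason.message, { cause: reason });
    } else {
      err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted");
    }
    err.name = "AbortError";
    console.log(`${err.name}: ${err.message}`);
    // -> AbortError: LLM idle timeout (60s): no response from model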