diff --git a/src/agents/pi-embedded-runner/run/llm-idle-timeout.test.ts b/src/agents/pi-embedded-runner/run/llm-idle-timeout.test.ts index 64c67f55ad8..b21b4d1e6c0 100644 --- a/src/agents/pi-embedded-runner/run/llm-idle-timeout.test.ts +++ b/src/agents/pi-embedded-runner/run/llm-idle-timeout.test.ts @@ -277,19 +277,37 @@ describe("streamWithIdleTimeout", () => { const baseFn = vi.fn().mockReturnValue(mockStream); const wrapped = streamWithIdleTimeout(baseFn, 1000); - const model = { api: "openai" } as Parameters[0]; + const model = { api: "openai", requestTimeoutMs: 5000 } as Parameters[0]; const context = {} as Parameters[1]; const options = {} as Parameters[2]; void wrapped(model, context, options); expect(baseFn).toHaveBeenCalledWith( - model, + expect.objectContaining({ api: "openai", requestTimeoutMs: 1000 }), context, expect.objectContaining({ signal: expect.any(AbortSignal) }), ); }); + it("keeps model request timeouts that are shorter than the idle watchdog", () => { + const mockStream = createMockAsyncIterable([]); + const baseFn = vi.fn().mockReturnValue(mockStream); + const wrapped = streamWithIdleTimeout(baseFn, 1000); + + const model = { requestTimeoutMs: 250 } as Parameters[0]; + const context = {} as Parameters[1]; + const options = {} as Parameters[2]; + + void wrapped(model, context, options); + + expect(baseFn).toHaveBeenCalledWith( + expect.objectContaining({ requestTimeoutMs: 250 }), + context, + expect.any(Object), + ); + }); + it("throws on idle timeout", async () => { vi.useFakeTimers(); const slowStream = createNeverYieldingStream(); diff --git a/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts b/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts index 7fae4d00c29..49f82c929bd 100644 --- a/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts +++ b/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts @@ -225,9 +225,22 @@ export function streamWithIdleTimeout( sourceSignal?.removeEventListener("abort", abortFromSourceSignal); }; const wrappedOptions = { - ...(options ?? {}), + ...options, signal: streamAbortController.signal, } as typeof options; + const existingRequestTimeoutMs = + typeof (model as { requestTimeoutMs?: unknown })?.requestTimeoutMs === "number" && + Number.isFinite((model as { requestTimeoutMs?: number }).requestTimeoutMs) && + (model as { requestTimeoutMs?: number }).requestTimeoutMs! > 0 + ? Math.floor((model as { requestTimeoutMs?: number }).requestTimeoutMs!) + : timeoutMs; + const wrappedModel = + typeof model === "object" && model !== null + ? ({ + ...model, + requestTimeoutMs: Math.min(existingRequestTimeoutMs, timeoutMs), + } as typeof model) + : model; const createTimeoutPromise = (setTimer: (timer: NodeJS.Timeout) => void): Promise => { return new Promise((_, reject) => { @@ -244,7 +257,7 @@ export function streamWithIdleTimeout( let maybeStream: ReturnType; try { - maybeStream = baseFn(model, context, wrappedOptions); + maybeStream = baseFn(wrappedModel, context, wrappedOptions); } catch (error) { cleanupSourceSignal(); throw error;