diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.loop.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.loop.test.ts index 0e7e6870b84..c70bedde48e 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.loop.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.loop.test.ts @@ -550,6 +550,26 @@ describe("overflow compaction in run loop", () => { expect(result.payloads?.[0]?.text).toContain("timed out"); }); + it("returns a timeout payload instead of a partial assistant fragment after stream timeout", async () => { + mockedRunEmbeddedAttempt.mockResolvedValue( + makeAttemptResult({ + aborted: true, + timedOut: true, + timedOutDuringCompaction: false, + assistantTexts: ["# Current Tasks\n\nLast updated:"], + lastAssistant: undefined, + }), + ); + + const result = await runEmbeddedPiAgent(baseParams); + + expect(result.payloads?.[0]?.isError).toBe(true); + expect(result.payloads?.[0]?.text).toContain("timed out"); + expect( + result.payloads?.some((payload) => (payload.text ?? "").includes("# Current Tasks")), + ).toBe(false); + }); + it("sets promptTokens from the latest model call usage, not accumulated attempt usage", async () => { mockedRunEmbeddedAttempt.mockResolvedValue( makeAttemptResult({ diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index dcbec9ec8cc..dc7ce4b9f34 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -2069,6 +2069,17 @@ export async function runEmbeddedPiAgent( toolMediaUrls: attempt.toolMediaUrls, toolAudioAsVoice: attempt.toolAudioAsVoice, }); + const timedOutDuringPrompt = + timedOut && !timedOutDuringCompaction && !timedOutDuringToolExecution; + const hasPartialAssistantTextAfterPromptTimeout = + timedOutDuringPrompt && + (attempt.assistantTexts ?? []).some((text) => text.trim().length > 0) && + !attempt.clientToolCalls && + !attempt.yieldDetected && + !attempt.didSendViaMessagingTool && + !attempt.didSendDeterministicApprovalPrompt && + !attempt.lastToolError && + (attempt.toolMetas?.length ?? 0) === 0; const attemptToolSummary = buildTraceToolSummary({ toolMetas: attempt.toolMetas, hadFailure: Boolean(attempt.lastToolError), @@ -2078,14 +2089,11 @@ export async function runEmbeddedPiAgent( lastToolError: attempt.lastToolError, }); - // Timeout aborts can leave the run without any assistant payloads. - // Emit an explicit timeout error instead of silently completing, so - // callers do not lose the turn as an orphaned user message. + // Timeout aborts can leave the run without payloads or with only a + // partial assistant fragment. Emit an explicit timeout error instead. if ( - timedOut && - !timedOutDuringCompaction && - !timedOutDuringToolExecution && - !payloadsWithToolMedia?.length + timedOutDuringPrompt && + (!payloadsWithToolMedia?.length || hasPartialAssistantTextAfterPromptTimeout) ) { const timeoutText = idleTimedOut ? "The model did not produce a response before the model idle timeout. " + @@ -2094,7 +2102,7 @@ export async function runEmbeddedPiAgent( "Please try again, or increase `agents.defaults.timeoutSeconds` in your config."; const replayInvalid = resolveReplayInvalidForAttempt(null); const livenessState = resolveRunLivenessState({ - payloadCount: payloads.length, + payloadCount: hasPartialAssistantTextAfterPromptTimeout ? 0 : payloads.length, aborted, timedOut, attempt, diff --git a/src/agents/provider-transport-fetch.test.ts b/src/agents/provider-transport-fetch.test.ts index da1969bf698..38302047920 100644 --- a/src/agents/provider-transport-fetch.test.ts +++ b/src/agents/provider-transport-fetch.test.ts @@ -204,6 +204,46 @@ describe("buildGuardedModelFetch", () => { expect(items).toEqual([{ ok: true }]); }); + it("refreshes the guarded timeout while consuming streaming response chunks", async () => { + const encoder = new TextEncoder(); + const refreshTimeout = vi.fn(); + fetchWithSsrFGuardMock.mockResolvedValue({ + response: new Response( + new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode("event: message\n\n")); + controller.enqueue(encoder.encode('data: {"ok": true}\n\n')); + controller.close(); + }, + }), + { headers: { "content-type": "text/event-stream" } }, + ), + finalUrl: "https://api.openai.com/v1/chat/completions", + release: vi.fn(async () => undefined), + refreshTimeout, + }); + + const { buildGuardedModelFetch } = await import("./provider-transport-fetch.js"); + const model = { + id: "gpt-5.4", + provider: "openai", + api: "openai-completions", + baseUrl: "https://api.openai.com/v1", + } as unknown as Model<"openai-completions">; + + const response = await buildGuardedModelFetch(model)( + "https://api.openai.com/v1/chat/completions", + { method: "POST" }, + ); + const items = []; + for await (const item of Stream.fromSSEResponse(response, new AbortController())) { + items.push(item); + } + + expect(items).toEqual([{ ok: true }]); + expect(refreshTimeout).toHaveBeenCalledTimes(2); + }); + describe("long retry-after handling", () => { const anthropicModel = { id: "sonnet-4.6", diff --git a/src/agents/provider-transport-fetch.ts b/src/agents/provider-transport-fetch.ts index 4dfc1b44087..78d9a8b2e9f 100644 --- a/src/agents/provider-transport-fetch.ts +++ b/src/agents/provider-transport-fetch.ts @@ -172,7 +172,11 @@ function shouldBypassLongSdkRetry(response: Response): boolean { return status === 429; } -function buildManagedResponse(response: Response, release: () => Promise): Response { +function buildManagedResponse( + response: Response, + release: () => Promise, + refreshTimeout?: () => void, +): Response { if (!response.body) { void release(); return response; @@ -199,6 +203,7 @@ function buildManagedResponse(response: Response, release: () => Promise): await finalize(); return; } + refreshTimeout?.(); controller.enqueue(chunk.value); } catch (error) { controller.error(error); @@ -315,7 +320,7 @@ export function buildGuardedModelFetch(model: Model, timeoutMs?: number): t headers, }); } - response = sanitizeOpenAISdkSseResponse(response); - return buildManagedResponse(response, result.release); + response = buildManagedResponse(response, result.release, result.refreshTimeout); + return sanitizeOpenAISdkSseResponse(response); }; } diff --git a/src/infra/net/fetch-guard.ts b/src/infra/net/fetch-guard.ts index 9851b62d18b..9acea3c5092 100644 --- a/src/infra/net/fetch-guard.ts +++ b/src/infra/net/fetch-guard.ts @@ -86,6 +86,7 @@ export type GuardedFetchResult = { response: Response; finalUrl: string; release: () => Promise; + refreshTimeout?: () => void; }; type GuardedFetchPresetOptions = Omit< @@ -315,7 +316,7 @@ export async function fetchWithSsrFGuard(params: GuardedFetchOptions): Promise release(dispatcher), + refreshTimeout: refresh, }; } catch (err) { if (err instanceof SsrFBlockedError) { diff --git a/src/utils/fetch-timeout.test.ts b/src/utils/fetch-timeout.test.ts index 179d46ae87e..4a295c44205 100644 --- a/src/utils/fetch-timeout.test.ts +++ b/src/utils/fetch-timeout.test.ts @@ -105,4 +105,25 @@ describe("buildTimeoutAbortSignal", () => { cleanup(); }); + + it("refreshes its timeout when progress is observed", async () => { + const { signal, refresh, cleanup } = buildTimeoutAbortSignal({ + timeoutMs: 25, + operation: "unit-test", + }); + + await vi.advanceTimersByTimeAsync(20); + refresh(); + await vi.advanceTimersByTimeAsync(24); + + expect(signal?.aborted).toBe(false); + expect(warn).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + + expect(signal?.aborted).toBe(true); + expect(warn).toHaveBeenCalledTimes(1); + + cleanup(); + }); }); diff --git a/src/utils/fetch-timeout.ts b/src/utils/fetch-timeout.ts index fa43dcd7a8e..bd1b7c2ffcd 100644 --- a/src/utils/fetch-timeout.ts +++ b/src/utils/fetch-timeout.ts @@ -93,26 +93,32 @@ function abortDueToTimeout( export function buildTimeoutAbortSignal(params: TimeoutAbortSignalParams): { signal?: AbortSignal; cleanup: () => void; + refresh: () => void; } { const { timeoutMs, signal } = params; if (!timeoutMs && !signal) { - return { signal: undefined, cleanup: () => {} }; + return { signal: undefined, cleanup: () => {}, refresh: () => {} }; } if (!timeoutMs) { - return { signal, cleanup: () => {} }; + return { signal, cleanup: () => {}, refresh: () => {} }; } const controller = new AbortController(); const normalizedTimeoutMs = Math.max(1, Math.floor(timeoutMs)); - const timeoutId = setTimeout( - abortDueToTimeout, - normalizedTimeoutMs, - controller, - normalizedTimeoutMs, - Date.now(), - params.operation, - params.url, - ); + let active = true; + let timeoutId: ReturnType | undefined; + const scheduleTimeout = () => { + timeoutId = setTimeout( + abortDueToTimeout, + normalizedTimeoutMs, + controller, + normalizedTimeoutMs, + Date.now(), + params.operation, + params.url, + ); + }; + scheduleTimeout(); const onAbort = bindAbortRelay(controller); if (signal) { if (signal.aborted) { @@ -124,8 +130,20 @@ export function buildTimeoutAbortSignal(params: TimeoutAbortSignalParams): { return { signal: controller.signal, + refresh: () => { + if (!active || controller.signal.aborted) { + return; + } + if (timeoutId) { + clearTimeout(timeoutId); + } + scheduleTimeout(); + }, cleanup: () => { - clearTimeout(timeoutId); + active = false; + if (timeoutId) { + clearTimeout(timeoutId); + } if (signal) { signal.removeEventListener("abort", onAbort); }