fix agents stream timeout heartbeat

This commit is contained in:
mkdev11
2026-05-03 12:58:36 +02:00
committed by Peter Steinberger
parent 4760ee4055
commit ffd3dfa4f5
7 changed files with 138 additions and 24 deletions

View File

@@ -550,6 +550,26 @@ describe("overflow compaction in run loop", () => {
expect(result.payloads?.[0]?.text).toContain("timed out");
});
it("returns a timeout payload instead of a partial assistant fragment after stream timeout", async () => {
mockedRunEmbeddedAttempt.mockResolvedValue(
makeAttemptResult({
aborted: true,
timedOut: true,
timedOutDuringCompaction: false,
assistantTexts: ["# Current Tasks\n\nLast updated:"],
lastAssistant: undefined,
}),
);
const result = await runEmbeddedPiAgent(baseParams);
expect(result.payloads?.[0]?.isError).toBe(true);
expect(result.payloads?.[0]?.text).toContain("timed out");
expect(
result.payloads?.some((payload) => (payload.text ?? "").includes("# Current Tasks")),
).toBe(false);
});
it("sets promptTokens from the latest model call usage, not accumulated attempt usage", async () => {
mockedRunEmbeddedAttempt.mockResolvedValue(
makeAttemptResult({

View File

@@ -2069,6 +2069,17 @@ export async function runEmbeddedPiAgent(
toolMediaUrls: attempt.toolMediaUrls,
toolAudioAsVoice: attempt.toolAudioAsVoice,
});
const timedOutDuringPrompt =
timedOut && !timedOutDuringCompaction && !timedOutDuringToolExecution;
const hasPartialAssistantTextAfterPromptTimeout =
timedOutDuringPrompt &&
(attempt.assistantTexts ?? []).some((text) => text.trim().length > 0) &&
!attempt.clientToolCalls &&
!attempt.yieldDetected &&
!attempt.didSendViaMessagingTool &&
!attempt.didSendDeterministicApprovalPrompt &&
!attempt.lastToolError &&
(attempt.toolMetas?.length ?? 0) === 0;
const attemptToolSummary = buildTraceToolSummary({
toolMetas: attempt.toolMetas,
hadFailure: Boolean(attempt.lastToolError),
@@ -2078,14 +2089,11 @@ export async function runEmbeddedPiAgent(
lastToolError: attempt.lastToolError,
});
// Timeout aborts can leave the run without any assistant payloads.
// Emit an explicit timeout error instead of silently completing, so
// callers do not lose the turn as an orphaned user message.
// Timeout aborts can leave the run without payloads or with only a
// partial assistant fragment. Emit an explicit timeout error instead.
if (
timedOut &&
!timedOutDuringCompaction &&
!timedOutDuringToolExecution &&
!payloadsWithToolMedia?.length
timedOutDuringPrompt &&
(!payloadsWithToolMedia?.length || hasPartialAssistantTextAfterPromptTimeout)
) {
const timeoutText = idleTimedOut
? "The model did not produce a response before the model idle timeout. " +
@@ -2094,7 +2102,7 @@ export async function runEmbeddedPiAgent(
"Please try again, or increase `agents.defaults.timeoutSeconds` in your config.";
const replayInvalid = resolveReplayInvalidForAttempt(null);
const livenessState = resolveRunLivenessState({
payloadCount: payloads.length,
payloadCount: hasPartialAssistantTextAfterPromptTimeout ? 0 : payloads.length,
aborted,
timedOut,
attempt,

View File

@@ -204,6 +204,46 @@ describe("buildGuardedModelFetch", () => {
expect(items).toEqual([{ ok: true }]);
});
it("refreshes the guarded timeout while consuming streaming response chunks", async () => {
const encoder = new TextEncoder();
const refreshTimeout = vi.fn();
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response(
new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode("event: message\n\n"));
controller.enqueue(encoder.encode('data: {"ok": true}\n\n'));
controller.close();
},
}),
{ headers: { "content-type": "text/event-stream" } },
),
finalUrl: "https://api.openai.com/v1/chat/completions",
release: vi.fn(async () => undefined),
refreshTimeout,
});
const { buildGuardedModelFetch } = await import("./provider-transport-fetch.js");
const model = {
id: "gpt-5.4",
provider: "openai",
api: "openai-completions",
baseUrl: "https://api.openai.com/v1",
} as unknown as Model<"openai-completions">;
const response = await buildGuardedModelFetch(model)(
"https://api.openai.com/v1/chat/completions",
{ method: "POST" },
);
const items = [];
for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
items.push(item);
}
expect(items).toEqual([{ ok: true }]);
expect(refreshTimeout).toHaveBeenCalledTimes(2);
});
describe("long retry-after handling", () => {
const anthropicModel = {
id: "sonnet-4.6",

View File

@@ -172,7 +172,11 @@ function shouldBypassLongSdkRetry(response: Response): boolean {
return status === 429;
}
function buildManagedResponse(response: Response, release: () => Promise<void>): Response {
function buildManagedResponse(
response: Response,
release: () => Promise<void>,
refreshTimeout?: () => void,
): Response {
if (!response.body) {
void release();
return response;
@@ -199,6 +203,7 @@ function buildManagedResponse(response: Response, release: () => Promise<void>):
await finalize();
return;
}
refreshTimeout?.();
controller.enqueue(chunk.value);
} catch (error) {
controller.error(error);
@@ -315,7 +320,7 @@ export function buildGuardedModelFetch(model: Model<Api>, timeoutMs?: number): t
headers,
});
}
response = sanitizeOpenAISdkSseResponse(response);
return buildManagedResponse(response, result.release);
response = buildManagedResponse(response, result.release, result.refreshTimeout);
return sanitizeOpenAISdkSseResponse(response);
};
}

View File

@@ -86,6 +86,7 @@ export type GuardedFetchResult = {
response: Response;
finalUrl: string;
release: () => Promise<void>;
refreshTimeout?: () => void;
};
type GuardedFetchPresetOptions = Omit<
@@ -315,7 +316,7 @@ export async function fetchWithSsrFGuard(params: GuardedFetchOptions): Promise<G
: DEFAULT_MAX_REDIRECTS;
const mode = resolveGuardedFetchMode(params);
const { signal, cleanup } = buildTimeoutAbortSignal({
const { signal, cleanup, refresh } = buildTimeoutAbortSignal({
timeoutMs: params.timeoutMs,
signal: params.signal,
operation: "fetchWithSsrFGuard",
@@ -480,6 +481,7 @@ export async function fetchWithSsrFGuard(params: GuardedFetchOptions): Promise<G
response,
finalUrl: currentUrl,
release: async () => release(dispatcher),
refreshTimeout: refresh,
};
} catch (err) {
if (err instanceof SsrFBlockedError) {

View File

@@ -105,4 +105,25 @@ describe("buildTimeoutAbortSignal", () => {
cleanup();
});
it("refreshes its timeout when progress is observed", async () => {
const { signal, refresh, cleanup } = buildTimeoutAbortSignal({
timeoutMs: 25,
operation: "unit-test",
});
await vi.advanceTimersByTimeAsync(20);
refresh();
await vi.advanceTimersByTimeAsync(24);
expect(signal?.aborted).toBe(false);
expect(warn).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
expect(signal?.aborted).toBe(true);
expect(warn).toHaveBeenCalledTimes(1);
cleanup();
});
});

View File

@@ -93,26 +93,32 @@ function abortDueToTimeout(
export function buildTimeoutAbortSignal(params: TimeoutAbortSignalParams): {
signal?: AbortSignal;
cleanup: () => void;
refresh: () => void;
} {
const { timeoutMs, signal } = params;
if (!timeoutMs && !signal) {
return { signal: undefined, cleanup: () => {} };
return { signal: undefined, cleanup: () => {}, refresh: () => {} };
}
if (!timeoutMs) {
return { signal, cleanup: () => {} };
return { signal, cleanup: () => {}, refresh: () => {} };
}
const controller = new AbortController();
const normalizedTimeoutMs = Math.max(1, Math.floor(timeoutMs));
const timeoutId = setTimeout(
abortDueToTimeout,
normalizedTimeoutMs,
controller,
normalizedTimeoutMs,
Date.now(),
params.operation,
params.url,
);
let active = true;
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const scheduleTimeout = () => {
timeoutId = setTimeout(
abortDueToTimeout,
normalizedTimeoutMs,
controller,
normalizedTimeoutMs,
Date.now(),
params.operation,
params.url,
);
};
scheduleTimeout();
const onAbort = bindAbortRelay(controller);
if (signal) {
if (signal.aborted) {
@@ -124,8 +130,20 @@ export function buildTimeoutAbortSignal(params: TimeoutAbortSignalParams): {
return {
signal: controller.signal,
refresh: () => {
if (!active || controller.signal.aborted) {
return;
}
if (timeoutId) {
clearTimeout(timeoutId);
}
scheduleTimeout();
},
cleanup: () => {
clearTimeout(timeoutId);
active = false;
if (timeoutId) {
clearTimeout(timeoutId);
}
if (signal) {
signal.removeEventListener("abort", onAbort);
}