diff --git a/extensions/copilot/src/hooks-bridge.test.ts b/extensions/copilot/src/hooks-bridge.test.ts index 8729a20174b..8d3b6a273c4 100755 --- a/extensions/copilot/src/hooks-bridge.test.ts +++ b/extensions/copilot/src/hooks-bridge.test.ts @@ -5,6 +5,7 @@ describe("createHooksBridge", () => { const hookBase = { sessionId: "runtime-session", timestamp: new Date(0), + cwd: "/", workingDirectory: "/", }; @@ -40,6 +41,7 @@ describe("createHooksBridge", () => { const hooks = createHooksBridge({ onPreToolUse })!; const input = { ...hookBase, + cwd: "/tmp", workingDirectory: "/tmp", toolName: "bash", toolArgs: { cmd: "ls" }, diff --git a/src/agents/embedded-agent-helpers.isbillingerrormessage.test.ts b/src/agents/embedded-agent-helpers.isbillingerrormessage.test.ts index 693f5e12331..9dc65d4033b 100644 --- a/src/agents/embedded-agent-helpers.isbillingerrormessage.test.ts +++ b/src/agents/embedded-agent-helpers.isbillingerrormessage.test.ts @@ -1563,6 +1563,25 @@ describe("classifyProviderRuntimeFailureKind", () => { ).toBe("replay_invalid"); }); + it("classifies expired Anthropic thinking signatures as replay invalid", () => { + expect( + classifyProviderRuntimeFailureKind( + '{"type":"error","error":{"type":"invalid_request_error","message":"messages.1.content.440: Invalid `signature` in `thinking` block"}}', + ), + ).toBe("replay_invalid"); + expect( + classifyProviderRuntimeFailureKind( + "ValidationException: invalid signature on thinking block", + ), + ).toBe("replay_invalid"); + expect( + classifyProviderRuntimeFailureKind( + "ValidationException: signature present in thinking block", + ), + ).not.toBe("replay_invalid"); + expect(classifyProviderRuntimeFailureKind("Invalid signature")).not.toBe("replay_invalid"); + }); + it("splits ambiguous provider runtime failures instead of collapsing to unknown", () => { expect(classifyProviderRuntimeFailureKind({})).toBe("empty_response"); expect(classifyProviderRuntimeFailureKind("Unknown error (no error details in response)")).toBe( diff --git a/src/agents/embedded-agent-helpers/errors.ts b/src/agents/embedded-agent-helpers/errors.ts index 43166614022..8abb332c559 100644 --- a/src/agents/embedded-agent-helpers/errors.ts +++ b/src/agents/embedded-agent-helpers/errors.ts @@ -356,6 +356,8 @@ const INTERRUPTED_NETWORK_ERROR_RE = /\beconnrefused\b|\beconnreset\b|\beconnaborted\b|\benetreset\b|\behostunreach\b|\behostdown\b|\benetunreach\b|\bepipe\b|\bsocket hang up\b|\bconnection refused\b|\bconnection reset\b|\bconnection aborted\b|\bnetwork is unreachable\b|\bhost is unreachable\b|\bfetch failed\b|\bconnection error\b|\bnetwork request failed\b/i; const REPLAY_INVALID_RE = /\bprevious_response_id\b.*\b(?:invalid|unknown|not found|does not exist|expired|mismatch)\b|\btool_(?:use|call)\.(?:input|arguments)\b.*\b(?:missing|required)\b|\bincorrect role information\b|\broles must alternate\b|\binput item id does not belong to this connection\b/i; +const THINKING_SIGNATURE_ERROR_RE = + /\b(?:invalid|expired)\b.*\bsignature\b|\bsignature\b.*\b(?:invalid|expired)\b/i; const SANDBOX_BLOCKED_RE = /\bapproval is required\b|\bapproval timed out\b|\bapproval was denied\b|\bblocked by sandbox\b|\bsandbox\b.*\b(?:blocked|denied|forbidden|disabled|not allowed)\b|\bexec denied\s*\(/i; const NO_BODY_HTTP_WRAPPER_RE = @@ -471,7 +473,11 @@ function isDnsTransportErrorMessage(raw: string): boolean { } function isReplayInvalidErrorMessage(raw: string): boolean { - return REPLAY_INVALID_RE.test(raw); + return REPLAY_INVALID_RE.test(raw) || isThinkingSignatureReplayInvalidErrorMessage(raw); +} + +function isThinkingSignatureReplayInvalidErrorMessage(raw: string): boolean { + return /\bthinking\b/i.test(raw) && THINKING_SIGNATURE_ERROR_RE.test(raw); } function isSandboxBlockedErrorMessage(raw: string): boolean { diff --git a/src/agents/embedded-agent-runner/model.ts b/src/agents/embedded-agent-runner/model.ts index 3ffb7c4d0f0..099feba57ac 100644 --- a/src/agents/embedded-agent-runner/model.ts +++ b/src/agents/embedded-agent-runner/model.ts @@ -77,6 +77,7 @@ type ProviderRuntimeHooks = { type StaticCatalogFallbackModel = Model & { compat?: ModelCompatConfig; contextTokens?: number; + params?: Record; mediaInput?: ModelMediaInputConfig; }; @@ -1025,7 +1026,7 @@ function resolveConfiguredFallbackModel(params: { provider, modelId, providerParams: providerConfig?.params, - configuredParams: configuredModel?.params, + configuredParams: metadataModel?.params, }); const fallbackTransport = resolveProviderTransport({ provider, diff --git a/src/agents/embedded-agent-runner/thinking.test.ts b/src/agents/embedded-agent-runner/thinking.test.ts index 941dcb2cad2..441f6e56617 100644 --- a/src/agents/embedded-agent-runner/thinking.test.ts +++ b/src/agents/embedded-agent-runner/thinking.test.ts @@ -482,6 +482,37 @@ describe("wrapAnthropicStreamWithRecovery", () => { const anthropicThinkingError = new Error( "thinking or redacted_thinking blocks in the latest assistant message cannot be modified", ); + const terminalThinkingSignatureError = + "ValidationException: invalid signature on thinking block in message history"; + + function createTestAssistantMessage( + overrides: Partial & Pick, + ): AssistantMessage { + return castAgentMessage({ + role: "assistant", + api: "anthropic-messages", + provider: "anthropic", + model: "claude-sonnet-4-6", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + timestamp: 0, + ...overrides, + }) as AssistantMessage; + } + + function createTestStreamErrorMessage(errorMessage: string): AssistantMessage { + return createTestAssistantMessage({ + content: [{ type: "text", text: "stream failed" }], + stopReason: "error", + errorMessage, + }); + } it("retries once with omitted-reasoning text when the request is rejected before streaming", async () => { let callCount = 0; @@ -584,6 +615,131 @@ describe("wrapAnthropicStreamWithRecovery", () => { expect(callCount).toBe(2); }); + it("retries pre-content terminal stream-error events with omitted-reasoning text", async () => { + let callCount = 0; + const contexts: Array<{ messages?: AgentMessage[] }> = []; + const finalMessage = createTestAssistantMessage({ + content: [{ type: "text", text: "recovered" }], + stopReason: "stop", + }); + const wrapped = wrapAnthropicStreamWithRecovery( + ((_model, context) => { + callCount += 1; + const attempt = callCount; + contexts.push(context as { messages?: AgentMessage[] }); + const stream = createAssistantMessageEventStream(); + queueMicrotask(() => { + if (attempt === 1) { + stream.push({ + type: "error", + reason: "error", + error: createTestStreamErrorMessage(terminalThinkingSignatureError), + }); + } else { + stream.push({ type: "done", reason: "stop", message: finalMessage }); + } + stream.end(); + }); + return stream; + }) as Parameters[0], + { id: "test-session" }, + ); + + const response = wrapped( + {} as never, + { + messages: castAgentMessages([ + { + role: "assistant", + content: [{ type: "thinking", thinking: "secret", thinkingSignature: "sig" }], + }, + ]), + } as never, + {} as never, + ) as { result: () => Promise } & AsyncIterable; + const events: unknown[] = []; + for await (const event of response) { + events.push(event); + } + + expect(events).toEqual([{ type: "done", reason: "stop", message: finalMessage }]); + await expect(response.result()).resolves.toEqual(finalMessage); + expect(callCount).toBe(2); + const retryMessage = contexts[1]?.messages?.[0]; + if (!retryMessage || retryMessage.role !== "assistant") { + throw new Error("Expected Anthropic recovery retry to start with an assistant message"); + } + expect(retryMessage.content).toEqual([ + { type: "text", text: OMITTED_ASSISTANT_REASONING_TEXT }, + ]); + }); + + it("does not retry non-thinking terminal stream-error events", async () => { + let callCount = 0; + const errorMessage = createTestStreamErrorMessage("rate limit exceeded"); + const wrapped = wrapAnthropicStreamWithRecovery( + (() => { + callCount += 1; + const stream = createAssistantMessageEventStream(); + queueMicrotask(() => { + stream.push({ type: "error", reason: "error", error: errorMessage }); + stream.end(); + }); + return stream; + }) as Parameters[0], + { id: "test-session" }, + ); + + const response = wrapped({} as never, { messages: [] } as never, {} as never) as { + result: () => Promise; + } & AsyncIterable; + const events: unknown[] = []; + for await (const event of response) { + events.push(event); + } + + expect(events).toEqual([{ type: "error", reason: "error", error: errorMessage }]); + await expect(response.result()).resolves.toEqual(errorMessage); + expect(callCount).toBe(1); + }); + + it("does not retry terminal stream-error events after output was yielded", async () => { + let callCount = 0; + const partialMessage = createTestAssistantMessage({ + content: [{ type: "text", text: "" }], + stopReason: "stop", + }); + const errorMessage = createTestStreamErrorMessage(terminalThinkingSignatureError); + const wrapped = wrapAnthropicStreamWithRecovery( + (() => { + callCount += 1; + const stream = createAssistantMessageEventStream(); + queueMicrotask(() => { + stream.push({ type: "start", partial: partialMessage }); + stream.push({ type: "error", reason: "error", error: errorMessage }); + stream.end(); + }); + return stream; + }) as Parameters[0], + { id: "test-session" }, + ); + + const response = wrapped({} as never, { messages: [] } as never, {} as never) as { + result: () => Promise; + } & AsyncIterable; + const events: unknown[] = []; + for await (const event of response) { + events.push(event); + } + + expect(events).toEqual([ + { type: "start", partial: partialMessage }, + { type: "error", reason: "error", error: errorMessage }, + ]); + await expect(response.result()).resolves.toEqual(errorMessage); + expect(callCount).toBe(1); + }); + it("does not retry when the stream fails after yielding a chunk", async () => { let callCount = 0; const wrapped = wrapAnthropicStreamWithRecovery( diff --git a/src/agents/embedded-agent-runner/thinking.ts b/src/agents/embedded-agent-runner/thinking.ts index 71f830d3562..361d14ac476 100644 --- a/src/agents/embedded-agent-runner/thinking.ts +++ b/src/agents/embedded-agent-runner/thinking.ts @@ -1,4 +1,5 @@ import { formatErrorMessage } from "../../infra/errors.js"; +import type { AssistantMessageEvent } from "../../llm/types.js"; import { createAssistantMessageEventStream } from "../../llm/utils/event-stream.js"; import type { AgentMessage, StreamFn } from "../runtime/index.js"; import { log } from "./logger.js"; @@ -427,7 +428,13 @@ function shouldRecoverAnthropicThinkingError( error: unknown, sessionMeta: RecoverySessionMeta, ): boolean { - const message = formatErrorMessage(error); + return shouldRecoverAnthropicThinkingErrorMessage(formatErrorMessage(error), sessionMeta); +} + +function shouldRecoverAnthropicThinkingErrorMessage( + message: string, + sessionMeta: RecoverySessionMeta, +): boolean { if (!THINKING_BLOCK_ERROR_PATTERN.test(message)) { return false; } @@ -440,17 +447,66 @@ function shouldRecoverAnthropicThinkingError( return true; } +function isAssistantMessageErrorEvent( + event: unknown, +): event is Extract { + return ( + Boolean(event) && typeof event === "object" && (event as { type?: unknown }).type === "error" + ); +} + +function getAssistantMessageErrorText( + event: Extract, +): string { + const errorMessage = (event.error as { errorMessage?: unknown }).errorMessage; + return typeof errorMessage === "string" ? errorMessage : ""; +} + +async function retryStreamWithoutThinking( + outer: ReturnType, + retry: () => ReturnType, +): Promise { + const retryStream = retry(); + const resolvedRetry = retryStream instanceof Promise ? await retryStream : retryStream; + for await (const chunk of resolvedRetry as AsyncIterable) { + outer.push(chunk as Parameters[0]); + } + const result = await (resolvedRetry as { result?: () => Promise }).result?.(); + return result as AssistantMessage; +} + async function pumpStreamWithRecovery( outer: ReturnType, stream: ReturnType, sessionMeta: RecoverySessionMeta, retry: () => ReturnType, ): Promise { - let yieldedChunk = false; + let yieldedOutput = false; try { const resolved = stream instanceof Promise ? await stream : stream; for await (const chunk of resolved as AsyncIterable) { - yieldedChunk = true; + if (isAssistantMessageErrorEvent(chunk)) { + if ( + shouldRecoverAnthropicThinkingErrorMessage( + getAssistantMessageErrorText(chunk), + sessionMeta, + ) + ) { + if (yieldedOutput) { + log.warn( + `[session-recovery] Anthropic thinking error occurred after streaming began; skipping retry to avoid duplicate chunks: sessionId=${sessionMeta.id}`, + ); + } else { + sessionMeta.recoveredAnthropicThinking = true; + log.warn( + `[session-recovery] Anthropic thinking stream error; retrying once without thinking blocks: sessionId=${sessionMeta.id}`, + ); + return retryStreamWithoutThinking(outer, retry); + } + } + } else { + yieldedOutput = true; + } outer.push(chunk as Parameters[0]); } const result = await (resolved as { result?: () => Promise }).result?.(); @@ -459,7 +515,7 @@ async function pumpStreamWithRecovery( if (!shouldRecoverAnthropicThinkingError(error, sessionMeta)) { throw error; } - if (yieldedChunk) { + if (yieldedOutput) { log.warn( `[session-recovery] Anthropic thinking error occurred after streaming began; skipping retry to avoid duplicate chunks: sessionId=${sessionMeta.id}`, ); @@ -469,13 +525,7 @@ async function pumpStreamWithRecovery( log.warn( `[session-recovery] Anthropic thinking error during stream; retrying once without thinking blocks: sessionId=${sessionMeta.id}`, ); - const retryStream = retry(); - const resolvedRetry = retryStream instanceof Promise ? await retryStream : retryStream; - for await (const chunk of resolvedRetry as AsyncIterable) { - outer.push(chunk as Parameters[0]); - } - const result = await (resolvedRetry as { result?: () => Promise }).result?.(); - return result as AssistantMessage; + return retryStreamWithoutThinking(outer, retry); } }