diff --git a/package.json b/package.json index 7ff14ff117d..14de46be320 100644 --- a/package.json +++ b/package.json @@ -1488,7 +1488,6 @@ }, "dependencies": { "@agentclientprotocol/sdk": "0.19.0", - "@anthropic-ai/sdk": "0.81.0", "@anthropic-ai/vertex-sdk": "^0.16.0", "@aws-sdk/client-bedrock": "3.1032.0", "@aws-sdk/client-bedrock-runtime": "3.1032.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b4e5c5733a4..a57af5e72b4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -41,9 +41,6 @@ importers: '@agentclientprotocol/sdk': specifier: 0.19.0 version: 0.19.0(zod@4.3.6) - '@anthropic-ai/sdk': - specifier: 0.81.0 - version: 0.81.0(zod@4.3.6) '@anthropic-ai/vertex-sdk': specifier: ^0.16.0 version: 0.16.0(zod@4.3.6) diff --git a/src/agents/anthropic-transport-stream.test.ts b/src/agents/anthropic-transport-stream.test.ts index e643a1adec4..99fdf28497e 100644 --- a/src/agents/anthropic-transport-stream.test.ts +++ b/src/agents/anthropic-transport-stream.test.ts @@ -2,30 +2,32 @@ import type { Model } from "@mariozechner/pi-ai"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { attachModelProviderRequestTransport } from "./provider-request-config.js"; -const { - anthropicCtorMock, - anthropicMessagesStreamMock, - buildGuardedModelFetchMock, - guardedFetchMock, -} = vi.hoisted(() => ({ - anthropicCtorMock: vi.fn(), - anthropicMessagesStreamMock: vi.fn(), +const { buildGuardedModelFetchMock, guardedFetchMock } = vi.hoisted(() => ({ buildGuardedModelFetchMock: vi.fn(), guardedFetchMock: vi.fn(), })); -vi.mock("@anthropic-ai/sdk", () => ({ - default: anthropicCtorMock, -})); - vi.mock("./provider-transport-fetch.js", () => ({ buildGuardedModelFetch: buildGuardedModelFetchMock, })); let createAnthropicMessagesTransportStreamFn: typeof import("./anthropic-transport-stream.js").createAnthropicMessagesTransportStreamFn; -function emptyEventStream(): AsyncIterable> { - return (async function* () {})(); +function createSseResponse(events: Record[] = []): Response { + const body = events.map((event) => `data: ${JSON.stringify(event)}\n\n`).join(""); + return new Response(body, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + +function latestAnthropicRequest() { + const [, init] = guardedFetchMock.mock.calls.at(-1) ?? []; + const body = init?.body; + return { + init, + payload: typeof body === "string" ? (JSON.parse(body) as Record) : {}, + }; } describe("anthropic transport stream", () => { @@ -35,19 +37,10 @@ describe("anthropic transport stream", () => { }); beforeEach(() => { - anthropicCtorMock.mockReset(); - anthropicMessagesStreamMock.mockReset(); buildGuardedModelFetchMock.mockReset(); guardedFetchMock.mockReset(); buildGuardedModelFetchMock.mockReturnValue(guardedFetchMock); - anthropicMessagesStreamMock.mockReturnValue(emptyEventStream()); - anthropicCtorMock.mockImplementation(function mockAnthropicClient() { - return { - messages: { - stream: anthropicMessagesStreamMock, - }, - }; - }); + guardedFetchMock.mockResolvedValue(createSseResponse()); }); it("uses the guarded fetch transport for api-key Anthropic requests", async () => { @@ -89,12 +82,14 @@ describe("anthropic transport stream", () => { await stream.result(); expect(buildGuardedModelFetchMock).toHaveBeenCalledWith(model); - expect(anthropicCtorMock).toHaveBeenCalledWith( + expect(guardedFetchMock).toHaveBeenCalledWith( + "https://api.anthropic.com/v1/messages", expect.objectContaining({ - apiKey: "sk-ant-api", - baseURL: "https://api.anthropic.com", - fetch: guardedFetchMock, - defaultHeaders: expect.objectContaining({ + method: "POST", + headers: expect.objectContaining({ + "x-api-key": "sk-ant-api", + "anthropic-version": "2023-06-01", + "content-type": "application/json", accept: "application/json", "anthropic-dangerous-direct-browser-access": "true", "X-Provider": "anthropic", @@ -102,13 +97,10 @@ describe("anthropic transport stream", () => { }), }), ); - expect(anthropicMessagesStreamMock).toHaveBeenCalledWith( - expect.objectContaining({ - model: "claude-sonnet-4-6", - stream: true, - }), - undefined, - ); + expect(latestAnthropicRequest().payload).toMatchObject({ + model: "claude-sonnet-4-6", + stream: true, + }); }); it("ignores non-positive runtime maxTokens overrides and falls back to the model limit", async () => { @@ -147,14 +139,11 @@ describe("anthropic transport stream", () => { ); await stream.result(); - expect(anthropicMessagesStreamMock).toHaveBeenCalledWith( - expect.objectContaining({ - model: "claude-sonnet-4-6", - max_tokens: 8192, - stream: true, - }), - undefined, - ); + expect(latestAnthropicRequest().payload).toMatchObject({ + model: "claude-sonnet-4-6", + max_tokens: 8192, + stream: true, + }); }); it("ignores fractional runtime maxTokens overrides that floor to zero", async () => { @@ -193,14 +182,11 @@ describe("anthropic transport stream", () => { ); await stream.result(); - expect(anthropicMessagesStreamMock).toHaveBeenCalledWith( - expect.objectContaining({ - model: "claude-sonnet-4-6", - max_tokens: 8192, - stream: true, - }), - undefined, - ); + expect(latestAnthropicRequest().payload).toMatchObject({ + model: "claude-sonnet-4-6", + max_tokens: 8192, + stream: true, + }); }); it("fails locally when Anthropic maxTokens is non-positive after resolution", async () => { @@ -243,17 +229,17 @@ describe("anthropic transport stream", () => { expect(result.errorMessage).toContain( "Anthropic Messages transport requires a positive maxTokens value", ); - expect(anthropicMessagesStreamMock).not.toHaveBeenCalled(); + expect(guardedFetchMock).not.toHaveBeenCalled(); }); it("preserves Anthropic OAuth identity and tool-name remapping with transport overrides", async () => { - anthropicMessagesStreamMock.mockReturnValueOnce( - (async function* () { - yield { + guardedFetchMock.mockResolvedValueOnce( + createSseResponse([ + { type: "message_start", message: { id: "msg_1", usage: { input_tokens: 10, output_tokens: 0 } }, - }; - yield { + }, + { type: "content_block_start", index: 0, content_block: { @@ -262,17 +248,17 @@ describe("anthropic transport stream", () => { name: "Read", input: { path: "/tmp/a" }, }, - }; - yield { + }, + { type: "content_block_stop", index: 0, - }; - yield { + }, + { type: "message_delta", delta: { stop_reason: "tool_use" }, usage: { input_tokens: 10, output_tokens: 5 }, - }; - })(), + }, + ]), ); const model = attachModelProviderRequestTransport( { @@ -321,21 +307,17 @@ describe("anthropic transport stream", () => { ); const result = await stream.result(); - expect(anthropicCtorMock).toHaveBeenCalledWith( + expect(guardedFetchMock).toHaveBeenCalledWith( + "https://api.anthropic.com/v1/messages", expect.objectContaining({ - apiKey: null, - authToken: "sk-ant-oat-example", - fetch: guardedFetchMock, - defaultHeaders: expect.objectContaining({ + headers: expect.objectContaining({ + authorization: "Bearer sk-ant-oat-example", "x-app": "cli", "user-agent": expect.stringContaining("claude-cli/"), }), }), ); - const firstCallParams = anthropicMessagesStreamMock.mock.calls[0]?.[0] as Record< - string, - unknown - >; + const firstCallParams = latestAnthropicRequest().payload; expect(firstCallParams.system).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -407,10 +389,7 @@ describe("anthropic transport stream", () => { ); await stream.result(); - const firstCallParams = anthropicMessagesStreamMock.mock.calls[0]?.[0] as Record< - string, - unknown - >; + const firstCallParams = latestAnthropicRequest().payload; expect(firstCallParams.messages).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -463,13 +442,10 @@ describe("anthropic transport stream", () => { ); await stream.result(); - expect(anthropicMessagesStreamMock).toHaveBeenCalledWith( - expect.objectContaining({ - thinking: { type: "adaptive" }, - output_config: { effort: "max" }, - }), - undefined, - ); + expect(latestAnthropicRequest().payload).toMatchObject({ + thinking: { type: "adaptive" }, + output_config: { effort: "max" }, + }); }); it("maps xhigh thinking effort for Claude Opus 4.7 transport runs", async () => { @@ -508,12 +484,9 @@ describe("anthropic transport stream", () => { ); await stream.result(); - expect(anthropicMessagesStreamMock).toHaveBeenCalledWith( - expect.objectContaining({ - thinking: { type: "adaptive" }, - output_config: { effort: "xhigh" }, - }), - undefined, - ); + expect(latestAnthropicRequest().payload).toMatchObject({ + thinking: { type: "adaptive" }, + output_config: { effort: "xhigh" }, + }); }); }); diff --git a/src/agents/anthropic-transport-stream.ts b/src/agents/anthropic-transport-stream.ts index 4ebdfb053d3..3e3f528ff90 100644 --- a/src/agents/anthropic-transport-stream.ts +++ b/src/agents/anthropic-transport-stream.ts @@ -1,4 +1,3 @@ -import Anthropic from "@anthropic-ai/sdk"; import type { StreamFn } from "@mariozechner/pi-agent-core"; import { calculateCost, @@ -60,6 +59,14 @@ type AnthropicTransportModel = Model<"anthropic-messages"> & { type AnthropicTransportOptions = AnthropicOptions & Pick; type AnthropicAdaptiveEffort = NonNullable | "xhigh"; +type AnthropicMessagesClient = { + messages: { + stream( + params: Record, + options?: { signal?: AbortSignal }, + ): AsyncIterable>; + }; +}; type TransportContentBlock = | { type: "text"; text: string; index?: number } @@ -419,6 +426,96 @@ function mapStopReason(reason: string | undefined): string { } } +function resolveAnthropicMessagesUrl(baseUrl?: string): string { + const normalized = (baseUrl?.trim() || "https://api.anthropic.com").replace(/\/+$/, ""); + return normalized.endsWith("/v1") ? `${normalized}/messages` : `${normalized}/v1/messages`; +} + +async function* parseAnthropicSseBody( + body: ReadableStream, +): AsyncIterable> { + const reader = body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + buffer = `${buffer}${decoder.decode(value, { stream: true })}`.replaceAll("\r\n", "\n"); + let frameEnd = buffer.indexOf("\n\n"); + while (frameEnd >= 0) { + const frame = buffer.slice(0, frameEnd); + buffer = buffer.slice(frameEnd + 2); + const data = frame + .split("\n") + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trimStart()) + .join("\n"); + if (data && data !== "[DONE]") { + yield JSON.parse(data) as Record; + } + frameEnd = buffer.indexOf("\n\n"); + } + } + const tail = `${buffer}${decoder.decode()}`.replaceAll("\r\n", "\n").trim(); + if (tail) { + const data = tail + .split("\n") + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trimStart()) + .join("\n"); + if (data && data !== "[DONE]") { + yield JSON.parse(data) as Record; + } + } + } finally { + reader.releaseLock(); + } +} + +function createAnthropicMessagesClient(params: { + apiKey?: string | null; + authToken?: string; + baseURL?: string; + defaultHeaders?: Record; + fetch: typeof fetch; +}): AnthropicMessagesClient { + const url = resolveAnthropicMessagesUrl(params.baseURL); + return { + messages: { + async *stream(body: Record, options?: { signal?: AbortSignal }) { + const headers = mergeTransportHeaders( + { + "content-type": "application/json", + "anthropic-version": "2023-06-01", + ...(params.apiKey ? { "x-api-key": params.apiKey } : {}), + ...(params.authToken ? { authorization: `Bearer ${params.authToken}` } : {}), + }, + params.defaultHeaders, + ); + const response = await params.fetch(url, { + method: "POST", + headers, + body: JSON.stringify(body), + signal: options?.signal, + }); + if (!response.ok) { + const detail = await response.text().catch(() => ""); + throw new Error( + detail || `Anthropic Messages request failed with HTTP ${response.status}`, + ); + } + if (!response.body) { + return; + } + yield* parseAnthropicSseBody(response.body); + }, + }, + }; +} + function createAnthropicTransportClient(params: { model: AnthropicTransportModel; context: Context; @@ -432,11 +529,10 @@ function createAnthropicTransportClient(params: { if (model.provider === "github-copilot") { const betaFeatures = needsInterleavedBeta ? ["interleaved-thinking-2025-05-14"] : []; return { - client: new Anthropic({ + client: createAnthropicMessagesClient({ apiKey: null, authToken: apiKey, baseURL: model.baseUrl, - dangerouslyAllowBrowser: true, defaultHeaders: mergeTransportHeaders( { accept: "application/json", @@ -461,11 +557,10 @@ function createAnthropicTransportClient(params: { } if (isAnthropicOAuthToken(apiKey)) { return { - client: new Anthropic({ + client: createAnthropicMessagesClient({ apiKey: null, authToken: apiKey, baseURL: model.baseUrl, - dangerouslyAllowBrowser: true, defaultHeaders: mergeTransportHeaders( { accept: "application/json", @@ -483,10 +578,9 @@ function createAnthropicTransportClient(params: { }; } return { - client: new Anthropic({ + client: createAnthropicMessagesClient({ apiKey, baseURL: model.baseUrl, - dangerouslyAllowBrowser: true, defaultHeaders: mergeTransportHeaders( { accept: "application/json", @@ -676,12 +770,16 @@ export function createAnthropicMessagesTransportStreamFn(): StreamFn { params = nextParams as Record; } const anthropicStream = client.messages.stream( - { ...params, stream: true } as never, + { ...params, stream: true }, transportOptions.signal ? { signal: transportOptions.signal } : undefined, - ) as AsyncIterable>; + ); stream.push({ type: "start", partial: output as never }); const blocks = output.content; for await (const event of anthropicStream) { + if (event.type === "error") { + const error = event.error as { message?: string } | undefined; + throw new Error(error?.message || "Anthropic Messages stream failed"); + } if (event.type === "message_start") { const message = event.message as | { id?: string; usage?: Record }