diff --git a/CHANGELOG.md b/CHANGELOG.md
index 019aa0cdd4f..702eb3f1c4a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai
 - Discord/tool-call text: strip standalone Gemma-style `...` tool-call payloads from visible assistant text without truncating prose examples or trailing replies. (#67318) Thanks @joelnishanth.
 - WhatsApp/web-session: drain the pending per-auth creds save queue before reopening sockets so reconnect-time auth bootstrap no longer races in-flight `creds.json` writes and falsely restores from backup. (#67464) Thanks @neeravmakwana.
 - BlueBubbles/catchup: add a per-message retry ceiling (`catchup.maxFailureRetries`, default 10) so a persistently failing message with a malformed payload no longer wedges the catchup cursor forever. After N consecutive `processMessage` failures against the same GUID, catchup logs a WARN, skips that message on subsequent sweeps, and lets the cursor advance past it. Transient failures still retry from the same point as before. Also fixes a lost-update race in the persistent dedupe file lock that silently dropped inbound GUIDs on concurrent writes, a dedupe file naming migration gap on version upgrade, and a balloon-event bypass that let catchup replay debouncer-coalesced events as standalone messages. (#67426, #66870) Thanks @omarshahine.
+- Ollama/chat: strip the `ollama/` provider prefix from Ollama chat request model ids so configured refs like `ollama/qwen3:14b-q8_0` stop 404ing against the Ollama API. (#67457) Thanks @suboss87.

 ## 2026.4.15-beta.1

diff --git a/extensions/ollama/src/stream-runtime.test.ts b/extensions/ollama/src/stream-runtime.test.ts
index a27a55ce2f9..174e6dbf88e 100644
--- a/extensions/ollama/src/stream-runtime.test.ts
+++ b/extensions/ollama/src/stream-runtime.test.ts
@@ -1,4 +1,13 @@
-import { describe, expect, it, vi } from "vitest";
+import { afterEach, describe, expect, it, vi } from "vitest";
+
+const { fetchWithSsrFGuardMock } = vi.hoisted(() => ({
+  fetchWithSsrFGuardMock: vi.fn(),
+}));
+
+vi.mock("openclaw/plugin-sdk/ssrf-runtime", () => ({
+  fetchWithSsrFGuard: fetchWithSsrFGuardMock,
+}));
+
 import {
   buildOllamaChatRequest,
   createConfiguredOllamaStreamFn,
@@ -9,6 +18,17 @@ import {
   resolveOllamaBaseUrlForRun,
 } from "./stream.js";

+type GuardedFetchCall = {
+  url: string;
+  init?: RequestInit;
+  policy?: unknown;
+  auditContext?: string;
+};
+
+afterEach(() => {
+  fetchWithSsrFGuardMock.mockReset();
+});
+
 describe("buildOllamaChatRequest", () => {
   it("omits tools when none are provided", () => {
     expect(
@@ -24,6 +44,17 @@ describe("buildOllamaChatRequest", () => {
       options: { num_ctx: 65536 },
     });
   });
+
+  it("strips the ollama/ prefix from chat model ids", () => {
+    expect(
+      buildOllamaChatRequest({
+        modelId: "ollama/qwen3:14b-q8_0",
+        messages: [{ role: "user", content: "hello" }],
+      }),
+    ).toMatchObject({
+      model: "qwen3:14b-q8_0",
+    });
+  });
 });

 describe("convertToOllamaMessages", () => {
@@ -396,26 +427,23 @@ describe("parseNdjsonStream", () => {

 async function withMockNdjsonFetch(
   lines: string[],
-  run: (fetchMock: ReturnType<typeof vi.fn>) => Promise<void>,
+  run: (fetchMock: typeof fetchWithSsrFGuardMock) => Promise<void>,
 ): Promise<void> {
-  const originalFetch = globalThis.fetch;
-  const fetchMock = vi.fn(async () => {
+  fetchWithSsrFGuardMock.mockImplementation(async () => {
     const payload = lines.join("\n");
-    return new Response(`${payload}\n`, {
-      status: 200,
-      headers: { "Content-Type": "application/x-ndjson" },
-    });
+    return {
+      response: new Response(`${payload}\n`, {
+        status: 200,
+        headers: { "Content-Type": "application/x-ndjson" },
+      }),
+      release: vi.fn(async () => undefined),
+    };
   });
-  globalThis.fetch = fetchMock as unknown as typeof fetch;
-  try {
-    await run(fetchMock);
-  } finally {
-    globalThis.fetch = originalFetch;
-  }
+  await run(fetchWithSsrFGuardMock);
 }

 function createControlledNdjsonFetch(): {
-  fetchMock: ReturnType<typeof vi.fn>;
+  fetchImpl: () => Promise<{ response: Response; release: () => Promise<void> }>;
   pushLine: (line: string) => void;
   close: () => void;
 } {
@@ -427,11 +455,12 @@
     },
   });
   return {
-    fetchMock: vi.fn(async () => {
-      return new Response(body, {
+    fetchImpl: async () => ({
+      response: new Response(body, {
        status: 200,
        headers: { "Content-Type": "application/x-ndjson" },
-      });
+      }),
+      release: vi.fn(async () => undefined),
     }),
     pushLine(line: string) {
       if (!controller) {
@@ -448,6 +477,10 @@
   };
 }

+function getGuardedFetchCall(fetchMock: typeof fetchWithSsrFGuardMock): GuardedFetchCall {
+  return (fetchMock.mock.calls[0]?.[0] as GuardedFetchCall | undefined) ?? { url: "" };
+}
+
 async function createOllamaTestStream(params: {
   baseUrl: string;
   defaultHeaders?: Record<string, string>;
@@ -587,9 +620,8 @@
   });

   it("emits text_end as soon as Ollama switches from text to tool calls", async () => {
-    const originalFetch = globalThis.fetch;
     const controlledFetch = createControlledNdjsonFetch();
-    globalThis.fetch = controlledFetch.fetchMock as unknown as typeof fetch;
+    fetchWithSsrFGuardMock.mockImplementation(controlledFetch.fetchImpl);

     try {
       const stream = await createOllamaTestStream({ baseUrl: "http://ollama-host:11434" });
@@ -651,7 +683,7 @@
         expect(doneEvent).toMatchObject({ value: undefined, done: true });
       }
     } finally {
-      globalThis.fetch = originalFetch;
+      fetchWithSsrFGuardMock.mockReset();
     }
   });

@@ -713,8 +745,10 @@
     expect(events.at(-1)?.type).toBe("done");
     expect(fetchMock).toHaveBeenCalledTimes(1);
-    const [url, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
-    expect(url).toBe("http://ollama-host:11434/api/chat");
+    const request = getGuardedFetchCall(fetchMock);
+    expect(request.url).toBe("http://ollama-host:11434/api/chat");
+    expect(request.auditContext).toBe("ollama-stream.chat");
+    const requestInit = request.init ?? {};
     expect(requestInit.signal).toBe(signal);
     if (typeof requestInit.body !== "string") {
       throw new Error("Expected string request body");
     }
@@ -729,6 +763,28 @@
     );
   });

+  it("uses the default loopback policy when baseUrl is empty", async () => {
+    await withMockNdjsonFetch(
+      [
+        '{"model":"m","created_at":"t","message":{"role":"assistant","content":"ok"},"done":false}',
+        '{"model":"m","created_at":"t","message":{"role":"assistant","content":""},"done":true,"prompt_eval_count":1,"eval_count":1}',
+      ],
+      async (fetchMock) => {
+        const stream = await createOllamaTestStream({ baseUrl: "" });
+
+        const events = await collectStreamEvents(stream);
+        expect(events.at(-1)?.type).toBe("done");
+
+        const request = getGuardedFetchCall(fetchMock);
+        expect(request.url).toBe("http://127.0.0.1:11434/api/chat");
+        expect(request.policy).toMatchObject({
+          hostnameAllowlist: ["127.0.0.1"],
+          allowPrivateNetwork: true,
+        });
+      },
+    );
+  });
+
   it("merges default headers and allows request headers to override them", async () => {
     await withMockNdjsonFetch(
       [
@@ -753,7 +809,7 @@
       const events = await collectStreamEvents(stream);
       expect(events.at(-1)?.type).toBe("done");

-      const [, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
+      const requestInit = getGuardedFetchCall(fetchMock).init ?? {};
       expect(requestInit.headers).toMatchObject({
         "Content-Type": "application/json",
         "X-OLLAMA-KEY": "provider-secret",
@@ -785,7 +841,7 @@
     });

     await collectStreamEvents(stream);
-    const [, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
+    const requestInit = getGuardedFetchCall(fetchMock).init ?? {};
     expect(requestInit.headers).toMatchObject({
       Authorization: "Bearer proxy-token",
     });
@@ -821,7 +877,7 @@
     );

     await collectStreamEvents(stream);
-    const [, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
+    const requestInit = getGuardedFetchCall(fetchMock).init ?? {};
     expect(requestInit.headers).toMatchObject({
       Authorization: "Bearer real-token",
     });
@@ -830,14 +886,13 @@
   });

   it("surfaces non-2xx HTTP response as status-prefixed error", async () => {
-    const originalFetch = globalThis.fetch;
-    const fetchMock = vi.fn(async () => {
-      return new Response("Service Unavailable", {
+    fetchWithSsrFGuardMock.mockResolvedValue({
+      response: new Response("Service Unavailable", {
        status: 503,
        statusText: "Service Unavailable",
-      });
+      }),
+      release: vi.fn(async () => undefined),
     });
-    globalThis.fetch = fetchMock as unknown as typeof fetch;
     try {
       const stream = await createOllamaTestStream({ baseUrl: "http://ollama-host:11434" });
       const events = await collectStreamEvents(stream);
@@ -850,7 +905,7 @@
       // extractLeadingHttpStatus can parse it for failover/retry logic.
       expect(errorEvent!.error.errorMessage).toMatch(/^503\b/);
     } finally {
-      globalThis.fetch = originalFetch;
+      fetchWithSsrFGuardMock.mockReset();
     }
   });

@@ -962,8 +1017,9 @@
     );

     await collectStreamEvents(stream);
-    const [url, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
-    expect(url).toBe("http://provider-host:11434/api/chat");
+    const request = getGuardedFetchCall(fetchMock);
+    expect(request.url).toBe("http://provider-host:11434/api/chat");
+    const requestInit = request.init ?? {};
     expect(requestInit.headers).toMatchObject({
       Authorization: "Bearer proxy-token",
     });
diff --git a/extensions/ollama/src/stream.test.ts b/extensions/ollama/src/stream.test.ts
index b3d57e9bb49..c717a514d48 100644
--- a/extensions/ollama/src/stream.test.ts
+++ b/extensions/ollama/src/stream.test.ts
@@ -1,4 +1,13 @@
 import { afterEach, describe, expect, it, vi } from "vitest";
+
+const { fetchWithSsrFGuardMock } = vi.hoisted(() => ({
+  fetchWithSsrFGuardMock: vi.fn(),
+}));
+
+vi.mock("openclaw/plugin-sdk/ssrf-runtime", () => ({
+  fetchWithSsrFGuard: fetchWithSsrFGuardMock,
+}));
+
 import { buildAssistantMessage, createOllamaStreamFn } from "./stream.js";

 function makeOllamaResponse(params: {
@@ -79,7 +88,9 @@
 });

 describe("createOllamaStreamFn thinking events", () => {
-  afterEach(() => vi.unstubAllGlobals());
+  afterEach(() => {
+    fetchWithSsrFGuardMock.mockReset();
+  });

   function makeNdjsonBody(chunks: Array<Record<string, unknown>>): ReadableStream<Uint8Array> {
     const encoder = new TextEncoder();
@@ -124,11 +135,10 @@
     ];
     const body = makeNdjsonBody(thinkingChunks);

-    const fetchMock = vi.fn().mockResolvedValue({
-      ok: true,
-      body,
+    fetchWithSsrFGuardMock.mockResolvedValue({
+      response: new Response(body, { status: 200 }),
+      release: vi.fn(async () => undefined),
     });
-    vi.stubGlobal("fetch", fetchMock);

     const streamFn = createOllamaStreamFn("http://localhost:11434");
     const stream = streamFn(
@@ -151,28 +161,23 @@
     expect(eventTypes).toContain("text_delta");
     expect(eventTypes).toContain("done");

-    // thinking_start comes before text_start
     const thinkingStartIndex = eventTypes.indexOf("thinking_start");
     const textStartIndex = eventTypes.indexOf("text_start");
     expect(thinkingStartIndex).toBeLessThan(textStartIndex);

-    // thinking_end comes before text_start
     const thinkingEndIndex = eventTypes.indexOf("thinking_end");
     expect(thinkingEndIndex).toBeLessThan(textStartIndex);

-    // Thinking deltas have correct content
     const thinkingDeltas = events.filter((e) => e.type === "thinking_delta");
     expect(thinkingDeltas).toHaveLength(2);
     expect(thinkingDeltas[0].delta).toBe("Step 1");
     expect(thinkingDeltas[1].delta).toBe(" and step 2");

-    // Content index: thinking at 0, text at 1
     const thinkingStart = events.find((e) => e.type === "thinking_start");
     expect(thinkingStart?.contentIndex).toBe(0);
     const textStart = events.find((e) => e.type === "text_start");
     expect(textStart?.contentIndex).toBe(1);

-    // Final message has thinking block
     const done = events.find((e) => e.type === "done") as { message?: { content: unknown[] } };
     const content = done?.message?.content ?? [];
     expect(content[0]).toMatchObject({ type: "thinking", thinking: "Step 1 and step 2" });
@@ -199,7 +204,10 @@
     ];
     const body = makeNdjsonBody(chunks);

-    vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ ok: true, body }));
+    fetchWithSsrFGuardMock.mockResolvedValue({
+      response: new Response(body, { status: 200 }),
+      release: vi.fn(async () => undefined),
+    });

     const streamFn = createOllamaStreamFn("http://localhost:11434");
     const stream = streamFn(
@@ -221,7 +229,6 @@
     expect(eventTypes).toContain("text_delta");
     expect(eventTypes).toContain("done");

-    // Text content index should be 0 (no thinking block)
     const textStart = events.find((e) => e.type === "text_start") as { contentIndex?: number };
     expect(textStart?.contentIndex).toBe(0);
   });
diff --git a/extensions/ollama/src/stream.ts b/extensions/ollama/src/stream.ts
index 49eaa1d7667..e0d36013362 100644
--- a/extensions/ollama/src/stream.ts
+++ b/extensions/ollama/src/stream.ts
@@ -27,12 +27,14 @@ import {
   streamWithPayloadPatch,
 } from "openclaw/plugin-sdk/provider-stream-shared";
 import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
+import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime";
 import { normalizeLowercaseStringOrEmpty, readStringValue } from "openclaw/plugin-sdk/text-runtime";
 import { OLLAMA_DEFAULT_BASE_URL } from "./defaults.js";
 import {
   parseJsonObjectPreservingUnsafeIntegers,
   parseJsonPreservingUnsafeIntegers,
 } from "./ollama-json.js";
+import { buildOllamaBaseUrlSsrFPolicy } from "./provider-models.js";

 const log = createSubsystemLogger("ollama-stream");

@@ -220,6 +222,11 @@
 // Ollama compat wrapper now owns more than num_ctx injection.
 export const createConfiguredOllamaCompatNumCtxWrapper = createConfiguredOllamaCompatStreamWrapper;

+function normalizeOllamaWireModelId(modelId: string): string {
+  const trimmed = modelId.trim();
+  return trimmed.startsWith("ollama/") ? trimmed.slice("ollama/".length) : trimmed;
+}
+
 export function buildOllamaChatRequest(params: {
   modelId: string;
   messages: OllamaChatMessage[];
@@ -228,7 +235,7 @@
   stream?: boolean;
 }): OllamaChatRequest {
   return {
-    model: params.modelId,
+    model: normalizeOllamaWireModelId(params.modelId),
     messages: params.messages,
     stream: params.stream ?? true,
     ...(params.tools && params.tools.length > 0 ? { tools: params.tools } : {}),
@@ -606,6 +613,7 @@ export function createOllamaStreamFn(
   defaultHeaders?: Record<string, string>,
 ): StreamFn {
   const chatUrl = resolveOllamaChatUrl(baseUrl);
+  const ssrfPolicy = buildOllamaBaseUrlSsrFPolicy(chatUrl);

   return (model, context, options) => {
     const stream = createAssistantMessageEventStream();
@@ -646,114 +654,59 @@
        headers.Authorization = `Bearer ${options.apiKey}`;
      }

-      const response = await fetch(chatUrl, {
-        method: "POST",
-        headers,
-        body: JSON.stringify(body),
-        signal: options?.signal,
+      const { response, release } = await fetchWithSsrFGuard({
+        url: chatUrl,
+        init: {
+          method: "POST",
+          headers,
+          body: JSON.stringify(body),
+          signal: options?.signal,
+        },
+        policy: ssrfPolicy,
+        auditContext: "ollama-stream.chat",
       });
-      if (!response.ok) {
-        const errorText = await response.text().catch(() => "unknown error");
-        throw new Error(`${response.status} ${errorText}`);
-      }
-      if (!response.body) {
-        throw new Error("Ollama API returned empty response body");
-      }
-
-      const reader = response.body.getReader();
-      let accumulatedContent = "";
-      let accumulatedThinking = "";
-      const accumulatedToolCalls: OllamaToolCall[] = [];
-      let finalResponse: OllamaChatResponse | undefined;
-      const modelInfo = { api: model.api, provider: model.provider, id: model.id };
-      let streamStarted = false;
-      let thinkingStarted = false;
-      let thinkingEnded = false;
-      let textBlockStarted = false;
-      let textBlockClosed = false;
-
-      // Content index tracking: thinking block (if present) is index 0,
-      // text block follows at index 1 (or 0 when no thinking).
-      const textContentIndex = () => (thinkingStarted ? 1 : 0);
-
-      const buildCurrentContent = (): (TextContent | ThinkingContent | ToolCall)[] => {
-        const parts: (TextContent | ThinkingContent | ToolCall)[] = [];
-        if (accumulatedThinking) {
-          parts.push({
-            type: "thinking",
-            thinking: accumulatedThinking,
-          });
+      try {
+        if (!response.ok) {
+          const errorText = await response.text().catch(() => "unknown error");
+          throw new Error(`${response.status} ${errorText}`);
        }
-        if (accumulatedContent) {
-          parts.push({ type: "text", text: accumulatedContent });
+        if (!response.body) {
+          throw new Error("Ollama API returned empty response body");
        }
-        return parts;
-      };
-
-      const closeThinkingBlock = () => {
-        if (!thinkingStarted || thinkingEnded) {
-          return;
-        }
-        thinkingEnded = true;
-        const partial = buildStreamAssistantMessage({
-          model: modelInfo,
-          content: buildCurrentContent(),
-          stopReason: "stop",
-          usage: buildUsageWithNoCost({}),
-        });
-        stream.push({
-          type: "thinking_end",
-          contentIndex: 0,
-          content: accumulatedThinking,
-          partial,
-        });
-      };
+        const reader = response.body.getReader();
+        let accumulatedContent = "";
+        let accumulatedThinking = "";
+        const accumulatedToolCalls: OllamaToolCall[] = [];
+        let finalResponse: OllamaChatResponse | undefined;
+        const modelInfo = { api: model.api, provider: model.provider, id: model.id };
+        let streamStarted = false;
+        let thinkingStarted = false;
+        let thinkingEnded = false;
+        let textBlockStarted = false;
+        let textBlockClosed = false;
+        const textContentIndex = () => (thinkingStarted ? 1 : 0);
-      const closeTextBlock = () => {
-        if (!textBlockStarted || textBlockClosed) {
-          return;
-        }
-        textBlockClosed = true;
-        const partial = buildStreamAssistantMessage({
-          model: modelInfo,
-          content: buildCurrentContent(),
-          stopReason: "stop",
-          usage: buildUsageWithNoCost({}),
-        });
-        stream.push({
-          type: "text_end",
-          contentIndex: textContentIndex(),
-          content: accumulatedContent,
-          partial,
-        });
-      };
-
-      for await (const chunk of parseNdjsonStream(reader)) {
-        // Handle thinking/reasoning deltas from Ollama's native think mode.
-        const thinkingDelta = chunk.message?.thinking ?? chunk.message?.reasoning;
-        if (thinkingDelta) {
-          if (!streamStarted) {
-            streamStarted = true;
-            const emptyPartial = buildStreamAssistantMessage({
-              model: modelInfo,
-              content: [],
-              stopReason: "stop",
-              usage: buildUsageWithNoCost({}),
+        const buildCurrentContent = (): (TextContent | ThinkingContent | ToolCall)[] => {
+          const parts: (TextContent | ThinkingContent | ToolCall)[] = [];
+          if (accumulatedThinking) {
+            parts.push({
+              type: "thinking",
+              thinking: accumulatedThinking,
            });
-            stream.push({ type: "start", partial: emptyPartial });
          }
-          if (!thinkingStarted) {
-            thinkingStarted = true;
-            const partial = buildStreamAssistantMessage({
-              model: modelInfo,
-              content: buildCurrentContent(),
-              stopReason: "stop",
-              usage: buildUsageWithNoCost({}),
-            });
-            stream.push({ type: "thinking_start", contentIndex: 0, partial });
+          if (accumulatedContent) {
+            parts.push({ type: "text", text: accumulatedContent });
          }
-          accumulatedThinking += thinkingDelta;
+          return parts;
+        };
+
+        const closeThinkingBlock = () => {
+          if (!thinkingStarted || thinkingEnded) {
+            return;
+          }
+          thinkingEnded = true;
          const partial = buildStreamAssistantMessage({
            model: modelInfo,
            content: buildCurrentContent(),
            stopReason: "stop",
@@ -761,85 +714,146 @@
            usage: buildUsageWithNoCost({}),
          });
          stream.push({
-            type: "thinking_delta",
+            type: "thinking_end",
            contentIndex: 0,
-            delta: thinkingDelta,
+            content: accumulatedThinking,
            partial,
          });
-        }
+        };

-        if (chunk.message?.content) {
-          const delta = chunk.message.content;
-
-          // Transition from thinking to text: close the thinking block first.
-          if (thinkingStarted && !thinkingEnded) {
-            closeThinkingBlock();
+        const closeTextBlock = () => {
+          if (!textBlockStarted || textBlockClosed) {
+            return;
          }
-
-          if (!streamStarted) {
-            streamStarted = true;
-            const emptyPartial = buildStreamAssistantMessage({
-              model: modelInfo,
-              content: [],
-              stopReason: "stop",
-              usage: buildUsageWithNoCost({}),
-            });
-            stream.push({ type: "start", partial: emptyPartial });
-          }
-          if (!textBlockStarted) {
-            textBlockStarted = true;
-            const partial = buildStreamAssistantMessage({
-              model: modelInfo,
-              content: buildCurrentContent(),
-              stopReason: "stop",
-              usage: buildUsageWithNoCost({}),
-            });
-            stream.push({ type: "text_start", contentIndex: textContentIndex(), partial });
-          }
-
-          accumulatedContent += delta;
+          textBlockClosed = true;
          const partial = buildStreamAssistantMessage({
            model: modelInfo,
            content: buildCurrentContent(),
            stopReason: "stop",
            usage: buildUsageWithNoCost({}),
          });
-          stream.push({ type: "text_delta", contentIndex: textContentIndex(), delta, partial });
+          stream.push({
+            type: "text_end",
+            contentIndex: textContentIndex(),
+            content: accumulatedContent,
+            partial,
+          });
+        };
+
+        for await (const chunk of parseNdjsonStream(reader)) {
+          const thinkingDelta = chunk.message?.thinking ?? chunk.message?.reasoning;
+          if (thinkingDelta) {
+            if (!streamStarted) {
+              streamStarted = true;
+              const emptyPartial = buildStreamAssistantMessage({
+                model: modelInfo,
+                content: [],
+                stopReason: "stop",
+                usage: buildUsageWithNoCost({}),
+              });
+              stream.push({ type: "start", partial: emptyPartial });
+            }
+            if (!thinkingStarted) {
+              thinkingStarted = true;
+              const partial = buildStreamAssistantMessage({
+                model: modelInfo,
+                content: buildCurrentContent(),
+                stopReason: "stop",
+                usage: buildUsageWithNoCost({}),
+              });
+              stream.push({ type: "thinking_start", contentIndex: 0, partial });
+            }
+            accumulatedThinking += thinkingDelta;
+            const partial = buildStreamAssistantMessage({
+              model: modelInfo,
+              content: buildCurrentContent(),
+              stopReason: "stop",
+              usage: buildUsageWithNoCost({}),
+            });
+            stream.push({
+              type: "thinking_delta",
+              contentIndex: 0,
+              delta: thinkingDelta,
+              partial,
+            });
+          }
+
+          if (chunk.message?.content) {
+            const delta = chunk.message.content;
+            if (thinkingStarted && !thinkingEnded) {
+              closeThinkingBlock();
+            }
+
+            if (!streamStarted) {
+              streamStarted = true;
+              const emptyPartial = buildStreamAssistantMessage({
+                model: modelInfo,
+                content: [],
+                stopReason: "stop",
+                usage: buildUsageWithNoCost({}),
+              });
+              stream.push({ type: "start", partial: emptyPartial });
+            }
+            if (!textBlockStarted) {
+              textBlockStarted = true;
+              const partial = buildStreamAssistantMessage({
+                model: modelInfo,
+                content: buildCurrentContent(),
+                stopReason: "stop",
+                usage: buildUsageWithNoCost({}),
+              });
+              stream.push({ type: "text_start", contentIndex: textContentIndex(), partial });
+            }
+
+            accumulatedContent += delta;
+            const partial = buildStreamAssistantMessage({
+              model: modelInfo,
+              content: buildCurrentContent(),
+              stopReason: "stop",
+              usage: buildUsageWithNoCost({}),
+            });
+            stream.push({
+              type: "text_delta",
+              contentIndex: textContentIndex(),
+              delta,
+              partial,
+            });
+          }
+          if (chunk.message?.tool_calls) {
+            closeThinkingBlock();
+            closeTextBlock();
+            accumulatedToolCalls.push(...chunk.message.tool_calls);
+          }
+          if (chunk.done) {
+            finalResponse = chunk;
+            break;
+          }
        }
-        if (chunk.message?.tool_calls) {
-          closeThinkingBlock();
-          closeTextBlock();
-          accumulatedToolCalls.push(...chunk.message.tool_calls);
+
+        if (!finalResponse) {
+          throw new Error("Ollama API stream ended without a final response");
        }
-        if (chunk.done) {
-          finalResponse = chunk;
-          break;
+
+        finalResponse.message.content = accumulatedContent;
+        if (accumulatedThinking) {
+          finalResponse.message.thinking = accumulatedThinking;
        }
+        if (accumulatedToolCalls.length > 0) {
+          finalResponse.message.tool_calls = accumulatedToolCalls;
+        }
+
+        const assistantMessage = buildAssistantMessage(finalResponse, modelInfo);
+        closeThinkingBlock();
+        closeTextBlock();
+
+        stream.push({
+          type: "done",
+          reason: assistantMessage.stopReason === "toolUse" ? "toolUse" : "stop",
+          message: assistantMessage,
+        });
+      } finally {
+        await release();
      }
-
-      if (!finalResponse) {
-        throw new Error("Ollama API stream ended without a final response");
-      }
-
-      finalResponse.message.content = accumulatedContent;
-      if (accumulatedThinking) {
-        finalResponse.message.thinking = accumulatedThinking;
-      }
-      if (accumulatedToolCalls.length > 0) {
-        finalResponse.message.tool_calls = accumulatedToolCalls;
-      }
-
-      const assistantMessage = buildAssistantMessage(finalResponse, modelInfo);
-
-      // Close any open blocks before emitting the done event.
-      closeThinkingBlock();
-      closeTextBlock();
-
-      stream.push({
-        type: "done",
-        reason: assistantMessage.stopReason === "toolUse" ? "toolUse" : "stop",
-        message: assistantMessage,
-      });
    } catch (err) {
      stream.push({
        type: "error",