Mirror of https://github.com/openclaw/openclaw.git, synced 2026-05-06 11:10:45 +00:00
fix: restore Ollama chat model IDs (#67457) (thanks @suboss87)
* fix(ollama): strip provider prefix from model ID in chat requests

  buildOllamaChatRequest passed params.modelId directly to the Ollama API without stripping the "ollama/" provider prefix. The embedding provider already handles this (normalizeEmbeddingModel at line 100), but the chat stream path did not. When setup writes the primary model as "ollama/<model>" or the model ID flows through without normalization, the Ollama API rejects it with a 404. Closes #67435

* ollama: guard chat fetch and streamline tests

* fix: restore Ollama chat model IDs (#67457) (thanks @suboss87)

* fix: preserve Ollama default chat fallback (#67457) (thanks @suboss87)

---------

Co-authored-by: Ayaan Zaidi <hi@obviy.us>
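For reference, the chat-path fix reduces to the same prefix strip the embedding provider already performs. The helper below is quoted from the diff further down; the trailing calls are illustrative additions, not part of the commit:

```ts
// Strip the "ollama/" provider prefix so the wire request carries the bare
// model id the Ollama API expects.
function normalizeOllamaWireModelId(modelId: string): string {
  const trimmed = modelId.trim();
  return trimmed.startsWith("ollama/") ? trimmed.slice("ollama/".length) : trimmed;
}

// Illustrative calls: both spellings now reach the API as the bare model id.
normalizeOllamaWireModelId("ollama/qwen3:14b-q8_0"); // => "qwen3:14b-q8_0"
normalizeOllamaWireModelId("qwen3:14b-q8_0");        // => "qwen3:14b-q8_0"
```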
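The "guard chat fetch" part of the stack swaps the direct `fetch` call for the plugin SDK's `fetchWithSsrFGuard`. As the stream.ts diff below shows, the guard takes `{ url, init, policy, auditContext }` and resolves to a `{ response, release }` pair. A minimal sketch of that calling pattern, assuming only what the diff itself exercises (the `postOllamaChatText` wrapper is hypothetical, for illustration):

```ts
import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime";
import { buildOllamaBaseUrlSsrFPolicy } from "./provider-models.js";

// Hypothetical wrapper showing the shape of the guarded call: the guard
// resolves to { response, release }, and release() must run even when
// reading the body throws, hence the try/finally.
async function postOllamaChatText(
  chatUrl: string,
  body: unknown,
  signal?: AbortSignal,
): Promise<string> {
  const { response, release } = await fetchWithSsrFGuard({
    url: chatUrl,
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(body),
      signal,
    },
    policy: buildOllamaBaseUrlSsrFPolicy(chatUrl),
    auditContext: "ollama-stream.chat",
  });
  try {
    if (!response.ok) {
      const errorText = await response.text().catch(() => "unknown error");
      throw new Error(`${response.status} ${errorText}`);
    }
    return await response.text();
  } finally {
    await release();
  }
}
```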
@@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai
- Discord/tool-call text: strip standalone Gemma-style `<function>...</function>` tool-call payloads from visible assistant text without truncating prose examples or trailing replies. (#67318) Thanks @joelnishanth.
- WhatsApp/web-session: drain the pending per-auth creds save queue before reopening sockets so reconnect-time auth bootstrap no longer races in-flight `creds.json` writes and falsely restores from backup. (#67464) Thanks @neeravmakwana.
- BlueBubbles/catchup: add a per-message retry ceiling (`catchup.maxFailureRetries`, default 10) so a persistently-failing message with a malformed payload no longer wedges the catchup cursor forever. After N consecutive `processMessage` failures against the same GUID, catchup logs a WARN, skips that message on subsequent sweeps, and lets the cursor advance past it. Transient failures still retry from the same point as before. Also fixes a lost-update race in the persistent dedupe file lock that silently dropped inbound GUIDs on concurrent writes, a dedupe file naming migration gap on version upgrade, and a balloon-event bypass that let catchup replay debouncer-coalesced events as standalone messages. (#67426, #66870) Thanks @omarshahine.
- Ollama/chat: strip the `ollama/` provider prefix from Ollama chat request model ids so configured refs like `ollama/qwen3:14b-q8_0` no longer 404 against the Ollama API. (#67457) Thanks @suboss87.

## 2026.4.15-beta.1

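As an aside on the BlueBubbles entry above: the retry-ceiling behavior it describes can be pictured as a per-GUID consecutive-failure counter. A hypothetical sketch only; `FailureTracker` and its method names are illustrative, not the shipped implementation:

```ts
// Hypothetical sketch of a per-message retry ceiling: after maxFailureRetries
// consecutive processMessage failures on the same GUID, the message is skipped
// on later sweeps so the catchup cursor can advance past it.
class FailureTracker {
  private readonly failures = new Map<string, number>();
  constructor(private readonly maxFailureRetries = 10) {}

  shouldSkip(guid: string): boolean {
    return (this.failures.get(guid) ?? 0) >= this.maxFailureRetries;
  }

  recordFailure(guid: string): void {
    this.failures.set(guid, (this.failures.get(guid) ?? 0) + 1);
  }

  // A success clears the counter, so transient failures still retry
  // from the same point as before.
  recordSuccess(guid: string): void {
    this.failures.delete(guid);
  }
}
```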
@@ -1,4 +1,13 @@
import { describe, expect, it, vi } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";

const { fetchWithSsrFGuardMock } = vi.hoisted(() => ({
fetchWithSsrFGuardMock: vi.fn(),
}));

vi.mock("openclaw/plugin-sdk/ssrf-runtime", () => ({
fetchWithSsrFGuard: fetchWithSsrFGuardMock,
}));

import {
buildOllamaChatRequest,
createConfiguredOllamaStreamFn,
@@ -9,6 +18,17 @@ import {
resolveOllamaBaseUrlForRun,
} from "./stream.js";

type GuardedFetchCall = {
url: string;
init?: RequestInit;
policy?: unknown;
auditContext?: string;
};

afterEach(() => {
fetchWithSsrFGuardMock.mockReset();
});

describe("buildOllamaChatRequest", () => {
it("omits tools when none are provided", () => {
expect(
@@ -24,6 +44,17 @@ describe("buildOllamaChatRequest", () => {
options: { num_ctx: 65536 },
});
});

it("strips the ollama/ prefix from chat model ids", () => {
expect(
buildOllamaChatRequest({
modelId: "ollama/qwen3:14b-q8_0",
messages: [{ role: "user", content: "hello" }],
}),
).toMatchObject({
model: "qwen3:14b-q8_0",
});
});
});

describe("convertToOllamaMessages", () => {
@@ -396,26 +427,23 @@ describe("parseNdjsonStream", () => {

async function withMockNdjsonFetch(
lines: string[],
run: (fetchMock: ReturnType<typeof vi.fn>) => Promise<void>,
run: (fetchMock: typeof fetchWithSsrFGuardMock) => Promise<void>,
): Promise<void> {
const originalFetch = globalThis.fetch;
const fetchMock = vi.fn(async () => {
fetchWithSsrFGuardMock.mockImplementation(async () => {
const payload = lines.join("\n");
return new Response(`${payload}\n`, {
status: 200,
headers: { "Content-Type": "application/x-ndjson" },
});
return {
response: new Response(`${payload}\n`, {
status: 200,
headers: { "Content-Type": "application/x-ndjson" },
}),
release: vi.fn(async () => undefined),
};
});
globalThis.fetch = fetchMock as unknown as typeof fetch;
try {
await run(fetchMock);
} finally {
globalThis.fetch = originalFetch;
}
await run(fetchWithSsrFGuardMock);
}

function createControlledNdjsonFetch(): {
fetchMock: ReturnType<typeof vi.fn>;
fetchImpl: () => Promise<{ response: Response; release: () => Promise<void> }>;
pushLine: (line: string) => void;
close: () => void;
} {
@@ -427,11 +455,12 @@ function createControlledNdjsonFetch(): {
},
});
return {
fetchMock: vi.fn(async () => {
return new Response(body, {
fetchImpl: async () => ({
response: new Response(body, {
status: 200,
headers: { "Content-Type": "application/x-ndjson" },
});
}),
release: vi.fn(async () => undefined),
}),
pushLine(line: string) {
if (!controller) {
@@ -448,6 +477,10 @@ function createControlledNdjsonFetch(): {
};
}

function getGuardedFetchCall(fetchMock: typeof fetchWithSsrFGuardMock): GuardedFetchCall {
return (fetchMock.mock.calls[0]?.[0] as GuardedFetchCall | undefined) ?? { url: "" };
}

async function createOllamaTestStream(params: {
baseUrl: string;
defaultHeaders?: Record<string, string>;
@@ -587,9 +620,8 @@ describe("createOllamaStreamFn streaming events", () => {
});

it("emits text_end as soon as Ollama switches from text to tool calls", async () => {
const originalFetch = globalThis.fetch;
const controlledFetch = createControlledNdjsonFetch();
globalThis.fetch = controlledFetch.fetchMock as unknown as typeof fetch;
fetchWithSsrFGuardMock.mockImplementation(controlledFetch.fetchImpl);

try {
const stream = await createOllamaTestStream({ baseUrl: "http://ollama-host:11434" });
@@ -651,7 +683,7 @@ describe("createOllamaStreamFn streaming events", () => {
expect(doneEvent).toMatchObject({ value: undefined, done: true });
}
} finally {
globalThis.fetch = originalFetch;
fetchWithSsrFGuardMock.mockReset();
}
});

@@ -713,8 +745,10 @@ describe("createOllamaStreamFn", () => {
expect(events.at(-1)?.type).toBe("done");

expect(fetchMock).toHaveBeenCalledTimes(1);
const [url, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
expect(url).toBe("http://ollama-host:11434/api/chat");
const request = getGuardedFetchCall(fetchMock);
expect(request.url).toBe("http://ollama-host:11434/api/chat");
expect(request.auditContext).toBe("ollama-stream.chat");
const requestInit = request.init ?? {};
expect(requestInit.signal).toBe(signal);
if (typeof requestInit.body !== "string") {
throw new Error("Expected string request body");
@@ -729,6 +763,28 @@ describe("createOllamaStreamFn", () => {
);
});

it("uses the default loopback policy when baseUrl is empty", async () => {
await withMockNdjsonFetch(
[
'{"model":"m","created_at":"t","message":{"role":"assistant","content":"ok"},"done":false}',
'{"model":"m","created_at":"t","message":{"role":"assistant","content":""},"done":true,"prompt_eval_count":1,"eval_count":1}',
],
async (fetchMock) => {
const stream = await createOllamaTestStream({ baseUrl: "" });

const events = await collectStreamEvents(stream);
expect(events.at(-1)?.type).toBe("done");

const request = getGuardedFetchCall(fetchMock);
expect(request.url).toBe("http://127.0.0.1:11434/api/chat");
expect(request.policy).toMatchObject({
hostnameAllowlist: ["127.0.0.1"],
allowPrivateNetwork: true,
});
},
);
});

it("merges default headers and allows request headers to override them", async () => {
await withMockNdjsonFetch(
[
@@ -753,7 +809,7 @@ describe("createOllamaStreamFn", () => {
const events = await collectStreamEvents(stream);
expect(events.at(-1)?.type).toBe("done");

const [, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
const requestInit = getGuardedFetchCall(fetchMock).init ?? {};
expect(requestInit.headers).toMatchObject({
"Content-Type": "application/json",
"X-OLLAMA-KEY": "provider-secret",
@@ -785,7 +841,7 @@ describe("createOllamaStreamFn", () => {
});

await collectStreamEvents(stream);
const [, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
const requestInit = getGuardedFetchCall(fetchMock).init ?? {};
expect(requestInit.headers).toMatchObject({
Authorization: "Bearer proxy-token",
});
@@ -821,7 +877,7 @@ describe("createOllamaStreamFn", () => {
);

await collectStreamEvents(stream);
const [, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
const requestInit = getGuardedFetchCall(fetchMock).init ?? {};
expect(requestInit.headers).toMatchObject({
Authorization: "Bearer real-token",
});
@@ -830,14 +886,13 @@ describe("createOllamaStreamFn", () => {
});

it("surfaces non-2xx HTTP response as status-prefixed error", async () => {
const originalFetch = globalThis.fetch;
const fetchMock = vi.fn(async () => {
return new Response("Service Unavailable", {
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response("Service Unavailable", {
status: 503,
statusText: "Service Unavailable",
});
}),
release: vi.fn(async () => undefined),
});
globalThis.fetch = fetchMock as unknown as typeof fetch;
try {
const stream = await createOllamaTestStream({ baseUrl: "http://ollama-host:11434" });
const events = await collectStreamEvents(stream);
@@ -850,7 +905,7 @@ describe("createOllamaStreamFn", () => {
// extractLeadingHttpStatus can parse it for failover/retry logic.
expect(errorEvent!.error.errorMessage).toMatch(/^503\b/);
} finally {
globalThis.fetch = originalFetch;
fetchWithSsrFGuardMock.mockReset();
}
});

@@ -962,8 +1017,9 @@ describe("createConfiguredOllamaStreamFn", () => {
);

await collectStreamEvents(stream);
const [url, requestInit] = fetchMock.mock.calls[0] as unknown as [string, RequestInit];
expect(url).toBe("http://provider-host:11434/api/chat");
const request = getGuardedFetchCall(fetchMock);
expect(request.url).toBe("http://provider-host:11434/api/chat");
const requestInit = request.init ?? {};
expect(requestInit.headers).toMatchObject({
Authorization: "Bearer proxy-token",
});

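The tests above stub Ollama's streaming endpoint with `application/x-ndjson` bodies: one JSON object per newline-terminated line. For orientation, a self-contained reader in the same spirit (this is not the `parseNdjsonStream` under test, just a sketch of the format it consumes):

```ts
// Minimal NDJSON reader: buffer decoded bytes, split on newlines, and
// JSON-parse each complete line; the trailing partial line stays buffered.
async function* readNdjson(
  reader: ReadableStreamDefaultReader<Uint8Array>,
): AsyncGenerator<unknown> {
  const decoder = new TextDecoder();
  let buffer = "";
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // keep the incomplete tail for the next chunk
    for (const line of lines) {
      if (line.trim()) yield JSON.parse(line);
    }
  }
  if (buffer.trim()) yield JSON.parse(buffer); // flush a final unterminated line
}
```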
@@ -1,4 +1,13 @@
import { afterEach, describe, expect, it, vi } from "vitest";

const { fetchWithSsrFGuardMock } = vi.hoisted(() => ({
fetchWithSsrFGuardMock: vi.fn(),
}));

vi.mock("openclaw/plugin-sdk/ssrf-runtime", () => ({
fetchWithSsrFGuard: fetchWithSsrFGuardMock,
}));

import { buildAssistantMessage, createOllamaStreamFn } from "./stream.js";

function makeOllamaResponse(params: {
@@ -79,7 +88,9 @@ describe("buildAssistantMessage", () => {
});

describe("createOllamaStreamFn thinking events", () => {
afterEach(() => vi.unstubAllGlobals());
afterEach(() => {
fetchWithSsrFGuardMock.mockReset();
});

function makeNdjsonBody(chunks: Array<Record<string, unknown>>): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
@@ -124,11 +135,10 @@ describe("createOllamaStreamFn thinking events", () => {
];

const body = makeNdjsonBody(thinkingChunks);
const fetchMock = vi.fn().mockResolvedValue({
ok: true,
body,
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response(body, { status: 200 }),
release: vi.fn(async () => undefined),
});
vi.stubGlobal("fetch", fetchMock);

const streamFn = createOllamaStreamFn("http://localhost:11434");
const stream = streamFn(
@@ -151,28 +161,23 @@ describe("createOllamaStreamFn thinking events", () => {
expect(eventTypes).toContain("text_delta");
expect(eventTypes).toContain("done");

// thinking_start comes before text_start
const thinkingStartIndex = eventTypes.indexOf("thinking_start");
const textStartIndex = eventTypes.indexOf("text_start");
expect(thinkingStartIndex).toBeLessThan(textStartIndex);

// thinking_end comes before text_start
const thinkingEndIndex = eventTypes.indexOf("thinking_end");
expect(thinkingEndIndex).toBeLessThan(textStartIndex);

// Thinking deltas have correct content
const thinkingDeltas = events.filter((e) => e.type === "thinking_delta");
expect(thinkingDeltas).toHaveLength(2);
expect(thinkingDeltas[0].delta).toBe("Step 1");
expect(thinkingDeltas[1].delta).toBe(" and step 2");

// Content index: thinking at 0, text at 1
const thinkingStart = events.find((e) => e.type === "thinking_start");
expect(thinkingStart?.contentIndex).toBe(0);
const textStart = events.find((e) => e.type === "text_start");
expect(textStart?.contentIndex).toBe(1);

// Final message has thinking block
const done = events.find((e) => e.type === "done") as { message?: { content: unknown[] } };
const content = done?.message?.content ?? [];
expect(content[0]).toMatchObject({ type: "thinking", thinking: "Step 1 and step 2" });
@@ -199,7 +204,10 @@ describe("createOllamaStreamFn thinking events", () => {
];

const body = makeNdjsonBody(chunks);
vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ ok: true, body }));
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response(body, { status: 200 }),
release: vi.fn(async () => undefined),
});

const streamFn = createOllamaStreamFn("http://localhost:11434");
const stream = streamFn(
@@ -221,7 +229,6 @@ describe("createOllamaStreamFn thinking events", () => {
expect(eventTypes).toContain("text_delta");
expect(eventTypes).toContain("done");

// Text content index should be 0 (no thinking block)
const textStart = events.find((e) => e.type === "text_start") as { contentIndex?: number };
expect(textStart?.contentIndex).toBe(0);
});

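The invariant those thinking-event tests pin down: a thinking block, when present, always occupies content index 0, pushing the text block to index 1; with no thinking block, text starts at index 0. In isolation, mirroring the `textContentIndex` closure from the stream.ts diff below:

```ts
// Content-index bookkeeping asserted by the tests above: thinking (if any)
// is index 0, text follows at 1; with no thinking block, text sits at 0.
const textContentIndex = (thinkingStarted: boolean): number => (thinkingStarted ? 1 : 0);

// Expected event order when the model emits thinking first:
//   start -> thinking_start(0) -> thinking_delta(0)* -> thinking_end(0)
//         -> text_start(1) -> text_delta(1)* -> text_end(1) -> done
```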
@@ -27,12 +27,14 @@ import {
streamWithPayloadPatch,
} from "openclaw/plugin-sdk/provider-stream-shared";
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime";
import { normalizeLowercaseStringOrEmpty, readStringValue } from "openclaw/plugin-sdk/text-runtime";
import { OLLAMA_DEFAULT_BASE_URL } from "./defaults.js";
import {
parseJsonObjectPreservingUnsafeIntegers,
parseJsonPreservingUnsafeIntegers,
} from "./ollama-json.js";
import { buildOllamaBaseUrlSsrFPolicy } from "./provider-models.js";

const log = createSubsystemLogger("ollama-stream");

@@ -220,6 +222,11 @@ export function createConfiguredOllamaCompatStreamWrapper(
// Ollama compat wrapper now owns more than num_ctx injection.
export const createConfiguredOllamaCompatNumCtxWrapper = createConfiguredOllamaCompatStreamWrapper;

function normalizeOllamaWireModelId(modelId: string): string {
const trimmed = modelId.trim();
return trimmed.startsWith("ollama/") ? trimmed.slice("ollama/".length) : trimmed;
}

export function buildOllamaChatRequest(params: {
modelId: string;
messages: OllamaChatMessage[];
@@ -228,7 +235,7 @@ export function buildOllamaChatRequest(params: {
stream?: boolean;
}): OllamaChatRequest {
return {
model: params.modelId,
model: normalizeOllamaWireModelId(params.modelId),
messages: params.messages,
stream: params.stream ?? true,
...(params.tools && params.tools.length > 0 ? { tools: params.tools } : {}),
@@ -606,6 +613,7 @@ export function createOllamaStreamFn(
defaultHeaders?: Record<string, string>,
): StreamFn {
const chatUrl = resolveOllamaChatUrl(baseUrl);
const ssrfPolicy = buildOllamaBaseUrlSsrFPolicy(chatUrl);

return (model, context, options) => {
const stream = createAssistantMessageEventStream();
@@ -646,114 +654,59 @@ export function createOllamaStreamFn(
headers.Authorization = `Bearer ${options.apiKey}`;
}

const response = await fetch(chatUrl, {
method: "POST",
headers,
body: JSON.stringify(body),
signal: options?.signal,
const { response, release } = await fetchWithSsrFGuard({
url: chatUrl,
init: {
method: "POST",
headers,
body: JSON.stringify(body),
signal: options?.signal,
},
policy: ssrfPolicy,
auditContext: "ollama-stream.chat",
});

if (!response.ok) {
const errorText = await response.text().catch(() => "unknown error");
throw new Error(`${response.status} ${errorText}`);
}
if (!response.body) {
throw new Error("Ollama API returned empty response body");
}

const reader = response.body.getReader();
let accumulatedContent = "";
let accumulatedThinking = "";
const accumulatedToolCalls: OllamaToolCall[] = [];
let finalResponse: OllamaChatResponse | undefined;
const modelInfo = { api: model.api, provider: model.provider, id: model.id };
let streamStarted = false;
let thinkingStarted = false;
let thinkingEnded = false;
let textBlockStarted = false;
let textBlockClosed = false;

// Content index tracking: thinking block (if present) is index 0,
// text block follows at index 1 (or 0 when no thinking).
const textContentIndex = () => (thinkingStarted ? 1 : 0);

const buildCurrentContent = (): (TextContent | ThinkingContent | ToolCall)[] => {
const parts: (TextContent | ThinkingContent | ToolCall)[] = [];
if (accumulatedThinking) {
parts.push({
type: "thinking",
thinking: accumulatedThinking,
});
try {
if (!response.ok) {
const errorText = await response.text().catch(() => "unknown error");
throw new Error(`${response.status} ${errorText}`);
}
if (accumulatedContent) {
parts.push({ type: "text", text: accumulatedContent });
if (!response.body) {
throw new Error("Ollama API returned empty response body");
}
return parts;
};

const closeThinkingBlock = () => {
if (!thinkingStarted || thinkingEnded) {
return;
}
thinkingEnded = true;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({
type: "thinking_end",
contentIndex: 0,
content: accumulatedThinking,
partial,
});
};
const reader = response.body.getReader();
let accumulatedContent = "";
let accumulatedThinking = "";
const accumulatedToolCalls: OllamaToolCall[] = [];
let finalResponse: OllamaChatResponse | undefined;
const modelInfo = { api: model.api, provider: model.provider, id: model.id };
let streamStarted = false;
let thinkingStarted = false;
let thinkingEnded = false;
let textBlockStarted = false;
let textBlockClosed = false;
const textContentIndex = () => (thinkingStarted ? 1 : 0);

const closeTextBlock = () => {
if (!textBlockStarted || textBlockClosed) {
return;
}
textBlockClosed = true;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({
type: "text_end",
contentIndex: textContentIndex(),
content: accumulatedContent,
partial,
});
};

for await (const chunk of parseNdjsonStream(reader)) {
// Handle thinking/reasoning deltas from Ollama's native think mode.
const thinkingDelta = chunk.message?.thinking ?? chunk.message?.reasoning;
if (thinkingDelta) {
if (!streamStarted) {
streamStarted = true;
const emptyPartial = buildStreamAssistantMessage({
model: modelInfo,
content: [],
stopReason: "stop",
usage: buildUsageWithNoCost({}),
const buildCurrentContent = (): (TextContent | ThinkingContent | ToolCall)[] => {
const parts: (TextContent | ThinkingContent | ToolCall)[] = [];
if (accumulatedThinking) {
parts.push({
type: "thinking",
thinking: accumulatedThinking,
});
stream.push({ type: "start", partial: emptyPartial });
}
if (!thinkingStarted) {
thinkingStarted = true;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({ type: "thinking_start", contentIndex: 0, partial });
if (accumulatedContent) {
parts.push({ type: "text", text: accumulatedContent });
}
accumulatedThinking += thinkingDelta;
return parts;
};

const closeThinkingBlock = () => {
if (!thinkingStarted || thinkingEnded) {
return;
}
thinkingEnded = true;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
@@ -761,85 +714,146 @@ export function createOllamaStreamFn(
usage: buildUsageWithNoCost({}),
});
stream.push({
type: "thinking_delta",
type: "thinking_end",
contentIndex: 0,
delta: thinkingDelta,
content: accumulatedThinking,
partial,
});
}
};

if (chunk.message?.content) {
const delta = chunk.message.content;

// Transition from thinking to text: close the thinking block first.
if (thinkingStarted && !thinkingEnded) {
closeThinkingBlock();
const closeTextBlock = () => {
if (!textBlockStarted || textBlockClosed) {
return;
}

if (!streamStarted) {
streamStarted = true;
const emptyPartial = buildStreamAssistantMessage({
model: modelInfo,
content: [],
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({ type: "start", partial: emptyPartial });
}
if (!textBlockStarted) {
textBlockStarted = true;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({ type: "text_start", contentIndex: textContentIndex(), partial });
}

accumulatedContent += delta;
textBlockClosed = true;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({ type: "text_delta", contentIndex: textContentIndex(), delta, partial });
stream.push({
type: "text_end",
contentIndex: textContentIndex(),
content: accumulatedContent,
partial,
});
};

for await (const chunk of parseNdjsonStream(reader)) {
const thinkingDelta = chunk.message?.thinking ?? chunk.message?.reasoning;
if (thinkingDelta) {
if (!streamStarted) {
streamStarted = true;
const emptyPartial = buildStreamAssistantMessage({
model: modelInfo,
content: [],
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({ type: "start", partial: emptyPartial });
}
if (!thinkingStarted) {
thinkingStarted = true;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({ type: "thinking_start", contentIndex: 0, partial });
}
accumulatedThinking += thinkingDelta;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({
type: "thinking_delta",
contentIndex: 0,
delta: thinkingDelta,
partial,
});
}

if (chunk.message?.content) {
const delta = chunk.message.content;
if (thinkingStarted && !thinkingEnded) {
closeThinkingBlock();
}

if (!streamStarted) {
streamStarted = true;
const emptyPartial = buildStreamAssistantMessage({
model: modelInfo,
content: [],
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({ type: "start", partial: emptyPartial });
}
if (!textBlockStarted) {
textBlockStarted = true;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({ type: "text_start", contentIndex: textContentIndex(), partial });
}

accumulatedContent += delta;
const partial = buildStreamAssistantMessage({
model: modelInfo,
content: buildCurrentContent(),
stopReason: "stop",
usage: buildUsageWithNoCost({}),
});
stream.push({
type: "text_delta",
contentIndex: textContentIndex(),
delta,
partial,
});
}
if (chunk.message?.tool_calls) {
closeThinkingBlock();
closeTextBlock();
accumulatedToolCalls.push(...chunk.message.tool_calls);
}
if (chunk.done) {
finalResponse = chunk;
break;
}
}
if (chunk.message?.tool_calls) {
closeThinkingBlock();
closeTextBlock();
accumulatedToolCalls.push(...chunk.message.tool_calls);

if (!finalResponse) {
throw new Error("Ollama API stream ended without a final response");
}
if (chunk.done) {
finalResponse = chunk;
break;

finalResponse.message.content = accumulatedContent;
if (accumulatedThinking) {
finalResponse.message.thinking = accumulatedThinking;
}
if (accumulatedToolCalls.length > 0) {
finalResponse.message.tool_calls = accumulatedToolCalls;
}

const assistantMessage = buildAssistantMessage(finalResponse, modelInfo);
closeThinkingBlock();
closeTextBlock();

stream.push({
type: "done",
reason: assistantMessage.stopReason === "toolUse" ? "toolUse" : "stop",
message: assistantMessage,
});
} finally {
await release();
}

if (!finalResponse) {
throw new Error("Ollama API stream ended without a final response");
}

finalResponse.message.content = accumulatedContent;
if (accumulatedThinking) {
finalResponse.message.thinking = accumulatedThinking;
}
if (accumulatedToolCalls.length > 0) {
finalResponse.message.tool_calls = accumulatedToolCalls;
}

const assistantMessage = buildAssistantMessage(finalResponse, modelInfo);

// Close any open blocks before emitting the done event.
closeThinkingBlock();
closeTextBlock();

stream.push({
type: "done",
reason: assistantMessage.stopReason === "toolUse" ? "toolUse" : "stop",
message: assistantMessage,
});
} catch (err) {
stream.push({
type: "error",