refactor: move stream payload compat into provider seams

Peter Steinberger
2026-03-28 00:07:38 +00:00
parent 958e3a4c69
commit 8e687613b6
16 changed files with 436 additions and 271 deletions

View File

@@ -23,6 +23,8 @@ export {
promptAndConfigureOllama,
} from "./src/setup.js";
export {
buildOllamaChatRequest,
createConfiguredOllamaCompatStreamWrapper,
isOllamaCompatProvider,
resolveOllamaCompatNumCtxEnabled,
shouldInjectOllamaCompatNumCtx,

View File

@@ -19,7 +19,7 @@ const promptAndConfigureOllamaMock = vi.hoisted(() =>
);
const ensureOllamaModelPulledMock = vi.hoisted(() => vi.fn(async () => {}));
vi.mock("./src/setup.js", () => ({
vi.mock("openclaw/plugin-sdk/provider-setup", () => ({
promptAndConfigureOllama: promptAndConfigureOllamaMock,
ensureOllamaModelPulled: ensureOllamaModelPulledMock,
configureOllamaNonInteractive: vi.fn(),
@@ -141,4 +141,58 @@ describe("ollama plugin", () => {
expect(baseStreamFn).toHaveBeenCalledTimes(1);
expect((payloadSeen?.options as Record<string, unknown> | undefined)?.num_ctx).toBe(202752);
});
it("wraps native Ollama payloads with top-level think=false when thinking is off", () => {
const provider = registerProvider();
let payloadSeen: Record<string, unknown> | undefined;
const baseStreamFn = vi.fn((_model, _context, options) => {
const payload: Record<string, unknown> = {
messages: [],
options: { num_ctx: 65536 },
stream: true,
};
options?.onPayload?.(payload, _model);
payloadSeen = payload;
return {} as never;
});
const wrapped = provider.wrapStreamFn?.({
config: {
models: {
providers: {
ollama: {
api: "ollama",
baseUrl: "http://127.0.0.1:11434",
models: [],
},
},
},
},
provider: "ollama",
modelId: "qwen3.5:9b",
thinkingLevel: "off",
model: {
api: "ollama",
provider: "ollama",
id: "qwen3.5:9b",
baseUrl: "http://127.0.0.1:11434",
contextWindow: 131_072,
},
streamFn: baseStreamFn,
});
expect(typeof wrapped).toBe("function");
void wrapped?.(
{
api: "ollama",
provider: "ollama",
id: "qwen3.5:9b",
} as never,
{} as never,
{},
);
expect(baseStreamFn).toHaveBeenCalledTimes(1);
expect(payloadSeen?.think).toBe(false);
expect((payloadSeen?.options as Record<string, unknown> | undefined)?.think).toBeUndefined();
});
});

View File

@@ -13,7 +13,7 @@ import {
} from "./src/embedding-provider.js";
import { resolveOllamaApiBase } from "./src/provider-models.js";
import {
createConfiguredOllamaCompatNumCtxWrapper,
createConfiguredOllamaCompatStreamWrapper,
createConfiguredOllamaStreamFn,
} from "./src/stream.js";
@@ -145,7 +145,7 @@ export default definePluginEntry({
});
},
wrapStreamFn: (ctx) => {
return createConfiguredOllamaCompatNumCtxWrapper(ctx);
return createConfiguredOllamaCompatStreamWrapper(ctx);
},
createEmbeddingProvider: async ({ config, model, remote }) => {
const { provider, client } = await createOllamaEmbeddingProvider({

View File

@@ -1,5 +1,7 @@
export {
buildAssistantMessage,
buildOllamaChatRequest,
createConfiguredOllamaCompatStreamWrapper,
convertToOllamaMessages,
createConfiguredOllamaCompatNumCtxWrapper,
createConfiguredOllamaStreamFn,

View File

@@ -22,6 +22,7 @@ import {
import {
createMoonshotThinkingWrapper,
resolveMoonshotThinkingType,
streamWithPayloadPatch,
} from "openclaw/plugin-sdk/provider-stream";
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime";
import { OLLAMA_DEFAULT_BASE_URL } from "./defaults.js";
@@ -132,22 +133,26 @@ export function shouldInjectOllamaCompatNumCtx(params: {
export function wrapOllamaCompatNumCtx(baseFn: StreamFn | undefined, numCtx: number): StreamFn {
const streamFn = baseFn ?? streamSimple;
return (model, context, options) =>
streamFn(model, context, {
...options,
onPayload: (payload: unknown) => {
if (!payload || typeof payload !== "object") {
return options?.onPayload?.(payload, model);
}
const payloadRecord = payload as Record<string, unknown>;
if (!payloadRecord.options || typeof payloadRecord.options !== "object") {
payloadRecord.options = {};
}
(payloadRecord.options as Record<string, unknown>).num_ctx = numCtx;
return options?.onPayload?.(payload, model);
},
streamWithPayloadPatch(streamFn, model, context, options, (payloadRecord) => {
if (!payloadRecord.options || typeof payloadRecord.options !== "object") {
payloadRecord.options = {};
}
(payloadRecord.options as Record<string, unknown>).num_ctx = numCtx;
});
}
function createOllamaThinkingOffWrapper(baseFn: StreamFn | undefined): StreamFn {
const streamFn = baseFn ?? streamSimple;
return (model, context, options) => {
if (model.api !== "ollama") {
return streamFn(model, context, options);
}
return streamWithPayloadPatch(streamFn, model, context, options, (payloadRecord) => {
payloadRecord.think = false;
});
};
}
function resolveOllamaCompatNumCtx(model: ProviderRuntimeModel): number {
return Math.max(1, Math.floor(model.contextWindow ?? model.maxTokens ?? DEFAULT_CONTEXT_TOKENS));
}
@@ -157,11 +162,12 @@ function isOllamaCloudKimiModelRef(modelId: string): boolean {
return normalizedModelId.startsWith("kimi-k") && normalizedModelId.includes(":cloud");
}
export function createConfiguredOllamaCompatNumCtxWrapper(
export function createConfiguredOllamaCompatStreamWrapper(
ctx: ProviderWrapStreamFnContext,
): StreamFn | undefined {
let streamFn = ctx.streamFn;
const model = ctx.model;
let injectNumCtx = false;
if (model) {
const providerId =
@@ -175,10 +181,18 @@ export function createConfiguredOllamaCompatNumCtxWrapper(
providerId,
})
) {
streamFn = wrapOllamaCompatNumCtx(streamFn, resolveOllamaCompatNumCtx(model));
injectNumCtx = true;
}
}
if (injectNumCtx && model) {
streamFn = wrapOllamaCompatNumCtx(streamFn, resolveOllamaCompatNumCtx(model));
}
if (ctx.thinkingLevel === "off") {
streamFn = createOllamaThinkingOffWrapper(streamFn);
}
if (normalizeProviderId(ctx.provider) === "ollama" && isOllamaCloudKimiModelRef(ctx.modelId)) {
const thinkingType = resolveMoonshotThinkingType({
configuredThinking: ctx.extraParams?.thinking,
@@ -190,6 +204,26 @@ export function createConfiguredOllamaCompatNumCtxWrapper(
return streamFn;
}
// Backward-compatible alias for existing imports/tests while the broader
// Ollama compat wrapper now owns more than num_ctx injection.
export const createConfiguredOllamaCompatNumCtxWrapper = createConfiguredOllamaCompatStreamWrapper;
export function buildOllamaChatRequest(params: {
modelId: string;
messages: OllamaChatMessage[];
tools?: OllamaTool[];
options?: Record<string, unknown>;
stream?: boolean;
}): OllamaChatRequest {
return {
model: params.modelId,
messages: params.messages,
stream: params.stream ?? true,
...(params.tools && params.tools.length > 0 ? { tools: params.tools } : {}),
...(params.options ? { options: params.options } : {}),
};
}
type StreamModelDescriptor = {
api: string;
provider: string;
@@ -656,13 +690,13 @@ export function createOllamaStreamFn(
ollamaOptions.num_predict = options.maxTokens;
}
const body: OllamaChatRequest = {
model: model.id,
const body = buildOllamaChatRequest({
modelId: model.id,
messages: ollamaMessages,
stream: true,
...(ollamaTools.length > 0 ? { tools: ollamaTools } : {}),
tools: ollamaTools,
options: ollamaOptions,
};
});
options?.onPayload?.(body, model);
const headers: Record<string, string> = {
"Content-Type": "application/json",

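The shared helper this commit leans on, streamWithPayloadPatch, lives in stream-payload-utils.ts, which this view does not expand. Below is a self-contained sketch of its likely shape, reconstructed from the call sites above; the local type aliases (OnPayload, SketchStreamFn) and the exact model-forwarding behavior are assumptions, not the repository's real signatures.

// Hypothetical sketch of streamWithPayloadPatch, inferred from its call
// sites; the real implementation in stream-payload-utils.ts may differ.
type OnPayload = (payload: unknown, model?: unknown) => unknown;
type StreamOptions = { onPayload?: OnPayload } & Record<string, unknown>;
type SketchStreamFn = (model: unknown, context: unknown, options?: StreamOptions) => unknown;

export function streamWithPayloadPatch(
  streamFn: SketchStreamFn,
  model: unknown,
  context: unknown,
  options: StreamOptions | undefined,
  patch: (payload: Record<string, unknown>) => void,
): unknown {
  return streamFn(model, context, {
    ...options,
    onPayload: (payload, payloadModel) => {
      // Non-object payloads pass straight through, matching the guards the
      // previously inlined wrappers used.
      if (payload && typeof payload === "object") {
        patch(payload as Record<string, unknown>);
      }
      return options?.onPayload?.(payload, payloadModel ?? model);
    },
  });
}

Centralizing this pattern removes the repeated onPayload plumbing that each wrapper below previously duplicated by hand.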
View File

@@ -1,5 +1,6 @@
import { describe, expect, it, vi } from "vitest";
import {
buildOllamaChatRequest,
createConfiguredOllamaStreamFn,
createOllamaStreamFn,
convertToOllamaMessages,
@@ -9,6 +10,23 @@ import {
} from "../plugin-sdk/ollama.js";
import { applyExtraParamsToAgent } from "./pi-embedded-runner/extra-params.js";
describe("buildOllamaChatRequest", () => {
it("omits tools when none are provided", () => {
expect(
buildOllamaChatRequest({
modelId: "qwen3.5:9b",
messages: [{ role: "user", content: "hello" }],
options: { num_ctx: 65536 },
}),
).toEqual({
model: "qwen3.5:9b",
messages: [{ role: "user", content: "hello" }],
stream: true,
options: { num_ctx: 65536 },
});
});
});
describe("convertToOllamaMessages", () => {
it("converts user text messages", () => {
const messages = [{ role: "user", content: "hello" }];

View File

@@ -1,5 +1,6 @@
import type { Model } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { __testing as extraParamsTesting } from "./extra-params.js";
import { runExtraParamsCase } from "./extra-params.test-support.js";
vi.mock("@mariozechner/pi-ai", async (importOriginal) => {
@@ -13,8 +14,37 @@ vi.mock("@mariozechner/pi-ai", async (importOriginal) => {
};
});
describe("extra-params: Ollama thinking payload compatibility", () => {
it("injects top-level think=false when thinkingLevel is off", () => {
beforeEach(() => {
extraParamsTesting.setProviderRuntimeDepsForTest({
prepareProviderExtraParams: ({ context }) => context.extraParams,
wrapProviderStreamFn: ({ provider, context }) => {
if (provider !== "ollama" || context.thinkingLevel !== "off") {
return context.streamFn;
}
const baseStreamFn = context.streamFn;
if (!baseStreamFn) {
return undefined;
}
return (model, streamContext, options) =>
baseStreamFn(model, streamContext, {
...options,
onPayload: (payload, payloadModel) => {
if (payload && typeof payload === "object") {
(payload as Record<string, unknown>).think = false;
}
return options?.onPayload?.(payload, payloadModel);
},
});
},
});
});
afterEach(() => {
extraParamsTesting.resetProviderRuntimeDepsForTest();
});
describe("extra-params: Ollama plugin handoff", () => {
it("passes thinking-off intent through the provider runtime wrapper seam", () => {
const payload = runExtraParamsCase({
applyProvider: "ollama",
applyModelId: "qwen3.5:9b",
@@ -39,7 +69,7 @@ describe("extra-params: Ollama thinking payload compatibility", () => {
expect((payload.options as Record<string, unknown>).think).toBeUndefined();
});
it("does not inject think=false for non-ollama models", () => {
it("does not apply the plugin wrapper for non-ollama providers", () => {
const payload = runExtraParamsCase({
applyProvider: "openai",
applyModelId: "gpt-5.4",
@@ -58,7 +88,7 @@ describe("extra-params: Ollama thinking payload compatibility", () => {
expect(payload.think).toBeUndefined();
});
it("does not inject think=false when thinkingLevel is not off", () => {
it("does not apply the plugin wrapper when thinkingLevel is not off", () => {
const payload = runExtraParamsCase({
applyProvider: "ollama",
applyModelId: "qwen3.5:9b",

View File

@@ -10,7 +10,10 @@ export async function createPiAiStreamSimpleMock(
...original,
streamSimple: vi.fn(() => ({
push: vi.fn(),
result: vi.fn(),
result: vi.fn(async () => undefined),
[Symbol.asyncIterator]: vi.fn(async function* () {
// Minimal async stream shape for wrappers that patch iteration/result.
}),
})),
};
}

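The mock gains a resolving result() and an async iterator because wrappers layered by this commit may hand the returned stream to code that iterates it or awaits its result; a bare object there would throw at runtime. A small self-contained sketch of that consumption pattern, with stand-in names rather than repo APIs:

// Illustrative consumer showing why the mock needs result() and
// Symbol.asyncIterator; drain and MockStream are stand-ins, not repo APIs.
type MockStream = AsyncIterable<unknown> & { result: () => Promise<unknown> };

async function drain(stream: MockStream): Promise<unknown> {
  for await (const event of stream) {
    void event; // the empty mock yields nothing, so the loop exits immediately
  }
  return stream.result(); // resolves to undefined for the mock
}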
View File

@@ -9,6 +9,18 @@ export type ExtraParamsCapture<TPayload extends Record<string, unknown>> = {
payload: TPayload;
};
function createMockStream(): ReturnType<StreamFn> {
return {
push() {},
async result() {
return undefined;
},
async *[Symbol.asyncIterator]() {
// Minimal async stream surface for wrappers that decorate iteration.
},
} as unknown as ReturnType<StreamFn>;
}
type RunExtraParamsCaseParams<
TApi extends "openai-completions" | "openai-responses",
TPayload extends Record<string, unknown>,
@@ -34,7 +46,7 @@ export function runExtraParamsCase<
const baseStreamFn: StreamFn = (model, _context, options) => {
captured.headers = options?.headers;
options?.onPayload?.(params.payload, model);
return {} as ReturnType<StreamFn>;
return createMockStream();
};
const agent = { streamFn: baseStreamFn };

View File

@@ -38,6 +38,7 @@ import {
resolveOpenAIFastMode,
resolveOpenAIServiceTier,
} from "./openai-stream-wrappers.js";
import { streamWithPayloadPatch } from "./stream-payload-utils.js";
import { createXaiFastModeWrapper } from "./xai-stream-wrappers.js";
const defaultProviderRuntimeDeps = {
@@ -271,19 +272,168 @@ function createParallelToolCallsWrapper(
log.debug(
`applying parallel_tool_calls=${enabled} for ${model.provider ?? "unknown"}/${model.id ?? "unknown"} api=${model.api}`,
);
const originalOnPayload = options?.onPayload;
return underlying(model, context, {
...options,
onPayload: (payload) => {
if (payload && typeof payload === "object") {
(payload as Record<string, unknown>).parallel_tool_calls = enabled;
}
return originalOnPayload?.(payload, model);
},
return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
payloadObj.parallel_tool_calls = enabled;
});
};
}
type ApplyExtraParamsContext = {
agent: { streamFn?: StreamFn };
cfg: OpenClawConfig | undefined;
provider: string;
modelId: string;
workspaceDir?: string;
thinkingLevel?: ThinkLevel;
model?: ProviderRuntimeModel;
effectiveExtraParams: Record<string, unknown>;
resolvedExtraParams?: Record<string, unknown>;
override?: Record<string, unknown>;
};
function applyPrePluginStreamWrappers(ctx: ApplyExtraParamsContext): void {
if (ctx.provider === "openai" || ctx.provider === "openai-codex") {
if (ctx.provider === "openai") {
// Default OpenAI Responses to WebSocket-first with transparent SSE fallback.
ctx.agent.streamFn = createOpenAIDefaultTransportWrapper(ctx.agent.streamFn);
}
ctx.agent.streamFn = createOpenAIAttributionHeadersWrapper(ctx.agent.streamFn);
}
const wrappedStreamFn = createStreamFnWithExtraParams(
ctx.agent.streamFn,
ctx.effectiveExtraParams,
ctx.provider,
);
if (wrappedStreamFn) {
log.debug(`applying extraParams to agent streamFn for ${ctx.provider}/${ctx.modelId}`);
ctx.agent.streamFn = wrappedStreamFn;
}
const anthropicBetas = resolveAnthropicBetas(ctx.effectiveExtraParams, ctx.provider, ctx.modelId);
if (anthropicBetas?.length) {
log.debug(
`applying Anthropic beta header for ${ctx.provider}/${ctx.modelId}: ${anthropicBetas.join(",")}`,
);
ctx.agent.streamFn = createAnthropicBetaHeadersWrapper(ctx.agent.streamFn, anthropicBetas);
}
if (
shouldApplySiliconFlowThinkingOffCompat({
provider: ctx.provider,
modelId: ctx.modelId,
thinkingLevel: ctx.thinkingLevel,
})
) {
log.debug(
`normalizing thinking=off to thinking=null for SiliconFlow compatibility (${ctx.provider}/${ctx.modelId})`,
);
ctx.agent.streamFn = createSiliconFlowThinkingWrapper(ctx.agent.streamFn);
}
ctx.agent.streamFn = createAnthropicToolPayloadCompatibilityWrapper(ctx.agent.streamFn, {
config: ctx.cfg,
workspaceDir: ctx.workspaceDir,
});
}
function applyPostPluginStreamWrappers(
ctx: ApplyExtraParamsContext & { providerWrapperHandled: boolean },
): void {
if (
!ctx.providerWrapperHandled &&
shouldApplyMoonshotPayloadCompat({ provider: ctx.provider, modelId: ctx.modelId })
) {
// Preserve the legacy Moonshot compatibility path when no plugin wrapper
// actually handled the stream function. This mainly covers tests and
// disabled plugins for the native Moonshot provider.
const thinkingType = resolveMoonshotThinkingType({
configuredThinking: ctx.effectiveExtraParams?.thinking,
thinkingLevel: ctx.thinkingLevel,
});
ctx.agent.streamFn = createMoonshotThinkingWrapper(ctx.agent.streamFn, thinkingType);
}
if (ctx.provider === "amazon-bedrock" && !isAnthropicBedrockModel(ctx.modelId)) {
log.debug(
`disabling prompt caching for non-Anthropic Bedrock model ${ctx.provider}/${ctx.modelId}`,
);
ctx.agent.streamFn = createBedrockNoCacheWrapper(ctx.agent.streamFn);
}
// Guard Google payloads against invalid negative thinking budgets emitted by
// upstream model-ID heuristics for Gemini 3.1 variants.
ctx.agent.streamFn = createGoogleThinkingPayloadWrapper(ctx.agent.streamFn, ctx.thinkingLevel);
const anthropicFastMode = resolveAnthropicFastMode(ctx.effectiveExtraParams);
if (anthropicFastMode !== undefined) {
log.debug(
`applying Anthropic fast mode=${anthropicFastMode} for ${ctx.provider}/${ctx.modelId}`,
);
ctx.agent.streamFn = createAnthropicFastModeWrapper(ctx.agent.streamFn, anthropicFastMode);
}
if (typeof ctx.effectiveExtraParams?.fastMode === "boolean") {
log.debug(
`applying MiniMax fast mode=${ctx.effectiveExtraParams.fastMode} for ${ctx.provider}/${ctx.modelId}`,
);
ctx.agent.streamFn = createMinimaxFastModeWrapper(
ctx.agent.streamFn,
ctx.effectiveExtraParams.fastMode,
);
log.debug(
`applying xAI fast mode=${ctx.effectiveExtraParams.fastMode} for ${ctx.provider}/${ctx.modelId}`,
);
ctx.agent.streamFn = createXaiFastModeWrapper(
ctx.agent.streamFn,
ctx.effectiveExtraParams.fastMode,
);
}
const openAIFastMode = resolveOpenAIFastMode(ctx.effectiveExtraParams);
if (openAIFastMode) {
log.debug(`applying OpenAI fast mode for ${ctx.provider}/${ctx.modelId}`);
ctx.agent.streamFn = createOpenAIFastModeWrapper(ctx.agent.streamFn);
}
const openAIServiceTier = resolveOpenAIServiceTier(ctx.effectiveExtraParams);
if (openAIServiceTier) {
log.debug(
`applying OpenAI service_tier=${openAIServiceTier} for ${ctx.provider}/${ctx.modelId}`,
);
ctx.agent.streamFn = createOpenAIServiceTierWrapper(ctx.agent.streamFn, openAIServiceTier);
}
// Work around upstream pi-ai hardcoding `store: false` for Responses API.
// Force `store=true` for direct OpenAI Responses models and auto-enable
// server-side compaction for compatible OpenAI Responses payloads.
ctx.agent.streamFn = createOpenAIResponsesContextManagementWrapper(
ctx.agent.streamFn,
ctx.effectiveExtraParams,
);
const rawParallelToolCalls = resolveAliasedParamValue(
[ctx.resolvedExtraParams, ctx.override],
"parallel_tool_calls",
"parallelToolCalls",
);
if (rawParallelToolCalls === undefined) {
return;
}
if (typeof rawParallelToolCalls === "boolean") {
ctx.agent.streamFn = createParallelToolCallsWrapper(ctx.agent.streamFn, rawParallelToolCalls);
return;
}
if (rawParallelToolCalls === null) {
log.debug("parallel_tool_calls suppressed by null override, skipping injection");
return;
}
const summary =
typeof rawParallelToolCalls === "string" ? rawParallelToolCalls : typeof rawParallelToolCalls;
log.warn(`ignoring invalid parallel_tool_calls param: ${summary}`);
}
/**
* Apply extra params (like temperature) to an agent's streamFn.
* Also applies verified provider-specific request wrappers, such as OpenRouter attribution.
@@ -323,48 +473,20 @@ export function applyExtraParamsToAgent(
resolvedExtraParams,
});
if (provider === "openai" || provider === "openai-codex") {
if (provider === "openai") {
// Default OpenAI Responses to WebSocket-first with transparent SSE fallback.
agent.streamFn = createOpenAIDefaultTransportWrapper(agent.streamFn);
}
agent.streamFn = createOpenAIAttributionHeadersWrapper(agent.streamFn);
}
const wrappedStreamFn = createStreamFnWithExtraParams(
agent.streamFn,
effectiveExtraParams,
const wrapperContext: ApplyExtraParamsContext = {
agent,
cfg,
provider,
);
if (wrappedStreamFn) {
log.debug(`applying extraParams to agent streamFn for ${provider}/${modelId}`);
agent.streamFn = wrappedStreamFn;
}
const anthropicBetas = resolveAnthropicBetas(effectiveExtraParams, provider, modelId);
if (anthropicBetas?.length) {
log.debug(
`applying Anthropic beta header for ${provider}/${modelId}: ${anthropicBetas.join(",")}`,
);
agent.streamFn = createAnthropicBetaHeadersWrapper(agent.streamFn, anthropicBetas);
}
if (shouldApplySiliconFlowThinkingOffCompat({ provider, modelId, thinkingLevel })) {
log.debug(
`normalizing thinking=off to thinking=null for SiliconFlow compatibility (${provider}/${modelId})`,
);
agent.streamFn = createSiliconFlowThinkingWrapper(agent.streamFn);
}
if (thinkingLevel === "off") {
agent.streamFn = createOllamaThinkingOffWrapper(agent.streamFn);
}
agent.streamFn = createAnthropicToolPayloadCompatibilityWrapper(agent.streamFn, {
config: cfg,
modelId,
workspaceDir,
});
thinkingLevel,
model,
effectiveExtraParams,
resolvedExtraParams,
override,
};
applyPrePluginStreamWrappers(wrapperContext);
const providerStreamBase = agent.streamFn;
const pluginWrappedStreamFn = providerRuntimeDeps.wrapProviderStreamFn({
provider,
@@ -382,99 +504,10 @@ export function applyExtraParamsToAgent(
agent.streamFn = pluginWrappedStreamFn ?? providerStreamBase;
const providerWrapperHandled =
pluginWrappedStreamFn !== undefined && pluginWrappedStreamFn !== providerStreamBase;
if (!providerWrapperHandled && shouldApplyMoonshotPayloadCompat({ provider, modelId })) {
// Preserve the legacy Moonshot compatibility path when no plugin wrapper
// actually handled the stream function. This mainly covers tests and
// disabled plugins for the native Moonshot provider.
const thinkingType = resolveMoonshotThinkingType({
configuredThinking: effectiveExtraParams?.thinking,
thinkingLevel,
});
agent.streamFn = createMoonshotThinkingWrapper(agent.streamFn, thinkingType);
}
if (provider === "amazon-bedrock" && !isAnthropicBedrockModel(modelId)) {
log.debug(`disabling prompt caching for non-Anthropic Bedrock model ${provider}/${modelId}`);
agent.streamFn = createBedrockNoCacheWrapper(agent.streamFn);
}
// Guard Google payloads against invalid negative thinking budgets emitted by
// upstream model-ID heuristics for Gemini 3.1 variants.
agent.streamFn = createGoogleThinkingPayloadWrapper(agent.streamFn, thinkingLevel);
const anthropicFastMode = resolveAnthropicFastMode(effectiveExtraParams);
if (anthropicFastMode !== undefined) {
log.debug(`applying Anthropic fast mode=${anthropicFastMode} for ${provider}/${modelId}`);
agent.streamFn = createAnthropicFastModeWrapper(agent.streamFn, anthropicFastMode);
}
if (typeof effectiveExtraParams?.fastMode === "boolean") {
log.debug(
`applying MiniMax fast mode=${effectiveExtraParams.fastMode} for ${provider}/${modelId}`,
);
agent.streamFn = createMinimaxFastModeWrapper(agent.streamFn, effectiveExtraParams.fastMode);
log.debug(`applying xAI fast mode=${effectiveExtraParams.fastMode} for ${provider}/${modelId}`);
agent.streamFn = createXaiFastModeWrapper(agent.streamFn, effectiveExtraParams.fastMode);
}
const openAIFastMode = resolveOpenAIFastMode(effectiveExtraParams);
if (openAIFastMode) {
log.debug(`applying OpenAI fast mode for ${provider}/${modelId}`);
agent.streamFn = createOpenAIFastModeWrapper(agent.streamFn);
}
const openAIServiceTier = resolveOpenAIServiceTier(effectiveExtraParams);
if (openAIServiceTier) {
log.debug(`applying OpenAI service_tier=${openAIServiceTier} for ${provider}/${modelId}`);
agent.streamFn = createOpenAIServiceTierWrapper(agent.streamFn, openAIServiceTier);
}
// Work around upstream pi-ai hardcoding `store: false` for Responses API.
// Force `store=true` for direct OpenAI Responses models and auto-enable
// server-side compaction for compatible OpenAI Responses payloads.
agent.streamFn = createOpenAIResponsesContextManagementWrapper(
agent.streamFn,
effectiveExtraParams,
);
const rawParallelToolCalls = resolveAliasedParamValue(
[resolvedExtraParams, override],
"parallel_tool_calls",
"parallelToolCalls",
);
if (rawParallelToolCalls !== undefined) {
if (typeof rawParallelToolCalls === "boolean") {
agent.streamFn = createParallelToolCallsWrapper(agent.streamFn, rawParallelToolCalls);
} else if (rawParallelToolCalls === null) {
log.debug("parallel_tool_calls suppressed by null override, skipping injection");
} else {
const summary =
typeof rawParallelToolCalls === "string"
? rawParallelToolCalls
: typeof rawParallelToolCalls;
log.warn(`ignoring invalid parallel_tool_calls param: ${summary}`);
}
}
applyPostPluginStreamWrappers({
...wrapperContext,
providerWrapperHandled,
});
return { effectiveExtraParams };
}
function createOllamaThinkingOffWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
return (model, context, options) => {
if (model.api !== "ollama") {
return underlying(model, context, options);
}
const originalOnPayload = options?.onPayload;
return underlying(model, context, {
...options,
onPayload: (payload, payloadModel) => {
if (payload && typeof payload === "object") {
(payload as Record<string, unknown>).think = false;
}
return originalOnPayload?.(payload, payloadModel);
},
});
};
}

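The net effect of the extraction above is a fixed three-stage composition inside applyExtraParamsToAgent: pre-plugin wrappers, then the plugin seam, then post-plugin wrappers that can skip legacy fallbacks once a plugin has handled the stream function. A toy, self-contained model of that ordering follows; the types and wrapper names are illustrative only.

// Condensed model of the new wrapper ordering; StreamFnLike and the stage
// callbacks are stand-ins for the real signatures above.
type StreamFnLike = (payload: Record<string, unknown>) => unknown;

function composeStreamFn(
  base: StreamFnLike,
  pre: (fn: StreamFnLike) => StreamFnLike,
  pluginSeam: (fn: StreamFnLike) => StreamFnLike | undefined,
  post: (fn: StreamFnLike, pluginHandled: boolean) => StreamFnLike,
): StreamFnLike {
  // Stage 1: provider-agnostic wrappers (extraParams, headers, compat shims).
  const providerStreamBase = pre(base);
  // Stage 2: the plugin seam; a plugin may decline by returning undefined.
  const pluginWrapped = pluginSeam(providerStreamBase);
  const streamFn = pluginWrapped ?? providerStreamBase;
  const pluginHandled = pluginWrapped !== undefined && pluginWrapped !== providerStreamBase;
  // Stage 3: post-plugin wrappers, which skip legacy fallbacks (such as the
  // Moonshot compat path) when a plugin already handled the provider.
  return post(streamFn, pluginHandled);
}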
View File

@@ -1,6 +1,7 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import { streamWithPayloadPatch } from "./stream-payload-utils.js";
function isGemini31Model(modelId: string): boolean {
const normalized = modelId.toLowerCase();
@@ -74,19 +75,14 @@ export function createGoogleThinkingPayloadWrapper(
): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
return (model, context, options) => {
const onPayload = options?.onPayload;
return underlying(model, context, {
...options,
onPayload: (payload) => {
if (model.api === "google-generative-ai") {
sanitizeGoogleThinkingPayload({
payload,
modelId: model.id,
thinkingLevel,
});
}
return onPayload?.(payload, model);
},
return streamWithPayloadPatch(underlying, model, context, options, (payload) => {
if (model.api === "google-generative-ai") {
sanitizeGoogleThinkingPayload({
payload,
modelId: model.id,
thinkingLevel,
});
}
});
};
}

View File

@@ -3,6 +3,7 @@ import { streamSimple } from "@mariozechner/pi-ai";
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import { usesMoonshotThinkingPayloadCompatStatic } from "../moonshot-provider-compat.js";
import { normalizeProviderId } from "../provider-id.js";
import { streamWithPayloadPatch } from "./stream-payload-utils.js";
export {
createMoonshotThinkingWrapper,
@@ -41,19 +42,10 @@ export function shouldApplyMoonshotPayloadCompat(params: {
export function createSiliconFlowThinkingWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
return (model, context, options) => {
const originalOnPayload = options?.onPayload;
return underlying(model, context, {
...options,
onPayload: (payload) => {
if (payload && typeof payload === "object") {
const payloadObj = payload as Record<string, unknown>;
if (payloadObj.thinking === "off") {
payloadObj.thinking = null;
}
}
return originalOnPayload?.(payload, model);
},
return (model, context, options) =>
streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
if (payloadObj.thinking === "off") {
payloadObj.thinking = null;
}
});
};
}

View File

@@ -1,6 +1,7 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import { streamWithPayloadPatch } from "./stream-payload-utils.js";
type MoonshotThinkingType = "enabled" | "disabled";
@@ -62,33 +63,24 @@ export function createMoonshotThinkingWrapper(
thinkingType?: MoonshotThinkingType,
): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
return (model, context, options) => {
const originalOnPayload = options?.onPayload;
return underlying(model, context, {
...options,
onPayload: (payload) => {
if (payload && typeof payload === "object") {
const payloadObj = payload as Record<string, unknown>;
let effectiveThinkingType = normalizeMoonshotThinkingType(payloadObj.thinking);
return (model, context, options) =>
streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
let effectiveThinkingType = normalizeMoonshotThinkingType(payloadObj.thinking);
if (thinkingType) {
payloadObj.thinking = { type: thinkingType };
effectiveThinkingType = thinkingType;
}
if (thinkingType) {
payloadObj.thinking = { type: thinkingType };
effectiveThinkingType = thinkingType;
}
if (
effectiveThinkingType === "enabled" &&
!isMoonshotToolChoiceCompatible(payloadObj.tool_choice)
) {
if (payloadObj.tool_choice === "required") {
payloadObj.tool_choice = "auto";
} else if (isPinnedToolChoice(payloadObj.tool_choice)) {
payloadObj.thinking = { type: "disabled" };
}
}
if (
effectiveThinkingType === "enabled" &&
!isMoonshotToolChoiceCompatible(payloadObj.tool_choice)
) {
if (payloadObj.tool_choice === "required") {
payloadObj.tool_choice = "auto";
} else if (isPinnedToolChoice(payloadObj.tool_choice)) {
payloadObj.thinking = { type: "disabled" };
}
return originalOnPayload?.(payload, model);
},
}
});
};
}

View File

@@ -2,6 +2,7 @@ import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import { resolveProviderAttributionHeaders } from "../provider-attribution.js";
import { streamWithPayloadPatch } from "./stream-payload-utils.js";
const KILOCODE_FEATURE_HEADER = "X-KILOCODE-FEATURE";
const KILOCODE_FEATURE_DEFAULT = "openclaw";
const KILOCODE_FEATURE_ENV_VAR = "KILOCODE_FEATURE";
@@ -66,30 +67,25 @@ export function createOpenRouterSystemCacheWrapper(baseStreamFn: StreamFn | unde
return underlying(model, context, options);
}
const originalOnPayload = options?.onPayload;
return underlying(model, context, {
...options,
onPayload: (payload) => {
const messages = (payload as Record<string, unknown>)?.messages;
if (Array.isArray(messages)) {
for (const msg of messages as Array<{ role?: string; content?: unknown }>) {
if (msg.role !== "system" && msg.role !== "developer") {
continue;
}
if (typeof msg.content === "string") {
msg.content = [
{ type: "text", text: msg.content, cache_control: { type: "ephemeral" } },
];
} else if (Array.isArray(msg.content) && msg.content.length > 0) {
const last = msg.content[msg.content.length - 1];
if (last && typeof last === "object") {
(last as Record<string, unknown>).cache_control = { type: "ephemeral" };
}
return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
const messages = payloadObj.messages;
if (Array.isArray(messages)) {
for (const msg of messages as Array<{ role?: string; content?: unknown }>) {
if (msg.role !== "system" && msg.role !== "developer") {
continue;
}
if (typeof msg.content === "string") {
msg.content = [
{ type: "text", text: msg.content, cache_control: { type: "ephemeral" } },
];
} else if (Array.isArray(msg.content) && msg.content.length > 0) {
const last = msg.content[msg.content.length - 1];
if (last && typeof last === "object") {
(last as Record<string, unknown>).cache_control = { type: "ephemeral" };
}
}
}
return originalOnPayload?.(payload, model);
},
}
});
};
}
@@ -100,19 +96,22 @@ export function createOpenRouterWrapper(
): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
return (model, context, options) => {
const onPayload = options?.onPayload;
const attributionHeaders = resolveProviderAttributionHeaders("openrouter");
return underlying(model, context, {
...options,
headers: {
...attributionHeaders,
...options?.headers,
return streamWithPayloadPatch(
underlying,
model,
context,
{
...options,
headers: {
...attributionHeaders,
...options?.headers,
},
},
onPayload: (payload) => {
(payload) => {
normalizeProxyReasoningPayload(payload, thinkingLevel);
return onPayload?.(payload, model);
},
});
);
};
}
@@ -126,17 +125,20 @@ export function createKilocodeWrapper(
): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
return (model, context, options) => {
const onPayload = options?.onPayload;
return underlying(model, context, {
...options,
headers: {
...options?.headers,
...resolveKilocodeAppHeaders(),
return streamWithPayloadPatch(
underlying,
model,
context,
{
...options,
headers: {
...options?.headers,
...resolveKilocodeAppHeaders(),
},
},
onPayload: (payload) => {
(payload) => {
normalizeProxyReasoningPayload(payload, thinkingLevel);
return onPayload?.(payload, model);
},
});
);
};
}

View File

@@ -1,5 +1,6 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import { streamWithPayloadPatch } from "./stream-payload-utils.js";
/**
* Inject `tool_stream=true` so tool-call deltas stream in real time.
@@ -15,15 +16,8 @@ export function createToolStreamWrapper(
return underlying(model, context, options);
}
const originalOnPayload = options?.onPayload;
return underlying(model, context, {
...options,
onPayload: (payload) => {
if (payload && typeof payload === "object") {
(payload as Record<string, unknown>).tool_stream = true;
}
return originalOnPayload?.(payload, model);
},
return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
payloadObj.tool_stream = true;
});
};
}

View File

@@ -22,6 +22,7 @@ export {
createOpenAIAttributionHeadersWrapper,
createOpenAIDefaultTransportWrapper,
} from "../agents/pi-embedded-runner/openai-stream-wrappers.js";
export { streamWithPayloadPatch } from "../agents/pi-embedded-runner/stream-payload-utils.js";
export {
createToolStreamWrapper,
createZaiToolStreamWrapper,