From 52bc809143c641bdb7f2a66b2d321a9c640bd140 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 8 Mar 2026 17:12:58 +0000 Subject: [PATCH] refactor: extract provider stream wrappers --- .../anthropic-stream-wrappers.ts | 319 ++++++++ src/agents/pi-embedded-runner/extra-params.ts | 696 +----------------- .../moonshot-stream-wrappers.ts | 113 +++ .../proxy-stream-wrappers.ts | 145 ++++ 4 files changed, 597 insertions(+), 676 deletions(-) create mode 100644 src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts create mode 100644 src/agents/pi-embedded-runner/moonshot-stream-wrappers.ts create mode 100644 src/agents/pi-embedded-runner/proxy-stream-wrappers.ts diff --git a/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts b/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts new file mode 100644 index 00000000000..77c5e82f814 --- /dev/null +++ b/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts @@ -0,0 +1,319 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; +import { streamSimple } from "@mariozechner/pi-ai"; +import { + requiresOpenAiCompatibleAnthropicToolPayload, + usesOpenAiFunctionAnthropicToolSchema, + usesOpenAiStringModeAnthropicToolChoice, +} from "../provider-capabilities.js"; +import { log } from "./logger.js"; + +const ANTHROPIC_CONTEXT_1M_BETA = "context-1m-2025-08-07"; +const ANTHROPIC_1M_MODEL_PREFIXES = ["claude-opus-4", "claude-sonnet-4"] as const; +const PI_AI_DEFAULT_ANTHROPIC_BETAS = [ + "fine-grained-tool-streaming-2025-05-14", + "interleaved-thinking-2025-05-14", +] as const; +const PI_AI_OAUTH_ANTHROPIC_BETAS = [ + "claude-code-20250219", + "oauth-2025-04-20", + ...PI_AI_DEFAULT_ANTHROPIC_BETAS, +] as const; + +type CacheRetention = "none" | "short" | "long"; + +function isAnthropic1MModel(modelId: string): boolean { + const normalized = modelId.trim().toLowerCase(); + return ANTHROPIC_1M_MODEL_PREFIXES.some((prefix) => normalized.startsWith(prefix)); +} + +function parseHeaderList(value: unknown): string[] { + if (typeof value !== "string") { + return []; + } + return value + .split(",") + .map((item) => item.trim()) + .filter(Boolean); +} + +function mergeAnthropicBetaHeader( + headers: Record | undefined, + betas: string[], +): Record { + const merged = { ...headers }; + const existingKey = Object.keys(merged).find((key) => key.toLowerCase() === "anthropic-beta"); + const existing = existingKey ? parseHeaderList(merged[existingKey]) : []; + const values = Array.from(new Set([...existing, ...betas])); + const key = existingKey ?? "anthropic-beta"; + merged[key] = values.join(","); + return merged; +} + +function isAnthropicOAuthApiKey(apiKey: unknown): boolean { + return typeof apiKey === "string" && apiKey.includes("sk-ant-oat"); +} + +function requiresAnthropicToolPayloadCompatibilityForModel(model: { + api?: unknown; + provider?: unknown; + compat?: unknown; +}): boolean { + if (model.api !== "anthropic-messages") { + return false; + } + + if ( + typeof model.provider === "string" && + requiresOpenAiCompatibleAnthropicToolPayload(model.provider) + ) { + return true; + } + + if (!model.compat || typeof model.compat !== "object" || Array.isArray(model.compat)) { + return false; + } + + return ( + (model.compat as { requiresOpenAiAnthropicToolPayload?: unknown }) + .requiresOpenAiAnthropicToolPayload === true + ); +} + +function usesOpenAiFunctionAnthropicToolSchemaForModel(model: { + provider?: unknown; + compat?: unknown; +}): boolean { + if (typeof model.provider === "string" && usesOpenAiFunctionAnthropicToolSchema(model.provider)) { + return true; + } + if (!model.compat || typeof model.compat !== "object" || Array.isArray(model.compat)) { + return false; + } + return ( + (model.compat as { requiresOpenAiAnthropicToolPayload?: unknown }) + .requiresOpenAiAnthropicToolPayload === true + ); +} + +function usesOpenAiStringModeAnthropicToolChoiceForModel(model: { + provider?: unknown; + compat?: unknown; +}): boolean { + if ( + typeof model.provider === "string" && + usesOpenAiStringModeAnthropicToolChoice(model.provider) + ) { + return true; + } + if (!model.compat || typeof model.compat !== "object" || Array.isArray(model.compat)) { + return false; + } + return ( + (model.compat as { requiresOpenAiAnthropicToolPayload?: unknown }) + .requiresOpenAiAnthropicToolPayload === true + ); +} + +function normalizeOpenAiFunctionAnthropicToolDefinition( + tool: unknown, +): Record | undefined { + if (!tool || typeof tool !== "object" || Array.isArray(tool)) { + return undefined; + } + + const toolObj = tool as Record; + if (toolObj.function && typeof toolObj.function === "object") { + return toolObj; + } + + const rawName = typeof toolObj.name === "string" ? toolObj.name.trim() : ""; + if (!rawName) { + return toolObj; + } + + const functionSpec: Record = { + name: rawName, + parameters: + toolObj.input_schema && typeof toolObj.input_schema === "object" + ? toolObj.input_schema + : toolObj.parameters && typeof toolObj.parameters === "object" + ? toolObj.parameters + : { type: "object", properties: {} }, + }; + + if (typeof toolObj.description === "string" && toolObj.description.trim()) { + functionSpec.description = toolObj.description; + } + if (typeof toolObj.strict === "boolean") { + functionSpec.strict = toolObj.strict; + } + + return { + type: "function", + function: functionSpec, + }; +} + +function normalizeOpenAiStringModeAnthropicToolChoice(toolChoice: unknown): unknown { + if (!toolChoice || typeof toolChoice !== "object" || Array.isArray(toolChoice)) { + return toolChoice; + } + + const choice = toolChoice as Record; + if (choice.type === "auto") { + return "auto"; + } + if (choice.type === "none") { + return "none"; + } + if (choice.type === "required" || choice.type === "any") { + return "required"; + } + if (choice.type === "tool" && typeof choice.name === "string" && choice.name.trim()) { + return { + type: "function", + function: { name: choice.name.trim() }, + }; + } + + return toolChoice; +} + +export function resolveCacheRetention( + extraParams: Record | undefined, + provider: string, +): CacheRetention | undefined { + const isAnthropicDirect = provider === "anthropic"; + const hasBedrockOverride = + extraParams?.cacheRetention !== undefined || extraParams?.cacheControlTtl !== undefined; + const isAnthropicBedrock = provider === "amazon-bedrock" && hasBedrockOverride; + + if (!isAnthropicDirect && !isAnthropicBedrock) { + return undefined; + } + + const newVal = extraParams?.cacheRetention; + if (newVal === "none" || newVal === "short" || newVal === "long") { + return newVal; + } + + const legacy = extraParams?.cacheControlTtl; + if (legacy === "5m") { + return "short"; + } + if (legacy === "1h") { + return "long"; + } + + return isAnthropicDirect ? "short" : undefined; +} + +export function resolveAnthropicBetas( + extraParams: Record | undefined, + provider: string, + modelId: string, +): string[] | undefined { + if (provider !== "anthropic") { + return undefined; + } + + const betas = new Set(); + const configured = extraParams?.anthropicBeta; + if (typeof configured === "string" && configured.trim()) { + betas.add(configured.trim()); + } else if (Array.isArray(configured)) { + for (const beta of configured) { + if (typeof beta === "string" && beta.trim()) { + betas.add(beta.trim()); + } + } + } + + if (extraParams?.context1m === true) { + if (isAnthropic1MModel(modelId)) { + betas.add(ANTHROPIC_CONTEXT_1M_BETA); + } else { + log.warn(`ignoring context1m for non-opus/sonnet model: ${provider}/${modelId}`); + } + } + + return betas.size > 0 ? [...betas] : undefined; +} + +export function createAnthropicBetaHeadersWrapper( + baseStreamFn: StreamFn | undefined, + betas: string[], +): StreamFn { + const underlying = baseStreamFn ?? streamSimple; + return (model, context, options) => { + const isOauth = isAnthropicOAuthApiKey(options?.apiKey); + const requestedContext1m = betas.includes(ANTHROPIC_CONTEXT_1M_BETA); + const effectiveBetas = + isOauth && requestedContext1m + ? betas.filter((beta) => beta !== ANTHROPIC_CONTEXT_1M_BETA) + : betas; + if (isOauth && requestedContext1m) { + log.warn( + `ignoring context1m for OAuth token auth on ${model.provider}/${model.id}; Anthropic rejects context-1m beta with OAuth auth`, + ); + } + + const piAiBetas = isOauth + ? (PI_AI_OAUTH_ANTHROPIC_BETAS as readonly string[]) + : (PI_AI_DEFAULT_ANTHROPIC_BETAS as readonly string[]); + const allBetas = [...new Set([...piAiBetas, ...effectiveBetas])]; + return underlying(model, context, { + ...options, + headers: mergeAnthropicBetaHeader(options?.headers, allBetas), + }); + }; +} + +export function createAnthropicToolPayloadCompatibilityWrapper( + baseStreamFn: StreamFn | undefined, +): StreamFn { + const underlying = baseStreamFn ?? streamSimple; + return (model, context, options) => { + const originalOnPayload = options?.onPayload; + return underlying(model, context, { + ...options, + onPayload: (payload) => { + if ( + payload && + typeof payload === "object" && + requiresAnthropicToolPayloadCompatibilityForModel(model) + ) { + const payloadObj = payload as Record; + if ( + Array.isArray(payloadObj.tools) && + usesOpenAiFunctionAnthropicToolSchemaForModel(model) + ) { + payloadObj.tools = payloadObj.tools + .map((tool) => normalizeOpenAiFunctionAnthropicToolDefinition(tool)) + .filter((tool): tool is Record => !!tool); + } + if (usesOpenAiStringModeAnthropicToolChoiceForModel(model)) { + payloadObj.tool_choice = normalizeOpenAiStringModeAnthropicToolChoice( + payloadObj.tool_choice, + ); + } + } + originalOnPayload?.(payload); + }, + }); + }; +} + +export function createBedrockNoCacheWrapper(baseStreamFn: StreamFn | undefined): StreamFn { + const underlying = baseStreamFn ?? streamSimple; + return (model, context, options) => + underlying(model, context, { + ...options, + cacheRetention: "none", + }); +} + +export function isAnthropicBedrockModel(modelId: string): boolean { + const normalized = modelId.toLowerCase(); + return normalized.includes("anthropic.claude") || normalized.includes("anthropic/claude"); +} diff --git a/src/agents/pi-embedded-runner/extra-params.ts b/src/agents/pi-embedded-runner/extra-params.ts index 89451dc614d..7054d765f81 100644 --- a/src/agents/pi-embedded-runner/extra-params.ts +++ b/src/agents/pi-embedded-runner/extra-params.ts @@ -4,11 +4,20 @@ import { streamSimple } from "@mariozechner/pi-ai"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; import type { OpenClawConfig } from "../../config/config.js"; import { - requiresOpenAiCompatibleAnthropicToolPayload, - usesOpenAiFunctionAnthropicToolSchema, - usesOpenAiStringModeAnthropicToolChoice, -} from "../provider-capabilities.js"; + createAnthropicBetaHeadersWrapper, + createAnthropicToolPayloadCompatibilityWrapper, + createBedrockNoCacheWrapper, + isAnthropicBedrockModel, + resolveAnthropicBetas, + resolveCacheRetention, +} from "./anthropic-stream-wrappers.js"; import { log } from "./logger.js"; +import { + createMoonshotThinkingWrapper, + createSiliconFlowThinkingWrapper, + resolveMoonshotThinkingType, + shouldApplySiliconFlowThinkingOffCompat, +} from "./moonshot-stream-wrappers.js"; import { createCodexDefaultTransportWrapper, createOpenAIDefaultTransportWrapper, @@ -16,22 +25,13 @@ import { createOpenAIServiceTierWrapper, resolveOpenAIServiceTier, } from "./openai-stream-wrappers.js"; +import { + createKilocodeWrapper, + createOpenRouterSystemCacheWrapper, + createOpenRouterWrapper, + isProxyReasoningUnsupported, +} from "./proxy-stream-wrappers.js"; -const OPENROUTER_APP_HEADERS: Record = { - "HTTP-Referer": "https://openclaw.ai", - "X-Title": "OpenClaw", -}; -const KILOCODE_FEATURE_HEADER = "X-KILOCODE-FEATURE"; -const KILOCODE_FEATURE_DEFAULT = "openclaw"; -const KILOCODE_FEATURE_ENV_VAR = "KILOCODE_FEATURE"; - -function resolveKilocodeAppHeaders(): Record { - const feature = process.env[KILOCODE_FEATURE_ENV_VAR]?.trim() || KILOCODE_FEATURE_DEFAULT; - return { [KILOCODE_FEATURE_HEADER]: feature }; -} - -const ANTHROPIC_CONTEXT_1M_BETA = "context-1m-2025-08-07"; -const ANTHROPIC_1M_MODEL_PREFIXES = ["claude-opus-4", "claude-sonnet-4"] as const; /** * Resolve provider-specific extra params from model config. * Used to pass through stream params like temperature/maxTokens. @@ -70,65 +70,11 @@ export function resolveExtraParams(params: { return merged; } -type CacheRetention = "none" | "short" | "long"; type CacheRetentionStreamOptions = Partial & { - cacheRetention?: CacheRetention; + cacheRetention?: "none" | "short" | "long"; openaiWsWarmup?: boolean; }; -/** - * Resolve cacheRetention from extraParams, supporting both new `cacheRetention` - * and legacy `cacheControlTtl` values for backwards compatibility. - * - * Mapping: "5m" → "short", "1h" → "long" - * - * Applies to: - * - direct Anthropic provider - * - Anthropic Claude models on Bedrock when cache retention is explicitly configured - * - * OpenRouter uses openai-completions API with hardcoded cache_control instead - * of the cacheRetention stream option. - * - * Defaults to "short" for direct Anthropic when not explicitly configured. - */ -function resolveCacheRetention( - extraParams: Record | undefined, - provider: string, -): CacheRetention | undefined { - const isAnthropicDirect = provider === "anthropic"; - const hasBedrockOverride = - extraParams?.cacheRetention !== undefined || extraParams?.cacheControlTtl !== undefined; - const isAnthropicBedrock = provider === "amazon-bedrock" && hasBedrockOverride; - - if (!isAnthropicDirect && !isAnthropicBedrock) { - return undefined; - } - - // Prefer new cacheRetention if present - const newVal = extraParams?.cacheRetention; - if (newVal === "none" || newVal === "short" || newVal === "long") { - return newVal; - } - - // Fall back to legacy cacheControlTtl with mapping - const legacy = extraParams?.cacheControlTtl; - if (legacy === "5m") { - return "short"; - } - if (legacy === "1h") { - return "long"; - } - - // Default to "short" only for direct Anthropic when not explicitly configured. - // Bedrock retains upstream provider defaults unless explicitly set. - if (!isAnthropicDirect) { - return undefined; - } - - // Default to "short" for direct Anthropic when not explicitly configured - return "short"; -} - function createStreamFnWithExtraParams( baseStreamFn: StreamFn | undefined, extraParams: Record | undefined, @@ -201,608 +147,6 @@ function createStreamFnWithExtraParams( return wrappedStreamFn; } -function isAnthropicBedrockModel(modelId: string): boolean { - const normalized = modelId.toLowerCase(); - return normalized.includes("anthropic.claude") || normalized.includes("anthropic/claude"); -} - -function createBedrockNoCacheWrapper(baseStreamFn: StreamFn | undefined): StreamFn { - const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => - underlying(model, context, { - ...options, - cacheRetention: "none", - }); -} - -function isAnthropic1MModel(modelId: string): boolean { - const normalized = modelId.trim().toLowerCase(); - return ANTHROPIC_1M_MODEL_PREFIXES.some((prefix) => normalized.startsWith(prefix)); -} - -function parseHeaderList(value: unknown): string[] { - if (typeof value !== "string") { - return []; - } - return value - .split(",") - .map((item) => item.trim()) - .filter(Boolean); -} - -function resolveAnthropicBetas( - extraParams: Record | undefined, - provider: string, - modelId: string, -): string[] | undefined { - if (provider !== "anthropic") { - return undefined; - } - - const betas = new Set(); - const configured = extraParams?.anthropicBeta; - if (typeof configured === "string" && configured.trim()) { - betas.add(configured.trim()); - } else if (Array.isArray(configured)) { - for (const beta of configured) { - if (typeof beta === "string" && beta.trim()) { - betas.add(beta.trim()); - } - } - } - - if (extraParams?.context1m === true) { - if (isAnthropic1MModel(modelId)) { - betas.add(ANTHROPIC_CONTEXT_1M_BETA); - } else { - log.warn(`ignoring context1m for non-opus/sonnet model: ${provider}/${modelId}`); - } - } - - return betas.size > 0 ? [...betas] : undefined; -} - -function mergeAnthropicBetaHeader( - headers: Record | undefined, - betas: string[], -): Record { - const merged = { ...headers }; - const existingKey = Object.keys(merged).find((key) => key.toLowerCase() === "anthropic-beta"); - const existing = existingKey ? parseHeaderList(merged[existingKey]) : []; - const values = Array.from(new Set([...existing, ...betas])); - const key = existingKey ?? "anthropic-beta"; - merged[key] = values.join(","); - return merged; -} - -// Betas that pi-ai's createClient injects for standard Anthropic API key calls. -// Must be included when injecting anthropic-beta via options.headers, because -// pi-ai's mergeHeaders uses Object.assign (last-wins), which would otherwise -// overwrite the hardcoded defaultHeaders["anthropic-beta"]. -const PI_AI_DEFAULT_ANTHROPIC_BETAS = [ - "fine-grained-tool-streaming-2025-05-14", - "interleaved-thinking-2025-05-14", -] as const; - -// Additional betas pi-ai injects when the API key is an OAuth token (sk-ant-oat-*). -// These are required for Anthropic to accept OAuth Bearer auth. Losing oauth-2025-04-20 -// causes a 401 "OAuth authentication is currently not supported". -const PI_AI_OAUTH_ANTHROPIC_BETAS = [ - "claude-code-20250219", - "oauth-2025-04-20", - ...PI_AI_DEFAULT_ANTHROPIC_BETAS, -] as const; - -function isAnthropicOAuthApiKey(apiKey: unknown): boolean { - return typeof apiKey === "string" && apiKey.includes("sk-ant-oat"); -} - -function createAnthropicBetaHeadersWrapper( - baseStreamFn: StreamFn | undefined, - betas: string[], -): StreamFn { - const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => { - const isOauth = isAnthropicOAuthApiKey(options?.apiKey); - const requestedContext1m = betas.includes(ANTHROPIC_CONTEXT_1M_BETA); - const effectiveBetas = - isOauth && requestedContext1m - ? betas.filter((beta) => beta !== ANTHROPIC_CONTEXT_1M_BETA) - : betas; - if (isOauth && requestedContext1m) { - log.warn( - `ignoring context1m for OAuth token auth on ${model.provider}/${model.id}; Anthropic rejects context-1m beta with OAuth auth`, - ); - } - - // Preserve the betas pi-ai's createClient would inject for the given token type. - // Without this, our options.headers["anthropic-beta"] overwrites the pi-ai - // defaultHeaders via Object.assign, stripping critical betas like oauth-2025-04-20. - const piAiBetas = isOauth - ? (PI_AI_OAUTH_ANTHROPIC_BETAS as readonly string[]) - : (PI_AI_DEFAULT_ANTHROPIC_BETAS as readonly string[]); - const allBetas = [...new Set([...piAiBetas, ...effectiveBetas])]; - return underlying(model, context, { - ...options, - headers: mergeAnthropicBetaHeader(options?.headers, allBetas), - }); - }; -} - -function isOpenRouterAnthropicModel(provider: string, modelId: string): boolean { - return provider.toLowerCase() === "openrouter" && modelId.toLowerCase().startsWith("anthropic/"); -} - -type PayloadMessage = { - role?: string; - content?: unknown; -}; - -/** - * Inject cache_control into the system message for OpenRouter Anthropic models. - * OpenRouter passes through Anthropic's cache_control field — caching the system - * prompt avoids re-processing it on every request. - */ -function createOpenRouterSystemCacheWrapper(baseStreamFn: StreamFn | undefined): StreamFn { - const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => { - if ( - typeof model.provider !== "string" || - typeof model.id !== "string" || - !isOpenRouterAnthropicModel(model.provider, model.id) - ) { - return underlying(model, context, options); - } - - const originalOnPayload = options?.onPayload; - return underlying(model, context, { - ...options, - onPayload: (payload) => { - const messages = (payload as Record)?.messages; - if (Array.isArray(messages)) { - for (const msg of messages as PayloadMessage[]) { - if (msg.role !== "system" && msg.role !== "developer") { - continue; - } - if (typeof msg.content === "string") { - msg.content = [ - { type: "text", text: msg.content, cache_control: { type: "ephemeral" } }, - ]; - } else if (Array.isArray(msg.content) && msg.content.length > 0) { - const last = msg.content[msg.content.length - 1]; - if (last && typeof last === "object") { - (last as Record).cache_control = { type: "ephemeral" }; - } - } - } - } - originalOnPayload?.(payload); - }, - }); - }; -} - -/** - * Map OpenClaw's ThinkLevel to OpenRouter's reasoning.effort values. - * "off" maps to "none"; all other levels pass through as-is. - */ -function mapThinkingLevelToOpenRouterReasoningEffort( - thinkingLevel: ThinkLevel, -): "none" | "minimal" | "low" | "medium" | "high" | "xhigh" { - if (thinkingLevel === "off") { - return "none"; - } - if (thinkingLevel === "adaptive") { - return "medium"; - } - return thinkingLevel; -} - -function shouldApplySiliconFlowThinkingOffCompat(params: { - provider: string; - modelId: string; - thinkingLevel?: ThinkLevel; -}): boolean { - return ( - params.provider === "siliconflow" && - params.thinkingLevel === "off" && - params.modelId.startsWith("Pro/") - ); -} - -/** - * SiliconFlow's Pro/* models reject string thinking modes (including "off") - * with HTTP 400 invalid-parameter errors. Normalize to `thinking: null` to - * preserve "thinking disabled" intent without sending an invalid enum value. - */ -function createSiliconFlowThinkingWrapper(baseStreamFn: StreamFn | undefined): StreamFn { - const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => { - const originalOnPayload = options?.onPayload; - return underlying(model, context, { - ...options, - onPayload: (payload) => { - if (payload && typeof payload === "object") { - const payloadObj = payload as Record; - if (payloadObj.thinking === "off") { - payloadObj.thinking = null; - } - } - originalOnPayload?.(payload); - }, - }); - }; -} - -type MoonshotThinkingType = "enabled" | "disabled"; - -function normalizeMoonshotThinkingType(value: unknown): MoonshotThinkingType | undefined { - if (typeof value === "boolean") { - return value ? "enabled" : "disabled"; - } - if (typeof value === "string") { - const normalized = value.trim().toLowerCase(); - if ( - normalized === "enabled" || - normalized === "enable" || - normalized === "on" || - normalized === "true" - ) { - return "enabled"; - } - if ( - normalized === "disabled" || - normalized === "disable" || - normalized === "off" || - normalized === "false" - ) { - return "disabled"; - } - return undefined; - } - if (value && typeof value === "object" && !Array.isArray(value)) { - const typeValue = (value as Record).type; - return normalizeMoonshotThinkingType(typeValue); - } - return undefined; -} - -function resolveMoonshotThinkingType(params: { - configuredThinking: unknown; - thinkingLevel?: ThinkLevel; -}): MoonshotThinkingType | undefined { - const configured = normalizeMoonshotThinkingType(params.configuredThinking); - if (configured) { - return configured; - } - if (!params.thinkingLevel) { - return undefined; - } - return params.thinkingLevel === "off" ? "disabled" : "enabled"; -} - -function isMoonshotToolChoiceCompatible(toolChoice: unknown): boolean { - if (toolChoice == null) { - return true; - } - if (toolChoice === "auto" || toolChoice === "none") { - return true; - } - if (typeof toolChoice === "object" && !Array.isArray(toolChoice)) { - const typeValue = (toolChoice as Record).type; - return typeValue === "auto" || typeValue === "none"; - } - return false; -} - -/** - * Moonshot Kimi supports native binary thinking mode: - * - { thinking: { type: "enabled" } } - * - { thinking: { type: "disabled" } } - * - * When thinking is enabled, Moonshot only accepts tool_choice auto|none. - * Normalize incompatible values to auto instead of failing the request. - */ -function createMoonshotThinkingWrapper( - baseStreamFn: StreamFn | undefined, - thinkingType?: MoonshotThinkingType, -): StreamFn { - const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => { - const originalOnPayload = options?.onPayload; - return underlying(model, context, { - ...options, - onPayload: (payload) => { - if (payload && typeof payload === "object") { - const payloadObj = payload as Record; - let effectiveThinkingType = normalizeMoonshotThinkingType(payloadObj.thinking); - - if (thinkingType) { - payloadObj.thinking = { type: thinkingType }; - effectiveThinkingType = thinkingType; - } - - if ( - effectiveThinkingType === "enabled" && - !isMoonshotToolChoiceCompatible(payloadObj.tool_choice) - ) { - payloadObj.tool_choice = "auto"; - } - } - originalOnPayload?.(payload); - }, - }); - }; -} - -function requiresAnthropicToolPayloadCompatibilityForModel(model: { - api?: unknown; - provider?: unknown; - compat?: unknown; -}): boolean { - if (model.api !== "anthropic-messages") { - return false; - } - - if ( - typeof model.provider === "string" && - requiresOpenAiCompatibleAnthropicToolPayload(model.provider) - ) { - return true; - } - - if (!model.compat || typeof model.compat !== "object" || Array.isArray(model.compat)) { - return false; - } - - return ( - (model.compat as { requiresOpenAiAnthropicToolPayload?: unknown }) - .requiresOpenAiAnthropicToolPayload === true - ); -} - -function usesOpenAiFunctionAnthropicToolSchemaForModel(model: { - provider?: unknown; - compat?: unknown; -}): boolean { - if (typeof model.provider === "string" && usesOpenAiFunctionAnthropicToolSchema(model.provider)) { - return true; - } - if (!model.compat || typeof model.compat !== "object" || Array.isArray(model.compat)) { - return false; - } - return ( - (model.compat as { requiresOpenAiAnthropicToolPayload?: unknown }) - .requiresOpenAiAnthropicToolPayload === true - ); -} - -function usesOpenAiStringModeAnthropicToolChoiceForModel(model: { - provider?: unknown; - compat?: unknown; -}): boolean { - if ( - typeof model.provider === "string" && - usesOpenAiStringModeAnthropicToolChoice(model.provider) - ) { - return true; - } - if (!model.compat || typeof model.compat !== "object" || Array.isArray(model.compat)) { - return false; - } - return ( - (model.compat as { requiresOpenAiAnthropicToolPayload?: unknown }) - .requiresOpenAiAnthropicToolPayload === true - ); -} - -function normalizeOpenAiFunctionAnthropicToolDefinition( - tool: unknown, -): Record | undefined { - if (!tool || typeof tool !== "object" || Array.isArray(tool)) { - return undefined; - } - - const toolObj = tool as Record; - if (toolObj.function && typeof toolObj.function === "object") { - return toolObj; - } - - const rawName = typeof toolObj.name === "string" ? toolObj.name.trim() : ""; - if (!rawName) { - return toolObj; - } - - const functionSpec: Record = { - name: rawName, - parameters: - toolObj.input_schema && typeof toolObj.input_schema === "object" - ? toolObj.input_schema - : toolObj.parameters && typeof toolObj.parameters === "object" - ? toolObj.parameters - : { type: "object", properties: {} }, - }; - - if (typeof toolObj.description === "string" && toolObj.description.trim()) { - functionSpec.description = toolObj.description; - } - if (typeof toolObj.strict === "boolean") { - functionSpec.strict = toolObj.strict; - } - - return { - type: "function", - function: functionSpec, - }; -} - -function normalizeOpenAiStringModeAnthropicToolChoice(toolChoice: unknown): unknown { - if (!toolChoice || typeof toolChoice !== "object" || Array.isArray(toolChoice)) { - return toolChoice; - } - - const choice = toolChoice as Record; - if (choice.type === "auto") { - return "auto"; - } - if (choice.type === "none") { - return "none"; - } - if (choice.type === "required") { - return "required"; - } - if (choice.type === "any") { - return "required"; - } - if (choice.type === "tool" && typeof choice.name === "string" && choice.name.trim()) { - return { - type: "function", - function: { name: choice.name.trim() }, - }; - } - - return toolChoice; -} - -/** - * Some anthropic-messages providers accept Anthropic framing but still expect - * OpenAI-style tool payloads (`tools[].function`, string tool_choice modes). - */ -function createAnthropicToolPayloadCompatibilityWrapper( - baseStreamFn: StreamFn | undefined, -): StreamFn { - const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => { - const originalOnPayload = options?.onPayload; - return underlying(model, context, { - ...options, - onPayload: (payload) => { - if ( - payload && - typeof payload === "object" && - requiresAnthropicToolPayloadCompatibilityForModel(model) - ) { - const payloadObj = payload as Record; - if ( - Array.isArray(payloadObj.tools) && - usesOpenAiFunctionAnthropicToolSchemaForModel(model) - ) { - payloadObj.tools = payloadObj.tools - .map((tool) => normalizeOpenAiFunctionAnthropicToolDefinition(tool)) - .filter((tool): tool is Record => !!tool); - } - if (usesOpenAiStringModeAnthropicToolChoiceForModel(model)) { - payloadObj.tool_choice = normalizeOpenAiStringModeAnthropicToolChoice( - payloadObj.tool_choice, - ); - } - } - originalOnPayload?.(payload); - }, - }); - }; -} - -/** - * Create a streamFn wrapper that adds OpenRouter app attribution headers - * and injects reasoning.effort based on the configured thinking level. - */ -function normalizeProxyReasoningPayload(payload: unknown, thinkingLevel?: ThinkLevel): void { - if (!payload || typeof payload !== "object") { - return; - } - - const payloadObj = payload as Record; - - // pi-ai may inject a top-level reasoning_effort (OpenAI flat format). - // OpenRouter-compatible proxy gateways expect the nested reasoning.effort - // shape instead, and some models reject the flat field outright. - delete payloadObj.reasoning_effort; - - // When thinking is "off", or provider/model guards disable injection, - // leave reasoning unset after normalizing away the legacy flat field. - if (!thinkingLevel || thinkingLevel === "off") { - return; - } - - const existingReasoning = payloadObj.reasoning; - - // OpenRouter treats reasoning.effort and reasoning.max_tokens as - // alternative controls. If max_tokens is already present, do not inject - // effort and do not overwrite caller-supplied reasoning. - if ( - existingReasoning && - typeof existingReasoning === "object" && - !Array.isArray(existingReasoning) - ) { - const reasoningObj = existingReasoning as Record; - if (!("max_tokens" in reasoningObj) && !("effort" in reasoningObj)) { - reasoningObj.effort = mapThinkingLevelToOpenRouterReasoningEffort(thinkingLevel); - } - } else if (!existingReasoning) { - payloadObj.reasoning = { - effort: mapThinkingLevelToOpenRouterReasoningEffort(thinkingLevel), - }; - } -} - -function createOpenRouterWrapper( - baseStreamFn: StreamFn | undefined, - thinkingLevel?: ThinkLevel, -): StreamFn { - const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => { - const onPayload = options?.onPayload; - return underlying(model, context, { - ...options, - headers: { - ...OPENROUTER_APP_HEADERS, - ...options?.headers, - }, - onPayload: (payload) => { - normalizeProxyReasoningPayload(payload, thinkingLevel); - onPayload?.(payload); - }, - }); - }; -} - -/** - * Models on OpenRouter-style proxy providers that reject `reasoning.effort`. - */ -function isProxyReasoningUnsupported(modelId: string): boolean { - const id = modelId.toLowerCase(); - return id.startsWith("x-ai/"); -} - -/** - * Create a streamFn wrapper that adds the Kilocode feature attribution header - * and injects reasoning.effort based on the configured thinking level. - * - * The Kilocode provider gateway manages provider-specific quirks (e.g. cache - * control) server-side, so we only handle header injection and reasoning here. - */ -function createKilocodeWrapper( - baseStreamFn: StreamFn | undefined, - thinkingLevel?: ThinkLevel, -): StreamFn { - const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => { - const onPayload = options?.onPayload; - return underlying(model, context, { - ...options, - headers: { - ...options?.headers, - ...resolveKilocodeAppHeaders(), - }, - onPayload: (payload) => { - normalizeProxyReasoningPayload(payload, thinkingLevel); - onPayload?.(payload); - }, - }); - }; -} - function isGemini31Model(modelId: string): boolean { const normalized = modelId.toLowerCase(); return normalized.includes("gemini-3.1-pro") || normalized.includes("gemini-3.1-flash"); diff --git a/src/agents/pi-embedded-runner/moonshot-stream-wrappers.ts b/src/agents/pi-embedded-runner/moonshot-stream-wrappers.ts new file mode 100644 index 00000000000..0cb17c6d49e --- /dev/null +++ b/src/agents/pi-embedded-runner/moonshot-stream-wrappers.ts @@ -0,0 +1,113 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; +import { streamSimple } from "@mariozechner/pi-ai"; +import type { ThinkLevel } from "../../auto-reply/thinking.js"; + +type MoonshotThinkingType = "enabled" | "disabled"; + +function normalizeMoonshotThinkingType(value: unknown): MoonshotThinkingType | undefined { + if (typeof value === "boolean") { + return value ? "enabled" : "disabled"; + } + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (["enabled", "enable", "on", "true"].includes(normalized)) { + return "enabled"; + } + if (["disabled", "disable", "off", "false"].includes(normalized)) { + return "disabled"; + } + return undefined; + } + if (value && typeof value === "object" && !Array.isArray(value)) { + return normalizeMoonshotThinkingType((value as Record).type); + } + return undefined; +} + +function isMoonshotToolChoiceCompatible(toolChoice: unknown): boolean { + if (toolChoice == null || toolChoice === "auto" || toolChoice === "none") { + return true; + } + if (typeof toolChoice === "object" && !Array.isArray(toolChoice)) { + const typeValue = (toolChoice as Record).type; + return typeValue === "auto" || typeValue === "none"; + } + return false; +} + +export function shouldApplySiliconFlowThinkingOffCompat(params: { + provider: string; + modelId: string; + thinkingLevel?: ThinkLevel; +}): boolean { + return ( + params.provider === "siliconflow" && + params.thinkingLevel === "off" && + params.modelId.startsWith("Pro/") + ); +} + +export function createSiliconFlowThinkingWrapper(baseStreamFn: StreamFn | undefined): StreamFn { + const underlying = baseStreamFn ?? streamSimple; + return (model, context, options) => { + const originalOnPayload = options?.onPayload; + return underlying(model, context, { + ...options, + onPayload: (payload) => { + if (payload && typeof payload === "object") { + const payloadObj = payload as Record; + if (payloadObj.thinking === "off") { + payloadObj.thinking = null; + } + } + originalOnPayload?.(payload); + }, + }); + }; +} + +export function resolveMoonshotThinkingType(params: { + configuredThinking: unknown; + thinkingLevel?: ThinkLevel; +}): MoonshotThinkingType | undefined { + const configured = normalizeMoonshotThinkingType(params.configuredThinking); + if (configured) { + return configured; + } + if (!params.thinkingLevel) { + return undefined; + } + return params.thinkingLevel === "off" ? "disabled" : "enabled"; +} + +export function createMoonshotThinkingWrapper( + baseStreamFn: StreamFn | undefined, + thinkingType?: MoonshotThinkingType, +): StreamFn { + const underlying = baseStreamFn ?? streamSimple; + return (model, context, options) => { + const originalOnPayload = options?.onPayload; + return underlying(model, context, { + ...options, + onPayload: (payload) => { + if (payload && typeof payload === "object") { + const payloadObj = payload as Record; + let effectiveThinkingType = normalizeMoonshotThinkingType(payloadObj.thinking); + + if (thinkingType) { + payloadObj.thinking = { type: thinkingType }; + effectiveThinkingType = thinkingType; + } + + if ( + effectiveThinkingType === "enabled" && + !isMoonshotToolChoiceCompatible(payloadObj.tool_choice) + ) { + payloadObj.tool_choice = "auto"; + } + } + originalOnPayload?.(payload); + }, + }); + }; +} diff --git a/src/agents/pi-embedded-runner/proxy-stream-wrappers.ts b/src/agents/pi-embedded-runner/proxy-stream-wrappers.ts new file mode 100644 index 00000000000..5e8076ad49c --- /dev/null +++ b/src/agents/pi-embedded-runner/proxy-stream-wrappers.ts @@ -0,0 +1,145 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; +import { streamSimple } from "@mariozechner/pi-ai"; +import type { ThinkLevel } from "../../auto-reply/thinking.js"; + +const OPENROUTER_APP_HEADERS: Record = { + "HTTP-Referer": "https://openclaw.ai", + "X-Title": "OpenClaw", +}; +const KILOCODE_FEATURE_HEADER = "X-KILOCODE-FEATURE"; +const KILOCODE_FEATURE_DEFAULT = "openclaw"; +const KILOCODE_FEATURE_ENV_VAR = "KILOCODE_FEATURE"; + +function resolveKilocodeAppHeaders(): Record { + const feature = process.env[KILOCODE_FEATURE_ENV_VAR]?.trim() || KILOCODE_FEATURE_DEFAULT; + return { [KILOCODE_FEATURE_HEADER]: feature }; +} + +function isOpenRouterAnthropicModel(provider: string, modelId: string): boolean { + return provider.toLowerCase() === "openrouter" && modelId.toLowerCase().startsWith("anthropic/"); +} + +function mapThinkingLevelToOpenRouterReasoningEffort( + thinkingLevel: ThinkLevel, +): "none" | "minimal" | "low" | "medium" | "high" | "xhigh" { + if (thinkingLevel === "off") { + return "none"; + } + if (thinkingLevel === "adaptive") { + return "medium"; + } + return thinkingLevel; +} + +function normalizeProxyReasoningPayload(payload: unknown, thinkingLevel?: ThinkLevel): void { + if (!payload || typeof payload !== "object") { + return; + } + + const payloadObj = payload as Record; + delete payloadObj.reasoning_effort; + if (!thinkingLevel || thinkingLevel === "off") { + return; + } + + const existingReasoning = payloadObj.reasoning; + if ( + existingReasoning && + typeof existingReasoning === "object" && + !Array.isArray(existingReasoning) + ) { + const reasoningObj = existingReasoning as Record; + if (!("max_tokens" in reasoningObj) && !("effort" in reasoningObj)) { + reasoningObj.effort = mapThinkingLevelToOpenRouterReasoningEffort(thinkingLevel); + } + } else if (!existingReasoning) { + payloadObj.reasoning = { + effort: mapThinkingLevelToOpenRouterReasoningEffort(thinkingLevel), + }; + } +} + +export function createOpenRouterSystemCacheWrapper(baseStreamFn: StreamFn | undefined): StreamFn { + const underlying = baseStreamFn ?? streamSimple; + return (model, context, options) => { + if ( + typeof model.provider !== "string" || + typeof model.id !== "string" || + !isOpenRouterAnthropicModel(model.provider, model.id) + ) { + return underlying(model, context, options); + } + + const originalOnPayload = options?.onPayload; + return underlying(model, context, { + ...options, + onPayload: (payload) => { + const messages = (payload as Record)?.messages; + if (Array.isArray(messages)) { + for (const msg of messages as Array<{ role?: string; content?: unknown }>) { + if (msg.role !== "system" && msg.role !== "developer") { + continue; + } + if (typeof msg.content === "string") { + msg.content = [ + { type: "text", text: msg.content, cache_control: { type: "ephemeral" } }, + ]; + } else if (Array.isArray(msg.content) && msg.content.length > 0) { + const last = msg.content[msg.content.length - 1]; + if (last && typeof last === "object") { + (last as Record).cache_control = { type: "ephemeral" }; + } + } + } + } + originalOnPayload?.(payload); + }, + }); + }; +} + +export function createOpenRouterWrapper( + baseStreamFn: StreamFn | undefined, + thinkingLevel?: ThinkLevel, +): StreamFn { + const underlying = baseStreamFn ?? streamSimple; + return (model, context, options) => { + const onPayload = options?.onPayload; + return underlying(model, context, { + ...options, + headers: { + ...OPENROUTER_APP_HEADERS, + ...options?.headers, + }, + onPayload: (payload) => { + normalizeProxyReasoningPayload(payload, thinkingLevel); + onPayload?.(payload); + }, + }); + }; +} + +export function isProxyReasoningUnsupported(modelId: string): boolean { + return modelId.toLowerCase().startsWith("x-ai/"); +} + +export function createKilocodeWrapper( + baseStreamFn: StreamFn | undefined, + thinkingLevel?: ThinkLevel, +): StreamFn { + const underlying = baseStreamFn ?? streamSimple; + return (model, context, options) => { + const onPayload = options?.onPayload; + return underlying(model, context, { + ...options, + headers: { + ...options?.headers, + ...resolveKilocodeAppHeaders(), + }, + onPayload: (payload) => { + normalizeProxyReasoningPayload(payload, thinkingLevel); + onPayload?.(payload); + }, + }); + }; +}