mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-20 14:30:57 +00:00
Verified: - pnpm build - pnpm check - pnpm test:macmini Co-authored-by: vishaltandale00 <9222298+vishaltandale00@users.noreply.github.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
346 lines
11 KiB
TypeScript
346 lines
11 KiB
TypeScript
import type { StreamFn } from "@mariozechner/pi-agent-core";
|
|
import type { SimpleStreamOptions } from "@mariozechner/pi-ai";
|
|
import { streamSimple } from "@mariozechner/pi-ai";
|
|
import type { OpenClawConfig } from "../../config/config.js";
|
|
import { log } from "./logger.js";
|
|
|
|
/**
 * App attribution headers merged into requests made through the OpenRouter
 * provider (see `createOpenRouterHeadersWrapper`).
 */
const OPENROUTER_APP_HEADERS: Record<string, string> = {
  // Site URL reported to OpenRouter.
  "HTTP-Referer": "https://openclaw.ai",
  // Display name reported to OpenRouter.
  "X-Title": "OpenClaw",
};
|
|
/** Value added to the `anthropic-beta` header when `context1m` is enabled. */
const ANTHROPIC_CONTEXT_1M_BETA = "context-1m-2025-08-07";

/** Lower-case model id prefixes for which the `context1m` option is honored. */
const ANTHROPIC_1M_MODEL_PREFIXES = ["claude-opus-4", "claude-sonnet-4"] as const;

// NOTE: We only force `store=true` for *direct* OpenAI Responses.
// Codex responses (chatgpt.com/backend-api/codex/responses) require `store=false`.

/** `model.api` values eligible for the forced `store=true` payload override. */
const OPENAI_RESPONSES_APIS = new Set<string>(["openai-responses"]);

/** `model.provider` values eligible for the forced `store=true` payload override. */
const OPENAI_RESPONSES_PROVIDERS = new Set<string>(["openai"]);
|
|
|
|
/**
 * Look up the extra params configured for a specific model.
 *
 * The lookup key is `<provider>/<modelId>` under `cfg.agents.defaults.models`.
 * The result is a shallow copy, so callers may mutate it without touching the
 * config object. These params carry stream settings such as
 * temperature/maxTokens.
 *
 * @param params.cfg - Loaded config, or `undefined` when none is available.
 * @param params.provider - Provider id used as the first half of the key.
 * @param params.modelId - Model id used as the second half of the key.
 * @returns A copy of the configured params, or `undefined` when the model has
 *   no `params` entry.
 *
 * @internal Exported for testing only
 */
export function resolveExtraParams(params: {
  cfg: OpenClawConfig | undefined;
  provider: string;
  modelId: string;
}): Record<string, unknown> | undefined {
  const { cfg, provider, modelId } = params;
  const configuredParams = cfg?.agents?.defaults?.models?.[`${provider}/${modelId}`]?.params;
  if (!configuredParams) {
    return undefined;
  }
  return { ...configuredParams };
}
|
|
|
|
/** Cache retention levels accepted by the `cacheRetention` stream option. */
type CacheRetention = "none" | "short" | "long";

/**
 * Stream options assembled from extra params: any subset of
 * `SimpleStreamOptions` plus an optional `cacheRetention` setting.
 */
type CacheRetentionStreamOptions = Partial<SimpleStreamOptions> & {
  cacheRetention?: CacheRetention;
};
|
|
|
|
/**
 * Decide which `cacheRetention` value to send, based on extra params.
 *
 * Resolution order for the Anthropic provider:
 *   1. `cacheRetention` when it is exactly "none", "short" or "long";
 *   2. legacy `cacheControlTtl`: "5m" → "short", "1h" → "long";
 *   3. otherwise "short".
 *
 * Every other provider gets `undefined`. (OpenRouter uses the
 * openai-completions API with hardcoded cache_control, not the cacheRetention
 * stream option.)
 *
 * @param extraParams - Merged extra params for the model, if any.
 * @param provider - Provider id; only "anthropic" yields a value.
 * @returns The retention to apply, or `undefined` for non-Anthropic providers.
 */
function resolveCacheRetention(
  extraParams: Record<string, unknown> | undefined,
  provider: string,
): CacheRetention | undefined {
  if (provider !== "anthropic") {
    return undefined;
  }

  // The new-style option wins whenever it holds a recognised value.
  const requested = extraParams?.cacheRetention;
  if (requested === "long" || requested === "short" || requested === "none") {
    return requested;
  }

  // Legacy TTL: "1h" maps to "long"; "5m" and anything else end up as the
  // Anthropic default, "short".
  return extraParams?.cacheControlTtl === "1h" ? "long" : "short";
}
|
|
|
|
/**
 * Build a streamFn that applies configured defaults (temperature, maxTokens,
 * cacheRetention) to every call.
 *
 * @param baseStreamFn - Stream function to wrap; `streamSimple` is used when absent.
 * @param extraParams - Merged extra params for the model.
 * @param provider - Provider id, forwarded to `resolveCacheRetention`.
 * @returns The wrapping streamFn, or `undefined` when `extraParams` is
 *   missing/empty or yields no stream option to apply.
 */
function createStreamFnWithExtraParams(
  baseStreamFn: StreamFn | undefined,
  extraParams: Record<string, unknown> | undefined,
  provider: string,
): StreamFn | undefined {
  if (!extraParams || Object.keys(extraParams).length === 0) {
    return undefined;
  }

  const { temperature, maxTokens } = extraParams;
  const defaults: CacheRetentionStreamOptions = {};

  // Only numeric values are forwarded; anything else in config is ignored.
  if (typeof temperature === "number") {
    defaults.temperature = temperature;
  }
  if (typeof maxTokens === "number") {
    defaults.maxTokens = maxTokens;
  }

  const retention = resolveCacheRetention(extraParams, provider);
  if (retention) {
    defaults.cacheRetention = retention;
  }

  if (Object.keys(defaults).length === 0) {
    return undefined;
  }

  log.debug(`creating streamFn wrapper with params: ${JSON.stringify(streamParamsForLog(defaults))}`);

  const delegate = baseStreamFn ?? streamSimple;
  // Per-call options are spread last, so they override the configured defaults.
  const wrapped: StreamFn = (model, context, options) =>
    delegate(model, context, { ...defaults, ...options });
  return wrapped;

  /** Identity helper: keeps the logged object the same one that is applied. */
  function streamParamsForLog(value: CacheRetentionStreamOptions): CacheRetentionStreamOptions {
    return value;
  }
}
|
|
|
|
/**
 * Report whether a model's `baseUrl` points straight at OpenAI rather than at
 * a proxy or compatible third-party endpoint.
 *
 * A missing, non-string or blank `baseUrl` counts as direct. Otherwise the
 * URL's hostname must be exactly `api.openai.com` or `chatgpt.com`.
 *
 * Values without a scheme (e.g. `api.openai.com/v1`) are parsed again with an
 * `https://` prefix so the hostname is still compared exactly. Substring
 * matching is deliberately avoided: it would classify hosts such as
 * `evil-api.openai.com.example.org` as direct.
 *
 * @param baseUrl - The configured base URL, of unknown shape.
 * @returns `true` when the endpoint is considered direct OpenAI.
 */
function isDirectOpenAIBaseUrl(baseUrl: unknown): boolean {
  if (typeof baseUrl !== "string" || !baseUrl.trim()) {
    return true;
  }

  const hostnameOf = (candidate: string): string | undefined => {
    try {
      return new URL(candidate).hostname.toLowerCase();
    } catch {
      return undefined;
    }
  };

  const trimmed = baseUrl.trim();
  // `||` (not `??`) on purpose: inputs like "api.openai.com:443/v1" parse with
  // an empty hostname because the host is read as a scheme; retry those too.
  const host = hostnameOf(trimmed) || hostnameOf(`https://${trimmed}`);
  return host === "api.openai.com" || host === "chatgpt.com";
}
|
|
|
|
/**
 * Decide whether requests for this model should have `store=true` forced.
 *
 * All of the following must hold: `api` and `provider` are strings, `api` is
 * listed in `OPENAI_RESPONSES_APIS`, `provider` is listed in
 * `OPENAI_RESPONSES_PROVIDERS`, and `isDirectOpenAIBaseUrl(baseUrl)` is true.
 *
 * @param model - Model descriptor; fields are validated at runtime.
 * @returns `true` when the payload override should be applied.
 */
function shouldForceResponsesStore(model: {
  api?: unknown;
  provider?: unknown;
  baseUrl?: unknown;
}): boolean {
  const { api, provider, baseUrl } = model;
  const isOpenAIResponsesModel =
    typeof api === "string" &&
    typeof provider === "string" &&
    OPENAI_RESPONSES_APIS.has(api) &&
    OPENAI_RESPONSES_PROVIDERS.has(provider);
  // The base URL is only inspected once api/provider have qualified.
  return isOpenAIResponsesModel && isDirectOpenAIBaseUrl(baseUrl);
}
|
|
|
|
/**
 * Wrap a streamFn so that qualifying models get `store = true` written onto
 * the outgoing payload.
 *
 * The decision is made per call via `shouldForceResponsesStore(model)`.
 * Non-qualifying calls pass through untouched.
 *
 * @param baseStreamFn - Stream function to wrap; `streamSimple` is used when absent.
 * @returns The wrapping streamFn.
 */
function createOpenAIResponsesStoreWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
  const delegate = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    if (shouldForceResponsesStore(model)) {
      const forwardPayload = options?.onPayload;
      return delegate(model, context, {
        ...options,
        onPayload: (payload) => {
          // Mutate the payload in place, then hand it to any caller-supplied hook.
          if (payload && typeof payload === "object") {
            (payload as { store?: unknown }).store = true;
          }
          forwardPayload?.(payload);
        },
      });
    }

    return delegate(model, context, options);
  };
}
|
|
|
|
/**
 * Check whether a model id starts with one of `ANTHROPIC_1M_MODEL_PREFIXES`.
 * The id is trimmed and lower-cased before comparison.
 *
 * @param modelId - Model id as configured.
 * @returns `true` when the id matches a listed prefix.
 */
function isAnthropic1MModel(modelId: string): boolean {
  const candidate = modelId.trim().toLowerCase();
  for (const prefix of ANTHROPIC_1M_MODEL_PREFIXES) {
    if (candidate.startsWith(prefix)) {
      return true;
    }
  }
  return false;
}
|
|
|
|
/**
 * Split a comma-separated header value into its trimmed, non-empty entries.
 *
 * @param value - Raw header value; anything that is not a string yields `[]`.
 * @returns Entries in their original order.
 */
function parseHeaderList(value: unknown): string[] {
  const entries: string[] = [];
  if (typeof value === "string") {
    for (const piece of value.split(",")) {
      const entry = piece.trim();
      if (entry) {
        entries.push(entry);
      }
    }
  }
  return entries;
}
|
|
|
|
/**
 * Collect the Anthropic beta flags to send for a model.
 *
 * Sources, in order:
 *   - `extraParams.anthropicBeta`: a string or an array of strings. Each
 *     string may itself be a comma-separated list and is split into
 *     individual flags, so de-duplication works per flag (both here and when
 *     the flags are later merged into an existing `anthropic-beta` header).
 *   - `extraParams.context1m === true`: adds `ANTHROPIC_CONTEXT_1M_BETA` when
 *     the model id passes `isAnthropic1MModel`; otherwise a warning is logged
 *     and the option is ignored.
 *
 * @param extraParams - Merged extra params for the model, if any.
 * @param provider - Provider id; only "anthropic" yields flags.
 * @param modelId - Model id, used for the 1M-context eligibility check.
 * @returns Unique flags in insertion order, or `undefined` when there are
 *   none or the provider is not Anthropic.
 */
function resolveAnthropicBetas(
  extraParams: Record<string, unknown> | undefined,
  provider: string,
  modelId: string,
): string[] | undefined {
  if (provider !== "anthropic") {
    return undefined;
  }

  const betas = new Set<string>();
  const configured = extraParams?.anthropicBeta;
  const configuredEntries: unknown[] = Array.isArray(configured) ? configured : [configured];
  for (const entry of configuredEntries) {
    // parseHeaderList ignores non-strings and blank items, and splits
    // "a,b" into separate flags instead of keeping one opaque entry.
    for (const beta of parseHeaderList(entry)) {
      betas.add(beta);
    }
  }

  if (extraParams?.context1m === true) {
    if (isAnthropic1MModel(modelId)) {
      betas.add(ANTHROPIC_CONTEXT_1M_BETA);
    } else {
      log.warn(`ignoring context1m for non-opus/sonnet model: ${provider}/${modelId}`);
    }
  }

  return betas.size > 0 ? [...betas] : undefined;
}
|
|
|
|
/**
 * Return a copy of `headers` whose `anthropic-beta` entry also contains `betas`.
 *
 * An existing header is located case-insensitively and its original key
 * casing is kept. Its entries come first, followed by new flags, with
 * duplicates removed. The input object is not mutated.
 *
 * @param headers - Existing request headers, if any.
 * @param betas - Beta flags to add.
 * @returns A new headers object with the merged, comma-joined value.
 */
function mergeAnthropicBetaHeader(
  headers: Record<string, string> | undefined,
  betas: string[],
): Record<string, string> {
  const result = { ...headers };
  const headerName =
    Object.keys(result).find((name) => name.toLowerCase() === "anthropic-beta") ??
    "anthropic-beta";
  // When the header is absent the lookup is undefined, which parses to [].
  const current = parseHeaderList(result[headerName]);
  result[headerName] = [...new Set([...current, ...betas])].join(",");
  return result;
}
|
|
|
|
/**
 * Wrap a streamFn so every call carries the given flags in its
 * `anthropic-beta` header, merged with any header the caller already set.
 *
 * @param baseStreamFn - Stream function to wrap; `streamSimple` is used when absent.
 * @param betas - Beta flags to merge into the header on each call.
 * @returns The wrapping streamFn.
 */
function createAnthropicBetaHeadersWrapper(
  baseStreamFn: StreamFn | undefined,
  betas: string[],
): StreamFn {
  const delegate = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    const headers = mergeAnthropicBetaHeader(options?.headers, betas);
    return delegate(model, context, { ...options, headers });
  };
}
|
|
|
|
/**
 * Wrap a streamFn so every call includes the OpenRouter app attribution
 * headers. These headers allow OpenClaw to appear on OpenRouter's leaderboard.
 *
 * Caller-supplied headers are spread last and therefore win on conflicts.
 *
 * @param baseStreamFn - Stream function to wrap; `streamSimple` is used when absent.
 * @returns The wrapping streamFn.
 */
function createOpenRouterHeadersWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
  const delegate = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    const headers = { ...OPENROUTER_APP_HEADERS, ...options?.headers };
    return delegate(model, context, { ...options, headers });
  };
}
|
|
|
|
/**
 * Wrap a streamFn so outgoing payloads carry `tool_stream = true` for Z.AI.
 *
 * Z.AI's API supports the `tool_stream` parameter to enable real-time
 * streaming of tool call arguments and reasoning content. When enabled, the
 * API returns progressive tool_call deltas.
 *
 * @param baseStreamFn - Stream function to wrap; `streamSimple` is used when absent.
 * @param enabled - When `false`, calls are forwarded unchanged.
 * @returns The wrapping streamFn.
 *
 * @see https://docs.z.ai/api-reference#streaming
 */
function createZaiToolStreamWrapper(
  baseStreamFn: StreamFn | undefined,
  enabled: boolean,
): StreamFn {
  const delegate = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    if (enabled) {
      const forwardPayload = options?.onPayload;
      return delegate(model, context, {
        ...options,
        onPayload: (payload) => {
          // Set the flag on the payload object, then call any existing hook.
          if (payload && typeof payload === "object") {
            (payload as Record<string, unknown>).tool_stream = true;
          }
          forwardPayload?.(payload);
        },
      });
    }

    return delegate(model, context, options);
  };
}
|
|
|
|
/**
 * Install provider-specific streamFn wrappers on an agent.
 *
 * Wrappers are layered in this order, each wrapping the previous `agent.streamFn`:
 *   1. configured stream defaults (temperature, maxTokens, cacheRetention);
 *   2. Anthropic beta headers, when any flags resolve;
 *   3. OpenRouter app attribution headers, for the "openrouter" provider;
 *   4. Z.AI `tool_stream`, for "zai"/"z-ai" unless `tool_stream` is `false`;
 *   5. the OpenAI Responses `store` override, which is always installed and
 *      decides per request whether to act.
 *
 * @param agent - Agent whose `streamFn` is replaced in place.
 * @param cfg - Loaded config used to look up per-model params.
 * @param provider - Provider id.
 * @param modelId - Model id.
 * @param extraParamsOverride - Params that take precedence over configured
 *   ones; entries whose value is `undefined` are dropped first.
 *
 * @internal Exported for testing
 */
export function applyExtraParamsToAgent(
  agent: { streamFn?: StreamFn },
  cfg: OpenClawConfig | undefined,
  provider: string,
  modelId: string,
  extraParamsOverride?: Record<string, unknown>,
): void {
  const configured = resolveExtraParams({ cfg, provider, modelId });

  // Strip undefined values so they cannot erase a configured param.
  const hasOverrides = extraParamsOverride && Object.keys(extraParamsOverride).length > 0;
  const definedOverrides = hasOverrides
    ? Object.fromEntries(
        Object.entries(extraParamsOverride).filter(([, value]) => value !== undefined),
      )
    : undefined;
  const merged = Object.assign({}, configured, definedOverrides);

  const paramsStreamFn = createStreamFnWithExtraParams(agent.streamFn, merged, provider);
  if (paramsStreamFn) {
    log.debug(`applying extraParams to agent streamFn for ${provider}/${modelId}`);
    agent.streamFn = paramsStreamFn;
  }

  const anthropicBetas = resolveAnthropicBetas(merged, provider, modelId);
  if (anthropicBetas?.length) {
    log.debug(
      `applying Anthropic beta header for ${provider}/${modelId}: ${anthropicBetas.join(",")}`,
    );
    agent.streamFn = createAnthropicBetaHeadersWrapper(agent.streamFn, anthropicBetas);
  }

  if (provider === "openrouter") {
    log.debug(`applying OpenRouter app attribution headers for ${provider}/${modelId}`);
    agent.streamFn = createOpenRouterHeadersWrapper(agent.streamFn);
  }

  // Z.AI tool_stream is on by default for the Z.AI provider; it is skipped
  // only when params.tool_stream is explicitly false.
  const isZaiProvider = provider === "zai" || provider === "z-ai";
  if (isZaiProvider && merged?.tool_stream !== false) {
    log.debug(`enabling Z.AI tool_stream for ${provider}/${modelId}`);
    agent.streamFn = createZaiToolStreamWrapper(agent.streamFn, true);
  }

  // Work around upstream pi-ai hardcoding `store: false` for the Responses API.
  // The wrapper is installed unconditionally. It forces `store=true` only for
  // requests that pass `shouldForceResponsesStore` (direct OpenAI Responses),
  // so multi-turn server-side conversation state is preserved there.
  agent.streamFn = createOpenAIResponsesStoreWrapper(agent.streamFn);
}
|