mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
refactor: extract openai stream wrappers
This commit is contained in:
@@ -9,6 +9,13 @@ import {
|
||||
usesOpenAiStringModeAnthropicToolChoice,
|
||||
} from "../provider-capabilities.js";
|
||||
import { log } from "./logger.js";
|
||||
import {
|
||||
createCodexDefaultTransportWrapper,
|
||||
createOpenAIDefaultTransportWrapper,
|
||||
createOpenAIResponsesContextManagementWrapper,
|
||||
createOpenAIServiceTierWrapper,
|
||||
resolveOpenAIServiceTier,
|
||||
} from "./openai-stream-wrappers.js";
|
||||
|
||||
const OPENROUTER_APP_HEADERS: Record<string, string> = {
|
||||
"HTTP-Referer": "https://openclaw.ai",
|
||||
@@ -25,11 +32,6 @@ function resolveKilocodeAppHeaders(): Record<string, string> {
|
||||
|
||||
// Anthropic beta header value that unlocks the 1M-token context window.
const ANTHROPIC_CONTEXT_1M_BETA = "context-1m-2025-08-07";
// Model-id prefixes eligible for the 1M-context beta.
const ANTHROPIC_1M_MODEL_PREFIXES = ["claude-opus-4", "claude-sonnet-4"] as const;
// NOTE: We only force `store=true` for *direct* OpenAI Responses.
// Codex responses (chatgpt.com/backend-api/codex/responses) require `store=false`.
const OPENAI_RESPONSES_APIS = new Set(["openai-responses"]);
const OPENAI_RESPONSES_PROVIDERS = new Set(["openai", "azure-openai-responses"]);
|
||||
|
||||
/**
|
||||
* Resolve provider-specific extra params from model config.
|
||||
* Used to pass through stream params like temperature/maxTokens.
|
||||
@@ -69,7 +71,6 @@ export function resolveExtraParams(params: {
|
||||
}
|
||||
|
||||
type CacheRetention = "none" | "short" | "long";
|
||||
type OpenAIServiceTier = "auto" | "default" | "flex" | "priority";
|
||||
type CacheRetentionStreamOptions = Partial<SimpleStreamOptions> & {
|
||||
cacheRetention?: CacheRetention;
|
||||
openaiWsWarmup?: boolean;
|
||||
@@ -214,263 +215,6 @@ function createBedrockNoCacheWrapper(baseStreamFn: StreamFn | undefined): Stream
|
||||
});
|
||||
}
|
||||
|
||||
function isDirectOpenAIBaseUrl(baseUrl: unknown): boolean {
|
||||
if (typeof baseUrl !== "string" || !baseUrl.trim()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
const host = new URL(baseUrl).hostname.toLowerCase();
|
||||
return (
|
||||
host === "api.openai.com" || host === "chatgpt.com" || host.endsWith(".openai.azure.com")
|
||||
);
|
||||
} catch {
|
||||
const normalized = baseUrl.toLowerCase();
|
||||
return (
|
||||
normalized.includes("api.openai.com") ||
|
||||
normalized.includes("chatgpt.com") ||
|
||||
normalized.includes(".openai.azure.com")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function isOpenAIPublicApiBaseUrl(baseUrl: unknown): boolean {
|
||||
if (typeof baseUrl !== "string" || !baseUrl.trim()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
return new URL(baseUrl).hostname.toLowerCase() === "api.openai.com";
|
||||
} catch {
|
||||
return baseUrl.toLowerCase().includes("api.openai.com");
|
||||
}
|
||||
}
|
||||
|
||||
function shouldForceResponsesStore(model: {
|
||||
api?: unknown;
|
||||
provider?: unknown;
|
||||
baseUrl?: unknown;
|
||||
compat?: { supportsStore?: boolean };
|
||||
}): boolean {
|
||||
// Never force store=true when the model explicitly declares supportsStore=false
|
||||
// (e.g. Azure OpenAI Responses API without server-side persistence).
|
||||
if (model.compat?.supportsStore === false) {
|
||||
return false;
|
||||
}
|
||||
if (typeof model.api !== "string" || typeof model.provider !== "string") {
|
||||
return false;
|
||||
}
|
||||
if (!OPENAI_RESPONSES_APIS.has(model.api)) {
|
||||
return false;
|
||||
}
|
||||
if (!OPENAI_RESPONSES_PROVIDERS.has(model.provider)) {
|
||||
return false;
|
||||
}
|
||||
return isDirectOpenAIBaseUrl(model.baseUrl);
|
||||
}
|
||||
|
||||
function parsePositiveInteger(value: unknown): number | undefined {
|
||||
if (typeof value === "number" && Number.isFinite(value) && value > 0) {
|
||||
return Math.floor(value);
|
||||
}
|
||||
if (typeof value === "string") {
|
||||
const parsed = Number.parseInt(value, 10);
|
||||
if (Number.isFinite(parsed) && parsed > 0) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function resolveOpenAIResponsesCompactThreshold(model: { contextWindow?: unknown }): number {
|
||||
const contextWindow = parsePositiveInteger(model.contextWindow);
|
||||
if (contextWindow) {
|
||||
return Math.max(1_000, Math.floor(contextWindow * 0.7));
|
||||
}
|
||||
return 80_000;
|
||||
}
|
||||
|
||||
function shouldEnableOpenAIResponsesServerCompaction(
|
||||
model: {
|
||||
api?: unknown;
|
||||
provider?: unknown;
|
||||
baseUrl?: unknown;
|
||||
compat?: { supportsStore?: boolean };
|
||||
},
|
||||
extraParams: Record<string, unknown> | undefined,
|
||||
): boolean {
|
||||
const configured = extraParams?.responsesServerCompaction;
|
||||
if (configured === false) {
|
||||
return false;
|
||||
}
|
||||
if (!shouldForceResponsesStore(model)) {
|
||||
return false;
|
||||
}
|
||||
if (configured === true) {
|
||||
return true;
|
||||
}
|
||||
// Auto-enable for direct OpenAI Responses models.
|
||||
return model.provider === "openai";
|
||||
}
|
||||
|
||||
function shouldStripResponsesStore(
|
||||
model: { api?: unknown; compat?: { supportsStore?: boolean } },
|
||||
forceStore: boolean,
|
||||
): boolean {
|
||||
if (forceStore) {
|
||||
return false;
|
||||
}
|
||||
if (typeof model.api !== "string") {
|
||||
return false;
|
||||
}
|
||||
return OPENAI_RESPONSES_APIS.has(model.api) && model.compat?.supportsStore === false;
|
||||
}
|
||||
|
||||
function applyOpenAIResponsesPayloadOverrides(params: {
|
||||
payloadObj: Record<string, unknown>;
|
||||
forceStore: boolean;
|
||||
stripStore: boolean;
|
||||
useServerCompaction: boolean;
|
||||
compactThreshold: number;
|
||||
}): void {
|
||||
if (params.forceStore) {
|
||||
params.payloadObj.store = true;
|
||||
}
|
||||
if (params.stripStore) {
|
||||
delete params.payloadObj.store;
|
||||
}
|
||||
if (params.useServerCompaction && params.payloadObj.context_management === undefined) {
|
||||
params.payloadObj.context_management = [
|
||||
{
|
||||
type: "compaction",
|
||||
compact_threshold: params.compactThreshold,
|
||||
},
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Wrap a stream function so OpenAI Responses payloads get `store` and
 * server-side compaction overrides applied via the `onPayload` hook, right
 * before the request body is sent.
 *
 * @param baseStreamFn Stream function to wrap; defaults to `streamSimple`.
 * @param extraParams Per-model extras; reads `responsesServerCompaction`
 *   and `responsesCompactThreshold`.
 */
function createOpenAIResponsesContextManagementWrapper(
  baseStreamFn: StreamFn | undefined,
  extraParams: Record<string, unknown> | undefined,
): StreamFn {
  const underlying = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    const forceStore = shouldForceResponsesStore(model);
    const useServerCompaction = shouldEnableOpenAIResponsesServerCompaction(model, extraParams);
    // Strip `store` from the payload when the model declares supportsStore=false.
    // pi-ai upstream hardcodes `store: false` for Responses API; strict
    // OpenAI-compatible endpoints (e.g. Gemini via Cloudflare) reject it.
    const stripStore = shouldStripResponsesStore(model, forceStore);
    // Fast path: nothing to override — delegate with options untouched.
    if (!forceStore && !useServerCompaction && !stripStore) {
      return underlying(model, context, options);
    }

    // An explicit threshold from extra params wins over the
    // context-window-derived default.
    const compactThreshold =
      parsePositiveInteger(extraParams?.responsesCompactThreshold) ??
      resolveOpenAIResponsesCompactThreshold(model);
    const originalOnPayload = options?.onPayload;
    return underlying(model, context, {
      ...options,
      onPayload: (payload) => {
        if (payload && typeof payload === "object") {
          // Mutates the payload in place, then chains to the caller's hook.
          applyOpenAIResponsesPayloadOverrides({
            payloadObj: payload as Record<string, unknown>,
            forceStore,
            stripStore,
            useServerCompaction,
            compactThreshold,
          });
        }
        originalOnPayload?.(payload);
      },
    });
  };
}
|
||||
|
||||
function normalizeOpenAIServiceTier(value: unknown): OpenAIServiceTier | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const normalized = value.trim().toLowerCase();
|
||||
if (
|
||||
normalized === "auto" ||
|
||||
normalized === "default" ||
|
||||
normalized === "flex" ||
|
||||
normalized === "priority"
|
||||
) {
|
||||
return normalized;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function resolveOpenAIServiceTier(
|
||||
extraParams: Record<string, unknown> | undefined,
|
||||
): OpenAIServiceTier | undefined {
|
||||
const raw = extraParams?.serviceTier ?? extraParams?.service_tier;
|
||||
const normalized = normalizeOpenAIServiceTier(raw);
|
||||
if (raw !== undefined && normalized === undefined) {
|
||||
const rawSummary = typeof raw === "string" ? raw : typeof raw;
|
||||
log.warn(`ignoring invalid OpenAI service tier param: ${rawSummary}`);
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
/**
 * Wrap a stream function so requests to the public OpenAI Responses API
 * carry the configured `service_tier` (e.g. "flex" or "priority").
 *
 * Scope is deliberately narrow: api "openai-responses", provider "openai",
 * and the api.openai.com host only; anything else passes through untouched.
 * A `service_tier` already present on the payload is never overwritten.
 */
function createOpenAIServiceTierWrapper(
  baseStreamFn: StreamFn | undefined,
  serviceTier: OpenAIServiceTier,
): StreamFn {
  const underlying = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    if (
      model.api !== "openai-responses" ||
      model.provider !== "openai" ||
      !isOpenAIPublicApiBaseUrl(model.baseUrl)
    ) {
      return underlying(model, context, options);
    }
    const originalOnPayload = options?.onPayload;
    return underlying(model, context, {
      ...options,
      onPayload: (payload) => {
        if (payload && typeof payload === "object") {
          const payloadObj = payload as Record<string, unknown>;
          // Only fill in when absent so explicit payload values win.
          if (payloadObj.service_tier === undefined) {
            payloadObj.service_tier = serviceTier;
          }
        }
        originalOnPayload?.(payload);
      },
    });
  };
}
|
||||
|
||||
/**
 * Wrap a stream function so Codex requests default to transport "auto";
 * a caller-provided transport always wins.
 */
function createCodexDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
  const underlying = baseStreamFn ?? streamSimple;
  return (model, context, options) =>
    underlying(model, context, {
      ...options,
      transport: options?.transport ?? "auto",
    });
}

/**
 * Wrap a stream function so OpenAI requests default to transport "auto"
 * with WebSocket warm-up enabled by default.
 */
function createOpenAIDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
  const underlying = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    // Options may carry an extra openaiWsWarmup flag not declared on
    // SimpleStreamOptions; read it through a widened view.
    const typedOptions = options as
      | (SimpleStreamOptions & { openaiWsWarmup?: boolean })
      | undefined;
    const mergedOptions = {
      ...options,
      transport: options?.transport ?? "auto",
      // Warm-up is optional in OpenAI docs; enabled by default here for lower
      // first-turn latency on WebSocket sessions. Set params.openaiWsWarmup=false
      // to disable per model.
      openaiWsWarmup: typedOptions?.openaiWsWarmup ?? true,
    } as SimpleStreamOptions;
    return underlying(model, context, mergedOptions);
  };
}
|
||||
|
||||
function isAnthropic1MModel(modelId: string): boolean {
|
||||
const normalized = modelId.trim().toLowerCase();
|
||||
return ANTHROPIC_1M_MODEL_PREFIXES.some((prefix) => normalized.startsWith(prefix));
|
||||
|
||||
257
src/agents/pi-embedded-runner/openai-stream-wrappers.ts
Normal file
257
src/agents/pi-embedded-runner/openai-stream-wrappers.ts
Normal file
@@ -0,0 +1,257 @@
|
||||
import type { StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import type { SimpleStreamOptions } from "@mariozechner/pi-ai";
|
||||
import { streamSimple } from "@mariozechner/pi-ai";
|
||||
import { log } from "./logger.js";
|
||||
|
||||
type OpenAIServiceTier = "auto" | "default" | "flex" | "priority";
|
||||
|
||||
const OPENAI_RESPONSES_APIS = new Set(["openai-responses"]);
|
||||
const OPENAI_RESPONSES_PROVIDERS = new Set(["openai", "azure-openai-responses"]);
|
||||
|
||||
function isDirectOpenAIBaseUrl(baseUrl: unknown): boolean {
|
||||
if (typeof baseUrl !== "string" || !baseUrl.trim()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
const host = new URL(baseUrl).hostname.toLowerCase();
|
||||
return (
|
||||
host === "api.openai.com" || host === "chatgpt.com" || host.endsWith(".openai.azure.com")
|
||||
);
|
||||
} catch {
|
||||
const normalized = baseUrl.toLowerCase();
|
||||
return (
|
||||
normalized.includes("api.openai.com") ||
|
||||
normalized.includes("chatgpt.com") ||
|
||||
normalized.includes(".openai.azure.com")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function isOpenAIPublicApiBaseUrl(baseUrl: unknown): boolean {
|
||||
if (typeof baseUrl !== "string" || !baseUrl.trim()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
return new URL(baseUrl).hostname.toLowerCase() === "api.openai.com";
|
||||
} catch {
|
||||
return baseUrl.toLowerCase().includes("api.openai.com");
|
||||
}
|
||||
}
|
||||
|
||||
function shouldForceResponsesStore(model: {
|
||||
api?: unknown;
|
||||
provider?: unknown;
|
||||
baseUrl?: unknown;
|
||||
compat?: { supportsStore?: boolean };
|
||||
}): boolean {
|
||||
if (model.compat?.supportsStore === false) {
|
||||
return false;
|
||||
}
|
||||
if (typeof model.api !== "string" || typeof model.provider !== "string") {
|
||||
return false;
|
||||
}
|
||||
if (!OPENAI_RESPONSES_APIS.has(model.api)) {
|
||||
return false;
|
||||
}
|
||||
if (!OPENAI_RESPONSES_PROVIDERS.has(model.provider)) {
|
||||
return false;
|
||||
}
|
||||
return isDirectOpenAIBaseUrl(model.baseUrl);
|
||||
}
|
||||
|
||||
function parsePositiveInteger(value: unknown): number | undefined {
|
||||
if (typeof value === "number" && Number.isFinite(value) && value > 0) {
|
||||
return Math.floor(value);
|
||||
}
|
||||
if (typeof value === "string") {
|
||||
const parsed = Number.parseInt(value, 10);
|
||||
if (Number.isFinite(parsed) && parsed > 0) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function resolveOpenAIResponsesCompactThreshold(model: { contextWindow?: unknown }): number {
|
||||
const contextWindow = parsePositiveInteger(model.contextWindow);
|
||||
if (contextWindow) {
|
||||
return Math.max(1_000, Math.floor(contextWindow * 0.7));
|
||||
}
|
||||
return 80_000;
|
||||
}
|
||||
|
||||
function shouldEnableOpenAIResponsesServerCompaction(
|
||||
model: {
|
||||
api?: unknown;
|
||||
provider?: unknown;
|
||||
baseUrl?: unknown;
|
||||
compat?: { supportsStore?: boolean };
|
||||
},
|
||||
extraParams: Record<string, unknown> | undefined,
|
||||
): boolean {
|
||||
const configured = extraParams?.responsesServerCompaction;
|
||||
if (configured === false) {
|
||||
return false;
|
||||
}
|
||||
if (!shouldForceResponsesStore(model)) {
|
||||
return false;
|
||||
}
|
||||
if (configured === true) {
|
||||
return true;
|
||||
}
|
||||
return model.provider === "openai";
|
||||
}
|
||||
|
||||
function shouldStripResponsesStore(
|
||||
model: { api?: unknown; compat?: { supportsStore?: boolean } },
|
||||
forceStore: boolean,
|
||||
): boolean {
|
||||
if (forceStore) {
|
||||
return false;
|
||||
}
|
||||
if (typeof model.api !== "string") {
|
||||
return false;
|
||||
}
|
||||
return OPENAI_RESPONSES_APIS.has(model.api) && model.compat?.supportsStore === false;
|
||||
}
|
||||
|
||||
function applyOpenAIResponsesPayloadOverrides(params: {
|
||||
payloadObj: Record<string, unknown>;
|
||||
forceStore: boolean;
|
||||
stripStore: boolean;
|
||||
useServerCompaction: boolean;
|
||||
compactThreshold: number;
|
||||
}): void {
|
||||
if (params.forceStore) {
|
||||
params.payloadObj.store = true;
|
||||
}
|
||||
if (params.stripStore) {
|
||||
delete params.payloadObj.store;
|
||||
}
|
||||
if (params.useServerCompaction && params.payloadObj.context_management === undefined) {
|
||||
params.payloadObj.context_management = [
|
||||
{
|
||||
type: "compaction",
|
||||
compact_threshold: params.compactThreshold,
|
||||
},
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeOpenAIServiceTier(value: unknown): OpenAIServiceTier | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const normalized = value.trim().toLowerCase();
|
||||
if (
|
||||
normalized === "auto" ||
|
||||
normalized === "default" ||
|
||||
normalized === "flex" ||
|
||||
normalized === "priority"
|
||||
) {
|
||||
return normalized;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function resolveOpenAIServiceTier(
|
||||
extraParams: Record<string, unknown> | undefined,
|
||||
): OpenAIServiceTier | undefined {
|
||||
const raw = extraParams?.serviceTier ?? extraParams?.service_tier;
|
||||
const normalized = normalizeOpenAIServiceTier(raw);
|
||||
if (raw !== undefined && normalized === undefined) {
|
||||
const rawSummary = typeof raw === "string" ? raw : typeof raw;
|
||||
log.warn(`ignoring invalid OpenAI service tier param: ${rawSummary}`);
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
/**
 * Wrap a stream function so OpenAI Responses payloads get `store` and
 * server-side compaction overrides applied via the `onPayload` hook, right
 * before the request body is sent.
 *
 * @param baseStreamFn Stream function to wrap; defaults to `streamSimple`.
 * @param extraParams Per-model extras; reads `responsesServerCompaction`
 *   and `responsesCompactThreshold`.
 */
export function createOpenAIResponsesContextManagementWrapper(
  baseStreamFn: StreamFn | undefined,
  extraParams: Record<string, unknown> | undefined,
): StreamFn {
  const underlying = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    const forceStore = shouldForceResponsesStore(model);
    const useServerCompaction = shouldEnableOpenAIResponsesServerCompaction(model, extraParams);
    // Strip is active when the model declares supportsStore=false and store
    // isn't being forced (see shouldStripResponsesStore).
    const stripStore = shouldStripResponsesStore(model, forceStore);
    // Fast path: nothing to override — delegate with options untouched.
    if (!forceStore && !useServerCompaction && !stripStore) {
      return underlying(model, context, options);
    }

    // An explicit threshold from extra params wins over the
    // context-window-derived default.
    const compactThreshold =
      parsePositiveInteger(extraParams?.responsesCompactThreshold) ??
      resolveOpenAIResponsesCompactThreshold(model);
    const originalOnPayload = options?.onPayload;
    return underlying(model, context, {
      ...options,
      onPayload: (payload) => {
        if (payload && typeof payload === "object") {
          // Mutates the payload in place, then chains to the caller's hook.
          applyOpenAIResponsesPayloadOverrides({
            payloadObj: payload as Record<string, unknown>,
            forceStore,
            stripStore,
            useServerCompaction,
            compactThreshold,
          });
        }
        originalOnPayload?.(payload);
      },
    });
  };
}
|
||||
|
||||
/**
 * Wrap a stream function so requests to the public OpenAI Responses API
 * carry the configured `service_tier` (e.g. "flex" or "priority").
 *
 * Scope is deliberately narrow: api "openai-responses", provider "openai",
 * and the api.openai.com host only; anything else passes through untouched.
 * A `service_tier` already present on the payload is never overwritten.
 */
export function createOpenAIServiceTierWrapper(
  baseStreamFn: StreamFn | undefined,
  serviceTier: OpenAIServiceTier,
): StreamFn {
  const underlying = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    if (
      model.api !== "openai-responses" ||
      model.provider !== "openai" ||
      !isOpenAIPublicApiBaseUrl(model.baseUrl)
    ) {
      return underlying(model, context, options);
    }
    const originalOnPayload = options?.onPayload;
    return underlying(model, context, {
      ...options,
      onPayload: (payload) => {
        if (payload && typeof payload === "object") {
          const payloadObj = payload as Record<string, unknown>;
          // Only fill in when absent so explicit payload values win.
          if (payloadObj.service_tier === undefined) {
            payloadObj.service_tier = serviceTier;
          }
        }
        originalOnPayload?.(payload);
      },
    });
  };
}
|
||||
|
||||
/**
 * Wrap a stream function so Codex requests default to transport "auto";
 * a caller-provided transport always wins.
 */
export function createCodexDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
  const underlying = baseStreamFn ?? streamSimple;
  return (model, context, options) =>
    underlying(model, context, {
      ...options,
      transport: options?.transport ?? "auto",
    });
}

/**
 * Wrap a stream function so OpenAI requests default to transport "auto"
 * with WebSocket warm-up enabled by default (params.openaiWsWarmup=false
 * opts out per model).
 */
export function createOpenAIDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
  const underlying = baseStreamFn ?? streamSimple;
  return (model, context, options) => {
    // Options may carry an extra openaiWsWarmup flag not declared on
    // SimpleStreamOptions; read it through a widened view.
    const typedOptions = options as
      | (SimpleStreamOptions & { openaiWsWarmup?: boolean })
      | undefined;
    const mergedOptions = {
      ...options,
      transport: options?.transport ?? "auto",
      // Defaults warm-up on when the caller didn't specify it.
      openaiWsWarmup: typedOptions?.openaiWsWarmup ?? true,
    } as SimpleStreamOptions;
    return underlying(model, context, mergedOptions);
  };
}
|
||||
Reference in New Issue
Block a user