From e53d840fed7a2f6a1cf4732ad68c318f09d9ceea Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Sun, 8 Mar 2026 16:59:48 +0000
Subject: [PATCH] refactor: extract openai stream wrappers

---
 src/agents/pi-embedded-runner/extra-params.ts | 270 +-----------------
 .../openai-stream-wrappers.ts                 | 257 +++++++++++++++++
 2 files changed, 264 insertions(+), 263 deletions(-)
 create mode 100644 src/agents/pi-embedded-runner/openai-stream-wrappers.ts

diff --git a/src/agents/pi-embedded-runner/extra-params.ts b/src/agents/pi-embedded-runner/extra-params.ts
index 23178e1d1fa..89451dc614d 100644
--- a/src/agents/pi-embedded-runner/extra-params.ts
+++ b/src/agents/pi-embedded-runner/extra-params.ts
@@ -9,6 +9,13 @@ import {
   usesOpenAiStringModeAnthropicToolChoice,
 } from "../provider-capabilities.js";
 import { log } from "./logger.js";
+import {
+  createCodexDefaultTransportWrapper,
+  createOpenAIDefaultTransportWrapper,
+  createOpenAIResponsesContextManagementWrapper,
+  createOpenAIServiceTierWrapper,
+  resolveOpenAIServiceTier,
+} from "./openai-stream-wrappers.js";
 
 const OPENROUTER_APP_HEADERS: Record<string, string> = {
   "HTTP-Referer": "https://openclaw.ai",
@@ ... @@ function resolveKilocodeAppHeaders(): Record<string, string> {
 const ANTHROPIC_CONTEXT_1M_BETA = "context-1m-2025-08-07";
 const ANTHROPIC_1M_MODEL_PREFIXES = ["claude-opus-4", "claude-sonnet-4"] as const;
 
-// NOTE: We only force `store=true` for *direct* OpenAI Responses.
-// Codex responses (chatgpt.com/backend-api/codex/responses) require `store=false`.
-const OPENAI_RESPONSES_APIS = new Set(["openai-responses"]);
-const OPENAI_RESPONSES_PROVIDERS = new Set(["openai", "azure-openai-responses"]);
-
 /**
  * Resolve provider-specific extra params from model config.
  * Used to pass through stream params like temperature/maxTokens.
  */
@@ ... @@ export function resolveExtraParams(params: {
 }
 
 type CacheRetention = "none" | "short" | "long";
-type OpenAIServiceTier = "auto" | "default" | "flex" | "priority";
 type CacheRetentionStreamOptions = Partial<SimpleStreamOptions> & {
   cacheRetention?: CacheRetention;
   openaiWsWarmup?: boolean;
 };
@@ ... @@ function createBedrockNoCacheWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
   });
 }
 
-function isDirectOpenAIBaseUrl(baseUrl: unknown): boolean {
-  if (typeof baseUrl !== "string" || !baseUrl.trim()) {
-    return false;
-  }
-
-  try {
-    const host = new URL(baseUrl).hostname.toLowerCase();
-    return (
-      host === "api.openai.com" || host === "chatgpt.com" || host.endsWith(".openai.azure.com")
-    );
-  } catch {
-    const normalized = baseUrl.toLowerCase();
-    return (
-      normalized.includes("api.openai.com") ||
-      normalized.includes("chatgpt.com") ||
-      normalized.includes(".openai.azure.com")
-    );
-  }
-}
-
-function isOpenAIPublicApiBaseUrl(baseUrl: unknown): boolean {
-  if (typeof baseUrl !== "string" || !baseUrl.trim()) {
-    return false;
-  }
-
-  try {
-    return new URL(baseUrl).hostname.toLowerCase() === "api.openai.com";
-  } catch {
-    return baseUrl.toLowerCase().includes("api.openai.com");
-  }
-}
-
-function shouldForceResponsesStore(model: {
-  api?: unknown;
-  provider?: unknown;
-  baseUrl?: unknown;
-  compat?: { supportsStore?: boolean };
-}): boolean {
-  // Never force store=true when the model explicitly declares supportsStore=false
-  // (e.g. Azure OpenAI Responses API without server-side persistence).
-  if (model.compat?.supportsStore === false) {
-    return false;
-  }
-  if (typeof model.api !== "string" || typeof model.provider !== "string") {
-    return false;
-  }
-  if (!OPENAI_RESPONSES_APIS.has(model.api)) {
-    return false;
-  }
-  if (!OPENAI_RESPONSES_PROVIDERS.has(model.provider)) {
-    return false;
-  }
-  return isDirectOpenAIBaseUrl(model.baseUrl);
-}
-
-function parsePositiveInteger(value: unknown): number | undefined {
-  if (typeof value === "number" && Number.isFinite(value) && value > 0) {
-    return Math.floor(value);
-  }
-  if (typeof value === "string") {
-    const parsed = Number.parseInt(value, 10);
-    if (Number.isFinite(parsed) && parsed > 0) {
-      return parsed;
-    }
-  }
-  return undefined;
-}
-
-function resolveOpenAIResponsesCompactThreshold(model: { contextWindow?: unknown }): number {
-  const contextWindow = parsePositiveInteger(model.contextWindow);
-  if (contextWindow) {
-    return Math.max(1_000, Math.floor(contextWindow * 0.7));
-  }
-  return 80_000;
-}
-
-function shouldEnableOpenAIResponsesServerCompaction(
-  model: {
-    api?: unknown;
-    provider?: unknown;
-    baseUrl?: unknown;
-    compat?: { supportsStore?: boolean };
-  },
-  extraParams: Record<string, unknown> | undefined,
-): boolean {
-  const configured = extraParams?.responsesServerCompaction;
-  if (configured === false) {
-    return false;
-  }
-  if (!shouldForceResponsesStore(model)) {
-    return false;
-  }
-  if (configured === true) {
-    return true;
-  }
-  // Auto-enable for direct OpenAI Responses models.
-  return model.provider === "openai";
-}
-
-function shouldStripResponsesStore(
-  model: { api?: unknown; compat?: { supportsStore?: boolean } },
-  forceStore: boolean,
-): boolean {
-  if (forceStore) {
-    return false;
-  }
-  if (typeof model.api !== "string") {
-    return false;
-  }
-  return OPENAI_RESPONSES_APIS.has(model.api) && model.compat?.supportsStore === false;
-}
-
-function applyOpenAIResponsesPayloadOverrides(params: {
-  payloadObj: Record<string, unknown>;
-  forceStore: boolean;
-  stripStore: boolean;
-  useServerCompaction: boolean;
-  compactThreshold: number;
-}): void {
-  if (params.forceStore) {
-    params.payloadObj.store = true;
-  }
-  if (params.stripStore) {
-    delete params.payloadObj.store;
-  }
-  if (params.useServerCompaction && params.payloadObj.context_management === undefined) {
-    params.payloadObj.context_management = [
-      {
-        type: "compaction",
-        compact_threshold: params.compactThreshold,
-      },
-    ];
-  }
-}
-
-function createOpenAIResponsesContextManagementWrapper(
-  baseStreamFn: StreamFn | undefined,
-  extraParams: Record<string, unknown> | undefined,
-): StreamFn {
-  const underlying = baseStreamFn ?? streamSimple;
-  return (model, context, options) => {
-    const forceStore = shouldForceResponsesStore(model);
-    const useServerCompaction = shouldEnableOpenAIResponsesServerCompaction(model, extraParams);
-    // Strip `store` from the payload when the model declares supportsStore=false.
-    // pi-ai upstream hardcodes `store: false` for Responses API; strict
-    // OpenAI-compatible endpoints (e.g. Gemini via Cloudflare) reject it.
-    const stripStore = shouldStripResponsesStore(model, forceStore);
-    if (!forceStore && !useServerCompaction && !stripStore) {
-      return underlying(model, context, options);
-    }
-
-    const compactThreshold =
-      parsePositiveInteger(extraParams?.responsesCompactThreshold) ??
-      resolveOpenAIResponsesCompactThreshold(model);
-    const originalOnPayload = options?.onPayload;
-    return underlying(model, context, {
-      ...options,
-      onPayload: (payload) => {
-        if (payload && typeof payload === "object") {
-          applyOpenAIResponsesPayloadOverrides({
-            payloadObj: payload as Record<string, unknown>,
-            forceStore,
-            stripStore,
-            useServerCompaction,
-            compactThreshold,
-          });
-        }
-        originalOnPayload?.(payload);
-      },
-    });
-  };
-}
-
-function normalizeOpenAIServiceTier(value: unknown): OpenAIServiceTier | undefined {
-  if (typeof value !== "string") {
-    return undefined;
-  }
-  const normalized = value.trim().toLowerCase();
-  if (
-    normalized === "auto" ||
-    normalized === "default" ||
-    normalized === "flex" ||
-    normalized === "priority"
-  ) {
-    return normalized;
-  }
-  return undefined;
-}
-
-function resolveOpenAIServiceTier(
-  extraParams: Record<string, unknown> | undefined,
-): OpenAIServiceTier | undefined {
-  const raw = extraParams?.serviceTier ?? extraParams?.service_tier;
-  const normalized = normalizeOpenAIServiceTier(raw);
-  if (raw !== undefined && normalized === undefined) {
-    const rawSummary = typeof raw === "string" ? raw : typeof raw;
-    log.warn(`ignoring invalid OpenAI service tier param: ${rawSummary}`);
-  }
-  return normalized;
-}
-
-function createOpenAIServiceTierWrapper(
-  baseStreamFn: StreamFn | undefined,
-  serviceTier: OpenAIServiceTier,
-): StreamFn {
-  const underlying = baseStreamFn ?? streamSimple;
-  return (model, context, options) => {
-    if (
-      model.api !== "openai-responses" ||
-      model.provider !== "openai" ||
-      !isOpenAIPublicApiBaseUrl(model.baseUrl)
-    ) {
-      return underlying(model, context, options);
-    }
-    const originalOnPayload = options?.onPayload;
-    return underlying(model, context, {
-      ...options,
-      onPayload: (payload) => {
-        if (payload && typeof payload === "object") {
-          const payloadObj = payload as Record<string, unknown>;
-          if (payloadObj.service_tier === undefined) {
-            payloadObj.service_tier = serviceTier;
-          }
-        }
-        originalOnPayload?.(payload);
-      },
-    });
-  };
-}
-
-function createCodexDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
-  const underlying = baseStreamFn ?? streamSimple;
-  return (model, context, options) =>
-    underlying(model, context, {
-      ...options,
-      transport: options?.transport ?? "auto",
-    });
-}
-
-function createOpenAIDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
-  const underlying = baseStreamFn ?? streamSimple;
-  return (model, context, options) => {
-    const typedOptions = options as
-      | (SimpleStreamOptions & { openaiWsWarmup?: boolean })
-      | undefined;
-    const mergedOptions = {
-      ...options,
-      transport: options?.transport ?? "auto",
-      // Warm-up is optional in OpenAI docs; enabled by default here for lower
-      // first-turn latency on WebSocket sessions. Set params.openaiWsWarmup=false
-      // to disable per model.
-      openaiWsWarmup: typedOptions?.openaiWsWarmup ?? true,
-    } as SimpleStreamOptions;
-    return underlying(model, context, mergedOptions);
-  };
-}
-
 function isAnthropic1MModel(modelId: string): boolean {
   const normalized = modelId.trim().toLowerCase();
   return ANTHROPIC_1M_MODEL_PREFIXES.some((prefix) => normalized.startsWith(prefix));
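Reviewer note, not part of the patch: a minimal sketch of how the extracted wrappers are expected to compose at the extra-params.ts call sites. The `buildOpenAIStreamFn` helper and the exact wrapper ordering are illustrative assumptions, not code from this commit:

import type { StreamFn } from "@mariozechner/pi-agent-core";
import {
  createOpenAIDefaultTransportWrapper,
  createOpenAIResponsesContextManagementWrapper,
  createOpenAIServiceTierWrapper,
  resolveOpenAIServiceTier,
} from "./openai-stream-wrappers.js";

// Hypothetical composition helper: each wrapper takes a StreamFn (or undefined,
// falling back to streamSimple) and returns a StreamFn, so they chain freely.
function buildOpenAIStreamFn(
  baseStreamFn: StreamFn | undefined,
  extraParams: Record<string, unknown> | undefined,
): StreamFn {
  // store/context_management payload overrides for the Responses API.
  let streamFn = createOpenAIResponsesContextManagementWrapper(baseStreamFn, extraParams);
  // transport defaults to "auto" plus WebSocket warm-up unless disabled.
  streamFn = createOpenAIDefaultTransportWrapper(streamFn);
  const serviceTier = resolveOpenAIServiceTier(extraParams);
  if (serviceTier) {
    // Injects service_tier only for direct api.openai.com Responses calls.
    streamFn = createOpenAIServiceTierWrapper(streamFn, serviceTier);
  }
  return streamFn;
}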
diff --git a/src/agents/pi-embedded-runner/openai-stream-wrappers.ts b/src/agents/pi-embedded-runner/openai-stream-wrappers.ts
new file mode 100644
index 00000000000..fc72d9ca0fe
--- /dev/null
+++ b/src/agents/pi-embedded-runner/openai-stream-wrappers.ts
@@ -0,0 +1,257 @@
+import type { StreamFn } from "@mariozechner/pi-agent-core";
+import type { SimpleStreamOptions } from "@mariozechner/pi-ai";
+import { streamSimple } from "@mariozechner/pi-ai";
+import { log } from "./logger.js";
+
+type OpenAIServiceTier = "auto" | "default" | "flex" | "priority";
+
+// NOTE: We only force `store=true` for *direct* OpenAI Responses.
+// Codex responses (chatgpt.com/backend-api/codex/responses) require `store=false`.
+const OPENAI_RESPONSES_APIS = new Set(["openai-responses"]);
+const OPENAI_RESPONSES_PROVIDERS = new Set(["openai", "azure-openai-responses"]);
+
+function isDirectOpenAIBaseUrl(baseUrl: unknown): boolean {
+  if (typeof baseUrl !== "string" || !baseUrl.trim()) {
+    return false;
+  }
+
+  try {
+    const host = new URL(baseUrl).hostname.toLowerCase();
+    return (
+      host === "api.openai.com" || host === "chatgpt.com" || host.endsWith(".openai.azure.com")
+    );
+  } catch {
+    const normalized = baseUrl.toLowerCase();
+    return (
+      normalized.includes("api.openai.com") ||
+      normalized.includes("chatgpt.com") ||
+      normalized.includes(".openai.azure.com")
+    );
+  }
+}
+
+function isOpenAIPublicApiBaseUrl(baseUrl: unknown): boolean {
+  if (typeof baseUrl !== "string" || !baseUrl.trim()) {
+    return false;
+  }
+
+  try {
+    return new URL(baseUrl).hostname.toLowerCase() === "api.openai.com";
+  } catch {
+    return baseUrl.toLowerCase().includes("api.openai.com");
+  }
+}
+
+function shouldForceResponsesStore(model: {
+  api?: unknown;
+  provider?: unknown;
+  baseUrl?: unknown;
+  compat?: { supportsStore?: boolean };
+}): boolean {
+  // Never force store=true when the model explicitly declares supportsStore=false
+  // (e.g. Azure OpenAI Responses API without server-side persistence).
+  if (model.compat?.supportsStore === false) {
+    return false;
+  }
+  if (typeof model.api !== "string" || typeof model.provider !== "string") {
+    return false;
+  }
+  if (!OPENAI_RESPONSES_APIS.has(model.api)) {
+    return false;
+  }
+  if (!OPENAI_RESPONSES_PROVIDERS.has(model.provider)) {
+    return false;
+  }
+  return isDirectOpenAIBaseUrl(model.baseUrl);
+}
+
+function parsePositiveInteger(value: unknown): number | undefined {
+  if (typeof value === "number" && Number.isFinite(value) && value > 0) {
+    return Math.floor(value);
+  }
+  if (typeof value === "string") {
+    const parsed = Number.parseInt(value, 10);
+    if (Number.isFinite(parsed) && parsed > 0) {
+      return parsed;
+    }
+  }
+  return undefined;
+}
+
+function resolveOpenAIResponsesCompactThreshold(model: { contextWindow?: unknown }): number {
+  const contextWindow = parsePositiveInteger(model.contextWindow);
+  if (contextWindow) {
+    return Math.max(1_000, Math.floor(contextWindow * 0.7));
+  }
+  return 80_000;
+}
+
+function shouldEnableOpenAIResponsesServerCompaction(
+  model: {
+    api?: unknown;
+    provider?: unknown;
+    baseUrl?: unknown;
+    compat?: { supportsStore?: boolean };
+  },
+  extraParams: Record<string, unknown> | undefined,
+): boolean {
+  const configured = extraParams?.responsesServerCompaction;
+  if (configured === false) {
+    return false;
+  }
+  if (!shouldForceResponsesStore(model)) {
+    return false;
+  }
+  if (configured === true) {
+    return true;
+  }
+  // Auto-enable for direct OpenAI Responses models.
+  return model.provider === "openai";
+}
+
+function shouldStripResponsesStore(
+  model: { api?: unknown; compat?: { supportsStore?: boolean } },
+  forceStore: boolean,
+): boolean {
+  if (forceStore) {
+    return false;
+  }
+  if (typeof model.api !== "string") {
+    return false;
+  }
+  return OPENAI_RESPONSES_APIS.has(model.api) && model.compat?.supportsStore === false;
+}
+
+function applyOpenAIResponsesPayloadOverrides(params: {
+  payloadObj: Record<string, unknown>;
+  forceStore: boolean;
+  stripStore: boolean;
+  useServerCompaction: boolean;
+  compactThreshold: number;
+}): void {
+  if (params.forceStore) {
+    params.payloadObj.store = true;
+  }
+  if (params.stripStore) {
+    delete params.payloadObj.store;
+  }
+  if (params.useServerCompaction && params.payloadObj.context_management === undefined) {
+    params.payloadObj.context_management = [
+      {
+        type: "compaction",
+        compact_threshold: params.compactThreshold,
+      },
+    ];
+  }
+}
+
+function normalizeOpenAIServiceTier(value: unknown): OpenAIServiceTier | undefined {
+  if (typeof value !== "string") {
+    return undefined;
+  }
+  const normalized = value.trim().toLowerCase();
+  if (
+    normalized === "auto" ||
+    normalized === "default" ||
+    normalized === "flex" ||
+    normalized === "priority"
+  ) {
+    return normalized;
+  }
+  return undefined;
+}
+
+export function resolveOpenAIServiceTier(
+  extraParams: Record<string, unknown> | undefined,
+): OpenAIServiceTier | undefined {
+  const raw = extraParams?.serviceTier ?? extraParams?.service_tier;
+  const normalized = normalizeOpenAIServiceTier(raw);
+  if (raw !== undefined && normalized === undefined) {
+    const rawSummary = typeof raw === "string" ? raw : typeof raw;
+    log.warn(`ignoring invalid OpenAI service tier param: ${rawSummary}`);
+  }
+  return normalized;
+}
+
+export function createOpenAIResponsesContextManagementWrapper(
+  baseStreamFn: StreamFn | undefined,
+  extraParams: Record<string, unknown> | undefined,
+): StreamFn {
+  const underlying = baseStreamFn ?? streamSimple;
+  return (model, context, options) => {
+    const forceStore = shouldForceResponsesStore(model);
+    const useServerCompaction = shouldEnableOpenAIResponsesServerCompaction(model, extraParams);
+    // Strip `store` from the payload when the model declares supportsStore=false.
+    // pi-ai upstream hardcodes `store: false` for Responses API; strict
+    // OpenAI-compatible endpoints (e.g. Gemini via Cloudflare) reject it.
+    const stripStore = shouldStripResponsesStore(model, forceStore);
+    if (!forceStore && !useServerCompaction && !stripStore) {
+      return underlying(model, context, options);
+    }
+
+    const compactThreshold =
+      parsePositiveInteger(extraParams?.responsesCompactThreshold) ??
+      resolveOpenAIResponsesCompactThreshold(model);
+    const originalOnPayload = options?.onPayload;
+    return underlying(model, context, {
+      ...options,
+      onPayload: (payload) => {
+        if (payload && typeof payload === "object") {
+          applyOpenAIResponsesPayloadOverrides({
+            payloadObj: payload as Record<string, unknown>,
+            forceStore,
+            stripStore,
+            useServerCompaction,
+            compactThreshold,
+          });
+        }
+        originalOnPayload?.(payload);
+      },
+    });
+  };
+}
+
+export function createOpenAIServiceTierWrapper(
+  baseStreamFn: StreamFn | undefined,
+  serviceTier: OpenAIServiceTier,
+): StreamFn {
+  const underlying = baseStreamFn ?? streamSimple;
+  return (model, context, options) => {
+    if (
+      model.api !== "openai-responses" ||
+      model.provider !== "openai" ||
+      !isOpenAIPublicApiBaseUrl(model.baseUrl)
+    ) {
+      return underlying(model, context, options);
+    }
+    const originalOnPayload = options?.onPayload;
+    return underlying(model, context, {
+      ...options,
+      onPayload: (payload) => {
+        if (payload && typeof payload === "object") {
+          const payloadObj = payload as Record<string, unknown>;
+          if (payloadObj.service_tier === undefined) {
+            payloadObj.service_tier = serviceTier;
+          }
+        }
+        originalOnPayload?.(payload);
+      },
+    });
+  };
+}
+
+export function createCodexDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
+  const underlying = baseStreamFn ?? streamSimple;
+  return (model, context, options) =>
+    underlying(model, context, {
+      ...options,
+      transport: options?.transport ?? "auto",
+    });
+}
+
+export function createOpenAIDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
+  const underlying = baseStreamFn ?? streamSimple;
+  return (model, context, options) => {
+    const typedOptions = options as
+      | (SimpleStreamOptions & { openaiWsWarmup?: boolean })
+      | undefined;
+    const mergedOptions = {
+      ...options,
+      transport: options?.transport ?? "auto",
+      // Warm-up is optional in OpenAI docs; enabled by default here for lower
+      // first-turn latency on WebSocket sessions. Set params.openaiWsWarmup=false
+      // to disable per model.
+      openaiWsWarmup: typedOptions?.openaiWsWarmup ?? true,
+    } as SimpleStreamOptions;
+    return underlying(model, context, mergedOptions);
+  };
+}
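Reviewer note, not part of the patch: `resolveOpenAIServiceTier` accepts both key spellings and normalizes the value before it is injected into payloads. A quick behavioral sketch with hypothetical inputs:

import { resolveOpenAIServiceTier } from "./openai-stream-wrappers.js";

// Trims and lowercases the value; accepts camelCase and snake_case keys.
resolveOpenAIServiceTier({ serviceTier: " Flex " });     // "flex"
resolveOpenAIServiceTier({ service_tier: "priority" }); // "priority"
// Invalid values are dropped with a log.warn instead of being sent upstream.
resolveOpenAIServiceTier({ serviceTier: "turbo" });     // undefined
resolveOpenAIServiceTier(undefined);                    // undefined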