diff --git a/extensions/openai/openai-provider.test.ts b/extensions/openai/openai-provider.test.ts index 330d24e73c1..3cb2872f5f0 100644 --- a/extensions/openai/openai-provider.test.ts +++ b/extensions/openai/openai-provider.test.ts @@ -19,9 +19,7 @@ vi.mock("openclaw/plugin-sdk/provider-stream-family", async (importOriginal) => const wrapStreamFn: NonNullable = ( ctx, ) => { - let nextStreamFn = actual.createOpenAIAttributionHeadersWrapper(ctx.streamFn, { - codexNativeTransportStreamFn: mocks.openAIResponsesTransportStreamFn, - }); + let nextStreamFn = actual.createOpenAIAttributionHeadersWrapper(ctx.streamFn); if (actual.resolveOpenAIFastMode(ctx.extraParams)) { nextStreamFn = actual.createOpenAIFastModeWrapper(nextStreamFn); diff --git a/src/agents/openai-transport-stream.test.ts b/src/agents/openai-transport-stream.test.ts index 16d580ae97b..28d09495a46 100644 --- a/src/agents/openai-transport-stream.test.ts +++ b/src/agents/openai-transport-stream.test.ts @@ -14,6 +14,7 @@ import { attachModelProviderRequestTransport } from "./provider-request-config.j import { buildTransportAwareSimpleStreamFn, createBoundaryAwareStreamFnForModel, + createOpenClawTransportStreamFnForModel, isTransportAwareApiSupported, prepareTransportAwareSimpleModel, resolveTransportAwareSimpleApi, @@ -179,6 +180,20 @@ describe("openai transport stream", () => { maxTokens: 8192, } satisfies Model<"openai-responses">), ).toBeTypeOf("function"); + expect( + createOpenClawTransportStreamFnForModel({ + id: "gpt-5.4", + name: "GPT-5.4", + api: "openai-responses", + provider: "openai", + baseUrl: "https://api.openai.com/v1", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 200000, + maxTokens: 8192, + } satisfies Model<"openai-responses">), + ).toBeTypeOf("function"); expect( createBoundaryAwareStreamFnForModel({ id: "codex-mini-latest", diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index 20309599434..0daa7d05050 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -3421,6 +3421,12 @@ describe("createOpenAIWebSocketStreamFn", () => { }); }); + it("keeps the default websocket HTTP fallback on the OpenClaw transport", () => { + expect( + openAIWsStreamTesting.getDefaultHttpFallbackStreamFnForTest(modelStub as never), + ).toBeTypeOf("function"); + }); + it("forwards temperature and maxTokens to response.create", async () => { const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-temp"); const opts = { temperature: 0.3, maxTokens: 256 }; diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 12849375fe4..279a5e69a3e 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -16,13 +16,13 @@ import * as piAi from "@mariozechner/pi-ai"; * Key behaviours: * - Per-session `OpenAIWebSocketManager` (keyed by sessionId) * - Tracks `previous_response_id` to send only incremental tool-result inputs - * - Falls back to `streamSimple` (HTTP) if the WebSocket connection fails + * - Falls back to the OpenClaw HTTP transport if the WebSocket connection fails * - Cleanup helpers for releasing sessions after the run completes * * Complexity budget & risk mitigation: * - **Transport aware**: respects `transport` (`auto` | `websocket` | `sse`) * - **Transparent fallback in `auto` mode**: connect/send failures fall back to - * the existing HTTP `streamSimple`; forced `websocket` mode surfaces WS errors + * the existing HTTP path; forced `websocket` mode surfaces WS errors * - **Zero shared state**: per-session registry; session cleanup on dispose prevents leaks * - **Full parity**: all generation options (temperature, top_p, max_output_tokens, * tool_choice, reasoning) forwarded identically to the HTTP path @@ -63,7 +63,7 @@ import type { ResponseCreateEvent } from "./openai-ws-types.js"; import { log } from "./pi-embedded-runner/logger.js"; import { resolveProviderEndpoint } from "./provider-attribution.js"; import { normalizeProviderId } from "./provider-id.js"; -import { createBoundaryAwareStreamFnForModel } from "./provider-transport-stream.js"; +import { createOpenClawTransportStreamFnForModel } from "./provider-transport-stream.js"; import { buildAssistantMessageWithZeroUsage, buildStreamErrorAssistantMessage, @@ -124,7 +124,9 @@ type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAss const defaultOpenAIWsStreamDeps: OpenAIWsStreamDeps = { createManager: (options) => new OpenAIWebSocketManager(options), - createHttpFallbackStreamFn: (model) => createBoundaryAwareStreamFnForModel(model), + // WebSocket auto-mode HTTP fallback must keep the OpenClaw transport path so + // degraded sessions do not leak cache-boundary markers or lose strict tools. + createHttpFallbackStreamFn: (model) => createOpenClawTransportStreamFnForModel(model), streamSimple: (...args) => piAi.streamSimple(...args), }; @@ -697,8 +699,8 @@ async function runWarmUp(params: { * connection; subsequent calls reuse it, sending only incremental tool-result * inputs with `previous_response_id`. * - * If the WebSocket connection is unavailable, the function falls back to the - * standard `streamSimple` HTTP path and logs a warning. + * If the WebSocket connection is unavailable, the function falls back to an + * OpenClaw HTTP transport when available, or the standard `streamSimple` path. * * @param apiKey OpenAI API key * @param sessionId Agent session ID (used as the registry key) @@ -1358,6 +1360,9 @@ export const __testing = { } : defaultOpenAIWsStreamDeps; }, + getDefaultHttpFallbackStreamFnForTest(model: ProviderRuntimeModel): StreamFn | undefined { + return defaultOpenAIWsStreamDeps.createHttpFallbackStreamFn(model); + }, setWsDegradeCooldownMsForTest(nextMs?: number) { wsDegradeCooldownMsOverride = nextMs; }, diff --git a/src/agents/pi-embedded-runner-extraparams.test.ts b/src/agents/pi-embedded-runner-extraparams.test.ts index 768be43172e..c8dfc220b0f 100644 --- a/src/agents/pi-embedded-runner-extraparams.test.ts +++ b/src/agents/pi-embedded-runner-extraparams.test.ts @@ -416,9 +416,7 @@ function createTestOpenAIProviderWrapper( if (withDefaultTransport) { streamFn = createOpenAIDefaultTransportWrapper(streamFn); } - streamFn = createOpenAIAttributionHeadersWrapper(streamFn, { - codexNativeTransportStreamFn: params.context.streamFn, - }); + streamFn = createOpenAIAttributionHeadersWrapper(streamFn); if (resolveOpenAIFastMode(params.context.extraParams)) { streamFn = createOpenAIFastModeWrapper(streamFn); diff --git a/src/agents/pi-embedded-runner/openai-stream-wrappers.test.ts b/src/agents/pi-embedded-runner/openai-stream-wrappers.test.ts index 29f4ac23839..8cc968cc727 100644 --- a/src/agents/pi-embedded-runner/openai-stream-wrappers.test.ts +++ b/src/agents/pi-embedded-runner/openai-stream-wrappers.test.ts @@ -211,7 +211,7 @@ describe("createOpenAIThinkingLevelWrapper", () => { }); describe("createOpenAIAttributionHeadersWrapper", () => { - it("routes native Codex traffic through the OpenClaw transport when no wrapped stream exists", () => { + it("routes native Codex traffic through the OpenClaw transport so attribution survives PI defaults", () => { let codexCalls = 0; let capturedHeaders: Record | undefined; const codexTransport: StreamFn = (_model, _context, options) => { diff --git a/src/agents/provider-transport-stream.test.ts b/src/agents/provider-transport-stream.test.ts index 92bb8614289..3a125dd460d 100644 --- a/src/agents/provider-transport-stream.test.ts +++ b/src/agents/provider-transport-stream.test.ts @@ -4,6 +4,7 @@ import { attachModelProviderRequestTransport } from "./provider-request-config.j import { buildTransportAwareSimpleStreamFn, createBoundaryAwareStreamFnForModel, + createOpenClawTransportStreamFnForModel, createTransportAwareStreamFnForModel, isTransportAwareApiSupported, prepareTransportAwareSimpleModel, @@ -151,4 +152,40 @@ describe("provider transport stream contracts", () => { expect(buildTransportAwareSimpleStreamFn(model)).toBeUndefined(); expect(prepareTransportAwareSimpleModel(model)).toBe(model); }); + + it("keeps OpenAI API-key default streams on OpenClaw transport", () => { + const cases = [ + buildModel("openai-responses", { + id: "gpt-5.4", + provider: "openai", + baseUrl: "https://api.openai.com/v1", + }), + buildModel("openai-completions", { + id: "gpt-4o", + provider: "openai", + baseUrl: "https://api.openai.com/v1", + }), + ] as const; + + for (const model of cases) { + expect(createBoundaryAwareStreamFnForModel(model)).toBeTypeOf("function"); + expect(createOpenClawTransportStreamFnForModel(model)).toBeTypeOf("function"); + expect(createTransportAwareStreamFnForModel(model)).toBeUndefined(); + expect(buildTransportAwareSimpleStreamFn(model)).toBeUndefined(); + expect(prepareTransportAwareSimpleModel(model)).toBe(model); + } + }); + + it("keeps Codex defaults on the OpenClaw transport until PI preserves attribution", () => { + const model = buildModel("openai-codex-responses", { + id: "gpt-5.4", + provider: "openai-codex", + baseUrl: "https://chatgpt.com/backend-api", + }); + + expect(createBoundaryAwareStreamFnForModel(model)).toBeTypeOf("function"); + expect(createTransportAwareStreamFnForModel(model)).toBeUndefined(); + expect(buildTransportAwareSimpleStreamFn(model)).toBeUndefined(); + expect(prepareTransportAwareSimpleModel(model)).toBe(model); + }); }); diff --git a/src/agents/provider-transport-stream.ts b/src/agents/provider-transport-stream.ts index 47c648c9943..6b02f298c9b 100644 --- a/src/agents/provider-transport-stream.ts +++ b/src/agents/provider-transport-stream.ts @@ -121,10 +121,26 @@ export function createTransportAwareStreamFnForModel( return createSupportedTransportStreamFn(model, ctx); } +export function createOpenClawTransportStreamFnForModel( + model: Model, + ctx?: ProviderTransportStreamContext, +): StreamFn | undefined { + // Explicit fallback callers use this when they need OpenClaw's HTTP + // transport semantics regardless of the default embedded-runner strategy. + // Native OpenAI HTTP still depends on this path for strict tool shaping, + // attribution, cache-boundary stripping, and runtime credential injection. + if (!isTransportAwareApiSupported(model.api)) { + return undefined; + } + return createSupportedTransportStreamFn(model, ctx); +} + export function createBoundaryAwareStreamFnForModel( model: Model, ctx?: ProviderTransportStreamContext, ): StreamFn | undefined { + // Default embedded-runner fallback. Keep OpenAI-family APIs here until PI's + // native HTTP streams preserve the same OpenClaw request contract. if (!isTransportAwareApiSupported(model.api)) { return undefined; }