refactor: keep OpenAI streams on OpenClaw transport

Peter Steinberger
2026-05-01 21:46:30 +01:00
parent 364ec53785
commit cf511288b8
8 changed files with 88 additions and 13 deletions

View File

@@ -19,9 +19,7 @@ vi.mock("openclaw/plugin-sdk/provider-stream-family", async (importOriginal) =>
   const wrapStreamFn: NonNullable<typeof actual.OPENAI_RESPONSES_STREAM_HOOKS.wrapStreamFn> = (
     ctx,
   ) => {
-    let nextStreamFn = actual.createOpenAIAttributionHeadersWrapper(ctx.streamFn, {
-      codexNativeTransportStreamFn: mocks.openAIResponsesTransportStreamFn,
-    });
+    let nextStreamFn = actual.createOpenAIAttributionHeadersWrapper(ctx.streamFn);
     if (actual.resolveOpenAIFastMode(ctx.extraParams)) {
       nextStreamFn = actual.createOpenAIFastModeWrapper(nextStreamFn);

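For readers outside the repo: the wrappers in this hunk follow a simple composition pattern, where each `create*Wrapper` takes an inner `StreamFn` and returns a new `StreamFn` with the same signature. A minimal sketch of that pattern, with the `StreamFn` shape, event type, and header name as assumptions rather than the repo's real definitions:

```ts
// Hypothetical shapes for illustration; the actual StreamFn type lives in the
// openclaw plugin SDK and differs in detail.
type StreamEvent = { type: string; [key: string]: unknown };
type StreamFn = (
  model: unknown,
  context: unknown,
  options?: { headers?: Record<string, string> },
) => AsyncIterable<StreamEvent>;

// A wrapper closes over the inner StreamFn and returns a drop-in replacement,
// so wrappers can be stacked in whatever order the hook decides.
function createAttributionHeadersWrapper(inner: StreamFn): StreamFn {
  return (model, context, options) =>
    inner(model, context, {
      ...options,
      headers: { ...options?.headers, "x-attribution": "openclaw" }, // assumed header
    });
}
```

The change in this hunk simply drops the `codexNativeTransportStreamFn` option, so the mock now composes only the attribution and fast-mode wrappers over `ctx.streamFn`.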
View File

@@ -14,6 +14,7 @@ import { attachModelProviderRequestTransport } from "./provider-request-config.j
 import {
   buildTransportAwareSimpleStreamFn,
   createBoundaryAwareStreamFnForModel,
+  createOpenClawTransportStreamFnForModel,
   isTransportAwareApiSupported,
   prepareTransportAwareSimpleModel,
   resolveTransportAwareSimpleApi,
@@ -179,6 +180,20 @@ describe("openai transport stream", () => {
       maxTokens: 8192,
     } satisfies Model<"openai-responses">),
   ).toBeTypeOf("function");
+  expect(
+    createOpenClawTransportStreamFnForModel({
+      id: "gpt-5.4",
+      name: "GPT-5.4",
+      api: "openai-responses",
+      provider: "openai",
+      baseUrl: "https://api.openai.com/v1",
+      reasoning: true,
+      input: ["text"],
+      cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
+      contextWindow: 200000,
+      maxTokens: 8192,
+    } satisfies Model<"openai-responses">),
+  ).toBeTypeOf("function");
   expect(
     createBoundaryAwareStreamFnForModel({
       id: "codex-mini-latest",

View File

@@ -3421,6 +3421,12 @@ describe("createOpenAIWebSocketStreamFn", () => {
     });
   });
+
+  it("keeps the default websocket HTTP fallback on the OpenClaw transport", () => {
+    expect(
+      openAIWsStreamTesting.getDefaultHttpFallbackStreamFnForTest(modelStub as never),
+    ).toBeTypeOf("function");
+  });
   it("forwards temperature and maxTokens to response.create", async () => {
     const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-temp");
     const opts = { temperature: 0.3, maxTokens: 256 };

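The "forwards temperature and maxTokens" test above pins the option mapping onto the wire payload: the WS path has to translate the runner's camelCase options into the snake_case fields of a `response.create` event. A sketch under assumed shapes (the real `ResponseCreateEvent` is defined in `openai-ws-types.js` and carries more fields):

```ts
// Assumed shapes for illustration only; not the module's actual types.
interface SimpleStreamOptions {
  temperature?: number;
  maxTokens?: number;
}
interface ResponseCreateEventSketch {
  type: "response.create";
  response: { temperature?: number; max_output_tokens?: number };
}

// Translate camelCase runner options into the snake_case wire payload,
// omitting unset fields so server-side defaults still apply.
function buildResponseCreate(opts: SimpleStreamOptions): ResponseCreateEventSketch {
  return {
    type: "response.create",
    response: {
      ...(opts.temperature !== undefined ? { temperature: opts.temperature } : {}),
      ...(opts.maxTokens !== undefined ? { max_output_tokens: opts.maxTokens } : {}),
    },
  };
}

// buildResponseCreate({ temperature: 0.3, maxTokens: 256 })
// => { type: "response.create", response: { temperature: 0.3, max_output_tokens: 256 } }
```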
View File

@@ -16,13 +16,13 @@ import * as piAi from "@mariozechner/pi-ai";
  * Key behaviours:
  * - Per-session `OpenAIWebSocketManager` (keyed by sessionId)
  * - Tracks `previous_response_id` to send only incremental tool-result inputs
- * - Falls back to `streamSimple` (HTTP) if the WebSocket connection fails
+ * - Falls back to the OpenClaw HTTP transport if the WebSocket connection fails
  * - Cleanup helpers for releasing sessions after the run completes
  *
  * Complexity budget & risk mitigation:
  * - **Transport aware**: respects `transport` (`auto` | `websocket` | `sse`)
  * - **Transparent fallback in `auto` mode**: connect/send failures fall back to
- *   the existing HTTP `streamSimple`; forced `websocket` mode surfaces WS errors
+ *   the existing HTTP path; forced `websocket` mode surfaces WS errors
  * - **Zero shared state**: per-session registry; session cleanup on dispose prevents leaks
  * - **Full parity**: all generation options (temperature, top_p, max_output_tokens,
  *   tool_choice, reasoning) forwarded identically to the HTTP path
@@ -63,7 +63,7 @@ import type { ResponseCreateEvent } from "./openai-ws-types.js";
 import { log } from "./pi-embedded-runner/logger.js";
 import { resolveProviderEndpoint } from "./provider-attribution.js";
 import { normalizeProviderId } from "./provider-id.js";
-import { createBoundaryAwareStreamFnForModel } from "./provider-transport-stream.js";
+import { createOpenClawTransportStreamFnForModel } from "./provider-transport-stream.js";
 import {
   buildAssistantMessageWithZeroUsage,
   buildStreamErrorAssistantMessage,
@@ -124,7 +124,9 @@ type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAss
 const defaultOpenAIWsStreamDeps: OpenAIWsStreamDeps = {
   createManager: (options) => new OpenAIWebSocketManager(options),
-  createHttpFallbackStreamFn: (model) => createBoundaryAwareStreamFnForModel(model),
+  // WebSocket auto-mode HTTP fallback must keep the OpenClaw transport path so
+  // degraded sessions do not leak cache-boundary markers or lose strict tools.
+  createHttpFallbackStreamFn: (model) => createOpenClawTransportStreamFnForModel(model),
   streamSimple: (...args) => piAi.streamSimple(...args),
 };
@@ -697,8 +699,8 @@ async function runWarmUp(params: {
  * connection; subsequent calls reuse it, sending only incremental tool-result
  * inputs with `previous_response_id`.
  *
- * If the WebSocket connection is unavailable, the function falls back to the
- * standard `streamSimple` HTTP path and logs a warning.
+ * If the WebSocket connection is unavailable, the function falls back to the
+ * OpenClaw HTTP transport when available, or to the standard `streamSimple` path.
  *
  * @param apiKey OpenAI API key
  * @param sessionId Agent session ID (used as the registry key)
@@ -1358,6 +1360,9 @@ export const __testing = {
         }
       : defaultOpenAIWsStreamDeps;
   },
+  getDefaultHttpFallbackStreamFnForTest(model: ProviderRuntimeModel): StreamFn | undefined {
+    return defaultOpenAIWsStreamDeps.createHttpFallbackStreamFn(model);
+  },
   setWsDegradeCooldownMsForTest(nextMs?: number) {
     wsDegradeCooldownMsOverride = nextMs;
   },

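The dependency seam changed above (`createHttpFallbackStreamFn`) is what the auto-mode degrade path calls into. As a sketch of the control flow the doc comment describes, with names and structure as assumptions rather than the module's actual code:

```ts
// Illustrative sketch of the auto-mode degrade path; not the real module.
type Transport = "auto" | "websocket" | "sse";
type AnyStreamFn = (model: unknown) => unknown;

async function streamWithFallback(
  transport: Transport,
  model: unknown,
  runWebSocket: () => Promise<unknown>,
  httpFallback: AnyStreamFn | undefined,
): Promise<unknown> {
  if (transport === "sse") return httpFallback?.(model); // never opens a socket
  try {
    return await runWebSocket();
  } catch (err) {
    // Forced websocket mode surfaces WS errors instead of degrading.
    if (transport === "websocket") throw err;
    // Auto mode degrades to the OpenClaw HTTP transport when it exists,
    // so strict tools and cache-boundary handling survive the downgrade.
    if (httpFallback) return httpFallback(model);
    throw err;
  }
}
```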
View File

@@ -416,9 +416,7 @@ function createTestOpenAIProviderWrapper(
   if (withDefaultTransport) {
     streamFn = createOpenAIDefaultTransportWrapper(streamFn);
   }
-  streamFn = createOpenAIAttributionHeadersWrapper(streamFn, {
-    codexNativeTransportStreamFn: params.context.streamFn,
-  });
+  streamFn = createOpenAIAttributionHeadersWrapper(streamFn);
   if (resolveOpenAIFastMode(params.context.extraParams)) {
     streamFn = createOpenAIFastModeWrapper(streamFn);

View File

@@ -211,7 +211,7 @@ describe("createOpenAIThinkingLevelWrapper", () => {
 });
 describe("createOpenAIAttributionHeadersWrapper", () => {
-  it("routes native Codex traffic through the OpenClaw transport when no wrapped stream exists", () => {
+  it("routes native Codex traffic through the OpenClaw transport so attribution survives PI defaults", () => {
     let codexCalls = 0;
     let capturedHeaders: Record<string, string> | undefined;
     const codexTransport: StreamFn = (_model, _context, options) => {

View File

@@ -4,6 +4,7 @@ import { attachModelProviderRequestTransport } from "./provider-request-config.j
 import {
   buildTransportAwareSimpleStreamFn,
   createBoundaryAwareStreamFnForModel,
+  createOpenClawTransportStreamFnForModel,
   createTransportAwareStreamFnForModel,
   isTransportAwareApiSupported,
   prepareTransportAwareSimpleModel,
@@ -151,4 +152,40 @@ describe("provider transport stream contracts", () => {
     expect(buildTransportAwareSimpleStreamFn(model)).toBeUndefined();
     expect(prepareTransportAwareSimpleModel(model)).toBe(model);
   });
+
+  it("keeps OpenAI API-key default streams on OpenClaw transport", () => {
+    const cases = [
+      buildModel("openai-responses", {
+        id: "gpt-5.4",
+        provider: "openai",
+        baseUrl: "https://api.openai.com/v1",
+      }),
+      buildModel("openai-completions", {
+        id: "gpt-4o",
+        provider: "openai",
+        baseUrl: "https://api.openai.com/v1",
+      }),
+    ] as const;
+    for (const model of cases) {
+      expect(createBoundaryAwareStreamFnForModel(model)).toBeTypeOf("function");
+      expect(createOpenClawTransportStreamFnForModel(model)).toBeTypeOf("function");
+      expect(createTransportAwareStreamFnForModel(model)).toBeUndefined();
+      expect(buildTransportAwareSimpleStreamFn(model)).toBeUndefined();
+      expect(prepareTransportAwareSimpleModel(model)).toBe(model);
+    }
+  });
+
+  it("keeps Codex defaults on the OpenClaw transport until PI preserves attribution", () => {
+    const model = buildModel("openai-codex-responses", {
+      id: "gpt-5.4",
+      provider: "openai-codex",
+      baseUrl: "https://chatgpt.com/backend-api",
+    });
+    expect(createBoundaryAwareStreamFnForModel(model)).toBeTypeOf("function");
+    expect(createTransportAwareStreamFnForModel(model)).toBeUndefined();
+    expect(buildTransportAwareSimpleStreamFn(model)).toBeUndefined();
+    expect(prepareTransportAwareSimpleModel(model)).toBe(model);
+  });
 });

View File

@@ -121,10 +121,26 @@ export function createTransportAwareStreamFnForModel(
   return createSupportedTransportStreamFn(model, ctx);
 }

+export function createOpenClawTransportStreamFnForModel(
+  model: Model<Api>,
+  ctx?: ProviderTransportStreamContext,
+): StreamFn | undefined {
+  // Explicit fallback callers use this when they need OpenClaw's HTTP
+  // transport semantics regardless of the default embedded-runner strategy.
+  // Native OpenAI HTTP still depends on this path for strict tool shaping,
+  // attribution, cache-boundary stripping, and runtime credential injection.
+  if (!isTransportAwareApiSupported(model.api)) {
+    return undefined;
+  }
+  return createSupportedTransportStreamFn(model, ctx);
+}
+
 export function createBoundaryAwareStreamFnForModel(
   model: Model<Api>,
   ctx?: ProviderTransportStreamContext,
 ): StreamFn | undefined {
+  // Default embedded-runner fallback. Keep OpenAI-family APIs here until PI's
+  // native HTTP streams preserve the same OpenClaw request contract.
   if (!isTransportAwareApiSupported(model.api)) {
     return undefined;
   }
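
Putting the two factories together: callers that explicitly need OpenClaw HTTP semantics can prefer the new factory and drop to plain `streamSimple` only when the model's API is unsupported. A hypothetical caller-side helper, with the import paths and the `StreamFn` source assumed from identifiers elsewhere in this diff:

```ts
// Hypothetical caller-side resolution; mirrors the documented behaviour that
// the OpenClaw transport is preferred and streamSimple is the last resort.
import type { Api, Model, StreamFn } from "@mariozechner/pi-ai"; // assumed import path
import { createOpenClawTransportStreamFnForModel } from "./provider-transport-stream.js";

declare const defaultStreamSimple: StreamFn; // e.g. a bound piAi.streamSimple

// createOpenClawTransportStreamFnForModel returns undefined for APIs the
// OpenClaw transport does not support, in which case plain streamSimple wins.
function resolveHttpFallback(model: Model<Api>): StreamFn {
  return createOpenClawTransportStreamFnForModel(model) ?? defaultStreamSimple;
}
```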