diff --git a/CHANGELOG.md b/CHANGELOG.md
index 104bd2a97e6..769a88a9ecb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -86,6 +86,7 @@ Docs: https://docs.openclaw.ai
 - ACP/ACPX streaming: pin ACPX plugin support to `0.1.15`, add configurable ACPX command/version probing, and streamline ACP stream delivery (`final_only` default + reduced tool-event noise) with matching runtime and test updates. (#30036) Thanks @osolmaz.
 - OpenAI/Streaming transport: make `openai` Responses WebSocket-first by default (`transport: "auto"` with SSE fallback), add shared OpenAI WS stream/connection runtime wiring with per-session cleanup, and preserve server-side compaction payload mutation (`store` + `context_management`) on the WS path.
+- OpenAI/WebSocket warm-up: add optional OpenAI Responses WebSocket warm-up (`response.create` with `generate: false`), enable it by default for `openai/*`, and expose `params.openaiWsWarmup` for per-model enable/disable control.

 ### Fixes

diff --git a/docs/concepts/model-providers.md b/docs/concepts/model-providers.md
index a64b92cecb9..eb88236592d 100644
--- a/docs/concepts/model-providers.md
+++ b/docs/concepts/model-providers.md
@@ -45,6 +45,7 @@ OpenClaw ships with the pi‑ai catalog. These providers require **no**
 - CLI: `openclaw onboard --auth-choice openai-api-key`
 - Default transport is `auto` (WebSocket-first, SSE fallback)
 - Override per model via `agents.defaults.models["openai/<model>"].params.transport` (`"sse"`, `"websocket"`, or `"auto"`)
+- OpenAI Responses WebSocket warm-up is enabled by default; toggle it per model via `params.openaiWsWarmup` (`true`/`false`)

 ```json5
 {
diff --git a/docs/providers/openai.md b/docs/providers/openai.md
index 35018c9937a..9eb167631c3 100644
--- a/docs/providers/openai.md
+++ b/docs/providers/openai.md
@@ -68,6 +68,9 @@ You can set `agents.defaults.models.<model>.params.transport`:
 - `"websocket"`: force WebSocket
 - `"auto"`: try WebSocket, then fall back to SSE

+For `openai/*` (Responses API), OpenClaw also enables WebSocket warm-up by
+default (`openaiWsWarmup: true`) when WebSocket transport is used.
+
 ```json5
 {
   agents: {
@@ -85,6 +88,47 @@ You can set `agents.defaults.models.<model>.params.transport`:
   }
 }
 ```

+### OpenAI WebSocket warm-up
+
+The OpenAI docs describe warm-up as optional. OpenClaw enables it by default
+for `openai/*` to reduce first-turn latency when WebSocket transport is used.
+
+#### Disable warm-up
+
+```json5
+{
+  agents: {
+    defaults: {
+      models: {
+        "openai/gpt-5": {
+          params: {
+            openaiWsWarmup: false,
+          },
+        },
+      },
+    },
+  },
+}
+```
+
+#### Enable warm-up explicitly
+
+```json5
+{
+  agents: {
+    defaults: {
+      models: {
+        "openai/gpt-5": {
+          params: {
+            openaiWsWarmup: true,
+          },
+        },
+      },
+    },
+  },
+}
+```
+
 ### OpenAI Responses server-side compaction

 For direct OpenAI Responses models (`openai/*` using `api: "openai-responses"` with
diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts
index 0b2911ce8fa..d65670dcd0f 100644
--- a/src/agents/openai-ws-stream.test.ts
+++ b/src/agents/openai-ws-stream.test.ts
@@ -70,6 +70,27 @@ const { MockManager } = vi.hoisted(() => {
         throw new Error("Mock send failure");
       }
       this.sentEvents.push(event);
+      const maybeEvent = event as { type?: string; generate?: boolean; model?: string } | null;
+      // Auto-complete warm-up events so warm-up-enabled tests don't hang waiting
+      // for the warm-up terminal event.
+      if (maybeEvent?.type === "response.create" && maybeEvent.generate === false) {
+        queueMicrotask(() => {
+          this.simulateEvent({
+            type: "response.completed",
+            response: makeResponseObject(`warmup-${Date.now()}`),
+          });
+        });
+      }
+    }
+
+    warmUp(params: { model: string; tools?: unknown[]; instructions?: string }): void {
+      this.send({
+        type: "response.create",
+        generate: false,
+        model: params.model,
+        ...(params.tools ? { tools: params.tools } : {}),
+        ...(params.instructions ? { instructions: params.instructions } : {}),
+      });
+    }

     onMessage(handler: (event: unknown) => void): () => void {
@@ -967,6 +988,67 @@ describe("createOpenAIWebSocketStreamFn", () => {
       });
     });
   });
+
+  it("sends warm-up event before first request when openaiWsWarmup=true", async () => {
+    const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-warmup-enabled");
+    const stream = streamFn(
+      modelStub as Parameters<typeof streamFn>[0],
+      contextStub as Parameters<typeof streamFn>[1],
+      { openaiWsWarmup: true } as unknown as Parameters<typeof streamFn>[2],
+    );
+    await new Promise<void>((resolve, reject) => {
+      queueMicrotask(async () => {
+        try {
+          await new Promise((r) => setImmediate(r));
+          MockManager.lastInstance!.simulateEvent({
+            type: "response.completed",
+            response: makeResponseObject("resp-warm", "Done"),
+          });
+          for await (const _ of await resolveStream(stream)) {
+            // consume
+          }
+          resolve();
+        } catch (e) {
+          reject(e);
+        }
+      });
+    });
+    const sent = MockManager.lastInstance!.sentEvents as Array<Record<string, unknown>>;
+    expect(sent).toHaveLength(2);
+    expect(sent[0]?.type).toBe("response.create");
+    expect(sent[0]?.generate).toBe(false);
+    expect(sent[1]?.type).toBe("response.create");
+  });
+
+  it("skips warm-up when openaiWsWarmup=false", async () => {
+    const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-warmup-disabled");
+    const stream = streamFn(
+      modelStub as Parameters<typeof streamFn>[0],
+      contextStub as Parameters<typeof streamFn>[1],
+      { openaiWsWarmup: false } as unknown as Parameters<typeof streamFn>[2],
+    );
+    await new Promise<void>((resolve, reject) => {
+      queueMicrotask(async () => {
+        try {
+          await new Promise((r) => setImmediate(r));
+          MockManager.lastInstance!.simulateEvent({
+            type: "response.completed",
+            response: makeResponseObject("resp-nowarm", "Done"),
+          });
+          for await (const _ of await resolveStream(stream)) {
+            // consume
+          }
+          resolve();
+        } catch (e) {
+          reject(e);
+        }
+      });
+    });
+    const sent = MockManager.lastInstance!.sentEvents as Array<Record<string, unknown>>;
+    expect(sent).toHaveLength(1);
+    expect(sent[0]?.type).toBe("response.create");
+    expect(sent[0]?.generate).toBeUndefined();
+  });
 });

 // ─────────────────────────────────────────────────────────────────────────────
diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts
index 865ad775840..ae7f1da4376 100644
--- a/src/agents/openai-ws-stream.ts
+++ b/src/agents/openai-ws-stream.ts
@@ -53,6 +53,8 @@ interface WsSession {
   lastContextLength: number;
   /** True if the connection has been established at least once. */
   everConnected: boolean;
+  /** True once a best-effort warm-up attempt has run for this session. */
+  warmUpAttempted: boolean;
   /** True if the session is permanently broken (no more reconnect). */
   broken: boolean;
 }
@@ -325,6 +327,7 @@ export interface OpenAIWebSocketStreamOptions {
 }

 type WsTransport = "sse" | "websocket" | "auto";
+const WARM_UP_TIMEOUT_MS = 8_000;

 function resolveWsTransport(options: Parameters<StreamFn>[2]): WsTransport {
   const transport = (options as { transport?: unknown } | undefined)?.transport;
   return transport === "sse" || transport === "websocket" || transport === "auto"
     ? transport
     : "auto";
 }

+type WsOptions = Parameters<StreamFn>[2] & { openaiWsWarmup?: unknown; signal?: AbortSignal };
+
+function resolveWsWarmup(options: Parameters<StreamFn>[2]): boolean {
+  const warmup = (options as WsOptions | undefined)?.openaiWsWarmup;
+  return warmup === true;
+}
+
+async function runWarmUp(params: {
+  manager: OpenAIWebSocketManager;
+  modelId: string;
+  tools: FunctionToolDefinition[];
+  instructions?: string;
+  signal?: AbortSignal;
+}): Promise<void> {
+  if (params.signal?.aborted) {
+    throw new Error("aborted");
+  }
+  await new Promise<void>((resolve, reject) => {
+    const timeout = setTimeout(() => {
+      cleanup();
+      reject(new Error(`warm-up timed out after ${WARM_UP_TIMEOUT_MS}ms`));
+    }, WARM_UP_TIMEOUT_MS);
+
+    const abortHandler = () => {
+      cleanup();
+      reject(new Error("aborted"));
+    };
+    const closeHandler = (code: number, reason: string) => {
+      cleanup();
+      reject(new Error(`warm-up closed (code=${code}, reason=${reason || "unknown"})`));
+    };
+    const unsubscribe = params.manager.onMessage((event) => {
+      if (event.type === "response.completed") {
+        cleanup();
+        resolve();
+      } else if (event.type === "response.failed") {
+        cleanup();
+        const errMsg = event.response?.error?.message ?? "Response failed";
+        reject(new Error(`warm-up failed: ${errMsg}`));
+      } else if (event.type === "error") {
+        cleanup();
+        reject(new Error(`warm-up error: ${event.message} (code=${event.code})`));
+      }
+    });
+
+    const cleanup = () => {
+      clearTimeout(timeout);
+      params.signal?.removeEventListener("abort", abortHandler);
+      params.manager.off("close", closeHandler);
+      unsubscribe();
+    };
+
+    params.signal?.addEventListener("abort", abortHandler, { once: true });
+    params.manager.on("close", closeHandler);
+    params.manager.warmUp({
+      model: params.modelId,
+      tools: params.tools.length > 0 ? params.tools : undefined,
+      instructions: params.instructions,
+    });
+  });
+}
+
 /**
  * Creates a `StreamFn` backed by a persistent WebSocket connection to the
  * OpenAI Responses API. The first call for a given `sessionId` opens the
@@ -369,6 +434,7 @@ export function createOpenAIWebSocketStreamFn(
       manager,
       lastContextLength: 0,
       everConnected: false,
+      warmUpAttempted: false,
       broken: false,
     };
     wsRegistry.set(sessionId, session);
@@ -416,6 +482,29 @@ export function createOpenAIWebSocketStreamFn(
       return fallbackToHttp(model, context, options, eventStream, opts.signal);
     }

+    const signal = opts.signal ?? (options as WsOptions | undefined)?.signal;
+
+    if (resolveWsWarmup(options) && !session.warmUpAttempted) {
+      session.warmUpAttempted = true;
+      try {
+        await runWarmUp({
+          manager: session.manager,
+          modelId: model.id,
+          tools: convertTools(context.tools),
+          instructions: context.systemPrompt ?? undefined,
+          signal,
+        });
+        log.debug(`[ws-stream] warm-up completed for session=${sessionId}`);
+      } catch (warmErr) {
+        if (signal?.aborted) {
+          throw warmErr instanceof Error ? warmErr : new Error(String(warmErr));
+        }
+        log.warn(
+          `[ws-stream] warm-up failed for session=${sessionId}; continuing without warm-up. error=${String(warmErr)}`,
+        );
+      }
+    }
+
     // ── 3. Compute incremental vs full input ─────────────────────────────
     const prevResponseId = session.manager.previousResponseId;
     let inputItems: InputItem[];
@@ -544,7 +633,6 @@ export function createOpenAIWebSocketStreamFn(
         cleanup();
         reject(new Error("aborted"));
       };
-      const signal = opts.signal ?? (options as { signal?: AbortSignal } | undefined)?.signal;
       if (signal?.aborted) {
         reject(new Error("aborted"));
         return;
diff --git a/src/agents/pi-embedded-runner-extraparams.test.ts b/src/agents/pi-embedded-runner-extraparams.test.ts
index 5f45ebe7caa..e0d65cda224 100644
--- a/src/agents/pi-embedded-runner-extraparams.test.ts
+++ b/src/agents/pi-embedded-runner-extraparams.test.ts
@@ -138,9 +138,9 @@ describe("resolveExtraParams", () => {

 describe("applyExtraParamsToAgent", () => {
   function createOptionsCaptureAgent() {
-    const calls: Array<SimpleStreamOptions | undefined> = [];
+    const calls: Array<(SimpleStreamOptions & { openaiWsWarmup?: boolean }) | undefined> = [];
     const baseStreamFn: StreamFn = (_model, _context, options) => {
-      calls.push(options);
+      calls.push(options as (SimpleStreamOptions & { openaiWsWarmup?: boolean }) | undefined);
       return {} as ReturnType<StreamFn>;
     };
     return {
@@ -559,6 +559,7 @@ describe("applyExtraParamsToAgent", () => {

     expect(calls).toHaveLength(1);
     expect(calls[0]?.transport).toBe("auto");
+    expect(calls[0]?.openaiWsWarmup).toBe(true);
   });

   it("lets runtime options override OpenAI default transport", () => {
@@ -578,6 +579,68 @@ describe("applyExtraParamsToAgent", () => {
     expect(calls[0]?.transport).toBe("sse");
   });

+  it("allows disabling OpenAI websocket warm-up via model params", () => {
+    const { calls, agent } = createOptionsCaptureAgent();
+    const cfg = {
+      agents: {
+        defaults: {
+          models: {
+            "openai/gpt-5": {
+              params: {
+                openaiWsWarmup: false,
+              },
+            },
+          },
+        },
+      },
+    };
+
+    applyExtraParamsToAgent(agent, cfg, "openai", "gpt-5");
+
+    const model = {
+      api: "openai-responses",
+      provider: "openai",
+      id: "gpt-5",
+    } as Model<"openai-responses">;
+    const context: Context = { messages: [] };
+    void agent.streamFn?.(model, context, {});
+
+    expect(calls).toHaveLength(1);
+    expect(calls[0]?.openaiWsWarmup).toBe(false);
+  });
+
+  it("lets runtime options override configured OpenAI websocket warm-up", () => {
+    const { calls, agent } = createOptionsCaptureAgent();
+    const cfg = {
+      agents: {
+        defaults: {
+          models: {
+            "openai/gpt-5": {
+              params: {
+                openaiWsWarmup: false,
+              },
+            },
+          },
+        },
+      },
+    };
+
+    applyExtraParamsToAgent(agent, cfg, "openai", "gpt-5");
+
+    const model = {
+      api: "openai-responses",
+      provider: "openai",
+      id: "gpt-5",
+    } as Model<"openai-responses">;
+    const context: Context = { messages: [] };
+    void agent.streamFn?.(model, context, {
+      openaiWsWarmup: true,
+    } as unknown as SimpleStreamOptions);
+
+    expect(calls).toHaveLength(1);
+    expect(calls[0]?.openaiWsWarmup).toBe(true);
+  });
+
   it("allows forcing Codex transport to SSE", () => {
     const { calls, agent } = createOptionsCaptureAgent();
     const cfg = {
diff --git a/src/agents/pi-embedded-runner/extra-params.ts b/src/agents/pi-embedded-runner/extra-params.ts
index de1d552957b..ac4dec57a73 100644
--- a/src/agents/pi-embedded-runner/extra-params.ts
+++ b/src/agents/pi-embedded-runner/extra-params.ts
@@ -46,6 +46,7 @@ export function resolveExtraParams(params: {
 type CacheRetention = "none" | "short" | "long";
 type CacheRetentionStreamOptions = Partial<SimpleStreamOptions> & {
   cacheRetention?: CacheRetention;
+  openaiWsWarmup?: boolean;
 };

 /**
@@ -124,6 +125,9 @@ function createStreamFnWithExtraParams(
     const transportSummary = typeof transport === "string" ? transport : typeof transport;
"string" ? transport : typeof transport; log.warn(`ignoring invalid transport param: ${transportSummary}`); } + if (typeof extraParams.openaiWsWarmup === "boolean") { + streamParams.openaiWsWarmup = extraParams.openaiWsWarmup; + } const cacheRetention = resolveCacheRetention(extraParams, provider); if (cacheRetention) { streamParams.cacheRetention = cacheRetention; @@ -321,11 +325,19 @@ function createCodexDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): function createOpenAIDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn { const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => - underlying(model, context, { + return (model, context, options) => { + const typedOptions = options as + | (SimpleStreamOptions & { openaiWsWarmup?: boolean }) + | undefined; + return underlying(model, context, { ...options, transport: options?.transport ?? "auto", + // Warm-up is optional in OpenAI docs; enabled by default here for lower + // first-turn latency on WebSocket sessions. Set params.openaiWsWarmup=false + // to disable per model. + openaiWsWarmup: typedOptions?.openaiWsWarmup ?? true, }); + }; } function isAnthropic1MModel(modelId: string): boolean {