From 6eae017dd6142abe10c550b6a24d85cb652d176b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 8 May 2026 06:27:46 +0100 Subject: [PATCH] fix(agents): route pi default streams through transport (#79201) --- CHANGELOG.md | 1 + .../stream-resolution.test.ts | 27 ++++++++++++++++++- .../pi-embedded-runner/stream-resolution.ts | 21 ++++++++++++--- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d76f1626782..0861b062f5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -183,6 +183,7 @@ Docs: https://docs.openclaw.ai - Control UI/chat: hide retired and non-public Google Gemini model IDs from chat model catalogs and route the bare `gemini-3-pro` alias to Gemini 3.1 Pro Preview instead of the shut-down Gemini 3 Pro Preview. Thanks @BunsDev. - CLI/install: refuse state-mutating OpenClaw CLI runs as root by default, keep an explicit `OPENCLAW_ALLOW_ROOT=1` escape hatch for intentional root/container use, and update DigitalOcean setup guidance to run OpenClaw as a non-root user. Fixes #67478. Thanks @Jerry-Xin and @natechicago. - Auto-reply/media: resolve `scp` from `PATH` when staging sandbox media so nonstandard OpenSSH installs can copy remote attachments. +- Agents/PI: route PI-native OpenAI-compatible default streams through OpenClaw boundary-aware transports so local-compatible model runs keep API-key injection and transport policy. - Gateway/watch: leave `OPENCLAW_TRACE_SYNC_IO` disabled by default in `pnpm gateway:watch:raw` so watch mode avoids noisy Node sync-I/O stack traces unless explicitly requested. - Codex app-server: close stdio stdin before force-killing the managed app-server, matching Codex single-client shutdown behavior and avoiding unsettled CLI exits after successful runs. - CLI/Codex: dispose registered agent harnesses during short-lived CLI shutdown so successful Codex-backed `agent --local` runs do not leave app-server child processes alive. diff --git a/src/agents/pi-embedded-runner/stream-resolution.test.ts b/src/agents/pi-embedded-runner/stream-resolution.test.ts index 66ba5247ec7..6d5ad204dc4 100644 --- a/src/agents/pi-embedded-runner/stream-resolution.test.ts +++ b/src/agents/pi-embedded-runner/stream-resolution.test.ts @@ -1,5 +1,5 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; -import { streamSimple } from "@mariozechner/pi-ai"; +import { getApiProvider, streamSimple } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; import * as providerTransportStream from "../provider-transport-stream.js"; import { @@ -147,6 +147,31 @@ describe("resolveEmbeddedAgentStreamFn", () => { expect(streamFn).not.toBe(streamSimple); }); + it("routes PI native OpenAI-compatible provider streams through boundary-aware transports", async () => { + const nativeStreamFn = getApiProvider("openai-completions")?.streamSimple; + expect(nativeStreamFn).toBeDefined(); + const innerStreamFn = vi.fn(async (_model, _context, options) => options); + overrideBoundaryAwareStreamFnOnce(innerStreamFn as never); + + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: nativeStreamFn as StreamFn, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + model: { + api: "openai-completions", + provider: "llama", + id: "qwen36-35b-a3b", + } as never, + resolvedApiKey: "local-token", + }); + + expect(streamFn).not.toBe(nativeStreamFn); + await expect( + streamFn({ provider: "llama", id: "qwen36-35b-a3b" } as never, {} as never, {}), + ).resolves.toMatchObject({ apiKey: "local-token" }); + expect(innerStreamFn).toHaveBeenCalledTimes(1); + }); + it("injects the resolved run api key into provider-owned stream functions", async () => { const providerStreamFn = vi.fn(async (_model, _context, options) => options); const authStorage = { diff --git a/src/agents/pi-embedded-runner/stream-resolution.ts b/src/agents/pi-embedded-runner/stream-resolution.ts index c1e2f5aa225..91999ad4c2e 100644 --- a/src/agents/pi-embedded-runner/stream-resolution.ts +++ b/src/agents/pi-embedded-runner/stream-resolution.ts @@ -1,5 +1,5 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; -import { streamSimple } from "@mariozechner/pi-ai"; +import { getApiProvider, streamSimple } from "@mariozechner/pi-ai"; import { createAnthropicVertexStreamFnForModel } from "../anthropic-vertex-stream.js"; import { createOpenAIWebSocketStreamFn } from "../openai-ws-stream.js"; import { getModelProviderRequestTransport } from "../provider-request-config.js"; @@ -25,6 +25,21 @@ export function resetEmbeddedAgentBaseStreamFnCacheForTest(): void { embeddedAgentBaseStreamFnCache = new WeakMap(); } +function isDefaultPiStreamFnForModel( + model: EmbeddedRunAttemptParams["model"], + streamFn: StreamFn | undefined, +): boolean { + if (!streamFn || streamFn === streamSimple) { + return true; + } + const api = typeof model.api === "string" ? model.api.trim() : ""; + if (!api) { + return false; + } + const provider = getApiProvider(api as never); + return streamFn === provider?.streamSimple || streamFn === provider?.stream; +} + export function describeEmbeddedAgentStreamStrategy(params: { currentStreamFn: StreamFn | undefined; providerStreamFn?: StreamFn; @@ -41,7 +56,7 @@ export function describeEmbeddedAgentStreamStrategy(params: { if (params.model.provider === "anthropic-vertex") { return "anthropic-vertex"; } - if (params.currentStreamFn === undefined || params.currentStreamFn === streamSimple) { + if (isDefaultPiStreamFnForModel(params.model, params.currentStreamFn)) { return createBoundaryAwareStreamFnForModel(params.model) ? `boundary-aware:${params.model.api}` : "stream-simple"; @@ -104,7 +119,7 @@ export function resolveEmbeddedAgentStreamFn(params: { return createAnthropicVertexStreamFnForModel(params.model); } - if (params.currentStreamFn === undefined || params.currentStreamFn === streamSimple) { + if (isDefaultPiStreamFnForModel(params.model, params.currentStreamFn)) { const boundaryAwareStreamFn = createBoundaryAwareStreamFnForModel(params.model); if (boundaryAwareStreamFn) { // Boundary-aware transports read credentials from options.apiKey just