fix(agents): route pi default streams through transport (#79201)

This commit is contained in:
Peter Steinberger
2026-05-08 06:27:46 +01:00
committed by GitHub
parent 03c41eac2d
commit 6eae017dd6
3 changed files with 45 additions and 4 deletions

View File

@@ -183,6 +183,7 @@ Docs: https://docs.openclaw.ai
- Control UI/chat: hide retired and non-public Google Gemini model IDs from chat model catalogs and route the bare `gemini-3-pro` alias to Gemini 3.1 Pro Preview instead of the shut-down Gemini 3 Pro Preview. Thanks @BunsDev.
- CLI/install: refuse state-mutating OpenClaw CLI runs as root by default, keep an explicit `OPENCLAW_ALLOW_ROOT=1` escape hatch for intentional root/container use, and update DigitalOcean setup guidance to run OpenClaw as a non-root user. Fixes #67478. Thanks @Jerry-Xin and @natechicago.
- Auto-reply/media: resolve `scp` from `PATH` when staging sandbox media so nonstandard OpenSSH installs can copy remote attachments.
- Agents/PI: route PI-native OpenAI-compatible default streams through OpenClaw boundary-aware transports so local-compatible model runs keep API-key injection and transport policy.
- Gateway/watch: leave `OPENCLAW_TRACE_SYNC_IO` disabled by default in `pnpm gateway:watch:raw` so watch mode avoids noisy Node sync-I/O stack traces unless explicitly requested.
- Codex app-server: close stdio stdin before force-killing the managed app-server, matching Codex single-client shutdown behavior and avoiding unsettled CLI exits after successful runs.
- CLI/Codex: dispose registered agent harnesses during short-lived CLI shutdown so successful Codex-backed `agent --local` runs do not leave app-server child processes alive.

View File

@@ -1,5 +1,5 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import { getApiProvider, streamSimple } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import * as providerTransportStream from "../provider-transport-stream.js";
import {
@@ -147,6 +147,31 @@ describe("resolveEmbeddedAgentStreamFn", () => {
expect(streamFn).not.toBe(streamSimple);
});
it("routes PI native OpenAI-compatible provider streams through boundary-aware transports", async () => {
const nativeStreamFn = getApiProvider("openai-completions")?.streamSimple;
expect(nativeStreamFn).toBeDefined();
const innerStreamFn = vi.fn(async (_model, _context, options) => options);
overrideBoundaryAwareStreamFnOnce(innerStreamFn as never);
const streamFn = resolveEmbeddedAgentStreamFn({
currentStreamFn: nativeStreamFn as StreamFn,
shouldUseWebSocketTransport: false,
sessionId: "session-1",
model: {
api: "openai-completions",
provider: "llama",
id: "qwen36-35b-a3b",
} as never,
resolvedApiKey: "local-token",
});
expect(streamFn).not.toBe(nativeStreamFn);
await expect(
streamFn({ provider: "llama", id: "qwen36-35b-a3b" } as never, {} as never, {}),
).resolves.toMatchObject({ apiKey: "local-token" });
expect(innerStreamFn).toHaveBeenCalledTimes(1);
});
it("injects the resolved run api key into provider-owned stream functions", async () => {
const providerStreamFn = vi.fn(async (_model, _context, options) => options);
const authStorage = {

View File

@@ -1,5 +1,5 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import { getApiProvider, streamSimple } from "@mariozechner/pi-ai";
import { createAnthropicVertexStreamFnForModel } from "../anthropic-vertex-stream.js";
import { createOpenAIWebSocketStreamFn } from "../openai-ws-stream.js";
import { getModelProviderRequestTransport } from "../provider-request-config.js";
@@ -25,6 +25,21 @@ export function resetEmbeddedAgentBaseStreamFnCacheForTest(): void {
embeddedAgentBaseStreamFnCache = new WeakMap<object, StreamFn | undefined>();
}
function isDefaultPiStreamFnForModel(
model: EmbeddedRunAttemptParams["model"],
streamFn: StreamFn | undefined,
): boolean {
if (!streamFn || streamFn === streamSimple) {
return true;
}
const api = typeof model.api === "string" ? model.api.trim() : "";
if (!api) {
return false;
}
const provider = getApiProvider(api as never);
return streamFn === provider?.streamSimple || streamFn === provider?.stream;
}
export function describeEmbeddedAgentStreamStrategy(params: {
currentStreamFn: StreamFn | undefined;
providerStreamFn?: StreamFn;
@@ -41,7 +56,7 @@ export function describeEmbeddedAgentStreamStrategy(params: {
if (params.model.provider === "anthropic-vertex") {
return "anthropic-vertex";
}
if (params.currentStreamFn === undefined || params.currentStreamFn === streamSimple) {
if (isDefaultPiStreamFnForModel(params.model, params.currentStreamFn)) {
return createBoundaryAwareStreamFnForModel(params.model)
? `boundary-aware:${params.model.api}`
: "stream-simple";
@@ -104,7 +119,7 @@ export function resolveEmbeddedAgentStreamFn(params: {
return createAnthropicVertexStreamFnForModel(params.model);
}
if (params.currentStreamFn === undefined || params.currentStreamFn === streamSimple) {
if (isDefaultPiStreamFnForModel(params.model, params.currentStreamFn)) {
const boundaryAwareStreamFn = createBoundaryAwareStreamFnForModel(params.model);
if (boundaryAwareStreamFn) {
// Boundary-aware transports read credentials from options.apiKey just