fix(agents): inject resolved OAuth bearer into boundary-aware embedded streams (#73588)

Fixes openclaw#73559. Extracts a shared wrapEmbeddedAgentStreamFn helper and applies it to both provider-owned and boundary-aware fallback paths in resolveEmbeddedAgentStreamFn, forwarding the resolved OAuth bearer (resolvedApiKey → authStorage → options.apiKey) and run abort signal so models routing through openai-codex-responses and other boundary-aware transports stop failing with 401 Missing bearer auth header.
This commit is contained in:
Chunyue Wang
2026-04-29 12:56:43 +08:00
committed by GitHub
parent 2f589aacf9
commit 16fd9a9d59
3 changed files with 213 additions and 31 deletions

View File

@@ -104,6 +104,7 @@ Docs: https://docs.openclaw.ai
- WhatsApp/reliability: publish real transport-liveness into WhatsApp channel status and force earlier reconnects on silent transport stalls, so quiet healthy sessions stay connected while wedged sockets recover before the later remote 408 path. (#72656) Thanks @Sathvik-1007.
- Core/channels: tighten selected runtime, media, and plugin edge-case handling while preserving existing behavior. Thanks @jesse-merhi.
- Channels/WhatsApp: strip leaked plural tool-call XML wrappers on every WhatsApp-visible outbound path and allow `channels.whatsapp.exposeErrorText` to suppress visible error text per channel or account. (#71830) Thanks @rubencu.
- Agents/embedded-runner: inject the resolved OAuth bearer (and forward the run abort signal) on the boundary-aware embedded stream fallback so models that route through `openai-codex-responses` and other boundary-aware transports stop failing with `401 Unauthorized: Missing bearer or basic authentication in header`. Fixes #73559. (#73588) Thanks @openperf.
## 2026.4.27

View File

@@ -1,11 +1,32 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import * as providerTransportStream from "../provider-transport-stream.js";
import {
describeEmbeddedAgentStreamStrategy,
resolveEmbeddedAgentApiKey,
resolveEmbeddedAgentStreamFn,
} from "./stream-resolution.js";
// Wrap createBoundaryAwareStreamFnForModel with a spy that delegates to the
// real implementation by default so existing routing tests still observe a
// real transport stream; per-test overrideBoundaryAwareStreamFnOnce() injects
// a probe stream when a regression test needs to inspect the wrapped
// transport's options.
vi.mock("../provider-transport-stream.js", async (importOriginal) => {
const actual = await importOriginal<typeof providerTransportStream>();
return {
...actual,
createBoundaryAwareStreamFnForModel: vi.fn(actual.createBoundaryAwareStreamFnForModel),
};
});
const overrideBoundaryAwareStreamFnOnce = (streamFn: StreamFn): void => {
vi.mocked(providerTransportStream.createBoundaryAwareStreamFnForModel).mockReturnValueOnce(
streamFn,
);
};
describe("describeEmbeddedAgentStreamStrategy", () => {
it("describes provider-owned stream paths explicitly", () => {
expect(
@@ -203,4 +224,138 @@ describe("resolveEmbeddedAgentStreamFn", () => {
signal: explicitSignal,
});
});
it("injects the resolved run api key into the boundary-aware Codex Responses fallback", async () => {
const innerStreamFn = vi.fn(async (_model, _context, options) => options);
overrideBoundaryAwareStreamFnOnce(innerStreamFn as never);
const streamFn = resolveEmbeddedAgentStreamFn({
currentStreamFn: undefined,
shouldUseWebSocketTransport: false,
sessionId: "session-1",
model: {
api: "openai-codex-responses",
provider: "openai-codex",
id: "gpt-5.5",
} as never,
resolvedApiKey: "oauth-bearer-token",
});
await expect(
streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, {}),
).resolves.toMatchObject({ apiKey: "oauth-bearer-token" });
expect(innerStreamFn).toHaveBeenCalledTimes(1);
});
it("falls back to authStorage when no resolved api key is available for boundary-aware fallback", async () => {
const innerStreamFn = vi.fn(async (_model, _context, options) => options);
const authStorage = {
getApiKey: vi.fn(async () => "stored-bearer-token"),
};
overrideBoundaryAwareStreamFnOnce(innerStreamFn as never);
const streamFn = resolveEmbeddedAgentStreamFn({
currentStreamFn: undefined,
shouldUseWebSocketTransport: false,
sessionId: "session-1",
model: {
api: "openai-codex-responses",
provider: "openai-codex",
id: "gpt-5.5",
} as never,
authStorage,
});
await expect(
streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, {}),
).resolves.toMatchObject({ apiKey: "stored-bearer-token" });
expect(authStorage.getApiKey).toHaveBeenCalledWith("openai-codex");
});
it("forwards the run abort signal into the boundary-aware fallback when callers omit one", async () => {
const innerStreamFn = vi.fn(async (_model, _context, options) => options);
const runSignal = new AbortController().signal;
overrideBoundaryAwareStreamFnOnce(innerStreamFn as never);
const streamFn = resolveEmbeddedAgentStreamFn({
currentStreamFn: undefined,
shouldUseWebSocketTransport: false,
sessionId: "session-1",
signal: runSignal,
model: {
api: "openai-codex-responses",
provider: "openai-codex",
id: "gpt-5.5",
} as never,
resolvedApiKey: "oauth-bearer-token",
});
await expect(
streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, {}),
).resolves.toMatchObject({ signal: runSignal, apiKey: "oauth-bearer-token" });
});
it("does not overwrite an explicit signal on the boundary-aware fallback path", async () => {
const innerStreamFn = vi.fn(async (_model, _context, options) => options);
const runSignal = new AbortController().signal;
const explicitSignal = new AbortController().signal;
overrideBoundaryAwareStreamFnOnce(innerStreamFn as never);
const streamFn = resolveEmbeddedAgentStreamFn({
currentStreamFn: undefined,
shouldUseWebSocketTransport: false,
sessionId: "session-1",
signal: runSignal,
model: {
api: "openai-codex-responses",
provider: "openai-codex",
id: "gpt-5.5",
} as never,
resolvedApiKey: "oauth-bearer-token",
});
await expect(
streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, {
signal: explicitSignal,
}),
).resolves.toMatchObject({ signal: explicitSignal });
});
it("forwards the run signal on the sync boundary-aware fallback path without auth credentials", async () => {
const innerStreamFn = vi.fn(async (_model, _context, options) => options);
const runSignal = new AbortController().signal;
overrideBoundaryAwareStreamFnOnce(innerStreamFn as never);
const streamFn = resolveEmbeddedAgentStreamFn({
currentStreamFn: undefined,
shouldUseWebSocketTransport: false,
sessionId: "session-1",
signal: runSignal,
model: {
api: "openai-codex-responses",
provider: "openai-codex",
id: "gpt-5.5",
} as never,
});
await expect(
streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, {}),
).resolves.toMatchObject({ signal: runSignal });
});
it("does not strip cache boundary markers on the boundary-aware fallback path", async () => {
const innerStreamFn = vi.fn(async (_model, context, _options) => context);
overrideBoundaryAwareStreamFnOnce(innerStreamFn as never);
const streamFn = resolveEmbeddedAgentStreamFn({
currentStreamFn: undefined,
shouldUseWebSocketTransport: false,
sessionId: "session-1",
model: {
api: "openai-codex-responses",
provider: "openai-codex",
id: "gpt-5.5",
} as never,
resolvedApiKey: "oauth-bearer-token",
});
const systemPrompt = "intro<<openclaw-cache-boundary>>tail";
await expect(
streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, { systemPrompt } as never, {}),
).resolves.toMatchObject({ systemPrompt });
});
});

View File

@@ -73,36 +73,19 @@ export function resolveEmbeddedAgentStreamFn(params: {
authStorage?: { getApiKey(provider: string): Promise<string | undefined> };
}): StreamFn {
if (params.providerStreamFn) {
const inner = params.providerStreamFn;
const normalizeContext = (context: Parameters<StreamFn>[1]) =>
context.systemPrompt
? {
...context,
systemPrompt: stripSystemPromptCacheBoundary(context.systemPrompt),
}
: context;
const mergeRunSignal = (options: Parameters<StreamFn>[2]) => {
const signal = options?.signal ?? params.signal;
return signal ? { ...options, signal } : options;
};
// Provider-owned transports bypass pi-coding-agent's default auth lookup,
// so keep injecting the resolved runtime apiKey for streamSimple-compatible
// transports that still read credentials from options.apiKey.
if (params.authStorage || params.resolvedApiKey) {
const { authStorage, model, resolvedApiKey } = params;
return async (m, context, options) => {
const apiKey = await resolveEmbeddedAgentApiKey({
provider: model.provider,
resolvedApiKey,
authStorage,
});
return inner(m, normalizeContext(context), {
...mergeRunSignal(options),
apiKey: apiKey ?? options?.apiKey,
});
};
}
return (m, context, options) => inner(m, normalizeContext(context), mergeRunSignal(options));
return wrapEmbeddedAgentStreamFn(params.providerStreamFn, {
runSignal: params.signal,
resolvedApiKey: params.resolvedApiKey,
authStorage: params.authStorage,
providerId: params.model.provider,
transformContext: (context) =>
context.systemPrompt
? {
...context,
systemPrompt: stripSystemPromptCacheBoundary(context.systemPrompt),
}
: context,
});
}
const currentStreamFn = params.currentStreamFn ?? streamSimple;
@@ -124,9 +107,52 @@ export function resolveEmbeddedAgentStreamFn(params: {
if (params.currentStreamFn === undefined || params.currentStreamFn === streamSimple) {
const boundaryAwareStreamFn = createBoundaryAwareStreamFnForModel(params.model);
if (boundaryAwareStreamFn) {
return boundaryAwareStreamFn;
// Boundary-aware transports read credentials from options.apiKey just
// like provider-owned streams, but the embedded run layer never gets to
// inject the resolved runtime key for them. Without this wrap, OAuth
// providers (e.g. openai-codex/gpt-5.5) hit the Responses API with an
// empty bearer and fail with 401 Missing bearer auth header.
return wrapEmbeddedAgentStreamFn(boundaryAwareStreamFn, {
runSignal: params.signal,
resolvedApiKey: params.resolvedApiKey,
authStorage: params.authStorage,
providerId: params.model.provider,
});
}
}
return currentStreamFn;
}
function wrapEmbeddedAgentStreamFn(
inner: StreamFn,
params: {
runSignal: AbortSignal | undefined;
resolvedApiKey: string | undefined;
authStorage: { getApiKey(provider: string): Promise<string | undefined> } | undefined;
providerId: string;
transformContext?: (context: Parameters<StreamFn>[1]) => Parameters<StreamFn>[1];
},
): StreamFn {
const transformContext =
params.transformContext ?? ((context: Parameters<StreamFn>[1]) => context);
const mergeRunSignal = (options: Parameters<StreamFn>[2]) => {
const signal = options?.signal ?? params.runSignal;
return signal ? { ...options, signal } : options;
};
if (!params.authStorage && !params.resolvedApiKey) {
return (m, context, options) => inner(m, transformContext(context), mergeRunSignal(options));
}
const { authStorage, providerId, resolvedApiKey } = params;
return async (m, context, options) => {
const apiKey = await resolveEmbeddedAgentApiKey({
provider: providerId,
resolvedApiKey,
authStorage,
});
return inner(m, transformContext(context), {
...mergeRunSignal(options),
apiKey: apiKey ?? options?.apiKey,
});
};
}