fix: keep copilot on boundary-aware stream path

This commit is contained in:
Peter Steinberger
2026-04-25 00:06:26 +01:00
parent 1787ae0f5d
commit beefcda68f
5 changed files with 77 additions and 28 deletions

View File

@@ -145,7 +145,11 @@ describeLive("github-copilot connection-bound Responses IDs live", () => {
};
let capturedPayload: Record<string, unknown> | undefined;
const stream = wrapCopilotOpenAIResponsesStream(streamOpenAIResponses as never)(
const wrappedStream = wrapCopilotOpenAIResponsesStream(streamOpenAIResponses as never);
if (!wrappedStream) {
throw new Error("expected Copilot Responses stream wrapper");
}
const stream = wrappedStream(
model as never,
context as never,
{

View File

@@ -6,6 +6,14 @@ import {
wrapCopilotProviderStream,
} from "./stream.js";
function requireStreamFn(streamFn: ReturnType<typeof wrapCopilotProviderStream>) {
expect(streamFn).toBeTypeOf("function");
if (!streamFn) {
throw new Error("expected stream fn");
}
return streamFn;
}
describe("wrapCopilotAnthropicStream", () => {
it("adds Copilot headers and Anthropic cache markers for Claude payloads", async () => {
const payloads: Array<{
@@ -28,7 +36,7 @@ describe("wrapCopilotAnthropicStream", () => {
} as never;
});
const wrapped = wrapCopilotAnthropicStream(baseStreamFn);
const wrapped = requireStreamFn(wrapCopilotAnthropicStream(baseStreamFn));
const messages = [
{
role: "user",
@@ -77,7 +85,7 @@ describe("wrapCopilotAnthropicStream", () => {
it("leaves non-Anthropic Copilot models untouched", () => {
const baseStreamFn = vi.fn(() => ({ async *[Symbol.asyncIterator]() {} }) as never);
const wrapped = wrapCopilotAnthropicStream(baseStreamFn);
const wrapped = requireStreamFn(wrapCopilotAnthropicStream(baseStreamFn));
const options = { headers: { Existing: "1" } };
void wrapped(
@@ -105,7 +113,7 @@ describe("wrapCopilotAnthropicStream", () => {
} as never;
});
const wrapped = wrapCopilotOpenAIResponsesStream(baseStreamFn);
const wrapped = requireStreamFn(wrapCopilotOpenAIResponsesStream(baseStreamFn));
const messages = [
{
role: "toolResult",
@@ -149,7 +157,7 @@ describe("wrapCopilotAnthropicStream", () => {
} as never;
});
const wrapped = wrapCopilotOpenAIResponsesStream(baseStreamFn);
const wrapped = requireStreamFn(wrapCopilotOpenAIResponsesStream(baseStreamFn));
await wrapped(
{
@@ -171,9 +179,11 @@ describe("wrapCopilotAnthropicStream", () => {
it("adapts provider stream context without changing wrapper behavior", () => {
const baseStreamFn = vi.fn(() => ({ async *[Symbol.asyncIterator]() {} }) as never);
const wrapped = wrapCopilotProviderStream({
streamFn: baseStreamFn,
} as never);
const wrapped = requireStreamFn(
wrapCopilotProviderStream({
streamFn: baseStreamFn,
} as never),
);
void wrapped(
{
@@ -187,4 +197,12 @@ describe("wrapCopilotAnthropicStream", () => {
expect(baseStreamFn).toHaveBeenCalledOnce();
});
it("does not claim provider transport before OpenClaw chooses one", () => {
expect(
wrapCopilotProviderStream({
streamFn: undefined,
} as never),
).toBeUndefined();
});
});

View File

@@ -1,5 +1,4 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry";
import {
applyAnthropicEphemeralCacheControlMarkers,
@@ -23,8 +22,26 @@ function patchOnPayloadResult(result: unknown): unknown {
return result;
}
export function wrapCopilotAnthropicStream(baseStreamFn: StreamFn | undefined): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
function buildCopilotRequestHeaders(
context: Parameters<StreamFn>[1],
headers: Record<string, string> | undefined,
): Record<string, string> {
return {
...buildCopilotDynamicHeaders({
messages: context.messages,
hasImages: hasCopilotVisionInput(context.messages),
}),
...headers,
};
}
export function wrapCopilotAnthropicStream(
baseStreamFn: StreamFn | undefined,
): StreamFn | undefined {
if (!baseStreamFn) {
return undefined;
}
const underlying = baseStreamFn;
return (model, context, options) => {
if (model.provider !== "github-copilot" || model.api !== "anthropic-messages") {
return underlying(model, context, options);
@@ -36,21 +53,20 @@ export function wrapCopilotAnthropicStream(baseStreamFn: StreamFn | undefined):
context,
{
...options,
headers: {
...buildCopilotDynamicHeaders({
messages: context.messages,
hasImages: hasCopilotVisionInput(context.messages),
}),
...options?.headers,
},
headers: buildCopilotRequestHeaders(context, options?.headers),
},
applyAnthropicEphemeralCacheControlMarkers,
);
};
}
export function wrapCopilotOpenAIResponsesStream(baseStreamFn: StreamFn | undefined): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
export function wrapCopilotOpenAIResponsesStream(
baseStreamFn: StreamFn | undefined,
): StreamFn | undefined {
if (!baseStreamFn) {
return undefined;
}
const underlying = baseStreamFn;
return (model, context, options) => {
if (model.provider !== "github-copilot" || model.api !== "openai-responses") {
return underlying(model, context, options);
@@ -59,13 +75,7 @@ export function wrapCopilotOpenAIResponsesStream(baseStreamFn: StreamFn | undefi
const originalOnPayload = options?.onPayload;
const wrappedOptions: StreamOptions = {
...options,
headers: {
...buildCopilotDynamicHeaders({
messages: context.messages,
hasImages: hasCopilotVisionInput(context.messages),
}),
...options?.headers,
},
headers: buildCopilotRequestHeaders(context, options?.headers),
onPayload: (payload, payloadModel) => {
rewriteCopilotResponsePayloadConnectionBoundIds(payload);
return patchOnPayloadResult(originalOnPayload?.(payload, payloadModel));
@@ -75,6 +85,6 @@ export function wrapCopilotOpenAIResponsesStream(baseStreamFn: StreamFn | undefi
};
}
export function wrapCopilotProviderStream(ctx: ProviderWrapStreamFnContext): StreamFn {
export function wrapCopilotProviderStream(ctx: ProviderWrapStreamFnContext): StreamFn | undefined {
return wrapCopilotOpenAIResponsesStream(wrapCopilotAnthropicStream(ctx.streamFn));
}