refactor: centralize provider stream fallback ownership

This commit is contained in:
Peter Steinberger
2026-04-27 12:11:29 +01:00
parent 8200d878a3
commit e98f976a70
9 changed files with 99 additions and 44 deletions

View File

@@ -1,2 +1,2 @@
4eb7cd87196b34b76c8f5d296debe78407ff5a4b8e6867c8e8feb19aabd668cf plugin-sdk-api-baseline.json
ec4c861295450c09889ce5154371686cf68fdc6e3be479ce0c4bc4eeaa38b00b plugin-sdk-api-baseline.jsonl
6eabbe9e1e568fa1bc02539bd21bb6cd463d609f2ad4573d0cbf116ce39a28f9 plugin-sdk-api-baseline.json
c5a5ba7c051ab741b1cdfb36b23f13e6aad9fbe17ba3fa92c4833c0490a35181 plugin-sdk-api-baseline.jsonl

View File

@@ -4,9 +4,6 @@
"private": true,
"description": "OpenClaw Qwen Cloud provider plugin",
"type": "module",
"dependencies": {
"@mariozechner/pi-ai": "0.70.2"
},
"devDependencies": {
"@openclaw/plugin-sdk": "workspace:*"
},

View File

@@ -1,8 +1,7 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry";
import { normalizeProviderId } from "openclaw/plugin-sdk/provider-model-shared";
import { streamWithPayloadPatch } from "openclaw/plugin-sdk/provider-stream-shared";
import { createPayloadPatchStreamWrapper } from "openclaw/plugin-sdk/provider-stream-shared";
type QwenThinkingLevel = ProviderWrapStreamFnContext["thinkingLevel"];
@@ -33,19 +32,19 @@ export function createQwenThinkingWrapper(
baseStreamFn: StreamFn | undefined,
thinkingLevel: QwenThinkingLevel,
): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
return (model, context, options) => {
if (model.api !== "openai-completions" || !model.reasoning) {
return underlying(model, context, options);
}
const enableThinking = resolveOpenAICompatibleThinkingEnabled({ thinkingLevel, options });
return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
return createPayloadPatchStreamWrapper(
baseStreamFn,
({ payload: payloadObj, options }) => {
const enableThinking = resolveOpenAICompatibleThinkingEnabled({ thinkingLevel, options });
payloadObj.enable_thinking = enableThinking;
delete payloadObj.reasoning_effort;
delete payloadObj.reasoningEffort;
delete payloadObj.reasoning;
});
};
},
{
shouldPatch: ({ model }) => model.api === "openai-completions" && model.reasoning,
},
);
}
export function wrapQwenProviderStream(ctx: ProviderWrapStreamFnContext): StreamFn | undefined {

View File

@@ -4,9 +4,6 @@
"private": true,
"description": "OpenClaw vLLM provider plugin",
"type": "module",
"dependencies": {
"@mariozechner/pi-ai": "0.70.2"
},
"devDependencies": {
"@openclaw/plugin-sdk": "workspace:*"
},

View File

@@ -1,8 +1,7 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry";
import { normalizeProviderId } from "openclaw/plugin-sdk/provider-model-shared";
import { streamWithPayloadPatch } from "openclaw/plugin-sdk/provider-stream-shared";
import { createPayloadPatchStreamWrapper } from "openclaw/plugin-sdk/provider-stream-shared";
type VllmThinkingLevel = ProviderWrapStreamFnContext["thinkingLevel"];
type VllmQwenThinkingFormat = "chat-template" | "top-level";
@@ -79,16 +78,13 @@ export function createVllmQwenThinkingWrapper(params: {
format: VllmQwenThinkingFormat;
thinkingLevel: VllmThinkingLevel;
}): StreamFn {
const underlying = params.baseStreamFn ?? streamSimple;
return (model, context, options) => {
if (model.api !== "openai-completions" || !model.reasoning) {
return underlying(model, context, options);
}
const enableThinking = resolveOpenAICompatibleThinkingEnabled({
thinkingLevel: params.thinkingLevel,
options,
});
return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => {
return createPayloadPatchStreamWrapper(
params.baseStreamFn,
({ payload: payloadObj, options }) => {
const enableThinking = resolveOpenAICompatibleThinkingEnabled({
thinkingLevel: params.thinkingLevel,
options,
});
if (params.format === "chat-template") {
setQwenChatTemplateThinking(payloadObj, enableThinking);
} else {
@@ -97,8 +93,11 @@ export function createVllmQwenThinkingWrapper(params: {
delete payloadObj.reasoning_effort;
delete payloadObj.reasoningEffort;
delete payloadObj.reasoning;
});
};
},
{
shouldPatch: ({ model }) => model.api === "openai-completions" && model.reasoning,
},
);
}
export function wrapVllmProviderStream(ctx: ProviderWrapStreamFnContext): StreamFn | undefined {

8
pnpm-lock.yaml generated
View File

@@ -1167,10 +1167,6 @@ importers:
version: link:../..
extensions/qwen:
dependencies:
'@mariozechner/pi-ai':
specifier: 0.70.2
version: 0.70.2(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
devDependencies:
'@openclaw/plugin-sdk':
specifier: workspace:*
@@ -1380,10 +1376,6 @@ importers:
version: link:../../packages/plugin-sdk
extensions/vllm:
dependencies:
'@mariozechner/pi-ai':
specifier: 0.70.2
version: 0.70.2(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6)
devDependencies:
'@openclaw/plugin-sdk':
specifier: workspace:*

View File

@@ -3,6 +3,7 @@ import { describe, expect, it } from "vitest";
import {
buildCopilotDynamicHeaders,
createHtmlEntityToolCallArgumentDecodingWrapper,
createPayloadPatchStreamWrapper,
defaultToolStreamExtraParams,
decodeHtmlEntitiesInObject,
hasCopilotVisionInput,
@@ -182,3 +183,47 @@ describe("createHtmlEntityToolCallArgumentDecodingWrapper", () => {
});
});
});
describe("createPayloadPatchStreamWrapper", () => {
it("passes stream call options to payload patches", () => {
let captured: Record<string, unknown> = {};
const baseStreamFn: StreamFn = (_model, _context, options) => {
const payload: Record<string, unknown> = {};
options?.onPayload?.(payload, _model);
captured = payload;
return {} as ReturnType<StreamFn>;
};
const wrapped = createPayloadPatchStreamWrapper(baseStreamFn, ({ payload, options }) => {
payload.reasoning = (options as { reasoning?: unknown } | undefined)?.reasoning;
});
void wrapped(
{ id: "model" } as never,
{ messages: [] } as never,
{
reasoning: "medium",
} as never,
);
expect(captured).toEqual({ reasoning: "medium" });
});
it("calls the underlying stream directly when shouldPatch rejects the model", () => {
let onPayloadWasInstalled = false;
const baseStreamFn: StreamFn = (_model, _context, options) => {
onPayloadWasInstalled = typeof options?.onPayload === "function";
return {} as ReturnType<StreamFn>;
};
const wrapped = createPayloadPatchStreamWrapper(
baseStreamFn,
({ payload }) => {
payload.unexpected = true;
},
{ shouldPatch: () => false },
);
void wrapped({ id: "model" } as never, { messages: [] } as never, {});
expect(onPayloadWasInstalled).toBe(false);
});
});

View File

@@ -132,13 +132,26 @@ export function createPayloadPatchStreamWrapper(
patchPayload: (params: {
payload: Record<string, unknown>;
model: Parameters<StreamFn>[0];
context: Parameters<StreamFn>[1];
options: Parameters<StreamFn>[2];
}) => void,
wrapperOptions?: {
shouldPatch?: (params: {
model: Parameters<StreamFn>[0];
context: Parameters<StreamFn>[1];
options: Parameters<StreamFn>[2];
}) => boolean;
},
): StreamFn {
const underlying = baseStreamFn ?? streamSimple;
return (model, context, options) =>
streamWithPayloadPatch(underlying, model, context, options, (payload) =>
patchPayload({ payload, model }),
return (model, context, options) => {
if (wrapperOptions?.shouldPatch && !wrapperOptions.shouldPatch({ model, context, options })) {
return underlying(model, context, options);
}
return streamWithPayloadPatch(underlying, model, context, options, (payload) =>
patchPayload({ payload, model, context, options }),
);
};
}
export type DeepSeekV4ThinkingLevel = ProviderWrapStreamFnContext["thinkingLevel"];

View File

@@ -131,6 +131,19 @@ function collectExtensionCoreImportLeaks(): Array<{ file: string; specifier: str
}
describe("plugin-sdk package contract guardrails", () => {
it("keeps plugin-sdk entrypoint metadata unique", () => {
const counts = new Map<string, number>();
for (const entrypoint of pluginSdkEntrypoints) {
counts.set(entrypoint, (counts.get(entrypoint) ?? 0) + 1);
}
const duplicates = [...counts.entries()]
.filter(([, count]) => count > 1)
.map(([entrypoint]) => entrypoint)
.toSorted();
expect(duplicates).toEqual([]);
});
it("keeps package.json exports aligned with built plugin-sdk entrypoints", () => {
expect(collectPluginSdkPackageExports()).toEqual([...pluginSdkEntrypoints].toSorted());
});