From e98f976a70dfdffc2af62eeeea5ce6d76857cf1f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 12:11:29 +0100 Subject: [PATCH] refactor: centralize provider stream fallback ownership --- .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- extensions/qwen/package.json | 3 -- extensions/qwen/stream.ts | 21 +++++---- extensions/vllm/package.json | 3 -- extensions/vllm/stream.ts | 27 ++++++----- pnpm-lock.yaml | 8 ---- src/plugin-sdk/provider-stream-shared.test.ts | 45 +++++++++++++++++++ src/plugin-sdk/provider-stream-shared.ts | 19 ++++++-- ...in-sdk-package-contract-guardrails.test.ts | 13 ++++++ 9 files changed, 99 insertions(+), 44 deletions(-) diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 8e5ba1621dc..e74323d68b1 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -4eb7cd87196b34b76c8f5d296debe78407ff5a4b8e6867c8e8feb19aabd668cf plugin-sdk-api-baseline.json -ec4c861295450c09889ce5154371686cf68fdc6e3be479ce0c4bc4eeaa38b00b plugin-sdk-api-baseline.jsonl +6eabbe9e1e568fa1bc02539bd21bb6cd463d609f2ad4573d0cbf116ce39a28f9 plugin-sdk-api-baseline.json +c5a5ba7c051ab741b1cdfb36b23f13e6aad9fbe17ba3fa92c4833c0490a35181 plugin-sdk-api-baseline.jsonl diff --git a/extensions/qwen/package.json b/extensions/qwen/package.json index 240b4d499cf..8d3fb5ce6b5 100644 --- a/extensions/qwen/package.json +++ b/extensions/qwen/package.json @@ -4,9 +4,6 @@ "private": true, "description": "OpenClaw Qwen Cloud provider plugin", "type": "module", - "dependencies": { - "@mariozechner/pi-ai": "0.70.2" - }, "devDependencies": { "@openclaw/plugin-sdk": "workspace:*" }, diff --git a/extensions/qwen/stream.ts b/extensions/qwen/stream.ts index c37c3287f70..428a0d2c184 100644 --- a/extensions/qwen/stream.ts +++ b/extensions/qwen/stream.ts @@ -1,8 +1,7 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; -import { streamSimple } from "@mariozechner/pi-ai"; import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry"; import { normalizeProviderId } from "openclaw/plugin-sdk/provider-model-shared"; -import { streamWithPayloadPatch } from "openclaw/plugin-sdk/provider-stream-shared"; +import { createPayloadPatchStreamWrapper } from "openclaw/plugin-sdk/provider-stream-shared"; type QwenThinkingLevel = ProviderWrapStreamFnContext["thinkingLevel"]; @@ -33,19 +32,19 @@ export function createQwenThinkingWrapper( baseStreamFn: StreamFn | undefined, thinkingLevel: QwenThinkingLevel, ): StreamFn { - const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => { - if (model.api !== "openai-completions" || !model.reasoning) { - return underlying(model, context, options); - } - const enableThinking = resolveOpenAICompatibleThinkingEnabled({ thinkingLevel, options }); - return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => { + return createPayloadPatchStreamWrapper( + baseStreamFn, + ({ payload: payloadObj, options }) => { + const enableThinking = resolveOpenAICompatibleThinkingEnabled({ thinkingLevel, options }); payloadObj.enable_thinking = enableThinking; delete payloadObj.reasoning_effort; delete payloadObj.reasoningEffort; delete payloadObj.reasoning; - }); - }; + }, + { + shouldPatch: ({ model }) => model.api === "openai-completions" && model.reasoning, + }, + ); } export function wrapQwenProviderStream(ctx: ProviderWrapStreamFnContext): StreamFn | undefined { diff --git a/extensions/vllm/package.json b/extensions/vllm/package.json index 629fae408a6..79aaecc5d36 100644 --- a/extensions/vllm/package.json +++ b/extensions/vllm/package.json @@ -4,9 +4,6 @@ "private": true, "description": "OpenClaw vLLM provider plugin", "type": "module", - "dependencies": { - "@mariozechner/pi-ai": "0.70.2" - }, "devDependencies": { "@openclaw/plugin-sdk": "workspace:*" }, diff --git a/extensions/vllm/stream.ts b/extensions/vllm/stream.ts index c7b24de2547..15050d73e4d 100644 --- a/extensions/vllm/stream.ts +++ b/extensions/vllm/stream.ts @@ -1,8 +1,7 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; -import { streamSimple } from "@mariozechner/pi-ai"; import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry"; import { normalizeProviderId } from "openclaw/plugin-sdk/provider-model-shared"; -import { streamWithPayloadPatch } from "openclaw/plugin-sdk/provider-stream-shared"; +import { createPayloadPatchStreamWrapper } from "openclaw/plugin-sdk/provider-stream-shared"; type VllmThinkingLevel = ProviderWrapStreamFnContext["thinkingLevel"]; type VllmQwenThinkingFormat = "chat-template" | "top-level"; @@ -79,16 +78,13 @@ export function createVllmQwenThinkingWrapper(params: { format: VllmQwenThinkingFormat; thinkingLevel: VllmThinkingLevel; }): StreamFn { - const underlying = params.baseStreamFn ?? streamSimple; - return (model, context, options) => { - if (model.api !== "openai-completions" || !model.reasoning) { - return underlying(model, context, options); - } - const enableThinking = resolveOpenAICompatibleThinkingEnabled({ - thinkingLevel: params.thinkingLevel, - options, - }); - return streamWithPayloadPatch(underlying, model, context, options, (payloadObj) => { + return createPayloadPatchStreamWrapper( + params.baseStreamFn, + ({ payload: payloadObj, options }) => { + const enableThinking = resolveOpenAICompatibleThinkingEnabled({ + thinkingLevel: params.thinkingLevel, + options, + }); if (params.format === "chat-template") { setQwenChatTemplateThinking(payloadObj, enableThinking); } else { @@ -97,8 +93,11 @@ export function createVllmQwenThinkingWrapper(params: { delete payloadObj.reasoning_effort; delete payloadObj.reasoningEffort; delete payloadObj.reasoning; - }); - }; + }, + { + shouldPatch: ({ model }) => model.api === "openai-completions" && model.reasoning, + }, + ); } export function wrapVllmProviderStream(ctx: ProviderWrapStreamFnContext): StreamFn | undefined { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5864920e963..b8d5e777002 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1167,10 +1167,6 @@ importers: version: link:../.. extensions/qwen: - dependencies: - '@mariozechner/pi-ai': - specifier: 0.70.2 - version: 0.70.2(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6) devDependencies: '@openclaw/plugin-sdk': specifier: workspace:* @@ -1380,10 +1376,6 @@ importers: version: link:../../packages/plugin-sdk extensions/vllm: - dependencies: - '@mariozechner/pi-ai': - specifier: 0.70.2 - version: 0.70.2(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(ws@8.20.0)(zod@4.3.6) devDependencies: '@openclaw/plugin-sdk': specifier: workspace:* diff --git a/src/plugin-sdk/provider-stream-shared.test.ts b/src/plugin-sdk/provider-stream-shared.test.ts index d7f83b27638..d152c17ef20 100644 --- a/src/plugin-sdk/provider-stream-shared.test.ts +++ b/src/plugin-sdk/provider-stream-shared.test.ts @@ -3,6 +3,7 @@ import { describe, expect, it } from "vitest"; import { buildCopilotDynamicHeaders, createHtmlEntityToolCallArgumentDecodingWrapper, + createPayloadPatchStreamWrapper, defaultToolStreamExtraParams, decodeHtmlEntitiesInObject, hasCopilotVisionInput, @@ -182,3 +183,47 @@ describe("createHtmlEntityToolCallArgumentDecodingWrapper", () => { }); }); }); + +describe("createPayloadPatchStreamWrapper", () => { + it("passes stream call options to payload patches", () => { + let captured: Record = {}; + const baseStreamFn: StreamFn = (_model, _context, options) => { + const payload: Record = {}; + options?.onPayload?.(payload, _model); + captured = payload; + return {} as ReturnType; + }; + + const wrapped = createPayloadPatchStreamWrapper(baseStreamFn, ({ payload, options }) => { + payload.reasoning = (options as { reasoning?: unknown } | undefined)?.reasoning; + }); + void wrapped( + { id: "model" } as never, + { messages: [] } as never, + { + reasoning: "medium", + } as never, + ); + + expect(captured).toEqual({ reasoning: "medium" }); + }); + + it("calls the underlying stream directly when shouldPatch rejects the model", () => { + let onPayloadWasInstalled = false; + const baseStreamFn: StreamFn = (_model, _context, options) => { + onPayloadWasInstalled = typeof options?.onPayload === "function"; + return {} as ReturnType; + }; + + const wrapped = createPayloadPatchStreamWrapper( + baseStreamFn, + ({ payload }) => { + payload.unexpected = true; + }, + { shouldPatch: () => false }, + ); + void wrapped({ id: "model" } as never, { messages: [] } as never, {}); + + expect(onPayloadWasInstalled).toBe(false); + }); +}); diff --git a/src/plugin-sdk/provider-stream-shared.ts b/src/plugin-sdk/provider-stream-shared.ts index 76776a04700..c02d0a566a4 100644 --- a/src/plugin-sdk/provider-stream-shared.ts +++ b/src/plugin-sdk/provider-stream-shared.ts @@ -132,13 +132,26 @@ export function createPayloadPatchStreamWrapper( patchPayload: (params: { payload: Record; model: Parameters[0]; + context: Parameters[1]; + options: Parameters[2]; }) => void, + wrapperOptions?: { + shouldPatch?: (params: { + model: Parameters[0]; + context: Parameters[1]; + options: Parameters[2]; + }) => boolean; + }, ): StreamFn { const underlying = baseStreamFn ?? streamSimple; - return (model, context, options) => - streamWithPayloadPatch(underlying, model, context, options, (payload) => - patchPayload({ payload, model }), + return (model, context, options) => { + if (wrapperOptions?.shouldPatch && !wrapperOptions.shouldPatch({ model, context, options })) { + return underlying(model, context, options); + } + return streamWithPayloadPatch(underlying, model, context, options, (payload) => + patchPayload({ payload, model, context, options }), ); + }; } export type DeepSeekV4ThinkingLevel = ProviderWrapStreamFnContext["thinkingLevel"]; diff --git a/src/plugins/contracts/plugin-sdk-package-contract-guardrails.test.ts b/src/plugins/contracts/plugin-sdk-package-contract-guardrails.test.ts index 8a3b6cad8d1..2d7aa41acab 100644 --- a/src/plugins/contracts/plugin-sdk-package-contract-guardrails.test.ts +++ b/src/plugins/contracts/plugin-sdk-package-contract-guardrails.test.ts @@ -131,6 +131,19 @@ function collectExtensionCoreImportLeaks(): Array<{ file: string; specifier: str } describe("plugin-sdk package contract guardrails", () => { + it("keeps plugin-sdk entrypoint metadata unique", () => { + const counts = new Map(); + for (const entrypoint of pluginSdkEntrypoints) { + counts.set(entrypoint, (counts.get(entrypoint) ?? 0) + 1); + } + const duplicates = [...counts.entries()] + .filter(([, count]) => count > 1) + .map(([entrypoint]) => entrypoint) + .toSorted(); + + expect(duplicates).toEqual([]); + }); + it("keeps package.json exports aligned with built plugin-sdk entrypoints", () => { expect(collectPluginSdkPackageExports()).toEqual([...pluginSdkEntrypoints].toSorted()); });