From c1383680405879a3c3f43dceb20a694f54d3c13c Mon Sep 17 00:00:00 2001 From: EVA Date: Fri, 24 Apr 2026 15:32:27 +0700 Subject: [PATCH] feat: add Codex harness extension seams Co-authored-by: Eva <100yenadmin@users.noreply.github.com> --- CHANGELOG.md | 1 + .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/tools/capability-cookbook.md | 19 +++ docs/tools/plugin.md | 4 +- .../codex/src/app-server/run-attempt.ts | 2 + .../src/bot.create-telegram-bot.test.ts | 1 - src/agents/cli-runner.ts | 1 + src/agents/embedded-runner.ts | 17 +++ src/agents/harness/selection.test.ts | 28 ++++ src/agents/harness/selection.ts | 20 ++- src/agents/harness/types.ts | 8 ++ src/agents/model-fallback.test.ts | 40 ++++++ src/agents/openai-ws-stream.test.ts | 126 ++++++++++++++++ src/agents/openai-ws-stream.ts | 90 ++++++++++-- .../pi-embedded-runner-extraparams.test.ts | 135 ++++++++++++++++++ ...pi-agent.auth-profile-rotation.e2e.test.ts | 1 + src/agents/pi-embedded-runner/aliases.test.ts | 19 +++ src/agents/pi-embedded-runner/extra-params.ts | 68 +++++++-- .../result-fallback-classifier.ts | 38 +++++ .../run.overflow-compaction.harness.ts | 1 + src/agents/pi-embedded-runner/run.ts | 35 ++++- ...mpt.spawn-workspace.context-engine.test.ts | 4 +- .../run/attempt.subscription-cleanup.ts | 5 +- src/agents/pi-embedded-runner/run/attempt.ts | 23 ++- .../run/message-merge-strategy.test.ts | 64 +++++++++ .../run/message-merge-strategy.ts | 54 +++++++ src/agents/pi-embedded-runner/run/types.ts | 1 + src/agents/pi-embedded-runner/types.ts | 1 + src/agents/provider-api-families.test.ts | 18 +++ src/agents/provider-api-families.ts | 10 ++ src/auto-reply/reply/followup-runner.test.ts | 62 ++++++++ src/auto-reply/reply/followup-runner.ts | 49 ++++++- src/plugin-sdk/agent-harness-runtime.ts | 1 + src/plugins/hook-types.ts | 8 ++ src/plugins/provider-hook-runtime.ts | 35 +++++ src/plugins/provider-runtime.test.ts | 123 ++++++++++++++++ src/plugins/provider-runtime.ts | 26 +++- src/plugins/types.ts 
| 80 +++++++++++ 38 files changed, 1173 insertions(+), 49 deletions(-) create mode 100644 src/agents/embedded-runner.ts create mode 100644 src/agents/pi-embedded-runner/aliases.test.ts create mode 100644 src/agents/pi-embedded-runner/run/message-merge-strategy.test.ts create mode 100644 src/agents/pi-embedded-runner/run/message-merge-strategy.ts create mode 100644 src/agents/provider-api-families.test.ts create mode 100644 src/agents/provider-api-families.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 434e8791ea8..09d59976580 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai - Agents/tools: add optional per-call `timeoutMs` support for image, video, music, and TTS generation tools so agents can extend provider request timeouts only when a specific generation needs it. - Agents/subagents: add optional forked context for native `sessions_spawn` runs so agents can let a child inherit the requester transcript when needed, while keeping clean isolated sessions as the default; includes prompt guidance, context-engine hook metadata, docs, and QA coverage. - Codex harness: add structured debug logging for embedded harness selection decisions so `/status` stays simple while gateway logs explain auto-selection and Pi fallback reasons. (#70760) Thanks @100yenadmin. +- Plugin SDK/Codex harness: add provider-owned transport/auth/follow-up seams and harness result classification so Codex-style runtimes can participate in fallback policy without core special-casing. (#70772) Thanks @100yenadmin. - Dependencies/Pi: update bundled Pi packages to `0.70.0`, use Pi's upstream `gpt-5.5` catalog metadata for OpenAI and OpenAI Codex, and keep only local `gpt-5.5-pro` forward-compat handling. - Models/CLI: speed up `openclaw models list --all --provider ` for bundled providers with safe static catalogs while keeping live and third-party providers on registry discovery. (#70632) Thanks @shakkernerd. 
- Models/CLI: avoid broad registry enumeration for default `openclaw models list`, reducing default listing latency while preserving configured-row output. (#70883) Thanks @shakkernerd. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 6d1cc133f51..cd321edcfa1 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -c57d43f93ec2930b099dd5c5777f201f1bdd1ab432eeb4049b6e62ff23fe8112 plugin-sdk-api-baseline.json -ece1ea689914c4070b587551e86c6bed6598feba90457ab489222e168b2d9298 plugin-sdk-api-baseline.jsonl +8ca22ea6125fb198641c676d73b4df5a3bc49079be68bef8ed0718a54c1bb53a plugin-sdk-api-baseline.json +197d9743128020062fc457228fa9139d0bd465d9e1775101bfc39137f4a10896 plugin-sdk-api-baseline.jsonl diff --git a/docs/tools/capability-cookbook.md b/docs/tools/capability-cookbook.md index 09eb06029d4..3746fc02529 100644 --- a/docs/tools/capability-cookbook.md +++ b/docs/tools/capability-cookbook.md @@ -69,6 +69,25 @@ Feature/channel plugin: - calls `api.runtime.*` or the matching `plugin-sdk/*-runtime` helper - never calls a vendor implementation directly +## Provider and Harness Seams + +Use provider hooks when the behavior belongs to the model provider contract +rather than the generic agent loop. Examples include provider-specific request +params after transport selection, auth-profile preference, prompt overlays, and +follow-up fallback routing after model/profile failover. + +Use agent harness hooks when the behavior belongs to the runtime that is +executing a turn. Harnesses can classify successful-but-unusable attempt results +such as empty, reasoning-only, or planning-only responses so the outer model +fallback policy can make the retry decision. 
+ +Keep both seams narrow: + +- core owns the retry/fallback policy +- provider plugins own provider-specific request/auth/routing hints +- harness plugins own runtime-specific attempt classification +- third-party plugins return hints, not direct mutations of core state + ## File checklist For a new capability, expect to touch these areas: diff --git a/docs/tools/plugin.md b/docs/tools/plugin.md index ef64601fbdd..b1ce85d02d2 100644 --- a/docs/tools/plugin.md +++ b/docs/tools/plugin.md @@ -9,8 +9,8 @@ sidebarTitle: "Install and Configure" --- Plugins extend OpenClaw with new capabilities: channels, model providers, -tools, skills, speech, realtime transcription, realtime voice, -media-understanding, image generation, video generation, web fetch, web +agent harnesses, tools, skills, speech, realtime transcription, realtime +voice, media-understanding, image generation, video generation, web fetch, web search, and more. Some plugins are **core** (shipped with OpenClaw), others are **external** (published on npm by the community). diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 29e822fc234..4ddd00f1482 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -448,6 +448,7 @@ export async function runCodexAppServerAttempt( sessionId: params.sessionId, provider: params.provider, model: params.modelId, + resolvedRef: `${params.provider}/${params.modelId}`, assistantTexts: [], }, ctx: hookContext, @@ -602,6 +603,7 @@ export async function runCodexAppServerAttempt( sessionId: params.sessionId, provider: params.provider, model: params.modelId, + resolvedRef: `${params.provider}/${params.modelId}`, assistantTexts: result.assistantTexts, ...(result.lastAssistant ? { lastAssistant: result.lastAssistant } : {}), ...(result.attemptUsage ? 
{ usage: result.attemptUsage } : {}), diff --git a/extensions/telegram/src/bot.create-telegram-bot.test.ts b/extensions/telegram/src/bot.create-telegram-bot.test.ts index 95d4666660f..820efada259 100644 --- a/extensions/telegram/src/bot.create-telegram-bot.test.ts +++ b/extensions/telegram/src/bot.create-telegram-bot.test.ts @@ -240,7 +240,6 @@ describe("createTelegramBot", () => { it("lets /status bypass a busy Telegram topic lane", async () => { installPerKeySequentializer(); loadConfig.mockReturnValue({ - commands: { native: true }, channels: { telegram: { dmPolicy: "open", diff --git a/src/agents/cli-runner.ts b/src/agents/cli-runner.ts index ecb23331387..04d6a1a7aa2 100644 --- a/src/agents/cli-runner.ts +++ b/src/agents/cli-runner.ts @@ -199,6 +199,7 @@ export async function runPreparedCliAgent( sessionId: params.sessionId, provider: params.provider, model: context.modelId, + resolvedRef: `${params.provider}/${context.modelId}`, assistantTexts, ...(lastAssistant ? { lastAssistant } : {}), ...(output.usage ? 
{ usage: output.usage } : {}), diff --git a/src/agents/embedded-runner.ts b/src/agents/embedded-runner.ts new file mode 100644 index 00000000000..7f7ff9b754f --- /dev/null +++ b/src/agents/embedded-runner.ts @@ -0,0 +1,17 @@ +export { + abortEmbeddedAgentRun, + compactEmbeddedAgentSession, + isEmbeddedAgentRunActive, + isEmbeddedAgentRunStreaming, + queueEmbeddedAgentMessage, + resolveActiveEmbeddedAgentRunSessionId, + resolveEmbeddedSessionLane, + runEmbeddedAgent, + waitForEmbeddedAgentRunEnd, +} from "./pi-embedded-runner.js"; +export type { + EmbeddedAgentCompactResult, + EmbeddedAgentMeta, + EmbeddedAgentRunMeta, + EmbeddedAgentRunResult, +} from "./pi-embedded-runner.js"; diff --git a/src/agents/harness/selection.test.ts b/src/agents/harness/selection.test.ts index 59c246da792..561b66273c6 100644 --- a/src/agents/harness/selection.test.ts +++ b/src/agents/harness/selection.test.ts @@ -138,6 +138,34 @@ describe("runAgentHarnessAttemptWithFallback", () => { expect(piRunAttempt).not.toHaveBeenCalled(); }); + it("annotates non-ok harness result classifications for outer model fallback", async () => { + process.env.OPENCLAW_AGENT_RUNTIME = "auto"; + const classify = vi.fn(() => "empty" as const); + registerAgentHarness( + { + id: "codex", + label: "Classifying Codex", + supports: (ctx) => + ctx.provider === "codex" ? 
{ supported: true, priority: 100 } : { supported: false }, + runAttempt: vi.fn(async () => createAttemptResult("codex")), + classify, + }, + { ownerPluginId: "codex" }, + ); + + const params = createAttemptParams(); + const result = await runAgentHarnessAttemptWithFallback(params); + + expect(classify).toHaveBeenCalledWith( + expect.objectContaining({ sessionIdUsed: "codex" }), + params, + ); + expect(result).toMatchObject({ + agentHarnessId: "codex", + agentHarnessResultClassification: "empty", + }); + }); + it("honors env fallback override over config fallback", async () => { process.env.OPENCLAW_AGENT_RUNTIME = "auto"; process.env.OPENCLAW_AGENT_HARNESS_FALLBACK = "none"; diff --git a/src/agents/harness/selection.ts b/src/agents/harness/selection.ts index 8d9b151c24d..63ac2fe55bd 100644 --- a/src/agents/harness/selection.ts +++ b/src/agents/harness/selection.ts @@ -189,12 +189,12 @@ export async function runAgentHarnessAttemptWithFallback( }); if (harness.id === "pi") { const result = await harness.runAttempt(params); - return { ...result, agentHarnessId: harness.id }; + return applyHarnessResultClassification(harness, result, params); } try { const result = await harness.runAttempt(params); - return { ...result, agentHarnessId: harness.id }; + return applyHarnessResultClassification(harness, result, params); } catch (error) { log.warn(`${harness.label} failed; not falling back to embedded PI backend`, { harnessId: harness.id, @@ -263,6 +263,22 @@ function logAgentHarnessSelection( }); } +function applyHarnessResultClassification( + harness: AgentHarness, + result: EmbeddedRunAttemptResult, + params: EmbeddedRunAttemptParams, +): EmbeddedRunAttemptResult { + const classification = harness.classify?.(result, params); + if (!classification || classification === "ok") { + return { ...result, agentHarnessId: harness.id }; + } + return { + ...result, + agentHarnessId: harness.id, + agentHarnessResultClassification: classification, + }; +} + function 
resolvePinnedAgentHarnessPolicy( agentHarnessId: string | undefined, ): AgentHarnessPolicy | undefined { diff --git a/src/agents/harness/types.ts b/src/agents/harness/types.ts index 47dc9371761..9a8f797e181 100644 --- a/src/agents/harness/types.ts +++ b/src/agents/harness/types.ts @@ -27,12 +27,20 @@ export type AgentHarnessResetParams = { reason?: "new" | "reset" | "idle" | "daily" | "compaction" | "deleted" | "unknown"; }; +export type AgentHarnessResultClassification = + | "ok" + | NonNullable; + export type AgentHarness = { id: string; label: string; pluginId?: string; supports(ctx: AgentHarnessSupportContext): AgentHarnessSupport; runAttempt(params: AgentHarnessAttemptParams): Promise; + classify?( + result: AgentHarnessAttemptResult, + ctx: AgentHarnessAttemptParams, + ): AgentHarnessResultClassification | undefined; compact?(params: AgentHarnessCompactParams): Promise; reset?(params: AgentHarnessResetParams): Promise | void; dispose?(): Promise | void; diff --git a/src/agents/model-fallback.test.ts b/src/agents/model-fallback.test.ts index c71df3ca859..d3eda8ba795 100644 --- a/src/agents/model-fallback.test.ts +++ b/src/agents/model-fallback.test.ts @@ -561,6 +561,46 @@ describe("runWithModelFallback", () => { ).toBeNull(); }); + it("uses harness-owned terminal classification for GPT-5 fallback", () => { + const runResult: EmbeddedPiRunResult = { + payloads: [], + meta: { + durationMs: 1, + agentHarnessResultClassification: "planning-only", + }, + }; + + expect( + classifyEmbeddedPiRunResultForModelFallback({ + provider: "codex", + model: "gpt-5.4", + result: runResult, + }), + ).toMatchObject({ + code: "planning_only_result", + reason: "format", + }); + }); + + it("keeps aborted harness-classified GPT-5 runs out of fallback", () => { + const runResult: EmbeddedPiRunResult = { + payloads: [], + meta: { + durationMs: 1, + aborted: true, + agentHarnessResultClassification: "empty", + }, + }; + + expect( + classifyEmbeddedPiRunResultForModelFallback({ + 
provider: "codex", + model: "gpt-5.4", + result: runResult, + }), + ).toBeNull(); + }); + it("passes original unknown errors to onError during fallback", async () => { const cfg = makeCfg(); const unknownError = new Error("provider misbehaved"); diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index 9fc8651d9e5..4d63d14cda6 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -47,6 +47,7 @@ const { MockManager } = vi.hoisted(() => { sentEvents: unknown[] = []; connectCallCount = 0; + connectApiKeys: string[] = []; closeCallCount = 0; options: unknown; @@ -69,6 +70,7 @@ const { MockManager } = vi.hoisted(() => { async connect(_apiKey: string): Promise { this.connectCallCount++; + this.connectApiKeys.push(_apiKey); if (this.connectShouldFail || _globalConnectShouldFail) { throw new Error("Mock connect failure"); } @@ -3628,6 +3630,61 @@ describe("releaseWsSession / hasWsSession", () => { expect(manager.closeCallCount).toBe(1); }); + it("pools cleanly released sessions behind the explicit pool flag", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "registry-test"); + const stream = streamFn( + { + api: "openai-responses", + provider: "openai", + id: "gpt-5.4", + contextWindow: 128000, + maxTokens: 4096, + reasoning: false, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + name: "GPT-5.4", + } as Parameters[0], + { + systemPrompt: "test", + messages: [userMsg("Hi") as Parameters[0][number]], + tools: [], + } as Parameters[1], + ); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + manager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp-pooled", "done"), + }); + for await (const _ of await resolveStream(stream)) { + // consume + } + + vi.useFakeTimers(); + try { + releaseWsSession("registry-test", { + allowPool: true, + env: { + OPENCLAW_OPENAI_WS_POOL: "1", 
+ OPENCLAW_OPENAI_WS_SESSION_POOL_IDLE_MS: "1000", + } as NodeJS.ProcessEnv, + }); + + expect(hasWsSession("registry-test")).toBe(true); + expect(manager.closeCallCount).toBe(0); + + await vi.advanceTimersByTimeAsync(999); + expect(hasWsSession("registry-test")).toBe(true); + + await vi.advanceTimersByTimeAsync(1); + expect(hasWsSession("registry-test")).toBe(false); + expect(manager.closeCallCount).toBe(1); + } finally { + vi.useRealTimers(); + } + }); + it("releaseWsSession is a no-op for unknown sessions", () => { expect(() => releaseWsSession("nonexistent-session")).not.toThrow(); }); @@ -3712,6 +3769,75 @@ describe("releaseWsSession / hasWsSession", () => { // consume } }); + + it("recreates the cached manager when the API key changes for the same session", async () => { + const sessionId = "registry-test"; + const firstStreamFn = createOpenAIWebSocketStreamFn("sk-first", sessionId); + const firstStream = firstStreamFn( + { + api: "openai-responses", + provider: "openai", + id: "gpt-5.4", + contextWindow: 128000, + maxTokens: 4096, + reasoning: false, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + name: "GPT-5.4", + } as Parameters[0], + { + systemPrompt: "test", + messages: [userMsg("Hi") as Parameters[0][number]], + tools: [], + } as Parameters[1], + ); + + await new Promise((r) => setImmediate(r)); + const firstManager = MockManager.lastInstance!; + expect(firstManager.connectApiKeys).toEqual(["sk-first"]); + firstManager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp-first-key", "done"), + }); + for await (const _ of await resolveStream(firstStream)) { + // consume + } + + const secondStreamFn = createOpenAIWebSocketStreamFn("sk-second", sessionId); + const secondStream = secondStreamFn( + { + api: "openai-responses", + provider: "openai", + id: "gpt-5.4", + contextWindow: 128000, + maxTokens: 4096, + reasoning: false, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, 
cacheWrite: 0 }, + name: "GPT-5.4", + } as Parameters[0], + { + systemPrompt: "test", + messages: [userMsg("Again") as Parameters[0][number]], + tools: [], + } as Parameters[1], + ); + + await new Promise((r) => setImmediate(r)); + expect(MockManager.instances).toHaveLength(2); + expect(firstManager.closeCallCount).toBe(1); + const secondManager = MockManager.lastInstance!; + expect(secondManager).not.toBe(firstManager); + expect(secondManager.connectApiKeys).toEqual(["sk-second"]); + + secondManager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp-second-key", "done"), + }); + for await (const _ of await resolveStream(secondStream)) { + // consume + } + }); }); describe("convertMessagesToInputItems — phase inheritance", () => { diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 0a7c5cd66eb..6645d5a50e8 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -1,4 +1,4 @@ -import { randomUUID } from "node:crypto"; +import { createHash, randomUUID } from "node:crypto"; import type { StreamFn } from "@mariozechner/pi-agent-core"; import type { AssistantMessage, @@ -73,6 +73,7 @@ import { mergeTransportMetadata } from "./transport-stream-shared.js"; interface WsSession { manager: OpenAIWebSocketManager; managerConfigSignature: string; + authSignature: string; /** Number of messages that were in context.messages at the END of the last streamFn call. */ lastContextLength: number; /** True if the connection has been established at least once. */ @@ -81,6 +82,9 @@ interface WsSession { warmUpAttempted: boolean; /** True if the session is permanently broken (no more reconnect). */ broken: boolean; + /** Pending idle release timer when disabled-by-default pooling retains a session. */ + idleTimer?: ReturnType; + pooledUntil?: number; /** Session-scoped cool-down after repeated websocket failures. 
*/ degradedUntil: number | null; degradeCooldownMs: number; @@ -201,20 +205,72 @@ function createEventStream(): AssistantMessageEventStream { // Public registry helpers // ───────────────────────────────────────────────────────────────────────────── +type ReleaseWsSessionOptions = { + allowPool?: boolean; + env?: NodeJS.ProcessEnv; +}; + +function resolveWsSessionPoolConfig(env: NodeJS.ProcessEnv = process.env): { + enabled: boolean; + idleMs: number; +} { + const enabled = + env.OPENCLAW_OPENAI_WS_POOL === "1" || env.OPENCLAW_OPENAI_WS_SESSION_POOL === "1"; + const rawIdleMs = Number(env.OPENCLAW_OPENAI_WS_SESSION_POOL_IDLE_MS); + const idleMs = Number.isFinite(rawIdleMs) + ? Math.min(300_000, Math.max(1_000, Math.trunc(rawIdleMs))) + : 30_000; + return { enabled, idleMs }; +} + +function clearWsSessionIdleTimer(session: WsSession): void { + if (!session.idleTimer) { + return; + } + clearTimeout(session.idleTimer); + session.idleTimer = undefined; + session.pooledUntil = undefined; +} + +function closeWsSession(sessionId: string, session: WsSession): void { + clearWsSessionIdleTimer(session); + try { + session.manager.close(); + } catch { + // Ignore close errors — connection may already be gone. + } + wsRegistry.delete(sessionId); +} + /** * Release and close the WebSocket session for the given sessionId. * Call this after the agent run completes to free the connection. */ -export function releaseWsSession(sessionId: string): void { +export function releaseWsSession(sessionId: string, options: ReleaseWsSessionOptions = {}): void { const session = wsRegistry.get(sessionId); - if (session) { - try { - session.manager.close(); - } catch { - // Ignore close errors — connection may already be gone. 
- } - wsRegistry.delete(sessionId); + if (!session) { + return; } + const pool = resolveWsSessionPoolConfig(options.env); + if ( + options.allowPool === true && + pool.enabled && + !session.broken && + session.manager.isConnected() + ) { + clearWsSessionIdleTimer(session); + session.pooledUntil = Date.now() + pool.idleMs; + session.idleTimer = setTimeout(() => { + const current = wsRegistry.get(sessionId); + if (current === session) { + closeWsSession(sessionId, session); + } + }, pool.idleMs); + session.idleTimer.unref?.(); + log.debug(`[ws-stream] pooled websocket session=${sessionId} idleMs=${pool.idleMs}`); + return; + } + closeWsSession(sessionId, session); } /** @@ -292,6 +348,7 @@ function resetWsSession(params: { createManager: () => OpenAIWebSocketManager; preserveDegradeUntil?: boolean; }): void { + clearWsSessionIdleTimer(params.session); try { params.session.manager.close(); } catch { @@ -362,6 +419,10 @@ function resolveWsManagerConfigSignature( }); } +function resolveWsAuthSignature(apiKey: string): string { + return createHash("sha256").update(apiKey).digest("hex"); +} + const AZURE_OPENAI_PROVIDER_IDS = new Set(["azure-openai", "azure-openai-responses"]); const OPENAI_CODEX_PROVIDER_ID = "openai-codex"; @@ -655,6 +716,7 @@ export function createOpenAIWebSocketStreamFn( while (true) { let session = wsRegistry.get(sessionId); + const authSignature = resolveWsAuthSignature(apiKey); const managerConfigSignature = resolveWsManagerConfigSignature( opts.managerOptions, sessionHeaders, @@ -664,6 +726,7 @@ export function createOpenAIWebSocketStreamFn( session = { manager, managerConfigSignature, + authSignature, lastContextLength: 0, everConnected: false, warmUpAttempted: false, @@ -672,13 +735,20 @@ export function createOpenAIWebSocketStreamFn( degradeCooldownMs: wsSessionPolicy.degradeCooldownMs, }; wsRegistry.set(sessionId, session); - } else if (session.managerConfigSignature !== managerConfigSignature) { + } else if ( + session.managerConfigSignature 
!== managerConfigSignature || + session.authSignature !== authSignature + ) { + clearWsSessionIdleTimer(session); resetWsSession({ session, createManager: () => createWsManager(opts.managerOptions, sessionHeaders), }); session.managerConfigSignature = managerConfigSignature; + session.authSignature = authSignature; session.degradeCooldownMs = wsSessionPolicy.degradeCooldownMs; + } else { + clearWsSessionIdleTimer(session); } if (transport !== "websocket" && isWsSessionDegraded(session)) { diff --git a/src/agents/pi-embedded-runner-extraparams.test.ts b/src/agents/pi-embedded-runner-extraparams.test.ts index 3a286cb6035..9fdb12a3478 100644 --- a/src/agents/pi-embedded-runner-extraparams.test.ts +++ b/src/agents/pi-embedded-runner-extraparams.test.ts @@ -8,6 +8,7 @@ vi.mock("../plugins/provider-hook-runtime.js", () => ({ buildHookProviderCacheKey: () => "test-provider-hook-cache-key", }, prepareProviderExtraParams: () => undefined, + resolveProviderExtraParamsForTransport: () => undefined, resetProviderRuntimeHookCacheForTest: () => {}, wrapProviderStreamFn: (params: { context: { streamFn?: StreamFn } }) => params.context.streamFn, })); @@ -282,6 +283,7 @@ import { import { applyExtraParamsToAgent, resolveAgentTransportOverride, + resolveExplicitSettingsTransport, resolvePreparedExtraParams, } from "./pi-embedded-runner/extra-params.js"; import { createGoogleThinkingPayloadWrapper } from "./pi-embedded-runner/google-stream-wrappers.js"; @@ -1909,6 +1911,118 @@ describe("applyExtraParamsToAgent", () => { expect(effectiveExtraParams.transport).toBe("auto"); }); + it("composes transport extra-param hooks after provider preparation", () => { + const resolveProviderExtraParamsForTransport = vi.fn((_params) => ({ + patch: { + hookApplied: true, + }, + })); + extraParamsTesting.setProviderRuntimeDepsForTest({ + prepareProviderExtraParams: (params) => ({ + ...params.context.extraParams, + transport: "websocket", + }), + resolveProviderExtraParamsForTransport, + 
wrapProviderStreamFn: (params) => params.context.streamFn, + }); + + const model = { + api: "openai-responses", + provider: "openai", + id: "gpt-5", + } as Model<"openai-responses">; + const effectiveExtraParams = resolvePreparedExtraParams({ + cfg: undefined, + provider: "openai", + modelId: "gpt-5", + agentDir: "/tmp/agent", + workspaceDir: "/tmp/workspace", + model, + }); + + expect(effectiveExtraParams).toMatchObject({ + transport: "websocket", + hookApplied: true, + }); + expect(resolveProviderExtraParamsForTransport).toHaveBeenCalledWith( + expect.objectContaining({ + provider: "openai", + context: expect.objectContaining({ + model, + transport: "websocket", + agentDir: "/tmp/agent", + workspaceDir: "/tmp/workspace", + }), + }), + ); + }); + + it("passes explicit settings transport to transport extra-param hooks", () => { + const resolveProviderExtraParamsForTransport = vi.fn((_params) => ({ + patch: { + hookApplied: true, + }, + })); + extraParamsTesting.setProviderRuntimeDepsForTest({ + prepareProviderExtraParams: (params) => ({ + ...params.context.extraParams, + transport: "auto", + }), + resolveProviderExtraParamsForTransport, + wrapProviderStreamFn: (params) => params.context.streamFn, + }); + + const resolvedTransport = resolveExplicitSettingsTransport({ + settingsManager: { + getGlobalSettings: () => ({ transport: "websocket" }), + getProjectSettings: () => ({}), + }, + sessionTransport: "websocket", + }); + const effectiveExtraParams = resolvePreparedExtraParams({ + cfg: undefined, + provider: "openai", + modelId: "gpt-5", + resolvedTransport, + }); + + expect(effectiveExtraParams).toMatchObject({ + transport: "auto", + hookApplied: true, + }); + expect(resolveProviderExtraParamsForTransport).toHaveBeenCalledWith( + expect.objectContaining({ + context: expect.objectContaining({ + transport: "websocket", + }), + }), + ); + }); + + it("applies transport hook parallel_tool_calls patches to request payloads", () => { + 
extraParamsTesting.setProviderRuntimeDepsForTest({ + prepareProviderExtraParams: () => undefined, + resolveProviderExtraParamsForTransport: () => ({ + patch: { + parallel_tool_calls: true, + }, + }), + wrapProviderStreamFn: (params) => params.context.streamFn, + }); + const payload = runResponsesPayloadMutationCase({ + applyProvider: "test-openai", + applyModelId: "gpt-compatible", + model: { + api: "openai-responses", + provider: "test-openai", + id: "gpt-compatible", + } as Model<"openai-responses">, + payload: {}, + }); + + expect(payload.parallel_tool_calls).toBe(true); + }); + it("uses prepared transport when session settings did not explicitly set one", () => { const effectiveExtraParams = resolvePreparedExtraParams({ cfg: undefined, @@ -1945,6 +2059,27 @@ describe("applyExtraParamsToAgent", () => { ).toBeUndefined(); }); + it("resolves explicit settings transport from the active session transport", () => { + expect( + resolveExplicitSettingsTransport({ + settingsManager: { + getGlobalSettings: () => ({}), + getProjectSettings: () => ({}), + }, + sessionTransport: "websocket", + }), + ).toBeUndefined(); + expect( + resolveExplicitSettingsTransport({ + settingsManager: { + getGlobalSettings: () => ({ transport: "sse" }), + getProjectSettings: () => ({}), + }, + sessionTransport: "websocket", + }), + ).toBe("websocket"); + }); + it("strips prototype pollution keys from extra params overrides", () => { const effectiveExtraParams = resolvePreparedExtraParams({ cfg: undefined, diff --git a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts index b9b5b24ccb1..ca438b76769 100644 --- a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts +++ b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts @@ -1065,6 +1065,7 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { }), 
).rejects.toMatchObject({ name: "FailoverError", + profileId: "openai:p1", reason: "rate_limit", provider: "openai", model: "mock-1", diff --git a/src/agents/pi-embedded-runner/aliases.test.ts b/src/agents/pi-embedded-runner/aliases.test.ts new file mode 100644 index 00000000000..a9923dec93a --- /dev/null +++ b/src/agents/pi-embedded-runner/aliases.test.ts @@ -0,0 +1,19 @@ +import { describe, expect, it } from "vitest"; +import { runEmbeddedAgent as runEmbeddedAgentFromNeutralBarrel } from "../embedded-runner.js"; +import { + abortEmbeddedAgentRun, + abortEmbeddedPiRun, + compactEmbeddedAgentSession, + compactEmbeddedPiSession, + runEmbeddedAgent, + runEmbeddedPiAgent, +} from "../pi-embedded-runner.js"; + +describe("embedded runner compatibility aliases", () => { + it("keeps neutral embedded-agent aliases bound to the PI compatibility exports", () => { + expect(runEmbeddedAgent).toBe(runEmbeddedPiAgent); + expect(runEmbeddedAgentFromNeutralBarrel).toBe(runEmbeddedPiAgent); + expect(compactEmbeddedAgentSession).toBe(compactEmbeddedPiSession); + expect(abortEmbeddedAgentRun).toBe(abortEmbeddedPiRun); + }); +}); diff --git a/src/agents/pi-embedded-runner/extra-params.ts b/src/agents/pi-embedded-runner/extra-params.ts index cb0a0efe506..7b1f0d35dd1 100644 --- a/src/agents/pi-embedded-runner/extra-params.ts +++ b/src/agents/pi-embedded-runner/extra-params.ts @@ -6,9 +6,11 @@ import type { ThinkLevel } from "../../auto-reply/thinking.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { prepareProviderExtraParams as prepareProviderExtraParamsRuntime, + resolveProviderExtraParamsForTransport as resolveProviderExtraParamsForTransportRuntime, wrapProviderStreamFn as wrapProviderStreamFnRuntime, } from "../../plugins/provider-hook-runtime.js"; import type { ProviderRuntimeModel } from "../../plugins/provider-runtime-model.types.js"; +import { supportsGptParallelToolCallsPayload } from "../provider-api-families.js"; import { 
createGoogleThinkingPayloadWrapper } from "./google-stream-wrappers.js"; import { log } from "./logger.js"; import { createMinimaxThinkingDisabledWrapper } from "./minimax-stream-wrappers.js"; @@ -26,6 +28,7 @@ import { streamWithPayloadPatch } from "./stream-payload-utils.js"; const defaultProviderRuntimeDeps = { prepareProviderExtraParams: prepareProviderExtraParamsRuntime, + resolveProviderExtraParamsForTransport: resolveProviderExtraParamsForTransportRuntime, wrapProviderStreamFn: wrapProviderStreamFnRuntime, }; @@ -39,12 +42,17 @@ export const __testing = { ): void { providerRuntimeDeps.prepareProviderExtraParams = deps?.prepareProviderExtraParams ?? defaultProviderRuntimeDeps.prepareProviderExtraParams; + providerRuntimeDeps.resolveProviderExtraParamsForTransport = + deps?.resolveProviderExtraParamsForTransport ?? + defaultProviderRuntimeDeps.resolveProviderExtraParamsForTransport; providerRuntimeDeps.wrapProviderStreamFn = deps?.wrapProviderStreamFn ?? defaultProviderRuntimeDeps.wrapProviderStreamFn; }, resetProviderRuntimeDepsForTest(): void { providerRuntimeDeps.prepareProviderExtraParams = defaultProviderRuntimeDeps.prepareProviderExtraParams; + providerRuntimeDeps.resolveProviderExtraParamsForTransport = + defaultProviderRuntimeDeps.resolveProviderExtraParamsForTransport; providerRuntimeDeps.wrapProviderStreamFn = defaultProviderRuntimeDeps.wrapProviderStreamFn; }, }; @@ -111,7 +119,7 @@ type CacheRetentionStreamOptions = Partial & { cachedContent?: string; openaiWsWarmup?: boolean; }; -type SupportedTransport = Exclude; +export type SupportedTransport = Exclude; function resolveSupportedTransport(value: unknown): SupportedTransport | undefined { return value === "sse" || value === "websocket" || value === "auto" ? 
value : undefined; @@ -125,10 +133,14 @@ export function resolvePreparedExtraParams(params: { cfg: OpenClawConfig | undefined; provider: string; modelId: string; + agentDir?: string; + workspaceDir?: string; extraParamsOverride?: Record; thinkingLevel?: ThinkLevel; agentId?: string; resolvedExtraParams?: Record; + model?: ProviderRuntimeModel; + resolvedTransport?: SupportedTransport; }): Record { const resolvedExtraParams = params.resolvedExtraParams ?? @@ -159,19 +171,38 @@ export function resolvePreparedExtraParams(params: { merged.cachedContent = resolvedCachedContent; delete merged.cached_content; } - return ( + const prepared = providerRuntimeDeps.prepareProviderExtraParams({ provider: params.provider, config: params.cfg, + workspaceDir: params.workspaceDir, context: { config: params.cfg, + agentDir: params.agentDir, + workspaceDir: params.workspaceDir, provider: params.provider, modelId: params.modelId, extraParams: merged, thinkingLevel: params.thinkingLevel, }, - }) ?? merged - ); + }) ?? merged; + const transportPatch = providerRuntimeDeps.resolveProviderExtraParamsForTransport({ + provider: params.provider, + config: params.cfg, + workspaceDir: params.workspaceDir, + context: { + config: params.cfg, + agentDir: params.agentDir, + workspaceDir: params.workspaceDir, + provider: params.provider, + modelId: params.modelId, + extraParams: prepared, + thinkingLevel: params.thinkingLevel, + model: params.model, + transport: params.resolvedTransport ?? resolveSupportedTransport(prepared.transport), + }, + })?.patch; + return transportPatch ? 
{ ...prepared, ...transportPatch } : prepared; } function sanitizeExtraParamsRecord( @@ -230,6 +261,21 @@ export function resolveAgentTransportOverride(params: { return resolveSupportedTransport(params.effectiveExtraParams?.transport); } +export function resolveExplicitSettingsTransport(params: { + settingsManager: Pick; + sessionTransport: unknown; +}): SupportedTransport | undefined { + const globalSettings = params.settingsManager.getGlobalSettings(); + const projectSettings = params.settingsManager.getProjectSettings(); + if ( + !hasExplicitTransportSetting(globalSettings) && + !hasExplicitTransportSetting(projectSettings) + ) { + return undefined; + } + return resolveSupportedTransport(params.sessionTransport); +} + function createStreamFnWithExtraParams( baseStreamFn: StreamFn | undefined, extraParams: Record | undefined, @@ -331,12 +377,7 @@ function createParallelToolCallsWrapper( ): StreamFn { const underlying = baseStreamFn ?? streamSimple; return (model, context, options) => { - if ( - model.api !== "openai-completions" && - model.api !== "openai-responses" && - model.api !== "openai-codex-responses" && - model.api !== "azure-openai-responses" - ) { + if (!supportsGptParallelToolCallsPayload(model.api)) { return underlying(model, context, options); } log.debug( @@ -415,7 +456,7 @@ function applyPostPluginStreamWrappers( ctx.agent.streamFn = createMinimaxThinkingDisabledWrapper(ctx.agent.streamFn); const rawParallelToolCalls = resolveAliasedParamValue( - [ctx.resolvedExtraParams, ctx.override], + [ctx.effectiveExtraParams, ctx.override], "parallel_tool_calls", "parallelToolCalls", ); @@ -452,6 +493,7 @@ export function applyExtraParamsToAgent( workspaceDir?: string, model?: ProviderRuntimeModel, agentDir?: string, + resolvedTransport?: SupportedTransport, ): { effectiveExtraParams: Record } { const resolvedExtraParams = resolveExtraParams({ cfg, @@ -472,7 +514,11 @@ export function applyExtraParamsToAgent( extraParamsOverride, thinkingLevel, agentId, + 
agentDir, + workspaceDir, resolvedExtraParams, + model, + resolvedTransport, }); const wrapperContext: ApplyExtraParamsContext = { agent, diff --git a/src/agents/pi-embedded-runner/result-fallback-classifier.ts b/src/agents/pi-embedded-runner/result-fallback-classifier.ts index 67f12df6aac..9373aa7b91c 100644 --- a/src/agents/pi-embedded-runner/result-fallback-classifier.ts +++ b/src/agents/pi-embedded-runner/result-fallback-classifier.ts @@ -47,6 +47,35 @@ function hasDeliberateSilentTerminalReply(result: EmbeddedPiRunResult): boolean ); } +function classifyHarnessResult(params: { + provider: string; + model: string; + result: EmbeddedPiRunResult; +}): ModelFallbackResultClassification { + switch (params.result.meta.agentHarnessResultClassification) { + case "empty": + return { + message: `${params.provider}/${params.model} ended without a visible assistant reply`, + reason: "format", + code: "empty_result", + }; + case "reasoning-only": + return { + message: `${params.provider}/${params.model} ended with reasoning only`, + reason: "format", + code: "reasoning_only_result", + }; + case "planning-only": + return { + message: `${params.provider}/${params.model} exhausted plan-only retries without taking action`, + reason: "format", + code: "planning_only_result", + }; + default: + return null; + } +} + export function classifyEmbeddedPiRunResultForModelFallback(params: { provider: string; model: string; @@ -69,6 +98,15 @@ export function classifyEmbeddedPiRunResultForModelFallback(params: { return null; } + const harnessClassification = classifyHarnessResult({ + provider: params.provider, + model: params.model, + result: params.result, + }); + if (harnessClassification) { + return harnessClassification; + } + const payloads = params.result.payloads ?? 
[]; if (payloads.length === 0 && hasDeliberateSilentTerminalReply(params.result)) { return null; diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts index 7faf8c63f71..3868c75d16f 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts @@ -371,6 +371,7 @@ export async function loadRunOverflowCompactionHarness(): Promise<{ vi.doMock("../../plugins/provider-runtime.js", () => ({ prepareProviderRuntimeAuth: mockedPrepareProviderRuntimeAuth, resolveProviderCapabilitiesWithPlugin: vi.fn(() => ({})), + resolveProviderAuthProfileId: vi.fn(() => undefined), prepareProviderExtraParams: vi.fn(async () => ({})), wrapProviderStreamFn: vi.fn((_cfg: unknown, _model: unknown, fn: unknown) => fn), })); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index f9a925f017a..b2e71796121 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -10,6 +10,7 @@ import { sleepWithAbort } from "../../infra/backoff.js"; import { freezeDiagnosticTraceContext } from "../../infra/diagnostic-trace-context.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; +import { resolveProviderAuthProfileId } from "../../plugins/provider-runtime.js"; import { enqueueCommandInLane } from "../../process/command-queue.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { sanitizeForLog } from "../../terminal/ansi.js"; @@ -448,10 +449,35 @@ export async function runEmbeddedPiAgent( provider, preferredProfile: preferredProfileId, }); + const providerPreferredProfileId = lockedProfileId + ? 
undefined + : resolveProviderAuthProfileId({ + provider, + config: params.config, + workspaceDir: resolvedWorkspace, + context: { + config: params.config, + agentDir, + workspaceDir: resolvedWorkspace, + provider, + modelId, + preferredProfileId, + lockedProfileId, + profileOrder, + authStore, + }, + }); + const providerOrderedProfiles = + providerPreferredProfileId && profileOrder.includes(providerPreferredProfileId) + ? [ + providerPreferredProfileId, + ...profileOrder.filter((profileId) => profileId !== providerPreferredProfileId), + ] + : profileOrder; const profileCandidates = lockedProfileId ? [lockedProfileId] - : profileOrder.length > 0 - ? profileOrder + : providerOrderedProfiles.length > 0 + ? providerOrderedProfiles : [undefined]; let profileIndex = 0; const traceAttempts: TraceAttempt[] = []; @@ -1786,6 +1812,7 @@ export async function runEmbeddedPiAgent( replayInvalid, livenessState, toolSummary: attemptToolSummary, + agentHarnessResultClassification: attempt.agentHarnessResultClassification, }, didSendViaMessagingTool: attempt.didSendViaMessagingTool, didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt, @@ -1947,6 +1974,7 @@ export async function runEmbeddedPiAgent( replayInvalid, livenessState, toolSummary: attemptToolSummary, + agentHarnessResultClassification: attempt.agentHarnessResultClassification, }, didSendViaMessagingTool: attempt.didSendViaMessagingTool, didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt, @@ -1995,6 +2023,7 @@ export async function runEmbeddedPiAgent( replayInvalid, livenessState, toolSummary: attemptToolSummary, + agentHarnessResultClassification: attempt.agentHarnessResultClassification, }, didSendViaMessagingTool: attempt.didSendViaMessagingTool, didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt, @@ -2105,6 +2134,7 @@ export async function runEmbeddedPiAgent( replayInvalid, livenessState, toolSummary: attemptToolSummary, + 
agentHarnessResultClassification: attempt.agentHarnessResultClassification, }, didSendViaMessagingTool: attempt.didSendViaMessagingTool, didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt, @@ -2163,6 +2193,7 @@ export async function runEmbeddedPiAgent( finalAssistantRawText, replayInvalid, livenessState, + agentHarnessResultClassification: attempt.agentHarnessResultClassification, // Handle client tool calls (OpenResponses hosted tools) // Propagate the LLM stop reason so callers (lifecycle events, // ACP bridge) can distinguish end_turn from max_tokens. diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts index bf3365f1f2d..4fbf273db79 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts @@ -520,6 +520,8 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { expect(flushMock).toHaveBeenCalledTimes(1); expect(disposeMock).toHaveBeenCalledTimes(1); expect(releaseMock).toHaveBeenCalledTimes(1); - expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session"); + expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session", { + allowPool: false, + }); }); }); diff --git a/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts b/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts index b860a1b9cff..ed5647dcf9c 100644 --- a/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts +++ b/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts @@ -24,7 +24,8 @@ export async function cleanupEmbeddedAttemptResources(params: { }) => Promise; session?: { agent?: unknown; dispose(): void }; sessionManager: unknown; - releaseWsSession: (sessionId: string) => void; + releaseWsSession: (sessionId: string, 
options?: { allowPool?: boolean }) => void; + allowWsSessionPool?: boolean; sessionId: string; bundleLspRuntime?: { dispose(): Promise | void }; sessionLock: { release(): Promise | void }; @@ -50,7 +51,7 @@ export async function cleanupEmbeddedAttemptResources(params: { /* best-effort */ } try { - params.releaseWsSession(params.sessionId); + params.releaseWsSession(params.sessionId, { allowPool: params.allowWsSessionPool === true }); } catch { /* best-effort */ } diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 39083fd79a7..c02a39a6b5f 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -141,7 +141,11 @@ import { resolveCompactionTimeoutMs } from "../compaction-safety-timeout.js"; import { runContextEngineMaintenance } from "../context-engine-maintenance.js"; import { applyFinalEffectiveToolPolicy } from "../effective-tool-policy.js"; import { buildEmbeddedExtensionFactories } from "../extensions.js"; -import { applyExtraParamsToAgent, resolveAgentTransportOverride } from "../extra-params.js"; +import { + applyExtraParamsToAgent, + resolveAgentTransportOverride, + resolveExplicitSettingsTransport, +} from "../extra-params.js"; import { prepareGooglePromptCacheStreamFn } from "../google-prompt-cache.js"; import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js"; import { log } from "../logger.js"; @@ -224,7 +228,6 @@ import { import { buildAfterTurnRuntimeContext, buildAfterTurnRuntimeContextFromUsage, - mergeOrphanedTrailingUserPrompt, prependSystemPromptAddition, resolveAttemptFsWorkspaceOnly, resolveAttemptPrependSystemContext, @@ -274,6 +277,7 @@ import { pruneProcessedHistoryImages } from "./history-image-prune.js"; import { detectAndLoadPromptImages } from "./images.js"; import { buildAttemptReplayMetadata } from "./incomplete-turn.js"; import { resolveLlmIdleTimeoutMs, streamWithIdleTimeout } from 
"./llm-idle-timeout.js"; +import { resolveMessageMergeStrategy } from "./message-merge-strategy.js"; import { PREEMPTIVE_OVERFLOW_ERROR_TEXT, shouldPreemptivelyCompactBeforePrompt, @@ -1381,6 +1385,10 @@ export async function runEmbeddedAttempt( effectiveWorkspace, params.model, agentDir, + resolveExplicitSettingsTransport({ + settingsManager, + sessionTransport: activeSession.agent.transport, + }), ); const effectivePromptCacheRetention = resolveCacheRetention( effectiveExtraParams, @@ -2066,7 +2074,7 @@ export async function runEmbeddedAttempt( // Repair orphaned trailing user messages so new prompts don't violate role ordering. const leafEntry = sessionManager.getLeafEntry(); if (leafEntry?.type === "message" && leafEntry.message.role === "user") { - const orphanPromptMerge = mergeOrphanedTrailingUserPrompt({ + const orphanPromptMerge = resolveMessageMergeStrategy().mergeOrphanedTrailingUserPrompt({ prompt: effectivePrompt, trigger: params.trigger, leafMessage: leafEntry.message, @@ -2088,8 +2096,10 @@ export async function runEmbeddedAttempt( ? "Merged and removed" : "Removed already-queued" : "Preserved" - } orphaned user message ` + - `to prevent consecutive user turns. ` + + } orphaned user message` + + (orphanPromptMerge.removeLeaf + ? " to prevent consecutive user turns. " + : " without removing the active session leaf. 
") + `runId=${params.runId} sessionId=${params.sessionId} trigger=${params.trigger}`; if (shouldWarnOnOrphanedUserRepair(params.trigger)) { log.warn(orphanRepairMessage); @@ -2694,6 +2704,7 @@ export async function runEmbeddedAttempt( sessionId: params.sessionId, provider: params.provider, model: params.modelId, + resolvedRef: `${params.provider}/${params.modelId}`, assistantTexts, lastAssistant, usage: attemptUsage, @@ -2841,6 +2852,8 @@ export async function runEmbeddedAttempt( session, sessionManager, releaseWsSession, + allowWsSessionPool: + !promptError && !aborted && !timedOut && !idleTimedOut && !timedOutDuringCompaction, sessionId: params.sessionId, bundleLspRuntime, sessionLock, diff --git a/src/agents/pi-embedded-runner/run/message-merge-strategy.test.ts b/src/agents/pi-embedded-runner/run/message-merge-strategy.test.ts new file mode 100644 index 00000000000..b6c4f6df2a2 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/message-merge-strategy.test.ts @@ -0,0 +1,64 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { + DEFAULT_MESSAGE_MERGE_STRATEGY_ID, + registerMessageMergeStrategyForTest, + resolveMessageMergeStrategy, + type MessageMergeStrategy, +} from "./message-merge-strategy.js"; + +let restoreStrategy: (() => void) | undefined; + +afterEach(() => { + restoreStrategy?.(); + restoreStrategy = undefined; +}); + +describe("message merge strategy registry", () => { + it("resolves the default orphan trailing user prompt strategy", () => { + const strategy = resolveMessageMergeStrategy(); + + expect(strategy.id).toBe(DEFAULT_MESSAGE_MERGE_STRATEGY_ID); + expect( + strategy.mergeOrphanedTrailingUserPrompt({ + prompt: "newest inbound message", + trigger: "user", + leafMessage: { content: "older active-turn message" }, + }), + ).toEqual({ + merged: true, + removeLeaf: true, + prompt: + "[Queued user message that arrived while the previous turn was still active]\n" + + "older active-turn message\n\nnewest inbound message", + }); + }); 
+ + it("allows tests to override and restore the active strategy", () => { + const override: MessageMergeStrategy = { + id: DEFAULT_MESSAGE_MERGE_STRATEGY_ID, + mergeOrphanedTrailingUserPrompt: (params) => ({ + prompt: `override: ${params.prompt}`, + merged: false, + removeLeaf: false, + }), + }; + + restoreStrategy = registerMessageMergeStrategyForTest(override); + + expect( + resolveMessageMergeStrategy().mergeOrphanedTrailingUserPrompt({ + prompt: "next", + trigger: "manual", + leafMessage: { content: "previous" }, + }), + ).toEqual({ + prompt: "override: next", + merged: false, + removeLeaf: false, + }); + + restoreStrategy(); + restoreStrategy = undefined; + expect(resolveMessageMergeStrategy()).not.toBe(override); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/message-merge-strategy.ts b/src/agents/pi-embedded-runner/run/message-merge-strategy.ts new file mode 100644 index 00000000000..1d6dd7ebbb9 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/message-merge-strategy.ts @@ -0,0 +1,54 @@ +import { mergeOrphanedTrailingUserPrompt } from "./attempt.prompt-helpers.js"; +import type { EmbeddedRunAttemptParams } from "./types.js"; + +export type OrphanedTrailingUserPromptMergeParams = { + prompt: string; + trigger: EmbeddedRunAttemptParams["trigger"]; + leafMessage: { content?: unknown }; +}; + +export type OrphanedTrailingUserPromptMergeResult = { + prompt: string; + merged: boolean; + /** + * When false, the active session leaf is preserved. Use this only when the + * caller intentionally accepts that the next appended prompt may follow an + * existing user leaf; most providers reject consecutive user turns. 
+ */ + removeLeaf: boolean; +}; + +export type MessageMergeStrategyId = "orphan-trailing-user-prompt"; + +export type MessageMergeStrategy = { + id: MessageMergeStrategyId; + mergeOrphanedTrailingUserPrompt: ( + params: OrphanedTrailingUserPromptMergeParams, + ) => OrphanedTrailingUserPromptMergeResult; +}; + +export const DEFAULT_MESSAGE_MERGE_STRATEGY_ID: MessageMergeStrategyId = + "orphan-trailing-user-prompt"; + +const defaultMessageMergeStrategy: MessageMergeStrategy = { + id: DEFAULT_MESSAGE_MERGE_STRATEGY_ID, + mergeOrphanedTrailingUserPrompt, +}; + +let activeMessageMergeStrategy = defaultMessageMergeStrategy; + +export function resolveMessageMergeStrategy(): MessageMergeStrategy { + return activeMessageMergeStrategy; +} + +function registerMessageMergeStrategy(strategy: MessageMergeStrategy): () => void { + const previous = activeMessageMergeStrategy; + activeMessageMergeStrategy = strategy; + return () => { + activeMessageMergeStrategy = previous; + }; +} + +export function registerMessageMergeStrategyForTest(strategy: MessageMergeStrategy): () => void { + return registerMessageMergeStrategy(strategy); +} diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index b9a74cd1f02..43cd28eca78 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -75,6 +75,7 @@ export type EmbeddedRunAttemptResult = { sessionIdUsed: string; diagnosticTrace?: DiagnosticTraceContext; agentHarnessId?: string; + agentHarnessResultClassification?: "empty" | "reasoning-only" | "planning-only"; bootstrapPromptWarningSignaturesSeen?: string[]; bootstrapPromptWarningSignature?: string; systemPromptReport?: SessionSystemPromptReport; diff --git a/src/agents/pi-embedded-runner/types.ts b/src/agents/pi-embedded-runner/types.ts index 1b848357602..f9c1badaa36 100644 --- a/src/agents/pi-embedded-runner/types.ts +++ b/src/agents/pi-embedded-runner/types.ts @@ -106,6 +106,7 @@ export type 
EmbeddedPiRunMeta = { finalAssistantRawText?: string; replayInvalid?: boolean; livenessState?: EmbeddedRunLivenessState; + agentHarnessResultClassification?: "empty" | "reasoning-only" | "planning-only"; error?: { kind: | "context_overflow" diff --git a/src/agents/provider-api-families.test.ts b/src/agents/provider-api-families.test.ts new file mode 100644 index 00000000000..fa77e7f5391 --- /dev/null +++ b/src/agents/provider-api-families.test.ts @@ -0,0 +1,18 @@ +import { describe, expect, it } from "vitest"; +import { supportsGptParallelToolCallsPayload } from "./provider-api-families.js"; + +describe("provider api families", () => { + it.each([ + "openai-completions", + "openai-responses", + "openai-codex-responses", + "azure-openai-responses", + ])("classifies %s as supporting the GPT parallel_tool_calls payload patch", (api) => { + expect(supportsGptParallelToolCallsPayload(api)).toBe(true); + }); + + it("rejects unrelated APIs", () => { + expect(supportsGptParallelToolCallsPayload("anthropic-messages")).toBe(false); + expect(supportsGptParallelToolCallsPayload(undefined)).toBe(false); + }); +}); diff --git a/src/agents/provider-api-families.ts b/src/agents/provider-api-families.ts new file mode 100644 index 00000000000..6be3066056c --- /dev/null +++ b/src/agents/provider-api-families.ts @@ -0,0 +1,10 @@ +const GPT_PARALLEL_TOOL_CALLS_APIS = new Set([ + "openai-completions", + "openai-responses", + "openai-codex-responses", + "azure-openai-responses", +]); + +export function supportsGptParallelToolCallsPayload(api: unknown): boolean { + return typeof api === "string" && GPT_PARALLEL_TOOL_CALLS_APIS.has(api); +} diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index c24815e73db..ad62c65df89 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -13,6 +13,7 @@ const isRoutableChannelMock = vi.fn(); const runPreflightCompactionIfNeededMock = vi.fn(); 
const resolveCommandSecretRefsViaGatewayMock = vi.fn(); const resolveQueuedReplyExecutionConfigMock = vi.fn(); +const resolveProviderFollowupFallbackRouteMock = vi.fn(); let resolveQueuedReplyExecutionConfigActual: | (typeof import("./agent-runner-utils.js"))["resolveQueuedReplyExecutionConfig"] | undefined; @@ -281,6 +282,16 @@ async function loadFreshFollowupRunnerModuleForTest() { isRoutableChannel: (...args: unknown[]) => isRoutableChannelMock(...args), routeReply: (...args: unknown[]) => routeReplyMock(...args), })); + vi.doMock("../../plugins/provider-runtime.js", async () => { + const actual = await vi.importActual( + "../../plugins/provider-runtime.js", + ); + return { + ...actual, + resolveProviderFollowupFallbackRoute: (...args: unknown[]) => + resolveProviderFollowupFallbackRouteMock(...args), + }; + }); vi.doMock("./agent-runner-utils.js", async () => { const actual = await vi.importActual("./agent-runner-utils.js"); @@ -358,6 +369,8 @@ beforeEach(() => { runPreflightCompactionIfNeededMock.mockReset(); resolveCommandSecretRefsViaGatewayMock.mockReset(); resolveQueuedReplyExecutionConfigMock.mockReset(); + resolveProviderFollowupFallbackRouteMock.mockReset(); + resolveProviderFollowupFallbackRouteMock.mockReturnValue(undefined); const resolveQueuedReplyExecutionConfig = resolveQueuedReplyExecutionConfigActual; if (!resolveQueuedReplyExecutionConfig) { throw new Error("resolveQueuedReplyExecutionConfig mock not initialized"); @@ -1348,6 +1361,55 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "hello world!" })); }); + it("lets provider followup route hooks force dispatcher delivery", async () => { + resolveProviderFollowupFallbackRouteMock.mockReturnValue({ + route: "dispatcher", + reason: "operator-visible review copy", + }); + const { onBlockReply } = await runMessagingCase({ + agentResult: { payloads: [{ text: "hello world!" 
}] }, + queued: { + ...baseQueuedRun("webchat"), + originatingChannel: "discord", + originatingTo: "channel:C1", + } as FollowupRun, + }); + + expect(routeReplyMock).not.toHaveBeenCalled(); + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "hello world!" })); + expect(resolveProviderFollowupFallbackRouteMock).toHaveBeenCalledWith( + expect.objectContaining({ + provider: "anthropic", + context: expect.objectContaining({ + provider: "anthropic", + modelId: "claude", + originRoutable: true, + dispatcherAvailable: true, + payload: expect.objectContaining({ text: "hello world!" }), + }), + }), + ); + }); + + it("lets provider followup route hooks drop payloads explicitly", async () => { + resolveProviderFollowupFallbackRouteMock.mockReturnValue({ + route: "drop", + reason: "already delivered out of band", + }); + const { onBlockReply } = await runMessagingCase({ + agentResult: { payloads: [{ text: "hello world!" }] }, + queued: { + ...baseQueuedRun("webchat"), + originatingChannel: "discord", + originatingTo: "channel:C1", + } as FollowupRun, + }); + + expect(routeReplyMock).not.toHaveBeenCalled(); + expect(onBlockReply).not.toHaveBeenCalled(); + }); + it("falls back to dispatcher when same-channel origin routing fails", async () => { routeReplyMock.mockResolvedValueOnce({ ok: false, diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 903daa6724a..f84f64af383 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -16,6 +16,7 @@ import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; import { formatErrorMessage } from "../../infra/errors.js"; +import { resolveProviderFollowupFallbackRoute } from "../../plugins/provider-runtime.js"; import { defaultRuntime } from 
"../../runtime.js"; import { isInternalMessageChannel } from "../../utils/message-channel.js"; import { stripHeartbeatToken } from "../heartbeat.js"; @@ -74,7 +75,11 @@ export function createFollowupRunner(params: { * session's current dispatcher. This ensures replies go back to * where the message originated. */ - const sendFollowupPayloads = async (payloads: ReplyPayload[], queued: FollowupRun) => { + const sendFollowupPayloads = async ( + payloads: ReplyPayload[], + queued: FollowupRun, + resolvedRun: { provider: string; modelId: string }, + ) => { // Check if we should route to originating channel. const { originatingChannel, originatingTo } = queued; const runtimeConfig = resolveQueuedReplyRuntimeConfig(queued.run.config); @@ -99,10 +104,43 @@ export function createFollowupRunner(params: { ) { continue; } + const providerRoute = resolveProviderFollowupFallbackRoute({ + provider: resolvedRun.provider, + config: runtimeConfig, + workspaceDir: queued.run.workspaceDir, + context: { + config: runtimeConfig, + agentDir: queued.run.agentDir, + workspaceDir: queued.run.workspaceDir, + provider: resolvedRun.provider, + modelId: resolvedRun.modelId, + payload, + originatingChannel, + originatingTo, + originRoutable: Boolean(shouldRouteToOriginating), + dispatcherAvailable: Boolean(opts?.onBlockReply), + }, + }); + if (providerRoute?.route === "drop") { + logVerbose( + `followup queue: provider hook dropped payload route reason=${providerRoute.reason ?? "unspecified"}`, + ); + continue; + } + const deliveryRoute = + providerRoute?.route === "origin" && shouldRouteToOriginating + ? "origin" + : providerRoute?.route === "dispatcher" && opts?.onBlockReply + ? "dispatcher" + : shouldRouteToOriginating + ? "origin" + : opts?.onBlockReply + ? "dispatcher" + : undefined; await typingSignals.signalTextDelta(payload.text); // Route to originating channel if set, otherwise fall back to dispatcher. 
- if (shouldRouteToOriginating) { + if (deliveryRoute === "origin" && isRoutableChannel(originatingChannel) && originatingTo) { const result = await routeReply({ payload, channel: originatingChannel, @@ -145,7 +183,7 @@ export function createFollowupRunner(params: { routedAnyCrossChannelPayloadToOrigin = true; } } - } else if (opts?.onBlockReply) { + } else if (deliveryRoute === "dispatcher" && opts?.onBlockReply) { await opts.onBlockReply(payload); } } @@ -438,7 +476,10 @@ export function createFollowupRunner(params: { } } - await sendFollowupPayloads(finalPayloads, effectiveQueued); + await sendFollowupPayloads(finalPayloads, effectiveQueued, { + provider: providerUsed, + modelId: modelUsed, + }); } finally { replyOperation.complete(); // Both signals are required for the typing controller to clean up. diff --git a/src/plugin-sdk/agent-harness-runtime.ts b/src/plugin-sdk/agent-harness-runtime.ts index 438665287f1..75a28747a34 100644 --- a/src/plugin-sdk/agent-harness-runtime.ts +++ b/src/plugin-sdk/agent-harness-runtime.ts @@ -14,6 +14,7 @@ export type { AgentHarnessAttemptResult, AgentHarnessCompactParams, AgentHarnessCompactResult, + AgentHarnessResultClassification, AgentHarnessResetParams, AgentHarnessSupport, AgentHarnessSupportContext, diff --git a/src/plugins/hook-types.ts b/src/plugins/hook-types.ts index 280d6da360e..38ee412cac8 100644 --- a/src/plugins/hook-types.ts +++ b/src/plugins/hook-types.ts @@ -192,6 +192,14 @@ export type PluginHookLlmOutputEvent = { sessionId: string; provider: string; model: string; + /** + * Fully resolved provider/model ref used for the call. + * + * This intentionally keeps the provider prefix so operator tooling can + * distinguish e.g. openai-codex/gpt-5.4 from codex/gpt-5.4 even when display + * names collapse to just the model id. 
+ */ + resolvedRef?: string; assistantTexts: string[]; lastAssistant?: unknown; usage?: { diff --git a/src/plugins/provider-hook-runtime.ts b/src/plugins/provider-hook-runtime.ts index 3f8e6c1c531..6bfed15e245 100644 --- a/src/plugins/provider-hook-runtime.ts +++ b/src/plugins/provider-hook-runtime.ts @@ -6,7 +6,11 @@ import { resolvePluginCacheInputs } from "./roots.js"; import { getActivePluginRegistryWorkspaceDirFromState } from "./runtime-state.js"; import type { ProviderPlugin, + ProviderExtraParamsForTransportContext, ProviderPrepareExtraParamsContext, + ProviderResolveAuthProfileIdContext, + ProviderFollowupFallbackRouteContext, + ProviderFollowupFallbackRouteResult, ProviderWrapStreamFnContext, } from "./types.js"; @@ -182,6 +186,37 @@ export function prepareProviderExtraParams(params: { return resolveProviderRuntimePlugin(params)?.prepareExtraParams?.(params.context) ?? undefined; } +export function resolveProviderExtraParamsForTransport(params: { + provider: string; + config?: OpenClawConfig; + workspaceDir?: string; + env?: NodeJS.ProcessEnv; + context: ProviderExtraParamsForTransportContext; +}) { + return resolveProviderHookPlugin(params)?.extraParamsForTransport?.(params.context) ?? undefined; +} + +export function resolveProviderAuthProfileId(params: { + provider: string; + config?: OpenClawConfig; + workspaceDir?: string; + env?: NodeJS.ProcessEnv; + context: ProviderResolveAuthProfileIdContext; +}): string | undefined { + const resolved = resolveProviderHookPlugin(params)?.resolveAuthProfileId?.(params.context); + return typeof resolved === "string" && resolved.trim() ? 
resolved.trim() : undefined; +} + +export function resolveProviderFollowupFallbackRoute(params: { + provider: string; + config?: OpenClawConfig; + workspaceDir?: string; + env?: NodeJS.ProcessEnv; + context: ProviderFollowupFallbackRouteContext; +}): ProviderFollowupFallbackRouteResult | undefined { + return resolveProviderHookPlugin(params)?.followupFallbackRoute?.(params.context) ?? undefined; +} + export function wrapProviderStreamFn(params: { provider: string; config?: OpenClawConfig; diff --git a/src/plugins/provider-runtime.test.ts b/src/plugins/provider-runtime.test.ts index 39fe1230a9d..d515e2e8656 100644 --- a/src/plugins/provider-runtime.test.ts +++ b/src/plugins/provider-runtime.test.ts @@ -53,7 +53,10 @@ let applyProviderResolvedModelCompatWithPlugins: typeof import("./provider-runti let applyProviderResolvedTransportWithPlugin: typeof import("./provider-runtime.js").applyProviderResolvedTransportWithPlugin; let normalizeProviderTransportWithPlugin: typeof import("./provider-runtime.js").normalizeProviderTransportWithPlugin; let prepareProviderExtraParams: typeof import("./provider-runtime.js").prepareProviderExtraParams; +let resolveProviderAuthProfileId: typeof import("./provider-runtime.js").resolveProviderAuthProfileId; let resolveProviderConfigApiKeyWithPlugin: typeof import("./provider-runtime.js").resolveProviderConfigApiKeyWithPlugin; +let resolveProviderExtraParamsForTransport: typeof import("./provider-runtime.js").resolveProviderExtraParamsForTransport; +let resolveProviderFollowupFallbackRoute: typeof import("./provider-runtime.js").resolveProviderFollowupFallbackRoute; let resolveProviderStreamFn: typeof import("./provider-runtime.js").resolveProviderStreamFn; let resolveProviderCacheTtlEligibility: typeof import("./provider-runtime.js").resolveProviderCacheTtlEligibility; let resolveProviderBinaryThinking: typeof import("./provider-runtime.js").resolveProviderBinaryThinking; @@ -280,7 +283,10 @@ describe("provider-runtime", () => { 
normalizeProviderModelIdWithPlugin, normalizeProviderTransportWithPlugin, prepareProviderExtraParams, + resolveProviderAuthProfileId, resolveProviderConfigApiKeyWithPlugin, + resolveProviderExtraParamsForTransport, + resolveProviderFollowupFallbackRoute, resolveProviderStreamFn, resolveProviderCacheTtlEligibility, resolveProviderBinaryThinking, @@ -528,6 +534,84 @@ describe("provider-runtime", () => { }); }); + it("exposes provider-owned transport extra params", () => { + const extraParamsForTransport = vi.fn((_ctx) => ({ + patch: { + providerTransportPatch: true, + }, + })); + resolvePluginProvidersMock.mockReturnValue([ + { + id: DEMO_PROVIDER_ID, + label: "Demo", + auth: [], + extraParamsForTransport, + } satisfies ProviderPlugin, + ]); + + expect( + resolveProviderExtraParamsForTransport({ + provider: DEMO_PROVIDER_ID, + context: createDemoResolvedModelContext({ + extraParams: { transport: "websocket" }, + transport: "websocket" as const, + }), + }), + ).toEqual({ + patch: { + providerTransportPatch: true, + }, + }); + expect(extraParamsForTransport).toHaveBeenCalledWith( + expect.objectContaining({ + provider: DEMO_PROVIDER_ID, + modelId: MODEL.id, + model: MODEL, + transport: "websocket", + }), + ); + }); + + it("exposes provider-owned auth profile and fallback route seams", () => { + const resolveAuthProfileId = vi.fn(() => "profile-b"); + const followupFallbackRoute = vi.fn(() => ({ + route: "dispatcher" as const, + reason: "origin unavailable", + })); + resolvePluginProvidersMock.mockReturnValue([ + { + id: DEMO_PROVIDER_ID, + label: "Demo", + auth: [], + resolveAuthProfileId, + followupFallbackRoute, + } satisfies ProviderPlugin, + ]); + + expect( + resolveProviderAuthProfileId({ + provider: DEMO_PROVIDER_ID, + context: createDemoRuntimeContext({ + profileOrder: ["profile-a", "profile-b"], + authStore: { version: 1, profiles: {}, order: {} }, + }), + }), + ).toBe("profile-b"); + expect( + resolveProviderFollowupFallbackRoute({ + provider: 
DEMO_PROVIDER_ID, + context: createDemoRuntimeContext({ + payload: { text: "hello" }, + originRoutable: false, + dispatcherAvailable: true, + }), + }), + ).toEqual({ + route: "dispatcher", + reason: "origin unavailable", + }); + }); + it("applies the shared GPT-5 prompt overlay for any provider", () => { const contribution = resolveProviderSystemPromptContribution({ provider: "openrouter", @@ -567,6 +651,45 @@ describe("provider-runtime", () => { expect(contribution?.sectionOverrides).toEqual({}); }); + it("lets provider-owned prompt overlays compose after the built-in GPT-5 overlay", () => { + const resolvePromptOverlay = vi.fn((ctx) => ({ + stablePrefix: "provider overlay", + sectionOverrides: { + execution_bias: ctx.baseOverlay?.stablePrefix ? "saw built-in overlay" : "missing", + }, + })); + resolvePluginProvidersMock.mockReturnValue([ + { + id: "openrouter", + label: "OpenRouter", + auth: [], + resolvePromptOverlay, + } satisfies ProviderPlugin, + ]); + + const contribution = resolveProviderSystemPromptContribution({ + provider: "openrouter", + context: { + provider: "openrouter", + modelId: "openai/gpt-5.4", + promptMode: "full", + } as never, + }); + + expect(contribution?.stablePrefix).toContain(""); + expect(contribution?.stablePrefix).toContain("provider overlay"); + expect(contribution?.sectionOverrides?.execution_bias).toBe("saw built-in overlay"); + expect(resolvePromptOverlay).toHaveBeenCalledWith( + expect.objectContaining({ + provider: "openrouter", + modelId: "openai/gpt-5.4", + baseOverlay: expect.objectContaining({ + stablePrefix: expect.stringContaining(""), + }), + }), + ); + }); + it("ignores OpenAI plugin personality fallback for non-OpenAI GPT-5 providers", () => { const contribution = resolveProviderSystemPromptContribution({ provider: "openrouter", diff --git a/src/plugins/provider-runtime.ts b/src/plugins/provider-runtime.ts index a6fc03ca69a..1f601905d23 100644 --- a/src/plugins/provider-runtime.ts +++ b/src/plugins/provider-runtime.ts 
@@ -16,6 +16,9 @@ import { clearProviderRuntimeHookCache, prepareProviderExtraParams, resetProviderRuntimeHookCacheForTest, + resolveProviderAuthProfileId, + resolveProviderExtraParamsForTransport, + resolveProviderFollowupFallbackRoute, resolveProviderHookPlugin, resolveProviderPluginsForHooks, resolveProviderRuntimePlugin, @@ -88,6 +91,9 @@ function resetExternalAuthFallbackWarningCacheForTest(): void { export { clearProviderRuntimeHookCache, prepareProviderExtraParams, + resolveProviderAuthProfileId, + resolveProviderExtraParamsForTransport, + resolveProviderFollowupFallbackRoute, resetProviderRuntimeHookCacheForTest, resolveProviderRuntimePlugin, wrapProviderStreamFn, @@ -136,14 +142,20 @@ export function resolveProviderSystemPromptContribution(params: { env?: NodeJS.ProcessEnv; context: ProviderSystemPromptContributionContext; }): ProviderSystemPromptContribution | undefined { + const plugin = resolveProviderRuntimePlugin(params); + const baseOverlay = resolveGpt5SystemPromptContribution({ + config: params.context.config ?? params.config, + providerId: params.context.provider ?? params.provider, + modelId: params.context.modelId, + }); + const providerOverlay = + plugin?.resolvePromptOverlay?.({ + ...params.context, + baseOverlay, + }) ?? undefined; return mergeProviderSystemPromptContributions( - resolveGpt5SystemPromptContribution({ - config: params.context.config ?? params.config, - providerId: params.context.provider ?? params.provider, - modelId: params.context.modelId, - }), - resolveProviderRuntimePlugin(params)?.resolveSystemPromptContribution?.(params.context) ?? - undefined, + mergeProviderSystemPromptContributions(baseOverlay, providerOverlay), + plugin?.resolveSystemPromptContribution?.(params.context) ?? 
undefined, ); } diff --git a/src/plugins/types.ts b/src/plugins/types.ts index edbebf54499..f3ac6a0502f 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -600,6 +600,53 @@ export type ProviderPrepareExtraParamsContext = { thinkingLevel?: ThinkLevel; }; +export type ProviderExtraParamsForTransportContext = Omit< + ProviderPrepareExtraParamsContext, + "extraParams" +> & { + model?: ProviderRuntimeModel; + transport?: "sse" | "websocket" | "auto"; + extraParams: Record<string, unknown>; +}; + +export type ProviderExtraParamsForTransportResult = { + patch?: Record<string, unknown> | null; +}; + +export type ProviderResolvePromptOverlayContext = ProviderSystemPromptContributionContext & { + baseOverlay?: ProviderSystemPromptContribution; +}; + +export type ProviderFollowupFallbackRouteContext = { + config?: OpenClawConfig; + agentDir?: string; + workspaceDir?: string; + provider: string; + modelId: string; + payload: ReplyPayload; + originatingChannel?: string; + originatingTo?: string; + originRoutable: boolean; + dispatcherAvailable: boolean; +}; + +export type ProviderFollowupFallbackRouteResult = { + route?: "origin" | "dispatcher" | "drop"; + reason?: string; +}; + +export type ProviderResolveAuthProfileIdContext = { + config?: OpenClawConfig; + agentDir?: string; + workspaceDir?: string; + provider: string; + modelId: string; + preferredProfileId?: string; + lockedProfileId?: string; + profileOrder: string[]; + authStore: AuthProfileStore; +}; + export type ProviderReplaySanitizeMode = "full" | "images-only"; export type ProviderReplayToolCallIdMode = "strict" | "strict9"; @@ -1269,6 +1316,15 @@ export type ProviderPlugin = { prepareExtraParams?: ( ctx: ProviderPrepareExtraParamsContext, ) => Record<string, unknown> | null | undefined; + /** + * Provider-owned request params after transport/model resolution. + * + * Use this for transport-family request knobs that should be keyed by the + * resolved model API/transport rather than a hardcoded core allowlist.
+ */ + extraParamsForTransport?: ( + ctx: ProviderExtraParamsForTransportContext, + ) => ProviderExtraParamsForTransportResult | null | undefined; /** * Provider-owned transport factory. * @@ -1464,6 +1520,30 @@ export type ProviderPlugin = { resolveSystemPromptContribution?: ( ctx: ProviderSystemPromptContributionContext, ) => ProviderSystemPromptContribution | null | undefined; + /** + * Provider-owned GPT/model prompt overlay seam. + * + * Runs after OpenClaw's built-in overlay is resolved and before the + * provider's regular system-prompt contribution is merged. + */ + resolvePromptOverlay?: ( + ctx: ProviderResolvePromptOverlayContext, + ) => ProviderSystemPromptContribution | null | undefined; + /** + * Provider-owned fallback route override for model/profile failure handling. + * + * Return undefined/null to keep OpenClaw's default fallback policy. + */ + followupFallbackRoute?: ( + ctx: ProviderFollowupFallbackRouteContext, + ) => ProviderFollowupFallbackRouteResult | null | undefined; + /** + * Provider-owned auth profile resolver. + * + * Return a profile id from the supplied order to prefer it for this attempt; + * invalid or missing ids are ignored by core. + */ + resolveAuthProfileId?: (ctx: ProviderResolveAuthProfileIdContext) => string | null | undefined; /** * Provider-owned final system-prompt transform. *