mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 14:20:44 +00:00
feat: add Codex harness extension seams
Co-authored-by: Eva <100yenadmin@users.noreply.github.com>
This commit is contained in:
@@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Agents/tools: add optional per-call `timeoutMs` support for image, video, music, and TTS generation tools so agents can extend provider request timeouts only when a specific generation needs it.
|
||||
- Agents/subagents: add optional forked context for native `sessions_spawn` runs so agents can let a child inherit the requester transcript when needed, while keeping clean isolated sessions as the default; includes prompt guidance, context-engine hook metadata, docs, and QA coverage.
|
||||
- Codex harness: add structured debug logging for embedded harness selection decisions so `/status` stays simple while gateway logs explain auto-selection and Pi fallback reasons. (#70760) Thanks @100yenadmin.
|
||||
- Plugin SDK/Codex harness: add provider-owned transport/auth/follow-up seams and harness result classification so Codex-style runtimes can participate in fallback policy without core special-casing. (#70772) Thanks @100yenadmin.
|
||||
- Dependencies/Pi: update bundled Pi packages to `0.70.0`, use Pi's upstream `gpt-5.5` catalog metadata for OpenAI and OpenAI Codex, and keep only local `gpt-5.5-pro` forward-compat handling.
|
||||
- Models/CLI: speed up `openclaw models list --all --provider <id>` for bundled providers with safe static catalogs while keeping live and third-party providers on registry discovery. (#70632) Thanks @shakkernerd.
|
||||
- Models/CLI: avoid broad registry enumeration for default `openclaw models list`, reducing default listing latency while preserving configured-row output. (#70883) Thanks @shakkernerd.
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
c57d43f93ec2930b099dd5c5777f201f1bdd1ab432eeb4049b6e62ff23fe8112 plugin-sdk-api-baseline.json
|
||||
ece1ea689914c4070b587551e86c6bed6598feba90457ab489222e168b2d9298 plugin-sdk-api-baseline.jsonl
|
||||
8ca22ea6125fb198641c676d73b4df5a3bc49079be68bef8ed0718a54c1bb53a plugin-sdk-api-baseline.json
|
||||
197d9743128020062fc457228fa9139d0bd465d9e1775101bfc39137f4a10896 plugin-sdk-api-baseline.jsonl
|
||||
|
||||
@@ -69,6 +69,25 @@ Feature/channel plugin:
|
||||
- calls `api.runtime.*` or the matching `plugin-sdk/*-runtime` helper
|
||||
- never calls a vendor implementation directly
|
||||
|
||||
## Provider and Harness Seams
|
||||
|
||||
Use provider hooks when the behavior belongs to the model provider contract
|
||||
rather than the generic agent loop. Examples include provider-specific request
|
||||
params after transport selection, auth-profile preference, prompt overlays, and
|
||||
follow-up fallback routing after model/profile failover.
|
||||
|
||||
Use agent harness hooks when the behavior belongs to the runtime that is
|
||||
executing a turn. Harnesses can classify successful-but-unusable attempt results
|
||||
such as empty, reasoning-only, or planning-only responses so the outer model
|
||||
fallback policy can make the retry decision.
|
||||
|
||||
Keep both seams narrow:
|
||||
|
||||
- core owns the retry/fallback policy
|
||||
- provider plugins own provider-specific request/auth/routing hints
|
||||
- harness plugins own runtime-specific attempt classification
|
||||
- third-party plugins return hints, not direct mutations of core state
|
||||
|
||||
## File checklist
|
||||
|
||||
For a new capability, expect to touch these areas:
|
||||
|
||||
@@ -9,8 +9,8 @@ sidebarTitle: "Install and Configure"
|
||||
---
|
||||
|
||||
Plugins extend OpenClaw with new capabilities: channels, model providers,
|
||||
tools, skills, speech, realtime transcription, realtime voice,
|
||||
media-understanding, image generation, video generation, web fetch, web
|
||||
agent harnesses, tools, skills, speech, realtime transcription, realtime
|
||||
voice, media-understanding, image generation, video generation, web fetch, web
|
||||
search, and more. Some plugins are **core** (shipped with OpenClaw), others
|
||||
are **external** (published on npm by the community).
|
||||
|
||||
|
||||
@@ -448,6 +448,7 @@ export async function runCodexAppServerAttempt(
|
||||
sessionId: params.sessionId,
|
||||
provider: params.provider,
|
||||
model: params.modelId,
|
||||
resolvedRef: `${params.provider}/${params.modelId}`,
|
||||
assistantTexts: [],
|
||||
},
|
||||
ctx: hookContext,
|
||||
@@ -602,6 +603,7 @@ export async function runCodexAppServerAttempt(
|
||||
sessionId: params.sessionId,
|
||||
provider: params.provider,
|
||||
model: params.modelId,
|
||||
resolvedRef: `${params.provider}/${params.modelId}`,
|
||||
assistantTexts: result.assistantTexts,
|
||||
...(result.lastAssistant ? { lastAssistant: result.lastAssistant } : {}),
|
||||
...(result.attemptUsage ? { usage: result.attemptUsage } : {}),
|
||||
|
||||
@@ -240,7 +240,6 @@ describe("createTelegramBot", () => {
|
||||
it("lets /status bypass a busy Telegram topic lane", async () => {
|
||||
installPerKeySequentializer();
|
||||
loadConfig.mockReturnValue({
|
||||
commands: { native: true },
|
||||
channels: {
|
||||
telegram: {
|
||||
dmPolicy: "open",
|
||||
|
||||
@@ -199,6 +199,7 @@ export async function runPreparedCliAgent(
|
||||
sessionId: params.sessionId,
|
||||
provider: params.provider,
|
||||
model: context.modelId,
|
||||
resolvedRef: `${params.provider}/${context.modelId}`,
|
||||
assistantTexts,
|
||||
...(lastAssistant ? { lastAssistant } : {}),
|
||||
...(output.usage ? { usage: output.usage } : {}),
|
||||
|
||||
17
src/agents/embedded-runner.ts
Normal file
17
src/agents/embedded-runner.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
export {
|
||||
abortEmbeddedAgentRun,
|
||||
compactEmbeddedAgentSession,
|
||||
isEmbeddedAgentRunActive,
|
||||
isEmbeddedAgentRunStreaming,
|
||||
queueEmbeddedAgentMessage,
|
||||
resolveActiveEmbeddedAgentRunSessionId,
|
||||
resolveEmbeddedSessionLane,
|
||||
runEmbeddedAgent,
|
||||
waitForEmbeddedAgentRunEnd,
|
||||
} from "./pi-embedded-runner.js";
|
||||
export type {
|
||||
EmbeddedAgentCompactResult,
|
||||
EmbeddedAgentMeta,
|
||||
EmbeddedAgentRunMeta,
|
||||
EmbeddedAgentRunResult,
|
||||
} from "./pi-embedded-runner.js";
|
||||
@@ -138,6 +138,34 @@ describe("runAgentHarnessAttemptWithFallback", () => {
|
||||
expect(piRunAttempt).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("annotates non-ok harness result classifications for outer model fallback", async () => {
|
||||
process.env.OPENCLAW_AGENT_RUNTIME = "auto";
|
||||
const classify = vi.fn(() => "empty" as const);
|
||||
registerAgentHarness(
|
||||
{
|
||||
id: "codex",
|
||||
label: "Classifying Codex",
|
||||
supports: (ctx) =>
|
||||
ctx.provider === "codex" ? { supported: true, priority: 100 } : { supported: false },
|
||||
runAttempt: vi.fn(async () => createAttemptResult("codex")),
|
||||
classify,
|
||||
},
|
||||
{ ownerPluginId: "codex" },
|
||||
);
|
||||
|
||||
const params = createAttemptParams();
|
||||
const result = await runAgentHarnessAttemptWithFallback(params);
|
||||
|
||||
expect(classify).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ sessionIdUsed: "codex" }),
|
||||
params,
|
||||
);
|
||||
expect(result).toMatchObject({
|
||||
agentHarnessId: "codex",
|
||||
agentHarnessResultClassification: "empty",
|
||||
});
|
||||
});
|
||||
|
||||
it("honors env fallback override over config fallback", async () => {
|
||||
process.env.OPENCLAW_AGENT_RUNTIME = "auto";
|
||||
process.env.OPENCLAW_AGENT_HARNESS_FALLBACK = "none";
|
||||
|
||||
@@ -189,12 +189,12 @@ export async function runAgentHarnessAttemptWithFallback(
|
||||
});
|
||||
if (harness.id === "pi") {
|
||||
const result = await harness.runAttempt(params);
|
||||
return { ...result, agentHarnessId: harness.id };
|
||||
return applyHarnessResultClassification(harness, result, params);
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await harness.runAttempt(params);
|
||||
return { ...result, agentHarnessId: harness.id };
|
||||
return applyHarnessResultClassification(harness, result, params);
|
||||
} catch (error) {
|
||||
log.warn(`${harness.label} failed; not falling back to embedded PI backend`, {
|
||||
harnessId: harness.id,
|
||||
@@ -263,6 +263,22 @@ function logAgentHarnessSelection(
|
||||
});
|
||||
}
|
||||
|
||||
function applyHarnessResultClassification(
|
||||
harness: AgentHarness,
|
||||
result: EmbeddedRunAttemptResult,
|
||||
params: EmbeddedRunAttemptParams,
|
||||
): EmbeddedRunAttemptResult {
|
||||
const classification = harness.classify?.(result, params);
|
||||
if (!classification || classification === "ok") {
|
||||
return { ...result, agentHarnessId: harness.id };
|
||||
}
|
||||
return {
|
||||
...result,
|
||||
agentHarnessId: harness.id,
|
||||
agentHarnessResultClassification: classification,
|
||||
};
|
||||
}
|
||||
|
||||
function resolvePinnedAgentHarnessPolicy(
|
||||
agentHarnessId: string | undefined,
|
||||
): AgentHarnessPolicy | undefined {
|
||||
|
||||
@@ -27,12 +27,20 @@ export type AgentHarnessResetParams = {
|
||||
reason?: "new" | "reset" | "idle" | "daily" | "compaction" | "deleted" | "unknown";
|
||||
};
|
||||
|
||||
export type AgentHarnessResultClassification =
|
||||
| "ok"
|
||||
| NonNullable<EmbeddedRunAttemptResult["agentHarnessResultClassification"]>;
|
||||
|
||||
export type AgentHarness = {
|
||||
id: string;
|
||||
label: string;
|
||||
pluginId?: string;
|
||||
supports(ctx: AgentHarnessSupportContext): AgentHarnessSupport;
|
||||
runAttempt(params: AgentHarnessAttemptParams): Promise<AgentHarnessAttemptResult>;
|
||||
classify?(
|
||||
result: AgentHarnessAttemptResult,
|
||||
ctx: AgentHarnessAttemptParams,
|
||||
): AgentHarnessResultClassification | undefined;
|
||||
compact?(params: AgentHarnessCompactParams): Promise<AgentHarnessCompactResult | undefined>;
|
||||
reset?(params: AgentHarnessResetParams): Promise<void> | void;
|
||||
dispose?(): Promise<void> | void;
|
||||
|
||||
@@ -561,6 +561,46 @@ describe("runWithModelFallback", () => {
|
||||
).toBeNull();
|
||||
});
|
||||
|
||||
it("uses harness-owned terminal classification for GPT-5 fallback", () => {
|
||||
const runResult: EmbeddedPiRunResult = {
|
||||
payloads: [],
|
||||
meta: {
|
||||
durationMs: 1,
|
||||
agentHarnessResultClassification: "planning-only",
|
||||
},
|
||||
};
|
||||
|
||||
expect(
|
||||
classifyEmbeddedPiRunResultForModelFallback({
|
||||
provider: "codex",
|
||||
model: "gpt-5.4",
|
||||
result: runResult,
|
||||
}),
|
||||
).toMatchObject({
|
||||
code: "planning_only_result",
|
||||
reason: "format",
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps aborted harness-classified GPT-5 runs out of fallback", () => {
|
||||
const runResult: EmbeddedPiRunResult = {
|
||||
payloads: [],
|
||||
meta: {
|
||||
durationMs: 1,
|
||||
aborted: true,
|
||||
agentHarnessResultClassification: "empty",
|
||||
},
|
||||
};
|
||||
|
||||
expect(
|
||||
classifyEmbeddedPiRunResultForModelFallback({
|
||||
provider: "codex",
|
||||
model: "gpt-5.4",
|
||||
result: runResult,
|
||||
}),
|
||||
).toBeNull();
|
||||
});
|
||||
|
||||
it("passes original unknown errors to onError during fallback", async () => {
|
||||
const cfg = makeCfg();
|
||||
const unknownError = new Error("provider misbehaved");
|
||||
|
||||
@@ -47,6 +47,7 @@ const { MockManager } = vi.hoisted(() => {
|
||||
|
||||
sentEvents: unknown[] = [];
|
||||
connectCallCount = 0;
|
||||
connectApiKeys: string[] = [];
|
||||
closeCallCount = 0;
|
||||
options: unknown;
|
||||
|
||||
@@ -69,6 +70,7 @@ const { MockManager } = vi.hoisted(() => {
|
||||
|
||||
async connect(_apiKey: string): Promise<void> {
|
||||
this.connectCallCount++;
|
||||
this.connectApiKeys.push(_apiKey);
|
||||
if (this.connectShouldFail || _globalConnectShouldFail) {
|
||||
throw new Error("Mock connect failure");
|
||||
}
|
||||
@@ -3628,6 +3630,61 @@ describe("releaseWsSession / hasWsSession", () => {
|
||||
expect(manager.closeCallCount).toBe(1);
|
||||
});
|
||||
|
||||
it("pools cleanly released sessions behind the explicit pool flag", async () => {
|
||||
const streamFn = createOpenAIWebSocketStreamFn("sk-test", "registry-test");
|
||||
const stream = streamFn(
|
||||
{
|
||||
api: "openai-responses",
|
||||
provider: "openai",
|
||||
id: "gpt-5.4",
|
||||
contextWindow: 128000,
|
||||
maxTokens: 4096,
|
||||
reasoning: false,
|
||||
input: ["text"],
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
|
||||
name: "GPT-5.4",
|
||||
} as Parameters<typeof streamFn>[0],
|
||||
{
|
||||
systemPrompt: "test",
|
||||
messages: [userMsg("Hi") as Parameters<typeof convertMessagesToInputItems>[0][number]],
|
||||
tools: [],
|
||||
} as Parameters<typeof streamFn>[1],
|
||||
);
|
||||
|
||||
await new Promise((r) => setImmediate(r));
|
||||
const manager = MockManager.lastInstance!;
|
||||
manager.simulateEvent({
|
||||
type: "response.completed",
|
||||
response: makeResponseObject("resp-pooled", "done"),
|
||||
});
|
||||
for await (const _ of await resolveStream(stream)) {
|
||||
// consume
|
||||
}
|
||||
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
releaseWsSession("registry-test", {
|
||||
allowPool: true,
|
||||
env: {
|
||||
OPENCLAW_OPENAI_WS_POOL: "1",
|
||||
OPENCLAW_OPENAI_WS_SESSION_POOL_IDLE_MS: "1000",
|
||||
} as NodeJS.ProcessEnv,
|
||||
});
|
||||
|
||||
expect(hasWsSession("registry-test")).toBe(true);
|
||||
expect(manager.closeCallCount).toBe(0);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(999);
|
||||
expect(hasWsSession("registry-test")).toBe(true);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(hasWsSession("registry-test")).toBe(false);
|
||||
expect(manager.closeCallCount).toBe(1);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("releaseWsSession is a no-op for unknown sessions", () => {
|
||||
expect(() => releaseWsSession("nonexistent-session")).not.toThrow();
|
||||
});
|
||||
@@ -3712,6 +3769,75 @@ describe("releaseWsSession / hasWsSession", () => {
|
||||
// consume
|
||||
}
|
||||
});
|
||||
|
||||
it("recreates the cached manager when the API key changes for the same session", async () => {
|
||||
const sessionId = "registry-test";
|
||||
const firstStreamFn = createOpenAIWebSocketStreamFn("sk-first", sessionId);
|
||||
const firstStream = firstStreamFn(
|
||||
{
|
||||
api: "openai-responses",
|
||||
provider: "openai",
|
||||
id: "gpt-5.4",
|
||||
contextWindow: 128000,
|
||||
maxTokens: 4096,
|
||||
reasoning: false,
|
||||
input: ["text"],
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
|
||||
name: "GPT-5.4",
|
||||
} as Parameters<typeof firstStreamFn>[0],
|
||||
{
|
||||
systemPrompt: "test",
|
||||
messages: [userMsg("Hi") as Parameters<typeof convertMessagesToInputItems>[0][number]],
|
||||
tools: [],
|
||||
} as Parameters<typeof firstStreamFn>[1],
|
||||
);
|
||||
|
||||
await new Promise((r) => setImmediate(r));
|
||||
const firstManager = MockManager.lastInstance!;
|
||||
expect(firstManager.connectApiKeys).toEqual(["sk-first"]);
|
||||
firstManager.simulateEvent({
|
||||
type: "response.completed",
|
||||
response: makeResponseObject("resp-first-key", "done"),
|
||||
});
|
||||
for await (const _ of await resolveStream(firstStream)) {
|
||||
// consume
|
||||
}
|
||||
|
||||
const secondStreamFn = createOpenAIWebSocketStreamFn("sk-second", sessionId);
|
||||
const secondStream = secondStreamFn(
|
||||
{
|
||||
api: "openai-responses",
|
||||
provider: "openai",
|
||||
id: "gpt-5.4",
|
||||
contextWindow: 128000,
|
||||
maxTokens: 4096,
|
||||
reasoning: false,
|
||||
input: ["text"],
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
|
||||
name: "GPT-5.4",
|
||||
} as Parameters<typeof secondStreamFn>[0],
|
||||
{
|
||||
systemPrompt: "test",
|
||||
messages: [userMsg("Again") as Parameters<typeof convertMessagesToInputItems>[0][number]],
|
||||
tools: [],
|
||||
} as Parameters<typeof secondStreamFn>[1],
|
||||
);
|
||||
|
||||
await new Promise((r) => setImmediate(r));
|
||||
expect(MockManager.instances).toHaveLength(2);
|
||||
expect(firstManager.closeCallCount).toBe(1);
|
||||
const secondManager = MockManager.lastInstance!;
|
||||
expect(secondManager).not.toBe(firstManager);
|
||||
expect(secondManager.connectApiKeys).toEqual(["sk-second"]);
|
||||
|
||||
secondManager.simulateEvent({
|
||||
type: "response.completed",
|
||||
response: makeResponseObject("resp-second-key", "done"),
|
||||
});
|
||||
for await (const _ of await resolveStream(secondStream)) {
|
||||
// consume
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("convertMessagesToInputItems — phase inheritance", () => {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { createHash, randomUUID } from "node:crypto";
|
||||
import type { StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import type {
|
||||
AssistantMessage,
|
||||
@@ -73,6 +73,7 @@ import { mergeTransportMetadata } from "./transport-stream-shared.js";
|
||||
interface WsSession {
|
||||
manager: OpenAIWebSocketManager;
|
||||
managerConfigSignature: string;
|
||||
authSignature: string;
|
||||
/** Number of messages that were in context.messages at the END of the last streamFn call. */
|
||||
lastContextLength: number;
|
||||
/** True if the connection has been established at least once. */
|
||||
@@ -81,6 +82,9 @@ interface WsSession {
|
||||
warmUpAttempted: boolean;
|
||||
/** True if the session is permanently broken (no more reconnect). */
|
||||
broken: boolean;
|
||||
/** Pending idle release timer when disabled-by-default pooling retains a session. */
|
||||
idleTimer?: ReturnType<typeof setTimeout>;
|
||||
pooledUntil?: number;
|
||||
/** Session-scoped cool-down after repeated websocket failures. */
|
||||
degradedUntil: number | null;
|
||||
degradeCooldownMs: number;
|
||||
@@ -201,20 +205,72 @@ function createEventStream(): AssistantMessageEventStream {
|
||||
// Public registry helpers
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
type ReleaseWsSessionOptions = {
|
||||
allowPool?: boolean;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
};
|
||||
|
||||
function resolveWsSessionPoolConfig(env: NodeJS.ProcessEnv = process.env): {
|
||||
enabled: boolean;
|
||||
idleMs: number;
|
||||
} {
|
||||
const enabled =
|
||||
env.OPENCLAW_OPENAI_WS_POOL === "1" || env.OPENCLAW_OPENAI_WS_SESSION_POOL === "1";
|
||||
const rawIdleMs = Number(env.OPENCLAW_OPENAI_WS_SESSION_POOL_IDLE_MS);
|
||||
const idleMs = Number.isFinite(rawIdleMs)
|
||||
? Math.min(300_000, Math.max(1_000, Math.trunc(rawIdleMs)))
|
||||
: 30_000;
|
||||
return { enabled, idleMs };
|
||||
}
|
||||
|
||||
function clearWsSessionIdleTimer(session: WsSession): void {
|
||||
if (!session.idleTimer) {
|
||||
return;
|
||||
}
|
||||
clearTimeout(session.idleTimer);
|
||||
session.idleTimer = undefined;
|
||||
session.pooledUntil = undefined;
|
||||
}
|
||||
|
||||
function closeWsSession(sessionId: string, session: WsSession): void {
|
||||
clearWsSessionIdleTimer(session);
|
||||
try {
|
||||
session.manager.close();
|
||||
} catch {
|
||||
// Ignore close errors — connection may already be gone.
|
||||
}
|
||||
wsRegistry.delete(sessionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Release and close the WebSocket session for the given sessionId.
|
||||
* Call this after the agent run completes to free the connection.
|
||||
*/
|
||||
export function releaseWsSession(sessionId: string): void {
|
||||
export function releaseWsSession(sessionId: string, options: ReleaseWsSessionOptions = {}): void {
|
||||
const session = wsRegistry.get(sessionId);
|
||||
if (session) {
|
||||
try {
|
||||
session.manager.close();
|
||||
} catch {
|
||||
// Ignore close errors — connection may already be gone.
|
||||
}
|
||||
wsRegistry.delete(sessionId);
|
||||
if (!session) {
|
||||
return;
|
||||
}
|
||||
const pool = resolveWsSessionPoolConfig(options.env);
|
||||
if (
|
||||
options.allowPool === true &&
|
||||
pool.enabled &&
|
||||
!session.broken &&
|
||||
session.manager.isConnected()
|
||||
) {
|
||||
clearWsSessionIdleTimer(session);
|
||||
session.pooledUntil = Date.now() + pool.idleMs;
|
||||
session.idleTimer = setTimeout(() => {
|
||||
const current = wsRegistry.get(sessionId);
|
||||
if (current === session) {
|
||||
closeWsSession(sessionId, session);
|
||||
}
|
||||
}, pool.idleMs);
|
||||
session.idleTimer.unref?.();
|
||||
log.debug(`[ws-stream] pooled websocket session=${sessionId} idleMs=${pool.idleMs}`);
|
||||
return;
|
||||
}
|
||||
closeWsSession(sessionId, session);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -292,6 +348,7 @@ function resetWsSession(params: {
|
||||
createManager: () => OpenAIWebSocketManager;
|
||||
preserveDegradeUntil?: boolean;
|
||||
}): void {
|
||||
clearWsSessionIdleTimer(params.session);
|
||||
try {
|
||||
params.session.manager.close();
|
||||
} catch {
|
||||
@@ -362,6 +419,10 @@ function resolveWsManagerConfigSignature(
|
||||
});
|
||||
}
|
||||
|
||||
function resolveWsAuthSignature(apiKey: string): string {
|
||||
return createHash("sha256").update(apiKey).digest("hex");
|
||||
}
|
||||
|
||||
const AZURE_OPENAI_PROVIDER_IDS = new Set(["azure-openai", "azure-openai-responses"]);
|
||||
const OPENAI_CODEX_PROVIDER_ID = "openai-codex";
|
||||
|
||||
@@ -655,6 +716,7 @@ export function createOpenAIWebSocketStreamFn(
|
||||
|
||||
while (true) {
|
||||
let session = wsRegistry.get(sessionId);
|
||||
const authSignature = resolveWsAuthSignature(apiKey);
|
||||
const managerConfigSignature = resolveWsManagerConfigSignature(
|
||||
opts.managerOptions,
|
||||
sessionHeaders,
|
||||
@@ -664,6 +726,7 @@ export function createOpenAIWebSocketStreamFn(
|
||||
session = {
|
||||
manager,
|
||||
managerConfigSignature,
|
||||
authSignature,
|
||||
lastContextLength: 0,
|
||||
everConnected: false,
|
||||
warmUpAttempted: false,
|
||||
@@ -672,13 +735,20 @@ export function createOpenAIWebSocketStreamFn(
|
||||
degradeCooldownMs: wsSessionPolicy.degradeCooldownMs,
|
||||
};
|
||||
wsRegistry.set(sessionId, session);
|
||||
} else if (session.managerConfigSignature !== managerConfigSignature) {
|
||||
} else if (
|
||||
session.managerConfigSignature !== managerConfigSignature ||
|
||||
session.authSignature !== authSignature
|
||||
) {
|
||||
clearWsSessionIdleTimer(session);
|
||||
resetWsSession({
|
||||
session,
|
||||
createManager: () => createWsManager(opts.managerOptions, sessionHeaders),
|
||||
});
|
||||
session.managerConfigSignature = managerConfigSignature;
|
||||
session.authSignature = authSignature;
|
||||
session.degradeCooldownMs = wsSessionPolicy.degradeCooldownMs;
|
||||
} else {
|
||||
clearWsSessionIdleTimer(session);
|
||||
}
|
||||
|
||||
if (transport !== "websocket" && isWsSessionDegraded(session)) {
|
||||
|
||||
@@ -8,6 +8,7 @@ vi.mock("../plugins/provider-hook-runtime.js", () => ({
|
||||
buildHookProviderCacheKey: () => "test-provider-hook-cache-key",
|
||||
},
|
||||
prepareProviderExtraParams: () => undefined,
|
||||
resolveProviderExtraParamsForTransport: () => undefined,
|
||||
resetProviderRuntimeHookCacheForTest: () => {},
|
||||
wrapProviderStreamFn: (params: { context: { streamFn?: StreamFn } }) => params.context.streamFn,
|
||||
}));
|
||||
@@ -282,6 +283,7 @@ import {
|
||||
import {
|
||||
applyExtraParamsToAgent,
|
||||
resolveAgentTransportOverride,
|
||||
resolveExplicitSettingsTransport,
|
||||
resolvePreparedExtraParams,
|
||||
} from "./pi-embedded-runner/extra-params.js";
|
||||
import { createGoogleThinkingPayloadWrapper } from "./pi-embedded-runner/google-stream-wrappers.js";
|
||||
@@ -1909,6 +1911,118 @@ describe("applyExtraParamsToAgent", () => {
|
||||
expect(effectiveExtraParams.transport).toBe("auto");
|
||||
});
|
||||
|
||||
it("composes transport extra-param hooks after provider preparation", () => {
|
||||
const resolveProviderExtraParamsForTransport = vi.fn((_params) => ({
|
||||
patch: {
|
||||
hookApplied: true,
|
||||
},
|
||||
}));
|
||||
extraParamsTesting.setProviderRuntimeDepsForTest({
|
||||
prepareProviderExtraParams: (params) => ({
|
||||
...params.context.extraParams,
|
||||
transport: "websocket",
|
||||
}),
|
||||
resolveProviderExtraParamsForTransport,
|
||||
wrapProviderStreamFn: (params) => params.context.streamFn,
|
||||
});
|
||||
|
||||
const model = {
|
||||
api: "openai-responses",
|
||||
provider: "openai",
|
||||
id: "gpt-5",
|
||||
} as Model<"openai-responses">;
|
||||
const effectiveExtraParams = resolvePreparedExtraParams({
|
||||
cfg: undefined,
|
||||
provider: "openai",
|
||||
modelId: "gpt-5",
|
||||
agentDir: "/tmp/agent",
|
||||
workspaceDir: "/tmp/workspace",
|
||||
model,
|
||||
});
|
||||
|
||||
expect(effectiveExtraParams).toMatchObject({
|
||||
transport: "websocket",
|
||||
hookApplied: true,
|
||||
});
|
||||
expect(resolveProviderExtraParamsForTransport).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
provider: "openai",
|
||||
context: expect.objectContaining({
|
||||
model,
|
||||
transport: "websocket",
|
||||
agentDir: "/tmp/agent",
|
||||
workspaceDir: "/tmp/workspace",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("passes explicit settings transport to transport extra-param hooks", () => {
|
||||
const resolveProviderExtraParamsForTransport = vi.fn((_params) => ({
|
||||
patch: {
|
||||
hookApplied: true,
|
||||
},
|
||||
}));
|
||||
extraParamsTesting.setProviderRuntimeDepsForTest({
|
||||
prepareProviderExtraParams: (params) => ({
|
||||
...params.context.extraParams,
|
||||
transport: "auto",
|
||||
}),
|
||||
resolveProviderExtraParamsForTransport,
|
||||
wrapProviderStreamFn: (params) => params.context.streamFn,
|
||||
});
|
||||
|
||||
const resolvedTransport = resolveExplicitSettingsTransport({
|
||||
settingsManager: {
|
||||
getGlobalSettings: () => ({ transport: "websocket" }),
|
||||
getProjectSettings: () => ({}),
|
||||
},
|
||||
sessionTransport: "websocket",
|
||||
});
|
||||
const effectiveExtraParams = resolvePreparedExtraParams({
|
||||
cfg: undefined,
|
||||
provider: "openai",
|
||||
modelId: "gpt-5",
|
||||
resolvedTransport,
|
||||
});
|
||||
|
||||
expect(effectiveExtraParams).toMatchObject({
|
||||
transport: "auto",
|
||||
hookApplied: true,
|
||||
});
|
||||
expect(resolveProviderExtraParamsForTransport).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
context: expect.objectContaining({
|
||||
transport: "websocket",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("applies transport hook parallel_tool_calls patches to request payloads", () => {
|
||||
extraParamsTesting.setProviderRuntimeDepsForTest({
|
||||
prepareProviderExtraParams: () => undefined,
|
||||
resolveProviderExtraParamsForTransport: () => ({
|
||||
patch: {
|
||||
parallel_tool_calls: true,
|
||||
},
|
||||
}),
|
||||
wrapProviderStreamFn: (params) => params.context.streamFn,
|
||||
});
|
||||
const payload = runResponsesPayloadMutationCase({
|
||||
applyProvider: "test-openai",
|
||||
applyModelId: "gpt-compatible",
|
||||
model: {
|
||||
api: "openai-responses",
|
||||
provider: "test-openai",
|
||||
id: "gpt-compatible",
|
||||
} as Model<"openai-responses">,
|
||||
payload: {},
|
||||
});
|
||||
|
||||
expect(payload.parallel_tool_calls).toBe(true);
|
||||
});
|
||||
|
||||
it("uses prepared transport when session settings did not explicitly set one", () => {
|
||||
const effectiveExtraParams = resolvePreparedExtraParams({
|
||||
cfg: undefined,
|
||||
@@ -1945,6 +2059,27 @@ describe("applyExtraParamsToAgent", () => {
|
||||
).toBeUndefined();
|
||||
});
|
||||
|
||||
it("resolves explicit settings transport from the active session transport", () => {
|
||||
expect(
|
||||
resolveExplicitSettingsTransport({
|
||||
settingsManager: {
|
||||
getGlobalSettings: () => ({}),
|
||||
getProjectSettings: () => ({}),
|
||||
},
|
||||
sessionTransport: "websocket",
|
||||
}),
|
||||
).toBeUndefined();
|
||||
expect(
|
||||
resolveExplicitSettingsTransport({
|
||||
settingsManager: {
|
||||
getGlobalSettings: () => ({ transport: "sse" }),
|
||||
getProjectSettings: () => ({}),
|
||||
},
|
||||
sessionTransport: "websocket",
|
||||
}),
|
||||
).toBe("websocket");
|
||||
});
|
||||
|
||||
it("strips prototype pollution keys from extra params overrides", () => {
|
||||
const effectiveExtraParams = resolvePreparedExtraParams({
|
||||
cfg: undefined,
|
||||
|
||||
@@ -1065,6 +1065,7 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
name: "FailoverError",
|
||||
profileId: "openai:p1",
|
||||
reason: "rate_limit",
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
|
||||
19
src/agents/pi-embedded-runner/aliases.test.ts
Normal file
19
src/agents/pi-embedded-runner/aliases.test.ts
Normal file
@@ -0,0 +1,19 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { runEmbeddedAgent as runEmbeddedAgentFromNeutralBarrel } from "../embedded-runner.js";
|
||||
import {
|
||||
abortEmbeddedAgentRun,
|
||||
abortEmbeddedPiRun,
|
||||
compactEmbeddedAgentSession,
|
||||
compactEmbeddedPiSession,
|
||||
runEmbeddedAgent,
|
||||
runEmbeddedPiAgent,
|
||||
} from "../pi-embedded-runner.js";
|
||||
|
||||
describe("embedded runner compatibility aliases", () => {
|
||||
it("keeps neutral embedded-agent aliases bound to the PI compatibility exports", () => {
|
||||
expect(runEmbeddedAgent).toBe(runEmbeddedPiAgent);
|
||||
expect(runEmbeddedAgentFromNeutralBarrel).toBe(runEmbeddedPiAgent);
|
||||
expect(compactEmbeddedAgentSession).toBe(compactEmbeddedPiSession);
|
||||
expect(abortEmbeddedAgentRun).toBe(abortEmbeddedPiRun);
|
||||
});
|
||||
});
|
||||
@@ -6,9 +6,11 @@ import type { ThinkLevel } from "../../auto-reply/thinking.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
prepareProviderExtraParams as prepareProviderExtraParamsRuntime,
|
||||
resolveProviderExtraParamsForTransport as resolveProviderExtraParamsForTransportRuntime,
|
||||
wrapProviderStreamFn as wrapProviderStreamFnRuntime,
|
||||
} from "../../plugins/provider-hook-runtime.js";
|
||||
import type { ProviderRuntimeModel } from "../../plugins/provider-runtime-model.types.js";
|
||||
import { supportsGptParallelToolCallsPayload } from "../provider-api-families.js";
|
||||
import { createGoogleThinkingPayloadWrapper } from "./google-stream-wrappers.js";
|
||||
import { log } from "./logger.js";
|
||||
import { createMinimaxThinkingDisabledWrapper } from "./minimax-stream-wrappers.js";
|
||||
@@ -26,6 +28,7 @@ import { streamWithPayloadPatch } from "./stream-payload-utils.js";
|
||||
|
||||
const defaultProviderRuntimeDeps = {
|
||||
prepareProviderExtraParams: prepareProviderExtraParamsRuntime,
|
||||
resolveProviderExtraParamsForTransport: resolveProviderExtraParamsForTransportRuntime,
|
||||
wrapProviderStreamFn: wrapProviderStreamFnRuntime,
|
||||
};
|
||||
|
||||
@@ -39,12 +42,17 @@ export const __testing = {
|
||||
): void {
|
||||
providerRuntimeDeps.prepareProviderExtraParams =
|
||||
deps?.prepareProviderExtraParams ?? defaultProviderRuntimeDeps.prepareProviderExtraParams;
|
||||
providerRuntimeDeps.resolveProviderExtraParamsForTransport =
|
||||
deps?.resolveProviderExtraParamsForTransport ??
|
||||
defaultProviderRuntimeDeps.resolveProviderExtraParamsForTransport;
|
||||
providerRuntimeDeps.wrapProviderStreamFn =
|
||||
deps?.wrapProviderStreamFn ?? defaultProviderRuntimeDeps.wrapProviderStreamFn;
|
||||
},
|
||||
resetProviderRuntimeDepsForTest(): void {
|
||||
providerRuntimeDeps.prepareProviderExtraParams =
|
||||
defaultProviderRuntimeDeps.prepareProviderExtraParams;
|
||||
providerRuntimeDeps.resolveProviderExtraParamsForTransport =
|
||||
defaultProviderRuntimeDeps.resolveProviderExtraParamsForTransport;
|
||||
providerRuntimeDeps.wrapProviderStreamFn = defaultProviderRuntimeDeps.wrapProviderStreamFn;
|
||||
},
|
||||
};
|
||||
@@ -111,7 +119,7 @@ type CacheRetentionStreamOptions = Partial<SimpleStreamOptions> & {
|
||||
cachedContent?: string;
|
||||
openaiWsWarmup?: boolean;
|
||||
};
|
||||
type SupportedTransport = Exclude<CacheRetentionStreamOptions["transport"], undefined>;
|
||||
export type SupportedTransport = Exclude<CacheRetentionStreamOptions["transport"], undefined>;
|
||||
|
||||
function resolveSupportedTransport(value: unknown): SupportedTransport | undefined {
|
||||
return value === "sse" || value === "websocket" || value === "auto" ? value : undefined;
|
||||
@@ -125,10 +133,14 @@ export function resolvePreparedExtraParams(params: {
|
||||
cfg: OpenClawConfig | undefined;
|
||||
provider: string;
|
||||
modelId: string;
|
||||
agentDir?: string;
|
||||
workspaceDir?: string;
|
||||
extraParamsOverride?: Record<string, unknown>;
|
||||
thinkingLevel?: ThinkLevel;
|
||||
agentId?: string;
|
||||
resolvedExtraParams?: Record<string, unknown>;
|
||||
model?: ProviderRuntimeModel;
|
||||
resolvedTransport?: SupportedTransport;
|
||||
}): Record<string, unknown> {
|
||||
const resolvedExtraParams =
|
||||
params.resolvedExtraParams ??
|
||||
@@ -159,19 +171,38 @@ export function resolvePreparedExtraParams(params: {
|
||||
merged.cachedContent = resolvedCachedContent;
|
||||
delete merged.cached_content;
|
||||
}
|
||||
return (
|
||||
const prepared =
|
||||
providerRuntimeDeps.prepareProviderExtraParams({
|
||||
provider: params.provider,
|
||||
config: params.cfg,
|
||||
workspaceDir: params.workspaceDir,
|
||||
context: {
|
||||
config: params.cfg,
|
||||
agentDir: params.agentDir,
|
||||
workspaceDir: params.workspaceDir,
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
extraParams: merged,
|
||||
thinkingLevel: params.thinkingLevel,
|
||||
},
|
||||
}) ?? merged
|
||||
);
|
||||
}) ?? merged;
|
||||
const transportPatch = providerRuntimeDeps.resolveProviderExtraParamsForTransport({
|
||||
provider: params.provider,
|
||||
config: params.cfg,
|
||||
workspaceDir: params.workspaceDir,
|
||||
context: {
|
||||
config: params.cfg,
|
||||
agentDir: params.agentDir,
|
||||
workspaceDir: params.workspaceDir,
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
extraParams: prepared,
|
||||
thinkingLevel: params.thinkingLevel,
|
||||
model: params.model,
|
||||
transport: params.resolvedTransport ?? resolveSupportedTransport(prepared.transport),
|
||||
},
|
||||
})?.patch;
|
||||
return transportPatch ? { ...prepared, ...transportPatch } : prepared;
|
||||
}
|
||||
|
||||
function sanitizeExtraParamsRecord(
|
||||
@@ -230,6 +261,21 @@ export function resolveAgentTransportOverride(params: {
|
||||
return resolveSupportedTransport(params.effectiveExtraParams?.transport);
|
||||
}
|
||||
|
||||
export function resolveExplicitSettingsTransport(params: {
|
||||
settingsManager: Pick<SettingsManager, "getGlobalSettings" | "getProjectSettings">;
|
||||
sessionTransport: unknown;
|
||||
}): SupportedTransport | undefined {
|
||||
const globalSettings = params.settingsManager.getGlobalSettings();
|
||||
const projectSettings = params.settingsManager.getProjectSettings();
|
||||
if (
|
||||
!hasExplicitTransportSetting(globalSettings) &&
|
||||
!hasExplicitTransportSetting(projectSettings)
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
return resolveSupportedTransport(params.sessionTransport);
|
||||
}
|
||||
|
||||
function createStreamFnWithExtraParams(
|
||||
baseStreamFn: StreamFn | undefined,
|
||||
extraParams: Record<string, unknown> | undefined,
|
||||
@@ -331,12 +377,7 @@ function createParallelToolCallsWrapper(
|
||||
): StreamFn {
|
||||
const underlying = baseStreamFn ?? streamSimple;
|
||||
return (model, context, options) => {
|
||||
if (
|
||||
model.api !== "openai-completions" &&
|
||||
model.api !== "openai-responses" &&
|
||||
model.api !== "openai-codex-responses" &&
|
||||
model.api !== "azure-openai-responses"
|
||||
) {
|
||||
if (!supportsGptParallelToolCallsPayload(model.api)) {
|
||||
return underlying(model, context, options);
|
||||
}
|
||||
log.debug(
|
||||
@@ -415,7 +456,7 @@ function applyPostPluginStreamWrappers(
|
||||
ctx.agent.streamFn = createMinimaxThinkingDisabledWrapper(ctx.agent.streamFn);
|
||||
|
||||
const rawParallelToolCalls = resolveAliasedParamValue(
|
||||
[ctx.resolvedExtraParams, ctx.override],
|
||||
[ctx.effectiveExtraParams, ctx.override],
|
||||
"parallel_tool_calls",
|
||||
"parallelToolCalls",
|
||||
);
|
||||
@@ -452,6 +493,7 @@ export function applyExtraParamsToAgent(
|
||||
workspaceDir?: string,
|
||||
model?: ProviderRuntimeModel,
|
||||
agentDir?: string,
|
||||
resolvedTransport?: SupportedTransport,
|
||||
): { effectiveExtraParams: Record<string, unknown> } {
|
||||
const resolvedExtraParams = resolveExtraParams({
|
||||
cfg,
|
||||
@@ -472,7 +514,11 @@ export function applyExtraParamsToAgent(
|
||||
extraParamsOverride,
|
||||
thinkingLevel,
|
||||
agentId,
|
||||
agentDir,
|
||||
workspaceDir,
|
||||
resolvedExtraParams,
|
||||
model,
|
||||
resolvedTransport,
|
||||
});
|
||||
const wrapperContext: ApplyExtraParamsContext = {
|
||||
agent,
|
||||
|
||||
@@ -47,6 +47,35 @@ function hasDeliberateSilentTerminalReply(result: EmbeddedPiRunResult): boolean
|
||||
);
|
||||
}
|
||||
|
||||
function classifyHarnessResult(params: {
|
||||
provider: string;
|
||||
model: string;
|
||||
result: EmbeddedPiRunResult;
|
||||
}): ModelFallbackResultClassification {
|
||||
switch (params.result.meta.agentHarnessResultClassification) {
|
||||
case "empty":
|
||||
return {
|
||||
message: `${params.provider}/${params.model} ended without a visible assistant reply`,
|
||||
reason: "format",
|
||||
code: "empty_result",
|
||||
};
|
||||
case "reasoning-only":
|
||||
return {
|
||||
message: `${params.provider}/${params.model} ended with reasoning only`,
|
||||
reason: "format",
|
||||
code: "reasoning_only_result",
|
||||
};
|
||||
case "planning-only":
|
||||
return {
|
||||
message: `${params.provider}/${params.model} exhausted plan-only retries without taking action`,
|
||||
reason: "format",
|
||||
code: "planning_only_result",
|
||||
};
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function classifyEmbeddedPiRunResultForModelFallback(params: {
|
||||
provider: string;
|
||||
model: string;
|
||||
@@ -69,6 +98,15 @@ export function classifyEmbeddedPiRunResultForModelFallback(params: {
|
||||
return null;
|
||||
}
|
||||
|
||||
const harnessClassification = classifyHarnessResult({
|
||||
provider: params.provider,
|
||||
model: params.model,
|
||||
result: params.result,
|
||||
});
|
||||
if (harnessClassification) {
|
||||
return harnessClassification;
|
||||
}
|
||||
|
||||
const payloads = params.result.payloads ?? [];
|
||||
if (payloads.length === 0 && hasDeliberateSilentTerminalReply(params.result)) {
|
||||
return null;
|
||||
|
||||
@@ -371,6 +371,7 @@ export async function loadRunOverflowCompactionHarness(): Promise<{
|
||||
vi.doMock("../../plugins/provider-runtime.js", () => ({
|
||||
prepareProviderRuntimeAuth: mockedPrepareProviderRuntimeAuth,
|
||||
resolveProviderCapabilitiesWithPlugin: vi.fn(() => ({})),
|
||||
resolveProviderAuthProfileId: vi.fn(() => undefined),
|
||||
prepareProviderExtraParams: vi.fn(async () => ({})),
|
||||
wrapProviderStreamFn: vi.fn((_cfg: unknown, _model: unknown, fn: unknown) => fn),
|
||||
}));
|
||||
|
||||
@@ -10,6 +10,7 @@ import { sleepWithAbort } from "../../infra/backoff.js";
|
||||
import { freezeDiagnosticTraceContext } from "../../infra/diagnostic-trace-context.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import { resolveProviderAuthProfileId } from "../../plugins/provider-runtime.js";
|
||||
import { enqueueCommandInLane } from "../../process/command-queue.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
import { sanitizeForLog } from "../../terminal/ansi.js";
|
||||
@@ -448,10 +449,35 @@ export async function runEmbeddedPiAgent(
|
||||
provider,
|
||||
preferredProfile: preferredProfileId,
|
||||
});
|
||||
const providerPreferredProfileId = lockedProfileId
|
||||
? undefined
|
||||
: resolveProviderAuthProfileId({
|
||||
provider,
|
||||
config: params.config,
|
||||
workspaceDir: resolvedWorkspace,
|
||||
context: {
|
||||
config: params.config,
|
||||
agentDir,
|
||||
workspaceDir: resolvedWorkspace,
|
||||
provider,
|
||||
modelId,
|
||||
preferredProfileId,
|
||||
lockedProfileId,
|
||||
profileOrder,
|
||||
authStore,
|
||||
},
|
||||
});
|
||||
const providerOrderedProfiles =
|
||||
providerPreferredProfileId && profileOrder.includes(providerPreferredProfileId)
|
||||
? [
|
||||
providerPreferredProfileId,
|
||||
...profileOrder.filter((profileId) => profileId !== providerPreferredProfileId),
|
||||
]
|
||||
: profileOrder;
|
||||
const profileCandidates = lockedProfileId
|
||||
? [lockedProfileId]
|
||||
: profileOrder.length > 0
|
||||
? profileOrder
|
||||
: providerOrderedProfiles.length > 0
|
||||
? providerOrderedProfiles
|
||||
: [undefined];
|
||||
let profileIndex = 0;
|
||||
const traceAttempts: TraceAttempt[] = [];
|
||||
@@ -1786,6 +1812,7 @@ export async function runEmbeddedPiAgent(
|
||||
replayInvalid,
|
||||
livenessState,
|
||||
toolSummary: attemptToolSummary,
|
||||
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
|
||||
},
|
||||
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
|
||||
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
|
||||
@@ -1947,6 +1974,7 @@ export async function runEmbeddedPiAgent(
|
||||
replayInvalid,
|
||||
livenessState,
|
||||
toolSummary: attemptToolSummary,
|
||||
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
|
||||
},
|
||||
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
|
||||
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
|
||||
@@ -1995,6 +2023,7 @@ export async function runEmbeddedPiAgent(
|
||||
replayInvalid,
|
||||
livenessState,
|
||||
toolSummary: attemptToolSummary,
|
||||
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
|
||||
},
|
||||
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
|
||||
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
|
||||
@@ -2105,6 +2134,7 @@ export async function runEmbeddedPiAgent(
|
||||
replayInvalid,
|
||||
livenessState,
|
||||
toolSummary: attemptToolSummary,
|
||||
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
|
||||
},
|
||||
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
|
||||
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
|
||||
@@ -2163,6 +2193,7 @@ export async function runEmbeddedPiAgent(
|
||||
finalAssistantRawText,
|
||||
replayInvalid,
|
||||
livenessState,
|
||||
agentHarnessResultClassification: attempt.agentHarnessResultClassification,
|
||||
// Handle client tool calls (OpenResponses hosted tools)
|
||||
// Propagate the LLM stop reason so callers (lifecycle events,
|
||||
// ACP bridge) can distinguish end_turn from max_tokens.
|
||||
|
||||
@@ -520,6 +520,8 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
|
||||
expect(flushMock).toHaveBeenCalledTimes(1);
|
||||
expect(disposeMock).toHaveBeenCalledTimes(1);
|
||||
expect(releaseMock).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session");
|
||||
expect(hoisted.releaseWsSessionMock).toHaveBeenCalledWith("embedded-session", {
|
||||
allowPool: false,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -24,7 +24,8 @@ export async function cleanupEmbeddedAttemptResources(params: {
|
||||
}) => Promise<void>;
|
||||
session?: { agent?: unknown; dispose(): void };
|
||||
sessionManager: unknown;
|
||||
releaseWsSession: (sessionId: string) => void;
|
||||
releaseWsSession: (sessionId: string, options?: { allowPool?: boolean }) => void;
|
||||
allowWsSessionPool?: boolean;
|
||||
sessionId: string;
|
||||
bundleLspRuntime?: { dispose(): Promise<void> | void };
|
||||
sessionLock: { release(): Promise<void> | void };
|
||||
@@ -50,7 +51,7 @@ export async function cleanupEmbeddedAttemptResources(params: {
|
||||
/* best-effort */
|
||||
}
|
||||
try {
|
||||
params.releaseWsSession(params.sessionId);
|
||||
params.releaseWsSession(params.sessionId, { allowPool: params.allowWsSessionPool === true });
|
||||
} catch {
|
||||
/* best-effort */
|
||||
}
|
||||
|
||||
@@ -141,7 +141,11 @@ import { resolveCompactionTimeoutMs } from "../compaction-safety-timeout.js";
|
||||
import { runContextEngineMaintenance } from "../context-engine-maintenance.js";
|
||||
import { applyFinalEffectiveToolPolicy } from "../effective-tool-policy.js";
|
||||
import { buildEmbeddedExtensionFactories } from "../extensions.js";
|
||||
import { applyExtraParamsToAgent, resolveAgentTransportOverride } from "../extra-params.js";
|
||||
import {
|
||||
applyExtraParamsToAgent,
|
||||
resolveAgentTransportOverride,
|
||||
resolveExplicitSettingsTransport,
|
||||
} from "../extra-params.js";
|
||||
import { prepareGooglePromptCacheStreamFn } from "../google-prompt-cache.js";
|
||||
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js";
|
||||
import { log } from "../logger.js";
|
||||
@@ -224,7 +228,6 @@ import {
|
||||
import {
|
||||
buildAfterTurnRuntimeContext,
|
||||
buildAfterTurnRuntimeContextFromUsage,
|
||||
mergeOrphanedTrailingUserPrompt,
|
||||
prependSystemPromptAddition,
|
||||
resolveAttemptFsWorkspaceOnly,
|
||||
resolveAttemptPrependSystemContext,
|
||||
@@ -274,6 +277,7 @@ import { pruneProcessedHistoryImages } from "./history-image-prune.js";
|
||||
import { detectAndLoadPromptImages } from "./images.js";
|
||||
import { buildAttemptReplayMetadata } from "./incomplete-turn.js";
|
||||
import { resolveLlmIdleTimeoutMs, streamWithIdleTimeout } from "./llm-idle-timeout.js";
|
||||
import { resolveMessageMergeStrategy } from "./message-merge-strategy.js";
|
||||
import {
|
||||
PREEMPTIVE_OVERFLOW_ERROR_TEXT,
|
||||
shouldPreemptivelyCompactBeforePrompt,
|
||||
@@ -1381,6 +1385,10 @@ export async function runEmbeddedAttempt(
|
||||
effectiveWorkspace,
|
||||
params.model,
|
||||
agentDir,
|
||||
resolveExplicitSettingsTransport({
|
||||
settingsManager,
|
||||
sessionTransport: activeSession.agent.transport,
|
||||
}),
|
||||
);
|
||||
const effectivePromptCacheRetention = resolveCacheRetention(
|
||||
effectiveExtraParams,
|
||||
@@ -2066,7 +2074,7 @@ export async function runEmbeddedAttempt(
|
||||
// Repair orphaned trailing user messages so new prompts don't violate role ordering.
|
||||
const leafEntry = sessionManager.getLeafEntry();
|
||||
if (leafEntry?.type === "message" && leafEntry.message.role === "user") {
|
||||
const orphanPromptMerge = mergeOrphanedTrailingUserPrompt({
|
||||
const orphanPromptMerge = resolveMessageMergeStrategy().mergeOrphanedTrailingUserPrompt({
|
||||
prompt: effectivePrompt,
|
||||
trigger: params.trigger,
|
||||
leafMessage: leafEntry.message,
|
||||
@@ -2088,8 +2096,10 @@ export async function runEmbeddedAttempt(
|
||||
? "Merged and removed"
|
||||
: "Removed already-queued"
|
||||
: "Preserved"
|
||||
} orphaned user message ` +
|
||||
`to prevent consecutive user turns. ` +
|
||||
} orphaned user message` +
|
||||
(orphanPromptMerge.removeLeaf
|
||||
? " to prevent consecutive user turns. "
|
||||
: " without removing the active session leaf. ") +
|
||||
`runId=${params.runId} sessionId=${params.sessionId} trigger=${params.trigger}`;
|
||||
if (shouldWarnOnOrphanedUserRepair(params.trigger)) {
|
||||
log.warn(orphanRepairMessage);
|
||||
@@ -2694,6 +2704,7 @@ export async function runEmbeddedAttempt(
|
||||
sessionId: params.sessionId,
|
||||
provider: params.provider,
|
||||
model: params.modelId,
|
||||
resolvedRef: `${params.provider}/${params.modelId}`,
|
||||
assistantTexts,
|
||||
lastAssistant,
|
||||
usage: attemptUsage,
|
||||
@@ -2841,6 +2852,8 @@ export async function runEmbeddedAttempt(
|
||||
session,
|
||||
sessionManager,
|
||||
releaseWsSession,
|
||||
allowWsSessionPool:
|
||||
!promptError && !aborted && !timedOut && !idleTimedOut && !timedOutDuringCompaction,
|
||||
sessionId: params.sessionId,
|
||||
bundleLspRuntime,
|
||||
sessionLock,
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
DEFAULT_MESSAGE_MERGE_STRATEGY_ID,
|
||||
registerMessageMergeStrategyForTest,
|
||||
resolveMessageMergeStrategy,
|
||||
type MessageMergeStrategy,
|
||||
} from "./message-merge-strategy.js";
|
||||
|
||||
let restoreStrategy: (() => void) | undefined;
|
||||
|
||||
afterEach(() => {
|
||||
restoreStrategy?.();
|
||||
restoreStrategy = undefined;
|
||||
});
|
||||
|
||||
describe("message merge strategy registry", () => {
|
||||
it("resolves the default orphan trailing user prompt strategy", () => {
|
||||
const strategy = resolveMessageMergeStrategy();
|
||||
|
||||
expect(strategy.id).toBe(DEFAULT_MESSAGE_MERGE_STRATEGY_ID);
|
||||
expect(
|
||||
strategy.mergeOrphanedTrailingUserPrompt({
|
||||
prompt: "newest inbound message",
|
||||
trigger: "user",
|
||||
leafMessage: { content: "older active-turn message" },
|
||||
}),
|
||||
).toEqual({
|
||||
merged: true,
|
||||
removeLeaf: true,
|
||||
prompt:
|
||||
"[Queued user message that arrived while the previous turn was still active]\n" +
|
||||
"older active-turn message\n\nnewest inbound message",
|
||||
});
|
||||
});
|
||||
|
||||
it("allows tests to override and restore the active strategy", () => {
|
||||
const override: MessageMergeStrategy = {
|
||||
id: DEFAULT_MESSAGE_MERGE_STRATEGY_ID,
|
||||
mergeOrphanedTrailingUserPrompt: (params) => ({
|
||||
prompt: `override: ${params.prompt}`,
|
||||
merged: false,
|
||||
removeLeaf: false,
|
||||
}),
|
||||
};
|
||||
|
||||
restoreStrategy = registerMessageMergeStrategyForTest(override);
|
||||
|
||||
expect(
|
||||
resolveMessageMergeStrategy().mergeOrphanedTrailingUserPrompt({
|
||||
prompt: "next",
|
||||
trigger: "manual",
|
||||
leafMessage: { content: "previous" },
|
||||
}),
|
||||
).toEqual({
|
||||
prompt: "override: next",
|
||||
merged: false,
|
||||
removeLeaf: false,
|
||||
});
|
||||
|
||||
restoreStrategy();
|
||||
restoreStrategy = undefined;
|
||||
expect(resolveMessageMergeStrategy()).not.toBe(override);
|
||||
});
|
||||
});
|
||||
54
src/agents/pi-embedded-runner/run/message-merge-strategy.ts
Normal file
54
src/agents/pi-embedded-runner/run/message-merge-strategy.ts
Normal file
@@ -0,0 +1,54 @@
|
||||
import { mergeOrphanedTrailingUserPrompt } from "./attempt.prompt-helpers.js";
|
||||
import type { EmbeddedRunAttemptParams } from "./types.js";
|
||||
|
||||
export type OrphanedTrailingUserPromptMergeParams = {
|
||||
prompt: string;
|
||||
trigger: EmbeddedRunAttemptParams["trigger"];
|
||||
leafMessage: { content?: unknown };
|
||||
};
|
||||
|
||||
export type OrphanedTrailingUserPromptMergeResult = {
|
||||
prompt: string;
|
||||
merged: boolean;
|
||||
/**
|
||||
* When false, the active session leaf is preserved. Use this only when the
|
||||
* caller intentionally accepts that the next appended prompt may follow an
|
||||
* existing user leaf; most providers reject consecutive user turns.
|
||||
*/
|
||||
removeLeaf: boolean;
|
||||
};
|
||||
|
||||
export type MessageMergeStrategyId = "orphan-trailing-user-prompt";
|
||||
|
||||
export type MessageMergeStrategy = {
|
||||
id: MessageMergeStrategyId;
|
||||
mergeOrphanedTrailingUserPrompt: (
|
||||
params: OrphanedTrailingUserPromptMergeParams,
|
||||
) => OrphanedTrailingUserPromptMergeResult;
|
||||
};
|
||||
|
||||
export const DEFAULT_MESSAGE_MERGE_STRATEGY_ID: MessageMergeStrategyId =
|
||||
"orphan-trailing-user-prompt";
|
||||
|
||||
const defaultMessageMergeStrategy: MessageMergeStrategy = {
|
||||
id: DEFAULT_MESSAGE_MERGE_STRATEGY_ID,
|
||||
mergeOrphanedTrailingUserPrompt,
|
||||
};
|
||||
|
||||
let activeMessageMergeStrategy = defaultMessageMergeStrategy;
|
||||
|
||||
export function resolveMessageMergeStrategy(): MessageMergeStrategy {
|
||||
return activeMessageMergeStrategy;
|
||||
}
|
||||
|
||||
function registerMessageMergeStrategy(strategy: MessageMergeStrategy): () => void {
|
||||
const previous = activeMessageMergeStrategy;
|
||||
activeMessageMergeStrategy = strategy;
|
||||
return () => {
|
||||
activeMessageMergeStrategy = previous;
|
||||
};
|
||||
}
|
||||
|
||||
export function registerMessageMergeStrategyForTest(strategy: MessageMergeStrategy): () => void {
|
||||
return registerMessageMergeStrategy(strategy);
|
||||
}
|
||||
@@ -75,6 +75,7 @@ export type EmbeddedRunAttemptResult = {
|
||||
sessionIdUsed: string;
|
||||
diagnosticTrace?: DiagnosticTraceContext;
|
||||
agentHarnessId?: string;
|
||||
agentHarnessResultClassification?: "empty" | "reasoning-only" | "planning-only";
|
||||
bootstrapPromptWarningSignaturesSeen?: string[];
|
||||
bootstrapPromptWarningSignature?: string;
|
||||
systemPromptReport?: SessionSystemPromptReport;
|
||||
|
||||
@@ -106,6 +106,7 @@ export type EmbeddedPiRunMeta = {
|
||||
finalAssistantRawText?: string;
|
||||
replayInvalid?: boolean;
|
||||
livenessState?: EmbeddedRunLivenessState;
|
||||
agentHarnessResultClassification?: "empty" | "reasoning-only" | "planning-only";
|
||||
error?: {
|
||||
kind:
|
||||
| "context_overflow"
|
||||
|
||||
18
src/agents/provider-api-families.test.ts
Normal file
18
src/agents/provider-api-families.test.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { supportsGptParallelToolCallsPayload } from "./provider-api-families.js";
|
||||
|
||||
describe("provider api families", () => {
|
||||
it.each([
|
||||
"openai-completions",
|
||||
"openai-responses",
|
||||
"openai-codex-responses",
|
||||
"azure-openai-responses",
|
||||
])("classifies %s as supporting the GPT parallel_tool_calls payload patch", (api) => {
|
||||
expect(supportsGptParallelToolCallsPayload(api)).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects unrelated APIs", () => {
|
||||
expect(supportsGptParallelToolCallsPayload("anthropic-messages")).toBe(false);
|
||||
expect(supportsGptParallelToolCallsPayload(undefined)).toBe(false);
|
||||
});
|
||||
});
|
||||
10
src/agents/provider-api-families.ts
Normal file
10
src/agents/provider-api-families.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
const GPT_PARALLEL_TOOL_CALLS_APIS = new Set([
|
||||
"openai-completions",
|
||||
"openai-responses",
|
||||
"openai-codex-responses",
|
||||
"azure-openai-responses",
|
||||
]);
|
||||
|
||||
export function supportsGptParallelToolCallsPayload(api: unknown): boolean {
|
||||
return typeof api === "string" && GPT_PARALLEL_TOOL_CALLS_APIS.has(api);
|
||||
}
|
||||
@@ -13,6 +13,7 @@ const isRoutableChannelMock = vi.fn();
|
||||
const runPreflightCompactionIfNeededMock = vi.fn();
|
||||
const resolveCommandSecretRefsViaGatewayMock = vi.fn();
|
||||
const resolveQueuedReplyExecutionConfigMock = vi.fn();
|
||||
const resolveProviderFollowupFallbackRouteMock = vi.fn();
|
||||
let resolveQueuedReplyExecutionConfigActual:
|
||||
| (typeof import("./agent-runner-utils.js"))["resolveQueuedReplyExecutionConfig"]
|
||||
| undefined;
|
||||
@@ -281,6 +282,16 @@ async function loadFreshFollowupRunnerModuleForTest() {
|
||||
isRoutableChannel: (...args: unknown[]) => isRoutableChannelMock(...args),
|
||||
routeReply: (...args: unknown[]) => routeReplyMock(...args),
|
||||
}));
|
||||
vi.doMock("../../plugins/provider-runtime.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../../plugins/provider-runtime.js")>(
|
||||
"../../plugins/provider-runtime.js",
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
resolveProviderFollowupFallbackRoute: (...args: unknown[]) =>
|
||||
resolveProviderFollowupFallbackRouteMock(...args),
|
||||
};
|
||||
});
|
||||
vi.doMock("./agent-runner-utils.js", async () => {
|
||||
const actual =
|
||||
await vi.importActual<typeof import("./agent-runner-utils.js")>("./agent-runner-utils.js");
|
||||
@@ -358,6 +369,8 @@ beforeEach(() => {
|
||||
runPreflightCompactionIfNeededMock.mockReset();
|
||||
resolveCommandSecretRefsViaGatewayMock.mockReset();
|
||||
resolveQueuedReplyExecutionConfigMock.mockReset();
|
||||
resolveProviderFollowupFallbackRouteMock.mockReset();
|
||||
resolveProviderFollowupFallbackRouteMock.mockReturnValue(undefined);
|
||||
const resolveQueuedReplyExecutionConfig = resolveQueuedReplyExecutionConfigActual;
|
||||
if (!resolveQueuedReplyExecutionConfig) {
|
||||
throw new Error("resolveQueuedReplyExecutionConfig mock not initialized");
|
||||
@@ -1348,6 +1361,55 @@ describe("createFollowupRunner messaging tool dedupe", () => {
|
||||
expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "hello world!" }));
|
||||
});
|
||||
|
||||
it("lets provider followup route hooks force dispatcher delivery", async () => {
|
||||
resolveProviderFollowupFallbackRouteMock.mockReturnValue({
|
||||
route: "dispatcher",
|
||||
reason: "operator-visible review copy",
|
||||
});
|
||||
const { onBlockReply } = await runMessagingCase({
|
||||
agentResult: { payloads: [{ text: "hello world!" }] },
|
||||
queued: {
|
||||
...baseQueuedRun("webchat"),
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:C1",
|
||||
} as FollowupRun,
|
||||
});
|
||||
|
||||
expect(routeReplyMock).not.toHaveBeenCalled();
|
||||
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
||||
expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "hello world!" }));
|
||||
expect(resolveProviderFollowupFallbackRouteMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
provider: "anthropic",
|
||||
context: expect.objectContaining({
|
||||
provider: "anthropic",
|
||||
modelId: "claude",
|
||||
originRoutable: true,
|
||||
dispatcherAvailable: true,
|
||||
payload: expect.objectContaining({ text: "hello world!" }),
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("lets provider followup route hooks drop payloads explicitly", async () => {
|
||||
resolveProviderFollowupFallbackRouteMock.mockReturnValue({
|
||||
route: "drop",
|
||||
reason: "already delivered out of band",
|
||||
});
|
||||
const { onBlockReply } = await runMessagingCase({
|
||||
agentResult: { payloads: [{ text: "hello world!" }] },
|
||||
queued: {
|
||||
...baseQueuedRun("webchat"),
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:C1",
|
||||
} as FollowupRun,
|
||||
});
|
||||
|
||||
expect(routeReplyMock).not.toHaveBeenCalled();
|
||||
expect(onBlockReply).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("falls back to dispatcher when same-channel origin routing fails", async () => {
|
||||
routeReplyMock.mockResolvedValueOnce({
|
||||
ok: false,
|
||||
|
||||
@@ -16,6 +16,7 @@ import type { TypingMode } from "../../config/types.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { resolveProviderFollowupFallbackRoute } from "../../plugins/provider-runtime.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { isInternalMessageChannel } from "../../utils/message-channel.js";
|
||||
import { stripHeartbeatToken } from "../heartbeat.js";
|
||||
@@ -74,7 +75,11 @@ export function createFollowupRunner(params: {
|
||||
* session's current dispatcher. This ensures replies go back to
|
||||
* where the message originated.
|
||||
*/
|
||||
const sendFollowupPayloads = async (payloads: ReplyPayload[], queued: FollowupRun) => {
|
||||
const sendFollowupPayloads = async (
|
||||
payloads: ReplyPayload[],
|
||||
queued: FollowupRun,
|
||||
resolvedRun: { provider: string; modelId: string },
|
||||
) => {
|
||||
// Check if we should route to originating channel.
|
||||
const { originatingChannel, originatingTo } = queued;
|
||||
const runtimeConfig = resolveQueuedReplyRuntimeConfig(queued.run.config);
|
||||
@@ -99,10 +104,43 @@ export function createFollowupRunner(params: {
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
const providerRoute = resolveProviderFollowupFallbackRoute({
|
||||
provider: resolvedRun.provider,
|
||||
config: runtimeConfig,
|
||||
workspaceDir: queued.run.workspaceDir,
|
||||
context: {
|
||||
config: runtimeConfig,
|
||||
agentDir: queued.run.agentDir,
|
||||
workspaceDir: queued.run.workspaceDir,
|
||||
provider: resolvedRun.provider,
|
||||
modelId: resolvedRun.modelId,
|
||||
payload,
|
||||
originatingChannel,
|
||||
originatingTo,
|
||||
originRoutable: Boolean(shouldRouteToOriginating),
|
||||
dispatcherAvailable: Boolean(opts?.onBlockReply),
|
||||
},
|
||||
});
|
||||
if (providerRoute?.route === "drop") {
|
||||
logVerbose(
|
||||
`followup queue: provider hook dropped payload route reason=${providerRoute.reason ?? "unspecified"}`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
const deliveryRoute =
|
||||
providerRoute?.route === "origin" && shouldRouteToOriginating
|
||||
? "origin"
|
||||
: providerRoute?.route === "dispatcher" && opts?.onBlockReply
|
||||
? "dispatcher"
|
||||
: shouldRouteToOriginating
|
||||
? "origin"
|
||||
: opts?.onBlockReply
|
||||
? "dispatcher"
|
||||
: undefined;
|
||||
await typingSignals.signalTextDelta(payload.text);
|
||||
|
||||
// Route to originating channel if set, otherwise fall back to dispatcher.
|
||||
if (shouldRouteToOriginating) {
|
||||
if (deliveryRoute === "origin" && isRoutableChannel(originatingChannel) && originatingTo) {
|
||||
const result = await routeReply({
|
||||
payload,
|
||||
channel: originatingChannel,
|
||||
@@ -145,7 +183,7 @@ export function createFollowupRunner(params: {
|
||||
routedAnyCrossChannelPayloadToOrigin = true;
|
||||
}
|
||||
}
|
||||
} else if (opts?.onBlockReply) {
|
||||
} else if (deliveryRoute === "dispatcher" && opts?.onBlockReply) {
|
||||
await opts.onBlockReply(payload);
|
||||
}
|
||||
}
|
||||
@@ -438,7 +476,10 @@ export function createFollowupRunner(params: {
|
||||
}
|
||||
}
|
||||
|
||||
await sendFollowupPayloads(finalPayloads, effectiveQueued);
|
||||
await sendFollowupPayloads(finalPayloads, effectiveQueued, {
|
||||
provider: providerUsed,
|
||||
modelId: modelUsed,
|
||||
});
|
||||
} finally {
|
||||
replyOperation.complete();
|
||||
// Both signals are required for the typing controller to clean up.
|
||||
|
||||
@@ -14,6 +14,7 @@ export type {
|
||||
AgentHarnessAttemptResult,
|
||||
AgentHarnessCompactParams,
|
||||
AgentHarnessCompactResult,
|
||||
AgentHarnessResultClassification,
|
||||
AgentHarnessResetParams,
|
||||
AgentHarnessSupport,
|
||||
AgentHarnessSupportContext,
|
||||
|
||||
@@ -192,6 +192,14 @@ export type PluginHookLlmOutputEvent = {
|
||||
sessionId: string;
|
||||
provider: string;
|
||||
model: string;
|
||||
/**
|
||||
* Fully resolved provider/model ref used for the call.
|
||||
*
|
||||
* This intentionally keeps the provider prefix so operator tooling can
|
||||
* distinguish e.g. openai-codex/gpt-5.4 from codex/gpt-5.4 even when display
|
||||
* names collapse to just the model id.
|
||||
*/
|
||||
resolvedRef?: string;
|
||||
assistantTexts: string[];
|
||||
lastAssistant?: unknown;
|
||||
usage?: {
|
||||
|
||||
@@ -6,7 +6,11 @@ import { resolvePluginCacheInputs } from "./roots.js";
|
||||
import { getActivePluginRegistryWorkspaceDirFromState } from "./runtime-state.js";
|
||||
import type {
|
||||
ProviderPlugin,
|
||||
ProviderExtraParamsForTransportContext,
|
||||
ProviderPrepareExtraParamsContext,
|
||||
ProviderResolveAuthProfileIdContext,
|
||||
ProviderFollowupFallbackRouteContext,
|
||||
ProviderFollowupFallbackRouteResult,
|
||||
ProviderWrapStreamFnContext,
|
||||
} from "./types.js";
|
||||
|
||||
@@ -182,6 +186,37 @@ export function prepareProviderExtraParams(params: {
|
||||
return resolveProviderRuntimePlugin(params)?.prepareExtraParams?.(params.context) ?? undefined;
|
||||
}
|
||||
|
||||
export function resolveProviderExtraParamsForTransport(params: {
|
||||
provider: string;
|
||||
config?: OpenClawConfig;
|
||||
workspaceDir?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
context: ProviderExtraParamsForTransportContext;
|
||||
}) {
|
||||
return resolveProviderHookPlugin(params)?.extraParamsForTransport?.(params.context) ?? undefined;
|
||||
}
|
||||
|
||||
export function resolveProviderAuthProfileId(params: {
|
||||
provider: string;
|
||||
config?: OpenClawConfig;
|
||||
workspaceDir?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
context: ProviderResolveAuthProfileIdContext;
|
||||
}): string | undefined {
|
||||
const resolved = resolveProviderHookPlugin(params)?.resolveAuthProfileId?.(params.context);
|
||||
return typeof resolved === "string" && resolved.trim() ? resolved.trim() : undefined;
|
||||
}
|
||||
|
||||
export function resolveProviderFollowupFallbackRoute(params: {
|
||||
provider: string;
|
||||
config?: OpenClawConfig;
|
||||
workspaceDir?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
context: ProviderFollowupFallbackRouteContext;
|
||||
}): ProviderFollowupFallbackRouteResult | undefined {
|
||||
return resolveProviderHookPlugin(params)?.followupFallbackRoute?.(params.context) ?? undefined;
|
||||
}
|
||||
|
||||
export function wrapProviderStreamFn(params: {
|
||||
provider: string;
|
||||
config?: OpenClawConfig;
|
||||
|
||||
@@ -53,7 +53,10 @@ let applyProviderResolvedModelCompatWithPlugins: typeof import("./provider-runti
|
||||
let applyProviderResolvedTransportWithPlugin: typeof import("./provider-runtime.js").applyProviderResolvedTransportWithPlugin;
|
||||
let normalizeProviderTransportWithPlugin: typeof import("./provider-runtime.js").normalizeProviderTransportWithPlugin;
|
||||
let prepareProviderExtraParams: typeof import("./provider-runtime.js").prepareProviderExtraParams;
|
||||
let resolveProviderAuthProfileId: typeof import("./provider-runtime.js").resolveProviderAuthProfileId;
|
||||
let resolveProviderConfigApiKeyWithPlugin: typeof import("./provider-runtime.js").resolveProviderConfigApiKeyWithPlugin;
|
||||
let resolveProviderExtraParamsForTransport: typeof import("./provider-runtime.js").resolveProviderExtraParamsForTransport;
|
||||
let resolveProviderFollowupFallbackRoute: typeof import("./provider-runtime.js").resolveProviderFollowupFallbackRoute;
|
||||
let resolveProviderStreamFn: typeof import("./provider-runtime.js").resolveProviderStreamFn;
|
||||
let resolveProviderCacheTtlEligibility: typeof import("./provider-runtime.js").resolveProviderCacheTtlEligibility;
|
||||
let resolveProviderBinaryThinking: typeof import("./provider-runtime.js").resolveProviderBinaryThinking;
|
||||
@@ -280,7 +283,10 @@ describe("provider-runtime", () => {
|
||||
normalizeProviderModelIdWithPlugin,
|
||||
normalizeProviderTransportWithPlugin,
|
||||
prepareProviderExtraParams,
|
||||
resolveProviderAuthProfileId,
|
||||
resolveProviderConfigApiKeyWithPlugin,
|
||||
resolveProviderExtraParamsForTransport,
|
||||
resolveProviderFollowupFallbackRoute,
|
||||
resolveProviderStreamFn,
|
||||
resolveProviderCacheTtlEligibility,
|
||||
resolveProviderBinaryThinking,
|
||||
@@ -528,6 +534,84 @@ describe("provider-runtime", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("exposes provider-owned transport extra params", () => {
|
||||
const extraParamsForTransport = vi.fn((_ctx) => ({
|
||||
patch: {
|
||||
providerTransportPatch: true,
|
||||
},
|
||||
}));
|
||||
resolvePluginProvidersMock.mockReturnValue([
|
||||
{
|
||||
id: DEMO_PROVIDER_ID,
|
||||
label: "Demo",
|
||||
auth: [],
|
||||
extraParamsForTransport,
|
||||
} satisfies ProviderPlugin,
|
||||
]);
|
||||
|
||||
expect(
|
||||
resolveProviderExtraParamsForTransport({
|
||||
provider: DEMO_PROVIDER_ID,
|
||||
context: createDemoResolvedModelContext({
|
||||
extraParams: { transport: "websocket" },
|
||||
transport: "websocket" as const,
|
||||
}),
|
||||
}),
|
||||
).toEqual({
|
||||
patch: {
|
||||
providerTransportPatch: true,
|
||||
},
|
||||
});
|
||||
expect(extraParamsForTransport).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
provider: DEMO_PROVIDER_ID,
|
||||
modelId: MODEL.id,
|
||||
model: MODEL,
|
||||
transport: "websocket",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("exposes provider-owned auth profile and fallback route seams", () => {
|
||||
const resolveAuthProfileId = vi.fn(() => "profile-b");
|
||||
const followupFallbackRoute = vi.fn(() => ({
|
||||
route: "dispatcher" as const,
|
||||
reason: "origin unavailable",
|
||||
}));
|
||||
resolvePluginProvidersMock.mockReturnValue([
|
||||
{
|
||||
id: DEMO_PROVIDER_ID,
|
||||
label: "Demo",
|
||||
auth: [],
|
||||
resolveAuthProfileId,
|
||||
followupFallbackRoute,
|
||||
} satisfies ProviderPlugin,
|
||||
]);
|
||||
|
||||
expect(
|
||||
resolveProviderAuthProfileId({
|
||||
provider: DEMO_PROVIDER_ID,
|
||||
context: createDemoRuntimeContext({
|
||||
profileOrder: ["profile-a", "profile-b"],
|
||||
authStore: { version: 1, profiles: {}, order: {} },
|
||||
}),
|
||||
}),
|
||||
).toBe("profile-b");
|
||||
expect(
|
||||
resolveProviderFollowupFallbackRoute({
|
||||
provider: DEMO_PROVIDER_ID,
|
||||
context: createDemoRuntimeContext({
|
||||
payload: { text: "hello" },
|
||||
originRoutable: false,
|
||||
dispatcherAvailable: true,
|
||||
}),
|
||||
}),
|
||||
).toEqual({
|
||||
route: "dispatcher",
|
||||
reason: "origin unavailable",
|
||||
});
|
||||
});
|
||||
|
||||
it("applies the shared GPT-5 prompt overlay for any provider", () => {
|
||||
const contribution = resolveProviderSystemPromptContribution({
|
||||
provider: "openrouter",
|
||||
@@ -567,6 +651,45 @@ describe("provider-runtime", () => {
|
||||
expect(contribution?.sectionOverrides).toEqual({});
|
||||
});
|
||||
|
||||
it("lets provider-owned prompt overlays compose after the built-in GPT-5 overlay", () => {
|
||||
const resolvePromptOverlay = vi.fn((ctx) => ({
|
||||
stablePrefix: "provider overlay",
|
||||
sectionOverrides: {
|
||||
execution_bias: ctx.baseOverlay?.stablePrefix ? "saw built-in overlay" : "missing",
|
||||
},
|
||||
}));
|
||||
resolvePluginProvidersMock.mockReturnValue([
|
||||
{
|
||||
id: "openrouter",
|
||||
label: "OpenRouter",
|
||||
auth: [],
|
||||
resolvePromptOverlay,
|
||||
} satisfies ProviderPlugin,
|
||||
]);
|
||||
|
||||
const contribution = resolveProviderSystemPromptContribution({
|
||||
provider: "openrouter",
|
||||
context: {
|
||||
provider: "openrouter",
|
||||
modelId: "openai/gpt-5.4",
|
||||
promptMode: "full",
|
||||
} as never,
|
||||
});
|
||||
|
||||
expect(contribution?.stablePrefix).toContain("<persona_latch>");
|
||||
expect(contribution?.stablePrefix).toContain("provider overlay");
|
||||
expect(contribution?.sectionOverrides?.execution_bias).toBe("saw built-in overlay");
|
||||
expect(resolvePromptOverlay).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
provider: "openrouter",
|
||||
modelId: "openai/gpt-5.4",
|
||||
baseOverlay: expect.objectContaining({
|
||||
stablePrefix: expect.stringContaining("<persona_latch>"),
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("ignores OpenAI plugin personality fallback for non-OpenAI GPT-5 providers", () => {
|
||||
const contribution = resolveProviderSystemPromptContribution({
|
||||
provider: "openrouter",
|
||||
|
||||
@@ -16,6 +16,9 @@ import {
|
||||
clearProviderRuntimeHookCache,
|
||||
prepareProviderExtraParams,
|
||||
resetProviderRuntimeHookCacheForTest,
|
||||
resolveProviderAuthProfileId,
|
||||
resolveProviderExtraParamsForTransport,
|
||||
resolveProviderFollowupFallbackRoute,
|
||||
resolveProviderHookPlugin,
|
||||
resolveProviderPluginsForHooks,
|
||||
resolveProviderRuntimePlugin,
|
||||
@@ -88,6 +91,9 @@ function resetExternalAuthFallbackWarningCacheForTest(): void {
|
||||
export {
|
||||
clearProviderRuntimeHookCache,
|
||||
prepareProviderExtraParams,
|
||||
resolveProviderAuthProfileId,
|
||||
resolveProviderExtraParamsForTransport,
|
||||
resolveProviderFollowupFallbackRoute,
|
||||
resetProviderRuntimeHookCacheForTest,
|
||||
resolveProviderRuntimePlugin,
|
||||
wrapProviderStreamFn,
|
||||
@@ -136,14 +142,20 @@ export function resolveProviderSystemPromptContribution(params: {
|
||||
env?: NodeJS.ProcessEnv;
|
||||
context: ProviderSystemPromptContributionContext;
|
||||
}): ProviderSystemPromptContribution | undefined {
|
||||
const plugin = resolveProviderRuntimePlugin(params);
|
||||
const baseOverlay = resolveGpt5SystemPromptContribution({
|
||||
config: params.context.config ?? params.config,
|
||||
providerId: params.context.provider ?? params.provider,
|
||||
modelId: params.context.modelId,
|
||||
});
|
||||
const providerOverlay =
|
||||
plugin?.resolvePromptOverlay?.({
|
||||
...params.context,
|
||||
baseOverlay,
|
||||
}) ?? undefined;
|
||||
return mergeProviderSystemPromptContributions(
|
||||
resolveGpt5SystemPromptContribution({
|
||||
config: params.context.config ?? params.config,
|
||||
providerId: params.context.provider ?? params.provider,
|
||||
modelId: params.context.modelId,
|
||||
}),
|
||||
resolveProviderRuntimePlugin(params)?.resolveSystemPromptContribution?.(params.context) ??
|
||||
undefined,
|
||||
mergeProviderSystemPromptContributions(baseOverlay, providerOverlay),
|
||||
plugin?.resolveSystemPromptContribution?.(params.context) ?? undefined,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -600,6 +600,53 @@ export type ProviderPrepareExtraParamsContext = {
|
||||
thinkingLevel?: ThinkLevel;
|
||||
};
|
||||
|
||||
export type ProviderExtraParamsForTransportContext = Omit<
|
||||
ProviderPrepareExtraParamsContext,
|
||||
"extraParams"
|
||||
> & {
|
||||
model?: ProviderRuntimeModel;
|
||||
transport?: "sse" | "websocket" | "auto";
|
||||
extraParams: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type ProviderExtraParamsForTransportResult = {
|
||||
patch?: Record<string, unknown> | null;
|
||||
};
|
||||
|
||||
export type ProviderResolvePromptOverlayContext = ProviderSystemPromptContributionContext & {
|
||||
baseOverlay?: ProviderSystemPromptContribution;
|
||||
};
|
||||
|
||||
export type ProviderFollowupFallbackRouteContext = {
|
||||
config?: OpenClawConfig;
|
||||
agentDir?: string;
|
||||
workspaceDir?: string;
|
||||
provider: string;
|
||||
modelId: string;
|
||||
payload: ReplyPayload;
|
||||
originatingChannel?: string;
|
||||
originatingTo?: string;
|
||||
originRoutable: boolean;
|
||||
dispatcherAvailable: boolean;
|
||||
};
|
||||
|
||||
export type ProviderFollowupFallbackRouteResult = {
|
||||
route?: "origin" | "dispatcher" | "drop";
|
||||
reason?: string;
|
||||
};
|
||||
|
||||
export type ProviderResolveAuthProfileIdContext = {
|
||||
config?: OpenClawConfig;
|
||||
agentDir?: string;
|
||||
workspaceDir?: string;
|
||||
provider: string;
|
||||
modelId: string;
|
||||
preferredProfileId?: string;
|
||||
lockedProfileId?: string;
|
||||
profileOrder: string[];
|
||||
authStore: AuthProfileStore;
|
||||
};
|
||||
|
||||
export type ProviderReplaySanitizeMode = "full" | "images-only";
|
||||
|
||||
export type ProviderReplayToolCallIdMode = "strict" | "strict9";
|
||||
@@ -1269,6 +1316,15 @@ export type ProviderPlugin = {
|
||||
prepareExtraParams?: (
|
||||
ctx: ProviderPrepareExtraParamsContext,
|
||||
) => Record<string, unknown> | null | undefined;
|
||||
/**
|
||||
* Provider-owned request params after transport/model resolution.
|
||||
*
|
||||
* Use this for transport-family request knobs that should be keyed by the
|
||||
* resolved model API/transport rather than a hardcoded core allowlist.
|
||||
*/
|
||||
extraParamsForTransport?: (
|
||||
ctx: ProviderExtraParamsForTransportContext,
|
||||
) => ProviderExtraParamsForTransportResult | null | undefined;
|
||||
/**
|
||||
* Provider-owned transport factory.
|
||||
*
|
||||
@@ -1464,6 +1520,30 @@ export type ProviderPlugin = {
|
||||
resolveSystemPromptContribution?: (
|
||||
ctx: ProviderSystemPromptContributionContext,
|
||||
) => ProviderSystemPromptContribution | null | undefined;
|
||||
/**
|
||||
* Provider-owned GPT/model prompt overlay seam.
|
||||
*
|
||||
* Runs after OpenClaw's built-in overlay is resolved and before the
|
||||
* provider's regular system-prompt contribution is merged.
|
||||
*/
|
||||
resolvePromptOverlay?: (
|
||||
ctx: ProviderResolvePromptOverlayContext,
|
||||
) => ProviderSystemPromptContribution | null | undefined;
|
||||
/**
|
||||
* Provider-owned fallback route override for model/profile failure handling.
|
||||
*
|
||||
* Return undefined/null to keep OpenClaw's default fallback policy.
|
||||
*/
|
||||
followupFallbackRoute?: (
|
||||
ctx: ProviderFollowupFallbackRouteContext,
|
||||
) => ProviderFollowupFallbackRouteResult | null | undefined;
|
||||
/**
|
||||
* Provider-owned auth profile resolver.
|
||||
*
|
||||
* Return a profile id from the supplied order to prefer it for this attempt;
|
||||
* invalid or missing ids are ignored by core.
|
||||
*/
|
||||
resolveAuthProfileId?: (ctx: ProviderResolveAuthProfileIdContext) => string | null | undefined;
|
||||
/**
|
||||
* Provider-owned final system-prompt transform.
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user