From c7b8414fe6be93033dbb8d83667da7c5740b20fb Mon Sep 17 00:00:00 2001 From: Davanum Srinivas Date: Sat, 14 Mar 2026 23:04:29 -0400 Subject: [PATCH] fix: preflight OpenResponses tool conflicts before SSE Move client-tool conflict validation ahead of the OpenResponses stream/non- stream split so invalid `clientTools` fail before any response body or SSE event is emitted. Thread the validated tool set through the gateway and embedded runner entry points so `/v1/responses` and embedded agent runs share the same preflight behavior. Add HTTP and agent regression coverage for duplicate names, alias collisions, and the no-partial-SSE contract. --- src/agents/pi-embedded-runner/run.ts | 1 + src/agents/pi-embedded-runner/run/attempt.ts | 1 + src/agents/pi-embedded-runner/run/params.ts | 2 + src/commands/agent.test.ts | 14 +++ src/commands/agent.ts | 1 + src/commands/agent/types.ts | 2 + src/gateway/openresponses-http.test.ts | 62 +++++++++++-- src/gateway/openresponses-http.ts | 97 +++++++++++--------- 8 files changed, 129 insertions(+), 51 deletions(-) diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 65d87712ca8..764622c88ab 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -888,6 +888,7 @@ export async function runEmbeddedPiAgent( prompt, images: params.images, disableTools: params.disableTools, + onPreflightPassed: params.onPreflightPassed, provider, modelId, model: applyLocalNoAuthHeaderOverride(effectiveModel, apiKeyInfo), diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 2dc1adf9898..0bd442834a3 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1568,6 +1568,7 @@ export async function runEmbeddedAttempt( if (clientToolNameConflicts.length > 0) { throw createClientToolNameConflictError(clientToolNameConflicts); } + await params.onPreflightPassed?.(); const allowedToolNames = collectAllowedToolNames({ tools, clientTools, diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index ba69d991dd9..75ec684780d 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -72,6 +72,8 @@ export type RunEmbeddedPiAgentParams = { images?: ImageContent[]; /** Optional client-provided tools (OpenResponses hosted tools). */ clientTools?: ClientToolDefinition[]; + /** Invoked after deterministic tool preflight checks succeed for an attempt. */ + onPreflightPassed?: () => void | Promise; /** Disable built-in tools for this run (LLM-only mode). */ disableTools?: boolean; provider?: string; diff --git a/src/commands/agent.test.ts b/src/commands/agent.test.ts index 5b4fc2c9040..23c2647d2bc 100644 --- a/src/commands/agent.test.ts +++ b/src/commands/agent.test.ts @@ -415,6 +415,20 @@ describe("agentCommand", () => { }); }); + it("passes ingress preflight callbacks through to embedded runs", async () => { + await withTempHome(async (home) => { + const store = path.join(home, "sessions.json"); + mockConfig(home, store); + const onPreflightPassed = vi.fn(); + await agentCommandFromIngress( + { message: "hi", to: "+1555", senderIsOwner: false, onPreflightPassed }, + runtime, + ); + const ingressCall = vi.mocked(runEmbeddedPiAgent).mock.calls.at(-1)?.[0]; + expect(ingressCall?.onPreflightPassed).toBe(onPreflightPassed); + }); + }); + it("resumes when session-id is provided", async () => { await withTempHome(async (home) => { const store = path.join(home, "sessions.json"); diff --git a/src/commands/agent.ts b/src/commands/agent.ts index ab690b37666..c7fb4910db5 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -480,6 +480,7 @@ function runAgentAttempt(params: { prompt: effectivePrompt, images: params.isFallbackRetry ? undefined : params.opts.images, clientTools: params.opts.clientTools, + onPreflightPassed: params.opts.onPreflightPassed, provider: params.providerOverride, model: params.modelOverride, authProfileId, diff --git a/src/commands/agent/types.ts b/src/commands/agent/types.ts index 66d0209bdfb..80a682dc6f4 100644 --- a/src/commands/agent/types.ts +++ b/src/commands/agent/types.ts @@ -37,6 +37,8 @@ export type AgentCommandOpts = { images?: ImageContent[]; /** Optional client-provided tools (OpenResponses hosted tools). */ clientTools?: ClientToolDefinition[]; + /** Invoked after embedded-runner tool preflight passes for this request. */ + onPreflightPassed?: () => void | Promise; /** Agent id override (must exist in config). */ agentId?: string; to?: string; diff --git a/src/gateway/openresponses-http.test.ts b/src/gateway/openresponses-http.test.ts index 3f6cb43917d..a8f2b2067a9 100644 --- a/src/gateway/openresponses-http.test.ts +++ b/src/gateway/openresponses-http.test.ts @@ -58,6 +58,12 @@ async function postResponses(port: number, body: unknown, headers?: Record { + await ( + opts as { onPreflightPassed?: (() => void | Promise) | undefined } | undefined + )?.onPreflightPassed?.(); +} + function parseSseEvents(text: string): Array<{ event?: string; data: string }> { const events: Array<{ event?: string; data: string }> = []; const lines = text.split("\n"); @@ -501,13 +507,15 @@ describe("OpenResponses HTTP API (e2e)", () => { const port = enabledPort; try { agentCommand.mockClear(); - agentCommand.mockImplementationOnce((async (opts: unknown) => - buildAssistantDeltaResult({ + agentCommand.mockImplementationOnce((async (opts: unknown) => { + await resolveOpenResponsesStreamPreflight(opts); + return buildAssistantDeltaResult({ opts, emit: emitAgentEvent, deltas: ["he", "llo"], text: "hello", - })) as never); + }); + }) as never); const resDelta = await postResponses(port, { stream: true, @@ -541,9 +549,12 @@ describe("OpenResponses HTTP API (e2e)", () => { expect(deltas).toBe("hello"); agentCommand.mockClear(); - agentCommand.mockResolvedValueOnce({ - payloads: [{ text: "hello" }], - } as never); + agentCommand.mockImplementationOnce((async (opts: unknown) => { + await resolveOpenResponsesStreamPreflight(opts); + return { + payloads: [{ text: "hello" }], + }; + }) as never); const resFallback = await postResponses(port, { stream: true, @@ -556,9 +567,12 @@ describe("OpenResponses HTTP API (e2e)", () => { expect(fallbackText).toContain("hello"); agentCommand.mockClear(); - agentCommand.mockResolvedValueOnce({ - payloads: [{ text: "hello" }], - } as never); + agentCommand.mockImplementationOnce((async (opts: unknown) => { + await resolveOpenResponsesStreamPreflight(opts); + return { + payloads: [{ text: "hello" }], + }; + }) as never); const resTypeMatch = await postResponses(port, { stream: true, @@ -581,6 +595,36 @@ describe("OpenResponses HTTP API (e2e)", () => { } }); + it("rejects conflicting OpenResponses client tools before streaming starts", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockRejectedValueOnce(new Error("client tool name conflict: bash")); + + const res = await postResponses(port, { + stream: true, + model: "openclaw", + input: "hi", + tools: [{ type: "function", function: { name: "bash", description: "shell" } }], + }); + + await expectInvalidRequest(res, /invalid tool configuration/i); + expect(res.headers.get("content-type") ?? "").not.toContain("text/event-stream"); + }); + + it("returns invalid-request JSON for conflicting OpenResponses client tools", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockRejectedValueOnce(new Error("client tool name conflict: bash")); + + const res = await postResponses(port, { + model: "openclaw", + input: "hi", + tools: [{ type: "function", function: { name: "bash", description: "shell" } }], + }); + + await expectInvalidRequest(res, /invalid tool configuration/i); + }); + it("blocks unsafe URL-based file/image inputs", async () => { const port = enabledPort; agentCommand.mockClear(); diff --git a/src/gateway/openresponses-http.ts b/src/gateway/openresponses-http.ts index f2dd15f684d..7de65ab354b 100644 --- a/src/gateway/openresponses-http.ts +++ b/src/gateway/openresponses-http.ts @@ -236,6 +236,7 @@ async function runResponsesAgentCommand(params: { message: string; images: ImageContent[]; clientTools: ClientToolDefinition[]; + onPreflightPassed?: () => void | Promise; extraSystemPrompt: string; streamParams: { maxTokens: number } | undefined; sessionKey: string; @@ -248,6 +249,7 @@ async function runResponsesAgentCommand(params: { message: params.message, images: params.images.length > 0 ? params.images : undefined, clientTools: params.clientTools.length > 0 ? params.clientTools : undefined, + onPreflightPassed: params.onPreflightPassed, extraSystemPrompt: params.extraSystemPrompt || undefined, streamParams: params.streamParams ?? undefined, sessionKey: params.sessionKey, @@ -534,14 +536,9 @@ export async function handleOpenResponsesHttpRequest( } catch (err) { logWarn(`openresponses: non-stream response failed: ${String(err)}`); if (isClientToolNameConflictError(err)) { - const response = createResponseResource({ - id: responseId, - model, - status: "failed", - output: [], - error: { code: "invalid_request_error", message: "invalid tool configuration" }, + sendJson(res, 400, { + error: { message: "invalid tool configuration", type: "invalid_request_error" }, }); - sendJson(res, 400, response); return true; } const response = createResponseResource({ @@ -560,14 +557,47 @@ export async function handleOpenResponsesHttpRequest( // Streaming mode // ───────────────────────────────────────────────────────────────────────── - setSseHeaders(res); - let accumulatedText = ""; let sawAssistantDelta = false; let closed = false; + let sseStarted = false; let unsubscribe = () => {}; let finalUsage: Usage | undefined; let finalizeRequested: { status: ResponseResource["status"]; text: string } | null = null; + const initialResponse = createResponseResource({ + id: responseId, + model, + status: "in_progress", + output: [], + }); + const outputItem = createAssistantOutputItem({ + id: outputItemId, + text: "", + status: "in_progress", + }); + + const startStream = () => { + if (closed || sseStarted) { + return; + } + + sseStarted = true; + setSseHeaders(res); + writeSseEvent(res, { type: "response.created", response: initialResponse }); + writeSseEvent(res, { type: "response.in_progress", response: initialResponse }); + writeSseEvent(res, { + type: "response.output_item.added", + output_index: 0, + item: outputItem, + }); + writeSseEvent(res, { + type: "response.content_part.added", + item_id: outputItemId, + output_index: 0, + content_index: 0, + part: { type: "output_text", text: "" }, + }); + }; const maybeFinalize = () => { if (closed) { @@ -579,6 +609,8 @@ export async function handleOpenResponsesHttpRequest( if (!finalUsage) { return; } + + startStream(); const usage = finalUsage; closed = true; @@ -633,39 +665,6 @@ export async function handleOpenResponsesHttpRequest( maybeFinalize(); }; - // Send initial events - const initialResponse = createResponseResource({ - id: responseId, - model, - status: "in_progress", - output: [], - }); - - writeSseEvent(res, { type: "response.created", response: initialResponse }); - writeSseEvent(res, { type: "response.in_progress", response: initialResponse }); - - // Add output item - const outputItem = createAssistantOutputItem({ - id: outputItemId, - text: "", - status: "in_progress", - }); - - writeSseEvent(res, { - type: "response.output_item.added", - output_index: 0, - item: outputItem, - }); - - // Add content part - writeSseEvent(res, { - type: "response.content_part.added", - item_id: outputItemId, - output_index: 0, - content_index: 0, - part: { type: "output_text", text: "" }, - }); - unsubscribe = onAgentEvent((evt) => { if (evt.runId !== responseId) { return; @@ -682,6 +681,7 @@ export async function handleOpenResponsesHttpRequest( sawAssistantDelta = true; accumulatedText += content; + startStream(); writeSseEvent(res, { type: "response.output_text.delta", @@ -714,6 +714,7 @@ export async function handleOpenResponsesHttpRequest( message: prompt.message, images, clientTools: resolvedClientTools, + onPreflightPassed: startStream, extraSystemPrompt, streamParams, sessionKey, @@ -740,6 +741,7 @@ export async function handleOpenResponsesHttpRequest( if (stopReason === "tool_calls" && pendingToolCalls && pendingToolCalls.length > 0) { const functionCall = pendingToolCalls[0]; const usage = finalUsage ?? createEmptyUsage(); + startStream(); writeSseEvent(res, { type: "response.output_text.done", @@ -811,6 +813,7 @@ export async function handleOpenResponsesHttpRequest( accumulatedText = content; sawAssistantDelta = true; + startStream(); writeSseEvent(res, { type: "response.output_text.delta", @@ -826,7 +829,17 @@ export async function handleOpenResponsesHttpRequest( return; } + if (!sseStarted && isClientToolNameConflictError(err)) { + closed = true; + unsubscribe(); + sendJson(res, 400, { + error: { message: "invalid tool configuration", type: "invalid_request_error" }, + }); + return; + } + finalUsage = finalUsage ?? createEmptyUsage(); + startStream(); if (isClientToolNameConflictError(err)) { const errorResponse = createResponseResource({ id: responseId,