fix: preflight OpenResponses tool conflicts before SSE

Move client-tool conflict validation ahead of the OpenResponses stream/non-
stream split so invalid `clientTools` fail before any response body or SSE
event is emitted.

Thread the validated tool set through the gateway and embedded runner entry
points so `/v1/responses` and embedded agent runs share the same preflight
behavior.

Add HTTP and agent regression coverage for duplicate names, alias collisions,
and the no-partial-SSE contract.
This commit is contained in:
Davanum Srinivas
2026-03-14 23:04:29 -04:00
committed by Vincent Koc
parent 7a526ab89a
commit c7b8414fe6
8 changed files with 129 additions and 51 deletions

View File

@@ -888,6 +888,7 @@ export async function runEmbeddedPiAgent(
prompt,
images: params.images,
disableTools: params.disableTools,
onPreflightPassed: params.onPreflightPassed,
provider,
modelId,
model: applyLocalNoAuthHeaderOverride(effectiveModel, apiKeyInfo),

View File

@@ -1568,6 +1568,7 @@ export async function runEmbeddedAttempt(
if (clientToolNameConflicts.length > 0) {
throw createClientToolNameConflictError(clientToolNameConflicts);
}
await params.onPreflightPassed?.();
const allowedToolNames = collectAllowedToolNames({
tools,
clientTools,

View File

@@ -72,6 +72,8 @@ export type RunEmbeddedPiAgentParams = {
images?: ImageContent[];
/** Optional client-provided tools (OpenResponses hosted tools). */
clientTools?: ClientToolDefinition[];
/** Invoked after deterministic tool preflight checks succeed for an attempt. */
onPreflightPassed?: () => void | Promise<void>;
/** Disable built-in tools for this run (LLM-only mode). */
disableTools?: boolean;
provider?: string;

View File

@@ -415,6 +415,20 @@ describe("agentCommand", () => {
});
});
it("passes ingress preflight callbacks through to embedded runs", async () => {
await withTempHome(async (home) => {
const store = path.join(home, "sessions.json");
mockConfig(home, store);
const onPreflightPassed = vi.fn();
await agentCommandFromIngress(
{ message: "hi", to: "+1555", senderIsOwner: false, onPreflightPassed },
runtime,
);
const ingressCall = vi.mocked(runEmbeddedPiAgent).mock.calls.at(-1)?.[0];
expect(ingressCall?.onPreflightPassed).toBe(onPreflightPassed);
});
});
it("resumes when session-id is provided", async () => {
await withTempHome(async (home) => {
const store = path.join(home, "sessions.json");

View File

@@ -480,6 +480,7 @@ function runAgentAttempt(params: {
prompt: effectivePrompt,
images: params.isFallbackRetry ? undefined : params.opts.images,
clientTools: params.opts.clientTools,
onPreflightPassed: params.opts.onPreflightPassed,
provider: params.providerOverride,
model: params.modelOverride,
authProfileId,

View File

@@ -37,6 +37,8 @@ export type AgentCommandOpts = {
images?: ImageContent[];
/** Optional client-provided tools (OpenResponses hosted tools). */
clientTools?: ClientToolDefinition[];
/** Invoked after embedded-runner tool preflight passes for this request. */
onPreflightPassed?: () => void | Promise<void>;
/** Agent id override (must exist in config). */
agentId?: string;
to?: string;

View File

@@ -58,6 +58,12 @@ async function postResponses(port: number, body: unknown, headers?: Record<strin
return res;
}
async function resolveOpenResponsesStreamPreflight(opts: unknown): Promise<void> {
await (
opts as { onPreflightPassed?: (() => void | Promise<void>) | undefined } | undefined
)?.onPreflightPassed?.();
}
function parseSseEvents(text: string): Array<{ event?: string; data: string }> {
const events: Array<{ event?: string; data: string }> = [];
const lines = text.split("\n");
@@ -501,13 +507,15 @@ describe("OpenResponses HTTP API (e2e)", () => {
const port = enabledPort;
try {
agentCommand.mockClear();
agentCommand.mockImplementationOnce((async (opts: unknown) =>
buildAssistantDeltaResult({
agentCommand.mockImplementationOnce((async (opts: unknown) => {
await resolveOpenResponsesStreamPreflight(opts);
return buildAssistantDeltaResult({
opts,
emit: emitAgentEvent,
deltas: ["he", "llo"],
text: "hello",
})) as never);
});
}) as never);
const resDelta = await postResponses(port, {
stream: true,
@@ -541,9 +549,12 @@ describe("OpenResponses HTTP API (e2e)", () => {
expect(deltas).toBe("hello");
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "hello" }],
} as never);
agentCommand.mockImplementationOnce((async (opts: unknown) => {
await resolveOpenResponsesStreamPreflight(opts);
return {
payloads: [{ text: "hello" }],
};
}) as never);
const resFallback = await postResponses(port, {
stream: true,
@@ -556,9 +567,12 @@ describe("OpenResponses HTTP API (e2e)", () => {
expect(fallbackText).toContain("hello");
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "hello" }],
} as never);
agentCommand.mockImplementationOnce((async (opts: unknown) => {
await resolveOpenResponsesStreamPreflight(opts);
return {
payloads: [{ text: "hello" }],
};
}) as never);
const resTypeMatch = await postResponses(port, {
stream: true,
@@ -581,6 +595,36 @@ describe("OpenResponses HTTP API (e2e)", () => {
}
});
it("rejects conflicting OpenResponses client tools before streaming starts", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockRejectedValueOnce(new Error("client tool name conflict: bash"));
const res = await postResponses(port, {
stream: true,
model: "openclaw",
input: "hi",
tools: [{ type: "function", function: { name: "bash", description: "shell" } }],
});
await expectInvalidRequest(res, /invalid tool configuration/i);
expect(res.headers.get("content-type") ?? "").not.toContain("text/event-stream");
});
it("returns invalid-request JSON for conflicting OpenResponses client tools", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockRejectedValueOnce(new Error("client tool name conflict: bash"));
const res = await postResponses(port, {
model: "openclaw",
input: "hi",
tools: [{ type: "function", function: { name: "bash", description: "shell" } }],
});
await expectInvalidRequest(res, /invalid tool configuration/i);
});
it("blocks unsafe URL-based file/image inputs", async () => {
const port = enabledPort;
agentCommand.mockClear();

View File

@@ -236,6 +236,7 @@ async function runResponsesAgentCommand(params: {
message: string;
images: ImageContent[];
clientTools: ClientToolDefinition[];
onPreflightPassed?: () => void | Promise<void>;
extraSystemPrompt: string;
streamParams: { maxTokens: number } | undefined;
sessionKey: string;
@@ -248,6 +249,7 @@ async function runResponsesAgentCommand(params: {
message: params.message,
images: params.images.length > 0 ? params.images : undefined,
clientTools: params.clientTools.length > 0 ? params.clientTools : undefined,
onPreflightPassed: params.onPreflightPassed,
extraSystemPrompt: params.extraSystemPrompt || undefined,
streamParams: params.streamParams ?? undefined,
sessionKey: params.sessionKey,
@@ -534,14 +536,9 @@ export async function handleOpenResponsesHttpRequest(
} catch (err) {
logWarn(`openresponses: non-stream response failed: ${String(err)}`);
if (isClientToolNameConflictError(err)) {
const response = createResponseResource({
id: responseId,
model,
status: "failed",
output: [],
error: { code: "invalid_request_error", message: "invalid tool configuration" },
sendJson(res, 400, {
error: { message: "invalid tool configuration", type: "invalid_request_error" },
});
sendJson(res, 400, response);
return true;
}
const response = createResponseResource({
@@ -560,14 +557,47 @@ export async function handleOpenResponsesHttpRequest(
// Streaming mode
// ─────────────────────────────────────────────────────────────────────────
setSseHeaders(res);
let accumulatedText = "";
let sawAssistantDelta = false;
let closed = false;
let sseStarted = false;
let unsubscribe = () => {};
let finalUsage: Usage | undefined;
let finalizeRequested: { status: ResponseResource["status"]; text: string } | null = null;
const initialResponse = createResponseResource({
id: responseId,
model,
status: "in_progress",
output: [],
});
const outputItem = createAssistantOutputItem({
id: outputItemId,
text: "",
status: "in_progress",
});
const startStream = () => {
if (closed || sseStarted) {
return;
}
sseStarted = true;
setSseHeaders(res);
writeSseEvent(res, { type: "response.created", response: initialResponse });
writeSseEvent(res, { type: "response.in_progress", response: initialResponse });
writeSseEvent(res, {
type: "response.output_item.added",
output_index: 0,
item: outputItem,
});
writeSseEvent(res, {
type: "response.content_part.added",
item_id: outputItemId,
output_index: 0,
content_index: 0,
part: { type: "output_text", text: "" },
});
};
const maybeFinalize = () => {
if (closed) {
@@ -579,6 +609,8 @@ export async function handleOpenResponsesHttpRequest(
if (!finalUsage) {
return;
}
startStream();
const usage = finalUsage;
closed = true;
@@ -633,39 +665,6 @@ export async function handleOpenResponsesHttpRequest(
maybeFinalize();
};
// Send initial events
const initialResponse = createResponseResource({
id: responseId,
model,
status: "in_progress",
output: [],
});
writeSseEvent(res, { type: "response.created", response: initialResponse });
writeSseEvent(res, { type: "response.in_progress", response: initialResponse });
// Add output item
const outputItem = createAssistantOutputItem({
id: outputItemId,
text: "",
status: "in_progress",
});
writeSseEvent(res, {
type: "response.output_item.added",
output_index: 0,
item: outputItem,
});
// Add content part
writeSseEvent(res, {
type: "response.content_part.added",
item_id: outputItemId,
output_index: 0,
content_index: 0,
part: { type: "output_text", text: "" },
});
unsubscribe = onAgentEvent((evt) => {
if (evt.runId !== responseId) {
return;
@@ -682,6 +681,7 @@ export async function handleOpenResponsesHttpRequest(
sawAssistantDelta = true;
accumulatedText += content;
startStream();
writeSseEvent(res, {
type: "response.output_text.delta",
@@ -714,6 +714,7 @@ export async function handleOpenResponsesHttpRequest(
message: prompt.message,
images,
clientTools: resolvedClientTools,
onPreflightPassed: startStream,
extraSystemPrompt,
streamParams,
sessionKey,
@@ -740,6 +741,7 @@ export async function handleOpenResponsesHttpRequest(
if (stopReason === "tool_calls" && pendingToolCalls && pendingToolCalls.length > 0) {
const functionCall = pendingToolCalls[0];
const usage = finalUsage ?? createEmptyUsage();
startStream();
writeSseEvent(res, {
type: "response.output_text.done",
@@ -811,6 +813,7 @@ export async function handleOpenResponsesHttpRequest(
accumulatedText = content;
sawAssistantDelta = true;
startStream();
writeSseEvent(res, {
type: "response.output_text.delta",
@@ -826,7 +829,17 @@ export async function handleOpenResponsesHttpRequest(
return;
}
if (!sseStarted && isClientToolNameConflictError(err)) {
closed = true;
unsubscribe();
sendJson(res, 400, {
error: { message: "invalid tool configuration", type: "invalid_request_error" },
});
return;
}
finalUsage = finalUsage ?? createEmptyUsage();
startStream();
if (isClientToolNameConflictError(err)) {
const errorResponse = createResponseResource({
id: responseId,