fix: pass clientTools to runEmbeddedAttempt in /v1/responses agent path (#52171)

Merged via squash.

Prepared head SHA: 74519e7da6
Co-authored-by: CharZhou <17255546+CharZhou@users.noreply.github.com>
Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com>
Reviewed-by: @frankekn
This commit is contained in:
CharZhou
2026-03-22 22:05:00 +08:00
committed by GitHub
parent 7066316db8
commit a07dcfde84
4 changed files with 563 additions and 83 deletions

View File

@@ -235,6 +235,7 @@ Docs: https://docs.openclaw.ai
- Android/canvas: recycle captured and scaled snapshot bitmaps so repeated canvas snapshots do not leak native image memory. (#41889) Thanks @Kaneki-x.
- Android/theme: switch status bar icon contrast with the active system theme so Android light mode no longer leaves unreadable light icons over the app header. (#51098) Thanks @goweii.
- Discord/ACP: forward worker abort signals into ACP turns so timed-out Discord jobs cancel the running turn instead of silently leaving the bound ACP session working in the background.
- Gateway/openresponses: preserve assistant commentary and session continuity across hosted-tool `/v1/responses` turns, and emit streamed tool-call payloads before finalization so client tool loops stay resumable. (#52171) Thanks @CharZhou.
### Breaking

View File

@@ -961,6 +961,7 @@ export async function runEmbeddedPiAgent(
skillsSnapshot: params.skillsSnapshot,
prompt,
images: params.images,
clientTools: params.clientTools,
disableTools: params.disableTools,
provider,
modelId,

View File

@@ -1,6 +1,6 @@
import fs from "node:fs/promises";
import path from "node:path";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest";
import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js";
import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js";
import { emitAgentEvent } from "../infra/agent-events.js";
@@ -11,8 +11,24 @@ installGatewayTestHooks({ scope: "suite" });
let enabledServer: Awaited<ReturnType<typeof startServer>>;
let enabledPort: number;
let openResponsesTesting: {
resetResponseSessionState(): void;
storeResponseSessionAt(
responseId: string,
sessionKey: string,
now: number,
scope?: { agentId: string; user?: string; requestedSessionKey?: string },
): void;
lookupResponseSessionAt(
responseId: string | undefined,
now: number,
scope?: { agentId: string; user?: string; requestedSessionKey?: string },
): string | undefined;
getResponseSessionIds(): string[];
};
beforeAll(async () => {
({ __testing: openResponsesTesting } = await import("./openresponses-http.js"));
enabledPort = await getFreePort();
enabledServer = await startServer(enabledPort, { openResponsesEnabled: true });
});
@@ -21,6 +37,10 @@ afterAll(async () => {
await enabledServer.close({ reason: "openresponses enabled suite done" });
});
beforeEach(() => {
openResponsesTesting.resetResponseSessionState();
});
async function startServer(port: number, opts?: { openResponsesEnabled?: boolean }) {
const { startGatewayServer } = await import("./server.js");
const serverOpts = {
@@ -618,6 +638,252 @@ describe("OpenResponses HTTP API (e2e)", () => {
}
});
it("preserves assistant text alongside non-stream function_call output", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "Let me check that." }],
meta: {
stopReason: "tool_calls",
pendingToolCalls: [
{
id: "call_1",
name: "get_weather",
arguments: '{"city":"Taipei"}',
},
],
},
} as never);
const res = await postResponses(port, {
stream: false,
model: "openclaw",
input: "check the weather",
tools: WEATHER_TOOL,
});
expect(res.status).toBe(200);
const json = (await res.json()) as {
status?: string;
output?: Array<Record<string, unknown>>;
};
expect(json.status).toBe("incomplete");
expect(json.output?.map((item) => item.type)).toEqual(["message", "function_call"]);
expect(
((json.output?.[0]?.content as Array<Record<string, unknown>> | undefined)?.[0]?.text as
| string
| undefined) ?? "",
).toBe("Let me check that.");
expect(json.output?.[1]?.name).toBe("get_weather");
await ensureResponseConsumed(res);
});
it("falls back to payload text for streamed function_call responses", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "Let me check that." }],
meta: {
stopReason: "tool_calls",
pendingToolCalls: [
{
id: "call_1",
name: "get_weather",
arguments: '{"city":"Taipei"}',
},
],
},
} as never);
const res = await postResponses(port, {
stream: true,
model: "openclaw",
input: "check the weather",
tools: WEATHER_TOOL,
});
expect(res.status).toBe(200);
const text = await res.text();
const events = parseSseEvents(text);
const outputTextDone = events.find((event) => event.event === "response.output_text.done");
expect(outputTextDone).toBeTruthy();
expect((JSON.parse(outputTextDone?.data ?? "{}") as { text?: string }).text).toBe(
"Let me check that.",
);
const completed = events.find((event) => event.event === "response.completed");
expect(completed).toBeTruthy();
const response = (
JSON.parse(completed?.data ?? "{}") as {
response?: { status?: string; output?: Array<Record<string, unknown>> };
}
).response;
expect(response?.status).toBe("incomplete");
expect(response?.output?.map((item) => item.type)).toEqual(["message", "function_call"]);
expect(
(((response?.output?.[0]?.content as Array<Record<string, unknown>> | undefined) ?? [])[0]
?.text as string | undefined) ?? "",
).toBe("Let me check that.");
expect(response?.output?.[1]?.name).toBe("get_weather");
expect(events.some((event) => event.data === "[DONE]")).toBe(true);
});
it("reuses the prior session when previous_response_id is provided", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "Let me check that." }],
meta: {
stopReason: "tool_calls",
pendingToolCalls: [
{
id: "call_1",
name: "get_weather",
arguments: '{"city":"Taipei"}',
},
],
},
} as never);
const firstResponse = await postResponses(port, {
stream: false,
model: "openclaw",
input: "check the weather",
tools: WEATHER_TOOL,
});
expect(firstResponse.status).toBe(200);
const firstJson = (await firstResponse.json()) as { id?: string };
const firstOpts = (agentCommand.mock.calls[0] as unknown[] | undefined)?.[0] as
| { sessionKey?: string }
| undefined;
expect(firstJson.id).toMatch(/^resp_/);
expect(firstOpts?.sessionKey).toBeTruthy();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "It is sunny." }],
} as never);
const secondResponse = await postResponses(port, {
stream: false,
model: "openclaw",
previous_response_id: firstJson.id,
input: [{ type: "function_call_output", call_id: "call_1", output: "Sunny, 70F." }],
});
expect(secondResponse.status).toBe(200);
const secondOpts = (agentCommand.mock.calls[1] as unknown[] | undefined)?.[0] as
| { sessionKey?: string }
| undefined;
expect(secondOpts?.sessionKey).toBe(firstOpts?.sessionKey);
await ensureResponseConsumed(secondResponse);
});
it("does not reuse prior sessions across different user scopes", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "First turn." }],
} as never);
const firstResponse = await postResponses(port, {
stream: false,
model: "openclaw",
user: "alice",
input: "hello",
});
expect(firstResponse.status).toBe(200);
const firstJson = (await firstResponse.json()) as { id?: string };
const firstOpts = (agentCommand.mock.calls[0] as unknown[] | undefined)?.[0] as
| { sessionKey?: string }
| undefined;
expect(firstOpts?.sessionKey ?? "").toContain("openresponses-user:alice");
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "Second turn." }],
} as never);
const secondResponse = await postResponses(port, {
stream: false,
model: "openclaw",
user: "bob",
previous_response_id: firstJson.id,
input: "hello again",
});
expect(secondResponse.status).toBe(200);
const secondOpts = (agentCommand.mock.calls[1] as unknown[] | undefined)?.[0] as
| { sessionKey?: string }
| undefined;
expect(secondOpts?.sessionKey).not.toBe(firstOpts?.sessionKey);
expect(secondOpts?.sessionKey ?? "").toContain("openresponses-user:bob");
await ensureResponseConsumed(secondResponse);
});
it("stores response session mappings when the response is emitted", async () => {
const port = enabledPort;
agentCommand.mockClear();
let release: ((value: { payloads: Array<{ text: string }> }) => void) | undefined;
agentCommand.mockImplementationOnce(
() =>
new Promise<{ payloads: Array<{ text: string }> }>((resolve) => {
release = resolve;
}) as never,
);
const responsePromise = postResponses(port, {
stream: false,
model: "openclaw",
input: "delayed hello",
});
for (let i = 0; i < 20 && agentCommand.mock.calls.length === 0; i += 1) {
await new Promise((resolve) => setTimeout(resolve, 10));
}
expect(agentCommand.mock.calls).toHaveLength(1);
expect(openResponsesTesting.getResponseSessionIds()).toEqual([]);
release?.({ payloads: [{ text: "hello" }] });
const res = await responsePromise;
expect(res.status).toBe(200);
const json = (await res.json()) as { id?: string };
expect(json.id).toMatch(/^resp_/);
expect(openResponsesTesting.getResponseSessionIds()).toEqual([json.id]);
await ensureResponseConsumed(res);
});
it("caps response session cache by evicting the oldest entries", () => {
for (let i = 0; i < 505; i += 1) {
openResponsesTesting.storeResponseSessionAt(`resp_${i}`, `session_${i}`, i);
}
expect(openResponsesTesting.getResponseSessionIds()).toHaveLength(500);
expect(openResponsesTesting.lookupResponseSessionAt("resp_0", 505)).toBeUndefined();
expect(openResponsesTesting.lookupResponseSessionAt("resp_4", 505)).toBeUndefined();
expect(openResponsesTesting.lookupResponseSessionAt("resp_5", 505)).toBe("session_5");
expect(openResponsesTesting.lookupResponseSessionAt("resp_504", 505)).toBe("session_504");
});
it("does not reuse cached sessions when the user scope changes", () => {
openResponsesTesting.storeResponseSessionAt("resp_1", "session_1", 100, {
agentId: "main",
user: "alice",
});
expect(
openResponsesTesting.lookupResponseSessionAt("resp_1", 101, {
agentId: "main",
user: "alice",
}),
).toBe("session_1");
expect(
openResponsesTesting.lookupResponseSessionAt("resp_1", 101, {
agentId: "main",
user: "bob",
}),
).toBeUndefined();
});
it("blocks unsafe URL-based file/image inputs", async () => {
const port = enabledPort;
agentCommand.mockClear();

View File

@@ -35,7 +35,7 @@ import type { AuthRateLimiter } from "./auth-rate-limit.js";
import type { ResolvedGatewayAuth } from "./auth.js";
import { sendJson, setSseHeaders, writeDone } from "./http-common.js";
import { handleGatewayPostJsonEndpoint } from "./http-endpoint-helpers.js";
import { resolveGatewayRequestContext } from "./http-utils.js";
import { getHeader, resolveGatewayRequestContext } from "./http-utils.js";
import { normalizeInputHostnameAllowlist } from "./input-allowlist.js";
import {
CreateResponseBodySchema,
@@ -59,6 +59,139 @@ type OpenResponsesHttpOptions = {
const DEFAULT_BODY_BYTES = 20 * 1024 * 1024;
const DEFAULT_MAX_URL_PARTS = 8;
// In-memory map from responseId -> sessionKey for previous_response_id continuity.
// Entries are evicted after 30 minutes to bound memory usage.
const RESPONSE_SESSION_TTL_MS = 30 * 60 * 1000;
const MAX_RESPONSE_SESSION_ENTRIES = 500;
type ResponseSessionScope = {
agentId: string;
user?: string;
requestedSessionKey?: string;
};
type ResponseSessionEntry = ResponseSessionScope & {
sessionKey: string;
ts: number;
};
const responseSessionMap = new Map<string, ResponseSessionEntry>();
function normalizeResponseSessionScope(scope: ResponseSessionScope): ResponseSessionScope {
const user = scope.user?.trim();
const requestedSessionKey = scope.requestedSessionKey?.trim();
return {
agentId: scope.agentId,
user: user || undefined,
requestedSessionKey: requestedSessionKey || undefined,
};
}
function createResponseSessionScope(params: {
req: IncomingMessage;
agentId: string;
user?: string;
}): ResponseSessionScope {
return normalizeResponseSessionScope({
agentId: params.agentId,
user: params.user,
requestedSessionKey: getHeader(params.req, "x-openclaw-session-key"),
});
}
function matchesResponseSessionScope(
entry: ResponseSessionEntry,
scope: ResponseSessionScope,
): boolean {
return (
entry.agentId === scope.agentId &&
entry.user === scope.user &&
entry.requestedSessionKey === scope.requestedSessionKey
);
}
function pruneExpiredResponseSessions(now: number) {
while (responseSessionMap.size > 0) {
const oldest = responseSessionMap.entries().next().value;
if (!oldest) {
return;
}
const [oldestKey, oldestValue] = oldest;
if (now - oldestValue.ts <= RESPONSE_SESSION_TTL_MS) {
return;
}
responseSessionMap.delete(oldestKey);
}
}
function evictOverflowResponseSessions() {
while (responseSessionMap.size > MAX_RESPONSE_SESSION_ENTRIES) {
const oldestKey = responseSessionMap.keys().next().value;
if (!oldestKey) {
return;
}
responseSessionMap.delete(oldestKey);
}
}
function storeResponseSession(
responseId: string,
sessionKey: string,
scope: ResponseSessionScope,
now = Date.now(),
) {
// Reinsert existing keys so the map stays ordered by freshest timestamp.
responseSessionMap.delete(responseId);
responseSessionMap.set(responseId, { ...scope, sessionKey, ts: now });
pruneExpiredResponseSessions(now);
evictOverflowResponseSessions();
}
function lookupResponseSession(
responseId: string | undefined,
scope: ResponseSessionScope,
now = Date.now(),
): string | undefined {
if (!responseId) {
return undefined;
}
const entry = responseSessionMap.get(responseId);
if (!entry) {
return undefined;
}
if (now - entry.ts > RESPONSE_SESSION_TTL_MS) {
responseSessionMap.delete(responseId);
return undefined;
}
if (!matchesResponseSessionScope(entry, scope)) {
return undefined;
}
return entry.sessionKey;
}
export const __testing = {
resetResponseSessionState() {
responseSessionMap.clear();
},
storeResponseSessionAt(
responseId: string,
sessionKey: string,
now: number,
scope: ResponseSessionScope = { agentId: "main" },
) {
storeResponseSession(responseId, sessionKey, normalizeResponseSessionScope(scope), now);
},
lookupResponseSessionAt(
responseId: string | undefined,
now: number,
scope: ResponseSessionScope = { agentId: "main" },
) {
return lookupResponseSession(responseId, normalizeResponseSessionScope(scope), now);
},
getResponseSessionIds() {
return [...responseSessionMap.keys()];
},
};
function writeSseEvent(res: ServerResponse, event: StreamingEvent) {
res.write(`event: ${event.type}\n`);
res.write(`data: ${JSON.stringify(event)}\n\n`);
@@ -232,6 +365,23 @@ function createAssistantOutputItem(params: {
};
}
function createFunctionCallOutputItem(params: {
id: string;
callId: string;
name: string;
arguments: string;
status?: "in_progress" | "completed";
}): OutputItem {
return {
type: "function_call",
id: params.id,
call_id: params.callId,
name: params.name,
arguments: params.arguments,
status: params.status,
};
}
async function runResponsesAgentCommand(params: {
message: string;
images: ImageContent[];
@@ -437,7 +587,7 @@ export async function handleOpenResponsesHttpRequest(
});
return true;
}
const { sessionKey, messageChannel } = resolveGatewayRequestContext({
const resolved = resolveGatewayRequestContext({
req,
model,
user,
@@ -445,6 +595,19 @@ export async function handleOpenResponsesHttpRequest(
defaultMessageChannel: "webchat",
useMessageChannelHeader: false,
});
const responseSessionScope = createResponseSessionScope({
req,
agentId: resolved.agentId,
user,
});
// Resolve session key: reuse previous_response_id only when it matches the
// same agent/user/requested-session scope as the current request.
const previousSessionKey = lookupResponseSession(
payload.previous_response_id,
responseSessionScope,
);
const sessionKey = previousSessionKey ?? resolved.sessionKey;
const messageChannel = resolved.messageChannel;
// Build prompt from input
const prompt = buildAgentPrompt(payload.input);
@@ -473,6 +636,8 @@ export async function handleOpenResponsesHttpRequest(
}
const responseId = `resp_${randomUUID()}`;
const rememberResponseSession = () =>
storeResponseSession(responseId, sessionKey, responseSessionScope);
const outputItemId = `msg_${randomUUID()}`;
const deps = createDefaultDeps();
const streamParams =
@@ -499,25 +664,46 @@ export async function handleOpenResponsesHttpRequest(
const meta = (result as { meta?: unknown } | null)?.meta;
const { stopReason, pendingToolCalls } = resolveStopReasonAndPendingToolCalls(meta);
// If agent called a client tool, return function_call instead of text
// If agent called a client tool, return function_call (and any assistant text) to caller
if (stopReason === "tool_calls" && pendingToolCalls && pendingToolCalls.length > 0) {
const functionCall = pendingToolCalls[0];
const functionCallItemId = `call_${randomUUID()}`;
const assistantText =
Array.isArray(payloads) && payloads.length > 0
? payloads
.map((p) => (typeof p.text === "string" ? p.text : ""))
.filter(Boolean)
.join("\n\n")
: "";
const output: OutputItem[] = [];
if (assistantText) {
output.push(
createAssistantOutputItem({
id: outputItemId,
text: assistantText,
status: "completed",
}),
);
}
output.push(
createFunctionCallOutputItem({
id: functionCallItemId,
callId: functionCall.id,
name: functionCall.name,
arguments: functionCall.arguments,
}),
);
const response = createResponseResource({
id: responseId,
model,
status: "incomplete",
output: [
{
type: "function_call",
id: functionCallItemId,
call_id: functionCall.id,
name: functionCall.name,
arguments: functionCall.arguments,
},
],
output,
usage,
});
rememberResponseSession();
sendJson(res, 200, response);
return true;
}
@@ -540,6 +726,7 @@ export async function handleOpenResponsesHttpRequest(
usage,
});
rememberResponseSession();
sendJson(res, 200, response);
} catch (err) {
logWarn(`openresponses: non-stream response failed: ${String(err)}`);
@@ -550,6 +737,7 @@ export async function handleOpenResponsesHttpRequest(
output: [],
error: { code: "api_error", message: "internal error" },
});
rememberResponseSession();
sendJson(res, 500, response);
}
return true;
@@ -619,6 +807,7 @@ export async function handleOpenResponsesHttpRequest(
usage,
});
rememberResponseSession();
writeSseEvent(res, { type: "response.completed", response: finalResponse });
writeDone(res);
res.end();
@@ -722,84 +911,106 @@ export async function handleOpenResponsesHttpRequest(
});
finalUsage = extractUsageFromResult(result);
// Check for pending client tool calls BEFORE maybeFinalize() because the
// lifecycle:end event may already have requested finalization.
const resultAny = result as { payloads?: Array<{ text?: string }>; meta?: unknown };
const meta = resultAny.meta;
const { stopReason, pendingToolCalls } = resolveStopReasonAndPendingToolCalls(meta);
if (
!closed &&
stopReason === "tool_calls" &&
pendingToolCalls &&
pendingToolCalls.length > 0
) {
const functionCall = pendingToolCalls[0];
const usage = finalUsage ?? createEmptyUsage();
const finalText =
accumulatedText ||
(Array.isArray(resultAny.payloads)
? resultAny.payloads
.map((p) => (typeof p.text === "string" ? p.text : ""))
.filter(Boolean)
.join("\n\n")
: "");
writeSseEvent(res, {
type: "response.output_text.done",
item_id: outputItemId,
output_index: 0,
content_index: 0,
text: finalText,
});
writeSseEvent(res, {
type: "response.content_part.done",
item_id: outputItemId,
output_index: 0,
content_index: 0,
part: { type: "output_text", text: finalText },
});
const completedItem = createAssistantOutputItem({
id: outputItemId,
text: finalText,
status: "completed",
});
writeSseEvent(res, {
type: "response.output_item.done",
output_index: 0,
item: completedItem,
});
const functionCallItemId = `call_${randomUUID()}`;
const functionCallItem = createFunctionCallOutputItem({
id: functionCallItemId,
callId: functionCall.id,
name: functionCall.name,
arguments: functionCall.arguments,
});
writeSseEvent(res, {
type: "response.output_item.added",
output_index: 1,
item: functionCallItem,
});
const completedFunctionCallItem = createFunctionCallOutputItem({
id: functionCallItemId,
callId: functionCall.id,
name: functionCall.name,
arguments: functionCall.arguments,
status: "completed",
});
writeSseEvent(res, {
type: "response.output_item.done",
output_index: 1,
item: completedFunctionCallItem,
});
const incompleteResponse = createResponseResource({
id: responseId,
model,
status: "incomplete",
output: [completedItem, functionCallItem],
usage,
});
closed = true;
unsubscribe();
rememberResponseSession();
writeSseEvent(res, { type: "response.completed", response: incompleteResponse });
writeDone(res);
res.end();
return;
}
maybeFinalize();
if (closed) {
return;
}
// Fallback: if no streaming deltas were received, send the full response
// Fallback: if no streaming deltas were received, send the full response as text
if (!sawAssistantDelta) {
const resultAny = result as { payloads?: Array<{ text?: string }>; meta?: unknown };
const payloads = resultAny.payloads;
const meta = resultAny.meta;
const { stopReason, pendingToolCalls } = resolveStopReasonAndPendingToolCalls(meta);
// If agent called a client tool, emit function_call instead of text
if (stopReason === "tool_calls" && pendingToolCalls && pendingToolCalls.length > 0) {
const functionCall = pendingToolCalls[0];
const usage = finalUsage ?? createEmptyUsage();
writeSseEvent(res, {
type: "response.output_text.done",
item_id: outputItemId,
output_index: 0,
content_index: 0,
text: "",
});
writeSseEvent(res, {
type: "response.content_part.done",
item_id: outputItemId,
output_index: 0,
content_index: 0,
part: { type: "output_text", text: "" },
});
const completedItem = createAssistantOutputItem({
id: outputItemId,
text: "",
status: "completed",
});
writeSseEvent(res, {
type: "response.output_item.done",
output_index: 0,
item: completedItem,
});
const functionCallItemId = `call_${randomUUID()}`;
const functionCallItem = {
type: "function_call" as const,
id: functionCallItemId,
call_id: functionCall.id,
name: functionCall.name,
arguments: functionCall.arguments,
};
writeSseEvent(res, {
type: "response.output_item.added",
output_index: 1,
item: functionCallItem,
});
writeSseEvent(res, {
type: "response.output_item.done",
output_index: 1,
item: { ...functionCallItem, status: "completed" as const },
});
const incompleteResponse = createResponseResource({
id: responseId,
model,
status: "incomplete",
output: [completedItem, functionCallItem],
usage,
});
closed = true;
unsubscribe();
writeSseEvent(res, { type: "response.completed", response: incompleteResponse });
writeDone(res);
res.end();
return;
}
const content =
Array.isArray(payloads) && payloads.length > 0
? payloads
@@ -835,6 +1046,7 @@ export async function handleOpenResponsesHttpRequest(
usage: finalUsage,
});
rememberResponseSession();
writeSseEvent(res, { type: "response.failed", response: errorResponse });
emitAgentEvent({
runId: responseId,