From a07dcfde84b6b13e47e7f2aca378280d4ed05313 Mon Sep 17 00:00:00 2001 From: CharZhou <17255546+CharZhou@users.noreply.github.com> Date: Sun, 22 Mar 2026 22:05:00 +0800 Subject: [PATCH] fix: pass clientTools to runEmbeddedAttempt in /v1/responses agent path (#52171) Merged via squash. Prepared head SHA: 74519e7da6d275da5625c0ada850866240bf912a Co-authored-by: CharZhou <17255546+CharZhou@users.noreply.github.com> Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com> Reviewed-by: @frankekn --- CHANGELOG.md | 1 + src/agents/pi-embedded-runner/run.ts | 1 + src/gateway/openresponses-http.test.ts | 268 +++++++++++++++++- src/gateway/openresponses-http.ts | 376 +++++++++++++++++++------ 4 files changed, 563 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e9f3a80e7d1..0bf5fe8d8b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -235,6 +235,7 @@ Docs: https://docs.openclaw.ai - Android/canvas: recycle captured and scaled snapshot bitmaps so repeated canvas snapshots do not leak native image memory. (#41889) Thanks @Kaneki-x. - Android/theme: switch status bar icon contrast with the active system theme so Android light mode no longer leaves unreadable light icons over the app header. (#51098) Thanks @goweii. - Discord/ACP: forward worker abort signals into ACP turns so timed-out Discord jobs cancel the running turn instead of silently leaving the bound ACP session working in the background. +- Gateway/openresponses: preserve assistant commentary and session continuity across hosted-tool `/v1/responses` turns, and emit streamed tool-call payloads before finalization so client tool loops stay resumable. (#52171) Thanks @CharZhou. ### Breaking diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 0c66203992f..044869f00cd 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -961,6 +961,7 @@ export async function runEmbeddedPiAgent( skillsSnapshot: params.skillsSnapshot, prompt, images: params.images, + clientTools: params.clientTools, disableTools: params.disableTools, provider, modelId, diff --git a/src/gateway/openresponses-http.test.ts b/src/gateway/openresponses-http.test.ts index 3a9a5517537..b7549339dc7 100644 --- a/src/gateway/openresponses-http.test.ts +++ b/src/gateway/openresponses-http.test.ts @@ -1,6 +1,6 @@ import fs from "node:fs/promises"; import path from "node:path"; -import { afterAll, beforeAll, describe, expect, it } from "vitest"; +import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest"; import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js"; import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js"; import { emitAgentEvent } from "../infra/agent-events.js"; @@ -11,8 +11,24 @@ installGatewayTestHooks({ scope: "suite" }); let enabledServer: Awaited>; let enabledPort: number; +let openResponsesTesting: { + resetResponseSessionState(): void; + storeResponseSessionAt( + responseId: string, + sessionKey: string, + now: number, + scope?: { agentId: string; user?: string; requestedSessionKey?: string }, + ): void; + lookupResponseSessionAt( + responseId: string | undefined, + now: number, + scope?: { agentId: string; user?: string; requestedSessionKey?: string }, + ): string | undefined; + getResponseSessionIds(): string[]; +}; beforeAll(async () => { + ({ __testing: openResponsesTesting } = await import("./openresponses-http.js")); enabledPort = await getFreePort(); enabledServer = await startServer(enabledPort, { openResponsesEnabled: true }); }); @@ -21,6 +37,10 @@ afterAll(async () => { await enabledServer.close({ reason: "openresponses enabled suite done" }); }); +beforeEach(() => { + openResponsesTesting.resetResponseSessionState(); +}); + async function startServer(port: number, opts?: { openResponsesEnabled?: boolean }) { const { startGatewayServer } = await import("./server.js"); const serverOpts = { @@ -618,6 +638,252 @@ describe("OpenResponses HTTP API (e2e)", () => { } }); + it("preserves assistant text alongside non-stream function_call output", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "Let me check that." }], + meta: { + stopReason: "tool_calls", + pendingToolCalls: [ + { + id: "call_1", + name: "get_weather", + arguments: '{"city":"Taipei"}', + }, + ], + }, + } as never); + + const res = await postResponses(port, { + stream: false, + model: "openclaw", + input: "check the weather", + tools: WEATHER_TOOL, + }); + + expect(res.status).toBe(200); + const json = (await res.json()) as { + status?: string; + output?: Array>; + }; + expect(json.status).toBe("incomplete"); + expect(json.output?.map((item) => item.type)).toEqual(["message", "function_call"]); + expect( + ((json.output?.[0]?.content as Array> | undefined)?.[0]?.text as + | string + | undefined) ?? "", + ).toBe("Let me check that."); + expect(json.output?.[1]?.name).toBe("get_weather"); + await ensureResponseConsumed(res); + }); + + it("falls back to payload text for streamed function_call responses", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "Let me check that." }], + meta: { + stopReason: "tool_calls", + pendingToolCalls: [ + { + id: "call_1", + name: "get_weather", + arguments: '{"city":"Taipei"}', + }, + ], + }, + } as never); + + const res = await postResponses(port, { + stream: true, + model: "openclaw", + input: "check the weather", + tools: WEATHER_TOOL, + }); + + expect(res.status).toBe(200); + const text = await res.text(); + const events = parseSseEvents(text); + const outputTextDone = events.find((event) => event.event === "response.output_text.done"); + expect(outputTextDone).toBeTruthy(); + expect((JSON.parse(outputTextDone?.data ?? "{}") as { text?: string }).text).toBe( + "Let me check that.", + ); + + const completed = events.find((event) => event.event === "response.completed"); + expect(completed).toBeTruthy(); + const response = ( + JSON.parse(completed?.data ?? "{}") as { + response?: { status?: string; output?: Array> }; + } + ).response; + expect(response?.status).toBe("incomplete"); + expect(response?.output?.map((item) => item.type)).toEqual(["message", "function_call"]); + expect( + (((response?.output?.[0]?.content as Array> | undefined) ?? [])[0] + ?.text as string | undefined) ?? "", + ).toBe("Let me check that."); + expect(response?.output?.[1]?.name).toBe("get_weather"); + expect(events.some((event) => event.data === "[DONE]")).toBe(true); + }); + + it("reuses the prior session when previous_response_id is provided", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "Let me check that." }], + meta: { + stopReason: "tool_calls", + pendingToolCalls: [ + { + id: "call_1", + name: "get_weather", + arguments: '{"city":"Taipei"}', + }, + ], + }, + } as never); + + const firstResponse = await postResponses(port, { + stream: false, + model: "openclaw", + input: "check the weather", + tools: WEATHER_TOOL, + }); + expect(firstResponse.status).toBe(200); + const firstJson = (await firstResponse.json()) as { id?: string }; + const firstOpts = (agentCommand.mock.calls[0] as unknown[] | undefined)?.[0] as + | { sessionKey?: string } + | undefined; + expect(firstJson.id).toMatch(/^resp_/); + expect(firstOpts?.sessionKey).toBeTruthy(); + + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "It is sunny." }], + } as never); + + const secondResponse = await postResponses(port, { + stream: false, + model: "openclaw", + previous_response_id: firstJson.id, + input: [{ type: "function_call_output", call_id: "call_1", output: "Sunny, 70F." }], + }); + expect(secondResponse.status).toBe(200); + const secondOpts = (agentCommand.mock.calls[1] as unknown[] | undefined)?.[0] as + | { sessionKey?: string } + | undefined; + expect(secondOpts?.sessionKey).toBe(firstOpts?.sessionKey); + await ensureResponseConsumed(secondResponse); + }); + + it("does not reuse prior sessions across different user scopes", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "First turn." }], + } as never); + + const firstResponse = await postResponses(port, { + stream: false, + model: "openclaw", + user: "alice", + input: "hello", + }); + expect(firstResponse.status).toBe(200); + const firstJson = (await firstResponse.json()) as { id?: string }; + const firstOpts = (agentCommand.mock.calls[0] as unknown[] | undefined)?.[0] as + | { sessionKey?: string } + | undefined; + expect(firstOpts?.sessionKey ?? "").toContain("openresponses-user:alice"); + + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "Second turn." }], + } as never); + + const secondResponse = await postResponses(port, { + stream: false, + model: "openclaw", + user: "bob", + previous_response_id: firstJson.id, + input: "hello again", + }); + expect(secondResponse.status).toBe(200); + const secondOpts = (agentCommand.mock.calls[1] as unknown[] | undefined)?.[0] as + | { sessionKey?: string } + | undefined; + expect(secondOpts?.sessionKey).not.toBe(firstOpts?.sessionKey); + expect(secondOpts?.sessionKey ?? "").toContain("openresponses-user:bob"); + await ensureResponseConsumed(secondResponse); + }); + + it("stores response session mappings when the response is emitted", async () => { + const port = enabledPort; + agentCommand.mockClear(); + + let release: ((value: { payloads: Array<{ text: string }> }) => void) | undefined; + agentCommand.mockImplementationOnce( + () => + new Promise<{ payloads: Array<{ text: string }> }>((resolve) => { + release = resolve; + }) as never, + ); + + const responsePromise = postResponses(port, { + stream: false, + model: "openclaw", + input: "delayed hello", + }); + + for (let i = 0; i < 20 && agentCommand.mock.calls.length === 0; i += 1) { + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + expect(agentCommand.mock.calls).toHaveLength(1); + expect(openResponsesTesting.getResponseSessionIds()).toEqual([]); + + release?.({ payloads: [{ text: "hello" }] }); + + const res = await responsePromise; + expect(res.status).toBe(200); + const json = (await res.json()) as { id?: string }; + expect(json.id).toMatch(/^resp_/); + expect(openResponsesTesting.getResponseSessionIds()).toEqual([json.id]); + await ensureResponseConsumed(res); + }); + + it("caps response session cache by evicting the oldest entries", () => { + for (let i = 0; i < 505; i += 1) { + openResponsesTesting.storeResponseSessionAt(`resp_${i}`, `session_${i}`, i); + } + + expect(openResponsesTesting.getResponseSessionIds()).toHaveLength(500); + expect(openResponsesTesting.lookupResponseSessionAt("resp_0", 505)).toBeUndefined(); + expect(openResponsesTesting.lookupResponseSessionAt("resp_4", 505)).toBeUndefined(); + expect(openResponsesTesting.lookupResponseSessionAt("resp_5", 505)).toBe("session_5"); + expect(openResponsesTesting.lookupResponseSessionAt("resp_504", 505)).toBe("session_504"); + }); + + it("does not reuse cached sessions when the user scope changes", () => { + openResponsesTesting.storeResponseSessionAt("resp_1", "session_1", 100, { + agentId: "main", + user: "alice", + }); + + expect( + openResponsesTesting.lookupResponseSessionAt("resp_1", 101, { + agentId: "main", + user: "alice", + }), + ).toBe("session_1"); + expect( + openResponsesTesting.lookupResponseSessionAt("resp_1", 101, { + agentId: "main", + user: "bob", + }), + ).toBeUndefined(); + }); + it("blocks unsafe URL-based file/image inputs", async () => { const port = enabledPort; agentCommand.mockClear(); diff --git a/src/gateway/openresponses-http.ts b/src/gateway/openresponses-http.ts index 1c440b1571c..c23388c3d68 100644 --- a/src/gateway/openresponses-http.ts +++ b/src/gateway/openresponses-http.ts @@ -35,7 +35,7 @@ import type { AuthRateLimiter } from "./auth-rate-limit.js"; import type { ResolvedGatewayAuth } from "./auth.js"; import { sendJson, setSseHeaders, writeDone } from "./http-common.js"; import { handleGatewayPostJsonEndpoint } from "./http-endpoint-helpers.js"; -import { resolveGatewayRequestContext } from "./http-utils.js"; +import { getHeader, resolveGatewayRequestContext } from "./http-utils.js"; import { normalizeInputHostnameAllowlist } from "./input-allowlist.js"; import { CreateResponseBodySchema, @@ -59,6 +59,139 @@ type OpenResponsesHttpOptions = { const DEFAULT_BODY_BYTES = 20 * 1024 * 1024; const DEFAULT_MAX_URL_PARTS = 8; +// In-memory map from responseId -> sessionKey for previous_response_id continuity. +// Entries are evicted after 30 minutes to bound memory usage. +const RESPONSE_SESSION_TTL_MS = 30 * 60 * 1000; +const MAX_RESPONSE_SESSION_ENTRIES = 500; +type ResponseSessionScope = { + agentId: string; + user?: string; + requestedSessionKey?: string; +}; + +type ResponseSessionEntry = ResponseSessionScope & { + sessionKey: string; + ts: number; +}; + +const responseSessionMap = new Map(); + +function normalizeResponseSessionScope(scope: ResponseSessionScope): ResponseSessionScope { + const user = scope.user?.trim(); + const requestedSessionKey = scope.requestedSessionKey?.trim(); + return { + agentId: scope.agentId, + user: user || undefined, + requestedSessionKey: requestedSessionKey || undefined, + }; +} + +function createResponseSessionScope(params: { + req: IncomingMessage; + agentId: string; + user?: string; +}): ResponseSessionScope { + return normalizeResponseSessionScope({ + agentId: params.agentId, + user: params.user, + requestedSessionKey: getHeader(params.req, "x-openclaw-session-key"), + }); +} + +function matchesResponseSessionScope( + entry: ResponseSessionEntry, + scope: ResponseSessionScope, +): boolean { + return ( + entry.agentId === scope.agentId && + entry.user === scope.user && + entry.requestedSessionKey === scope.requestedSessionKey + ); +} + +function pruneExpiredResponseSessions(now: number) { + while (responseSessionMap.size > 0) { + const oldest = responseSessionMap.entries().next().value; + if (!oldest) { + return; + } + const [oldestKey, oldestValue] = oldest; + if (now - oldestValue.ts <= RESPONSE_SESSION_TTL_MS) { + return; + } + responseSessionMap.delete(oldestKey); + } +} + +function evictOverflowResponseSessions() { + while (responseSessionMap.size > MAX_RESPONSE_SESSION_ENTRIES) { + const oldestKey = responseSessionMap.keys().next().value; + if (!oldestKey) { + return; + } + responseSessionMap.delete(oldestKey); + } +} + +function storeResponseSession( + responseId: string, + sessionKey: string, + scope: ResponseSessionScope, + now = Date.now(), +) { + // Reinsert existing keys so the map stays ordered by freshest timestamp. + responseSessionMap.delete(responseId); + responseSessionMap.set(responseId, { ...scope, sessionKey, ts: now }); + pruneExpiredResponseSessions(now); + evictOverflowResponseSessions(); +} + +function lookupResponseSession( + responseId: string | undefined, + scope: ResponseSessionScope, + now = Date.now(), +): string | undefined { + if (!responseId) { + return undefined; + } + const entry = responseSessionMap.get(responseId); + if (!entry) { + return undefined; + } + if (now - entry.ts > RESPONSE_SESSION_TTL_MS) { + responseSessionMap.delete(responseId); + return undefined; + } + if (!matchesResponseSessionScope(entry, scope)) { + return undefined; + } + return entry.sessionKey; +} + +export const __testing = { + resetResponseSessionState() { + responseSessionMap.clear(); + }, + storeResponseSessionAt( + responseId: string, + sessionKey: string, + now: number, + scope: ResponseSessionScope = { agentId: "main" }, + ) { + storeResponseSession(responseId, sessionKey, normalizeResponseSessionScope(scope), now); + }, + lookupResponseSessionAt( + responseId: string | undefined, + now: number, + scope: ResponseSessionScope = { agentId: "main" }, + ) { + return lookupResponseSession(responseId, normalizeResponseSessionScope(scope), now); + }, + getResponseSessionIds() { + return [...responseSessionMap.keys()]; + }, +}; + function writeSseEvent(res: ServerResponse, event: StreamingEvent) { res.write(`event: ${event.type}\n`); res.write(`data: ${JSON.stringify(event)}\n\n`); @@ -232,6 +365,23 @@ function createAssistantOutputItem(params: { }; } +function createFunctionCallOutputItem(params: { + id: string; + callId: string; + name: string; + arguments: string; + status?: "in_progress" | "completed"; +}): OutputItem { + return { + type: "function_call", + id: params.id, + call_id: params.callId, + name: params.name, + arguments: params.arguments, + status: params.status, + }; +} + async function runResponsesAgentCommand(params: { message: string; images: ImageContent[]; @@ -437,7 +587,7 @@ export async function handleOpenResponsesHttpRequest( }); return true; } - const { sessionKey, messageChannel } = resolveGatewayRequestContext({ + const resolved = resolveGatewayRequestContext({ req, model, user, @@ -445,6 +595,19 @@ export async function handleOpenResponsesHttpRequest( defaultMessageChannel: "webchat", useMessageChannelHeader: false, }); + const responseSessionScope = createResponseSessionScope({ + req, + agentId: resolved.agentId, + user, + }); + // Resolve session key: reuse previous_response_id only when it matches the + // same agent/user/requested-session scope as the current request. + const previousSessionKey = lookupResponseSession( + payload.previous_response_id, + responseSessionScope, + ); + const sessionKey = previousSessionKey ?? resolved.sessionKey; + const messageChannel = resolved.messageChannel; // Build prompt from input const prompt = buildAgentPrompt(payload.input); @@ -473,6 +636,8 @@ export async function handleOpenResponsesHttpRequest( } const responseId = `resp_${randomUUID()}`; + const rememberResponseSession = () => + storeResponseSession(responseId, sessionKey, responseSessionScope); const outputItemId = `msg_${randomUUID()}`; const deps = createDefaultDeps(); const streamParams = @@ -499,25 +664,46 @@ export async function handleOpenResponsesHttpRequest( const meta = (result as { meta?: unknown } | null)?.meta; const { stopReason, pendingToolCalls } = resolveStopReasonAndPendingToolCalls(meta); - // If agent called a client tool, return function_call instead of text + // If agent called a client tool, return function_call (and any assistant text) to caller if (stopReason === "tool_calls" && pendingToolCalls && pendingToolCalls.length > 0) { const functionCall = pendingToolCalls[0]; const functionCallItemId = `call_${randomUUID()}`; + + const assistantText = + Array.isArray(payloads) && payloads.length > 0 + ? payloads + .map((p) => (typeof p.text === "string" ? p.text : "")) + .filter(Boolean) + .join("\n\n") + : ""; + + const output: OutputItem[] = []; + if (assistantText) { + output.push( + createAssistantOutputItem({ + id: outputItemId, + text: assistantText, + status: "completed", + }), + ); + } + output.push( + createFunctionCallOutputItem({ + id: functionCallItemId, + callId: functionCall.id, + name: functionCall.name, + arguments: functionCall.arguments, + }), + ); + const response = createResponseResource({ id: responseId, model, status: "incomplete", - output: [ - { - type: "function_call", - id: functionCallItemId, - call_id: functionCall.id, - name: functionCall.name, - arguments: functionCall.arguments, - }, - ], + output, usage, }); + rememberResponseSession(); sendJson(res, 200, response); return true; } @@ -540,6 +726,7 @@ export async function handleOpenResponsesHttpRequest( usage, }); + rememberResponseSession(); sendJson(res, 200, response); } catch (err) { logWarn(`openresponses: non-stream response failed: ${String(err)}`); @@ -550,6 +737,7 @@ export async function handleOpenResponsesHttpRequest( output: [], error: { code: "api_error", message: "internal error" }, }); + rememberResponseSession(); sendJson(res, 500, response); } return true; @@ -619,6 +807,7 @@ export async function handleOpenResponsesHttpRequest( usage, }); + rememberResponseSession(); writeSseEvent(res, { type: "response.completed", response: finalResponse }); writeDone(res); res.end(); @@ -722,84 +911,106 @@ export async function handleOpenResponsesHttpRequest( }); finalUsage = extractUsageFromResult(result); + + // Check for pending client tool calls BEFORE maybeFinalize() because the + // lifecycle:end event may already have requested finalization. + const resultAny = result as { payloads?: Array<{ text?: string }>; meta?: unknown }; + const meta = resultAny.meta; + const { stopReason, pendingToolCalls } = resolveStopReasonAndPendingToolCalls(meta); + + if ( + !closed && + stopReason === "tool_calls" && + pendingToolCalls && + pendingToolCalls.length > 0 + ) { + const functionCall = pendingToolCalls[0]; + const usage = finalUsage ?? createEmptyUsage(); + const finalText = + accumulatedText || + (Array.isArray(resultAny.payloads) + ? resultAny.payloads + .map((p) => (typeof p.text === "string" ? p.text : "")) + .filter(Boolean) + .join("\n\n") + : ""); + + writeSseEvent(res, { + type: "response.output_text.done", + item_id: outputItemId, + output_index: 0, + content_index: 0, + text: finalText, + }); + writeSseEvent(res, { + type: "response.content_part.done", + item_id: outputItemId, + output_index: 0, + content_index: 0, + part: { type: "output_text", text: finalText }, + }); + + const completedItem = createAssistantOutputItem({ + id: outputItemId, + text: finalText, + status: "completed", + }); + writeSseEvent(res, { + type: "response.output_item.done", + output_index: 0, + item: completedItem, + }); + + const functionCallItemId = `call_${randomUUID()}`; + const functionCallItem = createFunctionCallOutputItem({ + id: functionCallItemId, + callId: functionCall.id, + name: functionCall.name, + arguments: functionCall.arguments, + }); + writeSseEvent(res, { + type: "response.output_item.added", + output_index: 1, + item: functionCallItem, + }); + const completedFunctionCallItem = createFunctionCallOutputItem({ + id: functionCallItemId, + callId: functionCall.id, + name: functionCall.name, + arguments: functionCall.arguments, + status: "completed", + }); + writeSseEvent(res, { + type: "response.output_item.done", + output_index: 1, + item: completedFunctionCallItem, + }); + + const incompleteResponse = createResponseResource({ + id: responseId, + model, + status: "incomplete", + output: [completedItem, functionCallItem], + usage, + }); + closed = true; + unsubscribe(); + rememberResponseSession(); + writeSseEvent(res, { type: "response.completed", response: incompleteResponse }); + writeDone(res); + res.end(); + return; + } + maybeFinalize(); if (closed) { return; } - // Fallback: if no streaming deltas were received, send the full response + // Fallback: if no streaming deltas were received, send the full response as text if (!sawAssistantDelta) { - const resultAny = result as { payloads?: Array<{ text?: string }>; meta?: unknown }; const payloads = resultAny.payloads; - const meta = resultAny.meta; - const { stopReason, pendingToolCalls } = resolveStopReasonAndPendingToolCalls(meta); - - // If agent called a client tool, emit function_call instead of text - if (stopReason === "tool_calls" && pendingToolCalls && pendingToolCalls.length > 0) { - const functionCall = pendingToolCalls[0]; - const usage = finalUsage ?? createEmptyUsage(); - - writeSseEvent(res, { - type: "response.output_text.done", - item_id: outputItemId, - output_index: 0, - content_index: 0, - text: "", - }); - writeSseEvent(res, { - type: "response.content_part.done", - item_id: outputItemId, - output_index: 0, - content_index: 0, - part: { type: "output_text", text: "" }, - }); - - const completedItem = createAssistantOutputItem({ - id: outputItemId, - text: "", - status: "completed", - }); - writeSseEvent(res, { - type: "response.output_item.done", - output_index: 0, - item: completedItem, - }); - - const functionCallItemId = `call_${randomUUID()}`; - const functionCallItem = { - type: "function_call" as const, - id: functionCallItemId, - call_id: functionCall.id, - name: functionCall.name, - arguments: functionCall.arguments, - }; - writeSseEvent(res, { - type: "response.output_item.added", - output_index: 1, - item: functionCallItem, - }); - writeSseEvent(res, { - type: "response.output_item.done", - output_index: 1, - item: { ...functionCallItem, status: "completed" as const }, - }); - - const incompleteResponse = createResponseResource({ - id: responseId, - model, - status: "incomplete", - output: [completedItem, functionCallItem], - usage, - }); - closed = true; - unsubscribe(); - writeSseEvent(res, { type: "response.completed", response: incompleteResponse }); - writeDone(res); - res.end(); - return; - } - const content = Array.isArray(payloads) && payloads.length > 0 ? payloads @@ -835,6 +1046,7 @@ export async function handleOpenResponsesHttpRequest( usage: finalUsage, }); + rememberResponseSession(); writeSseEvent(res, { type: "response.failed", response: errorResponse }); emitAgentEvent({ runId: responseId,