diff --git a/CHANGELOG.md b/CHANGELOG.md index 6761b937d30..32da431e31f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ Docs: https://docs.openclaw.ai - Gateway/MCP loopback: apply owner-only tool policy and run before-tool-call hooks on `127.0.0.1/mcp` `tools/list` and `tools/call`, so non-owner bearer callers can no longer see or invoke owner-only tools such as `cron`, `gateway`, and `nodes`, matching the existing HTTP `/tools/invoke` and embedded-agent paths. (#71159) Thanks @mmaps. - Codex harness/security: wait for final app-server approval decisions and sanitize approval preview text, so native Codex permission prompts cannot be resolved by an early placeholder decision or render unsafe terminal/control content. (#70751, #70569) Thanks @Lucenx9. - Providers/voice security: route ElevenLabs TTS and OpenAI Realtime browser-session secret creation through guarded fetch paths, preserving provider calls while keeping SSRF protections on voice surfaces. +- Agents/OpenAI WS: match Codex's Responses WebSocket continuation strategy, sending only strict incremental follow-up input with `previous_response_id` and falling back to full context when the replay chain or request shape differs. Fixes #44948. Thanks @hss-oss. - Plugins/Google Chat: log webhook auth rejection reasons only after all candidates fail, and warn when add-on `appPrincipal` values do not match configuration. Fixes #71078. (#71145) Thanks @luyao618. - Models/configure: preserve the existing default model when provider auth is re-run from configure while keeping explicit default-setting commands authoritative. Fixes #70696. (#70793) Thanks @Sathvik-1007. - Config/plugins: accept `plugins.entries.*.hooks.allowConversationAccess` in validation, generated schema metadata, and plugin policy inspection so trusted external plugins can enable conversation-access hooks such as `agent_end` without local schema patches. Fixes #71215. (#71221) Thanks @BillChirico. 
// ─── src/agents/openai-ws-message-conversion.ts ──────────────────────────────

/**
 * Converts a completed response into input items.
 *
 * The response is first turned into an assistant message with
 * `buildAssistantMessageFromResponse`, then passed through
 * `convertMessagesToInputItems`, i.e. the same conversion applied to ordinary
 * context messages.
 *
 * @param response - The response object to convert.
 * @param modelInfo - Model identity forwarded unchanged to both helpers.
 * @returns The input items produced for the single assistant message.
 */
export function convertResponseToInputItems(
  response: ResponseObject,
  // NOTE(review): the element type of `input` was lost in this view of the
  // file; `string` is assumed — confirm against the original signature.
  modelInfo: { api: string; provider: string; id: string; input?: ReadonlyArray<string> },
): InputItem[] {
  return convertMessagesToInputItems(
    [buildAssistantMessageFromResponse(response, modelInfo)] as Message[],
    modelInfo,
  );
}

// ─── src/agents/openai-ws-request.ts ─────────────────────────────────────────

/** Result of planning a `response.create` send: the chosen mode and the payload to send. */
export type PlannedWsRequestPayload = {
  mode: "full_context" | "incremental";
  payload: ResponseCreateEvent;
};

/** Orders object keys by UTF-16 code unit so the result never depends on the host locale. */
function compareKeys(left: string, right: string): number {
  if (left === right) {
    return 0;
  }
  return left < right ? -1 : 1;
}

/**
 * Serializes a value to a canonical string used only for structural equality checks.
 *
 * - Object keys are emitted in sorted order, so insertion order is irrelevant.
 * - Object properties whose value is `undefined` are omitted.
 * - `undefined` array entries (and holes) are emitted as `null`, which keeps the
 *   array length observable (`[undefined]` no longer equals `[]`) and mirrors
 *   what `JSON.stringify` produces for arrays.
 * - A top-level `undefined` serializes to the empty string.
 */
function stringifyStable(value: unknown): string {
  if (value === undefined) {
    return "";
  }
  if (value === null || typeof value !== "object") {
    return JSON.stringify(value);
  }
  if (Array.isArray(value)) {
    // Array.from visits holes as `undefined`, unlike Array.prototype.map.
    const parts = Array.from(value, (entry: unknown) =>
      entry === undefined ? "null" : stringifyStable(entry),
    );
    return `[${parts.join(",")}]`;
  }
  // `.filter()` returns a fresh array, so sorting it in place mutates nothing shared.
  const entries = Object.entries(value as Record<string, unknown>)
    .filter(([, entry]) => entry !== undefined)
    .sort(([left], [right]) => compareKeys(left, right));
  return `{${entries
    .map(([key, entry]) => `${JSON.stringify(key)}:${stringifyStable(entry)}`)
    .join(",")}}`;
}

/**
 * Returns a copy of the payload without `input`, `metadata` and
 * `previous_response_id` — the fields that are allowed to differ between two
 * requests that are otherwise considered the same shape.
 */
function payloadWithoutIncrementalFields(payload: ResponseCreateEvent): Record<string, unknown> {
  const {
    input: _input,
    metadata: _metadata,
    previous_response_id: _previousResponseId,
    ...rest
  } = payload;
  return rest;
}

/** True when both payloads are structurally equal once the incremental fields are removed. */
function payloadFieldsMatch(left: ResponseCreateEvent, right: ResponseCreateEvent): boolean {
  return (
    stringifyStable(payloadWithoutIncrementalFields(left)) ===
    stringifyStable(payloadWithoutIncrementalFields(right))
  );
}

/** True when `baseline` is an item-for-item (structural) prefix of `input`. */
function inputItemsStartWith(input: InputItem[], baseline: InputItem[]): boolean {
  if (baseline.length > input.length) {
    return false;
  }
  return baseline.every((item, index) => stringifyStable(item) === stringifyStable(input[index]));
}

/**
 * Decides whether a request can be sent as an incremental continuation of the
 * previous response or must carry the full context.
 *
 * Incremental mode is chosen only when all of the following hold:
 * - a previous response id and a previous request payload are available;
 * - the current `input` is an array, and the previous request's `input` is
 *   either absent or an array (anything else cannot be compared item-wise);
 * - all fields other than `input`, `metadata` and `previous_response_id` match
 *   the previous request;
 * - the current input starts with the previous request's input followed by
 *   `previousResponseInputItems`.
 *
 * In that case the returned payload carries `previous_response_id` and only the
 * items after that prefix (possibly an empty array). Otherwise the full payload
 * is returned with `previous_response_id` stripped.
 *
 * @param params.fullPayload - The complete request for this turn.
 * @param params.previousRequestPayload - The complete request of the previous turn, if any.
 * @param params.previousResponseId - Id of the previous response, if any.
 * @param params.previousResponseInputItems - Previous response output as input items.
 * @returns The selected mode together with the payload to send.
 */
export function planOpenAIWebSocketRequestPayload(params: {
  fullPayload: ResponseCreateEvent;
  previousRequestPayload?: ResponseCreateEvent;
  previousResponseId?: string | null;
  previousResponseInputItems?: InputItem[];
}): PlannedWsRequestPayload {
  const { fullPayload, previousRequestPayload, previousResponseId } = params;
  const fullInput: unknown = fullPayload.input;
  const previousInput: unknown = previousRequestPayload?.input;

  if (
    previousResponseId &&
    previousRequestPayload &&
    // A non-array input must never be diffed: treating it as `[]` would make
    // the "suffix" empty and silently drop the real input from the request.
    Array.isArray(fullInput) &&
    (previousInput == null || Array.isArray(previousInput)) &&
    payloadFieldsMatch(fullPayload, previousRequestPayload)
  ) {
    const baseline: InputItem[] = [
      ...(previousInput ?? []),
      ...(params.previousResponseInputItems ?? []),
    ];
    if (inputItemsStartWith(fullInput, baseline)) {
      return {
        mode: "incremental",
        payload: {
          ...fullPayload,
          previous_response_id: previousResponseId,
          input: fullInput.slice(baseline.length),
        },
      };
    }
  }

  // Full-context sends must not reference a previous response.
  const { previous_response_id: _previousResponseId, ...payload } = fullPayload;
  return { mode: "full_context", payload };
}
beforeEach, describe, expect, it, vi } from "vitest"; import type { ResponseObject } from "./openai-ws-connection.js"; -import { buildOpenAIWebSocketResponseCreatePayload } from "./openai-ws-request.js"; +import { + buildOpenAIWebSocketResponseCreatePayload, + planOpenAIWebSocketRequestPayload, +} from "./openai-ws-request.js"; import { __testing as openAIWsStreamTesting, buildAssistantMessageFromResponse, @@ -22,6 +25,7 @@ import { planTurnInput, releaseWsSession, } from "./openai-ws-stream.js"; +import type { InputItem, ResponseCreateEvent } from "./openai-ws-types.js"; import { log } from "./pi-embedded-runner/logger.js"; import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"; @@ -152,6 +156,10 @@ const { MockManager } = vi.hoisted(() => { // Test helper: simulate a server event simulateEvent(event: unknown): void { + const maybeEvent = event as { type?: string; response?: { id?: string } }; + if (maybeEvent.type === "response.completed" && maybeEvent.response?.id) { + this._previousResponseId = maybeEvent.response.id; + } for (const fn of this._listeners) { fn(event); } @@ -1678,6 +1686,101 @@ describe("planTurnInput", () => { // ───────────────────────────────────────────────────────────────────────────── +describe("planOpenAIWebSocketRequestPayload", () => { + it("sends only the strict suffix when the full input extends the prior response chain", () => { + const previousInputItems: InputItem[] = [{ type: "message", role: "user", content: "Hello" }]; + const previousRequest: ResponseCreateEvent = { + type: "response.create", + model: "gpt-5.4", + store: false, + instructions: "You are helpful.", + input: previousInputItems, + }; + const previousResponseInputItems: InputItem[] = [ + { type: "message", role: "assistant", content: "Hi" }, + ]; + const fullPayload: ResponseCreateEvent = { + type: "response.create", + model: "gpt-5.4", + store: false, + instructions: "You are helpful.", + input: [ + ...previousInputItems, + 
...previousResponseInputItems, + { type: "message", role: "user", content: "Next" }, + ], + }; + + const plan = planOpenAIWebSocketRequestPayload({ + fullPayload, + previousRequestPayload: previousRequest, + previousResponseId: "resp_prev", + previousResponseInputItems: [...previousResponseInputItems], + }); + + expect(plan.mode).toBe("incremental"); + expect(plan.payload.previous_response_id).toBe("resp_prev"); + expect(plan.payload.input).toEqual([{ type: "message", role: "user", content: "Next" }]); + }); + + it("falls back to full context when non-input fields differ", () => { + const previousInputItems: InputItem[] = [{ type: "message", role: "user", content: "Hello" }]; + const previousRequest: ResponseCreateEvent = { + type: "response.create", + model: "gpt-5.4", + store: false, + instructions: "Old instructions", + input: previousInputItems, + }; + const fullPayload: ResponseCreateEvent = { + ...previousRequest, + instructions: "New instructions", + input: [ + ...previousInputItems, + { type: "message", role: "assistant", content: "Hi" }, + { type: "message", role: "user", content: "Next" }, + ], + }; + + const plan = planOpenAIWebSocketRequestPayload({ + fullPayload, + previousRequestPayload: previousRequest, + previousResponseId: "resp_prev", + previousResponseInputItems: [{ type: "message", role: "assistant", content: "Hi" }], + }); + + expect(plan.mode).toBe("full_context"); + expect(plan.payload.previous_response_id).toBeUndefined(); + expect(plan.payload.input).toEqual(fullPayload.input); + }); + + it("falls back to full context when the input is not a strict response-chain extension", () => { + const previousRequest: ResponseCreateEvent = { + type: "response.create", + model: "gpt-5.4", + store: false, + input: [{ type: "message", role: "user", content: "Hello" }], + }; + const fullPayload: ResponseCreateEvent = { + ...previousRequest, + input: [{ type: "message", role: "user", content: "Different" }], + }; + + const plan = 
planOpenAIWebSocketRequestPayload({ + fullPayload, + previousRequestPayload: previousRequest, + previousResponseId: "resp_prev", + previousResponseInputItems: [{ type: "message", role: "assistant", content: "Hi" }], + }); + + expect(plan.mode).toBe("full_context"); + expect(plan.payload.previous_response_id).toBeUndefined(); + expect(plan.payload.input).toEqual(fullPayload.input); + }); +}); + +// ───────────────────────────────────────────────────────────────────────────── + describe("createOpenAIWebSocketStreamFn", () => { const modelStub = { api: "openai-responses", @@ -2738,7 +2841,6 @@ describe("createOpenAIWebSocketStreamFn", () => { // Server responds with a tool call const turn1Response = makeResponseObject("resp_turn1", undefined, "exec"); - manager.setPreviousResponseId("resp_turn1"); manager.simulateEvent({ type: "response.completed", response: turn1Response }); await done1; @@ -2747,8 +2849,8 @@ describe("createOpenAIWebSocketStreamFn", () => { systemPrompt: "You are helpful.", messages: [ userMsg("Run ls"), - assistantMsg([], [{ id: "call_1", name: "exec", args: { cmd: "ls" } }]), - toolResultMsg("call_1", "file.txt"), + buildAssistantMessageFromResponse(turn1Response, modelStub), + toolResultMsg("call_abc|item_2", "file.txt"), ] as Parameters[0], tools: [], }; @@ -2785,7 +2887,68 @@ describe("createOpenAIWebSocketStreamFn", () => { expect(inputTypes).toHaveLength(1); }); - it("omits previous_response_id when replaying full context on follow-up turns", async () => { + it("sends only a follow-up user message when the full context is a strict extension", async () => { + const sessionId = "sess-user-delta"; + const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId); + + const ctx1 = { + systemPrompt: "You are helpful.", + messages: [userMsg("Hello")] as Parameters[0], + tools: [], + }; + + const stream1 = streamFn( + modelStub as Parameters[0], + ctx1 as Parameters[1], + ); + const done1 = (async () => { + for await (const _ of await 
resolveStream(stream1)) { + /* consume */ + } + })(); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + const turn1Response = makeResponseObject("resp_turn1_text", "Hi there."); + manager.simulateEvent({ type: "response.completed", response: turn1Response }); + await done1; + + const ctx2 = { + systemPrompt: "You are helpful.", + messages: [ + userMsg("Hello"), + buildAssistantMessageFromResponse(turn1Response, modelStub), + userMsg("What can you do?"), + ] as Parameters[0], + tools: [], + }; + + const stream2 = streamFn( + modelStub as Parameters[0], + ctx2 as Parameters[1], + ); + const done2 = (async () => { + for await (const _ of await resolveStream(stream2)) { + /* consume */ + } + })(); + + await new Promise((r) => setImmediate(r)); + manager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp_turn2_text", "I can help."), + }); + await done2; + + const sent2 = manager.sentEvents[1] as { + previous_response_id?: string; + input: Array<{ type: string; role?: string; content?: unknown }>; + }; + expect(sent2.previous_response_id).toBe("resp_turn1_text"); + expect(sent2.input).toEqual([{ type: "message", role: "user", content: "What can you do?" 
}]); + }); + + it("uses an empty incremental payload when replay context exactly matches the response chain", async () => { const sessionId = "sess-full-context-replay"; const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId); @@ -2830,7 +2993,6 @@ describe("createOpenAIWebSocketStreamFn", () => { await new Promise((r) => setImmediate(r)); const manager = MockManager.lastInstance!; - manager.setPreviousResponseId("resp_turn1_reasoning"); manager.simulateEvent({ type: "response.completed", response: turn1Response }); await done1; @@ -2864,14 +3026,8 @@ describe("createOpenAIWebSocketStreamFn", () => { previous_response_id?: string; input: Array<{ type: string; id?: string; call_id?: string }>; }; - expect(sent2.previous_response_id).toBeUndefined(); - expect(sent2.input.map((item) => item.type)).toEqual(["message", "reasoning", "function_call"]); - expect(sent2.input[1]).toMatchObject({ type: "reasoning", id: "rs_turn1" }); - expect(sent2.input[2]).toMatchObject({ - type: "function_call", - call_id: "call_turn1", - id: "fc_turn1", - }); + expect(sent2.previous_response_id).toBe("resp_turn1_reasoning"); + expect(sent2.input).toEqual([]); }); it("sends instructions (system prompt) in each request", async () => { diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 6645d5a50e8..365f36a0f05 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -51,10 +51,15 @@ import { import { buildAssistantMessageFromResponse, convertMessagesToInputItems, + convertResponseToInputItems, convertTools, planTurnInput, } from "./openai-ws-message-conversion.js"; -import { buildOpenAIWebSocketResponseCreatePayload } from "./openai-ws-request.js"; +import { + buildOpenAIWebSocketResponseCreatePayload, + planOpenAIWebSocketRequestPayload, +} from "./openai-ws-request.js"; +import type { ResponseCreateEvent } from "./openai-ws-types.js"; import { log } from "./pi-embedded-runner/logger.js"; import { 
resolveProviderEndpoint } from "./provider-attribution.js"; import { normalizeProviderId } from "./provider-id.js"; @@ -76,6 +81,10 @@ interface WsSession { authSignature: string; /** Number of messages that were in context.messages at the END of the last streamFn call. */ lastContextLength: number; + /** Last full canonical request, before any incremental previous_response_id delta rewrite. */ + lastRequestPayload?: ResponseCreateEvent; + /** Last response output converted to the same replay form used by future full-context sends. */ + lastResponseInputItems: ReturnType; /** True if the connection has been established at least once. */ everConnected: boolean; /** True once a best-effort warm-up attempt has run for this session. */ @@ -358,6 +367,9 @@ function resetWsSession(params: { params.session.everConnected = false; params.session.warmUpAttempted = false; params.session.broken = false; + params.session.lastContextLength = 0; + params.session.lastRequestPayload = undefined; + params.session.lastResponseInputItems = []; if (!params.preserveDegradeUntil) { params.session.degradedUntil = null; } @@ -728,6 +740,7 @@ export function createOpenAIWebSocketStreamFn( managerConfigSignature, authSignature, lastContextLength: 0, + lastResponseInputItems: [], everConnected: false, warmUpAttempted: false, broken: false, @@ -890,27 +903,6 @@ export function createOpenAIWebSocketStreamFn( } } - const turnInput = planTurnInput({ - context, - model, - previousResponseId: session.manager.previousResponseId, - lastContextLength: session.lastContextLength, - }); - - if (turnInput.mode === "incremental_tool_results") { - log.debug( - `[ws-stream] session=${sessionId}: incremental send (${turnInput.inputItems.length} tool results) previous_response_id=${turnInput.previousResponseId}`, - ); - } else if (turnInput.mode === "full_context_restart") { - log.debug( - `[ws-stream] session=${sessionId}: no new tool results found; sending full context without previous_response_id`, - ); - } 
else { - log.debug( - `[ws-stream] session=${sessionId}: full context send (${turnInput.inputItems.length} items)`, - ); - } - turnAttempt++; const turnState = resolveProviderTransportTurnState(model, { sessionId, @@ -918,22 +910,45 @@ export function createOpenAIWebSocketStreamFn( attempt: turnAttempt, transport: "websocket", }); - let payload = buildOpenAIWebSocketResponseCreatePayload({ + const fullTurnInput = { + inputItems: convertMessagesToInputItems(context.messages, model), + }; + let fullPayload = buildOpenAIWebSocketResponseCreatePayload({ model, context, options: options as WsOptions | undefined, - turnInput, + turnInput: fullTurnInput, tools: convertTools(context.tools, { strict: resolveOpenAIWebSocketStrictToolSetting(model), }), metadata: turnState?.metadata, }) as Record; - const nextPayload = await options?.onPayload?.(payload, model); - payload = mergeTransportMetadata( - (nextPayload ?? payload) as Record, + const nextPayload = await options?.onPayload?.(fullPayload, model); + fullPayload = mergeTransportMetadata( + (nextPayload ?? fullPayload) as Record, turnState?.metadata, ); - const requestPayload = payload as Parameters[0]; + const plannedPayload = planOpenAIWebSocketRequestPayload({ + fullPayload: fullPayload as ResponseCreateEvent, + previousRequestPayload: session.lastRequestPayload, + previousResponseId: session.manager.previousResponseId, + previousResponseInputItems: session.lastResponseInputItems, + }); + const plannedInputItems = Array.isArray(plannedPayload.payload.input) + ? 
plannedPayload.payload.input + : []; + if (plannedPayload.mode === "incremental") { + log.debug( + `[ws-stream] session=${sessionId}: incremental send (${plannedInputItems.length} items) previous_response_id=${plannedPayload.payload.previous_response_id}`, + ); + } else { + log.debug( + `[ws-stream] session=${sessionId}: full context send (${plannedInputItems.length} items)`, + ); + } + const requestPayload = plannedPayload.payload as Parameters< + OpenAIWebSocketManager["send"] + >[0]; try { session.manager.send(requestPayload); @@ -1167,6 +1182,13 @@ export function createOpenAIWebSocketStreamFn( emittedTextByPart.clear(); cleanup(); session.lastContextLength = capturedContextLength; + session.lastRequestPayload = fullPayload as ResponseCreateEvent; + session.lastResponseInputItems = convertResponseToInputItems(event.response, { + api: model.api, + provider: model.provider, + id: model.id, + input: model.input, + }); const assistantMsg = buildAssistantMessageFromResponse(event.response, { api: model.api, provider: model.provider,