fix: match codex openai websocket continuation

This commit is contained in:
Peter Steinberger
2026-04-24 22:46:56 +01:00
parent 95a2b0d799
commit cabdf5bbc4
5 changed files with 312 additions and 42 deletions

View File

@@ -62,6 +62,7 @@ Docs: https://docs.openclaw.ai
- Gateway/MCP loopback: apply owner-only tool policy and run before-tool-call hooks on `127.0.0.1/mcp` `tools/list` and `tools/call`, so non-owner bearer callers can no longer see or invoke owner-only tools such as `cron`, `gateway`, and `nodes`, matching the existing HTTP `/tools/invoke` and embedded-agent paths. (#71159) Thanks @mmaps.
- Codex harness/security: wait for final app-server approval decisions and sanitize approval preview text, so native Codex permission prompts cannot be resolved by an early placeholder decision or render unsafe terminal/control content. (#70751, #70569) Thanks @Lucenx9.
- Providers/voice security: route ElevenLabs TTS and OpenAI Realtime browser-session secret creation through guarded fetch paths, preserving provider calls while keeping SSRF protections on voice surfaces.
- Agents/OpenAI WS: match Codex's Responses WebSocket continuation strategy, sending only strict incremental follow-up input with `previous_response_id` and falling back to full context when the replay chain or request shape differs. Fixes #44948. Thanks @hss-oss.
- Plugins/Google Chat: log webhook auth rejection reasons only after all candidates fail, and warn when add-on `appPrincipal` values do not match configuration. Fixes #71078. (#71145) Thanks @luyao618.
- Models/configure: preserve the existing default model when provider auth is re-run from configure while keeping explicit default-setting commands authoritative. Fixes #70696. (#70793) Thanks @Sathvik-1007.
- Config/plugins: accept `plugins.entries.*.hooks.allowConversationAccess` in validation, generated schema metadata, and plugin policy inspection so trusted external plugins can enable conversation-access hooks such as `agent_end` without local schema patches. Fixes #71215. (#71221) Thanks @BillChirico.

View File

@@ -627,3 +627,13 @@ export function buildAssistantMessageFromResponse(
? ({ ...message, phase: finalAssistantPhase } as AssistantMessageWithPhase)
: message;
}
/**
 * Re-projects a completed response object into the replay-form input items
 * that a later full-context send would contain for that turn, so the two can
 * be compared structurally when planning incremental continuations.
 */
export function convertResponseToInputItems(
  response: ResponseObject,
  modelInfo: { api: string; provider: string; id: string; input?: ReadonlyArray<string> },
): InputItem[] {
  // Reuse the same assistant-message reconstruction used elsewhere, then run
  // it through the standard message→input-item conversion.
  const assistantMessage = buildAssistantMessageFromResponse(response, modelInfo) as Message;
  return convertMessagesToInputItems([assistantMessage], modelInfo);
}

View File

@@ -31,6 +31,87 @@ export interface PlannedWsTurnInput {
previousResponseId?: string;
}
/**
 * Result of planning a Responses WebSocket send.
 * - "incremental": only the new suffix input is sent, tied to `previous_response_id`.
 * - "full_context": the entire canonical input is replayed with no `previous_response_id`.
 */
export type PlannedWsRequestPayload = {
mode: "full_context" | "incremental";
// The actual `response.create` event to put on the wire for this plan.
payload: ResponseCreateEvent;
};
/**
 * Deterministic JSON-like serialization used as a structural fingerprint:
 * object keys are emitted in locale-compare order and `undefined`-valued
 * entries are dropped, so structurally-equal values always produce the same
 * string. A bare `undefined` serializes to the empty string.
 */
function stringifyStable(value: unknown): string {
  if (value === undefined) {
    return "";
  }
  if (value === null || typeof value !== "object") {
    return JSON.stringify(value);
  }
  if (Array.isArray(value)) {
    const parts: string[] = [];
    for (const entry of value) {
      parts.push(stringifyStable(entry));
    }
    return `[${parts.join(",")}]`;
  }
  // Fresh array from filter(), so sorting in place mutates nothing shared.
  const pairs = Object.entries(value as Record<string, unknown>).filter(
    ([, entry]) => entry !== undefined,
  );
  pairs.sort(([left], [right]) => left.localeCompare(right));
  const body = pairs.map(([key, entry]) => `${JSON.stringify(key)}:${stringifyStable(entry)}`);
  return `{${body.join(",")}}`;
}
/**
 * Drops the fields that legitimately differ between an incremental follow-up
 * and its originating request — `input`, `metadata`, and
 * `previous_response_id` — leaving only the request shape that must match for
 * two sends to be considered the same conversation configuration.
 */
function payloadWithoutIncrementalFields(payload: ResponseCreateEvent): Record<string, unknown> {
  const rest: Record<string, unknown> = { ...payload };
  delete rest.input;
  delete rest.metadata;
  delete rest.previous_response_id;
  return rest;
}
/**
 * True when two requests agree on every field outside the incremental set
 * (input/metadata/previous_response_id), i.e. same model, instructions,
 * tools, and other request shape. Comparison is structural via the stable
 * stringification helper.
 */
function payloadFieldsMatch(left: ResponseCreateEvent, right: ResponseCreateEvent): boolean {
  const leftShape = stringifyStable(payloadWithoutIncrementalFields(left));
  const rightShape = stringifyStable(payloadWithoutIncrementalFields(right));
  return leftShape === rightShape;
}
/**
 * True when `input` begins with exactly the items of `baseline`, compared
 * structurally (stable stringification) rather than by reference.
 */
function inputItemsStartWith(input: InputItem[], baseline: InputItem[]): boolean {
  if (input.length < baseline.length) {
    return false;
  }
  for (let index = 0; index < baseline.length; index++) {
    if (stringifyStable(baseline[index]) !== stringifyStable(input[index])) {
      return false;
    }
  }
  return true;
}
/**
 * Decides how to send the next Responses WebSocket request, matching Codex's
 * continuation strategy:
 *
 * - "incremental": chosen when the previous turn yielded a response id, the
 *   non-input request fields are unchanged from the previous request, and the
 *   new full input strictly extends the replay chain
 *   [previous request input + previous response output]. Only the new suffix
 *   is sent, tagged with `previous_response_id`.
 * - "full_context": anything else — the entire input is replayed and any
 *   `previous_response_id` is stripped from the payload.
 */
export function planOpenAIWebSocketRequestPayload(params: {
  fullPayload: ResponseCreateEvent;
  previousRequestPayload?: ResponseCreateEvent;
  previousResponseId?: string | null;
  previousResponseInputItems?: InputItem[];
}): PlannedWsRequestPayload {
  const { fullPayload, previousRequestPayload, previousResponseId } = params;
  const fullInput = Array.isArray(fullPayload.input) ? fullPayload.input : [];
  if (
    previousResponseId &&
    previousRequestPayload &&
    payloadFieldsMatch(fullPayload, previousRequestPayload)
  ) {
    const priorInput = Array.isArray(previousRequestPayload.input)
      ? previousRequestPayload.input
      : [];
    // The replay chain is what a full-context resend of the prior turn would
    // have looked like: the prior request's input plus the prior response's
    // output in replay form.
    const replayChain = [...priorInput, ...(params.previousResponseInputItems ?? [])];
    if (inputItemsStartWith(fullInput, replayChain)) {
      return {
        mode: "incremental",
        payload: {
          ...fullPayload,
          previous_response_id: previousResponseId,
          input: fullInput.slice(replayChain.length),
        },
      };
    }
  }
  // Fall back to a full replay; the server must not chain off a prior
  // response, so the id is dropped from the outgoing payload.
  const { previous_response_id: _dropped, ...withoutPrevious } = fullPayload;
  return { mode: "full_context", payload: withoutPrevious };
}
export function buildOpenAIWebSocketWarmUpPayload(params: {
model: string;
tools?: FunctionToolDefinition[];

View File

@@ -11,7 +11,10 @@
import { createAssistantMessageEventStream } from "@mariozechner/pi-ai";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { ResponseObject } from "./openai-ws-connection.js";
import { buildOpenAIWebSocketResponseCreatePayload } from "./openai-ws-request.js";
import {
buildOpenAIWebSocketResponseCreatePayload,
planOpenAIWebSocketRequestPayload,
} from "./openai-ws-request.js";
import {
__testing as openAIWsStreamTesting,
buildAssistantMessageFromResponse,
@@ -22,6 +25,7 @@ import {
planTurnInput,
releaseWsSession,
} from "./openai-ws-stream.js";
import type { InputItem, ResponseCreateEvent } from "./openai-ws-types.js";
import { log } from "./pi-embedded-runner/logger.js";
import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js";
@@ -152,6 +156,10 @@ const { MockManager } = vi.hoisted(() => {
// Test helper: simulate a server event
simulateEvent(event: unknown): void {
const maybeEvent = event as { type?: string; response?: { id?: string } };
if (maybeEvent.type === "response.completed" && maybeEvent.response?.id) {
this._previousResponseId = maybeEvent.response.id;
}
for (const fn of this._listeners) {
fn(event);
}
@@ -1678,6 +1686,101 @@ describe("planTurnInput", () => {
// ─────────────────────────────────────────────────────────────────────────────
// Unit tests for the pure request planner: given the canonical full payload
// plus what was sent/received last turn, it must pick incremental vs
// full-context mode exactly as the production stream path does.
describe("planOpenAIWebSocketRequestPayload", () => {
// Happy path: full input == [prior request input + prior response output +
// new user message], non-input fields unchanged → incremental suffix send.
it("sends only the strict suffix when the full input extends the prior response chain", () => {
const previousInputItems: InputItem[] = [{ type: "message", role: "user", content: "Hello" }];
const previousRequest: ResponseCreateEvent = {
type: "response.create",
model: "gpt-5.4",
store: false,
instructions: "You are helpful.",
input: previousInputItems,
};
const previousResponseInputItems: InputItem[] = [
{ type: "message", role: "assistant", content: "Hi" },
];
const fullPayload: ResponseCreateEvent = {
type: "response.create",
model: "gpt-5.4",
store: false,
instructions: "You are helpful.",
input: [
...previousInputItems,
...previousResponseInputItems,
{ type: "message", role: "user", content: "Next" },
],
};
const plan = planOpenAIWebSocketRequestPayload({
fullPayload,
previousRequestPayload: previousRequest,
previousResponseId: "resp_prev",
// Spread to guard against the planner mutating the caller's array.
previousResponseInputItems: [...previousResponseInputItems],
});
expect(plan.mode).toBe("incremental");
expect(plan.payload.previous_response_id).toBe("resp_prev");
// Only the strict suffix after the replay chain should remain.
expect(plan.payload.input).toEqual([{ type: "message", role: "user", content: "Next" }]);
});
// Changing any non-incremental field (here: instructions) must force a full
// replay with no previous_response_id, even though the input chain extends.
it("falls back to full context when non-input fields differ", () => {
const previousInputItems: InputItem[] = [{ type: "message", role: "user", content: "Hello" }];
const previousRequest: ResponseCreateEvent = {
type: "response.create",
model: "gpt-5.4",
store: false,
instructions: "Old instructions",
input: previousInputItems,
};
const fullPayload: ResponseCreateEvent = {
...previousRequest,
instructions: "New instructions",
input: [
...previousInputItems,
{ type: "message", role: "assistant", content: "Hi" },
{ type: "message", role: "user", content: "Next" },
],
};
const plan = planOpenAIWebSocketRequestPayload({
fullPayload,
previousRequestPayload: previousRequest,
previousResponseId: "resp_prev",
previousResponseInputItems: [{ type: "message", role: "assistant", content: "Hi" }],
});
expect(plan.mode).toBe("full_context");
expect(plan.payload.previous_response_id).toBeUndefined();
expect(plan.payload.input).toEqual(fullPayload.input);
});
// If the new input diverges from the replay chain (not a strict prefix
// extension), the planner must not attempt an incremental continuation.
it("falls back to full context when the input is not a strict response-chain extension", () => {
const previousRequest: ResponseCreateEvent = {
type: "response.create",
model: "gpt-5.4",
store: false,
input: [{ type: "message", role: "user", content: "Hello" }],
};
const fullPayload: ResponseCreateEvent = {
...previousRequest,
input: [{ type: "message", role: "user", content: "Different" }],
};
const plan = planOpenAIWebSocketRequestPayload({
fullPayload,
previousRequestPayload: previousRequest,
previousResponseId: "resp_prev",
previousResponseInputItems: [{ type: "message", role: "assistant", content: "Hi" }],
});
expect(plan.mode).toBe("full_context");
expect(plan.payload.previous_response_id).toBeUndefined();
expect(plan.payload.input).toEqual(fullPayload.input);
});
});
// ─────────────────────────────────────────────────────────────────────────────
describe("createOpenAIWebSocketStreamFn", () => {
const modelStub = {
api: "openai-responses",
@@ -2738,7 +2841,6 @@ describe("createOpenAIWebSocketStreamFn", () => {
// Server responds with a tool call
const turn1Response = makeResponseObject("resp_turn1", undefined, "exec");
manager.setPreviousResponseId("resp_turn1");
manager.simulateEvent({ type: "response.completed", response: turn1Response });
await done1;
@@ -2747,8 +2849,8 @@ describe("createOpenAIWebSocketStreamFn", () => {
systemPrompt: "You are helpful.",
messages: [
userMsg("Run ls"),
assistantMsg([], [{ id: "call_1", name: "exec", args: { cmd: "ls" } }]),
toolResultMsg("call_1", "file.txt"),
buildAssistantMessageFromResponse(turn1Response, modelStub),
toolResultMsg("call_abc|item_2", "file.txt"),
] as Parameters<typeof convertMessagesToInputItems>[0],
tools: [],
};
@@ -2785,7 +2887,68 @@ describe("createOpenAIWebSocketStreamFn", () => {
expect(inputTypes).toHaveLength(1);
});
it("omits previous_response_id when replaying full context on follow-up turns", async () => {
it("sends only a follow-up user message when the full context is a strict extension", async () => {
const sessionId = "sess-user-delta";
const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId);
const ctx1 = {
systemPrompt: "You are helpful.",
messages: [userMsg("Hello")] as Parameters<typeof convertMessagesToInputItems>[0],
tools: [],
};
const stream1 = streamFn(
modelStub as Parameters<typeof streamFn>[0],
ctx1 as Parameters<typeof streamFn>[1],
);
const done1 = (async () => {
for await (const _ of await resolveStream(stream1)) {
/* consume */
}
})();
await new Promise((r) => setImmediate(r));
const manager = MockManager.lastInstance!;
const turn1Response = makeResponseObject("resp_turn1_text", "Hi there.");
manager.simulateEvent({ type: "response.completed", response: turn1Response });
await done1;
const ctx2 = {
systemPrompt: "You are helpful.",
messages: [
userMsg("Hello"),
buildAssistantMessageFromResponse(turn1Response, modelStub),
userMsg("What can you do?"),
] as Parameters<typeof convertMessagesToInputItems>[0],
tools: [],
};
const stream2 = streamFn(
modelStub as Parameters<typeof streamFn>[0],
ctx2 as Parameters<typeof streamFn>[1],
);
const done2 = (async () => {
for await (const _ of await resolveStream(stream2)) {
/* consume */
}
})();
await new Promise((r) => setImmediate(r));
manager.simulateEvent({
type: "response.completed",
response: makeResponseObject("resp_turn2_text", "I can help."),
});
await done2;
const sent2 = manager.sentEvents[1] as {
previous_response_id?: string;
input: Array<{ type: string; role?: string; content?: unknown }>;
};
expect(sent2.previous_response_id).toBe("resp_turn1_text");
expect(sent2.input).toEqual([{ type: "message", role: "user", content: "What can you do?" }]);
});
it("uses an empty incremental payload when replay context exactly matches the response chain", async () => {
const sessionId = "sess-full-context-replay";
const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId);
@@ -2830,7 +2993,6 @@ describe("createOpenAIWebSocketStreamFn", () => {
await new Promise((r) => setImmediate(r));
const manager = MockManager.lastInstance!;
manager.setPreviousResponseId("resp_turn1_reasoning");
manager.simulateEvent({ type: "response.completed", response: turn1Response });
await done1;
@@ -2864,14 +3026,8 @@ describe("createOpenAIWebSocketStreamFn", () => {
previous_response_id?: string;
input: Array<{ type: string; id?: string; call_id?: string }>;
};
expect(sent2.previous_response_id).toBeUndefined();
expect(sent2.input.map((item) => item.type)).toEqual(["message", "reasoning", "function_call"]);
expect(sent2.input[1]).toMatchObject({ type: "reasoning", id: "rs_turn1" });
expect(sent2.input[2]).toMatchObject({
type: "function_call",
call_id: "call_turn1",
id: "fc_turn1",
});
expect(sent2.previous_response_id).toBe("resp_turn1_reasoning");
expect(sent2.input).toEqual([]);
});
it("sends instructions (system prompt) in each request", async () => {

View File

@@ -51,10 +51,15 @@ import {
import {
buildAssistantMessageFromResponse,
convertMessagesToInputItems,
convertResponseToInputItems,
convertTools,
planTurnInput,
} from "./openai-ws-message-conversion.js";
import { buildOpenAIWebSocketResponseCreatePayload } from "./openai-ws-request.js";
import {
buildOpenAIWebSocketResponseCreatePayload,
planOpenAIWebSocketRequestPayload,
} from "./openai-ws-request.js";
import type { ResponseCreateEvent } from "./openai-ws-types.js";
import { log } from "./pi-embedded-runner/logger.js";
import { resolveProviderEndpoint } from "./provider-attribution.js";
import { normalizeProviderId } from "./provider-id.js";
@@ -76,6 +81,10 @@ interface WsSession {
authSignature: string;
/** Number of messages that were in context.messages at the END of the last streamFn call. */
lastContextLength: number;
/** Last full canonical request, before any incremental previous_response_id delta rewrite. */
lastRequestPayload?: ResponseCreateEvent;
/** Last response output converted to the same replay form used by future full-context sends. */
lastResponseInputItems: ReturnType<typeof convertResponseToInputItems>;
/** True if the connection has been established at least once. */
everConnected: boolean;
/** True once a best-effort warm-up attempt has run for this session. */
@@ -358,6 +367,9 @@ function resetWsSession(params: {
params.session.everConnected = false;
params.session.warmUpAttempted = false;
params.session.broken = false;
params.session.lastContextLength = 0;
params.session.lastRequestPayload = undefined;
params.session.lastResponseInputItems = [];
if (!params.preserveDegradeUntil) {
params.session.degradedUntil = null;
}
@@ -728,6 +740,7 @@ export function createOpenAIWebSocketStreamFn(
managerConfigSignature,
authSignature,
lastContextLength: 0,
lastResponseInputItems: [],
everConnected: false,
warmUpAttempted: false,
broken: false,
@@ -890,27 +903,6 @@ export function createOpenAIWebSocketStreamFn(
}
}
const turnInput = planTurnInput({
context,
model,
previousResponseId: session.manager.previousResponseId,
lastContextLength: session.lastContextLength,
});
if (turnInput.mode === "incremental_tool_results") {
log.debug(
`[ws-stream] session=${sessionId}: incremental send (${turnInput.inputItems.length} tool results) previous_response_id=${turnInput.previousResponseId}`,
);
} else if (turnInput.mode === "full_context_restart") {
log.debug(
`[ws-stream] session=${sessionId}: no new tool results found; sending full context without previous_response_id`,
);
} else {
log.debug(
`[ws-stream] session=${sessionId}: full context send (${turnInput.inputItems.length} items)`,
);
}
turnAttempt++;
const turnState = resolveProviderTransportTurnState(model, {
sessionId,
@@ -918,22 +910,45 @@ export function createOpenAIWebSocketStreamFn(
attempt: turnAttempt,
transport: "websocket",
});
let payload = buildOpenAIWebSocketResponseCreatePayload({
const fullTurnInput = {
inputItems: convertMessagesToInputItems(context.messages, model),
};
let fullPayload = buildOpenAIWebSocketResponseCreatePayload({
model,
context,
options: options as WsOptions | undefined,
turnInput,
turnInput: fullTurnInput,
tools: convertTools(context.tools, {
strict: resolveOpenAIWebSocketStrictToolSetting(model),
}),
metadata: turnState?.metadata,
}) as Record<string, unknown>;
const nextPayload = await options?.onPayload?.(payload, model);
payload = mergeTransportMetadata(
(nextPayload ?? payload) as Record<string, unknown>,
const nextPayload = await options?.onPayload?.(fullPayload, model);
fullPayload = mergeTransportMetadata(
(nextPayload ?? fullPayload) as Record<string, unknown>,
turnState?.metadata,
);
const requestPayload = payload as Parameters<OpenAIWebSocketManager["send"]>[0];
const plannedPayload = planOpenAIWebSocketRequestPayload({
fullPayload: fullPayload as ResponseCreateEvent,
previousRequestPayload: session.lastRequestPayload,
previousResponseId: session.manager.previousResponseId,
previousResponseInputItems: session.lastResponseInputItems,
});
const plannedInputItems = Array.isArray(plannedPayload.payload.input)
? plannedPayload.payload.input
: [];
if (plannedPayload.mode === "incremental") {
log.debug(
`[ws-stream] session=${sessionId}: incremental send (${plannedInputItems.length} items) previous_response_id=${plannedPayload.payload.previous_response_id}`,
);
} else {
log.debug(
`[ws-stream] session=${sessionId}: full context send (${plannedInputItems.length} items)`,
);
}
const requestPayload = plannedPayload.payload as Parameters<
OpenAIWebSocketManager["send"]
>[0];
try {
session.manager.send(requestPayload);
@@ -1167,6 +1182,13 @@ export function createOpenAIWebSocketStreamFn(
emittedTextByPart.clear();
cleanup();
session.lastContextLength = capturedContextLength;
session.lastRequestPayload = fullPayload as ResponseCreateEvent;
session.lastResponseInputItems = convertResponseToInputItems(event.response, {
api: model.api,
provider: model.provider,
id: model.id,
input: model.input,
});
const assistantMsg = buildAssistantMessageFromResponse(event.response, {
api: model.api,
provider: model.provider,