mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix: harden openai websocket replay
This commit is contained in:
@@ -208,11 +208,10 @@ export type ToolChoice =
|
||||
|
||||
/**
 * Function tool definition in the flat shape: `name`, `description`, and
 * `parameters` sit directly next to `type` instead of under a nested
 * `function` object. This is the shape `convertTools` emits.
 */
export interface FunctionToolDefinition {
  /** Discriminator; always the literal `"function"`. */
  type: "function";
  /** Tool name the model uses to reference this tool. */
  name: string;
  /** Optional human-readable description of the tool. */
  description?: string;
  /** Optional parameter schema object, passed through as-is. */
  parameters?: Record<string, unknown>;
  /** Optional strictness flag. NOTE(review): not set by `convertTools`; confirm intended semantics. */
  strict?: boolean;
}
|
||||
|
||||
/** Standard response.create event payload (full turn) */
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
* Skipped in CI — no API key available and we avoid billable external calls.
|
||||
*/
|
||||
|
||||
import type { AssistantMessage, Context } from "@mariozechner/pi-ai";
|
||||
import { describe, it, expect, afterEach } from "vitest";
|
||||
import {
|
||||
createOpenAIWebSocketStreamFn,
|
||||
@@ -28,14 +29,13 @@ const testFn = LIVE ? it : it.skip;
|
||||
/**
 * Model descriptor passed as the first argument to the stream function in the
 * live tests. The literal is cast because it only fills in the fields listed
 * here rather than the full parameter type.
 */
const model = {
  api: "openai-responses" as const,
  provider: "openai",
  id: "gpt-5.2",
  name: "gpt-5.2",
  contextWindow: 128_000,
  maxTokens: 4_096,
  reasoning: true,
  input: ["text"],
  cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
} as unknown as Parameters<ReturnType<typeof createOpenAIWebSocketStreamFn>>[0];
|
||||
|
||||
type StreamFnParams = Parameters<ReturnType<typeof createOpenAIWebSocketStreamFn>>;
|
||||
@@ -47,6 +47,61 @@ function makeContext(userMessage: string): StreamFnParams[1] {
|
||||
} as unknown as StreamFnParams[1];
|
||||
}
|
||||
|
||||
/**
 * Build a single-turn context that exposes one parameterless `noop` tool.
 *
 * @param userMessage - Text of the only user message in the conversation.
 * @returns A context holding a system prompt, the user message, and the
 *   `noop` tool definition (an object schema with no properties and
 *   `additionalProperties: false`).
 */
function makeToolContext(userMessage: string): StreamFnParams[1] {
  const noopTool = {
    name: "noop",
    description: "Return the supplied tool result to the user.",
    parameters: {
      type: "object",
      additionalProperties: false,
      properties: {},
    },
  };

  return {
    systemPrompt: "You are a precise assistant. Follow tool instructions exactly.",
    messages: [{ role: "user" as const, content: userMessage }],
    tools: [noopTool],
  } as unknown as Context;
}
|
||||
|
||||
/**
 * Build a non-error tool-result message for the `noop` tool.
 *
 * @param callId - Identifier of the tool call this result answers.
 * @param output - Text placed in the single text content block.
 * @returns A `toolResult` message stamped with the current `Date.now()` value.
 */
function makeToolResultMessage(
  callId: string,
  output: string,
): StreamFnParams[1]["messages"][number] {
  const textBlock = { type: "text" as const, text: output };

  return {
    role: "toolResult" as const,
    toolCallId: callId,
    toolName: "noop",
    content: [textBlock],
    isError: false,
    timestamp: Date.now(),
  } as unknown as StreamFnParams[1]["messages"][number];
}
|
||||
|
||||
/**
 * Drain a stream to completion and return every event in arrival order.
 *
 * @param stream - Value returned by the stream function; it is iterated as an
 *   async iterable of events.
 * @returns All events yielded before the stream ended.
 */
async function collectEvents(
  stream: ReturnType<ReturnType<typeof createOpenAIWebSocketStreamFn>>,
): Promise<Array<{ type: string; message?: AssistantMessage }>> {
  type StreamEvent = { type: string; message?: AssistantMessage };

  const collected: StreamEvent[] = [];
  for await (const streamEvent of stream as AsyncIterable<StreamEvent>) {
    collected.push(streamEvent);
  }
  return collected;
}
|
||||
|
||||
/**
 * Assert that the events contain a `done` event carrying a message, and
 * return that message.
 *
 * @param events - Events collected from a stream.
 * @returns The `message` of the first event whose `type` is `"done"`.
 * @throws When no `done` event exists or the first one has no message.
 */
function expectDone(events: Array<{ type: string; message?: AssistantMessage }>): AssistantMessage {
  const doneMessage = events.find((event) => event.type === "done")?.message;
  expect(doneMessage).toBeDefined();
  if (doneMessage === undefined) {
    // Narrowing guard that replaces a non-null assertion; the expectation
    // above is presumed to have thrown already in this case.
    throw new Error("expected a done event carrying a message");
  }
  return doneMessage;
}
|
||||
|
||||
/**
 * Concatenate the text of every `text` block in an assistant message.
 *
 * @param message - Assistant message whose content blocks are scanned in order.
 * @returns The text blocks joined with no separator; blocks of any other type
 *   are ignored.
 */
function assistantText(message: AssistantMessage): string {
  // flatMap narrows each block inside the callback, so reading `text` does not
  // depend on the compiler inferring a type predicate from a separate filter.
  return message.content
    .flatMap((block) => (block.type === "text" ? [block.text] : []))
    .join("");
}
|
||||
|
||||
/** Each test gets a unique session ID to avoid cross-test interference. */
|
||||
const sessions: string[] = [];
|
||||
function freshSession(name: string): string {
|
||||
@@ -68,26 +123,14 @@ describe("OpenAI WebSocket e2e", () => {
|
||||
async () => {
|
||||
const sid = freshSession("single");
|
||||
const streamFn = createOpenAIWebSocketStreamFn(API_KEY!, sid);
|
||||
const stream = streamFn(model, makeContext("What is 2+2?"), {});
|
||||
const stream = streamFn(model, makeContext("What is 2+2?"), { transport: "websocket" });
|
||||
const done = expectDone(await collectEvents(stream));
|
||||
|
||||
const events: Array<{ type: string }> = [];
|
||||
for await (const event of stream as AsyncIterable<{ type: string }>) {
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
const done = events.find((e) => e.type === "done") as
|
||||
| { type: "done"; message: { content: Array<{ type: string; text?: string }> } }
|
||||
| undefined;
|
||||
expect(done).toBeDefined();
|
||||
expect(done!.message.content.length).toBeGreaterThan(0);
|
||||
|
||||
const text = done!.message.content
|
||||
.filter((c) => c.type === "text")
|
||||
.map((c) => c.text)
|
||||
.join("");
|
||||
expect(done.content.length).toBeGreaterThan(0);
|
||||
const text = assistantText(done);
|
||||
expect(text).toMatch(/4/);
|
||||
},
|
||||
30_000,
|
||||
45_000,
|
||||
);
|
||||
|
||||
testFn(
|
||||
@@ -96,19 +139,80 @@ describe("OpenAI WebSocket e2e", () => {
|
||||
const sid = freshSession("temp");
|
||||
const streamFn = createOpenAIWebSocketStreamFn(API_KEY!, sid);
|
||||
const stream = streamFn(model, makeContext("Pick a random number between 1 and 1000."), {
|
||||
transport: "websocket",
|
||||
temperature: 0.8,
|
||||
});
|
||||
|
||||
const events: Array<{ type: string }> = [];
|
||||
for await (const event of stream as AsyncIterable<{ type: string }>) {
|
||||
events.push(event);
|
||||
}
|
||||
const events = await collectEvents(stream);
|
||||
|
||||
// Stream must complete (done or error with fallback) — must NOT hang.
|
||||
const hasTerminal = events.some((e) => e.type === "done" || e.type === "error");
|
||||
expect(hasTerminal).toBe(true);
|
||||
},
|
||||
30_000,
|
||||
45_000,
|
||||
);
|
||||
|
||||
testFn(
|
||||
"reuses the websocket session for tool-call follow-up turns",
|
||||
async () => {
|
||||
const sid = freshSession("tool-roundtrip");
|
||||
const streamFn = createOpenAIWebSocketStreamFn(API_KEY!, sid);
|
||||
const firstContext = makeToolContext(
|
||||
"Call the tool `noop` with {}. After the tool result arrives, reply with exactly the tool output and nothing else.",
|
||||
);
|
||||
const firstEvents = await collectEvents(
|
||||
streamFn(model, firstContext, {
|
||||
transport: "websocket",
|
||||
toolChoice: "required",
|
||||
maxTokens: 128,
|
||||
}),
|
||||
);
|
||||
const firstDone = expectDone(firstEvents);
|
||||
const toolCall = firstDone.content.find((block) => block.type === "toolCall") as
|
||||
| { type: "toolCall"; id: string; name: string }
|
||||
| undefined;
|
||||
expect(toolCall?.name).toBe("noop");
|
||||
expect(toolCall?.id).toBeTruthy();
|
||||
|
||||
const secondContext = {
|
||||
...firstContext,
|
||||
messages: [
|
||||
...firstContext.messages,
|
||||
firstDone,
|
||||
makeToolResultMessage(toolCall!.id, "TOOL_OK"),
|
||||
],
|
||||
} as unknown as StreamFnParams[1];
|
||||
const secondDone = expectDone(
|
||||
await collectEvents(
|
||||
streamFn(model, secondContext, {
|
||||
transport: "websocket",
|
||||
maxTokens: 128,
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
expect(assistantText(secondDone)).toMatch(/TOOL_OK/);
|
||||
},
|
||||
60_000,
|
||||
);
|
||||
|
||||
testFn(
|
||||
"supports websocket warm-up before the first request",
|
||||
async () => {
|
||||
const sid = freshSession("warmup");
|
||||
const streamFn = createOpenAIWebSocketStreamFn(API_KEY!, sid);
|
||||
const done = expectDone(
|
||||
await collectEvents(
|
||||
streamFn(model, makeContext("Reply with the word warmed."), {
|
||||
transport: "websocket",
|
||||
openaiWsWarmup: true,
|
||||
maxTokens: 32,
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
expect(assistantText(done).toLowerCase()).toContain("warmed");
|
||||
},
|
||||
45_000,
|
||||
);
|
||||
|
||||
testFn(
|
||||
@@ -119,16 +223,13 @@ describe("OpenAI WebSocket e2e", () => {
|
||||
|
||||
expect(hasWsSession(sid)).toBe(false);
|
||||
|
||||
const stream = streamFn(model, makeContext("Say hello."), {});
|
||||
for await (const _ of stream as AsyncIterable<unknown>) {
|
||||
/* consume */
|
||||
}
|
||||
await collectEvents(streamFn(model, makeContext("Say hello."), { transport: "websocket" }));
|
||||
|
||||
expect(hasWsSession(sid)).toBe(true);
|
||||
releaseWsSession(sid);
|
||||
expect(hasWsSession(sid)).toBe(false);
|
||||
},
|
||||
30_000,
|
||||
45_000,
|
||||
);
|
||||
|
||||
testFn(
|
||||
@@ -137,15 +238,11 @@ describe("OpenAI WebSocket e2e", () => {
|
||||
const sid = freshSession("fallback");
|
||||
const streamFn = createOpenAIWebSocketStreamFn("sk-invalid-key", sid);
|
||||
const stream = streamFn(model, makeContext("Hello"), {});
|
||||
|
||||
const events: Array<{ type: string }> = [];
|
||||
for await (const event of stream as AsyncIterable<{ type: string }>) {
|
||||
events.push(event);
|
||||
}
|
||||
const events = await collectEvents(stream);
|
||||
|
||||
const hasTerminal = events.some((e) => e.type === "done" || e.type === "error");
|
||||
expect(hasTerminal).toBe(true);
|
||||
},
|
||||
30_000,
|
||||
45_000,
|
||||
);
|
||||
});
|
||||
|
||||
@@ -362,18 +362,16 @@ describe("convertTools", () => {
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]).toMatchObject({
|
||||
type: "function",
|
||||
function: {
|
||||
name: "exec",
|
||||
description: "Run a command",
|
||||
parameters: { type: "object", properties: { cmd: { type: "string" } } },
|
||||
},
|
||||
name: "exec",
|
||||
description: "Run a command",
|
||||
parameters: { type: "object", properties: { cmd: { type: "string" } } },
|
||||
});
|
||||
});
|
||||
|
||||
it("handles tools without description", () => {
|
||||
const tools = [{ name: "ping", description: "", parameters: {} }];
|
||||
const result = convertTools(tools as Parameters<typeof convertTools>[0]);
|
||||
expect(result[0]?.function?.name).toBe("ping");
|
||||
expect(result[0]?.name).toBe("ping");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -449,6 +447,35 @@ describe("convertMessagesToInputItems", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves assistant phase from textSignature metadata without local phase field", () => {
|
||||
const msg = {
|
||||
role: "assistant" as const,
|
||||
content: [
|
||||
{
|
||||
type: "text" as const,
|
||||
text: "Working on it.",
|
||||
textSignature: JSON.stringify({ v: 1, id: "msg_sig", phase: "commentary" }),
|
||||
},
|
||||
],
|
||||
stopReason: "stop",
|
||||
api: "openai-responses",
|
||||
provider: "openai",
|
||||
model: "gpt-5.2",
|
||||
usage: {},
|
||||
timestamp: 0,
|
||||
};
|
||||
const items = convertMessagesToInputItems([msg] as Parameters<
|
||||
typeof convertMessagesToInputItems
|
||||
>[0]);
|
||||
expect(items).toHaveLength(1);
|
||||
expect(items[0]).toMatchObject({
|
||||
type: "message",
|
||||
role: "assistant",
|
||||
content: "Working on it.",
|
||||
phase: "commentary",
|
||||
});
|
||||
});
|
||||
|
||||
it("converts a tool result message", () => {
|
||||
const items = convertMessagesToInputItems([toolResultMsg("call_1", "file.txt")] as Parameters<
|
||||
typeof convertMessagesToInputItems
|
||||
@@ -555,6 +582,34 @@ describe("convertMessagesToInputItems", () => {
|
||||
expect((items[0] as { content?: unknown }).content).toBe("Here is my answer.");
|
||||
});
|
||||
|
||||
it("replays reasoning blocks from thinking signatures", () => {
|
||||
const msg = {
|
||||
role: "assistant" as const,
|
||||
content: [
|
||||
{
|
||||
type: "thinking" as const,
|
||||
thinking: "internal reasoning...",
|
||||
thinkingSignature: JSON.stringify({
|
||||
type: "reasoning",
|
||||
id: "rs_test",
|
||||
summary: [],
|
||||
}),
|
||||
},
|
||||
{ type: "text" as const, text: "Here is my answer." },
|
||||
],
|
||||
stopReason: "stop",
|
||||
api: "openai-responses",
|
||||
provider: "openai",
|
||||
model: "gpt-5.2",
|
||||
usage: {},
|
||||
timestamp: 0,
|
||||
};
|
||||
const items = convertMessagesToInputItems([msg] as Parameters<
|
||||
typeof convertMessagesToInputItems
|
||||
>[0]);
|
||||
expect(items.map((item) => item.type)).toEqual(["reasoning", "message"]);
|
||||
});
|
||||
|
||||
it("returns empty array for empty messages", () => {
|
||||
expect(convertMessagesToInputItems([])).toEqual([]);
|
||||
});
|
||||
|
||||
@@ -102,6 +102,7 @@ export function hasWsSession(sessionId: string): boolean {
|
||||
|
||||
type AnyMessage = Message & { role: string; content: unknown };
|
||||
type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAssistantPhase };
|
||||
type ReplayModelInfo = { input?: ReadonlyArray<string> };
|
||||
|
||||
function toNonEmptyString(value: unknown): string | null {
|
||||
if (typeof value !== "string") {
|
||||
@@ -115,6 +116,46 @@ function normalizeAssistantPhase(value: unknown): OpenAIResponsesAssistantPhase
|
||||
return value === "commentary" || value === "final_answer" ? value : undefined;
|
||||
}
|
||||
|
||||
/**
 * Serialize an assistant text item's id (and optional phase) into a versioned
 * JSON signature string.
 *
 * @param params.id - Item id to embed.
 * @param params.phase - Optional phase; the key is omitted when falsy.
 * @returns JSON text of the form `{"v":1,"id":...}` with `"phase"` appended
 *   when a phase was supplied.
 */
function encodeAssistantTextSignature(params: {
  id: string;
  phase?: OpenAIResponsesAssistantPhase;
}): string {
  const { id, phase } = params;
  // Key order (v, id, phase) is kept so the encoded string stays stable.
  return JSON.stringify(phase ? { v: 1, id, phase } : { v: 1, id });
}
|
||||
|
||||
/**
 * Decode a text signature produced by `encodeAssistantTextSignature`.
 *
 * Accepted inputs:
 * - a string that does not start with `{` is treated as a bare item id;
 * - a JSON object string with `v === 1` and a string `id`, optionally carrying
 *   a `phase` that `normalizeAssistantPhase` accepts.
 *
 * @param value - Candidate signature; anything that is not a non-blank string
 *   yields `null`.
 * @returns The decoded id (plus phase when valid), or `null` when the value is
 *   blank, not a string, malformed JSON, or has the wrong version or id type.
 */
function parseAssistantTextSignature(
  value: unknown,
): { id: string; phase?: OpenAIResponsesAssistantPhase } | null {
  if (typeof value !== "string" || value.trim().length === 0) {
    return null;
  }
  // Anything that does not look like a JSON object is taken as a raw id.
  if (!value.startsWith("{")) {
    return { id: value };
  }
  try {
    const parsed = JSON.parse(value) as { v?: unknown; id?: unknown; phase?: unknown };
    if (parsed.v !== 1 || typeof parsed.id !== "string") {
      return null;
    }
    // Normalize once and reuse the result instead of calling the helper twice.
    const phase = normalizeAssistantPhase(parsed.phase);
    return phase ? { id: parsed.id, phase } : { id: parsed.id };
  } catch {
    // Malformed JSON: treat as "no usable signature".
    return null;
  }
}
|
||||
|
||||
/**
 * Decide whether image content may be included for the given model.
 *
 * @param modelOverride - Optional model info; only its `input` list is read.
 * @returns `false` only when `input` is an array that does not contain
 *   `"image"`; `true` when it contains `"image"` or when no `input` array is
 *   available.
 */
function supportsImageInput(modelOverride?: ReplayModelInfo): boolean {
  const declaredInputs = modelOverride?.input;
  if (!Array.isArray(declaredInputs)) {
    // No declared input list: images are not filtered out.
    return true;
  }
  return declaredInputs.includes("image");
}
|
||||
|
||||
/** Convert pi-ai content (string | ContentPart[]) to plain text. */
|
||||
function contentToText(content: unknown): string {
|
||||
if (typeof content === "string") {
|
||||
@@ -123,30 +164,50 @@ function contentToText(content: unknown): string {
|
||||
if (!Array.isArray(content)) {
|
||||
return "";
|
||||
}
|
||||
return (content as Array<{ type?: string; text?: string }>)
|
||||
.filter((p) => p.type === "text" && typeof p.text === "string")
|
||||
.map((p) => p.text as string)
|
||||
return content
|
||||
.filter(
|
||||
(part): part is { type?: string; text?: string } => Boolean(part) && typeof part === "object",
|
||||
)
|
||||
.filter(
|
||||
(part) =>
|
||||
(part.type === "text" || part.type === "input_text" || part.type === "output_text") &&
|
||||
typeof part.text === "string",
|
||||
)
|
||||
.map((part) => part.text as string)
|
||||
.join("");
|
||||
}
|
||||
|
||||
/** Convert pi-ai content to OpenAI ContentPart[]. */
|
||||
function contentToOpenAIParts(content: unknown): ContentPart[] {
|
||||
function contentToOpenAIParts(content: unknown, modelOverride?: ReplayModelInfo): ContentPart[] {
|
||||
if (typeof content === "string") {
|
||||
return content ? [{ type: "input_text", text: content }] : [];
|
||||
}
|
||||
if (!Array.isArray(content)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const includeImages = supportsImageInput(modelOverride);
|
||||
const parts: ContentPart[] = [];
|
||||
for (const part of content as Array<{
|
||||
type?: string;
|
||||
text?: string;
|
||||
data?: string;
|
||||
mimeType?: string;
|
||||
source?: unknown;
|
||||
}>) {
|
||||
if (part.type === "text" && typeof part.text === "string") {
|
||||
if (
|
||||
(part.type === "text" || part.type === "input_text" || part.type === "output_text") &&
|
||||
typeof part.text === "string"
|
||||
) {
|
||||
parts.push({ type: "input_text", text: part.text });
|
||||
} else if (part.type === "image" && typeof part.data === "string") {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!includeImages) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (part.type === "image" && typeof part.data === "string") {
|
||||
parts.push({
|
||||
type: "input_image",
|
||||
source: {
|
||||
@@ -155,11 +216,60 @@ function contentToOpenAIParts(content: unknown): ContentPart[] {
|
||||
data: part.data,
|
||||
},
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if (
|
||||
part.type === "input_image" &&
|
||||
part.source &&
|
||||
typeof part.source === "object" &&
|
||||
typeof (part.source as { type?: unknown }).type === "string"
|
||||
) {
|
||||
parts.push({
|
||||
type: "input_image",
|
||||
source: part.source as
|
||||
| { type: "url"; url: string }
|
||||
| { type: "base64"; media_type: string; data: string },
|
||||
});
|
||||
}
|
||||
}
|
||||
return parts;
|
||||
}
|
||||
|
||||
/**
 * Validate an unknown value as a reasoning input item.
 *
 * Only `content`, `encrypted_content`, and `summary` are copied, and each only
 * when it is a string. Any other field on the input, and any of those three
 * fields holding a non-string value (for example an array), is omitted from
 * the result.
 *
 * @param value - Candidate value, typically the result of `JSON.parse`.
 * @returns A fresh reasoning item, or `null` when the value is not an object
 *   whose `type` is `"reasoning"`.
 */
function parseReasoningItem(value: unknown): Extract<InputItem, { type: "reasoning" }> | null {
  if (typeof value !== "object" || value === null) {
    return null;
  }
  const {
    type,
    content,
    encrypted_content: encryptedContent,
    summary,
  } = value as {
    type?: unknown;
    content?: unknown;
    encrypted_content?: unknown;
    summary?: unknown;
  };
  if (type !== "reasoning") {
    return null;
  }
  return {
    type: "reasoning",
    ...(typeof content === "string" ? { content } : {}),
    ...(typeof encryptedContent === "string" ? { encrypted_content: encryptedContent } : {}),
    ...(typeof summary === "string" ? { summary } : {}),
  };
}
|
||||
|
||||
/**
 * Decode a thinking-block signature into a reasoning input item.
 *
 * @param value - Candidate signature; expected to be a JSON string.
 * @returns The result of `parseReasoningItem` on the parsed JSON, or `null`
 *   when the value is not a non-blank string or parsing throws.
 */
function parseThinkingSignature(value: unknown): Extract<InputItem, { type: "reasoning" }> | null {
  const isUsableString = typeof value === "string" && value.trim().length > 0;
  if (!isUsableString) {
    return null;
  }
  try {
    const decoded: unknown = JSON.parse(value);
    return parseReasoningItem(decoded);
  } catch {
    // Malformed JSON: treat as "no replayable reasoning".
    return null;
  }
}
|
||||
|
||||
/** Convert pi-ai tool array to OpenAI FunctionToolDefinition[]. */
|
||||
export function convertTools(tools: Context["tools"]): FunctionToolDefinition[] {
|
||||
if (!tools || tools.length === 0) {
|
||||
@@ -167,11 +277,9 @@ export function convertTools(tools: Context["tools"]): FunctionToolDefinition[]
|
||||
}
|
||||
return tools.map((tool) => ({
|
||||
type: "function" as const,
|
||||
function: {
|
||||
name: tool.name,
|
||||
description: typeof tool.description === "string" ? tool.description : undefined,
|
||||
parameters: (tool.parameters ?? {}) as Record<string, unknown>,
|
||||
},
|
||||
name: tool.name,
|
||||
description: typeof tool.description === "string" ? tool.description : undefined,
|
||||
parameters: (tool.parameters ?? {}) as Record<string, unknown>,
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -179,14 +287,24 @@ export function convertTools(tools: Context["tools"]): FunctionToolDefinition[]
|
||||
* Convert the full pi-ai message history to an OpenAI `input` array.
|
||||
* Handles user messages, assistant text, reasoning, and tool-call blocks, and tool results (including attached images).
|
||||
*/
|
||||
export function convertMessagesToInputItems(messages: Message[]): InputItem[] {
|
||||
export function convertMessagesToInputItems(
|
||||
messages: Message[],
|
||||
modelOverride?: ReplayModelInfo,
|
||||
): InputItem[] {
|
||||
const items: InputItem[] = [];
|
||||
|
||||
for (const msg of messages) {
|
||||
const m = msg as AnyMessage;
|
||||
const m = msg as AnyMessage & {
|
||||
phase?: unknown;
|
||||
toolCallId?: unknown;
|
||||
toolUseId?: unknown;
|
||||
};
|
||||
|
||||
if (m.role === "user") {
|
||||
const parts = contentToOpenAIParts(m.content);
|
||||
const parts = contentToOpenAIParts(m.content, modelOverride);
|
||||
if (parts.length === 0) {
|
||||
continue;
|
||||
}
|
||||
items.push({
|
||||
type: "message",
|
||||
role: "user",
|
||||
@@ -199,92 +317,117 @@ export function convertMessagesToInputItems(messages: Message[]): InputItem[] {
|
||||
}
|
||||
|
||||
if (m.role === "assistant") {
|
||||
const assistantPhase = normalizeAssistantPhase((m as { phase?: unknown }).phase);
|
||||
const content = m.content;
|
||||
let assistantPhase = normalizeAssistantPhase(m.phase);
|
||||
if (Array.isArray(content)) {
|
||||
// Collect text blocks and tool calls separately
|
||||
const textParts: string[] = [];
|
||||
for (const block of content as Array<{
|
||||
type?: string;
|
||||
text?: string;
|
||||
id?: string;
|
||||
name?: string;
|
||||
arguments?: Record<string, unknown>;
|
||||
thinking?: string;
|
||||
}>) {
|
||||
if (block.type === "text" && typeof block.text === "string") {
|
||||
textParts.push(block.text);
|
||||
} else if (block.type === "thinking" && typeof block.thinking === "string") {
|
||||
// Skip thinking blocks — not sent back to the model
|
||||
} else if (block.type === "toolCall") {
|
||||
// Push accumulated text first
|
||||
if (textParts.length > 0) {
|
||||
items.push({
|
||||
type: "message",
|
||||
role: "assistant",
|
||||
content: textParts.join(""),
|
||||
...(assistantPhase ? { phase: assistantPhase } : {}),
|
||||
});
|
||||
textParts.length = 0;
|
||||
}
|
||||
const callId = toNonEmptyString(block.id);
|
||||
const toolName = toNonEmptyString(block.name);
|
||||
if (!callId || !toolName) {
|
||||
continue;
|
||||
}
|
||||
// Push function_call item
|
||||
items.push({
|
||||
type: "function_call",
|
||||
call_id: callId,
|
||||
name: toolName,
|
||||
arguments:
|
||||
typeof block.arguments === "string"
|
||||
? block.arguments
|
||||
: JSON.stringify(block.arguments ?? {}),
|
||||
});
|
||||
const pushAssistantText = () => {
|
||||
if (textParts.length === 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (textParts.length > 0) {
|
||||
items.push({
|
||||
type: "message",
|
||||
role: "assistant",
|
||||
content: textParts.join(""),
|
||||
...(assistantPhase ? { phase: assistantPhase } : {}),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
const text = contentToText(m.content);
|
||||
if (text) {
|
||||
textParts.length = 0;
|
||||
};
|
||||
|
||||
for (const block of content as Array<{
|
||||
type?: string;
|
||||
text?: string;
|
||||
textSignature?: unknown;
|
||||
id?: unknown;
|
||||
name?: unknown;
|
||||
arguments?: unknown;
|
||||
thinkingSignature?: unknown;
|
||||
}>) {
|
||||
if (block.type === "text" && typeof block.text === "string") {
|
||||
const parsedSignature = parseAssistantTextSignature(block.textSignature);
|
||||
if (!assistantPhase) {
|
||||
assistantPhase = parsedSignature?.phase;
|
||||
}
|
||||
textParts.push(block.text);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (block.type === "thinking") {
|
||||
pushAssistantText();
|
||||
const reasoningItem = parseThinkingSignature(block.thinkingSignature);
|
||||
if (reasoningItem) {
|
||||
items.push(reasoningItem);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (block.type !== "toolCall") {
|
||||
continue;
|
||||
}
|
||||
|
||||
pushAssistantText();
|
||||
const callIdRaw = toNonEmptyString(block.id);
|
||||
const toolName = toNonEmptyString(block.name);
|
||||
if (!callIdRaw || !toolName) {
|
||||
continue;
|
||||
}
|
||||
const [callId, itemId] = callIdRaw.split("|", 2);
|
||||
items.push({
|
||||
type: "message",
|
||||
role: "assistant",
|
||||
content: text,
|
||||
...(assistantPhase ? { phase: assistantPhase } : {}),
|
||||
type: "function_call",
|
||||
...(itemId ? { id: itemId } : {}),
|
||||
call_id: callId,
|
||||
name: toolName,
|
||||
arguments:
|
||||
typeof block.arguments === "string"
|
||||
? block.arguments
|
||||
: JSON.stringify(block.arguments ?? {}),
|
||||
});
|
||||
}
|
||||
|
||||
pushAssistantText();
|
||||
continue;
|
||||
}
|
||||
|
||||
const text = contentToText(content);
|
||||
if (!text) {
|
||||
continue;
|
||||
}
|
||||
items.push({
|
||||
type: "message",
|
||||
role: "assistant",
|
||||
content: text,
|
||||
...(assistantPhase ? { phase: assistantPhase } : {}),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if (m.role === "toolResult") {
|
||||
const tr = m as unknown as {
|
||||
toolCallId?: string;
|
||||
toolUseId?: string;
|
||||
content: unknown;
|
||||
isError: boolean;
|
||||
};
|
||||
const callId = toNonEmptyString(tr.toolCallId) ?? toNonEmptyString(tr.toolUseId);
|
||||
if (!callId) {
|
||||
continue;
|
||||
}
|
||||
const outputText = contentToText(tr.content);
|
||||
items.push({
|
||||
type: "function_call_output",
|
||||
call_id: callId,
|
||||
output: outputText,
|
||||
});
|
||||
if (m.role !== "toolResult") {
|
||||
continue;
|
||||
}
|
||||
|
||||
const toolCallId = toNonEmptyString(m.toolCallId) ?? toNonEmptyString(m.toolUseId);
|
||||
if (!toolCallId) {
|
||||
continue;
|
||||
}
|
||||
const [callId] = toolCallId.split("|", 2);
|
||||
const parts = Array.isArray(m.content) ? contentToOpenAIParts(m.content, modelOverride) : [];
|
||||
const textOutput = contentToText(m.content);
|
||||
const imageParts = parts.filter((part) => part.type === "input_image");
|
||||
items.push({
|
||||
type: "function_call_output",
|
||||
call_id: callId,
|
||||
output: textOutput || (imageParts.length > 0 ? "(see attached image)" : ""),
|
||||
});
|
||||
if (imageParts.length > 0) {
|
||||
items.push({
|
||||
type: "message",
|
||||
role: "user",
|
||||
content: [
|
||||
{ type: "input_text", text: "Attached image(s) from tool result:" },
|
||||
...imageParts,
|
||||
],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return items;
|
||||
@@ -309,7 +452,14 @@ export function buildAssistantMessageFromResponse(
|
||||
}
|
||||
for (const part of item.content ?? []) {
|
||||
if (part.type === "output_text" && part.text) {
|
||||
content.push({ type: "text", text: part.text });
|
||||
content.push({
|
||||
type: "text",
|
||||
text: part.text,
|
||||
textSignature: encodeAssistantTextSignature({
|
||||
id: item.id,
|
||||
...(itemPhase ? { phase: itemPhase } : {}),
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if (item.type === "function_call") {
|
||||
@@ -523,6 +673,7 @@ export function createOpenAIWebSocketStreamFn(
|
||||
|
||||
if (resolveWsWarmup(options) && !session.warmUpAttempted) {
|
||||
session.warmUpAttempted = true;
|
||||
let warmupFailed = false;
|
||||
try {
|
||||
await runWarmUp({
|
||||
manager: session.manager,
|
||||
@@ -536,10 +687,33 @@ export function createOpenAIWebSocketStreamFn(
|
||||
if (signal?.aborted) {
|
||||
throw warmErr instanceof Error ? warmErr : new Error(String(warmErr));
|
||||
}
|
||||
warmupFailed = true;
|
||||
log.warn(
|
||||
`[ws-stream] warm-up failed for session=${sessionId}; continuing without warm-up. error=${String(warmErr)}`,
|
||||
);
|
||||
}
|
||||
if (warmupFailed && !session.manager.isConnected()) {
|
||||
try {
|
||||
session.manager.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
try {
|
||||
await session.manager.connect(apiKey);
|
||||
session.everConnected = true;
|
||||
log.debug(`[ws-stream] reconnected after warm-up failure for session=${sessionId}`);
|
||||
} catch (reconnectErr) {
|
||||
session.broken = true;
|
||||
wsRegistry.delete(sessionId);
|
||||
if (transport === "websocket") {
|
||||
throw reconnectErr instanceof Error ? reconnectErr : new Error(String(reconnectErr));
|
||||
}
|
||||
log.warn(
|
||||
`[ws-stream] reconnect after warm-up failed for session=${sessionId}; falling back to HTTP. error=${String(reconnectErr)}`,
|
||||
);
|
||||
return fallbackToHttp(model, context, options, eventStream, opts.signal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── 3. Compute incremental vs full input ─────────────────────────────
|
||||
@@ -556,16 +730,16 @@ export function createOpenAIWebSocketStreamFn(
|
||||
log.debug(
|
||||
`[ws-stream] session=${sessionId}: no new tool results found; sending full context`,
|
||||
);
|
||||
inputItems = buildFullInput(context);
|
||||
inputItems = buildFullInput(context, model);
|
||||
} else {
|
||||
inputItems = convertMessagesToInputItems(toolResults);
|
||||
inputItems = convertMessagesToInputItems(toolResults, model);
|
||||
}
|
||||
log.debug(
|
||||
`[ws-stream] session=${sessionId}: incremental send (${inputItems.length} tool results) previous_response_id=${prevResponseId}`,
|
||||
);
|
||||
} else {
|
||||
// First turn: send full context
|
||||
inputItems = buildFullInput(context);
|
||||
inputItems = buildFullInput(context, model);
|
||||
log.debug(
|
||||
`[ws-stream] session=${sessionId}: full context send (${inputItems.length} items)`,
|
||||
);
|
||||
@@ -752,8 +926,8 @@ export function createOpenAIWebSocketStreamFn(
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Build full input items from context (system prompt is passed via `instructions` field).
 *
 * @param context - Conversation context; only `context.messages` is read here.
 * @param model - Model info forwarded to `convertMessagesToInputItems` as its
 *   second argument.
 * @returns The input items produced for the whole message history.
 */
function buildFullInput(context: Context, model: ReplayModelInfo): InputItem[] {
  return convertMessagesToInputItems(context.messages, model);
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1081,7 +1081,7 @@ describe("applyExtraParamsToAgent", () => {
|
||||
|
||||
expect(calls).toHaveLength(1);
|
||||
expect(calls[0]?.transport).toBe("auto");
|
||||
expect(calls[0]?.openaiWsWarmup).toBe(true);
|
||||
expect(calls[0]?.openaiWsWarmup).toBe(false);
|
||||
});
|
||||
|
||||
it("lets runtime options override OpenAI default transport", () => {
|
||||
|
||||
@@ -250,7 +250,7 @@ export function createOpenAIDefaultTransportWrapper(baseStreamFn: StreamFn | und
|
||||
const mergedOptions = {
|
||||
...options,
|
||||
transport: options?.transport ?? "auto",
|
||||
openaiWsWarmup: typedOptions?.openaiWsWarmup ?? true,
|
||||
openaiWsWarmup: typedOptions?.openaiWsWarmup ?? false,
|
||||
} as SimpleStreamOptions;
|
||||
return underlying(model, context, mergedOptions);
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user