From 86bac4ee2a68c4ad30f1d05dcb6fb72fbc9b4d11 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 27 Mar 2026 02:03:39 +0000 Subject: [PATCH] refactor: split openai websocket message conversion --- src/agents/openai-ws-message-conversion.ts | 551 +++++++++++++++++++ src/agents/openai-ws-stream.ts | 584 +-------------------- 2 files changed, 563 insertions(+), 572 deletions(-) create mode 100644 src/agents/openai-ws-message-conversion.ts diff --git a/src/agents/openai-ws-message-conversion.ts b/src/agents/openai-ws-message-conversion.ts new file mode 100644 index 00000000000..9f3e8f2a0b3 --- /dev/null +++ b/src/agents/openai-ws-message-conversion.ts @@ -0,0 +1,551 @@ +import { randomUUID } from "node:crypto"; +import type { Context, Message, StopReason } from "@mariozechner/pi-ai"; +import type { AssistantMessage } from "@mariozechner/pi-ai"; +import type { + ContentPart, + FunctionToolDefinition, + InputItem, + OpenAIResponsesAssistantPhase, + ResponseObject, +} from "./openai-ws-connection.js"; +import { buildAssistantMessage, buildUsageWithNoCost } from "./stream-message-shared.js"; + +type AnyMessage = Message & { role: string; content: unknown }; +type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAssistantPhase }; +export type ReplayModelInfo = { input?: ReadonlyArray }; +type ReplayableReasoningItem = Extract; +type ReplayableReasoningSignature = { + type: "reasoning" | `reasoning.${string}`; + id?: string; +}; +type ToolCallReplayId = { callId: string; itemId?: string }; +export type PlannedTurnInput = { + inputItems: InputItem[]; + previousResponseId?: string; + mode: "incremental_tool_results" | "full_context_initial" | "full_context_restart"; +}; + +function toNonEmptyString(value: unknown): string | null { + if (typeof value !== "string") { + return null; + } + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : null; +} + +function normalizeAssistantPhase(value: unknown): OpenAIResponsesAssistantPhase | undefined { + return value === "commentary" || value === "final_answer" ? value : undefined; +} + +function encodeAssistantTextSignature(params: { + id: string; + phase?: OpenAIResponsesAssistantPhase; +}): string { + return JSON.stringify({ + v: 1, + id: params.id, + ...(params.phase ? { phase: params.phase } : {}), + }); +} + +function parseAssistantTextSignature( + value: unknown, +): { id: string; phase?: OpenAIResponsesAssistantPhase } | null { + if (typeof value !== "string" || value.trim().length === 0) { + return null; + } + if (!value.startsWith("{")) { + return { id: value }; + } + try { + const parsed = JSON.parse(value) as { v?: unknown; id?: unknown; phase?: unknown }; + if (parsed.v !== 1 || typeof parsed.id !== "string") { + return null; + } + return { + id: parsed.id, + ...(normalizeAssistantPhase(parsed.phase) + ? { phase: normalizeAssistantPhase(parsed.phase) } + : {}), + }; + } catch { + return null; + } +} + +function supportsImageInput(modelOverride?: ReplayModelInfo): boolean { + return !Array.isArray(modelOverride?.input) || modelOverride.input.includes("image"); +} + +function contentToText(content: unknown): string { + if (typeof content === "string") { + return content; + } + if (!Array.isArray(content)) { + return ""; + } + return content + .filter( + (part): part is { type?: string; text?: string } => Boolean(part) && typeof part === "object", + ) + .filter( + (part) => + (part.type === "text" || part.type === "input_text" || part.type === "output_text") && + typeof part.text === "string", + ) + .map((part) => part.text as string) + .join(""); +} + +function contentToOpenAIParts(content: unknown, modelOverride?: ReplayModelInfo): ContentPart[] { + if (typeof content === "string") { + return content ? [{ type: "input_text", text: content }] : []; + } + if (!Array.isArray(content)) { + return []; + } + + const includeImages = supportsImageInput(modelOverride); + const parts: ContentPart[] = []; + for (const part of content as Array<{ + type?: string; + text?: string; + data?: string; + mimeType?: string; + source?: unknown; + }>) { + if ( + (part.type === "text" || part.type === "input_text" || part.type === "output_text") && + typeof part.text === "string" + ) { + parts.push({ type: "input_text", text: part.text }); + continue; + } + + if (!includeImages) { + continue; + } + + if (part.type === "image" && typeof part.data === "string") { + parts.push({ + type: "input_image", + source: { + type: "base64", + media_type: part.mimeType ?? "image/jpeg", + data: part.data, + }, + }); + continue; + } + + if ( + part.type === "input_image" && + part.source && + typeof part.source === "object" && + typeof (part.source as { type?: unknown }).type === "string" + ) { + parts.push({ + type: "input_image", + source: part.source as + | { type: "url"; url: string } + | { type: "base64"; media_type: string; data: string }, + }); + } + } + return parts; +} + +function isReplayableReasoningType(value: unknown): value is "reasoning" | `reasoning.${string}` { + return typeof value === "string" && (value === "reasoning" || value.startsWith("reasoning.")); +} + +function toReplayableReasoningId(value: unknown): string | null { + const id = toNonEmptyString(value); + return id && id.startsWith("rs_") ? id : null; +} + +function toReasoningSignature(value: unknown): ReplayableReasoningSignature | null { + if (!value || typeof value !== "object") { + return null; + } + const record = value as { type?: unknown; id?: unknown }; + if (!isReplayableReasoningType(record.type)) { + return null; + } + const reasoningId = toReplayableReasoningId(record.id); + return { + type: record.type, + ...(reasoningId ? { id: reasoningId } : {}), + }; +} + +function encodeThinkingSignature(signature: ReplayableReasoningSignature): string { + return JSON.stringify(signature); +} + +function parseReasoningItem(value: unknown): ReplayableReasoningItem | null { + if (!value || typeof value !== "object") { + return null; + } + const record = value as { + type?: unknown; + id?: unknown; + content?: unknown; + encrypted_content?: unknown; + summary?: unknown; + }; + if (!isReplayableReasoningType(record.type)) { + return null; + } + const reasoningId = toReplayableReasoningId(record.id); + return { + type: "reasoning", + ...(reasoningId ? { id: reasoningId } : {}), + ...(typeof record.content === "string" ? { content: record.content } : {}), + ...(typeof record.encrypted_content === "string" + ? { encrypted_content: record.encrypted_content } + : {}), + ...(typeof record.summary === "string" ? { summary: record.summary } : {}), + }; +} + +function parseThinkingSignature(value: unknown): ReplayableReasoningItem | null { + if (typeof value !== "string" || value.trim().length === 0) { + return null; + } + try { + const signature = toReasoningSignature(JSON.parse(value)); + return signature ? parseReasoningItem(signature) : null; + } catch { + return null; + } +} + +function encodeToolCallReplayId(params: ToolCallReplayId): string { + return params.itemId ? `${params.callId}|${params.itemId}` : params.callId; +} + +function decodeToolCallReplayId(value: unknown): ToolCallReplayId | null { + const raw = toNonEmptyString(value); + if (!raw) { + return null; + } + const [callId, itemId] = raw.split("|", 2); + return { + callId, + ...(itemId ? { itemId } : {}), + }; +} + +function extractReasoningSummaryText(value: unknown): string { + if (typeof value === "string") { + return value.trim(); + } + if (!Array.isArray(value)) { + return ""; + } + return value + .map((item) => { + if (typeof item === "string") { + return item.trim(); + } + if (!item || typeof item !== "object") { + return ""; + } + const record = item as { text?: unknown }; + return typeof record.text === "string" ? record.text.trim() : ""; + }) + .filter(Boolean) + .join("\n") + .trim(); +} + +function extractResponseReasoningText(item: unknown): string { + if (!item || typeof item !== "object") { + return ""; + } + const record = item as { summary?: unknown; content?: unknown }; + const summaryText = extractReasoningSummaryText(record.summary); + if (summaryText) { + return summaryText; + } + return typeof record.content === "string" ? record.content.trim() : ""; +} + +export function convertTools(tools: Context["tools"]): FunctionToolDefinition[] { + if (!tools || tools.length === 0) { + return []; + } + return tools.map((tool) => ({ + type: "function" as const, + name: tool.name, + description: typeof tool.description === "string" ? tool.description : undefined, + parameters: (tool.parameters ?? {}) as Record, + })); +} + +export function planTurnInput(params: { + context: Context; + model: ReplayModelInfo; + previousResponseId: string | null; + lastContextLength: number; +}): PlannedTurnInput { + if (params.previousResponseId && params.lastContextLength > 0) { + const newMessages = params.context.messages.slice(params.lastContextLength); + const toolResults = newMessages.filter((message) => (message as AnyMessage).role === "toolResult"); + if (toolResults.length > 0) { + return { + mode: "incremental_tool_results", + previousResponseId: params.previousResponseId, + inputItems: convertMessagesToInputItems(toolResults, params.model), + }; + } + return { + mode: "full_context_restart", + inputItems: convertMessagesToInputItems(params.context.messages, params.model), + }; + } + + return { + mode: "full_context_initial", + inputItems: convertMessagesToInputItems(params.context.messages, params.model), + }; +} + +export function convertMessagesToInputItems( + messages: Message[], + modelOverride?: ReplayModelInfo, +): InputItem[] { + const items: InputItem[] = []; + + for (const msg of messages) { + const m = msg as AnyMessage & { + phase?: unknown; + toolCallId?: unknown; + toolUseId?: unknown; + }; + + if (m.role === "user") { + const parts = contentToOpenAIParts(m.content, modelOverride); + if (parts.length === 0) { + continue; + } + items.push({ + type: "message", + role: "user", + content: + parts.length === 1 && parts[0]?.type === "input_text" + ? (parts[0] as { type: "input_text"; text: string }).text + : parts, + }); + continue; + } + + if (m.role === "assistant") { + const content = m.content; + let assistantPhase = normalizeAssistantPhase(m.phase); + if (Array.isArray(content)) { + const textParts: string[] = []; + const pushAssistantText = () => { + if (textParts.length === 0) { + return; + } + items.push({ + type: "message", + role: "assistant", + content: textParts.join(""), + ...(assistantPhase ? { phase: assistantPhase } : {}), + }); + textParts.length = 0; + }; + + for (const block of content as Array<{ + type?: string; + text?: string; + textSignature?: unknown; + id?: unknown; + name?: unknown; + arguments?: unknown; + thinkingSignature?: unknown; + }>) { + if (block.type === "text" && typeof block.text === "string") { + const parsedSignature = parseAssistantTextSignature(block.textSignature); + if (!assistantPhase) { + assistantPhase = parsedSignature?.phase; + } + textParts.push(block.text); + continue; + } + + if (block.type === "thinking") { + pushAssistantText(); + const reasoningItem = parseThinkingSignature(block.thinkingSignature); + if (reasoningItem) { + items.push(reasoningItem); + } + continue; + } + + if (block.type !== "toolCall") { + continue; + } + + pushAssistantText(); + const replayId = decodeToolCallReplayId(block.id); + const toolName = toNonEmptyString(block.name); + if (!replayId || !toolName) { + continue; + } + items.push({ + type: "function_call", + ...(replayId.itemId ? { id: replayId.itemId } : {}), + call_id: replayId.callId, + name: toolName, + arguments: + typeof block.arguments === "string" + ? block.arguments + : JSON.stringify(block.arguments ?? {}), + }); + } + + pushAssistantText(); + continue; + } + + const text = contentToText(content); + if (!text) { + continue; + } + items.push({ + type: "message", + role: "assistant", + content: text, + ...(assistantPhase ? { phase: assistantPhase } : {}), + }); + continue; + } + + if (m.role !== "toolResult") { + continue; + } + + const toolCallId = toNonEmptyString(m.toolCallId) ?? toNonEmptyString(m.toolUseId); + if (!toolCallId) { + continue; + } + const replayId = decodeToolCallReplayId(toolCallId); + if (!replayId) { + continue; + } + const parts = Array.isArray(m.content) ? contentToOpenAIParts(m.content, modelOverride) : []; + const textOutput = contentToText(m.content); + const imageParts = parts.filter((part) => part.type === "input_image"); + items.push({ + type: "function_call_output", + call_id: replayId.callId, + output: textOutput || (imageParts.length > 0 ? "(see attached image)" : ""), + }); + if (imageParts.length > 0) { + items.push({ + type: "message", + role: "user", + content: [ + { type: "input_text", text: "Attached image(s) from tool result:" }, + ...imageParts, + ], + }); + } + } + + return items; +} + +export function buildAssistantMessageFromResponse( + response: ResponseObject, + modelInfo: { api: string; provider: string; id: string }, +): AssistantMessage { + const content: AssistantMessage["content"] = []; + let assistantPhase: OpenAIResponsesAssistantPhase | undefined; + + for (const item of response.output ?? []) { + if (item.type === "message") { + const itemPhase = normalizeAssistantPhase(item.phase); + if (itemPhase) { + assistantPhase = itemPhase; + } + for (const part of item.content ?? []) { + if (part.type === "output_text" && part.text) { + content.push({ + type: "text", + text: part.text, + textSignature: encodeAssistantTextSignature({ + id: item.id, + ...(itemPhase ? { phase: itemPhase } : {}), + }), + }); + } + } + } else if (item.type === "function_call") { + const toolName = toNonEmptyString(item.name); + if (!toolName) { + continue; + } + const callId = toNonEmptyString(item.call_id); + const itemId = toNonEmptyString(item.id); + content.push({ + type: "toolCall", + id: encodeToolCallReplayId({ + callId: callId ?? `call_${randomUUID()}`, + itemId: itemId ?? undefined, + }), + name: toolName, + arguments: (() => { + try { + return JSON.parse(item.arguments) as Record; + } catch { + return {} as Record; + } + })(), + }); + } else { + if (!isReplayableReasoningType(item.type)) { + continue; + } + const reasoning = extractResponseReasoningText(item); + if (!reasoning) { + continue; + } + const reasoningId = toReplayableReasoningId(item.id); + content.push({ + type: "thinking", + thinking: reasoning, + ...(reasoningId + ? { + thinkingSignature: encodeThinkingSignature({ + id: reasoningId, + type: item.type, + }), + } + : {}), + } as AssistantMessage["content"][number]); + } + } + + const hasToolCalls = content.some((part) => part.type === "toolCall"); + const stopReason: StopReason = hasToolCalls ? "toolUse" : "stop"; + + const message = buildAssistantMessage({ + model: modelInfo, + content, + stopReason, + usage: buildUsageWithNoCost({ + input: response.usage?.input_tokens ?? 0, + output: response.usage?.output_tokens ?? 0, + totalTokens: response.usage?.total_tokens ?? 0, + }), + }); + + return assistantPhase + ? ({ ...message, phase: assistantPhase } as AssistantMessageWithPhase) + : message; +} diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 290eb4d98de..53d8271c9d9 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -21,31 +21,28 @@ * @see src/agents/openai-ws-connection.ts for the connection manager */ -import { randomUUID } from "node:crypto"; import type { StreamFn } from "@mariozechner/pi-agent-core"; import * as piAi from "@mariozechner/pi-ai"; import type { AssistantMessage, AssistantMessageEvent, AssistantMessageEventStream, - Context, - Message, StopReason, } from "@mariozechner/pi-ai"; import { OpenAIWebSocketManager, - type ContentPart, type FunctionToolDefinition, - type InputItem, - type OpenAIResponsesAssistantPhase, type OpenAIWebSocketManagerOptions, - type ResponseObject, } from "./openai-ws-connection.js"; +import { + buildAssistantMessageFromResponse, + convertMessagesToInputItems, + convertTools, + planTurnInput, +} from "./openai-ws-message-conversion.js"; import { log } from "./pi-embedded-runner/logger.js"; import { - buildAssistantMessage, buildAssistantMessageWithZeroUsage, - buildUsageWithNoCost, buildStreamErrorAssistantMessage, } from "./stream-message-shared.js"; @@ -188,560 +185,12 @@ export function hasWsSession(sessionId: string): boolean { return !!(s && !s.broken && s.manager.isConnected()); } -// ───────────────────────────────────────────────────────────────────────────── -// Message format converters -// ───────────────────────────────────────────────────────────────────────────── - -type AnyMessage = Message & { role: string; content: unknown }; -type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAssistantPhase }; -type ReplayModelInfo = { input?: ReadonlyArray }; -type ReplayableReasoningItem = Extract; -type ReplayableReasoningSignature = { - type: "reasoning" | `reasoning.${string}`; - id?: string; -}; -type ToolCallReplayId = { callId: string; itemId?: string }; -type PlannedTurnInput = { - inputItems: InputItem[]; - previousResponseId?: string; - mode: "incremental_tool_results" | "full_context_initial" | "full_context_restart"; -}; - -function toNonEmptyString(value: unknown): string | null { - if (typeof value !== "string") { - return null; - } - const trimmed = value.trim(); - return trimmed.length > 0 ? trimmed : null; -} - -function normalizeAssistantPhase(value: unknown): OpenAIResponsesAssistantPhase | undefined { - return value === "commentary" || value === "final_answer" ? value : undefined; -} - -function encodeAssistantTextSignature(params: { - id: string; - phase?: OpenAIResponsesAssistantPhase; -}): string { - return JSON.stringify({ - v: 1, - id: params.id, - ...(params.phase ? { phase: params.phase } : {}), - }); -} - -function parseAssistantTextSignature( - value: unknown, -): { id: string; phase?: OpenAIResponsesAssistantPhase } | null { - if (typeof value !== "string" || value.trim().length === 0) { - return null; - } - if (!value.startsWith("{")) { - return { id: value }; - } - try { - const parsed = JSON.parse(value) as { v?: unknown; id?: unknown; phase?: unknown }; - if (parsed.v !== 1 || typeof parsed.id !== "string") { - return null; - } - return { - id: parsed.id, - ...(normalizeAssistantPhase(parsed.phase) - ? { phase: normalizeAssistantPhase(parsed.phase) } - : {}), - }; - } catch { - return null; - } -} - -function supportsImageInput(modelOverride?: ReplayModelInfo): boolean { - return !Array.isArray(modelOverride?.input) || modelOverride.input.includes("image"); -} - -/** Convert pi-ai content (string | ContentPart[]) to plain text. */ -function contentToText(content: unknown): string { - if (typeof content === "string") { - return content; - } - if (!Array.isArray(content)) { - return ""; - } - return content - .filter( - (part): part is { type?: string; text?: string } => Boolean(part) && typeof part === "object", - ) - .filter( - (part) => - (part.type === "text" || part.type === "input_text" || part.type === "output_text") && - typeof part.text === "string", - ) - .map((part) => part.text as string) - .join(""); -} - -/** Convert pi-ai content to OpenAI ContentPart[]. */ -function contentToOpenAIParts(content: unknown, modelOverride?: ReplayModelInfo): ContentPart[] { - if (typeof content === "string") { - return content ? [{ type: "input_text", text: content }] : []; - } - if (!Array.isArray(content)) { - return []; - } - - const includeImages = supportsImageInput(modelOverride); - const parts: ContentPart[] = []; - for (const part of content as Array<{ - type?: string; - text?: string; - data?: string; - mimeType?: string; - source?: unknown; - }>) { - if ( - (part.type === "text" || part.type === "input_text" || part.type === "output_text") && - typeof part.text === "string" - ) { - parts.push({ type: "input_text", text: part.text }); - continue; - } - - if (!includeImages) { - continue; - } - - if (part.type === "image" && typeof part.data === "string") { - parts.push({ - type: "input_image", - source: { - type: "base64", - media_type: part.mimeType ?? "image/jpeg", - data: part.data, - }, - }); - continue; - } - - if ( - part.type === "input_image" && - part.source && - typeof part.source === "object" && - typeof (part.source as { type?: unknown }).type === "string" - ) { - parts.push({ - type: "input_image", - source: part.source as - | { type: "url"; url: string } - | { type: "base64"; media_type: string; data: string }, - }); - } - } - return parts; -} - -function isReplayableReasoningType(value: unknown): value is "reasoning" | `reasoning.${string}` { - return typeof value === "string" && (value === "reasoning" || value.startsWith("reasoning.")); -} - -function toReplayableReasoningId(value: unknown): string | null { - const id = toNonEmptyString(value); - return id && id.startsWith("rs_") ? id : null; -} - -function toReasoningSignature(value: unknown): ReplayableReasoningSignature | null { - if (!value || typeof value !== "object") { - return null; - } - const record = value as { type?: unknown; id?: unknown }; - if (!isReplayableReasoningType(record.type)) { - return null; - } - const reasoningId = toReplayableReasoningId(record.id); - return { - type: record.type, - ...(reasoningId ? { id: reasoningId } : {}), - }; -} - -function encodeThinkingSignature(signature: ReplayableReasoningSignature): string { - return JSON.stringify(signature); -} - -function parseReasoningItem(value: unknown): ReplayableReasoningItem | null { - if (!value || typeof value !== "object") { - return null; - } - const record = value as { - type?: unknown; - id?: unknown; - content?: unknown; - encrypted_content?: unknown; - summary?: unknown; - }; - if (!isReplayableReasoningType(record.type)) { - return null; - } - const reasoningId = toReplayableReasoningId(record.id); - return { - type: "reasoning", - ...(reasoningId ? { id: reasoningId } : {}), - ...(typeof record.content === "string" ? { content: record.content } : {}), - ...(typeof record.encrypted_content === "string" - ? { encrypted_content: record.encrypted_content } - : {}), - ...(typeof record.summary === "string" ? { summary: record.summary } : {}), - }; -} - -function parseThinkingSignature(value: unknown): ReplayableReasoningItem | null { - if (typeof value !== "string" || value.trim().length === 0) { - return null; - } - try { - const signature = toReasoningSignature(JSON.parse(value)); - return signature ? parseReasoningItem(signature) : null; - } catch { - return null; - } -} - -function encodeToolCallReplayId(params: ToolCallReplayId): string { - return params.itemId ? `${params.callId}|${params.itemId}` : params.callId; -} - -function decodeToolCallReplayId(value: unknown): ToolCallReplayId | null { - const raw = toNonEmptyString(value); - if (!raw) { - return null; - } - const [callId, itemId] = raw.split("|", 2); - return { - callId, - ...(itemId ? { itemId } : {}), - }; -} - -function extractReasoningSummaryText(value: unknown): string { - if (typeof value === "string") { - return value.trim(); - } - if (!Array.isArray(value)) { - return ""; - } - return value - .map((item) => { - if (typeof item === "string") { - return item.trim(); - } - if (!item || typeof item !== "object") { - return ""; - } - const record = item as { text?: unknown }; - return typeof record.text === "string" ? record.text.trim() : ""; - }) - .filter(Boolean) - .join("\n") - .trim(); -} - -function extractResponseReasoningText(item: unknown): string { - if (!item || typeof item !== "object") { - return ""; - } - const record = item as { summary?: unknown; content?: unknown }; - const summaryText = extractReasoningSummaryText(record.summary); - if (summaryText) { - return summaryText; - } - return typeof record.content === "string" ? record.content.trim() : ""; -} - -function planTurnInput(params: { - context: Context; - model: ReplayModelInfo; - previousResponseId: string | null; - lastContextLength: number; -}): PlannedTurnInput { - if (params.previousResponseId && params.lastContextLength > 0) { - const newMessages = params.context.messages.slice(params.lastContextLength); - const toolResults = newMessages.filter((m) => (m as AnyMessage).role === "toolResult"); - if (toolResults.length > 0) { - return { - mode: "incremental_tool_results", - previousResponseId: params.previousResponseId, - inputItems: convertMessagesToInputItems(toolResults, params.model), - }; - } - return { - mode: "full_context_restart", - inputItems: buildFullInput(params.context, params.model), - }; - } - - return { - mode: "full_context_initial", - inputItems: buildFullInput(params.context, params.model), - }; -} - -/** Convert pi-ai tool array to OpenAI FunctionToolDefinition[]. */ -export function convertTools(tools: Context["tools"]): FunctionToolDefinition[] { - if (!tools || tools.length === 0) { - return []; - } - return tools.map((tool) => ({ - type: "function" as const, - name: tool.name, - description: typeof tool.description === "string" ? tool.description : undefined, - parameters: (tool.parameters ?? {}) as Record, - })); -} - -/** - * Convert the full pi-ai message history to an OpenAI `input` array. - * Handles user messages, assistant text+tool-call messages, and tool results. - */ -export function convertMessagesToInputItems( - messages: Message[], - modelOverride?: ReplayModelInfo, -): InputItem[] { - const items: InputItem[] = []; - - for (const msg of messages) { - const m = msg as AnyMessage & { - phase?: unknown; - toolCallId?: unknown; - toolUseId?: unknown; - }; - - if (m.role === "user") { - const parts = contentToOpenAIParts(m.content, modelOverride); - if (parts.length === 0) { - continue; - } - items.push({ - type: "message", - role: "user", - content: - parts.length === 1 && parts[0]?.type === "input_text" - ? (parts[0] as { type: "input_text"; text: string }).text - : parts, - }); - continue; - } - - if (m.role === "assistant") { - const content = m.content; - let assistantPhase = normalizeAssistantPhase(m.phase); - if (Array.isArray(content)) { - const textParts: string[] = []; - const pushAssistantText = () => { - if (textParts.length === 0) { - return; - } - items.push({ - type: "message", - role: "assistant", - content: textParts.join(""), - ...(assistantPhase ? { phase: assistantPhase } : {}), - }); - textParts.length = 0; - }; - - for (const block of content as Array<{ - type?: string; - text?: string; - textSignature?: unknown; - id?: unknown; - name?: unknown; - arguments?: unknown; - thinkingSignature?: unknown; - }>) { - if (block.type === "text" && typeof block.text === "string") { - const parsedSignature = parseAssistantTextSignature(block.textSignature); - if (!assistantPhase) { - assistantPhase = parsedSignature?.phase; - } - textParts.push(block.text); - continue; - } - - if (block.type === "thinking") { - pushAssistantText(); - const reasoningItem = parseThinkingSignature(block.thinkingSignature); - if (reasoningItem) { - items.push(reasoningItem); - } - continue; - } - - if (block.type !== "toolCall") { - continue; - } - - pushAssistantText(); - const replayId = decodeToolCallReplayId(block.id); - const toolName = toNonEmptyString(block.name); - if (!replayId || !toolName) { - continue; - } - items.push({ - type: "function_call", - ...(replayId.itemId ? { id: replayId.itemId } : {}), - call_id: replayId.callId, - name: toolName, - arguments: - typeof block.arguments === "string" - ? block.arguments - : JSON.stringify(block.arguments ?? {}), - }); - } - - pushAssistantText(); - continue; - } - - const text = contentToText(content); - if (!text) { - continue; - } - items.push({ - type: "message", - role: "assistant", - content: text, - ...(assistantPhase ? { phase: assistantPhase } : {}), - }); - continue; - } - - if (m.role !== "toolResult") { - continue; - } - - const toolCallId = toNonEmptyString(m.toolCallId) ?? toNonEmptyString(m.toolUseId); - if (!toolCallId) { - continue; - } - const replayId = decodeToolCallReplayId(toolCallId); - if (!replayId) { - continue; - } - const parts = Array.isArray(m.content) ? contentToOpenAIParts(m.content, modelOverride) : []; - const textOutput = contentToText(m.content); - const imageParts = parts.filter((part) => part.type === "input_image"); - items.push({ - type: "function_call_output", - call_id: replayId.callId, - output: textOutput || (imageParts.length > 0 ? "(see attached image)" : ""), - }); - if (imageParts.length > 0) { - items.push({ - type: "message", - role: "user", - content: [ - { type: "input_text", text: "Attached image(s) from tool result:" }, - ...imageParts, - ], - }); - } - } - - return items; -} - -// ───────────────────────────────────────────────────────────────────────────── -// Response object → AssistantMessage -// ───────────────────────────────────────────────────────────────────────────── - -export function buildAssistantMessageFromResponse( - response: ResponseObject, - modelInfo: { api: string; provider: string; id: string }, -): AssistantMessage { - const content: AssistantMessage["content"] = []; - let assistantPhase: OpenAIResponsesAssistantPhase | undefined; - - for (const item of response.output ?? []) { - if (item.type === "message") { - const itemPhase = normalizeAssistantPhase(item.phase); - if (itemPhase) { - assistantPhase = itemPhase; - } - for (const part of item.content ?? []) { - if (part.type === "output_text" && part.text) { - content.push({ - type: "text", - text: part.text, - textSignature: encodeAssistantTextSignature({ - id: item.id, - ...(itemPhase ? { phase: itemPhase } : {}), - }), - }); - } - } - } else if (item.type === "function_call") { - const toolName = toNonEmptyString(item.name); - if (!toolName) { - continue; - } - const callId = toNonEmptyString(item.call_id); - const itemId = toNonEmptyString(item.id); - content.push({ - type: "toolCall", - id: encodeToolCallReplayId({ - callId: callId ?? `call_${randomUUID()}`, - itemId: itemId ?? undefined, - }), - name: toolName, - arguments: (() => { - try { - return JSON.parse(item.arguments) as Record; - } catch { - return {} as Record; - } - })(), - }); - } else { - if (!isReplayableReasoningType(item.type)) { - continue; - } - const reasoning = extractResponseReasoningText(item); - if (!reasoning) { - continue; - } - const reasoningId = toReplayableReasoningId(item.id); - content.push({ - type: "thinking", - thinking: reasoning, - ...(reasoningId - ? { - thinkingSignature: encodeThinkingSignature({ - id: reasoningId, - type: item.type, - }), - } - : {}), - } as AssistantMessage["content"][number]); - } - } - - const hasToolCalls = content.some((c) => c.type === "toolCall"); - const stopReason: StopReason = hasToolCalls ? "toolUse" : "stop"; - - const message = buildAssistantMessage({ - model: modelInfo, - content, - stopReason, - usage: buildUsageWithNoCost({ - input: response.usage?.input_tokens ?? 0, - output: response.usage?.output_tokens ?? 0, - totalTokens: response.usage?.total_tokens ?? 0, - }), - }); - - return assistantPhase - ? ({ ...message, phase: assistantPhase } as AssistantMessageWithPhase) - : message; -} +export { + buildAssistantMessageFromResponse, + convertMessagesToInputItems, + convertTools, + planTurnInput, +} from "./openai-ws-message-conversion.js"; // ───────────────────────────────────────────────────────────────────────────── // StreamFn factory @@ -1159,15 +608,6 @@ export function createOpenAIWebSocketStreamFn( }; } -// ───────────────────────────────────────────────────────────────────────────── -// Helpers -// ───────────────────────────────────────────────────────────────────────────── - -/** Build full input items from context (system prompt is passed via `instructions` field). */ -function buildFullInput(context: Context, model: ReplayModelInfo): InputItem[] { - return convertMessagesToInputItems(context.messages, model); -} - /** * Fall back to HTTP (`streamSimple`) and pipe events into the existing stream. * This is called when the WebSocket is broken or unavailable.