diff --git a/CHANGELOG.md b/CHANGELOG.md index b49a609278a..280115a9a6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai - Gateway/status: probe local TLS gateways over `wss://`, forward the local cert fingerprint for self-signed loopback probes, and warn when the local TLS runtime cannot load the configured cert. (#61935) Thanks @ThanhNguyxn07. - Agents/history: keep truly legacy unsigned replay text unphased when mixed with phased OpenAI WS assistant blocks, while still inheriting message phase for id-only replay signatures. (#61529) Thanks @100yenadmin. - Discord/thread titles: stop forcing a hardcoded temperature for generated auto-thread names so Codex-backed thread title generation works on `openai-codex/*` models again. (#59525) +- Providers/OpenAI: keep WebSocket text buffered until a real assistant phase arrives, even when text deltas land before a phaseless `output_item.added` announcement. (#61954) Thanks @100yenadmin. ## 2026.4.5 diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index 049cc6d6ad4..54ea1b8b279 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -2107,6 +2107,11 @@ describe("createOpenAIWebSocketStreamFn", () => { content_index: 0, delta: "...", }); + + await new Promise((r) => setImmediate(r)); + const prematureDeltas = events.filter((event) => event.type === "text_delta"); + expect(prematureDeltas).toHaveLength(0); + manager.simulateEvent({ type: "response.output_item.done", output_index: 0, @@ -2125,7 +2130,7 @@ describe("createOpenAIWebSocketStreamFn", () => { object: "response", created_at: Date.now(), status: "completed", - model: "gpt-5.2", + model: "gpt-5.4", output: [ { type: "message", @@ -2142,28 +2147,192 @@ describe("createOpenAIWebSocketStreamFn", () => { await done; const deltas = events.filter((event) => event.type === "text_delta"); - expect(deltas).toHaveLength(2); - expect(deltas[0]).toMatchObject({ delta: "Working" }); - expect(deltas[0]?.partial?.phase).toBeUndefined(); + expect(deltas).toHaveLength(1); + expect(deltas[0]).toMatchObject({ delta: "Working..." }); + expect(deltas[0]?.partial?.phase).toBe("commentary"); expect(deltas[0]?.partial?.content).toEqual([ - { - type: "text", - text: "Working", - textSignature: JSON.stringify({ v: 1, id: "item_late_undefined" }), - }, - ]); - expect(deltas[1]).toMatchObject({ delta: "..." }); - // The "..." delta arrives before output_item.done carries the phase, - // so the partial still reflects the undefined phase from output_item.added. - expect(deltas[1]?.partial?.phase).toBeUndefined(); - expect(deltas[1]?.partial?.content).toEqual([ { type: "text", text: "Working...", - textSignature: JSON.stringify({ v: 1, id: "item_late_undefined" }), + textSignature: JSON.stringify({ + v: 1, + id: "item_late_undefined", + phase: "commentary", + }), }, ]); }); + it("buffers text when output_item.added arrives without phase metadata", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phaseless-gate"); + const stream = streamFn( + modelStub as Parameters[0], + contextStub as Parameters[1], + ); + + const events: Array<{ + type?: string; + delta?: string; + partial?: { phase?: string; content?: unknown[] }; + }> = []; + const done = (async () => { + for await (const ev of await resolveStream(stream)) { + events.push(ev as (typeof events)[number]); + } + })(); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + + // output_item.added WITHOUT phase — simulates phaseless announcement + manager.simulateEvent({ + type: "response.output_item.added", + output_index: 0, + item: { + type: "message", + id: "item_phaseless", + role: "assistant", + content: [], + }, + }); + + // Text delta arrives while phase is still unknown + manager.simulateEvent({ + type: "response.output_text.delta", + item_id: "item_phaseless", + output_index: 0, + content_index: 0, + delta: "Leaked?", + }); + + // Yield to let any would-be emissions propagate + await new Promise((r) => setImmediate(r)); + const prematureDeltas = events.filter((e) => e.type === "text_delta"); + expect(prematureDeltas).toHaveLength(0); + + // output_item.done delivers the actual phase — should flush buffered text + manager.simulateEvent({ + type: "response.output_item.done", + output_index: 0, + item: { + type: "message", + id: "item_phaseless", + role: "assistant", + phase: "commentary", + content: [{ type: "output_text", text: "Leaked?" }], + }, + }); + + manager.simulateEvent({ + type: "response.completed", + response: { + id: "resp_phaseless_gate", + object: "response", + created_at: Date.now(), + status: "completed", + model: "gpt-5.4", + output: [ + { + type: "message", + id: "item_phaseless", + role: "assistant", + phase: "commentary", + content: [{ type: "output_text", text: "Leaked?" }], + }, + ], + usage: { input_tokens: 10, output_tokens: 5, total_tokens: 15 }, + }, + }); + + await done; + + const deltas = events.filter((e) => e.type === "text_delta"); + expect(deltas).toHaveLength(1); + expect(deltas[0]).toMatchObject({ delta: "Leaked?" }); + expect(deltas[0]?.partial?.phase).toBe("commentary"); + }); + + it("buffers output_text.done until item phase is defined", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phaseless-done-gate"); + const stream = streamFn( + modelStub as Parameters[0], + contextStub as Parameters[1], + ); + + const events: Array<{ + type?: string; + delta?: string; + partial?: { phase?: string; content?: unknown[] }; + }> = []; + const done = (async () => { + for await (const ev of await resolveStream(stream)) { + events.push(ev as (typeof events)[number]); + } + })(); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + + manager.simulateEvent({ + type: "response.output_item.added", + output_index: 0, + item: { + type: "message", + id: "item_phaseless_done", + role: "assistant", + content: [], + }, + }); + manager.simulateEvent({ + type: "response.output_text.done", + item_id: "item_phaseless_done", + output_index: 0, + content_index: 0, + text: "Buffered final text", + }); + + await new Promise((r) => setImmediate(r)); + const prematureDeltas = events.filter((event) => event.type === "text_delta"); + expect(prematureDeltas).toHaveLength(0); + + manager.simulateEvent({ + type: "response.output_item.done", + output_index: 0, + item: { + type: "message", + id: "item_phaseless_done", + role: "assistant", + phase: "commentary", + content: [{ type: "output_text", text: "Buffered final text" }], + }, + }); + manager.simulateEvent({ + type: "response.completed", + response: { + id: "resp_phaseless_done_gate", + object: "response", + created_at: Date.now(), + status: "completed", + model: "gpt-5.4", + output: [ + { + type: "message", + id: "item_phaseless_done", + role: "assistant", + phase: "commentary", + content: [{ type: "output_text", text: "Buffered final text" }], + }, + ], + usage: { input_tokens: 10, output_tokens: 5, total_tokens: 15 }, + }, + }); + + await done; + + const deltas = events.filter((event) => event.type === "text_delta"); + expect(deltas).toHaveLength(1); + expect(deltas[0]).toMatchObject({ delta: "Buffered final text" }); + expect(deltas[0]?.partial?.phase).toBe("commentary"); + }); it("falls back to HTTP when WebSocket connect fails (session pre-broken via flag)", async () => { // Set the class-level flag BEFORE calling streamFn so the new instance diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 94e0afc1529..890f453c358 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -1042,13 +1042,15 @@ export function createOpenAIWebSocketStreamFn( ? normalizeAssistantPhase((event.item as { phase?: unknown }).phase) : undefined; outputItemPhaseById.set(event.item.id, itemPhase); - for (const key of outputTextByPart.keys()) { - if (key.startsWith(`${event.item.id}:`)) { - const [, contentIndexText] = key.split(":"); - emitBufferedTextDelta({ - itemId: event.item.id, - contentIndex: Number.parseInt(contentIndexText ?? "0", 10) || 0, - }); + if (itemPhase !== undefined) { + for (const key of outputTextByPart.keys()) { + if (key.startsWith(`${event.item.id}:`)) { + const [, contentIndexText] = key.split(":"); + emitBufferedTextDelta({ + itemId: event.item.id, + contentIndex: Number.parseInt(contentIndexText ?? "0", 10) || 0, + }); + } } } } @@ -1059,7 +1061,7 @@ export function createOpenAIWebSocketStreamFn( const key = getOutputTextKey(event.item_id, event.content_index); const nextText = `${outputTextByPart.get(key) ?? ""}${event.delta}`; outputTextByPart.set(key, nextText); - if (outputItemPhaseById.has(event.item_id)) { + if (outputItemPhaseById.get(event.item_id) !== undefined) { emitBufferedTextDelta({ itemId: event.item_id, contentIndex: event.content_index, @@ -1073,7 +1075,7 @@ export function createOpenAIWebSocketStreamFn( if (event.text && event.text !== outputTextByPart.get(key)) { outputTextByPart.set(key, event.text); } - if (outputItemPhaseById.has(event.item_id)) { + if (outputItemPhaseById.get(event.item_id) !== undefined) { emitBufferedTextDelta({ itemId: event.item_id, contentIndex: event.content_index, diff --git a/src/commands/agent/session-store.test.ts b/src/commands/agent/session-store.test.ts index 1a3a99f9f48..fe974a8f614 100644 --- a/src/commands/agent/session-store.test.ts +++ b/src/commands/agent/session-store.test.ts @@ -3,10 +3,10 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { describe, expect, it } from "vitest"; +import { updateSessionStoreAfterAgentRun } from "../../agents/command/session-store.js"; import { resolveSession } from "../../agents/command/session.js"; import type { SessionEntry } from "../../config/sessions.js"; import { loadSessionStore } from "../../config/sessions.js"; -import { updateSessionStoreAfterAgentRun } from "./session-store.js"; function acpMeta() { return {