fix(agents): keep phaseless OpenAI WS text buffered until phase resolves (#61968)

* fix(agents): gate WS text delta emission on valid phase value, not map key existence

When output_item.added arrives without phase metadata, outputItemPhaseById
stores undefined. The previous .has() check returned true for undefined
values, bypassing the buffering gate and leaking commentary as unphased
visible content.

Fix: change .has() to .get() !== undefined on both delta and done handlers.

Fixes #61477

* docs: note WS phase buffering fix (#61954) (thanks @100yenadmin)

* test(agents): cover phaseless WS output_text.done buffering (#61954)

* test(commands): fix session-store import path for tsgo (#61968)

---------

Co-authored-by: Eva <eva@100yen.org>
This commit is contained in:
Peter Steinberger
2026-04-06 16:35:16 +01:00
committed by GitHub
parent 56136c83b7
commit 0b9993df95
4 changed files with 198 additions and 26 deletions

View File

@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
- Gateway/status: probe local TLS gateways over `wss://`, forward the local cert fingerprint for self-signed loopback probes, and warn when the local TLS runtime cannot load the configured cert. (#61935) Thanks @ThanhNguyxn07.
- Agents/history: keep truly legacy unsigned replay text unphased when mixed with phased OpenAI WS assistant blocks, while still inheriting message phase for id-only replay signatures. (#61529) Thanks @100yenadmin.
- Discord/thread titles: stop forcing a hardcoded temperature for generated auto-thread names so Codex-backed thread title generation works on `openai-codex/*` models again. (#59525)
- Providers/OpenAI: keep WebSocket text buffered until a real assistant phase arrives, even when text deltas land before a phaseless `output_item.added` announcement. (#61954) Thanks @100yenadmin.
## 2026.4.5

View File

@@ -2107,6 +2107,11 @@ describe("createOpenAIWebSocketStreamFn", () => {
content_index: 0,
delta: "...",
});
await new Promise((r) => setImmediate(r));
const prematureDeltas = events.filter((event) => event.type === "text_delta");
expect(prematureDeltas).toHaveLength(0);
manager.simulateEvent({
type: "response.output_item.done",
output_index: 0,
@@ -2125,7 +2130,7 @@ describe("createOpenAIWebSocketStreamFn", () => {
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.2",
model: "gpt-5.4",
output: [
{
type: "message",
@@ -2142,28 +2147,192 @@ describe("createOpenAIWebSocketStreamFn", () => {
await done;
const deltas = events.filter((event) => event.type === "text_delta");
expect(deltas).toHaveLength(2);
expect(deltas[0]).toMatchObject({ delta: "Working" });
expect(deltas[0]?.partial?.phase).toBeUndefined();
expect(deltas).toHaveLength(1);
expect(deltas[0]).toMatchObject({ delta: "Working..." });
expect(deltas[0]?.partial?.phase).toBe("commentary");
expect(deltas[0]?.partial?.content).toEqual([
{
type: "text",
text: "Working",
textSignature: JSON.stringify({ v: 1, id: "item_late_undefined" }),
},
]);
expect(deltas[1]).toMatchObject({ delta: "..." });
// The "..." delta arrives before output_item.done carries the phase,
// so the partial still reflects the undefined phase from output_item.added.
expect(deltas[1]?.partial?.phase).toBeUndefined();
expect(deltas[1]?.partial?.content).toEqual([
{
type: "text",
text: "Working...",
textSignature: JSON.stringify({ v: 1, id: "item_late_undefined" }),
textSignature: JSON.stringify({
v: 1,
id: "item_late_undefined",
phase: "commentary",
}),
},
]);
});
it("buffers text when output_item.added arrives without phase metadata", async () => {
const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phaseless-gate");
const stream = streamFn(
modelStub as Parameters<typeof streamFn>[0],
contextStub as Parameters<typeof streamFn>[1],
);
const events: Array<{
type?: string;
delta?: string;
partial?: { phase?: string; content?: unknown[] };
}> = [];
const done = (async () => {
for await (const ev of await resolveStream(stream)) {
events.push(ev as (typeof events)[number]);
}
})();
await new Promise((r) => setImmediate(r));
const manager = MockManager.lastInstance!;
// output_item.added WITHOUT phase — simulates phaseless announcement
manager.simulateEvent({
type: "response.output_item.added",
output_index: 0,
item: {
type: "message",
id: "item_phaseless",
role: "assistant",
content: [],
},
});
// Text delta arrives while phase is still unknown
manager.simulateEvent({
type: "response.output_text.delta",
item_id: "item_phaseless",
output_index: 0,
content_index: 0,
delta: "Leaked?",
});
// Yield to let any would-be emissions propagate
await new Promise((r) => setImmediate(r));
const prematureDeltas = events.filter((e) => e.type === "text_delta");
expect(prematureDeltas).toHaveLength(0);
// output_item.done delivers the actual phase — should flush buffered text
manager.simulateEvent({
type: "response.output_item.done",
output_index: 0,
item: {
type: "message",
id: "item_phaseless",
role: "assistant",
phase: "commentary",
content: [{ type: "output_text", text: "Leaked?" }],
},
});
manager.simulateEvent({
type: "response.completed",
response: {
id: "resp_phaseless_gate",
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.4",
output: [
{
type: "message",
id: "item_phaseless",
role: "assistant",
phase: "commentary",
content: [{ type: "output_text", text: "Leaked?" }],
},
],
usage: { input_tokens: 10, output_tokens: 5, total_tokens: 15 },
},
});
await done;
const deltas = events.filter((e) => e.type === "text_delta");
expect(deltas).toHaveLength(1);
expect(deltas[0]).toMatchObject({ delta: "Leaked?" });
expect(deltas[0]?.partial?.phase).toBe("commentary");
});
it("buffers output_text.done until item phase is defined", async () => {
const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phaseless-done-gate");
const stream = streamFn(
modelStub as Parameters<typeof streamFn>[0],
contextStub as Parameters<typeof streamFn>[1],
);
const events: Array<{
type?: string;
delta?: string;
partial?: { phase?: string; content?: unknown[] };
}> = [];
const done = (async () => {
for await (const ev of await resolveStream(stream)) {
events.push(ev as (typeof events)[number]);
}
})();
await new Promise((r) => setImmediate(r));
const manager = MockManager.lastInstance!;
manager.simulateEvent({
type: "response.output_item.added",
output_index: 0,
item: {
type: "message",
id: "item_phaseless_done",
role: "assistant",
content: [],
},
});
manager.simulateEvent({
type: "response.output_text.done",
item_id: "item_phaseless_done",
output_index: 0,
content_index: 0,
text: "Buffered final text",
});
await new Promise((r) => setImmediate(r));
const prematureDeltas = events.filter((event) => event.type === "text_delta");
expect(prematureDeltas).toHaveLength(0);
manager.simulateEvent({
type: "response.output_item.done",
output_index: 0,
item: {
type: "message",
id: "item_phaseless_done",
role: "assistant",
phase: "commentary",
content: [{ type: "output_text", text: "Buffered final text" }],
},
});
manager.simulateEvent({
type: "response.completed",
response: {
id: "resp_phaseless_done_gate",
object: "response",
created_at: Date.now(),
status: "completed",
model: "gpt-5.4",
output: [
{
type: "message",
id: "item_phaseless_done",
role: "assistant",
phase: "commentary",
content: [{ type: "output_text", text: "Buffered final text" }],
},
],
usage: { input_tokens: 10, output_tokens: 5, total_tokens: 15 },
},
});
await done;
const deltas = events.filter((event) => event.type === "text_delta");
expect(deltas).toHaveLength(1);
expect(deltas[0]).toMatchObject({ delta: "Buffered final text" });
expect(deltas[0]?.partial?.phase).toBe("commentary");
});
it("falls back to HTTP when WebSocket connect fails (session pre-broken via flag)", async () => {
// Set the class-level flag BEFORE calling streamFn so the new instance

View File

@@ -1042,13 +1042,15 @@ export function createOpenAIWebSocketStreamFn(
? normalizeAssistantPhase((event.item as { phase?: unknown }).phase)
: undefined;
outputItemPhaseById.set(event.item.id, itemPhase);
for (const key of outputTextByPart.keys()) {
if (key.startsWith(`${event.item.id}:`)) {
const [, contentIndexText] = key.split(":");
emitBufferedTextDelta({
itemId: event.item.id,
contentIndex: Number.parseInt(contentIndexText ?? "0", 10) || 0,
});
if (itemPhase !== undefined) {
for (const key of outputTextByPart.keys()) {
if (key.startsWith(`${event.item.id}:`)) {
const [, contentIndexText] = key.split(":");
emitBufferedTextDelta({
itemId: event.item.id,
contentIndex: Number.parseInt(contentIndexText ?? "0", 10) || 0,
});
}
}
}
}
@@ -1059,7 +1061,7 @@ export function createOpenAIWebSocketStreamFn(
const key = getOutputTextKey(event.item_id, event.content_index);
const nextText = `${outputTextByPart.get(key) ?? ""}${event.delta}`;
outputTextByPart.set(key, nextText);
if (outputItemPhaseById.has(event.item_id)) {
if (outputItemPhaseById.get(event.item_id) !== undefined) {
emitBufferedTextDelta({
itemId: event.item_id,
contentIndex: event.content_index,
@@ -1073,7 +1075,7 @@ export function createOpenAIWebSocketStreamFn(
if (event.text && event.text !== outputTextByPart.get(key)) {
outputTextByPart.set(key, event.text);
}
if (outputItemPhaseById.has(event.item_id)) {
if (outputItemPhaseById.get(event.item_id) !== undefined) {
emitBufferedTextDelta({
itemId: event.item_id,
contentIndex: event.content_index,

View File

@@ -3,10 +3,10 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { updateSessionStoreAfterAgentRun } from "../../agents/command/session-store.js";
import { resolveSession } from "../../agents/command/session.js";
import type { SessionEntry } from "../../config/sessions.js";
import { loadSessionStore } from "../../config/sessions.js";
import { updateSessionStoreAfterAgentRun } from "./session-store.js";
function acpMeta() {
return {