fix(sdk): stabilize run event chat projections (#74750) thanks @bitloi

Co-authored-by: bitloi <raphaelaloi.eth@gmail.com>
This commit is contained in:
bitloi
2026-04-30 00:54:52 -03:00
committed by GitHub
parent 5f13af6b68
commit e6abd9e3d8
3 changed files with 317 additions and 9 deletions

View File

@@ -51,6 +51,7 @@ Docs: https://docs.openclaw.ai
- Gateway/startup: bound local discovery advertisement during startup, so a stuck discovery plugin can no longer keep the Gateway from reaching ready. Fixes #73865; refs #74630 and #74633. Thanks @lpendeavors, @moltar-bot, and @Saboor711.
- Gateway/models: serve the last successful model catalog while stale reloads refresh in the background, so Gateway control-plane and OpenAI-compatible requests no longer block behind model-provider rediscovery after model config changes. Refs #74135, #74630, and #74633. Thanks @DerFlash, @moltar-bot, and @Saboor711.
- CLI/status: resolve read-only channel setup runtime fallback from the packaged OpenClaw dist root, so `status --all`, `status --deep`, channel, and doctor paths do not crash when an external channel plugin needs setup metadata. Fixes #74693. Thanks @giangthb.
- SDK/events: keep per-run SDK event streams from surfacing duplicate raw chat projection frames, while normalizing chat-only projection frames and preserving raw access through `rawEvents`. Refs #74704. Thanks @BunsDev.
- Google Meet: block managed Chrome intro/test speech until browser health proves the participant is in-call, and expose `speechReady` diagnostics so login, admission, permission, and audio-bridge blockers no longer look like successful speech. Refs #72478. Thanks @DougButdorf.
- Slack/commands: keep native command argument menus on select controls for encoded choice values up to Slack's option limit and truncate fallback button labels to Slack's button-text limit, so long valid choices no longer render invalid Slack blocks. Thanks @slackapi.
- Agents/Codex: flush accepted debounced steering messages before normal app-server turn cleanup, so inbound follow-ups acknowledged as queued are not dropped when the turn completes before the debounce fires. Thanks @vincentkoc.

View File

@@ -163,6 +163,76 @@ function unsupportedGatewayApi(api: string): never {
throw new Error(`${api} is not supported by the current OpenClaw Gateway yet`);
}
type ChatProjectionState = "delta" | "final";
type ChatProjection = {
state: ChatProjectionState;
payload: Record<string, unknown>;
};
function asRecord(value: unknown): Record<string, unknown> {
return typeof value === "object" && value !== null ? (value as Record<string, unknown>) : {};
}
function readChatProjection(event: OpenClawEvent): ChatProjection | undefined {
const raw = event.raw;
if (event.type !== "raw" || raw?.event !== "chat") {
return undefined;
}
const payload = asRecord(raw.payload);
return payload.state === "delta" || payload.state === "final"
? { state: payload.state, payload }
: undefined;
}
function readChatProjectionText(payload: Record<string, unknown>): string | undefined {
const message = asRecord(payload.message);
const content = message.content;
if (typeof content === "string") {
return content;
}
if (!Array.isArray(content)) {
return undefined;
}
const text = content
.map((part) => {
const record = asRecord(part);
return record.type === "text" && typeof record.text === "string" ? record.text : "";
})
.join("");
return text.length > 0 ? text : undefined;
}
function isAssistantRunEvent(event: OpenClawEvent): boolean {
return event.type === "assistant.delta" || event.type === "assistant.message";
}
function isTerminalRunEvent(event: OpenClawEvent): boolean {
return (
event.type === "run.completed" ||
event.type === "run.failed" ||
event.type === "run.cancelled" ||
event.type === "run.timed_out"
);
}
function normalizeChatProjectionEvent(
event: OpenClawEvent,
projection: ChatProjection,
): OpenClawEvent {
const text = readChatProjectionText(projection.payload);
return {
...event,
type: projection.state === "delta" ? "assistant.delta" : "run.completed",
data:
projection.state === "delta"
? text !== undefined
? { delta: text }
: event.data
: { phase: "end", ...(text !== undefined ? { outputText: text } : {}) },
};
}
export class OpenClaw {
readonly agents: AgentsNamespace;
readonly sessions: SessionsNamespace;
@@ -262,23 +332,48 @@ export class OpenClaw {
filter?: (event: OpenClawEvent) => boolean,
): AsyncIterable<OpenClawEvent> {
await this.connect();
const matches = (event: OpenClawEvent) => {
if (event.runId !== runId) {
return false;
const replayEvents = this.replaySnapshot(runId);
let hasCanonicalAssistantRunEvent = replayEvents.some(isAssistantRunEvent);
let hasTerminalRunEvent = replayEvents.some(isTerminalRunEvent);
const toRunStreamEvent = (event: OpenClawEvent): OpenClawEvent | undefined => {
const chatProjection = readChatProjection(event);
if (chatProjection?.state === "delta") {
if (hasCanonicalAssistantRunEvent) {
return undefined;
}
return normalizeChatProjectionEvent(event, chatProjection);
}
return filter ? filter(event) : true;
if (chatProjection?.state === "final") {
if (hasTerminalRunEvent) {
return undefined;
}
hasTerminalRunEvent = true;
return normalizeChatProjectionEvent(event, chatProjection);
}
if (isAssistantRunEvent(event)) {
hasCanonicalAssistantRunEvent = true;
}
if (isTerminalRunEvent(event)) {
hasTerminalRunEvent = true;
}
return event;
};
const matches = (event: OpenClawEvent) => event.runId === runId;
const liveSource = this.normalizedEvents.stream(matches, { replay: true });
const live = liveSource[Symbol.asyncIterator]();
let nextLive = live.next();
const seen = new Set<string>();
try {
for (const event of this.replaySnapshot(runId)) {
if (!matches(event) || seen.has(event.id)) {
for (const event of replayEvents) {
if (seen.has(event.id)) {
continue;
}
seen.add(event.id);
yield event;
const runEvent = toRunStreamEvent(event);
if (!runEvent || (filter && !filter(runEvent))) {
continue;
}
yield runEvent;
}
while (true) {
const next = await nextLive;
@@ -290,7 +385,11 @@ export class OpenClaw {
continue;
}
seen.add(next.value.id);
yield next.value;
const runEvent = toRunStreamEvent(next.value);
if (!runEvent || (filter && !filter(runEvent))) {
continue;
}
yield runEvent;
}
} finally {
await live.return?.();

View File

@@ -1,6 +1,11 @@
import { describe, expect, it } from "vitest";
import { EventHub, OpenClaw, normalizeGatewayEvent } from "./index.js";
import type { GatewayEvent, GatewayRequestOptions, OpenClawTransport } from "./types.js";
import type {
GatewayEvent,
GatewayRequestOptions,
OpenClawEvent,
OpenClawTransport,
} from "./types.js";
type RequestCall = {
method: string;
@@ -355,6 +360,209 @@ describe("OpenClaw SDK", () => {
expect(seen).toEqual(["run.started", "assistant.delta", "run.completed"]);
});
it("does not surface raw chat projection events in per-run streams", async () => {
const ts = 1_777_000_000_100;
const transport = new FakeTransport({
agent: (
_params: unknown,
_options: GatewayRequestOptions | undefined,
fake: FakeTransport,
) => {
fake.emit({
event: "agent",
seq: 1,
payload: {
runId: "run_chat_projection",
stream: "lifecycle",
ts,
data: { phase: "start" },
},
});
fake.emit({
event: "agent",
seq: 2,
payload: {
runId: "run_chat_projection",
stream: "assistant",
ts: ts + 1,
data: { delta: "hello" },
},
});
fake.emit({
event: "chat",
seq: 3,
payload: {
runId: "run_chat_projection",
sessionKey: "chat-projection",
state: "delta",
message: {
role: "assistant",
content: [{ type: "text", text: "hello" }],
timestamp: ts + 2,
},
},
});
fake.emit({
event: "agent",
seq: 4,
payload: {
runId: "run_chat_projection",
stream: "lifecycle",
ts: ts + 3,
data: { phase: "end" },
},
});
fake.emit({
event: "chat",
seq: 5,
payload: {
runId: "run_chat_projection",
sessionKey: "chat-projection",
state: "final",
message: {
role: "assistant",
content: [{ type: "text", text: "hello" }],
timestamp: ts + 4,
},
},
});
return {
status: "accepted",
runId: "run_chat_projection",
sessionKey: "chat-projection",
};
},
});
const oc = new OpenClaw({ transport });
const run = await oc.runs.create({
input: "stream with chat projection",
idempotencyKey: "chat-projection-events",
sessionKey: "chat-projection",
});
const seen: OpenClawEvent[] = [];
for await (const event of run.events()) {
seen.push(event);
if (event.type === "run.completed") {
break;
}
}
expect(seen.map((event) => event.type)).toEqual([
"run.started",
"assistant.delta",
"run.completed",
]);
expect(seen.map((event) => event.raw?.event)).toEqual(["agent", "agent", "agent"]);
});
it("normalizes chat-only projection events in per-run streams", async () => {
const ts = 1_777_000_000_200;
const transport = new FakeTransport({
agent: (
_params: unknown,
_options: GatewayRequestOptions | undefined,
fake: FakeTransport,
) => {
fake.emit({
event: "chat",
seq: 1,
payload: {
runId: "run_chat_only",
sessionKey: "chat-only",
state: "delta",
message: {
role: "assistant",
content: [{ type: "text", text: "hello" }],
timestamp: ts,
},
},
});
fake.emit({
event: "chat",
seq: 2,
payload: {
runId: "run_chat_only",
sessionKey: "chat-only",
state: "delta",
message: {
role: "assistant",
content: [{ type: "text", text: "hello again" }],
timestamp: ts + 1,
},
},
});
fake.emit({
event: "chat",
seq: 3,
payload: {
runId: "run_chat_only",
sessionKey: "chat-only",
state: "final",
message: {
role: "assistant",
content: [{ type: "text", text: "hello again" }],
timestamp: ts + 2,
},
},
});
fake.emit({
event: "custom.debug",
seq: 4,
payload: {
runId: "run_chat_only",
ts: ts + 3,
data: { ok: true },
},
});
return { status: "accepted", runId: "run_chat_only", sessionKey: "chat-only" };
},
});
const oc = new OpenClaw({ transport });
const run = await oc.runs.create({
input: "stream with chat-only projection",
idempotencyKey: "chat-only-events",
sessionKey: "chat-only",
});
const iterator = run.events()[Symbol.asyncIterator]();
try {
const first = await iterator.next();
expect(first).toMatchObject({
done: false,
value: {
type: "assistant.delta",
data: { delta: "hello" },
raw: { event: "chat" },
},
});
const second = await iterator.next();
expect(second).toMatchObject({
done: false,
value: {
type: "assistant.delta",
data: { delta: "hello again" },
raw: { event: "chat" },
},
});
const third = await iterator.next();
expect(third).toMatchObject({
done: false,
value: {
type: "run.completed",
data: { phase: "end", outputText: "hello again" },
raw: { event: "chat" },
},
});
} finally {
await iterator.return?.();
}
});
it("creates a session and sends a message as a run", async () => {
const transport = new FakeTransport({
"sessions.create": { key: "session-main", label: "Main" },