diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f1f557a364..16b7025663b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ Docs: https://docs.openclaw.ai - Gateway/startup: bound local discovery advertisement during startup, so a stuck discovery plugin can no longer keep the Gateway from reaching ready. Fixes #73865; refs #74630 and #74633. Thanks @lpendeavors, @moltar-bot, and @Saboor711. - Gateway/models: serve the last successful model catalog while stale reloads refresh in the background, so Gateway control-plane and OpenAI-compatible requests no longer block behind model-provider rediscovery after model config changes. Refs #74135, #74630, and #74633. Thanks @DerFlash, @moltar-bot, and @Saboor711. - CLI/status: resolve read-only channel setup runtime fallback from the packaged OpenClaw dist root, so `status --all`, `status --deep`, channel, and doctor paths do not crash when an external channel plugin needs setup metadata. Fixes #74693. Thanks @giangthb. +- SDK/events: keep per-run SDK event streams from surfacing duplicate raw chat projection frames, while normalizing chat-only projection frames and preserving raw access through `rawEvents`. Refs #74704. Thanks @BunsDev. - Google Meet: block managed Chrome intro/test speech until browser health proves the participant is in-call, and expose `speechReady` diagnostics so login, admission, permission, and audio-bridge blockers no longer look like successful speech. Refs #72478. Thanks @DougButdorf. - Slack/commands: keep native command argument menus on select controls for encoded choice values up to Slack's option limit and truncate fallback button labels to Slack's button-text limit, so long valid choices no longer render invalid Slack blocks. Thanks @slackapi. - Agents/Codex: flush accepted debounced steering messages before normal app-server turn cleanup, so inbound follow-ups acknowledged as queued are not dropped when the turn completes before the debounce fires. Thanks @vincentkoc. diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 272633b38a1..65b13617b37 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -163,6 +163,76 @@ function unsupportedGatewayApi(api: string): never { throw new Error(`${api} is not supported by the current OpenClaw Gateway yet`); } +type ChatProjectionState = "delta" | "final"; + +type ChatProjection = { + state: ChatProjectionState; + payload: Record; +}; + +function asRecord(value: unknown): Record { + return typeof value === "object" && value !== null ? (value as Record) : {}; +} + +function readChatProjection(event: OpenClawEvent): ChatProjection | undefined { + const raw = event.raw; + if (event.type !== "raw" || raw?.event !== "chat") { + return undefined; + } + const payload = asRecord(raw.payload); + return payload.state === "delta" || payload.state === "final" + ? { state: payload.state, payload } + : undefined; +} + +function readChatProjectionText(payload: Record): string | undefined { + const message = asRecord(payload.message); + const content = message.content; + if (typeof content === "string") { + return content; + } + if (!Array.isArray(content)) { + return undefined; + } + const text = content + .map((part) => { + const record = asRecord(part); + return record.type === "text" && typeof record.text === "string" ? record.text : ""; + }) + .join(""); + return text.length > 0 ? text : undefined; +} + +function isAssistantRunEvent(event: OpenClawEvent): boolean { + return event.type === "assistant.delta" || event.type === "assistant.message"; +} + +function isTerminalRunEvent(event: OpenClawEvent): boolean { + return ( + event.type === "run.completed" || + event.type === "run.failed" || + event.type === "run.cancelled" || + event.type === "run.timed_out" + ); +} + +function normalizeChatProjectionEvent( + event: OpenClawEvent, + projection: ChatProjection, +): OpenClawEvent { + const text = readChatProjectionText(projection.payload); + return { + ...event, + type: projection.state === "delta" ? "assistant.delta" : "run.completed", + data: + projection.state === "delta" + ? text !== undefined + ? { delta: text } + : event.data + : { phase: "end", ...(text !== undefined ? { outputText: text } : {}) }, + }; +} + export class OpenClaw { readonly agents: AgentsNamespace; readonly sessions: SessionsNamespace; @@ -262,23 +332,48 @@ export class OpenClaw { filter?: (event: OpenClawEvent) => boolean, ): AsyncIterable { await this.connect(); - const matches = (event: OpenClawEvent) => { - if (event.runId !== runId) { - return false; + const replayEvents = this.replaySnapshot(runId); + let hasCanonicalAssistantRunEvent = replayEvents.some(isAssistantRunEvent); + let hasTerminalRunEvent = replayEvents.some(isTerminalRunEvent); + const toRunStreamEvent = (event: OpenClawEvent): OpenClawEvent | undefined => { + const chatProjection = readChatProjection(event); + if (chatProjection?.state === "delta") { + if (hasCanonicalAssistantRunEvent) { + return undefined; + } + return normalizeChatProjectionEvent(event, chatProjection); } - return filter ? filter(event) : true; + if (chatProjection?.state === "final") { + if (hasTerminalRunEvent) { + return undefined; + } + hasTerminalRunEvent = true; + return normalizeChatProjectionEvent(event, chatProjection); + } + if (isAssistantRunEvent(event)) { + hasCanonicalAssistantRunEvent = true; + } + if (isTerminalRunEvent(event)) { + hasTerminalRunEvent = true; + } + return event; }; + const matches = (event: OpenClawEvent) => event.runId === runId; const liveSource = this.normalizedEvents.stream(matches, { replay: true }); const live = liveSource[Symbol.asyncIterator](); let nextLive = live.next(); const seen = new Set(); try { - for (const event of this.replaySnapshot(runId)) { - if (!matches(event) || seen.has(event.id)) { + for (const event of replayEvents) { + if (seen.has(event.id)) { continue; } seen.add(event.id); - yield event; + const runEvent = toRunStreamEvent(event); + if (!runEvent || (filter && !filter(runEvent))) { + continue; + } + yield runEvent; } while (true) { const next = await nextLive; @@ -290,7 +385,11 @@ export class OpenClaw { continue; } seen.add(next.value.id); - yield next.value; + const runEvent = toRunStreamEvent(next.value); + if (!runEvent || (filter && !filter(runEvent))) { + continue; + } + yield runEvent; } } finally { await live.return?.(); diff --git a/packages/sdk/src/index.test.ts b/packages/sdk/src/index.test.ts index 95231744bef..17509b5ecdc 100644 --- a/packages/sdk/src/index.test.ts +++ b/packages/sdk/src/index.test.ts @@ -1,6 +1,11 @@ import { describe, expect, it } from "vitest"; import { EventHub, OpenClaw, normalizeGatewayEvent } from "./index.js"; -import type { GatewayEvent, GatewayRequestOptions, OpenClawTransport } from "./types.js"; +import type { + GatewayEvent, + GatewayRequestOptions, + OpenClawEvent, + OpenClawTransport, +} from "./types.js"; type RequestCall = { method: string; @@ -355,6 +360,209 @@ describe("OpenClaw SDK", () => { expect(seen).toEqual(["run.started", "assistant.delta", "run.completed"]); }); + it("does not surface raw chat projection events in per-run streams", async () => { + const ts = 1_777_000_000_100; + const transport = new FakeTransport({ + agent: ( + _params: unknown, + _options: GatewayRequestOptions | undefined, + fake: FakeTransport, + ) => { + fake.emit({ + event: "agent", + seq: 1, + payload: { + runId: "run_chat_projection", + stream: "lifecycle", + ts, + data: { phase: "start" }, + }, + }); + fake.emit({ + event: "agent", + seq: 2, + payload: { + runId: "run_chat_projection", + stream: "assistant", + ts: ts + 1, + data: { delta: "hello" }, + }, + }); + fake.emit({ + event: "chat", + seq: 3, + payload: { + runId: "run_chat_projection", + sessionKey: "chat-projection", + state: "delta", + message: { + role: "assistant", + content: [{ type: "text", text: "hello" }], + timestamp: ts + 2, + }, + }, + }); + fake.emit({ + event: "agent", + seq: 4, + payload: { + runId: "run_chat_projection", + stream: "lifecycle", + ts: ts + 3, + data: { phase: "end" }, + }, + }); + fake.emit({ + event: "chat", + seq: 5, + payload: { + runId: "run_chat_projection", + sessionKey: "chat-projection", + state: "final", + message: { + role: "assistant", + content: [{ type: "text", text: "hello" }], + timestamp: ts + 4, + }, + }, + }); + return { + status: "accepted", + runId: "run_chat_projection", + sessionKey: "chat-projection", + }; + }, + }); + const oc = new OpenClaw({ transport }); + + const run = await oc.runs.create({ + input: "stream with chat projection", + idempotencyKey: "chat-projection-events", + sessionKey: "chat-projection", + }); + const seen: OpenClawEvent[] = []; + + for await (const event of run.events()) { + seen.push(event); + if (event.type === "run.completed") { + break; + } + } + + expect(seen.map((event) => event.type)).toEqual([ + "run.started", + "assistant.delta", + "run.completed", + ]); + expect(seen.map((event) => event.raw?.event)).toEqual(["agent", "agent", "agent"]); + }); + + it("normalizes chat-only projection events in per-run streams", async () => { + const ts = 1_777_000_000_200; + const transport = new FakeTransport({ + agent: ( + _params: unknown, + _options: GatewayRequestOptions | undefined, + fake: FakeTransport, + ) => { + fake.emit({ + event: "chat", + seq: 1, + payload: { + runId: "run_chat_only", + sessionKey: "chat-only", + state: "delta", + message: { + role: "assistant", + content: [{ type: "text", text: "hello" }], + timestamp: ts, + }, + }, + }); + fake.emit({ + event: "chat", + seq: 2, + payload: { + runId: "run_chat_only", + sessionKey: "chat-only", + state: "delta", + message: { + role: "assistant", + content: [{ type: "text", text: "hello again" }], + timestamp: ts + 1, + }, + }, + }); + fake.emit({ + event: "chat", + seq: 3, + payload: { + runId: "run_chat_only", + sessionKey: "chat-only", + state: "final", + message: { + role: "assistant", + content: [{ type: "text", text: "hello again" }], + timestamp: ts + 2, + }, + }, + }); + fake.emit({ + event: "custom.debug", + seq: 4, + payload: { + runId: "run_chat_only", + ts: ts + 3, + data: { ok: true }, + }, + }); + return { status: "accepted", runId: "run_chat_only", sessionKey: "chat-only" }; + }, + }); + const oc = new OpenClaw({ transport }); + + const run = await oc.runs.create({ + input: "stream with chat-only projection", + idempotencyKey: "chat-only-events", + sessionKey: "chat-only", + }); + const iterator = run.events()[Symbol.asyncIterator](); + + try { + const first = await iterator.next(); + expect(first).toMatchObject({ + done: false, + value: { + type: "assistant.delta", + data: { delta: "hello" }, + raw: { event: "chat" }, + }, + }); + + const second = await iterator.next(); + expect(second).toMatchObject({ + done: false, + value: { + type: "assistant.delta", + data: { delta: "hello again" }, + raw: { event: "chat" }, + }, + }); + + const third = await iterator.next(); + expect(third).toMatchObject({ + done: false, + value: { + type: "run.completed", + data: { phase: "end", outputText: "hello again" }, + raw: { event: "chat" }, + }, + }); + } finally { + await iterator.return?.(); + } + }); + it("creates a session and sends a message as a run", async () => { const transport = new FakeTransport({ "sessions.create": { key: "session-main", label: "Main" },