From 204d200be3f942de95175aaf1ffae34a1b222ad5 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 29 Apr 2026 23:44:24 +0100 Subject: [PATCH] fix: preserve SDK run event outcomes --- packages/sdk/src/client.ts | 183 +++++++++++++++++++++++++++++---- packages/sdk/src/event-hub.ts | 130 ++++++++++++++++------- packages/sdk/src/index.test.ts | 95 ++++++++++++++++- packages/sdk/src/normalize.ts | 16 +-- packages/sdk/src/transport.ts | 8 +- 5 files changed, 358 insertions(+), 74 deletions(-) diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 623006bc609..4a4faf166e0 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -1,4 +1,5 @@ import { randomUUID } from "node:crypto"; +import { EventHub } from "./event-hub.js"; import { normalizeGatewayEvent } from "./normalize.js"; import { GatewayClientTransport, isConnectableTransport } from "./transport.js"; import type { @@ -15,6 +16,10 @@ import type { SessionTarget, } from "./types.js"; +const MAX_REPLAY_RUNS = 100; +const MAX_REPLAY_EVENTS_PER_RUN = 500; +const MAX_NORMALIZED_REPLAY_EVENTS = 2000; + export type OpenClawOptions = { gateway?: "auto" | (string & {}); url?: string; @@ -36,17 +41,32 @@ function resolveGatewayUrl(options: OpenClawOptions): string | undefined { function runStatusFromWaitPayload(payload: unknown): RunResult["status"] { const record = - typeof payload === "object" && payload !== null ? (payload as { status?: unknown }) : {}; - const status = typeof record.status === "string" ? record.status : undefined; + typeof payload === "object" && payload !== null + ? (payload as { aborted?: unknown; status?: unknown; stopReason?: unknown }) + : {}; + const status = typeof record.status === "string" ? record.status.toLowerCase() : undefined; + const stopReason = typeof record.stopReason === "string" ? record.stopReason.toLowerCase() : ""; + if ( + status === "aborted" || + status === "cancelled" || + status === "canceled" || + status === "killed" || + stopReason === "aborted" || + stopReason === "cancelled" || + stopReason === "canceled" || + stopReason === "killed" || + stopReason === "rpc" || + stopReason === "user" || + (record.aborted === true && stopReason === "stop") + ) { + return "cancelled"; + } if (status === "ok" || status === "completed" || status === "succeeded") { return "completed"; } if (status === "timeout" || status === "timed_out") { return "timed_out"; } - if (status === "cancelled" || status === "canceled") { - return "cancelled"; - } if (status === "accepted") { return "accepted"; } @@ -149,7 +169,13 @@ export class OpenClaw { readonly environments: EnvironmentsNamespace; private readonly transport: OpenClawTransport; + private readonly normalizedEvents = new EventHub({ + replayLimit: MAX_NORMALIZED_REPLAY_EVENTS, + }); + private readonly replayByRunId = new Map(); private connected = false; + private eventPumpPromise: Promise | null = null; + private eventPumpReady: Promise | null = null; constructor(options: OpenClawOptions = {}) { this.transport = @@ -173,16 +199,22 @@ export class OpenClaw { async connect(): Promise { if (this.connected) { + await this.startEventPump(); return; } if (isConnectableTransport(this.transport)) { await this.transport.connect(); } this.connected = true; + await this.startEventPump(); } async close(): Promise { await this.transport.close?.(); + await this.eventPumpPromise?.catch(() => {}); + this.normalizedEvents.close(); + this.eventPumpPromise = null; + this.eventPumpReady = null; this.connected = false; } @@ -196,21 +228,135 @@ export class OpenClaw { } events(filter?: (event: OpenClawEvent) => boolean): AsyncIterable { - const source = this.transport.events(); - async function* iterate(): AsyncIterable { - for await (const event of source) { - const normalized = normalizeGatewayEvent(event); - if (!filter || filter(normalized)) { - yield normalized; - } - } - } - return iterate(); + return this.iterateEvents(filter); + } + + runEvents( + runId: string, + filter?: (event: OpenClawEvent) => boolean, + ): AsyncIterable { + return this.iterateRunEvents(runId, filter); } rawEvents(filter?: (event: GatewayEvent) => boolean): AsyncIterable { return this.transport.events(filter); } + + private async *iterateEvents( + filter?: (event: OpenClawEvent) => boolean, + ): AsyncIterable { + await this.connect(); + for await (const event of this.normalizedEvents.stream(filter)) { + yield event; + } + } + + private async *iterateRunEvents( + runId: string, + filter?: (event: OpenClawEvent) => boolean, + ): AsyncIterable { + await this.connect(); + const matches = (event: OpenClawEvent) => { + if (event.runId !== runId) { + return false; + } + return filter ? filter(event) : true; + }; + const liveSource = this.normalizedEvents.stream(matches, { replay: true }); + const live = liveSource[Symbol.asyncIterator](); + let nextLive = live.next(); + const seen = new Set(); + try { + for (const event of this.replaySnapshot(runId)) { + if (!matches(event) || seen.has(event.id)) { + continue; + } + seen.add(event.id); + yield event; + } + while (true) { + const next = await nextLive; + if (next.done) { + break; + } + nextLive = live.next(); + if (seen.has(next.value.id)) { + continue; + } + seen.add(next.value.id); + yield next.value; + } + } finally { + await live.return?.(); + } + } + + private startEventPump(): Promise { + if (this.eventPumpReady) { + return this.eventPumpReady; + } + let markReady = () => {}; + let ready = false; + this.eventPumpReady = new Promise((resolve) => { + markReady = () => { + if (ready) { + return; + } + ready = true; + resolve(); + }; + }); + this.eventPumpPromise = (async () => { + const iterator = this.transport.events()[Symbol.asyncIterator](); + try { + while (true) { + const next = iterator.next(); + await Promise.resolve(); + markReady(); + const result = await next; + if (result.done) { + break; + } + const normalized = normalizeGatewayEvent(result.value); + this.recordReplayEvent(normalized); + this.normalizedEvents.publish(normalized); + } + } finally { + markReady(); + await iterator.return?.(); + this.normalizedEvents.close(); + } + })().catch(() => { + markReady(); + this.normalizedEvents.close(); + }); + return this.eventPumpReady; + } + + private recordReplayEvent(event: OpenClawEvent): void { + if (!event.runId) { + return; + } + let events = this.replayByRunId.get(event.runId); + if (!events) { + if (this.replayByRunId.size >= MAX_REPLAY_RUNS) { + const oldestRunId = this.replayByRunId.keys().next().value; + if (oldestRunId) { + this.replayByRunId.delete(oldestRunId); + } + } + events = []; + this.replayByRunId.set(event.runId, events); + } + events.push(event); + if (events.length > MAX_REPLAY_EVENTS_PER_RUN) { + events.splice(0, events.length - MAX_REPLAY_EVENTS_PER_RUN); + } + } + + private replaySnapshot(runId: string): OpenClawEvent[] { + return [...(this.replayByRunId.get(runId) ?? [])]; + } } export class Agent { @@ -241,12 +387,7 @@ export class Run { ) {} events(filter?: (event: OpenClawEvent) => boolean): AsyncIterable { - return this.client.events((event) => { - if (event.runId !== this.id) { - return false; - } - return filter ? filter(event) : true; - }); + return this.client.runEvents(this.id, filter); } async wait(options?: { timeoutMs?: number }): Promise { diff --git a/packages/sdk/src/event-hub.ts b/packages/sdk/src/event-hub.ts index 4a5a8eb56b0..d2afe3ea6dc 100644 --- a/packages/sdk/src/event-hub.ts +++ b/packages/sdk/src/event-hub.ts @@ -2,15 +2,36 @@ import type { GatewayEvent } from "./types.js"; type Listener = (event: T) => void; +export type EventHubOptions = { + replayLimit?: number; +}; + +export type EventStreamOptions = { + replay?: boolean; +}; + export class EventHub { + private readonly replayLimit: number; + private readonly replayEvents: T[] = []; private closed = false; private readonly listeners = new Set>(); private readonly waiters = new Set<() => void>(); + constructor(options: EventHubOptions = {}) { + this.replayLimit = options.replayLimit ?? 0; + } + publish(event: T): void { if (this.closed) { return; } + if (this.replayLimit > 0) { + this.replayEvents.push(event); + const overflow = this.replayEvents.length - this.replayLimit; + if (overflow > 0) { + this.replayEvents.splice(0, overflow); + } + } for (const listener of this.listeners) { listener(event); } @@ -18,6 +39,7 @@ export class EventHub { close(): void { this.closed = true; + this.replayEvents.length = 0; this.listeners.clear(); for (const wake of this.waiters) { wake(); @@ -25,46 +47,76 @@ export class EventHub { this.waiters.clear(); } - async *stream(filter?: (event: T) => boolean): AsyncIterable { - const queue: T[] = []; - let wake: (() => void) | null = null; - const listener = (event: T) => { - if (!filter || filter(event)) { - queue.push(event); - wake?.(); - wake = null; - } - }; + snapshot(filter?: (event: T) => boolean): T[] { + return filter ? this.replayEvents.filter(filter) : [...this.replayEvents]; + } - this.listeners.add(listener); - try { - while (!this.closed) { - const next = queue.shift(); - if (next) { - yield next; - continue; - } - await new Promise((resolve) => { - const wakeCurrent = () => { - this.waiters.delete(wakeCurrent); - resolve(); - }; - wake = wakeCurrent; - this.waiters.add(wakeCurrent); - }); - } - while (queue.length > 0) { - const next = queue.shift(); - if (next) { - yield next; - } - } - } finally { - this.listeners.delete(listener); - if (wake) { - this.waiters.delete(wake); - } - } + stream(filter?: (event: T) => boolean, options: EventStreamOptions = {}): AsyncIterable { + return { + [Symbol.asyncIterator]: (): AsyncIterator => { + const queue: T[] = options.replay ? this.snapshot(filter) : []; + let stopped = false; + let wake: (() => void) | null = null; + const wakePending = () => { + const pending = wake; + if (!pending) { + return; + } + wake = null; + this.waiters.delete(pending); + pending(); + }; + const listener = (event: T) => { + if (!filter || filter(event)) { + queue.push(event); + wakePending(); + } + }; + const cleanup = () => { + if (stopped) { + return; + } + stopped = true; + this.listeners.delete(listener); + wakePending(); + }; + + this.listeners.add(listener); + + return { + next: async (): Promise> => { + while (true) { + if (stopped) { + break; + } + if (queue.length > 0) { + return { done: false, value: queue.shift() as T }; + } + if (this.closed) { + break; + } + await new Promise((resolve) => { + const wakeCurrent = () => { + if (wake === wakeCurrent) { + wake = null; + } + this.waiters.delete(wakeCurrent); + resolve(); + }; + wake = wakeCurrent; + this.waiters.add(wakeCurrent); + }); + } + cleanup(); + return { done: true, value: undefined as never }; + }, + return: async (): Promise> => { + cleanup(); + return { done: true, value: undefined as never }; + }, + }; + }, + }; } } diff --git a/packages/sdk/src/index.test.ts b/packages/sdk/src/index.test.ts index a431254e94d..0960827e636 100644 --- a/packages/sdk/src/index.test.ts +++ b/packages/sdk/src/index.test.ts @@ -8,11 +8,19 @@ type RequestCall = { options?: GatewayRequestOptions; }; +type FakeResponse = + | unknown + | (( + params: unknown, + options: GatewayRequestOptions | undefined, + transport: FakeTransport, + ) => Promise | unknown); + class FakeTransport implements OpenClawTransport { readonly calls: RequestCall[] = []; - private readonly eventHub = new EventHub(); + private readonly eventHub = new EventHub({ replayLimit: 100 }); - constructor(private readonly responses: Record) {} + constructor(private readonly responses: Record) {} async request( method: string, @@ -20,11 +28,15 @@ class FakeTransport implements OpenClawTransport { options?: GatewayRequestOptions, ): Promise { this.calls.push({ method, params, options }); - return this.responses[method] as T; + const response = this.responses[method]; + if (typeof response === "function") { + return (await response(params, options, this)) as T; + } + return response as T; } events(filter?: (event: GatewayEvent) => boolean): AsyncIterable { - return this.eventHub.stream(filter); + return this.eventHub.stream(filter, { replay: true }); } emit(event: GatewayEvent): void { @@ -104,6 +116,26 @@ describe("OpenClaw SDK", () => { ]); }); + it("maps aborted wait snapshots to cancelled even when Gateway status is timeout", async () => { + const transport = new FakeTransport({ + "agent.wait": { + status: "timeout", + runId: "run_cancelled", + stopReason: "rpc", + error: "aborted by operator", + }, + }); + const oc = new OpenClaw({ transport }); + + const result = await oc.runs.wait("run_cancelled"); + + expect(result).toMatchObject({ + runId: "run_cancelled", + status: "cancelled", + error: { message: "aborted by operator" }, + }); + }); + it("splits provider-qualified model refs and rejects unsupported run options", async () => { const transport = new FakeTransport({ agent: { status: "accepted", runId: "run_openrouter" }, @@ -218,6 +250,61 @@ describe("OpenClaw SDK", () => { expect(transport.calls[2]?.params).toEqual({ probe: false }); }); + it("replays fast run events emitted before the caller starts iterating", async () => { + const ts = 1_777_000_000_000; + const transport = new FakeTransport({ + agent: ( + _params: unknown, + _options: GatewayRequestOptions | undefined, + fake: FakeTransport, + ) => { + fake.emit({ + event: "agent", + seq: 1, + payload: { runId: "run_fast", stream: "lifecycle", ts, data: { phase: "start" } }, + }); + fake.emit({ + event: "agent", + seq: 2, + payload: { + runId: "run_fast", + stream: "assistant", + ts: ts + 1, + data: { delta: "fast" }, + }, + }); + fake.emit({ + event: "agent", + seq: 3, + payload: { + runId: "run_fast", + stream: "lifecycle", + ts: ts + 2, + data: { phase: "end" }, + }, + }); + return { status: "accepted", runId: "run_fast", sessionKey: "fast" }; + }, + }); + const oc = new OpenClaw({ transport }); + + const run = await oc.runs.create({ + input: "finish immediately", + idempotencyKey: "fast-run-events", + sessionKey: "fast", + }); + const seen: string[] = []; + + for await (const event of run.events()) { + seen.push(event.type); + if (event.type === "run.completed") { + break; + } + } + + expect(seen).toEqual(["run.started", "assistant.delta", "run.completed"]); + }); + it("creates a session and sends a message as a run", async () => { const transport = new FakeTransport({ "sessions.create": { key: "session-main", label: "Main" }, diff --git a/packages/sdk/src/normalize.ts b/packages/sdk/src/normalize.ts index 1bfaf47c01d..e3ce0059af0 100644 --- a/packages/sdk/src/normalize.ts +++ b/packages/sdk/src/normalize.ts @@ -19,14 +19,6 @@ function readLowerString(value: unknown): string | undefined { function normalizeLifecycleEndEventType(data: JsonObject): OpenClawEventType { const status = readLowerString(data.status); const stopReason = readLowerString(data.stopReason); - if ( - status === "timeout" || - status === "timed_out" || - stopReason === "timeout" || - stopReason === "timed_out" - ) { - return "run.timed_out"; - } if ( status === "aborted" || status === "cancelled" || @@ -42,6 +34,14 @@ function normalizeLifecycleEndEventType(data: JsonObject): OpenClawEventType { ) { return "run.cancelled"; } + if ( + status === "timeout" || + status === "timed_out" || + stopReason === "timeout" || + stopReason === "timed_out" + ) { + return "run.timed_out"; + } if (data.aborted === true) { return "run.timed_out"; } diff --git a/packages/sdk/src/transport.ts b/packages/sdk/src/transport.ts index d9428e35726..50847e8ab0c 100644 --- a/packages/sdk/src/transport.ts +++ b/packages/sdk/src/transport.ts @@ -16,6 +16,8 @@ type GatewayClientLike = { stopAndWait(): Promise; }; +const RAW_EVENT_REPLAY_LIMIT = 1000; + export type GatewayClientTransportOptions = { url?: string; connectChallengeTimeoutMs?: number; @@ -65,7 +67,9 @@ function toGatewayEvent(event: unknown): GatewayEvent { } export class GatewayClientTransport implements ConnectableOpenClawTransport { - private readonly eventsHub = new EventHub(); + private readonly eventsHub = new EventHub({ + replayLimit: RAW_EVENT_REPLAY_LIMIT, + }); private readonly options: GatewayClientTransportOptions; private client: GatewayClientLike | null = null; private connectPromise: Promise | null = null; @@ -126,7 +130,7 @@ export class GatewayClientTransport implements ConnectableOpenClawTransport { } events(filter?: (event: GatewayEvent) => boolean): AsyncIterable { - return this.eventsHub.stream(filter); + return this.eventsHub.stream(filter, { replay: true }); } async close(): Promise {