fix: preserve SDK run event outcomes

This commit is contained in:
Peter Steinberger
2026-04-29 23:44:24 +01:00
parent 43f6c8b01a
commit 204d200be3
5 changed files with 358 additions and 74 deletions

View File

@@ -1,4 +1,5 @@
import { randomUUID } from "node:crypto";
import { EventHub } from "./event-hub.js";
import { normalizeGatewayEvent } from "./normalize.js";
import { GatewayClientTransport, isConnectableTransport } from "./transport.js";
import type {
@@ -15,6 +16,10 @@ import type {
SessionTarget,
} from "./types.js";
const MAX_REPLAY_RUNS = 100;
const MAX_REPLAY_EVENTS_PER_RUN = 500;
const MAX_NORMALIZED_REPLAY_EVENTS = 2000;
export type OpenClawOptions = {
gateway?: "auto" | (string & {});
url?: string;
@@ -36,17 +41,32 @@ function resolveGatewayUrl(options: OpenClawOptions): string | undefined {
function runStatusFromWaitPayload(payload: unknown): RunResult["status"] {
const record =
typeof payload === "object" && payload !== null ? (payload as { status?: unknown }) : {};
const status = typeof record.status === "string" ? record.status : undefined;
typeof payload === "object" && payload !== null
? (payload as { aborted?: unknown; status?: unknown; stopReason?: unknown })
: {};
const status = typeof record.status === "string" ? record.status.toLowerCase() : undefined;
const stopReason = typeof record.stopReason === "string" ? record.stopReason.toLowerCase() : "";
if (
status === "aborted" ||
status === "cancelled" ||
status === "canceled" ||
status === "killed" ||
stopReason === "aborted" ||
stopReason === "cancelled" ||
stopReason === "canceled" ||
stopReason === "killed" ||
stopReason === "rpc" ||
stopReason === "user" ||
(record.aborted === true && stopReason === "stop")
) {
return "cancelled";
}
if (status === "ok" || status === "completed" || status === "succeeded") {
return "completed";
}
if (status === "timeout" || status === "timed_out") {
return "timed_out";
}
if (status === "cancelled" || status === "canceled") {
return "cancelled";
}
if (status === "accepted") {
return "accepted";
}
@@ -149,7 +169,13 @@ export class OpenClaw {
readonly environments: EnvironmentsNamespace;
private readonly transport: OpenClawTransport;
private readonly normalizedEvents = new EventHub<OpenClawEvent>({
replayLimit: MAX_NORMALIZED_REPLAY_EVENTS,
});
private readonly replayByRunId = new Map<string, OpenClawEvent[]>();
private connected = false;
private eventPumpPromise: Promise<void> | null = null;
private eventPumpReady: Promise<void> | null = null;
constructor(options: OpenClawOptions = {}) {
this.transport =
@@ -173,16 +199,22 @@ export class OpenClaw {
async connect(): Promise<void> {
if (this.connected) {
await this.startEventPump();
return;
}
if (isConnectableTransport(this.transport)) {
await this.transport.connect();
}
this.connected = true;
await this.startEventPump();
}
async close(): Promise<void> {
await this.transport.close?.();
await this.eventPumpPromise?.catch(() => {});
this.normalizedEvents.close();
this.eventPumpPromise = null;
this.eventPumpReady = null;
this.connected = false;
}
@@ -196,21 +228,135 @@ export class OpenClaw {
}
events(filter?: (event: OpenClawEvent) => boolean): AsyncIterable<OpenClawEvent> {
const source = this.transport.events();
async function* iterate(): AsyncIterable<OpenClawEvent> {
for await (const event of source) {
const normalized = normalizeGatewayEvent(event);
if (!filter || filter(normalized)) {
yield normalized;
}
}
}
return iterate();
return this.iterateEvents(filter);
}
runEvents(
runId: string,
filter?: (event: OpenClawEvent) => boolean,
): AsyncIterable<OpenClawEvent> {
return this.iterateRunEvents(runId, filter);
}
rawEvents(filter?: (event: GatewayEvent) => boolean): AsyncIterable<GatewayEvent> {
return this.transport.events(filter);
}
private async *iterateEvents(
filter?: (event: OpenClawEvent) => boolean,
): AsyncIterable<OpenClawEvent> {
await this.connect();
for await (const event of this.normalizedEvents.stream(filter)) {
yield event;
}
}
private async *iterateRunEvents(
runId: string,
filter?: (event: OpenClawEvent) => boolean,
): AsyncIterable<OpenClawEvent> {
await this.connect();
const matches = (event: OpenClawEvent) => {
if (event.runId !== runId) {
return false;
}
return filter ? filter(event) : true;
};
const liveSource = this.normalizedEvents.stream(matches, { replay: true });
const live = liveSource[Symbol.asyncIterator]();
let nextLive = live.next();
const seen = new Set<string>();
try {
for (const event of this.replaySnapshot(runId)) {
if (!matches(event) || seen.has(event.id)) {
continue;
}
seen.add(event.id);
yield event;
}
while (true) {
const next = await nextLive;
if (next.done) {
break;
}
nextLive = live.next();
if (seen.has(next.value.id)) {
continue;
}
seen.add(next.value.id);
yield next.value;
}
} finally {
await live.return?.();
}
}
private startEventPump(): Promise<void> {
if (this.eventPumpReady) {
return this.eventPumpReady;
}
let markReady = () => {};
let ready = false;
this.eventPumpReady = new Promise<void>((resolve) => {
markReady = () => {
if (ready) {
return;
}
ready = true;
resolve();
};
});
this.eventPumpPromise = (async () => {
const iterator = this.transport.events()[Symbol.asyncIterator]();
try {
while (true) {
const next = iterator.next();
await Promise.resolve();
markReady();
const result = await next;
if (result.done) {
break;
}
const normalized = normalizeGatewayEvent(result.value);
this.recordReplayEvent(normalized);
this.normalizedEvents.publish(normalized);
}
} finally {
markReady();
await iterator.return?.();
this.normalizedEvents.close();
}
})().catch(() => {
markReady();
this.normalizedEvents.close();
});
return this.eventPumpReady;
}
private recordReplayEvent(event: OpenClawEvent): void {
if (!event.runId) {
return;
}
let events = this.replayByRunId.get(event.runId);
if (!events) {
if (this.replayByRunId.size >= MAX_REPLAY_RUNS) {
const oldestRunId = this.replayByRunId.keys().next().value;
if (oldestRunId) {
this.replayByRunId.delete(oldestRunId);
}
}
events = [];
this.replayByRunId.set(event.runId, events);
}
events.push(event);
if (events.length > MAX_REPLAY_EVENTS_PER_RUN) {
events.splice(0, events.length - MAX_REPLAY_EVENTS_PER_RUN);
}
}
private replaySnapshot(runId: string): OpenClawEvent[] {
return [...(this.replayByRunId.get(runId) ?? [])];
}
}
export class Agent {
@@ -241,12 +387,7 @@ export class Run {
) {}
events(filter?: (event: OpenClawEvent) => boolean): AsyncIterable<OpenClawEvent> {
return this.client.events((event) => {
if (event.runId !== this.id) {
return false;
}
return filter ? filter(event) : true;
});
return this.client.runEvents(this.id, filter);
}
async wait(options?: { timeoutMs?: number }): Promise<RunResult> {

View File

@@ -2,15 +2,36 @@ import type { GatewayEvent } from "./types.js";
type Listener<T> = (event: T) => void;
export type EventHubOptions = {
replayLimit?: number;
};
export type EventStreamOptions = {
replay?: boolean;
};
export class EventHub<T> {
private readonly replayLimit: number;
private readonly replayEvents: T[] = [];
private closed = false;
private readonly listeners = new Set<Listener<T>>();
private readonly waiters = new Set<() => void>();
constructor(options: EventHubOptions = {}) {
this.replayLimit = options.replayLimit ?? 0;
}
publish(event: T): void {
if (this.closed) {
return;
}
if (this.replayLimit > 0) {
this.replayEvents.push(event);
const overflow = this.replayEvents.length - this.replayLimit;
if (overflow > 0) {
this.replayEvents.splice(0, overflow);
}
}
for (const listener of this.listeners) {
listener(event);
}
@@ -18,6 +39,7 @@ export class EventHub<T> {
close(): void {
this.closed = true;
this.replayEvents.length = 0;
this.listeners.clear();
for (const wake of this.waiters) {
wake();
@@ -25,46 +47,76 @@ export class EventHub<T> {
this.waiters.clear();
}
async *stream(filter?: (event: T) => boolean): AsyncIterable<T> {
const queue: T[] = [];
let wake: (() => void) | null = null;
const listener = (event: T) => {
if (!filter || filter(event)) {
queue.push(event);
wake?.();
wake = null;
}
};
snapshot(filter?: (event: T) => boolean): T[] {
return filter ? this.replayEvents.filter(filter) : [...this.replayEvents];
}
this.listeners.add(listener);
try {
while (!this.closed) {
const next = queue.shift();
if (next) {
yield next;
continue;
}
await new Promise<void>((resolve) => {
const wakeCurrent = () => {
this.waiters.delete(wakeCurrent);
resolve();
};
wake = wakeCurrent;
this.waiters.add(wakeCurrent);
});
}
while (queue.length > 0) {
const next = queue.shift();
if (next) {
yield next;
}
}
} finally {
this.listeners.delete(listener);
if (wake) {
this.waiters.delete(wake);
}
}
stream(filter?: (event: T) => boolean, options: EventStreamOptions = {}): AsyncIterable<T> {
return {
[Symbol.asyncIterator]: (): AsyncIterator<T> => {
const queue: T[] = options.replay ? this.snapshot(filter) : [];
let stopped = false;
let wake: (() => void) | null = null;
const wakePending = () => {
const pending = wake;
if (!pending) {
return;
}
wake = null;
this.waiters.delete(pending);
pending();
};
const listener = (event: T) => {
if (!filter || filter(event)) {
queue.push(event);
wakePending();
}
};
const cleanup = () => {
if (stopped) {
return;
}
stopped = true;
this.listeners.delete(listener);
wakePending();
};
this.listeners.add(listener);
return {
next: async (): Promise<IteratorResult<T>> => {
while (true) {
if (stopped) {
break;
}
if (queue.length > 0) {
return { done: false, value: queue.shift() as T };
}
if (this.closed) {
break;
}
await new Promise<void>((resolve) => {
const wakeCurrent = () => {
if (wake === wakeCurrent) {
wake = null;
}
this.waiters.delete(wakeCurrent);
resolve();
};
wake = wakeCurrent;
this.waiters.add(wakeCurrent);
});
}
cleanup();
return { done: true, value: undefined as never };
},
return: async (): Promise<IteratorResult<T>> => {
cleanup();
return { done: true, value: undefined as never };
},
};
},
};
}
}

View File

@@ -8,11 +8,19 @@ type RequestCall = {
options?: GatewayRequestOptions;
};
type FakeResponse =
| unknown
| ((
params: unknown,
options: GatewayRequestOptions | undefined,
transport: FakeTransport,
) => Promise<unknown> | unknown);
class FakeTransport implements OpenClawTransport {
readonly calls: RequestCall[] = [];
private readonly eventHub = new EventHub<GatewayEvent>();
private readonly eventHub = new EventHub<GatewayEvent>({ replayLimit: 100 });
constructor(private readonly responses: Record<string, unknown>) {}
constructor(private readonly responses: Record<string, FakeResponse>) {}
async request<T = unknown>(
method: string,
@@ -20,11 +28,15 @@ class FakeTransport implements OpenClawTransport {
options?: GatewayRequestOptions,
): Promise<T> {
this.calls.push({ method, params, options });
return this.responses[method] as T;
const response = this.responses[method];
if (typeof response === "function") {
return (await response(params, options, this)) as T;
}
return response as T;
}
events(filter?: (event: GatewayEvent) => boolean): AsyncIterable<GatewayEvent> {
return this.eventHub.stream(filter);
return this.eventHub.stream(filter, { replay: true });
}
emit(event: GatewayEvent): void {
@@ -104,6 +116,26 @@ describe("OpenClaw SDK", () => {
]);
});
it("maps aborted wait snapshots to cancelled even when Gateway status is timeout", async () => {
const transport = new FakeTransport({
"agent.wait": {
status: "timeout",
runId: "run_cancelled",
stopReason: "rpc",
error: "aborted by operator",
},
});
const oc = new OpenClaw({ transport });
const result = await oc.runs.wait("run_cancelled");
expect(result).toMatchObject({
runId: "run_cancelled",
status: "cancelled",
error: { message: "aborted by operator" },
});
});
it("splits provider-qualified model refs and rejects unsupported run options", async () => {
const transport = new FakeTransport({
agent: { status: "accepted", runId: "run_openrouter" },
@@ -218,6 +250,61 @@ describe("OpenClaw SDK", () => {
expect(transport.calls[2]?.params).toEqual({ probe: false });
});
it("replays fast run events emitted before the caller starts iterating", async () => {
const ts = 1_777_000_000_000;
const transport = new FakeTransport({
agent: (
_params: unknown,
_options: GatewayRequestOptions | undefined,
fake: FakeTransport,
) => {
fake.emit({
event: "agent",
seq: 1,
payload: { runId: "run_fast", stream: "lifecycle", ts, data: { phase: "start" } },
});
fake.emit({
event: "agent",
seq: 2,
payload: {
runId: "run_fast",
stream: "assistant",
ts: ts + 1,
data: { delta: "fast" },
},
});
fake.emit({
event: "agent",
seq: 3,
payload: {
runId: "run_fast",
stream: "lifecycle",
ts: ts + 2,
data: { phase: "end" },
},
});
return { status: "accepted", runId: "run_fast", sessionKey: "fast" };
},
});
const oc = new OpenClaw({ transport });
const run = await oc.runs.create({
input: "finish immediately",
idempotencyKey: "fast-run-events",
sessionKey: "fast",
});
const seen: string[] = [];
for await (const event of run.events()) {
seen.push(event.type);
if (event.type === "run.completed") {
break;
}
}
expect(seen).toEqual(["run.started", "assistant.delta", "run.completed"]);
});
it("creates a session and sends a message as a run", async () => {
const transport = new FakeTransport({
"sessions.create": { key: "session-main", label: "Main" },

View File

@@ -19,14 +19,6 @@ function readLowerString(value: unknown): string | undefined {
function normalizeLifecycleEndEventType(data: JsonObject): OpenClawEventType {
const status = readLowerString(data.status);
const stopReason = readLowerString(data.stopReason);
if (
status === "timeout" ||
status === "timed_out" ||
stopReason === "timeout" ||
stopReason === "timed_out"
) {
return "run.timed_out";
}
if (
status === "aborted" ||
status === "cancelled" ||
@@ -42,6 +34,14 @@ function normalizeLifecycleEndEventType(data: JsonObject): OpenClawEventType {
) {
return "run.cancelled";
}
if (
status === "timeout" ||
status === "timed_out" ||
stopReason === "timeout" ||
stopReason === "timed_out"
) {
return "run.timed_out";
}
if (data.aborted === true) {
return "run.timed_out";
}

View File

@@ -16,6 +16,8 @@ type GatewayClientLike = {
stopAndWait(): Promise<void>;
};
const RAW_EVENT_REPLAY_LIMIT = 1000;
export type GatewayClientTransportOptions = {
url?: string;
connectChallengeTimeoutMs?: number;
@@ -65,7 +67,9 @@ function toGatewayEvent(event: unknown): GatewayEvent {
}
export class GatewayClientTransport implements ConnectableOpenClawTransport {
private readonly eventsHub = new EventHub<GatewayEvent>();
private readonly eventsHub = new EventHub<GatewayEvent>({
replayLimit: RAW_EVENT_REPLAY_LIMIT,
});
private readonly options: GatewayClientTransportOptions;
private client: GatewayClientLike | null = null;
private connectPromise: Promise<void> | null = null;
@@ -126,7 +130,7 @@ export class GatewayClientTransport implements ConnectableOpenClawTransport {
}
events(filter?: (event: GatewayEvent) => boolean): AsyncIterable<GatewayEvent> {
return this.eventsHub.stream(filter);
return this.eventsHub.stream(filter, { replay: true });
}
async close(): Promise<void> {