refactor: share stream iterator wrappers

This commit is contained in:
Peter Steinberger
2026-04-18 22:57:35 +01:00
parent f76883d46c
commit 4e2541e5fb
5 changed files with 63 additions and 46 deletions

View File

@@ -1,6 +1,7 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { createAssistantMessageEventStream, streamSimple } from "@mariozechner/pi-ai";
import { formatErrorMessage } from "../../../infra/errors.js";
import { createStreamIteratorWrapper } from "../../stream-iterator-wrapper.js";
import { buildStreamErrorAssistantMessage } from "../../stream-message-shared.js";
const UNHANDLED_STOP_REASON_RE = /^Unhandled stop reason:\s*(.+)$/i;
@@ -90,14 +91,15 @@ function wrapStreamHandleUnhandledStopReason(
function () {
const iterator = originalAsyncIterator();
let emittedSyntheticTerminal = false;
return {
async next() {
return createStreamIteratorWrapper({
iterator,
next: async (streamIterator) => {
if (emittedSyntheticTerminal) {
return { done: true as const, value: undefined };
}
try {
const result = await iterator.next();
const result = await streamIterator.next();
if (!result.done && result.value && typeof result.value === "object") {
const event = result.value as { error?: unknown };
patchUnhandledStopReasonInAssistantMessage(event.error);
@@ -126,16 +128,7 @@ function wrapStreamHandleUnhandledStopReason(
};
}
},
async return(value?: unknown) {
return iterator.return?.(value) ?? { done: true as const, value: undefined };
},
async throw(error?: unknown) {
return iterator.throw?.(error) ?? { done: true as const, value: undefined };
},
[Symbol.asyncIterator]() {
return this;
},
};
});
};
return stream;

View File

@@ -2,6 +2,7 @@ import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple } from "@mariozechner/pi-ai";
import { DEFAULT_LLM_IDLE_TIMEOUT_SECONDS } from "../../../config/agent-timeout-defaults.js";
import type { OpenClawConfig } from "../../../config/types.openclaw.js";
import { createStreamIteratorWrapper } from "../../stream-iterator-wrapper.js";
import type { EmbeddedRunTrigger } from "./params.js";
/**
@@ -100,13 +101,14 @@ export function streamWithIdleTimeout(
}
};
return {
async next() {
return createStreamIteratorWrapper({
iterator,
next: async (streamIterator) => {
clearTimer();
try {
// Race between the actual next() and the timeout
const result = await Promise.race([iterator.next(), createTimeoutPromise()]);
const result = await Promise.race([streamIterator.next(), createTimeoutPromise()]);
if (result.done) {
clearTimer();
@@ -120,17 +122,15 @@ export function streamWithIdleTimeout(
throw error;
}
},
return() {
onReturn(streamIterator) {
clearTimer();
return iterator.return?.() ?? Promise.resolve({ done: true, value: undefined });
return streamIterator.return?.() ?? Promise.resolve({ done: true, value: undefined });
},
throw(error?: unknown) {
onThrow(streamIterator, error) {
clearTimer();
return iterator.throw?.(error) ?? Promise.reject(error);
return streamIterator.throw?.(error) ?? Promise.reject(error);
},
};
});
};
return stream;

View File

@@ -1,4 +1,5 @@
import { streamSimple } from "@mariozechner/pi-ai";
import { createStreamIteratorWrapper } from "../../stream-iterator-wrapper.js";
type SimpleStream = ReturnType<typeof streamSimple>;
@@ -10,21 +11,16 @@ export function wrapStreamObjectEvents(
(stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] =
function () {
const iterator = originalAsyncIterator();
return {
async next() {
const result = await iterator.next();
return createStreamIteratorWrapper({
iterator,
next: async (streamIterator) => {
const result = await streamIterator.next();
if (!result.done && result.value && typeof result.value === "object") {
await onEvent(result.value as Record<string, unknown>);
}
return result;
},
async return(value?: unknown) {
return iterator.return?.(value) ?? { done: true as const, value: undefined };
},
async throw(error?: unknown) {
return iterator.throw?.(error) ?? { done: true as const, value: undefined };
},
};
});
};
return stream;
}

View File

@@ -1,6 +1,7 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamSimple, type AssistantMessageEvent } from "@mariozechner/pi-ai";
import type { PluginTextReplacement, PluginTextTransforms } from "../plugins/cli-backend.types.js";
import { createStreamIteratorWrapper } from "./stream-iterator-wrapper.js";
export function mergePluginTextTransforms(
...transforms: Array<PluginTextTransforms | undefined>
@@ -128,9 +129,10 @@ function wrapStreamTextTransforms(
(stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] =
function () {
const iterator = originalAsyncIterator();
return {
async next() {
const result = await iterator.next();
return createStreamIteratorWrapper({
iterator,
next: async (streamIterator) => {
const result = await streamIterator.next();
return result.done
? result
: {
@@ -138,16 +140,7 @@ function wrapStreamTextTransforms(
value: transformAssistantEventText(result.value, replacements),
};
},
async return(value?: unknown) {
return iterator.return?.(value) ?? { done: true as const, value: undefined };
},
async throw(error?: unknown) {
return iterator.throw?.(error) ?? { done: true as const, value: undefined };
},
[Symbol.asyncIterator]() {
return this;
},
};
});
};
return stream;
}

View File

@@ -0,0 +1,35 @@
type StreamIterator<T> = AsyncIterator<T, unknown, unknown>;
type IteratorHandler<T> = (
iterator: StreamIterator<T>,
value?: unknown,
) => IteratorResult<T, unknown> | Promise<IteratorResult<T, unknown>>;
export function createStreamIteratorWrapper<T>(params: {
iterator: StreamIterator<T>;
next: (iterator: StreamIterator<T>) => Promise<IteratorResult<T, unknown>>;
onReturn?: IteratorHandler<T>;
onThrow?: IteratorHandler<T>;
}): AsyncIterableIterator<T> {
const wrapper: AsyncIterableIterator<T> = {
async next() {
return params.next(params.iterator);
},
async return(value?: unknown) {
return (
(await params.onReturn?.(params.iterator, value)) ??
(await params.iterator.return?.(value)) ?? { done: true as const, value: undefined }
);
},
async throw(error?: unknown) {
return (
(await params.onThrow?.(params.iterator, error)) ??
(await params.iterator.throw?.(error)) ?? { done: true as const, value: undefined }
);
},
[Symbol.asyncIterator]() {
return this;
},
};
return wrapper;
}