diff --git a/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.ts b/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.ts index c3437f9cb8a..d11627fc93e 100644 --- a/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.ts +++ b/src/agents/pi-embedded-runner/run/attempt.stop-reason-recovery.ts @@ -1,6 +1,7 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; import { createAssistantMessageEventStream, streamSimple } from "@mariozechner/pi-ai"; import { formatErrorMessage } from "../../../infra/errors.js"; +import { createStreamIteratorWrapper } from "../../stream-iterator-wrapper.js"; import { buildStreamErrorAssistantMessage } from "../../stream-message-shared.js"; const UNHANDLED_STOP_REASON_RE = /^Unhandled stop reason:\s*(.+)$/i; @@ -90,14 +91,15 @@ function wrapStreamHandleUnhandledStopReason( function () { const iterator = originalAsyncIterator(); let emittedSyntheticTerminal = false; - return { - async next() { + return createStreamIteratorWrapper({ + iterator, + next: async (streamIterator) => { if (emittedSyntheticTerminal) { return { done: true as const, value: undefined }; } try { - const result = await iterator.next(); + const result = await streamIterator.next(); if (!result.done && result.value && typeof result.value === "object") { const event = result.value as { error?: unknown }; patchUnhandledStopReasonInAssistantMessage(event.error); @@ -126,16 +128,7 @@ function wrapStreamHandleUnhandledStopReason( }; } }, - async return(value?: unknown) { - return iterator.return?.(value) ?? { done: true as const, value: undefined }; - }, - async throw(error?: unknown) { - return iterator.throw?.(error) ?? { done: true as const, value: undefined }; - }, - [Symbol.asyncIterator]() { - return this; - }, - }; + }); }; return stream; diff --git a/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts b/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts index 546844473a8..ab16eed42da 100644 --- a/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts +++ b/src/agents/pi-embedded-runner/run/llm-idle-timeout.ts @@ -2,6 +2,7 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; import { streamSimple } from "@mariozechner/pi-ai"; import { DEFAULT_LLM_IDLE_TIMEOUT_SECONDS } from "../../../config/agent-timeout-defaults.js"; import type { OpenClawConfig } from "../../../config/types.openclaw.js"; +import { createStreamIteratorWrapper } from "../../stream-iterator-wrapper.js"; import type { EmbeddedRunTrigger } from "./params.js"; /** @@ -100,13 +101,14 @@ export function streamWithIdleTimeout( } }; - return { - async next() { + return createStreamIteratorWrapper({ + iterator, + next: async (streamIterator) => { clearTimer(); try { // Race between the actual next() and the timeout - const result = await Promise.race([iterator.next(), createTimeoutPromise()]); + const result = await Promise.race([streamIterator.next(), createTimeoutPromise()]); if (result.done) { clearTimer(); @@ -120,17 +122,15 @@ export function streamWithIdleTimeout( throw error; } }, - - return() { + onReturn(streamIterator) { clearTimer(); - return iterator.return?.() ?? Promise.resolve({ done: true, value: undefined }); + return streamIterator.return?.() ?? Promise.resolve({ done: true, value: undefined }); }, - - throw(error?: unknown) { + onThrow(streamIterator, error) { clearTimer(); - return iterator.throw?.(error) ?? Promise.reject(error); + return streamIterator.throw?.(error) ?? Promise.reject(error); }, - }; + }); }; return stream; diff --git a/src/agents/pi-embedded-runner/run/stream-wrapper.ts b/src/agents/pi-embedded-runner/run/stream-wrapper.ts index 6af05a4eeff..2273873d6db 100644 --- a/src/agents/pi-embedded-runner/run/stream-wrapper.ts +++ b/src/agents/pi-embedded-runner/run/stream-wrapper.ts @@ -1,4 +1,5 @@ import { streamSimple } from "@mariozechner/pi-ai"; +import { createStreamIteratorWrapper } from "../../stream-iterator-wrapper.js"; type SimpleStream = ReturnType; @@ -10,21 +11,16 @@ export function wrapStreamObjectEvents( (stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] = function () { const iterator = originalAsyncIterator(); - return { - async next() { - const result = await iterator.next(); + return createStreamIteratorWrapper({ + iterator, + next: async (streamIterator) => { + const result = await streamIterator.next(); if (!result.done && result.value && typeof result.value === "object") { await onEvent(result.value as Record); } return result; }, - async return(value?: unknown) { - return iterator.return?.(value) ?? { done: true as const, value: undefined }; - }, - async throw(error?: unknown) { - return iterator.throw?.(error) ?? { done: true as const, value: undefined }; - }, - }; + }); }; return stream; } diff --git a/src/agents/plugin-text-transforms.ts b/src/agents/plugin-text-transforms.ts index 27ee881336e..826a56d4129 100644 --- a/src/agents/plugin-text-transforms.ts +++ b/src/agents/plugin-text-transforms.ts @@ -1,6 +1,7 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; import { streamSimple, type AssistantMessageEvent } from "@mariozechner/pi-ai"; import type { PluginTextReplacement, PluginTextTransforms } from "../plugins/cli-backend.types.js"; +import { createStreamIteratorWrapper } from "./stream-iterator-wrapper.js"; export function mergePluginTextTransforms( ...transforms: Array @@ -128,9 +129,10 @@ function wrapStreamTextTransforms( (stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] = function () { const iterator = originalAsyncIterator(); - return { - async next() { - const result = await iterator.next(); + return createStreamIteratorWrapper({ + iterator, + next: async (streamIterator) => { + const result = await streamIterator.next(); return result.done ? result : { @@ -138,16 +140,7 @@ function wrapStreamTextTransforms( value: transformAssistantEventText(result.value, replacements), }; }, - async return(value?: unknown) { - return iterator.return?.(value) ?? { done: true as const, value: undefined }; - }, - async throw(error?: unknown) { - return iterator.throw?.(error) ?? { done: true as const, value: undefined }; - }, - [Symbol.asyncIterator]() { - return this; - }, - }; + }); }; return stream; } diff --git a/src/agents/stream-iterator-wrapper.ts b/src/agents/stream-iterator-wrapper.ts new file mode 100644 index 00000000000..42cea7d6683 --- /dev/null +++ b/src/agents/stream-iterator-wrapper.ts @@ -0,0 +1,35 @@ +type StreamIterator = AsyncIterator; + +type IteratorHandler = ( + iterator: StreamIterator, + value?: unknown, +) => IteratorResult | Promise>; + +export function createStreamIteratorWrapper(params: { + iterator: StreamIterator; + next: (iterator: StreamIterator) => Promise>; + onReturn?: IteratorHandler; + onThrow?: IteratorHandler; +}): AsyncIterableIterator { + const wrapper: AsyncIterableIterator = { + async next() { + return params.next(params.iterator); + }, + async return(value?: unknown) { + return ( + (await params.onReturn?.(params.iterator, value)) ?? + (await params.iterator.return?.(value)) ?? { done: true as const, value: undefined } + ); + }, + async throw(error?: unknown) { + return ( + (await params.onThrow?.(params.iterator, error)) ?? + (await params.iterator.throw?.(error)) ?? { done: true as const, value: undefined } + ); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + return wrapper; +}