Files
openclaw/extensions/opencode-go/stream-termination.ts
zhang-guiping 769579bcf0 fix(opencode-go): streaming completes when provider ends responses (#93965)
* fix(opencode-go): abort stalled SSE streams at provider-owned raw boundary

opencode-go routes through the shared OpenAI-compatible completions provider,
where a stalled SSE socket (provider emits tokens then never closes the stream)
hangs the gateway until stuckSessionAbortMs (~622s) and surfaces as
'LLM request failed' / 'Request was aborted'. Issue #93610 reports ~90% of
opencode-go cron jobs failing intermittently this way.

Add a provider-owned stream wrapper at the opencode-go raw SSE boundary that
injects an AbortController into the underlying OpenAI SDK request and aborts
it after a configurable idle window (default 30s, far below 622s) elapses
without any forward-progress event. The wrapper is:

- Provider-scoped: only applies when model.provider === 'opencode-go'; the
  shared openai-completions.ts path is untouched.
- Abortable: calls controller.abort() on the injected AbortSignal, which
  propagates through OpenAI SDK requestOptions.signal and genuinely
  interrupts the underlying fetch/stream (not just iterator return()).
- Idle-based: every event (text/tool/thinking delta, including delayed
  usage-only chunks) refreshes the timer; natural completion (done/error)
  cancels it. Normal delayed usage-only completion is preserved.
- Boundary-terminal: pushes a terminal { type: 'error', reason: 'aborted' }
  event downstream so consumers do not hang.

TDD: stream-termination.test.ts covers (a) stalled stream after first
progress is aborted within the idle window with a downstream 'aborted'
terminal event, and (b) normal delayed completion within the idle window
is not aborted and the done event is forwarded unchanged.

* fix(opencode-go): align stalled-stream idle default with runtime (120s)

Match the runtime's shared `DEFAULT_LLM_IDLE_TIMEOUT_MS` (120s) so
non-cron interactive opencode-go runs see no behavior change versus the
existing watchdog. Cron runs — for which the runtime disables its idle
watchdog entirely (`resolveLlmIdleTimeoutMs` returns 0 when trigger is
cron and no explicit timeout is set) — still get provider-owned
termination well before the ~622s stuck-session recovery.

Refs #93610

* fix(opencode-go): satisfy CI lint and test type checks

- Remove unnecessary `?? {}` fallback in spread (oxlint
  no-useless-fallback-in-spread).
- Drop non-narrowing `!` on the wrapper return type; use
  `await Promise.resolve(...)` to collapse the
  `StreamLike | Promise<StreamLike>` union before `for await`.

Refs #93610

* fix(opencode-go): arm stalled-stream idle timer only after first event

The wrapper armed the idle timer before the first upstream event, which
would mis-abort slow time-to-first-byte requests — including the
opencode-go cron runs that the runtime deliberately leaves uncapped via
resolveLlmIdleTimeoutMs. Arm only after the first forwarded event, and
add regression coverage for the slow-first-event path.

* fix(opencode-go): cover stalled stream first event

* fix(opencode-go): respect explicit stream timeout

* fix(opencode-go): preserve first-event timer after synthetic start

* fix(opencode-go): satisfy stream termination test lint

* fix(opencode-go): distinguish synthetic stream preambles

* fix(opencode-go): route stalled streams through failover
2026-06-22 19:57:21 +00:00

372 lines
12 KiB
TypeScript

// Opencode Go stream termination wrapper aborts stalled OpenAI-compatible
// SSE streams at the provider-owned raw boundary, before the shared runtime
// stuck-session recovery kicks in.
import type { AssistantMessage, AssistantMessageEvent } from "openclaw/plugin-sdk/llm";
import { createAssistantMessageEventStream } from "openclaw/plugin-sdk/llm";
import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry";
// Stream function shape wrapped and returned by this module: the `streamFn`
// member of `ProviderWrapStreamFnContext` with `null`/`undefined` excluded.
type ProviderStreamFn = NonNullable<ProviderWrapStreamFnContext["streamFn"]>;
/**
* Configuration accepted by `createOpencodeGoStalledStreamWrapper`.
*/
export interface OpencodeGoStalledStreamWrapperOptions {
/**
* Provider id this wrapper applies to. Calls whose `model.provider` is not
* strictly equal to this value are forwarded to the underlying stream
* function untouched, so the wrapper stays provider-scoped.
*/
provider: string;
/**
* Maximum idle window, in milliseconds, after provider progress has begun.
* The window restarts on every text, thinking, or tool-call delta event; if
* it elapses the wrapper treats the underlying SSE as stalled and aborts it.
* Must be > 0. A valid positive `model.requestTimeoutMs` takes precedence
* over this value for that call.
*/
idleTimeoutMs: number;
/**
* Maximum window, in milliseconds, covering stream creation and the wait
* for the first provider progress event (text, thinking, or tool-call
* delta). Must be > 0 when provided; when omitted, `idleTimeoutMs` is used
* instead. A valid positive `model.requestTimeoutMs` takes precedence over
* this value for that call.
*/
firstEventTimeoutMs?: number;
}
/**
* Default idle window used in production. Matches the runtime's shared
* `DEFAULT_LLM_IDLE_TIMEOUT_MS` (120s) so non-cron interactive runs see
* no behavior change versus the existing watchdog, while cron runs — for
* which the runtime disables its idle watchdog entirely
* (`resolveLlmIdleTimeoutMs` returns 0 when `trigger === "cron"` and no
* explicit timeout is set) — finally get a provider-owned termination
* well before the ~622s stuck-session recovery kicks in.
*/
export const OPENCODE_GO_STREAM_IDLE_TIMEOUT_MS_DEFAULT = 120_000;
/**
* Default first-event window (300s). The wrapper in this file does not apply
* it automatically: when `firstEventTimeoutMs` is omitted the wrapper falls
* back to `idleTimeoutMs`, so this value only takes effect when a caller
* passes it explicitly. NOTE(review): presumably supplied by the plugin
* entry point — confirm against the call site.
*/
export const OPENCODE_GO_STREAM_FIRST_EVENT_TIMEOUT_MS_DEFAULT = 300_000;
/**
 * Reports whether `model` is a non-null object whose `provider` property is
 * strictly equal to `providerId`. Anything that is not an object (including
 * `null`, `undefined`, strings, and functions) yields `false`.
 */
function isOpencodeGoModel(model: unknown, providerId: string): boolean {
  // `typeof null` is "object", so null has to be excluded explicitly.
  if (typeof model !== "object" || model === null) {
    return false;
  }
  const { provider } = model as { provider?: unknown };
  return provider === providerId;
}
/**
 * Returns `value` when it is a finite number strictly greater than zero;
 * otherwise returns `undefined`. Non-numbers, `NaN`, infinities, zero, and
 * negative numbers are all rejected.
 */
function validTimeoutMs(value: unknown): number | undefined {
  if (typeof value !== "number") {
    return undefined;
  }
  if (!Number.isFinite(value) || value <= 0) {
    return undefined;
  }
  return value;
}
/**
 * Picks the timeout for one call: the model's own `requestTimeoutMs` when
 * `validTimeoutMs` accepts it, otherwise `fallbackMs`. A null or undefined
 * `model` simply yields the fallback.
 */
function resolveTimeoutMs(model: unknown, fallbackMs: number): number {
  const requested = (model as { requestTimeoutMs?: unknown })?.requestTimeoutMs;
  const explicitMs = validTimeoutMs(requested);
  if (explicitMs === undefined) {
    return fallbackMs;
  }
  return explicitMs;
}
/**
 * Reports whether `event` counts as provider progress for the stall timer.
 * Only the three delta event types qualify; every other event type
 * (including terminal events) returns `false`.
 */
function isProviderProgressEvent(event: AssistantMessageEvent): boolean {
  switch (event.type) {
    case "text_delta":
    case "thinking_delta":
    case "toolcall_delta":
      return true;
    default:
      return false;
  }
}
/**
 * Merges optional abort signals into a single signal that aborts as soon as
 * any of the provided signals aborts, carrying that signal's abort reason.
 *
 * - No signals present: returns a fresh signal that never aborts.
 * - Exactly one signal present: returns that same signal instance.
 * - Several signals: uses `AbortSignal.any` when the runtime provides it,
 *   otherwise wires listeners onto each source manually.
 *
 * @param signals Candidate signals; `undefined` entries are ignored.
 * @returns The merged `signal` plus a `cleanup` callback. `cleanup` detaches
 *   any listeners installed by the manual fallback and is a no-op on every
 *   other path. It is safe to call more than once.
 */
function combineAbortSignals(signals: (AbortSignal | undefined)[]): {
  signal: AbortSignal;
  cleanup(): void;
} {
  const noop = (): void => undefined;
  const present = signals.filter((signal): signal is AbortSignal => Boolean(signal));
  if (present.length === 0) {
    return { signal: new AbortController().signal, cleanup: noop };
  }
  if (present.length === 1) {
    return { signal: present[0], cleanup: noop };
  }
  // `AbortSignal.any` is missing from older lib typings and runtimes, so it is
  // probed through a widened view of the constructor.
  const signalStatics = AbortSignal as unknown as {
    any?: (signals: AbortSignal[]) => AbortSignal;
  };
  if (typeof signalStatics.any === "function") {
    // Call it as a method: a detached call would run with `this === undefined`,
    // which breaks implementations or polyfills that construct via `this`.
    return { signal: signalStatics.any(present), cleanup: noop };
  }
  const controller = new AbortController();
  const alreadyAborted = present.find((signal) => signal.aborted);
  if (alreadyAborted) {
    // A source was aborted before we could subscribe; mirror it immediately.
    controller.abort((alreadyAborted as { reason?: unknown }).reason);
    return { signal: controller.signal, cleanup: noop };
  }
  const unsubscribe: Array<() => void> = [];
  const cleanup = (): void => {
    for (const remove of unsubscribe) {
      remove();
    }
    unsubscribe.length = 0;
  };
  for (const signal of present) {
    const onAbort = (): void => {
      // The merged signal can only abort once, so drop the listeners still
      // attached to the other sources instead of holding them until cleanup().
      cleanup();
      controller.abort((signal as { reason?: unknown }).reason);
    };
    signal.addEventListener("abort", onAbort, { once: true });
    unsubscribe.push(() => signal.removeEventListener("abort", onAbort));
  }
  return { signal: controller.signal, cleanup };
}
// Error text attached to the terminal event emitted when the stall timer fires.
const STALLED_STREAM_ERROR_MESSAGE =
  "opencode-go stream timed out after provider-owned SSE boundary stalled";

/**
 * Builds the terminal `error` event for a stalled stream.
 *
 * @param partial Most recent assistant message seen on the stream, if any.
 *   When present it is copied (not mutated) with `stopReason` and
 *   `errorMessage` overridden; when absent a minimal placeholder message is
 *   synthesized instead.
 * @returns An `error` event with reason `"error"`.
 */
function buildStalledErrorEvent(partial: AssistantMessage | undefined): AssistantMessageEvent {
  const failure: AssistantMessage = partial
    ? { ...partial, stopReason: "error", errorMessage: STALLED_STREAM_ERROR_MESSAGE }
    : synthesizeMinimalAssistantMessage(STALLED_STREAM_ERROR_MESSAGE, "error");
  return { type: "error", reason: "error", error: failure };
}
/**
 * Builds the terminal `error` event used when the underlying iterator
 * finishes without ever emitting a `done` or `error` event.
 *
 * @param partial Most recent assistant message seen on the stream, if any.
 *   When present it is copied (not mutated) with `stopReason` and
 *   `errorMessage` overridden; when absent a minimal placeholder message is
 *   synthesized instead.
 * @returns An `error` event with reason `"error"`.
 */
function buildUnterminatedErrorEvent(partial: AssistantMessage | undefined): AssistantMessageEvent {
  const errorMessage = "opencode-go stream ended without a terminal event";
  const failure: AssistantMessage = partial
    ? { ...partial, stopReason: "error", errorMessage }
    : synthesizeMinimalAssistantMessage(errorMessage, "error");
  return { type: "error", reason: "error", error: failure };
}
/**
 * Builds the terminal `error` event for an exception caught while reading
 * the underlying stream.
 *
 * @param partial Most recent assistant message seen on the stream, if any.
 *   When present it is copied (not mutated) with `stopReason` and
 *   `errorMessage` overridden; when absent a minimal placeholder message is
 *   synthesized instead.
 * @param error The caught value. `Error` instances contribute their
 *   `message`; anything else is converted with `String(...)`.
 * @returns An `error` event with reason `"error"`.
 */
function buildCaughtErrorEvent(
  partial: AssistantMessage | undefined,
  error: unknown,
): AssistantMessageEvent {
  let errorMessage: string;
  if (error instanceof Error) {
    errorMessage = error.message;
  } else {
    errorMessage = String(error);
  }
  const failure: AssistantMessage = partial
    ? { ...partial, stopReason: "error", errorMessage }
    : synthesizeMinimalAssistantMessage(errorMessage, "error");
  return { type: "error", reason: "error", error: failure };
}
/**
 * Creates a placeholder assistant message for cases where no partial
 * message was observed on the stream: empty content, empty model name,
 * all-zero usage and cost, and a timestamp taken at call time.
 *
 * @param errorMessage Text stored on the message's `errorMessage` field.
 * @param stopReason Stop reason stored on the message.
 * @returns A freshly allocated message; no objects are shared between calls.
 */
function synthesizeMinimalAssistantMessage(
  errorMessage: string,
  stopReason: AssistantMessage["stopReason"],
): AssistantMessage {
  const zeroCost = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 };
  const zeroUsage = {
    input: 0,
    output: 0,
    cacheRead: 0,
    cacheWrite: 0,
    totalTokens: 0,
    cost: zeroCost,
  };
  const placeholder: AssistantMessage = {
    role: "assistant",
    content: [],
    api: "openai-completions",
    provider: "opencode-go",
    model: "",
    usage: zeroUsage,
    stopReason,
    errorMessage,
    timestamp: Date.now(),
  };
  return placeholder;
}
/**
 * Wraps a provider stream function so that a stream which fails to make
 * progress is aborted and terminated with an `error` event, instead of
 * leaving the consumer waiting on a stalled SSE socket.
 *
 * Behavior of the returned stream function:
 * - Provider-scoped: calls whose `model.provider` differs from
 *   `options.provider` are forwarded to `underlying` untouched.
 * - Two windows: a first-event window is armed before `underlying` is
 *   invoked and stays armed until the first provider progress event (text,
 *   thinking, or tool-call delta). From then on every progress event re-arms
 *   the idle window. Non-progress events are forwarded but do not touch the
 *   timer. A valid positive `model.requestTimeoutMs` replaces both windows
 *   for that call.
 * - On timeout: the wrapper aborts the `AbortSignal` it passed to
 *   `underlying` as `signal` (merged with any caller-supplied
 *   `callOptions.signal`), asks the base iterator to `return()`, and emits a
 *   terminal `error` event built from the last partial message seen.
 * - Terminal-safe: an upstream `done` or `error` event is forwarded, all
 *   timers are cleared, and the output stream is ended. If the upstream
 *   iterator finishes without a terminal event, or throws, a synthesized
 *   `error` event is emitted instead.
 * - A synchronous throw from `underlying` is rethrown to the caller after
 *   timers and listeners are released.
 *
 * @param underlying Stream function to wrap.
 * @param options Provider id and timeout windows.
 * @returns A stream function with the same signature as `underlying`.
 * @throws Error when `options` is missing, when `idleTimeoutMs` is not a
 *   finite number greater than zero, or when `firstEventTimeoutMs` is
 *   provided but is not a finite number greater than zero.
 */
export function createOpencodeGoStalledStreamWrapper(
  underlying: ProviderStreamFn,
  options: OpencodeGoStalledStreamWrapperOptions,
): ProviderStreamFn {
  // `NaN <= 0` and `undefined <= 0` are both false, so a plain `<= 0` check
  // lets unusable values through; setTimeout would then fire almost at once
  // and abort every stream. Require a finite positive number instead.
  const isUsableTimeout = (value: unknown): boolean =>
    typeof value === "number" && Number.isFinite(value) && value > 0;
  if (!options || !isUsableTimeout(options.idleTimeoutMs)) {
    throw new Error("createOpencodeGoStalledStreamWrapper requires idleTimeoutMs > 0");
  }
  if (options.firstEventTimeoutMs !== undefined && !isUsableTimeout(options.firstEventTimeoutMs)) {
    throw new Error("createOpencodeGoStalledStreamWrapper requires firstEventTimeoutMs > 0");
  }
  // Largest delay setTimeout honors; anything larger is coerced to 1ms.
  const maxTimerDelayMs = 2_147_483_647;
  const providerId = options.provider;
  const idleTimeoutMsDefault = options.idleTimeoutMs;
  const firstEventTimeoutMsDefault = options.firstEventTimeoutMs ?? options.idleTimeoutMs;
  return (model, context, callOptions) => {
    if (!isOpencodeGoModel(model, providerId)) {
      return underlying(model, context, callOptions);
    }
    const output = createAssistantMessageEventStream();
    const idleTimeoutMs = resolveTimeoutMs(model, idleTimeoutMsDefault);
    const firstEventTimeoutMs = resolveTimeoutMs(model, firstEventTimeoutMsDefault);
    // The wrapper's own controller is merged with the caller's signal so that
    // either side can interrupt the underlying request.
    const controller = new AbortController();
    const combinedSignal = combineAbortSignals([
      (callOptions as { signal?: AbortSignal } | undefined)?.signal,
      controller.signal,
    ]);
    const wrappedOptions = {
      ...callOptions,
      signal: combinedSignal.signal,
    };
    let idleTimer: ReturnType<typeof setTimeout> | undefined;
    let lastSeenPartial: AssistantMessage | undefined;
    // Set exactly once, by whichever of the timer or the read loop ends the
    // output stream first; the loser sees it and backs off.
    let settled = false;
    let baseIterator: AsyncIterator<AssistantMessageEvent> | undefined;
    const clearIdleTimer = () => {
      if (idleTimer !== undefined) {
        clearTimeout(idleTimer);
        idleTimer = undefined;
      }
    };
    const cleanup = () => {
      clearIdleTimer();
      combinedSignal.cleanup();
    };
    // Best-effort release of an upstream iterator. A synchronous throw from
    // `return()` must not escape: on the timeout path it would skip the
    // terminal event and leave the consumer waiting forever.
    const releaseIterator = (iterator: AsyncIterator<AssistantMessageEvent> | undefined) => {
      if (!iterator?.return) {
        return;
      }
      try {
        void Promise.resolve(iterator.return()).catch(() => undefined);
      } catch {
        // Ignored on purpose: the stream is being abandoned anyway.
      }
    };
    const finishWith = (event: AssistantMessageEvent) => {
      if (settled) {
        return;
      }
      settled = true;
      cleanup();
      output.push(event);
      output.end(
        event.type === "done" ? (event as { message: AssistantMessage }).message : undefined,
      );
    };
    const abortStalledStream = () => {
      if (settled) {
        return;
      }
      settled = true;
      clearIdleTimer();
      controller.abort(new Error("opencode-go stream stalled"));
      combinedSignal.cleanup();
      releaseIterator(baseIterator);
      output.push(buildStalledErrorEvent(lastSeenPartial));
      output.end();
    };
    const armTimer = (timeoutMs: number) => {
      clearIdleTimer();
      idleTimer = setTimeout(abortStalledStream, Math.min(timeoutMs, maxTimerDelayMs));
      // Do not let the watchdog alone keep the process alive.
      idleTimer.unref?.();
    };
    const armFirstEventTimer = () => armTimer(firstEventTimeoutMs);
    const armIdleTimer = () => armTimer(idleTimeoutMs);
    const trackPartial = (event: AssistantMessageEvent) => {
      const partial =
        (event as { partial?: AssistantMessage; message?: AssistantMessage }).partial ??
        (event as { message?: AssistantMessage }).message;
      if (partial) {
        lastSeenPartial = partial;
      }
    };
    // Armed before the underlying call so stream creation is covered too.
    armFirstEventTimer();
    let baseStreamResult: ReturnType<ProviderStreamFn>;
    try {
      baseStreamResult = underlying(model, context, wrappedOptions);
    } catch (error) {
      cleanup();
      throw error;
    }
    void (async () => {
      try {
        // The underlying function may return the stream directly or a promise
        // of it; awaiting collapses both cases.
        const baseStream = await Promise.resolve(
          baseStreamResult as Awaited<ReturnType<ProviderStreamFn>>,
        );
        if (settled) {
          // The timer fired while the stream was still being created.
          releaseIterator(
            (baseStream as AsyncIterable<AssistantMessageEvent>)[Symbol.asyncIterator](),
          );
          return;
        }
        baseIterator = (baseStream as AsyncIterable<AssistantMessageEvent>)[Symbol.asyncIterator]();
        for (;;) {
          const result = await baseIterator.next();
          if (settled) {
            // The timer already ended the output while we were waiting.
            return;
          }
          if (result.done) {
            finishWith(buildUnterminatedErrorEvent(lastSeenPartial));
            return;
          }
          const event = result.value;
          if (event.type === "done" || event.type === "error") {
            trackPartial(event);
            finishWith(event);
            return;
          }
          trackPartial(event);
          output.push(event);
          // Only real provider progress switches to / refreshes the idle
          // window; other events leave the current timer running.
          if (isProviderProgressEvent(event)) {
            armIdleTimer();
          }
        }
      } catch (error) {
        if (!settled) {
          finishWith(buildCaughtErrorEvent(lastSeenPartial, error));
        }
      } finally {
        cleanup();
      }
    })();
    return output;
  };
}