mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-23 06:28:08 +00:00
* fix(opencode-go): abort stalled SSE streams at provider-owned raw boundary opencode-go routes through the shared OpenAI-compatible completions provider, where a stalled SSE socket (provider emits tokens then never closes the stream) hangs the gateway until stuckSessionAbortMs (~622s) and surfaces as 'LLM request failed' / 'Request was aborted'. Issue #93610 reports ~90% of opencode-go cron jobs failing intermittently this way. Add a provider-owned stream wrapper at the opencode-go raw SSE boundary that injects an AbortController into the underlying OpenAI SDK request and aborts it after a configurable idle window (default 30s, far below 622s) elapses without any forward-progress event. The wrapper is: - Provider-scoped: only applies when model.provider === 'opencode-go'; the shared openai-completions.ts path is untouched. - Abortable: calls controller.abort() on the injected AbortSignal, which propagates through OpenAI SDK requestOptions.signal and genuinely interrupts the underlying fetch/stream (not just iterator return()). - Idle-based: every event (text/tool/thinking delta, including delayed usage-only chunks) refreshes the timer; natural completion (done/error) cancels it. Normal delayed usage-only completion is preserved. - Boundary-terminal: pushes a terminal { type: 'error', reason: 'aborted' } event downstream so consumers do not hang. TDD: stream-termination.test.ts covers (a) stalled stream after first progress is aborted within the idle window with a downstream 'aborted' terminal event, and (b) normal delayed completion within the idle window is not aborted and the done event is forwarded unchanged. * fix(opencode-go): align stalled-stream idle default with runtime (120s) Match the runtime's shared `DEFAULT_LLM_IDLE_TIMEOUT_MS` (120s) so non-cron interactive opencode-go runs see no behavior change versus the existing watchdog. 
Cron runs — for which the runtime disables its idle watchdog entirely (`resolveLlmIdleTimeoutMs` returns 0 when trigger is cron and no explicit timeout is set) — still get provider-owned termination well before the ~622s stuck-session recovery. Refs #93610 * fix(opencode-go): satisfy CI lint and test type checks - Remove unnecessary `?? {}` fallback in spread (oxlint no-useless-fallback-in-spread). - Drop non-narrowing `!` on the wrapper return type; use `await Promise.resolve(...)` to collapse the `StreamLike | Promise<StreamLike>` union before `for await`. Refs #93610 * fix(opencode-go): arm stalled-stream idle timer only after first event The wrapper armed the idle timer before the first upstream event, which would mis-abort slow time-to-first-byte requests — including the opencode-go cron runs that the runtime deliberately leaves uncapped via resolveLlmIdleTimeoutMs. Arm only after the first forwarded event, and add regression coverage for the slow-first-event path. * fix(opencode-go): cover stalled stream first event * fix(opencode-go): respect explicit stream timeout * fix(opencode-go): preserve first-event timer after synthetic start * fix(opencode-go): satisfy stream termination test lint * fix(opencode-go): distinguish synthetic stream preambles * fix(opencode-go): route stalled streams through failover
372 lines
12 KiB
TypeScript
372 lines
12 KiB
TypeScript
// Opencode Go stream termination wrapper aborts stalled OpenAI-compatible
|
|
// SSE streams at the provider-owned raw boundary, before the shared runtime
|
|
// stuck-session recovery kicks in.
|
|
import type { AssistantMessage, AssistantMessageEvent } from "openclaw/plugin-sdk/llm";
|
|
import { createAssistantMessageEventStream } from "openclaw/plugin-sdk/llm";
|
|
import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry";
|
|
|
|
// Stream function type taken from `ProviderWrapStreamFnContext["streamFn"]`
// with `null`/`undefined` excluded, so it can be called without a guard.
type ProviderStreamFn = NonNullable<ProviderWrapStreamFnContext["streamFn"]>;
|
|
|
|
/**
 * Configuration accepted by `createOpencodeGoStalledStreamWrapper`.
 *
 * Both timeouts are defaults: a finite, positive `model.requestTimeoutMs`
 * on an individual call takes precedence over either of them.
 */
export interface OpencodeGoStalledStreamWrapperOptions {
  /**
   * Provider id the wrapper is scoped to. A call whose `model.provider` is
   * anything else is handed to the underlying stream function unchanged.
   */
  provider: string;
  /**
   * Longest tolerated gap, in milliseconds, after a provider progress event
   * before the stream is considered stalled and aborted. Must be > 0.
   */
  idleTimeoutMs: number;
  /**
   * Longest tolerated wait, in milliseconds, for stream creation plus the
   * first provider progress event. Must be > 0 when given; when omitted the
   * wrapper uses `idleTimeoutMs` for this window as well.
   */
  firstEventTimeoutMs?: number;
}
|
|
|
|
/**
 * Production default for the idle window, in milliseconds.
 *
 * Kept equal to the runtime's shared `DEFAULT_LLM_IDLE_TIMEOUT_MS` (120s):
 * - non-cron interactive runs behave exactly as they do under the existing
 *   watchdog;
 * - cron runs, where the runtime turns its idle watchdog off altogether
 *   (`resolveLlmIdleTimeoutMs` returns 0 when `trigger === "cron"` and no
 *   explicit timeout is set), finally get a provider-owned termination long
 *   before the ~622s stuck-session recovery would fire.
 */
export const OPENCODE_GO_STREAM_IDLE_TIMEOUT_MS_DEFAULT = 120_000;

/**
 * Production default (300s) for the first-event window, in milliseconds.
 *
 * NOTE(review): not referenced inside this file; presumably passed as
 * `firstEventTimeoutMs` by the provider registration — confirm at the call site.
 */
export const OPENCODE_GO_STREAM_FIRST_EVENT_TIMEOUT_MS_DEFAULT = 300_000;
|
|
|
|
/**
 * Reports whether `model` is a non-null object whose `provider` property is
 * strictly equal to `providerId`.
 *
 * @param model - Value received as the stream function's model argument.
 * @param providerId - Provider id the wrapper is scoped to.
 * @returns `true` only for a matching object; `false` for primitives, `null`,
 *   functions, and objects with a different or missing `provider`.
 */
function isOpencodeGoModel(model: unknown, providerId: string): boolean {
  if (typeof model !== "object" || model === null) {
    return false;
  }
  const candidate = model as { provider?: unknown };
  return candidate.provider === providerId;
}
|
|
|
|
/**
 * Accepts `value` as a timeout only when it is a finite number greater than
 * zero.
 *
 * @param value - Candidate timeout of unknown provenance.
 * @returns The same number when usable, otherwise `undefined`.
 */
function validTimeoutMs(value: unknown): number | undefined {
  if (typeof value !== "number") {
    return undefined;
  }
  const usable = Number.isFinite(value) && value > 0;
  return usable ? value : undefined;
}
|
|
|
|
/**
 * Picks the timeout for one call: the model's own `requestTimeoutMs` when
 * `validTimeoutMs` accepts it, otherwise `fallbackMs`.
 *
 * @param model - Model argument of the call; may be any value, including
 *   `null`/`undefined`.
 * @param fallbackMs - Timeout to use when the model carries no usable override.
 * @returns The effective timeout in milliseconds.
 */
function resolveTimeoutMs(model: unknown, fallbackMs: number): number {
  const requested = (model as { requestTimeoutMs?: unknown } | null | undefined)?.requestTimeoutMs;
  const override = validTimeoutMs(requested);
  return override === undefined ? fallbackMs : override;
}
|
|
|
|
/**
 * Tells whether `event` counts as provider progress, i.e. a text, thinking,
 * or tool-call delta. Every other event type yields `false`.
 *
 * @param event - Event read from the underlying stream.
 * @returns `true` for `text_delta`, `thinking_delta`, and `toolcall_delta`.
 */
function isProviderProgressEvent(event: AssistantMessageEvent): boolean {
  switch (event.type) {
    case "text_delta":
    case "thinking_delta":
    case "toolcall_delta":
      return true;
    default:
      return false;
  }
}
|
|
|
|
/**
 * Merges abort signals into a single signal that aborts as soon as any of the
 * inputs aborts, carrying that input's abort reason.
 *
 * - No signal present: returns a fresh signal that never aborts.
 * - Exactly one present: returns that signal itself.
 * - Several present: uses `AbortSignal.any` when the runtime provides it,
 *   otherwise a local `AbortController` driven by `abort` listeners.
 *
 * @param signals - Signals to merge; `undefined` entries are ignored.
 * @returns The merged `signal` and a `cleanup` function. `cleanup` detaches
 *   the listeners installed by the fallback path; it is a no-op in every
 *   other case and is safe to call more than once.
 */
function combineAbortSignals(signals: (AbortSignal | undefined)[]): {
  signal: AbortSignal;
  cleanup(): void;
} {
  const noop = () => undefined;
  const present = signals.filter((signal): signal is AbortSignal => Boolean(signal));
  if (present.length === 0) {
    return { signal: new AbortController().signal, cleanup: noop };
  }
  if (present.length === 1) {
    return { signal: present[0], cleanup: noop };
  }
  // `AbortSignal.any` is absent from older runtimes and lib typings, hence the
  // structural probe. It is invoked as a method of `AbortSignal` (not through
  // a detached reference) so implementations that rely on `this` keep working.
  const abortSignalStatics = AbortSignal as unknown as {
    any?: (signals: AbortSignal[]) => AbortSignal;
  };
  if (typeof abortSignalStatics.any === "function") {
    return { signal: abortSignalStatics.any(present), cleanup: noop };
  }
  const controller = new AbortController();
  const alreadyAborted = present.find((signal) => signal.aborted);
  if (alreadyAborted) {
    controller.abort((alreadyAborted as { reason?: unknown }).reason);
    return { signal: controller.signal, cleanup: noop };
  }
  const unsubscribe: Array<() => void> = [];
  const cleanup = () => {
    for (const remove of unsubscribe) {
      remove();
    }
    unsubscribe.length = 0;
  };
  for (const signal of present) {
    const onAbort = () => {
      controller.abort((signal as { reason?: unknown }).reason);
      // The merged signal is final once aborted, so drop the listeners on the
      // remaining sources instead of waiting for the caller's cleanup().
      cleanup();
    };
    signal.addEventListener("abort", onAbort, { once: true });
    unsubscribe.push(() => signal.removeEventListener("abort", onAbort));
  }
  return { signal: controller.signal, cleanup };
}
|
|
|
|
// Error text attached to the terminal event emitted when a timer expires.
const STALLED_STREAM_ERROR_MESSAGE =
  "opencode-go stream timed out after provider-owned SSE boundary stalled";

/**
 * Builds the terminal `error` event reported when the stream is aborted as
 * stalled.
 *
 * @param partial - Most recent assistant message seen on the stream, if any.
 *   When present its fields are kept and only `stopReason` and `errorMessage`
 *   are overridden; otherwise a minimal assistant message is synthesized.
 * @returns An event with `type: "error"` and `reason: "error"`.
 */
function buildStalledErrorEvent(partial: AssistantMessage | undefined): AssistantMessageEvent {
  const error: AssistantMessage = partial
    ? { ...partial, stopReason: "error", errorMessage: STALLED_STREAM_ERROR_MESSAGE }
    : synthesizeMinimalAssistantMessage(STALLED_STREAM_ERROR_MESSAGE, "error");
  return { type: "error", reason: "error", error };
}
|
|
|
|
/**
 * Builds the terminal `error` event reported when the underlying stream
 * finishes without ever emitting `done` or `error`.
 *
 * @param partial - Most recent assistant message seen on the stream, if any.
 *   When present its fields are kept and only `stopReason` and `errorMessage`
 *   are overridden; otherwise a minimal assistant message is synthesized.
 * @returns An event with `type: "error"` and `reason: "error"`.
 */
function buildUnterminatedErrorEvent(partial: AssistantMessage | undefined): AssistantMessageEvent {
  if (!partial) {
    const synthesized = synthesizeMinimalAssistantMessage(
      "opencode-go stream ended without a terminal event",
      "error",
    );
    return { type: "error", reason: "error", error: synthesized };
  }
  const annotated: AssistantMessage = {
    ...partial,
    stopReason: "error",
    errorMessage: "opencode-go stream ended without a terminal event",
  };
  return { type: "error", reason: "error", error: annotated };
}
|
|
|
|
/**
 * Builds the terminal `error` event reported when awaiting or iterating the
 * underlying stream throws.
 *
 * The error text is derived from the thrown value in this order:
 * 1. `Error` instances: their `message`.
 * 2. Other objects exposing a string `message` property: that string.
 * 3. Other objects: their JSON rendering, when serialization succeeds.
 * 4. Everything else: `String(error)`.
 * Steps 2 and 3 keep thrown plain objects from collapsing to
 * "[object Object]".
 *
 * @param partial - Most recent assistant message seen on the stream, if any.
 *   When present its fields are kept and only `stopReason` and `errorMessage`
 *   are overridden; otherwise a minimal assistant message is synthesized.
 * @param error - The value that was thrown.
 * @returns An event with `type: "error"` and `reason: "error"`.
 */
function buildCaughtErrorEvent(
  partial: AssistantMessage | undefined,
  error: unknown,
): AssistantMessageEvent {
  let message: string;
  if (error instanceof Error) {
    message = error.message;
  } else if (typeof error === "object" && error !== null) {
    const carried = (error as { message?: unknown }).message;
    if (typeof carried === "string") {
      message = carried;
    } else {
      let serialized: string | undefined;
      try {
        serialized = JSON.stringify(error);
      } catch {
        // Circular structures, BigInt members, etc.: use String() below.
        serialized = undefined;
      }
      message = typeof serialized === "string" ? serialized : String(error);
    }
  } else {
    message = String(error);
  }
  if (partial) {
    return {
      type: "error",
      reason: "error",
      error: {
        ...partial,
        stopReason: "error",
        errorMessage: message,
      },
    };
  }
  return {
    type: "error",
    reason: "error",
    error: synthesizeMinimalAssistantMessage(message, "error"),
  };
}
|
|
|
|
/**
 * Creates a placeholder assistant message for cases where no partial message
 * was ever observed on the stream.
 *
 * The message has empty content, an empty model name, all usage and cost
 * counters at zero, the fixed `api` "openai-completions" and `provider`
 * "opencode-go", and a timestamp taken from `Date.now()` at call time.
 *
 * @param errorMessage - Text stored in the message's `errorMessage` field.
 * @param stopReason - Value stored in the message's `stopReason` field.
 * @returns A freshly allocated assistant message.
 */
function synthesizeMinimalAssistantMessage(
  errorMessage: string,
  stopReason: AssistantMessage["stopReason"],
): AssistantMessage {
  const zeroCost = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 };
  const zeroUsage = {
    input: 0,
    output: 0,
    cacheRead: 0,
    cacheWrite: 0,
    totalTokens: 0,
    cost: zeroCost,
  };
  return {
    role: "assistant",
    content: [],
    api: "openai-completions",
    provider: "opencode-go",
    model: "",
    usage: zeroUsage,
    stopReason,
    errorMessage,
    timestamp: Date.now(),
  };
}
|
|
|
|
// Largest delay Node timers honour; anything above it (or NaN) is silently
// replaced by a 1 ms delay, which would abort a healthy stream immediately.
const MAX_TIMER_DELAY_MS = 2_147_483_647;

/**
 * Wraps an opencode-go provider stream function so that an SSE stream which
 * never delivers provider progress, or stops delivering it, is aborted at the
 * provider-owned raw boundary through an injected AbortSignal instead of
 * waiting for the much later shared runtime stuck-session recovery.
 *
 * Behavior:
 * - Provider-scoped: a call whose `model.provider` differs from
 *   `options.provider` is passed to `underlying` with the caller's options
 *   untouched.
 * - First-event window: a timer of `firstEventTimeoutMs` (falling back to
 *   `idleTimeoutMs`) is armed before `underlying` is invoked and keeps running
 *   until the first provider progress event (`text_delta`, `thinking_delta`,
 *   `toolcall_delta`). Other non-terminal events are forwarded but do not
 *   re-arm any timer.
 * - Idle window: every provider progress event re-arms the timer with
 *   `idleTimeoutMs`.
 * - Per-call override: a finite, positive `model.requestTimeoutMs` replaces
 *   both windows for that call.
 * - On expiry: the wrapper aborts the controller whose signal (combined with
 *   the caller's `signal`, if any) was passed to `underlying`, asks the base
 *   iterator to return, pushes a terminal `error` event built from the last
 *   seen partial message, and ends the output stream.
 * - Terminal-safe: when the underlying stream emits `done` or `error`, the
 *   wrapper forwards the event, clears the timer, and ends the stream. A base
 *   stream that finishes without either, or that throws, is reported as a
 *   terminal `error` event as well.
 *
 * @param underlying - Stream function to wrap.
 * @param options - Provider id and default timeout windows.
 * @returns A stream function with the same signature as `underlying`.
 * @throws Error when `idleTimeoutMs` is not a number greater than zero
 *   (`NaN` included), or when `firstEventTimeoutMs` is provided and is not
 *   greater than zero. A synchronous throw from `underlying` is re-thrown
 *   after the timer and signal listeners are released.
 */
export function createOpencodeGoStalledStreamWrapper(
  underlying: ProviderStreamFn,
  options: OpencodeGoStalledStreamWrapperOptions,
): ProviderStreamFn {
  // `!(x > 0)` rather than `x <= 0`: NaN and undefined fail every comparison
  // and must be rejected here instead of reaching setTimeout.
  if (!options || !(options.idleTimeoutMs > 0)) {
    throw new Error("createOpencodeGoStalledStreamWrapper requires idleTimeoutMs > 0");
  }
  if (options.firstEventTimeoutMs !== undefined && !(options.firstEventTimeoutMs > 0)) {
    throw new Error("createOpencodeGoStalledStreamWrapper requires firstEventTimeoutMs > 0");
  }
  const providerId = options.provider;
  const idleTimeoutMsDefault = options.idleTimeoutMs;
  const firstEventTimeoutMsDefault = options.firstEventTimeoutMs ?? options.idleTimeoutMs;

  return (model, context, callOptions) => {
    if (!isOpencodeGoModel(model, providerId)) {
      return underlying(model, context, callOptions);
    }

    const output = createAssistantMessageEventStream();
    const idleTimeoutMs = resolveTimeoutMs(model, idleTimeoutMsDefault);
    const firstEventTimeoutMs = resolveTimeoutMs(model, firstEventTimeoutMsDefault);
    const controller = new AbortController();
    // The underlying call must stop both on the caller's signal and on ours.
    const combinedSignal = combineAbortSignals([
      (callOptions as { signal?: AbortSignal } | undefined)?.signal,
      controller.signal,
    ]);
    const wrappedOptions = {
      ...callOptions,
      signal: combinedSignal.signal,
    };
    let idleTimer: ReturnType<typeof setTimeout> | undefined;
    let lastSeenPartial: AssistantMessage | undefined;
    // Set once a terminal event has been pushed; every later path is a no-op.
    let settled = false;
    let baseIterator: AsyncIterator<AssistantMessageEvent> | undefined;

    const clearIdleTimer = () => {
      if (idleTimer !== undefined) {
        clearTimeout(idleTimer);
        idleTimer = undefined;
      }
    };

    const cleanup = () => {
      clearIdleTimer();
      combinedSignal.cleanup();
    };

    // Best-effort close of the base iterator; its outcome is irrelevant once
    // the wrapper has already reported a terminal event.
    const releaseBaseStream = () => {
      if (baseIterator?.return) {
        void Promise.resolve(baseIterator.return()).catch(() => undefined);
      }
    };

    const finishWith = (event: AssistantMessageEvent) => {
      if (settled) {
        return;
      }
      settled = true;
      cleanup();
      output.push(event);
      output.end(
        event.type === "done" ? (event as { message: AssistantMessage }).message : undefined,
      );
    };

    const abortStalledStream = () => {
      if (settled) {
        return;
      }
      settled = true;
      clearIdleTimer();
      // Abort first so the underlying request is interrupted, then detach.
      controller.abort(new Error("opencode-go stream stalled"));
      combinedSignal.cleanup();
      releaseBaseStream();
      output.push(buildStalledErrorEvent(lastSeenPartial));
      output.end();
    };

    const armTimer = (timeoutMs: number) => {
      clearIdleTimer();
      idleTimer = setTimeout(abortStalledStream, Math.min(timeoutMs, MAX_TIMER_DELAY_MS));
      // Do not keep the process alive just for the watchdog.
      idleTimer.unref?.();
    };

    const armFirstEventTimer = () => armTimer(firstEventTimeoutMs);

    const armIdleTimer = () => armTimer(idleTimeoutMs);

    // Remembers the newest partial/final message so a synthesized terminal
    // error can carry the content streamed so far.
    const trackPartial = (event: AssistantMessageEvent) => {
      const partial =
        (event as { partial?: AssistantMessage; message?: AssistantMessage }).partial ??
        (event as { message?: AssistantMessage }).message;
      if (partial) {
        lastSeenPartial = partial;
      }
    };

    // Closes a base stream that only resolved after the wrapper had settled.
    const releaseResolvedStream = (baseStream: AsyncIterable<AssistantMessageEvent>) => {
      const iterator = baseStream[Symbol.asyncIterator]();
      if (iterator.return) {
        void Promise.resolve(iterator.return()).catch(() => undefined);
      }
    };

    // Armed before the call so that stream creation itself is covered.
    armFirstEventTimer();
    let baseStreamResult: ReturnType<ProviderStreamFn>;
    try {
      baseStreamResult = underlying(model, context, wrappedOptions);
    } catch (error) {
      cleanup();
      throw error;
    }

    void (async () => {
      try {
        const baseStream = await Promise.resolve(
          baseStreamResult as Awaited<ReturnType<ProviderStreamFn>>,
        );
        if (settled) {
          releaseResolvedStream(baseStream as AsyncIterable<AssistantMessageEvent>);
          return;
        }
        baseIterator = (baseStream as AsyncIterable<AssistantMessageEvent>)[Symbol.asyncIterator]();
        for (;;) {
          const result = await baseIterator.next();
          if (settled) {
            return;
          }
          if (result.done) {
            finishWith(buildUnterminatedErrorEvent(lastSeenPartial));
            return;
          }
          const event = result.value;
          if (event.type === "done" || event.type === "error") {
            trackPartial(event);
            finishWith(event);
            return;
          }
          trackPartial(event);
          output.push(event);
          // Only real provider progress refreshes the watchdog; synthetic or
          // structural events leave the current timer running.
          if (isProviderProgressEvent(event)) {
            armIdleTimer();
          }
        }
      } catch (error) {
        if (!settled) {
          finishWith(buildCaughtErrorEvent(lastSeenPartial, error));
        }
      } finally {
        cleanup();
      }
    })();

    return output;
  };
}
|