Files
openclaw/extensions/opencode-go/stream-termination.test.ts
zhang-guiping 769579bcf0 fix(opencode-go): streaming completes when provider ends responses (#93965)
* fix(opencode-go): abort stalled SSE streams at provider-owned raw boundary

opencode-go routes through the shared OpenAI-compatible completions provider,
where a stalled SSE socket (provider emits tokens then never closes the stream)
hangs the gateway until stuckSessionAbortMs (~622s) and surfaces as
'LLM request failed' / 'Request was aborted'. Issue #93610 reports ~90% of
opencode-go cron jobs failing intermittently this way.

Add a provider-owned stream wrapper at the opencode-go raw SSE boundary that
injects an AbortController into the underlying OpenAI SDK request and aborts
it after a configurable idle window (default 30s, far below 622s) elapses
without any forward-progress event. The wrapper is:

- Provider-scoped: only applies when model.provider === 'opencode-go'; the
  shared openai-completions.ts path is untouched.
- Abortable: calls controller.abort() on the injected AbortSignal, which
  propagates through OpenAI SDK requestOptions.signal and genuinely
  interrupts the underlying fetch/stream (not just iterator return()).
- Idle-based: every event (text/tool/thinking delta, including delayed
  usage-only chunks) refreshes the timer; natural completion (done/error)
  cancels it. Normal delayed usage-only completion is preserved.
- Boundary-terminal: pushes a terminal { type: 'error', reason: 'aborted' }
  event downstream so consumers do not hang.

TDD: stream-termination.test.ts covers (a) stalled stream after first
progress is aborted within the idle window with a downstream 'aborted'
terminal event, and (b) normal delayed completion within the idle window
is not aborted and the done event is forwarded unchanged.

* fix(opencode-go): align stalled-stream idle default with runtime (120s)

Match the runtime's shared `DEFAULT_LLM_IDLE_TIMEOUT_MS` (120s) so
non-cron interactive opencode-go runs see no behavior change versus the
existing watchdog. Cron runs — for which the runtime disables its idle
watchdog entirely (`resolveLlmIdleTimeoutMs` returns 0 when trigger is
cron and no explicit timeout is set) — still get provider-owned
termination well before the ~622s stuck-session recovery.

Refs #93610

* fix(opencode-go): satisfy CI lint and test type checks

- Remove unnecessary `?? {}` fallback in spread (oxlint
  no-useless-fallback-in-spread).
- Drop non-narrowing `!` on the wrapper return type; use
  `await Promise.resolve(...)` to collapse the
  `StreamLike | Promise<StreamLike>` union before `for await`.

Refs #93610

* fix(opencode-go): arm stalled-stream idle timer only after first event

The wrapper armed the idle timer before the first upstream event, which
would mis-abort slow time-to-first-byte requests — including the
opencode-go cron runs that the runtime deliberately leaves uncapped via
resolveLlmIdleTimeoutMs. Arm only after the first forwarded event, and
add regression coverage for the slow-first-event path.

* fix(opencode-go): cover stalled stream first event

* fix(opencode-go): respect explicit stream timeout

* fix(opencode-go): preserve first-event timer after synthetic start

* fix(opencode-go): satisfy stream termination test lint

* fix(opencode-go): distinguish synthetic stream preambles

* fix(opencode-go): route stalled streams through failover
2026-06-22 19:57:21 +00:00

717 lines
20 KiB
TypeScript

// Opencode Go stream termination wrapper tests cover provider-owned raw SSE
// boundary behavior for stalled OpenAI-compatible streams.
import type {
AssistantMessageEvent,
AssistantMessageEventStreamContract,
} from "openclaw/plugin-sdk/llm";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { createOpencodeGoStalledStreamWrapper } from "./stream-termination.js";
// Short local aliases for the plugin SDK event and stream-contract types used by every test below.
type AnyEvent = AssistantMessageEvent;
type StreamLike = AssistantMessageEventStreamContract;
/**
 * Test-side handle for driving the fake base stream by hand.
 * Implemented by `createFakeBaseStream`.
 */
interface FakeStreamController {
/** Hands one event to a reader that is already waiting, or queues it for the next read. */
emit(event: AnyEvent): void;
/** Marks the fake stream as finished and resolves every waiting reader with `done: true`. */
end(): void;
}
/**
 * Builds a manually driven async event source that stands in for the upstream
 * provider stream.
 *
 * @returns
 *  - `stream`: an object exposing the async iterator plus no-op `push`/`end`
 *    and a `result()` that always rejects (tests never read it).
 *  - `controller`: the test-side handle used to emit events or finish the stream.
 *  - `getReturnCalls`: how many times the iterator's `return()` has been invoked,
 *    so tests can check that a consumer released the stream.
 */
function createFakeBaseStream(): {
  stream: StreamLike;
  controller: FakeStreamController;
  getReturnCalls: () => number;
} {
  type Step = IteratorResult<AnyEvent>;

  // Events emitted while nobody is reading, delivered in FIFO order.
  const buffered: Step[] = [];
  // Resolvers of `next()` calls that arrived before any event was available.
  const pendingReaders: ((result: Step) => void)[] = [];
  let closed = false;
  let returnCount = 0;

  const doneStep = (): Step => ({ value: undefined, done: true });

  // Shared by `return()` and `controller.end()`: mark finished and release all waiting readers.
  const finish = (): void => {
    closed = true;
    for (let reader = pendingReaders.shift(); reader; reader = pendingReaders.shift()) {
      reader(doneStep());
    }
  };

  const iterator: AsyncIterator<AnyEvent> = {
    next(): Promise<Step> {
      // Buffered events win over the finished flag, so nothing already emitted is dropped.
      const ready = buffered.shift();
      if (ready) {
        return Promise.resolve(ready);
      }
      if (closed) {
        return Promise.resolve(doneStep());
      }
      return new Promise((resolve) => {
        pendingReaders.push(resolve);
      });
    },
    return(): Promise<Step> {
      returnCount += 1;
      finish();
      return Promise.resolve(doneStep());
    },
  };

  const stream: StreamLike = {
    [Symbol.asyncIterator]() {
      return iterator;
    },
    push() {
      // unused: the wrapper pushes its own events into a separate stream.
    },
    end() {
      // unused: the wrapper ends its own stream.
    },
    result() {
      return Promise.reject(new Error("fake base stream result not used"));
    },
  };

  const controller: FakeStreamController = {
    emit(event: AnyEvent) {
      const step: Step = { value: event, done: false };
      const reader = pendingReaders.shift();
      if (reader) {
        reader(step);
        return;
      }
      buffered.push(step);
    },
    end() {
      finish();
    },
  };

  return { stream, controller, getReturnCalls: () => returnCount };
}
/**
 * Masks the static `AbortSignal.any` by redefining it as `undefined`.
 *
 * @returns The property descriptor that was in place before masking, or
 *   `undefined` if `AbortSignal` had no own `any` property. Pass it to
 *   `restoreAbortSignalAny` to undo the change.
 */
function disableAbortSignalAny(): PropertyDescriptor | undefined {
  const previous = Reflect.getOwnPropertyDescriptor(AbortSignal, "any");
  // `configurable: true` keeps the masked property redefinable/deletable for the restore step.
  const masked: PropertyDescriptor = { configurable: true, value: undefined };
  Object.defineProperty(AbortSignal, "any", masked);
  return previous;
}
/**
 * Undoes `disableAbortSignalAny`.
 *
 * @param descriptor The descriptor captured before masking. When it is absent,
 *   the `any` property is deleted from `AbortSignal`; otherwise it is redefined
 *   from the descriptor.
 */
function restoreAbortSignalAny(descriptor: PropertyDescriptor | undefined): void {
  if (!descriptor) {
    Reflect.deleteProperty(AbortSignal, "any");
    return;
  }
  Object.defineProperty(AbortSignal, "any", descriptor);
}
// Suite for the opencode-go stalled-stream wrapper. All tests run on fake timers and
// drive a hand-controlled base stream, asserting when the wrapper aborts the signal
// it forwards to the underlying stream function and what it emits downstream.
describe("createOpencodeGoStalledStreamWrapper", () => {
  /** Model descriptor shared by every test; individual tests may add fields such as `requestTimeoutMs`. */
  const BASE_MODEL = { provider: "opencode-go", id: "deepseek-v4-flash" };

  /**
   * Creates a mocked underlying stream function that returns `result` and records
   * every `options.signal` it is handed, plus whether any of them fired "abort".
   */
  function trackAborts(result: StreamLike | Promise<StreamLike>) {
    const capturedSignals: AbortSignal[] = [];
    let aborted = false;
    const underlying = vi.fn(
      (_model: unknown, _context: unknown, options?: { signal?: AbortSignal }) => {
        if (options?.signal) {
          capturedSignals.push(options.signal);
          options.signal.addEventListener("abort", () => {
            aborted = true;
          });
        }
        return result;
      },
    );
    return { underlying, capturedSignals, wasAborted: () => aborted };
  }

  /**
   * Wraps `underlying`, invokes the wrapper once and returns the downstream stream.
   * `Promise.resolve` collapses a possible `StreamLike | Promise<StreamLike>` return
   * before iteration. Fails the test if the wrapper yields nothing.
   */
  async function openDownstream(
    underlying: unknown,
    timeouts: { idleTimeoutMs: number; firstEventTimeoutMs?: number },
    overrides: { model?: Record<string, unknown>; callOptions?: Record<string, unknown> } = {},
  ) {
    const wrapper = createOpencodeGoStalledStreamWrapper(underlying as any, {
      provider: "opencode-go",
      ...timeouts,
    });
    const downstream = await Promise.resolve(
      wrapper(
        { ...BASE_MODEL, ...overrides.model } as any,
        {} as any,
        { ...overrides.callOptions } as any,
      ),
    );
    expect(downstream).toBeDefined();
    if (!downstream) {
      // Unreachable after the expectation above; kept so the return type is non-nullable.
      throw new Error("opencode-go wrapper returned no downstream stream");
    }
    return downstream;
  }

  /** Starts consuming `downstream` in the background, collecting every event it yields. */
  function drain(downstream: AsyncIterable<AnyEvent>) {
    const received: AnyEvent[] = [];
    const consumer = (async () => {
      for await (const event of downstream) {
        received.push(event);
      }
    })();
    return { received, consumer };
  }

  /** True for the terminal event the tests expect after a stall: `type: "error"` with `reason: "error"`. */
  function isTerminalError(event: AnyEvent): boolean {
    return event.type === "error" && (event as any).reason === "error";
  }

  /**
   * Runs `body` with `AbortSignal.any` masked so the wrapper must take its fallback
   * signal-combining path. The mask is applied immediately before the `try` so the
   * global is always restored, even if `body` throws.
   */
  async function withoutAbortSignalAny(body: () => Promise<void>): Promise<void> {
    const descriptor = disableAbortSignalAny();
    try {
      await body();
    } finally {
      restoreAbortSignalAny(descriptor);
    }
  }

  beforeEach(() => {
    vi.useFakeTimers();
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it("aborts underlying stream when progress stalls after first delta (raw SSE boundary)", async () => {
    // Arrange: a fake base stream that emits a start + one text_delta, then stalls.
    const { stream: baseStream, controller } = createFakeBaseStream();
    const { underlying, capturedSignals, wasAborted } = trackAborts(baseStream);
    const downstream = await openDownstream(underlying, { idleTimeoutMs: 5_000 });
    const { received, consumer } = drain(downstream);

    // Emit a start + one text delta — that proves the provider side has produced tokens.
    const partial = {
      role: "assistant",
      content: [{ type: "text", text: "hi" }],
      stopReason: undefined,
    };
    controller.emit({ type: "start", partial } as any);
    controller.emit({
      type: "text_delta",
      contentIndex: 0,
      delta: "hi",
      partial,
    } as any);

    // Advance wall clock beyond idleTimeoutMs without any new progress.
    await vi.advanceTimersByTimeAsync(6_000);

    // Assert: the signal forwarded as options.signal was aborted.
    expect(capturedSignals).toHaveLength(1);
    expect(wasAborted()).toBe(true);

    // And a terminal error event reached the downstream consumer.
    const terminal = received.find(isTerminalError);
    expect(terminal).toBeDefined();
    expect((terminal as any)?.error).toMatchObject({
      stopReason: "error",
      errorMessage: "opencode-go stream timed out after provider-owned SSE boundary stalled",
    });

    // Cleanup: end base stream so consumer promise resolves.
    controller.end();
    await consumer;
  });

  it("uses a longer first-event timeout than the inter-event idle timeout", async () => {
    const { stream: baseStream } = createFakeBaseStream();
    const { underlying, wasAborted } = trackAborts(baseStream);
    const downstream = await openDownstream(underlying, {
      idleTimeoutMs: 5_000,
      firstEventTimeoutMs: 10_000,
    });
    const { consumer } = drain(downstream);

    // Past the 5s idle window but inside the 10s first-event window: no abort yet.
    await vi.advanceTimersByTimeAsync(6_000);
    expect(wasAborted()).toBe(false);

    // 11s total, past the first-event window.
    await vi.advanceTimersByTimeAsync(5_000);
    expect(wasAborted()).toBe(true);
    await consumer;
  });

  it("keeps the first-event window after an openai-completions synthetic start", async () => {
    const { stream: baseStream, controller } = createFakeBaseStream();
    const { underlying, wasAborted } = trackAborts(baseStream);
    const downstream = await openDownstream(underlying, {
      idleTimeoutMs: 5_000,
      firstEventTimeoutMs: 10_000,
    });
    const { received, consumer } = drain(downstream);

    const partial = {
      role: "assistant",
      content: [],
      stopReason: undefined,
    };
    // A bare start with empty content must not switch the wrapper to the shorter idle window.
    controller.emit({ type: "start", partial } as any);
    await vi.advanceTimersByTimeAsync(6_000);
    expect(wasAborted()).toBe(false);

    controller.emit({
      type: "text_delta",
      contentIndex: 0,
      delta: "hello",
      partial: {
        ...partial,
        content: [{ type: "text", text: "hello" }],
      },
    } as any);
    controller.emit({
      type: "done",
      reason: "stop",
      message: {
        ...partial,
        content: [{ type: "text", text: "hello" }],
        stopReason: "stop",
      },
    } as any);
    await consumer;

    expect(wasAborted()).toBe(false);
    expect(received.some((event) => event.type === "text_delta")).toBe(true);
    expect(received.some((event) => event.type === "done")).toBe(true);
  });

  it("keeps the first-event window after synthetic block-start events until a provider delta", async () => {
    const { stream: baseStream, controller } = createFakeBaseStream();
    const { underlying, wasAborted } = trackAborts(baseStream);
    const downstream = await openDownstream(underlying, {
      idleTimeoutMs: 5_000,
      firstEventTimeoutMs: 10_000,
    });
    const { received, consumer } = drain(downstream);

    const partial = {
      role: "assistant",
      content: [{ type: "text", text: "" }],
      stopReason: undefined,
    };
    // start + text_start carry no provider text yet; the first-event window must still apply.
    controller.emit({ type: "start", partial } as any);
    controller.emit({ type: "text_start", contentIndex: 0, partial } as any);
    await vi.advanceTimersByTimeAsync(6_000);
    expect(wasAborted()).toBe(false);

    const message = {
      ...partial,
      content: [{ type: "text", text: "hello" }],
      stopReason: "stop",
    };
    controller.emit({
      type: "text_delta",
      contentIndex: 0,
      delta: "hello",
      partial: message,
    } as any);
    controller.emit({ type: "done", reason: "stop", message } as any);
    await consumer;

    expect(wasAborted()).toBe(false);
    expect(received.some((event) => event.type === "text_delta")).toBe(true);
    expect(received.some((event) => event.type === "done")).toBe(true);
  });

  it("honors explicit opencode-go provider request timeout above the wrapper idle default", async () => {
    const { stream: baseStream, controller } = createFakeBaseStream();
    const { underlying, wasAborted } = trackAborts(baseStream);
    const downstream = await openDownstream(
      underlying,
      { idleTimeoutMs: 5_000, firstEventTimeoutMs: 5_000 },
      { model: { requestTimeoutMs: 10_000 } },
    );
    const { consumer } = drain(downstream);

    const partial = {
      role: "assistant",
      content: [{ type: "text", text: "slow" }],
      stopReason: undefined,
    };
    controller.emit({ type: "start", partial } as any);

    // Both wrapper windows are 5s, but the model's 10s request timeout takes precedence.
    await vi.advanceTimersByTimeAsync(6_000);
    expect(wasAborted()).toBe(false);
    await vi.advanceTimersByTimeAsync(5_000);
    expect(wasAborted()).toBe(true);
    await consumer;
  });

  it("honors explicit opencode-go provider request timeout below wrapper defaults", async () => {
    const { stream: baseStream } = createFakeBaseStream();
    const { underlying, wasAborted } = trackAborts(baseStream);
    const downstream = await openDownstream(
      underlying,
      { idleTimeoutMs: 5_000, firstEventTimeoutMs: 10_000 },
      { model: { requestTimeoutMs: 2_000 } },
    );
    const { consumer } = drain(downstream);

    // The model's 2s request timeout fires well before either wrapper window.
    await vi.advanceTimersByTimeAsync(2_500);
    expect(wasAborted()).toBe(true);
    await consumer;
  });

  it("aborts and releases the underlying stream when no first event arrives", async () => {
    const { stream: baseStream, getReturnCalls } = createFakeBaseStream();
    const { underlying, capturedSignals, wasAborted } = trackAborts(baseStream);
    const downstream = await openDownstream(underlying, { idleTimeoutMs: 5_000 });
    const { received, consumer } = drain(downstream);

    await vi.advanceTimersByTimeAsync(6_000);

    expect(capturedSignals).toHaveLength(1);
    expect(wasAborted()).toBe(true);
    // The wrapper must also call return() on the base iterator exactly once.
    expect(getReturnCalls()).toBe(1);
    expect(received.some(isTerminalError)).toBe(true);
    await consumer;
  });

  it("aborts stream creation when the upstream stream promise never resolves", async () => {
    const neverResolves = new Promise<StreamLike>(() => {
      // keep pending
    });
    const { underlying, wasAborted } = trackAborts(neverResolves);
    const downstream = await openDownstream(underlying, { idleTimeoutMs: 5_000 });
    const { received, consumer } = drain(downstream);

    await vi.advanceTimersByTimeAsync(6_000);

    expect(wasAborted()).toBe(true);
    expect(received.some(isTerminalError)).toBe(true);
    await consumer;
  });

  it("aborts through the fallback combined signal when no first event arrives", async () => {
    const { stream: baseStream } = createFakeBaseStream();
    const { underlying, wasAborted } = trackAborts(baseStream);

    await withoutAbortSignalAny(async () => {
      // A caller-supplied signal forces the wrapper to combine it with its own.
      const downstream = await openDownstream(
        underlying,
        { idleTimeoutMs: 5_000 },
        { callOptions: { signal: new AbortController().signal } },
      );
      const { consumer } = drain(downstream);

      await vi.advanceTimersByTimeAsync(6_000);
      expect(wasAborted()).toBe(true);
      await consumer;
    });
  });

  it("cleans up fallback AbortSignal listeners after natural completion", async () => {
    const sourceController = new AbortController();
    const addEventListener = vi.spyOn(sourceController.signal, "addEventListener");
    const removeEventListener = vi.spyOn(sourceController.signal, "removeEventListener");
    const { stream: baseStream, controller } = createFakeBaseStream();

    try {
      await withoutAbortSignalAny(async () => {
        const downstream = await openDownstream(
          vi.fn(() => baseStream),
          { idleTimeoutMs: 5_000 },
          { callOptions: { signal: sourceController.signal } },
        );
        const { received, consumer } = drain(downstream);

        const partial = {
          role: "assistant",
          content: [{ type: "text", text: "done" }],
          stopReason: "stop",
        };
        controller.emit({ type: "start", partial } as any);
        controller.emit({ type: "done", reason: "stop", message: partial } as any);
        await consumer;

        expect(received.some((event) => event.type === "done")).toBe(true);
        // The fallback path must subscribe to the caller's signal and unsubscribe once done.
        expect(addEventListener).toHaveBeenCalledWith("abort", expect.any(Function), {
          once: true,
        });
        expect(removeEventListener).toHaveBeenCalledWith("abort", expect.any(Function));
      });
    } finally {
      addEventListener.mockRestore();
      removeEventListener.mockRestore();
    }
  });

  it("preserves normal delayed usage-only completion without aborting", async () => {
    // Arrange: a fake base stream that streams a normal completion, including
    // a quiet gap before the final event — but well within the idle timeout.
    // The wrapper must not abort.
    const { stream: baseStream, controller } = createFakeBaseStream();
    const { underlying, wasAborted } = trackAborts(baseStream);
    const downstream = await openDownstream(underlying, { idleTimeoutMs: 5_000 });
    const { received, consumer } = drain(downstream);

    const partial = {
      role: "assistant",
      content: [{ type: "text", text: "hello" }],
      stopReason: "stop",
    };
    controller.emit({ type: "start", partial } as any);
    controller.emit({
      type: "text_delta",
      contentIndex: 0,
      delta: "hello",
      partial,
    } as any);

    // Simulate a delayed final chunk after a short (sub-timeout) quiet gap.
    await vi.advanceTimersByTimeAsync(2_000);

    // Final completion event arrives before idle timeout fires.
    controller.emit({
      type: "done",
      reason: "stop",
      message: partial,
    } as any);

    // Advance well past the idle timeout — wrapper should NOT have fired.
    await vi.advanceTimersByTimeAsync(10_000);
    expect(wasAborted()).toBe(false);

    // Downstream must contain the forwarded done event.
    const doneEvent = received.find((event) => event.type === "done");
    expect(doneEvent).toBeDefined();

    // Cleanup
    controller.end();
    await consumer;
  });
});