mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-23 04:38:13 +00:00
* fix(opencode-go): abort stalled SSE streams at provider-owned raw boundary opencode-go routes through the shared OpenAI-compatible completions provider, where a stalled SSE socket (provider emits tokens then never closes the stream) hangs the gateway until stuckSessionAbortMs (~622s) and surfaces as 'LLM request failed' / 'Request was aborted'. Issue #93610 reports ~90% of opencode-go cron jobs failing intermittently this way. Add a provider-owned stream wrapper at the opencode-go raw SSE boundary that injects an AbortController into the underlying OpenAI SDK request and aborts it after a configurable idle window (default 30s, far below 622s) elapses without any forward-progress event. The wrapper is: - Provider-scoped: only applies when model.provider === 'opencode-go'; the shared openai-completions.ts path is untouched. - Abortable: calls controller.abort() on the injected AbortSignal, which propagates through OpenAI SDK requestOptions.signal and genuinely interrupts the underlying fetch/stream (not just iterator return()). - Idle-based: every event (text/tool/thinking delta, including delayed usage-only chunks) refreshes the timer; natural completion (done/error) cancels it. Normal delayed usage-only completion is preserved. - Boundary-terminal: pushes a terminal { type: 'error', reason: 'aborted' } event downstream so consumers do not hang. TDD: stream-termination.test.ts covers (a) stalled stream after first progress is aborted within the idle window with a downstream 'aborted' terminal event, and (b) normal delayed completion within the idle window is not aborted and the done event is forwarded unchanged. * fix(opencode-go): align stalled-stream idle default with runtime (120s) Match the runtime's shared `DEFAULT_LLM_IDLE_TIMEOUT_MS` (120s) so non-cron interactive opencode-go runs see no behavior change versus the existing watchdog. 
Cron runs — for which the runtime disables its idle watchdog entirely (`resolveLlmIdleTimeoutMs` returns 0 when trigger is cron and no explicit timeout is set) — still get provider-owned termination well before the ~622s stuck-session recovery. Refs #93610 * fix(opencode-go): satisfy CI lint and test type checks - Remove unnecessary `?? {}` fallback in spread (oxlint no-useless-fallback-in-spread). - Drop non-narrowing `!` on the wrapper return type; use `await Promise.resolve(...)` to collapse the `StreamLike | Promise<StreamLike>` union before `for await`. Refs #93610 * fix(opencode-go): arm stalled-stream idle timer only after first event The wrapper armed the idle timer before the first upstream event, which would mis-abort slow time-to-first-byte requests — including the opencode-go cron runs that the runtime deliberately leaves uncapped via resolveLlmIdleTimeoutMs. Arm only after the first forwarded event, and add regression coverage for the slow-first-event path. * fix(opencode-go): cover stalled stream first event * fix(opencode-go): respect explicit stream timeout * fix(opencode-go): preserve first-event timer after synthetic start * fix(opencode-go): satisfy stream termination test lint * fix(opencode-go): distinguish synthetic stream preambles * fix(opencode-go): route stalled streams through failover
717 lines
20 KiB
TypeScript
717 lines
20 KiB
TypeScript
// Opencode Go stream termination wrapper tests cover provider-owned raw SSE
// boundary behavior for stalled OpenAI-compatible streams.
|
|
import type {
|
|
AssistantMessageEvent,
|
|
AssistantMessageEventStreamContract,
|
|
} from "openclaw/plugin-sdk/llm";
|
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
|
import { createOpencodeGoStalledStreamWrapper } from "./stream-termination.js";
|
|
|
|
// Short alias for the SDK assistant-message event type; used for the events the
// fake base stream emits and the events the tests collect from the wrapper.
type AnyEvent = AssistantMessageEvent;
|
|
// Short alias for the SDK event-stream contract; the fake base stream handed to
// the wrapper as its upstream is typed as this.
type StreamLike = AssistantMessageEventStreamContract;
|
|
|
|
/**
 * Test-side handle used to drive the fake upstream stream by hand.
 */
interface FakeStreamController {
  /** Delivers one event to the fake stream's iterator (or buffers it until read). */
  emit(event: AnyEvent): void;

  /** Marks the fake stream finished and completes any read that is still waiting. */
  end(): void;
}
|
|
|
|
/**
 * Builds a hand-driven stand-in for the upstream provider stream.
 *
 * @returns
 *  - `stream`: an object typed as the stream contract whose async iterator yields
 *    whatever the controller emits; `push`/`end` are no-ops and `result()` rejects.
 *  - `controller`: lets a test emit events or finish the stream.
 *  - `getReturnCalls`: number of times the iterator's `return()` has been invoked.
 */
function createFakeBaseStream(): {
  stream: StreamLike;
  controller: FakeStreamController;
  getReturnCalls: () => number;
} {
  type Step = IteratorResult<AnyEvent>;

  // Events emitted while nobody was reading.
  const backlog: Step[] = [];
  // Resolvers of next() calls that are still waiting for an event.
  const parkedReaders: ((result: Step) => void)[] = [];
  let closed = false;
  let returnCount = 0;

  const finishedStep = (): Step => ({ value: undefined, done: true });

  // Marks the stream finished and completes every read that is still parked.
  const closeAndFlush = (): void => {
    closed = true;
    for (
      let reader = parkedReaders.shift();
      reader !== undefined;
      reader = parkedReaders.shift()
    ) {
      reader(finishedStep());
    }
  };

  const iterator: AsyncIterator<AnyEvent> = {
    next(): Promise<Step> {
      // Buffered events are always drained first, even after the stream finished.
      const buffered = backlog.shift();
      if (buffered !== undefined) {
        return Promise.resolve(buffered);
      }
      if (closed) {
        return Promise.resolve(finishedStep());
      }
      return new Promise<Step>((resolve) => {
        parkedReaders.push(resolve);
      });
    },
    return(): Promise<Step> {
      returnCount += 1;
      closeAndFlush();
      return Promise.resolve(finishedStep());
    },
  };

  const stream: StreamLike = {
    [Symbol.asyncIterator]() {
      // Every caller shares the same iterator instance.
      return iterator;
    },
    push() {
      // unused: the wrapper pushes its own events into a separate stream.
    },
    end() {
      // unused: the wrapper ends its own stream.
    },
    result() {
      return Promise.reject(new Error("fake base stream result not used"));
    },
  };

  const controller: FakeStreamController = {
    emit(event: AnyEvent) {
      const step: Step = { value: event, done: false };
      const reader = parkedReaders.shift();
      if (reader) {
        // A read is already waiting: hand the event over directly.
        reader(step);
        return;
      }
      backlog.push(step);
    },
    end() {
      closeAndFlush();
    },
  };

  return { stream, controller, getReturnCalls: () => returnCount };
}
|
|
|
|
/**
 * Shadows `AbortSignal.any` with `undefined` for the duration of a test.
 *
 * @returns the own-property descriptor `AbortSignal.any` had before the call
 *   (`undefined` when there was none), so the caller can restore it afterwards.
 */
function disableAbortSignalAny(): PropertyDescriptor | undefined {
  const previous = Object.getOwnPropertyDescriptor(AbortSignal, "any");
  // Stays configurable so the property can be redefined or deleted again later.
  const shadow: PropertyDescriptor = { configurable: true, value: undefined };
  Object.defineProperty(AbortSignal, "any", shadow);
  return previous;
}
|
|
|
|
/**
 * Undoes a shadowed `AbortSignal.any`.
 *
 * @param descriptor the descriptor to reinstate; when it is `undefined` the
 *   `any` property is deleted from `AbortSignal` instead.
 */
function restoreAbortSignalAny(descriptor: PropertyDescriptor | undefined): void {
  if (!descriptor) {
    // Nothing to reinstate: just drop the property.
    Reflect.deleteProperty(AbortSignal, "any");
    return;
  }
  Object.defineProperty(AbortSignal, "any", descriptor);
}
|
|
|
|
describe("createOpencodeGoStalledStreamWrapper", () => {
|
|
// Each test advances time by hand through vitest's fake clock.
beforeEach(function installFakeTimers() {
  vi.useFakeTimers();
});

// Switch back to real timers once a test has finished.
afterEach(function restoreRealTimers() {
  vi.useRealTimers();
});
|
|
|
|
it("aborts underlying stream when progress stalls after first delta (raw SSE boundary)", async () => {
  // Upstream fake: it delivers a start plus one text delta and then goes silent.
  const fake = createFakeBaseStream();
  const signalsSeen: AbortSignal[] = [];
  let sawAbort = false;

  const upstream = vi.fn((_model, _context, options) => {
    const signal = options?.signal;
    if (signal) {
      signalsSeen.push(signal);
      signal.addEventListener("abort", () => {
        sawAbort = true;
      });
    }
    return fake.stream;
  });

  const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
    provider: "opencode-go",
    idleTimeoutMs: 5_000,
  });

  const model = { provider: "opencode-go", id: "deepseek-v4-flash" } as any;
  const downstream = await Promise.resolve(wrapped(model, {} as any, {} as any));
  expect(downstream).toBeDefined();
  if (!downstream) {
    return;
  }

  // Collect everything the wrapper forwards while the test drives the clock.
  const seen: AnyEvent[] = [];
  const drain = async (): Promise<void> => {
    for await (const event of downstream) {
      seen.push(event);
    }
  };
  const drained = drain();

  // A start followed by one text delta: the provider side has produced tokens.
  const partial = {
    role: "assistant",
    content: [{ type: "text", text: "hi" }],
    stopReason: undefined,
  };
  fake.controller.emit({ type: "start", partial } as any);
  fake.controller.emit({
    type: "text_delta",
    contentIndex: 0,
    delta: "hi",
    partial,
  } as any);

  // Let more than idleTimeoutMs pass with no further upstream activity.
  await vi.advanceTimersByTimeAsync(6_000);

  // The signal forwarded to the upstream call must have been aborted.
  expect(signalsSeen).toHaveLength(1);
  expect(sawAbort).toBe(true);

  // The consumer must also have been handed a terminal error event.
  const isTerminalError = (event: AnyEvent): boolean =>
    event.type === "error" && (event as any).reason === "error";
  const terminal = seen.find(isTerminalError);
  expect(terminal).toBeDefined();
  expect((terminal as any)?.error).toMatchObject({
    stopReason: "error",
    errorMessage: "opencode-go stream timed out after provider-owned SSE boundary stalled",
  });

  // Cleanup: finish the fake upstream so the draining promise settles.
  fake.controller.end();
  await drained;
});
|
|
|
|
it("uses a longer first-event timeout than the inter-event idle timeout", async () => {
  // The fake upstream never emits here, so no event ever reaches the wrapper.
  const fake = createFakeBaseStream();
  let sawAbort = false;

  const upstream = vi.fn((_model, _context, options) => {
    const signal = options?.signal;
    if (signal) {
      signal.addEventListener("abort", () => {
        sawAbort = true;
      });
    }
    return fake.stream;
  });

  const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
    provider: "opencode-go",
    idleTimeoutMs: 5_000,
    firstEventTimeoutMs: 10_000,
  });

  const model = { provider: "opencode-go", id: "deepseek-v4-flash" } as any;
  const downstream = await Promise.resolve(wrapped(model, {} as any, {} as any));
  expect(downstream).toBeDefined();
  if (!downstream) {
    return;
  }

  // Drain in the background; the events themselves are irrelevant to this test.
  const drain = async (): Promise<void> => {
    for await (const ignored of downstream) {
      void ignored;
    }
  };
  const drained = drain();

  // 6s in: past the 5s idle timeout, but short of the 10s first-event timeout.
  await vi.advanceTimersByTimeAsync(6_000);
  expect(sawAbort).toBe(false);

  // 11s in: the first-event timeout has elapsed as well.
  await vi.advanceTimersByTimeAsync(5_000);
  expect(sawAbort).toBe(true);
  await drained;
});
|
|
|
|
it("keeps the first-event window after an openai-completions synthetic start", async () => {
  const fake = createFakeBaseStream();
  let sawAbort = false;

  const upstream = vi.fn((_model, _context, options) => {
    const signal = options?.signal;
    if (signal) {
      signal.addEventListener("abort", () => {
        sawAbort = true;
      });
    }
    return fake.stream;
  });

  const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
    provider: "opencode-go",
    idleTimeoutMs: 5_000,
    firstEventTimeoutMs: 10_000,
  });

  const model = { provider: "opencode-go", id: "deepseek-v4-flash" } as any;
  const downstream = await Promise.resolve(wrapped(model, {} as any, {} as any));
  expect(downstream).toBeDefined();
  if (!downstream) {
    return;
  }

  const seen: AnyEvent[] = [];
  const drain = async (): Promise<void> => {
    for await (const event of downstream) {
      seen.push(event);
    }
  };
  const drained = drain();

  // Only a start with empty content is emitted before the first wait.
  const partial = {
    role: "assistant",
    content: [],
    stopReason: undefined,
  };
  fake.controller.emit({ type: "start", partial } as any);

  // 6s exceeds the 5s idle timeout but not the 10s first-event timeout.
  await vi.advanceTimersByTimeAsync(6_000);
  expect(sawAbort).toBe(false);

  // A text delta and the final done event then arrive.
  const helloContent = [{ type: "text", text: "hello" }];
  fake.controller.emit({
    type: "text_delta",
    contentIndex: 0,
    delta: "hello",
    partial: {
      ...partial,
      content: helloContent,
    },
  } as any);
  fake.controller.emit({
    type: "done",
    reason: "stop",
    message: {
      ...partial,
      content: [{ type: "text", text: "hello" }],
      stopReason: "stop",
    },
  } as any);
  await drained;

  expect(sawAbort).toBe(false);
  const forwardedTypes = seen.map((event) => event.type);
  expect(forwardedTypes.includes("text_delta")).toBe(true);
  expect(forwardedTypes.includes("done")).toBe(true);
});
|
|
|
|
it("keeps the first-event window after synthetic block-start events until a provider delta", async () => {
  const fake = createFakeBaseStream();
  let sawAbort = false;

  const upstream = vi.fn((_model, _context, options) => {
    const signal = options?.signal;
    if (signal) {
      signal.addEventListener("abort", () => {
        sawAbort = true;
      });
    }
    return fake.stream;
  });

  const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
    provider: "opencode-go",
    idleTimeoutMs: 5_000,
    firstEventTimeoutMs: 10_000,
  });

  const model = { provider: "opencode-go", id: "deepseek-v4-flash" } as any;
  const downstream = await Promise.resolve(wrapped(model, {} as any, {} as any));
  expect(downstream).toBeDefined();
  if (!downstream) {
    return;
  }

  const seen: AnyEvent[] = [];
  const drain = async (): Promise<void> => {
    for await (const event of downstream) {
      seen.push(event);
    }
  };
  const drained = drain();

  // A start and a text_start are emitted, both carrying an empty text block.
  const partial = {
    role: "assistant",
    content: [{ type: "text", text: "" }],
    stopReason: undefined,
  };
  fake.controller.emit({ type: "start", partial } as any);
  fake.controller.emit({ type: "text_start", contentIndex: 0, partial } as any);

  // 6s exceeds the 5s idle timeout but not the 10s first-event timeout.
  await vi.advanceTimersByTimeAsync(6_000);
  expect(sawAbort).toBe(false);

  // The first text delta and the done event follow.
  const message = {
    ...partial,
    content: [{ type: "text", text: "hello" }],
    stopReason: "stop",
  };
  fake.controller.emit({
    type: "text_delta",
    contentIndex: 0,
    delta: "hello",
    partial: message,
  } as any);
  fake.controller.emit({ type: "done", reason: "stop", message } as any);
  await drained;

  expect(sawAbort).toBe(false);
  const forwardedTypes = seen.map((event) => event.type);
  expect(forwardedTypes.includes("text_delta")).toBe(true);
  expect(forwardedTypes.includes("done")).toBe(true);
});
|
|
|
|
it("honors explicit opencode-go provider request timeout above the wrapper idle default", async () => {
  const fake = createFakeBaseStream();
  let sawAbort = false;

  const upstream = vi.fn((_model, _context, options) => {
    const signal = options?.signal;
    if (signal) {
      signal.addEventListener("abort", () => {
        sawAbort = true;
      });
    }
    return fake.stream;
  });

  const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
    provider: "opencode-go",
    idleTimeoutMs: 5_000,
    firstEventTimeoutMs: 5_000,
  });

  // The model carries a 10s request timeout, above both 5s wrapper settings.
  const model = {
    provider: "opencode-go",
    id: "deepseek-v4-flash",
    requestTimeoutMs: 10_000,
  } as any;
  const downstream = await Promise.resolve(wrapped(model, {} as any, {} as any));
  expect(downstream).toBeDefined();
  if (!downstream) {
    return;
  }

  // Drain in the background; the events themselves are irrelevant to this test.
  const drain = async (): Promise<void> => {
    for await (const ignored of downstream) {
      void ignored;
    }
  };
  const drained = drain();

  const partial = {
    role: "assistant",
    content: [{ type: "text", text: "slow" }],
    stopReason: undefined,
  };
  fake.controller.emit({ type: "start", partial } as any);

  // 6s: beyond the wrapper's 5s settings, still inside the 10s request timeout.
  await vi.advanceTimersByTimeAsync(6_000);
  expect(sawAbort).toBe(false);

  // 11s: the explicit request timeout has now elapsed too.
  await vi.advanceTimersByTimeAsync(5_000);
  expect(sawAbort).toBe(true);
  await drained;
});
|
|
|
|
it("honors explicit opencode-go provider request timeout below wrapper defaults", async () => {
  // The fake upstream never emits in this test.
  const fake = createFakeBaseStream();
  let sawAbort = false;

  const upstream = vi.fn((_model, _context, options) => {
    const signal = options?.signal;
    if (signal) {
      signal.addEventListener("abort", () => {
        sawAbort = true;
      });
    }
    return fake.stream;
  });

  const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
    provider: "opencode-go",
    idleTimeoutMs: 5_000,
    firstEventTimeoutMs: 10_000,
  });

  // The model carries a 2s request timeout, below both wrapper settings.
  const model = {
    provider: "opencode-go",
    id: "deepseek-v4-flash",
    requestTimeoutMs: 2_000,
  } as any;
  const downstream = await Promise.resolve(wrapped(model, {} as any, {} as any));
  expect(downstream).toBeDefined();
  if (!downstream) {
    return;
  }

  // Drain in the background; the events themselves are irrelevant to this test.
  const drain = async (): Promise<void> => {
    for await (const ignored of downstream) {
      void ignored;
    }
  };
  const drained = drain();

  // 2.5s is past the 2s request timeout and well short of the wrapper settings.
  await vi.advanceTimersByTimeAsync(2_500);
  expect(sawAbort).toBe(true);
  await drained;
});
|
|
|
|
it("aborts and releases the underlying stream when no first event arrives", async () => {
  // The fake upstream never emits; its return() call counter is checked below.
  const fake = createFakeBaseStream();
  const signalsSeen: AbortSignal[] = [];
  let sawAbort = false;

  const upstream = vi.fn((_model, _context, options) => {
    const signal = options?.signal;
    if (signal) {
      signalsSeen.push(signal);
      signal.addEventListener("abort", () => {
        sawAbort = true;
      });
    }
    return fake.stream;
  });

  const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
    provider: "opencode-go",
    idleTimeoutMs: 5_000,
  });

  const model = { provider: "opencode-go", id: "deepseek-v4-flash" } as any;
  const downstream = await Promise.resolve(wrapped(model, {} as any, {} as any));
  expect(downstream).toBeDefined();
  if (!downstream) {
    return;
  }

  const seen: AnyEvent[] = [];
  const drain = async (): Promise<void> => {
    for await (const event of downstream) {
      seen.push(event);
    }
  };
  const drained = drain();

  await vi.advanceTimersByTimeAsync(6_000);

  // Abort went through the forwarded signal, and the upstream iterator was
  // released exactly once through return().
  expect(signalsSeen).toHaveLength(1);
  expect(sawAbort).toBe(true);
  expect(fake.getReturnCalls()).toBe(1);

  // The consumer received a terminal error event.
  const gotTerminalError = seen.some(
    (event) => event.type === "error" && (event as any).reason === "error",
  );
  expect(gotTerminalError).toBe(true);

  await drained;
});
|
|
|
|
it("aborts stream creation when the upstream stream promise never resolves", async () => {
  let sawAbort = false;

  // The upstream call returns a promise that never settles, so no stream exists.
  const upstream = vi.fn((_model, _context, options) => {
    const signal = options?.signal;
    if (signal) {
      signal.addEventListener("abort", () => {
        sawAbort = true;
      });
    }
    return new Promise<StreamLike>(() => {
      // keep pending
    });
  });

  const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
    provider: "opencode-go",
    idleTimeoutMs: 5_000,
  });

  const model = { provider: "opencode-go", id: "deepseek-v4-flash" } as any;
  const downstream = await Promise.resolve(wrapped(model, {} as any, {} as any));
  expect(downstream).toBeDefined();
  if (!downstream) {
    return;
  }

  const seen: AnyEvent[] = [];
  const drain = async (): Promise<void> => {
    for await (const event of downstream) {
      seen.push(event);
    }
  };
  const drained = drain();

  await vi.advanceTimersByTimeAsync(6_000);

  expect(sawAbort).toBe(true);
  // The consumer still gets a terminal error event.
  const gotTerminalError = seen.some(
    (event) => event.type === "error" && (event as any).reason === "error",
  );
  expect(gotTerminalError).toBe(true);
  await drained;
});
|
|
|
|
it("aborts through the fallback combined signal when no first event arrives", async () => {
  // AbortSignal.any is shadowed with undefined for the whole test.
  const savedAnyDescriptor = disableAbortSignalAny();
  const fake = createFakeBaseStream();
  let sawAbort = false;

  try {
    const upstream = vi.fn((_model, _context, options) => {
      const signal = options?.signal;
      if (signal) {
        signal.addEventListener("abort", () => {
          sawAbort = true;
        });
      }
      return fake.stream;
    });

    const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
      provider: "opencode-go",
      idleTimeoutMs: 5_000,
    });

    // A caller-supplied signal is passed in the options.
    const model = { provider: "opencode-go", id: "deepseek-v4-flash" } as any;
    const callerOptions = { signal: new AbortController().signal } as any;
    const downstream = await Promise.resolve(wrapped(model, {} as any, callerOptions));
    expect(downstream).toBeDefined();
    if (!downstream) {
      return;
    }

    // Drain in the background; the events themselves are irrelevant to this test.
    const drain = async (): Promise<void> => {
      for await (const ignored of downstream) {
        void ignored;
      }
    };
    const drained = drain();

    await vi.advanceTimersByTimeAsync(6_000);

    expect(sawAbort).toBe(true);
    await drained;
  } finally {
    // Always put AbortSignal.any back, even when an assertion above throws.
    restoreAbortSignalAny(savedAnyDescriptor);
  }
});
|
|
|
|
it("cleans up fallback AbortSignal listeners after natural completion", async () => {
  // AbortSignal.any is shadowed with undefined for the whole test.
  const savedAnyDescriptor = disableAbortSignalAny();
  const callerController = new AbortController();
  const callerSignal = callerController.signal;
  // Spy on listener registration and removal on the caller-supplied signal.
  const addSpy = vi.spyOn(callerSignal, "addEventListener");
  const removeSpy = vi.spyOn(callerSignal, "removeEventListener");
  const fake = createFakeBaseStream();

  try {
    const upstream = vi.fn(() => fake.stream);
    const wrapped = createOpencodeGoStalledStreamWrapper(upstream as any, {
      provider: "opencode-go",
      idleTimeoutMs: 5_000,
    });

    const model = { provider: "opencode-go", id: "deepseek-v4-flash" } as any;
    const callerOptions = { signal: callerController.signal } as any;
    const downstream = await Promise.resolve(wrapped(model, {} as any, callerOptions));
    expect(downstream).toBeDefined();
    if (!downstream) {
      return;
    }

    const seen: AnyEvent[] = [];
    const drain = async (): Promise<void> => {
      for await (const event of downstream) {
        seen.push(event);
      }
    };
    const drained = drain();

    // The stream completes normally: a start followed straight away by done.
    const partial = {
      role: "assistant",
      content: [{ type: "text", text: "done" }],
      stopReason: "stop",
    };
    fake.controller.emit({ type: "start", partial } as any);
    fake.controller.emit({ type: "done", reason: "stop", message: partial } as any);
    await drained;

    expect(seen.some((event) => event.type === "done")).toBe(true);
    // An abort listener was attached to the caller's signal and detached again.
    expect(addSpy).toHaveBeenCalledWith("abort", expect.any(Function), { once: true });
    expect(removeSpy).toHaveBeenCalledWith("abort", expect.any(Function));
  } finally {
    restoreAbortSignalAny(savedAnyDescriptor);
    addSpy.mockRestore();
    removeSpy.mockRestore();
  }
});
|
|
|
|
it("preserves normal delayed usage-only completion without aborting", async () => {
  // Arrange: a fake base stream that streams a normal completion, with a quiet
  // gap before the final event that stays well within the idle timeout.
  // The wrapper must not abort.
  const { stream: baseStream, controller } = createFakeBaseStream();
  let abortCalled = false;
  const capturedSignals: AbortSignal[] = [];

  const underlying = vi.fn((_model, _context, options) => {
    if (options?.signal) {
      capturedSignals.push(options.signal);
      options.signal.addEventListener("abort", () => {
        abortCalled = true;
      });
    }
    return baseStream;
  });

  const wrapper = createOpencodeGoStalledStreamWrapper(underlying as any, {
    provider: "opencode-go",
    idleTimeoutMs: 5_000,
  });

  const downstream = await Promise.resolve(
    wrapper({ provider: "opencode-go", id: "deepseek-v4-flash" } as any, {} as any, {} as any),
  );
  expect(downstream).toBeDefined();
  if (!downstream) {
    return;
  }

  // Drain wrapper events in the background.
  const received: AnyEvent[] = [];
  const consumer = (async () => {
    for await (const event of downstream) {
      received.push(event);
    }
  })();

  const partial = {
    role: "assistant",
    content: [{ type: "text", text: "hello" }],
    stopReason: "stop",
  };
  controller.emit({ type: "start", partial } as any);
  controller.emit({
    type: "text_delta",
    contentIndex: 0,
    delta: "hello",
    partial,
  } as any);

  // Simulate a delayed final chunk after a short (sub-timeout) quiet gap.
  await vi.advanceTimersByTimeAsync(2_000);

  // Final completion event arrives before idle timeout fires.
  controller.emit({
    type: "done",
    reason: "stop",
    message: partial,
  } as any);

  // Advance well past the idle timeout — wrapper should NOT have fired.
  await vi.advanceTimersByTimeAsync(10_000);

  // Check that the wrapper really forwarded a signal to the upstream call.
  // Without this, the "not aborted" assertion below would pass even if no
  // signal had ever been observed.
  expect(capturedSignals).toHaveLength(1);
  expect(abortCalled).toBe(false);

  // Downstream must contain the forwarded done event.
  const doneEvent = received.find((event) => event.type === "done");
  expect(doneEvent).toBeDefined();

  // Cleanup: end the base stream so the consumer promise resolves.
  controller.end();
  await consumer;
});
|
|
});
|