mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-28 12:53:34 +00:00
fix(opencode-go): re-arm idle timer on block-boundary events to prevent false stalled-stream abort (#97128)
* fix(opencode-go): re-arm idle timer on block-boundary events to prevent false stalled-stream abort When the opencode-go model finalizes a tool call and deliberates before the next one, the provider emits real block-boundary SSE events (text_end, thinking_end, toolcall_start, toolcall_end) that prove the socket is alive, but the watchdog's isProviderProgressEvent only returned true for token deltas (text_delta, thinking_delta, toolcall_delta). This caused the idle timer to fire and falsely abort a live stream, replacing a completed answer with a stalled error and dropping the provider's real done event. Fix: include block-boundary events in isProviderProgressEvent so the idle timer is re-armed on any forward-progress provider event. text_start and thinking_start are intentionally excluded because they are synthetic preamble events that should not shorten the first-event window. Closes #96518 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * test(opencode-go): satisfy lint in stream regression * test(opencode-go): satisfy lint in stream regression * test(opencode-go): satisfy lint in stream regression --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com> Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
@@ -713,4 +713,100 @@ describe("createOpencodeGoStalledStreamWrapper", () => {
|
||||
controller.end();
|
||||
await consumer;
|
||||
});
|
||||
|
||||
it("must NOT abort a live stream that keeps emitting block-boundary events between deltas", async () => {
|
||||
// Regression for https://github.com/openclaw/openclaw/issues/96518:
|
||||
// the idle timer must re-arm on block-boundary events (text_end,
|
||||
// thinking_end, toolcall_start, toolcall_end), not only on token
|
||||
// deltas. A stream that keeps producing boundary events between
|
||||
// deltas is demonstrably alive and must not be aborted.
|
||||
const { stream: baseStream, controller } = createFakeBaseStream();
|
||||
let abortCalled = false;
|
||||
const underlying = vi.fn((_model, _context, options) => {
|
||||
if (options?.signal) {
|
||||
options.signal.addEventListener("abort", () => {
|
||||
abortCalled = true;
|
||||
});
|
||||
}
|
||||
return baseStream;
|
||||
});
|
||||
|
||||
const idleTimeoutMs = 5_000;
|
||||
const wrapper = createOpencodeGoStalledStreamWrapper(underlying as any, {
|
||||
provider: "opencode-go",
|
||||
idleTimeoutMs,
|
||||
});
|
||||
|
||||
const downstream = await Promise.resolve(
|
||||
wrapper({ provider: "opencode-go", id: "glm-4.6" } as any, {} as any, {} as any),
|
||||
);
|
||||
expect(downstream).toBeDefined();
|
||||
if (!downstream) {
|
||||
return;
|
||||
}
|
||||
|
||||
const received: AnyEvent[] = [];
|
||||
const consumer = (async () => {
|
||||
for await (const event of downstream) {
|
||||
received.push(event);
|
||||
}
|
||||
})();
|
||||
|
||||
const partial = { role: "assistant", content: [{ type: "text", text: "x" }] };
|
||||
|
||||
// Provider starts producing a tool-call turn. The last *delta* arms the idle timer.
|
||||
controller.emit({ type: "start", partial } as any);
|
||||
controller.emit({
|
||||
type: "toolcall_delta",
|
||||
contentIndex: 0,
|
||||
delta: "{",
|
||||
partial,
|
||||
} as any);
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
|
||||
// The model finalizes the tool call and deliberates on the next one,
|
||||
// emitting real block-boundary events that prove the SSE socket is alive.
|
||||
// Each gap is < idleTimeoutMs, so a liveness-aware watchdog must stay armed.
|
||||
await vi.advanceTimersByTimeAsync(3_000);
|
||||
controller.emit({
|
||||
type: "toolcall_end",
|
||||
contentIndex: 0,
|
||||
toolCall: { name: "f", arguments: "{}" },
|
||||
partial,
|
||||
} as any);
|
||||
await vi.advanceTimersByTimeAsync(3_000);
|
||||
controller.emit({
|
||||
type: "toolcall_start",
|
||||
contentIndex: 1,
|
||||
partial,
|
||||
} as any);
|
||||
|
||||
// Advance to 5s after the last delta, but only 2s after the last
|
||||
// boundary event. The idle timer should have been re-armed by the
|
||||
// boundary events, so it must NOT fire yet.
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
|
||||
// The provider's completed answer arrives right after.
|
||||
controller.emit({
|
||||
type: "done",
|
||||
reason: "stop",
|
||||
message: {
|
||||
...partial,
|
||||
content: [{ type: "text", text: "final answer" }],
|
||||
stopReason: "stop",
|
||||
},
|
||||
} as any);
|
||||
controller.end();
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
await consumer;
|
||||
|
||||
const hasDone = received.some((e) => e.type === "done");
|
||||
const hasStalledError = received.some(
|
||||
(e) => e.type === "error" && (e as any).error?.stopReason === "error",
|
||||
);
|
||||
|
||||
expect(abortCalled).toBe(false);
|
||||
expect(hasDone).toBe(true);
|
||||
expect(hasStalledError).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -55,7 +55,11 @@ function isProviderProgressEvent(event: AssistantMessageEvent): boolean {
|
||||
return (
|
||||
event.type === "text_delta" ||
|
||||
event.type === "thinking_delta" ||
|
||||
event.type === "toolcall_delta"
|
||||
event.type === "toolcall_delta" ||
|
||||
event.type === "text_end" ||
|
||||
event.type === "thinking_end" ||
|
||||
event.type === "toolcall_start" ||
|
||||
event.type === "toolcall_end"
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user