fix: keep TUI watchdog bound to active run (#67401) (thanks @xantorres)

This commit is contained in:
Ayaan Zaidi
2026-04-16 18:01:43 +05:30
parent d7f489f85e
commit 3525273930
2 changed files with 47 additions and 24 deletions

View File

@@ -721,9 +721,7 @@ describe("tui-event-handlers: streaming watchdog", () => {
expect(setActivityStatus).toHaveBeenLastCalledWith("idle");
expect(state.activeChatRunId).toBeNull();
expect(chatLog.addSystem).toHaveBeenCalledWith(
expect.stringContaining("streaming watchdog"),
);
expect(chatLog.addSystem).toHaveBeenCalledWith(expect.stringContaining("streaming watchdog"));
handlers.dispose?.();
});
@@ -751,8 +749,6 @@ describe("tui-event-handlers: streaming watchdog", () => {
vi.advanceTimersByTime(3_000);
// 6s total, but the latest delta was only 3s ago, so the watchdog must not
// have fired yet.
expect(setActivityStatus).not.toHaveBeenCalledWith("idle");
expect(state.activeChatRunId).toBe("run-flow");
@@ -784,8 +780,6 @@ describe("tui-event-handlers: streaming watchdog", () => {
vi.advanceTimersByTime(10_000);
// After a normal final, the watchdog timer must have been cancelled and
// cannot later re-overwrite the status or emit the warning banner.
const statusCalls = setActivityStatus.mock.calls.map((c) => c[0]);
expect(statusCalls.filter((s) => s === "idle").length).toBe(1);
expect(chatLog.addSystem).not.toHaveBeenCalledWith(
@@ -817,6 +811,47 @@ describe("tui-event-handlers: streaming watchdog", () => {
handlers.dispose?.();
});
it("does not let an older run steal the active run watchdog", () => {
const { state, chatLog, setActivityStatus, handlers } = createHarness({
streamingWatchdogMs: 5_000,
});
handlers.handleChatEvent({
runId: "run-old",
sessionKey: state.currentSessionKey,
state: "delta",
message: { content: "old" },
} satisfies ChatEvent);
vi.advanceTimersByTime(5_001);
expect(state.activeChatRunId).toBeNull();
handlers.handleChatEvent({
runId: "run-new",
sessionKey: state.currentSessionKey,
state: "delta",
message: { content: "new" },
} satisfies ChatEvent);
expect(state.activeChatRunId).toBe("run-new");
vi.advanceTimersByTime(3_000);
handlers.handleChatEvent({
runId: "run-old",
sessionKey: state.currentSessionKey,
state: "delta",
message: { content: "old again" },
} satisfies ChatEvent);
vi.advanceTimersByTime(2_001);
expect(setActivityStatus).toHaveBeenLastCalledWith("idle");
expect(state.activeChatRunId).toBeNull();
expect(chatLog.addSystem).toHaveBeenCalledTimes(2);
handlers.dispose?.();
});
it("dispose clears a pending watchdog without firing it", () => {
const { setActivityStatus, chatLog, handlers, state } = createHarness({
streamingWatchdogMs: 5_000,

View File

@@ -41,13 +41,7 @@ type EventHandlerContext = {
isLocalBtwRunId?: (runId: string) => boolean;
forgetLocalBtwRunId?: (runId: string) => void;
clearLocalBtwRunIds?: () => void;
/**
* Milliseconds of stream-delta silence that force the `streaming` activity
* indicator to reset to `idle`. Guards against lost/late "final" events from
* the gateway (WS flaps, gateway restarts, backends that emit `stopReason`
* without an explicit stream-end event) leaving the TUI stuck on
* `streaming · Xm Ys` forever. Defaults to 30s. Set to 0 to disable.
*/
/** Reset `streaming` after this much delta silence. Set to 0 to disable. */
streamingWatchdogMs?: number;
};
@@ -103,16 +97,10 @@ export function createEventHandlers(context: EventHandlerContext) {
streamingWatchdogRunId = runId;
streamingWatchdogTimer = setTimeout(() => {
streamingWatchdogTimer = null;
// Only act if the timer still matches the run that armed it and that run
// is still the TUI's active stream. A later `final`/`aborted`/`error`
// event already cleared the indicator by the normal path otherwise.
if (streamingWatchdogRunId !== runId) {
if (streamingWatchdogRunId !== runId || state.activeChatRunId !== runId) {
return;
}
streamingWatchdogRunId = null;
if (state.activeChatRunId !== runId) {
return;
}
state.activeChatRunId = null;
setActivityStatus("idle");
chatLog.addSystem(
@@ -122,8 +110,6 @@ export function createEventHandlers(context: EventHandlerContext) {
);
tui.requestRender();
}, streamingWatchdogMs);
// Keep the watchdog from blocking process exit when the TUI is shutting
// down. Node timers expose unref() on the returned Timeout object.
const maybeUnref = (streamingWatchdogTimer as { unref?: () => void }).unref;
if (typeof maybeUnref === "function") {
maybeUnref.call(streamingWatchdogTimer);
@@ -318,7 +304,9 @@ export function createEventHandlers(context: EventHandlerContext) {
}
chatLog.updateAssistant(displayText, evt.runId);
setActivityStatus("streaming");
armStreamingWatchdog(evt.runId);
if (state.activeChatRunId === evt.runId) {
armStreamingWatchdog(evt.runId);
}
}
if (evt.state === "final") {
const isLocalBtwRun = isLocalBtwRunId?.(evt.runId) ?? false;