diff --git a/src/tui/tui-event-handlers.test.ts b/src/tui/tui-event-handlers.test.ts index 625ea1b3add..a73de0be466 100644 --- a/src/tui/tui-event-handlers.test.ts +++ b/src/tui/tui-event-handlers.test.ts @@ -721,9 +721,7 @@ describe("tui-event-handlers: streaming watchdog", () => { expect(setActivityStatus).toHaveBeenLastCalledWith("idle"); expect(state.activeChatRunId).toBeNull(); - expect(chatLog.addSystem).toHaveBeenCalledWith( - expect.stringContaining("streaming watchdog"), - ); + expect(chatLog.addSystem).toHaveBeenCalledWith(expect.stringContaining("streaming watchdog")); handlers.dispose?.(); }); @@ -751,8 +749,6 @@ describe("tui-event-handlers: streaming watchdog", () => { vi.advanceTimersByTime(3_000); - // 6s total, but the latest delta was only 3s ago, so the watchdog must not - // have fired yet. expect(setActivityStatus).not.toHaveBeenCalledWith("idle"); expect(state.activeChatRunId).toBe("run-flow"); @@ -784,8 +780,6 @@ describe("tui-event-handlers: streaming watchdog", () => { vi.advanceTimersByTime(10_000); - // After a normal final, the watchdog timer must have been cancelled and - // cannot later re-overwrite the status or emit the warning banner. const statusCalls = setActivityStatus.mock.calls.map((c) => c[0]); expect(statusCalls.filter((s) => s === "idle").length).toBe(1); expect(chatLog.addSystem).not.toHaveBeenCalledWith( @@ -817,6 +811,47 @@ describe("tui-event-handlers: streaming watchdog", () => { handlers.dispose?.(); }); + it("does not let an older run steal the active run watchdog", () => { + const { state, chatLog, setActivityStatus, handlers } = createHarness({ + streamingWatchdogMs: 5_000, + }); + + handlers.handleChatEvent({ + runId: "run-old", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "old" }, + } satisfies ChatEvent); + + vi.advanceTimersByTime(5_001); + expect(state.activeChatRunId).toBeNull(); + + handlers.handleChatEvent({ + runId: "run-new", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "new" }, + } satisfies ChatEvent); + expect(state.activeChatRunId).toBe("run-new"); + + vi.advanceTimersByTime(3_000); + + handlers.handleChatEvent({ + runId: "run-old", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "old again" }, + } satisfies ChatEvent); + + vi.advanceTimersByTime(2_001); + + expect(setActivityStatus).toHaveBeenLastCalledWith("idle"); + expect(state.activeChatRunId).toBeNull(); + expect(chatLog.addSystem).toHaveBeenCalledTimes(2); + + handlers.dispose?.(); + }); + it("dispose clears a pending watchdog without firing it", () => { const { setActivityStatus, chatLog, handlers, state } = createHarness({ streamingWatchdogMs: 5_000, diff --git a/src/tui/tui-event-handlers.ts b/src/tui/tui-event-handlers.ts index 1379a1a6506..b503801d8dc 100644 --- a/src/tui/tui-event-handlers.ts +++ b/src/tui/tui-event-handlers.ts @@ -41,13 +41,7 @@ type EventHandlerContext = { isLocalBtwRunId?: (runId: string) => boolean; forgetLocalBtwRunId?: (runId: string) => void; clearLocalBtwRunIds?: () => void; - /** - * Milliseconds of stream-delta silence that force the `streaming` activity - * indicator to reset to `idle`. Guards against lost/late "final" events from - * the gateway (WS flaps, gateway restarts, backends that emit `stopReason` - * without an explicit stream-end event) leaving the TUI stuck on - * `streaming ยท Xm Ys` forever. Defaults to 30s. Set to 0 to disable. - */ + /** Reset `streaming` after this much delta silence. Set to 0 to disable. */ streamingWatchdogMs?: number; }; @@ -103,16 +97,10 @@ export function createEventHandlers(context: EventHandlerContext) { streamingWatchdogRunId = runId; streamingWatchdogTimer = setTimeout(() => { streamingWatchdogTimer = null; - // Only act if the timer still matches the run that armed it and that run - // is still the TUI's active stream. A later `final`/`aborted`/`error` - // event already cleared the indicator by the normal path otherwise. - if (streamingWatchdogRunId !== runId) { + if (streamingWatchdogRunId !== runId || state.activeChatRunId !== runId) { return; } streamingWatchdogRunId = null; - if (state.activeChatRunId !== runId) { - return; - } state.activeChatRunId = null; setActivityStatus("idle"); chatLog.addSystem( @@ -122,8 +110,6 @@ export function createEventHandlers(context: EventHandlerContext) { ); tui.requestRender(); }, streamingWatchdogMs); - // Keep the watchdog from blocking process exit when the TUI is shutting - // down. Node timers expose unref() on the returned Timeout object. const maybeUnref = (streamingWatchdogTimer as { unref?: () => void }).unref; if (typeof maybeUnref === "function") { maybeUnref.call(streamingWatchdogTimer); @@ -318,7 +304,9 @@ export function createEventHandlers(context: EventHandlerContext) { } chatLog.updateAssistant(displayText, evt.runId); setActivityStatus("streaming"); - armStreamingWatchdog(evt.runId); + if (state.activeChatRunId === evt.runId) { + armStreamingWatchdog(evt.runId); + } } if (evt.state === "final") { const isLocalBtwRun = isLocalBtwRunId?.(evt.runId) ?? false;