From d4e52f454255d2a950731baaeac87c4c8c217221 Mon Sep 17 00:00:00 2001 From: Roger Deng <13251150+rogerdigital@users.noreply.github.com> Date: Wed, 29 Apr 2026 19:17:18 +0800 Subject: [PATCH] fix(tui): resync streaming watchdog after reconnect (#74224) * fix(tui): resync streaming watchdog after reconnect * fix(tui): keep reconnect history fallback armed * fix(tui): tighten reconnect watchdog recovery --- CHANGELOG.md | 1 + src/tui/tui-event-handlers.test.ts | 143 +++++++++++++++++++++++++++++ src/tui/tui-event-handlers.ts | 52 ++++++++++- src/tui/tui.ts | 12 ++- 4 files changed, 206 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e6dba75f6a3..2e6e7d36cdb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai - TUI/status: clear stale `streaming` footer state when a final event arrives after the active run was already cleared and no tracked runs remain, while preserving concurrent-run ownership and inactive local `/btw` terminal handling. Fixes #64825; carries forward #64842, #64843, #64847, and #64862. Thanks @briandevans and @Yanhu007. - Channels/Discord: fail startup closed when Discord cannot resolve the bot's own identity and keep mention gating active when only configured mention patterns can detect mentions, so the provider no longer continues with a missing bot id. Fixes #42219; carries forward #46856 and #49218. Thanks @education-01 and @BenediktSchackenberg. - Channels/Discord: split long CJK replies at punctuation and code-point-safe fallback boundaries so Discord chunking stays readable without corrupting astral characters. Fixes #38597; repairs #71384. Thanks @p3nchan. +- TUI: keep the streaming watchdog alive across active tool/lifecycle proof-of-life, pause it during disconnects, and reload history after stale reconnect runs so long-running chats stop flipping to false idle or hanging on stale streaming. Fixes #69081. Thanks @EenvoudJasper. - Browser/gateway: ignore Playwright dialog-close races from `Page.handleJavaScriptDialog` so browser automation no longer crashes the Gateway when a dialog disappears before Playwright accepts it. (#40067) Thanks @randyjtw. - Cron/Gateway: defer missed isolated agent-turn catch-up out of the channel startup window, so overdue cron work cannot starve Discord or Telegram while providers connect after a restart. Thanks @vincentkoc. - Heartbeat/cron: defer heartbeat turns while cron work is active or queued, add opt-in `heartbeat.skipWhenBusy` for subagent/nested lane pressure, and retry busy skips without advancing the schedule so local Ollama hosts do not run heartbeat and cron prompts concurrently. Fixes #50773. Thanks @scottgl9. diff --git a/src/tui/tui-event-handlers.test.ts b/src/tui/tui-event-handlers.test.ts index 3d51640f09e..5f4a66662dc 100644 --- a/src/tui/tui-event-handlers.test.ts +++ b/src/tui/tui-event-handlers.test.ts @@ -1004,6 +1004,149 @@ describe("tui-event-handlers: streaming watchdog", () => { handlers.dispose?.(); }); + it("rearms the watchdog on active-run tool events even when tool verbosity is off", () => { + const { state, setActivityStatus, handlers } = createHarness({ + streamingWatchdogMs: 5_000, + }); + state.sessionInfo.verboseLevel = "off"; + + handlers.handleChatEvent({ + runId: "run-tools", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "first" }, + } satisfies ChatEvent); + + vi.advanceTimersByTime(3_000); + + handlers.handleAgentEvent({ + runId: "run-tools", + stream: "tool", + data: { phase: "start", toolCallId: "tool-1", name: "read" }, + } satisfies AgentEvent); + + vi.advanceTimersByTime(3_000); + + expect(setActivityStatus).not.toHaveBeenCalledWith("idle"); + expect(state.activeChatRunId).toBe("run-tools"); + + vi.advanceTimersByTime(2_001); + + expect(setActivityStatus).toHaveBeenLastCalledWith("idle"); + expect(state.activeChatRunId).toBeNull(); + + handlers.dispose?.(); + }); + + it("pauses the watchdog while disconnected and rearms it on reconnect without clearing the active run", () => { + const { state, setActivityStatus, loadHistory, handlers } = createHarness({ + streamingWatchdogMs: 5_000, + }); + + handlers.handleChatEvent({ + runId: "run-reconnect", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "hello" }, + } satisfies ChatEvent); + + handlers.pauseStreamingWatchdog(); + vi.advanceTimersByTime(10_000); + + expect(state.activeChatRunId).toBe("run-reconnect"); + expect(setActivityStatus).not.toHaveBeenCalledWith("idle"); + + handlers.reconnectStreamingWatchdog(); + + expect(setActivityStatus).toHaveBeenCalledWith("streaming"); + expect(state.activeChatRunId).toBe("run-reconnect"); + + vi.advanceTimersByTime(5_001); + + expect(setActivityStatus).toHaveBeenLastCalledWith("idle"); + expect(state.activeChatRunId).toBeNull(); + expect(loadHistory).toHaveBeenCalledTimes(1); + + handlers.dispose?.(); + }); + + it("reloads history only once when reconnect recovery and deferred history refresh overlap", () => { + const { state, loadHistory, noteLocalRunId, handlers } = createHarness({ + streamingWatchdogMs: 5_000, + }); + + handlers.handleChatEvent({ + runId: "run-reconnect", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "hello" }, + } satisfies ChatEvent); + + noteLocalRunId("run-local-empty"); + handlers.handleChatEvent({ + runId: "run-local-empty", + sessionKey: state.currentSessionKey, + state: "final", + } satisfies ChatEvent); + + handlers.pauseStreamingWatchdog(); + handlers.reconnectStreamingWatchdog(); + vi.advanceTimersByTime(5_001); + + expect(loadHistory).toHaveBeenCalledTimes(1); + + handlers.dispose?.(); + }); + + it("resets to idle when reconnect drops an active run that is no longer tracked", () => { + const { state, setActivityStatus, handlers } = createHarness({ + streamingWatchdogMs: 5_000, + }); + state.activeChatRunId = "run-stale"; + state.activityStatus = "streaming"; + + handlers.reconnectStreamingWatchdog(); + + expect(state.activeChatRunId).toBeNull(); + expect(state.activityStatus).toBe("idle"); + expect(setActivityStatus).toHaveBeenLastCalledWith("idle"); + + handlers.dispose?.(); + }); + + it("keeps reconnect recovery armed when only terminal lifecycle arrives after reconnect", () => { + const { state, chatLog, setActivityStatus, loadHistory, handlers } = createHarness({ + streamingWatchdogMs: 5_000, + }); + + handlers.handleChatEvent({ + runId: "run-lifecycle-only", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "hello" }, + } satisfies ChatEvent); + + handlers.pauseStreamingWatchdog(); + handlers.reconnectStreamingWatchdog(); + + handlers.handleAgentEvent({ + runId: "run-lifecycle-only", + stream: "lifecycle", + data: { phase: "end" }, + } satisfies AgentEvent); + + vi.advanceTimersByTime(5_001); + + expect(setActivityStatus).toHaveBeenLastCalledWith("idle"); + expect(state.activeChatRunId).toBeNull(); + expect(loadHistory).toHaveBeenCalledTimes(1); + expect(chatLog.addSystem).not.toHaveBeenCalledWith( + expect.stringContaining("streaming watchdog"), + ); + + handlers.dispose?.(); + }); + it("cancels the watchdog when the run finalizes normally", () => { const { state, chatLog, setActivityStatus, handlers } = createHarness({ streamingWatchdogMs: 5_000, diff --git a/src/tui/tui-event-handlers.ts b/src/tui/tui-event-handlers.ts index b4e5b945755..e97687cb8a4 100644 --- a/src/tui/tui-event-handlers.ts +++ b/src/tui/tui-event-handlers.ts @@ -72,6 +72,7 @@ export function createEventHandlers(context: EventHandlerContext) { let streamAssembler = new TuiStreamAssembler(); let lastSessionKey = state.currentSessionKey; let pendingHistoryRefresh = false; + let reconnectPendingRunId: string | null = null; const streamingWatchdogMs = typeof context.streamingWatchdogMs === "number" && @@ -98,6 +99,10 @@ export function createEventHandlers(context: EventHandlerContext) { streamingWatchdogRunId = null; }; + const pauseStreamingWatchdog = () => { + clearStreamingWatchdog(); + }; + const armStreamingWatchdog = (runId: string) => { if (streamingWatchdogMs <= 0) { return; @@ -115,6 +120,13 @@ export function createEventHandlers(context: EventHandlerContext) { state.activeChatRunId = null; state.activityStatus = "idle"; setActivityStatus("idle"); + if (reconnectPendingRunId === runId) { + reconnectPendingRunId = null; + pendingHistoryRefresh = false; + void loadHistory?.(); + tui.requestRender(); + return; + } flushPendingHistoryRefreshIfIdle(); chatLog.addSystem( `streaming watchdog: no stream updates for ${Math.round( @@ -162,6 +174,7 @@ export function createEventHandlers(context: EventHandlerContext) { streamAssembler = new TuiStreamAssembler(); pendingHistoryRefresh = false; state.pendingOptimisticUserMessage = false; + reconnectPendingRunId = null; clearLocalRunIds?.(); clearLocalBtwRunIds?.(); btw.clear(); @@ -210,6 +223,27 @@ export function createEventHandlers(context: EventHandlerContext) { flushPendingHistoryRefreshIfIdle(); }; + const reconnectStreamingWatchdog = () => { + clearStreamingWatchdog(); + const activeRunId = state.activeChatRunId; + if (!activeRunId) { + reconnectPendingRunId = null; + clearStaleStreamingRunIfNoTrackedRunRemains(); + return; + } + if (!sessionRuns.has(activeRunId)) { + reconnectPendingRunId = null; + state.activeChatRunId = null; + state.activityStatus = "idle"; + setActivityStatus("idle"); + flushPendingHistoryRefreshIfIdle(); + return; + } + reconnectPendingRunId = activeRunId; + setActivityStatus("streaming"); + armStreamingWatchdog(activeRunId); + }; + const finalizeRun = (params: { runId: string; wasActiveRun: boolean; @@ -324,6 +358,9 @@ export function createEventHandlers(context: EventHandlerContext) { return; } } + if (reconnectPendingRunId === evt.runId) { + reconnectPendingRunId = null; + } noteSessionRun(evt.runId); if (!state.activeChatRunId && !isLocalBtwRunId?.(evt.runId)) { state.activeChatRunId = evt.runId; @@ -435,6 +472,9 @@ export function createEventHandlers(context: EventHandlerContext) { return; } if (evt.stream === "tool") { + if (isActiveRun) { + armStreamingWatchdog(evt.runId); + } const verbose = state.sessionInfo.verboseLevel ?? "off"; const allowToolEvents = verbose !== "off"; const allowToolOutput = verbose === "full"; @@ -474,6 +514,9 @@ export function createEventHandlers(context: EventHandlerContext) { return; } const phase = typeof evt.data?.phase === "string" ? evt.data.phase : ""; + if (phase && phase !== "end" && phase !== "error") { + armStreamingWatchdog(evt.runId); + } if (phase === "start") { setActivityStatus("running"); } @@ -516,5 +559,12 @@ export function createEventHandlers(context: EventHandlerContext) { clearStreamingWatchdog(); }; - return { handleChatEvent, handleAgentEvent, handleBtwEvent, dispose }; + return { + handleChatEvent, + handleAgentEvent, + handleBtwEvent, + pauseStreamingWatchdog, + reconnectStreamingWatchdog, + dispose, + }; } diff --git a/src/tui/tui.ts b/src/tui/tui.ts index bb0af53dfe5..88fc58feda8 100644 --- a/src/tui/tui.ts +++ b/src/tui/tui.ts @@ -900,7 +900,13 @@ export async function runTui(opts: RunTuiOptions): Promise { abortActive, } = sessionActions; - const { handleChatEvent, handleAgentEvent, handleBtwEvent } = createEventHandlers({ + const { + handleChatEvent, + handleAgentEvent, + handleBtwEvent, + pauseStreamingWatchdog, + reconnectStreamingWatchdog, + } = createEventHandlers({ chatLog, btw, tui, @@ -1069,6 +1075,9 @@ export async function runTui(opts: RunTuiOptions): Promise { pairingHintShown = false; const reconnected = wasDisconnected; wasDisconnected = false; + if (reconnected) { + reconnectStreamingWatchdog(); + } setConnectionStatus(isLocalMode ? "local ready" : "connected"); void (async () => { await refreshAgents(); @@ -1092,6 +1101,7 @@ export async function runTui(opts: RunTuiOptions): Promise { isConnected = false; wasDisconnected = true; historyLoaded = false; + pauseStreamingWatchdog(); const disconnectState = isLocalMode ? { connectionStatus: `local runtime stopped${reason ? `: ${reason}` : ""}`,