diff --git a/CHANGELOG.md b/CHANGELOG.md index 267f3bf6369..1e669362076 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai - Agents/failover: treat HTML provider error pages as upstream transport failures for CDN-style 5xx responses without misclassifying embedded body text as API rate limits, while still preserving auth remediation for HTML 401/403 pages and proxy remediation for HTML 407 pages. (#67642) Thanks @stainlu. - Gateway/skills: bump the cached skills-snapshot version whenever a config write touches `skills.*` (for example `skills.allowBundled`, `skills.entries..enabled`, or `skills.profile`). Existing agent sessions persist a `skillsSnapshot` in `sessions.json` that reuses the skill list frozen at session creation; without this invalidation, removing a bundled skill from the allowlist left the old snapshot live and the model kept calling the disabled tool, producing `Tool not found` loops that ran until the embedded-run timeout. (#67401) Thanks @xantorres. - Agents/tool-loop: enable the unknown-tool stream guard by default. Previously `resolveUnknownToolGuardThreshold` returned `undefined` unless `tools.loopDetection.enabled` was explicitly set to `true`, which left the protection off in the default configuration. A hallucinated or removed tool (for example `himalaya` after it was dropped from `skills.allowBundled`) would then loop "Tool X not found" attempts until the full embedded-run timeout. The guard has no false-positive surface because it only triggers on tools that are objectively not registered in the run, so it now stays on regardless of `tools.loopDetection.enabled` and still accepts `tools.loopDetection.unknownToolThreshold` as a per-run override (default 10). (#67401) Thanks @xantorres. +- TUI/streaming: add a client-side streaming watchdog to `tui-event-handlers` so the `streaming · Xm Ys` activity indicator resets to `idle` after 30s of delta silence on the active run. Guards against lost or late `state: "final"` chat events (WS reconnects, gateway restarts, etc.) leaving the TUI stuck on `streaming` indefinitely; a new system log line surfaces the reset so users know to send a new message to resync. The window is configurable via the new `streamingWatchdogMs` context option (set to `0` to disable), and the handler now exposes a `dispose()` that clears the pending timer on shutdown. (#67401) Thanks @xantorres. ## 2026.4.15-beta.1 diff --git a/src/tui/tui-event-handlers.test.ts b/src/tui/tui-event-handlers.test.ts index 29dcb1f77d1..625ea1b3add 100644 --- a/src/tui/tui-event-handlers.test.ts +++ b/src/tui/tui-event-handlers.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createEventHandlers } from "./tui-event-handlers.js"; import type { AgentEvent, BtwEvent, ChatEvent, TuiStateAccess } from "./tui-types.js"; @@ -651,3 +651,188 @@ describe("tui-event-handlers: handleAgentEvent", () => { expect(loadHistory).toHaveBeenCalledTimes(1); }); }); + +describe("tui-event-handlers: streaming watchdog", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + const makeState = (overrides?: Partial): TuiStateAccess => ({ + agentDefaultId: "main", + sessionMainKey: "agent:main:main", + sessionScope: "global", + agents: [], + currentAgentId: "main", + currentSessionKey: "agent:main:main", + currentSessionId: "session-1", + activeChatRunId: null, + pendingOptimisticUserMessage: false, + historyLoaded: true, + sessionInfo: { verboseLevel: "on" }, + initialSessionApplied: true, + isConnected: true, + autoMessageSent: false, + toolsExpanded: false, + showThinking: false, + connectionStatus: "connected", + activityStatus: "idle", + statusTimeout: null, + lastCtrlCAt: 0, + ...overrides, + }); + + const createHarness = (options?: { streamingWatchdogMs?: number }) => { + const state = makeState(); + const chatLog = createMockChatLog(); + const btw = createMockBtwPresenter(); + const tui = { requestRender: vi.fn() } as unknown as MockTui & HandlerTui; + const setActivityStatus = vi.fn(); + const handlers = createEventHandlers({ + chatLog, + btw, + tui, + state, + setActivityStatus, + streamingWatchdogMs: options?.streamingWatchdogMs, + }); + return { state, chatLog, tui, setActivityStatus, handlers }; + }; + + it("resets activityStatus to idle when no stream delta arrives for the watchdog window", () => { + const { state, chatLog, setActivityStatus, handlers } = createHarness({ + streamingWatchdogMs: 5_000, + }); + + handlers.handleChatEvent({ + runId: "run-stuck", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "hello" }, + } satisfies ChatEvent); + + expect(setActivityStatus).toHaveBeenLastCalledWith("streaming"); + expect(state.activeChatRunId).toBe("run-stuck"); + + vi.advanceTimersByTime(5_001); + + expect(setActivityStatus).toHaveBeenLastCalledWith("idle"); + expect(state.activeChatRunId).toBeNull(); + expect(chatLog.addSystem).toHaveBeenCalledWith( + expect.stringContaining("streaming watchdog"), + ); + + handlers.dispose?.(); + }); + + it("refreshes the watchdog window on each new stream delta", () => { + const { state, setActivityStatus, handlers } = createHarness({ + streamingWatchdogMs: 5_000, + }); + + handlers.handleChatEvent({ + runId: "run-flow", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "first" }, + } satisfies ChatEvent); + + vi.advanceTimersByTime(3_000); + + handlers.handleChatEvent({ + runId: "run-flow", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "second" }, + } satisfies ChatEvent); + + vi.advanceTimersByTime(3_000); + + // 6s total, but the latest delta was only 3s ago, so the watchdog must not + // have fired yet. + expect(setActivityStatus).not.toHaveBeenCalledWith("idle"); + expect(state.activeChatRunId).toBe("run-flow"); + + vi.advanceTimersByTime(2_500); + + expect(setActivityStatus).toHaveBeenLastCalledWith("idle"); + expect(state.activeChatRunId).toBeNull(); + + handlers.dispose?.(); + }); + + it("cancels the watchdog when the run finalizes normally", () => { + const { state, chatLog, setActivityStatus, handlers } = createHarness({ + streamingWatchdogMs: 5_000, + }); + + handlers.handleChatEvent({ + runId: "run-normal", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "hi" }, + } satisfies ChatEvent); + handlers.handleChatEvent({ + runId: "run-normal", + sessionKey: state.currentSessionKey, + state: "final", + message: { content: [{ type: "text", text: "done" }], stopReason: "stop" }, + } satisfies ChatEvent); + + vi.advanceTimersByTime(10_000); + + // After a normal final, the watchdog timer must have been cancelled and + // cannot later re-overwrite the status or emit the warning banner. + const statusCalls = setActivityStatus.mock.calls.map((c) => c[0]); + expect(statusCalls.filter((s) => s === "idle").length).toBe(1); + expect(chatLog.addSystem).not.toHaveBeenCalledWith( + expect.stringContaining("streaming watchdog"), + ); + expect(state.activeChatRunId).toBeNull(); + + handlers.dispose?.(); + }); + + it("is disabled when streamingWatchdogMs is 0", () => { + const { state, chatLog, setActivityStatus, handlers } = createHarness({ + streamingWatchdogMs: 0, + }); + + handlers.handleChatEvent({ + runId: "run-no-watchdog", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "hi" }, + } satisfies ChatEvent); + + vi.advanceTimersByTime(60_000); + + expect(setActivityStatus).not.toHaveBeenCalledWith("idle"); + expect(chatLog.addSystem).not.toHaveBeenCalled(); + expect(state.activeChatRunId).toBe("run-no-watchdog"); + + handlers.dispose?.(); + }); + + it("dispose clears a pending watchdog without firing it", () => { + const { setActivityStatus, chatLog, handlers, state } = createHarness({ + streamingWatchdogMs: 5_000, + }); + + handlers.handleChatEvent({ + runId: "run-dispose", + sessionKey: state.currentSessionKey, + state: "delta", + message: { content: "hi" }, + } satisfies ChatEvent); + + handlers.dispose?.(); + vi.advanceTimersByTime(10_000); + + expect(setActivityStatus).not.toHaveBeenCalledWith("idle"); + expect(chatLog.addSystem).not.toHaveBeenCalled(); + }); +}); diff --git a/src/tui/tui-event-handlers.ts b/src/tui/tui-event-handlers.ts index 3519a95c57b..1379a1a6506 100644 --- a/src/tui/tui-event-handlers.ts +++ b/src/tui/tui-event-handlers.ts @@ -41,8 +41,18 @@ type EventHandlerContext = { isLocalBtwRunId?: (runId: string) => boolean; forgetLocalBtwRunId?: (runId: string) => void; clearLocalBtwRunIds?: () => void; + /** + * Milliseconds of stream-delta silence that force the `streaming` activity + * indicator to reset to `idle`. Guards against lost/late "final" events from + * the gateway (WS flaps, gateway restarts, backends that emit `stopReason` + * without an explicit stream-end event) leaving the TUI stuck on + * `streaming · Xm Ys` forever. Defaults to 30s. Set to 0 to disable. + */ + streamingWatchdogMs?: number; }; +const DEFAULT_STREAMING_WATCHDOG_MS = 30_000; + export function createEventHandlers(context: EventHandlerContext) { const { chatLog, @@ -66,6 +76,60 @@ export function createEventHandlers(context: EventHandlerContext) { let lastSessionKey = state.currentSessionKey; let pendingHistoryRefresh = false; + const streamingWatchdogMs = + typeof context.streamingWatchdogMs === "number" && + Number.isFinite(context.streamingWatchdogMs) && + context.streamingWatchdogMs >= 0 + ? Math.floor(context.streamingWatchdogMs) + : DEFAULT_STREAMING_WATCHDOG_MS; + let streamingWatchdogTimer: ReturnType | null = null; + let streamingWatchdogRunId: string | null = null; + + const clearStreamingWatchdog = () => { + if (streamingWatchdogTimer) { + clearTimeout(streamingWatchdogTimer); + streamingWatchdogTimer = null; + } + streamingWatchdogRunId = null; + }; + + const armStreamingWatchdog = (runId: string) => { + if (streamingWatchdogMs <= 0) { + return; + } + if (streamingWatchdogTimer) { + clearTimeout(streamingWatchdogTimer); + } + streamingWatchdogRunId = runId; + streamingWatchdogTimer = setTimeout(() => { + streamingWatchdogTimer = null; + // Only act if the timer still matches the run that armed it and that run + // is still the TUI's active stream. A later `final`/`aborted`/`error` + // event already cleared the indicator by the normal path otherwise. + if (streamingWatchdogRunId !== runId) { + return; + } + streamingWatchdogRunId = null; + if (state.activeChatRunId !== runId) { + return; + } + state.activeChatRunId = null; + setActivityStatus("idle"); + chatLog.addSystem( + `streaming watchdog: no stream updates for ${Math.round( + streamingWatchdogMs / 1000, + )}s; resetting status. The backend may have dropped this run silently — send a new message to resync.`, + ); + tui.requestRender(); + }, streamingWatchdogMs); + // Keep the watchdog from blocking process exit when the TUI is shutting + // down. Node timers expose unref() on the returned Timeout object. + const maybeUnref = (streamingWatchdogTimer as { unref?: () => void }).unref; + if (typeof maybeUnref === "function") { + maybeUnref.call(streamingWatchdogTimer); + } + }; + const pruneRunMap = (runs: Map) => { if (runs.size <= 200) { return; @@ -102,6 +166,7 @@ export function createEventHandlers(context: EventHandlerContext) { clearLocalRunIds?.(); clearLocalBtwRunIds?.(); btw.clear(); + clearStreamingWatchdog(); }; const flushPendingHistoryRefreshIfIdle = () => { @@ -140,6 +205,9 @@ export function createEventHandlers(context: EventHandlerContext) { flushPendingHistoryRefreshIfIdle(); if (params.wasActiveRun) { setActivityStatus(params.status); + clearStreamingWatchdog(); + } else if (streamingWatchdogRunId === params.runId) { + clearStreamingWatchdog(); } void refreshSessionInfo?.(); }; @@ -155,6 +223,9 @@ export function createEventHandlers(context: EventHandlerContext) { flushPendingHistoryRefreshIfIdle(); if (params.wasActiveRun) { setActivityStatus(params.status); + clearStreamingWatchdog(); + } else if (streamingWatchdogRunId === params.runId) { + clearStreamingWatchdog(); } void refreshSessionInfo?.(); }; @@ -247,6 +318,7 @@ export function createEventHandlers(context: EventHandlerContext) { } chatLog.updateAssistant(displayText, evt.runId); setActivityStatus("streaming"); + armStreamingWatchdog(evt.runId); } if (evt.state === "final") { const isLocalBtwRun = isLocalBtwRunId?.(evt.runId) ?? false; @@ -412,5 +484,9 @@ export function createEventHandlers(context: EventHandlerContext) { tui.requestRender(); }; - return { handleChatEvent, handleAgentEvent, handleBtwEvent }; + const dispose = () => { + clearStreamingWatchdog(); + }; + + return { handleChatEvent, handleAgentEvent, handleBtwEvent, dispose }; }