TUI/streaming: add watchdog that resets the activity indicator after delta silence

This commit is contained in:
Xan Torres
2026-04-15 23:14:56 +02:00
committed by Ayaan Zaidi
parent 36ed36768c
commit f44ab20d4d
3 changed files with 264 additions and 2 deletions

View File

@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
- Agents/failover: treat HTML provider error pages as upstream transport failures for CDN-style 5xx responses without misclassifying embedded body text as API rate limits, while still preserving auth remediation for HTML 401/403 pages and proxy remediation for HTML 407 pages. (#67642) Thanks @stainlu.
- Gateway/skills: bump the cached skills-snapshot version whenever a config write touches `skills.*` (for example `skills.allowBundled`, `skills.entries.<id>.enabled`, or `skills.profile`). Existing agent sessions persist a `skillsSnapshot` in `sessions.json` that reuses the skill list frozen at session creation; without this invalidation, removing a bundled skill from the allowlist left the old snapshot live and the model kept calling the disabled tool, producing `Tool <name> not found` loops that ran until the embedded-run timeout. (#67401) Thanks @xantorres.
- Agents/tool-loop: enable the unknown-tool stream guard by default. Previously `resolveUnknownToolGuardThreshold` returned `undefined` unless `tools.loopDetection.enabled` was explicitly set to `true`, which left the protection off in the default configuration. A hallucinated or removed tool (for example `himalaya` after it was dropped from `skills.allowBundled`) would then loop "Tool X not found" attempts until the full embedded-run timeout. The guard has no false-positive surface because it only triggers on tools that are objectively not registered in the run, so it now stays on regardless of `tools.loopDetection.enabled` and still accepts `tools.loopDetection.unknownToolThreshold` as a per-run override (default 10). (#67401) Thanks @xantorres.
- TUI/streaming: add a client-side streaming watchdog to `tui-event-handlers` so the `streaming · Xm Ys` activity indicator resets to `idle` after 30s of delta silence on the active run. Guards against lost or late `state: "final"` chat events (WS reconnects, gateway restarts, etc.) leaving the TUI stuck on `streaming` indefinitely; a new system log line surfaces the reset so users know to send a new message to resync. The window is configurable via the new `streamingWatchdogMs` context option (set to `0` to disable), and the handler now exposes a `dispose()` that clears the pending timer on shutdown. (#67401) Thanks @xantorres.
## 2026.4.15-beta.1

View File

@@ -1,4 +1,4 @@
import { describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { createEventHandlers } from "./tui-event-handlers.js";
import type { AgentEvent, BtwEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
@@ -651,3 +651,188 @@ describe("tui-event-handlers: handleAgentEvent", () => {
expect(loadHistory).toHaveBeenCalledTimes(1);
});
});
describe("tui-event-handlers: streaming watchdog", () => {
  // Every case runs on vitest's fake clock so the silence window can be
  // crossed with advanceTimersByTime instead of real waiting.
  beforeEach(() => {
    vi.useFakeTimers();
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  /** Builds an idle, connected TUI state with no active run; `overrides` wins per field. */
  const makeState = (overrides?: Partial<TuiStateAccess>): TuiStateAccess => {
    const defaults: TuiStateAccess = {
      agentDefaultId: "main",
      sessionMainKey: "agent:main:main",
      sessionScope: "global",
      agents: [],
      currentAgentId: "main",
      currentSessionKey: "agent:main:main",
      currentSessionId: "session-1",
      activeChatRunId: null,
      pendingOptimisticUserMessage: false,
      historyLoaded: true,
      sessionInfo: { verboseLevel: "on" },
      initialSessionApplied: true,
      isConnected: true,
      autoMessageSent: false,
      toolsExpanded: false,
      showThinking: false,
      connectionStatus: "connected",
      activityStatus: "idle",
      statusTimeout: null,
      lastCtrlCAt: 0,
    };
    return { ...defaults, ...overrides };
  };

  /**
   * Wires createEventHandlers to mocks and returns them together with an
   * `emitDelta` shortcut that feeds a `delta` chat event for the current
   * session into the handlers.
   */
  const createHarness = (options?: { streamingWatchdogMs?: number }) => {
    const state = makeState();
    const chatLog = createMockChatLog();
    const btw = createMockBtwPresenter();
    const tui = { requestRender: vi.fn() } as unknown as MockTui & HandlerTui;
    const setActivityStatus = vi.fn();
    const handlers = createEventHandlers({
      chatLog,
      btw,
      tui,
      state,
      setActivityStatus,
      streamingWatchdogMs: options?.streamingWatchdogMs,
    });
    const emitDelta = (runId: string, content: string) => {
      handlers.handleChatEvent({
        runId,
        sessionKey: state.currentSessionKey,
        state: "delta",
        message: { content },
      } satisfies ChatEvent);
    };
    return { state, chatLog, tui, setActivityStatus, handlers, emitDelta };
  };

  it("resets activityStatus to idle when no stream delta arrives for the watchdog window", () => {
    const harness = createHarness({ streamingWatchdogMs: 5_000 });

    harness.emitDelta("run-stuck", "hello");
    expect(harness.setActivityStatus).toHaveBeenLastCalledWith("streaming");
    expect(harness.state.activeChatRunId).toBe("run-stuck");

    vi.advanceTimersByTime(5_001);
    expect(harness.setActivityStatus).toHaveBeenLastCalledWith("idle");
    expect(harness.state.activeChatRunId).toBeNull();
    expect(harness.chatLog.addSystem).toHaveBeenCalledWith(
      expect.stringContaining("streaming watchdog"),
    );

    harness.handlers.dispose?.();
  });

  it("refreshes the watchdog window on each new stream delta", () => {
    const harness = createHarness({ streamingWatchdogMs: 5_000 });

    harness.emitDelta("run-flow", "first");
    vi.advanceTimersByTime(3_000);
    harness.emitDelta("run-flow", "second");
    vi.advanceTimersByTime(3_000);

    // Six seconds have elapsed overall, yet only three since the most recent
    // delta, so the watchdog has to still be pending.
    expect(harness.setActivityStatus).not.toHaveBeenCalledWith("idle");
    expect(harness.state.activeChatRunId).toBe("run-flow");

    vi.advanceTimersByTime(2_500);
    expect(harness.setActivityStatus).toHaveBeenLastCalledWith("idle");
    expect(harness.state.activeChatRunId).toBeNull();

    harness.handlers.dispose?.();
  });

  it("cancels the watchdog when the run finalizes normally", () => {
    const harness = createHarness({ streamingWatchdogMs: 5_000 });

    harness.emitDelta("run-normal", "hi");
    harness.handlers.handleChatEvent({
      runId: "run-normal",
      sessionKey: harness.state.currentSessionKey,
      state: "final",
      message: { content: [{ type: "text", text: "done" }], stopReason: "stop" },
    } satisfies ChatEvent);
    vi.advanceTimersByTime(10_000);

    // A regular final must cancel the timer: exactly one idle transition and
    // no watchdog banner afterwards.
    const idleCalls = harness.setActivityStatus.mock.calls.filter(
      ([status]) => status === "idle",
    );
    expect(idleCalls).toHaveLength(1);
    expect(harness.chatLog.addSystem).not.toHaveBeenCalledWith(
      expect.stringContaining("streaming watchdog"),
    );
    expect(harness.state.activeChatRunId).toBeNull();

    harness.handlers.dispose?.();
  });

  it("is disabled when streamingWatchdogMs is 0", () => {
    const harness = createHarness({ streamingWatchdogMs: 0 });

    harness.emitDelta("run-no-watchdog", "hi");
    vi.advanceTimersByTime(60_000);

    expect(harness.setActivityStatus).not.toHaveBeenCalledWith("idle");
    expect(harness.chatLog.addSystem).not.toHaveBeenCalled();
    expect(harness.state.activeChatRunId).toBe("run-no-watchdog");

    harness.handlers.dispose?.();
  });

  it("dispose clears a pending watchdog without firing it", () => {
    const harness = createHarness({ streamingWatchdogMs: 5_000 });

    harness.emitDelta("run-dispose", "hi");
    harness.handlers.dispose?.();
    vi.advanceTimersByTime(10_000);

    expect(harness.setActivityStatus).not.toHaveBeenCalledWith("idle");
    expect(harness.chatLog.addSystem).not.toHaveBeenCalled();
  });
});

View File

@@ -41,8 +41,18 @@ type EventHandlerContext = {
isLocalBtwRunId?: (runId: string) => boolean;
forgetLocalBtwRunId?: (runId: string) => void;
clearLocalBtwRunIds?: () => void;
/**
* Milliseconds of stream-delta silence that force the `streaming` activity
* indicator to reset to `idle`. Guards against lost/late "final" events from
* the gateway (WS flaps, gateway restarts, backends that emit `stopReason`
* without an explicit stream-end event) leaving the TUI stuck on
* `streaming · Xm Ys` forever. Defaults to 30s. Set to 0 to disable.
*/
streamingWatchdogMs?: number;
};
/** Fallback delta-silence window for the streaming watchdog: 30 seconds, in milliseconds. */
const DEFAULT_STREAMING_WATCHDOG_MS = 30 * 1_000;
export function createEventHandlers(context: EventHandlerContext) {
const {
chatLog,
@@ -66,6 +76,60 @@ export function createEventHandlers(context: EventHandlerContext) {
let lastSessionKey = state.currentSessionKey;
let pendingHistoryRefresh = false;
// Resolve the delta-silence window once per handler set. Anything that is not
// a finite, non-negative number falls back to the default; fractional values
// are floored to whole milliseconds. A resolved 0 is kept as-is and makes
// armStreamingWatchdog a no-op.
const requestedWatchdogMs = context.streamingWatchdogMs;
const streamingWatchdogMs =
  typeof requestedWatchdogMs !== "number" ||
  !Number.isFinite(requestedWatchdogMs) ||
  requestedWatchdogMs < 0
    ? DEFAULT_STREAMING_WATCHDOG_MS
    : Math.floor(requestedWatchdogMs);
// Handle of the pending watchdog timeout, or null when none is scheduled.
let streamingWatchdogTimer: ReturnType<typeof setTimeout> | null = null;
// Run id the pending watchdog was armed for, or null when unarmed.
let streamingWatchdogRunId: string | null = null;
/**
 * Cancels any pending streaming watchdog and forgets the run it was armed
 * for. Safe to call when nothing is pending.
 */
const clearStreamingWatchdog = () => {
  // Snapshot the handle first so the shared state is reset in one place.
  const pendingTimer = streamingWatchdogTimer;
  streamingWatchdogTimer = null;
  streamingWatchdogRunId = null;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
  }
};
/**
 * (Re)arms the streaming watchdog for `runId`.
 *
 * Does nothing when the resolved window is 0 or less (watchdog disabled).
 * Otherwise any pending timer is replaced, so each call restarts the silence
 * window. When the timer fires and `runId` is still both the armed run and
 * the TUI's active run, the active run is cleared, the activity status is
 * set to `idle`, a system line is logged, and a render is requested.
 *
 * @param runId - Run whose stream activity restarts the silence window.
 */
const armStreamingWatchdog = (runId: string) => {
  if (streamingWatchdogMs <= 0) {
    return;
  }
  // Node keeps timer delays as signed 32-bit milliseconds; anything larger is
  // coerced to 1ms, which would make a very large "effectively never" window
  // fire almost immediately. Clamp to the largest delay a timer can honor.
  const maxTimerDelayMs = 2_147_483_647;
  const delayMs = Math.min(streamingWatchdogMs, maxTimerDelayMs);
  if (streamingWatchdogTimer) {
    clearTimeout(streamingWatchdogTimer);
  }
  streamingWatchdogRunId = runId;
  streamingWatchdogTimer = setTimeout(() => {
    streamingWatchdogTimer = null;
    // Only act if the timer still matches the run that armed it and that run
    // is still the TUI's active stream. A later `final`/`aborted`/`error`
    // event already cleared the indicator by the normal path otherwise.
    if (streamingWatchdogRunId !== runId) {
      return;
    }
    streamingWatchdogRunId = null;
    if (state.activeChatRunId !== runId) {
      return;
    }
    state.activeChatRunId = null;
    setActivityStatus("idle");
    chatLog.addSystem(
      `streaming watchdog: no stream updates for ${Math.round(
        delayMs / 1000,
      )}s; resetting status. The backend may have dropped this run silently — send a new message to resync.`,
    );
    tui.requestRender();
  }, delayMs);
  // Keep the watchdog from blocking process exit when the TUI is shutting
  // down. Node timers expose unref() on the returned Timeout object; the
  // feature check keeps this a no-op where the handle has no unref().
  const maybeUnref = (streamingWatchdogTimer as { unref?: () => void }).unref;
  if (typeof maybeUnref === "function") {
    maybeUnref.call(streamingWatchdogTimer);
  }
};
const pruneRunMap = (runs: Map<string, number>) => {
if (runs.size <= 200) {
return;
@@ -102,6 +166,7 @@ export function createEventHandlers(context: EventHandlerContext) {
clearLocalRunIds?.();
clearLocalBtwRunIds?.();
btw.clear();
clearStreamingWatchdog();
};
const flushPendingHistoryRefreshIfIdle = () => {
@@ -140,6 +205,9 @@ export function createEventHandlers(context: EventHandlerContext) {
flushPendingHistoryRefreshIfIdle();
if (params.wasActiveRun) {
setActivityStatus(params.status);
clearStreamingWatchdog();
} else if (streamingWatchdogRunId === params.runId) {
clearStreamingWatchdog();
}
void refreshSessionInfo?.();
};
@@ -155,6 +223,9 @@ export function createEventHandlers(context: EventHandlerContext) {
flushPendingHistoryRefreshIfIdle();
if (params.wasActiveRun) {
setActivityStatus(params.status);
clearStreamingWatchdog();
} else if (streamingWatchdogRunId === params.runId) {
clearStreamingWatchdog();
}
void refreshSessionInfo?.();
};
@@ -247,6 +318,7 @@ export function createEventHandlers(context: EventHandlerContext) {
}
chatLog.updateAssistant(displayText, evt.runId);
setActivityStatus("streaming");
armStreamingWatchdog(evt.runId);
}
if (evt.state === "final") {
const isLocalBtwRun = isLocalBtwRunId?.(evt.runId) ?? false;
@@ -412,5 +484,9 @@ export function createEventHandlers(context: EventHandlerContext) {
tui.requestRender();
};
return { handleChatEvent, handleAgentEvent, handleBtwEvent };
/** Releases handler resources on shutdown by cancelling any pending streaming watchdog. */
function dispose(): void {
  clearStreamingWatchdog();
}
return { handleChatEvent, handleAgentEvent, handleBtwEvent, dispose };
}