fix(tui): clear stale streaming after orphaned finals (#72389)

* fix(tui): clear stale streaming after orphaned finals

* fix(tui): clear stale streaming after orphaned finals

* fix(tui): clear stale streaming after orphaned finals
This commit is contained in:
Vincent Koc
2026-04-27 22:23:13 -07:00
committed by GitHub
parent db7cab4a9a
commit d7e67b455a
3 changed files with 152 additions and 13 deletions

View File

@@ -428,6 +428,7 @@ Docs: https://docs.openclaw.ai
- Gateway health: preserve live runtime-backed channel/account state in `gateway.health` snapshots and cached refreshes while keeping raw probe payloads on sensitive/admin paths only. (#39921, #42586, #46527, #52770, #42543) Thanks @FAL1989, @rstar327, @0xble, and @ajayr.
- Feishu: extract quoted/replied interactive-card text across schema 1.0, schema 2.0, i18n, template-variable, and post-format fallback shapes without carrying broad generated/config churn from related parser experiments. (#38776, #60383, #42218, #45936) Thanks @lishuaigit, @lskun, @just2gooo, and @Br1an67.
- Telegram/agents: hide raw failed write/edit warning messages in Telegram when the assistant already explicitly acknowledges the failed action, while keeping warnings when the reply claims success or omits the failure; #39406 remains the broader configurable delivery-policy follow-up. Fixes #51065; covers #39631. Thanks @Bartok9 and @Bortlesboat.
- TUI: clear stale streaming status when an orphaned final event or watchdog reset leaves no tracked active run, flushing deferred local history refreshes without surfacing inactive-run failures. Fixes #64825; carries forward #52745. Thanks @lyksdu.
- Exec approvals: accept a symlinked `OPENCLAW_HOME` as the trusted approvals root while still rejecting symlinked `.openclaw` path components below it. (#64663) Thanks @FunJim.
- Logging: add top-level `hostname`, flattened `message`, and available `agent_id`, `session_id`, and `channel` fields to file-log JSONL records for multi-agent filtering without removing existing structured log arguments. Fixes #51075. Thanks @stevengonsalvez.
- ACP: route server logs to stderr before Gateway config/bootstrap work so ACP stdout remains JSON-RPC only for IDE integrations. Fixes #49060. Thanks @Hollychou924.

View File

@@ -532,6 +532,79 @@ describe("tui-event-handlers: handleAgentEvent", () => {
expect(chatLog.updateAssistant).toHaveBeenLastCalledWith("continued", "run-active");
});
it("clears stale streaming when an orphan final arrives and no tracked run remains", () => {
const { state, setActivityStatus, handleChatEvent } = createHandlersHarness({
state: { activeChatRunId: "run-stale", activityStatus: "streaming" },
});
handleChatEvent({
runId: "run-orphan",
sessionKey: state.currentSessionKey,
state: "final",
message: { content: [{ type: "text", text: "done" }] },
});
expect(state.activeChatRunId).toBeNull();
expect(setActivityStatus).toHaveBeenCalledWith("idle");
});
it("flushes deferred history reload after stale streaming clear makes the TUI idle", () => {
const { state, loadHistory, noteLocalRunId, setActivityStatus, handleChatEvent } =
createHandlersHarness({
state: { activeChatRunId: "run-stale", activityStatus: "streaming" },
});
noteLocalRunId("run-local-empty");
loadHistory.mockImplementation(() => {
expect(state.activeChatRunId).toBeNull();
expect(state.activityStatus).toBe("idle");
});
handleChatEvent({
runId: "run-local-empty",
sessionKey: state.currentSessionKey,
state: "final",
});
expect(state.activeChatRunId).toBeNull();
expect(state.activityStatus).toBe("idle");
expect(setActivityStatus).toHaveBeenCalledWith("idle");
expect(loadHistory).toHaveBeenCalledTimes(1);
});
it("does not surface inactive orphan final failures as the global status", () => {
const { state, setActivityStatus, handleChatEvent } = createHandlersHarness({
state: { activeChatRunId: "run-stale", activityStatus: "streaming" },
});
handleChatEvent({
runId: "run-orphan-error",
sessionKey: state.currentSessionKey,
state: "final",
message: { content: [{ type: "text", text: "failed" }], stopReason: "error" },
});
expect(state.activeChatRunId).toBeNull();
expect(setActivityStatus).toHaveBeenCalledWith("idle");
expect(setActivityStatus).not.toHaveBeenCalledWith("error");
});
it("does not force idle for an inactive final while another tracked run is active", () => {
const { state, setActivityStatus, handleChatEvent } = createConcurrentRunHarness("partial");
state.activityStatus = "streaming";
setActivityStatus.mockClear();
handleChatEvent({
runId: "run-other",
sessionKey: state.currentSessionKey,
state: "final",
message: { content: [{ type: "text", text: "other final" }] },
});
expect(state.activeChatRunId).toBe("run-active");
expect(setActivityStatus).not.toHaveBeenCalledWith("idle");
});
it("suppresses non-local empty final placeholders during concurrent runs", () => {
const { state, chatLog, loadHistory, handleChatEvent } =
createConcurrentRunHarness("local stream");
@@ -715,15 +788,24 @@ describe("tui-event-handlers: streaming watchdog", () => {
const btw = createMockBtwPresenter();
const tui = { requestRender: vi.fn() } as unknown as MockTui & HandlerTui;
const setActivityStatus = vi.fn();
const loadHistory = vi.fn();
const localRunIds = new Set<string>();
const noteLocalRunId = (runId: string) => {
localRunIds.add(runId);
};
const handlers = createEventHandlers({
chatLog,
btw,
tui,
state,
setActivityStatus,
loadHistory,
noteLocalRunId,
isLocalRunId: localRunIds.has.bind(localRunIds),
forgetLocalRunId: localRunIds.delete.bind(localRunIds),
streamingWatchdogMs: options?.streamingWatchdogMs,
});
return { state, chatLog, tui, setActivityStatus, handlers };
return { state, chatLog, tui, setActivityStatus, loadHistory, noteLocalRunId, handlers };
};
it("resets activityStatus to idle when no stream delta arrives for the watchdog window", () => {
@@ -750,6 +832,37 @@ describe("tui-event-handlers: streaming watchdog", () => {
handlers.dispose?.();
});
it("flushes a deferred history reload when the watchdog clears the active run", () => {
const { state, loadHistory, noteLocalRunId, setActivityStatus, handlers } = createHarness({
streamingWatchdogMs: 5_000,
});
handlers.handleChatEvent({
runId: "run-stuck",
sessionKey: state.currentSessionKey,
state: "delta",
message: { content: "hello" },
} satisfies ChatEvent);
noteLocalRunId("run-local-empty");
handlers.handleChatEvent({
runId: "run-local-empty",
sessionKey: state.currentSessionKey,
state: "final",
} satisfies ChatEvent);
expect(loadHistory).not.toHaveBeenCalled();
vi.advanceTimersByTime(5_001);
expect(state.activeChatRunId).toBeNull();
expect(state.activityStatus).toBe("idle");
expect(setActivityStatus).toHaveBeenLastCalledWith("idle");
expect(loadHistory).toHaveBeenCalledTimes(1);
handlers.dispose?.();
});
it("refreshes the watchdog window on each new stream delta", () => {
const { state, setActivityStatus, handlers } = createHarness({
streamingWatchdogMs: 5_000,

View File

@@ -82,6 +82,14 @@ export function createEventHandlers(context: EventHandlerContext) {
let streamingWatchdogTimer: ReturnType<typeof setTimeout> | null = null;
let streamingWatchdogRunId: string | null = null;
const flushPendingHistoryRefreshIfIdle = () => {
if (!pendingHistoryRefresh || state.activeChatRunId) {
return;
}
pendingHistoryRefresh = false;
void loadHistory?.();
};
const clearStreamingWatchdog = () => {
if (streamingWatchdogTimer) {
clearTimeout(streamingWatchdogTimer);
@@ -105,7 +113,9 @@ export function createEventHandlers(context: EventHandlerContext) {
}
streamingWatchdogRunId = null;
state.activeChatRunId = null;
state.activityStatus = "idle";
setActivityStatus("idle");
flushPendingHistoryRefreshIfIdle();
chatLog.addSystem(
`streaming watchdog: no stream updates for ${Math.round(
streamingWatchdogMs / 1000,
@@ -158,14 +168,6 @@ export function createEventHandlers(context: EventHandlerContext) {
clearStreamingWatchdog();
};
const flushPendingHistoryRefreshIfIdle = () => {
if (!pendingHistoryRefresh || state.activeChatRunId) {
return;
}
pendingHistoryRefresh = false;
void loadHistory?.();
};
const resolveAuthErrorHint = (errorMessage: string): string | undefined => {
if (!localMode || !isAuthErrorMessage(errorMessage)) {
return undefined;
@@ -194,6 +196,23 @@ export function createEventHandlers(context: EventHandlerContext) {
}
};
const clearStaleStreamingRunIfNoTrackedRunRemains = () => {
const activeRunId = state.activeChatRunId;
if (
!activeRunId ||
sessionRuns.has(activeRunId) ||
sessionRuns.size > 0 ||
state.activityStatus !== "streaming"
) {
return;
}
state.activeChatRunId = null;
state.activityStatus = "idle";
setActivityStatus("idle");
clearStreamingWatchdog();
flushPendingHistoryRefreshIfIdle();
};
const finalizeRun = (params: {
runId: string;
wasActiveRun: boolean;
@@ -205,8 +224,11 @@ export function createEventHandlers(context: EventHandlerContext) {
if (params.wasActiveRun) {
setActivityStatus(params.status);
clearStreamingWatchdog();
} else if (streamingWatchdogRunId === params.runId) {
clearStreamingWatchdog();
} else {
if (streamingWatchdogRunId === params.runId) {
clearStreamingWatchdog();
}
clearStaleStreamingRunIfNoTrackedRunRemains();
}
void refreshSessionInfo?.();
};
@@ -223,8 +245,11 @@ export function createEventHandlers(context: EventHandlerContext) {
if (params.wasActiveRun) {
setActivityStatus(params.status);
clearStreamingWatchdog();
} else if (streamingWatchdogRunId === params.runId) {
clearStreamingWatchdog();
} else {
if (streamingWatchdogRunId === params.runId) {
clearStreamingWatchdog();
}
clearStaleStreamingRunIfNoTrackedRunRemains();
}
void refreshSessionInfo?.();
};