diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e12a54a394..d3c76723242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai - Gateway/containers: auto-bind to `0.0.0.0` during container startup for Docker and Podman compatibility, while keeping host-side status and doctor checks on the hardened loopback default when `gateway.bind` is unset. (#61818) Thanks @openperf. - TUI/status: route `/status` through the shared session-status command and move the old gateway-wide diagnostic summary to `/gateway-status` (`/gwstatus`). Thanks @vincentkoc. - Agents/history: use one shared assistant-visible sanitizer across embedded delivery and chat-history extraction so leaked `` and `` XML blocks stay hidden from user-facing replies. (#61729) Thanks @openperf. +- Gateway/TUI: defer terminal chat finalization for per-attempt lifecycle errors so fallback retries keep streaming before the run is marked failed. (#60043) Thanks @jwchmodx. ## 2026.4.5 diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index 0fd00690593..91de3ba3d62 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -48,18 +48,22 @@ describe("agent event handler", () => { }); afterEach(() => { + vi.useRealTimers(); resetAgentRunContextForTest(); }); function createHarness(params?: { now?: number; resolveSessionKeyForRun?: (runId: string) => string | undefined; + lifecycleErrorRetryGraceMs?: number; + isChatSendRunActive?: (runId: string) => boolean; }) { const nowSpy = params?.now === undefined ? undefined : vi.spyOn(Date, "now").mockReturnValue(params.now); const broadcast = vi.fn(); const broadcastToConnIds = vi.fn(); const nodeSendToSession = vi.fn(); + const clearAgentRunContext = vi.fn(); const agentRunSeq = new Map(); const chatRunState = createChatRunState(); const toolEventRecipients = createToolEventRecipientRegistry(); @@ -72,9 +76,11 @@ describe("agent event handler", () => { agentRunSeq, chatRunState, resolveSessionKeyForRun: params?.resolveSessionKeyForRun ?? (() => undefined), - clearAgentRunContext: vi.fn(), + clearAgentRunContext, toolEventRecipients, sessionEventSubscribers, + lifecycleErrorRetryGraceMs: params?.lifecycleErrorRetryGraceMs, + isChatSendRunActive: params?.isChatSendRunActive, }); return { @@ -82,6 +88,7 @@ describe("agent event handler", () => { broadcast, broadcastToConnIds, nodeSendToSession, + clearAgentRunContext, agentRunSeq, chatRunState, toolEventRecipients, @@ -1066,6 +1073,148 @@ describe("agent event handler", () => { expect(nodePayload.runId).toBe("run-fallback-client"); }); + it("keeps chat-linked run remapping alive across per-attempt lifecycle errors", () => { + vi.useFakeTimers(); + const { broadcast, chatRunState, clearAgentRunContext, agentRunSeq, handler } = createHarness({ + resolveSessionKeyForRun: () => "session-fallback", + lifecycleErrorRetryGraceMs: 100, + }); + chatRunState.registry.add("run-fallback-retry", { + sessionKey: "session-fallback", + clientRunId: "run-fallback-client", + }); + + handler({ + runId: "run-fallback-retry", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "draft" }, + }); + handler({ + runId: "run-fallback-retry", + seq: 2, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "error", error: "provider failed" }, + }); + + expect(chatRunState.registry.peek("run-fallback-retry")).toEqual({ + sessionKey: "session-fallback", + clientRunId: "run-fallback-client", + }); + expect(clearAgentRunContext).not.toHaveBeenCalled(); + expect(agentRunSeq.get("run-fallback-retry")).toBe(2); + + emitFallbackLifecycle({ + handler, + runId: "run-fallback-retry", + seq: 3, + sessionKey: "session-fallback", + }); + const agentCalls = broadcast.mock.calls.filter(([event]) => event === "agent"); + const fallbackPayload = agentCalls.at(-1)?.[1] as { + runId?: string; + data?: Record; + }; + expect(fallbackPayload.runId).toBe("run-fallback-client"); + expect(fallbackPayload.data?.phase).toBe("fallback"); + + emitLifecycleEnd(handler, "run-fallback-retry", 4); + + expect( + chatBroadcastCalls(broadcast).some( + ([, payload]) => (payload as { state?: string }).state === "error", + ), + ).toBe(false); + const finalPayload = chatBroadcastCalls(broadcast).at(-1)?.[1] as { + state?: string; + runId?: string; + }; + expect(finalPayload.state).toBe("final"); + expect(finalPayload.runId).toBe("run-fallback-client"); + expect(clearAgentRunContext).toHaveBeenCalledWith("run-fallback-retry"); + expect(agentRunSeq.has("run-fallback-retry")).toBe(false); + }); + + it("defers terminal lifecycle-error cleanup for non-chat-send runs until the retry grace expires", () => { + vi.useFakeTimers(); + const { broadcast, clearAgentRunContext, agentRunSeq, handler } = createHarness({ + resolveSessionKeyForRun: () => "session-terminal-error", + lifecycleErrorRetryGraceMs: 100, + }); + registerAgentRunContext("run-terminal-error", { sessionKey: "session-terminal-error" }); + + handler({ + runId: "run-terminal-error", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "partial" }, + }); + handler({ + runId: "run-terminal-error", + seq: 2, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "error", error: "still broken" }, + }); + + expect(clearAgentRunContext).not.toHaveBeenCalled(); + expect(agentRunSeq.get("run-terminal-error")).toBe(2); + expect( + chatBroadcastCalls(broadcast).some( + ([, payload]) => (payload as { state?: string }).state === "error", + ), + ).toBe(false); + + vi.advanceTimersByTime(100); + + const finalPayload = chatBroadcastCalls(broadcast).at(-1)?.[1] as { + state?: string; + runId?: string; + }; + expect(finalPayload.state).toBe("error"); + expect(finalPayload.runId).toBe("run-terminal-error"); + expect(clearAgentRunContext).toHaveBeenCalledWith("run-terminal-error"); + expect(agentRunSeq.has("run-terminal-error")).toBe(false); + }); + + it("suppresses delayed lifecycle chat errors for active chat.send runs while still cleaning up", () => { + vi.useFakeTimers(); + const { broadcast, clearAgentRunContext, agentRunSeq, handler } = createHarness({ + resolveSessionKeyForRun: () => "session-chat-send", + lifecycleErrorRetryGraceMs: 100, + isChatSendRunActive: (runId) => runId === "run-chat-send", + }); + registerAgentRunContext("run-chat-send", { sessionKey: "session-chat-send" }); + + handler({ + runId: "run-chat-send", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "partial" }, + }); + handler({ + runId: "run-chat-send", + seq: 2, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "error", error: "chat.send failed" }, + }); + + vi.advanceTimersByTime(100); + + expect( + chatBroadcastCalls(broadcast).some( + ([, payload]) => (payload as { state?: string }).state === "error", + ), + ).toBe(false); + expect(clearAgentRunContext).toHaveBeenCalledWith("run-chat-send"); + expect(agentRunSeq.has("run-chat-send")).toBe(false); + }); + it("suppresses chat and node session events for non-control-UI-visible runs", () => { const { broadcast, nodeSendToSession, handler } = createHarness({ resolveSessionKeyForRun: () => "session-hidden", diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index d96a07c72fe..f85331ff809 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -263,6 +263,11 @@ type ToolRecipientEntry = { const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000; const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000; +/** + * Keep this aligned with the agent.wait lifecycle-error grace so chat surfaces + * do not finalize a run before fallback or retry reuses the same runId. + */ +const AGENT_LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000; export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRegistry { const connIds = new Set(); @@ -449,6 +454,8 @@ export type AgentEventHandlerOptions = { clearAgentRunContext: (runId: string) => void; toolEventRecipients: ToolEventRecipientRegistry; sessionEventSubscribers: SessionEventSubscriberRegistry; + lifecycleErrorRetryGraceMs?: number; + isChatSendRunActive?: (runId: string) => boolean; }; export function createAgentEventHandler({ @@ -461,7 +468,26 @@ export function createAgentEventHandler({ clearAgentRunContext, toolEventRecipients, sessionEventSubscribers, + lifecycleErrorRetryGraceMs = AGENT_LIFECYCLE_ERROR_RETRY_GRACE_MS, + isChatSendRunActive = () => false, }: AgentEventHandlerOptions) { + const pendingTerminalLifecycleErrors = new Map(); + + const clearBufferedChatState = (clientRunId: string) => { + chatRunState.buffers.delete(clientRunId); + chatRunState.deltaSentAt.delete(clientRunId); + chatRunState.deltaLastBroadcastLen.delete(clientRunId); + }; + + const clearPendingTerminalLifecycleError = (runId: string) => { + const pending = pendingTerminalLifecycleErrors.get(runId); + if (!pending) { + return; + } + clearTimeout(pending); + pendingTerminalLifecycleErrors.delete(runId); + }; + const buildSessionEventSnapshot = (sessionKey: string, evt?: AgentEventPayload) => { const row = loadGatewaySessionRow(sessionKey); const lifecyclePatch = evt @@ -531,6 +557,110 @@ export function createAgentEventHandler({ }; }; + const finalizeLifecycleEvent = ( + evt: AgentEventPayload, + opts?: { skipChatErrorFinal?: boolean }, + ) => { + const lifecyclePhase = + evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null; + if (lifecyclePhase !== "end" && lifecyclePhase !== "error") { + return; + } + + clearPendingTerminalLifecycleError(evt.runId); + + const chatLink = chatRunState.registry.peek(evt.runId); + const eventSessionKey = + typeof evt.sessionKey === "string" && evt.sessionKey.trim() ? evt.sessionKey : undefined; + const isControlUiVisible = getAgentRunContext(evt.runId)?.isControlUiVisible ?? true; + const sessionKey = + chatLink?.sessionKey ?? eventSessionKey ?? resolveSessionKeyForRun(evt.runId); + const clientRunId = chatLink?.clientRunId ?? evt.runId; + const eventRunId = chatLink?.clientRunId ?? evt.runId; + const isAborted = + chatRunState.abortedRuns.has(clientRunId) || chatRunState.abortedRuns.has(evt.runId); + + if (isControlUiVisible && sessionKey) { + if (!isAborted) { + const evtStopReason = + typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined; + if (chatLink) { + const finished = chatRunState.registry.shift(evt.runId); + if (!finished) { + clearAgentRunContext(evt.runId); + return; + } + if (!(opts?.skipChatErrorFinal && lifecyclePhase === "error")) { + emitChatFinal( + finished.sessionKey, + finished.clientRunId, + evt.runId, + evt.seq, + lifecyclePhase === "error" ? "error" : "done", + evt.data?.error, + evtStopReason, + ); + } + } else if (!(opts?.skipChatErrorFinal && lifecyclePhase === "error")) { + emitChatFinal( + sessionKey, + eventRunId, + evt.runId, + evt.seq, + lifecyclePhase === "error" ? "error" : "done", + evt.data?.error, + evtStopReason, + ); + } + } else { + chatRunState.abortedRuns.delete(clientRunId); + chatRunState.abortedRuns.delete(evt.runId); + clearBufferedChatState(clientRunId); + if (chatLink) { + chatRunState.registry.remove(evt.runId, clientRunId, sessionKey); + } + } + } + + toolEventRecipients.markFinal(evt.runId); + clearAgentRunContext(evt.runId); + agentRunSeq.delete(evt.runId); + agentRunSeq.delete(clientRunId); + + if (sessionKey) { + void persistGatewaySessionLifecycleEvent({ sessionKey, event: evt }).catch(() => undefined); + const sessionEventConnIds = sessionEventSubscribers.getAll(); + if (sessionEventConnIds.size > 0) { + broadcastToConnIds( + "sessions.changed", + { + sessionKey, + phase: lifecyclePhase, + runId: evt.runId, + ts: evt.ts, + ...buildSessionEventSnapshot(sessionKey, evt), + }, + sessionEventConnIds, + { dropIfSlow: true }, + ); + } + } + }; + + const scheduleTerminalLifecycleError = ( + evt: AgentEventPayload, + opts?: { skipChatErrorFinal?: boolean }, + ) => { + clearPendingTerminalLifecycleError(evt.runId); + const delayMs = Math.max(1, Math.min(Math.floor(lifecycleErrorRetryGraceMs), 2_147_483_647)); + const timer = setTimeout(() => { + pendingTerminalLifecycleErrors.delete(evt.runId); + finalizeLifecycleEvent(evt, opts); + }, delayMs); + timer.unref?.(); + pendingTerminalLifecycleErrors.set(evt.runId, timer); + }; + const emitChatDelta = ( sessionKey: string, clientRunId: string, @@ -714,6 +844,12 @@ export function createAgentEventHandler({ }; return (evt: AgentEventPayload) => { + const lifecyclePhase = + evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null; + if (evt.stream !== "lifecycle" || lifecyclePhase !== "error") { + clearPendingTerminalLifecycleError(evt.runId); + } + const chatLink = chatRunState.registry.peek(evt.runId); const eventSessionKey = typeof evt.sessionKey === "string" && evt.sessionKey.trim() ? evt.sessionKey : undefined; @@ -800,9 +936,6 @@ export function createAgentEventHandler({ broadcast("agent", agentPayload); } - const lifecyclePhase = - evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null; - if (isControlUiVisible && sessionKey) { // Send tool events to node/channel subscribers only when verbose is enabled; // WS clients already received the event above via broadcastToConnIds. @@ -815,57 +948,26 @@ export function createAgentEventHandler({ } if (!isAborted && evt.stream === "assistant" && typeof evt.data?.text === "string") { emitChatDelta(sessionKey, clientRunId, evt.runId, evt.seq, evt.data.text, evt.data.delta); - } else if (!isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) { - const evtStopReason = - typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined; - if (chatLink) { - const finished = chatRunState.registry.shift(evt.runId); - if (!finished) { - clearAgentRunContext(evt.runId); - return; - } - emitChatFinal( - finished.sessionKey, - finished.clientRunId, - evt.runId, - evt.seq, - lifecyclePhase === "error" ? "error" : "done", - evt.data?.error, - evtStopReason, - ); - } else { - emitChatFinal( - sessionKey, - eventRunId, - evt.runId, - evt.seq, - lifecyclePhase === "error" ? "error" : "done", - evt.data?.error, - evtStopReason, - ); - } - } else if (isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) { - chatRunState.abortedRuns.delete(clientRunId); - chatRunState.abortedRuns.delete(evt.runId); - chatRunState.buffers.delete(clientRunId); - chatRunState.deltaSentAt.delete(clientRunId); - if (chatLink) { - chatRunState.registry.remove(evt.runId, clientRunId, sessionKey); - } } } - if (lifecyclePhase === "end" || lifecyclePhase === "error") { - toolEventRecipients.markFinal(evt.runId); - clearAgentRunContext(evt.runId); - agentRunSeq.delete(evt.runId); - agentRunSeq.delete(clientRunId); + if (lifecyclePhase === "error") { + clearBufferedChatState(clientRunId); + const skipChatErrorFinal = isChatSendRunActive(evt.runId) && !chatLink; + if (isAborted || lifecycleErrorRetryGraceMs <= 0) { + finalizeLifecycleEvent(evt, { skipChatErrorFinal }); + } else { + scheduleTerminalLifecycleError(evt, { skipChatErrorFinal }); + } + return; } - if ( - sessionKey && - (lifecyclePhase === "start" || lifecyclePhase === "end" || lifecyclePhase === "error") - ) { + if (lifecyclePhase === "end") { + finalizeLifecycleEvent(evt); + return; + } + + if (sessionKey && lifecyclePhase === "start") { void persistGatewaySessionLifecycleEvent({ sessionKey, event: evt }).catch(() => undefined); const sessionEventConnIds = sessionEventSubscribers.getAll(); if (sessionEventConnIds.size > 0) { diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 3bc6a6e540e..ff51d99b1da 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -971,6 +971,7 @@ export async function startGatewayServer( clearAgentRunContext, toolEventRecipients, sessionEventSubscribers, + isChatSendRunActive: (runId) => chatAbortControllers.has(runId), }), );