diff --git a/src/gateway/chat-abort.test.ts b/src/gateway/chat-abort.test.ts index c526917a7e6..f7e262c5fd8 100644 --- a/src/gateway/chat-abort.test.ts +++ b/src/gateway/chat-abort.test.ts @@ -2,12 +2,14 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { abortChatRunById, abortChatRunsForProvider, + boundInFlightRunSnapshotForChatHistory, isChatStopCommandText, registerChatAbortController, resolveAgentRunExpiresAtMs, resolveChatRunExpiresAtMs, type ChatAbortOps, type ChatAbortControllerEntry, + resolveInFlightRunSnapshot, updateChatRunProvider, } from "./chat-abort.js"; @@ -349,3 +351,262 @@ describe("abortChatRunsForProvider", () => { ); }); }); + +describe("resolveInFlightRunSnapshot", () => { + const inFlightEntry = ( + sessionKey: string, + opts?: { + agentId?: string; + aborted?: boolean; + projectSessionActive?: boolean; + startedAtMs?: number; + kind?: ChatAbortControllerEntry["kind"]; + }, + ): ChatAbortControllerEntry => { + const now = Date.now(); + const controller = new AbortController(); + if (opts?.aborted) { + controller.abort(); + } + const startedAtMs = opts?.startedAtMs ?? now; + return { + controller, + sessionId: "sess-1", + sessionKey, + agentId: opts?.agentId, + startedAtMs, + expiresAtMs: startedAtMs + 10_000, + projectSessionActive: opts?.projectSessionActive ?? true, + kind: opts?.kind, + }; + }; + + // Most cases request with requestedKey === canonicalKey; default canonical to + // the requested key unless a case exercises the requested/canonical split. + const snap = (p: { + chatAbortControllers: Map; + chatRunBuffers: Map; + sessionKey: string; + canonicalSessionKey?: string; + agentId?: string; + defaultAgentId?: string; + }) => + resolveInFlightRunSnapshot({ + chatAbortControllers: p.chatAbortControllers, + chatRunBuffers: p.chatRunBuffers, + requestedSessionKey: p.sessionKey, + canonicalSessionKey: p.canonicalSessionKey ?? p.sessionKey, + agentId: p.agentId, + defaultAgentId: p.defaultAgentId, + }); + + it("returns the live assistant text of a matching active run", () => { + const result = snap({ + chatAbortControllers: new Map([["run-1", inFlightEntry("agent:main:tui-x")]]), + chatRunBuffers: new Map([["run-1", "partial answer so far"]]), + sessionKey: "agent:main:tui-x", + }); + expect(result).toEqual({ runId: "run-1", text: "partial answer so far" }); + }); + + it("is a no-op when chatAbortControllers is not a Map (unpopulated context)", () => { + expect( + snap({ + chatAbortControllers: undefined as never, + chatRunBuffers: undefined as never, + sessionKey: "agent:main:s", + }), + ).toBeUndefined(); + }); + + it("matches a run stored under the canonical key when requested with a different key", () => { + // Abort entry holds the canonical store key; the client requests history with + // a different (requested) key for the same logical session. + const result = snap({ + chatAbortControllers: new Map([["run-1", inFlightEntry("agent:main:main")]]), + chatRunBuffers: new Map([["run-1", "partial"]]), + sessionKey: "main", + canonicalSessionKey: "agent:main:main", + }); + expect(result).toEqual({ runId: "run-1", text: "partial" }); + }); + + it("ignores aborted, completed (not projected active), and other-session runs", () => { + const variants: ChatAbortControllerEntry[] = [ + inFlightEntry("agent:main:s", { aborted: true }), + inFlightEntry("agent:main:s", { projectSessionActive: false }), + inFlightEntry("agent:main:other"), + ]; + for (const entry of variants) { + expect( + snap({ + chatAbortControllers: new Map([["run", entry]]), + chatRunBuffers: new Map([["run", "text"]]), + sessionKey: "agent:main:s", + }), + ).toBeUndefined(); + } + }); + + it("ignores hidden agent runs that are not visible chat sends", () => { + expect( + snap({ + chatAbortControllers: new Map([ + ["run-agent", inFlightEntry("agent:main:s", { kind: "agent" })], + ]), + chatRunBuffers: new Map([["run-agent", "hidden partial"]]), + sessionKey: "agent:main:s", + }), + ).toBeUndefined(); + }); + + it("treats an entry with undefined projectSessionActive as active (sessions.list contract)", () => { + const entry = inFlightEntry("agent:main:s"); + delete (entry as { projectSessionActive?: boolean }).projectSessionActive; + expect( + snap({ + chatAbortControllers: new Map([["run", entry]]), + chatRunBuffers: new Map([["run", "live partial"]]), + sessionKey: "agent:main:s", + }), + ).toEqual({ runId: "run", text: "live partial" }); + }); + + it("returns an active run with empty text (Codex streams no incremental text mid-run)", () => { + expect( + snap({ + chatAbortControllers: new Map([["run", inFlightEntry("agent:main:s")]]), + chatRunBuffers: new Map(), + sessionKey: "agent:main:s", + }), + ).toEqual({ runId: "run", text: "" }); + }); + + it("does not surface suppressed control-token lead fragments from the live buffer", () => { + expect( + snap({ + chatAbortControllers: new Map([["run", inFlightEntry("agent:main:s")]]), + chatRunBuffers: new Map([["run", "NO_"]]), + sessionKey: "agent:main:s", + }), + ).toEqual({ runId: "run", text: "" }); + }); + + it("scopes the shared global session by agent so one agent's run is not restored into another", () => { + const controllers = new Map([ + ["run-a", inFlightEntry("global", { agentId: "main" })], + ["run-b", inFlightEntry("global", { agentId: "work" })], + ]); + const buffers = new Map([ + ["run-a", "main agent global text"], + ["run-b", "work agent global text"], + ]); + expect( + snap({ + chatAbortControllers: controllers, + chatRunBuffers: buffers, + sessionKey: "global", + agentId: "work", + }), + ).toEqual({ runId: "run-b", text: "work agent global text" }); + expect( + snap({ + chatAbortControllers: controllers, + chatRunBuffers: buffers, + sessionKey: "global", + agentId: "main", + }), + ).toEqual({ runId: "run-a", text: "main agent global text" }); + }); + + it("resolves bare global history snapshots to the default agent", () => { + const controllers = new Map([ + ["run-main", inFlightEntry("global", { agentId: "main", startedAtMs: 1_000 })], + ["run-work", inFlightEntry("global", { agentId: "work", startedAtMs: 2_000 })], + ]); + const buffers = new Map([ + ["run-main", "main default text"], + ["run-work", "work global text"], + ]); + + expect( + snap({ + chatAbortControllers: controllers, + chatRunBuffers: buffers, + sessionKey: "global", + defaultAgentId: "main", + }), + ).toEqual({ runId: "run-main", text: "main default text" }); + }); + + it("prefers the newest startedAtMs when several runs match the same session+agent", () => { + // A fast restart/retry/stale-controller race can leave two active entries for + // the same key; selection must not depend on Map insertion order. Insert the + // older run first so a first-match selector would return the wrong one. + const controllers = new Map([ + ["run-old", inFlightEntry("agent:main:s", { startedAtMs: 1_000 })], + ["run-new", inFlightEntry("agent:main:s", { startedAtMs: 2_000 })], + ]); + const buffers = new Map([ + ["run-old", "stale partial"], + ["run-new", "current partial"], + ]); + expect( + snap({ + chatAbortControllers: controllers, + chatRunBuffers: buffers, + sessionKey: "agent:main:s", + }), + ).toEqual({ runId: "run-new", text: "current partial" }); + }); + + it("breaks startedAtMs ties deterministically by runId regardless of insertion order", () => { + const buffers = new Map([ + ["run-a", "a"], + ["run-b", "b"], + ]); + const ascending = new Map([ + ["run-a", inFlightEntry("agent:main:s", { startedAtMs: 5_000 })], + ["run-b", inFlightEntry("agent:main:s", { startedAtMs: 5_000 })], + ]); + const descending = new Map([ + ["run-b", inFlightEntry("agent:main:s", { startedAtMs: 5_000 })], + ["run-a", inFlightEntry("agent:main:s", { startedAtMs: 5_000 })], + ]); + // Same winner ("run-b" > "run-a") no matter which order the map was built in. + expect( + snap({ + chatAbortControllers: ascending, + chatRunBuffers: buffers, + sessionKey: "agent:main:s", + }), + ).toEqual({ runId: "run-b", text: "b" }); + expect( + snap({ + chatAbortControllers: descending, + chatRunBuffers: buffers, + sessionKey: "agent:main:s", + }), + ).toEqual({ runId: "run-b", text: "b" }); + }); + + it("keeps in-flight text when it fits the chat history budget", () => { + expect( + boundInFlightRunSnapshotForChatHistory({ + snapshot: { runId: "run-1", text: "partial" }, + messages: [], + maxBytes: 1_000, + }), + ).toEqual({ runId: "run-1", text: "partial" }); + }); + + it("drops oversized in-flight text but keeps the run id for adoption", () => { + expect( + boundInFlightRunSnapshotForChatHistory({ + snapshot: { runId: "run-1", text: "x".repeat(1_000) }, + messages: [], + maxBytes: 100, + }), + ).toEqual({ runId: "run-1", text: "" }); + }); +}); diff --git a/src/gateway/chat-abort.ts b/src/gateway/chat-abort.ts index fb9732e3186..d470eea332d 100644 --- a/src/gateway/chat-abort.ts +++ b/src/gateway/chat-abort.ts @@ -7,6 +7,8 @@ import { resolveDefaultAgentId } from "../agents/agent-scope-config.js"; import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { emitAgentEvent } from "../infra/agent-events.js"; +import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js"; +import { projectLiveAssistantBufferedText } from "./live-chat-projector.js"; const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000; @@ -162,6 +164,114 @@ function normalizeActiveAgentId(agentId: string | undefined): string | undefined return trimmed || undefined; } +/** + * Snapshot the live assistant text of any in-flight run for a session+agent. Used + * by chat.history so a run that kept streaming while the client was switched away + * — whose deltas the gateway delivered to a delivery key this client is no longer + * subscribed to — is restored on switch-back. + * + * Matches a run the same way sessions.list's active-run projection does: an abort + * entry can hold the requested key while chat run state holds the canonical store + * key, so accept a match on EITHER `requestedSessionKey` or `canonicalSessionKey`, + * scoping the shared "global" session by agent. Only runs still projected active + * (`projectSessionActive !== false`, matching sessions.list; the terminal lifecycle + * flips it to false), not aborted, and visible chat-send runs are returned, so a + * finalized run — already in persisted history — is not duplicated and hidden + * agent runs cannot be adopted by chat clients that will not receive their final + * events. + */ +export function resolveInFlightRunSnapshot(params: { + chatAbortControllers: Map; + chatRunBuffers: Map; + requestedSessionKey: string; + canonicalSessionKey: string; + agentId?: string; + defaultAgentId?: string; +}): { runId: string; text: string } | undefined { + const matchesKey = (entry: ChatAbortControllerEntry, key: string): boolean => { + if (entry.sessionKey !== key) { + return false; + } + if (key !== "global") { + return true; + } + const requestedAgentId = + normalizeActiveAgentId(params.agentId) ?? normalizeActiveAgentId(params.defaultAgentId); + if (!requestedAgentId) { + return false; + } + const runAgentId = + normalizeActiveAgentId(entry.agentId) ?? normalizeActiveAgentId(params.defaultAgentId); + return runAgentId === requestedAgentId; + }; + // Some callers/tests run without populated run state; guard like + // collectTrackedActiveSessionRuns so a missing map is a no-op, not a throw. + if (!(params.chatAbortControllers instanceof Map)) { + return undefined; + } + // Pick the newest matching run rather than the first iterated. If a fast + // restart/retry/stale-controller race leaves two active entries for the same + // (sessionKey, agentId), Map insertion order is not a meaningful selector; + // the latest `startedAtMs` is the run a switching-back client wants, and the + // runId tie-break keeps the choice deterministic when timestamps collide. + let best: { runId: string; startedAtMs: number } | undefined; + for (const [runId, entry] of params.chatAbortControllers) { + // Active unless explicitly projected inactive — mirrors sessions.list's + // collectTrackedActiveSessionRuns (`projectSessionActive !== false`), so a run + // that indicator shows active is never silently dropped here. + if ( + entry.projectSessionActive === false || + entry.controller.signal.aborted || + entry.kind === "agent" + ) { + continue; + } + if ( + !matchesKey(entry, params.requestedSessionKey) && + !matchesKey(entry, params.canonicalSessionKey) + ) { + continue; + } + const newer = best === undefined || entry.startedAtMs > best.startedAtMs; + const tie = best !== undefined && entry.startedAtMs === best.startedAtMs && runId > best.runId; + if (newer || tie) { + best = { runId, startedAtMs: entry.startedAtMs }; + } + } + if (best === undefined) { + return undefined; + } + // Adopt the run even when no assistant text is buffered yet. Some runtimes + // (e.g. Codex) do not stream incremental assistant text — the result exists + // only at completion — so there is nothing to show mid-run, but the client + // should still adopt the run and show a `streaming` status (not idle) and + // render the result cleanly when it lands. + const bufferedText = params.chatRunBuffers?.get(best.runId) ?? ""; + const projected = projectLiveAssistantBufferedText(bufferedText, { + suppressLeadFragments: true, + }); + return { runId: best.runId, text: projected.suppress ? "" : projected.text }; +} + +export function boundInFlightRunSnapshotForChatHistory(params: { + snapshot: { runId: string; text: string } | undefined; + messages: unknown[]; + maxBytes: number; +}): { runId: string; text: string } | undefined { + if (!params.snapshot?.text) { + return params.snapshot; + } + const messagesBytes = jsonUtf8Bytes(params.messages); + const snapshotBytes = jsonUtf8Bytes(params.snapshot); + if (messagesBytes + snapshotBytes <= params.maxBytes) { + return params.snapshot; + } + // The run id is the recovery contract; buffered partial text is opportunistic. + // If it would break the history payload budget, keep adoption and wait for the + // next live delta/final instead of sending an oversized chat.history response. + return { runId: params.snapshot.runId, text: "" }; +} + export type ChatAbortOps = { chatAbortControllers: Map; chatRunBuffers: Map; diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 27110dd87d4..04457ef5397 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -99,10 +99,12 @@ import { } from "../../utils/message-channel.js"; import { abortChatRunById, + boundInFlightRunSnapshotForChatHistory, type ChatAbortControllerEntry, type ChatAbortOps, isChatStopCommandText, registerChatAbortController, + resolveInFlightRunSnapshot, updateChatRunProvider, } from "../chat-abort.js"; import { @@ -2531,19 +2533,36 @@ export const chatHandlers: GatewayRequestHandlers = { agentId: selectedAgent.agentId, modelCatalog, }); + const defaultAgentId = resolveDefaultAgentId(cfg); + const activeRunAgentId = + canonicalKey === "global" ? (selectedAgent.agentId ?? defaultAgentId) : selectedAgent.agentId; sessionInfo.hasActiveRun = hasTrackedActiveSessionRun({ context, requestedKey: sessionKey, canonicalKey, - ...(canonicalKey === "global" && selectedAgent.agentId - ? { agentId: selectedAgent.agentId } - : {}), - defaultAgentId: resolveDefaultAgentId(cfg), + ...(activeRunAgentId ? { agentId: activeRunAgentId } : {}), + defaultAgentId, }); const defaults = getSessionDefaults(cfg, modelCatalog, { allowPluginNormalization: false }); const thinkingLevel = sessionInfo.thinkingLevel ?? sessionInfo.thinkingDefault; const verboseLevel = entry?.verboseLevel ?? cfg.agents?.defaults?.verboseDefault; sessionInfo.verboseLevel = verboseLevel; + // Surface any run still streaming for this session+agent so a client that + // switched away (and stopped receiving the run's per-agent-delivered events) + // can restore the in-flight assistant text on switch-back. + const inFlightRun = resolveInFlightRunSnapshot({ + chatAbortControllers: context.chatAbortControllers, + chatRunBuffers: context.chatRunBuffers, + requestedSessionKey: sessionKey, + canonicalSessionKey: resolveSessionStoreKey({ cfg, sessionKey }), + agentId: activeRunAgentId, + defaultAgentId, + }); + const boundedInFlightRun = boundInFlightRunSnapshotForChatHistory({ + snapshot: inFlightRun, + messages: bounded.messages, + maxBytes: maxHistoryBytes, + }); respond(true, { sessionKey, sessionId, @@ -2553,6 +2572,7 @@ export const chatHandlers: GatewayRequestHandlers = { thinkingLevel, fastMode: entry?.fastMode, verboseLevel, + ...(boundedInFlightRun ? { inFlightRun: boundedInFlightRun } : {}), }); }, "chat.message.get": async ({ params, respond, context }) => { diff --git a/src/tui/tui-session-actions.test.ts b/src/tui/tui-session-actions.test.ts index 5832daea6fa..37303fa00ce 100644 --- a/src/tui/tui-session-actions.test.ts +++ b/src/tui/tui-session-actions.test.ts @@ -489,6 +489,99 @@ describe("tui session actions", () => { expect(listSessions).not.toHaveBeenCalled(); }); + it("restores an in-flight run reported by chat.history on switch-back", async () => { + const loadHistory = vi.fn().mockResolvedValue({ + sessionId: "session-bg", + messages: [], + inFlightRun: { runId: "run-bg", text: "still working in the background" }, + }); + const updateAssistant = vi.fn(); + const setActivityStatus = vi.fn(); + const chatLog = { + addSystem: vi.fn(), + clearAll: vi.fn(), + addUser: vi.fn(), + finalizeAssistant: vi.fn(), + updateAssistant, + startTool: vi.fn(), + } as unknown as import("./components/chat-log.js").ChatLog; + const state = createBaseState({ currentSessionKey: "agent:main:other" }); + + const { setSession } = createTestSessionActions({ + client: { listSessions: vi.fn(), loadHistory } as unknown as TuiBackend, + chatLog, + state, + setActivityStatus, + }); + + await setSession("agent:main:main"); + + expect(updateAssistant).toHaveBeenCalledWith("still working in the background", "run-bg"); + expect(state.activeChatRunId).toBe("run-bg"); + expect(setActivityStatus).toHaveBeenLastCalledWith("streaming"); + }); + + it("adopts an in-flight run with no buffered text (Codex) and shows streaming", async () => { + const loadHistory = vi.fn().mockResolvedValue({ + sessionId: "session-bg", + messages: [], + inFlightRun: { runId: "run-bg", text: "" }, + }); + const updateAssistant = vi.fn(); + const setActivityStatus = vi.fn(); + const chatLog = { + addSystem: vi.fn(), + clearAll: vi.fn(), + addUser: vi.fn(), + finalizeAssistant: vi.fn(), + updateAssistant, + startTool: vi.fn(), + } as unknown as import("./components/chat-log.js").ChatLog; + const state = createBaseState({ currentSessionKey: "agent:main:other" }); + + const { setSession } = createTestSessionActions({ + client: { listSessions: vi.fn(), loadHistory } as unknown as TuiBackend, + chatLog, + state, + setActivityStatus, + }); + + await setSession("agent:main:main"); + + // No partial bubble (none exists), but the run is adopted and shows streaming. + expect(updateAssistant).not.toHaveBeenCalled(); + expect(state.activeChatRunId).toBe("run-bg"); + expect(setActivityStatus).toHaveBeenLastCalledWith("streaming"); + }); + + it("stays idle when chat.history reports no in-flight run", async () => { + const loadHistory = vi.fn().mockResolvedValue({ sessionId: "session-x", messages: [] }); + const updateAssistant = vi.fn(); + const setActivityStatus = vi.fn(); + const chatLog = { + addSystem: vi.fn(), + clearAll: vi.fn(), + addUser: vi.fn(), + finalizeAssistant: vi.fn(), + updateAssistant, + startTool: vi.fn(), + } as unknown as import("./components/chat-log.js").ChatLog; + const state = createBaseState({ currentSessionKey: "agent:main:other" }); + + const { setSession } = createTestSessionActions({ + client: { listSessions: vi.fn(), loadHistory } as unknown as TuiBackend, + chatLog, + state, + setActivityStatus, + }); + + await setSession("agent:main:main"); + + expect(updateAssistant).not.toHaveBeenCalled(); + expect(state.activeChatRunId).toBeNull(); + expect(setActivityStatus).toHaveBeenLastCalledWith("idle"); + }); + it("applies default model info when the current session has no persisted entry yet", async () => { const listSessions = vi.fn().mockResolvedValue({ ts: Date.now(), diff --git a/src/tui/tui-session-actions.ts b/src/tui/tui-session-actions.ts index 4d5fc021e9a..4061269359e 100644 --- a/src/tui/tui-session-actions.ts +++ b/src/tui/tui-session-actions.ts @@ -415,6 +415,7 @@ export function createSessionActions(context: SessionActionContext) { fastMode?: boolean; verboseLevel?: string; traceLevel?: string; + inFlightRun?: { runId?: unknown; text?: unknown }; }; const sessionInfo = record.sessionInfo; if (sessionInfo?.key && sessionInfo.key !== state.currentSessionKey) { @@ -499,6 +500,24 @@ export function createSessionActions(context: SessionActionContext) { ); } } + // Restore a run still streaming for this session+agent that the gateway + // reports as in-flight. Its live deltas were delivered to a per-agent key + // we stopped watching after switching away, so the persisted history above + // does not contain it; render the partial and re-adopt the run so further + // deltas (now that this session is active again) continue it. + const inFlight = record.inFlightRun; + const inFlightRunId = asString(inFlight?.runId, ""); + const inFlightText = asString(inFlight?.text, ""); + if (inFlightRunId) { + // Render any buffered partial (embedded runtimes); Codex has none mid-run. + if (inFlightText) { + chatLog.updateAssistant(inFlightText, inFlightRunId); + } + // Adopt the run regardless so its status shows `streaming` (not idle) and + // its completion is handled here instead of an unowned error path. + state.activeChatRunId = inFlightRunId; + setActivityStatus("streaming"); + } state.historyLoaded = true; void rememberSessionKey?.(state.currentSessionKey); } catch (err) {