diff --git a/CHANGELOG.md b/CHANGELOG.md index 0859cdaf61f..741fc77a9a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -340,6 +340,7 @@ Docs: https://docs.openclaw.ai - Configure/Gateway: reject literal `"undefined"`/`"null"` token input and validate gateway password prompt values to avoid invalid password-mode configs. (#13767) Thanks @omair445. - Gateway: handle async `EPIPE` on stdout/stderr during shutdown. (#13414) Thanks @keshav55. - Gateway/Control UI: resolve missing dashboard assets when `openclaw` is installed globally via symlink-based Node managers (nvm/fnm/n/Homebrew). (#14919) Thanks @aynorica. +- Gateway/Control UI: keep partial assistant output visible when runs are aborted, and persist aborted partials to session transcripts for follow-up context. - Cron: use requested `agentId` for isolated job auth resolution. (#13983) Thanks @0xRaini. - Cron: prevent cron jobs from skipping execution when `nextRunAtMs` advances. (#14068) Thanks @WalterSumbon. - Cron: pass `agentId` to `runHeartbeatOnce` for main-session jobs. (#14140) Thanks @ishikawa-pro. diff --git a/docs/web/control-ui.md b/docs/web/control-ui.md index 1c6e5ea57c5..a0c9037cb07 100644 --- a/docs/web/control-ui.md +++ b/docs/web/control-ui.md @@ -96,6 +96,10 @@ Cron jobs panel notes: - Click **Stop** (calls `chat.abort`) - Type `/stop` (or `stop|esc|abort|wait|exit|interrupt`) to abort out-of-band - `chat.abort` supports `{ sessionKey }` (no `runId`) to abort all active runs for that session +- Abort partial retention: + - When a run is aborted, partial assistant text can still be shown in the UI + - Gateway persists aborted partial assistant text into transcript history when buffered output exists + - Persisted entries include abort metadata so transcript consumers can tell abort partials from normal completion output ## Tailnet access (recommended) diff --git a/docs/web/webchat.md b/docs/web/webchat.md index a765f67598a..657e00ef8b2 100644 --- a/docs/web/webchat.md +++ b/docs/web/webchat.md @@ -25,6 +25,8 @@ Status: the macOS/iOS SwiftUI chat UI talks directly to the Gateway WebSocket. - The UI connects to the Gateway WebSocket and uses `chat.history`, `chat.send`, and `chat.inject`. - `chat.inject` appends an assistant note directly to the transcript and broadcasts it to the UI (no agent run). +- Aborted runs can keep partial assistant output visible in the UI. +- Gateway persists aborted partial assistant text into transcript history when buffered output exists, and marks those entries with abort metadata. - History is always fetched from the gateway (no local file watching). - If the gateway is unreachable, WebChat is read-only. diff --git a/src/gateway/chat-abort.test.ts b/src/gateway/chat-abort.test.ts new file mode 100644 index 00000000000..9829f45c999 --- /dev/null +++ b/src/gateway/chat-abort.test.ts @@ -0,0 +1,122 @@ +import { describe, expect, it, vi } from "vitest"; +import { + abortChatRunById, + type ChatAbortOps, + type ChatAbortControllerEntry, +} from "./chat-abort.js"; + +function createActiveEntry(sessionKey: string): ChatAbortControllerEntry { + const now = Date.now(); + return { + controller: new AbortController(), + sessionId: "sess-1", + sessionKey, + startedAtMs: now, + expiresAtMs: now + 10_000, + }; +} + +function createOps(params: { + runId: string; + entry: ChatAbortControllerEntry; + buffer?: string; +}): ChatAbortOps & { + broadcast: ReturnType; + nodeSendToSession: ReturnType; + removeChatRun: ReturnType; +} { + const { runId, entry, buffer } = params; + const broadcast = vi.fn(); + const nodeSendToSession = vi.fn(); + const removeChatRun = vi.fn(); + + return { + chatAbortControllers: new Map([[runId, entry]]), + chatRunBuffers: new Map(buffer !== undefined ? [[runId, buffer]] : []), + chatDeltaSentAt: new Map([[runId, Date.now()]]), + chatAbortedRuns: new Map(), + removeChatRun, + agentRunSeq: new Map(), + broadcast, + nodeSendToSession, + }; +} + +describe("abortChatRunById", () => { + it("broadcasts aborted payload with partial message when buffered text exists", () => { + const runId = "run-1"; + const sessionKey = "main"; + const entry = createActiveEntry(sessionKey); + const ops = createOps({ runId, entry, buffer: " Partial reply " }); + ops.agentRunSeq.set(runId, 2); + ops.agentRunSeq.set("client-run-1", 4); + ops.removeChatRun.mockReturnValue({ sessionKey, clientRunId: "client-run-1" }); + + const result = abortChatRunById(ops, { runId, sessionKey, stopReason: "user" }); + + expect(result).toEqual({ aborted: true }); + expect(entry.controller.signal.aborted).toBe(true); + expect(ops.chatAbortControllers.has(runId)).toBe(false); + expect(ops.chatRunBuffers.has(runId)).toBe(false); + expect(ops.chatDeltaSentAt.has(runId)).toBe(false); + expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey); + expect(ops.agentRunSeq.has(runId)).toBe(false); + expect(ops.agentRunSeq.has("client-run-1")).toBe(false); + + expect(ops.broadcast).toHaveBeenCalledTimes(1); + const payload = ops.broadcast.mock.calls[0]?.[1] as Record; + expect(payload).toEqual( + expect.objectContaining({ + runId, + sessionKey, + seq: 3, + state: "aborted", + stopReason: "user", + }), + ); + expect(payload.message).toEqual( + expect.objectContaining({ + role: "assistant", + content: [{ type: "text", text: " Partial reply " }], + }), + ); + expect((payload.message as { timestamp?: unknown }).timestamp).toEqual(expect.any(Number)); + expect(ops.nodeSendToSession).toHaveBeenCalledWith(sessionKey, "chat", payload); + }); + + it("omits aborted message when buffered text is empty", () => { + const runId = "run-1"; + const sessionKey = "main"; + const entry = createActiveEntry(sessionKey); + const ops = createOps({ runId, entry, buffer: " " }); + + const result = abortChatRunById(ops, { runId, sessionKey }); + + expect(result).toEqual({ aborted: true }); + const payload = ops.broadcast.mock.calls[0]?.[1] as Record; + expect(payload.message).toBeUndefined(); + }); + + it("preserves partial message even when abort listeners clear buffers synchronously", () => { + const runId = "run-1"; + const sessionKey = "main"; + const entry = createActiveEntry(sessionKey); + const ops = createOps({ runId, entry, buffer: "streamed text" }); + + // Simulate synchronous cleanup triggered by AbortController listeners. + entry.controller.signal.addEventListener("abort", () => { + ops.chatRunBuffers.delete(runId); + }); + + const result = abortChatRunById(ops, { runId, sessionKey }); + + expect(result).toEqual({ aborted: true }); + const payload = ops.broadcast.mock.calls[0]?.[1] as Record; + expect(payload.message).toEqual( + expect.objectContaining({ + role: "assistant", + content: [{ type: "text", text: "streamed text" }], + }), + ); + }); +}); diff --git a/src/gateway/chat-abort.ts b/src/gateway/chat-abort.ts index 12c47f5b189..0d544324133 100644 --- a/src/gateway/chat-abort.ts +++ b/src/gateway/chat-abort.ts @@ -52,15 +52,23 @@ function broadcastChatAborted( runId: string; sessionKey: string; stopReason?: string; + partialText?: string; }, ) { - const { runId, sessionKey, stopReason } = params; + const { runId, sessionKey, stopReason, partialText } = params; const payload = { runId, sessionKey, seq: (ops.agentRunSeq.get(runId) ?? 0) + 1, state: "aborted" as const, stopReason, + message: partialText + ? { + role: "assistant", + content: [{ type: "text", text: partialText }], + timestamp: Date.now(), + } + : undefined, }; ops.broadcast("chat", payload); ops.nodeSendToSession(sessionKey, "chat", payload); @@ -83,13 +91,15 @@ export function abortChatRunById( return { aborted: false }; } + const bufferedText = ops.chatRunBuffers.get(runId); + const partialText = bufferedText && bufferedText.trim() ? bufferedText : undefined; ops.chatAbortedRuns.set(runId, Date.now()); active.controller.abort(); ops.chatAbortControllers.delete(runId); ops.chatRunBuffers.delete(runId); ops.chatDeltaSentAt.delete(runId); const removed = ops.removeChatRun(runId, runId, sessionKey); - broadcastChatAborted(ops, { runId, sessionKey, stopReason }); + broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText }); ops.agentRunSeq.delete(runId); if (removed?.clientRunId) { ops.agentRunSeq.delete(removed.clientRunId); diff --git a/src/gateway/server-methods/chat.abort-persistence.test.ts b/src/gateway/server-methods/chat.abort-persistence.test.ts new file mode 100644 index 00000000000..cdf2dba83b0 --- /dev/null +++ b/src/gateway/server-methods/chat.abort-persistence.test.ts @@ -0,0 +1,252 @@ +import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; + +type TranscriptLine = { + message?: Record; +}; + +function createActiveRun(sessionKey: string, sessionId: string) { + const now = Date.now(); + return { + controller: new AbortController(), + sessionId, + sessionKey, + startedAtMs: now, + expiresAtMs: now + 30_000, + }; +} + +async function writeTranscriptHeader(transcriptPath: string, sessionId: string) { + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: sessionId, + timestamp: new Date(0).toISOString(), + cwd: "/tmp", + }; + await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, "utf-8"); +} + +async function readTranscriptLines(transcriptPath: string): Promise { + const raw = await fs.readFile(transcriptPath, "utf-8"); + return raw + .split(/\r?\n/) + .filter((line) => line.trim().length > 0) + .map((line) => { + try { + return JSON.parse(line) as TranscriptLine; + } catch { + return {}; + } + }); +} + +async function importChatHandlersWithSession(transcriptPath: string, sessionId: string) { + vi.resetModules(); + vi.doMock("../session-utils.js", async (importOriginal) => { + const original = await importOriginal(); + return { + ...original, + loadSessionEntry: () => ({ + cfg: {}, + storePath: path.join(path.dirname(transcriptPath), "sessions.json"), + entry: { + sessionId, + sessionFile: transcriptPath, + }, + canonicalKey: "main", + }), + }; + }); + return import("./chat.js"); +} + +afterEach(() => { + vi.restoreAllMocks(); + vi.resetModules(); +}); + +describe("chat abort transcript persistence", () => { + it("persists run-scoped abort partial with rpc metadata and idempotency", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-abort-run-")); + const transcriptPath = path.join(dir, "sess-main.jsonl"); + const sessionId = "sess-main"; + const runId = "idem-abort-run-1"; + await writeTranscriptHeader(transcriptPath, sessionId); + + const { chatHandlers } = await importChatHandlersWithSession(transcriptPath, sessionId); + const respond = vi.fn(); + const context = { + chatAbortControllers: new Map([[runId, createActiveRun("main", sessionId)]]), + chatRunBuffers: new Map([[runId, "Partial from run abort"]]), + chatDeltaSentAt: new Map([[runId, Date.now()]]), + chatAbortedRuns: new Map(), + removeChatRun: vi + .fn() + .mockReturnValue({ sessionKey: "main", clientRunId: "client-idem-abort-run-1" }), + agentRunSeq: new Map([ + [runId, 2], + ["client-idem-abort-run-1", 3], + ]), + broadcast: vi.fn(), + nodeSendToSession: vi.fn(), + logGateway: { warn: vi.fn() }, + }; + + await chatHandlers["chat.abort"]({ + params: { sessionKey: "main", runId }, + respond, + context: context as never, + }); + + const [ok1, payload1] = respond.mock.calls.at(-1) ?? []; + expect(ok1).toBe(true); + expect(payload1).toMatchObject({ aborted: true, runIds: [runId] }); + + context.chatAbortControllers.set(runId, createActiveRun("main", sessionId)); + context.chatRunBuffers.set(runId, "Partial from run abort"); + context.chatDeltaSentAt.set(runId, Date.now()); + + await chatHandlers["chat.abort"]({ + params: { sessionKey: "main", runId }, + respond, + context: context as never, + }); + + const lines = await readTranscriptLines(transcriptPath); + const persisted = lines + .map((line) => line.message) + .filter( + (message): message is Record => + Boolean(message) && message?.idempotencyKey === `${runId}:assistant`, + ); + + expect(persisted).toHaveLength(1); + expect(persisted[0]).toMatchObject({ + stopReason: "stop", + idempotencyKey: `${runId}:assistant`, + openclawAbort: { + aborted: true, + origin: "rpc", + runId, + }, + }); + }); + + it("persists session-scoped abort partials with rpc metadata", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-abort-session-")); + const transcriptPath = path.join(dir, "sess-main.jsonl"); + const sessionId = "sess-main"; + await writeTranscriptHeader(transcriptPath, sessionId); + + const { chatHandlers } = await importChatHandlersWithSession(transcriptPath, sessionId); + const respond = vi.fn(); + const context = { + chatAbortControllers: new Map([ + ["run-a", createActiveRun("main", sessionId)], + ["run-b", createActiveRun("main", sessionId)], + ]), + chatRunBuffers: new Map([ + ["run-a", "Session abort partial"], + ["run-b", " "], + ]), + chatDeltaSentAt: new Map([ + ["run-a", Date.now()], + ["run-b", Date.now()], + ]), + chatAbortedRuns: new Map(), + removeChatRun: vi + .fn() + .mockImplementation((run: string) => ({ sessionKey: "main", clientRunId: run })), + agentRunSeq: new Map(), + broadcast: vi.fn(), + nodeSendToSession: vi.fn(), + logGateway: { warn: vi.fn() }, + }; + + await chatHandlers["chat.abort"]({ + params: { sessionKey: "main" }, + respond, + context: context as never, + }); + + const [ok, payload] = respond.mock.calls.at(-1) ?? []; + expect(ok).toBe(true); + expect(payload).toMatchObject({ aborted: true }); + expect(payload.runIds).toEqual(expect.arrayContaining(["run-a", "run-b"])); + + const lines = await readTranscriptLines(transcriptPath); + const runAPersisted = lines + .map((line) => line.message) + .find((message) => message?.idempotencyKey === "run-a:assistant"); + const runBPersisted = lines + .map((line) => line.message) + .find((message) => message?.idempotencyKey === "run-b:assistant"); + + expect(runAPersisted).toMatchObject({ + idempotencyKey: "run-a:assistant", + openclawAbort: { + aborted: true, + origin: "rpc", + runId: "run-a", + }, + }); + expect(runBPersisted).toBeUndefined(); + }); + + it("persists /stop partials with stop-command metadata", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-stop-")); + const transcriptPath = path.join(dir, "sess-main.jsonl"); + const sessionId = "sess-main"; + await writeTranscriptHeader(transcriptPath, sessionId); + + const { chatHandlers } = await importChatHandlersWithSession(transcriptPath, sessionId); + const respond = vi.fn(); + const context = { + chatAbortControllers: new Map([["run-stop-1", createActiveRun("main", sessionId)]]), + chatRunBuffers: new Map([["run-stop-1", "Partial from /stop"]]), + chatDeltaSentAt: new Map([["run-stop-1", Date.now()]]), + chatAbortedRuns: new Map(), + removeChatRun: vi.fn().mockReturnValue({ sessionKey: "main", clientRunId: "client-stop-1" }), + agentRunSeq: new Map([["run-stop-1", 1]]), + broadcast: vi.fn(), + nodeSendToSession: vi.fn(), + logGateway: { warn: vi.fn() }, + dedupe: { + get: vi.fn(), + }, + }; + + await chatHandlers["chat.send"]({ + params: { + sessionKey: "main", + message: "/stop", + idempotencyKey: "idem-stop-req", + }, + respond, + context: context as never, + client: undefined, + }); + + const [ok, payload] = respond.mock.calls.at(-1) ?? []; + expect(ok).toBe(true); + expect(payload).toMatchObject({ aborted: true, runIds: ["run-stop-1"] }); + + const lines = await readTranscriptLines(transcriptPath); + const persisted = lines + .map((line) => line.message) + .find((message) => message?.idempotencyKey === "run-stop-1:assistant"); + + expect(persisted).toMatchObject({ + idempotencyKey: "run-stop-1:assistant", + openclawAbort: { + aborted: true, + origin: "stop-command", + runId: "run-stop-1", + }, + }); + }); +}); diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 52225dc0bb7..7d6c74a1505 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -15,6 +15,7 @@ import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js"; import { abortChatRunById, abortChatRunsForSessionKey, + type ChatAbortControllerEntry, isChatStopCommandText, resolveChatRunExpiresAtMs, } from "../chat-abort.js"; @@ -49,6 +50,14 @@ type TranscriptAppendResult = { }; type AppendMessageArg = Parameters[0]; +type AbortOrigin = "rpc" | "stop-command"; + +type AbortedPartialSnapshot = { + runId: string; + sessionId: string; + text: string; + abortOrigin: AbortOrigin; +}; function stripDisallowedChatControlChars(message: string): string { let output = ""; @@ -116,6 +125,24 @@ function ensureTranscriptFile(params: { transcriptPath: string; sessionId: strin } } +function transcriptHasIdempotencyKey(transcriptPath: string, idempotencyKey: string): boolean { + try { + const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/); + for (const line of lines) { + if (!line.trim()) { + continue; + } + const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } }; + if (parsed?.message?.idempotencyKey === idempotencyKey) { + return true; + } + } + return false; + } catch { + return false; + } +} + function appendAssistantTranscriptMessage(params: { message: string; label?: string; @@ -124,6 +151,12 @@ function appendAssistantTranscriptMessage(params: { sessionFile?: string; agentId?: string; createIfMissing?: boolean; + idempotencyKey?: string; + abortMeta?: { + aborted: true; + origin: AbortOrigin; + runId: string; + }; }): TranscriptAppendResult { const transcriptPath = resolveTranscriptPath({ sessionId: params.sessionId, @@ -148,6 +181,10 @@ function appendAssistantTranscriptMessage(params: { } } + if (params.idempotencyKey && transcriptHasIdempotencyKey(transcriptPath, params.idempotencyKey)) { + return { ok: true }; + } + const now = Date.now(); const labelPrefix = params.label ? `[${params.label}]\n\n` : ""; const usage = { @@ -176,6 +213,16 @@ function appendAssistantTranscriptMessage(params: { api: "openai-responses", provider: "openclaw", model: "gateway-injected", + ...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}), + ...(params.abortMeta + ? { + openclawAbort: { + aborted: true, + origin: params.abortMeta.origin, + runId: params.abortMeta.runId, + }, + } + : {}), }; try { @@ -189,6 +236,63 @@ function appendAssistantTranscriptMessage(params: { } } +function collectSessionAbortPartials(params: { + chatAbortControllers: Map; + chatRunBuffers: Map; + sessionKey: string; + abortOrigin: AbortOrigin; +}): AbortedPartialSnapshot[] { + const out: AbortedPartialSnapshot[] = []; + for (const [runId, active] of params.chatAbortControllers) { + if (active.sessionKey !== params.sessionKey) { + continue; + } + const text = params.chatRunBuffers.get(runId); + if (!text || !text.trim()) { + continue; + } + out.push({ + runId, + sessionId: active.sessionId, + text, + abortOrigin: params.abortOrigin, + }); + } + return out; +} + +function persistAbortedPartials(params: { + context: Pick; + sessionKey: string; + snapshots: AbortedPartialSnapshot[]; +}) { + if (params.snapshots.length === 0) { + return; + } + const { storePath, entry } = loadSessionEntry(params.sessionKey); + for (const snapshot of params.snapshots) { + const sessionId = entry?.sessionId ?? snapshot.sessionId ?? snapshot.runId; + const appended = appendAssistantTranscriptMessage({ + message: snapshot.text, + sessionId, + storePath, + sessionFile: entry?.sessionFile, + createIfMissing: true, + idempotencyKey: `${snapshot.runId}:assistant`, + abortMeta: { + aborted: true, + origin: snapshot.abortOrigin, + runId: snapshot.runId, + }, + }); + if (!appended.ok) { + params.context.logGateway.warn( + `chat.abort transcript append failed: ${appended.error ?? "unknown error"}`, + ); + } + } +} + function nextChatSeq(context: { agentRunSeq: Map }, runId: string) { const next = (context.agentRunSeq.get(runId) ?? 0) + 1; context.agentRunSeq.set(runId, next); @@ -299,7 +403,7 @@ export const chatHandlers: GatewayRequestHandlers = { ); return; } - const { sessionKey, runId } = params as { + const { sessionKey: rawSessionKey, runId } = params as { sessionKey: string; runId?: string; }; @@ -316,10 +420,23 @@ export const chatHandlers: GatewayRequestHandlers = { }; if (!runId) { + const snapshots = collectSessionAbortPartials({ + chatAbortControllers: context.chatAbortControllers, + chatRunBuffers: context.chatRunBuffers, + sessionKey: rawSessionKey, + abortOrigin: "rpc", + }); const res = abortChatRunsForSessionKey(ops, { - sessionKey, + sessionKey: rawSessionKey, stopReason: "rpc", }); + if (res.aborted) { + persistAbortedPartials({ + context, + sessionKey: rawSessionKey, + snapshots, + }); + } respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds }); return; } @@ -329,7 +446,7 @@ export const chatHandlers: GatewayRequestHandlers = { respond(true, { ok: true, aborted: false, runIds: [] }); return; } - if (active.sessionKey !== sessionKey) { + if (active.sessionKey !== rawSessionKey) { respond( false, undefined, @@ -338,11 +455,26 @@ export const chatHandlers: GatewayRequestHandlers = { return; } + const partialText = context.chatRunBuffers.get(runId); const res = abortChatRunById(ops, { runId, - sessionKey, + sessionKey: rawSessionKey, stopReason: "rpc", }); + if (res.aborted && partialText && partialText.trim()) { + persistAbortedPartials({ + context, + sessionKey: rawSessionKey, + snapshots: [ + { + runId, + sessionId: active.sessionId, + text: partialText, + abortOrigin: "rpc", + }, + ], + }); + } respond(true, { ok: true, aborted: res.aborted, @@ -437,6 +569,12 @@ export const chatHandlers: GatewayRequestHandlers = { } if (stopCommand) { + const snapshots = collectSessionAbortPartials({ + chatAbortControllers: context.chatAbortControllers, + chatRunBuffers: context.chatRunBuffers, + sessionKey: rawSessionKey, + abortOrigin: "stop-command", + }); const res = abortChatRunsForSessionKey( { chatAbortControllers: context.chatAbortControllers, @@ -450,6 +588,13 @@ export const chatHandlers: GatewayRequestHandlers = { }, { sessionKey: rawSessionKey, stopReason: "stop" }, ); + if (res.aborted) { + persistAbortedPartials({ + context, + sessionKey: rawSessionKey, + snapshots, + }); + } respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds }); return; } diff --git a/ui/src/ui/controllers/chat.test.ts b/ui/src/ui/controllers/chat.test.ts index b99c38cae1d..2989092ae3b 100644 --- a/ui/src/ui/controllers/chat.test.ts +++ b/ui/src/ui/controllers/chat.test.ts @@ -92,4 +92,125 @@ describe("handleChatEvent", () => { expect(state.chatStream).toBe(null); expect(state.chatStreamStartedAt).toBe(null); }); + + it("processes aborted from own run and keeps partial assistant message", () => { + const existingMessage = { + role: "user", + content: [{ type: "text", text: "Hi" }], + timestamp: 1, + }; + const partialMessage = { + role: "assistant", + content: [{ type: "text", text: "Partial reply" }], + timestamp: 2, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: "Partial reply", + chatStreamStartedAt: 100, + chatMessages: [existingMessage], + }); + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "aborted", + message: partialMessage, + }; + + expect(handleChatEvent(state, payload)).toBe("aborted"); + expect(state.chatRunId).toBe(null); + expect(state.chatStream).toBe(null); + expect(state.chatStreamStartedAt).toBe(null); + expect(state.chatMessages).toEqual([existingMessage, partialMessage]); + }); + + it("falls back to streamed partial when aborted payload message is invalid", () => { + const existingMessage = { + role: "user", + content: [{ type: "text", text: "Hi" }], + timestamp: 1, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: "Partial reply", + chatStreamStartedAt: 100, + chatMessages: [existingMessage], + }); + const payload = { + runId: "run-1", + sessionKey: "main", + state: "aborted", + message: "not-an-assistant-message", + } as unknown as ChatEventPayload; + + expect(handleChatEvent(state, payload)).toBe("aborted"); + expect(state.chatRunId).toBe(null); + expect(state.chatStream).toBe(null); + expect(state.chatStreamStartedAt).toBe(null); + expect(state.chatMessages).toHaveLength(2); + expect(state.chatMessages[0]).toEqual(existingMessage); + expect(state.chatMessages[1]).toMatchObject({ + role: "assistant", + content: [{ type: "text", text: "Partial reply" }], + }); + }); + + it("falls back to streamed partial when aborted payload has non-assistant role", () => { + const existingMessage = { + role: "user", + content: [{ type: "text", text: "Hi" }], + timestamp: 1, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: "Partial reply", + chatStreamStartedAt: 100, + chatMessages: [existingMessage], + }); + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "aborted", + message: { + role: "user", + content: [{ type: "text", text: "unexpected" }], + }, + }; + + expect(handleChatEvent(state, payload)).toBe("aborted"); + expect(state.chatMessages).toHaveLength(2); + expect(state.chatMessages[1]).toMatchObject({ + role: "assistant", + content: [{ type: "text", text: "Partial reply" }], + }); + }); + + it("processes aborted from own run without message and empty stream", () => { + const existingMessage = { + role: "user", + content: [{ type: "text", text: "Hi" }], + timestamp: 1, + }; + const state = createState({ + sessionKey: "main", + chatRunId: "run-1", + chatStream: "", + chatStreamStartedAt: 100, + chatMessages: [existingMessage], + }); + const payload: ChatEventPayload = { + runId: "run-1", + sessionKey: "main", + state: "aborted", + }; + + expect(handleChatEvent(state, payload)).toBe("aborted"); + expect(state.chatRunId).toBe(null); + expect(state.chatStream).toBe(null); + expect(state.chatStreamStartedAt).toBe(null); + expect(state.chatMessages).toEqual([existingMessage]); + }); }); diff --git a/ui/src/ui/controllers/chat.ts b/ui/src/ui/controllers/chat.ts index 127e03dd4d2..4990b6a7aa3 100644 --- a/ui/src/ui/controllers/chat.ts +++ b/ui/src/ui/controllers/chat.ts @@ -58,6 +58,20 @@ function dataUrlToBase64(dataUrl: string): { content: string; mimeType: string } return { mimeType: match[1], content: match[2] }; } +function normalizeAbortedAssistantMessage(message: unknown): Record | null { + if (!message || typeof message !== "object") { + return null; + } + const candidate = message as Record; + if (candidate.role !== "assistant") { + return null; + } + if (!("content" in candidate) || !Array.isArray(candidate.content)) { + return null; + } + return candidate; +} + export async function sendChatMessage( state: ChatState, message: string, @@ -198,6 +212,22 @@ export function handleChatEvent(state: ChatState, payload?: ChatEventPayload) { state.chatRunId = null; state.chatStreamStartedAt = null; } else if (payload.state === "aborted") { + const normalizedMessage = normalizeAbortedAssistantMessage(payload.message); + if (normalizedMessage) { + state.chatMessages = [...state.chatMessages, normalizedMessage]; + } else { + const streamedText = state.chatStream ?? ""; + if (streamedText.trim()) { + state.chatMessages = [ + ...state.chatMessages, + { + role: "assistant", + content: [{ type: "text", text: streamedText }], + timestamp: Date.now(), + }, + ]; + } + } state.chatStream = null; state.chatRunId = null; state.chatStreamStartedAt = null;