diff --git a/CHANGELOG.md b/CHANGELOG.md index c54ac1d38b5..5fcc14a08f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ Docs: https://docs.openclaw.ai - Plugins/update: skip ClawHub and marketplace plugin updates when the bundled version is newer than the recorded installed version, so `openclaw update` no longer overwrites working bundled plugins with older external packages. Fixes #75447. Thanks @amknight. - Gateway/sessions: use bounded tail reads for sessions-list transcript usage fallbacks and cap bulk title/last-message hydration, keeping large session stores responsive when rows request derived previews. Thanks @vincentkoc. - Gateway/sessions: yield during bulk transcript title/preview hydration and copy compaction checkpoints asynchronously, keeping the Gateway event loop responsive for large session stores and large transcripts. Refs #75330 and #75414. Thanks @amknight. +- Gateway/sessions: stream bounded transcript reads for session detail, history, artifacts, compaction, and send/subscribe sequence paths so small Gateway requests no longer materialize large transcripts or OOM on oversized session logs. Thanks @vincentkoc. - Gateway/chat: bound chat-history transcript reads to the requested display window so large session logs no longer OOM the Gateway when clients ask for a small history page. Thanks @vincentkoc. - Voice Call/Twilio: honor stored pre-connect TwiML before realtime webhook shortcuts and reject DTMF sequences outside conversation mode, so Meet PIN entry cannot be skipped or silently dropped. Thanks @donkeykong91 and @PfanP. - Docs/sandboxing: clarify that sandbox setup scripts (`sandbox-setup.sh`, `sandbox-common-setup.sh`, `sandbox-browser-setup.sh`) are only available from a source checkout, and add inline `docker build` commands for npm-installed users so sandbox image setup works without cloning the repo. Fixes #75485. Thanks @amknight. diff --git a/src/gateway/server-methods/artifacts.test.ts b/src/gateway/server-methods/artifacts.test.ts index e56a13e10af..91fe25d6a3f 100644 --- a/src/gateway/server-methods/artifacts.test.ts +++ b/src/gateway/server-methods/artifacts.test.ts @@ -4,7 +4,7 @@ import { artifactsHandlers, collectArtifactsFromMessages } from "./artifacts.js" const hoisted = vi.hoisted(() => ({ getTaskSessionLookupByIdForStatus: vi.fn(), loadSessionEntry: vi.fn(), - readSessionMessages: vi.fn(), + visitSessionMessages: vi.fn(), resolveSessionKeyForRun: vi.fn(), })); @@ -17,7 +17,7 @@ vi.mock("../session-utils.js", async () => { return { ...actual, loadSessionEntry: hoisted.loadSessionEntry, - readSessionMessages: hoisted.readSessionMessages, + visitSessionMessages: hoisted.visitSessionMessages, }; }); @@ -49,7 +49,7 @@ describe("artifacts RPC handlers", () => { storePath: "/tmp/sessions.json", entry: { sessionId: "sess-main", sessionFile: "/tmp/sess-main.jsonl" }, }); - hoisted.readSessionMessages.mockReturnValue([ + mockedMessages([ { role: "assistant", content: [ @@ -66,6 +66,15 @@ describe("artifacts RPC handlers", () => { ]); }); + function mockedMessages(messages: unknown[]) { + hoisted.visitSessionMessages.mockImplementation( + (_sessionId, _storePath, _sessionFile, visit) => { + messages.forEach((message, index) => visit(message, index + 1)); + return messages.length; + }, + ); + } + it("lists stable transcript artifact summaries by sessionKey", async () => { const { calls, respond } = createResponder(); @@ -99,7 +108,21 @@ describe("artifacts RPC handlers", () => { it("gets and downloads an inline artifact", async () => { const listed = collectArtifactsFromMessages({ sessionKey: "agent:main:main", - messages: hoisted.readSessionMessages(), + messages: [ + { + role: "assistant", + content: [ + { type: "text", text: "see attached" }, + { + type: "image", + data: "aGVsbG8=", + mimeType: "image/png", + alt: "result.png", + }, + ], + __openclaw: { seq: 2 }, + }, + ], }); const artifactId = listed[0]?.id; expect(artifactId).toBeTruthy(); @@ -137,7 +160,7 @@ describe("artifacts RPC handlers", () => { it("resolves runId queries through the gateway run-to-session lookup", async () => { hoisted.resolveSessionKeyForRun.mockReturnValue("agent:main:main"); - hoisted.readSessionMessages.mockReturnValue([ + mockedMessages([ { role: "assistant", content: [{ type: "image", data: "aGVsbG8=", alt: "run-result.png" }], @@ -166,7 +189,7 @@ describe("artifacts RPC handlers", () => { requesterSessionKey: "agent:main:main", runId: "run-for-task-1", }); - hoisted.readSessionMessages.mockReturnValue([ + mockedMessages([ { role: "assistant", content: [{ type: "image", data: "dGFyZ2V0", alt: "task-result.png" }], @@ -257,7 +280,7 @@ describe("artifacts RPC handlers", () => { }); it("discovers transcript image_url data blocks", async () => { - hoisted.readSessionMessages.mockReturnValue([ + mockedMessages([ { role: "user", content: [ diff --git a/src/gateway/server-methods/artifacts.ts b/src/gateway/server-methods/artifacts.ts index e48a1c806c5..269326783f8 100644 --- a/src/gateway/server-methods/artifacts.ts +++ b/src/gateway/server-methods/artifacts.ts @@ -10,7 +10,7 @@ import { validateArtifactsListParams, } from "../protocol/index.js"; import { resolveSessionKeyForRun } from "../server-session-key.js"; -import { loadSessionEntry, readSessionMessages } from "../session-utils.js"; +import { loadSessionEntry, visitSessionMessages } from "../session-utils.js"; import type { GatewayRequestHandlers, RespondFn } from "./types.js"; import { assertValidParams } from "./validation.js"; @@ -215,61 +215,72 @@ export function collectArtifactsFromMessages(params: { const artifacts: ArtifactRecord[] = []; let messageFallbackSeq = 0; for (const message of params.messages) { - const msg = asRecord(message); - if (!msg) { - continue; - } messageFallbackSeq += 1; - const messageSeq = resolveMessageSeq(msg, messageFallbackSeq); - const messageRunId = resolveMessageRunId(msg); - const messageTaskId = resolveMessageTaskId(msg); - if (params.runId && messageRunId !== params.runId) { - continue; - } - if (params.taskId && messageTaskId !== params.taskId) { - continue; - } - const content = Array.isArray(msg.content) ? msg.content : []; - for (let contentIndex = 0; contentIndex < content.length; contentIndex += 1) { - const block = asRecord(content[contentIndex]); - if (!block || !isArtifactBlock(block)) { - continue; - } - const type = normalizeArtifactType(asNonEmptyString(block.type) ?? "file"); - const title = - asNonEmptyString(block.title) ?? - asNonEmptyString(block.fileName) ?? - asNonEmptyString(block.filename) ?? - asNonEmptyString(block.alt) ?? - `${type} ${artifacts.length + 1}`; - const download = resolveBlockDownload(block); - const summary: ArtifactRecord = { - id: artifactId({ - sessionKey: params.sessionKey, - messageSeq, - contentIndex, - title, - type, - }), - type, - title, - ...(download.mimeType ? { mimeType: download.mimeType } : {}), - ...(download.sizeBytes !== undefined ? { sizeBytes: download.sizeBytes } : {}), - sessionKey: params.sessionKey, - ...(messageRunId ? { runId: messageRunId } : {}), - ...(messageTaskId ? { taskId: messageTaskId } : {}), - messageSeq, - source: "session-transcript", - download: { mode: download.mode }, - ...(download.data ? { data: download.data } : {}), - ...(download.url ? { url: download.url } : {}), - }; - artifacts.push(summary); - } + collectArtifactsFromMessage({ ...params, message, messageFallbackSeq, artifacts }); } return artifacts; } +function collectArtifactsFromMessage(params: { + message: unknown; + messageFallbackSeq: number; + artifacts: ArtifactRecord[]; + sessionKey: string; + runId?: string; + taskId?: string; +}): void { + const msg = asRecord(params.message); + if (!msg) { + return; + } + const messageSeq = resolveMessageSeq(msg, params.messageFallbackSeq); + const messageRunId = resolveMessageRunId(msg); + const messageTaskId = resolveMessageTaskId(msg); + if (params.runId && messageRunId !== params.runId) { + return; + } + if (params.taskId && messageTaskId !== params.taskId) { + return; + } + const content = Array.isArray(msg.content) ? msg.content : []; + for (let contentIndex = 0; contentIndex < content.length; contentIndex += 1) { + const block = asRecord(content[contentIndex]); + if (!block || !isArtifactBlock(block)) { + continue; + } + const type = normalizeArtifactType(asNonEmptyString(block.type) ?? "file"); + const title = + asNonEmptyString(block.title) ?? + asNonEmptyString(block.fileName) ?? + asNonEmptyString(block.filename) ?? + asNonEmptyString(block.alt) ?? + `${type} ${params.artifacts.length + 1}`; + const download = resolveBlockDownload(block); + const summary: ArtifactRecord = { + id: artifactId({ + sessionKey: params.sessionKey, + messageSeq, + contentIndex, + title, + type, + }), + type, + title, + ...(download.mimeType ? { mimeType: download.mimeType } : {}), + ...(download.sizeBytes !== undefined ? { sizeBytes: download.sizeBytes } : {}), + sessionKey: params.sessionKey, + ...(messageRunId ? { runId: messageRunId } : {}), + ...(messageTaskId ? { taskId: messageTaskId } : {}), + messageSeq, + source: "session-transcript", + download: { mode: download.mode }, + ...(download.data ? { data: download.data } : {}), + ...(download.url ? { url: download.url } : {}), + }; + params.artifacts.push(summary); + } +} + function resolveQuerySessionKey(query: ArtifactQuery): string | undefined { if (query.sessionKey) { return query.sessionKey; @@ -296,16 +307,23 @@ function loadArtifacts(query: ArtifactQuery): { artifacts: ArtifactRecord[]; ses } const { storePath, entry } = loadSessionEntry(sessionKey); const sessionId = entry?.sessionId; - const messages = - sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : []; - return { - sessionKey, - artifacts: collectArtifactsFromMessages({ - messages, + if (!sessionId || !storePath) { + return { sessionKey, artifacts: [] }; + } + const artifacts: ArtifactRecord[] = []; + visitSessionMessages(sessionId, storePath, entry?.sessionFile, (message, seq) => { + collectArtifactsFromMessage({ + message, + messageFallbackSeq: seq, + artifacts, sessionKey, runId: query.runId, taskId: query.taskId, - }), + }); + }); + return { + sessionKey, + artifacts, }; } diff --git a/src/gateway/server-methods/chat-transcript-inject.ts b/src/gateway/server-methods/chat-transcript-inject.ts index 60fe9a83a9b..e8745d72876 100644 --- a/src/gateway/server-methods/chat-transcript-inject.ts +++ b/src/gateway/server-methods/chat-transcript-inject.ts @@ -1,9 +1,14 @@ +import { randomUUID } from "node:crypto"; +import fs from "node:fs"; +import { StringDecoder } from "node:string_decoder"; import { SessionManager } from "@mariozechner/pi-coding-agent"; import { formatErrorMessage } from "../../infra/errors.js"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; type AppendMessageArg = Parameters[0]; +const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024; + export type GatewayInjectedAbortMeta = { aborted: true; origin: "rpc" | "stop-command"; @@ -41,6 +46,77 @@ function resolveInjectedAssistantContent(params: { return [{ type: "text", text: `${labelPrefix}${params.message}` }]; } +function transcriptHasParentLinkedEntries(transcriptPath: string): boolean { + let fd: number | null = null; + try { + fd = fs.openSync(transcriptPath, "r"); + const decoder = new StringDecoder("utf8"); + const buffer = Buffer.allocUnsafe(64 * 1024); + let carry = ""; + while (true) { + const bytesRead = fs.readSync(fd, buffer, 0, buffer.length, null); + if (bytesRead <= 0) { + break; + } + const text = carry + decoder.write(buffer.subarray(0, bytesRead)); + const lines = text.split(/\r?\n/); + carry = lines.pop() ?? ""; + for (const line of lines) { + if (lineHasParentLinkedEntry(line)) { + return true; + } + } + } + return lineHasParentLinkedEntry(carry + decoder.end()); + } catch { + return true; + } finally { + if (fd !== null) { + fs.closeSync(fd); + } + } +} + +function lineHasParentLinkedEntry(line: string): boolean { + if (!line.trim()) { + return false; + } + try { + const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown }; + return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed; + } catch { + return false; + } +} + +function shouldUseRawAppend(transcriptPath: string): boolean { + try { + const stat = fs.statSync(transcriptPath); + return ( + stat.size > SESSION_MANAGER_APPEND_MAX_BYTES && + !transcriptHasParentLinkedEntries(transcriptPath) + ); + } catch { + return false; + } +} + +function appendRawAssistantMessageToTranscript(params: { + transcriptPath: string; + message: AppendMessageArg & Record; + now: number; +}): { messageId: string } { + const messageId = randomUUID(); + const entry = { + type: "message", + id: messageId, + timestamp: new Date(params.now).toISOString(), + message: params.message, + }; + fs.appendFileSync(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8"); + return { messageId }; +} + export function appendInjectedAssistantMessageToTranscript(params: { transcriptPath: string; message: string; @@ -100,6 +176,20 @@ export function appendInjectedAssistantMessageToTranscript(params: { }; try { + if (shouldUseRawAppend(params.transcriptPath)) { + const { messageId } = appendRawAssistantMessageToTranscript({ + transcriptPath: params.transcriptPath, + message: messageBody, + now, + }); + emitSessionTranscriptUpdate({ + sessionFile: params.transcriptPath, + message: messageBody, + messageId, + }); + return { ok: true, messageId, message: messageBody }; + } + // IMPORTANT: Use SessionManager so the entry is attached to the current leaf via parentId. // Raw jsonl appends break the parent chain and can hide compaction summaries from context. const sessionManager = SessionManager.open(params.transcriptPath); diff --git a/src/gateway/server-methods/chat.inject.parentid.test.ts b/src/gateway/server-methods/chat.inject.parentid.test.ts index 12a4a38f114..0da7639878a 100644 --- a/src/gateway/server-methods/chat.inject.parentid.test.ts +++ b/src/gateway/server-methods/chat.inject.parentid.test.ts @@ -34,4 +34,43 @@ describe("gateway chat.inject transcript writes", () => { fs.rmSync(dir, { recursive: true, force: true }); } }); + + it("uses raw append for oversized append-only transcripts", async () => { + const { dir, transcriptPath } = createTranscriptFixtureSync({ + prefix: "openclaw-chat-inject-large-", + sessionId: "sess-1", + }); + + try { + fs.appendFileSync( + transcriptPath, + `${JSON.stringify({ + type: "message", + id: "legacy-large-message", + message: { + role: "assistant", + content: [{ type: "text", text: "x".repeat(9 * 1024 * 1024) }], + }, + })}\n`, + "utf-8", + ); + + const appended = appendInjectedAssistantMessageToTranscript({ + transcriptPath, + message: "hello", + }); + expect(appended.ok).toBe(true); + expect(appended.messageId).toBeTruthy(); + + const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/).filter(Boolean); + const last = JSON.parse(lines.at(-1) as string) as Record; + + expect(last.type).toBe("message"); + expect(last).toHaveProperty("id", appended.messageId); + expect(last).toHaveProperty("message"); + expect(Object.prototype.hasOwnProperty.call(last, "parentId")).toBe(false); + } finally { + fs.rmSync(dir, { recursive: true, force: true }); + } + }); }); diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index 66039018672..e66b4c1ff00 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -596,7 +596,8 @@ describe("gateway chat transcript writes (guardrail)", () => { expect(chatSrc.includes("fs.appendFileSync(transcriptPath")).toBe(false); expect(chatSrc).toContain("appendInjectedAssistantMessageToTranscript("); - expect(helperSrc.includes("fs.appendFileSync(params.transcriptPath")).toBe(false); + expect(helperSrc).toContain("function shouldUseRawAppend("); + expect(helperSrc).toContain("function appendRawAssistantMessageToTranscript("); expect(helperSrc).toContain("SessionManager.open(params.transcriptPath)"); expect(helperSrc).toContain("appendMessage(messageBody)"); }); diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index c9acc1bdf43..97491506baf 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -77,6 +77,9 @@ import { loadGatewaySessionRow, loadSessionEntry, migrateAndPruneGatewaySessionStoreKey, + readRecentSessionMessagesWithStats, + readRecentSessionTranscriptLines, + readSessionMessageCount, readSessionPreviewItemsFromTranscript, resolveDeletedAgentIdFromSessionKey, resolveFreshestSessionEntryFromStoreKeys, @@ -87,7 +90,6 @@ import { type SessionsPatchResult, type SessionsPreviewEntry, type SessionsPreviewResult, - readSessionMessages, } from "../session-utils.js"; import { applySessionsPatchToStore } from "../sessions-patch.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; @@ -569,7 +571,7 @@ async function handleSessionSend(params: { interruptedActiveRun = interruptResult.interrupted; } - const messageSeq = readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length + 1; + const messageSeq = readSessionMessageCount(entry.sessionId, storePath, entry.sessionFile) + 1; let sendAcked = false; let sendPayload: unknown; let sendCached = false; @@ -983,8 +985,11 @@ export const sessionsHandlers: GatewayRequestHandlers = { let runError: unknown; let runMeta: Record | undefined; const messageSeq = initialMessage - ? readSessionMessages(createdEntry.sessionId, target.storePath, createdEntry.sessionFile) - .length + 1 + ? readSessionMessageCount( + createdEntry.sessionId, + target.storePath, + createdEntry.sessionFile, + ) + 1 : undefined; if (initialMessage) { @@ -1690,8 +1695,15 @@ export const sessionsHandlers: GatewayRequestHandlers = { respond(true, { messages: [] }, undefined); return; } - const allMessages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile); - const messages = limit < allMessages.length ? allMessages.slice(-limit) : allMessages; + const { messages } = readRecentSessionMessagesWithStats( + entry.sessionId, + storePath, + entry.sessionFile, + { + maxMessages: limit, + maxLines: limit * 20 + 20, + }, + ); respond(true, { messages }, undefined); }, "sessions.compact": async ({ req, params, respond, context, client, isWebchatConnect }) => { @@ -1847,16 +1859,23 @@ export const sessionsHandlers: GatewayRequestHandlers = { return; } - const raw = fs.readFileSync(filePath, "utf-8"); - const lines = raw.split(/\r?\n/).filter((l) => Boolean(normalizeOptionalString(l))); - if (lines.length <= maxLines) { + const tail = readRecentSessionTranscriptLines({ + sessionId, + storePath, + sessionFile: entry?.sessionFile, + agentId: target.agentId, + maxLines, + }); + const lines = tail?.lines ?? []; + const totalLines = tail?.totalLines ?? 0; + if (totalLines <= maxLines) { respond( true, { ok: true, key: target.canonicalKey, compacted: false, - kept: lines.length, + kept: totalLines, }, undefined, ); @@ -1864,8 +1883,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { } const archived = archiveFileOnDisk(filePath, "bak"); - const keptLines = lines.slice(-maxLines); - fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8"); + fs.writeFileSync(filePath, `${lines.join("\n")}\n`, "utf-8"); await updateSessionStore(storePath, (store) => { const entryKey = compactTarget.primaryKey; @@ -1887,7 +1905,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { key: target.canonicalKey, compacted: true, archived, - kept: keptLines.length, + kept: lines.length, }, undefined, ); diff --git a/src/gateway/server-session-events.ts b/src/gateway/server-session-events.ts index 19b2029a2aa..7cfe817d176 100644 --- a/src/gateway/server-session-events.ts +++ b/src/gateway/server-session-events.ts @@ -11,7 +11,7 @@ import { attachOpenClawTranscriptMeta, loadGatewaySessionRow, loadSessionEntry, - readSessionMessages, + readSessionMessageCount, type GatewaySessionRow, } from "./session-utils.js"; @@ -105,7 +105,7 @@ export function createTranscriptUpdateBroadcastHandler(params: { } const { entry, storePath } = loadSessionEntry(sessionKey); const messageSeq = entry?.sessionId - ? readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length + ? readSessionMessageCount(entry.sessionId, storePath, entry.sessionFile) : undefined; const sessionSnapshot = buildGatewaySessionSnapshot({ sessionRow: loadGatewaySessionRow(sessionKey), diff --git a/src/gateway/session-history-state.test.ts b/src/gateway/session-history-state.test.ts index 95d827f727c..a6704f5f6d8 100644 --- a/src/gateway/session-history-state.test.ts +++ b/src/gateway/session-history-state.test.ts @@ -77,6 +77,69 @@ describe("SessionHistorySseState", () => { expect(snapshot.rawTranscriptSeq).toBe(2); }); + test("marks bounded tail snapshots as having older history", () => { + const snapshot = buildSessionHistorySnapshot({ + rawMessages: [ + { + role: "assistant", + content: [{ type: "text", text: "tail" }], + __openclaw: { seq: 99 }, + }, + ], + limit: 1, + rawTranscriptSeq: 99, + totalRawMessages: 99, + }); + + expect(snapshot.history.hasMore).toBe(true); + expect(snapshot.history.nextCursor).toBe("99"); + expect(snapshot.rawTranscriptSeq).toBe(99); + }); + + test("refreshes limited SSE history from bounded tail reads", () => { + const fullReadSpy = vi.spyOn(sessionUtils, "readSessionMessages").mockReturnValue([]); + const tailReadSpy = vi + .spyOn(sessionUtils, "readRecentSessionMessagesWithStats") + .mockReturnValueOnce({ + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "tail one" }], + __openclaw: { seq: 7 }, + }, + ], + totalMessages: 7, + }) + .mockReturnValueOnce({ + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "tail two" }], + __openclaw: { seq: 8 }, + }, + ], + totalMessages: 8, + }); + try { + const state = new SessionHistorySseState({ + target: { sessionId: "sess-main" }, + limit: 1, + }); + + expect(state.snapshot().messages[0]?.__openclaw?.seq).toBe(7); + const refreshed = state.refresh(); + + expect(refreshed.hasMore).toBe(true); + expect(refreshed.nextCursor).toBe("8"); + expect(refreshed.messages[0]?.__openclaw?.seq).toBe(8); + expect(tailReadSpy).toHaveBeenCalledTimes(2); + expect(fullReadSpy).not.toHaveBeenCalled(); + } finally { + fullReadSpy.mockRestore(); + tailReadSpy.mockRestore(); + } + }); + test("strips legacy internal envelopes before exposing history", () => { const snapshot = buildSessionHistorySnapshot({ rawMessages: [ diff --git a/src/gateway/session-history-state.ts b/src/gateway/session-history-state.ts index fc99a3b4023..84ae7da8f04 100644 --- a/src/gateway/session-history-state.ts +++ b/src/gateway/session-history-state.ts @@ -2,7 +2,11 @@ import { DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS, projectChatDisplayMessages, } from "./chat-display-projection.js"; -import { attachOpenClawTranscriptMeta, readSessionMessages } from "./session-utils.js"; +import { + attachOpenClawTranscriptMeta, + readRecentSessionMessagesWithStats, + readSessionMessages, +} from "./session-utils.js"; type SessionHistoryTranscriptMeta = { seq?: number; @@ -30,6 +34,24 @@ type SessionHistoryTranscriptTarget = { sessionFile?: string; }; +type SessionHistoryRawSnapshot = { + rawMessages: unknown[]; + rawTranscriptSeq?: number; + totalRawMessages?: number; +}; + +export function resolveSessionHistoryTailReadOptions(limit: number): { + maxMessages: number; + maxLines: number; +} { + const requested = Math.max(1, Math.floor(limit)); + const rawWindow = requested * 20 + 20; + return { + maxMessages: rawWindow, + maxLines: rawWindow, + }; +} + function resolveCursorSeq(cursor: string | undefined): number | undefined { if (!cursor) { return undefined; @@ -98,6 +120,8 @@ export function buildSessionHistorySnapshot(params: { maxChars?: number; limit?: number; cursor?: string; + rawTranscriptSeq?: number; + totalRawMessages?: number; }): SessionHistorySnapshot { const visibleMessages = toSessionHistoryMessages( projectChatDisplayMessages(params.rawMessages, { @@ -105,10 +129,25 @@ export function buildSessionHistorySnapshot(params: { }), ); const history = paginateSessionMessages(visibleMessages, params.limit, params.cursor); + if ( + !params.cursor && + typeof params.totalRawMessages === "number" && + params.totalRawMessages > params.rawMessages.length && + history.messages.length > 0 + ) { + const firstSeq = resolveMessageSeq(history.messages[0]); + history.hasMore = true; + if (typeof firstSeq === "number") { + history.nextCursor = String(firstSeq); + } + } const rawHistoryMessages = toSessionHistoryMessages(params.rawMessages); return { history, - rawTranscriptSeq: resolveMessageSeq(rawHistoryMessages.at(-1)) ?? rawHistoryMessages.length, + rawTranscriptSeq: + params.rawTranscriptSeq ?? + resolveMessageSeq(rawHistoryMessages.at(-1)) ?? + rawHistoryMessages.length, }; } @@ -123,6 +162,8 @@ export class SessionHistorySseState { static fromRawSnapshot(params: { target: SessionHistoryTranscriptTarget; rawMessages: unknown[]; + rawTranscriptSeq?: number; + totalRawMessages?: number; maxChars?: number; limit?: number; cursor?: string; @@ -133,6 +174,8 @@ export class SessionHistorySseState { limit: params.limit, cursor: params.cursor, initialRawMessages: params.rawMessages, + rawTranscriptSeq: params.rawTranscriptSeq, + totalRawMessages: params.totalRawMessages, }); } @@ -142,17 +185,36 @@ export class SessionHistorySseState { limit?: number; cursor?: string; initialRawMessages?: unknown[]; + rawTranscriptSeq?: number; + totalRawMessages?: number; }) { this.target = params.target; this.maxChars = params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS; this.limit = params.limit; this.cursor = params.cursor; - const rawMessages = params.initialRawMessages ?? this.readRawMessages(); + const rawSnapshot = + params.initialRawMessages === undefined + ? this.readRawSnapshot() + : { + rawMessages: params.initialRawMessages, + ...(typeof params.rawTranscriptSeq === "number" + ? { rawTranscriptSeq: params.rawTranscriptSeq } + : {}), + ...(typeof params.totalRawMessages === "number" + ? { totalRawMessages: params.totalRawMessages } + : {}), + }; const snapshot = buildSessionHistorySnapshot({ - rawMessages, + rawMessages: rawSnapshot.rawMessages, maxChars: this.maxChars, limit: this.limit, cursor: this.cursor, + ...(typeof rawSnapshot.rawTranscriptSeq === "number" + ? { rawTranscriptSeq: rawSnapshot.rawTranscriptSeq } + : {}), + ...(typeof rawSnapshot.totalRawMessages === "number" + ? { totalRawMessages: rawSnapshot.totalRawMessages } + : {}), }); this.sentHistory = snapshot.history; this.rawTranscriptSeq = snapshot.rawTranscriptSeq; @@ -192,17 +254,43 @@ export class SessionHistorySseState { } refresh(): PaginatedSessionHistory { + const rawSnapshot = this.readRawSnapshot(); const snapshot = buildSessionHistorySnapshot({ - rawMessages: this.readRawMessages(), + rawMessages: rawSnapshot.rawMessages, maxChars: this.maxChars, limit: this.limit, cursor: this.cursor, + ...(typeof rawSnapshot.rawTranscriptSeq === "number" + ? { rawTranscriptSeq: rawSnapshot.rawTranscriptSeq } + : {}), + ...(typeof rawSnapshot.totalRawMessages === "number" + ? { totalRawMessages: rawSnapshot.totalRawMessages } + : {}), }); this.rawTranscriptSeq = snapshot.rawTranscriptSeq; this.sentHistory = snapshot.history; return snapshot.history; } + private readRawSnapshot(): SessionHistoryRawSnapshot { + if (this.cursor === undefined && typeof this.limit === "number") { + const snapshot = readRecentSessionMessagesWithStats( + this.target.sessionId, + this.target.storePath, + this.target.sessionFile, + resolveSessionHistoryTailReadOptions(this.limit), + ); + return { + rawMessages: snapshot.messages, + rawTranscriptSeq: snapshot.totalMessages, + totalRawMessages: snapshot.totalMessages, + }; + } + return { + rawMessages: this.readRawMessages(), + }; + } + private readRawMessages(): unknown[] { return readSessionMessages( this.target.sessionId, diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts index 3a34f9ada0f..ef983613a26 100644 --- a/src/gateway/session-utils.fs.test.ts +++ b/src/gateway/session-utils.fs.test.ts @@ -10,6 +10,9 @@ import { readLatestSessionUsageFromTranscript, readRecentSessionUsageFromTranscript, readRecentSessionMessages, + readRecentSessionMessagesWithStats, + readRecentSessionTranscriptLines, + readSessionMessageCount, readSessionMessages, readSessionTitleFieldsFromTranscript, readSessionPreviewItemsFromTranscript, @@ -563,6 +566,84 @@ describe("readSessionMessages", () => { } }); + test("preserves real sequence metadata for bounded recent-message reads", () => { + const sessionId = "test-session-recent-seq"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { message: { role: "user", content: "old" } }, + { message: { role: "assistant", content: "middle" } }, + { message: { role: "user", content: "recent" } }, + { message: { role: "assistant", content: "latest" } }, + ]); + + const result = readRecentSessionMessagesWithStats(sessionId, storePath, undefined, { + maxMessages: 2, + maxBytes: 256, + }); + + expect(result.totalMessages).toBe(4); + expect(result.messages).toEqual([ + expect.objectContaining({ + content: "recent", + __openclaw: expect.objectContaining({ seq: 3 }), + }), + expect.objectContaining({ + content: "latest", + __openclaw: expect.objectContaining({ seq: 4 }), + }), + ]); + }); + + test("counts transcript messages without loading the whole file", () => { + const sessionId = "test-session-count-large"; + const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); + const lines = [ + JSON.stringify({ type: "session", version: 1, id: sessionId }), + ...Array.from({ length: 2500 }, (_, index) => + JSON.stringify({ message: { role: "user", content: `message ${index}` } }), + ), + ]; + fs.writeFileSync(transcriptPath, lines.join("\n"), "utf-8"); + const readFileSpy = vi.spyOn(fs, "readFileSync"); + + try { + expect(readSessionMessageCount(sessionId, storePath)).toBe(2500); + expect(readFileSpy).not.toHaveBeenCalled(); + } finally { + readFileSpy.mockRestore(); + } + }); + + test("tails transcript lines for manual compaction without loading the whole file", () => { + const sessionId = "test-session-line-tail"; + const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); + const lines = [ + JSON.stringify({ type: "session", version: 1, id: sessionId }), + ...Array.from({ length: 10 }, (_, index) => + JSON.stringify({ message: { role: "user", content: `message ${index}` } }), + ), + ]; + fs.writeFileSync(transcriptPath, `${lines.join("\n")}\n`, "utf-8"); + const readFileSpy = vi.spyOn(fs, "readFileSync"); + + try { + const result = readRecentSessionTranscriptLines({ + sessionId, + storePath, + maxLines: 3, + }); + expect(result?.totalLines).toBe(11); + expect(result?.lines.map((line) => JSON.parse(line).message?.content)).toEqual([ + "message 7", + "message 8", + "message 9", + ]); + expect(readFileSpy).not.toHaveBeenCalled(); + } finally { + readFileSpy.mockRestore(); + } + }); + test("reads only the active branch when transcript rewrites abandon older entries", () => { const sessionId = "test-session-active-branch"; const sessionFile = path.join(tmpDir, `${sessionId}.jsonl`); diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index 242769c4e60..5f7923056b4 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -1,4 +1,5 @@ import fs from "node:fs"; +import { StringDecoder } from "node:string_decoder"; import { SessionManager, type SessionEntry } from "@mariozechner/pi-coding-agent"; import { deriveSessionTotalTokens, hasNonzeroUsage, normalizeUsage } from "../agents/usage.js"; import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js"; @@ -174,6 +175,11 @@ export type ReadRecentSessionMessagesOptions = { maxLines?: number; }; +export type ReadRecentSessionMessagesResult = { + messages: unknown[]; + totalMessages: number; +}; + const RECENT_SESSION_MESSAGES_DEFAULT_MAX_BYTES = 8 * 1024 * 1024; export function readRecentSessionMessages( @@ -247,6 +253,189 @@ export function readRecentSessionMessages( ); } +function visitTranscriptLines(filePath: string, visit: (line: string) => void): void { + const fd = fs.openSync(filePath, "r"); + try { + const decoder = new StringDecoder("utf8"); + const buffer = Buffer.allocUnsafe(64 * 1024); + let carry = ""; + while (true) { + const bytesRead = fs.readSync(fd, buffer, 0, buffer.length, null); + if (bytesRead <= 0) { + break; + } + const text = carry + decoder.write(buffer.subarray(0, bytesRead)); + const lines = text.split(/\r?\n/); + carry = lines.pop() ?? ""; + for (const line of lines) { + visit(line); + } + } + const tail = carry + decoder.end(); + if (tail) { + visit(tail); + } + } finally { + fs.closeSync(fd); + } +} + +function transcriptHasTreeEntries(filePath: string): boolean { + let hasTreeEntries = false; + try { + visitTranscriptLines(filePath, (line) => { + if (!hasTreeEntries && hasSessionTreeEntry(line)) { + hasTreeEntries = true; + } + }); + } catch { + return false; + } + return hasTreeEntries; +} + +function visitSessionManagerBranchMessages( + filePath: string, + visit: (message: unknown, seq: number) => void, +): number { + const branchEntries = SessionManager.open(filePath).getBranch(); + let messageSeq = 0; + for (const entry of branchEntries) { + if (entry.type === "message" && entry.message) { + messageSeq += 1; + visit( + attachOpenClawTranscriptMeta(entry.message, { + ...(typeof entry.id === "string" ? { id: entry.id } : {}), + seq: messageSeq, + }), + messageSeq, + ); + continue; + } + + if (entry.type === "compaction") { + const ts = typeof entry.timestamp === "string" ? Date.parse(entry.timestamp) : Number.NaN; + const timestamp = Number.isFinite(ts) ? ts : Date.now(); + messageSeq += 1; + visit( + { + role: "system", + content: [{ type: "text", text: "Compaction" }], + timestamp, + __openclaw: { + kind: "compaction", + id: typeof entry.id === "string" ? entry.id : undefined, + seq: messageSeq, + }, + }, + messageSeq, + ); + } + } + return messageSeq; +} + +export function visitSessionMessages( + sessionId: string, + storePath: string | undefined, + sessionFile: string | undefined, + visit: (message: unknown, seq: number) => void, +): number { + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile); + if (!filePath) { + return 0; + } + + if (transcriptHasTreeEntries(filePath)) { + try { + return visitSessionManagerBranchMessages(filePath, visit); + } catch { + return 0; + } + } + + let messageSeq = 0; + try { + visitTranscriptLines(filePath, (line) => { + if (!line.trim()) { + return; + } + try { + const parsed = JSON.parse(line); + const message = parsedSessionEntryToMessage(parsed, messageSeq + 1); + if (message) { + messageSeq += 1; + visit(message, messageSeq); + } + } catch { + // ignore bad lines + } + }); + } catch { + return 0; + } + return messageSeq; +} + +export function readSessionMessageCount( + sessionId: string, + storePath: string | undefined, + sessionFile?: string, +): number { + return visitSessionMessages(sessionId, storePath, sessionFile, () => undefined); +} + +export function readRecentSessionMessagesWithStats( + sessionId: string, + storePath: string | undefined, + sessionFile: string | undefined, + opts: ReadRecentSessionMessagesOptions, +): ReadRecentSessionMessagesResult { + const totalMessages = readSessionMessageCount(sessionId, storePath, sessionFile); + const messages = readRecentSessionMessages(sessionId, storePath, sessionFile, opts); + const firstSeq = Math.max(1, totalMessages - messages.length + 1); + const messagesWithSeq = messages.map((message, index) => + attachOpenClawTranscriptMeta(message, { seq: firstSeq + index }), + ); + return { messages: messagesWithSeq, totalMessages }; +} + +export function readRecentSessionTranscriptLines(params: { + sessionId: string; + storePath: string | undefined; + sessionFile?: string; + agentId?: string; + maxLines: number; +}): { lines: string[]; totalLines: number } | null { + const filePath = findExistingTranscriptPath( + params.sessionId, + params.storePath, + params.sessionFile, + params.agentId, + ); + if (!filePath) { + return null; + } + const maxLines = Math.max(1, Math.floor(params.maxLines)); + const lines: string[] = []; + let totalLines = 0; + try { + visitTranscriptLines(filePath, (line) => { + if (!line.trim()) { + return; + } + totalLines += 1; + lines.push(line); + if (lines.length > maxLines) { + lines.shift(); + } + }); + } catch { + return null; + } + return { lines, totalLines }; +} + function hasSessionTreeEntry(line: string): boolean { if (!line.trim()) { return false; diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 646ff0276fb..efde8bebe1e 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -106,11 +106,15 @@ export { readFirstUserMessageFromTranscript, readLastMessagePreviewFromTranscript, readLatestSessionUsageFromTranscript, - readRecentSessionUsageFromTranscript, readRecentSessionMessages, + readRecentSessionMessagesWithStats, + readRecentSessionTranscriptLines, + readRecentSessionUsageFromTranscript, + readSessionMessageCount, readSessionTitleFieldsFromTranscript, readSessionPreviewItemsFromTranscript, readSessionMessages, + visitSessionMessages, resolveSessionTranscriptCandidates, } from "./session-utils.fs.js"; export { canonicalizeSpawnedByForAgent, resolveSessionStoreKey } from "./session-store-key.js"; diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index e75a0262837..76be7c2a024 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -25,8 +25,13 @@ import { } from "./http-utils.js"; import { authorizeOperatorScopesForMethod } from "./method-scopes.js"; import { DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS } from "./server-methods/chat.js"; -import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js"; import { + buildSessionHistorySnapshot, + resolveSessionHistoryTailReadOptions, + SessionHistorySseState, +} from "./session-history-state.js"; +import { + readRecentSessionMessagesWithStats, readSessionMessages, resolveFreshestSessionEntryFromStoreKeys, resolveGatewaySessionStoreTarget, @@ -149,17 +154,29 @@ export async function handleSessionHistoryHttpRequest( typeof cfg.gateway?.webchat?.chatHistoryMaxChars === "number" ? cfg.gateway.webchat.chatHistoryMaxChars : DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS; - // Read the transcript once and derive both sanitized and raw views from the - // same snapshot, eliminating the theoretical race window where a concurrent - // write between two separate reads could cause seq/content divergence. - const rawSnapshot = entry?.sessionId - ? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile) - : []; + const boundedSnapshot = + cursor === undefined && typeof limit === "number" + ? readRecentSessionMessagesWithStats( + entry.sessionId, + target.storePath, + entry.sessionFile, + resolveSessionHistoryTailReadOptions(limit), + ) + : undefined; + // Cursor reads still need an arbitrary historical window. The common first + // page path is bounded above so `limit=1` cannot materialize huge transcripts. + const rawSnapshot = + boundedSnapshot?.messages ?? + (entry?.sessionId + ? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile) + : []); const historySnapshot = buildSessionHistorySnapshot({ rawMessages: rawSnapshot, maxChars: effectiveMaxChars, limit, cursor, + rawTranscriptSeq: boundedSnapshot?.totalMessages, + totalRawMessages: boundedSnapshot?.totalMessages, }); const history = historySnapshot.history; @@ -192,6 +209,8 @@ export async function handleSessionHistoryHttpRequest( sessionFile: entry.sessionFile, }, rawMessages: rawSnapshot, + rawTranscriptSeq: boundedSnapshot?.totalMessages, + totalRawMessages: boundedSnapshot?.totalMessages, maxChars: effectiveMaxChars, limit, cursor,