From 6147e1b91d3ecc7a62e5d1bdffcf75de21345a90 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 2 May 2026 02:06:38 +0100 Subject: [PATCH] fix(gateway): async session transcript IO (#75875) * fix(gateway): async session transcript IO * fix(plugins): restore jiti loader cache helper * test(gateway): mock async artifact transcript reads * chore(plugins): drop obsolete jiti loader shim --- CHANGELOG.md | 1 + .../src/app-server/transcript-mirror.test.ts | 79 ++++ .../codex/src/app-server/transcript-mirror.ts | 215 ++++++++- src/agents/main-session-restart-recovery.ts | 8 +- src/agents/subagent-orphan-recovery.test.ts | 6 +- src/agents/subagent-orphan-recovery.ts | 8 +- .../tools/embedded-gateway-stub.runtime.ts | 4 +- .../tools/embedded-gateway-stub.test.ts | 8 +- src/agents/tools/embedded-gateway-stub.ts | 18 +- src/agents/tools/sessions-list-tool.ts | 37 +- src/auto-reply/reply/agent-runner-memory.ts | 12 +- .../reply/session-fork.runtime.test.ts | 2 +- src/auto-reply/reply/session-fork.runtime.ts | 10 +- src/config/sessions/transcript-append.ts | 229 +++++++++ src/config/sessions/transcript.test.ts | 121 +++++ src/config/sessions/transcript.ts | 19 +- src/gateway/managed-image-attachments.test.ts | 2 +- src/gateway/managed-image-attachments.ts | 4 +- src/gateway/server-methods/artifacts.test.ts | 8 +- src/gateway/server-methods/artifacts.ts | 26 +- .../server-methods/chat-transcript-inject.ts | 107 +---- .../chat.inject.parentid.test.ts | 8 +- src/gateway/server-methods/chat.ts | 79 ++-- .../server-methods/server-methods.test.ts | 9 +- src/gateway/server-methods/sessions.ts | 15 +- src/gateway/server-session-events.ts | 116 +++-- src/gateway/session-history-state.ts | 44 ++ src/gateway/session-reset-service.ts | 10 +- src/gateway/session-transcript-index.fs.ts | 247 ++++++++++ src/gateway/session-utils.fs.test.ts | 212 +++++++++ src/gateway/session-utils.fs.ts | 441 +++++++++++++++++- src/gateway/session-utils.ts | 88 ++-- .../sessions-history-http.revocation.test.ts | 4 +- src/gateway/sessions-history-http.ts | 10 +- src/status/status-message.ts | 5 +- src/tui/embedded-backend.test.ts | 4 +- src/tui/embedded-backend.ts | 12 +- 37 files changed, 1890 insertions(+), 338 deletions(-) create mode 100644 src/config/sessions/transcript-append.ts create mode 100644 src/gateway/session-transcript-index.fs.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index a58164cb2b0..736c71351aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Gateway/sessions: move hot transcript reads and mirror appends onto async bounded IO with serialized parent-linked writes, keeping large session histories from stalling Gateway requests and channel replies. Fixes #75656. Thanks @DerFlash. - Doctor/WhatsApp: warn when Linux crontabs still run the legacy `ensure-whatsapp.sh` health check, which can misreport `Gateway inactive` when cron lacks the systemd user-bus environment. Fixes #60204. Thanks @mySebbe. - Slack/setup: print the generated app manifest as plain JSON instead of embedding it inside the framed setup note, so it can be copied into Slack without deleting border characters. Fixes #65751. Thanks @theDanielJLewis. - Channels/WhatsApp: route CLI logout through the live Gateway and stop runtime-backed listeners before channel removal, so removing a WhatsApp account does not leave the old socket replying until restart. Fixes #67746. Thanks @123Mismail. diff --git a/extensions/codex/src/app-server/transcript-mirror.test.ts b/extensions/codex/src/app-server/transcript-mirror.test.ts index c5e57dff5d8..9415891fa69 100644 --- a/extensions/codex/src/app-server/transcript-mirror.test.ts +++ b/extensions/codex/src/app-server/transcript-mirror.test.ts @@ -29,6 +29,12 @@ async function createTempSessionFile() { return path.join(dir, "session.jsonl"); } +async function makeRoot(prefix: string): Promise { + const root = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); + tempDirs.push(root); + return root; +} + describe("mirrorCodexAppServerTranscript", () => { it("mirrors user and assistant messages into the Pi transcript", async () => { const sessionFile = await createTempSessionFile(); @@ -58,6 +64,27 @@ describe("mirrorCodexAppServerTranscript", () => { expect(raw).toContain('"idempotencyKey":"scope-1:assistant:1"'); }); + it("creates the transcript directory on first mirror", async () => { + const root = await makeRoot("openclaw-codex-transcript-missing-dir-"); + const sessionFile = path.join(root, "nested", "sessions", "session.jsonl"); + + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "session-1", + messages: [ + makeAgentAssistantMessage({ + content: [{ type: "text", text: "first mirror" }], + timestamp: Date.now(), + }), + ], + idempotencyScope: "scope-1", + }); + + const raw = await fs.readFile(sessionFile, "utf8"); + expect(raw).toContain('"role":"assistant"'); + expect(raw).toContain('"content":[{"type":"text","text":"first mirror"}]'); + }); + it("deduplicates app-server turn mirrors by idempotency scope", async () => { const sessionFile = await createTempSessionFile(); const messages = [ @@ -183,4 +210,56 @@ describe("mirrorCodexAppServerTranscript", () => { await expect(fs.readFile(sessionFile, "utf8")).rejects.toMatchObject({ code: "ENOENT" }); }); + + it("migrates small linear transcripts before mirroring", async () => { + const sessionFile = await createTempSessionFile(); + await fs.writeFile( + sessionFile, + [ + JSON.stringify({ + type: "session", + version: 3, + id: "linear-codex-session", + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }), + JSON.stringify({ + type: "message", + id: "legacy-user", + timestamp: new Date().toISOString(), + message: { role: "user", content: "legacy user" }, + }), + ].join("\n") + "\n", + "utf8", + ); + + await mirrorCodexAppServerTranscript({ + sessionFile, + sessionKey: "session-1", + messages: [ + makeAgentAssistantMessage({ + content: [{ type: "text", text: "mirrored assistant" }], + timestamp: Date.now(), + }), + ], + idempotencyScope: "scope-1", + }); + + const records = (await fs.readFile(sessionFile, "utf8")) + .trim() + .split("\n") + .map( + (line) => + JSON.parse(line) as { + type?: string; + id?: string; + parentId?: string | null; + message?: { role?: string }; + }, + ) + .filter((record) => record.type === "message"); + + expect(records[0]).toMatchObject({ id: "legacy-user", parentId: null }); + expect(records[1]).toMatchObject({ parentId: "legacy-user" }); + }); }); diff --git a/extensions/codex/src/app-server/transcript-mirror.ts b/extensions/codex/src/app-server/transcript-mirror.ts index 5a39912647b..e606ac00fe9 100644 --- a/extensions/codex/src/app-server/transcript-mirror.ts +++ b/extensions/codex/src/app-server/transcript-mirror.ts @@ -1,6 +1,8 @@ +import { randomUUID } from "node:crypto"; import fs from "node:fs/promises"; import path from "node:path"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { StringDecoder } from "node:string_decoder"; +import { CURRENT_SESSION_VERSION, type SessionManager } from "@mariozechner/pi-coding-agent"; import { acquireSessionWriteLock, emitSessionTranscriptUpdate, @@ -8,6 +10,15 @@ import { type AgentMessage, } from "openclaw/plugin-sdk/agent-harness-runtime"; +const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024; +const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024; + +type TranscriptLeafInfo = { + leafId?: string; + hasParentLinkedEntries: boolean; + nonSessionEntryCount: number; +}; + export async function mirrorCodexAppServerTranscript(params: { sessionFile: string; sessionKey?: string; @@ -29,7 +40,6 @@ export async function mirrorCodexAppServerTranscript(params: { }); try { const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile); - const sessionManager = SessionManager.open(params.sessionFile); for (const [index, message] of messages.entries()) { const idempotencyKey = params.idempotencyScope ? `${params.idempotencyScope}:${message.role}:${index}` @@ -55,7 +65,10 @@ export async function mirrorCodexAppServerTranscript(params: { idempotencyKey, } : nextMessage) as unknown as Parameters[0]; - sessionManager.appendMessage(messageToAppend); + await appendCodexAppServerTranscriptMessage({ + transcriptPath: params.sessionFile, + message: messageToAppend, + }); if (idempotencyKey) { existingIdempotencyKeys.add(idempotencyKey); } @@ -71,6 +84,202 @@ export async function mirrorCodexAppServerTranscript(params: { } } +async function appendCodexAppServerTranscriptMessage(params: { + transcriptPath: string; + message: unknown; +}): Promise { + await ensureTranscriptHeader(params.transcriptPath); + const stat = await fs.stat(params.transcriptPath).catch(() => null); + let leafInfo: TranscriptLeafInfo = await readTranscriptLeafInfo(params.transcriptPath).catch( + () => ({ + hasParentLinkedEntries: false, + nonSessionEntryCount: 0, + }), + ); + const hasLinearEntries = !leafInfo.hasParentLinkedEntries && leafInfo.nonSessionEntryCount > 0; + const shouldRawAppend = hasLinearEntries && (stat?.size ?? 0) > SESSION_MANAGER_APPEND_MAX_BYTES; + if (hasLinearEntries && !shouldRawAppend) { + const migrated = await migrateLinearTranscriptToParentLinked(params.transcriptPath); + leafInfo = { + ...(migrated.leafId ? { leafId: migrated.leafId } : {}), + hasParentLinkedEntries: Boolean(migrated.leafId), + nonSessionEntryCount: leafInfo.nonSessionEntryCount, + }; + } + const entry = { + type: "message", + id: randomUUID(), + ...(shouldRawAppend ? {} : { parentId: leafInfo.leafId ?? null }), + timestamp: new Date().toISOString(), + message: params.message, + }; + await fs.appendFile(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8"); +} + +async function ensureTranscriptHeader(transcriptPath: string): Promise { + const stat = await fs.stat(transcriptPath).catch(() => null); + if (stat?.isFile() && stat.size > 0) { + return; + } + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: randomUUID(), + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }; + await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, { + encoding: "utf-8", + mode: 0o600, + flag: stat?.isFile() ? "w" : "wx", + }); +} + +async function readTranscriptLeafInfo(transcriptPath: string): Promise { + const handle = await fs.open(transcriptPath, "r"); + try { + const decoder = new StringDecoder("utf8"); + const buffer = Buffer.allocUnsafe(TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES); + let carry = ""; + let leafId: string | undefined; + let hasParentLinkedEntries = false; + let nonSessionEntryCount = 0; + while (true) { + const { bytesRead } = await handle.read(buffer, 0, buffer.length, null); + if (bytesRead <= 0) { + break; + } + const text = carry + decoder.write(buffer.subarray(0, bytesRead)); + const lines = text.split(/\r?\n/); + carry = lines.pop() ?? ""; + for (const line of lines) { + if (lineHasNonSessionEntry(line)) { + nonSessionEntryCount += 1; + } + const id = lineParentLinkedEntryId(line); + if (id) { + leafId = id; + hasParentLinkedEntries = true; + } + } + await yieldTranscriptAppendScan(); + } + const tail = carry + decoder.end(); + if (lineHasNonSessionEntry(tail)) { + nonSessionEntryCount += 1; + } + const id = lineParentLinkedEntryId(tail); + if (id) { + leafId = id; + hasParentLinkedEntries = true; + } + return { + ...(leafId ? { leafId } : {}), + hasParentLinkedEntries, + nonSessionEntryCount, + }; + } finally { + await handle.close(); + } +} + +async function migrateLinearTranscriptToParentLinked(transcriptPath: string): Promise<{ + leafId?: string; +}> { + const raw = await fs.readFile(transcriptPath, "utf-8"); + const existingIds = new Set(); + const output: string[] = []; + let previousId: string | null = null; + let leafId: string | undefined; + for (const line of raw.split(/\r?\n/)) { + if (!line.trim()) { + continue; + } + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + output.push(line); + continue; + } + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + output.push(line); + continue; + } + const record = parsed as Record; + if (record.type === "session") { + output.push(JSON.stringify({ ...record, version: CURRENT_SESSION_VERSION })); + continue; + } + const id = normalizeEntryId(record.id) ?? generateEntryId(existingIds); + existingIds.add(id); + record.id = id; + if (!Object.hasOwn(record, "parentId")) { + record.parentId = previousId; + } + previousId = id; + leafId = id; + output.push(JSON.stringify(record)); + } + await fs.writeFile(transcriptPath, `${output.join("\n")}\n`, { + encoding: "utf-8", + mode: 0o600, + }); + const result: { leafId?: string } = {}; + if (leafId) { + result.leafId = leafId; + } + return result; +} + +function normalizeEntryId(value: unknown): string | undefined { + return typeof value === "string" && value.trim().length > 0 ? value : undefined; +} + +function generateEntryId(existingIds: Set): string { + for (let attempt = 0; attempt < 100; attempt += 1) { + const id = randomUUID().slice(0, 8); + if (!existingIds.has(id)) { + existingIds.add(id); + return id; + } + } + const id = randomUUID(); + existingIds.add(id); + return id; +} + +function lineHasNonSessionEntry(line: string): boolean { + if (!line.trim()) { + return false; + } + try { + const parsed = JSON.parse(line) as { type?: unknown }; + return parsed.type !== "session"; + } catch { + return false; + } +} + +function lineParentLinkedEntryId(line: string): string | undefined { + if (!line.trim()) { + return undefined; + } + try { + const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown }; + return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed + ? parsed.id + : undefined; + } catch { + return undefined; + } +} + +async function yieldTranscriptAppendScan(): Promise { + await new Promise((resolve) => setImmediate(resolve)); +} + async function readTranscriptIdempotencyKeys(sessionFile: string): Promise> { const keys = new Set(); let raw: string; diff --git a/src/agents/main-session-restart-recovery.ts b/src/agents/main-session-restart-recovery.ts index 48635665c8e..77071703df7 100644 --- a/src/agents/main-session-restart-recovery.ts +++ b/src/agents/main-session-restart-recovery.ts @@ -7,7 +7,7 @@ import path from "node:path"; import { resolveStateDir } from "../config/paths.js"; import { type SessionEntry, loadSessionStore, updateSessionStore } from "../config/sessions.js"; import { callGateway } from "../gateway/call.js"; -import { readSessionMessages } from "../gateway/session-utils.fs.js"; +import { readSessionMessagesAsync } from "../gateway/session-utils.fs.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { CommandLane } from "../process/lanes.js"; import { isAcpSessionKey, isCronSessionKey, isSubagentSessionKey } from "../routing/session-key.js"; @@ -226,7 +226,11 @@ async function recoverStore(params: { let messages: unknown[]; try { - messages = readSessionMessages(entry.sessionId, params.storePath, entry.sessionFile); + messages = await readSessionMessagesAsync( + entry.sessionId, + params.storePath, + entry.sessionFile, + ); } catch (err) { log.warn(`failed to read transcript for ${sessionKey}: ${String(err)}`); result.failed++; diff --git a/src/agents/subagent-orphan-recovery.test.ts b/src/agents/subagent-orphan-recovery.test.ts index 8375d25d8b3..20d46982f6b 100644 --- a/src/agents/subagent-orphan-recovery.test.ts +++ b/src/agents/subagent-orphan-recovery.test.ts @@ -29,7 +29,7 @@ vi.mock("../gateway/call.js", () => ({ })); vi.mock("../gateway/session-utils.fs.js", () => ({ - readSessionMessages: vi.fn(() => []), + readSessionMessagesAsync: vi.fn(async () => []), })); vi.mock("./subagent-announce-delivery.js", () => ({ @@ -465,7 +465,7 @@ describe("subagent-orphan-recovery", () => { it("includes last human message in resume when available", async () => { mockSingleAbortedSession({ sessionFile: "session-abc.jsonl" }); - vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([ + vi.mocked(sessionUtils.readSessionMessagesAsync).mockResolvedValue([ { role: "user", content: [{ type: "text", text: "Please build feature Y" }] }, { role: "assistant", content: [{ type: "text", text: "Working on it..." }] }, { role: "user", content: [{ type: "text", text: "Also add tests for it" }] }, @@ -484,7 +484,7 @@ describe("subagent-orphan-recovery", () => { it("adds config change hint when assistant messages reference config modifications", async () => { mockSingleAbortedSession(); - vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([ + vi.mocked(sessionUtils.readSessionMessagesAsync).mockResolvedValue([ { role: "user", content: "Update the config" }, { role: "assistant", content: "I've modified openclaw.json to add the new setting." }, ]); diff --git a/src/agents/subagent-orphan-recovery.ts b/src/agents/subagent-orphan-recovery.ts index 0757cf5fae4..0143175965b 100644 --- a/src/agents/subagent-orphan-recovery.ts +++ b/src/agents/subagent-orphan-recovery.ts @@ -19,7 +19,7 @@ import { type SessionEntry, } from "../config/sessions.js"; import { callGateway } from "../gateway/call.js"; -import { readSessionMessages } from "../gateway/session-utils.fs.js"; +import { readSessionMessagesAsync } from "../gateway/session-utils.fs.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { buildAnnounceIdempotencyKey } from "./announce-idempotency.js"; @@ -350,7 +350,11 @@ export async function recoverOrphanedSubagentSessions(params: { log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`); - const messages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile); + const messages = await readSessionMessagesAsync( + entry.sessionId, + storePath, + entry.sessionFile, + ); const lastHumanMessage = [...messages] .toReversed() .find((msg) => (msg as { role?: unknown } | null)?.role === "user"); diff --git a/src/agents/tools/embedded-gateway-stub.runtime.ts b/src/agents/tools/embedded-gateway-stub.runtime.ts index 1878816f496..72299107c11 100644 --- a/src/agents/tools/embedded-gateway-stub.runtime.ts +++ b/src/agents/tools/embedded-gateway-stub.runtime.ts @@ -14,10 +14,10 @@ export { } from "../../gateway/server-methods/chat.js"; export { capArrayByJsonBytes } from "../../gateway/session-utils.fs.js"; export { - listSessionsFromStore, + listSessionsFromStoreAsync, loadCombinedSessionStoreForGateway, loadSessionEntry, - readSessionMessages, + readSessionMessagesAsync, resolveSessionModelRef, } from "../../gateway/session-utils.js"; export { resolveSessionKeyFromResolveParams } from "../../gateway/sessions-resolve.js"; diff --git a/src/agents/tools/embedded-gateway-stub.test.ts b/src/agents/tools/embedded-gateway-stub.test.ts index 7b350f165a5..d6c8451ef61 100644 --- a/src/agents/tools/embedded-gateway-stub.test.ts +++ b/src/agents/tools/embedded-gateway-stub.test.ts @@ -11,7 +11,7 @@ const runtime = vi.hoisted(() => ({ entry: { sessionId: "sess-main" }, })), resolveSessionModelRef: vi.fn(() => ({ provider: "openai" })), - readSessionMessages: vi.fn((): unknown[] => []), + readSessionMessagesAsync: vi.fn(async (): Promise => []), augmentChatHistoryWithCliSessionImports: vi.fn( ({ localMessages }: { localMessages?: unknown[] }) => localMessages ?? [], ), @@ -34,7 +34,7 @@ describe("embedded gateway stub", () => { runtime.getRuntimeConfig.mockClear(); runtime.resolveSessionKeyFromResolveParams.mockReset(); runtime.projectRecentChatDisplayMessages.mockClear(); - runtime.readSessionMessages.mockClear(); + runtime.readSessionMessagesAsync.mockClear(); }); it("resolves sessions through the gateway session resolver", async () => { @@ -78,7 +78,7 @@ describe("embedded gateway stub", () => { { role: "assistant", content: "hi" }, ]; const projectedMessages = [{ role: "assistant", content: "hi" }]; - runtime.readSessionMessages.mockReturnValueOnce(rawMessages); + runtime.readSessionMessagesAsync.mockResolvedValueOnce(rawMessages); runtime.projectRecentChatDisplayMessages.mockReturnValueOnce(projectedMessages); const callGateway = createEmbeddedCallGateway(); @@ -99,7 +99,7 @@ describe("embedded gateway stub", () => { { role: "user", content: "visible older" }, { role: "assistant", content: "hidden newer" }, ]; - runtime.readSessionMessages.mockReturnValueOnce(rawMessages); + runtime.readSessionMessagesAsync.mockResolvedValueOnce(rawMessages); const callGateway = createEmbeddedCallGateway(); await callGateway<{ messages: unknown[] }>({ diff --git a/src/agents/tools/embedded-gateway-stub.ts b/src/agents/tools/embedded-gateway-stub.ts index 060575c29a5..fff0974fdcd 100644 --- a/src/agents/tools/embedded-gateway-stub.ts +++ b/src/agents/tools/embedded-gateway-stub.ts @@ -30,12 +30,12 @@ interface EmbeddedGatewayRuntime { opts?: { maxChars?: number; maxMessages?: number }, ) => unknown[]; capArrayByJsonBytes: (items: unknown[], maxBytes: number) => { items: unknown[] }; - listSessionsFromStore: (opts: { + listSessionsFromStoreAsync: (opts: { cfg: OpenClawConfig; storePath: string; store: unknown; opts: SessionsListParams; - }) => SessionsListResult; + }) => Promise; loadCombinedSessionStoreForGateway: (cfg: OpenClawConfig) => { storePath: string; store: unknown; @@ -49,7 +49,11 @@ interface EmbeddedGatewayRuntime { storePath: string | undefined; entry: Record | undefined; }; - readSessionMessages: (sessionId: string, storePath: string, sessionFile?: string) => unknown[]; + readSessionMessagesAsync: ( + sessionId: string, + storePath: string, + sessionFile?: string, + ) => Promise; resolveSessionModelRef: ( cfg: OpenClawConfig, entry: unknown, @@ -70,7 +74,7 @@ async function handleSessionsList(params: Record) { const rt = await getRuntime(); const cfg = rt.getRuntimeConfig(); const { storePath, store } = rt.loadCombinedSessionStoreForGateway(cfg); - return rt.listSessionsFromStore({ + return rt.listSessionsFromStoreAsync({ cfg, storePath, store, @@ -111,7 +115,11 @@ async function handleChatHistory(params: Record): Promise<{ const localMessages = sessionId && storePath - ? rt.readSessionMessages(sessionId, storePath, entry?.sessionFile as string | undefined) + ? await rt.readSessionMessagesAsync( + sessionId, + storePath, + entry?.sessionFile as string | undefined, + ) : []; const rawMessages = rt.augmentChatHistoryWithCliSessionImports({ diff --git a/src/agents/tools/sessions-list-tool.ts b/src/agents/tools/sessions-list-tool.ts index d491cef9ecf..8bf8b089c2f 100644 --- a/src/agents/tools/sessions-list-tool.ts +++ b/src/agents/tools/sessions-list-tool.ts @@ -8,7 +8,6 @@ import { } from "../../config/sessions.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { callGateway } from "../../gateway/call.js"; -import { readSessionTitleFieldsFromTranscript } from "../../gateway/session-utils.fs.js"; import { deriveSessionTitle } from "../../gateway/session-utils.js"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; import { normalizeOptionalLowercaseString, readStringValue } from "../../shared/string-coerce.js"; @@ -119,6 +118,8 @@ export function createSessionsListTool(opts?: { label, agentId, search, + includeDerivedTitles, + includeLastMessage, includeGlobal: !restrictToSpawned, includeUnknown: !restrictToSpawned, spawnedBy: restrictToSpawned ? effectiveRequesterKey : undefined, @@ -309,31 +310,17 @@ export function createSessionsListTool(opts?: { lastAccountId, transcriptPath, }; - if (sessionId && (includeDerivedTitles || includeLastMessage)) { - const fields = readSessionTitleFieldsFromTranscript( - sessionId, - storePath, - sessionFile, - resolvedAgentId, + if (sessionId && includeDerivedTitles && !row.derivedTitle) { + row.derivedTitle = deriveSessionTitle( + { + sessionId, + displayName: row.displayName, + label: row.label, + subject: readStringValue((entry as { subject?: unknown }).subject), + updatedAt: typeof row.updatedAt === "number" ? row.updatedAt : 0, + }, + undefined, ); - if (includeDerivedTitles && !row.derivedTitle) { - const derivedTitle = deriveSessionTitle( - { - sessionId, - displayName: row.displayName, - label: row.label, - subject: readStringValue((entry as { subject?: unknown }).subject), - updatedAt: typeof row.updatedAt === "number" ? row.updatedAt : 0, - }, - fields.firstUserMessage, - ); - if (derivedTitle) { - row.derivedTitle = derivedTitle; - } - } - if (includeLastMessage && !row.lastMessagePreview && fields.lastMessagePreview) { - row.lastMessagePreview = fields.lastMessagePreview; - } } if (messageLimit > 0) { const resolvedKey = resolveInternalSessionKey({ diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index a5a97505f9a..4c94c9c3c08 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -21,7 +21,7 @@ import { updateSessionStoreEntry, } from "../../config/sessions.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; -import { readSessionMessages } from "../../gateway/session-utils.fs.js"; +import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js"; import { logVerbose } from "../../globals.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { resolveMemoryFlushPlan } from "../../plugins/memory-state.js"; @@ -341,21 +341,21 @@ async function readLastNonzeroUsageFromSessionLog(logPath: string) { } } -function estimatePromptTokensFromSessionTranscript(params: { +async function estimatePromptTokensFromSessionTranscript(params: { sessionId?: string; storePath?: string; sessionFile?: string; -}): number | undefined { +}): Promise { const sessionId = normalizeOptionalString(params.sessionId); if (!sessionId) { return undefined; } try { - const messages = readSessionMessages( + const messages = (await readSessionMessagesAsync( sessionId, params.storePath, params.sessionFile, - ) as AgentMessage[]; + )) as AgentMessage[]; if (messages.length === 0) { return undefined; } @@ -444,7 +444,7 @@ export async function runPreflightCompactionIfNeeded(params: { const transcriptPromptTokens = typeof freshPersistedTokens === "number" ? undefined - : estimatePromptTokensFromSessionTranscript({ + : await estimatePromptTokensFromSessionTranscript({ sessionId: entry.sessionId, storePath: params.storePath, sessionFile: entry.sessionFile ?? params.followupRun.run.sessionFile, diff --git a/src/auto-reply/reply/session-fork.runtime.test.ts b/src/auto-reply/reply/session-fork.runtime.test.ts index 3f4e4432cc3..6bf4e75cd44 100644 --- a/src/auto-reply/reply/session-fork.runtime.test.ts +++ b/src/auto-reply/reply/session-fork.runtime.test.ts @@ -63,7 +63,7 @@ describe("resolveParentForkTokenCountRuntime", () => { totalTokensFresh: false, }; - const tokens = resolveParentForkTokenCountRuntime({ + const tokens = await resolveParentForkTokenCountRuntime({ parentEntry: entry, storePath: path.join(root, "sessions.json"), }); diff --git a/src/auto-reply/reply/session-fork.runtime.ts b/src/auto-reply/reply/session-fork.runtime.ts index 96ceafe71a1..732c4fede9c 100644 --- a/src/auto-reply/reply/session-fork.runtime.ts +++ b/src/auto-reply/reply/session-fork.runtime.ts @@ -6,7 +6,7 @@ import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding import { estimateMessagesTokens } from "../../agents/compaction.js"; import { resolveSessionFilePath } from "../../config/sessions/paths.js"; import { resolveFreshSessionTotalTokens, type SessionEntry } from "../../config/sessions/types.js"; -import { readSessionMessages } from "../../gateway/session-utils.fs.js"; +import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js"; function resolvePositiveTokenCount(value: number | undefined): number | undefined { return typeof value === "number" && Number.isFinite(value) && value > 0 @@ -14,21 +14,21 @@ function resolvePositiveTokenCount(value: number | undefined): number | undefine : undefined; } -export function resolveParentForkTokenCountRuntime(params: { +export async function resolveParentForkTokenCountRuntime(params: { parentEntry: SessionEntry; storePath: string; -}): number | undefined { +}): Promise { const freshPersistedTokens = resolveFreshSessionTotalTokens(params.parentEntry); if (typeof freshPersistedTokens === "number") { return freshPersistedTokens; } try { - const transcriptMessages = readSessionMessages( + const transcriptMessages = (await readSessionMessagesAsync( params.parentEntry.sessionId, params.storePath, params.parentEntry.sessionFile, - ) as AgentMessage[]; + )) as AgentMessage[]; if (transcriptMessages.length > 0) { const estimatedTokens = estimateMessagesTokens(transcriptMessages); const transcriptTokens = resolvePositiveTokenCount( diff --git a/src/config/sessions/transcript-append.ts b/src/config/sessions/transcript-append.ts new file mode 100644 index 00000000000..f26c221f2bc --- /dev/null +++ b/src/config/sessions/transcript-append.ts @@ -0,0 +1,229 @@ +import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; +import path from "node:path"; +import { StringDecoder } from "node:string_decoder"; +import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; +import { acquireSessionWriteLock } from "../../agents/session-write-lock.js"; + +const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024; +const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024; + +type TranscriptLeafInfo = { + leafId?: string; + hasParentLinkedEntries: boolean; + nonSessionEntryCount: number; +}; + +async function yieldTranscriptAppendScan(): Promise { + await new Promise((resolve) => setImmediate(resolve)); +} + +function lineParentLinkedEntryId(line: string): string | undefined { + if (!line.trim()) { + return undefined; + } + try { + const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown }; + return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed + ? parsed.id + : undefined; + } catch { + return undefined; + } +} + +function normalizeEntryId(value: unknown): string | undefined { + return typeof value === "string" && value.trim().length > 0 ? value : undefined; +} + +function generateEntryId(existingIds: Set): string { + for (let attempt = 0; attempt < 100; attempt += 1) { + const id = randomUUID().slice(0, 8); + if (!existingIds.has(id)) { + existingIds.add(id); + return id; + } + } + const id = randomUUID(); + existingIds.add(id); + return id; +} + +async function readTranscriptLeafInfo(transcriptPath: string): Promise { + const handle = await fs.open(transcriptPath, "r"); + try { + const decoder = new StringDecoder("utf8"); + const buffer = Buffer.allocUnsafe(TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES); + let carry = ""; + let leafId: string | undefined; + let hasParentLinkedEntries = false; + let nonSessionEntryCount = 0; + while (true) { + const { bytesRead } = await handle.read(buffer, 0, buffer.length, null); + if (bytesRead <= 0) { + break; + } + const text = carry + decoder.write(buffer.subarray(0, bytesRead)); + const lines = text.split(/\r?\n/); + carry = lines.pop() ?? ""; + for (const line of lines) { + if (lineHasNonSessionEntry(line)) { + nonSessionEntryCount += 1; + } + const id = lineParentLinkedEntryId(line); + if (id) { + leafId = id; + hasParentLinkedEntries = true; + } + } + await yieldTranscriptAppendScan(); + } + const tail = carry + decoder.end(); + if (lineHasNonSessionEntry(tail)) { + nonSessionEntryCount += 1; + } + const id = lineParentLinkedEntryId(tail); + if (id) { + leafId = id; + hasParentLinkedEntries = true; + } + return { + ...(leafId ? { leafId } : {}), + hasParentLinkedEntries, + nonSessionEntryCount, + }; + } finally { + await handle.close(); + } +} + +function lineHasNonSessionEntry(line: string): boolean { + if (!line.trim()) { + return false; + } + try { + const parsed = JSON.parse(line) as { type?: unknown }; + return parsed.type !== "session"; + } catch { + return false; + } +} + +async function migrateLinearTranscriptToParentLinked(transcriptPath: string): Promise<{ + leafId?: string; +}> { + const raw = await fs.readFile(transcriptPath, "utf-8"); + const existingIds = new Set(); + const output: string[] = []; + let previousId: string | null = null; + let leafId: string | undefined; + for (const line of raw.split(/\r?\n/)) { + if (!line.trim()) { + continue; + } + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + output.push(line); + continue; + } + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + output.push(line); + continue; + } + const record = parsed as Record; + if (record.type === "session") { + output.push(JSON.stringify({ ...record, version: CURRENT_SESSION_VERSION })); + continue; + } + const id = normalizeEntryId(record.id) ?? generateEntryId(existingIds); + existingIds.add(id); + record.id = id; + if (!Object.hasOwn(record, "parentId")) { + record.parentId = previousId; + } + previousId = id; + leafId = id; + output.push(JSON.stringify(record)); + } + await fs.writeFile(transcriptPath, `${output.join("\n")}\n`, { + encoding: "utf-8", + mode: 0o600, + }); + const result: { leafId?: string } = {}; + if (leafId) { + result.leafId = leafId; + } + return result; +} + +async function ensureTranscriptHeader(transcriptPath: string): Promise { + const stat = await fs.stat(transcriptPath).catch(() => null); + if (stat?.isFile() && stat.size > 0) { + return; + } + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: randomUUID(), + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }; + await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, { + encoding: "utf-8", + mode: 0o600, + flag: stat?.isFile() ? "w" : "wx", + }); +} + +export async function appendSessionTranscriptMessage(params: { + transcriptPath: string; + message: unknown; + now?: number; + useRawWhenLinear?: boolean; +}): Promise<{ messageId: string }> { + const lock = await acquireSessionWriteLock({ + sessionFile: params.transcriptPath, + timeoutMs: 10_000, + allowReentrant: true, + }); + try { + const now = params.now ?? Date.now(); + const messageId = randomUUID(); + await ensureTranscriptHeader(params.transcriptPath); + const stat = await fs.stat(params.transcriptPath).catch(() => null); + let leafInfo: TranscriptLeafInfo = await readTranscriptLeafInfo(params.transcriptPath).catch( + () => ({ + hasParentLinkedEntries: false, + nonSessionEntryCount: 0, + }), + ); + const hasLinearEntries = !leafInfo.hasParentLinkedEntries && leafInfo.nonSessionEntryCount > 0; + const allowRawWhenLinear = params.useRawWhenLinear !== false; + const shouldRawAppend = + allowRawWhenLinear && + hasLinearEntries && + (stat?.size ?? 0) > SESSION_MANAGER_APPEND_MAX_BYTES; + if (hasLinearEntries && !shouldRawAppend) { + const migrated = await migrateLinearTranscriptToParentLinked(params.transcriptPath); + leafInfo = { + ...(migrated.leafId ? { leafId: migrated.leafId } : {}), + hasParentLinkedEntries: Boolean(migrated.leafId), + nonSessionEntryCount: leafInfo.nonSessionEntryCount, + }; + } + const entry = { + type: "message", + id: messageId, + ...(shouldRawAppend ? {} : { parentId: leafInfo.leafId ?? null }), + timestamp: new Date(now).toISOString(), + message: params.message, + }; + await fs.appendFile(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8"); + return { messageId }; + } finally { + await lock.release(); + } +} diff --git a/src/config/sessions/transcript.test.ts b/src/config/sessions/transcript.test.ts index 5ff0bf6e04c..2dbf8dffc9e 100644 --- a/src/config/sessions/transcript.test.ts +++ b/src/config/sessions/transcript.test.ts @@ -3,6 +3,7 @@ import { describe, expect, it, vi } from "vitest"; import * as transcriptEvents from "../../sessions/transcript-events.js"; import { resolveSessionTranscriptPathInDir } from "./paths.js"; import { useTempSessionsFixture } from "./test-helpers.js"; +import { appendSessionTranscriptMessage } from "./transcript-append.js"; import { appendAssistantMessageToSessionTranscript, appendExactAssistantMessageToSessionTranscript, @@ -357,4 +358,124 @@ describe("appendAssistantMessageToSessionTranscript", () => { } emitSpy.mockRestore(); }); + + it("serializes concurrent parent-linked transcript appends", async () => { + const sessionFile = resolveSessionTranscriptPathInDir( + "concurrent-tree-session", + fixture.sessionsDir(), + ); + fs.writeFileSync( + sessionFile, + [ + JSON.stringify({ + type: "session", + version: 1, + id: "concurrent-tree-session", + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }), + JSON.stringify({ + type: "message", + id: "root-message", + parentId: null, + timestamp: new Date().toISOString(), + message: { role: "user", content: "root" }, + }), + ].join("\n") + "\n", + "utf-8", + ); + + await Promise.all( + Array.from({ length: 8 }, (_, index) => + appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + message: { role: "assistant", content: `reply ${index}` }, + }), + ), + ); + + const records = fs + .readFileSync(sessionFile, "utf-8") + .trim() + .split("\n") + .map( + (line) => + JSON.parse(line) as { + type?: string; + id?: string; + parentId?: string | null; + message?: { content?: string }; + }, + ) + .filter((record) => record.type === "message"); + + expect(records).toHaveLength(9); + for (let index = 1; index < records.length; index += 1) { + expect(records[index]?.parentId).toBe(records[index - 1]?.id); + } + }); + + it("migrates small linear transcripts before appending", async () => { + const sessionFile = resolveSessionTranscriptPathInDir( + "small-linear-session", + fixture.sessionsDir(), + ); + fs.writeFileSync( + sessionFile, + [ + JSON.stringify({ + type: "session", + version: 3, + id: "small-linear-session", + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }), + JSON.stringify({ + type: "message", + id: "legacy-first", + timestamp: new Date().toISOString(), + message: { role: "user", content: "legacy first" }, + }), + JSON.stringify({ + type: "message", + id: "legacy-second", + timestamp: new Date().toISOString(), + message: { role: "assistant", content: "legacy second" }, + }), + ].join("\n") + "\n", + "utf-8", + ); + + const appended = await appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + message: { role: "assistant", content: "new reply" }, + }); + + const records = fs + .readFileSync(sessionFile, "utf-8") + .trim() + .split("\n") + .map( + (line) => + JSON.parse(line) as { + type?: string; + id?: string; + parentId?: string | null; + message?: { content?: string }; + }, + ); + const messages = records.filter((record) => record.type === "message"); + + expect(messages.map((record) => record.message?.content)).toEqual([ + "legacy first", + "legacy second", + "new reply", + ]); + expect(messages[0]).toMatchObject({ id: "legacy-first", parentId: null }); + expect(messages[1]).toMatchObject({ id: "legacy-second", parentId: "legacy-first" }); + expect(messages[2]).toMatchObject({ + id: appended.messageId, + parentId: "legacy-second", + }); + }); }); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index f5d4913cd43..b0d65b89a75 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -13,6 +13,7 @@ import { import { resolveAndPersistSessionFile } from "./session-file.js"; import { loadSessionStore, normalizeStoreSessionKey } from "./store.js"; import { parseSessionThreadInfo } from "./thread-info.js"; +import { appendSessionTranscriptMessage } from "./transcript-append.js"; import { resolveMirroredTranscriptText } from "./transcript-mirror.js"; import type { SessionEntry } from "./types.js"; @@ -261,7 +262,11 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { ? await transcriptHasIdempotencyKey(sessionFile, explicitIdempotencyKey) : undefined; if (existingMessageId) { - return { ok: true, sessionFile, messageId: existingMessageId }; + return { + ok: true, + sessionFile, + messageId: existingMessageId === true ? (explicitIdempotencyKey ?? "") : existingMessageId, + }; } const latestEquivalentAssistantId = isRedundantDeliveryMirror(params.message) @@ -275,9 +280,10 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { ...params.message, ...(explicitIdempotencyKey ? { idempotencyKey: explicitIdempotencyKey } : {}), } as Parameters[0]; - const { SessionManager } = await loadPiCodingAgentModule(); - const sessionManager = SessionManager.open(sessionFile); - const messageId = sessionManager.appendMessage(message); + const { messageId } = await appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + message, + }); switch (params.updateMode ?? "inline") { case "inline": @@ -295,7 +301,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { async function transcriptHasIdempotencyKey( transcriptPath: string, idempotencyKey: string, -): Promise { +): Promise { try { const raw = await fs.promises.readFile(transcriptPath, "utf-8"); for (const line of raw.split(/\r?\n/)) { @@ -314,6 +320,9 @@ async function transcriptHasIdempotencyKey( ) { return parsed.id; } + if (parsed.message?.idempotencyKey === idempotencyKey) { + return true; + } } catch { continue; } diff --git a/src/gateway/managed-image-attachments.test.ts b/src/gateway/managed-image-attachments.test.ts index 32892896588..7c3a58a961b 100644 --- a/src/gateway/managed-image-attachments.test.ts +++ b/src/gateway/managed-image-attachments.test.ts @@ -20,7 +20,7 @@ vi.mock("./http-utils.js", () => ({ vi.mock("./session-utils.js", () => ({ loadSessionEntry: loadSessionEntryMock, - readSessionMessages: readSessionMessagesMock, + readSessionMessagesAsync: readSessionMessagesMock, })); vi.mock("../agents/subagent-registry.js", () => ({ diff --git a/src/gateway/managed-image-attachments.ts b/src/gateway/managed-image-attachments.ts index 8747aefb0ee..dd05be7a1bb 100644 --- a/src/gateway/managed-image-attachments.ts +++ b/src/gateway/managed-image-attachments.ts @@ -23,7 +23,7 @@ import { resolveOpenAiCompatibleHttpOperatorScopes, } from "./http-utils.js"; import { authorizeOperatorScopesForMethod } from "./method-scopes.js"; -import { loadSessionEntry, readSessionMessages } from "./session-utils.js"; +import { loadSessionEntry, readSessionMessagesAsync } from "./session-utils.js"; const OUTGOING_IMAGE_ROUTE_PREFIX = "/api/chat/media/outgoing"; const DEFAULT_TRANSIENT_OUTGOING_IMAGE_TTL_MS = 15 * 60 * 1000; @@ -717,7 +717,7 @@ async function getSessionManagedOutgoingAttachmentIndex( } } - const messages = readSessionMessages(sessionId, storePath, entry.sessionFile); + const messages = await readSessionMessagesAsync(sessionId, storePath, entry.sessionFile); const index: SessionManagedOutgoingAttachmentIndex = new Set(); for (const message of messages) { const meta = (message as { __openclaw?: { id?: string } } | null)?.__openclaw; diff --git a/src/gateway/server-methods/artifacts.test.ts b/src/gateway/server-methods/artifacts.test.ts index 91fe25d6a3f..7a48a4f7b92 100644 --- a/src/gateway/server-methods/artifacts.test.ts +++ b/src/gateway/server-methods/artifacts.test.ts @@ -4,7 +4,7 @@ import { artifactsHandlers, collectArtifactsFromMessages } from "./artifacts.js" const hoisted = vi.hoisted(() => ({ getTaskSessionLookupByIdForStatus: vi.fn(), loadSessionEntry: vi.fn(), - visitSessionMessages: vi.fn(), + visitSessionMessagesAsync: vi.fn(), resolveSessionKeyForRun: vi.fn(), })); @@ -17,7 +17,7 @@ vi.mock("../session-utils.js", async () => { return { ...actual, loadSessionEntry: hoisted.loadSessionEntry, - visitSessionMessages: hoisted.visitSessionMessages, + visitSessionMessagesAsync: hoisted.visitSessionMessagesAsync, }; }); @@ -67,8 +67,8 @@ describe("artifacts RPC handlers", () => { }); function mockedMessages(messages: unknown[]) { - hoisted.visitSessionMessages.mockImplementation( - (_sessionId, _storePath, _sessionFile, visit) => { + hoisted.visitSessionMessagesAsync.mockImplementation( + async (_sessionId, _storePath, _sessionFile, visit) => { messages.forEach((message, index) => visit(message, index + 1)); return messages.length; }, diff --git a/src/gateway/server-methods/artifacts.ts b/src/gateway/server-methods/artifacts.ts index 269326783f8..bdace8f9366 100644 --- a/src/gateway/server-methods/artifacts.ts +++ b/src/gateway/server-methods/artifacts.ts @@ -10,7 +10,7 @@ import { validateArtifactsListParams, } from "../protocol/index.js"; import { resolveSessionKeyForRun } from "../server-session-key.js"; -import { loadSessionEntry, visitSessionMessages } from "../session-utils.js"; +import { loadSessionEntry, visitSessionMessagesAsync } from "../session-utils.js"; import type { GatewayRequestHandlers, RespondFn } from "./types.js"; import { assertValidParams } from "./validation.js"; @@ -300,7 +300,9 @@ function resolveQuerySessionKey(query: ArtifactQuery): string | undefined { return undefined; } -function loadArtifacts(query: ArtifactQuery): { artifacts: ArtifactRecord[]; sessionKey?: string } { +async function loadArtifacts( + query: ArtifactQuery, +): Promise<{ artifacts: ArtifactRecord[]; sessionKey?: string }> { const sessionKey = resolveQuerySessionKey(query); if (!sessionKey) { return { artifacts: [] }; @@ -311,7 +313,7 @@ function loadArtifacts(query: ArtifactQuery): { artifacts: ArtifactRecord[]; ses return { sessionKey, artifacts: [] }; } const artifacts: ArtifactRecord[] = []; - visitSessionMessages(sessionId, storePath, entry?.sessionFile, (message, seq) => { + await visitSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, (message, seq) => { collectArtifactsFromMessage({ message, messageFallbackSeq: seq, @@ -342,11 +344,11 @@ function requireQueryable(params: ArtifactQuery, respond: RespondFn): boolean { return false; } -function findArtifact(params: ArtifactsGetParams): { +async function findArtifact(params: ArtifactsGetParams): Promise<{ artifact?: ArtifactRecord; sessionKey?: string; -} { - const loaded = loadArtifacts(params); +}> { + const loaded = await loadArtifacts(params); return { sessionKey: loaded.sessionKey, artifact: loaded.artifacts.find((artifact) => artifact.id === params.artifactId), @@ -359,14 +361,14 @@ function toSummary(artifact: ArtifactRecord): ArtifactSummary { } export const artifactsHandlers: GatewayRequestHandlers = { - "artifacts.list": ({ params, respond }) => { + "artifacts.list": async ({ params, respond }) => { if (!assertValidParams(params, validateArtifactsListParams, "artifacts.list", respond)) { return; } if (!requireQueryable(params, respond)) { return; } - const { artifacts, sessionKey } = loadArtifacts(params); + const { artifacts, sessionKey } = await loadArtifacts(params); if (!sessionKey && (params.runId || params.taskId)) { respond( false, @@ -377,14 +379,14 @@ export const artifactsHandlers: GatewayRequestHandlers = { } respond(true, { artifacts: artifacts.map(toSummary) }); }, - "artifacts.get": ({ params, respond }) => { + "artifacts.get": async ({ params, respond }) => { if (!assertValidParams(params, validateArtifactsGetParams, "artifacts.get", respond)) { return; } if (!requireQueryable(params, respond)) { return; } - const { artifact } = findArtifact(params); + const { artifact } = await findArtifact(params); if (!artifact) { respond( false, @@ -397,7 +399,7 @@ export const artifactsHandlers: GatewayRequestHandlers = { } respond(true, { artifact: toSummary(artifact) }); }, - "artifacts.download": ({ params, respond }) => { + "artifacts.download": async ({ params, respond }) => { if ( !assertValidParams(params, validateArtifactsDownloadParams, "artifacts.download", respond) ) { @@ -406,7 +408,7 @@ export const artifactsHandlers: GatewayRequestHandlers = { if (!requireQueryable(params, respond)) { return; } - const { artifact } = findArtifact(params); + const { artifact } = await findArtifact(params); if (!artifact) { respond( false, diff --git a/src/gateway/server-methods/chat-transcript-inject.ts b/src/gateway/server-methods/chat-transcript-inject.ts index e8745d72876..94c761ee35d 100644 --- a/src/gateway/server-methods/chat-transcript-inject.ts +++ b/src/gateway/server-methods/chat-transcript-inject.ts @@ -1,14 +1,10 @@ -import { randomUUID } from "node:crypto"; -import fs from "node:fs"; -import { StringDecoder } from "node:string_decoder"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; +import type { SessionManager } from "@mariozechner/pi-coding-agent"; +import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; type AppendMessageArg = Parameters[0]; -const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024; - export type GatewayInjectedAbortMeta = { aborted: true; origin: "rpc" | "stop-command"; @@ -46,78 +42,7 @@ function resolveInjectedAssistantContent(params: { return [{ type: "text", text: `${labelPrefix}${params.message}` }]; } -function transcriptHasParentLinkedEntries(transcriptPath: string): boolean { - let fd: number | null = null; - try { - fd = fs.openSync(transcriptPath, "r"); - const decoder = new StringDecoder("utf8"); - const buffer = Buffer.allocUnsafe(64 * 1024); - let carry = ""; - while (true) { - const bytesRead = fs.readSync(fd, buffer, 0, buffer.length, null); - if (bytesRead <= 0) { - break; - } - const text = carry + decoder.write(buffer.subarray(0, bytesRead)); - const lines = text.split(/\r?\n/); - carry = lines.pop() ?? ""; - for (const line of lines) { - if (lineHasParentLinkedEntry(line)) { - return true; - } - } - } - return lineHasParentLinkedEntry(carry + decoder.end()); - } catch { - return true; - } finally { - if (fd !== null) { - fs.closeSync(fd); - } - } -} - -function lineHasParentLinkedEntry(line: string): boolean { - if (!line.trim()) { - return false; - } - try { - const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown }; - return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed; - } catch { - return false; - } -} - -function shouldUseRawAppend(transcriptPath: string): boolean { - try { - const stat = fs.statSync(transcriptPath); - return ( - stat.size > SESSION_MANAGER_APPEND_MAX_BYTES && - !transcriptHasParentLinkedEntries(transcriptPath) - ); - } catch { - return false; - } -} - -function appendRawAssistantMessageToTranscript(params: { - transcriptPath: string; - message: AppendMessageArg & Record; - now: number; -}): { messageId: string } { - const messageId = randomUUID(); - const entry = { - type: "message", - id: messageId, - timestamp: new Date(params.now).toISOString(), - message: params.message, - }; - fs.appendFileSync(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8"); - return { messageId }; -} - -export function appendInjectedAssistantMessageToTranscript(params: { +export async function appendInjectedAssistantMessageToTranscript(params: { transcriptPath: string; message: string; label?: string; @@ -126,7 +51,7 @@ export function appendInjectedAssistantMessageToTranscript(params: { idempotencyKey?: string; abortMeta?: GatewayInjectedAbortMeta; now?: number; -}): GatewayInjectedTranscriptAppendResult { +}): Promise { const now = params.now ?? Date.now(); const usage = { input: 0, @@ -176,24 +101,12 @@ export function appendInjectedAssistantMessageToTranscript(params: { }; try { - if (shouldUseRawAppend(params.transcriptPath)) { - const { messageId } = appendRawAssistantMessageToTranscript({ - transcriptPath: params.transcriptPath, - message: messageBody, - now, - }); - emitSessionTranscriptUpdate({ - sessionFile: params.transcriptPath, - message: messageBody, - messageId, - }); - return { ok: true, messageId, message: messageBody }; - } - - // IMPORTANT: Use SessionManager so the entry is attached to the current leaf via parentId. - // Raw jsonl appends break the parent chain and can hide compaction summaries from context. - const sessionManager = SessionManager.open(params.transcriptPath); - const messageId = sessionManager.appendMessage(messageBody); + const { messageId } = await appendSessionTranscriptMessage({ + transcriptPath: params.transcriptPath, + message: messageBody, + now, + useRawWhenLinear: true, + }); emitSessionTranscriptUpdate({ sessionFile: params.transcriptPath, message: messageBody, diff --git a/src/gateway/server-methods/chat.inject.parentid.test.ts b/src/gateway/server-methods/chat.inject.parentid.test.ts index 0da7639878a..1e16ca977a5 100644 --- a/src/gateway/server-methods/chat.inject.parentid.test.ts +++ b/src/gateway/server-methods/chat.inject.parentid.test.ts @@ -3,8 +3,8 @@ import { describe, expect, it } from "vitest"; import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js"; import { createTranscriptFixtureSync } from "./chat.test-helpers.js"; -// Guardrail: Ensure gateway "injected" assistant transcript messages are appended via SessionManager, -// so they are attached to the current leaf with a `parentId` and do not sever compaction history. +// Guardrail: Gateway-injected assistant transcript messages must attach to the +// current leaf with a `parentId` and must not sever compaction history. describe("gateway chat.inject transcript writes", () => { it("appends a Pi session entry that includes parentId", async () => { const { dir, transcriptPath } = createTranscriptFixtureSync({ @@ -13,7 +13,7 @@ describe("gateway chat.inject transcript writes", () => { }); try { - const appended = appendInjectedAssistantMessageToTranscript({ + const appended = await appendInjectedAssistantMessageToTranscript({ transcriptPath, message: "hello", }); @@ -55,7 +55,7 @@ describe("gateway chat.inject transcript writes", () => { "utf-8", ); - const appended = appendInjectedAssistantMessageToTranscript({ + const appended = await appendInjectedAssistantMessageToTranscript({ transcriptPath, message: "hello", }); diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 0f750a2b9ce..d9eb912fc5b 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import path from "node:path"; -import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; import { resolveAgentWorkspaceDir, resolveSessionAgentId } from "../../agents/agent-scope.js"; import { rewriteTranscriptEntriesInSessionFile } from "../../agents/pi-embedded-runner/transcript-rewrite.js"; @@ -94,13 +95,14 @@ import { } from "../protocol/index.js"; import { CHAT_SEND_SESSION_KEY_MAX_LENGTH } from "../protocol/schema/primitives.js"; import { getMaxChatHistoryMessagesBytes } from "../server-constants.js"; +import { readSessionTranscriptIndex } from "../session-transcript-index.fs.js"; import { capArrayByJsonBytes, loadSessionEntry, resolveGatewayModelSupportsImages, resolveGatewaySessionThinkingDefault, resolveDeletedAgentIdFromSessionKey, - readRecentSessionMessages, + readRecentSessionMessagesAsync, resolveSessionModelRef, } from "../session-utils.js"; import { formatForLog } from "../ws-log.js"; @@ -959,31 +961,30 @@ async function rewriteChatSendUserTurnMediaPaths(params: { if (!("MediaPath" in mediaFields)) { return; } - const sessionManager = SessionManager.open(params.transcriptPath); - const branch = sessionManager.getBranch(); - const target = [...branch].toReversed().find((entry) => { - if (entry.type !== "message" || entry.message.role !== "user") { + const index = await readSessionTranscriptIndex(params.transcriptPath); + const target = index?.entries.toReversed().find((entry) => { + const message = entry.record.message as Record | undefined; + if (!message || message.role !== "user") { return false; } - const existingPaths = Array.isArray((entry.message as { MediaPaths?: unknown }).MediaPaths) - ? (entry.message as { MediaPaths?: unknown[] }).MediaPaths + const existingPaths = Array.isArray((message as { MediaPaths?: unknown }).MediaPaths) + ? (message as { MediaPaths?: unknown[] }).MediaPaths : undefined; if ( - (typeof (entry.message as { MediaPath?: unknown }).MediaPath === "string" && - (entry.message as { MediaPath?: string }).MediaPath) || + (typeof (message as { MediaPath?: unknown }).MediaPath === "string" && + (message as { MediaPath?: string }).MediaPath) || (existingPaths && existingPaths.length > 0) ) { return false; } - return ( - extractTranscriptUserText((entry.message as { content?: unknown }).content) === params.message - ); + return extractTranscriptUserText((message as { content?: unknown }).content) === params.message; }); - if (!target || target.type !== "message") { + const targetMessage = target?.record.message as Record | undefined; + if (!target || !target.id || !targetMessage) { return; } const rewrittenMessage = { - ...target.message, + ...targetMessage, ...mediaFields, }; await rewriteTranscriptEntriesInSessionFile({ @@ -993,7 +994,7 @@ async function rewriteChatSendUserTurnMediaPaths(params: { replacements: [ { entryId: target.id, - message: rewrittenMessage, + message: rewrittenMessage as AgentMessage, }, ], }, @@ -1282,9 +1283,12 @@ function ensureTranscriptFile(params: { transcriptPath: string; sessionId: strin } } -function transcriptHasIdempotencyKey(transcriptPath: string, idempotencyKey: string): boolean { +async function transcriptHasIdempotencyKey( + transcriptPath: string, + idempotencyKey: string, +): Promise { try { - const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/); + const lines = (await fs.promises.readFile(transcriptPath, "utf-8")).split(/\r?\n/); for (const line of lines) { if (!line.trim()) { continue; @@ -1300,7 +1304,7 @@ function transcriptHasIdempotencyKey(transcriptPath: string, idempotencyKey: str } } -function appendAssistantTranscriptMessage(params: { +async function appendAssistantTranscriptMessage(params: { message: string; label?: string; content?: Array>; @@ -1315,7 +1319,7 @@ function appendAssistantTranscriptMessage(params: { origin: AbortOrigin; runId: string; }; -}): TranscriptAppendResult { +}): Promise { const transcriptPath = resolveTranscriptPath({ sessionId: params.sessionId, storePath: params.storePath, @@ -1339,11 +1343,14 @@ function appendAssistantTranscriptMessage(params: { } } - if (params.idempotencyKey && transcriptHasIdempotencyKey(transcriptPath, params.idempotencyKey)) { + if ( + params.idempotencyKey && + (await transcriptHasIdempotencyKey(transcriptPath, params.idempotencyKey)) + ) { return { ok: true }; } - return appendInjectedAssistantMessageToTranscript({ + return await appendInjectedAssistantMessageToTranscript({ transcriptPath, message: params.message, label: params.label, @@ -1378,18 +1385,18 @@ function collectSessionAbortPartials(params: { return out; } -function persistAbortedPartials(params: { +async function persistAbortedPartials(params: { context: Pick; sessionKey: string; snapshots: AbortedPartialSnapshot[]; -}) { +}): Promise { if (params.snapshots.length === 0) { return; } const { storePath, entry } = loadSessionEntry(params.sessionKey); for (const snapshot of params.snapshots) { const sessionId = entry?.sessionId ?? snapshot.sessionId ?? snapshot.runId; - const appended = appendAssistantTranscriptMessage({ + const appended = await appendAssistantTranscriptMessage({ message: snapshot.text, sessionId, storePath, @@ -1520,14 +1527,14 @@ function resolveAuthorizedRunIdsForSession(params: { }; } -function abortChatRunsForSessionKeyWithPartials(params: { +async function abortChatRunsForSessionKeyWithPartials(params: { context: GatewayRequestContext; ops: ChatAbortOps; sessionKey: string; abortOrigin: AbortOrigin; stopReason?: string; requester: ChatAbortRequester; -}) { +}): Promise<{ aborted: boolean; runIds: string[]; unauthorized: boolean }> { const { matchedSessionRuns, authorizedRunIds } = resolveAuthorizedRunIdsForSession({ chatAbortControllers: params.context.chatAbortControllers, sessionKey: params.sessionKey, @@ -1560,7 +1567,7 @@ function abortChatRunsForSessionKeyWithPartials(params: { } const res = { aborted: runIds.length > 0, runIds, unauthorized: false }; if (res.aborted) { - persistAbortedPartials({ + await persistAbortedPartials({ context: params.context, sessionKey: params.sessionKey, snapshots, @@ -1669,7 +1676,7 @@ export const chatHandlers: GatewayRequestHandlers = { const maxHistoryBytes = getMaxChatHistoryMessagesBytes(); const localMessages = sessionId && storePath - ? readRecentSessionMessages(sessionId, storePath, entry?.sessionFile, { + ? await readRecentSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, { maxMessages: max, maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024), }) @@ -1728,7 +1735,7 @@ export const chatHandlers: GatewayRequestHandlers = { verboseLevel, }); }, - "chat.abort": ({ params, respond, context, client }) => { + "chat.abort": async ({ params, respond, context, client }) => { if (!validateChatAbortParams(params)) { respond( false, @@ -1749,7 +1756,7 @@ export const chatHandlers: GatewayRequestHandlers = { const requester = resolveChatAbortRequester(client); if (!runId) { - const res = abortChatRunsForSessionKeyWithPartials({ + const res = await abortChatRunsForSessionKeyWithPartials({ context, ops, sessionKey: rawSessionKey, @@ -1790,7 +1797,7 @@ export const chatHandlers: GatewayRequestHandlers = { stopReason: "rpc", }); if (res.aborted && partialText && partialText.trim()) { - persistAbortedPartials({ + await persistAbortedPartials({ context, sessionKey: rawSessionKey, snapshots: [ @@ -1944,7 +1951,7 @@ export const chatHandlers: GatewayRequestHandlers = { } if (stopCommand) { - const res = abortChatRunsForSessionKeyWithPartials({ + const res = await abortChatRunsForSessionKeyWithPartials({ context, ops: createChatAbortOps(context), sessionKey: rawSessionKey, @@ -2262,7 +2269,7 @@ export const chatHandlers: GatewayRequestHandlers = { if (!transcriptReply && !persistedAssistantContent?.length && !assistantContent?.length) { return; } - const appended = appendAssistantTranscriptMessage({ + const appended = await appendAssistantTranscriptMessage({ message: transcriptReply, ...(persistedContentForAppend?.length ? { content: persistedContentForAppend } : {}), sessionId, @@ -2467,7 +2474,7 @@ export const chatHandlers: GatewayRequestHandlers = { persistedContentForAppend?.length || assistantContent?.length ) { - const appended = appendAssistantTranscriptMessage({ + const appended = await appendAssistantTranscriptMessage({ message: transcriptReply, ...(persistedContentForAppend?.length ? { content: persistedContentForAppend } @@ -2625,7 +2632,7 @@ export const chatHandlers: GatewayRequestHandlers = { return; } - const appended = appendAssistantTranscriptMessage({ + const appended = await appendAssistantTranscriptMessage({ message: p.message, label: p.label, sessionId, diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index e66b4c1ff00..2a365fe0242 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -587,7 +587,7 @@ describe("sanitizeChatSendMessageInput", () => { }); describe("gateway chat transcript writes (guardrail)", () => { - it("routes transcript writes through helper and SessionManager parentId append", () => { + it("routes transcript writes through helper and async parentId append", () => { const chatTs = fileURLToPath(new URL("./chat.ts", import.meta.url)); const chatSrc = fs.readFileSync(chatTs, "utf-8"); const helperTs = fileURLToPath(new URL("./chat-transcript-inject.ts", import.meta.url)); @@ -596,10 +596,9 @@ describe("gateway chat transcript writes (guardrail)", () => { expect(chatSrc.includes("fs.appendFileSync(transcriptPath")).toBe(false); expect(chatSrc).toContain("appendInjectedAssistantMessageToTranscript("); - expect(helperSrc).toContain("function shouldUseRawAppend("); - expect(helperSrc).toContain("function appendRawAssistantMessageToTranscript("); - expect(helperSrc).toContain("SessionManager.open(params.transcriptPath)"); - expect(helperSrc).toContain("appendMessage(messageBody)"); + expect(helperSrc).toContain("appendSessionTranscriptMessage({"); + expect(helperSrc).toContain("useRawWhenLinear: true"); + expect(helperSrc).not.toContain("SessionManager.open(params.transcriptPath)"); }); }); diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 97491506baf..5505296a292 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -77,9 +77,9 @@ import { loadGatewaySessionRow, loadSessionEntry, migrateAndPruneGatewaySessionStoreKey, - readRecentSessionMessagesWithStats, + readRecentSessionMessagesWithStatsAsync, readRecentSessionTranscriptLines, - readSessionMessageCount, + readSessionMessageCountAsync, readSessionPreviewItemsFromTranscript, resolveDeletedAgentIdFromSessionKey, resolveFreshestSessionEntryFromStoreKeys, @@ -571,7 +571,8 @@ async function handleSessionSend(params: { interruptedActiveRun = interruptResult.interrupted; } - const messageSeq = readSessionMessageCount(entry.sessionId, storePath, entry.sessionFile) + 1; + const messageSeq = + (await readSessionMessageCountAsync(entry.sessionId, storePath, entry.sessionFile)) + 1; let sendAcked = false; let sendPayload: unknown; let sendCached = false; @@ -985,11 +986,11 @@ export const sessionsHandlers: GatewayRequestHandlers = { let runError: unknown; let runMeta: Record | undefined; const messageSeq = initialMessage - ? readSessionMessageCount( + ? (await readSessionMessageCountAsync( createdEntry.sessionId, target.storePath, createdEntry.sessionFile, - ) + 1 + )) + 1 : undefined; if (initialMessage) { @@ -1674,7 +1675,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { }); } }, - "sessions.get": ({ params, respond, context }) => { + "sessions.get": async ({ params, respond, context }) => { const p = params; const key = requireSessionKey(p.key ?? p.sessionKey, respond); if (!key) { @@ -1695,7 +1696,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { respond(true, { messages: [] }, undefined); return; } - const { messages } = readRecentSessionMessagesWithStats( + const { messages } = await readRecentSessionMessagesWithStatsAsync( entry.sessionId, storePath, entry.sessionFile, diff --git a/src/gateway/server-session-events.ts b/src/gateway/server-session-events.ts index 7cfe817d176..8baccf50699 100644 --- a/src/gateway/server-session-events.ts +++ b/src/gateway/server-session-events.ts @@ -11,7 +11,7 @@ import { attachOpenClawTranscriptMeta, loadGatewaySessionRow, loadSessionEntry, - readSessionMessageCount, + readSessionMessageCountAsync, type GatewaySessionRow, } from "./session-utils.js"; @@ -88,67 +88,81 @@ export function createTranscriptUpdateBroadcastHandler(params: { sessionEventSubscribers: SessionEventSubscribers; sessionMessageSubscribers: SessionMessageSubscribers; }) { + let broadcastQueue = Promise.resolve(); return (update: SessionTranscriptUpdate): void => { - const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile); - if (!sessionKey || update.message === undefined) { - return; - } - const connIds = new Set(); - for (const connId of params.sessionEventSubscribers.getAll()) { - connIds.add(connId); - } - for (const connId of params.sessionMessageSubscribers.get(sessionKey)) { - connIds.add(connId); - } - if (connIds.size === 0) { - return; - } - const { entry, storePath } = loadSessionEntry(sessionKey); - const messageSeq = entry?.sessionId - ? readSessionMessageCount(entry.sessionId, storePath, entry.sessionFile) - : undefined; - const sessionSnapshot = buildGatewaySessionSnapshot({ - sessionRow: loadGatewaySessionRow(sessionKey), - includeSession: true, - }); - const rawMessage = attachOpenClawTranscriptMeta(update.message, { - ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), - ...(typeof messageSeq === "number" ? { seq: messageSeq } : {}), - }); - const message = projectChatDisplayMessage(rawMessage); - if (message) { - params.broadcastToConnIds( - "session.message", - { - sessionKey, - message, - ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), - ...(typeof messageSeq === "number" ? { messageSeq } : {}), - ...sessionSnapshot, - }, - connIds, - { dropIfSlow: true }, - ); - } + broadcastQueue = broadcastQueue + .then(() => handleTranscriptUpdateBroadcast(params, update)) + .catch(() => undefined); + }; +} - const sessionEventConnIds = params.sessionEventSubscribers.getAll(); - if (sessionEventConnIds.size === 0) { - return; - } +async function handleTranscriptUpdateBroadcast( + params: { + broadcastToConnIds: GatewayBroadcastToConnIdsFn; + sessionEventSubscribers: SessionEventSubscribers; + sessionMessageSubscribers: SessionMessageSubscribers; + }, + update: SessionTranscriptUpdate, +): Promise { + const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile); + if (!sessionKey || update.message === undefined) { + return; + } + const connIds = new Set(); + for (const connId of params.sessionEventSubscribers.getAll()) { + connIds.add(connId); + } + for (const connId of params.sessionMessageSubscribers.get(sessionKey)) { + connIds.add(connId); + } + if (connIds.size === 0) { + return; + } + const { entry, storePath } = loadSessionEntry(sessionKey); + const messageSeq = entry?.sessionId + ? await readSessionMessageCountAsync(entry.sessionId, storePath, entry.sessionFile) + : undefined; + const sessionSnapshot = buildGatewaySessionSnapshot({ + sessionRow: loadGatewaySessionRow(sessionKey, { transcriptUsageMaxBytes: 64 * 1024 }), + includeSession: true, + }); + const rawMessage = attachOpenClawTranscriptMeta(update.message, { + ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), + ...(typeof messageSeq === "number" ? { seq: messageSeq } : {}), + }); + const message = projectChatDisplayMessage(rawMessage); + if (message) { params.broadcastToConnIds( - "sessions.changed", + "session.message", { sessionKey, - phase: "message", - ts: Date.now(), + message, ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), ...(typeof messageSeq === "number" ? { messageSeq } : {}), ...sessionSnapshot, }, - sessionEventConnIds, + connIds, { dropIfSlow: true }, ); - }; + } + + const sessionEventConnIds = params.sessionEventSubscribers.getAll(); + if (sessionEventConnIds.size === 0) { + return; + } + params.broadcastToConnIds( + "sessions.changed", + { + sessionKey, + phase: "message", + ts: Date.now(), + ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), + ...(typeof messageSeq === "number" ? { messageSeq } : {}), + ...sessionSnapshot, + }, + sessionEventConnIds, + { dropIfSlow: true }, + ); } export function createLifecycleEventBroadcastHandler(params: { diff --git a/src/gateway/session-history-state.ts b/src/gateway/session-history-state.ts index 488895f573d..3f444cc7452 100644 --- a/src/gateway/session-history-state.ts +++ b/src/gateway/session-history-state.ts @@ -5,7 +5,9 @@ import { import { attachOpenClawTranscriptMeta, readRecentSessionMessagesWithStats, + readRecentSessionMessagesWithStatsAsync, readSessionMessages, + readSessionMessagesAsync, } from "./session-utils.js"; type SessionHistoryTranscriptMeta = { @@ -272,6 +274,25 @@ export class SessionHistorySseState { return snapshot.history; } + async refreshAsync(): Promise { + const rawSnapshot = await this.readRawSnapshotAsync(); + const snapshot = buildSessionHistorySnapshot({ + rawMessages: rawSnapshot.rawMessages, + maxChars: this.maxChars, + limit: this.limit, + cursor: this.cursor, + ...(typeof rawSnapshot.rawTranscriptSeq === "number" + ? { rawTranscriptSeq: rawSnapshot.rawTranscriptSeq } + : {}), + ...(typeof rawSnapshot.totalRawMessages === "number" + ? { totalRawMessages: rawSnapshot.totalRawMessages } + : {}), + }); + this.rawTranscriptSeq = snapshot.rawTranscriptSeq; + this.sentHistory = snapshot.history; + return snapshot.history; + } + private readRawSnapshot(): SessionHistoryRawSnapshot { if (this.cursor === undefined && typeof this.limit === "number") { const snapshot = readRecentSessionMessagesWithStats( @@ -298,4 +319,27 @@ export class SessionHistorySseState { this.target.sessionFile, ); } + + private async readRawSnapshotAsync(): Promise { + if (this.cursor === undefined && typeof this.limit === "number") { + const snapshot = await readRecentSessionMessagesWithStatsAsync( + this.target.sessionId, + this.target.storePath, + this.target.sessionFile, + resolveSessionHistoryTailReadOptions(this.limit), + ); + return { + rawMessages: snapshot.messages, + rawTranscriptSeq: snapshot.totalMessages, + totalRawMessages: snapshot.totalMessages, + }; + } + return { + rawMessages: await readSessionMessagesAsync( + this.target.sessionId, + this.target.storePath, + this.target.sessionFile, + ), + }; + } } diff --git a/src/gateway/session-reset-service.ts b/src/gateway/session-reset-service.ts index a10dbd04f4a..60153f593a6 100644 --- a/src/gateway/session-reset-service.ts +++ b/src/gateway/session-reset-service.ts @@ -46,7 +46,7 @@ import { import { loadSessionEntry, migrateAndPruneGatewaySessionStoreKey, - readSessionMessages, + readSessionMessagesAsync, resolveGatewaySessionStoreTarget, resolveSessionModelRef, } from "./session-utils.js"; @@ -433,14 +433,14 @@ export async function cleanupSessionBeforeMutation(params: { }); } -function emitGatewayBeforeResetPluginHook(params: { +async function emitGatewayBeforeResetPluginHook(params: { cfg: OpenClawConfig; key: string; target: ReturnType; storePath: string; entry?: SessionEntry; reason: "new" | "reset"; -}): void { +}): Promise { const hookRunner = getGlobalHookRunner(); if (!hookRunner?.hasHooks("before_reset")) { return; @@ -454,7 +454,7 @@ function emitGatewayBeforeResetPluginHook(params: { let messages: unknown[] = []; try { if (typeof sessionId === "string" && sessionId.trim().length > 0) { - messages = readSessionMessages(sessionId, params.storePath, sessionFile); + messages = await readSessionMessagesAsync(sessionId, params.storePath, sessionFile); } } catch (err) { logVerbose( @@ -630,7 +630,7 @@ export async function performGatewaySessionReset(params: { store[primaryKey] = nextEntry; return nextEntry; }); - emitGatewayBeforeResetPluginHook({ + await emitGatewayBeforeResetPluginHook({ cfg, key: params.key, target, diff --git a/src/gateway/session-transcript-index.fs.ts b/src/gateway/session-transcript-index.fs.ts new file mode 100644 index 00000000000..7fb7d93b7f2 --- /dev/null +++ b/src/gateway/session-transcript-index.fs.ts @@ -0,0 +1,247 @@ +import fs from "node:fs"; +import { StringDecoder } from "node:string_decoder"; + +const TRANSCRIPT_INDEX_READ_CHUNK_BYTES = 64 * 1024; +const MAX_TRANSCRIPT_INDEX_CACHE_ENTRIES = 256; + +type ParsedTranscriptRecord = Record; + +export type IndexedTranscriptEntry = { + seq: number; + id?: string; + offset: number; + byteLength: number; + record: ParsedTranscriptRecord; +}; + +export type SessionTranscriptIndex = { + filePath: string; + mtimeMs: number; + size: number; + hasTreeEntries: boolean; + leafId?: string; + entries: IndexedTranscriptEntry[]; +}; + +type IndexedRawEntry = { + id?: string; + parentId?: string | null; + offset: number; + byteLength: number; + record: ParsedTranscriptRecord; +}; + +type CacheEntry = { + mtimeMs: number; + size: number; + index: SessionTranscriptIndex; +}; + +const transcriptIndexCache = new Map(); + +function normalizeOptionalString(value: unknown): string | undefined { + return typeof value === "string" && value.trim().length > 0 ? value : undefined; +} + +async function yieldTranscriptIndexScan(): Promise { + await new Promise((resolve) => setImmediate(resolve)); +} + +function touchCachedIndex(filePath: string, entry: CacheEntry): SessionTranscriptIndex { + transcriptIndexCache.delete(filePath); + transcriptIndexCache.set(filePath, entry); + return entry.index; +} + +function setCachedIndex(filePath: string, entry: CacheEntry): void { + transcriptIndexCache.set(filePath, entry); + while (transcriptIndexCache.size > MAX_TRANSCRIPT_INDEX_CACHE_ENTRIES) { + const oldestKey = transcriptIndexCache.keys().next().value; + if (typeof oldestKey !== "string" || !oldestKey) { + break; + } + transcriptIndexCache.delete(oldestKey); + } +} + +export function invalidateSessionTranscriptIndex(filePath: string): void { + transcriptIndexCache.delete(filePath); +} + +export function clearSessionTranscriptIndexCache(): void { + transcriptIndexCache.clear(); +} + +function isIndexableTranscriptRecord(record: unknown): record is ParsedTranscriptRecord { + return Boolean(record && typeof record === "object" && !Array.isArray(record)); +} + +function isVisibleTranscriptRecord(record: ParsedTranscriptRecord): boolean { + return Boolean(record.message) || record.type === "compaction"; +} + +function isTreeTranscriptRecord(record: ParsedTranscriptRecord): boolean { + return record.type !== "session" && typeof record.id === "string" && "parentId" in record; +} + +async function visitTranscriptJsonLines( + filePath: string, + visit: (line: string, offset: number, byteLength: number) => void, +): Promise { + const handle = await fs.promises.open(filePath, "r"); + try { + const decoder = new StringDecoder("utf8"); + const buffer = Buffer.allocUnsafe(TRANSCRIPT_INDEX_READ_CHUNK_BYTES); + let carry = ""; + let carryOffset = 0; + let nextOffset = 0; + + while (true) { + const { bytesRead } = await handle.read(buffer, 0, buffer.length, null); + if (bytesRead <= 0) { + break; + } + const chunk = buffer.subarray(0, bytesRead); + const text = carry + decoder.write(chunk); + const lines = text.split("\n"); + carry = lines.pop() ?? ""; + let lineOffset = carryOffset; + for (const rawLine of lines) { + const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine; + const byteLength = Buffer.byteLength(line, "utf8"); + visit(line, lineOffset, byteLength); + lineOffset += Buffer.byteLength(rawLine, "utf8") + 1; + } + nextOffset += bytesRead; + carryOffset = nextOffset - Buffer.byteLength(carry, "utf8"); + await yieldTranscriptIndexScan(); + } + + const tail = carry + decoder.end(); + if (tail) { + const line = tail.endsWith("\r") ? tail.slice(0, -1) : tail; + visit(line, carryOffset, Buffer.byteLength(line, "utf8")); + } + } finally { + await handle.close(); + } +} + +function buildActiveTreeEntries(params: { + byId: Map; + leafId?: string; +}): IndexedRawEntry[] { + const out: IndexedRawEntry[] = []; + const seen = new Set(); + let currentId = params.leafId; + while (currentId) { + if (seen.has(currentId)) { + return []; + } + seen.add(currentId); + const entry = params.byId.get(currentId); + if (!entry) { + return []; + } + out.push(entry); + currentId = entry.parentId ?? undefined; + } + return out.toReversed(); +} + +function toIndexedEntries(rawEntries: IndexedRawEntry[]): IndexedTranscriptEntry[] { + const entries: IndexedTranscriptEntry[] = []; + let seq = 0; + for (const entry of rawEntries) { + if (!isVisibleTranscriptRecord(entry.record)) { + continue; + } + seq += 1; + entries.push({ + seq, + ...(entry.id ? { id: entry.id } : {}), + offset: entry.offset, + byteLength: entry.byteLength, + record: entry.record, + }); + } + return entries; +} + +async function buildSessionTranscriptIndex( + filePath: string, + stat: fs.Stats, +): Promise { + const rawEntries: IndexedRawEntry[] = []; + const byId = new Map(); + let hasTreeEntries = false; + let leafId: string | undefined; + + await visitTranscriptJsonLines(filePath, (line, offset, byteLength) => { + if (!line.trim()) { + return; + } + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + return; + } + if (!isIndexableTranscriptRecord(parsed)) { + return; + } + const id = normalizeOptionalString(parsed.id); + const parentId = + parsed.parentId === null ? null : (normalizeOptionalString(parsed.parentId) ?? undefined); + const rawEntry: IndexedRawEntry = { + ...(id ? { id } : {}), + ...(parentId !== undefined ? { parentId } : {}), + offset, + byteLength, + record: parsed, + }; + rawEntries.push(rawEntry); + if (isTreeTranscriptRecord(parsed) && id) { + hasTreeEntries = true; + leafId = id; + byId.set(id, rawEntry); + } + }); + + const activeRawEntries = hasTreeEntries ? buildActiveTreeEntries({ byId, leafId }) : rawEntries; + return { + filePath, + mtimeMs: stat.mtimeMs, + size: stat.size, + hasTreeEntries, + ...(leafId ? { leafId } : {}), + entries: toIndexedEntries(activeRawEntries), + }; +} + +export async function readSessionTranscriptIndex( + filePath: string, +): Promise { + let stat: fs.Stats; + try { + stat = await fs.promises.stat(filePath); + } catch { + transcriptIndexCache.delete(filePath); + return null; + } + if (!stat.isFile()) { + transcriptIndexCache.delete(filePath); + return null; + } + const cached = transcriptIndexCache.get(filePath); + if (cached && cached.mtimeMs === stat.mtimeMs && cached.size === stat.size) { + return touchCachedIndex(filePath, cached); + } + const index = await buildSessionTranscriptIndex(filePath, stat); + setCachedIndex(filePath, { + mtimeMs: stat.mtimeMs, + size: stat.size, + index, + }); + return index; +} diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts index ef983613a26..0521b0270f2 100644 --- a/src/gateway/session-utils.fs.test.ts +++ b/src/gateway/session-utils.fs.test.ts @@ -1,18 +1,25 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from "vitest"; import { createToolSummaryPreviewTranscriptLines } from "./session-preview.test-helpers.js"; +import { clearSessionTranscriptIndexCache } from "./session-transcript-index.fs.js"; import { archiveSessionTranscripts, readFirstUserMessageFromTranscript, readLastMessagePreviewFromTranscript, readLatestSessionUsageFromTranscript, + readLatestSessionUsageFromTranscriptAsync, readRecentSessionUsageFromTranscript, + readRecentSessionMessagesAsync, readRecentSessionMessages, + readRecentSessionMessagesWithStatsAsync, readRecentSessionMessagesWithStats, readRecentSessionTranscriptLines, + readSessionMessageCountAsync, readSessionMessageCount, + readSessionMessagesAsync, readSessionMessages, readSessionTitleFieldsFromTranscript, readSessionPreviewItemsFromTranscript, @@ -594,6 +601,72 @@ describe("readSessionMessages", () => { ]); }); + test("preserves real sequence metadata for async bounded recent-message reads", async () => { + const sessionId = "test-session-recent-seq-async"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { message: { role: "user", content: "old" } }, + { message: { role: "assistant", content: "middle" } }, + { message: { role: "user", content: "recent" } }, + { message: { role: "assistant", content: "latest" } }, + ]); + const readFileSpy = vi.spyOn(fs, "readFileSync"); + + try { + const result = await readRecentSessionMessagesWithStatsAsync( + sessionId, + storePath, + undefined, + { + maxMessages: 2, + maxBytes: 256, + }, + ); + + expect(result.totalMessages).toBe(4); + expect(result.messages).toEqual([ + expect.objectContaining({ + content: "recent", + __openclaw: expect.objectContaining({ seq: 3 }), + }), + expect.objectContaining({ + content: "latest", + __openclaw: expect.objectContaining({ seq: 4 }), + }), + ]); + expect(readFileSpy).not.toHaveBeenCalled(); + } finally { + readFileSpy.mockRestore(); + } + }); + + test("honors byte caps for async recent-message reads", async () => { + const sessionId = "test-session-recent-async-byte-cap"; + const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); + const hugeContent = "huge ".repeat(4096); + const lines = [ + JSON.stringify({ type: "session", version: 1, id: sessionId }), + JSON.stringify({ message: { role: "user", content: "old" } }), + JSON.stringify({ message: { role: "assistant", content: hugeContent } }), + JSON.stringify({ message: { role: "assistant", content: "tail" } }), + ]; + fs.writeFileSync(transcriptPath, `${lines.join("\n")}\n`, "utf-8"); + const readFileSpy = vi.spyOn(fs, "readFileSync"); + + try { + const out = await readRecentSessionMessagesAsync(sessionId, storePath, undefined, { + maxMessages: 2, + maxBytes: 2048, + }); + + expect(out).toEqual([expect.objectContaining({ role: "assistant", content: "tail" })]); + expect(JSON.stringify(out)).not.toContain("huge"); + expect(readFileSpy).not.toHaveBeenCalled(); + } finally { + readFileSpy.mockRestore(); + } + }); + test("counts transcript messages without loading the whole file", () => { const sessionId = "test-session-count-large"; const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); @@ -614,6 +687,96 @@ describe("readSessionMessages", () => { } }); + test("counts transcript messages asynchronously without loading the whole file", async () => { + const sessionId = "test-session-count-large-async"; + const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); + const lines = [ + JSON.stringify({ type: "session", version: 1, id: sessionId }), + ...Array.from({ length: 2500 }, (_, index) => + JSON.stringify({ message: { role: "user", content: `message ${index}` } }), + ), + ]; + fs.writeFileSync(transcriptPath, lines.join("\n"), "utf-8"); + const readFileSpy = vi.spyOn(fs, "readFileSync"); + + try { + expect(await readSessionMessageCountAsync(sessionId, storePath)).toBe(2500); + expect(readFileSpy).not.toHaveBeenCalled(); + } finally { + readFileSpy.mockRestore(); + } + }); + + test("reads active tree branch asynchronously without SessionManager.open", async () => { + const sessionId = "test-session-tree-async"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 3, id: sessionId }, + { + type: "message", + id: "user-1", + parentId: null, + message: { role: "user", content: "root" }, + }, + { + type: "message", + id: "assistant-1", + parentId: "user-1", + message: { role: "assistant", content: "active branch" }, + }, + { + type: "message", + id: "assistant-inactive", + parentId: "user-1", + message: { role: "assistant", content: "inactive branch" }, + }, + { + type: "message", + id: "user-2", + parentId: "assistant-1", + message: { role: "user", content: "latest active" }, + }, + ]); + clearSessionTranscriptIndexCache(); + const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open"); + const readFileSpy = vi.spyOn(fs, "readFileSync"); + + try { + const messages = await readSessionMessagesAsync(sessionId, storePath); + expect(messages.map((message) => (message as { content?: unknown }).content)).toEqual([ + "root", + "active branch", + "latest active", + ]); + expect(messages[2]).toMatchObject({ + __openclaw: expect.objectContaining({ id: "user-2", seq: 3 }), + }); + expect(sessionManagerOpenSpy).not.toHaveBeenCalled(); + expect(readFileSpy).not.toHaveBeenCalled(); + } finally { + sessionManagerOpenSpy.mockRestore(); + readFileSpy.mockRestore(); + } + }); + + test("caches async transcript indexes by file stats", async () => { + const sessionId = "test-session-index-cache"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { message: { role: "user", content: "hello" } }, + { message: { role: "assistant", content: "hi" } }, + ]); + clearSessionTranscriptIndexCache(); + expect(await readSessionMessageCountAsync(sessionId, storePath)).toBe(2); + + const openSpy = vi.spyOn(fs.promises, "open"); + try { + expect(await readSessionMessageCountAsync(sessionId, storePath)).toBe(2); + expect(openSpy).not.toHaveBeenCalled(); + } finally { + openSpy.mockRestore(); + } + }); + test("tails transcript lines for manual compaction without loading the whole file", () => { const sessionId = "test-session-line-tail"; const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); @@ -982,6 +1145,55 @@ describe("readLatestSessionUsageFromTranscript", () => { expect(snapshot?.costUsd).toBeCloseTo(0.0115, 8); }); + test("aggregates assistant usage asynchronously without readFileSync", async () => { + const sessionId = "usage-aggregate-async"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { + message: { + role: "assistant", + provider: "anthropic", + model: "claude-sonnet-4-6", + usage: { + input: 1_800, + output: 400, + cacheRead: 600, + cost: { total: 0.0055 }, + }, + }, + }, + { + message: { + role: "assistant", + usage: { + input: 2_400, + output: 250, + cacheRead: 900, + cost: { total: 0.006 }, + }, + }, + }, + ]); + const readFileSpy = vi.spyOn(fs, "readFileSync"); + + try { + const snapshot = await readLatestSessionUsageFromTranscriptAsync(sessionId, storePath); + expect(snapshot).toMatchObject({ + modelProvider: "anthropic", + model: "claude-sonnet-4-6", + inputTokens: 4200, + outputTokens: 650, + cacheRead: 1500, + totalTokens: 3300, + totalTokensFresh: true, + }); + expect(snapshot?.costUsd).toBeCloseTo(0.0115, 8); + expect(readFileSpy).not.toHaveBeenCalled(); + } finally { + readFileSpy.mockRestore(); + } + }); + test("reads earlier assistant usage outside the old tail window", () => { const sessionId = "usage-full-transcript"; const filler = "x".repeat(20_000); diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index 9180459bce8..37512760b84 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -15,6 +15,10 @@ import { archiveSessionTranscripts, cleanupArchivedSessionTranscripts, } from "./session-transcript-files.fs.js"; +import { + readSessionTranscriptIndex, + type IndexedTranscriptEntry, +} from "./session-transcript-index.fs.js"; import type { SessionPreviewItem } from "./session-utils.types.js"; type SessionTitleFields = { @@ -29,6 +33,16 @@ type SessionTitleFieldsCacheEntry = SessionTitleFields & { const sessionTitleFieldsCache = new Map(); const MAX_SESSION_TITLE_FIELDS_CACHE_ENTRIES = 5000; +const transcriptMessageCountCache = new Map< + string, + { + mtimeMs: number; + size: number; + count: number; + } +>(); +const MAX_TRANSCRIPT_MESSAGE_COUNT_CACHE_ENTRIES = 5000; +const TRANSCRIPT_ASYNC_READ_CHUNK_BYTES = 64 * 1024; function readSessionTitleFieldsCacheKey( filePath: string, @@ -71,6 +85,39 @@ function setCachedSessionTitleFields(cacheKey: string, stat: fs.Stats, value: Se } } +function getCachedTranscriptMessageCount(filePath: string, stat: fs.Stats): number | null { + const cached = transcriptMessageCountCache.get(filePath); + if (!cached) { + return null; + } + if (cached.mtimeMs !== stat.mtimeMs || cached.size !== stat.size) { + transcriptMessageCountCache.delete(filePath); + return null; + } + transcriptMessageCountCache.delete(filePath); + transcriptMessageCountCache.set(filePath, cached); + return cached.count; +} + +function setCachedTranscriptMessageCount(filePath: string, stat: fs.Stats, count: number): void { + transcriptMessageCountCache.set(filePath, { + mtimeMs: stat.mtimeMs, + size: stat.size, + count, + }); + while (transcriptMessageCountCache.size > MAX_TRANSCRIPT_MESSAGE_COUNT_CACHE_ENTRIES) { + const oldestKey = transcriptMessageCountCache.keys().next().value; + if (typeof oldestKey !== "string" || !oldestKey) { + break; + } + transcriptMessageCountCache.delete(oldestKey); + } +} + +async function yieldTranscriptScan(): Promise { + await new Promise((resolve) => setImmediate(resolve)); +} + export function attachOpenClawTranscriptMeta( message: unknown, meta: Record, @@ -182,6 +229,12 @@ type ReadRecentSessionMessagesResult = { const RECENT_SESSION_MESSAGES_DEFAULT_MAX_BYTES = 8 * 1024 * 1024; +type TailTranscriptRecord = { + id?: string; + parentId?: string | null; + record: Record; +}; + export function readRecentSessionMessages( sessionId: string, storePath: string | undefined, @@ -253,6 +306,121 @@ export function readRecentSessionMessages( ); } +async function readRecentTranscriptTailLinesAsync( + filePath: string, + stat: fs.Stats, + opts: ReadRecentSessionMessagesOptions, +): Promise { + const maxMessages = Math.max(0, Math.floor(opts.maxMessages)); + const maxBytes = Math.max( + 1024, + Math.floor(opts.maxBytes ?? RECENT_SESSION_MESSAGES_DEFAULT_MAX_BYTES), + ); + const readLen = Math.min(stat.size, maxBytes); + const readStart = Math.max(0, stat.size - readLen); + const maxLines = Math.max(maxMessages, Math.floor(opts.maxLines ?? maxMessages * 20 + 20)); + const handle = await fs.promises.open(filePath, "r"); + try { + const buffer = Buffer.alloc(readLen); + const { bytesRead } = await handle.read(buffer, 0, readLen, readStart); + if (bytesRead <= 0) { + return []; + } + return buffer + .toString("utf-8", 0, bytesRead) + .split(/\r?\n/) + .slice(readStart > 0 ? 1 : 0) + .filter((line) => line.trim().length > 0) + .slice(-maxLines); + } finally { + await handle.close(); + } +} + +function normalizeTailEntryString(value: unknown): string | undefined { + return typeof value === "string" && value.trim().length > 0 ? value : undefined; +} + +function parseTailTranscriptRecord(line: string): TailTranscriptRecord | null { + try { + const parsed = JSON.parse(line) as unknown; + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + return null; + } + const record = parsed as Record; + return { + ...(normalizeTailEntryString(record.id) ? { id: normalizeTailEntryString(record.id) } : {}), + ...(record.parentId === null + ? { parentId: null } + : normalizeTailEntryString(record.parentId) + ? { parentId: normalizeTailEntryString(record.parentId) } + : {}), + record, + }; + } catch { + return null; + } +} + +function tailRecordHasTreeLink(entry: TailTranscriptRecord): boolean { + return ( + entry.record.type !== "session" && + typeof entry.id === "string" && + Object.hasOwn(entry.record, "parentId") + ); +} + +function selectBoundedActiveTailRecords(entries: TailTranscriptRecord[]): TailTranscriptRecord[] { + const byId = new Map(); + let leafId: string | undefined; + for (const entry of entries) { + if (tailRecordHasTreeLink(entry) && entry.id) { + byId.set(entry.id, entry); + leafId = entry.id; + } + } + if (!leafId) { + return entries; + } + + const selected: TailTranscriptRecord[] = []; + const seen = new Set(); + let currentId: string | undefined = leafId; + while (currentId) { + if (seen.has(currentId)) { + return []; + } + seen.add(currentId); + const entry = byId.get(currentId); + if (!entry) { + break; + } + selected.push(entry); + currentId = entry.parentId ?? undefined; + } + return selected.toReversed(); +} + +function parseRecentTranscriptTailMessages(lines: string[], maxMessages: number): unknown[] { + const entries = lines.flatMap((line) => { + const entry = parseTailTranscriptRecord(line); + return entry ? [entry] : []; + }); + const selected = entries.some(tailRecordHasTreeLink) + ? selectBoundedActiveTailRecords(entries) + : entries; + const messages: unknown[] = []; + let messageSeq = 0; + for (const entry of selected) { + const message = parsedSessionEntryToMessage(entry.record, messageSeq + 1); + if (message) { + messageSeq += 1; + messages.push(message); + } + } + return messages.slice(-maxMessages); +} + function visitTranscriptLines(filePath: string, visit: (line: string) => void): void { const fd = fs.openSync(filePath, "r"); try { @@ -280,6 +448,37 @@ function visitTranscriptLines(filePath: string, visit: (line: string) => void): } } +async function visitTranscriptLinesAsync( + filePath: string, + visit: (line: string) => void, +): Promise { + const handle = await fs.promises.open(filePath, "r"); + try { + const decoder = new StringDecoder("utf8"); + const buffer = Buffer.allocUnsafe(TRANSCRIPT_ASYNC_READ_CHUNK_BYTES); + let carry = ""; + while (true) { + const { bytesRead } = await handle.read(buffer, 0, buffer.length, null); + if (bytesRead <= 0) { + break; + } + const text = carry + decoder.write(buffer.subarray(0, bytesRead)); + const lines = text.split(/\r?\n/); + carry = lines.pop() ?? ""; + for (const line of lines) { + visit(line); + } + await yieldTranscriptScan(); + } + const tail = carry + decoder.end(); + if (tail) { + visit(tail); + } + } finally { + await handle.close(); + } +} + function transcriptHasTreeEntries(filePath: string): boolean { let hasTreeEntries = false; try { @@ -382,7 +581,88 @@ export function readSessionMessageCount( storePath: string | undefined, sessionFile?: string, ): number { - return visitSessionMessages(sessionId, storePath, sessionFile, () => undefined); + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile); + if (!filePath) { + return 0; + } + let stat: fs.Stats | null = null; + try { + stat = fs.statSync(filePath); + const cached = getCachedTranscriptMessageCount(filePath, stat); + if (typeof cached === "number") { + return cached; + } + } catch { + // Count from the transcript reader below when stat metadata is unavailable. + } + const count = visitSessionMessages(sessionId, storePath, sessionFile, () => undefined); + if (stat) { + setCachedTranscriptMessageCount(filePath, stat, count); + } + return count; +} + +export async function readSessionMessagesAsync( + sessionId: string, + storePath: string | undefined, + sessionFile?: string, +): Promise { + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile); + if (!filePath) { + return []; + } + const index = await readSessionTranscriptIndex(filePath); + return index?.entries.flatMap((entry) => indexedTranscriptEntryToMessages(entry)) ?? []; +} + +export async function visitSessionMessagesAsync( + sessionId: string, + storePath: string | undefined, + sessionFile: string | undefined, + visit: (message: unknown, seq: number) => void, +): Promise { + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile); + if (!filePath) { + return 0; + } + const index = await readSessionTranscriptIndex(filePath); + if (!index) { + return 0; + } + for (const entry of index.entries) { + const message = indexedTranscriptEntryToMessage(entry); + if (message) { + visit(message, entry.seq); + } + } + return index.entries.length; +} + +export async function readSessionMessageCountAsync( + sessionId: string, + storePath: string | undefined, + sessionFile?: string, +): Promise { + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile); + if (!filePath) { + return 0; + } + let stat: fs.Stats | null = null; + try { + stat = await fs.promises.stat(filePath); + const cached = getCachedTranscriptMessageCount(filePath, stat); + if (typeof cached === "number") { + return cached; + } + } catch { + // Count from the transcript reader below when stat metadata is unavailable. + } + const index = await readSessionTranscriptIndex(filePath); + const count = index?.entries.length ?? 0; + if (stat) { + setCachedTranscriptMessageCount(filePath, stat, count); + } + return count; } export function readRecentSessionMessagesWithStats( @@ -400,6 +680,53 @@ export function readRecentSessionMessagesWithStats( return { messages: messagesWithSeq, totalMessages }; } +export async function readRecentSessionMessagesAsync( + sessionId: string, + storePath: string | undefined, + sessionFile?: string, + opts?: ReadRecentSessionMessagesOptions, +): Promise { + const maxMessages = Math.max(0, Math.floor(opts?.maxMessages ?? 0)); + if (maxMessages === 0) { + return []; + } + + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile); + if (!filePath) { + return []; + } + + let stat: fs.Stats; + try { + stat = await fs.promises.stat(filePath); + } catch { + return []; + } + if (stat.size === 0) { + return []; + } + const lines = await readRecentTranscriptTailLinesAsync(filePath, stat, { + ...opts, + maxMessages, + }); + return parseRecentTranscriptTailMessages(lines, maxMessages); +} + +export async function readRecentSessionMessagesWithStatsAsync( + sessionId: string, + storePath: string | undefined, + sessionFile: string | undefined, + opts: ReadRecentSessionMessagesOptions, +): Promise { + const totalMessages = await readSessionMessageCountAsync(sessionId, storePath, sessionFile); + const messages = await readRecentSessionMessagesAsync(sessionId, storePath, sessionFile, opts); + const firstSeq = Math.max(1, totalMessages - messages.length + 1); + const messagesWithSeq = messages.map((message, index) => + attachOpenClawTranscriptMeta(message, { seq: firstSeq + index }), + ); + return { messages: messagesWithSeq, totalMessages }; +} + export function readRecentSessionTranscriptLines(params: { sessionId: string; storePath: string | undefined; @@ -479,6 +806,15 @@ function parsedSessionEntryToMessage(parsed: unknown, seq: number): unknown { return null; } +function indexedTranscriptEntryToMessage(entry: IndexedTranscriptEntry): unknown { + return parsedSessionEntryToMessage(entry.record, entry.seq); +} + +function indexedTranscriptEntryToMessages(entry: IndexedTranscriptEntry): unknown[] { + const message = indexedTranscriptEntryToMessage(entry); + return message ? [message] : []; +} + export { archiveFileOnDisk, archiveSessionTranscripts, @@ -584,6 +920,68 @@ export function readSessionTitleFieldsFromTranscript( } } +export async function readSessionTitleFieldsFromTranscriptAsync( + sessionId: string, + storePath: string | undefined, + sessionFile?: string, + agentId?: string, + opts?: { includeInterSession?: boolean }, +): Promise { + const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile, agentId); + const filePath = candidates.find((p) => fs.existsSync(p)); + if (!filePath) { + return { firstUserMessage: null, lastMessagePreview: null }; + } + let stat: fs.Stats; + try { + stat = await fs.promises.stat(filePath); + } catch { + return { firstUserMessage: null, lastMessagePreview: null }; + } + const cacheKey = readSessionTitleFieldsCacheKey(filePath, opts); + const cached = getCachedSessionTitleFields(cacheKey, stat); + if (cached) { + return cached; + } + const index = await readSessionTranscriptIndex(filePath); + if (!index) { + return { firstUserMessage: null, lastMessagePreview: null }; + } + + let firstUserMessage: string | null = null; + for (const entry of index.entries) { + const msg = entry.record.message as TranscriptMessage | undefined; + if (msg?.role !== "user") { + continue; + } + if (opts?.includeInterSession !== true && hasInterSessionUserProvenance(msg)) { + continue; + } + const text = extractTextFromContent(msg.content); + if (text) { + firstUserMessage = text; + break; + } + } + + let lastMessagePreview: string | null = null; + for (const entry of index.entries.toReversed()) { + const msg = entry.record.message as TranscriptMessage | undefined; + if (!msg || (msg.role !== "user" && msg.role !== "assistant")) { + continue; + } + const text = extractTextFromContent(msg.content); + if (text) { + lastMessagePreview = text; + break; + } + } + + const result = { firstUserMessage, lastMessagePreview }; + setCachedSessionTitleFields(cacheKey, stat, result); + return result; +} + function extractTextFromContent(content: TranscriptMessage["content"]): string | null { if (typeof content === "string") { const normalized = stripInlineDirectiveTagsForDisplay(content).text.trim(); @@ -774,10 +1172,9 @@ function resolvePositiveUsageNumber(value: unknown): number | undefined { return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : undefined; } -function extractLatestUsageFromTranscriptChunk( - chunk: string, +function extractLatestUsageFromTranscriptLines( + lines: Iterable, ): SessionTranscriptUsageSnapshot | null { - const lines = chunk.split(/\r?\n/).filter((line) => line.trim().length > 0); const snapshot: SessionTranscriptUsageSnapshot = {}; let sawSnapshot = false; let inputTokens = 0; @@ -898,6 +1295,14 @@ function extractLatestUsageFromTranscriptChunk( return snapshot; } +function extractLatestUsageFromTranscriptChunk( + chunk: string, +): SessionTranscriptUsageSnapshot | null { + return extractLatestUsageFromTranscriptLines( + chunk.split(/\r?\n/).filter((line) => line.trim().length > 0), + ); +} + export function readLatestSessionUsageFromTranscript( sessionId: string, storePath: string | undefined, @@ -919,6 +1324,34 @@ export function readLatestSessionUsageFromTranscript( }); } +export async function readLatestSessionUsageFromTranscriptAsync( + sessionId: string, + storePath: string | undefined, + sessionFile?: string, + agentId?: string, +): Promise { + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile, agentId); + if (!filePath) { + return null; + } + + try { + const stat = await fs.promises.stat(filePath); + if (stat.size === 0) { + return null; + } + const lines: string[] = []; + await visitTranscriptLinesAsync(filePath, (line) => { + if (line.trim()) { + lines.push(line); + } + }); + return extractLatestUsageFromTranscriptLines(lines); + } catch { + return null; + } +} + export function readRecentSessionUsageFromTranscript( sessionId: string, storePath: string | undefined, diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index efde8bebe1e..7bd931eede2 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -86,8 +86,8 @@ import { resolveStoredSessionKeyForAgentStore, } from "./session-store-key.js"; import { - readLatestSessionUsageFromTranscript, readRecentSessionUsageFromTranscript, + readSessionTitleFieldsFromTranscriptAsync, readSessionTitleFieldsFromTranscript, } from "./session-utils.fs.js"; import type { @@ -106,14 +106,21 @@ export { readFirstUserMessageFromTranscript, readLastMessagePreviewFromTranscript, readLatestSessionUsageFromTranscript, + readLatestSessionUsageFromTranscriptAsync, readRecentSessionMessages, + readRecentSessionMessagesAsync, + readRecentSessionMessagesWithStatsAsync, readRecentSessionMessagesWithStats, readRecentSessionTranscriptLines, readRecentSessionUsageFromTranscript, + readSessionMessageCountAsync, readSessionMessageCount, readSessionTitleFieldsFromTranscript, + readSessionTitleFieldsFromTranscriptAsync, readSessionPreviewItemsFromTranscript, + readSessionMessagesAsync, readSessionMessages, + visitSessionMessagesAsync, visitSessionMessages, resolveSessionTranscriptCandidates, } from "./session-utils.fs.js"; @@ -476,21 +483,13 @@ function resolveTranscriptUsageFallback(params: { const agentId = parsed?.agentId ? normalizeAgentId(parsed.agentId) : resolveDefaultAgentId(params.cfg); - const snapshot = - typeof params.maxTranscriptBytes === "number" - ? readRecentSessionUsageFromTranscript( - entry.sessionId, - params.storePath, - entry.sessionFile, - agentId, - params.maxTranscriptBytes, - ) - : readLatestSessionUsageFromTranscript( - entry.sessionId, - params.storePath, - entry.sessionFile, - agentId, - ); + const snapshot = readRecentSessionUsageFromTranscript( + entry.sessionId, + params.storePath, + entry.sessionFile, + agentId, + typeof params.maxTranscriptBytes === "number" ? params.maxTranscriptBytes : 256 * 1024, + ); if (!snapshot) { return null; } @@ -1661,7 +1660,12 @@ function resolveSessionListSearchDisplayName( export function loadGatewaySessionRow( sessionKey: string, - options?: { includeDerivedTitles?: boolean; includeLastMessage?: boolean; now?: number }, + options?: { + includeDerivedTitles?: boolean; + includeLastMessage?: boolean; + now?: number; + transcriptUsageMaxBytes?: number; + }, ): GatewaySessionRow | null { const { cfg, storePath, store, entry, canonicalKey } = loadSessionEntry(sessionKey); if (!entry) { @@ -1676,6 +1680,7 @@ export function loadGatewaySessionRow( now: options?.now, includeDerivedTitles: options?.includeDerivedTitles, includeLastMessage: options?.includeLastMessage, + transcriptUsageMaxBytes: options?.transcriptUsageMaxBytes, }); } @@ -1861,21 +1866,42 @@ export async function listSessionsFromStoreAsync(params: { for (let i = 0; i < entries.length; i++) { const [key, entry] = entries[i]; const includeTranscriptFields = i < sessionListTranscriptFieldRows; - sessions.push( - buildGatewaySessionRow({ - cfg, + const row = buildGatewaySessionRow({ + cfg, + storePath, + store, + key, + entry, + modelCatalog: params.modelCatalog, + now, + includeDerivedTitles: false, + includeLastMessage: false, + transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes, + storeChildSessionsByKey, + }); + if ( + entry?.sessionId && + includeTranscriptFields && + (includeDerivedTitles || includeLastMessage) + ) { + const parsed = parseAgentSessionKey(key); + const sessionAgentId = parsed?.agentId + ? normalizeAgentId(parsed.agentId) + : resolveDefaultAgentId(cfg); + const fields = await readSessionTitleFieldsFromTranscriptAsync( + entry.sessionId, storePath, - store, - key, - entry, - modelCatalog: params.modelCatalog, - now, - includeDerivedTitles: includeTranscriptFields && includeDerivedTitles, - includeLastMessage: includeTranscriptFields && includeLastMessage, - transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes, - storeChildSessionsByKey, - }), - ); + entry.sessionFile, + sessionAgentId, + ); + if (includeDerivedTitles) { + row.derivedTitle = deriveSessionTitle(entry, fields.firstUserMessage); + } + if (includeLastMessage && fields.lastMessagePreview) { + row.lastMessagePreview = fields.lastMessagePreview; + } + } + sessions.push(row); // Yield to the event loop between batches so WebSocket heartbeats, // channel I/O, and concurrent RPC calls are not starved. if ((i + 1) % SESSIONS_LIST_YIELD_BATCH_SIZE === 0 && i + 1 < entries.length) { diff --git a/src/gateway/sessions-history-http.revocation.test.ts b/src/gateway/sessions-history-http.revocation.test.ts index 21b37f4e7ed..5b745c0fd65 100644 --- a/src/gateway/sessions-history-http.revocation.test.ts +++ b/src/gateway/sessions-history-http.revocation.test.ts @@ -91,7 +91,7 @@ vi.mock("./session-utils.js", () => ({ sessionId: "session-1", sessionFile: "/tmp/session-1.jsonl", }), - readSessionMessages: () => [], + readSessionMessagesAsync: async () => [], resolveSessionTranscriptCandidates: () => ["/tmp/session-1.jsonl"], })); @@ -107,7 +107,7 @@ vi.mock("./session-history-state.js", () => ({ messageSeq: 1, messageId, }), - refresh: () => ({ items: [], nextCursor: null, messages: [] }), + refreshAsync: async () => ({ items: [], nextCursor: null, messages: [] }), }), }, })); diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index 76be7c2a024..d5d0e8570f1 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -31,8 +31,8 @@ import { SessionHistorySseState, } from "./session-history-state.js"; import { - readRecentSessionMessagesWithStats, - readSessionMessages, + readRecentSessionMessagesWithStatsAsync, + readSessionMessagesAsync, resolveFreshestSessionEntryFromStoreKeys, resolveGatewaySessionStoreTarget, resolveSessionTranscriptCandidates, @@ -156,7 +156,7 @@ export async function handleSessionHistoryHttpRequest( : DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS; const boundedSnapshot = cursor === undefined && typeof limit === "number" - ? readRecentSessionMessagesWithStats( + ? await readRecentSessionMessagesWithStatsAsync( entry.sessionId, target.storePath, entry.sessionFile, @@ -168,7 +168,7 @@ export async function handleSessionHistoryHttpRequest( const rawSnapshot = boundedSnapshot?.messages ?? (entry?.sessionId - ? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile) + ? await readSessionMessagesAsync(entry.sessionId, target.storePath, entry.sessionFile) : []); const historySnapshot = buildSessionHistorySnapshot({ rawMessages: rawSnapshot, @@ -338,7 +338,7 @@ export async function handleSessionHistoryHttpRequest( return; } } - sentHistory = sseState.refresh(); + sentHistory = await sseState.refreshAsync(); sseWrite(res, "history", { sessionKey: target.canonicalKey, ...sentHistory, diff --git a/src/status/status-message.ts b/src/status/status-message.ts index b9781bd9d31..7a39cb4c07d 100644 --- a/src/status/status-message.ts +++ b/src/status/status-message.ts @@ -33,7 +33,7 @@ import { type SessionScope, } from "../config/sessions.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { readLatestSessionUsageFromTranscript } from "../gateway/session-utils.fs.js"; +import { readRecentSessionUsageFromTranscript } from "../gateway/session-utils.fs.js"; import { formatTimeAgo } from "../infra/format-time/format-relative.ts"; import { resolveCommitHash } from "../infra/git-commit.js"; import { @@ -325,11 +325,12 @@ const readUsageFromSessionLog = ( } try { - const snapshot = readLatestSessionUsageFromTranscript( + const snapshot = readRecentSessionUsageFromTranscript( sessionId, storePath, sessionEntry?.sessionFile, agentId ?? (sessionKey ? resolveAgentIdFromSessionKey(sessionKey) : undefined), + 256 * 1024, ); if (!snapshot) { return undefined; diff --git a/src/tui/embedded-backend.test.ts b/src/tui/embedded-backend.test.ts index cb2b01a71ca..f6584596477 100644 --- a/src/tui/embedded-backend.test.ts +++ b/src/tui/embedded-backend.test.ts @@ -72,7 +72,7 @@ vi.mock("../gateway/server-methods/chat.js", () => ({ vi.mock("../gateway/session-utils.js", () => ({ listAgentsForGateway: () => [], - listSessionsFromStore: () => ({ sessions: [] }), + listSessionsFromStoreAsync: async () => ({ sessions: [] }), loadCombinedSessionStoreForGateway: () => ({ storePath: "/tmp/openclaw-sessions.json", store: {}, @@ -83,7 +83,7 @@ vi.mock("../gateway/session-utils.js", () => ({ entry: {}, }), migrateAndPruneGatewaySessionStoreKey: ({ key }: { key: string }) => ({ primaryKey: key }), - readSessionMessages: () => [], + readSessionMessagesAsync: async () => [], resolveGatewaySessionStoreTarget: ({ key }: { key: string }) => ({ canonicalKey: key, storePath: "/tmp/openclaw-sessions.json", diff --git a/src/tui/embedded-backend.ts b/src/tui/embedded-backend.ts index 6da7e67df96..8e0f8fbcde8 100644 --- a/src/tui/embedded-backend.ts +++ b/src/tui/embedded-backend.ts @@ -34,13 +34,13 @@ import { performGatewaySessionReset } from "../gateway/session-reset-service.js" import { capArrayByJsonBytes } from "../gateway/session-utils.fs.js"; import { listAgentsForGateway, - listSessionsFromStore, + listSessionsFromStoreAsync, loadCombinedSessionStoreForGateway, loadSessionEntry, migrateAndPruneGatewaySessionStoreKey, resolveGatewaySessionStoreTarget, resolveSessionModelRef, - readSessionMessages, + readSessionMessagesAsync, } from "../gateway/session-utils.js"; import { applySessionsPatchToStore } from "../gateway/sessions-patch.js"; import { type AgentEventPayload, onAgentEvent } from "../infra/agent-events.js"; @@ -197,7 +197,9 @@ export class EmbeddedTuiBackend implements TuiBackend { const sessionAgentId = resolveSessionAgentId({ sessionKey: opts.sessionKey, config: cfg }); const resolvedSessionModel = resolveSessionModelRef(cfg, entry, sessionAgentId); const localMessages = - sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : []; + sessionId && storePath + ? await readSessionMessagesAsync(sessionId, storePath, entry?.sessionFile) + : []; const rawMessages = augmentChatHistoryWithCliSessionImports({ entry, provider: resolvedSessionModel.provider, @@ -245,12 +247,12 @@ export class EmbeddedTuiBackend implements TuiBackend { async listSessions(opts?: Parameters[0]): Promise { const cfg = getRuntimeConfig(); const { storePath, store } = loadCombinedSessionStoreForGateway(cfg); - return listSessionsFromStore({ + return (await listSessionsFromStoreAsync({ cfg, storePath, store, opts: opts ?? {}, - }) as TuiSessionList; + })) as TuiSessionList; } async listAgents(): Promise {