diff --git a/CHANGELOG.md b/CHANGELOG.md index 29599c943d4..89258107566 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ Docs: https://docs.openclaw.ai - Gateway/config: cap oversized plugin-owned schemas in the full `config.schema` response so large installed plugin sets cannot balloon Gateway RSS or crash schema clients. Thanks @vincentkoc. - Plugins/update: skip ClawHub and marketplace plugin updates when the bundled version is newer than the recorded installed version, so `openclaw update` no longer overwrites working bundled plugins with older external packages. Fixes #75447. Thanks @amknight. - Gateway/sessions: use bounded tail reads for sessions-list transcript usage fallbacks and cap bulk title/last-message hydration, keeping large session stores responsive when rows request derived previews. Thanks @vincentkoc. +- Gateway/sessions: yield during bulk transcript title/preview hydration and copy compaction checkpoints asynchronously, keeping the Gateway event loop responsive for large session stores and large transcripts. Refs #75330 and #75414. Thanks @amknight. - Gateway/chat: bound chat-history transcript reads to the requested display window so large session logs no longer OOM the Gateway when clients ask for a small history page. Thanks @vincentkoc. - Voice Call/Twilio: honor stored pre-connect TwiML before realtime webhook shortcuts and reject DTMF sequences outside conversation mode, so Meet PIN entry cannot be skipped or silently dropped. Thanks @donkeykong91 and @PfanP. - Docs/sandboxing: clarify that sandbox setup scripts (`sandbox-setup.sh`, `sandbox-common-setup.sh`, `sandbox-browser-setup.sh`) are only available from a source checkout, and add inline `docker build` commands for npm-installed users so sandbox image setup works without cloning the repo. Fixes #75485. Thanks @amknight. diff --git a/src/agents/pi-embedded-runner/compact.queued.ts b/src/agents/pi-embedded-runner/compact.queued.ts index 9ba6ba58c45..ac77208d2df 100644 --- a/src/agents/pi-embedded-runner/compact.queued.ts +++ b/src/agents/pi-embedded-runner/compact.queued.ts @@ -3,9 +3,10 @@ import { ensureContextEnginesInitialized } from "../../context-engine/init.js"; import { resolveContextEngine } from "../../context-engine/registry.js"; import type { ContextEngineRuntimeContext } from "../../context-engine/types.js"; import { - captureCompactionCheckpointSnapshot, + captureCompactionCheckpointSnapshotAsync, cleanupCompactionCheckpointSnapshot, persistSessionCompactionCheckpoint, + readSessionLeafIdFromTranscriptAsync, resolveSessionCompactionCheckpointReason, type CapturedCompactionCheckpointSnapshot, } from "../../gateway/session-compaction-checkpoints.js"; @@ -115,8 +116,7 @@ export async function compactEmbeddedPiSession( // are notified regardless of which engine is active. const engineOwnsCompaction = contextEngine.info.ownsCompaction === true; checkpointSnapshot = engineOwnsCompaction - ? captureCompactionCheckpointSnapshot({ - sessionManager: SessionManager.open(params.sessionFile), + ? await captureCompactionCheckpointSnapshotAsync({ sessionFile: params.sessionFile, }) : null; @@ -200,7 +200,7 @@ export async function compactEmbeddedPiSession( try { const postLeafId = postCompactionLeafId ?? - SessionManager.open(postCompactionSessionFile).getLeafId() ?? + (await readSessionLeafIdFromTranscriptAsync(postCompactionSessionFile)) ?? undefined; const storedCheckpoint = await persistSessionCompactionCheckpoint({ cfg: params.config, diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 965487e8c6f..db3fbe7f020 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -11,7 +11,7 @@ import { isAcpRuntimeSpawnAvailable } from "../../acp/runtime/availability.js"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { - captureCompactionCheckpointSnapshot, + captureCompactionCheckpointSnapshotAsync, cleanupCompactionCheckpointSnapshot, persistSessionCompactionCheckpoint, resolveSessionCompactionCheckpointReason, @@ -822,7 +822,7 @@ export async function compactEmbeddedPiSessionDirect( : undefined, allowedToolNames, }); - checkpointSnapshot = captureCompactionCheckpointSnapshot({ + checkpointSnapshot = await captureCompactionCheckpointSnapshotAsync({ sessionManager, sessionFile: params.sessionFile, }); diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 42eb0acfabf..c9acc1bdf43 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -72,7 +72,7 @@ import { import { reactivateCompletedSubagentSession } from "../session-subagent-reactivation.js"; import { archiveFileOnDisk, - listSessionsFromStore, + listSessionsFromStoreAsync, loadCombinedSessionStoreForGateway, loadGatewaySessionRow, loadSessionEntry, @@ -650,7 +650,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { const cfg = context.getRuntimeConfig(); const { storePath, store } = loadCombinedSessionStoreForGateway(cfg); const modelCatalog = await loadOptionalSessionsListModelCatalog(context); - const result = listSessionsFromStore({ + const result = await listSessionsFromStoreAsync({ cfg, storePath, store, diff --git a/src/gateway/server.sessions.list-changed.test.ts b/src/gateway/server.sessions.list-changed.test.ts index 34487e267cd..c8e02b16d6b 100644 --- a/src/gateway/server.sessions.list-changed.test.ts +++ b/src/gateway/server.sessions.list-changed.test.ts @@ -157,6 +157,76 @@ test("sessions.list uses the gateway model catalog for effective thinking defaul ); }); +test("sessions.list yields before responding during bulk transcript hydration", async () => { + const { dir } = await createSessionStoreDir(); + const entries: Record> = {}; + const now = Date.now(); + for (let i = 0; i < 11; i += 1) { + const sessionId = `sess-list-yield-${i}`; + entries[`bulk-${i}`] = sessionStoreEntry(sessionId, { updatedAt: now - i }); + await fs.writeFile( + path.join(dir, `${sessionId}.jsonl`), + [ + JSON.stringify({ type: "session", version: 1, id: sessionId }), + JSON.stringify({ message: { role: "user", content: `title ${i}` } }), + JSON.stringify({ message: { role: "assistant", content: `last ${i}` } }), + ].join("\n"), + "utf-8", + ); + } + await writeSessionStore({ entries }); + + const respond = vi.fn(); + const sessionsHandlers = await getSessionsHandlers(); + const { getRuntimeConfig } = await getGatewayConfigModule(); + const request = sessionsHandlers["sessions.list"]({ + req: { + type: "req", + id: "req-sessions-list-yield", + method: "sessions.list", + params: { + includeDerivedTitles: true, + includeLastMessage: true, + limit: 11, + }, + }, + params: { + includeDerivedTitles: true, + includeLastMessage: true, + limit: 11, + }, + respond, + client: null, + isWebchatConnect: () => false, + context: { + getRuntimeConfig, + loadGatewayModelCatalog: async () => [], + logGateway: { + debug: vi.fn(), + }, + } as never, + }); + + await Promise.resolve(); + await Promise.resolve(); + + expect(respond).not.toHaveBeenCalled(); + await request; + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ + sessions: expect.arrayContaining([ + expect.objectContaining({ + key: "agent:main:bulk-0", + derivedTitle: "title 0", + lastMessagePreview: "last 0", + }), + ]), + }), + undefined, + ); +}); + test("sessions.list does not block on slow model catalog discovery", async () => { await createSessionStoreDir(); await writeSessionStore({ diff --git a/src/gateway/session-compaction-checkpoints.test.ts b/src/gateway/session-compaction-checkpoints.test.ts index 8cc32f415f7..12b5e3c48a6 100644 --- a/src/gateway/session-compaction-checkpoints.test.ts +++ b/src/gateway/session-compaction-checkpoints.test.ts @@ -4,13 +4,15 @@ import os from "node:os"; import path from "node:path"; import type { AssistantMessage, UserMessage } from "@mariozechner/pi-ai"; import { SessionManager } from "@mariozechner/pi-coding-agent"; -import { afterEach, describe, expect, test } from "vitest"; +import { afterEach, describe, expect, test, vi } from "vitest"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { captureCompactionCheckpointSnapshot, + captureCompactionCheckpointSnapshotAsync, cleanupCompactionCheckpointSnapshot, MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES, persistSessionCompactionCheckpoint, + readSessionLeafIdFromTranscriptAsync, } from "./session-compaction-checkpoints.js"; const tempDirs: string[] = []; @@ -85,6 +87,144 @@ describe("session-compaction-checkpoints", () => { expect(fsSync.existsSync(sessionFile!)).toBe(true); }); + test("async capture stores the copied pre-compaction transcript without sync copy", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-")); + tempDirs.push(dir); + + const session = SessionManager.create(dir, dir); + session.appendMessage({ + role: "user", + content: "before async compaction", + timestamp: Date.now(), + }); + session.appendMessage({ + role: "assistant", + content: [{ type: "text", text: "async working on it" }], + api: "responses", + provider: "openai", + model: "gpt-test", + timestamp: Date.now(), + } as AssistantMessage); + + const sessionFile = session.getSessionFile(); + const leafId = session.getLeafId(); + expect(sessionFile).toBeTruthy(); + expect(leafId).toBeTruthy(); + + const originalBefore = await fs.readFile(sessionFile!, "utf-8"); + const copyFileSyncSpy = vi.spyOn(fsSync, "copyFileSync"); + const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open"); + try { + const snapshot = await captureCompactionCheckpointSnapshotAsync({ + sessionManager: session, + sessionFile: sessionFile!, + }); + + expect(copyFileSyncSpy).not.toHaveBeenCalled(); + expect(sessionManagerOpenSpy).not.toHaveBeenCalled(); + expect(snapshot).not.toBeNull(); + expect(snapshot?.leafId).toBe(leafId); + expect(snapshot?.sessionFile).not.toBe(sessionFile); + expect(snapshot?.sessionFile).toContain(".checkpoint."); + expect(fsSync.existsSync(snapshot!.sessionFile)).toBe(true); + expect(await fs.readFile(snapshot!.sessionFile, "utf-8")).toBe(originalBefore); + + session.appendCompaction("checkpoint summary", leafId!, 123, { ok: true }); + + expect(await fs.readFile(snapshot!.sessionFile, "utf-8")).toBe(originalBefore); + expect(await fs.readFile(sessionFile!, "utf-8")).not.toBe(originalBefore); + + await cleanupCompactionCheckpointSnapshot(snapshot); + + expect(fsSync.existsSync(snapshot!.sessionFile)).toBe(false); + expect(fsSync.existsSync(sessionFile!)).toBe(true); + } finally { + copyFileSyncSpy.mockRestore(); + sessionManagerOpenSpy.mockRestore(); + } + }); + + test("async capture derives session metadata without synchronous SessionManager.open", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-metadata-")); + tempDirs.push(dir); + + const session = SessionManager.create(dir, dir); + session.appendMessage({ + role: "user", + content: "derive checkpoint metadata", + timestamp: Date.now(), + }); + session.appendMessage({ + role: "assistant", + content: "metadata derived", + api: "responses", + provider: "openai", + model: "gpt-test", + timestamp: Date.now(), + } as unknown as AssistantMessage); + + const sessionFile = session.getSessionFile(); + const sessionId = session.getSessionId(); + const leafId = session.getLeafId(); + expect(sessionFile).toBeTruthy(); + expect(sessionId).toBeTruthy(); + expect(leafId).toBeTruthy(); + await fs.appendFile(sessionFile!, "\nnot-json\n", "utf-8"); + + const copyFileSyncSpy = vi.spyOn(fsSync, "copyFileSync"); + const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open"); + let snapshot: Awaited> = null; + try { + expect(await readSessionLeafIdFromTranscriptAsync(sessionFile!)).toBe(leafId); + snapshot = await captureCompactionCheckpointSnapshotAsync({ + sessionFile: sessionFile!, + }); + + expect(copyFileSyncSpy).not.toHaveBeenCalled(); + expect(sessionManagerOpenSpy).not.toHaveBeenCalled(); + expect(snapshot).not.toBeNull(); + expect(snapshot?.sessionId).toBe(sessionId); + expect(snapshot?.leafId).toBe(leafId); + expect(snapshot?.sessionFile).not.toBe(sessionFile); + expect(snapshot?.sessionFile).toContain(".checkpoint."); + } finally { + await cleanupCompactionCheckpointSnapshot(snapshot); + copyFileSyncSpy.mockRestore(); + sessionManagerOpenSpy.mockRestore(); + } + }); + + test("async capture skips oversized pre-compaction transcripts without sync copy", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-oversized-")); + tempDirs.push(dir); + + const session = SessionManager.create(dir, dir); + session.appendMessage({ + role: "user", + content: "before compaction", + timestamp: Date.now(), + }); + const sessionFile = session.getSessionFile(); + expect(sessionFile).toBeTruthy(); + await fs.appendFile(sessionFile!, "x".repeat(128), "utf-8"); + + const copyFileSyncSpy = vi.spyOn(fsSync, "copyFileSync"); + try { + const snapshot = await captureCompactionCheckpointSnapshotAsync({ + sessionManager: session, + sessionFile: sessionFile!, + maxBytes: 64, + }); + + expect(snapshot).toBeNull(); + expect(copyFileSyncSpy).not.toHaveBeenCalled(); + expect(MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES).toBeGreaterThan(64); + expect(fsSync.readdirSync(dir).filter((file) => file.includes(".checkpoint."))).toEqual([]); + } finally { + copyFileSyncSpy.mockRestore(); + } + }); + test("capture skips oversized pre-compaction transcripts", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-oversized-")); tempDirs.push(dir); diff --git a/src/gateway/session-compaction-checkpoints.ts b/src/gateway/session-compaction-checkpoints.ts index 50680dd6a0c..cbca36fb186 100644 --- a/src/gateway/session-compaction-checkpoints.ts +++ b/src/gateway/session-compaction-checkpoints.ts @@ -60,6 +60,138 @@ export function resolveSessionCompactionCheckpointReason(params: { return "auto-threshold"; } +const SESSION_HEADER_READ_MAX_BYTES = 64 * 1024; +const SESSION_TAIL_READ_INITIAL_BYTES = 64 * 1024; + +type AsyncTranscriptFileHandle = Awaited>; + +async function readFileRangeAsync( + fileHandle: AsyncTranscriptFileHandle, + position: number, + length: number, +): Promise { + const buffer = Buffer.alloc(length); + let offset = 0; + while (offset < length) { + const { bytesRead } = await fileHandle.read(buffer, offset, length - offset, position + offset); + if (bytesRead <= 0) { + break; + } + offset += bytesRead; + } + return offset === length ? buffer : buffer.subarray(0, offset); +} + +async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Promise { + let fileHandle: AsyncTranscriptFileHandle | undefined; + try { + fileHandle = await fs.open(sessionFile, "r"); + const buffer = await readFileRangeAsync(fileHandle, 0, SESSION_HEADER_READ_MAX_BYTES); + if (buffer.length <= 0) { + return null; + } + const chunk = buffer.toString("utf-8"); + const firstLine = chunk + .split(/\r?\n/) + .map((line) => line.trim()) + .find((line) => line.length > 0); + if (!firstLine) { + return null; + } + const parsed = JSON.parse(firstLine) as { type?: unknown; id?: unknown }; + return parsed.type === "session" && typeof parsed.id === "string" && parsed.id.trim() + ? parsed.id.trim() + : null; + } catch { + return null; + } finally { + if (fileHandle) { + await fileHandle.close().catch(() => undefined); + } + } +} + +function parseTranscriptLineId( + line: string, +): { kind: "session" } | { kind: "entry"; id: string } | null { + try { + const parsed = JSON.parse(line) as { type?: unknown; id?: unknown }; + if (parsed.type === "session") { + return { kind: "session" }; + } + if (typeof parsed.id === "string" && parsed.id.trim()) { + return { kind: "entry", id: parsed.id.trim() }; + } + } catch { + return null; + } + return null; +} + +export async function readSessionLeafIdFromTranscriptAsync( + sessionFile: string, + maxBytes = MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES, +): Promise { + let fileHandle: AsyncTranscriptFileHandle | undefined; + try { + fileHandle = await fs.open(sessionFile, "r"); + const stat = await fileHandle.stat(); + if (!stat.isFile() || stat.size <= 0) { + return null; + } + + const requestedMaxBytes = Number.isFinite(maxBytes) + ? Math.max(1024, Math.floor(maxBytes)) + : MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES; + const maxReadableBytes = Math.min(stat.size, requestedMaxBytes); + let readLength = Math.min(maxReadableBytes, SESSION_TAIL_READ_INITIAL_BYTES); + while (readLength > 0) { + const readStart = Math.max(0, stat.size - readLength); + const buffer = await readFileRangeAsync(fileHandle, readStart, readLength); + const lines = buffer.toString("utf-8").split(/\r?\n/); + // If we did not read from the beginning, the first line may be a suffix of + // a larger JSONL entry. Ignore it and grow the window if no complete entry + // is found. + const candidateLines = readStart > 0 ? lines.slice(1) : lines; + for (let i = candidateLines.length - 1; i >= 0; i -= 1) { + const line = candidateLines[i]?.trim(); + if (!line) { + continue; + } + const parsed = parseTranscriptLineId(line); + if (!parsed) { + continue; + } + if (parsed.kind === "session") { + return null; + } + return parsed.id; + } + + if (readStart === 0) { + return null; + } + const nextReadLength = Math.min(maxReadableBytes, readLength * 2); + if (nextReadLength === readLength) { + return null; + } + readLength = nextReadLength; + } + } catch { + return null; + } finally { + if (fileHandle) { + await fileHandle.close().catch(() => undefined); + } + } + return null; +} + +/** + * Synchronous version — kept for callers that cannot be made async. + * Prefer captureCompactionCheckpointSnapshotAsync for large transcripts + * to avoid blocking the event loop during file copy. + */ export function captureCompactionCheckpointSnapshot(params: { sessionManager: Pick; sessionFile: string; @@ -121,6 +253,65 @@ export function captureCompactionCheckpointSnapshot(params: { }; } +/** + * Async version of captureCompactionCheckpointSnapshot that uses async file + * operations to avoid blocking the event loop. Large transcript files (20MB+) + * were observed blocking the event loop for minutes when copied synchronously + * (see issue #75414). + */ +export async function captureCompactionCheckpointSnapshotAsync(params: { + sessionManager?: Pick; + sessionFile: string; + maxBytes?: number; +}): Promise { + const getLeafId = + params.sessionManager && typeof params.sessionManager.getLeafId === "function" + ? params.sessionManager.getLeafId.bind(params.sessionManager) + : null; + const sessionFile = params.sessionFile.trim(); + if (!sessionFile || (params.sessionManager && !getLeafId)) { + return null; + } + const liveLeafId = getLeafId ? getLeafId() : undefined; + if (getLeafId && !liveLeafId) { + return null; + } + const maxBytes = params.maxBytes ?? MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES; + try { + const stat = await fs.stat(sessionFile); + if (!stat.isFile() || stat.size > maxBytes) { + return null; + } + } catch { + return null; + } + const parsedSessionFile = path.parse(sessionFile); + const snapshotFile = path.join( + parsedSessionFile.dir, + `${parsedSessionFile.name}.checkpoint.${randomUUID()}${parsedSessionFile.ext || ".jsonl"}`, + ); + try { + await fs.copyFile(sessionFile, snapshotFile); + } catch { + return null; + } + const sessionId = await readSessionIdFromTranscriptHeaderAsync(snapshotFile); + const leafId = liveLeafId ?? (await readSessionLeafIdFromTranscriptAsync(snapshotFile, maxBytes)); + if (!sessionId || !leafId) { + try { + await fs.unlink(snapshotFile); + } catch { + // Best-effort cleanup if the copied transcript cannot be validated. + } + return null; + } + return { + sessionId, + sessionFile: snapshotFile, + leafId, + }; +} + export async function cleanupCompactionCheckpointSnapshot( snapshot: CapturedCompactionCheckpointSnapshot | null | undefined, ): Promise { diff --git a/src/gateway/session-utils.test.ts b/src/gateway/session-utils.test.ts index 9614fb62aa9..a372d7e07ac 100644 --- a/src/gateway/session-utils.test.ts +++ b/src/gateway/session-utils.test.ts @@ -16,6 +16,7 @@ import { getSessionDefaults, listAgentsForGateway, listSessionsFromStore, + listSessionsFromStoreAsync, loadSessionEntry, migrateAndPruneGatewaySessionStoreKey, parseGroupKey, @@ -1110,6 +1111,55 @@ describe("resolveSessionModelRef", () => { }); describe("listSessionsFromStore selected model display", () => { + test("async list yields during bulk transcript title and last-message hydration", async () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-sessions-list-yield-")); + try { + const storePath = path.join(tmpDir, "sessions.json"); + const store: Record = {}; + const now = Date.now(); + for (let i = 0; i < 11; i += 1) { + const sessionId = `sess-yield-${i}`; + store[`agent:main:${sessionId}`] = { + sessionId, + updatedAt: now - i, + } as SessionEntry; + fs.writeFileSync( + path.join(tmpDir, `${sessionId}.jsonl`), + [ + JSON.stringify({ type: "session", version: 1, id: sessionId }), + JSON.stringify({ message: { role: "user", content: `title ${i}` } }), + JSON.stringify({ message: { role: "assistant", content: `last ${i}` } }), + ].join("\n"), + "utf-8", + ); + } + + const params = { + cfg: createModelDefaultsConfig({ primary: "openai/gpt-5.4" }), + storePath, + store, + opts: { includeDerivedTitles: true, includeLastMessage: true, limit: 11 }, + }; + const expected = listSessionsFromStore(params); + const listedPromise = listSessionsFromStoreAsync(params); + let settled = false; + void listedPromise.then(() => { + settled = true; + }); + + await Promise.resolve(); + + expect(settled).toBe(false); + const listed = await listedPromise; + expect(listed.path).toBe(expected.path); + expect(listed.count).toBe(expected.count); + expect(listed.defaults).toEqual(expected.defaults); + expect(listed.sessions).toEqual(expected.sessions); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); + test("caps transcript title and last-message hydration for bulk list responses", () => { const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-sessions-list-cap-")); try { diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 5290e44fbce..646ff0276fb 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -1675,23 +1675,21 @@ export function loadGatewaySessionRow( }); } -export function listSessionsFromStore(params: { - cfg: OpenClawConfig; - storePath: string; - store: Record; - modelCatalog?: ModelCatalogEntry[]; - opts: import("./protocol/index.js").SessionsListParams; -}): SessionsListResult { - const { cfg, storePath, store, opts } = params; - const now = Date.now(); - const sessionListTranscriptUsageMaxBytes = 64 * 1024; - const sessionListTranscriptFieldRows = 100; - const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now); +/** + * Number of session rows to build per batch before yielding to the event loop. + * Keeps the main thread responsive during large session list operations while + * avoiding excessive yielding overhead for small stores. + */ +const SESSIONS_LIST_YIELD_BATCH_SIZE = 10; +function filterAndSortSessionEntries(params: { + store: Record; + opts: import("./protocol/index.js").SessionsListParams; + now: number; +}): [string, SessionEntry][] { + const { store, opts, now } = params; const includeGlobal = opts.includeGlobal === true; const includeUnknown = opts.includeUnknown === true; - const includeDerivedTitles = opts.includeDerivedTitles === true; - const includeLastMessage = opts.includeLastMessage === true; const spawnedBy = typeof opts.spawnedBy === "string" ? opts.spawnedBy : ""; const label = normalizeOptionalString(opts.label) ?? ""; const agentId = typeof opts.agentId === "string" ? normalizeAgentId(opts.agentId) : ""; @@ -1782,6 +1780,26 @@ export function listSessionsFromStore(params: { entries = entries.slice(0, limit); } + return entries; +} + +export function listSessionsFromStore(params: { + cfg: OpenClawConfig; + storePath: string; + store: Record; + modelCatalog?: ModelCatalogEntry[]; + opts: import("./protocol/index.js").SessionsListParams; +}): SessionsListResult { + const { cfg, storePath, store, opts } = params; + const now = Date.now(); + const sessionListTranscriptUsageMaxBytes = 64 * 1024; + const sessionListTranscriptFieldRows = 100; + const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now); + const includeDerivedTitles = opts.includeDerivedTitles === true; + const includeLastMessage = opts.includeLastMessage === true; + + const entries = filterAndSortSessionEntries({ store, opts, now }); + const sessions = entries.map(([key, entry], index) => { const includeTranscriptFields = index < sessionListTranscriptFieldRows; return buildGatewaySessionRow({ @@ -1807,3 +1825,65 @@ export function listSessionsFromStore(params: { sessions, }; } + +/** + * Async version of listSessionsFromStore that yields to the event loop between + * batches of session row builds. This prevents large session stores from + * blocking the event loop during sessions.list requests. + * + * The synchronous file I/O in readSessionTitleFieldsFromTranscript (head/tail + * reads for derived titles and last-message previews) is the dominant blocker. + * By yielding every SESSIONS_LIST_YIELD_BATCH_SIZE rows, we keep the event + * loop responsive for WebSocket heartbeats, channel I/O, and concurrent RPC. + */ +export async function listSessionsFromStoreAsync(params: { + cfg: OpenClawConfig; + storePath: string; + store: Record; + modelCatalog?: ModelCatalogEntry[]; + opts: import("./protocol/index.js").SessionsListParams; +}): Promise { + const { cfg, storePath, store, opts } = params; + const now = Date.now(); + const sessionListTranscriptUsageMaxBytes = 64 * 1024; + const sessionListTranscriptFieldRows = 100; + const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now); + const includeDerivedTitles = opts.includeDerivedTitles === true; + const includeLastMessage = opts.includeLastMessage === true; + + const entries = filterAndSortSessionEntries({ store, opts, now }); + + const sessions: GatewaySessionRow[] = []; + for (let i = 0; i < entries.length; i++) { + const [key, entry] = entries[i]; + const includeTranscriptFields = i < sessionListTranscriptFieldRows; + sessions.push( + buildGatewaySessionRow({ + cfg, + storePath, + store, + key, + entry, + modelCatalog: params.modelCatalog, + now, + includeDerivedTitles: includeTranscriptFields && includeDerivedTitles, + includeLastMessage: includeTranscriptFields && includeLastMessage, + transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes, + storeChildSessionsByKey, + }), + ); + // Yield to the event loop between batches so WebSocket heartbeats, + // channel I/O, and concurrent RPC calls are not starved. + if ((i + 1) % SESSIONS_LIST_YIELD_BATCH_SIZE === 0 && i + 1 < entries.length) { + await new Promise((resolve) => setImmediate(resolve)); + } + } + + return { + ts: now, + path: storePath, + count: sessions.length, + defaults: getSessionDefaults(cfg, params.modelCatalog), + sessions, + }; +}