mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:10:43 +00:00
fix: async transcript I/O to unblock gateway event loop (#75595)
* fix: async transcript I/O to unblock gateway event loop Two related fixes for event-loop starvation caused by synchronous file operations on session transcript files during gateway hot paths. ## sessions.list: yield between transcript reads (#75330) Extract filterAndSortSessionEntries() from listSessionsFromStore() and add a new listSessionsFromStoreAsync() that yields to the event loop via setImmediate every 10 session rows. The sessions.list RPC handler now uses the async version. The synchronous version is kept for callers that need it (sessions- resolve visibility checks, embedded backends, subagent tools). The dominant blocker is readSessionTitleFieldsFromTranscript(), which performs fs.statSync + fs.openSync + fs.readSync (head) + fs.readSync (tail) for every session row that requests derived titles or last- message previews. With 100+ sessions, this blocks the event loop for 32-64 seconds, starving WebSocket heartbeats, channel I/O, and concurrent RPC. ## session compaction: async file copy (#75414) Add captureCompactionCheckpointSnapshotAsync() using fs.promises for stat, copyFile, and unlink instead of fsSync equivalents. Switch both compact.ts and compact.queued.ts to the async version. The synchronous copyFileSync of large transcript files (20MB+ observed in production) was blocking the event loop for the entire copy duration — one reporter measured a 43-minute event loop block from a single compaction checkpoint capture. Refs: #75330, #75414 * test: cover async transcript I/O responsiveness * fix: avoid sync checkpoint metadata reads
This commit is contained in:
@@ -46,6 +46,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/config: cap oversized plugin-owned schemas in the full `config.schema` response so large installed plugin sets cannot balloon Gateway RSS or crash schema clients. Thanks @vincentkoc.
|
||||
- Plugins/update: skip ClawHub and marketplace plugin updates when the bundled version is newer than the recorded installed version, so `openclaw update` no longer overwrites working bundled plugins with older external packages. Fixes #75447. Thanks @amknight.
|
||||
- Gateway/sessions: use bounded tail reads for sessions-list transcript usage fallbacks and cap bulk title/last-message hydration, keeping large session stores responsive when rows request derived previews. Thanks @vincentkoc.
|
||||
- Gateway/sessions: yield during bulk transcript title/preview hydration and copy compaction checkpoints asynchronously, keeping the Gateway event loop responsive for large session stores and large transcripts. Refs #75330 and #75414. Thanks @amknight.
|
||||
- Gateway/chat: bound chat-history transcript reads to the requested display window so large session logs no longer OOM the Gateway when clients ask for a small history page. Thanks @vincentkoc.
|
||||
- Voice Call/Twilio: honor stored pre-connect TwiML before realtime webhook shortcuts and reject DTMF sequences outside conversation mode, so Meet PIN entry cannot be skipped or silently dropped. Thanks @donkeykong91 and @PfanP.
|
||||
- Docs/sandboxing: clarify that sandbox setup scripts (`sandbox-setup.sh`, `sandbox-common-setup.sh`, `sandbox-browser-setup.sh`) are only available from a source checkout, and add inline `docker build` commands for npm-installed users so sandbox image setup works without cloning the repo. Fixes #75485. Thanks @amknight.
|
||||
|
||||
@@ -3,9 +3,10 @@ import { ensureContextEnginesInitialized } from "../../context-engine/init.js";
|
||||
import { resolveContextEngine } from "../../context-engine/registry.js";
|
||||
import type { ContextEngineRuntimeContext } from "../../context-engine/types.js";
|
||||
import {
|
||||
captureCompactionCheckpointSnapshot,
|
||||
captureCompactionCheckpointSnapshotAsync,
|
||||
cleanupCompactionCheckpointSnapshot,
|
||||
persistSessionCompactionCheckpoint,
|
||||
readSessionLeafIdFromTranscriptAsync,
|
||||
resolveSessionCompactionCheckpointReason,
|
||||
type CapturedCompactionCheckpointSnapshot,
|
||||
} from "../../gateway/session-compaction-checkpoints.js";
|
||||
@@ -115,8 +116,7 @@ export async function compactEmbeddedPiSession(
|
||||
// are notified regardless of which engine is active.
|
||||
const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
|
||||
checkpointSnapshot = engineOwnsCompaction
|
||||
? captureCompactionCheckpointSnapshot({
|
||||
sessionManager: SessionManager.open(params.sessionFile),
|
||||
? await captureCompactionCheckpointSnapshotAsync({
|
||||
sessionFile: params.sessionFile,
|
||||
})
|
||||
: null;
|
||||
@@ -200,7 +200,7 @@ export async function compactEmbeddedPiSession(
|
||||
try {
|
||||
const postLeafId =
|
||||
postCompactionLeafId ??
|
||||
SessionManager.open(postCompactionSessionFile).getLeafId() ??
|
||||
(await readSessionLeafIdFromTranscriptAsync(postCompactionSessionFile)) ??
|
||||
undefined;
|
||||
const storedCheckpoint = await persistSessionCompactionCheckpoint({
|
||||
cfg: params.config,
|
||||
|
||||
@@ -11,7 +11,7 @@ import { isAcpRuntimeSpawnAvailable } from "../../acp/runtime/availability.js";
|
||||
import type { ThinkLevel } from "../../auto-reply/thinking.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
captureCompactionCheckpointSnapshot,
|
||||
captureCompactionCheckpointSnapshotAsync,
|
||||
cleanupCompactionCheckpointSnapshot,
|
||||
persistSessionCompactionCheckpoint,
|
||||
resolveSessionCompactionCheckpointReason,
|
||||
@@ -822,7 +822,7 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
: undefined,
|
||||
allowedToolNames,
|
||||
});
|
||||
checkpointSnapshot = captureCompactionCheckpointSnapshot({
|
||||
checkpointSnapshot = await captureCompactionCheckpointSnapshotAsync({
|
||||
sessionManager,
|
||||
sessionFile: params.sessionFile,
|
||||
});
|
||||
|
||||
@@ -72,7 +72,7 @@ import {
|
||||
import { reactivateCompletedSubagentSession } from "../session-subagent-reactivation.js";
|
||||
import {
|
||||
archiveFileOnDisk,
|
||||
listSessionsFromStore,
|
||||
listSessionsFromStoreAsync,
|
||||
loadCombinedSessionStoreForGateway,
|
||||
loadGatewaySessionRow,
|
||||
loadSessionEntry,
|
||||
@@ -650,7 +650,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
const cfg = context.getRuntimeConfig();
|
||||
const { storePath, store } = loadCombinedSessionStoreForGateway(cfg);
|
||||
const modelCatalog = await loadOptionalSessionsListModelCatalog(context);
|
||||
const result = listSessionsFromStore({
|
||||
const result = await listSessionsFromStoreAsync({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
|
||||
@@ -157,6 +157,76 @@ test("sessions.list uses the gateway model catalog for effective thinking defaul
|
||||
);
|
||||
});
|
||||
|
||||
test("sessions.list yields before responding during bulk transcript hydration", async () => {
|
||||
const { dir } = await createSessionStoreDir();
|
||||
const entries: Record<string, ReturnType<typeof sessionStoreEntry>> = {};
|
||||
const now = Date.now();
|
||||
for (let i = 0; i < 11; i += 1) {
|
||||
const sessionId = `sess-list-yield-${i}`;
|
||||
entries[`bulk-${i}`] = sessionStoreEntry(sessionId, { updatedAt: now - i });
|
||||
await fs.writeFile(
|
||||
path.join(dir, `${sessionId}.jsonl`),
|
||||
[
|
||||
JSON.stringify({ type: "session", version: 1, id: sessionId }),
|
||||
JSON.stringify({ message: { role: "user", content: `title ${i}` } }),
|
||||
JSON.stringify({ message: { role: "assistant", content: `last ${i}` } }),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
}
|
||||
await writeSessionStore({ entries });
|
||||
|
||||
const respond = vi.fn();
|
||||
const sessionsHandlers = await getSessionsHandlers();
|
||||
const { getRuntimeConfig } = await getGatewayConfigModule();
|
||||
const request = sessionsHandlers["sessions.list"]({
|
||||
req: {
|
||||
type: "req",
|
||||
id: "req-sessions-list-yield",
|
||||
method: "sessions.list",
|
||||
params: {
|
||||
includeDerivedTitles: true,
|
||||
includeLastMessage: true,
|
||||
limit: 11,
|
||||
},
|
||||
},
|
||||
params: {
|
||||
includeDerivedTitles: true,
|
||||
includeLastMessage: true,
|
||||
limit: 11,
|
||||
},
|
||||
respond,
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
context: {
|
||||
getRuntimeConfig,
|
||||
loadGatewayModelCatalog: async () => [],
|
||||
logGateway: {
|
||||
debug: vi.fn(),
|
||||
},
|
||||
} as never,
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
|
||||
expect(respond).not.toHaveBeenCalled();
|
||||
await request;
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
expect.objectContaining({
|
||||
sessions: expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
key: "agent:main:bulk-0",
|
||||
derivedTitle: "title 0",
|
||||
lastMessagePreview: "last 0",
|
||||
}),
|
||||
]),
|
||||
}),
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
|
||||
test("sessions.list does not block on slow model catalog discovery", async () => {
|
||||
await createSessionStoreDir();
|
||||
await writeSessionStore({
|
||||
|
||||
@@ -4,13 +4,15 @@ import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { AssistantMessage, UserMessage } from "@mariozechner/pi-ai";
|
||||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import { afterEach, describe, expect, test } from "vitest";
|
||||
import { afterEach, describe, expect, test, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import {
|
||||
captureCompactionCheckpointSnapshot,
|
||||
captureCompactionCheckpointSnapshotAsync,
|
||||
cleanupCompactionCheckpointSnapshot,
|
||||
MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES,
|
||||
persistSessionCompactionCheckpoint,
|
||||
readSessionLeafIdFromTranscriptAsync,
|
||||
} from "./session-compaction-checkpoints.js";
|
||||
|
||||
const tempDirs: string[] = [];
|
||||
@@ -85,6 +87,144 @@ describe("session-compaction-checkpoints", () => {
|
||||
expect(fsSync.existsSync(sessionFile!)).toBe(true);
|
||||
});
|
||||
|
||||
test("async capture stores the copied pre-compaction transcript without sync copy", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
const session = SessionManager.create(dir, dir);
|
||||
session.appendMessage({
|
||||
role: "user",
|
||||
content: "before async compaction",
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
session.appendMessage({
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "async working on it" }],
|
||||
api: "responses",
|
||||
provider: "openai",
|
||||
model: "gpt-test",
|
||||
timestamp: Date.now(),
|
||||
} as AssistantMessage);
|
||||
|
||||
const sessionFile = session.getSessionFile();
|
||||
const leafId = session.getLeafId();
|
||||
expect(sessionFile).toBeTruthy();
|
||||
expect(leafId).toBeTruthy();
|
||||
|
||||
const originalBefore = await fs.readFile(sessionFile!, "utf-8");
|
||||
const copyFileSyncSpy = vi.spyOn(fsSync, "copyFileSync");
|
||||
const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open");
|
||||
try {
|
||||
const snapshot = await captureCompactionCheckpointSnapshotAsync({
|
||||
sessionManager: session,
|
||||
sessionFile: sessionFile!,
|
||||
});
|
||||
|
||||
expect(copyFileSyncSpy).not.toHaveBeenCalled();
|
||||
expect(sessionManagerOpenSpy).not.toHaveBeenCalled();
|
||||
expect(snapshot).not.toBeNull();
|
||||
expect(snapshot?.leafId).toBe(leafId);
|
||||
expect(snapshot?.sessionFile).not.toBe(sessionFile);
|
||||
expect(snapshot?.sessionFile).toContain(".checkpoint.");
|
||||
expect(fsSync.existsSync(snapshot!.sessionFile)).toBe(true);
|
||||
expect(await fs.readFile(snapshot!.sessionFile, "utf-8")).toBe(originalBefore);
|
||||
|
||||
session.appendCompaction("checkpoint summary", leafId!, 123, { ok: true });
|
||||
|
||||
expect(await fs.readFile(snapshot!.sessionFile, "utf-8")).toBe(originalBefore);
|
||||
expect(await fs.readFile(sessionFile!, "utf-8")).not.toBe(originalBefore);
|
||||
|
||||
await cleanupCompactionCheckpointSnapshot(snapshot);
|
||||
|
||||
expect(fsSync.existsSync(snapshot!.sessionFile)).toBe(false);
|
||||
expect(fsSync.existsSync(sessionFile!)).toBe(true);
|
||||
} finally {
|
||||
copyFileSyncSpy.mockRestore();
|
||||
sessionManagerOpenSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
test("async capture derives session metadata without synchronous SessionManager.open", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-metadata-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
const session = SessionManager.create(dir, dir);
|
||||
session.appendMessage({
|
||||
role: "user",
|
||||
content: "derive checkpoint metadata",
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
session.appendMessage({
|
||||
role: "assistant",
|
||||
content: "metadata derived",
|
||||
api: "responses",
|
||||
provider: "openai",
|
||||
model: "gpt-test",
|
||||
timestamp: Date.now(),
|
||||
} as unknown as AssistantMessage);
|
||||
|
||||
const sessionFile = session.getSessionFile();
|
||||
const sessionId = session.getSessionId();
|
||||
const leafId = session.getLeafId();
|
||||
expect(sessionFile).toBeTruthy();
|
||||
expect(sessionId).toBeTruthy();
|
||||
expect(leafId).toBeTruthy();
|
||||
await fs.appendFile(sessionFile!, "\nnot-json\n", "utf-8");
|
||||
|
||||
const copyFileSyncSpy = vi.spyOn(fsSync, "copyFileSync");
|
||||
const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open");
|
||||
let snapshot: Awaited<ReturnType<typeof captureCompactionCheckpointSnapshotAsync>> = null;
|
||||
try {
|
||||
expect(await readSessionLeafIdFromTranscriptAsync(sessionFile!)).toBe(leafId);
|
||||
snapshot = await captureCompactionCheckpointSnapshotAsync({
|
||||
sessionFile: sessionFile!,
|
||||
});
|
||||
|
||||
expect(copyFileSyncSpy).not.toHaveBeenCalled();
|
||||
expect(sessionManagerOpenSpy).not.toHaveBeenCalled();
|
||||
expect(snapshot).not.toBeNull();
|
||||
expect(snapshot?.sessionId).toBe(sessionId);
|
||||
expect(snapshot?.leafId).toBe(leafId);
|
||||
expect(snapshot?.sessionFile).not.toBe(sessionFile);
|
||||
expect(snapshot?.sessionFile).toContain(".checkpoint.");
|
||||
} finally {
|
||||
await cleanupCompactionCheckpointSnapshot(snapshot);
|
||||
copyFileSyncSpy.mockRestore();
|
||||
sessionManagerOpenSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
test("async capture skips oversized pre-compaction transcripts without sync copy", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-oversized-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
const session = SessionManager.create(dir, dir);
|
||||
session.appendMessage({
|
||||
role: "user",
|
||||
content: "before compaction",
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
const sessionFile = session.getSessionFile();
|
||||
expect(sessionFile).toBeTruthy();
|
||||
await fs.appendFile(sessionFile!, "x".repeat(128), "utf-8");
|
||||
|
||||
const copyFileSyncSpy = vi.spyOn(fsSync, "copyFileSync");
|
||||
try {
|
||||
const snapshot = await captureCompactionCheckpointSnapshotAsync({
|
||||
sessionManager: session,
|
||||
sessionFile: sessionFile!,
|
||||
maxBytes: 64,
|
||||
});
|
||||
|
||||
expect(snapshot).toBeNull();
|
||||
expect(copyFileSyncSpy).not.toHaveBeenCalled();
|
||||
expect(MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES).toBeGreaterThan(64);
|
||||
expect(fsSync.readdirSync(dir).filter((file) => file.includes(".checkpoint."))).toEqual([]);
|
||||
} finally {
|
||||
copyFileSyncSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
test("capture skips oversized pre-compaction transcripts", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-oversized-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
@@ -60,6 +60,138 @@ export function resolveSessionCompactionCheckpointReason(params: {
|
||||
return "auto-threshold";
|
||||
}
|
||||
|
||||
const SESSION_HEADER_READ_MAX_BYTES = 64 * 1024;
|
||||
const SESSION_TAIL_READ_INITIAL_BYTES = 64 * 1024;
|
||||
|
||||
type AsyncTranscriptFileHandle = Awaited<ReturnType<typeof fs.open>>;
|
||||
|
||||
async function readFileRangeAsync(
|
||||
fileHandle: AsyncTranscriptFileHandle,
|
||||
position: number,
|
||||
length: number,
|
||||
): Promise<Buffer> {
|
||||
const buffer = Buffer.alloc(length);
|
||||
let offset = 0;
|
||||
while (offset < length) {
|
||||
const { bytesRead } = await fileHandle.read(buffer, offset, length - offset, position + offset);
|
||||
if (bytesRead <= 0) {
|
||||
break;
|
||||
}
|
||||
offset += bytesRead;
|
||||
}
|
||||
return offset === length ? buffer : buffer.subarray(0, offset);
|
||||
}
|
||||
|
||||
async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Promise<string | null> {
|
||||
let fileHandle: AsyncTranscriptFileHandle | undefined;
|
||||
try {
|
||||
fileHandle = await fs.open(sessionFile, "r");
|
||||
const buffer = await readFileRangeAsync(fileHandle, 0, SESSION_HEADER_READ_MAX_BYTES);
|
||||
if (buffer.length <= 0) {
|
||||
return null;
|
||||
}
|
||||
const chunk = buffer.toString("utf-8");
|
||||
const firstLine = chunk
|
||||
.split(/\r?\n/)
|
||||
.map((line) => line.trim())
|
||||
.find((line) => line.length > 0);
|
||||
if (!firstLine) {
|
||||
return null;
|
||||
}
|
||||
const parsed = JSON.parse(firstLine) as { type?: unknown; id?: unknown };
|
||||
return parsed.type === "session" && typeof parsed.id === "string" && parsed.id.trim()
|
||||
? parsed.id.trim()
|
||||
: null;
|
||||
} catch {
|
||||
return null;
|
||||
} finally {
|
||||
if (fileHandle) {
|
||||
await fileHandle.close().catch(() => undefined);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function parseTranscriptLineId(
|
||||
line: string,
|
||||
): { kind: "session" } | { kind: "entry"; id: string } | null {
|
||||
try {
|
||||
const parsed = JSON.parse(line) as { type?: unknown; id?: unknown };
|
||||
if (parsed.type === "session") {
|
||||
return { kind: "session" };
|
||||
}
|
||||
if (typeof parsed.id === "string" && parsed.id.trim()) {
|
||||
return { kind: "entry", id: parsed.id.trim() };
|
||||
}
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export async function readSessionLeafIdFromTranscriptAsync(
|
||||
sessionFile: string,
|
||||
maxBytes = MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES,
|
||||
): Promise<string | null> {
|
||||
let fileHandle: AsyncTranscriptFileHandle | undefined;
|
||||
try {
|
||||
fileHandle = await fs.open(sessionFile, "r");
|
||||
const stat = await fileHandle.stat();
|
||||
if (!stat.isFile() || stat.size <= 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const requestedMaxBytes = Number.isFinite(maxBytes)
|
||||
? Math.max(1024, Math.floor(maxBytes))
|
||||
: MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES;
|
||||
const maxReadableBytes = Math.min(stat.size, requestedMaxBytes);
|
||||
let readLength = Math.min(maxReadableBytes, SESSION_TAIL_READ_INITIAL_BYTES);
|
||||
while (readLength > 0) {
|
||||
const readStart = Math.max(0, stat.size - readLength);
|
||||
const buffer = await readFileRangeAsync(fileHandle, readStart, readLength);
|
||||
const lines = buffer.toString("utf-8").split(/\r?\n/);
|
||||
// If we did not read from the beginning, the first line may be a suffix of
|
||||
// a larger JSONL entry. Ignore it and grow the window if no complete entry
|
||||
// is found.
|
||||
const candidateLines = readStart > 0 ? lines.slice(1) : lines;
|
||||
for (let i = candidateLines.length - 1; i >= 0; i -= 1) {
|
||||
const line = candidateLines[i]?.trim();
|
||||
if (!line) {
|
||||
continue;
|
||||
}
|
||||
const parsed = parseTranscriptLineId(line);
|
||||
if (!parsed) {
|
||||
continue;
|
||||
}
|
||||
if (parsed.kind === "session") {
|
||||
return null;
|
||||
}
|
||||
return parsed.id;
|
||||
}
|
||||
|
||||
if (readStart === 0) {
|
||||
return null;
|
||||
}
|
||||
const nextReadLength = Math.min(maxReadableBytes, readLength * 2);
|
||||
if (nextReadLength === readLength) {
|
||||
return null;
|
||||
}
|
||||
readLength = nextReadLength;
|
||||
}
|
||||
} catch {
|
||||
return null;
|
||||
} finally {
|
||||
if (fileHandle) {
|
||||
await fileHandle.close().catch(() => undefined);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronous version — kept for callers that cannot be made async.
|
||||
* Prefer captureCompactionCheckpointSnapshotAsync for large transcripts
|
||||
* to avoid blocking the event loop during file copy.
|
||||
*/
|
||||
export function captureCompactionCheckpointSnapshot(params: {
|
||||
sessionManager: Pick<SessionManager, "getLeafId">;
|
||||
sessionFile: string;
|
||||
@@ -121,6 +253,65 @@ export function captureCompactionCheckpointSnapshot(params: {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Async version of captureCompactionCheckpointSnapshot that uses async file
|
||||
* operations to avoid blocking the event loop. Large transcript files (20MB+)
|
||||
* were observed blocking the event loop for minutes when copied synchronously
|
||||
* (see issue #75414).
|
||||
*/
|
||||
export async function captureCompactionCheckpointSnapshotAsync(params: {
|
||||
sessionManager?: Pick<SessionManager, "getLeafId">;
|
||||
sessionFile: string;
|
||||
maxBytes?: number;
|
||||
}): Promise<CapturedCompactionCheckpointSnapshot | null> {
|
||||
const getLeafId =
|
||||
params.sessionManager && typeof params.sessionManager.getLeafId === "function"
|
||||
? params.sessionManager.getLeafId.bind(params.sessionManager)
|
||||
: null;
|
||||
const sessionFile = params.sessionFile.trim();
|
||||
if (!sessionFile || (params.sessionManager && !getLeafId)) {
|
||||
return null;
|
||||
}
|
||||
const liveLeafId = getLeafId ? getLeafId() : undefined;
|
||||
if (getLeafId && !liveLeafId) {
|
||||
return null;
|
||||
}
|
||||
const maxBytes = params.maxBytes ?? MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES;
|
||||
try {
|
||||
const stat = await fs.stat(sessionFile);
|
||||
if (!stat.isFile() || stat.size > maxBytes) {
|
||||
return null;
|
||||
}
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
const parsedSessionFile = path.parse(sessionFile);
|
||||
const snapshotFile = path.join(
|
||||
parsedSessionFile.dir,
|
||||
`${parsedSessionFile.name}.checkpoint.${randomUUID()}${parsedSessionFile.ext || ".jsonl"}`,
|
||||
);
|
||||
try {
|
||||
await fs.copyFile(sessionFile, snapshotFile);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
const sessionId = await readSessionIdFromTranscriptHeaderAsync(snapshotFile);
|
||||
const leafId = liveLeafId ?? (await readSessionLeafIdFromTranscriptAsync(snapshotFile, maxBytes));
|
||||
if (!sessionId || !leafId) {
|
||||
try {
|
||||
await fs.unlink(snapshotFile);
|
||||
} catch {
|
||||
// Best-effort cleanup if the copied transcript cannot be validated.
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
sessionId,
|
||||
sessionFile: snapshotFile,
|
||||
leafId,
|
||||
};
|
||||
}
|
||||
|
||||
export async function cleanupCompactionCheckpointSnapshot(
|
||||
snapshot: CapturedCompactionCheckpointSnapshot | null | undefined,
|
||||
): Promise<void> {
|
||||
|
||||
@@ -16,6 +16,7 @@ import {
|
||||
getSessionDefaults,
|
||||
listAgentsForGateway,
|
||||
listSessionsFromStore,
|
||||
listSessionsFromStoreAsync,
|
||||
loadSessionEntry,
|
||||
migrateAndPruneGatewaySessionStoreKey,
|
||||
parseGroupKey,
|
||||
@@ -1110,6 +1111,55 @@ describe("resolveSessionModelRef", () => {
|
||||
});
|
||||
|
||||
describe("listSessionsFromStore selected model display", () => {
|
||||
test("async list yields during bulk transcript title and last-message hydration", async () => {
|
||||
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-sessions-list-yield-"));
|
||||
try {
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const store: Record<string, SessionEntry> = {};
|
||||
const now = Date.now();
|
||||
for (let i = 0; i < 11; i += 1) {
|
||||
const sessionId = `sess-yield-${i}`;
|
||||
store[`agent:main:${sessionId}`] = {
|
||||
sessionId,
|
||||
updatedAt: now - i,
|
||||
} as SessionEntry;
|
||||
fs.writeFileSync(
|
||||
path.join(tmpDir, `${sessionId}.jsonl`),
|
||||
[
|
||||
JSON.stringify({ type: "session", version: 1, id: sessionId }),
|
||||
JSON.stringify({ message: { role: "user", content: `title ${i}` } }),
|
||||
JSON.stringify({ message: { role: "assistant", content: `last ${i}` } }),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
}
|
||||
|
||||
const params = {
|
||||
cfg: createModelDefaultsConfig({ primary: "openai/gpt-5.4" }),
|
||||
storePath,
|
||||
store,
|
||||
opts: { includeDerivedTitles: true, includeLastMessage: true, limit: 11 },
|
||||
};
|
||||
const expected = listSessionsFromStore(params);
|
||||
const listedPromise = listSessionsFromStoreAsync(params);
|
||||
let settled = false;
|
||||
void listedPromise.then(() => {
|
||||
settled = true;
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
|
||||
expect(settled).toBe(false);
|
||||
const listed = await listedPromise;
|
||||
expect(listed.path).toBe(expected.path);
|
||||
expect(listed.count).toBe(expected.count);
|
||||
expect(listed.defaults).toEqual(expected.defaults);
|
||||
expect(listed.sessions).toEqual(expected.sessions);
|
||||
} finally {
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("caps transcript title and last-message hydration for bulk list responses", () => {
|
||||
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-sessions-list-cap-"));
|
||||
try {
|
||||
|
||||
@@ -1675,23 +1675,21 @@ export function loadGatewaySessionRow(
|
||||
});
|
||||
}
|
||||
|
||||
export function listSessionsFromStore(params: {
|
||||
cfg: OpenClawConfig;
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
modelCatalog?: ModelCatalogEntry[];
|
||||
opts: import("./protocol/index.js").SessionsListParams;
|
||||
}): SessionsListResult {
|
||||
const { cfg, storePath, store, opts } = params;
|
||||
const now = Date.now();
|
||||
const sessionListTranscriptUsageMaxBytes = 64 * 1024;
|
||||
const sessionListTranscriptFieldRows = 100;
|
||||
const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now);
|
||||
/**
|
||||
* Number of session rows to build per batch before yielding to the event loop.
|
||||
* Keeps the main thread responsive during large session list operations while
|
||||
* avoiding excessive yielding overhead for small stores.
|
||||
*/
|
||||
const SESSIONS_LIST_YIELD_BATCH_SIZE = 10;
|
||||
|
||||
function filterAndSortSessionEntries(params: {
|
||||
store: Record<string, SessionEntry>;
|
||||
opts: import("./protocol/index.js").SessionsListParams;
|
||||
now: number;
|
||||
}): [string, SessionEntry][] {
|
||||
const { store, opts, now } = params;
|
||||
const includeGlobal = opts.includeGlobal === true;
|
||||
const includeUnknown = opts.includeUnknown === true;
|
||||
const includeDerivedTitles = opts.includeDerivedTitles === true;
|
||||
const includeLastMessage = opts.includeLastMessage === true;
|
||||
const spawnedBy = typeof opts.spawnedBy === "string" ? opts.spawnedBy : "";
|
||||
const label = normalizeOptionalString(opts.label) ?? "";
|
||||
const agentId = typeof opts.agentId === "string" ? normalizeAgentId(opts.agentId) : "";
|
||||
@@ -1782,6 +1780,26 @@ export function listSessionsFromStore(params: {
|
||||
entries = entries.slice(0, limit);
|
||||
}
|
||||
|
||||
return entries;
|
||||
}
|
||||
|
||||
export function listSessionsFromStore(params: {
|
||||
cfg: OpenClawConfig;
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
modelCatalog?: ModelCatalogEntry[];
|
||||
opts: import("./protocol/index.js").SessionsListParams;
|
||||
}): SessionsListResult {
|
||||
const { cfg, storePath, store, opts } = params;
|
||||
const now = Date.now();
|
||||
const sessionListTranscriptUsageMaxBytes = 64 * 1024;
|
||||
const sessionListTranscriptFieldRows = 100;
|
||||
const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now);
|
||||
const includeDerivedTitles = opts.includeDerivedTitles === true;
|
||||
const includeLastMessage = opts.includeLastMessage === true;
|
||||
|
||||
const entries = filterAndSortSessionEntries({ store, opts, now });
|
||||
|
||||
const sessions = entries.map(([key, entry], index) => {
|
||||
const includeTranscriptFields = index < sessionListTranscriptFieldRows;
|
||||
return buildGatewaySessionRow({
|
||||
@@ -1807,3 +1825,65 @@ export function listSessionsFromStore(params: {
|
||||
sessions,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Async version of listSessionsFromStore that yields to the event loop between
|
||||
* batches of session row builds. This prevents large session stores from
|
||||
* blocking the event loop during sessions.list requests.
|
||||
*
|
||||
* The synchronous file I/O in readSessionTitleFieldsFromTranscript (head/tail
|
||||
* reads for derived titles and last-message previews) is the dominant blocker.
|
||||
* By yielding every SESSIONS_LIST_YIELD_BATCH_SIZE rows, we keep the event
|
||||
* loop responsive for WebSocket heartbeats, channel I/O, and concurrent RPC.
|
||||
*/
|
||||
export async function listSessionsFromStoreAsync(params: {
|
||||
cfg: OpenClawConfig;
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
modelCatalog?: ModelCatalogEntry[];
|
||||
opts: import("./protocol/index.js").SessionsListParams;
|
||||
}): Promise<SessionsListResult> {
|
||||
const { cfg, storePath, store, opts } = params;
|
||||
const now = Date.now();
|
||||
const sessionListTranscriptUsageMaxBytes = 64 * 1024;
|
||||
const sessionListTranscriptFieldRows = 100;
|
||||
const storeChildSessionsByKey = buildStoreChildSessionIndex(store, now);
|
||||
const includeDerivedTitles = opts.includeDerivedTitles === true;
|
||||
const includeLastMessage = opts.includeLastMessage === true;
|
||||
|
||||
const entries = filterAndSortSessionEntries({ store, opts, now });
|
||||
|
||||
const sessions: GatewaySessionRow[] = [];
|
||||
for (let i = 0; i < entries.length; i++) {
|
||||
const [key, entry] = entries[i];
|
||||
const includeTranscriptFields = i < sessionListTranscriptFieldRows;
|
||||
sessions.push(
|
||||
buildGatewaySessionRow({
|
||||
cfg,
|
||||
storePath,
|
||||
store,
|
||||
key,
|
||||
entry,
|
||||
modelCatalog: params.modelCatalog,
|
||||
now,
|
||||
includeDerivedTitles: includeTranscriptFields && includeDerivedTitles,
|
||||
includeLastMessage: includeTranscriptFields && includeLastMessage,
|
||||
transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes,
|
||||
storeChildSessionsByKey,
|
||||
}),
|
||||
);
|
||||
// Yield to the event loop between batches so WebSocket heartbeats,
|
||||
// channel I/O, and concurrent RPC calls are not starved.
|
||||
if ((i + 1) % SESSIONS_LIST_YIELD_BATCH_SIZE === 0 && i + 1 < entries.length) {
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
ts: now,
|
||||
path: storePath,
|
||||
count: sessions.length,
|
||||
defaults: getSessionDefaults(cfg, params.modelCatalog),
|
||||
sessions,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user