mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 13:30:42 +00:00
fix(gateway): skip oversized JSONL lines to prevent event-loop starvation
Large transcript JSONL records (multi-MB tool results, file content) block the event loop via JSON.parse before truncation logic can skip them. Add a 256 KiB line-size guard to parseTailTranscriptRecord and extractUsageSnapshotFromTranscriptLine, and replace the full transcript index scan in readSessionTitleFieldsFromTranscriptAsync with the existing bounded sync reader. Observed improvement on a production install (33 sessions): chat.history dropped from 13-16s to ~1.2s, event loop utilization from 0.999 to normal, steady-state CPU from ~100% to 0.2-0.3%. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -24,6 +24,7 @@ import {
|
||||
readSessionMessagesAsync,
|
||||
readSessionMessages,
|
||||
readSessionTitleFieldsFromTranscript,
|
||||
readSessionTitleFieldsFromTranscriptAsync,
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
resolveSessionTranscriptCandidates,
|
||||
} from "./session-utils.fs.js";
|
||||
@@ -1654,3 +1655,86 @@ describe("archiveSessionTranscripts", () => {
|
||||
expect(fs.existsSync(transcriptPath)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("oversized transcript line guards", () => {
|
||||
let tmpDir: string;
|
||||
let storePath: string;
|
||||
|
||||
registerTempSessionStore("openclaw-session-fs-oversized-", (nextTmpDir, nextStorePath) => {
|
||||
tmpDir = nextTmpDir;
|
||||
storePath = nextStorePath;
|
||||
});
|
||||
|
||||
test("readRecentSessionMessagesAsync skips oversized JSONL lines", async () => {
|
||||
const sessionId = "test-oversized-recent";
|
||||
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
|
||||
const oversizedContent = "x".repeat(300 * 1024);
|
||||
const lines = [
|
||||
JSON.stringify({ type: "session", version: 1, id: sessionId }),
|
||||
JSON.stringify({ message: { role: "user", content: "start" } }),
|
||||
JSON.stringify({ message: { role: "assistant", content: oversizedContent } }),
|
||||
JSON.stringify({ message: { role: "user", content: "after oversized" } }),
|
||||
];
|
||||
fs.writeFileSync(transcriptPath, `${lines.join("\n")}\n`, "utf-8");
|
||||
|
||||
const out = await readRecentSessionMessagesAsync(sessionId, storePath, undefined, {
|
||||
maxMessages: 10,
|
||||
});
|
||||
|
||||
const contents = out.map((m) => (typeof m.content === "string" ? m.content : ""));
|
||||
expect(contents).not.toContain(oversizedContent);
|
||||
expect(contents).toContain("after oversized");
|
||||
});
|
||||
|
||||
test("readRecentSessionUsageFromTranscriptAsync skips oversized lines", async () => {
|
||||
const sessionId = "test-oversized-usage";
|
||||
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
|
||||
const oversizedContent = "y".repeat(300 * 1024);
|
||||
const lines = [
|
||||
JSON.stringify({ type: "session", version: 1, id: sessionId }),
|
||||
JSON.stringify({
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: oversizedContent,
|
||||
usage: { input: 9999, output: 9999 },
|
||||
provider: "oversized-provider",
|
||||
model: "oversized-model",
|
||||
},
|
||||
}),
|
||||
JSON.stringify({
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: "normal",
|
||||
usage: { input: 100, output: 50 },
|
||||
provider: "test-provider",
|
||||
model: "test-model",
|
||||
},
|
||||
}),
|
||||
];
|
||||
fs.writeFileSync(transcriptPath, `${lines.join("\n")}\n`, "utf-8");
|
||||
|
||||
const usage = await readRecentSessionUsageFromTranscriptAsync(
|
||||
sessionId,
|
||||
storePath,
|
||||
undefined,
|
||||
undefined,
|
||||
512 * 1024,
|
||||
);
|
||||
|
||||
expect(usage).not.toBeNull();
|
||||
expect(usage?.modelProvider).not.toBe("oversized-provider");
|
||||
expect(usage?.modelProvider).toBe("test-provider");
|
||||
});
|
||||
|
||||
test("readSessionTitleFieldsFromTranscriptAsync delegates to bounded sync reader", async () => {
|
||||
const sessionId = "test-async-title-bounded";
|
||||
writeTranscript(tmpDir, sessionId, buildBasicSessionTranscript(sessionId, "User says hi", "Bot says hello"));
|
||||
|
||||
const syncResult = readSessionTitleFieldsFromTranscript(sessionId, storePath);
|
||||
const asyncResult = await readSessionTitleFieldsFromTranscriptAsync(sessionId, storePath);
|
||||
|
||||
expect(asyncResult).toEqual(syncResult);
|
||||
expect(asyncResult.firstUserMessage).toBe("User says hi");
|
||||
expect(asyncResult.lastMessagePreview).toBe("Bot says hello");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -265,11 +265,18 @@ async function readRecentTranscriptTailLinesAsync(
|
||||
}
|
||||
}
|
||||
|
||||
const MAX_TRANSCRIPT_PARSE_LINE_BYTES = 256 * 1024;
|
||||
|
||||
function isOversizedTranscriptLine(line: string): boolean {
|
||||
return Buffer.byteLength(line, "utf8") > MAX_TRANSCRIPT_PARSE_LINE_BYTES;
|
||||
}
|
||||
|
||||
function normalizeTailEntryString(value: unknown): string | undefined {
|
||||
return typeof value === "string" && value.trim().length > 0 ? value : undefined;
|
||||
}
|
||||
|
||||
function parseTailTranscriptRecord(line: string): TailTranscriptRecord | null {
|
||||
if (isOversizedTranscriptLine(line)) return null;
|
||||
try {
|
||||
const parsed = JSON.parse(line) as unknown;
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
@@ -797,59 +804,7 @@ export async function readSessionTitleFieldsFromTranscriptAsync(
|
||||
agentId?: string,
|
||||
opts?: { includeInterSession?: boolean },
|
||||
): Promise<SessionTitleFields> {
|
||||
const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile, agentId);
|
||||
const filePath = candidates.find((p) => fs.existsSync(p));
|
||||
if (!filePath) {
|
||||
return { firstUserMessage: null, lastMessagePreview: null };
|
||||
}
|
||||
let stat: fs.Stats;
|
||||
try {
|
||||
stat = await fs.promises.stat(filePath);
|
||||
} catch {
|
||||
return { firstUserMessage: null, lastMessagePreview: null };
|
||||
}
|
||||
const cacheKey = readSessionTitleFieldsCacheKey(filePath, opts);
|
||||
const cached = getCachedSessionTitleFields(cacheKey, stat);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
const index = await readSessionTranscriptIndex(filePath);
|
||||
if (!index) {
|
||||
return { firstUserMessage: null, lastMessagePreview: null };
|
||||
}
|
||||
|
||||
let firstUserMessage: string | null = null;
|
||||
for (const entry of index.entries) {
|
||||
const msg = entry.record.message as TranscriptMessage | undefined;
|
||||
if (msg?.role !== "user") {
|
||||
continue;
|
||||
}
|
||||
if (opts?.includeInterSession !== true && hasInterSessionUserProvenance(msg)) {
|
||||
continue;
|
||||
}
|
||||
const text = extractTextFromContent(msg.content);
|
||||
if (text) {
|
||||
firstUserMessage = text;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let lastMessagePreview: string | null = null;
|
||||
for (const entry of index.entries.toReversed()) {
|
||||
const msg = entry.record.message as TranscriptMessage | undefined;
|
||||
if (!msg || (msg.role !== "user" && msg.role !== "assistant")) {
|
||||
continue;
|
||||
}
|
||||
const text = extractTextFromContent(msg.content);
|
||||
if (text) {
|
||||
lastMessagePreview = text;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const result = { firstUserMessage, lastMessagePreview };
|
||||
setCachedSessionTitleFields(cacheKey, stat, result);
|
||||
return result;
|
||||
return readSessionTitleFieldsFromTranscript(sessionId, storePath, sessionFile, agentId, opts);
|
||||
}
|
||||
|
||||
function extractTextFromContent(content: TranscriptMessage["content"]): string | null {
|
||||
@@ -1045,6 +1000,7 @@ function resolvePositiveUsageNumber(value: unknown): number | undefined {
|
||||
function extractUsageSnapshotFromTranscriptLine(
|
||||
line: string,
|
||||
): SessionTranscriptUsageSnapshot | null {
|
||||
if (isOversizedTranscriptLine(line)) return null;
|
||||
try {
|
||||
const parsed = JSON.parse(line) as Record<string, unknown>;
|
||||
const message =
|
||||
|
||||
Reference in New Issue
Block a user