diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e9c057a29d..becc3bb43c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Google Meet: interrupt Realtime provider output when local barge-in clears playback, so command-pair audio stops model speech instead of only restarting Chrome playback. Fixes #73850. (#73834) Thanks @shhtheonlyperson. +- Gateway/chat: bound chat-history transcript reads to the requested display window so large session logs no longer OOM the Gateway when clients ask for a small history page. Thanks @vincentkoc. - Voice Call/Twilio: honor stored pre-connect TwiML before realtime webhook shortcuts and reject DTMF sequences outside conversation mode, so Meet PIN entry cannot be skipped or silently dropped. Thanks @donkeykong91 and @PfanP. - Google Meet/Voice Call: play Twilio Meet DTMF before opening the realtime media stream and carry the intro as the initial Voice Call message, so the greeting is generated after Meet admits the phone participant instead of racing a live-call TwiML update. Thanks @donkeykong91 and @PfanP. - Google Meet/Voice Call: make Twilio setup preflight honor explicit `--transport twilio` and fail local/private Voice Call webhook URLs, including IPv6 loopback and unique-local forms, before joins. Thanks @donkeykong91 and @PfanP. 
diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 9952eaea285..0f750a2b9ce 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -100,7 +100,7 @@ import { resolveGatewayModelSupportsImages, resolveGatewaySessionThinkingDefault, resolveDeletedAgentIdFromSessionKey, - readSessionMessages, + readRecentSessionMessages, resolveSessionModelRef, } from "../session-utils.js"; import { formatForLog } from "../ws-log.js"; @@ -1662,17 +1662,23 @@ export const chatHandlers: GatewayRequestHandlers = { const sessionId = entry?.sessionId; const sessionAgentId = resolveSessionAgentId({ sessionKey, config: cfg }); const resolvedSessionModel = resolveSessionModelRef(cfg, entry, sessionAgentId); + const hardMax = 1000; + const defaultLimit = 200; + const requested = typeof limit === "number" ? limit : defaultLimit; + const max = Math.min(hardMax, requested); + const maxHistoryBytes = getMaxChatHistoryMessagesBytes(); const localMessages = - sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : []; + sessionId && storePath + ? readRecentSessionMessages(sessionId, storePath, entry?.sessionFile, { + maxMessages: max, + maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024), + }) + : []; const rawMessages = augmentChatHistoryWithCliSessionImports({ entry, provider: resolvedSessionModel.provider, localMessages, }); - const hardMax = 1000; - const defaultLimit = 200; - const requested = typeof limit === "number" ? 
// Small transcript: the whole file fits inside the tail window, so the two
// newest messages come back with the sequence numbers 3 and 4.
test("reads recent messages from the transcript tail without loading the whole file", () => {
  const sessionId = "test-session-recent-tail";
  const transcriptEntries = [
    { type: "session", version: 1, id: sessionId },
    { message: { role: "user", content: "old" } },
    { message: { role: "assistant", content: "middle" } },
    { message: { role: "user", content: "recent" } },
    { message: { role: "assistant", content: "latest" } },
  ];
  writeTranscript(tmpDir, sessionId, transcriptEntries);

  const recent = readRecentSessionMessages(sessionId, storePath, undefined, {
    maxMessages: 2,
    maxBytes: 1024,
  });

  const messageWith = (role: string, content: string, seq: number) =>
    expect.objectContaining({
      role,
      content,
      __openclaw: expect.objectContaining({ seq }),
    });
  expect(recent).toEqual([messageWith("user", "recent", 3), messageWith("assistant", "latest", 4)]);
});

// Large transcript: only a 64 KiB tail may be read, and the whole-file
// `readFileSync` path must never be taken.
test("bounds recent-message reads for large append-only transcripts", () => {
  const sessionId = "test-session-recent-large";
  const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);

  const serialized: string[] = [JSON.stringify({ type: "session", version: 1, id: sessionId })];
  for (let index = 0; index < 2500; index += 1) {
    const role = index % 2 === 0 ? "user" : "assistant";
    serialized.push(
      JSON.stringify({
        message: { role, content: `message ${index} ${"x".repeat(700)}` },
      }),
    );
  }
  serialized.push(JSON.stringify({ message: { role: "assistant", content: "tail" } }));
  fs.writeFileSync(transcriptPath, serialized.join("\n"), "utf-8");

  // Install the spy only after the fixture is written so setup I/O is not counted.
  const readFileSpy = vi.spyOn(fs, "readFileSync");
  try {
    const out = readRecentSessionMessages(sessionId, storePath, undefined, {
      maxMessages: 1,
      maxBytes: 64 * 1024,
    });
    expect(out).toHaveLength(1);
    expect(out[0]).toMatchObject({ role: "assistant", content: "tail" });
    expect(readFileSpy).not.toHaveBeenCalled();
  } finally {
    readFileSpy.mockRestore();
  }
});
/** Limits applied by `readRecentSessionMessages` when it reads the end of a transcript. */
export type ReadRecentSessionMessagesOptions = {
  /** Upper bound on returned messages; a value that floors to 0 or less, or NaN, yields `[]`. */
  maxMessages: number;
  /** Upper bound on bytes read from the end of the file; clamped to at least 1024. */
  maxBytes?: number;
  /** Upper bound on non-empty tail lines that get parsed; never lower than `maxMessages`. */
  maxLines?: number;
};

/** Tail window (8 MiB) used when the caller does not supply `maxBytes`. */
const RECENT_SESSION_MESSAGES_DEFAULT_MAX_BYTES = 8 * 1024 * 1024;

/**
 * Floors a numeric option, substituting `fallback` when the option is missing or NaN.
 *
 * NaN must not reach the clamps below: `Math.max`/`Math.min` propagate it and
 * `slice(-NaN)` returns the whole array, which would silently disable the bounds.
 */
function floorOptionOr(value: number | undefined, fallback: number): number {
  return typeof value === "number" && !Number.isNaN(value) ? Math.floor(value) : fallback;
}

/**
 * Reads at most `opts.maxMessages` of the newest messages from a session transcript
 * by parsing only the last `opts.maxBytes` bytes of the file.
 *
 * When the read does not start at the beginning of the file, the first line of the
 * window is discarded because it may be a partial record. `seq` values are counted
 * from the start of the parsed window, so they equal whole-transcript sequence
 * numbers only when the entire file fits inside the window.
 *
 * If any line in the window is a tree entry (see `hasSessionTreeEntry`), the call
 * delegates to `readSessionMessages` for the full transcript and returns its last
 * `maxMessages` items.
 *
 * @param sessionId - Session whose transcript should be read.
 * @param storePath - Session store location passed through to transcript lookup.
 * @param sessionFile - Optional explicit transcript path passed through to transcript lookup.
 * @param opts - Read bounds; omitting it (or `maxMessages`) returns `[]`.
 * @returns The newest messages in file order, or `[]` when nothing can be read.
 */
export function readRecentSessionMessages(
  sessionId: string,
  storePath: string | undefined,
  sessionFile?: string,
  opts?: ReadRecentSessionMessagesOptions,
): unknown[] {
  const maxMessages = Math.max(0, floorOptionOr(opts?.maxMessages, 0));
  if (maxMessages === 0) {
    return [];
  }

  const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile);
  if (!filePath) {
    return [];
  }

  let stat: fs.Stats;
  try {
    stat = fs.statSync(filePath);
  } catch {
    return [];
  }
  if (stat.size === 0) {
    return [];
  }

  const maxBytes = Math.max(
    1024,
    floorOptionOr(opts?.maxBytes, RECENT_SESSION_MESSAGES_DEFAULT_MAX_BYTES),
  );
  const readLen = Math.min(stat.size, maxBytes);
  const readStart = Math.max(0, stat.size - readLen);
  const maxLines = Math.max(maxMessages, floorOptionOr(opts?.maxLines, maxMessages * 20 + 20));

  return (
    withOpenTranscriptFd(filePath, (fd) => {
      const buf = Buffer.alloc(readLen);
      const bytesRead = fs.readSync(fd, buf, 0, readLen, readStart);
      if (bytesRead <= 0) {
        return [];
      }
      const chunk = buf.toString("utf-8", 0, bytesRead);
      const lines = chunk
        .split(/\r?\n/)
        // A window that starts mid-file may begin inside a record; drop that first line.
        .slice(readStart > 0 ? 1 : 0)
        .filter((line) => line.trim().length > 0)
        .slice(-maxLines);

      // Tree transcripts are resolved by readSessionMessages, which reads the whole file.
      if (lines.some(hasSessionTreeEntry)) {
        return readSessionMessages(sessionId, storePath, sessionFile).slice(-maxMessages);
      }

      const messages: unknown[] = [];
      let messageSeq = 0;
      for (const line of lines) {
        try {
          const parsed: unknown = JSON.parse(line);
          const message = parsedSessionEntryToMessage(parsed, messageSeq + 1);
          if (message) {
            messageSeq += 1;
            messages.push(message);
          }
        } catch {
          // ignore bad tail lines
        }
      }
      return messages.slice(-maxMessages);
    }) ?? []
  );
}

/**
 * Reports whether a transcript line is a tree entry: valid JSON whose `type` is not
 * `"session"`, whose `id` is a string, and which has a `parentId` key.
 * Blank lines, invalid JSON and non-object JSON values all report `false`.
 */
function hasSessionTreeEntry(line: string): boolean {
  if (!line.trim()) {
    return false;
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(line);
  } catch {
    return false;
  }
  if (!parsed || typeof parsed !== "object") {
    return false;
  }
  const entry = parsed as { type?: unknown; id?: unknown; parentId?: unknown };
  return entry.type !== "session" && typeof entry.id === "string" && "parentId" in entry;
}

/**
 * Converts one parsed transcript record into a displayable message.
 *
 * @param parsed - Result of `JSON.parse` on a transcript line.
 * @param seq - Sequence number to stamp into the message's `__openclaw` metadata.
 * @returns The record's `message` with transcript metadata attached, a synthetic
 *   system message for `compaction` records, or `null` for anything else.
 */
function parsedSessionEntryToMessage(parsed: unknown, seq: number): unknown | null {
  if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
    return null;
  }
  const entry = parsed as Record<string, unknown>;
  if (entry.message) {
    return attachOpenClawTranscriptMeta(entry.message, {
      ...(typeof entry.id === "string" ? { id: entry.id } : {}),
      seq,
    });
  }

  // Compaction entries are not "message" records, but they're useful context for debugging.
  // Emit a lightweight synthetic message that the Web UI can render as a divider.
  if (entry.type === "compaction") {
    const ts = typeof entry.timestamp === "string" ? Date.parse(entry.timestamp) : Number.NaN;
    // Fall back to "now" when the record has no parseable timestamp.
    const timestamp = Number.isFinite(ts) ? ts : Date.now();
    return {
      role: "system",
      content: [{ type: "text", text: "Compaction" }],
      timestamp,
      __openclaw: {
        kind: "compaction",
        id: typeof entry.id === "string" ? entry.id : undefined,
        seq,
      },
    };
  }
  return null;
}