fix(gateway): bound session transcript hot paths

Bound recent transcript reads and oversized injected-message writes across gateway session paths.\n\nThanks @vincentkoc
This commit is contained in:
Vincent Koc
2026-05-01 09:00:43 -07:00
committed by GitHub
parent ea4d0a3ce7
commit ec59af3386
14 changed files with 728 additions and 94 deletions

View File

@@ -78,6 +78,7 @@ Docs: https://docs.openclaw.ai
- Plugins/update: skip ClawHub and marketplace plugin updates when the bundled version is newer than the recorded installed version, so `openclaw update` no longer overwrites working bundled plugins with older external packages. Fixes #75447. Thanks @amknight.
- Gateway/sessions: use bounded tail reads for sessions-list transcript usage fallbacks and cap bulk title/last-message hydration, keeping large session stores responsive when rows request derived previews. Thanks @vincentkoc.
- Gateway/sessions: yield during bulk transcript title/preview hydration and copy compaction checkpoints asynchronously, keeping the Gateway event loop responsive for large session stores and large transcripts. Refs #75330 and #75414. Thanks @amknight.
- Gateway/sessions: stream bounded transcript reads for session detail, history, artifacts, compaction, and send/subscribe sequence paths so small Gateway requests no longer materialize large transcripts or OOM on oversized session logs. Thanks @vincentkoc.
- Gateway/chat: bound chat-history transcript reads to the requested display window so large session logs no longer OOM the Gateway when clients ask for a small history page. Thanks @vincentkoc.
- Voice Call/Twilio: honor stored pre-connect TwiML before realtime webhook shortcuts and reject DTMF sequences outside conversation mode, so Meet PIN entry cannot be skipped or silently dropped. Thanks @donkeykong91 and @PfanP.
- Docs/sandboxing: clarify that sandbox setup scripts (`sandbox-setup.sh`, `sandbox-common-setup.sh`, `sandbox-browser-setup.sh`) are only available from a source checkout, and add inline `docker build` commands for npm-installed users so sandbox image setup works without cloning the repo. Fixes #75485. Thanks @amknight.

View File

@@ -4,7 +4,7 @@ import { artifactsHandlers, collectArtifactsFromMessages } from "./artifacts.js"
const hoisted = vi.hoisted(() => ({
getTaskSessionLookupByIdForStatus: vi.fn(),
loadSessionEntry: vi.fn(),
readSessionMessages: vi.fn(),
visitSessionMessages: vi.fn(),
resolveSessionKeyForRun: vi.fn(),
}));
@@ -17,7 +17,7 @@ vi.mock("../session-utils.js", async () => {
return {
...actual,
loadSessionEntry: hoisted.loadSessionEntry,
readSessionMessages: hoisted.readSessionMessages,
visitSessionMessages: hoisted.visitSessionMessages,
};
});
@@ -49,7 +49,7 @@ describe("artifacts RPC handlers", () => {
storePath: "/tmp/sessions.json",
entry: { sessionId: "sess-main", sessionFile: "/tmp/sess-main.jsonl" },
});
hoisted.readSessionMessages.mockReturnValue([
mockedMessages([
{
role: "assistant",
content: [
@@ -66,6 +66,15 @@ describe("artifacts RPC handlers", () => {
]);
});
function mockedMessages(messages: unknown[]) {
hoisted.visitSessionMessages.mockImplementation(
(_sessionId, _storePath, _sessionFile, visit) => {
messages.forEach((message, index) => visit(message, index + 1));
return messages.length;
},
);
}
it("lists stable transcript artifact summaries by sessionKey", async () => {
const { calls, respond } = createResponder();
@@ -99,7 +108,21 @@ describe("artifacts RPC handlers", () => {
it("gets and downloads an inline artifact", async () => {
const listed = collectArtifactsFromMessages({
sessionKey: "agent:main:main",
messages: hoisted.readSessionMessages(),
messages: [
{
role: "assistant",
content: [
{ type: "text", text: "see attached" },
{
type: "image",
data: "aGVsbG8=",
mimeType: "image/png",
alt: "result.png",
},
],
__openclaw: { seq: 2 },
},
],
});
const artifactId = listed[0]?.id;
expect(artifactId).toBeTruthy();
@@ -137,7 +160,7 @@ describe("artifacts RPC handlers", () => {
it("resolves runId queries through the gateway run-to-session lookup", async () => {
hoisted.resolveSessionKeyForRun.mockReturnValue("agent:main:main");
hoisted.readSessionMessages.mockReturnValue([
mockedMessages([
{
role: "assistant",
content: [{ type: "image", data: "aGVsbG8=", alt: "run-result.png" }],
@@ -166,7 +189,7 @@ describe("artifacts RPC handlers", () => {
requesterSessionKey: "agent:main:main",
runId: "run-for-task-1",
});
hoisted.readSessionMessages.mockReturnValue([
mockedMessages([
{
role: "assistant",
content: [{ type: "image", data: "dGFyZ2V0", alt: "task-result.png" }],
@@ -257,7 +280,7 @@ describe("artifacts RPC handlers", () => {
});
it("discovers transcript image_url data blocks", async () => {
hoisted.readSessionMessages.mockReturnValue([
mockedMessages([
{
role: "user",
content: [

View File

@@ -10,7 +10,7 @@ import {
validateArtifactsListParams,
} from "../protocol/index.js";
import { resolveSessionKeyForRun } from "../server-session-key.js";
import { loadSessionEntry, readSessionMessages } from "../session-utils.js";
import { loadSessionEntry, visitSessionMessages } from "../session-utils.js";
import type { GatewayRequestHandlers, RespondFn } from "./types.js";
import { assertValidParams } from "./validation.js";
@@ -215,61 +215,72 @@ export function collectArtifactsFromMessages(params: {
const artifacts: ArtifactRecord[] = [];
let messageFallbackSeq = 0;
for (const message of params.messages) {
const msg = asRecord(message);
if (!msg) {
continue;
}
messageFallbackSeq += 1;
const messageSeq = resolveMessageSeq(msg, messageFallbackSeq);
const messageRunId = resolveMessageRunId(msg);
const messageTaskId = resolveMessageTaskId(msg);
if (params.runId && messageRunId !== params.runId) {
continue;
}
if (params.taskId && messageTaskId !== params.taskId) {
continue;
}
const content = Array.isArray(msg.content) ? msg.content : [];
for (let contentIndex = 0; contentIndex < content.length; contentIndex += 1) {
const block = asRecord(content[contentIndex]);
if (!block || !isArtifactBlock(block)) {
continue;
}
const type = normalizeArtifactType(asNonEmptyString(block.type) ?? "file");
const title =
asNonEmptyString(block.title) ??
asNonEmptyString(block.fileName) ??
asNonEmptyString(block.filename) ??
asNonEmptyString(block.alt) ??
`${type} ${artifacts.length + 1}`;
const download = resolveBlockDownload(block);
const summary: ArtifactRecord = {
id: artifactId({
sessionKey: params.sessionKey,
messageSeq,
contentIndex,
title,
type,
}),
type,
title,
...(download.mimeType ? { mimeType: download.mimeType } : {}),
...(download.sizeBytes !== undefined ? { sizeBytes: download.sizeBytes } : {}),
sessionKey: params.sessionKey,
...(messageRunId ? { runId: messageRunId } : {}),
...(messageTaskId ? { taskId: messageTaskId } : {}),
messageSeq,
source: "session-transcript",
download: { mode: download.mode },
...(download.data ? { data: download.data } : {}),
...(download.url ? { url: download.url } : {}),
};
artifacts.push(summary);
}
collectArtifactsFromMessage({ ...params, message, messageFallbackSeq, artifacts });
}
return artifacts;
}
function collectArtifactsFromMessage(params: {
message: unknown;
messageFallbackSeq: number;
artifacts: ArtifactRecord[];
sessionKey: string;
runId?: string;
taskId?: string;
}): void {
const msg = asRecord(params.message);
if (!msg) {
return;
}
const messageSeq = resolveMessageSeq(msg, params.messageFallbackSeq);
const messageRunId = resolveMessageRunId(msg);
const messageTaskId = resolveMessageTaskId(msg);
if (params.runId && messageRunId !== params.runId) {
return;
}
if (params.taskId && messageTaskId !== params.taskId) {
return;
}
const content = Array.isArray(msg.content) ? msg.content : [];
for (let contentIndex = 0; contentIndex < content.length; contentIndex += 1) {
const block = asRecord(content[contentIndex]);
if (!block || !isArtifactBlock(block)) {
continue;
}
const type = normalizeArtifactType(asNonEmptyString(block.type) ?? "file");
const title =
asNonEmptyString(block.title) ??
asNonEmptyString(block.fileName) ??
asNonEmptyString(block.filename) ??
asNonEmptyString(block.alt) ??
`${type} ${params.artifacts.length + 1}`;
const download = resolveBlockDownload(block);
const summary: ArtifactRecord = {
id: artifactId({
sessionKey: params.sessionKey,
messageSeq,
contentIndex,
title,
type,
}),
type,
title,
...(download.mimeType ? { mimeType: download.mimeType } : {}),
...(download.sizeBytes !== undefined ? { sizeBytes: download.sizeBytes } : {}),
sessionKey: params.sessionKey,
...(messageRunId ? { runId: messageRunId } : {}),
...(messageTaskId ? { taskId: messageTaskId } : {}),
messageSeq,
source: "session-transcript",
download: { mode: download.mode },
...(download.data ? { data: download.data } : {}),
...(download.url ? { url: download.url } : {}),
};
params.artifacts.push(summary);
}
}
function resolveQuerySessionKey(query: ArtifactQuery): string | undefined {
if (query.sessionKey) {
return query.sessionKey;
@@ -296,16 +307,23 @@ function loadArtifacts(query: ArtifactQuery): { artifacts: ArtifactRecord[]; ses
}
const { storePath, entry } = loadSessionEntry(sessionKey);
const sessionId = entry?.sessionId;
const messages =
sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : [];
return {
sessionKey,
artifacts: collectArtifactsFromMessages({
messages,
if (!sessionId || !storePath) {
return { sessionKey, artifacts: [] };
}
const artifacts: ArtifactRecord[] = [];
visitSessionMessages(sessionId, storePath, entry?.sessionFile, (message, seq) => {
collectArtifactsFromMessage({
message,
messageFallbackSeq: seq,
artifacts,
sessionKey,
runId: query.runId,
taskId: query.taskId,
}),
});
});
return {
sessionKey,
artifacts,
};
}

View File

@@ -1,9 +1,14 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import { StringDecoder } from "node:string_decoder";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { formatErrorMessage } from "../../infra/errors.js";
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
type AppendMessageArg = Parameters<SessionManager["appendMessage"]>[0];
const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024;
export type GatewayInjectedAbortMeta = {
aborted: true;
origin: "rpc" | "stop-command";
@@ -41,6 +46,77 @@ function resolveInjectedAssistantContent(params: {
return [{ type: "text", text: `${labelPrefix}${params.message}` }];
}
function transcriptHasParentLinkedEntries(transcriptPath: string): boolean {
let fd: number | null = null;
try {
fd = fs.openSync(transcriptPath, "r");
const decoder = new StringDecoder("utf8");
const buffer = Buffer.allocUnsafe(64 * 1024);
let carry = "";
while (true) {
const bytesRead = fs.readSync(fd, buffer, 0, buffer.length, null);
if (bytesRead <= 0) {
break;
}
const text = carry + decoder.write(buffer.subarray(0, bytesRead));
const lines = text.split(/\r?\n/);
carry = lines.pop() ?? "";
for (const line of lines) {
if (lineHasParentLinkedEntry(line)) {
return true;
}
}
}
return lineHasParentLinkedEntry(carry + decoder.end());
} catch {
return true;
} finally {
if (fd !== null) {
fs.closeSync(fd);
}
}
}
function lineHasParentLinkedEntry(line: string): boolean {
if (!line.trim()) {
return false;
}
try {
const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown };
return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed;
} catch {
return false;
}
}
function shouldUseRawAppend(transcriptPath: string): boolean {
try {
const stat = fs.statSync(transcriptPath);
return (
stat.size > SESSION_MANAGER_APPEND_MAX_BYTES &&
!transcriptHasParentLinkedEntries(transcriptPath)
);
} catch {
return false;
}
}
function appendRawAssistantMessageToTranscript(params: {
transcriptPath: string;
message: AppendMessageArg & Record<string, unknown>;
now: number;
}): { messageId: string } {
const messageId = randomUUID();
const entry = {
type: "message",
id: messageId,
timestamp: new Date(params.now).toISOString(),
message: params.message,
};
fs.appendFileSync(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8");
return { messageId };
}
export function appendInjectedAssistantMessageToTranscript(params: {
transcriptPath: string;
message: string;
@@ -100,6 +176,20 @@ export function appendInjectedAssistantMessageToTranscript(params: {
};
try {
if (shouldUseRawAppend(params.transcriptPath)) {
const { messageId } = appendRawAssistantMessageToTranscript({
transcriptPath: params.transcriptPath,
message: messageBody,
now,
});
emitSessionTranscriptUpdate({
sessionFile: params.transcriptPath,
message: messageBody,
messageId,
});
return { ok: true, messageId, message: messageBody };
}
// IMPORTANT: Use SessionManager so the entry is attached to the current leaf via parentId.
// Raw jsonl appends break the parent chain and can hide compaction summaries from context.
const sessionManager = SessionManager.open(params.transcriptPath);

View File

@@ -34,4 +34,43 @@ describe("gateway chat.inject transcript writes", () => {
fs.rmSync(dir, { recursive: true, force: true });
}
});
it("uses raw append for oversized append-only transcripts", async () => {
const { dir, transcriptPath } = createTranscriptFixtureSync({
prefix: "openclaw-chat-inject-large-",
sessionId: "sess-1",
});
try {
fs.appendFileSync(
transcriptPath,
`${JSON.stringify({
type: "message",
id: "legacy-large-message",
message: {
role: "assistant",
content: [{ type: "text", text: "x".repeat(9 * 1024 * 1024) }],
},
})}\n`,
"utf-8",
);
const appended = appendInjectedAssistantMessageToTranscript({
transcriptPath,
message: "hello",
});
expect(appended.ok).toBe(true);
expect(appended.messageId).toBeTruthy();
const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/).filter(Boolean);
const last = JSON.parse(lines.at(-1) as string) as Record<string, unknown>;
expect(last.type).toBe("message");
expect(last).toHaveProperty("id", appended.messageId);
expect(last).toHaveProperty("message");
expect(Object.prototype.hasOwnProperty.call(last, "parentId")).toBe(false);
} finally {
fs.rmSync(dir, { recursive: true, force: true });
}
});
});

View File

@@ -596,7 +596,8 @@ describe("gateway chat transcript writes (guardrail)", () => {
expect(chatSrc.includes("fs.appendFileSync(transcriptPath")).toBe(false);
expect(chatSrc).toContain("appendInjectedAssistantMessageToTranscript(");
expect(helperSrc.includes("fs.appendFileSync(params.transcriptPath")).toBe(false);
expect(helperSrc).toContain("function shouldUseRawAppend(");
expect(helperSrc).toContain("function appendRawAssistantMessageToTranscript(");
expect(helperSrc).toContain("SessionManager.open(params.transcriptPath)");
expect(helperSrc).toContain("appendMessage(messageBody)");
});

View File

@@ -77,6 +77,9 @@ import {
loadGatewaySessionRow,
loadSessionEntry,
migrateAndPruneGatewaySessionStoreKey,
readRecentSessionMessagesWithStats,
readRecentSessionTranscriptLines,
readSessionMessageCount,
readSessionPreviewItemsFromTranscript,
resolveDeletedAgentIdFromSessionKey,
resolveFreshestSessionEntryFromStoreKeys,
@@ -87,7 +90,6 @@ import {
type SessionsPatchResult,
type SessionsPreviewEntry,
type SessionsPreviewResult,
readSessionMessages,
} from "../session-utils.js";
import { applySessionsPatchToStore } from "../sessions-patch.js";
import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js";
@@ -569,7 +571,7 @@ async function handleSessionSend(params: {
interruptedActiveRun = interruptResult.interrupted;
}
const messageSeq = readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length + 1;
const messageSeq = readSessionMessageCount(entry.sessionId, storePath, entry.sessionFile) + 1;
let sendAcked = false;
let sendPayload: unknown;
let sendCached = false;
@@ -983,8 +985,11 @@ export const sessionsHandlers: GatewayRequestHandlers = {
let runError: unknown;
let runMeta: Record<string, unknown> | undefined;
const messageSeq = initialMessage
? readSessionMessages(createdEntry.sessionId, target.storePath, createdEntry.sessionFile)
.length + 1
? readSessionMessageCount(
createdEntry.sessionId,
target.storePath,
createdEntry.sessionFile,
) + 1
: undefined;
if (initialMessage) {
@@ -1690,8 +1695,15 @@ export const sessionsHandlers: GatewayRequestHandlers = {
respond(true, { messages: [] }, undefined);
return;
}
const allMessages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile);
const messages = limit < allMessages.length ? allMessages.slice(-limit) : allMessages;
const { messages } = readRecentSessionMessagesWithStats(
entry.sessionId,
storePath,
entry.sessionFile,
{
maxMessages: limit,
maxLines: limit * 20 + 20,
},
);
respond(true, { messages }, undefined);
},
"sessions.compact": async ({ req, params, respond, context, client, isWebchatConnect }) => {
@@ -1847,16 +1859,23 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const raw = fs.readFileSync(filePath, "utf-8");
const lines = raw.split(/\r?\n/).filter((l) => Boolean(normalizeOptionalString(l)));
if (lines.length <= maxLines) {
const tail = readRecentSessionTranscriptLines({
sessionId,
storePath,
sessionFile: entry?.sessionFile,
agentId: target.agentId,
maxLines,
});
const lines = tail?.lines ?? [];
const totalLines = tail?.totalLines ?? 0;
if (totalLines <= maxLines) {
respond(
true,
{
ok: true,
key: target.canonicalKey,
compacted: false,
kept: lines.length,
kept: totalLines,
},
undefined,
);
@@ -1864,8 +1883,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
const archived = archiveFileOnDisk(filePath, "bak");
const keptLines = lines.slice(-maxLines);
fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8");
fs.writeFileSync(filePath, `${lines.join("\n")}\n`, "utf-8");
await updateSessionStore(storePath, (store) => {
const entryKey = compactTarget.primaryKey;
@@ -1887,7 +1905,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
key: target.canonicalKey,
compacted: true,
archived,
kept: keptLines.length,
kept: lines.length,
},
undefined,
);

View File

@@ -11,7 +11,7 @@ import {
attachOpenClawTranscriptMeta,
loadGatewaySessionRow,
loadSessionEntry,
readSessionMessages,
readSessionMessageCount,
type GatewaySessionRow,
} from "./session-utils.js";
@@ -105,7 +105,7 @@ export function createTranscriptUpdateBroadcastHandler(params: {
}
const { entry, storePath } = loadSessionEntry(sessionKey);
const messageSeq = entry?.sessionId
? readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length
? readSessionMessageCount(entry.sessionId, storePath, entry.sessionFile)
: undefined;
const sessionSnapshot = buildGatewaySessionSnapshot({
sessionRow: loadGatewaySessionRow(sessionKey),

View File

@@ -77,6 +77,69 @@ describe("SessionHistorySseState", () => {
expect(snapshot.rawTranscriptSeq).toBe(2);
});
test("marks bounded tail snapshots as having older history", () => {
const snapshot = buildSessionHistorySnapshot({
rawMessages: [
{
role: "assistant",
content: [{ type: "text", text: "tail" }],
__openclaw: { seq: 99 },
},
],
limit: 1,
rawTranscriptSeq: 99,
totalRawMessages: 99,
});
expect(snapshot.history.hasMore).toBe(true);
expect(snapshot.history.nextCursor).toBe("99");
expect(snapshot.rawTranscriptSeq).toBe(99);
});
test("refreshes limited SSE history from bounded tail reads", () => {
const fullReadSpy = vi.spyOn(sessionUtils, "readSessionMessages").mockReturnValue([]);
const tailReadSpy = vi
.spyOn(sessionUtils, "readRecentSessionMessagesWithStats")
.mockReturnValueOnce({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "tail one" }],
__openclaw: { seq: 7 },
},
],
totalMessages: 7,
})
.mockReturnValueOnce({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "tail two" }],
__openclaw: { seq: 8 },
},
],
totalMessages: 8,
});
try {
const state = new SessionHistorySseState({
target: { sessionId: "sess-main" },
limit: 1,
});
expect(state.snapshot().messages[0]?.__openclaw?.seq).toBe(7);
const refreshed = state.refresh();
expect(refreshed.hasMore).toBe(true);
expect(refreshed.nextCursor).toBe("8");
expect(refreshed.messages[0]?.__openclaw?.seq).toBe(8);
expect(tailReadSpy).toHaveBeenCalledTimes(2);
expect(fullReadSpy).not.toHaveBeenCalled();
} finally {
fullReadSpy.mockRestore();
tailReadSpy.mockRestore();
}
});
test("strips legacy internal envelopes before exposing history", () => {
const snapshot = buildSessionHistorySnapshot({
rawMessages: [

View File

@@ -2,7 +2,11 @@ import {
DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
projectChatDisplayMessages,
} from "./chat-display-projection.js";
import { attachOpenClawTranscriptMeta, readSessionMessages } from "./session-utils.js";
import {
attachOpenClawTranscriptMeta,
readRecentSessionMessagesWithStats,
readSessionMessages,
} from "./session-utils.js";
type SessionHistoryTranscriptMeta = {
seq?: number;
@@ -30,6 +34,24 @@ type SessionHistoryTranscriptTarget = {
sessionFile?: string;
};
type SessionHistoryRawSnapshot = {
rawMessages: unknown[];
rawTranscriptSeq?: number;
totalRawMessages?: number;
};
export function resolveSessionHistoryTailReadOptions(limit: number): {
maxMessages: number;
maxLines: number;
} {
const requested = Math.max(1, Math.floor(limit));
const rawWindow = requested * 20 + 20;
return {
maxMessages: rawWindow,
maxLines: rawWindow,
};
}
function resolveCursorSeq(cursor: string | undefined): number | undefined {
if (!cursor) {
return undefined;
@@ -98,6 +120,8 @@ export function buildSessionHistorySnapshot(params: {
maxChars?: number;
limit?: number;
cursor?: string;
rawTranscriptSeq?: number;
totalRawMessages?: number;
}): SessionHistorySnapshot {
const visibleMessages = toSessionHistoryMessages(
projectChatDisplayMessages(params.rawMessages, {
@@ -105,10 +129,25 @@ export function buildSessionHistorySnapshot(params: {
}),
);
const history = paginateSessionMessages(visibleMessages, params.limit, params.cursor);
if (
!params.cursor &&
typeof params.totalRawMessages === "number" &&
params.totalRawMessages > params.rawMessages.length &&
history.messages.length > 0
) {
const firstSeq = resolveMessageSeq(history.messages[0]);
history.hasMore = true;
if (typeof firstSeq === "number") {
history.nextCursor = String(firstSeq);
}
}
const rawHistoryMessages = toSessionHistoryMessages(params.rawMessages);
return {
history,
rawTranscriptSeq: resolveMessageSeq(rawHistoryMessages.at(-1)) ?? rawHistoryMessages.length,
rawTranscriptSeq:
params.rawTranscriptSeq ??
resolveMessageSeq(rawHistoryMessages.at(-1)) ??
rawHistoryMessages.length,
};
}
@@ -123,6 +162,8 @@ export class SessionHistorySseState {
static fromRawSnapshot(params: {
target: SessionHistoryTranscriptTarget;
rawMessages: unknown[];
rawTranscriptSeq?: number;
totalRawMessages?: number;
maxChars?: number;
limit?: number;
cursor?: string;
@@ -133,6 +174,8 @@ export class SessionHistorySseState {
limit: params.limit,
cursor: params.cursor,
initialRawMessages: params.rawMessages,
rawTranscriptSeq: params.rawTranscriptSeq,
totalRawMessages: params.totalRawMessages,
});
}
@@ -142,17 +185,36 @@ export class SessionHistorySseState {
limit?: number;
cursor?: string;
initialRawMessages?: unknown[];
rawTranscriptSeq?: number;
totalRawMessages?: number;
}) {
this.target = params.target;
this.maxChars = params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS;
this.limit = params.limit;
this.cursor = params.cursor;
const rawMessages = params.initialRawMessages ?? this.readRawMessages();
const rawSnapshot =
params.initialRawMessages === undefined
? this.readRawSnapshot()
: {
rawMessages: params.initialRawMessages,
...(typeof params.rawTranscriptSeq === "number"
? { rawTranscriptSeq: params.rawTranscriptSeq }
: {}),
...(typeof params.totalRawMessages === "number"
? { totalRawMessages: params.totalRawMessages }
: {}),
};
const snapshot = buildSessionHistorySnapshot({
rawMessages,
rawMessages: rawSnapshot.rawMessages,
maxChars: this.maxChars,
limit: this.limit,
cursor: this.cursor,
...(typeof rawSnapshot.rawTranscriptSeq === "number"
? { rawTranscriptSeq: rawSnapshot.rawTranscriptSeq }
: {}),
...(typeof rawSnapshot.totalRawMessages === "number"
? { totalRawMessages: rawSnapshot.totalRawMessages }
: {}),
});
this.sentHistory = snapshot.history;
this.rawTranscriptSeq = snapshot.rawTranscriptSeq;
@@ -192,17 +254,43 @@ export class SessionHistorySseState {
}
refresh(): PaginatedSessionHistory {
const rawSnapshot = this.readRawSnapshot();
const snapshot = buildSessionHistorySnapshot({
rawMessages: this.readRawMessages(),
rawMessages: rawSnapshot.rawMessages,
maxChars: this.maxChars,
limit: this.limit,
cursor: this.cursor,
...(typeof rawSnapshot.rawTranscriptSeq === "number"
? { rawTranscriptSeq: rawSnapshot.rawTranscriptSeq }
: {}),
...(typeof rawSnapshot.totalRawMessages === "number"
? { totalRawMessages: rawSnapshot.totalRawMessages }
: {}),
});
this.rawTranscriptSeq = snapshot.rawTranscriptSeq;
this.sentHistory = snapshot.history;
return snapshot.history;
}
private readRawSnapshot(): SessionHistoryRawSnapshot {
if (this.cursor === undefined && typeof this.limit === "number") {
const snapshot = readRecentSessionMessagesWithStats(
this.target.sessionId,
this.target.storePath,
this.target.sessionFile,
resolveSessionHistoryTailReadOptions(this.limit),
);
return {
rawMessages: snapshot.messages,
rawTranscriptSeq: snapshot.totalMessages,
totalRawMessages: snapshot.totalMessages,
};
}
return {
rawMessages: this.readRawMessages(),
};
}
private readRawMessages(): unknown[] {
return readSessionMessages(
this.target.sessionId,

View File

@@ -10,6 +10,9 @@ import {
readLatestSessionUsageFromTranscript,
readRecentSessionUsageFromTranscript,
readRecentSessionMessages,
readRecentSessionMessagesWithStats,
readRecentSessionTranscriptLines,
readSessionMessageCount,
readSessionMessages,
readSessionTitleFieldsFromTranscript,
readSessionPreviewItemsFromTranscript,
@@ -563,6 +566,84 @@ describe("readSessionMessages", () => {
}
});
test("preserves real sequence metadata for bounded recent-message reads", () => {
const sessionId = "test-session-recent-seq";
writeTranscript(tmpDir, sessionId, [
{ type: "session", version: 1, id: sessionId },
{ message: { role: "user", content: "old" } },
{ message: { role: "assistant", content: "middle" } },
{ message: { role: "user", content: "recent" } },
{ message: { role: "assistant", content: "latest" } },
]);
const result = readRecentSessionMessagesWithStats(sessionId, storePath, undefined, {
maxMessages: 2,
maxBytes: 256,
});
expect(result.totalMessages).toBe(4);
expect(result.messages).toEqual([
expect.objectContaining({
content: "recent",
__openclaw: expect.objectContaining({ seq: 3 }),
}),
expect.objectContaining({
content: "latest",
__openclaw: expect.objectContaining({ seq: 4 }),
}),
]);
});
test("counts transcript messages without loading the whole file", () => {
const sessionId = "test-session-count-large";
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
const lines = [
JSON.stringify({ type: "session", version: 1, id: sessionId }),
...Array.from({ length: 2500 }, (_, index) =>
JSON.stringify({ message: { role: "user", content: `message ${index}` } }),
),
];
fs.writeFileSync(transcriptPath, lines.join("\n"), "utf-8");
const readFileSpy = vi.spyOn(fs, "readFileSync");
try {
expect(readSessionMessageCount(sessionId, storePath)).toBe(2500);
expect(readFileSpy).not.toHaveBeenCalled();
} finally {
readFileSpy.mockRestore();
}
});
test("tails transcript lines for manual compaction without loading the whole file", () => {
const sessionId = "test-session-line-tail";
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
const lines = [
JSON.stringify({ type: "session", version: 1, id: sessionId }),
...Array.from({ length: 10 }, (_, index) =>
JSON.stringify({ message: { role: "user", content: `message ${index}` } }),
),
];
fs.writeFileSync(transcriptPath, `${lines.join("\n")}\n`, "utf-8");
const readFileSpy = vi.spyOn(fs, "readFileSync");
try {
const result = readRecentSessionTranscriptLines({
sessionId,
storePath,
maxLines: 3,
});
expect(result?.totalLines).toBe(11);
expect(result?.lines.map((line) => JSON.parse(line).message?.content)).toEqual([
"message 7",
"message 8",
"message 9",
]);
expect(readFileSpy).not.toHaveBeenCalled();
} finally {
readFileSpy.mockRestore();
}
});
test("reads only the active branch when transcript rewrites abandon older entries", () => {
const sessionId = "test-session-active-branch";
const sessionFile = path.join(tmpDir, `${sessionId}.jsonl`);

View File

@@ -1,4 +1,5 @@
import fs from "node:fs";
import { StringDecoder } from "node:string_decoder";
import { SessionManager, type SessionEntry } from "@mariozechner/pi-coding-agent";
import { deriveSessionTotalTokens, hasNonzeroUsage, normalizeUsage } from "../agents/usage.js";
import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js";
@@ -174,6 +175,11 @@ export type ReadRecentSessionMessagesOptions = {
maxLines?: number;
};
export type ReadRecentSessionMessagesResult = {
messages: unknown[];
totalMessages: number;
};
const RECENT_SESSION_MESSAGES_DEFAULT_MAX_BYTES = 8 * 1024 * 1024;
export function readRecentSessionMessages(
@@ -247,6 +253,189 @@ export function readRecentSessionMessages(
);
}
function visitTranscriptLines(filePath: string, visit: (line: string) => void): void {
const fd = fs.openSync(filePath, "r");
try {
const decoder = new StringDecoder("utf8");
const buffer = Buffer.allocUnsafe(64 * 1024);
let carry = "";
while (true) {
const bytesRead = fs.readSync(fd, buffer, 0, buffer.length, null);
if (bytesRead <= 0) {
break;
}
const text = carry + decoder.write(buffer.subarray(0, bytesRead));
const lines = text.split(/\r?\n/);
carry = lines.pop() ?? "";
for (const line of lines) {
visit(line);
}
}
const tail = carry + decoder.end();
if (tail) {
visit(tail);
}
} finally {
fs.closeSync(fd);
}
}
function transcriptHasTreeEntries(filePath: string): boolean {
let hasTreeEntries = false;
try {
visitTranscriptLines(filePath, (line) => {
if (!hasTreeEntries && hasSessionTreeEntry(line)) {
hasTreeEntries = true;
}
});
} catch {
return false;
}
return hasTreeEntries;
}
function visitSessionManagerBranchMessages(
filePath: string,
visit: (message: unknown, seq: number) => void,
): number {
const branchEntries = SessionManager.open(filePath).getBranch();
let messageSeq = 0;
for (const entry of branchEntries) {
if (entry.type === "message" && entry.message) {
messageSeq += 1;
visit(
attachOpenClawTranscriptMeta(entry.message, {
...(typeof entry.id === "string" ? { id: entry.id } : {}),
seq: messageSeq,
}),
messageSeq,
);
continue;
}
if (entry.type === "compaction") {
const ts = typeof entry.timestamp === "string" ? Date.parse(entry.timestamp) : Number.NaN;
const timestamp = Number.isFinite(ts) ? ts : Date.now();
messageSeq += 1;
visit(
{
role: "system",
content: [{ type: "text", text: "Compaction" }],
timestamp,
__openclaw: {
kind: "compaction",
id: typeof entry.id === "string" ? entry.id : undefined,
seq: messageSeq,
},
},
messageSeq,
);
}
}
return messageSeq;
}
export function visitSessionMessages(
sessionId: string,
storePath: string | undefined,
sessionFile: string | undefined,
visit: (message: unknown, seq: number) => void,
): number {
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile);
if (!filePath) {
return 0;
}
if (transcriptHasTreeEntries(filePath)) {
try {
return visitSessionManagerBranchMessages(filePath, visit);
} catch {
return 0;
}
}
let messageSeq = 0;
try {
visitTranscriptLines(filePath, (line) => {
if (!line.trim()) {
return;
}
try {
const parsed = JSON.parse(line);
const message = parsedSessionEntryToMessage(parsed, messageSeq + 1);
if (message) {
messageSeq += 1;
visit(message, messageSeq);
}
} catch {
// ignore bad lines
}
});
} catch {
return 0;
}
return messageSeq;
}
export function readSessionMessageCount(
sessionId: string,
storePath: string | undefined,
sessionFile?: string,
): number {
return visitSessionMessages(sessionId, storePath, sessionFile, () => undefined);
}
export function readRecentSessionMessagesWithStats(
sessionId: string,
storePath: string | undefined,
sessionFile: string | undefined,
opts: ReadRecentSessionMessagesOptions,
): ReadRecentSessionMessagesResult {
const totalMessages = readSessionMessageCount(sessionId, storePath, sessionFile);
const messages = readRecentSessionMessages(sessionId, storePath, sessionFile, opts);
const firstSeq = Math.max(1, totalMessages - messages.length + 1);
const messagesWithSeq = messages.map((message, index) =>
attachOpenClawTranscriptMeta(message, { seq: firstSeq + index }),
);
return { messages: messagesWithSeq, totalMessages };
}
export function readRecentSessionTranscriptLines(params: {
sessionId: string;
storePath: string | undefined;
sessionFile?: string;
agentId?: string;
maxLines: number;
}): { lines: string[]; totalLines: number } | null {
const filePath = findExistingTranscriptPath(
params.sessionId,
params.storePath,
params.sessionFile,
params.agentId,
);
if (!filePath) {
return null;
}
const maxLines = Math.max(1, Math.floor(params.maxLines));
const lines: string[] = [];
let totalLines = 0;
try {
visitTranscriptLines(filePath, (line) => {
if (!line.trim()) {
return;
}
totalLines += 1;
lines.push(line);
if (lines.length > maxLines) {
lines.shift();
}
});
} catch {
return null;
}
return { lines, totalLines };
}
function hasSessionTreeEntry(line: string): boolean {
if (!line.trim()) {
return false;

View File

@@ -106,11 +106,15 @@ export {
readFirstUserMessageFromTranscript,
readLastMessagePreviewFromTranscript,
readLatestSessionUsageFromTranscript,
readRecentSessionUsageFromTranscript,
readRecentSessionMessages,
readRecentSessionMessagesWithStats,
readRecentSessionTranscriptLines,
readRecentSessionUsageFromTranscript,
readSessionMessageCount,
readSessionTitleFieldsFromTranscript,
readSessionPreviewItemsFromTranscript,
readSessionMessages,
visitSessionMessages,
resolveSessionTranscriptCandidates,
} from "./session-utils.fs.js";
export { canonicalizeSpawnedByForAgent, resolveSessionStoreKey } from "./session-store-key.js";

View File

@@ -25,8 +25,13 @@ import {
} from "./http-utils.js";
import { authorizeOperatorScopesForMethod } from "./method-scopes.js";
import { DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS } from "./server-methods/chat.js";
import { buildSessionHistorySnapshot, SessionHistorySseState } from "./session-history-state.js";
import {
buildSessionHistorySnapshot,
resolveSessionHistoryTailReadOptions,
SessionHistorySseState,
} from "./session-history-state.js";
import {
readRecentSessionMessagesWithStats,
readSessionMessages,
resolveFreshestSessionEntryFromStoreKeys,
resolveGatewaySessionStoreTarget,
@@ -149,17 +154,29 @@ export async function handleSessionHistoryHttpRequest(
typeof cfg.gateway?.webchat?.chatHistoryMaxChars === "number"
? cfg.gateway.webchat.chatHistoryMaxChars
: DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS;
// Read the transcript once and derive both sanitized and raw views from the
// same snapshot, eliminating the theoretical race window where a concurrent
// write between two separate reads could cause seq/content divergence.
const rawSnapshot = entry?.sessionId
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
: [];
const boundedSnapshot =
cursor === undefined && typeof limit === "number"
? readRecentSessionMessagesWithStats(
entry.sessionId,
target.storePath,
entry.sessionFile,
resolveSessionHistoryTailReadOptions(limit),
)
: undefined;
// Cursor reads still need an arbitrary historical window. The common first
// page path is bounded above so `limit=1` cannot materialize huge transcripts.
const rawSnapshot =
boundedSnapshot?.messages ??
(entry?.sessionId
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
: []);
const historySnapshot = buildSessionHistorySnapshot({
rawMessages: rawSnapshot,
maxChars: effectiveMaxChars,
limit,
cursor,
rawTranscriptSeq: boundedSnapshot?.totalMessages,
totalRawMessages: boundedSnapshot?.totalMessages,
});
const history = historySnapshot.history;
@@ -192,6 +209,8 @@ export async function handleSessionHistoryHttpRequest(
sessionFile: entry.sessionFile,
},
rawMessages: rawSnapshot,
rawTranscriptSeq: boundedSnapshot?.totalMessages,
totalRawMessages: boundedSnapshot?.totalMessages,
maxChars: effectiveMaxChars,
limit,
cursor,