fix(gateway): async session transcript IO (#75875)

* fix(gateway): async session transcript IO

* fix(plugins): restore jiti loader cache helper

* test(gateway): mock async artifact transcript reads

* chore(plugins): drop obsolete jiti loader shim
This commit is contained in:
Peter Steinberger
2026-05-02 02:06:38 +01:00
committed by GitHub
parent 8d7f4d28ce
commit 6147e1b91d
37 changed files with 1890 additions and 338 deletions

View File

@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Gateway/sessions: move hot transcript reads and mirror appends onto async bounded IO with serialized parent-linked writes, keeping large session histories from stalling Gateway requests and channel replies. Fixes #75656. Thanks @DerFlash.
- Doctor/WhatsApp: warn when Linux crontabs still run the legacy `ensure-whatsapp.sh` health check, which can misreport `Gateway inactive` when cron lacks the systemd user-bus environment. Fixes #60204. Thanks @mySebbe.
- Slack/setup: print the generated app manifest as plain JSON instead of embedding it inside the framed setup note, so it can be copied into Slack without deleting border characters. Fixes #65751. Thanks @theDanielJLewis.
- Channels/WhatsApp: route CLI logout through the live Gateway and stop runtime-backed listeners before channel removal, so removing a WhatsApp account does not leave the old socket replying until restart. Fixes #67746. Thanks @123Mismail.

View File

@@ -29,6 +29,12 @@ async function createTempSessionFile() {
return path.join(dir, "session.jsonl");
}
async function makeRoot(prefix: string): Promise<string> {
const root = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
tempDirs.push(root);
return root;
}
describe("mirrorCodexAppServerTranscript", () => {
it("mirrors user and assistant messages into the Pi transcript", async () => {
const sessionFile = await createTempSessionFile();
@@ -58,6 +64,27 @@ describe("mirrorCodexAppServerTranscript", () => {
expect(raw).toContain('"idempotencyKey":"scope-1:assistant:1"');
});
it("creates the transcript directory on first mirror", async () => {
const root = await makeRoot("openclaw-codex-transcript-missing-dir-");
const sessionFile = path.join(root, "nested", "sessions", "session.jsonl");
await mirrorCodexAppServerTranscript({
sessionFile,
sessionKey: "session-1",
messages: [
makeAgentAssistantMessage({
content: [{ type: "text", text: "first mirror" }],
timestamp: Date.now(),
}),
],
idempotencyScope: "scope-1",
});
const raw = await fs.readFile(sessionFile, "utf8");
expect(raw).toContain('"role":"assistant"');
expect(raw).toContain('"content":[{"type":"text","text":"first mirror"}]');
});
it("deduplicates app-server turn mirrors by idempotency scope", async () => {
const sessionFile = await createTempSessionFile();
const messages = [
@@ -183,4 +210,56 @@ describe("mirrorCodexAppServerTranscript", () => {
await expect(fs.readFile(sessionFile, "utf8")).rejects.toMatchObject({ code: "ENOENT" });
});
it("migrates small linear transcripts before mirroring", async () => {
const sessionFile = await createTempSessionFile();
await fs.writeFile(
sessionFile,
[
JSON.stringify({
type: "session",
version: 3,
id: "linear-codex-session",
timestamp: new Date().toISOString(),
cwd: process.cwd(),
}),
JSON.stringify({
type: "message",
id: "legacy-user",
timestamp: new Date().toISOString(),
message: { role: "user", content: "legacy user" },
}),
].join("\n") + "\n",
"utf8",
);
await mirrorCodexAppServerTranscript({
sessionFile,
sessionKey: "session-1",
messages: [
makeAgentAssistantMessage({
content: [{ type: "text", text: "mirrored assistant" }],
timestamp: Date.now(),
}),
],
idempotencyScope: "scope-1",
});
const records = (await fs.readFile(sessionFile, "utf8"))
.trim()
.split("\n")
.map(
(line) =>
JSON.parse(line) as {
type?: string;
id?: string;
parentId?: string | null;
message?: { role?: string };
},
)
.filter((record) => record.type === "message");
expect(records[0]).toMatchObject({ id: "legacy-user", parentId: null });
expect(records[1]).toMatchObject({ parentId: "legacy-user" });
});
});

View File

@@ -1,6 +1,8 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { StringDecoder } from "node:string_decoder";
import { CURRENT_SESSION_VERSION, type SessionManager } from "@mariozechner/pi-coding-agent";
import {
acquireSessionWriteLock,
emitSessionTranscriptUpdate,
@@ -8,6 +10,15 @@ import {
type AgentMessage,
} from "openclaw/plugin-sdk/agent-harness-runtime";
const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024;
const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024;
type TranscriptLeafInfo = {
leafId?: string;
hasParentLinkedEntries: boolean;
nonSessionEntryCount: number;
};
export async function mirrorCodexAppServerTranscript(params: {
sessionFile: string;
sessionKey?: string;
@@ -29,7 +40,6 @@ export async function mirrorCodexAppServerTranscript(params: {
});
try {
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
const sessionManager = SessionManager.open(params.sessionFile);
for (const [index, message] of messages.entries()) {
const idempotencyKey = params.idempotencyScope
? `${params.idempotencyScope}:${message.role}:${index}`
@@ -55,7 +65,10 @@ export async function mirrorCodexAppServerTranscript(params: {
idempotencyKey,
}
: nextMessage) as unknown as Parameters<SessionManager["appendMessage"]>[0];
sessionManager.appendMessage(messageToAppend);
await appendCodexAppServerTranscriptMessage({
transcriptPath: params.sessionFile,
message: messageToAppend,
});
if (idempotencyKey) {
existingIdempotencyKeys.add(idempotencyKey);
}
@@ -71,6 +84,202 @@ export async function mirrorCodexAppServerTranscript(params: {
}
}
async function appendCodexAppServerTranscriptMessage(params: {
transcriptPath: string;
message: unknown;
}): Promise<void> {
await ensureTranscriptHeader(params.transcriptPath);
const stat = await fs.stat(params.transcriptPath).catch(() => null);
let leafInfo: TranscriptLeafInfo = await readTranscriptLeafInfo(params.transcriptPath).catch(
() => ({
hasParentLinkedEntries: false,
nonSessionEntryCount: 0,
}),
);
const hasLinearEntries = !leafInfo.hasParentLinkedEntries && leafInfo.nonSessionEntryCount > 0;
const shouldRawAppend = hasLinearEntries && (stat?.size ?? 0) > SESSION_MANAGER_APPEND_MAX_BYTES;
if (hasLinearEntries && !shouldRawAppend) {
const migrated = await migrateLinearTranscriptToParentLinked(params.transcriptPath);
leafInfo = {
...(migrated.leafId ? { leafId: migrated.leafId } : {}),
hasParentLinkedEntries: Boolean(migrated.leafId),
nonSessionEntryCount: leafInfo.nonSessionEntryCount,
};
}
const entry = {
type: "message",
id: randomUUID(),
...(shouldRawAppend ? {} : { parentId: leafInfo.leafId ?? null }),
timestamp: new Date().toISOString(),
message: params.message,
};
await fs.appendFile(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8");
}
async function ensureTranscriptHeader(transcriptPath: string): Promise<void> {
const stat = await fs.stat(transcriptPath).catch(() => null);
if (stat?.isFile() && stat.size > 0) {
return;
}
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: randomUUID(),
timestamp: new Date().toISOString(),
cwd: process.cwd(),
};
await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, {
encoding: "utf-8",
mode: 0o600,
flag: stat?.isFile() ? "w" : "wx",
});
}
async function readTranscriptLeafInfo(transcriptPath: string): Promise<TranscriptLeafInfo> {
const handle = await fs.open(transcriptPath, "r");
try {
const decoder = new StringDecoder("utf8");
const buffer = Buffer.allocUnsafe(TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES);
let carry = "";
let leafId: string | undefined;
let hasParentLinkedEntries = false;
let nonSessionEntryCount = 0;
while (true) {
const { bytesRead } = await handle.read(buffer, 0, buffer.length, null);
if (bytesRead <= 0) {
break;
}
const text = carry + decoder.write(buffer.subarray(0, bytesRead));
const lines = text.split(/\r?\n/);
carry = lines.pop() ?? "";
for (const line of lines) {
if (lineHasNonSessionEntry(line)) {
nonSessionEntryCount += 1;
}
const id = lineParentLinkedEntryId(line);
if (id) {
leafId = id;
hasParentLinkedEntries = true;
}
}
await yieldTranscriptAppendScan();
}
const tail = carry + decoder.end();
if (lineHasNonSessionEntry(tail)) {
nonSessionEntryCount += 1;
}
const id = lineParentLinkedEntryId(tail);
if (id) {
leafId = id;
hasParentLinkedEntries = true;
}
return {
...(leafId ? { leafId } : {}),
hasParentLinkedEntries,
nonSessionEntryCount,
};
} finally {
await handle.close();
}
}
async function migrateLinearTranscriptToParentLinked(transcriptPath: string): Promise<{
leafId?: string;
}> {
const raw = await fs.readFile(transcriptPath, "utf-8");
const existingIds = new Set<string>();
const output: string[] = [];
let previousId: string | null = null;
let leafId: string | undefined;
for (const line of raw.split(/\r?\n/)) {
if (!line.trim()) {
continue;
}
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
output.push(line);
continue;
}
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
output.push(line);
continue;
}
const record = parsed as Record<string, unknown>;
if (record.type === "session") {
output.push(JSON.stringify({ ...record, version: CURRENT_SESSION_VERSION }));
continue;
}
const id = normalizeEntryId(record.id) ?? generateEntryId(existingIds);
existingIds.add(id);
record.id = id;
if (!Object.hasOwn(record, "parentId")) {
record.parentId = previousId;
}
previousId = id;
leafId = id;
output.push(JSON.stringify(record));
}
await fs.writeFile(transcriptPath, `${output.join("\n")}\n`, {
encoding: "utf-8",
mode: 0o600,
});
const result: { leafId?: string } = {};
if (leafId) {
result.leafId = leafId;
}
return result;
}
function normalizeEntryId(value: unknown): string | undefined {
return typeof value === "string" && value.trim().length > 0 ? value : undefined;
}
function generateEntryId(existingIds: Set<string>): string {
for (let attempt = 0; attempt < 100; attempt += 1) {
const id = randomUUID().slice(0, 8);
if (!existingIds.has(id)) {
existingIds.add(id);
return id;
}
}
const id = randomUUID();
existingIds.add(id);
return id;
}
function lineHasNonSessionEntry(line: string): boolean {
if (!line.trim()) {
return false;
}
try {
const parsed = JSON.parse(line) as { type?: unknown };
return parsed.type !== "session";
} catch {
return false;
}
}
function lineParentLinkedEntryId(line: string): string | undefined {
if (!line.trim()) {
return undefined;
}
try {
const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown };
return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed
? parsed.id
: undefined;
} catch {
return undefined;
}
}
async function yieldTranscriptAppendScan(): Promise<void> {
await new Promise<void>((resolve) => setImmediate(resolve));
}
async function readTranscriptIdempotencyKeys(sessionFile: string): Promise<Set<string>> {
const keys = new Set<string>();
let raw: string;

View File

@@ -7,7 +7,7 @@ import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
import { type SessionEntry, loadSessionStore, updateSessionStore } from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { readSessionMessages } from "../gateway/session-utils.fs.js";
import { readSessionMessagesAsync } from "../gateway/session-utils.fs.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { CommandLane } from "../process/lanes.js";
import { isAcpSessionKey, isCronSessionKey, isSubagentSessionKey } from "../routing/session-key.js";
@@ -226,7 +226,11 @@ async function recoverStore(params: {
let messages: unknown[];
try {
messages = readSessionMessages(entry.sessionId, params.storePath, entry.sessionFile);
messages = await readSessionMessagesAsync(
entry.sessionId,
params.storePath,
entry.sessionFile,
);
} catch (err) {
log.warn(`failed to read transcript for ${sessionKey}: ${String(err)}`);
result.failed++;

View File

@@ -29,7 +29,7 @@ vi.mock("../gateway/call.js", () => ({
}));
vi.mock("../gateway/session-utils.fs.js", () => ({
readSessionMessages: vi.fn(() => []),
readSessionMessagesAsync: vi.fn(async () => []),
}));
vi.mock("./subagent-announce-delivery.js", () => ({
@@ -465,7 +465,7 @@ describe("subagent-orphan-recovery", () => {
it("includes last human message in resume when available", async () => {
mockSingleAbortedSession({ sessionFile: "session-abc.jsonl" });
vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
vi.mocked(sessionUtils.readSessionMessagesAsync).mockResolvedValue([
{ role: "user", content: [{ type: "text", text: "Please build feature Y" }] },
{ role: "assistant", content: [{ type: "text", text: "Working on it..." }] },
{ role: "user", content: [{ type: "text", text: "Also add tests for it" }] },
@@ -484,7 +484,7 @@ describe("subagent-orphan-recovery", () => {
it("adds config change hint when assistant messages reference config modifications", async () => {
mockSingleAbortedSession();
vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
vi.mocked(sessionUtils.readSessionMessagesAsync).mockResolvedValue([
{ role: "user", content: "Update the config" },
{ role: "assistant", content: "I've modified openclaw.json to add the new setting." },
]);

View File

@@ -19,7 +19,7 @@ import {
type SessionEntry,
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { readSessionMessages } from "../gateway/session-utils.fs.js";
import { readSessionMessagesAsync } from "../gateway/session-utils.fs.js";
import { formatErrorMessage } from "../infra/errors.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { buildAnnounceIdempotencyKey } from "./announce-idempotency.js";
@@ -350,7 +350,11 @@ export async function recoverOrphanedSubagentSessions(params: {
log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`);
const messages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile);
const messages = await readSessionMessagesAsync(
entry.sessionId,
storePath,
entry.sessionFile,
);
const lastHumanMessage = [...messages]
.toReversed()
.find((msg) => (msg as { role?: unknown } | null)?.role === "user");

View File

@@ -14,10 +14,10 @@ export {
} from "../../gateway/server-methods/chat.js";
export { capArrayByJsonBytes } from "../../gateway/session-utils.fs.js";
export {
listSessionsFromStore,
listSessionsFromStoreAsync,
loadCombinedSessionStoreForGateway,
loadSessionEntry,
readSessionMessages,
readSessionMessagesAsync,
resolveSessionModelRef,
} from "../../gateway/session-utils.js";
export { resolveSessionKeyFromResolveParams } from "../../gateway/sessions-resolve.js";

View File

@@ -11,7 +11,7 @@ const runtime = vi.hoisted(() => ({
entry: { sessionId: "sess-main" },
})),
resolveSessionModelRef: vi.fn(() => ({ provider: "openai" })),
readSessionMessages: vi.fn((): unknown[] => []),
readSessionMessagesAsync: vi.fn(async (): Promise<unknown[]> => []),
augmentChatHistoryWithCliSessionImports: vi.fn(
({ localMessages }: { localMessages?: unknown[] }) => localMessages ?? [],
),
@@ -34,7 +34,7 @@ describe("embedded gateway stub", () => {
runtime.getRuntimeConfig.mockClear();
runtime.resolveSessionKeyFromResolveParams.mockReset();
runtime.projectRecentChatDisplayMessages.mockClear();
runtime.readSessionMessages.mockClear();
runtime.readSessionMessagesAsync.mockClear();
});
it("resolves sessions through the gateway session resolver", async () => {
@@ -78,7 +78,7 @@ describe("embedded gateway stub", () => {
{ role: "assistant", content: "hi" },
];
const projectedMessages = [{ role: "assistant", content: "hi" }];
runtime.readSessionMessages.mockReturnValueOnce(rawMessages);
runtime.readSessionMessagesAsync.mockResolvedValueOnce(rawMessages);
runtime.projectRecentChatDisplayMessages.mockReturnValueOnce(projectedMessages);
const callGateway = createEmbeddedCallGateway();
@@ -99,7 +99,7 @@ describe("embedded gateway stub", () => {
{ role: "user", content: "visible older" },
{ role: "assistant", content: "hidden newer" },
];
runtime.readSessionMessages.mockReturnValueOnce(rawMessages);
runtime.readSessionMessagesAsync.mockResolvedValueOnce(rawMessages);
const callGateway = createEmbeddedCallGateway();
await callGateway<{ messages: unknown[] }>({

View File

@@ -30,12 +30,12 @@ interface EmbeddedGatewayRuntime {
opts?: { maxChars?: number; maxMessages?: number },
) => unknown[];
capArrayByJsonBytes: (items: unknown[], maxBytes: number) => { items: unknown[] };
listSessionsFromStore: (opts: {
listSessionsFromStoreAsync: (opts: {
cfg: OpenClawConfig;
storePath: string;
store: unknown;
opts: SessionsListParams;
}) => SessionsListResult;
}) => Promise<SessionsListResult>;
loadCombinedSessionStoreForGateway: (cfg: OpenClawConfig) => {
storePath: string;
store: unknown;
@@ -49,7 +49,11 @@ interface EmbeddedGatewayRuntime {
storePath: string | undefined;
entry: Record<string, unknown> | undefined;
};
readSessionMessages: (sessionId: string, storePath: string, sessionFile?: string) => unknown[];
readSessionMessagesAsync: (
sessionId: string,
storePath: string,
sessionFile?: string,
) => Promise<unknown[]>;
resolveSessionModelRef: (
cfg: OpenClawConfig,
entry: unknown,
@@ -70,7 +74,7 @@ async function handleSessionsList(params: Record<string, unknown>) {
const rt = await getRuntime();
const cfg = rt.getRuntimeConfig();
const { storePath, store } = rt.loadCombinedSessionStoreForGateway(cfg);
return rt.listSessionsFromStore({
return rt.listSessionsFromStoreAsync({
cfg,
storePath,
store,
@@ -111,7 +115,11 @@ async function handleChatHistory(params: Record<string, unknown>): Promise<{
const localMessages =
sessionId && storePath
? rt.readSessionMessages(sessionId, storePath, entry?.sessionFile as string | undefined)
? await rt.readSessionMessagesAsync(
sessionId,
storePath,
entry?.sessionFile as string | undefined,
)
: [];
const rawMessages = rt.augmentChatHistoryWithCliSessionImports({

View File

@@ -8,7 +8,6 @@ import {
} from "../../config/sessions.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { callGateway } from "../../gateway/call.js";
import { readSessionTitleFieldsFromTranscript } from "../../gateway/session-utils.fs.js";
import { deriveSessionTitle } from "../../gateway/session-utils.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { normalizeOptionalLowercaseString, readStringValue } from "../../shared/string-coerce.js";
@@ -119,6 +118,8 @@ export function createSessionsListTool(opts?: {
label,
agentId,
search,
includeDerivedTitles,
includeLastMessage,
includeGlobal: !restrictToSpawned,
includeUnknown: !restrictToSpawned,
spawnedBy: restrictToSpawned ? effectiveRequesterKey : undefined,
@@ -309,31 +310,17 @@ export function createSessionsListTool(opts?: {
lastAccountId,
transcriptPath,
};
if (sessionId && (includeDerivedTitles || includeLastMessage)) {
const fields = readSessionTitleFieldsFromTranscript(
sessionId,
storePath,
sessionFile,
resolvedAgentId,
if (sessionId && includeDerivedTitles && !row.derivedTitle) {
row.derivedTitle = deriveSessionTitle(
{
sessionId,
displayName: row.displayName,
label: row.label,
subject: readStringValue((entry as { subject?: unknown }).subject),
updatedAt: typeof row.updatedAt === "number" ? row.updatedAt : 0,
},
undefined,
);
if (includeDerivedTitles && !row.derivedTitle) {
const derivedTitle = deriveSessionTitle(
{
sessionId,
displayName: row.displayName,
label: row.label,
subject: readStringValue((entry as { subject?: unknown }).subject),
updatedAt: typeof row.updatedAt === "number" ? row.updatedAt : 0,
},
fields.firstUserMessage,
);
if (derivedTitle) {
row.derivedTitle = derivedTitle;
}
}
if (includeLastMessage && !row.lastMessagePreview && fields.lastMessagePreview) {
row.lastMessagePreview = fields.lastMessagePreview;
}
}
if (messageLimit > 0) {
const resolvedKey = resolveInternalSessionKey({

View File

@@ -21,7 +21,7 @@ import {
updateSessionStoreEntry,
} from "../../config/sessions.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { readSessionMessages } from "../../gateway/session-utils.fs.js";
import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { resolveMemoryFlushPlan } from "../../plugins/memory-state.js";
@@ -341,21 +341,21 @@ async function readLastNonzeroUsageFromSessionLog(logPath: string) {
}
}
function estimatePromptTokensFromSessionTranscript(params: {
async function estimatePromptTokensFromSessionTranscript(params: {
sessionId?: string;
storePath?: string;
sessionFile?: string;
}): number | undefined {
}): Promise<number | undefined> {
const sessionId = normalizeOptionalString(params.sessionId);
if (!sessionId) {
return undefined;
}
try {
const messages = readSessionMessages(
const messages = (await readSessionMessagesAsync(
sessionId,
params.storePath,
params.sessionFile,
) as AgentMessage[];
)) as AgentMessage[];
if (messages.length === 0) {
return undefined;
}
@@ -444,7 +444,7 @@ export async function runPreflightCompactionIfNeeded(params: {
const transcriptPromptTokens =
typeof freshPersistedTokens === "number"
? undefined
: estimatePromptTokensFromSessionTranscript({
: await estimatePromptTokensFromSessionTranscript({
sessionId: entry.sessionId,
storePath: params.storePath,
sessionFile: entry.sessionFile ?? params.followupRun.run.sessionFile,

View File

@@ -63,7 +63,7 @@ describe("resolveParentForkTokenCountRuntime", () => {
totalTokensFresh: false,
};
const tokens = resolveParentForkTokenCountRuntime({
const tokens = await resolveParentForkTokenCountRuntime({
parentEntry: entry,
storePath: path.join(root, "sessions.json"),
});

View File

@@ -6,7 +6,7 @@ import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding
import { estimateMessagesTokens } from "../../agents/compaction.js";
import { resolveSessionFilePath } from "../../config/sessions/paths.js";
import { resolveFreshSessionTotalTokens, type SessionEntry } from "../../config/sessions/types.js";
import { readSessionMessages } from "../../gateway/session-utils.fs.js";
import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js";
function resolvePositiveTokenCount(value: number | undefined): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value > 0
@@ -14,21 +14,21 @@ function resolvePositiveTokenCount(value: number | undefined): number | undefine
: undefined;
}
export function resolveParentForkTokenCountRuntime(params: {
export async function resolveParentForkTokenCountRuntime(params: {
parentEntry: SessionEntry;
storePath: string;
}): number | undefined {
}): Promise<number | undefined> {
const freshPersistedTokens = resolveFreshSessionTotalTokens(params.parentEntry);
if (typeof freshPersistedTokens === "number") {
return freshPersistedTokens;
}
try {
const transcriptMessages = readSessionMessages(
const transcriptMessages = (await readSessionMessagesAsync(
params.parentEntry.sessionId,
params.storePath,
params.parentEntry.sessionFile,
) as AgentMessage[];
)) as AgentMessage[];
if (transcriptMessages.length > 0) {
const estimatedTokens = estimateMessagesTokens(transcriptMessages);
const transcriptTokens = resolvePositiveTokenCount(

View File

@@ -0,0 +1,229 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import { StringDecoder } from "node:string_decoder";
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
import { acquireSessionWriteLock } from "../../agents/session-write-lock.js";
const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024;
const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024;
type TranscriptLeafInfo = {
leafId?: string;
hasParentLinkedEntries: boolean;
nonSessionEntryCount: number;
};
async function yieldTranscriptAppendScan(): Promise<void> {
await new Promise<void>((resolve) => setImmediate(resolve));
}
function lineParentLinkedEntryId(line: string): string | undefined {
if (!line.trim()) {
return undefined;
}
try {
const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown };
return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed
? parsed.id
: undefined;
} catch {
return undefined;
}
}
function normalizeEntryId(value: unknown): string | undefined {
return typeof value === "string" && value.trim().length > 0 ? value : undefined;
}
function generateEntryId(existingIds: Set<string>): string {
for (let attempt = 0; attempt < 100; attempt += 1) {
const id = randomUUID().slice(0, 8);
if (!existingIds.has(id)) {
existingIds.add(id);
return id;
}
}
const id = randomUUID();
existingIds.add(id);
return id;
}
async function readTranscriptLeafInfo(transcriptPath: string): Promise<TranscriptLeafInfo> {
const handle = await fs.open(transcriptPath, "r");
try {
const decoder = new StringDecoder("utf8");
const buffer = Buffer.allocUnsafe(TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES);
let carry = "";
let leafId: string | undefined;
let hasParentLinkedEntries = false;
let nonSessionEntryCount = 0;
while (true) {
const { bytesRead } = await handle.read(buffer, 0, buffer.length, null);
if (bytesRead <= 0) {
break;
}
const text = carry + decoder.write(buffer.subarray(0, bytesRead));
const lines = text.split(/\r?\n/);
carry = lines.pop() ?? "";
for (const line of lines) {
if (lineHasNonSessionEntry(line)) {
nonSessionEntryCount += 1;
}
const id = lineParentLinkedEntryId(line);
if (id) {
leafId = id;
hasParentLinkedEntries = true;
}
}
await yieldTranscriptAppendScan();
}
const tail = carry + decoder.end();
if (lineHasNonSessionEntry(tail)) {
nonSessionEntryCount += 1;
}
const id = lineParentLinkedEntryId(tail);
if (id) {
leafId = id;
hasParentLinkedEntries = true;
}
return {
...(leafId ? { leafId } : {}),
hasParentLinkedEntries,
nonSessionEntryCount,
};
} finally {
await handle.close();
}
}
function lineHasNonSessionEntry(line: string): boolean {
if (!line.trim()) {
return false;
}
try {
const parsed = JSON.parse(line) as { type?: unknown };
return parsed.type !== "session";
} catch {
return false;
}
}
async function migrateLinearTranscriptToParentLinked(transcriptPath: string): Promise<{
leafId?: string;
}> {
const raw = await fs.readFile(transcriptPath, "utf-8");
const existingIds = new Set<string>();
const output: string[] = [];
let previousId: string | null = null;
let leafId: string | undefined;
for (const line of raw.split(/\r?\n/)) {
if (!line.trim()) {
continue;
}
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
output.push(line);
continue;
}
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
output.push(line);
continue;
}
const record = parsed as Record<string, unknown>;
if (record.type === "session") {
output.push(JSON.stringify({ ...record, version: CURRENT_SESSION_VERSION }));
continue;
}
const id = normalizeEntryId(record.id) ?? generateEntryId(existingIds);
existingIds.add(id);
record.id = id;
if (!Object.hasOwn(record, "parentId")) {
record.parentId = previousId;
}
previousId = id;
leafId = id;
output.push(JSON.stringify(record));
}
await fs.writeFile(transcriptPath, `${output.join("\n")}\n`, {
encoding: "utf-8",
mode: 0o600,
});
const result: { leafId?: string } = {};
if (leafId) {
result.leafId = leafId;
}
return result;
}
async function ensureTranscriptHeader(transcriptPath: string): Promise<void> {
const stat = await fs.stat(transcriptPath).catch(() => null);
if (stat?.isFile() && stat.size > 0) {
return;
}
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: randomUUID(),
timestamp: new Date().toISOString(),
cwd: process.cwd(),
};
await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, {
encoding: "utf-8",
mode: 0o600,
flag: stat?.isFile() ? "w" : "wx",
});
}
export async function appendSessionTranscriptMessage(params: {
transcriptPath: string;
message: unknown;
now?: number;
useRawWhenLinear?: boolean;
}): Promise<{ messageId: string }> {
const lock = await acquireSessionWriteLock({
sessionFile: params.transcriptPath,
timeoutMs: 10_000,
allowReentrant: true,
});
try {
const now = params.now ?? Date.now();
const messageId = randomUUID();
await ensureTranscriptHeader(params.transcriptPath);
const stat = await fs.stat(params.transcriptPath).catch(() => null);
let leafInfo: TranscriptLeafInfo = await readTranscriptLeafInfo(params.transcriptPath).catch(
() => ({
hasParentLinkedEntries: false,
nonSessionEntryCount: 0,
}),
);
const hasLinearEntries = !leafInfo.hasParentLinkedEntries && leafInfo.nonSessionEntryCount > 0;
const allowRawWhenLinear = params.useRawWhenLinear !== false;
const shouldRawAppend =
allowRawWhenLinear &&
hasLinearEntries &&
(stat?.size ?? 0) > SESSION_MANAGER_APPEND_MAX_BYTES;
if (hasLinearEntries && !shouldRawAppend) {
const migrated = await migrateLinearTranscriptToParentLinked(params.transcriptPath);
leafInfo = {
...(migrated.leafId ? { leafId: migrated.leafId } : {}),
hasParentLinkedEntries: Boolean(migrated.leafId),
nonSessionEntryCount: leafInfo.nonSessionEntryCount,
};
}
const entry = {
type: "message",
id: messageId,
...(shouldRawAppend ? {} : { parentId: leafInfo.leafId ?? null }),
timestamp: new Date(now).toISOString(),
message: params.message,
};
await fs.appendFile(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8");
return { messageId };
} finally {
await lock.release();
}
}

View File

@@ -3,6 +3,7 @@ import { describe, expect, it, vi } from "vitest";
import * as transcriptEvents from "../../sessions/transcript-events.js";
import { resolveSessionTranscriptPathInDir } from "./paths.js";
import { useTempSessionsFixture } from "./test-helpers.js";
import { appendSessionTranscriptMessage } from "./transcript-append.js";
import {
appendAssistantMessageToSessionTranscript,
appendExactAssistantMessageToSessionTranscript,
@@ -357,4 +358,124 @@ describe("appendAssistantMessageToSessionTranscript", () => {
}
emitSpy.mockRestore();
});
it("serializes concurrent parent-linked transcript appends", async () => {
const sessionFile = resolveSessionTranscriptPathInDir(
"concurrent-tree-session",
fixture.sessionsDir(),
);
fs.writeFileSync(
sessionFile,
[
JSON.stringify({
type: "session",
version: 1,
id: "concurrent-tree-session",
timestamp: new Date().toISOString(),
cwd: process.cwd(),
}),
JSON.stringify({
type: "message",
id: "root-message",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "root" },
}),
].join("\n") + "\n",
"utf-8",
);
await Promise.all(
Array.from({ length: 8 }, (_, index) =>
appendSessionTranscriptMessage({
transcriptPath: sessionFile,
message: { role: "assistant", content: `reply ${index}` },
}),
),
);
const records = fs
.readFileSync(sessionFile, "utf-8")
.trim()
.split("\n")
.map(
(line) =>
JSON.parse(line) as {
type?: string;
id?: string;
parentId?: string | null;
message?: { content?: string };
},
)
.filter((record) => record.type === "message");
expect(records).toHaveLength(9);
for (let index = 1; index < records.length; index += 1) {
expect(records[index]?.parentId).toBe(records[index - 1]?.id);
}
});
it("migrates small linear transcripts before appending", async () => {
const sessionFile = resolveSessionTranscriptPathInDir(
"small-linear-session",
fixture.sessionsDir(),
);
fs.writeFileSync(
sessionFile,
[
JSON.stringify({
type: "session",
version: 3,
id: "small-linear-session",
timestamp: new Date().toISOString(),
cwd: process.cwd(),
}),
JSON.stringify({
type: "message",
id: "legacy-first",
timestamp: new Date().toISOString(),
message: { role: "user", content: "legacy first" },
}),
JSON.stringify({
type: "message",
id: "legacy-second",
timestamp: new Date().toISOString(),
message: { role: "assistant", content: "legacy second" },
}),
].join("\n") + "\n",
"utf-8",
);
const appended = await appendSessionTranscriptMessage({
transcriptPath: sessionFile,
message: { role: "assistant", content: "new reply" },
});
const records = fs
.readFileSync(sessionFile, "utf-8")
.trim()
.split("\n")
.map(
(line) =>
JSON.parse(line) as {
type?: string;
id?: string;
parentId?: string | null;
message?: { content?: string };
},
);
const messages = records.filter((record) => record.type === "message");
expect(messages.map((record) => record.message?.content)).toEqual([
"legacy first",
"legacy second",
"new reply",
]);
expect(messages[0]).toMatchObject({ id: "legacy-first", parentId: null });
expect(messages[1]).toMatchObject({ id: "legacy-second", parentId: "legacy-first" });
expect(messages[2]).toMatchObject({
id: appended.messageId,
parentId: "legacy-second",
});
});
});

View File

@@ -13,6 +13,7 @@ import {
import { resolveAndPersistSessionFile } from "./session-file.js";
import { loadSessionStore, normalizeStoreSessionKey } from "./store.js";
import { parseSessionThreadInfo } from "./thread-info.js";
import { appendSessionTranscriptMessage } from "./transcript-append.js";
import { resolveMirroredTranscriptText } from "./transcript-mirror.js";
import type { SessionEntry } from "./types.js";
@@ -261,7 +262,11 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
? await transcriptHasIdempotencyKey(sessionFile, explicitIdempotencyKey)
: undefined;
if (existingMessageId) {
return { ok: true, sessionFile, messageId: existingMessageId };
return {
ok: true,
sessionFile,
messageId: existingMessageId === true ? (explicitIdempotencyKey ?? "") : existingMessageId,
};
}
const latestEquivalentAssistantId = isRedundantDeliveryMirror(params.message)
@@ -275,9 +280,10 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
...params.message,
...(explicitIdempotencyKey ? { idempotencyKey: explicitIdempotencyKey } : {}),
} as Parameters<SessionManager["appendMessage"]>[0];
const { SessionManager } = await loadPiCodingAgentModule();
const sessionManager = SessionManager.open(sessionFile);
const messageId = sessionManager.appendMessage(message);
const { messageId } = await appendSessionTranscriptMessage({
transcriptPath: sessionFile,
message,
});
switch (params.updateMode ?? "inline") {
case "inline":
@@ -295,7 +301,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
async function transcriptHasIdempotencyKey(
transcriptPath: string,
idempotencyKey: string,
): Promise<string | undefined> {
): Promise<string | true | undefined> {
try {
const raw = await fs.promises.readFile(transcriptPath, "utf-8");
for (const line of raw.split(/\r?\n/)) {
@@ -314,6 +320,9 @@ async function transcriptHasIdempotencyKey(
) {
return parsed.id;
}
if (parsed.message?.idempotencyKey === idempotencyKey) {
return true;
}
} catch {
continue;
}

View File

@@ -20,7 +20,7 @@ vi.mock("./http-utils.js", () => ({
vi.mock("./session-utils.js", () => ({
loadSessionEntry: loadSessionEntryMock,
readSessionMessages: readSessionMessagesMock,
readSessionMessagesAsync: readSessionMessagesMock,
}));
vi.mock("../agents/subagent-registry.js", () => ({

View File

@@ -23,7 +23,7 @@ import {
resolveOpenAiCompatibleHttpOperatorScopes,
} from "./http-utils.js";
import { authorizeOperatorScopesForMethod } from "./method-scopes.js";
import { loadSessionEntry, readSessionMessages } from "./session-utils.js";
import { loadSessionEntry, readSessionMessagesAsync } from "./session-utils.js";
const OUTGOING_IMAGE_ROUTE_PREFIX = "/api/chat/media/outgoing";
const DEFAULT_TRANSIENT_OUTGOING_IMAGE_TTL_MS = 15 * 60 * 1000;
@@ -717,7 +717,7 @@ async function getSessionManagedOutgoingAttachmentIndex(
}
}
const messages = readSessionMessages(sessionId, storePath, entry.sessionFile);
const messages = await readSessionMessagesAsync(sessionId, storePath, entry.sessionFile);
const index: SessionManagedOutgoingAttachmentIndex = new Set();
for (const message of messages) {
const meta = (message as { __openclaw?: { id?: string } } | null)?.__openclaw;

View File

@@ -4,7 +4,7 @@ import { artifactsHandlers, collectArtifactsFromMessages } from "./artifacts.js"
const hoisted = vi.hoisted(() => ({
getTaskSessionLookupByIdForStatus: vi.fn(),
loadSessionEntry: vi.fn(),
visitSessionMessages: vi.fn(),
visitSessionMessagesAsync: vi.fn(),
resolveSessionKeyForRun: vi.fn(),
}));
@@ -17,7 +17,7 @@ vi.mock("../session-utils.js", async () => {
return {
...actual,
loadSessionEntry: hoisted.loadSessionEntry,
visitSessionMessages: hoisted.visitSessionMessages,
visitSessionMessagesAsync: hoisted.visitSessionMessagesAsync,
};
});
@@ -67,8 +67,8 @@ describe("artifacts RPC handlers", () => {
});
function mockedMessages(messages: unknown[]) {
hoisted.visitSessionMessages.mockImplementation(
(_sessionId, _storePath, _sessionFile, visit) => {
hoisted.visitSessionMessagesAsync.mockImplementation(
async (_sessionId, _storePath, _sessionFile, visit) => {
messages.forEach((message, index) => visit(message, index + 1));
return messages.length;
},

View File

@@ -10,7 +10,7 @@ import {
validateArtifactsListParams,
} from "../protocol/index.js";
import { resolveSessionKeyForRun } from "../server-session-key.js";
import { loadSessionEntry, visitSessionMessages } from "../session-utils.js";
import { loadSessionEntry, visitSessionMessagesAsync } from "../session-utils.js";
import type { GatewayRequestHandlers, RespondFn } from "./types.js";
import { assertValidParams } from "./validation.js";
@@ -300,7 +300,9 @@ function resolveQuerySessionKey(query: ArtifactQuery): string | undefined {
return undefined;
}
function loadArtifacts(query: ArtifactQuery): { artifacts: ArtifactRecord[]; sessionKey?: string } {
async function loadArtifacts(
query: ArtifactQuery,
): Promise<{ artifacts: ArtifactRecord[]; sessionKey?: string }> {
const sessionKey = resolveQuerySessionKey(query);
if (!sessionKey) {
return { artifacts: [] };
@@ -311,7 +313,7 @@ function loadArtifacts(query: ArtifactQuery): { artifacts: ArtifactRecord[]; ses
return { sessionKey, artifacts: [] };
}
const artifacts: ArtifactRecord[] = [];
visitSessionMessages(sessionId, storePath, entry?.sessionFile, (message, seq) => {
await visitSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, (message, seq) => {
collectArtifactsFromMessage({
message,
messageFallbackSeq: seq,
@@ -342,11 +344,11 @@ function requireQueryable(params: ArtifactQuery, respond: RespondFn): boolean {
return false;
}
function findArtifact(params: ArtifactsGetParams): {
async function findArtifact(params: ArtifactsGetParams): Promise<{
artifact?: ArtifactRecord;
sessionKey?: string;
} {
const loaded = loadArtifacts(params);
}> {
const loaded = await loadArtifacts(params);
return {
sessionKey: loaded.sessionKey,
artifact: loaded.artifacts.find((artifact) => artifact.id === params.artifactId),
@@ -359,14 +361,14 @@ function toSummary(artifact: ArtifactRecord): ArtifactSummary {
}
export const artifactsHandlers: GatewayRequestHandlers = {
"artifacts.list": ({ params, respond }) => {
"artifacts.list": async ({ params, respond }) => {
if (!assertValidParams(params, validateArtifactsListParams, "artifacts.list", respond)) {
return;
}
if (!requireQueryable(params, respond)) {
return;
}
const { artifacts, sessionKey } = loadArtifacts(params);
const { artifacts, sessionKey } = await loadArtifacts(params);
if (!sessionKey && (params.runId || params.taskId)) {
respond(
false,
@@ -377,14 +379,14 @@ export const artifactsHandlers: GatewayRequestHandlers = {
}
respond(true, { artifacts: artifacts.map(toSummary) });
},
"artifacts.get": ({ params, respond }) => {
"artifacts.get": async ({ params, respond }) => {
if (!assertValidParams(params, validateArtifactsGetParams, "artifacts.get", respond)) {
return;
}
if (!requireQueryable(params, respond)) {
return;
}
const { artifact } = findArtifact(params);
const { artifact } = await findArtifact(params);
if (!artifact) {
respond(
false,
@@ -397,7 +399,7 @@ export const artifactsHandlers: GatewayRequestHandlers = {
}
respond(true, { artifact: toSummary(artifact) });
},
"artifacts.download": ({ params, respond }) => {
"artifacts.download": async ({ params, respond }) => {
if (
!assertValidParams(params, validateArtifactsDownloadParams, "artifacts.download", respond)
) {
@@ -406,7 +408,7 @@ export const artifactsHandlers: GatewayRequestHandlers = {
if (!requireQueryable(params, respond)) {
return;
}
const { artifact } = findArtifact(params);
const { artifact } = await findArtifact(params);
if (!artifact) {
respond(
false,

View File

@@ -1,14 +1,10 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import { StringDecoder } from "node:string_decoder";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import type { SessionManager } from "@mariozechner/pi-coding-agent";
import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
type AppendMessageArg = Parameters<SessionManager["appendMessage"]>[0];
const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024;
export type GatewayInjectedAbortMeta = {
aborted: true;
origin: "rpc" | "stop-command";
@@ -46,78 +42,7 @@ function resolveInjectedAssistantContent(params: {
return [{ type: "text", text: `${labelPrefix}${params.message}` }];
}
function transcriptHasParentLinkedEntries(transcriptPath: string): boolean {
let fd: number | null = null;
try {
fd = fs.openSync(transcriptPath, "r");
const decoder = new StringDecoder("utf8");
const buffer = Buffer.allocUnsafe(64 * 1024);
let carry = "";
while (true) {
const bytesRead = fs.readSync(fd, buffer, 0, buffer.length, null);
if (bytesRead <= 0) {
break;
}
const text = carry + decoder.write(buffer.subarray(0, bytesRead));
const lines = text.split(/\r?\n/);
carry = lines.pop() ?? "";
for (const line of lines) {
if (lineHasParentLinkedEntry(line)) {
return true;
}
}
}
return lineHasParentLinkedEntry(carry + decoder.end());
} catch {
return true;
} finally {
if (fd !== null) {
fs.closeSync(fd);
}
}
}
function lineHasParentLinkedEntry(line: string): boolean {
if (!line.trim()) {
return false;
}
try {
const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown };
return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed;
} catch {
return false;
}
}
function shouldUseRawAppend(transcriptPath: string): boolean {
try {
const stat = fs.statSync(transcriptPath);
return (
stat.size > SESSION_MANAGER_APPEND_MAX_BYTES &&
!transcriptHasParentLinkedEntries(transcriptPath)
);
} catch {
return false;
}
}
function appendRawAssistantMessageToTranscript(params: {
transcriptPath: string;
message: AppendMessageArg & Record<string, unknown>;
now: number;
}): { messageId: string } {
const messageId = randomUUID();
const entry = {
type: "message",
id: messageId,
timestamp: new Date(params.now).toISOString(),
message: params.message,
};
fs.appendFileSync(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8");
return { messageId };
}
export function appendInjectedAssistantMessageToTranscript(params: {
export async function appendInjectedAssistantMessageToTranscript(params: {
transcriptPath: string;
message: string;
label?: string;
@@ -126,7 +51,7 @@ export function appendInjectedAssistantMessageToTranscript(params: {
idempotencyKey?: string;
abortMeta?: GatewayInjectedAbortMeta;
now?: number;
}): GatewayInjectedTranscriptAppendResult {
}): Promise<GatewayInjectedTranscriptAppendResult> {
const now = params.now ?? Date.now();
const usage = {
input: 0,
@@ -176,24 +101,12 @@ export function appendInjectedAssistantMessageToTranscript(params: {
};
try {
if (shouldUseRawAppend(params.transcriptPath)) {
const { messageId } = appendRawAssistantMessageToTranscript({
transcriptPath: params.transcriptPath,
message: messageBody,
now,
});
emitSessionTranscriptUpdate({
sessionFile: params.transcriptPath,
message: messageBody,
messageId,
});
return { ok: true, messageId, message: messageBody };
}
// IMPORTANT: Use SessionManager so the entry is attached to the current leaf via parentId.
// Raw jsonl appends break the parent chain and can hide compaction summaries from context.
const sessionManager = SessionManager.open(params.transcriptPath);
const messageId = sessionManager.appendMessage(messageBody);
const { messageId } = await appendSessionTranscriptMessage({
transcriptPath: params.transcriptPath,
message: messageBody,
now,
useRawWhenLinear: true,
});
emitSessionTranscriptUpdate({
sessionFile: params.transcriptPath,
message: messageBody,

View File

@@ -3,8 +3,8 @@ import { describe, expect, it } from "vitest";
import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js";
import { createTranscriptFixtureSync } from "./chat.test-helpers.js";
// Guardrail: Ensure gateway "injected" assistant transcript messages are appended via SessionManager,
// so they are attached to the current leaf with a `parentId` and do not sever compaction history.
// Guardrail: Gateway-injected assistant transcript messages must attach to the
// current leaf with a `parentId` and must not sever compaction history.
describe("gateway chat.inject transcript writes", () => {
it("appends a Pi session entry that includes parentId", async () => {
const { dir, transcriptPath } = createTranscriptFixtureSync({
@@ -13,7 +13,7 @@ describe("gateway chat.inject transcript writes", () => {
});
try {
const appended = appendInjectedAssistantMessageToTranscript({
const appended = await appendInjectedAssistantMessageToTranscript({
transcriptPath,
message: "hello",
});
@@ -55,7 +55,7 @@ describe("gateway chat.inject transcript writes", () => {
"utf-8",
);
const appended = appendInjectedAssistantMessageToTranscript({
const appended = await appendInjectedAssistantMessageToTranscript({
transcriptPath,
message: "hello",
});

View File

@@ -1,6 +1,7 @@
import fs from "node:fs";
import path from "node:path";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { resolveAgentWorkspaceDir, resolveSessionAgentId } from "../../agents/agent-scope.js";
import { rewriteTranscriptEntriesInSessionFile } from "../../agents/pi-embedded-runner/transcript-rewrite.js";
@@ -94,13 +95,14 @@ import {
} from "../protocol/index.js";
import { CHAT_SEND_SESSION_KEY_MAX_LENGTH } from "../protocol/schema/primitives.js";
import { getMaxChatHistoryMessagesBytes } from "../server-constants.js";
import { readSessionTranscriptIndex } from "../session-transcript-index.fs.js";
import {
capArrayByJsonBytes,
loadSessionEntry,
resolveGatewayModelSupportsImages,
resolveGatewaySessionThinkingDefault,
resolveDeletedAgentIdFromSessionKey,
readRecentSessionMessages,
readRecentSessionMessagesAsync,
resolveSessionModelRef,
} from "../session-utils.js";
import { formatForLog } from "../ws-log.js";
@@ -959,31 +961,30 @@ async function rewriteChatSendUserTurnMediaPaths(params: {
if (!("MediaPath" in mediaFields)) {
return;
}
const sessionManager = SessionManager.open(params.transcriptPath);
const branch = sessionManager.getBranch();
const target = [...branch].toReversed().find((entry) => {
if (entry.type !== "message" || entry.message.role !== "user") {
const index = await readSessionTranscriptIndex(params.transcriptPath);
const target = index?.entries.toReversed().find((entry) => {
const message = entry.record.message as Record<string, unknown> | undefined;
if (!message || message.role !== "user") {
return false;
}
const existingPaths = Array.isArray((entry.message as { MediaPaths?: unknown }).MediaPaths)
? (entry.message as { MediaPaths?: unknown[] }).MediaPaths
const existingPaths = Array.isArray((message as { MediaPaths?: unknown }).MediaPaths)
? (message as { MediaPaths?: unknown[] }).MediaPaths
: undefined;
if (
(typeof (entry.message as { MediaPath?: unknown }).MediaPath === "string" &&
(entry.message as { MediaPath?: string }).MediaPath) ||
(typeof (message as { MediaPath?: unknown }).MediaPath === "string" &&
(message as { MediaPath?: string }).MediaPath) ||
(existingPaths && existingPaths.length > 0)
) {
return false;
}
return (
extractTranscriptUserText((entry.message as { content?: unknown }).content) === params.message
);
return extractTranscriptUserText((message as { content?: unknown }).content) === params.message;
});
if (!target || target.type !== "message") {
const targetMessage = target?.record.message as Record<string, unknown> | undefined;
if (!target || !target.id || !targetMessage) {
return;
}
const rewrittenMessage = {
...target.message,
...targetMessage,
...mediaFields,
};
await rewriteTranscriptEntriesInSessionFile({
@@ -993,7 +994,7 @@ async function rewriteChatSendUserTurnMediaPaths(params: {
replacements: [
{
entryId: target.id,
message: rewrittenMessage,
message: rewrittenMessage as AgentMessage,
},
],
},
@@ -1282,9 +1283,12 @@ function ensureTranscriptFile(params: { transcriptPath: string; sessionId: strin
}
}
function transcriptHasIdempotencyKey(transcriptPath: string, idempotencyKey: string): boolean {
async function transcriptHasIdempotencyKey(
transcriptPath: string,
idempotencyKey: string,
): Promise<boolean> {
try {
const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/);
const lines = (await fs.promises.readFile(transcriptPath, "utf-8")).split(/\r?\n/);
for (const line of lines) {
if (!line.trim()) {
continue;
@@ -1300,7 +1304,7 @@ function transcriptHasIdempotencyKey(transcriptPath: string, idempotencyKey: str
}
}
function appendAssistantTranscriptMessage(params: {
async function appendAssistantTranscriptMessage(params: {
message: string;
label?: string;
content?: Array<Record<string, unknown>>;
@@ -1315,7 +1319,7 @@ function appendAssistantTranscriptMessage(params: {
origin: AbortOrigin;
runId: string;
};
}): TranscriptAppendResult {
}): Promise<TranscriptAppendResult> {
const transcriptPath = resolveTranscriptPath({
sessionId: params.sessionId,
storePath: params.storePath,
@@ -1339,11 +1343,14 @@ function appendAssistantTranscriptMessage(params: {
}
}
if (params.idempotencyKey && transcriptHasIdempotencyKey(transcriptPath, params.idempotencyKey)) {
if (
params.idempotencyKey &&
(await transcriptHasIdempotencyKey(transcriptPath, params.idempotencyKey))
) {
return { ok: true };
}
return appendInjectedAssistantMessageToTranscript({
return await appendInjectedAssistantMessageToTranscript({
transcriptPath,
message: params.message,
label: params.label,
@@ -1378,18 +1385,18 @@ function collectSessionAbortPartials(params: {
return out;
}
function persistAbortedPartials(params: {
async function persistAbortedPartials(params: {
context: Pick<GatewayRequestContext, "logGateway">;
sessionKey: string;
snapshots: AbortedPartialSnapshot[];
}) {
}): Promise<void> {
if (params.snapshots.length === 0) {
return;
}
const { storePath, entry } = loadSessionEntry(params.sessionKey);
for (const snapshot of params.snapshots) {
const sessionId = entry?.sessionId ?? snapshot.sessionId ?? snapshot.runId;
const appended = appendAssistantTranscriptMessage({
const appended = await appendAssistantTranscriptMessage({
message: snapshot.text,
sessionId,
storePath,
@@ -1520,14 +1527,14 @@ function resolveAuthorizedRunIdsForSession(params: {
};
}
function abortChatRunsForSessionKeyWithPartials(params: {
async function abortChatRunsForSessionKeyWithPartials(params: {
context: GatewayRequestContext;
ops: ChatAbortOps;
sessionKey: string;
abortOrigin: AbortOrigin;
stopReason?: string;
requester: ChatAbortRequester;
}) {
}): Promise<{ aborted: boolean; runIds: string[]; unauthorized: boolean }> {
const { matchedSessionRuns, authorizedRunIds } = resolveAuthorizedRunIdsForSession({
chatAbortControllers: params.context.chatAbortControllers,
sessionKey: params.sessionKey,
@@ -1560,7 +1567,7 @@ function abortChatRunsForSessionKeyWithPartials(params: {
}
const res = { aborted: runIds.length > 0, runIds, unauthorized: false };
if (res.aborted) {
persistAbortedPartials({
await persistAbortedPartials({
context: params.context,
sessionKey: params.sessionKey,
snapshots,
@@ -1669,7 +1676,7 @@ export const chatHandlers: GatewayRequestHandlers = {
const maxHistoryBytes = getMaxChatHistoryMessagesBytes();
const localMessages =
sessionId && storePath
? readRecentSessionMessages(sessionId, storePath, entry?.sessionFile, {
? await readRecentSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, {
maxMessages: max,
maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024),
})
@@ -1728,7 +1735,7 @@ export const chatHandlers: GatewayRequestHandlers = {
verboseLevel,
});
},
"chat.abort": ({ params, respond, context, client }) => {
"chat.abort": async ({ params, respond, context, client }) => {
if (!validateChatAbortParams(params)) {
respond(
false,
@@ -1749,7 +1756,7 @@ export const chatHandlers: GatewayRequestHandlers = {
const requester = resolveChatAbortRequester(client);
if (!runId) {
const res = abortChatRunsForSessionKeyWithPartials({
const res = await abortChatRunsForSessionKeyWithPartials({
context,
ops,
sessionKey: rawSessionKey,
@@ -1790,7 +1797,7 @@ export const chatHandlers: GatewayRequestHandlers = {
stopReason: "rpc",
});
if (res.aborted && partialText && partialText.trim()) {
persistAbortedPartials({
await persistAbortedPartials({
context,
sessionKey: rawSessionKey,
snapshots: [
@@ -1944,7 +1951,7 @@ export const chatHandlers: GatewayRequestHandlers = {
}
if (stopCommand) {
const res = abortChatRunsForSessionKeyWithPartials({
const res = await abortChatRunsForSessionKeyWithPartials({
context,
ops: createChatAbortOps(context),
sessionKey: rawSessionKey,
@@ -2262,7 +2269,7 @@ export const chatHandlers: GatewayRequestHandlers = {
if (!transcriptReply && !persistedAssistantContent?.length && !assistantContent?.length) {
return;
}
const appended = appendAssistantTranscriptMessage({
const appended = await appendAssistantTranscriptMessage({
message: transcriptReply,
...(persistedContentForAppend?.length ? { content: persistedContentForAppend } : {}),
sessionId,
@@ -2467,7 +2474,7 @@ export const chatHandlers: GatewayRequestHandlers = {
persistedContentForAppend?.length ||
assistantContent?.length
) {
const appended = appendAssistantTranscriptMessage({
const appended = await appendAssistantTranscriptMessage({
message: transcriptReply,
...(persistedContentForAppend?.length
? { content: persistedContentForAppend }
@@ -2625,7 +2632,7 @@ export const chatHandlers: GatewayRequestHandlers = {
return;
}
const appended = appendAssistantTranscriptMessage({
const appended = await appendAssistantTranscriptMessage({
message: p.message,
label: p.label,
sessionId,

View File

@@ -587,7 +587,7 @@ describe("sanitizeChatSendMessageInput", () => {
});
describe("gateway chat transcript writes (guardrail)", () => {
it("routes transcript writes through helper and SessionManager parentId append", () => {
it("routes transcript writes through helper and async parentId append", () => {
const chatTs = fileURLToPath(new URL("./chat.ts", import.meta.url));
const chatSrc = fs.readFileSync(chatTs, "utf-8");
const helperTs = fileURLToPath(new URL("./chat-transcript-inject.ts", import.meta.url));
@@ -596,10 +596,9 @@ describe("gateway chat transcript writes (guardrail)", () => {
expect(chatSrc.includes("fs.appendFileSync(transcriptPath")).toBe(false);
expect(chatSrc).toContain("appendInjectedAssistantMessageToTranscript(");
expect(helperSrc).toContain("function shouldUseRawAppend(");
expect(helperSrc).toContain("function appendRawAssistantMessageToTranscript(");
expect(helperSrc).toContain("SessionManager.open(params.transcriptPath)");
expect(helperSrc).toContain("appendMessage(messageBody)");
expect(helperSrc).toContain("appendSessionTranscriptMessage({");
expect(helperSrc).toContain("useRawWhenLinear: true");
expect(helperSrc).not.toContain("SessionManager.open(params.transcriptPath)");
});
});

View File

@@ -77,9 +77,9 @@ import {
loadGatewaySessionRow,
loadSessionEntry,
migrateAndPruneGatewaySessionStoreKey,
readRecentSessionMessagesWithStats,
readRecentSessionMessagesWithStatsAsync,
readRecentSessionTranscriptLines,
readSessionMessageCount,
readSessionMessageCountAsync,
readSessionPreviewItemsFromTranscript,
resolveDeletedAgentIdFromSessionKey,
resolveFreshestSessionEntryFromStoreKeys,
@@ -571,7 +571,8 @@ async function handleSessionSend(params: {
interruptedActiveRun = interruptResult.interrupted;
}
const messageSeq = readSessionMessageCount(entry.sessionId, storePath, entry.sessionFile) + 1;
const messageSeq =
(await readSessionMessageCountAsync(entry.sessionId, storePath, entry.sessionFile)) + 1;
let sendAcked = false;
let sendPayload: unknown;
let sendCached = false;
@@ -985,11 +986,11 @@ export const sessionsHandlers: GatewayRequestHandlers = {
let runError: unknown;
let runMeta: Record<string, unknown> | undefined;
const messageSeq = initialMessage
? readSessionMessageCount(
? (await readSessionMessageCountAsync(
createdEntry.sessionId,
target.storePath,
createdEntry.sessionFile,
) + 1
)) + 1
: undefined;
if (initialMessage) {
@@ -1674,7 +1675,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
});
}
},
"sessions.get": ({ params, respond, context }) => {
"sessions.get": async ({ params, respond, context }) => {
const p = params;
const key = requireSessionKey(p.key ?? p.sessionKey, respond);
if (!key) {
@@ -1695,7 +1696,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
respond(true, { messages: [] }, undefined);
return;
}
const { messages } = readRecentSessionMessagesWithStats(
const { messages } = await readRecentSessionMessagesWithStatsAsync(
entry.sessionId,
storePath,
entry.sessionFile,

View File

@@ -11,7 +11,7 @@ import {
attachOpenClawTranscriptMeta,
loadGatewaySessionRow,
loadSessionEntry,
readSessionMessageCount,
readSessionMessageCountAsync,
type GatewaySessionRow,
} from "./session-utils.js";
@@ -88,67 +88,81 @@ export function createTranscriptUpdateBroadcastHandler(params: {
sessionEventSubscribers: SessionEventSubscribers;
sessionMessageSubscribers: SessionMessageSubscribers;
}) {
let broadcastQueue = Promise.resolve();
return (update: SessionTranscriptUpdate): void => {
const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile);
if (!sessionKey || update.message === undefined) {
return;
}
const connIds = new Set<string>();
for (const connId of params.sessionEventSubscribers.getAll()) {
connIds.add(connId);
}
for (const connId of params.sessionMessageSubscribers.get(sessionKey)) {
connIds.add(connId);
}
if (connIds.size === 0) {
return;
}
const { entry, storePath } = loadSessionEntry(sessionKey);
const messageSeq = entry?.sessionId
? readSessionMessageCount(entry.sessionId, storePath, entry.sessionFile)
: undefined;
const sessionSnapshot = buildGatewaySessionSnapshot({
sessionRow: loadGatewaySessionRow(sessionKey),
includeSession: true,
});
const rawMessage = attachOpenClawTranscriptMeta(update.message, {
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
...(typeof messageSeq === "number" ? { seq: messageSeq } : {}),
});
const message = projectChatDisplayMessage(rawMessage);
if (message) {
params.broadcastToConnIds(
"session.message",
{
sessionKey,
message,
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
...(typeof messageSeq === "number" ? { messageSeq } : {}),
...sessionSnapshot,
},
connIds,
{ dropIfSlow: true },
);
}
broadcastQueue = broadcastQueue
.then(() => handleTranscriptUpdateBroadcast(params, update))
.catch(() => undefined);
};
}
const sessionEventConnIds = params.sessionEventSubscribers.getAll();
if (sessionEventConnIds.size === 0) {
return;
}
async function handleTranscriptUpdateBroadcast(
params: {
broadcastToConnIds: GatewayBroadcastToConnIdsFn;
sessionEventSubscribers: SessionEventSubscribers;
sessionMessageSubscribers: SessionMessageSubscribers;
},
update: SessionTranscriptUpdate,
): Promise<void> {
const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile);
if (!sessionKey || update.message === undefined) {
return;
}
const connIds = new Set<string>();
for (const connId of params.sessionEventSubscribers.getAll()) {
connIds.add(connId);
}
for (const connId of params.sessionMessageSubscribers.get(sessionKey)) {
connIds.add(connId);
}
if (connIds.size === 0) {
return;
}
const { entry, storePath } = loadSessionEntry(sessionKey);
const messageSeq = entry?.sessionId
? await readSessionMessageCountAsync(entry.sessionId, storePath, entry.sessionFile)
: undefined;
const sessionSnapshot = buildGatewaySessionSnapshot({
sessionRow: loadGatewaySessionRow(sessionKey, { transcriptUsageMaxBytes: 64 * 1024 }),
includeSession: true,
});
const rawMessage = attachOpenClawTranscriptMeta(update.message, {
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
...(typeof messageSeq === "number" ? { seq: messageSeq } : {}),
});
const message = projectChatDisplayMessage(rawMessage);
if (message) {
params.broadcastToConnIds(
"sessions.changed",
"session.message",
{
sessionKey,
phase: "message",
ts: Date.now(),
message,
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
...(typeof messageSeq === "number" ? { messageSeq } : {}),
...sessionSnapshot,
},
sessionEventConnIds,
connIds,
{ dropIfSlow: true },
);
};
}
const sessionEventConnIds = params.sessionEventSubscribers.getAll();
if (sessionEventConnIds.size === 0) {
return;
}
params.broadcastToConnIds(
"sessions.changed",
{
sessionKey,
phase: "message",
ts: Date.now(),
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
...(typeof messageSeq === "number" ? { messageSeq } : {}),
...sessionSnapshot,
},
sessionEventConnIds,
{ dropIfSlow: true },
);
}
export function createLifecycleEventBroadcastHandler(params: {

View File

@@ -5,7 +5,9 @@ import {
import {
attachOpenClawTranscriptMeta,
readRecentSessionMessagesWithStats,
readRecentSessionMessagesWithStatsAsync,
readSessionMessages,
readSessionMessagesAsync,
} from "./session-utils.js";
type SessionHistoryTranscriptMeta = {
@@ -272,6 +274,25 @@ export class SessionHistorySseState {
return snapshot.history;
}
async refreshAsync(): Promise<PaginatedSessionHistory> {
const rawSnapshot = await this.readRawSnapshotAsync();
const snapshot = buildSessionHistorySnapshot({
rawMessages: rawSnapshot.rawMessages,
maxChars: this.maxChars,
limit: this.limit,
cursor: this.cursor,
...(typeof rawSnapshot.rawTranscriptSeq === "number"
? { rawTranscriptSeq: rawSnapshot.rawTranscriptSeq }
: {}),
...(typeof rawSnapshot.totalRawMessages === "number"
? { totalRawMessages: rawSnapshot.totalRawMessages }
: {}),
});
this.rawTranscriptSeq = snapshot.rawTranscriptSeq;
this.sentHistory = snapshot.history;
return snapshot.history;
}
private readRawSnapshot(): SessionHistoryRawSnapshot {
if (this.cursor === undefined && typeof this.limit === "number") {
const snapshot = readRecentSessionMessagesWithStats(
@@ -298,4 +319,27 @@ export class SessionHistorySseState {
this.target.sessionFile,
);
}
private async readRawSnapshotAsync(): Promise<SessionHistoryRawSnapshot> {
if (this.cursor === undefined && typeof this.limit === "number") {
const snapshot = await readRecentSessionMessagesWithStatsAsync(
this.target.sessionId,
this.target.storePath,
this.target.sessionFile,
resolveSessionHistoryTailReadOptions(this.limit),
);
return {
rawMessages: snapshot.messages,
rawTranscriptSeq: snapshot.totalMessages,
totalRawMessages: snapshot.totalMessages,
};
}
return {
rawMessages: await readSessionMessagesAsync(
this.target.sessionId,
this.target.storePath,
this.target.sessionFile,
),
};
}
}

View File

@@ -46,7 +46,7 @@ import {
import {
loadSessionEntry,
migrateAndPruneGatewaySessionStoreKey,
readSessionMessages,
readSessionMessagesAsync,
resolveGatewaySessionStoreTarget,
resolveSessionModelRef,
} from "./session-utils.js";
@@ -433,14 +433,14 @@ export async function cleanupSessionBeforeMutation(params: {
});
}
function emitGatewayBeforeResetPluginHook(params: {
async function emitGatewayBeforeResetPluginHook(params: {
cfg: OpenClawConfig;
key: string;
target: ReturnType<typeof resolveGatewaySessionStoreTarget>;
storePath: string;
entry?: SessionEntry;
reason: "new" | "reset";
}): void {
}): Promise<void> {
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("before_reset")) {
return;
@@ -454,7 +454,7 @@ function emitGatewayBeforeResetPluginHook(params: {
let messages: unknown[] = [];
try {
if (typeof sessionId === "string" && sessionId.trim().length > 0) {
messages = readSessionMessages(sessionId, params.storePath, sessionFile);
messages = await readSessionMessagesAsync(sessionId, params.storePath, sessionFile);
}
} catch (err) {
logVerbose(
@@ -630,7 +630,7 @@ export async function performGatewaySessionReset(params: {
store[primaryKey] = nextEntry;
return nextEntry;
});
emitGatewayBeforeResetPluginHook({
await emitGatewayBeforeResetPluginHook({
cfg,
key: params.key,
target,

View File

@@ -0,0 +1,247 @@
import fs from "node:fs";
import { StringDecoder } from "node:string_decoder";
const TRANSCRIPT_INDEX_READ_CHUNK_BYTES = 64 * 1024;
const MAX_TRANSCRIPT_INDEX_CACHE_ENTRIES = 256;
type ParsedTranscriptRecord = Record<string, unknown>;
export type IndexedTranscriptEntry = {
seq: number;
id?: string;
offset: number;
byteLength: number;
record: ParsedTranscriptRecord;
};
export type SessionTranscriptIndex = {
filePath: string;
mtimeMs: number;
size: number;
hasTreeEntries: boolean;
leafId?: string;
entries: IndexedTranscriptEntry[];
};
type IndexedRawEntry = {
id?: string;
parentId?: string | null;
offset: number;
byteLength: number;
record: ParsedTranscriptRecord;
};
type CacheEntry = {
mtimeMs: number;
size: number;
index: SessionTranscriptIndex;
};
const transcriptIndexCache = new Map<string, CacheEntry>();
function normalizeOptionalString(value: unknown): string | undefined {
return typeof value === "string" && value.trim().length > 0 ? value : undefined;
}
async function yieldTranscriptIndexScan(): Promise<void> {
await new Promise<void>((resolve) => setImmediate(resolve));
}
function touchCachedIndex(filePath: string, entry: CacheEntry): SessionTranscriptIndex {
transcriptIndexCache.delete(filePath);
transcriptIndexCache.set(filePath, entry);
return entry.index;
}
function setCachedIndex(filePath: string, entry: CacheEntry): void {
transcriptIndexCache.set(filePath, entry);
while (transcriptIndexCache.size > MAX_TRANSCRIPT_INDEX_CACHE_ENTRIES) {
const oldestKey = transcriptIndexCache.keys().next().value;
if (typeof oldestKey !== "string" || !oldestKey) {
break;
}
transcriptIndexCache.delete(oldestKey);
}
}
export function invalidateSessionTranscriptIndex(filePath: string): void {
transcriptIndexCache.delete(filePath);
}
export function clearSessionTranscriptIndexCache(): void {
transcriptIndexCache.clear();
}
function isIndexableTranscriptRecord(record: unknown): record is ParsedTranscriptRecord {
return Boolean(record && typeof record === "object" && !Array.isArray(record));
}
function isVisibleTranscriptRecord(record: ParsedTranscriptRecord): boolean {
return Boolean(record.message) || record.type === "compaction";
}
function isTreeTranscriptRecord(record: ParsedTranscriptRecord): boolean {
return record.type !== "session" && typeof record.id === "string" && "parentId" in record;
}
async function visitTranscriptJsonLines(
filePath: string,
visit: (line: string, offset: number, byteLength: number) => void,
): Promise<void> {
const handle = await fs.promises.open(filePath, "r");
try {
const decoder = new StringDecoder("utf8");
const buffer = Buffer.allocUnsafe(TRANSCRIPT_INDEX_READ_CHUNK_BYTES);
let carry = "";
let carryOffset = 0;
let nextOffset = 0;
while (true) {
const { bytesRead } = await handle.read(buffer, 0, buffer.length, null);
if (bytesRead <= 0) {
break;
}
const chunk = buffer.subarray(0, bytesRead);
const text = carry + decoder.write(chunk);
const lines = text.split("\n");
carry = lines.pop() ?? "";
let lineOffset = carryOffset;
for (const rawLine of lines) {
const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
const byteLength = Buffer.byteLength(line, "utf8");
visit(line, lineOffset, byteLength);
lineOffset += Buffer.byteLength(rawLine, "utf8") + 1;
}
nextOffset += bytesRead;
carryOffset = nextOffset - Buffer.byteLength(carry, "utf8");
await yieldTranscriptIndexScan();
}
const tail = carry + decoder.end();
if (tail) {
const line = tail.endsWith("\r") ? tail.slice(0, -1) : tail;
visit(line, carryOffset, Buffer.byteLength(line, "utf8"));
}
} finally {
await handle.close();
}
}
function buildActiveTreeEntries(params: {
byId: Map<string, IndexedRawEntry>;
leafId?: string;
}): IndexedRawEntry[] {
const out: IndexedRawEntry[] = [];
const seen = new Set<string>();
let currentId = params.leafId;
while (currentId) {
if (seen.has(currentId)) {
return [];
}
seen.add(currentId);
const entry = params.byId.get(currentId);
if (!entry) {
return [];
}
out.push(entry);
currentId = entry.parentId ?? undefined;
}
return out.toReversed();
}
function toIndexedEntries(rawEntries: IndexedRawEntry[]): IndexedTranscriptEntry[] {
const entries: IndexedTranscriptEntry[] = [];
let seq = 0;
for (const entry of rawEntries) {
if (!isVisibleTranscriptRecord(entry.record)) {
continue;
}
seq += 1;
entries.push({
seq,
...(entry.id ? { id: entry.id } : {}),
offset: entry.offset,
byteLength: entry.byteLength,
record: entry.record,
});
}
return entries;
}
async function buildSessionTranscriptIndex(
filePath: string,
stat: fs.Stats,
): Promise<SessionTranscriptIndex> {
const rawEntries: IndexedRawEntry[] = [];
const byId = new Map<string, IndexedRawEntry>();
let hasTreeEntries = false;
let leafId: string | undefined;
await visitTranscriptJsonLines(filePath, (line, offset, byteLength) => {
if (!line.trim()) {
return;
}
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
return;
}
if (!isIndexableTranscriptRecord(parsed)) {
return;
}
const id = normalizeOptionalString(parsed.id);
const parentId =
parsed.parentId === null ? null : (normalizeOptionalString(parsed.parentId) ?? undefined);
const rawEntry: IndexedRawEntry = {
...(id ? { id } : {}),
...(parentId !== undefined ? { parentId } : {}),
offset,
byteLength,
record: parsed,
};
rawEntries.push(rawEntry);
if (isTreeTranscriptRecord(parsed) && id) {
hasTreeEntries = true;
leafId = id;
byId.set(id, rawEntry);
}
});
const activeRawEntries = hasTreeEntries ? buildActiveTreeEntries({ byId, leafId }) : rawEntries;
return {
filePath,
mtimeMs: stat.mtimeMs,
size: stat.size,
hasTreeEntries,
...(leafId ? { leafId } : {}),
entries: toIndexedEntries(activeRawEntries),
};
}
export async function readSessionTranscriptIndex(
filePath: string,
): Promise<SessionTranscriptIndex | null> {
let stat: fs.Stats;
try {
stat = await fs.promises.stat(filePath);
} catch {
transcriptIndexCache.delete(filePath);
return null;
}
if (!stat.isFile()) {
transcriptIndexCache.delete(filePath);
return null;
}
const cached = transcriptIndexCache.get(filePath);
if (cached && cached.mtimeMs === stat.mtimeMs && cached.size === stat.size) {
return touchCachedIndex(filePath, cached);
}
const index = await buildSessionTranscriptIndex(filePath, stat);
setCachedIndex(filePath, {
mtimeMs: stat.mtimeMs,
size: stat.size,
index,
});
return index;
}

View File

@@ -1,18 +1,25 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from "vitest";
import { createToolSummaryPreviewTranscriptLines } from "./session-preview.test-helpers.js";
import { clearSessionTranscriptIndexCache } from "./session-transcript-index.fs.js";
import {
archiveSessionTranscripts,
readFirstUserMessageFromTranscript,
readLastMessagePreviewFromTranscript,
readLatestSessionUsageFromTranscript,
readLatestSessionUsageFromTranscriptAsync,
readRecentSessionUsageFromTranscript,
readRecentSessionMessagesAsync,
readRecentSessionMessages,
readRecentSessionMessagesWithStatsAsync,
readRecentSessionMessagesWithStats,
readRecentSessionTranscriptLines,
readSessionMessageCountAsync,
readSessionMessageCount,
readSessionMessagesAsync,
readSessionMessages,
readSessionTitleFieldsFromTranscript,
readSessionPreviewItemsFromTranscript,
@@ -594,6 +601,72 @@ describe("readSessionMessages", () => {
]);
});
test("preserves real sequence metadata for async bounded recent-message reads", async () => {
const sessionId = "test-session-recent-seq-async";
writeTranscript(tmpDir, sessionId, [
{ type: "session", version: 1, id: sessionId },
{ message: { role: "user", content: "old" } },
{ message: { role: "assistant", content: "middle" } },
{ message: { role: "user", content: "recent" } },
{ message: { role: "assistant", content: "latest" } },
]);
const readFileSpy = vi.spyOn(fs, "readFileSync");
try {
const result = await readRecentSessionMessagesWithStatsAsync(
sessionId,
storePath,
undefined,
{
maxMessages: 2,
maxBytes: 256,
},
);
expect(result.totalMessages).toBe(4);
expect(result.messages).toEqual([
expect.objectContaining({
content: "recent",
__openclaw: expect.objectContaining({ seq: 3 }),
}),
expect.objectContaining({
content: "latest",
__openclaw: expect.objectContaining({ seq: 4 }),
}),
]);
expect(readFileSpy).not.toHaveBeenCalled();
} finally {
readFileSpy.mockRestore();
}
});
test("honors byte caps for async recent-message reads", async () => {
const sessionId = "test-session-recent-async-byte-cap";
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
const hugeContent = "huge ".repeat(4096);
const lines = [
JSON.stringify({ type: "session", version: 1, id: sessionId }),
JSON.stringify({ message: { role: "user", content: "old" } }),
JSON.stringify({ message: { role: "assistant", content: hugeContent } }),
JSON.stringify({ message: { role: "assistant", content: "tail" } }),
];
fs.writeFileSync(transcriptPath, `${lines.join("\n")}\n`, "utf-8");
const readFileSpy = vi.spyOn(fs, "readFileSync");
try {
const out = await readRecentSessionMessagesAsync(sessionId, storePath, undefined, {
maxMessages: 2,
maxBytes: 2048,
});
expect(out).toEqual([expect.objectContaining({ role: "assistant", content: "tail" })]);
expect(JSON.stringify(out)).not.toContain("huge");
expect(readFileSpy).not.toHaveBeenCalled();
} finally {
readFileSpy.mockRestore();
}
});
test("counts transcript messages without loading the whole file", () => {
const sessionId = "test-session-count-large";
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
@@ -614,6 +687,96 @@ describe("readSessionMessages", () => {
}
});
test("counts transcript messages asynchronously without loading the whole file", async () => {
const sessionId = "test-session-count-large-async";
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
const lines = [
JSON.stringify({ type: "session", version: 1, id: sessionId }),
...Array.from({ length: 2500 }, (_, index) =>
JSON.stringify({ message: { role: "user", content: `message ${index}` } }),
),
];
fs.writeFileSync(transcriptPath, lines.join("\n"), "utf-8");
const readFileSpy = vi.spyOn(fs, "readFileSync");
try {
expect(await readSessionMessageCountAsync(sessionId, storePath)).toBe(2500);
expect(readFileSpy).not.toHaveBeenCalled();
} finally {
readFileSpy.mockRestore();
}
});
test("reads active tree branch asynchronously without SessionManager.open", async () => {
const sessionId = "test-session-tree-async";
writeTranscript(tmpDir, sessionId, [
{ type: "session", version: 3, id: sessionId },
{
type: "message",
id: "user-1",
parentId: null,
message: { role: "user", content: "root" },
},
{
type: "message",
id: "assistant-1",
parentId: "user-1",
message: { role: "assistant", content: "active branch" },
},
{
type: "message",
id: "assistant-inactive",
parentId: "user-1",
message: { role: "assistant", content: "inactive branch" },
},
{
type: "message",
id: "user-2",
parentId: "assistant-1",
message: { role: "user", content: "latest active" },
},
]);
clearSessionTranscriptIndexCache();
const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open");
const readFileSpy = vi.spyOn(fs, "readFileSync");
try {
const messages = await readSessionMessagesAsync(sessionId, storePath);
expect(messages.map((message) => (message as { content?: unknown }).content)).toEqual([
"root",
"active branch",
"latest active",
]);
expect(messages[2]).toMatchObject({
__openclaw: expect.objectContaining({ id: "user-2", seq: 3 }),
});
expect(sessionManagerOpenSpy).not.toHaveBeenCalled();
expect(readFileSpy).not.toHaveBeenCalled();
} finally {
sessionManagerOpenSpy.mockRestore();
readFileSpy.mockRestore();
}
});
test("caches async transcript indexes by file stats", async () => {
const sessionId = "test-session-index-cache";
writeTranscript(tmpDir, sessionId, [
{ type: "session", version: 1, id: sessionId },
{ message: { role: "user", content: "hello" } },
{ message: { role: "assistant", content: "hi" } },
]);
clearSessionTranscriptIndexCache();
expect(await readSessionMessageCountAsync(sessionId, storePath)).toBe(2);
const openSpy = vi.spyOn(fs.promises, "open");
try {
expect(await readSessionMessageCountAsync(sessionId, storePath)).toBe(2);
expect(openSpy).not.toHaveBeenCalled();
} finally {
openSpy.mockRestore();
}
});
test("tails transcript lines for manual compaction without loading the whole file", () => {
const sessionId = "test-session-line-tail";
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
@@ -982,6 +1145,55 @@ describe("readLatestSessionUsageFromTranscript", () => {
expect(snapshot?.costUsd).toBeCloseTo(0.0115, 8);
});
test("aggregates assistant usage asynchronously without readFileSync", async () => {
const sessionId = "usage-aggregate-async";
writeTranscript(tmpDir, sessionId, [
{ type: "session", version: 1, id: sessionId },
{
message: {
role: "assistant",
provider: "anthropic",
model: "claude-sonnet-4-6",
usage: {
input: 1_800,
output: 400,
cacheRead: 600,
cost: { total: 0.0055 },
},
},
},
{
message: {
role: "assistant",
usage: {
input: 2_400,
output: 250,
cacheRead: 900,
cost: { total: 0.006 },
},
},
},
]);
const readFileSpy = vi.spyOn(fs, "readFileSync");
try {
const snapshot = await readLatestSessionUsageFromTranscriptAsync(sessionId, storePath);
expect(snapshot).toMatchObject({
modelProvider: "anthropic",
model: "claude-sonnet-4-6",
inputTokens: 4200,
outputTokens: 650,
cacheRead: 1500,
totalTokens: 3300,
totalTokensFresh: true,
});
expect(snapshot?.costUsd).toBeCloseTo(0.0115, 8);
expect(readFileSpy).not.toHaveBeenCalled();
} finally {
readFileSpy.mockRestore();
}
});
test("reads earlier assistant usage outside the old tail window", () => {
const sessionId = "usage-full-transcript";
const filler = "x".repeat(20_000);

View File

@@ -15,6 +15,10 @@ import {
archiveSessionTranscripts,
cleanupArchivedSessionTranscripts,
} from "./session-transcript-files.fs.js";
import {
readSessionTranscriptIndex,
type IndexedTranscriptEntry,
} from "./session-transcript-index.fs.js";
import type { SessionPreviewItem } from "./session-utils.types.js";
type SessionTitleFields = {
@@ -29,6 +33,16 @@ type SessionTitleFieldsCacheEntry = SessionTitleFields & {
const sessionTitleFieldsCache = new Map<string, SessionTitleFieldsCacheEntry>();
const MAX_SESSION_TITLE_FIELDS_CACHE_ENTRIES = 5000;
const transcriptMessageCountCache = new Map<
string,
{
mtimeMs: number;
size: number;
count: number;
}
>();
const MAX_TRANSCRIPT_MESSAGE_COUNT_CACHE_ENTRIES = 5000;
const TRANSCRIPT_ASYNC_READ_CHUNK_BYTES = 64 * 1024;
function readSessionTitleFieldsCacheKey(
filePath: string,
@@ -71,6 +85,39 @@ function setCachedSessionTitleFields(cacheKey: string, stat: fs.Stats, value: Se
}
}
function getCachedTranscriptMessageCount(filePath: string, stat: fs.Stats): number | null {
const cached = transcriptMessageCountCache.get(filePath);
if (!cached) {
return null;
}
if (cached.mtimeMs !== stat.mtimeMs || cached.size !== stat.size) {
transcriptMessageCountCache.delete(filePath);
return null;
}
transcriptMessageCountCache.delete(filePath);
transcriptMessageCountCache.set(filePath, cached);
return cached.count;
}
function setCachedTranscriptMessageCount(filePath: string, stat: fs.Stats, count: number): void {
transcriptMessageCountCache.set(filePath, {
mtimeMs: stat.mtimeMs,
size: stat.size,
count,
});
while (transcriptMessageCountCache.size > MAX_TRANSCRIPT_MESSAGE_COUNT_CACHE_ENTRIES) {
const oldestKey = transcriptMessageCountCache.keys().next().value;
if (typeof oldestKey !== "string" || !oldestKey) {
break;
}
transcriptMessageCountCache.delete(oldestKey);
}
}
async function yieldTranscriptScan(): Promise<void> {
await new Promise<void>((resolve) => setImmediate(resolve));
}
export function attachOpenClawTranscriptMeta(
message: unknown,
meta: Record<string, unknown>,
@@ -182,6 +229,12 @@ type ReadRecentSessionMessagesResult = {
const RECENT_SESSION_MESSAGES_DEFAULT_MAX_BYTES = 8 * 1024 * 1024;
type TailTranscriptRecord = {
id?: string;
parentId?: string | null;
record: Record<string, unknown>;
};
export function readRecentSessionMessages(
sessionId: string,
storePath: string | undefined,
@@ -253,6 +306,121 @@ export function readRecentSessionMessages(
);
}
async function readRecentTranscriptTailLinesAsync(
filePath: string,
stat: fs.Stats,
opts: ReadRecentSessionMessagesOptions,
): Promise<string[]> {
const maxMessages = Math.max(0, Math.floor(opts.maxMessages));
const maxBytes = Math.max(
1024,
Math.floor(opts.maxBytes ?? RECENT_SESSION_MESSAGES_DEFAULT_MAX_BYTES),
);
const readLen = Math.min(stat.size, maxBytes);
const readStart = Math.max(0, stat.size - readLen);
const maxLines = Math.max(maxMessages, Math.floor(opts.maxLines ?? maxMessages * 20 + 20));
const handle = await fs.promises.open(filePath, "r");
try {
const buffer = Buffer.alloc(readLen);
const { bytesRead } = await handle.read(buffer, 0, readLen, readStart);
if (bytesRead <= 0) {
return [];
}
return buffer
.toString("utf-8", 0, bytesRead)
.split(/\r?\n/)
.slice(readStart > 0 ? 1 : 0)
.filter((line) => line.trim().length > 0)
.slice(-maxLines);
} finally {
await handle.close();
}
}
function normalizeTailEntryString(value: unknown): string | undefined {
return typeof value === "string" && value.trim().length > 0 ? value : undefined;
}
function parseTailTranscriptRecord(line: string): TailTranscriptRecord | null {
try {
const parsed = JSON.parse(line) as unknown;
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return null;
}
const record = parsed as Record<string, unknown>;
return {
...(normalizeTailEntryString(record.id) ? { id: normalizeTailEntryString(record.id) } : {}),
...(record.parentId === null
? { parentId: null }
: normalizeTailEntryString(record.parentId)
? { parentId: normalizeTailEntryString(record.parentId) }
: {}),
record,
};
} catch {
return null;
}
}
function tailRecordHasTreeLink(entry: TailTranscriptRecord): boolean {
return (
entry.record.type !== "session" &&
typeof entry.id === "string" &&
Object.hasOwn(entry.record, "parentId")
);
}
function selectBoundedActiveTailRecords(entries: TailTranscriptRecord[]): TailTranscriptRecord[] {
const byId = new Map<string, TailTranscriptRecord>();
let leafId: string | undefined;
for (const entry of entries) {
if (tailRecordHasTreeLink(entry) && entry.id) {
byId.set(entry.id, entry);
leafId = entry.id;
}
}
if (!leafId) {
return entries;
}
const selected: TailTranscriptRecord[] = [];
const seen = new Set<string>();
let currentId: string | undefined = leafId;
while (currentId) {
if (seen.has(currentId)) {
return [];
}
seen.add(currentId);
const entry = byId.get(currentId);
if (!entry) {
break;
}
selected.push(entry);
currentId = entry.parentId ?? undefined;
}
return selected.toReversed();
}
function parseRecentTranscriptTailMessages(lines: string[], maxMessages: number): unknown[] {
const entries = lines.flatMap((line) => {
const entry = parseTailTranscriptRecord(line);
return entry ? [entry] : [];
});
const selected = entries.some(tailRecordHasTreeLink)
? selectBoundedActiveTailRecords(entries)
: entries;
const messages: unknown[] = [];
let messageSeq = 0;
for (const entry of selected) {
const message = parsedSessionEntryToMessage(entry.record, messageSeq + 1);
if (message) {
messageSeq += 1;
messages.push(message);
}
}
return messages.slice(-maxMessages);
}
function visitTranscriptLines(filePath: string, visit: (line: string) => void): void {
const fd = fs.openSync(filePath, "r");
try {
@@ -280,6 +448,37 @@ function visitTranscriptLines(filePath: string, visit: (line: string) => void):
}
}
async function visitTranscriptLinesAsync(
filePath: string,
visit: (line: string) => void,
): Promise<void> {
const handle = await fs.promises.open(filePath, "r");
try {
const decoder = new StringDecoder("utf8");
const buffer = Buffer.allocUnsafe(TRANSCRIPT_ASYNC_READ_CHUNK_BYTES);
let carry = "";
while (true) {
const { bytesRead } = await handle.read(buffer, 0, buffer.length, null);
if (bytesRead <= 0) {
break;
}
const text = carry + decoder.write(buffer.subarray(0, bytesRead));
const lines = text.split(/\r?\n/);
carry = lines.pop() ?? "";
for (const line of lines) {
visit(line);
}
await yieldTranscriptScan();
}
const tail = carry + decoder.end();
if (tail) {
visit(tail);
}
} finally {
await handle.close();
}
}
function transcriptHasTreeEntries(filePath: string): boolean {
let hasTreeEntries = false;
try {
@@ -382,7 +581,88 @@ export function readSessionMessageCount(
storePath: string | undefined,
sessionFile?: string,
): number {
return visitSessionMessages(sessionId, storePath, sessionFile, () => undefined);
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile);
if (!filePath) {
return 0;
}
let stat: fs.Stats | null = null;
try {
stat = fs.statSync(filePath);
const cached = getCachedTranscriptMessageCount(filePath, stat);
if (typeof cached === "number") {
return cached;
}
} catch {
// Count from the transcript reader below when stat metadata is unavailable.
}
const count = visitSessionMessages(sessionId, storePath, sessionFile, () => undefined);
if (stat) {
setCachedTranscriptMessageCount(filePath, stat, count);
}
return count;
}
export async function readSessionMessagesAsync(
sessionId: string,
storePath: string | undefined,
sessionFile?: string,
): Promise<unknown[]> {
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile);
if (!filePath) {
return [];
}
const index = await readSessionTranscriptIndex(filePath);
return index?.entries.flatMap((entry) => indexedTranscriptEntryToMessages(entry)) ?? [];
}
export async function visitSessionMessagesAsync(
sessionId: string,
storePath: string | undefined,
sessionFile: string | undefined,
visit: (message: unknown, seq: number) => void,
): Promise<number> {
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile);
if (!filePath) {
return 0;
}
const index = await readSessionTranscriptIndex(filePath);
if (!index) {
return 0;
}
for (const entry of index.entries) {
const message = indexedTranscriptEntryToMessage(entry);
if (message) {
visit(message, entry.seq);
}
}
return index.entries.length;
}
export async function readSessionMessageCountAsync(
sessionId: string,
storePath: string | undefined,
sessionFile?: string,
): Promise<number> {
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile);
if (!filePath) {
return 0;
}
let stat: fs.Stats | null = null;
try {
stat = await fs.promises.stat(filePath);
const cached = getCachedTranscriptMessageCount(filePath, stat);
if (typeof cached === "number") {
return cached;
}
} catch {
// Count from the transcript reader below when stat metadata is unavailable.
}
const index = await readSessionTranscriptIndex(filePath);
const count = index?.entries.length ?? 0;
if (stat) {
setCachedTranscriptMessageCount(filePath, stat, count);
}
return count;
}
export function readRecentSessionMessagesWithStats(
@@ -400,6 +680,53 @@ export function readRecentSessionMessagesWithStats(
return { messages: messagesWithSeq, totalMessages };
}
export async function readRecentSessionMessagesAsync(
sessionId: string,
storePath: string | undefined,
sessionFile?: string,
opts?: ReadRecentSessionMessagesOptions,
): Promise<unknown[]> {
const maxMessages = Math.max(0, Math.floor(opts?.maxMessages ?? 0));
if (maxMessages === 0) {
return [];
}
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile);
if (!filePath) {
return [];
}
let stat: fs.Stats;
try {
stat = await fs.promises.stat(filePath);
} catch {
return [];
}
if (stat.size === 0) {
return [];
}
const lines = await readRecentTranscriptTailLinesAsync(filePath, stat, {
...opts,
maxMessages,
});
return parseRecentTranscriptTailMessages(lines, maxMessages);
}
export async function readRecentSessionMessagesWithStatsAsync(
sessionId: string,
storePath: string | undefined,
sessionFile: string | undefined,
opts: ReadRecentSessionMessagesOptions,
): Promise<ReadRecentSessionMessagesResult> {
const totalMessages = await readSessionMessageCountAsync(sessionId, storePath, sessionFile);
const messages = await readRecentSessionMessagesAsync(sessionId, storePath, sessionFile, opts);
const firstSeq = Math.max(1, totalMessages - messages.length + 1);
const messagesWithSeq = messages.map((message, index) =>
attachOpenClawTranscriptMeta(message, { seq: firstSeq + index }),
);
return { messages: messagesWithSeq, totalMessages };
}
export function readRecentSessionTranscriptLines(params: {
sessionId: string;
storePath: string | undefined;
@@ -479,6 +806,15 @@ function parsedSessionEntryToMessage(parsed: unknown, seq: number): unknown {
return null;
}
function indexedTranscriptEntryToMessage(entry: IndexedTranscriptEntry): unknown {
return parsedSessionEntryToMessage(entry.record, entry.seq);
}
function indexedTranscriptEntryToMessages(entry: IndexedTranscriptEntry): unknown[] {
const message = indexedTranscriptEntryToMessage(entry);
return message ? [message] : [];
}
export {
archiveFileOnDisk,
archiveSessionTranscripts,
@@ -584,6 +920,68 @@ export function readSessionTitleFieldsFromTranscript(
}
}
export async function readSessionTitleFieldsFromTranscriptAsync(
sessionId: string,
storePath: string | undefined,
sessionFile?: string,
agentId?: string,
opts?: { includeInterSession?: boolean },
): Promise<SessionTitleFields> {
const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile, agentId);
const filePath = candidates.find((p) => fs.existsSync(p));
if (!filePath) {
return { firstUserMessage: null, lastMessagePreview: null };
}
let stat: fs.Stats;
try {
stat = await fs.promises.stat(filePath);
} catch {
return { firstUserMessage: null, lastMessagePreview: null };
}
const cacheKey = readSessionTitleFieldsCacheKey(filePath, opts);
const cached = getCachedSessionTitleFields(cacheKey, stat);
if (cached) {
return cached;
}
const index = await readSessionTranscriptIndex(filePath);
if (!index) {
return { firstUserMessage: null, lastMessagePreview: null };
}
let firstUserMessage: string | null = null;
for (const entry of index.entries) {
const msg = entry.record.message as TranscriptMessage | undefined;
if (msg?.role !== "user") {
continue;
}
if (opts?.includeInterSession !== true && hasInterSessionUserProvenance(msg)) {
continue;
}
const text = extractTextFromContent(msg.content);
if (text) {
firstUserMessage = text;
break;
}
}
let lastMessagePreview: string | null = null;
for (const entry of index.entries.toReversed()) {
const msg = entry.record.message as TranscriptMessage | undefined;
if (!msg || (msg.role !== "user" && msg.role !== "assistant")) {
continue;
}
const text = extractTextFromContent(msg.content);
if (text) {
lastMessagePreview = text;
break;
}
}
const result = { firstUserMessage, lastMessagePreview };
setCachedSessionTitleFields(cacheKey, stat, result);
return result;
}
function extractTextFromContent(content: TranscriptMessage["content"]): string | null {
if (typeof content === "string") {
const normalized = stripInlineDirectiveTagsForDisplay(content).text.trim();
@@ -774,10 +1172,9 @@ function resolvePositiveUsageNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : undefined;
}
function extractLatestUsageFromTranscriptChunk(
chunk: string,
function extractLatestUsageFromTranscriptLines(
lines: Iterable<string>,
): SessionTranscriptUsageSnapshot | null {
const lines = chunk.split(/\r?\n/).filter((line) => line.trim().length > 0);
const snapshot: SessionTranscriptUsageSnapshot = {};
let sawSnapshot = false;
let inputTokens = 0;
@@ -898,6 +1295,14 @@ function extractLatestUsageFromTranscriptChunk(
return snapshot;
}
function extractLatestUsageFromTranscriptChunk(
chunk: string,
): SessionTranscriptUsageSnapshot | null {
return extractLatestUsageFromTranscriptLines(
chunk.split(/\r?\n/).filter((line) => line.trim().length > 0),
);
}
export function readLatestSessionUsageFromTranscript(
sessionId: string,
storePath: string | undefined,
@@ -919,6 +1324,34 @@ export function readLatestSessionUsageFromTranscript(
});
}
export async function readLatestSessionUsageFromTranscriptAsync(
sessionId: string,
storePath: string | undefined,
sessionFile?: string,
agentId?: string,
): Promise<SessionTranscriptUsageSnapshot | null> {
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile, agentId);
if (!filePath) {
return null;
}
try {
const stat = await fs.promises.stat(filePath);
if (stat.size === 0) {
return null;
}
const lines: string[] = [];
await visitTranscriptLinesAsync(filePath, (line) => {
if (line.trim()) {
lines.push(line);
}
});
return extractLatestUsageFromTranscriptLines(lines);
} catch {
return null;
}
}
export function readRecentSessionUsageFromTranscript(
sessionId: string,
storePath: string | undefined,

View File

@@ -86,8 +86,8 @@ import {
resolveStoredSessionKeyForAgentStore,
} from "./session-store-key.js";
import {
readLatestSessionUsageFromTranscript,
readRecentSessionUsageFromTranscript,
readSessionTitleFieldsFromTranscriptAsync,
readSessionTitleFieldsFromTranscript,
} from "./session-utils.fs.js";
import type {
@@ -106,14 +106,21 @@ export {
readFirstUserMessageFromTranscript,
readLastMessagePreviewFromTranscript,
readLatestSessionUsageFromTranscript,
readLatestSessionUsageFromTranscriptAsync,
readRecentSessionMessages,
readRecentSessionMessagesAsync,
readRecentSessionMessagesWithStatsAsync,
readRecentSessionMessagesWithStats,
readRecentSessionTranscriptLines,
readRecentSessionUsageFromTranscript,
readSessionMessageCountAsync,
readSessionMessageCount,
readSessionTitleFieldsFromTranscript,
readSessionTitleFieldsFromTranscriptAsync,
readSessionPreviewItemsFromTranscript,
readSessionMessagesAsync,
readSessionMessages,
visitSessionMessagesAsync,
visitSessionMessages,
resolveSessionTranscriptCandidates,
} from "./session-utils.fs.js";
@@ -476,21 +483,13 @@ function resolveTranscriptUsageFallback(params: {
const agentId = parsed?.agentId
? normalizeAgentId(parsed.agentId)
: resolveDefaultAgentId(params.cfg);
const snapshot =
typeof params.maxTranscriptBytes === "number"
? readRecentSessionUsageFromTranscript(
entry.sessionId,
params.storePath,
entry.sessionFile,
agentId,
params.maxTranscriptBytes,
)
: readLatestSessionUsageFromTranscript(
entry.sessionId,
params.storePath,
entry.sessionFile,
agentId,
);
const snapshot = readRecentSessionUsageFromTranscript(
entry.sessionId,
params.storePath,
entry.sessionFile,
agentId,
typeof params.maxTranscriptBytes === "number" ? params.maxTranscriptBytes : 256 * 1024,
);
if (!snapshot) {
return null;
}
@@ -1661,7 +1660,12 @@ function resolveSessionListSearchDisplayName(
export function loadGatewaySessionRow(
sessionKey: string,
options?: { includeDerivedTitles?: boolean; includeLastMessage?: boolean; now?: number },
options?: {
includeDerivedTitles?: boolean;
includeLastMessage?: boolean;
now?: number;
transcriptUsageMaxBytes?: number;
},
): GatewaySessionRow | null {
const { cfg, storePath, store, entry, canonicalKey } = loadSessionEntry(sessionKey);
if (!entry) {
@@ -1676,6 +1680,7 @@ export function loadGatewaySessionRow(
now: options?.now,
includeDerivedTitles: options?.includeDerivedTitles,
includeLastMessage: options?.includeLastMessage,
transcriptUsageMaxBytes: options?.transcriptUsageMaxBytes,
});
}
@@ -1861,21 +1866,42 @@ export async function listSessionsFromStoreAsync(params: {
for (let i = 0; i < entries.length; i++) {
const [key, entry] = entries[i];
const includeTranscriptFields = i < sessionListTranscriptFieldRows;
sessions.push(
buildGatewaySessionRow({
cfg,
const row = buildGatewaySessionRow({
cfg,
storePath,
store,
key,
entry,
modelCatalog: params.modelCatalog,
now,
includeDerivedTitles: false,
includeLastMessage: false,
transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes,
storeChildSessionsByKey,
});
if (
entry?.sessionId &&
includeTranscriptFields &&
(includeDerivedTitles || includeLastMessage)
) {
const parsed = parseAgentSessionKey(key);
const sessionAgentId = parsed?.agentId
? normalizeAgentId(parsed.agentId)
: resolveDefaultAgentId(cfg);
const fields = await readSessionTitleFieldsFromTranscriptAsync(
entry.sessionId,
storePath,
store,
key,
entry,
modelCatalog: params.modelCatalog,
now,
includeDerivedTitles: includeTranscriptFields && includeDerivedTitles,
includeLastMessage: includeTranscriptFields && includeLastMessage,
transcriptUsageMaxBytes: sessionListTranscriptUsageMaxBytes,
storeChildSessionsByKey,
}),
);
entry.sessionFile,
sessionAgentId,
);
if (includeDerivedTitles) {
row.derivedTitle = deriveSessionTitle(entry, fields.firstUserMessage);
}
if (includeLastMessage && fields.lastMessagePreview) {
row.lastMessagePreview = fields.lastMessagePreview;
}
}
sessions.push(row);
// Yield to the event loop between batches so WebSocket heartbeats,
// channel I/O, and concurrent RPC calls are not starved.
if ((i + 1) % SESSIONS_LIST_YIELD_BATCH_SIZE === 0 && i + 1 < entries.length) {

View File

@@ -91,7 +91,7 @@ vi.mock("./session-utils.js", () => ({
sessionId: "session-1",
sessionFile: "/tmp/session-1.jsonl",
}),
readSessionMessages: () => [],
readSessionMessagesAsync: async () => [],
resolveSessionTranscriptCandidates: () => ["/tmp/session-1.jsonl"],
}));
@@ -107,7 +107,7 @@ vi.mock("./session-history-state.js", () => ({
messageSeq: 1,
messageId,
}),
refresh: () => ({ items: [], nextCursor: null, messages: [] }),
refreshAsync: async () => ({ items: [], nextCursor: null, messages: [] }),
}),
},
}));

View File

@@ -31,8 +31,8 @@ import {
SessionHistorySseState,
} from "./session-history-state.js";
import {
readRecentSessionMessagesWithStats,
readSessionMessages,
readRecentSessionMessagesWithStatsAsync,
readSessionMessagesAsync,
resolveFreshestSessionEntryFromStoreKeys,
resolveGatewaySessionStoreTarget,
resolveSessionTranscriptCandidates,
@@ -156,7 +156,7 @@ export async function handleSessionHistoryHttpRequest(
: DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS;
const boundedSnapshot =
cursor === undefined && typeof limit === "number"
? readRecentSessionMessagesWithStats(
? await readRecentSessionMessagesWithStatsAsync(
entry.sessionId,
target.storePath,
entry.sessionFile,
@@ -168,7 +168,7 @@ export async function handleSessionHistoryHttpRequest(
const rawSnapshot =
boundedSnapshot?.messages ??
(entry?.sessionId
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
? await readSessionMessagesAsync(entry.sessionId, target.storePath, entry.sessionFile)
: []);
const historySnapshot = buildSessionHistorySnapshot({
rawMessages: rawSnapshot,
@@ -338,7 +338,7 @@ export async function handleSessionHistoryHttpRequest(
return;
}
}
sentHistory = sseState.refresh();
sentHistory = await sseState.refreshAsync();
sseWrite(res, "history", {
sessionKey: target.canonicalKey,
...sentHistory,

View File

@@ -33,7 +33,7 @@ import {
type SessionScope,
} from "../config/sessions.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { readLatestSessionUsageFromTranscript } from "../gateway/session-utils.fs.js";
import { readRecentSessionUsageFromTranscript } from "../gateway/session-utils.fs.js";
import { formatTimeAgo } from "../infra/format-time/format-relative.ts";
import { resolveCommitHash } from "../infra/git-commit.js";
import {
@@ -325,11 +325,12 @@ const readUsageFromSessionLog = (
}
try {
const snapshot = readLatestSessionUsageFromTranscript(
const snapshot = readRecentSessionUsageFromTranscript(
sessionId,
storePath,
sessionEntry?.sessionFile,
agentId ?? (sessionKey ? resolveAgentIdFromSessionKey(sessionKey) : undefined),
256 * 1024,
);
if (!snapshot) {
return undefined;

View File

@@ -72,7 +72,7 @@ vi.mock("../gateway/server-methods/chat.js", () => ({
vi.mock("../gateway/session-utils.js", () => ({
listAgentsForGateway: () => [],
listSessionsFromStore: () => ({ sessions: [] }),
listSessionsFromStoreAsync: async () => ({ sessions: [] }),
loadCombinedSessionStoreForGateway: () => ({
storePath: "/tmp/openclaw-sessions.json",
store: {},
@@ -83,7 +83,7 @@ vi.mock("../gateway/session-utils.js", () => ({
entry: {},
}),
migrateAndPruneGatewaySessionStoreKey: ({ key }: { key: string }) => ({ primaryKey: key }),
readSessionMessages: () => [],
readSessionMessagesAsync: async () => [],
resolveGatewaySessionStoreTarget: ({ key }: { key: string }) => ({
canonicalKey: key,
storePath: "/tmp/openclaw-sessions.json",

View File

@@ -34,13 +34,13 @@ import { performGatewaySessionReset } from "../gateway/session-reset-service.js"
import { capArrayByJsonBytes } from "../gateway/session-utils.fs.js";
import {
listAgentsForGateway,
listSessionsFromStore,
listSessionsFromStoreAsync,
loadCombinedSessionStoreForGateway,
loadSessionEntry,
migrateAndPruneGatewaySessionStoreKey,
resolveGatewaySessionStoreTarget,
resolveSessionModelRef,
readSessionMessages,
readSessionMessagesAsync,
} from "../gateway/session-utils.js";
import { applySessionsPatchToStore } from "../gateway/sessions-patch.js";
import { type AgentEventPayload, onAgentEvent } from "../infra/agent-events.js";
@@ -197,7 +197,9 @@ export class EmbeddedTuiBackend implements TuiBackend {
const sessionAgentId = resolveSessionAgentId({ sessionKey: opts.sessionKey, config: cfg });
const resolvedSessionModel = resolveSessionModelRef(cfg, entry, sessionAgentId);
const localMessages =
sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : [];
sessionId && storePath
? await readSessionMessagesAsync(sessionId, storePath, entry?.sessionFile)
: [];
const rawMessages = augmentChatHistoryWithCliSessionImports({
entry,
provider: resolvedSessionModel.provider,
@@ -245,12 +247,12 @@ export class EmbeddedTuiBackend implements TuiBackend {
async listSessions(opts?: Parameters<TuiBackend["listSessions"]>[0]): Promise<TuiSessionList> {
const cfg = getRuntimeConfig();
const { storePath, store } = loadCombinedSessionStoreForGateway(cfg);
return listSessionsFromStore({
return (await listSessionsFromStoreAsync({
cfg,
storePath,
store,
opts: opts ?? {},
}) as TuiSessionList;
})) as TuiSessionList;
}
async listAgents(): Promise<TuiAgentsList> {