mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:40:44 +00:00
perf: bound async transcript history reads (#75977)
Summary: - The PR bounds async transcript history reads and shares async transcript-index builds across gateway history, embedded/TUI history, restart recovery, fork token checks, and preflight compaction paths. - Reproducibility: not applicable. This is a performance PR rather than a user bug report. The verification pa ... ource review plus the added unit coverage for bounded reads, usage snapshots, and concurrent index sharing. ClawSweeper fixups: - No separate fixup commits were needed after automerge opt-in. Validation: - ClawSweeper review passed for head ccfe33658c. - Required merge gates passed before the squash merge. Prepared head SHA: ccfe33658c. Review: https://github.com/openclaw/openclaw/pull/75977#issuecomment-4363170293 Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
committed by
GitHub
parent
3ec5afb09c
commit
4d9c658f40
@@ -47,6 +47,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugins/ClawHub: preserve official source-linked trust through archive installs, so OpenClaw can install trusted ClawHub plugin packages that trigger the built-in dangerous-pattern scanner. Thanks @vincentkoc.
|
||||
- Plugins/ClawHub: install package runtime dependencies for archive-backed plugin installs, so ClawHub packages such as WhatsApp load declared dependencies after download. Thanks @vincentkoc.
|
||||
- Providers/LM Studio: allow `models.providers.lmstudio.params.preload: false` to skip OpenClaw's native model-load call so LM Studio JIT loading, idle TTL, and auto-evict can own model lifecycle. Fixes #75921. Thanks @garyd9.
|
||||
- Agents/transcripts: keep chat history, restart recovery, fork token checks, and stale-token compaction checks on bounded async transcript reads or cached async indexes instead of reparsing large session files. Thanks @mariozechner.
|
||||
- Telegram: inherit the process DNS result order for Bot API transport and downgrade recovered sticky IPv4 fallback promotions to debug logs, while keeping pinned-IP escalation warnings visible. Fixes #75904. Thanks @highfly-hi and @neeravmakwana.
|
||||
- Sessions: keep durable external conversation pointers, including group and thread-scoped chat sessions, out of age, count, and disk-budget maintenance eviction while still allowing synthetic runtime entries to age out. Fixes #58088. Thanks @drinkflav.
|
||||
- Web search/MiniMax: allow `MINIMAX_OAUTH_TOKEN` to satisfy MiniMax Search credentials, so OAuth-authorized MiniMax Token Plan setups do not need a separate web-search key. Fixes #65768. Thanks @kikibrian and @zhouhe-xydt.
|
||||
|
||||
@@ -54,6 +54,12 @@ OpenClaw persists sessions in two layers:
|
||||
transcript exceeds the checkpoint size cap, avoiding a second giant
|
||||
`.checkpoint.*.jsonl` copy.
|
||||
|
||||
Gateway history readers should avoid materializing the whole transcript unless
|
||||
the surface explicitly needs arbitrary historical access. First-page history,
|
||||
embedded chat history, restart recovery, and token/usage checks use bounded tail
|
||||
reads. Full transcript scans go through the async transcript index, which is
|
||||
cached by file path plus `mtimeMs`/`size` and shared across concurrent readers.
|
||||
|
||||
---
|
||||
|
||||
## On-disk locations
|
||||
|
||||
@@ -230,6 +230,11 @@ async function recoverStore(params: {
|
||||
entry.sessionId,
|
||||
params.storePath,
|
||||
entry.sessionFile,
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 20,
|
||||
maxBytes: 256 * 1024,
|
||||
},
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn(`failed to read transcript for ${sessionKey}: ${String(err)}`);
|
||||
|
||||
@@ -354,6 +354,11 @@ export async function recoverOrphanedSubagentSessions(params: {
|
||||
entry.sessionId,
|
||||
storePath,
|
||||
entry.sessionFile,
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 200,
|
||||
maxBytes: 1024 * 1024,
|
||||
},
|
||||
);
|
||||
const lastHumanMessage = [...messages]
|
||||
.toReversed()
|
||||
|
||||
@@ -91,10 +91,20 @@ describe("embedded gateway stub", () => {
|
||||
maxChars: 100_000,
|
||||
maxMessages: 200,
|
||||
});
|
||||
expect(runtime.readSessionMessagesAsync).toHaveBeenCalledWith(
|
||||
"sess-main",
|
||||
"/tmp/openclaw-sessions.json",
|
||||
undefined,
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 200,
|
||||
maxBytes: 1024 * 1024,
|
||||
},
|
||||
);
|
||||
expect(result.messages).toEqual(projectedMessages);
|
||||
});
|
||||
|
||||
it("passes the full raw history to projection before limiting visible messages", async () => {
|
||||
it("passes the requested recent history window to projection", async () => {
|
||||
const rawMessages = [
|
||||
{ role: "user", content: "visible older" },
|
||||
{ role: "assistant", content: "hidden newer" },
|
||||
@@ -111,5 +121,15 @@ describe("embedded gateway stub", () => {
|
||||
maxChars: 100_000,
|
||||
maxMessages: 1,
|
||||
});
|
||||
expect(runtime.readSessionMessagesAsync).toHaveBeenCalledWith(
|
||||
"sess-main",
|
||||
"/tmp/openclaw-sessions.json",
|
||||
undefined,
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 1,
|
||||
maxBytes: 1024 * 1024,
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import type { CallGatewayOptions } from "../../gateway/call.js";
|
||||
import type { SessionsListParams, SessionsResolveParams } from "../../gateway/protocol/index.js";
|
||||
import type { ReadSessionMessagesAsyncOptions } from "../../gateway/session-utils.fs.js";
|
||||
import type { SessionsListResult } from "../../gateway/session-utils.types.js";
|
||||
import type { SessionsResolveResult } from "../../gateway/sessions-resolve.js";
|
||||
|
||||
@@ -52,7 +53,8 @@ interface EmbeddedGatewayRuntime {
|
||||
readSessionMessagesAsync: (
|
||||
sessionId: string,
|
||||
storePath: string,
|
||||
sessionFile?: string,
|
||||
sessionFile: string | undefined,
|
||||
opts: ReadSessionMessagesAsyncOptions,
|
||||
) => Promise<unknown[]>;
|
||||
resolveSessionModelRef: (
|
||||
cfg: OpenClawConfig,
|
||||
@@ -112,6 +114,11 @@ async function handleChatHistory(params: Record<string, unknown>): Promise<{
|
||||
const sessionId = entry?.sessionId as string | undefined;
|
||||
const sessionAgentId = rt.resolveSessionAgentId({ sessionKey, config: cfg });
|
||||
const resolvedSessionModel = rt.resolveSessionModelRef(cfg, entry, sessionAgentId);
|
||||
const hardMax = 1000;
|
||||
const defaultLimit = 200;
|
||||
const requested = typeof limit === "number" ? limit : defaultLimit;
|
||||
const max = Math.min(hardMax, requested);
|
||||
const maxHistoryBytes = rt.getMaxChatHistoryMessagesBytes();
|
||||
|
||||
const localMessages =
|
||||
sessionId && storePath
|
||||
@@ -119,6 +126,11 @@ async function handleChatHistory(params: Record<string, unknown>): Promise<{
|
||||
sessionId,
|
||||
storePath,
|
||||
entry?.sessionFile as string | undefined,
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: max,
|
||||
maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024),
|
||||
},
|
||||
)
|
||||
: [];
|
||||
|
||||
@@ -128,10 +140,6 @@ async function handleChatHistory(params: Record<string, unknown>): Promise<{
|
||||
localMessages,
|
||||
});
|
||||
|
||||
const hardMax = 1000;
|
||||
const defaultLimit = 200;
|
||||
const requested = typeof limit === "number" ? limit : defaultLimit;
|
||||
const max = Math.min(hardMax, requested);
|
||||
const effectiveMaxChars = rt.resolveEffectiveChatHistoryMaxChars(cfg);
|
||||
|
||||
const normalized = rt.augmentChatHistoryWithCanvasBlocks(
|
||||
@@ -141,7 +149,6 @@ async function handleChatHistory(params: Record<string, unknown>): Promise<{
|
||||
}),
|
||||
);
|
||||
|
||||
const maxHistoryBytes = rt.getMaxChatHistoryMessagesBytes();
|
||||
const perMessageHardCap = Math.min(rt.CHAT_HISTORY_MAX_SINGLE_MESSAGE_BYTES, maxHistoryBytes);
|
||||
const replaced = rt.replaceOversizedChatHistoryMessages({
|
||||
messages: normalized,
|
||||
|
||||
@@ -429,6 +429,168 @@ describe("runMemoryFlushIfNeeded", () => {
|
||||
});
|
||||
});
|
||||
|
||||
// Regression coverage for bounded preflight reads: the latest transcript usage
// record carries both input and output tokens, and the compaction projection
// must combine them (90k + 10k) so the 100k context window counts as reached.
it("includes recent output tokens when deciding preflight compaction", async () => {
  const sessionFile = path.join(rootDir, "session-usage.jsonl");
  // Single-line JSONL transcript whose usage snapshot totals 100k tokens.
  await fs.writeFile(
    sessionFile,
    `${JSON.stringify({
      message: {
        role: "assistant",
        content: "large answer",
        usage: { input: 90_000, output: 10_000 },
      },
    })}\n`,
    "utf8",
  );
  // Huge byte threshold keeps the byte-size trigger inert; only the token
  // projection should drive compaction in this test.
  registerMemoryFlushPlanResolver(() => ({
    softThresholdTokens: 4_000,
    forceFlushTranscriptBytes: 1_000_000_000,
    reserveTokensFloor: 0,
    prompt: "Pre-compaction memory flush.\nNO_REPLY",
    systemPrompt: "Write memory to memory/YYYY-MM-DD.md.",
    relativePath: "memory/2023-11-14.md",
  }));
  // totalTokensFresh: false forces the transcript-derived estimate path.
  const sessionEntry: SessionEntry = {
    sessionId: "session",
    sessionFile,
    updatedAt: Date.now(),
    totalTokensFresh: false,
  };

  await runPreflightCompactionIfNeeded({
    cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
    followupRun: createTestFollowupRun({
      sessionId: "session",
      sessionFile,
      sessionKey: "main",
    }),
    defaultModel: "anthropic/claude-opus-4-6",
    agentCfgContextTokens: 100_000,
    sessionEntry,
    sessionStore: { main: sessionEntry },
    sessionKey: "main",
    storePath: path.join(rootDir, "sessions.json"),
    isHeartbeat: false,
    replyOperation: createReplyOperation(),
  });

  const compactCall = compactEmbeddedPiSessionMock.mock.calls[0]?.[0] as {
    currentTokenCount?: number;
  };
  // 90k input + 10k output must meet or exceed the 100k context size.
  expect(compactCall.currentTokenCount).toBeGreaterThanOrEqual(100_000);
});
|
||||
|
||||
// When the stored session entry has no transcript path, preflight compaction
// must fall back to the active run's sessionFile both for the usage read and
// for the compaction call itself.
it("uses the active run sessionFile when the session entry has no transcript path", async () => {
  const sessionFile = path.join(rootDir, "active-run-session.jsonl");
  // Transcript exists only at the run-provided path; usage totals 98k tokens.
  await fs.writeFile(
    sessionFile,
    `${JSON.stringify({
      message: {
        role: "assistant",
        content: "large answer",
        usage: { input: 90_000, output: 8_000 },
      },
    })}\n`,
    "utf8",
  );
  registerMemoryFlushPlanResolver(() => ({
    softThresholdTokens: 4_000,
    forceFlushTranscriptBytes: 1_000_000_000,
    reserveTokensFloor: 0,
    prompt: "Pre-compaction memory flush.\nNO_REPLY",
    systemPrompt: "Write memory to memory/YYYY-MM-DD.md.",
    relativePath: "memory/2023-11-14.md",
  }));
  // Deliberately no sessionFile on the entry — only the followup run has one.
  const sessionEntry: SessionEntry = {
    sessionId: "session",
    updatedAt: Date.now(),
    totalTokensFresh: false,
  };

  await runPreflightCompactionIfNeeded({
    cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
    followupRun: createTestFollowupRun({
      sessionId: "session",
      sessionFile,
      sessionKey: "main",
    }),
    defaultModel: "anthropic/claude-opus-4-6",
    agentCfgContextTokens: 100_000,
    sessionEntry,
    sessionStore: { main: sessionEntry },
    sessionKey: "main",
    storePath: path.join(rootDir, "sessions.json"),
    isHeartbeat: false,
    replyOperation: createReplyOperation(),
  });

  // Compaction must be invoked against the run's transcript path.
  expect(compactEmbeddedPiSessionMock).toHaveBeenCalledWith(
    expect.objectContaining({
      sessionId: "session",
      sessionFile: expect.stringContaining("active-run-session.jsonl"),
    }),
  );
});
|
||||
|
||||
// A large tool result appended after the last usage record is invisible to
// that usage snapshot; the byte-based fallback estimate must keep the
// projection conservative (above the 100k context) so compaction still fires.
it("keeps preflight compaction conservative for content appended after latest usage", async () => {
  const sessionFile = path.join(rootDir, "post-usage-tail-session.jsonl");
  await fs.writeFile(
    sessionFile,
    [
      JSON.stringify({
        message: {
          role: "assistant",
          content: "small answer",
          usage: { input: 40_000, output: 2_000 },
        },
      }),
      // ~450KB appended after the usage line — not covered by the snapshot.
      JSON.stringify({
        message: {
          role: "tool",
          content: `large interrupted tool output ${"x".repeat(450_000)}`,
        },
      }),
    ].join("\n"),
    "utf8",
  );
  registerMemoryFlushPlanResolver(() => ({
    softThresholdTokens: 4_000,
    forceFlushTranscriptBytes: 1_000_000_000,
    reserveTokensFloor: 0,
    prompt: "Pre-compaction memory flush.\nNO_REPLY",
    systemPrompt: "Write memory to memory/YYYY-MM-DD.md.",
    relativePath: "memory/2023-11-14.md",
  }));
  const sessionEntry: SessionEntry = {
    sessionId: "session",
    sessionFile,
    updatedAt: Date.now(),
    totalTokensFresh: false,
  };

  await runPreflightCompactionIfNeeded({
    cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
    followupRun: createTestFollowupRun({
      sessionId: "session",
      sessionFile,
      sessionKey: "main",
    }),
    defaultModel: "anthropic/claude-opus-4-6",
    agentCfgContextTokens: 100_000,
    sessionEntry,
    sessionStore: { main: sessionEntry },
    sessionKey: "main",
    storePath: path.join(rootDir, "sessions.json"),
    isHeartbeat: false,
    replyOperation: createReplyOperation(),
  });

  const compactCall = compactEmbeddedPiSessionMock.mock.calls[0]?.[0] as {
    currentTokenCount?: number;
  };
  // The byte estimate over the ~450KB tail must dominate the stale 42k usage
  // total and push the projection past the 100k context.
  expect(compactCall.currentTokenCount).toBeGreaterThan(100_000);
});
|
||||
|
||||
it("triggers preflight compaction when the active transcript exceeds the configured byte threshold", async () => {
|
||||
const sessionFile = path.join(rootDir, "large-session.jsonl");
|
||||
await fs.writeFile(
|
||||
|
||||
@@ -165,6 +165,7 @@ export type SessionTranscriptUsageSnapshot = {
|
||||
// transcript reads in time to flip memory-flush gating when needed.
|
||||
const TRANSCRIPT_OUTPUT_READ_BUFFER_TOKENS = 8192;
|
||||
const TRANSCRIPT_TAIL_CHUNK_BYTES = 64 * 1024;
|
||||
const FALLBACK_TRANSCRIPT_BYTES_PER_TOKEN = 4;
|
||||
|
||||
function parseUsageFromTranscriptLine(line: string): ReturnType<typeof normalizeUsage> | undefined {
|
||||
const trimmed = line.trim();
|
||||
@@ -341,20 +342,64 @@ async function readLastNonzeroUsageFromSessionLog(logPath: string) {
|
||||
}
|
||||
}
|
||||
|
||||
type TranscriptTokenEstimate = {
|
||||
promptTokens: number;
|
||||
outputTokens?: number;
|
||||
transcriptBytesTokens?: number;
|
||||
};
|
||||
|
||||
async function estimatePromptTokensFromSessionTranscript(params: {
|
||||
sessionId?: string;
|
||||
storePath?: string;
|
||||
sessionEntry?: SessionEntry;
|
||||
sessionKey?: string;
|
||||
sessionFile?: string;
|
||||
}): Promise<number | undefined> {
|
||||
storePath?: string;
|
||||
}): Promise<TranscriptTokenEstimate | undefined> {
|
||||
const sessionId = normalizeOptionalString(params.sessionId);
|
||||
if (!sessionId) {
|
||||
return undefined;
|
||||
}
|
||||
const fallbackSessionFile = normalizeOptionalString(params.sessionFile);
|
||||
const sessionEntryForTranscript =
|
||||
params.sessionEntry?.sessionFile || !fallbackSessionFile
|
||||
? params.sessionEntry
|
||||
: ({ ...params.sessionEntry, sessionFile: fallbackSessionFile } as SessionEntry);
|
||||
try {
|
||||
const snapshot = await readSessionLogSnapshot({
|
||||
sessionId,
|
||||
sessionEntry: sessionEntryForTranscript,
|
||||
sessionKey: params.sessionKey,
|
||||
opts: { storePath: params.storePath },
|
||||
includeByteSize: true,
|
||||
includeUsage: true,
|
||||
});
|
||||
const transcriptBytesTokens =
|
||||
typeof snapshot.byteSize === "number" &&
|
||||
Number.isFinite(snapshot.byteSize) &&
|
||||
snapshot.byteSize > 0
|
||||
? Math.ceil(snapshot.byteSize / FALLBACK_TRANSCRIPT_BYTES_PER_TOKEN)
|
||||
: undefined;
|
||||
const promptTokens = snapshot.usage?.promptTokens;
|
||||
if (typeof promptTokens === "number" && Number.isFinite(promptTokens) && promptTokens > 0) {
|
||||
const outputTokens = snapshot.usage?.outputTokens;
|
||||
return {
|
||||
promptTokens: Math.ceil(promptTokens),
|
||||
outputTokens:
|
||||
typeof outputTokens === "number" && Number.isFinite(outputTokens) && outputTokens > 0
|
||||
? Math.ceil(outputTokens)
|
||||
: undefined,
|
||||
transcriptBytesTokens,
|
||||
};
|
||||
}
|
||||
const messages = (await readSessionMessagesAsync(
|
||||
sessionId,
|
||||
params.storePath,
|
||||
params.sessionFile,
|
||||
sessionEntryForTranscript?.sessionFile,
|
||||
{
|
||||
mode: "recent",
|
||||
maxMessages: 200,
|
||||
maxBytes: 1024 * 1024,
|
||||
},
|
||||
)) as AgentMessage[];
|
||||
if (messages.length === 0) {
|
||||
return undefined;
|
||||
@@ -363,7 +408,10 @@ async function estimatePromptTokensFromSessionTranscript(params: {
|
||||
if (!Number.isFinite(estimatedTokens) || estimatedTokens <= 0) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.ceil(estimatedTokens);
|
||||
return {
|
||||
promptTokens: Math.ceil(estimatedTokens),
|
||||
transcriptBytesTokens,
|
||||
};
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
@@ -422,7 +470,10 @@ export async function runPreflightCompactionIfNeeded(params: {
|
||||
const transcriptSizeSnapshot = shouldCheckActiveTranscriptBytes
|
||||
? await readSessionLogSnapshot({
|
||||
sessionId: entry.sessionId,
|
||||
sessionEntry: entry,
|
||||
sessionEntry:
|
||||
entry.sessionFile || !params.followupRun.run.sessionFile
|
||||
? entry
|
||||
: { ...entry, sessionFile: params.followupRun.run.sessionFile },
|
||||
sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey,
|
||||
opts: { storePath: params.storePath },
|
||||
includeByteSize: true,
|
||||
@@ -441,22 +492,44 @@ export async function runPreflightCompactionIfNeeded(params: {
|
||||
const promptTokenEstimate = estimatePromptTokensForMemoryFlush(
|
||||
params.promptForEstimate ?? params.followupRun.prompt,
|
||||
);
|
||||
const transcriptPromptTokens =
|
||||
const transcriptUsageTokens =
|
||||
typeof freshPersistedTokens === "number"
|
||||
? undefined
|
||||
: await estimatePromptTokensFromSessionTranscript({
|
||||
sessionId: entry.sessionId,
|
||||
storePath: params.storePath,
|
||||
sessionEntry: entry,
|
||||
sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey,
|
||||
sessionFile: entry.sessionFile ?? params.followupRun.run.sessionFile,
|
||||
storePath: params.storePath,
|
||||
});
|
||||
const projectedTokenCount =
|
||||
typeof transcriptPromptTokens === "number"
|
||||
? resolveEffectivePromptTokens(transcriptPromptTokens, undefined, promptTokenEstimate)
|
||||
const stalePersistedPromptTokens = hasPersistedTotalTokens
|
||||
? Math.floor(persistedTotalTokens)
|
||||
: undefined;
|
||||
const transcriptPromptTokens = transcriptUsageTokens?.promptTokens;
|
||||
const transcriptOutputTokens = transcriptUsageTokens?.outputTokens;
|
||||
const transcriptBytesProjectedTokens =
|
||||
typeof transcriptUsageTokens?.transcriptBytesTokens === "number"
|
||||
? resolveEffectivePromptTokens(
|
||||
transcriptUsageTokens.transcriptBytesTokens,
|
||||
undefined,
|
||||
promptTokenEstimate,
|
||||
)
|
||||
: undefined;
|
||||
const usageProjectedTokenCount =
|
||||
typeof transcriptPromptTokens === "number"
|
||||
? resolveEffectivePromptTokens(
|
||||
transcriptPromptTokens,
|
||||
transcriptOutputTokens,
|
||||
promptTokenEstimate,
|
||||
)
|
||||
: undefined;
|
||||
const projectedTokenCount = Math.max(
|
||||
usageProjectedTokenCount ?? 0,
|
||||
transcriptBytesProjectedTokens ?? 0,
|
||||
stalePersistedPromptTokens ?? 0,
|
||||
);
|
||||
const tokenCountForCompaction =
|
||||
typeof projectedTokenCount === "number" &&
|
||||
Number.isFinite(projectedTokenCount) &&
|
||||
projectedTokenCount > 0
|
||||
Number.isFinite(projectedTokenCount) && projectedTokenCount > 0
|
||||
? projectedTokenCount
|
||||
: undefined;
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ afterEach(async () => {
|
||||
});
|
||||
|
||||
describe("resolveParentForkTokenCountRuntime", () => {
|
||||
it("falls back to transcript-estimated tokens when cached totals are stale", async () => {
|
||||
it("falls back to recent transcript usage when cached totals are stale", async () => {
|
||||
const root = await makeRoot("openclaw-parent-fork-token-estimate-");
|
||||
const sessionsDir = path.join(root, "sessions");
|
||||
await fs.mkdir(sessionsDir);
|
||||
@@ -38,7 +38,7 @@ describe("resolveParentForkTokenCountRuntime", () => {
|
||||
}),
|
||||
];
|
||||
for (let index = 0; index < 40; index += 1) {
|
||||
const body = `turn-${index} ${"x".repeat(12_000)}`;
|
||||
const body = `turn-${index} ${"x".repeat(200)}`;
|
||||
lines.push(
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
@@ -52,7 +52,11 @@ describe("resolveParentForkTokenCountRuntime", () => {
|
||||
id: `a${index}`,
|
||||
parentId: `u${index}`,
|
||||
timestamp: new Date().toISOString(),
|
||||
message: { role: "assistant", content: body },
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: body,
|
||||
usage: index === 39 ? { input: 90_000, output: 20_000 } : undefined,
|
||||
},
|
||||
}),
|
||||
);
|
||||
}
|
||||
@@ -71,6 +75,149 @@ describe("resolveParentForkTokenCountRuntime", () => {
|
||||
storePath: path.join(root, "sessions.json"),
|
||||
});
|
||||
|
||||
expect(tokens).toBe(110_000);
|
||||
});
|
||||
|
||||
// With no usage records at all in a stale parent transcript, fork token checks
// must fall back to a byte-size estimate rather than reparsing the whole file.
it("falls back to a conservative byte estimate when stale parent transcript has no usage", async () => {
  const root = await makeRoot("openclaw-parent-fork-byte-estimate-");
  const sessionsDir = path.join(root, "sessions");
  await fs.mkdir(sessionsDir);

  const sessionId = "parent-no-usage-transcript";
  const sessionFile = path.join(sessionsDir, "parent.jsonl");
  // Session header followed by 24 user messages of ~24KB each, no usage data.
  const lines = [
    JSON.stringify({
      type: "session",
      version: 3,
      id: sessionId,
      timestamp: new Date().toISOString(),
      cwd: process.cwd(),
    }),
  ];
  for (let index = 0; index < 24; index += 1) {
    lines.push(
      JSON.stringify({
        type: "message",
        id: `u${index}`,
        parentId: index === 0 ? null : `a${index - 1}`,
        timestamp: new Date().toISOString(),
        message: { role: "user", content: `turn-${index} ${"x".repeat(24_000)}` },
      }),
    );
  }
  await fs.writeFile(sessionFile, `${lines.join("\n")}\n`, "utf-8");

  // totalTokensFresh: false means cached totals cannot be trusted.
  const entry: SessionEntry = {
    sessionId,
    sessionFile,
    updatedAt: Date.now(),
    totalTokensFresh: false,
  };

  const tokens = await resolveParentForkTokenCountRuntime({
    parentEntry: entry,
    storePath: path.join(root, "sessions.json"),
  });

  // ~576KB of transcript bytes should estimate to well over 100k tokens.
  expect(tokens).toBeGreaterThan(100_000);
});
|
||||
|
||||
// Multiple usage records: the parent fork check should use the most recent
// snapshot (70k input + 8k output) rather than aggregating older usage lines.
it("uses the latest usage snapshot instead of tail aggregates for parent fork checks", async () => {
  const root = await makeRoot("openclaw-parent-fork-latest-usage-");
  const sessionsDir = path.join(root, "sessions");
  await fs.mkdir(sessionsDir);

  const sessionId = "parent-multiple-usage-transcript";
  const sessionFile = path.join(sessionsDir, "parent.jsonl");
  await fs.writeFile(
    sessionFile,
    [
      JSON.stringify({
        type: "session",
        version: 3,
        id: sessionId,
        timestamp: new Date().toISOString(),
        cwd: process.cwd(),
      }),
      // Older usage record — must be ignored by the latest-snapshot logic.
      JSON.stringify({
        message: {
          role: "assistant",
          content: "older",
          usage: { input: 60_000, output: 5_000 },
        },
      }),
      JSON.stringify({
        message: {
          role: "assistant",
          content: "latest",
          usage: { input: 70_000, output: 8_000 },
        },
      }),
    ].join("\n"),
    "utf-8",
  );

  const entry: SessionEntry = {
    sessionId,
    sessionFile,
    updatedAt: Date.now(),
    totalTokensFresh: false,
  };

  const tokens = await resolveParentForkTokenCountRuntime({
    parentEntry: entry,
    storePath: path.join(root, "sessions.json"),
  });

  // 70k + 8k from the latest record only, not a sum across both records.
  expect(tokens).toBe(78_000);
});
|
||||
|
||||
// Content appended after the latest usage record (here a ~450KB tool result)
// is not reflected in that usage; the fork check must stay conservative via
// the byte-size fallback and report more than the usage-derived 42k tokens.
it("keeps parent fork checks conservative for content appended after latest usage", async () => {
  const root = await makeRoot("openclaw-parent-fork-post-usage-tail-");
  const sessionsDir = path.join(root, "sessions");
  await fs.mkdir(sessionsDir);

  const sessionId = "parent-post-usage-tail";
  const sessionFile = path.join(sessionsDir, "parent.jsonl");
  await fs.writeFile(
    sessionFile,
    [
      JSON.stringify({
        type: "session",
        version: 3,
        id: sessionId,
        timestamp: new Date().toISOString(),
        cwd: process.cwd(),
      }),
      JSON.stringify({
        message: {
          role: "assistant",
          content: "latest model call",
          usage: { input: 40_000, output: 2_000 },
        },
      }),
      // Large tail appended after the usage line.
      JSON.stringify({
        message: {
          role: "tool",
          content: `large appended tool result ${"x".repeat(450_000)}`,
        },
      }),
    ].join("\n"),
    "utf-8",
  );

  const entry: SessionEntry = {
    sessionId,
    sessionFile,
    updatedAt: Date.now(),
    totalTokensFresh: false,
  };

  const tokens = await resolveParentForkTokenCountRuntime({
    parentEntry: entry,
    storePath: path.join(root, "sessions.json"),
  });

  // Byte estimate over the appended tail must exceed the 100k context size.
  expect(tokens).toBeGreaterThan(100_000);
});
|
||||
});
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import {
|
||||
CURRENT_SESSION_VERSION,
|
||||
migrateSessionEntries,
|
||||
@@ -10,13 +9,16 @@ import {
|
||||
type SessionEntry as PiSessionEntry,
|
||||
type SessionHeader,
|
||||
} from "@mariozechner/pi-coding-agent";
|
||||
import { estimateMessagesTokens } from "../../agents/compaction.js";
|
||||
import { resolveSessionFilePath } from "../../config/sessions/paths.js";
|
||||
import { derivePromptTokens } from "../../agents/usage.js";
|
||||
import {
|
||||
resolveSessionFilePath,
|
||||
resolveSessionFilePathOptions,
|
||||
} from "../../config/sessions/paths.js";
|
||||
import {
|
||||
resolveFreshSessionTotalTokens,
|
||||
type SessionEntry as StoreSessionEntry,
|
||||
} from "../../config/sessions/types.js";
|
||||
import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js";
|
||||
import { readLatestRecentSessionUsageFromTranscriptAsync } from "../../gateway/session-utils.fs.js";
|
||||
|
||||
type ForkSourceTranscript = {
|
||||
cwd: string;
|
||||
@@ -26,12 +28,42 @@ type ForkSourceTranscript = {
|
||||
labelsToWrite: Array<{ targetId: string; label: string; timestamp: string }>;
|
||||
};
|
||||
|
||||
const FALLBACK_TRANSCRIPT_BYTES_PER_TOKEN = 4;
|
||||
|
||||
function resolvePositiveTokenCount(value: number | undefined): number | undefined {
|
||||
return typeof value === "number" && Number.isFinite(value) && value > 0
|
||||
? Math.floor(value)
|
||||
: undefined;
|
||||
}
|
||||
|
||||
function maxPositiveTokenCount(...values: Array<number | undefined>): number | undefined {
|
||||
let max: number | undefined;
|
||||
for (const value of values) {
|
||||
const normalized = resolvePositiveTokenCount(value);
|
||||
if (typeof normalized === "number" && (max === undefined || normalized > max)) {
|
||||
max = normalized;
|
||||
}
|
||||
}
|
||||
return max;
|
||||
}
|
||||
|
||||
async function estimateParentTranscriptTokensFromBytes(params: {
|
||||
parentEntry: StoreSessionEntry;
|
||||
storePath: string;
|
||||
}): Promise<number | undefined> {
|
||||
try {
|
||||
const filePath = resolveSessionFilePath(
|
||||
params.parentEntry.sessionId,
|
||||
params.parentEntry,
|
||||
resolveSessionFilePathOptions({ storePath: params.storePath }),
|
||||
);
|
||||
const stat = await fs.stat(filePath);
|
||||
return resolvePositiveTokenCount(Math.ceil(stat.size / FALLBACK_TRANSCRIPT_BYTES_PER_TOKEN));
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
export async function resolveParentForkTokenCountRuntime(params: {
|
||||
parentEntry: StoreSessionEntry;
|
||||
storePath: string;
|
||||
@@ -41,26 +73,36 @@ export async function resolveParentForkTokenCountRuntime(params: {
|
||||
return freshPersistedTokens;
|
||||
}
|
||||
|
||||
const cachedTokens = resolvePositiveTokenCount(params.parentEntry.totalTokens);
|
||||
const byteEstimateTokens = await estimateParentTranscriptTokensFromBytes(params);
|
||||
try {
|
||||
const transcriptMessages = (await readSessionMessagesAsync(
|
||||
const usage = await readLatestRecentSessionUsageFromTranscriptAsync(
|
||||
params.parentEntry.sessionId,
|
||||
params.storePath,
|
||||
params.parentEntry.sessionFile,
|
||||
)) as AgentMessage[];
|
||||
if (transcriptMessages.length > 0) {
|
||||
const estimatedTokens = estimateMessagesTokens(transcriptMessages);
|
||||
const transcriptTokens = resolvePositiveTokenCount(
|
||||
Number.isFinite(estimatedTokens) ? Math.ceil(estimatedTokens) : undefined,
|
||||
undefined,
|
||||
1024 * 1024,
|
||||
);
|
||||
const promptTokens = resolvePositiveTokenCount(
|
||||
derivePromptTokens({
|
||||
input: usage?.inputTokens,
|
||||
cacheRead: usage?.cacheRead,
|
||||
cacheWrite: usage?.cacheWrite,
|
||||
}),
|
||||
);
|
||||
const outputTokens = resolvePositiveTokenCount(usage?.outputTokens);
|
||||
if (typeof promptTokens === "number") {
|
||||
return maxPositiveTokenCount(
|
||||
promptTokens + (outputTokens ?? 0),
|
||||
cachedTokens,
|
||||
byteEstimateTokens,
|
||||
);
|
||||
if (typeof transcriptTokens === "number") {
|
||||
return transcriptTokens;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Fall back to cached totals when the parent transcript cannot be read.
|
||||
// Fall back to cached totals when recent transcript usage cannot be read.
|
||||
}
|
||||
|
||||
return resolvePositiveTokenCount(params.parentEntry.totalTokens);
|
||||
return maxPositiveTokenCount(cachedTokens, byteEstimateTokens);
|
||||
}
|
||||
|
||||
function isSessionEntry(entry: FileEntry): entry is PiSessionEntry {
|
||||
|
||||
@@ -1163,7 +1163,10 @@ async function readSessionAssistantTexts(sessionKey: string, modelKey?: string):
|
||||
if (!entry?.sessionId) {
|
||||
return [];
|
||||
}
|
||||
const messages = await readSessionMessagesAsync(entry.sessionId, storePath, entry.sessionFile);
|
||||
const messages = await readSessionMessagesAsync(entry.sessionId, storePath, entry.sessionFile, {
|
||||
mode: "full",
|
||||
reason: "live model assistant text verification",
|
||||
});
|
||||
const assistantTexts: string[] = [];
|
||||
for (const message of messages) {
|
||||
if (!message || typeof message !== "object") {
|
||||
|
||||
@@ -717,7 +717,10 @@ async function getSessionManagedOutgoingAttachmentIndex(
|
||||
}
|
||||
}
|
||||
|
||||
const messages = await readSessionMessagesAsync(sessionId, storePath, entry.sessionFile);
|
||||
const messages = await readSessionMessagesAsync(sessionId, storePath, entry.sessionFile, {
|
||||
mode: "full",
|
||||
reason: "managed outgoing attachment index",
|
||||
});
|
||||
const index: SessionManagedOutgoingAttachmentIndex = new Set();
|
||||
for (const message of messages) {
|
||||
const meta = (message as { __openclaw?: { id?: string } } | null)?.__openclaw;
|
||||
|
||||
@@ -313,16 +313,25 @@ async function loadArtifacts(
|
||||
return { sessionKey, artifacts: [] };
|
||||
}
|
||||
const artifacts: ArtifactRecord[] = [];
|
||||
await visitSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, (message, seq) => {
|
||||
collectArtifactsFromMessage({
|
||||
message,
|
||||
messageFallbackSeq: seq,
|
||||
artifacts,
|
||||
sessionKey,
|
||||
runId: query.runId,
|
||||
taskId: query.taskId,
|
||||
});
|
||||
});
|
||||
await visitSessionMessagesAsync(
|
||||
sessionId,
|
||||
storePath,
|
||||
entry?.sessionFile,
|
||||
(message, seq) => {
|
||||
collectArtifactsFromMessage({
|
||||
message,
|
||||
messageFallbackSeq: seq,
|
||||
artifacts,
|
||||
sessionKey,
|
||||
runId: query.runId,
|
||||
taskId: query.taskId,
|
||||
});
|
||||
},
|
||||
{
|
||||
mode: "full",
|
||||
reason: "artifact query transcript scan",
|
||||
},
|
||||
);
|
||||
return {
|
||||
sessionKey,
|
||||
artifacts,
|
||||
|
||||
@@ -288,6 +288,10 @@ export class SessionHistorySseState {
|
||||
this.target.sessionId,
|
||||
this.target.storePath,
|
||||
this.target.sessionFile,
|
||||
{
|
||||
mode: "full",
|
||||
reason: "session history cursor pagination",
|
||||
},
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -454,7 +454,10 @@ async function emitGatewayBeforeResetPluginHook(params: {
|
||||
let messages: unknown[] = [];
|
||||
try {
|
||||
if (typeof sessionId === "string" && sessionId.trim().length > 0) {
|
||||
messages = await readSessionMessagesAsync(sessionId, params.storePath, sessionFile);
|
||||
messages = await readSessionMessagesAsync(sessionId, params.storePath, sessionFile, {
|
||||
mode: "full",
|
||||
reason: "before_reset hook payload",
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
|
||||
@@ -38,6 +38,14 @@ type CacheEntry = {
|
||||
};
|
||||
|
||||
const transcriptIndexCache = new Map<string, CacheEntry>();
|
||||
const transcriptIndexBuilds = new Map<
|
||||
string,
|
||||
{
|
||||
mtimeMs: number;
|
||||
size: number;
|
||||
promise: Promise<SessionTranscriptIndex>;
|
||||
}
|
||||
>();
|
||||
|
||||
function normalizeOptionalString(value: unknown): string | undefined {
|
||||
return typeof value === "string" && value.trim().length > 0 ? value : undefined;
|
||||
@@ -66,6 +74,7 @@ function setCachedIndex(filePath: string, entry: CacheEntry): void {
|
||||
|
||||
export function clearSessionTranscriptIndexCache(): void {
|
||||
transcriptIndexCache.clear();
|
||||
transcriptIndexBuilds.clear();
|
||||
}
|
||||
|
||||
function isIndexableTranscriptRecord(record: unknown): record is ParsedTranscriptRecord {
|
||||
@@ -233,7 +242,22 @@ export async function readSessionTranscriptIndex(
|
||||
if (cached && cached.mtimeMs === stat.mtimeMs && cached.size === stat.size) {
|
||||
return touchCachedIndex(filePath, cached);
|
||||
}
|
||||
const index = await buildSessionTranscriptIndex(filePath, stat);
|
||||
const inFlight = transcriptIndexBuilds.get(filePath);
|
||||
if (inFlight && inFlight.mtimeMs === stat.mtimeMs && inFlight.size === stat.size) {
|
||||
return await inFlight.promise;
|
||||
}
|
||||
const promise = buildSessionTranscriptIndex(filePath, stat);
|
||||
transcriptIndexBuilds.set(filePath, {
|
||||
mtimeMs: stat.mtimeMs,
|
||||
size: stat.size,
|
||||
promise,
|
||||
});
|
||||
const index = await promise.finally(() => {
|
||||
const current = transcriptIndexBuilds.get(filePath);
|
||||
if (current?.promise === promise) {
|
||||
transcriptIndexBuilds.delete(filePath);
|
||||
}
|
||||
});
|
||||
setCachedIndex(filePath, {
|
||||
mtimeMs: stat.mtimeMs,
|
||||
size: stat.size,
|
||||
|
||||
@@ -11,6 +11,8 @@ import {
|
||||
readLastMessagePreviewFromTranscript,
|
||||
readLatestSessionUsageFromTranscript,
|
||||
readLatestSessionUsageFromTranscriptAsync,
|
||||
readLatestRecentSessionUsageFromTranscriptAsync,
|
||||
readRecentSessionUsageFromTranscriptAsync,
|
||||
readRecentSessionUsageFromTranscript,
|
||||
readRecentSessionMessagesAsync,
|
||||
readRecentSessionMessages,
|
||||
@@ -786,7 +788,10 @@ describe("readSessionMessages", () => {
|
||||
const readFileSpy = vi.spyOn(fs, "readFileSync");
|
||||
|
||||
try {
|
||||
const messages = await readSessionMessagesAsync(sessionId, storePath);
|
||||
const messages = await readSessionMessagesAsync(sessionId, storePath, undefined, {
|
||||
mode: "full",
|
||||
reason: "test active branch selection",
|
||||
});
|
||||
expect(messages.map((message) => (message as { content?: unknown }).content)).toEqual([
|
||||
"root",
|
||||
"active branch",
|
||||
@@ -822,6 +827,104 @@ describe("readSessionMessages", () => {
|
||||
}
|
||||
});
|
||||
|
||||
test("shares concurrent async transcript index builds", async () => {
|
||||
const sessionId = "test-session-index-cache-concurrent";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
{ message: { role: "user", content: "hello" } },
|
||||
{ message: { role: "assistant", content: "hi" } },
|
||||
]);
|
||||
clearSessionTranscriptIndexCache();
|
||||
|
||||
const openSpy = vi.spyOn(fs.promises, "open");
|
||||
try {
|
||||
await expect(
|
||||
Promise.all(
|
||||
Array.from({ length: 8 }, () => readSessionMessageCountAsync(sessionId, storePath)),
|
||||
),
|
||||
).resolves.toEqual(Array.from({ length: 8 }, () => 2));
|
||||
expect(openSpy).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
openSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
test("readSessionMessagesAsync recent mode honors byte caps", async () => {
|
||||
const sessionId = "test-session-async-recent-mode";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
{ message: { role: "user", content: "older" } },
|
||||
{ message: { role: "assistant", content: "x".repeat(32 * 1024) } },
|
||||
{ message: { role: "user", content: "latest" } },
|
||||
]);
|
||||
clearSessionTranscriptIndexCache();
|
||||
const openSpy = vi.spyOn(fs.promises, "open");
|
||||
|
||||
try {
|
||||
const messages = await readSessionMessagesAsync(sessionId, storePath, undefined, {
|
||||
mode: "recent",
|
||||
maxMessages: 1,
|
||||
maxBytes: 2048,
|
||||
});
|
||||
expect(messages).toHaveLength(1);
|
||||
expect(messages[0]).toMatchObject({ role: "user", content: "latest" });
|
||||
expect(JSON.stringify(messages)).not.toContain("older");
|
||||
expect(openSpy).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
openSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
test("reads recent session usage asynchronously from the transcript tail", async () => {
|
||||
const sessionId = "test-session-async-recent-usage";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
{ message: { role: "assistant", content: "older", usage: { input: 10, output: 1 } } },
|
||||
{ message: { role: "assistant", content: "x".repeat(32 * 1024) } },
|
||||
{ message: { role: "assistant", content: "latest", usage: { input: 42, output: 7 } } },
|
||||
]);
|
||||
|
||||
const usage = await readRecentSessionUsageFromTranscriptAsync(
|
||||
sessionId,
|
||||
storePath,
|
||||
undefined,
|
||||
undefined,
|
||||
2048,
|
||||
);
|
||||
|
||||
expect(usage).toMatchObject({
|
||||
inputTokens: 42,
|
||||
outputTokens: 7,
|
||||
});
|
||||
});
|
||||
|
||||
test("reads latest recent session usage separately from tail aggregates", async () => {
|
||||
const sessionId = "test-session-async-latest-recent-usage";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
{ message: { role: "assistant", content: "older", usage: { input: 50, output: 5 } } },
|
||||
{ message: { role: "assistant", content: "latest", usage: { input: 70, output: 9 } } },
|
||||
]);
|
||||
|
||||
const aggregate = await readRecentSessionUsageFromTranscriptAsync(
|
||||
sessionId,
|
||||
storePath,
|
||||
undefined,
|
||||
undefined,
|
||||
2048,
|
||||
);
|
||||
const latest = await readLatestRecentSessionUsageFromTranscriptAsync(
|
||||
sessionId,
|
||||
storePath,
|
||||
undefined,
|
||||
undefined,
|
||||
2048,
|
||||
);
|
||||
|
||||
expect(aggregate).toMatchObject({ inputTokens: 120, outputTokens: 14 });
|
||||
expect(latest).toMatchObject({ inputTokens: 70, outputTokens: 9 });
|
||||
});
|
||||
|
||||
test("tails transcript lines for manual compaction without loading the whole file", () => {
|
||||
const sessionId = "test-session-line-tail";
|
||||
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
|
||||
|
||||
@@ -153,12 +153,21 @@ export function readSessionMessages(
|
||||
return transcriptRecordsToMessages(readSelectedTranscriptRecords(filePath));
|
||||
}
|
||||
|
||||
type ReadRecentSessionMessagesOptions = {
|
||||
export type ReadRecentSessionMessagesOptions = {
|
||||
maxMessages: number;
|
||||
maxBytes?: number;
|
||||
maxLines?: number;
|
||||
};
|
||||
|
||||
export type ReadSessionMessagesAsyncOptions =
|
||||
| {
|
||||
mode: "full";
|
||||
reason: string;
|
||||
}
|
||||
| ({
|
||||
mode: "recent";
|
||||
} & ReadRecentSessionMessagesOptions);
|
||||
|
||||
type ReadRecentSessionMessagesResult = {
|
||||
messages: unknown[];
|
||||
totalMessages: number;
|
||||
@@ -472,8 +481,13 @@ export function readSessionMessageCount(
|
||||
export async function readSessionMessagesAsync(
|
||||
sessionId: string,
|
||||
storePath: string | undefined,
|
||||
sessionFile?: string,
|
||||
sessionFile: string | undefined,
|
||||
opts: ReadSessionMessagesAsyncOptions,
|
||||
): Promise<unknown[]> {
|
||||
if (opts.mode === "recent") {
|
||||
const { mode: _mode, ...recentOpts } = opts;
|
||||
return await readRecentSessionMessagesAsync(sessionId, storePath, sessionFile, recentOpts);
|
||||
}
|
||||
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile);
|
||||
if (!filePath) {
|
||||
return [];
|
||||
@@ -487,6 +501,7 @@ export async function visitSessionMessagesAsync(
|
||||
storePath: string | undefined,
|
||||
sessionFile: string | undefined,
|
||||
visit: (message: unknown, seq: number) => void,
|
||||
_opts: { mode: "full"; reason: string },
|
||||
): Promise<number> {
|
||||
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile);
|
||||
if (!filePath) {
|
||||
@@ -1027,7 +1042,91 @@ function resolvePositiveUsageNumber(value: unknown): number | undefined {
|
||||
return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : undefined;
|
||||
}
|
||||
|
||||
function extractLatestUsageFromTranscriptLines(
|
||||
function extractUsageSnapshotFromTranscriptLine(
|
||||
line: string,
|
||||
): SessionTranscriptUsageSnapshot | null {
|
||||
try {
|
||||
const parsed = JSON.parse(line) as Record<string, unknown>;
|
||||
const message =
|
||||
parsed.message && typeof parsed.message === "object" && !Array.isArray(parsed.message)
|
||||
? (parsed.message as Record<string, unknown>)
|
||||
: undefined;
|
||||
if (!message) {
|
||||
return null;
|
||||
}
|
||||
const role = typeof message.role === "string" ? message.role : undefined;
|
||||
if (role && role !== "assistant") {
|
||||
return null;
|
||||
}
|
||||
const usageRaw =
|
||||
message.usage && typeof message.usage === "object" && !Array.isArray(message.usage)
|
||||
? message.usage
|
||||
: parsed.usage && typeof parsed.usage === "object" && !Array.isArray(parsed.usage)
|
||||
? parsed.usage
|
||||
: undefined;
|
||||
const usage = normalizeUsage(usageRaw);
|
||||
const totalTokens = resolvePositiveUsageNumber(deriveSessionTotalTokens({ usage }));
|
||||
const costUsd = extractTranscriptUsageCost(usageRaw);
|
||||
const modelProvider =
|
||||
typeof message.provider === "string"
|
||||
? message.provider.trim()
|
||||
: typeof parsed.provider === "string"
|
||||
? parsed.provider.trim()
|
||||
: undefined;
|
||||
const model =
|
||||
typeof message.model === "string"
|
||||
? message.model.trim()
|
||||
: typeof parsed.model === "string"
|
||||
? parsed.model.trim()
|
||||
: undefined;
|
||||
const isDeliveryMirror = modelProvider === "openclaw" && model === "delivery-mirror";
|
||||
const hasMeaningfulUsage =
|
||||
hasNonzeroUsage(usage) ||
|
||||
typeof totalTokens === "number" ||
|
||||
(typeof costUsd === "number" && Number.isFinite(costUsd));
|
||||
const hasModelIdentity = Boolean(modelProvider || model);
|
||||
if (!hasMeaningfulUsage && !hasModelIdentity) {
|
||||
return null;
|
||||
}
|
||||
if (isDeliveryMirror && !hasMeaningfulUsage) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const snapshot: SessionTranscriptUsageSnapshot = {};
|
||||
if (!isDeliveryMirror) {
|
||||
if (modelProvider) {
|
||||
snapshot.modelProvider = modelProvider;
|
||||
}
|
||||
if (model) {
|
||||
snapshot.model = model;
|
||||
}
|
||||
}
|
||||
if (typeof usage?.input === "number" && Number.isFinite(usage.input)) {
|
||||
snapshot.inputTokens = usage.input;
|
||||
}
|
||||
if (typeof usage?.output === "number" && Number.isFinite(usage.output)) {
|
||||
snapshot.outputTokens = usage.output;
|
||||
}
|
||||
if (typeof usage?.cacheRead === "number" && Number.isFinite(usage.cacheRead)) {
|
||||
snapshot.cacheRead = usage.cacheRead;
|
||||
}
|
||||
if (typeof usage?.cacheWrite === "number" && Number.isFinite(usage.cacheWrite)) {
|
||||
snapshot.cacheWrite = usage.cacheWrite;
|
||||
}
|
||||
if (typeof totalTokens === "number") {
|
||||
snapshot.totalTokens = totalTokens;
|
||||
snapshot.totalTokensFresh = true;
|
||||
}
|
||||
if (typeof costUsd === "number" && Number.isFinite(costUsd)) {
|
||||
snapshot.costUsd = costUsd;
|
||||
}
|
||||
return snapshot;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function extractAggregateUsageFromTranscriptLines(
|
||||
lines: Iterable<string>,
|
||||
): SessionTranscriptUsageSnapshot | null {
|
||||
const snapshot: SessionTranscriptUsageSnapshot = {};
|
||||
@@ -1044,88 +1143,40 @@ function extractLatestUsageFromTranscriptLines(
|
||||
let sawCost = false;
|
||||
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const parsed = JSON.parse(line) as Record<string, unknown>;
|
||||
const message =
|
||||
parsed.message && typeof parsed.message === "object" && !Array.isArray(parsed.message)
|
||||
? (parsed.message as Record<string, unknown>)
|
||||
: undefined;
|
||||
if (!message) {
|
||||
continue;
|
||||
}
|
||||
const role = typeof message.role === "string" ? message.role : undefined;
|
||||
if (role && role !== "assistant") {
|
||||
continue;
|
||||
}
|
||||
const usageRaw =
|
||||
message.usage && typeof message.usage === "object" && !Array.isArray(message.usage)
|
||||
? message.usage
|
||||
: parsed.usage && typeof parsed.usage === "object" && !Array.isArray(parsed.usage)
|
||||
? parsed.usage
|
||||
: undefined;
|
||||
const usage = normalizeUsage(usageRaw);
|
||||
const totalTokens = resolvePositiveUsageNumber(deriveSessionTotalTokens({ usage }));
|
||||
const costUsd = extractTranscriptUsageCost(usageRaw);
|
||||
const modelProvider =
|
||||
typeof message.provider === "string"
|
||||
? message.provider.trim()
|
||||
: typeof parsed.provider === "string"
|
||||
? parsed.provider.trim()
|
||||
: undefined;
|
||||
const model =
|
||||
typeof message.model === "string"
|
||||
? message.model.trim()
|
||||
: typeof parsed.model === "string"
|
||||
? parsed.model.trim()
|
||||
: undefined;
|
||||
const isDeliveryMirror = modelProvider === "openclaw" && model === "delivery-mirror";
|
||||
const hasMeaningfulUsage =
|
||||
hasNonzeroUsage(usage) ||
|
||||
typeof totalTokens === "number" ||
|
||||
(typeof costUsd === "number" && Number.isFinite(costUsd));
|
||||
const hasModelIdentity = Boolean(modelProvider || model);
|
||||
if (!hasMeaningfulUsage && !hasModelIdentity) {
|
||||
continue;
|
||||
}
|
||||
if (isDeliveryMirror && !hasMeaningfulUsage) {
|
||||
continue;
|
||||
}
|
||||
|
||||
sawSnapshot = true;
|
||||
if (!isDeliveryMirror) {
|
||||
if (modelProvider) {
|
||||
snapshot.modelProvider = modelProvider;
|
||||
}
|
||||
if (model) {
|
||||
snapshot.model = model;
|
||||
}
|
||||
}
|
||||
if (typeof usage?.input === "number" && Number.isFinite(usage.input)) {
|
||||
inputTokens += usage.input;
|
||||
sawInputTokens = true;
|
||||
}
|
||||
if (typeof usage?.output === "number" && Number.isFinite(usage.output)) {
|
||||
outputTokens += usage.output;
|
||||
sawOutputTokens = true;
|
||||
}
|
||||
if (typeof usage?.cacheRead === "number" && Number.isFinite(usage.cacheRead)) {
|
||||
cacheRead += usage.cacheRead;
|
||||
sawCacheRead = true;
|
||||
}
|
||||
if (typeof usage?.cacheWrite === "number" && Number.isFinite(usage.cacheWrite)) {
|
||||
cacheWrite += usage.cacheWrite;
|
||||
sawCacheWrite = true;
|
||||
}
|
||||
if (typeof totalTokens === "number") {
|
||||
snapshot.totalTokens = totalTokens;
|
||||
snapshot.totalTokensFresh = true;
|
||||
}
|
||||
if (typeof costUsd === "number" && Number.isFinite(costUsd)) {
|
||||
costUsdTotal += costUsd;
|
||||
sawCost = true;
|
||||
}
|
||||
} catch {
|
||||
// skip malformed lines
|
||||
const current = extractUsageSnapshotFromTranscriptLine(line);
|
||||
if (!current) {
|
||||
continue;
|
||||
}
|
||||
sawSnapshot = true;
|
||||
if (current.modelProvider) {
|
||||
snapshot.modelProvider = current.modelProvider;
|
||||
}
|
||||
if (current.model) {
|
||||
snapshot.model = current.model;
|
||||
}
|
||||
if (typeof current.inputTokens === "number") {
|
||||
inputTokens += current.inputTokens;
|
||||
sawInputTokens = true;
|
||||
}
|
||||
if (typeof current.outputTokens === "number") {
|
||||
outputTokens += current.outputTokens;
|
||||
sawOutputTokens = true;
|
||||
}
|
||||
if (typeof current.cacheRead === "number") {
|
||||
cacheRead += current.cacheRead;
|
||||
sawCacheRead = true;
|
||||
}
|
||||
if (typeof current.cacheWrite === "number") {
|
||||
cacheWrite += current.cacheWrite;
|
||||
sawCacheWrite = true;
|
||||
}
|
||||
if (typeof current.totalTokens === "number") {
|
||||
snapshot.totalTokens = current.totalTokens;
|
||||
snapshot.totalTokensFresh = true;
|
||||
}
|
||||
if (typeof current.costUsd === "number" && Number.isFinite(current.costUsd)) {
|
||||
costUsdTotal += current.costUsd;
|
||||
sawCost = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1150,10 +1201,20 @@ function extractLatestUsageFromTranscriptLines(
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
function extractLatestUsageFromTranscriptChunk(
|
||||
function extractLatestUsageFromTranscriptLines(
|
||||
lines: Iterable<string>,
|
||||
): SessionTranscriptUsageSnapshot | null {
|
||||
let latest: SessionTranscriptUsageSnapshot | null = null;
|
||||
for (const line of lines) {
|
||||
latest = extractUsageSnapshotFromTranscriptLine(line) ?? latest;
|
||||
}
|
||||
return latest;
|
||||
}
|
||||
|
||||
function extractAggregateUsageFromTranscriptChunk(
|
||||
chunk: string,
|
||||
): SessionTranscriptUsageSnapshot | null {
|
||||
return extractLatestUsageFromTranscriptLines(
|
||||
return extractAggregateUsageFromTranscriptLines(
|
||||
chunk.split(/\r?\n/).filter((line) => line.trim().length > 0),
|
||||
);
|
||||
}
|
||||
@@ -1175,7 +1236,7 @@ export function readLatestSessionUsageFromTranscript(
|
||||
return null;
|
||||
}
|
||||
const chunk = fs.readFileSync(fd, "utf-8");
|
||||
return extractLatestUsageFromTranscriptChunk(chunk);
|
||||
return extractAggregateUsageFromTranscriptChunk(chunk);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1201,6 +1262,62 @@ export async function readLatestSessionUsageFromTranscriptAsync(
|
||||
lines.push(line);
|
||||
}
|
||||
});
|
||||
return extractAggregateUsageFromTranscriptLines(lines);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export async function readRecentSessionUsageFromTranscriptAsync(
|
||||
sessionId: string,
|
||||
storePath: string | undefined,
|
||||
sessionFile: string | undefined,
|
||||
agentId: string | undefined,
|
||||
maxBytes: number,
|
||||
): Promise<SessionTranscriptUsageSnapshot | null> {
|
||||
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile, agentId);
|
||||
if (!filePath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const stat = await fs.promises.stat(filePath);
|
||||
if (stat.size === 0) {
|
||||
return null;
|
||||
}
|
||||
const lines = await readRecentTranscriptTailLinesAsync(filePath, stat, {
|
||||
maxMessages: 1,
|
||||
maxLines: 1000,
|
||||
maxBytes,
|
||||
});
|
||||
return extractAggregateUsageFromTranscriptLines(lines);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export async function readLatestRecentSessionUsageFromTranscriptAsync(
|
||||
sessionId: string,
|
||||
storePath: string | undefined,
|
||||
sessionFile: string | undefined,
|
||||
agentId: string | undefined,
|
||||
maxBytes: number,
|
||||
): Promise<SessionTranscriptUsageSnapshot | null> {
|
||||
const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile, agentId);
|
||||
if (!filePath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const stat = await fs.promises.stat(filePath);
|
||||
if (stat.size === 0) {
|
||||
return null;
|
||||
}
|
||||
const lines = await readRecentTranscriptTailLinesAsync(filePath, stat, {
|
||||
maxMessages: 1,
|
||||
maxLines: 1000,
|
||||
maxBytes,
|
||||
});
|
||||
return extractLatestUsageFromTranscriptLines(lines);
|
||||
} catch {
|
||||
return null;
|
||||
@@ -1236,7 +1353,7 @@ export function readRecentSessionUsageFromTranscript(
|
||||
.split(/\r?\n/)
|
||||
.slice(readStart > 0 ? 1 : 0)
|
||||
.join("\n");
|
||||
return extractLatestUsageFromTranscriptChunk(chunk);
|
||||
return extractAggregateUsageFromTranscriptChunk(chunk);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -106,6 +106,8 @@ export {
|
||||
readFirstUserMessageFromTranscript,
|
||||
readLastMessagePreviewFromTranscript,
|
||||
readLatestSessionUsageFromTranscriptAsync,
|
||||
readLatestRecentSessionUsageFromTranscriptAsync,
|
||||
readRecentSessionUsageFromTranscriptAsync,
|
||||
readRecentSessionMessagesAsync,
|
||||
readRecentSessionMessagesWithStatsAsync,
|
||||
readRecentSessionTranscriptLines,
|
||||
@@ -118,6 +120,7 @@ export {
|
||||
visitSessionMessagesAsync,
|
||||
resolveSessionTranscriptCandidates,
|
||||
} from "./session-utils.fs.js";
|
||||
export type { ReadSessionMessagesAsyncOptions } from "./session-utils.fs.js";
|
||||
export { canonicalizeSpawnedByForAgent, resolveSessionStoreKey } from "./session-store-key.js";
|
||||
export type {
|
||||
GatewayAgentRow,
|
||||
|
||||
@@ -168,7 +168,10 @@ export async function handleSessionHistoryHttpRequest(
|
||||
const rawSnapshot =
|
||||
boundedSnapshot?.messages ??
|
||||
(entry?.sessionId
|
||||
? await readSessionMessagesAsync(entry.sessionId, target.storePath, entry.sessionFile)
|
||||
? await readSessionMessagesAsync(entry.sessionId, target.storePath, entry.sessionFile, {
|
||||
mode: "full",
|
||||
reason: "session history cursor pagination",
|
||||
})
|
||||
: []);
|
||||
const historySnapshot = buildSessionHistorySnapshot({
|
||||
rawMessages: rawSnapshot,
|
||||
|
||||
@@ -196,16 +196,21 @@ export class EmbeddedTuiBackend implements TuiBackend {
|
||||
const sessionId = entry?.sessionId;
|
||||
const sessionAgentId = resolveSessionAgentId({ sessionKey: opts.sessionKey, config: cfg });
|
||||
const resolvedSessionModel = resolveSessionModelRef(cfg, entry, sessionAgentId);
|
||||
const max = Math.min(1000, typeof opts.limit === "number" ? opts.limit : 200);
|
||||
const maxHistoryBytes = getMaxChatHistoryMessagesBytes();
|
||||
const localMessages =
|
||||
sessionId && storePath
|
||||
? await readSessionMessagesAsync(sessionId, storePath, entry?.sessionFile)
|
||||
? await readSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, {
|
||||
mode: "recent",
|
||||
maxMessages: max,
|
||||
maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024),
|
||||
})
|
||||
: [];
|
||||
const rawMessages = augmentChatHistoryWithCliSessionImports({
|
||||
entry,
|
||||
provider: resolvedSessionModel.provider,
|
||||
localMessages,
|
||||
});
|
||||
const max = Math.min(1000, typeof opts.limit === "number" ? opts.limit : 200);
|
||||
const effectiveMaxChars = resolveEffectiveChatHistoryMaxChars(cfg);
|
||||
const normalized = augmentChatHistoryWithCanvasBlocks(
|
||||
projectRecentChatDisplayMessages(rawMessages, {
|
||||
@@ -213,7 +218,6 @@ export class EmbeddedTuiBackend implements TuiBackend {
|
||||
maxMessages: max,
|
||||
}),
|
||||
);
|
||||
const maxHistoryBytes = getMaxChatHistoryMessagesBytes();
|
||||
const perMessageHardCap = Math.min(CHAT_HISTORY_MAX_SINGLE_MESSAGE_BYTES, maxHistoryBytes);
|
||||
const replaced = replaceOversizedChatHistoryMessages({
|
||||
messages: normalized,
|
||||
|
||||
Reference in New Issue
Block a user