fix(session): harden usage accounting and memory flush recovery

This commit is contained in:
Peter Steinberger
2026-03-02 00:06:52 +00:00
parent ee96e1751e
commit d729ab2150
9 changed files with 285 additions and 33 deletions

View File

@@ -96,6 +96,57 @@ function parseUsageFromTranscriptLine(line: string): ReturnType<typeof normalize
return undefined;
}
/**
 * Resolve the on-disk path of a session's transcript/log file.
 *
 * Prefers an explicit `sessionFile` on the entry, falling back to a legacy
 * `transcriptPath` field when present. Returns `undefined` when there is no
 * session id or when path resolution throws.
 */
function resolveSessionLogPath(
  sessionId?: string,
  sessionEntry?: SessionEntry,
  sessionKey?: string,
  opts?: { storePath?: string },
): string | undefined {
  if (!sessionId) {
    return undefined;
  }
  try {
    // Older entries may carry a transcriptPath instead of sessionFile.
    const entryWithTranscript = sessionEntry as
      | (SessionEntry & { transcriptPath?: string })
      | undefined;
    const transcriptPath = entryWithTranscript?.transcriptPath?.trim();
    const sessionFile = sessionEntry?.sessionFile?.trim() || transcriptPath;
    const pathOpts = resolveSessionFilePathOptions({
      agentId: resolveAgentIdFromSessionKey(sessionKey),
      storePath: opts?.storePath,
    });
    // Route the file through resolveSessionFilePath so relative entries are
    // anchored to the sessions dir/store layout rather than process.cwd().
    const entryForResolve = sessionFile ? { sessionFile } : sessionEntry;
    return resolveSessionFilePath(sessionId, entryForResolve, pathOpts);
  } catch {
    // Best-effort: an unresolvable path is treated the same as no path.
    return undefined;
  }
}
/**
 * Best-effort byte size of the session transcript on disk.
 *
 * Returns `undefined` when the log path cannot be resolved, `stat` fails,
 * or the reported size is not a non-negative finite number.
 */
async function readSessionLogByteSize(
  sessionId?: string,
  sessionEntry?: SessionEntry,
  sessionKey?: string,
  opts?: { storePath?: string },
): Promise<number | undefined> {
  const logPath = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts);
  if (!logPath) {
    return undefined;
  }
  try {
    const { size } = await fs.promises.stat(logPath);
    const bytes = Math.floor(size);
    if (!Number.isFinite(bytes) || bytes < 0) {
      return undefined;
    }
    return bytes;
  } catch {
    // Missing or unreadable file — callers treat this as "size unknown".
    return undefined;
  }
}
async function readLastNonzeroUsageFromSessionLog(logPath: string) {
const handle = await fs.promises.open(logPath, "r");
try {
@@ -134,28 +185,12 @@ export async function readPromptTokensFromSessionLog(
sessionKey?: string,
opts?: { storePath?: string },
): Promise<SessionTranscriptUsageSnapshot | undefined> {
if (!sessionId) {
const logPath = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts);
if (!logPath) {
return undefined;
}
try {
const transcriptPath = (
sessionEntry as (SessionEntry & { transcriptPath?: string }) | undefined
)?.transcriptPath?.trim();
const sessionFile = sessionEntry?.sessionFile?.trim() || transcriptPath;
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const pathOpts = resolveSessionFilePathOptions({
agentId,
storePath: opts?.storePath,
});
// Normalize sessionFile through resolveSessionFilePath so relative entries
// are resolved against the sessions dir/store layout, not process.cwd().
const logPath = resolveSessionFilePath(
sessionId,
sessionFile ? { sessionFile } : sessionEntry,
pathOpts,
);
const lastUsage = await readLastNonzeroUsageFromSessionLog(logPath);
if (!lastUsage) {
return undefined;
@@ -262,6 +297,23 @@ export async function runMemoryFlushIfNeeded(params: {
const shouldReadTranscript =
canAttemptFlush && entry && (!hasFreshPersistedPromptTokens || shouldReadTranscriptForOutput);
const forceFlushTranscriptBytes = memoryFlushSettings.forceFlushTranscriptBytes;
const shouldCheckTranscriptSizeForForcedFlush =
canAttemptFlush &&
entry &&
Number.isFinite(forceFlushTranscriptBytes) &&
forceFlushTranscriptBytes > 0;
const transcriptByteSize = shouldCheckTranscriptSizeForForcedFlush
? await readSessionLogByteSize(
params.followupRun.run.sessionId,
entry,
params.sessionKey ?? params.followupRun.run.sessionKey,
{ storePath: params.storePath },
)
: undefined;
const shouldForceFlushByTranscriptSize =
typeof transcriptByteSize === "number" && transcriptByteSize >= forceFlushTranscriptBytes;
const transcriptUsageSnapshot = shouldReadTranscript
? await readPromptTokensFromSessionLog(
params.followupRun.run.sessionId,
@@ -341,21 +393,23 @@ export async function runMemoryFlushIfNeeded(params: {
`compactionCount=${entry?.compactionCount ?? 0} memoryFlushCompactionCount=${entry?.memoryFlushCompactionCount ?? "undefined"} ` +
`persistedPromptTokens=${persistedPromptTokens ?? "undefined"} persistedFresh=${entry?.totalTokensFresh === true} ` +
`promptTokensEst=${promptTokenEstimate ?? "undefined"} transcriptPromptTokens=${transcriptPromptTokens ?? "undefined"} transcriptOutputTokens=${transcriptOutputTokens ?? "undefined"} ` +
`projectedTokenCount=${projectedTokenCount ?? "undefined"}`,
`projectedTokenCount=${projectedTokenCount ?? "undefined"} transcriptBytes=${transcriptByteSize ?? "undefined"} ` +
`forceFlushTranscriptBytes=${forceFlushTranscriptBytes} forceFlushByTranscriptSize=${shouldForceFlushByTranscriptSize}`,
);
const shouldFlushMemory =
memoryFlushSettings &&
memoryFlushWritable &&
!params.isHeartbeat &&
!isCli &&
shouldRunMemoryFlush({
entry,
tokenCount: tokenCountForFlush,
contextWindowTokens,
reserveTokensFloor: memoryFlushSettings.reserveTokensFloor,
softThresholdTokens: memoryFlushSettings.softThresholdTokens,
});
(memoryFlushSettings &&
memoryFlushWritable &&
!params.isHeartbeat &&
!isCli &&
shouldRunMemoryFlush({
entry,
tokenCount: tokenCountForFlush,
contextWindowTokens,
reserveTokensFloor: memoryFlushSettings.reserveTokensFloor,
softThresholdTokens: memoryFlushSettings.softThresholdTokens,
})) ||
shouldForceFlushByTranscriptSize;
if (!shouldFlushMemory) {
return entry ?? params.sessionEntry;

View File

@@ -8,7 +8,12 @@ import type { TypingMode } from "../../config/types.js";
import { withStateDirEnv } from "../../test-helpers/state-dir-env.js";
import type { TemplateContext } from "../templating.js";
import type { GetReplyOptions } from "../types.js";
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
import {
enqueueFollowupRun,
scheduleFollowupDrain,
type FollowupRun,
type QueueSettings,
} from "./queue.js";
import { createMockTypingController } from "./test-helpers.js";
type AgentRunParams = {
@@ -87,6 +92,7 @@ beforeEach(() => {
state.runEmbeddedPiAgentMock.mockClear();
state.runCliAgentMock.mockClear();
vi.mocked(enqueueFollowupRun).mockClear();
vi.mocked(scheduleFollowupDrain).mockClear();
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
});
@@ -311,6 +317,25 @@ describe("runReplyAgent heartbeat followup guard", () => {
expect(vi.mocked(enqueueFollowupRun)).toHaveBeenCalledTimes(1);
expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled();
});
it("drains followup queue when an unexpected exception escapes the run path", async () => {
  // Force the usage-persistence step to reject once so an exception escapes
  // the run path *after* the agent call itself succeeded.
  const accounting = await import("./session-run-accounting.js");
  const persistSpy = vi
    .spyOn(accounting, "persistRunSessionUsage")
    .mockRejectedValueOnce(new Error("persist exploded"));
  state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
    payloads: [{ text: "ok" }],
    meta: { agentMeta: { usage: { input: 1, output: 1 } } },
  });
  try {
    const { run } = createMinimalRun();
    // The original error must still surface to the caller...
    await expect(run()).rejects.toThrow("persist exploded");
    // ...while the followup queue is still drained so queued work is not stuck.
    expect(vi.mocked(scheduleFollowupDrain)).toHaveBeenCalledTimes(1);
  } finally {
    // Restore the real implementation so later tests are unaffected.
    persistSpy.mockRestore();
  }
});
});
describe("runReplyAgent typing (heartbeat)", () => {
@@ -1661,6 +1686,68 @@ describe("runReplyAgent memory flush", () => {
});
});
it("forces memory flush when transcript file exceeds configured byte threshold", async () => {
  await withTempStore(async (storePath) => {
    const sessionKey = "main";
    const sessionFile = "oversized-session.jsonl";
    // Write a transcript (3000 bytes) well above the 256-byte threshold
    // configured below, next to the session store.
    const transcriptPath = path.join(path.dirname(storePath), sessionFile);
    await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
    await fs.writeFile(transcriptPath, "x".repeat(3_000), "utf-8");
    // totalTokens is tiny and stale, so only the byte-size trigger can fire.
    const sessionEntry = {
      sessionId: "session",
      updatedAt: Date.now(),
      sessionFile,
      totalTokens: 10,
      totalTokensFresh: false,
      compactionCount: 1,
    };
    await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
    // Record each prompt the embedded agent receives so we can assert the
    // flush turn runs before the user's turn.
    const calls: Array<{ prompt?: string }> = [];
    state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
      calls.push({ prompt: params.prompt });
      if (params.prompt?.includes("Pre-compaction memory flush.")) {
        return { payloads: [], meta: {} };
      }
      return {
        payloads: [{ text: "ok" }],
        meta: { agentMeta: { usage: { input: 1, output: 1 } } },
      };
    });
    const baseRun = createBaseRun({
      storePath,
      sessionEntry,
      config: {
        agents: {
          defaults: {
            compaction: {
              memoryFlush: {
                // Low threshold guarantees the 3000-byte transcript trips it.
                forceFlushTranscriptBytes: 256,
              },
            },
          },
        },
      },
      runOverrides: { sessionFile },
    });
    await runReplyAgentWithBase({
      baseRun,
      storePath,
      sessionKey,
      sessionEntry,
      commandBody: "hello",
    });
    // Exactly two agent turns: the forced flush, then the real reply.
    expect(calls).toHaveLength(2);
    expect(calls[0]?.prompt).toContain("Pre-compaction memory flush.");
    expect(calls[1]?.prompt).toBe("hello");
  });
});
it("skips memory flush when disabled in config", async () => {
await withTempStore(async (storePath) => {
const sessionKey = "main";

View File

@@ -715,6 +715,11 @@ export async function runReplyAgent(params: {
queueKey,
runFollowupTurn,
);
} catch (error) {
// Keep the followup queue moving even when an unexpected exception escapes
// the run path; the caller still receives the original error.
finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
throw error;
} finally {
blockReplyPipeline?.stop();
typing.markRunComplete();

View File

@@ -2,11 +2,13 @@ import { lookupContextTokens } from "../../agents/context.js";
import { resolveCronStyleNow } from "../../agents/current-time.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { DEFAULT_PI_COMPACTION_RESERVE_TOKENS_FLOOR } from "../../agents/pi-settings.js";
import { parseByteSize } from "../../cli/parse-bytes.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveFreshSessionTotalTokens, type SessionEntry } from "../../config/sessions.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
export const DEFAULT_MEMORY_FLUSH_SOFT_TOKENS = 4000;
export const DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES = 2 * 1024 * 1024;
export const DEFAULT_MEMORY_FLUSH_PROMPT = [
"Pre-compaction memory flush.",
@@ -58,6 +60,11 @@ export function resolveMemoryFlushPromptForRun(params: {
export type MemoryFlushSettings = {
enabled: boolean;
softThresholdTokens: number;
/**
* Force a pre-compaction memory flush when the session transcript reaches this
* size. Set to 0 to disable byte-size based triggering.
*/
forceFlushTranscriptBytes: number;
prompt: string;
systemPrompt: string;
reserveTokensFloor: number;
@@ -71,6 +78,26 @@ const normalizeNonNegativeInt = (value: unknown): number | null => {
return int >= 0 ? int : null;
};
/**
 * Normalize a user-supplied byte-size value to a non-negative integer byte
 * count. Accepts plain numbers (floored) or byte-size strings handled by
 * `parseByteSize` (e.g. "2mb"). Returns `null` for anything invalid, empty,
 * or negative so callers can fall back to their default.
 */
const normalizeOptionalByteSize = (value: unknown): number | null => {
  if (typeof value === "number") {
    if (!Number.isFinite(value)) {
      return null;
    }
    const int = Math.floor(value);
    return int < 0 ? null : int;
  }
  if (typeof value !== "string") {
    return null;
  }
  const trimmed = value.trim();
  if (trimmed === "") {
    return null;
  }
  try {
    const bytes = parseByteSize(trimmed, { defaultUnit: "b" });
    return bytes < 0 ? null : bytes;
  } catch {
    // Unparseable strings are treated as "not configured".
    return null;
  }
};
export function resolveMemoryFlushSettings(cfg?: OpenClawConfig): MemoryFlushSettings | null {
const defaults = cfg?.agents?.defaults?.compaction?.memoryFlush;
const enabled = defaults?.enabled ?? true;
@@ -79,6 +106,9 @@ export function resolveMemoryFlushSettings(cfg?: OpenClawConfig): MemoryFlushSet
}
const softThresholdTokens =
normalizeNonNegativeInt(defaults?.softThresholdTokens) ?? DEFAULT_MEMORY_FLUSH_SOFT_TOKENS;
const forceFlushTranscriptBytes =
normalizeOptionalByteSize(defaults?.forceFlushTranscriptBytes) ??
DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES;
const prompt = defaults?.prompt?.trim() || DEFAULT_MEMORY_FLUSH_PROMPT;
const systemPrompt = defaults?.systemPrompt?.trim() || DEFAULT_MEMORY_FLUSH_SYSTEM_PROMPT;
const reserveTokensFloor =
@@ -88,6 +118,7 @@ export function resolveMemoryFlushSettings(cfg?: OpenClawConfig): MemoryFlushSet
return {
enabled,
softThresholdTokens,
forceFlushTranscriptBytes,
prompt: ensureNoReplyHint(prompt),
systemPrompt: ensureNoReplyHint(systemPrompt),
reserveTokensFloor,

View File

@@ -15,6 +15,7 @@ import {
recordPendingHistoryEntryIfEnabled,
} from "./history.js";
import {
DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES,
DEFAULT_MEMORY_FLUSH_SOFT_TOKENS,
resolveMemoryFlushContextWindowTokens,
resolveMemoryFlushSettings,
@@ -198,6 +199,7 @@ describe("memory flush settings", () => {
const settings = resolveMemoryFlushSettings();
expect(settings).not.toBeNull();
expect(settings?.enabled).toBe(true);
expect(settings?.forceFlushTranscriptBytes).toBe(DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES);
expect(settings?.prompt.length).toBeGreaterThan(0);
expect(settings?.systemPrompt.length).toBeGreaterThan(0);
});
@@ -244,8 +246,25 @@ describe("memory flush settings", () => {
});
expect(settings?.softThresholdTokens).toBe(DEFAULT_MEMORY_FLUSH_SOFT_TOKENS);
expect(settings?.forceFlushTranscriptBytes).toBe(DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES);
expect(settings?.reserveTokensFloor).toBe(DEFAULT_PI_COMPACTION_RESERVE_TOKENS_FLOOR);
});
it("parses forceFlushTranscriptBytes from byte-size strings", () => {
  // Human-friendly byte-size strings in config (e.g. "3mb") should be
  // normalized to a plain byte count in the resolved settings.
  const settings = resolveMemoryFlushSettings({
    agents: {
      defaults: {
        compaction: {
          memoryFlush: {
            forceFlushTranscriptBytes: "3mb",
          },
        },
      },
    },
  });
  // "3mb" is binary megabytes: 3 * 1024 * 1024 bytes.
  expect(settings?.forceFlushTranscriptBytes).toBe(3 * 1024 * 1024);
});
});
describe("shouldRunMemoryFlush", () => {

View File

@@ -93,8 +93,11 @@ export async function persistSessionUsageUpdate(params: {
if (hasUsage) {
patch.inputTokens = params.usage?.input ?? 0;
patch.outputTokens = params.usage?.output ?? 0;
patch.cacheRead = params.usage?.cacheRead ?? 0;
patch.cacheWrite = params.usage?.cacheWrite ?? 0;
// Cache counters should reflect the latest context snapshot when
// available, not accumulated per-call totals across a whole run.
const cacheUsage = params.lastCallUsage ?? params.usage;
patch.cacheRead = cacheUsage?.cacheRead ?? 0;
patch.cacheWrite = cacheUsage?.cacheWrite ?? 0;
}
// Missing a last-call snapshot (and promptTokens fallback) means
// context utilization is stale/unknown.

View File

@@ -1170,6 +1170,40 @@ describe("persistSessionUsageUpdate", () => {
expect(stored[sessionKey].outputTokens).toBe(10_000);
});
it("uses lastCallUsage cache counters when available", async () => {
  const storePath = await createStorePath("openclaw-usage-cache-");
  const sessionKey = "main";
  await seedSessionStore({
    storePath,
    sessionKey,
    entry: { sessionId: "s1", updatedAt: Date.now() },
  });
  // Run-level totals carry accumulated cache counters; lastCallUsage carries
  // the latest per-call snapshot, which should win for cacheRead/cacheWrite.
  await persistSessionUsageUpdate({
    storePath,
    sessionKey,
    usage: {
      input: 100_000,
      output: 8_000,
      cacheRead: 260_000,
      cacheWrite: 90_000,
    },
    lastCallUsage: {
      input: 12_000,
      output: 1_000,
      cacheRead: 18_000,
      cacheWrite: 4_000,
    },
    contextTokensUsed: 200_000,
  });
  const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
  // input/output still reflect the whole-run totals...
  expect(stored[sessionKey].inputTokens).toBe(100_000);
  expect(stored[sessionKey].outputTokens).toBe(8_000);
  // ...while cache counters come from the last-call snapshot.
  expect(stored[sessionKey].cacheRead).toBe(18_000);
  expect(stored[sessionKey].cacheWrite).toBe(4_000);
});
it("marks totalTokens as unknown when no fresh context snapshot is available", async () => {
const storePath = await createStorePath("openclaw-usage-");
const sessionKey = "main";

View File

@@ -295,6 +295,11 @@ export type AgentCompactionMemoryFlushConfig = {
enabled?: boolean;
/** Run the memory flush when context is within this many tokens of the compaction threshold. */
softThresholdTokens?: number;
/**
* Force a memory flush when transcript size reaches this threshold
* (bytes, or byte-size string like "2mb"). Set to 0 to disable.
*/
forceFlushTranscriptBytes?: number | string;
/** User prompt used for the memory flush turn (NO_REPLY is enforced if missing). */
prompt?: string;
/** System prompt appended for the memory flush turn. */

View File

@@ -1,4 +1,5 @@
import { z } from "zod";
import { parseByteSize } from "../cli/parse-bytes.js";
import {
HeartbeatSchema,
AgentSandboxSchema,
@@ -92,6 +93,19 @@ export const AgentDefaultsSchema = z
.object({
enabled: z.boolean().optional(),
softThresholdTokens: z.number().int().nonnegative().optional(),
forceFlushTranscriptBytes: z
.union([
z.number().int().nonnegative(),
z.string().refine((value) => {
try {
parseByteSize(value.trim(), { defaultUnit: "b" });
return true;
} catch {
return false;
}
}, "Expected byte size string like 2mb"),
])
.optional(),
prompt: z.string().optional(),
systemPrompt: z.string().optional(),
})