mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix(session): harden usage accounting and memory flush recovery
This commit is contained in:
@@ -96,6 +96,57 @@ function parseUsageFromTranscriptLine(line: string): ReturnType<typeof normalize
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
 * Resolve the transcript (session log) file path for a session.
 *
 * The entry's `sessionFile` is preferred; when it is missing or blank the
 * entry's optional `transcriptPath` field is used instead. If neither is set,
 * the session entry itself is handed to `resolveSessionFilePath`.
 *
 * @param sessionId - Session identifier; when falsy no lookup is attempted.
 * @param sessionEntry - Optional stored session entry carrying file hints.
 * @param sessionKey - Optional session key used to derive the agent id.
 * @param opts - Optional overrides; `storePath` is forwarded to the path options.
 * @returns The resolved path, or `undefined` when `sessionId` is missing or
 *   any step of the resolution throws.
 */
function resolveSessionLogPath(
  sessionId?: string,
  sessionEntry?: SessionEntry,
  sessionKey?: string,
  opts?: { storePath?: string },
): string | undefined {
  if (sessionId) {
    try {
      const entryWithTranscript = sessionEntry as
        | (SessionEntry & { transcriptPath?: string })
        | undefined;
      const transcriptField = entryWithTranscript?.transcriptPath?.trim();
      const explicitFile = sessionEntry?.sessionFile?.trim() || transcriptField;
      const filePathOptions = resolveSessionFilePathOptions({
        agentId: resolveAgentIdFromSessionKey(sessionKey),
        storePath: opts?.storePath,
      });
      // Normalize sessionFile through resolveSessionFilePath so relative entries
      // are resolved against the sessions dir/store layout, not process.cwd().
      const entryForLookup = explicitFile ? { sessionFile: explicitFile } : sessionEntry;
      return resolveSessionFilePath(sessionId, entryForLookup, filePathOptions);
    } catch {
      // Best effort: any resolution failure is reported as "no path" below.
    }
  }
  return undefined;
}
|
||||
|
||||
/**
 * Read the size of a session's transcript file in whole bytes.
 *
 * @param sessionId - Session identifier forwarded to `resolveSessionLogPath`.
 * @param sessionEntry - Optional stored session entry forwarded to the resolver.
 * @param sessionKey - Optional session key forwarded to the resolver.
 * @param opts - Optional overrides (`storePath`) forwarded to the resolver.
 * @returns The floored `stat` size, or `undefined` when the path cannot be
 *   resolved, `stat` rejects, or the size is not a finite non-negative number.
 */
async function readSessionLogByteSize(
  sessionId?: string,
  sessionEntry?: SessionEntry,
  sessionKey?: string,
  opts?: { storePath?: string },
): Promise<number | undefined> {
  const transcriptFile = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts);
  if (!transcriptFile) {
    return undefined;
  }

  let wholeBytes: number;
  try {
    const fileStat = await fs.promises.stat(transcriptFile);
    wholeBytes = Math.floor(fileStat.size);
  } catch {
    // A missing or unreadable transcript simply means "size unknown".
    return undefined;
  }

  if (Number.isFinite(wholeBytes) && wholeBytes >= 0) {
    return wholeBytes;
  }
  return undefined;
}
|
||||
|
||||
async function readLastNonzeroUsageFromSessionLog(logPath: string) {
|
||||
const handle = await fs.promises.open(logPath, "r");
|
||||
try {
|
||||
@@ -134,28 +185,12 @@ export async function readPromptTokensFromSessionLog(
|
||||
sessionKey?: string,
|
||||
opts?: { storePath?: string },
|
||||
): Promise<SessionTranscriptUsageSnapshot | undefined> {
|
||||
if (!sessionId) {
|
||||
const logPath = resolveSessionLogPath(sessionId, sessionEntry, sessionKey, opts);
|
||||
if (!logPath) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
const transcriptPath = (
|
||||
sessionEntry as (SessionEntry & { transcriptPath?: string }) | undefined
|
||||
)?.transcriptPath?.trim();
|
||||
const sessionFile = sessionEntry?.sessionFile?.trim() || transcriptPath;
|
||||
const agentId = resolveAgentIdFromSessionKey(sessionKey);
|
||||
const pathOpts = resolveSessionFilePathOptions({
|
||||
agentId,
|
||||
storePath: opts?.storePath,
|
||||
});
|
||||
// Normalize sessionFile through resolveSessionFilePath so relative entries
|
||||
// are resolved against the sessions dir/store layout, not process.cwd().
|
||||
const logPath = resolveSessionFilePath(
|
||||
sessionId,
|
||||
sessionFile ? { sessionFile } : sessionEntry,
|
||||
pathOpts,
|
||||
);
|
||||
|
||||
const lastUsage = await readLastNonzeroUsageFromSessionLog(logPath);
|
||||
if (!lastUsage) {
|
||||
return undefined;
|
||||
@@ -262,6 +297,23 @@ export async function runMemoryFlushIfNeeded(params: {
|
||||
const shouldReadTranscript =
|
||||
canAttemptFlush && entry && (!hasFreshPersistedPromptTokens || shouldReadTranscriptForOutput);
|
||||
|
||||
const forceFlushTranscriptBytes = memoryFlushSettings.forceFlushTranscriptBytes;
|
||||
const shouldCheckTranscriptSizeForForcedFlush =
|
||||
canAttemptFlush &&
|
||||
entry &&
|
||||
Number.isFinite(forceFlushTranscriptBytes) &&
|
||||
forceFlushTranscriptBytes > 0;
|
||||
const transcriptByteSize = shouldCheckTranscriptSizeForForcedFlush
|
||||
? await readSessionLogByteSize(
|
||||
params.followupRun.run.sessionId,
|
||||
entry,
|
||||
params.sessionKey ?? params.followupRun.run.sessionKey,
|
||||
{ storePath: params.storePath },
|
||||
)
|
||||
: undefined;
|
||||
const shouldForceFlushByTranscriptSize =
|
||||
typeof transcriptByteSize === "number" && transcriptByteSize >= forceFlushTranscriptBytes;
|
||||
|
||||
const transcriptUsageSnapshot = shouldReadTranscript
|
||||
? await readPromptTokensFromSessionLog(
|
||||
params.followupRun.run.sessionId,
|
||||
@@ -341,21 +393,23 @@ export async function runMemoryFlushIfNeeded(params: {
|
||||
`compactionCount=${entry?.compactionCount ?? 0} memoryFlushCompactionCount=${entry?.memoryFlushCompactionCount ?? "undefined"} ` +
|
||||
`persistedPromptTokens=${persistedPromptTokens ?? "undefined"} persistedFresh=${entry?.totalTokensFresh === true} ` +
|
||||
`promptTokensEst=${promptTokenEstimate ?? "undefined"} transcriptPromptTokens=${transcriptPromptTokens ?? "undefined"} transcriptOutputTokens=${transcriptOutputTokens ?? "undefined"} ` +
|
||||
`projectedTokenCount=${projectedTokenCount ?? "undefined"}`,
|
||||
`projectedTokenCount=${projectedTokenCount ?? "undefined"} transcriptBytes=${transcriptByteSize ?? "undefined"} ` +
|
||||
`forceFlushTranscriptBytes=${forceFlushTranscriptBytes} forceFlushByTranscriptSize=${shouldForceFlushByTranscriptSize}`,
|
||||
);
|
||||
|
||||
const shouldFlushMemory =
|
||||
memoryFlushSettings &&
|
||||
memoryFlushWritable &&
|
||||
!params.isHeartbeat &&
|
||||
!isCli &&
|
||||
shouldRunMemoryFlush({
|
||||
entry,
|
||||
tokenCount: tokenCountForFlush,
|
||||
contextWindowTokens,
|
||||
reserveTokensFloor: memoryFlushSettings.reserveTokensFloor,
|
||||
softThresholdTokens: memoryFlushSettings.softThresholdTokens,
|
||||
});
|
||||
(memoryFlushSettings &&
|
||||
memoryFlushWritable &&
|
||||
!params.isHeartbeat &&
|
||||
!isCli &&
|
||||
shouldRunMemoryFlush({
|
||||
entry,
|
||||
tokenCount: tokenCountForFlush,
|
||||
contextWindowTokens,
|
||||
reserveTokensFloor: memoryFlushSettings.reserveTokensFloor,
|
||||
softThresholdTokens: memoryFlushSettings.softThresholdTokens,
|
||||
})) ||
|
||||
shouldForceFlushByTranscriptSize;
|
||||
|
||||
if (!shouldFlushMemory) {
|
||||
return entry ?? params.sessionEntry;
|
||||
|
||||
@@ -8,7 +8,12 @@ import type { TypingMode } from "../../config/types.js";
|
||||
import { withStateDirEnv } from "../../test-helpers/state-dir-env.js";
|
||||
import type { TemplateContext } from "../templating.js";
|
||||
import type { GetReplyOptions } from "../types.js";
|
||||
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
|
||||
import {
|
||||
enqueueFollowupRun,
|
||||
scheduleFollowupDrain,
|
||||
type FollowupRun,
|
||||
type QueueSettings,
|
||||
} from "./queue.js";
|
||||
import { createMockTypingController } from "./test-helpers.js";
|
||||
|
||||
type AgentRunParams = {
|
||||
@@ -87,6 +92,7 @@ beforeEach(() => {
|
||||
state.runEmbeddedPiAgentMock.mockClear();
|
||||
state.runCliAgentMock.mockClear();
|
||||
vi.mocked(enqueueFollowupRun).mockClear();
|
||||
vi.mocked(scheduleFollowupDrain).mockClear();
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
});
|
||||
|
||||
@@ -311,6 +317,25 @@ describe("runReplyAgent heartbeat followup guard", () => {
|
||||
expect(vi.mocked(enqueueFollowupRun)).toHaveBeenCalledTimes(1);
|
||||
expect(state.runEmbeddedPiAgentMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
// Makes usage persistence reject once after a successful embedded run, then
// checks that the error reaches the caller and that the followup drain was
// scheduled exactly once.
it("drains followup queue when an unexpected exception escapes the run path", async () => {
  const accountingModule = await import("./session-run-accounting.js");
  const persistUsageSpy = vi.spyOn(accountingModule, "persistRunSessionUsage");
  persistUsageSpy.mockRejectedValueOnce(new Error("persist exploded"));

  const successfulRunResult = {
    payloads: [{ text: "ok" }],
    meta: { agentMeta: { usage: { input: 1, output: 1 } } },
  };
  state.runEmbeddedPiAgentMock.mockResolvedValueOnce(successfulRunResult);

  try {
    const minimalRun = createMinimalRun();
    await expect(minimalRun.run()).rejects.toThrow("persist exploded");

    const drainMock = vi.mocked(scheduleFollowupDrain);
    expect(drainMock).toHaveBeenCalledTimes(1);
  } finally {
    // Always undo the spy so the rejection cannot leak into later tests.
    persistUsageSpy.mockRestore();
  }
});
|
||||
});
|
||||
|
||||
describe("runReplyAgent typing (heartbeat)", () => {
|
||||
@@ -1661,6 +1686,68 @@ describe("runReplyAgent memory flush", () => {
|
||||
});
|
||||
});
|
||||
|
||||
// Writes a 3,000-character transcript next to the store file, configures a
// 256-byte force-flush threshold, and expects the memory-flush prompt to run
// before the user's "hello" turn.
it("forces memory flush when transcript file exceeds configured byte threshold", async () => {
  await withTempStore(async (storePath) => {
    const sessionKey = "main";
    const sessionFile = "oversized-session.jsonl";

    // The session entry references the transcript by a relative file name.
    const transcriptFile = path.join(path.dirname(storePath), sessionFile);
    const transcriptDir = path.dirname(transcriptFile);
    await fs.mkdir(transcriptDir, { recursive: true });
    await fs.writeFile(transcriptFile, "x".repeat(3_000), "utf-8");

    const sessionEntry = {
      sessionId: "session",
      updatedAt: Date.now(),
      sessionFile,
      totalTokens: 10,
      totalTokensFresh: false,
      compactionCount: 1,
    };
    await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });

    // Record every prompt the embedded agent is asked to run, in order.
    const observedRuns: Array<{ prompt?: string }> = [];
    state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
      observedRuns.push({ prompt: params.prompt });
      const isFlushTurn = params.prompt?.includes("Pre-compaction memory flush.");
      if (isFlushTurn) {
        return { payloads: [], meta: {} };
      }
      return {
        payloads: [{ text: "ok" }],
        meta: { agentMeta: { usage: { input: 1, output: 1 } } },
      };
    });

    const memoryFlush = { forceFlushTranscriptBytes: 256 };
    const baseRun = createBaseRun({
      storePath,
      sessionEntry,
      config: { agents: { defaults: { compaction: { memoryFlush } } } },
      runOverrides: { sessionFile },
    });

    await runReplyAgentWithBase({
      baseRun,
      storePath,
      sessionKey,
      sessionEntry,
      commandBody: "hello",
    });

    expect(observedRuns).toHaveLength(2);
    const [flushRun, replyRun] = observedRuns;
    expect(flushRun?.prompt).toContain("Pre-compaction memory flush.");
    expect(replyRun?.prompt).toBe("hello");
  });
});
|
||||
|
||||
it("skips memory flush when disabled in config", async () => {
|
||||
await withTempStore(async (storePath) => {
|
||||
const sessionKey = "main";
|
||||
|
||||
@@ -715,6 +715,11 @@ export async function runReplyAgent(params: {
|
||||
queueKey,
|
||||
runFollowupTurn,
|
||||
);
|
||||
} catch (error) {
|
||||
// Keep the followup queue moving even when an unexpected exception escapes
|
||||
// the run path; the caller still receives the original error.
|
||||
finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
|
||||
throw error;
|
||||
} finally {
|
||||
blockReplyPipeline?.stop();
|
||||
typing.markRunComplete();
|
||||
|
||||
@@ -2,11 +2,13 @@ import { lookupContextTokens } from "../../agents/context.js";
|
||||
import { resolveCronStyleNow } from "../../agents/current-time.js";
|
||||
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
|
||||
import { DEFAULT_PI_COMPACTION_RESERVE_TOKENS_FLOOR } from "../../agents/pi-settings.js";
|
||||
import { parseByteSize } from "../../cli/parse-bytes.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { resolveFreshSessionTotalTokens, type SessionEntry } from "../../config/sessions.js";
|
||||
import { SILENT_REPLY_TOKEN } from "../tokens.js";
|
||||
|
||||
export const DEFAULT_MEMORY_FLUSH_SOFT_TOKENS = 4000;
|
||||
export const DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES = 2 * 1024 * 1024;
|
||||
|
||||
export const DEFAULT_MEMORY_FLUSH_PROMPT = [
|
||||
"Pre-compaction memory flush.",
|
||||
@@ -58,6 +60,11 @@ export function resolveMemoryFlushPromptForRun(params: {
|
||||
export type MemoryFlushSettings = {
|
||||
enabled: boolean;
|
||||
softThresholdTokens: number;
|
||||
/**
|
||||
* Force a pre-compaction memory flush when the session transcript reaches this
|
||||
* size. Set to 0 to disable byte-size based triggering.
|
||||
*/
|
||||
forceFlushTranscriptBytes: number;
|
||||
prompt: string;
|
||||
systemPrompt: string;
|
||||
reserveTokensFloor: number;
|
||||
@@ -71,6 +78,26 @@ const normalizeNonNegativeInt = (value: unknown): number | null => {
|
||||
return int >= 0 ? int : null;
|
||||
};
|
||||
|
||||
/**
 * Coerce a config value into a non-negative byte count.
 *
 * - Finite numbers are floored; a negative result yields `null`.
 * - Strings are trimmed and parsed with `parseByteSize` (default unit `"b"`);
 *   blank strings, parse errors, and results that are not `>= 0` yield `null`.
 * - Every other input yields `null`.
 *
 * @param value - Raw, untyped configuration value.
 * @returns The byte count, or `null` when the value is unusable.
 */
const normalizeOptionalByteSize = (value: unknown): number | null => {
  switch (typeof value) {
    case "number": {
      if (!Number.isFinite(value)) {
        return null;
      }
      const wholeBytes = Math.floor(value);
      return wholeBytes >= 0 ? wholeBytes : null;
    }
    case "string": {
      const text = value.trim();
      if (!text) {
        return null;
      }
      try {
        const parsedBytes = parseByteSize(text, { defaultUnit: "b" });
        return parsedBytes >= 0 ? parsedBytes : null;
      } catch {
        // Unparseable size strings are treated the same as "not configured".
        return null;
      }
    }
    default:
      return null;
  }
};
|
||||
|
||||
export function resolveMemoryFlushSettings(cfg?: OpenClawConfig): MemoryFlushSettings | null {
|
||||
const defaults = cfg?.agents?.defaults?.compaction?.memoryFlush;
|
||||
const enabled = defaults?.enabled ?? true;
|
||||
@@ -79,6 +106,9 @@ export function resolveMemoryFlushSettings(cfg?: OpenClawConfig): MemoryFlushSet
|
||||
}
|
||||
const softThresholdTokens =
|
||||
normalizeNonNegativeInt(defaults?.softThresholdTokens) ?? DEFAULT_MEMORY_FLUSH_SOFT_TOKENS;
|
||||
const forceFlushTranscriptBytes =
|
||||
normalizeOptionalByteSize(defaults?.forceFlushTranscriptBytes) ??
|
||||
DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES;
|
||||
const prompt = defaults?.prompt?.trim() || DEFAULT_MEMORY_FLUSH_PROMPT;
|
||||
const systemPrompt = defaults?.systemPrompt?.trim() || DEFAULT_MEMORY_FLUSH_SYSTEM_PROMPT;
|
||||
const reserveTokensFloor =
|
||||
@@ -88,6 +118,7 @@ export function resolveMemoryFlushSettings(cfg?: OpenClawConfig): MemoryFlushSet
|
||||
return {
|
||||
enabled,
|
||||
softThresholdTokens,
|
||||
forceFlushTranscriptBytes,
|
||||
prompt: ensureNoReplyHint(prompt),
|
||||
systemPrompt: ensureNoReplyHint(systemPrompt),
|
||||
reserveTokensFloor,
|
||||
|
||||
@@ -15,6 +15,7 @@ import {
|
||||
recordPendingHistoryEntryIfEnabled,
|
||||
} from "./history.js";
|
||||
import {
|
||||
DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES,
|
||||
DEFAULT_MEMORY_FLUSH_SOFT_TOKENS,
|
||||
resolveMemoryFlushContextWindowTokens,
|
||||
resolveMemoryFlushSettings,
|
||||
@@ -198,6 +199,7 @@ describe("memory flush settings", () => {
|
||||
const settings = resolveMemoryFlushSettings();
|
||||
expect(settings).not.toBeNull();
|
||||
expect(settings?.enabled).toBe(true);
|
||||
expect(settings?.forceFlushTranscriptBytes).toBe(DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES);
|
||||
expect(settings?.prompt.length).toBeGreaterThan(0);
|
||||
expect(settings?.systemPrompt.length).toBeGreaterThan(0);
|
||||
});
|
||||
@@ -244,8 +246,25 @@ describe("memory flush settings", () => {
|
||||
});
|
||||
|
||||
expect(settings?.softThresholdTokens).toBe(DEFAULT_MEMORY_FLUSH_SOFT_TOKENS);
|
||||
expect(settings?.forceFlushTranscriptBytes).toBe(DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES);
|
||||
expect(settings?.reserveTokensFloor).toBe(DEFAULT_PI_COMPACTION_RESERVE_TOKENS_FLOOR);
|
||||
});
|
||||
|
||||
// A byte-size string in the config should come back as a plain byte count.
it("parses forceFlushTranscriptBytes from byte-size strings", () => {
  const memoryFlush = { forceFlushTranscriptBytes: "3mb" };
  const resolved = resolveMemoryFlushSettings({
    agents: { defaults: { compaction: { memoryFlush } } },
  });

  const expectedBytes = 3 * 1024 * 1024;
  expect(resolved?.forceFlushTranscriptBytes).toBe(expectedBytes);
});
|
||||
});
|
||||
|
||||
describe("shouldRunMemoryFlush", () => {
|
||||
|
||||
@@ -93,8 +93,11 @@ export async function persistSessionUsageUpdate(params: {
|
||||
if (hasUsage) {
|
||||
patch.inputTokens = params.usage?.input ?? 0;
|
||||
patch.outputTokens = params.usage?.output ?? 0;
|
||||
patch.cacheRead = params.usage?.cacheRead ?? 0;
|
||||
patch.cacheWrite = params.usage?.cacheWrite ?? 0;
|
||||
// Cache counters should reflect the latest context snapshot when
|
||||
// available, not accumulated per-call totals across a whole run.
|
||||
const cacheUsage = params.lastCallUsage ?? params.usage;
|
||||
patch.cacheRead = cacheUsage?.cacheRead ?? 0;
|
||||
patch.cacheWrite = cacheUsage?.cacheWrite ?? 0;
|
||||
}
|
||||
// Missing a last-call snapshot (and promptTokens fallback) means
|
||||
// context utilization is stale/unknown.
|
||||
|
||||
@@ -1170,6 +1170,40 @@ describe("persistSessionUsageUpdate", () => {
|
||||
expect(stored[sessionKey].outputTokens).toBe(10_000);
|
||||
});
|
||||
|
||||
// When both run totals and a last-call snapshot are supplied, input/output
// tokens are expected to come from `usage` while the cache counters are
// expected to come from `lastCallUsage`.
it("uses lastCallUsage cache counters when available", async () => {
  const storePath = await createStorePath("openclaw-usage-cache-");
  const sessionKey = "main";
  await seedSessionStore({
    storePath,
    sessionKey,
    entry: { sessionId: "s1", updatedAt: Date.now() },
  });

  const runTotals = {
    input: 100_000,
    output: 8_000,
    cacheRead: 260_000,
    cacheWrite: 90_000,
  };
  const lastCallSnapshot = {
    input: 12_000,
    output: 1_000,
    cacheRead: 18_000,
    cacheWrite: 4_000,
  };
  await persistSessionUsageUpdate({
    storePath,
    sessionKey,
    usage: runTotals,
    lastCallUsage: lastCallSnapshot,
    contextTokensUsed: 200_000,
  });

  const persisted = JSON.parse(await fs.readFile(storePath, "utf-8"));
  expect(persisted[sessionKey].inputTokens).toBe(100_000);
  expect(persisted[sessionKey].outputTokens).toBe(8_000);
  expect(persisted[sessionKey].cacheRead).toBe(18_000);
  expect(persisted[sessionKey].cacheWrite).toBe(4_000);
});
|
||||
|
||||
it("marks totalTokens as unknown when no fresh context snapshot is available", async () => {
|
||||
const storePath = await createStorePath("openclaw-usage-");
|
||||
const sessionKey = "main";
|
||||
|
||||
@@ -295,6 +295,11 @@ export type AgentCompactionMemoryFlushConfig = {
|
||||
enabled?: boolean;
|
||||
/** Run the memory flush when context is within this many tokens of the compaction threshold. */
|
||||
softThresholdTokens?: number;
|
||||
/**
|
||||
* Force a memory flush when transcript size reaches this threshold
|
||||
* (bytes, or byte-size string like "2mb"). Set to 0 to disable.
|
||||
*/
|
||||
forceFlushTranscriptBytes?: number | string;
|
||||
/** User prompt used for the memory flush turn (NO_REPLY is enforced if missing). */
|
||||
prompt?: string;
|
||||
/** System prompt appended for the memory flush turn. */
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { z } from "zod";
|
||||
import { parseByteSize } from "../cli/parse-bytes.js";
|
||||
import {
|
||||
HeartbeatSchema,
|
||||
AgentSandboxSchema,
|
||||
@@ -92,6 +93,19 @@ export const AgentDefaultsSchema = z
|
||||
.object({
|
||||
enabled: z.boolean().optional(),
|
||||
softThresholdTokens: z.number().int().nonnegative().optional(),
|
||||
forceFlushTranscriptBytes: z
|
||||
.union([
|
||||
z.number().int().nonnegative(),
|
||||
z.string().refine((value) => {
|
||||
try {
|
||||
parseByteSize(value.trim(), { defaultUnit: "b" });
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}, "Expected byte size string like 2mb"),
|
||||
])
|
||||
.optional(),
|
||||
prompt: z.string().optional(),
|
||||
systemPrompt: z.string().optional(),
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user