mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 08:30:42 +00:00
fix: bound trajectory runtime flush (#77154)
* fix: bound trajectory runtime flush * fix: keep trajectory export cap compatible * test: keep followup delivery test pure
This commit is contained in:
committed by
GitHub
parent
be41b8cbc7
commit
474bea162b
@@ -80,4 +80,16 @@ describe("getQueuedFileWriter", () => {
|
||||
|
||||
expect(fs.readFileSync(filePath, "utf8")).toBe("12345\n");
|
||||
});
|
||||
|
||||
it("drops writes that would exceed the pending queue cap", async () => {
|
||||
const tmpDir = makeTempDir();
|
||||
const filePath = path.join(tmpDir, "trace.jsonl");
|
||||
const writer = getQueuedFileWriter(new Map(), filePath, { maxQueuedBytes: 6 });
|
||||
|
||||
expect(writer.write("12345\n")).toBe("queued");
|
||||
expect(writer.write("after\n")).toBe("dropped");
|
||||
await writer.flush();
|
||||
|
||||
expect(fs.readFileSync(filePath, "utf8")).toBe("12345\n");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -2,14 +2,18 @@ import nodeFs from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
export type QueuedFileWriteResult = "queued" | "dropped";
|
||||
|
||||
export type QueuedFileWriter = {
|
||||
filePath: string;
|
||||
write: (line: string) => void;
|
||||
write: (line: string) => unknown;
|
||||
flush: () => Promise<void>;
|
||||
};
|
||||
|
||||
type QueuedFileWriterOptions = {
|
||||
maxFileBytes?: number;
|
||||
maxQueuedBytes?: number;
|
||||
yieldBeforeWrite?: boolean;
|
||||
};
|
||||
|
||||
type QueuedFileAppendFlagConstants = Pick<
|
||||
@@ -111,6 +115,12 @@ async function safeAppendFile(
|
||||
}
|
||||
}
|
||||
|
||||
function waitForImmediate(): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
setImmediate(resolve);
|
||||
});
|
||||
}
|
||||
|
||||
export function getQueuedFileWriter(
|
||||
writers: Map<string, QueuedFileWriter>,
|
||||
filePath: string,
|
||||
@@ -123,15 +133,29 @@ export function getQueuedFileWriter(
|
||||
|
||||
const dir = path.dirname(filePath);
|
||||
const ready = fs.mkdir(dir, { recursive: true, mode: 0o700 }).catch(() => undefined);
|
||||
let queue = Promise.resolve();
|
||||
let queue: Promise<unknown> = Promise.resolve();
|
||||
let queuedBytes = 0;
|
||||
|
||||
const writer: QueuedFileWriter = {
|
||||
filePath,
|
||||
write: (line: string) => {
|
||||
const lineBytes = Buffer.byteLength(line, "utf8");
|
||||
if (
|
||||
options.maxQueuedBytes !== undefined &&
|
||||
queuedBytes + lineBytes > options.maxQueuedBytes
|
||||
) {
|
||||
return "dropped";
|
||||
}
|
||||
queuedBytes += lineBytes;
|
||||
queue = queue
|
||||
.then(() => ready)
|
||||
.then(() => (options.yieldBeforeWrite ? waitForImmediate() : undefined))
|
||||
.then(() => safeAppendFile(filePath, line, options))
|
||||
.catch(() => undefined);
|
||||
.catch(() => undefined)
|
||||
.finally(() => {
|
||||
queuedBytes = Math.max(0, queuedBytes - lineBytes);
|
||||
});
|
||||
return "queued";
|
||||
},
|
||||
flush: async () => {
|
||||
await queue;
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { resolveFollowupDeliveryPayloads } from "./followup-delivery.js";
|
||||
|
||||
vi.mock("../../channels/plugins/index.js", () => ({
|
||||
getChannelPlugin: () => undefined,
|
||||
}));
|
||||
|
||||
const baseConfig = {} as OpenClawConfig;
|
||||
|
||||
describe("resolveFollowupDeliveryPayloads", () => {
|
||||
|
||||
@@ -4,7 +4,7 @@ import path from "node:path";
|
||||
import type { Message, Usage } from "@mariozechner/pi-ai";
|
||||
import { afterAll, describe, expect, it } from "vitest";
|
||||
import { exportTrajectoryBundle, resolveDefaultTrajectoryExportDir } from "./export.js";
|
||||
import { resolveTrajectoryPointerFilePath } from "./paths.js";
|
||||
import { TRAJECTORY_RUNTIME_FILE_MAX_BYTES, resolveTrajectoryPointerFilePath } from "./paths.js";
|
||||
import type { TrajectoryEvent } from "./types.js";
|
||||
|
||||
const tempRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-trajectory-"));
|
||||
@@ -272,7 +272,7 @@ describe("exportTrajectoryBundle", () => {
|
||||
const outputDir = path.join(tmpDir, "bundle");
|
||||
writeSimpleSessionFile(sessionFile);
|
||||
fs.closeSync(fs.openSync(runtimeFile, "w"));
|
||||
fs.truncateSync(runtimeFile, 50 * 1024 * 1024 + 1);
|
||||
fs.truncateSync(runtimeFile, TRAJECTORY_RUNTIME_FILE_MAX_BYTES + 1);
|
||||
|
||||
await expect(
|
||||
exportTrajectoryBundle({
|
||||
|
||||
@@ -2,6 +2,7 @@ import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { resolveHomeRelativePath } from "../infra/home-dir.js";
|
||||
|
||||
export const TRAJECTORY_RUNTIME_CAPTURE_MAX_BYTES = 10 * 1024 * 1024;
|
||||
export const TRAJECTORY_RUNTIME_FILE_MAX_BYTES = 50 * 1024 * 1024;
|
||||
export const TRAJECTORY_RUNTIME_EVENT_MAX_BYTES = 256 * 1024;
|
||||
|
||||
|
||||
@@ -88,7 +88,7 @@ describe("trajectory runtime", () => {
|
||||
expect(JSON.stringify(parsed.data)).not.toContain("sk-other-secret-token");
|
||||
});
|
||||
|
||||
it("truncates events that exceed the runtime event byte limit", () => {
|
||||
it("bounds large runtime event fields before serialization", () => {
|
||||
const writes: string[] = [];
|
||||
const recorder = createTrajectoryRuntimeRecorder({
|
||||
sessionId: "session-1",
|
||||
@@ -108,15 +108,53 @@ describe("trajectory runtime", () => {
|
||||
|
||||
expect(writes).toHaveLength(1);
|
||||
const parsed = JSON.parse(writes[0]);
|
||||
expect(parsed.data).toMatchObject({
|
||||
expect(parsed.data.prompt).toMatchObject({
|
||||
truncated: true,
|
||||
reason: "trajectory-event-size-limit",
|
||||
reason: "trajectory-field-size-limit",
|
||||
});
|
||||
expect(Buffer.byteLength(writes[0], "utf8")).toBeLessThanOrEqual(
|
||||
TRAJECTORY_RUNTIME_EVENT_MAX_BYTES + 1,
|
||||
);
|
||||
});
|
||||
|
||||
it("stops runtime capture at the file budget and records a truncation event", async () => {
|
||||
const writes: string[] = [];
|
||||
const recorder = createTrajectoryRuntimeRecorder({
|
||||
sessionId: "session-1",
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
maxRuntimeFileBytes: 900,
|
||||
writer: {
|
||||
filePath: "/tmp/session.trajectory.jsonl",
|
||||
write: (line) => {
|
||||
writes.push(line);
|
||||
},
|
||||
flush: async () => undefined,
|
||||
},
|
||||
});
|
||||
|
||||
recorder?.recordEvent("context.compiled", {
|
||||
prompt: "x".repeat(180),
|
||||
});
|
||||
recorder?.recordEvent("prompt.submitted", {
|
||||
prompt: "y".repeat(180),
|
||||
});
|
||||
recorder?.recordEvent("model.completed", {
|
||||
get prompt() {
|
||||
throw new Error("stopped recorder should not read dropped payloads");
|
||||
},
|
||||
});
|
||||
await recorder?.flush();
|
||||
|
||||
const parsed = writes.map((line) => JSON.parse(line));
|
||||
expect(parsed.map((event) => event.type)).toContain("trace.truncated");
|
||||
const truncated = parsed.find((event) => event.type === "trace.truncated");
|
||||
expect(truncated?.data).toMatchObject({
|
||||
reason: "trajectory-runtime-file-size-limit",
|
||||
limitBytes: 900,
|
||||
});
|
||||
expect(truncated?.data.droppedEvents).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it("writes a session-adjacent pointer when using an override directory", () => {
|
||||
const tmpDir = makeTempDir();
|
||||
const sessionFile = path.join(tmpDir, "session.jsonl");
|
||||
|
||||
@@ -6,6 +6,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { parseBooleanValue } from "../utils/boolean.js";
|
||||
import { safeJsonStringify } from "../utils/safe-json.js";
|
||||
import {
|
||||
TRAJECTORY_RUNTIME_CAPTURE_MAX_BYTES,
|
||||
TRAJECTORY_RUNTIME_EVENT_MAX_BYTES,
|
||||
TRAJECTORY_RUNTIME_FILE_MAX_BYTES,
|
||||
resolveTrajectoryFilePath,
|
||||
@@ -15,6 +16,7 @@ import {
|
||||
import type { TrajectoryEvent, TrajectoryToolDefinition } from "./types.js";
|
||||
|
||||
export {
|
||||
TRAJECTORY_RUNTIME_CAPTURE_MAX_BYTES,
|
||||
TRAJECTORY_RUNTIME_EVENT_MAX_BYTES,
|
||||
TRAJECTORY_RUNTIME_FILE_MAX_BYTES,
|
||||
resolveTrajectoryFilePath,
|
||||
@@ -26,6 +28,7 @@ export {
|
||||
type TrajectoryRuntimeInit = {
|
||||
cfg?: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
maxRuntimeFileBytes?: number;
|
||||
runId?: string;
|
||||
sessionId: string;
|
||||
sessionKey?: string;
|
||||
@@ -46,6 +49,11 @@ type TrajectoryRuntimeRecorder = {
|
||||
|
||||
const writers = new Map<string, QueuedFileWriter>();
|
||||
const MAX_TRAJECTORY_WRITERS = 100;
|
||||
const TRAJECTORY_RUNTIME_TRUNCATION_SENTINEL_RESERVE_BYTES = 2048;
|
||||
const TRAJECTORY_RUNTIME_DATA_STRING_MAX_CHARS = 32_768;
|
||||
const TRAJECTORY_RUNTIME_DATA_ARRAY_MAX_ITEMS = 64;
|
||||
const TRAJECTORY_RUNTIME_DATA_OBJECT_MAX_KEYS = 64;
|
||||
const TRAJECTORY_RUNTIME_DATA_MAX_DEPTH = 6;
|
||||
|
||||
function writeTrajectoryPointerBestEffort(params: {
|
||||
filePath: string;
|
||||
@@ -128,6 +136,75 @@ function truncateOversizedTrajectoryEvent(
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function truncatedTrajectoryValue(reason: string, details: Record<string, unknown> = {}): unknown {
|
||||
return {
|
||||
truncated: true,
|
||||
reason,
|
||||
...details,
|
||||
};
|
||||
}
|
||||
|
||||
function limitTrajectoryPayloadValue(
|
||||
value: unknown,
|
||||
depth = 0,
|
||||
seen: WeakSet<object> = new WeakSet(),
|
||||
): unknown {
|
||||
if (typeof value === "string") {
|
||||
if (value.length > TRAJECTORY_RUNTIME_DATA_STRING_MAX_CHARS) {
|
||||
return truncatedTrajectoryValue("trajectory-field-size-limit", {
|
||||
originalChars: value.length,
|
||||
limitChars: TRAJECTORY_RUNTIME_DATA_STRING_MAX_CHARS,
|
||||
});
|
||||
}
|
||||
return value;
|
||||
}
|
||||
if (typeof value !== "object" || value === null) {
|
||||
return value;
|
||||
}
|
||||
if (seen.has(value)) {
|
||||
return truncatedTrajectoryValue("trajectory-circular-reference");
|
||||
}
|
||||
if (depth >= TRAJECTORY_RUNTIME_DATA_MAX_DEPTH) {
|
||||
return truncatedTrajectoryValue("trajectory-depth-limit", {
|
||||
limitDepth: TRAJECTORY_RUNTIME_DATA_MAX_DEPTH,
|
||||
});
|
||||
}
|
||||
seen.add(value);
|
||||
if (Array.isArray(value)) {
|
||||
const limited = value
|
||||
.slice(0, TRAJECTORY_RUNTIME_DATA_ARRAY_MAX_ITEMS)
|
||||
.map((item) => limitTrajectoryPayloadValue(item, depth + 1, seen));
|
||||
if (value.length > TRAJECTORY_RUNTIME_DATA_ARRAY_MAX_ITEMS) {
|
||||
limited.push(
|
||||
truncatedTrajectoryValue("trajectory-array-size-limit", {
|
||||
originalLength: value.length,
|
||||
limitItems: TRAJECTORY_RUNTIME_DATA_ARRAY_MAX_ITEMS,
|
||||
}),
|
||||
);
|
||||
}
|
||||
seen.delete(value);
|
||||
return limited;
|
||||
}
|
||||
const record = value as Record<string, unknown>;
|
||||
const keys = Object.keys(record);
|
||||
const limited: Record<string, unknown> = {};
|
||||
for (const key of keys.slice(0, TRAJECTORY_RUNTIME_DATA_OBJECT_MAX_KEYS)) {
|
||||
limited[key] = limitTrajectoryPayloadValue(record[key], depth + 1, seen);
|
||||
}
|
||||
if (keys.length > TRAJECTORY_RUNTIME_DATA_OBJECT_MAX_KEYS) {
|
||||
limited._truncated = truncatedTrajectoryValue("trajectory-object-size-limit", {
|
||||
originalKeys: keys.length,
|
||||
limitKeys: TRAJECTORY_RUNTIME_DATA_OBJECT_MAX_KEYS,
|
||||
});
|
||||
}
|
||||
seen.delete(value);
|
||||
return limited;
|
||||
}
|
||||
|
||||
function sanitizeTrajectoryPayload(data: Record<string, unknown>): Record<string, unknown> {
|
||||
return sanitizeDiagnosticPayload(limitTrajectoryPayloadValue(data)) as Record<string, unknown>;
|
||||
}
|
||||
|
||||
export function toTrajectoryToolDefinitions(
|
||||
tools: ReadonlyArray<{ name?: string; description?: string; parameters?: unknown }>,
|
||||
): TrajectoryToolDefinition[] {
|
||||
@@ -141,7 +218,7 @@ export function toTrajectoryToolDefinitions(
|
||||
{
|
||||
name,
|
||||
description: tool.description,
|
||||
parameters: sanitizeDiagnosticPayload(tool.parameters),
|
||||
parameters: sanitizeDiagnosticPayload(limitTrajectoryPayloadValue(tool.parameters)),
|
||||
},
|
||||
];
|
||||
})
|
||||
@@ -167,10 +244,16 @@ export function createTrajectoryRuntimeRecorder(
|
||||
if (!params.writer) {
|
||||
trimTrajectoryWriterCache();
|
||||
}
|
||||
const maxRuntimeFileBytes = Math.max(
|
||||
1,
|
||||
Math.floor(params.maxRuntimeFileBytes ?? TRAJECTORY_RUNTIME_CAPTURE_MAX_BYTES),
|
||||
);
|
||||
const writer =
|
||||
params.writer ??
|
||||
getQueuedFileWriter(writers, filePath, {
|
||||
maxFileBytes: TRAJECTORY_RUNTIME_FILE_MAX_BYTES,
|
||||
maxFileBytes: maxRuntimeFileBytes,
|
||||
maxQueuedBytes: maxRuntimeFileBytes,
|
||||
yieldBeforeWrite: true,
|
||||
});
|
||||
writeTrajectoryPointerBestEffort({
|
||||
filePath,
|
||||
@@ -179,40 +262,97 @@ export function createTrajectoryRuntimeRecorder(
|
||||
});
|
||||
let seq = 0;
|
||||
const traceId = params.sessionId;
|
||||
const sentinelReserveBytes = Math.min(
|
||||
TRAJECTORY_RUNTIME_TRUNCATION_SENTINEL_RESERVE_BYTES,
|
||||
Math.floor(maxRuntimeFileBytes / 2),
|
||||
);
|
||||
const normalEventLimitBytes = Math.max(1, maxRuntimeFileBytes - sentinelReserveBytes);
|
||||
let acceptedRuntimeBytes = 0;
|
||||
let droppedEvents = 0;
|
||||
let droppedEventBytes = 0;
|
||||
let captureStopped = false;
|
||||
|
||||
const writeBoundedLine = (line: string, options: { reserveSentinel: boolean }): boolean => {
|
||||
const jsonlLine = `${line}\n`;
|
||||
const lineBytes = Buffer.byteLength(jsonlLine, "utf8");
|
||||
const limitBytes = options.reserveSentinel ? normalEventLimitBytes : maxRuntimeFileBytes;
|
||||
if (acceptedRuntimeBytes + lineBytes > limitBytes) {
|
||||
captureStopped = true;
|
||||
droppedEvents += 1;
|
||||
droppedEventBytes += lineBytes;
|
||||
return false;
|
||||
}
|
||||
const result = writer.write(jsonlLine);
|
||||
if (result === "dropped") {
|
||||
captureStopped = true;
|
||||
droppedEvents += 1;
|
||||
droppedEventBytes += lineBytes;
|
||||
return false;
|
||||
}
|
||||
acceptedRuntimeBytes += lineBytes;
|
||||
return true;
|
||||
};
|
||||
|
||||
const buildEventLine = (type: string, data?: Record<string, unknown>): string | undefined => {
|
||||
const nextSeq = seq + 1;
|
||||
const event: TrajectoryEvent = {
|
||||
traceSchema: "openclaw-trajectory",
|
||||
schemaVersion: 1,
|
||||
traceId,
|
||||
source: "runtime",
|
||||
type,
|
||||
ts: new Date().toISOString(),
|
||||
seq: nextSeq,
|
||||
sourceSeq: nextSeq,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
runId: params.runId,
|
||||
workspaceDir: params.workspaceDir,
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
modelApi: params.modelApi,
|
||||
data: data ? sanitizeTrajectoryPayload(data) : undefined,
|
||||
};
|
||||
const line = safeJsonStringify(event);
|
||||
if (!line) {
|
||||
return undefined;
|
||||
}
|
||||
const boundedLine = truncateOversizedTrajectoryEvent(event, line);
|
||||
if (!boundedLine) {
|
||||
return undefined;
|
||||
}
|
||||
seq = nextSeq;
|
||||
return boundedLine;
|
||||
};
|
||||
|
||||
return {
|
||||
enabled: true,
|
||||
filePath,
|
||||
recordEvent: (type, data) => {
|
||||
const event: TrajectoryEvent = {
|
||||
traceSchema: "openclaw-trajectory",
|
||||
schemaVersion: 1,
|
||||
traceId,
|
||||
source: "runtime",
|
||||
type,
|
||||
ts: new Date().toISOString(),
|
||||
seq: (seq += 1),
|
||||
sourceSeq: seq,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
runId: params.runId,
|
||||
workspaceDir: params.workspaceDir,
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
modelApi: params.modelApi,
|
||||
data: data ? (sanitizeDiagnosticPayload(data) as Record<string, unknown>) : undefined,
|
||||
};
|
||||
const line = safeJsonStringify(event);
|
||||
if (captureStopped) {
|
||||
droppedEvents += 1;
|
||||
return;
|
||||
}
|
||||
const line = buildEventLine(type, data);
|
||||
if (!line) {
|
||||
return;
|
||||
}
|
||||
const boundedLine = truncateOversizedTrajectoryEvent(event, line);
|
||||
if (!boundedLine) {
|
||||
return;
|
||||
}
|
||||
writer.write(`${boundedLine}\n`);
|
||||
writeBoundedLine(line, { reserveSentinel: true });
|
||||
},
|
||||
flush: async () => {
|
||||
if (droppedEvents > 0) {
|
||||
const line = buildEventLine("trace.truncated", {
|
||||
reason: "trajectory-runtime-file-size-limit",
|
||||
droppedEvents,
|
||||
droppedEventBytes,
|
||||
limitBytes: maxRuntimeFileBytes,
|
||||
});
|
||||
if (line) {
|
||||
writeBoundedLine(line, { reserveSentinel: false });
|
||||
}
|
||||
droppedEvents = 0;
|
||||
droppedEventBytes = 0;
|
||||
}
|
||||
await writer.flush();
|
||||
if (!params.writer) {
|
||||
writers.delete(filePath);
|
||||
|
||||
Reference in New Issue
Block a user