From 474bea162b4d64a7295e851adf0ce99dcdd88d35 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 4 May 2026 09:48:03 +0100 Subject: [PATCH] fix: bound trajectory runtime flush (#77154) * fix: bound trajectory runtime flush * fix: keep trajectory export cap compatible * test: keep followup delivery test pure --- CHANGELOG.md | 1 + docs/tools/trajectory.md | 2 +- src/agents/queued-file-writer.test.ts | 12 ++ src/agents/queued-file-writer.ts | 30 ++- .../reply/followup-delivery.test.ts | 6 +- src/trajectory/export.test.ts | 4 +- src/trajectory/paths.ts | 1 + src/trajectory/runtime.test.ts | 44 +++- src/trajectory/runtime.ts | 192 +++++++++++++++--- 9 files changed, 256 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1d228281b6..8f49ef25d00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,7 @@ Docs: https://docs.openclaw.ai - Google Meet: fork the caller's current agent transcript into agent-mode meeting consultant sessions, so Meet replies inherit the context from the tool call that joined the meeting. - iOS/mobile pairing: reject non-loopback `ws://` setup URLs before QR/setup-code issuance and let the iOS Gateway settings screen scan QR codes or paste full setup-code messages. Thanks @BunsDev. - Control UI: keep Gateway Access inputs and locale picker contained inside the card at narrow and tablet widths. +- Agents/trajectory: bound runtime trajectory capture and yield queued sidecar writes so oversized traces stop recording instead of monopolizing Gateway cleanup. Fixes #77124. Thanks @loyur. - Telegram/streaming: sanitize tool-progress draft preview backticks before shared compaction, so long backtick-heavy progress text still renders inside the safe code-formatted preview instead of collapsing to an ellipsis. - UI/chat: remove the unsupported `line-clamp` declaration from the chat queue text rule to eliminate Firefox console noise without changing visible truncation behavior. Thanks @ZanderH-code. - Agents/Pi: suppress persistence for synthetic mid-turn overflow continuation prompts, so transcript-retry recovery does not write the "continue from transcript" prompt as a new user turn. Thanks @vincentkoc. diff --git a/docs/tools/trajectory.md b/docs/tools/trajectory.md index 27699d9ee04..b6f8eb0598a 100644 --- a/docs/tools/trajectory.md +++ b/docs/tools/trajectory.md @@ -181,7 +181,7 @@ OpenClaw redacts sensitive values before writing export files: The exporter also bounds input size: -- runtime sidecar files: 50 MiB +- runtime sidecar files: live capture stops at 10 MiB and records a truncation event when space remains; export accepts existing runtime sidecars up to 50 MiB - session files: 50 MiB - runtime events: 200,000 - total exported events: 250,000 diff --git a/src/agents/queued-file-writer.test.ts b/src/agents/queued-file-writer.test.ts index 6486d23234c..8a23f8ec1f0 100644 --- a/src/agents/queued-file-writer.test.ts +++ b/src/agents/queued-file-writer.test.ts @@ -80,4 +80,16 @@ describe("getQueuedFileWriter", () => { expect(fs.readFileSync(filePath, "utf8")).toBe("12345\n"); }); + + it("drops writes that would exceed the pending queue cap", async () => { + const tmpDir = makeTempDir(); + const filePath = path.join(tmpDir, "trace.jsonl"); + const writer = getQueuedFileWriter(new Map(), filePath, { maxQueuedBytes: 6 }); + + expect(writer.write("12345\n")).toBe("queued"); + expect(writer.write("after\n")).toBe("dropped"); + await writer.flush(); + + expect(fs.readFileSync(filePath, "utf8")).toBe("12345\n"); + }); }); diff --git a/src/agents/queued-file-writer.ts b/src/agents/queued-file-writer.ts index f6e8616c077..f02e5d9ef96 100644 --- a/src/agents/queued-file-writer.ts +++ b/src/agents/queued-file-writer.ts @@ -2,14 +2,18 @@ import nodeFs from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; +export type QueuedFileWriteResult = "queued" | "dropped"; + export type QueuedFileWriter = { filePath: string; - write: (line: string) => void; + write: (line: string) => unknown; flush: () => Promise; }; type QueuedFileWriterOptions = { maxFileBytes?: number; + maxQueuedBytes?: number; + yieldBeforeWrite?: boolean; }; type QueuedFileAppendFlagConstants = Pick< @@ -111,6 +115,12 @@ async function safeAppendFile( } } +function waitForImmediate(): Promise { + return new Promise((resolve) => { + setImmediate(resolve); + }); +} + export function getQueuedFileWriter( writers: Map, filePath: string, @@ -123,15 +133,29 @@ export function getQueuedFileWriter( const dir = path.dirname(filePath); const ready = fs.mkdir(dir, { recursive: true, mode: 0o700 }).catch(() => undefined); - let queue = Promise.resolve(); + let queue: Promise = Promise.resolve(); + let queuedBytes = 0; const writer: QueuedFileWriter = { filePath, write: (line: string) => { + const lineBytes = Buffer.byteLength(line, "utf8"); + if ( + options.maxQueuedBytes !== undefined && + queuedBytes + lineBytes > options.maxQueuedBytes + ) { + return "dropped"; + } + queuedBytes += lineBytes; queue = queue .then(() => ready) + .then(() => (options.yieldBeforeWrite ? waitForImmediate() : undefined)) .then(() => safeAppendFile(filePath, line, options)) - .catch(() => undefined); + .catch(() => undefined) + .finally(() => { + queuedBytes = Math.max(0, queuedBytes - lineBytes); + }); + return "queued"; }, flush: async () => { await queue; diff --git a/src/auto-reply/reply/followup-delivery.test.ts b/src/auto-reply/reply/followup-delivery.test.ts index 59cf07f42a9..64cdbb8133c 100644 --- a/src/auto-reply/reply/followup-delivery.test.ts +++ b/src/auto-reply/reply/followup-delivery.test.ts @@ -1,7 +1,11 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; import { resolveFollowupDeliveryPayloads } from "./followup-delivery.js"; +vi.mock("../../channels/plugins/index.js", () => ({ + getChannelPlugin: () => undefined, +})); + const baseConfig = {} as OpenClawConfig; describe("resolveFollowupDeliveryPayloads", () => { diff --git a/src/trajectory/export.test.ts b/src/trajectory/export.test.ts index b3b23c6ec25..c6e786788b4 100644 --- a/src/trajectory/export.test.ts +++ b/src/trajectory/export.test.ts @@ -4,7 +4,7 @@ import path from "node:path"; import type { Message, Usage } from "@mariozechner/pi-ai"; import { afterAll, describe, expect, it } from "vitest"; import { exportTrajectoryBundle, resolveDefaultTrajectoryExportDir } from "./export.js"; -import { resolveTrajectoryPointerFilePath } from "./paths.js"; +import { TRAJECTORY_RUNTIME_FILE_MAX_BYTES, resolveTrajectoryPointerFilePath } from "./paths.js"; import type { TrajectoryEvent } from "./types.js"; const tempRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-trajectory-")); @@ -272,7 +272,7 @@ describe("exportTrajectoryBundle", () => { const outputDir = path.join(tmpDir, "bundle"); writeSimpleSessionFile(sessionFile); fs.closeSync(fs.openSync(runtimeFile, "w")); - fs.truncateSync(runtimeFile, 50 * 1024 * 1024 + 1); + fs.truncateSync(runtimeFile, TRAJECTORY_RUNTIME_FILE_MAX_BYTES + 1); await expect( exportTrajectoryBundle({ diff --git a/src/trajectory/paths.ts b/src/trajectory/paths.ts index 76915989671..d2b3033830a 100644 --- a/src/trajectory/paths.ts +++ b/src/trajectory/paths.ts @@ -2,6 +2,7 @@ import fs from "node:fs"; import path from "node:path"; import { resolveHomeRelativePath } from "../infra/home-dir.js"; +export const TRAJECTORY_RUNTIME_CAPTURE_MAX_BYTES = 10 * 1024 * 1024; export const TRAJECTORY_RUNTIME_FILE_MAX_BYTES = 50 * 1024 * 1024; export const TRAJECTORY_RUNTIME_EVENT_MAX_BYTES = 256 * 1024; diff --git a/src/trajectory/runtime.test.ts b/src/trajectory/runtime.test.ts index c364b30dfb8..63723c76d22 100644 --- a/src/trajectory/runtime.test.ts +++ b/src/trajectory/runtime.test.ts @@ -88,7 +88,7 @@ describe("trajectory runtime", () => { expect(JSON.stringify(parsed.data)).not.toContain("sk-other-secret-token"); }); - it("truncates events that exceed the runtime event byte limit", () => { + it("bounds large runtime event fields before serialization", () => { const writes: string[] = []; const recorder = createTrajectoryRuntimeRecorder({ sessionId: "session-1", @@ -108,15 +108,53 @@ describe("trajectory runtime", () => { expect(writes).toHaveLength(1); const parsed = JSON.parse(writes[0]); - expect(parsed.data).toMatchObject({ + expect(parsed.data.prompt).toMatchObject({ truncated: true, - reason: "trajectory-event-size-limit", + reason: "trajectory-field-size-limit", }); expect(Buffer.byteLength(writes[0], "utf8")).toBeLessThanOrEqual( TRAJECTORY_RUNTIME_EVENT_MAX_BYTES + 1, ); }); + it("stops runtime capture at the file budget and records a truncation event", async () => { + const writes: string[] = []; + const recorder = createTrajectoryRuntimeRecorder({ + sessionId: "session-1", + sessionFile: "/tmp/session.jsonl", + maxRuntimeFileBytes: 900, + writer: { + filePath: "/tmp/session.trajectory.jsonl", + write: (line) => { + writes.push(line); + }, + flush: async () => undefined, + }, + }); + + recorder?.recordEvent("context.compiled", { + prompt: "x".repeat(180), + }); + recorder?.recordEvent("prompt.submitted", { + prompt: "y".repeat(180), + }); + recorder?.recordEvent("model.completed", { + get prompt() { + throw new Error("stopped recorder should not read dropped payloads"); + }, + }); + await recorder?.flush(); + + const parsed = writes.map((line) => JSON.parse(line)); + expect(parsed.map((event) => event.type)).toContain("trace.truncated"); + const truncated = parsed.find((event) => event.type === "trace.truncated"); + expect(truncated?.data).toMatchObject({ + reason: "trajectory-runtime-file-size-limit", + limitBytes: 900, + }); + expect(truncated?.data.droppedEvents).toBeGreaterThan(0); + }); + it("writes a session-adjacent pointer when using an override directory", () => { const tmpDir = makeTempDir(); const sessionFile = path.join(tmpDir, "session.jsonl"); diff --git a/src/trajectory/runtime.ts b/src/trajectory/runtime.ts index af2b5e1f93f..e714cd30ac2 100644 --- a/src/trajectory/runtime.ts +++ b/src/trajectory/runtime.ts @@ -6,6 +6,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; import { parseBooleanValue } from "../utils/boolean.js"; import { safeJsonStringify } from "../utils/safe-json.js"; import { + TRAJECTORY_RUNTIME_CAPTURE_MAX_BYTES, TRAJECTORY_RUNTIME_EVENT_MAX_BYTES, TRAJECTORY_RUNTIME_FILE_MAX_BYTES, resolveTrajectoryFilePath, @@ -15,6 +16,7 @@ import { import type { TrajectoryEvent, TrajectoryToolDefinition } from "./types.js"; export { + TRAJECTORY_RUNTIME_CAPTURE_MAX_BYTES, TRAJECTORY_RUNTIME_EVENT_MAX_BYTES, TRAJECTORY_RUNTIME_FILE_MAX_BYTES, resolveTrajectoryFilePath, @@ -26,6 +28,7 @@ export { type TrajectoryRuntimeInit = { cfg?: OpenClawConfig; env?: NodeJS.ProcessEnv; + maxRuntimeFileBytes?: number; runId?: string; sessionId: string; sessionKey?: string; @@ -46,6 +49,11 @@ type TrajectoryRuntimeRecorder = { const writers = new Map(); const MAX_TRAJECTORY_WRITERS = 100; +const TRAJECTORY_RUNTIME_TRUNCATION_SENTINEL_RESERVE_BYTES = 2048; +const TRAJECTORY_RUNTIME_DATA_STRING_MAX_CHARS = 32_768; +const TRAJECTORY_RUNTIME_DATA_ARRAY_MAX_ITEMS = 64; +const TRAJECTORY_RUNTIME_DATA_OBJECT_MAX_KEYS = 64; +const TRAJECTORY_RUNTIME_DATA_MAX_DEPTH = 6; function writeTrajectoryPointerBestEffort(params: { filePath: string; @@ -128,6 +136,75 @@ function truncateOversizedTrajectoryEvent( return undefined; } +function truncatedTrajectoryValue(reason: string, details: Record = {}): unknown { + return { + truncated: true, + reason, + ...details, + }; +} + +function limitTrajectoryPayloadValue( + value: unknown, + depth = 0, + seen: WeakSet = new WeakSet(), +): unknown { + if (typeof value === "string") { + if (value.length > TRAJECTORY_RUNTIME_DATA_STRING_MAX_CHARS) { + return truncatedTrajectoryValue("trajectory-field-size-limit", { + originalChars: value.length, + limitChars: TRAJECTORY_RUNTIME_DATA_STRING_MAX_CHARS, + }); + } + return value; + } + if (typeof value !== "object" || value === null) { + return value; + } + if (seen.has(value)) { + return truncatedTrajectoryValue("trajectory-circular-reference"); + } + if (depth >= TRAJECTORY_RUNTIME_DATA_MAX_DEPTH) { + return truncatedTrajectoryValue("trajectory-depth-limit", { + limitDepth: TRAJECTORY_RUNTIME_DATA_MAX_DEPTH, + }); + } + seen.add(value); + if (Array.isArray(value)) { + const limited = value + .slice(0, TRAJECTORY_RUNTIME_DATA_ARRAY_MAX_ITEMS) + .map((item) => limitTrajectoryPayloadValue(item, depth + 1, seen)); + if (value.length > TRAJECTORY_RUNTIME_DATA_ARRAY_MAX_ITEMS) { + limited.push( + truncatedTrajectoryValue("trajectory-array-size-limit", { + originalLength: value.length, + limitItems: TRAJECTORY_RUNTIME_DATA_ARRAY_MAX_ITEMS, + }), + ); + } + seen.delete(value); + return limited; + } + const record = value as Record; + const keys = Object.keys(record); + const limited: Record = {}; + for (const key of keys.slice(0, TRAJECTORY_RUNTIME_DATA_OBJECT_MAX_KEYS)) { + limited[key] = limitTrajectoryPayloadValue(record[key], depth + 1, seen); + } + if (keys.length > TRAJECTORY_RUNTIME_DATA_OBJECT_MAX_KEYS) { + limited._truncated = truncatedTrajectoryValue("trajectory-object-size-limit", { + originalKeys: keys.length, + limitKeys: TRAJECTORY_RUNTIME_DATA_OBJECT_MAX_KEYS, + }); + } + seen.delete(value); + return limited; +} + +function sanitizeTrajectoryPayload(data: Record): Record { + return sanitizeDiagnosticPayload(limitTrajectoryPayloadValue(data)) as Record; +} + export function toTrajectoryToolDefinitions( tools: ReadonlyArray<{ name?: string; description?: string; parameters?: unknown }>, ): TrajectoryToolDefinition[] { @@ -141,7 +218,7 @@ export function toTrajectoryToolDefinitions( { name, description: tool.description, - parameters: sanitizeDiagnosticPayload(tool.parameters), + parameters: sanitizeDiagnosticPayload(limitTrajectoryPayloadValue(tool.parameters)), }, ]; }) @@ -167,10 +244,16 @@ export function createTrajectoryRuntimeRecorder( if (!params.writer) { trimTrajectoryWriterCache(); } + const maxRuntimeFileBytes = Math.max( + 1, + Math.floor(params.maxRuntimeFileBytes ?? TRAJECTORY_RUNTIME_CAPTURE_MAX_BYTES), + ); const writer = params.writer ?? getQueuedFileWriter(writers, filePath, { - maxFileBytes: TRAJECTORY_RUNTIME_FILE_MAX_BYTES, + maxFileBytes: maxRuntimeFileBytes, + maxQueuedBytes: maxRuntimeFileBytes, + yieldBeforeWrite: true, }); writeTrajectoryPointerBestEffort({ filePath, @@ -179,40 +262,97 @@ export function createTrajectoryRuntimeRecorder( }); let seq = 0; const traceId = params.sessionId; + const sentinelReserveBytes = Math.min( + TRAJECTORY_RUNTIME_TRUNCATION_SENTINEL_RESERVE_BYTES, + Math.floor(maxRuntimeFileBytes / 2), + ); + const normalEventLimitBytes = Math.max(1, maxRuntimeFileBytes - sentinelReserveBytes); + let acceptedRuntimeBytes = 0; + let droppedEvents = 0; + let droppedEventBytes = 0; + let captureStopped = false; + + const writeBoundedLine = (line: string, options: { reserveSentinel: boolean }): boolean => { + const jsonlLine = `${line}\n`; + const lineBytes = Buffer.byteLength(jsonlLine, "utf8"); + const limitBytes = options.reserveSentinel ? normalEventLimitBytes : maxRuntimeFileBytes; + if (acceptedRuntimeBytes + lineBytes > limitBytes) { + captureStopped = true; + droppedEvents += 1; + droppedEventBytes += lineBytes; + return false; + } + const result = writer.write(jsonlLine); + if (result === "dropped") { + captureStopped = true; + droppedEvents += 1; + droppedEventBytes += lineBytes; + return false; + } + acceptedRuntimeBytes += lineBytes; + return true; + }; + + const buildEventLine = (type: string, data?: Record): string | undefined => { + const nextSeq = seq + 1; + const event: TrajectoryEvent = { + traceSchema: "openclaw-trajectory", + schemaVersion: 1, + traceId, + source: "runtime", + type, + ts: new Date().toISOString(), + seq: nextSeq, + sourceSeq: nextSeq, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + runId: params.runId, + workspaceDir: params.workspaceDir, + provider: params.provider, + modelId: params.modelId, + modelApi: params.modelApi, + data: data ? sanitizeTrajectoryPayload(data) : undefined, + }; + const line = safeJsonStringify(event); + if (!line) { + return undefined; + } + const boundedLine = truncateOversizedTrajectoryEvent(event, line); + if (!boundedLine) { + return undefined; + } + seq = nextSeq; + return boundedLine; + }; return { enabled: true, filePath, recordEvent: (type, data) => { - const event: TrajectoryEvent = { - traceSchema: "openclaw-trajectory", - schemaVersion: 1, - traceId, - source: "runtime", - type, - ts: new Date().toISOString(), - seq: (seq += 1), - sourceSeq: seq, - sessionId: params.sessionId, - sessionKey: params.sessionKey, - runId: params.runId, - workspaceDir: params.workspaceDir, - provider: params.provider, - modelId: params.modelId, - modelApi: params.modelApi, - data: data ? (sanitizeDiagnosticPayload(data) as Record) : undefined, - }; - const line = safeJsonStringify(event); + if (captureStopped) { + droppedEvents += 1; + return; + } + const line = buildEventLine(type, data); if (!line) { return; } - const boundedLine = truncateOversizedTrajectoryEvent(event, line); - if (!boundedLine) { - return; - } - writer.write(`${boundedLine}\n`); + writeBoundedLine(line, { reserveSentinel: true }); }, flush: async () => { + if (droppedEvents > 0) { + const line = buildEventLine("trace.truncated", { + reason: "trajectory-runtime-file-size-limit", + droppedEvents, + droppedEventBytes, + limitBytes: maxRuntimeFileBytes, + }); + if (line) { + writeBoundedLine(line, { reserveSentinel: false }); + } + droppedEvents = 0; + droppedEventBytes = 0; + } await writer.flush(); if (!params.writer) { writers.delete(filePath);