fix(diagnostics): bound diagnostic buffers

This commit is contained in:
Vincent Koc
2026-05-22 11:15:46 +08:00
parent c21ca883b0
commit bdcaac06c6
10 changed files with 291 additions and 10 deletions

View File

@@ -88,6 +88,7 @@ Docs: https://docs.openclaw.ai
- Channels: treat bare abort messages such as `stop`, `abort`, and `wait` as immediate control commands in inbound debounce paths so stop requests are not delayed behind pending message coalescing. (#83348) Thanks @IWhatsskill.
- Channels/message tool: resolve configured external channel plugins during in-agent channel selection, so `openclaw agent --local` message-tool sends no longer report an available channel as unavailable. (#85022) Thanks @Kaspre.
- Gateway/ACP: close child ACP sessions spawned via `sessions_spawn` when their parent session is reset or deleted, instead of leaving orphaned `claude-agent-acp` processes that accumulate and exhaust memory. Fixes #68916. (#85190) Thanks @openperf.
- Diagnostics: bound cleanup timeout detail logs and emit drop summaries when async diagnostic bursts exceed the queue cap.
- Agents/subagents: surface blocked child-run completions as errors instead of successful subagent finishes. (#80886) Thanks @TurboTheTurtle.
- Agents/Pi: treat accepted embedded `sessions_spawn` child-session handoffs as terminal progress so parent turns no longer report false non-deliverable failures. (#85054) Thanks @samzong.
- CLI/models: resolve `openclaw models set` aliases from the runtime config while keeping authored aliases ahead of runtime-only defaults. (#83262) Thanks @IWhatsskill.

View File

@@ -1,2 +1,2 @@
b3105c70370edd21c77b943b8c34f5d7ea99df2a92eaf3871f36016823579ffe plugin-sdk-api-baseline.json
b0fe23ab4862aa667111a3b433e42faed77d4b7126e9db974b1a00a298232b85 plugin-sdk-api-baseline.jsonl
bc51139688a48ac217ee4c03b2d76bf4e5b87346c2dbbc0442bf8b3fb72c746b plugin-sdk-api-baseline.json
0e681f44ebec6d16f1898ded7123ea5c608ab520d479fd905d8653f8aca1f008 plugin-sdk-api-baseline.jsonl

View File

@@ -1,6 +1,7 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
AGENT_CLEANUP_STEP_TIMEOUT_MS,
CLEANUP_TIMEOUT_DETAILS_MAX_CHARS,
resolveAgentCleanupStepTimeoutMs,
runAgentCleanupStep,
} from "./run-cleanup-timeout.js";
@@ -86,6 +87,34 @@ describe("agent cleanup timeout", () => {
);
});
it("bounds cleanup timeout details before logging", async () => {
const cleanup = vi.fn(async () => new Promise<never>(() => {}));
const oversizedDetails = `queuedBytes=${"9".repeat(CLEANUP_TIMEOUT_DETAILS_MAX_CHARS * 2)}`;
const result = runAgentCleanupStep({
runId: "run-trajectory",
sessionId: "session-trajectory",
step: "pi-trajectory-flush",
cleanup,
log,
timeoutMs: 5,
getTimeoutDetails: () => oversizedDetails,
});
await vi.advanceTimersByTimeAsync(5);
await expect(result).resolves.toBeUndefined();
const message = String(log.warn.mock.calls.at(-1)?.[0] ?? "");
expect(message).toContain(" details=queuedBytes=");
expect(message).toContain("...[truncated]");
expect(message.length).toBeLessThan(
"agent cleanup timed out: runId=run-trajectory sessionId=session-trajectory step=pi-trajectory-flush timeoutMs=5 details="
.length +
CLEANUP_TIMEOUT_DETAILS_MAX_CHARS +
1,
);
});
it("does not fail cleanup when timeout details throw", async () => {
const cleanup = vi.fn(async () => new Promise<never>(() => {}));
@@ -109,6 +138,35 @@ describe("agent cleanup timeout", () => {
);
});
it("bounds cleanup timeout detail errors before logging", async () => {
const cleanup = vi.fn(async () => new Promise<never>(() => {}));
const result = runAgentCleanupStep({
runId: "run-trajectory",
sessionId: "session-trajectory",
step: "pi-trajectory-flush",
cleanup,
log,
timeoutMs: 5,
getTimeoutDetails: () => {
throw new Error("details unavailable ".repeat(CLEANUP_TIMEOUT_DETAILS_MAX_CHARS));
},
});
await vi.advanceTimersByTimeAsync(5);
await expect(result).resolves.toBeUndefined();
const message = String(log.warn.mock.calls.at(-1)?.[0] ?? "");
expect(message).toContain(" detailsError=details unavailable");
expect(message).toContain("...[truncated]");
expect(message.length).toBeLessThan(
"agent cleanup timed out: runId=run-trajectory sessionId=session-trajectory step=pi-trajectory-flush timeoutMs=5 detailsError="
.length +
CLEANUP_TIMEOUT_DETAILS_MAX_CHARS +
1,
);
});
it("uses the general cleanup timeout environment override for other cleanup steps", async () => {
const cleanup = vi.fn(async () => new Promise<never>(() => {}));

View File

@@ -3,6 +3,9 @@ import { formatErrorMessage } from "../infra/errors.js";
export const AGENT_CLEANUP_STEP_TIMEOUT_MS = 10_000;
export const AGENT_CLEANUP_STEP_TIMEOUT_ENV = "OPENCLAW_AGENT_CLEANUP_TIMEOUT_MS";
export const TRAJECTORY_FLUSH_TIMEOUT_ENV = "OPENCLAW_TRAJECTORY_FLUSH_TIMEOUT_MS";
export const CLEANUP_TIMEOUT_DETAILS_MAX_CHARS = 512;
const CLEANUP_TIMEOUT_DETAILS_TRUNCATED_SUFFIX = "...[truncated]";
type AgentCleanupLogger = {
warn: (message: string) => void;
@@ -33,12 +36,23 @@ function resolveCleanupTimeoutDetails(
): string {
try {
const timeoutDetails = getTimeoutDetails?.()?.trim();
return timeoutDetails ? ` details=${timeoutDetails}` : "";
return timeoutDetails ? ` details=${truncateCleanupTimeoutDetails(timeoutDetails)}` : "";
} catch (error) {
return ` detailsError=${formatErrorMessage(error)}`;
return ` detailsError=${truncateCleanupTimeoutDetails(formatErrorMessage(error))}`;
}
}
function truncateCleanupTimeoutDetails(value: string): string {
if (value.length <= CLEANUP_TIMEOUT_DETAILS_MAX_CHARS) {
return value;
}
const prefixLength = Math.max(
0,
CLEANUP_TIMEOUT_DETAILS_MAX_CHARS - CLEANUP_TIMEOUT_DETAILS_TRUNCATED_SUFFIX.length,
);
return `${value.slice(0, prefixLength)}${CLEANUP_TIMEOUT_DETAILS_TRUNCATED_SUFFIX}`;
}
export function resolveAgentCleanupStepTimeoutMs(params: {
step: string;
timeoutMs?: number;

View File

@@ -211,6 +211,9 @@ function formatStabilityEvent(record: DiagnosticStabilityEventRecord): string {
record.bytes !== undefined ? `bytes=${formatBytes(record.bytes)}` : "",
record.limitBytes !== undefined ? `limit=${formatBytes(record.limitBytes)}` : "",
record.queueDepth !== undefined ? `queueDepth=${record.queueDepth}` : "",
record.queueLength !== undefined ? `queueLength=${record.queueLength}` : "",
record.droppedEvents !== undefined ? `dropped=${record.droppedEvents}` : "",
record.maxQueueLength !== undefined ? `maxQueue=${record.maxQueueLength}` : "",
record.queued !== undefined ? `queued=${record.queued}` : "",
record.memory ? `rss=${formatBytes(record.memory.rssBytes)}` : "",
record.memory ? `heap=${formatBytes(record.memory.heapUsedBytes)}` : "",

View File

@@ -636,6 +636,40 @@ describe("diagnostic-events", () => {
expect(events.filter((event) => event.type === "model.call.started")).toHaveLength(9_998);
});
it("emits a bounded summary when async diagnostics are dropped at saturation", async () => {
const events: DiagnosticEventPayload[] = [];
onDiagnosticEvent((event) => {
events.push(event);
});
for (let index = 0; index < 10_001; index += 1) {
emitDiagnosticEvent({
type: "model.call.started",
runId: `drop-run-${index}`,
callId: `drop-call-${index}`,
provider: "openai",
model: "gpt-5.4",
});
}
await waitForDiagnosticEventsDrained();
const dropSummary = events.find(
(
event,
): event is Extract<DiagnosticEventPayload, { type: "diagnostic.async_queue.dropped" }> =>
event.type === "diagnostic.async_queue.dropped",
);
expect(dropSummary).toMatchObject({
type: "diagnostic.async_queue.dropped",
droppedEvents: 1,
droppedUntrustedEvents: 1,
maxQueueLength: 10_000,
drainBatchSize: 100,
});
expect(events.filter((event) => event.type === "model.call.started")).toHaveLength(10_000);
});
it("keeps log records off the public diagnostic event stream", async () => {
const publicEvents: string[] = [];
const internalEvents: string[] = [];

View File

@@ -612,6 +612,17 @@ export type DiagnosticTelemetryExporterEvent = DiagnosticBaseEvent & {
errorCategory?: string;
};
export type DiagnosticAsyncQueueDroppedEvent = DiagnosticBaseEvent & {
type: "diagnostic.async_queue.dropped";
droppedEvents: number;
droppedTrustedEvents?: number;
droppedUntrustedEvents?: number;
droppedPriorityEvents?: number;
queueLength: number;
maxQueueLength: number;
drainBatchSize: number;
};
export type DiagnosticEventPayload =
| DiagnosticUsageEvent
| DiagnosticWebhookReceivedEvent
@@ -660,6 +671,7 @@ export type DiagnosticEventPayload =
| DiagnosticPayloadLargeEvent
| DiagnosticLogRecordEvent
| DiagnosticTelemetryExporterEvent
| DiagnosticAsyncQueueDroppedEvent
| DiagnosticFailoverEvent;
export type DiagnosticEventInput = DiagnosticEventPayload extends infer Event
@@ -690,6 +702,10 @@ type DiagnosticEventsGlobalState = {
dispatchDepth: number;
asyncQueue: QueuedDiagnosticEvent[];
asyncDrainScheduled: boolean;
asyncDroppedEvents: number;
asyncDroppedTrustedEvents: number;
asyncDroppedUntrustedEvents: number;
asyncDroppedPriorityEvents: number;
};
const MAX_ASYNC_DIAGNOSTIC_EVENTS = 10_000;
@@ -731,6 +747,10 @@ function createDiagnosticEventsState(): DiagnosticEventsGlobalState {
dispatchDepth: 0,
asyncQueue: [],
asyncDrainScheduled: false,
asyncDroppedEvents: 0,
asyncDroppedTrustedEvents: 0,
asyncDroppedUntrustedEvents: 0,
asyncDroppedPriorityEvents: 0,
};
}
@@ -754,6 +774,10 @@ function getDiagnosticEventsState(): DiagnosticEventsGlobalState {
const globalRecord = globalThis as Record<PropertyKey, unknown>;
const existing = globalRecord[DIAGNOSTIC_EVENTS_STATE_KEY];
if (isDiagnosticEventsState(existing)) {
existing.asyncDroppedEvents ??= 0;
existing.asyncDroppedTrustedEvents ??= 0;
existing.asyncDroppedUntrustedEvents ??= 0;
existing.asyncDroppedPriorityEvents ??= 0;
return existing;
}
const state = createDiagnosticEventsState();
@@ -834,15 +858,31 @@ function isPriorityAsyncDiagnosticEvent(entry: QueuedDiagnosticEvent): boolean {
return entry.metadata.trusted && PRIORITY_ASYNC_DIAGNOSTIC_EVENT_TYPES.has(entry.event.type);
}
function makeRoomForPriorityAsyncDiagnosticEvent(state: DiagnosticEventsGlobalState): void {
function noteAsyncDiagnosticDrop(
state: DiagnosticEventsGlobalState,
entry: QueuedDiagnosticEvent,
): void {
state.asyncDroppedEvents += 1;
if (entry.metadata.trusted) {
state.asyncDroppedTrustedEvents += 1;
} else {
state.asyncDroppedUntrustedEvents += 1;
}
if (isPriorityAsyncDiagnosticEvent(entry)) {
state.asyncDroppedPriorityEvents += 1;
}
}
function makeRoomForPriorityAsyncDiagnosticEvent(
state: DiagnosticEventsGlobalState,
): QueuedDiagnosticEvent | undefined {
const nonPriorityIndex = state.asyncQueue.findIndex(
(entry) => !isPriorityAsyncDiagnosticEvent(entry),
);
if (nonPriorityIndex >= 0) {
state.asyncQueue.splice(nonPriorityIndex, 1);
return;
return state.asyncQueue.splice(nonPriorityIndex, 1)[0];
}
state.asyncQueue.shift();
return state.asyncQueue.shift();
}
function deepFreezeDiagnosticValue(value: unknown, seen = new WeakSet<object>()): unknown {
@@ -878,10 +918,37 @@ function scheduleAsyncDiagnosticDrain(state: DiagnosticEventsGlobalState): void
}
if (state.asyncQueue.length > 0) {
scheduleAsyncDiagnosticDrain(state);
return;
}
dispatchAsyncDiagnosticDropSummary(state);
});
}
function dispatchAsyncDiagnosticDropSummary(state: DiagnosticEventsGlobalState): void {
if (state.asyncDroppedEvents <= 0) {
return;
}
const droppedEvents = state.asyncDroppedEvents;
const droppedTrustedEvents = state.asyncDroppedTrustedEvents;
const droppedUntrustedEvents = state.asyncDroppedUntrustedEvents;
const droppedPriorityEvents = state.asyncDroppedPriorityEvents;
state.asyncDroppedEvents = 0;
state.asyncDroppedTrustedEvents = 0;
state.asyncDroppedUntrustedEvents = 0;
state.asyncDroppedPriorityEvents = 0;
const event = enrichDiagnosticEvent(state, {
type: "diagnostic.async_queue.dropped",
droppedEvents,
...(droppedTrustedEvents > 0 ? { droppedTrustedEvents } : {}),
...(droppedUntrustedEvents > 0 ? { droppedUntrustedEvents } : {}),
...(droppedPriorityEvents > 0 ? { droppedPriorityEvents } : {}),
queueLength: state.asyncQueue.length,
maxQueueLength: MAX_ASYNC_DIAGNOSTIC_EVENTS,
drainBatchSize: MAX_ASYNC_DIAGNOSTIC_EVENTS_PER_TURN,
});
dispatchDiagnosticEvent(state, event, { trusted: false });
}
export async function waitForDiagnosticEventsDrained(): Promise<void> {
const state = getDiagnosticEventsState();
while (state.asyncDrainScheduled || state.asyncQueue.length > 0) {
@@ -919,9 +986,13 @@ function emitDiagnosticEventWithTrust(event: DiagnosticEventInput, trusted: bool
if (ASYNC_DIAGNOSTIC_EVENT_TYPES.has(enriched.type)) {
if (state.asyncQueue.length >= MAX_ASYNC_DIAGNOSTIC_EVENTS) {
if (!trusted || !PRIORITY_ASYNC_DIAGNOSTIC_EVENT_TYPES.has(enriched.type)) {
noteAsyncDiagnosticDrop(state, { event: enriched, metadata });
return;
}
makeRoomForPriorityAsyncDiagnosticEvent(state);
const droppedEntry = makeRoomForPriorityAsyncDiagnosticEvent(state);
if (droppedEntry) {
noteAsyncDiagnosticDrop(state, droppedEntry);
}
}
state.asyncQueue.push({ event: enriched, metadata });
scheduleAsyncDiagnosticDrain(state);
@@ -999,4 +1070,8 @@ export function resetDiagnosticEventsForTest(): void {
state.dispatchDepth = 0;
state.asyncQueue = [];
state.asyncDrainScheduled = false;
state.asyncDroppedEvents = 0;
state.asyncDroppedTrustedEvents = 0;
state.asyncDroppedUntrustedEvents = 0;
state.asyncDroppedPriorityEvents = 0;
}

View File

@@ -740,10 +740,42 @@ function readStabilityEventRecord(
assignOptionalNumber(sanitized, "ageMs", record.ageMs, `${label}.ageMs`);
assignOptionalNumber(sanitized, "queueDepth", record.queueDepth, `${label}.queueDepth`);
assignOptionalNumber(sanitized, "queueSize", record.queueSize, `${label}.queueSize`);
assignOptionalNumber(sanitized, "queueLength", record.queueLength, `${label}.queueLength`);
assignOptionalNumber(sanitized, "waitMs", record.waitMs, `${label}.waitMs`);
assignOptionalNumber(sanitized, "active", record.active, `${label}.active`);
assignOptionalNumber(sanitized, "waiting", record.waiting, `${label}.waiting`);
assignOptionalNumber(sanitized, "queued", record.queued, `${label}.queued`);
assignOptionalNumber(sanitized, "droppedEvents", record.droppedEvents, `${label}.droppedEvents`);
assignOptionalNumber(
sanitized,
"droppedTrustedEvents",
record.droppedTrustedEvents,
`${label}.droppedTrustedEvents`,
);
assignOptionalNumber(
sanitized,
"droppedUntrustedEvents",
record.droppedUntrustedEvents,
`${label}.droppedUntrustedEvents`,
);
assignOptionalNumber(
sanitized,
"droppedPriorityEvents",
record.droppedPriorityEvents,
`${label}.droppedPriorityEvents`,
);
assignOptionalNumber(
sanitized,
"maxQueueLength",
record.maxQueueLength,
`${label}.maxQueueLength`,
);
assignOptionalNumber(
sanitized,
"drainBatchSize",
record.drainBatchSize,
`${label}.drainBatchSize`,
);
if (record.webhooks !== undefined) {
const webhooks = readObject(record.webhooks, `${label}.webhooks`);

View File

@@ -1,5 +1,9 @@
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { emitDiagnosticEvent, resetDiagnosticEventsForTest } from "../infra/diagnostic-events.js";
import {
emitDiagnosticEvent,
resetDiagnosticEventsForTest,
waitForDiagnosticEventsDrained,
} from "../infra/diagnostic-events.js";
import {
getDiagnosticStabilitySnapshot,
normalizeDiagnosticStabilityQuery,
@@ -410,6 +414,50 @@ describe("diagnostic stability recorder", () => {
});
});
it("keeps async queue drop summaries after drained queued events for sinceSeq polling", async () => {
startDiagnosticStabilityRecorder();
for (let index = 0; index < 10_001; index += 1) {
emitDiagnosticEvent({
type: "model.call.started",
runId: `overflow-run-${index}`,
callId: `overflow-call-${index}`,
provider: "openai",
model: "gpt-5.4",
});
}
await new Promise<void>((resolve) => setImmediate(resolve));
const midDrainSnapshot = getDiagnosticStabilitySnapshot({ limit: 1000 });
expect(midDrainSnapshot.lastSeq).toBe(100);
expect(
midDrainSnapshot.events.some((event) => event.type === "diagnostic.async_queue.dropped"),
).toBe(false);
await waitForDiagnosticEventsDrained();
const sinceMidDrain = getDiagnosticStabilitySnapshot({
sinceSeq: midDrainSnapshot.lastSeq,
limit: 1000,
});
const dropSummary = sinceMidDrain.events.find(
(event) => event.type === "diagnostic.async_queue.dropped",
);
expectFields(dropSummary, {
type: "diagnostic.async_queue.dropped",
droppedEvents: 1,
droppedUntrustedEvents: 1,
queueLength: 0,
maxQueueLength: 10_000,
drainBatchSize: 100,
});
expect(
sinceMidDrain.events.filter((event) => event.type === "model.call.started"),
).not.toHaveLength(0);
expect(sinceMidDrain.lastSeq).toBeGreaterThan(10_000);
});
it("applies query filters to persisted snapshots without mutating the source", () => {
const snapshot: DiagnosticStabilitySnapshot = {
generatedAt: "2026-04-22T12:00:00.000Z",

View File

@@ -59,11 +59,18 @@ export type DiagnosticStabilityEventRecord = {
ageMs?: number;
queueDepth?: number;
queueSize?: number;
queueLength?: number;
waitMs?: number;
failureKind?: string;
active?: number;
waiting?: number;
queued?: number;
droppedEvents?: number;
droppedTrustedEvents?: number;
droppedUntrustedEvents?: number;
droppedPriorityEvents?: number;
maxQueueLength?: number;
drainBatchSize?: number;
webhooks?: {
received: number;
processed: number;
@@ -501,6 +508,15 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
record.outcome = event.status;
assignReasonCode(record, event.reason ?? event.errorCategory);
break;
case "diagnostic.async_queue.dropped":
record.droppedEvents = event.droppedEvents;
record.droppedTrustedEvents = event.droppedTrustedEvents;
record.droppedUntrustedEvents = event.droppedUntrustedEvents;
record.droppedPriorityEvents = event.droppedPriorityEvents;
record.queueLength = event.queueLength;
record.maxQueueLength = event.maxQueueLength;
record.drainBatchSize = event.drainBatchSize;
break;
case "model.failover":
record.provider = event.fromProvider;
record.model = event.fromModel;