diff --git a/CHANGELOG.md b/CHANGELOG.md index acdc7b5c463..85a5d9c1549 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ Docs: https://docs.openclaw.ai - Channels: treat bare abort messages such as `stop`, `abort`, and `wait` as immediate control commands in inbound debounce paths so stop requests are not delayed behind pending message coalescing. (#83348) Thanks @IWhatsskill. - Channels/message tool: resolve configured external channel plugins during in-agent channel selection, so `openclaw agent --local` message-tool sends no longer report an available channel as unavailable. (#85022) Thanks @Kaspre. - Gateway/ACP: close child ACP sessions spawned via `sessions_spawn` when their parent session is reset or deleted, instead of leaving orphaned `claude-agent-acp` processes that accumulate and exhaust memory. Fixes #68916. (#85190) Thanks @openperf. +- Diagnostics: bound cleanup timeout detail logs and emit drop summaries when async diagnostic bursts exceed the queue cap. - Agents/subagents: surface blocked child-run completions as errors instead of successful subagent finishes. (#80886) Thanks @TurboTheTurtle. - Agents/Pi: treat accepted embedded `sessions_spawn` child-session handoffs as terminal progress so parent turns no longer report false non-deliverable failures. (#85054) Thanks @samzong. - CLI/models: resolve `openclaw models set` aliases from the runtime config while keeping authored aliases ahead of runtime-only defaults. (#83262) Thanks @IWhatsskill. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 8feac8f0e2d..f1ae363bcb4 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -b3105c70370edd21c77b943b8c34f5d7ea99df2a92eaf3871f36016823579ffe plugin-sdk-api-baseline.json -b0fe23ab4862aa667111a3b433e42faed77d4b7126e9db974b1a00a298232b85 plugin-sdk-api-baseline.jsonl +bc51139688a48ac217ee4c03b2d76bf4e5b87346c2dbbc0442bf8b3fb72c746b plugin-sdk-api-baseline.json +0e681f44ebec6d16f1898ded7123ea5c608ab520d479fd905d8653f8aca1f008 plugin-sdk-api-baseline.jsonl diff --git a/src/agents/run-cleanup-timeout.test.ts b/src/agents/run-cleanup-timeout.test.ts index 34f8014d36a..4799055c524 100644 --- a/src/agents/run-cleanup-timeout.test.ts +++ b/src/agents/run-cleanup-timeout.test.ts @@ -1,6 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { AGENT_CLEANUP_STEP_TIMEOUT_MS, + CLEANUP_TIMEOUT_DETAILS_MAX_CHARS, resolveAgentCleanupStepTimeoutMs, runAgentCleanupStep, } from "./run-cleanup-timeout.js"; @@ -86,6 +87,34 @@ describe("agent cleanup timeout", () => { ); }); + it("bounds cleanup timeout details before logging", async () => { + const cleanup = vi.fn(async () => new Promise(() => {})); + const oversizedDetails = `queuedBytes=${"9".repeat(CLEANUP_TIMEOUT_DETAILS_MAX_CHARS * 2)}`; + + const result = runAgentCleanupStep({ + runId: "run-trajectory", + sessionId: "session-trajectory", + step: "pi-trajectory-flush", + cleanup, + log, + timeoutMs: 5, + getTimeoutDetails: () => oversizedDetails, + }); + + await vi.advanceTimersByTimeAsync(5); + await expect(result).resolves.toBeUndefined(); + + const message = String(log.warn.mock.calls.at(-1)?.[0] ?? ""); + expect(message).toContain(" details=queuedBytes="); + expect(message).toContain("...[truncated]"); + expect(message.length).toBeLessThan( + "agent cleanup timed out: runId=run-trajectory sessionId=session-trajectory step=pi-trajectory-flush timeoutMs=5 details=" + .length + + CLEANUP_TIMEOUT_DETAILS_MAX_CHARS + + 1, + ); + }); + it("does not fail cleanup when timeout details throw", async () => { const cleanup = vi.fn(async () => new Promise(() => {})); @@ -109,6 +138,35 @@ describe("agent cleanup timeout", () => { ); }); + it("bounds cleanup timeout detail errors before logging", async () => { + const cleanup = vi.fn(async () => new Promise(() => {})); + + const result = runAgentCleanupStep({ + runId: "run-trajectory", + sessionId: "session-trajectory", + step: "pi-trajectory-flush", + cleanup, + log, + timeoutMs: 5, + getTimeoutDetails: () => { + throw new Error("details unavailable ".repeat(CLEANUP_TIMEOUT_DETAILS_MAX_CHARS)); + }, + }); + + await vi.advanceTimersByTimeAsync(5); + await expect(result).resolves.toBeUndefined(); + + const message = String(log.warn.mock.calls.at(-1)?.[0] ?? ""); + expect(message).toContain(" detailsError=details unavailable"); + expect(message).toContain("...[truncated]"); + expect(message.length).toBeLessThan( + "agent cleanup timed out: runId=run-trajectory sessionId=session-trajectory step=pi-trajectory-flush timeoutMs=5 detailsError=" + .length + + CLEANUP_TIMEOUT_DETAILS_MAX_CHARS + + 1, + ); + }); + it("uses the general cleanup timeout environment override for other cleanup steps", async () => { const cleanup = vi.fn(async () => new Promise(() => {})); diff --git a/src/agents/run-cleanup-timeout.ts b/src/agents/run-cleanup-timeout.ts index cd3198c5e01..afec3e67ead 100644 --- a/src/agents/run-cleanup-timeout.ts +++ b/src/agents/run-cleanup-timeout.ts @@ -3,6 +3,9 @@ import { formatErrorMessage } from "../infra/errors.js"; export const AGENT_CLEANUP_STEP_TIMEOUT_MS = 10_000; export const AGENT_CLEANUP_STEP_TIMEOUT_ENV = "OPENCLAW_AGENT_CLEANUP_TIMEOUT_MS"; export const TRAJECTORY_FLUSH_TIMEOUT_ENV = "OPENCLAW_TRAJECTORY_FLUSH_TIMEOUT_MS"; +export const CLEANUP_TIMEOUT_DETAILS_MAX_CHARS = 512; + +const CLEANUP_TIMEOUT_DETAILS_TRUNCATED_SUFFIX = "...[truncated]"; type AgentCleanupLogger = { warn: (message: string) => void; @@ -33,12 +36,23 @@ function resolveCleanupTimeoutDetails( ): string { try { const timeoutDetails = getTimeoutDetails?.()?.trim(); - return timeoutDetails ? ` details=${timeoutDetails}` : ""; + return timeoutDetails ? ` details=${truncateCleanupTimeoutDetails(timeoutDetails)}` : ""; } catch (error) { - return ` detailsError=${formatErrorMessage(error)}`; + return ` detailsError=${truncateCleanupTimeoutDetails(formatErrorMessage(error))}`; } } +function truncateCleanupTimeoutDetails(value: string): string { + if (value.length <= CLEANUP_TIMEOUT_DETAILS_MAX_CHARS) { + return value; + } + const prefixLength = Math.max( + 0, + CLEANUP_TIMEOUT_DETAILS_MAX_CHARS - CLEANUP_TIMEOUT_DETAILS_TRUNCATED_SUFFIX.length, + ); + return `${value.slice(0, prefixLength)}${CLEANUP_TIMEOUT_DETAILS_TRUNCATED_SUFFIX}`; +} + export function resolveAgentCleanupStepTimeoutMs(params: { step: string; timeoutMs?: number; diff --git a/src/cli/gateway-cli/register.ts b/src/cli/gateway-cli/register.ts index 392dda45b2a..b9aa9889dc9 100644 --- a/src/cli/gateway-cli/register.ts +++ b/src/cli/gateway-cli/register.ts @@ -211,6 +211,9 @@ function formatStabilityEvent(record: DiagnosticStabilityEventRecord): string { record.bytes !== undefined ? `bytes=${formatBytes(record.bytes)}` : "", record.limitBytes !== undefined ? `limit=${formatBytes(record.limitBytes)}` : "", record.queueDepth !== undefined ? `queueDepth=${record.queueDepth}` : "", + record.queueLength !== undefined ? `queueLength=${record.queueLength}` : "", + record.droppedEvents !== undefined ? `dropped=${record.droppedEvents}` : "", + record.maxQueueLength !== undefined ? `maxQueue=${record.maxQueueLength}` : "", record.queued !== undefined ? `queued=${record.queued}` : "", record.memory ? `rss=${formatBytes(record.memory.rssBytes)}` : "", record.memory ? `heap=${formatBytes(record.memory.heapUsedBytes)}` : "", diff --git a/src/infra/diagnostic-events.test.ts b/src/infra/diagnostic-events.test.ts index 9021ae367f6..3251cd8513f 100644 --- a/src/infra/diagnostic-events.test.ts +++ b/src/infra/diagnostic-events.test.ts @@ -636,6 +636,40 @@ describe("diagnostic-events", () => { expect(events.filter((event) => event.type === "model.call.started")).toHaveLength(9_998); }); + it("emits a bounded summary when async diagnostics are dropped at saturation", async () => { + const events: DiagnosticEventPayload[] = []; + onDiagnosticEvent((event) => { + events.push(event); + }); + + for (let index = 0; index < 10_001; index += 1) { + emitDiagnosticEvent({ + type: "model.call.started", + runId: `drop-run-${index}`, + callId: `drop-call-${index}`, + provider: "openai", + model: "gpt-5.4", + }); + } + + await waitForDiagnosticEventsDrained(); + + const dropSummary = events.find( + ( + event, + ): event is Extract => + event.type === "diagnostic.async_queue.dropped", + ); + expect(dropSummary).toMatchObject({ + type: "diagnostic.async_queue.dropped", + droppedEvents: 1, + droppedUntrustedEvents: 1, + maxQueueLength: 10_000, + drainBatchSize: 100, + }); + expect(events.filter((event) => event.type === "model.call.started")).toHaveLength(10_000); + }); + it("keeps log records off the public diagnostic event stream", async () => { const publicEvents: string[] = []; const internalEvents: string[] = []; diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index 88de659807b..8a57a218d77 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -612,6 +612,17 @@ export type DiagnosticTelemetryExporterEvent = DiagnosticBaseEvent & { errorCategory?: string; }; +export type DiagnosticAsyncQueueDroppedEvent = DiagnosticBaseEvent & { + type: "diagnostic.async_queue.dropped"; + droppedEvents: number; + droppedTrustedEvents?: number; + droppedUntrustedEvents?: number; + droppedPriorityEvents?: number; + queueLength: number; + maxQueueLength: number; + drainBatchSize: number; +}; + export type DiagnosticEventPayload = | DiagnosticUsageEvent | DiagnosticWebhookReceivedEvent @@ -660,6 +671,7 @@ export type DiagnosticEventPayload = | DiagnosticPayloadLargeEvent | DiagnosticLogRecordEvent | DiagnosticTelemetryExporterEvent + | DiagnosticAsyncQueueDroppedEvent | DiagnosticFailoverEvent; export type DiagnosticEventInput = DiagnosticEventPayload extends infer Event @@ -690,6 +702,10 @@ type DiagnosticEventsGlobalState = { dispatchDepth: number; asyncQueue: QueuedDiagnosticEvent[]; asyncDrainScheduled: boolean; + asyncDroppedEvents: number; + asyncDroppedTrustedEvents: number; + asyncDroppedUntrustedEvents: number; + asyncDroppedPriorityEvents: number; }; const MAX_ASYNC_DIAGNOSTIC_EVENTS = 10_000; @@ -731,6 +747,10 @@ function createDiagnosticEventsState(): DiagnosticEventsGlobalState { dispatchDepth: 0, asyncQueue: [], asyncDrainScheduled: false, + asyncDroppedEvents: 0, + asyncDroppedTrustedEvents: 0, + asyncDroppedUntrustedEvents: 0, + asyncDroppedPriorityEvents: 0, }; } @@ -754,6 +774,10 @@ function getDiagnosticEventsState(): DiagnosticEventsGlobalState { const globalRecord = globalThis as Record; const existing = globalRecord[DIAGNOSTIC_EVENTS_STATE_KEY]; if (isDiagnosticEventsState(existing)) { + existing.asyncDroppedEvents ??= 0; + existing.asyncDroppedTrustedEvents ??= 0; + existing.asyncDroppedUntrustedEvents ??= 0; + existing.asyncDroppedPriorityEvents ??= 0; return existing; } const state = createDiagnosticEventsState(); @@ -834,15 +858,31 @@ function isPriorityAsyncDiagnosticEvent(entry: QueuedDiagnosticEvent): boolean { return entry.metadata.trusted && PRIORITY_ASYNC_DIAGNOSTIC_EVENT_TYPES.has(entry.event.type); } -function makeRoomForPriorityAsyncDiagnosticEvent(state: DiagnosticEventsGlobalState): void { +function noteAsyncDiagnosticDrop( + state: DiagnosticEventsGlobalState, + entry: QueuedDiagnosticEvent, +): void { + state.asyncDroppedEvents += 1; + if (entry.metadata.trusted) { + state.asyncDroppedTrustedEvents += 1; + } else { + state.asyncDroppedUntrustedEvents += 1; + } + if (isPriorityAsyncDiagnosticEvent(entry)) { + state.asyncDroppedPriorityEvents += 1; + } +} + +function makeRoomForPriorityAsyncDiagnosticEvent( + state: DiagnosticEventsGlobalState, +): QueuedDiagnosticEvent | undefined { const nonPriorityIndex = state.asyncQueue.findIndex( (entry) => !isPriorityAsyncDiagnosticEvent(entry), ); if (nonPriorityIndex >= 0) { - state.asyncQueue.splice(nonPriorityIndex, 1); - return; + return state.asyncQueue.splice(nonPriorityIndex, 1)[0]; } - state.asyncQueue.shift(); + return state.asyncQueue.shift(); } function deepFreezeDiagnosticValue(value: unknown, seen = new WeakSet()): unknown { @@ -878,10 +918,37 @@ function scheduleAsyncDiagnosticDrain(state: DiagnosticEventsGlobalState): void } if (state.asyncQueue.length > 0) { scheduleAsyncDiagnosticDrain(state); + return; } + dispatchAsyncDiagnosticDropSummary(state); }); } +function dispatchAsyncDiagnosticDropSummary(state: DiagnosticEventsGlobalState): void { + if (state.asyncDroppedEvents <= 0) { + return; + } + const droppedEvents = state.asyncDroppedEvents; + const droppedTrustedEvents = state.asyncDroppedTrustedEvents; + const droppedUntrustedEvents = state.asyncDroppedUntrustedEvents; + const droppedPriorityEvents = state.asyncDroppedPriorityEvents; + state.asyncDroppedEvents = 0; + state.asyncDroppedTrustedEvents = 0; + state.asyncDroppedUntrustedEvents = 0; + state.asyncDroppedPriorityEvents = 0; + const event = enrichDiagnosticEvent(state, { + type: "diagnostic.async_queue.dropped", + droppedEvents, + ...(droppedTrustedEvents > 0 ? { droppedTrustedEvents } : {}), + ...(droppedUntrustedEvents > 0 ? { droppedUntrustedEvents } : {}), + ...(droppedPriorityEvents > 0 ? { droppedPriorityEvents } : {}), + queueLength: state.asyncQueue.length, + maxQueueLength: MAX_ASYNC_DIAGNOSTIC_EVENTS, + drainBatchSize: MAX_ASYNC_DIAGNOSTIC_EVENTS_PER_TURN, + }); + dispatchDiagnosticEvent(state, event, { trusted: false }); +} + export async function waitForDiagnosticEventsDrained(): Promise { const state = getDiagnosticEventsState(); while (state.asyncDrainScheduled || state.asyncQueue.length > 0) { @@ -919,9 +986,13 @@ function emitDiagnosticEventWithTrust(event: DiagnosticEventInput, trusted: bool if (ASYNC_DIAGNOSTIC_EVENT_TYPES.has(enriched.type)) { if (state.asyncQueue.length >= MAX_ASYNC_DIAGNOSTIC_EVENTS) { if (!trusted || !PRIORITY_ASYNC_DIAGNOSTIC_EVENT_TYPES.has(enriched.type)) { + noteAsyncDiagnosticDrop(state, { event: enriched, metadata }); return; } - makeRoomForPriorityAsyncDiagnosticEvent(state); + const droppedEntry = makeRoomForPriorityAsyncDiagnosticEvent(state); + if (droppedEntry) { + noteAsyncDiagnosticDrop(state, droppedEntry); + } } state.asyncQueue.push({ event: enriched, metadata }); scheduleAsyncDiagnosticDrain(state); @@ -999,4 +1070,8 @@ export function resetDiagnosticEventsForTest(): void { state.dispatchDepth = 0; state.asyncQueue = []; state.asyncDrainScheduled = false; + state.asyncDroppedEvents = 0; + state.asyncDroppedTrustedEvents = 0; + state.asyncDroppedUntrustedEvents = 0; + state.asyncDroppedPriorityEvents = 0; } diff --git a/src/logging/diagnostic-stability-bundle.ts b/src/logging/diagnostic-stability-bundle.ts index e9d06b1cb2b..99628a1e303 100644 --- a/src/logging/diagnostic-stability-bundle.ts +++ b/src/logging/diagnostic-stability-bundle.ts @@ -740,10 +740,42 @@ function readStabilityEventRecord( assignOptionalNumber(sanitized, "ageMs", record.ageMs, `${label}.ageMs`); assignOptionalNumber(sanitized, "queueDepth", record.queueDepth, `${label}.queueDepth`); assignOptionalNumber(sanitized, "queueSize", record.queueSize, `${label}.queueSize`); + assignOptionalNumber(sanitized, "queueLength", record.queueLength, `${label}.queueLength`); assignOptionalNumber(sanitized, "waitMs", record.waitMs, `${label}.waitMs`); assignOptionalNumber(sanitized, "active", record.active, `${label}.active`); assignOptionalNumber(sanitized, "waiting", record.waiting, `${label}.waiting`); assignOptionalNumber(sanitized, "queued", record.queued, `${label}.queued`); + assignOptionalNumber(sanitized, "droppedEvents", record.droppedEvents, `${label}.droppedEvents`); + assignOptionalNumber( + sanitized, + "droppedTrustedEvents", + record.droppedTrustedEvents, + `${label}.droppedTrustedEvents`, + ); + assignOptionalNumber( + sanitized, + "droppedUntrustedEvents", + record.droppedUntrustedEvents, + `${label}.droppedUntrustedEvents`, + ); + assignOptionalNumber( + sanitized, + "droppedPriorityEvents", + record.droppedPriorityEvents, + `${label}.droppedPriorityEvents`, + ); + assignOptionalNumber( + sanitized, + "maxQueueLength", + record.maxQueueLength, + `${label}.maxQueueLength`, + ); + assignOptionalNumber( + sanitized, + "drainBatchSize", + record.drainBatchSize, + `${label}.drainBatchSize`, + ); if (record.webhooks !== undefined) { const webhooks = readObject(record.webhooks, `${label}.webhooks`); diff --git a/src/logging/diagnostic-stability.test.ts b/src/logging/diagnostic-stability.test.ts index 71a2a0d14ef..ca2f98805a9 100644 --- a/src/logging/diagnostic-stability.test.ts +++ b/src/logging/diagnostic-stability.test.ts @@ -1,5 +1,9 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest"; -import { emitDiagnosticEvent, resetDiagnosticEventsForTest } from "../infra/diagnostic-events.js"; +import { + emitDiagnosticEvent, + resetDiagnosticEventsForTest, + waitForDiagnosticEventsDrained, +} from "../infra/diagnostic-events.js"; import { getDiagnosticStabilitySnapshot, normalizeDiagnosticStabilityQuery, @@ -410,6 +414,50 @@ describe("diagnostic stability recorder", () => { }); }); + it("keeps async queue drop summaries after drained queued events for sinceSeq polling", async () => { + startDiagnosticStabilityRecorder(); + + for (let index = 0; index < 10_001; index += 1) { + emitDiagnosticEvent({ + type: "model.call.started", + runId: `overflow-run-${index}`, + callId: `overflow-call-${index}`, + provider: "openai", + model: "gpt-5.4", + }); + } + + await new Promise((resolve) => setImmediate(resolve)); + + const midDrainSnapshot = getDiagnosticStabilitySnapshot({ limit: 1000 }); + expect(midDrainSnapshot.lastSeq).toBe(100); + expect( + midDrainSnapshot.events.some((event) => event.type === "diagnostic.async_queue.dropped"), + ).toBe(false); + + await waitForDiagnosticEventsDrained(); + + const sinceMidDrain = getDiagnosticStabilitySnapshot({ + sinceSeq: midDrainSnapshot.lastSeq, + limit: 1000, + }); + const dropSummary = sinceMidDrain.events.find( + (event) => event.type === "diagnostic.async_queue.dropped", + ); + expectFields(dropSummary, { + type: "diagnostic.async_queue.dropped", + droppedEvents: 1, + droppedUntrustedEvents: 1, + queueLength: 0, + maxQueueLength: 10_000, + drainBatchSize: 100, + }); + expect( + sinceMidDrain.events.filter((event) => event.type === "model.call.started"), + ).not.toHaveLength(0); + expect(sinceMidDrain.lastSeq).toBeGreaterThan(10_000); + }); + it("applies query filters to persisted snapshots without mutating the source", () => { const snapshot: DiagnosticStabilitySnapshot = { generatedAt: "2026-04-22T12:00:00.000Z", diff --git a/src/logging/diagnostic-stability.ts b/src/logging/diagnostic-stability.ts index 37aaf036ed8..760a02ecd53 100644 --- a/src/logging/diagnostic-stability.ts +++ b/src/logging/diagnostic-stability.ts @@ -59,11 +59,18 @@ export type DiagnosticStabilityEventRecord = { ageMs?: number; queueDepth?: number; queueSize?: number; + queueLength?: number; waitMs?: number; failureKind?: string; active?: number; waiting?: number; queued?: number; + droppedEvents?: number; + droppedTrustedEvents?: number; + droppedUntrustedEvents?: number; + droppedPriorityEvents?: number; + maxQueueLength?: number; + drainBatchSize?: number; webhooks?: { received: number; processed: number; @@ -501,6 +508,15 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi record.outcome = event.status; assignReasonCode(record, event.reason ?? event.errorCategory); break; + case "diagnostic.async_queue.dropped": + record.droppedEvents = event.droppedEvents; + record.droppedTrustedEvents = event.droppedTrustedEvents; + record.droppedUntrustedEvents = event.droppedUntrustedEvents; + record.droppedPriorityEvents = event.droppedPriorityEvents; + record.queueLength = event.queueLength; + record.maxQueueLength = event.maxQueueLength; + record.drainBatchSize = event.drainBatchSize; + break; case "model.failover": record.provider = event.fromProvider; record.model = event.fromModel;