From e84d4b27f44ce4c1bca63f9d85c2a65f2ad7f688 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 4 May 2026 22:34:59 +0100 Subject: [PATCH] feat: add gateway stall diagnostics --- docs/gateway/diagnostics.md | 19 ++-- docs/help/debugging.md | 7 ++ scripts/run-node.mjs | 13 ++- src/cli/gateway-cli/run.ts | 3 +- src/gateway/server.impl.ts | 3 +- src/infra/diagnostic-events.ts | 26 ++++++ src/infra/run-node.test.ts | 38 ++++++++ src/logging/diagnostic-phase.ts | 96 ++++++++++++++++++++ src/logging/diagnostic-run-activity.ts | 13 +++ src/logging/diagnostic-stability-bundle.ts | 1 + src/logging/diagnostic-stability.ts | 12 +++ src/logging/diagnostic.test.ts | 84 +++++++++++++++++ src/logging/diagnostic.ts | 100 ++++++++++++++++++++- 13 files changed, 401 insertions(+), 14 deletions(-) create mode 100644 src/logging/diagnostic-phase.ts diff --git a/docs/gateway/diagnostics.md b/docs/gateway/diagnostics.md index bf3b0c600d4..5877cc9c6e3 100644 --- a/docs/gateway/diagnostics.md +++ b/docs/gateway/diagnostics.md @@ -117,12 +117,19 @@ diagnostics are enabled. It is for operational facts, not content. The same diagnostic heartbeat records liveness samples when the Gateway keeps running but the Node.js event loop or CPU looks saturated. These `diagnostic.liveness.warning` events include event-loop delay, event-loop -utilization, CPU-core ratio, and active/waiting/queued session counts. Idle -samples stay in telemetry at `info` level. Liveness samples become Gateway -warnings only when work is waiting or queued, or when active work overlaps with -sustained event-loop delay. Transient max-delay spikes during otherwise healthy -background work stay in debug logs. They do not restart the Gateway by -themselves. +utilization, CPU-core ratio, active/waiting/queued session counts, the current +startup/runtime phase when known, recent phase spans, and bounded active/queued +work labels. Idle samples stay in telemetry at `info` level. Liveness samples +become Gateway warnings only when work is waiting or queued, or when active work +overlaps with sustained event-loop delay. Transient max-delay spikes during +otherwise healthy background work stay in debug logs. They do not restart the +Gateway by themselves. + +Startup phases also emit `diagnostic.phase.completed` events with wall-clock and +CPU timing. Stalled embedded-run diagnostics mark `terminalProgressStale=true` +when the last bridge progress looked terminal, such as a raw response item or +response completion event, but the Gateway still considers the embedded run +active. Inspect the live recorder: diff --git a/docs/help/debugging.md b/docs/help/debugging.md index a88a286d114..70220c6d76c 100644 --- a/docs/help/debugging.md +++ b/docs/help/debugging.md @@ -89,6 +89,13 @@ OPENCLAW_RUN_NODE_CPU_PROF_DIR=.artifacts/cli-cpu pnpm openclaw status The source runner adds Node CPU profile flags and writes a `.cpuprofile` for the command. Use this before adding temporary instrumentation to command code. +For startup stalls that look like synchronous filesystem or module-loader work, +add Node's sync I/O trace flag through the source runner: + +```bash +OPENCLAW_TRACE_SYNC_IO=1 pnpm openclaw gateway --force +``` + ## Gateway watch mode For fast iteration, run the gateway under the file watcher: diff --git a/scripts/run-node.mjs b/scripts/run-node.mjs index 46226255357..e54361af211 100644 --- a/scripts/run-node.mjs +++ b/scripts/run-node.mjs @@ -487,6 +487,15 @@ const resolveRunNodeCpuProfileArgs = (deps) => { return ["--cpu-prof", `--cpu-prof-dir=${absoluteProfileDir}`, `--cpu-prof-name=${profileName}`]; }; +const resolveRunNodeDiagnosticArgs = (deps) => { + const args = [...resolveRunNodeCpuProfileArgs(deps)]; + if (deps.env.OPENCLAW_TRACE_SYNC_IO === "1") { + logRunner("Enabling Node --trace-sync-io for startup I/O diagnostics.", deps); + args.push("--trace-sync-io"); + } + return args; +}; + const waitForSpawnedProcess = async (childProcess, deps) => { let forwardedSignal = null; let onSigInt; @@ -557,8 +566,8 @@ const getInterruptedSpawnExitCode = (res) => { }; const runOpenClaw = async (deps) => { - const cpuProfileArgs = resolveRunNodeCpuProfileArgs(deps); - const nodeProcess = deps.spawn(deps.execPath, [...cpuProfileArgs, "openclaw.mjs", ...deps.args], { + const diagnosticArgs = resolveRunNodeDiagnosticArgs(deps); + const nodeProcess = deps.spawn(deps.execPath, [...diagnosticArgs, "openclaw.mjs", ...deps.args], { cwd: deps.cwd, env: deps.env, stdio: deps.outputTee ? ["inherit", "pipe", "pipe"] : "inherit", diff --git a/src/cli/gateway-cli/run.ts b/src/cli/gateway-cli/run.ts index 674c8932a7c..5b23d01708d 100644 --- a/src/cli/gateway-cli/run.ts +++ b/src/cli/gateway-cli/run.ts @@ -25,6 +25,7 @@ import { formatErrorMessage } from "../../infra/errors.js"; import { GatewayLockError } from "../../infra/gateway-lock.js"; import type { RespawnSupervisor } from "../../infra/supervisor-markers.js"; import { setConsoleSubsystemFilter, setConsoleTimestampPrefix } from "../../logging/console.js"; +import { withDiagnosticPhase } from "../../logging/diagnostic-phase.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import { defaultRuntime } from "../../runtime.js"; import { @@ -158,7 +159,7 @@ function createGatewayCliStartupTrace() { async measure(name: string, run: () => Awaitable): Promise { const before = performance.now(); try { - return await run(); + return await withDiagnosticPhase(`cli.${name}`, run); } finally { const now = performance.now(); emit(name, now - before, now - started); diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index ce3ba130d75..aa13bd722ac 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -37,6 +37,7 @@ import { ensureOpenClawCliOnPath } from "../infra/path-env.js"; import { setGatewaySigusr1RestartPolicy, setPreRestartDeferralCheck } from "../infra/restart.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import type { VoiceWakeRoutingConfig } from "../infra/voicewake-routing.js"; +import { withDiagnosticPhase } from "../logging/diagnostic-phase.js"; import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js"; import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js"; import { @@ -355,7 +356,7 @@ function createGatewayStartupTrace() { timelineOptions(), ); try { - const result = await run(); + const result = await withDiagnosticPhase(mapTimelineName(name), run, { traceName: name }); const now = performance.now(); emitDiagnosticsTimelineEvent( { diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index 24dd86f0e5b..08015523027 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -146,6 +146,7 @@ type DiagnosticSessionAttentionBaseEvent = DiagnosticBaseEvent & { activeToolName?: string; activeToolCallId?: string; activeToolAgeMs?: number; + terminalProgressStale?: boolean; }; export type DiagnosticSessionLongRunningEvent = DiagnosticSessionAttentionBaseEvent & { @@ -206,6 +207,20 @@ export type DiagnosticHeartbeatEvent = DiagnosticBaseEvent & { export type DiagnosticLivenessWarningReason = "event_loop_delay" | "event_loop_utilization" | "cpu"; +export type DiagnosticPhaseDetails = Record; + +export type DiagnosticPhaseSnapshot = { + name: string; + startedAt: number; + endedAt?: number; + durationMs?: number; + cpuUserMs?: number; + cpuSystemMs?: number; + cpuTotalMs?: number; + cpuCoreRatio?: number; + details?: DiagnosticPhaseDetails; +}; + export type DiagnosticLivenessWarningEvent = DiagnosticBaseEvent & { type: "diagnostic.liveness.warning"; reasons: DiagnosticLivenessWarningReason[]; @@ -220,8 +235,18 @@ export type DiagnosticLivenessWarningEvent = DiagnosticBaseEvent & { active: number; waiting: number; queued: number; + phase?: string; + recentPhases?: DiagnosticPhaseSnapshot[]; + activeWorkLabels?: string[]; + waitingWorkLabels?: string[]; + queuedWorkLabels?: string[]; }; +export type DiagnosticPhaseCompletedEvent = DiagnosticBaseEvent & + DiagnosticPhaseSnapshot & { + type: "diagnostic.phase.completed"; + }; + export type DiagnosticToolLoopEvent = DiagnosticBaseEvent & { type: "tool.loop"; sessionKey?: string; @@ -501,6 +526,7 @@ export type DiagnosticEventPayload = | DiagnosticRunProgressEvent | DiagnosticHeartbeatEvent | DiagnosticLivenessWarningEvent + | DiagnosticPhaseCompletedEvent | DiagnosticToolLoopEvent | DiagnosticToolExecutionStartedEvent | DiagnosticToolExecutionCompletedEvent diff --git a/src/infra/run-node.test.ts b/src/infra/run-node.test.ts index d4bac6516ae..b38e944a929 100644 --- a/src/infra/run-node.test.ts +++ b/src/infra/run-node.test.ts @@ -497,6 +497,44 @@ describe("run-node script", () => { }); }); + it("adds Node sync I/O tracing flag to the launched OpenClaw child when requested", async () => { + await withTempDir({ prefix: "openclaw-run-node-" }, async (tmp) => { + await setupTrackedProject(tmp, { + files: { + [ROOT_SRC]: "export const value = 1;\n", + }, + oldPaths: [ROOT_SRC, ROOT_TSCONFIG, ROOT_PACKAGE], + buildPaths: [DIST_ENTRY, BUILD_STAMP], + }); + const spawnCalls: string[][] = []; + const spawn = (_cmd: string, args: string[]) => { + spawnCalls.push(args); + return createExitedProcess(0); + }; + const { spawnSync } = createSpawnRecorder({ + gitHead: "abc123\n", + gitStatus: "", + }); + + const exitCode = await runNodeMain({ + cwd: tmp, + args: ["gateway", "--force"], + env: { + ...process.env, + OPENCLAW_RUNNER_LOG: "0", + OPENCLAW_TRACE_SYNC_IO: "1", + }, + spawn, + spawnSync, + execPath: process.execPath, + platform: process.platform, + }); + + expect(exitCode).toBe(0); + expect(spawnCalls.at(-1)).toEqual(["--trace-sync-io", "openclaw.mjs", "gateway", "--force"]); + }); + }); + it("surfaces generic output log stream errors", async () => { await withTempDir({ prefix: "openclaw-run-node-" }, async (tmp) => { await setupTrackedProject(tmp); diff --git a/src/logging/diagnostic-phase.ts b/src/logging/diagnostic-phase.ts new file mode 100644 index 00000000000..abd8b379a40 --- /dev/null +++ b/src/logging/diagnostic-phase.ts @@ -0,0 +1,96 @@ +import { performance } from "node:perf_hooks"; +import { + areDiagnosticsEnabledForProcess, + emitDiagnosticEvent, + type DiagnosticPhaseDetails, + type DiagnosticPhaseSnapshot, +} from "../infra/diagnostic-events.js"; + +const RECENT_PHASE_CAPACITY = 40; + +type ActiveDiagnosticPhase = { + name: string; + startedAt: number; + startedWallMs: number; + cpuStarted: NodeJS.CpuUsage; + details?: DiagnosticPhaseDetails; +}; + +let activePhaseStack: ActiveDiagnosticPhase[] = []; +let recentPhases: DiagnosticPhaseSnapshot[] = []; + +function roundMetric(value: number, digits = 1): number { + if (!Number.isFinite(value)) { + return 0; + } + const factor = 10 ** digits; + return Math.round(value * factor) / factor; +} + +function pushRecentPhase(snapshot: DiagnosticPhaseSnapshot): void { + recentPhases.push(snapshot); + if (recentPhases.length > RECENT_PHASE_CAPACITY) { + recentPhases = recentPhases.slice(-RECENT_PHASE_CAPACITY); + } +} + +export function getCurrentDiagnosticPhase(): string | undefined { + return activePhaseStack.at(-1)?.name; +} + +export function getRecentDiagnosticPhases(limit = 8): DiagnosticPhaseSnapshot[] { + return recentPhases.slice(-Math.max(0, limit)).map((phase) => ({ ...phase })); +} + +export function recordDiagnosticPhase(snapshot: DiagnosticPhaseSnapshot): void { + pushRecentPhase(snapshot); + if (!areDiagnosticsEnabledForProcess()) { + return; + } + emitDiagnosticEvent({ + type: "diagnostic.phase.completed", + ...snapshot, + }); +} + +export async function withDiagnosticPhase( + name: string, + run: () => Promise | T, + details?: DiagnosticPhaseDetails, +): Promise { + const active: ActiveDiagnosticPhase = { + name, + startedAt: Date.now(), + startedWallMs: performance.now(), + cpuStarted: process.cpuUsage(), + details, + }; + activePhaseStack.push(active); + try { + return await run(); + } finally { + const endedAt = Date.now(); + const durationMs = roundMetric(performance.now() - active.startedWallMs, 1); + const cpu = process.cpuUsage(active.cpuStarted); + const cpuUserMs = roundMetric(cpu.user / 1_000, 1); + const cpuSystemMs = roundMetric(cpu.system / 1_000, 1); + const cpuTotalMs = roundMetric(cpuUserMs + cpuSystemMs, 1); + activePhaseStack = activePhaseStack.filter((entry) => entry !== active); + recordDiagnosticPhase({ + name, + startedAt: active.startedAt, + endedAt, + durationMs, + cpuUserMs, + cpuSystemMs, + cpuTotalMs, + cpuCoreRatio: roundMetric(cpuTotalMs / Math.max(1, durationMs), 3), + details: active.details, + }); + } +} + +export function resetDiagnosticPhasesForTest(): void { + activePhaseStack = []; + recentPhases = []; +} diff --git a/src/logging/diagnostic-run-activity.ts b/src/logging/diagnostic-run-activity.ts index f8155e94fcb..513b0d727a1 100644 --- a/src/logging/diagnostic-run-activity.ts +++ b/src/logging/diagnostic-run-activity.ts @@ -294,6 +294,19 @@ export function getDiagnosticSessionActivitySnapshot( }; } +export function markDiagnosticRunProgressForTest(params: { + sessionId?: string; + sessionKey?: string; + runId?: string; + reason: string; +}): void { + const activity = resolveSessionActivity({ ...params, create: true }); + if (!activity) { + return; + } + touchSessionActivity(activity, params.reason); +} + export function resetDiagnosticRunActivityForTest(): void { activityByRef.clear(); activityByRunId.clear(); diff --git a/src/logging/diagnostic-stability-bundle.ts b/src/logging/diagnostic-stability-bundle.ts index b8b4984ccbf..7d9d96542a6 100644 --- a/src/logging/diagnostic-stability-bundle.ts +++ b/src/logging/diagnostic-stability-bundle.ts @@ -348,6 +348,7 @@ function readStabilityEventRecord( assignOptionalCodeString(sanitized, "reason", record.reason, `${label}.reason`); assignOptionalCodeString(sanitized, "outcome", record.outcome, `${label}.outcome`); assignOptionalCodeString(sanitized, "level", record.level, `${label}.level`); + assignOptionalCodeString(sanitized, "phase", record.phase, `${label}.phase`); assignOptionalCodeString(sanitized, "detector", record.detector, `${label}.detector`); assignOptionalCodeString(sanitized, "toolName", record.toolName, `${label}.toolName`); assignOptionalCodeString( diff --git a/src/logging/diagnostic-stability.ts b/src/logging/diagnostic-stability.ts index dd4c361111b..332b6806ee7 100644 --- a/src/logging/diagnostic-stability.ts +++ b/src/logging/diagnostic-stability.ts @@ -24,6 +24,7 @@ export type DiagnosticStabilityEventRecord = { outcome?: string; mode?: string; level?: string; + phase?: string; detector?: string; deliveryKind?: string; toolName?: string; @@ -292,6 +293,17 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi record.active = event.active; record.waiting = event.waiting; record.queued = event.queued; + record.phase = event.phase; + if (event.activeWorkLabels?.length) { + record.source = event.activeWorkLabels[0]; + } else if (event.queuedWorkLabels?.length) { + record.source = event.queuedWorkLabels[0]; + } + break; + case "diagnostic.phase.completed": + record.phase = event.name; + record.durationMs = event.durationMs; + record.cpuCoreRatio = event.cpuCoreRatio; break; case "tool.loop": record.toolName = event.toolName; diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index 22d5d3c829e..ac93e0e2212 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -8,8 +8,10 @@ import { setDiagnosticsEnabledForProcess, type DiagnosticEventPayload, } from "../infra/diagnostic-events.js"; +import { withDiagnosticPhase } from "./diagnostic-phase.js"; import { getDiagnosticSessionActivitySnapshot, + markDiagnosticRunProgressForTest, markDiagnosticEmbeddedRunStarted, } from "./diagnostic-run-activity.js"; import { @@ -368,6 +370,39 @@ describe("stuck session diagnostics threshold", () => { expect(recoverStuckSession).not.toHaveBeenCalled(); }); + it("flags stale terminal bridge progress in stalled session diagnostics", async () => { + const events: DiagnosticEventPayload[] = []; + const warnSpy = vi.spyOn(diagnosticLogger, "warn").mockImplementation(() => undefined); + const unsubscribe = onDiagnosticEvent((event) => { + events.push(event); + }); + try { + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" }); + markDiagnosticRunProgressForTest({ + sessionId: "s1", + sessionKey: "main", + reason: "codex_app_server:notification:rawResponseItem/completed", + }); + startDiagnosticHeartbeat({ + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + }, + }); + + vi.advanceTimersByTime(61_000); + } finally { + unsubscribe(); + } + + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("terminalProgressStale=true")); + expect(events.filter((event) => event.type === "session.stalled").at(-1)).toMatchObject({ + terminalProgressStale: true, + lastProgressReason: "codex_app_server:notification:rawResponseItem/completed", + }); + }); + it("aborts and drains embedded runs after an extended no-progress stall", () => { const events: DiagnosticEventPayload[] = []; const recoverStuckSession = vi.fn(); @@ -678,6 +713,55 @@ describe("stuck session diagnostics threshold", () => { ); }); + it("adds phase and work labels to liveness warnings", async () => { + const warnSpy = vi.spyOn(diagnosticLogger, "warn").mockImplementation(() => undefined); + const events: DiagnosticEventPayload[] = []; + const unsubscribe = onDiagnosticEvent((event) => events.push(event)); + let finishPhase!: () => void; + const phase = withDiagnosticPhase( + "startup.plugins.load", + () => + new Promise((resolve) => { + finishPhase = resolve; + }), + ); + + try { + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + }, + }, + { + emitMemorySample: createEmitMemorySampleMock(), + sampleLiveness: () => ({ + reasons: ["event_loop_delay"], + intervalMs: 30_000, + eventLoopDelayP99Ms: 1_500, + eventLoopDelayMaxMs: 2_000, + }), + }, + ); + + logMessageQueued({ sessionId: "s1", sessionKey: "main", source: "telegram" }); + vi.advanceTimersByTime(30_000); + } finally { + finishPhase(); + await phase; + unsubscribe(); + } + + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("phase=startup.plugins.load")); + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("work=[queued=main(")); + expect( + events.filter((event) => event.type === "diagnostic.liveness.warning").at(-1), + ).toMatchObject({ + phase: "startup.plugins.load", + queuedWorkLabels: [expect.stringContaining("main(")], + }); + }); + it("keeps transient event-loop max spikes debug-only when only background work is active", () => { const warnSpy = vi.spyOn(diagnosticLogger, "warn").mockImplementation(() => undefined); diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index c8840561499..65db6574ab4 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -5,9 +5,15 @@ import { areDiagnosticsEnabledForProcess, emitDiagnosticEvent, isDiagnosticsEnabled, + type DiagnosticPhaseSnapshot, type DiagnosticLivenessWarningReason, } from "../infra/diagnostic-events.js"; import { emitDiagnosticMemorySample, resetDiagnosticMemoryForTest } from "./diagnostic-memory.js"; +import { + getCurrentDiagnosticPhase, + getRecentDiagnosticPhases, + resetDiagnosticPhasesForTest, +} from "./diagnostic-phase.js"; import { getDiagnosticSessionActivitySnapshot, resetDiagnosticRunActivityForTest, @@ -81,6 +87,9 @@ type DiagnosticWorkSnapshot = { activeCount: number; waitingCount: number; queuedCount: number; + activeLabels: string[]; + waitingLabels: string[]; + queuedLabels: string[]; }; type RecoverStuckSession = (params: { @@ -142,21 +151,56 @@ function recoverStuckSession(params: { }); } -function getDiagnosticWorkSnapshot(): DiagnosticWorkSnapshot { +function formatDiagnosticWorkLabel( + state: { + sessionId?: string; + sessionKey?: string; + state: SessionStateValue; + queueDepth: number; + lastActivity: number; + }, + now: number, +): string { + const label = state.sessionKey ?? state.sessionId ?? "unknown"; + const ageSeconds = Math.round(Math.max(0, now - state.lastActivity) / 1000); + const activity = getDiagnosticSessionActivitySnapshot( + { sessionId: state.sessionId, sessionKey: state.sessionKey }, + now, + ); + const workKind = activity.activeWorkKind ? `/${activity.activeWorkKind}` : ""; + const lastProgress = activity.lastProgressReason ? ` last=${activity.lastProgressReason}` : ""; + return `${label}(${state.state}${workKind},q=${state.queueDepth},age=${ageSeconds}s${lastProgress})`; +} + +function pushLimitedDiagnosticLabel(labels: string[], label: string, limit = 5): void { + if (labels.length < limit) { + labels.push(label); + } +} + +function getDiagnosticWorkSnapshot(now = Date.now()): DiagnosticWorkSnapshot { let activeCount = 0; let waitingCount = 0; let queuedCount = 0; + const activeLabels: string[] = []; + const waitingLabels: string[] = []; + const queuedLabels: string[] = []; for (const state of diagnosticSessionStates.values()) { if (state.state === "processing") { activeCount += 1; + pushLimitedDiagnosticLabel(activeLabels, formatDiagnosticWorkLabel(state, now)); } else if (state.state === "waiting") { waitingCount += 1; + pushLimitedDiagnosticLabel(waitingLabels, formatDiagnosticWorkLabel(state, now)); + } + if (state.queueDepth > 0) { + pushLimitedDiagnosticLabel(queuedLabels, formatDiagnosticWorkLabel(state, now)); } queuedCount += state.queueDepth; } - return { activeCount, waitingCount, queuedCount }; + return { activeCount, waitingCount, queuedCount, activeLabels, waitingLabels, queuedLabels }; } function hasOpenDiagnosticWork(snapshot: DiagnosticWorkSnapshot): boolean { @@ -306,6 +350,10 @@ function emitDiagnosticLivenessWarning( sample: DiagnosticLivenessSample, work: DiagnosticWorkSnapshot, ): void { + const phase = getCurrentDiagnosticPhase(); + const recentPhases = getRecentDiagnosticPhases(6); + const recentPhaseSummary = formatRecentDiagnosticPhases(recentPhases); + const workLabelSummary = formatDiagnosticWorkLabels(work); const message = `liveness warning: reasons=${sample.reasons.join(",")} interval=${Math.round( sample.intervalMs / 1000, )}s eventLoopDelayP99Ms=${formatOptionalDiagnosticMetric( @@ -316,7 +364,11 @@ function emitDiagnosticLivenessWarning( sample.eventLoopUtilization, )} cpuCoreRatio=${formatOptionalDiagnosticMetric(sample.cpuCoreRatio)} active=${ work.activeCount - } waiting=${work.waitingCount} queued=${work.queuedCount}`; + } waiting=${work.waitingCount} queued=${work.queuedCount}${ + phase ? ` phase=${phase}` : "" + }${recentPhaseSummary ? ` recentPhases=${recentPhaseSummary}` : ""}${ + workLabelSummary ? ` work=[${workLabelSummary}]` : "" + }`; const hasBlockingWork = work.waitingCount > 0 || work.queuedCount > 0; const hasSustainedEventLoopDelay = (sample.eventLoopDelayP99Ms ?? 0) >= DEFAULT_LIVENESS_EVENT_LOOP_DELAY_WARN_MS; @@ -339,10 +391,28 @@ function emitDiagnosticLivenessWarning( active: work.activeCount, waiting: work.waitingCount, queued: work.queuedCount, + phase, + recentPhases, + activeWorkLabels: work.activeLabels, + waitingWorkLabels: work.waitingLabels, + queuedWorkLabels: work.queuedLabels, }); markActivity(); } +function formatRecentDiagnosticPhases(phases: DiagnosticPhaseSnapshot[]): string { + return phases.map((phase) => `${phase.name}:${Math.round(phase.durationMs ?? 0)}ms`).join(","); +} + +function formatDiagnosticWorkLabels(work: DiagnosticWorkSnapshot): string { + const parts = [ + work.activeLabels.length > 0 ? `active=${work.activeLabels.join("|")}` : "", + work.waitingLabels.length > 0 ? `waiting=${work.waitingLabels.join("|")}` : "", + work.queuedLabels.length > 0 ? `queued=${work.queuedLabels.join("|")}` : "", + ].filter(Boolean); + return parts.join(" "); +} + export function resolveStuckSessionWarnMs(config?: OpenClawConfig): number { const raw = config?.diagnostics?.stuckSessionWarnMs; if (typeof raw !== "number" || !Number.isFinite(raw)) { @@ -588,6 +658,9 @@ function sessionAttentionFields(params: { classification: SessionAttentionClassification; activity: DiagnosticSessionActivitySnapshot; }) { + const terminalProgressStale = isTerminalDiagnosticProgressReason( + params.activity.lastProgressReason, + ); return { ...(params.classification.activeWorkKind ? { activeWorkKind: params.classification.activeWorkKind } @@ -605,9 +678,24 @@ function sessionAttentionFields(params: { ...(params.activity.activeToolAgeMs !== undefined ? { activeToolAgeMs: params.activity.activeToolAgeMs } : {}), + ...(terminalProgressStale ? { terminalProgressStale: true } : {}), }; } +function isTerminalDiagnosticProgressReason(reason: string | undefined): boolean { + if (!reason) { + return false; + } + return ( + reason === "run:completed" || + reason === "embedded_run:ended" || + reason.includes("response.completed") || + reason.includes("rawResponseItem/completed") || + reason.includes("raw_response_item.completed") || + reason.includes("output_item.done") + ); +} + function formatSessionActivityLogFields(activity: DiagnosticSessionActivitySnapshot): string { const fields: string[] = []; if (activity.lastProgressReason) { @@ -625,6 +713,9 @@ function formatSessionActivityLogFields(activity: DiagnosticSessionActivitySnaps if (activity.activeToolAgeMs !== undefined) { fields.push(`activeToolAge=${Math.round(activity.activeToolAgeMs / 1000)}s`); } + if (isTerminalDiagnosticProgressReason(activity.lastProgressReason)) { + fields.push("terminalProgressStale=true"); + } return fields.join(" "); } @@ -835,7 +926,7 @@ export function startDiagnosticHeartbeat( const stuckSessionWarnMs = resolveStuckSessionWarnMs(heartbeatConfig); const now = Date.now(); pruneDiagnosticSessionStates(now, true); - const work = getDiagnosticWorkSnapshot(); + const work = getDiagnosticWorkSnapshot(now); const livenessSample = (opts?.sampleLiveness ?? sampleDiagnosticLiveness)(now, work); const shouldEmitLivenessEvent = livenessSample !== null && shouldEmitDiagnosticLivenessEvent(now); @@ -943,6 +1034,7 @@ export function resetDiagnosticStateForTest(): void { webhookStats.lastReceived = 0; stopDiagnosticHeartbeat(); resetDiagnosticMemoryForTest(); + resetDiagnosticPhasesForTest(); resetDiagnosticStabilityRecorderForTest(); resetDiagnosticStabilityBundleForTest(); }