feat: add gateway stall diagnostics

This commit is contained in:
Peter Steinberger
2026-05-04 22:34:59 +01:00
parent 358cd87ff3
commit e84d4b27f4
13 changed files with 401 additions and 14 deletions

View File

@@ -25,6 +25,7 @@ import { formatErrorMessage } from "../../infra/errors.js";
import { GatewayLockError } from "../../infra/gateway-lock.js";
import type { RespawnSupervisor } from "../../infra/supervisor-markers.js";
import { setConsoleSubsystemFilter, setConsoleTimestampPrefix } from "../../logging/console.js";
import { withDiagnosticPhase } from "../../logging/diagnostic-phase.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { defaultRuntime } from "../../runtime.js";
import {
@@ -158,7 +159,7 @@ function createGatewayCliStartupTrace() {
async measure<T>(name: string, run: () => Awaitable<T>): Promise<T> {
const before = performance.now();
try {
return await run();
return await withDiagnosticPhase(`cli.${name}`, run);
} finally {
const now = performance.now();
emit(name, now - before, now - started);

View File

@@ -37,6 +37,7 @@ import { ensureOpenClawCliOnPath } from "../infra/path-env.js";
import { setGatewaySigusr1RestartPolicy, setPreRestartDeferralCheck } from "../infra/restart.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import type { VoiceWakeRoutingConfig } from "../infra/voicewake-routing.js";
import { withDiagnosticPhase } from "../logging/diagnostic-phase.js";
import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js";
import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js";
import {
@@ -355,7 +356,7 @@ function createGatewayStartupTrace() {
timelineOptions(),
);
try {
const result = await run();
const result = await withDiagnosticPhase(mapTimelineName(name), run, { traceName: name });
const now = performance.now();
emitDiagnosticsTimelineEvent(
{

View File

@@ -146,6 +146,7 @@ type DiagnosticSessionAttentionBaseEvent = DiagnosticBaseEvent & {
activeToolName?: string;
activeToolCallId?: string;
activeToolAgeMs?: number;
terminalProgressStale?: boolean;
};
export type DiagnosticSessionLongRunningEvent = DiagnosticSessionAttentionBaseEvent & {
@@ -206,6 +207,20 @@ export type DiagnosticHeartbeatEvent = DiagnosticBaseEvent & {
export type DiagnosticLivenessWarningReason = "event_loop_delay" | "event_loop_utilization" | "cpu";
export type DiagnosticPhaseDetails = Record<string, string | number | boolean>;
export type DiagnosticPhaseSnapshot = {
name: string;
startedAt: number;
endedAt?: number;
durationMs?: number;
cpuUserMs?: number;
cpuSystemMs?: number;
cpuTotalMs?: number;
cpuCoreRatio?: number;
details?: DiagnosticPhaseDetails;
};
export type DiagnosticLivenessWarningEvent = DiagnosticBaseEvent & {
type: "diagnostic.liveness.warning";
reasons: DiagnosticLivenessWarningReason[];
@@ -220,8 +235,18 @@ export type DiagnosticLivenessWarningEvent = DiagnosticBaseEvent & {
active: number;
waiting: number;
queued: number;
phase?: string;
recentPhases?: DiagnosticPhaseSnapshot[];
activeWorkLabels?: string[];
waitingWorkLabels?: string[];
queuedWorkLabels?: string[];
};
export type DiagnosticPhaseCompletedEvent = DiagnosticBaseEvent &
DiagnosticPhaseSnapshot & {
type: "diagnostic.phase.completed";
};
export type DiagnosticToolLoopEvent = DiagnosticBaseEvent & {
type: "tool.loop";
sessionKey?: string;
@@ -501,6 +526,7 @@ export type DiagnosticEventPayload =
| DiagnosticRunProgressEvent
| DiagnosticHeartbeatEvent
| DiagnosticLivenessWarningEvent
| DiagnosticPhaseCompletedEvent
| DiagnosticToolLoopEvent
| DiagnosticToolExecutionStartedEvent
| DiagnosticToolExecutionCompletedEvent

View File

@@ -497,6 +497,44 @@ describe("run-node script", () => {
});
});
it("adds Node sync I/O tracing flag to the launched OpenClaw child when requested", async () => {
await withTempDir({ prefix: "openclaw-run-node-" }, async (tmp) => {
await setupTrackedProject(tmp, {
files: {
[ROOT_SRC]: "export const value = 1;\n",
},
oldPaths: [ROOT_SRC, ROOT_TSCONFIG, ROOT_PACKAGE],
buildPaths: [DIST_ENTRY, BUILD_STAMP],
});
const spawnCalls: string[][] = [];
const spawn = (_cmd: string, args: string[]) => {
spawnCalls.push(args);
return createExitedProcess(0);
};
const { spawnSync } = createSpawnRecorder({
gitHead: "abc123\n",
gitStatus: "",
});
const exitCode = await runNodeMain({
cwd: tmp,
args: ["gateway", "--force"],
env: {
...process.env,
OPENCLAW_RUNNER_LOG: "0",
OPENCLAW_TRACE_SYNC_IO: "1",
},
spawn,
spawnSync,
execPath: process.execPath,
platform: process.platform,
});
expect(exitCode).toBe(0);
expect(spawnCalls.at(-1)).toEqual(["--trace-sync-io", "openclaw.mjs", "gateway", "--force"]);
});
});
it("surfaces generic output log stream errors", async () => {
await withTempDir({ prefix: "openclaw-run-node-" }, async (tmp) => {
await setupTrackedProject(tmp);

View File

@@ -0,0 +1,96 @@
import { performance } from "node:perf_hooks";
import {
areDiagnosticsEnabledForProcess,
emitDiagnosticEvent,
type DiagnosticPhaseDetails,
type DiagnosticPhaseSnapshot,
} from "../infra/diagnostic-events.js";
const RECENT_PHASE_CAPACITY = 40;
type ActiveDiagnosticPhase = {
name: string;
startedAt: number;
startedWallMs: number;
cpuStarted: NodeJS.CpuUsage;
details?: DiagnosticPhaseDetails;
};
let activePhaseStack: ActiveDiagnosticPhase[] = [];
let recentPhases: DiagnosticPhaseSnapshot[] = [];
function roundMetric(value: number, digits = 1): number {
if (!Number.isFinite(value)) {
return 0;
}
const factor = 10 ** digits;
return Math.round(value * factor) / factor;
}
function pushRecentPhase(snapshot: DiagnosticPhaseSnapshot): void {
recentPhases.push(snapshot);
if (recentPhases.length > RECENT_PHASE_CAPACITY) {
recentPhases = recentPhases.slice(-RECENT_PHASE_CAPACITY);
}
}
export function getCurrentDiagnosticPhase(): string | undefined {
return activePhaseStack.at(-1)?.name;
}
export function getRecentDiagnosticPhases(limit = 8): DiagnosticPhaseSnapshot[] {
return recentPhases.slice(-Math.max(0, limit)).map((phase) => ({ ...phase }));
}
export function recordDiagnosticPhase(snapshot: DiagnosticPhaseSnapshot): void {
pushRecentPhase(snapshot);
if (!areDiagnosticsEnabledForProcess()) {
return;
}
emitDiagnosticEvent({
type: "diagnostic.phase.completed",
...snapshot,
});
}
export async function withDiagnosticPhase<T>(
name: string,
run: () => Promise<T> | T,
details?: DiagnosticPhaseDetails,
): Promise<T> {
const active: ActiveDiagnosticPhase = {
name,
startedAt: Date.now(),
startedWallMs: performance.now(),
cpuStarted: process.cpuUsage(),
details,
};
activePhaseStack.push(active);
try {
return await run();
} finally {
const endedAt = Date.now();
const durationMs = roundMetric(performance.now() - active.startedWallMs, 1);
const cpu = process.cpuUsage(active.cpuStarted);
const cpuUserMs = roundMetric(cpu.user / 1_000, 1);
const cpuSystemMs = roundMetric(cpu.system / 1_000, 1);
const cpuTotalMs = roundMetric(cpuUserMs + cpuSystemMs, 1);
activePhaseStack = activePhaseStack.filter((entry) => entry !== active);
recordDiagnosticPhase({
name,
startedAt: active.startedAt,
endedAt,
durationMs,
cpuUserMs,
cpuSystemMs,
cpuTotalMs,
cpuCoreRatio: roundMetric(cpuTotalMs / Math.max(1, durationMs), 3),
details: active.details,
});
}
}
export function resetDiagnosticPhasesForTest(): void {
activePhaseStack = [];
recentPhases = [];
}

View File

@@ -294,6 +294,19 @@ export function getDiagnosticSessionActivitySnapshot(
};
}
export function markDiagnosticRunProgressForTest(params: {
sessionId?: string;
sessionKey?: string;
runId?: string;
reason: string;
}): void {
const activity = resolveSessionActivity({ ...params, create: true });
if (!activity) {
return;
}
touchSessionActivity(activity, params.reason);
}
export function resetDiagnosticRunActivityForTest(): void {
activityByRef.clear();
activityByRunId.clear();

View File

@@ -348,6 +348,7 @@ function readStabilityEventRecord(
assignOptionalCodeString(sanitized, "reason", record.reason, `${label}.reason`);
assignOptionalCodeString(sanitized, "outcome", record.outcome, `${label}.outcome`);
assignOptionalCodeString(sanitized, "level", record.level, `${label}.level`);
assignOptionalCodeString(sanitized, "phase", record.phase, `${label}.phase`);
assignOptionalCodeString(sanitized, "detector", record.detector, `${label}.detector`);
assignOptionalCodeString(sanitized, "toolName", record.toolName, `${label}.toolName`);
assignOptionalCodeString(

View File

@@ -24,6 +24,7 @@ export type DiagnosticStabilityEventRecord = {
outcome?: string;
mode?: string;
level?: string;
phase?: string;
detector?: string;
deliveryKind?: string;
toolName?: string;
@@ -292,6 +293,17 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
record.active = event.active;
record.waiting = event.waiting;
record.queued = event.queued;
record.phase = event.phase;
if (event.activeWorkLabels?.length) {
record.source = event.activeWorkLabels[0];
} else if (event.queuedWorkLabels?.length) {
record.source = event.queuedWorkLabels[0];
}
break;
case "diagnostic.phase.completed":
record.phase = event.name;
record.durationMs = event.durationMs;
record.cpuCoreRatio = event.cpuCoreRatio;
break;
case "tool.loop":
record.toolName = event.toolName;

View File

@@ -8,8 +8,10 @@ import {
setDiagnosticsEnabledForProcess,
type DiagnosticEventPayload,
} from "../infra/diagnostic-events.js";
import { withDiagnosticPhase } from "./diagnostic-phase.js";
import {
getDiagnosticSessionActivitySnapshot,
markDiagnosticRunProgressForTest,
markDiagnosticEmbeddedRunStarted,
} from "./diagnostic-run-activity.js";
import {
@@ -368,6 +370,39 @@ describe("stuck session diagnostics threshold", () => {
expect(recoverStuckSession).not.toHaveBeenCalled();
});
it("flags stale terminal bridge progress in stalled session diagnostics", async () => {
const events: DiagnosticEventPayload[] = [];
const warnSpy = vi.spyOn(diagnosticLogger, "warn").mockImplementation(() => undefined);
const unsubscribe = onDiagnosticEvent((event) => {
events.push(event);
});
try {
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" });
markDiagnosticRunProgressForTest({
sessionId: "s1",
sessionKey: "main",
reason: "codex_app_server:notification:rawResponseItem/completed",
});
startDiagnosticHeartbeat({
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
},
});
vi.advanceTimersByTime(61_000);
} finally {
unsubscribe();
}
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("terminalProgressStale=true"));
expect(events.filter((event) => event.type === "session.stalled").at(-1)).toMatchObject({
terminalProgressStale: true,
lastProgressReason: "codex_app_server:notification:rawResponseItem/completed",
});
});
it("aborts and drains embedded runs after an extended no-progress stall", () => {
const events: DiagnosticEventPayload[] = [];
const recoverStuckSession = vi.fn();
@@ -678,6 +713,55 @@ describe("stuck session diagnostics threshold", () => {
);
});
it("adds phase and work labels to liveness warnings", async () => {
const warnSpy = vi.spyOn(diagnosticLogger, "warn").mockImplementation(() => undefined);
const events: DiagnosticEventPayload[] = [];
const unsubscribe = onDiagnosticEvent((event) => events.push(event));
let finishPhase!: () => void;
const phase = withDiagnosticPhase(
"startup.plugins.load",
() =>
new Promise<void>((resolve) => {
finishPhase = resolve;
}),
);
try {
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
},
},
{
emitMemorySample: createEmitMemorySampleMock(),
sampleLiveness: () => ({
reasons: ["event_loop_delay"],
intervalMs: 30_000,
eventLoopDelayP99Ms: 1_500,
eventLoopDelayMaxMs: 2_000,
}),
},
);
logMessageQueued({ sessionId: "s1", sessionKey: "main", source: "telegram" });
vi.advanceTimersByTime(30_000);
} finally {
finishPhase();
await phase;
unsubscribe();
}
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("phase=startup.plugins.load"));
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("work=[queued=main("));
expect(
events.filter((event) => event.type === "diagnostic.liveness.warning").at(-1),
).toMatchObject({
phase: "startup.plugins.load",
queuedWorkLabels: [expect.stringContaining("main(")],
});
});
it("keeps transient event-loop max spikes debug-only when only background work is active", () => {
const warnSpy = vi.spyOn(diagnosticLogger, "warn").mockImplementation(() => undefined);

View File

@@ -5,9 +5,15 @@ import {
areDiagnosticsEnabledForProcess,
emitDiagnosticEvent,
isDiagnosticsEnabled,
type DiagnosticPhaseSnapshot,
type DiagnosticLivenessWarningReason,
} from "../infra/diagnostic-events.js";
import { emitDiagnosticMemorySample, resetDiagnosticMemoryForTest } from "./diagnostic-memory.js";
import {
getCurrentDiagnosticPhase,
getRecentDiagnosticPhases,
resetDiagnosticPhasesForTest,
} from "./diagnostic-phase.js";
import {
getDiagnosticSessionActivitySnapshot,
resetDiagnosticRunActivityForTest,
@@ -81,6 +87,9 @@ type DiagnosticWorkSnapshot = {
activeCount: number;
waitingCount: number;
queuedCount: number;
activeLabels: string[];
waitingLabels: string[];
queuedLabels: string[];
};
type RecoverStuckSession = (params: {
@@ -142,21 +151,56 @@ function recoverStuckSession(params: {
});
}
function getDiagnosticWorkSnapshot(): DiagnosticWorkSnapshot {
function formatDiagnosticWorkLabel(
state: {
sessionId?: string;
sessionKey?: string;
state: SessionStateValue;
queueDepth: number;
lastActivity: number;
},
now: number,
): string {
const label = state.sessionKey ?? state.sessionId ?? "unknown";
const ageSeconds = Math.round(Math.max(0, now - state.lastActivity) / 1000);
const activity = getDiagnosticSessionActivitySnapshot(
{ sessionId: state.sessionId, sessionKey: state.sessionKey },
now,
);
const workKind = activity.activeWorkKind ? `/${activity.activeWorkKind}` : "";
const lastProgress = activity.lastProgressReason ? ` last=${activity.lastProgressReason}` : "";
return `${label}(${state.state}${workKind},q=${state.queueDepth},age=${ageSeconds}s${lastProgress})`;
}
function pushLimitedDiagnosticLabel(labels: string[], label: string, limit = 5): void {
if (labels.length < limit) {
labels.push(label);
}
}
function getDiagnosticWorkSnapshot(now = Date.now()): DiagnosticWorkSnapshot {
let activeCount = 0;
let waitingCount = 0;
let queuedCount = 0;
const activeLabels: string[] = [];
const waitingLabels: string[] = [];
const queuedLabels: string[] = [];
for (const state of diagnosticSessionStates.values()) {
if (state.state === "processing") {
activeCount += 1;
pushLimitedDiagnosticLabel(activeLabels, formatDiagnosticWorkLabel(state, now));
} else if (state.state === "waiting") {
waitingCount += 1;
pushLimitedDiagnosticLabel(waitingLabels, formatDiagnosticWorkLabel(state, now));
}
if (state.queueDepth > 0) {
pushLimitedDiagnosticLabel(queuedLabels, formatDiagnosticWorkLabel(state, now));
}
queuedCount += state.queueDepth;
}
return { activeCount, waitingCount, queuedCount };
return { activeCount, waitingCount, queuedCount, activeLabels, waitingLabels, queuedLabels };
}
function hasOpenDiagnosticWork(snapshot: DiagnosticWorkSnapshot): boolean {
@@ -306,6 +350,10 @@ function emitDiagnosticLivenessWarning(
sample: DiagnosticLivenessSample,
work: DiagnosticWorkSnapshot,
): void {
const phase = getCurrentDiagnosticPhase();
const recentPhases = getRecentDiagnosticPhases(6);
const recentPhaseSummary = formatRecentDiagnosticPhases(recentPhases);
const workLabelSummary = formatDiagnosticWorkLabels(work);
const message = `liveness warning: reasons=${sample.reasons.join(",")} interval=${Math.round(
sample.intervalMs / 1000,
)}s eventLoopDelayP99Ms=${formatOptionalDiagnosticMetric(
@@ -316,7 +364,11 @@ function emitDiagnosticLivenessWarning(
sample.eventLoopUtilization,
)} cpuCoreRatio=${formatOptionalDiagnosticMetric(sample.cpuCoreRatio)} active=${
work.activeCount
} waiting=${work.waitingCount} queued=${work.queuedCount}`;
} waiting=${work.waitingCount} queued=${work.queuedCount}${
phase ? ` phase=${phase}` : ""
}${recentPhaseSummary ? ` recentPhases=${recentPhaseSummary}` : ""}${
workLabelSummary ? ` work=[${workLabelSummary}]` : ""
}`;
const hasBlockingWork = work.waitingCount > 0 || work.queuedCount > 0;
const hasSustainedEventLoopDelay =
(sample.eventLoopDelayP99Ms ?? 0) >= DEFAULT_LIVENESS_EVENT_LOOP_DELAY_WARN_MS;
@@ -339,10 +391,28 @@ function emitDiagnosticLivenessWarning(
active: work.activeCount,
waiting: work.waitingCount,
queued: work.queuedCount,
phase,
recentPhases,
activeWorkLabels: work.activeLabels,
waitingWorkLabels: work.waitingLabels,
queuedWorkLabels: work.queuedLabels,
});
markActivity();
}
function formatRecentDiagnosticPhases(phases: DiagnosticPhaseSnapshot[]): string {
return phases.map((phase) => `${phase.name}:${Math.round(phase.durationMs ?? 0)}ms`).join(",");
}
function formatDiagnosticWorkLabels(work: DiagnosticWorkSnapshot): string {
const parts = [
work.activeLabels.length > 0 ? `active=${work.activeLabels.join("|")}` : "",
work.waitingLabels.length > 0 ? `waiting=${work.waitingLabels.join("|")}` : "",
work.queuedLabels.length > 0 ? `queued=${work.queuedLabels.join("|")}` : "",
].filter(Boolean);
return parts.join(" ");
}
export function resolveStuckSessionWarnMs(config?: OpenClawConfig): number {
const raw = config?.diagnostics?.stuckSessionWarnMs;
if (typeof raw !== "number" || !Number.isFinite(raw)) {
@@ -588,6 +658,9 @@ function sessionAttentionFields(params: {
classification: SessionAttentionClassification;
activity: DiagnosticSessionActivitySnapshot;
}) {
const terminalProgressStale = isTerminalDiagnosticProgressReason(
params.activity.lastProgressReason,
);
return {
...(params.classification.activeWorkKind
? { activeWorkKind: params.classification.activeWorkKind }
@@ -605,9 +678,24 @@ function sessionAttentionFields(params: {
...(params.activity.activeToolAgeMs !== undefined
? { activeToolAgeMs: params.activity.activeToolAgeMs }
: {}),
...(terminalProgressStale ? { terminalProgressStale: true } : {}),
};
}
function isTerminalDiagnosticProgressReason(reason: string | undefined): boolean {
if (!reason) {
return false;
}
return (
reason === "run:completed" ||
reason === "embedded_run:ended" ||
reason.includes("response.completed") ||
reason.includes("rawResponseItem/completed") ||
reason.includes("raw_response_item.completed") ||
reason.includes("output_item.done")
);
}
function formatSessionActivityLogFields(activity: DiagnosticSessionActivitySnapshot): string {
const fields: string[] = [];
if (activity.lastProgressReason) {
@@ -625,6 +713,9 @@ function formatSessionActivityLogFields(activity: DiagnosticSessionActivitySnaps
if (activity.activeToolAgeMs !== undefined) {
fields.push(`activeToolAge=${Math.round(activity.activeToolAgeMs / 1000)}s`);
}
if (isTerminalDiagnosticProgressReason(activity.lastProgressReason)) {
fields.push("terminalProgressStale=true");
}
return fields.join(" ");
}
@@ -835,7 +926,7 @@ export function startDiagnosticHeartbeat(
const stuckSessionWarnMs = resolveStuckSessionWarnMs(heartbeatConfig);
const now = Date.now();
pruneDiagnosticSessionStates(now, true);
const work = getDiagnosticWorkSnapshot();
const work = getDiagnosticWorkSnapshot(now);
const livenessSample = (opts?.sampleLiveness ?? sampleDiagnosticLiveness)(now, work);
const shouldEmitLivenessEvent =
livenessSample !== null && shouldEmitDiagnosticLivenessEvent(now);
@@ -943,6 +1034,7 @@ export function resetDiagnosticStateForTest(): void {
webhookStats.lastReceived = 0;
stopDiagnosticHeartbeat();
resetDiagnosticMemoryForTest();
resetDiagnosticPhasesForTest();
resetDiagnosticStabilityRecorderForTest();
resetDiagnosticStabilityBundleForTest();
}