From 32db81ca5c352846ff16751dde703e510e7c8cea Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 1 May 2026 22:34:52 +0100 Subject: [PATCH] fix: classify session liveness diagnostics --- docs/concepts/agent-loop.md | 2 +- docs/concepts/queue.md | 2 +- docs/gateway/configuration-reference.md | 2 +- docs/gateway/opentelemetry.md | 8 +- .../codex/src/app-server/run-attempt.ts | 8 + .../diagnostics-otel/src/service.test.ts | 1 + extensions/diagnostics-otel/src/service.ts | 5 + extensions/whatsapp/api.ts | 1 - src/agents/pi-embedded-runner/runs.ts | 7 + src/config/schema.base.generated.ts | 8 +- src/config/schema.help.ts | 2 +- src/config/schema.labels.ts | 2 +- src/infra/diagnostic-events.ts | 49 ++- src/logging/diagnostic-run-activity.ts | 281 ++++++++++++++++++ src/logging/diagnostic-stability-bundle.ts | 6 + src/logging/diagnostic-stability.ts | 15 + src/logging/diagnostic.test.ts | 74 ++++- src/logging/diagnostic.ts | 189 ++++++++++-- 18 files changed, 621 insertions(+), 41 deletions(-) create mode 100644 src/logging/diagnostic-run-activity.ts diff --git a/docs/concepts/agent-loop.md b/docs/concepts/agent-loop.md index 79a7bfb5f53..d04f0d9d1ad 100644 --- a/docs/concepts/agent-loop.md +++ b/docs/concepts/agent-loop.md @@ -164,7 +164,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism - `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides. - Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer. - Cron runtime: isolated agent-turn `timeoutSeconds` is owned by cron. The scheduler starts that timer when execution begins, aborts the underlying run at the configured deadline, then runs bounded cleanup before recording the timeout so a stale child session cannot keep the lane stuck. -- Stuck-session recovery: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` detects long `processing` sessions. 
Active embedded runs, active reply operations, and active session-lane tasks remain warning-only by default; if diagnostics show no active work for the session, the watchdog releases the affected session lane so queued startup work can drain. +- Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path releases the affected session lane so queued startup work can drain. - Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers.<id>.timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout. - Provider HTTP request timeout: `models.providers.<id>.timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout. diff --git a/docs/concepts/queue.md b/docs/concepts/queue.md index 41e501e82b2..10c847a7c6d 100644 --- a/docs/concepts/queue.md +++ b/docs/concepts/queue.md @@ -115,7 +115,7 @@ keys. - If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining. - If you need queue depth, enable verbose logs and watch for queue timing lines. 
- Codex app-server runs that accept a turn and then stop emitting progress are interrupted by the Codex adapter so the active session lane can release instead of waiting for the outer run timeout. -- When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` log a stuck-session warning. Active embedded runs, active reply operations, and active lane tasks remain warning-only by default; stale startup bookkeeping with no active session work can release the affected session lane so queued work drains. +- When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` are classified by current activity. Active work logs as `session.long_running`; active work with no recent progress logs as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path can release the affected session lane so queued work drains. ## Related diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index 385336a5143..b63f1f4b407 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -937,7 +937,7 @@ Notes: - `enabled`: master toggle for instrumentation output (default: `true`). - `flags`: array of flag strings enabling targeted log output (supports wildcards like `"telegram.*"` or `"*"`). -- `stuckSessionWarnMs`: age threshold in ms for emitting stuck-session warnings while a session remains in processing state. +- `stuckSessionWarnMs`: age threshold in ms for classifying long-running processing sessions as `session.long_running`, `session.stalled`, or `session.stuck`. - `otel.enabled`: enables the OpenTelemetry export pipeline (default: `false`). For the full configuration, signal catalog, and privacy model, see [OpenTelemetry export](/gateway/opentelemetry). - `otel.endpoint`: collector URL for OTel export. 
- `otel.tracesEndpoint` / `otel.metricsEndpoint` / `otel.logsEndpoint`: optional signal-specific OTLP endpoints. When set, they override `otel.endpoint` for that signal only. diff --git a/docs/gateway/opentelemetry.md b/docs/gateway/opentelemetry.md index 55a108aa949..1bab14a7d83 100644 --- a/docs/gateway/opentelemetry.md +++ b/docs/gateway/opentelemetry.md @@ -192,8 +192,8 @@ When any subkey is enabled, model and tool spans get bounded, redacted - `openclaw.queue.depth` (histogram, attrs: `openclaw.lane` or `openclaw.channel=heartbeat`) - `openclaw.queue.wait_ms` (histogram, attrs: `openclaw.lane`) - `openclaw.session.state` (counter, attrs: `openclaw.state`, `openclaw.reason`) -- `openclaw.session.stuck` (counter, attrs: `openclaw.state`) -- `openclaw.session.stuck_age_ms` (histogram, attrs: `openclaw.state`) +- `openclaw.session.stuck` (counter, attrs: `openclaw.state`; emitted only for stale session bookkeeping with no active work) +- `openclaw.session.stuck_age_ms` (histogram, attrs: `openclaw.state`; emitted only for stale session bookkeeping with no active work) - `openclaw.run.attempt` (counter, attrs: `openclaw.attempt`) ### Harness lifecycle @@ -277,8 +277,8 @@ to them directly without OTLP export. 
**Queue and session** - `queue.lane.enqueue` / `queue.lane.dequeue` -- `session.state` / `session.stuck` -- `run.attempt` +- `session.state` / `session.long_running` / `session.stalled` / `session.stuck` +- `run.attempt` / `run.progress` - `diagnostic.heartbeat` (aggregate counters: webhooks/queue/session) **Harness lifecycle** diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 5f03573c22a..85b89dcbe01 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -36,6 +36,7 @@ import { type NativeHookRelayEvent, type NativeHookRelayRegistrationHandle, } from "openclaw/plugin-sdk/agent-harness-runtime"; +import { emitTrustedDiagnosticEvent } from "openclaw/plugin-sdk/diagnostic-runtime"; import { handleCodexAppServerApprovalRequest } from "./approval-bridge.js"; import { refreshCodexAppServerAuthTokens } from "./auth-bridge.js"; import { @@ -708,6 +709,13 @@ export async function runCodexAppServerAttempt( const touchTurnCompletionActivity = (reason: string, options?: { arm?: boolean }) => { turnCompletionLastActivityAt = Date.now(); turnCompletionLastActivityReason = reason; + emitTrustedDiagnosticEvent({ + type: "run.progress", + runId: params.runId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + reason: `codex_app_server:${reason}`, + }); if (options?.arm) { turnCompletionIdleWatchArmed = true; } diff --git a/extensions/diagnostics-otel/src/service.test.ts b/extensions/diagnostics-otel/src/service.test.ts index 2d32a0322ea..4b637adf5ed 100644 --- a/extensions/diagnostics-otel/src/service.test.ts +++ b/extensions/diagnostics-otel/src/service.test.ts @@ -320,6 +320,7 @@ describe("diagnostics-otel service", () => { type: "session.stuck", state: "processing", ageMs: 125_000, + classification: "stale_session_state", }); emitDiagnosticEvent({ type: "run.attempt", diff --git a/extensions/diagnostics-otel/src/service.ts 
b/extensions/diagnostics-otel/src/service.ts index 527f1f20b1b..755d465019a 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -2215,12 +2215,17 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { case "session.state": recordSessionState(evt); return; + case "session.long_running": + case "session.stalled": + return; case "session.stuck": recordSessionStuck(evt); return; case "run.attempt": recordRunAttempt(evt); return; + case "run.progress": + return; case "diagnostic.heartbeat": recordHeartbeat(evt); return; diff --git a/extensions/whatsapp/api.ts b/extensions/whatsapp/api.ts index 0fe9da961d4..70839d9f409 100644 --- a/extensions/whatsapp/api.ts +++ b/extensions/whatsapp/api.ts @@ -79,7 +79,6 @@ export { logMessageQueued, logRunAttempt, logSessionStateChange, - logSessionStuck, logSuccess, logToolLoopAction, logWarn, diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index a808336513d..b354baa8562 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -8,6 +8,10 @@ import { resolveActiveReplyRunSessionId, waitForReplyRunEndBySessionId, } from "../../auto-reply/reply/reply-run-registry.js"; +import { + markDiagnosticEmbeddedRunEnded, + markDiagnosticEmbeddedRunStarted, +} from "../../logging/diagnostic-run-activity.js"; import { diagnosticLogger as diag, logMessageQueued, @@ -366,6 +370,7 @@ export function setActiveEmbeddedRun( state: "processing", reason: wasActive ? 
"run_replaced" : "run_started", }); + markDiagnosticEmbeddedRunStarted({ sessionId, sessionKey }); if (!sessionId.startsWith("probe-")) { diag.debug(`run registered: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`); } @@ -392,6 +397,7 @@ export function clearActiveEmbeddedRun( EMBEDDED_RUN_MODEL_SWITCH_REQUESTS.delete(sessionId); clearActiveRunSessionKeys(sessionId, sessionKey); logSessionStateChange({ sessionId, sessionKey, state: "idle", reason: "run_completed" }); + markDiagnosticEmbeddedRunEnded({ sessionId, sessionKey }); if (!sessionId.startsWith("probe-")) { diag.debug(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`); } @@ -413,6 +419,7 @@ export function forceClearEmbeddedPiRun( EMBEDDED_RUN_MODEL_SWITCH_REQUESTS.delete(sessionId); clearActiveRunSessionKeys(sessionId, sessionKey); logSessionStateChange({ sessionId, sessionKey, state: "idle", reason }); + markDiagnosticEmbeddedRunEnded({ sessionId, sessionKey }); notifyEmbeddedRunEnded(sessionId); cleared = true; } diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index c131b83e7b2..f0799f24813 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -147,9 +147,9 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { type: "integer", exclusiveMinimum: 0, maximum: 9007199254740991, - title: "Stuck Session Warning Threshold (ms)", + title: "Session Liveness Threshold (ms)", description: - "Age threshold in milliseconds for emitting stuck-session warnings while a session remains in processing state. Increase for long multi-tool turns to reduce false positives; decrease for faster hang detection.", + "Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. 
Increase for long multi-tool turns; decrease for faster stale-session detection.", }, otel: { type: "object", @@ -24484,8 +24484,8 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { tags: ["observability"], }, "diagnostics.stuckSessionWarnMs": { - label: "Stuck Session Warning Threshold (ms)", - help: "Age threshold in milliseconds for emitting stuck-session warnings while a session remains in processing state. Increase for long multi-tool turns to reduce false positives; decrease for faster hang detection.", + label: "Session Liveness Threshold (ms)", + help: "Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.", tags: ["observability", "storage"], }, "diagnostics.otel.enabled": { diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 57272b772d4..34ea63e73c6 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -585,7 +585,7 @@ export const FIELD_HELP: Record = { "diagnostics.enabled": "Master toggle for diagnostics instrumentation output in logs and telemetry wiring paths. Defaults to enabled; set false only in tightly constrained environments.", "diagnostics.stuckSessionWarnMs": - "Age threshold in milliseconds for emitting stuck-session warnings while a session remains in processing state. Increase for long multi-tool turns to reduce false positives; decrease for faster hang detection.", + "Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.", "diagnostics.otel.enabled": "Enables OpenTelemetry export pipeline for traces, metrics, and logs based on configured endpoint/protocol settings. 
Keep disabled unless your collector endpoint and auth are fully configured.", "diagnostics.otel.endpoint": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 6b06cc5a954..b9d78eeafed 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -40,7 +40,7 @@ export const FIELD_LABELS: Record = { "commitments.maxPerDay": "Commitments per Day", "diagnostics.enabled": "Diagnostics Enabled", "diagnostics.flags": "Diagnostics Flags", - "diagnostics.stuckSessionWarnMs": "Stuck Session Warning Threshold (ms)", + "diagnostics.stuckSessionWarnMs": "Session Liveness Threshold (ms)", "diagnostics.otel.enabled": "OpenTelemetry Enabled", "diagnostics.otel.endpoint": "OpenTelemetry Endpoint", "diagnostics.otel.tracesEndpoint": "OpenTelemetry Traces Endpoint", diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index 769eb513a80..65308a335f8 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -124,14 +124,47 @@ export type DiagnosticSessionStateEvent = DiagnosticBaseEvent & { queueDepth?: number; }; -export type DiagnosticSessionStuckEvent = DiagnosticBaseEvent & { - type: "session.stuck"; +export type DiagnosticSessionActiveWorkKind = + | "embedded_run" + | "model_call" + | "tool_call" + | "queued_work"; + +export type DiagnosticSessionAttentionClassification = + | "long_running" + | "blocked_tool_call" + | "stalled_agent_run" + | "stale_session_state"; + +type DiagnosticSessionAttentionBaseEvent = DiagnosticBaseEvent & { sessionKey?: string; sessionId?: string; state: DiagnosticSessionState; ageMs: number; queueDepth?: number; reason?: string; + classification: DiagnosticSessionAttentionClassification; + activeWorkKind?: DiagnosticSessionActiveWorkKind; + lastProgressAgeMs?: number; + lastProgressReason?: string; + activeToolName?: string; + activeToolCallId?: string; + activeToolAgeMs?: number; +}; + +export type DiagnosticSessionLongRunningEvent = 
DiagnosticSessionAttentionBaseEvent & { + type: "session.long_running"; + classification: "long_running"; +}; + +export type DiagnosticSessionStalledEvent = DiagnosticSessionAttentionBaseEvent & { + type: "session.stalled"; + classification: "blocked_tool_call" | "stalled_agent_run"; +}; + +export type DiagnosticSessionStuckEvent = DiagnosticSessionAttentionBaseEvent & { + type: "session.stuck"; + classification: "stale_session_state"; }; export type DiagnosticLaneEnqueueEvent = DiagnosticBaseEvent & { @@ -155,6 +188,14 @@ export type DiagnosticRunAttemptEvent = DiagnosticBaseEvent & { attempt: number; }; +export type DiagnosticRunProgressEvent = DiagnosticBaseEvent & { + type: "run.progress"; + sessionKey?: string; + sessionId?: string; + runId?: string; + reason: string; +}; + export type DiagnosticHeartbeatEvent = DiagnosticBaseEvent & { type: "diagnostic.heartbeat"; webhooks: { @@ -455,10 +496,13 @@ export type DiagnosticEventPayload = | DiagnosticMessageDeliveryCompletedEvent | DiagnosticMessageDeliveryErrorEvent | DiagnosticSessionStateEvent + | DiagnosticSessionLongRunningEvent + | DiagnosticSessionStalledEvent | DiagnosticSessionStuckEvent | DiagnosticLaneEnqueueEvent | DiagnosticLaneDequeueEvent | DiagnosticRunAttemptEvent + | DiagnosticRunProgressEvent | DiagnosticHeartbeatEvent | DiagnosticLivenessWarningEvent | DiagnosticToolLoopEvent @@ -526,6 +570,7 @@ const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set([ "model.call.started", "model.call.completed", "model.call.error", + "run.progress", "harness.run.started", "harness.run.completed", "harness.run.error", diff --git a/src/logging/diagnostic-run-activity.ts b/src/logging/diagnostic-run-activity.ts new file mode 100644 index 00000000000..5f1c1fe7671 --- /dev/null +++ b/src/logging/diagnostic-run-activity.ts @@ -0,0 +1,281 @@ +import { + onInternalDiagnosticEvent, + type DiagnosticEventPayload, + type DiagnosticSessionActiveWorkKind, +} from "../infra/diagnostic-events.js"; + +type SessionActivity = { + 
sessionId?: string; + sessionKey?: string; + activeEmbeddedRun: boolean; + activeTools: Map; + activeModelCalls: Set; + lastProgressAt: number; + lastProgressReason?: string; +}; + +type ActiveTool = { + toolName: string; + toolCallId?: string; + startedAt: number; + lastProgressAt: number; +}; + +export type DiagnosticSessionActivitySnapshot = { + activeWorkKind?: DiagnosticSessionActiveWorkKind; + activeToolName?: string; + activeToolCallId?: string; + activeToolAgeMs?: number; + lastProgressAgeMs?: number; + lastProgressReason?: string; +}; + +const activityByRef = new Map(); +const activityByRunId = new Map(); + +function sessionRefs(params: { sessionId?: string; sessionKey?: string }): string[] { + const refs: string[] = []; + const sessionId = params.sessionId?.trim(); + const sessionKey = params.sessionKey?.trim(); + if (sessionId) { + refs.push(`id:${sessionId}`); + } + if (sessionKey) { + refs.push(`key:${sessionKey}`); + } + return refs; +} + +function resolveSessionActivity(params: { + sessionId?: string; + sessionKey?: string; + runId?: string; + create?: boolean; +}): SessionActivity | undefined { + if (params.runId) { + const byRun = activityByRunId.get(params.runId); + if (byRun) { + return byRun; + } + } + + for (const ref of sessionRefs(params)) { + const activity = activityByRef.get(ref); + if (activity) { + if (params.runId) { + activityByRunId.set(params.runId, activity); + } + return activity; + } + } + + if (!params.create) { + return undefined; + } + + const activity: SessionActivity = { + sessionId: params.sessionId, + sessionKey: params.sessionKey, + activeEmbeddedRun: false, + activeTools: new Map(), + activeModelCalls: new Set(), + lastProgressAt: Date.now(), + }; + for (const ref of sessionRefs(params)) { + activityByRef.set(ref, activity); + } + if (params.runId) { + activityByRunId.set(params.runId, activity); + } + return activity; +} + +function touchSessionActivity(activity: SessionActivity, reason: string, now = Date.now()): void { 
+ activity.lastProgressAt = now; + activity.lastProgressReason = reason; +} + +function toolKey(event: { + runId?: string; + sessionId?: string; + sessionKey?: string; + toolCallId?: string; + toolName: string; +}): string { + return `${event.runId ?? event.sessionId ?? event.sessionKey ?? "unknown"}:${ + event.toolCallId ?? event.toolName + }`; +} + +function modelCallKey(event: { runId?: string; provider?: string; model?: string }): string { + return `${event.runId ?? "unknown"}:${event.provider ?? "provider"}:${event.model ?? "model"}`; +} + +function recordToolStarted( + event: Extract, +): void { + const activity = resolveSessionActivity({ ...event, create: true }); + if (!activity) { + return; + } + const now = Date.now(); + activity.activeTools.set(toolKey(event), { + toolName: event.toolName, + toolCallId: event.toolCallId, + startedAt: now, + lastProgressAt: now, + }); + touchSessionActivity(activity, `tool:${event.toolName}:started`, now); +} + +function recordToolEnded( + event: Extract< + DiagnosticEventPayload, + { type: "tool.execution.completed" | "tool.execution.error" | "tool.execution.blocked" } + >, +): void { + const activity = resolveSessionActivity(event); + if (!activity) { + return; + } + activity.activeTools.delete(toolKey(event)); + touchSessionActivity(activity, `tool:${event.toolName}:ended`); +} + +function recordModelStarted( + event: Extract, +): void { + const activity = resolveSessionActivity({ ...event, create: true }); + if (!activity) { + return; + } + activity.activeModelCalls.add(modelCallKey(event)); + touchSessionActivity(activity, "model_call:started"); +} + +function recordModelEnded( + event: Extract, +): void { + const activity = resolveSessionActivity(event); + if (!activity) { + return; + } + activity.activeModelCalls.delete(modelCallKey(event)); + touchSessionActivity(activity, "model_call:ended"); +} + +function recordRunProgress(event: Extract): void { + const activity = resolveSessionActivity({ ...event, create: 
true }); + if (!activity) { + return; + } + touchSessionActivity(activity, event.reason); +} + +function recordRunCompleted( + event: Extract, +): void { + const activity = resolveSessionActivity(event); + if (!activity) { + return; + } + activityByRunId.delete(event.runId); + activity.activeTools.clear(); + activity.activeModelCalls.clear(); + activity.activeEmbeddedRun = false; + touchSessionActivity(activity, "run:completed"); +} + +export function markDiagnosticEmbeddedRunStarted(params: { + sessionId: string; + sessionKey?: string; +}): void { + const activity = resolveSessionActivity({ ...params, create: true }); + if (!activity) { + return; + } + activity.activeEmbeddedRun = true; + touchSessionActivity(activity, "embedded_run:started"); +} + +export function markDiagnosticEmbeddedRunEnded(params: { + sessionId: string; + sessionKey?: string; +}): void { + const activity = resolveSessionActivity(params); + if (!activity) { + return; + } + activity.activeEmbeddedRun = false; + activity.activeTools.clear(); + activity.activeModelCalls.clear(); + touchSessionActivity(activity, "embedded_run:ended"); +} + +export function getDiagnosticSessionActivitySnapshot( + params: { sessionId?: string; sessionKey?: string }, + now = Date.now(), +): DiagnosticSessionActivitySnapshot { + const activity = resolveSessionActivity(params); + if (!activity) { + return {}; + } + + let activeWorkKind: DiagnosticSessionActiveWorkKind | undefined; + if (activity.activeTools.size > 0) { + activeWorkKind = "tool_call"; + } else if (activity.activeModelCalls.size > 0) { + activeWorkKind = "model_call"; + } else if (activity.activeEmbeddedRun) { + activeWorkKind = "embedded_run"; + } + + let activeTool: ActiveTool | undefined; + for (const tool of activity.activeTools.values()) { + if (!activeTool || tool.startedAt < activeTool.startedAt) { + activeTool = tool; + } + } + + return { + activeWorkKind, + activeToolName: activeTool?.toolName, + activeToolCallId: activeTool?.toolCallId, + 
activeToolAgeMs: activeTool ? Math.max(0, now - activeTool.startedAt) : undefined, + lastProgressAgeMs: Math.max(0, now - activity.lastProgressAt), + lastProgressReason: activity.lastProgressReason, + }; +} + +export function resetDiagnosticRunActivityForTest(): void { + activityByRef.clear(); + activityByRunId.clear(); +} + +onInternalDiagnosticEvent((event) => { + switch (event.type) { + case "tool.execution.started": + recordToolStarted(event); + return; + case "tool.execution.completed": + case "tool.execution.error": + case "tool.execution.blocked": + recordToolEnded(event); + return; + case "model.call.started": + recordModelStarted(event); + return; + case "model.call.completed": + case "model.call.error": + recordModelEnded(event); + return; + case "run.progress": + recordRunProgress(event); + return; + case "run.completed": + recordRunCompleted(event); + return; + default: + return; + } +}); diff --git a/src/logging/diagnostic-stability-bundle.ts b/src/logging/diagnostic-stability-bundle.ts index 1ca1623dc8e..88c35343f43 100644 --- a/src/logging/diagnostic-stability-bundle.ts +++ b/src/logging/diagnostic-stability-bundle.ts @@ -328,6 +328,12 @@ function readStabilityEventRecord( assignOptionalCodeString(sanitized, "level", record.level, `${label}.level`); assignOptionalCodeString(sanitized, "detector", record.detector, `${label}.detector`); assignOptionalCodeString(sanitized, "toolName", record.toolName, `${label}.toolName`); + assignOptionalCodeString( + sanitized, + "activeWorkKind", + record.activeWorkKind, + `${label}.activeWorkKind`, + ); assignOptionalCodeString( sanitized, "pairedToolName", diff --git a/src/logging/diagnostic-stability.ts b/src/logging/diagnostic-stability.ts index d47da20c15f..37d97c3fb25 100644 --- a/src/logging/diagnostic-stability.ts +++ b/src/logging/diagnostic-stability.ts @@ -27,6 +27,7 @@ export type DiagnosticStabilityEventRecord = { detector?: string; deliveryKind?: string; toolName?: string; + activeWorkKind?: string; 
pairedToolName?: string; provider?: string; model?: string; @@ -231,11 +232,22 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi assignReasonCode(record, event.reason); record.queueDepth = event.queueDepth; break; + case "session.long_running": + case "session.stalled": case "session.stuck": record.outcome = event.state; + if (event.type === "session.stuck") { + record.level = "warning"; + } assignReasonCode(record, event.reason); record.ageMs = event.ageMs; record.queueDepth = event.queueDepth; + if (event.activeWorkKind) { + record.activeWorkKind = event.activeWorkKind; + } + if (event.activeToolName) { + record.toolName = event.activeToolName; + } break; case "queue.lane.enqueue": record.source = event.lane; @@ -249,6 +261,9 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi case "run.attempt": record.count = event.attempt; break; + case "run.progress": + assignReasonCode(record, event.reason); + break; case "context.assembled": record.channel = event.channel; record.provider = event.provider; diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index 470b4187bc2..20e116e40b1 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -8,6 +8,7 @@ import { setDiagnosticsEnabledForProcess, type DiagnosticEventPayload, } from "../infra/diagnostic-events.js"; +import { markDiagnosticEmbeddedRunStarted } from "./diagnostic-run-activity.js"; import { diagnosticSessionStates, getDiagnosticSessionStateCountForTest, @@ -149,7 +150,8 @@ describe("stuck session diagnostics threshold", () => { const stuckEvents = events.filter((event) => event.type === "session.stuck"); expect(stuckEvents).toHaveLength(1); expect(stuckEvents[0]).toMatchObject({ - reason: "processing_without_queue", + classification: "stale_session_state", + reason: "stale_session_state", queueDepth: 0, }); expect(recoverStuckSession).toHaveBeenCalledWith({ @@ -160,6 +162,76 @@ describe("stuck 
session diagnostics threshold", () => { }); }); + it("reports active sessions as stalled instead of stuck when active work stops progressing", () => { + const events: DiagnosticEventPayload[] = []; + const recoverStuckSession = vi.fn(); + const unsubscribe = onDiagnosticEvent((event) => { + events.push(event); + }); + try { + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + }, + }, + { recoverStuckSession }, + ); + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" }); + vi.advanceTimersByTime(61_000); + } finally { + unsubscribe(); + } + + expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(0); + const stalledEvents = events.filter((event) => event.type === "session.stalled"); + expect(stalledEvents).toHaveLength(1); + expect(stalledEvents[0]).toMatchObject({ + classification: "stalled_agent_run", + reason: "active_work_without_progress", + activeWorkKind: "embedded_run", + }); + expect(recoverStuckSession).not.toHaveBeenCalled(); + }); + + it("reports long-running sessions separately when active work is making progress", () => { + const events: DiagnosticEventPayload[] = []; + const recoverStuckSession = vi.fn(); + const unsubscribe = onDiagnosticEvent((event) => { + events.push(event); + }); + try { + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + }, + }, + { recoverStuckSession }, + ); + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + vi.advanceTimersByTime(45_000); + markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" }); + vi.advanceTimersByTime(16_000); + } finally { + unsubscribe(); + } + + expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(0); + expect(events.filter((event) => event.type === "session.stalled")).toHaveLength(0); + const 
longRunningEvents = events.filter((event) => event.type === "session.long_running"); + expect(longRunningEvents).toHaveLength(1); + expect(longRunningEvents[0]).toMatchObject({ + classification: "long_running", + reason: "active_work", + activeWorkKind: "embedded_run", + }); + expect(recoverStuckSession).not.toHaveBeenCalled(); + }); + it("starts and stops the stability recorder with the heartbeat lifecycle", () => { startDiagnosticHeartbeat({ diagnostics: { diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index da5aa9e20d2..3ba77124a8d 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -5,9 +5,15 @@ import { areDiagnosticsEnabledForProcess, emitDiagnosticEvent, isDiagnosticsEnabled, + type DiagnosticSessionActiveWorkKind, type DiagnosticLivenessWarningReason, } from "../infra/diagnostic-events.js"; import { emitDiagnosticMemorySample, resetDiagnosticMemoryForTest } from "./diagnostic-memory.js"; +import { + getDiagnosticSessionActivitySnapshot, + resetDiagnosticRunActivityForTest, + type DiagnosticSessionActivitySnapshot, +} from "./diagnostic-run-activity.js"; import { diagnosticLogger as diag, getLastDiagnosticActivityAt, @@ -150,17 +156,82 @@ function hasRecentDiagnosticActivity(now: number): boolean { return lastActivityAt > 0 && now - lastActivityAt <= RECENT_DIAGNOSTIC_ACTIVITY_MS; } -function resolveStuckSessionReason(state: { - state: SessionStateValue; +type SessionAttentionClassification = + | { + eventType: "session.long_running"; + reason: string; + classification: "long_running"; + activeWorkKind?: DiagnosticSessionActiveWorkKind; + recoveryEligible: false; + } + | { + eventType: "session.stalled"; + reason: string; + classification: "blocked_tool_call" | "stalled_agent_run"; + activeWorkKind?: DiagnosticSessionActiveWorkKind; + recoveryEligible: false; + } + | { + eventType: "session.stuck"; + reason: string; + classification: "stale_session_state"; + activeWorkKind?: undefined; + recoveryEligible: true; + }; 
+ +function classifySessionAttention(params: { queueDepth: number; -}): string { - if (state.queueDepth > 0) { - return "processing_with_queued_work"; + activity: DiagnosticSessionActivitySnapshot; + staleMs: number; +}): SessionAttentionClassification { + if (params.activity.activeWorkKind) { + if ( + params.activity.activeWorkKind === "tool_call" && + (params.activity.activeToolAgeMs ?? 0) > params.staleMs && + (params.activity.lastProgressAgeMs ?? 0) > params.staleMs + ) { + return { + eventType: "session.stalled", + reason: "blocked_tool_call", + classification: "blocked_tool_call", + activeWorkKind: params.activity.activeWorkKind, + recoveryEligible: false, + }; + } + if ((params.activity.lastProgressAgeMs ?? 0) > params.staleMs) { + return { + eventType: "session.stalled", + reason: "active_work_without_progress", + classification: "stalled_agent_run", + activeWorkKind: params.activity.activeWorkKind, + recoveryEligible: false, + }; + } + return { + eventType: "session.long_running", + reason: params.queueDepth > 0 ? 
"queued_behind_active_work" : "active_work", + classification: "long_running", + activeWorkKind: params.activity.activeWorkKind, + recoveryEligible: false, + }; } - if (state.state === "processing") { - return "processing_without_queue"; + + if (params.queueDepth > 0) { + return { + eventType: "session.long_running", + reason: "queued_work_without_active_run", + classification: "long_running", + activeWorkKind: "queued_work", + recoveryEligible: false, + }; } - return "stale_session_state"; + + return { + eventType: "session.stuck", + reason: "stale_session_state", + classification: "stale_session_state", + recoveryEligible: true, + }; } function roundDiagnosticMetric(value: number, digits = 3): number { @@ -522,29 +593,95 @@ export function logSessionStateChange( markActivity(); } -export function logSessionStuck(params: SessionRef & { state: SessionStateValue; ageMs: number }) { +function sessionAttentionFields(params: { + classification: SessionAttentionClassification; + activity: DiagnosticSessionActivitySnapshot; +}) { + return { + ...(params.classification.activeWorkKind + ? { activeWorkKind: params.classification.activeWorkKind } + : {}), + ...(params.activity.lastProgressAgeMs !== undefined + ? { lastProgressAgeMs: params.activity.lastProgressAgeMs } + : {}), + ...(params.activity.lastProgressReason + ? { lastProgressReason: params.activity.lastProgressReason } + : {}), + ...(params.activity.activeToolName ? { activeToolName: params.activity.activeToolName } : {}), + ...(params.activity.activeToolCallId + ? { activeToolCallId: params.activity.activeToolCallId } + : {}), + ...(params.activity.activeToolAgeMs !== undefined + ? 
{ activeToolAgeMs: params.activity.activeToolAgeMs } + : {}), + }; +} + +export function logSessionAttention( + params: SessionRef & { + state: SessionStateValue; + ageMs: number; + thresholdMs: number; + }, +): SessionAttentionClassification | undefined { if (!areDiagnosticsEnabledForProcess()) { - return; + return undefined; } const state = getDiagnosticSessionState(params); - const reason = resolveStuckSessionReason(state); + const activity = getDiagnosticSessionActivitySnapshot( + { sessionId: state.sessionId, sessionKey: state.sessionKey }, + Date.now(), + ); + const classification = classifySessionAttention({ + queueDepth: state.queueDepth, + activity, + staleMs: params.thresholdMs, + }); + const label = + classification.eventType === "session.stuck" + ? "stuck session" + : classification.eventType === "session.stalled" + ? "stalled session" + : "long-running session"; diag.warn( - `stuck session: sessionId=${state.sessionId ?? "unknown"} sessionKey=${ + `${label}: sessionId=${state.sessionId ?? "unknown"} sessionKey=${ state.sessionKey ?? "unknown" } state=${params.state} age=${Math.round(params.ageMs / 1000)}s queueDepth=${ state.queueDepth - } reason=${reason} recovery=checking`, + } reason=${classification.reason} classification=${classification.classification}${ + classification.activeWorkKind ? ` activeWorkKind=${classification.activeWorkKind}` : "" + } recovery=${classification.recoveryEligible ? 
"checking" : "none"}`, ); - emitDiagnosticEvent({ - type: "session.stuck", + const baseEvent = { sessionId: state.sessionId, sessionKey: state.sessionKey, state: params.state, ageMs: params.ageMs, queueDepth: state.queueDepth, - reason, - }); + reason: classification.reason, + ...sessionAttentionFields({ classification, activity }), + }; + if (classification.eventType === "session.long_running") { + emitDiagnosticEvent({ + type: "session.long_running", + ...baseEvent, + classification: "long_running", + }); + } else if (classification.eventType === "session.stalled") { + emitDiagnosticEvent({ + type: "session.stalled", + ...baseEvent, + classification: classification.classification, + }); + } else { + emitDiagnosticEvent({ + type: "session.stuck", + ...baseEvent, + classification: "stale_session_state", + }); + } markActivity(); + return classification; } export function logRunAttempt(params: SessionRef & { runId: string; attempt: number }) { @@ -695,18 +832,21 @@ export function startDiagnosticHeartbeat( for (const [, state] of diagnosticSessionStates) { const ageMs = now - state.lastActivity; if (state.state === "processing" && ageMs > stuckSessionWarnMs) { - logSessionStuck({ + const classification = logSessionAttention({ sessionId: state.sessionId, sessionKey: state.sessionKey, state: state.state, ageMs, + thresholdMs: stuckSessionWarnMs, }); - void (opts?.recoverStuckSession ?? recoverStuckSession)({ - sessionId: state.sessionId, - sessionKey: state.sessionKey, - ageMs, - queueDepth: state.queueDepth, - }); + if (classification?.recoveryEligible) { + void (opts?.recoverStuckSession ?? 
recoverStuckSession)({ + sessionId: state.sessionId, + sessionKey: state.sessionKey, + ageMs, + queueDepth: state.queueDepth, + }); + } } } }, 30_000); @@ -730,6 +870,7 @@ export function getDiagnosticSessionStateCountForTest(): number { export function resetDiagnosticStateForTest(): void { resetDiagnosticSessionStateForTest(); resetDiagnosticActivityForTest(); + resetDiagnosticRunActivityForTest(); webhookStats.received = 0; webhookStats.processed = 0; webhookStats.errors = 0;