fix: classify session liveness diagnostics

This commit is contained in:
Peter Steinberger
2026-05-01 22:34:52 +01:00
parent fd16687a0b
commit 32db81ca5c
18 changed files with 621 additions and 41 deletions

View File

@@ -164,7 +164,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism
- `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides.
- Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer.
- Cron runtime: isolated agent-turn `timeoutSeconds` is owned by cron. The scheduler starts that timer when execution begins, aborts the underlying run at the configured deadline, then runs bounded cleanup before recording the timeout so a stale child session cannot keep the lane stuck.
- Stuck-session recovery: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` detects long `processing` sessions. Active embedded runs, active reply operations, and active session-lane tasks remain warning-only by default; if diagnostics show no active work for the session, the watchdog releases the affected session lane so queued startup work can drain.
- Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path releases the affected session lane so queued startup work can drain.
- Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers.<id>.timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout.
- Provider HTTP request timeout: `models.providers.<id>.timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout.

View File

@@ -115,7 +115,7 @@ keys.
- If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining.
- If you need queue depth, enable verbose logs and watch for queue timing lines.
- Codex app-server runs that accept a turn and then stop emitting progress are interrupted by the Codex adapter so the active session lane can release instead of waiting for the outer run timeout.
- When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` log a stuck-session warning. Active embedded runs, active reply operations, and active lane tasks remain warning-only by default; stale startup bookkeeping with no active session work can release the affected session lane so queued work drains.
- When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` are classified by current activity. Active work logs as `session.long_running`; active work with no recent progress logs as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path can release the affected session lane so queued work drains.
## Related

View File

@@ -937,7 +937,7 @@ Notes:
- `enabled`: master toggle for instrumentation output (default: `true`).
- `flags`: array of flag strings enabling targeted log output (supports wildcards like `"telegram.*"` or `"*"`).
- `stuckSessionWarnMs`: age threshold in ms for emitting stuck-session warnings while a session remains in processing state.
- `stuckSessionWarnMs`: age threshold in ms for classifying long-running processing sessions as `session.long_running`, `session.stalled`, or `session.stuck`.
- `otel.enabled`: enables the OpenTelemetry export pipeline (default: `false`). For the full configuration, signal catalog, and privacy model, see [OpenTelemetry export](/gateway/opentelemetry).
- `otel.endpoint`: collector URL for OTel export.
- `otel.tracesEndpoint` / `otel.metricsEndpoint` / `otel.logsEndpoint`: optional signal-specific OTLP endpoints. When set, they override `otel.endpoint` for that signal only.

View File

@@ -192,8 +192,8 @@ When any subkey is enabled, model and tool spans get bounded, redacted
- `openclaw.queue.depth` (histogram, attrs: `openclaw.lane` or `openclaw.channel=heartbeat`)
- `openclaw.queue.wait_ms` (histogram, attrs: `openclaw.lane`)
- `openclaw.session.state` (counter, attrs: `openclaw.state`, `openclaw.reason`)
- `openclaw.session.stuck` (counter, attrs: `openclaw.state`)
- `openclaw.session.stuck_age_ms` (histogram, attrs: `openclaw.state`)
- `openclaw.session.stuck` (counter, attrs: `openclaw.state`; emitted only for stale session bookkeeping with no active work)
- `openclaw.session.stuck_age_ms` (histogram, attrs: `openclaw.state`; emitted only for stale session bookkeeping with no active work)
- `openclaw.run.attempt` (counter, attrs: `openclaw.attempt`)
### Harness lifecycle
@@ -277,8 +277,8 @@ to them directly without OTLP export.
**Queue and session**
- `queue.lane.enqueue` / `queue.lane.dequeue`
- `session.state` / `session.stuck`
- `run.attempt`
- `session.state` / `session.long_running` / `session.stalled` / `session.stuck`
- `run.attempt` / `run.progress`
- `diagnostic.heartbeat` (aggregate counters: webhooks/queue/session)
**Harness lifecycle**

View File

@@ -36,6 +36,7 @@ import {
type NativeHookRelayEvent,
type NativeHookRelayRegistrationHandle,
} from "openclaw/plugin-sdk/agent-harness-runtime";
import { emitTrustedDiagnosticEvent } from "openclaw/plugin-sdk/diagnostic-runtime";
import { handleCodexAppServerApprovalRequest } from "./approval-bridge.js";
import { refreshCodexAppServerAuthTokens } from "./auth-bridge.js";
import {
@@ -708,6 +709,13 @@ export async function runCodexAppServerAttempt(
const touchTurnCompletionActivity = (reason: string, options?: { arm?: boolean }) => {
turnCompletionLastActivityAt = Date.now();
turnCompletionLastActivityReason = reason;
emitTrustedDiagnosticEvent({
type: "run.progress",
runId: params.runId,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
reason: `codex_app_server:${reason}`,
});
if (options?.arm) {
turnCompletionIdleWatchArmed = true;
}

View File

@@ -320,6 +320,7 @@ describe("diagnostics-otel service", () => {
type: "session.stuck",
state: "processing",
ageMs: 125_000,
classification: "stale_session_state",
});
emitDiagnosticEvent({
type: "run.attempt",

View File

@@ -2215,12 +2215,17 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
case "session.state":
recordSessionState(evt);
return;
case "session.long_running":
case "session.stalled":
return;
case "session.stuck":
recordSessionStuck(evt);
return;
case "run.attempt":
recordRunAttempt(evt);
return;
case "run.progress":
return;
case "diagnostic.heartbeat":
recordHeartbeat(evt);
return;

View File

@@ -79,7 +79,6 @@ export {
logMessageQueued,
logRunAttempt,
logSessionStateChange,
logSessionStuck,
logSuccess,
logToolLoopAction,
logWarn,

View File

@@ -8,6 +8,10 @@ import {
resolveActiveReplyRunSessionId,
waitForReplyRunEndBySessionId,
} from "../../auto-reply/reply/reply-run-registry.js";
import {
markDiagnosticEmbeddedRunEnded,
markDiagnosticEmbeddedRunStarted,
} from "../../logging/diagnostic-run-activity.js";
import {
diagnosticLogger as diag,
logMessageQueued,
@@ -366,6 +370,7 @@ export function setActiveEmbeddedRun(
state: "processing",
reason: wasActive ? "run_replaced" : "run_started",
});
markDiagnosticEmbeddedRunStarted({ sessionId, sessionKey });
if (!sessionId.startsWith("probe-")) {
diag.debug(`run registered: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
}
@@ -392,6 +397,7 @@ export function clearActiveEmbeddedRun(
EMBEDDED_RUN_MODEL_SWITCH_REQUESTS.delete(sessionId);
clearActiveRunSessionKeys(sessionId, sessionKey);
logSessionStateChange({ sessionId, sessionKey, state: "idle", reason: "run_completed" });
markDiagnosticEmbeddedRunEnded({ sessionId, sessionKey });
if (!sessionId.startsWith("probe-")) {
diag.debug(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
}
@@ -413,6 +419,7 @@ export function forceClearEmbeddedPiRun(
EMBEDDED_RUN_MODEL_SWITCH_REQUESTS.delete(sessionId);
clearActiveRunSessionKeys(sessionId, sessionKey);
logSessionStateChange({ sessionId, sessionKey, state: "idle", reason });
markDiagnosticEmbeddedRunEnded({ sessionId, sessionKey });
notifyEmbeddedRunEnded(sessionId);
cleared = true;
}

View File

@@ -147,9 +147,9 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
type: "integer",
exclusiveMinimum: 0,
maximum: 9007199254740991,
title: "Stuck Session Warning Threshold (ms)",
title: "Session Liveness Threshold (ms)",
description:
"Age threshold in milliseconds for emitting stuck-session warnings while a session remains in processing state. Increase for long multi-tool turns to reduce false positives; decrease for faster hang detection.",
"Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.",
},
otel: {
type: "object",
@@ -24484,8 +24484,8 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
tags: ["observability"],
},
"diagnostics.stuckSessionWarnMs": {
label: "Stuck Session Warning Threshold (ms)",
help: "Age threshold in milliseconds for emitting stuck-session warnings while a session remains in processing state. Increase for long multi-tool turns to reduce false positives; decrease for faster hang detection.",
label: "Session Liveness Threshold (ms)",
help: "Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.",
tags: ["observability", "storage"],
},
"diagnostics.otel.enabled": {

View File

@@ -585,7 +585,7 @@ export const FIELD_HELP: Record<string, string> = {
"diagnostics.enabled":
"Master toggle for diagnostics instrumentation output in logs and telemetry wiring paths. Defaults to enabled; set false only in tightly constrained environments.",
"diagnostics.stuckSessionWarnMs":
"Age threshold in milliseconds for emitting stuck-session warnings while a session remains in processing state. Increase for long multi-tool turns to reduce false positives; decrease for faster hang detection.",
"Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.",
"diagnostics.otel.enabled":
"Enables OpenTelemetry export pipeline for traces, metrics, and logs based on configured endpoint/protocol settings. Keep disabled unless your collector endpoint and auth are fully configured.",
"diagnostics.otel.endpoint":

View File

@@ -40,7 +40,7 @@ export const FIELD_LABELS: Record<string, string> = {
"commitments.maxPerDay": "Commitments per Day",
"diagnostics.enabled": "Diagnostics Enabled",
"diagnostics.flags": "Diagnostics Flags",
"diagnostics.stuckSessionWarnMs": "Stuck Session Warning Threshold (ms)",
"diagnostics.stuckSessionWarnMs": "Session Liveness Threshold (ms)",
"diagnostics.otel.enabled": "OpenTelemetry Enabled",
"diagnostics.otel.endpoint": "OpenTelemetry Endpoint",
"diagnostics.otel.tracesEndpoint": "OpenTelemetry Traces Endpoint",

View File

@@ -124,14 +124,47 @@ export type DiagnosticSessionStateEvent = DiagnosticBaseEvent & {
queueDepth?: number;
};
export type DiagnosticSessionStuckEvent = DiagnosticBaseEvent & {
type: "session.stuck";
export type DiagnosticSessionActiveWorkKind =
| "embedded_run"
| "model_call"
| "tool_call"
| "queued_work";
export type DiagnosticSessionAttentionClassification =
| "long_running"
| "blocked_tool_call"
| "stalled_agent_run"
| "stale_session_state";
type DiagnosticSessionAttentionBaseEvent = DiagnosticBaseEvent & {
sessionKey?: string;
sessionId?: string;
state: DiagnosticSessionState;
ageMs: number;
queueDepth?: number;
reason?: string;
classification: DiagnosticSessionAttentionClassification;
activeWorkKind?: DiagnosticSessionActiveWorkKind;
lastProgressAgeMs?: number;
lastProgressReason?: string;
activeToolName?: string;
activeToolCallId?: string;
activeToolAgeMs?: number;
};
export type DiagnosticSessionLongRunningEvent = DiagnosticSessionAttentionBaseEvent & {
type: "session.long_running";
classification: "long_running";
};
export type DiagnosticSessionStalledEvent = DiagnosticSessionAttentionBaseEvent & {
type: "session.stalled";
classification: "blocked_tool_call" | "stalled_agent_run";
};
export type DiagnosticSessionStuckEvent = DiagnosticSessionAttentionBaseEvent & {
type: "session.stuck";
classification: "stale_session_state";
};
export type DiagnosticLaneEnqueueEvent = DiagnosticBaseEvent & {
@@ -155,6 +188,14 @@ export type DiagnosticRunAttemptEvent = DiagnosticBaseEvent & {
attempt: number;
};
export type DiagnosticRunProgressEvent = DiagnosticBaseEvent & {
type: "run.progress";
sessionKey?: string;
sessionId?: string;
runId?: string;
reason: string;
};
export type DiagnosticHeartbeatEvent = DiagnosticBaseEvent & {
type: "diagnostic.heartbeat";
webhooks: {
@@ -455,10 +496,13 @@ export type DiagnosticEventPayload =
| DiagnosticMessageDeliveryCompletedEvent
| DiagnosticMessageDeliveryErrorEvent
| DiagnosticSessionStateEvent
| DiagnosticSessionLongRunningEvent
| DiagnosticSessionStalledEvent
| DiagnosticSessionStuckEvent
| DiagnosticLaneEnqueueEvent
| DiagnosticLaneDequeueEvent
| DiagnosticRunAttemptEvent
| DiagnosticRunProgressEvent
| DiagnosticHeartbeatEvent
| DiagnosticLivenessWarningEvent
| DiagnosticToolLoopEvent
@@ -526,6 +570,7 @@ const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set<DiagnosticEventPayload["type"]>([
"model.call.started",
"model.call.completed",
"model.call.error",
"run.progress",
"harness.run.started",
"harness.run.completed",
"harness.run.error",

View File

@@ -0,0 +1,281 @@
import {
onInternalDiagnosticEvent,
type DiagnosticEventPayload,
type DiagnosticSessionActiveWorkKind,
} from "../infra/diagnostic-events.js";
type SessionActivity = {
sessionId?: string;
sessionKey?: string;
activeEmbeddedRun: boolean;
activeTools: Map<string, ActiveTool>;
activeModelCalls: Set<string>;
lastProgressAt: number;
lastProgressReason?: string;
};
type ActiveTool = {
toolName: string;
toolCallId?: string;
startedAt: number;
lastProgressAt: number;
};
export type DiagnosticSessionActivitySnapshot = {
activeWorkKind?: DiagnosticSessionActiveWorkKind;
activeToolName?: string;
activeToolCallId?: string;
activeToolAgeMs?: number;
lastProgressAgeMs?: number;
lastProgressReason?: string;
};
const activityByRef = new Map<string, SessionActivity>();
const activityByRunId = new Map<string, SessionActivity>();
function sessionRefs(params: { sessionId?: string; sessionKey?: string }): string[] {
const refs: string[] = [];
const sessionId = params.sessionId?.trim();
const sessionKey = params.sessionKey?.trim();
if (sessionId) {
refs.push(`id:${sessionId}`);
}
if (sessionKey) {
refs.push(`key:${sessionKey}`);
}
return refs;
}
function resolveSessionActivity(params: {
sessionId?: string;
sessionKey?: string;
runId?: string;
create?: boolean;
}): SessionActivity | undefined {
if (params.runId) {
const byRun = activityByRunId.get(params.runId);
if (byRun) {
return byRun;
}
}
for (const ref of sessionRefs(params)) {
const activity = activityByRef.get(ref);
if (activity) {
if (params.runId) {
activityByRunId.set(params.runId, activity);
}
return activity;
}
}
if (!params.create) {
return undefined;
}
const activity: SessionActivity = {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
activeEmbeddedRun: false,
activeTools: new Map(),
activeModelCalls: new Set(),
lastProgressAt: Date.now(),
};
for (const ref of sessionRefs(params)) {
activityByRef.set(ref, activity);
}
if (params.runId) {
activityByRunId.set(params.runId, activity);
}
return activity;
}
function touchSessionActivity(activity: SessionActivity, reason: string, now = Date.now()): void {
activity.lastProgressAt = now;
activity.lastProgressReason = reason;
}
function toolKey(event: {
runId?: string;
sessionId?: string;
sessionKey?: string;
toolCallId?: string;
toolName: string;
}): string {
return `${event.runId ?? event.sessionId ?? event.sessionKey ?? "unknown"}:${
event.toolCallId ?? event.toolName
}`;
}
function modelCallKey(event: { runId?: string; provider?: string; model?: string }): string {
return `${event.runId ?? "unknown"}:${event.provider ?? "provider"}:${event.model ?? "model"}`;
}
function recordToolStarted(
event: Extract<DiagnosticEventPayload, { type: "tool.execution.started" }>,
): void {
const activity = resolveSessionActivity({ ...event, create: true });
if (!activity) {
return;
}
const now = Date.now();
activity.activeTools.set(toolKey(event), {
toolName: event.toolName,
toolCallId: event.toolCallId,
startedAt: now,
lastProgressAt: now,
});
touchSessionActivity(activity, `tool:${event.toolName}:started`, now);
}
function recordToolEnded(
event: Extract<
DiagnosticEventPayload,
{ type: "tool.execution.completed" | "tool.execution.error" | "tool.execution.blocked" }
>,
): void {
const activity = resolveSessionActivity(event);
if (!activity) {
return;
}
activity.activeTools.delete(toolKey(event));
touchSessionActivity(activity, `tool:${event.toolName}:ended`);
}
function recordModelStarted(
event: Extract<DiagnosticEventPayload, { type: "model.call.started" }>,
): void {
const activity = resolveSessionActivity({ ...event, create: true });
if (!activity) {
return;
}
activity.activeModelCalls.add(modelCallKey(event));
touchSessionActivity(activity, "model_call:started");
}
function recordModelEnded(
event: Extract<DiagnosticEventPayload, { type: "model.call.completed" | "model.call.error" }>,
): void {
const activity = resolveSessionActivity(event);
if (!activity) {
return;
}
activity.activeModelCalls.delete(modelCallKey(event));
touchSessionActivity(activity, "model_call:ended");
}
function recordRunProgress(event: Extract<DiagnosticEventPayload, { type: "run.progress" }>): void {
const activity = resolveSessionActivity({ ...event, create: true });
if (!activity) {
return;
}
touchSessionActivity(activity, event.reason);
}
function recordRunCompleted(
event: Extract<DiagnosticEventPayload, { type: "run.completed" }>,
): void {
const activity = resolveSessionActivity(event);
if (!activity) {
return;
}
activityByRunId.delete(event.runId);
activity.activeTools.clear();
activity.activeModelCalls.clear();
activity.activeEmbeddedRun = false;
touchSessionActivity(activity, "run:completed");
}
export function markDiagnosticEmbeddedRunStarted(params: {
sessionId: string;
sessionKey?: string;
}): void {
const activity = resolveSessionActivity({ ...params, create: true });
if (!activity) {
return;
}
activity.activeEmbeddedRun = true;
touchSessionActivity(activity, "embedded_run:started");
}
export function markDiagnosticEmbeddedRunEnded(params: {
sessionId: string;
sessionKey?: string;
}): void {
const activity = resolveSessionActivity(params);
if (!activity) {
return;
}
activity.activeEmbeddedRun = false;
activity.activeTools.clear();
activity.activeModelCalls.clear();
touchSessionActivity(activity, "embedded_run:ended");
}
export function getDiagnosticSessionActivitySnapshot(
params: { sessionId?: string; sessionKey?: string },
now = Date.now(),
): DiagnosticSessionActivitySnapshot {
const activity = resolveSessionActivity(params);
if (!activity) {
return {};
}
let activeWorkKind: DiagnosticSessionActiveWorkKind | undefined;
if (activity.activeTools.size > 0) {
activeWorkKind = "tool_call";
} else if (activity.activeModelCalls.size > 0) {
activeWorkKind = "model_call";
} else if (activity.activeEmbeddedRun) {
activeWorkKind = "embedded_run";
}
let activeTool: ActiveTool | undefined;
for (const tool of activity.activeTools.values()) {
if (!activeTool || tool.startedAt < activeTool.startedAt) {
activeTool = tool;
}
}
return {
activeWorkKind,
activeToolName: activeTool?.toolName,
activeToolCallId: activeTool?.toolCallId,
activeToolAgeMs: activeTool ? Math.max(0, now - activeTool.startedAt) : undefined,
lastProgressAgeMs: Math.max(0, now - activity.lastProgressAt),
lastProgressReason: activity.lastProgressReason,
};
}
export function resetDiagnosticRunActivityForTest(): void {
activityByRef.clear();
activityByRunId.clear();
}
onInternalDiagnosticEvent((event) => {
switch (event.type) {
case "tool.execution.started":
recordToolStarted(event);
return;
case "tool.execution.completed":
case "tool.execution.error":
case "tool.execution.blocked":
recordToolEnded(event);
return;
case "model.call.started":
recordModelStarted(event);
return;
case "model.call.completed":
case "model.call.error":
recordModelEnded(event);
return;
case "run.progress":
recordRunProgress(event);
return;
case "run.completed":
recordRunCompleted(event);
return;
default:
return;
}
});

View File

@@ -328,6 +328,12 @@ function readStabilityEventRecord(
assignOptionalCodeString(sanitized, "level", record.level, `${label}.level`);
assignOptionalCodeString(sanitized, "detector", record.detector, `${label}.detector`);
assignOptionalCodeString(sanitized, "toolName", record.toolName, `${label}.toolName`);
assignOptionalCodeString(
sanitized,
"activeWorkKind",
record.activeWorkKind,
`${label}.activeWorkKind`,
);
assignOptionalCodeString(
sanitized,
"pairedToolName",

View File

@@ -27,6 +27,7 @@ export type DiagnosticStabilityEventRecord = {
detector?: string;
deliveryKind?: string;
toolName?: string;
activeWorkKind?: string;
pairedToolName?: string;
provider?: string;
model?: string;
@@ -231,11 +232,22 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
assignReasonCode(record, event.reason);
record.queueDepth = event.queueDepth;
break;
case "session.long_running":
case "session.stalled":
case "session.stuck":
record.outcome = event.state;
if (event.type === "session.stuck") {
record.level = "warning";
}
assignReasonCode(record, event.reason);
record.ageMs = event.ageMs;
record.queueDepth = event.queueDepth;
if (event.activeWorkKind) {
record.activeWorkKind = event.activeWorkKind;
}
if (event.activeToolName) {
record.toolName = event.activeToolName;
}
break;
case "queue.lane.enqueue":
record.source = event.lane;
@@ -249,6 +261,9 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
case "run.attempt":
record.count = event.attempt;
break;
case "run.progress":
assignReasonCode(record, event.reason);
break;
case "context.assembled":
record.channel = event.channel;
record.provider = event.provider;

View File

@@ -8,6 +8,7 @@ import {
setDiagnosticsEnabledForProcess,
type DiagnosticEventPayload,
} from "../infra/diagnostic-events.js";
import { markDiagnosticEmbeddedRunStarted } from "./diagnostic-run-activity.js";
import {
diagnosticSessionStates,
getDiagnosticSessionStateCountForTest,
@@ -149,7 +150,8 @@ describe("stuck session diagnostics threshold", () => {
const stuckEvents = events.filter((event) => event.type === "session.stuck");
expect(stuckEvents).toHaveLength(1);
expect(stuckEvents[0]).toMatchObject({
reason: "processing_without_queue",
classification: "stale_session_state",
reason: "stale_session_state",
queueDepth: 0,
});
expect(recoverStuckSession).toHaveBeenCalledWith({
@@ -160,6 +162,76 @@ describe("stuck session diagnostics threshold", () => {
});
});
it("reports active sessions as stalled instead of stuck when active work stops progressing", () => {
const events: DiagnosticEventPayload[] = [];
const recoverStuckSession = vi.fn();
const unsubscribe = onDiagnosticEvent((event) => {
events.push(event);
});
try {
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
},
},
{ recoverStuckSession },
);
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" });
vi.advanceTimersByTime(61_000);
} finally {
unsubscribe();
}
expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(0);
const stalledEvents = events.filter((event) => event.type === "session.stalled");
expect(stalledEvents).toHaveLength(1);
expect(stalledEvents[0]).toMatchObject({
classification: "stalled_agent_run",
reason: "active_work_without_progress",
activeWorkKind: "embedded_run",
});
expect(recoverStuckSession).not.toHaveBeenCalled();
});
it("reports long-running sessions separately when active work is making progress", () => {
const events: DiagnosticEventPayload[] = [];
const recoverStuckSession = vi.fn();
const unsubscribe = onDiagnosticEvent((event) => {
events.push(event);
});
try {
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
},
},
{ recoverStuckSession },
);
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
vi.advanceTimersByTime(45_000);
markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" });
vi.advanceTimersByTime(16_000);
} finally {
unsubscribe();
}
expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(0);
expect(events.filter((event) => event.type === "session.stalled")).toHaveLength(0);
const longRunningEvents = events.filter((event) => event.type === "session.long_running");
expect(longRunningEvents).toHaveLength(1);
expect(longRunningEvents[0]).toMatchObject({
classification: "long_running",
reason: "active_work",
activeWorkKind: "embedded_run",
});
expect(recoverStuckSession).not.toHaveBeenCalled();
});
it("starts and stops the stability recorder with the heartbeat lifecycle", () => {
startDiagnosticHeartbeat({
diagnostics: {

View File

@@ -5,9 +5,15 @@ import {
areDiagnosticsEnabledForProcess,
emitDiagnosticEvent,
isDiagnosticsEnabled,
type DiagnosticSessionActiveWorkKind,
type DiagnosticLivenessWarningReason,
} from "../infra/diagnostic-events.js";
import { emitDiagnosticMemorySample, resetDiagnosticMemoryForTest } from "./diagnostic-memory.js";
import {
getDiagnosticSessionActivitySnapshot,
resetDiagnosticRunActivityForTest,
type DiagnosticSessionActivitySnapshot,
} from "./diagnostic-run-activity.js";
import {
diagnosticLogger as diag,
getLastDiagnosticActivityAt,
@@ -150,17 +156,82 @@ function hasRecentDiagnosticActivity(now: number): boolean {
return lastActivityAt > 0 && now - lastActivityAt <= RECENT_DIAGNOSTIC_ACTIVITY_MS;
}
function resolveStuckSessionReason(state: {
state: SessionStateValue;
type SessionAttentionClassification =
| {
eventType: "session.long_running";
reason: string;
classification: "long_running";
activeWorkKind?: DiagnosticSessionActiveWorkKind;
recoveryEligible: false;
}
| {
eventType: "session.stalled";
reason: string;
classification: "blocked_tool_call" | "stalled_agent_run";
activeWorkKind?: DiagnosticSessionActiveWorkKind;
recoveryEligible: false;
}
| {
eventType: "session.stuck";
reason: string;
classification: "stale_session_state";
activeWorkKind?: undefined;
recoveryEligible: true;
};
function classifySessionAttention(params: {
queueDepth: number;
}): string {
if (state.queueDepth > 0) {
return "processing_with_queued_work";
activity: DiagnosticSessionActivitySnapshot;
staleMs: number;
}): SessionAttentionClassification {
if (params.activity.activeWorkKind) {
if (
params.activity.activeWorkKind === "tool_call" &&
(params.activity.activeToolAgeMs ?? 0) > params.staleMs &&
(params.activity.lastProgressAgeMs ?? 0) > params.staleMs
) {
return {
eventType: "session.stalled",
reason: "blocked_tool_call",
classification: "blocked_tool_call",
activeWorkKind: params.activity.activeWorkKind,
recoveryEligible: false,
};
}
if ((params.activity.lastProgressAgeMs ?? 0) > params.staleMs) {
return {
eventType: "session.stalled",
reason: "active_work_without_progress",
classification: "stalled_agent_run",
activeWorkKind: params.activity.activeWorkKind,
recoveryEligible: false,
};
}
return {
eventType: "session.long_running",
reason: params.queueDepth > 0 ? "queued_behind_active_work" : "active_work",
classification: "long_running",
activeWorkKind: params.activity.activeWorkKind,
recoveryEligible: false,
};
}
if (state.state === "processing") {
return "processing_without_queue";
if (params.queueDepth > 0) {
return {
eventType: "session.long_running",
reason: "queued_work_without_active_run",
classification: "long_running",
activeWorkKind: "queued_work",
recoveryEligible: false,
};
}
return "stale_session_state";
return {
eventType: "session.stuck",
reason: "stale_session_state",
classification: "stale_session_state",
recoveryEligible: true,
};
}
function roundDiagnosticMetric(value: number, digits = 3): number {
@@ -522,29 +593,95 @@ export function logSessionStateChange(
markActivity();
}
export function logSessionStuck(params: SessionRef & { state: SessionStateValue; ageMs: number }) {
function sessionAttentionFields(params: {
classification: SessionAttentionClassification;
activity: DiagnosticSessionActivitySnapshot;
}) {
return {
...(params.classification.activeWorkKind
? { activeWorkKind: params.classification.activeWorkKind }
: {}),
...(params.activity.lastProgressAgeMs !== undefined
? { lastProgressAgeMs: params.activity.lastProgressAgeMs }
: {}),
...(params.activity.lastProgressReason
? { lastProgressReason: params.activity.lastProgressReason }
: {}),
...(params.activity.activeToolName ? { activeToolName: params.activity.activeToolName } : {}),
...(params.activity.activeToolCallId
? { activeToolCallId: params.activity.activeToolCallId }
: {}),
...(params.activity.activeToolAgeMs !== undefined
? { activeToolAgeMs: params.activity.activeToolAgeMs }
: {}),
};
}
export function logSessionAttention(
params: SessionRef & {
state: SessionStateValue;
ageMs: number;
thresholdMs: number;
},
): SessionAttentionClassification | undefined {
if (!areDiagnosticsEnabledForProcess()) {
return;
return undefined;
}
const state = getDiagnosticSessionState(params);
const reason = resolveStuckSessionReason(state);
const activity = getDiagnosticSessionActivitySnapshot(
{ sessionId: state.sessionId, sessionKey: state.sessionKey },
Date.now(),
);
const classification = classifySessionAttention({
queueDepth: state.queueDepth,
activity,
staleMs: params.thresholdMs,
});
const label =
classification.eventType === "session.stuck"
? "stuck session"
: classification.eventType === "session.stalled"
? "stalled session"
: "long-running session";
diag.warn(
`stuck session: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
`${label}: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
state.sessionKey ?? "unknown"
} state=${params.state} age=${Math.round(params.ageMs / 1000)}s queueDepth=${
state.queueDepth
} reason=${reason} recovery=checking`,
} reason=${classification.reason} classification=${classification.classification}${
classification.activeWorkKind ? ` activeWorkKind=${classification.activeWorkKind}` : ""
} recovery=${classification.recoveryEligible ? "checking" : "none"}`,
);
emitDiagnosticEvent({
type: "session.stuck",
const baseEvent = {
sessionId: state.sessionId,
sessionKey: state.sessionKey,
state: params.state,
ageMs: params.ageMs,
queueDepth: state.queueDepth,
reason,
});
reason: classification.reason,
...sessionAttentionFields({ classification, activity }),
};
if (classification.eventType === "session.long_running") {
emitDiagnosticEvent({
type: "session.long_running",
...baseEvent,
classification: "long_running",
});
} else if (classification.eventType === "session.stalled") {
emitDiagnosticEvent({
type: "session.stalled",
...baseEvent,
classification: classification.classification,
});
} else {
emitDiagnosticEvent({
type: "session.stuck",
...baseEvent,
classification: "stale_session_state",
});
}
markActivity();
return classification;
}
export function logRunAttempt(params: SessionRef & { runId: string; attempt: number }) {
@@ -695,18 +832,21 @@ export function startDiagnosticHeartbeat(
for (const [, state] of diagnosticSessionStates) {
const ageMs = now - state.lastActivity;
if (state.state === "processing" && ageMs > stuckSessionWarnMs) {
logSessionStuck({
const classification = logSessionAttention({
sessionId: state.sessionId,
sessionKey: state.sessionKey,
state: state.state,
ageMs,
thresholdMs: stuckSessionWarnMs,
});
void (opts?.recoverStuckSession ?? recoverStuckSession)({
sessionId: state.sessionId,
sessionKey: state.sessionKey,
ageMs,
queueDepth: state.queueDepth,
});
if (classification?.recoveryEligible) {
void (opts?.recoverStuckSession ?? recoverStuckSession)({
sessionId: state.sessionId,
sessionKey: state.sessionKey,
ageMs,
queueDepth: state.queueDepth,
});
}
}
}
}, 30_000);
@@ -730,6 +870,7 @@ export function getDiagnosticSessionStateCountForTest(): number {
export function resetDiagnosticStateForTest(): void {
resetDiagnosticSessionStateForTest();
resetDiagnosticActivityForTest();
resetDiagnosticRunActivityForTest();
webhookStats.received = 0;
webhookStats.processed = 0;
webhookStats.errors = 0;