fix: recover stuck diagnostic sessions safely

This commit is contained in:
Peter Steinberger
2026-05-05 04:01:22 +01:00
parent c739088d62
commit 761e668acf
20 changed files with 825 additions and 79 deletions

View File

@@ -65,6 +65,7 @@ Docs: https://docs.openclaw.ai
- Slack/subagents: keep resumed parent `message.send` calls in the originating Slack thread when ambient session thread context is present, and suppress successful silent child completion rows from follow-up findings. Thanks @bek91.
- Infra/Windows: skip the POSIX `/tmp/openclaw` preferred path on Windows in `resolvePreferredOpenClawTmpDir` so log files, TTS temp files, and other writes land in `%TEMP%\openclaw-<uid>` instead of `C:\tmp\openclaw`. Fixes #60713. Thanks @juan-flores077.
- Gateway/diagnostics: make stuck-session recovery outcome-driven and generation-guarded, add `diagnostics.stuckSessionAbortMs`, and emit structured recovery requested/completed events so stale or skipped recovery no longer looks like a successful abort.
- Media/Windows: open saved attachment temp files read/write before fsync so Windows WebChat and `chat.send` media offloads no longer fail with EPERM during durability flush. (#76593) Thanks @qq230849622-a11y.
- Agents/tools: honor narrow runtime tool allowlists when constructing embedded-runner tool families and bundled MCP/LSP runtimes, so cron/subagent runs that request tools such as `update_plan`, `browser`, `x_search`, channel login tools, or `group:plugins` no longer start with missing tools or unrelated bootstrap work. (#77519, #77532)
- Codex plugin: mirror the experimental upstream app-server protocol and format generated TypeScript before drift checks, keeping OpenClaw's `experimentalApi` bridge compatible with latest Codex while preserving formatter gates.

View File

@@ -1,4 +1,4 @@
b4491e9b8ea5606cad18c1acf06f03d35301ebec1974d201ec9ee7582d2f6001 config-baseline.json
9c0c9369d49c2001f91ec030e3852ccdc2ac9084229f335804aa9141c13b4795 config-baseline.core.json
657060e80f3dc4b7d992e8625d2a8b0ff9b1b408960148d3f5f6a381d602359a config-baseline.json
92cbb12ca382f7424e7bd52df21798b10a57621f5c266909fa74e23f6cb973d7 config-baseline.core.json
cd7c0c7fb1435bc7e59099e9ac334462d5ad444016e9ab4512aae63a238f78dc config-baseline.channel.json
9832b30a696930a3da7efccf38073137571e1b66cae84e54d747b733fdafcc54 config-baseline.plugin.json

View File

@@ -165,7 +165,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism
- `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides.
- Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer.
- Cron runtime: isolated agent-turn `timeoutSeconds` is owned by cron. The scheduler starts that timer when execution begins, aborts the underlying run at the configured deadline, then runs bounded cleanup before recording the timeout so a stale child session cannot keep the lane stuck.
- Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions that have no observed reply, tool, status, block, or ACP progress. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work. Stale session bookkeeping releases the affected session lane immediately; stalled embedded runs are abort-drained only after an extended no-progress window (at least 10 minutes and 5x the warning threshold) so queued work can resume without cutting off merely slow runs. Repeated `session.stuck` diagnostics back off while the session remains unchanged.
- Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions that have no observed reply, tool, status, block, or ACP progress. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work. Stale session bookkeeping releases the affected session lane immediately; stalled embedded runs are abort-drained only after `diagnostics.stuckSessionAbortMs` (default: at least 10 minutes and 5x the warning threshold) so queued work can resume without cutting off merely slow runs. Recovery emits structured requested/completed outcomes, and diagnostic state is marked idle only if the same processing generation is still current. Repeated `session.stuck` diagnostics back off while the session remains unchanged.
- Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers.<id>.timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout.
- Provider HTTP request timeout: `models.providers.<id>.timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout.

View File

@@ -920,6 +920,7 @@ Notes:
enabled: true,
flags: ["telegram.*"],
stuckSessionWarnMs: 30000,
stuckSessionAbortMs: 600000,
otel: {
enabled: false,
@@ -959,6 +960,7 @@ Notes:
- `enabled`: master toggle for instrumentation output (default: `true`).
- `flags`: array of flag strings enabling targeted log output (supports wildcards like `"telegram.*"` or `"*"`).
- `stuckSessionWarnMs`: no-progress age threshold in ms for classifying long-running processing sessions as `session.long_running`, `session.stalled`, or `session.stuck`. Reply, tool, status, block, and ACP progress reset the timer; repeated `session.stuck` diagnostics back off while unchanged.
- `stuckSessionAbortMs`: no-progress age threshold in ms before eligible stalled active work may be abort-drained for recovery. When unset, OpenClaw uses the safer extended embedded-run window of at least 10 minutes and 5x `stuckSessionWarnMs`.
- `otel.enabled`: enables the OpenTelemetry export pipeline (default: `false`). For the full configuration, signal catalog, and privacy model, see [OpenTelemetry export](/gateway/opentelemetry).
- `otel.endpoint`: collector URL for OTel export.
- `otel.tracesEndpoint` / `otel.metricsEndpoint` / `otel.logsEndpoint`: optional signal-specific OTLP endpoints. When set, they override `otel.endpoint` for that signal only.

View File

@@ -216,11 +216,18 @@ OpenClaw classifies sessions by the work it can still observe:
still making progress.
- `session.stalled`: active work exists, but the active run has not reported
recent progress. Stalled embedded runs stay observe-only at first, then
abort-drain after at least 10 minutes and 5x `diagnostics.stuckSessionWarnMs`
with no progress so queued turns behind the lane can resume.
abort-drain after `diagnostics.stuckSessionAbortMs` with no progress so queued
turns behind the lane can resume. When unset, the abort threshold defaults to
the safer extended window of at least 10 minutes and 5x
`diagnostics.stuckSessionWarnMs`.
- `session.stuck`: stale session bookkeeping with no active work. This releases
the affected session lane immediately.
Recovery emits structured `session.recovery.requested` and
`session.recovery.completed` events. Diagnostic session state is marked idle
only after a mutating recovery outcome (`aborted` or `released`) and only if the
same processing generation is still current.
Only `session.stuck` emits the `openclaw.session.stuck` counter, the
`openclaw.session.stuck_age_ms` histogram, and the `openclaw.session.stuck`
span. Repeated `session.stuck` diagnostics back off while the session remains

View File

@@ -151,6 +151,14 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
description:
"No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.",
},
stuckSessionAbortMs: {
type: "integer",
exclusiveMinimum: 0,
maximum: 9007199254740991,
title: "Session Abort Threshold (ms)",
description:
"No-progress age threshold in milliseconds before eligible stalled active work may be abort-drained for recovery. Defaults to the safer extended embedded-run recovery window.",
},
otel: {
type: "object",
properties: {
@@ -24666,6 +24674,11 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
help: "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.",
tags: ["observability", "storage"],
},
"diagnostics.stuckSessionAbortMs": {
label: "Session Abort Threshold (ms)",
help: "No-progress age threshold in milliseconds before eligible stalled active work may be abort-drained for recovery. Defaults to the safer extended embedded-run recovery window.",
tags: ["observability", "storage"],
},
"diagnostics.otel.enabled": {
label: "OpenTelemetry Enabled",
help: "Enables OpenTelemetry export pipeline for traces, metrics, and logs based on configured endpoint/protocol settings. Keep disabled unless your collector endpoint and auth are fully configured.",

View File

@@ -588,6 +588,8 @@ export const FIELD_HELP: Record<string, string> = {
"Master toggle for diagnostics instrumentation output in logs and telemetry wiring paths. Defaults to enabled; set false only in tightly constrained environments.",
"diagnostics.stuckSessionWarnMs":
"No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.",
"diagnostics.stuckSessionAbortMs":
"No-progress age threshold in milliseconds before eligible stalled active work may be abort-drained for recovery. Defaults to the safer extended embedded-run recovery window.",
"diagnostics.otel.enabled":
"Enables OpenTelemetry export pipeline for traces, metrics, and logs based on configured endpoint/protocol settings. Keep disabled unless your collector endpoint and auth are fully configured.",
"diagnostics.otel.endpoint":

View File

@@ -41,6 +41,7 @@ export const FIELD_LABELS: Record<string, string> = {
"diagnostics.enabled": "Diagnostics Enabled",
"diagnostics.flags": "Diagnostics Flags",
"diagnostics.stuckSessionWarnMs": "Session Liveness Threshold (ms)",
"diagnostics.stuckSessionAbortMs": "Session Abort Threshold (ms)",
"diagnostics.otel.enabled": "OpenTelemetry Enabled",
"diagnostics.otel.endpoint": "OpenTelemetry Endpoint",
"diagnostics.otel.tracesEndpoint": "OpenTelemetry Traces Endpoint",

View File

@@ -309,6 +309,8 @@ export type DiagnosticsConfig = {
flags?: string[];
/** Threshold in ms before a processing session with no observed progress logs diagnostics. */
stuckSessionWarnMs?: number;
/** Threshold in ms before eligible stalled active work may be aborted for recovery. */
stuckSessionAbortMs?: number;
otel?: DiagnosticsOtelConfig;
cacheTrace?: DiagnosticsCacheTraceConfig;
};

View File

@@ -342,6 +342,7 @@ export const OpenClawSchema = z
enabled: z.boolean().optional(),
flags: z.array(z.string()).optional(),
stuckSessionWarnMs: z.number().int().positive().optional(),
stuckSessionAbortMs: z.number().int().positive().optional(),
otel: z
.object({
enabled: z.boolean().optional(),

View File

@@ -63,8 +63,9 @@ const BASE_RELOAD_RULES: ReloadRule[] = [
kind: "hot",
actions: ["restart-health-monitor"],
},
// Stuck-session warning threshold is read by the diagnostics heartbeat loop.
// Stuck-session thresholds are read by the diagnostics heartbeat loop.
{ prefix: "diagnostics.stuckSessionWarnMs", kind: "none" },
{ prefix: "diagnostics.stuckSessionAbortMs", kind: "none" },
{ prefix: "hooks.gmail", kind: "hot", actions: ["restart-gmail-watcher"] },
{ prefix: "hooks", kind: "hot", actions: ["reload-hooks"] },
{

View File

@@ -388,10 +388,14 @@ describe("buildGatewayReloadPlan", () => {
expect(plan.noopPaths).toContain("secrets.providers.default.path");
});
it("treats diagnostics.stuckSessionWarnMs as no-op for gateway restart planning", () => {
const plan = buildGatewayReloadPlan(["diagnostics.stuckSessionWarnMs"]);
it("treats diagnostics stuck-session thresholds as no-op for gateway restart planning", () => {
const plan = buildGatewayReloadPlan([
"diagnostics.stuckSessionWarnMs",
"diagnostics.stuckSessionAbortMs",
]);
expect(plan.restartGateway).toBe(false);
expect(plan.noopPaths).toContain("diagnostics.stuckSessionWarnMs");
expect(plan.noopPaths).toContain("diagnostics.stuckSessionAbortMs");
});
it("restarts for gateway.auth.token changes", () => {

View File

@@ -164,6 +164,38 @@ export type DiagnosticSessionStuckEvent = DiagnosticSessionAttentionBaseEvent &
classification: "stale_session_state";
};
export type DiagnosticSessionRecoveryStatus =
| "aborted"
| "released"
| "skipped"
| "noop"
| "failed";
type DiagnosticSessionRecoveryBaseEvent = DiagnosticBaseEvent & {
sessionKey?: string;
sessionId?: string;
state: DiagnosticSessionState;
stateGeneration?: number;
ageMs: number;
queueDepth?: number;
reason?: string;
activeWorkKind?: DiagnosticSessionActiveWorkKind;
allowActiveAbort?: boolean;
};
export type DiagnosticSessionRecoveryRequestedEvent = DiagnosticSessionRecoveryBaseEvent & {
type: "session.recovery.requested";
};
export type DiagnosticSessionRecoveryCompletedEvent = DiagnosticSessionRecoveryBaseEvent & {
type: "session.recovery.completed";
status: DiagnosticSessionRecoveryStatus;
action: string;
outcomeReason?: string;
released?: number;
stale?: boolean;
};
export type DiagnosticLaneEnqueueEvent = DiagnosticBaseEvent & {
type: "queue.lane.enqueue";
lane: string;
@@ -520,6 +552,8 @@ export type DiagnosticEventPayload =
| DiagnosticSessionLongRunningEvent
| DiagnosticSessionStalledEvent
| DiagnosticSessionStuckEvent
| DiagnosticSessionRecoveryRequestedEvent
| DiagnosticSessionRecoveryCompletedEvent
| DiagnosticLaneEnqueueEvent
| DiagnosticLaneDequeueEvent
| DiagnosticRunAttemptEvent

View File

@@ -0,0 +1,200 @@
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
import { markDiagnosticActivity as markActivity } from "./diagnostic-runtime.js";
import type { SessionAttentionClassification } from "./diagnostic-session-attention.js";
import {
recoveryOutcomeMutatesSessionState,
recoveryOutcomeReleasedCount,
type StuckSessionRecoveryOutcome,
type StuckSessionRecoveryRequest,
} from "./diagnostic-session-recovery.js";
import {
getDiagnosticSessionState,
isDiagnosticSessionStateCurrent,
} from "./diagnostic-session-state.js";
export type RecoverStuckSession = (
params: StuckSessionRecoveryRequest,
) => void | StuckSessionRecoveryOutcome | Promise<void | StuckSessionRecoveryOutcome>;
const recoveryRequestsInFlight = new Set<string>();
function emitSessionRecoveryRequested(params: {
request: StuckSessionRecoveryRequest;
classification: SessionAttentionClassification;
}): void {
emitDiagnosticEvent({
type: "session.recovery.requested",
sessionId: params.request.sessionId,
sessionKey: params.request.sessionKey,
state: "processing",
stateGeneration: params.request.stateGeneration,
ageMs: params.request.ageMs,
queueDepth: params.request.queueDepth,
reason: params.classification.reason,
activeWorkKind: params.classification.activeWorkKind,
allowActiveAbort: params.request.allowActiveAbort,
});
}
function emitSessionRecoveryCompleted(params: {
request: StuckSessionRecoveryRequest;
outcome: StuckSessionRecoveryOutcome;
stale?: boolean;
}): void {
emitDiagnosticEvent({
type: "session.recovery.completed",
sessionId: params.request.sessionId,
sessionKey: params.request.sessionKey,
state: "processing",
stateGeneration: params.request.stateGeneration,
ageMs: params.request.ageMs,
queueDepth: params.request.queueDepth,
activeWorkKind: params.outcome.activeWorkKind,
status: params.outcome.status,
action: params.outcome.action,
outcomeReason: "reason" in params.outcome ? params.outcome.reason : undefined,
released: recoveryOutcomeReleasedCount(params.outcome) || undefined,
stale: params.stale,
});
}
function recoveryRequestKey(request: StuckSessionRecoveryRequest): string | undefined {
const ref = request.sessionKey?.trim() || request.sessionId?.trim();
if (!ref) {
return undefined;
}
return `${ref}:${request.stateGeneration ?? "unknown"}`;
}
function isRecoveryPromiseLike(
value: void | StuckSessionRecoveryOutcome | Promise<void | StuckSessionRecoveryOutcome>,
): value is Promise<void | StuckSessionRecoveryOutcome> {
return (
typeof (value as Promise<void | StuckSessionRecoveryOutcome> | undefined)?.then === "function"
);
}
function applyRecoveryOutcomeToDiagnosticState(params: {
request: StuckSessionRecoveryRequest;
outcome: StuckSessionRecoveryOutcome | undefined;
}): void {
if (!params.outcome) {
return;
}
if (!recoveryOutcomeMutatesSessionState(params.outcome)) {
emitSessionRecoveryCompleted({ request: params.request, outcome: params.outcome });
return;
}
if (
!isDiagnosticSessionStateCurrent({
sessionId: params.request.sessionId,
sessionKey: params.request.sessionKey,
generation: params.request.stateGeneration,
state: "processing",
})
) {
emitSessionRecoveryCompleted({
request: params.request,
outcome: params.outcome,
stale: true,
});
return;
}
const state = getDiagnosticSessionState(params.request);
const prevState = state.state;
state.state = "idle";
state.lastActivity = Date.now();
state.generation = (state.generation ?? 0) + 1;
state.lastStuckWarnAgeMs = undefined;
state.lastLongRunningWarnAgeMs = undefined;
const released = recoveryOutcomeReleasedCount(params.outcome);
state.queueDepth = released > 0 ? 0 : Math.max(0, state.queueDepth - 1);
emitDiagnosticEvent({
type: "session.state",
sessionId: state.sessionId,
sessionKey: state.sessionKey,
prevState,
state: "idle",
reason: `stuck_recovery:${params.outcome.status}`,
queueDepth: state.queueDepth,
});
emitSessionRecoveryCompleted({ request: params.request, outcome: params.outcome });
markActivity();
}
export function requestStuckSessionRecovery(params: {
recover: RecoverStuckSession;
request: StuckSessionRecoveryRequest;
classification: SessionAttentionClassification;
}): void {
const inFlightKey = recoveryRequestKey(params.request);
if (inFlightKey && recoveryRequestsInFlight.has(inFlightKey)) {
emitSessionRecoveryCompleted({
request: params.request,
outcome: {
status: "skipped",
action: "observe_only",
reason: "already_in_flight",
sessionId: params.request.sessionId,
sessionKey: params.request.sessionKey,
activeWorkKind: params.classification.activeWorkKind,
},
});
return;
}
if (inFlightKey) {
recoveryRequestsInFlight.add(inFlightKey);
}
emitSessionRecoveryRequested({
request: params.request,
classification: params.classification,
});
const clearInFlight = () => {
if (inFlightKey) {
recoveryRequestsInFlight.delete(inFlightKey);
}
};
const failRecovery = (err: unknown) => {
applyRecoveryOutcomeToDiagnosticState({
request: params.request,
outcome: {
status: "failed",
action: "none",
reason: "exception",
sessionId: params.request.sessionId,
sessionKey: params.request.sessionKey,
error: String(err),
},
});
};
try {
const result = params.recover(params.request);
if (isRecoveryPromiseLike(result)) {
void result
.then((outcome) => {
applyRecoveryOutcomeToDiagnosticState({
request: params.request,
outcome: outcome ?? undefined,
});
})
.catch(failRecovery)
.finally(clearInFlight);
return;
}
applyRecoveryOutcomeToDiagnosticState({
request: params.request,
outcome: result ?? undefined,
});
clearInFlight();
} catch (err) {
try {
failRecovery(err);
} finally {
clearInFlight();
}
}
}
export function resetDiagnosticSessionRecoveryCoordinatorForTest(): void {
recoveryRequestsInFlight.clear();
}

View File

@@ -0,0 +1,122 @@
import type { DiagnosticSessionActiveWorkKind } from "../infra/diagnostic-events.js";
export type DiagnosticSessionRecoveryStatus =
| "aborted"
| "released"
| "skipped"
| "noop"
| "failed";
export type DiagnosticSessionRecoverySkipReason =
| "active_embedded_run"
| "active_reply_work"
| "active_lane_task"
| "already_in_flight"
| "missing_session_ref"
| "stale_session_state";
export type DiagnosticSessionRecoveryNoopReason = "no_active_work";
export type StuckSessionRecoveryRequest = {
sessionId?: string;
sessionKey?: string;
ageMs: number;
queueDepth?: number;
allowActiveAbort?: boolean;
stateGeneration?: number;
};
type DiagnosticSessionRecoveryBaseOutcome = {
sessionId?: string;
sessionKey?: string;
activeSessionId?: string;
lane?: string;
activeWorkKind?: DiagnosticSessionActiveWorkKind;
};
export type StuckSessionRecoveryOutcome =
| (DiagnosticSessionRecoveryBaseOutcome & {
status: "aborted";
action: "abort_embedded_run";
aborted: boolean;
drained: boolean;
forceCleared: boolean;
released: number;
})
| (DiagnosticSessionRecoveryBaseOutcome & {
status: "released";
action: "release_lane";
released: number;
})
| (DiagnosticSessionRecoveryBaseOutcome & {
status: "skipped";
action: "observe_only" | "keep_lane";
reason: DiagnosticSessionRecoverySkipReason;
activeCount?: number;
queuedCount?: number;
})
| (DiagnosticSessionRecoveryBaseOutcome & {
status: "noop";
action: "none";
reason: DiagnosticSessionRecoveryNoopReason;
})
| (DiagnosticSessionRecoveryBaseOutcome & {
status: "failed";
action: "none";
reason: "exception";
error: string;
});
export function recoveryOutcomeMutatesSessionState(
outcome: StuckSessionRecoveryOutcome | undefined,
): boolean {
if (!outcome) {
return false;
}
return outcome.status === "aborted" || outcome.status === "released";
}
export function recoveryOutcomeReleasedCount(outcome: StuckSessionRecoveryOutcome): number {
return "released" in outcome ? outcome.released : 0;
}
export function formatRecoveryOutcome(outcome: StuckSessionRecoveryOutcome): string {
const fields = [
`status=${outcome.status}`,
`action=${outcome.action}`,
`sessionId=${outcome.sessionId ?? outcome.activeSessionId ?? "unknown"}`,
`sessionKey=${outcome.sessionKey ?? "unknown"}`,
];
if (outcome.activeSessionId) {
fields.push(`activeSessionId=${outcome.activeSessionId}`);
}
if (outcome.activeWorkKind) {
fields.push(`activeWorkKind=${outcome.activeWorkKind}`);
}
if (outcome.lane) {
fields.push(`lane=${outcome.lane}`);
}
if ("reason" in outcome) {
fields.push(`reason=${outcome.reason}`);
}
if ("aborted" in outcome) {
fields.push(
`aborted=${outcome.aborted}`,
`drained=${outcome.drained}`,
`forceCleared=${outcome.forceCleared}`,
);
}
if ("released" in outcome) {
fields.push(`released=${outcome.released}`);
}
if ("activeCount" in outcome && outcome.activeCount !== undefined) {
fields.push(`laneActive=${outcome.activeCount}`);
}
if ("queuedCount" in outcome && outcome.queuedCount !== undefined) {
fields.push(`laneQueued=${outcome.queuedCount}`);
}
if ("error" in outcome) {
fields.push(`error=${outcome.error}`);
}
return fields.join(" ");
}

View File

@@ -4,6 +4,7 @@ export type SessionState = {
sessionId?: string;
sessionKey?: string;
lastActivity: number;
generation?: number;
lastStuckWarnAgeMs?: number;
lastLongRunningWarnAgeMs?: number;
state: SessionStateValue;
@@ -100,6 +101,7 @@ function mergeSessionState(target: SessionState, source: SessionState): void {
if (sourceIsNewer || sourceIsSameAgeAndMoreActive) {
target.state = source.state;
}
target.generation = Math.max(target.generation ?? 0, source.generation ?? 0);
target.lastActivity = Math.max(target.lastActivity, source.lastActivity);
target.queueDepth += source.queueDepth;
target.lastStuckWarnAgeMs =
@@ -156,6 +158,7 @@ export function getDiagnosticSessionState(ref: SessionRef): SessionState {
sessionId: ref.sessionId,
sessionKey: ref.sessionKey,
lastActivity: Date.now(),
generation: 0,
state: "idle",
queueDepth: 0,
};
@@ -164,6 +167,14 @@ export function getDiagnosticSessionState(ref: SessionRef): SessionState {
return created;
}
export function peekDiagnosticSessionState(ref: SessionRef): SessionState | undefined {
const key = resolveSessionKey(ref);
return (
diagnosticSessionStates.get(key) ??
(ref.sessionId ? findStateEntryBySessionId(ref.sessionId)?.[1] : undefined)
);
}
export function getDiagnosticSessionStateCountForTest(): number {
return diagnosticSessionStates.size;
}
@@ -172,3 +183,22 @@ export function resetDiagnosticSessionStateForTest(): void {
diagnosticSessionStates.clear();
lastSessionPruneAt = 0;
}
export function isDiagnosticSessionStateCurrent(params: {
sessionId?: string;
sessionKey?: string;
generation?: number;
state?: SessionStateValue;
}): boolean {
if (params.generation === undefined) {
return true;
}
const state = peekDiagnosticSessionState(params);
if (!state) {
return false;
}
return (
(state.generation ?? 0) === params.generation &&
(params.state === undefined || state.state === params.state)
);
}

View File

@@ -250,6 +250,27 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
record.toolName = event.activeToolName;
}
break;
case "session.recovery.requested":
record.outcome = event.state;
record.action = event.allowActiveAbort ? "abort" : "recover";
record.ageMs = event.ageMs;
record.queueDepth = event.queueDepth;
if (event.activeWorkKind) {
record.activeWorkKind = event.activeWorkKind;
}
assignReasonCode(record, event.reason);
break;
case "session.recovery.completed":
record.outcome = event.status;
record.action = event.action;
record.ageMs = event.ageMs;
record.queueDepth = event.queueDepth;
record.count = event.released;
if (event.activeWorkKind) {
record.activeWorkKind = event.activeWorkKind;
}
assignReasonCode(record, event.outcomeReason ?? event.reason);
break;
case "queue.lane.enqueue":
record.source = event.lane;
record.queueSize = event.queueSize;

View File

@@ -12,17 +12,17 @@ import {
formatStoppedCronSessionDiagnosticFields,
resolveCronSessionDiagnosticContext,
} from "./diagnostic-session-context.js";
import {
formatRecoveryOutcome,
type StuckSessionRecoveryOutcome,
type StuckSessionRecoveryRequest,
} from "./diagnostic-session-recovery.js";
import { isDiagnosticSessionStateCurrent } from "./diagnostic-session-state.js";
const STUCK_SESSION_ABORT_SETTLE_MS = 15_000;
const recoveriesInFlight = new Set<string>();
export type StuckSessionRecoveryParams = {
sessionId?: string;
sessionKey?: string;
ageMs: number;
queueDepth?: number;
allowActiveAbort?: boolean;
};
export type StuckSessionRecoveryParams = StuckSessionRecoveryRequest;
function recoveryKey(params: StuckSessionRecoveryParams): string | undefined {
return params.sessionKey?.trim() || params.sessionId?.trim() || undefined;
@@ -55,14 +55,36 @@ function formatRecoveryContext(
export async function recoverStuckDiagnosticSession(
params: StuckSessionRecoveryParams,
): Promise<void> {
): Promise<StuckSessionRecoveryOutcome> {
const key = recoveryKey(params);
if (!key || recoveriesInFlight.has(key)) {
return;
return {
status: "skipped",
action: "observe_only",
reason: key ? "already_in_flight" : "missing_session_ref",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
};
}
recoveriesInFlight.add(key);
try {
if (
!isDiagnosticSessionStateCurrent({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
generation: params.stateGeneration,
state: "processing",
})
) {
return {
status: "skipped",
action: "observe_only",
reason: "stale_session_state",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
};
}
const fallbackActiveSessionId =
params.sessionId && isEmbeddedPiRunHandleActive(params.sessionId)
? params.sessionId
@@ -77,16 +99,24 @@ export async function recoverStuckDiagnosticSession(
const sessionLane = laneKey ? resolveEmbeddedSessionLane(laneKey) : null;
let aborted = false;
let drained = true;
let forceCleared = false;
if (activeSessionId) {
if (params.allowActiveAbort !== true) {
const outcome: StuckSessionRecoveryOutcome = {
status: "skipped",
action: "observe_only",
reason: "active_embedded_run",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
activeSessionId,
activeWorkKind: "embedded_run",
};
diag.warn(
`stuck session recovery skipped: reason=active_embedded_run action=observe_only ${formatRecoveryContext(
params,
{ activeSessionId },
)}`,
`stuck session recovery skipped: ${formatRecoveryContext(params, { activeSessionId })}`,
);
return;
diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`);
return outcome;
}
const result = await abortAndDrainEmbeddedPiRun({
sessionId: activeSessionId,
@@ -97,32 +127,38 @@ export async function recoverStuckDiagnosticSession(
});
aborted = result.aborted;
drained = result.drained;
forceCleared = result.forceCleared;
}
if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) {
diag.warn(
`stuck session recovery skipped: reason=active_reply_work action=keep_lane ${formatRecoveryContext(
params,
{ activeSessionId: activeWorkSessionId },
)}`,
);
return;
const outcome: StuckSessionRecoveryOutcome = {
status: "skipped",
action: "keep_lane",
reason: "active_reply_work",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
activeSessionId: activeWorkSessionId,
activeWorkKind: "embedded_run",
};
diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`);
return outcome;
}
if (!activeSessionId && sessionLane) {
const laneSnapshot = getCommandLaneSnapshot(sessionLane);
if (laneSnapshot.activeCount > 0) {
diag.warn(
`stuck session recovery skipped: reason=active_lane_task action=keep_lane ${formatRecoveryContext(
params,
{
lane: sessionLane,
activeCount: laneSnapshot.activeCount,
queuedCount: laneSnapshot.queuedCount,
},
)}`,
);
return;
const outcome: StuckSessionRecoveryOutcome = {
status: "skipped",
action: "keep_lane",
reason: "active_lane_task",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
lane: sessionLane,
activeCount: laneSnapshot.activeCount,
queuedCount: laneSnapshot.queuedCount,
};
diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`);
return outcome;
}
}
@@ -141,22 +177,56 @@ export async function recoverStuckDiagnosticSession(
stoppedFields ? ` ${stoppedFields}` : ""
}`,
);
} else {
diag.warn(
`stuck session recovery no-op: reason=no_active_work action=none ${formatRecoveryContext(
params,
{
const outcome: StuckSessionRecoveryOutcome = aborted
? {
status: "aborted",
action: "abort_embedded_run",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
activeSessionId,
activeWorkKind: "embedded_run",
aborted,
drained,
forceCleared,
released,
lane: sessionLane ?? undefined,
},
)}`,
);
}
: {
status: "released",
action: "release_lane",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
released,
lane: sessionLane ?? undefined,
};
diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`);
return outcome;
}
const outcome: StuckSessionRecoveryOutcome = {
status: "noop",
action: "none",
reason: "no_active_work",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
lane: sessionLane ?? undefined,
};
diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`);
return outcome;
} catch (err) {
const outcome: StuckSessionRecoveryOutcome = {
status: "failed",
action: "none",
reason: "exception",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
error: String(err),
};
diag.warn(
`stuck session recovery failed: sessionId=${params.sessionId ?? "unknown"} sessionKey=${
params.sessionKey ?? "unknown"
} err=${String(err)}`,
);
return outcome;
} finally {
recoveriesInFlight.delete(key);
}

View File

@@ -33,6 +33,7 @@ import {
diagnosticLogger,
markDiagnosticSessionProgress,
resetDiagnosticStateForTest,
resolveStuckSessionAbortMs,
resolveStuckSessionWarnMs,
startDiagnosticHeartbeat,
} from "./diagnostic.js";
@@ -78,6 +79,7 @@ describe("diagnostic session state pruning", () => {
diagnosticSessionStates.set(`session-${i}`, {
sessionId: `session-${i}`,
lastActivity: now + i,
generation: 0,
state: "idle",
queueDepth: 1,
});
@@ -232,6 +234,7 @@ describe("stuck session diagnostics threshold", () => {
sessionKey: "main",
ageMs: expect.any(Number),
queueDepth: 0,
stateGeneration: expect.any(Number),
});
});
@@ -271,6 +274,7 @@ describe("stuck session diagnostics threshold", () => {
sessionKey: "main",
ageMs: expect.any(Number),
queueDepth: 1,
stateGeneration: expect.any(Number),
});
});
@@ -443,9 +447,191 @@ describe("stuck session diagnostics threshold", () => {
ageMs: expect.any(Number),
queueDepth: 0,
allowActiveAbort: true,
stateGeneration: expect.any(Number),
});
});
it("uses diagnostics.stuckSessionAbortMs for stalled active-work recovery", () => {
const recoverStuckSession = vi.fn();
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
stuckSessionAbortMs: 60_000,
},
},
{ recoverStuckSession },
);
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" });
vi.advanceTimersByTime(61_000);
expect(recoverStuckSession).toHaveBeenCalledWith({
sessionId: "s1",
sessionKey: "main",
ageMs: expect.any(Number),
queueDepth: 0,
allowActiveAbort: true,
stateGeneration: expect.any(Number),
});
});
it("marks diagnostic session state idle only after a mutating recovery outcome", async () => {
const events: DiagnosticEventPayload[] = [];
const recoverStuckSession = vi.fn().mockResolvedValue({
status: "released",
action: "release_lane",
released: 1,
sessionId: "s1",
sessionKey: "main",
});
const unsubscribe = onDiagnosticEvent((event) => {
events.push(event);
});
try {
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
},
},
{ recoverStuckSession },
);
logMessageQueued({ sessionId: "s1", sessionKey: "main", source: "test" });
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
vi.advanceTimersByTime(61_000);
await Promise.resolve();
} finally {
unsubscribe();
}
const state = getDiagnosticSessionState({ sessionId: "s1", sessionKey: "main" });
expect(state.state).toBe("idle");
expect(state.queueDepth).toBe(0);
expect(events).toContainEqual(
expect.objectContaining({
type: "session.recovery.completed",
status: "released",
action: "release_lane",
}),
);
});
it("does not mark a newer processing generation idle after a late recovery outcome", async () => {
const events: DiagnosticEventPayload[] = [];
const recoverStuckSession = vi.fn().mockImplementation(async () => {
markDiagnosticSessionProgress({ sessionId: "s1", sessionKey: "main" });
return {
status: "released",
action: "release_lane",
released: 1,
sessionId: "s1",
sessionKey: "main",
};
});
const unsubscribe = onDiagnosticEvent((event) => {
events.push(event);
});
try {
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
},
},
{ recoverStuckSession },
);
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
vi.advanceTimersByTime(61_000);
await Promise.resolve();
await Promise.resolve();
} finally {
unsubscribe();
}
expect(getDiagnosticSessionState({ sessionId: "s1", sessionKey: "main" }).state).toBe(
"processing",
);
expect(events).toContainEqual(
expect.objectContaining({
type: "session.recovery.completed",
status: "released",
stale: true,
}),
);
});
it("does not start duplicate recovery for the same processing generation", async () => {
const events: DiagnosticEventPayload[] = [];
let resolveRecovery:
| ((outcome: {
status: "noop";
action: "none";
reason: "no_active_work";
sessionId: string;
sessionKey: string;
}) => void)
| undefined;
const recoverStuckSession = vi.fn(
() =>
new Promise<{
status: "noop";
action: "none";
reason: "no_active_work";
sessionId: string;
sessionKey: string;
}>((resolve) => {
resolveRecovery = resolve;
}),
);
const unsubscribe = onDiagnosticEvent((event) => {
events.push(event);
});
try {
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
},
},
{ recoverStuckSession },
);
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
vi.advanceTimersByTime(61_000);
expect(recoverStuckSession).toHaveBeenCalledTimes(1);
vi.advanceTimersByTime(60_000);
expect(recoverStuckSession).toHaveBeenCalledTimes(1);
expect(events).toContainEqual(
expect.objectContaining({
type: "session.recovery.completed",
status: "skipped",
outcomeReason: "already_in_flight",
}),
);
resolveRecovery?.({
status: "noop",
action: "none",
reason: "no_active_work",
sessionId: "s1",
sessionKey: "main",
});
await Promise.resolve();
} finally {
unsubscribe();
}
});
it("reports long-running sessions separately when active work is making progress", () => {
const events: DiagnosticEventPayload[] = [];
const recoverStuckSession = vi.fn();
@@ -894,6 +1080,16 @@ describe("stuck session diagnostics threshold", () => {
expect(resolveStuckSessionWarnMs({ diagnostics: { stuckSessionWarnMs: -1 } })).toBe(120_000);
expect(resolveStuckSessionWarnMs({ diagnostics: { stuckSessionWarnMs: 0 } })).toBe(120_000);
expect(resolveStuckSessionWarnMs()).toBe(120_000);
expect(
resolveStuckSessionAbortMs({ diagnostics: { stuckSessionAbortMs: 5_000 } }, 30_000),
).toBe(30_000);
expect(
resolveStuckSessionAbortMs(
{ diagnostics: { stuckSessionAbortMs: 48 * 60 * 60_000 } },
30_000,
),
).toBe(48 * 60 * 60_000);
expect(resolveStuckSessionAbortMs(undefined, 30_000)).toBe(10 * 60_000);
});
});

View File

@@ -33,6 +33,15 @@ import {
formatCronSessionDiagnosticFields,
resolveCronSessionDiagnosticContext,
} from "./diagnostic-session-context.js";
import {
requestStuckSessionRecovery,
resetDiagnosticSessionRecoveryCoordinatorForTest,
type RecoverStuckSession,
} from "./diagnostic-session-recovery-coordinator.js";
import {
type StuckSessionRecoveryOutcome,
type StuckSessionRecoveryRequest,
} from "./diagnostic-session-recovery.js";
import {
diagnosticSessionStates,
getDiagnosticSessionState,
@@ -92,14 +101,6 @@ type DiagnosticWorkSnapshot = {
queuedLabels: string[];
};
type RecoverStuckSession = (params: {
sessionId?: string;
sessionKey?: string;
ageMs: number;
queueDepth?: number;
allowActiveAbort?: boolean;
}) => void | Promise<void>;
type DiagnosticLivenessSample = {
reasons: DiagnosticLivenessWarningReason[];
intervalMs: number;
@@ -136,18 +137,22 @@ function loadCommandPollBackoffRuntime() {
return commandPollBackoffRuntimePromise;
}
function recoverStuckSession(params: {
sessionId?: string;
sessionKey?: string;
ageMs: number;
queueDepth?: number;
allowActiveAbort?: boolean;
}) {
async function recoverStuckSession(
params: StuckSessionRecoveryRequest,
): Promise<StuckSessionRecoveryOutcome> {
stuckSessionRecoveryRuntimePromise ??= import("./diagnostic-stuck-session-recovery.runtime.js");
void stuckSessionRecoveryRuntimePromise
return stuckSessionRecoveryRuntimePromise
.then(({ recoverStuckDiagnosticSession }) => recoverStuckDiagnosticSession(params))
.catch((err) => {
diag.warn(`stuck session recovery unavailable: ${String(err)}`);
return {
status: "failed",
action: "none",
reason: "exception",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
error: String(err),
};
});
}
@@ -425,6 +430,21 @@ export function resolveStuckSessionWarnMs(config?: OpenClawConfig): number {
return rounded;
}
export function resolveStuckSessionAbortMs(
config: OpenClawConfig | undefined,
stuckSessionWarnMs: number,
): number {
const raw = config?.diagnostics?.stuckSessionAbortMs;
if (typeof raw !== "number" || !Number.isFinite(raw)) {
return resolveStalledEmbeddedRunAbortMs(stuckSessionWarnMs);
}
const rounded = Math.floor(raw);
if (rounded <= 0) {
return resolveStalledEmbeddedRunAbortMs(stuckSessionWarnMs);
}
return Math.max(stuckSessionWarnMs, rounded);
}
function resolveStalledEmbeddedRunAbortMs(stuckSessionWarnMs: number): number {
return Math.max(
MIN_STALLED_EMBEDDED_RUN_ABORT_MS,
@@ -435,13 +455,13 @@ function resolveStalledEmbeddedRunAbortMs(stuckSessionWarnMs: number): number {
function isStalledEmbeddedRunRecoveryEligible(params: {
classification: SessionAttentionClassification | undefined;
ageMs: number;
stuckSessionWarnMs: number;
stuckSessionAbortMs: number;
}): boolean {
return (
params.classification?.eventType === "session.stalled" &&
params.classification.classification === "stalled_agent_run" &&
params.classification.activeWorkKind === "embedded_run" &&
params.ageMs >= resolveStalledEmbeddedRunAbortMs(params.stuckSessionWarnMs)
params.ageMs >= params.stuckSessionAbortMs
);
}
@@ -537,6 +557,7 @@ export function logMessageQueued(params: {
const state = getDiagnosticSessionState(params);
state.queueDepth += 1;
state.lastActivity = Date.now();
state.generation = (state.generation ?? 0) + 1;
state.lastStuckWarnAgeMs = undefined;
state.lastLongRunningWarnAgeMs = undefined;
if (diag.isEnabled("debug")) {
@@ -617,6 +638,7 @@ export function logSessionStateChange(
const prevState = state.state;
state.state = params.state;
state.lastActivity = Date.now();
state.generation = (state.generation ?? 0) + 1;
state.lastStuckWarnAgeMs = undefined;
state.lastLongRunningWarnAgeMs = undefined;
if (params.state === "idle") {
@@ -649,6 +671,7 @@ export function markDiagnosticSessionProgress(params: SessionRef) {
}
const state = getDiagnosticSessionState(params);
state.lastActivity = Date.now();
state.generation = (state.generation ?? 0) + 1;
state.lastStuckWarnAgeMs = undefined;
state.lastLongRunningWarnAgeMs = undefined;
markActivity();
@@ -724,6 +747,7 @@ export function logSessionAttention(
state: SessionStateValue;
ageMs: number;
thresholdMs: number;
abortThresholdMs?: number;
},
): SessionAttentionClassification | undefined {
if (!areDiagnosticsEnabledForProcess()) {
@@ -744,7 +768,8 @@ export function logSessionAttention(
isStalledEmbeddedRunRecoveryEligible({
classification,
ageMs: params.ageMs,
stuckSessionWarnMs: params.thresholdMs,
stuckSessionAbortMs:
params.abortThresholdMs ?? resolveStalledEmbeddedRunAbortMs(params.thresholdMs),
});
if (classification.eventType === "session.stuck") {
const nextWarnAgeMs =
@@ -924,6 +949,7 @@ export function startDiagnosticHeartbeat(
}
}
const stuckSessionWarnMs = resolveStuckSessionWarnMs(heartbeatConfig);
const stuckSessionAbortMs = resolveStuckSessionAbortMs(heartbeatConfig, stuckSessionWarnMs);
const now = Date.now();
pruneDiagnosticSessionStates(now, true);
const work = getDiagnosticWorkSnapshot(now);
@@ -981,27 +1007,39 @@ export function startDiagnosticHeartbeat(
state: state.state,
ageMs,
thresholdMs: stuckSessionWarnMs,
abortThresholdMs: stuckSessionAbortMs,
});
if (classification?.recoveryEligible) {
void (opts?.recoverStuckSession ?? recoverStuckSession)({
sessionId: state.sessionId,
sessionKey: state.sessionKey,
ageMs,
queueDepth: state.queueDepth,
requestStuckSessionRecovery({
recover: opts?.recoverStuckSession ?? recoverStuckSession,
classification,
request: {
sessionId: state.sessionId,
sessionKey: state.sessionKey,
ageMs,
queueDepth: state.queueDepth,
stateGeneration: state.generation,
},
});
} else if (
classification &&
isStalledEmbeddedRunRecoveryEligible({
classification,
ageMs,
stuckSessionWarnMs,
stuckSessionAbortMs,
})
) {
void (opts?.recoverStuckSession ?? recoverStuckSession)({
sessionId: state.sessionId,
sessionKey: state.sessionKey,
ageMs,
queueDepth: state.queueDepth,
allowActiveAbort: true,
requestStuckSessionRecovery({
recover: opts?.recoverStuckSession ?? recoverStuckSession,
classification,
request: {
sessionId: state.sessionId,
sessionKey: state.sessionKey,
ageMs,
queueDepth: state.queueDepth,
allowActiveAbort: true,
stateGeneration: state.generation,
},
});
}
}
@@ -1025,6 +1063,7 @@ export function getDiagnosticSessionStateCountForTest(): number {
}
export function resetDiagnosticStateForTest(): void {
resetDiagnosticSessionRecoveryCoordinatorForTest();
resetDiagnosticSessionStateForTest();
resetDiagnosticActivityForTest();
resetDiagnosticRunActivityForTest();