diff --git a/CHANGELOG.md b/CHANGELOG.md index 545cfd0bbd0..618574cf045 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ Docs: https://docs.openclaw.ai - Slack/subagents: keep resumed parent `message.send` calls in the originating Slack thread when ambient session thread context is present, and suppress successful silent child completion rows from follow-up findings. Thanks @bek91. - Infra/Windows: skip the POSIX `/tmp/openclaw` preferred path on Windows in `resolvePreferredOpenClawTmpDir` so log files, TTS temp files, and other writes land in `%TEMP%\openclaw-` instead of `C:\tmp\openclaw`. Fixes #60713. Thanks @juan-flores077. +- Gateway/diagnostics: make stuck-session recovery outcome-driven and generation-guarded, add `diagnostics.stuckSessionAbortMs`, and emit structured recovery requested/completed events so stale or skipped recovery no longer looks like a successful abort. - Media/Windows: open saved attachment temp files read/write before fsync so Windows WebChat and `chat.send` media offloads no longer fail with EPERM during durability flush. (#76593) Thanks @qq230849622-a11y. - Agents/tools: honor narrow runtime tool allowlists when constructing embedded-runner tool families and bundled MCP/LSP runtimes, so cron/subagent runs that request tools such as `update_plan`, `browser`, `x_search`, channel login tools, or `group:plugins` no longer start with missing tools or unrelated bootstrap work. (#77519, #77532) - Codex plugin: mirror the experimental upstream app-server protocol and format generated TypeScript before drift checks, keeping OpenClaw's `experimentalApi` bridge compatible with latest Codex while preserving formatter gates. 
diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index e0dfaa2f385..309743143e4 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -b4491e9b8ea5606cad18c1acf06f03d35301ebec1974d201ec9ee7582d2f6001 config-baseline.json -9c0c9369d49c2001f91ec030e3852ccdc2ac9084229f335804aa9141c13b4795 config-baseline.core.json +657060e80f3dc4b7d992e8625d2a8b0ff9b1b408960148d3f5f6a381d602359a config-baseline.json +92cbb12ca382f7424e7bd52df21798b10a57621f5c266909fa74e23f6cb973d7 config-baseline.core.json cd7c0c7fb1435bc7e59099e9ac334462d5ad444016e9ab4512aae63a238f78dc config-baseline.channel.json 9832b30a696930a3da7efccf38073137571e1b66cae84e54d747b733fdafcc54 config-baseline.plugin.json diff --git a/docs/concepts/agent-loop.md b/docs/concepts/agent-loop.md index 9c6584f4d8e..1917012aa91 100644 --- a/docs/concepts/agent-loop.md +++ b/docs/concepts/agent-loop.md @@ -165,7 +165,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism - `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides. - Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer. - Cron runtime: isolated agent-turn `timeoutSeconds` is owned by cron. The scheduler starts that timer when execution begins, aborts the underlying run at the configured deadline, then runs bounded cleanup before recording the timeout so a stale child session cannot keep the lane stuck. -- Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions that have no observed reply, tool, status, block, or ACP progress. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work. 
Stale session bookkeeping releases the affected session lane immediately; stalled embedded runs are abort-drained only after an extended no-progress window (at least 10 minutes and 5x the warning threshold) so queued work can resume without cutting off merely slow runs. Repeated `session.stuck` diagnostics back off while the session remains unchanged. +- Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions that have no observed reply, tool, status, block, or ACP progress. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work. Stale session bookkeeping releases the affected session lane immediately; stalled embedded runs are abort-drained only after `diagnostics.stuckSessionAbortMs` (default: at least 10 minutes and 5x the warning threshold) so queued work can resume without cutting off merely slow runs. Recovery emits structured requested/completed outcomes, and diagnostic state is marked idle only if the same processing generation is still current. Repeated `session.stuck` diagnostics back off while the session remains unchanged. - Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers.<id>.timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout. - Provider HTTP request timeout: `models.providers.<id>.timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. 
Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout. diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index 789bc97c3e9..ddb0c861e61 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -920,6 +920,7 @@ Notes: enabled: true, flags: ["telegram.*"], stuckSessionWarnMs: 30000, + stuckSessionAbortMs: 600000, otel: { enabled: false, @@ -959,6 +960,7 @@ Notes: - `enabled`: master toggle for instrumentation output (default: `true`). - `flags`: array of flag strings enabling targeted log output (supports wildcards like `"telegram.*"` or `"*"`). - `stuckSessionWarnMs`: no-progress age threshold in ms for classifying long-running processing sessions as `session.long_running`, `session.stalled`, or `session.stuck`. Reply, tool, status, block, and ACP progress reset the timer; repeated `session.stuck` diagnostics back off while unchanged. +- `stuckSessionAbortMs`: no-progress age threshold in ms before eligible stalled active work may be abort-drained for recovery. When unset, OpenClaw uses the safer extended embedded-run window of at least 10 minutes and 5x `stuckSessionWarnMs`. - `otel.enabled`: enables the OpenTelemetry export pipeline (default: `false`). For the full configuration, signal catalog, and privacy model, see [OpenTelemetry export](/gateway/opentelemetry). - `otel.endpoint`: collector URL for OTel export. - `otel.tracesEndpoint` / `otel.metricsEndpoint` / `otel.logsEndpoint`: optional signal-specific OTLP endpoints. When set, they override `otel.endpoint` for that signal only. diff --git a/docs/gateway/opentelemetry.md b/docs/gateway/opentelemetry.md index 31e0b82f455..3f9883afe0a 100644 --- a/docs/gateway/opentelemetry.md +++ b/docs/gateway/opentelemetry.md @@ -216,11 +216,18 @@ OpenClaw classifies sessions by the work it can still observe: still making progress. 
- `session.stalled`: active work exists, but the active run has not reported recent progress. Stalled embedded runs stay observe-only at first, then - abort-drain after at least 10 minutes and 5x `diagnostics.stuckSessionWarnMs` - with no progress so queued turns behind the lane can resume. + abort-drain after `diagnostics.stuckSessionAbortMs` with no progress so queued + turns behind the lane can resume. When unset, the abort threshold defaults to + the safer extended window of at least 10 minutes and 5x + `diagnostics.stuckSessionWarnMs`. - `session.stuck`: stale session bookkeeping with no active work. This releases the affected session lane immediately. +Recovery emits structured `session.recovery.requested` and +`session.recovery.completed` events. Diagnostic session state is marked idle +only after a mutating recovery outcome (`aborted` or `released`) and only if the +same processing generation is still current. + Only `session.stuck` emits the `openclaw.session.stuck` counter, the `openclaw.session.stuck_age_ms` histogram, and the `openclaw.session.stuck` span. Repeated `session.stuck` diagnostics back off while the session remains diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index 8b1e238f020..8097867c164 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -151,6 +151,14 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { description: "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.", }, + stuckSessionAbortMs: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + title: "Session Abort Threshold (ms)", + description: + "No-progress age threshold in milliseconds before eligible stalled active work may be abort-drained for recovery. 
Defaults to the safer extended embedded-run recovery window.", + }, otel: { type: "object", properties: { @@ -24666,6 +24674,11 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { help: "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.", tags: ["observability", "storage"], }, + "diagnostics.stuckSessionAbortMs": { + label: "Session Abort Threshold (ms)", + help: "No-progress age threshold in milliseconds before eligible stalled active work may be abort-drained for recovery. Defaults to the safer extended embedded-run recovery window.", + tags: ["observability", "storage"], + }, "diagnostics.otel.enabled": { label: "OpenTelemetry Enabled", help: "Enables OpenTelemetry export pipeline for traces, metrics, and logs based on configured endpoint/protocol settings. Keep disabled unless your collector endpoint and auth are fully configured.", diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 006051ea634..7c24c86c427 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -588,6 +588,8 @@ export const FIELD_HELP: Record = { "Master toggle for diagnostics instrumentation output in logs and telemetry wiring paths. Defaults to enabled; set false only in tightly constrained environments.", "diagnostics.stuckSessionWarnMs": "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.", + "diagnostics.stuckSessionAbortMs": + "No-progress age threshold in milliseconds before eligible stalled active work may be abort-drained for recovery. 
Defaults to the safer extended embedded-run recovery window.", "diagnostics.otel.enabled": "Enables OpenTelemetry export pipeline for traces, metrics, and logs based on configured endpoint/protocol settings. Keep disabled unless your collector endpoint and auth are fully configured.", "diagnostics.otel.endpoint": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index eb683423c90..ccd5068b338 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -41,6 +41,7 @@ export const FIELD_LABELS: Record = { "diagnostics.enabled": "Diagnostics Enabled", "diagnostics.flags": "Diagnostics Flags", "diagnostics.stuckSessionWarnMs": "Session Liveness Threshold (ms)", + "diagnostics.stuckSessionAbortMs": "Session Abort Threshold (ms)", "diagnostics.otel.enabled": "OpenTelemetry Enabled", "diagnostics.otel.endpoint": "OpenTelemetry Endpoint", "diagnostics.otel.tracesEndpoint": "OpenTelemetry Traces Endpoint", diff --git a/src/config/types.base.ts b/src/config/types.base.ts index b844b70b78a..c441b6a4582 100644 --- a/src/config/types.base.ts +++ b/src/config/types.base.ts @@ -309,6 +309,8 @@ export type DiagnosticsConfig = { flags?: string[]; /** Threshold in ms before a processing session with no observed progress logs diagnostics. */ stuckSessionWarnMs?: number; + /** Threshold in ms before eligible stalled active work may be aborted for recovery. 
*/ + stuckSessionAbortMs?: number; otel?: DiagnosticsOtelConfig; cacheTrace?: DiagnosticsCacheTraceConfig; }; diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index 9de258fe3f6..d3a61e86e12 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -342,6 +342,7 @@ export const OpenClawSchema = z enabled: z.boolean().optional(), flags: z.array(z.string()).optional(), stuckSessionWarnMs: z.number().int().positive().optional(), + stuckSessionAbortMs: z.number().int().positive().optional(), otel: z .object({ enabled: z.boolean().optional(), diff --git a/src/gateway/config-reload-plan.ts b/src/gateway/config-reload-plan.ts index 16e7ab3d01e..0dd8a5a1506 100644 --- a/src/gateway/config-reload-plan.ts +++ b/src/gateway/config-reload-plan.ts @@ -63,8 +63,9 @@ const BASE_RELOAD_RULES: ReloadRule[] = [ kind: "hot", actions: ["restart-health-monitor"], }, - // Stuck-session warning threshold is read by the diagnostics heartbeat loop. + // Stuck-session thresholds are read by the diagnostics heartbeat loop. 
{ prefix: "diagnostics.stuckSessionWarnMs", kind: "none" }, + { prefix: "diagnostics.stuckSessionAbortMs", kind: "none" }, { prefix: "hooks.gmail", kind: "hot", actions: ["restart-gmail-watcher"] }, { prefix: "hooks", kind: "hot", actions: ["reload-hooks"] }, { diff --git a/src/gateway/config-reload.test.ts b/src/gateway/config-reload.test.ts index a094c5c92bf..f3b451ddc6d 100644 --- a/src/gateway/config-reload.test.ts +++ b/src/gateway/config-reload.test.ts @@ -388,10 +388,14 @@ describe("buildGatewayReloadPlan", () => { expect(plan.noopPaths).toContain("secrets.providers.default.path"); }); - it("treats diagnostics.stuckSessionWarnMs as no-op for gateway restart planning", () => { - const plan = buildGatewayReloadPlan(["diagnostics.stuckSessionWarnMs"]); + it("treats diagnostics stuck-session thresholds as no-op for gateway restart planning", () => { + const plan = buildGatewayReloadPlan([ + "diagnostics.stuckSessionWarnMs", + "diagnostics.stuckSessionAbortMs", + ]); expect(plan.restartGateway).toBe(false); expect(plan.noopPaths).toContain("diagnostics.stuckSessionWarnMs"); + expect(plan.noopPaths).toContain("diagnostics.stuckSessionAbortMs"); }); it("restarts for gateway.auth.token changes", () => { diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index 08015523027..113670d2879 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -164,6 +164,38 @@ export type DiagnosticSessionStuckEvent = DiagnosticSessionAttentionBaseEvent & classification: "stale_session_state"; }; +export type DiagnosticSessionRecoveryStatus = + | "aborted" + | "released" + | "skipped" + | "noop" + | "failed"; + +type DiagnosticSessionRecoveryBaseEvent = DiagnosticBaseEvent & { + sessionKey?: string; + sessionId?: string; + state: DiagnosticSessionState; + stateGeneration?: number; + ageMs: number; + queueDepth?: number; + reason?: string; + activeWorkKind?: DiagnosticSessionActiveWorkKind; + allowActiveAbort?: boolean; +}; + +export 
type DiagnosticSessionRecoveryRequestedEvent = DiagnosticSessionRecoveryBaseEvent & { + type: "session.recovery.requested"; +}; + +export type DiagnosticSessionRecoveryCompletedEvent = DiagnosticSessionRecoveryBaseEvent & { + type: "session.recovery.completed"; + status: DiagnosticSessionRecoveryStatus; + action: string; + outcomeReason?: string; + released?: number; + stale?: boolean; +}; + export type DiagnosticLaneEnqueueEvent = DiagnosticBaseEvent & { type: "queue.lane.enqueue"; lane: string; @@ -520,6 +552,8 @@ export type DiagnosticEventPayload = | DiagnosticSessionLongRunningEvent | DiagnosticSessionStalledEvent | DiagnosticSessionStuckEvent + | DiagnosticSessionRecoveryRequestedEvent + | DiagnosticSessionRecoveryCompletedEvent | DiagnosticLaneEnqueueEvent | DiagnosticLaneDequeueEvent | DiagnosticRunAttemptEvent diff --git a/src/logging/diagnostic-session-recovery-coordinator.ts b/src/logging/diagnostic-session-recovery-coordinator.ts new file mode 100644 index 00000000000..6c3d23d20c1 --- /dev/null +++ b/src/logging/diagnostic-session-recovery-coordinator.ts @@ -0,0 +1,200 @@ +import { emitDiagnosticEvent } from "../infra/diagnostic-events.js"; +import { markDiagnosticActivity as markActivity } from "./diagnostic-runtime.js"; +import type { SessionAttentionClassification } from "./diagnostic-session-attention.js"; +import { + recoveryOutcomeMutatesSessionState, + recoveryOutcomeReleasedCount, + type StuckSessionRecoveryOutcome, + type StuckSessionRecoveryRequest, +} from "./diagnostic-session-recovery.js"; +import { + getDiagnosticSessionState, + isDiagnosticSessionStateCurrent, +} from "./diagnostic-session-state.js"; + +export type RecoverStuckSession = ( + params: StuckSessionRecoveryRequest, +) => void | StuckSessionRecoveryOutcome | Promise; + +const recoveryRequestsInFlight = new Set(); + +function emitSessionRecoveryRequested(params: { + request: StuckSessionRecoveryRequest; + classification: SessionAttentionClassification; +}): void { + 
emitDiagnosticEvent({ + type: "session.recovery.requested", + sessionId: params.request.sessionId, + sessionKey: params.request.sessionKey, + state: "processing", + stateGeneration: params.request.stateGeneration, + ageMs: params.request.ageMs, + queueDepth: params.request.queueDepth, + reason: params.classification.reason, + activeWorkKind: params.classification.activeWorkKind, + allowActiveAbort: params.request.allowActiveAbort, + }); +} + +function emitSessionRecoveryCompleted(params: { + request: StuckSessionRecoveryRequest; + outcome: StuckSessionRecoveryOutcome; + stale?: boolean; +}): void { + emitDiagnosticEvent({ + type: "session.recovery.completed", + sessionId: params.request.sessionId, + sessionKey: params.request.sessionKey, + state: "processing", + stateGeneration: params.request.stateGeneration, + ageMs: params.request.ageMs, + queueDepth: params.request.queueDepth, + activeWorkKind: params.outcome.activeWorkKind, + status: params.outcome.status, + action: params.outcome.action, + outcomeReason: "reason" in params.outcome ? params.outcome.reason : undefined, + released: recoveryOutcomeReleasedCount(params.outcome) || undefined, + stale: params.stale, + }); +} + +function recoveryRequestKey(request: StuckSessionRecoveryRequest): string | undefined { + const ref = request.sessionKey?.trim() || request.sessionId?.trim(); + if (!ref) { + return undefined; + } + return `${ref}:${request.stateGeneration ?? 
"unknown"}`; +} + +function isRecoveryPromiseLike( + value: void | StuckSessionRecoveryOutcome | Promise, +): value is Promise { + return ( + typeof (value as Promise | undefined)?.then === "function" + ); +} + +function applyRecoveryOutcomeToDiagnosticState(params: { + request: StuckSessionRecoveryRequest; + outcome: StuckSessionRecoveryOutcome | undefined; +}): void { + if (!params.outcome) { + return; + } + if (!recoveryOutcomeMutatesSessionState(params.outcome)) { + emitSessionRecoveryCompleted({ request: params.request, outcome: params.outcome }); + return; + } + if ( + !isDiagnosticSessionStateCurrent({ + sessionId: params.request.sessionId, + sessionKey: params.request.sessionKey, + generation: params.request.stateGeneration, + state: "processing", + }) + ) { + emitSessionRecoveryCompleted({ + request: params.request, + outcome: params.outcome, + stale: true, + }); + return; + } + const state = getDiagnosticSessionState(params.request); + const prevState = state.state; + state.state = "idle"; + state.lastActivity = Date.now(); + state.generation = (state.generation ?? 0) + 1; + state.lastStuckWarnAgeMs = undefined; + state.lastLongRunningWarnAgeMs = undefined; + const released = recoveryOutcomeReleasedCount(params.outcome); + state.queueDepth = released > 0 ? 
0 : Math.max(0, state.queueDepth - 1); + emitDiagnosticEvent({ + type: "session.state", + sessionId: state.sessionId, + sessionKey: state.sessionKey, + prevState, + state: "idle", + reason: `stuck_recovery:${params.outcome.status}`, + queueDepth: state.queueDepth, + }); + emitSessionRecoveryCompleted({ request: params.request, outcome: params.outcome }); + markActivity(); +} + +export function requestStuckSessionRecovery(params: { + recover: RecoverStuckSession; + request: StuckSessionRecoveryRequest; + classification: SessionAttentionClassification; +}): void { + const inFlightKey = recoveryRequestKey(params.request); + if (inFlightKey && recoveryRequestsInFlight.has(inFlightKey)) { + emitSessionRecoveryCompleted({ + request: params.request, + outcome: { + status: "skipped", + action: "observe_only", + reason: "already_in_flight", + sessionId: params.request.sessionId, + sessionKey: params.request.sessionKey, + activeWorkKind: params.classification.activeWorkKind, + }, + }); + return; + } + if (inFlightKey) { + recoveryRequestsInFlight.add(inFlightKey); + } + emitSessionRecoveryRequested({ + request: params.request, + classification: params.classification, + }); + const clearInFlight = () => { + if (inFlightKey) { + recoveryRequestsInFlight.delete(inFlightKey); + } + }; + const failRecovery = (err: unknown) => { + applyRecoveryOutcomeToDiagnosticState({ + request: params.request, + outcome: { + status: "failed", + action: "none", + reason: "exception", + sessionId: params.request.sessionId, + sessionKey: params.request.sessionKey, + error: String(err), + }, + }); + }; + try { + const result = params.recover(params.request); + if (isRecoveryPromiseLike(result)) { + void result + .then((outcome) => { + applyRecoveryOutcomeToDiagnosticState({ + request: params.request, + outcome: outcome ?? undefined, + }); + }) + .catch(failRecovery) + .finally(clearInFlight); + return; + } + applyRecoveryOutcomeToDiagnosticState({ + request: params.request, + outcome: result ?? 
undefined, + }); + clearInFlight(); + } catch (err) { + try { + failRecovery(err); + } finally { + clearInFlight(); + } + } +} + +export function resetDiagnosticSessionRecoveryCoordinatorForTest(): void { + recoveryRequestsInFlight.clear(); +} diff --git a/src/logging/diagnostic-session-recovery.ts b/src/logging/diagnostic-session-recovery.ts new file mode 100644 index 00000000000..5b5ff3e23e1 --- /dev/null +++ b/src/logging/diagnostic-session-recovery.ts @@ -0,0 +1,122 @@ +import type { DiagnosticSessionActiveWorkKind } from "../infra/diagnostic-events.js"; + +export type DiagnosticSessionRecoveryStatus = + | "aborted" + | "released" + | "skipped" + | "noop" + | "failed"; + +export type DiagnosticSessionRecoverySkipReason = + | "active_embedded_run" + | "active_reply_work" + | "active_lane_task" + | "already_in_flight" + | "missing_session_ref" + | "stale_session_state"; + +export type DiagnosticSessionRecoveryNoopReason = "no_active_work"; + +export type StuckSessionRecoveryRequest = { + sessionId?: string; + sessionKey?: string; + ageMs: number; + queueDepth?: number; + allowActiveAbort?: boolean; + stateGeneration?: number; +}; + +type DiagnosticSessionRecoveryBaseOutcome = { + sessionId?: string; + sessionKey?: string; + activeSessionId?: string; + lane?: string; + activeWorkKind?: DiagnosticSessionActiveWorkKind; +}; + +export type StuckSessionRecoveryOutcome = + | (DiagnosticSessionRecoveryBaseOutcome & { + status: "aborted"; + action: "abort_embedded_run"; + aborted: boolean; + drained: boolean; + forceCleared: boolean; + released: number; + }) + | (DiagnosticSessionRecoveryBaseOutcome & { + status: "released"; + action: "release_lane"; + released: number; + }) + | (DiagnosticSessionRecoveryBaseOutcome & { + status: "skipped"; + action: "observe_only" | "keep_lane"; + reason: DiagnosticSessionRecoverySkipReason; + activeCount?: number; + queuedCount?: number; + }) + | (DiagnosticSessionRecoveryBaseOutcome & { + status: "noop"; + action: "none"; + reason: 
DiagnosticSessionRecoveryNoopReason; + }) + | (DiagnosticSessionRecoveryBaseOutcome & { + status: "failed"; + action: "none"; + reason: "exception"; + error: string; + }); + +export function recoveryOutcomeMutatesSessionState( + outcome: StuckSessionRecoveryOutcome | undefined, +): boolean { + if (!outcome) { + return false; + } + return outcome.status === "aborted" || outcome.status === "released"; +} + +export function recoveryOutcomeReleasedCount(outcome: StuckSessionRecoveryOutcome): number { + return "released" in outcome ? outcome.released : 0; +} + +export function formatRecoveryOutcome(outcome: StuckSessionRecoveryOutcome): string { + const fields = [ + `status=${outcome.status}`, + `action=${outcome.action}`, + `sessionId=${outcome.sessionId ?? outcome.activeSessionId ?? "unknown"}`, + `sessionKey=${outcome.sessionKey ?? "unknown"}`, + ]; + if (outcome.activeSessionId) { + fields.push(`activeSessionId=${outcome.activeSessionId}`); + } + if (outcome.activeWorkKind) { + fields.push(`activeWorkKind=${outcome.activeWorkKind}`); + } + if (outcome.lane) { + fields.push(`lane=${outcome.lane}`); + } + if ("reason" in outcome) { + fields.push(`reason=${outcome.reason}`); + } + if ("aborted" in outcome) { + fields.push( + `aborted=${outcome.aborted}`, + `drained=${outcome.drained}`, + `forceCleared=${outcome.forceCleared}`, + ); + } + if ("released" in outcome) { + fields.push(`released=${outcome.released}`); + } + if ("activeCount" in outcome && outcome.activeCount !== undefined) { + fields.push(`laneActive=${outcome.activeCount}`); + } + if ("queuedCount" in outcome && outcome.queuedCount !== undefined) { + fields.push(`laneQueued=${outcome.queuedCount}`); + } + if ("error" in outcome) { + fields.push(`error=${outcome.error}`); + } + return fields.join(" "); +} diff --git a/src/logging/diagnostic-session-state.ts b/src/logging/diagnostic-session-state.ts index 964915bccaa..3679f4e7fd7 100644 --- a/src/logging/diagnostic-session-state.ts +++ 
b/src/logging/diagnostic-session-state.ts @@ -4,6 +4,7 @@ export type SessionState = { sessionId?: string; sessionKey?: string; lastActivity: number; + generation?: number; lastStuckWarnAgeMs?: number; lastLongRunningWarnAgeMs?: number; state: SessionStateValue; @@ -100,6 +101,7 @@ function mergeSessionState(target: SessionState, source: SessionState): void { if (sourceIsNewer || sourceIsSameAgeAndMoreActive) { target.state = source.state; } + target.generation = Math.max(target.generation ?? 0, source.generation ?? 0); target.lastActivity = Math.max(target.lastActivity, source.lastActivity); target.queueDepth += source.queueDepth; target.lastStuckWarnAgeMs = @@ -156,6 +158,7 @@ export function getDiagnosticSessionState(ref: SessionRef): SessionState { sessionId: ref.sessionId, sessionKey: ref.sessionKey, lastActivity: Date.now(), + generation: 0, state: "idle", queueDepth: 0, }; @@ -164,6 +167,14 @@ export function getDiagnosticSessionState(ref: SessionRef): SessionState { return created; } +export function peekDiagnosticSessionState(ref: SessionRef): SessionState | undefined { + const key = resolveSessionKey(ref); + return ( + diagnosticSessionStates.get(key) ?? + (ref.sessionId ? findStateEntryBySessionId(ref.sessionId)?.[1] : undefined) + ); +} + export function getDiagnosticSessionStateCountForTest(): number { return diagnosticSessionStates.size; } @@ -172,3 +183,22 @@ export function resetDiagnosticSessionStateForTest(): void { diagnosticSessionStates.clear(); lastSessionPruneAt = 0; } + +export function isDiagnosticSessionStateCurrent(params: { + sessionId?: string; + sessionKey?: string; + generation?: number; + state?: SessionStateValue; +}): boolean { + if (params.generation === undefined) { + return true; + } + const state = peekDiagnosticSessionState(params); + if (!state) { + return false; + } + return ( + (state.generation ?? 
0) === params.generation && + (params.state === undefined || state.state === params.state) + ); +} diff --git a/src/logging/diagnostic-stability.ts b/src/logging/diagnostic-stability.ts index 332b6806ee7..1266c113a63 100644 --- a/src/logging/diagnostic-stability.ts +++ b/src/logging/diagnostic-stability.ts @@ -250,6 +250,27 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi record.toolName = event.activeToolName; } break; + case "session.recovery.requested": + record.outcome = event.state; + record.action = event.allowActiveAbort ? "abort" : "recover"; + record.ageMs = event.ageMs; + record.queueDepth = event.queueDepth; + if (event.activeWorkKind) { + record.activeWorkKind = event.activeWorkKind; + } + assignReasonCode(record, event.reason); + break; + case "session.recovery.completed": + record.outcome = event.status; + record.action = event.action; + record.ageMs = event.ageMs; + record.queueDepth = event.queueDepth; + record.count = event.released; + if (event.activeWorkKind) { + record.activeWorkKind = event.activeWorkKind; + } + assignReasonCode(record, event.outcomeReason ?? 
event.reason); + break; case "queue.lane.enqueue": record.source = event.lane; record.queueSize = event.queueSize; diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.ts index b75988260f2..df554859768 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.ts @@ -12,17 +12,17 @@ import { formatStoppedCronSessionDiagnosticFields, resolveCronSessionDiagnosticContext, } from "./diagnostic-session-context.js"; +import { + formatRecoveryOutcome, + type StuckSessionRecoveryOutcome, + type StuckSessionRecoveryRequest, +} from "./diagnostic-session-recovery.js"; +import { isDiagnosticSessionStateCurrent } from "./diagnostic-session-state.js"; const STUCK_SESSION_ABORT_SETTLE_MS = 15_000; const recoveriesInFlight = new Set(); -export type StuckSessionRecoveryParams = { - sessionId?: string; - sessionKey?: string; - ageMs: number; - queueDepth?: number; - allowActiveAbort?: boolean; -}; +export type StuckSessionRecoveryParams = StuckSessionRecoveryRequest; function recoveryKey(params: StuckSessionRecoveryParams): string | undefined { return params.sessionKey?.trim() || params.sessionId?.trim() || undefined; @@ -55,14 +55,36 @@ function formatRecoveryContext( export async function recoverStuckDiagnosticSession( params: StuckSessionRecoveryParams, -): Promise { +): Promise { const key = recoveryKey(params); if (!key || recoveriesInFlight.has(key)) { - return; + return { + status: "skipped", + action: "observe_only", + reason: key ? 
"already_in_flight" : "missing_session_ref", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + }; } recoveriesInFlight.add(key); try { + if ( + !isDiagnosticSessionStateCurrent({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + generation: params.stateGeneration, + state: "processing", + }) + ) { + return { + status: "skipped", + action: "observe_only", + reason: "stale_session_state", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + }; + } const fallbackActiveSessionId = params.sessionId && isEmbeddedPiRunHandleActive(params.sessionId) ? params.sessionId @@ -77,16 +99,24 @@ export async function recoverStuckDiagnosticSession( const sessionLane = laneKey ? resolveEmbeddedSessionLane(laneKey) : null; let aborted = false; let drained = true; + let forceCleared = false; if (activeSessionId) { if (params.allowActiveAbort !== true) { + const outcome: StuckSessionRecoveryOutcome = { + status: "skipped", + action: "observe_only", + reason: "active_embedded_run", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + activeSessionId, + activeWorkKind: "embedded_run", + }; diag.warn( - `stuck session recovery skipped: reason=active_embedded_run action=observe_only ${formatRecoveryContext( - params, - { activeSessionId }, - )}`, + `stuck session recovery skipped: ${formatRecoveryContext(params, { activeSessionId })}`, ); - return; + diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`); + return outcome; } const result = await abortAndDrainEmbeddedPiRun({ sessionId: activeSessionId, @@ -97,32 +127,38 @@ export async function recoverStuckDiagnosticSession( }); aborted = result.aborted; drained = result.drained; + forceCleared = result.forceCleared; } if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) { - diag.warn( - `stuck session recovery skipped: reason=active_reply_work action=keep_lane ${formatRecoveryContext( - params, - { activeSessionId: 
activeWorkSessionId }, - )}`, - ); - return; + const outcome: StuckSessionRecoveryOutcome = { + status: "skipped", + action: "keep_lane", + reason: "active_reply_work", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + activeSessionId: activeWorkSessionId, + activeWorkKind: "embedded_run", + }; + diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`); + return outcome; } if (!activeSessionId && sessionLane) { const laneSnapshot = getCommandLaneSnapshot(sessionLane); if (laneSnapshot.activeCount > 0) { - diag.warn( - `stuck session recovery skipped: reason=active_lane_task action=keep_lane ${formatRecoveryContext( - params, - { - lane: sessionLane, - activeCount: laneSnapshot.activeCount, - queuedCount: laneSnapshot.queuedCount, - }, - )}`, - ); - return; + const outcome: StuckSessionRecoveryOutcome = { + status: "skipped", + action: "keep_lane", + reason: "active_lane_task", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + lane: sessionLane, + activeCount: laneSnapshot.activeCount, + queuedCount: laneSnapshot.queuedCount, + }; + diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`); + return outcome; } } @@ -141,22 +177,56 @@ export async function recoverStuckDiagnosticSession( stoppedFields ? ` ${stoppedFields}` : "" }`, ); - } else { - diag.warn( - `stuck session recovery no-op: reason=no_active_work action=none ${formatRecoveryContext( - params, - { + const outcome: StuckSessionRecoveryOutcome = aborted + ? { + status: "aborted", + action: "abort_embedded_run", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + activeSessionId, + activeWorkKind: "embedded_run", + aborted, + drained, + forceCleared, + released, lane: sessionLane ?? undefined, - }, - )}`, - ); + } + : { + status: "released", + action: "release_lane", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + released, + lane: sessionLane ?? 
undefined, + }; + diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`); + return outcome; } + const outcome: StuckSessionRecoveryOutcome = { + status: "noop", + action: "none", + reason: "no_active_work", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + lane: sessionLane ?? undefined, + }; + diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`); + return outcome; } catch (err) { + const outcome: StuckSessionRecoveryOutcome = { + status: "failed", + action: "none", + reason: "exception", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + error: String(err), + }; diag.warn( `stuck session recovery failed: sessionId=${params.sessionId ?? "unknown"} sessionKey=${ params.sessionKey ?? "unknown" } err=${String(err)}`, ); + return outcome; } finally { recoveriesInFlight.delete(key); } diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index 26eae67d146..2b95aa79aea 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -33,6 +33,7 @@ import { diagnosticLogger, markDiagnosticSessionProgress, resetDiagnosticStateForTest, + resolveStuckSessionAbortMs, resolveStuckSessionWarnMs, startDiagnosticHeartbeat, } from "./diagnostic.js"; @@ -78,6 +79,7 @@ describe("diagnostic session state pruning", () => { diagnosticSessionStates.set(`session-${i}`, { sessionId: `session-${i}`, lastActivity: now + i, + generation: 0, state: "idle", queueDepth: 1, }); @@ -232,6 +234,7 @@ describe("stuck session diagnostics threshold", () => { sessionKey: "main", ageMs: expect.any(Number), queueDepth: 0, + stateGeneration: expect.any(Number), }); }); @@ -271,6 +274,7 @@ describe("stuck session diagnostics threshold", () => { sessionKey: "main", ageMs: expect.any(Number), queueDepth: 1, + stateGeneration: expect.any(Number), }); }); @@ -443,9 +447,191 @@ describe("stuck session diagnostics threshold", () => { ageMs: expect.any(Number), queueDepth: 0, 
allowActiveAbort: true, + stateGeneration: expect.any(Number), }); }); + it("uses diagnostics.stuckSessionAbortMs for stalled active-work recovery", () => { + const recoverStuckSession = vi.fn(); + + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + stuckSessionAbortMs: 60_000, + }, + }, + { recoverStuckSession }, + ); + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" }); + + vi.advanceTimersByTime(61_000); + + expect(recoverStuckSession).toHaveBeenCalledWith({ + sessionId: "s1", + sessionKey: "main", + ageMs: expect.any(Number), + queueDepth: 0, + allowActiveAbort: true, + stateGeneration: expect.any(Number), + }); + }); + + it("marks diagnostic session state idle only after a mutating recovery outcome", async () => { + const events: DiagnosticEventPayload[] = []; + const recoverStuckSession = vi.fn().mockResolvedValue({ + status: "released", + action: "release_lane", + released: 1, + sessionId: "s1", + sessionKey: "main", + }); + const unsubscribe = onDiagnosticEvent((event) => { + events.push(event); + }); + try { + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + }, + }, + { recoverStuckSession }, + ); + logMessageQueued({ sessionId: "s1", sessionKey: "main", source: "test" }); + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + + vi.advanceTimersByTime(61_000); + await Promise.resolve(); + } finally { + unsubscribe(); + } + + const state = getDiagnosticSessionState({ sessionId: "s1", sessionKey: "main" }); + expect(state.state).toBe("idle"); + expect(state.queueDepth).toBe(0); + expect(events).toContainEqual( + expect.objectContaining({ + type: "session.recovery.completed", + status: "released", + action: "release_lane", + }), + ); + }); + + it("does not mark a newer processing generation idle after a late recovery outcome", 
async () => { + const events: DiagnosticEventPayload[] = []; + const recoverStuckSession = vi.fn().mockImplementation(async () => { + markDiagnosticSessionProgress({ sessionId: "s1", sessionKey: "main" }); + return { + status: "released", + action: "release_lane", + released: 1, + sessionId: "s1", + sessionKey: "main", + }; + }); + const unsubscribe = onDiagnosticEvent((event) => { + events.push(event); + }); + try { + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + }, + }, + { recoverStuckSession }, + ); + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + + vi.advanceTimersByTime(61_000); + await Promise.resolve(); + await Promise.resolve(); + } finally { + unsubscribe(); + } + + expect(getDiagnosticSessionState({ sessionId: "s1", sessionKey: "main" }).state).toBe( + "processing", + ); + expect(events).toContainEqual( + expect.objectContaining({ + type: "session.recovery.completed", + status: "released", + stale: true, + }), + ); + }); + + it("does not start duplicate recovery for the same processing generation", async () => { + const events: DiagnosticEventPayload[] = []; + let resolveRecovery: + | ((outcome: { + status: "noop"; + action: "none"; + reason: "no_active_work"; + sessionId: string; + sessionKey: string; + }) => void) + | undefined; + const recoverStuckSession = vi.fn( + () => + new Promise<{ + status: "noop"; + action: "none"; + reason: "no_active_work"; + sessionId: string; + sessionKey: string; + }>((resolve) => { + resolveRecovery = resolve; + }), + ); + const unsubscribe = onDiagnosticEvent((event) => { + events.push(event); + }); + try { + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + }, + }, + { recoverStuckSession }, + ); + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + + vi.advanceTimersByTime(61_000); + expect(recoverStuckSession).toHaveBeenCalledTimes(1); + + 
vi.advanceTimersByTime(60_000); + expect(recoverStuckSession).toHaveBeenCalledTimes(1); + expect(events).toContainEqual( + expect.objectContaining({ + type: "session.recovery.completed", + status: "skipped", + outcomeReason: "already_in_flight", + }), + ); + + resolveRecovery?.({ + status: "noop", + action: "none", + reason: "no_active_work", + sessionId: "s1", + sessionKey: "main", + }); + await Promise.resolve(); + } finally { + unsubscribe(); + } + }); + it("reports long-running sessions separately when active work is making progress", () => { const events: DiagnosticEventPayload[] = []; const recoverStuckSession = vi.fn(); @@ -894,6 +1080,16 @@ describe("stuck session diagnostics threshold", () => { expect(resolveStuckSessionWarnMs({ diagnostics: { stuckSessionWarnMs: -1 } })).toBe(120_000); expect(resolveStuckSessionWarnMs({ diagnostics: { stuckSessionWarnMs: 0 } })).toBe(120_000); expect(resolveStuckSessionWarnMs()).toBe(120_000); + expect( + resolveStuckSessionAbortMs({ diagnostics: { stuckSessionAbortMs: 5_000 } }, 30_000), + ).toBe(30_000); + expect( + resolveStuckSessionAbortMs( + { diagnostics: { stuckSessionAbortMs: 48 * 60 * 60_000 } }, + 30_000, + ), + ).toBe(48 * 60 * 60_000); + expect(resolveStuckSessionAbortMs(undefined, 30_000)).toBe(10 * 60_000); }); }); diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index 65db6574ab4..9f29fc7d6a5 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -33,6 +33,15 @@ import { formatCronSessionDiagnosticFields, resolveCronSessionDiagnosticContext, } from "./diagnostic-session-context.js"; +import { + requestStuckSessionRecovery, + resetDiagnosticSessionRecoveryCoordinatorForTest, + type RecoverStuckSession, +} from "./diagnostic-session-recovery-coordinator.js"; +import { + type StuckSessionRecoveryOutcome, + type StuckSessionRecoveryRequest, +} from "./diagnostic-session-recovery.js"; import { diagnosticSessionStates, getDiagnosticSessionState, @@ -92,14 +101,6 @@ type 
DiagnosticWorkSnapshot = { queuedLabels: string[]; }; -type RecoverStuckSession = (params: { - sessionId?: string; - sessionKey?: string; - ageMs: number; - queueDepth?: number; - allowActiveAbort?: boolean; -}) => void | Promise; - type DiagnosticLivenessSample = { reasons: DiagnosticLivenessWarningReason[]; intervalMs: number; @@ -136,18 +137,22 @@ function loadCommandPollBackoffRuntime() { return commandPollBackoffRuntimePromise; } -function recoverStuckSession(params: { - sessionId?: string; - sessionKey?: string; - ageMs: number; - queueDepth?: number; - allowActiveAbort?: boolean; -}) { +async function recoverStuckSession( + params: StuckSessionRecoveryRequest, +): Promise { stuckSessionRecoveryRuntimePromise ??= import("./diagnostic-stuck-session-recovery.runtime.js"); - void stuckSessionRecoveryRuntimePromise + return stuckSessionRecoveryRuntimePromise .then(({ recoverStuckDiagnosticSession }) => recoverStuckDiagnosticSession(params)) .catch((err) => { diag.warn(`stuck session recovery unavailable: ${String(err)}`); + return { + status: "failed", + action: "none", + reason: "exception", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + error: String(err), + }; }); } @@ -425,6 +430,21 @@ export function resolveStuckSessionWarnMs(config?: OpenClawConfig): number { return rounded; } +export function resolveStuckSessionAbortMs( + config: OpenClawConfig | undefined, + stuckSessionWarnMs: number, +): number { + const raw = config?.diagnostics?.stuckSessionAbortMs; + if (typeof raw !== "number" || !Number.isFinite(raw)) { + return resolveStalledEmbeddedRunAbortMs(stuckSessionWarnMs); + } + const rounded = Math.floor(raw); + if (rounded <= 0) { + return resolveStalledEmbeddedRunAbortMs(stuckSessionWarnMs); + } + return Math.max(stuckSessionWarnMs, rounded); +} + function resolveStalledEmbeddedRunAbortMs(stuckSessionWarnMs: number): number { return Math.max( MIN_STALLED_EMBEDDED_RUN_ABORT_MS, @@ -435,13 +455,13 @@ function 
resolveStalledEmbeddedRunAbortMs(stuckSessionWarnMs: number): number { function isStalledEmbeddedRunRecoveryEligible(params: { classification: SessionAttentionClassification | undefined; ageMs: number; - stuckSessionWarnMs: number; + stuckSessionAbortMs: number; }): boolean { return ( params.classification?.eventType === "session.stalled" && params.classification.classification === "stalled_agent_run" && params.classification.activeWorkKind === "embedded_run" && - params.ageMs >= resolveStalledEmbeddedRunAbortMs(params.stuckSessionWarnMs) + params.ageMs >= params.stuckSessionAbortMs ); } @@ -537,6 +557,7 @@ export function logMessageQueued(params: { const state = getDiagnosticSessionState(params); state.queueDepth += 1; state.lastActivity = Date.now(); + state.generation = (state.generation ?? 0) + 1; state.lastStuckWarnAgeMs = undefined; state.lastLongRunningWarnAgeMs = undefined; if (diag.isEnabled("debug")) { @@ -617,6 +638,7 @@ export function logSessionStateChange( const prevState = state.state; state.state = params.state; state.lastActivity = Date.now(); + state.generation = (state.generation ?? 0) + 1; state.lastStuckWarnAgeMs = undefined; state.lastLongRunningWarnAgeMs = undefined; if (params.state === "idle") { @@ -649,6 +671,7 @@ export function markDiagnosticSessionProgress(params: SessionRef) { } const state = getDiagnosticSessionState(params); state.lastActivity = Date.now(); + state.generation = (state.generation ?? 
0) + 1; state.lastStuckWarnAgeMs = undefined; state.lastLongRunningWarnAgeMs = undefined; markActivity(); @@ -724,6 +747,7 @@ export function logSessionAttention( state: SessionStateValue; ageMs: number; thresholdMs: number; + abortThresholdMs?: number; }, ): SessionAttentionClassification | undefined { if (!areDiagnosticsEnabledForProcess()) { @@ -744,7 +768,8 @@ export function logSessionAttention( isStalledEmbeddedRunRecoveryEligible({ classification, ageMs: params.ageMs, - stuckSessionWarnMs: params.thresholdMs, + stuckSessionAbortMs: + params.abortThresholdMs ?? resolveStalledEmbeddedRunAbortMs(params.thresholdMs), }); if (classification.eventType === "session.stuck") { const nextWarnAgeMs = @@ -924,6 +949,7 @@ export function startDiagnosticHeartbeat( } } const stuckSessionWarnMs = resolveStuckSessionWarnMs(heartbeatConfig); + const stuckSessionAbortMs = resolveStuckSessionAbortMs(heartbeatConfig, stuckSessionWarnMs); const now = Date.now(); pruneDiagnosticSessionStates(now, true); const work = getDiagnosticWorkSnapshot(now); @@ -981,27 +1007,39 @@ export function startDiagnosticHeartbeat( state: state.state, ageMs, thresholdMs: stuckSessionWarnMs, + abortThresholdMs: stuckSessionAbortMs, }); if (classification?.recoveryEligible) { - void (opts?.recoverStuckSession ?? recoverStuckSession)({ - sessionId: state.sessionId, - sessionKey: state.sessionKey, - ageMs, - queueDepth: state.queueDepth, + requestStuckSessionRecovery({ + recover: opts?.recoverStuckSession ?? recoverStuckSession, + classification, + request: { + sessionId: state.sessionId, + sessionKey: state.sessionKey, + ageMs, + queueDepth: state.queueDepth, + stateGeneration: state.generation, + }, }); } else if ( + classification && isStalledEmbeddedRunRecoveryEligible({ classification, ageMs, - stuckSessionWarnMs, + stuckSessionAbortMs, }) ) { - void (opts?.recoverStuckSession ?? 
recoverStuckSession)({ - sessionId: state.sessionId, - sessionKey: state.sessionKey, - ageMs, - queueDepth: state.queueDepth, - allowActiveAbort: true, + requestStuckSessionRecovery({ + recover: opts?.recoverStuckSession ?? recoverStuckSession, + classification, + request: { + sessionId: state.sessionId, + sessionKey: state.sessionKey, + ageMs, + queueDepth: state.queueDepth, + allowActiveAbort: true, + stateGeneration: state.generation, + }, }); } } @@ -1025,6 +1063,7 @@ export function getDiagnosticSessionStateCountForTest(): number { } export function resetDiagnosticStateForTest(): void { + resetDiagnosticSessionRecoveryCoordinatorForTest(); resetDiagnosticSessionStateForTest(); resetDiagnosticActivityForTest(); resetDiagnosticRunActivityForTest();