From 6f76d9f246a3ca4bbb868c072ca57a1f76bfda30 Mon Sep 17 00:00:00 2001 From: Chunyue Wang <80630709+openperf@users.noreply.github.com> Date: Mon, 25 May 2026 21:20:59 +0800 Subject: [PATCH] fix(diagnostics): reclaim wedged session lanes with a stale leaked active run (#86056) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(diagnostics): reclaim wedged session lanes with a stale leaked active run A group session lane could wedge permanently (#85639): an embedded run that dies abnormally leaves a stale ACTIVE_EMBEDDED_RUNS handle, so the diagnostic heartbeat classifies the lane stale_session_state (recoveryEligible without allowActiveAbort) while stuck-session recovery reads the leaked isEmbeddedPiRunActive flag and skips with active_reply_work — a tautology that keeps the lane forever. The age-based escape never fires because ageMs (last-activity) resets on every incoming queued message. Make the active-run skip a liveness check: before keeping the lane, consult the run's real forward-progress age (lastProgressAgeMs, not refreshed by incoming messages). If a run flagged active has made no forward progress past the resolved diagnostics.stuckSessionAbortMs threshold (threaded through the recovery request; falls back to a 5-minute default when no threshold is carried) with queued work waiting, treat it as a leaked/dead handle and reclaim it (abort + drain + force-clear) instead of skipping. A genuinely progressing run, or one within an operator-raised threshold, is kept. 
Fixes #85639 * test(diagnostics): cover stale active run recovery --------- Co-authored-by: Peter Steinberger --- CHANGELOG.md | 1 + src/logging/diagnostic-session-recovery.ts | 6 + ...tic-stuck-session-recovery.runtime.test.ts | 130 ++++++++++++++++++ ...agnostic-stuck-session-recovery.runtime.ts | 71 +++++++++- src/logging/diagnostic.ts | 1 + 5 files changed, 207 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fc2667dd01..b1b4d8d9e72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ Docs: https://docs.openclaw.ai - Tests: give the memory fallback QA scenario enough turn budget to exercise native Windows gateway runs instead of failing on the client timeout while the mock agent is still dispatching. - Tests: collect QA gateway CPU/RSS metrics on native Windows and give the channel baseline enough turn budget to report slow gateway runs instead of timing out before proof. - Install/update: bypass npm `min-release-age` policies with `--min-release-age=0` instead of `--before` so hosted installers keep working on npm versions that reject the combined config. (#84749) Thanks @TeodoroRodrigo. +- Diagnostics: reclaim wedged session lanes when stale active-run bookkeeping blocks queued work despite no forward progress. Fixes #85639. Thanks @openperf. - WebChat: keep message-tool replies visible in the chat while still summarizing internal tool results for the model. Fixes #86347. Thanks @shakkernerd. - Gateway/perf: fail startup benchmark samples when the Gateway process exits before benchmark teardown, including signal deaths after readiness probes. - Gateway/perf: fail restart benchmark samples when the Gateway exits before benchmark teardown, including clean exits and signal deaths after successful restart probes. 
diff --git a/src/logging/diagnostic-session-recovery.ts b/src/logging/diagnostic-session-recovery.ts index 0f1cdf9f550..1dea8c345c8 100644 --- a/src/logging/diagnostic-session-recovery.ts +++ b/src/logging/diagnostic-session-recovery.ts @@ -28,6 +28,12 @@ export type StuckSessionRecoveryRequest = { allowActiveAbort?: boolean; expectedState?: DiagnosticSessionState; stateGeneration?: number; + /** + * Resolved no-forward-progress age (from `diagnostics.stuckSessionAbortMs`) after + * which an "active" run with queued work is treated as a leaked/dead handle and + * reclaimed. Honors an operator-raised threshold; falls back to a safe floor. + */ + staleActiveProgressAbortMs?: number; }; type DiagnosticSessionRecoveryBaseOutcome = { diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts index bd44a4b5c9b..5ce3adc57f9 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts @@ -14,6 +14,7 @@ const mocks = vi.hoisted(() => ({ resolveActiveEmbeddedRunHandleSessionId: vi.fn(), resolveEmbeddedSessionLane: vi.fn((key: string) => `session:${key}`), waitForEmbeddedPiRunEnd: vi.fn(), + getDiagnosticSessionActivitySnapshot: vi.fn(), diag: { debug: vi.fn(), warn: vi.fn(), @@ -60,6 +61,10 @@ vi.mock("./diagnostic-runtime.js", () => ({ diagnosticLogger: mocks.diag, })); +vi.mock("./diagnostic-run-activity.js", () => ({ + getDiagnosticSessionActivitySnapshot: mocks.getDiagnosticSessionActivitySnapshot, +})); + import { testing, recoverStuckDiagnosticSession, @@ -85,6 +90,10 @@ function resetMocks() { mocks.resolveActiveEmbeddedRunHandleSessionId.mockReset(); mocks.resolveEmbeddedSessionLane.mockClear(); mocks.waitForEmbeddedPiRunEnd.mockReset(); + mocks.getDiagnosticSessionActivitySnapshot.mockReset(); + // Default: no progress signal, so the staleness gate stays off unless a test + // opts in by returning a stale 
lastProgressAgeMs. + mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({}); mocks.diag.debug.mockReset(); mocks.diag.warn.mockReset(); } @@ -121,6 +130,26 @@ describe("stuck session recovery", () => { ]); }); + it("reclaims a stale active embedded run with queued work and no forward progress (#85639)", async () => { + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1"); + mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ + lastProgressAgeMs: 10 * 60_000, + }); + mocks.abortEmbeddedPiRun.mockReturnValue(true); + mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true); + + const outcome = await recoverStuckDiagnosticSession({ + sessionId: "session-1", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 1, + }); + + expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("session-1"); + expect(outcome.status).toBe("aborted"); + expect(warnLogMessages().some((m) => m.includes("reclaiming stale active run"))).toBe(true); + }); + it("aborts an active embedded run when active abort recovery is enabled", async () => { mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1"); mocks.abortEmbeddedPiRun.mockReturnValue(true); @@ -292,6 +321,107 @@ describe("stuck session recovery", () => { ]); }); + it("reclaims stale leaked reply work with queued work and no forward progress (#85639)", async () => { + mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session"); + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); + mocks.isEmbeddedPiRunActive.mockReturnValue(true); + mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false); + // The "active" run has made no forward progress for well past the staleness + // window — a leaked/dead handle, not genuine work. 
+ mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 10 * 60_000 }); + mocks.abortEmbeddedPiRun.mockReturnValue(true); + mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true); + + const outcome = await recoverStuckDiagnosticSession({ + sessionId: "queued-reply-session", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 1, + }); + + // Reclaimed (aborted) instead of skipping with active_reply_work. + expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("queued-reply-session"); + expect(outcome.status).not.toBe("skipped"); + expect(warnLogMessages().some((m) => m.includes("reclaiming stale active reply work"))).toBe( + true, + ); + }); + + it("honors an operator-raised stuck-session abort threshold for stale reclaim (#85639)", async () => { + mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session"); + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); + mocks.isEmbeddedPiRunActive.mockReturnValue(true); + mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false); + mocks.abortEmbeddedPiRun.mockReturnValue(true); + mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true); + + // Operator raised the abort threshold to 20 min to protect slow active work. + const raisedAbortMs = 20 * 60_000; + + // Below the raised threshold (10 min): keep the lane, do not reclaim. + mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 10 * 60_000 }); + const kept = await recoverStuckDiagnosticSession({ + sessionId: "queued-reply-session", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 1, + staleActiveProgressAbortMs: raisedAbortMs, + }); + expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); + expect(kept.status).toBe("skipped"); + + // Past the raised threshold (25 min): reclaim. 
+ mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 25 * 60_000 }); + const reclaimed = await recoverStuckDiagnosticSession({ + sessionId: "queued-reply-session", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 1, + staleActiveProgressAbortMs: raisedAbortMs, + }); + expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("queued-reply-session"); + expect(reclaimed.status).not.toBe("skipped"); + }); + + it("keeps the lane when active reply work is still progressing", async () => { + mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session"); + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); + mocks.isEmbeddedPiRunActive.mockReturnValue(true); + mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false); + // Recent forward progress: a genuinely active run must not be reclaimed. + mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 5_000 }); + + const outcome = await recoverStuckDiagnosticSession({ + sessionId: "queued-reply-session", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 1, + }); + + expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); + expect(outcome.status).toBe("skipped"); + expect(warnLogMessages().some((m) => m.includes("reason=active_reply_work"))).toBe(true); + }); + + it("does not reclaim stale reply work when no work is queued", async () => { + mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session"); + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); + mocks.isEmbeddedPiRunActive.mockReturnValue(true); + mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false); + mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 10 * 60_000 }); + + const outcome = await recoverStuckDiagnosticSession({ + sessionId: "queued-reply-session", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 0, + }); + + 
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); + expect(outcome.status).toBe("skipped"); + expect(warnLogMessages().some((m) => m.includes("reason=active_reply_work"))).toBe(true); + }); + it("aborts stale reply work without an embedded handle when active abort recovery is enabled", async () => { mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session"); mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.ts index ff0d3523409..ee382ebde93 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.ts @@ -7,6 +7,7 @@ import { resolveActiveEmbeddedRunHandleSessionId, } from "../agents/pi-embedded-runner/runs.js"; import { getCommandLaneSnapshot, resetCommandLane } from "../process/command-queue.js"; +import { getDiagnosticSessionActivitySnapshot } from "./diagnostic-run-activity.js"; import { diagnosticLogger as diag } from "./diagnostic-runtime.js"; import { formatStoppedCronSessionDiagnosticFields, @@ -20,6 +21,42 @@ import { import { isDiagnosticSessionStateCurrent } from "./diagnostic-session-state.js"; const STUCK_SESSION_ABORT_SETTLE_MS = 15_000; +// Default no-forward-progress age used only when the caller does not carry a +// resolved `diagnostics.stuckSessionAbortMs`. A run flagged "active" that has made +// no forward progress (tool/model/chunk events) for at least the resolved window, +// while queued work waits, is treated as a leaked/dead handle and reclaimed even +// without an explicit active-abort grant. `lastProgressAgeMs` tracks real progress +// (not incoming queued messages), so it keeps growing while a lane is wedged. 
+const STUCK_SESSION_PROGRESS_STALE_MS = 5 * 60_000; + +function resolveStaleActiveProgressAbortMs(params: StuckSessionRecoveryParams): number { + const configured = params.staleActiveProgressAbortMs; + // Honor the resolved `diagnostics.stuckSessionAbortMs` as-is — an operator can + // raise it to protect slow active work (it is the same threshold the existing + // `session.stalled` abort uses). It is floored at the warn threshold upstream, + // not necessarily 5 min, so we only apply the 5-min default when no value is + // carried (e.g. direct callers). + return typeof configured === "number" && configured > 0 + ? configured + : STUCK_SESSION_PROGRESS_STALE_MS; +} + +function isActiveRunProgressStale(params: { + sessionId?: string; + sessionKey?: string; + queueDepth?: number; + staleAbortMs: number; +}): boolean { + if ((params.queueDepth ?? 0) <= 0) { + return false; + } + const activity = getDiagnosticSessionActivitySnapshot({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + }); + const lastProgressAgeMs = activity.lastProgressAgeMs; + return typeof lastProgressAgeMs === "number" && lastProgressAgeMs >= params.staleAbortMs; +} const recoveriesInFlight = new Set(); export type StuckSessionRecoveryParams = StuckSessionRecoveryRequest; @@ -100,9 +137,18 @@ export async function recoverStuckDiagnosticSession( let aborted = false; let drained = true; let forceCleared = false; + const staleActiveProgressAbortMs = resolveStaleActiveProgressAbortMs(params); if (activeSessionId) { - if (params.allowActiveAbort !== true) { + const reclaimStaleActiveRun = + params.allowActiveAbort !== true && + isActiveRunProgressStale({ + sessionId: activeSessionId, + sessionKey: params.sessionKey, + queueDepth: params.queueDepth, + staleAbortMs: staleActiveProgressAbortMs, + }); + if (params.allowActiveAbort !== true && !reclaimStaleActiveRun) { const outcome: StuckSessionRecoveryOutcome = { status: "skipped", action: "observe_only", @@ -118,6 +164,11 @@ export async 
function recoverStuckDiagnosticSession( diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`); return outcome; } + if (reclaimStaleActiveRun) { + diag.warn( + `stuck session recovery reclaiming stale active run: ${formatRecoveryContext(params, { activeSessionId })}`, + ); + } const result = await abortAndDrainEmbeddedPiRun({ sessionId: activeSessionId, sessionKey: params.sessionKey, @@ -131,7 +182,23 @@ export async function recoverStuckDiagnosticSession( } if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) { - if (params.allowActiveAbort === true) { + const reclaimStaleReplyWork = + params.allowActiveAbort !== true && + isActiveRunProgressStale({ + sessionId: activeWorkSessionId, + sessionKey: params.sessionKey, + queueDepth: params.queueDepth, + staleAbortMs: staleActiveProgressAbortMs, + }); + if (params.allowActiveAbort === true || reclaimStaleReplyWork) { + if (reclaimStaleReplyWork) { + diag.warn( + `stuck session recovery reclaiming stale active reply work: ${formatRecoveryContext( + params, + { activeSessionId: activeWorkSessionId }, + )}`, + ); + } const result = await abortAndDrainEmbeddedPiRun({ sessionId: activeWorkSessionId, sessionKey: params.sessionKey, diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index 3b792edef2a..db814ef4326 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -1288,6 +1288,7 @@ export function startDiagnosticHeartbeat( queueDepth: state.queueDepth, expectedState: state.state, stateGeneration: state.generation, + staleActiveProgressAbortMs: stuckSessionAbortMs, }, }); } else if (