fix(diagnostics): reclaim wedged session lanes with a stale leaked active run (#86056)

* fix(diagnostics): reclaim wedged session lanes with a stale leaked active run

A group session lane could wedge permanently (#85639): an embedded run that dies
abnormally leaves a stale ACTIVE_EMBEDDED_RUNS handle, so the diagnostic heartbeat
classifies the lane stale_session_state (recoveryEligible without allowActiveAbort)
while stuck-session recovery reads the leaked isEmbeddedPiRunActive flag and skips
with active_reply_work — a tautology that keeps the lane forever. The age-based
escape never fires because ageMs (last-activity) resets on every incoming queued
message.

Make the active-run skip a liveness check: before keeping the lane, consult the
run's real forward-progress age (lastProgressAgeMs, not refreshed by incoming
messages). If a run flagged active has made no forward progress past the resolved
diagnostics.stuckSessionAbortMs threshold (threaded through the recovery request;
falls back to a 5-minute default when none is carried) with queued work waiting, treat it as a
leaked/dead handle and reclaim it (abort + drain + force-clear) instead of
skipping. A genuinely progressing run, or one within an operator-raised
threshold, is kept.

Fixes #85639

* test(diagnostics): cover stale active run recovery

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Chunyue Wang
2026-05-25 21:20:59 +08:00
committed by GitHub
parent e761eb8f3e
commit 6f76d9f246
5 changed files with 207 additions and 2 deletions

View File

@@ -41,6 +41,7 @@ Docs: https://docs.openclaw.ai
- Tests: give the memory fallback QA scenario enough turn budget to exercise native Windows gateway runs instead of failing on the client timeout while the mock agent is still dispatching.
- Tests: collect QA gateway CPU/RSS metrics on native Windows and give the channel baseline enough turn budget to report slow gateway runs instead of timing out before proof.
- Install/update: bypass npm `min-release-age` policies with `--min-release-age=0` instead of `--before` so hosted installers keep working on npm versions that reject the combined config. (#84749) Thanks @TeodoroRodrigo.
- Diagnostics: reclaim wedged session lanes when stale active-run bookkeeping blocks queued work despite no forward progress. Fixes #85639. Thanks @openperf.
- WebChat: keep message-tool replies visible in the chat while still summarizing internal tool results for the model. Fixes #86347. Thanks @shakkernerd.
- Gateway/perf: fail startup benchmark samples when the Gateway process exits before benchmark teardown, including signal deaths after readiness probes.
- Gateway/perf: fail restart benchmark samples when the Gateway exits before benchmark teardown, including clean exits and signal deaths after successful restart probes.

View File

@@ -28,6 +28,12 @@ export type StuckSessionRecoveryRequest = {
allowActiveAbort?: boolean;
expectedState?: DiagnosticSessionState;
stateGeneration?: number;
/**
* Resolved no-forward-progress age (from `diagnostics.stuckSessionAbortMs`) after
* which an "active" run with queued work is treated as a leaked/dead handle and
* reclaimed. Honors an operator-raised threshold; falls back to a safe floor.
*/
staleActiveProgressAbortMs?: number;
};
type DiagnosticSessionRecoveryBaseOutcome = {

View File

@@ -14,6 +14,7 @@ const mocks = vi.hoisted(() => ({
resolveActiveEmbeddedRunHandleSessionId: vi.fn(),
resolveEmbeddedSessionLane: vi.fn((key: string) => `session:${key}`),
waitForEmbeddedPiRunEnd: vi.fn(),
getDiagnosticSessionActivitySnapshot: vi.fn(),
diag: {
debug: vi.fn(),
warn: vi.fn(),
@@ -60,6 +61,10 @@ vi.mock("./diagnostic-runtime.js", () => ({
diagnosticLogger: mocks.diag,
}));
vi.mock("./diagnostic-run-activity.js", () => ({
getDiagnosticSessionActivitySnapshot: mocks.getDiagnosticSessionActivitySnapshot,
}));
import {
testing,
recoverStuckDiagnosticSession,
@@ -85,6 +90,10 @@ function resetMocks() {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReset();
mocks.resolveEmbeddedSessionLane.mockClear();
mocks.waitForEmbeddedPiRunEnd.mockReset();
mocks.getDiagnosticSessionActivitySnapshot.mockReset();
// Default: no progress signal, so the staleness gate stays off unless a test
// opts in by returning a stale lastProgressAgeMs.
mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({});
mocks.diag.debug.mockReset();
mocks.diag.warn.mockReset();
}
@@ -121,6 +130,26 @@ describe("stuck session recovery", () => {
]);
});
it("reclaims a stale active embedded run with queued work and no forward progress (#85639)", async () => {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1");
mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({
lastProgressAgeMs: 10 * 60_000,
});
mocks.abortEmbeddedPiRun.mockReturnValue(true);
mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true);
const outcome = await recoverStuckDiagnosticSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
ageMs: 180_000,
queueDepth: 1,
});
expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("session-1");
expect(outcome.status).toBe("aborted");
expect(warnLogMessages().some((m) => m.includes("reclaiming stale active run"))).toBe(true);
});
it("aborts an active embedded run when active abort recovery is enabled", async () => {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1");
mocks.abortEmbeddedPiRun.mockReturnValue(true);
@@ -292,6 +321,107 @@ describe("stuck session recovery", () => {
]);
});
it("reclaims stale leaked reply work with queued work and no forward progress (#85639)", async () => {
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session");
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
mocks.isEmbeddedPiRunActive.mockReturnValue(true);
mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false);
// The "active" run has made no forward progress for well past the staleness
// window — a leaked/dead handle, not genuine work.
mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 10 * 60_000 });
mocks.abortEmbeddedPiRun.mockReturnValue(true);
mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true);
const outcome = await recoverStuckDiagnosticSession({
sessionId: "queued-reply-session",
sessionKey: "agent:main:main",
ageMs: 180_000,
queueDepth: 1,
});
// Reclaimed (aborted) instead of skipping with active_reply_work.
expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("queued-reply-session");
expect(outcome.status).not.toBe("skipped");
expect(warnLogMessages().some((m) => m.includes("reclaiming stale active reply work"))).toBe(
true,
);
});
it("honors an operator-raised stuck-session abort threshold for stale reclaim (#85639)", async () => {
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session");
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
mocks.isEmbeddedPiRunActive.mockReturnValue(true);
mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false);
mocks.abortEmbeddedPiRun.mockReturnValue(true);
mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true);
// Operator raised the abort threshold to 20 min to protect slow active work.
const raisedAbortMs = 20 * 60_000;
// Below the raised threshold (10 min): keep the lane, do not reclaim.
mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 10 * 60_000 });
const kept = await recoverStuckDiagnosticSession({
sessionId: "queued-reply-session",
sessionKey: "agent:main:main",
ageMs: 180_000,
queueDepth: 1,
staleActiveProgressAbortMs: raisedAbortMs,
});
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
expect(kept.status).toBe("skipped");
// Past the raised threshold (25 min): reclaim.
mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 25 * 60_000 });
const reclaimed = await recoverStuckDiagnosticSession({
sessionId: "queued-reply-session",
sessionKey: "agent:main:main",
ageMs: 180_000,
queueDepth: 1,
staleActiveProgressAbortMs: raisedAbortMs,
});
expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("queued-reply-session");
expect(reclaimed.status).not.toBe("skipped");
});
it("keeps the lane when active reply work is still progressing", async () => {
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session");
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
mocks.isEmbeddedPiRunActive.mockReturnValue(true);
mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false);
// Recent forward progress: a genuinely active run must not be reclaimed.
mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 5_000 });
const outcome = await recoverStuckDiagnosticSession({
sessionId: "queued-reply-session",
sessionKey: "agent:main:main",
ageMs: 180_000,
queueDepth: 1,
});
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
expect(outcome.status).toBe("skipped");
expect(warnLogMessages().some((m) => m.includes("reason=active_reply_work"))).toBe(true);
});
it("does not reclaim stale reply work when no work is queued", async () => {
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session");
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
mocks.isEmbeddedPiRunActive.mockReturnValue(true);
mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false);
mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({ lastProgressAgeMs: 10 * 60_000 });
const outcome = await recoverStuckDiagnosticSession({
sessionId: "queued-reply-session",
sessionKey: "agent:main:main",
ageMs: 180_000,
queueDepth: 0,
});
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
expect(outcome.status).toBe("skipped");
expect(warnLogMessages().some((m) => m.includes("reason=active_reply_work"))).toBe(true);
});
it("aborts stale reply work without an embedded handle when active abort recovery is enabled", async () => {
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session");
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);

View File

@@ -7,6 +7,7 @@ import {
resolveActiveEmbeddedRunHandleSessionId,
} from "../agents/pi-embedded-runner/runs.js";
import { getCommandLaneSnapshot, resetCommandLane } from "../process/command-queue.js";
import { getDiagnosticSessionActivitySnapshot } from "./diagnostic-run-activity.js";
import { diagnosticLogger as diag } from "./diagnostic-runtime.js";
import {
formatStoppedCronSessionDiagnosticFields,
@@ -20,6 +21,42 @@ import {
import { isDiagnosticSessionStateCurrent } from "./diagnostic-session-state.js";
const STUCK_SESSION_ABORT_SETTLE_MS = 15_000;
// Default no-forward-progress age used only when the caller does not carry a
// resolved `diagnostics.stuckSessionAbortMs`. A run flagged "active" that has made
// no forward progress (tool/model/chunk events) for at least the resolved window,
// while queued work waits, is treated as a leaked/dead handle and reclaimed even
// without an explicit active-abort grant. `lastProgressAgeMs` tracks real progress
// (not incoming queued messages), so it keeps growing while a lane is wedged.
const STUCK_SESSION_PROGRESS_STALE_MS = 5 * 60_000;
function resolveStaleActiveProgressAbortMs(params: StuckSessionRecoveryParams): number {
const configured = params.staleActiveProgressAbortMs;
// Honor the resolved `diagnostics.stuckSessionAbortMs` as-is — an operator can
// raise it to protect slow active work (it is the same threshold the existing
// `session.stalled` abort uses). It is floored at the warn threshold upstream,
// not necessarily 5 min, so we only apply the 5-min default when no value is
// carried (e.g. direct callers).
return typeof configured === "number" && configured > 0
? configured
: STUCK_SESSION_PROGRESS_STALE_MS;
}
function isActiveRunProgressStale(params: {
sessionId?: string;
sessionKey?: string;
queueDepth?: number;
staleAbortMs: number;
}): boolean {
if ((params.queueDepth ?? 0) <= 0) {
return false;
}
const activity = getDiagnosticSessionActivitySnapshot({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
});
const lastProgressAgeMs = activity.lastProgressAgeMs;
return typeof lastProgressAgeMs === "number" && lastProgressAgeMs >= params.staleAbortMs;
}
const recoveriesInFlight = new Set<string>();
export type StuckSessionRecoveryParams = StuckSessionRecoveryRequest;
@@ -100,9 +137,18 @@ export async function recoverStuckDiagnosticSession(
let aborted = false;
let drained = true;
let forceCleared = false;
const staleActiveProgressAbortMs = resolveStaleActiveProgressAbortMs(params);
if (activeSessionId) {
if (params.allowActiveAbort !== true) {
const reclaimStaleActiveRun =
params.allowActiveAbort !== true &&
isActiveRunProgressStale({
sessionId: activeSessionId,
sessionKey: params.sessionKey,
queueDepth: params.queueDepth,
staleAbortMs: staleActiveProgressAbortMs,
});
if (params.allowActiveAbort !== true && !reclaimStaleActiveRun) {
const outcome: StuckSessionRecoveryOutcome = {
status: "skipped",
action: "observe_only",
@@ -118,6 +164,11 @@ export async function recoverStuckDiagnosticSession(
diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`);
return outcome;
}
if (reclaimStaleActiveRun) {
diag.warn(
`stuck session recovery reclaiming stale active run: ${formatRecoveryContext(params, { activeSessionId })}`,
);
}
const result = await abortAndDrainEmbeddedPiRun({
sessionId: activeSessionId,
sessionKey: params.sessionKey,
@@ -131,7 +182,23 @@ export async function recoverStuckDiagnosticSession(
}
if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) {
if (params.allowActiveAbort === true) {
const reclaimStaleReplyWork =
params.allowActiveAbort !== true &&
isActiveRunProgressStale({
sessionId: activeWorkSessionId,
sessionKey: params.sessionKey,
queueDepth: params.queueDepth,
staleAbortMs: staleActiveProgressAbortMs,
});
if (params.allowActiveAbort === true || reclaimStaleReplyWork) {
if (reclaimStaleReplyWork) {
diag.warn(
`stuck session recovery reclaiming stale active reply work: ${formatRecoveryContext(
params,
{ activeSessionId: activeWorkSessionId },
)}`,
);
}
const result = await abortAndDrainEmbeddedPiRun({
sessionId: activeWorkSessionId,
sessionKey: params.sessionKey,

View File

@@ -1288,6 +1288,7 @@ export function startDiagnosticHeartbeat(
queueDepth: state.queueDepth,
expectedState: state.state,
stateGeneration: state.generation,
staleActiveProgressAbortMs: stuckSessionAbortMs,
},
});
} else if (