fix(gateway): recover stale session lanes

This commit is contained in:
Peter Steinberger
2026-04-28 20:36:40 +01:00
parent 933c7968dc
commit c500e8704f
14 changed files with 729 additions and 5 deletions

View File

@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
- Channels/Discord: suppress duplicate gateway monitors when multiple enabled accounts resolve to the same bot token, preferring config tokens over default env fallback and reporting skipped duplicates as disabled. Supersedes #73608. Thanks @kagura-agent.
- Channels/Discord: ignore stale route-shaped conversation bindings after a Discord channel is reconfigured to another agent, while preserving explicit focus and subagent bindings. Fixes #73626. Thanks @ramitrkar-hash.
- Gateway/sessions: add conservative stuck-session recovery that releases only stale session lanes while active embedded runs, reply operations, and lane tasks remain serialized, so queued follow-ups can drain without aborting legitimate long-running turns. Refs #73581, #73655, #73652, #73705, #73647, #73602, #73592, and #73601. Thanks @WS-Q0758, @bryangauvin, @spenceryang1996-dot, @bmilne1981, @mattmcintyre, @Vksh07, and @Spolen23.
- NVIDIA/NIM: persist the `NVIDIA_API_KEY` provider marker and mark bundled NVIDIA Chat Completions models as string-content compatible, so NIM models load from `models.json` and OpenAI-compatible subagent calls send plain text content. Fixes #73013 and #50107; refs #73014. Thanks @bautrey, @iot2edge, @ifearghal, and @futhgar.
- Channels/Discord: let text-only configs drop the `GuildVoiceStates` gateway intent and expose a bounded `/gateway/bot` metadata timeout with rate-limited fallback logs, reducing idle CPU and warning floods. Fixes #73709 and #73585. Thanks @sanchezm86 and @trac3r00.
- Agents/sessions: mark same-turn `sessions_send` and A2A reply prompts with an inter-session `isUser=false` envelope before they reach the model, so foreign session output no longer lands as bare active user text. Fixes #73702; refs #73698, #73609, #73595, and #73622. Thanks @alvelda.

View File

@@ -162,6 +162,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism
- `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides.
- Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer.
- Stuck-session recovery: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` detects long `processing` sessions. Active embedded runs, active reply operations, and active session-lane tasks remain warning-only by default; if diagnostics show no active work for the session, the watchdog releases the affected session lane so queued startup work can drain.
- Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers.<id>.timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout.
- Provider HTTP request timeout: `models.providers.<id>.timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout.

View File

@@ -85,6 +85,7 @@ Defaults: `debounceMs: 1000`, `cap: 20`, `drop: summarize`.
- If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining.
- If you need queue depth, enable verbose logs and watch for queue timing lines.
- When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` log a stuck-session warning. Active embedded runs, active reply operations, and active lane tasks remain warning-only by default; stale startup bookkeeping with no active session work can release the affected session lane so queued work drains.
## Related

View File

@@ -6,7 +6,9 @@ import {
clearActiveEmbeddedRun,
consumeEmbeddedRunModelSwitch,
getActiveEmbeddedRunSnapshot,
isEmbeddedPiRunHandleActive,
requestEmbeddedRunModelSwitch,
resolveActiveEmbeddedRunHandleSessionId,
setActiveEmbeddedRun,
updateActiveEmbeddedRunSnapshot,
waitForActiveEmbeddedRuns,
@@ -124,6 +126,23 @@ describe("pi-embedded runner run registry", () => {
}
});
it("tracks actual embedded handles separately from reply-operation ownership", () => {
const handle = createRunHandle();
expect(isEmbeddedPiRunHandleActive("session-a")).toBe(false);
expect(resolveActiveEmbeddedRunHandleSessionId("agent:main:main")).toBeUndefined();
setActiveEmbeddedRun("session-a", handle, "agent:main:main");
expect(isEmbeddedPiRunHandleActive("session-a")).toBe(true);
expect(resolveActiveEmbeddedRunHandleSessionId("agent:main:main")).toBe("session-a");
clearActiveEmbeddedRun("session-a", handle, "agent:main:main");
expect(isEmbeddedPiRunHandleActive("session-a")).toBe(false);
expect(resolveActiveEmbeddedRunHandleSessionId("agent:main:main")).toBeUndefined();
});
it("tracks and clears per-session transcript snapshots for active runs", () => {
const handle = createRunHandle();

View File

@@ -1,6 +1,7 @@
import {
abortActiveReplyRuns,
abortReplyRunBySessionId,
forceClearReplyRunBySessionId,
isReplyRunActiveForSessionId,
isReplyRunStreamingForSessionId,
queueReplyRunMessage,
@@ -157,6 +158,14 @@ export function isEmbeddedPiRunActive(sessionId: string): boolean {
return active;
}
export function isEmbeddedPiRunHandleActive(sessionId: string): boolean {
const active = ACTIVE_EMBEDDED_RUNS.has(sessionId);
if (active) {
diag.debug(`run handle active check: sessionId=${sessionId} active=true`);
}
return active;
}
export function isEmbeddedPiRunStreaming(sessionId: string): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) {
@@ -165,6 +174,14 @@ export function isEmbeddedPiRunStreaming(sessionId: string): boolean {
return handle.isStreaming();
}
export function resolveActiveEmbeddedRunHandleSessionId(sessionKey: string): string | undefined {
const normalizedSessionKey = sessionKey.trim();
if (!normalizedSessionKey) {
return undefined;
}
return ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY.get(normalizedSessionKey);
}
export function resolveActiveEmbeddedRunSessionId(sessionKey: string): string | undefined {
const normalizedSessionKey = sessionKey.trim();
if (!normalizedSessionKey) {
@@ -355,6 +372,25 @@ export function clearActiveEmbeddedRun(
}
}
export function forceClearEmbeddedPiRun(
sessionId: string,
sessionKey?: string,
reason = "stuck_recovery",
): boolean {
let cleared = false;
if (ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
ACTIVE_EMBEDDED_RUNS.delete(sessionId);
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.delete(sessionId);
EMBEDDED_RUN_MODEL_SWITCH_REQUESTS.delete(sessionId);
clearActiveRunSessionKeys(sessionId, sessionKey);
logSessionStateChange({ sessionId, sessionKey, state: "idle", reason });
notifyEmbeddedRunEnded(sessionId);
cleared = true;
}
const cause = new Error(`Embedded run force-cleared by ${reason}`);
return forceClearReplyRunBySessionId(sessionId, cause) || cleared;
}
export const __testing = {
resetActiveEmbeddedRuns() {
for (const waiters of EMBEDDED_RUN_WAITERS.values()) {

View File

@@ -3,6 +3,7 @@ import {
__testing,
abortActiveReplyRuns,
createReplyOperation,
forceClearReplyRunBySessionId,
isReplyRunActiveForSessionId,
queueReplyRunMessage,
replyRunRegistry,
@@ -65,6 +66,39 @@ describe("reply run registry", () => {
expect(replyRunRegistry.isActive("agent:main:main")).toBe(false);
});
it("force-clears a running operation after abort without backend cleanup", async () => {
vi.useFakeTimers();
try {
const cancel = vi.fn();
const operation = createReplyOperation({
sessionKey: "agent:main:main",
sessionId: "session-running",
resetTriggered: false,
});
operation.attachBackend({
kind: "embedded",
cancel,
isStreaming: () => true,
});
operation.setPhase("running");
operation.abortByUser();
const waitPromise = waitForReplyRunEndBySessionId("session-running", 1_000);
expect(operation.result).toEqual({ kind: "aborted", code: "aborted_by_user" });
expect(cancel).toHaveBeenCalledWith("user_abort");
expect(isReplyRunActiveForSessionId("session-running")).toBe(true);
expect(forceClearReplyRunBySessionId("session-running", new Error("stuck"))).toBe(true);
expect(isReplyRunActiveForSessionId("session-running")).toBe(false);
await expect(waitPromise).resolves.toBe(true);
} finally {
await vi.runOnlyPendingTimersAsync();
vi.useRealTimers();
}
});
it("queues messages only through the active running backend", async () => {
const queueMessage = vi.fn(async () => {});
const operation = createReplyOperation({

View File

@@ -479,6 +479,15 @@ export function abortReplyRunBySessionId(sessionId: string): boolean {
return true;
}
export function forceClearReplyRunBySessionId(sessionId: string, cause?: unknown): boolean {
const operation = resolveReplyRunForCurrentSessionId(sessionId);
if (!operation) {
return false;
}
operation.fail("run_failed", cause);
return true;
}
export function waitForReplyRunEndBySessionId(
sessionId: string,
timeoutMs = 15_000,

View File

@@ -0,0 +1,90 @@
import { afterEach, describe, expect, it } from "vitest";
import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js";
import {
__testing as replyRunTesting,
createReplyOperation,
} from "../auto-reply/reply/reply-run-registry.js";
import {
enqueueCommandInLane,
getQueueSize,
resetCommandLane,
resetCommandQueueStateForTest,
} from "../process/command-queue.js";
import {
__testing as recoveryTesting,
recoverStuckDiagnosticSession,
} from "./diagnostic-stuck-session-recovery.runtime.js";
function delay(ms: number): Promise<"blocked"> {
return new Promise((resolve) => setTimeout(() => resolve("blocked"), ms));
}
describe("stuck session recovery integration", () => {
afterEach(() => {
recoveryTesting.resetRecoveriesInFlight();
replyRunTesting.resetReplyRunRegistry();
resetCommandQueueStateForTest();
});
it("does not reset a blocked lane while a reply operation is still active", async () => {
const sessionKey = "agent:main:active-reply";
const sessionId = "active-reply-session";
const lane = resolveEmbeddedSessionLane(sessionKey);
void enqueueCommandInLane(lane, () => new Promise<never>(() => {}), {
warnAfterMs: Number.MAX_SAFE_INTEGER,
});
const queued = enqueueCommandInLane(lane, async () => "drained", {
warnAfterMs: Number.MAX_SAFE_INTEGER,
});
const operation = createReplyOperation({
sessionKey,
sessionId,
resetTriggered: false,
});
expect(getQueueSize(lane)).toBe(2);
await recoverStuckDiagnosticSession({
sessionId,
sessionKey,
ageMs: 180_000,
queueDepth: 1,
});
await expect(Promise.race([queued, delay(100)])).resolves.toBe("blocked");
expect(getQueueSize(lane)).toBe(2);
operation.complete();
expect(resetCommandLane(lane)).toBe(1);
await expect(queued).resolves.toBe("drained");
});
it("does not reset a blocked lane while unregistered lane work is still active", async () => {
const sessionKey = "agent:main:unregistered-work";
const sessionId = "unregistered-work-session";
const lane = resolveEmbeddedSessionLane(sessionKey);
void enqueueCommandInLane(lane, () => new Promise<never>(() => {}), {
warnAfterMs: Number.MAX_SAFE_INTEGER,
});
const queued = enqueueCommandInLane(lane, async () => "drained", {
warnAfterMs: Number.MAX_SAFE_INTEGER,
});
expect(getQueueSize(lane)).toBe(2);
await recoverStuckDiagnosticSession({
sessionId,
sessionKey,
ageMs: 180_000,
queueDepth: 1,
});
await expect(Promise.race([queued, delay(100)])).resolves.toBe("blocked");
expect(getQueueSize(lane)).toBe(2);
expect(resetCommandLane(lane)).toBe(1);
await expect(queued).resolves.toBe("drained");
});
});

View File

@@ -0,0 +1,251 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const mocks = vi.hoisted(() => ({
abortEmbeddedPiRun: vi.fn(),
forceClearEmbeddedPiRun: vi.fn(),
isEmbeddedPiRunActive: vi.fn(),
isEmbeddedPiRunHandleActive: vi.fn(),
getCommandLaneSnapshot: vi.fn(),
resetCommandLane: vi.fn(),
resolveActiveEmbeddedRunSessionId: vi.fn(),
resolveActiveEmbeddedRunHandleSessionId: vi.fn(),
resolveEmbeddedSessionLane: vi.fn((key: string) => `session:${key}`),
waitForEmbeddedPiRunEnd: vi.fn(),
diag: {
debug: vi.fn(),
warn: vi.fn(),
},
}));
vi.mock("../agents/pi-embedded-runner/runs.js", () => ({
abortEmbeddedPiRun: mocks.abortEmbeddedPiRun,
forceClearEmbeddedPiRun: mocks.forceClearEmbeddedPiRun,
isEmbeddedPiRunActive: mocks.isEmbeddedPiRunActive,
isEmbeddedPiRunHandleActive: mocks.isEmbeddedPiRunHandleActive,
resolveActiveEmbeddedRunSessionId: mocks.resolveActiveEmbeddedRunSessionId,
resolveActiveEmbeddedRunHandleSessionId: mocks.resolveActiveEmbeddedRunHandleSessionId,
waitForEmbeddedPiRunEnd: mocks.waitForEmbeddedPiRunEnd,
}));
vi.mock("../agents/pi-embedded-runner/lanes.js", () => ({
resolveEmbeddedSessionLane: mocks.resolveEmbeddedSessionLane,
}));
vi.mock("../process/command-queue.js", () => ({
getCommandLaneSnapshot: mocks.getCommandLaneSnapshot,
resetCommandLane: mocks.resetCommandLane,
}));
vi.mock("./diagnostic-runtime.js", () => ({
diagnosticLogger: mocks.diag,
}));
import {
__testing,
recoverStuckDiagnosticSession,
} from "./diagnostic-stuck-session-recovery.runtime.js";
function resetMocks() {
__testing.resetRecoveriesInFlight();
mocks.abortEmbeddedPiRun.mockReset();
mocks.forceClearEmbeddedPiRun.mockReset();
mocks.isEmbeddedPiRunActive.mockReset();
mocks.isEmbeddedPiRunHandleActive.mockReset();
mocks.getCommandLaneSnapshot.mockReset();
mocks.getCommandLaneSnapshot.mockReturnValue({
lane: "session:agent:main:main",
queuedCount: 1,
activeCount: 0,
maxConcurrent: 1,
draining: false,
generation: 0,
});
mocks.resetCommandLane.mockReset();
mocks.resolveActiveEmbeddedRunSessionId.mockReset();
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReset();
mocks.resolveEmbeddedSessionLane.mockClear();
mocks.waitForEmbeddedPiRunEnd.mockReset();
mocks.diag.debug.mockReset();
mocks.diag.warn.mockReset();
}
describe("stuck session recovery", () => {
beforeEach(() => {
resetMocks();
});
it("does not abort an active embedded run by default", async () => {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1");
await recoverStuckDiagnosticSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
ageMs: 180_000,
queueDepth: 1,
});
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
expect(mocks.waitForEmbeddedPiRunEnd).not.toHaveBeenCalled();
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
});
it("aborts an active embedded run when active abort recovery is enabled", async () => {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1");
mocks.abortEmbeddedPiRun.mockReturnValue(true);
mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true);
await recoverStuckDiagnosticSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
ageMs: 180_000,
allowActiveAbort: true,
});
expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("session-1");
expect(mocks.waitForEmbeddedPiRunEnd).toHaveBeenCalledWith("session-1", 15_000);
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
});
it("force-clears and releases the session lane when abort cleanup does not drain", async () => {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1");
mocks.abortEmbeddedPiRun.mockReturnValue(true);
mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(false);
mocks.resetCommandLane.mockReturnValue(1);
await recoverStuckDiagnosticSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
ageMs: 240_000,
allowActiveAbort: true,
});
expect(mocks.forceClearEmbeddedPiRun).toHaveBeenCalledWith(
"session-1",
"agent:main:main",
"stuck_recovery",
);
expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main");
});
it("force-clears and releases the session lane when an active run cannot be aborted", async () => {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1");
mocks.abortEmbeddedPiRun.mockReturnValue(false);
mocks.resetCommandLane.mockReturnValue(1);
await recoverStuckDiagnosticSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
ageMs: 240_000,
allowActiveAbort: true,
});
expect(mocks.waitForEmbeddedPiRunEnd).not.toHaveBeenCalled();
expect(mocks.forceClearEmbeddedPiRun).toHaveBeenCalledWith(
"session-1",
"agent:main:main",
"stuck_recovery",
);
expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main");
});
it("releases a stale session lane when diagnostics are processing but no active run exists", async () => {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
mocks.resetCommandLane.mockReturnValue(1);
await recoverStuckDiagnosticSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
ageMs: 180_000,
});
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main");
});
it("does not release the session lane while reply work is active without an embedded handle", async () => {
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session");
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
mocks.isEmbeddedPiRunActive.mockReturnValue(true);
mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false);
await recoverStuckDiagnosticSession({
sessionId: "queued-reply-session",
sessionKey: "agent:main:main",
ageMs: 180_000,
queueDepth: 1,
});
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
});
it("does not release the session lane while unregistered lane work is active", async () => {
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined);
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
mocks.isEmbeddedPiRunActive.mockReturnValue(false);
mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false);
mocks.getCommandLaneSnapshot.mockReturnValue({
lane: "session:agent:main:main",
queuedCount: 1,
activeCount: 1,
maxConcurrent: 1,
draining: false,
generation: 0,
});
await recoverStuckDiagnosticSession({
sessionId: "unregistered-work-session",
sessionKey: "agent:main:main",
ageMs: 180_000,
queueDepth: 1,
});
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
});
it("releases a stale session-id lane when no session key is available", async () => {
mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false);
mocks.resetCommandLane.mockReturnValue(1);
await recoverStuckDiagnosticSession({
sessionId: "session-only",
ageMs: 180_000,
});
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
expect(mocks.resolveEmbeddedSessionLane).toHaveBeenCalledWith("session-only");
expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:session-only");
});
it("coalesces duplicate recovery attempts for the same session", async () => {
let resolveWait!: (value: boolean) => void;
const waitPromise = new Promise<boolean>((resolve) => {
resolveWait = resolve;
});
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1");
mocks.abortEmbeddedPiRun.mockReturnValue(true);
mocks.waitForEmbeddedPiRunEnd.mockReturnValue(waitPromise);
const first = recoverStuckDiagnosticSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
ageMs: 180_000,
allowActiveAbort: true,
});
await recoverStuckDiagnosticSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
ageMs: 210_000,
allowActiveAbort: true,
});
expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledTimes(1);
resolveWait(true);
await first;
});
});

View File

@@ -0,0 +1,120 @@
import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js";
import {
abortEmbeddedPiRun,
forceClearEmbeddedPiRun,
isEmbeddedPiRunActive,
isEmbeddedPiRunHandleActive,
resolveActiveEmbeddedRunSessionId,
resolveActiveEmbeddedRunHandleSessionId,
waitForEmbeddedPiRunEnd,
} from "../agents/pi-embedded-runner/runs.js";
import { getCommandLaneSnapshot, resetCommandLane } from "../process/command-queue.js";
import { diagnosticLogger as diag } from "./diagnostic-runtime.js";
const STUCK_SESSION_ABORT_SETTLE_MS = 15_000;
const recoveriesInFlight = new Set<string>();
export type StuckSessionRecoveryParams = {
sessionId?: string;
sessionKey?: string;
ageMs: number;
queueDepth?: number;
allowActiveAbort?: boolean;
};
function recoveryKey(params: StuckSessionRecoveryParams): string | undefined {
return params.sessionKey?.trim() || params.sessionId?.trim() || undefined;
}
export async function recoverStuckDiagnosticSession(
params: StuckSessionRecoveryParams,
): Promise<void> {
const key = recoveryKey(params);
if (!key || recoveriesInFlight.has(key)) {
return;
}
recoveriesInFlight.add(key);
try {
const fallbackActiveSessionId =
params.sessionId && isEmbeddedPiRunHandleActive(params.sessionId)
? params.sessionId
: undefined;
const activeSessionId = params.sessionKey
? (resolveActiveEmbeddedRunHandleSessionId(params.sessionKey) ?? fallbackActiveSessionId)
: fallbackActiveSessionId;
const activeWorkSessionId = params.sessionKey
? (resolveActiveEmbeddedRunSessionId(params.sessionKey) ?? params.sessionId)
: params.sessionId;
const laneKey = params.sessionKey?.trim() || params.sessionId?.trim();
const sessionLane = laneKey ? resolveEmbeddedSessionLane(laneKey) : null;
let aborted = false;
let drained = true;
if (activeSessionId) {
if (params.allowActiveAbort !== true) {
diag.debug(
`stuck session recovery skipped active abort: sessionId=${
params.sessionId ?? activeSessionId
} sessionKey=${params.sessionKey ?? "unknown"} age=${Math.round(
params.ageMs / 1000,
)}s queueDepth=${params.queueDepth ?? 0}`,
);
return;
}
aborted = abortEmbeddedPiRun(activeSessionId);
if (aborted) {
drained = await waitForEmbeddedPiRunEnd(activeSessionId, STUCK_SESSION_ABORT_SETTLE_MS);
}
if (!aborted || !drained) {
forceClearEmbeddedPiRun(activeSessionId, params.sessionKey, "stuck_recovery");
}
}
if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) {
diag.debug(
`stuck session recovery skipped lane reset: active reply work sessionId=${activeWorkSessionId} sessionKey=${
params.sessionKey ?? "unknown"
} age=${Math.round(params.ageMs / 1000)}s queueDepth=${params.queueDepth ?? 0}`,
);
return;
}
if (!activeSessionId && sessionLane) {
const laneSnapshot = getCommandLaneSnapshot(sessionLane);
if (laneSnapshot.activeCount > 0) {
diag.debug(
`stuck session recovery skipped lane reset: active lane task lane=${sessionLane} active=${laneSnapshot.activeCount} queued=${laneSnapshot.queuedCount} sessionId=${
params.sessionId ?? "unknown"
} sessionKey=${params.sessionKey ?? "unknown"} age=${Math.round(params.ageMs / 1000)}s`,
);
return;
}
}
const released =
sessionLane && (!activeSessionId || !aborted || !drained) ? resetCommandLane(sessionLane) : 0;
if (aborted || released > 0) {
diag.warn(
`stuck session recovery: sessionId=${params.sessionId ?? activeSessionId ?? "unknown"} sessionKey=${
params.sessionKey ?? "unknown"
} age=${Math.round(params.ageMs / 1000)}s aborted=${aborted} drained=${drained} released=${released}`,
);
}
} catch (err) {
diag.warn(
`stuck session recovery failed: sessionId=${params.sessionId ?? "unknown"} sessionKey=${
params.sessionKey ?? "unknown"
} err=${String(err)}`,
);
} finally {
recoveriesInFlight.delete(key);
}
}
export const __testing = {
resetRecoveriesInFlight(): void {
recoveriesInFlight.clear();
},
};

View File

@@ -125,16 +125,20 @@ describe("stuck session diagnostics threshold", () => {
it("uses the configured diagnostics.stuckSessionWarnMs threshold", () => {
const events: Array<{ type: string }> = [];
const recoverStuckSession = vi.fn();
const unsubscribe = onDiagnosticEvent((event) => {
events.push({ type: event.type });
});
try {
startDiagnosticHeartbeat({
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
},
},
});
{ recoverStuckSession },
);
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
vi.advanceTimersByTime(61_000);
} finally {
@@ -142,6 +146,12 @@ describe("stuck session diagnostics threshold", () => {
}
expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(1);
expect(recoverStuckSession).toHaveBeenCalledWith({
sessionId: "s1",
sessionKey: "main",
ageMs: expect.any(Number),
queueDepth: 0,
});
});
it("starts and stops the stability recorder with the heartbeat lifecycle", () => {

View File

@@ -53,6 +53,9 @@ const DEFAULT_LIVENESS_WARN_COOLDOWN_MS = 120_000;
let commandPollBackoffRuntimePromise: Promise<
typeof import("../agents/command-poll-backoff.runtime.js")
> | null = null;
let stuckSessionRecoveryRuntimePromise: Promise<
typeof import("./diagnostic-stuck-session-recovery.runtime.js")
> | null = null;
type EmitDiagnosticMemorySample = typeof emitDiagnosticMemorySample;
type EventLoopDelayMonitor = ReturnType<typeof monitorEventLoopDelay>;
@@ -65,6 +68,13 @@ type DiagnosticWorkSnapshot = {
queuedCount: number;
};
type RecoverStuckSession = (params: {
sessionId?: string;
sessionKey?: string;
ageMs: number;
queueDepth?: number;
}) => void | Promise<void>;
type DiagnosticLivenessSample = {
reasons: DiagnosticLivenessWarningReason[];
intervalMs: number;
@@ -86,6 +96,7 @@ type StartDiagnosticHeartbeatOptions = {
getConfig?: () => OpenClawConfig;
emitMemorySample?: EmitDiagnosticMemorySample;
sampleLiveness?: SampleDiagnosticLiveness;
recoverStuckSession?: RecoverStuckSession;
};
let diagnosticLivenessMonitor: EventLoopDelayMonitor | null = null;
@@ -99,6 +110,20 @@ function loadCommandPollBackoffRuntime() {
return commandPollBackoffRuntimePromise;
}
function recoverStuckSession(params: {
sessionId?: string;
sessionKey?: string;
ageMs: number;
queueDepth?: number;
}) {
stuckSessionRecoveryRuntimePromise ??= import("./diagnostic-stuck-session-recovery.runtime.js");
void stuckSessionRecoveryRuntimePromise
.then(({ recoverStuckDiagnosticSession }) => recoverStuckDiagnosticSession(params))
.catch((err) => {
diag.warn(`stuck session recovery unavailable: ${String(err)}`);
});
}
function getDiagnosticWorkSnapshot(): DiagnosticWorkSnapshot {
let activeCount = 0;
let waitingCount = 0;
@@ -659,6 +684,12 @@ export function startDiagnosticHeartbeat(
state: state.state,
ageMs,
});
void (opts?.recoverStuckSession ?? recoverStuckSession)({
sessionId: state.sessionId,
sessionKey: state.sessionKey,
ageMs,
queueDepth: state.queueDepth,
});
}
}
}, 30_000);

View File

@@ -26,9 +26,11 @@ let enqueueCommand: CommandQueueModule["enqueueCommand"];
let enqueueCommandInLane: CommandQueueModule["enqueueCommandInLane"];
let GatewayDrainingError: CommandQueueModule["GatewayDrainingError"];
let getActiveTaskCount: CommandQueueModule["getActiveTaskCount"];
let getCommandLaneSnapshot: CommandQueueModule["getCommandLaneSnapshot"];
let getQueueSize: CommandQueueModule["getQueueSize"];
let markGatewayDraining: CommandQueueModule["markGatewayDraining"];
let resetAllLanes: CommandQueueModule["resetAllLanes"];
let resetCommandLane: CommandQueueModule["resetCommandLane"];
let resetCommandQueueStateForTest: CommandQueueModule["resetCommandQueueStateForTest"];
let setCommandLaneConcurrency: CommandQueueModule["setCommandLaneConcurrency"];
let waitForActiveTasks: CommandQueueModule["waitForActiveTasks"];
@@ -64,9 +66,11 @@ describe("command queue", () => {
enqueueCommandInLane,
GatewayDrainingError,
getActiveTaskCount,
getCommandLaneSnapshot,
getQueueSize,
markGatewayDraining,
resetAllLanes,
resetCommandLane,
resetCommandQueueStateForTest,
setCommandLaneConcurrency,
waitForActiveTasks,
@@ -282,6 +286,69 @@ describe("command queue", () => {
expect(task2Ran).toBe(true);
});
it("resetCommandLane releases one stuck lane and drains its queued work", async () => {
const lane = `reset-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
const otherLane = `reset-lane-other-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 1);
setCommandLaneConcurrency(otherLane, 1);
const blocker = createDeferred();
const otherBlocker = createDeferred();
const first = enqueueCommandInLane(lane, async () => {
await blocker.promise;
return "first";
});
const other = enqueueCommandInLane(otherLane, async () => {
await otherBlocker.promise;
return "other";
});
let secondRan = false;
const second = enqueueCommandInLane(lane, async () => {
secondRan = true;
return "second";
});
expect(secondRan).toBe(false);
expect(getActiveTaskCount()).toBe(2);
expect(resetCommandLane(lane)).toBe(1);
await expect(second).resolves.toBe("second");
expect(secondRan).toBe(true);
expect(getQueueSize(lane)).toBe(0);
expect(getQueueSize(otherLane)).toBe(1);
blocker.resolve();
otherBlocker.resolve();
await expect(first).resolves.toBe("first");
await expect(other).resolves.toBe("other");
});
it("getCommandLaneSnapshot reports active and queued work for one lane", async () => {
const lane = `snapshot-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 1);
const blocker = createDeferred();
const first = enqueueCommandInLane(lane, async () => {
await blocker.promise;
return "first";
});
const second = enqueueCommandInLane(lane, async () => "second");
expect(getCommandLaneSnapshot(lane)).toMatchObject({
lane,
activeCount: 1,
queuedCount: 1,
maxConcurrent: 1,
draining: false,
generation: 0,
});
blocker.resolve();
await expect(first).resolves.toBe("first");
await expect(second).resolves.toBe("second");
});
it("waitForActiveTasks ignores tasks that start after the call", async () => {
const lane = `drain-snapshot-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 2);

View File

@@ -51,6 +51,15 @@ type LaneState = {
generation: number;
};
export type CommandLaneSnapshot = {
lane: string;
queuedCount: number;
activeCount: number;
maxConcurrent: number;
draining: boolean;
generation: number;
};
type ActiveTaskWaiter = {
activeTaskIds: Set<number>;
resolve: (value: { drained: boolean }) => void;
@@ -287,6 +296,29 @@ export function getQueueSize(lane: string = CommandLane.Main) {
return getLaneDepth(state);
}
export function getCommandLaneSnapshot(lane: string = CommandLane.Main): CommandLaneSnapshot {
const resolved = normalizeLane(lane);
const state = getQueueState().lanes.get(resolved);
if (!state) {
return {
lane: resolved,
queuedCount: 0,
activeCount: 0,
maxConcurrent: 1,
draining: false,
generation: 0,
};
}
return {
lane: resolved,
queuedCount: state.queue.length,
activeCount: state.activeTaskIds.size,
maxConcurrent: state.maxConcurrent,
draining: state.draining,
generation: state.generation,
};
}
export function getTotalQueueSize() {
let total = 0;
for (const s of getQueueState().lanes.values()) {
@@ -309,6 +341,28 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
return removed;
}
/**
* Force a single lane back to idle and immediately pump any queued entries.
* Used only by recovery paths after the owner has already attempted to abort
* the active work; stale completions from the previous generation are ignored.
*/
export function resetCommandLane(lane: string = CommandLane.Main): number {
const cleaned = normalizeLane(lane);
const state = getQueueState().lanes.get(cleaned);
if (!state) {
return 0;
}
const released = state.activeTaskIds.size;
state.generation += 1;
state.activeTaskIds.clear();
state.draining = false;
if (state.queue.length > 0) {
drainLane(cleaned);
}
notifyActiveTaskWaiters();
return released;
}
/**
* Test-only hard reset that discards all queue state, including preserved
* queued work from previous generations. Use this when a suite needs an