From c500e8704f4efc723758dd75f1b6689727832a0b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 28 Apr 2026 20:36:40 +0100 Subject: [PATCH] fix(gateway): recover stale session lanes --- CHANGELOG.md | 1 + docs/concepts/agent-loop.md | 1 + docs/concepts/queue.md | 1 + src/agents/pi-embedded-runner/runs.test.ts | 19 ++ src/agents/pi-embedded-runner/runs.ts | 36 +++ .../reply/reply-run-registry.test.ts | 34 +++ src/auto-reply/reply/reply-run-registry.ts | 9 + ...stuck-session-recovery.integration.test.ts | 90 +++++++ ...tic-stuck-session-recovery.runtime.test.ts | 251 ++++++++++++++++++ ...agnostic-stuck-session-recovery.runtime.ts | 120 +++++++++ src/logging/diagnostic.test.ts | 20 +- src/logging/diagnostic.ts | 31 +++ src/process/command-queue.test.ts | 67 +++++ src/process/command-queue.ts | 54 ++++ 14 files changed, 729 insertions(+), 5 deletions(-) create mode 100644 src/logging/diagnostic-stuck-session-recovery.integration.test.ts create mode 100644 src/logging/diagnostic-stuck-session-recovery.runtime.test.ts create mode 100644 src/logging/diagnostic-stuck-session-recovery.runtime.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index d9410fd3117..9ae256e73dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai - Channels/Discord: suppress duplicate gateway monitors when multiple enabled accounts resolve to the same bot token, preferring config tokens over default env fallback and reporting skipped duplicates as disabled. Supersedes #73608. Thanks @kagura-agent. - Channels/Discord: ignore stale route-shaped conversation bindings after a Discord channel is reconfigured to another agent, while preserving explicit focus and subagent bindings. Fixes #73626. Thanks @ramitrkar-hash. +- Gateway/sessions: add conservative stuck-session recovery that releases only stale session lanes while active embedded runs, reply operations, and lane tasks remain serialized, so queued follow-ups can drain without aborting legitimate long-running turns. Refs #73581, #73655, #73652, #73705, #73647, #73602, #73592, and #73601. Thanks @WS-Q0758, @bryangauvin, @spenceryang1996-dot, @bmilne1981, @mattmcintyre, @Vksh07, and @Spolen23. - NVIDIA/NIM: persist the `NVIDIA_API_KEY` provider marker and mark bundled NVIDIA Chat Completions models as string-content compatible, so NIM models load from `models.json` and OpenAI-compatible subagent calls send plain text content. Fixes #73013 and #50107; refs #73014. Thanks @bautrey, @iot2edge, @ifearghal, and @futhgar. - Channels/Discord: let text-only configs drop the `GuildVoiceStates` gateway intent and expose a bounded `/gateway/bot` metadata timeout with rate-limited fallback logs, reducing idle CPU and warning floods. Fixes #73709 and #73585. Thanks @sanchezm86 and @trac3r00. - Agents/sessions: mark same-turn `sessions_send` and A2A reply prompts with an inter-session `isUser=false` envelope before they reach the model, so foreign session output no longer lands as bare active user text. Fixes #73702; refs #73698, #73609, #73595, and #73622. Thanks @alvelda. diff --git a/docs/concepts/agent-loop.md b/docs/concepts/agent-loop.md index ad63d4e4b03..0ad8e765211 100644 --- a/docs/concepts/agent-loop.md +++ b/docs/concepts/agent-loop.md @@ -162,6 +162,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism - `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides. - Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer. +- Stuck-session recovery: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` detects long `processing` sessions. Active embedded runs, active reply operations, and active session-lane tasks remain warning-only by default; if diagnostics show no active work for the session, the watchdog releases the affected session lane so queued startup work can drain. - Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers..timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout. - Provider HTTP request timeout: `models.providers..timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout. diff --git a/docs/concepts/queue.md b/docs/concepts/queue.md index fc92c82b9af..81763671a06 100644 --- a/docs/concepts/queue.md +++ b/docs/concepts/queue.md @@ -85,6 +85,7 @@ Defaults: `debounceMs: 1000`, `cap: 20`, `drop: summarize`. - If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining. - If you need queue depth, enable verbose logs and watch for queue timing lines. +- When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` log a stuck-session warning. Active embedded runs, active reply operations, and active lane tasks remain warning-only by default; stale startup bookkeeping with no active session work can release the affected session lane so queued work drains. ## Related diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts index 5297d65dc89..23d8d1fe3ab 100644 --- a/src/agents/pi-embedded-runner/runs.test.ts +++ b/src/agents/pi-embedded-runner/runs.test.ts @@ -6,7 +6,9 @@ import { clearActiveEmbeddedRun, consumeEmbeddedRunModelSwitch, getActiveEmbeddedRunSnapshot, + isEmbeddedPiRunHandleActive, requestEmbeddedRunModelSwitch, + resolveActiveEmbeddedRunHandleSessionId, setActiveEmbeddedRun, updateActiveEmbeddedRunSnapshot, waitForActiveEmbeddedRuns, @@ -124,6 +126,23 @@ describe("pi-embedded runner run registry", () => { } }); + it("tracks actual embedded handles separately from reply-operation ownership", () => { + const handle = createRunHandle(); + + expect(isEmbeddedPiRunHandleActive("session-a")).toBe(false); + expect(resolveActiveEmbeddedRunHandleSessionId("agent:main:main")).toBeUndefined(); + + setActiveEmbeddedRun("session-a", handle, "agent:main:main"); + + expect(isEmbeddedPiRunHandleActive("session-a")).toBe(true); + expect(resolveActiveEmbeddedRunHandleSessionId("agent:main:main")).toBe("session-a"); + + clearActiveEmbeddedRun("session-a", handle, "agent:main:main"); + + expect(isEmbeddedPiRunHandleActive("session-a")).toBe(false); + expect(resolveActiveEmbeddedRunHandleSessionId("agent:main:main")).toBeUndefined(); + }); + it("tracks and clears per-session transcript snapshots for active runs", () => { const handle = createRunHandle(); diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index 5965c3ef070..58448b5abca 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -1,6 +1,7 @@ import { abortActiveReplyRuns, abortReplyRunBySessionId, + forceClearReplyRunBySessionId, isReplyRunActiveForSessionId, isReplyRunStreamingForSessionId, queueReplyRunMessage, @@ -157,6 +158,14 @@ export function isEmbeddedPiRunActive(sessionId: string): boolean { return active; } +export function isEmbeddedPiRunHandleActive(sessionId: string): boolean { + const active = ACTIVE_EMBEDDED_RUNS.has(sessionId); + if (active) { + diag.debug(`run handle active check: sessionId=${sessionId} active=true`); + } + return active; +} + export function isEmbeddedPiRunStreaming(sessionId: string): boolean { const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); if (!handle) { @@ -165,6 +174,14 @@ export function isEmbeddedPiRunStreaming(sessionId: string): boolean { return handle.isStreaming(); } +export function resolveActiveEmbeddedRunHandleSessionId(sessionKey: string): string | undefined { + const normalizedSessionKey = sessionKey.trim(); + if (!normalizedSessionKey) { + return undefined; + } + return ACTIVE_EMBEDDED_RUN_SESSION_IDS_BY_KEY.get(normalizedSessionKey); +} + export function resolveActiveEmbeddedRunSessionId(sessionKey: string): string | undefined { const normalizedSessionKey = sessionKey.trim(); if (!normalizedSessionKey) { @@ -355,6 +372,25 @@ export function clearActiveEmbeddedRun( } } +export function forceClearEmbeddedPiRun( + sessionId: string, + sessionKey?: string, + reason = "stuck_recovery", +): boolean { + let cleared = false; + if (ACTIVE_EMBEDDED_RUNS.has(sessionId)) { + ACTIVE_EMBEDDED_RUNS.delete(sessionId); + ACTIVE_EMBEDDED_RUN_SNAPSHOTS.delete(sessionId); + EMBEDDED_RUN_MODEL_SWITCH_REQUESTS.delete(sessionId); + clearActiveRunSessionKeys(sessionId, sessionKey); + logSessionStateChange({ sessionId, sessionKey, state: "idle", reason }); + notifyEmbeddedRunEnded(sessionId); + cleared = true; + } + const cause = new Error(`Embedded run force-cleared by ${reason}`); + return forceClearReplyRunBySessionId(sessionId, cause) || cleared; +} + export const __testing = { resetActiveEmbeddedRuns() { for (const waiters of EMBEDDED_RUN_WAITERS.values()) { diff --git a/src/auto-reply/reply/reply-run-registry.test.ts b/src/auto-reply/reply/reply-run-registry.test.ts index 0f887f821f7..848cb368f21 100644 --- a/src/auto-reply/reply/reply-run-registry.test.ts +++ b/src/auto-reply/reply/reply-run-registry.test.ts @@ -3,6 +3,7 @@ import { __testing, abortActiveReplyRuns, createReplyOperation, + forceClearReplyRunBySessionId, isReplyRunActiveForSessionId, queueReplyRunMessage, replyRunRegistry, @@ -65,6 +66,39 @@ describe("reply run registry", () => { expect(replyRunRegistry.isActive("agent:main:main")).toBe(false); }); + it("force-clears a running operation after abort without backend cleanup", async () => { + vi.useFakeTimers(); + try { + const cancel = vi.fn(); + const operation = createReplyOperation({ + sessionKey: "agent:main:main", + sessionId: "session-running", + resetTriggered: false, + }); + operation.attachBackend({ + kind: "embedded", + cancel, + isStreaming: () => true, + }); + operation.setPhase("running"); + + operation.abortByUser(); + const waitPromise = waitForReplyRunEndBySessionId("session-running", 1_000); + + expect(operation.result).toEqual({ kind: "aborted", code: "aborted_by_user" }); + expect(cancel).toHaveBeenCalledWith("user_abort"); + expect(isReplyRunActiveForSessionId("session-running")).toBe(true); + + expect(forceClearReplyRunBySessionId("session-running", new Error("stuck"))).toBe(true); + + expect(isReplyRunActiveForSessionId("session-running")).toBe(false); + await expect(waitPromise).resolves.toBe(true); + } finally { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + } + }); + it("queues messages only through the active running backend", async () => { const queueMessage = vi.fn(async () => {}); const operation = createReplyOperation({ diff --git a/src/auto-reply/reply/reply-run-registry.ts b/src/auto-reply/reply/reply-run-registry.ts index 3b96b56af54..f0c0127c872 100644 --- a/src/auto-reply/reply/reply-run-registry.ts +++ b/src/auto-reply/reply/reply-run-registry.ts @@ -479,6 +479,15 @@ export function abortReplyRunBySessionId(sessionId: string): boolean { return true; } +export function forceClearReplyRunBySessionId(sessionId: string, cause?: unknown): boolean { + const operation = resolveReplyRunForCurrentSessionId(sessionId); + if (!operation) { + return false; + } + operation.fail("run_failed", cause); + return true; +} + export function waitForReplyRunEndBySessionId( sessionId: string, timeoutMs = 15_000, diff --git a/src/logging/diagnostic-stuck-session-recovery.integration.test.ts b/src/logging/diagnostic-stuck-session-recovery.integration.test.ts new file mode 100644 index 00000000000..a0ab213b4b4 --- /dev/null +++ b/src/logging/diagnostic-stuck-session-recovery.integration.test.ts @@ -0,0 +1,90 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js"; +import { + __testing as replyRunTesting, + createReplyOperation, +} from "../auto-reply/reply/reply-run-registry.js"; +import { + enqueueCommandInLane, + getQueueSize, + resetCommandLane, + resetCommandQueueStateForTest, +} from "../process/command-queue.js"; +import { + __testing as recoveryTesting, + recoverStuckDiagnosticSession, +} from "./diagnostic-stuck-session-recovery.runtime.js"; + +function delay(ms: number): Promise<"blocked"> { + return new Promise((resolve) => setTimeout(() => resolve("blocked"), ms)); +} + +describe("stuck session recovery integration", () => { + afterEach(() => { + recoveryTesting.resetRecoveriesInFlight(); + replyRunTesting.resetReplyRunRegistry(); + resetCommandQueueStateForTest(); + }); + + it("does not reset a blocked lane while a reply operation is still active", async () => { + const sessionKey = "agent:main:active-reply"; + const sessionId = "active-reply-session"; + const lane = resolveEmbeddedSessionLane(sessionKey); + + void enqueueCommandInLane(lane, () => new Promise(() => {}), { + warnAfterMs: Number.MAX_SAFE_INTEGER, + }); + const queued = enqueueCommandInLane(lane, async () => "drained", { + warnAfterMs: Number.MAX_SAFE_INTEGER, + }); + const operation = createReplyOperation({ + sessionKey, + sessionId, + resetTriggered: false, + }); + + expect(getQueueSize(lane)).toBe(2); + + await recoverStuckDiagnosticSession({ + sessionId, + sessionKey, + ageMs: 180_000, + queueDepth: 1, + }); + + await expect(Promise.race([queued, delay(100)])).resolves.toBe("blocked"); + expect(getQueueSize(lane)).toBe(2); + + operation.complete(); + expect(resetCommandLane(lane)).toBe(1); + await expect(queued).resolves.toBe("drained"); + }); + + it("does not reset a blocked lane while unregistered lane work is still active", async () => { + const sessionKey = "agent:main:unregistered-work"; + const sessionId = "unregistered-work-session"; + const lane = resolveEmbeddedSessionLane(sessionKey); + + void enqueueCommandInLane(lane, () => new Promise(() => {}), { + warnAfterMs: Number.MAX_SAFE_INTEGER, + }); + const queued = enqueueCommandInLane(lane, async () => "drained", { + warnAfterMs: Number.MAX_SAFE_INTEGER, + }); + + expect(getQueueSize(lane)).toBe(2); + + await recoverStuckDiagnosticSession({ + sessionId, + sessionKey, + ageMs: 180_000, + queueDepth: 1, + }); + + await expect(Promise.race([queued, delay(100)])).resolves.toBe("blocked"); + expect(getQueueSize(lane)).toBe(2); + + expect(resetCommandLane(lane)).toBe(1); + await expect(queued).resolves.toBe("drained"); + }); +}); diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts new file mode 100644 index 00000000000..ea948c329e9 --- /dev/null +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts @@ -0,0 +1,251 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const mocks = vi.hoisted(() => ({ + abortEmbeddedPiRun: vi.fn(), + forceClearEmbeddedPiRun: vi.fn(), + isEmbeddedPiRunActive: vi.fn(), + isEmbeddedPiRunHandleActive: vi.fn(), + getCommandLaneSnapshot: vi.fn(), + resetCommandLane: vi.fn(), + resolveActiveEmbeddedRunSessionId: vi.fn(), + resolveActiveEmbeddedRunHandleSessionId: vi.fn(), + resolveEmbeddedSessionLane: vi.fn((key: string) => `session:${key}`), + waitForEmbeddedPiRunEnd: vi.fn(), + diag: { + debug: vi.fn(), + warn: vi.fn(), + }, +})); + +vi.mock("../agents/pi-embedded-runner/runs.js", () => ({ + abortEmbeddedPiRun: mocks.abortEmbeddedPiRun, + forceClearEmbeddedPiRun: mocks.forceClearEmbeddedPiRun, + isEmbeddedPiRunActive: mocks.isEmbeddedPiRunActive, + isEmbeddedPiRunHandleActive: mocks.isEmbeddedPiRunHandleActive, + resolveActiveEmbeddedRunSessionId: mocks.resolveActiveEmbeddedRunSessionId, + resolveActiveEmbeddedRunHandleSessionId: mocks.resolveActiveEmbeddedRunHandleSessionId, + waitForEmbeddedPiRunEnd: mocks.waitForEmbeddedPiRunEnd, +})); + +vi.mock("../agents/pi-embedded-runner/lanes.js", () => ({ + resolveEmbeddedSessionLane: mocks.resolveEmbeddedSessionLane, +})); + +vi.mock("../process/command-queue.js", () => ({ + getCommandLaneSnapshot: mocks.getCommandLaneSnapshot, + resetCommandLane: mocks.resetCommandLane, +})); + +vi.mock("./diagnostic-runtime.js", () => ({ + diagnosticLogger: mocks.diag, +})); + +import { + __testing, + recoverStuckDiagnosticSession, +} from "./diagnostic-stuck-session-recovery.runtime.js"; + +function resetMocks() { + __testing.resetRecoveriesInFlight(); + mocks.abortEmbeddedPiRun.mockReset(); + mocks.forceClearEmbeddedPiRun.mockReset(); + mocks.isEmbeddedPiRunActive.mockReset(); + mocks.isEmbeddedPiRunHandleActive.mockReset(); + mocks.getCommandLaneSnapshot.mockReset(); + mocks.getCommandLaneSnapshot.mockReturnValue({ + lane: "session:agent:main:main", + queuedCount: 1, + activeCount: 0, + maxConcurrent: 1, + draining: false, + generation: 0, + }); + mocks.resetCommandLane.mockReset(); + mocks.resolveActiveEmbeddedRunSessionId.mockReset(); + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReset(); + mocks.resolveEmbeddedSessionLane.mockClear(); + mocks.waitForEmbeddedPiRunEnd.mockReset(); + mocks.diag.debug.mockReset(); + mocks.diag.warn.mockReset(); +} + +describe("stuck session recovery", () => { + beforeEach(() => { + resetMocks(); + }); + + it("does not abort an active embedded run by default", async () => { + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1"); + + await recoverStuckDiagnosticSession({ + sessionId: "session-1", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 1, + }); + + expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); + expect(mocks.waitForEmbeddedPiRunEnd).not.toHaveBeenCalled(); + expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled(); + expect(mocks.resetCommandLane).not.toHaveBeenCalled(); + }); + + it("aborts an active embedded run when active abort recovery is enabled", async () => { + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1"); + mocks.abortEmbeddedPiRun.mockReturnValue(true); + mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true); + + await recoverStuckDiagnosticSession({ + sessionId: "session-1", + sessionKey: "agent:main:main", + ageMs: 180_000, + allowActiveAbort: true, + }); + + expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("session-1"); + expect(mocks.waitForEmbeddedPiRunEnd).toHaveBeenCalledWith("session-1", 15_000); + expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled(); + expect(mocks.resetCommandLane).not.toHaveBeenCalled(); + }); + + it("force-clears and releases the session lane when abort cleanup does not drain", async () => { + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1"); + mocks.abortEmbeddedPiRun.mockReturnValue(true); + mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(false); + mocks.resetCommandLane.mockReturnValue(1); + + await recoverStuckDiagnosticSession({ + sessionId: "session-1", + sessionKey: "agent:main:main", + ageMs: 240_000, + allowActiveAbort: true, + }); + + expect(mocks.forceClearEmbeddedPiRun).toHaveBeenCalledWith( + "session-1", + "agent:main:main", + "stuck_recovery", + ); + expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main"); + }); + + it("force-clears and releases the session lane when an active run cannot be aborted", async () => { + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1"); + mocks.abortEmbeddedPiRun.mockReturnValue(false); + mocks.resetCommandLane.mockReturnValue(1); + + await recoverStuckDiagnosticSession({ + sessionId: "session-1", + sessionKey: "agent:main:main", + ageMs: 240_000, + allowActiveAbort: true, + }); + + expect(mocks.waitForEmbeddedPiRunEnd).not.toHaveBeenCalled(); + expect(mocks.forceClearEmbeddedPiRun).toHaveBeenCalledWith( + "session-1", + "agent:main:main", + "stuck_recovery", + ); + expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main"); + }); + + it("releases a stale session lane when diagnostics are processing but no active run exists", async () => { + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); + mocks.resetCommandLane.mockReturnValue(1); + + await recoverStuckDiagnosticSession({ + sessionId: "session-1", + sessionKey: "agent:main:main", + ageMs: 180_000, + }); + + expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); + expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main"); + }); + + it("does not release the session lane while reply work is active without an embedded handle", async () => { + mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("queued-reply-session"); + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); + mocks.isEmbeddedPiRunActive.mockReturnValue(true); + mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false); + + await recoverStuckDiagnosticSession({ + sessionId: "queued-reply-session", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 1, + }); + + expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); + expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled(); + expect(mocks.resetCommandLane).not.toHaveBeenCalled(); + }); + + it("does not release the session lane while unregistered lane work is active", async () => { + mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined); + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); + mocks.isEmbeddedPiRunActive.mockReturnValue(false); + mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false); + mocks.getCommandLaneSnapshot.mockReturnValue({ + lane: "session:agent:main:main", + queuedCount: 1, + activeCount: 1, + maxConcurrent: 1, + draining: false, + generation: 0, + }); + + await recoverStuckDiagnosticSession({ + sessionId: "unregistered-work-session", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 1, + }); + + expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); + expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled(); + expect(mocks.resetCommandLane).not.toHaveBeenCalled(); + }); + + it("releases a stale session-id lane when no session key is available", async () => { + mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false); + mocks.resetCommandLane.mockReturnValue(1); + + await recoverStuckDiagnosticSession({ + sessionId: "session-only", + ageMs: 180_000, + }); + + expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); + expect(mocks.resolveEmbeddedSessionLane).toHaveBeenCalledWith("session-only"); + expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:session-only"); + }); + + it("coalesces duplicate recovery attempts for the same session", async () => { + let resolveWait!: (value: boolean) => void; + const waitPromise = new Promise((resolve) => { + resolveWait = resolve; + }); + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1"); + mocks.abortEmbeddedPiRun.mockReturnValue(true); + mocks.waitForEmbeddedPiRunEnd.mockReturnValue(waitPromise); + + const first = recoverStuckDiagnosticSession({ + sessionId: "session-1", + sessionKey: "agent:main:main", + ageMs: 180_000, + allowActiveAbort: true, + }); + await recoverStuckDiagnosticSession({ + sessionId: "session-1", + sessionKey: "agent:main:main", + ageMs: 210_000, + allowActiveAbort: true, + }); + + expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledTimes(1); + resolveWait(true); + await first; + }); +}); diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.ts new file mode 100644 index 00000000000..9d7fd0d6ae1 --- /dev/null +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.ts @@ -0,0 +1,120 @@ +import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js"; +import { + abortEmbeddedPiRun, + forceClearEmbeddedPiRun, + isEmbeddedPiRunActive, + isEmbeddedPiRunHandleActive, + resolveActiveEmbeddedRunSessionId, + resolveActiveEmbeddedRunHandleSessionId, + waitForEmbeddedPiRunEnd, +} from "../agents/pi-embedded-runner/runs.js"; +import { getCommandLaneSnapshot, resetCommandLane } from "../process/command-queue.js"; +import { diagnosticLogger as diag } from "./diagnostic-runtime.js"; + +const STUCK_SESSION_ABORT_SETTLE_MS = 15_000; +const recoveriesInFlight = new Set(); + +export type StuckSessionRecoveryParams = { + sessionId?: string; + sessionKey?: string; + ageMs: number; + queueDepth?: number; + allowActiveAbort?: boolean; +}; + +function recoveryKey(params: StuckSessionRecoveryParams): string | undefined { + return params.sessionKey?.trim() || params.sessionId?.trim() || undefined; +} + +export async function recoverStuckDiagnosticSession( + params: StuckSessionRecoveryParams, +): Promise { + const key = recoveryKey(params); + if (!key || recoveriesInFlight.has(key)) { + return; + } + + recoveriesInFlight.add(key); + try { + const fallbackActiveSessionId = + params.sessionId && isEmbeddedPiRunHandleActive(params.sessionId) + ? params.sessionId + : undefined; + const activeSessionId = params.sessionKey + ? (resolveActiveEmbeddedRunHandleSessionId(params.sessionKey) ?? fallbackActiveSessionId) + : fallbackActiveSessionId; + const activeWorkSessionId = params.sessionKey + ? (resolveActiveEmbeddedRunSessionId(params.sessionKey) ?? params.sessionId) + : params.sessionId; + const laneKey = params.sessionKey?.trim() || params.sessionId?.trim(); + const sessionLane = laneKey ? resolveEmbeddedSessionLane(laneKey) : null; + let aborted = false; + let drained = true; + + if (activeSessionId) { + if (params.allowActiveAbort !== true) { + diag.debug( + `stuck session recovery skipped active abort: sessionId=${ + params.sessionId ?? activeSessionId + } sessionKey=${params.sessionKey ?? "unknown"} age=${Math.round( + params.ageMs / 1000, + )}s queueDepth=${params.queueDepth ?? 0}`, + ); + return; + } + aborted = abortEmbeddedPiRun(activeSessionId); + if (aborted) { + drained = await waitForEmbeddedPiRunEnd(activeSessionId, STUCK_SESSION_ABORT_SETTLE_MS); + } + if (!aborted || !drained) { + forceClearEmbeddedPiRun(activeSessionId, params.sessionKey, "stuck_recovery"); + } + } + + if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) { + diag.debug( + `stuck session recovery skipped lane reset: active reply work sessionId=${activeWorkSessionId} sessionKey=${ + params.sessionKey ?? "unknown" + } age=${Math.round(params.ageMs / 1000)}s queueDepth=${params.queueDepth ?? 0}`, + ); + return; + } + + if (!activeSessionId && sessionLane) { + const laneSnapshot = getCommandLaneSnapshot(sessionLane); + if (laneSnapshot.activeCount > 0) { + diag.debug( + `stuck session recovery skipped lane reset: active lane task lane=${sessionLane} active=${laneSnapshot.activeCount} queued=${laneSnapshot.queuedCount} sessionId=${ + params.sessionId ?? "unknown" + } sessionKey=${params.sessionKey ?? "unknown"} age=${Math.round(params.ageMs / 1000)}s`, + ); + return; + } + } + + const released = + sessionLane && (!activeSessionId || !aborted || !drained) ? resetCommandLane(sessionLane) : 0; + + if (aborted || released > 0) { + diag.warn( + `stuck session recovery: sessionId=${params.sessionId ?? activeSessionId ?? "unknown"} sessionKey=${ + params.sessionKey ?? "unknown" + } age=${Math.round(params.ageMs / 1000)}s aborted=${aborted} drained=${drained} released=${released}`, + ); + } + } catch (err) { + diag.warn( + `stuck session recovery failed: sessionId=${params.sessionId ?? "unknown"} sessionKey=${ + params.sessionKey ?? "unknown" + } err=${String(err)}`, + ); + } finally { + recoveriesInFlight.delete(key); + } +} + +export const __testing = { + resetRecoveriesInFlight(): void { + recoveriesInFlight.clear(); + }, +}; diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index b5eef251ed0..5984fa8839a 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -125,16 +125,20 @@ describe("stuck session diagnostics threshold", () => { it("uses the configured diagnostics.stuckSessionWarnMs threshold", () => { const events: Array<{ type: string }> = []; + const recoverStuckSession = vi.fn(); const unsubscribe = onDiagnosticEvent((event) => { events.push({ type: event.type }); }); try { - startDiagnosticHeartbeat({ - diagnostics: { - enabled: true, - stuckSessionWarnMs: 30_000, + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + stuckSessionWarnMs: 30_000, + }, }, - }); + { recoverStuckSession }, + ); logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); vi.advanceTimersByTime(61_000); } finally { @@ -142,6 +146,12 @@ describe("stuck session diagnostics threshold", () => { } expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(1); + expect(recoverStuckSession).toHaveBeenCalledWith({ + sessionId: "s1", + sessionKey: "main", + ageMs: expect.any(Number), + queueDepth: 0, + }); }); it("starts and stops the stability recorder with the heartbeat lifecycle", () => { diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index 3de591c3bcd..9c17030dd74 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -53,6 +53,9 @@ const DEFAULT_LIVENESS_WARN_COOLDOWN_MS = 120_000; let commandPollBackoffRuntimePromise: Promise< typeof import("../agents/command-poll-backoff.runtime.js") > | null = null; +let stuckSessionRecoveryRuntimePromise: Promise< + typeof import("./diagnostic-stuck-session-recovery.runtime.js") +> | null = null; type EmitDiagnosticMemorySample = typeof emitDiagnosticMemorySample; type EventLoopDelayMonitor = ReturnType; @@ -65,6 +68,13 @@ type DiagnosticWorkSnapshot = { queuedCount: number; }; +type RecoverStuckSession = (params: { + sessionId?: string; + sessionKey?: string; + ageMs: number; + queueDepth?: number; +}) => void | Promise; + type DiagnosticLivenessSample = { reasons: DiagnosticLivenessWarningReason[]; intervalMs: number; @@ -86,6 +96,7 @@ type StartDiagnosticHeartbeatOptions = { getConfig?: () => OpenClawConfig; emitMemorySample?: EmitDiagnosticMemorySample; sampleLiveness?: SampleDiagnosticLiveness; + recoverStuckSession?: RecoverStuckSession; }; let diagnosticLivenessMonitor: EventLoopDelayMonitor | null = null; @@ -99,6 +110,20 @@ function loadCommandPollBackoffRuntime() { return commandPollBackoffRuntimePromise; } +function recoverStuckSession(params: { + sessionId?: string; + sessionKey?: string; + ageMs: number; + queueDepth?: number; +}) { + stuckSessionRecoveryRuntimePromise ??= import("./diagnostic-stuck-session-recovery.runtime.js"); + void stuckSessionRecoveryRuntimePromise + .then(({ recoverStuckDiagnosticSession }) => recoverStuckDiagnosticSession(params)) + .catch((err) => { + diag.warn(`stuck session recovery unavailable: ${String(err)}`); + }); +} + function getDiagnosticWorkSnapshot(): DiagnosticWorkSnapshot { let activeCount = 0; let waitingCount = 0; @@ -659,6 +684,12 @@ export function startDiagnosticHeartbeat( state: state.state, ageMs, }); + void (opts?.recoverStuckSession ?? recoverStuckSession)({ + sessionId: state.sessionId, + sessionKey: state.sessionKey, + ageMs, + queueDepth: state.queueDepth, + }); } } }, 30_000); diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index 3466d3b8aa1..c08ec441874 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -26,9 +26,11 @@ let enqueueCommand: CommandQueueModule["enqueueCommand"]; let enqueueCommandInLane: CommandQueueModule["enqueueCommandInLane"]; let GatewayDrainingError: CommandQueueModule["GatewayDrainingError"]; let getActiveTaskCount: CommandQueueModule["getActiveTaskCount"]; +let getCommandLaneSnapshot: CommandQueueModule["getCommandLaneSnapshot"]; let getQueueSize: CommandQueueModule["getQueueSize"]; let markGatewayDraining: CommandQueueModule["markGatewayDraining"]; let resetAllLanes: CommandQueueModule["resetAllLanes"]; +let resetCommandLane: CommandQueueModule["resetCommandLane"]; let resetCommandQueueStateForTest: CommandQueueModule["resetCommandQueueStateForTest"]; let setCommandLaneConcurrency: CommandQueueModule["setCommandLaneConcurrency"]; let waitForActiveTasks: CommandQueueModule["waitForActiveTasks"]; @@ -64,9 +66,11 @@ describe("command queue", () => { enqueueCommandInLane, GatewayDrainingError, getActiveTaskCount, + getCommandLaneSnapshot, getQueueSize, markGatewayDraining, resetAllLanes, + resetCommandLane, resetCommandQueueStateForTest, setCommandLaneConcurrency, waitForActiveTasks, @@ -282,6 +286,69 @@ describe("command queue", () => { expect(task2Ran).toBe(true); }); + it("resetCommandLane releases one stuck lane and drains its queued work", async () => { + const lane = `reset-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`; + const otherLane = `reset-lane-other-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(lane, 1); + setCommandLaneConcurrency(otherLane, 1); + + const blocker = createDeferred(); + const otherBlocker = createDeferred(); + const first = enqueueCommandInLane(lane, async () => { + await blocker.promise; + return "first"; + }); + const other = enqueueCommandInLane(otherLane, async () => { + await otherBlocker.promise; + return "other"; + }); + + let secondRan = false; + const second = enqueueCommandInLane(lane, async () => { + secondRan = true; + return "second"; + }); + + expect(secondRan).toBe(false); + expect(getActiveTaskCount()).toBe(2); + expect(resetCommandLane(lane)).toBe(1); + + await expect(second).resolves.toBe("second"); + expect(secondRan).toBe(true); + expect(getQueueSize(lane)).toBe(0); + expect(getQueueSize(otherLane)).toBe(1); + + blocker.resolve(); + otherBlocker.resolve(); + await expect(first).resolves.toBe("first"); + await expect(other).resolves.toBe("other"); + }); + + it("getCommandLaneSnapshot reports active and queued work for one lane", async () => { + const lane = `snapshot-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(lane, 1); + + const blocker = createDeferred(); + const first = enqueueCommandInLane(lane, async () => { + await blocker.promise; + return "first"; + }); + const second = enqueueCommandInLane(lane, async () => "second"); + + expect(getCommandLaneSnapshot(lane)).toMatchObject({ + lane, + activeCount: 1, + queuedCount: 1, + maxConcurrent: 1, + draining: false, + generation: 0, + }); + + blocker.resolve(); + await expect(first).resolves.toBe("first"); + await expect(second).resolves.toBe("second"); + }); + it("waitForActiveTasks ignores tasks that start after the call", async () => { const lane = `drain-snapshot-${Date.now()}-${Math.random().toString(16).slice(2)}`; setCommandLaneConcurrency(lane, 2); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index fd547f1f13d..361d7e0d090 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -51,6 +51,15 @@ type LaneState = { generation: number; }; +export type CommandLaneSnapshot = { + lane: string; + queuedCount: number; + activeCount: number; + maxConcurrent: number; + draining: boolean; + generation: number; +}; + type ActiveTaskWaiter = { activeTaskIds: Set; resolve: (value: { drained: boolean }) => void; @@ -287,6 +296,29 @@ export function getQueueSize(lane: string = CommandLane.Main) { return getLaneDepth(state); } +export function getCommandLaneSnapshot(lane: string = CommandLane.Main): CommandLaneSnapshot { + const resolved = normalizeLane(lane); + const state = getQueueState().lanes.get(resolved); + if (!state) { + return { + lane: resolved, + queuedCount: 0, + activeCount: 0, + maxConcurrent: 1, + draining: false, + generation: 0, + }; + } + return { + lane: resolved, + queuedCount: state.queue.length, + activeCount: state.activeTaskIds.size, + maxConcurrent: state.maxConcurrent, + draining: state.draining, + generation: state.generation, + }; +} + export function getTotalQueueSize() { let total = 0; for (const s of getQueueState().lanes.values()) { @@ -309,6 +341,28 @@ export function clearCommandLane(lane: string = CommandLane.Main) { return removed; } +/** + * Force a single lane back to idle and immediately pump any queued entries. + * Used only by recovery paths after the owner has already attempted to abort + * the active work; stale completions from the previous generation are ignored. + */ +export function resetCommandLane(lane: string = CommandLane.Main): number { + const cleaned = normalizeLane(lane); + const state = getQueueState().lanes.get(cleaned); + if (!state) { + return 0; + } + const released = state.activeTaskIds.size; + state.generation += 1; + state.activeTaskIds.clear(); + state.draining = false; + if (state.queue.length > 0) { + drainLane(cleaned); + } + notifyActiveTaskWaiters(); + return released; +} + /** * Test-only hard reset that discards all queue state, including preserved * queued work from previous generations. Use this when a suite needs an