diff --git a/CHANGELOG.md b/CHANGELOG.md index e83629989ff..4d9625e944d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Agents/sessions: emit a terminal lifecycle backstop when embedded timeout/error turns return without `agent_end`, so Gateway sessions no longer stay stuck in `running` after failover surfaces a timeout. Fixes #74607. Thanks @millerc79. +- Gateway/diagnostics: include stuck-session reason hints and recovery skip causes in warnings, so operators can tell whether a lane is waiting on active work, queued work, or stale bookkeeping. Thanks @vincentkoc. - Agents/Codex: bound embedded-run cleanup, trajectory flushing, and command-lane task timeouts after runtime failures, so Discord and other chat sessions return to idle instead of staying stuck in processing. Thanks @vincentkoc. - Heartbeat/exec: consume successful metadata-only async exec completions silently so Telegram and other chat surfaces no longer ask users for missing command logs after `No session found`. Fixes #74595. Thanks @gkoch02. - Web fetch: add a documented `tools.web.fetch.ssrfPolicy.allowIpv6UniqueLocalRange` opt-in and thread it through cache keys and DNS/IP checks so trusted fake-IP proxy stacks using `fc00::/7` can work without broad private-network access. Fixes #74351. Thanks @jeffrey701. diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index e00039fb980..769eb513a80 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -131,6 +131,7 @@ export type DiagnosticSessionStuckEvent = DiagnosticBaseEvent & { state: DiagnosticSessionState; ageMs: number; queueDepth?: number; + reason?: string; }; export type DiagnosticLaneEnqueueEvent = DiagnosticBaseEvent & { diff --git a/src/logging/diagnostic-stability.ts b/src/logging/diagnostic-stability.ts index ec213c76f03..493e67e32ed 100644 --- a/src/logging/diagnostic-stability.ts +++ b/src/logging/diagnostic-stability.ts @@ -239,6 +239,7 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi break; case "session.stuck": record.outcome = event.state; + assignReasonCode(record, event.reason); record.ageMs = event.ageMs; record.queueDepth = event.queueDepth; break; diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts index e03bfe7cd2e..a5013f059ed 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts @@ -105,6 +105,10 @@ describe("stuck session recovery", () => { expect(mocks.waitForEmbeddedPiRunEnd).not.toHaveBeenCalled(); expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled(); expect(mocks.resetCommandLane).not.toHaveBeenCalled(); + expect(mocks.diag.warn).toHaveBeenCalledWith( + expect.stringContaining("reason=active_embedded_run"), + ); + expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining("action=observe_only")); }); it("aborts an active embedded run when active abort recovery is enabled", async () => { @@ -197,6 +201,12 @@ describe("stuck session recovery", () => { expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled(); expect(mocks.resetCommandLane).not.toHaveBeenCalled(); + expect(mocks.diag.warn).toHaveBeenCalledWith( + expect.stringContaining("reason=active_reply_work"), + ); + expect(mocks.diag.warn).toHaveBeenCalledWith( + expect.stringContaining("activeSessionId=queued-reply-session"), + ); }); it("does not release the session lane while unregistered lane work is active", async () => { @@ -223,6 +233,26 @@ describe("stuck session recovery", () => { expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled(); expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled(); expect(mocks.resetCommandLane).not.toHaveBeenCalled(); + expect(mocks.diag.warn).toHaveBeenCalledWith( + expect.stringContaining("reason=active_lane_task"), + ); + expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining("laneActive=1")); + }); + + it("reports when recovery finds no active work to release", async () => { + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); + mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined); + mocks.isEmbeddedPiRunActive.mockReturnValue(false); + mocks.resetCommandLane.mockReturnValue(0); + + await recoverStuckDiagnosticSession({ + sessionId: "stale-session", + sessionKey: "agent:main:main", + ageMs: 180_000, + }); + + expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main"); + expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining("reason=no_active_work")); }); it("releases a stale session-id lane when no session key is available", async () => { diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.ts index 00dd9bcd04f..b036e81e257 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.ts @@ -24,6 +24,31 @@ function recoveryKey(params: StuckSessionRecoveryParams): string | undefined { return params.sessionKey?.trim() || params.sessionId?.trim() || undefined; } +function formatRecoveryContext( + params: StuckSessionRecoveryParams, + extra?: { activeSessionId?: string; lane?: string; activeCount?: number; queuedCount?: number }, +): string { + const fields = [ + `sessionId=${params.sessionId ?? extra?.activeSessionId ?? "unknown"}`, + `sessionKey=${params.sessionKey ?? "unknown"}`, + `age=${Math.round(params.ageMs / 1000)}s`, + `queueDepth=${params.queueDepth ?? 0}`, + ]; + if (extra?.activeSessionId) { + fields.push(`activeSessionId=${extra.activeSessionId}`); + } + if (extra?.lane) { + fields.push(`lane=${extra.lane}`); + } + if (extra?.activeCount !== undefined) { + fields.push(`laneActive=${extra.activeCount}`); + } + if (extra?.queuedCount !== undefined) { + fields.push(`laneQueued=${extra.queuedCount}`); + } + return fields.join(" "); +} + export async function recoverStuckDiagnosticSession( params: StuckSessionRecoveryParams, ): Promise { @@ -51,12 +76,11 @@ export async function recoverStuckDiagnosticSession( if (activeSessionId) { if (params.allowActiveAbort !== true) { - diag.debug( - `stuck session recovery skipped active abort: sessionId=${ - params.sessionId ?? activeSessionId - } sessionKey=${params.sessionKey ?? "unknown"} age=${Math.round( - params.ageMs / 1000, - )}s queueDepth=${params.queueDepth ?? 0}`, + diag.warn( + `stuck session recovery skipped: reason=active_embedded_run action=observe_only ${formatRecoveryContext( + params, + { activeSessionId }, + )}`, ); return; } @@ -72,10 +96,11 @@ export async function recoverStuckDiagnosticSession( } if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) { - diag.debug( - `stuck session recovery skipped lane reset: active reply work sessionId=${activeWorkSessionId} sessionKey=${ - params.sessionKey ?? "unknown" - } age=${Math.round(params.ageMs / 1000)}s queueDepth=${params.queueDepth ?? 0}`, + diag.warn( + `stuck session recovery skipped: reason=active_reply_work action=keep_lane ${formatRecoveryContext( + params, + { activeSessionId: activeWorkSessionId }, + )}`, ); return; } @@ -83,10 +108,15 @@ export async function recoverStuckDiagnosticSession( if (!activeSessionId && sessionLane) { const laneSnapshot = getCommandLaneSnapshot(sessionLane); if (laneSnapshot.activeCount > 0) { - diag.debug( - `stuck session recovery skipped lane reset: active lane task lane=${sessionLane} active=${laneSnapshot.activeCount} queued=${laneSnapshot.queuedCount} sessionId=${ - params.sessionId ?? "unknown" - } sessionKey=${params.sessionKey ?? "unknown"} age=${Math.round(params.ageMs / 1000)}s`, + diag.warn( + `stuck session recovery skipped: reason=active_lane_task action=keep_lane ${formatRecoveryContext( + params, + { + lane: sessionLane, + activeCount: laneSnapshot.activeCount, + queuedCount: laneSnapshot.queuedCount, + }, + )}`, ); return; } @@ -101,6 +131,15 @@ export async function recoverStuckDiagnosticSession( params.sessionKey ?? "unknown" } age=${Math.round(params.ageMs / 1000)}s aborted=${aborted} drained=${drained} released=${released}`, ); + } else { + diag.warn( + `stuck session recovery no-op: reason=no_active_work action=none ${formatRecoveryContext( + params, + { + lane: sessionLane ?? undefined, + }, + )}`, + ); } } catch (err) { diag.warn( diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index 5984fa8839a..470b4187bc2 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -6,6 +6,7 @@ import { onDiagnosticEvent, resetDiagnosticEventsForTest, setDiagnosticsEnabledForProcess, + type DiagnosticEventPayload, } from "../infra/diagnostic-events.js"; import { diagnosticSessionStates, @@ -124,10 +125,10 @@ describe("stuck session diagnostics threshold", () => { }); it("uses the configured diagnostics.stuckSessionWarnMs threshold", () => { - const events: Array<{ type: string }> = []; + const events: DiagnosticEventPayload[] = []; const recoverStuckSession = vi.fn(); const unsubscribe = onDiagnosticEvent((event) => { - events.push({ type: event.type }); + events.push(event); }); try { startDiagnosticHeartbeat( @@ -145,7 +146,12 @@ describe("stuck session diagnostics threshold", () => { unsubscribe(); } - expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(1); + const stuckEvents = events.filter((event) => event.type === "session.stuck"); + expect(stuckEvents).toHaveLength(1); + expect(stuckEvents[0]).toMatchObject({ + reason: "processing_without_queue", + queueDepth: 0, + }); expect(recoverStuckSession).toHaveBeenCalledWith({ sessionId: "s1", sessionKey: "main", diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index 9c17030dd74..da5aa9e20d2 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -150,6 +150,19 @@ function hasRecentDiagnosticActivity(now: number): boolean { return lastActivityAt > 0 && now - lastActivityAt <= RECENT_DIAGNOSTIC_ACTIVITY_MS; } +function resolveStuckSessionReason(state: { + state: SessionStateValue; + queueDepth: number; +}): string { + if (state.queueDepth > 0) { + return "processing_with_queued_work"; + } + if (state.state === "processing") { + return "processing_without_queue"; + } + return "stale_session_state"; +} + function roundDiagnosticMetric(value: number, digits = 3): number { if (!Number.isFinite(value)) { return 0; @@ -514,10 +527,13 @@ export function logSessionStuck(params: SessionRef & { state: SessionStateValue; return; } const state = getDiagnosticSessionState(params); + const reason = resolveStuckSessionReason(state); diag.warn( `stuck session: sessionId=${state.sessionId ?? "unknown"} sessionKey=${ state.sessionKey ?? "unknown" - } state=${params.state} age=${Math.round(params.ageMs / 1000)}s queueDepth=${state.queueDepth}`, + } state=${params.state} age=${Math.round(params.ageMs / 1000)}s queueDepth=${ + state.queueDepth + } reason=${reason} recovery=checking`, ); emitDiagnosticEvent({ type: "session.stuck", @@ -526,6 +542,7 @@ export function logSessionStuck(params: SessionRef & { state: SessionStateValue; state: params.state, ageMs: params.ageMs, queueDepth: state.queueDepth, + reason, }); markActivity(); }