mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 12:00:44 +00:00
fix: explain stuck session diagnostics
This commit is contained in:
@@ -29,6 +29,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- Agents/sessions: emit a terminal lifecycle backstop when embedded timeout/error turns return without `agent_end`, so Gateway sessions no longer stay stuck in `running` after failover surfaces a timeout. Fixes #74607. Thanks @millerc79.
|
||||
- Gateway/diagnostics: include stuck-session reason hints and recovery skip causes in warnings, so operators can tell whether a lane is waiting on active work, queued work, or stale bookkeeping. Thanks @vincentkoc.
|
||||
- Agents/Codex: bound embedded-run cleanup, trajectory flushing, and command-lane task timeouts after runtime failures, so Discord and other chat sessions return to idle instead of staying stuck in processing. Thanks @vincentkoc.
|
||||
- Heartbeat/exec: consume successful metadata-only async exec completions silently so Telegram and other chat surfaces no longer ask users for missing command logs after `No session found`. Fixes #74595. Thanks @gkoch02.
|
||||
- Web fetch: add a documented `tools.web.fetch.ssrfPolicy.allowIpv6UniqueLocalRange` opt-in and thread it through cache keys and DNS/IP checks so trusted fake-IP proxy stacks using `fc00::/7` can work without broad private-network access. Fixes #74351. Thanks @jeffrey701.
|
||||
|
||||
@@ -131,6 +131,7 @@ export type DiagnosticSessionStuckEvent = DiagnosticBaseEvent & {
|
||||
state: DiagnosticSessionState;
|
||||
ageMs: number;
|
||||
queueDepth?: number;
|
||||
reason?: string;
|
||||
};
|
||||
|
||||
export type DiagnosticLaneEnqueueEvent = DiagnosticBaseEvent & {
|
||||
|
||||
@@ -239,6 +239,7 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
|
||||
break;
|
||||
case "session.stuck":
|
||||
record.outcome = event.state;
|
||||
assignReasonCode(record, event.reason);
|
||||
record.ageMs = event.ageMs;
|
||||
record.queueDepth = event.queueDepth;
|
||||
break;
|
||||
|
||||
@@ -105,6 +105,10 @@ describe("stuck session recovery", () => {
|
||||
expect(mocks.waitForEmbeddedPiRunEnd).not.toHaveBeenCalled();
|
||||
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
|
||||
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
|
||||
expect(mocks.diag.warn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("reason=active_embedded_run"),
|
||||
);
|
||||
expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining("action=observe_only"));
|
||||
});
|
||||
|
||||
it("aborts an active embedded run when active abort recovery is enabled", async () => {
|
||||
@@ -197,6 +201,12 @@ describe("stuck session recovery", () => {
|
||||
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
|
||||
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
|
||||
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
|
||||
expect(mocks.diag.warn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("reason=active_reply_work"),
|
||||
);
|
||||
expect(mocks.diag.warn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("activeSessionId=queued-reply-session"),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not release the session lane while unregistered lane work is active", async () => {
|
||||
@@ -223,6 +233,26 @@ describe("stuck session recovery", () => {
|
||||
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
|
||||
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
|
||||
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
|
||||
expect(mocks.diag.warn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("reason=active_lane_task"),
|
||||
);
|
||||
expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining("laneActive=1"));
|
||||
});
|
||||
|
||||
it("reports when recovery finds no active work to release", async () => {
|
||||
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
|
||||
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined);
|
||||
mocks.isEmbeddedPiRunActive.mockReturnValue(false);
|
||||
mocks.resetCommandLane.mockReturnValue(0);
|
||||
|
||||
await recoverStuckDiagnosticSession({
|
||||
sessionId: "stale-session",
|
||||
sessionKey: "agent:main:main",
|
||||
ageMs: 180_000,
|
||||
});
|
||||
|
||||
expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main");
|
||||
expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining("reason=no_active_work"));
|
||||
});
|
||||
|
||||
it("releases a stale session-id lane when no session key is available", async () => {
|
||||
|
||||
@@ -24,6 +24,31 @@ function recoveryKey(params: StuckSessionRecoveryParams): string | undefined {
|
||||
return params.sessionKey?.trim() || params.sessionId?.trim() || undefined;
|
||||
}
|
||||
|
||||
function formatRecoveryContext(
|
||||
params: StuckSessionRecoveryParams,
|
||||
extra?: { activeSessionId?: string; lane?: string; activeCount?: number; queuedCount?: number },
|
||||
): string {
|
||||
const fields = [
|
||||
`sessionId=${params.sessionId ?? extra?.activeSessionId ?? "unknown"}`,
|
||||
`sessionKey=${params.sessionKey ?? "unknown"}`,
|
||||
`age=${Math.round(params.ageMs / 1000)}s`,
|
||||
`queueDepth=${params.queueDepth ?? 0}`,
|
||||
];
|
||||
if (extra?.activeSessionId) {
|
||||
fields.push(`activeSessionId=${extra.activeSessionId}`);
|
||||
}
|
||||
if (extra?.lane) {
|
||||
fields.push(`lane=${extra.lane}`);
|
||||
}
|
||||
if (extra?.activeCount !== undefined) {
|
||||
fields.push(`laneActive=${extra.activeCount}`);
|
||||
}
|
||||
if (extra?.queuedCount !== undefined) {
|
||||
fields.push(`laneQueued=${extra.queuedCount}`);
|
||||
}
|
||||
return fields.join(" ");
|
||||
}
|
||||
|
||||
export async function recoverStuckDiagnosticSession(
|
||||
params: StuckSessionRecoveryParams,
|
||||
): Promise<void> {
|
||||
@@ -51,12 +76,11 @@ export async function recoverStuckDiagnosticSession(
|
||||
|
||||
if (activeSessionId) {
|
||||
if (params.allowActiveAbort !== true) {
|
||||
diag.debug(
|
||||
`stuck session recovery skipped active abort: sessionId=${
|
||||
params.sessionId ?? activeSessionId
|
||||
} sessionKey=${params.sessionKey ?? "unknown"} age=${Math.round(
|
||||
params.ageMs / 1000,
|
||||
)}s queueDepth=${params.queueDepth ?? 0}`,
|
||||
diag.warn(
|
||||
`stuck session recovery skipped: reason=active_embedded_run action=observe_only ${formatRecoveryContext(
|
||||
params,
|
||||
{ activeSessionId },
|
||||
)}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
@@ -72,10 +96,11 @@ export async function recoverStuckDiagnosticSession(
|
||||
}
|
||||
|
||||
if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) {
|
||||
diag.debug(
|
||||
`stuck session recovery skipped lane reset: active reply work sessionId=${activeWorkSessionId} sessionKey=${
|
||||
params.sessionKey ?? "unknown"
|
||||
} age=${Math.round(params.ageMs / 1000)}s queueDepth=${params.queueDepth ?? 0}`,
|
||||
diag.warn(
|
||||
`stuck session recovery skipped: reason=active_reply_work action=keep_lane ${formatRecoveryContext(
|
||||
params,
|
||||
{ activeSessionId: activeWorkSessionId },
|
||||
)}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
@@ -83,10 +108,15 @@ export async function recoverStuckDiagnosticSession(
|
||||
if (!activeSessionId && sessionLane) {
|
||||
const laneSnapshot = getCommandLaneSnapshot(sessionLane);
|
||||
if (laneSnapshot.activeCount > 0) {
|
||||
diag.debug(
|
||||
`stuck session recovery skipped lane reset: active lane task lane=${sessionLane} active=${laneSnapshot.activeCount} queued=${laneSnapshot.queuedCount} sessionId=${
|
||||
params.sessionId ?? "unknown"
|
||||
} sessionKey=${params.sessionKey ?? "unknown"} age=${Math.round(params.ageMs / 1000)}s`,
|
||||
diag.warn(
|
||||
`stuck session recovery skipped: reason=active_lane_task action=keep_lane ${formatRecoveryContext(
|
||||
params,
|
||||
{
|
||||
lane: sessionLane,
|
||||
activeCount: laneSnapshot.activeCount,
|
||||
queuedCount: laneSnapshot.queuedCount,
|
||||
},
|
||||
)}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
@@ -101,6 +131,15 @@ export async function recoverStuckDiagnosticSession(
|
||||
params.sessionKey ?? "unknown"
|
||||
} age=${Math.round(params.ageMs / 1000)}s aborted=${aborted} drained=${drained} released=${released}`,
|
||||
);
|
||||
} else {
|
||||
diag.warn(
|
||||
`stuck session recovery no-op: reason=no_active_work action=none ${formatRecoveryContext(
|
||||
params,
|
||||
{
|
||||
lane: sessionLane ?? undefined,
|
||||
},
|
||||
)}`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
diag.warn(
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
onDiagnosticEvent,
|
||||
resetDiagnosticEventsForTest,
|
||||
setDiagnosticsEnabledForProcess,
|
||||
type DiagnosticEventPayload,
|
||||
} from "../infra/diagnostic-events.js";
|
||||
import {
|
||||
diagnosticSessionStates,
|
||||
@@ -124,10 +125,10 @@ describe("stuck session diagnostics threshold", () => {
|
||||
});
|
||||
|
||||
it("uses the configured diagnostics.stuckSessionWarnMs threshold", () => {
|
||||
const events: Array<{ type: string }> = [];
|
||||
const events: DiagnosticEventPayload[] = [];
|
||||
const recoverStuckSession = vi.fn();
|
||||
const unsubscribe = onDiagnosticEvent((event) => {
|
||||
events.push({ type: event.type });
|
||||
events.push(event);
|
||||
});
|
||||
try {
|
||||
startDiagnosticHeartbeat(
|
||||
@@ -145,7 +146,12 @@ describe("stuck session diagnostics threshold", () => {
|
||||
unsubscribe();
|
||||
}
|
||||
|
||||
expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(1);
|
||||
const stuckEvents = events.filter((event) => event.type === "session.stuck");
|
||||
expect(stuckEvents).toHaveLength(1);
|
||||
expect(stuckEvents[0]).toMatchObject({
|
||||
reason: "processing_without_queue",
|
||||
queueDepth: 0,
|
||||
});
|
||||
expect(recoverStuckSession).toHaveBeenCalledWith({
|
||||
sessionId: "s1",
|
||||
sessionKey: "main",
|
||||
|
||||
@@ -150,6 +150,19 @@ function hasRecentDiagnosticActivity(now: number): boolean {
|
||||
return lastActivityAt > 0 && now - lastActivityAt <= RECENT_DIAGNOSTIC_ACTIVITY_MS;
|
||||
}
|
||||
|
||||
function resolveStuckSessionReason(state: {
|
||||
state: SessionStateValue;
|
||||
queueDepth: number;
|
||||
}): string {
|
||||
if (state.queueDepth > 0) {
|
||||
return "processing_with_queued_work";
|
||||
}
|
||||
if (state.state === "processing") {
|
||||
return "processing_without_queue";
|
||||
}
|
||||
return "stale_session_state";
|
||||
}
|
||||
|
||||
function roundDiagnosticMetric(value: number, digits = 3): number {
|
||||
if (!Number.isFinite(value)) {
|
||||
return 0;
|
||||
@@ -514,10 +527,13 @@ export function logSessionStuck(params: SessionRef & { state: SessionStateValue;
|
||||
return;
|
||||
}
|
||||
const state = getDiagnosticSessionState(params);
|
||||
const reason = resolveStuckSessionReason(state);
|
||||
diag.warn(
|
||||
`stuck session: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
|
||||
state.sessionKey ?? "unknown"
|
||||
} state=${params.state} age=${Math.round(params.ageMs / 1000)}s queueDepth=${state.queueDepth}`,
|
||||
} state=${params.state} age=${Math.round(params.ageMs / 1000)}s queueDepth=${
|
||||
state.queueDepth
|
||||
} reason=${reason} recovery=checking`,
|
||||
);
|
||||
emitDiagnosticEvent({
|
||||
type: "session.stuck",
|
||||
@@ -526,6 +542,7 @@ export function logSessionStuck(params: SessionRef & { state: SessionStateValue;
|
||||
state: params.state,
|
||||
ageMs: params.ageMs,
|
||||
queueDepth: state.queueDepth,
|
||||
reason,
|
||||
});
|
||||
markActivity();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user