mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:10:44 +00:00
fix: preserve queued session recovery diagnostics
This commit is contained in:
@@ -124,11 +124,7 @@ export type DiagnosticSessionStateEvent = DiagnosticBaseEvent & {
|
||||
queueDepth?: number;
|
||||
};
|
||||
|
||||
export type DiagnosticSessionActiveWorkKind =
|
||||
| "embedded_run"
|
||||
| "model_call"
|
||||
| "tool_call"
|
||||
| "queued_work";
|
||||
export type DiagnosticSessionActiveWorkKind = "embedded_run" | "model_call" | "tool_call";
|
||||
|
||||
export type DiagnosticSessionAttentionClassification =
|
||||
| "long_running"
|
||||
|
||||
@@ -24,6 +24,7 @@ import {
|
||||
} from "./diagnostic-stability.js";
|
||||
import {
|
||||
logSessionStateChange,
|
||||
logMessageQueued,
|
||||
resetDiagnosticStateForTest,
|
||||
resolveStuckSessionWarnMs,
|
||||
startDiagnosticHeartbeat,
|
||||
@@ -162,6 +163,45 @@ describe("stuck session diagnostics threshold", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps queued stale sessions eligible for lane recovery", () => {
|
||||
const events: DiagnosticEventPayload[] = [];
|
||||
const recoverStuckSession = vi.fn();
|
||||
const unsubscribe = onDiagnosticEvent((event) => {
|
||||
events.push(event);
|
||||
});
|
||||
try {
|
||||
startDiagnosticHeartbeat(
|
||||
{
|
||||
diagnostics: {
|
||||
enabled: true,
|
||||
stuckSessionWarnMs: 30_000,
|
||||
},
|
||||
},
|
||||
{ recoverStuckSession },
|
||||
);
|
||||
logMessageQueued({ sessionId: "s1", sessionKey: "main", source: "test" });
|
||||
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
|
||||
vi.advanceTimersByTime(61_000);
|
||||
} finally {
|
||||
unsubscribe();
|
||||
}
|
||||
|
||||
expect(events.filter((event) => event.type === "session.long_running")).toHaveLength(0);
|
||||
const stuckEvents = events.filter((event) => event.type === "session.stuck");
|
||||
expect(stuckEvents).toHaveLength(1);
|
||||
expect(stuckEvents[0]).toMatchObject({
|
||||
classification: "stale_session_state",
|
||||
reason: "queued_work_without_active_run",
|
||||
queueDepth: 1,
|
||||
});
|
||||
expect(recoverStuckSession).toHaveBeenCalledWith({
|
||||
sessionId: "s1",
|
||||
sessionKey: "main",
|
||||
ageMs: expect.any(Number),
|
||||
queueDepth: 1,
|
||||
});
|
||||
});
|
||||
|
||||
it("reports active sessions as stalled instead of stuck when active work stops progressing", () => {
|
||||
const events: DiagnosticEventPayload[] = [];
|
||||
const recoverStuckSession = vi.fn();
|
||||
@@ -232,6 +272,44 @@ describe("stuck session diagnostics threshold", () => {
|
||||
expect(recoverStuckSession).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps queued sessions non-recoverable while active work is making progress", () => {
|
||||
const events: DiagnosticEventPayload[] = [];
|
||||
const recoverStuckSession = vi.fn();
|
||||
const unsubscribe = onDiagnosticEvent((event) => {
|
||||
events.push(event);
|
||||
});
|
||||
try {
|
||||
startDiagnosticHeartbeat(
|
||||
{
|
||||
diagnostics: {
|
||||
enabled: true,
|
||||
stuckSessionWarnMs: 30_000,
|
||||
},
|
||||
},
|
||||
{ recoverStuckSession },
|
||||
);
|
||||
logMessageQueued({ sessionId: "s1", sessionKey: "main", source: "test" });
|
||||
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
|
||||
vi.advanceTimersByTime(45_000);
|
||||
markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" });
|
||||
vi.advanceTimersByTime(16_000);
|
||||
} finally {
|
||||
unsubscribe();
|
||||
}
|
||||
|
||||
expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(0);
|
||||
expect(events.filter((event) => event.type === "session.stalled")).toHaveLength(0);
|
||||
const longRunningEvents = events.filter((event) => event.type === "session.long_running");
|
||||
expect(longRunningEvents).toHaveLength(1);
|
||||
expect(longRunningEvents[0]).toMatchObject({
|
||||
classification: "long_running",
|
||||
reason: "queued_behind_active_work",
|
||||
activeWorkKind: "embedded_run",
|
||||
queueDepth: 1,
|
||||
});
|
||||
expect(recoverStuckSession).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("starts and stops the stability recorder with the heartbeat lifecycle", () => {
|
||||
startDiagnosticHeartbeat({
|
||||
diagnostics: {
|
||||
|
||||
@@ -216,19 +216,9 @@ function classifySessionAttention(params: {
|
||||
};
|
||||
}
|
||||
|
||||
if (params.queueDepth > 0) {
|
||||
return {
|
||||
eventType: "session.long_running",
|
||||
reason: "queued_work_without_active_run",
|
||||
classification: "long_running",
|
||||
activeWorkKind: "queued_work",
|
||||
recoveryEligible: false,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
eventType: "session.stuck",
|
||||
reason: "stale_session_state",
|
||||
reason: params.queueDepth > 0 ? "queued_work_without_active_run" : "stale_session_state",
|
||||
classification: "stale_session_state",
|
||||
recoveryEligible: true,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user