fix(logging): split queue diagnostic runtime

This commit is contained in:
Vincent Koc
2026-04-12 03:42:35 +01:00
parent 39f22ef8b3
commit d262b1c688
4 changed files with 56 additions and 36 deletions

View File

@@ -0,0 +1,40 @@
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
import { createSubsystemLogger } from "./subsystem.js";
const diag = createSubsystemLogger("diagnostic");
let lastActivityAt = 0;
export const diagnosticLogger = diag;
export function markDiagnosticActivity(): void {
lastActivityAt = Date.now();
}
export function getLastDiagnosticActivityAt(): number {
return lastActivityAt;
}
export function resetDiagnosticActivityForTest(): void {
lastActivityAt = 0;
}
export function logLaneEnqueue(lane: string, queueSize: number): void {
diag.debug(`lane enqueue: lane=${lane} queueSize=${queueSize}`);
emitDiagnosticEvent({
type: "queue.lane.enqueue",
lane,
queueSize,
});
markDiagnosticActivity();
}
export function logLaneDequeue(lane: string, waitMs: number, queueSize: number): void {
diag.debug(`lane dequeue: lane=${lane} waitMs=${waitMs} queueSize=${queueSize}`);
emitDiagnosticEvent({
type: "queue.lane.dequeue",
lane,
queueSize,
waitMs,
});
markDiagnosticActivity();
}

View File

@@ -1,6 +1,12 @@
import { getRuntimeConfig } from "../config/config.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
import {
diagnosticLogger as diag,
getLastDiagnosticActivityAt,
markDiagnosticActivity as markActivity,
resetDiagnosticActivityForTest,
} from "./diagnostic-runtime.js";
import {
diagnosticSessionStates,
getDiagnosticSessionState,
@@ -10,9 +16,7 @@ import {
type SessionRef,
type SessionStateValue,
} from "./diagnostic-session-state.js";
import { createSubsystemLogger } from "./subsystem.js";
const diag = createSubsystemLogger("diagnostic");
export { diagnosticLogger, logLaneDequeue, logLaneEnqueue } from "./diagnostic-runtime.js";
const webhookStats = {
received: 0,
@@ -21,7 +25,6 @@ const webhookStats = {
lastReceived: 0,
};
let lastActivityAt = 0;
const DEFAULT_STUCK_SESSION_WARN_MS = 120_000;
const MIN_STUCK_SESSION_WARN_MS = 1_000;
const MAX_STUCK_SESSION_WARN_MS = 24 * 60 * 60 * 1000;
@@ -34,10 +37,6 @@ function loadCommandPollBackoffRuntime() {
return commandPollBackoffRuntimePromise;
}
function markActivity() {
lastActivityAt = Date.now();
}
export function resolveStuckSessionWarnMs(config?: OpenClawConfig): number {
const raw = config?.diagnostics?.stuckSessionWarnMs;
if (typeof raw !== "number" || !Number.isFinite(raw)) {
@@ -244,27 +243,6 @@ export function logSessionStuck(params: SessionRef & { state: SessionStateValue;
markActivity();
}
export function logLaneEnqueue(lane: string, queueSize: number) {
diag.debug(`lane enqueue: lane=${lane} queueSize=${queueSize}`);
emitDiagnosticEvent({
type: "queue.lane.enqueue",
lane,
queueSize,
});
markActivity();
}
export function logLaneDequeue(lane: string, waitMs: number, queueSize: number) {
diag.debug(`lane dequeue: lane=${lane} waitMs=${waitMs} queueSize=${queueSize}`);
emitDiagnosticEvent({
type: "queue.lane.dequeue",
lane,
queueSize,
waitMs,
});
markActivity();
}
export function logRunAttempt(params: SessionRef & { runId: string; attempt: number }) {
diag.debug(
`run attempt: sessionId=${params.sessionId ?? "unknown"} sessionKey=${
@@ -360,7 +338,7 @@ export function startDiagnosticHeartbeat(
0,
);
const hasActivity =
lastActivityAt > 0 ||
getLastDiagnosticActivityAt() > 0 ||
webhookStats.received > 0 ||
activeCount > 0 ||
waitingCount > 0 ||
@@ -368,7 +346,7 @@ export function startDiagnosticHeartbeat(
if (!hasActivity) {
return;
}
if (now - lastActivityAt > 120_000 && activeCount === 0 && waitingCount === 0) {
if (now - getLastDiagnosticActivityAt() > 120_000 && activeCount === 0 && waitingCount === 0) {
return;
}
@@ -425,12 +403,10 @@ export function getDiagnosticSessionStateCountForTest(): number {
export function resetDiagnosticStateForTest(): void {
resetDiagnosticSessionStateForTest();
resetDiagnosticActivityForTest();
webhookStats.received = 0;
webhookStats.processed = 0;
webhookStats.errors = 0;
webhookStats.lastReceived = 0;
lastActivityAt = 0;
stopDiagnosticHeartbeat();
}
export { diag as diagnosticLogger };

View File

@@ -12,7 +12,7 @@ const diagnosticMocks = vi.hoisted(() => ({
},
}));
vi.mock("../logging/diagnostic.js", () => ({
vi.mock("../logging/diagnostic-runtime.js", () => ({
logLaneEnqueue: diagnosticMocks.logLaneEnqueue,
logLaneDequeue: diagnosticMocks.logLaneDequeue,
diagnosticLogger: diagnosticMocks.diag,

View File

@@ -1,4 +1,8 @@
import { diagnosticLogger as diag, logLaneDequeue, logLaneEnqueue } from "../logging/diagnostic.js";
import {
diagnosticLogger as diag,
logLaneDequeue,
logLaneEnqueue,
} from "../logging/diagnostic-runtime.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
import { CommandLane } from "./lanes.js";
/**