fix: enrich stalled session recovery logs

This commit is contained in:
Peter Steinberger
2026-05-04 05:39:06 +01:00
parent b5d408cd69
commit 65f2c2a0db
7 changed files with 396 additions and 2 deletions

View File

@@ -61,6 +61,7 @@ Docs: https://docs.openclaw.ai
- Control UI: render tool results whose output arrives as text-block arrays and give expanded tool output a scrollable block, so read/exec output remains visible in WebChat. Fixes #77054.
- MCP: include serialized conversation/message payloads in the primary text content for `conversations_list` and `messages_read`, while preserving `structuredContent` for capable clients. Fixes #77024.
- Media: treat `EPERM` from the post-write media fsync step as best-effort, allowing WebChat and channel uploads to finish on Windows filesystems that reject `fsync` after a successful write. Fixes #76844.
- Diagnostics: include last progress, cron job/run ids, stopped cron job name, and the last assistant transcript snippet in stalled-session and stuck-session recovery logs so cron stalls show what was stopped.
- Diagnostics: keep webhook/message OTEL attributes and Prometheus delivery labels low-cardinality and omit raw chat/message IDs from spans, so progress-draft and message-tool modes do not leak high-cardinality messaging identifiers.
- Google Meet: stop advertising legacy `mode: "realtime"` to agents and config UIs, while keeping it as a hidden compatibility alias for `mode: "agent"`, so new joins use the STT -> OpenClaw agent -> TTS path instead of selecting the direct realtime voice fallback.
- Google Meet: add `chrome.audioBufferBytes` for generated command-pair SoX audio commands and lower the default buffer from SoX's 8192 bytes to 4096 bytes to reduce Chrome talk-back latency.

View File

@@ -0,0 +1,94 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
formatCronSessionDiagnosticFields,
formatStoppedCronSessionDiagnosticFields,
parseCronRunSessionKey,
readLastAssistantFromSessionFile,
resolveCronSessionDiagnosticContext,
} from "./diagnostic-session-context.js";
let tempDir: string | undefined;
let previousStateDir: string | undefined;
function writeJsonl(filePath: string, rows: unknown[]) {
fs.mkdirSync(path.dirname(filePath), { recursive: true });
fs.writeFileSync(filePath, rows.map((row) => JSON.stringify(row)).join("\n") + "\n");
}
describe("diagnostic session context", () => {
beforeEach(() => {
previousStateDir = process.env.OPENCLAW_STATE_DIR;
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-diagnostic-session-"));
process.env.OPENCLAW_STATE_DIR = tempDir;
});
afterEach(() => {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
if (tempDir) {
fs.rmSync(tempDir, { recursive: true, force: true });
}
tempDir = undefined;
});
it("parses cron run session keys", () => {
expect(parseCronRunSessionKey("agent:clawblocker:cron:job-123:run:run-456")).toEqual({
agentId: "clawblocker",
cronJobId: "job-123",
cronRunId: "run-456",
});
});
it("formats cron job and last assistant context for stalled session logs", () => {
const stateDir = tempDir!;
fs.mkdirSync(path.join(stateDir, "cron"), { recursive: true });
fs.writeFileSync(
path.join(stateDir, "cron", "jobs.json"),
JSON.stringify({
jobs: [{ id: "job-123", name: "Twitter Mention Moderation Agent" }],
}),
);
writeJsonl(path.join(stateDir, "agents", "clawblocker", "sessions", "run-456.jsonl"), [
{ message: { role: "user", content: "run" } },
{
message: {
role: "assistant",
content: [{ type: "text", text: "There are 40\ncached mentions ready." }],
},
},
]);
const context = resolveCronSessionDiagnosticContext({
sessionKey: "agent:clawblocker:cron:job-123:run:run-456",
});
expect(formatCronSessionDiagnosticFields(context)).toContain("cronJobId=job-123");
expect(formatCronSessionDiagnosticFields(context)).toContain("cronRunId=run-456");
expect(formatCronSessionDiagnosticFields(context)).toContain(
'cronJob="Twitter Mention Moderation Agent"',
);
expect(formatCronSessionDiagnosticFields(context)).toContain(
'lastAssistant="There are 40 cached mentions ready."',
);
expect(formatStoppedCronSessionDiagnosticFields(context)).toContain(
'stopped="Twitter Mention Moderation Agent"',
);
});
it("reads the latest assistant message from a transcript tail", () => {
const filePath = path.join(tempDir!, "session.jsonl");
writeJsonl(filePath, [
{ message: { role: "assistant", content: "older" } },
{ message: { role: "user", content: "later user" } },
{ message: { role: "assistant", content: "newer" } },
]);
expect(readLastAssistantFromSessionFile(filePath)).toBe("newer");
});
});

View File

@@ -0,0 +1,201 @@
import fs from "node:fs";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
const SESSION_TAIL_BYTES = 64 * 1024;
const MAX_QUOTED_FIELD_CHARS = 140;
type CronSessionContext = {
agentId?: string;
cronJobId?: string;
cronRunId?: string;
cronJobName?: string;
lastAssistant?: string;
};
function quoteLogField(value: string): string {
const oneLine = value.replace(/\s+/g, " ").trim();
const truncated =
oneLine.length > MAX_QUOTED_FIELD_CHARS
? `${oneLine.slice(0, Math.max(0, MAX_QUOTED_FIELD_CHARS - 3))}...`
: oneLine;
return `"${truncated.replace(/["\\]/g, "\\$&")}"`;
}
export function parseCronRunSessionKey(sessionKey?: string): {
agentId?: string;
cronJobId?: string;
cronRunId?: string;
} {
const parts = sessionKey?.trim().split(":") ?? [];
if (parts[0] !== "agent") {
return {};
}
const cronIndex = parts.indexOf("cron");
if (cronIndex < 2) {
return {};
}
const runIndex = parts.indexOf("run", cronIndex + 2);
return {
agentId: parts[1],
cronJobId: parts[cronIndex + 1],
cronRunId: runIndex >= 0 ? parts[runIndex + 1] : undefined,
};
}
function resolveSessionFile(params: {
agentId?: string;
cronRunId?: string;
activeSessionId?: string;
}): string | undefined {
const agentId = params.agentId?.trim();
const runId = params.activeSessionId?.trim() || params.cronRunId?.trim();
if (!agentId || !runId) {
return undefined;
}
return path.join(resolveStateDir(), "agents", agentId, "sessions", `${runId}.jsonl`);
}
function readTailText(filePath: string): { text: string; truncated: boolean } | undefined {
let fd: number | undefined;
try {
const stat = fs.statSync(filePath);
if (!stat.isFile() || stat.size <= 0) {
return undefined;
}
const length = Math.min(stat.size, SESSION_TAIL_BYTES);
const start = Math.max(0, stat.size - length);
const buffer = Buffer.alloc(length);
fd = fs.openSync(filePath, "r");
const read = fs.readSync(fd, buffer, 0, length, start);
return { text: buffer.subarray(0, read).toString("utf8"), truncated: start > 0 };
} catch {
return undefined;
} finally {
if (fd !== undefined) {
try {
fs.closeSync(fd);
} catch {
// best-effort diagnostic context only
}
}
}
}
function textFromContent(content: unknown): string | undefined {
if (typeof content === "string") {
return content;
}
if (!Array.isArray(content)) {
return undefined;
}
const texts = content
.map((part) => {
if (!part || typeof part !== "object") {
return undefined;
}
const text = (part as { text?: unknown }).text;
return typeof text === "string" ? text : undefined;
})
.filter((text): text is string => Boolean(text?.trim()));
return texts.length ? texts.join(" ") : undefined;
}
export function readLastAssistantFromSessionFile(filePath: string | undefined): string | undefined {
if (!filePath) {
return undefined;
}
const tail = readTailText(filePath);
if (!tail.text) {
return undefined;
}
const lines = tail.text.split(/\r?\n/).filter(Boolean);
if (tail.truncated && lines.length > 0) {
lines.shift();
}
for (let index = lines.length - 1; index >= 0; index -= 1) {
try {
const parsed = JSON.parse(lines[index]) as {
message?: { role?: unknown; content?: unknown };
};
if (parsed.message?.role !== "assistant") {
continue;
}
const text = textFromContent(parsed.message.content)?.trim();
if (text) {
return text;
}
} catch {
// Ignore partial or non-JSON diagnostic transcript lines.
}
}
return undefined;
}
function readCronJobName(cronJobId: string | undefined): string | undefined {
if (!cronJobId) {
return undefined;
}
try {
const raw = fs.readFileSync(path.join(resolveStateDir(), "cron", "jobs.json"), "utf8");
const parsed = JSON.parse(raw) as { jobs?: Array<{ id?: unknown; name?: unknown }> };
const job = parsed.jobs?.find((entry) => entry.id === cronJobId);
return typeof job?.name === "string" && job.name.trim() ? job.name.trim() : undefined;
} catch {
return undefined;
}
}
export function resolveCronSessionDiagnosticContext(params: {
sessionKey?: string;
activeSessionId?: string;
}): CronSessionContext {
const parsed = parseCronRunSessionKey(params.sessionKey);
if (!parsed.cronJobId && !parsed.cronRunId) {
return {};
}
return {
...parsed,
cronJobName: readCronJobName(parsed.cronJobId),
lastAssistant: readLastAssistantFromSessionFile(
resolveSessionFile({ ...parsed, activeSessionId: params.activeSessionId }),
),
};
}
export function formatCronSessionDiagnosticFields(context: CronSessionContext): string {
const fields: string[] = [];
if (context.cronJobId) {
fields.push(`cronJobId=${context.cronJobId}`);
}
if (context.cronRunId) {
fields.push(`cronRunId=${context.cronRunId}`);
}
if (context.cronJobName) {
fields.push(`cronJob=${quoteLogField(context.cronJobName)}`);
}
if (context.lastAssistant) {
fields.push(`lastAssistant=${quoteLogField(context.lastAssistant)}`);
}
return fields.join(" ");
}
export function formatStoppedCronSessionDiagnosticFields(context: CronSessionContext): string {
const fields: string[] = [];
if (context.cronJobName) {
fields.push(`stopped=${quoteLogField(context.cronJobName)}`);
}
const rest = formatCronSessionDiagnosticFields({
cronJobId: context.cronJobId,
cronRunId: context.cronRunId,
lastAssistant: context.lastAssistant,
});
if (rest) {
fields.push(rest);
}
return fields.join(" ");
}
export const __testing = {
quoteLogField,
};

View File

@@ -1,3 +1,6 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
const mocks = vi.hoisted(() => ({
@@ -129,6 +132,57 @@ describe("stuck session recovery", () => {
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
});
it("logs stopped cron context when aborting an active embedded run", async () => {
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-recovery-context-"));
try {
process.env.OPENCLAW_STATE_DIR = tempDir;
fs.mkdirSync(path.join(tempDir, "cron"), { recursive: true });
fs.writeFileSync(
path.join(tempDir, "cron", "jobs.json"),
JSON.stringify({
jobs: [{ id: "job-123", name: "Twitter Mention Moderation Agent" }],
}),
);
fs.mkdirSync(path.join(tempDir, "agents", "clawblocker", "sessions"), {
recursive: true,
});
fs.writeFileSync(
path.join(tempDir, "agents", "clawblocker", "sessions", "run-456.jsonl"),
JSON.stringify({
message: { role: "assistant", content: "There are 40 cached mentions." },
}) + "\n",
);
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("run-456");
mocks.abortEmbeddedPiRun.mockReturnValue(true);
mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true);
await recoverStuckDiagnosticSession({
sessionId: "run-456",
sessionKey: "agent:clawblocker:cron:job-123:run:run-456",
ageMs: 629_000,
allowActiveAbort: true,
});
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
fs.rmSync(tempDir, { recursive: true, force: true });
}
expect(mocks.diag.warn).toHaveBeenCalledWith(
expect.stringContaining("action=abort_embedded_run"),
);
expect(mocks.diag.warn).toHaveBeenCalledWith(
expect.stringContaining('stopped="Twitter Mention Moderation Agent"'),
);
expect(mocks.diag.warn).toHaveBeenCalledWith(
expect.stringContaining('lastAssistant="There are 40 cached mentions."'),
);
});
it("force-clears and releases the session lane when abort cleanup does not drain", async () => {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-1");
mocks.abortEmbeddedPiRun.mockReturnValue(true);

View File

@@ -8,6 +8,10 @@ import {
} from "../agents/pi-embedded-runner/runs.js";
import { getCommandLaneSnapshot, resetCommandLane } from "../process/command-queue.js";
import { diagnosticLogger as diag } from "./diagnostic-runtime.js";
import {
formatStoppedCronSessionDiagnosticFields,
resolveCronSessionDiagnosticContext,
} from "./diagnostic-session-context.js";
const STUCK_SESSION_ABORT_SETTLE_MS = 15_000;
const recoveriesInFlight = new Set<string>();
@@ -126,10 +130,16 @@ export async function recoverStuckDiagnosticSession(
sessionLane && (!activeSessionId || !aborted || !drained) ? resetCommandLane(sessionLane) : 0;
if (aborted || released > 0) {
const action = aborted ? "abort_embedded_run" : "release_lane";
const stoppedFields = formatStoppedCronSessionDiagnosticFields(
resolveCronSessionDiagnosticContext({ sessionKey: params.sessionKey, activeSessionId }),
);
diag.warn(
`stuck session recovery: sessionId=${params.sessionId ?? activeSessionId ?? "unknown"} sessionKey=${
params.sessionKey ?? "unknown"
} age=${Math.round(params.ageMs / 1000)}s aborted=${aborted} drained=${drained} released=${released}`,
} age=${Math.round(params.ageMs / 1000)}s action=${action} aborted=${aborted} drained=${drained} released=${released}${
stoppedFields ? ` ${stoppedFields}` : ""
}`,
);
} else {
diag.warn(

View File

@@ -332,6 +332,7 @@ describe("stuck session diagnostics threshold", () => {
it("reports active sessions as stalled instead of stuck when active work stops progressing", () => {
const events: DiagnosticEventPayload[] = [];
const recoverStuckSession = vi.fn();
const warnSpy = vi.spyOn(diagnosticLogger, "warn").mockImplementation(() => undefined);
const unsubscribe = onDiagnosticEvent((event) => {
events.push(event);
});
@@ -360,6 +361,10 @@ describe("stuck session diagnostics threshold", () => {
reason: "active_work_without_progress",
activeWorkKind: "embedded_run",
});
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("lastProgress=embedded_run:started"),
);
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("lastProgressAge=60s"));
expect(recoverStuckSession).not.toHaveBeenCalled();
});

View File

@@ -23,6 +23,10 @@ import {
classifySessionAttention,
type SessionAttentionClassification,
} from "./diagnostic-session-attention.js";
import {
formatCronSessionDiagnosticFields,
resolveCronSessionDiagnosticContext,
} from "./diagnostic-session-context.js";
import {
diagnosticSessionStates,
getDiagnosticSessionState,
@@ -604,6 +608,26 @@ function sessionAttentionFields(params: {
};
}
function formatSessionActivityLogFields(activity: DiagnosticSessionActivitySnapshot): string {
const fields: string[] = [];
if (activity.lastProgressReason) {
fields.push(`lastProgress=${activity.lastProgressReason}`);
}
if (activity.lastProgressAgeMs !== undefined) {
fields.push(`lastProgressAge=${Math.round(activity.lastProgressAgeMs / 1000)}s`);
}
if (activity.activeToolName) {
fields.push(`activeTool=${activity.activeToolName}`);
}
if (activity.activeToolCallId) {
fields.push(`activeToolCallId=${activity.activeToolCallId}`);
}
if (activity.activeToolAgeMs !== undefined) {
fields.push(`activeToolAge=${Math.round(activity.activeToolAgeMs / 1000)}s`);
}
return fields.join(" ");
}
export function logSessionAttention(
params: SessionRef & {
state: SessionStateValue;
@@ -660,13 +684,18 @@ export function logSessionAttention(
: classification.eventType === "session.stalled"
? "stalled session"
: "long-running session";
const activityFields = formatSessionActivityLogFields(activity);
const cronFields = formatCronSessionDiagnosticFields(
resolveCronSessionDiagnosticContext({ sessionKey: state.sessionKey }),
);
const detailFields = [activityFields, cronFields].filter(Boolean).join(" ");
const message = `${label}: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
state.sessionKey ?? "unknown"
} state=${params.state} age=${Math.round(params.ageMs / 1000)}s queueDepth=${
state.queueDepth
} reason=${classification.reason} classification=${classification.classification}${
classification.activeWorkKind ? ` activeWorkKind=${classification.activeWorkKind}` : ""
} recovery=${recoveryEligible ? "checking" : "none"}`;
}${detailFields ? ` ${detailFields}` : ""} recovery=${recoveryEligible ? "checking" : "none"}`;
if (classification.eventType === "session.long_running" && state.queueDepth <= 0) {
diag.debug(message);
} else {