diff --git a/CHANGELOG.md b/CHANGELOG.md index b7aaa7db1cb..2dae9149138 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ Docs: https://docs.openclaw.ai - Control UI: render tool results whose output arrives as text-block arrays and give expanded tool output a scrollable block, so read/exec output remains visible in WebChat. Fixes #77054. - MCP: include serialized conversation/message payloads in the primary text content for `conversations_list` and `messages_read`, while preserving `structuredContent` for capable clients. Fixes #77024. - Media: treat `EPERM` from the post-write media fsync step as best-effort, allowing WebChat and channel uploads to finish on Windows filesystems that reject `fsync` after a successful write. Fixes #76844. +- Diagnostics: include last progress, cron job/run ids, stopped cron job name, and the last assistant transcript snippet in stalled-session and stuck-session recovery logs so cron stalls show what was stopped. - Diagnostics: keep webhook/message OTEL attributes and Prometheus delivery labels low-cardinality and omit raw chat/message IDs from spans, so progress-draft and message-tool modes do not leak high-cardinality messaging identifiers. - Google Meet: stop advertising legacy `mode: "realtime"` to agents and config UIs, while keeping it as a hidden compatibility alias for `mode: "agent"`, so new joins use the STT -> OpenClaw agent -> TTS path instead of selecting the direct realtime voice fallback. - Google Meet: add `chrome.audioBufferBytes` for generated command-pair SoX audio commands and lower the default buffer from SoX's 8192 bytes to 4096 bytes to reduce Chrome talk-back latency. 
/** Temporary state directory created for the test case that is currently running. */
let tempDir: string | undefined;
/** Value of OPENCLAW_STATE_DIR captured before the current test case overrides it. */
let previousStateDir: string | undefined;

/**
 * Writes `rows` to `filePath` as newline-terminated JSONL, creating the parent
 * directory first.
 */
function writeJsonl(filePath: string, rows: unknown[]) {
  fs.mkdirSync(path.dirname(filePath), { recursive: true });
  const serialized = rows.map((row) => JSON.stringify(row));
  fs.writeFileSync(filePath, serialized.join("\n") + "\n");
}

describe("diagnostic session context", () => {
  const cronSessionKey = "agent:clawblocker:cron:job-123:run:run-456";

  // Point OPENCLAW_STATE_DIR at a fresh scratch directory for every case.
  beforeEach(() => {
    previousStateDir = process.env.OPENCLAW_STATE_DIR;
    tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-diagnostic-session-"));
    process.env.OPENCLAW_STATE_DIR = tempDir;
  });

  // Restore the environment exactly as it was and remove the scratch directory.
  afterEach(() => {
    if (previousStateDir !== undefined) {
      process.env.OPENCLAW_STATE_DIR = previousStateDir;
    } else {
      delete process.env.OPENCLAW_STATE_DIR;
    }
    if (tempDir) {
      fs.rmSync(tempDir, { recursive: true, force: true });
    }
    tempDir = undefined;
  });

  it("parses cron run session keys", () => {
    const parsed = parseCronRunSessionKey(cronSessionKey);

    expect(parsed).toEqual({
      agentId: "clawblocker",
      cronJobId: "job-123",
      cronRunId: "run-456",
    });
  });

  it("formats cron job and last assistant context for stalled session logs", () => {
    const stateDir = tempDir!;
    const jobsFile = path.join(stateDir, "cron", "jobs.json");
    const transcriptFile = path.join(
      stateDir,
      "agents",
      "clawblocker",
      "sessions",
      "run-456.jsonl",
    );

    fs.mkdirSync(path.dirname(jobsFile), { recursive: true });
    fs.writeFileSync(
      jobsFile,
      JSON.stringify({
        jobs: [{ id: "job-123", name: "Twitter Mention Moderation Agent" }],
      }),
    );
    writeJsonl(transcriptFile, [
      { message: { role: "user", content: "run" } },
      {
        message: {
          role: "assistant",
          content: [{ type: "text", text: "There are 40\ncached mentions ready." }],
        },
      },
    ]);

    const context = resolveCronSessionDiagnosticContext({ sessionKey: cronSessionKey });
    const fields = formatCronSessionDiagnosticFields(context);

    for (const expected of [
      "cronJobId=job-123",
      "cronRunId=run-456",
      'cronJob="Twitter Mention Moderation Agent"',
      'lastAssistant="There are 40 cached mentions ready."',
    ]) {
      expect(fields).toContain(expected);
    }
    expect(formatStoppedCronSessionDiagnosticFields(context)).toContain(
      'stopped="Twitter Mention Moderation Agent"',
    );
  });

  it("reads the latest assistant message from a transcript tail", () => {
    const transcriptFile = path.join(tempDir!, "session.jsonl");
    writeJsonl(transcriptFile, [
      { message: { role: "assistant", content: "older" } },
      { message: { role: "user", content: "later user" } },
      { message: { role: "assistant", content: "newer" } },
    ]);

    const lastAssistant = readLastAssistantFromSessionFile(transcriptFile);

    expect(lastAssistant).toBe("newer");
  });
});
/** Upper bound on how many trailing transcript bytes are read when looking for the last assistant message. */
const SESSION_TAIL_BYTES = 64 * 1024;
/** Maximum number of characters kept inside a quoted log field, including the trailing "...". */
const MAX_QUOTED_FIELD_CHARS = 140;

/** Best-effort cron details appended to stalled/stuck session log lines; every field is optional. */
type CronSessionContext = {
  agentId?: string;
  cronJobId?: string;
  cronRunId?: string;
  cronJobName?: string;
  lastAssistant?: string;
};

/**
 * Renders `value` as a single-line, double-quoted log field.
 *
 * Whitespace runs (including newlines) collapse to one space, the text is capped at
 * MAX_QUOTED_FIELD_CHARS characters with a trailing "...", and `"` / `\` are
 * backslash-escaped.
 */
function quoteLogField(value: string): string {
  const oneLine = value.replace(/\s+/g, " ").trim();
  let shown = oneLine;
  if (oneLine.length > MAX_QUOTED_FIELD_CHARS) {
    let head = oneLine.slice(0, Math.max(0, MAX_QUOTED_FIELD_CHARS - 3));
    // Never end on the first half of a surrogate pair (e.g. a cut-off emoji);
    // a lone surrogate would be emitted as a replacement character in the log.
    const lastUnit = head.charCodeAt(head.length - 1);
    if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) {
      head = head.slice(0, -1);
    }
    shown = `${head}...`;
  }
  return `"${shown.replace(/["\\]/g, "\\$&")}"`;
}

/**
 * Extracts the agent id, cron job id and cron run id from a session key shaped like
 * `agent:<agentId>:cron:<jobId>:run:<runId>`.
 *
 * @returns An empty object when the key does not start with `agent` or has no `cron`
 *   segment after the agent id; `cronRunId` is undefined when there is no `run` segment.
 */
export function parseCronRunSessionKey(sessionKey?: string): {
  agentId?: string;
  cronJobId?: string;
  cronRunId?: string;
} {
  const parts = sessionKey?.trim().split(":") ?? [];
  if (parts[0] !== "agent") {
    return {};
  }
  // Search from index 2 so an agent whose id is literally "cron" is not mistaken
  // for the cron marker segment.
  const cronIndex = parts.indexOf("cron", 2);
  if (cronIndex < 0) {
    return {};
  }
  const runIndex = parts.indexOf("run", cronIndex + 2);
  return {
    agentId: parts[1],
    cronJobId: parts[cronIndex + 1],
    cronRunId: runIndex >= 0 ? parts[runIndex + 1] : undefined,
  };
}

/**
 * Builds the transcript path `<stateDir>/agents/<agentId>/sessions/<id>.jsonl`,
 * preferring the active session id over the cron run id.
 *
 * @returns undefined when either the agent id or the session/run id is missing or blank.
 */
function resolveSessionFile(params: {
  agentId?: string;
  cronRunId?: string;
  activeSessionId?: string;
}): string | undefined {
  const agentId = params.agentId?.trim();
  const runId = params.activeSessionId?.trim() || params.cronRunId?.trim();
  if (!agentId || !runId) {
    return undefined;
  }
  return path.join(resolveStateDir(), "agents", agentId, "sessions", `${runId}.jsonl`);
}

/**
 * Reads at most SESSION_TAIL_BYTES from the end of `filePath` as UTF-8.
 *
 * @returns The tail text and whether earlier bytes were skipped, or undefined when the
 *   path is not a non-empty regular file or any filesystem call fails.
 */
function readTailText(filePath: string): { text: string; truncated: boolean } | undefined {
  let fd: number | undefined;
  try {
    const stat = fs.statSync(filePath);
    if (!stat.isFile() || stat.size <= 0) {
      return undefined;
    }
    const length = Math.min(stat.size, SESSION_TAIL_BYTES);
    const start = Math.max(0, stat.size - length);
    const buffer = Buffer.alloc(length);
    fd = fs.openSync(filePath, "r");
    const read = fs.readSync(fd, buffer, 0, length, start);
    return { text: buffer.subarray(0, read).toString("utf8"), truncated: start > 0 };
  } catch {
    return undefined;
  } finally {
    if (fd !== undefined) {
      try {
        fs.closeSync(fd);
      } catch {
        // best-effort diagnostic context only
      }
    }
  }
}

/**
 * Flattens transcript message content to plain text.
 *
 * Strings are returned as-is; arrays contribute the non-blank string `text` property of
 * each object element, joined with single spaces. Anything else yields undefined.
 */
function textFromContent(content: unknown): string | undefined {
  if (typeof content === "string") {
    return content;
  }
  if (!Array.isArray(content)) {
    return undefined;
  }
  const texts = content
    .map((part) => {
      if (!part || typeof part !== "object") {
        return undefined;
      }
      const text = (part as { text?: unknown }).text;
      return typeof text === "string" ? text : undefined;
    })
    .filter((text): text is string => Boolean(text?.trim()));
  return texts.length ? texts.join(" ") : undefined;
}

/**
 * Returns the trimmed text of the most recent assistant message found in the tail of a
 * JSONL session transcript.
 *
 * @param filePath Transcript path; undefined is accepted and yields undefined.
 * @returns undefined when the file is missing, empty or unreadable, or when the tail
 *   holds no assistant message with text. Never throws.
 */
export function readLastAssistantFromSessionFile(filePath: string | undefined): string | undefined {
  if (!filePath) {
    return undefined;
  }
  const tail = readTailText(filePath);
  // readTailText returns undefined for missing/empty/unreadable files; that must not
  // crash the log line this context is being collected for.
  if (!tail?.text) {
    return undefined;
  }
  const lines = tail.text.split(/\r?\n/).filter(Boolean);
  if (tail.truncated && lines.length > 0) {
    // The read started mid-file, so the first line is most likely a partial record.
    lines.shift();
  }
  for (let index = lines.length - 1; index >= 0; index -= 1) {
    try {
      const parsed = JSON.parse(lines[index]) as {
        message?: { role?: unknown; content?: unknown };
      };
      if (parsed.message?.role !== "assistant") {
        continue;
      }
      const text = textFromContent(parsed.message.content)?.trim();
      if (text) {
        return text;
      }
    } catch {
      // Ignore partial or non-JSON diagnostic transcript lines.
    }
  }
  return undefined;
}

/**
 * Looks up the trimmed name of `cronJobId` in `<stateDir>/cron/jobs.json`.
 *
 * @returns undefined when the id is missing, the file cannot be read or parsed, the job
 *   is not listed, or its name is not a non-blank string.
 */
function readCronJobName(cronJobId: string | undefined): string | undefined {
  if (!cronJobId) {
    return undefined;
  }
  try {
    const raw = fs.readFileSync(path.join(resolveStateDir(), "cron", "jobs.json"), "utf8");
    const parsed = JSON.parse(raw) as { jobs?: Array<{ id?: unknown; name?: unknown }> };
    const job = parsed.jobs?.find((entry) => entry.id === cronJobId);
    return typeof job?.name === "string" && job.name.trim() ? job.name.trim() : undefined;
  } catch {
    return undefined;
  }
}

/**
 * Collects cron diagnostic context for a session: the ids parsed from the session key,
 * the cron job name, and the last assistant snippet from the session transcript.
 *
 * @returns An empty object when the session key carries neither a cron job id nor a
 *   cron run id.
 */
export function resolveCronSessionDiagnosticContext(params: {
  sessionKey?: string;
  activeSessionId?: string;
}): CronSessionContext {
  const parsed = parseCronRunSessionKey(params.sessionKey);
  if (!parsed.cronJobId && !parsed.cronRunId) {
    return {};
  }
  return {
    ...parsed,
    cronJobName: readCronJobName(parsed.cronJobId),
    lastAssistant: readLastAssistantFromSessionFile(
      resolveSessionFile({ ...parsed, activeSessionId: params.activeSessionId }),
    ),
  };
}

/**
 * Formats the context as space-separated log fields in the order
 * `cronJobId`, `cronRunId`, `cronJob`, `lastAssistant`, omitting fields that are unset.
 * The job name and assistant snippet are quoted; the ids are emitted raw.
 */
export function formatCronSessionDiagnosticFields(context: CronSessionContext): string {
  const fields: string[] = [];
  if (context.cronJobId) {
    fields.push(`cronJobId=${context.cronJobId}`);
  }
  if (context.cronRunId) {
    fields.push(`cronRunId=${context.cronRunId}`);
  }
  if (context.cronJobName) {
    fields.push(`cronJob=${quoteLogField(context.cronJobName)}`);
  }
  if (context.lastAssistant) {
    fields.push(`lastAssistant=${quoteLogField(context.lastAssistant)}`);
  }
  return fields.join(" ");
}

/**
 * Formats the context for recovery logs: the job name leads as `stopped="<name>"`,
 * followed by the id and `lastAssistant` fields (the `cronJob` field is not repeated).
 */
export function formatStoppedCronSessionDiagnosticFields(context: CronSessionContext): string {
  const fields: string[] = [];
  if (context.cronJobName) {
    fields.push(`stopped=${quoteLogField(context.cronJobName)}`);
  }
  const rest = formatCronSessionDiagnosticFields({
    cronJobId: context.cronJobId,
    cronRunId: context.cronRunId,
    lastAssistant: context.lastAssistant,
  });
  if (rest) {
    fields.push(rest);
  }
  return fields.join(" ");
}

/** Internal helpers exposed for unit tests only. */
export const __testing = {
  quoteLogField,
};
// Verifies that the recovery warning names the stopped cron job and quotes the last
// assistant transcript line when an active embedded run is aborted.
it("logs stopped cron context when aborting an active embedded run", async () => {
  const previousStateDir = process.env.OPENCLAW_STATE_DIR;
  const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-recovery-context-"));
  const cronDir = path.join(tempDir, "cron");
  const sessionsDir = path.join(tempDir, "agents", "clawblocker", "sessions");
  try {
    process.env.OPENCLAW_STATE_DIR = tempDir;

    // Cron job registry that maps job-123 to a human-readable name.
    fs.mkdirSync(cronDir, { recursive: true });
    fs.writeFileSync(
      path.join(cronDir, "jobs.json"),
      JSON.stringify({
        jobs: [{ id: "job-123", name: "Twitter Mention Moderation Agent" }],
      }),
    );

    // One-line transcript for the run that is about to be aborted.
    fs.mkdirSync(sessionsDir, { recursive: true });
    fs.writeFileSync(
      path.join(sessionsDir, "run-456.jsonl"),
      JSON.stringify({
        message: { role: "assistant", content: "There are 40 cached mentions." },
      }) + "\n",
    );

    mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("run-456");
    mocks.abortEmbeddedPiRun.mockReturnValue(true);
    mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true);

    await recoverStuckDiagnosticSession({
      sessionId: "run-456",
      sessionKey: "agent:clawblocker:cron:job-123:run:run-456",
      ageMs: 629_000,
      allowActiveAbort: true,
    });
  } finally {
    // Restore the environment before asserting so a failure cannot leak state.
    if (previousStateDir !== undefined) {
      process.env.OPENCLAW_STATE_DIR = previousStateDir;
    } else {
      delete process.env.OPENCLAW_STATE_DIR;
    }
    fs.rmSync(tempDir, { recursive: true, force: true });
  }

  for (const fragment of [
    "action=abort_embedded_run",
    'stopped="Twitter Mention Moderation Agent"',
    'lastAssistant="There are 40 cached mentions."',
  ]) {
    expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining(fragment));
  }
});
recoverStuckDiagnosticSession( sessionLane && (!activeSessionId || !aborted || !drained) ? resetCommandLane(sessionLane) : 0; if (aborted || released > 0) { + const action = aborted ? "abort_embedded_run" : "release_lane"; + const stoppedFields = formatStoppedCronSessionDiagnosticFields( + resolveCronSessionDiagnosticContext({ sessionKey: params.sessionKey, activeSessionId }), + ); diag.warn( `stuck session recovery: sessionId=${params.sessionId ?? activeSessionId ?? "unknown"} sessionKey=${ params.sessionKey ?? "unknown" - } age=${Math.round(params.ageMs / 1000)}s aborted=${aborted} drained=${drained} released=${released}`, + } age=${Math.round(params.ageMs / 1000)}s action=${action} aborted=${aborted} drained=${drained} released=${released}${ + stoppedFields ? ` ${stoppedFields}` : "" + }`, ); } else { diag.warn( diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index f67830e321f..22d5d3c829e 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -332,6 +332,7 @@ describe("stuck session diagnostics threshold", () => { it("reports active sessions as stalled instead of stuck when active work stops progressing", () => { const events: DiagnosticEventPayload[] = []; const recoverStuckSession = vi.fn(); + const warnSpy = vi.spyOn(diagnosticLogger, "warn").mockImplementation(() => undefined); const unsubscribe = onDiagnosticEvent((event) => { events.push(event); }); @@ -360,6 +361,10 @@ describe("stuck session diagnostics threshold", () => { reason: "active_work_without_progress", activeWorkKind: "embedded_run", }); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("lastProgress=embedded_run:started"), + ); + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("lastProgressAge=60s")); expect(recoverStuckSession).not.toHaveBeenCalled(); }); diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index 63ed3a9d931..c8840561499 100644 --- a/src/logging/diagnostic.ts +++ 
/**
 * Renders the progress and active-tool parts of an activity snapshot as space-separated
 * `key=value` log fields.
 *
 * Fields appear in the order lastProgress, lastProgressAge, activeTool, activeToolCallId,
 * activeToolAge. String fields are skipped when falsy; age fields are skipped only when
 * undefined and are printed as whole seconds with an `s` suffix.
 *
 * @returns The joined fields, or an empty string when none are present.
 */
function formatSessionActivityLogFields(activity: DiagnosticSessionActivitySnapshot): string {
  const asSeconds = (ms: number): string => `${Math.round(ms / 1000)}s`;
  const candidates: Array<string | undefined> = [
    activity.lastProgressReason ? `lastProgress=${activity.lastProgressReason}` : undefined,
    activity.lastProgressAgeMs !== undefined
      ? `lastProgressAge=${asSeconds(activity.lastProgressAgeMs)}`
      : undefined,
    activity.activeToolName ? `activeTool=${activity.activeToolName}` : undefined,
    activity.activeToolCallId ? `activeToolCallId=${activity.activeToolCallId}` : undefined,
    activity.activeToolAgeMs !== undefined
      ? `activeToolAge=${asSeconds(activity.activeToolAgeMs)}`
      : undefined,
  ];
  return candidates.filter((field): field is string => field !== undefined).join(" ");
}
` activeWorkKind=${classification.activeWorkKind}` : "" - } recovery=${recoveryEligible ? "checking" : "none"}`; + }${detailFields ? ` ${detailFields}` : ""} recovery=${recoveryEligible ? "checking" : "none"}`; if (classification.eventType === "session.long_running" && state.queueDepth <= 0) { diag.debug(message); } else {