From c80ec433258016ee50583e97bd019245c36e0bea Mon Sep 17 00:00:00 2001 From: Zee Zheng Date: Sun, 31 May 2026 04:29:39 +0800 Subject: [PATCH] feat(cli): add sessions tail progress view Adds `openclaw sessions tail` as an operator-facing progress view over session trajectory events, with conservative redaction for prompt text, tool arguments, and tool result bodies. The command supports explicit session keys, store/agent scope, follow mode, relocated trajectory pointer files, and cursor-safe follow across bounded trajectory window rewrites. Documents the new sessions tail CLI surface in `docs/cli/sessions.md`. Fixes #83441. Co-authored-by: zhengzuo0-ai --- docs/cli/sessions.md | 14 + .../register.status-health-sessions.test.ts | 32 ++ .../register.status-health-sessions.ts | 33 ++ src/commands/sessions-tail.test.ts | 315 +++++++++++ src/commands/sessions-tail.ts | 532 ++++++++++++++++++ src/trajectory/export.ts | 83 +-- src/trajectory/runtime-file.ts | 86 +++ 7 files changed, 1014 insertions(+), 81 deletions(-) create mode 100644 src/commands/sessions-tail.test.ts create mode 100644 src/commands/sessions-tail.ts create mode 100644 src/trajectory/runtime-file.ts diff --git a/docs/cli/sessions.md b/docs/cli/sessions.md index 4b5a59f531b..d3ccd7b8e42 100644 --- a/docs/cli/sessions.md +++ b/docs/cli/sessions.md @@ -47,6 +47,20 @@ Scope selection: - `--store `: explicit store path (cannot be combined with `--agent` or `--all-agents`) - `--limit `: max rows to output (default `100`; `all` restores full output) +Tail human-readable trajectory progress for stored sessions: + +```bash +openclaw sessions tail +openclaw sessions tail --follow +openclaw sessions tail --session-key "agent:main:telegram:direct:123" --tail 25 +openclaw sessions --agent work tail --follow +openclaw sessions --all-agents tail --follow +``` + +`openclaw sessions tail` renders recent trajectory JSONL events as compact progress lines. Without `--session-key`, it tails running sessions first, then the latest stored session. `--tail ` controls how many existing events print before follow mode; the default is `80`, and `0` starts at the current end. `--follow` keeps watching the selected trajectory files, including relocated files referenced by `.trajectory-path.json`. + +The progress view is intentionally conservative: prompt text, tool arguments, and tool result bodies are not printed. Tool calls show the tool name with `{...redacted...}`; tool results show status such as `ok`, `error`, or `done`; model completion lines show provider/model and terminal status. + Export a trajectory bundle for a stored session: ```bash diff --git a/src/cli/program/register.status-health-sessions.test.ts b/src/cli/program/register.status-health-sessions.test.ts index 737c6540e88..0ac7970b306 100644 --- a/src/cli/program/register.status-health-sessions.test.ts +++ b/src/cli/program/register.status-health-sessions.test.ts @@ -7,6 +7,7 @@ const mocks = vi.hoisted(() => ({ healthCommand: vi.fn(), sessionsCommand: vi.fn(), sessionsCleanupCommand: vi.fn(), + sessionsTailCommand: vi.fn(), exportTrajectoryCommand: vi.fn(), commitmentsListCommand: vi.fn(), commitmentsDismissCommand: vi.fn(), @@ -31,6 +32,7 @@ const statusCommand = mocks.statusCommand; const healthCommand = mocks.healthCommand; const sessionsCommand = mocks.sessionsCommand; const sessionsCleanupCommand = mocks.sessionsCleanupCommand; +const sessionsTailCommand = mocks.sessionsTailCommand; const exportTrajectoryCommand = mocks.exportTrajectoryCommand; const commitmentsListCommand = mocks.commitmentsListCommand; const commitmentsDismissCommand = mocks.commitmentsDismissCommand; @@ -88,6 +90,10 @@ vi.mock("../../commands/sessions-cleanup.js", () => ({ sessionsCleanupCommand: mocks.sessionsCleanupCommand, })); +vi.mock("../../commands/sessions-tail.js", () => ({ + sessionsTailCommand: mocks.sessionsTailCommand, +})); + vi.mock("../../commands/export-trajectory.js", () => ({ exportTrajectoryCommand: mocks.exportTrajectoryCommand, })); @@ -134,6 +140,7 @@ describe("registerStatusHealthSessionsCommands", () => { healthCommand.mockResolvedValue(undefined); sessionsCommand.mockResolvedValue(undefined); sessionsCleanupCommand.mockResolvedValue(undefined); + sessionsTailCommand.mockResolvedValue(undefined); exportTrajectoryCommand.mockResolvedValue(undefined); commitmentsListCommand.mockResolvedValue(undefined); commitmentsDismissCommand.mockResolvedValue(undefined); @@ -345,6 +352,31 @@ describe("registerStatusHealthSessionsCommands", () => { }); }); + it("runs sessions tail with forwarded progress options", async () => { + await runCli([ + "sessions", + "--store", + "/tmp/sessions.json", + "--agent", + "work", + "tail", + "--session-key", + "agent:main:telegram:direct:owner", + "--tail", + "5", + "--follow", + ]); + + expectCommandOptions(sessionsTailCommand, { + sessionKey: "agent:main:telegram:direct:owner", + store: "/tmp/sessions.json", + agent: "work", + allAgents: false, + follow: true, + tail: "5", + }); + }); + it("runs sessions export-trajectory with owner-routable export options", async () => { await runCli([ "sessions", diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index d76d0456d6d..828577a3824 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -286,6 +286,39 @@ export function registerStatusHealthSessionsCommands(program: Command) { }); }); + sessionsCmd + .command("tail") + .description("Tail human-readable session trajectory progress") + .option("--session-key ", "Session key to tail (default: active sessions or latest)") + .option("--tail ", "Number of existing trajectory events to show", "80") + .option("--follow", "Continue following for new trajectory events", false) + .option("--store ", "Path to session store (default: resolved from config)") + .option("--agent ", "Agent id to inspect (default: configured default agent)") + .option("--all-agents", "Aggregate sessions across all configured agents", false) + .action(async (opts, command) => { + const parentOpts = command.parent?.opts() as + | { + store?: string; + agent?: string; + allAgents?: boolean; + } + | undefined; + await runCommandWithRuntime(defaultRuntime, async () => { + const { sessionsTailCommand } = await import("../../commands/sessions-tail.js"); + await sessionsTailCommand( + { + sessionKey: opts.sessionKey as string | undefined, + store: (opts.store as string | undefined) ?? parentOpts?.store, + agent: (opts.agent as string | undefined) ?? parentOpts?.agent, + allAgents: Boolean(opts.allAgents || parentOpts?.allAgents), + follow: Boolean(opts.follow), + tail: opts.tail as string | undefined, + }, + defaultRuntime, + ); + }); + }); + sessionsCmd .command("export-trajectory") .description("Export a redacted trajectory bundle for a stored session") diff --git a/src/commands/sessions-tail.test.ts b/src/commands/sessions-tail.test.ts new file mode 100644 index 00000000000..d3b60c89594 --- /dev/null +++ b/src/commands/sessions-tail.test.ts @@ -0,0 +1,315 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { RuntimeEnv } from "../runtime.js"; +import { resolveTrajectoryPointerFilePath } from "../trajectory/paths.js"; +import type { TrajectoryEvent } from "../trajectory/types.js"; +import { sessionsTailCommand } from "./sessions-tail.js"; + +const mocks = vi.hoisted(() => ({ + getRuntimeConfig: vi.fn(() => ({})), +})); + +vi.mock("../config/config.js", () => ({ + getRuntimeConfig: mocks.getRuntimeConfig, +})); + +const sessionKey = "agent:main:telegram:direct:owner"; + +function makeRuntime(): RuntimeEnv { + return { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }; +} + +function makeEvent( + params: Partial & { type: string; ts: string }, +): TrajectoryEvent { + return { + traceSchema: "openclaw-trajectory", + schemaVersion: 1, + traceId: "trace-1", + source: "runtime", + seq: 1, + sessionId: "session-one", + sessionKey, + ...params, + }; +} + +function writeJsonl(filePath: string, events: TrajectoryEvent[]): void { + fs.writeFileSync(filePath, `${events.map((event) => JSON.stringify(event)).join("\n")}\n`); +} + +function appendJsonl(filePath: string, event: TrajectoryEvent): void { + fs.appendFileSync(filePath, `${JSON.stringify(event)}\n`); +} + +function runtimeOutput(runtime: RuntimeEnv): string { + return vi + .mocked(runtime.log) + .mock.calls.map((call) => String(call[0])) + .join("\n"); +} + +async function waitForRuntimeOutput( + runtime: RuntimeEnv, + pattern: string, + timeoutMs = 3_000, +): Promise { + const startedAt = Date.now(); + while (!runtimeOutput(runtime).includes(pattern)) { + if (Date.now() - startedAt > timeoutMs) { + throw new Error(`Timed out waiting for output containing ${pattern}`); + } + await new Promise((resolve) => setTimeout(resolve, 25)); + } +} + +describe("sessionsTailCommand", () => { + let tmpDir: string; + let storePath: string; + let trajectoryPath: string; + let previousStateDir: string | undefined; + + beforeEach(() => { + previousStateDir = process.env.OPENCLAW_STATE_DIR; + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-sessions-tail-")); + process.env.OPENCLAW_STATE_DIR = path.join(tmpDir, "state"); + mocks.getRuntimeConfig.mockReturnValue({ + agents: { + list: [{ id: "main" }, { id: "ops" }], + }, + }); + storePath = path.join(tmpDir, "sessions.json"); + trajectoryPath = path.join(tmpDir, "session-one.trajectory.jsonl"); + fs.writeFileSync( + storePath, + `${JSON.stringify({ + [sessionKey]: { + sessionId: "session-one", + sessionFile: "session-one.jsonl", + updatedAt: 2, + status: "running", + }, + })}\n`, + ); + }); + + afterEach(() => { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + fs.rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("renders compact redacted progress lines", async () => { + const runtime = makeRuntime(); + writeJsonl(trajectoryPath, [ + makeEvent({ + type: "tool.call", + ts: "2026-05-18T12:04:18.000Z", + data: { name: "bash", arguments: { command: "echo SECRET" } }, + }), + makeEvent({ + type: "tool.result", + ts: "2026-05-18T12:04:21.000Z", + data: { name: "bash", success: true, output: "SECRET" }, + }), + makeEvent({ + type: "model.completed", + ts: "2026-05-18T12:04:29.000Z", + provider: "openai", + modelId: "gpt-5.2", + }), + ]); + + await sessionsTailCommand({ store: storePath, sessionKey }, runtime); + + const output = vi + .mocked(runtime.log) + .mock.calls.map((call) => String(call[0])) + .join("\n"); + expect(output).toContain("12:04:18"); + expect(output).toContain("tool.call"); + expect(output).toContain("bash {...redacted...}"); + expect(output).toContain("tool.result"); + expect(output).toContain("bash ok"); + expect(output).toContain("model.completed"); + expect(output).toContain("openai/gpt-5.2 done"); + expect(output).not.toContain("SECRET"); + }); + + it("honors the tail count before rendering existing trajectory events", async () => { + const runtime = makeRuntime(); + writeJsonl(trajectoryPath, [ + makeEvent({ type: "session.started", ts: "2026-05-18T12:04:17.000Z" }), + makeEvent({ + type: "tool.call", + ts: "2026-05-18T12:04:18.000Z", + data: { name: "bash" }, + }), + makeEvent({ + type: "tool.result", + ts: "2026-05-18T12:04:21.000Z", + data: { name: "bash", success: true }, + }), + ]); + + await sessionsTailCommand({ store: storePath, sessionKey, tail: "2" }, runtime); + + const output = vi + .mocked(runtime.log) + .mock.calls.map((call) => String(call[0])) + .join("\n"); + expect(output).not.toContain("session.started"); + expect(output).toContain("tool.call"); + expect(output).toContain("tool.result"); + }); + + it("uses a session trajectory pointer for relocated runtime files", async () => { + const runtime = makeRuntime(); + const relocatedDir = path.join(tmpDir, "relocated-trajectories"); + const relocatedTrajectoryPath = path.join(relocatedDir, "session-one.jsonl"); + fs.mkdirSync(relocatedDir, { recursive: true }); + fs.writeFileSync( + resolveTrajectoryPointerFilePath(path.join(tmpDir, "session-one.jsonl")), + `${JSON.stringify({ + traceSchema: "openclaw-trajectory-pointer", + schemaVersion: 1, + sessionId: "session-one", + runtimeFile: relocatedTrajectoryPath, + })}\n`, + ); + writeJsonl(relocatedTrajectoryPath, [ + makeEvent({ + type: "tool.result", + ts: "2026-05-18T12:04:21.000Z", + data: { name: "bash", success: true }, + }), + ]); + + await sessionsTailCommand({ store: storePath, sessionKey }, runtime); + + const output = runtimeOutput(runtime); + expect(output).toContain("tool.result"); + expect(output).toContain("bash ok"); + expect(output).not.toContain("No sessions found"); + }); + + it("preserves events appended while follow mode starts", async () => { + const runtime = makeRuntime(); + writeJsonl(trajectoryPath, [ + makeEvent({ type: "session.started", ts: "2026-05-18T12:04:17.000Z" }), + ]); + const appendedEvent = makeEvent({ + type: "tool.result", + ts: "2026-05-18T12:04:21.000Z", + data: { name: "bash", success: true }, + }); + let appended = false; + vi.mocked(runtime.log).mockImplementation((message) => { + if (!appended && String(message).includes("session.started")) { + appended = true; + appendJsonl(trajectoryPath, appendedEvent); + } + }); + + const run = sessionsTailCommand( + { store: storePath, sessionKey, tail: "1", follow: true }, + runtime, + ); + try { + await waitForRuntimeOutput(runtime, "bash ok"); + } finally { + process.emit("SIGTERM", "SIGTERM"); + await run; + } + + const output = runtimeOutput(runtime); + expect(output).toContain("session.started"); + expect(output).toContain("tool.result"); + expect(output).toContain("bash ok"); + }); + + it("continues following when a bounded trajectory window is rewritten", async () => { + const runtime = makeRuntime(); + writeJsonl(trajectoryPath, [ + makeEvent({ + sourceSeq: 1, + type: "session.started", + ts: "2026-05-18T12:04:17.000Z", + }), + ]); + const rewrittenEvent = makeEvent({ + sourceSeq: 2, + type: "tool.result", + ts: "2026-05-18T12:04:21.000Z", + data: { name: "python", success: true }, + }); + let rewritten = false; + vi.mocked(runtime.log).mockImplementation((message) => { + if (!rewritten && String(message).includes("session.started")) { + rewritten = true; + const nextPath = path.join(tmpDir, "session-one.next.trajectory.jsonl"); + writeJsonl(nextPath, [rewrittenEvent]); + fs.renameSync(nextPath, trajectoryPath); + } + }); + + const run = sessionsTailCommand( + { store: storePath, sessionKey, tail: "1", follow: true }, + runtime, + ); + try { + await waitForRuntimeOutput(runtime, "python ok"); + } finally { + process.emit("SIGTERM", "SIGTERM"); + await run; + } + + const output = runtimeOutput(runtime); + expect(output).toContain("tool.result"); + expect(output).toContain("python ok"); + }); + + it("resolves the target store from a fully qualified non-default agent session key", async () => { + const runtime = makeRuntime(); + const opsSessionKey = "agent:ops:telegram:direct:owner"; + const opsSessionsDir = path.join(process.env.OPENCLAW_STATE_DIR!, "agents", "ops", "sessions"); + fs.mkdirSync(opsSessionsDir, { recursive: true }); + fs.writeFileSync( + path.join(opsSessionsDir, "sessions.json"), + `${JSON.stringify({ + [opsSessionKey]: { + sessionId: "ops-session", + sessionFile: "ops-session.jsonl", + updatedAt: 3, + status: "done", + }, + })}\n`, + ); + writeJsonl(path.join(opsSessionsDir, "ops-session.trajectory.jsonl"), [ + makeEvent({ + sessionId: "ops-session", + sessionKey: opsSessionKey, + type: "tool.result", + ts: "2026-05-18T12:04:21.000Z", + data: { name: "bash", success: true }, + }), + ]); + + await sessionsTailCommand({ sessionKey: opsSessionKey }, runtime); + + const output = runtimeOutput(runtime); + expect(output).toContain("agent:ops:telegram:direct:own…"); + expect(output).toContain("tool.result"); + expect(output).toContain("bash ok"); + expect(output).not.toContain("No sessions found"); + }); +}); diff --git a/src/commands/sessions-tail.ts b/src/commands/sessions-tail.ts new file mode 100644 index 00000000000..e0d97914ed4 --- /dev/null +++ b/src/commands/sessions-tail.ts @@ -0,0 +1,532 @@ +import fs from "node:fs"; +import path from "node:path"; +import { getRuntimeConfig } from "../config/config.js"; +import { loadSessionStore } from "../config/sessions.js"; +import { resolveSessionFilePath } from "../config/sessions/paths.js"; +import type { SessionEntry } from "../config/sessions/types.js"; +import { formatErrorMessage } from "../infra/errors.js"; +import { resolveAgentIdFromSessionKey } from "../routing/session-key.js"; +import type { RuntimeEnv } from "../runtime.js"; +import { resolveTrajectoryFilePath } from "../trajectory/paths.js"; +import { resolveTrajectoryRuntimeFile } from "../trajectory/runtime-file.js"; +import type { TrajectoryEvent } from "../trajectory/types.js"; +import { resolveSessionStoreTargetsOrExit } from "./session-store-targets.js"; +import { shortenText } from "./text-format.js"; + +type SessionsTailOptions = { + store?: string; + agent?: string; + allAgents?: boolean; + sessionKey?: string; + follow?: boolean; + tail?: string | number; +}; + +type TailSelection = { + agentId: string; + key: string; + entry: SessionEntry; + storePath: string; + trajectoryPath: string; +}; + +type FollowState = { + cursor: TrajectoryCursor | null; + fileState: FollowFileState | null; + offset: number; + pending: string; + selection: TailSelection; +}; + +type TrajectorySnapshot = { + events: TrajectoryEvent[]; + fileState: FollowFileState | null; + offset: number; +}; + +type FollowFileState = { + dev: number; + ino: number; + mtimeMs: number; + size: number; +}; + +type TrajectoryCursor = { + seq: number | null; + tsMs: number; +}; + +const DEFAULT_TAIL_COUNT = 80; +const SESSION_KEY_PAD = 30; +const EVENT_TYPE_PAD = 16; +const FOLLOW_INTERVAL_MS = 1_000; + +function parseTailCount(value: string | number | undefined): number | null { + if (value === undefined) { + return DEFAULT_TAIL_COUNT; + } + if (typeof value === "number") { + return Number.isInteger(value) && value >= 0 ? value : null; + } + const trimmed = value.trim(); + if (!/^\d+$/.test(trimmed)) { + return null; + } + return Number.parseInt(trimmed, 10); +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function toOptionalString(value: unknown): string | undefined { + return typeof value === "string" && value.trim() ? value.trim() : undefined; +} + +function isTrajectoryEvent(value: unknown): value is TrajectoryEvent { + return ( + isRecord(value) && + value.traceSchema === "openclaw-trajectory" && + value.schemaVersion === 1 && + typeof value.type === "string" && + typeof value.ts === "string" && + typeof value.sessionId === "string" + ); +} + +function parseTrajectoryEventLine(line: string): TrajectoryEvent | null { + const trimmed = line.trim(); + if (!trimmed) { + return null; + } + try { + const parsed = JSON.parse(trimmed) as unknown; + return isTrajectoryEvent(parsed) ? parsed : null; + } catch { + return null; + } +} + +function parseTrajectoryEventLines(lines: string[]): TrajectoryEvent[] { + return lines.flatMap((line) => { + const event = parseTrajectoryEventLine(line); + return event ? [event] : []; + }); +} + +function eventSequence(event: TrajectoryEvent): number | null { + const seq = event.sourceSeq ?? event.seq; + return Number.isFinite(seq) ? seq : null; +} + +function eventTimestampMs(event: TrajectoryEvent): number { + const parsed = Date.parse(event.ts); + return Number.isFinite(parsed) ? parsed : Number.NEGATIVE_INFINITY; +} + +function eventCursor(event: TrajectoryEvent): TrajectoryCursor { + return { + seq: eventSequence(event), + tsMs: eventTimestampMs(event), + }; +} + +function compareCursors(left: TrajectoryCursor, right: TrajectoryCursor): number { + if (left.seq !== null && right.seq !== null && left.seq !== right.seq) { + return left.seq - right.seq; + } + const byTimestamp = left.tsMs - right.tsMs; + if (byTimestamp !== 0) { + return byTimestamp; + } + if (left.seq !== null && right.seq !== null) { + return left.seq - right.seq; + } + return 0; +} + +function maxCursorValue( + current: TrajectoryCursor | null, + candidate: TrajectoryCursor, +): TrajectoryCursor { + return !current || compareCursors(candidate, current) > 0 ? candidate : current; +} + +function maxCursor(current: TrajectoryCursor | null, event: TrajectoryEvent): TrajectoryCursor { + return maxCursorValue(current, eventCursor(event)); +} + +function maxCursorFromEvents(events: TrajectoryEvent[]): TrajectoryCursor | null { + return events.reduce((cursor, event) => maxCursor(cursor, event), null); +} + +function eventsAfterCursor( + events: TrajectoryEvent[], + cursor: TrajectoryCursor | null, +): TrajectoryEvent[] { + if (!cursor) { + return events; + } + return events.filter((event) => compareCursors(eventCursor(event), cursor) > 0); +} + +function formatTimestamp(ts: string): string { + const date = new Date(ts); + if (Number.isNaN(date.getTime())) { + return "--:--:--"; + } + return date.toISOString().slice(11, 19); +} + +function modelLabel(event: TrajectoryEvent): string | undefined { + const provider = event.provider?.trim(); + const model = event.modelId?.trim(); + if (provider && model) { + return `${provider}/${model}`; + } + return model || provider || undefined; +} + +function toolName(data: Record | undefined): string { + return toOptionalString(data?.name) ?? toOptionalString(data?.toolName) ?? "tool"; +} + +function resultStatus(data: Record | undefined): string { + if (data?.success === true) { + return "ok"; + } + if (data?.success === false || data?.isError === true) { + return "error"; + } + return toOptionalString(data?.status) ?? "done"; +} + +function modelCompletionStatus(data: Record | undefined): string { + if (data?.timedOut === true) { + return "timeout"; + } + if (data?.aborted === true) { + return "aborted"; + } + if (toOptionalString(data?.promptError)) { + return "error"; + } + return "done"; +} + +function safePreview(event: TrajectoryEvent): string { + const data = event.data; + switch (event.type) { + case "session.started": + return "session started"; + case "context.compiled": { + const tools = Array.isArray(data?.tools) ? data.tools.length : undefined; + return tools === undefined ? "context compiled" : `context compiled (${tools} tools)`; + } + case "prompt.submitted": + return "prompt submitted"; + case "prompt.skipped": { + const reason = toOptionalString(data?.reason); + return `prompt skipped${reason ? `: ${reason}` : ""}`; + } + case "tool.call": + return `${toolName(data)} {...redacted...}`; + case "tool.timeout": + return `${toolName(data)} timeout`; + case "tool.result": + return `${toolName(data)} ${resultStatus(data)}`; + case "model.completed": { + const model = modelLabel(event); + const status = modelCompletionStatus(data); + return model ? `${model} ${status}` : status; + } + case "session.ended": + return toOptionalString(data?.status) ?? "ended"; + case "trace.truncated": + return "trajectory truncated"; + default: + return toOptionalString(data?.status) ?? toOptionalString(data?.name) ?? ""; + } +} + +function formatProgressLine(event: TrajectoryEvent): string { + const sessionLabel = shortenText(event.sessionKey ?? event.sessionId, SESSION_KEY_PAD).padEnd( + SESSION_KEY_PAD, + ); + const typeLabel = shortenText(event.type, EVENT_TYPE_PAD).padEnd(EVENT_TYPE_PAD); + const preview = safePreview(event); + return [formatTimestamp(event.ts), typeLabel, sessionLabel, preview].join(" ").trimEnd(); +} + +function readTrajectorySnapshot(filePath: string): TrajectorySnapshot { + try { + const stat = fs.statSync(filePath); + const text = fs.readFileSync(filePath, "utf8"); + return { + events: parseTrajectoryEventLines(text.split(/\r?\n/u)), + fileState: fileStateFromStat(stat), + offset: Buffer.byteLength(text, "utf8"), + }; + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return { events: [], fileState: null, offset: 0 }; + } + throw error; + } +} + +function renderEvents(events: TrajectoryEvent[], runtime: RuntimeEnv): TrajectoryCursor | null { + let cursor: TrajectoryCursor | null = null; + for (const event of events) { + runtime.log(formatProgressLine(event)); + cursor = maxCursor(cursor, event); + } + return cursor; +} + +function fileStateFromStat(stat: fs.Stats): FollowFileState { + return { + dev: stat.dev, + ino: stat.ino, + mtimeMs: stat.mtimeMs, + size: stat.size, + }; +} + +function sameFileIdentity(left: FollowFileState | null, right: FollowFileState): boolean { + return Boolean(left && left.dev === right.dev && left.ino === right.ino); +} + +function readFollowFileState(filePath: string): FollowFileState | null { + try { + return fileStateFromStat(fs.statSync(filePath)); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return null; + } + throw error; + } +} + +function isRunningSession(entry: SessionEntry): boolean { + return entry.status === "running" || entry.acp?.state === "running"; +} + +function compareSelectionsByUpdatedAt(a: TailSelection, b: TailSelection): number { + return (b.entry.updatedAt ?? 0) - (a.entry.updatedAt ?? 0); +} + +async function buildTailSelection(params: { + agentId: string; + entry: SessionEntry; + key: string; + storePath: string; +}): Promise { + const sessionsDir = path.dirname(params.storePath); + const sessionFile = resolveSessionFilePath(params.entry.sessionId, params.entry, { + agentId: params.agentId, + sessionsDir, + }); + const trajectoryPath = + (await resolveTrajectoryRuntimeFile({ + sessionFile, + sessionId: params.entry.sessionId, + })) ?? + resolveTrajectoryFilePath({ + sessionFile, + sessionId: params.entry.sessionId, + }); + return { + agentId: params.agentId, + entry: params.entry, + key: params.key, + storePath: params.storePath, + trajectoryPath, + }; +} + +function selectSessionsToTail(selections: TailSelection[], sessionKey?: string): TailSelection[] { + const requested = sessionKey?.trim(); + if (requested) { + return selections.filter((selection) => selection.key === requested); + } + + const running = selections.filter((selection) => isRunningSession(selection.entry)); + if (running.length > 0) { + return running.toSorted(compareSelectionsByUpdatedAt); + } + + const latest = selections.toSorted(compareSelectionsByUpdatedAt)[0]; + return latest ? [latest] : []; +} + +function statFileSize(filePath: string): number { + try { + return fs.statSync(filePath).size; + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return 0; + } + throw error; + } +} + +function readNewFollowEvents(state: FollowState): TrajectoryEvent[] { + const fileState = readFollowFileState(state.selection.trajectoryPath); + if (!fileState) { + state.fileState = null; + state.offset = 0; + state.pending = ""; + return []; + } + + const replaced = !sameFileIdentity(state.fileState, fileState); + const truncated = fileState.size < state.offset; + const possiblyRewrittenSameSize = + fileState.size === state.offset && state.fileState?.mtimeMs !== fileState.mtimeMs; + + if (replaced || truncated || possiblyRewrittenSameSize) { + const snapshot = readTrajectorySnapshot(state.selection.trajectoryPath); + state.fileState = snapshot.fileState; + state.offset = snapshot.offset; + state.pending = ""; + return eventsAfterCursor(snapshot.events, state.cursor); + } + + if (fileState.size === state.offset) { + state.fileState = fileState; + return []; + } + + const fd = fs.openSync(state.selection.trajectoryPath, "r"); + try { + const buffer = Buffer.alloc(fileState.size - state.offset); + fs.readSync(fd, buffer, 0, buffer.length, state.offset); + state.offset = fileState.size; + state.fileState = fileState; + const combined = `${state.pending}${buffer.toString("utf8")}`; + const lines = combined.split(/\r?\n/u); + state.pending = lines.pop() ?? ""; + return parseTrajectoryEventLines(lines); + } finally { + fs.closeSync(fd); + } +} + +function renderFollowEvents( + events: TrajectoryEvent[], + state: FollowState, + runtime: RuntimeEnv, +): void { + const cursor = renderEvents(events, runtime); + if (cursor) { + state.cursor = maxCursorValue(state.cursor, cursor); + } +} + +async function followSelections( + selections: TailSelection[], + runtime: RuntimeEnv, + initialSnapshots: Map, +): Promise { + const states = selections.map((selection): FollowState => { + const snapshot = initialSnapshots.get(selection.trajectoryPath); + return { + cursor: snapshot ? maxCursorFromEvents(snapshot.events) : null, + fileState: snapshot?.fileState ?? readFollowFileState(selection.trajectoryPath), + offset: snapshot?.offset ?? statFileSize(selection.trajectoryPath), + pending: "", + selection, + }; + }); + + await new Promise((resolve) => { + const interval = setInterval(() => { + for (const state of states) { + try { + renderFollowEvents(readNewFollowEvents(state), state, runtime); + } catch (error) { + runtime.error( + `Failed to read trajectory progress for ${state.selection.key}: ${formatErrorMessage( + error, + )}`, + ); + } + } + }, FOLLOW_INTERVAL_MS); + + const stop = () => { + clearInterval(interval); + process.off("SIGINT", stop); + process.off("SIGTERM", stop); + resolve(); + }; + process.once("SIGINT", stop); + process.once("SIGTERM", stop); + }); +} + +function resolveTailTargetAgent(opts: SessionsTailOptions): string | undefined { + if (opts.agent?.trim() || opts.store?.trim() || opts.allAgents === true) { + return opts.agent; + } + return opts.sessionKey?.trim() ? resolveAgentIdFromSessionKey(opts.sessionKey) : undefined; +} + +export async function sessionsTailCommand( + opts: SessionsTailOptions, + runtime: RuntimeEnv, +): Promise { + const tailCount = parseTailCount(opts.tail); + if (tailCount === null) { + runtime.error("--tail must be a non-negative integer, for example --tail 25."); + runtime.exit(1); + return; + } + + const cfg = getRuntimeConfig(); + const targets = resolveSessionStoreTargetsOrExit({ + cfg, + opts: { + store: opts.store, + agent: resolveTailTargetAgent(opts), + allAgents: opts.allAgents, + }, + runtime, + }); + if (!targets) { + return; + } + + const selections: TailSelection[] = []; + for (const target of targets) { + const store = loadSessionStore(target.storePath); + for (const [key, entry] of Object.entries(store)) { + selections.push( + await buildTailSelection({ + agentId: target.agentId, + entry, + key, + storePath: target.storePath, + }), + ); + } + } + const selected = selectSessionsToTail(selections, opts.sessionKey); + if (selected.length === 0) { + const suffix = opts.sessionKey ? ` for ${opts.sessionKey}` : ""; + runtime.log(`No sessions found${suffix}.`); + return; + } + + const followSnapshots = new Map(); + for (const selection of selected) { + const snapshot = readTrajectorySnapshot(selection.trajectoryPath); + followSnapshots.set(selection.trajectoryPath, snapshot); + renderEvents(tailCount > 0 ? snapshot.events.slice(-tailCount) : [], runtime); + } + + if (opts.follow) { + await followSelections(selected, runtime, followSnapshots); + } +} diff --git a/src/trajectory/export.ts b/src/trajectory/export.ts index a0ea4572307..a50626f3b7c 100644 --- a/src/trajectory/export.ts +++ b/src/trajectory/export.ts @@ -19,12 +19,8 @@ import { } from "../logging/diagnostic-support-redaction.js"; import { isRecord } from "../shared/record-coerce.js"; import { safeJsonStringify } from "../utils/safe-json.js"; -import { - TRAJECTORY_RUNTIME_FILE_MAX_BYTES, - resolveTrajectoryFilePath, - resolveTrajectoryPointerFilePath, - safeTrajectorySessionFileName, -} from "./paths.js"; +import { TRAJECTORY_RUNTIME_FILE_MAX_BYTES, safeTrajectorySessionFileName } from "./paths.js"; +import { isRegularNonSymlinkFile, resolveTrajectoryRuntimeFile } from "./runtime-file.js"; import type { TrajectoryBundleManifest, TrajectoryBundleWarning, @@ -330,81 +326,6 @@ function summarizeJsonlWarnings(warnings: JsonlParseWarning[]): TrajectoryBundle return [...byKey.values()]; } -async function isRegularNonSymlinkFile(filePath: string): Promise { - try { - const linkStat = await fsp.lstat(filePath); - if (linkStat.isSymbolicLink() || !linkStat.isFile()) { - return false; - } - const stat = await fsp.stat(filePath); - return stat.isFile() && stat.dev === linkStat.dev && stat.ino === linkStat.ino; - } catch { - return false; - } -} - -async function readRuntimePointerFile( - sessionFile: string, - sessionId: string, -): Promise { - const pointerPath = resolveTrajectoryPointerFilePath(sessionFile); - if (!(await isRegularNonSymlinkFile(pointerPath))) { - return undefined; - } - try { - const parsed = JSON.parse(await fsp.readFile(pointerPath, "utf8")) as unknown; - if (!isRecord(parsed)) { - return undefined; - } - if (parsed.sessionId !== sessionId || typeof parsed.runtimeFile !== "string") { - return undefined; - } - const runtimeFile = path.resolve(parsed.runtimeFile); - const safeRuntimeFileName = `${safeTrajectorySessionFileName(sessionId)}.jsonl`; - const defaultRuntimeFile = path.resolve( - resolveTrajectoryFilePath({ - env: {}, - sessionFile, - sessionId, - }), - ); - if (runtimeFile !== defaultRuntimeFile && path.basename(runtimeFile) !== safeRuntimeFileName) { - return undefined; - } - return runtimeFile; - } catch { - return undefined; - } -} - -async function resolveTrajectoryRuntimeFile(params: { - runtimeFile?: string; - sessionFile: string; - sessionId: string; -}): Promise { - if (params.runtimeFile) { - return params.runtimeFile; - } - const candidates = [ - await readRuntimePointerFile(params.sessionFile, params.sessionId), - resolveTrajectoryFilePath({ - env: {}, - sessionFile: params.sessionFile, - sessionId: params.sessionId, - }), - resolveTrajectoryFilePath({ - sessionFile: params.sessionFile, - sessionId: params.sessionId, - }), - ].filter((candidate): candidate is string => Boolean(candidate)); - for (const candidate of candidates) { - if (await isRegularNonSymlinkFile(candidate)) { - return candidate; - } - } - return undefined; -} - function normalizeTimestamp(value: unknown): string { if (typeof value === "number" && Number.isFinite(value)) { const parsed = new Date(value); diff --git a/src/trajectory/runtime-file.ts b/src/trajectory/runtime-file.ts new file mode 100644 index 00000000000..75480337e46 --- /dev/null +++ b/src/trajectory/runtime-file.ts @@ -0,0 +1,86 @@ +import fsp from "node:fs/promises"; +import path from "node:path"; +import { + resolveTrajectoryFilePath, + resolveTrajectoryPointerFilePath, + safeTrajectorySessionFileName, +} from "./paths.js"; + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === "object" && !Array.isArray(value); +} + +export async function isRegularNonSymlinkFile(filePath: string): Promise { + try { + const linkStat = await fsp.lstat(filePath); + if (linkStat.isSymbolicLink() || !linkStat.isFile()) { + return false; + } + const stat = await fsp.stat(filePath); + return stat.isFile() && stat.dev === linkStat.dev && stat.ino === linkStat.ino; + } catch { + return false; + } +} + +async function readRuntimePointerFile( + sessionFile: string, + sessionId: string, +): Promise { + const pointerPath = resolveTrajectoryPointerFilePath(sessionFile); + if (!(await isRegularNonSymlinkFile(pointerPath))) { + return undefined; + } + try { + const parsed = JSON.parse(await fsp.readFile(pointerPath, "utf8")) as unknown; + if (!isRecord(parsed)) { + return undefined; + } + if (parsed.sessionId !== sessionId || typeof parsed.runtimeFile !== "string") { + return undefined; + } + const runtimeFile = path.resolve(parsed.runtimeFile); + const safeRuntimeFileName = `${safeTrajectorySessionFileName(sessionId)}.jsonl`; + const defaultRuntimeFile = path.resolve( + resolveTrajectoryFilePath({ + env: {}, + sessionFile, + sessionId, + }), + ); + if (runtimeFile !== defaultRuntimeFile && path.basename(runtimeFile) !== safeRuntimeFileName) { + return undefined; + } + return runtimeFile; + } catch { + return undefined; + } +} + +export async function resolveTrajectoryRuntimeFile(params: { + runtimeFile?: string; + sessionFile: string; + sessionId: string; +}): Promise { + if (params.runtimeFile) { + return params.runtimeFile; + } + const candidates = [ + await readRuntimePointerFile(params.sessionFile, params.sessionId), + resolveTrajectoryFilePath({ + env: {}, + sessionFile: params.sessionFile, + sessionId: params.sessionId, + }), + resolveTrajectoryFilePath({ + sessionFile: params.sessionFile, + sessionId: params.sessionId, + }), + ].filter((candidate): candidate is string => Boolean(candidate)); + for (const candidate of candidates) { + if (await isRegularNonSymlinkFile(candidate)) { + return candidate; + } + } + return undefined; +}