feat(cli): add sessions tail progress view

Adds `openclaw sessions tail` as an operator-facing progress view over session trajectory events, with conservative redaction for prompt text, tool arguments, and tool result bodies. The command supports explicit session keys, store/agent scope, follow mode, relocated trajectory pointer files, and cursor-safe follow across bounded trajectory window rewrites.

Documents the new sessions tail CLI surface in `docs/cli/sessions.md`.

Fixes #83441.

Co-authored-by: zhengzuo0-ai <zheng.zuo0@gmail.com>
This commit is contained in:
Zee Zheng
2026-05-31 04:29:39 +08:00
committed by GitHub
parent b6891d284d
commit c80ec43325
7 changed files with 1014 additions and 81 deletions

View File

@@ -47,6 +47,20 @@ Scope selection:
- `--store <path>`: explicit store path (cannot be combined with `--agent` or `--all-agents`)
- `--limit <n|all>`: max rows to output (default `100`; `all` restores full output)
Tail human-readable trajectory progress for stored sessions:
```bash
openclaw sessions tail
openclaw sessions tail --follow
openclaw sessions tail --session-key "agent:main:telegram:direct:123" --tail 25
openclaw sessions --agent work tail --follow
openclaw sessions --all-agents tail --follow
```
`openclaw sessions tail` renders recent trajectory JSONL events as compact progress lines. Without `--session-key`, it tails running sessions first, then the latest stored session. `--tail <count>` controls how many existing events print before follow mode; the default is `80`, and `0` starts at the current end. `--follow` keeps watching the selected trajectory files, including relocated files referenced by `<session>.trajectory-path.json`.
The progress view is intentionally conservative: prompt text, tool arguments, and tool result bodies are not printed. Tool calls show the tool name with `{...redacted...}`; tool results show status such as `ok`, `error`, or `done`; model completion lines show provider/model and terminal status.
Export a trajectory bundle for a stored session:
```bash

View File

@@ -7,6 +7,7 @@ const mocks = vi.hoisted(() => ({
healthCommand: vi.fn(),
sessionsCommand: vi.fn(),
sessionsCleanupCommand: vi.fn(),
sessionsTailCommand: vi.fn(),
exportTrajectoryCommand: vi.fn(),
commitmentsListCommand: vi.fn(),
commitmentsDismissCommand: vi.fn(),
@@ -31,6 +32,7 @@ const statusCommand = mocks.statusCommand;
const healthCommand = mocks.healthCommand;
const sessionsCommand = mocks.sessionsCommand;
const sessionsCleanupCommand = mocks.sessionsCleanupCommand;
const sessionsTailCommand = mocks.sessionsTailCommand;
const exportTrajectoryCommand = mocks.exportTrajectoryCommand;
const commitmentsListCommand = mocks.commitmentsListCommand;
const commitmentsDismissCommand = mocks.commitmentsDismissCommand;
@@ -88,6 +90,10 @@ vi.mock("../../commands/sessions-cleanup.js", () => ({
sessionsCleanupCommand: mocks.sessionsCleanupCommand,
}));
vi.mock("../../commands/sessions-tail.js", () => ({
sessionsTailCommand: mocks.sessionsTailCommand,
}));
vi.mock("../../commands/export-trajectory.js", () => ({
exportTrajectoryCommand: mocks.exportTrajectoryCommand,
}));
@@ -134,6 +140,7 @@ describe("registerStatusHealthSessionsCommands", () => {
healthCommand.mockResolvedValue(undefined);
sessionsCommand.mockResolvedValue(undefined);
sessionsCleanupCommand.mockResolvedValue(undefined);
sessionsTailCommand.mockResolvedValue(undefined);
exportTrajectoryCommand.mockResolvedValue(undefined);
commitmentsListCommand.mockResolvedValue(undefined);
commitmentsDismissCommand.mockResolvedValue(undefined);
@@ -345,6 +352,31 @@ describe("registerStatusHealthSessionsCommands", () => {
});
});
it("runs sessions tail with forwarded progress options", async () => {
await runCli([
"sessions",
"--store",
"/tmp/sessions.json",
"--agent",
"work",
"tail",
"--session-key",
"agent:main:telegram:direct:owner",
"--tail",
"5",
"--follow",
]);
expectCommandOptions(sessionsTailCommand, {
sessionKey: "agent:main:telegram:direct:owner",
store: "/tmp/sessions.json",
agent: "work",
allAgents: false,
follow: true,
tail: "5",
});
});
it("runs sessions export-trajectory with owner-routable export options", async () => {
await runCli([
"sessions",

View File

@@ -286,6 +286,39 @@ export function registerStatusHealthSessionsCommands(program: Command) {
});
});
sessionsCmd
.command("tail")
.description("Tail human-readable session trajectory progress")
.option("--session-key <key>", "Session key to tail (default: active sessions or latest)")
.option("--tail <count>", "Number of existing trajectory events to show", "80")
.option("--follow", "Continue following for new trajectory events", false)
.option("--store <path>", "Path to session store (default: resolved from config)")
.option("--agent <id>", "Agent id to inspect (default: configured default agent)")
.option("--all-agents", "Aggregate sessions across all configured agents", false)
.action(async (opts, command) => {
const parentOpts = command.parent?.opts() as
| {
store?: string;
agent?: string;
allAgents?: boolean;
}
| undefined;
await runCommandWithRuntime(defaultRuntime, async () => {
const { sessionsTailCommand } = await import("../../commands/sessions-tail.js");
await sessionsTailCommand(
{
sessionKey: opts.sessionKey as string | undefined,
store: (opts.store as string | undefined) ?? parentOpts?.store,
agent: (opts.agent as string | undefined) ?? parentOpts?.agent,
allAgents: Boolean(opts.allAgents || parentOpts?.allAgents),
follow: Boolean(opts.follow),
tail: opts.tail as string | undefined,
},
defaultRuntime,
);
});
});
sessionsCmd
.command("export-trajectory")
.description("Export a redacted trajectory bundle for a stored session")

View File

@@ -0,0 +1,315 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { RuntimeEnv } from "../runtime.js";
import { resolveTrajectoryPointerFilePath } from "../trajectory/paths.js";
import type { TrajectoryEvent } from "../trajectory/types.js";
import { sessionsTailCommand } from "./sessions-tail.js";
const mocks = vi.hoisted(() => ({
getRuntimeConfig: vi.fn(() => ({})),
}));
vi.mock("../config/config.js", () => ({
getRuntimeConfig: mocks.getRuntimeConfig,
}));
const sessionKey = "agent:main:telegram:direct:owner";
function makeRuntime(): RuntimeEnv {
return {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn(),
};
}
function makeEvent(
params: Partial<TrajectoryEvent> & { type: string; ts: string },
): TrajectoryEvent {
return {
traceSchema: "openclaw-trajectory",
schemaVersion: 1,
traceId: "trace-1",
source: "runtime",
seq: 1,
sessionId: "session-one",
sessionKey,
...params,
};
}
function writeJsonl(filePath: string, events: TrajectoryEvent[]): void {
fs.writeFileSync(filePath, `${events.map((event) => JSON.stringify(event)).join("\n")}\n`);
}
function appendJsonl(filePath: string, event: TrajectoryEvent): void {
fs.appendFileSync(filePath, `${JSON.stringify(event)}\n`);
}
function runtimeOutput(runtime: RuntimeEnv): string {
return vi
.mocked(runtime.log)
.mock.calls.map((call) => String(call[0]))
.join("\n");
}
async function waitForRuntimeOutput(
runtime: RuntimeEnv,
pattern: string,
timeoutMs = 3_000,
): Promise<void> {
const startedAt = Date.now();
while (!runtimeOutput(runtime).includes(pattern)) {
if (Date.now() - startedAt > timeoutMs) {
throw new Error(`Timed out waiting for output containing ${pattern}`);
}
await new Promise((resolve) => setTimeout(resolve, 25));
}
}
describe("sessionsTailCommand", () => {
let tmpDir: string;
let storePath: string;
let trajectoryPath: string;
let previousStateDir: string | undefined;
beforeEach(() => {
previousStateDir = process.env.OPENCLAW_STATE_DIR;
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-sessions-tail-"));
process.env.OPENCLAW_STATE_DIR = path.join(tmpDir, "state");
mocks.getRuntimeConfig.mockReturnValue({
agents: {
list: [{ id: "main" }, { id: "ops" }],
},
});
storePath = path.join(tmpDir, "sessions.json");
trajectoryPath = path.join(tmpDir, "session-one.trajectory.jsonl");
fs.writeFileSync(
storePath,
`${JSON.stringify({
[sessionKey]: {
sessionId: "session-one",
sessionFile: "session-one.jsonl",
updatedAt: 2,
status: "running",
},
})}\n`,
);
});
afterEach(() => {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
fs.rmSync(tmpDir, { recursive: true, force: true });
});
it("renders compact redacted progress lines", async () => {
const runtime = makeRuntime();
writeJsonl(trajectoryPath, [
makeEvent({
type: "tool.call",
ts: "2026-05-18T12:04:18.000Z",
data: { name: "bash", arguments: { command: "echo SECRET" } },
}),
makeEvent({
type: "tool.result",
ts: "2026-05-18T12:04:21.000Z",
data: { name: "bash", success: true, output: "SECRET" },
}),
makeEvent({
type: "model.completed",
ts: "2026-05-18T12:04:29.000Z",
provider: "openai",
modelId: "gpt-5.2",
}),
]);
await sessionsTailCommand({ store: storePath, sessionKey }, runtime);
const output = vi
.mocked(runtime.log)
.mock.calls.map((call) => String(call[0]))
.join("\n");
expect(output).toContain("12:04:18");
expect(output).toContain("tool.call");
expect(output).toContain("bash {...redacted...}");
expect(output).toContain("tool.result");
expect(output).toContain("bash ok");
expect(output).toContain("model.completed");
expect(output).toContain("openai/gpt-5.2 done");
expect(output).not.toContain("SECRET");
});
it("honors the tail count before rendering existing trajectory events", async () => {
const runtime = makeRuntime();
writeJsonl(trajectoryPath, [
makeEvent({ type: "session.started", ts: "2026-05-18T12:04:17.000Z" }),
makeEvent({
type: "tool.call",
ts: "2026-05-18T12:04:18.000Z",
data: { name: "bash" },
}),
makeEvent({
type: "tool.result",
ts: "2026-05-18T12:04:21.000Z",
data: { name: "bash", success: true },
}),
]);
await sessionsTailCommand({ store: storePath, sessionKey, tail: "2" }, runtime);
const output = vi
.mocked(runtime.log)
.mock.calls.map((call) => String(call[0]))
.join("\n");
expect(output).not.toContain("session.started");
expect(output).toContain("tool.call");
expect(output).toContain("tool.result");
});
it("uses a session trajectory pointer for relocated runtime files", async () => {
const runtime = makeRuntime();
const relocatedDir = path.join(tmpDir, "relocated-trajectories");
const relocatedTrajectoryPath = path.join(relocatedDir, "session-one.jsonl");
fs.mkdirSync(relocatedDir, { recursive: true });
fs.writeFileSync(
resolveTrajectoryPointerFilePath(path.join(tmpDir, "session-one.jsonl")),
`${JSON.stringify({
traceSchema: "openclaw-trajectory-pointer",
schemaVersion: 1,
sessionId: "session-one",
runtimeFile: relocatedTrajectoryPath,
})}\n`,
);
writeJsonl(relocatedTrajectoryPath, [
makeEvent({
type: "tool.result",
ts: "2026-05-18T12:04:21.000Z",
data: { name: "bash", success: true },
}),
]);
await sessionsTailCommand({ store: storePath, sessionKey }, runtime);
const output = runtimeOutput(runtime);
expect(output).toContain("tool.result");
expect(output).toContain("bash ok");
expect(output).not.toContain("No sessions found");
});
it("preserves events appended while follow mode starts", async () => {
const runtime = makeRuntime();
writeJsonl(trajectoryPath, [
makeEvent({ type: "session.started", ts: "2026-05-18T12:04:17.000Z" }),
]);
const appendedEvent = makeEvent({
type: "tool.result",
ts: "2026-05-18T12:04:21.000Z",
data: { name: "bash", success: true },
});
let appended = false;
vi.mocked(runtime.log).mockImplementation((message) => {
if (!appended && String(message).includes("session.started")) {
appended = true;
appendJsonl(trajectoryPath, appendedEvent);
}
});
const run = sessionsTailCommand(
{ store: storePath, sessionKey, tail: "1", follow: true },
runtime,
);
try {
await waitForRuntimeOutput(runtime, "bash ok");
} finally {
process.emit("SIGTERM", "SIGTERM");
await run;
}
const output = runtimeOutput(runtime);
expect(output).toContain("session.started");
expect(output).toContain("tool.result");
expect(output).toContain("bash ok");
});
it("continues following when a bounded trajectory window is rewritten", async () => {
const runtime = makeRuntime();
writeJsonl(trajectoryPath, [
makeEvent({
sourceSeq: 1,
type: "session.started",
ts: "2026-05-18T12:04:17.000Z",
}),
]);
const rewrittenEvent = makeEvent({
sourceSeq: 2,
type: "tool.result",
ts: "2026-05-18T12:04:21.000Z",
data: { name: "python", success: true },
});
let rewritten = false;
vi.mocked(runtime.log).mockImplementation((message) => {
if (!rewritten && String(message).includes("session.started")) {
rewritten = true;
const nextPath = path.join(tmpDir, "session-one.next.trajectory.jsonl");
writeJsonl(nextPath, [rewrittenEvent]);
fs.renameSync(nextPath, trajectoryPath);
}
});
const run = sessionsTailCommand(
{ store: storePath, sessionKey, tail: "1", follow: true },
runtime,
);
try {
await waitForRuntimeOutput(runtime, "python ok");
} finally {
process.emit("SIGTERM", "SIGTERM");
await run;
}
const output = runtimeOutput(runtime);
expect(output).toContain("tool.result");
expect(output).toContain("python ok");
});
it("resolves the target store from a fully qualified non-default agent session key", async () => {
const runtime = makeRuntime();
const opsSessionKey = "agent:ops:telegram:direct:owner";
const opsSessionsDir = path.join(process.env.OPENCLAW_STATE_DIR!, "agents", "ops", "sessions");
fs.mkdirSync(opsSessionsDir, { recursive: true });
fs.writeFileSync(
path.join(opsSessionsDir, "sessions.json"),
`${JSON.stringify({
[opsSessionKey]: {
sessionId: "ops-session",
sessionFile: "ops-session.jsonl",
updatedAt: 3,
status: "done",
},
})}\n`,
);
writeJsonl(path.join(opsSessionsDir, "ops-session.trajectory.jsonl"), [
makeEvent({
sessionId: "ops-session",
sessionKey: opsSessionKey,
type: "tool.result",
ts: "2026-05-18T12:04:21.000Z",
data: { name: "bash", success: true },
}),
]);
await sessionsTailCommand({ sessionKey: opsSessionKey }, runtime);
const output = runtimeOutput(runtime);
expect(output).toContain("agent:ops:telegram:direct:own…");
expect(output).toContain("tool.result");
expect(output).toContain("bash ok");
expect(output).not.toContain("No sessions found");
});
});

View File

@@ -0,0 +1,532 @@
import fs from "node:fs";
import path from "node:path";
import { getRuntimeConfig } from "../config/config.js";
import { loadSessionStore } from "../config/sessions.js";
import { resolveSessionFilePath } from "../config/sessions/paths.js";
import type { SessionEntry } from "../config/sessions/types.js";
import { formatErrorMessage } from "../infra/errors.js";
import { resolveAgentIdFromSessionKey } from "../routing/session-key.js";
import type { RuntimeEnv } from "../runtime.js";
import { resolveTrajectoryFilePath } from "../trajectory/paths.js";
import { resolveTrajectoryRuntimeFile } from "../trajectory/runtime-file.js";
import type { TrajectoryEvent } from "../trajectory/types.js";
import { resolveSessionStoreTargetsOrExit } from "./session-store-targets.js";
import { shortenText } from "./text-format.js";
type SessionsTailOptions = {
store?: string;
agent?: string;
allAgents?: boolean;
sessionKey?: string;
follow?: boolean;
tail?: string | number;
};
type TailSelection = {
agentId: string;
key: string;
entry: SessionEntry;
storePath: string;
trajectoryPath: string;
};
type FollowState = {
cursor: TrajectoryCursor | null;
fileState: FollowFileState | null;
offset: number;
pending: string;
selection: TailSelection;
};
type TrajectorySnapshot = {
events: TrajectoryEvent[];
fileState: FollowFileState | null;
offset: number;
};
type FollowFileState = {
dev: number;
ino: number;
mtimeMs: number;
size: number;
};
type TrajectoryCursor = {
seq: number | null;
tsMs: number;
};
const DEFAULT_TAIL_COUNT = 80;
const SESSION_KEY_PAD = 30;
const EVENT_TYPE_PAD = 16;
const FOLLOW_INTERVAL_MS = 1_000;
function parseTailCount(value: string | number | undefined): number | null {
if (value === undefined) {
return DEFAULT_TAIL_COUNT;
}
if (typeof value === "number") {
return Number.isInteger(value) && value >= 0 ? value : null;
}
const trimmed = value.trim();
if (!/^\d+$/.test(trimmed)) {
return null;
}
return Number.parseInt(trimmed, 10);
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function toOptionalString(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function isTrajectoryEvent(value: unknown): value is TrajectoryEvent {
return (
isRecord(value) &&
value.traceSchema === "openclaw-trajectory" &&
value.schemaVersion === 1 &&
typeof value.type === "string" &&
typeof value.ts === "string" &&
typeof value.sessionId === "string"
);
}
function parseTrajectoryEventLine(line: string): TrajectoryEvent | null {
const trimmed = line.trim();
if (!trimmed) {
return null;
}
try {
const parsed = JSON.parse(trimmed) as unknown;
return isTrajectoryEvent(parsed) ? parsed : null;
} catch {
return null;
}
}
function parseTrajectoryEventLines(lines: string[]): TrajectoryEvent[] {
return lines.flatMap((line) => {
const event = parseTrajectoryEventLine(line);
return event ? [event] : [];
});
}
function eventSequence(event: TrajectoryEvent): number | null {
const seq = event.sourceSeq ?? event.seq;
return Number.isFinite(seq) ? seq : null;
}
function eventTimestampMs(event: TrajectoryEvent): number {
const parsed = Date.parse(event.ts);
return Number.isFinite(parsed) ? parsed : Number.NEGATIVE_INFINITY;
}
function eventCursor(event: TrajectoryEvent): TrajectoryCursor {
return {
seq: eventSequence(event),
tsMs: eventTimestampMs(event),
};
}
function compareCursors(left: TrajectoryCursor, right: TrajectoryCursor): number {
if (left.seq !== null && right.seq !== null && left.seq !== right.seq) {
return left.seq - right.seq;
}
const byTimestamp = left.tsMs - right.tsMs;
if (byTimestamp !== 0) {
return byTimestamp;
}
if (left.seq !== null && right.seq !== null) {
return left.seq - right.seq;
}
return 0;
}
function maxCursorValue(
current: TrajectoryCursor | null,
candidate: TrajectoryCursor,
): TrajectoryCursor {
return !current || compareCursors(candidate, current) > 0 ? candidate : current;
}
function maxCursor(current: TrajectoryCursor | null, event: TrajectoryEvent): TrajectoryCursor {
return maxCursorValue(current, eventCursor(event));
}
function maxCursorFromEvents(events: TrajectoryEvent[]): TrajectoryCursor | null {
return events.reduce<TrajectoryCursor | null>((cursor, event) => maxCursor(cursor, event), null);
}
function eventsAfterCursor(
events: TrajectoryEvent[],
cursor: TrajectoryCursor | null,
): TrajectoryEvent[] {
if (!cursor) {
return events;
}
return events.filter((event) => compareCursors(eventCursor(event), cursor) > 0);
}
function formatTimestamp(ts: string): string {
const date = new Date(ts);
if (Number.isNaN(date.getTime())) {
return "--:--:--";
}
return date.toISOString().slice(11, 19);
}
function modelLabel(event: TrajectoryEvent): string | undefined {
const provider = event.provider?.trim();
const model = event.modelId?.trim();
if (provider && model) {
return `${provider}/${model}`;
}
return model || provider || undefined;
}
function toolName(data: Record<string, unknown> | undefined): string {
return toOptionalString(data?.name) ?? toOptionalString(data?.toolName) ?? "tool";
}
function resultStatus(data: Record<string, unknown> | undefined): string {
if (data?.success === true) {
return "ok";
}
if (data?.success === false || data?.isError === true) {
return "error";
}
return toOptionalString(data?.status) ?? "done";
}
function modelCompletionStatus(data: Record<string, unknown> | undefined): string {
if (data?.timedOut === true) {
return "timeout";
}
if (data?.aborted === true) {
return "aborted";
}
if (toOptionalString(data?.promptError)) {
return "error";
}
return "done";
}
function safePreview(event: TrajectoryEvent): string {
const data = event.data;
switch (event.type) {
case "session.started":
return "session started";
case "context.compiled": {
const tools = Array.isArray(data?.tools) ? data.tools.length : undefined;
return tools === undefined ? "context compiled" : `context compiled (${tools} tools)`;
}
case "prompt.submitted":
return "prompt submitted";
case "prompt.skipped": {
const reason = toOptionalString(data?.reason);
return `prompt skipped${reason ? `: ${reason}` : ""}`;
}
case "tool.call":
return `${toolName(data)} {...redacted...}`;
case "tool.timeout":
return `${toolName(data)} timeout`;
case "tool.result":
return `${toolName(data)} ${resultStatus(data)}`;
case "model.completed": {
const model = modelLabel(event);
const status = modelCompletionStatus(data);
return model ? `${model} ${status}` : status;
}
case "session.ended":
return toOptionalString(data?.status) ?? "ended";
case "trace.truncated":
return "trajectory truncated";
default:
return toOptionalString(data?.status) ?? toOptionalString(data?.name) ?? "";
}
}
function formatProgressLine(event: TrajectoryEvent): string {
const sessionLabel = shortenText(event.sessionKey ?? event.sessionId, SESSION_KEY_PAD).padEnd(
SESSION_KEY_PAD,
);
const typeLabel = shortenText(event.type, EVENT_TYPE_PAD).padEnd(EVENT_TYPE_PAD);
const preview = safePreview(event);
return [formatTimestamp(event.ts), typeLabel, sessionLabel, preview].join(" ").trimEnd();
}
function readTrajectorySnapshot(filePath: string): TrajectorySnapshot {
try {
const stat = fs.statSync(filePath);
const text = fs.readFileSync(filePath, "utf8");
return {
events: parseTrajectoryEventLines(text.split(/\r?\n/u)),
fileState: fileStateFromStat(stat),
offset: Buffer.byteLength(text, "utf8"),
};
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
return { events: [], fileState: null, offset: 0 };
}
throw error;
}
}
function renderEvents(events: TrajectoryEvent[], runtime: RuntimeEnv): TrajectoryCursor | null {
let cursor: TrajectoryCursor | null = null;
for (const event of events) {
runtime.log(formatProgressLine(event));
cursor = maxCursor(cursor, event);
}
return cursor;
}
function fileStateFromStat(stat: fs.Stats): FollowFileState {
return {
dev: stat.dev,
ino: stat.ino,
mtimeMs: stat.mtimeMs,
size: stat.size,
};
}
function sameFileIdentity(left: FollowFileState | null, right: FollowFileState): boolean {
return Boolean(left && left.dev === right.dev && left.ino === right.ino);
}
function readFollowFileState(filePath: string): FollowFileState | null {
try {
return fileStateFromStat(fs.statSync(filePath));
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
return null;
}
throw error;
}
}
function isRunningSession(entry: SessionEntry): boolean {
return entry.status === "running" || entry.acp?.state === "running";
}
function compareSelectionsByUpdatedAt(a: TailSelection, b: TailSelection): number {
return (b.entry.updatedAt ?? 0) - (a.entry.updatedAt ?? 0);
}
async function buildTailSelection(params: {
agentId: string;
entry: SessionEntry;
key: string;
storePath: string;
}): Promise<TailSelection> {
const sessionsDir = path.dirname(params.storePath);
const sessionFile = resolveSessionFilePath(params.entry.sessionId, params.entry, {
agentId: params.agentId,
sessionsDir,
});
const trajectoryPath =
(await resolveTrajectoryRuntimeFile({
sessionFile,
sessionId: params.entry.sessionId,
})) ??
resolveTrajectoryFilePath({
sessionFile,
sessionId: params.entry.sessionId,
});
return {
agentId: params.agentId,
entry: params.entry,
key: params.key,
storePath: params.storePath,
trajectoryPath,
};
}
function selectSessionsToTail(selections: TailSelection[], sessionKey?: string): TailSelection[] {
const requested = sessionKey?.trim();
if (requested) {
return selections.filter((selection) => selection.key === requested);
}
const running = selections.filter((selection) => isRunningSession(selection.entry));
if (running.length > 0) {
return running.toSorted(compareSelectionsByUpdatedAt);
}
const latest = selections.toSorted(compareSelectionsByUpdatedAt)[0];
return latest ? [latest] : [];
}
function statFileSize(filePath: string): number {
try {
return fs.statSync(filePath).size;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
return 0;
}
throw error;
}
}
function readNewFollowEvents(state: FollowState): TrajectoryEvent[] {
const fileState = readFollowFileState(state.selection.trajectoryPath);
if (!fileState) {
state.fileState = null;
state.offset = 0;
state.pending = "";
return [];
}
const replaced = !sameFileIdentity(state.fileState, fileState);
const truncated = fileState.size < state.offset;
const possiblyRewrittenSameSize =
fileState.size === state.offset && state.fileState?.mtimeMs !== fileState.mtimeMs;
if (replaced || truncated || possiblyRewrittenSameSize) {
const snapshot = readTrajectorySnapshot(state.selection.trajectoryPath);
state.fileState = snapshot.fileState;
state.offset = snapshot.offset;
state.pending = "";
return eventsAfterCursor(snapshot.events, state.cursor);
}
if (fileState.size === state.offset) {
state.fileState = fileState;
return [];
}
const fd = fs.openSync(state.selection.trajectoryPath, "r");
try {
const buffer = Buffer.alloc(fileState.size - state.offset);
fs.readSync(fd, buffer, 0, buffer.length, state.offset);
state.offset = fileState.size;
state.fileState = fileState;
const combined = `${state.pending}${buffer.toString("utf8")}`;
const lines = combined.split(/\r?\n/u);
state.pending = lines.pop() ?? "";
return parseTrajectoryEventLines(lines);
} finally {
fs.closeSync(fd);
}
}
function renderFollowEvents(
events: TrajectoryEvent[],
state: FollowState,
runtime: RuntimeEnv,
): void {
const cursor = renderEvents(events, runtime);
if (cursor) {
state.cursor = maxCursorValue(state.cursor, cursor);
}
}
async function followSelections(
selections: TailSelection[],
runtime: RuntimeEnv,
initialSnapshots: Map<string, TrajectorySnapshot>,
): Promise<void> {
const states = selections.map((selection): FollowState => {
const snapshot = initialSnapshots.get(selection.trajectoryPath);
return {
cursor: snapshot ? maxCursorFromEvents(snapshot.events) : null,
fileState: snapshot?.fileState ?? readFollowFileState(selection.trajectoryPath),
offset: snapshot?.offset ?? statFileSize(selection.trajectoryPath),
pending: "",
selection,
};
});
await new Promise<void>((resolve) => {
const interval = setInterval(() => {
for (const state of states) {
try {
renderFollowEvents(readNewFollowEvents(state), state, runtime);
} catch (error) {
runtime.error(
`Failed to read trajectory progress for ${state.selection.key}: ${formatErrorMessage(
error,
)}`,
);
}
}
}, FOLLOW_INTERVAL_MS);
const stop = () => {
clearInterval(interval);
process.off("SIGINT", stop);
process.off("SIGTERM", stop);
resolve();
};
process.once("SIGINT", stop);
process.once("SIGTERM", stop);
});
}
function resolveTailTargetAgent(opts: SessionsTailOptions): string | undefined {
if (opts.agent?.trim() || opts.store?.trim() || opts.allAgents === true) {
return opts.agent;
}
return opts.sessionKey?.trim() ? resolveAgentIdFromSessionKey(opts.sessionKey) : undefined;
}
export async function sessionsTailCommand(
opts: SessionsTailOptions,
runtime: RuntimeEnv,
): Promise<void> {
const tailCount = parseTailCount(opts.tail);
if (tailCount === null) {
runtime.error("--tail must be a non-negative integer, for example --tail 25.");
runtime.exit(1);
return;
}
const cfg = getRuntimeConfig();
const targets = resolveSessionStoreTargetsOrExit({
cfg,
opts: {
store: opts.store,
agent: resolveTailTargetAgent(opts),
allAgents: opts.allAgents,
},
runtime,
});
if (!targets) {
return;
}
const selections: TailSelection[] = [];
for (const target of targets) {
const store = loadSessionStore(target.storePath);
for (const [key, entry] of Object.entries(store)) {
selections.push(
await buildTailSelection({
agentId: target.agentId,
entry,
key,
storePath: target.storePath,
}),
);
}
}
const selected = selectSessionsToTail(selections, opts.sessionKey);
if (selected.length === 0) {
const suffix = opts.sessionKey ? ` for ${opts.sessionKey}` : "";
runtime.log(`No sessions found${suffix}.`);
return;
}
const followSnapshots = new Map<string, TrajectorySnapshot>();
for (const selection of selected) {
const snapshot = readTrajectorySnapshot(selection.trajectoryPath);
followSnapshots.set(selection.trajectoryPath, snapshot);
renderEvents(tailCount > 0 ? snapshot.events.slice(-tailCount) : [], runtime);
}
if (opts.follow) {
await followSelections(selected, runtime, followSnapshots);
}
}

View File

@@ -19,12 +19,8 @@ import {
} from "../logging/diagnostic-support-redaction.js";
import { isRecord } from "../shared/record-coerce.js";
import { safeJsonStringify } from "../utils/safe-json.js";
import {
TRAJECTORY_RUNTIME_FILE_MAX_BYTES,
resolveTrajectoryFilePath,
resolveTrajectoryPointerFilePath,
safeTrajectorySessionFileName,
} from "./paths.js";
import { TRAJECTORY_RUNTIME_FILE_MAX_BYTES, safeTrajectorySessionFileName } from "./paths.js";
import { isRegularNonSymlinkFile, resolveTrajectoryRuntimeFile } from "./runtime-file.js";
import type {
TrajectoryBundleManifest,
TrajectoryBundleWarning,
@@ -330,81 +326,6 @@ function summarizeJsonlWarnings(warnings: JsonlParseWarning[]): TrajectoryBundle
return [...byKey.values()];
}
async function isRegularNonSymlinkFile(filePath: string): Promise<boolean> {
try {
const linkStat = await fsp.lstat(filePath);
if (linkStat.isSymbolicLink() || !linkStat.isFile()) {
return false;
}
const stat = await fsp.stat(filePath);
return stat.isFile() && stat.dev === linkStat.dev && stat.ino === linkStat.ino;
} catch {
return false;
}
}
async function readRuntimePointerFile(
sessionFile: string,
sessionId: string,
): Promise<string | undefined> {
const pointerPath = resolveTrajectoryPointerFilePath(sessionFile);
if (!(await isRegularNonSymlinkFile(pointerPath))) {
return undefined;
}
try {
const parsed = JSON.parse(await fsp.readFile(pointerPath, "utf8")) as unknown;
if (!isRecord(parsed)) {
return undefined;
}
if (parsed.sessionId !== sessionId || typeof parsed.runtimeFile !== "string") {
return undefined;
}
const runtimeFile = path.resolve(parsed.runtimeFile);
const safeRuntimeFileName = `${safeTrajectorySessionFileName(sessionId)}.jsonl`;
const defaultRuntimeFile = path.resolve(
resolveTrajectoryFilePath({
env: {},
sessionFile,
sessionId,
}),
);
if (runtimeFile !== defaultRuntimeFile && path.basename(runtimeFile) !== safeRuntimeFileName) {
return undefined;
}
return runtimeFile;
} catch {
return undefined;
}
}
async function resolveTrajectoryRuntimeFile(params: {
runtimeFile?: string;
sessionFile: string;
sessionId: string;
}): Promise<string | undefined> {
if (params.runtimeFile) {
return params.runtimeFile;
}
const candidates = [
await readRuntimePointerFile(params.sessionFile, params.sessionId),
resolveTrajectoryFilePath({
env: {},
sessionFile: params.sessionFile,
sessionId: params.sessionId,
}),
resolveTrajectoryFilePath({
sessionFile: params.sessionFile,
sessionId: params.sessionId,
}),
].filter((candidate): candidate is string => Boolean(candidate));
for (const candidate of candidates) {
if (await isRegularNonSymlinkFile(candidate)) {
return candidate;
}
}
return undefined;
}
function normalizeTimestamp(value: unknown): string {
if (typeof value === "number" && Number.isFinite(value)) {
const parsed = new Date(value);

View File

@@ -0,0 +1,86 @@
import fsp from "node:fs/promises";
import path from "node:path";
import {
resolveTrajectoryFilePath,
resolveTrajectoryPointerFilePath,
safeTrajectorySessionFileName,
} from "./paths.js";
function isRecord(value: unknown): value is Record<string, unknown> {
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
}
export async function isRegularNonSymlinkFile(filePath: string): Promise<boolean> {
try {
const linkStat = await fsp.lstat(filePath);
if (linkStat.isSymbolicLink() || !linkStat.isFile()) {
return false;
}
const stat = await fsp.stat(filePath);
return stat.isFile() && stat.dev === linkStat.dev && stat.ino === linkStat.ino;
} catch {
return false;
}
}
async function readRuntimePointerFile(
sessionFile: string,
sessionId: string,
): Promise<string | undefined> {
const pointerPath = resolveTrajectoryPointerFilePath(sessionFile);
if (!(await isRegularNonSymlinkFile(pointerPath))) {
return undefined;
}
try {
const parsed = JSON.parse(await fsp.readFile(pointerPath, "utf8")) as unknown;
if (!isRecord(parsed)) {
return undefined;
}
if (parsed.sessionId !== sessionId || typeof parsed.runtimeFile !== "string") {
return undefined;
}
const runtimeFile = path.resolve(parsed.runtimeFile);
const safeRuntimeFileName = `${safeTrajectorySessionFileName(sessionId)}.jsonl`;
const defaultRuntimeFile = path.resolve(
resolveTrajectoryFilePath({
env: {},
sessionFile,
sessionId,
}),
);
if (runtimeFile !== defaultRuntimeFile && path.basename(runtimeFile) !== safeRuntimeFileName) {
return undefined;
}
return runtimeFile;
} catch {
return undefined;
}
}
export async function resolveTrajectoryRuntimeFile(params: {
runtimeFile?: string;
sessionFile: string;
sessionId: string;
}): Promise<string | undefined> {
if (params.runtimeFile) {
return params.runtimeFile;
}
const candidates = [
await readRuntimePointerFile(params.sessionFile, params.sessionId),
resolveTrajectoryFilePath({
env: {},
sessionFile: params.sessionFile,
sessionId: params.sessionId,
}),
resolveTrajectoryFilePath({
sessionFile: params.sessionFile,
sessionId: params.sessionId,
}),
].filter((candidate): candidate is string => Boolean(candidate));
for (const candidate of candidates) {
if (await isRegularNonSymlinkFile(candidate)) {
return candidate;
}
}
return undefined;
}