diff --git a/src/agents/command/attempt-execution.cli.test.ts b/src/agents/command/attempt-execution.cli.test.ts index 27e4f6d55b3..f118f4cfe88 100644 --- a/src/agents/command/attempt-execution.cli.test.ts +++ b/src/agents/command/attempt-execution.cli.test.ts @@ -73,6 +73,23 @@ async function readSessionMessages(sessionFile: string) { ); } +async function readSessionFileEntries(sessionFile: string) { + const raw = await fs.readFile(sessionFile, "utf-8"); + return raw + .split(/\r?\n/) + .filter(Boolean) + .map( + (line) => + JSON.parse(line) as { + type?: string; + id?: string; + parentId?: string | null; + cwd?: string; + message?: { role?: string }; + }, + ); +} + describe("CLI attempt execution", () => { let tmpDir: string; let storePath: string; @@ -374,6 +391,17 @@ describe("CLI attempt execution", () => { const sessionFile = updatedEntry?.sessionFile; expect(sessionFile).toBeTruthy(); + const entries = await readSessionFileEntries(sessionFile!); + expect(entries[0]).toMatchObject({ + type: "session", + id: sessionEntry.sessionId, + cwd: tmpDir, + }); + expect(entries[1]).toMatchObject({ type: "message", parentId: null }); + expect(entries[2]).toMatchObject({ + type: "message", + parentId: entries[1]?.id, + }); const messages = await readSessionMessages(sessionFile!); expect(messages).toHaveLength(2); expect(messages[0]).toMatchObject({ diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index 038efc4272e..95687498e78 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -1,7 +1,6 @@ -import fs from "node:fs/promises"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js"; import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js"; +import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js"; import { resolveSessionTranscriptFile } from "../../config/sessions/transcript.js"; import type { SessionEntry } from "../../config/sessions/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; @@ -20,9 +19,9 @@ import { FailoverError } from "../failover-error.js"; import { resolveAgentHarnessPolicy } from "../harness/selection.js"; import { isCliRuntimeAlias, resolveCliRuntimeExecutionProvider } from "../model-runtime-aliases.js"; import { isCliProvider } from "../model-selection.js"; -import { prepareSessionManagerForRun } from "../pi-embedded-runner/session-manager-init.js"; import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js"; import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js"; +import { acquireSessionWriteLock } from "../session-write-lock.js"; import { buildWorkspaceSkillSnapshot } from "../skills.js"; import { buildUsageWithNoCost } from "../stream-message-shared.js"; import { @@ -194,38 +193,44 @@ async function persistTextTurnTranscript( agentId: params.sessionAgentId, threadId: params.threadId, }); - const hadSessionFile = await fs - .access(sessionFile) - .then(() => true) - .catch(() => false); - const sessionManager = SessionManager.open(sessionFile); - await prepareSessionManagerForRun({ - sessionManager, + const lock = await acquireSessionWriteLock({ sessionFile, - hadSessionFile, - sessionId: params.sessionId, - cwd: params.sessionCwd, + timeoutMs: 10_000, + allowReentrant: true, }); + try { + if (promptText) { + await appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + sessionId: params.sessionId, + cwd: params.sessionCwd, + message: { + role: "user", + content: promptText, + timestamp: Date.now(), + }, + }); + } - if (promptText) { - sessionManager.appendMessage({ - role: "user", - content: promptText, - timestamp: Date.now(), - }); - } - - if (replyText) { - sessionManager.appendMessage({ - role: "assistant", - content: [{ type: "text", text: replyText }], - api: params.assistant.api, - provider: params.assistant.provider, - model: params.assistant.model, - usage: resolveTranscriptUsage(params.assistant.usage), - stopReason: "stop", - timestamp: Date.now(), - }); + if (replyText) { + await appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + sessionId: params.sessionId, + cwd: params.sessionCwd, + message: { + role: "assistant", + content: [{ type: "text", text: replyText }], + api: params.assistant.api, + provider: params.assistant.provider, + model: params.assistant.model, + usage: resolveTranscriptUsage(params.assistant.usage), + stopReason: "stop", + timestamp: Date.now(), + }, + }); + } + } finally { + await lock.release(); } emitSessionTranscriptUpdate(sessionFile); diff --git a/src/agents/openclaw-tools.sessions.test.ts b/src/agents/openclaw-tools.sessions.test.ts index c63804c0a6e..74317996f17 100644 --- a/src/agents/openclaw-tools.sessions.test.ts +++ b/src/agents/openclaw-tools.sessions.test.ts @@ -299,6 +299,8 @@ describe("sessions tools", () => { params: { activeMinutes: undefined, agentId: "main", + includeDerivedTitles: false, + includeLastMessage: false, includeGlobal: true, includeUnknown: true, label: "mailbox", @@ -382,8 +384,8 @@ describe("sessions tools", () => { callGatewayMock.mockImplementation(async (opts: unknown) => { const request = opts as { method?: string; params?: Record }; if (request.method === "sessions.list") { - expect(request.params?.includeDerivedTitles).toBeUndefined(); - expect(request.params?.includeLastMessage).toBeUndefined(); + expect(request.params?.includeDerivedTitles).toBe(false); + expect(request.params?.includeLastMessage).toBe(false); return { path: storePath, sessions: [ diff --git a/src/agents/tools/sessions-list-tool.ts b/src/agents/tools/sessions-list-tool.ts index 8bf8b089c2f..dc046dbbec2 100644 --- a/src/agents/tools/sessions-list-tool.ts +++ b/src/agents/tools/sessions-list-tool.ts @@ -6,9 +6,13 @@ import { resolveSessionFilePathOptions, resolveStorePath, } from "../../config/sessions.js"; +import type { SessionEntry } from "../../config/sessions/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { callGateway } from "../../gateway/call.js"; -import { deriveSessionTitle } from "../../gateway/session-utils.js"; +import { + deriveSessionTitle, + readSessionTitleFieldsFromTranscriptAsync, +} from "../../gateway/session-utils.js"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; import { normalizeOptionalLowercaseString, readStringValue } from "../../shared/string-coerce.js"; import { @@ -45,6 +49,8 @@ const SessionsListToolSchema = Type.Object({ type GatewayCaller = typeof callGateway; +const SESSIONS_LIST_TRANSCRIPT_FIELD_ROWS = 100; + function readSessionRunStatus(value: unknown): SessionRunStatus | undefined { return value === "running" || value === "done" || @@ -109,6 +115,8 @@ export function createSessionsListTool(opts?: { const includeDerivedTitles = params.includeDerivedTitles === true; const includeLastMessage = params.includeLastMessage === true; const gatewayCall = opts?.callGateway ?? callGateway; + const a2aPolicy = createAgentToAgentPolicy(cfg); + const hydrateTranscriptFieldsAfterFiltering = includeDerivedTitles || includeLastMessage; const list = await gatewayCall<{ sessions: Array; path: string }>({ method: "sessions.list", @@ -118,8 +126,8 @@ export function createSessionsListTool(opts?: { label, agentId, search, - includeDerivedTitles, - includeLastMessage, + includeDerivedTitles: false, + includeLastMessage: false, includeGlobal: !restrictToSpawned, includeUnknown: !restrictToSpawned, spawnedBy: restrictToSpawned ? effectiveRequesterKey : undefined, @@ -128,7 +136,6 @@ export function createSessionsListTool(opts?: { const sessions = Array.isArray(list?.sessions) ? list.sessions : []; const storePath = typeof list?.path === "string" ? list.path : undefined; - const a2aPolicy = createAgentToAgentPolicy(cfg); const visibilityGuard = await createSessionVisibilityGuard({ action: "list", requesterSessionKey: effectiveRequesterKey, @@ -137,6 +144,13 @@ export function createSessionsListTool(opts?: { }); const rows: SessionListRow[] = []; const historyTargets: Array<{ row: SessionListRow; resolvedKey: string }> = []; + const titleTargets: Array<{ + row: SessionListRow; + titleEntry: SessionEntry; + sessionId: string; + sessionFile?: string; + agentId: string; + }> = []; for (const entry of sessions) { if (!entry || typeof entry !== "object") { @@ -310,17 +324,24 @@ export function createSessionsListTool(opts?: { lastAccountId, transcriptPath, }; - if (sessionId && includeDerivedTitles && !row.derivedTitle) { - row.derivedTitle = deriveSessionTitle( - { + if ( + sessionId && + hydrateTranscriptFieldsAfterFiltering && + titleTargets.length < SESSIONS_LIST_TRANSCRIPT_FIELD_ROWS + ) { + titleTargets.push({ + row, + titleEntry: { sessionId, displayName: row.displayName, label: row.label, subject: readStringValue((entry as { subject?: unknown }).subject), updatedAt: typeof row.updatedAt === "number" ? row.updatedAt : 0, }, - undefined, - ); + sessionId, + ...(sessionFile ? { sessionFile } : {}), + agentId: resolvedAgentId, + }); } if (messageLimit > 0) { const resolvedKey = resolveInternalSessionKey({ @@ -333,6 +354,37 @@ export function createSessionsListTool(opts?: { rows.push(row); } + if (titleTargets.length > 0) { + const maxConcurrent = Math.min(4, titleTargets.length); + let index = 0; + const worker = async () => { + while (true) { + const next = index; + index += 1; + if (next >= titleTargets.length) { + return; + } + const target = titleTargets[next]; + const fields = await readSessionTitleFieldsFromTranscriptAsync( + target.sessionId, + storePath, + target.sessionFile, + target.agentId, + ); + if (includeDerivedTitles && !target.row.derivedTitle) { + target.row.derivedTitle = deriveSessionTitle( + target.titleEntry, + fields.firstUserMessage, + ); + } + if (includeLastMessage && fields.lastMessagePreview) { + target.row.lastMessagePreview = fields.lastMessagePreview; + } + } + }; + await Promise.all(Array.from({ length: maxConcurrent }, () => worker())); + } + if (messageLimit > 0 && historyTargets.length > 0) { const maxConcurrent = Math.min(4, historyTargets.length); let index = 0; diff --git a/src/config/sessions/transcript-append.ts b/src/config/sessions/transcript-append.ts index f26c221f2bc..f7a2d8e49cc 100644 --- a/src/config/sessions/transcript-append.ts +++ b/src/config/sessions/transcript-append.ts @@ -158,7 +158,10 @@ async function migrateLinearTranscriptToParentLinked(transcriptPath: string): Pr return result; } -async function ensureTranscriptHeader(transcriptPath: string): Promise { +async function ensureTranscriptHeader( + transcriptPath: string, + params: { sessionId?: string; cwd?: string } = {}, +): Promise { const stat = await fs.stat(transcriptPath).catch(() => null); if (stat?.isFile() && stat.size > 0) { return; @@ -167,9 +170,9 @@ async function ensureTranscriptHeader(transcriptPath: string): Promise { const header = { type: "session", version: CURRENT_SESSION_VERSION, - id: randomUUID(), + id: params.sessionId ?? randomUUID(), timestamp: new Date().toISOString(), - cwd: process.cwd(), + cwd: params.cwd ?? process.cwd(), }; await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, { encoding: "utf-8", @@ -182,6 +185,8 @@ export async function appendSessionTranscriptMessage(params: { transcriptPath: string; message: unknown; now?: number; + sessionId?: string; + cwd?: string; useRawWhenLinear?: boolean; }): Promise<{ messageId: string }> { const lock = await acquireSessionWriteLock({ @@ -192,7 +197,10 @@ export async function appendSessionTranscriptMessage(params: { try { const now = params.now ?? Date.now(); const messageId = randomUUID(); - await ensureTranscriptHeader(params.transcriptPath); + await ensureTranscriptHeader(params.transcriptPath, { + ...(params.sessionId ? { sessionId: params.sessionId } : {}), + ...(params.cwd ? { cwd: params.cwd } : {}), + }); const stat = await fs.stat(params.transcriptPath).catch(() => null); let leafInfo: TranscriptLeafInfo = await readTranscriptLeafInfo(params.transcriptPath).catch( () => ({