refactor(agents): append text turns asynchronously

This commit is contained in:
Peter Steinberger
2026-05-02 04:42:03 +01:00
parent a93ce361ab
commit b08220446a
5 changed files with 142 additions and 47 deletions

View File

@@ -73,6 +73,23 @@ async function readSessionMessages(sessionFile: string) {
);
}
async function readSessionFileEntries(sessionFile: string) {
const raw = await fs.readFile(sessionFile, "utf-8");
return raw
.split(/\r?\n/)
.filter(Boolean)
.map(
(line) =>
JSON.parse(line) as {
type?: string;
id?: string;
parentId?: string | null;
cwd?: string;
message?: { role?: string };
},
);
}
describe("CLI attempt execution", () => {
let tmpDir: string;
let storePath: string;
@@ -374,6 +391,17 @@ describe("CLI attempt execution", () => {
const sessionFile = updatedEntry?.sessionFile;
expect(sessionFile).toBeTruthy();
const entries = await readSessionFileEntries(sessionFile!);
expect(entries[0]).toMatchObject({
type: "session",
id: sessionEntry.sessionId,
cwd: tmpDir,
});
expect(entries[1]).toMatchObject({ type: "message", parentId: null });
expect(entries[2]).toMatchObject({
type: "message",
parentId: entries[1]?.id,
});
const messages = await readSessionMessages(sessionFile!);
expect(messages).toHaveLength(2);
expect(messages[0]).toMatchObject({

View File

@@ -1,7 +1,6 @@
import fs from "node:fs/promises";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js";
import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js";
import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js";
import { resolveSessionTranscriptFile } from "../../config/sessions/transcript.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
@@ -20,9 +19,9 @@ import { FailoverError } from "../failover-error.js";
import { resolveAgentHarnessPolicy } from "../harness/selection.js";
import { isCliRuntimeAlias, resolveCliRuntimeExecutionProvider } from "../model-runtime-aliases.js";
import { isCliProvider } from "../model-selection.js";
import { prepareSessionManagerForRun } from "../pi-embedded-runner/session-manager-init.js";
import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js";
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
import { acquireSessionWriteLock } from "../session-write-lock.js";
import { buildWorkspaceSkillSnapshot } from "../skills.js";
import { buildUsageWithNoCost } from "../stream-message-shared.js";
import {
@@ -194,38 +193,44 @@ async function persistTextTurnTranscript(
agentId: params.sessionAgentId,
threadId: params.threadId,
});
const hadSessionFile = await fs
.access(sessionFile)
.then(() => true)
.catch(() => false);
const sessionManager = SessionManager.open(sessionFile);
await prepareSessionManagerForRun({
sessionManager,
const lock = await acquireSessionWriteLock({
sessionFile,
hadSessionFile,
sessionId: params.sessionId,
cwd: params.sessionCwd,
timeoutMs: 10_000,
allowReentrant: true,
});
try {
if (promptText) {
await appendSessionTranscriptMessage({
transcriptPath: sessionFile,
sessionId: params.sessionId,
cwd: params.sessionCwd,
message: {
role: "user",
content: promptText,
timestamp: Date.now(),
},
});
}
if (promptText) {
sessionManager.appendMessage({
role: "user",
content: promptText,
timestamp: Date.now(),
});
}
if (replyText) {
sessionManager.appendMessage({
role: "assistant",
content: [{ type: "text", text: replyText }],
api: params.assistant.api,
provider: params.assistant.provider,
model: params.assistant.model,
usage: resolveTranscriptUsage(params.assistant.usage),
stopReason: "stop",
timestamp: Date.now(),
});
if (replyText) {
await appendSessionTranscriptMessage({
transcriptPath: sessionFile,
sessionId: params.sessionId,
cwd: params.sessionCwd,
message: {
role: "assistant",
content: [{ type: "text", text: replyText }],
api: params.assistant.api,
provider: params.assistant.provider,
model: params.assistant.model,
usage: resolveTranscriptUsage(params.assistant.usage),
stopReason: "stop",
timestamp: Date.now(),
},
});
}
} finally {
await lock.release();
}
emitSessionTranscriptUpdate(sessionFile);

View File

@@ -299,6 +299,8 @@ describe("sessions tools", () => {
params: {
activeMinutes: undefined,
agentId: "main",
includeDerivedTitles: false,
includeLastMessage: false,
includeGlobal: true,
includeUnknown: true,
label: "mailbox",
@@ -382,8 +384,8 @@ describe("sessions tools", () => {
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: Record<string, unknown> };
if (request.method === "sessions.list") {
expect(request.params?.includeDerivedTitles).toBeUndefined();
expect(request.params?.includeLastMessage).toBeUndefined();
expect(request.params?.includeDerivedTitles).toBe(false);
expect(request.params?.includeLastMessage).toBe(false);
return {
path: storePath,
sessions: [

View File

@@ -6,9 +6,13 @@ import {
resolveSessionFilePathOptions,
resolveStorePath,
} from "../../config/sessions.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { callGateway } from "../../gateway/call.js";
import { deriveSessionTitle } from "../../gateway/session-utils.js";
import {
deriveSessionTitle,
readSessionTitleFieldsFromTranscriptAsync,
} from "../../gateway/session-utils.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { normalizeOptionalLowercaseString, readStringValue } from "../../shared/string-coerce.js";
import {
@@ -45,6 +49,8 @@ const SessionsListToolSchema = Type.Object({
type GatewayCaller = typeof callGateway;
const SESSIONS_LIST_TRANSCRIPT_FIELD_ROWS = 100;
function readSessionRunStatus(value: unknown): SessionRunStatus | undefined {
return value === "running" ||
value === "done" ||
@@ -109,6 +115,8 @@ export function createSessionsListTool(opts?: {
const includeDerivedTitles = params.includeDerivedTitles === true;
const includeLastMessage = params.includeLastMessage === true;
const gatewayCall = opts?.callGateway ?? callGateway;
const a2aPolicy = createAgentToAgentPolicy(cfg);
const hydrateTranscriptFieldsAfterFiltering = includeDerivedTitles || includeLastMessage;
const list = await gatewayCall<{ sessions: Array<SessionListRow>; path: string }>({
method: "sessions.list",
@@ -118,8 +126,8 @@ export function createSessionsListTool(opts?: {
label,
agentId,
search,
includeDerivedTitles,
includeLastMessage,
includeDerivedTitles: false,
includeLastMessage: false,
includeGlobal: !restrictToSpawned,
includeUnknown: !restrictToSpawned,
spawnedBy: restrictToSpawned ? effectiveRequesterKey : undefined,
@@ -128,7 +136,6 @@ export function createSessionsListTool(opts?: {
const sessions = Array.isArray(list?.sessions) ? list.sessions : [];
const storePath = typeof list?.path === "string" ? list.path : undefined;
const a2aPolicy = createAgentToAgentPolicy(cfg);
const visibilityGuard = await createSessionVisibilityGuard({
action: "list",
requesterSessionKey: effectiveRequesterKey,
@@ -137,6 +144,13 @@ export function createSessionsListTool(opts?: {
});
const rows: SessionListRow[] = [];
const historyTargets: Array<{ row: SessionListRow; resolvedKey: string }> = [];
const titleTargets: Array<{
row: SessionListRow;
titleEntry: SessionEntry;
sessionId: string;
sessionFile?: string;
agentId: string;
}> = [];
for (const entry of sessions) {
if (!entry || typeof entry !== "object") {
@@ -310,17 +324,24 @@ export function createSessionsListTool(opts?: {
lastAccountId,
transcriptPath,
};
if (sessionId && includeDerivedTitles && !row.derivedTitle) {
row.derivedTitle = deriveSessionTitle(
{
if (
sessionId &&
hydrateTranscriptFieldsAfterFiltering &&
titleTargets.length < SESSIONS_LIST_TRANSCRIPT_FIELD_ROWS
) {
titleTargets.push({
row,
titleEntry: {
sessionId,
displayName: row.displayName,
label: row.label,
subject: readStringValue((entry as { subject?: unknown }).subject),
updatedAt: typeof row.updatedAt === "number" ? row.updatedAt : 0,
},
undefined,
);
sessionId,
...(sessionFile ? { sessionFile } : {}),
agentId: resolvedAgentId,
});
}
if (messageLimit > 0) {
const resolvedKey = resolveInternalSessionKey({
@@ -333,6 +354,37 @@ export function createSessionsListTool(opts?: {
rows.push(row);
}
if (titleTargets.length > 0) {
const maxConcurrent = Math.min(4, titleTargets.length);
let index = 0;
const worker = async () => {
while (true) {
const next = index;
index += 1;
if (next >= titleTargets.length) {
return;
}
const target = titleTargets[next];
const fields = await readSessionTitleFieldsFromTranscriptAsync(
target.sessionId,
storePath,
target.sessionFile,
target.agentId,
);
if (includeDerivedTitles && !target.row.derivedTitle) {
target.row.derivedTitle = deriveSessionTitle(
target.titleEntry,
fields.firstUserMessage,
);
}
if (includeLastMessage && fields.lastMessagePreview) {
target.row.lastMessagePreview = fields.lastMessagePreview;
}
}
};
await Promise.all(Array.from({ length: maxConcurrent }, () => worker()));
}
if (messageLimit > 0 && historyTargets.length > 0) {
const maxConcurrent = Math.min(4, historyTargets.length);
let index = 0;

View File

@@ -158,7 +158,10 @@ async function migrateLinearTranscriptToParentLinked(transcriptPath: string): Pr
return result;
}
async function ensureTranscriptHeader(transcriptPath: string): Promise<void> {
async function ensureTranscriptHeader(
transcriptPath: string,
params: { sessionId?: string; cwd?: string } = {},
): Promise<void> {
const stat = await fs.stat(transcriptPath).catch(() => null);
if (stat?.isFile() && stat.size > 0) {
return;
@@ -167,9 +170,9 @@ async function ensureTranscriptHeader(transcriptPath: string): Promise<void> {
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: randomUUID(),
id: params.sessionId ?? randomUUID(),
timestamp: new Date().toISOString(),
cwd: process.cwd(),
cwd: params.cwd ?? process.cwd(),
};
await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, {
encoding: "utf-8",
@@ -182,6 +185,8 @@ export async function appendSessionTranscriptMessage(params: {
transcriptPath: string;
message: unknown;
now?: number;
sessionId?: string;
cwd?: string;
useRawWhenLinear?: boolean;
}): Promise<{ messageId: string }> {
const lock = await acquireSessionWriteLock({
@@ -192,7 +197,10 @@ export async function appendSessionTranscriptMessage(params: {
try {
const now = params.now ?? Date.now();
const messageId = randomUUID();
await ensureTranscriptHeader(params.transcriptPath);
await ensureTranscriptHeader(params.transcriptPath, {
...(params.sessionId ? { sessionId: params.sessionId } : {}),
...(params.cwd ? { cwd: params.cwd } : {}),
});
const stat = await fs.stat(params.transcriptPath).catch(() => null);
let leafInfo: TranscriptLeafInfo = await readTranscriptLeafInfo(params.transcriptPath).catch(
() => ({