From 9fdcc03ff87ea69d83441b3cbbdb82b08a0cdbe1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 2 May 2026 04:13:32 +0100 Subject: [PATCH] refactor(agents): read btw context asynchronously Read /btw transcript context through the async parser path while preserving active snapshot leaf selection. --- src/agents/btw-transcript.ts | 135 +++++++++++++++++++++++++++++++++++ src/agents/btw.test.ts | 117 +++++++++++++++++++++--------- src/agents/btw.ts | 81 ++++----------------- 3 files changed, 232 insertions(+), 101 deletions(-) create mode 100644 src/agents/btw-transcript.ts diff --git a/src/agents/btw-transcript.ts b/src/agents/btw-transcript.ts new file mode 100644 index 00000000000..486dbbfb6c0 --- /dev/null +++ b/src/agents/btw-transcript.ts @@ -0,0 +1,135 @@ +import { readFile } from "node:fs/promises"; +import { + buildSessionContext, + migrateSessionEntries, + parseSessionEntries, + type SessionEntry as PiSessionEntry, +} from "@mariozechner/pi-coding-agent"; +import { + resolveSessionFilePath, + resolveSessionFilePathOptions, + type SessionEntry as StoredSessionEntry, +} from "../config/sessions.js"; +import { diagnosticLogger as diag } from "../logging/diagnostic.js"; + +export function resolveBtwSessionTranscriptPath(params: { + sessionId: string; + sessionEntry?: StoredSessionEntry; + sessionKey?: string; + storePath?: string; +}): string | undefined { + try { + const agentId = params.sessionKey?.split(":")[1]; + const pathOpts = resolveSessionFilePathOptions({ + agentId, + storePath: params.storePath, + }); + return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts); + } catch (error) { + diag.debug( + `resolveSessionTranscriptPath failed: sessionId=${params.sessionId} err=${String(error)}`, + ); + return undefined; + } +} + +function readSessionEntryId(entry: PiSessionEntry): string | undefined { + const id = (entry as { id?: unknown }).id; + return typeof id === "string" && id.trim().length > 0 ? id : undefined; +} + +function readSessionEntryParentId(entry: PiSessionEntry): string | null | undefined { + const parentId = (entry as { parentId?: unknown }).parentId; + if (parentId === null) { + return null; + } + return typeof parentId === "string" && parentId.trim().length > 0 ? parentId : undefined; +} + +function hasParentLinkedEntries(entries: PiSessionEntry[]): boolean { + return entries.some((entry) => Boolean(readSessionEntryId(entry) && "parentId" in entry)); +} + +function buildSessionBranchEntries( + entries: PiSessionEntry[], + leafId: string | undefined, +): PiSessionEntry[] | undefined { + if (!leafId) { + return undefined; + } + const byId = new Map(); + for (const entry of entries) { + const id = readSessionEntryId(entry); + if (id) { + byId.set(id, entry); + } + } + const branch: PiSessionEntry[] = []; + const seen = new Set(); + let currentId: string | undefined = leafId; + while (currentId) { + if (seen.has(currentId)) { + return undefined; + } + seen.add(currentId); + const entry = byId.get(currentId); + if (!entry) { + return undefined; + } + branch.push(entry); + currentId = readSessionEntryParentId(entry) ?? undefined; + } + return branch.toReversed(); +} + +function readDefaultLeafId(entries: PiSessionEntry[]): string | undefined { + for (let index = entries.length - 1; index >= 0; index -= 1) { + const id = readSessionEntryId(entries[index]); + if (id) { + return id; + } + } + return undefined; +} + +function isTrailingUserMessage(entry: PiSessionEntry | undefined): boolean { + return ( + entry?.type === "message" && + (entry as { message?: { role?: unknown } }).message?.role === "user" + ); +} + +export async function readBtwTranscriptMessages(params: { + sessionFile: string; + sessionId: string; + snapshotLeafId?: string | null; +}): Promise { + try { + const entries = parseSessionEntries(await readFile(params.sessionFile, "utf-8")); + migrateSessionEntries(entries); + const sessionEntries = entries.filter( + (entry): entry is PiSessionEntry => entry.type !== "session", + ); + if (!hasParentLinkedEntries(sessionEntries)) { + return buildSessionContext(sessionEntries).messages; + } + + let branchEntries = params.snapshotLeafId + ? buildSessionBranchEntries(sessionEntries, params.snapshotLeafId) + : undefined; + if (params.snapshotLeafId && !branchEntries) { + diag.debug( + `btw snapshot leaf unavailable: sessionId=${params.sessionId} leaf=${params.snapshotLeafId}`, + ); + } + branchEntries ??= buildSessionBranchEntries(sessionEntries, readDefaultLeafId(sessionEntries)); + if (!params.snapshotLeafId && isTrailingUserMessage(branchEntries?.at(-1))) { + const parentId = readSessionEntryParentId(branchEntries!.at(-1)!); + branchEntries = parentId ? (buildSessionBranchEntries(sessionEntries, parentId) ?? []) : []; + } + const sessionContext = buildSessionContext(branchEntries ?? sessionEntries); + return Array.isArray(sessionContext.messages) ? sessionContext.messages : []; + } catch { + return []; + } +} diff --git a/src/agents/btw.test.ts b/src/agents/btw.test.ts index 2bcc19260e3..e83b3f179d3 100644 --- a/src/agents/btw.test.ts +++ b/src/agents/btw.test.ts @@ -2,10 +2,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import type { SessionEntry } from "../config/sessions.js"; const streamSimpleMock = vi.fn(); +const readFileMock = vi.fn(); +const parseSessionEntriesMock = vi.fn(); +const migrateSessionEntriesMock = vi.fn(); const buildSessionContextMock = vi.fn(); -const getLeafEntryMock = vi.fn(); -const branchMock = vi.fn(); -const resetLeafMock = vi.fn(); const ensureOpenClawModelsJsonMock = vi.fn(); const discoverAuthStorageMock = vi.fn(); const discoverModelsMock = vi.fn(); @@ -29,16 +29,18 @@ vi.mock("@mariozechner/pi-ai", async () => { }; }); -vi.mock("@mariozechner/pi-coding-agent", () => ({ - generateSummary: vi.fn(async () => "summary"), - SessionManager: { - open: () => ({ - getLeafEntry: getLeafEntryMock, - branch: branchMock, - resetLeaf: resetLeafMock, - buildSessionContext: buildSessionContextMock, - }), +vi.mock("node:fs/promises", () => ({ + default: { + readFile: (...args: unknown[]) => readFileMock(...args), }, + readFile: (...args: unknown[]) => readFileMock(...args), +})); + +vi.mock("@mariozechner/pi-coding-agent", () => ({ + buildSessionContext: (...args: unknown[]) => buildSessionContextMock(...args), + generateSummary: vi.fn(async () => "summary"), + migrateSessionEntries: (...args: unknown[]) => migrateSessionEntriesMock(...args), + parseSessionEntries: (...args: unknown[]) => parseSessionEntriesMock(...args), })); vi.mock("./models-config.js", () => ({ @@ -216,6 +218,19 @@ function createAssistantTranscriptMessage( }; } +function createTranscriptEntry(params: { id: string; parentId?: string | null; message: unknown }) { + return { + type: "message", + id: params.id, + parentId: params.parentId ?? null, + message: params.message, + }; +} + +function mockTranscriptEntries(entries: unknown[]) { + parseSessionEntriesMock.mockReturnValue(entries); +} + function mockActiveTranscript(messages: unknown[]) { getActiveEmbeddedRunSnapshotMock.mockReturnValue({ transcriptLeafId: "assistant-1", @@ -266,10 +281,10 @@ function expectSeedOnlyUserContext(context: unknown) { describe("runBtwSideQuestion", () => { beforeEach(() => { streamSimpleMock.mockReset(); + readFileMock.mockReset(); + parseSessionEntriesMock.mockReset(); + migrateSessionEntriesMock.mockReset(); buildSessionContextMock.mockReset(); - getLeafEntryMock.mockReset(); - branchMock.mockReset(); - resetLeafMock.mockReset(); ensureOpenClawModelsJsonMock.mockReset(); discoverAuthStorageMock.mockReset(); discoverModelsMock.mockReset(); @@ -284,10 +299,25 @@ describe("runBtwSideQuestion", () => { registerProviderStreamForModelMock.mockReset(); diagDebugMock.mockReset(); - buildSessionContextMock.mockReturnValue({ - messages: [{ role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }], + readFileMock.mockResolvedValue("mock transcript"); + parseSessionEntriesMock.mockReturnValue([ + createTranscriptEntry({ + id: "user-1", + message: { role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }, + }), + createTranscriptEntry({ + id: "assistant-1", + parentId: "user-1", + message: { + role: "assistant", + content: [{ type: "text", text: "hello" }], + timestamp: 2, + }, + }), + ]); + buildSessionContextMock.mockImplementation((entries: Array<{ message?: unknown }> = []) => { + return { messages: entries.flatMap((entry) => (entry.message ? [entry.message] : [])) }; }); - getLeafEntryMock.mockReturnValue(null); resolveModelWithRegistryMock.mockReturnValue({ provider: "anthropic", id: "claude-sonnet-4-6", @@ -662,22 +692,40 @@ describe("runBtwSideQuestion", () => { }); it("branches away from an unresolved trailing user turn before building BTW context", async () => { - getLeafEntryMock.mockReturnValue({ - type: "message", - parentId: "assistant-1", - message: { role: "user" }, + const assistantEntry = createTranscriptEntry({ + id: "assistant-1", + message: createAssistantTranscriptMessage([{ type: "text", text: "seed answer" }]), }); + const trailingUserEntry = createTranscriptEntry({ + id: "user-2", + parentId: "assistant-1", + message: createUserTranscriptMessage([{ type: "text", text: "unfinished task" }]), + }); + mockTranscriptEntries([assistantEntry, trailingUserEntry]); mockDoneAnswer(MATH_ANSWER); const result = await runMathSideQuestion(); - expect(branchMock).toHaveBeenCalledWith("assistant-1"); - expect(resetLeafMock).not.toHaveBeenCalled(); - expect(buildSessionContextMock).toHaveBeenCalledTimes(1); + expect(buildSessionContextMock).toHaveBeenCalledWith([assistantEntry]); expect(result).toEqual({ text: MATH_ANSWER }); }); it("branches to the active run snapshot leaf when the session is busy", async () => { + const userEntry = createTranscriptEntry({ + id: "user-seed", + message: createUserTranscriptMessage(), + }); + const assistantEntry = createTranscriptEntry({ + id: "assistant-seed", + parentId: "user-seed", + message: createAssistantTranscriptMessage([{ type: "text", text: "seed answer" }]), + }); + const newerEntry = createTranscriptEntry({ + id: "newer-user", + parentId: "assistant-seed", + message: createUserTranscriptMessage([{ type: "text", text: "newer unfinished task" }]), + }); + mockTranscriptEntries([userEntry, assistantEntry, newerEntry]); getActiveEmbeddedRunSnapshotMock.mockReturnValue({ transcriptLeafId: "assistant-seed", }); @@ -685,24 +733,29 @@ describe("runBtwSideQuestion", () => { const result = await runMathSideQuestion(); - expect(branchMock).toHaveBeenCalledWith("assistant-seed"); - expect(getLeafEntryMock).not.toHaveBeenCalled(); + expect(buildSessionContextMock).toHaveBeenCalledWith([userEntry, assistantEntry]); expect(result).toEqual({ text: MATH_ANSWER }); }); it("falls back when the active run snapshot leaf no longer exists", async () => { + const userEntry = createTranscriptEntry({ + id: "user-seed", + message: createUserTranscriptMessage(), + }); + const assistantEntry = createTranscriptEntry({ + id: "assistant-seed", + parentId: "user-seed", + message: createAssistantTranscriptMessage([{ type: "text", text: "seed answer" }]), + }); + mockTranscriptEntries([userEntry, assistantEntry]); getActiveEmbeddedRunSnapshotMock.mockReturnValue({ transcriptLeafId: "assistant-gone", }); - branchMock.mockImplementationOnce(() => { - throw new Error("Entry 3235c7c4 not found"); - }); mockDoneAnswer(MATH_ANSWER); const result = await runMathSideQuestion(); - expect(branchMock).toHaveBeenCalledWith("assistant-gone"); - expect(resetLeafMock).toHaveBeenCalled(); + expect(buildSessionContextMock).toHaveBeenCalledWith([userEntry, assistantEntry]); expect(result).toEqual({ text: MATH_ANSWER }); expect(diagDebugMock).toHaveBeenCalledWith( expect.stringContaining("btw snapshot leaf unavailable: sessionId=session-1"), diff --git a/src/agents/btw.ts b/src/agents/btw.ts index 40199e560da..43bb5e0f444 100644 --- a/src/agents/btw.ts +++ b/src/agents/btw.ts @@ -7,21 +7,16 @@ import { type Model, type TextContent, } from "@mariozechner/pi-ai"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; import type { GetReplyOptions } from "../auto-reply/get-reply-options.types.js"; import type { ReplyPayload } from "../auto-reply/reply-payload.js"; import type { ReasoningLevel, ThinkLevel } from "../auto-reply/thinking.js"; -import { - resolveSessionFilePath, - resolveSessionFilePathOptions, - type SessionEntry, -} from "../config/sessions.js"; +import type { SessionEntry as StoredSessionEntry } from "../config/sessions.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { diagnosticLogger as diag } from "../logging/diagnostic.js"; import { prepareProviderRuntimeAuth } from "../plugins/provider-runtime.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; import { resolveAgentWorkspaceDir, resolveSessionAgentId } from "./agent-scope.js"; import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js"; +import { readBtwTranscriptMessages, resolveBtwSessionTranscriptPath } from "./btw-transcript.js"; import { resolveImageSanitizationLimits, type ImageSanitizationLimits, @@ -37,18 +32,6 @@ import { registerProviderStreamForModel } from "./provider-stream.js"; import { stripToolResultDetails } from "./session-transcript-repair.js"; import { sanitizeImageBlocks } from "./tool-images.js"; -type SessionManagerLike = { - getLeafEntry?: () => { - id?: string; - type?: string; - parentId?: string | null; - message?: { role?: string }; - } | null; - branch?: (parentId: string) => void; - resetLeaf?: () => void; - buildSessionContext: () => { messages?: unknown[] }; -}; - function collectTextContent(content: Array<{ type?: string; text?: string }>): string { return content .filter((part): part is { type: "text"; text: string } => part.type === "text") @@ -228,34 +211,13 @@ async function toSimpleContextMessages(params: { ) as Message[]; } -function resolveSessionTranscriptPath(params: { - sessionId: string; - sessionEntry?: SessionEntry; - sessionKey?: string; - storePath?: string; -}): string | undefined { - try { - const agentId = params.sessionKey?.split(":")[1]; - const pathOpts = resolveSessionFilePathOptions({ - agentId, - storePath: params.storePath, - }); - return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts); - } catch (error) { - diag.debug( - `resolveSessionTranscriptPath failed: sessionId=${params.sessionId} err=${String(error)}`, - ); - return undefined; - } -} - async function resolveRuntimeModel(params: { cfg: OpenClawConfig; provider: string; model: string; agentDir: string; - sessionEntry?: SessionEntry; - sessionStore?: Record; + sessionEntry?: StoredSessionEntry; + sessionStore?: Record; sessionKey?: string; storePath?: string; isNewSession: boolean; @@ -300,8 +262,8 @@ type RunBtwSideQuestionParams = { provider: string; model: string; question: string; - sessionEntry: SessionEntry; - sessionStore?: Record; + sessionEntry: StoredSessionEntry; + sessionStore?: Record; sessionKey?: string; storePath?: string; resolvedThinkLevel?: ThinkLevel; @@ -320,7 +282,7 @@ export async function runBtwSideQuestion( throw new Error("No active session context."); } - const sessionFile = resolveSessionTranscriptPath({ + const sessionFile = resolveBtwSessionTranscriptPath({ sessionId, sessionEntry: params.sessionEntry, sessionKey: params.sessionKey, @@ -330,7 +292,6 @@ export async function runBtwSideQuestion( throw new Error("No active session transcript."); } - const sessionManager = SessionManager.open(sessionFile) as SessionManagerLike; const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId); const imageLimits = resolveImageSanitizationLimits(params.cfg); let messages: Message[] = []; @@ -343,32 +304,14 @@ export async function runBtwSideQuestion( inFlightPrompt = activeRunSnapshot.inFlightPrompt; } else if (activeRunSnapshot) { inFlightPrompt = activeRunSnapshot.inFlightPrompt; - if (activeRunSnapshot.transcriptLeafId && sessionManager.branch) { - try { - sessionManager.branch(activeRunSnapshot.transcriptLeafId); - } catch (error) { - diag.debug( - `btw snapshot leaf unavailable: sessionId=${sessionId} leaf=${activeRunSnapshot.transcriptLeafId} err=${String(error)}`, - ); - sessionManager.resetLeaf?.(); - } - } else { - sessionManager.resetLeaf?.(); - } - } else { - const leafEntry = sessionManager.getLeafEntry?.(); - if (leafEntry?.type === "message" && leafEntry.message?.role === "user") { - if (leafEntry.parentId && sessionManager.branch) { - sessionManager.branch(leafEntry.parentId); - } else { - sessionManager.resetLeaf?.(); - } - } } if (messages.length === 0) { - const sessionContext = sessionManager.buildSessionContext(); messages = await toSimpleContextMessages({ - messages: Array.isArray(sessionContext.messages) ? sessionContext.messages : [], + messages: await readBtwTranscriptMessages({ + sessionFile, + sessionId, + snapshotLeafId: activeRunSnapshot?.transcriptLeafId, + }), imageLimits, }); }