refactor(agents): read btw context asynchronously

Read /btw transcript context through the async parser path while preserving active snapshot leaf selection.
This commit is contained in:
Peter Steinberger
2026-05-02 04:13:32 +01:00
committed by GitHub
parent f4ef1bf04e
commit 9fdcc03ff8
3 changed files with 232 additions and 101 deletions

View File

@@ -0,0 +1,135 @@
import { readFile } from "node:fs/promises";
import {
buildSessionContext,
migrateSessionEntries,
parseSessionEntries,
type SessionEntry as PiSessionEntry,
} from "@mariozechner/pi-coding-agent";
import {
resolveSessionFilePath,
resolveSessionFilePathOptions,
type SessionEntry as StoredSessionEntry,
} from "../config/sessions.js";
import { diagnosticLogger as diag } from "../logging/diagnostic.js";
export function resolveBtwSessionTranscriptPath(params: {
sessionId: string;
sessionEntry?: StoredSessionEntry;
sessionKey?: string;
storePath?: string;
}): string | undefined {
try {
const agentId = params.sessionKey?.split(":")[1];
const pathOpts = resolveSessionFilePathOptions({
agentId,
storePath: params.storePath,
});
return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts);
} catch (error) {
diag.debug(
`resolveSessionTranscriptPath failed: sessionId=${params.sessionId} err=${String(error)}`,
);
return undefined;
}
}
function readSessionEntryId(entry: PiSessionEntry): string | undefined {
const id = (entry as { id?: unknown }).id;
return typeof id === "string" && id.trim().length > 0 ? id : undefined;
}
function readSessionEntryParentId(entry: PiSessionEntry): string | null | undefined {
const parentId = (entry as { parentId?: unknown }).parentId;
if (parentId === null) {
return null;
}
return typeof parentId === "string" && parentId.trim().length > 0 ? parentId : undefined;
}
function hasParentLinkedEntries(entries: PiSessionEntry[]): boolean {
return entries.some((entry) => Boolean(readSessionEntryId(entry) && "parentId" in entry));
}
function buildSessionBranchEntries(
entries: PiSessionEntry[],
leafId: string | undefined,
): PiSessionEntry[] | undefined {
if (!leafId) {
return undefined;
}
const byId = new Map<string, PiSessionEntry>();
for (const entry of entries) {
const id = readSessionEntryId(entry);
if (id) {
byId.set(id, entry);
}
}
const branch: PiSessionEntry[] = [];
const seen = new Set<string>();
let currentId: string | undefined = leafId;
while (currentId) {
if (seen.has(currentId)) {
return undefined;
}
seen.add(currentId);
const entry = byId.get(currentId);
if (!entry) {
return undefined;
}
branch.push(entry);
currentId = readSessionEntryParentId(entry) ?? undefined;
}
return branch.toReversed();
}
function readDefaultLeafId(entries: PiSessionEntry[]): string | undefined {
for (let index = entries.length - 1; index >= 0; index -= 1) {
const id = readSessionEntryId(entries[index]);
if (id) {
return id;
}
}
return undefined;
}
function isTrailingUserMessage(entry: PiSessionEntry | undefined): boolean {
return (
entry?.type === "message" &&
(entry as { message?: { role?: unknown } }).message?.role === "user"
);
}
export async function readBtwTranscriptMessages(params: {
sessionFile: string;
sessionId: string;
snapshotLeafId?: string | null;
}): Promise<unknown[]> {
try {
const entries = parseSessionEntries(await readFile(params.sessionFile, "utf-8"));
migrateSessionEntries(entries);
const sessionEntries = entries.filter(
(entry): entry is PiSessionEntry => entry.type !== "session",
);
if (!hasParentLinkedEntries(sessionEntries)) {
return buildSessionContext(sessionEntries).messages;
}
let branchEntries = params.snapshotLeafId
? buildSessionBranchEntries(sessionEntries, params.snapshotLeafId)
: undefined;
if (params.snapshotLeafId && !branchEntries) {
diag.debug(
`btw snapshot leaf unavailable: sessionId=${params.sessionId} leaf=${params.snapshotLeafId}`,
);
}
branchEntries ??= buildSessionBranchEntries(sessionEntries, readDefaultLeafId(sessionEntries));
if (!params.snapshotLeafId && isTrailingUserMessage(branchEntries?.at(-1))) {
const parentId = readSessionEntryParentId(branchEntries!.at(-1)!);
branchEntries = parentId ? (buildSessionBranchEntries(sessionEntries, parentId) ?? []) : [];
}
const sessionContext = buildSessionContext(branchEntries ?? sessionEntries);
return Array.isArray(sessionContext.messages) ? sessionContext.messages : [];
} catch {
return [];
}
}

View File

@@ -2,10 +2,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../config/sessions.js";
const streamSimpleMock = vi.fn();
const readFileMock = vi.fn();
const parseSessionEntriesMock = vi.fn();
const migrateSessionEntriesMock = vi.fn();
const buildSessionContextMock = vi.fn();
const getLeafEntryMock = vi.fn();
const branchMock = vi.fn();
const resetLeafMock = vi.fn();
const ensureOpenClawModelsJsonMock = vi.fn();
const discoverAuthStorageMock = vi.fn();
const discoverModelsMock = vi.fn();
@@ -29,16 +29,18 @@ vi.mock("@mariozechner/pi-ai", async () => {
};
});
vi.mock("@mariozechner/pi-coding-agent", () => ({
generateSummary: vi.fn(async () => "summary"),
SessionManager: {
open: () => ({
getLeafEntry: getLeafEntryMock,
branch: branchMock,
resetLeaf: resetLeafMock,
buildSessionContext: buildSessionContextMock,
}),
vi.mock("node:fs/promises", () => ({
default: {
readFile: (...args: unknown[]) => readFileMock(...args),
},
readFile: (...args: unknown[]) => readFileMock(...args),
}));
vi.mock("@mariozechner/pi-coding-agent", () => ({
buildSessionContext: (...args: unknown[]) => buildSessionContextMock(...args),
generateSummary: vi.fn(async () => "summary"),
migrateSessionEntries: (...args: unknown[]) => migrateSessionEntriesMock(...args),
parseSessionEntries: (...args: unknown[]) => parseSessionEntriesMock(...args),
}));
vi.mock("./models-config.js", () => ({
@@ -216,6 +218,19 @@ function createAssistantTranscriptMessage(
};
}
function createTranscriptEntry(params: { id: string; parentId?: string | null; message: unknown }) {
return {
type: "message",
id: params.id,
parentId: params.parentId ?? null,
message: params.message,
};
}
function mockTranscriptEntries(entries: unknown[]) {
parseSessionEntriesMock.mockReturnValue(entries);
}
function mockActiveTranscript(messages: unknown[]) {
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
transcriptLeafId: "assistant-1",
@@ -266,10 +281,10 @@ function expectSeedOnlyUserContext(context: unknown) {
describe("runBtwSideQuestion", () => {
beforeEach(() => {
streamSimpleMock.mockReset();
readFileMock.mockReset();
parseSessionEntriesMock.mockReset();
migrateSessionEntriesMock.mockReset();
buildSessionContextMock.mockReset();
getLeafEntryMock.mockReset();
branchMock.mockReset();
resetLeafMock.mockReset();
ensureOpenClawModelsJsonMock.mockReset();
discoverAuthStorageMock.mockReset();
discoverModelsMock.mockReset();
@@ -284,10 +299,25 @@ describe("runBtwSideQuestion", () => {
registerProviderStreamForModelMock.mockReset();
diagDebugMock.mockReset();
buildSessionContextMock.mockReturnValue({
messages: [{ role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }],
readFileMock.mockResolvedValue("mock transcript");
parseSessionEntriesMock.mockReturnValue([
createTranscriptEntry({
id: "user-1",
message: { role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 },
}),
createTranscriptEntry({
id: "assistant-1",
parentId: "user-1",
message: {
role: "assistant",
content: [{ type: "text", text: "hello" }],
timestamp: 2,
},
}),
]);
buildSessionContextMock.mockImplementation((entries: Array<{ message?: unknown }> = []) => {
return { messages: entries.flatMap((entry) => (entry.message ? [entry.message] : [])) };
});
getLeafEntryMock.mockReturnValue(null);
resolveModelWithRegistryMock.mockReturnValue({
provider: "anthropic",
id: "claude-sonnet-4-6",
@@ -662,22 +692,40 @@ describe("runBtwSideQuestion", () => {
});
it("branches away from an unresolved trailing user turn before building BTW context", async () => {
getLeafEntryMock.mockReturnValue({
type: "message",
parentId: "assistant-1",
message: { role: "user" },
const assistantEntry = createTranscriptEntry({
id: "assistant-1",
message: createAssistantTranscriptMessage([{ type: "text", text: "seed answer" }]),
});
const trailingUserEntry = createTranscriptEntry({
id: "user-2",
parentId: "assistant-1",
message: createUserTranscriptMessage([{ type: "text", text: "unfinished task" }]),
});
mockTranscriptEntries([assistantEntry, trailingUserEntry]);
mockDoneAnswer(MATH_ANSWER);
const result = await runMathSideQuestion();
expect(branchMock).toHaveBeenCalledWith("assistant-1");
expect(resetLeafMock).not.toHaveBeenCalled();
expect(buildSessionContextMock).toHaveBeenCalledTimes(1);
expect(buildSessionContextMock).toHaveBeenCalledWith([assistantEntry]);
expect(result).toEqual({ text: MATH_ANSWER });
});
it("branches to the active run snapshot leaf when the session is busy", async () => {
const userEntry = createTranscriptEntry({
id: "user-seed",
message: createUserTranscriptMessage(),
});
const assistantEntry = createTranscriptEntry({
id: "assistant-seed",
parentId: "user-seed",
message: createAssistantTranscriptMessage([{ type: "text", text: "seed answer" }]),
});
const newerEntry = createTranscriptEntry({
id: "newer-user",
parentId: "assistant-seed",
message: createUserTranscriptMessage([{ type: "text", text: "newer unfinished task" }]),
});
mockTranscriptEntries([userEntry, assistantEntry, newerEntry]);
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
transcriptLeafId: "assistant-seed",
});
@@ -685,24 +733,29 @@ describe("runBtwSideQuestion", () => {
const result = await runMathSideQuestion();
expect(branchMock).toHaveBeenCalledWith("assistant-seed");
expect(getLeafEntryMock).not.toHaveBeenCalled();
expect(buildSessionContextMock).toHaveBeenCalledWith([userEntry, assistantEntry]);
expect(result).toEqual({ text: MATH_ANSWER });
});
it("falls back when the active run snapshot leaf no longer exists", async () => {
const userEntry = createTranscriptEntry({
id: "user-seed",
message: createUserTranscriptMessage(),
});
const assistantEntry = createTranscriptEntry({
id: "assistant-seed",
parentId: "user-seed",
message: createAssistantTranscriptMessage([{ type: "text", text: "seed answer" }]),
});
mockTranscriptEntries([userEntry, assistantEntry]);
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
transcriptLeafId: "assistant-gone",
});
branchMock.mockImplementationOnce(() => {
throw new Error("Entry 3235c7c4 not found");
});
mockDoneAnswer(MATH_ANSWER);
const result = await runMathSideQuestion();
expect(branchMock).toHaveBeenCalledWith("assistant-gone");
expect(resetLeafMock).toHaveBeenCalled();
expect(buildSessionContextMock).toHaveBeenCalledWith([userEntry, assistantEntry]);
expect(result).toEqual({ text: MATH_ANSWER });
expect(diagDebugMock).toHaveBeenCalledWith(
expect.stringContaining("btw snapshot leaf unavailable: sessionId=session-1"),

View File

@@ -7,21 +7,16 @@ import {
type Model,
type TextContent,
} from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import type { GetReplyOptions } from "../auto-reply/get-reply-options.types.js";
import type { ReplyPayload } from "../auto-reply/reply-payload.js";
import type { ReasoningLevel, ThinkLevel } from "../auto-reply/thinking.js";
import {
resolveSessionFilePath,
resolveSessionFilePathOptions,
type SessionEntry,
} from "../config/sessions.js";
import type { SessionEntry as StoredSessionEntry } from "../config/sessions.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { diagnosticLogger as diag } from "../logging/diagnostic.js";
import { prepareProviderRuntimeAuth } from "../plugins/provider-runtime.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { resolveAgentWorkspaceDir, resolveSessionAgentId } from "./agent-scope.js";
import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
import { readBtwTranscriptMessages, resolveBtwSessionTranscriptPath } from "./btw-transcript.js";
import {
resolveImageSanitizationLimits,
type ImageSanitizationLimits,
@@ -37,18 +32,6 @@ import { registerProviderStreamForModel } from "./provider-stream.js";
import { stripToolResultDetails } from "./session-transcript-repair.js";
import { sanitizeImageBlocks } from "./tool-images.js";
type SessionManagerLike = {
getLeafEntry?: () => {
id?: string;
type?: string;
parentId?: string | null;
message?: { role?: string };
} | null;
branch?: (parentId: string) => void;
resetLeaf?: () => void;
buildSessionContext: () => { messages?: unknown[] };
};
function collectTextContent(content: Array<{ type?: string; text?: string }>): string {
return content
.filter((part): part is { type: "text"; text: string } => part.type === "text")
@@ -228,34 +211,13 @@ async function toSimpleContextMessages(params: {
) as Message[];
}
function resolveSessionTranscriptPath(params: {
sessionId: string;
sessionEntry?: SessionEntry;
sessionKey?: string;
storePath?: string;
}): string | undefined {
try {
const agentId = params.sessionKey?.split(":")[1];
const pathOpts = resolveSessionFilePathOptions({
agentId,
storePath: params.storePath,
});
return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts);
} catch (error) {
diag.debug(
`resolveSessionTranscriptPath failed: sessionId=${params.sessionId} err=${String(error)}`,
);
return undefined;
}
}
async function resolveRuntimeModel(params: {
cfg: OpenClawConfig;
provider: string;
model: string;
agentDir: string;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionEntry?: StoredSessionEntry;
sessionStore?: Record<string, StoredSessionEntry>;
sessionKey?: string;
storePath?: string;
isNewSession: boolean;
@@ -300,8 +262,8 @@ type RunBtwSideQuestionParams = {
provider: string;
model: string;
question: string;
sessionEntry: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionEntry: StoredSessionEntry;
sessionStore?: Record<string, StoredSessionEntry>;
sessionKey?: string;
storePath?: string;
resolvedThinkLevel?: ThinkLevel;
@@ -320,7 +282,7 @@ export async function runBtwSideQuestion(
throw new Error("No active session context.");
}
const sessionFile = resolveSessionTranscriptPath({
const sessionFile = resolveBtwSessionTranscriptPath({
sessionId,
sessionEntry: params.sessionEntry,
sessionKey: params.sessionKey,
@@ -330,7 +292,6 @@ export async function runBtwSideQuestion(
throw new Error("No active session transcript.");
}
const sessionManager = SessionManager.open(sessionFile) as SessionManagerLike;
const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId);
const imageLimits = resolveImageSanitizationLimits(params.cfg);
let messages: Message[] = [];
@@ -343,32 +304,14 @@ export async function runBtwSideQuestion(
inFlightPrompt = activeRunSnapshot.inFlightPrompt;
} else if (activeRunSnapshot) {
inFlightPrompt = activeRunSnapshot.inFlightPrompt;
if (activeRunSnapshot.transcriptLeafId && sessionManager.branch) {
try {
sessionManager.branch(activeRunSnapshot.transcriptLeafId);
} catch (error) {
diag.debug(
`btw snapshot leaf unavailable: sessionId=${sessionId} leaf=${activeRunSnapshot.transcriptLeafId} err=${String(error)}`,
);
sessionManager.resetLeaf?.();
}
} else {
sessionManager.resetLeaf?.();
}
} else {
const leafEntry = sessionManager.getLeafEntry?.();
if (leafEntry?.type === "message" && leafEntry.message?.role === "user") {
if (leafEntry.parentId && sessionManager.branch) {
sessionManager.branch(leafEntry.parentId);
} else {
sessionManager.resetLeaf?.();
}
}
}
if (messages.length === 0) {
const sessionContext = sessionManager.buildSessionContext();
messages = await toSimpleContextMessages({
messages: Array.isArray(sessionContext.messages) ? sessionContext.messages : [],
messages: await readBtwTranscriptMessages({
sessionFile,
sessionId,
snapshotLeafId: activeRunSnapshot?.transcriptLeafId,
}),
imageLimits,
});
}