feat(agent): add /btw side questions

This commit is contained in:
Nimrod Gutman
2026-03-13 22:33:27 +02:00
parent 5682ec37fa
commit b28ad98a4c
13 changed files with 1099 additions and 1 deletions

428
src/agents/btw.test.ts Normal file
View File

@@ -0,0 +1,428 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../config/sessions.js";
const streamSimpleMock = vi.fn();
const appendCustomEntryMock = vi.fn();
const buildSessionContextMock = vi.fn();
const getLeafEntryMock = vi.fn();
const branchMock = vi.fn();
const resetLeafMock = vi.fn();
const ensureOpenClawModelsJsonMock = vi.fn();
const discoverAuthStorageMock = vi.fn();
const discoverModelsMock = vi.fn();
const resolveModelWithRegistryMock = vi.fn();
const getApiKeyForModelMock = vi.fn();
const requireApiKeyMock = vi.fn();
const acquireSessionWriteLockMock = vi.fn();
const resolveSessionAuthProfileOverrideMock = vi.fn();
const getActiveEmbeddedRunSnapshotMock = vi.fn();
const waitForEmbeddedPiRunEndMock = vi.fn();
vi.mock("@mariozechner/pi-ai", () => ({
streamSimple: (...args: unknown[]) => streamSimpleMock(...args),
}));
vi.mock("@mariozechner/pi-coding-agent", () => ({
SessionManager: {
open: () => ({
getLeafEntry: getLeafEntryMock,
branch: branchMock,
resetLeaf: resetLeafMock,
buildSessionContext: buildSessionContextMock,
appendCustomEntry: appendCustomEntryMock,
}),
},
}));
vi.mock("./models-config.js", () => ({
ensureOpenClawModelsJson: (...args: unknown[]) => ensureOpenClawModelsJsonMock(...args),
}));
vi.mock("./pi-model-discovery.js", () => ({
discoverAuthStorage: (...args: unknown[]) => discoverAuthStorageMock(...args),
discoverModels: (...args: unknown[]) => discoverModelsMock(...args),
}));
vi.mock("./pi-embedded-runner/model.js", () => ({
resolveModelWithRegistry: (...args: unknown[]) => resolveModelWithRegistryMock(...args),
}));
vi.mock("./model-auth.js", () => ({
getApiKeyForModel: (...args: unknown[]) => getApiKeyForModelMock(...args),
requireApiKey: (...args: unknown[]) => requireApiKeyMock(...args),
}));
vi.mock("./session-write-lock.js", () => ({
acquireSessionWriteLock: (...args: unknown[]) => acquireSessionWriteLockMock(...args),
}));
vi.mock("./pi-embedded-runner/runs.js", () => ({
getActiveEmbeddedRunSnapshot: (...args: unknown[]) => getActiveEmbeddedRunSnapshotMock(...args),
waitForEmbeddedPiRunEnd: (...args: unknown[]) => waitForEmbeddedPiRunEndMock(...args),
}));
vi.mock("./auth-profiles/session-override.js", () => ({
resolveSessionAuthProfileOverride: (...args: unknown[]) =>
resolveSessionAuthProfileOverrideMock(...args),
}));
const { BTW_CUSTOM_TYPE, runBtwSideQuestion } = await import("./btw.js");
function makeAsyncEvents(events: unknown[]) {
return {
async *[Symbol.asyncIterator]() {
for (const event of events) {
yield event;
}
},
};
}
function createSessionEntry(overrides: Partial<SessionEntry> = {}): SessionEntry {
return {
sessionId: "session-1",
sessionFile: "session-1.jsonl",
updatedAt: Date.now(),
...overrides,
};
}
describe("runBtwSideQuestion", () => {
beforeEach(() => {
streamSimpleMock.mockReset();
appendCustomEntryMock.mockReset();
buildSessionContextMock.mockReset();
getLeafEntryMock.mockReset();
branchMock.mockReset();
resetLeafMock.mockReset();
ensureOpenClawModelsJsonMock.mockReset();
discoverAuthStorageMock.mockReset();
discoverModelsMock.mockReset();
resolveModelWithRegistryMock.mockReset();
getApiKeyForModelMock.mockReset();
requireApiKeyMock.mockReset();
acquireSessionWriteLockMock.mockReset();
resolveSessionAuthProfileOverrideMock.mockReset();
getActiveEmbeddedRunSnapshotMock.mockReset();
waitForEmbeddedPiRunEndMock.mockReset();
buildSessionContextMock.mockReturnValue({
messages: [{ role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }],
});
getLeafEntryMock.mockReturnValue(null);
resolveModelWithRegistryMock.mockReturnValue({
provider: "anthropic",
id: "claude-sonnet-4-5",
api: "anthropic-messages",
});
getApiKeyForModelMock.mockResolvedValue({ apiKey: "secret", mode: "api-key", source: "test" });
requireApiKeyMock.mockReturnValue("secret");
acquireSessionWriteLockMock.mockResolvedValue({
release: vi.fn().mockResolvedValue(undefined),
});
resolveSessionAuthProfileOverrideMock.mockResolvedValue("profile-1");
getActiveEmbeddedRunSnapshotMock.mockReturnValue(undefined);
waitForEmbeddedPiRunEndMock.mockResolvedValue(true);
});
it("streams blocks and persists a non-context custom entry", async () => {
const onBlockReply = vi.fn().mockResolvedValue(undefined);
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "text_delta",
delta: "Side answer.",
partial: {
role: "assistant",
content: [],
provider: "anthropic",
model: "claude-sonnet-4-5",
},
},
{
type: "text_end",
content: "Side answer.",
contentIndex: 0,
partial: {
role: "assistant",
content: [],
provider: "anthropic",
model: "claude-sonnet-4-5",
},
},
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "Side answer." }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What changed?",
sessionEntry: createSessionEntry(),
sessionStore: {},
sessionKey: "agent:main:main",
storePath: "/tmp/sessions.json",
resolvedThinkLevel: "low",
resolvedReasoningLevel: "off",
blockReplyChunking: {
minChars: 1,
maxChars: 200,
breakPreference: "paragraph",
},
resolvedBlockStreamingBreak: "text_end",
opts: { onBlockReply },
isNewSession: false,
});
expect(result).toBeUndefined();
expect(onBlockReply).toHaveBeenCalledWith({ text: "Side answer." });
expect(appendCustomEntryMock).toHaveBeenCalledWith(
BTW_CUSTOM_TYPE,
expect.objectContaining({
question: "What changed?",
answer: "Side answer.",
provider: "anthropic",
model: "claude-sonnet-4-5",
}),
);
});
it("returns a final payload when block streaming is unavailable", async () => {
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "Final answer." }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What changed?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(result).toEqual({ text: "Final answer." });
});
it("fails when the current branch has no messages", async () => {
buildSessionContextMock.mockReturnValue({ messages: [] });
streamSimpleMock.mockReturnValue(makeAsyncEvents([]));
await expect(
runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What changed?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
}),
).rejects.toThrow("No active session context.");
});
it("branches away from an unresolved trailing user turn before building BTW context", async () => {
getLeafEntryMock.mockReturnValue({
type: "message",
parentId: "assistant-1",
message: { role: "user" },
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(branchMock).toHaveBeenCalledWith("assistant-1");
expect(resetLeafMock).not.toHaveBeenCalled();
expect(buildSessionContextMock).toHaveBeenCalledTimes(1);
expect(result).toEqual({ text: "323" });
});
it("branches to the active run snapshot leaf when the session is busy", async () => {
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
transcriptLeafId: "assistant-seed",
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(branchMock).toHaveBeenCalledWith("assistant-seed");
expect(getLeafEntryMock).not.toHaveBeenCalled();
expect(result).toEqual({ text: "323" });
});
it("returns the BTW answer and retries transcript persistence after a session lock", async () => {
acquireSessionWriteLockMock
.mockRejectedValueOnce(
new Error("session file locked (timeout 250ms): pid=123 /tmp/session.lock"),
)
.mockResolvedValueOnce({
release: vi.fn().mockResolvedValue(undefined),
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(result).toEqual({ text: "323" });
expect(waitForEmbeddedPiRunEndMock).toHaveBeenCalledWith("session-1", 30000);
await vi.waitFor(() => {
expect(appendCustomEntryMock).toHaveBeenCalledWith(
BTW_CUSTOM_TYPE,
expect.objectContaining({
question: "What is 17 * 19?",
answer: "323",
}),
);
});
});
});

426
src/agents/btw.ts Normal file
View File

@@ -0,0 +1,426 @@
import {
streamSimple,
type Api,
type AssistantMessageEvent,
type ThinkingLevel as SimpleThinkingLevel,
type Message,
type Model,
} from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import type { ReasoningLevel, ThinkLevel } from "../auto-reply/thinking.js";
import type { GetReplyOptions, ReplyPayload } from "../auto-reply/types.js";
import type { OpenClawConfig } from "../config/config.js";
import {
resolveSessionFilePath,
resolveSessionFilePathOptions,
type SessionEntry,
} from "../config/sessions.js";
import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
import { getApiKeyForModel, requireApiKey } from "./model-auth.js";
import { ensureOpenClawModelsJson } from "./models-config.js";
import { EmbeddedBlockChunker, type BlockReplyChunking } from "./pi-embedded-block-chunker.js";
import { resolveModelWithRegistry } from "./pi-embedded-runner/model.js";
import {
getActiveEmbeddedRunSnapshot,
waitForEmbeddedPiRunEnd,
} from "./pi-embedded-runner/runs.js";
import { mapThinkingLevel } from "./pi-embedded-runner/utils.js";
import { discoverAuthStorage, discoverModels } from "./pi-model-discovery.js";
import { acquireSessionWriteLock } from "./session-write-lock.js";
const BTW_CUSTOM_TYPE = "openclaw:btw";
const BTW_PERSIST_TIMEOUT_MS = 250;
const BTW_PERSIST_RETRY_WAIT_MS = 30_000;
const BTW_PERSIST_RETRY_LOCK_MS = 10_000;
type SessionManagerLike = {
getLeafEntry?: () => {
id?: string;
type?: string;
parentId?: string | null;
message?: { role?: string };
} | null;
branch?: (parentId: string) => void;
resetLeaf?: () => void;
buildSessionContext: () => { messages?: unknown[] };
};
type BtwCustomEntryData = {
timestamp: number;
question: string;
answer: string;
provider: string;
model: string;
thinkingLevel: ThinkLevel | "off";
reasoningLevel: ReasoningLevel;
sessionKey?: string;
authProfileId?: string;
authProfileIdSource?: "auto" | "user";
usage?: unknown;
};
async function appendBtwCustomEntry(params: {
sessionFile: string;
timeoutMs: number;
entry: BtwCustomEntryData;
}) {
const lock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
timeoutMs: params.timeoutMs,
allowReentrant: false,
});
try {
const persisted = SessionManager.open(params.sessionFile);
persisted.appendCustomEntry(BTW_CUSTOM_TYPE, params.entry);
} finally {
await lock.release();
}
}
function isSessionLockError(error: unknown): boolean {
const message = error instanceof Error ? error.message : String(error);
return message.includes("session file locked");
}
function deferBtwCustomEntryPersist(params: {
sessionId: string;
sessionFile: string;
entry: BtwCustomEntryData;
}) {
void (async () => {
try {
await waitForEmbeddedPiRunEnd(params.sessionId, BTW_PERSIST_RETRY_WAIT_MS);
await appendBtwCustomEntry({
sessionFile: params.sessionFile,
timeoutMs: BTW_PERSIST_RETRY_LOCK_MS,
entry: params.entry,
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.warn(`[btw] skipped transcript persistence: ${message}`);
}
})();
}
function collectTextContent(content: Array<{ type?: string; text?: string }>): string {
return content
.filter((part): part is { type: "text"; text: string } => part.type === "text")
.map((part) => part.text)
.join("");
}
function collectThinkingContent(content: Array<{ type?: string; thinking?: string }>): string {
return content
.filter((part): part is { type: "thinking"; thinking: string } => part.type === "thinking")
.map((part) => part.thinking)
.join("");
}
function toSimpleContextMessages(messages: unknown[]): Message[] {
return messages.filter((message): message is Message => {
if (!message || typeof message !== "object") {
return false;
}
const role = (message as { role?: unknown }).role;
return role === "user" || role === "assistant" || role === "toolResult";
});
}
function resolveSimpleThinkingLevel(level?: ThinkLevel): SimpleThinkingLevel | undefined {
if (!level || level === "off") {
return undefined;
}
return mapThinkingLevel(level) as SimpleThinkingLevel;
}
function resolveSessionTranscriptPath(params: {
sessionId: string;
sessionEntry?: SessionEntry;
sessionKey?: string;
storePath?: string;
}): string | undefined {
try {
const agentId = params.sessionKey?.split(":")[1];
const pathOpts = resolveSessionFilePathOptions({
agentId,
storePath: params.storePath,
});
return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts);
} catch {
return undefined;
}
}
async function resolveRuntimeModel(params: {
cfg: OpenClawConfig;
provider: string;
model: string;
agentDir: string;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
isNewSession: boolean;
}): Promise<{
model: Model<Api>;
authProfileId?: string;
authProfileIdSource?: "auto" | "user";
}> {
await ensureOpenClawModelsJson(params.cfg, params.agentDir);
const authStorage = discoverAuthStorage(params.agentDir);
const modelRegistry = discoverModels(authStorage, params.agentDir);
const model = resolveModelWithRegistry({
provider: params.provider,
modelId: params.model,
modelRegistry,
cfg: params.cfg,
});
if (!model) {
throw new Error(`Unknown model: ${params.provider}/${params.model}`);
}
const authProfileId = await resolveSessionAuthProfileOverride({
cfg: params.cfg,
provider: params.provider,
agentDir: params.agentDir,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
isNewSession: params.isNewSession,
});
return {
model,
authProfileId,
authProfileIdSource: params.sessionEntry?.authProfileOverrideSource,
};
}
type RunBtwSideQuestionParams = {
cfg: OpenClawConfig;
agentDir: string;
provider: string;
model: string;
question: string;
sessionEntry: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
resolvedThinkLevel?: ThinkLevel;
resolvedReasoningLevel: ReasoningLevel;
blockReplyChunking?: BlockReplyChunking;
resolvedBlockStreamingBreak?: "text_end" | "message_end";
opts?: GetReplyOptions;
isNewSession: boolean;
};
export async function runBtwSideQuestion(
params: RunBtwSideQuestionParams,
): Promise<ReplyPayload | undefined> {
const sessionId = params.sessionEntry.sessionId?.trim();
if (!sessionId) {
throw new Error("No active session context.");
}
const sessionFile = resolveSessionTranscriptPath({
sessionId,
sessionEntry: params.sessionEntry,
sessionKey: params.sessionKey,
storePath: params.storePath,
});
if (!sessionFile) {
throw new Error("No active session transcript.");
}
const sessionManager = SessionManager.open(sessionFile) as SessionManagerLike;
const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId);
if (activeRunSnapshot) {
if (activeRunSnapshot.transcriptLeafId && sessionManager.branch) {
sessionManager.branch(activeRunSnapshot.transcriptLeafId);
} else {
sessionManager.resetLeaf?.();
}
} else {
const leafEntry = sessionManager.getLeafEntry?.();
if (leafEntry?.type === "message" && leafEntry.message?.role === "user") {
if (leafEntry.parentId && sessionManager.branch) {
sessionManager.branch(leafEntry.parentId);
} else {
sessionManager.resetLeaf?.();
}
}
}
const sessionContext = sessionManager.buildSessionContext();
const messages = toSimpleContextMessages(
Array.isArray(sessionContext.messages) ? sessionContext.messages : [],
);
if (messages.length === 0) {
throw new Error("No active session context.");
}
const { model, authProfileId, authProfileIdSource } = await resolveRuntimeModel({
cfg: params.cfg,
provider: params.provider,
model: params.model,
agentDir: params.agentDir,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
isNewSession: params.isNewSession,
});
const apiKeyInfo = await getApiKeyForModel({
model,
cfg: params.cfg,
profileId: authProfileId,
agentDir: params.agentDir,
});
const apiKey = requireApiKey(apiKeyInfo, model.provider);
const chunker =
params.opts?.onBlockReply && params.blockReplyChunking
? new EmbeddedBlockChunker(params.blockReplyChunking)
: undefined;
let emittedBlocks = 0;
let blockEmitChain: Promise<void> = Promise.resolve();
let answerText = "";
let reasoningText = "";
let assistantStarted = false;
let sawTextEvent = false;
const emitBlockChunk = async (text: string) => {
const trimmed = text.trim();
if (!trimmed || !params.opts?.onBlockReply) {
return;
}
emittedBlocks += 1;
blockEmitChain = blockEmitChain.then(async () => {
await params.opts?.onBlockReply?.({ text });
});
await blockEmitChain;
};
const stream = streamSimple(
model,
{
messages: [
...messages,
{
role: "user",
content: [{ type: "text", text: params.question }],
timestamp: Date.now(),
},
],
},
{
apiKey,
reasoning: resolveSimpleThinkingLevel(params.resolvedThinkLevel),
signal: params.opts?.abortSignal,
},
);
let finalEvent:
| Extract<AssistantMessageEvent, { type: "done" }>
| Extract<AssistantMessageEvent, { type: "error" }>
| undefined;
for await (const event of stream) {
finalEvent = event.type === "done" || event.type === "error" ? event : finalEvent;
if (!assistantStarted && (event.type === "text_start" || event.type === "start")) {
assistantStarted = true;
await params.opts?.onAssistantMessageStart?.();
}
if (event.type === "text_delta") {
sawTextEvent = true;
answerText += event.delta;
chunker?.append(event.delta);
if (chunker && params.resolvedBlockStreamingBreak === "text_end") {
chunker.drain({ force: false, emit: (chunk) => void emitBlockChunk(chunk) });
}
continue;
}
if (event.type === "text_end" && chunker && params.resolvedBlockStreamingBreak === "text_end") {
chunker.drain({ force: true, emit: (chunk) => void emitBlockChunk(chunk) });
continue;
}
if (event.type === "thinking_delta") {
reasoningText += event.delta;
if (params.resolvedReasoningLevel !== "off") {
await params.opts?.onReasoningStream?.({ text: reasoningText, isReasoning: true });
}
continue;
}
if (event.type === "thinking_end" && params.resolvedReasoningLevel !== "off") {
await params.opts?.onReasoningEnd?.();
}
}
if (chunker && params.resolvedBlockStreamingBreak !== "text_end" && chunker.hasBuffered()) {
chunker.drain({ force: true, emit: (chunk) => void emitBlockChunk(chunk) });
}
await blockEmitChain;
if (finalEvent?.type === "error") {
const message = collectTextContent(finalEvent.error.content);
throw new Error(message || finalEvent.error.errorMessage || "BTW failed.");
}
const finalMessage = finalEvent?.type === "done" ? finalEvent.message : undefined;
if (finalMessage) {
if (!sawTextEvent) {
answerText = collectTextContent(finalMessage.content);
}
if (!reasoningText) {
reasoningText = collectThinkingContent(finalMessage.content);
}
}
const answer = answerText.trim();
if (!answer) {
throw new Error("No BTW response generated.");
}
const customEntry = {
timestamp: Date.now(),
question: params.question,
answer,
provider: model.provider,
model: model.id,
thinkingLevel: params.resolvedThinkLevel ?? "off",
reasoningLevel: params.resolvedReasoningLevel,
sessionKey: params.sessionKey,
authProfileId,
authProfileIdSource,
usage: finalMessage?.usage,
} satisfies BtwCustomEntryData;
try {
await appendBtwCustomEntry({
sessionFile,
timeoutMs: BTW_PERSIST_TIMEOUT_MS,
entry: customEntry,
});
} catch (error) {
if (!isSessionLockError(error)) {
throw error;
}
deferBtwCustomEntryPersist({
sessionId,
sessionFile,
entry: customEntry,
});
}
if (emittedBlocks > 0) {
return undefined;
}
return { text: answer };
}
export { BTW_CUSTOM_TYPE };

View File

@@ -111,6 +111,7 @@ import {
clearActiveEmbeddedRun,
type EmbeddedPiQueueHandle,
setActiveEmbeddedRun,
updateActiveEmbeddedRunSnapshot,
} from "../runs.js";
import { buildEmbeddedSandboxInfo } from "../sandbox-info.js";
import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js";
@@ -2376,6 +2377,10 @@ export async function runEmbeddedAttempt(
`runId=${params.runId} sessionId=${params.sessionId}`,
);
}
updateActiveEmbeddedRunSnapshot(params.sessionId, {
transcriptLeafId:
(sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null,
});
try {
// Idempotent cleanup for legacy sessions with persisted image payloads.

View File

@@ -4,7 +4,9 @@ import {
__testing,
abortEmbeddedPiRun,
clearActiveEmbeddedRun,
getActiveEmbeddedRunSnapshot,
setActiveEmbeddedRun,
updateActiveEmbeddedRunSnapshot,
waitForActiveEmbeddedRuns,
} from "./runs.js";
@@ -137,4 +139,24 @@ describe("pi-embedded runner run registry", () => {
runsB.__testing.resetActiveEmbeddedRuns();
}
});
it("tracks and clears per-session transcript snapshots for active runs", () => {
const handle = {
queueMessage: async () => {},
isStreaming: () => true,
isCompacting: () => false,
abort: vi.fn(),
};
setActiveEmbeddedRun("session-snapshot", handle);
updateActiveEmbeddedRunSnapshot("session-snapshot", {
transcriptLeafId: "assistant-1",
});
expect(getActiveEmbeddedRunSnapshot("session-snapshot")).toEqual({
transcriptLeafId: "assistant-1",
});
clearActiveEmbeddedRun("session-snapshot", handle);
expect(getActiveEmbeddedRunSnapshot("session-snapshot")).toBeUndefined();
});
});

View File

@@ -12,6 +12,10 @@ type EmbeddedPiQueueHandle = {
abort: () => void;
};
export type ActiveEmbeddedRunSnapshot = {
transcriptLeafId: string | null;
};
type EmbeddedRunWaiter = {
resolve: (ended: boolean) => void;
timer: NodeJS.Timeout;
@@ -25,9 +29,11 @@ const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState");
const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
activeRuns: new Map<string, EmbeddedPiQueueHandle>(),
snapshots: new Map<string, ActiveEmbeddedRunSnapshot>(),
waiters: new Map<string, Set<EmbeddedRunWaiter>>(),
}));
const ACTIVE_EMBEDDED_RUNS = embeddedRunState.activeRuns;
const ACTIVE_EMBEDDED_RUN_SNAPSHOTS = embeddedRunState.snapshots;
const EMBEDDED_RUN_WAITERS = embeddedRunState.waiters;
export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
@@ -135,6 +141,12 @@ export function getActiveEmbeddedRunCount(): number {
return ACTIVE_EMBEDDED_RUNS.size;
}
export function getActiveEmbeddedRunSnapshot(
sessionId: string,
): ActiveEmbeddedRunSnapshot | undefined {
return ACTIVE_EMBEDDED_RUN_SNAPSHOTS.get(sessionId);
}
/**
* Wait for active embedded runs to drain.
*
@@ -230,6 +242,16 @@ export function setActiveEmbeddedRun(
}
}
export function updateActiveEmbeddedRunSnapshot(
sessionId: string,
snapshot: ActiveEmbeddedRunSnapshot,
) {
if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
return;
}
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.set(sessionId, snapshot);
}
export function clearActiveEmbeddedRun(
sessionId: string,
handle: EmbeddedPiQueueHandle,
@@ -237,6 +259,7 @@ export function clearActiveEmbeddedRun(
) {
if (ACTIVE_EMBEDDED_RUNS.get(sessionId) === handle) {
ACTIVE_EMBEDDED_RUNS.delete(sessionId);
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.delete(sessionId);
logSessionStateChange({ sessionId, sessionKey, state: "idle", reason: "run_completed" });
if (!sessionId.startsWith("probe-")) {
diag.debug(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
@@ -257,6 +280,7 @@ export const __testing = {
}
EMBEDDED_RUN_WAITERS.clear();
ACTIVE_EMBEDDED_RUNS.clear();
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.clear();
},
};

View File

@@ -196,6 +196,14 @@ function buildChatCommands(): ChatCommandDefinition[] {
acceptsArgs: true,
category: "status",
}),
defineChatCommand({
key: "btw",
nativeName: "btw",
description: "Ask a side question without changing future session context.",
textAlias: "/btw",
acceptsArgs: true,
category: "tools",
}),
defineChatCommand({
key: "export-session",
nativeName: "export-session",

View File

@@ -0,0 +1,93 @@
import { describe, expect, it, vi, beforeEach } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import { buildCommandTestParams } from "./commands.test-harness.js";
const runBtwSideQuestionMock = vi.fn();
vi.mock("../../agents/btw.js", () => ({
runBtwSideQuestion: (...args: unknown[]) => runBtwSideQuestionMock(...args),
}));
const { handleBtwCommand } = await import("./commands-btw.js");
function buildParams(commandBody: string) {
const cfg = {
commands: { text: true },
channels: { whatsapp: { allowFrom: ["*"] } },
} as OpenClawConfig;
return buildCommandTestParams(commandBody, cfg, undefined, { workspaceDir: "/tmp/workspace" });
}
describe("handleBtwCommand", () => {
beforeEach(() => {
runBtwSideQuestionMock.mockReset();
});
it("returns usage when the side question is missing", async () => {
const result = await handleBtwCommand(buildParams("/btw"), true);
expect(result).toEqual({
shouldContinue: false,
reply: { text: "Usage: /btw <side question>" },
});
});
it("requires an active session context", async () => {
const params = buildParams("/btw what changed?");
params.sessionEntry = undefined;
const result = await handleBtwCommand(params, true);
expect(result).toEqual({
shouldContinue: false,
reply: { text: "⚠️ /btw requires an active session with existing context." },
});
});
it("still delegates while the session is actively running", async () => {
const params = buildParams("/btw what changed?");
params.agentDir = "/tmp/agent";
params.sessionEntry = {
sessionId: "session-1",
updatedAt: Date.now(),
};
runBtwSideQuestionMock.mockResolvedValue({ text: "snapshot answer" });
const result = await handleBtwCommand(params, true);
expect(runBtwSideQuestionMock).toHaveBeenCalledWith(
expect.objectContaining({
question: "what changed?",
sessionEntry: params.sessionEntry,
}),
);
expect(result).toEqual({
shouldContinue: false,
reply: { text: "snapshot answer" },
});
});
it("delegates to the side-question runner", async () => {
const params = buildParams("/btw what changed?");
params.agentDir = "/tmp/agent";
params.sessionEntry = {
sessionId: "session-1",
updatedAt: Date.now(),
};
runBtwSideQuestionMock.mockResolvedValue({ text: "nothing important" });
const result = await handleBtwCommand(params, true);
expect(runBtwSideQuestionMock).toHaveBeenCalledWith(
expect.objectContaining({
question: "what changed?",
agentDir: "/tmp/agent",
sessionEntry: params.sessionEntry,
}),
);
expect(result).toEqual({
shouldContinue: false,
reply: { text: "nothing important" },
});
});
});

View File

@@ -0,0 +1,67 @@
import { runBtwSideQuestion } from "../../agents/btw.js";
import type { CommandHandler } from "./commands-types.js";
const BTW_USAGE = "Usage: /btw <side question>";
export const handleBtwCommand: CommandHandler = async (params) => {
const match = params.command.commandBodyNormalized.match(/^\/btw(?:\s+(.*))?$/i);
if (!match) {
return null;
}
const question = match[1]?.trim() ?? "";
if (!question) {
return {
shouldContinue: false,
reply: { text: BTW_USAGE },
};
}
if (!params.sessionEntry?.sessionId) {
return {
shouldContinue: false,
reply: { text: "⚠️ /btw requires an active session with existing context." },
};
}
if (!params.agentDir) {
return {
shouldContinue: false,
reply: {
text: "⚠️ /btw is unavailable because the active agent directory could not be resolved.",
},
};
}
try {
const reply = await runBtwSideQuestion({
cfg: params.cfg,
agentDir: params.agentDir,
provider: params.provider,
model: params.model,
question,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
resolvedThinkLevel: params.resolvedThinkLevel,
resolvedReasoningLevel: params.resolvedReasoningLevel,
blockReplyChunking: params.blockReplyChunking,
resolvedBlockStreamingBreak: params.resolvedBlockStreamingBreak,
opts: params.opts,
isNewSession: false,
});
return {
shouldContinue: false,
reply,
};
} catch (error) {
const message = error instanceof Error ? error.message.trim() : "";
return {
shouldContinue: false,
reply: {
text: `⚠️ /btw failed${message ? `: ${message}` : "."}`,
},
};
}
};

View File

@@ -11,6 +11,7 @@ import { resolveBoundAcpThreadSessionKey } from "./commands-acp/targets.js";
import { handleAllowlistCommand } from "./commands-allowlist.js";
import { handleApproveCommand } from "./commands-approve.js";
import { handleBashCommand } from "./commands-bash.js";
import { handleBtwCommand } from "./commands-btw.js";
import { handleCompactCommand } from "./commands-compact.js";
import { handleConfigCommand, handleDebugCommand } from "./commands-config.js";
import {
@@ -174,6 +175,7 @@ export async function handleCommands(params: HandleCommandsParams): Promise<Comm
HANDLERS = [
// Plugin commands are processed first, before built-in commands
handlePluginCommand,
handleBtwCommand,
handleBashCommand,
handleActivationCommand,
handleSendPolicyCommand,

View File

@@ -4,7 +4,7 @@ import type { OpenClawConfig } from "../../config/config.js";
import type { SessionEntry, SessionScope } from "../../config/sessions.js";
import type { MsgContext } from "../templating.js";
import type { ElevatedLevel, ReasoningLevel, ThinkLevel, VerboseLevel } from "../thinking.js";
import type { ReplyPayload } from "../types.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { InlineDirectives } from "./directive-handling.js";
export type CommandContext = {
@@ -44,11 +44,19 @@ export type HandleCommandsParams = {
storePath?: string;
sessionScope?: SessionScope;
workspaceDir: string;
opts?: GetReplyOptions;
defaultGroupActivation: () => "always" | "mention";
resolvedThinkLevel?: ThinkLevel;
resolvedVerboseLevel: VerboseLevel;
resolvedReasoningLevel: ReasoningLevel;
resolvedElevatedLevel?: ElevatedLevel;
blockReplyChunking?: {
minChars: number;
maxChars: number;
breakPreference: "paragraph" | "newline" | "sentence";
flushOnParagraph?: boolean;
};
resolvedBlockStreamingBreak?: "text_end" | "message_end";
resolveDefaultThinkingLevel: () => Promise<ThinkLevel | undefined>;
provider: string;
model: string;

View File

@@ -113,6 +113,13 @@ export async function handleInlineActions(params: {
resolvedVerboseLevel: VerboseLevel | undefined;
resolvedReasoningLevel: ReasoningLevel;
resolvedElevatedLevel: ElevatedLevel;
blockReplyChunking?: {
minChars: number;
maxChars: number;
breakPreference: "paragraph" | "newline" | "sentence";
flushOnParagraph?: boolean;
};
resolvedBlockStreamingBreak?: "text_end" | "message_end";
resolveDefaultThinkingLevel: Awaited<
ReturnType<typeof createModelSelectionState>
>["resolveDefaultThinkingLevel"];
@@ -152,6 +159,8 @@ export async function handleInlineActions(params: {
resolvedVerboseLevel,
resolvedReasoningLevel,
resolvedElevatedLevel,
blockReplyChunking,
resolvedBlockStreamingBreak,
resolveDefaultThinkingLevel,
provider,
model,
@@ -357,11 +366,14 @@ export async function handleInlineActions(params: {
storePath,
sessionScope,
workspaceDir,
opts,
defaultGroupActivation: defaultActivation,
resolvedThinkLevel,
resolvedVerboseLevel: resolvedVerboseLevel ?? "off",
resolvedReasoningLevel,
resolvedElevatedLevel,
blockReplyChunking,
resolvedBlockStreamingBreak,
resolveDefaultThinkingLevel,
provider,
model,

View File

@@ -332,6 +332,8 @@ export async function getReplyFromConfig(
resolvedVerboseLevel,
resolvedReasoningLevel,
resolvedElevatedLevel,
blockReplyChunking,
resolvedBlockStreamingBreak,
resolveDefaultThinkingLevel: modelState.resolveDefaultThinkingLevel,
provider,
model,

View File

@@ -37,6 +37,7 @@ const RESERVED_COMMANDS = new Set([
"status",
"whoami",
"context",
"btw",
// Session management
"stop",
"restart",