Add /btw side questions (#45444)

* feat(agent): add /btw side questions

* fix(agent): gate and log /btw reviews

* feat(btw): isolate side-question delivery

* test(reply): update route reply runtime mocks

* fix(btw): complete side-result delivery across clients

* fix(gateway): handle streamed btw side results

* fix(telegram): unblock btw side questions

* fix(reply): make external btw replies explicit

* fix(chat): keep btw side results ephemeral in internal history

* fix(btw): address remaining review feedback

* fix(chat): preserve btw history on mobile refresh

* fix(acp): keep btw replies out of prompt history

* refactor(btw): narrow side questions to live channels

* fix(btw): preserve channel typing indicators

* fix(btw): keep side questions isolated in chat

* fix(outbound): restore typed channel send deps

* fix(btw): avoid blocking replies on transcript persistence

* fix(btw): keep side questions fast

* docs(commands): document btw slash command

* docs(changelog): add btw side questions entry

* test(outbound): align session transcript mocks
This commit is contained in:
Nimrod Gutman
2026-03-14 17:27:54 +02:00
committed by GitHub
parent b5ba2101c7
commit 9aac55d306
41 changed files with 4254 additions and 143 deletions

829
src/agents/btw.test.ts Normal file
View File

@@ -0,0 +1,829 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../config/sessions.js";
// Shared mock functions. Each vi.mock factory below forwards to one of these, so individual
// tests can program return values and inspect calls.
const streamSimpleMock = vi.fn();
const appendCustomEntryMock = vi.fn();
const buildSessionContextMock = vi.fn();
const getLeafEntryMock = vi.fn();
const branchMock = vi.fn();
const resetLeafMock = vi.fn();
const ensureOpenClawModelsJsonMock = vi.fn();
const discoverAuthStorageMock = vi.fn();
const discoverModelsMock = vi.fn();
const resolveModelWithRegistryMock = vi.fn();
const getApiKeyForModelMock = vi.fn();
const requireApiKeyMock = vi.fn();
const acquireSessionWriteLockMock = vi.fn();
const resolveSessionAuthProfileOverrideMock = vi.fn();
const getActiveEmbeddedRunSnapshotMock = vi.fn();
const waitForEmbeddedPiRunEndMock = vi.fn();
const diagWarnMock = vi.fn();
const diagDebugMock = vi.fn();
// Model streaming entry point: forwarded lazily so the mock is looked up at call time.
vi.mock("@mariozechner/pi-ai", () => ({
streamSimple: (...args: unknown[]) => streamSimpleMock(...args),
}));
// SessionManager.open() returns a fake manager exposing only the methods wired to mocks here.
vi.mock("@mariozechner/pi-coding-agent", () => ({
SessionManager: {
open: () => ({
getLeafEntry: getLeafEntryMock,
branch: branchMock,
resetLeaf: resetLeafMock,
buildSessionContext: buildSessionContextMock,
appendCustomEntry: appendCustomEntryMock,
}),
},
}));
vi.mock("./models-config.js", () => ({
ensureOpenClawModelsJson: (...args: unknown[]) => ensureOpenClawModelsJsonMock(...args),
}));
vi.mock("./pi-model-discovery.js", () => ({
discoverAuthStorage: (...args: unknown[]) => discoverAuthStorageMock(...args),
discoverModels: (...args: unknown[]) => discoverModelsMock(...args),
}));
vi.mock("./pi-embedded-runner/model.js", () => ({
resolveModelWithRegistry: (...args: unknown[]) => resolveModelWithRegistryMock(...args),
}));
vi.mock("./model-auth.js", () => ({
getApiKeyForModel: (...args: unknown[]) => getApiKeyForModelMock(...args),
requireApiKey: (...args: unknown[]) => requireApiKeyMock(...args),
}));
vi.mock("./session-write-lock.js", () => ({
acquireSessionWriteLock: (...args: unknown[]) => acquireSessionWriteLockMock(...args),
}));
vi.mock("./pi-embedded-runner/runs.js", () => ({
getActiveEmbeddedRunSnapshot: (...args: unknown[]) => getActiveEmbeddedRunSnapshotMock(...args),
waitForEmbeddedPiRunEnd: (...args: unknown[]) => waitForEmbeddedPiRunEndMock(...args),
}));
vi.mock("./auth-profiles/session-override.js", () => ({
resolveSessionAuthProfileOverride: (...args: unknown[]) =>
resolveSessionAuthProfileOverrideMock(...args),
}));
// Only warn and debug are provided on the mocked diagnostic logger.
vi.mock("../logging/diagnostic.js", () => ({
diagnosticLogger: {
warn: (...args: unknown[]) => diagWarnMock(...args),
debug: (...args: unknown[]) => diagDebugMock(...args),
},
}));
// The module under test is imported dynamically, after every vi.mock registration above.
const { BTW_CUSTOM_TYPE, runBtwSideQuestion } = await import("./btw.js");
/**
 * Wraps a fixed list of events in an async-iterable, standing in for a model stream.
 * Every iteration replays the list from the start, in order.
 */
function makeAsyncEvents(events: unknown[]) {
  async function* replay() {
    for (let index = 0; index < events.length; index += 1) {
      yield events[index];
    }
  }
  return {
    [Symbol.asyncIterator]() {
      return replay();
    },
  };
}
/**
 * Builds a SessionEntry fixture with a fixed id and transcript file name and a fresh
 * `updatedAt`; any field supplied in `overrides` replaces the default.
 */
function createSessionEntry(overrides: Partial<SessionEntry> = {}): SessionEntry {
  const defaults: SessionEntry = {
    sessionId: "session-1",
    sessionFile: "session-1.jsonl",
    updatedAt: Date.now(),
  };
  return { ...defaults, ...overrides };
}
describe("runBtwSideQuestion", () => {
// Start every case from a clean slate: wipe all module-level mocks, then install the
// defaults (one user message of context, a resolvable model, an API key, a free
// session lock, no active run) that individual tests override as needed.
beforeEach(() => {
  const allMocks = [
    streamSimpleMock,
    appendCustomEntryMock,
    buildSessionContextMock,
    getLeafEntryMock,
    branchMock,
    resetLeafMock,
    ensureOpenClawModelsJsonMock,
    discoverAuthStorageMock,
    discoverModelsMock,
    resolveModelWithRegistryMock,
    getApiKeyForModelMock,
    requireApiKeyMock,
    acquireSessionWriteLockMock,
    resolveSessionAuthProfileOverrideMock,
    getActiveEmbeddedRunSnapshotMock,
    waitForEmbeddedPiRunEndMock,
    diagWarnMock,
    diagDebugMock,
  ];
  for (const mock of allMocks) {
    mock.mockReset();
  }

  // Session transcript defaults.
  buildSessionContextMock.mockReturnValue({
    messages: [{ role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }],
  });
  getLeafEntryMock.mockReturnValue(null);

  // Model and credential defaults.
  resolveModelWithRegistryMock.mockReturnValue({
    provider: "anthropic",
    id: "claude-sonnet-4-5",
    api: "anthropic-messages",
  });
  getApiKeyForModelMock.mockResolvedValue({ apiKey: "secret", mode: "api-key", source: "test" });
  requireApiKeyMock.mockReturnValue("secret");

  // Persistence and run-state defaults.
  acquireSessionWriteLockMock.mockResolvedValue({
    release: vi.fn().mockResolvedValue(undefined),
  });
  resolveSessionAuthProfileOverrideMock.mockResolvedValue("profile-1");
  getActiveEmbeddedRunSnapshotMock.mockReturnValue(undefined);
  waitForEmbeddedPiRunEndMock.mockResolvedValue(true);
});
// Block-streaming path: with an onBlockReply callback and chunking config supplied, the
// answer is delivered through the callback (tagged with the btw question), the function
// resolves to undefined, and the Q/A pair is eventually appended as a custom entry.
it("streams blocks and persists a non-context custom entry", async () => {
const onBlockReply = vi.fn().mockResolvedValue(undefined);
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "text_delta",
delta: "Side answer.",
partial: {
role: "assistant",
content: [],
provider: "anthropic",
model: "claude-sonnet-4-5",
},
},
{
type: "text_end",
content: "Side answer.",
contentIndex: 0,
partial: {
role: "assistant",
content: [],
provider: "anthropic",
model: "claude-sonnet-4-5",
},
},
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "Side answer." }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What changed?",
sessionEntry: createSessionEntry(),
sessionStore: {},
sessionKey: "agent:main:main",
storePath: "/tmp/sessions.json",
resolvedThinkLevel: "low",
resolvedReasoningLevel: "off",
blockReplyChunking: {
minChars: 1,
maxChars: 200,
breakPreference: "paragraph",
},
resolvedBlockStreamingBreak: "text_end",
opts: { onBlockReply },
isNewSession: false,
});
expect(result).toBeUndefined();
expect(onBlockReply).toHaveBeenCalledWith({
text: "Side answer.",
btw: { question: "What changed?" },
});
// Persistence is polled with vi.waitFor rather than asserted immediately.
await vi.waitFor(() => {
expect(appendCustomEntryMock).toHaveBeenCalledWith(
BTW_CUSTOM_TYPE,
expect.objectContaining({
question: "What changed?",
answer: "Side answer.",
provider: "anthropic",
model: "claude-sonnet-4-5",
}),
);
});
});
// Non-streaming path: with no onBlockReply callback and no chunking config, the answer
// text from the "done" event is returned as the final reply payload.
it("returns a final payload when block streaming is unavailable", async () => {
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "Final answer." }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What changed?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(result).toEqual({ text: "Final answer." });
});
// With an empty session context and no active run snapshot (the beforeEach default),
// the call must reject with "No active session context.".
it("fails when the current branch has no messages", async () => {
buildSessionContextMock.mockReturnValue({ messages: [] });
streamSimpleMock.mockReturnValue(makeAsyncEvents([]));
await expect(
runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What changed?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
}),
).rejects.toThrow("No active session context.");
});
// When an active run snapshot carries messages, those messages supply the context even
// though the persisted session context is empty. The side question is appended as a
// final user message wrapped in <btw_side_question> tags.
it("uses active-run snapshot messages for BTW context while the main run is in flight", async () => {
buildSessionContextMock.mockReturnValue({ messages: [] });
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
transcriptLeafId: "assistant-1",
messages: [
{
role: "user",
content: [
{ type: "text", text: "write some things then wait 30 seconds and write more" },
],
timestamp: 1,
},
],
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(result).toEqual({ text: "323" });
expect(streamSimpleMock).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({
systemPrompt: expect.stringContaining("ephemeral /btw side question"),
messages: expect.arrayContaining([
expect.objectContaining({ role: "user" }),
expect.objectContaining({
role: "user",
content: [
{
type: "text",
text: expect.stringContaining(
"<btw_side_question>\nWhat is 17 * 19?\n</btw_side_question>",
),
},
],
}),
]),
}),
expect.anything(),
);
});
// With no transcript messages anywhere but an in-flight prompt on the snapshot, the call
// still succeeds: the only message sent is the side-question turn, which embeds the
// in-flight prompt inside <in_flight_main_task> tags.
it("uses the in-flight prompt as background only when there is no prior transcript context", async () => {
buildSessionContextMock.mockReturnValue({ messages: [] });
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
transcriptLeafId: null,
messages: [],
inFlightPrompt: "build me a tic-tac-toe game in brainfuck",
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "You're building a tic-tac-toe game in Brainfuck." }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "what are we doing?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(result).toEqual({ text: "You're building a tic-tac-toe game in Brainfuck." });
expect(streamSimpleMock).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({
messages: [
expect.objectContaining({
role: "user",
content: [
{
type: "text",
text: expect.stringContaining(
"<in_flight_main_task>\nbuild me a tic-tac-toe game in brainfuck\n</in_flight_main_task>",
),
},
],
}),
],
}),
expect.anything(),
);
});
// Checks the guard-rail wording: the system prompt and the side-question user turn both
// tell the model not to pick up any unfinished task from the conversation.
it("wraps the side question so the model does not treat it as a main-task continuation", async () => {
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "About 93 million miles." }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "what is the distance to the sun?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
// The second streamSimple argument is the request context (system prompt + messages).
const [, context] = streamSimpleMock.mock.calls[0] ?? [];
expect(context).toMatchObject({
systemPrompt: expect.stringContaining(
"Do not continue, resume, or complete any unfinished task",
),
});
expect(context).toMatchObject({
messages: expect.arrayContaining([
expect.objectContaining({
role: "user",
content: [
{
type: "text",
text: expect.stringContaining(
"Ignore any unfinished task in the conversation while answering it.",
),
},
],
}),
]),
});
});
// No active run, and the transcript leaf is a user message with a parent: the session
// manager must branch to that parent (not reset the leaf) before the context is built.
it("branches away from an unresolved trailing user turn before building BTW context", async () => {
getLeafEntryMock.mockReturnValue({
type: "message",
parentId: "assistant-1",
message: { role: "user" },
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(branchMock).toHaveBeenCalledWith("assistant-1");
expect(resetLeafMock).not.toHaveBeenCalled();
expect(buildSessionContextMock).toHaveBeenCalledTimes(1);
expect(result).toEqual({ text: "323" });
});
// Active run snapshot with a leaf id but no messages: the session manager branches to
// the snapshot's leaf and the transcript's own leaf entry is never consulted.
it("branches to the active run snapshot leaf when the session is busy", async () => {
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
transcriptLeafId: "assistant-seed",
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(branchMock).toHaveBeenCalledWith("assistant-seed");
expect(getLeafEntryMock).not.toHaveBeenCalled();
expect(result).toEqual({ text: "323" });
});
// If branching to the snapshot leaf throws, the run must reset the leaf, still answer,
// and record the failure through the diagnostic logger's debug channel.
it("falls back when the active run snapshot leaf no longer exists", async () => {
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
transcriptLeafId: "assistant-gone",
});
branchMock.mockImplementationOnce(() => {
throw new Error("Entry 3235c7c4 not found");
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(branchMock).toHaveBeenCalledWith("assistant-gone");
expect(resetLeafMock).toHaveBeenCalled();
expect(result).toEqual({ text: "323" });
expect(diagDebugMock).toHaveBeenCalledWith(
expect.stringContaining("btw snapshot leaf unavailable: sessionId=session-1"),
);
});
// The first lock acquisition fails with a "session file locked" error and the second
// succeeds. The answer is returned regardless; persistence is retried after waiting for
// the embedded run to end (30000 ms wait argument).
it("returns the BTW answer and retries transcript persistence after a session lock", async () => {
acquireSessionWriteLockMock
.mockRejectedValueOnce(
new Error("session file locked (timeout 250ms): pid=123 /tmp/session.lock"),
)
.mockResolvedValueOnce({
release: vi.fn().mockResolvedValue(undefined),
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(result).toEqual({ text: "323" });
await vi.waitFor(() => {
expect(waitForEmbeddedPiRunEndMock).toHaveBeenCalledWith("session-1", 30000);
expect(appendCustomEntryMock).toHaveBeenCalledWith(
BTW_CUSTOM_TYPE,
expect.objectContaining({
question: "What is 17 * 19?",
answer: "323",
}),
);
});
});
// Both lock acquisitions fail. The answer is still returned and the deferred failure is
// reported as a diagnostic warning instead of surfacing to the caller.
it("logs deferred persistence failures through the diagnostic logger", async () => {
acquireSessionWriteLockMock
.mockRejectedValueOnce(
new Error("session file locked (timeout 250ms): pid=123 /tmp/session.lock"),
)
.mockRejectedValueOnce(
new Error("session file locked (timeout 10000ms): pid=123 /tmp/session.lock"),
);
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
const result = await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
expect(result).toEqual({ text: "323" });
await vi.waitFor(() => {
expect(diagWarnMock).toHaveBeenCalledWith(
expect.stringContaining("btw transcript persistence skipped: sessionId=session-1"),
);
});
});
// Snapshot messages include a toolResult turn between a user and an assistant turn. The
// request sent to the model must contain only user/assistant/user (the last being the
// side question) and no message with role "toolResult".
it("excludes tool results from BTW context to avoid replaying raw tool output", async () => {
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
transcriptLeafId: "assistant-1",
messages: [
{
role: "user",
content: [{ type: "text", text: "seed" }],
timestamp: 1,
},
{
role: "toolResult",
content: [{ type: "text", text: "sensitive tool output" }],
details: { raw: "secret" },
timestamp: 2,
},
{
role: "assistant",
content: [{ type: "text", text: "done" }],
timestamp: 3,
},
],
});
streamSimpleMock.mockReturnValue(
makeAsyncEvents([
{
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: "323" }],
provider: "anthropic",
api: "anthropic-messages",
model: "claude-sonnet-4-5",
stopReason: "stop",
usage: {
input: 1,
output: 2,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 3,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
},
]),
);
await runBtwSideQuestion({
cfg: {} as never,
agentDir: "/tmp/agent",
provider: "anthropic",
model: "claude-sonnet-4-5",
question: "What is 17 * 19?",
sessionEntry: createSessionEntry(),
resolvedReasoningLevel: "off",
opts: {},
isNewSession: false,
});
const [, context] = streamSimpleMock.mock.calls[0] ?? [];
expect(context).toMatchObject({
messages: [
expect.objectContaining({ role: "user" }),
expect.objectContaining({ role: "assistant" }),
expect.objectContaining({ role: "user" }),
],
});
expect((context as { messages?: Array<{ role?: string }> }).messages).not.toEqual(
expect.arrayContaining([expect.objectContaining({ role: "toolResult" })]),
);
});
});

513
src/agents/btw.ts Normal file
View File

@@ -0,0 +1,513 @@
import {
streamSimple,
type Api,
type AssistantMessageEvent,
type ThinkingLevel as SimpleThinkingLevel,
type Message,
type Model,
} from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import type { ReasoningLevel, ThinkLevel } from "../auto-reply/thinking.js";
import type { GetReplyOptions, ReplyPayload } from "../auto-reply/types.js";
import type { OpenClawConfig } from "../config/config.js";
import {
resolveSessionFilePath,
resolveSessionFilePathOptions,
type SessionEntry,
} from "../config/sessions.js";
import { diagnosticLogger as diag } from "../logging/diagnostic.js";
import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
import { getApiKeyForModel, requireApiKey } from "./model-auth.js";
import { ensureOpenClawModelsJson } from "./models-config.js";
import { EmbeddedBlockChunker, type BlockReplyChunking } from "./pi-embedded-block-chunker.js";
import { resolveModelWithRegistry } from "./pi-embedded-runner/model.js";
import {
getActiveEmbeddedRunSnapshot,
waitForEmbeddedPiRunEnd,
} from "./pi-embedded-runner/runs.js";
import { mapThinkingLevel } from "./pi-embedded-runner/utils.js";
import { discoverAuthStorage, discoverModels } from "./pi-model-discovery.js";
import { stripToolResultDetails } from "./session-transcript-repair.js";
import { acquireSessionWriteLock } from "./session-write-lock.js";
/** Custom transcript entry type under which /btw question/answer records are appended. */
const BTW_CUSTOM_TYPE = "openclaw:btw";
/** Session write-lock timeout (ms) for the first persistence attempt. */
const BTW_PERSIST_TIMEOUT_MS = 250;
/** Timeout (ms) passed to waitForEmbeddedPiRunEnd before the deferred persistence retry. */
const BTW_PERSIST_RETRY_WAIT_MS = 30_000;
/** Session write-lock timeout (ms) for the deferred persistence retry. */
const BTW_PERSIST_RETRY_LOCK_MS = 10_000;
/**
 * Structural view of the session manager surface this module uses for reading context.
 * The leaf-navigation methods are optional, so callers guard each use with `?.` or an
 * explicit presence check; only `buildSessionContext` is required.
 */
type SessionManagerLike = {
// Current leaf entry of the transcript, or null when there is none.
getLeafEntry?: () => {
id?: string;
type?: string;
parentId?: string | null;
message?: { role?: string };
} | null;
// Moves the working leaf to the entry with the given id.
branch?: (parentId: string) => void;
resetLeaf?: () => void;
// `messages` is treated as untyped here and filtered before use.
buildSessionContext: () => { messages?: unknown[] };
};
/**
 * Payload stored with each BTW custom transcript entry: the side question, its answer,
 * and the model/auth settings that were in effect.
 */
type BtwCustomEntryData = {
timestamp: number;
question: string;
answer: string;
provider: string;
model: string;
// "off" is allowed in addition to the ThinkLevel values.
thinkingLevel: ThinkLevel | "off";
reasoningLevel: ReasoningLevel;
sessionKey?: string;
authProfileId?: string;
authProfileIdSource?: "auto" | "user";
// Left untyped; presumably the provider's usage block — TODO confirm against the writer.
usage?: unknown;
};
/**
 * Appends one BTW record to the session transcript while holding the session write lock.
 *
 * @param params.sessionFile Transcript file to lock and append to.
 * @param params.timeoutMs How long to wait for the write lock.
 * @param params.entry Record stored under the BTW custom entry type.
 * @throws Whatever lock acquisition or the append throws; the lock is released either way
 *   once it has been acquired.
 */
async function appendBtwCustomEntry(params: {
  sessionFile: string;
  timeoutMs: number;
  entry: BtwCustomEntryData;
}) {
  const { sessionFile, timeoutMs, entry } = params;
  const writeLock = await acquireSessionWriteLock({
    sessionFile,
    timeoutMs,
    allowReentrant: false,
  });
  try {
    SessionManager.open(sessionFile).appendCustomEntry(BTW_CUSTOM_TYPE, entry);
  } finally {
    await writeLock.release();
  }
}
/**
 * Reports whether an error looks like a session write-lock failure, judged by the
 * phrase "session file locked" appearing in its message (or in its string form for
 * non-Error values).
 */
function isSessionLockError(error: unknown): boolean {
  let text: string;
  if (error instanceof Error) {
    text = error.message;
  } else {
    text = String(error);
  }
  return text.indexOf("session file locked") !== -1;
}
/**
 * Starts a fire-and-forget retry of the transcript append: waits for the embedded run on
 * this session to end (bounded by BTW_PERSIST_RETRY_WAIT_MS), then appends with the longer
 * retry lock timeout. Returns immediately; any failure is logged as a diagnostic warning
 * and never propagates to the caller.
 */
function deferBtwCustomEntryPersist(params: {
  sessionId: string;
  sessionFile: string;
  entry: BtwCustomEntryData;
}) {
  const { sessionId, sessionFile, entry } = params;
  const retryAfterRunEnds = async (): Promise<void> => {
    try {
      await waitForEmbeddedPiRunEnd(sessionId, BTW_PERSIST_RETRY_WAIT_MS);
      await appendBtwCustomEntry({
        sessionFile,
        timeoutMs: BTW_PERSIST_RETRY_LOCK_MS,
        entry,
      });
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      diag.warn(`btw transcript persistence skipped: sessionId=${sessionId} err=${reason}`);
    }
  };
  void retryAfterRunEnds();
}
/**
 * Tries to append the BTW record right away with the short lock timeout.
 * If that fails because the session file is locked, hands the record to the deferred
 * retry and resolves normally; every other error is rethrown to the caller.
 */
async function persistBtwCustomEntry(params: {
  sessionId: string;
  sessionFile: string;
  entry: BtwCustomEntryData;
}) {
  const { sessionId, sessionFile, entry } = params;
  try {
    await appendBtwCustomEntry({
      sessionFile,
      timeoutMs: BTW_PERSIST_TIMEOUT_MS,
      entry,
    });
  } catch (error) {
    if (isSessionLockError(error)) {
      // Lock contention: retry in the background instead of failing.
      deferBtwCustomEntryPersist({ sessionId, sessionFile, entry });
      return;
    }
    throw error;
  }
}
/**
 * Kicks off transcript persistence without awaiting it. A rejection from the persistence
 * attempt is downgraded to a diagnostic warning so it cannot become an unhandled rejection.
 */
function persistBtwCustomEntryInBackground(params: {
  sessionId: string;
  sessionFile: string;
  entry: BtwCustomEntryData;
}) {
  const logSkipped = (error: unknown): void => {
    const reason = error instanceof Error ? error.message : String(error);
    diag.warn(`btw transcript persistence skipped: sessionId=${params.sessionId} err=${reason}`);
  };
  void persistBtwCustomEntry(params).catch(logSkipped);
}
/**
 * Concatenates the text of every `type: "text"` part, in order, with no separator.
 *
 * Parts of other types are ignored, and so are text parts whose `text` is not actually a
 * string. The previous type predicate asserted `text: string` after checking only `type`;
 * this version verifies it at runtime so the narrowing is truthful.
 *
 * @param content Message content parts.
 * @returns The joined text, or "" when there are no usable text parts.
 */
function collectTextContent(content: Array<{ type?: string; text?: string }>): string {
  let combined = "";
  for (const part of content) {
    if (part.type === "text" && typeof part.text === "string") {
      combined += part.text;
    }
  }
  return combined;
}
/**
 * Concatenates the `thinking` string of every `type: "thinking"` part, in order, with no
 * separator.
 *
 * Parts of other types are ignored, and so are thinking parts whose `thinking` is not
 * actually a string. The previous type predicate asserted `thinking: string` after
 * checking only `type`; this version verifies it at runtime so the narrowing is truthful.
 *
 * @param content Message content parts.
 * @returns The joined reasoning text, or "" when there are no usable thinking parts.
 */
function collectThinkingContent(content: Array<{ type?: string; thinking?: string }>): string {
  let combined = "";
  for (const part of content) {
    if (part.type === "thinking" && typeof part.thinking === "string") {
      combined += part.thinking;
    }
  }
  return combined;
}
/**
 * Returns the fixed system prompt for a /btw side question: one directive per line,
 * telling the model to use the conversation only as background and to answer nothing but
 * the side question.
 */
function buildBtwSystemPrompt(): string {
  const directives: readonly string[] = [
    "You are answering an ephemeral /btw side question about the current conversation.",
    "Use the conversation only as background context.",
    "Answer only the side question in the last user message.",
    "Do not continue, resume, or complete any unfinished task from the conversation.",
    "Do not emit tool calls, pseudo-tool calls, shell commands, file writes, patches, or code unless the side question explicitly asks for them.",
    "Do not say you will continue the main task after answering.",
    "If the question can be answered briefly, answer briefly.",
  ];
  let prompt = "";
  directives.forEach((directive, position) => {
    prompt += position === 0 ? directive : "\n" + directive;
  });
  return prompt;
}
/**
 * Builds the user-turn text that carries the side question.
 *
 * Layout, newline-separated: two fixed instruction lines; then, only when `inFlightPrompt`
 * has non-whitespace content, a blank line and the trimmed prompt wrapped in
 * `<in_flight_main_task>` tags with a reminder not to work on it; finally a blank line and
 * the trimmed question wrapped in `<btw_side_question>` tags.
 *
 * @param question The side question; surrounding whitespace is trimmed.
 * @param inFlightPrompt Optional main-task request included as background only.
 */
function buildBtwQuestionPrompt(question: string, inFlightPrompt?: string): string {
  const background = inFlightPrompt?.trim();
  const backgroundSection = background
    ? [
        "",
        "Current in-flight main task request for background context only:",
        "<in_flight_main_task>",
        background,
        "</in_flight_main_task>",
        "Do not continue or complete that task while answering the side question.",
      ]
    : [];
  return [
    "Answer this side question only.",
    "Ignore any unfinished task in the conversation while answering it.",
    ...backgroundSection,
    "",
    "<btw_side_question>",
    question.trim(),
    "</btw_side_question>",
  ].join("\n");
}
/**
 * Reduces raw transcript messages to the ones usable as side-question context: only
 * object entries whose `role` is "user" or "assistant" are kept (tool results and anything
 * else are dropped), and the survivors are passed through `stripToolResultDetails`.
 *
 * NOTE(review): the guard checks `role` only, so the `Message` narrowing is an assumption
 * about the shape of what the session stores — confirm against the transcript writer.
 */
function toSimpleContextMessages(messages: unknown[]): Message[] {
  const isConversationTurn = (candidate: unknown): candidate is Message => {
    if (typeof candidate !== "object" || candidate === null) {
      return false;
    }
    const { role } = candidate as { role?: unknown };
    return role === "user" || role === "assistant";
  };
  const conversationTurns = messages.filter(isConversationTurn);
  return stripToolResultDetails(
    conversationTurns as Parameters<typeof stripToolResultDetails>[0],
  ) as Message[];
}
/**
 * Translates the session think level into the streaming API's thinking level.
 * A missing level or "off" yields `undefined`; every other level goes through
 * `mapThinkingLevel`.
 */
function resolveSimpleThinkingLevel(level?: ThinkLevel): SimpleThinkingLevel | undefined {
  const thinkingEnabled = Boolean(level) && level !== "off";
  return thinkingEnabled ? (mapThinkingLevel(level) as SimpleThinkingLevel) : undefined;
}
/**
 * Resolves the transcript file path for a session, or `undefined` when resolution throws
 * (the failure is logged at debug level rather than propagated).
 *
 * The agent id is taken from the second ":"-separated segment of `sessionKey`.
 * NOTE(review): this presumes keys shaped like "agent:<agentId>:..."; a key in another
 * format would yield a wrong agent id — confirm against the session-key helpers.
 */
function resolveSessionTranscriptPath(params: {
  sessionId: string;
  sessionEntry?: SessionEntry;
  sessionKey?: string;
  storePath?: string;
}): string | undefined {
  const { sessionId, sessionEntry, sessionKey, storePath } = params;
  try {
    const [, agentId] = sessionKey?.split(":") ?? [];
    const pathOpts = resolveSessionFilePathOptions({ agentId, storePath });
    return resolveSessionFilePath(sessionId, sessionEntry, pathOpts);
  } catch (error) {
    diag.debug(`resolveSessionTranscriptPath failed: sessionId=${sessionId} err=${String(error)}`);
    return undefined;
  }
}
/**
 * Resolves the model to use for the side question together with the session's auth
 * profile override.
 *
 * Steps, in order: ensure the models JSON exists for the agent dir, discover auth storage
 * and the model registry, look the model up by provider/id, then resolve the session's
 * auth-profile override.
 *
 * @returns The resolved model, the auth profile id (if any), and the override source
 *   recorded on the session entry.
 * @throws Error "Unknown model: <provider>/<model>" when the registry lookup finds nothing.
 */
async function resolveRuntimeModel(params: {
  cfg: OpenClawConfig;
  provider: string;
  model: string;
  agentDir: string;
  sessionEntry?: SessionEntry;
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
  isNewSession: boolean;
}): Promise<{
  model: Model<Api>;
  authProfileId?: string;
  authProfileIdSource?: "auto" | "user";
}> {
  const {
    cfg,
    provider,
    model: modelId,
    agentDir,
    sessionEntry,
    sessionStore,
    sessionKey,
    storePath,
    isNewSession,
  } = params;

  await ensureOpenClawModelsJson(cfg, agentDir);
  const modelRegistry = discoverModels(discoverAuthStorage(agentDir), agentDir);
  const resolvedModel = resolveModelWithRegistry({ provider, modelId, modelRegistry, cfg });
  if (!resolvedModel) {
    throw new Error(`Unknown model: ${provider}/${modelId}`);
  }

  const authProfileId = await resolveSessionAuthProfileOverride({
    cfg,
    provider,
    agentDir,
    sessionEntry,
    sessionStore,
    sessionKey,
    storePath,
    isNewSession,
  });

  return {
    model: resolvedModel,
    authProfileId,
    authProfileIdSource: sessionEntry?.authProfileOverrideSource,
  };
}
/** Inputs for runBtwSideQuestion. */
type RunBtwSideQuestionParams = {
cfg: OpenClawConfig;
agentDir: string;
// Provider and model id used to look up the runtime model.
provider: string;
model: string;
// The side question text; it is trimmed before being placed in the prompt.
question: string;
// Must carry a non-blank sessionId, otherwise the run is rejected.
sessionEntry: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
// Also used to derive the agent id when resolving the transcript path.
sessionKey?: string;
storePath?: string;
// Missing or "off" means no thinking level is requested from the model.
resolvedThinkLevel?: ThinkLevel;
// Reasoning stream callbacks fire only when this is not "off".
resolvedReasoningLevel: ReasoningLevel;
// Block streaming needs both this and opts.onBlockReply.
blockReplyChunking?: BlockReplyChunking;
resolvedBlockStreamingBreak?: "text_end" | "message_end";
opts?: GetReplyOptions;
isNewSession: boolean;
};
export async function runBtwSideQuestion(
params: RunBtwSideQuestionParams,
): Promise<ReplyPayload | undefined> {
const sessionId = params.sessionEntry.sessionId?.trim();
if (!sessionId) {
throw new Error("No active session context.");
}
const sessionFile = resolveSessionTranscriptPath({
sessionId,
sessionEntry: params.sessionEntry,
sessionKey: params.sessionKey,
storePath: params.storePath,
});
if (!sessionFile) {
throw new Error("No active session transcript.");
}
const sessionManager = SessionManager.open(sessionFile) as SessionManagerLike;
const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId);
let messages: Message[] = [];
let inFlightPrompt: string | undefined;
if (Array.isArray(activeRunSnapshot?.messages) && activeRunSnapshot.messages.length > 0) {
messages = toSimpleContextMessages(activeRunSnapshot.messages);
inFlightPrompt = activeRunSnapshot.inFlightPrompt;
} else if (activeRunSnapshot) {
inFlightPrompt = activeRunSnapshot.inFlightPrompt;
if (activeRunSnapshot.transcriptLeafId && sessionManager.branch) {
try {
sessionManager.branch(activeRunSnapshot.transcriptLeafId);
} catch (error) {
diag.debug(
`btw snapshot leaf unavailable: sessionId=${sessionId} leaf=${activeRunSnapshot.transcriptLeafId} err=${String(error)}`,
);
sessionManager.resetLeaf?.();
}
} else {
sessionManager.resetLeaf?.();
}
} else {
const leafEntry = sessionManager.getLeafEntry?.();
if (leafEntry?.type === "message" && leafEntry.message?.role === "user") {
if (leafEntry.parentId && sessionManager.branch) {
sessionManager.branch(leafEntry.parentId);
} else {
sessionManager.resetLeaf?.();
}
}
}
if (messages.length === 0) {
const sessionContext = sessionManager.buildSessionContext();
messages = toSimpleContextMessages(
Array.isArray(sessionContext.messages) ? sessionContext.messages : [],
);
}
if (messages.length === 0 && !inFlightPrompt?.trim()) {
throw new Error("No active session context.");
}
const { model, authProfileId, authProfileIdSource } = await resolveRuntimeModel({
cfg: params.cfg,
provider: params.provider,
model: params.model,
agentDir: params.agentDir,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
sessionKey: params.sessionKey,
storePath: params.storePath,
isNewSession: params.isNewSession,
});
const apiKeyInfo = await getApiKeyForModel({
model,
cfg: params.cfg,
profileId: authProfileId,
agentDir: params.agentDir,
});
const apiKey = requireApiKey(apiKeyInfo, model.provider);
const chunker =
params.opts?.onBlockReply && params.blockReplyChunking
? new EmbeddedBlockChunker(params.blockReplyChunking)
: undefined;
let emittedBlocks = 0;
let blockEmitChain: Promise<void> = Promise.resolve();
let answerText = "";
let reasoningText = "";
let assistantStarted = false;
let sawTextEvent = false;
/**
 * Queues one streamed answer chunk for delivery through
 * `params.opts.onBlockReply`.
 *
 * Deliveries are appended to `blockEmitChain`, so chunks reach the callback
 * one at a time and in the order they were emitted. Whitespace-only chunks,
 * and calls made while no `onBlockReply` callback is configured, are ignored
 * and do not count towards `emittedBlocks`.
 *
 * The returned promise settles once this chunk's delivery has settled and
 * never rejects. The chunker `emit` callbacks invoke this function as
 * `void emitBlockChunk(chunk)`, so a rejection here would have no handler and
 * would surface as an unhandled promise rejection. A delivery failure is not
 * lost: `blockEmitChain` itself stays rejected, and the enclosing function
 * awaits it once the stream has been consumed, which is where the error is
 * reported.
 *
 * @param text - Chunk text; forwarded untrimmed to the callback.
 */
const emitBlockChunk = async (text: string): Promise<void> => {
  const trimmed = text.trim();
  if (!trimmed || !params.opts?.onBlockReply) {
    return;
  }
  emittedBlocks += 1;
  blockEmitChain = blockEmitChain.then(async () => {
    await params.opts?.onBlockReply?.({
      text,
      btw: { question: params.question },
    });
  });
  // Wait for this delivery to settle without re-throwing from a
  // fire-and-forget call site; the rejection remains observable on
  // `blockEmitChain` for the awaiting caller.
  await blockEmitChain.catch(() => undefined);
};
const stream = streamSimple(
model,
{
systemPrompt: buildBtwSystemPrompt(),
messages: [
...messages,
{
role: "user",
content: [
{
type: "text",
text: buildBtwQuestionPrompt(params.question, inFlightPrompt),
},
],
timestamp: Date.now(),
},
],
},
{
apiKey,
reasoning: resolveSimpleThinkingLevel(params.resolvedThinkLevel),
signal: params.opts?.abortSignal,
},
);
let finalEvent:
| Extract<AssistantMessageEvent, { type: "done" }>
| Extract<AssistantMessageEvent, { type: "error" }>
| undefined;
for await (const event of stream) {
finalEvent = event.type === "done" || event.type === "error" ? event : finalEvent;
if (!assistantStarted && (event.type === "text_start" || event.type === "start")) {
assistantStarted = true;
await params.opts?.onAssistantMessageStart?.();
}
if (event.type === "text_delta") {
sawTextEvent = true;
answerText += event.delta;
chunker?.append(event.delta);
if (chunker && params.resolvedBlockStreamingBreak === "text_end") {
chunker.drain({ force: false, emit: (chunk) => void emitBlockChunk(chunk) });
}
continue;
}
if (event.type === "text_end" && chunker && params.resolvedBlockStreamingBreak === "text_end") {
chunker.drain({ force: true, emit: (chunk) => void emitBlockChunk(chunk) });
continue;
}
if (event.type === "thinking_delta") {
reasoningText += event.delta;
if (params.resolvedReasoningLevel !== "off") {
await params.opts?.onReasoningStream?.({ text: reasoningText, isReasoning: true });
}
continue;
}
if (event.type === "thinking_end" && params.resolvedReasoningLevel !== "off") {
await params.opts?.onReasoningEnd?.();
}
}
if (chunker && params.resolvedBlockStreamingBreak !== "text_end" && chunker.hasBuffered()) {
chunker.drain({ force: true, emit: (chunk) => void emitBlockChunk(chunk) });
}
await blockEmitChain;
if (finalEvent?.type === "error") {
const message = collectTextContent(finalEvent.error.content);
throw new Error(message || finalEvent.error.errorMessage || "BTW failed.");
}
const finalMessage = finalEvent?.type === "done" ? finalEvent.message : undefined;
if (finalMessage) {
if (!sawTextEvent) {
answerText = collectTextContent(finalMessage.content);
}
if (!reasoningText) {
reasoningText = collectThinkingContent(finalMessage.content);
}
}
const answer = answerText.trim();
if (!answer) {
throw new Error("No BTW response generated.");
}
const customEntry = {
timestamp: Date.now(),
question: params.question,
answer,
provider: model.provider,
model: model.id,
thinkingLevel: params.resolvedThinkLevel ?? "off",
reasoningLevel: params.resolvedReasoningLevel,
sessionKey: params.sessionKey,
authProfileId,
authProfileIdSource,
usage: finalMessage?.usage,
} satisfies BtwCustomEntryData;
persistBtwCustomEntryInBackground({
sessionId,
sessionFile,
entry: customEntry,
});
if (emittedBlocks > 0) {
return undefined;
}
return { text: answer };
}
export { BTW_CUSTOM_TYPE };

View File

@@ -111,6 +111,7 @@ import {
clearActiveEmbeddedRun,
type EmbeddedPiQueueHandle,
setActiveEmbeddedRun,
updateActiveEmbeddedRunSnapshot,
} from "../runs.js";
import { buildEmbeddedSandboxInfo } from "../sandbox-info.js";
import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js";
@@ -830,6 +831,7 @@ function extractBalancedJsonPrefix(raw: string): string | null {
const MAX_TOOLCALL_REPAIR_BUFFER_CHARS = 64_000;
const MAX_TOOLCALL_REPAIR_TRAILING_CHARS = 3;
const TOOLCALL_REPAIR_ALLOWED_TRAILING_RE = /^[^\s{}[\]":,\\]{1,3}$/;
// Upper bound on how many of the most recent session messages are copied into
// the active-run snapshot (the tail of the message list is taken with
// `slice(-MAX_BTW_SNAPSHOT_MESSAGES)`).
const MAX_BTW_SNAPSHOT_MESSAGES = 100;
function shouldAttemptMalformedToolCallRepair(partialJson: string, delta: string): boolean {
if (/[}\]]/.test(delta)) {
@@ -2376,6 +2378,8 @@ export async function runEmbeddedAttempt(
`runId=${params.runId} sessionId=${params.sessionId}`,
);
}
const transcriptLeafId =
(sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null;
try {
// Idempotent cleanup for legacy sessions with persisted image payloads.
@@ -2454,6 +2458,13 @@ export async function runEmbeddedAttempt(
});
}
const btwSnapshotMessages = activeSession.messages.slice(-MAX_BTW_SNAPSHOT_MESSAGES);
updateActiveEmbeddedRunSnapshot(params.sessionId, {
transcriptLeafId,
messages: btwSnapshotMessages,
inFlightPrompt: effectivePrompt,
});
// Only pass images option if there are actually images to pass
// This avoids potential issues with models that don't expect the images parameter
if (imageResult.images.length > 0) {

View File

@@ -4,7 +4,9 @@ import {
__testing,
abortEmbeddedPiRun,
clearActiveEmbeddedRun,
getActiveEmbeddedRunSnapshot,
setActiveEmbeddedRun,
updateActiveEmbeddedRunSnapshot,
waitForActiveEmbeddedRuns,
} from "./runs.js";
@@ -137,4 +139,28 @@ describe("pi-embedded runner run registry", () => {
runsB.__testing.resetActiveEmbeddedRuns();
}
});
it("tracks and clears per-session transcript snapshots for active runs", () => {
  const sessionId = "session-snapshot";
  // Build a fresh object on every call so the expectation never shares a
  // reference with the snapshot held by the registry.
  const buildSnapshot = () => ({
    transcriptLeafId: "assistant-1",
    messages: [{ role: "user", content: [{ type: "text", text: "hello" }], timestamp: 1 }],
    inFlightPrompt: "keep going",
  });
  const runHandle = {
    queueMessage: async () => {},
    isStreaming: () => true,
    isCompacting: () => false,
    abort: vi.fn(),
  };

  // A snapshot is only stored for a session that has a registered run.
  setActiveEmbeddedRun(sessionId, runHandle);
  updateActiveEmbeddedRunSnapshot(sessionId, buildSnapshot());
  expect(getActiveEmbeddedRunSnapshot(sessionId)).toEqual(buildSnapshot());

  // Clearing the run must drop its snapshot as well.
  clearActiveEmbeddedRun(sessionId, runHandle);
  expect(getActiveEmbeddedRunSnapshot(sessionId)).toBeUndefined();
});
});

View File

@@ -12,6 +12,12 @@ type EmbeddedPiQueueHandle = {
abort: () => void;
};
/**
 * Point-in-time view of an active embedded run, stored per session id in the
 * run registry while the run is registered.
 */
export type ActiveEmbeddedRunSnapshot = {
/** Id of the transcript leaf entry captured for the run, or `null` when no leaf id was available. */
transcriptLeafId: string | null;
/** Session messages captured for the run; the element shape is deliberately not constrained here. */
messages?: unknown[];
/** Prompt text supplied together with the snapshot (the runner passes its effective prompt). */
inFlightPrompt?: string;
};
type EmbeddedRunWaiter = {
resolve: (ended: boolean) => void;
timer: NodeJS.Timeout;
@@ -25,9 +31,11 @@ const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState");
// Registry state used by the helpers in this module, obtained through
// resolveGlobalSingleton under EMBEDDED_RUN_STATE_KEY.
// NOTE(review): presumably this lets duplicate copies of the module share one
// registry — confirm against resolveGlobalSingleton.
const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
// Queue handles of the currently registered runs, keyed by session id.
activeRuns: new Map<string, EmbeddedPiQueueHandle>(),
// Latest snapshot recorded for each registered run, keyed by session id.
snapshots: new Map<string, ActiveEmbeddedRunSnapshot>(),
waiters: new Map<string, Set<EmbeddedRunWaiter>>(),
}));
// Module-local aliases for the individual registries.
const ACTIVE_EMBEDDED_RUNS = embeddedRunState.activeRuns;
const ACTIVE_EMBEDDED_RUN_SNAPSHOTS = embeddedRunState.snapshots;
const EMBEDDED_RUN_WAITERS = embeddedRunState.waiters;
export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
@@ -135,6 +143,12 @@ export function getActiveEmbeddedRunCount(): number {
return ACTIVE_EMBEDDED_RUNS.size;
}
/**
 * Looks up the snapshot currently stored for a session.
 *
 * @param sessionId - Session whose snapshot is requested.
 * @returns The stored snapshot, or `undefined` when none is stored for that session.
 */
export function getActiveEmbeddedRunSnapshot(
  sessionId: string,
): ActiveEmbeddedRunSnapshot | undefined {
  const storedSnapshot = ACTIVE_EMBEDDED_RUN_SNAPSHOTS.get(sessionId);
  return storedSnapshot;
}
/**
* Wait for active embedded runs to drain.
*
@@ -230,6 +244,16 @@ export function setActiveEmbeddedRun(
}
}
/**
 * Stores (or replaces) the snapshot for a session that has a registered run.
 *
 * The call is a no-op when no run is registered for `sessionId`, so a
 * snapshot is never recorded for a session without an active run.
 *
 * @param sessionId - Session the snapshot belongs to.
 * @param snapshot - Snapshot to store; the object is kept by reference.
 */
export function updateActiveEmbeddedRunSnapshot(
  sessionId: string,
  snapshot: ActiveEmbeddedRunSnapshot,
) {
  const hasRegisteredRun = ACTIVE_EMBEDDED_RUNS.has(sessionId);
  if (hasRegisteredRun) {
    ACTIVE_EMBEDDED_RUN_SNAPSHOTS.set(sessionId, snapshot);
  }
}
export function clearActiveEmbeddedRun(
sessionId: string,
handle: EmbeddedPiQueueHandle,
@@ -237,6 +261,7 @@ export function clearActiveEmbeddedRun(
) {
if (ACTIVE_EMBEDDED_RUNS.get(sessionId) === handle) {
ACTIVE_EMBEDDED_RUNS.delete(sessionId);
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.delete(sessionId);
logSessionStateChange({ sessionId, sessionKey, state: "idle", reason: "run_completed" });
if (!sessionId.startsWith("probe-")) {
diag.debug(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
@@ -257,6 +282,7 @@ export const __testing = {
}
EMBEDDED_RUN_WAITERS.clear();
ACTIVE_EMBEDDED_RUNS.clear();
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.clear();
},
};