From b28ad98a4cc0890f3ef2f867042e953dd96045e0 Mon Sep 17 00:00:00 2001 From: Nimrod Gutman Date: Fri, 13 Mar 2026 22:33:27 +0200 Subject: [PATCH] feat(agent): add /btw side questions --- src/agents/btw.test.ts | 428 ++++++++++++++++++ src/agents/btw.ts | 426 +++++++++++++++++ src/agents/pi-embedded-runner/run/attempt.ts | 5 + src/agents/pi-embedded-runner/runs.test.ts | 22 + src/agents/pi-embedded-runner/runs.ts | 24 + src/auto-reply/commands-registry.data.ts | 8 + src/auto-reply/reply/commands-btw.test.ts | 93 ++++ src/auto-reply/reply/commands-btw.ts | 67 +++ src/auto-reply/reply/commands-core.ts | 2 + src/auto-reply/reply/commands-types.ts | 10 +- .../reply/get-reply-inline-actions.ts | 12 + src/auto-reply/reply/get-reply.ts | 2 + src/plugins/commands.ts | 1 + 13 files changed, 1099 insertions(+), 1 deletion(-) create mode 100644 src/agents/btw.test.ts create mode 100644 src/agents/btw.ts create mode 100644 src/auto-reply/reply/commands-btw.test.ts create mode 100644 src/auto-reply/reply/commands-btw.ts diff --git a/src/agents/btw.test.ts b/src/agents/btw.test.ts new file mode 100644 index 00000000000..76d7e0a8572 --- /dev/null +++ b/src/agents/btw.test.ts @@ -0,0 +1,428 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { SessionEntry } from "../config/sessions.js"; + +const streamSimpleMock = vi.fn(); +const appendCustomEntryMock = vi.fn(); +const buildSessionContextMock = vi.fn(); +const getLeafEntryMock = vi.fn(); +const branchMock = vi.fn(); +const resetLeafMock = vi.fn(); +const ensureOpenClawModelsJsonMock = vi.fn(); +const discoverAuthStorageMock = vi.fn(); +const discoverModelsMock = vi.fn(); +const resolveModelWithRegistryMock = vi.fn(); +const getApiKeyForModelMock = vi.fn(); +const requireApiKeyMock = vi.fn(); +const acquireSessionWriteLockMock = vi.fn(); +const resolveSessionAuthProfileOverrideMock = vi.fn(); +const getActiveEmbeddedRunSnapshotMock = vi.fn(); +const waitForEmbeddedPiRunEndMock = vi.fn(); + +vi.mock("@mariozechner/pi-ai", () => ({ + streamSimple: (...args: unknown[]) => streamSimpleMock(...args), +})); + +vi.mock("@mariozechner/pi-coding-agent", () => ({ + SessionManager: { + open: () => ({ + getLeafEntry: getLeafEntryMock, + branch: branchMock, + resetLeaf: resetLeafMock, + buildSessionContext: buildSessionContextMock, + appendCustomEntry: appendCustomEntryMock, + }), + }, +})); + +vi.mock("./models-config.js", () => ({ + ensureOpenClawModelsJson: (...args: unknown[]) => ensureOpenClawModelsJsonMock(...args), +})); + +vi.mock("./pi-model-discovery.js", () => ({ + discoverAuthStorage: (...args: unknown[]) => discoverAuthStorageMock(...args), + discoverModels: (...args: unknown[]) => discoverModelsMock(...args), +})); + +vi.mock("./pi-embedded-runner/model.js", () => ({ + resolveModelWithRegistry: (...args: unknown[]) => resolveModelWithRegistryMock(...args), +})); + +vi.mock("./model-auth.js", () => ({ + getApiKeyForModel: (...args: unknown[]) => getApiKeyForModelMock(...args), + requireApiKey: (...args: unknown[]) => requireApiKeyMock(...args), +})); + +vi.mock("./session-write-lock.js", () => ({ + acquireSessionWriteLock: (...args: unknown[]) => acquireSessionWriteLockMock(...args), +})); + +vi.mock("./pi-embedded-runner/runs.js", () => ({ + getActiveEmbeddedRunSnapshot: (...args: unknown[]) => getActiveEmbeddedRunSnapshotMock(...args), + waitForEmbeddedPiRunEnd: (...args: unknown[]) => waitForEmbeddedPiRunEndMock(...args), +})); + +vi.mock("./auth-profiles/session-override.js", () => ({ + resolveSessionAuthProfileOverride: (...args: unknown[]) => + resolveSessionAuthProfileOverrideMock(...args), +})); + +const { BTW_CUSTOM_TYPE, runBtwSideQuestion } = await import("./btw.js"); + +function makeAsyncEvents(events: unknown[]) { + return { + async *[Symbol.asyncIterator]() { + for (const event of events) { + yield event; + } + }, + }; +} + +function createSessionEntry(overrides: Partial = {}): SessionEntry { + return { + sessionId: "session-1", + sessionFile: "session-1.jsonl", + updatedAt: Date.now(), + ...overrides, + }; +} + +describe("runBtwSideQuestion", () => { + beforeEach(() => { + streamSimpleMock.mockReset(); + appendCustomEntryMock.mockReset(); + buildSessionContextMock.mockReset(); + getLeafEntryMock.mockReset(); + branchMock.mockReset(); + resetLeafMock.mockReset(); + ensureOpenClawModelsJsonMock.mockReset(); + discoverAuthStorageMock.mockReset(); + discoverModelsMock.mockReset(); + resolveModelWithRegistryMock.mockReset(); + getApiKeyForModelMock.mockReset(); + requireApiKeyMock.mockReset(); + acquireSessionWriteLockMock.mockReset(); + resolveSessionAuthProfileOverrideMock.mockReset(); + getActiveEmbeddedRunSnapshotMock.mockReset(); + waitForEmbeddedPiRunEndMock.mockReset(); + + buildSessionContextMock.mockReturnValue({ + messages: [{ role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }], + }); + getLeafEntryMock.mockReturnValue(null); + resolveModelWithRegistryMock.mockReturnValue({ + provider: "anthropic", + id: "claude-sonnet-4-5", + api: "anthropic-messages", + }); + getApiKeyForModelMock.mockResolvedValue({ apiKey: "secret", mode: "api-key", source: "test" }); + requireApiKeyMock.mockReturnValue("secret"); + acquireSessionWriteLockMock.mockResolvedValue({ + release: vi.fn().mockResolvedValue(undefined), + }); + resolveSessionAuthProfileOverrideMock.mockResolvedValue("profile-1"); + getActiveEmbeddedRunSnapshotMock.mockReturnValue(undefined); + waitForEmbeddedPiRunEndMock.mockResolvedValue(true); + }); + + it("streams blocks and persists a non-context custom entry", async () => { + const onBlockReply = vi.fn().mockResolvedValue(undefined); + streamSimpleMock.mockReturnValue( + makeAsyncEvents([ + { + type: "text_delta", + delta: "Side answer.", + partial: { + role: "assistant", + content: [], + provider: "anthropic", + model: "claude-sonnet-4-5", + }, + }, + { + type: "text_end", + content: "Side answer.", + contentIndex: 0, + partial: { + role: "assistant", + content: [], + provider: "anthropic", + model: "claude-sonnet-4-5", + }, + }, + { + type: "done", + reason: "stop", + message: { + role: "assistant", + content: [{ type: "text", text: "Side answer." }], + provider: "anthropic", + api: "anthropic-messages", + model: "claude-sonnet-4-5", + stopReason: "stop", + usage: { + input: 1, + output: 2, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 3, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + timestamp: Date.now(), + }, + }, + ]), + ); + + const result = await runBtwSideQuestion({ + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "anthropic", + model: "claude-sonnet-4-5", + question: "What changed?", + sessionEntry: createSessionEntry(), + sessionStore: {}, + sessionKey: "agent:main:main", + storePath: "/tmp/sessions.json", + resolvedThinkLevel: "low", + resolvedReasoningLevel: "off", + blockReplyChunking: { + minChars: 1, + maxChars: 200, + breakPreference: "paragraph", + }, + resolvedBlockStreamingBreak: "text_end", + opts: { onBlockReply }, + isNewSession: false, + }); + + expect(result).toBeUndefined(); + expect(onBlockReply).toHaveBeenCalledWith({ text: "Side answer." }); + expect(appendCustomEntryMock).toHaveBeenCalledWith( + BTW_CUSTOM_TYPE, + expect.objectContaining({ + question: "What changed?", + answer: "Side answer.", + provider: "anthropic", + model: "claude-sonnet-4-5", + }), + ); + }); + + it("returns a final payload when block streaming is unavailable", async () => { + streamSimpleMock.mockReturnValue( + makeAsyncEvents([ + { + type: "done", + reason: "stop", + message: { + role: "assistant", + content: [{ type: "text", text: "Final answer." }], + provider: "anthropic", + api: "anthropic-messages", + model: "claude-sonnet-4-5", + stopReason: "stop", + usage: { + input: 1, + output: 2, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 3, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + timestamp: Date.now(), + }, + }, + ]), + ); + + const result = await runBtwSideQuestion({ + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "anthropic", + model: "claude-sonnet-4-5", + question: "What changed?", + sessionEntry: createSessionEntry(), + resolvedReasoningLevel: "off", + opts: {}, + isNewSession: false, + }); + + expect(result).toEqual({ text: "Final answer." }); + }); + + it("fails when the current branch has no messages", async () => { + buildSessionContextMock.mockReturnValue({ messages: [] }); + streamSimpleMock.mockReturnValue(makeAsyncEvents([])); + + await expect( + runBtwSideQuestion({ + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "anthropic", + model: "claude-sonnet-4-5", + question: "What changed?", + sessionEntry: createSessionEntry(), + resolvedReasoningLevel: "off", + opts: {}, + isNewSession: false, + }), + ).rejects.toThrow("No active session context."); + }); + + it("branches away from an unresolved trailing user turn before building BTW context", async () => { + getLeafEntryMock.mockReturnValue({ + type: "message", + parentId: "assistant-1", + message: { role: "user" }, + }); + streamSimpleMock.mockReturnValue( + makeAsyncEvents([ + { + type: "done", + reason: "stop", + message: { + role: "assistant", + content: [{ type: "text", text: "323" }], + provider: "anthropic", + api: "anthropic-messages", + model: "claude-sonnet-4-5", + stopReason: "stop", + usage: { + input: 1, + output: 2, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 3, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + timestamp: Date.now(), + }, + }, + ]), + ); + + const result = await runBtwSideQuestion({ + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "anthropic", + model: "claude-sonnet-4-5", + question: "What is 17 * 19?", + sessionEntry: createSessionEntry(), + resolvedReasoningLevel: "off", + opts: {}, + isNewSession: false, + }); + + expect(branchMock).toHaveBeenCalledWith("assistant-1"); + expect(resetLeafMock).not.toHaveBeenCalled(); + expect(buildSessionContextMock).toHaveBeenCalledTimes(1); + expect(result).toEqual({ text: "323" }); + }); + + it("branches to the active run snapshot leaf when the session is busy", async () => { + getActiveEmbeddedRunSnapshotMock.mockReturnValue({ + transcriptLeafId: "assistant-seed", + }); + streamSimpleMock.mockReturnValue( + makeAsyncEvents([ + { + type: "done", + reason: "stop", + message: { + role: "assistant", + content: [{ type: "text", text: "323" }], + provider: "anthropic", + api: "anthropic-messages", + model: "claude-sonnet-4-5", + stopReason: "stop", + usage: { + input: 1, + output: 2, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 3, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + timestamp: Date.now(), + }, + }, + ]), + ); + + const result = await runBtwSideQuestion({ + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "anthropic", + model: "claude-sonnet-4-5", + question: "What is 17 * 19?", + sessionEntry: createSessionEntry(), + resolvedReasoningLevel: "off", + opts: {}, + isNewSession: false, + }); + + expect(branchMock).toHaveBeenCalledWith("assistant-seed"); + expect(getLeafEntryMock).not.toHaveBeenCalled(); + expect(result).toEqual({ text: "323" }); + }); + + it("returns the BTW answer and retries transcript persistence after a session lock", async () => { + acquireSessionWriteLockMock + .mockRejectedValueOnce( + new Error("session file locked (timeout 250ms): pid=123 /tmp/session.lock"), + ) + .mockResolvedValueOnce({ + release: vi.fn().mockResolvedValue(undefined), + }); + streamSimpleMock.mockReturnValue( + makeAsyncEvents([ + { + type: "done", + reason: "stop", + message: { + role: "assistant", + content: [{ type: "text", text: "323" }], + provider: "anthropic", + api: "anthropic-messages", + model: "claude-sonnet-4-5", + stopReason: "stop", + usage: { + input: 1, + output: 2, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 3, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + timestamp: Date.now(), + }, + }, + ]), + ); + + const result = await runBtwSideQuestion({ + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "anthropic", + model: "claude-sonnet-4-5", + question: "What is 17 * 19?", + sessionEntry: createSessionEntry(), + resolvedReasoningLevel: "off", + opts: {}, + isNewSession: false, + }); + + expect(result).toEqual({ text: "323" }); + expect(waitForEmbeddedPiRunEndMock).toHaveBeenCalledWith("session-1", 30000); + await vi.waitFor(() => { + expect(appendCustomEntryMock).toHaveBeenCalledWith( + BTW_CUSTOM_TYPE, + expect.objectContaining({ + question: "What is 17 * 19?", + answer: "323", + }), + ); + }); + }); +}); diff --git a/src/agents/btw.ts b/src/agents/btw.ts new file mode 100644 index 00000000000..ff0b8b63412 --- /dev/null +++ b/src/agents/btw.ts @@ -0,0 +1,426 @@ +import { + streamSimple, + type Api, + type AssistantMessageEvent, + type ThinkingLevel as SimpleThinkingLevel, + type Message, + type Model, +} from "@mariozechner/pi-ai"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import type { ReasoningLevel, ThinkLevel } from "../auto-reply/thinking.js"; +import type { GetReplyOptions, ReplyPayload } from "../auto-reply/types.js"; +import type { OpenClawConfig } from "../config/config.js"; +import { + resolveSessionFilePath, + resolveSessionFilePathOptions, + type SessionEntry, +} from "../config/sessions.js"; +import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js"; +import { getApiKeyForModel, requireApiKey } from "./model-auth.js"; +import { ensureOpenClawModelsJson } from "./models-config.js"; +import { EmbeddedBlockChunker, type BlockReplyChunking } from "./pi-embedded-block-chunker.js"; +import { resolveModelWithRegistry } from "./pi-embedded-runner/model.js"; +import { + getActiveEmbeddedRunSnapshot, + waitForEmbeddedPiRunEnd, +} from "./pi-embedded-runner/runs.js"; +import { mapThinkingLevel } from "./pi-embedded-runner/utils.js"; +import { discoverAuthStorage, discoverModels } from "./pi-model-discovery.js"; +import { acquireSessionWriteLock } from "./session-write-lock.js"; + +const BTW_CUSTOM_TYPE = "openclaw:btw"; +const BTW_PERSIST_TIMEOUT_MS = 250; +const BTW_PERSIST_RETRY_WAIT_MS = 30_000; +const BTW_PERSIST_RETRY_LOCK_MS = 10_000; + +type SessionManagerLike = { + getLeafEntry?: () => { + id?: string; + type?: string; + parentId?: string | null; + message?: { role?: string }; + } | null; + branch?: (parentId: string) => void; + resetLeaf?: () => void; + buildSessionContext: () => { messages?: unknown[] }; +}; + +type BtwCustomEntryData = { + timestamp: number; + question: string; + answer: string; + provider: string; + model: string; + thinkingLevel: ThinkLevel | "off"; + reasoningLevel: ReasoningLevel; + sessionKey?: string; + authProfileId?: string; + authProfileIdSource?: "auto" | "user"; + usage?: unknown; +}; + +async function appendBtwCustomEntry(params: { + sessionFile: string; + timeoutMs: number; + entry: BtwCustomEntryData; +}) { + const lock = await acquireSessionWriteLock({ + sessionFile: params.sessionFile, + timeoutMs: params.timeoutMs, + allowReentrant: false, + }); + try { + const persisted = SessionManager.open(params.sessionFile); + persisted.appendCustomEntry(BTW_CUSTOM_TYPE, params.entry); + } finally { + await lock.release(); + } +} + +function isSessionLockError(error: unknown): boolean { + const message = error instanceof Error ? error.message : String(error); + return message.includes("session file locked"); +} + +function deferBtwCustomEntryPersist(params: { + sessionId: string; + sessionFile: string; + entry: BtwCustomEntryData; +}) { + void (async () => { + try { + await waitForEmbeddedPiRunEnd(params.sessionId, BTW_PERSIST_RETRY_WAIT_MS); + await appendBtwCustomEntry({ + sessionFile: params.sessionFile, + timeoutMs: BTW_PERSIST_RETRY_LOCK_MS, + entry: params.entry, + }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.warn(`[btw] skipped transcript persistence: ${message}`); + } + })(); +} + +function collectTextContent(content: Array<{ type?: string; text?: string }>): string { + return content + .filter((part): part is { type: "text"; text: string } => part.type === "text") + .map((part) => part.text) + .join(""); +} + +function collectThinkingContent(content: Array<{ type?: string; thinking?: string }>): string { + return content + .filter((part): part is { type: "thinking"; thinking: string } => part.type === "thinking") + .map((part) => part.thinking) + .join(""); +} + +function toSimpleContextMessages(messages: unknown[]): Message[] { + return messages.filter((message): message is Message => { + if (!message || typeof message !== "object") { + return false; + } + const role = (message as { role?: unknown }).role; + return role === "user" || role === "assistant" || role === "toolResult"; + }); +} + +function resolveSimpleThinkingLevel(level?: ThinkLevel): SimpleThinkingLevel | undefined { + if (!level || level === "off") { + return undefined; + } + return mapThinkingLevel(level) as SimpleThinkingLevel; +} + +function resolveSessionTranscriptPath(params: { + sessionId: string; + sessionEntry?: SessionEntry; + sessionKey?: string; + storePath?: string; +}): string | undefined { + try { + const agentId = params.sessionKey?.split(":")[1]; + const pathOpts = resolveSessionFilePathOptions({ + agentId, + storePath: params.storePath, + }); + return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts); + } catch { + return undefined; + } +} + +async function resolveRuntimeModel(params: { + cfg: OpenClawConfig; + provider: string; + model: string; + agentDir: string; + sessionEntry?: SessionEntry; + sessionStore?: Record; + sessionKey?: string; + storePath?: string; + isNewSession: boolean; +}): Promise<{ + model: Model; + authProfileId?: string; + authProfileIdSource?: "auto" | "user"; +}> { + await ensureOpenClawModelsJson(params.cfg, params.agentDir); + const authStorage = discoverAuthStorage(params.agentDir); + const modelRegistry = discoverModels(authStorage, params.agentDir); + const model = resolveModelWithRegistry({ + provider: params.provider, + modelId: params.model, + modelRegistry, + cfg: params.cfg, + }); + if (!model) { + throw new Error(`Unknown model: ${params.provider}/${params.model}`); + } + + const authProfileId = await resolveSessionAuthProfileOverride({ + cfg: params.cfg, + provider: params.provider, + agentDir: params.agentDir, + sessionEntry: params.sessionEntry, + sessionStore: params.sessionStore, + sessionKey: params.sessionKey, + storePath: params.storePath, + isNewSession: params.isNewSession, + }); + return { + model, + authProfileId, + authProfileIdSource: params.sessionEntry?.authProfileOverrideSource, + }; +} + +type RunBtwSideQuestionParams = { + cfg: OpenClawConfig; + agentDir: string; + provider: string; + model: string; + question: string; + sessionEntry: SessionEntry; + sessionStore?: Record; + sessionKey?: string; + storePath?: string; + resolvedThinkLevel?: ThinkLevel; + resolvedReasoningLevel: ReasoningLevel; + blockReplyChunking?: BlockReplyChunking; + resolvedBlockStreamingBreak?: "text_end" | "message_end"; + opts?: GetReplyOptions; + isNewSession: boolean; +}; + +export async function runBtwSideQuestion( + params: RunBtwSideQuestionParams, +): Promise { + const sessionId = params.sessionEntry.sessionId?.trim(); + if (!sessionId) { + throw new Error("No active session context."); + } + + const sessionFile = resolveSessionTranscriptPath({ + sessionId, + sessionEntry: params.sessionEntry, + sessionKey: params.sessionKey, + storePath: params.storePath, + }); + if (!sessionFile) { + throw new Error("No active session transcript."); + } + + const sessionManager = SessionManager.open(sessionFile) as SessionManagerLike; + const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId); + if (activeRunSnapshot) { + if (activeRunSnapshot.transcriptLeafId && sessionManager.branch) { + sessionManager.branch(activeRunSnapshot.transcriptLeafId); + } else { + sessionManager.resetLeaf?.(); + } + } else { + const leafEntry = sessionManager.getLeafEntry?.(); + if (leafEntry?.type === "message" && leafEntry.message?.role === "user") { + if (leafEntry.parentId && sessionManager.branch) { + sessionManager.branch(leafEntry.parentId); + } else { + sessionManager.resetLeaf?.(); + } + } + } + const sessionContext = sessionManager.buildSessionContext(); + const messages = toSimpleContextMessages( + Array.isArray(sessionContext.messages) ? sessionContext.messages : [], + ); + if (messages.length === 0) { + throw new Error("No active session context."); + } + + const { model, authProfileId, authProfileIdSource } = await resolveRuntimeModel({ + cfg: params.cfg, + provider: params.provider, + model: params.model, + agentDir: params.agentDir, + sessionEntry: params.sessionEntry, + sessionStore: params.sessionStore, + sessionKey: params.sessionKey, + storePath: params.storePath, + isNewSession: params.isNewSession, + }); + const apiKeyInfo = await getApiKeyForModel({ + model, + cfg: params.cfg, + profileId: authProfileId, + agentDir: params.agentDir, + }); + const apiKey = requireApiKey(apiKeyInfo, model.provider); + + const chunker = + params.opts?.onBlockReply && params.blockReplyChunking + ? new EmbeddedBlockChunker(params.blockReplyChunking) + : undefined; + let emittedBlocks = 0; + let blockEmitChain: Promise = Promise.resolve(); + let answerText = ""; + let reasoningText = ""; + let assistantStarted = false; + let sawTextEvent = false; + + const emitBlockChunk = async (text: string) => { + const trimmed = text.trim(); + if (!trimmed || !params.opts?.onBlockReply) { + return; + } + emittedBlocks += 1; + blockEmitChain = blockEmitChain.then(async () => { + await params.opts?.onBlockReply?.({ text }); + }); + await blockEmitChain; + }; + + const stream = streamSimple( + model, + { + messages: [ + ...messages, + { + role: "user", + content: [{ type: "text", text: params.question }], + timestamp: Date.now(), + }, + ], + }, + { + apiKey, + reasoning: resolveSimpleThinkingLevel(params.resolvedThinkLevel), + signal: params.opts?.abortSignal, + }, + ); + + let finalEvent: + | Extract + | Extract + | undefined; + + for await (const event of stream) { + finalEvent = event.type === "done" || event.type === "error" ? event : finalEvent; + + if (!assistantStarted && (event.type === "text_start" || event.type === "start")) { + assistantStarted = true; + await params.opts?.onAssistantMessageStart?.(); + } + + if (event.type === "text_delta") { + sawTextEvent = true; + answerText += event.delta; + chunker?.append(event.delta); + if (chunker && params.resolvedBlockStreamingBreak === "text_end") { + chunker.drain({ force: false, emit: (chunk) => void emitBlockChunk(chunk) }); + } + continue; + } + + if (event.type === "text_end" && chunker && params.resolvedBlockStreamingBreak === "text_end") { + chunker.drain({ force: true, emit: (chunk) => void emitBlockChunk(chunk) }); + continue; + } + + if (event.type === "thinking_delta") { + reasoningText += event.delta; + if (params.resolvedReasoningLevel !== "off") { + await params.opts?.onReasoningStream?.({ text: reasoningText, isReasoning: true }); + } + continue; + } + + if (event.type === "thinking_end" && params.resolvedReasoningLevel !== "off") { + await params.opts?.onReasoningEnd?.(); + } + } + + if (chunker && params.resolvedBlockStreamingBreak !== "text_end" && chunker.hasBuffered()) { + chunker.drain({ force: true, emit: (chunk) => void emitBlockChunk(chunk) }); + } + await blockEmitChain; + + if (finalEvent?.type === "error") { + const message = collectTextContent(finalEvent.error.content); + throw new Error(message || finalEvent.error.errorMessage || "BTW failed."); + } + + const finalMessage = finalEvent?.type === "done" ? finalEvent.message : undefined; + if (finalMessage) { + if (!sawTextEvent) { + answerText = collectTextContent(finalMessage.content); + } + if (!reasoningText) { + reasoningText = collectThinkingContent(finalMessage.content); + } + } + + const answer = answerText.trim(); + if (!answer) { + throw new Error("No BTW response generated."); + } + + const customEntry = { + timestamp: Date.now(), + question: params.question, + answer, + provider: model.provider, + model: model.id, + thinkingLevel: params.resolvedThinkLevel ?? "off", + reasoningLevel: params.resolvedReasoningLevel, + sessionKey: params.sessionKey, + authProfileId, + authProfileIdSource, + usage: finalMessage?.usage, + } satisfies BtwCustomEntryData; + + try { + await appendBtwCustomEntry({ + sessionFile, + timeoutMs: BTW_PERSIST_TIMEOUT_MS, + entry: customEntry, + }); + } catch (error) { + if (!isSessionLockError(error)) { + throw error; + } + deferBtwCustomEntryPersist({ + sessionId, + sessionFile, + entry: customEntry, + }); + } + + if (emittedBlocks > 0) { + return undefined; + } + + return { text: answer }; +} + +export { BTW_CUSTOM_TYPE }; diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 274ef0ef865..450f8b60d1d 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -111,6 +111,7 @@ import { clearActiveEmbeddedRun, type EmbeddedPiQueueHandle, setActiveEmbeddedRun, + updateActiveEmbeddedRunSnapshot, } from "../runs.js"; import { buildEmbeddedSandboxInfo } from "../sandbox-info.js"; import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js"; @@ -2376,6 +2377,10 @@ export async function runEmbeddedAttempt( `runId=${params.runId} sessionId=${params.sessionId}`, ); } + updateActiveEmbeddedRunSnapshot(params.sessionId, { + transcriptLeafId: + (sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null, + }); try { // Idempotent cleanup for legacy sessions with persisted image payloads. diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts index d9bf90f961d..21a36a183d8 100644 --- a/src/agents/pi-embedded-runner/runs.test.ts +++ b/src/agents/pi-embedded-runner/runs.test.ts @@ -4,7 +4,9 @@ import { __testing, abortEmbeddedPiRun, clearActiveEmbeddedRun, + getActiveEmbeddedRunSnapshot, setActiveEmbeddedRun, + updateActiveEmbeddedRunSnapshot, waitForActiveEmbeddedRuns, } from "./runs.js"; @@ -137,4 +139,24 @@ describe("pi-embedded runner run registry", () => { runsB.__testing.resetActiveEmbeddedRuns(); } }); + + it("tracks and clears per-session transcript snapshots for active runs", () => { + const handle = { + queueMessage: async () => {}, + isStreaming: () => true, + isCompacting: () => false, + abort: vi.fn(), + }; + + setActiveEmbeddedRun("session-snapshot", handle); + updateActiveEmbeddedRunSnapshot("session-snapshot", { + transcriptLeafId: "assistant-1", + }); + expect(getActiveEmbeddedRunSnapshot("session-snapshot")).toEqual({ + transcriptLeafId: "assistant-1", + }); + + clearActiveEmbeddedRun("session-snapshot", handle); + expect(getActiveEmbeddedRunSnapshot("session-snapshot")).toBeUndefined(); + }); }); diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index 0d4cecc8372..ad853108dad 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -12,6 +12,10 @@ type EmbeddedPiQueueHandle = { abort: () => void; }; +export type ActiveEmbeddedRunSnapshot = { + transcriptLeafId: string | null; +}; + type EmbeddedRunWaiter = { resolve: (ended: boolean) => void; timer: NodeJS.Timeout; @@ -25,9 +29,11 @@ const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState"); const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({ activeRuns: new Map(), + snapshots: new Map(), waiters: new Map>(), })); const ACTIVE_EMBEDDED_RUNS = embeddedRunState.activeRuns; +const ACTIVE_EMBEDDED_RUN_SNAPSHOTS = embeddedRunState.snapshots; const EMBEDDED_RUN_WAITERS = embeddedRunState.waiters; export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean { @@ -135,6 +141,12 @@ export function getActiveEmbeddedRunCount(): number { return ACTIVE_EMBEDDED_RUNS.size; } +export function getActiveEmbeddedRunSnapshot( + sessionId: string, +): ActiveEmbeddedRunSnapshot | undefined { + return ACTIVE_EMBEDDED_RUN_SNAPSHOTS.get(sessionId); +} + /** * Wait for active embedded runs to drain. * @@ -230,6 +242,16 @@ export function setActiveEmbeddedRun( } } +export function updateActiveEmbeddedRunSnapshot( + sessionId: string, + snapshot: ActiveEmbeddedRunSnapshot, +) { + if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) { + return; + } + ACTIVE_EMBEDDED_RUN_SNAPSHOTS.set(sessionId, snapshot); +} + export function clearActiveEmbeddedRun( sessionId: string, handle: EmbeddedPiQueueHandle, @@ -237,6 +259,7 @@ export function clearActiveEmbeddedRun( ) { if (ACTIVE_EMBEDDED_RUNS.get(sessionId) === handle) { ACTIVE_EMBEDDED_RUNS.delete(sessionId); + ACTIVE_EMBEDDED_RUN_SNAPSHOTS.delete(sessionId); logSessionStateChange({ sessionId, sessionKey, state: "idle", reason: "run_completed" }); if (!sessionId.startsWith("probe-")) { diag.debug(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`); @@ -257,6 +280,7 @@ export const __testing = { } EMBEDDED_RUN_WAITERS.clear(); ACTIVE_EMBEDDED_RUNS.clear(); + ACTIVE_EMBEDDED_RUN_SNAPSHOTS.clear(); }, }; diff --git a/src/auto-reply/commands-registry.data.ts b/src/auto-reply/commands-registry.data.ts index c499f03c526..80f8d4bd73f 100644 --- a/src/auto-reply/commands-registry.data.ts +++ b/src/auto-reply/commands-registry.data.ts @@ -196,6 +196,14 @@ function buildChatCommands(): ChatCommandDefinition[] { acceptsArgs: true, category: "status", }), + defineChatCommand({ + key: "btw", + nativeName: "btw", + description: "Ask a side question without changing future session context.", + textAlias: "/btw", + acceptsArgs: true, + category: "tools", + }), defineChatCommand({ key: "export-session", nativeName: "export-session", diff --git a/src/auto-reply/reply/commands-btw.test.ts b/src/auto-reply/reply/commands-btw.test.ts new file mode 100644 index 00000000000..85f181db4d4 --- /dev/null +++ b/src/auto-reply/reply/commands-btw.test.ts @@ -0,0 +1,93 @@ +import { describe, expect, it, vi, beforeEach } from "vitest"; +import type { OpenClawConfig } from "../../config/config.js"; +import { buildCommandTestParams } from "./commands.test-harness.js"; + +const runBtwSideQuestionMock = vi.fn(); + +vi.mock("../../agents/btw.js", () => ({ + runBtwSideQuestion: (...args: unknown[]) => runBtwSideQuestionMock(...args), +})); + +const { handleBtwCommand } = await import("./commands-btw.js"); + +function buildParams(commandBody: string) { + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig; + return buildCommandTestParams(commandBody, cfg, undefined, { workspaceDir: "/tmp/workspace" }); +} + +describe("handleBtwCommand", () => { + beforeEach(() => { + runBtwSideQuestionMock.mockReset(); + }); + + it("returns usage when the side question is missing", async () => { + const result = await handleBtwCommand(buildParams("/btw"), true); + + expect(result).toEqual({ + shouldContinue: false, + reply: { text: "Usage: /btw " }, + }); + }); + + it("requires an active session context", async () => { + const params = buildParams("/btw what changed?"); + params.sessionEntry = undefined; + + const result = await handleBtwCommand(params, true); + + expect(result).toEqual({ + shouldContinue: false, + reply: { text: "⚠️ /btw requires an active session with existing context." }, + }); + }); + + it("still delegates while the session is actively running", async () => { + const params = buildParams("/btw what changed?"); + params.agentDir = "/tmp/agent"; + params.sessionEntry = { + sessionId: "session-1", + updatedAt: Date.now(), + }; + runBtwSideQuestionMock.mockResolvedValue({ text: "snapshot answer" }); + + const result = await handleBtwCommand(params, true); + + expect(runBtwSideQuestionMock).toHaveBeenCalledWith( + expect.objectContaining({ + question: "what changed?", + sessionEntry: params.sessionEntry, + }), + ); + expect(result).toEqual({ + shouldContinue: false, + reply: { text: "snapshot answer" }, + }); + }); + + it("delegates to the side-question runner", async () => { + const params = buildParams("/btw what changed?"); + params.agentDir = "/tmp/agent"; + params.sessionEntry = { + sessionId: "session-1", + updatedAt: Date.now(), + }; + runBtwSideQuestionMock.mockResolvedValue({ text: "nothing important" }); + + const result = await handleBtwCommand(params, true); + + expect(runBtwSideQuestionMock).toHaveBeenCalledWith( + expect.objectContaining({ + question: "what changed?", + agentDir: "/tmp/agent", + sessionEntry: params.sessionEntry, + }), + ); + expect(result).toEqual({ + shouldContinue: false, + reply: { text: "nothing important" }, + }); + }); +}); diff --git a/src/auto-reply/reply/commands-btw.ts b/src/auto-reply/reply/commands-btw.ts new file mode 100644 index 00000000000..c35b3f115c4 --- /dev/null +++ b/src/auto-reply/reply/commands-btw.ts @@ -0,0 +1,67 @@ +import { runBtwSideQuestion } from "../../agents/btw.js"; +import type { CommandHandler } from "./commands-types.js"; + +const BTW_USAGE = "Usage: /btw "; + +export const handleBtwCommand: CommandHandler = async (params) => { + const match = params.command.commandBodyNormalized.match(/^\/btw(?:\s+(.*))?$/i); + if (!match) { + return null; + } + + const question = match[1]?.trim() ?? ""; + if (!question) { + return { + shouldContinue: false, + reply: { text: BTW_USAGE }, + }; + } + + if (!params.sessionEntry?.sessionId) { + return { + shouldContinue: false, + reply: { text: "⚠️ /btw requires an active session with existing context." }, + }; + } + + if (!params.agentDir) { + return { + shouldContinue: false, + reply: { + text: "⚠️ /btw is unavailable because the active agent directory could not be resolved.", + }, + }; + } + + try { + const reply = await runBtwSideQuestion({ + cfg: params.cfg, + agentDir: params.agentDir, + provider: params.provider, + model: params.model, + question, + sessionEntry: params.sessionEntry, + sessionStore: params.sessionStore, + sessionKey: params.sessionKey, + storePath: params.storePath, + resolvedThinkLevel: params.resolvedThinkLevel, + resolvedReasoningLevel: params.resolvedReasoningLevel, + blockReplyChunking: params.blockReplyChunking, + resolvedBlockStreamingBreak: params.resolvedBlockStreamingBreak, + opts: params.opts, + isNewSession: false, + }); + return { + shouldContinue: false, + reply, + }; + } catch (error) { + const message = error instanceof Error ? error.message.trim() : ""; + return { + shouldContinue: false, + reply: { + text: `⚠️ /btw failed${message ? `: ${message}` : "."}`, + }, + }; + } +}; diff --git a/src/auto-reply/reply/commands-core.ts b/src/auto-reply/reply/commands-core.ts index ca67bbc3549..7a6cc36c05e 100644 --- a/src/auto-reply/reply/commands-core.ts +++ b/src/auto-reply/reply/commands-core.ts @@ -11,6 +11,7 @@ import { resolveBoundAcpThreadSessionKey } from "./commands-acp/targets.js"; import { handleAllowlistCommand } from "./commands-allowlist.js"; import { handleApproveCommand } from "./commands-approve.js"; import { handleBashCommand } from "./commands-bash.js"; +import { handleBtwCommand } from "./commands-btw.js"; import { handleCompactCommand } from "./commands-compact.js"; import { handleConfigCommand, handleDebugCommand } from "./commands-config.js"; import { @@ -174,6 +175,7 @@ export async function handleCommands(params: HandleCommandsParams): Promise "always" | "mention"; resolvedThinkLevel?: ThinkLevel; resolvedVerboseLevel: VerboseLevel; resolvedReasoningLevel: ReasoningLevel; resolvedElevatedLevel?: ElevatedLevel; + blockReplyChunking?: { + minChars: number; + maxChars: number; + breakPreference: "paragraph" | "newline" | "sentence"; + flushOnParagraph?: boolean; + }; + resolvedBlockStreamingBreak?: "text_end" | "message_end"; resolveDefaultThinkingLevel: () => Promise; provider: string; model: string; diff --git a/src/auto-reply/reply/get-reply-inline-actions.ts b/src/auto-reply/reply/get-reply-inline-actions.ts index c312e1144e4..ff56526d9ce 100644 --- a/src/auto-reply/reply/get-reply-inline-actions.ts +++ b/src/auto-reply/reply/get-reply-inline-actions.ts @@ -113,6 +113,13 @@ export async function handleInlineActions(params: { resolvedVerboseLevel: VerboseLevel | undefined; resolvedReasoningLevel: ReasoningLevel; resolvedElevatedLevel: ElevatedLevel; + blockReplyChunking?: { + minChars: number; + maxChars: number; + breakPreference: "paragraph" | "newline" | "sentence"; + flushOnParagraph?: boolean; + }; + resolvedBlockStreamingBreak?: "text_end" | "message_end"; resolveDefaultThinkingLevel: Awaited< ReturnType >["resolveDefaultThinkingLevel"]; @@ -152,6 +159,8 @@ export async function handleInlineActions(params: { resolvedVerboseLevel, resolvedReasoningLevel, resolvedElevatedLevel, + blockReplyChunking, + resolvedBlockStreamingBreak, resolveDefaultThinkingLevel, provider, model, @@ -357,11 +366,14 @@ export async function handleInlineActions(params: { storePath, sessionScope, workspaceDir, + opts, defaultGroupActivation: defaultActivation, resolvedThinkLevel, resolvedVerboseLevel: resolvedVerboseLevel ?? "off", resolvedReasoningLevel, resolvedElevatedLevel, + blockReplyChunking, + resolvedBlockStreamingBreak, resolveDefaultThinkingLevel, provider, model, diff --git a/src/auto-reply/reply/get-reply.ts b/src/auto-reply/reply/get-reply.ts index 81dd478a84a..9cee46cc2c9 100644 --- a/src/auto-reply/reply/get-reply.ts +++ b/src/auto-reply/reply/get-reply.ts @@ -332,6 +332,8 @@ export async function getReplyFromConfig( resolvedVerboseLevel, resolvedReasoningLevel, resolvedElevatedLevel, + blockReplyChunking, + resolvedBlockStreamingBreak, resolveDefaultThinkingLevel: modelState.resolveDefaultThinkingLevel, provider, model, diff --git a/src/plugins/commands.ts b/src/plugins/commands.ts index f0ec39539c8..00e4b3b34ae 100644 --- a/src/plugins/commands.ts +++ b/src/plugins/commands.ts @@ -37,6 +37,7 @@ const RESERVED_COMMANDS = new Set([ "status", "whoami", "context", + "btw", // Session management "stop", "restart",