From b672b17cfd00581c6bd7140b2f1233d9ba6e6ffc Mon Sep 17 00:00:00 2001 From: Nimrod Gutman Date: Sat, 14 Mar 2026 12:22:27 +0200 Subject: [PATCH] feat(btw): isolate side-question delivery --- src/agents/btw.test.ts | 228 +++ src/agents/btw.ts | 98 +- src/agents/pi-embedded-runner/run/attempt.ts | 19 +- src/agents/pi-embedded-runner/runs.test.ts | 2 + src/agents/pi-embedded-runner/runs.ts | 1 + src/auto-reply/reply/commands-btw.test.ts | 4 +- src/auto-reply/reply/commands-btw.ts | 4 +- src/auto-reply/reply/commands-types.ts | 8 +- .../reply/get-reply-inline-actions.ts | 8 +- src/auto-reply/reply/reply-payloads.ts | 11 + src/auto-reply/reply/route-reply.test.ts | 30 + src/auto-reply/reply/route-reply.ts | 31 +- src/auto-reply/types.ts | 3 + src/commands/doctor-state-integrity.ts | 15 +- src/commands/sessions-cleanup.ts | 6 + src/config/sessions/artifacts.test.ts | 10 + src/config/sessions/artifacts.ts | 7 + src/config/sessions/disk-budget.ts | 26 +- src/gateway/server-methods/chat.ts | 147 +- .../server.chat.gateway-server-chat.test.ts | 116 ++ ...sessions.gateway-server-sessions-a.test.ts | 61 + src/gateway/session-utils.fs.test.ts | 102 ++ src/gateway/session-utils.fs.ts | 143 +- src/gateway/session-utils.ts | 1 + src/infra/outbound/deliver.test.ts | 575 +++++++- src/infra/outbound/outbound.test.ts | 1306 +++++++++++++++++ src/infra/outbound/payloads.ts | 7 +- src/infra/session-cost-usage.ts | 5 +- src/sessions/side-results.ts | 37 + src/tui/components/btw-inline-message.test.ts | 16 + src/tui/components/btw-inline-message.ts | 28 + src/tui/components/chat-log.test.ts | 21 + src/tui/components/chat-log.ts | 33 + src/tui/tui-command-handlers.test.ts | 40 +- src/tui/tui-command-handlers.ts | 34 +- src/tui/tui-event-handlers.test.ts | 49 +- src/tui/tui-event-handlers.ts | 37 +- src/tui/tui-session-actions.test.ts | 18 + src/tui/tui-session-actions.ts | 8 + src/tui/tui-types.ts | 11 + src/tui/tui.ts | 35 +- 41 files changed, 3173 insertions(+), 168 deletions(-) create mode 100644 src/sessions/side-results.ts create mode 100644 src/tui/components/btw-inline-message.test.ts create mode 100644 src/tui/components/btw-inline-message.ts diff --git a/src/agents/btw.test.ts b/src/agents/btw.test.ts index b2c222f5ac1..56ec021afcf 100644 --- a/src/agents/btw.test.ts +++ b/src/agents/btw.test.ts @@ -18,6 +18,8 @@ const resolveSessionAuthProfileOverrideMock = vi.fn(); const getActiveEmbeddedRunSnapshotMock = vi.fn(); const waitForEmbeddedPiRunEndMock = vi.fn(); const diagWarnMock = vi.fn(); +const diagDebugMock = vi.fn(); +const appendSessionSideResultMock = vi.fn(); vi.mock("@mariozechner/pi-ai", () => ({ streamSimple: (...args: unknown[]) => streamSimpleMock(...args), @@ -70,9 +72,14 @@ vi.mock("./auth-profiles/session-override.js", () => ({ vi.mock("../logging/diagnostic.js", () => ({ diagnosticLogger: { warn: (...args: unknown[]) => diagWarnMock(...args), + debug: (...args: unknown[]) => diagDebugMock(...args), }, })); +vi.mock("../sessions/side-results.js", () => ({ + appendSessionSideResult: (...args: unknown[]) => appendSessionSideResultMock(...args), +})); + const { BTW_CUSTOM_TYPE, runBtwSideQuestion } = await import("./btw.js"); function makeAsyncEvents(events: unknown[]) { @@ -113,6 +120,8 @@ describe("runBtwSideQuestion", () => { getActiveEmbeddedRunSnapshotMock.mockReset(); waitForEmbeddedPiRunEndMock.mockReset(); diagWarnMock.mockReset(); + diagDebugMock.mockReset(); + appendSessionSideResultMock.mockReset(); buildSessionContextMock.mockReturnValue({ messages: [{ role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }], @@ -206,6 +215,14 @@ describe("runBtwSideQuestion", () => { expect(result).toBeUndefined(); expect(onBlockReply).toHaveBeenCalledWith({ text: "Side answer." }); + expect(appendSessionSideResultMock).toHaveBeenCalledWith({ + transcriptPath: expect.stringContaining("session-1.jsonl"), + result: expect.objectContaining({ + kind: "btw", + question: "What changed?", + text: "Side answer.", + }), + }); expect(appendCustomEntryMock).toHaveBeenCalledWith( BTW_CUSTOM_TYPE, expect.objectContaining({ @@ -278,6 +295,144 @@ describe("runBtwSideQuestion", () => { ).rejects.toThrow("No active session context."); }); + it("uses active-run snapshot messages for BTW context while the main run is in flight", async () => { + buildSessionContextMock.mockReturnValue({ messages: [] }); + getActiveEmbeddedRunSnapshotMock.mockReturnValue({ + transcriptLeafId: "assistant-1", + messages: [ + { + role: "user", + content: [ + { type: "text", text: "write some things then wait 30 seconds and write more" }, + ], + timestamp: 1, + }, + ], + }); + streamSimpleMock.mockReturnValue( + makeAsyncEvents([ + { + type: "done", + reason: "stop", + message: { + role: "assistant", + content: [{ type: "text", text: "323" }], + provider: "anthropic", + api: "anthropic-messages", + model: "claude-sonnet-4-5", + stopReason: "stop", + usage: { + input: 1, + output: 2, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 3, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + timestamp: Date.now(), + }, + }, + ]), + ); + + const result = await runBtwSideQuestion({ + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "anthropic", + model: "claude-sonnet-4-5", + question: "What is 17 * 19?", + sessionEntry: createSessionEntry(), + resolvedReasoningLevel: "off", + opts: {}, + isNewSession: false, + }); + + expect(result).toEqual({ text: "323" }); + expect(streamSimpleMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + systemPrompt: expect.stringContaining("ephemeral /btw side question"), + messages: expect.arrayContaining([ + expect.objectContaining({ role: "user" }), + expect.objectContaining({ + role: "user", + content: [ + { + type: "text", + text: expect.stringContaining( + "\nWhat is 17 * 19?\n", + ), + }, + ], + }), + ]), + }), + expect.anything(), + ); + }); + + it("wraps the side question so the model does not treat it as a main-task continuation", async () => { + streamSimpleMock.mockReturnValue( + makeAsyncEvents([ + { + type: "done", + reason: "stop", + message: { + role: "assistant", + content: [{ type: "text", text: "About 93 million miles." }], + provider: "anthropic", + api: "anthropic-messages", + model: "claude-sonnet-4-5", + stopReason: "stop", + usage: { + input: 1, + output: 2, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 3, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + timestamp: Date.now(), + }, + }, + ]), + ); + + await runBtwSideQuestion({ + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "anthropic", + model: "claude-sonnet-4-5", + question: "what is the distance to the sun?", + sessionEntry: createSessionEntry(), + resolvedReasoningLevel: "off", + opts: {}, + isNewSession: false, + }); + + const [, context] = streamSimpleMock.mock.calls[0] ?? []; + expect(context).toMatchObject({ + systemPrompt: expect.stringContaining( + "Do not continue, resume, or complete any unfinished task", + ), + }); + expect(context).toMatchObject({ + messages: expect.arrayContaining([ + expect.objectContaining({ + role: "user", + content: [ + { + type: "text", + text: expect.stringContaining( + "Ignore any unfinished task in the conversation while answering it.", + ), + }, + ], + }), + ]), + }); + }); + it("branches away from an unresolved trailing user turn before building BTW context", async () => { getLeafEntryMock.mockReturnValue({ type: "message", @@ -487,4 +642,77 @@ describe("runBtwSideQuestion", () => { ); }); }); + + it("excludes tool results from BTW context to avoid replaying raw tool output", async () => { + getActiveEmbeddedRunSnapshotMock.mockReturnValue({ + transcriptLeafId: "assistant-1", + messages: [ + { + role: "user", + content: [{ type: "text", text: "seed" }], + timestamp: 1, + }, + { + role: "toolResult", + content: [{ type: "text", text: "sensitive tool output" }], + details: { raw: "secret" }, + timestamp: 2, + }, + { + role: "assistant", + content: [{ type: "text", text: "done" }], + timestamp: 3, + }, + ], + }); + streamSimpleMock.mockReturnValue( + makeAsyncEvents([ + { + type: "done", + reason: "stop", + message: { + role: "assistant", + content: [{ type: "text", text: "323" }], + provider: "anthropic", + api: "anthropic-messages", + model: "claude-sonnet-4-5", + stopReason: "stop", + usage: { + input: 1, + output: 2, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 3, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + timestamp: Date.now(), + }, + }, + ]), + ); + + await runBtwSideQuestion({ + cfg: {} as never, + agentDir: "/tmp/agent", + provider: "anthropic", + model: "claude-sonnet-4-5", + question: "What is 17 * 19?", + sessionEntry: createSessionEntry(), + resolvedReasoningLevel: "off", + opts: {}, + isNewSession: false, + }); + + const [, context] = streamSimpleMock.mock.calls[0] ?? []; + expect(context).toMatchObject({ + messages: [ + expect.objectContaining({ role: "user" }), + expect.objectContaining({ role: "assistant" }), + expect.objectContaining({ role: "user" }), + ], + }); + expect((context as { messages?: Array<{ role?: string }> }).messages).not.toEqual( + expect.arrayContaining([expect.objectContaining({ role: "toolResult" })]), + ); + }); }); diff --git a/src/agents/btw.ts b/src/agents/btw.ts index 52fd4d24a46..158272d0f38 100644 --- a/src/agents/btw.ts +++ b/src/agents/btw.ts @@ -16,6 +16,8 @@ import { type SessionEntry, } from "../config/sessions.js"; import { diagnosticLogger as diag } from "../logging/diagnostic.js"; +import { appendSessionSideResult } from "../sessions/side-results.js"; +import { stripToolResultDetails } from "./session-transcript-repair.js"; import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js"; import { getApiKeyForModel, requireApiKey } from "./model-auth.js"; import { ensureOpenClawModelsJson } from "./models-config.js"; @@ -60,6 +62,20 @@ type BtwCustomEntryData = { usage?: unknown; }; +type BtwSideResultData = { + timestamp: number; + question: string; + answer: string; + provider: string; + model: string; + thinkingLevel: ThinkLevel | "off"; + reasoningLevel: ReasoningLevel; + sessionKey?: string; + authProfileId?: string; + authProfileIdSource?: "auto" | "user"; + usage?: unknown; +}; + async function appendBtwCustomEntry(params: { sessionFile: string; timeoutMs: number; @@ -78,6 +94,26 @@ async function appendBtwCustomEntry(params: { } } +function appendBtwSideResult(params: { sessionFile: string; entry: BtwSideResultData }) { + appendSessionSideResult({ + transcriptPath: params.sessionFile, + result: { + kind: "btw", + question: params.entry.question, + text: params.entry.answer, + ts: params.entry.timestamp, + provider: params.entry.provider, + model: params.entry.model, + thinkingLevel: params.entry.thinkingLevel, + reasoningLevel: params.entry.reasoningLevel, + sessionKey: params.entry.sessionKey, + authProfileId: params.entry.authProfileId, + authProfileIdSource: params.entry.authProfileIdSource, + usage: params.entry.usage, + }, + }); +} + function isSessionLockError(error: unknown): boolean { const message = error instanceof Error ? error.message : String(error); return message.includes("session file locked"); @@ -117,14 +153,39 @@ function collectThinkingContent(content: Array<{ type?: string; thinking?: strin .join(""); } +function buildBtwSystemPrompt(): string { + return [ + "You are answering an ephemeral /btw side question about the current conversation.", + "Use the conversation only as background context.", + "Answer only the side question in the last user message.", + "Do not continue, resume, or complete any unfinished task from the conversation.", + "Do not emit tool calls, pseudo-tool calls, shell commands, file writes, patches, or code unless the side question explicitly asks for them.", + "Do not say you will continue the main task after answering.", + "If the question can be answered briefly, answer briefly.", + ].join("\n"); +} + +function buildBtwQuestionPrompt(question: string): string { + return [ + "Answer this side question only.", + "Ignore any unfinished task in the conversation while answering it.", + "", + "", + question.trim(), + "", + ].join("\n"); +} + function toSimpleContextMessages(messages: unknown[]): Message[] { - return messages.filter((message): message is Message => { + const contextMessages = messages.filter((message): message is Message => { if (!message || typeof message !== "object") { return false; } const role = (message as { role?: unknown }).role; - return role === "user" || role === "assistant" || role === "toolResult"; + return role === "user" || role === "assistant"; }); + return stripToolResultDetails(contextMessages as Parameters[0]) as + Message[]; } function resolveSimpleThinkingLevel(level?: ThinkLevel): SimpleThinkingLevel | undefined { @@ -147,7 +208,10 @@ function resolveSessionTranscriptPath(params: { storePath: params.storePath, }); return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts); - } catch { + } catch (error) { + diag.debug( + `resolveSessionTranscriptPath failed: sessionId=${params.sessionId} err=${String(error)}`, + ); return undefined; } } @@ -235,7 +299,10 @@ export async function runBtwSideQuestion( const sessionManager = SessionManager.open(sessionFile) as SessionManagerLike; const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId); - if (activeRunSnapshot) { + let messages: Message[] = []; + if (Array.isArray(activeRunSnapshot?.messages) && activeRunSnapshot.messages.length > 0) { + messages = toSimpleContextMessages(activeRunSnapshot.messages); + } else if (activeRunSnapshot) { if (activeRunSnapshot.transcriptLeafId && sessionManager.branch) { sessionManager.branch(activeRunSnapshot.transcriptLeafId); } else { @@ -251,10 +318,12 @@ export async function runBtwSideQuestion( } } } - const sessionContext = sessionManager.buildSessionContext(); - const messages = toSimpleContextMessages( - Array.isArray(sessionContext.messages) ? sessionContext.messages : [], - ); + if (messages.length === 0) { + const sessionContext = sessionManager.buildSessionContext(); + messages = toSimpleContextMessages( + Array.isArray(sessionContext.messages) ? sessionContext.messages : [], + ); + } if (messages.length === 0) { throw new Error("No active session context."); } @@ -304,11 +373,12 @@ export async function runBtwSideQuestion( const stream = streamSimple( model, { + systemPrompt: buildBtwSystemPrompt(), messages: [ ...messages, { role: "user", - content: [{ type: "text", text: params.question }], + content: [{ type: "text", text: buildBtwQuestionPrompt(params.question) }], timestamp: Date.now(), }, ], @@ -400,6 +470,16 @@ export async function runBtwSideQuestion( usage: finalMessage?.usage, } satisfies BtwCustomEntryData; + try { + appendBtwSideResult({ + sessionFile, + entry: customEntry, + }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + diag.warn(`btw side-result persistence skipped: sessionId=${sessionId} err=${message}`); + } + try { await appendBtwCustomEntry({ sessionFile, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 450f8b60d1d..cb00a5ff6ff 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -2377,10 +2377,8 @@ export async function runEmbeddedAttempt( `runId=${params.runId} sessionId=${params.sessionId}`, ); } - updateActiveEmbeddedRunSnapshot(params.sessionId, { - transcriptLeafId: - (sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null, - }); + const transcriptLeafId = + (sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null; try { // Idempotent cleanup for legacy sessions with persisted image payloads. @@ -2459,6 +2457,19 @@ export async function runEmbeddedAttempt( }); } + const btwSnapshotMessages = [ + ...activeSession.messages, + { + role: "user", + content: [{ type: "text", text: effectivePrompt }], + timestamp: Date.now(), + }, + ]; + updateActiveEmbeddedRunSnapshot(params.sessionId, { + transcriptLeafId, + messages: btwSnapshotMessages, + }); + // Only pass images option if there are actually images to pass // This avoids potential issues with models that don't expect the images parameter if (imageResult.images.length > 0) { diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts index 21a36a183d8..b7e67cd6083 100644 --- a/src/agents/pi-embedded-runner/runs.test.ts +++ b/src/agents/pi-embedded-runner/runs.test.ts @@ -151,9 +151,11 @@ describe("pi-embedded runner run registry", () => { setActiveEmbeddedRun("session-snapshot", handle); updateActiveEmbeddedRunSnapshot("session-snapshot", { transcriptLeafId: "assistant-1", + messages: [{ role: "user", content: [{ type: "text", text: "hello" }], timestamp: 1 }], }); expect(getActiveEmbeddedRunSnapshot("session-snapshot")).toEqual({ transcriptLeafId: "assistant-1", + messages: [{ role: "user", content: [{ type: "text", text: "hello" }], timestamp: 1 }], }); clearActiveEmbeddedRun("session-snapshot", handle); diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index ad853108dad..9ce76f9f5e0 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -14,6 +14,7 @@ type EmbeddedPiQueueHandle = { export type ActiveEmbeddedRunSnapshot = { transcriptLeafId: string | null; + messages?: unknown[]; }; type EmbeddedRunWaiter = { diff --git a/src/auto-reply/reply/commands-btw.test.ts b/src/auto-reply/reply/commands-btw.test.ts index 53e15667f91..ecc76020a5c 100644 --- a/src/auto-reply/reply/commands-btw.test.ts +++ b/src/auto-reply/reply/commands-btw.test.ts @@ -80,7 +80,7 @@ describe("handleBtwCommand", () => { ); expect(result).toEqual({ shouldContinue: false, - reply: { text: "snapshot answer" }, + reply: { text: "snapshot answer", btw: { question: "what changed?" } }, }); }); @@ -104,7 +104,7 @@ describe("handleBtwCommand", () => { ); expect(result).toEqual({ shouldContinue: false, - reply: { text: "nothing important" }, + reply: { text: "nothing important", btw: { question: "what changed?" } }, }); }); }); diff --git a/src/auto-reply/reply/commands-btw.ts b/src/auto-reply/reply/commands-btw.ts index 2d4f8bf6a31..957435ed2e7 100644 --- a/src/auto-reply/reply/commands-btw.ts +++ b/src/auto-reply/reply/commands-btw.ts @@ -61,7 +61,7 @@ export const handleBtwCommand: CommandHandler = async (params, allowTextCommands }); return { shouldContinue: false, - reply, + reply: reply ? { ...reply, btw: { question } } : reply, }; } catch (error) { const message = error instanceof Error ? error.message.trim() : ""; @@ -69,6 +69,8 @@ export const handleBtwCommand: CommandHandler = async (params, allowTextCommands shouldContinue: false, reply: { text: `⚠️ /btw failed${message ? `: ${message}` : "."}`, + btw: { question }, + isError: true, }, }; } diff --git a/src/auto-reply/reply/commands-types.ts b/src/auto-reply/reply/commands-types.ts index 79da67ca8d8..effb22ec6fa 100644 --- a/src/auto-reply/reply/commands-types.ts +++ b/src/auto-reply/reply/commands-types.ts @@ -1,4 +1,5 @@ import type { SkillCommandSpec } from "../../agents/skills.js"; +import type { BlockReplyChunking } from "../../agents/pi-embedded-block-chunker.js"; import type { ChannelId } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { SessionEntry, SessionScope } from "../../config/sessions.js"; @@ -50,12 +51,7 @@ export type HandleCommandsParams = { resolvedVerboseLevel: VerboseLevel; resolvedReasoningLevel: ReasoningLevel; resolvedElevatedLevel?: ElevatedLevel; - blockReplyChunking?: { - minChars: number; - maxChars: number; - breakPreference: "paragraph" | "newline" | "sentence"; - flushOnParagraph?: boolean; - }; + blockReplyChunking?: BlockReplyChunking; resolvedBlockStreamingBreak?: "text_end" | "message_end"; resolveDefaultThinkingLevel: () => Promise; provider: string; diff --git a/src/auto-reply/reply/get-reply-inline-actions.ts b/src/auto-reply/reply/get-reply-inline-actions.ts index 91bcd68eb56..f059b100e32 100644 --- a/src/auto-reply/reply/get-reply-inline-actions.ts +++ b/src/auto-reply/reply/get-reply-inline-actions.ts @@ -1,4 +1,5 @@ import { collectTextContentBlocks } from "../../agents/content-blocks.js"; +import type { BlockReplyChunking } from "../../agents/pi-embedded-block-chunker.js"; import { createOpenClawTools } from "../../agents/openclaw-tools.js"; import type { SkillCommandSpec } from "../../agents/skills.js"; import { applyOwnerOnlyToolPolicy } from "../../agents/tool-policy.js"; @@ -114,12 +115,7 @@ export async function handleInlineActions(params: { resolvedVerboseLevel: VerboseLevel | undefined; resolvedReasoningLevel: ReasoningLevel; resolvedElevatedLevel: ElevatedLevel; - blockReplyChunking?: { - minChars: number; - maxChars: number; - breakPreference: "paragraph" | "newline" | "sentence"; - flushOnParagraph?: boolean; - }; + blockReplyChunking?: BlockReplyChunking; resolvedBlockStreamingBreak?: "text_end" | "message_end"; resolveDefaultThinkingLevel: Awaited< ReturnType diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts index 5a20d4ba950..e46ae422a9f 100644 --- a/src/auto-reply/reply/reply-payloads.ts +++ b/src/auto-reply/reply/reply-payloads.ts @@ -10,6 +10,17 @@ import type { ReplyPayload } from "../types.js"; import { extractReplyToTag } from "./reply-tags.js"; import { createReplyToModeFilterForChannel } from "./reply-threading.js"; +export function formatBtwTextForExternalDelivery(payload: ReplyPayload): string | undefined { + const text = payload.text?.trim(); + if (!text) { + return payload.text; + } + if (!payload.btw?.question?.trim()) { + return payload.text; + } + return text.startsWith("BTW:") ? text : `BTW: ${text}`; +} + function resolveReplyThreadingForPayload(params: { payload: ReplyPayload; implicitReplyToId?: string; diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index b0a2d393738..a656c5a0084 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts @@ -294,6 +294,36 @@ describe("routeReply", () => { ); }); + it("prefixes BTW replies on routed sends", async () => { + mocks.sendMessageSlack.mockClear(); + await routeReply({ + payload: { text: "323", btw: { question: "what is 17 * 19?" } }, + channel: "slack", + to: "channel:C123", + cfg: {} as never, + }); + expect(mocks.sendMessageSlack).toHaveBeenCalledWith( + "channel:C123", + "BTW: 323", + expect.any(Object), + ); + }); + + it("prefixes BTW replies on routed discord sends", async () => { + mocks.sendMessageDiscord.mockClear(); + await routeReply({ + payload: { text: "323", btw: { question: "what is 17 * 19?" } }, + channel: "discord", + to: "channel:123456", + cfg: {} as never, + }); + expect(mocks.sendMessageDiscord).toHaveBeenCalledWith( + "channel:123456", + "BTW: 323", + expect.any(Object), + ); + }); + it("passes replyToId to Telegram sends", async () => { mocks.sendMessageTelegram.mockClear(); await routeReply({ diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index a6f863d7d18..a698b53cf9f 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -18,7 +18,10 @@ import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/m import type { OriginatingChannelType } from "../templating.js"; import type { ReplyPayload } from "../types.js"; import { normalizeReplyPayload } from "./normalize-reply.js"; -import { shouldSuppressReasoningPayload } from "./reply-payloads.js"; +import { + formatBtwTextForExternalDelivery, + shouldSuppressReasoningPayload, +} from "./reply-payloads.js"; let deliverRuntimePromise: Promise< typeof import("../../infra/outbound/deliver-runtime.js") @@ -102,24 +105,28 @@ export async function routeReply(params: RouteReplyParams): Promise entry.isFile() && isPrimarySessionTranscriptFileName(entry.name)) + .filter( + (entry) => + entry.isFile() && + (isPrimarySessionTranscriptFileName(entry.name) || + isSessionSideResultsArtifactName(entry.name)), + ) .map((entry) => path.resolve(path.join(sessionsDir, entry.name))) .filter((filePath) => !referencedTranscriptPaths.has(filePath)); if (orphanTranscriptPaths.length > 0) { diff --git a/src/commands/sessions-cleanup.ts b/src/commands/sessions-cleanup.ts index a0b1d072386..9f5b12f56b1 100644 --- a/src/commands/sessions-cleanup.ts +++ b/src/commands/sessions-cleanup.ts @@ -13,6 +13,7 @@ import { type SessionMaintenanceApplyReport, } from "../config/sessions.js"; import type { RuntimeEnv } from "../runtime.js"; +import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js"; import { isRich, theme } from "../terminal/theme.js"; import { resolveSessionStoreTargetsOrExit, @@ -147,6 +148,11 @@ function pruneMissingTranscriptEntries(params: { } const transcriptPath = resolveSessionFilePath(entry.sessionId, entry, sessionPathOpts); if (!fs.existsSync(transcriptPath)) { + try { + fs.rmSync(resolveSessionSideResultsPathFromTranscript(transcriptPath), { force: true }); + } catch { + // Best-effort cleanup for orphan BTW sidecars when the primary transcript is gone. + } delete params.store[key]; removed += 1; params.onPruned?.(key); diff --git a/src/config/sessions/artifacts.test.ts b/src/config/sessions/artifacts.test.ts index b8c438a9eca..679578563ab 100644 --- a/src/config/sessions/artifacts.test.ts +++ b/src/config/sessions/artifacts.test.ts @@ -3,6 +3,7 @@ import { formatSessionArchiveTimestamp, isPrimarySessionTranscriptFileName, isSessionArchiveArtifactName, + isSessionSideResultsArtifactName, parseSessionArchiveTimestamp, } from "./artifacts.js"; @@ -19,12 +20,21 @@ describe("session artifact helpers", () => { it("classifies primary transcript files", () => { expect(isPrimarySessionTranscriptFileName("abc.jsonl")).toBe(true); expect(isPrimarySessionTranscriptFileName("keep.deleted.keep.jsonl")).toBe(true); + expect(isPrimarySessionTranscriptFileName("abc.side-results.jsonl")).toBe(false); expect(isPrimarySessionTranscriptFileName("abc.jsonl.deleted.2026-01-01T00-00-00.000Z")).toBe( false, ); expect(isPrimarySessionTranscriptFileName("sessions.json")).toBe(false); }); + it("classifies BTW side-result artifacts separately from transcripts", () => { + expect(isSessionSideResultsArtifactName("abc.side-results.jsonl")).toBe(true); + expect( + isSessionSideResultsArtifactName("abc.side-results.jsonl.deleted.2026-01-01T00-00-00.000Z"), + ).toBe(true); + expect(isSessionSideResultsArtifactName("abc.jsonl")).toBe(false); + }); + it("formats and parses archive timestamps", () => { const now = Date.parse("2026-02-23T12:34:56.000Z"); const stamp = formatSessionArchiveTimestamp(now); diff --git a/src/config/sessions/artifacts.ts b/src/config/sessions/artifacts.ts index c851f7967fc..d561bbc0196 100644 --- a/src/config/sessions/artifacts.ts +++ b/src/config/sessions/artifacts.ts @@ -24,6 +24,10 @@ export function isSessionArchiveArtifactName(fileName: string): boolean { ); } +export function isSessionSideResultsArtifactName(fileName: string): boolean { + return fileName.endsWith(".side-results.jsonl") || fileName.includes(".side-results.jsonl."); +} + export function isPrimarySessionTranscriptFileName(fileName: string): boolean { if (fileName === "sessions.json") { return false; @@ -31,6 +35,9 @@ export function isPrimarySessionTranscriptFileName(fileName: string): boolean { if (!fileName.endsWith(".jsonl")) { return false; } + if (isSessionSideResultsArtifactName(fileName)) { + return false; + } return !isSessionArchiveArtifactName(fileName); } diff --git a/src/config/sessions/disk-budget.ts b/src/config/sessions/disk-budget.ts index 078acd904bf..4309ca6f7f3 100644 --- a/src/config/sessions/disk-budget.ts +++ b/src/config/sessions/disk-budget.ts @@ -1,6 +1,11 @@ import fs from "node:fs"; import path from "node:path"; -import { isPrimarySessionTranscriptFileName, isSessionArchiveArtifactName } from "./artifacts.js"; +import { resolveSessionSideResultsPathFromTranscript } from "../../sessions/side-results.js"; +import { + isPrimarySessionTranscriptFileName, + isSessionArchiveArtifactName, + isSessionSideResultsArtifactName, +} from "./artifacts.js"; import { resolveSessionFilePath } from "./paths.js"; import type { SessionEntry } from "./types.js"; @@ -123,6 +128,9 @@ function resolveReferencedSessionTranscriptPaths(params: { }); if (resolved) { referenced.add(canonicalizePathForComparison(resolved)); + referenced.add( + canonicalizePathForComparison(resolveSessionSideResultsPathFromTranscript(resolved)), + ); } } return referenced; @@ -255,7 +263,9 @@ export async function enforceSessionDiskBudget(params: { .filter( (file) => isSessionArchiveArtifactName(file.name) || - (isPrimarySessionTranscriptFileName(file.name) && !referencedPaths.has(file.canonicalPath)), + ((isPrimarySessionTranscriptFileName(file.name) || + isSessionSideResultsArtifactName(file.name)) && + !referencedPaths.has(file.canonicalPath)), ) .toSorted((a, b) => a.mtimeMs - b.mtimeMs); for (const file of removableFileQueue) { @@ -336,6 +346,18 @@ export async function enforceSessionDiskBudget(params: { total -= deletedBytes; freedBytes += deletedBytes; removedFiles += 1; + const sideResultsPath = resolveSessionSideResultsPathFromTranscript(transcriptPath); + const deletedSideResultsBytes = await removeFileForBudget({ + filePath: sideResultsPath, + dryRun, + fileSizesByPath, + simulatedRemovedPaths, + }); + if (deletedSideResultsBytes > 0) { + total -= deletedSideResultsBytes; + freedBytes += deletedSideResultsBytes; + removedFiles += 1; + } } } diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 3b506c052c0..b7a408e8812 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -8,6 +8,7 @@ import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; import type { MsgContext } from "../../auto-reply/templating.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; +import type { ReplyPayload } from "../../auto-reply/types.js"; import { createReplyPrefixOptions } from "../../channels/reply-prefix.js"; import { resolveSessionFilePath } from "../../config/sessions.js"; import { jsonUtf8Bytes } from "../../infra/json-utf8-bytes.js"; @@ -55,6 +56,7 @@ import { capArrayByJsonBytes, loadSessionEntry, readSessionMessages, + readSessionSideResults, resolveSessionModelRef, } from "../session-utils.js"; import { formatForLog } from "../ws-log.js"; @@ -130,6 +132,16 @@ type ChatSendOriginatingRoute = { explicitDeliverRoute: boolean; }; +type SideResultPayload = { + kind: "btw"; + runId: string; + sessionKey: string; + question: string; + text: string; + isError?: boolean; + ts: number; +}; + function resolveChatSendOriginatingRoute(params: { client?: { mode?: string | null; id?: string | null } | null; deliver?: boolean; @@ -900,6 +912,34 @@ function broadcastChatFinal(params: { params.context.agentRunSeq.delete(params.runId); } +function isBtwReplyPayload(payload: ReplyPayload | undefined): payload is ReplyPayload & { + btw: { question: string }; + text: string; +} { + return ( + typeof payload?.btw?.question === "string" && + payload.btw.question.trim().length > 0 && + typeof payload.text === "string" && + payload.text.trim().length > 0 + ); +} + +function broadcastSideResult(params: { + context: Pick; + payload: SideResultPayload; +}) { + const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.payload.runId); + params.context.broadcast("chat.side_result", { + ...params.payload, + seq, + }); + params.context.nodeSendToSession(params.payload.sessionKey, "chat.side_result", { + ...params.payload, + seq, + }); + params.context.agentRunSeq.delete(params.payload.runId); +} + function broadcastChatError(params: { context: Pick; runId: string; @@ -940,6 +980,10 @@ export const chatHandlers: GatewayRequestHandlers = { const sessionId = entry?.sessionId; const rawMessages = sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : []; + const sideResults = + sessionId && storePath + ? readSessionSideResults(sessionId, storePath, entry?.sessionFile) + : []; const hardMax = 1000; const defaultLimit = 200; const requested = typeof limit === "number" ? limit : defaultLimit; @@ -979,6 +1023,7 @@ export const chatHandlers: GatewayRequestHandlers = { sessionKey, sessionId, messages: bounded.messages, + sideResults, thinkingLevel, fastMode: entry?.fastMode, verboseLevel, @@ -1284,7 +1329,7 @@ export const chatHandlers: GatewayRequestHandlers = { agentId, channel: INTERNAL_MESSAGE_CHANNEL, }); - const finalReplyParts: string[] = []; + const finalReplies: ReplyPayload[] = []; const dispatcher = createReplyDispatcher({ ...prefixOptions, onError: (err) => { @@ -1294,11 +1339,7 @@ export const chatHandlers: GatewayRequestHandlers = { if (info.kind !== "final") { return; } - const text = payload.text?.trim() ?? ""; - if (!text) { - return; - } - finalReplyParts.push(text); + finalReplies.push(payload); }, }); @@ -1335,48 +1376,64 @@ export const chatHandlers: GatewayRequestHandlers = { }) .then(() => { if (!agentRunStarted) { - const combinedReply = finalReplyParts - .map((part) => part.trim()) - .filter(Boolean) - .join("\n\n") - .trim(); - let message: Record | undefined; - if (combinedReply) { - const { storePath: latestStorePath, entry: latestEntry } = - loadSessionEntry(sessionKey); - const sessionId = latestEntry?.sessionId ?? entry?.sessionId ?? clientRunId; - const appended = appendAssistantTranscriptMessage({ - message: combinedReply, - sessionId, - storePath: latestStorePath, - sessionFile: latestEntry?.sessionFile, - agentId, - createIfMissing: true, + const btwReply = finalReplies.length === 1 ? finalReplies[0] : undefined; + if (isBtwReplyPayload(btwReply)) { + broadcastSideResult({ + context, + payload: { + kind: "btw", + runId: clientRunId, + sessionKey: rawSessionKey, + question: btwReply.btw.question.trim(), + text: btwReply.text.trim(), + isError: btwReply.isError, + ts: Date.now(), + }, }); - if (appended.ok) { - message = appended.message; - } else { - context.logGateway.warn( - `webchat transcript append failed: ${appended.error ?? "unknown error"}`, - ); - const now = Date.now(); - message = { - role: "assistant", - content: [{ type: "text", text: combinedReply }], - timestamp: now, - // Keep this compatible with Pi stopReason enums even though this message isn't - // persisted to the transcript due to the append failure. - stopReason: "stop", - usage: { input: 0, output: 0, totalTokens: 0 }, - }; + } else { + const combinedReply = finalReplies + .map((part) => part.text?.trim() ?? "") + .filter(Boolean) + .join("\n\n") + .trim(); + let message: Record | undefined; + if (combinedReply) { + const { storePath: latestStorePath, entry: latestEntry } = + loadSessionEntry(sessionKey); + const sessionId = latestEntry?.sessionId ?? entry?.sessionId ?? clientRunId; + const appended = appendAssistantTranscriptMessage({ + message: combinedReply, + sessionId, + storePath: latestStorePath, + sessionFile: latestEntry?.sessionFile, + agentId, + createIfMissing: true, + }); + if (appended.ok) { + message = appended.message; + } else { + context.logGateway.warn( + `webchat transcript append failed: ${appended.error ?? "unknown error"}`, + ); + const now = Date.now(); + message = { + role: "assistant", + content: [{ type: "text", text: combinedReply }], + timestamp: now, + // Keep this compatible with Pi stopReason enums even though this message isn't + // persisted to the transcript due to the append failure. + stopReason: "stop", + usage: { input: 0, output: 0, totalTokens: 0 }, + }; + } } + broadcastChatFinal({ + context, + runId: clientRunId, + sessionKey: rawSessionKey, + message, + }); } - broadcastChatFinal({ - context, - runId: clientRunId, - sessionKey: rawSessionKey, - message, - }); } setGatewayDedupeEntry({ dedupe: context.dedupe, diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts index 9ecd16e35d3..666cbf47cc2 100644 --- a/src/gateway/server.chat.gateway-server-chat.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { describe, expect, test, vi } from "vitest"; import { WebSocket } from "ws"; import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js"; +import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js"; import { extractFirstTextBlock } from "../shared/chat-message-content.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; import { @@ -497,6 +498,121 @@ describe("gateway server chat", () => { }); }); + test("routes /btw replies through side-result events without transcript injection", async () => { + await withMainSessionStore(async () => { + const replyMock = vi.mocked(getReplyFromConfig); + replyMock.mockResolvedValueOnce({ + text: "323", + btw: { question: "what is 17 * 19?" }, + }); + const sideResultPromise = onceMessage( + ws, + (o) => + o.type === "event" && + o.event === "chat.side_result" && + o.payload?.kind === "btw" && + o.payload?.runId === "idem-btw-1", + 8000, + ); + + const res = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "/btw what is 17 * 19?", + idempotencyKey: "idem-btw-1", + }); + + expect(res.ok).toBe(true); + const sideResult = await sideResultPromise; + expect(sideResult.payload).toMatchObject({ + kind: "btw", + runId: "idem-btw-1", + sessionKey: "main", + question: "what is 17 * 19?", + text: "323", + }); + + const historyRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", { + sessionKey: "main", + }); + expect(historyRes.ok).toBe(true); + expect(historyRes.payload?.messages ?? []).toEqual([]); + }); + }); + + test("chat.history returns BTW side results separately from normal messages", async () => { + await withMainSessionStore(async (dir) => { + const transcriptPath = path.join(dir, "sess-main.jsonl"); + await fs.writeFile( + transcriptPath, + [ + JSON.stringify({ + type: "custom", + customType: "openclaw:btw", + data: { + timestamp: 123, + question: "what changed?", + answer: "nothing important", + }, + }), + ].join("\n"), + "utf-8", + ); + + const historyRes = await rpcReq<{ messages?: unknown[]; sideResults?: unknown[] }>( + ws, + "chat.history", + { + sessionKey: "main", + }, + ); + expect(historyRes.ok).toBe(true); + expect(historyRes.payload?.messages ?? []).toEqual([]); + expect(historyRes.payload?.sideResults ?? []).toEqual([ + { + kind: "btw", + question: "what changed?", + text: "nothing important", + ts: 123, + }, + ]); + }); + }); + + test("chat.history replays BTW side results from the sidecar file after transcript compaction", async () => { + await withMainSessionStore(async (dir) => { + const transcriptPath = path.join(dir, "sess-main.jsonl"); + await fs.writeFile(transcriptPath, JSON.stringify({ type: "session", version: 1 }) + "\n"); + await fs.writeFile( + resolveSessionSideResultsPathFromTranscript(transcriptPath), + JSON.stringify({ + kind: "btw", + question: "what changed?", + text: "still working", + ts: 456, + }) + "\n", + "utf-8", + ); + + const historyRes = await rpcReq<{ messages?: unknown[]; sideResults?: unknown[] }>( + ws, + "chat.history", + { + sessionKey: "main", + }, + ); + expect(historyRes.ok).toBe(true); + expect(historyRes.payload?.messages ?? []).toEqual([]); + expect(historyRes.payload?.sideResults ?? []).toEqual([ + { + kind: "btw", + question: "what changed?", + text: "still working", + ts: 456, + }, + ]); + }); + }); + test("chat.history hides assistant NO_REPLY-only entries and keeps mixed-content assistant entries", async () => { const historyMessages = await loadChatHistoryWithMessages(buildNoReplyHistoryFixture(true)); const roleAndText = historyMessages diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index 034020a61fe..bdf779c6929 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from "vitest"; import { WebSocket } from "ws"; import { DEFAULT_PROVIDER } from "../agents/defaults.js"; +import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js"; import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js"; import { startGatewayServerHarness, type GatewayServerHarness } from "./server.e2e-ws-harness.js"; import { createToolSummaryPreviewTranscriptLines } from "./session-preview.test-helpers.js"; @@ -752,6 +753,66 @@ describe("gateway server sessions", () => { ws.close(); }); + test("sessions.compact keeps BTW side results available via the sidecar file", async () => { + const { dir } = await createSessionStoreDir(); + const sessionId = "sess-compact-btw"; + const transcriptPath = path.join(dir, `${sessionId}.jsonl`); + await fs.writeFile( + transcriptPath, + [ + JSON.stringify({ type: "session", version: 1, id: sessionId }), + JSON.stringify({ message: { role: "user", content: "hello" } }), + JSON.stringify({ message: { role: "assistant", content: "hi" } }), + JSON.stringify({ + type: "custom", + customType: "openclaw:btw", + data: { timestamp: 123, question: "what changed?", answer: "nothing important" }, + }), + JSON.stringify({ message: { role: "user", content: "later" } }), + JSON.stringify({ message: { role: "assistant", content: "done" } }), + ].join("\n"), + "utf-8", + ); + await fs.writeFile( + resolveSessionSideResultsPathFromTranscript(transcriptPath), + JSON.stringify({ + kind: "btw", + question: "what changed?", + text: "nothing important", + ts: 123, + }) + "\n", + "utf-8", + ); + await writeSessionStore({ + entries: { + main: { sessionId, updatedAt: Date.now() }, + }, + }); + + const { ws } = await openClient(); + const compacted = await rpcReq<{ ok: true; compacted: boolean }>(ws, "sessions.compact", { + key: "main", + maxLines: 2, + }); + expect(compacted.ok).toBe(true); + expect(compacted.payload?.compacted).toBe(true); + + const history = await rpcReq<{ sideResults?: unknown[] }>(ws, "chat.history", { + sessionKey: "main", + }); + expect(history.ok).toBe(true); + expect(history.payload?.sideResults ?? []).toEqual([ + { + kind: "btw", + question: "what changed?", + text: "nothing important", + ts: 123, + }, + ]); + + ws.close(); + }); + test("sessions.delete rejects main and aborts active runs", async () => { const { dir } = await createSessionStoreDir(); await writeSingleLineSession(dir, "sess-main", "hello"); diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts index 09ab7e2cda2..1160c52ee1e 100644 --- a/src/gateway/session-utils.fs.test.ts +++ b/src/gateway/session-utils.fs.test.ts @@ -2,12 +2,14 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from "vitest"; +import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js"; import { createToolSummaryPreviewTranscriptLines } from "./session-preview.test-helpers.js"; import { archiveSessionTranscripts, readFirstUserMessageFromTranscript, readLastMessagePreviewFromTranscript, readSessionMessages, + readSessionSideResults, readSessionTitleFieldsFromTranscript, readSessionPreviewItemsFromTranscript, resolveSessionTranscriptCandidates, @@ -760,6 +762,32 @@ describe("archiveSessionTranscripts", () => { } }); + test("archives BTW side-result sidecars alongside transcripts", () => { + const sessionId = "sess-archive-side-results"; + const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); + const sidecarPath = resolveSessionSideResultsPathFromTranscript(transcriptPath); + fs.writeFileSync(transcriptPath, '{"type":"session"}\n', "utf-8"); + fs.writeFileSync( + sidecarPath, + `${JSON.stringify({ kind: "btw", question: "q", text: "a", ts: 123 })}\n`, + "utf-8", + ); + + const archived = archiveSessionTranscripts({ + sessionId, + storePath, + reason: "reset", + }); + + expect(archived).toHaveLength(2); + expect(archived.some((entry) => entry.includes(`${sessionId}.jsonl.reset.`))).toBe(true); + expect(archived.some((entry) => entry.includes(`${sessionId}.side-results.jsonl.reset.`))).toBe( + true, + ); + expect(fs.existsSync(transcriptPath)).toBe(false); + expect(fs.existsSync(sidecarPath)).toBe(false); + }); + test("returns empty array when no transcript files exist", () => { const archived = archiveSessionTranscripts({ sessionId: "nonexistent-session", @@ -787,3 +815,77 @@ describe("archiveSessionTranscripts", () => { expect(fs.existsSync(transcriptPath)).toBe(false); }); }); + +describe("readSessionSideResults", () => { + let tmpDir: string; + let storePath: string; + + registerTempSessionStore("openclaw-side-results-test-", (nextTmpDir, nextStorePath) => { + tmpDir = nextTmpDir; + storePath = nextStorePath; + }); + + test("reads BTW results from transcript custom entries", () => { + const sessionId = "sess-btw-transcript"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { + type: "custom", + customType: "openclaw:btw", + data: { + timestamp: 10, + question: "what changed?", + answer: "nothing", + }, + }, + ]); + + expect(readSessionSideResults(sessionId, storePath)).toEqual([ + { + kind: "btw", + question: "what changed?", + text: "nothing", + ts: 10, + }, + ]); + }); + + test("merges BTW sidecar records and dedupes transcript duplicates", () => { + const sessionId = "sess-btw-sidecar"; + const transcriptPath = writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { + type: "custom", + customType: "openclaw:btw", + data: { + timestamp: 10, + question: "what changed?", + answer: "nothing", + }, + }, + ]); + fs.writeFileSync( + resolveSessionSideResultsPathFromTranscript(transcriptPath), + [ + JSON.stringify({ kind: "btw", question: "what changed?", text: "nothing", ts: 10 }), + JSON.stringify({ kind: "btw", question: "what now?", text: "keep going", ts: 20 }), + ].join("\n"), + "utf-8", + ); + + expect(readSessionSideResults(sessionId, storePath)).toEqual([ + { + kind: "btw", + question: "what changed?", + text: "nothing", + ts: 10, + }, + { + kind: "btw", + question: "what now?", + text: "keep going", + ts: 20, + }, + ]); + }); +}); diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index 3712c8c8272..52eb2ff40c0 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -12,6 +12,10 @@ import { import { resolveRequiredHomeDir } from "../infra/home-dir.js"; import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js"; import { hasInterSessionUserProvenance } from "../sessions/input-provenance.js"; +import { + resolveSessionSideResultsPathFromTranscript, + type PersistedSessionSideResult, +} from "../sessions/side-results.js"; import { stripInlineDirectiveTagsForDisplay } from "../utils/directive-tags.js"; import { extractToolCallNames, hasToolCall } from "../utils/transcript-tools.js"; import { stripEnvelope } from "./chat-sanitize.js"; @@ -22,6 +26,16 @@ type SessionTitleFields = { lastMessagePreview: string | null; }; +const BTW_CUSTOM_TYPE = "openclaw:btw"; + +export type SessionSideResult = { + kind: "btw"; + question: string; + text: string; + isError?: boolean; + ts?: number; +}; + type SessionTitleFieldsCacheEntry = SessionTitleFields & { mtimeMs: number; size: number; @@ -118,6 +132,126 @@ export function readSessionMessages( return messages; } +export function readSessionSideResults( + sessionId: string, + storePath: string | undefined, + sessionFile?: string, +): SessionSideResult[] { + const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile); + const resultsByKey = new Map(); + + const pushResult = (result: SessionSideResult | null) => { + if (!result) { + return; + } + const dedupeKey = [ + result.kind, + result.ts ?? "", + result.question, + result.text, + result.isError ? "1" : "0", + ].join("\u0000"); + if (!resultsByKey.has(dedupeKey)) { + resultsByKey.set(dedupeKey, result); + } + }; + + for (const filePath of candidates.filter((p) => fs.existsSync(p))) { + const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/); + for (const line of lines) { + if (!line.trim()) { + continue; + } + try { + const parsed = JSON.parse(line) as { + type?: string; + customType?: string; + data?: { question?: unknown; answer?: unknown; timestamp?: unknown; isError?: unknown }; + }; + pushResult(parseTranscriptSideResult(parsed)); + } catch { + // ignore bad lines + } + } + } + + const sideResultPaths = new Set( + candidates.map((candidate) => resolveSessionSideResultsPathFromTranscript(candidate)), + ); + for (const filePath of sideResultPaths) { + if (!fs.existsSync(filePath)) { + continue; + } + const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/); + for (const line of lines) { + if (!line.trim()) { + continue; + } + try { + const parsed = JSON.parse(line) as PersistedSessionSideResult; + pushResult(parsePersistedSideResult(parsed)); + } catch { + // ignore bad lines + } + } + } + + return [...resultsByKey.values()].toSorted((left, right) => { + const leftTs = left.ts ?? Number.MAX_SAFE_INTEGER; + const rightTs = right.ts ?? Number.MAX_SAFE_INTEGER; + if (leftTs !== rightTs) { + return leftTs - rightTs; + } + return left.question.localeCompare(right.question); + }); +} + +function parseTranscriptSideResult(parsed: { + type?: string; + customType?: string; + data?: { question?: unknown; answer?: unknown; timestamp?: unknown; isError?: unknown }; +}): SessionSideResult | null { + if (parsed.type !== "custom" || parsed.customType !== BTW_CUSTOM_TYPE) { + return null; + } + const question = typeof parsed.data?.question === "string" ? parsed.data.question.trim() : ""; + const text = typeof parsed.data?.answer === "string" ? parsed.data.answer.trim() : ""; + if (!question || !text) { + return null; + } + const timestamp = + typeof parsed.data?.timestamp === "number" && Number.isFinite(parsed.data.timestamp) + ? parsed.data.timestamp + : undefined; + const isError = parsed.data?.isError === true ? true : undefined; + return { + kind: "btw", + question, + text, + isError, + ts: timestamp, + }; +} + +function parsePersistedSideResult(parsed: PersistedSessionSideResult): SessionSideResult | null { + if (parsed.kind !== "btw") { + return null; + } + const question = typeof parsed.question === "string" ? parsed.question.trim() : ""; + const text = typeof parsed.text === "string" ? parsed.text.trim() : ""; + if (!question || !text) { + return null; + } + const timestamp = Number.isFinite(parsed.ts) ? parsed.ts : undefined; + return { + kind: "btw", + question, + text, + isError: parsed.isError === true ? true : undefined, + ts: timestamp, + }; +} + export function resolveSessionTranscriptCandidates( sessionId: string, storePath: string | undefined, @@ -202,12 +336,17 @@ export function archiveSessionTranscripts(opts: { opts.restrictToStoreDir && opts.storePath ? canonicalizePathForComparison(path.dirname(opts.storePath)) : null; - for (const candidate of resolveSessionTranscriptCandidates( + const transcriptCandidates = resolveSessionTranscriptCandidates( opts.sessionId, opts.storePath, opts.sessionFile, opts.agentId, - )) { + ); + const archiveCandidates = new Set(transcriptCandidates); + for (const candidate of transcriptCandidates) { + archiveCandidates.add(resolveSessionSideResultsPathFromTranscript(candidate)); + } + for (const candidate of archiveCandidates) { const candidatePath = canonicalizePathForComparison(candidate); if (storeDir) { const relative = path.relative(storeDir, candidatePath); diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 00a2cb7747e..25957ac17ca 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -57,6 +57,7 @@ export { readSessionTitleFieldsFromTranscript, readSessionPreviewItemsFromTranscript, readSessionMessages, + readSessionSideResults, resolveSessionTranscriptCandidates, } from "./session-utils.fs.js"; export type { diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 119e7b3c5d7..d30415d0bcb 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -1,6 +1,8 @@ import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import type { ChannelOutboundAdapter } from "../../channels/plugins/types.adapters.js"; +import { signalOutbound } from "../../channels/plugins/outbound/signal.js"; +import { telegramOutbound } from "../../channels/plugins/outbound/telegram.js"; +import { whatsappOutbound } from "../../channels/plugins/outbound/whatsapp.js"; import type { OpenClawConfig } from "../../config/config.js"; import { STATE_DIR } from "../../config/paths.js"; import { setActivePluginRegistry } from "../../plugins/runtime.js"; @@ -8,15 +10,64 @@ import { markdownToSignalTextChunks } from "../../signal/format.js"; import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js"; import { withEnvAsync } from "../../test-utils/env.js"; import { createIMessageTestPlugin } from "../../test-utils/imessage-test-plugin.js"; +import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js"; import { resolvePreferredOpenClawTmpDir } from "../tmp-openclaw-dir.js"; -import { - clearDeliverTestRegistry, - hookMocks, - resetDeliverTestState, - resetDeliverTestMocks, - runChunkedWhatsAppDelivery as runChunkedWhatsAppDeliveryHelper, - whatsappChunkConfig, -} from "./deliver.test-helpers.js"; + +const mocks = vi.hoisted(() => ({ + appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })), +})); +const hookMocks = vi.hoisted(() => ({ + runner: { + hasHooks: vi.fn(() => false), + runMessageSent: vi.fn(async () => {}), + }, +})); +const internalHookMocks = vi.hoisted(() => ({ + createInternalHookEvent: vi.fn(), + triggerInternalHook: vi.fn(async () => {}), +})); +const queueMocks = vi.hoisted(() => ({ + enqueueDelivery: vi.fn(async () => "mock-queue-id"), + ackDelivery: vi.fn(async () => {}), + failDelivery: vi.fn(async () => {}), +})); +const logMocks = vi.hoisted(() => ({ + warn: vi.fn(), +})); + +vi.mock("../../config/sessions.js", async () => { + const actual = await vi.importActual( + "../../config/sessions.js", + ); + return { + ...actual, + appendAssistantMessageToSessionTranscript: mocks.appendAssistantMessageToSessionTranscript, + }; +}); +vi.mock("../../plugins/hook-runner-global.js", () => ({ + getGlobalHookRunner: () => hookMocks.runner, +})); +vi.mock("../../hooks/internal-hooks.js", () => ({ + createInternalHookEvent: internalHookMocks.createInternalHookEvent, + triggerInternalHook: internalHookMocks.triggerInternalHook, +})); +vi.mock("./delivery-queue.js", () => ({ + enqueueDelivery: queueMocks.enqueueDelivery, + ackDelivery: queueMocks.ackDelivery, + failDelivery: queueMocks.failDelivery, +})); +vi.mock("../../logging/subsystem.js", () => ({ + createSubsystemLogger: () => { + const makeLogger = () => ({ + warn: logMocks.warn, + info: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + child: vi.fn(() => makeLogger()), + }); + return makeLogger(); + }, +})); const { deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js"); @@ -24,34 +75,14 @@ const telegramChunkConfig: OpenClawConfig = { channels: { telegram: { botToken: "tok-1", textChunkLimit: 2 } }, }; +const whatsappChunkConfig: OpenClawConfig = { + channels: { whatsapp: { textChunkLimit: 4000 } }, +}; + type DeliverOutboundArgs = Parameters[0]; type DeliverOutboundPayload = DeliverOutboundArgs["payloads"][number]; type DeliverSession = DeliverOutboundArgs["session"]; -function setMatrixTextOnlyPlugin(sendText: NonNullable) { - setActivePluginRegistry( - createTestRegistry([ - { - pluginId: "matrix", - source: "test", - plugin: createOutboundTestPlugin({ - id: "matrix", - outbound: { deliveryMode: "direct", sendText }, - }), - }, - ]), - ); -} - -async function deliverMatrixPayloads(payloads: DeliverOutboundPayload[]) { - return deliverOutboundPayloads({ - cfg: {}, - channel: "matrix", - to: "!room:1", - payloads, - }); -} - async function deliverWhatsAppPayload(params: { sendWhatsApp: NonNullable< NonNullable[0]["deps"]>["sendWhatsApp"] @@ -86,14 +117,96 @@ async function deliverTelegramPayload(params: { }); } +async function runChunkedWhatsAppDelivery(params?: { + mirror?: Parameters[0]["mirror"]; +}) { + const sendWhatsApp = vi + .fn() + .mockResolvedValueOnce({ messageId: "w1", toJid: "jid" }) + .mockResolvedValueOnce({ messageId: "w2", toJid: "jid" }); + const cfg: OpenClawConfig = { + channels: { whatsapp: { textChunkLimit: 2 } }, + }; + const results = await deliverOutboundPayloads({ + cfg, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "abcd" }], + deps: { sendWhatsApp }, + ...(params?.mirror ? { mirror: params.mirror } : {}), + }); + return { sendWhatsApp, results }; +} + +async function deliverSingleWhatsAppForHookTest(params?: { sessionKey?: string }) { + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + await deliverOutboundPayloads({ + cfg: whatsappChunkConfig, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + deps: { sendWhatsApp }, + ...(params?.sessionKey ? { session: { key: params.sessionKey } } : {}), + }); +} + +async function runBestEffortPartialFailureDelivery() { + const sendWhatsApp = vi + .fn() + .mockRejectedValueOnce(new Error("fail")) + .mockResolvedValueOnce({ messageId: "w2", toJid: "jid" }); + const onError = vi.fn(); + const cfg: OpenClawConfig = {}; + const results = await deliverOutboundPayloads({ + cfg, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "a" }, { text: "b" }], + deps: { sendWhatsApp }, + bestEffort: true, + onError, + }); + return { sendWhatsApp, onError, results }; +} + +function expectSuccessfulWhatsAppInternalHookPayload( + expected: Partial<{ + content: string; + messageId: string; + isGroup: boolean; + groupId: string; + }>, +) { + return expect.objectContaining({ + to: "+1555", + success: true, + channelId: "whatsapp", + conversationId: "+1555", + ...expected, + }); +} + describe("deliverOutboundPayloads", () => { beforeEach(() => { - resetDeliverTestState(); - resetDeliverTestMocks(); + setActivePluginRegistry(defaultRegistry); + hookMocks.runner.hasHooks.mockClear(); + hookMocks.runner.hasHooks.mockReturnValue(false); + hookMocks.runner.runMessageSent.mockClear(); + hookMocks.runner.runMessageSent.mockResolvedValue(undefined); + internalHookMocks.createInternalHookEvent.mockClear(); + internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload); + internalHookMocks.triggerInternalHook.mockClear(); + queueMocks.enqueueDelivery.mockClear(); + queueMocks.enqueueDelivery.mockResolvedValue("mock-queue-id"); + queueMocks.ackDelivery.mockClear(); + queueMocks.ackDelivery.mockResolvedValue(undefined); + queueMocks.failDelivery.mockClear(); + queueMocks.failDelivery.mockResolvedValue(undefined); + logMocks.warn.mockClear(); }); afterEach(() => { - clearDeliverTestRegistry(); + setActivePluginRegistry(emptyRegistry); }); it("chunks telegram markdown and passes through accountId", async () => { const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" }); @@ -175,6 +288,24 @@ describe("deliverOutboundPayloads", () => { ); }); + it("prefixes BTW replies for telegram delivery", async () => { + const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" }); + + await deliverTelegramPayload({ + sendTelegram, + cfg: { + channels: { telegram: { botToken: "tok-1", textChunkLimit: 100 } }, + }, + payload: { text: "323", btw: { question: "what is 17 * 19?" } }, + }); + + expect(sendTelegram).toHaveBeenCalledWith( + "123", + "BTW: 323", + expect.objectContaining({ verbose: false, textMode: "html" }), + ); + }); + it("preserves HTML text for telegram sendPayload channelData path", async () => { const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" }); @@ -416,9 +547,7 @@ describe("deliverOutboundPayloads", () => { }); it("chunks WhatsApp text and returns all results", async () => { - const { sendWhatsApp, results } = await runChunkedWhatsAppDeliveryHelper({ - deliverOutboundPayloads, - }); + const { sendWhatsApp, results } = await runChunkedWhatsAppDelivery(); expect(sendWhatsApp).toHaveBeenCalledTimes(2); expect(results.map((r) => r.messageId)).toEqual(["w1", "w2"]); @@ -614,6 +743,222 @@ describe("deliverOutboundPayloads", () => { ]); }); + it("prefixes BTW replies for whatsapp delivery", async () => { + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + + await deliverWhatsAppPayload({ + sendWhatsApp, + payload: { text: "323", btw: { question: "what is 17 * 19?" } }, + }); + + expect(sendWhatsApp).toHaveBeenCalledWith("+1555", "BTW: 323", expect.any(Object)); + }); + + it("continues on errors when bestEffort is enabled", async () => { + const { sendWhatsApp, onError, results } = await runBestEffortPartialFailureDelivery(); + + expect(sendWhatsApp).toHaveBeenCalledTimes(2); + expect(onError).toHaveBeenCalledTimes(1); + expect(results).toEqual([{ channel: "whatsapp", messageId: "w2", toJid: "jid" }]); + }); + + it("emits internal message:sent hook with success=true for chunked payload delivery", async () => { + const { sendWhatsApp } = await runChunkedWhatsAppDelivery({ + mirror: { + sessionKey: "agent:main:main", + isGroup: true, + groupId: "whatsapp:group:123", + }, + }); + expect(sendWhatsApp).toHaveBeenCalledTimes(2); + + expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1); + expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith( + "message", + "sent", + "agent:main:main", + expectSuccessfulWhatsAppInternalHookPayload({ + content: "abcd", + messageId: "w2", + isGroup: true, + groupId: "whatsapp:group:123", + }), + ); + expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1); + }); + + it("does not emit internal message:sent hook when neither mirror nor sessionKey is provided", async () => { + await deliverSingleWhatsAppForHookTest(); + + expect(internalHookMocks.createInternalHookEvent).not.toHaveBeenCalled(); + expect(internalHookMocks.triggerInternalHook).not.toHaveBeenCalled(); + }); + + it("emits internal message:sent hook when sessionKey is provided without mirror", async () => { + await deliverSingleWhatsAppForHookTest({ sessionKey: "agent:main:main" }); + + expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1); + expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith( + "message", + "sent", + "agent:main:main", + expectSuccessfulWhatsAppInternalHookPayload({ content: "hello", messageId: "w1" }), + ); + expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1); + }); + + it("warns when session.agentId is set without a session key", async () => { + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + hookMocks.runner.hasHooks.mockReturnValue(true); + + await deliverOutboundPayloads({ + cfg: whatsappChunkConfig, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + deps: { sendWhatsApp }, + session: { agentId: "agent-main" }, + }); + + expect(logMocks.warn).toHaveBeenCalledWith( + "deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped", + expect.objectContaining({ channel: "whatsapp", to: "+1555", agentId: "agent-main" }), + ); + }); + + it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => { + const { onError } = await runBestEffortPartialFailureDelivery(); + + // onError was called for the first payload's failure. + expect(onError).toHaveBeenCalledTimes(1); + + // Queue entry should NOT be acked — failDelivery should be called instead. + expect(queueMocks.ackDelivery).not.toHaveBeenCalled(); + expect(queueMocks.failDelivery).toHaveBeenCalledWith( + "mock-queue-id", + "partial delivery failure (bestEffort)", + ); + }); + + it("acks the queue entry when delivery is aborted", async () => { + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + const abortController = new AbortController(); + abortController.abort(); + const cfg: OpenClawConfig = {}; + + await expect( + deliverOutboundPayloads({ + cfg, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "a" }], + deps: { sendWhatsApp }, + abortSignal: abortController.signal, + }), + ).rejects.toThrow("Operation aborted"); + + expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id"); + expect(queueMocks.failDelivery).not.toHaveBeenCalled(); + expect(sendWhatsApp).not.toHaveBeenCalled(); + }); + + it("passes normalized payload to onError", async () => { + const sendWhatsApp = vi.fn().mockRejectedValue(new Error("boom")); + const onError = vi.fn(); + const cfg: OpenClawConfig = {}; + + await deliverOutboundPayloads({ + cfg, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hi", mediaUrl: "https://x.test/a.jpg" }], + deps: { sendWhatsApp }, + bestEffort: true, + onError, + }); + + expect(onError).toHaveBeenCalledTimes(1); + expect(onError).toHaveBeenCalledWith( + expect.any(Error), + expect.objectContaining({ text: "hi", mediaUrls: ["https://x.test/a.jpg"] }), + ); + }); + + it("mirrors delivered output when mirror options are provided", async () => { + const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" }); + mocks.appendAssistantMessageToSessionTranscript.mockClear(); + + await deliverOutboundPayloads({ + cfg: telegramChunkConfig, + channel: "telegram", + to: "123", + payloads: [{ text: "caption", mediaUrl: "https://example.com/files/report.pdf?sig=1" }], + deps: { sendTelegram }, + mirror: { + sessionKey: "agent:main:main", + text: "caption", + mediaUrls: ["https://example.com/files/report.pdf?sig=1"], + idempotencyKey: "idem-deliver-1", + }, + }); + + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith( + expect.objectContaining({ + text: "report.pdf", + idempotencyKey: "idem-deliver-1", + }), + ); + }); + + it("emits message_sent success for text-only deliveries", async () => { + hookMocks.runner.hasHooks.mockReturnValue(true); + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + deps: { sendWhatsApp }, + }); + + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ to: "+1555", content: "hello", success: true }), + expect.objectContaining({ channelId: "whatsapp" }), + ); + }); + + it("emits message_sent success for sendPayload deliveries", async () => { + hookMocks.runner.hasHooks.mockReturnValue(true); + const sendPayload = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-1" }); + const sendText = vi.fn(); + const sendMedia = vi.fn(); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia }, + }), + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads: [{ text: "payload text", channelData: { mode: "custom" } }], + }); + + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ to: "!room:1", content: "payload text", success: true }), + expect.objectContaining({ channelId: "matrix" }), + ); + }); + it("preserves channelData-only payloads with empty text for non-WhatsApp sendPayload channels", async () => { const sendPayload = vi.fn().mockResolvedValue({ channel: "line", messageId: "ln-1" }); const sendText = vi.fn(); @@ -649,11 +994,25 @@ describe("deliverOutboundPayloads", () => { it("falls back to sendText when plugin outbound omits sendMedia", async () => { const sendText = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-1" }); - setMatrixTextOnlyPlugin(sendText); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendText }, + }), + }, + ]), + ); - const results = await deliverMatrixPayloads([ - { text: "caption", mediaUrl: "https://example.com/file.png" }, - ]); + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads: [{ text: "caption", mediaUrl: "https://example.com/file.png" }], + }); expect(sendText).toHaveBeenCalledTimes(1); expect(sendText).toHaveBeenCalledWith( @@ -661,19 +1020,42 @@ describe("deliverOutboundPayloads", () => { text: "caption", }), ); + expect(logMocks.warn).toHaveBeenCalledWith( + "Plugin outbound adapter does not implement sendMedia; media URLs will be dropped and text fallback will be used", + expect.objectContaining({ + channel: "matrix", + mediaCount: 1, + }), + ); expect(results).toEqual([{ channel: "matrix", messageId: "mx-1" }]); }); it("falls back to one sendText call for multi-media payloads when sendMedia is omitted", async () => { const sendText = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-2" }); - setMatrixTextOnlyPlugin(sendText); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendText }, + }), + }, + ]), + ); - const results = await deliverMatrixPayloads([ - { - text: "caption", - mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"], - }, - ]); + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads: [ + { + text: "caption", + mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"], + }, + ], + }); expect(sendText).toHaveBeenCalledTimes(1); expect(sendText).toHaveBeenCalledWith( @@ -681,20 +1063,109 @@ describe("deliverOutboundPayloads", () => { text: "caption", }), ); + expect(logMocks.warn).toHaveBeenCalledWith( + "Plugin outbound adapter does not implement sendMedia; media URLs will be dropped and text fallback will be used", + expect.objectContaining({ + channel: "matrix", + mediaCount: 2, + }), + ); expect(results).toEqual([{ channel: "matrix", messageId: "mx-2" }]); }); it("fails media-only payloads when plugin outbound omits sendMedia", async () => { hookMocks.runner.hasHooks.mockReturnValue(true); const sendText = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-3" }); - setMatrixTextOnlyPlugin(sendText); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { deliveryMode: "direct", sendText }, + }), + }, + ]), + ); await expect( - deliverMatrixPayloads([{ text: " ", mediaUrl: "https://example.com/file.png" }]), + deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:1", + payloads: [{ text: " ", mediaUrl: "https://example.com/file.png" }], + }), ).rejects.toThrow( "Plugin outbound adapter does not implement sendMedia and no text fallback is available for media payload", ); expect(sendText).not.toHaveBeenCalled(); + expect(logMocks.warn).toHaveBeenCalledWith( + "Plugin outbound adapter does not implement sendMedia; media URLs will be dropped and text fallback will be used", + expect.objectContaining({ + channel: "matrix", + mediaCount: 1, + }), + ); + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ + to: "!room:1", + content: "", + success: false, + error: + "Plugin outbound adapter does not implement sendMedia and no text fallback is available for media payload", + }), + expect.objectContaining({ channelId: "matrix" }), + ); + }); + + it("emits message_sent failure when delivery errors", async () => { + hookMocks.runner.hasHooks.mockReturnValue(true); + const sendWhatsApp = vi.fn().mockRejectedValue(new Error("downstream failed")); + + await expect( + deliverOutboundPayloads({ + cfg: {}, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hi" }], + deps: { sendWhatsApp }, + }), + ).rejects.toThrow("downstream failed"); + + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ + to: "+1555", + content: "hi", + success: false, + error: "downstream failed", + }), + expect.objectContaining({ channelId: "whatsapp" }), + ); }); }); + +const emptyRegistry = createTestRegistry([]); +const defaultRegistry = createTestRegistry([ + { + pluginId: "telegram", + plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }), + source: "test", + }, + { + pluginId: "signal", + plugin: createOutboundTestPlugin({ id: "signal", outbound: signalOutbound }), + source: "test", + }, + { + pluginId: "whatsapp", + plugin: createOutboundTestPlugin({ id: "whatsapp", outbound: whatsappOutbound }), + source: "test", + }, + { + pluginId: "imessage", + plugin: createIMessageTestPlugin(), + source: "test", + }, +]); diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts index c20632099bd..5e5ea6f0ea8 100644 --- a/src/infra/outbound/outbound.test.ts +++ b/src/infra/outbound/outbound.test.ts @@ -1,3 +1,1309 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ReplyPayload } from "../../auto-reply/types.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import { typedCases } from "../../test-utils/typed-cases.js"; +import { + ackDelivery, + computeBackoffMs, + type DeliverFn, + enqueueDelivery, + failDelivery, + isEntryEligibleForRecoveryRetry, + isPermanentDeliveryError, + loadPendingDeliveries, + MAX_RETRIES, + moveToFailed, + recoverPendingDeliveries, +} from "./delivery-queue.js"; +import { DirectoryCache } from "./directory-cache.js"; +import { buildOutboundResultEnvelope } from "./envelope.js"; +import type { OutboundDeliveryJson } from "./format.js"; +import { + buildOutboundDeliveryJson, + formatGatewaySummary, + formatOutboundDeliverySummary, +} from "./format.js"; +import { + applyCrossContextDecoration, + buildCrossContextDecoration, + enforceCrossContextPolicy, +} from "./outbound-policy.js"; +import { resolveOutboundSessionRoute } from "./outbound-session.js"; +import { + formatOutboundPayloadLog, + normalizeOutboundPayloads, + normalizeOutboundPayloadsForJson, +} from "./payloads.js"; import { runResolveOutboundTargetCoreTests } from "./targets.shared-test.js"; +describe("delivery-queue", () => { + let tmpDir: string; + let fixtureRoot = ""; + let fixtureCount = 0; + + beforeAll(() => { + fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-suite-")); + }); + + beforeEach(() => { + tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`); + fs.mkdirSync(tmpDir, { recursive: true }); + }); + + afterAll(() => { + if (!fixtureRoot) { + return; + } + fs.rmSync(fixtureRoot, { recursive: true, force: true }); + fixtureRoot = ""; + }); + + describe("enqueue + ack lifecycle", () => { + it("creates and removes a queue entry", async () => { + const id = await enqueueDelivery( + { + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "hello", + mediaUrls: ["https://example.com/file.png"], + }, + }, + tmpDir, + ); + + // Entry file exists after enqueue. + const queueDir = path.join(tmpDir, "delivery-queue"); + const files = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json")); + expect(files).toHaveLength(1); + expect(files[0]).toBe(`${id}.json`); + + // Entry contents are correct. + const entry = JSON.parse(fs.readFileSync(path.join(queueDir, files[0]), "utf-8")); + expect(entry).toMatchObject({ + id, + channel: "whatsapp", + to: "+1555", + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "hello", + mediaUrls: ["https://example.com/file.png"], + }, + retryCount: 0, + }); + expect(entry.payloads).toEqual([{ text: "hello" }]); + + // Ack removes the file. + await ackDelivery(id, tmpDir); + const remaining = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json")); + expect(remaining).toHaveLength(0); + }); + + it("ack is idempotent (no error on missing file)", async () => { + await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined(); + }); + + it("ack cleans up leftover .delivered marker when .json is already gone", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "stale-marker" }] }, + tmpDir, + ); + const queueDir = path.join(tmpDir, "delivery-queue"); + + fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`)); + await expect(ackDelivery(id, tmpDir)).resolves.toBeUndefined(); + + expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); + }); + + it("ack removes .delivered marker so recovery does not replay", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] }, + tmpDir, + ); + const queueDir = path.join(tmpDir, "delivery-queue"); + + await ackDelivery(id, tmpDir); + + // Neither .json nor .delivered should remain. + expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); + expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); + }); + + it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => { + const id = await enqueueDelivery( + { channel: "telegram", to: "99", payloads: [{ text: "stale" }] }, + tmpDir, + ); + const queueDir = path.join(tmpDir, "delivery-queue"); + + // Simulate crash between ack phase 1 (rename) and phase 2 (unlink): + // rename .json → .delivered, then pretend the process died. + fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`)); + + const entries = await loadPendingDeliveries(tmpDir); + + // The .delivered entry must NOT appear as pending. + expect(entries).toHaveLength(0); + // And the marker file should have been cleaned up. + expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); + }); + }); + + describe("failDelivery", () => { + it("increments retryCount, records attempt time, and sets lastError", async () => { + const id = await enqueueDelivery( + { + channel: "telegram", + to: "123", + payloads: [{ text: "test" }], + }, + tmpDir, + ); + + await failDelivery(id, "connection refused", tmpDir); + + const queueDir = path.join(tmpDir, "delivery-queue"); + const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8")); + expect(entry.retryCount).toBe(1); + expect(typeof entry.lastAttemptAt).toBe("number"); + expect(entry.lastAttemptAt).toBeGreaterThan(0); + expect(entry.lastError).toBe("connection refused"); + }); + }); + + describe("moveToFailed", () => { + it("moves entry to failed/ subdirectory", async () => { + const id = await enqueueDelivery( + { + channel: "slack", + to: "#general", + payloads: [{ text: "hi" }], + }, + tmpDir, + ); + + await moveToFailed(id, tmpDir); + + const queueDir = path.join(tmpDir, "delivery-queue"); + const failedDir = path.join(queueDir, "failed"); + expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); + expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); + }); + }); + + describe("isPermanentDeliveryError", () => { + it.each([ + "No conversation reference found for user:abc", + "Telegram send failed: chat not found (chat_id=user:123)", + "user not found", + "Bot was blocked by the user", + "Forbidden: bot was kicked from the group chat", + "chat_id is empty", + "Outbound not configured for channel: msteams", + ])("returns true for permanent error: %s", (msg) => { + expect(isPermanentDeliveryError(msg)).toBe(true); + }); + + it.each([ + "network down", + "ETIMEDOUT", + "socket hang up", + "rate limited", + "500 Internal Server Error", + ])("returns false for transient error: %s", (msg) => { + expect(isPermanentDeliveryError(msg)).toBe(false); + }); + }); + + describe("loadPendingDeliveries", () => { + it("returns empty array when queue directory does not exist", async () => { + const nonexistent = path.join(tmpDir, "no-such-dir"); + const entries = await loadPendingDeliveries(nonexistent); + expect(entries).toEqual([]); + }); + + it("loads multiple entries", async () => { + await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); + await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); + + const entries = await loadPendingDeliveries(tmpDir); + expect(entries).toHaveLength(2); + }); + + it("backfills lastAttemptAt for legacy retry entries during load", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] }, + tmpDir, + ); + const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); + const legacyEntry = JSON.parse(fs.readFileSync(filePath, "utf-8")); + legacyEntry.retryCount = 2; + delete legacyEntry.lastAttemptAt; + fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8"); + + const entries = await loadPendingDeliveries(tmpDir); + expect(entries).toHaveLength(1); + expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt); + + const persisted = JSON.parse(fs.readFileSync(filePath, "utf-8")); + expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt); + }); + }); + + describe("computeBackoffMs", () => { + it("returns scheduled backoff values and clamps at max retry", () => { + const cases = [ + { retryCount: 0, expected: 0 }, + { retryCount: 1, expected: 5_000 }, + { retryCount: 2, expected: 25_000 }, + { retryCount: 3, expected: 120_000 }, + { retryCount: 4, expected: 600_000 }, + // Beyond defined schedule -- clamps to last value. + { retryCount: 5, expected: 600_000 }, + ] as const; + + for (const testCase of cases) { + expect(computeBackoffMs(testCase.retryCount), String(testCase.retryCount)).toBe( + testCase.expected, + ); + } + }); + }); + + describe("isEntryEligibleForRecoveryRetry", () => { + it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => { + const now = Date.now(); + const result = isEntryEligibleForRecoveryRetry( + { + id: "entry-1", + channel: "whatsapp", + to: "+1", + payloads: [{ text: "a" }], + enqueuedAt: now, + retryCount: 0, + }, + now, + ); + expect(result).toEqual({ eligible: true }); + }); + + it("defers retry entries until backoff window elapses", () => { + const now = Date.now(); + const result = isEntryEligibleForRecoveryRetry( + { + id: "entry-2", + channel: "whatsapp", + to: "+1", + payloads: [{ text: "a" }], + enqueuedAt: now - 30_000, + retryCount: 3, + lastAttemptAt: now, + }, + now, + ); + expect(result.eligible).toBe(false); + if (result.eligible) { + throw new Error("Expected ineligible retry entry"); + } + expect(result.remainingBackoffMs).toBeGreaterThan(0); + }); + }); + + describe("recoverPendingDeliveries", () => { + const baseCfg = {}; + const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }); + const enqueueCrashRecoveryEntries = async () => { + await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); + await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); + }; + const setEntryState = ( + id: string, + state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number }, + ) => { + const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); + const entry = JSON.parse(fs.readFileSync(filePath, "utf-8")); + entry.retryCount = state.retryCount; + if (state.lastAttemptAt === undefined) { + delete entry.lastAttemptAt; + } else { + entry.lastAttemptAt = state.lastAttemptAt; + } + if (state.enqueuedAt !== undefined) { + entry.enqueuedAt = state.enqueuedAt; + } + fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8"); + }; + const runRecovery = async ({ + deliver, + log = createLog(), + maxRecoveryMs, + }: { + deliver: ReturnType; + log?: ReturnType; + maxRecoveryMs?: number; + }) => { + const result = await recoverPendingDeliveries({ + deliver: deliver as DeliverFn, + log, + cfg: baseCfg, + stateDir: tmpDir, + ...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }), + }); + return { result, log }; + }; + + it("recovers entries from a simulated crash", async () => { + // Manually create queue entries as if gateway crashed before delivery. + await enqueueCrashRecoveryEntries(); + const deliver = vi.fn().mockResolvedValue([]); + const { result } = await runRecovery({ deliver }); + + expect(deliver).toHaveBeenCalledTimes(2); + expect(result.recovered).toBe(2); + expect(result.failed).toBe(0); + expect(result.skippedMaxRetries).toBe(0); + expect(result.deferredBackoff).toBe(0); + + // Queue should be empty after recovery. + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(0); + }); + + it("moves entries that exceeded max retries to failed/", async () => { + // Create an entry and manually set retryCount to MAX_RETRIES. + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, + tmpDir, + ); + setEntryState(id, { retryCount: MAX_RETRIES }); + + const deliver = vi.fn(); + const { result } = await runRecovery({ deliver }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result.skippedMaxRetries).toBe(1); + expect(result.deferredBackoff).toBe(0); + + // Entry should be in failed/ directory. + const failedDir = path.join(tmpDir, "delivery-queue", "failed"); + expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); + }); + + it("increments retryCount on failed recovery attempt", async () => { + await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir); + + const deliver = vi.fn().mockRejectedValue(new Error("network down")); + const { result } = await runRecovery({ deliver }); + + expect(result.failed).toBe(1); + expect(result.recovered).toBe(0); + + // Entry should still be in queue with incremented retryCount. + const entries = await loadPendingDeliveries(tmpDir); + expect(entries).toHaveLength(1); + expect(entries[0].retryCount).toBe(1); + expect(entries[0].lastError).toBe("network down"); + }); + + it("moves entries to failed/ immediately on permanent delivery errors", async () => { + const id = await enqueueDelivery( + { channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] }, + tmpDir, + ); + const deliver = vi + .fn() + .mockRejectedValue(new Error("No conversation reference found for user:abc")); + const log = createLog(); + const { result } = await runRecovery({ deliver, log }); + + expect(result.failed).toBe(1); + expect(result.recovered).toBe(0); + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(0); + const failedDir = path.join(tmpDir, "delivery-queue", "failed"); + expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error")); + }); + + it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => { + await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); + + const deliver = vi.fn().mockResolvedValue([]); + await runRecovery({ deliver }); + + expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true })); + }); + + it("replays stored delivery options during recovery", async () => { + await enqueueDelivery( + { + channel: "whatsapp", + to: "+1", + payloads: [{ text: "a" }], + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "a", + mediaUrls: ["https://example.com/a.png"], + }, + }, + tmpDir, + ); + + const deliver = vi.fn().mockResolvedValue([]); + await runRecovery({ deliver }); + + expect(deliver).toHaveBeenCalledWith( + expect.objectContaining({ + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "a", + mediaUrls: ["https://example.com/a.png"], + }, + }), + ); + }); + + it("respects maxRecoveryMs time budget", async () => { + await enqueueCrashRecoveryEntries(); + await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir); + + const deliver = vi.fn().mockResolvedValue([]); + const { result, log } = await runRecovery({ + deliver, + maxRecoveryMs: 0, // Immediate timeout -- no entries should be processed. + }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result.recovered).toBe(0); + expect(result.failed).toBe(0); + expect(result.skippedMaxRetries).toBe(0); + expect(result.deferredBackoff).toBe(0); + + // All entries should still be in the queue. + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(3); + + // Should have logged a warning about deferred entries. + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart")); + }); + + it("defers entries until backoff becomes eligible", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, + tmpDir, + ); + setEntryState(id, { retryCount: 3, lastAttemptAt: Date.now() }); + + const deliver = vi.fn().mockResolvedValue([]); + const { result, log } = await runRecovery({ + deliver, + maxRecoveryMs: 60_000, + }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 1, + }); + + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(1); + + expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet")); + }); + + it("continues past high-backoff entries and recovers ready entries behind them", async () => { + const now = Date.now(); + const blockedId = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] }, + tmpDir, + ); + const readyId = await enqueueDelivery( + { channel: "telegram", to: "2", payloads: [{ text: "ready" }] }, + tmpDir, + ); + + setEntryState(blockedId, { retryCount: 3, lastAttemptAt: now, enqueuedAt: now - 30_000 }); + setEntryState(readyId, { retryCount: 0, enqueuedAt: now - 10_000 }); + + const deliver = vi.fn().mockResolvedValue([]); + const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 }); + + expect(result).toEqual({ + recovered: 1, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 1, + }); + expect(deliver).toHaveBeenCalledTimes(1); + expect(deliver).toHaveBeenCalledWith( + expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }), + ); + + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(1); + expect(remaining[0]?.id).toBe(blockedId); + }); + + it("recovers deferred entries on a later restart once backoff elapsed", async () => { + vi.useFakeTimers(); + const start = new Date("2026-01-01T00:00:00.000Z"); + vi.setSystemTime(start); + + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] }, + tmpDir, + ); + setEntryState(id, { retryCount: 3, lastAttemptAt: start.getTime() }); + + const firstDeliver = vi.fn().mockResolvedValue([]); + const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 }); + expect(firstRun.result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 1, + }); + expect(firstDeliver).not.toHaveBeenCalled(); + + vi.setSystemTime(new Date(start.getTime() + 600_000 + 1)); + const secondDeliver = vi.fn().mockResolvedValue([]); + const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 }); + expect(secondRun.result).toEqual({ + recovered: 1, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + expect(secondDeliver).toHaveBeenCalledTimes(1); + + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(0); + + vi.useRealTimers(); + }); + + it("returns zeros when queue is empty", async () => { + const deliver = vi.fn(); + const { result } = await runRecovery({ deliver }); + + expect(result).toEqual({ + recovered: 0, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + expect(deliver).not.toHaveBeenCalled(); + }); + }); +}); + +describe("DirectoryCache", () => { + const cfg = {} as OpenClawConfig; + + afterEach(() => { + vi.useRealTimers(); + }); + + it("expires entries after ttl", () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + const cache = new DirectoryCache(1000, 10); + + cache.set("a", "value-a", cfg); + expect(cache.get("a", cfg)).toBe("value-a"); + + vi.setSystemTime(new Date("2026-01-01T00:00:02.000Z")); + expect(cache.get("a", cfg)).toBeUndefined(); + }); + + it("evicts least-recent entries when capacity is exceeded", () => { + const cases = [ + { + actions: [ + ["set", "a", "value-a"], + ["set", "b", "value-b"], + ["set", "c", "value-c"], + ] as const, + expected: { a: undefined, b: "value-b", c: "value-c" }, + }, + { + actions: [ + ["set", "a", "value-a"], + ["set", "b", "value-b"], + ["set", "a", "value-a2"], + ["set", "c", "value-c"], + ] as const, + expected: { a: "value-a2", b: undefined, c: "value-c" }, + }, + ]; + + for (const testCase of cases) { + const cache = new DirectoryCache(60_000, 2); + for (const action of testCase.actions) { + cache.set(action[1], action[2], cfg); + } + expect(cache.get("a", cfg)).toBe(testCase.expected.a); + expect(cache.get("b", cfg)).toBe(testCase.expected.b); + expect(cache.get("c", cfg)).toBe(testCase.expected.c); + } + }); +}); + +describe("buildOutboundResultEnvelope", () => { + it("formats envelope variants", () => { + const whatsappDelivery: OutboundDeliveryJson = { + channel: "whatsapp", + via: "gateway", + to: "+1", + messageId: "m1", + mediaUrl: null, + }; + const telegramDelivery: OutboundDeliveryJson = { + channel: "telegram", + via: "direct", + to: "123", + messageId: "m2", + mediaUrl: null, + chatId: "c1", + }; + const discordDelivery: OutboundDeliveryJson = { + channel: "discord", + via: "gateway", + to: "channel:C1", + messageId: "m3", + mediaUrl: null, + channelId: "C1", + }; + const cases = typedCases<{ + name: string; + input: Parameters[0]; + expected: unknown; + }>([ + { + name: "flatten delivery by default", + input: { delivery: whatsappDelivery }, + expected: whatsappDelivery, + }, + { + name: "keep payloads + meta", + input: { + payloads: [{ text: "hi", mediaUrl: null, mediaUrls: undefined }], + meta: { foo: "bar" }, + }, + expected: { + payloads: [{ text: "hi", mediaUrl: null, mediaUrls: undefined }], + meta: { foo: "bar" }, + }, + }, + { + name: "include delivery when payloads exist", + input: { payloads: [], delivery: telegramDelivery, meta: { ok: true } }, + expected: { + payloads: [], + meta: { ok: true }, + delivery: telegramDelivery, + }, + }, + { + name: "keep wrapped delivery when flatten disabled", + input: { delivery: discordDelivery, flattenDelivery: false }, + expected: { delivery: discordDelivery }, + }, + ]); + for (const testCase of cases) { + expect(buildOutboundResultEnvelope(testCase.input), testCase.name).toEqual(testCase.expected); + } + }); +}); + +describe("formatOutboundDeliverySummary", () => { + it("formats fallback and channel-specific detail variants", () => { + const cases = [ + { + name: "fallback telegram", + channel: "telegram" as const, + result: undefined, + expected: "✅ Sent via Telegram. Message ID: unknown", + }, + { + name: "fallback imessage", + channel: "imessage" as const, + result: undefined, + expected: "✅ Sent via iMessage. Message ID: unknown", + }, + { + name: "telegram with chat detail", + channel: "telegram" as const, + result: { + channel: "telegram" as const, + messageId: "m1", + chatId: "c1", + }, + expected: "✅ Sent via Telegram. Message ID: m1 (chat c1)", + }, + { + name: "discord with channel detail", + channel: "discord" as const, + result: { + channel: "discord" as const, + messageId: "d1", + channelId: "chan", + }, + expected: "✅ Sent via Discord. Message ID: d1 (channel chan)", + }, + ]; + + for (const testCase of cases) { + expect(formatOutboundDeliverySummary(testCase.channel, testCase.result), testCase.name).toBe( + testCase.expected, + ); + } + }); +}); + +describe("buildOutboundDeliveryJson", () => { + it("builds direct delivery payloads across provider-specific fields", () => { + const cases = [ + { + name: "telegram direct payload", + input: { + channel: "telegram" as const, + to: "123", + result: { channel: "telegram" as const, messageId: "m1", chatId: "c1" }, + mediaUrl: "https://example.com/a.png", + }, + expected: { + channel: "telegram", + via: "direct", + to: "123", + messageId: "m1", + mediaUrl: "https://example.com/a.png", + chatId: "c1", + }, + }, + { + name: "whatsapp metadata", + input: { + channel: "whatsapp" as const, + to: "+1", + result: { channel: "whatsapp" as const, messageId: "w1", toJid: "jid" }, + }, + expected: { + channel: "whatsapp", + via: "direct", + to: "+1", + messageId: "w1", + mediaUrl: null, + toJid: "jid", + }, + }, + { + name: "signal timestamp", + input: { + channel: "signal" as const, + to: "+1", + result: { channel: "signal" as const, messageId: "s1", timestamp: 123 }, + }, + expected: { + channel: "signal", + via: "direct", + to: "+1", + messageId: "s1", + mediaUrl: null, + timestamp: 123, + }, + }, + ]; + + for (const testCase of cases) { + expect(buildOutboundDeliveryJson(testCase.input), testCase.name).toEqual(testCase.expected); + } + }); +}); + +describe("formatGatewaySummary", () => { + it("formats default and custom gateway action summaries", () => { + const cases = [ + { + name: "default send action", + input: { channel: "whatsapp", messageId: "m1" }, + expected: "✅ Sent via gateway (whatsapp). Message ID: m1", + }, + { + name: "custom action", + input: { action: "Poll sent", channel: "discord", messageId: "p1" }, + expected: "✅ Poll sent via gateway (discord). Message ID: p1", + }, + ]; + + for (const testCase of cases) { + expect(formatGatewaySummary(testCase.input), testCase.name).toBe(testCase.expected); + } + }); +}); + +const slackConfig = { + channels: { + slack: { + botToken: "xoxb-test", + appToken: "xapp-test", + }, + }, +} as OpenClawConfig; + +const discordConfig = { + channels: { + discord: {}, + }, +} as OpenClawConfig; + +describe("outbound policy", () => { + it("allows cross-provider sends when enabled", () => { + const cfg = { + ...slackConfig, + tools: { + message: { crossContext: { allowAcrossProviders: true } }, + }, + } as OpenClawConfig; + + expect(() => + enforceCrossContextPolicy({ + cfg, + channel: "telegram", + action: "send", + args: { to: "telegram:@ops" }, + toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" }, + }), + ).not.toThrow(); + }); + + it("uses components when available and preferred", async () => { + const decoration = await buildCrossContextDecoration({ + cfg: discordConfig, + channel: "discord", + target: "123", + toolContext: { currentChannelId: "C12345678", currentChannelProvider: "discord" }, + }); + + expect(decoration).not.toBeNull(); + const applied = applyCrossContextDecoration({ + message: "hello", + decoration: decoration!, + preferComponents: true, + }); + + expect(applied.usedComponents).toBe(true); + expect(applied.componentsBuilder).toBeDefined(); + expect(applied.componentsBuilder?.("hello").length).toBeGreaterThan(0); + expect(applied.message).toBe("hello"); + }); +}); + +describe("resolveOutboundSessionRoute", () => { + const baseConfig = {} as OpenClawConfig; + + it("resolves provider-specific session routes", async () => { + const perChannelPeerCfg = { session: { dmScope: "per-channel-peer" } } as OpenClawConfig; + const identityLinksCfg = { + session: { + dmScope: "per-peer", + identityLinks: { + alice: ["discord:123"], + }, + }, + } as OpenClawConfig; + const slackMpimCfg = { + channels: { + slack: { + dm: { + groupChannels: ["G123"], + }, + }, + }, + } as OpenClawConfig; + const cases: Array<{ + name: string; + cfg: OpenClawConfig; + channel: string; + target: string; + replyToId?: string; + threadId?: string; + expected: { + sessionKey: string; + from?: string; + to?: string; + threadId?: string | number; + chatType?: "direct" | "group"; + }; + }> = [ + { + name: "Slack thread", + cfg: baseConfig, + channel: "slack", + target: "channel:C123", + replyToId: "456", + expected: { + sessionKey: "agent:main:slack:channel:c123:thread:456", + from: "slack:channel:C123", + to: "channel:C123", + threadId: "456", + }, + }, + { + name: "Telegram topic group", + cfg: baseConfig, + channel: "telegram", + target: "-100123456:topic:42", + expected: { + sessionKey: "agent:main:telegram:group:-100123456:topic:42", + from: "telegram:group:-100123456:topic:42", + to: "telegram:-100123456", + threadId: 42, + }, + }, + { + name: "Telegram DM with topic", + cfg: perChannelPeerCfg, + channel: "telegram", + target: "123456789:topic:99", + expected: { + sessionKey: "agent:main:telegram:direct:123456789:thread:99", + from: "telegram:123456789:topic:99", + to: "telegram:123456789", + threadId: 99, + chatType: "direct", + }, + }, + { + name: "Telegram unresolved username DM", + cfg: perChannelPeerCfg, + channel: "telegram", + target: "@alice", + expected: { + sessionKey: "agent:main:telegram:direct:@alice", + chatType: "direct", + }, + }, + { + name: "Telegram DM scoped threadId fallback", + cfg: perChannelPeerCfg, + channel: "telegram", + target: "12345", + threadId: "12345:99", + expected: { + sessionKey: "agent:main:telegram:direct:12345:thread:99", + from: "telegram:12345:topic:99", + to: "telegram:12345", + threadId: 99, + chatType: "direct", + }, + }, + { + name: "identity-links per-peer", + cfg: identityLinksCfg, + channel: "discord", + target: "user:123", + expected: { + sessionKey: "agent:main:direct:alice", + }, + }, + { + name: "BlueBubbles chat_* prefix stripping", + cfg: baseConfig, + channel: "bluebubbles", + target: "chat_guid:ABC123", + expected: { + sessionKey: "agent:main:bluebubbles:group:abc123", + from: "group:ABC123", + }, + }, + { + name: "Zalo Personal DM target", + cfg: perChannelPeerCfg, + channel: "zalouser", + target: "123456", + expected: { + sessionKey: "agent:main:zalouser:direct:123456", + chatType: "direct", + }, + }, + { + name: "Slack mpim allowlist -> group key", + cfg: slackMpimCfg, + channel: "slack", + target: "channel:G123", + expected: { + sessionKey: "agent:main:slack:group:g123", + from: "slack:group:G123", + }, + }, + { + name: "Feishu explicit group prefix keeps group routing", + cfg: baseConfig, + channel: "feishu", + target: "group:oc_group_chat", + expected: { + sessionKey: "agent:main:feishu:group:oc_group_chat", + from: "feishu:group:oc_group_chat", + to: "oc_group_chat", + chatType: "group", + }, + }, + { + name: "Feishu explicit dm prefix keeps direct routing", + cfg: perChannelPeerCfg, + channel: "feishu", + target: "dm:oc_dm_chat", + expected: { + sessionKey: "agent:main:feishu:direct:oc_dm_chat", + from: "feishu:oc_dm_chat", + to: "oc_dm_chat", + chatType: "direct", + }, + }, + { + name: "Feishu bare oc_ target defaults to direct routing", + cfg: perChannelPeerCfg, + channel: "feishu", + target: "oc_ambiguous_chat", + expected: { + sessionKey: "agent:main:feishu:direct:oc_ambiguous_chat", + from: "feishu:oc_ambiguous_chat", + to: "oc_ambiguous_chat", + chatType: "direct", + }, + }, + ]; + + for (const testCase of cases) { + const route = await resolveOutboundSessionRoute({ + cfg: testCase.cfg, + channel: testCase.channel, + agentId: "main", + target: testCase.target, + replyToId: testCase.replyToId, + threadId: testCase.threadId, + }); + expect(route?.sessionKey, testCase.name).toBe(testCase.expected.sessionKey); + if (testCase.expected.from !== undefined) { + expect(route?.from, testCase.name).toBe(testCase.expected.from); + } + if (testCase.expected.to !== undefined) { + expect(route?.to, testCase.name).toBe(testCase.expected.to); + } + if (testCase.expected.threadId !== undefined) { + expect(route?.threadId, testCase.name).toBe(testCase.expected.threadId); + } + if (testCase.expected.chatType !== undefined) { + expect(route?.chatType, testCase.name).toBe(testCase.expected.chatType); + } + } + }); + + it("uses resolved Discord user targets to route bare numeric ids as DMs", async () => { + const route = await resolveOutboundSessionRoute({ + cfg: { session: { dmScope: "per-channel-peer" } } as OpenClawConfig, + channel: "discord", + agentId: "main", + target: "123", + resolvedTarget: { + to: "user:123", + kind: "user", + source: "directory", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:discord:direct:123", + from: "discord:123", + to: "user:123", + chatType: "direct", + }); + }); + + it("uses resolved Mattermost user targets to route bare ids as DMs", async () => { + const userId = "dthcxgoxhifn3pwh65cut3ud3w"; + const route = await resolveOutboundSessionRoute({ + cfg: { session: { dmScope: "per-channel-peer" } } as OpenClawConfig, + channel: "mattermost", + agentId: "main", + target: userId, + resolvedTarget: { + to: `user:${userId}`, + kind: "user", + source: "directory", + }, + }); + + expect(route).toMatchObject({ + sessionKey: `agent:main:mattermost:direct:${userId}`, + from: `mattermost:${userId}`, + to: `user:${userId}`, + chatType: "direct", + }); + }); + + it("rejects bare numeric Discord targets when the caller has no kind hint", async () => { + await expect( + resolveOutboundSessionRoute({ + cfg: { session: { dmScope: "per-channel-peer" } } as OpenClawConfig, + channel: "discord", + agentId: "main", + target: "123", + }), + ).rejects.toThrow(/Ambiguous Discord recipient/); + }); +}); + +describe("normalizeOutboundPayloadsForJson", () => { + it("normalizes payloads for JSON output", () => { + const cases = typedCases<{ + input: Parameters[0]; + expected: ReturnType; + }>([ + { + input: [ + { text: "hi" }, + { text: "photo", mediaUrl: "https://x.test/a.jpg" }, + { text: "multi", mediaUrls: ["https://x.test/1.png"] }, + ], + expected: [ + { text: "hi", mediaUrl: null, mediaUrls: undefined, channelData: undefined }, + { + text: "photo", + mediaUrl: "https://x.test/a.jpg", + mediaUrls: ["https://x.test/a.jpg"], + channelData: undefined, + }, + { + text: "multi", + mediaUrl: null, + mediaUrls: ["https://x.test/1.png"], + channelData: undefined, + }, + ], + }, + { + input: [ + { + text: "MEDIA:https://x.test/a.png\nMEDIA:https://x.test/b.png", + }, + ], + expected: [ + { + text: "", + mediaUrl: null, + mediaUrls: ["https://x.test/a.png", "https://x.test/b.png"], + channelData: undefined, + }, + ], + }, + ]); + + for (const testCase of cases) { + const input: ReplyPayload[] = testCase.input.map((payload) => + "mediaUrls" in payload + ? ({ + ...payload, + mediaUrls: payload.mediaUrls ? [...payload.mediaUrls] : undefined, + } as ReplyPayload) + : ({ ...payload } as ReplyPayload), + ); + expect(normalizeOutboundPayloadsForJson(input)).toEqual(testCase.expected); + } + }); + + it("suppresses reasoning payloads", () => { + const normalized = normalizeOutboundPayloadsForJson([ + { text: "Reasoning:\n_step_", isReasoning: true }, + { text: "final answer" }, + ]); + expect(normalized).toEqual([{ text: "final answer", mediaUrl: null, mediaUrls: undefined }]); + }); +}); + +describe("normalizeOutboundPayloads", () => { + it("keeps channelData-only payloads", () => { + const channelData = { line: { flexMessage: { altText: "Card", contents: {} } } }; + const normalized = normalizeOutboundPayloads([{ channelData }]); + expect(normalized).toEqual([{ text: "", mediaUrls: [], channelData }]); + }); + + it("suppresses reasoning payloads", () => { + const normalized = normalizeOutboundPayloads([ + { text: "Reasoning:\n_step_", isReasoning: true }, + { text: "final answer" }, + ]); + expect(normalized).toEqual([{ text: "final answer", mediaUrls: [] }]); + }); + + it("prefixes BTW replies for external delivery", () => { + const normalized = normalizeOutboundPayloads([ + { + text: "323", + btw: { question: "what is 17 * 19?" }, + }, + ]); + expect(normalized).toEqual([{ text: "BTW: 323", mediaUrls: [] }]); + }); +}); + +describe("formatOutboundPayloadLog", () => { + it("formats text+media and media-only logs", () => { + const cases = typedCases<{ + name: string; + input: Parameters[0]; + expected: string; + }>([ + { + name: "text with media lines", + input: { + text: "hello ", + mediaUrls: ["https://x.test/a.png", "https://x.test/b.png"], + }, + expected: "hello\nMEDIA:https://x.test/a.png\nMEDIA:https://x.test/b.png", + }, + { + name: "media only", + input: { + text: "", + mediaUrls: ["https://x.test/a.png"], + }, + expected: "MEDIA:https://x.test/a.png", + }, + ]); + + for (const testCase of cases) { + expect( + formatOutboundPayloadLog({ + ...testCase.input, + mediaUrls: [...testCase.input.mediaUrls], + }), + testCase.name, + ).toBe(testCase.expected); + } + }); +}); + runResolveOutboundTargetCoreTests(); diff --git a/src/infra/outbound/payloads.ts b/src/infra/outbound/payloads.ts index 9dae6a6c1e6..754d3434445 100644 --- a/src/infra/outbound/payloads.ts +++ b/src/infra/outbound/payloads.ts @@ -1,5 +1,6 @@ import { parseReplyDirectives } from "../../auto-reply/reply/reply-directives.js"; import { + formatBtwTextForExternalDelivery, isRenderablePayload, shouldSuppressReasoningPayload, } from "../../auto-reply/reply/reply-payloads.js"; @@ -59,7 +60,11 @@ export function normalizeReplyPayloadsForDelivery( const resolvedMediaUrl = hasMultipleMedia ? undefined : explicitMediaUrl; const next: ReplyPayload = { ...payload, - text: parsed.text ?? "", + text: + formatBtwTextForExternalDelivery({ + ...payload, + text: parsed.text ?? "", + }) ?? "", mediaUrls: mergedMedia.length ? mergedMedia : undefined, mediaUrl: resolvedMediaUrl, replyToId: payload.replyToId ?? parsed.replyToId, diff --git a/src/infra/session-cost-usage.ts b/src/infra/session-cost-usage.ts index 230ebd60c2e..a795ebc17ed 100644 --- a/src/infra/session-cost-usage.ts +++ b/src/infra/session-cost-usage.ts @@ -5,6 +5,7 @@ import type { NormalizedUsage, UsageLike } from "../agents/usage.js"; import { normalizeUsage } from "../agents/usage.js"; import { stripInboundMetadata } from "../auto-reply/reply/strip-inbound-meta.js"; import type { OpenClawConfig } from "../config/config.js"; +import { isPrimarySessionTranscriptFileName } from "../config/sessions.js"; import { resolveSessionFilePath, resolveSessionTranscriptsDirForAgent, @@ -318,7 +319,7 @@ export async function loadCostUsageSummary(params?: { const files = ( await Promise.all( entries - .filter((entry) => entry.isFile() && entry.name.endsWith(".jsonl")) + .filter((entry) => entry.isFile() && isPrimarySessionTranscriptFileName(entry.name)) .map(async (entry) => { const filePath = path.join(sessionsDir, entry.name); const stats = await fs.promises.stat(filePath).catch(() => null); @@ -393,7 +394,7 @@ export async function discoverAllSessions(params?: { const discovered: DiscoveredSession[] = []; for (const entry of entries) { - if (!entry.isFile() || !entry.name.endsWith(".jsonl")) { + if (!entry.isFile() || !isPrimarySessionTranscriptFileName(entry.name)) { continue; } diff --git a/src/sessions/side-results.ts b/src/sessions/side-results.ts new file mode 100644 index 00000000000..881ca159812 --- /dev/null +++ b/src/sessions/side-results.ts @@ -0,0 +1,37 @@ +import fs from "node:fs"; +import path from "node:path"; + +export const SESSION_SIDE_RESULTS_SUFFIX = ".side-results.jsonl"; + +export type PersistedSessionSideResult = { + kind: "btw"; + question: string; + text: string; + ts: number; + isError?: boolean; + provider?: string; + model?: string; + thinkingLevel?: string; + reasoningLevel?: string; + sessionKey?: string; + authProfileId?: string; + authProfileIdSource?: "auto" | "user"; + usage?: unknown; +}; + +export function resolveSessionSideResultsPathFromTranscript(transcriptPath: string): string { + const resolved = path.resolve(transcriptPath.trim()); + return resolved.endsWith(".jsonl") + ? `${resolved.slice(0, -".jsonl".length)}${SESSION_SIDE_RESULTS_SUFFIX}` + : `${resolved}${SESSION_SIDE_RESULTS_SUFFIX}`; +} + +export function appendSessionSideResult(params: { + transcriptPath: string; + result: PersistedSessionSideResult; +}) { + const filePath = resolveSessionSideResultsPathFromTranscript(params.transcriptPath); + fs.mkdirSync(path.dirname(filePath), { recursive: true }); + fs.appendFileSync(filePath, `${JSON.stringify(params.result)}\n`, "utf-8"); + return filePath; +} diff --git a/src/tui/components/btw-inline-message.test.ts b/src/tui/components/btw-inline-message.test.ts new file mode 100644 index 00000000000..8cd323708c2 --- /dev/null +++ b/src/tui/components/btw-inline-message.test.ts @@ -0,0 +1,16 @@ +import { describe, expect, it } from "vitest"; +import { BtwInlineMessage } from "./btw-inline-message.js"; + +describe("btw inline message", () => { + it("renders the BTW question, answer, and dismiss hint inline", () => { + const message = new BtwInlineMessage({ + question: "what is 17 * 19?", + text: "323", + }); + + const rendered = message.render(80).join("\n"); + expect(rendered).toContain("BTW: what is 17 * 19?"); + expect(rendered).toContain("323"); + expect(rendered).toContain("Press Enter or Esc to dismiss"); + }); +}); diff --git a/src/tui/components/btw-inline-message.ts b/src/tui/components/btw-inline-message.ts new file mode 100644 index 00000000000..7aa813a457e --- /dev/null +++ b/src/tui/components/btw-inline-message.ts @@ -0,0 +1,28 @@ +import { Container, Spacer, Text } from "@mariozechner/pi-tui"; +import { theme } from "../theme/theme.js"; +import { AssistantMessageComponent } from "./assistant-message.js"; + +type BtwInlineMessageParams = { + question: string; + text: string; + isError?: boolean; +}; + +export class BtwInlineMessage extends Container { + constructor(params: BtwInlineMessageParams) { + super(); + this.setResult(params); + } + + setResult(params: BtwInlineMessageParams) { + this.clear(); + this.addChild(new Spacer(1)); + this.addChild(new Text(theme.header(`BTW: ${params.question}`), 1, 0)); + if (params.isError) { + this.addChild(new Text(theme.error(params.text), 1, 0)); + } else { + this.addChild(new AssistantMessageComponent(params.text)); + } + this.addChild(new Text(theme.dim("Press Enter or Esc to dismiss"), 1, 0)); + } +} diff --git a/src/tui/components/chat-log.test.ts b/src/tui/components/chat-log.test.ts index b81740a2e8c..700a2abb9d2 100644 --- a/src/tui/components/chat-log.test.ts +++ b/src/tui/components/chat-log.test.ts @@ -52,4 +52,25 @@ describe("ChatLog", () => { expect(chatLog.children.length).toBe(20); }); + + it("renders BTW inline and removes it when dismissed", () => { + const chatLog = new ChatLog(40); + + chatLog.addSystem("session agent:main:main"); + chatLog.showBtw({ + question: "what is 17 * 19?", + text: "323", + }); + + let rendered = chatLog.render(120).join("\n"); + expect(rendered).toContain("BTW: what is 17 * 19?"); + expect(rendered).toContain("323"); + expect(chatLog.hasVisibleBtw()).toBe(true); + + chatLog.dismissBtw(); + + rendered = chatLog.render(120).join("\n"); + expect(rendered).not.toContain("BTW: what is 17 * 19?"); + expect(chatLog.hasVisibleBtw()).toBe(false); + }); }); diff --git a/src/tui/components/chat-log.ts b/src/tui/components/chat-log.ts index 76ac7d93654..c46e6065b9b 100644 --- a/src/tui/components/chat-log.ts +++ b/src/tui/components/chat-log.ts @@ -2,6 +2,7 @@ import type { Component } from "@mariozechner/pi-tui"; import { Container, Spacer, Text } from "@mariozechner/pi-tui"; import { theme } from "../theme/theme.js"; import { AssistantMessageComponent } from "./assistant-message.js"; +import { BtwInlineMessage } from "./btw-inline-message.js"; import { ToolExecutionComponent } from "./tool-execution.js"; import { UserMessageComponent } from "./user-message.js"; @@ -9,6 +10,7 @@ export class ChatLog extends Container { private readonly maxComponents: number; private toolById = new Map(); private streamingRuns = new Map(); + private btwMessage: BtwInlineMessage | null = null; private toolsExpanded = false; constructor(maxComponents = 180) { @@ -27,6 +29,9 @@ export class ChatLog extends Container { this.streamingRuns.delete(runId); } } + if (this.btwMessage === component) { + this.btwMessage = null; + } } private pruneOverflow() { @@ -49,6 +54,7 @@ export class ChatLog extends Container { this.clear(); this.toolById.clear(); this.streamingRuns.clear(); + this.btwMessage = null; } addSystem(text: string) { @@ -108,6 +114,33 @@ export class ChatLog extends Container { this.streamingRuns.delete(effectiveRunId); } + showBtw(params: { question: string; text: string; isError?: boolean }) { + if (this.btwMessage) { + this.btwMessage.setResult(params); + if (this.children[this.children.length - 1] !== this.btwMessage) { + this.removeChild(this.btwMessage); + this.append(this.btwMessage); + } + return this.btwMessage; + } + const component = new BtwInlineMessage(params); + this.btwMessage = component; + this.append(component); + return component; + } + + dismissBtw() { + if (!this.btwMessage) { + return; + } + this.removeChild(this.btwMessage); + this.btwMessage = null; + } + + hasVisibleBtw() { + return this.btwMessage !== null; + } + startTool(toolCallId: string, toolName: string, args: unknown) { const existing = this.toolById.get(toolCallId); if (existing) { diff --git a/src/tui/tui-command-handlers.test.ts b/src/tui/tui-command-handlers.test.ts index 4e4bfe3c36f..87f68217e3a 100644 --- a/src/tui/tui-command-handlers.test.ts +++ b/src/tui/tui-command-handlers.test.ts @@ -12,6 +12,7 @@ function createHarness(params?: { loadHistory?: LoadHistoryMock; setActivityStatus?: SetActivityStatusMock; isConnected?: boolean; + activeChatRunId?: string | null; }) { const sendChat = params?.sendChat ?? vi.fn().mockResolvedValue({ runId: "r1" }); const resetSession = params?.resetSession ?? vi.fn().mockResolvedValue({ ok: true }); @@ -19,21 +20,23 @@ function createHarness(params?: { const addUser = vi.fn(); const addSystem = vi.fn(); const requestRender = vi.fn(); + const noteLocalRunId = vi.fn(); const loadHistory = params?.loadHistory ?? (vi.fn().mockResolvedValue(undefined) as LoadHistoryMock); const setActivityStatus = params?.setActivityStatus ?? (vi.fn() as SetActivityStatusMock); + const state = { + currentSessionKey: "agent:main:main", + activeChatRunId: params?.activeChatRunId ?? null, + isConnected: params?.isConnected ?? true, + sessionInfo: {}, + }; const { handleCommand } = createCommandHandlers({ client: { sendChat, resetSession } as never, chatLog: { addUser, addSystem } as never, tui: { requestRender } as never, opts: {}, - state: { - currentSessionKey: "agent:main:main", - activeChatRunId: null, - isConnected: params?.isConnected ?? true, - sessionInfo: {}, - } as never, + state: state as never, deliverDefault: false, openOverlay: vi.fn(), closeOverlay: vi.fn(), @@ -45,7 +48,7 @@ function createHarness(params?: { setActivityStatus, formatSessionKey: vi.fn(), applySessionInfoFromPatch: vi.fn(), - noteLocalRunId: vi.fn(), + noteLocalRunId, forgetLocalRunId: vi.fn(), requestExit: vi.fn(), }); @@ -60,6 +63,8 @@ function createHarness(params?: { requestRender, loadHistory, setActivityStatus, + noteLocalRunId, + state, }; } @@ -108,6 +113,27 @@ describe("tui command handlers", () => { expect(requestRender).toHaveBeenCalled(); }); + it("sends /btw without hijacking the active main run", async () => { + const setActivityStatus = vi.fn(); + const { handleCommand, sendChat, addUser, noteLocalRunId, state } = createHarness({ + activeChatRunId: "run-main", + setActivityStatus, + }); + + await handleCommand("/btw what changed?"); + + expect(addUser).not.toHaveBeenCalled(); + expect(noteLocalRunId).not.toHaveBeenCalled(); + expect(state.activeChatRunId).toBe("run-main"); + expect(setActivityStatus).not.toHaveBeenCalledWith("sending"); + expect(setActivityStatus).not.toHaveBeenCalledWith("waiting"); + expect(sendChat).toHaveBeenCalledWith( + expect.objectContaining({ + message: "/btw what changed?", + }), + ); + }); + it("creates unique session for /new and resets shared session for /reset", async () => { const loadHistory = vi.fn().mockResolvedValue(undefined); const setSessionMock = vi.fn().mockResolvedValue(undefined) as SetSessionMock; diff --git a/src/tui/tui-command-handlers.ts b/src/tui/tui-command-handlers.ts index dd5113a17af..9a6d63f53d8 100644 --- a/src/tui/tui-command-handlers.ts +++ b/src/tui/tui-command-handlers.ts @@ -47,6 +47,10 @@ type CommandHandlerContext = { requestExit: () => void; }; +function isBtwCommand(text: string): boolean { + return /^\/btw(?::|\s|$)/i.test(text.trim()); +} + export function createCommandHandlers(context: CommandHandlerContext) { const { client, @@ -501,13 +505,15 @@ export function createCommandHandlers(context: CommandHandlerContext) { tui.requestRender(); return; } + const isBtw = isBtwCommand(text); try { - chatLog.addUser(text); - tui.requestRender(); const runId = randomUUID(); - noteLocalRunId(runId); - state.activeChatRunId = runId; - setActivityStatus("sending"); + if (!isBtw) { + chatLog.addUser(text); + noteLocalRunId(runId); + state.activeChatRunId = runId; + setActivityStatus("sending"); + } tui.requestRender(); await client.sendChat({ sessionKey: state.currentSessionKey, @@ -517,15 +523,21 @@ export function createCommandHandlers(context: CommandHandlerContext) { timeoutMs: opts.timeoutMs, runId, }); - setActivityStatus("waiting"); - tui.requestRender(); + if (!isBtw) { + setActivityStatus("waiting"); + tui.requestRender(); + } } catch (err) { - if (state.activeChatRunId) { + if (!isBtw && state.activeChatRunId) { forgetLocalRunId?.(state.activeChatRunId); } - state.activeChatRunId = null; - chatLog.addSystem(`send failed: ${String(err)}`); - setActivityStatus("error"); + if (!isBtw) { + state.activeChatRunId = null; + } + chatLog.addSystem(`${isBtw ? "btw failed" : "send failed"}: ${String(err)}`); + if (!isBtw) { + setActivityStatus("error"); + } tui.requestRender(); } }; diff --git a/src/tui/tui-event-handlers.test.ts b/src/tui/tui-event-handlers.test.ts index 7b08ddceaf5..2e1046fc650 100644 --- a/src/tui/tui-event-handlers.test.ts +++ b/src/tui/tui-event-handlers.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it, vi } from "vitest"; import { createEventHandlers } from "./tui-event-handlers.js"; -import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js"; +import type { AgentEvent, BtwEvent, ChatEvent, TuiStateAccess } from "./tui-types.js"; type MockFn = ReturnType; type HandlerChatLog = { @@ -11,6 +11,10 @@ type HandlerChatLog = { finalizeAssistant: (...args: unknown[]) => void; dropAssistant: (...args: unknown[]) => void; }; +type HandlerBtwPresenter = { + showResult: (...args: unknown[]) => void; + clear: (...args: unknown[]) => void; +}; type HandlerTui = { requestRender: (...args: unknown[]) => void }; type MockChatLog = { startTool: MockFn; @@ -20,6 +24,10 @@ type MockChatLog = { finalizeAssistant: MockFn; dropAssistant: MockFn; }; +type MockBtwPresenter = { + showResult: MockFn; + clear: MockFn; +}; type MockTui = { requestRender: MockFn }; function createMockChatLog(): MockChatLog & HandlerChatLog { @@ -33,6 +41,13 @@ function createMockChatLog(): MockChatLog & HandlerChatLog { } as unknown as MockChatLog & HandlerChatLog; } +function createMockBtwPresenter(): MockBtwPresenter & HandlerBtwPresenter { + return { + showResult: vi.fn(), + clear: vi.fn(), + } as unknown as MockBtwPresenter & HandlerBtwPresenter; +} + describe("tui-event-handlers: handleAgentEvent", () => { const makeState = (overrides?: Partial): TuiStateAccess => ({ agentDefaultId: "main", @@ -59,6 +74,7 @@ describe("tui-event-handlers: handleAgentEvent", () => { const makeContext = (state: TuiStateAccess) => { const chatLog = createMockChatLog(); + const btw = createMockBtwPresenter(); const tui = { requestRender: vi.fn() } as unknown as MockTui & HandlerTui; const setActivityStatus = vi.fn(); const loadHistory = vi.fn(); @@ -72,6 +88,7 @@ describe("tui-event-handlers: handleAgentEvent", () => { return { chatLog, + btw, tui, state, setActivityStatus, @@ -86,12 +103,14 @@ describe("tui-event-handlers: handleAgentEvent", () => { const createHandlersHarness = (params?: { state?: Partial; chatLog?: HandlerChatLog; + btw?: HandlerBtwPresenter; }) => { const state = makeState(params?.state); const context = makeContext(state); const chatLog = (params?.chatLog ?? context.chatLog) as MockChatLog & HandlerChatLog; const handlers = createEventHandlers({ chatLog, + btw: (params?.btw ?? context.btw) as MockBtwPresenter & HandlerBtwPresenter, tui: context.tui, state, setActivityStatus: context.setActivityStatus, @@ -103,6 +122,7 @@ describe("tui-event-handlers: handleAgentEvent", () => { ...context, state, chatLog, + btw: (params?.btw ?? context.btw) as MockBtwPresenter & HandlerBtwPresenter, ...handlers, }; }; @@ -212,6 +232,33 @@ describe("tui-event-handlers: handleAgentEvent", () => { expect(chatLog.updateAssistant).toHaveBeenCalledWith("hello", "run-alias"); }); + it("renders BTW results separately without disturbing the active run", () => { + const { state, btw, setActivityStatus, loadHistory, tui, handleBtwEvent } = + createHandlersHarness({ + state: { activeChatRunId: "run-main" }, + }); + + const evt: BtwEvent = { + kind: "btw", + runId: "run-btw", + sessionKey: state.currentSessionKey, + question: "what changed?", + text: "nothing important", + }; + + handleBtwEvent(evt); + + expect(state.activeChatRunId).toBe("run-main"); + expect(btw.showResult).toHaveBeenCalledWith({ + question: "what changed?", + text: "nothing important", + isError: undefined, + }); + expect(setActivityStatus).not.toHaveBeenCalled(); + expect(loadHistory).not.toHaveBeenCalled(); + expect(tui.requestRender).toHaveBeenCalledTimes(1); + }); + it("does not cross-match canonical session keys from different agents", () => { const { chatLog, handleChatEvent } = createHandlersHarness({ state: { diff --git a/src/tui/tui-event-handlers.ts b/src/tui/tui-event-handlers.ts index 54e4654ee96..2549bcbba38 100644 --- a/src/tui/tui-event-handlers.ts +++ b/src/tui/tui-event-handlers.ts @@ -1,7 +1,7 @@ import { parseAgentSessionKey } from "../sessions/session-key-utils.js"; import { asString, extractTextFromMessage, isCommandMessage } from "./tui-formatters.js"; import { TuiStreamAssembler } from "./tui-stream-assembler.js"; -import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js"; +import type { AgentEvent, BtwEvent, ChatEvent, TuiStateAccess } from "./tui-types.js"; type EventHandlerChatLog = { startTool: (toolCallId: string, toolName: string, args: unknown) => void; @@ -20,8 +20,14 @@ type EventHandlerTui = { requestRender: () => void; }; +type EventHandlerBtwPresenter = { + showResult: (params: { question: string; text: string; isError?: boolean }) => void; + clear: () => void; +}; + type EventHandlerContext = { chatLog: EventHandlerChatLog; + btw: EventHandlerBtwPresenter; tui: EventHandlerTui; state: TuiStateAccess; setActivityStatus: (text: string) => void; @@ -35,6 +41,7 @@ type EventHandlerContext = { export function createEventHandlers(context: EventHandlerContext) { const { chatLog, + btw, tui, state, setActivityStatus, @@ -81,6 +88,7 @@ export function createEventHandlers(context: EventHandlerContext) { sessionRuns.clear(); streamAssembler = new TuiStreamAssembler(); clearLocalRunIds?.(); + btw.clear(); }; const noteSessionRun = (runId: string) => { @@ -335,5 +343,30 @@ export function createEventHandlers(context: EventHandlerContext) { } }; - return { handleChatEvent, handleAgentEvent }; + const handleBtwEvent = (payload: unknown) => { + if (!payload || typeof payload !== "object") { + return; + } + const evt = payload as BtwEvent; + syncSessionKey(); + if (!isSameSessionKey(evt.sessionKey, state.currentSessionKey)) { + return; + } + if (evt.kind !== "btw") { + return; + } + const question = evt.question.trim(); + const text = evt.text.trim(); + if (!question || !text) { + return; + } + btw.showResult({ + question, + text, + isError: evt.isError, + }); + tui.requestRender(); + }; + + return { handleChatEvent, handleAgentEvent, handleBtwEvent }; } diff --git a/src/tui/tui-session-actions.test.ts b/src/tui/tui-session-actions.test.ts index 5e4a427c4a9..64c8d9e6660 100644 --- a/src/tui/tui-session-actions.test.ts +++ b/src/tui/tui-session-actions.test.ts @@ -4,6 +4,11 @@ import { createSessionActions } from "./tui-session-actions.js"; import type { TuiStateAccess } from "./tui-types.js"; describe("tui session actions", () => { + const createBtwPresenter = () => ({ + clear: vi.fn(), + showResult: vi.fn(), + }); + it("queues session refreshes and applies the latest result", async () => { let resolveFirst: ((value: unknown) => void) | undefined; let resolveSecond: ((value: unknown) => void) | undefined; @@ -52,6 +57,7 @@ describe("tui session actions", () => { const { refreshSessionInfo } = createSessionActions({ client: { listSessions } as unknown as GatewayChatClient, chatLog: { addSystem: vi.fn() } as unknown as import("./components/chat-log.js").ChatLog, + btw: createBtwPresenter(), tui: { requestRender } as unknown as import("@mariozechner/pi-tui").TUI, opts: {}, state, @@ -157,6 +163,7 @@ describe("tui session actions", () => { const { applySessionInfoFromPatch, refreshSessionInfo } = createSessionActions({ client: { listSessions } as unknown as GatewayChatClient, chatLog: { addSystem: vi.fn() } as unknown as import("./components/chat-log.js").ChatLog, + btw: createBtwPresenter(), tui: { requestRender: vi.fn() } as unknown as import("@mariozechner/pi-tui").TUI, opts: {}, state, @@ -210,7 +217,15 @@ describe("tui session actions", () => { const loadHistory = vi.fn().mockResolvedValue({ sessionId: "session-2", messages: [], + sideResults: [ + { + kind: "btw", + question: "what changed?", + text: "nothing important", + }, + ], }); + const btw = createBtwPresenter(); const state: TuiStateAccess = { agentDefaultId: "main", @@ -247,6 +262,7 @@ describe("tui session actions", () => { addSystem: vi.fn(), clearAll: vi.fn(), } as unknown as import("./components/chat-log.js").ChatLog, + btw, tui: { requestRender: vi.fn() } as unknown as import("@mariozechner/pi-tui").TUI, opts: {}, state, @@ -270,5 +286,7 @@ describe("tui session actions", () => { expect(state.sessionInfo.model).toBe("session-model"); expect(state.sessionInfo.modelProvider).toBe("openai"); expect(state.sessionInfo.updatedAt).toBe(50); + expect(btw.clear).toHaveBeenCalled(); + expect(btw.showResult).not.toHaveBeenCalled(); }); }); diff --git a/src/tui/tui-session-actions.ts b/src/tui/tui-session-actions.ts index 406b584599f..99f2b8ab2ee 100644 --- a/src/tui/tui-session-actions.ts +++ b/src/tui/tui-session-actions.ts @@ -10,9 +10,14 @@ import type { GatewayAgentsList, GatewayChatClient } from "./gateway-chat.js"; import { asString, extractTextFromMessage, isCommandMessage } from "./tui-formatters.js"; import type { SessionInfo, TuiOptions, TuiStateAccess } from "./tui-types.js"; +type SessionActionBtwPresenter = { + clear: () => void; +}; + type SessionActionContext = { client: GatewayChatClient; chatLog: ChatLog; + btw: SessionActionBtwPresenter; tui: TUI; opts: TuiOptions; state: TuiStateAccess; @@ -42,6 +47,7 @@ export function createSessionActions(context: SessionActionContext) { const { client, chatLog, + btw, tui, opts, state, @@ -298,6 +304,7 @@ export function createSessionActions(context: SessionActionContext) { state.sessionInfo.verboseLevel = record.verboseLevel ?? state.sessionInfo.verboseLevel; const showTools = (state.sessionInfo.verboseLevel ?? "off") !== "off"; chatLog.clearAll(); + btw.clear(); chatLog.addSystem(`session ${state.currentSessionKey}`); for (const entry of record.messages ?? []) { if (!entry || typeof entry !== "object") { @@ -367,6 +374,7 @@ export function createSessionActions(context: SessionActionContext) { state.sessionInfo.updatedAt = null; state.historyLoaded = false; clearLocalRunIds?.(); + btw.clear(); updateHeader(); updateFooter(); await loadHistory(); diff --git a/src/tui/tui-types.ts b/src/tui/tui-types.ts index 0f780b0a6bb..eeda9693ebf 100644 --- a/src/tui/tui-types.ts +++ b/src/tui/tui-types.ts @@ -18,6 +18,17 @@ export type ChatEvent = { errorMessage?: string; }; +export type BtwEvent = { + kind: "btw"; + runId?: string; + sessionKey?: string; + question: string; + text: string; + isError?: boolean; + seq?: number; + ts?: number; +}; + export type AgentEvent = { runId: string; stream: string; diff --git a/src/tui/tui.ts b/src/tui/tui.ts index e1eae539f50..143dfb7d62f 100644 --- a/src/tui/tui.ts +++ b/src/tui/tui.ts @@ -771,6 +771,14 @@ export async function runTui(opts: TuiOptions) { }; const { openOverlay, closeOverlay } = createOverlayHandlers(tui, editor); + const btw = { + showResult: (params: { question: string; text: string; isError?: boolean }) => { + chatLog.showBtw(params); + }, + clear: () => { + chatLog.dismissBtw(); + }, + }; const initialSessionAgentId = (() => { if (!initialSessionInput) { @@ -783,6 +791,7 @@ export async function runTui(opts: TuiOptions) { const sessionActions = createSessionActions({ client, chatLog, + btw, tui, opts, state, @@ -805,8 +814,9 @@ export async function runTui(opts: TuiOptions) { abortActive, } = sessionActions; - const { handleChatEvent, handleAgentEvent } = createEventHandlers({ + const { handleChatEvent, handleAgentEvent, handleBtwEvent } = createEventHandlers({ chatLog, + btw, tui, state, setActivityStatus, @@ -869,6 +879,11 @@ export async function runTui(opts: TuiOptions) { }); editor.onEscape = () => { + if (chatLog.hasVisibleBtw()) { + chatLog.dismissBtw(); + tui.requestRender(); + return; + } void abortActive(); }; const handleCtrlC = () => { @@ -918,10 +933,28 @@ export async function runTui(opts: TuiOptions) { void loadHistory(); }; + tui.addInputListener((data) => { + if (!chatLog.hasVisibleBtw()) { + return undefined; + } + if (editor.getText().length > 0) { + return undefined; + } + if (matchesKey(data, "enter")) { + chatLog.dismissBtw(); + tui.requestRender(); + return { consume: true }; + } + return undefined; + }); + client.onEvent = (evt) => { if (evt.event === "chat") { handleChatEvent(evt.payload); } + if (evt.event === "chat.side_result") { + handleBtwEvent(evt.payload); + } if (evt.event === "agent") { handleAgentEvent(evt.payload); }