diff --git a/src/agents/btw.test.ts b/src/agents/btw.test.ts
index b2c222f5ac1..56ec021afcf 100644
--- a/src/agents/btw.test.ts
+++ b/src/agents/btw.test.ts
@@ -18,6 +18,8 @@ const resolveSessionAuthProfileOverrideMock = vi.fn();
const getActiveEmbeddedRunSnapshotMock = vi.fn();
const waitForEmbeddedPiRunEndMock = vi.fn();
const diagWarnMock = vi.fn();
+const diagDebugMock = vi.fn();
+const appendSessionSideResultMock = vi.fn();
vi.mock("@mariozechner/pi-ai", () => ({
streamSimple: (...args: unknown[]) => streamSimpleMock(...args),
@@ -70,9 +72,14 @@ vi.mock("./auth-profiles/session-override.js", () => ({
vi.mock("../logging/diagnostic.js", () => ({
diagnosticLogger: {
warn: (...args: unknown[]) => diagWarnMock(...args),
+ debug: (...args: unknown[]) => diagDebugMock(...args),
},
}));
+vi.mock("../sessions/side-results.js", () => ({
+ appendSessionSideResult: (...args: unknown[]) => appendSessionSideResultMock(...args),
+}));
+
const { BTW_CUSTOM_TYPE, runBtwSideQuestion } = await import("./btw.js");
function makeAsyncEvents(events: unknown[]) {
@@ -113,6 +120,8 @@ describe("runBtwSideQuestion", () => {
getActiveEmbeddedRunSnapshotMock.mockReset();
waitForEmbeddedPiRunEndMock.mockReset();
diagWarnMock.mockReset();
+ diagDebugMock.mockReset();
+ appendSessionSideResultMock.mockReset();
buildSessionContextMock.mockReturnValue({
messages: [{ role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }],
@@ -206,6 +215,14 @@ describe("runBtwSideQuestion", () => {
expect(result).toBeUndefined();
expect(onBlockReply).toHaveBeenCalledWith({ text: "Side answer." });
+ expect(appendSessionSideResultMock).toHaveBeenCalledWith({
+ transcriptPath: expect.stringContaining("session-1.jsonl"),
+ result: expect.objectContaining({
+ kind: "btw",
+ question: "What changed?",
+ text: "Side answer.",
+ }),
+ });
expect(appendCustomEntryMock).toHaveBeenCalledWith(
BTW_CUSTOM_TYPE,
expect.objectContaining({
@@ -278,6 +295,144 @@ describe("runBtwSideQuestion", () => {
).rejects.toThrow("No active session context.");
});
+ it("uses active-run snapshot messages for BTW context while the main run is in flight", async () => {
+ buildSessionContextMock.mockReturnValue({ messages: [] });
+ getActiveEmbeddedRunSnapshotMock.mockReturnValue({
+ transcriptLeafId: "assistant-1",
+ messages: [
+ {
+ role: "user",
+ content: [
+ { type: "text", text: "write some things then wait 30 seconds and write more" },
+ ],
+ timestamp: 1,
+ },
+ ],
+ });
+ streamSimpleMock.mockReturnValue(
+ makeAsyncEvents([
+ {
+ type: "done",
+ reason: "stop",
+ message: {
+ role: "assistant",
+ content: [{ type: "text", text: "323" }],
+ provider: "anthropic",
+ api: "anthropic-messages",
+ model: "claude-sonnet-4-5",
+ stopReason: "stop",
+ usage: {
+ input: 1,
+ output: 2,
+ cacheRead: 0,
+ cacheWrite: 0,
+ totalTokens: 3,
+ cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
+ },
+ timestamp: Date.now(),
+ },
+ },
+ ]),
+ );
+
+ const result = await runBtwSideQuestion({
+ cfg: {} as never,
+ agentDir: "/tmp/agent",
+ provider: "anthropic",
+ model: "claude-sonnet-4-5",
+ question: "What is 17 * 19?",
+ sessionEntry: createSessionEntry(),
+ resolvedReasoningLevel: "off",
+ opts: {},
+ isNewSession: false,
+ });
+
+ expect(result).toEqual({ text: "323" });
+ expect(streamSimpleMock).toHaveBeenCalledWith(
+ expect.anything(),
+ expect.objectContaining({
+ systemPrompt: expect.stringContaining("ephemeral /btw side question"),
+ messages: expect.arrayContaining([
+ expect.objectContaining({ role: "user" }),
+ expect.objectContaining({
+ role: "user",
+ content: [
+ {
+ type: "text",
+ text: expect.stringContaining(
+ "\nWhat is 17 * 19?\n",
+ ),
+ },
+ ],
+ }),
+ ]),
+ }),
+ expect.anything(),
+ );
+ });
+
+ it("wraps the side question so the model does not treat it as a main-task continuation", async () => {
+ streamSimpleMock.mockReturnValue(
+ makeAsyncEvents([
+ {
+ type: "done",
+ reason: "stop",
+ message: {
+ role: "assistant",
+ content: [{ type: "text", text: "About 93 million miles." }],
+ provider: "anthropic",
+ api: "anthropic-messages",
+ model: "claude-sonnet-4-5",
+ stopReason: "stop",
+ usage: {
+ input: 1,
+ output: 2,
+ cacheRead: 0,
+ cacheWrite: 0,
+ totalTokens: 3,
+ cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
+ },
+ timestamp: Date.now(),
+ },
+ },
+ ]),
+ );
+
+ await runBtwSideQuestion({
+ cfg: {} as never,
+ agentDir: "/tmp/agent",
+ provider: "anthropic",
+ model: "claude-sonnet-4-5",
+ question: "what is the distance to the sun?",
+ sessionEntry: createSessionEntry(),
+ resolvedReasoningLevel: "off",
+ opts: {},
+ isNewSession: false,
+ });
+
+ const [, context] = streamSimpleMock.mock.calls[0] ?? [];
+ expect(context).toMatchObject({
+ systemPrompt: expect.stringContaining(
+ "Do not continue, resume, or complete any unfinished task",
+ ),
+ });
+ expect(context).toMatchObject({
+ messages: expect.arrayContaining([
+ expect.objectContaining({
+ role: "user",
+ content: [
+ {
+ type: "text",
+ text: expect.stringContaining(
+ "Ignore any unfinished task in the conversation while answering it.",
+ ),
+ },
+ ],
+ }),
+ ]),
+ });
+ });
+
it("branches away from an unresolved trailing user turn before building BTW context", async () => {
getLeafEntryMock.mockReturnValue({
type: "message",
@@ -487,4 +642,77 @@ describe("runBtwSideQuestion", () => {
);
});
});
+
+ it("excludes tool results from BTW context to avoid replaying raw tool output", async () => {
+ getActiveEmbeddedRunSnapshotMock.mockReturnValue({
+ transcriptLeafId: "assistant-1",
+ messages: [
+ {
+ role: "user",
+ content: [{ type: "text", text: "seed" }],
+ timestamp: 1,
+ },
+ {
+ role: "toolResult",
+ content: [{ type: "text", text: "sensitive tool output" }],
+ details: { raw: "secret" },
+ timestamp: 2,
+ },
+ {
+ role: "assistant",
+ content: [{ type: "text", text: "done" }],
+ timestamp: 3,
+ },
+ ],
+ });
+ streamSimpleMock.mockReturnValue(
+ makeAsyncEvents([
+ {
+ type: "done",
+ reason: "stop",
+ message: {
+ role: "assistant",
+ content: [{ type: "text", text: "323" }],
+ provider: "anthropic",
+ api: "anthropic-messages",
+ model: "claude-sonnet-4-5",
+ stopReason: "stop",
+ usage: {
+ input: 1,
+ output: 2,
+ cacheRead: 0,
+ cacheWrite: 0,
+ totalTokens: 3,
+ cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
+ },
+ timestamp: Date.now(),
+ },
+ },
+ ]),
+ );
+
+ await runBtwSideQuestion({
+ cfg: {} as never,
+ agentDir: "/tmp/agent",
+ provider: "anthropic",
+ model: "claude-sonnet-4-5",
+ question: "What is 17 * 19?",
+ sessionEntry: createSessionEntry(),
+ resolvedReasoningLevel: "off",
+ opts: {},
+ isNewSession: false,
+ });
+
+ const [, context] = streamSimpleMock.mock.calls[0] ?? [];
+ expect(context).toMatchObject({
+ messages: [
+ expect.objectContaining({ role: "user" }),
+ expect.objectContaining({ role: "assistant" }),
+ expect.objectContaining({ role: "user" }),
+ ],
+ });
+ expect((context as { messages?: Array<{ role?: string }> }).messages).not.toEqual(
+ expect.arrayContaining([expect.objectContaining({ role: "toolResult" })]),
+ );
+ });
});
diff --git a/src/agents/btw.ts b/src/agents/btw.ts
index 52fd4d24a46..158272d0f38 100644
--- a/src/agents/btw.ts
+++ b/src/agents/btw.ts
@@ -16,6 +16,8 @@ import {
type SessionEntry,
} from "../config/sessions.js";
import { diagnosticLogger as diag } from "../logging/diagnostic.js";
+import { appendSessionSideResult } from "../sessions/side-results.js";
+import { stripToolResultDetails } from "./session-transcript-repair.js";
import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
import { getApiKeyForModel, requireApiKey } from "./model-auth.js";
import { ensureOpenClawModelsJson } from "./models-config.js";
@@ -60,6 +62,20 @@ type BtwCustomEntryData = {
usage?: unknown;
};
+type BtwSideResultData = {
+ timestamp: number;
+ question: string;
+ answer: string;
+ provider: string;
+ model: string;
+ thinkingLevel: ThinkLevel | "off";
+ reasoningLevel: ReasoningLevel;
+ sessionKey?: string;
+ authProfileId?: string;
+ authProfileIdSource?: "auto" | "user";
+ usage?: unknown;
+};
+
async function appendBtwCustomEntry(params: {
sessionFile: string;
timeoutMs: number;
@@ -78,6 +94,26 @@ async function appendBtwCustomEntry(params: {
}
}
+function appendBtwSideResult(params: { sessionFile: string; entry: BtwSideResultData }) {
+ appendSessionSideResult({
+ transcriptPath: params.sessionFile,
+ result: {
+ kind: "btw",
+ question: params.entry.question,
+ text: params.entry.answer,
+ ts: params.entry.timestamp,
+ provider: params.entry.provider,
+ model: params.entry.model,
+ thinkingLevel: params.entry.thinkingLevel,
+ reasoningLevel: params.entry.reasoningLevel,
+ sessionKey: params.entry.sessionKey,
+ authProfileId: params.entry.authProfileId,
+ authProfileIdSource: params.entry.authProfileIdSource,
+ usage: params.entry.usage,
+ },
+ });
+}
+
function isSessionLockError(error: unknown): boolean {
const message = error instanceof Error ? error.message : String(error);
return message.includes("session file locked");
@@ -117,14 +153,39 @@ function collectThinkingContent(content: Array<{ type?: string; thinking?: strin
.join("");
}
+function buildBtwSystemPrompt(): string {
+ return [
+ "You are answering an ephemeral /btw side question about the current conversation.",
+ "Use the conversation only as background context.",
+ "Answer only the side question in the last user message.",
+ "Do not continue, resume, or complete any unfinished task from the conversation.",
+ "Do not emit tool calls, pseudo-tool calls, shell commands, file writes, patches, or code unless the side question explicitly asks for them.",
+ "Do not say you will continue the main task after answering.",
+ "If the question can be answered briefly, answer briefly.",
+ ].join("\n");
+}
+
+function buildBtwQuestionPrompt(question: string): string {
+ return [
+ "Answer this side question only.",
+ "Ignore any unfinished task in the conversation while answering it.",
+ "",
+ "",
+ question.trim(),
+ "",
+ ].join("\n");
+}
+
function toSimpleContextMessages(messages: unknown[]): Message[] {
- return messages.filter((message): message is Message => {
+ const contextMessages = messages.filter((message): message is Message => {
if (!message || typeof message !== "object") {
return false;
}
const role = (message as { role?: unknown }).role;
- return role === "user" || role === "assistant" || role === "toolResult";
+ return role === "user" || role === "assistant";
});
+ return stripToolResultDetails(contextMessages as Parameters[0]) as
+ Message[];
}
function resolveSimpleThinkingLevel(level?: ThinkLevel): SimpleThinkingLevel | undefined {
@@ -147,7 +208,10 @@ function resolveSessionTranscriptPath(params: {
storePath: params.storePath,
});
return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts);
- } catch {
+ } catch (error) {
+ diag.debug(
+ `resolveSessionTranscriptPath failed: sessionId=${params.sessionId} err=${String(error)}`,
+ );
return undefined;
}
}
@@ -235,7 +299,10 @@ export async function runBtwSideQuestion(
const sessionManager = SessionManager.open(sessionFile) as SessionManagerLike;
const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId);
- if (activeRunSnapshot) {
+ let messages: Message[] = [];
+ if (Array.isArray(activeRunSnapshot?.messages) && activeRunSnapshot.messages.length > 0) {
+ messages = toSimpleContextMessages(activeRunSnapshot.messages);
+ } else if (activeRunSnapshot) {
if (activeRunSnapshot.transcriptLeafId && sessionManager.branch) {
sessionManager.branch(activeRunSnapshot.transcriptLeafId);
} else {
@@ -251,10 +318,12 @@ export async function runBtwSideQuestion(
}
}
}
- const sessionContext = sessionManager.buildSessionContext();
- const messages = toSimpleContextMessages(
- Array.isArray(sessionContext.messages) ? sessionContext.messages : [],
- );
+ if (messages.length === 0) {
+ const sessionContext = sessionManager.buildSessionContext();
+ messages = toSimpleContextMessages(
+ Array.isArray(sessionContext.messages) ? sessionContext.messages : [],
+ );
+ }
if (messages.length === 0) {
throw new Error("No active session context.");
}
@@ -304,11 +373,12 @@ export async function runBtwSideQuestion(
const stream = streamSimple(
model,
{
+ systemPrompt: buildBtwSystemPrompt(),
messages: [
...messages,
{
role: "user",
- content: [{ type: "text", text: params.question }],
+ content: [{ type: "text", text: buildBtwQuestionPrompt(params.question) }],
timestamp: Date.now(),
},
],
@@ -400,6 +470,16 @@ export async function runBtwSideQuestion(
usage: finalMessage?.usage,
} satisfies BtwCustomEntryData;
+ try {
+ appendBtwSideResult({
+ sessionFile,
+ entry: customEntry,
+ });
+ } catch (error) {
+ const message = error instanceof Error ? error.message : String(error);
+ diag.warn(`btw side-result persistence skipped: sessionId=${sessionId} err=${message}`);
+ }
+
try {
await appendBtwCustomEntry({
sessionFile,
diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts
index 450f8b60d1d..cb00a5ff6ff 100644
--- a/src/agents/pi-embedded-runner/run/attempt.ts
+++ b/src/agents/pi-embedded-runner/run/attempt.ts
@@ -2377,10 +2377,8 @@ export async function runEmbeddedAttempt(
`runId=${params.runId} sessionId=${params.sessionId}`,
);
}
- updateActiveEmbeddedRunSnapshot(params.sessionId, {
- transcriptLeafId:
- (sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null,
- });
+ const transcriptLeafId =
+ (sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null;
try {
// Idempotent cleanup for legacy sessions with persisted image payloads.
@@ -2459,6 +2457,19 @@ export async function runEmbeddedAttempt(
});
}
+ const btwSnapshotMessages = [
+ ...activeSession.messages,
+ {
+ role: "user",
+ content: [{ type: "text", text: effectivePrompt }],
+ timestamp: Date.now(),
+ },
+ ];
+ updateActiveEmbeddedRunSnapshot(params.sessionId, {
+ transcriptLeafId,
+ messages: btwSnapshotMessages,
+ });
+
// Only pass images option if there are actually images to pass
// This avoids potential issues with models that don't expect the images parameter
if (imageResult.images.length > 0) {
diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts
index 21a36a183d8..b7e67cd6083 100644
--- a/src/agents/pi-embedded-runner/runs.test.ts
+++ b/src/agents/pi-embedded-runner/runs.test.ts
@@ -151,9 +151,11 @@ describe("pi-embedded runner run registry", () => {
setActiveEmbeddedRun("session-snapshot", handle);
updateActiveEmbeddedRunSnapshot("session-snapshot", {
transcriptLeafId: "assistant-1",
+ messages: [{ role: "user", content: [{ type: "text", text: "hello" }], timestamp: 1 }],
});
expect(getActiveEmbeddedRunSnapshot("session-snapshot")).toEqual({
transcriptLeafId: "assistant-1",
+ messages: [{ role: "user", content: [{ type: "text", text: "hello" }], timestamp: 1 }],
});
clearActiveEmbeddedRun("session-snapshot", handle);
diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts
index ad853108dad..9ce76f9f5e0 100644
--- a/src/agents/pi-embedded-runner/runs.ts
+++ b/src/agents/pi-embedded-runner/runs.ts
@@ -14,6 +14,7 @@ type EmbeddedPiQueueHandle = {
export type ActiveEmbeddedRunSnapshot = {
transcriptLeafId: string | null;
+ messages?: unknown[];
};
type EmbeddedRunWaiter = {
diff --git a/src/auto-reply/reply/commands-btw.test.ts b/src/auto-reply/reply/commands-btw.test.ts
index 53e15667f91..ecc76020a5c 100644
--- a/src/auto-reply/reply/commands-btw.test.ts
+++ b/src/auto-reply/reply/commands-btw.test.ts
@@ -80,7 +80,7 @@ describe("handleBtwCommand", () => {
);
expect(result).toEqual({
shouldContinue: false,
- reply: { text: "snapshot answer" },
+ reply: { text: "snapshot answer", btw: { question: "what changed?" } },
});
});
@@ -104,7 +104,7 @@ describe("handleBtwCommand", () => {
);
expect(result).toEqual({
shouldContinue: false,
- reply: { text: "nothing important" },
+ reply: { text: "nothing important", btw: { question: "what changed?" } },
});
});
});
diff --git a/src/auto-reply/reply/commands-btw.ts b/src/auto-reply/reply/commands-btw.ts
index 2d4f8bf6a31..957435ed2e7 100644
--- a/src/auto-reply/reply/commands-btw.ts
+++ b/src/auto-reply/reply/commands-btw.ts
@@ -61,7 +61,7 @@ export const handleBtwCommand: CommandHandler = async (params, allowTextCommands
});
return {
shouldContinue: false,
- reply,
+ reply: reply ? { ...reply, btw: { question } } : reply,
};
} catch (error) {
const message = error instanceof Error ? error.message.trim() : "";
@@ -69,6 +69,8 @@ export const handleBtwCommand: CommandHandler = async (params, allowTextCommands
shouldContinue: false,
reply: {
text: `⚠️ /btw failed${message ? `: ${message}` : "."}`,
+ btw: { question },
+ isError: true,
},
};
}
diff --git a/src/auto-reply/reply/commands-types.ts b/src/auto-reply/reply/commands-types.ts
index 79da67ca8d8..effb22ec6fa 100644
--- a/src/auto-reply/reply/commands-types.ts
+++ b/src/auto-reply/reply/commands-types.ts
@@ -1,4 +1,5 @@
import type { SkillCommandSpec } from "../../agents/skills.js";
+import type { BlockReplyChunking } from "../../agents/pi-embedded-block-chunker.js";
import type { ChannelId } from "../../channels/plugins/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { SessionEntry, SessionScope } from "../../config/sessions.js";
@@ -50,12 +51,7 @@ export type HandleCommandsParams = {
resolvedVerboseLevel: VerboseLevel;
resolvedReasoningLevel: ReasoningLevel;
resolvedElevatedLevel?: ElevatedLevel;
- blockReplyChunking?: {
- minChars: number;
- maxChars: number;
- breakPreference: "paragraph" | "newline" | "sentence";
- flushOnParagraph?: boolean;
- };
+ blockReplyChunking?: BlockReplyChunking;
resolvedBlockStreamingBreak?: "text_end" | "message_end";
resolveDefaultThinkingLevel: () => Promise;
provider: string;
diff --git a/src/auto-reply/reply/get-reply-inline-actions.ts b/src/auto-reply/reply/get-reply-inline-actions.ts
index 91bcd68eb56..f059b100e32 100644
--- a/src/auto-reply/reply/get-reply-inline-actions.ts
+++ b/src/auto-reply/reply/get-reply-inline-actions.ts
@@ -1,4 +1,5 @@
import { collectTextContentBlocks } from "../../agents/content-blocks.js";
+import type { BlockReplyChunking } from "../../agents/pi-embedded-block-chunker.js";
import { createOpenClawTools } from "../../agents/openclaw-tools.js";
import type { SkillCommandSpec } from "../../agents/skills.js";
import { applyOwnerOnlyToolPolicy } from "../../agents/tool-policy.js";
@@ -114,12 +115,7 @@ export async function handleInlineActions(params: {
resolvedVerboseLevel: VerboseLevel | undefined;
resolvedReasoningLevel: ReasoningLevel;
resolvedElevatedLevel: ElevatedLevel;
- blockReplyChunking?: {
- minChars: number;
- maxChars: number;
- breakPreference: "paragraph" | "newline" | "sentence";
- flushOnParagraph?: boolean;
- };
+ blockReplyChunking?: BlockReplyChunking;
resolvedBlockStreamingBreak?: "text_end" | "message_end";
resolveDefaultThinkingLevel: Awaited<
ReturnType
diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts
index 5a20d4ba950..e46ae422a9f 100644
--- a/src/auto-reply/reply/reply-payloads.ts
+++ b/src/auto-reply/reply/reply-payloads.ts
@@ -10,6 +10,17 @@ import type { ReplyPayload } from "../types.js";
import { extractReplyToTag } from "./reply-tags.js";
import { createReplyToModeFilterForChannel } from "./reply-threading.js";
+export function formatBtwTextForExternalDelivery(payload: ReplyPayload): string | undefined {
+ const text = payload.text?.trim();
+ if (!text) {
+ return payload.text;
+ }
+ if (!payload.btw?.question?.trim()) {
+ return payload.text;
+ }
+ return text.startsWith("BTW:") ? text : `BTW: ${text}`;
+}
+
function resolveReplyThreadingForPayload(params: {
payload: ReplyPayload;
implicitReplyToId?: string;
diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts
index b0a2d393738..a656c5a0084 100644
--- a/src/auto-reply/reply/route-reply.test.ts
+++ b/src/auto-reply/reply/route-reply.test.ts
@@ -294,6 +294,36 @@ describe("routeReply", () => {
);
});
+ it("prefixes BTW replies on routed sends", async () => {
+ mocks.sendMessageSlack.mockClear();
+ await routeReply({
+ payload: { text: "323", btw: { question: "what is 17 * 19?" } },
+ channel: "slack",
+ to: "channel:C123",
+ cfg: {} as never,
+ });
+ expect(mocks.sendMessageSlack).toHaveBeenCalledWith(
+ "channel:C123",
+ "BTW: 323",
+ expect.any(Object),
+ );
+ });
+
+ it("prefixes BTW replies on routed discord sends", async () => {
+ mocks.sendMessageDiscord.mockClear();
+ await routeReply({
+ payload: { text: "323", btw: { question: "what is 17 * 19?" } },
+ channel: "discord",
+ to: "channel:123456",
+ cfg: {} as never,
+ });
+ expect(mocks.sendMessageDiscord).toHaveBeenCalledWith(
+ "channel:123456",
+ "BTW: 323",
+ expect.any(Object),
+ );
+ });
+
it("passes replyToId to Telegram sends", async () => {
mocks.sendMessageTelegram.mockClear();
await routeReply({
diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts
index a6f863d7d18..a698b53cf9f 100644
--- a/src/auto-reply/reply/route-reply.ts
+++ b/src/auto-reply/reply/route-reply.ts
@@ -18,7 +18,10 @@ import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/m
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import { normalizeReplyPayload } from "./normalize-reply.js";
-import { shouldSuppressReasoningPayload } from "./reply-payloads.js";
+import {
+ formatBtwTextForExternalDelivery,
+ shouldSuppressReasoningPayload,
+} from "./reply-payloads.js";
let deliverRuntimePromise: Promise<
typeof import("../../infra/outbound/deliver-runtime.js")
@@ -102,24 +105,28 @@ export async function routeReply(params: RouteReplyParams): Promise entry.isFile() && isPrimarySessionTranscriptFileName(entry.name))
+ .filter(
+ (entry) =>
+ entry.isFile() &&
+ (isPrimarySessionTranscriptFileName(entry.name) ||
+ isSessionSideResultsArtifactName(entry.name)),
+ )
.map((entry) => path.resolve(path.join(sessionsDir, entry.name)))
.filter((filePath) => !referencedTranscriptPaths.has(filePath));
if (orphanTranscriptPaths.length > 0) {
diff --git a/src/commands/sessions-cleanup.ts b/src/commands/sessions-cleanup.ts
index a0b1d072386..9f5b12f56b1 100644
--- a/src/commands/sessions-cleanup.ts
+++ b/src/commands/sessions-cleanup.ts
@@ -13,6 +13,7 @@ import {
type SessionMaintenanceApplyReport,
} from "../config/sessions.js";
import type { RuntimeEnv } from "../runtime.js";
+import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js";
import { isRich, theme } from "../terminal/theme.js";
import {
resolveSessionStoreTargetsOrExit,
@@ -147,6 +148,11 @@ function pruneMissingTranscriptEntries(params: {
}
const transcriptPath = resolveSessionFilePath(entry.sessionId, entry, sessionPathOpts);
if (!fs.existsSync(transcriptPath)) {
+ try {
+ fs.rmSync(resolveSessionSideResultsPathFromTranscript(transcriptPath), { force: true });
+ } catch {
+ // Best-effort cleanup for orphan BTW sidecars when the primary transcript is gone.
+ }
delete params.store[key];
removed += 1;
params.onPruned?.(key);
diff --git a/src/config/sessions/artifacts.test.ts b/src/config/sessions/artifacts.test.ts
index b8c438a9eca..679578563ab 100644
--- a/src/config/sessions/artifacts.test.ts
+++ b/src/config/sessions/artifacts.test.ts
@@ -3,6 +3,7 @@ import {
formatSessionArchiveTimestamp,
isPrimarySessionTranscriptFileName,
isSessionArchiveArtifactName,
+ isSessionSideResultsArtifactName,
parseSessionArchiveTimestamp,
} from "./artifacts.js";
@@ -19,12 +20,21 @@ describe("session artifact helpers", () => {
it("classifies primary transcript files", () => {
expect(isPrimarySessionTranscriptFileName("abc.jsonl")).toBe(true);
expect(isPrimarySessionTranscriptFileName("keep.deleted.keep.jsonl")).toBe(true);
+ expect(isPrimarySessionTranscriptFileName("abc.side-results.jsonl")).toBe(false);
expect(isPrimarySessionTranscriptFileName("abc.jsonl.deleted.2026-01-01T00-00-00.000Z")).toBe(
false,
);
expect(isPrimarySessionTranscriptFileName("sessions.json")).toBe(false);
});
+ it("classifies BTW side-result artifacts separately from transcripts", () => {
+ expect(isSessionSideResultsArtifactName("abc.side-results.jsonl")).toBe(true);
+ expect(
+ isSessionSideResultsArtifactName("abc.side-results.jsonl.deleted.2026-01-01T00-00-00.000Z"),
+ ).toBe(true);
+ expect(isSessionSideResultsArtifactName("abc.jsonl")).toBe(false);
+ });
+
it("formats and parses archive timestamps", () => {
const now = Date.parse("2026-02-23T12:34:56.000Z");
const stamp = formatSessionArchiveTimestamp(now);
diff --git a/src/config/sessions/artifacts.ts b/src/config/sessions/artifacts.ts
index c851f7967fc..d561bbc0196 100644
--- a/src/config/sessions/artifacts.ts
+++ b/src/config/sessions/artifacts.ts
@@ -24,6 +24,10 @@ export function isSessionArchiveArtifactName(fileName: string): boolean {
);
}
+export function isSessionSideResultsArtifactName(fileName: string): boolean {
+ return fileName.endsWith(".side-results.jsonl") || fileName.includes(".side-results.jsonl.");
+}
+
export function isPrimarySessionTranscriptFileName(fileName: string): boolean {
if (fileName === "sessions.json") {
return false;
@@ -31,6 +35,9 @@ export function isPrimarySessionTranscriptFileName(fileName: string): boolean {
if (!fileName.endsWith(".jsonl")) {
return false;
}
+ if (isSessionSideResultsArtifactName(fileName)) {
+ return false;
+ }
return !isSessionArchiveArtifactName(fileName);
}
diff --git a/src/config/sessions/disk-budget.ts b/src/config/sessions/disk-budget.ts
index 078acd904bf..4309ca6f7f3 100644
--- a/src/config/sessions/disk-budget.ts
+++ b/src/config/sessions/disk-budget.ts
@@ -1,6 +1,11 @@
import fs from "node:fs";
import path from "node:path";
-import { isPrimarySessionTranscriptFileName, isSessionArchiveArtifactName } from "./artifacts.js";
+import { resolveSessionSideResultsPathFromTranscript } from "../../sessions/side-results.js";
+import {
+ isPrimarySessionTranscriptFileName,
+ isSessionArchiveArtifactName,
+ isSessionSideResultsArtifactName,
+} from "./artifacts.js";
import { resolveSessionFilePath } from "./paths.js";
import type { SessionEntry } from "./types.js";
@@ -123,6 +128,9 @@ function resolveReferencedSessionTranscriptPaths(params: {
});
if (resolved) {
referenced.add(canonicalizePathForComparison(resolved));
+ referenced.add(
+ canonicalizePathForComparison(resolveSessionSideResultsPathFromTranscript(resolved)),
+ );
}
}
return referenced;
@@ -255,7 +263,9 @@ export async function enforceSessionDiskBudget(params: {
.filter(
(file) =>
isSessionArchiveArtifactName(file.name) ||
- (isPrimarySessionTranscriptFileName(file.name) && !referencedPaths.has(file.canonicalPath)),
+ ((isPrimarySessionTranscriptFileName(file.name) ||
+ isSessionSideResultsArtifactName(file.name)) &&
+ !referencedPaths.has(file.canonicalPath)),
)
.toSorted((a, b) => a.mtimeMs - b.mtimeMs);
for (const file of removableFileQueue) {
@@ -336,6 +346,18 @@ export async function enforceSessionDiskBudget(params: {
total -= deletedBytes;
freedBytes += deletedBytes;
removedFiles += 1;
+ const sideResultsPath = resolveSessionSideResultsPathFromTranscript(transcriptPath);
+ const deletedSideResultsBytes = await removeFileForBudget({
+ filePath: sideResultsPath,
+ dryRun,
+ fileSizesByPath,
+ simulatedRemovedPaths,
+ });
+ if (deletedSideResultsBytes > 0) {
+ total -= deletedSideResultsBytes;
+ freedBytes += deletedSideResultsBytes;
+ removedFiles += 1;
+ }
}
}
diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts
index 3b506c052c0..b7a408e8812 100644
--- a/src/gateway/server-methods/chat.ts
+++ b/src/gateway/server-methods/chat.ts
@@ -8,6 +8,7 @@ import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
import type { MsgContext } from "../../auto-reply/templating.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
+import type { ReplyPayload } from "../../auto-reply/types.js";
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
import { resolveSessionFilePath } from "../../config/sessions.js";
import { jsonUtf8Bytes } from "../../infra/json-utf8-bytes.js";
@@ -55,6 +56,7 @@ import {
capArrayByJsonBytes,
loadSessionEntry,
readSessionMessages,
+ readSessionSideResults,
resolveSessionModelRef,
} from "../session-utils.js";
import { formatForLog } from "../ws-log.js";
@@ -130,6 +132,16 @@ type ChatSendOriginatingRoute = {
explicitDeliverRoute: boolean;
};
+type SideResultPayload = {
+ kind: "btw";
+ runId: string;
+ sessionKey: string;
+ question: string;
+ text: string;
+ isError?: boolean;
+ ts: number;
+};
+
function resolveChatSendOriginatingRoute(params: {
client?: { mode?: string | null; id?: string | null } | null;
deliver?: boolean;
@@ -900,6 +912,34 @@ function broadcastChatFinal(params: {
params.context.agentRunSeq.delete(params.runId);
}
+function isBtwReplyPayload(payload: ReplyPayload | undefined): payload is ReplyPayload & {
+ btw: { question: string };
+ text: string;
+} {
+ return (
+ typeof payload?.btw?.question === "string" &&
+ payload.btw.question.trim().length > 0 &&
+ typeof payload.text === "string" &&
+ payload.text.trim().length > 0
+ );
+}
+
+function broadcastSideResult(params: {
+ context: Pick;
+ payload: SideResultPayload;
+}) {
+ const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.payload.runId);
+ params.context.broadcast("chat.side_result", {
+ ...params.payload,
+ seq,
+ });
+ params.context.nodeSendToSession(params.payload.sessionKey, "chat.side_result", {
+ ...params.payload,
+ seq,
+ });
+ params.context.agentRunSeq.delete(params.payload.runId);
+}
+
function broadcastChatError(params: {
context: Pick;
runId: string;
@@ -940,6 +980,10 @@ export const chatHandlers: GatewayRequestHandlers = {
const sessionId = entry?.sessionId;
const rawMessages =
sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : [];
+ const sideResults =
+ sessionId && storePath
+ ? readSessionSideResults(sessionId, storePath, entry?.sessionFile)
+ : [];
const hardMax = 1000;
const defaultLimit = 200;
const requested = typeof limit === "number" ? limit : defaultLimit;
@@ -979,6 +1023,7 @@ export const chatHandlers: GatewayRequestHandlers = {
sessionKey,
sessionId,
messages: bounded.messages,
+ sideResults,
thinkingLevel,
fastMode: entry?.fastMode,
verboseLevel,
@@ -1284,7 +1329,7 @@ export const chatHandlers: GatewayRequestHandlers = {
agentId,
channel: INTERNAL_MESSAGE_CHANNEL,
});
- const finalReplyParts: string[] = [];
+ const finalReplies: ReplyPayload[] = [];
const dispatcher = createReplyDispatcher({
...prefixOptions,
onError: (err) => {
@@ -1294,11 +1339,7 @@ export const chatHandlers: GatewayRequestHandlers = {
if (info.kind !== "final") {
return;
}
- const text = payload.text?.trim() ?? "";
- if (!text) {
- return;
- }
- finalReplyParts.push(text);
+ finalReplies.push(payload);
},
});
@@ -1335,48 +1376,64 @@ export const chatHandlers: GatewayRequestHandlers = {
})
.then(() => {
if (!agentRunStarted) {
- const combinedReply = finalReplyParts
- .map((part) => part.trim())
- .filter(Boolean)
- .join("\n\n")
- .trim();
- let message: Record | undefined;
- if (combinedReply) {
- const { storePath: latestStorePath, entry: latestEntry } =
- loadSessionEntry(sessionKey);
- const sessionId = latestEntry?.sessionId ?? entry?.sessionId ?? clientRunId;
- const appended = appendAssistantTranscriptMessage({
- message: combinedReply,
- sessionId,
- storePath: latestStorePath,
- sessionFile: latestEntry?.sessionFile,
- agentId,
- createIfMissing: true,
+ const btwReply = finalReplies.length === 1 ? finalReplies[0] : undefined;
+ if (isBtwReplyPayload(btwReply)) {
+ broadcastSideResult({
+ context,
+ payload: {
+ kind: "btw",
+ runId: clientRunId,
+ sessionKey: rawSessionKey,
+ question: btwReply.btw.question.trim(),
+ text: btwReply.text.trim(),
+ isError: btwReply.isError,
+ ts: Date.now(),
+ },
});
- if (appended.ok) {
- message = appended.message;
- } else {
- context.logGateway.warn(
- `webchat transcript append failed: ${appended.error ?? "unknown error"}`,
- );
- const now = Date.now();
- message = {
- role: "assistant",
- content: [{ type: "text", text: combinedReply }],
- timestamp: now,
- // Keep this compatible with Pi stopReason enums even though this message isn't
- // persisted to the transcript due to the append failure.
- stopReason: "stop",
- usage: { input: 0, output: 0, totalTokens: 0 },
- };
+ } else {
+ const combinedReply = finalReplies
+ .map((part) => part.text?.trim() ?? "")
+ .filter(Boolean)
+ .join("\n\n")
+ .trim();
+ let message: Record | undefined;
+ if (combinedReply) {
+ const { storePath: latestStorePath, entry: latestEntry } =
+ loadSessionEntry(sessionKey);
+ const sessionId = latestEntry?.sessionId ?? entry?.sessionId ?? clientRunId;
+ const appended = appendAssistantTranscriptMessage({
+ message: combinedReply,
+ sessionId,
+ storePath: latestStorePath,
+ sessionFile: latestEntry?.sessionFile,
+ agentId,
+ createIfMissing: true,
+ });
+ if (appended.ok) {
+ message = appended.message;
+ } else {
+ context.logGateway.warn(
+ `webchat transcript append failed: ${appended.error ?? "unknown error"}`,
+ );
+ const now = Date.now();
+ message = {
+ role: "assistant",
+ content: [{ type: "text", text: combinedReply }],
+ timestamp: now,
+ // Keep this compatible with Pi stopReason enums even though this message isn't
+ // persisted to the transcript due to the append failure.
+ stopReason: "stop",
+ usage: { input: 0, output: 0, totalTokens: 0 },
+ };
+ }
}
+ broadcastChatFinal({
+ context,
+ runId: clientRunId,
+ sessionKey: rawSessionKey,
+ message,
+ });
}
- broadcastChatFinal({
- context,
- runId: clientRunId,
- sessionKey: rawSessionKey,
- message,
- });
}
setGatewayDedupeEntry({
dedupe: context.dedupe,
diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts
index 9ecd16e35d3..666cbf47cc2 100644
--- a/src/gateway/server.chat.gateway-server-chat.test.ts
+++ b/src/gateway/server.chat.gateway-server-chat.test.ts
@@ -4,6 +4,7 @@ import path from "node:path";
import { describe, expect, test, vi } from "vitest";
import { WebSocket } from "ws";
import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js";
+import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js";
import { extractFirstTextBlock } from "../shared/chat-message-content.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import {
@@ -497,6 +498,121 @@ describe("gateway server chat", () => {
});
});
+ test("routes /btw replies through side-result events without transcript injection", async () => {
+ await withMainSessionStore(async () => {
+ const replyMock = vi.mocked(getReplyFromConfig);
+ replyMock.mockResolvedValueOnce({
+ text: "323",
+ btw: { question: "what is 17 * 19?" },
+ });
+ const sideResultPromise = onceMessage(
+ ws,
+ (o) =>
+ o.type === "event" &&
+ o.event === "chat.side_result" &&
+ o.payload?.kind === "btw" &&
+ o.payload?.runId === "idem-btw-1",
+ 8000,
+ );
+
+ const res = await rpcReq(ws, "chat.send", {
+ sessionKey: "main",
+ message: "/btw what is 17 * 19?",
+ idempotencyKey: "idem-btw-1",
+ });
+
+ expect(res.ok).toBe(true);
+ const sideResult = await sideResultPromise;
+ expect(sideResult.payload).toMatchObject({
+ kind: "btw",
+ runId: "idem-btw-1",
+ sessionKey: "main",
+ question: "what is 17 * 19?",
+ text: "323",
+ });
+
+ const historyRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", {
+ sessionKey: "main",
+ });
+ expect(historyRes.ok).toBe(true);
+ expect(historyRes.payload?.messages ?? []).toEqual([]);
+ });
+ });
+
+ test("chat.history returns BTW side results separately from normal messages", async () => {
+ await withMainSessionStore(async (dir) => {
+ const transcriptPath = path.join(dir, "sess-main.jsonl");
+ await fs.writeFile(
+ transcriptPath,
+ [
+ JSON.stringify({
+ type: "custom",
+ customType: "openclaw:btw",
+ data: {
+ timestamp: 123,
+ question: "what changed?",
+ answer: "nothing important",
+ },
+ }),
+ ].join("\n"),
+ "utf-8",
+ );
+
+ const historyRes = await rpcReq<{ messages?: unknown[]; sideResults?: unknown[] }>(
+ ws,
+ "chat.history",
+ {
+ sessionKey: "main",
+ },
+ );
+ expect(historyRes.ok).toBe(true);
+ expect(historyRes.payload?.messages ?? []).toEqual([]);
+ expect(historyRes.payload?.sideResults ?? []).toEqual([
+ {
+ kind: "btw",
+ question: "what changed?",
+ text: "nothing important",
+ ts: 123,
+ },
+ ]);
+ });
+ });
+
+ test("chat.history replays BTW side results from the sidecar file after transcript compaction", async () => {
+ await withMainSessionStore(async (dir) => {
+ const transcriptPath = path.join(dir, "sess-main.jsonl");
+ await fs.writeFile(transcriptPath, JSON.stringify({ type: "session", version: 1 }) + "\n");
+ await fs.writeFile(
+ resolveSessionSideResultsPathFromTranscript(transcriptPath),
+ JSON.stringify({
+ kind: "btw",
+ question: "what changed?",
+ text: "still working",
+ ts: 456,
+ }) + "\n",
+ "utf-8",
+ );
+
+ const historyRes = await rpcReq<{ messages?: unknown[]; sideResults?: unknown[] }>(
+ ws,
+ "chat.history",
+ {
+ sessionKey: "main",
+ },
+ );
+ expect(historyRes.ok).toBe(true);
+ expect(historyRes.payload?.messages ?? []).toEqual([]);
+ expect(historyRes.payload?.sideResults ?? []).toEqual([
+ {
+ kind: "btw",
+ question: "what changed?",
+ text: "still working",
+ ts: 456,
+ },
+ ]);
+ });
+ });
+
test("chat.history hides assistant NO_REPLY-only entries and keeps mixed-content assistant entries", async () => {
const historyMessages = await loadChatHistoryWithMessages(buildNoReplyHistoryFixture(true));
const roleAndText = historyMessages
diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts
index 034020a61fe..bdf779c6929 100644
--- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts
+++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts
@@ -4,6 +4,7 @@ import path from "node:path";
import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from "vitest";
import { WebSocket } from "ws";
import { DEFAULT_PROVIDER } from "../agents/defaults.js";
+import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js";
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js";
import { startGatewayServerHarness, type GatewayServerHarness } from "./server.e2e-ws-harness.js";
import { createToolSummaryPreviewTranscriptLines } from "./session-preview.test-helpers.js";
@@ -752,6 +753,66 @@ describe("gateway server sessions", () => {
ws.close();
});
+ test("sessions.compact keeps BTW side results available via the sidecar file", async () => {
+ const { dir } = await createSessionStoreDir();
+ const sessionId = "sess-compact-btw";
+ const transcriptPath = path.join(dir, `${sessionId}.jsonl`);
+ await fs.writeFile(
+ transcriptPath,
+ [
+ JSON.stringify({ type: "session", version: 1, id: sessionId }),
+ JSON.stringify({ message: { role: "user", content: "hello" } }),
+ JSON.stringify({ message: { role: "assistant", content: "hi" } }),
+ JSON.stringify({
+ type: "custom",
+ customType: "openclaw:btw",
+ data: { timestamp: 123, question: "what changed?", answer: "nothing important" },
+ }),
+ JSON.stringify({ message: { role: "user", content: "later" } }),
+ JSON.stringify({ message: { role: "assistant", content: "done" } }),
+ ].join("\n"),
+ "utf-8",
+ );
+ await fs.writeFile(
+ resolveSessionSideResultsPathFromTranscript(transcriptPath),
+ JSON.stringify({
+ kind: "btw",
+ question: "what changed?",
+ text: "nothing important",
+ ts: 123,
+ }) + "\n",
+ "utf-8",
+ );
+ await writeSessionStore({
+ entries: {
+ main: { sessionId, updatedAt: Date.now() },
+ },
+ });
+
+ const { ws } = await openClient();
+ const compacted = await rpcReq<{ ok: true; compacted: boolean }>(ws, "sessions.compact", {
+ key: "main",
+ maxLines: 2,
+ });
+ expect(compacted.ok).toBe(true);
+ expect(compacted.payload?.compacted).toBe(true);
+
+ const history = await rpcReq<{ sideResults?: unknown[] }>(ws, "chat.history", {
+ sessionKey: "main",
+ });
+ expect(history.ok).toBe(true);
+ expect(history.payload?.sideResults ?? []).toEqual([
+ {
+ kind: "btw",
+ question: "what changed?",
+ text: "nothing important",
+ ts: 123,
+ },
+ ]);
+
+ ws.close();
+ });
+
test("sessions.delete rejects main and aborts active runs", async () => {
const { dir } = await createSessionStoreDir();
await writeSingleLineSession(dir, "sess-main", "hello");
diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts
index 09ab7e2cda2..1160c52ee1e 100644
--- a/src/gateway/session-utils.fs.test.ts
+++ b/src/gateway/session-utils.fs.test.ts
@@ -2,12 +2,14 @@ import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from "vitest";
+import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js";
import { createToolSummaryPreviewTranscriptLines } from "./session-preview.test-helpers.js";
import {
archiveSessionTranscripts,
readFirstUserMessageFromTranscript,
readLastMessagePreviewFromTranscript,
readSessionMessages,
+ readSessionSideResults,
readSessionTitleFieldsFromTranscript,
readSessionPreviewItemsFromTranscript,
resolveSessionTranscriptCandidates,
@@ -760,6 +762,32 @@ describe("archiveSessionTranscripts", () => {
}
});
+ test("archives BTW side-result sidecars alongside transcripts", () => {
+ const sessionId = "sess-archive-side-results";
+ const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
+ const sidecarPath = resolveSessionSideResultsPathFromTranscript(transcriptPath);
+ fs.writeFileSync(transcriptPath, '{"type":"session"}\n', "utf-8");
+ fs.writeFileSync(
+ sidecarPath,
+ `${JSON.stringify({ kind: "btw", question: "q", text: "a", ts: 123 })}\n`,
+ "utf-8",
+ );
+
+ const archived = archiveSessionTranscripts({
+ sessionId,
+ storePath,
+ reason: "reset",
+ });
+
+ expect(archived).toHaveLength(2);
+ expect(archived.some((entry) => entry.includes(`${sessionId}.jsonl.reset.`))).toBe(true);
+ expect(archived.some((entry) => entry.includes(`${sessionId}.side-results.jsonl.reset.`))).toBe(
+ true,
+ );
+ expect(fs.existsSync(transcriptPath)).toBe(false);
+ expect(fs.existsSync(sidecarPath)).toBe(false);
+ });
+
test("returns empty array when no transcript files exist", () => {
const archived = archiveSessionTranscripts({
sessionId: "nonexistent-session",
@@ -787,3 +815,77 @@ describe("archiveSessionTranscripts", () => {
expect(fs.existsSync(transcriptPath)).toBe(false);
});
});
+
+describe("readSessionSideResults", () => {
+ let tmpDir: string;
+ let storePath: string;
+
+ registerTempSessionStore("openclaw-side-results-test-", (nextTmpDir, nextStorePath) => {
+ tmpDir = nextTmpDir;
+ storePath = nextStorePath;
+ });
+
+ test("reads BTW results from transcript custom entries", () => {
+ const sessionId = "sess-btw-transcript";
+ writeTranscript(tmpDir, sessionId, [
+ { type: "session", version: 1, id: sessionId },
+ {
+ type: "custom",
+ customType: "openclaw:btw",
+ data: {
+ timestamp: 10,
+ question: "what changed?",
+ answer: "nothing",
+ },
+ },
+ ]);
+
+ expect(readSessionSideResults(sessionId, storePath)).toEqual([
+ {
+ kind: "btw",
+ question: "what changed?",
+ text: "nothing",
+ ts: 10,
+ },
+ ]);
+ });
+
+ test("merges BTW sidecar records and dedupes transcript duplicates", () => {
+ const sessionId = "sess-btw-sidecar";
+ const transcriptPath = writeTranscript(tmpDir, sessionId, [
+ { type: "session", version: 1, id: sessionId },
+ {
+ type: "custom",
+ customType: "openclaw:btw",
+ data: {
+ timestamp: 10,
+ question: "what changed?",
+ answer: "nothing",
+ },
+ },
+ ]);
+ fs.writeFileSync(
+ resolveSessionSideResultsPathFromTranscript(transcriptPath),
+ [
+ JSON.stringify({ kind: "btw", question: "what changed?", text: "nothing", ts: 10 }),
+ JSON.stringify({ kind: "btw", question: "what now?", text: "keep going", ts: 20 }),
+ ].join("\n"),
+ "utf-8",
+ );
+
+ expect(readSessionSideResults(sessionId, storePath)).toEqual([
+ {
+ kind: "btw",
+ question: "what changed?",
+ text: "nothing",
+ ts: 10,
+ },
+ {
+ kind: "btw",
+ question: "what now?",
+ text: "keep going",
+ ts: 20,
+ },
+ ]);
+ });
+});
diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts
index 3712c8c8272..52eb2ff40c0 100644
--- a/src/gateway/session-utils.fs.ts
+++ b/src/gateway/session-utils.fs.ts
@@ -12,6 +12,10 @@ import {
import { resolveRequiredHomeDir } from "../infra/home-dir.js";
import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js";
import { hasInterSessionUserProvenance } from "../sessions/input-provenance.js";
+import {
+ resolveSessionSideResultsPathFromTranscript,
+ type PersistedSessionSideResult,
+} from "../sessions/side-results.js";
import { stripInlineDirectiveTagsForDisplay } from "../utils/directive-tags.js";
import { extractToolCallNames, hasToolCall } from "../utils/transcript-tools.js";
import { stripEnvelope } from "./chat-sanitize.js";
@@ -22,6 +26,16 @@ type SessionTitleFields = {
lastMessagePreview: string | null;
};
+const BTW_CUSTOM_TYPE = "openclaw:btw";
+
+export type SessionSideResult = {
+ kind: "btw";
+ question: string;
+ text: string;
+ isError?: boolean;
+ ts?: number;
+};
+
type SessionTitleFieldsCacheEntry = SessionTitleFields & {
mtimeMs: number;
size: number;
@@ -118,6 +132,126 @@ export function readSessionMessages(
return messages;
}
+export function readSessionSideResults(
+ sessionId: string,
+ storePath: string | undefined,
+ sessionFile?: string,
+): SessionSideResult[] {
+ const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile);
+ const resultsByKey = new Map();
+
+ const pushResult = (result: SessionSideResult | null) => {
+ if (!result) {
+ return;
+ }
+ const dedupeKey = [
+ result.kind,
+ result.ts ?? "",
+ result.question,
+ result.text,
+ result.isError ? "1" : "0",
+ ].join("\u0000");
+ if (!resultsByKey.has(dedupeKey)) {
+ resultsByKey.set(dedupeKey, result);
+ }
+ };
+
+ for (const filePath of candidates.filter((p) => fs.existsSync(p))) {
+ const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/);
+ for (const line of lines) {
+ if (!line.trim()) {
+ continue;
+ }
+ try {
+ const parsed = JSON.parse(line) as {
+ type?: string;
+ customType?: string;
+ data?: { question?: unknown; answer?: unknown; timestamp?: unknown; isError?: unknown };
+ };
+ pushResult(parseTranscriptSideResult(parsed));
+ } catch {
+ // ignore bad lines
+ }
+ }
+ }
+
+ const sideResultPaths = new Set(
+ candidates.map((candidate) => resolveSessionSideResultsPathFromTranscript(candidate)),
+ );
+ for (const filePath of sideResultPaths) {
+ if (!fs.existsSync(filePath)) {
+ continue;
+ }
+ const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/);
+ for (const line of lines) {
+ if (!line.trim()) {
+ continue;
+ }
+ try {
+ const parsed = JSON.parse(line) as PersistedSessionSideResult;
+ pushResult(parsePersistedSideResult(parsed));
+ } catch {
+ // ignore bad lines
+ }
+ }
+ }
+
+ return [...resultsByKey.values()].toSorted((left, right) => {
+ const leftTs = left.ts ?? Number.MAX_SAFE_INTEGER;
+ const rightTs = right.ts ?? Number.MAX_SAFE_INTEGER;
+ if (leftTs !== rightTs) {
+ return leftTs - rightTs;
+ }
+ return left.question.localeCompare(right.question);
+ });
+}
+
+function parseTranscriptSideResult(parsed: {
+ type?: string;
+ customType?: string;
+ data?: { question?: unknown; answer?: unknown; timestamp?: unknown; isError?: unknown };
+}): SessionSideResult | null {
+ if (parsed.type !== "custom" || parsed.customType !== BTW_CUSTOM_TYPE) {
+ return null;
+ }
+ const question = typeof parsed.data?.question === "string" ? parsed.data.question.trim() : "";
+ const text = typeof parsed.data?.answer === "string" ? parsed.data.answer.trim() : "";
+ if (!question || !text) {
+ return null;
+ }
+ const timestamp =
+ typeof parsed.data?.timestamp === "number" && Number.isFinite(parsed.data.timestamp)
+ ? parsed.data.timestamp
+ : undefined;
+ const isError = parsed.data?.isError === true ? true : undefined;
+ return {
+ kind: "btw",
+ question,
+ text,
+ isError,
+ ts: timestamp,
+ };
+}
+
+function parsePersistedSideResult(parsed: PersistedSessionSideResult): SessionSideResult | null {
+ if (parsed.kind !== "btw") {
+ return null;
+ }
+ const question = typeof parsed.question === "string" ? parsed.question.trim() : "";
+ const text = typeof parsed.text === "string" ? parsed.text.trim() : "";
+ if (!question || !text) {
+ return null;
+ }
+ const timestamp = Number.isFinite(parsed.ts) ? parsed.ts : undefined;
+ return {
+ kind: "btw",
+ question,
+ text,
+ isError: parsed.isError === true ? true : undefined,
+ ts: timestamp,
+ };
+}
+
export function resolveSessionTranscriptCandidates(
sessionId: string,
storePath: string | undefined,
@@ -202,12 +336,17 @@ export function archiveSessionTranscripts(opts: {
opts.restrictToStoreDir && opts.storePath
? canonicalizePathForComparison(path.dirname(opts.storePath))
: null;
- for (const candidate of resolveSessionTranscriptCandidates(
+ const transcriptCandidates = resolveSessionTranscriptCandidates(
opts.sessionId,
opts.storePath,
opts.sessionFile,
opts.agentId,
- )) {
+ );
+ const archiveCandidates = new Set(transcriptCandidates);
+ for (const candidate of transcriptCandidates) {
+ archiveCandidates.add(resolveSessionSideResultsPathFromTranscript(candidate));
+ }
+ for (const candidate of archiveCandidates) {
const candidatePath = canonicalizePathForComparison(candidate);
if (storeDir) {
const relative = path.relative(storeDir, candidatePath);
diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts
index 00a2cb7747e..25957ac17ca 100644
--- a/src/gateway/session-utils.ts
+++ b/src/gateway/session-utils.ts
@@ -57,6 +57,7 @@ export {
readSessionTitleFieldsFromTranscript,
readSessionPreviewItemsFromTranscript,
readSessionMessages,
+ readSessionSideResults,
resolveSessionTranscriptCandidates,
} from "./session-utils.fs.js";
export type {
diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts
index 119e7b3c5d7..d30415d0bcb 100644
--- a/src/infra/outbound/deliver.test.ts
+++ b/src/infra/outbound/deliver.test.ts
@@ -1,6 +1,8 @@
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
-import type { ChannelOutboundAdapter } from "../../channels/plugins/types.adapters.js";
+import { signalOutbound } from "../../channels/plugins/outbound/signal.js";
+import { telegramOutbound } from "../../channels/plugins/outbound/telegram.js";
+import { whatsappOutbound } from "../../channels/plugins/outbound/whatsapp.js";
import type { OpenClawConfig } from "../../config/config.js";
import { STATE_DIR } from "../../config/paths.js";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
@@ -8,15 +10,64 @@ import { markdownToSignalTextChunks } from "../../signal/format.js";
import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js";
import { withEnvAsync } from "../../test-utils/env.js";
import { createIMessageTestPlugin } from "../../test-utils/imessage-test-plugin.js";
+import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js";
import { resolvePreferredOpenClawTmpDir } from "../tmp-openclaw-dir.js";
-import {
- clearDeliverTestRegistry,
- hookMocks,
- resetDeliverTestState,
- resetDeliverTestMocks,
- runChunkedWhatsAppDelivery as runChunkedWhatsAppDeliveryHelper,
- whatsappChunkConfig,
-} from "./deliver.test-helpers.js";
+
+const mocks = vi.hoisted(() => ({
+ appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })),
+}));
+const hookMocks = vi.hoisted(() => ({
+ runner: {
+ hasHooks: vi.fn(() => false),
+ runMessageSent: vi.fn(async () => {}),
+ },
+}));
+const internalHookMocks = vi.hoisted(() => ({
+ createInternalHookEvent: vi.fn(),
+ triggerInternalHook: vi.fn(async () => {}),
+}));
+const queueMocks = vi.hoisted(() => ({
+ enqueueDelivery: vi.fn(async () => "mock-queue-id"),
+ ackDelivery: vi.fn(async () => {}),
+ failDelivery: vi.fn(async () => {}),
+}));
+const logMocks = vi.hoisted(() => ({
+ warn: vi.fn(),
+}));
+
+vi.mock("../../config/sessions.js", async () => {
+ const actual = await vi.importActual(
+ "../../config/sessions.js",
+ );
+ return {
+ ...actual,
+ appendAssistantMessageToSessionTranscript: mocks.appendAssistantMessageToSessionTranscript,
+ };
+});
+vi.mock("../../plugins/hook-runner-global.js", () => ({
+ getGlobalHookRunner: () => hookMocks.runner,
+}));
+vi.mock("../../hooks/internal-hooks.js", () => ({
+ createInternalHookEvent: internalHookMocks.createInternalHookEvent,
+ triggerInternalHook: internalHookMocks.triggerInternalHook,
+}));
+vi.mock("./delivery-queue.js", () => ({
+ enqueueDelivery: queueMocks.enqueueDelivery,
+ ackDelivery: queueMocks.ackDelivery,
+ failDelivery: queueMocks.failDelivery,
+}));
+vi.mock("../../logging/subsystem.js", () => ({
+ createSubsystemLogger: () => {
+ const makeLogger = () => ({
+ warn: logMocks.warn,
+ info: vi.fn(),
+ error: vi.fn(),
+ debug: vi.fn(),
+ child: vi.fn(() => makeLogger()),
+ });
+ return makeLogger();
+ },
+}));
const { deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js");
@@ -24,34 +75,14 @@ const telegramChunkConfig: OpenClawConfig = {
channels: { telegram: { botToken: "tok-1", textChunkLimit: 2 } },
};
+const whatsappChunkConfig: OpenClawConfig = {
+ channels: { whatsapp: { textChunkLimit: 4000 } },
+};
+
type DeliverOutboundArgs = Parameters[0];
type DeliverOutboundPayload = DeliverOutboundArgs["payloads"][number];
type DeliverSession = DeliverOutboundArgs["session"];
-function setMatrixTextOnlyPlugin(sendText: NonNullable) {
- setActivePluginRegistry(
- createTestRegistry([
- {
- pluginId: "matrix",
- source: "test",
- plugin: createOutboundTestPlugin({
- id: "matrix",
- outbound: { deliveryMode: "direct", sendText },
- }),
- },
- ]),
- );
-}
-
-async function deliverMatrixPayloads(payloads: DeliverOutboundPayload[]) {
- return deliverOutboundPayloads({
- cfg: {},
- channel: "matrix",
- to: "!room:1",
- payloads,
- });
-}
-
async function deliverWhatsAppPayload(params: {
sendWhatsApp: NonNullable<
NonNullable[0]["deps"]>["sendWhatsApp"]
@@ -86,14 +117,96 @@ async function deliverTelegramPayload(params: {
});
}
+async function runChunkedWhatsAppDelivery(params?: {
+ mirror?: Parameters[0]["mirror"];
+}) {
+ const sendWhatsApp = vi
+ .fn()
+ .mockResolvedValueOnce({ messageId: "w1", toJid: "jid" })
+ .mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
+ const cfg: OpenClawConfig = {
+ channels: { whatsapp: { textChunkLimit: 2 } },
+ };
+ const results = await deliverOutboundPayloads({
+ cfg,
+ channel: "whatsapp",
+ to: "+1555",
+ payloads: [{ text: "abcd" }],
+ deps: { sendWhatsApp },
+ ...(params?.mirror ? { mirror: params.mirror } : {}),
+ });
+ return { sendWhatsApp, results };
+}
+
+async function deliverSingleWhatsAppForHookTest(params?: { sessionKey?: string }) {
+ const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
+ await deliverOutboundPayloads({
+ cfg: whatsappChunkConfig,
+ channel: "whatsapp",
+ to: "+1555",
+ payloads: [{ text: "hello" }],
+ deps: { sendWhatsApp },
+ ...(params?.sessionKey ? { session: { key: params.sessionKey } } : {}),
+ });
+}
+
+async function runBestEffortPartialFailureDelivery() {
+ const sendWhatsApp = vi
+ .fn()
+ .mockRejectedValueOnce(new Error("fail"))
+ .mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
+ const onError = vi.fn();
+ const cfg: OpenClawConfig = {};
+ const results = await deliverOutboundPayloads({
+ cfg,
+ channel: "whatsapp",
+ to: "+1555",
+ payloads: [{ text: "a" }, { text: "b" }],
+ deps: { sendWhatsApp },
+ bestEffort: true,
+ onError,
+ });
+ return { sendWhatsApp, onError, results };
+}
+
+function expectSuccessfulWhatsAppInternalHookPayload(
+ expected: Partial<{
+ content: string;
+ messageId: string;
+ isGroup: boolean;
+ groupId: string;
+ }>,
+) {
+ return expect.objectContaining({
+ to: "+1555",
+ success: true,
+ channelId: "whatsapp",
+ conversationId: "+1555",
+ ...expected,
+ });
+}
+
describe("deliverOutboundPayloads", () => {
beforeEach(() => {
- resetDeliverTestState();
- resetDeliverTestMocks();
+ setActivePluginRegistry(defaultRegistry);
+ hookMocks.runner.hasHooks.mockClear();
+ hookMocks.runner.hasHooks.mockReturnValue(false);
+ hookMocks.runner.runMessageSent.mockClear();
+ hookMocks.runner.runMessageSent.mockResolvedValue(undefined);
+ internalHookMocks.createInternalHookEvent.mockClear();
+ internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload);
+ internalHookMocks.triggerInternalHook.mockClear();
+ queueMocks.enqueueDelivery.mockClear();
+ queueMocks.enqueueDelivery.mockResolvedValue("mock-queue-id");
+ queueMocks.ackDelivery.mockClear();
+ queueMocks.ackDelivery.mockResolvedValue(undefined);
+ queueMocks.failDelivery.mockClear();
+ queueMocks.failDelivery.mockResolvedValue(undefined);
+ logMocks.warn.mockClear();
});
afterEach(() => {
- clearDeliverTestRegistry();
+ setActivePluginRegistry(emptyRegistry);
});
it("chunks telegram markdown and passes through accountId", async () => {
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
@@ -175,6 +288,24 @@ describe("deliverOutboundPayloads", () => {
);
});
+ it("prefixes BTW replies for telegram delivery", async () => {
+ const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
+
+ await deliverTelegramPayload({
+ sendTelegram,
+ cfg: {
+ channels: { telegram: { botToken: "tok-1", textChunkLimit: 100 } },
+ },
+ payload: { text: "323", btw: { question: "what is 17 * 19?" } },
+ });
+
+ expect(sendTelegram).toHaveBeenCalledWith(
+ "123",
+ "BTW: 323",
+ expect.objectContaining({ verbose: false, textMode: "html" }),
+ );
+ });
+
it("preserves HTML text for telegram sendPayload channelData path", async () => {
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
@@ -416,9 +547,7 @@ describe("deliverOutboundPayloads", () => {
});
it("chunks WhatsApp text and returns all results", async () => {
- const { sendWhatsApp, results } = await runChunkedWhatsAppDeliveryHelper({
- deliverOutboundPayloads,
- });
+ const { sendWhatsApp, results } = await runChunkedWhatsAppDelivery();
expect(sendWhatsApp).toHaveBeenCalledTimes(2);
expect(results.map((r) => r.messageId)).toEqual(["w1", "w2"]);
@@ -614,6 +743,222 @@ describe("deliverOutboundPayloads", () => {
]);
});
+ it("prefixes BTW replies for whatsapp delivery", async () => {
+ const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
+
+ await deliverWhatsAppPayload({
+ sendWhatsApp,
+ payload: { text: "323", btw: { question: "what is 17 * 19?" } },
+ });
+
+ expect(sendWhatsApp).toHaveBeenCalledWith("+1555", "BTW: 323", expect.any(Object));
+ });
+
+ it("continues on errors when bestEffort is enabled", async () => {
+ const { sendWhatsApp, onError, results } = await runBestEffortPartialFailureDelivery();
+
+ expect(sendWhatsApp).toHaveBeenCalledTimes(2);
+ expect(onError).toHaveBeenCalledTimes(1);
+ expect(results).toEqual([{ channel: "whatsapp", messageId: "w2", toJid: "jid" }]);
+ });
+
+ it("emits internal message:sent hook with success=true for chunked payload delivery", async () => {
+ const { sendWhatsApp } = await runChunkedWhatsAppDelivery({
+ mirror: {
+ sessionKey: "agent:main:main",
+ isGroup: true,
+ groupId: "whatsapp:group:123",
+ },
+ });
+ expect(sendWhatsApp).toHaveBeenCalledTimes(2);
+
+ expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1);
+ expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith(
+ "message",
+ "sent",
+ "agent:main:main",
+ expectSuccessfulWhatsAppInternalHookPayload({
+ content: "abcd",
+ messageId: "w2",
+ isGroup: true,
+ groupId: "whatsapp:group:123",
+ }),
+ );
+ expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);
+ });
+
+ it("does not emit internal message:sent hook when neither mirror nor sessionKey is provided", async () => {
+ await deliverSingleWhatsAppForHookTest();
+
+ expect(internalHookMocks.createInternalHookEvent).not.toHaveBeenCalled();
+ expect(internalHookMocks.triggerInternalHook).not.toHaveBeenCalled();
+ });
+
+ it("emits internal message:sent hook when sessionKey is provided without mirror", async () => {
+ await deliverSingleWhatsAppForHookTest({ sessionKey: "agent:main:main" });
+
+ expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1);
+ expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith(
+ "message",
+ "sent",
+ "agent:main:main",
+ expectSuccessfulWhatsAppInternalHookPayload({ content: "hello", messageId: "w1" }),
+ );
+ expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);
+ });
+
+ it("warns when session.agentId is set without a session key", async () => {
+ const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
+ hookMocks.runner.hasHooks.mockReturnValue(true);
+
+ await deliverOutboundPayloads({
+ cfg: whatsappChunkConfig,
+ channel: "whatsapp",
+ to: "+1555",
+ payloads: [{ text: "hello" }],
+ deps: { sendWhatsApp },
+ session: { agentId: "agent-main" },
+ });
+
+ expect(logMocks.warn).toHaveBeenCalledWith(
+ "deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped",
+ expect.objectContaining({ channel: "whatsapp", to: "+1555", agentId: "agent-main" }),
+ );
+ });
+
+ it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => {
+ const { onError } = await runBestEffortPartialFailureDelivery();
+
+ // onError was called for the first payload's failure.
+ expect(onError).toHaveBeenCalledTimes(1);
+
+ // Queue entry should NOT be acked — failDelivery should be called instead.
+ expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
+ expect(queueMocks.failDelivery).toHaveBeenCalledWith(
+ "mock-queue-id",
+ "partial delivery failure (bestEffort)",
+ );
+ });
+
+ it("acks the queue entry when delivery is aborted", async () => {
+ const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
+ const abortController = new AbortController();
+ abortController.abort();
+ const cfg: OpenClawConfig = {};
+
+ await expect(
+ deliverOutboundPayloads({
+ cfg,
+ channel: "whatsapp",
+ to: "+1555",
+ payloads: [{ text: "a" }],
+ deps: { sendWhatsApp },
+ abortSignal: abortController.signal,
+ }),
+ ).rejects.toThrow("Operation aborted");
+
+ expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id");
+ expect(queueMocks.failDelivery).not.toHaveBeenCalled();
+ expect(sendWhatsApp).not.toHaveBeenCalled();
+ });
+
+ it("passes normalized payload to onError", async () => {
+ const sendWhatsApp = vi.fn().mockRejectedValue(new Error("boom"));
+ const onError = vi.fn();
+ const cfg: OpenClawConfig = {};
+
+ await deliverOutboundPayloads({
+ cfg,
+ channel: "whatsapp",
+ to: "+1555",
+ payloads: [{ text: "hi", mediaUrl: "https://x.test/a.jpg" }],
+ deps: { sendWhatsApp },
+ bestEffort: true,
+ onError,
+ });
+
+ expect(onError).toHaveBeenCalledTimes(1);
+ expect(onError).toHaveBeenCalledWith(
+ expect.any(Error),
+ expect.objectContaining({ text: "hi", mediaUrls: ["https://x.test/a.jpg"] }),
+ );
+ });
+
+ it("mirrors delivered output when mirror options are provided", async () => {
+ const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
+ mocks.appendAssistantMessageToSessionTranscript.mockClear();
+
+ await deliverOutboundPayloads({
+ cfg: telegramChunkConfig,
+ channel: "telegram",
+ to: "123",
+ payloads: [{ text: "caption", mediaUrl: "https://example.com/files/report.pdf?sig=1" }],
+ deps: { sendTelegram },
+ mirror: {
+ sessionKey: "agent:main:main",
+ text: "caption",
+ mediaUrls: ["https://example.com/files/report.pdf?sig=1"],
+ idempotencyKey: "idem-deliver-1",
+ },
+ });
+
+ expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith(
+ expect.objectContaining({
+ text: "report.pdf",
+ idempotencyKey: "idem-deliver-1",
+ }),
+ );
+ });
+
+ it("emits message_sent success for text-only deliveries", async () => {
+ hookMocks.runner.hasHooks.mockReturnValue(true);
+ const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
+
+ await deliverOutboundPayloads({
+ cfg: {},
+ channel: "whatsapp",
+ to: "+1555",
+ payloads: [{ text: "hello" }],
+ deps: { sendWhatsApp },
+ });
+
+ expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
+ expect.objectContaining({ to: "+1555", content: "hello", success: true }),
+ expect.objectContaining({ channelId: "whatsapp" }),
+ );
+ });
+
+ it("emits message_sent success for sendPayload deliveries", async () => {
+ hookMocks.runner.hasHooks.mockReturnValue(true);
+ const sendPayload = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-1" });
+ const sendText = vi.fn();
+ const sendMedia = vi.fn();
+ setActivePluginRegistry(
+ createTestRegistry([
+ {
+ pluginId: "matrix",
+ source: "test",
+ plugin: createOutboundTestPlugin({
+ id: "matrix",
+ outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
+ }),
+ },
+ ]),
+ );
+
+ await deliverOutboundPayloads({
+ cfg: {},
+ channel: "matrix",
+ to: "!room:1",
+ payloads: [{ text: "payload text", channelData: { mode: "custom" } }],
+ });
+
+ expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
+ expect.objectContaining({ to: "!room:1", content: "payload text", success: true }),
+ expect.objectContaining({ channelId: "matrix" }),
+ );
+ });
+
it("preserves channelData-only payloads with empty text for non-WhatsApp sendPayload channels", async () => {
const sendPayload = vi.fn().mockResolvedValue({ channel: "line", messageId: "ln-1" });
const sendText = vi.fn();
@@ -649,11 +994,25 @@ describe("deliverOutboundPayloads", () => {
it("falls back to sendText when plugin outbound omits sendMedia", async () => {
const sendText = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-1" });
- setMatrixTextOnlyPlugin(sendText);
+ setActivePluginRegistry(
+ createTestRegistry([
+ {
+ pluginId: "matrix",
+ source: "test",
+ plugin: createOutboundTestPlugin({
+ id: "matrix",
+ outbound: { deliveryMode: "direct", sendText },
+ }),
+ },
+ ]),
+ );
- const results = await deliverMatrixPayloads([
- { text: "caption", mediaUrl: "https://example.com/file.png" },
- ]);
+ const results = await deliverOutboundPayloads({
+ cfg: {},
+ channel: "matrix",
+ to: "!room:1",
+ payloads: [{ text: "caption", mediaUrl: "https://example.com/file.png" }],
+ });
expect(sendText).toHaveBeenCalledTimes(1);
expect(sendText).toHaveBeenCalledWith(
@@ -661,19 +1020,42 @@ describe("deliverOutboundPayloads", () => {
text: "caption",
}),
);
+ expect(logMocks.warn).toHaveBeenCalledWith(
+ "Plugin outbound adapter does not implement sendMedia; media URLs will be dropped and text fallback will be used",
+ expect.objectContaining({
+ channel: "matrix",
+ mediaCount: 1,
+ }),
+ );
expect(results).toEqual([{ channel: "matrix", messageId: "mx-1" }]);
});
it("falls back to one sendText call for multi-media payloads when sendMedia is omitted", async () => {
const sendText = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-2" });
- setMatrixTextOnlyPlugin(sendText);
+ setActivePluginRegistry(
+ createTestRegistry([
+ {
+ pluginId: "matrix",
+ source: "test",
+ plugin: createOutboundTestPlugin({
+ id: "matrix",
+ outbound: { deliveryMode: "direct", sendText },
+ }),
+ },
+ ]),
+ );
- const results = await deliverMatrixPayloads([
- {
- text: "caption",
- mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
- },
- ]);
+ const results = await deliverOutboundPayloads({
+ cfg: {},
+ channel: "matrix",
+ to: "!room:1",
+ payloads: [
+ {
+ text: "caption",
+ mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
+ },
+ ],
+ });
expect(sendText).toHaveBeenCalledTimes(1);
expect(sendText).toHaveBeenCalledWith(
@@ -681,20 +1063,109 @@ describe("deliverOutboundPayloads", () => {
text: "caption",
}),
);
+ expect(logMocks.warn).toHaveBeenCalledWith(
+ "Plugin outbound adapter does not implement sendMedia; media URLs will be dropped and text fallback will be used",
+ expect.objectContaining({
+ channel: "matrix",
+ mediaCount: 2,
+ }),
+ );
expect(results).toEqual([{ channel: "matrix", messageId: "mx-2" }]);
});
it("fails media-only payloads when plugin outbound omits sendMedia", async () => {
hookMocks.runner.hasHooks.mockReturnValue(true);
const sendText = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-3" });
- setMatrixTextOnlyPlugin(sendText);
+ setActivePluginRegistry(
+ createTestRegistry([
+ {
+ pluginId: "matrix",
+ source: "test",
+ plugin: createOutboundTestPlugin({
+ id: "matrix",
+ outbound: { deliveryMode: "direct", sendText },
+ }),
+ },
+ ]),
+ );
await expect(
- deliverMatrixPayloads([{ text: " ", mediaUrl: "https://example.com/file.png" }]),
+ deliverOutboundPayloads({
+ cfg: {},
+ channel: "matrix",
+ to: "!room:1",
+ payloads: [{ text: " ", mediaUrl: "https://example.com/file.png" }],
+ }),
).rejects.toThrow(
"Plugin outbound adapter does not implement sendMedia and no text fallback is available for media payload",
);
expect(sendText).not.toHaveBeenCalled();
+ expect(logMocks.warn).toHaveBeenCalledWith(
+ "Plugin outbound adapter does not implement sendMedia; media URLs will be dropped and text fallback will be used",
+ expect.objectContaining({
+ channel: "matrix",
+ mediaCount: 1,
+ }),
+ );
+ expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
+ expect.objectContaining({
+ to: "!room:1",
+ content: "",
+ success: false,
+ error:
+ "Plugin outbound adapter does not implement sendMedia and no text fallback is available for media payload",
+ }),
+ expect.objectContaining({ channelId: "matrix" }),
+ );
+ });
+
+ it("emits message_sent failure when delivery errors", async () => {
+ hookMocks.runner.hasHooks.mockReturnValue(true);
+ const sendWhatsApp = vi.fn().mockRejectedValue(new Error("downstream failed"));
+
+ await expect(
+ deliverOutboundPayloads({
+ cfg: {},
+ channel: "whatsapp",
+ to: "+1555",
+ payloads: [{ text: "hi" }],
+ deps: { sendWhatsApp },
+ }),
+ ).rejects.toThrow("downstream failed");
+
+ expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
+ expect.objectContaining({
+ to: "+1555",
+ content: "hi",
+ success: false,
+ error: "downstream failed",
+ }),
+ expect.objectContaining({ channelId: "whatsapp" }),
+ );
});
});
+
+const emptyRegistry = createTestRegistry([]);
+const defaultRegistry = createTestRegistry([
+ {
+ pluginId: "telegram",
+ plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
+ source: "test",
+ },
+ {
+ pluginId: "signal",
+ plugin: createOutboundTestPlugin({ id: "signal", outbound: signalOutbound }),
+ source: "test",
+ },
+ {
+ pluginId: "whatsapp",
+ plugin: createOutboundTestPlugin({ id: "whatsapp", outbound: whatsappOutbound }),
+ source: "test",
+ },
+ {
+ pluginId: "imessage",
+ plugin: createIMessageTestPlugin(),
+ source: "test",
+ },
+]);
diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts
index c20632099bd..5e5ea6f0ea8 100644
--- a/src/infra/outbound/outbound.test.ts
+++ b/src/infra/outbound/outbound.test.ts
@@ -1,3 +1,1309 @@
+import fs from "node:fs";
+import os from "node:os";
+import path from "node:path";
+import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
+import type { ReplyPayload } from "../../auto-reply/types.js";
+import type { OpenClawConfig } from "../../config/config.js";
+import { typedCases } from "../../test-utils/typed-cases.js";
+import {
+ ackDelivery,
+ computeBackoffMs,
+ type DeliverFn,
+ enqueueDelivery,
+ failDelivery,
+ isEntryEligibleForRecoveryRetry,
+ isPermanentDeliveryError,
+ loadPendingDeliveries,
+ MAX_RETRIES,
+ moveToFailed,
+ recoverPendingDeliveries,
+} from "./delivery-queue.js";
+import { DirectoryCache } from "./directory-cache.js";
+import { buildOutboundResultEnvelope } from "./envelope.js";
+import type { OutboundDeliveryJson } from "./format.js";
+import {
+ buildOutboundDeliveryJson,
+ formatGatewaySummary,
+ formatOutboundDeliverySummary,
+} from "./format.js";
+import {
+ applyCrossContextDecoration,
+ buildCrossContextDecoration,
+ enforceCrossContextPolicy,
+} from "./outbound-policy.js";
+import { resolveOutboundSessionRoute } from "./outbound-session.js";
+import {
+ formatOutboundPayloadLog,
+ normalizeOutboundPayloads,
+ normalizeOutboundPayloadsForJson,
+} from "./payloads.js";
import { runResolveOutboundTargetCoreTests } from "./targets.shared-test.js";
+describe("delivery-queue", () => {
+ let tmpDir: string;
+ let fixtureRoot = "";
+ let fixtureCount = 0;
+
+ beforeAll(() => {
+ fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-suite-"));
+ });
+
+ beforeEach(() => {
+ tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`);
+ fs.mkdirSync(tmpDir, { recursive: true });
+ });
+
+ afterAll(() => {
+ if (!fixtureRoot) {
+ return;
+ }
+ fs.rmSync(fixtureRoot, { recursive: true, force: true });
+ fixtureRoot = "";
+ });
+
+ describe("enqueue + ack lifecycle", () => {
+ it("creates and removes a queue entry", async () => {
+ const id = await enqueueDelivery(
+ {
+ channel: "whatsapp",
+ to: "+1555",
+ payloads: [{ text: "hello" }],
+ bestEffort: true,
+ gifPlayback: true,
+ silent: true,
+ mirror: {
+ sessionKey: "agent:main:main",
+ text: "hello",
+ mediaUrls: ["https://example.com/file.png"],
+ },
+ },
+ tmpDir,
+ );
+
+ // Entry file exists after enqueue.
+ const queueDir = path.join(tmpDir, "delivery-queue");
+ const files = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json"));
+ expect(files).toHaveLength(1);
+ expect(files[0]).toBe(`${id}.json`);
+
+ // Entry contents are correct.
+ const entry = JSON.parse(fs.readFileSync(path.join(queueDir, files[0]), "utf-8"));
+ expect(entry).toMatchObject({
+ id,
+ channel: "whatsapp",
+ to: "+1555",
+ bestEffort: true,
+ gifPlayback: true,
+ silent: true,
+ mirror: {
+ sessionKey: "agent:main:main",
+ text: "hello",
+ mediaUrls: ["https://example.com/file.png"],
+ },
+ retryCount: 0,
+ });
+ expect(entry.payloads).toEqual([{ text: "hello" }]);
+
+ // Ack removes the file.
+ await ackDelivery(id, tmpDir);
+ const remaining = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json"));
+ expect(remaining).toHaveLength(0);
+ });
+
+ it("ack is idempotent (no error on missing file)", async () => {
+ await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined();
+ });
+
+ it("ack cleans up leftover .delivered marker when .json is already gone", async () => {
+ const id = await enqueueDelivery(
+ { channel: "whatsapp", to: "+1", payloads: [{ text: "stale-marker" }] },
+ tmpDir,
+ );
+ const queueDir = path.join(tmpDir, "delivery-queue");
+
+ fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`));
+ await expect(ackDelivery(id, tmpDir)).resolves.toBeUndefined();
+
+ expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
+ });
+
+ it("ack removes .delivered marker so recovery does not replay", async () => {
+ const id = await enqueueDelivery(
+ { channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] },
+ tmpDir,
+ );
+ const queueDir = path.join(tmpDir, "delivery-queue");
+
+ await ackDelivery(id, tmpDir);
+
+ // Neither .json nor .delivered should remain.
+ expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false);
+ expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
+ });
+
+ it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => {
+ const id = await enqueueDelivery(
+ { channel: "telegram", to: "99", payloads: [{ text: "stale" }] },
+ tmpDir,
+ );
+ const queueDir = path.join(tmpDir, "delivery-queue");
+
+ // Simulate crash between ack phase 1 (rename) and phase 2 (unlink):
+ // rename .json → .delivered, then pretend the process died.
+ fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`));
+
+ const entries = await loadPendingDeliveries(tmpDir);
+
+ // The .delivered entry must NOT appear as pending.
+ expect(entries).toHaveLength(0);
+ // And the marker file should have been cleaned up.
+ expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
+ });
+ });
+
+ describe("failDelivery", () => {
+ it("increments retryCount, records attempt time, and sets lastError", async () => {
+ const id = await enqueueDelivery(
+ {
+ channel: "telegram",
+ to: "123",
+ payloads: [{ text: "test" }],
+ },
+ tmpDir,
+ );
+
+ await failDelivery(id, "connection refused", tmpDir);
+
+ const queueDir = path.join(tmpDir, "delivery-queue");
+ const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8"));
+ expect(entry.retryCount).toBe(1);
+ expect(typeof entry.lastAttemptAt).toBe("number");
+ expect(entry.lastAttemptAt).toBeGreaterThan(0);
+ expect(entry.lastError).toBe("connection refused");
+ });
+ });
+
+ describe("moveToFailed", () => {
+ it("moves entry to failed/ subdirectory", async () => {
+ const id = await enqueueDelivery(
+ {
+ channel: "slack",
+ to: "#general",
+ payloads: [{ text: "hi" }],
+ },
+ tmpDir,
+ );
+
+ await moveToFailed(id, tmpDir);
+
+ const queueDir = path.join(tmpDir, "delivery-queue");
+ const failedDir = path.join(queueDir, "failed");
+ expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false);
+ expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
+ });
+ });
+
+ describe("isPermanentDeliveryError", () => {
+ it.each([
+ "No conversation reference found for user:abc",
+ "Telegram send failed: chat not found (chat_id=user:123)",
+ "user not found",
+ "Bot was blocked by the user",
+ "Forbidden: bot was kicked from the group chat",
+ "chat_id is empty",
+ "Outbound not configured for channel: msteams",
+ ])("returns true for permanent error: %s", (msg) => {
+ expect(isPermanentDeliveryError(msg)).toBe(true);
+ });
+
+ it.each([
+ "network down",
+ "ETIMEDOUT",
+ "socket hang up",
+ "rate limited",
+ "500 Internal Server Error",
+ ])("returns false for transient error: %s", (msg) => {
+ expect(isPermanentDeliveryError(msg)).toBe(false);
+ });
+ });
+
+ describe("loadPendingDeliveries", () => {
+ it("returns empty array when queue directory does not exist", async () => {
+ const nonexistent = path.join(tmpDir, "no-such-dir");
+ const entries = await loadPendingDeliveries(nonexistent);
+ expect(entries).toEqual([]);
+ });
+
+ it("loads multiple entries", async () => {
+ await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
+ await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
+
+ const entries = await loadPendingDeliveries(tmpDir);
+ expect(entries).toHaveLength(2);
+ });
+
+ it("backfills lastAttemptAt for legacy retry entries during load", async () => {
+ const id = await enqueueDelivery(
+ { channel: "whatsapp", to: "+1", payloads: [{ text: "legacy" }] },
+ tmpDir,
+ );
+ const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
+ const legacyEntry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
+ legacyEntry.retryCount = 2;
+ delete legacyEntry.lastAttemptAt;
+ fs.writeFileSync(filePath, JSON.stringify(legacyEntry), "utf-8");
+
+ const entries = await loadPendingDeliveries(tmpDir);
+ expect(entries).toHaveLength(1);
+ expect(entries[0]?.lastAttemptAt).toBe(entries[0]?.enqueuedAt);
+
+ const persisted = JSON.parse(fs.readFileSync(filePath, "utf-8"));
+ expect(persisted.lastAttemptAt).toBe(persisted.enqueuedAt);
+ });
+ });
+
+ describe("computeBackoffMs", () => {
+ it("returns scheduled backoff values and clamps at max retry", () => {
+ const cases = [
+ { retryCount: 0, expected: 0 },
+ { retryCount: 1, expected: 5_000 },
+ { retryCount: 2, expected: 25_000 },
+ { retryCount: 3, expected: 120_000 },
+ { retryCount: 4, expected: 600_000 },
+ // Beyond defined schedule -- clamps to last value.
+ { retryCount: 5, expected: 600_000 },
+ ] as const;
+
+ for (const testCase of cases) {
+ expect(computeBackoffMs(testCase.retryCount), String(testCase.retryCount)).toBe(
+ testCase.expected,
+ );
+ }
+ });
+ });
+
+ describe("isEntryEligibleForRecoveryRetry", () => {
+ it("allows first replay after crash for retryCount=0 without lastAttemptAt", () => {
+ const now = Date.now();
+ const result = isEntryEligibleForRecoveryRetry(
+ {
+ id: "entry-1",
+ channel: "whatsapp",
+ to: "+1",
+ payloads: [{ text: "a" }],
+ enqueuedAt: now,
+ retryCount: 0,
+ },
+ now,
+ );
+ expect(result).toEqual({ eligible: true });
+ });
+
+ it("defers retry entries until backoff window elapses", () => {
+ const now = Date.now();
+ const result = isEntryEligibleForRecoveryRetry(
+ {
+ id: "entry-2",
+ channel: "whatsapp",
+ to: "+1",
+ payloads: [{ text: "a" }],
+ enqueuedAt: now - 30_000,
+ retryCount: 3,
+ lastAttemptAt: now,
+ },
+ now,
+ );
+ expect(result.eligible).toBe(false);
+ if (result.eligible) {
+ throw new Error("Expected ineligible retry entry");
+ }
+ expect(result.remainingBackoffMs).toBeGreaterThan(0);
+ });
+ });
+
+ describe("recoverPendingDeliveries", () => {
+ const baseCfg = {};
+ const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() });
+ const enqueueCrashRecoveryEntries = async () => {
+ await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
+ await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
+ };
+ const setEntryState = (
+ id: string,
+ state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number },
+ ) => {
+ const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
+ const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
+ entry.retryCount = state.retryCount;
+ if (state.lastAttemptAt === undefined) {
+ delete entry.lastAttemptAt;
+ } else {
+ entry.lastAttemptAt = state.lastAttemptAt;
+ }
+ if (state.enqueuedAt !== undefined) {
+ entry.enqueuedAt = state.enqueuedAt;
+ }
+ fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
+ };
+ const runRecovery = async ({
+ deliver,
+ log = createLog(),
+ maxRecoveryMs,
+ }: {
+ deliver: ReturnType;
+ log?: ReturnType;
+ maxRecoveryMs?: number;
+ }) => {
+ const result = await recoverPendingDeliveries({
+ deliver: deliver as DeliverFn,
+ log,
+ cfg: baseCfg,
+ stateDir: tmpDir,
+ ...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
+ });
+ return { result, log };
+ };
+
+ it("recovers entries from a simulated crash", async () => {
+ // Manually create queue entries as if gateway crashed before delivery.
+ await enqueueCrashRecoveryEntries();
+ const deliver = vi.fn().mockResolvedValue([]);
+ const { result } = await runRecovery({ deliver });
+
+ expect(deliver).toHaveBeenCalledTimes(2);
+ expect(result.recovered).toBe(2);
+ expect(result.failed).toBe(0);
+ expect(result.skippedMaxRetries).toBe(0);
+ expect(result.deferredBackoff).toBe(0);
+
+ // Queue should be empty after recovery.
+ const remaining = await loadPendingDeliveries(tmpDir);
+ expect(remaining).toHaveLength(0);
+ });
+
+ it("moves entries that exceeded max retries to failed/", async () => {
+ // Create an entry and manually set retryCount to MAX_RETRIES.
+ const id = await enqueueDelivery(
+ { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
+ tmpDir,
+ );
+ setEntryState(id, { retryCount: MAX_RETRIES });
+
+ const deliver = vi.fn();
+ const { result } = await runRecovery({ deliver });
+
+ expect(deliver).not.toHaveBeenCalled();
+ expect(result.skippedMaxRetries).toBe(1);
+ expect(result.deferredBackoff).toBe(0);
+
+ // Entry should be in failed/ directory.
+ const failedDir = path.join(tmpDir, "delivery-queue", "failed");
+ expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
+ });
+
+ it("increments retryCount on failed recovery attempt", async () => {
+ await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir);
+
+ const deliver = vi.fn().mockRejectedValue(new Error("network down"));
+ const { result } = await runRecovery({ deliver });
+
+ expect(result.failed).toBe(1);
+ expect(result.recovered).toBe(0);
+
+ // Entry should still be in queue with incremented retryCount.
+ const entries = await loadPendingDeliveries(tmpDir);
+ expect(entries).toHaveLength(1);
+ expect(entries[0].retryCount).toBe(1);
+ expect(entries[0].lastError).toBe("network down");
+ });
+
+ it("moves entries to failed/ immediately on permanent delivery errors", async () => {
+ const id = await enqueueDelivery(
+ { channel: "msteams", to: "user:abc", payloads: [{ text: "hi" }] },
+ tmpDir,
+ );
+ const deliver = vi
+ .fn()
+ .mockRejectedValue(new Error("No conversation reference found for user:abc"));
+ const log = createLog();
+ const { result } = await runRecovery({ deliver, log });
+
+ expect(result.failed).toBe(1);
+ expect(result.recovered).toBe(0);
+ const remaining = await loadPendingDeliveries(tmpDir);
+ expect(remaining).toHaveLength(0);
+ const failedDir = path.join(tmpDir, "delivery-queue", "failed");
+ expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
+ expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error"));
+ });
+
+ it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => {
+ await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
+
+ const deliver = vi.fn().mockResolvedValue([]);
+ await runRecovery({ deliver });
+
+ expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true }));
+ });
+
+ it("replays stored delivery options during recovery", async () => {
+ await enqueueDelivery(
+ {
+ channel: "whatsapp",
+ to: "+1",
+ payloads: [{ text: "a" }],
+ bestEffort: true,
+ gifPlayback: true,
+ silent: true,
+ mirror: {
+ sessionKey: "agent:main:main",
+ text: "a",
+ mediaUrls: ["https://example.com/a.png"],
+ },
+ },
+ tmpDir,
+ );
+
+ const deliver = vi.fn().mockResolvedValue([]);
+ await runRecovery({ deliver });
+
+ expect(deliver).toHaveBeenCalledWith(
+ expect.objectContaining({
+ bestEffort: true,
+ gifPlayback: true,
+ silent: true,
+ mirror: {
+ sessionKey: "agent:main:main",
+ text: "a",
+ mediaUrls: ["https://example.com/a.png"],
+ },
+ }),
+ );
+ });
+
+ it("respects maxRecoveryMs time budget", async () => {
+ await enqueueCrashRecoveryEntries();
+ await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir);
+
+ const deliver = vi.fn().mockResolvedValue([]);
+ const { result, log } = await runRecovery({
+ deliver,
+ maxRecoveryMs: 0, // Immediate timeout -- no entries should be processed.
+ });
+
+ expect(deliver).not.toHaveBeenCalled();
+ expect(result.recovered).toBe(0);
+ expect(result.failed).toBe(0);
+ expect(result.skippedMaxRetries).toBe(0);
+ expect(result.deferredBackoff).toBe(0);
+
+ // All entries should still be in the queue.
+ const remaining = await loadPendingDeliveries(tmpDir);
+ expect(remaining).toHaveLength(3);
+
+ // Should have logged a warning about deferred entries.
+ expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart"));
+ });
+
+ it("defers entries until backoff becomes eligible", async () => {
+ const id = await enqueueDelivery(
+ { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
+ tmpDir,
+ );
+ setEntryState(id, { retryCount: 3, lastAttemptAt: Date.now() });
+
+ const deliver = vi.fn().mockResolvedValue([]);
+ const { result, log } = await runRecovery({
+ deliver,
+ maxRecoveryMs: 60_000,
+ });
+
+ expect(deliver).not.toHaveBeenCalled();
+ expect(result).toEqual({
+ recovered: 0,
+ failed: 0,
+ skippedMaxRetries: 0,
+ deferredBackoff: 1,
+ });
+
+ const remaining = await loadPendingDeliveries(tmpDir);
+ expect(remaining).toHaveLength(1);
+
+ expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
+ });
+
+ it("continues past high-backoff entries and recovers ready entries behind them", async () => {
+ const now = Date.now();
+ const blockedId = await enqueueDelivery(
+ { channel: "whatsapp", to: "+1", payloads: [{ text: "blocked" }] },
+ tmpDir,
+ );
+ const readyId = await enqueueDelivery(
+ { channel: "telegram", to: "2", payloads: [{ text: "ready" }] },
+ tmpDir,
+ );
+
+ setEntryState(blockedId, { retryCount: 3, lastAttemptAt: now, enqueuedAt: now - 30_000 });
+ setEntryState(readyId, { retryCount: 0, enqueuedAt: now - 10_000 });
+
+ const deliver = vi.fn().mockResolvedValue([]);
+ const { result } = await runRecovery({ deliver, maxRecoveryMs: 60_000 });
+
+ expect(result).toEqual({
+ recovered: 1,
+ failed: 0,
+ skippedMaxRetries: 0,
+ deferredBackoff: 1,
+ });
+ expect(deliver).toHaveBeenCalledTimes(1);
+ expect(deliver).toHaveBeenCalledWith(
+ expect.objectContaining({ channel: "telegram", to: "2", skipQueue: true }),
+ );
+
+ const remaining = await loadPendingDeliveries(tmpDir);
+ expect(remaining).toHaveLength(1);
+ expect(remaining[0]?.id).toBe(blockedId);
+ });
+
+ it("recovers deferred entries on a later restart once backoff elapsed", async () => {
+ vi.useFakeTimers();
+ const start = new Date("2026-01-01T00:00:00.000Z");
+ vi.setSystemTime(start);
+
+ const id = await enqueueDelivery(
+ { channel: "whatsapp", to: "+1", payloads: [{ text: "later" }] },
+ tmpDir,
+ );
+ setEntryState(id, { retryCount: 3, lastAttemptAt: start.getTime() });
+
+ const firstDeliver = vi.fn().mockResolvedValue([]);
+ const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60_000 });
+ expect(firstRun.result).toEqual({
+ recovered: 0,
+ failed: 0,
+ skippedMaxRetries: 0,
+ deferredBackoff: 1,
+ });
+ expect(firstDeliver).not.toHaveBeenCalled();
+
+ vi.setSystemTime(new Date(start.getTime() + 600_000 + 1));
+ const secondDeliver = vi.fn().mockResolvedValue([]);
+ const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60_000 });
+ expect(secondRun.result).toEqual({
+ recovered: 1,
+ failed: 0,
+ skippedMaxRetries: 0,
+ deferredBackoff: 0,
+ });
+ expect(secondDeliver).toHaveBeenCalledTimes(1);
+
+ const remaining = await loadPendingDeliveries(tmpDir);
+ expect(remaining).toHaveLength(0);
+
+ vi.useRealTimers();
+ });
+
+ it("returns zeros when queue is empty", async () => {
+ const deliver = vi.fn();
+ const { result } = await runRecovery({ deliver });
+
+ expect(result).toEqual({
+ recovered: 0,
+ failed: 0,
+ skippedMaxRetries: 0,
+ deferredBackoff: 0,
+ });
+ expect(deliver).not.toHaveBeenCalled();
+ });
+ });
+});
+
+describe("DirectoryCache", () => {
+ const cfg = {} as OpenClawConfig;
+
+ afterEach(() => {
+ vi.useRealTimers();
+ });
+
+ it("expires entries after ttl", () => {
+ vi.useFakeTimers();
+ vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
+ const cache = new DirectoryCache(1000, 10);
+
+ cache.set("a", "value-a", cfg);
+ expect(cache.get("a", cfg)).toBe("value-a");
+
+ vi.setSystemTime(new Date("2026-01-01T00:00:02.000Z"));
+ expect(cache.get("a", cfg)).toBeUndefined();
+ });
+
+ it("evicts least-recent entries when capacity is exceeded", () => {
+ const cases = [
+ {
+ actions: [
+ ["set", "a", "value-a"],
+ ["set", "b", "value-b"],
+ ["set", "c", "value-c"],
+ ] as const,
+ expected: { a: undefined, b: "value-b", c: "value-c" },
+ },
+ {
+ actions: [
+ ["set", "a", "value-a"],
+ ["set", "b", "value-b"],
+ ["set", "a", "value-a2"],
+ ["set", "c", "value-c"],
+ ] as const,
+ expected: { a: "value-a2", b: undefined, c: "value-c" },
+ },
+ ];
+
+ for (const testCase of cases) {
+ const cache = new DirectoryCache(60_000, 2);
+ for (const action of testCase.actions) {
+ cache.set(action[1], action[2], cfg);
+ }
+ expect(cache.get("a", cfg)).toBe(testCase.expected.a);
+ expect(cache.get("b", cfg)).toBe(testCase.expected.b);
+ expect(cache.get("c", cfg)).toBe(testCase.expected.c);
+ }
+ });
+});
+
+describe("buildOutboundResultEnvelope", () => {
+ it("formats envelope variants", () => {
+ const whatsappDelivery: OutboundDeliveryJson = {
+ channel: "whatsapp",
+ via: "gateway",
+ to: "+1",
+ messageId: "m1",
+ mediaUrl: null,
+ };
+ const telegramDelivery: OutboundDeliveryJson = {
+ channel: "telegram",
+ via: "direct",
+ to: "123",
+ messageId: "m2",
+ mediaUrl: null,
+ chatId: "c1",
+ };
+ const discordDelivery: OutboundDeliveryJson = {
+ channel: "discord",
+ via: "gateway",
+ to: "channel:C1",
+ messageId: "m3",
+ mediaUrl: null,
+ channelId: "C1",
+ };
+ const cases = typedCases<{
+ name: string;
+ input: Parameters[0];
+ expected: unknown;
+ }>([
+ {
+ name: "flatten delivery by default",
+ input: { delivery: whatsappDelivery },
+ expected: whatsappDelivery,
+ },
+ {
+ name: "keep payloads + meta",
+ input: {
+ payloads: [{ text: "hi", mediaUrl: null, mediaUrls: undefined }],
+ meta: { foo: "bar" },
+ },
+ expected: {
+ payloads: [{ text: "hi", mediaUrl: null, mediaUrls: undefined }],
+ meta: { foo: "bar" },
+ },
+ },
+ {
+ name: "include delivery when payloads exist",
+ input: { payloads: [], delivery: telegramDelivery, meta: { ok: true } },
+ expected: {
+ payloads: [],
+ meta: { ok: true },
+ delivery: telegramDelivery,
+ },
+ },
+ {
+ name: "keep wrapped delivery when flatten disabled",
+ input: { delivery: discordDelivery, flattenDelivery: false },
+ expected: { delivery: discordDelivery },
+ },
+ ]);
+ for (const testCase of cases) {
+ expect(buildOutboundResultEnvelope(testCase.input), testCase.name).toEqual(testCase.expected);
+ }
+ });
+});
+
+describe("formatOutboundDeliverySummary", () => {
+ it("formats fallback and channel-specific detail variants", () => {
+ const cases = [
+ {
+ name: "fallback telegram",
+ channel: "telegram" as const,
+ result: undefined,
+ expected: "✅ Sent via Telegram. Message ID: unknown",
+ },
+ {
+ name: "fallback imessage",
+ channel: "imessage" as const,
+ result: undefined,
+ expected: "✅ Sent via iMessage. Message ID: unknown",
+ },
+ {
+ name: "telegram with chat detail",
+ channel: "telegram" as const,
+ result: {
+ channel: "telegram" as const,
+ messageId: "m1",
+ chatId: "c1",
+ },
+ expected: "✅ Sent via Telegram. Message ID: m1 (chat c1)",
+ },
+ {
+ name: "discord with channel detail",
+ channel: "discord" as const,
+ result: {
+ channel: "discord" as const,
+ messageId: "d1",
+ channelId: "chan",
+ },
+ expected: "✅ Sent via Discord. Message ID: d1 (channel chan)",
+ },
+ ];
+
+ for (const testCase of cases) {
+ expect(formatOutboundDeliverySummary(testCase.channel, testCase.result), testCase.name).toBe(
+ testCase.expected,
+ );
+ }
+ });
+});
+
+describe("buildOutboundDeliveryJson", () => {
+ it("builds direct delivery payloads across provider-specific fields", () => {
+ const cases = [
+ {
+ name: "telegram direct payload",
+ input: {
+ channel: "telegram" as const,
+ to: "123",
+ result: { channel: "telegram" as const, messageId: "m1", chatId: "c1" },
+ mediaUrl: "https://example.com/a.png",
+ },
+ expected: {
+ channel: "telegram",
+ via: "direct",
+ to: "123",
+ messageId: "m1",
+ mediaUrl: "https://example.com/a.png",
+ chatId: "c1",
+ },
+ },
+ {
+ name: "whatsapp metadata",
+ input: {
+ channel: "whatsapp" as const,
+ to: "+1",
+ result: { channel: "whatsapp" as const, messageId: "w1", toJid: "jid" },
+ },
+ expected: {
+ channel: "whatsapp",
+ via: "direct",
+ to: "+1",
+ messageId: "w1",
+ mediaUrl: null,
+ toJid: "jid",
+ },
+ },
+ {
+ name: "signal timestamp",
+ input: {
+ channel: "signal" as const,
+ to: "+1",
+ result: { channel: "signal" as const, messageId: "s1", timestamp: 123 },
+ },
+ expected: {
+ channel: "signal",
+ via: "direct",
+ to: "+1",
+ messageId: "s1",
+ mediaUrl: null,
+ timestamp: 123,
+ },
+ },
+ ];
+
+ for (const testCase of cases) {
+ expect(buildOutboundDeliveryJson(testCase.input), testCase.name).toEqual(testCase.expected);
+ }
+ });
+});
+
+describe("formatGatewaySummary", () => {
+ it("formats default and custom gateway action summaries", () => {
+ const cases = [
+ {
+ name: "default send action",
+ input: { channel: "whatsapp", messageId: "m1" },
+ expected: "✅ Sent via gateway (whatsapp). Message ID: m1",
+ },
+ {
+ name: "custom action",
+ input: { action: "Poll sent", channel: "discord", messageId: "p1" },
+ expected: "✅ Poll sent via gateway (discord). Message ID: p1",
+ },
+ ];
+
+ for (const testCase of cases) {
+ expect(formatGatewaySummary(testCase.input), testCase.name).toBe(testCase.expected);
+ }
+ });
+});
+
+const slackConfig = {
+ channels: {
+ slack: {
+ botToken: "xoxb-test",
+ appToken: "xapp-test",
+ },
+ },
+} as OpenClawConfig;
+
+const discordConfig = {
+ channels: {
+ discord: {},
+ },
+} as OpenClawConfig;
+
+describe("outbound policy", () => {
+ it("allows cross-provider sends when enabled", () => {
+ const cfg = {
+ ...slackConfig,
+ tools: {
+ message: { crossContext: { allowAcrossProviders: true } },
+ },
+ } as OpenClawConfig;
+
+ expect(() =>
+ enforceCrossContextPolicy({
+ cfg,
+ channel: "telegram",
+ action: "send",
+ args: { to: "telegram:@ops" },
+ toolContext: { currentChannelId: "C12345678", currentChannelProvider: "slack" },
+ }),
+ ).not.toThrow();
+ });
+
+ it("uses components when available and preferred", async () => {
+ const decoration = await buildCrossContextDecoration({
+ cfg: discordConfig,
+ channel: "discord",
+ target: "123",
+ toolContext: { currentChannelId: "C12345678", currentChannelProvider: "discord" },
+ });
+
+ expect(decoration).not.toBeNull();
+ const applied = applyCrossContextDecoration({
+ message: "hello",
+ decoration: decoration!,
+ preferComponents: true,
+ });
+
+ expect(applied.usedComponents).toBe(true);
+ expect(applied.componentsBuilder).toBeDefined();
+ expect(applied.componentsBuilder?.("hello").length).toBeGreaterThan(0);
+ expect(applied.message).toBe("hello");
+ });
+});
+
+describe("resolveOutboundSessionRoute", () => {
+ const baseConfig = {} as OpenClawConfig;
+
+ it("resolves provider-specific session routes", async () => {
+ const perChannelPeerCfg = { session: { dmScope: "per-channel-peer" } } as OpenClawConfig;
+ const identityLinksCfg = {
+ session: {
+ dmScope: "per-peer",
+ identityLinks: {
+ alice: ["discord:123"],
+ },
+ },
+ } as OpenClawConfig;
+ const slackMpimCfg = {
+ channels: {
+ slack: {
+ dm: {
+ groupChannels: ["G123"],
+ },
+ },
+ },
+ } as OpenClawConfig;
+ const cases: Array<{
+ name: string;
+ cfg: OpenClawConfig;
+ channel: string;
+ target: string;
+ replyToId?: string;
+ threadId?: string;
+ expected: {
+ sessionKey: string;
+ from?: string;
+ to?: string;
+ threadId?: string | number;
+ chatType?: "direct" | "group";
+ };
+ }> = [
+ {
+ name: "Slack thread",
+ cfg: baseConfig,
+ channel: "slack",
+ target: "channel:C123",
+ replyToId: "456",
+ expected: {
+ sessionKey: "agent:main:slack:channel:c123:thread:456",
+ from: "slack:channel:C123",
+ to: "channel:C123",
+ threadId: "456",
+ },
+ },
+ {
+ name: "Telegram topic group",
+ cfg: baseConfig,
+ channel: "telegram",
+ target: "-100123456:topic:42",
+ expected: {
+ sessionKey: "agent:main:telegram:group:-100123456:topic:42",
+ from: "telegram:group:-100123456:topic:42",
+ to: "telegram:-100123456",
+ threadId: 42,
+ },
+ },
+ {
+ name: "Telegram DM with topic",
+ cfg: perChannelPeerCfg,
+ channel: "telegram",
+ target: "123456789:topic:99",
+ expected: {
+ sessionKey: "agent:main:telegram:direct:123456789:thread:99",
+ from: "telegram:123456789:topic:99",
+ to: "telegram:123456789",
+ threadId: 99,
+ chatType: "direct",
+ },
+ },
+ {
+ name: "Telegram unresolved username DM",
+ cfg: perChannelPeerCfg,
+ channel: "telegram",
+ target: "@alice",
+ expected: {
+ sessionKey: "agent:main:telegram:direct:@alice",
+ chatType: "direct",
+ },
+ },
+ {
+ name: "Telegram DM scoped threadId fallback",
+ cfg: perChannelPeerCfg,
+ channel: "telegram",
+ target: "12345",
+ threadId: "12345:99",
+ expected: {
+ sessionKey: "agent:main:telegram:direct:12345:thread:99",
+ from: "telegram:12345:topic:99",
+ to: "telegram:12345",
+ threadId: 99,
+ chatType: "direct",
+ },
+ },
+ {
+ name: "identity-links per-peer",
+ cfg: identityLinksCfg,
+ channel: "discord",
+ target: "user:123",
+ expected: {
+ sessionKey: "agent:main:direct:alice",
+ },
+ },
+ {
+ name: "BlueBubbles chat_* prefix stripping",
+ cfg: baseConfig,
+ channel: "bluebubbles",
+ target: "chat_guid:ABC123",
+ expected: {
+ sessionKey: "agent:main:bluebubbles:group:abc123",
+ from: "group:ABC123",
+ },
+ },
+ {
+ name: "Zalo Personal DM target",
+ cfg: perChannelPeerCfg,
+ channel: "zalouser",
+ target: "123456",
+ expected: {
+ sessionKey: "agent:main:zalouser:direct:123456",
+ chatType: "direct",
+ },
+ },
+ {
+ name: "Slack mpim allowlist -> group key",
+ cfg: slackMpimCfg,
+ channel: "slack",
+ target: "channel:G123",
+ expected: {
+ sessionKey: "agent:main:slack:group:g123",
+ from: "slack:group:G123",
+ },
+ },
+ {
+ name: "Feishu explicit group prefix keeps group routing",
+ cfg: baseConfig,
+ channel: "feishu",
+ target: "group:oc_group_chat",
+ expected: {
+ sessionKey: "agent:main:feishu:group:oc_group_chat",
+ from: "feishu:group:oc_group_chat",
+ to: "oc_group_chat",
+ chatType: "group",
+ },
+ },
+ {
+ name: "Feishu explicit dm prefix keeps direct routing",
+ cfg: perChannelPeerCfg,
+ channel: "feishu",
+ target: "dm:oc_dm_chat",
+ expected: {
+ sessionKey: "agent:main:feishu:direct:oc_dm_chat",
+ from: "feishu:oc_dm_chat",
+ to: "oc_dm_chat",
+ chatType: "direct",
+ },
+ },
+ {
+ name: "Feishu bare oc_ target defaults to direct routing",
+ cfg: perChannelPeerCfg,
+ channel: "feishu",
+ target: "oc_ambiguous_chat",
+ expected: {
+ sessionKey: "agent:main:feishu:direct:oc_ambiguous_chat",
+ from: "feishu:oc_ambiguous_chat",
+ to: "oc_ambiguous_chat",
+ chatType: "direct",
+ },
+ },
+ ];
+
+ for (const testCase of cases) {
+ const route = await resolveOutboundSessionRoute({
+ cfg: testCase.cfg,
+ channel: testCase.channel,
+ agentId: "main",
+ target: testCase.target,
+ replyToId: testCase.replyToId,
+ threadId: testCase.threadId,
+ });
+ expect(route?.sessionKey, testCase.name).toBe(testCase.expected.sessionKey);
+ if (testCase.expected.from !== undefined) {
+ expect(route?.from, testCase.name).toBe(testCase.expected.from);
+ }
+ if (testCase.expected.to !== undefined) {
+ expect(route?.to, testCase.name).toBe(testCase.expected.to);
+ }
+ if (testCase.expected.threadId !== undefined) {
+ expect(route?.threadId, testCase.name).toBe(testCase.expected.threadId);
+ }
+ if (testCase.expected.chatType !== undefined) {
+ expect(route?.chatType, testCase.name).toBe(testCase.expected.chatType);
+ }
+ }
+ });
+
+ it("uses resolved Discord user targets to route bare numeric ids as DMs", async () => {
+ const route = await resolveOutboundSessionRoute({
+ cfg: { session: { dmScope: "per-channel-peer" } } as OpenClawConfig,
+ channel: "discord",
+ agentId: "main",
+ target: "123",
+ resolvedTarget: {
+ to: "user:123",
+ kind: "user",
+ source: "directory",
+ },
+ });
+
+ expect(route).toMatchObject({
+ sessionKey: "agent:main:discord:direct:123",
+ from: "discord:123",
+ to: "user:123",
+ chatType: "direct",
+ });
+ });
+
+ it("uses resolved Mattermost user targets to route bare ids as DMs", async () => {
+ const userId = "dthcxgoxhifn3pwh65cut3ud3w";
+ const route = await resolveOutboundSessionRoute({
+ cfg: { session: { dmScope: "per-channel-peer" } } as OpenClawConfig,
+ channel: "mattermost",
+ agentId: "main",
+ target: userId,
+ resolvedTarget: {
+ to: `user:${userId}`,
+ kind: "user",
+ source: "directory",
+ },
+ });
+
+ expect(route).toMatchObject({
+ sessionKey: `agent:main:mattermost:direct:${userId}`,
+ from: `mattermost:${userId}`,
+ to: `user:${userId}`,
+ chatType: "direct",
+ });
+ });
+
+ it("rejects bare numeric Discord targets when the caller has no kind hint", async () => {
+ await expect(
+ resolveOutboundSessionRoute({
+ cfg: { session: { dmScope: "per-channel-peer" } } as OpenClawConfig,
+ channel: "discord",
+ agentId: "main",
+ target: "123",
+ }),
+ ).rejects.toThrow(/Ambiguous Discord recipient/);
+ });
+});
+
+describe("normalizeOutboundPayloadsForJson", () => {
+ it("normalizes payloads for JSON output", () => {
+ const cases = typedCases<{
+ input: Parameters[0];
+ expected: ReturnType;
+ }>([
+ {
+ input: [
+ { text: "hi" },
+ { text: "photo", mediaUrl: "https://x.test/a.jpg" },
+ { text: "multi", mediaUrls: ["https://x.test/1.png"] },
+ ],
+ expected: [
+ { text: "hi", mediaUrl: null, mediaUrls: undefined, channelData: undefined },
+ {
+ text: "photo",
+ mediaUrl: "https://x.test/a.jpg",
+ mediaUrls: ["https://x.test/a.jpg"],
+ channelData: undefined,
+ },
+ {
+ text: "multi",
+ mediaUrl: null,
+ mediaUrls: ["https://x.test/1.png"],
+ channelData: undefined,
+ },
+ ],
+ },
+ {
+ input: [
+ {
+ text: "MEDIA:https://x.test/a.png\nMEDIA:https://x.test/b.png",
+ },
+ ],
+ expected: [
+ {
+ text: "",
+ mediaUrl: null,
+ mediaUrls: ["https://x.test/a.png", "https://x.test/b.png"],
+ channelData: undefined,
+ },
+ ],
+ },
+ ]);
+
+ for (const testCase of cases) {
+ const input: ReplyPayload[] = testCase.input.map((payload) =>
+ "mediaUrls" in payload
+ ? ({
+ ...payload,
+ mediaUrls: payload.mediaUrls ? [...payload.mediaUrls] : undefined,
+ } as ReplyPayload)
+ : ({ ...payload } as ReplyPayload),
+ );
+ expect(normalizeOutboundPayloadsForJson(input)).toEqual(testCase.expected);
+ }
+ });
+
+ it("suppresses reasoning payloads", () => {
+ const normalized = normalizeOutboundPayloadsForJson([
+ { text: "Reasoning:\n_step_", isReasoning: true },
+ { text: "final answer" },
+ ]);
+ expect(normalized).toEqual([{ text: "final answer", mediaUrl: null, mediaUrls: undefined }]);
+ });
+});
+
+describe("normalizeOutboundPayloads", () => {
+ it("keeps channelData-only payloads", () => {
+ const channelData = { line: { flexMessage: { altText: "Card", contents: {} } } };
+ const normalized = normalizeOutboundPayloads([{ channelData }]);
+ expect(normalized).toEqual([{ text: "", mediaUrls: [], channelData }]);
+ });
+
+ it("suppresses reasoning payloads", () => {
+ const normalized = normalizeOutboundPayloads([
+ { text: "Reasoning:\n_step_", isReasoning: true },
+ { text: "final answer" },
+ ]);
+ expect(normalized).toEqual([{ text: "final answer", mediaUrls: [] }]);
+ });
+
+ it("prefixes BTW replies for external delivery", () => {
+ const normalized = normalizeOutboundPayloads([
+ {
+ text: "323",
+ btw: { question: "what is 17 * 19?" },
+ },
+ ]);
+ expect(normalized).toEqual([{ text: "BTW: 323", mediaUrls: [] }]);
+ });
+});
+
+describe("formatOutboundPayloadLog", () => {
+ it("formats text+media and media-only logs", () => {
+ const cases = typedCases<{
+ name: string;
+ input: Parameters[0];
+ expected: string;
+ }>([
+ {
+ name: "text with media lines",
+ input: {
+ text: "hello ",
+ mediaUrls: ["https://x.test/a.png", "https://x.test/b.png"],
+ },
+ expected: "hello\nMEDIA:https://x.test/a.png\nMEDIA:https://x.test/b.png",
+ },
+ {
+ name: "media only",
+ input: {
+ text: "",
+ mediaUrls: ["https://x.test/a.png"],
+ },
+ expected: "MEDIA:https://x.test/a.png",
+ },
+ ]);
+
+ for (const testCase of cases) {
+ expect(
+ formatOutboundPayloadLog({
+ ...testCase.input,
+ mediaUrls: [...testCase.input.mediaUrls],
+ }),
+ testCase.name,
+ ).toBe(testCase.expected);
+ }
+ });
+});
+
runResolveOutboundTargetCoreTests();
diff --git a/src/infra/outbound/payloads.ts b/src/infra/outbound/payloads.ts
index 9dae6a6c1e6..754d3434445 100644
--- a/src/infra/outbound/payloads.ts
+++ b/src/infra/outbound/payloads.ts
@@ -1,5 +1,6 @@
import { parseReplyDirectives } from "../../auto-reply/reply/reply-directives.js";
import {
+ formatBtwTextForExternalDelivery,
isRenderablePayload,
shouldSuppressReasoningPayload,
} from "../../auto-reply/reply/reply-payloads.js";
@@ -59,7 +60,11 @@ export function normalizeReplyPayloadsForDelivery(
const resolvedMediaUrl = hasMultipleMedia ? undefined : explicitMediaUrl;
const next: ReplyPayload = {
...payload,
- text: parsed.text ?? "",
+ text:
+ formatBtwTextForExternalDelivery({
+ ...payload,
+ text: parsed.text ?? "",
+ }) ?? "",
mediaUrls: mergedMedia.length ? mergedMedia : undefined,
mediaUrl: resolvedMediaUrl,
replyToId: payload.replyToId ?? parsed.replyToId,
diff --git a/src/infra/session-cost-usage.ts b/src/infra/session-cost-usage.ts
index 230ebd60c2e..a795ebc17ed 100644
--- a/src/infra/session-cost-usage.ts
+++ b/src/infra/session-cost-usage.ts
@@ -5,6 +5,7 @@ import type { NormalizedUsage, UsageLike } from "../agents/usage.js";
import { normalizeUsage } from "../agents/usage.js";
import { stripInboundMetadata } from "../auto-reply/reply/strip-inbound-meta.js";
import type { OpenClawConfig } from "../config/config.js";
+import { isPrimarySessionTranscriptFileName } from "../config/sessions.js";
import {
resolveSessionFilePath,
resolveSessionTranscriptsDirForAgent,
@@ -318,7 +319,7 @@ export async function loadCostUsageSummary(params?: {
const files = (
await Promise.all(
entries
- .filter((entry) => entry.isFile() && entry.name.endsWith(".jsonl"))
+ .filter((entry) => entry.isFile() && isPrimarySessionTranscriptFileName(entry.name))
.map(async (entry) => {
const filePath = path.join(sessionsDir, entry.name);
const stats = await fs.promises.stat(filePath).catch(() => null);
@@ -393,7 +394,7 @@ export async function discoverAllSessions(params?: {
const discovered: DiscoveredSession[] = [];
for (const entry of entries) {
- if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
+ if (!entry.isFile() || !isPrimarySessionTranscriptFileName(entry.name)) {
continue;
}
diff --git a/src/sessions/side-results.ts b/src/sessions/side-results.ts
new file mode 100644
index 00000000000..881ca159812
--- /dev/null
+++ b/src/sessions/side-results.ts
@@ -0,0 +1,37 @@
+import fs from "node:fs";
+import path from "node:path";
+
+export const SESSION_SIDE_RESULTS_SUFFIX = ".side-results.jsonl";
+
+export type PersistedSessionSideResult = {
+ kind: "btw";
+ question: string;
+ text: string;
+ ts: number;
+ isError?: boolean;
+ provider?: string;
+ model?: string;
+ thinkingLevel?: string;
+ reasoningLevel?: string;
+ sessionKey?: string;
+ authProfileId?: string;
+ authProfileIdSource?: "auto" | "user";
+ usage?: unknown;
+};
+
+export function resolveSessionSideResultsPathFromTranscript(transcriptPath: string): string {
+ const resolved = path.resolve(transcriptPath.trim());
+ return resolved.endsWith(".jsonl")
+ ? `${resolved.slice(0, -".jsonl".length)}${SESSION_SIDE_RESULTS_SUFFIX}`
+ : `${resolved}${SESSION_SIDE_RESULTS_SUFFIX}`;
+}
+
+export function appendSessionSideResult(params: {
+ transcriptPath: string;
+ result: PersistedSessionSideResult;
+}) {
+ const filePath = resolveSessionSideResultsPathFromTranscript(params.transcriptPath);
+ fs.mkdirSync(path.dirname(filePath), { recursive: true });
+ fs.appendFileSync(filePath, `${JSON.stringify(params.result)}\n`, "utf-8");
+ return filePath;
+}
diff --git a/src/tui/components/btw-inline-message.test.ts b/src/tui/components/btw-inline-message.test.ts
new file mode 100644
index 00000000000..8cd323708c2
--- /dev/null
+++ b/src/tui/components/btw-inline-message.test.ts
@@ -0,0 +1,16 @@
+import { describe, expect, it } from "vitest";
+import { BtwInlineMessage } from "./btw-inline-message.js";
+
+describe("btw inline message", () => {
+ it("renders the BTW question, answer, and dismiss hint inline", () => {
+ const message = new BtwInlineMessage({
+ question: "what is 17 * 19?",
+ text: "323",
+ });
+
+ const rendered = message.render(80).join("\n");
+ expect(rendered).toContain("BTW: what is 17 * 19?");
+ expect(rendered).toContain("323");
+ expect(rendered).toContain("Press Enter or Esc to dismiss");
+ });
+});
diff --git a/src/tui/components/btw-inline-message.ts b/src/tui/components/btw-inline-message.ts
new file mode 100644
index 00000000000..7aa813a457e
--- /dev/null
+++ b/src/tui/components/btw-inline-message.ts
@@ -0,0 +1,28 @@
+import { Container, Spacer, Text } from "@mariozechner/pi-tui";
+import { theme } from "../theme/theme.js";
+import { AssistantMessageComponent } from "./assistant-message.js";
+
+type BtwInlineMessageParams = {
+ question: string;
+ text: string;
+ isError?: boolean;
+};
+
+export class BtwInlineMessage extends Container {
+ constructor(params: BtwInlineMessageParams) {
+ super();
+ this.setResult(params);
+ }
+
+ setResult(params: BtwInlineMessageParams) {
+ this.clear();
+ this.addChild(new Spacer(1));
+ this.addChild(new Text(theme.header(`BTW: ${params.question}`), 1, 0));
+ if (params.isError) {
+ this.addChild(new Text(theme.error(params.text), 1, 0));
+ } else {
+ this.addChild(new AssistantMessageComponent(params.text));
+ }
+ this.addChild(new Text(theme.dim("Press Enter or Esc to dismiss"), 1, 0));
+ }
+}
diff --git a/src/tui/components/chat-log.test.ts b/src/tui/components/chat-log.test.ts
index b81740a2e8c..700a2abb9d2 100644
--- a/src/tui/components/chat-log.test.ts
+++ b/src/tui/components/chat-log.test.ts
@@ -52,4 +52,25 @@ describe("ChatLog", () => {
expect(chatLog.children.length).toBe(20);
});
+
+ it("renders BTW inline and removes it when dismissed", () => {
+ const chatLog = new ChatLog(40);
+
+ chatLog.addSystem("session agent:main:main");
+ chatLog.showBtw({
+ question: "what is 17 * 19?",
+ text: "323",
+ });
+
+ let rendered = chatLog.render(120).join("\n");
+ expect(rendered).toContain("BTW: what is 17 * 19?");
+ expect(rendered).toContain("323");
+ expect(chatLog.hasVisibleBtw()).toBe(true);
+
+ chatLog.dismissBtw();
+
+ rendered = chatLog.render(120).join("\n");
+ expect(rendered).not.toContain("BTW: what is 17 * 19?");
+ expect(chatLog.hasVisibleBtw()).toBe(false);
+ });
});
diff --git a/src/tui/components/chat-log.ts b/src/tui/components/chat-log.ts
index 76ac7d93654..c46e6065b9b 100644
--- a/src/tui/components/chat-log.ts
+++ b/src/tui/components/chat-log.ts
@@ -2,6 +2,7 @@ import type { Component } from "@mariozechner/pi-tui";
import { Container, Spacer, Text } from "@mariozechner/pi-tui";
import { theme } from "../theme/theme.js";
import { AssistantMessageComponent } from "./assistant-message.js";
+import { BtwInlineMessage } from "./btw-inline-message.js";
import { ToolExecutionComponent } from "./tool-execution.js";
import { UserMessageComponent } from "./user-message.js";
@@ -9,6 +10,7 @@ export class ChatLog extends Container {
private readonly maxComponents: number;
private toolById = new Map();
private streamingRuns = new Map();
+ private btwMessage: BtwInlineMessage | null = null;
private toolsExpanded = false;
constructor(maxComponents = 180) {
@@ -27,6 +29,9 @@ export class ChatLog extends Container {
this.streamingRuns.delete(runId);
}
}
+ if (this.btwMessage === component) {
+ this.btwMessage = null;
+ }
}
private pruneOverflow() {
@@ -49,6 +54,7 @@ export class ChatLog extends Container {
this.clear();
this.toolById.clear();
this.streamingRuns.clear();
+ this.btwMessage = null;
}
addSystem(text: string) {
@@ -108,6 +114,33 @@ export class ChatLog extends Container {
this.streamingRuns.delete(effectiveRunId);
}
+ showBtw(params: { question: string; text: string; isError?: boolean }) {
+ if (this.btwMessage) {
+ this.btwMessage.setResult(params);
+ if (this.children[this.children.length - 1] !== this.btwMessage) {
+ this.removeChild(this.btwMessage);
+ this.append(this.btwMessage);
+ }
+ return this.btwMessage;
+ }
+ const component = new BtwInlineMessage(params);
+ this.btwMessage = component;
+ this.append(component);
+ return component;
+ }
+
+ dismissBtw() {
+ if (!this.btwMessage) {
+ return;
+ }
+ this.removeChild(this.btwMessage);
+ this.btwMessage = null;
+ }
+
+ hasVisibleBtw() {
+ return this.btwMessage !== null;
+ }
+
startTool(toolCallId: string, toolName: string, args: unknown) {
const existing = this.toolById.get(toolCallId);
if (existing) {
diff --git a/src/tui/tui-command-handlers.test.ts b/src/tui/tui-command-handlers.test.ts
index 4e4bfe3c36f..87f68217e3a 100644
--- a/src/tui/tui-command-handlers.test.ts
+++ b/src/tui/tui-command-handlers.test.ts
@@ -12,6 +12,7 @@ function createHarness(params?: {
loadHistory?: LoadHistoryMock;
setActivityStatus?: SetActivityStatusMock;
isConnected?: boolean;
+ activeChatRunId?: string | null;
}) {
const sendChat = params?.sendChat ?? vi.fn().mockResolvedValue({ runId: "r1" });
const resetSession = params?.resetSession ?? vi.fn().mockResolvedValue({ ok: true });
@@ -19,21 +20,23 @@ function createHarness(params?: {
const addUser = vi.fn();
const addSystem = vi.fn();
const requestRender = vi.fn();
+ const noteLocalRunId = vi.fn();
const loadHistory =
params?.loadHistory ?? (vi.fn().mockResolvedValue(undefined) as LoadHistoryMock);
const setActivityStatus = params?.setActivityStatus ?? (vi.fn() as SetActivityStatusMock);
+ const state = {
+ currentSessionKey: "agent:main:main",
+ activeChatRunId: params?.activeChatRunId ?? null,
+ isConnected: params?.isConnected ?? true,
+ sessionInfo: {},
+ };
const { handleCommand } = createCommandHandlers({
client: { sendChat, resetSession } as never,
chatLog: { addUser, addSystem } as never,
tui: { requestRender } as never,
opts: {},
- state: {
- currentSessionKey: "agent:main:main",
- activeChatRunId: null,
- isConnected: params?.isConnected ?? true,
- sessionInfo: {},
- } as never,
+ state: state as never,
deliverDefault: false,
openOverlay: vi.fn(),
closeOverlay: vi.fn(),
@@ -45,7 +48,7 @@ function createHarness(params?: {
setActivityStatus,
formatSessionKey: vi.fn(),
applySessionInfoFromPatch: vi.fn(),
- noteLocalRunId: vi.fn(),
+ noteLocalRunId,
forgetLocalRunId: vi.fn(),
requestExit: vi.fn(),
});
@@ -60,6 +63,8 @@ function createHarness(params?: {
requestRender,
loadHistory,
setActivityStatus,
+ noteLocalRunId,
+ state,
};
}
@@ -108,6 +113,27 @@ describe("tui command handlers", () => {
expect(requestRender).toHaveBeenCalled();
});
+ it("sends /btw without hijacking the active main run", async () => {
+ const setActivityStatus = vi.fn();
+ const { handleCommand, sendChat, addUser, noteLocalRunId, state } = createHarness({
+ activeChatRunId: "run-main",
+ setActivityStatus,
+ });
+
+ await handleCommand("/btw what changed?");
+
+ expect(addUser).not.toHaveBeenCalled();
+ expect(noteLocalRunId).not.toHaveBeenCalled();
+ expect(state.activeChatRunId).toBe("run-main");
+ expect(setActivityStatus).not.toHaveBeenCalledWith("sending");
+ expect(setActivityStatus).not.toHaveBeenCalledWith("waiting");
+ expect(sendChat).toHaveBeenCalledWith(
+ expect.objectContaining({
+ message: "/btw what changed?",
+ }),
+ );
+ });
+
it("creates unique session for /new and resets shared session for /reset", async () => {
const loadHistory = vi.fn().mockResolvedValue(undefined);
const setSessionMock = vi.fn().mockResolvedValue(undefined) as SetSessionMock;
diff --git a/src/tui/tui-command-handlers.ts b/src/tui/tui-command-handlers.ts
index dd5113a17af..9a6d63f53d8 100644
--- a/src/tui/tui-command-handlers.ts
+++ b/src/tui/tui-command-handlers.ts
@@ -47,6 +47,10 @@ type CommandHandlerContext = {
requestExit: () => void;
};
+function isBtwCommand(text: string): boolean {
+ return /^\/btw(?::|\s|$)/i.test(text.trim());
+}
+
export function createCommandHandlers(context: CommandHandlerContext) {
const {
client,
@@ -501,13 +505,15 @@ export function createCommandHandlers(context: CommandHandlerContext) {
tui.requestRender();
return;
}
+ const isBtw = isBtwCommand(text);
try {
- chatLog.addUser(text);
- tui.requestRender();
const runId = randomUUID();
- noteLocalRunId(runId);
- state.activeChatRunId = runId;
- setActivityStatus("sending");
+ if (!isBtw) {
+ chatLog.addUser(text);
+ noteLocalRunId(runId);
+ state.activeChatRunId = runId;
+ setActivityStatus("sending");
+ }
tui.requestRender();
await client.sendChat({
sessionKey: state.currentSessionKey,
@@ -517,15 +523,21 @@ export function createCommandHandlers(context: CommandHandlerContext) {
timeoutMs: opts.timeoutMs,
runId,
});
- setActivityStatus("waiting");
- tui.requestRender();
+ if (!isBtw) {
+ setActivityStatus("waiting");
+ tui.requestRender();
+ }
} catch (err) {
- if (state.activeChatRunId) {
+ if (!isBtw && state.activeChatRunId) {
forgetLocalRunId?.(state.activeChatRunId);
}
- state.activeChatRunId = null;
- chatLog.addSystem(`send failed: ${String(err)}`);
- setActivityStatus("error");
+ if (!isBtw) {
+ state.activeChatRunId = null;
+ }
+ chatLog.addSystem(`${isBtw ? "btw failed" : "send failed"}: ${String(err)}`);
+ if (!isBtw) {
+ setActivityStatus("error");
+ }
tui.requestRender();
}
};
diff --git a/src/tui/tui-event-handlers.test.ts b/src/tui/tui-event-handlers.test.ts
index 7b08ddceaf5..2e1046fc650 100644
--- a/src/tui/tui-event-handlers.test.ts
+++ b/src/tui/tui-event-handlers.test.ts
@@ -1,6 +1,6 @@
import { describe, expect, it, vi } from "vitest";
import { createEventHandlers } from "./tui-event-handlers.js";
-import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
+import type { AgentEvent, BtwEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
type MockFn = ReturnType;
type HandlerChatLog = {
@@ -11,6 +11,10 @@ type HandlerChatLog = {
finalizeAssistant: (...args: unknown[]) => void;
dropAssistant: (...args: unknown[]) => void;
};
+type HandlerBtwPresenter = {
+ showResult: (...args: unknown[]) => void;
+ clear: (...args: unknown[]) => void;
+};
type HandlerTui = { requestRender: (...args: unknown[]) => void };
type MockChatLog = {
startTool: MockFn;
@@ -20,6 +24,10 @@ type MockChatLog = {
finalizeAssistant: MockFn;
dropAssistant: MockFn;
};
+type MockBtwPresenter = {
+ showResult: MockFn;
+ clear: MockFn;
+};
type MockTui = { requestRender: MockFn };
function createMockChatLog(): MockChatLog & HandlerChatLog {
@@ -33,6 +41,13 @@ function createMockChatLog(): MockChatLog & HandlerChatLog {
} as unknown as MockChatLog & HandlerChatLog;
}
+function createMockBtwPresenter(): MockBtwPresenter & HandlerBtwPresenter {
+ return {
+ showResult: vi.fn(),
+ clear: vi.fn(),
+ } as unknown as MockBtwPresenter & HandlerBtwPresenter;
+}
+
describe("tui-event-handlers: handleAgentEvent", () => {
const makeState = (overrides?: Partial): TuiStateAccess => ({
agentDefaultId: "main",
@@ -59,6 +74,7 @@ describe("tui-event-handlers: handleAgentEvent", () => {
const makeContext = (state: TuiStateAccess) => {
const chatLog = createMockChatLog();
+ const btw = createMockBtwPresenter();
const tui = { requestRender: vi.fn() } as unknown as MockTui & HandlerTui;
const setActivityStatus = vi.fn();
const loadHistory = vi.fn();
@@ -72,6 +88,7 @@ describe("tui-event-handlers: handleAgentEvent", () => {
return {
chatLog,
+ btw,
tui,
state,
setActivityStatus,
@@ -86,12 +103,14 @@ describe("tui-event-handlers: handleAgentEvent", () => {
const createHandlersHarness = (params?: {
state?: Partial;
chatLog?: HandlerChatLog;
+ btw?: HandlerBtwPresenter;
}) => {
const state = makeState(params?.state);
const context = makeContext(state);
const chatLog = (params?.chatLog ?? context.chatLog) as MockChatLog & HandlerChatLog;
const handlers = createEventHandlers({
chatLog,
+ btw: (params?.btw ?? context.btw) as MockBtwPresenter & HandlerBtwPresenter,
tui: context.tui,
state,
setActivityStatus: context.setActivityStatus,
@@ -103,6 +122,7 @@ describe("tui-event-handlers: handleAgentEvent", () => {
...context,
state,
chatLog,
+ btw: (params?.btw ?? context.btw) as MockBtwPresenter & HandlerBtwPresenter,
...handlers,
};
};
@@ -212,6 +232,33 @@ describe("tui-event-handlers: handleAgentEvent", () => {
expect(chatLog.updateAssistant).toHaveBeenCalledWith("hello", "run-alias");
});
+ it("renders BTW results separately without disturbing the active run", () => {
+ const { state, btw, setActivityStatus, loadHistory, tui, handleBtwEvent } =
+ createHandlersHarness({
+ state: { activeChatRunId: "run-main" },
+ });
+
+ const evt: BtwEvent = {
+ kind: "btw",
+ runId: "run-btw",
+ sessionKey: state.currentSessionKey,
+ question: "what changed?",
+ text: "nothing important",
+ };
+
+ handleBtwEvent(evt);
+
+ expect(state.activeChatRunId).toBe("run-main");
+ expect(btw.showResult).toHaveBeenCalledWith({
+ question: "what changed?",
+ text: "nothing important",
+ isError: undefined,
+ });
+ expect(setActivityStatus).not.toHaveBeenCalled();
+ expect(loadHistory).not.toHaveBeenCalled();
+ expect(tui.requestRender).toHaveBeenCalledTimes(1);
+ });
+
it("does not cross-match canonical session keys from different agents", () => {
const { chatLog, handleChatEvent } = createHandlersHarness({
state: {
diff --git a/src/tui/tui-event-handlers.ts b/src/tui/tui-event-handlers.ts
index 54e4654ee96..2549bcbba38 100644
--- a/src/tui/tui-event-handlers.ts
+++ b/src/tui/tui-event-handlers.ts
@@ -1,7 +1,7 @@
import { parseAgentSessionKey } from "../sessions/session-key-utils.js";
import { asString, extractTextFromMessage, isCommandMessage } from "./tui-formatters.js";
import { TuiStreamAssembler } from "./tui-stream-assembler.js";
-import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
+import type { AgentEvent, BtwEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
type EventHandlerChatLog = {
startTool: (toolCallId: string, toolName: string, args: unknown) => void;
@@ -20,8 +20,14 @@ type EventHandlerTui = {
requestRender: () => void;
};
+type EventHandlerBtwPresenter = {
+ showResult: (params: { question: string; text: string; isError?: boolean }) => void;
+ clear: () => void;
+};
+
type EventHandlerContext = {
chatLog: EventHandlerChatLog;
+ btw: EventHandlerBtwPresenter;
tui: EventHandlerTui;
state: TuiStateAccess;
setActivityStatus: (text: string) => void;
@@ -35,6 +41,7 @@ type EventHandlerContext = {
export function createEventHandlers(context: EventHandlerContext) {
const {
chatLog,
+ btw,
tui,
state,
setActivityStatus,
@@ -81,6 +88,7 @@ export function createEventHandlers(context: EventHandlerContext) {
sessionRuns.clear();
streamAssembler = new TuiStreamAssembler();
clearLocalRunIds?.();
+ btw.clear();
};
const noteSessionRun = (runId: string) => {
@@ -335,5 +343,30 @@ export function createEventHandlers(context: EventHandlerContext) {
}
};
- return { handleChatEvent, handleAgentEvent };
+ const handleBtwEvent = (payload: unknown) => {
+ if (!payload || typeof payload !== "object") {
+ return;
+ }
+ const evt = payload as BtwEvent;
+ syncSessionKey();
+ if (!isSameSessionKey(evt.sessionKey, state.currentSessionKey)) {
+ return;
+ }
+ if (evt.kind !== "btw") {
+ return;
+ }
+ const question = evt.question.trim();
+ const text = evt.text.trim();
+ if (!question || !text) {
+ return;
+ }
+ btw.showResult({
+ question,
+ text,
+ isError: evt.isError,
+ });
+ tui.requestRender();
+ };
+
+ return { handleChatEvent, handleAgentEvent, handleBtwEvent };
}
diff --git a/src/tui/tui-session-actions.test.ts b/src/tui/tui-session-actions.test.ts
index 5e4a427c4a9..64c8d9e6660 100644
--- a/src/tui/tui-session-actions.test.ts
+++ b/src/tui/tui-session-actions.test.ts
@@ -4,6 +4,11 @@ import { createSessionActions } from "./tui-session-actions.js";
import type { TuiStateAccess } from "./tui-types.js";
describe("tui session actions", () => {
+ const createBtwPresenter = () => ({
+ clear: vi.fn(),
+ showResult: vi.fn(),
+ });
+
it("queues session refreshes and applies the latest result", async () => {
let resolveFirst: ((value: unknown) => void) | undefined;
let resolveSecond: ((value: unknown) => void) | undefined;
@@ -52,6 +57,7 @@ describe("tui session actions", () => {
const { refreshSessionInfo } = createSessionActions({
client: { listSessions } as unknown as GatewayChatClient,
chatLog: { addSystem: vi.fn() } as unknown as import("./components/chat-log.js").ChatLog,
+ btw: createBtwPresenter(),
tui: { requestRender } as unknown as import("@mariozechner/pi-tui").TUI,
opts: {},
state,
@@ -157,6 +163,7 @@ describe("tui session actions", () => {
const { applySessionInfoFromPatch, refreshSessionInfo } = createSessionActions({
client: { listSessions } as unknown as GatewayChatClient,
chatLog: { addSystem: vi.fn() } as unknown as import("./components/chat-log.js").ChatLog,
+ btw: createBtwPresenter(),
tui: { requestRender: vi.fn() } as unknown as import("@mariozechner/pi-tui").TUI,
opts: {},
state,
@@ -210,7 +217,15 @@ describe("tui session actions", () => {
const loadHistory = vi.fn().mockResolvedValue({
sessionId: "session-2",
messages: [],
+ sideResults: [
+ {
+ kind: "btw",
+ question: "what changed?",
+ text: "nothing important",
+ },
+ ],
});
+ const btw = createBtwPresenter();
const state: TuiStateAccess = {
agentDefaultId: "main",
@@ -247,6 +262,7 @@ describe("tui session actions", () => {
addSystem: vi.fn(),
clearAll: vi.fn(),
} as unknown as import("./components/chat-log.js").ChatLog,
+ btw,
tui: { requestRender: vi.fn() } as unknown as import("@mariozechner/pi-tui").TUI,
opts: {},
state,
@@ -270,5 +286,7 @@ describe("tui session actions", () => {
expect(state.sessionInfo.model).toBe("session-model");
expect(state.sessionInfo.modelProvider).toBe("openai");
expect(state.sessionInfo.updatedAt).toBe(50);
+ expect(btw.clear).toHaveBeenCalled();
+ expect(btw.showResult).not.toHaveBeenCalled();
});
});
diff --git a/src/tui/tui-session-actions.ts b/src/tui/tui-session-actions.ts
index 406b584599f..99f2b8ab2ee 100644
--- a/src/tui/tui-session-actions.ts
+++ b/src/tui/tui-session-actions.ts
@@ -10,9 +10,14 @@ import type { GatewayAgentsList, GatewayChatClient } from "./gateway-chat.js";
import { asString, extractTextFromMessage, isCommandMessage } from "./tui-formatters.js";
import type { SessionInfo, TuiOptions, TuiStateAccess } from "./tui-types.js";
+type SessionActionBtwPresenter = {
+ clear: () => void;
+};
+
type SessionActionContext = {
client: GatewayChatClient;
chatLog: ChatLog;
+ btw: SessionActionBtwPresenter;
tui: TUI;
opts: TuiOptions;
state: TuiStateAccess;
@@ -42,6 +47,7 @@ export function createSessionActions(context: SessionActionContext) {
const {
client,
chatLog,
+ btw,
tui,
opts,
state,
@@ -298,6 +304,7 @@ export function createSessionActions(context: SessionActionContext) {
state.sessionInfo.verboseLevel = record.verboseLevel ?? state.sessionInfo.verboseLevel;
const showTools = (state.sessionInfo.verboseLevel ?? "off") !== "off";
chatLog.clearAll();
+ btw.clear();
chatLog.addSystem(`session ${state.currentSessionKey}`);
for (const entry of record.messages ?? []) {
if (!entry || typeof entry !== "object") {
@@ -367,6 +374,7 @@ export function createSessionActions(context: SessionActionContext) {
state.sessionInfo.updatedAt = null;
state.historyLoaded = false;
clearLocalRunIds?.();
+ btw.clear();
updateHeader();
updateFooter();
await loadHistory();
diff --git a/src/tui/tui-types.ts b/src/tui/tui-types.ts
index 0f780b0a6bb..eeda9693ebf 100644
--- a/src/tui/tui-types.ts
+++ b/src/tui/tui-types.ts
@@ -18,6 +18,17 @@ export type ChatEvent = {
errorMessage?: string;
};
+export type BtwEvent = {
+ kind: "btw";
+ runId?: string;
+ sessionKey?: string;
+ question: string;
+ text: string;
+ isError?: boolean;
+ seq?: number;
+ ts?: number;
+};
+
export type AgentEvent = {
runId: string;
stream: string;
diff --git a/src/tui/tui.ts b/src/tui/tui.ts
index e1eae539f50..143dfb7d62f 100644
--- a/src/tui/tui.ts
+++ b/src/tui/tui.ts
@@ -771,6 +771,14 @@ export async function runTui(opts: TuiOptions) {
};
const { openOverlay, closeOverlay } = createOverlayHandlers(tui, editor);
+ const btw = {
+ showResult: (params: { question: string; text: string; isError?: boolean }) => {
+ chatLog.showBtw(params);
+ },
+ clear: () => {
+ chatLog.dismissBtw();
+ },
+ };
const initialSessionAgentId = (() => {
if (!initialSessionInput) {
@@ -783,6 +791,7 @@ export async function runTui(opts: TuiOptions) {
const sessionActions = createSessionActions({
client,
chatLog,
+ btw,
tui,
opts,
state,
@@ -805,8 +814,9 @@ export async function runTui(opts: TuiOptions) {
abortActive,
} = sessionActions;
- const { handleChatEvent, handleAgentEvent } = createEventHandlers({
+ const { handleChatEvent, handleAgentEvent, handleBtwEvent } = createEventHandlers({
chatLog,
+ btw,
tui,
state,
setActivityStatus,
@@ -869,6 +879,11 @@ export async function runTui(opts: TuiOptions) {
});
editor.onEscape = () => {
+ if (chatLog.hasVisibleBtw()) {
+ chatLog.dismissBtw();
+ tui.requestRender();
+ return;
+ }
void abortActive();
};
const handleCtrlC = () => {
@@ -918,10 +933,28 @@ export async function runTui(opts: TuiOptions) {
void loadHistory();
};
+ tui.addInputListener((data) => {
+ if (!chatLog.hasVisibleBtw()) {
+ return undefined;
+ }
+ if (editor.getText().length > 0) {
+ return undefined;
+ }
+ if (matchesKey(data, "enter")) {
+ chatLog.dismissBtw();
+ tui.requestRender();
+ return { consume: true };
+ }
+ return undefined;
+ });
+
client.onEvent = (evt) => {
if (evt.event === "chat") {
handleChatEvent(evt.payload);
}
+ if (evt.event === "chat.side_result") {
+ handleBtwEvent(evt.payload);
+ }
if (evt.event === "agent") {
handleAgentEvent(evt.payload);
}