mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-09 16:21:15 +00:00
feat(btw): isolate side-question delivery
This commit is contained in:
@@ -18,6 +18,8 @@ const resolveSessionAuthProfileOverrideMock = vi.fn();
|
||||
const getActiveEmbeddedRunSnapshotMock = vi.fn();
|
||||
const waitForEmbeddedPiRunEndMock = vi.fn();
|
||||
const diagWarnMock = vi.fn();
|
||||
const diagDebugMock = vi.fn();
|
||||
const appendSessionSideResultMock = vi.fn();
|
||||
|
||||
vi.mock("@mariozechner/pi-ai", () => ({
|
||||
streamSimple: (...args: unknown[]) => streamSimpleMock(...args),
|
||||
@@ -70,9 +72,14 @@ vi.mock("./auth-profiles/session-override.js", () => ({
|
||||
vi.mock("../logging/diagnostic.js", () => ({
|
||||
diagnosticLogger: {
|
||||
warn: (...args: unknown[]) => diagWarnMock(...args),
|
||||
debug: (...args: unknown[]) => diagDebugMock(...args),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("../sessions/side-results.js", () => ({
|
||||
appendSessionSideResult: (...args: unknown[]) => appendSessionSideResultMock(...args),
|
||||
}));
|
||||
|
||||
const { BTW_CUSTOM_TYPE, runBtwSideQuestion } = await import("./btw.js");
|
||||
|
||||
function makeAsyncEvents(events: unknown[]) {
|
||||
@@ -113,6 +120,8 @@ describe("runBtwSideQuestion", () => {
|
||||
getActiveEmbeddedRunSnapshotMock.mockReset();
|
||||
waitForEmbeddedPiRunEndMock.mockReset();
|
||||
diagWarnMock.mockReset();
|
||||
diagDebugMock.mockReset();
|
||||
appendSessionSideResultMock.mockReset();
|
||||
|
||||
buildSessionContextMock.mockReturnValue({
|
||||
messages: [{ role: "user", content: [{ type: "text", text: "hi" }], timestamp: 1 }],
|
||||
@@ -206,6 +215,14 @@ describe("runBtwSideQuestion", () => {
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
expect(onBlockReply).toHaveBeenCalledWith({ text: "Side answer." });
|
||||
expect(appendSessionSideResultMock).toHaveBeenCalledWith({
|
||||
transcriptPath: expect.stringContaining("session-1.jsonl"),
|
||||
result: expect.objectContaining({
|
||||
kind: "btw",
|
||||
question: "What changed?",
|
||||
text: "Side answer.",
|
||||
}),
|
||||
});
|
||||
expect(appendCustomEntryMock).toHaveBeenCalledWith(
|
||||
BTW_CUSTOM_TYPE,
|
||||
expect.objectContaining({
|
||||
@@ -278,6 +295,144 @@ describe("runBtwSideQuestion", () => {
|
||||
).rejects.toThrow("No active session context.");
|
||||
});
|
||||
|
||||
it("uses active-run snapshot messages for BTW context while the main run is in flight", async () => {
|
||||
buildSessionContextMock.mockReturnValue({ messages: [] });
|
||||
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
|
||||
transcriptLeafId: "assistant-1",
|
||||
messages: [
|
||||
{
|
||||
role: "user",
|
||||
content: [
|
||||
{ type: "text", text: "write some things then wait 30 seconds and write more" },
|
||||
],
|
||||
timestamp: 1,
|
||||
},
|
||||
],
|
||||
});
|
||||
streamSimpleMock.mockReturnValue(
|
||||
makeAsyncEvents([
|
||||
{
|
||||
type: "done",
|
||||
reason: "stop",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "323" }],
|
||||
provider: "anthropic",
|
||||
api: "anthropic-messages",
|
||||
model: "claude-sonnet-4-5",
|
||||
stopReason: "stop",
|
||||
usage: {
|
||||
input: 1,
|
||||
output: 2,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 3,
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
const result = await runBtwSideQuestion({
|
||||
cfg: {} as never,
|
||||
agentDir: "/tmp/agent",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4-5",
|
||||
question: "What is 17 * 19?",
|
||||
sessionEntry: createSessionEntry(),
|
||||
resolvedReasoningLevel: "off",
|
||||
opts: {},
|
||||
isNewSession: false,
|
||||
});
|
||||
|
||||
expect(result).toEqual({ text: "323" });
|
||||
expect(streamSimpleMock).toHaveBeenCalledWith(
|
||||
expect.anything(),
|
||||
expect.objectContaining({
|
||||
systemPrompt: expect.stringContaining("ephemeral /btw side question"),
|
||||
messages: expect.arrayContaining([
|
||||
expect.objectContaining({ role: "user" }),
|
||||
expect.objectContaining({
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: expect.stringContaining(
|
||||
"<btw_side_question>\nWhat is 17 * 19?\n</btw_side_question>",
|
||||
),
|
||||
},
|
||||
],
|
||||
}),
|
||||
]),
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
|
||||
it("wraps the side question so the model does not treat it as a main-task continuation", async () => {
|
||||
streamSimpleMock.mockReturnValue(
|
||||
makeAsyncEvents([
|
||||
{
|
||||
type: "done",
|
||||
reason: "stop",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "About 93 million miles." }],
|
||||
provider: "anthropic",
|
||||
api: "anthropic-messages",
|
||||
model: "claude-sonnet-4-5",
|
||||
stopReason: "stop",
|
||||
usage: {
|
||||
input: 1,
|
||||
output: 2,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 3,
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
await runBtwSideQuestion({
|
||||
cfg: {} as never,
|
||||
agentDir: "/tmp/agent",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4-5",
|
||||
question: "what is the distance to the sun?",
|
||||
sessionEntry: createSessionEntry(),
|
||||
resolvedReasoningLevel: "off",
|
||||
opts: {},
|
||||
isNewSession: false,
|
||||
});
|
||||
|
||||
const [, context] = streamSimpleMock.mock.calls[0] ?? [];
|
||||
expect(context).toMatchObject({
|
||||
systemPrompt: expect.stringContaining(
|
||||
"Do not continue, resume, or complete any unfinished task",
|
||||
),
|
||||
});
|
||||
expect(context).toMatchObject({
|
||||
messages: expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: expect.stringContaining(
|
||||
"Ignore any unfinished task in the conversation while answering it.",
|
||||
),
|
||||
},
|
||||
],
|
||||
}),
|
||||
]),
|
||||
});
|
||||
});
|
||||
|
||||
it("branches away from an unresolved trailing user turn before building BTW context", async () => {
|
||||
getLeafEntryMock.mockReturnValue({
|
||||
type: "message",
|
||||
@@ -487,4 +642,77 @@ describe("runBtwSideQuestion", () => {
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it("excludes tool results from BTW context to avoid replaying raw tool output", async () => {
|
||||
getActiveEmbeddedRunSnapshotMock.mockReturnValue({
|
||||
transcriptLeafId: "assistant-1",
|
||||
messages: [
|
||||
{
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "seed" }],
|
||||
timestamp: 1,
|
||||
},
|
||||
{
|
||||
role: "toolResult",
|
||||
content: [{ type: "text", text: "sensitive tool output" }],
|
||||
details: { raw: "secret" },
|
||||
timestamp: 2,
|
||||
},
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "done" }],
|
||||
timestamp: 3,
|
||||
},
|
||||
],
|
||||
});
|
||||
streamSimpleMock.mockReturnValue(
|
||||
makeAsyncEvents([
|
||||
{
|
||||
type: "done",
|
||||
reason: "stop",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "323" }],
|
||||
provider: "anthropic",
|
||||
api: "anthropic-messages",
|
||||
model: "claude-sonnet-4-5",
|
||||
stopReason: "stop",
|
||||
usage: {
|
||||
input: 1,
|
||||
output: 2,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 3,
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
await runBtwSideQuestion({
|
||||
cfg: {} as never,
|
||||
agentDir: "/tmp/agent",
|
||||
provider: "anthropic",
|
||||
model: "claude-sonnet-4-5",
|
||||
question: "What is 17 * 19?",
|
||||
sessionEntry: createSessionEntry(),
|
||||
resolvedReasoningLevel: "off",
|
||||
opts: {},
|
||||
isNewSession: false,
|
||||
});
|
||||
|
||||
const [, context] = streamSimpleMock.mock.calls[0] ?? [];
|
||||
expect(context).toMatchObject({
|
||||
messages: [
|
||||
expect.objectContaining({ role: "user" }),
|
||||
expect.objectContaining({ role: "assistant" }),
|
||||
expect.objectContaining({ role: "user" }),
|
||||
],
|
||||
});
|
||||
expect((context as { messages?: Array<{ role?: string }> }).messages).not.toEqual(
|
||||
expect.arrayContaining([expect.objectContaining({ role: "toolResult" })]),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -16,6 +16,8 @@ import {
|
||||
type SessionEntry,
|
||||
} from "../config/sessions.js";
|
||||
import { diagnosticLogger as diag } from "../logging/diagnostic.js";
|
||||
import { appendSessionSideResult } from "../sessions/side-results.js";
|
||||
import { stripToolResultDetails } from "./session-transcript-repair.js";
|
||||
import { resolveSessionAuthProfileOverride } from "./auth-profiles/session-override.js";
|
||||
import { getApiKeyForModel, requireApiKey } from "./model-auth.js";
|
||||
import { ensureOpenClawModelsJson } from "./models-config.js";
|
||||
@@ -60,6 +62,20 @@ type BtwCustomEntryData = {
|
||||
usage?: unknown;
|
||||
};
|
||||
|
||||
type BtwSideResultData = {
|
||||
timestamp: number;
|
||||
question: string;
|
||||
answer: string;
|
||||
provider: string;
|
||||
model: string;
|
||||
thinkingLevel: ThinkLevel | "off";
|
||||
reasoningLevel: ReasoningLevel;
|
||||
sessionKey?: string;
|
||||
authProfileId?: string;
|
||||
authProfileIdSource?: "auto" | "user";
|
||||
usage?: unknown;
|
||||
};
|
||||
|
||||
async function appendBtwCustomEntry(params: {
|
||||
sessionFile: string;
|
||||
timeoutMs: number;
|
||||
@@ -78,6 +94,26 @@ async function appendBtwCustomEntry(params: {
|
||||
}
|
||||
}
|
||||
|
||||
function appendBtwSideResult(params: { sessionFile: string; entry: BtwSideResultData }) {
|
||||
appendSessionSideResult({
|
||||
transcriptPath: params.sessionFile,
|
||||
result: {
|
||||
kind: "btw",
|
||||
question: params.entry.question,
|
||||
text: params.entry.answer,
|
||||
ts: params.entry.timestamp,
|
||||
provider: params.entry.provider,
|
||||
model: params.entry.model,
|
||||
thinkingLevel: params.entry.thinkingLevel,
|
||||
reasoningLevel: params.entry.reasoningLevel,
|
||||
sessionKey: params.entry.sessionKey,
|
||||
authProfileId: params.entry.authProfileId,
|
||||
authProfileIdSource: params.entry.authProfileIdSource,
|
||||
usage: params.entry.usage,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function isSessionLockError(error: unknown): boolean {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
return message.includes("session file locked");
|
||||
@@ -117,14 +153,39 @@ function collectThinkingContent(content: Array<{ type?: string; thinking?: strin
|
||||
.join("");
|
||||
}
|
||||
|
||||
function buildBtwSystemPrompt(): string {
|
||||
return [
|
||||
"You are answering an ephemeral /btw side question about the current conversation.",
|
||||
"Use the conversation only as background context.",
|
||||
"Answer only the side question in the last user message.",
|
||||
"Do not continue, resume, or complete any unfinished task from the conversation.",
|
||||
"Do not emit tool calls, pseudo-tool calls, shell commands, file writes, patches, or code unless the side question explicitly asks for them.",
|
||||
"Do not say you will continue the main task after answering.",
|
||||
"If the question can be answered briefly, answer briefly.",
|
||||
].join("\n");
|
||||
}
|
||||
|
||||
function buildBtwQuestionPrompt(question: string): string {
|
||||
return [
|
||||
"Answer this side question only.",
|
||||
"Ignore any unfinished task in the conversation while answering it.",
|
||||
"",
|
||||
"<btw_side_question>",
|
||||
question.trim(),
|
||||
"</btw_side_question>",
|
||||
].join("\n");
|
||||
}
|
||||
|
||||
function toSimpleContextMessages(messages: unknown[]): Message[] {
|
||||
return messages.filter((message): message is Message => {
|
||||
const contextMessages = messages.filter((message): message is Message => {
|
||||
if (!message || typeof message !== "object") {
|
||||
return false;
|
||||
}
|
||||
const role = (message as { role?: unknown }).role;
|
||||
return role === "user" || role === "assistant" || role === "toolResult";
|
||||
return role === "user" || role === "assistant";
|
||||
});
|
||||
return stripToolResultDetails(contextMessages as Parameters<typeof stripToolResultDetails>[0]) as
|
||||
Message[];
|
||||
}
|
||||
|
||||
function resolveSimpleThinkingLevel(level?: ThinkLevel): SimpleThinkingLevel | undefined {
|
||||
@@ -147,7 +208,10 @@ function resolveSessionTranscriptPath(params: {
|
||||
storePath: params.storePath,
|
||||
});
|
||||
return resolveSessionFilePath(params.sessionId, params.sessionEntry, pathOpts);
|
||||
} catch {
|
||||
} catch (error) {
|
||||
diag.debug(
|
||||
`resolveSessionTranscriptPath failed: sessionId=${params.sessionId} err=${String(error)}`,
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
@@ -235,7 +299,10 @@ export async function runBtwSideQuestion(
|
||||
|
||||
const sessionManager = SessionManager.open(sessionFile) as SessionManagerLike;
|
||||
const activeRunSnapshot = getActiveEmbeddedRunSnapshot(sessionId);
|
||||
if (activeRunSnapshot) {
|
||||
let messages: Message[] = [];
|
||||
if (Array.isArray(activeRunSnapshot?.messages) && activeRunSnapshot.messages.length > 0) {
|
||||
messages = toSimpleContextMessages(activeRunSnapshot.messages);
|
||||
} else if (activeRunSnapshot) {
|
||||
if (activeRunSnapshot.transcriptLeafId && sessionManager.branch) {
|
||||
sessionManager.branch(activeRunSnapshot.transcriptLeafId);
|
||||
} else {
|
||||
@@ -251,10 +318,12 @@ export async function runBtwSideQuestion(
|
||||
}
|
||||
}
|
||||
}
|
||||
const sessionContext = sessionManager.buildSessionContext();
|
||||
const messages = toSimpleContextMessages(
|
||||
Array.isArray(sessionContext.messages) ? sessionContext.messages : [],
|
||||
);
|
||||
if (messages.length === 0) {
|
||||
const sessionContext = sessionManager.buildSessionContext();
|
||||
messages = toSimpleContextMessages(
|
||||
Array.isArray(sessionContext.messages) ? sessionContext.messages : [],
|
||||
);
|
||||
}
|
||||
if (messages.length === 0) {
|
||||
throw new Error("No active session context.");
|
||||
}
|
||||
@@ -304,11 +373,12 @@ export async function runBtwSideQuestion(
|
||||
const stream = streamSimple(
|
||||
model,
|
||||
{
|
||||
systemPrompt: buildBtwSystemPrompt(),
|
||||
messages: [
|
||||
...messages,
|
||||
{
|
||||
role: "user",
|
||||
content: [{ type: "text", text: params.question }],
|
||||
content: [{ type: "text", text: buildBtwQuestionPrompt(params.question) }],
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
],
|
||||
@@ -400,6 +470,16 @@ export async function runBtwSideQuestion(
|
||||
usage: finalMessage?.usage,
|
||||
} satisfies BtwCustomEntryData;
|
||||
|
||||
try {
|
||||
appendBtwSideResult({
|
||||
sessionFile,
|
||||
entry: customEntry,
|
||||
});
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
diag.warn(`btw side-result persistence skipped: sessionId=${sessionId} err=${message}`);
|
||||
}
|
||||
|
||||
try {
|
||||
await appendBtwCustomEntry({
|
||||
sessionFile,
|
||||
|
||||
@@ -2377,10 +2377,8 @@ export async function runEmbeddedAttempt(
|
||||
`runId=${params.runId} sessionId=${params.sessionId}`,
|
||||
);
|
||||
}
|
||||
updateActiveEmbeddedRunSnapshot(params.sessionId, {
|
||||
transcriptLeafId:
|
||||
(sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null,
|
||||
});
|
||||
const transcriptLeafId =
|
||||
(sessionManager.getLeafEntry() as { id?: string } | null | undefined)?.id ?? null;
|
||||
|
||||
try {
|
||||
// Idempotent cleanup for legacy sessions with persisted image payloads.
|
||||
@@ -2459,6 +2457,19 @@ export async function runEmbeddedAttempt(
|
||||
});
|
||||
}
|
||||
|
||||
const btwSnapshotMessages = [
|
||||
...activeSession.messages,
|
||||
{
|
||||
role: "user",
|
||||
content: [{ type: "text", text: effectivePrompt }],
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
];
|
||||
updateActiveEmbeddedRunSnapshot(params.sessionId, {
|
||||
transcriptLeafId,
|
||||
messages: btwSnapshotMessages,
|
||||
});
|
||||
|
||||
// Only pass images option if there are actually images to pass
|
||||
// This avoids potential issues with models that don't expect the images parameter
|
||||
if (imageResult.images.length > 0) {
|
||||
|
||||
@@ -151,9 +151,11 @@ describe("pi-embedded runner run registry", () => {
|
||||
setActiveEmbeddedRun("session-snapshot", handle);
|
||||
updateActiveEmbeddedRunSnapshot("session-snapshot", {
|
||||
transcriptLeafId: "assistant-1",
|
||||
messages: [{ role: "user", content: [{ type: "text", text: "hello" }], timestamp: 1 }],
|
||||
});
|
||||
expect(getActiveEmbeddedRunSnapshot("session-snapshot")).toEqual({
|
||||
transcriptLeafId: "assistant-1",
|
||||
messages: [{ role: "user", content: [{ type: "text", text: "hello" }], timestamp: 1 }],
|
||||
});
|
||||
|
||||
clearActiveEmbeddedRun("session-snapshot", handle);
|
||||
|
||||
@@ -14,6 +14,7 @@ type EmbeddedPiQueueHandle = {
|
||||
|
||||
export type ActiveEmbeddedRunSnapshot = {
|
||||
transcriptLeafId: string | null;
|
||||
messages?: unknown[];
|
||||
};
|
||||
|
||||
type EmbeddedRunWaiter = {
|
||||
|
||||
@@ -80,7 +80,7 @@ describe("handleBtwCommand", () => {
|
||||
);
|
||||
expect(result).toEqual({
|
||||
shouldContinue: false,
|
||||
reply: { text: "snapshot answer" },
|
||||
reply: { text: "snapshot answer", btw: { question: "what changed?" } },
|
||||
});
|
||||
});
|
||||
|
||||
@@ -104,7 +104,7 @@ describe("handleBtwCommand", () => {
|
||||
);
|
||||
expect(result).toEqual({
|
||||
shouldContinue: false,
|
||||
reply: { text: "nothing important" },
|
||||
reply: { text: "nothing important", btw: { question: "what changed?" } },
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -61,7 +61,7 @@ export const handleBtwCommand: CommandHandler = async (params, allowTextCommands
|
||||
});
|
||||
return {
|
||||
shouldContinue: false,
|
||||
reply,
|
||||
reply: reply ? { ...reply, btw: { question } } : reply,
|
||||
};
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message.trim() : "";
|
||||
@@ -69,6 +69,8 @@ export const handleBtwCommand: CommandHandler = async (params, allowTextCommands
|
||||
shouldContinue: false,
|
||||
reply: {
|
||||
text: `⚠️ /btw failed${message ? `: ${message}` : "."}`,
|
||||
btw: { question },
|
||||
isError: true,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { SkillCommandSpec } from "../../agents/skills.js";
|
||||
import type { BlockReplyChunking } from "../../agents/pi-embedded-block-chunker.js";
|
||||
import type { ChannelId } from "../../channels/plugins/types.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { SessionEntry, SessionScope } from "../../config/sessions.js";
|
||||
@@ -50,12 +51,7 @@ export type HandleCommandsParams = {
|
||||
resolvedVerboseLevel: VerboseLevel;
|
||||
resolvedReasoningLevel: ReasoningLevel;
|
||||
resolvedElevatedLevel?: ElevatedLevel;
|
||||
blockReplyChunking?: {
|
||||
minChars: number;
|
||||
maxChars: number;
|
||||
breakPreference: "paragraph" | "newline" | "sentence";
|
||||
flushOnParagraph?: boolean;
|
||||
};
|
||||
blockReplyChunking?: BlockReplyChunking;
|
||||
resolvedBlockStreamingBreak?: "text_end" | "message_end";
|
||||
resolveDefaultThinkingLevel: () => Promise<ThinkLevel | undefined>;
|
||||
provider: string;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { collectTextContentBlocks } from "../../agents/content-blocks.js";
|
||||
import type { BlockReplyChunking } from "../../agents/pi-embedded-block-chunker.js";
|
||||
import { createOpenClawTools } from "../../agents/openclaw-tools.js";
|
||||
import type { SkillCommandSpec } from "../../agents/skills.js";
|
||||
import { applyOwnerOnlyToolPolicy } from "../../agents/tool-policy.js";
|
||||
@@ -114,12 +115,7 @@ export async function handleInlineActions(params: {
|
||||
resolvedVerboseLevel: VerboseLevel | undefined;
|
||||
resolvedReasoningLevel: ReasoningLevel;
|
||||
resolvedElevatedLevel: ElevatedLevel;
|
||||
blockReplyChunking?: {
|
||||
minChars: number;
|
||||
maxChars: number;
|
||||
breakPreference: "paragraph" | "newline" | "sentence";
|
||||
flushOnParagraph?: boolean;
|
||||
};
|
||||
blockReplyChunking?: BlockReplyChunking;
|
||||
resolvedBlockStreamingBreak?: "text_end" | "message_end";
|
||||
resolveDefaultThinkingLevel: Awaited<
|
||||
ReturnType<typeof createModelSelectionState>
|
||||
|
||||
@@ -10,6 +10,17 @@ import type { ReplyPayload } from "../types.js";
|
||||
import { extractReplyToTag } from "./reply-tags.js";
|
||||
import { createReplyToModeFilterForChannel } from "./reply-threading.js";
|
||||
|
||||
export function formatBtwTextForExternalDelivery(payload: ReplyPayload): string | undefined {
|
||||
const text = payload.text?.trim();
|
||||
if (!text) {
|
||||
return payload.text;
|
||||
}
|
||||
if (!payload.btw?.question?.trim()) {
|
||||
return payload.text;
|
||||
}
|
||||
return text.startsWith("BTW:") ? text : `BTW: ${text}`;
|
||||
}
|
||||
|
||||
function resolveReplyThreadingForPayload(params: {
|
||||
payload: ReplyPayload;
|
||||
implicitReplyToId?: string;
|
||||
|
||||
@@ -294,6 +294,36 @@ describe("routeReply", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("prefixes BTW replies on routed sends", async () => {
|
||||
mocks.sendMessageSlack.mockClear();
|
||||
await routeReply({
|
||||
payload: { text: "323", btw: { question: "what is 17 * 19?" } },
|
||||
channel: "slack",
|
||||
to: "channel:C123",
|
||||
cfg: {} as never,
|
||||
});
|
||||
expect(mocks.sendMessageSlack).toHaveBeenCalledWith(
|
||||
"channel:C123",
|
||||
"BTW: 323",
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it("prefixes BTW replies on routed discord sends", async () => {
|
||||
mocks.sendMessageDiscord.mockClear();
|
||||
await routeReply({
|
||||
payload: { text: "323", btw: { question: "what is 17 * 19?" } },
|
||||
channel: "discord",
|
||||
to: "channel:123456",
|
||||
cfg: {} as never,
|
||||
});
|
||||
expect(mocks.sendMessageDiscord).toHaveBeenCalledWith(
|
||||
"channel:123456",
|
||||
"BTW: 323",
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it("passes replyToId to Telegram sends", async () => {
|
||||
mocks.sendMessageTelegram.mockClear();
|
||||
await routeReply({
|
||||
|
||||
@@ -18,7 +18,10 @@ import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/m
|
||||
import type { OriginatingChannelType } from "../templating.js";
|
||||
import type { ReplyPayload } from "../types.js";
|
||||
import { normalizeReplyPayload } from "./normalize-reply.js";
|
||||
import { shouldSuppressReasoningPayload } from "./reply-payloads.js";
|
||||
import {
|
||||
formatBtwTextForExternalDelivery,
|
||||
shouldSuppressReasoningPayload,
|
||||
} from "./reply-payloads.js";
|
||||
|
||||
let deliverRuntimePromise: Promise<
|
||||
typeof import("../../infra/outbound/deliver-runtime.js")
|
||||
@@ -102,24 +105,28 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
|
||||
if (!normalized) {
|
||||
return { ok: true };
|
||||
}
|
||||
const externalPayload: ReplyPayload = {
|
||||
...normalized,
|
||||
text: formatBtwTextForExternalDelivery(normalized),
|
||||
};
|
||||
|
||||
let text = normalized.text ?? "";
|
||||
let mediaUrls = (normalized.mediaUrls?.filter(Boolean) ?? []).length
|
||||
? (normalized.mediaUrls?.filter(Boolean) as string[])
|
||||
: normalized.mediaUrl
|
||||
? [normalized.mediaUrl]
|
||||
let text = externalPayload.text ?? "";
|
||||
let mediaUrls = (externalPayload.mediaUrls?.filter(Boolean) ?? []).length
|
||||
? (externalPayload.mediaUrls?.filter(Boolean) as string[])
|
||||
: externalPayload.mediaUrl
|
||||
? [externalPayload.mediaUrl]
|
||||
: [];
|
||||
const replyToId = normalized.replyToId;
|
||||
const replyToId = externalPayload.replyToId;
|
||||
let hasSlackBlocks = false;
|
||||
if (
|
||||
channel === "slack" &&
|
||||
normalized.channelData?.slack &&
|
||||
typeof normalized.channelData.slack === "object" &&
|
||||
!Array.isArray(normalized.channelData.slack)
|
||||
externalPayload.channelData?.slack &&
|
||||
typeof externalPayload.channelData.slack === "object" &&
|
||||
!Array.isArray(externalPayload.channelData.slack)
|
||||
) {
|
||||
try {
|
||||
hasSlackBlocks = Boolean(
|
||||
parseSlackBlocksInput((normalized.channelData.slack as { blocks?: unknown }).blocks)
|
||||
parseSlackBlocksInput((externalPayload.channelData.slack as { blocks?: unknown }).blocks)
|
||||
?.length,
|
||||
);
|
||||
} catch {
|
||||
@@ -168,7 +175,7 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
|
||||
channel: channelId,
|
||||
to,
|
||||
accountId: accountId ?? undefined,
|
||||
payloads: [normalized],
|
||||
payloads: [externalPayload],
|
||||
replyToId: resolvedReplyToId ?? null,
|
||||
threadId: resolvedThreadId,
|
||||
session: outboundSession,
|
||||
|
||||
@@ -76,6 +76,9 @@ export type ReplyPayload = {
|
||||
text?: string;
|
||||
mediaUrl?: string;
|
||||
mediaUrls?: string[];
|
||||
btw?: {
|
||||
question: string;
|
||||
};
|
||||
replyToId?: string;
|
||||
replyToTag?: boolean;
|
||||
/** True when [[reply_to_current]] was present but not yet mapped to a message id. */
|
||||
|
||||
@@ -8,6 +8,7 @@ import { resolveOAuthDir, resolveStateDir } from "../config/paths.js";
|
||||
import {
|
||||
formatSessionArchiveTimestamp,
|
||||
isPrimarySessionTranscriptFileName,
|
||||
isSessionSideResultsArtifactName,
|
||||
loadSessionStore,
|
||||
resolveMainSessionKey,
|
||||
resolveSessionFilePath,
|
||||
@@ -17,6 +18,7 @@ import {
|
||||
} from "../config/sessions.js";
|
||||
import { resolveRequiredHomeDir } from "../infra/home-dir.js";
|
||||
import { parseAgentSessionKey } from "../sessions/session-key-utils.js";
|
||||
import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js";
|
||||
import { note } from "../terminal/note.js";
|
||||
import { shortenHomePath } from "../utils.js";
|
||||
|
||||
@@ -757,8 +759,12 @@ export async function noteStateIntegrity(
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const transcriptPath = path.resolve(
|
||||
resolveSessionFilePath(entry.sessionId, entry, sessionPathOpts),
|
||||
);
|
||||
referencedTranscriptPaths.add(transcriptPath);
|
||||
referencedTranscriptPaths.add(
|
||||
path.resolve(resolveSessionFilePath(entry.sessionId, entry, sessionPathOpts)),
|
||||
path.resolve(resolveSessionSideResultsPathFromTranscript(transcriptPath)),
|
||||
);
|
||||
} catch {
|
||||
// ignore invalid legacy paths
|
||||
@@ -766,7 +772,12 @@ export async function noteStateIntegrity(
|
||||
}
|
||||
const sessionDirEntries = fs.readdirSync(sessionsDir, { withFileTypes: true });
|
||||
const orphanTranscriptPaths = sessionDirEntries
|
||||
.filter((entry) => entry.isFile() && isPrimarySessionTranscriptFileName(entry.name))
|
||||
.filter(
|
||||
(entry) =>
|
||||
entry.isFile() &&
|
||||
(isPrimarySessionTranscriptFileName(entry.name) ||
|
||||
isSessionSideResultsArtifactName(entry.name)),
|
||||
)
|
||||
.map((entry) => path.resolve(path.join(sessionsDir, entry.name)))
|
||||
.filter((filePath) => !referencedTranscriptPaths.has(filePath));
|
||||
if (orphanTranscriptPaths.length > 0) {
|
||||
|
||||
@@ -13,6 +13,7 @@ import {
|
||||
type SessionMaintenanceApplyReport,
|
||||
} from "../config/sessions.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js";
|
||||
import { isRich, theme } from "../terminal/theme.js";
|
||||
import {
|
||||
resolveSessionStoreTargetsOrExit,
|
||||
@@ -147,6 +148,11 @@ function pruneMissingTranscriptEntries(params: {
|
||||
}
|
||||
const transcriptPath = resolveSessionFilePath(entry.sessionId, entry, sessionPathOpts);
|
||||
if (!fs.existsSync(transcriptPath)) {
|
||||
try {
|
||||
fs.rmSync(resolveSessionSideResultsPathFromTranscript(transcriptPath), { force: true });
|
||||
} catch {
|
||||
// Best-effort cleanup for orphan BTW sidecars when the primary transcript is gone.
|
||||
}
|
||||
delete params.store[key];
|
||||
removed += 1;
|
||||
params.onPruned?.(key);
|
||||
|
||||
@@ -3,6 +3,7 @@ import {
|
||||
formatSessionArchiveTimestamp,
|
||||
isPrimarySessionTranscriptFileName,
|
||||
isSessionArchiveArtifactName,
|
||||
isSessionSideResultsArtifactName,
|
||||
parseSessionArchiveTimestamp,
|
||||
} from "./artifacts.js";
|
||||
|
||||
@@ -19,12 +20,21 @@ describe("session artifact helpers", () => {
|
||||
it("classifies primary transcript files", () => {
|
||||
expect(isPrimarySessionTranscriptFileName("abc.jsonl")).toBe(true);
|
||||
expect(isPrimarySessionTranscriptFileName("keep.deleted.keep.jsonl")).toBe(true);
|
||||
expect(isPrimarySessionTranscriptFileName("abc.side-results.jsonl")).toBe(false);
|
||||
expect(isPrimarySessionTranscriptFileName("abc.jsonl.deleted.2026-01-01T00-00-00.000Z")).toBe(
|
||||
false,
|
||||
);
|
||||
expect(isPrimarySessionTranscriptFileName("sessions.json")).toBe(false);
|
||||
});
|
||||
|
||||
it("classifies BTW side-result artifacts separately from transcripts", () => {
|
||||
expect(isSessionSideResultsArtifactName("abc.side-results.jsonl")).toBe(true);
|
||||
expect(
|
||||
isSessionSideResultsArtifactName("abc.side-results.jsonl.deleted.2026-01-01T00-00-00.000Z"),
|
||||
).toBe(true);
|
||||
expect(isSessionSideResultsArtifactName("abc.jsonl")).toBe(false);
|
||||
});
|
||||
|
||||
it("formats and parses archive timestamps", () => {
|
||||
const now = Date.parse("2026-02-23T12:34:56.000Z");
|
||||
const stamp = formatSessionArchiveTimestamp(now);
|
||||
|
||||
@@ -24,6 +24,10 @@ export function isSessionArchiveArtifactName(fileName: string): boolean {
|
||||
);
|
||||
}
|
||||
|
||||
export function isSessionSideResultsArtifactName(fileName: string): boolean {
|
||||
return fileName.endsWith(".side-results.jsonl") || fileName.includes(".side-results.jsonl.");
|
||||
}
|
||||
|
||||
export function isPrimarySessionTranscriptFileName(fileName: string): boolean {
|
||||
if (fileName === "sessions.json") {
|
||||
return false;
|
||||
@@ -31,6 +35,9 @@ export function isPrimarySessionTranscriptFileName(fileName: string): boolean {
|
||||
if (!fileName.endsWith(".jsonl")) {
|
||||
return false;
|
||||
}
|
||||
if (isSessionSideResultsArtifactName(fileName)) {
|
||||
return false;
|
||||
}
|
||||
return !isSessionArchiveArtifactName(fileName);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { isPrimarySessionTranscriptFileName, isSessionArchiveArtifactName } from "./artifacts.js";
|
||||
import { resolveSessionSideResultsPathFromTranscript } from "../../sessions/side-results.js";
|
||||
import {
|
||||
isPrimarySessionTranscriptFileName,
|
||||
isSessionArchiveArtifactName,
|
||||
isSessionSideResultsArtifactName,
|
||||
} from "./artifacts.js";
|
||||
import { resolveSessionFilePath } from "./paths.js";
|
||||
import type { SessionEntry } from "./types.js";
|
||||
|
||||
@@ -123,6 +128,9 @@ function resolveReferencedSessionTranscriptPaths(params: {
|
||||
});
|
||||
if (resolved) {
|
||||
referenced.add(canonicalizePathForComparison(resolved));
|
||||
referenced.add(
|
||||
canonicalizePathForComparison(resolveSessionSideResultsPathFromTranscript(resolved)),
|
||||
);
|
||||
}
|
||||
}
|
||||
return referenced;
|
||||
@@ -255,7 +263,9 @@ export async function enforceSessionDiskBudget(params: {
|
||||
.filter(
|
||||
(file) =>
|
||||
isSessionArchiveArtifactName(file.name) ||
|
||||
(isPrimarySessionTranscriptFileName(file.name) && !referencedPaths.has(file.canonicalPath)),
|
||||
((isPrimarySessionTranscriptFileName(file.name) ||
|
||||
isSessionSideResultsArtifactName(file.name)) &&
|
||||
!referencedPaths.has(file.canonicalPath)),
|
||||
)
|
||||
.toSorted((a, b) => a.mtimeMs - b.mtimeMs);
|
||||
for (const file of removableFileQueue) {
|
||||
@@ -336,6 +346,18 @@ export async function enforceSessionDiskBudget(params: {
|
||||
total -= deletedBytes;
|
||||
freedBytes += deletedBytes;
|
||||
removedFiles += 1;
|
||||
const sideResultsPath = resolveSessionSideResultsPathFromTranscript(transcriptPath);
|
||||
const deletedSideResultsBytes = await removeFileForBudget({
|
||||
filePath: sideResultsPath,
|
||||
dryRun,
|
||||
fileSizesByPath,
|
||||
simulatedRemovedPaths,
|
||||
});
|
||||
if (deletedSideResultsBytes > 0) {
|
||||
total -= deletedSideResultsBytes;
|
||||
freedBytes += deletedSideResultsBytes;
|
||||
removedFiles += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
|
||||
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
|
||||
import type { MsgContext } from "../../auto-reply/templating.js";
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
|
||||
import type { ReplyPayload } from "../../auto-reply/types.js";
|
||||
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
|
||||
import { resolveSessionFilePath } from "../../config/sessions.js";
|
||||
import { jsonUtf8Bytes } from "../../infra/json-utf8-bytes.js";
|
||||
@@ -55,6 +56,7 @@ import {
|
||||
capArrayByJsonBytes,
|
||||
loadSessionEntry,
|
||||
readSessionMessages,
|
||||
readSessionSideResults,
|
||||
resolveSessionModelRef,
|
||||
} from "../session-utils.js";
|
||||
import { formatForLog } from "../ws-log.js";
|
||||
@@ -130,6 +132,16 @@ type ChatSendOriginatingRoute = {
|
||||
explicitDeliverRoute: boolean;
|
||||
};
|
||||
|
||||
type SideResultPayload = {
|
||||
kind: "btw";
|
||||
runId: string;
|
||||
sessionKey: string;
|
||||
question: string;
|
||||
text: string;
|
||||
isError?: boolean;
|
||||
ts: number;
|
||||
};
|
||||
|
||||
function resolveChatSendOriginatingRoute(params: {
|
||||
client?: { mode?: string | null; id?: string | null } | null;
|
||||
deliver?: boolean;
|
||||
@@ -900,6 +912,34 @@ function broadcastChatFinal(params: {
|
||||
params.context.agentRunSeq.delete(params.runId);
|
||||
}
|
||||
|
||||
function isBtwReplyPayload(payload: ReplyPayload | undefined): payload is ReplyPayload & {
|
||||
btw: { question: string };
|
||||
text: string;
|
||||
} {
|
||||
return (
|
||||
typeof payload?.btw?.question === "string" &&
|
||||
payload.btw.question.trim().length > 0 &&
|
||||
typeof payload.text === "string" &&
|
||||
payload.text.trim().length > 0
|
||||
);
|
||||
}
|
||||
|
||||
function broadcastSideResult(params: {
|
||||
context: Pick<GatewayRequestContext, "broadcast" | "nodeSendToSession" | "agentRunSeq">;
|
||||
payload: SideResultPayload;
|
||||
}) {
|
||||
const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.payload.runId);
|
||||
params.context.broadcast("chat.side_result", {
|
||||
...params.payload,
|
||||
seq,
|
||||
});
|
||||
params.context.nodeSendToSession(params.payload.sessionKey, "chat.side_result", {
|
||||
...params.payload,
|
||||
seq,
|
||||
});
|
||||
params.context.agentRunSeq.delete(params.payload.runId);
|
||||
}
|
||||
|
||||
function broadcastChatError(params: {
|
||||
context: Pick<GatewayRequestContext, "broadcast" | "nodeSendToSession" | "agentRunSeq">;
|
||||
runId: string;
|
||||
@@ -940,6 +980,10 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
const sessionId = entry?.sessionId;
|
||||
const rawMessages =
|
||||
sessionId && storePath ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : [];
|
||||
const sideResults =
|
||||
sessionId && storePath
|
||||
? readSessionSideResults(sessionId, storePath, entry?.sessionFile)
|
||||
: [];
|
||||
const hardMax = 1000;
|
||||
const defaultLimit = 200;
|
||||
const requested = typeof limit === "number" ? limit : defaultLimit;
|
||||
@@ -979,6 +1023,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
sessionKey,
|
||||
sessionId,
|
||||
messages: bounded.messages,
|
||||
sideResults,
|
||||
thinkingLevel,
|
||||
fastMode: entry?.fastMode,
|
||||
verboseLevel,
|
||||
@@ -1284,7 +1329,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
agentId,
|
||||
channel: INTERNAL_MESSAGE_CHANNEL,
|
||||
});
|
||||
const finalReplyParts: string[] = [];
|
||||
const finalReplies: ReplyPayload[] = [];
|
||||
const dispatcher = createReplyDispatcher({
|
||||
...prefixOptions,
|
||||
onError: (err) => {
|
||||
@@ -1294,11 +1339,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
if (info.kind !== "final") {
|
||||
return;
|
||||
}
|
||||
const text = payload.text?.trim() ?? "";
|
||||
if (!text) {
|
||||
return;
|
||||
}
|
||||
finalReplyParts.push(text);
|
||||
finalReplies.push(payload);
|
||||
},
|
||||
});
|
||||
|
||||
@@ -1335,48 +1376,64 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
})
|
||||
.then(() => {
|
||||
if (!agentRunStarted) {
|
||||
const combinedReply = finalReplyParts
|
||||
.map((part) => part.trim())
|
||||
.filter(Boolean)
|
||||
.join("\n\n")
|
||||
.trim();
|
||||
let message: Record<string, unknown> | undefined;
|
||||
if (combinedReply) {
|
||||
const { storePath: latestStorePath, entry: latestEntry } =
|
||||
loadSessionEntry(sessionKey);
|
||||
const sessionId = latestEntry?.sessionId ?? entry?.sessionId ?? clientRunId;
|
||||
const appended = appendAssistantTranscriptMessage({
|
||||
message: combinedReply,
|
||||
sessionId,
|
||||
storePath: latestStorePath,
|
||||
sessionFile: latestEntry?.sessionFile,
|
||||
agentId,
|
||||
createIfMissing: true,
|
||||
const btwReply = finalReplies.length === 1 ? finalReplies[0] : undefined;
|
||||
if (isBtwReplyPayload(btwReply)) {
|
||||
broadcastSideResult({
|
||||
context,
|
||||
payload: {
|
||||
kind: "btw",
|
||||
runId: clientRunId,
|
||||
sessionKey: rawSessionKey,
|
||||
question: btwReply.btw.question.trim(),
|
||||
text: btwReply.text.trim(),
|
||||
isError: btwReply.isError,
|
||||
ts: Date.now(),
|
||||
},
|
||||
});
|
||||
if (appended.ok) {
|
||||
message = appended.message;
|
||||
} else {
|
||||
context.logGateway.warn(
|
||||
`webchat transcript append failed: ${appended.error ?? "unknown error"}`,
|
||||
);
|
||||
const now = Date.now();
|
||||
message = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: combinedReply }],
|
||||
timestamp: now,
|
||||
// Keep this compatible with Pi stopReason enums even though this message isn't
|
||||
// persisted to the transcript due to the append failure.
|
||||
stopReason: "stop",
|
||||
usage: { input: 0, output: 0, totalTokens: 0 },
|
||||
};
|
||||
} else {
|
||||
const combinedReply = finalReplies
|
||||
.map((part) => part.text?.trim() ?? "")
|
||||
.filter(Boolean)
|
||||
.join("\n\n")
|
||||
.trim();
|
||||
let message: Record<string, unknown> | undefined;
|
||||
if (combinedReply) {
|
||||
const { storePath: latestStorePath, entry: latestEntry } =
|
||||
loadSessionEntry(sessionKey);
|
||||
const sessionId = latestEntry?.sessionId ?? entry?.sessionId ?? clientRunId;
|
||||
const appended = appendAssistantTranscriptMessage({
|
||||
message: combinedReply,
|
||||
sessionId,
|
||||
storePath: latestStorePath,
|
||||
sessionFile: latestEntry?.sessionFile,
|
||||
agentId,
|
||||
createIfMissing: true,
|
||||
});
|
||||
if (appended.ok) {
|
||||
message = appended.message;
|
||||
} else {
|
||||
context.logGateway.warn(
|
||||
`webchat transcript append failed: ${appended.error ?? "unknown error"}`,
|
||||
);
|
||||
const now = Date.now();
|
||||
message = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: combinedReply }],
|
||||
timestamp: now,
|
||||
// Keep this compatible with Pi stopReason enums even though this message isn't
|
||||
// persisted to the transcript due to the append failure.
|
||||
stopReason: "stop",
|
||||
usage: { input: 0, output: 0, totalTokens: 0 },
|
||||
};
|
||||
}
|
||||
}
|
||||
broadcastChatFinal({
|
||||
context,
|
||||
runId: clientRunId,
|
||||
sessionKey: rawSessionKey,
|
||||
message,
|
||||
});
|
||||
}
|
||||
broadcastChatFinal({
|
||||
context,
|
||||
runId: clientRunId,
|
||||
sessionKey: rawSessionKey,
|
||||
message,
|
||||
});
|
||||
}
|
||||
setGatewayDedupeEntry({
|
||||
dedupe: context.dedupe,
|
||||
|
||||
@@ -4,6 +4,7 @@ import path from "node:path";
|
||||
import { describe, expect, test, vi } from "vitest";
|
||||
import { WebSocket } from "ws";
|
||||
import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js";
|
||||
import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js";
|
||||
import { extractFirstTextBlock } from "../shared/chat-message-content.js";
|
||||
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
|
||||
import {
|
||||
@@ -497,6 +498,121 @@ describe("gateway server chat", () => {
|
||||
});
|
||||
});
|
||||
|
||||
test("routes /btw replies through side-result events without transcript injection", async () => {
|
||||
await withMainSessionStore(async () => {
|
||||
const replyMock = vi.mocked(getReplyFromConfig);
|
||||
replyMock.mockResolvedValueOnce({
|
||||
text: "323",
|
||||
btw: { question: "what is 17 * 19?" },
|
||||
});
|
||||
const sideResultPromise = onceMessage(
|
||||
ws,
|
||||
(o) =>
|
||||
o.type === "event" &&
|
||||
o.event === "chat.side_result" &&
|
||||
o.payload?.kind === "btw" &&
|
||||
o.payload?.runId === "idem-btw-1",
|
||||
8000,
|
||||
);
|
||||
|
||||
const res = await rpcReq(ws, "chat.send", {
|
||||
sessionKey: "main",
|
||||
message: "/btw what is 17 * 19?",
|
||||
idempotencyKey: "idem-btw-1",
|
||||
});
|
||||
|
||||
expect(res.ok).toBe(true);
|
||||
const sideResult = await sideResultPromise;
|
||||
expect(sideResult.payload).toMatchObject({
|
||||
kind: "btw",
|
||||
runId: "idem-btw-1",
|
||||
sessionKey: "main",
|
||||
question: "what is 17 * 19?",
|
||||
text: "323",
|
||||
});
|
||||
|
||||
const historyRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", {
|
||||
sessionKey: "main",
|
||||
});
|
||||
expect(historyRes.ok).toBe(true);
|
||||
expect(historyRes.payload?.messages ?? []).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
test("chat.history returns BTW side results separately from normal messages", async () => {
|
||||
await withMainSessionStore(async (dir) => {
|
||||
const transcriptPath = path.join(dir, "sess-main.jsonl");
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "custom",
|
||||
customType: "openclaw:btw",
|
||||
data: {
|
||||
timestamp: 123,
|
||||
question: "what changed?",
|
||||
answer: "nothing important",
|
||||
},
|
||||
}),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const historyRes = await rpcReq<{ messages?: unknown[]; sideResults?: unknown[] }>(
|
||||
ws,
|
||||
"chat.history",
|
||||
{
|
||||
sessionKey: "main",
|
||||
},
|
||||
);
|
||||
expect(historyRes.ok).toBe(true);
|
||||
expect(historyRes.payload?.messages ?? []).toEqual([]);
|
||||
expect(historyRes.payload?.sideResults ?? []).toEqual([
|
||||
{
|
||||
kind: "btw",
|
||||
question: "what changed?",
|
||||
text: "nothing important",
|
||||
ts: 123,
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
test("chat.history replays BTW side results from the sidecar file after transcript compaction", async () => {
|
||||
await withMainSessionStore(async (dir) => {
|
||||
const transcriptPath = path.join(dir, "sess-main.jsonl");
|
||||
await fs.writeFile(transcriptPath, JSON.stringify({ type: "session", version: 1 }) + "\n");
|
||||
await fs.writeFile(
|
||||
resolveSessionSideResultsPathFromTranscript(transcriptPath),
|
||||
JSON.stringify({
|
||||
kind: "btw",
|
||||
question: "what changed?",
|
||||
text: "still working",
|
||||
ts: 456,
|
||||
}) + "\n",
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const historyRes = await rpcReq<{ messages?: unknown[]; sideResults?: unknown[] }>(
|
||||
ws,
|
||||
"chat.history",
|
||||
{
|
||||
sessionKey: "main",
|
||||
},
|
||||
);
|
||||
expect(historyRes.ok).toBe(true);
|
||||
expect(historyRes.payload?.messages ?? []).toEqual([]);
|
||||
expect(historyRes.payload?.sideResults ?? []).toEqual([
|
||||
{
|
||||
kind: "btw",
|
||||
question: "what changed?",
|
||||
text: "still working",
|
||||
ts: 456,
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
test("chat.history hides assistant NO_REPLY-only entries and keeps mixed-content assistant entries", async () => {
|
||||
const historyMessages = await loadChatHistoryWithMessages(buildNoReplyHistoryFixture(true));
|
||||
const roleAndText = historyMessages
|
||||
|
||||
@@ -4,6 +4,7 @@ import path from "node:path";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import { WebSocket } from "ws";
|
||||
import { DEFAULT_PROVIDER } from "../agents/defaults.js";
|
||||
import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js";
|
||||
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js";
|
||||
import { startGatewayServerHarness, type GatewayServerHarness } from "./server.e2e-ws-harness.js";
|
||||
import { createToolSummaryPreviewTranscriptLines } from "./session-preview.test-helpers.js";
|
||||
@@ -752,6 +753,66 @@ describe("gateway server sessions", () => {
|
||||
ws.close();
|
||||
});
|
||||
|
||||
test("sessions.compact keeps BTW side results available via the sidecar file", async () => {
|
||||
const { dir } = await createSessionStoreDir();
|
||||
const sessionId = "sess-compact-btw";
|
||||
const transcriptPath = path.join(dir, `${sessionId}.jsonl`);
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({ type: "session", version: 1, id: sessionId }),
|
||||
JSON.stringify({ message: { role: "user", content: "hello" } }),
|
||||
JSON.stringify({ message: { role: "assistant", content: "hi" } }),
|
||||
JSON.stringify({
|
||||
type: "custom",
|
||||
customType: "openclaw:btw",
|
||||
data: { timestamp: 123, question: "what changed?", answer: "nothing important" },
|
||||
}),
|
||||
JSON.stringify({ message: { role: "user", content: "later" } }),
|
||||
JSON.stringify({ message: { role: "assistant", content: "done" } }),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
await fs.writeFile(
|
||||
resolveSessionSideResultsPathFromTranscript(transcriptPath),
|
||||
JSON.stringify({
|
||||
kind: "btw",
|
||||
question: "what changed?",
|
||||
text: "nothing important",
|
||||
ts: 123,
|
||||
}) + "\n",
|
||||
"utf-8",
|
||||
);
|
||||
await writeSessionStore({
|
||||
entries: {
|
||||
main: { sessionId, updatedAt: Date.now() },
|
||||
},
|
||||
});
|
||||
|
||||
const { ws } = await openClient();
|
||||
const compacted = await rpcReq<{ ok: true; compacted: boolean }>(ws, "sessions.compact", {
|
||||
key: "main",
|
||||
maxLines: 2,
|
||||
});
|
||||
expect(compacted.ok).toBe(true);
|
||||
expect(compacted.payload?.compacted).toBe(true);
|
||||
|
||||
const history = await rpcReq<{ sideResults?: unknown[] }>(ws, "chat.history", {
|
||||
sessionKey: "main",
|
||||
});
|
||||
expect(history.ok).toBe(true);
|
||||
expect(history.payload?.sideResults ?? []).toEqual([
|
||||
{
|
||||
kind: "btw",
|
||||
question: "what changed?",
|
||||
text: "nothing important",
|
||||
ts: 123,
|
||||
},
|
||||
]);
|
||||
|
||||
ws.close();
|
||||
});
|
||||
|
||||
test("sessions.delete rejects main and aborts active runs", async () => {
|
||||
const { dir } = await createSessionStoreDir();
|
||||
await writeSingleLineSession(dir, "sess-main", "hello");
|
||||
|
||||
@@ -2,12 +2,14 @@ import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from "vitest";
|
||||
import { resolveSessionSideResultsPathFromTranscript } from "../sessions/side-results.js";
|
||||
import { createToolSummaryPreviewTranscriptLines } from "./session-preview.test-helpers.js";
|
||||
import {
|
||||
archiveSessionTranscripts,
|
||||
readFirstUserMessageFromTranscript,
|
||||
readLastMessagePreviewFromTranscript,
|
||||
readSessionMessages,
|
||||
readSessionSideResults,
|
||||
readSessionTitleFieldsFromTranscript,
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
resolveSessionTranscriptCandidates,
|
||||
@@ -760,6 +762,32 @@ describe("archiveSessionTranscripts", () => {
|
||||
}
|
||||
});
|
||||
|
||||
test("archives BTW side-result sidecars alongside transcripts", () => {
|
||||
const sessionId = "sess-archive-side-results";
|
||||
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
|
||||
const sidecarPath = resolveSessionSideResultsPathFromTranscript(transcriptPath);
|
||||
fs.writeFileSync(transcriptPath, '{"type":"session"}\n', "utf-8");
|
||||
fs.writeFileSync(
|
||||
sidecarPath,
|
||||
`${JSON.stringify({ kind: "btw", question: "q", text: "a", ts: 123 })}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const archived = archiveSessionTranscripts({
|
||||
sessionId,
|
||||
storePath,
|
||||
reason: "reset",
|
||||
});
|
||||
|
||||
expect(archived).toHaveLength(2);
|
||||
expect(archived.some((entry) => entry.includes(`${sessionId}.jsonl.reset.`))).toBe(true);
|
||||
expect(archived.some((entry) => entry.includes(`${sessionId}.side-results.jsonl.reset.`))).toBe(
|
||||
true,
|
||||
);
|
||||
expect(fs.existsSync(transcriptPath)).toBe(false);
|
||||
expect(fs.existsSync(sidecarPath)).toBe(false);
|
||||
});
|
||||
|
||||
test("returns empty array when no transcript files exist", () => {
|
||||
const archived = archiveSessionTranscripts({
|
||||
sessionId: "nonexistent-session",
|
||||
@@ -787,3 +815,77 @@ describe("archiveSessionTranscripts", () => {
|
||||
expect(fs.existsSync(transcriptPath)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("readSessionSideResults", () => {
|
||||
let tmpDir: string;
|
||||
let storePath: string;
|
||||
|
||||
registerTempSessionStore("openclaw-side-results-test-", (nextTmpDir, nextStorePath) => {
|
||||
tmpDir = nextTmpDir;
|
||||
storePath = nextStorePath;
|
||||
});
|
||||
|
||||
test("reads BTW results from transcript custom entries", () => {
|
||||
const sessionId = "sess-btw-transcript";
|
||||
writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
{
|
||||
type: "custom",
|
||||
customType: "openclaw:btw",
|
||||
data: {
|
||||
timestamp: 10,
|
||||
question: "what changed?",
|
||||
answer: "nothing",
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
||||
expect(readSessionSideResults(sessionId, storePath)).toEqual([
|
||||
{
|
||||
kind: "btw",
|
||||
question: "what changed?",
|
||||
text: "nothing",
|
||||
ts: 10,
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
test("merges BTW sidecar records and dedupes transcript duplicates", () => {
|
||||
const sessionId = "sess-btw-sidecar";
|
||||
const transcriptPath = writeTranscript(tmpDir, sessionId, [
|
||||
{ type: "session", version: 1, id: sessionId },
|
||||
{
|
||||
type: "custom",
|
||||
customType: "openclaw:btw",
|
||||
data: {
|
||||
timestamp: 10,
|
||||
question: "what changed?",
|
||||
answer: "nothing",
|
||||
},
|
||||
},
|
||||
]);
|
||||
fs.writeFileSync(
|
||||
resolveSessionSideResultsPathFromTranscript(transcriptPath),
|
||||
[
|
||||
JSON.stringify({ kind: "btw", question: "what changed?", text: "nothing", ts: 10 }),
|
||||
JSON.stringify({ kind: "btw", question: "what now?", text: "keep going", ts: 20 }),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
expect(readSessionSideResults(sessionId, storePath)).toEqual([
|
||||
{
|
||||
kind: "btw",
|
||||
question: "what changed?",
|
||||
text: "nothing",
|
||||
ts: 10,
|
||||
},
|
||||
{
|
||||
kind: "btw",
|
||||
question: "what now?",
|
||||
text: "keep going",
|
||||
ts: 20,
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -12,6 +12,10 @@ import {
|
||||
import { resolveRequiredHomeDir } from "../infra/home-dir.js";
|
||||
import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js";
|
||||
import { hasInterSessionUserProvenance } from "../sessions/input-provenance.js";
|
||||
import {
|
||||
resolveSessionSideResultsPathFromTranscript,
|
||||
type PersistedSessionSideResult,
|
||||
} from "../sessions/side-results.js";
|
||||
import { stripInlineDirectiveTagsForDisplay } from "../utils/directive-tags.js";
|
||||
import { extractToolCallNames, hasToolCall } from "../utils/transcript-tools.js";
|
||||
import { stripEnvelope } from "./chat-sanitize.js";
|
||||
@@ -22,6 +26,16 @@ type SessionTitleFields = {
|
||||
lastMessagePreview: string | null;
|
||||
};
|
||||
|
||||
const BTW_CUSTOM_TYPE = "openclaw:btw";
|
||||
|
||||
export type SessionSideResult = {
|
||||
kind: "btw";
|
||||
question: string;
|
||||
text: string;
|
||||
isError?: boolean;
|
||||
ts?: number;
|
||||
};
|
||||
|
||||
type SessionTitleFieldsCacheEntry = SessionTitleFields & {
|
||||
mtimeMs: number;
|
||||
size: number;
|
||||
@@ -118,6 +132,126 @@ export function readSessionMessages(
|
||||
return messages;
|
||||
}
|
||||
|
||||
export function readSessionSideResults(
|
||||
sessionId: string,
|
||||
storePath: string | undefined,
|
||||
sessionFile?: string,
|
||||
): SessionSideResult[] {
|
||||
const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile);
|
||||
const resultsByKey = new Map<string, SessionSideResult>();
|
||||
|
||||
const pushResult = (result: SessionSideResult | null) => {
|
||||
if (!result) {
|
||||
return;
|
||||
}
|
||||
const dedupeKey = [
|
||||
result.kind,
|
||||
result.ts ?? "",
|
||||
result.question,
|
||||
result.text,
|
||||
result.isError ? "1" : "0",
|
||||
].join("\u0000");
|
||||
if (!resultsByKey.has(dedupeKey)) {
|
||||
resultsByKey.set(dedupeKey, result);
|
||||
}
|
||||
};
|
||||
|
||||
for (const filePath of candidates.filter((p) => fs.existsSync(p))) {
|
||||
const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/);
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(line) as {
|
||||
type?: string;
|
||||
customType?: string;
|
||||
data?: { question?: unknown; answer?: unknown; timestamp?: unknown; isError?: unknown };
|
||||
};
|
||||
pushResult(parseTranscriptSideResult(parsed));
|
||||
} catch {
|
||||
// ignore bad lines
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const sideResultPaths = new Set(
|
||||
candidates.map((candidate) => resolveSessionSideResultsPathFromTranscript(candidate)),
|
||||
);
|
||||
for (const filePath of sideResultPaths) {
|
||||
if (!fs.existsSync(filePath)) {
|
||||
continue;
|
||||
}
|
||||
const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/);
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(line) as PersistedSessionSideResult;
|
||||
pushResult(parsePersistedSideResult(parsed));
|
||||
} catch {
|
||||
// ignore bad lines
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return [...resultsByKey.values()].toSorted((left, right) => {
|
||||
const leftTs = left.ts ?? Number.MAX_SAFE_INTEGER;
|
||||
const rightTs = right.ts ?? Number.MAX_SAFE_INTEGER;
|
||||
if (leftTs !== rightTs) {
|
||||
return leftTs - rightTs;
|
||||
}
|
||||
return left.question.localeCompare(right.question);
|
||||
});
|
||||
}
|
||||
|
||||
function parseTranscriptSideResult(parsed: {
|
||||
type?: string;
|
||||
customType?: string;
|
||||
data?: { question?: unknown; answer?: unknown; timestamp?: unknown; isError?: unknown };
|
||||
}): SessionSideResult | null {
|
||||
if (parsed.type !== "custom" || parsed.customType !== BTW_CUSTOM_TYPE) {
|
||||
return null;
|
||||
}
|
||||
const question = typeof parsed.data?.question === "string" ? parsed.data.question.trim() : "";
|
||||
const text = typeof parsed.data?.answer === "string" ? parsed.data.answer.trim() : "";
|
||||
if (!question || !text) {
|
||||
return null;
|
||||
}
|
||||
const timestamp =
|
||||
typeof parsed.data?.timestamp === "number" && Number.isFinite(parsed.data.timestamp)
|
||||
? parsed.data.timestamp
|
||||
: undefined;
|
||||
const isError = parsed.data?.isError === true ? true : undefined;
|
||||
return {
|
||||
kind: "btw",
|
||||
question,
|
||||
text,
|
||||
isError,
|
||||
ts: timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
function parsePersistedSideResult(parsed: PersistedSessionSideResult): SessionSideResult | null {
|
||||
if (parsed.kind !== "btw") {
|
||||
return null;
|
||||
}
|
||||
const question = typeof parsed.question === "string" ? parsed.question.trim() : "";
|
||||
const text = typeof parsed.text === "string" ? parsed.text.trim() : "";
|
||||
if (!question || !text) {
|
||||
return null;
|
||||
}
|
||||
const timestamp = Number.isFinite(parsed.ts) ? parsed.ts : undefined;
|
||||
return {
|
||||
kind: "btw",
|
||||
question,
|
||||
text,
|
||||
isError: parsed.isError === true ? true : undefined,
|
||||
ts: timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
export function resolveSessionTranscriptCandidates(
|
||||
sessionId: string,
|
||||
storePath: string | undefined,
|
||||
@@ -202,12 +336,17 @@ export function archiveSessionTranscripts(opts: {
|
||||
opts.restrictToStoreDir && opts.storePath
|
||||
? canonicalizePathForComparison(path.dirname(opts.storePath))
|
||||
: null;
|
||||
for (const candidate of resolveSessionTranscriptCandidates(
|
||||
const transcriptCandidates = resolveSessionTranscriptCandidates(
|
||||
opts.sessionId,
|
||||
opts.storePath,
|
||||
opts.sessionFile,
|
||||
opts.agentId,
|
||||
)) {
|
||||
);
|
||||
const archiveCandidates = new Set<string>(transcriptCandidates);
|
||||
for (const candidate of transcriptCandidates) {
|
||||
archiveCandidates.add(resolveSessionSideResultsPathFromTranscript(candidate));
|
||||
}
|
||||
for (const candidate of archiveCandidates) {
|
||||
const candidatePath = canonicalizePathForComparison(candidate);
|
||||
if (storeDir) {
|
||||
const relative = path.relative(storeDir, candidatePath);
|
||||
|
||||
@@ -57,6 +57,7 @@ export {
|
||||
readSessionTitleFieldsFromTranscript,
|
||||
readSessionPreviewItemsFromTranscript,
|
||||
readSessionMessages,
|
||||
readSessionSideResults,
|
||||
resolveSessionTranscriptCandidates,
|
||||
} from "./session-utils.fs.js";
|
||||
export type {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { ChannelOutboundAdapter } from "../../channels/plugins/types.adapters.js";
|
||||
import { signalOutbound } from "../../channels/plugins/outbound/signal.js";
|
||||
import { telegramOutbound } from "../../channels/plugins/outbound/telegram.js";
|
||||
import { whatsappOutbound } from "../../channels/plugins/outbound/whatsapp.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { STATE_DIR } from "../../config/paths.js";
|
||||
import { setActivePluginRegistry } from "../../plugins/runtime.js";
|
||||
@@ -8,15 +10,64 @@ import { markdownToSignalTextChunks } from "../../signal/format.js";
|
||||
import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js";
|
||||
import { withEnvAsync } from "../../test-utils/env.js";
|
||||
import { createIMessageTestPlugin } from "../../test-utils/imessage-test-plugin.js";
|
||||
import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js";
|
||||
import { resolvePreferredOpenClawTmpDir } from "../tmp-openclaw-dir.js";
|
||||
import {
|
||||
clearDeliverTestRegistry,
|
||||
hookMocks,
|
||||
resetDeliverTestState,
|
||||
resetDeliverTestMocks,
|
||||
runChunkedWhatsAppDelivery as runChunkedWhatsAppDeliveryHelper,
|
||||
whatsappChunkConfig,
|
||||
} from "./deliver.test-helpers.js";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })),
|
||||
}));
|
||||
const hookMocks = vi.hoisted(() => ({
|
||||
runner: {
|
||||
hasHooks: vi.fn(() => false),
|
||||
runMessageSent: vi.fn(async () => {}),
|
||||
},
|
||||
}));
|
||||
const internalHookMocks = vi.hoisted(() => ({
|
||||
createInternalHookEvent: vi.fn(),
|
||||
triggerInternalHook: vi.fn(async () => {}),
|
||||
}));
|
||||
const queueMocks = vi.hoisted(() => ({
|
||||
enqueueDelivery: vi.fn(async () => "mock-queue-id"),
|
||||
ackDelivery: vi.fn(async () => {}),
|
||||
failDelivery: vi.fn(async () => {}),
|
||||
}));
|
||||
const logMocks = vi.hoisted(() => ({
|
||||
warn: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/sessions.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../../config/sessions.js")>(
|
||||
"../../config/sessions.js",
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
appendAssistantMessageToSessionTranscript: mocks.appendAssistantMessageToSessionTranscript,
|
||||
};
|
||||
});
|
||||
vi.mock("../../plugins/hook-runner-global.js", () => ({
|
||||
getGlobalHookRunner: () => hookMocks.runner,
|
||||
}));
|
||||
vi.mock("../../hooks/internal-hooks.js", () => ({
|
||||
createInternalHookEvent: internalHookMocks.createInternalHookEvent,
|
||||
triggerInternalHook: internalHookMocks.triggerInternalHook,
|
||||
}));
|
||||
vi.mock("./delivery-queue.js", () => ({
|
||||
enqueueDelivery: queueMocks.enqueueDelivery,
|
||||
ackDelivery: queueMocks.ackDelivery,
|
||||
failDelivery: queueMocks.failDelivery,
|
||||
}));
|
||||
vi.mock("../../logging/subsystem.js", () => ({
|
||||
createSubsystemLogger: () => {
|
||||
const makeLogger = () => ({
|
||||
warn: logMocks.warn,
|
||||
info: vi.fn(),
|
||||
error: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
child: vi.fn(() => makeLogger()),
|
||||
});
|
||||
return makeLogger();
|
||||
},
|
||||
}));
|
||||
|
||||
const { deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js");
|
||||
|
||||
@@ -24,34 +75,14 @@ const telegramChunkConfig: OpenClawConfig = {
|
||||
channels: { telegram: { botToken: "tok-1", textChunkLimit: 2 } },
|
||||
};
|
||||
|
||||
const whatsappChunkConfig: OpenClawConfig = {
|
||||
channels: { whatsapp: { textChunkLimit: 4000 } },
|
||||
};
|
||||
|
||||
type DeliverOutboundArgs = Parameters<typeof deliverOutboundPayloads>[0];
|
||||
type DeliverOutboundPayload = DeliverOutboundArgs["payloads"][number];
|
||||
type DeliverSession = DeliverOutboundArgs["session"];
|
||||
|
||||
function setMatrixTextOnlyPlugin(sendText: NonNullable<ChannelOutboundAdapter["sendText"]>) {
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "matrix",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({
|
||||
id: "matrix",
|
||||
outbound: { deliveryMode: "direct", sendText },
|
||||
}),
|
||||
},
|
||||
]),
|
||||
);
|
||||
}
|
||||
|
||||
async function deliverMatrixPayloads(payloads: DeliverOutboundPayload[]) {
|
||||
return deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "matrix",
|
||||
to: "!room:1",
|
||||
payloads,
|
||||
});
|
||||
}
|
||||
|
||||
async function deliverWhatsAppPayload(params: {
|
||||
sendWhatsApp: NonNullable<
|
||||
NonNullable<Parameters<typeof deliverOutboundPayloads>[0]["deps"]>["sendWhatsApp"]
|
||||
@@ -86,14 +117,96 @@ async function deliverTelegramPayload(params: {
|
||||
});
|
||||
}
|
||||
|
||||
async function runChunkedWhatsAppDelivery(params?: {
|
||||
mirror?: Parameters<typeof deliverOutboundPayloads>[0]["mirror"];
|
||||
}) {
|
||||
const sendWhatsApp = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ messageId: "w1", toJid: "jid" })
|
||||
.mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
|
||||
const cfg: OpenClawConfig = {
|
||||
channels: { whatsapp: { textChunkLimit: 2 } },
|
||||
};
|
||||
const results = await deliverOutboundPayloads({
|
||||
cfg,
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "abcd" }],
|
||||
deps: { sendWhatsApp },
|
||||
...(params?.mirror ? { mirror: params.mirror } : {}),
|
||||
});
|
||||
return { sendWhatsApp, results };
|
||||
}
|
||||
|
||||
async function deliverSingleWhatsAppForHookTest(params?: { sessionKey?: string }) {
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
|
||||
await deliverOutboundPayloads({
|
||||
cfg: whatsappChunkConfig,
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "hello" }],
|
||||
deps: { sendWhatsApp },
|
||||
...(params?.sessionKey ? { session: { key: params.sessionKey } } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
async function runBestEffortPartialFailureDelivery() {
|
||||
const sendWhatsApp = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(new Error("fail"))
|
||||
.mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
|
||||
const onError = vi.fn();
|
||||
const cfg: OpenClawConfig = {};
|
||||
const results = await deliverOutboundPayloads({
|
||||
cfg,
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "a" }, { text: "b" }],
|
||||
deps: { sendWhatsApp },
|
||||
bestEffort: true,
|
||||
onError,
|
||||
});
|
||||
return { sendWhatsApp, onError, results };
|
||||
}
|
||||
|
||||
function expectSuccessfulWhatsAppInternalHookPayload(
|
||||
expected: Partial<{
|
||||
content: string;
|
||||
messageId: string;
|
||||
isGroup: boolean;
|
||||
groupId: string;
|
||||
}>,
|
||||
) {
|
||||
return expect.objectContaining({
|
||||
to: "+1555",
|
||||
success: true,
|
||||
channelId: "whatsapp",
|
||||
conversationId: "+1555",
|
||||
...expected,
|
||||
});
|
||||
}
|
||||
|
||||
describe("deliverOutboundPayloads", () => {
|
||||
beforeEach(() => {
|
||||
resetDeliverTestState();
|
||||
resetDeliverTestMocks();
|
||||
setActivePluginRegistry(defaultRegistry);
|
||||
hookMocks.runner.hasHooks.mockClear();
|
||||
hookMocks.runner.hasHooks.mockReturnValue(false);
|
||||
hookMocks.runner.runMessageSent.mockClear();
|
||||
hookMocks.runner.runMessageSent.mockResolvedValue(undefined);
|
||||
internalHookMocks.createInternalHookEvent.mockClear();
|
||||
internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload);
|
||||
internalHookMocks.triggerInternalHook.mockClear();
|
||||
queueMocks.enqueueDelivery.mockClear();
|
||||
queueMocks.enqueueDelivery.mockResolvedValue("mock-queue-id");
|
||||
queueMocks.ackDelivery.mockClear();
|
||||
queueMocks.ackDelivery.mockResolvedValue(undefined);
|
||||
queueMocks.failDelivery.mockClear();
|
||||
queueMocks.failDelivery.mockResolvedValue(undefined);
|
||||
logMocks.warn.mockClear();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearDeliverTestRegistry();
|
||||
setActivePluginRegistry(emptyRegistry);
|
||||
});
|
||||
it("chunks telegram markdown and passes through accountId", async () => {
|
||||
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
|
||||
@@ -175,6 +288,24 @@ describe("deliverOutboundPayloads", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("prefixes BTW replies for telegram delivery", async () => {
|
||||
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
|
||||
|
||||
await deliverTelegramPayload({
|
||||
sendTelegram,
|
||||
cfg: {
|
||||
channels: { telegram: { botToken: "tok-1", textChunkLimit: 100 } },
|
||||
},
|
||||
payload: { text: "323", btw: { question: "what is 17 * 19?" } },
|
||||
});
|
||||
|
||||
expect(sendTelegram).toHaveBeenCalledWith(
|
||||
"123",
|
||||
"BTW: 323",
|
||||
expect.objectContaining({ verbose: false, textMode: "html" }),
|
||||
);
|
||||
});
|
||||
|
||||
it("preserves HTML text for telegram sendPayload channelData path", async () => {
|
||||
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
|
||||
|
||||
@@ -416,9 +547,7 @@ describe("deliverOutboundPayloads", () => {
|
||||
});
|
||||
|
||||
it("chunks WhatsApp text and returns all results", async () => {
|
||||
const { sendWhatsApp, results } = await runChunkedWhatsAppDeliveryHelper({
|
||||
deliverOutboundPayloads,
|
||||
});
|
||||
const { sendWhatsApp, results } = await runChunkedWhatsAppDelivery();
|
||||
|
||||
expect(sendWhatsApp).toHaveBeenCalledTimes(2);
|
||||
expect(results.map((r) => r.messageId)).toEqual(["w1", "w2"]);
|
||||
@@ -614,6 +743,222 @@ describe("deliverOutboundPayloads", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("prefixes BTW replies for whatsapp delivery", async () => {
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
|
||||
|
||||
await deliverWhatsAppPayload({
|
||||
sendWhatsApp,
|
||||
payload: { text: "323", btw: { question: "what is 17 * 19?" } },
|
||||
});
|
||||
|
||||
expect(sendWhatsApp).toHaveBeenCalledWith("+1555", "BTW: 323", expect.any(Object));
|
||||
});
|
||||
|
||||
it("continues on errors when bestEffort is enabled", async () => {
|
||||
const { sendWhatsApp, onError, results } = await runBestEffortPartialFailureDelivery();
|
||||
|
||||
expect(sendWhatsApp).toHaveBeenCalledTimes(2);
|
||||
expect(onError).toHaveBeenCalledTimes(1);
|
||||
expect(results).toEqual([{ channel: "whatsapp", messageId: "w2", toJid: "jid" }]);
|
||||
});
|
||||
|
||||
it("emits internal message:sent hook with success=true for chunked payload delivery", async () => {
|
||||
const { sendWhatsApp } = await runChunkedWhatsAppDelivery({
|
||||
mirror: {
|
||||
sessionKey: "agent:main:main",
|
||||
isGroup: true,
|
||||
groupId: "whatsapp:group:123",
|
||||
},
|
||||
});
|
||||
expect(sendWhatsApp).toHaveBeenCalledTimes(2);
|
||||
|
||||
expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1);
|
||||
expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith(
|
||||
"message",
|
||||
"sent",
|
||||
"agent:main:main",
|
||||
expectSuccessfulWhatsAppInternalHookPayload({
|
||||
content: "abcd",
|
||||
messageId: "w2",
|
||||
isGroup: true,
|
||||
groupId: "whatsapp:group:123",
|
||||
}),
|
||||
);
|
||||
expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not emit internal message:sent hook when neither mirror nor sessionKey is provided", async () => {
|
||||
await deliverSingleWhatsAppForHookTest();
|
||||
|
||||
expect(internalHookMocks.createInternalHookEvent).not.toHaveBeenCalled();
|
||||
expect(internalHookMocks.triggerInternalHook).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("emits internal message:sent hook when sessionKey is provided without mirror", async () => {
|
||||
await deliverSingleWhatsAppForHookTest({ sessionKey: "agent:main:main" });
|
||||
|
||||
expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1);
|
||||
expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith(
|
||||
"message",
|
||||
"sent",
|
||||
"agent:main:main",
|
||||
expectSuccessfulWhatsAppInternalHookPayload({ content: "hello", messageId: "w1" }),
|
||||
);
|
||||
expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("warns when session.agentId is set without a session key", async () => {
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
|
||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg: whatsappChunkConfig,
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "hello" }],
|
||||
deps: { sendWhatsApp },
|
||||
session: { agentId: "agent-main" },
|
||||
});
|
||||
|
||||
expect(logMocks.warn).toHaveBeenCalledWith(
|
||||
"deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped",
|
||||
expect.objectContaining({ channel: "whatsapp", to: "+1555", agentId: "agent-main" }),
|
||||
);
|
||||
});
|
||||
|
||||
it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => {
|
||||
const { onError } = await runBestEffortPartialFailureDelivery();
|
||||
|
||||
// onError was called for the first payload's failure.
|
||||
expect(onError).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Queue entry should NOT be acked — failDelivery should be called instead.
|
||||
expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
|
||||
expect(queueMocks.failDelivery).toHaveBeenCalledWith(
|
||||
"mock-queue-id",
|
||||
"partial delivery failure (bestEffort)",
|
||||
);
|
||||
});
|
||||
|
||||
it("acks the queue entry when delivery is aborted", async () => {
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
const cfg: OpenClawConfig = {};
|
||||
|
||||
await expect(
|
||||
deliverOutboundPayloads({
|
||||
cfg,
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "a" }],
|
||||
deps: { sendWhatsApp },
|
||||
abortSignal: abortController.signal,
|
||||
}),
|
||||
).rejects.toThrow("Operation aborted");
|
||||
|
||||
expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id");
|
||||
expect(queueMocks.failDelivery).not.toHaveBeenCalled();
|
||||
expect(sendWhatsApp).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("passes normalized payload to onError", async () => {
|
||||
const sendWhatsApp = vi.fn().mockRejectedValue(new Error("boom"));
|
||||
const onError = vi.fn();
|
||||
const cfg: OpenClawConfig = {};
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg,
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "hi", mediaUrl: "https://x.test/a.jpg" }],
|
||||
deps: { sendWhatsApp },
|
||||
bestEffort: true,
|
||||
onError,
|
||||
});
|
||||
|
||||
expect(onError).toHaveBeenCalledTimes(1);
|
||||
expect(onError).toHaveBeenCalledWith(
|
||||
expect.any(Error),
|
||||
expect.objectContaining({ text: "hi", mediaUrls: ["https://x.test/a.jpg"] }),
|
||||
);
|
||||
});
|
||||
|
||||
it("mirrors delivered output when mirror options are provided", async () => {
|
||||
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
|
||||
mocks.appendAssistantMessageToSessionTranscript.mockClear();
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg: telegramChunkConfig,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
payloads: [{ text: "caption", mediaUrl: "https://example.com/files/report.pdf?sig=1" }],
|
||||
deps: { sendTelegram },
|
||||
mirror: {
|
||||
sessionKey: "agent:main:main",
|
||||
text: "caption",
|
||||
mediaUrls: ["https://example.com/files/report.pdf?sig=1"],
|
||||
idempotencyKey: "idem-deliver-1",
|
||||
},
|
||||
});
|
||||
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
text: "report.pdf",
|
||||
idempotencyKey: "idem-deliver-1",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("emits message_sent success for text-only deliveries", async () => {
|
||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "hello" }],
|
||||
deps: { sendWhatsApp },
|
||||
});
|
||||
|
||||
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ to: "+1555", content: "hello", success: true }),
|
||||
expect.objectContaining({ channelId: "whatsapp" }),
|
||||
);
|
||||
});
|
||||
|
||||
it("emits message_sent success for sendPayload deliveries", async () => {
|
||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||
const sendPayload = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-1" });
|
||||
const sendText = vi.fn();
|
||||
const sendMedia = vi.fn();
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "matrix",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({
|
||||
id: "matrix",
|
||||
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
|
||||
}),
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "matrix",
|
||||
to: "!room:1",
|
||||
payloads: [{ text: "payload text", channelData: { mode: "custom" } }],
|
||||
});
|
||||
|
||||
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ to: "!room:1", content: "payload text", success: true }),
|
||||
expect.objectContaining({ channelId: "matrix" }),
|
||||
);
|
||||
});
|
||||
|
||||
it("preserves channelData-only payloads with empty text for non-WhatsApp sendPayload channels", async () => {
|
||||
const sendPayload = vi.fn().mockResolvedValue({ channel: "line", messageId: "ln-1" });
|
||||
const sendText = vi.fn();
|
||||
@@ -649,11 +994,25 @@ describe("deliverOutboundPayloads", () => {
|
||||
|
||||
it("falls back to sendText when plugin outbound omits sendMedia", async () => {
|
||||
const sendText = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-1" });
|
||||
setMatrixTextOnlyPlugin(sendText);
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "matrix",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({
|
||||
id: "matrix",
|
||||
outbound: { deliveryMode: "direct", sendText },
|
||||
}),
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
const results = await deliverMatrixPayloads([
|
||||
{ text: "caption", mediaUrl: "https://example.com/file.png" },
|
||||
]);
|
||||
const results = await deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "matrix",
|
||||
to: "!room:1",
|
||||
payloads: [{ text: "caption", mediaUrl: "https://example.com/file.png" }],
|
||||
});
|
||||
|
||||
expect(sendText).toHaveBeenCalledTimes(1);
|
||||
expect(sendText).toHaveBeenCalledWith(
|
||||
@@ -661,19 +1020,42 @@ describe("deliverOutboundPayloads", () => {
|
||||
text: "caption",
|
||||
}),
|
||||
);
|
||||
expect(logMocks.warn).toHaveBeenCalledWith(
|
||||
"Plugin outbound adapter does not implement sendMedia; media URLs will be dropped and text fallback will be used",
|
||||
expect.objectContaining({
|
||||
channel: "matrix",
|
||||
mediaCount: 1,
|
||||
}),
|
||||
);
|
||||
expect(results).toEqual([{ channel: "matrix", messageId: "mx-1" }]);
|
||||
});
|
||||
|
||||
it("falls back to one sendText call for multi-media payloads when sendMedia is omitted", async () => {
|
||||
const sendText = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-2" });
|
||||
setMatrixTextOnlyPlugin(sendText);
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "matrix",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({
|
||||
id: "matrix",
|
||||
outbound: { deliveryMode: "direct", sendText },
|
||||
}),
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
const results = await deliverMatrixPayloads([
|
||||
{
|
||||
text: "caption",
|
||||
mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
|
||||
},
|
||||
]);
|
||||
const results = await deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "matrix",
|
||||
to: "!room:1",
|
||||
payloads: [
|
||||
{
|
||||
text: "caption",
|
||||
mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
expect(sendText).toHaveBeenCalledTimes(1);
|
||||
expect(sendText).toHaveBeenCalledWith(
|
||||
@@ -681,20 +1063,109 @@ describe("deliverOutboundPayloads", () => {
|
||||
text: "caption",
|
||||
}),
|
||||
);
|
||||
expect(logMocks.warn).toHaveBeenCalledWith(
|
||||
"Plugin outbound adapter does not implement sendMedia; media URLs will be dropped and text fallback will be used",
|
||||
expect.objectContaining({
|
||||
channel: "matrix",
|
||||
mediaCount: 2,
|
||||
}),
|
||||
);
|
||||
expect(results).toEqual([{ channel: "matrix", messageId: "mx-2" }]);
|
||||
});
|
||||
|
||||
it("fails media-only payloads when plugin outbound omits sendMedia", async () => {
|
||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||
const sendText = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-3" });
|
||||
setMatrixTextOnlyPlugin(sendText);
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "matrix",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({
|
||||
id: "matrix",
|
||||
outbound: { deliveryMode: "direct", sendText },
|
||||
}),
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
await expect(
|
||||
deliverMatrixPayloads([{ text: " ", mediaUrl: "https://example.com/file.png" }]),
|
||||
deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "matrix",
|
||||
to: "!room:1",
|
||||
payloads: [{ text: " ", mediaUrl: "https://example.com/file.png" }],
|
||||
}),
|
||||
).rejects.toThrow(
|
||||
"Plugin outbound adapter does not implement sendMedia and no text fallback is available for media payload",
|
||||
);
|
||||
|
||||
expect(sendText).not.toHaveBeenCalled();
|
||||
expect(logMocks.warn).toHaveBeenCalledWith(
|
||||
"Plugin outbound adapter does not implement sendMedia; media URLs will be dropped and text fallback will be used",
|
||||
expect.objectContaining({
|
||||
channel: "matrix",
|
||||
mediaCount: 1,
|
||||
}),
|
||||
);
|
||||
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
to: "!room:1",
|
||||
content: "",
|
||||
success: false,
|
||||
error:
|
||||
"Plugin outbound adapter does not implement sendMedia and no text fallback is available for media payload",
|
||||
}),
|
||||
expect.objectContaining({ channelId: "matrix" }),
|
||||
);
|
||||
});
|
||||
|
||||
it("emits message_sent failure when delivery errors", async () => {
|
||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||
const sendWhatsApp = vi.fn().mockRejectedValue(new Error("downstream failed"));
|
||||
|
||||
await expect(
|
||||
deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "hi" }],
|
||||
deps: { sendWhatsApp },
|
||||
}),
|
||||
).rejects.toThrow("downstream failed");
|
||||
|
||||
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
to: "+1555",
|
||||
content: "hi",
|
||||
success: false,
|
||||
error: "downstream failed",
|
||||
}),
|
||||
expect.objectContaining({ channelId: "whatsapp" }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
const emptyRegistry = createTestRegistry([]);
|
||||
const defaultRegistry = createTestRegistry([
|
||||
{
|
||||
pluginId: "telegram",
|
||||
plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
|
||||
source: "test",
|
||||
},
|
||||
{
|
||||
pluginId: "signal",
|
||||
plugin: createOutboundTestPlugin({ id: "signal", outbound: signalOutbound }),
|
||||
source: "test",
|
||||
},
|
||||
{
|
||||
pluginId: "whatsapp",
|
||||
plugin: createOutboundTestPlugin({ id: "whatsapp", outbound: whatsappOutbound }),
|
||||
source: "test",
|
||||
},
|
||||
{
|
||||
pluginId: "imessage",
|
||||
plugin: createIMessageTestPlugin(),
|
||||
source: "test",
|
||||
},
|
||||
]);
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,5 +1,6 @@
|
||||
import { parseReplyDirectives } from "../../auto-reply/reply/reply-directives.js";
|
||||
import {
|
||||
formatBtwTextForExternalDelivery,
|
||||
isRenderablePayload,
|
||||
shouldSuppressReasoningPayload,
|
||||
} from "../../auto-reply/reply/reply-payloads.js";
|
||||
@@ -59,7 +60,11 @@ export function normalizeReplyPayloadsForDelivery(
|
||||
const resolvedMediaUrl = hasMultipleMedia ? undefined : explicitMediaUrl;
|
||||
const next: ReplyPayload = {
|
||||
...payload,
|
||||
text: parsed.text ?? "",
|
||||
text:
|
||||
formatBtwTextForExternalDelivery({
|
||||
...payload,
|
||||
text: parsed.text ?? "",
|
||||
}) ?? "",
|
||||
mediaUrls: mergedMedia.length ? mergedMedia : undefined,
|
||||
mediaUrl: resolvedMediaUrl,
|
||||
replyToId: payload.replyToId ?? parsed.replyToId,
|
||||
|
||||
@@ -5,6 +5,7 @@ import type { NormalizedUsage, UsageLike } from "../agents/usage.js";
|
||||
import { normalizeUsage } from "../agents/usage.js";
|
||||
import { stripInboundMetadata } from "../auto-reply/reply/strip-inbound-meta.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { isPrimarySessionTranscriptFileName } from "../config/sessions.js";
|
||||
import {
|
||||
resolveSessionFilePath,
|
||||
resolveSessionTranscriptsDirForAgent,
|
||||
@@ -318,7 +319,7 @@ export async function loadCostUsageSummary(params?: {
|
||||
const files = (
|
||||
await Promise.all(
|
||||
entries
|
||||
.filter((entry) => entry.isFile() && entry.name.endsWith(".jsonl"))
|
||||
.filter((entry) => entry.isFile() && isPrimarySessionTranscriptFileName(entry.name))
|
||||
.map(async (entry) => {
|
||||
const filePath = path.join(sessionsDir, entry.name);
|
||||
const stats = await fs.promises.stat(filePath).catch(() => null);
|
||||
@@ -393,7 +394,7 @@ export async function discoverAllSessions(params?: {
|
||||
const discovered: DiscoveredSession[] = [];
|
||||
|
||||
for (const entry of entries) {
|
||||
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) {
|
||||
if (!entry.isFile() || !isPrimarySessionTranscriptFileName(entry.name)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
37
src/sessions/side-results.ts
Normal file
37
src/sessions/side-results.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
|
||||
export const SESSION_SIDE_RESULTS_SUFFIX = ".side-results.jsonl";
|
||||
|
||||
export type PersistedSessionSideResult = {
|
||||
kind: "btw";
|
||||
question: string;
|
||||
text: string;
|
||||
ts: number;
|
||||
isError?: boolean;
|
||||
provider?: string;
|
||||
model?: string;
|
||||
thinkingLevel?: string;
|
||||
reasoningLevel?: string;
|
||||
sessionKey?: string;
|
||||
authProfileId?: string;
|
||||
authProfileIdSource?: "auto" | "user";
|
||||
usage?: unknown;
|
||||
};
|
||||
|
||||
export function resolveSessionSideResultsPathFromTranscript(transcriptPath: string): string {
|
||||
const resolved = path.resolve(transcriptPath.trim());
|
||||
return resolved.endsWith(".jsonl")
|
||||
? `${resolved.slice(0, -".jsonl".length)}${SESSION_SIDE_RESULTS_SUFFIX}`
|
||||
: `${resolved}${SESSION_SIDE_RESULTS_SUFFIX}`;
|
||||
}
|
||||
|
||||
export function appendSessionSideResult(params: {
|
||||
transcriptPath: string;
|
||||
result: PersistedSessionSideResult;
|
||||
}) {
|
||||
const filePath = resolveSessionSideResultsPathFromTranscript(params.transcriptPath);
|
||||
fs.mkdirSync(path.dirname(filePath), { recursive: true });
|
||||
fs.appendFileSync(filePath, `${JSON.stringify(params.result)}\n`, "utf-8");
|
||||
return filePath;
|
||||
}
|
||||
16
src/tui/components/btw-inline-message.test.ts
Normal file
16
src/tui/components/btw-inline-message.test.ts
Normal file
@@ -0,0 +1,16 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { BtwInlineMessage } from "./btw-inline-message.js";
|
||||
|
||||
describe("btw inline message", () => {
|
||||
it("renders the BTW question, answer, and dismiss hint inline", () => {
|
||||
const message = new BtwInlineMessage({
|
||||
question: "what is 17 * 19?",
|
||||
text: "323",
|
||||
});
|
||||
|
||||
const rendered = message.render(80).join("\n");
|
||||
expect(rendered).toContain("BTW: what is 17 * 19?");
|
||||
expect(rendered).toContain("323");
|
||||
expect(rendered).toContain("Press Enter or Esc to dismiss");
|
||||
});
|
||||
});
|
||||
28
src/tui/components/btw-inline-message.ts
Normal file
28
src/tui/components/btw-inline-message.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
import { Container, Spacer, Text } from "@mariozechner/pi-tui";
|
||||
import { theme } from "../theme/theme.js";
|
||||
import { AssistantMessageComponent } from "./assistant-message.js";
|
||||
|
||||
type BtwInlineMessageParams = {
|
||||
question: string;
|
||||
text: string;
|
||||
isError?: boolean;
|
||||
};
|
||||
|
||||
export class BtwInlineMessage extends Container {
|
||||
constructor(params: BtwInlineMessageParams) {
|
||||
super();
|
||||
this.setResult(params);
|
||||
}
|
||||
|
||||
setResult(params: BtwInlineMessageParams) {
|
||||
this.clear();
|
||||
this.addChild(new Spacer(1));
|
||||
this.addChild(new Text(theme.header(`BTW: ${params.question}`), 1, 0));
|
||||
if (params.isError) {
|
||||
this.addChild(new Text(theme.error(params.text), 1, 0));
|
||||
} else {
|
||||
this.addChild(new AssistantMessageComponent(params.text));
|
||||
}
|
||||
this.addChild(new Text(theme.dim("Press Enter or Esc to dismiss"), 1, 0));
|
||||
}
|
||||
}
|
||||
@@ -52,4 +52,25 @@ describe("ChatLog", () => {
|
||||
|
||||
expect(chatLog.children.length).toBe(20);
|
||||
});
|
||||
|
||||
it("renders BTW inline and removes it when dismissed", () => {
|
||||
const chatLog = new ChatLog(40);
|
||||
|
||||
chatLog.addSystem("session agent:main:main");
|
||||
chatLog.showBtw({
|
||||
question: "what is 17 * 19?",
|
||||
text: "323",
|
||||
});
|
||||
|
||||
let rendered = chatLog.render(120).join("\n");
|
||||
expect(rendered).toContain("BTW: what is 17 * 19?");
|
||||
expect(rendered).toContain("323");
|
||||
expect(chatLog.hasVisibleBtw()).toBe(true);
|
||||
|
||||
chatLog.dismissBtw();
|
||||
|
||||
rendered = chatLog.render(120).join("\n");
|
||||
expect(rendered).not.toContain("BTW: what is 17 * 19?");
|
||||
expect(chatLog.hasVisibleBtw()).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -2,6 +2,7 @@ import type { Component } from "@mariozechner/pi-tui";
|
||||
import { Container, Spacer, Text } from "@mariozechner/pi-tui";
|
||||
import { theme } from "../theme/theme.js";
|
||||
import { AssistantMessageComponent } from "./assistant-message.js";
|
||||
import { BtwInlineMessage } from "./btw-inline-message.js";
|
||||
import { ToolExecutionComponent } from "./tool-execution.js";
|
||||
import { UserMessageComponent } from "./user-message.js";
|
||||
|
||||
@@ -9,6 +10,7 @@ export class ChatLog extends Container {
|
||||
private readonly maxComponents: number;
|
||||
private toolById = new Map<string, ToolExecutionComponent>();
|
||||
private streamingRuns = new Map<string, AssistantMessageComponent>();
|
||||
private btwMessage: BtwInlineMessage | null = null;
|
||||
private toolsExpanded = false;
|
||||
|
||||
constructor(maxComponents = 180) {
|
||||
@@ -27,6 +29,9 @@ export class ChatLog extends Container {
|
||||
this.streamingRuns.delete(runId);
|
||||
}
|
||||
}
|
||||
if (this.btwMessage === component) {
|
||||
this.btwMessage = null;
|
||||
}
|
||||
}
|
||||
|
||||
private pruneOverflow() {
|
||||
@@ -49,6 +54,7 @@ export class ChatLog extends Container {
|
||||
this.clear();
|
||||
this.toolById.clear();
|
||||
this.streamingRuns.clear();
|
||||
this.btwMessage = null;
|
||||
}
|
||||
|
||||
addSystem(text: string) {
|
||||
@@ -108,6 +114,33 @@ export class ChatLog extends Container {
|
||||
this.streamingRuns.delete(effectiveRunId);
|
||||
}
|
||||
|
||||
showBtw(params: { question: string; text: string; isError?: boolean }) {
|
||||
if (this.btwMessage) {
|
||||
this.btwMessage.setResult(params);
|
||||
if (this.children[this.children.length - 1] !== this.btwMessage) {
|
||||
this.removeChild(this.btwMessage);
|
||||
this.append(this.btwMessage);
|
||||
}
|
||||
return this.btwMessage;
|
||||
}
|
||||
const component = new BtwInlineMessage(params);
|
||||
this.btwMessage = component;
|
||||
this.append(component);
|
||||
return component;
|
||||
}
|
||||
|
||||
dismissBtw() {
|
||||
if (!this.btwMessage) {
|
||||
return;
|
||||
}
|
||||
this.removeChild(this.btwMessage);
|
||||
this.btwMessage = null;
|
||||
}
|
||||
|
||||
hasVisibleBtw() {
|
||||
return this.btwMessage !== null;
|
||||
}
|
||||
|
||||
startTool(toolCallId: string, toolName: string, args: unknown) {
|
||||
const existing = this.toolById.get(toolCallId);
|
||||
if (existing) {
|
||||
|
||||
@@ -12,6 +12,7 @@ function createHarness(params?: {
|
||||
loadHistory?: LoadHistoryMock;
|
||||
setActivityStatus?: SetActivityStatusMock;
|
||||
isConnected?: boolean;
|
||||
activeChatRunId?: string | null;
|
||||
}) {
|
||||
const sendChat = params?.sendChat ?? vi.fn().mockResolvedValue({ runId: "r1" });
|
||||
const resetSession = params?.resetSession ?? vi.fn().mockResolvedValue({ ok: true });
|
||||
@@ -19,21 +20,23 @@ function createHarness(params?: {
|
||||
const addUser = vi.fn();
|
||||
const addSystem = vi.fn();
|
||||
const requestRender = vi.fn();
|
||||
const noteLocalRunId = vi.fn();
|
||||
const loadHistory =
|
||||
params?.loadHistory ?? (vi.fn().mockResolvedValue(undefined) as LoadHistoryMock);
|
||||
const setActivityStatus = params?.setActivityStatus ?? (vi.fn() as SetActivityStatusMock);
|
||||
const state = {
|
||||
currentSessionKey: "agent:main:main",
|
||||
activeChatRunId: params?.activeChatRunId ?? null,
|
||||
isConnected: params?.isConnected ?? true,
|
||||
sessionInfo: {},
|
||||
};
|
||||
|
||||
const { handleCommand } = createCommandHandlers({
|
||||
client: { sendChat, resetSession } as never,
|
||||
chatLog: { addUser, addSystem } as never,
|
||||
tui: { requestRender } as never,
|
||||
opts: {},
|
||||
state: {
|
||||
currentSessionKey: "agent:main:main",
|
||||
activeChatRunId: null,
|
||||
isConnected: params?.isConnected ?? true,
|
||||
sessionInfo: {},
|
||||
} as never,
|
||||
state: state as never,
|
||||
deliverDefault: false,
|
||||
openOverlay: vi.fn(),
|
||||
closeOverlay: vi.fn(),
|
||||
@@ -45,7 +48,7 @@ function createHarness(params?: {
|
||||
setActivityStatus,
|
||||
formatSessionKey: vi.fn(),
|
||||
applySessionInfoFromPatch: vi.fn(),
|
||||
noteLocalRunId: vi.fn(),
|
||||
noteLocalRunId,
|
||||
forgetLocalRunId: vi.fn(),
|
||||
requestExit: vi.fn(),
|
||||
});
|
||||
@@ -60,6 +63,8 @@ function createHarness(params?: {
|
||||
requestRender,
|
||||
loadHistory,
|
||||
setActivityStatus,
|
||||
noteLocalRunId,
|
||||
state,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -108,6 +113,27 @@ describe("tui command handlers", () => {
|
||||
expect(requestRender).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("sends /btw without hijacking the active main run", async () => {
|
||||
const setActivityStatus = vi.fn();
|
||||
const { handleCommand, sendChat, addUser, noteLocalRunId, state } = createHarness({
|
||||
activeChatRunId: "run-main",
|
||||
setActivityStatus,
|
||||
});
|
||||
|
||||
await handleCommand("/btw what changed?");
|
||||
|
||||
expect(addUser).not.toHaveBeenCalled();
|
||||
expect(noteLocalRunId).not.toHaveBeenCalled();
|
||||
expect(state.activeChatRunId).toBe("run-main");
|
||||
expect(setActivityStatus).not.toHaveBeenCalledWith("sending");
|
||||
expect(setActivityStatus).not.toHaveBeenCalledWith("waiting");
|
||||
expect(sendChat).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
message: "/btw what changed?",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("creates unique session for /new and resets shared session for /reset", async () => {
|
||||
const loadHistory = vi.fn().mockResolvedValue(undefined);
|
||||
const setSessionMock = vi.fn().mockResolvedValue(undefined) as SetSessionMock;
|
||||
|
||||
@@ -47,6 +47,10 @@ type CommandHandlerContext = {
|
||||
requestExit: () => void;
|
||||
};
|
||||
|
||||
function isBtwCommand(text: string): boolean {
|
||||
return /^\/btw(?::|\s|$)/i.test(text.trim());
|
||||
}
|
||||
|
||||
export function createCommandHandlers(context: CommandHandlerContext) {
|
||||
const {
|
||||
client,
|
||||
@@ -501,13 +505,15 @@ export function createCommandHandlers(context: CommandHandlerContext) {
|
||||
tui.requestRender();
|
||||
return;
|
||||
}
|
||||
const isBtw = isBtwCommand(text);
|
||||
try {
|
||||
chatLog.addUser(text);
|
||||
tui.requestRender();
|
||||
const runId = randomUUID();
|
||||
noteLocalRunId(runId);
|
||||
state.activeChatRunId = runId;
|
||||
setActivityStatus("sending");
|
||||
if (!isBtw) {
|
||||
chatLog.addUser(text);
|
||||
noteLocalRunId(runId);
|
||||
state.activeChatRunId = runId;
|
||||
setActivityStatus("sending");
|
||||
}
|
||||
tui.requestRender();
|
||||
await client.sendChat({
|
||||
sessionKey: state.currentSessionKey,
|
||||
@@ -517,15 +523,21 @@ export function createCommandHandlers(context: CommandHandlerContext) {
|
||||
timeoutMs: opts.timeoutMs,
|
||||
runId,
|
||||
});
|
||||
setActivityStatus("waiting");
|
||||
tui.requestRender();
|
||||
if (!isBtw) {
|
||||
setActivityStatus("waiting");
|
||||
tui.requestRender();
|
||||
}
|
||||
} catch (err) {
|
||||
if (state.activeChatRunId) {
|
||||
if (!isBtw && state.activeChatRunId) {
|
||||
forgetLocalRunId?.(state.activeChatRunId);
|
||||
}
|
||||
state.activeChatRunId = null;
|
||||
chatLog.addSystem(`send failed: ${String(err)}`);
|
||||
setActivityStatus("error");
|
||||
if (!isBtw) {
|
||||
state.activeChatRunId = null;
|
||||
}
|
||||
chatLog.addSystem(`${isBtw ? "btw failed" : "send failed"}: ${String(err)}`);
|
||||
if (!isBtw) {
|
||||
setActivityStatus("error");
|
||||
}
|
||||
tui.requestRender();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createEventHandlers } from "./tui-event-handlers.js";
|
||||
import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
|
||||
import type { AgentEvent, BtwEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
|
||||
|
||||
type MockFn = ReturnType<typeof vi.fn>;
|
||||
type HandlerChatLog = {
|
||||
@@ -11,6 +11,10 @@ type HandlerChatLog = {
|
||||
finalizeAssistant: (...args: unknown[]) => void;
|
||||
dropAssistant: (...args: unknown[]) => void;
|
||||
};
|
||||
type HandlerBtwPresenter = {
|
||||
showResult: (...args: unknown[]) => void;
|
||||
clear: (...args: unknown[]) => void;
|
||||
};
|
||||
type HandlerTui = { requestRender: (...args: unknown[]) => void };
|
||||
type MockChatLog = {
|
||||
startTool: MockFn;
|
||||
@@ -20,6 +24,10 @@ type MockChatLog = {
|
||||
finalizeAssistant: MockFn;
|
||||
dropAssistant: MockFn;
|
||||
};
|
||||
type MockBtwPresenter = {
|
||||
showResult: MockFn;
|
||||
clear: MockFn;
|
||||
};
|
||||
type MockTui = { requestRender: MockFn };
|
||||
|
||||
function createMockChatLog(): MockChatLog & HandlerChatLog {
|
||||
@@ -33,6 +41,13 @@ function createMockChatLog(): MockChatLog & HandlerChatLog {
|
||||
} as unknown as MockChatLog & HandlerChatLog;
|
||||
}
|
||||
|
||||
function createMockBtwPresenter(): MockBtwPresenter & HandlerBtwPresenter {
|
||||
return {
|
||||
showResult: vi.fn(),
|
||||
clear: vi.fn(),
|
||||
} as unknown as MockBtwPresenter & HandlerBtwPresenter;
|
||||
}
|
||||
|
||||
describe("tui-event-handlers: handleAgentEvent", () => {
|
||||
const makeState = (overrides?: Partial<TuiStateAccess>): TuiStateAccess => ({
|
||||
agentDefaultId: "main",
|
||||
@@ -59,6 +74,7 @@ describe("tui-event-handlers: handleAgentEvent", () => {
|
||||
|
||||
const makeContext = (state: TuiStateAccess) => {
|
||||
const chatLog = createMockChatLog();
|
||||
const btw = createMockBtwPresenter();
|
||||
const tui = { requestRender: vi.fn() } as unknown as MockTui & HandlerTui;
|
||||
const setActivityStatus = vi.fn();
|
||||
const loadHistory = vi.fn();
|
||||
@@ -72,6 +88,7 @@ describe("tui-event-handlers: handleAgentEvent", () => {
|
||||
|
||||
return {
|
||||
chatLog,
|
||||
btw,
|
||||
tui,
|
||||
state,
|
||||
setActivityStatus,
|
||||
@@ -86,12 +103,14 @@ describe("tui-event-handlers: handleAgentEvent", () => {
|
||||
const createHandlersHarness = (params?: {
|
||||
state?: Partial<TuiStateAccess>;
|
||||
chatLog?: HandlerChatLog;
|
||||
btw?: HandlerBtwPresenter;
|
||||
}) => {
|
||||
const state = makeState(params?.state);
|
||||
const context = makeContext(state);
|
||||
const chatLog = (params?.chatLog ?? context.chatLog) as MockChatLog & HandlerChatLog;
|
||||
const handlers = createEventHandlers({
|
||||
chatLog,
|
||||
btw: (params?.btw ?? context.btw) as MockBtwPresenter & HandlerBtwPresenter,
|
||||
tui: context.tui,
|
||||
state,
|
||||
setActivityStatus: context.setActivityStatus,
|
||||
@@ -103,6 +122,7 @@ describe("tui-event-handlers: handleAgentEvent", () => {
|
||||
...context,
|
||||
state,
|
||||
chatLog,
|
||||
btw: (params?.btw ?? context.btw) as MockBtwPresenter & HandlerBtwPresenter,
|
||||
...handlers,
|
||||
};
|
||||
};
|
||||
@@ -212,6 +232,33 @@ describe("tui-event-handlers: handleAgentEvent", () => {
|
||||
expect(chatLog.updateAssistant).toHaveBeenCalledWith("hello", "run-alias");
|
||||
});
|
||||
|
||||
it("renders BTW results separately without disturbing the active run", () => {
|
||||
const { state, btw, setActivityStatus, loadHistory, tui, handleBtwEvent } =
|
||||
createHandlersHarness({
|
||||
state: { activeChatRunId: "run-main" },
|
||||
});
|
||||
|
||||
const evt: BtwEvent = {
|
||||
kind: "btw",
|
||||
runId: "run-btw",
|
||||
sessionKey: state.currentSessionKey,
|
||||
question: "what changed?",
|
||||
text: "nothing important",
|
||||
};
|
||||
|
||||
handleBtwEvent(evt);
|
||||
|
||||
expect(state.activeChatRunId).toBe("run-main");
|
||||
expect(btw.showResult).toHaveBeenCalledWith({
|
||||
question: "what changed?",
|
||||
text: "nothing important",
|
||||
isError: undefined,
|
||||
});
|
||||
expect(setActivityStatus).not.toHaveBeenCalled();
|
||||
expect(loadHistory).not.toHaveBeenCalled();
|
||||
expect(tui.requestRender).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not cross-match canonical session keys from different agents", () => {
|
||||
const { chatLog, handleChatEvent } = createHandlersHarness({
|
||||
state: {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { parseAgentSessionKey } from "../sessions/session-key-utils.js";
|
||||
import { asString, extractTextFromMessage, isCommandMessage } from "./tui-formatters.js";
|
||||
import { TuiStreamAssembler } from "./tui-stream-assembler.js";
|
||||
import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
|
||||
import type { AgentEvent, BtwEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
|
||||
|
||||
type EventHandlerChatLog = {
|
||||
startTool: (toolCallId: string, toolName: string, args: unknown) => void;
|
||||
@@ -20,8 +20,14 @@ type EventHandlerTui = {
|
||||
requestRender: () => void;
|
||||
};
|
||||
|
||||
type EventHandlerBtwPresenter = {
|
||||
showResult: (params: { question: string; text: string; isError?: boolean }) => void;
|
||||
clear: () => void;
|
||||
};
|
||||
|
||||
type EventHandlerContext = {
|
||||
chatLog: EventHandlerChatLog;
|
||||
btw: EventHandlerBtwPresenter;
|
||||
tui: EventHandlerTui;
|
||||
state: TuiStateAccess;
|
||||
setActivityStatus: (text: string) => void;
|
||||
@@ -35,6 +41,7 @@ type EventHandlerContext = {
|
||||
export function createEventHandlers(context: EventHandlerContext) {
|
||||
const {
|
||||
chatLog,
|
||||
btw,
|
||||
tui,
|
||||
state,
|
||||
setActivityStatus,
|
||||
@@ -81,6 +88,7 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
sessionRuns.clear();
|
||||
streamAssembler = new TuiStreamAssembler();
|
||||
clearLocalRunIds?.();
|
||||
btw.clear();
|
||||
};
|
||||
|
||||
const noteSessionRun = (runId: string) => {
|
||||
@@ -335,5 +343,30 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
}
|
||||
};
|
||||
|
||||
return { handleChatEvent, handleAgentEvent };
|
||||
const handleBtwEvent = (payload: unknown) => {
|
||||
if (!payload || typeof payload !== "object") {
|
||||
return;
|
||||
}
|
||||
const evt = payload as BtwEvent;
|
||||
syncSessionKey();
|
||||
if (!isSameSessionKey(evt.sessionKey, state.currentSessionKey)) {
|
||||
return;
|
||||
}
|
||||
if (evt.kind !== "btw") {
|
||||
return;
|
||||
}
|
||||
const question = evt.question.trim();
|
||||
const text = evt.text.trim();
|
||||
if (!question || !text) {
|
||||
return;
|
||||
}
|
||||
btw.showResult({
|
||||
question,
|
||||
text,
|
||||
isError: evt.isError,
|
||||
});
|
||||
tui.requestRender();
|
||||
};
|
||||
|
||||
return { handleChatEvent, handleAgentEvent, handleBtwEvent };
|
||||
}
|
||||
|
||||
@@ -4,6 +4,11 @@ import { createSessionActions } from "./tui-session-actions.js";
|
||||
import type { TuiStateAccess } from "./tui-types.js";
|
||||
|
||||
describe("tui session actions", () => {
|
||||
const createBtwPresenter = () => ({
|
||||
clear: vi.fn(),
|
||||
showResult: vi.fn(),
|
||||
});
|
||||
|
||||
it("queues session refreshes and applies the latest result", async () => {
|
||||
let resolveFirst: ((value: unknown) => void) | undefined;
|
||||
let resolveSecond: ((value: unknown) => void) | undefined;
|
||||
@@ -52,6 +57,7 @@ describe("tui session actions", () => {
|
||||
const { refreshSessionInfo } = createSessionActions({
|
||||
client: { listSessions } as unknown as GatewayChatClient,
|
||||
chatLog: { addSystem: vi.fn() } as unknown as import("./components/chat-log.js").ChatLog,
|
||||
btw: createBtwPresenter(),
|
||||
tui: { requestRender } as unknown as import("@mariozechner/pi-tui").TUI,
|
||||
opts: {},
|
||||
state,
|
||||
@@ -157,6 +163,7 @@ describe("tui session actions", () => {
|
||||
const { applySessionInfoFromPatch, refreshSessionInfo } = createSessionActions({
|
||||
client: { listSessions } as unknown as GatewayChatClient,
|
||||
chatLog: { addSystem: vi.fn() } as unknown as import("./components/chat-log.js").ChatLog,
|
||||
btw: createBtwPresenter(),
|
||||
tui: { requestRender: vi.fn() } as unknown as import("@mariozechner/pi-tui").TUI,
|
||||
opts: {},
|
||||
state,
|
||||
@@ -210,7 +217,15 @@ describe("tui session actions", () => {
|
||||
const loadHistory = vi.fn().mockResolvedValue({
|
||||
sessionId: "session-2",
|
||||
messages: [],
|
||||
sideResults: [
|
||||
{
|
||||
kind: "btw",
|
||||
question: "what changed?",
|
||||
text: "nothing important",
|
||||
},
|
||||
],
|
||||
});
|
||||
const btw = createBtwPresenter();
|
||||
|
||||
const state: TuiStateAccess = {
|
||||
agentDefaultId: "main",
|
||||
@@ -247,6 +262,7 @@ describe("tui session actions", () => {
|
||||
addSystem: vi.fn(),
|
||||
clearAll: vi.fn(),
|
||||
} as unknown as import("./components/chat-log.js").ChatLog,
|
||||
btw,
|
||||
tui: { requestRender: vi.fn() } as unknown as import("@mariozechner/pi-tui").TUI,
|
||||
opts: {},
|
||||
state,
|
||||
@@ -270,5 +286,7 @@ describe("tui session actions", () => {
|
||||
expect(state.sessionInfo.model).toBe("session-model");
|
||||
expect(state.sessionInfo.modelProvider).toBe("openai");
|
||||
expect(state.sessionInfo.updatedAt).toBe(50);
|
||||
expect(btw.clear).toHaveBeenCalled();
|
||||
expect(btw.showResult).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -10,9 +10,14 @@ import type { GatewayAgentsList, GatewayChatClient } from "./gateway-chat.js";
|
||||
import { asString, extractTextFromMessage, isCommandMessage } from "./tui-formatters.js";
|
||||
import type { SessionInfo, TuiOptions, TuiStateAccess } from "./tui-types.js";
|
||||
|
||||
type SessionActionBtwPresenter = {
|
||||
clear: () => void;
|
||||
};
|
||||
|
||||
type SessionActionContext = {
|
||||
client: GatewayChatClient;
|
||||
chatLog: ChatLog;
|
||||
btw: SessionActionBtwPresenter;
|
||||
tui: TUI;
|
||||
opts: TuiOptions;
|
||||
state: TuiStateAccess;
|
||||
@@ -42,6 +47,7 @@ export function createSessionActions(context: SessionActionContext) {
|
||||
const {
|
||||
client,
|
||||
chatLog,
|
||||
btw,
|
||||
tui,
|
||||
opts,
|
||||
state,
|
||||
@@ -298,6 +304,7 @@ export function createSessionActions(context: SessionActionContext) {
|
||||
state.sessionInfo.verboseLevel = record.verboseLevel ?? state.sessionInfo.verboseLevel;
|
||||
const showTools = (state.sessionInfo.verboseLevel ?? "off") !== "off";
|
||||
chatLog.clearAll();
|
||||
btw.clear();
|
||||
chatLog.addSystem(`session ${state.currentSessionKey}`);
|
||||
for (const entry of record.messages ?? []) {
|
||||
if (!entry || typeof entry !== "object") {
|
||||
@@ -367,6 +374,7 @@ export function createSessionActions(context: SessionActionContext) {
|
||||
state.sessionInfo.updatedAt = null;
|
||||
state.historyLoaded = false;
|
||||
clearLocalRunIds?.();
|
||||
btw.clear();
|
||||
updateHeader();
|
||||
updateFooter();
|
||||
await loadHistory();
|
||||
|
||||
@@ -18,6 +18,17 @@ export type ChatEvent = {
|
||||
errorMessage?: string;
|
||||
};
|
||||
|
||||
export type BtwEvent = {
|
||||
kind: "btw";
|
||||
runId?: string;
|
||||
sessionKey?: string;
|
||||
question: string;
|
||||
text: string;
|
||||
isError?: boolean;
|
||||
seq?: number;
|
||||
ts?: number;
|
||||
};
|
||||
|
||||
export type AgentEvent = {
|
||||
runId: string;
|
||||
stream: string;
|
||||
|
||||
@@ -771,6 +771,14 @@ export async function runTui(opts: TuiOptions) {
|
||||
};
|
||||
|
||||
const { openOverlay, closeOverlay } = createOverlayHandlers(tui, editor);
|
||||
const btw = {
|
||||
showResult: (params: { question: string; text: string; isError?: boolean }) => {
|
||||
chatLog.showBtw(params);
|
||||
},
|
||||
clear: () => {
|
||||
chatLog.dismissBtw();
|
||||
},
|
||||
};
|
||||
|
||||
const initialSessionAgentId = (() => {
|
||||
if (!initialSessionInput) {
|
||||
@@ -783,6 +791,7 @@ export async function runTui(opts: TuiOptions) {
|
||||
const sessionActions = createSessionActions({
|
||||
client,
|
||||
chatLog,
|
||||
btw,
|
||||
tui,
|
||||
opts,
|
||||
state,
|
||||
@@ -805,8 +814,9 @@ export async function runTui(opts: TuiOptions) {
|
||||
abortActive,
|
||||
} = sessionActions;
|
||||
|
||||
const { handleChatEvent, handleAgentEvent } = createEventHandlers({
|
||||
const { handleChatEvent, handleAgentEvent, handleBtwEvent } = createEventHandlers({
|
||||
chatLog,
|
||||
btw,
|
||||
tui,
|
||||
state,
|
||||
setActivityStatus,
|
||||
@@ -869,6 +879,11 @@ export async function runTui(opts: TuiOptions) {
|
||||
});
|
||||
|
||||
editor.onEscape = () => {
|
||||
if (chatLog.hasVisibleBtw()) {
|
||||
chatLog.dismissBtw();
|
||||
tui.requestRender();
|
||||
return;
|
||||
}
|
||||
void abortActive();
|
||||
};
|
||||
const handleCtrlC = () => {
|
||||
@@ -918,10 +933,28 @@ export async function runTui(opts: TuiOptions) {
|
||||
void loadHistory();
|
||||
};
|
||||
|
||||
tui.addInputListener((data) => {
|
||||
if (!chatLog.hasVisibleBtw()) {
|
||||
return undefined;
|
||||
}
|
||||
if (editor.getText().length > 0) {
|
||||
return undefined;
|
||||
}
|
||||
if (matchesKey(data, "enter")) {
|
||||
chatLog.dismissBtw();
|
||||
tui.requestRender();
|
||||
return { consume: true };
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
|
||||
client.onEvent = (evt) => {
|
||||
if (evt.event === "chat") {
|
||||
handleChatEvent(evt.payload);
|
||||
}
|
||||
if (evt.event === "chat.side_result") {
|
||||
handleBtwEvent(evt.payload);
|
||||
}
|
||||
if (evt.event === "agent") {
|
||||
handleAgentEvent(evt.payload);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user