Files
openclaw/extensions/telegram/src/bot-message-dispatch.test.ts
Josh Lehman 24ffd4d2f8 Telegram: keep verbose tool results separate from final answers (#80294)
* Telegram: keep tool progress separate from final answers

* Telegram: separate verbose tool results from final answers

* fix: separate Telegram verbose tool drafts
2026-05-11 15:21:19 -07:00

1938 lines
73 KiB
TypeScript

import type { Bot } from "grammy";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { resolveAutoTopicLabelConfig as resolveAutoTopicLabelConfigRuntime } from "./auto-topic-label-config.js";
import type { TelegramBotDeps } from "./bot-deps.js";
import {
createSequencedTestDraftStream,
createTestDraftStream,
} from "./draft-stream.test-helpers.js";
type DispatchReplyWithBufferedBlockDispatcherArgs = Parameters<
TelegramBotDeps["dispatchReplyWithBufferedBlockDispatcher"]
>[0];
const createTelegramDraftStream = vi.hoisted(() => vi.fn());
const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() =>
vi.fn<(params: DispatchReplyWithBufferedBlockDispatcherArgs) => Promise<unknown>>(),
);
const deliverReplies = vi.hoisted(() => vi.fn());
const deliverInboundReplyWithMessageSendContext = vi.hoisted(() => vi.fn());
const emitInternalMessageSentHook = vi.hoisted(() => vi.fn());
const createForumTopicTelegram = vi.hoisted(() => vi.fn());
const deleteMessageTelegram = vi.hoisted(() => vi.fn());
const editForumTopicTelegram = vi.hoisted(() => vi.fn());
const editMessageTelegram = vi.hoisted(() => vi.fn());
const reactMessageTelegram = vi.hoisted(() => vi.fn());
const sendMessageTelegram = vi.hoisted(() => vi.fn());
const sendPollTelegram = vi.hoisted(() => vi.fn());
const sendStickerTelegram = vi.hoisted(() => vi.fn());
const loadConfig = vi.hoisted(() => vi.fn(() => ({})));
const readChannelAllowFromStore = vi.hoisted(() => vi.fn(async () => []));
const upsertChannelPairingRequest = vi.hoisted(() =>
vi.fn(async () => ({
code: "PAIRCODE",
created: true,
})),
);
const enqueueSystemEvent = vi.hoisted(() => vi.fn());
const buildModelsProviderData = vi.hoisted(() =>
vi.fn(async () => ({
byProvider: new Map<string, Set<string>>(),
providers: [],
resolvedDefault: { provider: "openai", model: "gpt-test" },
modelNames: new Map<string, string>(),
})),
);
const listSkillCommandsForAgents = vi.hoisted(() => vi.fn(() => []));
const createChannelMessageReplyPipeline = vi.hoisted(() =>
vi.fn(() => ({
responsePrefix: undefined,
responsePrefixContextProvider: () => ({ identityName: undefined }),
onModelSelected: () => undefined,
})),
);
const wasSentByBot = vi.hoisted(() => vi.fn(() => false));
const appendSessionTranscriptMessage = vi.hoisted(() => vi.fn(async () => ({ messageId: "m1" })));
const emitSessionTranscriptUpdate = vi.hoisted(() => vi.fn());
const loadSessionStore = vi.hoisted(() => vi.fn());
const resolveStorePath = vi.hoisted(() => vi.fn(() => "/tmp/sessions.json"));
const resolveAndPersistSessionFile = vi.hoisted(() =>
vi.fn(async () => ({
sessionFile: "/tmp/session.jsonl",
sessionEntry: { sessionId: "s1", sessionFile: "/tmp/session.jsonl" },
})),
);
const generateTopicLabel = vi.hoisted(() => vi.fn());
const describeStickerImage = vi.hoisted(() => vi.fn(async () => null));
const loadModelCatalog = vi.hoisted(() => vi.fn(async () => ({})));
const findModelInCatalog = vi.hoisted(() => vi.fn(() => null));
const modelSupportsVision = vi.hoisted(() => vi.fn(() => false));
const resolveAgentDir = vi.hoisted(() => vi.fn(() => "/tmp/agent"));
const resolveDefaultModelForAgent = vi.hoisted(() =>
vi.fn(() => ({ provider: "openai", model: "gpt-test" })),
);
const getAgentScopedMediaLocalRoots = vi.hoisted(() =>
vi.fn((_cfg: unknown, agentId: string) => [`/tmp/.openclaw/workspace-${agentId}`]),
);
const resolveChunkMode = vi.hoisted(() => vi.fn(() => undefined));
const resolveMarkdownTableMode = vi.hoisted(() => vi.fn(() => "preserve"));
const resolveSessionStoreEntry = vi.hoisted(() =>
vi.fn(({ store, sessionKey }: { store: Record<string, unknown>; sessionKey: string }) => ({
existing: store[sessionKey],
})),
);
vi.mock("./draft-stream.js", () => ({
createTelegramDraftStream,
}));
vi.mock("openclaw/plugin-sdk/channel-message", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/channel-message")>();
return {
...actual,
deliverInboundReplyWithMessageSendContext,
};
});
vi.mock("openclaw/plugin-sdk/agent-harness-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/agent-harness-runtime")>();
return {
...actual,
appendSessionTranscriptMessage,
emitSessionTranscriptUpdate,
};
});
vi.mock("./bot/delivery.js", () => ({
deliverReplies,
emitInternalMessageSentHook,
}));
vi.mock("./bot/delivery.replies.js", () => ({
deliverReplies,
emitInternalMessageSentHook,
}));
vi.mock("./send.js", () => ({
createForumTopicTelegram,
deleteMessageTelegram,
editForumTopicTelegram,
editMessageTelegram,
reactMessageTelegram,
sendMessageTelegram,
sendPollTelegram,
sendStickerTelegram,
}));
vi.mock("./bot-message-dispatch.runtime.js", () => ({
generateTopicLabel,
getAgentScopedMediaLocalRoots,
loadSessionStore,
resolveAndPersistSessionFile,
resolveAutoTopicLabelConfig: resolveAutoTopicLabelConfigRuntime,
resolveChunkMode,
resolveMarkdownTableMode,
resolveSessionStoreEntry,
resolveStorePath,
}));
vi.mock("./bot-message-dispatch.agent.runtime.js", () => ({
findModelInCatalog,
loadModelCatalog,
modelSupportsVision,
resolveAgentDir,
resolveDefaultModelForAgent,
}));
vi.mock("./sticker-cache.js", () => ({
cacheSticker: vi.fn(),
getCachedSticker: () => null,
getCacheStats: () => ({ count: 0 }),
searchStickers: () => [],
getAllCachedStickers: () => [],
describeStickerImage,
}));
let dispatchTelegramMessage: typeof import("./bot-message-dispatch.js").dispatchTelegramMessage;
let resetTelegramReplyFenceForTests: typeof import("./bot-message-dispatch.js").resetTelegramReplyFenceForTests;
const telegramDepsForTest: TelegramBotDeps = {
getRuntimeConfig: loadConfig as TelegramBotDeps["getRuntimeConfig"],
resolveStorePath: resolveStorePath as TelegramBotDeps["resolveStorePath"],
loadSessionStore: loadSessionStore as TelegramBotDeps["loadSessionStore"],
readChannelAllowFromStore:
readChannelAllowFromStore as TelegramBotDeps["readChannelAllowFromStore"],
upsertChannelPairingRequest:
upsertChannelPairingRequest as TelegramBotDeps["upsertChannelPairingRequest"],
enqueueSystemEvent: enqueueSystemEvent as TelegramBotDeps["enqueueSystemEvent"],
dispatchReplyWithBufferedBlockDispatcher:
dispatchReplyWithBufferedBlockDispatcher as TelegramBotDeps["dispatchReplyWithBufferedBlockDispatcher"],
buildModelsProviderData: buildModelsProviderData as TelegramBotDeps["buildModelsProviderData"],
listSkillCommandsForAgents:
listSkillCommandsForAgents as TelegramBotDeps["listSkillCommandsForAgents"],
createChannelMessageReplyPipeline:
createChannelMessageReplyPipeline as TelegramBotDeps["createChannelMessageReplyPipeline"],
wasSentByBot: wasSentByBot as TelegramBotDeps["wasSentByBot"],
createTelegramDraftStream:
createTelegramDraftStream as TelegramBotDeps["createTelegramDraftStream"],
deliverReplies: deliverReplies as TelegramBotDeps["deliverReplies"],
deliverInboundReplyWithMessageSendContext:
deliverInboundReplyWithMessageSendContext as TelegramBotDeps["deliverInboundReplyWithMessageSendContext"],
emitInternalMessageSentHook:
emitInternalMessageSentHook as TelegramBotDeps["emitInternalMessageSentHook"],
editMessageTelegram: editMessageTelegram as TelegramBotDeps["editMessageTelegram"],
};
describe("dispatchTelegramMessage draft streaming", () => {
type TelegramMessageContext = Parameters<typeof dispatchTelegramMessage>[0]["context"];
beforeAll(async () => {
({ dispatchTelegramMessage, resetTelegramReplyFenceForTests } =
await import("./bot-message-dispatch.js"));
});
beforeEach(() => {
resetTelegramReplyFenceForTests();
createTelegramDraftStream.mockReset();
dispatchReplyWithBufferedBlockDispatcher.mockReset();
deliverReplies.mockReset();
deliverInboundReplyWithMessageSendContext.mockReset();
emitInternalMessageSentHook.mockReset();
createForumTopicTelegram.mockReset();
deleteMessageTelegram.mockReset();
editForumTopicTelegram.mockReset();
editMessageTelegram.mockReset();
reactMessageTelegram.mockReset();
sendMessageTelegram.mockReset();
sendPollTelegram.mockReset();
sendStickerTelegram.mockReset();
loadConfig.mockReset();
readChannelAllowFromStore.mockReset();
upsertChannelPairingRequest.mockReset();
enqueueSystemEvent.mockReset();
buildModelsProviderData.mockReset();
listSkillCommandsForAgents.mockReset();
createChannelMessageReplyPipeline.mockReset();
wasSentByBot.mockReset();
appendSessionTranscriptMessage.mockReset();
emitSessionTranscriptUpdate.mockReset();
loadSessionStore.mockReset();
resolveStorePath.mockReset();
resolveAndPersistSessionFile.mockReset();
generateTopicLabel.mockReset();
getAgentScopedMediaLocalRoots.mockClear();
resolveChunkMode.mockClear();
resolveMarkdownTableMode.mockClear();
resolveSessionStoreEntry.mockClear();
describeStickerImage.mockReset();
loadModelCatalog.mockReset();
findModelInCatalog.mockReset();
modelSupportsVision.mockReset();
resolveAgentDir.mockReset();
resolveDefaultModelForAgent.mockReset();
loadConfig.mockReturnValue({});
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: false,
counts: { block: 0, final: 0, tool: 0 },
});
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "unsupported",
reason: "missing_outbound_handler",
});
emitInternalMessageSentHook.mockResolvedValue(undefined);
createForumTopicTelegram.mockResolvedValue({ message_thread_id: 777 });
deleteMessageTelegram.mockResolvedValue(true);
editForumTopicTelegram.mockResolvedValue(true);
editMessageTelegram.mockResolvedValue({ ok: true });
reactMessageTelegram.mockResolvedValue(true);
sendMessageTelegram.mockResolvedValue({ message_id: 1001 });
sendPollTelegram.mockResolvedValue({ message_id: 1001 });
sendStickerTelegram.mockResolvedValue({ message_id: 1001 });
readChannelAllowFromStore.mockResolvedValue([]);
upsertChannelPairingRequest.mockResolvedValue({
code: "PAIRCODE",
created: true,
});
enqueueSystemEvent.mockResolvedValue(undefined);
buildModelsProviderData.mockResolvedValue({
byProvider: new Map<string, Set<string>>(),
providers: [],
resolvedDefault: { provider: "openai", model: "gpt-test" },
modelNames: new Map<string, string>(),
});
listSkillCommandsForAgents.mockReturnValue([]);
createChannelMessageReplyPipeline.mockReturnValue({
responsePrefix: undefined,
responsePrefixContextProvider: () => ({ identityName: undefined }),
onModelSelected: () => undefined,
});
wasSentByBot.mockReturnValue(false);
resolveStorePath.mockReturnValue("/tmp/sessions.json");
resolveAndPersistSessionFile.mockResolvedValue({
sessionFile: "/tmp/session.jsonl",
sessionEntry: { sessionId: "s1", sessionFile: "/tmp/session.jsonl" },
});
loadSessionStore.mockReturnValue({});
generateTopicLabel.mockResolvedValue("Topic label");
describeStickerImage.mockResolvedValue(null);
loadModelCatalog.mockResolvedValue({});
findModelInCatalog.mockReturnValue(null);
modelSupportsVision.mockReturnValue(false);
resolveAgentDir.mockReturnValue("/tmp/agent");
resolveDefaultModelForAgent.mockReturnValue({
provider: "openai",
model: "gpt-test",
});
});
const createDraftStream = (messageId?: number) => createTestDraftStream({ messageId });
const createSequencedDraftStream = (startMessageId = 1001) =>
createSequencedTestDraftStream(startMessageId);
function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) {
const answerDraftStream = createDraftStream(params?.answerMessageId);
const reasoningDraftStream = createDraftStream(params?.reasoningMessageId);
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
return { answerDraftStream, reasoningDraftStream };
}
function expectRecordFields(record: unknown, expected: Record<string, unknown>) {
if (!record || typeof record !== "object") {
throw new Error("Expected record");
}
const actual = record as Record<string, unknown>;
for (const [key, value] of Object.entries(expected)) {
expect(actual[key]).toEqual(value);
}
return actual;
}
function mockCallArg(mock: ReturnType<typeof vi.fn>, callIndex = 0, argIndex = 0) {
const call = mock.mock.calls[callIndex];
if (!call) {
throw new Error(`Expected mock call ${callIndex}`);
}
return call[argIndex];
}
function expectDraftStreamParams(expected: Record<string, unknown>) {
return expectRecordFields(mockCallArg(createTelegramDraftStream), expected);
}
function expectDeliverRepliesParams(expected: Record<string, unknown>, callIndex = 0) {
return expectRecordFields(mockCallArg(deliverReplies, callIndex), expected);
}
function expectDeliveredReply(index: number, expected: Record<string, unknown>, callIndex = 0) {
const params = expectDeliverRepliesParams({}, callIndex);
const replies = params.replies as Array<unknown> | undefined;
if (!Array.isArray(replies)) {
throw new Error("Expected delivered replies array");
}
return expectRecordFields(replies[index], expected);
}
function expectDispatchParams(expected: Record<string, unknown>) {
return expectRecordFields(mockCallArg(dispatchReplyWithBufferedBlockDispatcher), expected);
}
function createContext(overrides?: Partial<TelegramMessageContext>): TelegramMessageContext {
const base = {
ctxPayload: {},
primaryCtx: { message: { chat: { id: 123, type: "private" } } },
msg: {
chat: { id: 123, type: "private" },
message_id: 456,
message_thread_id: 777,
},
chatId: 123,
isGroup: false,
groupConfig: undefined,
resolvedThreadId: undefined,
replyThreadId: 777,
threadSpec: { id: 777, scope: "dm" },
historyKey: undefined,
historyLimit: 0,
groupHistories: new Map(),
route: { agentId: "default", accountId: "default" },
skillFilter: undefined,
sendTyping: vi.fn(),
sendRecordVoice: vi.fn(),
ackReactionPromise: null,
reactionApi: null,
removeAckAfterReply: false,
} as unknown as TelegramMessageContext;
base.turn = {
storePath: "/tmp/openclaw/telegram-sessions.json",
recordInboundSession: vi.fn(async () => undefined),
record: {
onRecordError: vi.fn(),
},
} as unknown as TelegramMessageContext["turn"];
return {
...base,
...overrides,
// Merge nested fields when overrides provide partial objects.
primaryCtx: {
...(base.primaryCtx as object),
...(overrides?.primaryCtx ? (overrides.primaryCtx as object) : null),
} as TelegramMessageContext["primaryCtx"],
msg: {
...(base.msg as object),
...(overrides?.msg ? (overrides.msg as object) : null),
} as TelegramMessageContext["msg"],
route: {
...(base.route as object),
...(overrides?.route ? (overrides.route as object) : null),
} as TelegramMessageContext["route"],
};
}
function createStatusReactionController() {
return {
setQueued: vi.fn(),
setThinking: vi.fn(async () => {}),
setTool: vi.fn(async () => {}),
setCompacting: vi.fn(async () => {}),
cancelPending: vi.fn(),
setError: vi.fn(async () => {}),
setDone: vi.fn(async () => {}),
restoreInitial: vi.fn(async () => {}),
};
}
function createDirectSessionPayload(): TelegramMessageContext["ctxPayload"] {
return {
SessionKey: "agent:test:telegram:direct:123",
ChatType: "direct",
} as TelegramMessageContext["ctxPayload"];
}
function observeDeliveredReply(text: string): Promise<void> {
return new Promise((resolve) => {
deliverReplies.mockImplementation(async (params: { replies?: Array<{ text?: string }> }) => {
if (params.replies?.some((reply) => reply.text === text)) {
resolve();
}
return { delivered: true };
});
});
}
function createBot(): Bot {
return {
api: {
sendMessage: vi.fn(async (_chatId, _text, params) => ({
message_id:
typeof params?.message_thread_id === "number" ? params.message_thread_id : 1001,
})),
editMessageText: vi.fn(async () => ({ message_id: 1001 })),
deleteMessage: vi.fn().mockResolvedValue(true),
editForumTopic: vi.fn().mockResolvedValue(true),
},
} as unknown as Bot;
}
function createRuntime(): Parameters<typeof dispatchTelegramMessage>[0]["runtime"] {
return {
log: vi.fn(),
error: vi.fn(),
exit: () => {
throw new Error("exit");
},
};
}
async function dispatchWithContext(params: {
context: TelegramMessageContext;
cfg?: Parameters<typeof dispatchTelegramMessage>[0]["cfg"];
telegramCfg?: Parameters<typeof dispatchTelegramMessage>[0]["telegramCfg"];
streamMode?: Parameters<typeof dispatchTelegramMessage>[0]["streamMode"];
telegramDeps?: TelegramBotDeps;
bot?: Bot;
replyToMode?: Parameters<typeof dispatchTelegramMessage>[0]["replyToMode"];
textLimit?: number;
}) {
const bot = params.bot ?? createBot();
await dispatchTelegramMessage({
context: params.context,
bot,
cfg: params.cfg ?? {},
runtime: createRuntime(),
replyToMode: params.replyToMode ?? "first",
streamMode: params.streamMode ?? "partial",
textLimit: params.textLimit ?? 4096,
telegramCfg: params.telegramCfg ?? {},
telegramDeps: params.telegramDeps ?? telegramDepsForTest,
opts: { token: "token" },
});
}
function createReasoningStreamContext(): TelegramMessageContext {
loadSessionStore.mockReturnValue({
s1: { reasoningLevel: "stream" },
});
return createContext({
ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"],
});
}
function createReasoningDefaultContext(): TelegramMessageContext {
loadSessionStore.mockReturnValue({
s1: {},
});
return createContext({
ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"],
route: { agentId: "ops" } as unknown as TelegramMessageContext["route"],
});
}
it("streams drafts in private threads and forwards thread id", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hello" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
const context = createContext({
route: {
agentId: "work",
} as unknown as TelegramMessageContext["route"],
});
await dispatchWithContext({ context });
expectDraftStreamParams({
chatId: 123,
thread: { id: 777, scope: "dm" },
minInitialChars: 30,
});
expect(draftStream.update).toHaveBeenCalledWith("Hello");
const delivery = expectDeliverRepliesParams({ thread: { id: 777, scope: "dm" } });
const mediaLocalRoots = delivery.mediaLocalRoots as string[] | undefined;
expect(mediaLocalRoots?.some((root) => /[\\/]\.openclaw[\\/]workspace-work$/u.test(root))).toBe(
true,
);
const dispatchParams = expectDispatchParams({});
expect(
typeof (dispatchParams.dispatcherOptions as { beforeDeliver?: unknown }).beforeDeliver,
).toBe("function");
expectRecordFields(dispatchParams.replyOptions, { disableBlockStreaming: true });
expect(editMessageTelegram).not.toHaveBeenCalled();
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("keeps retained overflow draft previews", async () => {
const draftStream = createDraftStream();
const bot = createBot();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hello" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), bot });
const streamParams = createTelegramDraftStream.mock.calls[0]?.[0] as Parameters<
NonNullable<TelegramBotDeps["createTelegramDraftStream"]>
>[0];
streamParams.onSupersededPreview?.({
messageId: 17,
textSnapshot: "first page",
retain: true,
});
expect(bot.api.deleteMessage).not.toHaveBeenCalled();
streamParams.onSupersededPreview?.({
messageId: 18,
textSnapshot: "stale page",
});
await vi.waitFor(() => expect(bot.api.deleteMessage).toHaveBeenCalledWith(123, 18));
});
it("queues final Telegram replies through outbound delivery when available", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: {
messageIds: ["1001"],
visibleReplySent: true,
},
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello queued" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
ChatType: "direct",
SenderId: "42",
SenderName: "Alice",
SenderUsername: "alice",
} as unknown as TelegramMessageContext["ctxPayload"],
}),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
const outbound = expectRecordFields(mockCallArg(deliverInboundReplyWithMessageSendContext), {
channel: "telegram",
to: "123",
accountId: "default",
info: { kind: "final" },
replyToMode: "first",
threadId: 777,
agentId: "default",
});
expectRecordFields(outbound.payload, { text: "Hello queued" });
expectRecordFields(outbound.formatting, { textLimit: 4096, tableMode: "preserve" });
expectRecordFields(outbound.ctxPayload, {
SessionKey: "s1",
ChatType: "direct",
SenderId: "42",
SenderName: "Alice",
SenderUsername: "alice",
});
expect(deliverReplies).not.toHaveBeenCalled();
});
it("queues media-only final Telegram replies through outbound delivery when available", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: {
messageIds: ["1002"],
visibleReplySent: true,
},
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/final.png" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
const outbound = expectRecordFields(mockCallArg(deliverInboundReplyWithMessageSendContext), {
channel: "telegram",
info: { kind: "final" },
});
expectRecordFields(outbound.payload, { mediaUrl: "file:///tmp/final.png" });
expectRecordFields(outbound.requiredCapabilities, { media: true, payload: true });
expect(deliverReplies).not.toHaveBeenCalled();
});
it("skips answer draft stream for same-chat selected quotes", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
msg: {
message_id: 1001,
} as unknown as TelegramMessageContext["msg"],
ctxPayload: {
MessageSid: "1001",
ReplyToId: "9001",
ReplyToBody: "quoted slice",
ReplyToQuoteText: " quoted slice\n",
ReplyToIsQuote: true,
} as unknown as TelegramMessageContext["ctxPayload"],
}),
});
expect(createTelegramDraftStream).not.toHaveBeenCalled();
const delivery = expectDeliverRepliesParams({
replyQuoteMessageId: 9001,
replyQuoteText: " quoted slice\n",
});
expectRecordFields((delivery.replies as Array<unknown>)[0], { replyToId: "9001" });
});
it("keeps answer draft stream for current message replies with native quote candidates", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
msg: {
message_id: 1001,
text: "Original current message",
entities: [{ type: "bold", offset: 0, length: 8 }],
} as unknown as TelegramMessageContext["msg"],
ctxPayload: {
MessageSid: "1001",
} as unknown as TelegramMessageContext["ctxPayload"],
}),
});
expectDraftStreamParams({ replyToMessageId: 1001 });
const delivery = expectDeliverRepliesParams({
replyQuoteByMessageId: {
"1001": {
text: "Original current message",
position: 0,
entities: [{ type: "bold", offset: 0, length: 8 }],
},
},
});
expectRecordFields((delivery.replies as Array<unknown>)[0], { replyToId: "1001" });
});
it("passes native quote candidates for explicit reply targets", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "9001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
ctxPayload: {
ReplyToId: "9001",
ReplyToBody: "trimmed body",
ReplyToQuoteSourceText: " exact reply body",
ReplyToQuoteSourceEntities: [{ type: "italic", offset: 2, length: 5 }],
} as unknown as TelegramMessageContext["ctxPayload"],
}),
});
const delivery = expectDeliverRepliesParams({
replyQuoteByMessageId: {
"9001": {
text: " exact reply body",
position: 0,
entities: [{ type: "italic", offset: 2, length: 5 }],
},
},
});
expectRecordFields((delivery.replies as Array<unknown>)[0], { replyToId: "9001" });
});
it("does not build native quote candidates when reply mode is off", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
msg: {
message_id: 1001,
text: "Original current message",
} as unknown as TelegramMessageContext["msg"],
ctxPayload: {
MessageSid: "1001",
} as unknown as TelegramMessageContext["ctxPayload"],
}),
replyToMode: "off",
});
expect(deliverReplies.mock.calls[0]?.[0]).not.toHaveProperty("replyQuoteByMessageId.1001");
});
it("keeps answer draft stream for selected quotes when reply mode is off", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true });
await dispatchWithContext({
context: createContext({
msg: {
message_id: 1001,
} as unknown as TelegramMessageContext["msg"],
ctxPayload: {
MessageSid: "1001",
ReplyToId: "9001",
ReplyToBody: "quoted slice",
ReplyToQuoteText: " quoted slice\n",
ReplyToIsQuote: true,
} as unknown as TelegramMessageContext["ctxPayload"],
}),
replyToMode: "off",
});
expectDraftStreamParams({ replyToMessageId: undefined });
});
it("passes same-chat quoted reply target id with Telegram quote text", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
ctxPayload: {
MessageSid: "1001",
ReplyToId: "9001",
ReplyToBody: "quoted slice",
ReplyToQuoteText: " quoted slice\n",
ReplyToIsQuote: true,
ReplyToQuotePosition: 12,
ReplyToQuoteEntities: [{ type: "italic", offset: 0, length: 6 }],
} as unknown as TelegramMessageContext["ctxPayload"],
}),
streamMode: "off",
});
const delivery = expectDeliverRepliesParams({
replyQuoteMessageId: 9001,
replyQuoteText: " quoted slice\n",
replyQuotePosition: 12,
replyQuoteEntities: [{ type: "italic", offset: 0, length: 6 }],
});
expectRecordFields((delivery.replies as Array<unknown>)[0], { replyToId: "9001" });
});
it("does not pass a native quote target for external replies", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
ctxPayload: {
MessageSid: "1001",
ReplyToId: "9001",
ReplyToBody: "external quoted slice",
ReplyToQuoteText: " external quoted slice\n",
ReplyToIsQuote: true,
ReplyToIsExternal: true,
} as unknown as TelegramMessageContext["ctxPayload"],
}),
streamMode: "off",
});
const params = expectDeliverRepliesParams({ replyQuoteText: " external quoted slice\n" });
expectRecordFields((params.replies as Array<unknown>)[0], { replyToId: "1001" });
expect(params?.replyQuoteMessageId).toBeUndefined();
});
it("does not inject approval buttons in local dispatch once the monitor owns approvals", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver(
{
text: "Mode: foreground\nRun: /approve 117ba06d allow-once (or allow-always / deny).",
},
{ kind: "final" },
);
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext(),
streamMode: "off",
cfg: {
channels: {
telegram: {
execApprovals: {
enabled: true,
approvers: ["123"],
target: "dm",
},
},
},
},
});
const deliveredPayload = expectDeliveredReply(0, {
text: "Mode: foreground\nRun: /approve 117ba06d allow-once (or allow-always / deny).",
}) as { channelData?: unknown };
expect(deliveredPayload.channelData).toBeUndefined();
});
it("uses 30-char stream debounce for legacy block stream mode", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hello" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), streamMode: "block" });
expectDraftStreamParams({ minInitialChars: 30 });
});
it("keeps canonical block mode on the Telegram draft stream path", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "HelloWorld" });
await dispatcherOptions.deliver({ text: "HelloWorld" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext(),
streamMode: "block",
telegramCfg: { streaming: { mode: "block" } },
});
expect(createTelegramDraftStream).toHaveBeenCalled();
expect(draftStream.update).toHaveBeenCalledWith("HelloWorld");
});
it("streams text-only finals into the answer message", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.update).toHaveBeenCalledWith("Final answer");
expect(answerDraftStream.stop).toHaveBeenCalled();
expect(deliverReplies).not.toHaveBeenCalled();
expect(editMessageTelegram).not.toHaveBeenCalled();
expectRecordFields(mockCallArg(emitInternalMessageSentHook), {
content: "Final answer",
messageId: 2001,
});
});
it("mirrors preview-finalized finals into the session transcript", async () => {
setupDraftStreams({ answerMessageId: 2001 });
const context = createContext();
context.ctxPayload.SessionKey = "agent:default:telegram:direct:123";
loadSessionStore.mockReturnValue({
"agent:default:telegram:direct:123": { sessionId: "s1" },
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context });
const transcriptCall = expectRecordFields(mockCallArg(appendSessionTranscriptMessage), {
transcriptPath: "/tmp/session.jsonl",
});
expectRecordFields(transcriptCall.message, {
role: "assistant",
provider: "openclaw",
model: "delivery-mirror",
content: [{ type: "text", text: "Final answer" }],
});
expectRecordFields(mockCallArg(emitSessionTranscriptUpdate), {
sessionFile: "/tmp/session.jsonl",
sessionKey: "agent:default:telegram:direct:123",
messageId: "m1",
});
});
it("streams block and final text through the same answer message", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Working" });
await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Working");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Done");
expect(answerDraftStream.stop).toHaveBeenCalled();
expect(deliverReplies).not.toHaveBeenCalled();
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("applies partial deltas while preserving the first-preview debounce", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({
text: "Streaming ",
delta: "Streaming ",
});
await replyOptions?.onPartialReply?.({
text: "Streaming previews ",
delta: "previews ",
});
await replyOptions?.onPartialReply?.({
text: "Streaming previews are useful because they show progress.",
delta: "are useful because they show progress.",
});
await dispatcherOptions.deliver(
{ text: "Streaming previews are useful because they show progress." },
{ kind: "final" },
);
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
telegramCfg: { streaming: { mode: "partial" } },
});
expectDraftStreamParams({ minInitialChars: 30 });
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Streaming ");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Streaming previews ");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(
3,
"Streaming previews are useful because they show progress.",
);
expect(answerDraftStream.update).toHaveBeenLastCalledWith(
"Streaming previews are useful because they show progress.",
);
expect(answerDraftStream.stop).toHaveBeenCalled();
expect(deliverReplies).not.toHaveBeenCalled();
});
it("replaces non-prefix partial snapshots instead of appending them", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({
text: "Working...",
delta: "Working...",
});
await replyOptions?.onPartialReply?.({
text: "Done.",
delta: "",
replace: true,
});
await dispatcherOptions.deliver({ text: "Done." }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
telegramCfg: { streaming: { mode: "partial" } },
});
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Working...");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Done.");
expect(answerDraftStream.update).toHaveBeenLastCalledWith("Done.");
expect(deliverReplies).not.toHaveBeenCalled();
});
it("does not coalesce answer partial fragments with tool progress drafts", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await replyOptions?.onPartialReply?.({ text: "Done ", delta: "Done " });
await replyOptions?.onPartialReply?.({ text: "Done answer", delta: "answer" });
await dispatcherOptions.deliver({ text: "Done answer." }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
telegramCfg: { streaming: { mode: "partial" } },
});
expect(answerDraftStream.update.mock.calls[0]?.[0]).toContain("Exec");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Done ");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(3, "Done answer");
expect(answerDraftStream.update).toHaveBeenLastCalledWith("Done answer.");
expect(deliverReplies).not.toHaveBeenCalled();
});
it("rotates the answer stream only after a finalized assistant message", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message A final");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B partial");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(3, "Message B final");
expect(deliverReplies).not.toHaveBeenCalled();
});
it("keeps compaction replay on the same answer stream", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Partial before compaction" });
await replyOptions?.onCompactionStart?.();
await replyOptions?.onPartialReply?.({ text: "Partial before compaction" });
await dispatcherOptions.deliver({ text: "Final after compaction" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Partial before compaction");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Final after compaction");
expect(deliverReplies).not.toHaveBeenCalled();
});
it("rotates a tool-progress-only answer draft before streaming the final answer", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await dispatcherOptions.deliver({ text: "Branch is up to date" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.update).toHaveBeenNthCalledWith(
1,
expect.stringMatching(/`🛠️ Exec`$/),
);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Branch is up to date");
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.clear).not.toHaveBeenCalled();
const rotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
const finalUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
expect(rotationOrder).toBeLessThan(finalUpdateOrder);
});
it("rotates a verbose tool result draft before streaming the final answer", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "🛠️ Exec: pnpm test" }, { kind: "tool" });
await dispatcherOptions.deliver({ text: "Tests passed" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "🛠️ Exec: pnpm test");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Tests passed");
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.clear).not.toHaveBeenCalled();
const rotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
const finalUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
expect(rotationOrder).toBeLessThan(finalUpdateOrder);
});
it("keeps progress updates in a draft and sends the final answer normally", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await replyOptions?.onItemEvent?.({
kind: "command",
name: "exec",
progressText: "git rev-parse --abbrev-ref HEAD",
});
await dispatcherOptions.deliver({ text: "Branch is up to date" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress" } },
});
expect(answerDraftStream.update).toHaveBeenCalledWith(
"Cracking...\n`🛠️ Exec`\n`🛠️ git rev-parse --abbrev-ref HEAD`",
);
expect(answerDraftStream.update).not.toHaveBeenCalledWith("Branch is up to date");
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.clear).not.toHaveBeenCalled();
expectDeliveredReply(0, { text: "Branch is up to date" });
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("streams the first long final chunk and sends follow-up chunks", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
const longText = "one ".repeat(80);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: longText }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext(), textLimit: 80 });
const firstChunk = answerDraftStream.update.mock.calls.at(-1)?.[0] ?? "";
expect(firstChunk.length).toBeLessThanOrEqual(80);
expect(deliverReplies).toHaveBeenCalled();
const followUpTexts = deliverReplies.mock.calls.flatMap((call: unknown[]) =>
((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map(
(reply) => reply.text ?? "",
),
);
expect(followUpTexts.join("")).toContain("one");
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("falls back to normal send for media and clears the pending stream", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver(
{ text: "Photo", mediaUrl: "https://example.com/a.png" },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.clear).toHaveBeenCalled();
expect(answerDraftStream.update).not.toHaveBeenCalledWith("Photo");
expectDeliveredReply(0, { text: "Photo", mediaUrl: "https://example.com/a.png" });
});
it("shows Telegram progress drafts immediately for explicit tool starts", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
await replyOptions?.onReplyStart?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
return { queuedFinal: false };
});
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(draftStream.update).toHaveBeenCalledWith("Shelling\n`🛠️ Exec`");
expect(draftStream.flush).toHaveBeenCalled();
});
it("renders Telegram progress drafts before slow status reactions resolve", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
let releaseSetTool: (() => void) | undefined;
const statusReactionController = createStatusReactionController();
statusReactionController.setTool.mockImplementation(
() =>
new Promise<void>((resolve) => {
releaseSetTool = resolve;
}),
);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
const pendingToolStart = replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await Promise.resolve();
await Promise.resolve();
const updateBeforeStatusReaction = draftStream.update.mock.calls.at(-1)?.[0];
releaseSetTool?.();
await pendingToolStart;
expect(updateBeforeStatusReaction).toMatch(/^Shelling\n`🛠️ Exec`$/);
return { queuedFinal: false };
});
await dispatchWithContext({
context: createContext({
statusReactionController: statusReactionController as never,
}),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(statusReactionController.setTool).toHaveBeenCalledWith("exec");
});
it("keeps non-command Telegram progress draft lines across post-tool assistant boundaries", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReplyStart?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onItemEvent?.({ kind: "search", progressText: "docs lookup" });
await replyOptions?.onItemEvent?.({ progressText: "tests passed" });
await replyOptions?.onAssistantMessageStart?.();
await dispatcherOptions.deliver({ text: "Final after tool" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(draftStream.update).toHaveBeenCalledWith(
"Shelling\n`🔎 Web Search: docs lookup`\n• `tests passed`",
);
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(draftStream.materialize).not.toHaveBeenCalled();
expect(draftStream.clear).not.toHaveBeenCalled();
expectDeliveredReply(0, { text: "Final after tool" });
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("falls back to normal send for error payloads and clears the pending stream", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Boom", isError: true }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.clear).toHaveBeenCalled();
expectDeliveredReply(0, { text: "Boom" });
});
it("streams button-bearing text into the same message", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
const buttons = [[{ text: "OK", callback_data: "ok" }]];
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver(
{ text: "Choose", channelData: { telegram: { buttons } } },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.update).toHaveBeenCalledWith("Choose");
expect(mockCallArg(editMessageTelegram)).toBe(123);
expect(mockCallArg(editMessageTelegram, 0, 1)).toBe(2001);
expect(mockCallArg(editMessageTelegram, 0, 2)).toBe("Choose");
expectRecordFields(mockCallArg(editMessageTelegram, 0, 3), { buttons });
expect(deliverReplies).not.toHaveBeenCalled();
});
it("streams reasoning and answer text on separate lanes", async () => {
const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({
answerMessageId: 2001,
reasoningMessageId: 3001,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "<think>Thinking</think>" });
await dispatcherOptions.deliver({ text: "Answer" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({ context: createReasoningStreamContext() });
expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_Thinking_");
expect(answerDraftStream.update).toHaveBeenCalledWith("Answer");
expect(deliverReplies).not.toHaveBeenCalled();
});
it("streams reasoning from configured defaults", async () => {
const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({
answerMessageId: 2001,
reasoningMessageId: 3001,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "<think>Thinking</think>" });
await dispatcherOptions.deliver({ text: "Answer" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createReasoningDefaultContext(),
cfg: {
agents: {
defaults: { reasoningDefault: "off" },
list: [{ id: "Ops", reasoningDefault: "stream" }],
},
},
});
expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_Thinking_");
expect(answerDraftStream.update).toHaveBeenCalledWith("Answer");
});
it("suppresses reasoning-only finals without raw text fallback", async () => {
setupDraftStreams({ answerMessageId: 2001, reasoningMessageId: 3001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "<think>hidden</think>" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(deliverReplies).not.toHaveBeenCalled();
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("does not add silent fallback when source delivery is message-tool-only", async () => {
setupDraftStreams({ answerMessageId: 2001, reasoningMessageId: 3001 });
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: false,
counts: { block: 0, final: 0, tool: 0 },
sourceReplyDeliveryMode: "message_tool_only",
});
await dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "agent:main:telegram:direct:123",
} as unknown as TelegramMessageContext["ctxPayload"],
}),
cfg: {
agents: {
defaults: {
silentReply: {
direct: "disallow",
group: "allow",
internal: "allow",
},
silentReplyRewrite: {
direct: true,
},
},
},
},
});
expect(deliverReplies).not.toHaveBeenCalled();
expect(editMessageTelegram).not.toHaveBeenCalled();
expect(sendMessageTelegram).not.toHaveBeenCalled();
});
it("shows compacting reaction during auto-compaction and resumes thinking", async () => {
const statusReactionController = {
setThinking: vi.fn(async () => {}),
setCompacting: vi.fn(async () => {}),
setTool: vi.fn(async () => {}),
setDone: vi.fn(async () => {}),
setError: vi.fn(async () => {}),
setQueued: vi.fn(async () => {}),
cancelPending: vi.fn(() => {}),
clear: vi.fn(async () => {}),
restoreInitial: vi.fn(async () => {}),
};
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
await replyOptions?.onCompactionStart?.();
await replyOptions?.onCompactionEnd?.();
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
statusReactionController: statusReactionController as never,
}),
streamMode: "off",
});
expect(statusReactionController.setCompacting).toHaveBeenCalledTimes(1);
expect(statusReactionController.cancelPending).toHaveBeenCalledTimes(1);
expect(statusReactionController.setThinking).toHaveBeenCalledTimes(2);
expect(statusReactionController.setCompacting.mock.invocationCallOrder[0]).toBeLessThan(
statusReactionController.cancelPending.mock.invocationCallOrder[0],
);
expect(statusReactionController.cancelPending.mock.invocationCallOrder[0]).toBeLessThan(
statusReactionController.setThinking.mock.invocationCallOrder[1],
);
});
it("does not supersede the same session for unauthorized abort-looking commands", async () => {
let releaseFirstFinal: (() => void) | undefined;
const firstFinalGate = new Promise<void>((resolve) => {
releaseFirstFinal = resolve;
});
let resolveStreamVisible: (() => void) | undefined;
const streamVisible = new Promise<void>((resolve) => {
resolveStreamVisible = resolve;
});
const firstAnswerDraft = createTestDraftStream({
messageId: 1001,
onUpdate: (text) => {
if (text === "Old reply partial") {
if (!resolveStreamVisible) {
throw new Error("Expected Telegram stream-visible resolver to be initialized");
}
resolveStreamVisible();
}
},
});
const firstReasoningDraft = createDraftStream();
const unauthorizedAnswerDraft = createDraftStream();
const unauthorizedReasoningDraft = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => firstAnswerDraft)
.mockImplementationOnce(() => firstReasoningDraft)
.mockImplementationOnce(() => unauthorizedAnswerDraft)
.mockImplementationOnce(() => unauthorizedReasoningDraft);
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Old reply partial" });
await firstFinalGate;
await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" });
return { queuedFinal: true };
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Unauthorized stop" }, { kind: "final" });
return { queuedFinal: true };
});
const unauthorizedReplyDelivered = observeDeliveredReply("Unauthorized stop");
const firstPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
} as never,
}),
});
await streamVisible;
const unauthorizedPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "/stop",
RawBody: "/stop",
CommandBody: "/stop",
CommandAuthorized: false,
} as never,
}),
});
await unauthorizedReplyDelivered;
if (!releaseFirstFinal) {
throw new Error("Expected first Telegram final release callback to be initialized");
}
releaseFirstFinal();
await Promise.all([firstPromise, unauthorizedPromise]);
expect(firstAnswerDraft.update).toHaveBeenCalledWith("Old reply final");
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("uses configured doneHoldMs when clearing Telegram status reactions after reply", async () => {
vi.useFakeTimers();
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
try {
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
doneHoldMs: 250,
},
},
},
},
streamMode: "off",
});
expect(statusReactionController.setDone).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(249);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(1);
expect(reactionApi).toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("restores the initial Telegram status reaction after reply when removeAckAfterReply is disabled", async () => {
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: false,
statusReactionController: statusReactionController as never,
}),
streamMode: "off",
});
await vi.waitFor(() => {
expect(statusReactionController.setDone).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1);
});
expect(statusReactionController.setError).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
});
it("uses configured errorHoldMs to clear Telegram status reactions after an error fallback", async () => {
vi.useFakeTimers();
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: true });
try {
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
errorHoldMs: 320,
},
},
},
},
streamMode: "off",
});
expect(statusReactionController.setError).toHaveBeenCalledTimes(1);
expect(statusReactionController.setDone).not.toHaveBeenCalled();
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(319);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(1);
expect(reactionApi).toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("restores the initial Telegram status reaction after an error when no final reply is sent", async () => {
vi.useFakeTimers();
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: false });
try {
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
errorHoldMs: 320,
},
},
},
},
streamMode: "off",
});
expect(statusReactionController.setError).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(319);
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("restores the initial Telegram status reaction after an error fallback when removeAckAfterReply is disabled", async () => {
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: false,
statusReactionController: statusReactionController as never,
}),
streamMode: "off",
});
await vi.waitFor(() => {
expect(statusReactionController.setError).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1);
});
expect(statusReactionController.setDone).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
});
it("uses resolved DM config for auto-topic-label overrides", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true });
loadSessionStore.mockReturnValue({ s1: {} });
const bot = createBot();
await dispatchWithContext({
bot,
context: createContext({
ctxPayload: {
SessionKey: "s1",
RawBody: "Need help with invoices",
} as TelegramMessageContext["ctxPayload"],
groupConfig: {
autoTopicLabel: false,
} as TelegramMessageContext["groupConfig"],
}),
telegramCfg: { autoTopicLabel: true },
cfg: {
channels: {
telegram: {
direct: {
"123": { autoTopicLabel: true },
},
},
},
},
});
expect(generateTopicLabel).not.toHaveBeenCalled();
expect(bot.api.editForumTopic).not.toHaveBeenCalled();
});
it("does not emit a silent-reply fallback when the dispatcher reports a queued final reply", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: true,
counts: { block: 0, final: 1, tool: 0 },
});
await dispatchWithContext({
context: createContext({
ctxPayload: createDirectSessionPayload(),
}),
streamMode: "off",
});
expect(deliverReplies).not.toHaveBeenCalled();
});
it("does not emit a silent-reply fallback for no-response DM turns", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: false,
counts: { block: 0, final: 0, tool: 0 },
});
await dispatchWithContext({
context: createContext({
ctxPayload: createDirectSessionPayload(),
}),
streamMode: "off",
});
expect(deliverReplies).not.toHaveBeenCalled();
});
it("emits a silent-reply fallback for no-response group turns", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: false,
counts: { block: 0, final: 0, tool: 0 },
});
await dispatchWithContext({
context: createContext({
chatId: -1001234,
isGroup: true,
ctxPayload: {
SessionKey: "agent:test:telegram:group:-1001234",
ChatType: "group",
} as TelegramMessageContext["ctxPayload"],
primaryCtx: {
message: { chat: { id: -1001234, type: "supergroup" } },
} as TelegramMessageContext["primaryCtx"],
msg: {
chat: { id: -1001234, type: "supergroup" },
message_id: 456,
} as TelegramMessageContext["msg"],
threadSpec: { id: undefined, scope: "none" },
replyThreadId: undefined,
}),
cfg: {
agents: {
defaults: {
silentReply: {
direct: "disallow",
group: "disallow",
internal: "allow",
},
silentReplyRewrite: {
group: true,
},
},
},
} as Parameters<typeof dispatchTelegramMessage>[0]["cfg"],
streamMode: "off",
});
expect(deliverReplies).toHaveBeenCalledTimes(1);
const replies = deliverReplies.mock.calls[0]?.[0]?.replies as
| Array<{ text?: string }>
| undefined;
const replyText = replies?.[0]?.text?.trim();
if (!replyText) {
throw new Error("expected non-empty Telegram reply text");
}
expect(replies?.[0]?.text).not.toBe("NO_REPLY");
});
describe("non-streaming media dedup", () => {
const finalDeliveryPayload = () => {
for (const [params] of deliverInboundReplyWithMessageSendContext.mock.calls) {
if (params.info.kind === "final") {
return params.payload;
}
}
throw new Error("missing final delivery");
};
it("deduplicates block-sent media from final reply", async () => {
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: { messageIds: ["101"], visibleReplySent: true },
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ mediaUrls: ["/tmp/cat.jpg"] }, { kind: "block" });
await dispatcherOptions.deliver(
{ text: "Here is the image", mediaUrls: ["/tmp/cat.jpg"] },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(finalDeliveryPayload().mediaUrls).toEqual([]);
});
it("preserves final media when block delivery reports no visible send", async () => {
deliverReplies.mockResolvedValueOnce({ delivered: false });
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: { messageIds: ["101"], visibleReplySent: true },
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ mediaUrls: ["/tmp/cat.jpg"] }, { kind: "block" });
await dispatcherOptions.deliver(
{ text: "Here is the image", mediaUrls: ["/tmp/cat.jpg"] },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(finalDeliveryPayload().mediaUrls).toEqual(["/tmp/cat.jpg"]);
});
it("preserves final media when block delivery fails", async () => {
deliverReplies.mockRejectedValueOnce(new Error("Telegram API error"));
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: { messageIds: ["101"], visibleReplySent: true },
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
try {
await dispatcherOptions.deliver({ mediaUrls: ["/tmp/cat.jpg"] }, { kind: "block" });
} catch {}
await dispatcherOptions.deliver(
{ text: "Here is the image", mediaUrls: ["/tmp/cat.jpg"] },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(finalDeliveryPayload().mediaUrls).toEqual(["/tmp/cat.jpg"]);
});
});
});