Files
openclaw/extensions/telegram/src/bot-message-dispatch.test.ts
2026-05-08 19:35:31 +01:00

1589 lines
59 KiB
TypeScript

import type { Bot } from "grammy";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { resolveAutoTopicLabelConfig as resolveAutoTopicLabelConfigRuntime } from "./auto-topic-label-config.js";
import type { TelegramBotDeps } from "./bot-deps.js";
import {
createSequencedTestDraftStream,
createTestDraftStream,
} from "./draft-stream.test-helpers.js";
type DispatchReplyWithBufferedBlockDispatcherArgs = Parameters<
TelegramBotDeps["dispatchReplyWithBufferedBlockDispatcher"]
>[0];
const createTelegramDraftStream = vi.hoisted(() => vi.fn());
const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() =>
vi.fn<(params: DispatchReplyWithBufferedBlockDispatcherArgs) => Promise<unknown>>(),
);
const deliverReplies = vi.hoisted(() => vi.fn());
const deliverInboundReplyWithMessageSendContext = vi.hoisted(() => vi.fn());
const emitInternalMessageSentHook = vi.hoisted(() => vi.fn());
const createForumTopicTelegram = vi.hoisted(() => vi.fn());
const deleteMessageTelegram = vi.hoisted(() => vi.fn());
const editForumTopicTelegram = vi.hoisted(() => vi.fn());
const editMessageTelegram = vi.hoisted(() => vi.fn());
const reactMessageTelegram = vi.hoisted(() => vi.fn());
const sendMessageTelegram = vi.hoisted(() => vi.fn());
const sendPollTelegram = vi.hoisted(() => vi.fn());
const sendStickerTelegram = vi.hoisted(() => vi.fn());
const loadConfig = vi.hoisted(() => vi.fn(() => ({})));
const readChannelAllowFromStore = vi.hoisted(() => vi.fn(async () => []));
const upsertChannelPairingRequest = vi.hoisted(() =>
vi.fn(async () => ({
code: "PAIRCODE",
created: true,
})),
);
const enqueueSystemEvent = vi.hoisted(() => vi.fn());
const buildModelsProviderData = vi.hoisted(() =>
vi.fn(async () => ({
byProvider: new Map<string, Set<string>>(),
providers: [],
resolvedDefault: { provider: "openai", model: "gpt-test" },
modelNames: new Map<string, string>(),
})),
);
const listSkillCommandsForAgents = vi.hoisted(() => vi.fn(() => []));
const createChannelMessageReplyPipeline = vi.hoisted(() =>
vi.fn(() => ({
responsePrefix: undefined,
responsePrefixContextProvider: () => ({ identityName: undefined }),
onModelSelected: () => undefined,
})),
);
const wasSentByBot = vi.hoisted(() => vi.fn(() => false));
const loadSessionStore = vi.hoisted(() => vi.fn());
const resolveStorePath = vi.hoisted(() => vi.fn(() => "/tmp/sessions.json"));
const generateTopicLabel = vi.hoisted(() => vi.fn());
const describeStickerImage = vi.hoisted(() => vi.fn(async () => null));
const loadModelCatalog = vi.hoisted(() => vi.fn(async () => ({})));
const findModelInCatalog = vi.hoisted(() => vi.fn(() => null));
const modelSupportsVision = vi.hoisted(() => vi.fn(() => false));
const resolveAgentDir = vi.hoisted(() => vi.fn(() => "/tmp/agent"));
const resolveDefaultModelForAgent = vi.hoisted(() =>
vi.fn(() => ({ provider: "openai", model: "gpt-test" })),
);
const getAgentScopedMediaLocalRoots = vi.hoisted(() =>
vi.fn((_cfg: unknown, agentId: string) => [`/tmp/.openclaw/workspace-${agentId}`]),
);
const resolveChunkMode = vi.hoisted(() => vi.fn(() => undefined));
const resolveMarkdownTableMode = vi.hoisted(() => vi.fn(() => "preserve"));
const resolveSessionStoreEntry = vi.hoisted(() =>
vi.fn(({ store, sessionKey }: { store: Record<string, unknown>; sessionKey: string }) => ({
existing: store[sessionKey],
})),
);
vi.mock("./draft-stream.js", () => ({
createTelegramDraftStream,
}));
vi.mock("openclaw/plugin-sdk/channel-message", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/channel-message")>();
return {
...actual,
deliverInboundReplyWithMessageSendContext,
};
});
vi.mock("./bot/delivery.js", () => ({
deliverReplies,
emitInternalMessageSentHook,
}));
vi.mock("./bot/delivery.replies.js", () => ({
deliverReplies,
emitInternalMessageSentHook,
}));
vi.mock("./send.js", () => ({
createForumTopicTelegram,
deleteMessageTelegram,
editForumTopicTelegram,
editMessageTelegram,
reactMessageTelegram,
sendMessageTelegram,
sendPollTelegram,
sendStickerTelegram,
}));
vi.mock("./bot-message-dispatch.runtime.js", () => ({
generateTopicLabel,
getAgentScopedMediaLocalRoots,
loadSessionStore,
resolveAutoTopicLabelConfig: resolveAutoTopicLabelConfigRuntime,
resolveChunkMode,
resolveMarkdownTableMode,
resolveSessionStoreEntry,
resolveStorePath,
}));
vi.mock("./bot-message-dispatch.agent.runtime.js", () => ({
findModelInCatalog,
loadModelCatalog,
modelSupportsVision,
resolveAgentDir,
resolveDefaultModelForAgent,
}));
vi.mock("./sticker-cache.js", () => ({
cacheSticker: vi.fn(),
getCachedSticker: () => null,
getCacheStats: () => ({ count: 0 }),
searchStickers: () => [],
getAllCachedStickers: () => [],
describeStickerImage,
}));
let dispatchTelegramMessage: typeof import("./bot-message-dispatch.js").dispatchTelegramMessage;
let resetTelegramReplyFenceForTests: typeof import("./bot-message-dispatch.js").resetTelegramReplyFenceForTests;
const telegramDepsForTest: TelegramBotDeps = {
getRuntimeConfig: loadConfig as TelegramBotDeps["getRuntimeConfig"],
resolveStorePath: resolveStorePath as TelegramBotDeps["resolveStorePath"],
loadSessionStore: loadSessionStore as TelegramBotDeps["loadSessionStore"],
readChannelAllowFromStore:
readChannelAllowFromStore as TelegramBotDeps["readChannelAllowFromStore"],
upsertChannelPairingRequest:
upsertChannelPairingRequest as TelegramBotDeps["upsertChannelPairingRequest"],
enqueueSystemEvent: enqueueSystemEvent as TelegramBotDeps["enqueueSystemEvent"],
dispatchReplyWithBufferedBlockDispatcher:
dispatchReplyWithBufferedBlockDispatcher as TelegramBotDeps["dispatchReplyWithBufferedBlockDispatcher"],
buildModelsProviderData: buildModelsProviderData as TelegramBotDeps["buildModelsProviderData"],
listSkillCommandsForAgents:
listSkillCommandsForAgents as TelegramBotDeps["listSkillCommandsForAgents"],
createChannelMessageReplyPipeline:
createChannelMessageReplyPipeline as TelegramBotDeps["createChannelMessageReplyPipeline"],
wasSentByBot: wasSentByBot as TelegramBotDeps["wasSentByBot"],
createTelegramDraftStream:
createTelegramDraftStream as TelegramBotDeps["createTelegramDraftStream"],
deliverReplies: deliverReplies as TelegramBotDeps["deliverReplies"],
deliverInboundReplyWithMessageSendContext:
deliverInboundReplyWithMessageSendContext as TelegramBotDeps["deliverInboundReplyWithMessageSendContext"],
emitInternalMessageSentHook:
emitInternalMessageSentHook as TelegramBotDeps["emitInternalMessageSentHook"],
editMessageTelegram: editMessageTelegram as TelegramBotDeps["editMessageTelegram"],
};
describe("dispatchTelegramMessage draft streaming", () => {
type TelegramMessageContext = Parameters<typeof dispatchTelegramMessage>[0]["context"];
beforeAll(async () => {
({ dispatchTelegramMessage, resetTelegramReplyFenceForTests } =
await import("./bot-message-dispatch.js"));
});
beforeEach(() => {
resetTelegramReplyFenceForTests();
createTelegramDraftStream.mockReset();
dispatchReplyWithBufferedBlockDispatcher.mockReset();
deliverReplies.mockReset();
deliverInboundReplyWithMessageSendContext.mockReset();
emitInternalMessageSentHook.mockReset();
createForumTopicTelegram.mockReset();
deleteMessageTelegram.mockReset();
editForumTopicTelegram.mockReset();
editMessageTelegram.mockReset();
reactMessageTelegram.mockReset();
sendMessageTelegram.mockReset();
sendPollTelegram.mockReset();
sendStickerTelegram.mockReset();
loadConfig.mockReset();
readChannelAllowFromStore.mockReset();
upsertChannelPairingRequest.mockReset();
enqueueSystemEvent.mockReset();
buildModelsProviderData.mockReset();
listSkillCommandsForAgents.mockReset();
createChannelMessageReplyPipeline.mockReset();
wasSentByBot.mockReset();
loadSessionStore.mockReset();
resolveStorePath.mockReset();
generateTopicLabel.mockReset();
getAgentScopedMediaLocalRoots.mockClear();
resolveChunkMode.mockClear();
resolveMarkdownTableMode.mockClear();
resolveSessionStoreEntry.mockClear();
describeStickerImage.mockReset();
loadModelCatalog.mockReset();
findModelInCatalog.mockReset();
modelSupportsVision.mockReset();
resolveAgentDir.mockReset();
resolveDefaultModelForAgent.mockReset();
loadConfig.mockReturnValue({});
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: false,
counts: { block: 0, final: 0, tool: 0 },
});
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "unsupported",
reason: "missing_outbound_handler",
});
emitInternalMessageSentHook.mockResolvedValue(undefined);
createForumTopicTelegram.mockResolvedValue({ message_thread_id: 777 });
deleteMessageTelegram.mockResolvedValue(true);
editForumTopicTelegram.mockResolvedValue(true);
editMessageTelegram.mockResolvedValue({ ok: true });
reactMessageTelegram.mockResolvedValue(true);
sendMessageTelegram.mockResolvedValue({ message_id: 1001 });
sendPollTelegram.mockResolvedValue({ message_id: 1001 });
sendStickerTelegram.mockResolvedValue({ message_id: 1001 });
readChannelAllowFromStore.mockResolvedValue([]);
upsertChannelPairingRequest.mockResolvedValue({
code: "PAIRCODE",
created: true,
});
enqueueSystemEvent.mockResolvedValue(undefined);
buildModelsProviderData.mockResolvedValue({
byProvider: new Map<string, Set<string>>(),
providers: [],
resolvedDefault: { provider: "openai", model: "gpt-test" },
modelNames: new Map<string, string>(),
});
listSkillCommandsForAgents.mockReturnValue([]);
createChannelMessageReplyPipeline.mockReturnValue({
responsePrefix: undefined,
responsePrefixContextProvider: () => ({ identityName: undefined }),
onModelSelected: () => undefined,
});
wasSentByBot.mockReturnValue(false);
resolveStorePath.mockReturnValue("/tmp/sessions.json");
loadSessionStore.mockReturnValue({});
generateTopicLabel.mockResolvedValue("Topic label");
describeStickerImage.mockResolvedValue(null);
loadModelCatalog.mockResolvedValue({});
findModelInCatalog.mockReturnValue(null);
modelSupportsVision.mockReturnValue(false);
resolveAgentDir.mockReturnValue("/tmp/agent");
resolveDefaultModelForAgent.mockReturnValue({
provider: "openai",
model: "gpt-test",
});
});
const createDraftStream = (messageId?: number) => createTestDraftStream({ messageId });
const createSequencedDraftStream = (startMessageId = 1001) =>
createSequencedTestDraftStream(startMessageId);
function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) {
const answerDraftStream = createDraftStream(params?.answerMessageId);
const reasoningDraftStream = createDraftStream(params?.reasoningMessageId);
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
return { answerDraftStream, reasoningDraftStream };
}
function createContext(overrides?: Partial<TelegramMessageContext>): TelegramMessageContext {
const base = {
ctxPayload: {},
primaryCtx: { message: { chat: { id: 123, type: "private" } } },
msg: {
chat: { id: 123, type: "private" },
message_id: 456,
message_thread_id: 777,
},
chatId: 123,
isGroup: false,
groupConfig: undefined,
resolvedThreadId: undefined,
replyThreadId: 777,
threadSpec: { id: 777, scope: "dm" },
historyKey: undefined,
historyLimit: 0,
groupHistories: new Map(),
route: { agentId: "default", accountId: "default" },
skillFilter: undefined,
sendTyping: vi.fn(),
sendRecordVoice: vi.fn(),
ackReactionPromise: null,
reactionApi: null,
removeAckAfterReply: false,
} as unknown as TelegramMessageContext;
base.turn = {
storePath: "/tmp/openclaw/telegram-sessions.json",
recordInboundSession: vi.fn(async () => undefined),
record: {
onRecordError: vi.fn(),
},
} as unknown as TelegramMessageContext["turn"];
return {
...base,
...overrides,
// Merge nested fields when overrides provide partial objects.
primaryCtx: {
...(base.primaryCtx as object),
...(overrides?.primaryCtx ? (overrides.primaryCtx as object) : null),
} as TelegramMessageContext["primaryCtx"],
msg: {
...(base.msg as object),
...(overrides?.msg ? (overrides.msg as object) : null),
} as TelegramMessageContext["msg"],
route: {
...(base.route as object),
...(overrides?.route ? (overrides.route as object) : null),
} as TelegramMessageContext["route"],
};
}
function createStatusReactionController() {
return {
setQueued: vi.fn(),
setThinking: vi.fn(async () => {}),
setTool: vi.fn(async () => {}),
setCompacting: vi.fn(async () => {}),
cancelPending: vi.fn(),
setError: vi.fn(async () => {}),
setDone: vi.fn(async () => {}),
restoreInitial: vi.fn(async () => {}),
};
}
function observeDeliveredReply(text: string): Promise<void> {
return new Promise((resolve) => {
deliverReplies.mockImplementation(async (params: { replies?: Array<{ text?: string }> }) => {
if (params.replies?.some((reply) => reply.text === text)) {
resolve();
}
return { delivered: true };
});
});
}
function createBot(): Bot {
return {
api: {
sendMessage: vi.fn(async (_chatId, _text, params) => ({
message_id:
typeof params?.message_thread_id === "number" ? params.message_thread_id : 1001,
})),
editMessageText: vi.fn(async () => ({ message_id: 1001 })),
deleteMessage: vi.fn().mockResolvedValue(true),
editForumTopic: vi.fn().mockResolvedValue(true),
},
} as unknown as Bot;
}
function createRuntime(): Parameters<typeof dispatchTelegramMessage>[0]["runtime"] {
return {
log: vi.fn(),
error: vi.fn(),
exit: () => {
throw new Error("exit");
},
};
}
async function dispatchWithContext(params: {
context: TelegramMessageContext;
cfg?: Parameters<typeof dispatchTelegramMessage>[0]["cfg"];
telegramCfg?: Parameters<typeof dispatchTelegramMessage>[0]["telegramCfg"];
streamMode?: Parameters<typeof dispatchTelegramMessage>[0]["streamMode"];
telegramDeps?: TelegramBotDeps;
bot?: Bot;
replyToMode?: Parameters<typeof dispatchTelegramMessage>[0]["replyToMode"];
textLimit?: number;
}) {
const bot = params.bot ?? createBot();
await dispatchTelegramMessage({
context: params.context,
bot,
cfg: params.cfg ?? {},
runtime: createRuntime(),
replyToMode: params.replyToMode ?? "first",
streamMode: params.streamMode ?? "partial",
textLimit: params.textLimit ?? 4096,
telegramCfg: params.telegramCfg ?? {},
telegramDeps: params.telegramDeps ?? telegramDepsForTest,
opts: { token: "token" },
});
}
function createReasoningStreamContext(): TelegramMessageContext {
loadSessionStore.mockReturnValue({
s1: { reasoningLevel: "stream" },
});
return createContext({
ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"],
});
}
function createReasoningDefaultContext(): TelegramMessageContext {
loadSessionStore.mockReturnValue({
s1: {},
});
return createContext({
ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"],
route: { agentId: "ops" } as unknown as TelegramMessageContext["route"],
});
}
it("streams drafts in private threads and forwards thread id", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hello" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
const context = createContext({
route: {
agentId: "work",
} as unknown as TelegramMessageContext["route"],
});
await dispatchWithContext({ context });
expect(createTelegramDraftStream).toHaveBeenCalledWith(
expect.objectContaining({
chatId: 123,
thread: { id: 777, scope: "dm" },
minInitialChars: 30,
}),
);
expect(draftStream.update).toHaveBeenCalledWith("Hello");
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
thread: { id: 777, scope: "dm" },
mediaLocalRoots: expect.arrayContaining([
expect.stringMatching(/[\\/]\.openclaw[\\/]workspace-work$/u),
]),
}),
);
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith(
expect.objectContaining({
dispatcherOptions: expect.objectContaining({
beforeDeliver: expect.any(Function),
}),
replyOptions: expect.objectContaining({
disableBlockStreaming: true,
}),
}),
);
expect(editMessageTelegram).not.toHaveBeenCalled();
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("keeps retained overflow draft previews", async () => {
const draftStream = createDraftStream();
const bot = createBot();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hello" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), bot });
const streamParams = createTelegramDraftStream.mock.calls[0]?.[0] as Parameters<
NonNullable<TelegramBotDeps["createTelegramDraftStream"]>
>[0];
streamParams.onSupersededPreview?.({
messageId: 17,
textSnapshot: "first page",
retain: true,
});
expect(bot.api.deleteMessage).not.toHaveBeenCalled();
streamParams.onSupersededPreview?.({
messageId: 18,
textSnapshot: "stale page",
});
await vi.waitFor(() => expect(bot.api.deleteMessage).toHaveBeenCalledWith(123, 18));
});
it("queues final Telegram replies through outbound delivery when available", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: {
messageIds: ["1001"],
visibleReplySent: true,
},
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello queued" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
ChatType: "direct",
SenderId: "42",
SenderName: "Alice",
SenderUsername: "alice",
} as unknown as TelegramMessageContext["ctxPayload"],
}),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(deliverInboundReplyWithMessageSendContext).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "123",
accountId: "default",
payload: expect.objectContaining({ text: "Hello queued" }),
info: { kind: "final" },
replyToMode: "first",
threadId: 777,
formatting: expect.objectContaining({ textLimit: 4096, tableMode: "preserve" }),
agentId: "default",
ctxPayload: expect.objectContaining({
SessionKey: "s1",
ChatType: "direct",
SenderId: "42",
SenderName: "Alice",
SenderUsername: "alice",
}),
}),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("queues media-only final Telegram replies through outbound delivery when available", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: {
messageIds: ["1002"],
visibleReplySent: true,
},
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/final.png" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(deliverInboundReplyWithMessageSendContext).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
payload: expect.objectContaining({ mediaUrl: "file:///tmp/final.png" }),
info: { kind: "final" },
requiredCapabilities: expect.objectContaining({
media: true,
payload: true,
}),
}),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("skips answer draft stream for same-chat selected quotes", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
msg: {
message_id: 1001,
} as unknown as TelegramMessageContext["msg"],
ctxPayload: {
MessageSid: "1001",
ReplyToId: "9001",
ReplyToBody: "quoted slice",
ReplyToQuoteText: " quoted slice\n",
ReplyToIsQuote: true,
} as unknown as TelegramMessageContext["ctxPayload"],
}),
});
expect(createTelegramDraftStream).not.toHaveBeenCalled();
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ replyToId: "9001" })],
replyQuoteMessageId: 9001,
replyQuoteText: " quoted slice\n",
}),
);
});
it("keeps answer draft stream for current message replies with native quote candidates", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
msg: {
message_id: 1001,
text: "Original current message",
entities: [{ type: "bold", offset: 0, length: 8 }],
} as unknown as TelegramMessageContext["msg"],
ctxPayload: {
MessageSid: "1001",
} as unknown as TelegramMessageContext["ctxPayload"],
}),
});
expect(createTelegramDraftStream).toHaveBeenCalledWith(
expect.objectContaining({
replyToMessageId: 1001,
}),
);
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ replyToId: "1001" })],
replyQuoteByMessageId: {
"1001": {
text: "Original current message",
position: 0,
entities: [{ type: "bold", offset: 0, length: 8 }],
},
},
}),
);
});
it("passes native quote candidates for explicit reply targets", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "9001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
ctxPayload: {
ReplyToId: "9001",
ReplyToBody: "trimmed body",
ReplyToQuoteSourceText: " exact reply body",
ReplyToQuoteSourceEntities: [{ type: "italic", offset: 2, length: 5 }],
} as unknown as TelegramMessageContext["ctxPayload"],
}),
});
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ replyToId: "9001" })],
replyQuoteByMessageId: {
"9001": {
text: " exact reply body",
position: 0,
entities: [{ type: "italic", offset: 2, length: 5 }],
},
},
}),
);
});
it("does not build native quote candidates when reply mode is off", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
msg: {
message_id: 1001,
text: "Original current message",
} as unknown as TelegramMessageContext["msg"],
ctxPayload: {
MessageSid: "1001",
} as unknown as TelegramMessageContext["ctxPayload"],
}),
replyToMode: "off",
});
expect(deliverReplies.mock.calls[0]?.[0]).not.toHaveProperty("replyQuoteByMessageId.1001");
});
it("keeps answer draft stream for selected quotes when reply mode is off", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true });
await dispatchWithContext({
context: createContext({
msg: {
message_id: 1001,
} as unknown as TelegramMessageContext["msg"],
ctxPayload: {
MessageSid: "1001",
ReplyToId: "9001",
ReplyToBody: "quoted slice",
ReplyToQuoteText: " quoted slice\n",
ReplyToIsQuote: true,
} as unknown as TelegramMessageContext["ctxPayload"],
}),
replyToMode: "off",
});
expect(createTelegramDraftStream).toHaveBeenCalledWith(
expect.objectContaining({
replyToMessageId: undefined,
}),
);
});
it("passes same-chat quoted reply target id with Telegram quote text", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
ctxPayload: {
MessageSid: "1001",
ReplyToId: "9001",
ReplyToBody: "quoted slice",
ReplyToQuoteText: " quoted slice\n",
ReplyToIsQuote: true,
ReplyToQuotePosition: 12,
ReplyToQuoteEntities: [{ type: "italic", offset: 0, length: 6 }],
} as unknown as TelegramMessageContext["ctxPayload"],
}),
streamMode: "off",
});
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ replyToId: "9001" })],
replyQuoteMessageId: 9001,
replyQuoteText: " quoted slice\n",
replyQuotePosition: 12,
replyQuoteEntities: [{ type: "italic", offset: 0, length: 6 }],
}),
);
});
it("does not pass a native quote target for external replies", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
ctxPayload: {
MessageSid: "1001",
ReplyToId: "9001",
ReplyToBody: "external quoted slice",
ReplyToQuoteText: " external quoted slice\n",
ReplyToIsQuote: true,
ReplyToIsExternal: true,
} as unknown as TelegramMessageContext["ctxPayload"],
}),
streamMode: "off",
});
const params = deliverReplies.mock.calls[0]?.[0];
expect(params).toEqual(
expect.objectContaining({
replies: [expect.objectContaining({ replyToId: "1001" })],
replyQuoteText: " external quoted slice\n",
}),
);
expect(params?.replyQuoteMessageId).toBeUndefined();
});
it("does not inject approval buttons in local dispatch once the monitor owns approvals", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver(
{
text: "Mode: foreground\nRun: /approve 117ba06d allow-once (or allow-always / deny).",
},
{ kind: "final" },
);
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext(),
streamMode: "off",
cfg: {
channels: {
telegram: {
execApprovals: {
enabled: true,
approvers: ["123"],
target: "dm",
},
},
},
},
});
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [
expect.objectContaining({
text: "Mode: foreground\nRun: /approve 117ba06d allow-once (or allow-always / deny).",
}),
],
}),
);
const deliveredPayload = (deliverReplies.mock.calls[0]?.[0] as { replies?: Array<unknown> })
?.replies?.[0] as { channelData?: unknown } | undefined;
expect(deliveredPayload?.channelData).toBeUndefined();
});
it("uses 30-char stream debounce for legacy block stream mode", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hello" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), streamMode: "block" });
expect(createTelegramDraftStream).toHaveBeenCalledWith(
expect.objectContaining({
minInitialChars: 30,
}),
);
});
it("keeps canonical block mode on the Telegram draft stream path", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "HelloWorld" });
await dispatcherOptions.deliver({ text: "HelloWorld" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext(),
streamMode: "block",
telegramCfg: { streaming: { mode: "block" } },
});
expect(createTelegramDraftStream).toHaveBeenCalled();
expect(draftStream.update).toHaveBeenCalledWith("HelloWorld");
});
it("streams text-only finals into the answer message", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.update).toHaveBeenCalledWith("Final answer");
expect(answerDraftStream.stop).toHaveBeenCalled();
expect(deliverReplies).not.toHaveBeenCalled();
expect(editMessageTelegram).not.toHaveBeenCalled();
expect(emitInternalMessageSentHook).toHaveBeenCalledWith(
expect.objectContaining({ content: "Final answer", messageId: 2001 }),
);
});
it("streams block and final text through the same answer message", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Working" });
await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Working");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Done");
expect(answerDraftStream.stop).toHaveBeenCalled();
expect(deliverReplies).not.toHaveBeenCalled();
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("rotates the answer stream only after a finalized assistant message", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message A final");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B partial");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(3, "Message B final");
expect(deliverReplies).not.toHaveBeenCalled();
});
it("keeps compaction replay on the same answer stream", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Partial before compaction" });
await replyOptions?.onCompactionStart?.();
await replyOptions?.onPartialReply?.({ text: "Partial before compaction" });
await dispatcherOptions.deliver({ text: "Final after compaction" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled();
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Partial before compaction");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Final after compaction");
expect(deliverReplies).not.toHaveBeenCalled();
});
it("keeps progress updates in a draft and sends the final answer normally", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await replyOptions?.onItemEvent?.({
kind: "command",
name: "exec",
progressText: "git rev-parse --abbrev-ref HEAD",
});
await dispatcherOptions.deliver({ text: "Branch is up to date" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress" } },
});
expect(answerDraftStream.update).toHaveBeenCalledWith(
expect.stringMatching(/`🛠️ Exec: git rev-parse --abbrev-ref HEAD`$/),
);
expect(answerDraftStream.update).not.toHaveBeenCalledWith("Branch is up to date");
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ text: "Branch is up to date" })],
}),
);
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("streams the first long final chunk and sends follow-up chunks", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
const longText = "one ".repeat(80);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: longText }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext(), textLimit: 80 });
const firstChunk = answerDraftStream.update.mock.calls.at(-1)?.[0] ?? "";
expect(firstChunk.length).toBeLessThanOrEqual(80);
expect(deliverReplies).toHaveBeenCalled();
const followUpTexts = deliverReplies.mock.calls.flatMap((call: unknown[]) =>
((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map(
(reply) => reply.text ?? "",
),
);
expect(followUpTexts.join("")).toContain("one");
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("falls back to normal send for media and clears the pending stream", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver(
{ text: "Photo", mediaUrl: "https://example.com/a.png" },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.clear).toHaveBeenCalled();
expect(answerDraftStream.update).not.toHaveBeenCalledWith("Photo");
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [
expect.objectContaining({ text: "Photo", mediaUrl: "https://example.com/a.png" }),
],
}),
);
});
it("shows Telegram progress drafts immediately for explicit tool starts", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
await replyOptions?.onReplyStart?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
return { queuedFinal: false };
});
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(draftStream.update).toHaveBeenCalledWith(expect.stringMatching(/^Shelling\n`🛠️ Exec`$/));
expect(draftStream.flush).toHaveBeenCalled();
});
it("renders Telegram progress drafts before slow status reactions resolve", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
let releaseSetTool: (() => void) | undefined;
const statusReactionController = createStatusReactionController();
statusReactionController.setTool.mockImplementation(
() =>
new Promise<void>((resolve) => {
releaseSetTool = resolve;
}),
);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
const pendingToolStart = replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await Promise.resolve();
await Promise.resolve();
const updateBeforeStatusReaction = draftStream.update.mock.calls.at(-1)?.[0];
releaseSetTool?.();
await pendingToolStart;
expect(updateBeforeStatusReaction).toMatch(/^Shelling\n`🛠️ Exec`$/);
return { queuedFinal: false };
});
await dispatchWithContext({
context: createContext({
statusReactionController: statusReactionController as never,
}),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(statusReactionController.setTool).toHaveBeenCalledWith("exec");
});
it("keeps non-command Telegram progress draft lines across post-tool assistant boundaries", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReplyStart?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onItemEvent?.({ kind: "search", progressText: "docs lookup" });
await replyOptions?.onItemEvent?.({ progressText: "tests passed" });
await replyOptions?.onAssistantMessageStart?.();
await dispatcherOptions.deliver({ text: "Final after tool" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(draftStream.update).toHaveBeenCalledWith(
expect.stringMatching(/^Shelling\n`🔎 Web Search: docs lookup`\n• `tests passed`$/),
);
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
expect(draftStream.materialize).not.toHaveBeenCalled();
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ text: "Final after tool" })],
}),
);
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("falls back to normal send for error payloads and clears the pending stream", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Boom", isError: true }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.clear).toHaveBeenCalled();
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({ replies: [expect.objectContaining({ text: "Boom" })] }),
);
});
it("streams button-bearing text into the same message", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
const buttons = [[{ text: "OK", callback_data: "ok" }]];
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver(
{ text: "Choose", channelData: { telegram: { buttons } } },
{ kind: "final" },
);
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(answerDraftStream.update).toHaveBeenCalledWith("Choose");
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
2001,
"Choose",
expect.objectContaining({ buttons }),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("streams reasoning and answer text on separate lanes", async () => {
const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({
answerMessageId: 2001,
reasoningMessageId: 3001,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "<think>Thinking</think>" });
await dispatcherOptions.deliver({ text: "Answer" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({ context: createReasoningStreamContext() });
expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_Thinking_");
expect(answerDraftStream.update).toHaveBeenCalledWith("Answer");
expect(deliverReplies).not.toHaveBeenCalled();
});
it("streams reasoning from configured defaults", async () => {
const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({
answerMessageId: 2001,
reasoningMessageId: 3001,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "<think>Thinking</think>" });
await dispatcherOptions.deliver({ text: "Answer" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createReasoningDefaultContext(),
cfg: {
agents: {
defaults: { reasoningDefault: "off" },
list: [{ id: "Ops", reasoningDefault: "stream" }],
},
},
});
expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_Thinking_");
expect(answerDraftStream.update).toHaveBeenCalledWith("Answer");
});
it("suppresses reasoning-only finals without raw text fallback", async () => {
setupDraftStreams({ answerMessageId: 2001, reasoningMessageId: 3001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "<think>hidden</think>" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({ context: createContext() });
expect(deliverReplies).not.toHaveBeenCalled();
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("does not add silent fallback when source delivery is message-tool-only", async () => {
setupDraftStreams({ answerMessageId: 2001, reasoningMessageId: 3001 });
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: false,
counts: { block: 0, final: 0, tool: 0 },
sourceReplyDeliveryMode: "message_tool_only",
});
await dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "agent:main:telegram:direct:123",
} as unknown as TelegramMessageContext["ctxPayload"],
}),
cfg: {
agents: {
defaults: {
silentReply: {
direct: "disallow",
group: "allow",
internal: "allow",
},
silentReplyRewrite: {
direct: true,
},
},
},
},
});
expect(deliverReplies).not.toHaveBeenCalled();
expect(editMessageTelegram).not.toHaveBeenCalled();
expect(sendMessageTelegram).not.toHaveBeenCalled();
});
it("shows compacting reaction during auto-compaction and resumes thinking", async () => {
const statusReactionController = {
setThinking: vi.fn(async () => {}),
setCompacting: vi.fn(async () => {}),
setTool: vi.fn(async () => {}),
setDone: vi.fn(async () => {}),
setError: vi.fn(async () => {}),
setQueued: vi.fn(async () => {}),
cancelPending: vi.fn(() => {}),
clear: vi.fn(async () => {}),
restoreInitial: vi.fn(async () => {}),
};
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
await replyOptions?.onCompactionStart?.();
await replyOptions?.onCompactionEnd?.();
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
statusReactionController: statusReactionController as never,
}),
streamMode: "off",
});
expect(statusReactionController.setCompacting).toHaveBeenCalledTimes(1);
expect(statusReactionController.cancelPending).toHaveBeenCalledTimes(1);
expect(statusReactionController.setThinking).toHaveBeenCalledTimes(2);
expect(statusReactionController.setCompacting.mock.invocationCallOrder[0]).toBeLessThan(
statusReactionController.cancelPending.mock.invocationCallOrder[0],
);
expect(statusReactionController.cancelPending.mock.invocationCallOrder[0]).toBeLessThan(
statusReactionController.setThinking.mock.invocationCallOrder[1],
);
});
it("does not supersede the same session for unauthorized abort-looking commands", async () => {
let releaseFirstFinal: (() => void) | undefined;
const firstFinalGate = new Promise<void>((resolve) => {
releaseFirstFinal = resolve;
});
let resolveStreamVisible: (() => void) | undefined;
const streamVisible = new Promise<void>((resolve) => {
resolveStreamVisible = resolve;
});
const firstAnswerDraft = createTestDraftStream({
messageId: 1001,
onUpdate: (text) => {
if (text === "Old reply partial") {
if (!resolveStreamVisible) {
throw new Error("Expected Telegram stream-visible resolver to be initialized");
}
resolveStreamVisible();
}
},
});
const firstReasoningDraft = createDraftStream();
const unauthorizedAnswerDraft = createDraftStream();
const unauthorizedReasoningDraft = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => firstAnswerDraft)
.mockImplementationOnce(() => firstReasoningDraft)
.mockImplementationOnce(() => unauthorizedAnswerDraft)
.mockImplementationOnce(() => unauthorizedReasoningDraft);
dispatchReplyWithBufferedBlockDispatcher
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Old reply partial" });
await firstFinalGate;
await dispatcherOptions.deliver({ text: "Old reply final" }, { kind: "final" });
return { queuedFinal: true };
})
.mockImplementationOnce(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Unauthorized stop" }, { kind: "final" });
return { queuedFinal: true };
});
const unauthorizedReplyDelivered = observeDeliveredReply("Unauthorized stop");
const firstPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "earlier request",
RawBody: "earlier request",
} as never,
}),
});
await streamVisible;
const unauthorizedPromise = dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
Body: "/stop",
RawBody: "/stop",
CommandBody: "/stop",
CommandAuthorized: false,
} as never,
}),
});
await unauthorizedReplyDelivered;
if (!releaseFirstFinal) {
throw new Error("Expected first Telegram final release callback to be initialized");
}
releaseFirstFinal();
await Promise.all([firstPromise, unauthorizedPromise]);
expect(firstAnswerDraft.update).toHaveBeenCalledWith("Old reply final");
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("uses configured doneHoldMs when clearing Telegram status reactions after reply", async () => {
vi.useFakeTimers();
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
try {
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
doneHoldMs: 250,
},
},
},
},
streamMode: "off",
});
expect(statusReactionController.setDone).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(249);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(1);
expect(reactionApi).toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("restores the initial Telegram status reaction after reply when removeAckAfterReply is disabled", async () => {
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: false,
statusReactionController: statusReactionController as never,
}),
streamMode: "off",
});
await vi.waitFor(() => {
expect(statusReactionController.setDone).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1);
});
expect(statusReactionController.setError).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
});
it("uses configured errorHoldMs to clear Telegram status reactions after an error fallback", async () => {
vi.useFakeTimers();
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: true });
try {
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
errorHoldMs: 320,
},
},
},
},
streamMode: "off",
});
expect(statusReactionController.setError).toHaveBeenCalledTimes(1);
expect(statusReactionController.setDone).not.toHaveBeenCalled();
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(319);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(1);
expect(reactionApi).toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("restores the initial Telegram status reaction after an error when no final reply is sent", async () => {
vi.useFakeTimers();
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: false });
try {
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: true,
statusReactionController: statusReactionController as never,
}),
cfg: {
messages: {
statusReactions: {
timing: {
errorHoldMs: 320,
},
},
},
},
streamMode: "off",
});
expect(statusReactionController.setError).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
await vi.advanceTimersByTimeAsync(319);
expect(statusReactionController.restoreInitial).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1);
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
} finally {
vi.useRealTimers();
}
});
it("restores the initial Telegram status reaction after an error fallback when removeAckAfterReply is disabled", async () => {
const reactionApi = vi.fn(async () => true);
const statusReactionController = createStatusReactionController();
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext({
reactionApi: reactionApi as never,
removeAckAfterReply: false,
statusReactionController: statusReactionController as never,
}),
streamMode: "off",
});
await vi.waitFor(() => {
expect(statusReactionController.setError).toHaveBeenCalledTimes(1);
expect(statusReactionController.restoreInitial).toHaveBeenCalledTimes(1);
});
expect(statusReactionController.setDone).not.toHaveBeenCalled();
expect(reactionApi).not.toHaveBeenCalledWith(123, 456, []);
});
it("uses resolved DM config for auto-topic-label overrides", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ queuedFinal: true });
loadSessionStore.mockReturnValue({ s1: {} });
const bot = createBot();
await dispatchWithContext({
bot,
context: createContext({
ctxPayload: {
SessionKey: "s1",
RawBody: "Need help with invoices",
} as TelegramMessageContext["ctxPayload"],
groupConfig: {
autoTopicLabel: false,
} as TelegramMessageContext["groupConfig"],
}),
telegramCfg: { autoTopicLabel: true },
cfg: {
channels: {
telegram: {
direct: {
"123": { autoTopicLabel: true },
},
},
},
},
});
expect(generateTopicLabel).not.toHaveBeenCalled();
expect(bot.api.editForumTopic).not.toHaveBeenCalled();
});
});