fix(telegram): bridge direct delivery to internal message:sent hooks (#40185)

* telegram: bridge direct delivery message hooks

* telegram: align sent hooks with command session
This commit is contained in:
Vincent Koc
2026-03-09 11:21:19 -07:00
committed by GitHub
parent 12702e11a5
commit 7b88249c9e
4 changed files with 203 additions and 38 deletions

View File

@@ -433,6 +433,9 @@ export const dispatchTelegramMessage = async ({
const deliveryBaseOptions = {
chatId: String(chatId),
accountId: route.accountId,
sessionKeyForInternalHooks: ctxPayload.SessionKey,
mirrorIsGroup: isGroup,
mirrorGroupId: isGroup ? String(chatId) : undefined,
token: opts.token,
runtime,
bot,

View File

@@ -516,6 +516,9 @@ export const registerTelegramNativeCommands = ({
const buildCommandDeliveryBaseOptions = (params: {
chatId: string | number;
accountId: string;
sessionKeyForInternalHooks?: string;
mirrorIsGroup?: boolean;
mirrorGroupId?: string;
mediaLocalRoots?: readonly string[];
threadSpec: ReturnType<typeof resolveTelegramThreadSpec>;
tableMode: ReturnType<typeof resolveMarkdownTableMode>;
@@ -523,6 +526,9 @@ export const registerTelegramNativeCommands = ({
}) => ({
chatId: String(params.chatId),
accountId: params.accountId,
sessionKeyForInternalHooks: params.sessionKeyForInternalHooks,
mirrorIsGroup: params.mirrorIsGroup,
mirrorGroupId: params.mirrorGroupId,
token: opts.token,
runtime,
bot,
@@ -589,14 +595,6 @@ export const registerTelegramNativeCommands = ({
return;
}
const { threadSpec, route, mediaLocalRoots, tableMode, chunkMode } = runtimeContext;
const deliveryBaseOptions = buildCommandDeliveryBaseOptions({
chatId,
accountId: route.accountId,
mediaLocalRoots,
threadSpec,
tableMode,
chunkMode,
});
const threadParams = buildTelegramThreadParams(threadSpec) ?? {};
const commandDefinition = findCommandByNativeName(command.name, "telegram");
@@ -671,6 +669,17 @@ export const registerTelegramNativeCommands = ({
userId: String(senderId || chatId),
targetSessionKey: sessionKey,
});
const deliveryBaseOptions = buildCommandDeliveryBaseOptions({
chatId,
accountId: route.accountId,
sessionKeyForInternalHooks: commandSessionKey,
mirrorIsGroup: isGroup,
mirrorGroupId: isGroup ? String(chatId) : undefined,
mediaLocalRoots,
threadSpec,
tableMode,
chunkMode,
});
const conversationLabel = isGroup
? msg.chat.title
? `${msg.chat.title} id:${chatId}`
@@ -827,6 +836,9 @@ export const registerTelegramNativeCommands = ({
const deliveryBaseOptions = buildCommandDeliveryBaseOptions({
chatId,
accountId: route.accountId,
sessionKeyForInternalHooks: route.sessionKey,
mirrorIsGroup: isGroup,
mirrorGroupId: isGroup ? String(chatId) : undefined,
mediaLocalRoots,
threadSpec,
tableMode,

View File

@@ -4,6 +4,14 @@ import type { ReplyPayload } from "../../auto-reply/types.js";
import type { ReplyToMode } from "../../config/config.js";
import type { MarkdownTableMode } from "../../config/types.base.js";
import { danger, logVerbose } from "../../globals.js";
import { fireAndForgetHook } from "../../hooks/fire-and-forget.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import {
buildCanonicalSentMessageHookContext,
toInternalMessageSentContext,
toPluginMessageContext,
toPluginMessageSentEvent,
} from "../../hooks/message-hook-mappers.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { buildOutboundMediaLoadOptions } from "../../media/load-options.js";
import { isGifMedia, kindFromMime } from "../../media/mime.js";
@@ -493,10 +501,68 @@ async function maybePinFirstDeliveredMessage(params: {
}
}
function emitMessageSentHooks(params: {
hookRunner: ReturnType<typeof getGlobalHookRunner>;
enabled: boolean;
sessionKeyForInternalHooks?: string;
chatId: string;
accountId?: string;
content: string;
success: boolean;
error?: string;
messageId?: number;
isGroup?: boolean;
groupId?: string;
}): void {
if (!params.enabled && !params.sessionKeyForInternalHooks) {
return;
}
const canonical = buildCanonicalSentMessageHookContext({
to: params.chatId,
content: params.content,
success: params.success,
error: params.error,
channelId: "telegram",
accountId: params.accountId,
conversationId: params.chatId,
messageId: typeof params.messageId === "number" ? String(params.messageId) : undefined,
isGroup: params.isGroup,
groupId: params.groupId,
});
if (params.enabled) {
fireAndForgetHook(
Promise.resolve(
params.hookRunner!.runMessageSent(
toPluginMessageSentEvent(canonical),
toPluginMessageContext(canonical),
),
),
"telegram: message_sent plugin hook failed",
);
}
if (!params.sessionKeyForInternalHooks) {
return;
}
fireAndForgetHook(
triggerInternalHook(
createInternalHookEvent(
"message",
"sent",
params.sessionKeyForInternalHooks,
toInternalMessageSentContext(canonical),
),
),
"telegram: message:sent internal hook failed",
);
}
export async function deliverReplies(params: {
replies: ReplyPayload[];
chatId: string;
accountId?: string;
sessionKeyForInternalHooks?: string;
mirrorIsGroup?: boolean;
mirrorGroupId?: string;
token: string;
runtime: RuntimeEnv;
bot: Bot;
@@ -622,37 +688,31 @@ export async function deliverReplies(params: {
firstDeliveredMessageId,
});
if (hasMessageSentHooks) {
const deliveredThisReply = progress.deliveredCount > deliveredCountBeforeReply;
void hookRunner?.runMessageSent(
{
to: params.chatId,
content: contentForSentHook,
success: deliveredThisReply,
},
{
channelId: "telegram",
accountId: params.accountId,
conversationId: params.chatId,
},
);
}
emitMessageSentHooks({
hookRunner,
enabled: hasMessageSentHooks,
sessionKeyForInternalHooks: params.sessionKeyForInternalHooks,
chatId: params.chatId,
accountId: params.accountId,
content: contentForSentHook,
success: progress.deliveredCount > deliveredCountBeforeReply,
messageId: firstDeliveredMessageId,
isGroup: params.mirrorIsGroup,
groupId: params.mirrorGroupId,
});
} catch (error) {
if (hasMessageSentHooks) {
void hookRunner?.runMessageSent(
{
to: params.chatId,
content: contentForSentHook,
success: false,
error: error instanceof Error ? error.message : String(error),
},
{
channelId: "telegram",
accountId: params.accountId,
conversationId: params.chatId,
},
);
}
emitMessageSentHooks({
hookRunner,
enabled: hasMessageSentHooks,
sessionKeyForInternalHooks: params.sessionKeyForInternalHooks,
chatId: params.chatId,
accountId: params.accountId,
content: contentForSentHook,
success: false,
error: error instanceof Error ? error.message : String(error),
isGroup: params.mirrorIsGroup,
groupId: params.mirrorGroupId,
});
throw error;
}
}

View File

@@ -4,6 +4,7 @@ import type { RuntimeEnv } from "../../runtime.js";
import { deliverReplies } from "./delivery.js";
const loadWebMedia = vi.fn();
const triggerInternalHook = vi.hoisted(() => vi.fn(async () => {}));
const messageHookRunner = vi.hoisted(() => ({
hasHooks: vi.fn<(name: string) => boolean>(() => false),
runMessageSending: vi.fn(),
@@ -31,6 +32,16 @@ vi.mock("../../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () => messageHookRunner,
}));
vi.mock("../../hooks/internal-hooks.js", async () => {
const actual = await vi.importActual<typeof import("../../hooks/internal-hooks.js")>(
"../../hooks/internal-hooks.js",
);
return {
...actual,
triggerInternalHook,
};
});
vi.mock("grammy", () => ({
InputFile: class {
constructor(
@@ -108,6 +119,7 @@ function createVoiceFailureHarness(params: {
describe("deliverReplies", () => {
beforeEach(() => {
loadWebMedia.mockClear();
triggerInternalHook.mockReset();
messageHookRunner.hasHooks.mockReset();
messageHookRunner.hasHooks.mockReturnValue(false);
messageHookRunner.runMessageSending.mockReset();
@@ -199,6 +211,84 @@ describe("deliverReplies", () => {
);
});
it("emits internal message:sent when session hook context is available", async () => {
const runtime = createRuntime(false);
const sendMessage = vi.fn().mockResolvedValue({ message_id: 9, chat: { id: "123" } });
const bot = createBot({ sendMessage });
await deliverWith({
sessionKeyForInternalHooks: "agent:test:telegram:123",
mirrorIsGroup: true,
mirrorGroupId: "123",
replies: [{ text: "hello" }],
runtime,
bot,
});
expect(triggerInternalHook).toHaveBeenCalledWith(
expect.objectContaining({
type: "message",
action: "sent",
sessionKey: "agent:test:telegram:123",
context: expect.objectContaining({
to: "123",
content: "hello",
success: true,
channelId: "telegram",
conversationId: "123",
messageId: "9",
isGroup: true,
groupId: "123",
}),
}),
);
});
it("does not emit internal message:sent without a session key", async () => {
const runtime = createRuntime(false);
const sendMessage = vi.fn().mockResolvedValue({ message_id: 11, chat: { id: "123" } });
const bot = createBot({ sendMessage });
await deliverWith({
replies: [{ text: "hello" }],
runtime,
bot,
});
expect(triggerInternalHook).not.toHaveBeenCalled();
});
it("emits internal message:sent with success=false on delivery failure", async () => {
const runtime = createRuntime(false);
const sendMessage = vi.fn().mockRejectedValue(new Error("network error"));
const bot = createBot({ sendMessage });
await expect(
deliverWith({
sessionKeyForInternalHooks: "agent:test:telegram:123",
replies: [{ text: "hello" }],
runtime,
bot,
}),
).rejects.toThrow("network error");
expect(triggerInternalHook).toHaveBeenCalledWith(
expect.objectContaining({
type: "message",
action: "sent",
sessionKey: "agent:test:telegram:123",
context: expect.objectContaining({
to: "123",
content: "hello",
success: false,
error: "network error",
channelId: "telegram",
conversationId: "123",
}),
}),
);
});
it("passes media metadata to message_sending hooks", async () => {
messageHookRunner.hasHooks.mockImplementation((name: string) => name === "message_sending");