From 56cc1507715899b98040fb7deefcef5cd2b22621 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Wed, 13 May 2026 12:07:10 +0530 Subject: [PATCH] feat(telegram): route ambient chatter as room events --- .../telegram/src/bot-message-context.body.ts | 4 +- ...ot-message-context.require-mention.test.ts | 54 +++++++++++++++++++ .../src/bot-message-context.session.ts | 23 +++++++- .../src/bot-message-context.test-harness.ts | 6 ++- .../telegram/src/bot-message-context.ts | 1 + .../telegram/src/bot-message-dispatch.test.ts | 53 ++++++++++++++++++ .../telegram/src/bot-message-dispatch.ts | 20 +++++-- 7 files changed, 152 insertions(+), 9 deletions(-) diff --git a/extensions/telegram/src/bot-message-context.body.ts b/extensions/telegram/src/bot-message-context.body.ts index eeee8acf5c3..f630ee4d40f 100644 --- a/extensions/telegram/src/bot-message-context.body.ts +++ b/extensions/telegram/src/bot-message-context.body.ts @@ -69,6 +69,7 @@ export type TelegramInboundBodyResult = { effectiveWasMentioned: boolean; canDetectMention: boolean; shouldBypassMention: boolean; + hasControlCommand: boolean; audioTranscribedMediaIndex?: number; stickerCacheHit: boolean; locationData?: NormalizedLocation; @@ -341,7 +342,7 @@ export async function resolveTelegramInboundBody(params: { canDetectMention, wasMentioned, hasAnyMention, - implicitMentionKinds: isGroup && Boolean(requireMention) ? implicitMentionKinds : [], + implicitMentionKinds: isGroup ? implicitMentionKinds : [], }, policy: { isGroup, @@ -419,6 +420,7 @@ export async function resolveTelegramInboundBody(params: { effectiveWasMentioned, canDetectMention, shouldBypassMention: mentionDecision.shouldBypassMention, + hasControlCommand: hasControlCommandInMessage, ...(audioTranscribedMediaIndex !== undefined && audioTranscribedMediaIndex >= 0 ? { audioTranscribedMediaIndex } : {}), diff --git a/extensions/telegram/src/bot-message-context.require-mention.test.ts b/extensions/telegram/src/bot-message-context.require-mention.test.ts index 751480dc4f2..97398d69e39 100644 --- a/extensions/telegram/src/bot-message-context.require-mention.test.ts +++ b/extensions/telegram/src/bot-message-context.require-mention.test.ts @@ -61,6 +61,60 @@ describe("buildTelegramMessageContext requireMention precedence", () => { } }); + it("marks always-on ambient group messages as room events", async () => { + const ctx = await buildTelegramMessageContextForTest({ + message: buildForumMessage(), + resolveGroupActivation: () => false, + resolveGroupRequireMention: () => false, + resolveTelegramGroupConfig: () => ({ + groupConfig: { requireMention: false }, + topicConfig: undefined, + }), + }); + + expect(ctx?.ctxPayload.InboundTurnKind).toBe("room_event"); + }); + + it("keeps room events as context for the next direct group request", async () => { + const groupHistories = new Map(); + await buildTelegramMessageContextForTest({ + message: { ...buildForumMessage(99), text: "side chatter" }, + historyLimit: 10, + groupHistories, + resolveGroupActivation: () => false, + resolveGroupRequireMention: () => false, + resolveTelegramGroupConfig: () => ({ + groupConfig: { requireMention: false }, + topicConfig: undefined, + }), + }); + + const ctx = await buildTelegramMessageContextForTest({ + message: { + ...buildForumMessage(99), + message_id: 2, + text: "replying directly", + reply_to_message: { + message_id: 10, + chat: { id: -1001234567890, type: "supergroup", title: "Forum", is_forum: true }, + from: { id: 7, first_name: "Bot", username: "bot", is_bot: true }, + text: "previous bot message", + }, + }, + historyLimit: 10, + groupHistories, + resolveGroupActivation: () => false, + resolveGroupRequireMention: () => false, + resolveTelegramGroupConfig: () => ({ + groupConfig: { requireMention: false }, + topicConfig: undefined, + }), + }); + + expect(ctx?.ctxPayload.InboundTurnKind).toBe("user_request"); + expect(ctx?.ctxPayload.Body).toContain("side chatter"); + }); + it("lets explicit topic requireMention=false override mention activation", async () => { const resolveGroupActivation = vi.fn(() => true); diff --git a/extensions/telegram/src/bot-message-context.session.ts b/extensions/telegram/src/bot-message-context.session.ts index 3d77657eb67..93ea39de522 100644 --- a/extensions/telegram/src/bot-message-context.session.ts +++ b/extensions/telegram/src/bot-message-context.session.ts @@ -175,6 +175,7 @@ export async function buildTelegramInboundContextPayload(params: { topicConfig?: TelegramTopicConfig; stickerCacheHit: boolean; effectiveWasMentioned: boolean; + hasControlCommand: boolean; audioTranscribedMediaIndex?: number; commandAuthorized: boolean; locationData?: NormalizedLocation; @@ -223,6 +224,7 @@ export async function buildTelegramInboundContextPayload(params: { topicConfig, stickerCacheHit, effectiveWasMentioned, + hasControlCommand, audioTranscribedMediaIndex, commandAuthorized, locationData, @@ -368,9 +370,9 @@ export async function buildTelegramInboundContextPayload(params: { previousTimestamp, envelope: envelopeOptions, }); + const channelHistory = createChannelHistoryWindow({ historyMap: groupHistories }); let combinedBody = body; if (isGroup && historyKey && historyLimit > 0) { - const channelHistory = createChannelHistoryWindow({ historyMap: groupHistories }); combinedBody = channelHistory.buildPendingContext({ historyKey, limit: historyLimit, @@ -397,7 +399,7 @@ export async function buildTelegramInboundContextPayload(params: { }); const inboundHistory = isGroup && historyKey && historyLimit > 0 - ? createChannelHistoryWindow({ historyMap: groupHistories }).buildInboundHistory({ + ? channelHistory.buildInboundHistory({ historyKey, limit: historyLimit, }) @@ -411,6 +413,10 @@ export async function buildTelegramInboundContextPayload(params: { const telegramTo = `telegram:${chatId}`; const locationContext = locationData ? toLocationContext(locationData) : undefined; const commandSource = options?.commandSource; + const inboundTurnKind = + isGroup && !effectiveWasMentioned && !hasControlCommand && commandSource !== "native" + ? "room_event" + : "user_request"; const ctxPayload = sessionRuntime.buildChannelTurnContext({ channel: "telegram", accountId: route.accountId, @@ -447,6 +453,7 @@ export async function buildTelegramInboundContextPayload(params: { messageThreadId: threadSpec.id, }, message: { + inboundTurnKind, body: combinedBody, rawBody, bodyForAgent: bodyText, @@ -538,6 +545,18 @@ export async function buildTelegramInboundContextPayload(params: { TopicName: isForum && topicName ? topicName : undefined, }, } satisfies BuildChannelTurnContextParams); + if (inboundTurnKind === "room_event" && historyKey) { + channelHistory.record({ + historyKey, + limit: historyLimit, + entry: { + sender: buildSenderLabel(msg, senderId || chatId), + body: rawBody, + timestamp: msg.date ? msg.date * 1000 : undefined, + messageId: typeof msg.message_id === "number" ? String(msg.message_id) : undefined, + }, + }); + } const pinnedMainDmOwner = !isGroup ? sessionRuntime.resolvePinnedMainDmOwnerFromAllowlist({ diff --git a/extensions/telegram/src/bot-message-context.test-harness.ts b/extensions/telegram/src/bot-message-context.test-harness.ts index 6a969bcc7b9..83d01186956 100644 --- a/extensions/telegram/src/bot-message-context.test-harness.ts +++ b/extensions/telegram/src/bot-message-context.test-harness.ts @@ -15,6 +15,8 @@ type BuildTelegramMessageContextForTestParams = { options?: BuildTelegramMessageContextParams["options"]; cfg?: Record; accountId?: string; + historyLimit?: number; + groupHistories?: Map; ackReactionScope?: BuildTelegramMessageContextParams["ackReactionScope"]; botApi?: Record; runtime?: BuildTelegramMessageContextParams["runtime"]; @@ -77,8 +79,8 @@ export async function buildTelegramMessageContextForTest( }, sessionRuntime, account: { accountId: params.accountId ?? "default" } as never, - historyLimit: 0, - groupHistories: new Map(), + historyLimit: params.historyLimit ?? 0, + groupHistories: params.groupHistories ?? new Map(), dmPolicy: "open", allowFrom: ["*"], groupAllowFrom: [], diff --git a/extensions/telegram/src/bot-message-context.ts b/extensions/telegram/src/bot-message-context.ts index bc1975ca0e6..017bbb59507 100644 --- a/extensions/telegram/src/bot-message-context.ts +++ b/extensions/telegram/src/bot-message-context.ts @@ -603,6 +603,7 @@ export const buildTelegramMessageContext = async ({ topicConfig, stickerCacheHit: bodyResult.stickerCacheHit, effectiveWasMentioned: bodyResult.effectiveWasMentioned, + hasControlCommand: bodyResult.hasControlCommand, ...(bodyResult.audioTranscribedMediaIndex !== undefined ? { audioTranscribedMediaIndex: bodyResult.audioTranscribedMediaIndex } : {}), diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 114730c4964..06334303a55 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -1498,6 +1498,59 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(sendMessageTelegram).not.toHaveBeenCalled(); }); + it("runs ambient room events as tool-only invisible turns", async () => { + const historyKey = "telegram:group:-100123"; + const groupHistories = new Map([ + [historyKey, [{ sender: "Alice", body: "side chatter", timestamp: 1 }]], + ]); + dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ + queuedFinal: false, + counts: { block: 0, final: 0, tool: 0 }, + sourceReplyDeliveryMode: "message_tool_only", + }); + + await dispatchWithContext({ + context: createContext({ + ctxPayload: { + InboundTurnKind: "room_event", + SessionKey: "agent:main:telegram:group:-100123", + ChatType: "group", + MessageSid: "99", + RawBody: "ambient", + BodyForAgent: "ambient", + CommandBody: "ambient", + } as unknown as TelegramMessageContext["ctxPayload"], + msg: { + chat: { id: -100123, type: "supergroup" }, + message_id: 99, + } as unknown as TelegramMessageContext["msg"], + chatId: -100123, + isGroup: true, + historyKey, + historyLimit: 10, + groupHistories, + threadSpec: { id: undefined, scope: "none" }, + }), + streamMode: "partial", + }); + + const dispatchParams = mockCallArg(dispatchReplyWithBufferedBlockDispatcher) as { + replyOptions?: { + sourceReplyDeliveryMode?: string; + suppressTyping?: boolean; + allowProgressCallbacksWhenSourceDeliverySuppressed?: boolean; + }; + }; + expect(dispatchParams.replyOptions?.sourceReplyDeliveryMode).toBe("message_tool_only"); + expect(dispatchParams.replyOptions?.suppressTyping).toBe(true); + expect(dispatchParams.replyOptions?.allowProgressCallbacksWhenSourceDeliverySuppressed).toBe( + false, + ); + expect(createTelegramDraftStream).not.toHaveBeenCalled(); + expect(deliverReplies).not.toHaveBeenCalled(); + expect(groupHistories.get(historyKey)).toHaveLength(1); + }); + it("shows compacting reaction during auto-compaction and resumes thinking", async () => { const statusReactionController = { setThinking: vi.fn(async () => {}), diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 5fbc5fe4d76..6ea4043d384 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -480,6 +480,7 @@ export const dispatchTelegramMessage = async ({ const accountBlockStreamingEnabled = resolveChannelStreamingBlockEnabled(telegramCfg) ?? cfg.agents?.defaults?.blockStreamingDefault === "on"; + const isRoomEvent = ctxPayload.InboundTurnKind === "room_event"; const resolvedReasoningLevel = resolveTelegramReasoningLevel({ cfg, sessionKey: ctxPayload.SessionKey, @@ -488,7 +489,7 @@ export const dispatchTelegramMessage = async ({ }); const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on"; const streamReasoningDraft = resolvedReasoningLevel === "stream"; - const streamDeliveryEnabled = streamMode !== "off"; + const streamDeliveryEnabled = !isRoomEvent && streamMode !== "off"; const rawReplyQuoteText = ctxPayload.ReplyToIsQuote && typeof ctxPayload.ReplyToQuoteText === "string" ? ctxPayload.ReplyToQuoteText @@ -1162,7 +1163,7 @@ export const dispatchTelegramMessage = async ({ } } - if (statusReactionController) { + if (statusReactionController && !isRoomEvent) { void statusReactionController.setThinking(); } @@ -1434,6 +1435,8 @@ export const dispatchTelegramMessage = async ({ replyOptions: { skillFilter, disableBlockStreaming, + sourceReplyDeliveryMode: isRoomEvent ? "message_tool_only" : undefined, + suppressTyping: isRoomEvent, onPartialReply: answerLane.stream || reasoningLane.stream ? (payload) => @@ -1473,7 +1476,8 @@ export const dispatchTelegramMessage = async ({ : undefined, suppressDefaultToolProgressMessages: !streamDeliveryEnabled || Boolean(answerLane.stream), - allowProgressCallbacksWhenSourceDeliverySuppressed: Boolean(answerLane.stream), + allowProgressCallbacksWhenSourceDeliverySuppressed: + !isRoomEvent && Boolean(answerLane.stream), onToolStart: async (payload) => { const toolName = payload.name?.trim(); const progressPromise = pushStreamToolProgress( @@ -1722,7 +1726,13 @@ export const dispatchTelegramMessage = async ({ ); } + const shouldClearGroupHistory = + !isRoomEvent || deliverySummary.delivered || sentFallback || queuedFinal; + if (!hasFinalResponse) { + if (!shouldClearGroupHistory) { + return; + } clearGroupHistory(); return; } @@ -1795,5 +1805,7 @@ export const dispatchTelegramMessage = async ({ }, }); } - clearGroupHistory(); + if (shouldClearGroupHistory) { + clearGroupHistory(); + } };