feat(telegram): route ambient chatter as room events

This commit is contained in:
Ayaan Zaidi
2026-05-13 12:07:10 +05:30
committed by Peter Steinberger
parent 503c3d139c
commit 56cc150771
7 changed files with 152 additions and 9 deletions

View File

@@ -69,6 +69,7 @@ export type TelegramInboundBodyResult = {
effectiveWasMentioned: boolean;
canDetectMention: boolean;
shouldBypassMention: boolean;
hasControlCommand: boolean;
audioTranscribedMediaIndex?: number;
stickerCacheHit: boolean;
locationData?: NormalizedLocation;
@@ -341,7 +342,7 @@ export async function resolveTelegramInboundBody(params: {
canDetectMention,
wasMentioned,
hasAnyMention,
implicitMentionKinds: isGroup && Boolean(requireMention) ? implicitMentionKinds : [],
implicitMentionKinds: isGroup ? implicitMentionKinds : [],
},
policy: {
isGroup,
@@ -419,6 +420,7 @@ export async function resolveTelegramInboundBody(params: {
effectiveWasMentioned,
canDetectMention,
shouldBypassMention: mentionDecision.shouldBypassMention,
hasControlCommand: hasControlCommandInMessage,
...(audioTranscribedMediaIndex !== undefined && audioTranscribedMediaIndex >= 0
? { audioTranscribedMediaIndex }
: {}),

View File

@@ -61,6 +61,60 @@ describe("buildTelegramMessageContext requireMention precedence", () => {
}
});
it("marks always-on ambient group messages as room events", async () => {
const ctx = await buildTelegramMessageContextForTest({
message: buildForumMessage(),
resolveGroupActivation: () => false,
resolveGroupRequireMention: () => false,
resolveTelegramGroupConfig: () => ({
groupConfig: { requireMention: false },
topicConfig: undefined,
}),
});
expect(ctx?.ctxPayload.InboundTurnKind).toBe("room_event");
});
it("keeps room events as context for the next direct group request", async () => {
const groupHistories = new Map();
await buildTelegramMessageContextForTest({
message: { ...buildForumMessage(99), text: "side chatter" },
historyLimit: 10,
groupHistories,
resolveGroupActivation: () => false,
resolveGroupRequireMention: () => false,
resolveTelegramGroupConfig: () => ({
groupConfig: { requireMention: false },
topicConfig: undefined,
}),
});
const ctx = await buildTelegramMessageContextForTest({
message: {
...buildForumMessage(99),
message_id: 2,
text: "replying directly",
reply_to_message: {
message_id: 10,
chat: { id: -1001234567890, type: "supergroup", title: "Forum", is_forum: true },
from: { id: 7, first_name: "Bot", username: "bot", is_bot: true },
text: "previous bot message",
},
},
historyLimit: 10,
groupHistories,
resolveGroupActivation: () => false,
resolveGroupRequireMention: () => false,
resolveTelegramGroupConfig: () => ({
groupConfig: { requireMention: false },
topicConfig: undefined,
}),
});
expect(ctx?.ctxPayload.InboundTurnKind).toBe("user_request");
expect(ctx?.ctxPayload.Body).toContain("side chatter");
});
it("lets explicit topic requireMention=false override mention activation", async () => {
const resolveGroupActivation = vi.fn(() => true);

View File

@@ -175,6 +175,7 @@ export async function buildTelegramInboundContextPayload(params: {
topicConfig?: TelegramTopicConfig;
stickerCacheHit: boolean;
effectiveWasMentioned: boolean;
hasControlCommand: boolean;
audioTranscribedMediaIndex?: number;
commandAuthorized: boolean;
locationData?: NormalizedLocation;
@@ -223,6 +224,7 @@ export async function buildTelegramInboundContextPayload(params: {
topicConfig,
stickerCacheHit,
effectiveWasMentioned,
hasControlCommand,
audioTranscribedMediaIndex,
commandAuthorized,
locationData,
@@ -368,9 +370,9 @@ export async function buildTelegramInboundContextPayload(params: {
previousTimestamp,
envelope: envelopeOptions,
});
const channelHistory = createChannelHistoryWindow({ historyMap: groupHistories });
let combinedBody = body;
if (isGroup && historyKey && historyLimit > 0) {
const channelHistory = createChannelHistoryWindow({ historyMap: groupHistories });
combinedBody = channelHistory.buildPendingContext({
historyKey,
limit: historyLimit,
@@ -397,7 +399,7 @@ export async function buildTelegramInboundContextPayload(params: {
});
const inboundHistory =
isGroup && historyKey && historyLimit > 0
? createChannelHistoryWindow({ historyMap: groupHistories }).buildInboundHistory({
? channelHistory.buildInboundHistory({
historyKey,
limit: historyLimit,
})
@@ -411,6 +413,10 @@ export async function buildTelegramInboundContextPayload(params: {
const telegramTo = `telegram:${chatId}`;
const locationContext = locationData ? toLocationContext(locationData) : undefined;
const commandSource = options?.commandSource;
const inboundTurnKind =
isGroup && !effectiveWasMentioned && !hasControlCommand && commandSource !== "native"
? "room_event"
: "user_request";
const ctxPayload = sessionRuntime.buildChannelTurnContext({
channel: "telegram",
accountId: route.accountId,
@@ -447,6 +453,7 @@ export async function buildTelegramInboundContextPayload(params: {
messageThreadId: threadSpec.id,
},
message: {
inboundTurnKind,
body: combinedBody,
rawBody,
bodyForAgent: bodyText,
@@ -538,6 +545,18 @@ export async function buildTelegramInboundContextPayload(params: {
TopicName: isForum && topicName ? topicName : undefined,
},
} satisfies BuildChannelTurnContextParams);
if (inboundTurnKind === "room_event" && historyKey) {
channelHistory.record({
historyKey,
limit: historyLimit,
entry: {
sender: buildSenderLabel(msg, senderId || chatId),
body: rawBody,
timestamp: msg.date ? msg.date * 1000 : undefined,
messageId: typeof msg.message_id === "number" ? String(msg.message_id) : undefined,
},
});
}
const pinnedMainDmOwner = !isGroup
? sessionRuntime.resolvePinnedMainDmOwnerFromAllowlist({

View File

@@ -15,6 +15,8 @@ type BuildTelegramMessageContextForTestParams = {
options?: BuildTelegramMessageContextParams["options"];
cfg?: Record<string, unknown>;
accountId?: string;
historyLimit?: number;
groupHistories?: Map<string, import("openclaw/plugin-sdk/reply-history").HistoryEntry[]>;
ackReactionScope?: BuildTelegramMessageContextParams["ackReactionScope"];
botApi?: Record<string, unknown>;
runtime?: BuildTelegramMessageContextParams["runtime"];
@@ -77,8 +79,8 @@ export async function buildTelegramMessageContextForTest(
},
sessionRuntime,
account: { accountId: params.accountId ?? "default" } as never,
historyLimit: 0,
groupHistories: new Map(),
historyLimit: params.historyLimit ?? 0,
groupHistories: params.groupHistories ?? new Map(),
dmPolicy: "open",
allowFrom: ["*"],
groupAllowFrom: [],

View File

@@ -603,6 +603,7 @@ export const buildTelegramMessageContext = async ({
topicConfig,
stickerCacheHit: bodyResult.stickerCacheHit,
effectiveWasMentioned: bodyResult.effectiveWasMentioned,
hasControlCommand: bodyResult.hasControlCommand,
...(bodyResult.audioTranscribedMediaIndex !== undefined
? { audioTranscribedMediaIndex: bodyResult.audioTranscribedMediaIndex }
: {}),

View File

@@ -1498,6 +1498,59 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(sendMessageTelegram).not.toHaveBeenCalled();
});
it("runs ambient room events as tool-only invisible turns", async () => {
const historyKey = "telegram:group:-100123";
const groupHistories = new Map([
[historyKey, [{ sender: "Alice", body: "side chatter", timestamp: 1 }]],
]);
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
queuedFinal: false,
counts: { block: 0, final: 0, tool: 0 },
sourceReplyDeliveryMode: "message_tool_only",
});
await dispatchWithContext({
context: createContext({
ctxPayload: {
InboundTurnKind: "room_event",
SessionKey: "agent:main:telegram:group:-100123",
ChatType: "group",
MessageSid: "99",
RawBody: "ambient",
BodyForAgent: "ambient",
CommandBody: "ambient",
} as unknown as TelegramMessageContext["ctxPayload"],
msg: {
chat: { id: -100123, type: "supergroup" },
message_id: 99,
} as unknown as TelegramMessageContext["msg"],
chatId: -100123,
isGroup: true,
historyKey,
historyLimit: 10,
groupHistories,
threadSpec: { id: undefined, scope: "none" },
}),
streamMode: "partial",
});
const dispatchParams = mockCallArg(dispatchReplyWithBufferedBlockDispatcher) as {
replyOptions?: {
sourceReplyDeliveryMode?: string;
suppressTyping?: boolean;
allowProgressCallbacksWhenSourceDeliverySuppressed?: boolean;
};
};
expect(dispatchParams.replyOptions?.sourceReplyDeliveryMode).toBe("message_tool_only");
expect(dispatchParams.replyOptions?.suppressTyping).toBe(true);
expect(dispatchParams.replyOptions?.allowProgressCallbacksWhenSourceDeliverySuppressed).toBe(
false,
);
expect(createTelegramDraftStream).not.toHaveBeenCalled();
expect(deliverReplies).not.toHaveBeenCalled();
expect(groupHistories.get(historyKey)).toHaveLength(1);
});
it("shows compacting reaction during auto-compaction and resumes thinking", async () => {
const statusReactionController = {
setThinking: vi.fn(async () => {}),

View File

@@ -480,6 +480,7 @@ export const dispatchTelegramMessage = async ({
const accountBlockStreamingEnabled =
resolveChannelStreamingBlockEnabled(telegramCfg) ??
cfg.agents?.defaults?.blockStreamingDefault === "on";
const isRoomEvent = ctxPayload.InboundTurnKind === "room_event";
const resolvedReasoningLevel = resolveTelegramReasoningLevel({
cfg,
sessionKey: ctxPayload.SessionKey,
@@ -488,7 +489,7 @@ export const dispatchTelegramMessage = async ({
});
const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on";
const streamReasoningDraft = resolvedReasoningLevel === "stream";
const streamDeliveryEnabled = streamMode !== "off";
const streamDeliveryEnabled = !isRoomEvent && streamMode !== "off";
const rawReplyQuoteText =
ctxPayload.ReplyToIsQuote && typeof ctxPayload.ReplyToQuoteText === "string"
? ctxPayload.ReplyToQuoteText
@@ -1162,7 +1163,7 @@ export const dispatchTelegramMessage = async ({
}
}
if (statusReactionController) {
if (statusReactionController && !isRoomEvent) {
void statusReactionController.setThinking();
}
@@ -1434,6 +1435,8 @@ export const dispatchTelegramMessage = async ({
replyOptions: {
skillFilter,
disableBlockStreaming,
sourceReplyDeliveryMode: isRoomEvent ? "message_tool_only" : undefined,
suppressTyping: isRoomEvent,
onPartialReply:
answerLane.stream || reasoningLane.stream
? (payload) =>
@@ -1473,7 +1476,8 @@ export const dispatchTelegramMessage = async ({
: undefined,
suppressDefaultToolProgressMessages:
!streamDeliveryEnabled || Boolean(answerLane.stream),
allowProgressCallbacksWhenSourceDeliverySuppressed: Boolean(answerLane.stream),
allowProgressCallbacksWhenSourceDeliverySuppressed:
!isRoomEvent && Boolean(answerLane.stream),
onToolStart: async (payload) => {
const toolName = payload.name?.trim();
const progressPromise = pushStreamToolProgress(
@@ -1722,7 +1726,13 @@ export const dispatchTelegramMessage = async ({
);
}
const shouldClearGroupHistory =
!isRoomEvent || deliverySummary.delivered || sentFallback || queuedFinal;
if (!hasFinalResponse) {
if (!shouldClearGroupHistory) {
return;
}
clearGroupHistory();
return;
}
@@ -1795,5 +1805,7 @@ export const dispatchTelegramMessage = async ({
},
});
}
clearGroupHistory();
if (shouldClearGroupHistory) {
clearGroupHistory();
}
};