mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 13:50:42 +00:00
fix(telegram): hydrate inbound reply chains
This commit is contained in:
@@ -60,6 +60,7 @@ import {
|
||||
resolveInboundMediaFileId,
|
||||
} from "./bot-handlers.media.js";
|
||||
import type { TelegramMediaRef } from "./bot-message-context.js";
|
||||
import type { TelegramMessageContextOptions } from "./bot-message-context.types.js";
|
||||
import {
|
||||
parseTelegramNativeCommandCallbackData,
|
||||
RegisterTelegramHandlerParams,
|
||||
@@ -102,6 +103,13 @@ import {
|
||||
import { migrateTelegramGroupConfig } from "./group-migration.js";
|
||||
import { resolveTelegramInlineButtonsScope } from "./inline-buttons.js";
|
||||
import { dispatchTelegramPluginInteractiveHandler } from "./interactive-dispatch.js";
|
||||
import {
|
||||
buildTelegramReplyChain,
|
||||
createTelegramMessageCache,
|
||||
resolveTelegramMessageCachePath,
|
||||
type TelegramCachedMessageNode,
|
||||
type TelegramReplyChainEntry,
|
||||
} from "./message-cache.js";
|
||||
import {
|
||||
buildModelsKeyboard,
|
||||
buildProviderKeyboard,
|
||||
@@ -158,9 +166,15 @@ export const registerTelegramHandlers = ({
|
||||
|
||||
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
|
||||
let mediaGroupProcessing: Promise<void> = Promise.resolve();
|
||||
const messageCache = createTelegramMessageCache({
|
||||
persistedPath: resolveTelegramMessageCachePath(
|
||||
telegramDeps.resolveStorePath(cfg.session?.store),
|
||||
),
|
||||
});
|
||||
|
||||
type TextFragmentEntry = {
|
||||
key: string;
|
||||
threadId?: number;
|
||||
messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
@@ -179,6 +193,7 @@ export const registerTelegramHandlers = ({
|
||||
debounceKey: string | null;
|
||||
debounceLane: TelegramDebounceLane;
|
||||
botUsername?: string;
|
||||
threadId?: number;
|
||||
};
|
||||
const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => {
|
||||
const forwardMeta = msg as {
|
||||
@@ -248,17 +263,10 @@ export const registerTelegramHandlers = ({
|
||||
return;
|
||||
}
|
||||
if (entries.length === 1) {
|
||||
const replyMedia = await resolveReplyMediaForMessage(last.ctx, last.msg);
|
||||
await processMessage(
|
||||
last.ctx,
|
||||
last.allMedia,
|
||||
last.storeAllowFrom,
|
||||
{
|
||||
receivedAtMs: last.receivedAtMs,
|
||||
ingressBuffer: "inbound-debounce",
|
||||
},
|
||||
replyMedia,
|
||||
);
|
||||
await processMessageWithReplyChain(last.ctx, last.msg, last.allMedia, last.storeAllowFrom, {
|
||||
receivedAtMs: last.receivedAtMs,
|
||||
ingressBuffer: "inbound-debounce",
|
||||
});
|
||||
return;
|
||||
}
|
||||
const combinedText = entries
|
||||
@@ -278,9 +286,9 @@ export const registerTelegramHandlers = ({
|
||||
});
|
||||
const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined;
|
||||
const syntheticCtx = buildSyntheticContext(baseCtx, syntheticMessage);
|
||||
const replyMedia = await resolveReplyMediaForMessage(baseCtx, syntheticMessage);
|
||||
await processMessage(
|
||||
await processMessageWithReplyChain(
|
||||
syntheticCtx,
|
||||
syntheticMessage,
|
||||
combinedMedia,
|
||||
first.storeAllowFrom,
|
||||
{
|
||||
@@ -288,7 +296,6 @@ export const registerTelegramHandlers = ({
|
||||
receivedAtMs: first.receivedAtMs,
|
||||
ingressBuffer: "inbound-debounce",
|
||||
},
|
||||
replyMedia,
|
||||
);
|
||||
},
|
||||
onError: (err, items) => {
|
||||
@@ -442,8 +449,12 @@ export const registerTelegramHandlers = ({
|
||||
}
|
||||
|
||||
const storeAllowFrom = await loadStoreAllowFrom();
|
||||
const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg);
|
||||
await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom, undefined, replyMedia);
|
||||
await processMessageWithReplyChain(
|
||||
primaryEntry.ctx,
|
||||
primaryEntry.msg,
|
||||
allMedia,
|
||||
storeAllowFrom,
|
||||
);
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`media group handler failed: ${String(err)}`));
|
||||
}
|
||||
@@ -473,7 +484,8 @@ export const registerTelegramHandlers = ({
|
||||
const storeAllowFrom = await loadStoreAllowFrom();
|
||||
const baseCtx = first.ctx;
|
||||
|
||||
await processMessage(buildSyntheticContext(baseCtx, syntheticMessage), [], storeAllowFrom, {
|
||||
const syntheticCtx = buildSyntheticContext(baseCtx, syntheticMessage);
|
||||
await processMessageWithReplyChain(syntheticCtx, syntheticMessage, [], storeAllowFrom, {
|
||||
messageIdOverride: String(last.msg.message_id),
|
||||
receivedAtMs: first.receivedAtMs,
|
||||
ingressBuffer: "text-fragment",
|
||||
@@ -507,42 +519,88 @@ export const registerTelegramHandlers = ({
|
||||
const loadStoreAllowFrom = async () =>
|
||||
telegramDeps.readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
|
||||
|
||||
const resolveReplyMediaForMessage = async (
|
||||
const recordMessageForReplyChain = (msg: Message, threadId?: number) =>
|
||||
messageCache.record({
|
||||
accountId,
|
||||
chatId: msg.chat.id,
|
||||
msg,
|
||||
...(threadId != null ? { threadId } : {}),
|
||||
});
|
||||
|
||||
const buildReplyChainForMessage = (msg: Message) =>
|
||||
buildTelegramReplyChain({
|
||||
cache: messageCache,
|
||||
accountId,
|
||||
chatId: msg.chat.id,
|
||||
msg,
|
||||
});
|
||||
|
||||
const toReplyChainEntry = (
|
||||
node: TelegramCachedMessageNode,
|
||||
media?: TelegramMediaRef,
|
||||
): TelegramReplyChainEntry => {
|
||||
const { sourceMessage: _sourceMessage, ...entry } = node;
|
||||
return {
|
||||
...entry,
|
||||
...(media?.path ? { mediaPath: media.path } : {}),
|
||||
...(media?.contentType ? { mediaType: media.contentType } : {}),
|
||||
};
|
||||
};
|
||||
|
||||
const resolveReplyMediaForChain = async (
|
||||
ctx: TelegramContext,
|
||||
chain: TelegramCachedMessageNode[],
|
||||
): Promise<{ replyMedia: TelegramMediaRef[]; replyChain: TelegramReplyChainEntry[] }> => {
|
||||
const replyMedia: TelegramMediaRef[] = [];
|
||||
const replyChain: TelegramReplyChainEntry[] = [];
|
||||
for (const node of chain) {
|
||||
const replyFileId = resolveInboundMediaFileId(node.sourceMessage);
|
||||
if (!replyFileId || !hasInboundMedia(node.sourceMessage)) {
|
||||
replyChain.push(toReplyChainEntry(node));
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const media = await resolveMedia({
|
||||
ctx: {
|
||||
message: node.sourceMessage,
|
||||
me: ctx.me,
|
||||
getFile: async () => await bot.api.getFile(replyFileId),
|
||||
},
|
||||
maxBytes: mediaMaxBytes,
|
||||
...mediaRuntimeOptions,
|
||||
});
|
||||
if (!media) {
|
||||
replyChain.push(toReplyChainEntry(node));
|
||||
continue;
|
||||
}
|
||||
const mediaRef: TelegramMediaRef = {
|
||||
path: media.path,
|
||||
...(media.contentType ? { contentType: media.contentType } : {}),
|
||||
...(media.stickerMetadata ? { stickerMetadata: media.stickerMetadata } : {}),
|
||||
};
|
||||
replyMedia.push(mediaRef);
|
||||
replyChain.push(toReplyChainEntry(node, mediaRef));
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
{ chatId: ctx.message.chat.id, error: String(err) },
|
||||
"reply media fetch failed",
|
||||
);
|
||||
replyChain.push(toReplyChainEntry(node));
|
||||
}
|
||||
}
|
||||
return { replyMedia, replyChain };
|
||||
};
|
||||
|
||||
const processMessageWithReplyChain = async (
|
||||
ctx: TelegramContext,
|
||||
msg: Message,
|
||||
): Promise<TelegramMediaRef[]> => {
|
||||
const replyMessage = msg.reply_to_message;
|
||||
if (!replyMessage || !hasInboundMedia(replyMessage)) {
|
||||
return [];
|
||||
}
|
||||
const replyFileId = resolveInboundMediaFileId(replyMessage);
|
||||
if (!replyFileId) {
|
||||
return [];
|
||||
}
|
||||
try {
|
||||
const media = await resolveMedia({
|
||||
ctx: {
|
||||
message: replyMessage,
|
||||
me: ctx.me,
|
||||
getFile: async () => await bot.api.getFile(replyFileId),
|
||||
},
|
||||
maxBytes: mediaMaxBytes,
|
||||
...mediaRuntimeOptions,
|
||||
});
|
||||
if (!media) {
|
||||
return [];
|
||||
}
|
||||
return [
|
||||
{
|
||||
path: media.path,
|
||||
contentType: media.contentType,
|
||||
stickerMetadata: media.stickerMetadata,
|
||||
},
|
||||
];
|
||||
} catch (err) {
|
||||
logger.warn({ chatId: msg.chat.id, error: String(err) }, "reply media fetch failed");
|
||||
return [];
|
||||
}
|
||||
allMedia: TelegramMediaRef[],
|
||||
storeAllowFrom: string[],
|
||||
options?: TelegramMessageContextOptions,
|
||||
) => {
|
||||
const replyChainNodes = buildReplyChainForMessage(msg);
|
||||
const { replyMedia, replyChain } = await resolveReplyMediaForChain(ctx, replyChainNodes);
|
||||
await processMessage(ctx, allMedia, storeAllowFrom, options, replyMedia, replyChain);
|
||||
};
|
||||
|
||||
const isAllowlistAuthorized = (
|
||||
@@ -1783,7 +1841,8 @@ export const registerTelegramHandlers = ({
|
||||
from: callback.from,
|
||||
text: nativeCallbackCommand ?? data,
|
||||
});
|
||||
await processMessage(buildSyntheticContext(ctx, syntheticMessage), [], storeAllowFrom, {
|
||||
const syntheticCtx = buildSyntheticContext(ctx, syntheticMessage);
|
||||
await processMessageWithReplyChain(syntheticCtx, syntheticMessage, [], storeAllowFrom, {
|
||||
...(nativeCallbackCommand ? { commandSource: "native" as const } : {}),
|
||||
forceWasMentioned: true,
|
||||
messageIdOverride: callback.id,
|
||||
@@ -1943,6 +2002,7 @@ export const registerTelegramHandlers = ({
|
||||
}
|
||||
}
|
||||
|
||||
recordMessageForReplyChain(event.msg, resolvedThreadId ?? dmThreadId);
|
||||
await processInboundMessage({
|
||||
ctx: event.ctx,
|
||||
msg: event.msg,
|
||||
|
||||
@@ -39,6 +39,7 @@ import {
|
||||
} from "./bot/helpers.js";
|
||||
import type { TelegramContext } from "./bot/types.js";
|
||||
import { resolveTelegramGroupPromptSettings } from "./group-config-helpers.js";
|
||||
import type { TelegramReplyChainEntry } from "./message-cache.js";
|
||||
|
||||
type FinalizedTelegramInboundContext = ReturnType<
|
||||
typeof import("./bot-message-context.session.runtime.js").finalizeInboundContext
|
||||
@@ -93,6 +94,7 @@ export async function buildTelegramInboundContextPayload(params: {
|
||||
msg: TelegramContext["message"];
|
||||
allMedia: TelegramMediaRef[];
|
||||
replyMedia: TelegramMediaRef[];
|
||||
replyChain: TelegramReplyChainEntry[];
|
||||
isGroup: boolean;
|
||||
isForum: boolean;
|
||||
chatId: number | string;
|
||||
@@ -139,6 +141,7 @@ export async function buildTelegramInboundContextPayload(params: {
|
||||
msg,
|
||||
allMedia,
|
||||
replyMedia,
|
||||
replyChain,
|
||||
isGroup,
|
||||
isForum,
|
||||
chatId,
|
||||
@@ -225,38 +228,111 @@ export async function buildTelegramInboundContextPayload(params: {
|
||||
forwardedFrom: visibleReplyForwardedFrom,
|
||||
}
|
||||
: null;
|
||||
const fallbackReplyChain: TelegramReplyChainEntry[] = visibleReplyTarget
|
||||
? [
|
||||
{
|
||||
...(visibleReplyTarget.id ? { messageId: visibleReplyTarget.id } : {}),
|
||||
sender: visibleReplyTarget.sender,
|
||||
...(visibleReplyTarget.senderId ? { senderId: visibleReplyTarget.senderId } : {}),
|
||||
...(visibleReplyTarget.senderUsername
|
||||
? { senderUsername: visibleReplyTarget.senderUsername }
|
||||
: {}),
|
||||
...(visibleReplyTarget.body ? { body: visibleReplyTarget.body } : {}),
|
||||
...(visibleReplyTarget.kind === "quote" ? { isQuote: true } : {}),
|
||||
...(visibleReplyTarget.forwardedFrom?.from
|
||||
? { forwardedFrom: visibleReplyTarget.forwardedFrom.from }
|
||||
: {}),
|
||||
...(visibleReplyTarget.forwardedFrom?.fromId
|
||||
? { forwardedFromId: visibleReplyTarget.forwardedFrom.fromId }
|
||||
: {}),
|
||||
...(visibleReplyTarget.forwardedFrom?.fromUsername
|
||||
? { forwardedFromUsername: visibleReplyTarget.forwardedFrom.fromUsername }
|
||||
: {}),
|
||||
...(visibleReplyTarget.forwardedFrom?.date
|
||||
? { forwardedDate: visibleReplyTarget.forwardedFrom.date * 1000 }
|
||||
: {}),
|
||||
},
|
||||
]
|
||||
: [];
|
||||
const rawReplyChain = replyChain.length > 0 ? replyChain : fallbackReplyChain;
|
||||
const replyChainWithVisibleTarget =
|
||||
visibleReplyTarget && rawReplyChain[0]?.messageId === visibleReplyTarget.id
|
||||
? [
|
||||
{
|
||||
...rawReplyChain[0],
|
||||
...(visibleReplyTarget.body ? { body: visibleReplyTarget.body } : {}),
|
||||
...(visibleReplyTarget.kind === "quote" ? { isQuote: true } : {}),
|
||||
...(visibleReplyTarget.forwardedFrom?.from
|
||||
? { forwardedFrom: visibleReplyTarget.forwardedFrom.from }
|
||||
: {}),
|
||||
...(visibleReplyTarget.forwardedFrom?.fromId
|
||||
? { forwardedFromId: visibleReplyTarget.forwardedFrom.fromId }
|
||||
: {}),
|
||||
...(visibleReplyTarget.forwardedFrom?.fromUsername
|
||||
? { forwardedFromUsername: visibleReplyTarget.forwardedFrom.fromUsername }
|
||||
: {}),
|
||||
...(visibleReplyTarget.forwardedFrom?.date
|
||||
? { forwardedDate: visibleReplyTarget.forwardedFrom.date * 1000 }
|
||||
: {}),
|
||||
},
|
||||
...rawReplyChain.slice(1),
|
||||
]
|
||||
: rawReplyChain;
|
||||
const visibleReplyChain = replyChainWithVisibleTarget
|
||||
.filter((entry) =>
|
||||
shouldIncludeGroupSupplementalContext({
|
||||
kind: "quote",
|
||||
senderId: entry.senderId,
|
||||
senderUsername: entry.senderUsername,
|
||||
}),
|
||||
)
|
||||
.map((entry) => {
|
||||
const includeForwarded =
|
||||
entry.forwardedFrom &&
|
||||
shouldIncludeGroupSupplementalContext({
|
||||
kind: "forwarded",
|
||||
senderId: entry.forwardedFromId,
|
||||
senderUsername: entry.forwardedFromUsername,
|
||||
});
|
||||
if (includeForwarded) {
|
||||
return entry;
|
||||
}
|
||||
const {
|
||||
forwardedFrom: _forwardedFrom,
|
||||
forwardedFromId: _forwardedFromId,
|
||||
forwardedFromUsername: _forwardedFromUsername,
|
||||
forwardedDate: _forwardedDate,
|
||||
...withoutForwarded
|
||||
} = entry;
|
||||
return withoutForwarded;
|
||||
});
|
||||
const visibleForwardOrigin = includeForwardOrigin ? forwardOrigin : null;
|
||||
const replyForwardAnnotation = visibleReplyTarget?.forwardedFrom
|
||||
? `[Forwarded from ${visibleReplyTarget.forwardedFrom.from}${
|
||||
visibleReplyTarget.forwardedFrom.date
|
||||
? ` at ${new Date(visibleReplyTarget.forwardedFrom.date * 1000).toISOString()}`
|
||||
: ""
|
||||
}]\n`
|
||||
: "";
|
||||
const buildReplySupplementalLines = (params: { body?: string }) => {
|
||||
const lines: string[] = [];
|
||||
const forwardAnnotation = replyForwardAnnotation.trimEnd();
|
||||
if (forwardAnnotation) {
|
||||
lines.push(forwardAnnotation);
|
||||
}
|
||||
if (params.body) {
|
||||
lines.push(params.body);
|
||||
}
|
||||
return lines.length > 0 ? `\n${lines.join("\n")}` : "";
|
||||
const formatReplyChainEntry = (entry: TelegramReplyChainEntry, index: number) => {
|
||||
const labels = [
|
||||
`${index + 1}. ${entry.sender ?? "unknown sender"}`,
|
||||
entry.messageId ? `id:${entry.messageId}` : undefined,
|
||||
entry.replyToId ? `reply_to:${entry.replyToId}` : undefined,
|
||||
entry.timestamp ? new Date(entry.timestamp).toISOString() : undefined,
|
||||
].filter(Boolean);
|
||||
const bodyLines = [
|
||||
entry.forwardedFrom
|
||||
? `[Forwarded from ${entry.forwardedFrom}${
|
||||
entry.forwardedDate ? ` at ${new Date(entry.forwardedDate).toISOString()}` : ""
|
||||
}]`
|
||||
: undefined,
|
||||
entry.isQuote && entry.body ? `"${entry.body}"` : entry.body,
|
||||
entry.mediaType ? `<media:${entry.mediaType}>` : undefined,
|
||||
entry.mediaPath ? `[media_path:${entry.mediaPath}]` : undefined,
|
||||
entry.mediaRef ? `[media_ref:${entry.mediaRef}]` : undefined,
|
||||
].filter(Boolean);
|
||||
return `[${labels.join(" ")}]\n${bodyLines.join("\n")}`;
|
||||
};
|
||||
const replySuffix = visibleReplyTarget
|
||||
? visibleReplyTarget.kind === "quote"
|
||||
? `\n\n[Quoting ${visibleReplyTarget.sender}${
|
||||
visibleReplyTarget.id ? ` id:${visibleReplyTarget.id}` : ""
|
||||
}]${buildReplySupplementalLines({
|
||||
body: visibleReplyTarget.body ? `"${visibleReplyTarget.body}"` : undefined,
|
||||
})}\n[/Quoting]`
|
||||
: `\n\n[Replying to ${visibleReplyTarget.sender}${
|
||||
visibleReplyTarget.id ? ` id:${visibleReplyTarget.id}` : ""
|
||||
}]${buildReplySupplementalLines({
|
||||
body: visibleReplyTarget.body,
|
||||
})}\n[/Replying]`
|
||||
: "";
|
||||
const replySuffix =
|
||||
visibleReplyChain.length > 0
|
||||
? `\n\n[Reply chain - nearest first]\n${visibleReplyChain
|
||||
.map(formatReplyChainEntry)
|
||||
.join("\n")}\n[/Reply chain]`
|
||||
: "";
|
||||
const forwardPrefix = visibleForwardOrigin
|
||||
? `[Forwarded from ${visibleForwardOrigin.from}${
|
||||
visibleForwardOrigin.date
|
||||
@@ -352,9 +428,10 @@ export async function buildTelegramInboundContextPayload(params: {
|
||||
Surface: "telegram",
|
||||
BotUsername: primaryCtx.me?.username ?? undefined,
|
||||
MessageSid: options?.messageIdOverride ?? String(msg.message_id),
|
||||
ReplyToId: visibleReplyTarget?.id,
|
||||
ReplyToBody: visibleReplyTarget?.body,
|
||||
ReplyToSender: visibleReplyTarget?.sender,
|
||||
ReplyToId: visibleReplyChain[0]?.messageId ?? visibleReplyTarget?.id,
|
||||
ReplyToBody: visibleReplyChain[0]?.body ?? visibleReplyTarget?.body,
|
||||
ReplyToSender: visibleReplyChain[0]?.sender ?? visibleReplyTarget?.sender,
|
||||
ReplyChain: visibleReplyChain.length > 0 ? visibleReplyChain : undefined,
|
||||
ReplyToIsQuote: visibleReplyTarget?.kind === "quote" ? true : undefined,
|
||||
ReplyToIsExternal: visibleReplyTarget?.source === "external_reply" ? true : undefined,
|
||||
ReplyToQuoteText: visibleReplyTarget?.quoteText,
|
||||
|
||||
@@ -115,6 +115,7 @@ export const buildTelegramMessageContext = async ({
|
||||
primaryCtx,
|
||||
allMedia,
|
||||
replyMedia = [],
|
||||
replyChain = [],
|
||||
storeAllowFrom,
|
||||
options,
|
||||
bot,
|
||||
@@ -578,6 +579,7 @@ export const buildTelegramMessageContext = async ({
|
||||
msg,
|
||||
allMedia,
|
||||
replyMedia,
|
||||
replyChain,
|
||||
isGroup,
|
||||
isForum,
|
||||
chatId,
|
||||
|
||||
@@ -8,6 +8,7 @@ import type {
|
||||
} from "openclaw/plugin-sdk/config-types";
|
||||
import type { HistoryEntry } from "openclaw/plugin-sdk/reply-history";
|
||||
import type { StickerMetadata, TelegramContext } from "./bot/types.js";
|
||||
import type { TelegramReplyChainEntry } from "./message-cache.js";
|
||||
|
||||
export type TelegramMediaRef = {
|
||||
path: string;
|
||||
@@ -70,6 +71,7 @@ export type BuildTelegramMessageContextParams = {
|
||||
primaryCtx: TelegramContext;
|
||||
allMedia: TelegramMediaRef[];
|
||||
replyMedia?: TelegramMediaRef[];
|
||||
replyChain?: TelegramReplyChainEntry[];
|
||||
storeAllowFrom: string[];
|
||||
options?: TelegramMessageContextOptions;
|
||||
bot: Bot;
|
||||
|
||||
@@ -13,6 +13,7 @@ import { dispatchTelegramMessage } from "./bot-message-dispatch.js";
|
||||
import type { TelegramBotOptions } from "./bot.types.js";
|
||||
import { buildTelegramThreadParams } from "./bot/helpers.js";
|
||||
import type { TelegramContext, TelegramStreamMode } from "./bot/types.js";
|
||||
import type { TelegramReplyChainEntry } from "./message-cache.js";
|
||||
|
||||
/** Dependencies injected once when creating the message processor. */
|
||||
type TelegramMessageProcessorDeps = Omit<
|
||||
@@ -60,6 +61,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
|
||||
storeAllowFrom: string[],
|
||||
options?: TelegramMessageContextOptions,
|
||||
replyMedia?: TelegramMediaRef[],
|
||||
replyChain?: TelegramReplyChainEntry[],
|
||||
) => {
|
||||
const ingressReceivedAtMs =
|
||||
typeof options?.receivedAtMs === "number" && Number.isFinite(options.receivedAtMs)
|
||||
@@ -72,6 +74,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
|
||||
primaryCtx,
|
||||
allMedia,
|
||||
replyMedia,
|
||||
replyChain,
|
||||
storeAllowFrom,
|
||||
options,
|
||||
bot,
|
||||
|
||||
@@ -405,6 +405,7 @@ export type RegisterTelegramHandlerParams = {
|
||||
storeAllowFrom: string[],
|
||||
options?: TelegramMessageContextOptions,
|
||||
replyMedia?: TelegramMediaRef[],
|
||||
replyChain?: import("./message-cache.js").TelegramReplyChainEntry[],
|
||||
) => Promise<void>;
|
||||
logger: ReturnType<typeof getChildLogger>;
|
||||
};
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { rmSync } from "node:fs";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
|
||||
import type { MockFn } from "openclaw/plugin-sdk/plugin-test-runtime";
|
||||
import type { GetReplyOptions, MsgContext } from "openclaw/plugin-sdk/reply-runtime";
|
||||
@@ -460,6 +461,7 @@ beforeEach(() => {
|
||||
getRuntimeConfig.mockReset();
|
||||
getRuntimeConfig.mockReturnValue(DEFAULT_TELEGRAM_TEST_CONFIG);
|
||||
sessionStoreEntries.value = {};
|
||||
rmSync(`${sessionStorePath}.telegram-messages.json`, { force: true });
|
||||
loadSessionStoreMock.mockReset();
|
||||
loadSessionStoreMock.mockImplementation(() => sessionStoreEntries.value);
|
||||
resolveStorePathMock.mockReset();
|
||||
|
||||
@@ -1564,7 +1564,8 @@ describe("createTelegramBot", () => {
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
const payload = replySpy.mock.calls[0][0];
|
||||
expect(payload.Body).toContain("[Quoting Ada id:9001]");
|
||||
expect(payload.Body).toContain("[Reply chain - nearest first]");
|
||||
expect(payload.Body).toContain("[1. Ada id:9001]");
|
||||
expect(payload.Body).toContain('"summarize this"');
|
||||
expect(payload.ReplyToId).toBe("9001");
|
||||
expect(payload.ReplyToBody).toBe("summarize this");
|
||||
@@ -1601,7 +1602,8 @@ describe("createTelegramBot", () => {
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
const payload = replySpy.mock.calls[0][0];
|
||||
expect(payload.Body).toContain("[Replying to Ada id:9001]");
|
||||
expect(payload.Body).toContain("[Reply chain - nearest first]");
|
||||
expect(payload.Body).toContain("[1. Ada id:9001]");
|
||||
expect(payload.Body).not.toContain("PK");
|
||||
expect(payload.Body).not.toContain("unsafe reply text omitted");
|
||||
expect(payload.ReplyToBody).toBeUndefined();
|
||||
@@ -1665,6 +1667,110 @@ describe("createTelegramBot", () => {
|
||||
expect(mediaFetch).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("hydrates reply chains from cached Telegram messages", async () => {
|
||||
onSpy.mockClear();
|
||||
replySpy.mockClear();
|
||||
getFileSpy.mockClear();
|
||||
|
||||
const mediaFetch = vi.fn(
|
||||
async () =>
|
||||
new Response(new Uint8Array([0x89, 0x50, 0x4e, 0x47]), {
|
||||
status: 200,
|
||||
headers: { "content-type": "image/png" },
|
||||
}),
|
||||
);
|
||||
const ssrfMock = mockPinnedHostnameResolution();
|
||||
|
||||
try {
|
||||
createTelegramBot({
|
||||
token: "tok",
|
||||
telegramTransport: {
|
||||
fetch: mediaFetch as typeof fetch,
|
||||
sourceFetch: mediaFetch as typeof fetch,
|
||||
close: async () => {},
|
||||
},
|
||||
});
|
||||
const handler = getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>;
|
||||
|
||||
await handler({
|
||||
message: {
|
||||
chat: { id: 7, type: "private" },
|
||||
message_id: 9000,
|
||||
date: 1736380700,
|
||||
from: { id: 1, first_name: "Kesava" },
|
||||
photo: [{ file_id: "root-photo-1" }],
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({ file_path: "media/root.jpg" }),
|
||||
});
|
||||
|
||||
await handler({
|
||||
message: {
|
||||
chat: { id: 7, type: "private" },
|
||||
message_id: 9001,
|
||||
text: "r u back from hermes",
|
||||
date: 1736380750,
|
||||
from: { id: 2, first_name: "Ada" },
|
||||
reply_to_message: {
|
||||
message_id: 9000,
|
||||
photo: [{ file_id: "root-photo-1" }],
|
||||
from: { id: 1, first_name: "Kesava" },
|
||||
},
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({ download: async () => new Uint8Array() }),
|
||||
});
|
||||
|
||||
replySpy.mockClear();
|
||||
getFileSpy.mockClear();
|
||||
mediaFetch.mockClear();
|
||||
|
||||
await handler({
|
||||
message: {
|
||||
chat: { id: 7, type: "private" },
|
||||
message_id: 9002,
|
||||
text: "why did you reply?",
|
||||
date: 1736380800,
|
||||
from: { id: 3, first_name: "Grace" },
|
||||
reply_to_message: {
|
||||
message_id: 9001,
|
||||
text: "r u back from hermes",
|
||||
from: { id: 2, first_name: "Ada" },
|
||||
},
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({ download: async () => new Uint8Array() }),
|
||||
});
|
||||
} finally {
|
||||
ssrfMock.mockRestore();
|
||||
}
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
const payload = replySpy.mock.calls[0][0] as {
|
||||
ReplyChain?: Array<{
|
||||
messageId?: string;
|
||||
body?: string;
|
||||
mediaPath?: string;
|
||||
mediaRef?: string;
|
||||
replyToId?: string;
|
||||
}>;
|
||||
};
|
||||
expect(payload.ReplyChain).toEqual([
|
||||
expect.objectContaining({
|
||||
messageId: "9001",
|
||||
body: "r u back from hermes",
|
||||
replyToId: "9000",
|
||||
}),
|
||||
expect.objectContaining({
|
||||
messageId: "9000",
|
||||
mediaRef: "telegram:file/root-photo-1",
|
||||
}),
|
||||
]);
|
||||
expect(payload.ReplyChain?.[1]?.mediaPath).toBeTruthy();
|
||||
expect(getFileSpy).toHaveBeenCalledWith("root-photo-1");
|
||||
expect(mediaFetch).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not fetch reply media for unauthorized DM replies", async () => {
|
||||
onSpy.mockClear();
|
||||
replySpy.mockClear();
|
||||
@@ -1833,7 +1939,8 @@ describe("createTelegramBot", () => {
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
const payload = replySpy.mock.calls[0][0];
|
||||
expect(payload.Body).toContain("[Quoting unknown sender]");
|
||||
expect(payload.Body).toContain("[Reply chain - nearest first]");
|
||||
expect(payload.Body).toContain("[1. unknown sender]");
|
||||
expect(payload.Body).toContain('"summarize this"');
|
||||
expect(payload.ReplyToId).toBeUndefined();
|
||||
expect(payload.ReplyToBody).toBe("summarize this");
|
||||
@@ -1868,7 +1975,8 @@ describe("createTelegramBot", () => {
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
const payload = replySpy.mock.calls[0][0];
|
||||
expect(payload.Body).toContain("[Quoting Ada id:9002]");
|
||||
expect(payload.Body).toContain("[Reply chain - nearest first]");
|
||||
expect(payload.Body).toContain("[1. Ada id:9002]");
|
||||
expect(payload.Body).toContain('"summarize this"');
|
||||
expect(payload.ReplyToId).toBe("9002");
|
||||
expect(payload.ReplyToBody).toBe("summarize this");
|
||||
|
||||
Reference in New Issue
Block a user