From 9e1de97a69df6e1645413f4b4e61086e61659afa Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sat, 7 Mar 2026 19:01:16 +0530 Subject: [PATCH] fix(telegram): route native topic commands to the active session (#38871) * fix(telegram): resolve session entry for /stop in forum topics Fixes #38675 - Export normalizeStoreSessionKey from store.ts for reuse - Use it in resolveSessionEntryForKey so topic session keys (lowercase in store) are found when handling /stop - Add test for forum topic session key lookup * fix(telegram): share native topic routing with inbound messages * fix: land telegram topic routing follow-up (#38871) --------- Co-authored-by: xialonglee --- CHANGELOG.md | 1 + src/auto-reply/reply/abort.test.ts | 14 ++ src/auto-reply/reply/abort.ts | 30 ++++- src/auto-reply/reply/dispatch-from-config.ts | 13 +- src/config/sessions/store.ts | 14 +- src/telegram/bot-handlers.ts | 38 +++--- src/telegram/bot-message-context.ts | 110 ++-------------- src/telegram/bot-message-dispatch.test.ts | 12 +- src/telegram/bot-message-dispatch.ts | 8 +- .../bot-native-commands.session-meta.test.ts | 90 ++++++++++++- .../bot-native-commands.test-helpers.ts | 11 +- src/telegram/bot-native-commands.ts | 44 +++---- src/telegram/conversation-route.ts | 122 ++++++++++++++++++ 13 files changed, 335 insertions(+), 172 deletions(-) create mode 100644 src/telegram/conversation-route.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 16a29952160..5944ff51743 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -230,6 +230,7 @@ Docs: https://docs.openclaw.ai - Gateway/Telegram polling health monitor: skip stale-socket restarts for Telegram long-polling channels and thread channel identity through shared health evaluation so polling connections are not restarted on the WebSocket stale-socket heuristic. (#38395) Thanks @ql-wade and @Takhoffman. - Daemon/systemd fresh-install probe: check for OpenClaw's managed user unit before running `systemctl --user is-enabled`, so first-time Linux installs no longer fail on generic missing-unit probe errors. (#38819) Thanks @adaHubble. - Gateway/Windows restart supervision: relaunch task-managed gateways through Scheduled Task with quoted helper-script command paths, distinguish restart-capable supervisors per platform, and stop orphaned Windows gateway children during self-restart. (#38825) Thanks @obviyus. +- Telegram/native topic command routing: resolve forum-topic native commands through the same conversation route as inbound messages so topic `agentId` overrides and bound topic sessions target the active session instead of the default topic-parent session. (#38871) Thanks @obviyus. ## 2026.3.2 diff --git a/src/auto-reply/reply/abort.test.ts b/src/auto-reply/reply/abort.test.ts index dab520e6b24..df6fa228890 100644 --- a/src/auto-reply/reply/abort.test.ts +++ b/src/auto-reply/reply/abort.test.ts @@ -356,6 +356,20 @@ describe("abort detection", () => { expect(resolveSessionEntryForKey(undefined, "session-1")).toEqual({}); }); + it("resolves Telegram forum topic session when lookup key has different casing than store", () => { + // Store normalizes keys to lowercase; caller may pass mixed-case. /stop in topic must find entry. + const storeKey = "agent:main:telegram:group:-1001234567890:topic:99"; + const lookupKey = "Agent:Main:Telegram:Group:-1001234567890:Topic:99"; + const store = { + [storeKey]: { sessionId: "pi-topic-99", updatedAt: 0 }, + } as Record; + // Direct lookup fails (store uses lowercase keys); normalization fallback must succeed. + expect(store[lookupKey]).toBeUndefined(); + const result = resolveSessionEntryForKey(store, lookupKey); + expect(result.entry?.sessionId).toBe("pi-topic-99"); + expect(result.key).toBe(storeKey); + }); + it("fast-aborts even when text commands are disabled", async () => { const { cfg } = await createAbortConfig({ commandsTextEnabled: false }); diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts index ba4d92b1dfa..d0f97f04fa8 100644 --- a/src/auto-reply/reply/abort.ts +++ b/src/auto-reply/reply/abort.ts @@ -12,6 +12,7 @@ import { import type { OpenClawConfig } from "../../config/config.js"; import { loadSessionStore, + resolveSessionStoreEntry, resolveStorePath, type SessionEntry, updateSessionStore, @@ -172,13 +173,22 @@ export function formatAbortReplyText(stoppedSubagents?: number): string { export function resolveSessionEntryForKey( store: Record | undefined, sessionKey: string | undefined, -) { +): { entry?: SessionEntry; key?: string; legacyKeys?: string[] } { if (!store || !sessionKey) { return {}; } - const direct = store[sessionKey]; - if (direct) { - return { entry: direct, key: sessionKey }; + const resolved = resolveSessionStoreEntry({ store, sessionKey }); + if (resolved.existing) { + return resolved.legacyKeys.length > 0 + ? { + entry: resolved.existing, + key: resolved.normalizedKey, + legacyKeys: resolved.legacyKeys, + } + : { + entry: resolved.existing, + key: resolved.normalizedKey, + }; } return {}; } @@ -301,7 +311,7 @@ export async function tryFastAbortFromMessage(params: { if (targetKey) { const storePath = resolveStorePath(cfg.session?.store, { agentId }); const store = loadSessionStore(storePath); - const { entry, key } = resolveSessionEntryForKey(store, targetKey); + const { entry, key, legacyKeys } = resolveSessionEntryForKey(store, targetKey); const resolvedTargetKey = key ?? targetKey; const acpManager = getAcpSessionManager(); const acpResolution = acpManager.resolveSession({ @@ -340,6 +350,11 @@ export async function tryFastAbortFromMessage(params: { applyAbortCutoffToSessionEntry(entry, abortCutoff); entry.updatedAt = Date.now(); store[key] = entry; + for (const legacyKey of legacyKeys ?? []) { + if (legacyKey !== key) { + delete store[legacyKey]; + } + } await updateSessionStore(storePath, (nextStore) => { const nextEntry = nextStore[key] ?? entry; if (!nextEntry) { @@ -349,6 +364,11 @@ export async function tryFastAbortFromMessage(params: { applyAbortCutoffToSessionEntry(nextEntry, abortCutoff); nextEntry.updatedAt = Date.now(); nextStore[key] = nextEntry; + for (const legacyKey of legacyKeys ?? []) { + if (legacyKey !== key) { + delete nextStore[legacyKey]; + } + } }); } else if (abortKey) { setAbortMemory(abortKey, true); diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 003a8f37435..786b1a7c16b 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -1,6 +1,11 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import type { OpenClawConfig } from "../../config/config.js"; -import { loadSessionStore, resolveStorePath, type SessionEntry } from "../../config/sessions.js"; +import { + loadSessionStore, + resolveSessionStoreEntry, + resolveStorePath, + type SessionEntry, +} from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; import { fireAndForgetHook } from "../../hooks/fire-and-forget.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; @@ -65,7 +70,7 @@ const isInboundAudioContext = (ctx: FinalizedMsgContext): boolean => { return AUDIO_HEADER_RE.test(trimmed); }; -const resolveSessionStoreEntry = ( +const resolveSessionStoreLookup = ( ctx: FinalizedMsgContext, cfg: OpenClawConfig, ): { @@ -84,7 +89,7 @@ const resolveSessionStoreEntry = ( const store = loadSessionStore(storePath); return { sessionKey, - entry: store[sessionKey.toLowerCase()] ?? store[sessionKey], + entry: resolveSessionStoreEntry({ store, sessionKey }).existing, }; } catch { return { @@ -164,7 +169,7 @@ export async function dispatchReplyFromConfig(params: { return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; } - const sessionStoreEntry = resolveSessionStoreEntry(ctx, cfg); + const sessionStoreEntry = resolveSessionStoreLookup(ctx, cfg); const acpDispatchSessionKey = sessionStoreEntry.sessionKey ?? sessionKey; const inboundAudio = isInboundAudioContext(ctx); const sessionTtsAuto = normalizeTtsAutoMode(sessionStoreEntry.entry?.ttsAuto); diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 9984752985d..a70285c4c62 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -108,11 +108,11 @@ function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryCon return next; } -function normalizeStoreSessionKey(sessionKey: string): string { +export function normalizeStoreSessionKey(sessionKey: string): string { return sessionKey.trim().toLowerCase(); } -function resolveStoreSessionEntry(params: { +export function resolveSessionStoreEntry(params: { store: Record; sessionKey: string; }): { @@ -275,7 +275,7 @@ export function readSessionUpdatedAt(params: { }): number | undefined { try { const store = loadSessionStore(params.storePath); - const resolved = resolveStoreSessionEntry({ store, sessionKey: params.sessionKey }); + const resolved = resolveSessionStoreEntry({ store, sessionKey: params.sessionKey }); return resolved.existing?.updatedAt; } catch { return undefined; @@ -611,7 +611,7 @@ async function writeSessionStoreAtomic(params: { async function persistResolvedSessionEntry(params: { storePath: string; store: Record; - resolved: ReturnType; + resolved: ReturnType; next: SessionEntry; }): Promise { params.store[params.resolved.normalizedKey] = params.next; @@ -734,7 +734,7 @@ export async function updateSessionStoreEntry(params: { const { storePath, sessionKey, update } = params; return await withSessionStoreLock(storePath, async () => { const store = loadSessionStore(storePath, { skipCache: true }); - const resolved = resolveStoreSessionEntry({ store, sessionKey }); + const resolved = resolveSessionStoreEntry({ store, sessionKey }); const existing = resolved.existing; if (!existing) { return null; @@ -765,7 +765,7 @@ export async function recordSessionMetaFromInbound(params: { return await updateSessionStore( storePath, (store) => { - const resolved = resolveStoreSessionEntry({ store, sessionKey }); + const resolved = resolveSessionStoreEntry({ store, sessionKey }); const existing = resolved.existing; const patch = deriveSessionMetaPatch({ ctx, @@ -814,7 +814,7 @@ export async function updateLastRoute(params: { const { storePath, sessionKey, channel, to, accountId, threadId, ctx } = params; return await withSessionStoreLock(storePath, async () => { const store = loadSessionStore(storePath); - const resolved = resolveStoreSessionEntry({ store, sessionKey }); + const resolved = resolveSessionStoreEntry({ store, sessionKey }); const existing = resolved.existing; const now = Date.now(); const explicitContext = normalizeDeliveryContext(params.deliveryContext); diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts index 6df34fe2c60..aaea84ecad7 100644 --- a/src/telegram/bot-handlers.ts +++ b/src/telegram/bot-handlers.ts @@ -16,7 +16,11 @@ import { shouldDebounceTextInbound } from "../channels/inbound-debounce-policy.j import { resolveChannelConfigWrites } from "../channels/plugins/config-writes.js"; import { loadConfig } from "../config/config.js"; import { writeConfigFile } from "../config/io.js"; -import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; +import { + loadSessionStore, + resolveSessionStoreEntry, + resolveStorePath, +} from "../config/sessions.js"; import type { DmPolicy } from "../config/types.base.js"; import type { TelegramDirectConfig, @@ -50,6 +54,7 @@ import { resolveTelegramGroupAllowFromContext, } from "./bot/helpers.js"; import type { TelegramContext } from "./bot/types.js"; +import { resolveTelegramConversationRoute } from "./conversation-route.js"; import { enforceTelegramDmAccess } from "./dm-access.js"; import { evaluateTelegramGroupBaseAccess, @@ -268,9 +273,10 @@ export const registerTelegramHandlers = ({ isForum: boolean; messageThreadId?: number; resolvedThreadId?: number; + senderId?: string | number; }): { agentId: string; - sessionEntry: ReturnType[string]; + sessionEntry: ReturnType[string] | undefined; model?: string; } => { const resolvedThreadId = @@ -279,26 +285,20 @@ export const registerTelegramHandlers = ({ isForum: params.isForum, messageThreadId: params.messageThreadId, }); - const peerId = params.isGroup - ? buildTelegramGroupPeerId(params.chatId, resolvedThreadId) - : String(params.chatId); - const parentPeer = buildTelegramParentPeer({ + const dmThreadId = !params.isGroup ? params.messageThreadId : undefined; + const topicThreadId = resolvedThreadId ?? dmThreadId; + const { topicConfig } = resolveTelegramGroupConfig(params.chatId, topicThreadId); + const { route } = resolveTelegramConversationRoute({ + cfg, + accountId, + chatId: params.chatId, isGroup: params.isGroup, resolvedThreadId, - chatId: params.chatId, - }); - const route = resolveAgentRoute({ - cfg, - channel: "telegram", - accountId, - peer: { - kind: params.isGroup ? "group" : "direct", - id: peerId, - }, - parentPeer, + replyThreadId: topicThreadId, + senderId: params.senderId, + topicAgentId: topicConfig?.agentId, }); const baseSessionKey = route.sessionKey; - const dmThreadId = !params.isGroup ? params.messageThreadId : undefined; const threadKeys = dmThreadId != null ? resolveThreadSessionKeys({ baseSessionKey, threadId: `${params.chatId}:${dmThreadId}` }) @@ -306,7 +306,7 @@ export const registerTelegramHandlers = ({ const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; const storePath = resolveStorePath(cfg.session?.store, { agentId: route.agentId }); const store = loadSessionStore(storePath); - const entry = store[sessionKey]; + const entry = resolveSessionStoreEntry({ store, sessionKey }).existing; const storedOverride = resolveStoredModelOverride({ sessionEntry: entry, sessionStore: store, diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index 72cfc527661..ab628dc0e0a 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -1,8 +1,5 @@ import type { Bot } from "grammy"; -import { - ensureConfiguredAcpRouteReady, - resolveConfiguredAcpRoute, -} from "../acp/persistent-bindings.route.js"; +import { ensureConfiguredAcpRouteReady } from "../acp/persistent-bindings.route.js"; import { resolveAckReaction } from "../agents/identity.js"; import { findModelInCatalog, @@ -42,19 +39,7 @@ import type { } from "../config/types.js"; import { logVerbose, shouldLogVerbose } from "../globals.js"; import { recordChannelActivity } from "../infra/channel-activity.js"; -import { getSessionBindingService } from "../infra/outbound/session-binding-service.js"; -import { - buildAgentSessionKey, - pickFirstExistingAgentId, - resolveAgentRoute, - type ResolvedAgentRoute, -} from "../routing/resolve-route.js"; -import { - DEFAULT_ACCOUNT_ID, - buildAgentMainSessionKey, - resolveAgentIdFromSessionKey, - resolveThreadSessionKeys, -} from "../routing/session-key.js"; +import { DEFAULT_ACCOUNT_ID, resolveThreadSessionKeys } from "../routing/session-key.js"; import { resolvePinnedMainDmOwnerFromAllowlist } from "../security/dm-policy-shared.js"; import { withTelegramApiErrorLogging } from "./api-logging.js"; import { @@ -67,10 +52,8 @@ import { buildGroupLabel, buildSenderLabel, buildSenderName, - resolveTelegramDirectPeerId, buildTelegramGroupFrom, buildTelegramGroupPeerId, - buildTelegramParentPeer, buildTypingThreadParams, resolveTelegramMediaPlaceholder, expandTextLinks, @@ -81,6 +64,7 @@ import { resolveTelegramThreadSpec, } from "./bot/helpers.js"; import type { StickerMetadata, TelegramContext } from "./bot/types.js"; +import { resolveTelegramConversationRoute } from "./conversation-route.js"; import { enforceTelegramDmAccess } from "./dm-access.js"; import { isTelegramForumServiceMessage } from "./forum-service-message.js"; import { evaluateTelegramGroupBaseAccess } from "./group-access.js"; @@ -209,89 +193,21 @@ export const buildTelegramMessageContext = async ({ !isGroup && groupConfig && "dmPolicy" in groupConfig ? (groupConfig.dmPolicy ?? dmPolicy) : dmPolicy; - const peerId = isGroup - ? buildTelegramGroupPeerId(chatId, resolvedThreadId) - : resolveTelegramDirectPeerId({ chatId, senderId }); - const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); // Fresh config for bindings lookup; other routing inputs are payload-derived. const freshCfg = loadConfig(); - let route: ResolvedAgentRoute = resolveAgentRoute({ + let { route, configuredBinding, configuredBindingSessionKey } = resolveTelegramConversationRoute({ cfg: freshCfg, - channel: "telegram", accountId: account.accountId, - peer: { - kind: isGroup ? "group" : "direct", - id: peerId, - }, - parentPeer, + chatId, + isGroup, + resolvedThreadId, + replyThreadId, + senderId, + topicAgentId: topicConfig?.agentId, }); - // Per-topic agentId override: re-derive session key under the topic's agent. - const rawTopicAgentId = topicConfig?.agentId?.trim(); - if (rawTopicAgentId) { - // Validate agentId against configured agents; falls back to default if not found. - const topicAgentId = pickFirstExistingAgentId(freshCfg, rawTopicAgentId); - const overrideSessionKey = buildAgentSessionKey({ - agentId: topicAgentId, - channel: "telegram", - accountId: account.accountId, - peer: { kind: isGroup ? "group" : "direct", id: peerId }, - dmScope: freshCfg.session?.dmScope, - identityLinks: freshCfg.session?.identityLinks, - }).toLowerCase(); - const overrideMainSessionKey = buildAgentMainSessionKey({ - agentId: topicAgentId, - }).toLowerCase(); - route = { - ...route, - agentId: topicAgentId, - sessionKey: overrideSessionKey, - mainSessionKey: overrideMainSessionKey, - }; - logVerbose( - `telegram: per-topic agent override: topic=${resolvedThreadId ?? dmThreadId} agent=${topicAgentId} sessionKey=${overrideSessionKey}`, - ); - } - const configuredRoute = resolveConfiguredAcpRoute({ - cfg: freshCfg, - route, - channel: "telegram", - accountId: account.accountId, - conversationId: peerId, - parentConversationId: isGroup ? String(chatId) : undefined, - }); - let configuredBinding = configuredRoute.configuredBinding; - let configuredBindingSessionKey = configuredRoute.boundSessionKey ?? ""; - route = configuredRoute.route; - const threadBindingConversationId = - replyThreadId != null - ? `${chatId}:topic:${replyThreadId}` - : !isGroup - ? String(chatId) - : undefined; - if (threadBindingConversationId) { - const threadBinding = getSessionBindingService().resolveByConversation({ - channel: "telegram", - accountId: account.accountId, - conversationId: threadBindingConversationId, - }); - const boundSessionKey = threadBinding?.targetSessionKey?.trim(); - if (threadBinding && boundSessionKey) { - route = { - ...route, - sessionKey: boundSessionKey, - agentId: resolveAgentIdFromSessionKey(boundSessionKey), - matchedBy: "binding.channel", - }; - configuredBinding = null; - configuredBindingSessionKey = ""; - getSessionBindingService().touch(threadBinding.bindingId); - logVerbose( - `telegram: routed via bound conversation ${threadBindingConversationId} -> ${boundSessionKey}`, - ); - } - } - const requiresExplicitAccountBinding = (candidate: ResolvedAgentRoute): boolean => - candidate.accountId !== DEFAULT_ACCOUNT_ID && candidate.matchedBy === "default"; + const requiresExplicitAccountBinding = ( + candidate: ReturnType["route"], + ): boolean => candidate.accountId !== DEFAULT_ACCOUNT_ID && candidate.matchedBy === "default"; // Fail closed for named Telegram accounts when route resolution falls back to // default-agent routing. This prevents cross-account DM/session contamination. if (requiresExplicitAccountBinding(route)) { diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index b0411e65e70..2e6cf158f10 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -30,10 +30,14 @@ vi.mock("./send.js", () => ({ editMessageTelegram, })); -vi.mock("../config/sessions.js", async () => ({ - loadSessionStore, - resolveStorePath, -})); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadSessionStore, + resolveStorePath, + }; +}); vi.mock("./sticker-cache.js", () => ({ cacheSticker: vi.fn(), diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 0433fed9f7a..e6f2f65218d 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -15,7 +15,11 @@ import { logAckFailure, logTypingFailure } from "../channels/logging.js"; import { createReplyPrefixOptions } from "../channels/reply-prefix.js"; import { createTypingCallbacks } from "../channels/typing.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; -import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; +import { + loadSessionStore, + resolveSessionStoreEntry, + resolveStorePath, +} from "../config/sessions.js"; import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js"; import { danger, logVerbose } from "../globals.js"; import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js"; @@ -117,7 +121,7 @@ function resolveTelegramReasoningLevel(params: { try { const storePath = resolveStorePath(cfg.session?.store, { agentId }); const store = loadSessionStore(storePath, { skipCache: true }); - const entry = store[sessionKey.toLowerCase()] ?? store[sessionKey]; + const entry = resolveSessionStoreEntry({ store, sessionKey }).existing; const level = entry?.reasoningLevel; if (level === "on" || level === "stream") { return level; diff --git a/src/telegram/bot-native-commands.session-meta.test.ts b/src/telegram/bot-native-commands.session-meta.test.ts index 968529d8217..9f0a9f4116d 100644 --- a/src/telegram/bot-native-commands.session-meta.test.ts +++ b/src/telegram/bot-native-commands.session-meta.test.ts @@ -1,6 +1,9 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; -import { registerTelegramNativeCommands } from "./bot-native-commands.js"; +import { + registerTelegramNativeCommands, + type RegisterTelegramHandlerParams, +} from "./bot-native-commands.js"; import { createNativeCommandTestParams } from "./bot-native-commands.test-helpers.js"; // All mocks scoped to this file only — does not affect bot-native-commands.test.ts @@ -24,6 +27,12 @@ const sessionMocks = vi.hoisted(() => ({ const replyMocks = vi.hoisted(() => ({ dispatchReplyWithBufferedBlockDispatcher: vi.fn(async () => undefined), })); +const sessionBindingMocks = vi.hoisted(() => ({ + resolveByConversation: vi.fn< + (ref: unknown) => { bindingId: string; targetSessionKey: string } | null + >(() => null), + touch: vi.fn(), +})); vi.mock("../acp/persistent-bindings.js", async (importOriginal) => { const actual = await importOriginal(); @@ -49,6 +58,16 @@ vi.mock("../auto-reply/reply/provider-dispatcher.js", () => ({ vi.mock("../channels/reply-prefix.js", () => ({ createReplyPrefixOptions: vi.fn(() => ({ onModelSelected: () => {} })), })); +vi.mock("../infra/outbound/session-binding-service.js", () => ({ + getSessionBindingService: () => ({ + bind: vi.fn(), + getCapabilities: vi.fn(), + listBySession: vi.fn(), + resolveByConversation: (ref: unknown) => sessionBindingMocks.resolveByConversation(ref), + touch: (bindingId: string, at?: number) => sessionBindingMocks.touch(bindingId, at), + unbind: vi.fn(), + }), +})); vi.mock("../auto-reply/skill-commands.js", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, listSkillCommandsForAgents: vi.fn(() => []) }; @@ -106,11 +125,12 @@ function registerAndResolveStatusHandler(params: { cfg: OpenClawConfig; allowFrom?: string[]; groupAllowFrom?: string[]; + resolveTelegramGroupConfig?: RegisterTelegramHandlerParams["resolveTelegramGroupConfig"]; }): { handler: TelegramCommandHandler; sendMessage: ReturnType; } { - const { cfg, allowFrom, groupAllowFrom } = params; + const { cfg, allowFrom, groupAllowFrom, resolveTelegramGroupConfig } = params; const commandHandlers = new Map(); const sendMessage = vi.fn().mockResolvedValue(undefined); registerTelegramNativeCommands({ @@ -127,6 +147,7 @@ function registerAndResolveStatusHandler(params: { cfg, allowFrom: allowFrom ?? ["*"], groupAllowFrom: groupAllowFrom ?? [], + resolveTelegramGroupConfig, }), }); @@ -141,11 +162,19 @@ function registerAndResolveCommandHandler(params: { allowFrom?: string[]; groupAllowFrom?: string[]; useAccessGroups?: boolean; + resolveTelegramGroupConfig?: RegisterTelegramHandlerParams["resolveTelegramGroupConfig"]; }): { handler: TelegramCommandHandler; sendMessage: ReturnType; } { - const { commandName, cfg, allowFrom, groupAllowFrom, useAccessGroups } = params; + const { + commandName, + cfg, + allowFrom, + groupAllowFrom, + useAccessGroups, + resolveTelegramGroupConfig, + } = params; const commandHandlers = new Map(); const sendMessage = vi.fn().mockResolvedValue(undefined); registerTelegramNativeCommands({ @@ -163,6 +192,7 @@ function registerAndResolveCommandHandler(params: { allowFrom: allowFrom ?? [], groupAllowFrom: groupAllowFrom ?? [], useAccessGroups: useAccessGroups ?? true, + resolveTelegramGroupConfig, }), }); @@ -183,6 +213,8 @@ describe("registerTelegramNativeCommands — session metadata", () => { sessionMocks.recordSessionMetaFromInbound.mockClear().mockResolvedValue(undefined); sessionMocks.resolveStorePath.mockClear().mockReturnValue("/tmp/openclaw-sessions.json"); replyMocks.dispatchReplyWithBufferedBlockDispatcher.mockClear().mockResolvedValue(undefined); + sessionBindingMocks.resolveByConversation.mockReset().mockReturnValue(null); + sessionBindingMocks.touch.mockReset(); }); it("calls recordSessionMetaFromInbound after a native slash command", async () => { @@ -273,6 +305,58 @@ describe("registerTelegramNativeCommands — session metadata", () => { expect(sessionMetaCall?.sessionKey).toBe("agent:codex:telegram:slash:200"); }); + it("routes Telegram native commands through topic-specific agent sessions", async () => { + const { handler } = registerAndResolveStatusHandler({ + cfg: {}, + allowFrom: ["200"], + groupAllowFrom: ["200"], + resolveTelegramGroupConfig: () => ({ + groupConfig: { requireMention: false }, + topicConfig: { agentId: "zu" }, + }), + }); + await handler(buildStatusTopicCommandContext()); + + const dispatchCall = ( + replyMocks.dispatchReplyWithBufferedBlockDispatcher.mock.calls as unknown as Array< + [{ ctx?: { CommandTargetSessionKey?: string } }] + > + )[0]?.[0]; + expect(dispatchCall?.ctx?.CommandTargetSessionKey).toBe( + "agent:zu:telegram:group:-1001234567890:topic:42", + ); + }); + + it("routes Telegram native commands through bound topic sessions", async () => { + sessionBindingMocks.resolveByConversation.mockReturnValue({ + bindingId: "default:-1001234567890:topic:42", + targetSessionKey: "agent:codex-acp:session-1", + }); + + const { handler } = registerAndResolveStatusHandler({ + cfg: {}, + allowFrom: ["200"], + groupAllowFrom: ["200"], + }); + await handler(buildStatusTopicCommandContext()); + + expect(sessionBindingMocks.resolveByConversation).toHaveBeenCalledWith({ + channel: "telegram", + accountId: "default", + conversationId: "-1001234567890:topic:42", + }); + const dispatchCall = ( + replyMocks.dispatchReplyWithBufferedBlockDispatcher.mock.calls as unknown as Array< + [{ ctx?: { CommandTargetSessionKey?: string } }] + > + )[0]?.[0]; + expect(dispatchCall?.ctx?.CommandTargetSessionKey).toBe("agent:codex-acp:session-1"); + expect(sessionBindingMocks.touch).toHaveBeenCalledWith( + "default:-1001234567890:topic:42", + undefined, + ); + }); + it("aborts native command dispatch when configured ACP topic binding cannot initialize", async () => { const boundSessionKey = "agent:codex:acp:binding:telegram:default:feedface"; persistentBindingMocks.resolveConfiguredAcpBindingRecord.mockReturnValue({ diff --git a/src/telegram/bot-native-commands.test-helpers.ts b/src/telegram/bot-native-commands.test-helpers.ts index 0a749841d76..b79d61d48a3 100644 --- a/src/telegram/bot-native-commands.test-helpers.ts +++ b/src/telegram/bot-native-commands.test-helpers.ts @@ -19,6 +19,7 @@ export function createNativeCommandTestParams(params: { nativeEnabled?: boolean; nativeSkillsEnabled?: boolean; nativeDisabledExplicit?: boolean; + resolveTelegramGroupConfig?: RegisterTelegramNativeCommandParams["resolveTelegramGroupConfig"]; opts?: RegisterTelegramNativeCommandParams["opts"]; }): RegisterTelegramNativeCommandParams { return { @@ -36,10 +37,12 @@ export function createNativeCommandTestParams(params: { nativeSkillsEnabled: params.nativeSkillsEnabled ?? true, nativeDisabledExplicit: params.nativeDisabledExplicit ?? false, resolveGroupPolicy: () => ({ allowlistEnabled: false, allowed: true }), - resolveTelegramGroupConfig: () => ({ - groupConfig: undefined, - topicConfig: undefined, - }), + resolveTelegramGroupConfig: + params.resolveTelegramGroupConfig ?? + (() => ({ + groupConfig: undefined, + topicConfig: undefined, + })), shouldSkipUpdate: () => false, opts: params.opts ?? { token: "token" }, }; diff --git a/src/telegram/bot-native-commands.ts b/src/telegram/bot-native-commands.ts index b8a2b980f27..cc00a46dd8a 100644 --- a/src/telegram/bot-native-commands.ts +++ b/src/telegram/bot-native-commands.ts @@ -1,8 +1,5 @@ import type { Bot, Context } from "grammy"; -import { - ensureConfiguredAcpRouteReady, - resolveConfiguredAcpRoute, -} from "../acp/persistent-bindings.route.js"; +import { ensureConfiguredAcpRouteReady } from "../acp/persistent-bindings.route.js"; import { resolveChunkMode } from "../auto-reply/chunk.js"; import type { CommandArgs } from "../auto-reply/commands-registry.js"; import { @@ -60,12 +57,11 @@ import { buildTelegramThreadParams, buildSenderName, buildTelegramGroupFrom, - buildTelegramGroupPeerId, - buildTelegramParentPeer, resolveTelegramGroupAllowFromContext, resolveTelegramThreadSpec, } from "./bot/helpers.js"; import type { TelegramContext } from "./bot/types.js"; +import { resolveTelegramConversationRoute } from "./conversation-route.js"; import { evaluateTelegramGroupBaseAccess, evaluateTelegramGroupPolicyAccess, @@ -424,15 +420,17 @@ export const registerTelegramNativeCommands = ({ isGroup: boolean; isForum: boolean; resolvedThreadId?: number; + senderId?: string; + topicAgentId?: string; }): Promise<{ chatId: number; threadSpec: ReturnType; - route: ReturnType; + route: ReturnType["route"]; mediaLocalRoots: readonly string[] | undefined; tableMode: ReturnType; chunkMode: ReturnType; } | null> => { - const { msg, isGroup, isForum, resolvedThreadId } = params; + const { msg, isGroup, isForum, resolvedThreadId, senderId, topicAgentId } = params; const chatId = msg.chat.id; const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id; const threadSpec = resolveTelegramThreadSpec({ @@ -440,28 +438,16 @@ export const registerTelegramNativeCommands = ({ isForum, messageThreadId, }); - const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); - const peerId = isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId); - let route = resolveAgentRoute({ + let { route, configuredBinding } = resolveTelegramConversationRoute({ cfg, - channel: "telegram", accountId, - peer: { - kind: isGroup ? "group" : "direct", - id: peerId, - }, - parentPeer, + chatId, + isGroup, + resolvedThreadId, + replyThreadId: threadSpec.id, + senderId, + topicAgentId, }); - const configuredRoute = resolveConfiguredAcpRoute({ - cfg, - route, - channel: "telegram", - accountId, - conversationId: peerId, - parentConversationId: isGroup ? String(chatId) : undefined, - }); - const configuredBinding = configuredRoute.configuredBinding; - route = configuredRoute.route; if (configuredBinding) { const ensured = await ensureConfiguredAcpRouteReady({ cfg, @@ -562,6 +548,8 @@ export const registerTelegramNativeCommands = ({ isGroup, isForum, resolvedThreadId, + senderId, + topicAgentId: topicConfig?.agentId, }); if (!runtimeContext) { return; @@ -788,6 +776,8 @@ export const registerTelegramNativeCommands = ({ isGroup, isForum, resolvedThreadId, + senderId, + topicAgentId: auth.topicConfig?.agentId, }); if (!runtimeContext) { return; diff --git a/src/telegram/conversation-route.ts b/src/telegram/conversation-route.ts new file mode 100644 index 00000000000..478e9049f7a --- /dev/null +++ b/src/telegram/conversation-route.ts @@ -0,0 +1,122 @@ +import { resolveConfiguredAcpRoute } from "../acp/persistent-bindings.route.js"; +import type { OpenClawConfig } from "../config/config.js"; +import { logVerbose } from "../globals.js"; +import { getSessionBindingService } from "../infra/outbound/session-binding-service.js"; +import { + buildAgentSessionKey, + pickFirstExistingAgentId, + resolveAgentRoute, +} from "../routing/resolve-route.js"; +import { buildAgentMainSessionKey, resolveAgentIdFromSessionKey } from "../routing/session-key.js"; +import { + buildTelegramGroupPeerId, + buildTelegramParentPeer, + resolveTelegramDirectPeerId, +} from "./bot/helpers.js"; + +export function resolveTelegramConversationRoute(params: { + cfg: OpenClawConfig; + accountId: string; + chatId: number | string; + isGroup: boolean; + resolvedThreadId?: number; + replyThreadId?: number; + senderId?: string | number | null; + topicAgentId?: string | null; +}): { + route: ReturnType; + configuredBinding: ReturnType["configuredBinding"]; + configuredBindingSessionKey: string; +} { + const peerId = params.isGroup + ? buildTelegramGroupPeerId(params.chatId, params.resolvedThreadId) + : resolveTelegramDirectPeerId({ + chatId: params.chatId, + senderId: params.senderId, + }); + const parentPeer = buildTelegramParentPeer({ + isGroup: params.isGroup, + resolvedThreadId: params.resolvedThreadId, + chatId: params.chatId, + }); + let route = resolveAgentRoute({ + cfg: params.cfg, + channel: "telegram", + accountId: params.accountId, + peer: { + kind: params.isGroup ? "group" : "direct", + id: peerId, + }, + parentPeer, + }); + + const rawTopicAgentId = params.topicAgentId?.trim(); + if (rawTopicAgentId) { + const topicAgentId = pickFirstExistingAgentId(params.cfg, rawTopicAgentId); + route = { + ...route, + agentId: topicAgentId, + sessionKey: buildAgentSessionKey({ + agentId: topicAgentId, + channel: "telegram", + accountId: params.accountId, + peer: { kind: params.isGroup ? "group" : "direct", id: peerId }, + dmScope: params.cfg.session?.dmScope, + identityLinks: params.cfg.session?.identityLinks, + }).toLowerCase(), + mainSessionKey: buildAgentMainSessionKey({ + agentId: topicAgentId, + }).toLowerCase(), + }; + logVerbose( + `telegram: topic route override: topic=${params.resolvedThreadId ?? params.replyThreadId} agent=${topicAgentId} sessionKey=${route.sessionKey}`, + ); + } + + const configuredRoute = resolveConfiguredAcpRoute({ + cfg: params.cfg, + route, + channel: "telegram", + accountId: params.accountId, + conversationId: peerId, + parentConversationId: params.isGroup ? String(params.chatId) : undefined, + }); + let configuredBinding = configuredRoute.configuredBinding; + let configuredBindingSessionKey = configuredRoute.boundSessionKey ?? ""; + route = configuredRoute.route; + + const threadBindingConversationId = + params.replyThreadId != null + ? `${params.chatId}:topic:${params.replyThreadId}` + : !params.isGroup + ? String(params.chatId) + : undefined; + if (threadBindingConversationId) { + const threadBinding = getSessionBindingService().resolveByConversation({ + channel: "telegram", + accountId: params.accountId, + conversationId: threadBindingConversationId, + }); + const boundSessionKey = threadBinding?.targetSessionKey?.trim(); + if (threadBinding && boundSessionKey) { + route = { + ...route, + sessionKey: boundSessionKey, + agentId: resolveAgentIdFromSessionKey(boundSessionKey), + matchedBy: "binding.channel", + }; + configuredBinding = null; + configuredBindingSessionKey = ""; + getSessionBindingService().touch(threadBinding.bindingId); + logVerbose( + `telegram: routed via bound conversation ${threadBindingConversationId} -> ${boundSessionKey}`, + ); + } + } + + return { + route, + configuredBinding, + configuredBindingSessionKey, + }; +}