From b86bc9de95d0f301044caed43cbb4e66beb98db7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 17 Mar 2026 21:27:14 -0700 Subject: [PATCH] refactor: split remaining monitor runtime helpers --- .../native-command.plugin-dispatch.test.ts | 260 ++------ .../discord/src/monitor/provider.test.ts | 67 ++- extensions/discord/src/runtime-api.ts | 15 +- extensions/feishu/src/bot-content.ts | 471 +++++++++++++++ extensions/feishu/src/bot.ts | 558 +----------------- .../src/mattermost/monitor-resources.ts | 183 ++++++ .../mattermost/src/mattermost/monitor.ts | 149 +---- .../telegram/src/bot-handlers.buffers.ts | 373 ++++++++++++ .../telegram/src/bot-handlers.runtime.ts | 380 ++---------- extensions/telegram/src/monitor.test.ts | 67 ++- .../tlon/src/monitor/approval-runtime.ts | 362 ++++++++++++ extensions/tlon/src/monitor/cites.ts | 53 ++ extensions/tlon/src/monitor/index.ts | 489 +++------------ .../pi-embedded-runner-extraparams.test.ts | 6 +- .../anthropic-stream-wrappers.ts | 35 +- src/agents/provider-capabilities.ts | 5 +- src/channels/plugins/directory-config.ts | 10 +- src/plugin-sdk/runtime-api-guardrails.test.ts | 3 +- .../discord-provider.test-support.ts | 15 +- 19 files changed, 1825 insertions(+), 1676 deletions(-) create mode 100644 extensions/feishu/src/bot-content.ts create mode 100644 extensions/mattermost/src/mattermost/monitor-resources.ts create mode 100644 extensions/telegram/src/bot-handlers.buffers.ts create mode 100644 extensions/tlon/src/monitor/approval-runtime.ts create mode 100644 extensions/tlon/src/monitor/cites.ts diff --git a/extensions/discord/src/monitor/native-command.plugin-dispatch.test.ts b/extensions/discord/src/monitor/native-command.plugin-dispatch.test.ts index 274530b4bd7..4bbd2ab08c8 100644 --- a/extensions/discord/src/monitor/native-command.plugin-dispatch.test.ts +++ b/extensions/discord/src/monitor/native-command.plugin-dispatch.test.ts @@ -2,7 +2,7 @@ import { ChannelType } from "discord-api-types/v10"; import { beforeEach, describe, expect, it, vi } from "vitest"; import type { NativeCommandSpec } from "../../../../src/auto-reply/commands-registry.js"; import * as dispatcherModule from "../../../../src/auto-reply/reply/provider-dispatcher.js"; -import type { ChatType } from "../../../../src/channels/chat-type.js"; +import { setDefaultChannelPluginRegistryForTests } from "../../../../src/commands/channel-test-helpers.js"; import type { OpenClawConfig } from "../../../../src/config/config.js"; import * as pluginCommandsModule from "../../../../src/plugins/commands.js"; import { clearPluginCommands, registerPluginCommand } from "../../../../src/plugins/commands.js"; @@ -12,32 +12,26 @@ import { } from "./native-command.test-helpers.js"; import { createNoopThreadBindingManager } from "./thread-bindings.js"; -type ResolveConfiguredBindingRouteFn = - typeof import("openclaw/plugin-sdk/conversation-runtime").resolveConfiguredBindingRoute; type EnsureConfiguredBindingRouteReadyFn = typeof import("openclaw/plugin-sdk/conversation-runtime").ensureConfiguredBindingRouteReady; -const persistentBindingMocks = vi.hoisted(() => ({ - resolveConfiguredAcpBindingRecord: vi.fn((params) => ({ - bindingResolution: null, - route: params.route, - })), - ensureConfiguredAcpBindingSession: vi.fn(async () => ({ +const ensureConfiguredBindingRouteReadyMock = vi.hoisted(() => + vi.fn(async () => ({ ok: true, })), -})); +); vi.mock("openclaw/plugin-sdk/conversation-runtime", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, - resolveConfiguredBindingRoute: persistentBindingMocks.resolveConfiguredAcpBindingRecord, - ensureConfiguredBindingRouteReady: persistentBindingMocks.ensureConfiguredAcpBindingSession, + ensureConfiguredBindingRouteReady: (...args: unknown[]) => + ensureConfiguredBindingRouteReadyMock( + ...(args as Parameters), + ), }; }); -import { createDiscordNativeCommand } from "./native-command.js"; - function createInteraction(params?: { channelType?: ChannelType; channelId?: string; @@ -66,7 +60,12 @@ function createConfig(): OpenClawConfig { } as OpenClawConfig; } -function createNativeCommand(cfg: OpenClawConfig, commandSpec: NativeCommandSpec) { +async function loadCreateDiscordNativeCommand() { + return (await import("./native-command.js")).createDiscordNativeCommand; +} + +async function createNativeCommand(cfg: OpenClawConfig, commandSpec: NativeCommandSpec) { + const createDiscordNativeCommand = await loadCreateDiscordNativeCommand(); return createDiscordNativeCommand({ command: commandSpec, cfg, @@ -78,7 +77,8 @@ function createNativeCommand(cfg: OpenClawConfig, commandSpec: NativeCommandSpec }); } -function createPluginCommand(params: { cfg: OpenClawConfig; name: string }) { +async function createPluginCommand(params: { cfg: OpenClawConfig; name: string }) { + const createDiscordNativeCommand = await loadCreateDiscordNativeCommand(); return createDiscordNativeCommand({ command: { name: params.name, @@ -119,7 +119,7 @@ async function expectPairCommandReply(params: { commandName: string; interaction: MockCommandInteraction; }) { - const command = createPluginCommand({ + const command = await createPluginCommand({ cfg: params.cfg, name: params.commandName, }); @@ -143,150 +143,14 @@ async function expectPairCommandReply(params: { ); } -function createStatusCommand(cfg: OpenClawConfig) { - return createNativeCommand(cfg, { +async function createStatusCommand(cfg: OpenClawConfig) { + return await createNativeCommand(cfg, { name: "status", description: "Status", acceptsArgs: false, }); } -function resolveConversationFromParams(params: Parameters[0]) { - if ("conversation" in params) { - return params.conversation; - } - return { - channel: params.channel, - accountId: params.accountId, - conversationId: params.conversationId, - ...(params.parentConversationId ? { parentConversationId: params.parentConversationId } : {}), - }; -} - -function createConfiguredBindingResolution(params: { - conversation: ReturnType; - boundSessionKey: string; -}) { - const peerKind: ChatType = params.conversation.conversationId.startsWith("dm-") - ? "direct" - : "channel"; - const configuredBinding = { - spec: { - channel: "discord" as const, - accountId: params.conversation.accountId, - conversationId: params.conversation.conversationId, - ...(params.conversation.parentConversationId - ? { parentConversationId: params.conversation.parentConversationId } - : {}), - agentId: "codex", - mode: "persistent" as const, - }, - record: { - bindingId: `config:acp:discord:${params.conversation.accountId}:${params.conversation.conversationId}`, - targetSessionKey: params.boundSessionKey, - targetKind: "session" as const, - conversation: params.conversation, - status: "active" as const, - boundAt: 0, - }, - }; - return { - conversation: params.conversation, - compiledBinding: { - channel: "discord" as const, - binding: { - type: "acp" as const, - agentId: "codex", - match: { - channel: "discord", - accountId: params.conversation.accountId, - peer: { - kind: peerKind, - id: params.conversation.conversationId, - }, - }, - acp: { - mode: "persistent" as const, - }, - }, - bindingConversationId: params.conversation.conversationId, - target: { - conversationId: params.conversation.conversationId, - ...(params.conversation.parentConversationId - ? { parentConversationId: params.conversation.parentConversationId } - : {}), - }, - agentId: "codex", - provider: { - compileConfiguredBinding: () => ({ - conversationId: params.conversation.conversationId, - ...(params.conversation.parentConversationId - ? { parentConversationId: params.conversation.parentConversationId } - : {}), - }), - matchInboundConversation: () => ({ - conversationId: params.conversation.conversationId, - ...(params.conversation.parentConversationId - ? { parentConversationId: params.conversation.parentConversationId } - : {}), - }), - }, - targetFactory: { - driverId: "acp" as const, - materialize: () => ({ - record: configuredBinding.record, - statefulTarget: { - kind: "stateful" as const, - driverId: "acp", - sessionKey: params.boundSessionKey, - agentId: "codex", - }, - }), - }, - }, - match: { - conversationId: params.conversation.conversationId, - ...(params.conversation.parentConversationId - ? { parentConversationId: params.conversation.parentConversationId } - : {}), - }, - record: configuredBinding.record, - statefulTarget: { - kind: "stateful" as const, - driverId: "acp", - sessionKey: params.boundSessionKey, - agentId: "codex", - }, - }; -} - -function setConfiguredBinding(channelId: string, boundSessionKey: string) { - persistentBindingMocks.resolveConfiguredAcpBindingRecord.mockImplementation((params) => { - const conversation = resolveConversationFromParams(params); - const bindingResolution = createConfiguredBindingResolution({ - conversation: { - ...conversation, - conversationId: channelId, - }, - boundSessionKey, - }); - return { - bindingResolution, - boundSessionKey, - boundAgentId: "codex", - route: { - ...params.route, - agentId: "codex", - sessionKey: boundSessionKey, - matchedBy: "binding.channel", - }, - }; - }); - persistentBindingMocks.ensureConfiguredAcpBindingSession.mockResolvedValue({ - ok: true, - }); -} - function createDispatchSpy() { return vi.spyOn(dispatcherModule, "dispatchReplyWithDispatcher").mockResolvedValue({ counts: { @@ -299,26 +163,23 @@ function createDispatchSpy() { function expectBoundSessionDispatch( dispatchSpy: ReturnType, - boundSessionKey: string, + expectedPattern: RegExp, ) { expect(dispatchSpy).toHaveBeenCalledTimes(1); const dispatchCall = dispatchSpy.mock.calls[0]?.[0] as { ctx?: { SessionKey?: string; CommandTargetSessionKey?: string }; }; - expect(dispatchCall.ctx?.SessionKey).toBe(boundSessionKey); - expect(dispatchCall.ctx?.CommandTargetSessionKey).toBe(boundSessionKey); - expect(persistentBindingMocks.resolveConfiguredAcpBindingRecord).toHaveBeenCalledTimes(1); - expect(persistentBindingMocks.ensureConfiguredAcpBindingSession).toHaveBeenCalledTimes(1); + expect(dispatchCall.ctx?.SessionKey).toMatch(expectedPattern); + expect(dispatchCall.ctx?.CommandTargetSessionKey).toMatch(expectedPattern); + expect(ensureConfiguredBindingRouteReadyMock).toHaveBeenCalledTimes(1); } async function expectBoundStatusCommandDispatch(params: { cfg: OpenClawConfig; interaction: MockCommandInteraction; - channelId: string; - boundSessionKey: string; + expectedPattern: RegExp; }) { - const command = createStatusCommand(params.cfg); - setConfiguredBinding(params.channelId, params.boundSessionKey); + const command = await createStatusCommand(params.cfg); vi.spyOn(pluginCommandsModule, "matchPluginCommand").mockReturnValue(null); const dispatchSpy = createDispatchSpy(); @@ -327,20 +188,16 @@ async function expectBoundStatusCommandDispatch(params: { params.interaction as unknown, ); - expectBoundSessionDispatch(dispatchSpy, params.boundSessionKey); + expectBoundSessionDispatch(dispatchSpy, params.expectedPattern); } describe("Discord native plugin command dispatch", () => { beforeEach(() => { vi.clearAllMocks(); clearPluginCommands(); - persistentBindingMocks.resolveConfiguredAcpBindingRecord.mockReset(); - persistentBindingMocks.resolveConfiguredAcpBindingRecord.mockImplementation((params) => ({ - bindingResolution: null, - route: params.route, - })); - persistentBindingMocks.ensureConfiguredAcpBindingSession.mockReset(); - persistentBindingMocks.ensureConfiguredAcpBindingSession.mockResolvedValue({ + setDefaultChannelPluginRegistryForTests(); + ensureConfiguredBindingRouteReadyMock.mockReset(); + ensureConfiguredBindingRouteReadyMock.mockResolvedValue({ ok: true, }); }); @@ -397,15 +254,7 @@ describe("Discord native plugin command dispatch", () => { description: "Pair", acceptsArgs: true, }; - const command = createDiscordNativeCommand({ - command: commandSpec, - cfg, - discordConfig: cfg.channels?.discord ?? {}, - accountId: "default", - sessionPrefix: "discord:slash", - ephemeralDefault: true, - threadBindings: createNoopThreadBindingManager("default"), - }); + const command = await createNativeCommand(cfg, commandSpec); const interaction = createInteraction({ channelType: ChannelType.GuildText, channelId: "234567890123456789", @@ -449,15 +298,7 @@ describe("Discord native plugin command dispatch", () => { description: "List cron jobs", acceptsArgs: false, }; - const command = createDiscordNativeCommand({ - command: commandSpec, - cfg, - discordConfig: cfg.channels?.discord ?? {}, - accountId: "default", - sessionPrefix: "discord:slash", - ephemeralDefault: true, - threadBindings: createNoopThreadBindingManager("default"), - }); + const command = await createNativeCommand(cfg, commandSpec); const interaction = createInteraction(); const pluginMatch = { command: { @@ -492,11 +333,21 @@ describe("Discord native plugin command dispatch", () => { it("routes native slash commands through configured ACP Discord channel bindings", async () => { const guildId = "1459246755253325866"; const channelId = "1478836151241412759"; - const boundSessionKey = "agent:codex:acp:binding:discord:default:feedface"; const cfg = { commands: { useAccessGroups: false, }, + channels: { + discord: { + guilds: { + [guildId]: { + channels: { + [channelId]: { allow: true, requireMention: false }, + }, + }, + }, + }, + }, bindings: [ { type: "acp", @@ -522,8 +373,7 @@ describe("Discord native plugin command dispatch", () => { await expectBoundStatusCommandDispatch({ cfg, interaction, - channelId, - boundSessionKey, + expectedPattern: /^agent:codex:acp:binding:discord:default:/, }); }); @@ -557,7 +407,7 @@ describe("Discord native plugin command dispatch", () => { }, }, } as OpenClawConfig; - const command = createStatusCommand(cfg); + const command = await createStatusCommand(cfg); const interaction = createInteraction({ channelType: ChannelType.GuildText, channelId, @@ -578,13 +428,11 @@ describe("Discord native plugin command dispatch", () => { expect(dispatchCall.ctx?.CommandTargetSessionKey).toBe( "agent:qwen:discord:channel:1478836151241412759", ); - expect(persistentBindingMocks.resolveConfiguredAcpBindingRecord).toHaveBeenCalledTimes(1); - expect(persistentBindingMocks.ensureConfiguredAcpBindingSession).not.toHaveBeenCalled(); + expect(ensureConfiguredBindingRouteReadyMock).not.toHaveBeenCalled(); }); it("routes Discord DM native slash commands through configured ACP bindings", async () => { const channelId = "dm-1"; - const boundSessionKey = "agent:codex:acp:binding:discord:default:dmfeedface"; const cfg = { commands: { useAccessGroups: false, @@ -617,15 +465,13 @@ describe("Discord native plugin command dispatch", () => { await expectBoundStatusCommandDispatch({ cfg, interaction, - channelId, - boundSessionKey, + expectedPattern: /^agent:codex:acp:binding:discord:default:/, }); }); it("allows recovery commands through configured ACP bindings even when ensure fails", async () => { const guildId = "1459246755253325866"; const channelId = "1479098716916023408"; - const boundSessionKey = "agent:codex:acp:binding:discord:default:feedface"; const cfg = { commands: { useAccessGroups: false, @@ -651,14 +497,13 @@ describe("Discord native plugin command dispatch", () => { guildId, guildName: "Ops", }); - const command = createNativeCommand(cfg, { + const command = await createNativeCommand(cfg, { name: "new", description: "Start a new session.", acceptsArgs: true, }); - setConfiguredBinding(channelId, boundSessionKey); - persistentBindingMocks.ensureConfiguredAcpBindingSession.mockResolvedValue({ + ensureConfiguredBindingRouteReadyMock.mockResolvedValue({ ok: false, error: "acpx exited with code 1", }); @@ -671,10 +516,11 @@ describe("Discord native plugin command dispatch", () => { const dispatchCall = dispatchSpy.mock.calls[0]?.[0] as { ctx?: { SessionKey?: string; CommandTargetSessionKey?: string }; }; - expect(dispatchCall.ctx?.SessionKey).toBe(boundSessionKey); - expect(dispatchCall.ctx?.CommandTargetSessionKey).toBe(boundSessionKey); - expect(persistentBindingMocks.resolveConfiguredAcpBindingRecord).toHaveBeenCalledTimes(1); - expect(persistentBindingMocks.ensureConfiguredAcpBindingSession).not.toHaveBeenCalled(); + expect(dispatchCall.ctx?.SessionKey).toMatch(/^agent:codex:acp:binding:discord:default:/); + expect(dispatchCall.ctx?.CommandTargetSessionKey).toMatch( + /^agent:codex:acp:binding:discord:default:/, + ); + expect(ensureConfiguredBindingRouteReadyMock).not.toHaveBeenCalled(); expect(interaction.reply).not.toHaveBeenCalledWith( expect.objectContaining({ content: "Configured ACP binding is unavailable right now. Please try again.", diff --git a/extensions/discord/src/monitor/provider.test.ts b/extensions/discord/src/monitor/provider.test.ts index 8cda7cc90b3..0e7780374b5 100644 --- a/extensions/discord/src/monitor/provider.test.ts +++ b/extensions/discord/src/monitor/provider.test.ts @@ -7,7 +7,6 @@ import { baseRuntime, getFirstDiscordMessageHandlerParams, getProviderMonitorTestMocks, - mockResolvedDiscordAccountConfig, resetDiscordProviderMonitorMocks, } from "../../../../test/helpers/extensions/discord-provider.test-support.js"; @@ -37,6 +36,21 @@ const { voiceRuntimeModuleLoadedMock, } = getProviderMonitorTestMocks(); +function createConfigWithDiscordAccount(overrides: Record = {}): OpenClawConfig { + return { + channels: { + discord: { + accounts: { + default: { + token: "MTIz.abc.def", + ...overrides, + }, + }, + }, + }, + } as OpenClawConfig; +} + vi.mock("openclaw/plugin-sdk/plugin-runtime", async () => { const actual = await vi.importActual( "openclaw/plugin-sdk/plugin-runtime", @@ -90,7 +104,18 @@ describe("monitorDiscordProvider", () => { }; beforeEach(() => { + vi.resetModules(); resetDiscordProviderMonitorMocks(); + vi.doMock("../accounts.js", () => ({ + resolveDiscordAccount: (...args: Parameters) => + resolveDiscordAccountMock(...args), + })); + vi.doMock("../probe.js", () => ({ + fetchDiscordApplicationId: async () => "app-1", + })); + vi.doMock("../token.js", () => ({ + normalizeDiscordToken: (value?: string) => value, + })); }); it("stops thread bindings when startup fails before lifecycle begins", async () => { @@ -139,7 +164,7 @@ describe("monitorDiscordProvider", () => { it("loads the Discord voice runtime only when voice is enabled", async () => { resolveDiscordAccountMock.mockReturnValue({ accountId: "default", - token: "cfg-token", + token: "MTIz.abc.def", config: { commands: { native: true, nativeSkills: false }, voice: { enabled: true }, @@ -356,11 +381,18 @@ describe("monitorDiscordProvider", () => { }); it("forwards custom eventQueue config from discord config to Carbon Client", async () => { - const { monitorDiscordProvider } = await import("./provider.js"); - - mockResolvedDiscordAccountConfig({ - eventQueue: { listenerTimeout: 300_000 }, + resolveDiscordAccountMock.mockReturnValue({ + accountId: "default", + token: "MTIz.abc.def", + config: { + commands: { native: true, nativeSkills: false }, + voice: { enabled: false }, + agentComponents: { enabled: false }, + execApprovals: { enabled: false }, + eventQueue: { listenerTimeout: 300_000 }, + }, }); + const { monitorDiscordProvider } = await import("./provider.js"); await monitorDiscordProvider({ config: baseConfig(), @@ -374,12 +406,10 @@ describe("monitorDiscordProvider", () => { it("does not reuse eventQueue.listenerTimeout as the queued inbound worker timeout", async () => { const { monitorDiscordProvider } = await import("./provider.js"); - mockResolvedDiscordAccountConfig({ - eventQueue: { listenerTimeout: 50_000 }, - }); - await monitorDiscordProvider({ - config: baseConfig(), + config: createConfigWithDiscordAccount({ + eventQueue: { listenerTimeout: 50_000 }, + }), runtime: baseRuntime(), }); @@ -392,11 +422,18 @@ describe("monitorDiscordProvider", () => { }); it("forwards inbound worker timeout config to the Discord message handler", async () => { - const { monitorDiscordProvider } = await import("./provider.js"); - - mockResolvedDiscordAccountConfig({ - inboundWorker: { runTimeoutMs: 300_000 }, + resolveDiscordAccountMock.mockReturnValue({ + accountId: "default", + token: "MTIz.abc.def", + config: { + commands: { native: true, nativeSkills: false }, + voice: { enabled: false }, + agentComponents: { enabled: false }, + execApprovals: { enabled: false }, + inboundWorker: { runTimeoutMs: 300_000 }, + }, }); + const { monitorDiscordProvider } = await import("./provider.js"); await monitorDiscordProvider({ config: baseConfig(), diff --git a/extensions/discord/src/runtime-api.ts b/extensions/discord/src/runtime-api.ts index 3d789d10bd4..3e277cfea3c 100644 --- a/extensions/discord/src/runtime-api.ts +++ b/extensions/discord/src/runtime-api.ts @@ -35,13 +35,16 @@ export type { ChannelMessageActionAdapter, ChannelMessageActionName, } from "openclaw/plugin-sdk/channel-runtime"; -export { withNormalizedTimestamp } from "../../../src/agents/date-time.js"; -export { assertMediaNotDataUrl } from "../../../src/agents/sandbox-paths.js"; -export { parseAvailableTags, readReactionParams } from "openclaw/plugin-sdk/discord-core"; -export { resolvePollMaxSelections } from "../../../src/polls.js"; -export type { DiscordAccountConfig, DiscordActionConfig } from "../../../src/config/types.js"; +export { + assertMediaNotDataUrl, + parseAvailableTags, + readReactionParams, + resolvePollMaxSelections, + withNormalizedTimestamp, +} from "openclaw/plugin-sdk/discord-core"; +export type { DiscordAccountConfig, DiscordActionConfig } from "openclaw/plugin-sdk/discord"; export { hasConfiguredSecretInput, normalizeResolvedSecretInputString, normalizeSecretInputString, -} from "../../../src/config/types.secrets.js"; +} from "openclaw/plugin-sdk/config-runtime"; diff --git a/extensions/feishu/src/bot-content.ts b/extensions/feishu/src/bot-content.ts new file mode 100644 index 00000000000..d8dcc1c0aa2 --- /dev/null +++ b/extensions/feishu/src/bot-content.ts @@ -0,0 +1,471 @@ +import type { ClawdbotConfig } from "../runtime-api.js"; +import { normalizeFeishuExternalKey } from "./external-keys.js"; +import { downloadMessageResourceFeishu } from "./media.js"; +import { parsePostContent } from "./post.js"; +import { getFeishuRuntime } from "./runtime.js"; +import type { FeishuMediaInfo } from "./types.js"; + +export type FeishuMention = { + key: string; + id: { + open_id?: string; + user_id?: string; + union_id?: string; + }; + name: string; + tenant_key?: string; +}; + +type FeishuMessageLike = { + message: { + content: string; + message_type: string; + mentions?: FeishuMention[]; + chat_id: string; + root_id?: string; + parent_id?: string; + thread_id?: string; + message_id: string; + }; + sender: { + sender_id: { + open_id?: string; + user_id?: string; + }; + }; +}; + +export type GroupSessionScope = "group" | "group_sender" | "group_topic" | "group_topic_sender"; + +export type ResolvedFeishuGroupSession = { + peerId: string; + parentPeer: { kind: "group"; id: string } | null; + groupSessionScope: GroupSessionScope; + replyInThread: boolean; + threadReply: boolean; +}; + +function buildFeishuConversationId(params: { + chatId: string; + scope: GroupSessionScope | "group_sender"; + topicId?: string; + senderOpenId?: string; +}): string { + switch (params.scope) { + case "group_sender": + return `${params.chatId}:sender:${params.senderOpenId}`; + case "group_topic": + return `${params.chatId}:topic:${params.topicId}`; + case "group_topic_sender": + return `${params.chatId}:topic:${params.topicId}:sender:${params.senderOpenId}`; + default: + return params.chatId; + } +} + +export function resolveFeishuGroupSession(params: { + chatId: string; + senderOpenId: string; + messageId: string; + rootId?: string; + threadId?: string; + groupConfig?: { + groupSessionScope?: GroupSessionScope; + topicSessionMode?: "enabled" | "disabled"; + replyInThread?: "enabled" | "disabled"; + }; + feishuCfg?: { + groupSessionScope?: GroupSessionScope; + topicSessionMode?: "enabled" | "disabled"; + replyInThread?: "enabled" | "disabled"; + }; +}): ResolvedFeishuGroupSession { + const { chatId, senderOpenId, messageId, rootId, threadId, groupConfig, feishuCfg } = params; + const normalizedThreadId = threadId?.trim(); + const normalizedRootId = rootId?.trim(); + const threadReply = Boolean(normalizedThreadId || normalizedRootId); + const replyInThread = + (groupConfig?.replyInThread ?? feishuCfg?.replyInThread ?? "disabled") === "enabled" || + threadReply; + const legacyTopicSessionMode = + groupConfig?.topicSessionMode ?? feishuCfg?.topicSessionMode ?? "disabled"; + const groupSessionScope: GroupSessionScope = + groupConfig?.groupSessionScope ?? + feishuCfg?.groupSessionScope ?? + (legacyTopicSessionMode === "enabled" ? "group_topic" : "group"); + const topicScope = + groupSessionScope === "group_topic" || groupSessionScope === "group_topic_sender" + ? (normalizedRootId ?? normalizedThreadId ?? (replyInThread ? messageId : null)) + : null; + + let peerId = chatId; + switch (groupSessionScope) { + case "group_sender": + peerId = buildFeishuConversationId({ chatId, scope: "group_sender", senderOpenId }); + break; + case "group_topic": + peerId = topicScope + ? buildFeishuConversationId({ chatId, scope: "group_topic", topicId: topicScope }) + : chatId; + break; + case "group_topic_sender": + peerId = topicScope + ? buildFeishuConversationId({ + chatId, + scope: "group_topic_sender", + topicId: topicScope, + senderOpenId, + }) + : buildFeishuConversationId({ chatId, scope: "group_sender", senderOpenId }); + break; + case "group": + default: + peerId = chatId; + break; + } + + return { + peerId, + parentPeer: + topicScope && + (groupSessionScope === "group_topic" || groupSessionScope === "group_topic_sender") + ? { kind: "group", id: chatId } + : null, + groupSessionScope, + replyInThread, + threadReply, + }; +} + +export function parseMessageContent(content: string, messageType: string): string { + if (messageType === "post") { + return parsePostContent(content).textContent; + } + + try { + const parsed = JSON.parse(content); + if (messageType === "text") { + return parsed.text || ""; + } + if (messageType === "share_chat") { + if (parsed && typeof parsed === "object") { + const share = parsed as { body?: unknown; summary?: unknown; share_chat_id?: unknown }; + if (typeof share.body === "string" && share.body.trim()) { + return share.body.trim(); + } + if (typeof share.summary === "string" && share.summary.trim()) { + return share.summary.trim(); + } + if (typeof share.share_chat_id === "string" && share.share_chat_id.trim()) { + return `[Forwarded message: ${share.share_chat_id.trim()}]`; + } + } + return "[Forwarded message]"; + } + if (messageType === "merge_forward") { + return "[Merged and Forwarded Message - loading...]"; + } + return content; + } catch { + return content; + } +} + +function formatSubMessageContent(content: string, contentType: string): string { + try { + const parsed = JSON.parse(content); + switch (contentType) { + case "text": + return parsed.text || content; + case "post": + return parsePostContent(content).textContent; + case "image": + return "[Image]"; + case "file": + return `[File: ${parsed.file_name || "unknown"}]`; + case "audio": + return "[Audio]"; + case "video": + return "[Video]"; + case "sticker": + return "[Sticker]"; + case "merge_forward": + return "[Nested Merged Forward]"; + default: + return `[${contentType}]`; + } + } catch { + return content; + } +} + +export function parseMergeForwardContent(params: { + content: string; + log?: (...args: any[]) => void; +}): string { + const { content, log } = params; + const maxMessages = 50; + log?.("feishu: parsing merge_forward sub-messages from API response"); + + let items: Array<{ + message_id?: string; + msg_type?: string; + body?: { content?: string }; + sender?: { id?: string }; + upper_message_id?: string; + create_time?: string; + }>; + try { + items = JSON.parse(content); + } catch { + log?.("feishu: merge_forward items parse failed"); + return "[Merged and Forwarded Message - parse error]"; + } + if (!Array.isArray(items) || items.length === 0) { + return "[Merged and Forwarded Message - no sub-messages]"; + } + const subMessages = items.filter((item) => item.upper_message_id); + if (subMessages.length === 0) { + return "[Merged and Forwarded Message - no sub-messages found]"; + } + + log?.(`feishu: merge_forward contains ${subMessages.length} sub-messages`); + subMessages.sort( + (a, b) => parseInt(a.create_time || "0", 10) - parseInt(b.create_time || "0", 10), + ); + + const lines = ["[Merged and Forwarded Messages]"]; + for (const item of subMessages.slice(0, maxMessages)) { + lines.push(`- ${formatSubMessageContent(item.body?.content || "", item.msg_type || "text")}`); + } + if (subMessages.length > maxMessages) { + lines.push(`... and ${subMessages.length - maxMessages} more messages`); + } + return lines.join("\n"); +} + +export function checkBotMentioned(event: FeishuMessageLike, botOpenId?: string): boolean { + if (!botOpenId) { + return false; + } + if ((event.message.content ?? "").includes("@_all")) { + return true; + } + const mentions = event.message.mentions ?? []; + if (mentions.length > 0) { + return mentions.some((mention) => mention.id.open_id === botOpenId); + } + if (event.message.message_type === "post") { + return parsePostContent(event.message.content).mentionedOpenIds.some((id) => id === botOpenId); + } + return false; +} + +export function normalizeMentions( + text: string, + mentions?: FeishuMention[], + botStripId?: string, +): string { + if (!mentions || mentions.length === 0) { + return text; + } + const escaped = (value: string) => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + const escapeName = (value: string) => value.replace(//g, ">"); + let result = text; + for (const mention of mentions) { + const mentionId = mention.id.open_id; + const replacement = + botStripId && mentionId === botStripId + ? "" + : mentionId + ? `${escapeName(mention.name)}` + : `@${mention.name}`; + result = result.replace(new RegExp(escaped(mention.key), "g"), () => replacement).trim(); + } + return result; +} + +export function normalizeFeishuCommandProbeBody(text: string): string { + if (!text) { + return ""; + } + return text + .replace(/]*>[^<]*<\/at>/giu, " ") + .replace(/(^|\s)@[^/\s]+(?=\s|$|\/)/gu, "$1") + .replace(/\s+/g, " ") + .trim(); +} + +export function parseMediaKeys( + content: string, + messageType: string, +): { imageKey?: string; fileKey?: string; fileName?: string } { + try { + const parsed = JSON.parse(content); + const imageKey = normalizeFeishuExternalKey(parsed.image_key); + const fileKey = normalizeFeishuExternalKey(parsed.file_key); + switch (messageType) { + case "image": + return { imageKey, fileName: parsed.file_name }; + case "file": + case "audio": + case "sticker": + return { fileKey, fileName: parsed.file_name }; + case "video": + case "media": + return { fileKey, imageKey, fileName: parsed.file_name }; + default: + return {}; + } + } catch { + return {}; + } +} + +export function toMessageResourceType(messageType: string): "image" | "file" { + return messageType === "image" ? "image" : "file"; +} + +function inferPlaceholder(messageType: string): string { + switch (messageType) { + case "image": + return ""; + case "file": + return ""; + case "audio": + return ""; + case "video": + case "media": + return ""; + case "sticker": + return ""; + default: + return ""; + } +} + +export async function resolveFeishuMediaList(params: { + cfg: ClawdbotConfig; + messageId: string; + messageType: string; + content: string; + maxBytes: number; + log?: (msg: string) => void; + accountId?: string; +}): Promise { + const { cfg, messageId, messageType, content, maxBytes, log, accountId } = params; + const mediaTypes = ["image", "file", "audio", "video", "media", "sticker", "post"]; + if (!mediaTypes.includes(messageType)) { + return []; + } + + const out: FeishuMediaInfo[] = []; + const core = getFeishuRuntime(); + + if (messageType === "post") { + const { imageKeys, mediaKeys } = parsePostContent(content); + if (imageKeys.length === 0 && mediaKeys.length === 0) { + return []; + } + if (imageKeys.length > 0) { + log?.(`feishu: post message contains ${imageKeys.length} embedded image(s)`); + } + if (mediaKeys.length > 0) { + log?.(`feishu: post message contains ${mediaKeys.length} embedded media file(s)`); + } + + for (const imageKey of imageKeys) { + try { + const result = await downloadMessageResourceFeishu({ + cfg, + messageId, + fileKey: imageKey, + type: "image", + accountId, + }); + const contentType = + result.contentType ?? (await core.media.detectMime({ buffer: result.buffer })); + const saved = await core.channel.media.saveMediaBuffer( + result.buffer, + contentType, + "inbound", + maxBytes, + ); + out.push({ + path: saved.path, + contentType: saved.contentType, + placeholder: "", + }); + log?.(`feishu: downloaded embedded image ${imageKey}, saved to ${saved.path}`); + } catch (err) { + log?.(`feishu: failed to download embedded image ${imageKey}: ${String(err)}`); + } + } + + for (const media of mediaKeys) { + try { + const result = await downloadMessageResourceFeishu({ + cfg, + messageId, + fileKey: media.fileKey, + type: "file", + accountId, + }); + const contentType = + result.contentType ?? (await core.media.detectMime({ buffer: result.buffer })); + const saved = await core.channel.media.saveMediaBuffer( + result.buffer, + contentType, + "inbound", + maxBytes, + ); + out.push({ + path: saved.path, + contentType: saved.contentType, + placeholder: "", + }); + log?.(`feishu: downloaded embedded media ${media.fileKey}, saved to ${saved.path}`); + } catch (err) { + log?.(`feishu: failed to download embedded media ${media.fileKey}: ${String(err)}`); + } + } + return out; + } + + const mediaKeys = parseMediaKeys(content, messageType); + if (!mediaKeys.imageKey && !mediaKeys.fileKey) { + return []; + } + + try { + const fileKey = mediaKeys.fileKey || mediaKeys.imageKey; + if (!fileKey) { + return []; + } + const result = await downloadMessageResourceFeishu({ + cfg, + messageId, + fileKey, + type: toMessageResourceType(messageType), + accountId, + }); + const contentType = + result.contentType ?? (await core.media.detectMime({ buffer: result.buffer })); + const saved = await core.channel.media.saveMediaBuffer( + result.buffer, + contentType, + "inbound", + maxBytes, + result.fileName || mediaKeys.fileName, + ); + out.push({ + path: saved.path, + contentType: saved.contentType, + placeholder: inferPlaceholder(messageType), + }); + log?.(`feishu: downloaded ${messageType} media, saved to ${saved.path}`); + } catch (err) { + log?.(`feishu: failed to download ${messageType} media: ${String(err)}`); + } + return out; +} diff --git a/extensions/feishu/src/bot.ts b/extensions/feishu/src/bot.ts index 871ca9dfb1d..3a7e62adc68 100644 --- a/extensions/feishu/src/bot.ts +++ b/extensions/feishu/src/bot.ts @@ -22,13 +22,20 @@ import { warnMissingProviderGroupPolicyFallbackOnce, } from "../runtime-api.js"; import { resolveFeishuAccount } from "./accounts.js"; +import { + checkBotMentioned, + normalizeFeishuCommandProbeBody, + normalizeMentions, + parseMergeForwardContent, + parseMessageContent, + resolveFeishuGroupSession, + resolveFeishuMediaList, + toMessageResourceType, +} from "./bot-content.js"; import { type FeishuPermissionError, resolveFeishuSenderName } from "./bot-sender-name.js"; import { createFeishuClient } from "./client.js"; -import { buildFeishuConversationId } from "./conversation-id.js"; import { finalizeFeishuMessageProcessing, tryRecordMessagePersistent } from "./dedup.js"; import { maybeCreateDynamicAgent } from "./dynamic-agent.js"; -import { normalizeFeishuExternalKey } from "./external-keys.js"; -import { downloadMessageResourceFeishu } from "./media.js"; import { extractMentionTargets, isMentionForwardRequest } from "./mention.js"; import { resolveFeishuGroupConfig, @@ -36,13 +43,14 @@ import { resolveFeishuAllowlistMatch, isFeishuGroupAllowed, } from "./policy.js"; -import { parsePostContent } from "./post.js"; import { createFeishuReplyDispatcher } from "./reply-dispatcher.js"; import { getFeishuRuntime } from "./runtime.js"; import { getMessageFeishu, listFeishuThreadMessages, sendMessageFeishu } from "./send.js"; -import type { FeishuMessageContext, FeishuMediaInfo } from "./types.js"; +import type { FeishuMessageContext } from "./types.js"; import type { DynamicAgentCreationConfig } from "./types.js"; +export { toMessageResourceType } from "./bot-content.js"; + // Cache permission errors to avoid spamming the user with repeated notifications. // Key: appId or "default", Value: timestamp of last notification const permissionErrorNotifiedAt = new Map(); @@ -91,546 +99,6 @@ export type FeishuBotAddedEvent = { operator_tenant_key?: string; }; -type GroupSessionScope = "group" | "group_sender" | "group_topic" | "group_topic_sender"; - -type ResolvedFeishuGroupSession = { - peerId: string; - parentPeer: { kind: "group"; id: string } | null; - groupSessionScope: GroupSessionScope; - replyInThread: boolean; - threadReply: boolean; -}; - -function resolveFeishuGroupSession(params: { - chatId: string; - senderOpenId: string; - messageId: string; - rootId?: string; - threadId?: string; - groupConfig?: { - groupSessionScope?: GroupSessionScope; - topicSessionMode?: "enabled" | "disabled"; - replyInThread?: "enabled" | "disabled"; - }; - feishuCfg?: { - groupSessionScope?: GroupSessionScope; - topicSessionMode?: "enabled" | "disabled"; - replyInThread?: "enabled" | "disabled"; - }; -}): ResolvedFeishuGroupSession { - const { chatId, senderOpenId, messageId, rootId, threadId, groupConfig, feishuCfg } = params; - - const normalizedThreadId = threadId?.trim(); - const normalizedRootId = rootId?.trim(); - const threadReply = Boolean(normalizedThreadId || normalizedRootId); - const replyInThread = - (groupConfig?.replyInThread ?? feishuCfg?.replyInThread ?? "disabled") === "enabled" || - threadReply; - - const legacyTopicSessionMode = - groupConfig?.topicSessionMode ?? feishuCfg?.topicSessionMode ?? "disabled"; - const groupSessionScope: GroupSessionScope = - groupConfig?.groupSessionScope ?? - feishuCfg?.groupSessionScope ?? - (legacyTopicSessionMode === "enabled" ? "group_topic" : "group"); - - // Keep topic session keys stable across the "first turn creates thread" flow: - // first turn may only have message_id, while the next turn carries root_id/thread_id. - // Prefer root_id first so both turns stay on the same peer key. - const topicScope = - groupSessionScope === "group_topic" || groupSessionScope === "group_topic_sender" - ? (normalizedRootId ?? normalizedThreadId ?? (replyInThread ? messageId : null)) - : null; - - let peerId = chatId; - switch (groupSessionScope) { - case "group_sender": - peerId = buildFeishuConversationId({ - chatId, - scope: "group_sender", - senderOpenId, - }); - break; - case "group_topic": - peerId = topicScope - ? buildFeishuConversationId({ - chatId, - scope: "group_topic", - topicId: topicScope, - }) - : chatId; - break; - case "group_topic_sender": - peerId = topicScope - ? buildFeishuConversationId({ - chatId, - scope: "group_topic_sender", - topicId: topicScope, - senderOpenId, - }) - : buildFeishuConversationId({ - chatId, - scope: "group_sender", - senderOpenId, - }); - break; - case "group": - default: - peerId = chatId; - break; - } - - const parentPeer = - topicScope && - (groupSessionScope === "group_topic" || groupSessionScope === "group_topic_sender") - ? { - kind: "group" as const, - id: chatId, - } - : null; - - return { - peerId, - parentPeer, - groupSessionScope, - replyInThread, - threadReply, - }; -} - -function parseMessageContent(content: string, messageType: string): string { - if (messageType === "post") { - // Extract text content from rich text post - const { textContent } = parsePostContent(content); - return textContent; - } - - try { - const parsed = JSON.parse(content); - if (messageType === "text") { - return parsed.text || ""; - } - if (messageType === "share_chat") { - // Preserve available summary text for merged/forwarded chat messages. - if (parsed && typeof parsed === "object") { - const share = parsed as { - body?: unknown; - summary?: unknown; - share_chat_id?: unknown; - }; - if (typeof share.body === "string" && share.body.trim().length > 0) { - return share.body.trim(); - } - if (typeof share.summary === "string" && share.summary.trim().length > 0) { - return share.summary.trim(); - } - if (typeof share.share_chat_id === "string" && share.share_chat_id.trim().length > 0) { - return `[Forwarded message: ${share.share_chat_id.trim()}]`; - } - } - return "[Forwarded message]"; - } - if (messageType === "merge_forward") { - // Return placeholder; actual content fetched asynchronously in handleFeishuMessage - return "[Merged and Forwarded Message - loading...]"; - } - return content; - } catch { - return content; - } -} - -/** - * Parse merge_forward message content and fetch sub-messages. - * Returns formatted text content of all sub-messages. - */ -function parseMergeForwardContent(params: { - content: string; - log?: (...args: any[]) => void; -}): string { - const { content, log } = params; - const maxMessages = 50; - - // For merge_forward, the API returns all sub-messages in items array - // with upper_message_id pointing to the merge_forward message. - // The 'content' parameter here is actually the full API response items array as JSON. - log?.(`feishu: parsing merge_forward sub-messages from API response`); - - let items: Array<{ - message_id?: string; - msg_type?: string; - body?: { content?: string }; - sender?: { id?: string }; - upper_message_id?: string; - create_time?: string; - }>; - - try { - items = JSON.parse(content); - } catch { - log?.(`feishu: merge_forward items parse failed`); - return "[Merged and Forwarded Message - parse error]"; - } - - if (!Array.isArray(items) || items.length === 0) { - return "[Merged and Forwarded Message - no sub-messages]"; - } - - // Filter to only sub-messages (those with upper_message_id, skip the merge_forward container itself) - const subMessages = items.filter((item) => item.upper_message_id); - - if (subMessages.length === 0) { - return "[Merged and Forwarded Message - no sub-messages found]"; - } - - log?.(`feishu: merge_forward contains ${subMessages.length} sub-messages`); - - // Sort by create_time - subMessages.sort((a, b) => { - const timeA = parseInt(a.create_time || "0", 10); - const timeB = parseInt(b.create_time || "0", 10); - return timeA - timeB; - }); - - // Format output - const lines: string[] = ["[Merged and Forwarded Messages]"]; - const limitedMessages = subMessages.slice(0, maxMessages); - - for (const item of limitedMessages) { - const msgContent = item.body?.content || ""; - const msgType = item.msg_type || "text"; - const formatted = formatSubMessageContent(msgContent, msgType); - lines.push(`- ${formatted}`); - } - - if (subMessages.length > maxMessages) { - lines.push(`... and ${subMessages.length - maxMessages} more messages`); - } - - return lines.join("\n"); -} - -/** - * Format sub-message content based on message type. - */ -function formatSubMessageContent(content: string, contentType: string): string { - try { - const parsed = JSON.parse(content); - switch (contentType) { - case "text": - return parsed.text || content; - case "post": { - const { textContent } = parsePostContent(content); - return textContent; - } - case "image": - return "[Image]"; - case "file": - return `[File: ${parsed.file_name || "unknown"}]`; - case "audio": - return "[Audio]"; - case "video": - return "[Video]"; - case "sticker": - return "[Sticker]"; - case "merge_forward": - return "[Nested Merged Forward]"; - default: - return `[${contentType}]`; - } - } catch { - return content; - } -} - -function checkBotMentioned(event: FeishuMessageEvent, botOpenId?: string): boolean { - if (!botOpenId) return false; - // Check for @all (@_all in Feishu) — treat as mentioning every bot - const rawContent = event.message.content ?? ""; - if (rawContent.includes("@_all")) return true; - const mentions = event.message.mentions ?? []; - if (mentions.length > 0) { - // Rely on Feishu mention IDs; display names can vary by alias/context. - return mentions.some((m) => m.id.open_id === botOpenId); - } - // Post (rich text) messages may have empty message.mentions when they contain docs/paste - if (event.message.message_type === "post") { - const { mentionedOpenIds } = parsePostContent(event.message.content); - return mentionedOpenIds.some((id) => id === botOpenId); - } - return false; -} - -function normalizeMentions( - text: string, - mentions?: FeishuMessageEvent["message"]["mentions"], - botStripId?: string, -): string { - if (!mentions || mentions.length === 0) return text; - - const escaped = (value: string) => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); - const escapeName = (value: string) => value.replace(//g, ">"); - let result = text; - - for (const mention of mentions) { - const mentionId = mention.id.open_id; - const replacement = - botStripId && mentionId === botStripId - ? "" - : mentionId - ? `${escapeName(mention.name)}` - : `@${mention.name}`; - - result = result.replace(new RegExp(escaped(mention.key), "g"), () => replacement).trim(); - } - - return result; -} - -function normalizeFeishuCommandProbeBody(text: string): string { - if (!text) { - return ""; - } - return text - .replace(/]*>[^<]*<\/at>/giu, " ") - .replace(/(^|\s)@[^/\s]+(?=\s|$|\/)/gu, "$1") - .replace(/\s+/g, " ") - .trim(); -} - -/** - * Parse media keys from message content based on message type. - */ -function parseMediaKeys( - content: string, - messageType: string, -): { - imageKey?: string; - fileKey?: string; - fileName?: string; -} { - try { - const parsed = JSON.parse(content); - const imageKey = normalizeFeishuExternalKey(parsed.image_key); - const fileKey = normalizeFeishuExternalKey(parsed.file_key); - switch (messageType) { - case "image": - return { imageKey, fileName: parsed.file_name }; - case "file": - return { fileKey, fileName: parsed.file_name }; - case "audio": - return { fileKey, fileName: parsed.file_name }; - case "video": - case "media": - // Video/media has both file_key (video) and image_key (thumbnail) - return { fileKey, imageKey, fileName: parsed.file_name }; - case "sticker": - return { fileKey, fileName: parsed.file_name }; - default: - return {}; - } - } catch { - return {}; - } -} - -/** - * Map Feishu message type to messageResource.get resource type. - * Feishu messageResource API supports only: image | file. - */ -export function toMessageResourceType(messageType: string): "image" | "file" { - return messageType === "image" ? "image" : "file"; -} - -/** - * Infer placeholder text based on message type. - */ -function inferPlaceholder(messageType: string): string { - switch (messageType) { - case "image": - return ""; - case "file": - return ""; - case "audio": - return ""; - case "video": - case "media": - return ""; - case "sticker": - return ""; - default: - return ""; - } -} - -/** - * Resolve media from a Feishu message, downloading and saving to disk. - * Similar to Discord's resolveMediaList(). - */ -async function resolveFeishuMediaList(params: { - cfg: ClawdbotConfig; - messageId: string; - messageType: string; - content: string; - maxBytes: number; - log?: (msg: string) => void; - accountId?: string; -}): Promise { - const { cfg, messageId, messageType, content, maxBytes, log, accountId } = params; - - // Only process media message types (including post for embedded images) - const mediaTypes = ["image", "file", "audio", "video", "media", "sticker", "post"]; - if (!mediaTypes.includes(messageType)) { - return []; - } - - const out: FeishuMediaInfo[] = []; - const core = getFeishuRuntime(); - - // Handle post (rich text) messages with embedded images/media. - if (messageType === "post") { - const { imageKeys, mediaKeys: postMediaKeys } = parsePostContent(content); - if (imageKeys.length === 0 && postMediaKeys.length === 0) { - return []; - } - - if (imageKeys.length > 0) { - log?.(`feishu: post message contains ${imageKeys.length} embedded image(s)`); - } - if (postMediaKeys.length > 0) { - log?.(`feishu: post message contains ${postMediaKeys.length} embedded media file(s)`); - } - - for (const imageKey of imageKeys) { - try { - // Embedded images in post use messageResource API with image_key as file_key - const result = await downloadMessageResourceFeishu({ - cfg, - messageId, - fileKey: imageKey, - type: "image", - accountId, - }); - - let contentType = result.contentType; - if (!contentType) { - contentType = await core.media.detectMime({ buffer: result.buffer }); - } - - const saved = await core.channel.media.saveMediaBuffer( - result.buffer, - contentType, - "inbound", - maxBytes, - ); - - out.push({ - path: saved.path, - contentType: saved.contentType, - placeholder: "", - }); - - log?.(`feishu: downloaded embedded image ${imageKey}, saved to ${saved.path}`); - } catch (err) { - log?.(`feishu: failed to download embedded image ${imageKey}: ${String(err)}`); - } - } - - for (const media of postMediaKeys) { - try { - const result = await downloadMessageResourceFeishu({ - cfg, - messageId, - fileKey: media.fileKey, - type: "file", - accountId, - }); - - let contentType = result.contentType; - if (!contentType) { - contentType = await core.media.detectMime({ buffer: result.buffer }); - } - - const saved = await core.channel.media.saveMediaBuffer( - result.buffer, - contentType, - "inbound", - maxBytes, - ); - - out.push({ - path: saved.path, - contentType: saved.contentType, - placeholder: "", - }); - - log?.(`feishu: downloaded embedded media ${media.fileKey}, saved to ${saved.path}`); - } catch (err) { - log?.(`feishu: failed to download embedded media ${media.fileKey}: ${String(err)}`); - } - } - - return out; - } - - // Handle other media types - const mediaKeys = parseMediaKeys(content, messageType); - if (!mediaKeys.imageKey && !mediaKeys.fileKey) { - return []; - } - - try { - let buffer: Buffer; - let contentType: string | undefined; - let fileName: string | undefined; - - // For message media, always use messageResource API - // The image.get API is only for images uploaded via im/v1/images, not for message attachments - const fileKey = mediaKeys.fileKey || mediaKeys.imageKey; - if (!fileKey) { - return []; - } - - const resourceType = toMessageResourceType(messageType); - const result = await downloadMessageResourceFeishu({ - cfg, - messageId, - fileKey, - type: resourceType, - accountId, - }); - buffer = result.buffer; - contentType = result.contentType; - fileName = result.fileName || mediaKeys.fileName; - - // Detect mime type if not provided - if (!contentType) { - contentType = await core.media.detectMime({ buffer }); - } - - // Save to disk using core's saveMediaBuffer - const saved = await core.channel.media.saveMediaBuffer( - buffer, - contentType, - "inbound", - maxBytes, - fileName, - ); - - out.push({ - path: saved.path, - contentType: saved.contentType, - placeholder: inferPlaceholder(messageType), - }); - - log?.(`feishu: downloaded ${messageType} media, saved to ${saved.path}`); - } catch (err) { - log?.(`feishu: failed to download ${messageType} media: ${String(err)}`); - } - - return out; -} - // --- Broadcast support --- // Resolve broadcast agent list for a given peer (group) ID. // Returns null if no broadcast config exists or the peer is not in the broadcast list. diff --git a/extensions/mattermost/src/mattermost/monitor-resources.ts b/extensions/mattermost/src/mattermost/monitor-resources.ts new file mode 100644 index 00000000000..5b34a90f9e2 --- /dev/null +++ b/extensions/mattermost/src/mattermost/monitor-resources.ts @@ -0,0 +1,183 @@ +import { + fetchMattermostChannel, + fetchMattermostUser, + sendMattermostTyping, + updateMattermostPost, + type MattermostChannel, + type MattermostClient, + type MattermostUser, +} from "./client.js"; +import { buildButtonProps, type MattermostInteractionResponse } from "./interactions.js"; + +export type MattermostMediaKind = "image" | "audio" | "video" | "document" | "unknown"; + +export type MattermostMediaInfo = { + path: string; + contentType?: string; + kind: MattermostMediaKind; +}; + +const CHANNEL_CACHE_TTL_MS = 5 * 60_000; +const USER_CACHE_TTL_MS = 10 * 60_000; + +type FetchRemoteMedia = (params: { + url: string; + requestInit?: RequestInit; + filePathHint?: string; + maxBytes: number; + ssrfPolicy?: { allowedHostnames?: string[] }; +}) => Promise<{ buffer: Uint8Array; contentType?: string | null }>; + +type SaveMediaBuffer = ( + buffer: Uint8Array, + contentType: string | undefined, + direction: "inbound" | "outbound", + maxBytes: number, +) => Promise<{ path: string; contentType?: string | null }>; + +export function createMattermostMonitorResources(params: { + accountId: string; + callbackUrl: string; + client: MattermostClient; + logger: { debug?: (...args: unknown[]) => void }; + mediaMaxBytes: number; + fetchRemoteMedia: FetchRemoteMedia; + saveMediaBuffer: SaveMediaBuffer; + mediaKindFromMime: (contentType?: string) => MattermostMediaKind | null | undefined; +}) { + const { + accountId, + callbackUrl, + client, + logger, + mediaMaxBytes, + fetchRemoteMedia, + saveMediaBuffer, + mediaKindFromMime, + } = params; + const channelCache = new Map(); + const userCache = new Map(); + + const resolveMattermostMedia = async ( + fileIds?: string[] | null, + ): Promise => { + const ids = (fileIds ?? []).map((id) => id?.trim()).filter(Boolean); + if (ids.length === 0) { + return []; + } + const out: MattermostMediaInfo[] = []; + for (const fileId of ids) { + try { + const fetched = await fetchRemoteMedia({ + url: `${client.apiBaseUrl}/files/${fileId}`, + requestInit: { + headers: { + Authorization: `Bearer ${client.token}`, + }, + }, + filePathHint: fileId, + maxBytes: mediaMaxBytes, + ssrfPolicy: { allowedHostnames: [new URL(client.baseUrl).hostname] }, + }); + const saved = await saveMediaBuffer( + Buffer.from(fetched.buffer), + fetched.contentType ?? undefined, + "inbound", + mediaMaxBytes, + ); + const contentType = saved.contentType ?? fetched.contentType ?? undefined; + out.push({ + path: saved.path, + contentType, + kind: mediaKindFromMime(contentType) ?? "unknown", + }); + } catch (err) { + logger.debug?.(`mattermost: failed to download file ${fileId}: ${String(err)}`); + } + } + return out; + }; + + const sendTypingIndicator = async (channelId: string, parentId?: string) => { + await sendMattermostTyping(client, { channelId, parentId }); + }; + + const resolveChannelInfo = async (channelId: string): Promise => { + const cached = channelCache.get(channelId); + if (cached && cached.expiresAt > Date.now()) { + return cached.value; + } + try { + const info = await fetchMattermostChannel(client, channelId); + channelCache.set(channelId, { + value: info, + expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS, + }); + return info; + } catch (err) { + logger.debug?.(`mattermost: channel lookup failed: ${String(err)}`); + channelCache.set(channelId, { + value: null, + expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS, + }); + return null; + } + }; + + const resolveUserInfo = async (userId: string): Promise => { + const cached = userCache.get(userId); + if (cached && cached.expiresAt > Date.now()) { + return cached.value; + } + try { + const info = await fetchMattermostUser(client, userId); + userCache.set(userId, { + value: info, + expiresAt: Date.now() + USER_CACHE_TTL_MS, + }); + return info; + } catch (err) { + logger.debug?.(`mattermost: user lookup failed: ${String(err)}`); + userCache.set(userId, { + value: null, + expiresAt: Date.now() + USER_CACHE_TTL_MS, + }); + return null; + } + }; + + const buildModelPickerProps = ( + channelId: string, + buttons: Array, + ): Record | undefined => + buildButtonProps({ + callbackUrl, + accountId, + channelId, + buttons, + }); + + const updateModelPickerPost = async (params: { + channelId: string; + postId: string; + message: string; + buttons?: Array; + }): Promise => { + const props = buildModelPickerProps(params.channelId, params.buttons ?? []) ?? { + attachments: [], + }; + await updateMattermostPost(client, params.postId, { + message: params.message, + props, + }); + return {}; + }; + + return { + resolveMattermostMedia, + sendTypingIndicator, + resolveChannelInfo, + resolveUserInfo, + updateModelPickerPost, + }; +} diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 23153cfae9c..4cd74216811 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -44,7 +44,6 @@ import { type MattermostUser, } from "./client.js"; import { - buildButtonProps, computeInteractionCallbackUrl, createMattermostInteractionHandler, resolveInteractionCallbackPath, @@ -75,6 +74,7 @@ import { resolveThreadSessionKeys, } from "./monitor-helpers.js"; import { resolveOncharPrefixes, stripOncharPrefix } from "./monitor-onchar.js"; +import { createMattermostMonitorResources, type MattermostMediaInfo } from "./monitor-resources.js"; import { registerMattermostMonitorSlashCommands } from "./monitor-slash.js"; import { createMattermostConnectOnce, @@ -117,8 +117,6 @@ type MattermostReaction = { }; const RECENT_MATTERMOST_MESSAGE_TTL_MS = 5 * 60_000; const RECENT_MATTERMOST_MESSAGE_MAX = 2000; -const CHANNEL_CACHE_TTL_MS = 5 * 60_000; -const USER_CACHE_TTL_MS = 10 * 60_000; function isLoopbackHost(hostname: string): boolean { return hostname === "localhost" || hostname === "127.0.0.1" || hostname === "::1"; @@ -215,12 +213,6 @@ export function resolveMattermostThreadSessionContext(params: { parentSessionKey: threadKeys.parentSessionKey, }; } -type MattermostMediaInfo = { - path: string; - contentType?: string; - kind: MediaKind; -}; - function buildMattermostAttachmentPlaceholder(mediaList: MattermostMediaInfo[]): string { if (mediaList.length === 0) { return ""; @@ -286,6 +278,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} baseUrl, botUserId, }); + const slashEnabled = getSlashCommandState(account.accountId) != null; // ─── Interactive buttons registration ────────────────────────────────────── // Derive a stable HMAC secret from the bot token so CLI and gateway share it. @@ -536,8 +529,6 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} log: (msg: string) => runtime.log?.(msg), }); - const channelCache = new Map(); - const userCache = new Map(); const logger = core.logging.getChildLogger({ module: "mattermost" }); const logVerboseMessage = (message: string) => { if (!core.logging.shouldLogVerbose()) { @@ -570,123 +561,25 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} log: (message) => logVerboseMessage(message), }); - const resolveMattermostMedia = async ( - fileIds?: string[] | null, - ): Promise => { - const ids = (fileIds ?? []).map((id) => id?.trim()).filter(Boolean); - if (ids.length === 0) { - return []; - } - const out: MattermostMediaInfo[] = []; - for (const fileId of ids) { - try { - const fetched = await core.channel.media.fetchRemoteMedia({ - url: `${client.apiBaseUrl}/files/${fileId}`, - requestInit: { - headers: { - Authorization: `Bearer ${client.token}`, - }, - }, - filePathHint: fileId, - maxBytes: mediaMaxBytes, - // Allow fetching from the Mattermost server host (may be localhost or - // a private IP). Without this, SSRF guards block media downloads. - // Credit: #22594 (@webclerk) - ssrfPolicy: { allowedHostnames: [new URL(client.baseUrl).hostname] }, - }); - const saved = await core.channel.media.saveMediaBuffer( - fetched.buffer, - fetched.contentType ?? undefined, - "inbound", - mediaMaxBytes, - ); - const contentType = saved.contentType ?? fetched.contentType ?? undefined; - out.push({ - path: saved.path, - contentType, - kind: core.media.mediaKindFromMime(contentType) ?? "unknown", - }); - } catch (err) { - logger.debug?.(`mattermost: failed to download file ${fileId}: ${String(err)}`); - } - } - return out; - }; - - const sendTypingIndicator = async (channelId: string, parentId?: string) => { - await sendMattermostTyping(client, { channelId, parentId }); - }; - - const resolveChannelInfo = async (channelId: string): Promise => { - const cached = channelCache.get(channelId); - if (cached && cached.expiresAt > Date.now()) { - return cached.value; - } - try { - const info = await fetchMattermostChannel(client, channelId); - channelCache.set(channelId, { - value: info, - expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS, - }); - return info; - } catch (err) { - logger.debug?.(`mattermost: channel lookup failed: ${String(err)}`); - channelCache.set(channelId, { - value: null, - expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS, - }); - return null; - } - }; - - const resolveUserInfo = async (userId: string): Promise => { - const cached = userCache.get(userId); - if (cached && cached.expiresAt > Date.now()) { - return cached.value; - } - try { - const info = await fetchMattermostUser(client, userId); - userCache.set(userId, { - value: info, - expiresAt: Date.now() + USER_CACHE_TTL_MS, - }); - return info; - } catch (err) { - logger.debug?.(`mattermost: user lookup failed: ${String(err)}`); - userCache.set(userId, { - value: null, - expiresAt: Date.now() + USER_CACHE_TTL_MS, - }); - return null; - } - }; - - const buildModelPickerProps = ( - channelId: string, - buttons: Array, - ): Record | undefined => - buildButtonProps({ - callbackUrl, - accountId: account.accountId, - channelId, - buttons, - }); - - const updateModelPickerPost = async (params: { - channelId: string; - postId: string; - message: string; - buttons?: Array; - }): Promise => { - const props = buildModelPickerProps(params.channelId, params.buttons ?? []) ?? { - attachments: [], - }; - await updateMattermostPost(client, params.postId, { - message: params.message, - props, - }); - return {}; - }; + const { + resolveMattermostMedia, + sendTypingIndicator, + resolveChannelInfo, + resolveUserInfo, + updateModelPickerPost, + } = createMattermostMonitorResources({ + accountId: account.accountId, + callbackUrl, + client, + logger: { + debug: (message) => logger.debug?.(String(message)), + }, + mediaMaxBytes, + fetchRemoteMedia: (params) => core.channel.media.fetchRemoteMedia(params), + saveMediaBuffer: (buffer, contentType, direction, maxBytes) => + core.channel.media.saveMediaBuffer(Buffer.from(buffer), contentType, direction, maxBytes), + mediaKindFromMime: (contentType) => core.media.mediaKindFromMime(contentType) as MediaKind, + }); const runModelPickerCommand = async (params: { commandText: string; diff --git a/extensions/telegram/src/bot-handlers.buffers.ts b/extensions/telegram/src/bot-handlers.buffers.ts new file mode 100644 index 00000000000..41dcee18aa4 --- /dev/null +++ b/extensions/telegram/src/bot-handlers.buffers.ts @@ -0,0 +1,373 @@ +import type { Message } from "@grammyjs/types"; +import { shouldDebounceTextInbound } from "openclaw/plugin-sdk/channel-runtime"; +import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; +import { + createInboundDebouncer, + resolveInboundDebounceMs, +} from "openclaw/plugin-sdk/reply-runtime"; +import { danger, logVerbose, warn } from "openclaw/plugin-sdk/runtime-env"; +import { + hasInboundMedia, + isRecoverableMediaGroupError, + resolveInboundMediaFileId, +} from "./bot-handlers.media.js"; +import type { TelegramMediaRef } from "./bot-message-context.js"; +import { MEDIA_GROUP_TIMEOUT_MS, type MediaGroupEntry } from "./bot-updates.js"; +import { resolveMedia } from "./bot/delivery.js"; +import type { TelegramContext } from "./bot/types.js"; +import type { TelegramTransport } from "./fetch.js"; + +export type TelegramDebounceLane = "default" | "forward"; + +export type TelegramDebounceEntry = { + ctx: TelegramContext; + msg: Message; + allMedia: TelegramMediaRef[]; + storeAllowFrom: string[]; + debounceKey: string | null; + debounceLane: TelegramDebounceLane; + botUsername?: string; +}; + +export type TextFragmentEntry = { + key: string; + messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>; + timer: ReturnType; +}; + +const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500; + +type TelegramBotApi = { + sendMessage: ( + chatId: number | string, + text: string, + params?: { message_thread_id?: number }, + ) => Promise; + getFile: (fileId: string) => Promise<{ file_path?: string }>; +}; + +export function createTelegramInboundBufferRuntime(params: { + accountId?: string | null; + bot: { api: TelegramBotApi }; + cfg: OpenClawConfig; + logger: { warn: (...args: unknown[]) => void }; + mediaMaxBytes: number; + opts: { + token: string; + testTimings?: { + textFragmentGapMs?: number; + mediaGroupFlushMs?: number; + }; + }; + processMessage: ( + ctx: TelegramContext, + media: TelegramMediaRef[], + storeAllowFrom: string[], + metadata?: { messageIdOverride?: string }, + replyMedia?: TelegramMediaRef[], + ) => Promise; + loadStoreAllowFrom: () => Promise; + runtime: { + error?: (message: string) => void; + }; + telegramTransport?: TelegramTransport; +}) { + const { + accountId, + bot, + cfg, + logger, + mediaMaxBytes, + opts, + processMessage, + loadStoreAllowFrom, + runtime, + telegramTransport, + } = params; + const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000; + const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS = + typeof opts.testTimings?.textFragmentGapMs === "number" && + Number.isFinite(opts.testTimings.textFragmentGapMs) + ? Math.max(10, Math.floor(opts.testTimings.textFragmentGapMs)) + : DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS; + const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1; + const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12; + const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000; + const mediaGroupTimeoutMs = + typeof opts.testTimings?.mediaGroupFlushMs === "number" && + Number.isFinite(opts.testTimings.mediaGroupFlushMs) + ? Math.max(10, Math.floor(opts.testTimings.mediaGroupFlushMs)) + : MEDIA_GROUP_TIMEOUT_MS; + const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" }); + const FORWARD_BURST_DEBOUNCE_MS = 80; + + const mediaGroupBuffer = new Map(); + let mediaGroupProcessing: Promise = Promise.resolve(); + const textFragmentBuffer = new Map(); + let textFragmentProcessing: Promise = Promise.resolve(); + + const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => { + const forwardMeta = msg as { + forward_origin?: unknown; + forward_from?: unknown; + forward_from_chat?: unknown; + forward_sender_name?: unknown; + forward_date?: unknown; + }; + return (forwardMeta.forward_origin ?? + forwardMeta.forward_from ?? + forwardMeta.forward_from_chat ?? + forwardMeta.forward_sender_name ?? + forwardMeta.forward_date) + ? "forward" + : "default"; + }; + + const buildSyntheticTextMessage = (params: { + base: Message; + text: string; + date?: number; + from?: Message["from"]; + }): Message => ({ + ...params.base, + ...(params.from ? { from: params.from } : {}), + text: params.text, + caption: undefined, + caption_entities: undefined, + entities: undefined, + ...(params.date != null ? { date: params.date } : {}), + }); + + const buildSyntheticContext = ( + ctx: Pick & { getFile?: unknown }, + message: Message, + ): TelegramContext => { + const getFile = + typeof ctx.getFile === "function" + ? (ctx.getFile as TelegramContext["getFile"]).bind(ctx as object) + : async () => ({}); + return { message, me: ctx.me, getFile }; + }; + + const resolveReplyMediaForMessage = async ( + ctx: TelegramContext, + msg: Message, + ): Promise => { + const replyMessage = msg.reply_to_message; + if (!replyMessage || !hasInboundMedia(replyMessage)) { + return []; + } + const replyFileId = resolveInboundMediaFileId(replyMessage); + if (!replyFileId) { + return []; + } + try { + const media = await resolveMedia( + { + message: replyMessage, + me: ctx.me, + getFile: async () => await bot.api.getFile(replyFileId), + }, + mediaMaxBytes, + opts.token, + telegramTransport, + ); + if (!media) { + return []; + } + return [ + { + path: media.path, + contentType: media.contentType, + stickerMetadata: media.stickerMetadata, + }, + ]; + } catch (err) { + logger.warn({ chatId: msg.chat.id, error: String(err) }, "reply media fetch failed"); + return []; + } + }; + + const processMediaGroup = async (entry: MediaGroupEntry) => { + try { + entry.messages.sort( + (a: { msg: Message; ctx: TelegramContext }, b: { msg: Message; ctx: TelegramContext }) => + a.msg.message_id - b.msg.message_id, + ); + const captionMsg = entry.messages.find((item) => item.msg.caption || item.msg.text); + const primaryEntry = captionMsg ?? entry.messages[0]; + + const allMedia: TelegramMediaRef[] = []; + for (const { ctx } of entry.messages) { + let media; + try { + media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport); + } catch (mediaErr) { + if (!isRecoverableMediaGroupError(mediaErr)) { + throw mediaErr; + } + runtime.error?.( + warn(`media group: skipping photo that failed to fetch: ${String(mediaErr)}`), + ); + continue; + } + if (media) { + allMedia.push({ + path: media.path, + contentType: media.contentType, + stickerMetadata: media.stickerMetadata, + }); + } + } + + const storeAllowFrom = await loadStoreAllowFrom(); + const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg); + await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom, undefined, replyMedia); + } catch (err) { + runtime.error?.(danger(`media group handler failed: ${String(err)}`)); + } + }; + + const flushTextFragments = async (entry: TextFragmentEntry) => { + try { + entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); + const first = entry.messages[0]; + const last = entry.messages.at(-1); + if (!first || !last) { + return; + } + const combinedText = entry.messages.map((item) => item.msg.text ?? "").join(""); + if (!combinedText.trim()) { + return; + } + const syntheticMessage = buildSyntheticTextMessage({ + base: first.msg, + text: combinedText, + date: last.msg.date ?? first.msg.date, + }); + const storeAllowFrom = await loadStoreAllowFrom(); + await processMessage(buildSyntheticContext(first.ctx, syntheticMessage), [], storeAllowFrom, { + messageIdOverride: String(last.msg.message_id), + }); + } catch (err) { + runtime.error?.(danger(`text fragment handler failed: ${String(err)}`)); + } + }; + + const queueTextFragmentFlush = async (entry: TextFragmentEntry) => { + textFragmentProcessing = textFragmentProcessing + .then(async () => { + await flushTextFragments(entry); + }) + .catch(() => undefined); + await textFragmentProcessing; + }; + + const runTextFragmentFlush = async (entry: TextFragmentEntry) => { + textFragmentBuffer.delete(entry.key); + await queueTextFragmentFlush(entry); + }; + + const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => { + clearTimeout(entry.timer); + entry.timer = setTimeout(async () => { + await runTextFragmentFlush(entry); + }, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS); + }; + + const inboundDebouncer = createInboundDebouncer({ + debounceMs, + resolveDebounceMs: (entry) => + entry.debounceLane === "forward" ? FORWARD_BURST_DEBOUNCE_MS : debounceMs, + buildKey: (entry) => entry.debounceKey, + shouldDebounce: (entry) => { + const text = entry.msg.text ?? entry.msg.caption ?? ""; + const hasDebounceableText = shouldDebounceTextInbound({ + text, + cfg, + commandOptions: { botUsername: entry.botUsername }, + }); + if (entry.debounceLane === "forward") { + return hasDebounceableText || entry.allMedia.length > 0; + } + return hasDebounceableText && entry.allMedia.length === 0; + }, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) { + return; + } + if (entries.length === 1) { + const replyMedia = await resolveReplyMediaForMessage(last.ctx, last.msg); + await processMessage(last.ctx, last.allMedia, last.storeAllowFrom, undefined, replyMedia); + return; + } + const combinedText = entries + .map((entry) => entry.msg.text ?? entry.msg.caption ?? "") + .filter(Boolean) + .join("\n"); + const combinedMedia = entries.flatMap((entry) => entry.allMedia); + if (!combinedText.trim() && combinedMedia.length === 0) { + return; + } + const first = entries[0]; + const syntheticMessage = buildSyntheticTextMessage({ + base: first.msg, + text: combinedText, + date: last.msg.date ?? first.msg.date, + }); + const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined; + const replyMedia = await resolveReplyMediaForMessage(first.ctx, syntheticMessage); + await processMessage( + buildSyntheticContext(first.ctx, syntheticMessage), + combinedMedia, + first.storeAllowFrom, + messageIdOverride ? { messageIdOverride } : undefined, + replyMedia, + ); + }, + onError: (err, items) => { + runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`)); + const chatId = items[0]?.msg.chat.id; + if (chatId != null) { + const threadId = items[0]?.msg.message_thread_id; + void bot.api + .sendMessage( + chatId, + "Something went wrong while processing your message. Please try again.", + threadId != null ? { message_thread_id: threadId } : undefined, + ) + .catch((sendErr) => { + logVerbose(`telegram: error fallback send failed: ${String(sendErr)}`); + }); + } + }, + }); + + return { + buildSyntheticContext, + buildSyntheticTextMessage, + inboundDebouncer, + mediaGroupBuffer, + mediaGroupProcessing: () => mediaGroupProcessing, + setMediaGroupProcessing: (next: Promise) => { + mediaGroupProcessing = next; + }, + mediaGroupTimeoutMs, + processMediaGroup, + textFragmentBuffer, + textFragmentProcessing: () => textFragmentProcessing, + setTextFragmentProcessing: (next: Promise) => { + textFragmentProcessing = next; + }, + scheduleTextFragmentFlush, + flushTextFragments, + resolveReplyMediaForMessage, + resolveTelegramDebounceLane, + TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS, + TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP, + TELEGRAM_TEXT_FRAGMENT_MAX_PARTS, + TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS, + TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS, + }; +} diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index 38a8d29dcab..70e60bd30ad 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -1,7 +1,6 @@ import type { Message, ReactionTypeEmoji } from "@grammyjs/types"; import { resolveAgentDir, resolveDefaultAgentId } from "openclaw/plugin-sdk/agent-runtime"; import { resolveDefaultModelForAgent } from "openclaw/plugin-sdk/agent-runtime"; -import { shouldDebounceTextInbound } from "openclaw/plugin-sdk/channel-runtime"; import { resolveChannelConfigWrites } from "openclaw/plugin-sdk/channel-runtime"; import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; import { writeConfigFile } from "openclaw/plugin-sdk/config-runtime"; @@ -26,10 +25,6 @@ import { } from "openclaw/plugin-sdk/conversation-runtime"; import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime"; import { dispatchPluginInteractiveHandler } from "openclaw/plugin-sdk/plugin-runtime"; -import { - createInboundDebouncer, - resolveInboundDebounceMs, -} from "openclaw/plugin-sdk/reply-runtime"; import { buildCommandsPaginationKeyboard } from "openclaw/plugin-sdk/reply-runtime"; import { buildModelsProviderData, @@ -47,21 +42,19 @@ import { normalizeDmAllowFromWithStore, type NormalizedAllowFrom, } from "./bot-access.js"; +import { + createTelegramInboundBufferRuntime, + type TextFragmentEntry, +} from "./bot-handlers.buffers.js"; import { APPROVE_CALLBACK_DATA_RE, hasInboundMedia, hasReplyTargetMedia, isMediaSizeLimitError, - isRecoverableMediaGroupError, - resolveInboundMediaFileId, } from "./bot-handlers.media.js"; import type { TelegramMediaRef } from "./bot-message-context.js"; import { RegisterTelegramHandlerParams } from "./bot-native-commands.js"; -import { - MEDIA_GROUP_TIMEOUT_MS, - type MediaGroupEntry, - type TelegramUpdateKeyContext, -} from "./bot-updates.js"; +import { type TelegramUpdateKeyContext } from "./bot-updates.js"; import { resolveMedia } from "./bot/delivery.js"; import { getTelegramTextParts, @@ -116,159 +109,41 @@ export const registerTelegramHandlers = ({ processMessage, logger, }: RegisterTelegramHandlerParams) => { - const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500; - const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000; - const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS = - typeof opts.testTimings?.textFragmentGapMs === "number" && - Number.isFinite(opts.testTimings.textFragmentGapMs) - ? Math.max(10, Math.floor(opts.testTimings.textFragmentGapMs)) - : DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS; - const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1; - const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12; - const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000; - const mediaGroupTimeoutMs = - typeof opts.testTimings?.mediaGroupFlushMs === "number" && - Number.isFinite(opts.testTimings.mediaGroupFlushMs) - ? Math.max(10, Math.floor(opts.testTimings.mediaGroupFlushMs)) - : MEDIA_GROUP_TIMEOUT_MS; + const loadStoreAllowFrom = async () => + readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []); - const mediaGroupBuffer = new Map(); - let mediaGroupProcessing: Promise = Promise.resolve(); - - type TextFragmentEntry = { - key: string; - messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>; - timer: ReturnType; - }; - const textFragmentBuffer = new Map(); - let textFragmentProcessing: Promise = Promise.resolve(); - - const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" }); - const FORWARD_BURST_DEBOUNCE_MS = 80; - type TelegramDebounceLane = "default" | "forward"; - type TelegramDebounceEntry = { - ctx: TelegramContext; - msg: Message; - allMedia: TelegramMediaRef[]; - storeAllowFrom: string[]; - debounceKey: string | null; - debounceLane: TelegramDebounceLane; - botUsername?: string; - }; - const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => { - const forwardMeta = msg as { - forward_origin?: unknown; - forward_from?: unknown; - forward_from_chat?: unknown; - forward_sender_name?: unknown; - forward_date?: unknown; - }; - return (forwardMeta.forward_origin ?? - forwardMeta.forward_from ?? - forwardMeta.forward_from_chat ?? - forwardMeta.forward_sender_name ?? - forwardMeta.forward_date) - ? "forward" - : "default"; - }; - const buildSyntheticTextMessage = (params: { - base: Message; - text: string; - date?: number; - from?: Message["from"]; - }): Message => ({ - ...params.base, - ...(params.from ? { from: params.from } : {}), - text: params.text, - caption: undefined, - caption_entities: undefined, - entities: undefined, - ...(params.date != null ? { date: params.date } : {}), - }); - const buildSyntheticContext = ( - ctx: Pick & { getFile?: unknown }, - message: Message, - ): TelegramContext => { - const getFile = - typeof ctx.getFile === "function" - ? (ctx.getFile as TelegramContext["getFile"]).bind(ctx as object) - : async () => ({}); - return { message, me: ctx.me, getFile }; - }; - const inboundDebouncer = createInboundDebouncer({ - debounceMs, - resolveDebounceMs: (entry) => - entry.debounceLane === "forward" ? FORWARD_BURST_DEBOUNCE_MS : debounceMs, - buildKey: (entry) => entry.debounceKey, - shouldDebounce: (entry) => { - const text = entry.msg.text ?? entry.msg.caption ?? ""; - const hasDebounceableText = shouldDebounceTextInbound({ - text, - cfg, - commandOptions: { botUsername: entry.botUsername }, - }); - if (entry.debounceLane === "forward") { - // Forwarded bursts often split text + media into adjacent updates. - // Debounce media-only forward entries too so they can coalesce. - return hasDebounceableText || entry.allMedia.length > 0; - } - if (!hasDebounceableText) { - return false; - } - return entry.allMedia.length === 0; - }, - onFlush: async (entries) => { - const last = entries.at(-1); - if (!last) { - return; - } - if (entries.length === 1) { - const replyMedia = await resolveReplyMediaForMessage(last.ctx, last.msg); - await processMessage(last.ctx, last.allMedia, last.storeAllowFrom, undefined, replyMedia); - return; - } - const combinedText = entries - .map((entry) => entry.msg.text ?? entry.msg.caption ?? "") - .filter(Boolean) - .join("\n"); - const combinedMedia = entries.flatMap((entry) => entry.allMedia); - if (!combinedText.trim() && combinedMedia.length === 0) { - return; - } - const first = entries[0]; - const baseCtx = first.ctx; - const syntheticMessage = buildSyntheticTextMessage({ - base: first.msg, - text: combinedText, - date: last.msg.date ?? first.msg.date, - }); - const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined; - const syntheticCtx = buildSyntheticContext(baseCtx, syntheticMessage); - const replyMedia = await resolveReplyMediaForMessage(baseCtx, syntheticMessage); - await processMessage( - syntheticCtx, - combinedMedia, - first.storeAllowFrom, - messageIdOverride ? { messageIdOverride } : undefined, - replyMedia, - ); - }, - onError: (err, items) => { - runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`)); - const chatId = items[0]?.msg.chat.id; - if (chatId != null) { - const threadId = items[0]?.msg.message_thread_id; - void bot.api - .sendMessage( - chatId, - "Something went wrong while processing your message. Please try again.", - threadId != null ? { message_thread_id: threadId } : undefined, - ) - .catch((sendErr) => { - logVerbose(`telegram: error fallback send failed: ${String(sendErr)}`); - }); - } - }, + const { + buildSyntheticContext, + buildSyntheticTextMessage, + inboundDebouncer, + mediaGroupBuffer, + mediaGroupProcessing, + setMediaGroupProcessing, + mediaGroupTimeoutMs, + processMediaGroup, + textFragmentBuffer, + textFragmentProcessing, + setTextFragmentProcessing, + scheduleTextFragmentFlush, + flushTextFragments, + resolveReplyMediaForMessage, + resolveTelegramDebounceLane, + TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS, + TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP, + TELEGRAM_TEXT_FRAGMENT_MAX_PARTS, + TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS, + TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS, + } = createTelegramInboundBufferRuntime({ + accountId, + bot, + cfg, + logger, + mediaMaxBytes, + opts, + processMessage, + loadStoreAllowFrom, + runtime, + telegramTransport, }); const resolveTelegramSessionState = (params: { @@ -352,139 +227,6 @@ export const registerTelegramHandlers = ({ }; }; - const processMediaGroup = async (entry: MediaGroupEntry) => { - try { - entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); - - const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text); - const primaryEntry = captionMsg ?? entry.messages[0]; - - const allMedia: TelegramMediaRef[] = []; - for (const { ctx } of entry.messages) { - let media; - try { - media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport); - } catch (mediaErr) { - if (!isRecoverableMediaGroupError(mediaErr)) { - throw mediaErr; - } - runtime.log?.( - warn(`media group: skipping photo that failed to fetch: ${String(mediaErr)}`), - ); - continue; - } - if (media) { - allMedia.push({ - path: media.path, - contentType: media.contentType, - stickerMetadata: media.stickerMetadata, - }); - } - } - - const storeAllowFrom = await loadStoreAllowFrom(); - const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg); - await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom, undefined, replyMedia); - } catch (err) { - runtime.error?.(danger(`media group handler failed: ${String(err)}`)); - } - }; - - const flushTextFragments = async (entry: TextFragmentEntry) => { - try { - entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id); - - const first = entry.messages[0]; - const last = entry.messages.at(-1); - if (!first || !last) { - return; - } - - const combinedText = entry.messages.map((m) => m.msg.text ?? "").join(""); - if (!combinedText.trim()) { - return; - } - - const syntheticMessage = buildSyntheticTextMessage({ - base: first.msg, - text: combinedText, - date: last.msg.date ?? first.msg.date, - }); - - const storeAllowFrom = await loadStoreAllowFrom(); - const baseCtx = first.ctx; - - await processMessage(buildSyntheticContext(baseCtx, syntheticMessage), [], storeAllowFrom, { - messageIdOverride: String(last.msg.message_id), - }); - } catch (err) { - runtime.error?.(danger(`text fragment handler failed: ${String(err)}`)); - } - }; - - const queueTextFragmentFlush = async (entry: TextFragmentEntry) => { - textFragmentProcessing = textFragmentProcessing - .then(async () => { - await flushTextFragments(entry); - }) - .catch(() => undefined); - await textFragmentProcessing; - }; - - const runTextFragmentFlush = async (entry: TextFragmentEntry) => { - textFragmentBuffer.delete(entry.key); - await queueTextFragmentFlush(entry); - }; - - const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => { - clearTimeout(entry.timer); - entry.timer = setTimeout(async () => { - await runTextFragmentFlush(entry); - }, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS); - }; - - const loadStoreAllowFrom = async () => - readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []); - - const resolveReplyMediaForMessage = async ( - ctx: TelegramContext, - msg: Message, - ): Promise => { - const replyMessage = msg.reply_to_message; - if (!replyMessage || !hasInboundMedia(replyMessage)) { - return []; - } - const replyFileId = resolveInboundMediaFileId(replyMessage); - if (!replyFileId) { - return []; - } - try { - const media = await resolveMedia( - { - message: replyMessage, - me: ctx.me, - getFile: async () => await bot.api.getFile(replyFileId), - }, - mediaMaxBytes, - opts.token, - telegramTransport, - ); - if (!media) { - return []; - } - return [ - { - path: media.path, - contentType: media.contentType, - stickerMetadata: media.stickerMetadata, - }, - ]; - } catch (err) { - logger.warn({ chatId: msg.chat.id, error: String(err) }, "reply media fetch failed"); - return []; - } - }; - const isAllowlistAuthorized = ( allow: NormalizedAllowFrom, senderId: string, @@ -921,12 +663,14 @@ export const registerTelegramHandlers = ({ // Not appendable (or limits exceeded): flush buffered entry first, then continue normally. clearTimeout(existing.timer); textFragmentBuffer.delete(key); - textFragmentProcessing = textFragmentProcessing - .then(async () => { - await flushTextFragments(existing); - }) - .catch(() => undefined); - await textFragmentProcessing; + setTextFragmentProcessing( + textFragmentProcessing() + .then(async () => { + await flushTextFragments(existing); + }) + .catch(() => undefined), + ); + await textFragmentProcessing(); } const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS; @@ -951,24 +695,28 @@ export const registerTelegramHandlers = ({ existing.messages.push({ msg, ctx }); existing.timer = setTimeout(async () => { mediaGroupBuffer.delete(mediaGroupId); - mediaGroupProcessing = mediaGroupProcessing - .then(async () => { - await processMediaGroup(existing); - }) - .catch(() => undefined); - await mediaGroupProcessing; + setMediaGroupProcessing( + mediaGroupProcessing() + .then(async () => { + await processMediaGroup(existing); + }) + .catch(() => undefined), + ); + await mediaGroupProcessing(); }, mediaGroupTimeoutMs); } else { - const entry: MediaGroupEntry = { + const entry = { messages: [{ msg, ctx }], timer: setTimeout(async () => { mediaGroupBuffer.delete(mediaGroupId); - mediaGroupProcessing = mediaGroupProcessing - .then(async () => { - await processMediaGroup(entry); - }) - .catch(() => undefined); - await mediaGroupProcessing; + setMediaGroupProcessing( + mediaGroupProcessing() + .then(async () => { + await processMediaGroup(entry); + }) + .catch(() => undefined), + ); + await mediaGroupProcessing(); }, mediaGroupTimeoutMs), }; mediaGroupBuffer.set(mediaGroupId, entry); diff --git a/extensions/telegram/src/monitor.test.ts b/extensions/telegram/src/monitor.test.ts index d75b01c4608..eb979a23884 100644 --- a/extensions/telegram/src/monitor.test.ts +++ b/extensions/telegram/src/monitor.test.ts @@ -1,7 +1,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { monitorTelegramProvider } from "./monitor.js"; import { tagTelegramNetworkError } from "./network-errors.js"; +type MonitorTelegramOpts = import("./monitor.js").MonitorTelegramOpts; + type MockCtx = { message: { message_id?: number; @@ -136,6 +137,7 @@ function mockRunOnceAndAbort(abort: AbortController) { } async function expectOffsetConfirmationSkipped(offset: number | null) { + const { monitorTelegramProvider } = await import("./monitor.js"); readTelegramUpdateOffsetSpy.mockResolvedValueOnce(offset); const abort = new AbortController(); api.getUpdates.mockReset(); @@ -149,6 +151,7 @@ async function expectOffsetConfirmationSkipped(offset: number | null) { } async function runMonitorAndCaptureStartupOrder(params?: { persistedOffset?: number | null }) { + const { monitorTelegramProvider } = await import("./monitor.js"); if (params && "persistedOffset" in params) { readTelegramUpdateOffsetSpy.mockResolvedValueOnce(params.persistedOffset ?? null); } @@ -203,9 +206,8 @@ function expectRecoverableRetryState(expectedRunCalls: number) { expect(runSpy).toHaveBeenCalledTimes(expectedRunCalls); } -async function monitorWithAutoAbort( - opts: Omit[0], "abortSignal"> = {}, -) { +async function monitorWithAutoAbort(opts: Omit = {}) { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); mockRunOnceAndAbort(abort); await monitorTelegramProvider({ @@ -215,8 +217,8 @@ async function monitorWithAutoAbort( }); } -vi.mock("../../../src/config/config.js", async (importOriginal) => { - const actual = await importOriginal(); +vi.mock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => { + const actual = await importOriginal(); return { ...actual, loadConfig, @@ -260,14 +262,22 @@ vi.mock("@grammyjs/runner", () => ({ run: runSpy, })); -vi.mock("../../../src/infra/backoff.js", () => ({ - computeBackoff, - sleepWithAbort, -})); +vi.mock("openclaw/plugin-sdk/infra-runtime", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + computeBackoff, + sleepWithAbort, + }; +}); -vi.mock("../../../src/infra/unhandled-rejections.js", () => ({ - registerUnhandledRejectionHandler: registerUnhandledRejectionHandlerMock, -})); +vi.mock("openclaw/plugin-sdk/runtime-env", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + registerUnhandledRejectionHandler: registerUnhandledRejectionHandlerMock, + }; +}); vi.mock("./webhook.js", () => ({ startTelegramWebhook: startTelegramWebhookSpy, @@ -282,6 +292,16 @@ vi.mock("./update-offset-store.js", () => ({ writeTelegramUpdateOffset: vi.fn(async () => undefined), })); +vi.mock("openclaw/plugin-sdk/reply-runtime", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getReplyFromConfig: async (ctx: { Body?: string }) => ({ + text: `echo:${ctx.Body}`, + }), + }; +}); + vi.mock("../../../src/auto-reply/reply.js", () => ({ getReplyFromConfig: async (ctx: { Body?: string }) => ({ text: `echo:${ctx.Body}`, @@ -292,6 +312,7 @@ describe("monitorTelegramProvider (grammY)", () => { let consoleErrorSpy: { mockRestore: () => void } | undefined; beforeEach(() => { + vi.resetModules(); loadConfig.mockReturnValue({ agents: { defaults: { maxConcurrent: 2 } }, channels: { telegram: {} }, @@ -387,6 +408,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("retries on recoverable undici fetch errors", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); const networkError = makeRecoverableFetchError(); runSpy @@ -410,6 +432,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("retries recoverable deleteWebhook failures before polling", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); const cleanupError = makeRecoverableFetchError(); api.deleteWebhook.mockReset(); @@ -423,6 +446,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("retries setup-time recoverable errors before starting polling", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); const setupError = makeRecoverableFetchError(); createTelegramBotErrors.push(setupError); @@ -436,6 +460,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("awaits runner.stop before retrying after recoverable polling error", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); const recoverableError = makeRecoverableFetchError(); let firstStopped = false; @@ -463,6 +488,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("stops bot instance when polling cycle exits", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); mockRunOnceAndAbort(abort); @@ -473,6 +499,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("clears bounded cleanup timers after a clean stop", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); vi.useFakeTimers(); try { const abort = new AbortController(); @@ -487,6 +514,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("surfaces non-recoverable errors", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); runSpy.mockImplementationOnce(() => makeRunnerStub({ task: () => Promise.reject(new Error("bad token")), @@ -497,6 +525,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("force-restarts polling when unhandled network rejection stalls runner", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); const { stop } = mockRunOnceWithStalledPollingRunner(); mockRunOnceAndAbort(abort); @@ -504,7 +533,7 @@ describe("monitorTelegramProvider (grammY)", () => { const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); await vi.waitFor(() => expect(runSpy).toHaveBeenCalledTimes(1)); - expect(emitUnhandledRejection(makeTaggedPollingFetchError())).toBe(true); + emitUnhandledRejection(makeTaggedPollingFetchError()); await monitor; expect(stop.mock.calls.length).toBeGreaterThanOrEqual(1); @@ -514,6 +543,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("reuses the resolved transport across polling restarts", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); vi.useFakeTimers({ shouldAdvanceTime: true }); try { const telegramTransport = { @@ -542,6 +572,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("aborts the active Telegram fetch when unhandled network rejection forces restart", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); const { stop } = mockRunOnceWithStalledPollingRunner(); mockRunOnceAndAbort(abort); @@ -552,7 +583,7 @@ describe("monitorTelegramProvider (grammY)", () => { expect(firstSignal).toBeInstanceOf(AbortSignal); expect((firstSignal as AbortSignal).aborted).toBe(false); - expect(emitUnhandledRejection(makeTaggedPollingFetchError())).toBe(true); + emitUnhandledRejection(makeTaggedPollingFetchError()); await monitor; expect((firstSignal as AbortSignal).aborted).toBe(true); @@ -560,6 +591,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("ignores unrelated process-level network errors while telegram polling is active", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); const { stop } = mockRunOnceWithStalledPollingRunner(); @@ -585,6 +617,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("passes configured webhookHost to webhook listener", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); await monitorTelegramProvider({ token: "tok", useWebhook: true, @@ -609,6 +642,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("webhook mode waits for abort signal before returning", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); const settled = vi.fn(); const monitor = monitorTelegramProvider({ @@ -628,6 +662,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("force-restarts polling when getUpdates stalls (watchdog)", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); vi.useFakeTimers({ shouldAdvanceTime: true }); const abort = new AbortController(); const { stop } = mockRunOnceWithStalledPollingRunner(); @@ -668,6 +703,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("resets webhookCleared latch on 409 conflict so deleteWebhook re-runs", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); const abort = new AbortController(); api.deleteWebhook.mockReset(); api.deleteWebhook.mockResolvedValue(true); @@ -706,6 +742,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("falls back to configured webhookSecret when not passed explicitly", async () => { + const { monitorTelegramProvider } = await import("./monitor.js"); await monitorTelegramProvider({ token: "tok", useWebhook: true, diff --git a/extensions/tlon/src/monitor/approval-runtime.ts b/extensions/tlon/src/monitor/approval-runtime.ts new file mode 100644 index 00000000000..292b6cc6faa --- /dev/null +++ b/extensions/tlon/src/monitor/approval-runtime.ts @@ -0,0 +1,362 @@ +import type { RuntimeEnv } from "../../api.js"; +import type { PendingApproval, TlonSettingsStore } from "../settings.js"; +import { normalizeShip } from "../targets.js"; +import { sendDm } from "../urbit/send.js"; +import type { UrbitSSEClient } from "../urbit/sse-client.js"; +import { + findPendingApproval, + formatApprovalConfirmation, + formatApprovalRequest, + formatBlockedList, + formatPendingList, + parseAdminCommand, + parseApprovalResponse, + removePendingApproval, +} from "./approval.js"; + +type TlonApprovalApi = Pick; + +type ApprovedMessageProcessor = (approval: PendingApproval) => Promise; + +export function createTlonApprovalRuntime(params: { + api: TlonApprovalApi; + runtime: RuntimeEnv; + botShipName: string; + getPendingApprovals: () => PendingApproval[]; + setPendingApprovals: (approvals: PendingApproval[]) => void; + getCurrentSettings: () => TlonSettingsStore; + setCurrentSettings: (settings: TlonSettingsStore) => void; + getEffectiveDmAllowlist: () => string[]; + setEffectiveDmAllowlist: (ships: string[]) => void; + getEffectiveOwnerShip: () => string | null; + processApprovedMessage: ApprovedMessageProcessor; + refreshWatchedChannels: () => Promise; +}) { + const { + api, + runtime, + botShipName, + getPendingApprovals, + setPendingApprovals, + getCurrentSettings, + setCurrentSettings, + getEffectiveDmAllowlist, + setEffectiveDmAllowlist, + getEffectiveOwnerShip, + processApprovedMessage, + refreshWatchedChannels, + } = params; + + const savePendingApprovals = async (): Promise => { + try { + await api.poke({ + app: "settings", + mark: "settings-event", + json: { + "put-entry": { + desk: "moltbot", + "bucket-key": "tlon", + "entry-key": "pendingApprovals", + value: JSON.stringify(getPendingApprovals()), + }, + }, + }); + } catch (err) { + runtime.error?.(`[tlon] Failed to save pending approvals: ${String(err)}`); + } + }; + + const addToDmAllowlist = async (ship: string): Promise => { + const normalizedShip = normalizeShip(ship); + const nextAllowlist = getEffectiveDmAllowlist().includes(normalizedShip) + ? getEffectiveDmAllowlist() + : [...getEffectiveDmAllowlist(), normalizedShip]; + setEffectiveDmAllowlist(nextAllowlist); + try { + await api.poke({ + app: "settings", + mark: "settings-event", + json: { + "put-entry": { + desk: "moltbot", + "bucket-key": "tlon", + "entry-key": "dmAllowlist", + value: nextAllowlist, + }, + }, + }); + runtime.log?.(`[tlon] Added ${normalizedShip} to dmAllowlist`); + } catch (err) { + runtime.error?.(`[tlon] Failed to update dmAllowlist: ${String(err)}`); + } + }; + + const addToChannelAllowlist = async (ship: string, channelNest: string): Promise => { + const normalizedShip = normalizeShip(ship); + const currentSettings = getCurrentSettings(); + const channelRules = currentSettings.channelRules ?? {}; + const rule = channelRules[channelNest] ?? { mode: "restricted", allowedShips: [] }; + const allowedShips = [...(rule.allowedShips ?? [])]; + + if (!allowedShips.includes(normalizedShip)) { + allowedShips.push(normalizedShip); + } + + const updatedRules = { + ...channelRules, + [channelNest]: { ...rule, allowedShips }, + }; + setCurrentSettings({ ...currentSettings, channelRules: updatedRules }); + + try { + await api.poke({ + app: "settings", + mark: "settings-event", + json: { + "put-entry": { + desk: "moltbot", + "bucket-key": "tlon", + "entry-key": "channelRules", + value: JSON.stringify(updatedRules), + }, + }, + }); + runtime.log?.(`[tlon] Added ${normalizedShip} to ${channelNest} allowlist`); + } catch (err) { + runtime.error?.(`[tlon] Failed to update channelRules: ${String(err)}`); + } + }; + + const blockShip = async (ship: string): Promise => { + const normalizedShip = normalizeShip(ship); + try { + await api.poke({ + app: "chat", + mark: "chat-block-ship", + json: { ship: normalizedShip }, + }); + runtime.log?.(`[tlon] Blocked ship ${normalizedShip}`); + } catch (err) { + runtime.error?.(`[tlon] Failed to block ship ${normalizedShip}: ${String(err)}`); + } + }; + + const isShipBlocked = async (ship: string): Promise => { + const normalizedShip = normalizeShip(ship); + try { + const blocked = (await api.scry("/chat/blocked.json")) as string[] | undefined; + return ( + Array.isArray(blocked) && blocked.some((item) => normalizeShip(item) === normalizedShip) + ); + } catch (err) { + runtime.log?.(`[tlon] Failed to check blocked list: ${String(err)}`); + return false; + } + }; + + const getBlockedShips = async (): Promise => { + try { + const blocked = (await api.scry("/chat/blocked.json")) as string[] | undefined; + return Array.isArray(blocked) ? blocked : []; + } catch (err) { + runtime.log?.(`[tlon] Failed to get blocked list: ${String(err)}`); + return []; + } + }; + + const unblockShip = async (ship: string): Promise => { + const normalizedShip = normalizeShip(ship); + try { + await api.poke({ + app: "chat", + mark: "chat-unblock-ship", + json: { ship: normalizedShip }, + }); + runtime.log?.(`[tlon] Unblocked ship ${normalizedShip}`); + return true; + } catch (err) { + runtime.error?.(`[tlon] Failed to unblock ship ${normalizedShip}: ${String(err)}`); + return false; + } + }; + + const sendOwnerNotification = async (message: string): Promise => { + const ownerShip = getEffectiveOwnerShip(); + if (!ownerShip) { + runtime.log?.("[tlon] No ownerShip configured, cannot send notification"); + return; + } + try { + await sendDm({ + api, + fromShip: botShipName, + toShip: ownerShip, + text: message, + }); + runtime.log?.(`[tlon] Sent notification to owner ${ownerShip}`); + } catch (err) { + runtime.error?.(`[tlon] Failed to send notification to owner: ${String(err)}`); + } + }; + + const queueApprovalRequest = async (approval: PendingApproval): Promise => { + if (await isShipBlocked(approval.requestingShip)) { + runtime.log?.(`[tlon] Ignoring request from blocked ship ${approval.requestingShip}`); + return; + } + + const approvals = getPendingApprovals(); + const existingIndex = approvals.findIndex( + (item) => + item.type === approval.type && + item.requestingShip === approval.requestingShip && + (approval.type !== "channel" || item.channelNest === approval.channelNest) && + (approval.type !== "group" || item.groupFlag === approval.groupFlag), + ); + + if (existingIndex !== -1) { + const existing = approvals[existingIndex]; + if (approval.originalMessage) { + existing.originalMessage = approval.originalMessage; + existing.messagePreview = approval.messagePreview; + } + runtime.log?.( + `[tlon] Updated existing approval for ${approval.requestingShip} (${approval.type}) - re-sending notification`, + ); + await savePendingApprovals(); + await sendOwnerNotification(formatApprovalRequest(existing)); + return; + } + + setPendingApprovals([...approvals, approval]); + await savePendingApprovals(); + await sendOwnerNotification(formatApprovalRequest(approval)); + runtime.log?.( + `[tlon] Queued approval request: ${approval.id} (${approval.type} from ${approval.requestingShip})`, + ); + }; + + const handleApprovalResponse = async (text: string): Promise => { + const parsed = parseApprovalResponse(text); + if (!parsed) { + return false; + } + + const approval = findPendingApproval(getPendingApprovals(), parsed.id); + if (!approval) { + await sendOwnerNotification( + `No pending approval found${parsed.id ? ` for ID: ${parsed.id}` : ""}`, + ); + return true; + } + + if (parsed.action === "approve") { + switch (approval.type) { + case "dm": + await addToDmAllowlist(approval.requestingShip); + if (approval.originalMessage) { + runtime.log?.( + `[tlon] Processing original message from ${approval.requestingShip} after approval`, + ); + await processApprovedMessage(approval); + } + break; + case "channel": + if (approval.channelNest) { + await addToChannelAllowlist(approval.requestingShip, approval.channelNest); + if (approval.originalMessage) { + runtime.log?.( + `[tlon] Processing original message from ${approval.requestingShip} in ${approval.channelNest} after approval`, + ); + await processApprovedMessage(approval); + } + } + break; + case "group": + if (approval.groupFlag) { + try { + await api.poke({ + app: "groups", + mark: "group-join", + json: { + flag: approval.groupFlag, + "join-all": true, + }, + }); + runtime.log?.(`[tlon] Joined group ${approval.groupFlag} after approval`); + setTimeout(() => { + void (async () => { + try { + const newCount = await refreshWatchedChannels(); + if (newCount > 0) { + runtime.log?.( + `[tlon] Discovered ${newCount} new channel(s) after joining group`, + ); + } + } catch (err) { + runtime.log?.( + `[tlon] Channel discovery after group join failed: ${String(err)}`, + ); + } + })(); + }, 2000); + } catch (err) { + runtime.error?.(`[tlon] Failed to join group ${approval.groupFlag}: ${String(err)}`); + } + } + break; + } + + await sendOwnerNotification(formatApprovalConfirmation(approval, "approve")); + } else if (parsed.action === "block") { + await blockShip(approval.requestingShip); + await sendOwnerNotification(formatApprovalConfirmation(approval, "block")); + } else { + await sendOwnerNotification(formatApprovalConfirmation(approval, "deny")); + } + + setPendingApprovals(removePendingApproval(getPendingApprovals(), approval.id)); + await savePendingApprovals(); + return true; + }; + + const handleAdminCommand = async (text: string): Promise => { + const command = parseAdminCommand(text); + if (!command) { + return false; + } + + switch (command.type) { + case "blocked": { + const blockedShips = await getBlockedShips(); + await sendOwnerNotification(formatBlockedList(blockedShips)); + runtime.log?.(`[tlon] Owner requested blocked ships list (${blockedShips.length} ships)`); + return true; + } + case "pending": + await sendOwnerNotification(formatPendingList(getPendingApprovals())); + runtime.log?.( + `[tlon] Owner requested pending approvals list (${getPendingApprovals().length} pending)`, + ); + return true; + case "unblock": { + const shipToUnblock = command.ship; + if (!(await isShipBlocked(shipToUnblock))) { + await sendOwnerNotification(`${shipToUnblock} is not blocked.`); + return true; + } + const success = await unblockShip(shipToUnblock); + await sendOwnerNotification( + success ? `Unblocked ${shipToUnblock}.` : `Failed to unblock ${shipToUnblock}.`, + ); + return true; + } + } + }; + + return { + queueApprovalRequest, + handleApprovalResponse, + handleAdminCommand, + }; +} diff --git a/extensions/tlon/src/monitor/cites.ts b/extensions/tlon/src/monitor/cites.ts new file mode 100644 index 00000000000..8e60a1fb5ef --- /dev/null +++ b/extensions/tlon/src/monitor/cites.ts @@ -0,0 +1,53 @@ +import type { RuntimeEnv } from "../../api.js"; +import { extractCites, extractMessageText, type ParsedCite } from "./utils.js"; + +type TlonScryApi = { + scry: (path: string) => Promise; +}; + +export function createTlonCitationResolver(params: { api: TlonScryApi; runtime: RuntimeEnv }) { + const { api, runtime } = params; + + const resolveCiteContent = async (cite: ParsedCite): Promise => { + if (cite.type !== "chan" || !cite.nest || !cite.postId) { + return null; + } + + try { + const scryPath = `/channels/v4/${cite.nest}/posts/post/${cite.postId}.json`; + runtime.log?.(`[tlon] Fetching cited post: ${scryPath}`); + + const data: any = await api.scry(scryPath); + if (data?.essay?.content) { + return extractMessageText(data.essay.content) || null; + } + + return null; + } catch (err) { + runtime.log?.(`[tlon] Failed to fetch cited post: ${String(err)}`); + return null; + } + }; + + const resolveAllCites = async (content: unknown): Promise => { + const cites = extractCites(content); + if (cites.length === 0) { + return ""; + } + + const resolved: string[] = []; + for (const cite of cites) { + const text = await resolveCiteContent(cite); + if (text) { + resolved.push(`> ${cite.author || "unknown"} wrote: ${text}`); + } + } + + return resolved.length > 0 ? `${resolved.join("\n")}\n\n` : ""; + }; + + return { + resolveCiteContent, + resolveAllCites, + }; +} diff --git a/extensions/tlon/src/monitor/index.ts b/extensions/tlon/src/monitor/index.ts index 4f79a536cd8..1b340a1c1dc 100644 --- a/extensions/tlon/src/monitor/index.ts +++ b/extensions/tlon/src/monitor/index.ts @@ -9,22 +9,16 @@ import { ssrfPolicyFromAllowPrivateNetwork } from "../urbit/context.js"; import type { Foreigns, DmInvite } from "../urbit/foreigns.js"; import { sendDm, sendGroupMessage } from "../urbit/send.js"; import { UrbitSSEClient } from "../urbit/sse-client.js"; +import { createTlonApprovalRuntime } from "./approval-runtime.js"; import { type PendingApproval, type AdminCommand, createPendingApproval, - formatApprovalRequest, - formatApprovalConfirmation, - parseApprovalResponse, isApprovalResponse, - findPendingApproval, - removePendingApproval, - parseAdminCommand, isAdminCommand, - formatBlockedList, - formatPendingList, } from "./approval.js"; import { resolveChannelAuthorization } from "./authorization.js"; +import { createTlonCitationResolver } from "./cites.js"; import { fetchAllChannels, fetchInitData } from "./discovery.js"; import { cacheMessage, getChannelHistory, fetchThreadHistory } from "./history.js"; import { downloadMessageImages } from "./media.js"; @@ -270,412 +264,6 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise { - if (cite.type !== "chan" || !cite.nest || !cite.postId) { - return null; - } - - try { - // Scry for the specific post: /v4/{nest}/posts/post/{postId} - const scryPath = `/channels/v4/${cite.nest}/posts/post/${cite.postId}.json`; - runtime.log?.(`[tlon] Fetching cited post: ${scryPath}`); - - const data: any = await api!.scry(scryPath); - - // Extract text from the post's essay content - if (data?.essay?.content) { - const text = extractMessageText(data.essay.content); - return text || null; - } - - return null; - } catch (err) { - runtime.log?.(`[tlon] Failed to fetch cited post: ${String(err)}`); - return null; - } - } - - // Resolve all cites in message content and return quoted text - async function resolveAllCites(content: unknown): Promise { - const cites = extractCites(content); - if (cites.length === 0) { - return ""; - } - - const resolved: string[] = []; - for (const cite of cites) { - const text = await resolveCiteContent(cite); - if (text) { - const author = cite.author || "unknown"; - resolved.push(`> ${author} wrote: ${text}`); - } - } - - return resolved.length > 0 ? resolved.join("\n") + "\n\n" : ""; - } - - // Helper to save pending approvals to settings store - async function savePendingApprovals(): Promise { - try { - await api!.poke({ - app: "settings", - mark: "settings-event", - json: { - "put-entry": { - desk: "moltbot", - "bucket-key": "tlon", - "entry-key": "pendingApprovals", - value: JSON.stringify(pendingApprovals), - }, - }, - }); - } catch (err) { - runtime.error?.(`[tlon] Failed to save pending approvals: ${String(err)}`); - } - } - - // Helper to update dmAllowlist in settings store - async function addToDmAllowlist(ship: string): Promise { - const normalizedShip = normalizeShip(ship); - if (!effectiveDmAllowlist.includes(normalizedShip)) { - effectiveDmAllowlist = [...effectiveDmAllowlist, normalizedShip]; - } - try { - await api!.poke({ - app: "settings", - mark: "settings-event", - json: { - "put-entry": { - desk: "moltbot", - "bucket-key": "tlon", - "entry-key": "dmAllowlist", - value: effectiveDmAllowlist, - }, - }, - }); - runtime.log?.(`[tlon] Added ${normalizedShip} to dmAllowlist`); - } catch (err) { - runtime.error?.(`[tlon] Failed to update dmAllowlist: ${String(err)}`); - } - } - - // Helper to update channelRules in settings store - async function addToChannelAllowlist(ship: string, channelNest: string): Promise { - const normalizedShip = normalizeShip(ship); - const channelRules = currentSettings.channelRules ?? {}; - const rule = channelRules[channelNest] ?? { mode: "restricted", allowedShips: [] }; - const allowedShips = [...(rule.allowedShips ?? [])]; // Clone to avoid mutation - - if (!allowedShips.includes(normalizedShip)) { - allowedShips.push(normalizedShip); - } - - const updatedRules = { - ...channelRules, - [channelNest]: { ...rule, allowedShips }, - }; - - // Update local state immediately (don't wait for settings subscription) - currentSettings = { ...currentSettings, channelRules: updatedRules }; - - try { - await api!.poke({ - app: "settings", - mark: "settings-event", - json: { - "put-entry": { - desk: "moltbot", - "bucket-key": "tlon", - "entry-key": "channelRules", - value: JSON.stringify(updatedRules), - }, - }, - }); - runtime.log?.(`[tlon] Added ${normalizedShip} to ${channelNest} allowlist`); - } catch (err) { - runtime.error?.(`[tlon] Failed to update channelRules: ${String(err)}`); - } - } - - // Helper to block a ship using Tlon's native blocking - async function blockShip(ship: string): Promise { - const normalizedShip = normalizeShip(ship); - try { - await api!.poke({ - app: "chat", - mark: "chat-block-ship", - json: { ship: normalizedShip }, - }); - runtime.log?.(`[tlon] Blocked ship ${normalizedShip}`); - } catch (err) { - runtime.error?.(`[tlon] Failed to block ship ${normalizedShip}: ${String(err)}`); - } - } - - // Check if a ship is blocked using Tlon's native block list - async function isShipBlocked(ship: string): Promise { - const normalizedShip = normalizeShip(ship); - try { - const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined; - return Array.isArray(blocked) && blocked.some((s) => normalizeShip(s) === normalizedShip); - } catch (err) { - runtime.log?.(`[tlon] Failed to check blocked list: ${String(err)}`); - return false; - } - } - - // Get all blocked ships - async function getBlockedShips(): Promise { - try { - const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined; - return Array.isArray(blocked) ? blocked : []; - } catch (err) { - runtime.log?.(`[tlon] Failed to get blocked list: ${String(err)}`); - return []; - } - } - - // Helper to unblock a ship using Tlon's native blocking - async function unblockShip(ship: string): Promise { - const normalizedShip = normalizeShip(ship); - try { - await api!.poke({ - app: "chat", - mark: "chat-unblock-ship", - json: { ship: normalizedShip }, - }); - runtime.log?.(`[tlon] Unblocked ship ${normalizedShip}`); - return true; - } catch (err) { - runtime.error?.(`[tlon] Failed to unblock ship ${normalizedShip}: ${String(err)}`); - return false; - } - } - - // Helper to send DM notification to owner - async function sendOwnerNotification(message: string): Promise { - if (!effectiveOwnerShip) { - runtime.log?.("[tlon] No ownerShip configured, cannot send notification"); - return; - } - try { - await sendDm({ - api: api!, - fromShip: botShipName, - toShip: effectiveOwnerShip, - text: message, - }); - runtime.log?.(`[tlon] Sent notification to owner ${effectiveOwnerShip}`); - } catch (err) { - runtime.error?.(`[tlon] Failed to send notification to owner: ${String(err)}`); - } - } - - // Queue a new approval request and notify the owner - async function queueApprovalRequest(approval: PendingApproval): Promise { - // Check if ship is blocked - silently ignore - if (await isShipBlocked(approval.requestingShip)) { - runtime.log?.(`[tlon] Ignoring request from blocked ship ${approval.requestingShip}`); - return; - } - - // Check for duplicate - if found, update it with new content and re-notify - const existingIndex = pendingApprovals.findIndex( - (a) => - a.type === approval.type && - a.requestingShip === approval.requestingShip && - (approval.type !== "channel" || a.channelNest === approval.channelNest) && - (approval.type !== "group" || a.groupFlag === approval.groupFlag), - ); - - if (existingIndex !== -1) { - // Update existing approval with new content (preserves the original ID) - const existing = pendingApprovals[existingIndex]; - if (approval.originalMessage) { - existing.originalMessage = approval.originalMessage; - existing.messagePreview = approval.messagePreview; - } - runtime.log?.( - `[tlon] Updated existing approval for ${approval.requestingShip} (${approval.type}) - re-sending notification`, - ); - await savePendingApprovals(); - const message = formatApprovalRequest(existing); - await sendOwnerNotification(message); - return; - } - - pendingApprovals.push(approval); - await savePendingApprovals(); - - const message = formatApprovalRequest(approval); - await sendOwnerNotification(message); - runtime.log?.( - `[tlon] Queued approval request: ${approval.id} (${approval.type} from ${approval.requestingShip})`, - ); - } - - // Process the owner's approval response - async function handleApprovalResponse(text: string): Promise { - const parsed = parseApprovalResponse(text); - if (!parsed) { - return false; - } - - const approval = findPendingApproval(pendingApprovals, parsed.id); - if (!approval) { - await sendOwnerNotification( - "No pending approval found" + (parsed.id ? ` for ID: ${parsed.id}` : ""), - ); - return true; // Still consumed the message - } - - if (parsed.action === "approve") { - switch (approval.type) { - case "dm": - await addToDmAllowlist(approval.requestingShip); - // Process the original message if available - if (approval.originalMessage) { - runtime.log?.( - `[tlon] Processing original message from ${approval.requestingShip} after approval`, - ); - await processMessage({ - messageId: approval.originalMessage.messageId, - senderShip: approval.requestingShip, - messageText: approval.originalMessage.messageText, - messageContent: approval.originalMessage.messageContent, - isGroup: false, - timestamp: approval.originalMessage.timestamp, - }); - } - break; - - case "channel": - if (approval.channelNest) { - await addToChannelAllowlist(approval.requestingShip, approval.channelNest); - // Process the original message if available - if (approval.originalMessage) { - const parsed = parseChannelNest(approval.channelNest); - runtime.log?.( - `[tlon] Processing original message from ${approval.requestingShip} in ${approval.channelNest} after approval`, - ); - await processMessage({ - messageId: approval.originalMessage.messageId, - senderShip: approval.requestingShip, - messageText: approval.originalMessage.messageText, - messageContent: approval.originalMessage.messageContent, - isGroup: true, - channelNest: approval.channelNest, - hostShip: parsed?.hostShip, - channelName: parsed?.channelName, - timestamp: approval.originalMessage.timestamp, - parentId: approval.originalMessage.parentId, - isThreadReply: approval.originalMessage.isThreadReply, - }); - } - } - break; - - case "group": - // Accept the group invite (don't add to allowlist - each invite requires approval) - if (approval.groupFlag) { - try { - await api!.poke({ - app: "groups", - mark: "group-join", - json: { - flag: approval.groupFlag, - "join-all": true, - }, - }); - runtime.log?.(`[tlon] Joined group ${approval.groupFlag} after approval`); - - // Immediately discover channels from the newly joined group - // Small delay to allow the join to propagate - setTimeout(async () => { - try { - const discoveredChannels = await fetchAllChannels(api!, runtime); - let newCount = 0; - for (const channelNest of discoveredChannels) { - if (!watchedChannels.has(channelNest)) { - watchedChannels.add(channelNest); - newCount++; - } - } - if (newCount > 0) { - runtime.log?.( - `[tlon] Discovered ${newCount} new channel(s) after joining group`, - ); - } - } catch (err) { - runtime.log?.(`[tlon] Channel discovery after group join failed: ${String(err)}`); - } - }, 2000); - } catch (err) { - runtime.error?.(`[tlon] Failed to join group ${approval.groupFlag}: ${String(err)}`); - } - } - break; - } - - await sendOwnerNotification(formatApprovalConfirmation(approval, "approve")); - } else if (parsed.action === "block") { - // Block the ship using Tlon's native blocking - await blockShip(approval.requestingShip); - await sendOwnerNotification(formatApprovalConfirmation(approval, "block")); - } else { - // Denied - just remove from pending, no notification to requester - await sendOwnerNotification(formatApprovalConfirmation(approval, "deny")); - } - - // Remove from pending - pendingApprovals = removePendingApproval(pendingApprovals, approval.id); - await savePendingApprovals(); - - return true; - } - - // Handle admin commands from owner (unblock, blocked, pending) - async function handleAdminCommand(text: string): Promise { - const command = parseAdminCommand(text); - if (!command) { - return false; - } - - switch (command.type) { - case "blocked": { - const blockedShips = await getBlockedShips(); - await sendOwnerNotification(formatBlockedList(blockedShips)); - runtime.log?.(`[tlon] Owner requested blocked ships list (${blockedShips.length} ships)`); - return true; - } - - case "pending": { - await sendOwnerNotification(formatPendingList(pendingApprovals)); - runtime.log?.( - `[tlon] Owner requested pending approvals list (${pendingApprovals.length} pending)`, - ); - return true; - } - - case "unblock": { - const shipToUnblock = command.ship; - const isBlocked = await isShipBlocked(shipToUnblock); - if (!isBlocked) { - await sendOwnerNotification(`${shipToUnblock} is not blocked.`); - return true; - } - const success = await unblockShip(shipToUnblock); - if (success) { - await sendOwnerNotification(`Unblocked ${shipToUnblock}.`); - } else { - await sendOwnerNotification(`Failed to unblock ${shipToUnblock}.`); - } - return true; - } - } - } - // Check if a ship is the owner (always allowed to DM) function isOwner(ship: string): boolean { if (!effectiveOwnerShip) { @@ -1026,6 +614,79 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise(groupChannels); const _watchedDMs = new Set(); + const refreshWatchedChannels = async (): Promise => { + const discoveredChannels = await fetchAllChannels(api!, runtime); + let newCount = 0; + for (const channelNest of discoveredChannels) { + if (!watchedChannels.has(channelNest)) { + watchedChannels.add(channelNest); + newCount++; + } + } + return newCount; + }; + + const { resolveAllCites } = createTlonCitationResolver({ + api: { scry: (path) => api!.scry(path) }, + runtime, + }); + + const { queueApprovalRequest, handleApprovalResponse, handleAdminCommand } = + createTlonApprovalRuntime({ + api: { + poke: (payload) => api!.poke(payload), + scry: (path) => api!.scry(path), + }, + runtime, + botShipName, + getPendingApprovals: () => pendingApprovals, + setPendingApprovals: (approvals) => { + pendingApprovals = approvals; + }, + getCurrentSettings: () => currentSettings, + setCurrentSettings: (settings) => { + currentSettings = settings; + }, + getEffectiveDmAllowlist: () => effectiveDmAllowlist, + setEffectiveDmAllowlist: (ships) => { + effectiveDmAllowlist = ships; + }, + getEffectiveOwnerShip: () => effectiveOwnerShip, + processApprovedMessage: async (approval) => { + if (!approval.originalMessage) { + return; + } + if (approval.type === "dm") { + await processMessage({ + messageId: approval.originalMessage.messageId, + senderShip: approval.requestingShip, + messageText: approval.originalMessage.messageText, + messageContent: approval.originalMessage.messageContent, + isGroup: false, + timestamp: approval.originalMessage.timestamp, + }); + return; + } + if (approval.type === "channel" && approval.channelNest) { + const parsedChannel = parseChannelNest(approval.channelNest); + await processMessage({ + messageId: approval.originalMessage.messageId, + senderShip: approval.requestingShip, + messageText: approval.originalMessage.messageText, + messageContent: approval.originalMessage.messageContent, + isGroup: true, + channelNest: approval.channelNest, + hostShip: parsedChannel?.hostShip, + channelName: parsedChannel?.channelName, + timestamp: approval.originalMessage.timestamp, + parentId: approval.originalMessage.parentId, + isThreadReply: approval.originalMessage.isThreadReply, + }); + } + }, + refreshWatchedChannels, + }); + // Firehose handler for all channel messages (/v2) const handleChannelsFirehose = async (event: any) => { try { diff --git a/src/agents/pi-embedded-runner-extraparams.test.ts b/src/agents/pi-embedded-runner-extraparams.test.ts index b597b0b19a0..dbd95e64d34 100644 --- a/src/agents/pi-embedded-runner-extraparams.test.ts +++ b/src/agents/pi-embedded-runner-extraparams.test.ts @@ -80,10 +80,8 @@ vi.mock("../plugins/provider-runtime.js", async (importOriginal) => { const thinkingLevel = skipReasoningInjection ? undefined : params.context.thinkingLevel; return createOpenRouterSystemCacheWrapper(createOpenRouterWrapper(streamFn, thinkingLevel)); }, - resolveProviderCapabilitiesWithPlugin: (params: { - provider: string; - workspaceDir?: string; - }) => resolveProviderCapabilitiesWithPluginMock(params), + resolveProviderCapabilitiesWithPlugin: (params: { provider: string; workspaceDir?: string }) => + resolveProviderCapabilitiesWithPluginMock(params), }; }); diff --git a/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts b/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts index 83a5be8f783..511b70d280d 100644 --- a/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts +++ b/src/agents/pi-embedded-runner/anthropic-stream-wrappers.ts @@ -89,11 +89,14 @@ function hasOpenAiAnthropicToolPayloadCompatFlag(model: { compat?: unknown }): b ); } -function requiresAnthropicToolPayloadCompatibilityForModel(model: { - api?: unknown; - provider?: unknown; - compat?: unknown; -}, options?: AnthropicToolPayloadResolverOptions): boolean { +function requiresAnthropicToolPayloadCompatibilityForModel( + model: { + api?: unknown; + provider?: unknown; + compat?: unknown; + }, + options?: AnthropicToolPayloadResolverOptions, +): boolean { if (model.api !== "anthropic-messages") { return false; } @@ -107,10 +110,13 @@ function requiresAnthropicToolPayloadCompatibilityForModel(model: { return hasOpenAiAnthropicToolPayloadCompatFlag(model); } -function usesOpenAiFunctionAnthropicToolSchemaForModel(model: { - provider?: unknown; - compat?: unknown; -}, options?: AnthropicToolPayloadResolverOptions): boolean { +function usesOpenAiFunctionAnthropicToolSchemaForModel( + model: { + provider?: unknown; + compat?: unknown; + }, + options?: AnthropicToolPayloadResolverOptions, +): boolean { if ( typeof model.provider === "string" && usesOpenAiFunctionAnthropicToolSchema(model.provider, options) @@ -120,10 +126,13 @@ function usesOpenAiFunctionAnthropicToolSchemaForModel(model: { return hasOpenAiAnthropicToolPayloadCompatFlag(model); } -function usesOpenAiStringModeAnthropicToolChoiceForModel(model: { - provider?: unknown; - compat?: unknown; -}, options?: AnthropicToolPayloadResolverOptions): boolean { +function usesOpenAiStringModeAnthropicToolChoiceForModel( + model: { + provider?: unknown; + compat?: unknown; + }, + options?: AnthropicToolPayloadResolverOptions, +): boolean { if ( typeof model.provider === "string" && usesOpenAiStringModeAnthropicToolChoice(model.provider, options) diff --git a/src/agents/provider-capabilities.ts b/src/agents/provider-capabilities.ts index 0539faf31b8..2fe11666766 100644 --- a/src/agents/provider-capabilities.ts +++ b/src/agents/provider-capabilities.ts @@ -1,6 +1,6 @@ +import type { OpenClawConfig } from "../config/config.js"; import { resolveProviderCapabilitiesWithPlugin } from "../plugins/provider-runtime.js"; import { normalizeProviderId } from "./model-selection.js"; -import type { OpenClawConfig } from "../config/config.js"; export type ProviderCapabilities = { anthropicToolSchemaMode: "native" | "openai-functions"; @@ -125,8 +125,7 @@ export function usesOpenAiStringModeAnthropicToolChoice( options?: ProviderCapabilityLookupOptions, ): boolean { return ( - resolveProviderCapabilities(provider, options).anthropicToolChoiceMode === - "openai-string-modes" + resolveProviderCapabilities(provider, options).anthropicToolChoiceMode === "openai-string-modes" ); } diff --git a/src/channels/plugins/directory-config.ts b/src/channels/plugins/directory-config.ts index e54889076e8..04dc4c5afb2 100644 --- a/src/channels/plugins/directory-config.ts +++ b/src/channels/plugins/directory-config.ts @@ -56,6 +56,10 @@ function normalizeTrimmedSet( .filter((id): id is string => Boolean(id)); } +function objectValues(value: Record | undefined): T[] { + return Object.values(value ?? {}); +} + export async function listSlackDirectoryPeersFromConfig( params: DirectoryConfigParams, ): Promise { @@ -123,9 +127,9 @@ export async function listDiscordDirectoryPeersFromConfig( account.config.allowFrom ?? account.config.dm?.allowFrom, account.config.dms, ); - for (const guild of Object.values(account.config.guilds ?? {})) { + for (const guild of objectValues(account.config.guilds)) { addTrimmedEntries(ids, guild.users ?? []); - for (const channel of Object.values(guild.channels ?? {})) { + for (const channel of objectValues(guild.channels)) { addTrimmedEntries(ids, channel.users ?? []); } } @@ -153,7 +157,7 @@ export async function listDiscordDirectoryGroupsFromConfig( return []; } const ids = new Set(); - for (const guild of Object.values(account.config.guilds ?? {})) { + for (const guild of objectValues(account.config.guilds)) { addTrimmedEntries(ids, Object.keys(guild.channels ?? {})); } diff --git a/src/plugin-sdk/runtime-api-guardrails.test.ts b/src/plugin-sdk/runtime-api-guardrails.test.ts index f2079d8691f..785ed9de224 100644 --- a/src/plugin-sdk/runtime-api-guardrails.test.ts +++ b/src/plugin-sdk/runtime-api-guardrails.test.ts @@ -99,7 +99,8 @@ function readExportStatements(path: string): string[] { return sourceFile.statements.flatMap((statement) => { if (!ts.isExportDeclaration(statement)) { - if (!statement.modifiers?.some((modifier) => modifier.kind === ts.SyntaxKind.ExportKeyword)) { + const modifiers = ts.canHaveModifiers(statement) ? ts.getModifiers(statement) : undefined; + if (!modifiers?.some((modifier) => modifier.kind === ts.SyntaxKind.ExportKeyword)) { return []; } return [statement.getText(sourceFile).replaceAll(/\s+/g, " ").trim()]; diff --git a/test/helpers/extensions/discord-provider.test-support.ts b/test/helpers/extensions/discord-provider.test-support.ts index 2c8ad988d04..21412c91709 100644 --- a/test/helpers/extensions/discord-provider.test-support.ts +++ b/test/helpers/extensions/discord-provider.test-support.ts @@ -248,15 +248,16 @@ export const baseConfig = (): OpenClawConfig => channels: { discord: { accounts: { - default: {}, + default: { + token: "MTIz.abc.def", + }, }, }, }, }) as OpenClawConfig; -vi.mock("@buape/carbon", () => { - class Command {} - class ReadyListener {} +vi.mock("@buape/carbon", async (importOriginal) => { + const actual = await importOriginal(); class RateLimitError extends Error { status = 429; discordCode?: number; @@ -293,7 +294,7 @@ vi.mock("@buape/carbon", () => { return clientGetPluginMock(name); } } - return { Client, Command, RateLimitError, ReadyListener }; + return { ...actual, Client, RateLimitError }; }); vi.mock("@buape/carbon/gateway", () => ({ @@ -463,7 +464,9 @@ vi.mock("../../../extensions/discord/src/monitor/provider.lifecycle.js", () => ( })); vi.mock("../../../extensions/discord/src/monitor/rest-fetch.js", () => ({ - resolveDiscordRestFetch: () => async () => undefined, + resolveDiscordRestFetch: () => async () => { + throw new Error("offline"); + }, })); vi.mock("../../../extensions/discord/src/monitor/thread-bindings.js", () => ({