From 9c79c2c2a7c0ddff2d3a7f383db72bfac1cfec3b Mon Sep 17 00:00:00 2001 From: huntharo Date: Thu, 12 Mar 2026 08:08:30 -0400 Subject: [PATCH] Plugins: add inbound claim and Telegram interaction seams --- extensions/lobster/src/lobster-tool.test.ts | 1 + extensions/telegram/src/bot-handlers.ts | 72 ++++++++ extensions/telegram/src/bot.test.ts | 56 +++++++ extensions/telegram/src/send.test-harness.ts | 4 + extensions/telegram/src/send.test.ts | 42 +++++ extensions/telegram/src/send.ts | 103 ++++++++++++ extensions/test-utils/plugin-runtime-mock.ts | 13 ++ .../reply/dispatch-from-config.test.ts | 57 +++++++ src/auto-reply/reply/dispatch-from-config.ts | 18 ++ src/hooks/message-hook-mappers.ts | 99 +++++++++++ src/plugin-sdk/index.ts | 13 ++ src/plugins/hooks.ts | 53 ++++++ src/plugins/interactive.test.ts | 91 ++++++++++ src/plugins/interactive.ts | 156 ++++++++++++++++++ src/plugins/loader.ts | 2 + src/plugins/registry.ts | 12 ++ src/plugins/runtime/index.test.ts | 6 + src/plugins/runtime/runtime-channel.ts | 29 ++++ .../runtime/runtime-telegram-typing.test.ts | 38 +++++ .../runtime/runtime-telegram-typing.ts | 52 ++++++ src/plugins/runtime/types-channel.ts | 36 ++++ src/plugins/types.ts | 87 ++++++++++ src/plugins/wired-hooks-inbound-claim.test.ts | 69 ++++++++ 23 files changed, 1109 insertions(+) create mode 100644 src/plugins/interactive.test.ts create mode 100644 src/plugins/interactive.ts create mode 100644 src/plugins/runtime/runtime-telegram-typing.test.ts create mode 100644 src/plugins/runtime/runtime-telegram-typing.ts create mode 100644 src/plugins/wired-hooks-inbound-claim.test.ts diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts index 40e9a0b64e8..7c62501aa6f 100644 --- a/extensions/lobster/src/lobster-tool.test.ts +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -43,6 +43,7 @@ function fakeApi(overrides: Partial = {}): OpenClawPluginApi registerCli() {}, registerService() {}, registerProvider() {}, + registerInteractiveHandler() {}, registerHook() {}, registerHttpRoute() {}, registerCommand() {}, diff --git a/extensions/telegram/src/bot-handlers.ts b/extensions/telegram/src/bot-handlers.ts index 295c4092ec6..5ed83304524 100644 --- a/extensions/telegram/src/bot-handlers.ts +++ b/extensions/telegram/src/bot-handlers.ts @@ -36,6 +36,7 @@ import { readChannelAllowFromStore } from "../../../src/pairing/pairing-store.js import { resolveAgentRoute } from "../../../src/routing/resolve-route.js"; import { resolveThreadSessionKeys } from "../../../src/routing/session-key.js"; import { applyModelOverrideToSessionEntry } from "../../../src/sessions/model-overrides.js"; +import { dispatchPluginInteractiveHandler } from "../../../src/plugins/interactive.js"; import { withTelegramApiErrorLogging } from "./api-logging.js"; import { isSenderAllowed, @@ -1121,6 +1122,24 @@ export const registerTelegramHandlers = ({ } return await editCallbackMessage(messageText, replyMarkup); }; + const editCallbackButtons = async ( + buttons: Array< + Array<{ text: string; callback_data: string; style?: "danger" | "success" | "primary" }> + >, + ) => { + const keyboard = buildInlineKeyboard(buttons) ?? { inline_keyboard: [] }; + const replyMarkup = { reply_markup: keyboard }; + const editReplyMarkupFn = (ctx as { editMessageReplyMarkup?: unknown }) + .editMessageReplyMarkup; + if (typeof editReplyMarkupFn === "function") { + return await ctx.editMessageReplyMarkup(replyMarkup); + } + return await bot.api.editMessageReplyMarkup( + callbackMessage.chat.id, + callbackMessage.message_id, + replyMarkup, + ); + }; const deleteCallbackMessage = async () => { const deleteFn = (ctx as { deleteMessage?: unknown }).deleteMessage; if (typeof deleteFn === "function") { @@ -1201,6 +1220,59 @@ export const registerTelegramHandlers = ({ return; } + const callbackConversationId = + messageThreadId != null ? `${chatId}:topic:${messageThreadId}` : String(chatId); + const pluginCallback = await dispatchPluginInteractiveHandler({ + channel: "telegram", + data, + callbackId: callback.id, + ctx: { + accountId, + callbackId: callback.id, + conversationId: callbackConversationId, + parentConversationId: messageThreadId != null ? String(chatId) : undefined, + senderId: senderId || undefined, + senderUsername: senderUsername || undefined, + threadId: messageThreadId, + isGroup, + isForum, + auth: { + isAuthorizedSender: true, + }, + callbackMessage: { + messageId: callbackMessage.message_id, + chatId: String(chatId), + messageText: callbackMessage.text ?? callbackMessage.caption, + }, + }, + respond: { + reply: async ({ text, buttons }) => { + await replyToCallbackChat( + text, + buttons ? { reply_markup: buildInlineKeyboard(buttons) } : undefined, + ); + }, + editMessage: async ({ text, buttons }) => { + await editCallbackMessage( + text, + buttons ? { reply_markup: buildInlineKeyboard(buttons) } : undefined, + ); + }, + editButtons: async ({ buttons }) => { + await editCallbackButtons(buttons); + }, + clearButtons: async () => { + await clearCallbackButtons(); + }, + deleteMessage: async () => { + await deleteCallbackMessage(); + }, + }, + }); + if (pluginCallback.handled) { + return; + } + if (isApprovalCallback) { if ( !isTelegramExecApprovalClientEnabled({ cfg, accountId }) || diff --git a/extensions/telegram/src/bot.test.ts b/extensions/telegram/src/bot.test.ts index db19faa8fe3..b8ea5f5b6d9 100644 --- a/extensions/telegram/src/bot.test.ts +++ b/extensions/telegram/src/bot.test.ts @@ -2,6 +2,10 @@ import { rm } from "node:fs/promises"; import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { escapeRegExp, formatEnvelopeTimestamp } from "../../../test/helpers/envelope-timestamp.js"; import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js"; +import { + clearPluginInteractiveHandlers, + registerPluginInteractiveHandler, +} from "../plugins/interactive.js"; import { answerCallbackQuerySpy, commandSpy, @@ -49,6 +53,7 @@ describe("createTelegramBot", () => { beforeEach(() => { setMyCommandsSpy.mockClear(); + clearPluginInteractiveHandlers(); loadConfig.mockReturnValue({ agents: { defaults: { @@ -1359,6 +1364,57 @@ describe("createTelegramBot", () => { expect(replySpy).not.toHaveBeenCalled(); }); + + it("routes plugin-owned callback namespaces before synthetic command fallback", async () => { + onSpy.mockClear(); + replySpy.mockClear(); + editMessageTextSpy.mockClear(); + sendMessageSpy.mockClear(); + registerPluginInteractiveHandler("codex-plugin", { + channel: "telegram", + namespace: "codex", + handler: async ({ respond, callback }) => { + await respond.editMessage({ + text: `Handled ${callback.payload}`, + }); + return { handled: true }; + }, + }); + + createTelegramBot({ + token: "tok", + config: { + channels: { + telegram: { + dmPolicy: "open", + allowFrom: ["*"], + }, + }, + }, + }); + const callbackHandler = onSpy.mock.calls.find((call) => call[0] === "callback_query")?.[1] as ( + ctx: Record, + ) => Promise; + + await callbackHandler({ + callbackQuery: { + id: "cbq-codex-1", + data: "codex:resume:thread-1", + from: { id: 9, first_name: "Ada", username: "ada_bot" }, + message: { + chat: { id: 1234, type: "private" }, + date: 1736380800, + message_id: 11, + text: "Select a thread", + }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({ download: async () => new Uint8Array() }), + }); + + expect(editMessageTextSpy).toHaveBeenCalledWith(1234, 11, "Handled resume:thread-1", undefined); + expect(replySpy).not.toHaveBeenCalled(); + }); it("sets command target session key for dm topic commands", async () => { onSpy.mockClear(); sendMessageSpy.mockClear(); diff --git a/extensions/telegram/src/send.test-harness.ts b/extensions/telegram/src/send.test-harness.ts index 6d53a3d20e7..604a7d27dd1 100644 --- a/extensions/telegram/src/send.test-harness.ts +++ b/extensions/telegram/src/send.test-harness.ts @@ -4,7 +4,10 @@ import type { MockFn } from "../../../src/test-utils/vitest-mock-fn.js"; const { botApi, botCtorSpy } = vi.hoisted(() => ({ botApi: { deleteMessage: vi.fn(), + editForumTopic: vi.fn(), editMessageText: vi.fn(), + editMessageReplyMarkup: vi.fn(), + pinChatMessage: vi.fn(), sendChatAction: vi.fn(), sendMessage: vi.fn(), sendPoll: vi.fn(), @@ -16,6 +19,7 @@ const { botApi, botCtorSpy } = vi.hoisted(() => ({ sendAnimation: vi.fn(), setMessageReaction: vi.fn(), sendSticker: vi.fn(), + unpinChatMessage: vi.fn(), }, botCtorSpy: vi.fn(), })); diff --git a/extensions/telegram/src/send.test.ts b/extensions/telegram/src/send.test.ts index 7a29ecf07de..8a234ce92cb 100644 --- a/extensions/telegram/src/send.test.ts +++ b/extensions/telegram/src/send.test.ts @@ -16,11 +16,14 @@ const { buildInlineKeyboard, createForumTopicTelegram, editMessageTelegram, + pinMessageTelegram, reactMessageTelegram, + renameForumTopicTelegram, sendMessageTelegram, sendTypingTelegram, sendPollTelegram, sendStickerTelegram, + unpinMessageTelegram, } = await importTelegramSendModule(); async function expectChatNotFoundWithChatId( @@ -215,6 +218,45 @@ describe("sendMessageTelegram", () => { }); }); + it("pins and unpins Telegram messages", async () => { + loadConfig.mockReturnValue({ + channels: { + telegram: { + botToken: "tok", + }, + }, + }); + botApi.pinChatMessage.mockResolvedValue(true); + botApi.unpinChatMessage.mockResolvedValue(true); + + await pinMessageTelegram("-1001234567890", 101, { accountId: "default" }); + await unpinMessageTelegram("-1001234567890", 101, { accountId: "default" }); + + expect(botApi.pinChatMessage).toHaveBeenCalledWith("-1001234567890", 101, { + disable_notification: true, + }); + expect(botApi.unpinChatMessage).toHaveBeenCalledWith("-1001234567890", 101); + }); + + it("renames a Telegram forum topic", async () => { + loadConfig.mockReturnValue({ + channels: { + telegram: { + botToken: "tok", + }, + }, + }); + botApi.editForumTopic.mockResolvedValue(true); + + await renameForumTopicTelegram("-1001234567890", 271, "Codex Thread", { + accountId: "default", + }); + + expect(botApi.editForumTopic).toHaveBeenCalledWith("-1001234567890", 271, { + name: "Codex Thread", + }); + }); + it("applies timeoutSeconds config precedence", async () => { const cases = [ { diff --git a/extensions/telegram/src/send.ts b/extensions/telegram/src/send.ts index e7d2c48e9fc..89d6f7d337d 100644 --- a/extensions/telegram/src/send.ts +++ b/extensions/telegram/src/send.ts @@ -1067,6 +1067,109 @@ export async function deleteMessageTelegram( return { ok: true }; } +export async function pinMessageTelegram( + chatIdInput: string | number, + messageIdInput: string | number, + opts: TelegramDeleteOpts = {}, +): Promise<{ ok: true; messageId: string; chatId: string }> { + const { cfg, account, api } = resolveTelegramApiContext(opts); + const rawTarget = String(chatIdInput); + const chatId = await resolveAndPersistChatId({ + cfg, + api, + lookupTarget: rawTarget, + persistTarget: rawTarget, + verbose: opts.verbose, + }); + const messageId = normalizeMessageId(messageIdInput); + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, + retry: opts.retry, + verbose: opts.verbose, + }); + await requestWithDiag( + () => api.pinChatMessage(chatId, messageId, { disable_notification: true }), + "pinChatMessage", + ); + logVerbose(`[telegram] Pinned message ${messageId} in chat ${chatId}`); + return { ok: true, messageId: String(messageId), chatId }; +} + +export async function unpinMessageTelegram( + chatIdInput: string | number, + messageIdInput?: string | number, + opts: TelegramDeleteOpts = {}, +): Promise<{ ok: true; chatId: string; messageId?: string }> { + const { cfg, account, api } = resolveTelegramApiContext(opts); + const rawTarget = String(chatIdInput); + const chatId = await resolveAndPersistChatId({ + cfg, + api, + lookupTarget: rawTarget, + persistTarget: rawTarget, + verbose: opts.verbose, + }); + const messageId = messageIdInput === undefined ? undefined : normalizeMessageId(messageIdInput); + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, + retry: opts.retry, + verbose: opts.verbose, + }); + await requestWithDiag(() => api.unpinChatMessage(chatId, messageId), "unpinChatMessage"); + logVerbose( + `[telegram] Unpinned ${messageId != null ? `message ${messageId}` : "active message"} in chat ${chatId}`, + ); + return { + ok: true, + chatId, + ...(messageId != null ? { messageId: String(messageId) } : {}), + }; +} + +export async function renameForumTopicTelegram( + chatIdInput: string | number, + messageThreadIdInput: string | number, + name: string, + opts: TelegramDeleteOpts = {}, +): Promise<{ ok: true; chatId: string; messageThreadId: number; name: string }> { + const trimmedName = name.trim(); + if (!trimmedName) { + throw new Error("Telegram forum topic name is required"); + } + if (trimmedName.length > 128) { + throw new Error("Telegram forum topic name must be 128 characters or fewer"); + } + const { cfg, account, api } = resolveTelegramApiContext(opts); + const rawTarget = String(chatIdInput); + const chatId = await resolveAndPersistChatId({ + cfg, + api, + lookupTarget: rawTarget, + persistTarget: rawTarget, + verbose: opts.verbose, + }); + const messageThreadId = normalizeMessageId(messageThreadIdInput); + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, + retry: opts.retry, + verbose: opts.verbose, + }); + await requestWithDiag( + () => api.editForumTopic(chatId, messageThreadId, { name: trimmedName }), + "editForumTopic", + ); + logVerbose(`[telegram] Renamed forum topic ${messageThreadId} in chat ${chatId}`); + return { + ok: true, + chatId, + messageThreadId, + name: trimmedName, + }; +} + type TelegramEditOpts = { token?: string; accountId?: string; diff --git a/extensions/test-utils/plugin-runtime-mock.ts b/extensions/test-utils/plugin-runtime-mock.ts index 81e3fdedeec..a9b6f7001d4 100644 --- a/extensions/test-utils/plugin-runtime-mock.ts +++ b/extensions/test-utils/plugin-runtime-mock.ts @@ -69,6 +69,19 @@ export function createPluginRuntimeMock(overrides: DeepPartial = registerMemoryCli: vi.fn() as unknown as PluginRuntime["tools"]["registerMemoryCli"], }, channel: { + bindings: { + bind: vi.fn(), + getCapabilities: vi.fn(() => ({ + adapterAvailable: true, + bindSupported: true, + unbindSupported: true, + placements: ["current", "child"] as Array<"current" | "child">, + })), + listBySession: vi.fn(() => []), + resolveByConversation: vi.fn(() => null), + touch: vi.fn(), + unbind: vi.fn(() => Promise.resolve([])), + }, text: { chunkByNewline: vi.fn((text: string) => (text ? [text] : [])), chunkMarkdownText: vi.fn((text: string) => [text]), diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 666964eb865..f10e9803854 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -25,6 +25,7 @@ const diagnosticMocks = vi.hoisted(() => ({ const hookMocks = vi.hoisted(() => ({ runner: { hasHooks: vi.fn(() => false), + runInboundClaim: vi.fn(async () => undefined), runMessageReceived: vi.fn(async () => {}), }, })); @@ -239,6 +240,8 @@ describe("dispatchReplyFromConfig", () => { diagnosticMocks.logSessionStateChange.mockClear(); hookMocks.runner.hasHooks.mockClear(); hookMocks.runner.hasHooks.mockReturnValue(false); + hookMocks.runner.runInboundClaim.mockClear(); + hookMocks.runner.runInboundClaim.mockResolvedValue(undefined); hookMocks.runner.runMessageReceived.mockClear(); internalHookMocks.createInternalHookEvent.mockClear(); internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload); @@ -1861,6 +1864,60 @@ describe("dispatchReplyFromConfig", () => { ); }); + it("lets a plugin claim inbound traffic before core commands and agent dispatch", async () => { + setNoAbort(); + hookMocks.runner.hasHooks.mockImplementation( + ((hookName?: string) => hookName === "inbound_claim") as () => boolean, + ); + hookMocks.runner.runInboundClaim.mockResolvedValue({ handled: true } as never); + const cfg = emptyConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "telegram", + Surface: "telegram", + OriginatingChannel: "telegram", + OriginatingTo: "telegram:-10099", + To: "telegram:-10099", + AccountId: "default", + SenderId: "user-9", + SenderUsername: "ada", + MessageThreadId: 77, + CommandAuthorized: true, + WasMentioned: true, + CommandBody: "who are you", + RawBody: "who are you", + Body: "who are you", + MessageSid: "msg-claim-1", + }); + const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload); + + const result = await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + + expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }); + expect(hookMocks.runner.runInboundClaim).toHaveBeenCalledWith( + expect.objectContaining({ + content: "who are you", + channel: "telegram", + accountId: "default", + conversationId: "-10099:topic:77", + parentConversationId: "-10099", + senderId: "user-9", + commandAuthorized: true, + wasMentioned: true, + }), + expect.objectContaining({ + channelId: "telegram", + accountId: "default", + conversationId: "-10099:topic:77", + parentConversationId: "-10099", + senderId: "user-9", + messageId: "msg-claim-1", + }), + ); + expect(replyResolver).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + }); + it("emits internal message:received hook when a session key is available", async () => { setNoAbort(); const cfg = emptyConfig; diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 5b679fa59e5..43d515ad7c7 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -13,6 +13,8 @@ import { fireAndForgetHook } from "../../hooks/fire-and-forget.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { deriveInboundMessageHookContext, + toPluginInboundClaimContext, + toPluginInboundClaimEvent, toInternalMessageReceivedContext, toPluginMessageContext, toPluginMessageReceivedEvent, @@ -191,6 +193,22 @@ export async function dispatchReplyFromConfig(params: { const hookContext = deriveInboundMessageHookContext(ctx, { messageId: messageIdForHook }); const { isGroup, groupId } = hookContext; + if (hookRunner?.hasHooks("inbound_claim")) { + const inboundClaim = await hookRunner.runInboundClaim( + toPluginInboundClaimEvent(hookContext, { + commandAuthorized: + typeof ctx.CommandAuthorized === "boolean" ? ctx.CommandAuthorized : undefined, + wasMentioned: typeof ctx.WasMentioned === "boolean" ? ctx.WasMentioned : undefined, + }), + toPluginInboundClaimContext(hookContext), + ); + if (inboundClaim?.handled) { + markIdle("plugin_claim"); + recordProcessed("completed", { reason: "plugin-claimed" }); + return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; + } + } + // Trigger plugin hooks (fire-and-forget) if (hookRunner?.hasHooks("message_received")) { fireAndForgetHook( diff --git a/src/hooks/message-hook-mappers.ts b/src/hooks/message-hook-mappers.ts index 1cdd12a93ac..987c20ffb0a 100644 --- a/src/hooks/message-hook-mappers.ts +++ b/src/hooks/message-hook-mappers.ts @@ -1,6 +1,8 @@ import type { FinalizedMsgContext } from "../auto-reply/templating.js"; import type { OpenClawConfig } from "../config/config.js"; import type { + PluginHookInboundClaimContext, + PluginHookInboundClaimEvent, PluginHookMessageContext, PluginHookMessageReceivedEvent, PluginHookMessageSentEvent, @@ -147,6 +149,103 @@ export function toPluginMessageContext( }; } +function stripChannelPrefix(value: string | undefined, channelId: string): string | undefined { + if (!value) { + return undefined; + } + const prefix = `${channelId}:`; + return value.startsWith(prefix) ? value.slice(prefix.length) : value; +} + +function deriveParentConversationId( + canonical: CanonicalInboundMessageHookContext, +): string | undefined { + if (canonical.channelId !== "telegram") { + return undefined; + } + if (typeof canonical.threadId !== "number" && typeof canonical.threadId !== "string") { + return undefined; + } + return stripChannelPrefix( + canonical.to ?? canonical.originatingTo ?? canonical.conversationId, + "telegram", + ); +} + +function deriveConversationId(canonical: CanonicalInboundMessageHookContext): string | undefined { + const baseConversationId = stripChannelPrefix( + canonical.to ?? canonical.originatingTo ?? canonical.conversationId, + canonical.channelId, + ); + if (canonical.channelId === "telegram" && baseConversationId) { + const threadId = + typeof canonical.threadId === "number" || typeof canonical.threadId === "string" + ? String(canonical.threadId).trim() + : ""; + if (threadId) { + return `${baseConversationId}:topic:${threadId}`; + } + } + return baseConversationId; +} + +export function toPluginInboundClaimContext( + canonical: CanonicalInboundMessageHookContext, +): PluginHookInboundClaimContext { + const conversationId = deriveConversationId(canonical); + return { + channelId: canonical.channelId, + accountId: canonical.accountId, + conversationId, + parentConversationId: deriveParentConversationId(canonical), + senderId: canonical.senderId, + messageId: canonical.messageId, + }; +} + +export function toPluginInboundClaimEvent( + canonical: CanonicalInboundMessageHookContext, + extras?: { + commandAuthorized?: boolean; + wasMentioned?: boolean; + }, +): PluginHookInboundClaimEvent { + const context = toPluginInboundClaimContext(canonical); + return { + content: canonical.content, + body: canonical.body, + bodyForAgent: canonical.bodyForAgent, + transcript: canonical.transcript, + timestamp: canonical.timestamp, + channel: canonical.channelId, + accountId: canonical.accountId, + conversationId: context.conversationId, + parentConversationId: context.parentConversationId, + senderId: canonical.senderId, + senderName: canonical.senderName, + senderUsername: canonical.senderUsername, + threadId: canonical.threadId, + messageId: canonical.messageId, + isGroup: canonical.isGroup, + commandAuthorized: extras?.commandAuthorized, + wasMentioned: extras?.wasMentioned, + metadata: { + from: canonical.from, + to: canonical.to, + provider: canonical.provider, + surface: canonical.surface, + originatingChannel: canonical.originatingChannel, + originatingTo: canonical.originatingTo, + senderE164: canonical.senderE164, + mediaPath: canonical.mediaPath, + mediaType: canonical.mediaType, + guildId: canonical.guildId, + channelName: canonical.channelName, + groupId: canonical.groupId, + }, + }; +} + export function toPluginMessageReceivedEvent( canonical: CanonicalInboundMessageHookContext, ): PluginHookMessageReceivedEvent { diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index eaae5d08968..4c0cf3c6635 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -100,10 +100,23 @@ export type { OpenClawPluginApi, OpenClawPluginService, OpenClawPluginServiceContext, + PluginHookInboundClaimContext, + PluginHookInboundClaimEvent, + PluginHookInboundClaimResult, + PluginInteractiveHandlerRegistration, + PluginInteractiveTelegramHandlerContext, PluginLogger, ProviderAuthContext, ProviderAuthResult, } from "../plugins/types.js"; +export type { + ConversationRef, + SessionBindingBindInput, + SessionBindingCapabilities, + SessionBindingRecord, + SessionBindingService, + SessionBindingUnbindInput, +} from "../infra/outbound/session-binding-service.js"; export type { GatewayRequestHandler, GatewayRequestHandlerOptions, diff --git a/src/plugins/hooks.ts b/src/plugins/hooks.ts index 4d74267d4ca..c32496f5b4a 100644 --- a/src/plugins/hooks.ts +++ b/src/plugins/hooks.ts @@ -19,6 +19,9 @@ import type { PluginHookBeforePromptBuildEvent, PluginHookBeforePromptBuildResult, PluginHookBeforeCompactionEvent, + PluginHookInboundClaimContext, + PluginHookInboundClaimEvent, + PluginHookInboundClaimResult, PluginHookLlmInputEvent, PluginHookLlmOutputEvent, PluginHookBeforeResetEvent, @@ -66,6 +69,9 @@ export type { PluginHookAgentEndEvent, PluginHookBeforeCompactionEvent, PluginHookBeforeResetEvent, + PluginHookInboundClaimContext, + PluginHookInboundClaimEvent, + PluginHookInboundClaimResult, PluginHookAfterCompactionEvent, PluginHookMessageContext, PluginHookMessageReceivedEvent, @@ -263,6 +269,37 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp return result; } + /** + * Run a sequential claim hook where the first `{ handled: true }` result wins. + */ + async function runClaimingHook( + hookName: K, + event: Parameters["handler"]>>[0], + ctx: Parameters["handler"]>>[1], + ): Promise { + const hooks = getHooksForName(registry, hookName); + if (hooks.length === 0) { + return undefined; + } + + logger?.debug?.(`[hooks] running ${hookName} (${hooks.length} handlers, first-claim wins)`); + + for (const hook of hooks) { + try { + const handlerResult = await ( + hook.handler as (event: unknown, ctx: unknown) => Promise + )(event, ctx); + if (handlerResult?.handled) { + return handlerResult; + } + } catch (err) { + handleHookError({ hookName, pluginId: hook.pluginId, error: err }); + } + } + + return undefined; + } + // ========================================================================= // Agent Hooks // ========================================================================= @@ -384,6 +421,21 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp // Message Hooks // ========================================================================= + /** + * Run inbound_claim hook. + * Allows plugins to claim an inbound event before commands/agent dispatch. + */ + async function runInboundClaim( + event: PluginHookInboundClaimEvent, + ctx: PluginHookInboundClaimContext, + ): Promise { + return runClaimingHook<"inbound_claim", PluginHookInboundClaimResult>( + "inbound_claim", + event, + ctx, + ); + } + /** * Run message_received hook. * Runs in parallel (fire-and-forget). @@ -734,6 +786,7 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp runAfterCompaction, runBeforeReset, // Message hooks + runInboundClaim, runMessageReceived, runMessageSending, runMessageSent, diff --git a/src/plugins/interactive.test.ts b/src/plugins/interactive.test.ts new file mode 100644 index 00000000000..acd225134c9 --- /dev/null +++ b/src/plugins/interactive.test.ts @@ -0,0 +1,91 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + clearPluginInteractiveHandlers, + dispatchPluginInteractiveHandler, + registerPluginInteractiveHandler, +} from "./interactive.js"; + +describe("plugin interactive handlers", () => { + beforeEach(() => { + clearPluginInteractiveHandlers(); + }); + + it("routes Telegram callbacks by namespace and dedupes callback ids", async () => { + const handler = vi.fn(async () => ({ handled: true })); + expect( + registerPluginInteractiveHandler("codex-plugin", { + channel: "telegram", + namespace: "codex", + handler, + }), + ).toEqual({ ok: true }); + + const baseParams = { + channel: "telegram" as const, + data: "codex:resume:thread-1", + callbackId: "cb-1", + ctx: { + accountId: "default", + callbackId: "cb-1", + conversationId: "-10099:topic:77", + parentConversationId: "-10099", + senderId: "user-1", + senderUsername: "ada", + threadId: 77, + isGroup: true, + isForum: true, + auth: { isAuthorizedSender: true }, + callbackMessage: { + messageId: 55, + chatId: "-10099", + messageText: "Pick a thread", + }, + }, + respond: { + reply: vi.fn(async () => {}), + editMessage: vi.fn(async () => {}), + editButtons: vi.fn(async () => {}), + clearButtons: vi.fn(async () => {}), + deleteMessage: vi.fn(async () => {}), + }, + }; + + const first = await dispatchPluginInteractiveHandler(baseParams); + const duplicate = await dispatchPluginInteractiveHandler(baseParams); + + expect(first).toEqual({ matched: true, handled: true, duplicate: false }); + expect(duplicate).toEqual({ matched: true, handled: true, duplicate: true }); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + conversationId: "-10099:topic:77", + callback: expect.objectContaining({ + namespace: "codex", + payload: "resume:thread-1", + chatId: "-10099", + messageId: 55, + }), + }), + ); + }); + + it("rejects duplicate namespace registrations", () => { + const first = registerPluginInteractiveHandler("plugin-a", { + channel: "telegram", + namespace: "codex", + handler: async () => ({ handled: true }), + }); + const second = registerPluginInteractiveHandler("plugin-b", { + channel: "telegram", + namespace: "codex", + handler: async () => ({ handled: true }), + }); + + expect(first).toEqual({ ok: true }); + expect(second).toEqual({ + ok: false, + error: 'Interactive handler namespace "codex" already registered by plugin "plugin-a"', + }); + }); +}); diff --git a/src/plugins/interactive.ts b/src/plugins/interactive.ts new file mode 100644 index 00000000000..3fe7f8f1ef5 --- /dev/null +++ b/src/plugins/interactive.ts @@ -0,0 +1,156 @@ +import { createDedupeCache } from "../infra/dedupe.js"; +import type { + PluginInteractiveButtons, + PluginInteractiveHandlerRegistration, + PluginInteractiveTelegramHandlerContext, +} from "./types.js"; + +type RegisteredInteractiveHandler = PluginInteractiveHandlerRegistration & { + pluginId: string; +}; + +type InteractiveRegistrationResult = { + ok: boolean; + error?: string; +}; + +type InteractiveDispatchResult = + | { matched: false; handled: false; duplicate: false } + | { matched: true; handled: boolean; duplicate: boolean }; + +const interactiveHandlers = new Map(); +const callbackDedupe = createDedupeCache({ + ttlMs: 5 * 60_000, + maxSize: 4096, +}); + +function toRegistryKey(channel: string, namespace: string): string { + return `${channel.trim().toLowerCase()}:${namespace.trim()}`; +} + +function normalizeNamespace(namespace: string): string { + return namespace.trim(); +} + +function validateNamespace(namespace: string): string | null { + if (!namespace.trim()) { + return "Interactive handler namespace cannot be empty"; + } + if (!/^[A-Za-z0-9._-]+$/.test(namespace.trim())) { + return "Interactive handler namespace must contain only letters, numbers, dots, underscores, and hyphens"; + } + return null; +} + +function resolveNamespaceMatch( + channel: string, + data: string, +): { registration: RegisteredInteractiveHandler; namespace: string; payload: string } | null { + const trimmedData = data.trim(); + if (!trimmedData) { + return null; + } + + const separatorIndex = trimmedData.indexOf(":"); + const namespace = + separatorIndex >= 0 ? trimmedData.slice(0, separatorIndex) : normalizeNamespace(trimmedData); + const registration = interactiveHandlers.get(toRegistryKey(channel, namespace)); + if (!registration) { + return null; + } + + return { + registration, + namespace, + payload: separatorIndex >= 0 ? trimmedData.slice(separatorIndex + 1) : "", + }; +} + +export function registerPluginInteractiveHandler( + pluginId: string, + registration: PluginInteractiveHandlerRegistration, +): InteractiveRegistrationResult { + const namespace = normalizeNamespace(registration.namespace); + const validationError = validateNamespace(namespace); + if (validationError) { + return { ok: false, error: validationError }; + } + const key = toRegistryKey(registration.channel, namespace); + const existing = interactiveHandlers.get(key); + if (existing) { + return { + ok: false, + error: `Interactive handler namespace "${namespace}" already registered by plugin "${existing.pluginId}"`, + }; + } + interactiveHandlers.set(key, { + ...registration, + namespace, + channel: registration.channel, + pluginId, + }); + return { ok: true }; +} + +export function clearPluginInteractiveHandlers(): void { + interactiveHandlers.clear(); + callbackDedupe.clear(); +} + +export function clearPluginInteractiveHandlersForPlugin(pluginId: string): void { + for (const [key, value] of interactiveHandlers.entries()) { + if (value.pluginId === pluginId) { + interactiveHandlers.delete(key); + } + } +} + +export async function dispatchPluginInteractiveHandler(params: { + channel: "telegram"; + data: string; + callbackId: string; + ctx: Omit & { + callbackMessage: { + messageId: number; + chatId: string; + messageText?: string; + }; + }; + respond: { + reply: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise; + editMessage: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise; + editButtons: (params: { buttons: PluginInteractiveButtons }) => Promise; + clearButtons: () => Promise; + deleteMessage: () => Promise; + }; +}): Promise { + const match = resolveNamespaceMatch(params.channel, params.data); + if (!match) { + return { matched: false, handled: false, duplicate: false }; + } + + if (callbackDedupe.check(params.callbackId)) { + return { matched: true, handled: true, duplicate: true }; + } + + const { callbackMessage, ...handlerContext } = params.ctx; + const result = await match.registration.handler({ + ...handlerContext, + channel: "telegram", + callback: { + data: params.data, + namespace: match.namespace, + payload: match.payload, + messageId: callbackMessage.messageId, + chatId: callbackMessage.chatId, + messageText: callbackMessage.messageText, + }, + respond: params.respond, + }); + + return { + matched: true, + handled: result?.handled ?? true, + duplicate: false, + }; +} diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index 20d5772d3f7..c3bb84aabac 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -19,6 +19,7 @@ import { } from "./config-state.js"; import { discoverOpenClawPlugins } from "./discovery.js"; import { initializeGlobalHookRunner } from "./hook-runner-global.js"; +import { clearPluginInteractiveHandlers } from "./interactive.js"; import { loadPluginManifestRegistry } from "./manifest-registry.js"; import { isPathInside, safeStatSync } from "./path-safety.js"; import { createPluginRegistry, type PluginRecord, type PluginRegistry } from "./registry.js"; @@ -653,6 +654,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi // Clear previously registered plugin commands before reloading clearPluginCommands(); + clearPluginInteractiveHandlers(); // Lazily initialize the runtime so startup paths that discover/skip plugins do // not eagerly load every channel runtime dependency. diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index fe978d6a346..d07970cc74d 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -14,6 +14,7 @@ import { registerPluginCommand } from "./commands.js"; import { normalizePluginHttpPath } from "./http-path.js"; import { findOverlappingPluginHttpRoute } from "./http-route-overlap.js"; import { normalizeRegisteredProvider } from "./provider-validation.js"; +import { registerPluginInteractiveHandler } from "./interactive.js"; import type { PluginRuntime } from "./runtime/types.js"; import { defaultSlotIdForKey } from "./slots.js"; import { @@ -653,6 +654,17 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { registerGatewayMethod: (method, handler) => registerGatewayMethod(record, method, handler), registerCli: (registrar, opts) => registerCli(record, registrar, opts), registerService: (service) => registerService(record, service), + registerInteractiveHandler: (registration) => { + const result = registerPluginInteractiveHandler(record.id, registration); + if (!result.ok) { + pushDiagnostic({ + level: "warn", + pluginId: record.id, + source: record.source, + message: result.error ?? "interactive handler registration failed", + }); + } + }, registerCommand: (command) => registerCommand(record, command), registerContextEngine: (id, factory) => { if (id === defaultSlotIdForKey("contextEngine")) { diff --git a/src/plugins/runtime/index.test.ts b/src/plugins/runtime/index.test.ts index 5ec2df28199..29ecbdfec5b 100644 --- a/src/plugins/runtime/index.test.ts +++ b/src/plugins/runtime/index.test.ts @@ -1,6 +1,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { onAgentEvent } from "../../infra/agent-events.js"; import { requestHeartbeatNow } from "../../infra/heartbeat-wake.js"; +import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js"; import { onSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; const runCommandWithTimeoutMock = vi.hoisted(() => vi.fn()); @@ -49,6 +50,11 @@ describe("plugin runtime command execution", () => { expect(runtime.events.onSessionTranscriptUpdate).toBe(onSessionTranscriptUpdate); }); + it("exposes runtime.channel.bindings", () => { + const runtime = createPluginRuntime(); + expect(runtime.channel.bindings).toBe(getSessionBindingService()); + }); + it("exposes runtime.system.requestHeartbeatNow", () => { const runtime = createPluginRuntime(); expect(runtime.system.requestHeartbeatNow).toBe(requestHeartbeatNow); diff --git a/src/plugins/runtime/runtime-channel.ts b/src/plugins/runtime/runtime-channel.ts index 53a8f0ca936..dd1e16e3302 100644 --- a/src/plugins/runtime/runtime-channel.ts +++ b/src/plugins/runtime/runtime-channel.ts @@ -85,6 +85,7 @@ import { updateLastRoute, } from "../../config/sessions.js"; import { getChannelActivity, recordChannelActivity } from "../../infra/channel-activity.js"; +import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js"; import { listLineAccountIds, normalizeAccountId as normalizeLineAccountId, @@ -118,6 +119,7 @@ import type { PluginRuntime } from "./types.js"; export function createRuntimeChannel(): PluginRuntime["channel"] { return { + bindings: getSessionBindingService(), text: { chunkByNewline, chunkMarkdownText, @@ -230,6 +232,33 @@ export function createRuntimeChannel(): PluginRuntime["channel"] { sendPollTelegram, monitorTelegramProvider, messageActions: telegramMessageActions, + typing: { + pulse: sendTypingTelegram, + start: async ({ to, accountId, cfg, intervalMs, messageThreadId }) => + await createTelegramTypingLease({ + to, + accountId, + cfg, + intervalMs, + messageThreadId, + pulse: async ({ to, accountId, cfg, messageThreadId }) => + await sendTypingTelegram(to, { + accountId, + cfg, + messageThreadId, + }), + }), + }, + conversationActions: { + editMessage: editMessageTelegram, + editReplyMarkup: editMessageReplyMarkupTelegram, + clearReplyMarkup: async (chatIdInput, messageIdInput, opts = {}) => + await editMessageReplyMarkupTelegram(chatIdInput, messageIdInput, [], opts), + deleteMessage: deleteMessageTelegram, + renameTopic: renameForumTopicTelegram, + pinMessage: pinMessageTelegram, + unpinMessage: unpinMessageTelegram, + }, }, signal: { probeSignal, diff --git a/src/plugins/runtime/runtime-telegram-typing.test.ts b/src/plugins/runtime/runtime-telegram-typing.test.ts new file mode 100644 index 00000000000..a472a47096b --- /dev/null +++ b/src/plugins/runtime/runtime-telegram-typing.test.ts @@ -0,0 +1,38 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createTelegramTypingLease } from "./runtime-telegram-typing.js"; + +describe("createTelegramTypingLease", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("pulses immediately and keeps leases independent", async () => { + vi.useFakeTimers(); + const pulse = vi.fn(async () => undefined); + + const leaseA = await createTelegramTypingLease({ + to: "telegram:123", + intervalMs: 2_000, + pulse, + }); + const leaseB = await createTelegramTypingLease({ + to: "telegram:123", + intervalMs: 2_000, + pulse, + }); + + expect(pulse).toHaveBeenCalledTimes(2); + + await vi.advanceTimersByTimeAsync(2_000); + expect(pulse).toHaveBeenCalledTimes(4); + + leaseA.stop(); + await vi.advanceTimersByTimeAsync(2_000); + expect(pulse).toHaveBeenCalledTimes(5); + + await leaseB.refresh(); + expect(pulse).toHaveBeenCalledTimes(6); + + leaseB.stop(); + }); +}); diff --git a/src/plugins/runtime/runtime-telegram-typing.ts b/src/plugins/runtime/runtime-telegram-typing.ts new file mode 100644 index 00000000000..c9434316840 --- /dev/null +++ b/src/plugins/runtime/runtime-telegram-typing.ts @@ -0,0 +1,52 @@ +import type { OpenClawConfig } from "../../config/config.js"; + +export type CreateTelegramTypingLeaseParams = { + to: string; + accountId?: string; + cfg?: OpenClawConfig; + intervalMs?: number; + messageThreadId?: number; + pulse: (params: { + to: string; + accountId?: string; + cfg?: OpenClawConfig; + messageThreadId?: number; + }) => Promise; +}; + +export async function createTelegramTypingLease(params: CreateTelegramTypingLeaseParams): Promise<{ + refresh: () => Promise; + stop: () => void; +}> { + const intervalMs = Math.max(1000, Math.floor(params.intervalMs ?? 4_000)); + let stopped = false; + + const refresh = async () => { + if (stopped) { + return; + } + await params.pulse({ + to: params.to, + accountId: params.accountId, + cfg: params.cfg, + messageThreadId: params.messageThreadId, + }); + }; + + await refresh(); + + const timer = setInterval(() => { + void refresh(); + }, intervalMs); + + return { + refresh, + stop: () => { + if (stopped) { + return; + } + stopped = true; + clearInterval(timer); + }, + }; +} diff --git a/src/plugins/runtime/types-channel.ts b/src/plugins/runtime/types-channel.ts index bf2f2387d46..bc4add449a4 100644 --- a/src/plugins/runtime/types-channel.ts +++ b/src/plugins/runtime/types-channel.ts @@ -2,6 +2,8 @@ type ReadChannelAllowFromStore = typeof import("../../pairing/pairing-store.js").readChannelAllowFromStore; type UpsertChannelPairingRequest = typeof import("../../pairing/pairing-store.js").upsertChannelPairingRequest; +type SessionBindingService = + typeof import("../../infra/outbound/session-binding-service.js").getSessionBindingService; type ReadChannelAllowFromStoreForAccount = (params: { channel: Parameters[0]; @@ -14,6 +16,7 @@ type UpsertChannelPairingRequestForAccount = ( ) => ReturnType; export type PluginRuntimeChannel = { + bindings: ReturnType; text: { chunkByNewline: typeof import("../../auto-reply/chunk.js").chunkByNewline; chunkMarkdownText: typeof import("../../auto-reply/chunk.js").chunkMarkdownText; @@ -117,6 +120,39 @@ export type PluginRuntimeChannel = { sendPollTelegram: typeof import("../../../extensions/telegram/src/send.js").sendPollTelegram; monitorTelegramProvider: typeof import("../../../extensions/telegram/src/monitor.js").monitorTelegramProvider; messageActions: typeof import("../../channels/plugins/actions/telegram.js").telegramMessageActions; + typing: { + pulse: typeof import("../../telegram/send.js").sendTypingTelegram; + start: (params: { + to: string; + accountId?: string; + cfg?: ReturnType; + intervalMs?: number; + messageThreadId?: number; + }) => Promise<{ + refresh: () => Promise; + stop: () => void; + }>; + }; + conversationActions: { + editMessage: typeof import("../../telegram/send.js").editMessageTelegram; + editReplyMarkup: typeof import("../../telegram/send.js").editMessageReplyMarkupTelegram; + clearReplyMarkup: ( + chatIdInput: string | number, + messageIdInput: string | number, + opts?: { + token?: string; + accountId?: string; + verbose?: boolean; + api?: Partial; + retry?: import("../../infra/retry.js").RetryConfig; + cfg?: ReturnType; + }, + ) => Promise<{ ok: true; messageId: string; chatId: string }>; + deleteMessage: typeof import("../../telegram/send.js").deleteMessageTelegram; + renameTopic: typeof import("../../telegram/send.js").renameForumTopicTelegram; + pinMessage: typeof import("../../telegram/send.js").pinMessageTelegram; + unpinMessage: typeof import("../../telegram/send.js").unpinMessageTelegram; + }; }; signal: { probeSignal: typeof import("../../../extensions/signal/src/probe.js").probeSignal; diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 40e3de13529..1fde7d8846f 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -305,6 +305,55 @@ export type OpenClawPluginCommandDefinition = { handler: PluginCommandHandler; }; +export type PluginInteractiveChannel = "telegram"; + +export type PluginInteractiveButtons = Array< + Array<{ text: string; callback_data: string; style?: "danger" | "success" | "primary" }> +>; + +export type PluginInteractiveTelegramHandlerResult = { + handled?: boolean; +} | void; + +export type PluginInteractiveTelegramHandlerContext = { + channel: "telegram"; + accountId: string; + callbackId: string; + conversationId: string; + parentConversationId?: string; + senderId?: string; + senderUsername?: string; + threadId?: number; + isGroup: boolean; + isForum: boolean; + auth: { + isAuthorizedSender: boolean; + }; + callback: { + data: string; + namespace: string; + payload: string; + messageId: number; + chatId: string; + messageText?: string; + }; + respond: { + reply: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise; + editMessage: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise; + editButtons: (params: { buttons: PluginInteractiveButtons }) => Promise; + clearButtons: () => Promise; + deleteMessage: () => Promise; + }; +}; + +export type PluginInteractiveHandlerRegistration = { + channel: PluginInteractiveChannel; + namespace: string; + handler: ( + ctx: PluginInteractiveTelegramHandlerContext, + ) => Promise | PluginInteractiveTelegramHandlerResult; +}; + export type OpenClawPluginHttpRouteAuth = "gateway" | "plugin"; export type OpenClawPluginHttpRouteMatch = "exact" | "prefix"; @@ -388,6 +437,7 @@ export type OpenClawPluginApi = { registerCli: (registrar: OpenClawPluginCliRegistrar, opts?: { commands?: string[] }) => void; registerService: (service: OpenClawPluginService) => void; registerProvider: (provider: ProviderPlugin) => void; + registerInteractiveHandler: (registration: PluginInteractiveHandlerRegistration) => void; /** * Register a custom command that bypasses the LLM agent. * Plugin commands are processed before built-in commands and before agent invocation. @@ -431,6 +481,7 @@ export type PluginHookName = | "before_compaction" | "after_compaction" | "before_reset" + | "inbound_claim" | "message_received" | "message_sending" | "message_sent" @@ -457,6 +508,7 @@ export const PLUGIN_HOOK_NAMES = [ "before_compaction", "after_compaction", "before_reset", + "inbound_claim", "message_received", "message_sending", "message_sent", @@ -665,6 +717,37 @@ export type PluginHookMessageContext = { conversationId?: string; }; +export type PluginHookInboundClaimContext = PluginHookMessageContext & { + parentConversationId?: string; + senderId?: string; + messageId?: string; +}; + +export type PluginHookInboundClaimEvent = { + content: string; + body?: string; + bodyForAgent?: string; + transcript?: string; + timestamp?: number; + channel: string; + accountId?: string; + conversationId?: string; + parentConversationId?: string; + senderId?: string; + senderName?: string; + senderUsername?: string; + threadId?: string | number; + messageId?: string; + isGroup: boolean; + commandAuthorized?: boolean; + wasMentioned?: boolean; + metadata?: Record; +}; + +export type PluginHookInboundClaimResult = { + handled: boolean; +}; + // message_received hook export type PluginHookMessageReceivedEvent = { from: string; @@ -921,6 +1004,10 @@ export type PluginHookHandlerMap = { event: PluginHookBeforeResetEvent, ctx: PluginHookAgentContext, ) => Promise | void; + inbound_claim: ( + event: PluginHookInboundClaimEvent, + ctx: PluginHookInboundClaimContext, + ) => Promise | PluginHookInboundClaimResult | void; message_received: ( event: PluginHookMessageReceivedEvent, ctx: PluginHookMessageContext, diff --git a/src/plugins/wired-hooks-inbound-claim.test.ts b/src/plugins/wired-hooks-inbound-claim.test.ts new file mode 100644 index 00000000000..2c6f95fff47 --- /dev/null +++ b/src/plugins/wired-hooks-inbound-claim.test.ts @@ -0,0 +1,69 @@ +import { describe, expect, it, vi } from "vitest"; +import { createHookRunner } from "./hooks.js"; +import { createMockPluginRegistry } from "./hooks.test-helpers.js"; + +describe("inbound_claim hook runner", () => { + it("stops at the first handler that claims the event", async () => { + const first = vi.fn().mockResolvedValue({ handled: true }); + const second = vi.fn().mockResolvedValue({ handled: true }); + const registry = createMockPluginRegistry([ + { hookName: "inbound_claim", handler: first }, + { hookName: "inbound_claim", handler: second }, + ]); + const runner = createHookRunner(registry); + + const result = await runner.runInboundClaim( + { + content: "who are you", + channel: "telegram", + accountId: "default", + conversationId: "123:topic:77", + isGroup: true, + }, + { + channelId: "telegram", + accountId: "default", + conversationId: "123:topic:77", + }, + ); + + expect(result).toEqual({ handled: true }); + expect(first).toHaveBeenCalledTimes(1); + expect(second).not.toHaveBeenCalled(); + }); + + it("continues to the next handler when a higher-priority handler throws", async () => { + const logger = { + warn: vi.fn(), + error: vi.fn(), + }; + const failing = vi.fn().mockRejectedValue(new Error("boom")); + const succeeding = vi.fn().mockResolvedValue({ handled: true }); + const registry = createMockPluginRegistry([ + { hookName: "inbound_claim", handler: failing }, + { hookName: "inbound_claim", handler: succeeding }, + ]); + const runner = createHookRunner(registry, { logger }); + + const result = await runner.runInboundClaim( + { + content: "hi", + channel: "telegram", + accountId: "default", + conversationId: "123", + isGroup: false, + }, + { + channelId: "telegram", + accountId: "default", + conversationId: "123", + }, + ); + + expect(result).toEqual({ handled: true }); + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining("inbound_claim handler from test-plugin failed: Error: boom"), + ); + expect(succeeding).toHaveBeenCalledTimes(1); + }); +});