Plugins: add inbound claim and Telegram interaction seams

This commit is contained in:
huntharo
2026-03-12 08:08:30 -04:00
committed by Vincent Koc
parent 14137bef22
commit 9c79c2c2a7
23 changed files with 1109 additions and 0 deletions

View File

@@ -43,6 +43,7 @@ function fakeApi(overrides: Partial<OpenClawPluginApi> = {}): OpenClawPluginApi
registerCli() {},
registerService() {},
registerProvider() {},
registerInteractiveHandler() {},
registerHook() {},
registerHttpRoute() {},
registerCommand() {},

View File

@@ -36,6 +36,7 @@ import { readChannelAllowFromStore } from "../../../src/pairing/pairing-store.js
import { resolveAgentRoute } from "../../../src/routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../../../src/routing/session-key.js";
import { applyModelOverrideToSessionEntry } from "../../../src/sessions/model-overrides.js";
import { dispatchPluginInteractiveHandler } from "../../../src/plugins/interactive.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import {
isSenderAllowed,
@@ -1121,6 +1122,24 @@ export const registerTelegramHandlers = ({
}
return await editCallbackMessage(messageText, replyMarkup);
};
const editCallbackButtons = async (
buttons: Array<
Array<{ text: string; callback_data: string; style?: "danger" | "success" | "primary" }>
>,
) => {
const keyboard = buildInlineKeyboard(buttons) ?? { inline_keyboard: [] };
const replyMarkup = { reply_markup: keyboard };
const editReplyMarkupFn = (ctx as { editMessageReplyMarkup?: unknown })
.editMessageReplyMarkup;
if (typeof editReplyMarkupFn === "function") {
return await ctx.editMessageReplyMarkup(replyMarkup);
}
return await bot.api.editMessageReplyMarkup(
callbackMessage.chat.id,
callbackMessage.message_id,
replyMarkup,
);
};
const deleteCallbackMessage = async () => {
const deleteFn = (ctx as { deleteMessage?: unknown }).deleteMessage;
if (typeof deleteFn === "function") {
@@ -1201,6 +1220,59 @@ export const registerTelegramHandlers = ({
return;
}
const callbackConversationId =
messageThreadId != null ? `${chatId}:topic:${messageThreadId}` : String(chatId);
const pluginCallback = await dispatchPluginInteractiveHandler({
channel: "telegram",
data,
callbackId: callback.id,
ctx: {
accountId,
callbackId: callback.id,
conversationId: callbackConversationId,
parentConversationId: messageThreadId != null ? String(chatId) : undefined,
senderId: senderId || undefined,
senderUsername: senderUsername || undefined,
threadId: messageThreadId,
isGroup,
isForum,
auth: {
isAuthorizedSender: true,
},
callbackMessage: {
messageId: callbackMessage.message_id,
chatId: String(chatId),
messageText: callbackMessage.text ?? callbackMessage.caption,
},
},
respond: {
reply: async ({ text, buttons }) => {
await replyToCallbackChat(
text,
buttons ? { reply_markup: buildInlineKeyboard(buttons) } : undefined,
);
},
editMessage: async ({ text, buttons }) => {
await editCallbackMessage(
text,
buttons ? { reply_markup: buildInlineKeyboard(buttons) } : undefined,
);
},
editButtons: async ({ buttons }) => {
await editCallbackButtons(buttons);
},
clearButtons: async () => {
await clearCallbackButtons();
},
deleteMessage: async () => {
await deleteCallbackMessage();
},
},
});
if (pluginCallback.handled) {
return;
}
if (isApprovalCallback) {
if (
!isTelegramExecApprovalClientEnabled({ cfg, accountId }) ||

View File

@@ -2,6 +2,10 @@ import { rm } from "node:fs/promises";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { escapeRegExp, formatEnvelopeTimestamp } from "../../../test/helpers/envelope-timestamp.js";
import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js";
import {
clearPluginInteractiveHandlers,
registerPluginInteractiveHandler,
} from "../plugins/interactive.js";
import {
answerCallbackQuerySpy,
commandSpy,
@@ -49,6 +53,7 @@ describe("createTelegramBot", () => {
beforeEach(() => {
setMyCommandsSpy.mockClear();
clearPluginInteractiveHandlers();
loadConfig.mockReturnValue({
agents: {
defaults: {
@@ -1359,6 +1364,57 @@ describe("createTelegramBot", () => {
expect(replySpy).not.toHaveBeenCalled();
});
it("routes plugin-owned callback namespaces before synthetic command fallback", async () => {
onSpy.mockClear();
replySpy.mockClear();
editMessageTextSpy.mockClear();
sendMessageSpy.mockClear();
registerPluginInteractiveHandler("codex-plugin", {
channel: "telegram",
namespace: "codex",
handler: async ({ respond, callback }) => {
await respond.editMessage({
text: `Handled ${callback.payload}`,
});
return { handled: true };
},
});
createTelegramBot({
token: "tok",
config: {
channels: {
telegram: {
dmPolicy: "open",
allowFrom: ["*"],
},
},
},
});
const callbackHandler = onSpy.mock.calls.find((call) => call[0] === "callback_query")?.[1] as (
ctx: Record<string, unknown>,
) => Promise<void>;
await callbackHandler({
callbackQuery: {
id: "cbq-codex-1",
data: "codex:resume:thread-1",
from: { id: 9, first_name: "Ada", username: "ada_bot" },
message: {
chat: { id: 1234, type: "private" },
date: 1736380800,
message_id: 11,
text: "Select a thread",
},
},
me: { username: "openclaw_bot" },
getFile: async () => ({ download: async () => new Uint8Array() }),
});
expect(editMessageTextSpy).toHaveBeenCalledWith(1234, 11, "Handled resume:thread-1", undefined);
expect(replySpy).not.toHaveBeenCalled();
});
it("sets command target session key for dm topic commands", async () => {
onSpy.mockClear();
sendMessageSpy.mockClear();

View File

@@ -4,7 +4,10 @@ import type { MockFn } from "../../../src/test-utils/vitest-mock-fn.js";
const { botApi, botCtorSpy } = vi.hoisted(() => ({
botApi: {
deleteMessage: vi.fn(),
editForumTopic: vi.fn(),
editMessageText: vi.fn(),
editMessageReplyMarkup: vi.fn(),
pinChatMessage: vi.fn(),
sendChatAction: vi.fn(),
sendMessage: vi.fn(),
sendPoll: vi.fn(),
@@ -16,6 +19,7 @@ const { botApi, botCtorSpy } = vi.hoisted(() => ({
sendAnimation: vi.fn(),
setMessageReaction: vi.fn(),
sendSticker: vi.fn(),
unpinChatMessage: vi.fn(),
},
botCtorSpy: vi.fn(),
}));

View File

@@ -16,11 +16,14 @@ const {
buildInlineKeyboard,
createForumTopicTelegram,
editMessageTelegram,
pinMessageTelegram,
reactMessageTelegram,
renameForumTopicTelegram,
sendMessageTelegram,
sendTypingTelegram,
sendPollTelegram,
sendStickerTelegram,
unpinMessageTelegram,
} = await importTelegramSendModule();
async function expectChatNotFoundWithChatId(
@@ -215,6 +218,45 @@ describe("sendMessageTelegram", () => {
});
});
it("pins and unpins Telegram messages", async () => {
loadConfig.mockReturnValue({
channels: {
telegram: {
botToken: "tok",
},
},
});
botApi.pinChatMessage.mockResolvedValue(true);
botApi.unpinChatMessage.mockResolvedValue(true);
await pinMessageTelegram("-1001234567890", 101, { accountId: "default" });
await unpinMessageTelegram("-1001234567890", 101, { accountId: "default" });
expect(botApi.pinChatMessage).toHaveBeenCalledWith("-1001234567890", 101, {
disable_notification: true,
});
expect(botApi.unpinChatMessage).toHaveBeenCalledWith("-1001234567890", 101);
});
it("renames a Telegram forum topic", async () => {
loadConfig.mockReturnValue({
channels: {
telegram: {
botToken: "tok",
},
},
});
botApi.editForumTopic.mockResolvedValue(true);
await renameForumTopicTelegram("-1001234567890", 271, "Codex Thread", {
accountId: "default",
});
expect(botApi.editForumTopic).toHaveBeenCalledWith("-1001234567890", 271, {
name: "Codex Thread",
});
});
it("applies timeoutSeconds config precedence", async () => {
const cases = [
{

View File

@@ -1067,6 +1067,109 @@ export async function deleteMessageTelegram(
return { ok: true };
}
export async function pinMessageTelegram(
chatIdInput: string | number,
messageIdInput: string | number,
opts: TelegramDeleteOpts = {},
): Promise<{ ok: true; messageId: string; chatId: string }> {
const { cfg, account, api } = resolveTelegramApiContext(opts);
const rawTarget = String(chatIdInput);
const chatId = await resolveAndPersistChatId({
cfg,
api,
lookupTarget: rawTarget,
persistTarget: rawTarget,
verbose: opts.verbose,
});
const messageId = normalizeMessageId(messageIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
cfg,
account,
retry: opts.retry,
verbose: opts.verbose,
});
await requestWithDiag(
() => api.pinChatMessage(chatId, messageId, { disable_notification: true }),
"pinChatMessage",
);
logVerbose(`[telegram] Pinned message ${messageId} in chat ${chatId}`);
return { ok: true, messageId: String(messageId), chatId };
}
export async function unpinMessageTelegram(
chatIdInput: string | number,
messageIdInput?: string | number,
opts: TelegramDeleteOpts = {},
): Promise<{ ok: true; chatId: string; messageId?: string }> {
const { cfg, account, api } = resolveTelegramApiContext(opts);
const rawTarget = String(chatIdInput);
const chatId = await resolveAndPersistChatId({
cfg,
api,
lookupTarget: rawTarget,
persistTarget: rawTarget,
verbose: opts.verbose,
});
const messageId = messageIdInput === undefined ? undefined : normalizeMessageId(messageIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
cfg,
account,
retry: opts.retry,
verbose: opts.verbose,
});
await requestWithDiag(() => api.unpinChatMessage(chatId, messageId), "unpinChatMessage");
logVerbose(
`[telegram] Unpinned ${messageId != null ? `message ${messageId}` : "active message"} in chat ${chatId}`,
);
return {
ok: true,
chatId,
...(messageId != null ? { messageId: String(messageId) } : {}),
};
}
export async function renameForumTopicTelegram(
chatIdInput: string | number,
messageThreadIdInput: string | number,
name: string,
opts: TelegramDeleteOpts = {},
): Promise<{ ok: true; chatId: string; messageThreadId: number; name: string }> {
const trimmedName = name.trim();
if (!trimmedName) {
throw new Error("Telegram forum topic name is required");
}
if (trimmedName.length > 128) {
throw new Error("Telegram forum topic name must be 128 characters or fewer");
}
const { cfg, account, api } = resolveTelegramApiContext(opts);
const rawTarget = String(chatIdInput);
const chatId = await resolveAndPersistChatId({
cfg,
api,
lookupTarget: rawTarget,
persistTarget: rawTarget,
verbose: opts.verbose,
});
const messageThreadId = normalizeMessageId(messageThreadIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
cfg,
account,
retry: opts.retry,
verbose: opts.verbose,
});
await requestWithDiag(
() => api.editForumTopic(chatId, messageThreadId, { name: trimmedName }),
"editForumTopic",
);
logVerbose(`[telegram] Renamed forum topic ${messageThreadId} in chat ${chatId}`);
return {
ok: true,
chatId,
messageThreadId,
name: trimmedName,
};
}
type TelegramEditOpts = {
token?: string;
accountId?: string;

View File

@@ -69,6 +69,19 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
registerMemoryCli: vi.fn() as unknown as PluginRuntime["tools"]["registerMemoryCli"],
},
channel: {
bindings: {
bind: vi.fn(),
getCapabilities: vi.fn(() => ({
adapterAvailable: true,
bindSupported: true,
unbindSupported: true,
placements: ["current", "child"] as Array<"current" | "child">,
})),
listBySession: vi.fn(() => []),
resolveByConversation: vi.fn(() => null),
touch: vi.fn(),
unbind: vi.fn(() => Promise.resolve([])),
},
text: {
chunkByNewline: vi.fn((text: string) => (text ? [text] : [])),
chunkMarkdownText: vi.fn((text: string) => [text]),

View File

@@ -25,6 +25,7 @@ const diagnosticMocks = vi.hoisted(() => ({
const hookMocks = vi.hoisted(() => ({
runner: {
hasHooks: vi.fn(() => false),
runInboundClaim: vi.fn(async () => undefined),
runMessageReceived: vi.fn(async () => {}),
},
}));
@@ -239,6 +240,8 @@ describe("dispatchReplyFromConfig", () => {
diagnosticMocks.logSessionStateChange.mockClear();
hookMocks.runner.hasHooks.mockClear();
hookMocks.runner.hasHooks.mockReturnValue(false);
hookMocks.runner.runInboundClaim.mockClear();
hookMocks.runner.runInboundClaim.mockResolvedValue(undefined);
hookMocks.runner.runMessageReceived.mockClear();
internalHookMocks.createInternalHookEvent.mockClear();
internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload);
@@ -1861,6 +1864,60 @@ describe("dispatchReplyFromConfig", () => {
);
});
it("lets a plugin claim inbound traffic before core commands and agent dispatch", async () => {
setNoAbort();
hookMocks.runner.hasHooks.mockImplementation(
((hookName?: string) => hookName === "inbound_claim") as () => boolean,
);
hookMocks.runner.runInboundClaim.mockResolvedValue({ handled: true } as never);
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "telegram",
Surface: "telegram",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:-10099",
To: "telegram:-10099",
AccountId: "default",
SenderId: "user-9",
SenderUsername: "ada",
MessageThreadId: 77,
CommandAuthorized: true,
WasMentioned: true,
CommandBody: "who are you",
RawBody: "who are you",
Body: "who are you",
MessageSid: "msg-claim-1",
});
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
const result = await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } });
expect(hookMocks.runner.runInboundClaim).toHaveBeenCalledWith(
expect.objectContaining({
content: "who are you",
channel: "telegram",
accountId: "default",
conversationId: "-10099:topic:77",
parentConversationId: "-10099",
senderId: "user-9",
commandAuthorized: true,
wasMentioned: true,
}),
expect.objectContaining({
channelId: "telegram",
accountId: "default",
conversationId: "-10099:topic:77",
parentConversationId: "-10099",
senderId: "user-9",
messageId: "msg-claim-1",
}),
);
expect(replyResolver).not.toHaveBeenCalled();
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("emits internal message:received hook when a session key is available", async () => {
setNoAbort();
const cfg = emptyConfig;

View File

@@ -13,6 +13,8 @@ import { fireAndForgetHook } from "../../hooks/fire-and-forget.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import {
deriveInboundMessageHookContext,
toPluginInboundClaimContext,
toPluginInboundClaimEvent,
toInternalMessageReceivedContext,
toPluginMessageContext,
toPluginMessageReceivedEvent,
@@ -191,6 +193,22 @@ export async function dispatchReplyFromConfig(params: {
const hookContext = deriveInboundMessageHookContext(ctx, { messageId: messageIdForHook });
const { isGroup, groupId } = hookContext;
if (hookRunner?.hasHooks("inbound_claim")) {
const inboundClaim = await hookRunner.runInboundClaim(
toPluginInboundClaimEvent(hookContext, {
commandAuthorized:
typeof ctx.CommandAuthorized === "boolean" ? ctx.CommandAuthorized : undefined,
wasMentioned: typeof ctx.WasMentioned === "boolean" ? ctx.WasMentioned : undefined,
}),
toPluginInboundClaimContext(hookContext),
);
if (inboundClaim?.handled) {
markIdle("plugin_claim");
recordProcessed("completed", { reason: "plugin-claimed" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
}
// Trigger plugin hooks (fire-and-forget)
if (hookRunner?.hasHooks("message_received")) {
fireAndForgetHook(

View File

@@ -1,6 +1,8 @@
import type { FinalizedMsgContext } from "../auto-reply/templating.js";
import type { OpenClawConfig } from "../config/config.js";
import type {
PluginHookInboundClaimContext,
PluginHookInboundClaimEvent,
PluginHookMessageContext,
PluginHookMessageReceivedEvent,
PluginHookMessageSentEvent,
@@ -147,6 +149,103 @@ export function toPluginMessageContext(
};
}
function stripChannelPrefix(value: string | undefined, channelId: string): string | undefined {
if (!value) {
return undefined;
}
const prefix = `${channelId}:`;
return value.startsWith(prefix) ? value.slice(prefix.length) : value;
}
function deriveParentConversationId(
canonical: CanonicalInboundMessageHookContext,
): string | undefined {
if (canonical.channelId !== "telegram") {
return undefined;
}
if (typeof canonical.threadId !== "number" && typeof canonical.threadId !== "string") {
return undefined;
}
return stripChannelPrefix(
canonical.to ?? canonical.originatingTo ?? canonical.conversationId,
"telegram",
);
}
function deriveConversationId(canonical: CanonicalInboundMessageHookContext): string | undefined {
const baseConversationId = stripChannelPrefix(
canonical.to ?? canonical.originatingTo ?? canonical.conversationId,
canonical.channelId,
);
if (canonical.channelId === "telegram" && baseConversationId) {
const threadId =
typeof canonical.threadId === "number" || typeof canonical.threadId === "string"
? String(canonical.threadId).trim()
: "";
if (threadId) {
return `${baseConversationId}:topic:${threadId}`;
}
}
return baseConversationId;
}
export function toPluginInboundClaimContext(
canonical: CanonicalInboundMessageHookContext,
): PluginHookInboundClaimContext {
const conversationId = deriveConversationId(canonical);
return {
channelId: canonical.channelId,
accountId: canonical.accountId,
conversationId,
parentConversationId: deriveParentConversationId(canonical),
senderId: canonical.senderId,
messageId: canonical.messageId,
};
}
export function toPluginInboundClaimEvent(
canonical: CanonicalInboundMessageHookContext,
extras?: {
commandAuthorized?: boolean;
wasMentioned?: boolean;
},
): PluginHookInboundClaimEvent {
const context = toPluginInboundClaimContext(canonical);
return {
content: canonical.content,
body: canonical.body,
bodyForAgent: canonical.bodyForAgent,
transcript: canonical.transcript,
timestamp: canonical.timestamp,
channel: canonical.channelId,
accountId: canonical.accountId,
conversationId: context.conversationId,
parentConversationId: context.parentConversationId,
senderId: canonical.senderId,
senderName: canonical.senderName,
senderUsername: canonical.senderUsername,
threadId: canonical.threadId,
messageId: canonical.messageId,
isGroup: canonical.isGroup,
commandAuthorized: extras?.commandAuthorized,
wasMentioned: extras?.wasMentioned,
metadata: {
from: canonical.from,
to: canonical.to,
provider: canonical.provider,
surface: canonical.surface,
originatingChannel: canonical.originatingChannel,
originatingTo: canonical.originatingTo,
senderE164: canonical.senderE164,
mediaPath: canonical.mediaPath,
mediaType: canonical.mediaType,
guildId: canonical.guildId,
channelName: canonical.channelName,
groupId: canonical.groupId,
},
};
}
export function toPluginMessageReceivedEvent(
canonical: CanonicalInboundMessageHookContext,
): PluginHookMessageReceivedEvent {

View File

@@ -100,10 +100,23 @@ export type {
OpenClawPluginApi,
OpenClawPluginService,
OpenClawPluginServiceContext,
PluginHookInboundClaimContext,
PluginHookInboundClaimEvent,
PluginHookInboundClaimResult,
PluginInteractiveHandlerRegistration,
PluginInteractiveTelegramHandlerContext,
PluginLogger,
ProviderAuthContext,
ProviderAuthResult,
} from "../plugins/types.js";
export type {
ConversationRef,
SessionBindingBindInput,
SessionBindingCapabilities,
SessionBindingRecord,
SessionBindingService,
SessionBindingUnbindInput,
} from "../infra/outbound/session-binding-service.js";
export type {
GatewayRequestHandler,
GatewayRequestHandlerOptions,

View File

@@ -19,6 +19,9 @@ import type {
PluginHookBeforePromptBuildEvent,
PluginHookBeforePromptBuildResult,
PluginHookBeforeCompactionEvent,
PluginHookInboundClaimContext,
PluginHookInboundClaimEvent,
PluginHookInboundClaimResult,
PluginHookLlmInputEvent,
PluginHookLlmOutputEvent,
PluginHookBeforeResetEvent,
@@ -66,6 +69,9 @@ export type {
PluginHookAgentEndEvent,
PluginHookBeforeCompactionEvent,
PluginHookBeforeResetEvent,
PluginHookInboundClaimContext,
PluginHookInboundClaimEvent,
PluginHookInboundClaimResult,
PluginHookAfterCompactionEvent,
PluginHookMessageContext,
PluginHookMessageReceivedEvent,
@@ -263,6 +269,37 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
return result;
}
/**
* Run a sequential claim hook where the first `{ handled: true }` result wins.
*/
async function runClaimingHook<K extends PluginHookName, TResult extends { handled: boolean }>(
hookName: K,
event: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[0],
ctx: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[1],
): Promise<TResult | undefined> {
const hooks = getHooksForName(registry, hookName);
if (hooks.length === 0) {
return undefined;
}
logger?.debug?.(`[hooks] running ${hookName} (${hooks.length} handlers, first-claim wins)`);
for (const hook of hooks) {
try {
const handlerResult = await (
hook.handler as (event: unknown, ctx: unknown) => Promise<TResult | void>
)(event, ctx);
if (handlerResult?.handled) {
return handlerResult;
}
} catch (err) {
handleHookError({ hookName, pluginId: hook.pluginId, error: err });
}
}
return undefined;
}
// =========================================================================
// Agent Hooks
// =========================================================================
@@ -384,6 +421,21 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
// Message Hooks
// =========================================================================
/**
* Run inbound_claim hook.
* Allows plugins to claim an inbound event before commands/agent dispatch.
*/
async function runInboundClaim(
event: PluginHookInboundClaimEvent,
ctx: PluginHookInboundClaimContext,
): Promise<PluginHookInboundClaimResult | undefined> {
return runClaimingHook<"inbound_claim", PluginHookInboundClaimResult>(
"inbound_claim",
event,
ctx,
);
}
/**
* Run message_received hook.
* Runs in parallel (fire-and-forget).
@@ -734,6 +786,7 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
runAfterCompaction,
runBeforeReset,
// Message hooks
runInboundClaim,
runMessageReceived,
runMessageSending,
runMessageSent,

View File

@@ -0,0 +1,91 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
clearPluginInteractiveHandlers,
dispatchPluginInteractiveHandler,
registerPluginInteractiveHandler,
} from "./interactive.js";
describe("plugin interactive handlers", () => {
beforeEach(() => {
clearPluginInteractiveHandlers();
});
it("routes Telegram callbacks by namespace and dedupes callback ids", async () => {
const handler = vi.fn(async () => ({ handled: true }));
expect(
registerPluginInteractiveHandler("codex-plugin", {
channel: "telegram",
namespace: "codex",
handler,
}),
).toEqual({ ok: true });
const baseParams = {
channel: "telegram" as const,
data: "codex:resume:thread-1",
callbackId: "cb-1",
ctx: {
accountId: "default",
callbackId: "cb-1",
conversationId: "-10099:topic:77",
parentConversationId: "-10099",
senderId: "user-1",
senderUsername: "ada",
threadId: 77,
isGroup: true,
isForum: true,
auth: { isAuthorizedSender: true },
callbackMessage: {
messageId: 55,
chatId: "-10099",
messageText: "Pick a thread",
},
},
respond: {
reply: vi.fn(async () => {}),
editMessage: vi.fn(async () => {}),
editButtons: vi.fn(async () => {}),
clearButtons: vi.fn(async () => {}),
deleteMessage: vi.fn(async () => {}),
},
};
const first = await dispatchPluginInteractiveHandler(baseParams);
const duplicate = await dispatchPluginInteractiveHandler(baseParams);
expect(first).toEqual({ matched: true, handled: true, duplicate: false });
expect(duplicate).toEqual({ matched: true, handled: true, duplicate: true });
expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
conversationId: "-10099:topic:77",
callback: expect.objectContaining({
namespace: "codex",
payload: "resume:thread-1",
chatId: "-10099",
messageId: 55,
}),
}),
);
});
it("rejects duplicate namespace registrations", () => {
const first = registerPluginInteractiveHandler("plugin-a", {
channel: "telegram",
namespace: "codex",
handler: async () => ({ handled: true }),
});
const second = registerPluginInteractiveHandler("plugin-b", {
channel: "telegram",
namespace: "codex",
handler: async () => ({ handled: true }),
});
expect(first).toEqual({ ok: true });
expect(second).toEqual({
ok: false,
error: 'Interactive handler namespace "codex" already registered by plugin "plugin-a"',
});
});
});

156
src/plugins/interactive.ts Normal file
View File

@@ -0,0 +1,156 @@
import { createDedupeCache } from "../infra/dedupe.js";
import type {
PluginInteractiveButtons,
PluginInteractiveHandlerRegistration,
PluginInteractiveTelegramHandlerContext,
} from "./types.js";
type RegisteredInteractiveHandler = PluginInteractiveHandlerRegistration & {
pluginId: string;
};
type InteractiveRegistrationResult = {
ok: boolean;
error?: string;
};
type InteractiveDispatchResult =
| { matched: false; handled: false; duplicate: false }
| { matched: true; handled: boolean; duplicate: boolean };
const interactiveHandlers = new Map<string, RegisteredInteractiveHandler>();
const callbackDedupe = createDedupeCache({
ttlMs: 5 * 60_000,
maxSize: 4096,
});
function toRegistryKey(channel: string, namespace: string): string {
return `${channel.trim().toLowerCase()}:${namespace.trim()}`;
}
function normalizeNamespace(namespace: string): string {
return namespace.trim();
}
function validateNamespace(namespace: string): string | null {
if (!namespace.trim()) {
return "Interactive handler namespace cannot be empty";
}
if (!/^[A-Za-z0-9._-]+$/.test(namespace.trim())) {
return "Interactive handler namespace must contain only letters, numbers, dots, underscores, and hyphens";
}
return null;
}
function resolveNamespaceMatch(
channel: string,
data: string,
): { registration: RegisteredInteractiveHandler; namespace: string; payload: string } | null {
const trimmedData = data.trim();
if (!trimmedData) {
return null;
}
const separatorIndex = trimmedData.indexOf(":");
const namespace =
separatorIndex >= 0 ? trimmedData.slice(0, separatorIndex) : normalizeNamespace(trimmedData);
const registration = interactiveHandlers.get(toRegistryKey(channel, namespace));
if (!registration) {
return null;
}
return {
registration,
namespace,
payload: separatorIndex >= 0 ? trimmedData.slice(separatorIndex + 1) : "",
};
}
export function registerPluginInteractiveHandler(
pluginId: string,
registration: PluginInteractiveHandlerRegistration,
): InteractiveRegistrationResult {
const namespace = normalizeNamespace(registration.namespace);
const validationError = validateNamespace(namespace);
if (validationError) {
return { ok: false, error: validationError };
}
const key = toRegistryKey(registration.channel, namespace);
const existing = interactiveHandlers.get(key);
if (existing) {
return {
ok: false,
error: `Interactive handler namespace "${namespace}" already registered by plugin "${existing.pluginId}"`,
};
}
interactiveHandlers.set(key, {
...registration,
namespace,
channel: registration.channel,
pluginId,
});
return { ok: true };
}
export function clearPluginInteractiveHandlers(): void {
interactiveHandlers.clear();
callbackDedupe.clear();
}
export function clearPluginInteractiveHandlersForPlugin(pluginId: string): void {
for (const [key, value] of interactiveHandlers.entries()) {
if (value.pluginId === pluginId) {
interactiveHandlers.delete(key);
}
}
}
export async function dispatchPluginInteractiveHandler(params: {
channel: "telegram";
data: string;
callbackId: string;
ctx: Omit<PluginInteractiveTelegramHandlerContext, "callback" | "respond" | "channel"> & {
callbackMessage: {
messageId: number;
chatId: string;
messageText?: string;
};
};
respond: {
reply: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise<void>;
editMessage: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise<void>;
editButtons: (params: { buttons: PluginInteractiveButtons }) => Promise<void>;
clearButtons: () => Promise<void>;
deleteMessage: () => Promise<void>;
};
}): Promise<InteractiveDispatchResult> {
const match = resolveNamespaceMatch(params.channel, params.data);
if (!match) {
return { matched: false, handled: false, duplicate: false };
}
if (callbackDedupe.check(params.callbackId)) {
return { matched: true, handled: true, duplicate: true };
}
const { callbackMessage, ...handlerContext } = params.ctx;
const result = await match.registration.handler({
...handlerContext,
channel: "telegram",
callback: {
data: params.data,
namespace: match.namespace,
payload: match.payload,
messageId: callbackMessage.messageId,
chatId: callbackMessage.chatId,
messageText: callbackMessage.messageText,
},
respond: params.respond,
});
return {
matched: true,
handled: result?.handled ?? true,
duplicate: false,
};
}

View File

@@ -19,6 +19,7 @@ import {
} from "./config-state.js";
import { discoverOpenClawPlugins } from "./discovery.js";
import { initializeGlobalHookRunner } from "./hook-runner-global.js";
import { clearPluginInteractiveHandlers } from "./interactive.js";
import { loadPluginManifestRegistry } from "./manifest-registry.js";
import { isPathInside, safeStatSync } from "./path-safety.js";
import { createPluginRegistry, type PluginRecord, type PluginRegistry } from "./registry.js";
@@ -653,6 +654,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
// Clear previously registered plugin commands before reloading
clearPluginCommands();
clearPluginInteractiveHandlers();
// Lazily initialize the runtime so startup paths that discover/skip plugins do
// not eagerly load every channel runtime dependency.

View File

@@ -14,6 +14,7 @@ import { registerPluginCommand } from "./commands.js";
import { normalizePluginHttpPath } from "./http-path.js";
import { findOverlappingPluginHttpRoute } from "./http-route-overlap.js";
import { normalizeRegisteredProvider } from "./provider-validation.js";
import { registerPluginInteractiveHandler } from "./interactive.js";
import type { PluginRuntime } from "./runtime/types.js";
import { defaultSlotIdForKey } from "./slots.js";
import {
@@ -653,6 +654,17 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
registerGatewayMethod: (method, handler) => registerGatewayMethod(record, method, handler),
registerCli: (registrar, opts) => registerCli(record, registrar, opts),
registerService: (service) => registerService(record, service),
registerInteractiveHandler: (registration) => {
const result = registerPluginInteractiveHandler(record.id, registration);
if (!result.ok) {
pushDiagnostic({
level: "warn",
pluginId: record.id,
source: record.source,
message: result.error ?? "interactive handler registration failed",
});
}
},
registerCommand: (command) => registerCommand(record, command),
registerContextEngine: (id, factory) => {
if (id === defaultSlotIdForKey("contextEngine")) {

View File

@@ -1,6 +1,7 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { onAgentEvent } from "../../infra/agent-events.js";
import { requestHeartbeatNow } from "../../infra/heartbeat-wake.js";
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
import { onSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
const runCommandWithTimeoutMock = vi.hoisted(() => vi.fn());
@@ -49,6 +50,11 @@ describe("plugin runtime command execution", () => {
expect(runtime.events.onSessionTranscriptUpdate).toBe(onSessionTranscriptUpdate);
});
it("exposes runtime.channel.bindings", () => {
const runtime = createPluginRuntime();
expect(runtime.channel.bindings).toBe(getSessionBindingService());
});
it("exposes runtime.system.requestHeartbeatNow", () => {
const runtime = createPluginRuntime();
expect(runtime.system.requestHeartbeatNow).toBe(requestHeartbeatNow);

View File

@@ -85,6 +85,7 @@ import {
updateLastRoute,
} from "../../config/sessions.js";
import { getChannelActivity, recordChannelActivity } from "../../infra/channel-activity.js";
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
import {
listLineAccountIds,
normalizeAccountId as normalizeLineAccountId,
@@ -118,6 +119,7 @@ import type { PluginRuntime } from "./types.js";
export function createRuntimeChannel(): PluginRuntime["channel"] {
return {
bindings: getSessionBindingService(),
text: {
chunkByNewline,
chunkMarkdownText,
@@ -230,6 +232,33 @@ export function createRuntimeChannel(): PluginRuntime["channel"] {
sendPollTelegram,
monitorTelegramProvider,
messageActions: telegramMessageActions,
typing: {
pulse: sendTypingTelegram,
start: async ({ to, accountId, cfg, intervalMs, messageThreadId }) =>
await createTelegramTypingLease({
to,
accountId,
cfg,
intervalMs,
messageThreadId,
pulse: async ({ to, accountId, cfg, messageThreadId }) =>
await sendTypingTelegram(to, {
accountId,
cfg,
messageThreadId,
}),
}),
},
conversationActions: {
editMessage: editMessageTelegram,
editReplyMarkup: editMessageReplyMarkupTelegram,
clearReplyMarkup: async (chatIdInput, messageIdInput, opts = {}) =>
await editMessageReplyMarkupTelegram(chatIdInput, messageIdInput, [], opts),
deleteMessage: deleteMessageTelegram,
renameTopic: renameForumTopicTelegram,
pinMessage: pinMessageTelegram,
unpinMessage: unpinMessageTelegram,
},
},
signal: {
probeSignal,

View File

@@ -0,0 +1,38 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { createTelegramTypingLease } from "./runtime-telegram-typing.js";
describe("createTelegramTypingLease", () => {
afterEach(() => {
vi.useRealTimers();
});
it("pulses immediately and keeps leases independent", async () => {
vi.useFakeTimers();
const pulse = vi.fn(async () => undefined);
const leaseA = await createTelegramTypingLease({
to: "telegram:123",
intervalMs: 2_000,
pulse,
});
const leaseB = await createTelegramTypingLease({
to: "telegram:123",
intervalMs: 2_000,
pulse,
});
expect(pulse).toHaveBeenCalledTimes(2);
await vi.advanceTimersByTimeAsync(2_000);
expect(pulse).toHaveBeenCalledTimes(4);
leaseA.stop();
await vi.advanceTimersByTimeAsync(2_000);
expect(pulse).toHaveBeenCalledTimes(5);
await leaseB.refresh();
expect(pulse).toHaveBeenCalledTimes(6);
leaseB.stop();
});
});

View File

@@ -0,0 +1,52 @@
import type { OpenClawConfig } from "../../config/config.js";
export type CreateTelegramTypingLeaseParams = {
to: string;
accountId?: string;
cfg?: OpenClawConfig;
intervalMs?: number;
messageThreadId?: number;
pulse: (params: {
to: string;
accountId?: string;
cfg?: OpenClawConfig;
messageThreadId?: number;
}) => Promise<unknown>;
};
export async function createTelegramTypingLease(params: CreateTelegramTypingLeaseParams): Promise<{
refresh: () => Promise<void>;
stop: () => void;
}> {
const intervalMs = Math.max(1000, Math.floor(params.intervalMs ?? 4_000));
let stopped = false;
const refresh = async () => {
if (stopped) {
return;
}
await params.pulse({
to: params.to,
accountId: params.accountId,
cfg: params.cfg,
messageThreadId: params.messageThreadId,
});
};
await refresh();
const timer = setInterval(() => {
void refresh();
}, intervalMs);
return {
refresh,
stop: () => {
if (stopped) {
return;
}
stopped = true;
clearInterval(timer);
},
};
}

View File

@@ -2,6 +2,8 @@ type ReadChannelAllowFromStore =
typeof import("../../pairing/pairing-store.js").readChannelAllowFromStore;
type UpsertChannelPairingRequest =
typeof import("../../pairing/pairing-store.js").upsertChannelPairingRequest;
type SessionBindingService =
typeof import("../../infra/outbound/session-binding-service.js").getSessionBindingService;
type ReadChannelAllowFromStoreForAccount = (params: {
channel: Parameters<ReadChannelAllowFromStore>[0];
@@ -14,6 +16,7 @@ type UpsertChannelPairingRequestForAccount = (
) => ReturnType<UpsertChannelPairingRequest>;
export type PluginRuntimeChannel = {
bindings: ReturnType<SessionBindingService>;
text: {
chunkByNewline: typeof import("../../auto-reply/chunk.js").chunkByNewline;
chunkMarkdownText: typeof import("../../auto-reply/chunk.js").chunkMarkdownText;
@@ -117,6 +120,39 @@ export type PluginRuntimeChannel = {
sendPollTelegram: typeof import("../../../extensions/telegram/src/send.js").sendPollTelegram;
monitorTelegramProvider: typeof import("../../../extensions/telegram/src/monitor.js").monitorTelegramProvider;
messageActions: typeof import("../../channels/plugins/actions/telegram.js").telegramMessageActions;
typing: {
pulse: typeof import("../../telegram/send.js").sendTypingTelegram;
start: (params: {
to: string;
accountId?: string;
cfg?: ReturnType<typeof import("../../config/config.js").loadConfig>;
intervalMs?: number;
messageThreadId?: number;
}) => Promise<{
refresh: () => Promise<void>;
stop: () => void;
}>;
};
conversationActions: {
editMessage: typeof import("../../telegram/send.js").editMessageTelegram;
editReplyMarkup: typeof import("../../telegram/send.js").editMessageReplyMarkupTelegram;
clearReplyMarkup: (
chatIdInput: string | number,
messageIdInput: string | number,
opts?: {
token?: string;
accountId?: string;
verbose?: boolean;
api?: Partial<import("grammy").Bot["api"]>;
retry?: import("../../infra/retry.js").RetryConfig;
cfg?: ReturnType<typeof import("../../config/config.js").loadConfig>;
},
) => Promise<{ ok: true; messageId: string; chatId: string }>;
deleteMessage: typeof import("../../telegram/send.js").deleteMessageTelegram;
renameTopic: typeof import("../../telegram/send.js").renameForumTopicTelegram;
pinMessage: typeof import("../../telegram/send.js").pinMessageTelegram;
unpinMessage: typeof import("../../telegram/send.js").unpinMessageTelegram;
};
};
signal: {
probeSignal: typeof import("../../../extensions/signal/src/probe.js").probeSignal;

View File

@@ -305,6 +305,55 @@ export type OpenClawPluginCommandDefinition = {
handler: PluginCommandHandler;
};
export type PluginInteractiveChannel = "telegram";
export type PluginInteractiveButtons = Array<
Array<{ text: string; callback_data: string; style?: "danger" | "success" | "primary" }>
>;
export type PluginInteractiveTelegramHandlerResult = {
handled?: boolean;
} | void;
export type PluginInteractiveTelegramHandlerContext = {
channel: "telegram";
accountId: string;
callbackId: string;
conversationId: string;
parentConversationId?: string;
senderId?: string;
senderUsername?: string;
threadId?: number;
isGroup: boolean;
isForum: boolean;
auth: {
isAuthorizedSender: boolean;
};
callback: {
data: string;
namespace: string;
payload: string;
messageId: number;
chatId: string;
messageText?: string;
};
respond: {
reply: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise<void>;
editMessage: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise<void>;
editButtons: (params: { buttons: PluginInteractiveButtons }) => Promise<void>;
clearButtons: () => Promise<void>;
deleteMessage: () => Promise<void>;
};
};
export type PluginInteractiveHandlerRegistration = {
channel: PluginInteractiveChannel;
namespace: string;
handler: (
ctx: PluginInteractiveTelegramHandlerContext,
) => Promise<PluginInteractiveTelegramHandlerResult> | PluginInteractiveTelegramHandlerResult;
};
export type OpenClawPluginHttpRouteAuth = "gateway" | "plugin";
export type OpenClawPluginHttpRouteMatch = "exact" | "prefix";
@@ -388,6 +437,7 @@ export type OpenClawPluginApi = {
registerCli: (registrar: OpenClawPluginCliRegistrar, opts?: { commands?: string[] }) => void;
registerService: (service: OpenClawPluginService) => void;
registerProvider: (provider: ProviderPlugin) => void;
registerInteractiveHandler: (registration: PluginInteractiveHandlerRegistration) => void;
/**
* Register a custom command that bypasses the LLM agent.
* Plugin commands are processed before built-in commands and before agent invocation.
@@ -431,6 +481,7 @@ export type PluginHookName =
| "before_compaction"
| "after_compaction"
| "before_reset"
| "inbound_claim"
| "message_received"
| "message_sending"
| "message_sent"
@@ -457,6 +508,7 @@ export const PLUGIN_HOOK_NAMES = [
"before_compaction",
"after_compaction",
"before_reset",
"inbound_claim",
"message_received",
"message_sending",
"message_sent",
@@ -665,6 +717,37 @@ export type PluginHookMessageContext = {
conversationId?: string;
};
export type PluginHookInboundClaimContext = PluginHookMessageContext & {
parentConversationId?: string;
senderId?: string;
messageId?: string;
};
export type PluginHookInboundClaimEvent = {
content: string;
body?: string;
bodyForAgent?: string;
transcript?: string;
timestamp?: number;
channel: string;
accountId?: string;
conversationId?: string;
parentConversationId?: string;
senderId?: string;
senderName?: string;
senderUsername?: string;
threadId?: string | number;
messageId?: string;
isGroup: boolean;
commandAuthorized?: boolean;
wasMentioned?: boolean;
metadata?: Record<string, unknown>;
};
export type PluginHookInboundClaimResult = {
handled: boolean;
};
// message_received hook
export type PluginHookMessageReceivedEvent = {
from: string;
@@ -921,6 +1004,10 @@ export type PluginHookHandlerMap = {
event: PluginHookBeforeResetEvent,
ctx: PluginHookAgentContext,
) => Promise<void> | void;
inbound_claim: (
event: PluginHookInboundClaimEvent,
ctx: PluginHookInboundClaimContext,
) => Promise<PluginHookInboundClaimResult | void> | PluginHookInboundClaimResult | void;
message_received: (
event: PluginHookMessageReceivedEvent,
ctx: PluginHookMessageContext,

View File

@@ -0,0 +1,69 @@
import { describe, expect, it, vi } from "vitest";
import { createHookRunner } from "./hooks.js";
import { createMockPluginRegistry } from "./hooks.test-helpers.js";
describe("inbound_claim hook runner", () => {
it("stops at the first handler that claims the event", async () => {
const first = vi.fn().mockResolvedValue({ handled: true });
const second = vi.fn().mockResolvedValue({ handled: true });
const registry = createMockPluginRegistry([
{ hookName: "inbound_claim", handler: first },
{ hookName: "inbound_claim", handler: second },
]);
const runner = createHookRunner(registry);
const result = await runner.runInboundClaim(
{
content: "who are you",
channel: "telegram",
accountId: "default",
conversationId: "123:topic:77",
isGroup: true,
},
{
channelId: "telegram",
accountId: "default",
conversationId: "123:topic:77",
},
);
expect(result).toEqual({ handled: true });
expect(first).toHaveBeenCalledTimes(1);
expect(second).not.toHaveBeenCalled();
});
it("continues to the next handler when a higher-priority handler throws", async () => {
const logger = {
warn: vi.fn(),
error: vi.fn(),
};
const failing = vi.fn().mockRejectedValue(new Error("boom"));
const succeeding = vi.fn().mockResolvedValue({ handled: true });
const registry = createMockPluginRegistry([
{ hookName: "inbound_claim", handler: failing },
{ hookName: "inbound_claim", handler: succeeding },
]);
const runner = createHookRunner(registry, { logger });
const result = await runner.runInboundClaim(
{
content: "hi",
channel: "telegram",
accountId: "default",
conversationId: "123",
isGroup: false,
},
{
channelId: "telegram",
accountId: "default",
conversationId: "123",
},
);
expect(result).toEqual({ handled: true });
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining("inbound_claim handler from test-plugin failed: Error: boom"),
);
expect(succeeding).toHaveBeenCalledTimes(1);
});
});