mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-22 23:41:07 +00:00
Plugins: broaden plugin surface for Codex App Server (#45318)
* Plugins: add inbound claim and Telegram interaction seams * Plugins: add Discord interaction surface * Chore: fix formatting after plugin rebase * fix(hooks): preserve observers after inbound claim * test(hooks): cover claimed inbound observer delivery * fix(plugins): harden typing lease refreshes * fix(discord): pass real auth to plugin interactions * fix(plugins): remove raw session binding runtime exposure * fix(plugins): tighten interactive callback handling * Plugins: gate conversation binding with approvals * Plugins: migrate legacy plugin binding records * Plugins/phone-control: update test command context * Plugins: migrate legacy binding ids * Plugins: migrate legacy codex session bindings * Discord: fix plugin interaction handling * Discord: support direct plugin conversation binds * Plugins: preserve Discord command bind targets * Tests: fix plugin binding and interactive fallout * Discord: stabilize directory lookup tests * Discord: route bound DMs to plugins * Discord: restore plugin bindings after restart * Telegram: persist detached plugin bindings * Plugins: limit binding APIs to Telegram and Discord * Plugins: harden bound conversation routing * Plugins: fix extension target imports * Plugins: fix Telegram runtime extension imports * Plugins: format rebased binding handlers * Discord: bind group DM interactions by channel --------- Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
__testing,
|
||||
clearPluginCommands,
|
||||
executePluginCommand,
|
||||
getPluginCommandSpecs,
|
||||
listPluginCommands,
|
||||
registerPluginCommand,
|
||||
@@ -93,5 +95,107 @@ describe("registerPluginCommand", () => {
|
||||
acceptsArgs: false,
|
||||
},
|
||||
]);
|
||||
expect(getPluginCommandSpecs("slack")).toEqual([]);
|
||||
});
|
||||
|
||||
it("resolves Discord DM command bindings with the user target prefix intact", () => {
|
||||
expect(
|
||||
__testing.resolveBindingConversationFromCommand({
|
||||
channel: "discord",
|
||||
from: "discord:1177378744822943744",
|
||||
to: "slash:1177378744822943744",
|
||||
accountId: "default",
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "user:1177378744822943744",
|
||||
});
|
||||
});
|
||||
|
||||
it("resolves Discord guild command bindings with the channel target prefix intact", () => {
|
||||
expect(
|
||||
__testing.resolveBindingConversationFromCommand({
|
||||
channel: "discord",
|
||||
from: "discord:channel:1480554272859881494",
|
||||
accountId: "default",
|
||||
}),
|
||||
).toEqual({
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1480554272859881494",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not resolve binding conversations for unsupported command channels", () => {
|
||||
expect(
|
||||
__testing.resolveBindingConversationFromCommand({
|
||||
channel: "slack",
|
||||
from: "slack:U123",
|
||||
to: "C456",
|
||||
accountId: "default",
|
||||
}),
|
||||
).toBeNull();
|
||||
});
|
||||
|
||||
it("does not expose binding APIs to plugin commands on unsupported channels", async () => {
|
||||
const handler = async (ctx: {
|
||||
requestConversationBinding: (params: { summary: string }) => Promise<unknown>;
|
||||
getCurrentConversationBinding: () => Promise<unknown>;
|
||||
detachConversationBinding: () => Promise<unknown>;
|
||||
}) => {
|
||||
const requested = await ctx.requestConversationBinding({
|
||||
summary: "Bind this conversation.",
|
||||
});
|
||||
const current = await ctx.getCurrentConversationBinding();
|
||||
const detached = await ctx.detachConversationBinding();
|
||||
return {
|
||||
text: JSON.stringify({
|
||||
requested,
|
||||
current,
|
||||
detached,
|
||||
}),
|
||||
};
|
||||
};
|
||||
registerPluginCommand(
|
||||
"demo-plugin",
|
||||
{
|
||||
name: "bindcheck",
|
||||
description: "Demo command",
|
||||
acceptsArgs: false,
|
||||
handler,
|
||||
},
|
||||
{ pluginRoot: "/plugins/demo-plugin" },
|
||||
);
|
||||
|
||||
const result = await executePluginCommand({
|
||||
command: {
|
||||
name: "bindcheck",
|
||||
description: "Demo command",
|
||||
acceptsArgs: false,
|
||||
handler,
|
||||
pluginId: "demo-plugin",
|
||||
pluginRoot: "/plugins/demo-plugin",
|
||||
},
|
||||
channel: "slack",
|
||||
senderId: "U123",
|
||||
isAuthorizedSender: true,
|
||||
commandBody: "/bindcheck",
|
||||
config: {} as never,
|
||||
from: "slack:U123",
|
||||
to: "C456",
|
||||
accountId: "default",
|
||||
});
|
||||
|
||||
expect(result.text).toBe(
|
||||
JSON.stringify({
|
||||
requested: {
|
||||
status: "error",
|
||||
message: "This command cannot bind the current conversation.",
|
||||
},
|
||||
current: null,
|
||||
detached: { removed: false },
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -5,8 +5,15 @@
|
||||
* These commands are processed before built-in commands and before agent invocation.
|
||||
*/
|
||||
|
||||
import { parseDiscordTarget } from "../../extensions/discord/src/targets.js";
|
||||
import { parseTelegramTarget } from "../../extensions/telegram/src/targets.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { logVerbose } from "../globals.js";
|
||||
import {
|
||||
detachPluginConversationBinding,
|
||||
getCurrentPluginConversationBinding,
|
||||
requestPluginConversationBinding,
|
||||
} from "./conversation-binding.js";
|
||||
import type {
|
||||
OpenClawPluginCommandDefinition,
|
||||
PluginCommandContext,
|
||||
@@ -15,6 +22,8 @@ import type {
|
||||
|
||||
type RegisteredPluginCommand = OpenClawPluginCommandDefinition & {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
pluginRoot?: string;
|
||||
};
|
||||
|
||||
// Registry of plugin commands
|
||||
@@ -109,6 +118,7 @@ export type CommandRegistrationResult = {
|
||||
export function registerPluginCommand(
|
||||
pluginId: string,
|
||||
command: OpenClawPluginCommandDefinition,
|
||||
opts?: { pluginName?: string; pluginRoot?: string },
|
||||
): CommandRegistrationResult {
|
||||
// Prevent registration while commands are being processed
|
||||
if (registryLocked) {
|
||||
@@ -149,7 +159,14 @@ export function registerPluginCommand(
|
||||
};
|
||||
}
|
||||
|
||||
pluginCommands.set(key, { ...command, name, description, pluginId });
|
||||
pluginCommands.set(key, {
|
||||
...command,
|
||||
name,
|
||||
description,
|
||||
pluginId,
|
||||
pluginName: opts?.pluginName,
|
||||
pluginRoot: opts?.pluginRoot,
|
||||
});
|
||||
logVerbose(`Registered plugin command: ${key} (plugin: ${pluginId})`);
|
||||
return { ok: true };
|
||||
}
|
||||
@@ -235,6 +252,63 @@ function sanitizeArgs(args: string | undefined): string | undefined {
|
||||
return sanitized;
|
||||
}
|
||||
|
||||
function stripPrefix(raw: string | undefined, prefix: string): string | undefined {
|
||||
if (!raw) {
|
||||
return undefined;
|
||||
}
|
||||
return raw.startsWith(prefix) ? raw.slice(prefix.length) : raw;
|
||||
}
|
||||
|
||||
function resolveBindingConversationFromCommand(params: {
|
||||
channel: string;
|
||||
from?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
messageThreadId?: number;
|
||||
}): {
|
||||
channel: string;
|
||||
accountId: string;
|
||||
conversationId: string;
|
||||
parentConversationId?: string;
|
||||
threadId?: string | number;
|
||||
} | null {
|
||||
const accountId = params.accountId?.trim() || "default";
|
||||
if (params.channel === "telegram") {
|
||||
const rawTarget = params.to ?? params.from;
|
||||
if (!rawTarget) {
|
||||
return null;
|
||||
}
|
||||
const target = parseTelegramTarget(rawTarget);
|
||||
return {
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
conversationId: target.chatId,
|
||||
threadId: params.messageThreadId ?? target.messageThreadId,
|
||||
};
|
||||
}
|
||||
if (params.channel === "discord") {
|
||||
const source = params.from ?? params.to;
|
||||
const rawTarget = source?.startsWith("discord:channel:")
|
||||
? stripPrefix(source, "discord:")
|
||||
: source?.startsWith("discord:user:")
|
||||
? stripPrefix(source, "discord:")
|
||||
: source;
|
||||
if (!rawTarget || rawTarget.startsWith("slash:")) {
|
||||
return null;
|
||||
}
|
||||
const target = parseDiscordTarget(rawTarget, { defaultKind: "channel" });
|
||||
if (!target) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
channel: "discord",
|
||||
accountId,
|
||||
conversationId: `${target.kind}:${target.id}`,
|
||||
};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a plugin command handler.
|
||||
*
|
||||
@@ -268,6 +342,13 @@ export async function executePluginCommand(params: {
|
||||
|
||||
// Sanitize args before passing to handler
|
||||
const sanitizedArgs = sanitizeArgs(args);
|
||||
const bindingConversation = resolveBindingConversationFromCommand({
|
||||
channel,
|
||||
from: params.from,
|
||||
to: params.to,
|
||||
accountId: params.accountId,
|
||||
messageThreadId: params.messageThreadId,
|
||||
});
|
||||
|
||||
const ctx: PluginCommandContext = {
|
||||
senderId,
|
||||
@@ -281,6 +362,40 @@ export async function executePluginCommand(params: {
|
||||
to: params.to,
|
||||
accountId: params.accountId,
|
||||
messageThreadId: params.messageThreadId,
|
||||
requestConversationBinding: async (bindingParams) => {
|
||||
if (!command.pluginRoot || !bindingConversation) {
|
||||
return {
|
||||
status: "error",
|
||||
message: "This command cannot bind the current conversation.",
|
||||
};
|
||||
}
|
||||
return requestPluginConversationBinding({
|
||||
pluginId: command.pluginId,
|
||||
pluginName: command.pluginName,
|
||||
pluginRoot: command.pluginRoot,
|
||||
requestedBySenderId: senderId,
|
||||
conversation: bindingConversation,
|
||||
binding: bindingParams,
|
||||
});
|
||||
},
|
||||
detachConversationBinding: async () => {
|
||||
if (!command.pluginRoot || !bindingConversation) {
|
||||
return { removed: false };
|
||||
}
|
||||
return detachPluginConversationBinding({
|
||||
pluginRoot: command.pluginRoot,
|
||||
conversation: bindingConversation,
|
||||
});
|
||||
},
|
||||
getCurrentConversationBinding: async () => {
|
||||
if (!command.pluginRoot || !bindingConversation) {
|
||||
return null;
|
||||
}
|
||||
return getCurrentPluginConversationBinding({
|
||||
pluginRoot: command.pluginRoot,
|
||||
conversation: bindingConversation,
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
// Lock registry during execution to prevent concurrent modifications
|
||||
@@ -341,9 +456,17 @@ export function getPluginCommandSpecs(provider?: string): Array<{
|
||||
description: string;
|
||||
acceptsArgs: boolean;
|
||||
}> {
|
||||
const providerName = provider?.trim().toLowerCase();
|
||||
if (providerName && providerName !== "telegram" && providerName !== "discord") {
|
||||
return [];
|
||||
}
|
||||
return Array.from(pluginCommands.values()).map((cmd) => ({
|
||||
name: resolvePluginNativeName(cmd, provider),
|
||||
description: cmd.description,
|
||||
acceptsArgs: cmd.acceptsArgs ?? false,
|
||||
}));
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
resolveBindingConversationFromCommand,
|
||||
};
|
||||
|
||||
575
src/plugins/conversation-binding.test.ts
Normal file
575
src/plugins/conversation-binding.test.ts
Normal file
@@ -0,0 +1,575 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type {
|
||||
ConversationRef,
|
||||
SessionBindingAdapter,
|
||||
SessionBindingRecord,
|
||||
} from "../infra/outbound/session-binding-service.js";
|
||||
|
||||
const tempRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-plugin-binding-"));
|
||||
const approvalsPath = path.join(tempRoot, "plugin-binding-approvals.json");
|
||||
|
||||
const sessionBindingState = vi.hoisted(() => {
|
||||
const records = new Map<string, SessionBindingRecord>();
|
||||
let nextId = 1;
|
||||
|
||||
function normalizeRef(ref: ConversationRef): ConversationRef {
|
||||
return {
|
||||
channel: ref.channel.trim().toLowerCase(),
|
||||
accountId: ref.accountId.trim() || "default",
|
||||
conversationId: ref.conversationId.trim(),
|
||||
parentConversationId: ref.parentConversationId?.trim() || undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function toKey(ref: ConversationRef): string {
|
||||
const normalized = normalizeRef(ref);
|
||||
return JSON.stringify(normalized);
|
||||
}
|
||||
|
||||
return {
|
||||
records,
|
||||
bind: vi.fn(
|
||||
async (input: {
|
||||
targetSessionKey: string;
|
||||
targetKind: "session" | "subagent";
|
||||
conversation: ConversationRef;
|
||||
metadata?: Record<string, unknown>;
|
||||
}) => {
|
||||
const normalized = normalizeRef(input.conversation);
|
||||
const record: SessionBindingRecord = {
|
||||
bindingId: `binding-${nextId++}`,
|
||||
targetSessionKey: input.targetSessionKey,
|
||||
targetKind: input.targetKind,
|
||||
conversation: normalized,
|
||||
status: "active",
|
||||
boundAt: Date.now(),
|
||||
metadata: input.metadata,
|
||||
};
|
||||
records.set(toKey(normalized), record);
|
||||
return record;
|
||||
},
|
||||
),
|
||||
resolveByConversation: vi.fn((ref: ConversationRef) => {
|
||||
return records.get(toKey(ref)) ?? null;
|
||||
}),
|
||||
touch: vi.fn(),
|
||||
unbind: vi.fn(async (input: { bindingId?: string }) => {
|
||||
const removed: SessionBindingRecord[] = [];
|
||||
for (const [key, record] of records.entries()) {
|
||||
if (record.bindingId !== input.bindingId) {
|
||||
continue;
|
||||
}
|
||||
removed.push(record);
|
||||
records.delete(key);
|
||||
}
|
||||
return removed;
|
||||
}),
|
||||
reset() {
|
||||
records.clear();
|
||||
nextId = 1;
|
||||
this.bind.mockClear();
|
||||
this.resolveByConversation.mockClear();
|
||||
this.touch.mockClear();
|
||||
this.unbind.mockClear();
|
||||
},
|
||||
setRecord(record: SessionBindingRecord) {
|
||||
records.set(toKey(record.conversation), record);
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../infra/home-dir.js", () => ({
|
||||
expandHomePrefix: (value: string) => {
|
||||
if (value === "~/.openclaw/plugin-binding-approvals.json") {
|
||||
return approvalsPath;
|
||||
}
|
||||
return value;
|
||||
},
|
||||
}));
|
||||
|
||||
const {
|
||||
__testing,
|
||||
buildPluginBindingApprovalCustomId,
|
||||
detachPluginConversationBinding,
|
||||
getCurrentPluginConversationBinding,
|
||||
parsePluginBindingApprovalCustomId,
|
||||
requestPluginConversationBinding,
|
||||
resolvePluginConversationBindingApproval,
|
||||
} = await import("./conversation-binding.js");
|
||||
const { registerSessionBindingAdapter, unregisterSessionBindingAdapter } =
|
||||
await import("../infra/outbound/session-binding-service.js");
|
||||
|
||||
function createAdapter(channel: string, accountId: string): SessionBindingAdapter {
|
||||
return {
|
||||
channel,
|
||||
accountId,
|
||||
capabilities: {
|
||||
bindSupported: true,
|
||||
unbindSupported: true,
|
||||
placements: ["current", "child"],
|
||||
},
|
||||
bind: sessionBindingState.bind,
|
||||
listBySession: () => [],
|
||||
resolveByConversation: sessionBindingState.resolveByConversation,
|
||||
touch: sessionBindingState.touch,
|
||||
unbind: sessionBindingState.unbind,
|
||||
};
|
||||
}
|
||||
|
||||
describe("plugin conversation binding approvals", () => {
|
||||
beforeEach(() => {
|
||||
sessionBindingState.reset();
|
||||
__testing.reset();
|
||||
fs.rmSync(approvalsPath, { force: true });
|
||||
unregisterSessionBindingAdapter({ channel: "discord", accountId: "default" });
|
||||
unregisterSessionBindingAdapter({ channel: "discord", accountId: "work" });
|
||||
unregisterSessionBindingAdapter({ channel: "discord", accountId: "isolated" });
|
||||
unregisterSessionBindingAdapter({ channel: "telegram", accountId: "default" });
|
||||
registerSessionBindingAdapter(createAdapter("discord", "default"));
|
||||
registerSessionBindingAdapter(createAdapter("discord", "work"));
|
||||
registerSessionBindingAdapter(createAdapter("discord", "isolated"));
|
||||
registerSessionBindingAdapter(createAdapter("telegram", "default"));
|
||||
});
|
||||
|
||||
it("keeps Telegram bind approval callback_data within Telegram's limit", () => {
|
||||
const allowOnce = buildPluginBindingApprovalCustomId("abcdefghijkl", "allow-once");
|
||||
const allowAlways = buildPluginBindingApprovalCustomId("abcdefghijkl", "allow-always");
|
||||
const deny = buildPluginBindingApprovalCustomId("abcdefghijkl", "deny");
|
||||
|
||||
expect(Buffer.byteLength(allowOnce, "utf8")).toBeLessThanOrEqual(64);
|
||||
expect(Buffer.byteLength(allowAlways, "utf8")).toBeLessThanOrEqual(64);
|
||||
expect(Buffer.byteLength(deny, "utf8")).toBeLessThanOrEqual(64);
|
||||
expect(parsePluginBindingApprovalCustomId(allowAlways)).toEqual({
|
||||
approvalId: "abcdefghijkl",
|
||||
decision: "allow-always",
|
||||
});
|
||||
});
|
||||
|
||||
it("requires a fresh approval again after allow-once is consumed", async () => {
|
||||
const firstRequest = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread 123." },
|
||||
});
|
||||
|
||||
expect(firstRequest.status).toBe("pending");
|
||||
if (firstRequest.status !== "pending") {
|
||||
throw new Error("expected pending bind request");
|
||||
}
|
||||
|
||||
const approved = await resolvePluginConversationBindingApproval({
|
||||
approvalId: firstRequest.approvalId,
|
||||
decision: "allow-once",
|
||||
senderId: "user-1",
|
||||
});
|
||||
|
||||
expect(approved.status).toBe("approved");
|
||||
|
||||
const secondRequest = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:2",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread 456." },
|
||||
});
|
||||
|
||||
expect(secondRequest.status).toBe("pending");
|
||||
});
|
||||
|
||||
it("persists always-allow by plugin root plus channel/account only", async () => {
|
||||
const firstRequest = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread 123." },
|
||||
});
|
||||
|
||||
expect(firstRequest.status).toBe("pending");
|
||||
if (firstRequest.status !== "pending") {
|
||||
throw new Error("expected pending bind request");
|
||||
}
|
||||
|
||||
const approved = await resolvePluginConversationBindingApproval({
|
||||
approvalId: firstRequest.approvalId,
|
||||
decision: "allow-always",
|
||||
senderId: "user-1",
|
||||
});
|
||||
|
||||
expect(approved.status).toBe("approved");
|
||||
|
||||
const sameScope = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:2",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread 456." },
|
||||
});
|
||||
|
||||
expect(sameScope.status).toBe("bound");
|
||||
|
||||
const differentAccount = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "work",
|
||||
conversationId: "channel:3",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread 789." },
|
||||
});
|
||||
|
||||
expect(differentAccount.status).toBe("pending");
|
||||
});
|
||||
|
||||
it("does not share persistent approvals across plugin roots even with the same plugin id", async () => {
|
||||
const request = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-10099:topic:77",
|
||||
parentConversationId: "-10099",
|
||||
threadId: "77",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread abc." },
|
||||
});
|
||||
|
||||
expect(request.status).toBe("pending");
|
||||
if (request.status !== "pending") {
|
||||
throw new Error("expected pending bind request");
|
||||
}
|
||||
|
||||
await resolvePluginConversationBindingApproval({
|
||||
approvalId: request.approvalId,
|
||||
decision: "allow-always",
|
||||
senderId: "user-1",
|
||||
});
|
||||
|
||||
const samePluginNewPath = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-b",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-10099:topic:78",
|
||||
parentConversationId: "-10099",
|
||||
threadId: "78",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread def." },
|
||||
});
|
||||
|
||||
expect(samePluginNewPath.status).toBe("pending");
|
||||
});
|
||||
|
||||
it("persists detachHint on approved plugin bindings", async () => {
|
||||
const request = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:detach-hint",
|
||||
},
|
||||
binding: {
|
||||
summary: "Bind this conversation to Codex thread 999.",
|
||||
detachHint: "/codex_detach",
|
||||
},
|
||||
});
|
||||
|
||||
expect(["pending", "bound"]).toContain(request.status);
|
||||
|
||||
if (request.status === "pending") {
|
||||
const approved = await resolvePluginConversationBindingApproval({
|
||||
approvalId: request.approvalId,
|
||||
decision: "allow-once",
|
||||
senderId: "user-1",
|
||||
});
|
||||
|
||||
expect(approved.status).toBe("approved");
|
||||
if (approved.status !== "approved") {
|
||||
throw new Error("expected approved bind request");
|
||||
}
|
||||
|
||||
expect(approved.binding.detachHint).toBe("/codex_detach");
|
||||
} else {
|
||||
expect(request.binding.detachHint).toBe("/codex_detach");
|
||||
}
|
||||
|
||||
const currentBinding = await getCurrentPluginConversationBinding({
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:detach-hint",
|
||||
},
|
||||
});
|
||||
|
||||
expect(currentBinding?.detachHint).toBe("/codex_detach");
|
||||
});
|
||||
|
||||
it("returns and detaches only bindings owned by the requesting plugin root", async () => {
|
||||
const request = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread 123." },
|
||||
});
|
||||
|
||||
expect(["pending", "bound"]).toContain(request.status);
|
||||
if (request.status === "pending") {
|
||||
await resolvePluginConversationBindingApproval({
|
||||
approvalId: request.approvalId,
|
||||
decision: "allow-once",
|
||||
senderId: "user-1",
|
||||
});
|
||||
}
|
||||
|
||||
const current = await getCurrentPluginConversationBinding({
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
});
|
||||
|
||||
expect(current).toEqual(
|
||||
expect.objectContaining({
|
||||
pluginId: "codex",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
conversationId: "channel:1",
|
||||
}),
|
||||
);
|
||||
|
||||
const otherPluginView = await getCurrentPluginConversationBinding({
|
||||
pluginRoot: "/plugins/codex-b",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
});
|
||||
|
||||
expect(otherPluginView).toBeNull();
|
||||
|
||||
expect(
|
||||
await detachPluginConversationBinding({
|
||||
pluginRoot: "/plugins/codex-b",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
}),
|
||||
).toEqual({ removed: false });
|
||||
|
||||
expect(
|
||||
await detachPluginConversationBinding({
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "isolated",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
}),
|
||||
).toEqual({ removed: true });
|
||||
});
|
||||
|
||||
it("refuses to claim a conversation already bound by core", async () => {
|
||||
sessionBindingState.setRecord({
|
||||
bindingId: "binding-core",
|
||||
targetSessionKey: "agent:main:discord:channel:1",
|
||||
targetKind: "session",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
status: "active",
|
||||
boundAt: Date.now(),
|
||||
metadata: { owner: "core" },
|
||||
});
|
||||
|
||||
const result = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread 123." },
|
||||
});
|
||||
|
||||
expect(result).toEqual({
|
||||
status: "error",
|
||||
message:
|
||||
"This conversation is already bound by core routing and cannot be claimed by a plugin.",
|
||||
});
|
||||
});
|
||||
|
||||
it("migrates a legacy plugin binding record through the new approval flow even if the old plugin id differs", async () => {
|
||||
sessionBindingState.setRecord({
|
||||
bindingId: "binding-legacy",
|
||||
targetSessionKey: "plugin-binding:old-codex-plugin:legacy123",
|
||||
targetKind: "session",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-10099:topic:77",
|
||||
},
|
||||
status: "active",
|
||||
boundAt: Date.now(),
|
||||
metadata: {
|
||||
label: "legacy plugin bind",
|
||||
},
|
||||
});
|
||||
|
||||
const request = await requestPluginConversationBinding({
|
||||
pluginId: "codex",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-10099:topic:77",
|
||||
parentConversationId: "-10099",
|
||||
threadId: "77",
|
||||
},
|
||||
binding: { summary: "Bind this conversation to Codex thread abc." },
|
||||
});
|
||||
|
||||
expect(["pending", "bound"]).toContain(request.status);
|
||||
const binding =
|
||||
request.status === "pending"
|
||||
? await resolvePluginConversationBindingApproval({
|
||||
approvalId: request.approvalId,
|
||||
decision: "allow-once",
|
||||
senderId: "user-1",
|
||||
}).then((approved) => {
|
||||
expect(approved.status).toBe("approved");
|
||||
if (approved.status !== "approved") {
|
||||
throw new Error("expected approved bind result");
|
||||
}
|
||||
return approved.binding;
|
||||
})
|
||||
: request.status === "bound"
|
||||
? request.binding
|
||||
: (() => {
|
||||
throw new Error("expected pending or bound bind result");
|
||||
})();
|
||||
|
||||
expect(binding).toEqual(
|
||||
expect.objectContaining({
|
||||
pluginId: "codex",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
conversationId: "-10099:topic:77",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("migrates a legacy codex thread binding session key through the new approval flow", async () => {
|
||||
sessionBindingState.setRecord({
|
||||
bindingId: "binding-legacy-codex-thread",
|
||||
targetSessionKey: "openclaw-app-server:thread:019ce411-6322-7db2-a821-1a61c530e7d9",
|
||||
targetKind: "session",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "8460800771",
|
||||
},
|
||||
status: "active",
|
||||
boundAt: Date.now(),
|
||||
metadata: {
|
||||
label: "legacy codex thread bind",
|
||||
},
|
||||
});
|
||||
|
||||
const request = await requestPluginConversationBinding({
|
||||
pluginId: "openclaw-codex-app-server",
|
||||
pluginName: "Codex App Server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
requestedBySenderId: "user-1",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "8460800771",
|
||||
},
|
||||
binding: {
|
||||
summary: "Bind this conversation to Codex thread 019ce411-6322-7db2-a821-1a61c530e7d9.",
|
||||
},
|
||||
});
|
||||
|
||||
expect(["pending", "bound"]).toContain(request.status);
|
||||
const binding =
|
||||
request.status === "pending"
|
||||
? await resolvePluginConversationBindingApproval({
|
||||
approvalId: request.approvalId,
|
||||
decision: "allow-once",
|
||||
senderId: "user-1",
|
||||
}).then((approved) => {
|
||||
expect(approved.status).toBe("approved");
|
||||
if (approved.status !== "approved") {
|
||||
throw new Error("expected approved bind result");
|
||||
}
|
||||
return approved.binding;
|
||||
})
|
||||
: request.status === "bound"
|
||||
? request.binding
|
||||
: (() => {
|
||||
throw new Error("expected pending or bound bind result");
|
||||
})();
|
||||
|
||||
expect(binding).toEqual(
|
||||
expect.objectContaining({
|
||||
pluginId: "openclaw-codex-app-server",
|
||||
pluginRoot: "/plugins/codex-a",
|
||||
conversationId: "8460800771",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
825
src/plugins/conversation-binding.ts
Normal file
825
src/plugins/conversation-binding.ts
Normal file
@@ -0,0 +1,825 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { Button, Row, type TopLevelComponents } from "@buape/carbon";
|
||||
import { ButtonStyle } from "discord-api-types/v10";
|
||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||
import { expandHomePrefix } from "../infra/home-dir.js";
|
||||
import { writeJsonAtomic } from "../infra/json-files.js";
|
||||
import {
|
||||
getSessionBindingService,
|
||||
type ConversationRef,
|
||||
} from "../infra/outbound/session-binding-service.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import type {
|
||||
PluginConversationBinding,
|
||||
PluginConversationBindingRequestParams,
|
||||
PluginConversationBindingRequestResult,
|
||||
} from "./types.js";
|
||||
|
||||
const log = createSubsystemLogger("plugins/binding");
|
||||
|
||||
const APPROVALS_PATH = "~/.openclaw/plugin-binding-approvals.json";
|
||||
const PLUGIN_BINDING_CUSTOM_ID_PREFIX = "pluginbind";
|
||||
const PLUGIN_BINDING_OWNER = "plugin";
|
||||
const PLUGIN_BINDING_SESSION_PREFIX = "plugin-binding";
|
||||
const LEGACY_CODEX_PLUGIN_SESSION_PREFIXES = [
|
||||
"openclaw-app-server:thread:",
|
||||
"openclaw-codex-app-server:thread:",
|
||||
] as const;
|
||||
|
||||
type PluginBindingApprovalDecision = "allow-once" | "allow-always" | "deny";
|
||||
|
||||
type PluginBindingApprovalEntry = {
|
||||
pluginRoot: string;
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
channel: string;
|
||||
accountId: string;
|
||||
approvedAt: number;
|
||||
};
|
||||
|
||||
type PluginBindingApprovalsFile = {
|
||||
version: 1;
|
||||
approvals: PluginBindingApprovalEntry[];
|
||||
};
|
||||
|
||||
type PluginBindingConversation = {
|
||||
channel: string;
|
||||
accountId: string;
|
||||
conversationId: string;
|
||||
parentConversationId?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
|
||||
type PendingPluginBindingRequest = {
|
||||
id: string;
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
pluginRoot: string;
|
||||
conversation: PluginBindingConversation;
|
||||
requestedAt: number;
|
||||
requestedBySenderId?: string;
|
||||
summary?: string;
|
||||
detachHint?: string;
|
||||
};
|
||||
|
||||
type PluginBindingApprovalAction = {
|
||||
approvalId: string;
|
||||
decision: PluginBindingApprovalDecision;
|
||||
};
|
||||
|
||||
type PluginBindingIdentity = {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
pluginRoot: string;
|
||||
};
|
||||
|
||||
type PluginBindingMetadata = {
|
||||
pluginBindingOwner: "plugin";
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
pluginRoot: string;
|
||||
summary?: string;
|
||||
detachHint?: string;
|
||||
};
|
||||
|
||||
type PluginBindingResolveResult =
|
||||
| {
|
||||
status: "approved";
|
||||
binding: PluginConversationBinding;
|
||||
request: PendingPluginBindingRequest;
|
||||
decision: PluginBindingApprovalDecision;
|
||||
}
|
||||
| {
|
||||
status: "denied";
|
||||
request: PendingPluginBindingRequest;
|
||||
}
|
||||
| {
|
||||
status: "expired";
|
||||
};
|
||||
|
||||
const pendingRequests = new Map<string, PendingPluginBindingRequest>();
|
||||
|
||||
type PluginBindingGlobalState = {
|
||||
fallbackNoticeBindingIds: Set<string>;
|
||||
};
|
||||
|
||||
const pluginBindingGlobalStateKey = Symbol.for("openclaw.plugins.binding.global-state");
|
||||
|
||||
let approvalsCache: PluginBindingApprovalsFile | null = null;
|
||||
let approvalsLoaded = false;
|
||||
|
||||
function getPluginBindingGlobalState(): PluginBindingGlobalState {
|
||||
const globalStore = globalThis as typeof globalThis & {
|
||||
[pluginBindingGlobalStateKey]?: PluginBindingGlobalState;
|
||||
};
|
||||
return (globalStore[pluginBindingGlobalStateKey] ??= {
|
||||
fallbackNoticeBindingIds: new Set<string>(),
|
||||
});
|
||||
}
|
||||
|
||||
class PluginBindingApprovalButton extends Button {
|
||||
customId: string;
|
||||
label: string;
|
||||
style: ButtonStyle;
|
||||
|
||||
constructor(params: {
|
||||
approvalId: string;
|
||||
decision: PluginBindingApprovalDecision;
|
||||
label: string;
|
||||
style: ButtonStyle;
|
||||
}) {
|
||||
super();
|
||||
this.customId = buildPluginBindingApprovalCustomId(params.approvalId, params.decision);
|
||||
this.label = params.label;
|
||||
this.style = params.style;
|
||||
}
|
||||
}
|
||||
|
||||
function resolveApprovalsPath(): string {
|
||||
return expandHomePrefix(APPROVALS_PATH);
|
||||
}
|
||||
|
||||
function normalizeChannel(value: string): string {
|
||||
return value.trim().toLowerCase();
|
||||
}
|
||||
|
||||
function normalizeConversation(params: PluginBindingConversation): PluginBindingConversation {
|
||||
return {
|
||||
channel: normalizeChannel(params.channel),
|
||||
accountId: params.accountId.trim() || "default",
|
||||
conversationId: params.conversationId.trim(),
|
||||
parentConversationId: params.parentConversationId?.trim() || undefined,
|
||||
threadId:
|
||||
typeof params.threadId === "number"
|
||||
? Math.trunc(params.threadId)
|
||||
: params.threadId?.toString().trim() || undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function toConversationRef(params: PluginBindingConversation): ConversationRef {
|
||||
const normalized = normalizeConversation(params);
|
||||
if (normalized.channel === "telegram") {
|
||||
const threadId =
|
||||
typeof normalized.threadId === "number" || typeof normalized.threadId === "string"
|
||||
? String(normalized.threadId).trim()
|
||||
: "";
|
||||
if (threadId) {
|
||||
const parent = normalized.parentConversationId?.trim() || normalized.conversationId;
|
||||
return {
|
||||
channel: "telegram",
|
||||
accountId: normalized.accountId,
|
||||
conversationId: `${parent}:topic:${threadId}`,
|
||||
};
|
||||
}
|
||||
}
|
||||
return {
|
||||
channel: normalized.channel,
|
||||
accountId: normalized.accountId,
|
||||
conversationId: normalized.conversationId,
|
||||
...(normalized.parentConversationId
|
||||
? { parentConversationId: normalized.parentConversationId }
|
||||
: {}),
|
||||
};
|
||||
}
|
||||
|
||||
function buildApprovalScopeKey(params: {
|
||||
pluginRoot: string;
|
||||
channel: string;
|
||||
accountId: string;
|
||||
}): string {
|
||||
return [
|
||||
params.pluginRoot,
|
||||
normalizeChannel(params.channel),
|
||||
params.accountId.trim() || "default",
|
||||
].join("::");
|
||||
}
|
||||
|
||||
function buildPluginBindingSessionKey(params: {
|
||||
pluginId: string;
|
||||
channel: string;
|
||||
accountId: string;
|
||||
conversationId: string;
|
||||
}): string {
|
||||
const hash = crypto
|
||||
.createHash("sha256")
|
||||
.update(
|
||||
JSON.stringify({
|
||||
pluginId: params.pluginId,
|
||||
channel: normalizeChannel(params.channel),
|
||||
accountId: params.accountId,
|
||||
conversationId: params.conversationId,
|
||||
}),
|
||||
)
|
||||
.digest("hex")
|
||||
.slice(0, 24);
|
||||
return `${PLUGIN_BINDING_SESSION_PREFIX}:${params.pluginId}:${hash}`;
|
||||
}
|
||||
|
||||
function isLegacyPluginBindingRecord(params: {
|
||||
record:
|
||||
| {
|
||||
targetSessionKey: string;
|
||||
metadata?: Record<string, unknown>;
|
||||
}
|
||||
| null
|
||||
| undefined;
|
||||
}): boolean {
|
||||
if (!params.record || isPluginOwnedBindingMetadata(params.record.metadata)) {
|
||||
return false;
|
||||
}
|
||||
const targetSessionKey = params.record.targetSessionKey.trim();
|
||||
return (
|
||||
targetSessionKey.startsWith(`${PLUGIN_BINDING_SESSION_PREFIX}:`) ||
|
||||
LEGACY_CODEX_PLUGIN_SESSION_PREFIXES.some((prefix) => targetSessionKey.startsWith(prefix))
|
||||
);
|
||||
}
|
||||
|
||||
function buildDiscordButtonRow(
|
||||
approvalId: string,
|
||||
labels?: { once?: string; always?: string; deny?: string },
|
||||
): TopLevelComponents[] {
|
||||
return [
|
||||
new Row([
|
||||
new PluginBindingApprovalButton({
|
||||
approvalId,
|
||||
decision: "allow-once",
|
||||
label: labels?.once ?? "Allow once",
|
||||
style: ButtonStyle.Success,
|
||||
}),
|
||||
new PluginBindingApprovalButton({
|
||||
approvalId,
|
||||
decision: "allow-always",
|
||||
label: labels?.always ?? "Always allow",
|
||||
style: ButtonStyle.Primary,
|
||||
}),
|
||||
new PluginBindingApprovalButton({
|
||||
approvalId,
|
||||
decision: "deny",
|
||||
label: labels?.deny ?? "Deny",
|
||||
style: ButtonStyle.Danger,
|
||||
}),
|
||||
]),
|
||||
];
|
||||
}
|
||||
|
||||
function buildTelegramButtons(approvalId: string) {
|
||||
return [
|
||||
[
|
||||
{
|
||||
text: "Allow once",
|
||||
callback_data: buildPluginBindingApprovalCustomId(approvalId, "allow-once"),
|
||||
style: "success" as const,
|
||||
},
|
||||
{
|
||||
text: "Always allow",
|
||||
callback_data: buildPluginBindingApprovalCustomId(approvalId, "allow-always"),
|
||||
style: "primary" as const,
|
||||
},
|
||||
{
|
||||
text: "Deny",
|
||||
callback_data: buildPluginBindingApprovalCustomId(approvalId, "deny"),
|
||||
style: "danger" as const,
|
||||
},
|
||||
],
|
||||
];
|
||||
}
|
||||
|
||||
function createApprovalRequestId(): string {
|
||||
// Keep approval ids compact so Telegram callback_data stays under its 64-byte limit.
|
||||
return crypto.randomBytes(9).toString("base64url");
|
||||
}
|
||||
|
||||
function loadApprovalsFromDisk(): PluginBindingApprovalsFile {
|
||||
const filePath = resolveApprovalsPath();
|
||||
try {
|
||||
if (!fs.existsSync(filePath)) {
|
||||
return { version: 1, approvals: [] };
|
||||
}
|
||||
const raw = fs.readFileSync(filePath, "utf8");
|
||||
const parsed = JSON.parse(raw) as Partial<PluginBindingApprovalsFile>;
|
||||
if (!Array.isArray(parsed.approvals)) {
|
||||
return { version: 1, approvals: [] };
|
||||
}
|
||||
return {
|
||||
version: 1,
|
||||
approvals: parsed.approvals
|
||||
.filter((entry): entry is PluginBindingApprovalEntry =>
|
||||
Boolean(entry && typeof entry === "object"),
|
||||
)
|
||||
.map((entry) => ({
|
||||
pluginRoot: typeof entry.pluginRoot === "string" ? entry.pluginRoot : "",
|
||||
pluginId: typeof entry.pluginId === "string" ? entry.pluginId : "",
|
||||
pluginName: typeof entry.pluginName === "string" ? entry.pluginName : undefined,
|
||||
channel: typeof entry.channel === "string" ? normalizeChannel(entry.channel) : "",
|
||||
accountId:
|
||||
typeof entry.accountId === "string" ? entry.accountId.trim() || "default" : "default",
|
||||
approvedAt:
|
||||
typeof entry.approvedAt === "number" && Number.isFinite(entry.approvedAt)
|
||||
? Math.floor(entry.approvedAt)
|
||||
: Date.now(),
|
||||
}))
|
||||
.filter((entry) => entry.pluginRoot && entry.pluginId && entry.channel),
|
||||
};
|
||||
} catch (error) {
|
||||
log.warn(`plugin binding approvals load failed: ${String(error)}`);
|
||||
return { version: 1, approvals: [] };
|
||||
}
|
||||
}
|
||||
|
||||
async function saveApprovals(file: PluginBindingApprovalsFile): Promise<void> {
|
||||
const filePath = resolveApprovalsPath();
|
||||
fs.mkdirSync(path.dirname(filePath), { recursive: true });
|
||||
approvalsCache = file;
|
||||
approvalsLoaded = true;
|
||||
await writeJsonAtomic(filePath, file, {
|
||||
mode: 0o600,
|
||||
trailingNewline: true,
|
||||
});
|
||||
}
|
||||
|
||||
function getApprovals(): PluginBindingApprovalsFile {
|
||||
if (!approvalsLoaded || !approvalsCache) {
|
||||
approvalsCache = loadApprovalsFromDisk();
|
||||
approvalsLoaded = true;
|
||||
}
|
||||
return approvalsCache;
|
||||
}
|
||||
|
||||
function hasPersistentApproval(params: {
|
||||
pluginRoot: string;
|
||||
channel: string;
|
||||
accountId: string;
|
||||
}): boolean {
|
||||
const key = buildApprovalScopeKey(params);
|
||||
return getApprovals().approvals.some(
|
||||
(entry) =>
|
||||
buildApprovalScopeKey({
|
||||
pluginRoot: entry.pluginRoot,
|
||||
channel: entry.channel,
|
||||
accountId: entry.accountId,
|
||||
}) === key,
|
||||
);
|
||||
}
|
||||
|
||||
async function addPersistentApproval(entry: PluginBindingApprovalEntry): Promise<void> {
|
||||
const file = getApprovals();
|
||||
const key = buildApprovalScopeKey(entry);
|
||||
const approvals = file.approvals.filter(
|
||||
(existing) =>
|
||||
buildApprovalScopeKey({
|
||||
pluginRoot: existing.pluginRoot,
|
||||
channel: existing.channel,
|
||||
accountId: existing.accountId,
|
||||
}) !== key,
|
||||
);
|
||||
approvals.push(entry);
|
||||
await saveApprovals({
|
||||
version: 1,
|
||||
approvals,
|
||||
});
|
||||
}
|
||||
|
||||
function buildBindingMetadata(params: {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
pluginRoot: string;
|
||||
summary?: string;
|
||||
detachHint?: string;
|
||||
}): PluginBindingMetadata {
|
||||
return {
|
||||
pluginBindingOwner: PLUGIN_BINDING_OWNER,
|
||||
pluginId: params.pluginId,
|
||||
pluginName: params.pluginName,
|
||||
pluginRoot: params.pluginRoot,
|
||||
summary: params.summary?.trim() || undefined,
|
||||
detachHint: params.detachHint?.trim() || undefined,
|
||||
};
|
||||
}
|
||||
|
||||
export function isPluginOwnedBindingMetadata(metadata: unknown): metadata is PluginBindingMetadata {
|
||||
if (!metadata || typeof metadata !== "object") {
|
||||
return false;
|
||||
}
|
||||
const record = metadata as Record<string, unknown>;
|
||||
return (
|
||||
record.pluginBindingOwner === PLUGIN_BINDING_OWNER &&
|
||||
typeof record.pluginId === "string" &&
|
||||
typeof record.pluginRoot === "string"
|
||||
);
|
||||
}
|
||||
|
||||
export function isPluginOwnedSessionBindingRecord(
|
||||
record:
|
||||
| {
|
||||
metadata?: Record<string, unknown>;
|
||||
}
|
||||
| null
|
||||
| undefined,
|
||||
): boolean {
|
||||
return isPluginOwnedBindingMetadata(record?.metadata);
|
||||
}
|
||||
|
||||
export function toPluginConversationBinding(
|
||||
record:
|
||||
| {
|
||||
bindingId: string;
|
||||
conversation: ConversationRef;
|
||||
boundAt: number;
|
||||
metadata?: Record<string, unknown>;
|
||||
}
|
||||
| null
|
||||
| undefined,
|
||||
): PluginConversationBinding | null {
|
||||
if (!record || !isPluginOwnedBindingMetadata(record.metadata)) {
|
||||
return null;
|
||||
}
|
||||
const metadata = record.metadata;
|
||||
return {
|
||||
bindingId: record.bindingId,
|
||||
pluginId: metadata.pluginId,
|
||||
pluginName: metadata.pluginName,
|
||||
pluginRoot: metadata.pluginRoot,
|
||||
channel: record.conversation.channel,
|
||||
accountId: record.conversation.accountId,
|
||||
conversationId: record.conversation.conversationId,
|
||||
parentConversationId: record.conversation.parentConversationId,
|
||||
boundAt: record.boundAt,
|
||||
summary: metadata.summary,
|
||||
detachHint: metadata.detachHint,
|
||||
};
|
||||
}
|
||||
|
||||
async function bindConversationNow(params: {
|
||||
identity: PluginBindingIdentity;
|
||||
conversation: PluginBindingConversation;
|
||||
summary?: string;
|
||||
detachHint?: string;
|
||||
}): Promise<PluginConversationBinding> {
|
||||
const ref = toConversationRef(params.conversation);
|
||||
const targetSessionKey = buildPluginBindingSessionKey({
|
||||
pluginId: params.identity.pluginId,
|
||||
channel: ref.channel,
|
||||
accountId: ref.accountId,
|
||||
conversationId: ref.conversationId,
|
||||
});
|
||||
const record = await getSessionBindingService().bind({
|
||||
targetSessionKey,
|
||||
targetKind: "session",
|
||||
conversation: ref,
|
||||
placement: "current",
|
||||
metadata: buildBindingMetadata({
|
||||
pluginId: params.identity.pluginId,
|
||||
pluginName: params.identity.pluginName,
|
||||
pluginRoot: params.identity.pluginRoot,
|
||||
summary: params.summary,
|
||||
detachHint: params.detachHint,
|
||||
}),
|
||||
});
|
||||
const binding = toPluginConversationBinding(record);
|
||||
if (!binding) {
|
||||
throw new Error("plugin binding was created without plugin metadata");
|
||||
}
|
||||
return {
|
||||
...binding,
|
||||
parentConversationId: params.conversation.parentConversationId,
|
||||
threadId: params.conversation.threadId,
|
||||
};
|
||||
}
|
||||
|
||||
function buildApprovalMessage(request: PendingPluginBindingRequest): string {
|
||||
const lines = [
|
||||
`Plugin bind approval required`,
|
||||
`Plugin: ${request.pluginName ?? request.pluginId}`,
|
||||
`Channel: ${request.conversation.channel}`,
|
||||
`Account: ${request.conversation.accountId}`,
|
||||
];
|
||||
if (request.summary?.trim()) {
|
||||
lines.push(`Request: ${request.summary.trim()}`);
|
||||
} else {
|
||||
lines.push("Request: Bind this conversation so future plain messages route to the plugin.");
|
||||
}
|
||||
lines.push("Choose whether to allow this plugin to bind the current conversation.");
|
||||
return lines.join("\n");
|
||||
}
|
||||
|
||||
function resolvePluginBindingDisplayName(binding: {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
}): string {
|
||||
return binding.pluginName?.trim() || binding.pluginId;
|
||||
}
|
||||
|
||||
function buildDetachHintSuffix(detachHint?: string): string {
|
||||
const trimmed = detachHint?.trim();
|
||||
return trimmed ? ` To detach this conversation, use ${trimmed}.` : "";
|
||||
}
|
||||
|
||||
export function buildPluginBindingUnavailableText(binding: PluginConversationBinding): string {
|
||||
return `The bound plugin ${resolvePluginBindingDisplayName(binding)} is not currently loaded. Routing this message to OpenClaw instead.${buildDetachHintSuffix(binding.detachHint)}`;
|
||||
}
|
||||
|
||||
export function buildPluginBindingDeclinedText(binding: PluginConversationBinding): string {
|
||||
return `The bound plugin ${resolvePluginBindingDisplayName(binding)} did not handle this message. This conversation is still bound to that plugin.${buildDetachHintSuffix(binding.detachHint)}`;
|
||||
}
|
||||
|
||||
export function buildPluginBindingErrorText(binding: PluginConversationBinding): string {
|
||||
return `The bound plugin ${resolvePluginBindingDisplayName(binding)} hit an error handling this message. This conversation is still bound to that plugin.${buildDetachHintSuffix(binding.detachHint)}`;
|
||||
}
|
||||
|
||||
export function hasShownPluginBindingFallbackNotice(bindingId: string): boolean {
|
||||
const normalized = bindingId.trim();
|
||||
if (!normalized) {
|
||||
return false;
|
||||
}
|
||||
return getPluginBindingGlobalState().fallbackNoticeBindingIds.has(normalized);
|
||||
}
|
||||
|
||||
export function markPluginBindingFallbackNoticeShown(bindingId: string): void {
|
||||
const normalized = bindingId.trim();
|
||||
if (!normalized) {
|
||||
return;
|
||||
}
|
||||
getPluginBindingGlobalState().fallbackNoticeBindingIds.add(normalized);
|
||||
}
|
||||
|
||||
function buildPendingReply(request: PendingPluginBindingRequest): ReplyPayload {
|
||||
return {
|
||||
text: buildApprovalMessage(request),
|
||||
channelData: {
|
||||
telegram: {
|
||||
buttons: buildTelegramButtons(request.id),
|
||||
},
|
||||
discord: {
|
||||
components: buildDiscordButtonRow(request.id),
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function encodeCustomIdValue(value: string): string {
|
||||
return encodeURIComponent(value);
|
||||
}
|
||||
|
||||
function decodeCustomIdValue(value: string): string {
|
||||
try {
|
||||
return decodeURIComponent(value);
|
||||
} catch {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
export function buildPluginBindingApprovalCustomId(
|
||||
approvalId: string,
|
||||
decision: PluginBindingApprovalDecision,
|
||||
): string {
|
||||
const decisionCode = decision === "allow-once" ? "o" : decision === "allow-always" ? "a" : "d";
|
||||
return `${PLUGIN_BINDING_CUSTOM_ID_PREFIX}:${encodeCustomIdValue(approvalId)}:${decisionCode}`;
|
||||
}
|
||||
|
||||
export function parsePluginBindingApprovalCustomId(
|
||||
value: string,
|
||||
): PluginBindingApprovalAction | null {
|
||||
const trimmed = value.trim();
|
||||
if (!trimmed.startsWith(`${PLUGIN_BINDING_CUSTOM_ID_PREFIX}:`)) {
|
||||
return null;
|
||||
}
|
||||
const body = trimmed.slice(`${PLUGIN_BINDING_CUSTOM_ID_PREFIX}:`.length);
|
||||
const separator = body.lastIndexOf(":");
|
||||
if (separator <= 0 || separator === body.length - 1) {
|
||||
return null;
|
||||
}
|
||||
const rawId = body.slice(0, separator).trim();
|
||||
const rawDecisionCode = body.slice(separator + 1).trim();
|
||||
if (!rawId) {
|
||||
return null;
|
||||
}
|
||||
const rawDecision =
|
||||
rawDecisionCode === "o"
|
||||
? "allow-once"
|
||||
: rawDecisionCode === "a"
|
||||
? "allow-always"
|
||||
: rawDecisionCode === "d"
|
||||
? "deny"
|
||||
: null;
|
||||
if (!rawDecision) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
approvalId: decodeCustomIdValue(rawId),
|
||||
decision: rawDecision,
|
||||
};
|
||||
}
|
||||
|
||||
export async function requestPluginConversationBinding(params: {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
pluginRoot: string;
|
||||
conversation: PluginBindingConversation;
|
||||
requestedBySenderId?: string;
|
||||
binding: PluginConversationBindingRequestParams | undefined;
|
||||
}): Promise<PluginConversationBindingRequestResult> {
|
||||
const conversation = normalizeConversation(params.conversation);
|
||||
const ref = toConversationRef(conversation);
|
||||
const existing = getSessionBindingService().resolveByConversation(ref);
|
||||
const existingPluginBinding = toPluginConversationBinding(existing);
|
||||
const existingLegacyPluginBinding = isLegacyPluginBindingRecord({
|
||||
record: existing,
|
||||
});
|
||||
if (existing && !existingPluginBinding) {
|
||||
if (existingLegacyPluginBinding) {
|
||||
log.info(
|
||||
`plugin binding migrating legacy record plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`,
|
||||
);
|
||||
} else {
|
||||
return {
|
||||
status: "error",
|
||||
message:
|
||||
"This conversation is already bound by core routing and cannot be claimed by a plugin.",
|
||||
};
|
||||
}
|
||||
}
|
||||
if (existingPluginBinding && existingPluginBinding.pluginRoot !== params.pluginRoot) {
|
||||
return {
|
||||
status: "error",
|
||||
message: `This conversation is already bound by plugin "${existingPluginBinding.pluginName ?? existingPluginBinding.pluginId}".`,
|
||||
};
|
||||
}
|
||||
|
||||
if (existingPluginBinding && existingPluginBinding.pluginRoot === params.pluginRoot) {
|
||||
const rebound = await bindConversationNow({
|
||||
identity: {
|
||||
pluginId: params.pluginId,
|
||||
pluginName: params.pluginName,
|
||||
pluginRoot: params.pluginRoot,
|
||||
},
|
||||
conversation,
|
||||
summary: params.binding?.summary,
|
||||
detachHint: params.binding?.detachHint,
|
||||
});
|
||||
log.info(
|
||||
`plugin binding auto-refresh plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`,
|
||||
);
|
||||
return { status: "bound", binding: rebound };
|
||||
}
|
||||
|
||||
if (
|
||||
hasPersistentApproval({
|
||||
pluginRoot: params.pluginRoot,
|
||||
channel: ref.channel,
|
||||
accountId: ref.accountId,
|
||||
})
|
||||
) {
|
||||
const bound = await bindConversationNow({
|
||||
identity: {
|
||||
pluginId: params.pluginId,
|
||||
pluginName: params.pluginName,
|
||||
pluginRoot: params.pluginRoot,
|
||||
},
|
||||
conversation,
|
||||
summary: params.binding?.summary,
|
||||
detachHint: params.binding?.detachHint,
|
||||
});
|
||||
log.info(
|
||||
`plugin binding auto-approved plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`,
|
||||
);
|
||||
return { status: "bound", binding: bound };
|
||||
}
|
||||
|
||||
const request: PendingPluginBindingRequest = {
|
||||
id: createApprovalRequestId(),
|
||||
pluginId: params.pluginId,
|
||||
pluginName: params.pluginName,
|
||||
pluginRoot: params.pluginRoot,
|
||||
conversation,
|
||||
requestedAt: Date.now(),
|
||||
requestedBySenderId: params.requestedBySenderId?.trim() || undefined,
|
||||
summary: params.binding?.summary?.trim() || undefined,
|
||||
detachHint: params.binding?.detachHint?.trim() || undefined,
|
||||
};
|
||||
pendingRequests.set(request.id, request);
|
||||
log.info(
|
||||
`plugin binding requested plugin=${params.pluginId} root=${params.pluginRoot} channel=${ref.channel} account=${ref.accountId} conversation=${ref.conversationId}`,
|
||||
);
|
||||
return {
|
||||
status: "pending",
|
||||
approvalId: request.id,
|
||||
reply: buildPendingReply(request),
|
||||
};
|
||||
}
|
||||
|
||||
export async function getCurrentPluginConversationBinding(params: {
|
||||
pluginRoot: string;
|
||||
conversation: PluginBindingConversation;
|
||||
}): Promise<PluginConversationBinding | null> {
|
||||
const record = getSessionBindingService().resolveByConversation(
|
||||
toConversationRef(params.conversation),
|
||||
);
|
||||
const binding = toPluginConversationBinding(record);
|
||||
if (!binding || binding.pluginRoot !== params.pluginRoot) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
...binding,
|
||||
parentConversationId: params.conversation.parentConversationId,
|
||||
threadId: params.conversation.threadId,
|
||||
};
|
||||
}
|
||||
|
||||
export async function detachPluginConversationBinding(params: {
|
||||
pluginRoot: string;
|
||||
conversation: PluginBindingConversation;
|
||||
}): Promise<{ removed: boolean }> {
|
||||
const ref = toConversationRef(params.conversation);
|
||||
const record = getSessionBindingService().resolveByConversation(ref);
|
||||
const binding = toPluginConversationBinding(record);
|
||||
if (!binding || binding.pluginRoot !== params.pluginRoot) {
|
||||
return { removed: false };
|
||||
}
|
||||
await getSessionBindingService().unbind({
|
||||
bindingId: binding.bindingId,
|
||||
reason: "plugin-detach",
|
||||
});
|
||||
log.info(
|
||||
`plugin binding detached plugin=${binding.pluginId} root=${binding.pluginRoot} channel=${binding.channel} account=${binding.accountId} conversation=${binding.conversationId}`,
|
||||
);
|
||||
return { removed: true };
|
||||
}
|
||||
|
||||
export async function resolvePluginConversationBindingApproval(params: {
|
||||
approvalId: string;
|
||||
decision: PluginBindingApprovalDecision;
|
||||
senderId?: string;
|
||||
}): Promise<PluginBindingResolveResult> {
|
||||
const request = pendingRequests.get(params.approvalId);
|
||||
if (!request) {
|
||||
return { status: "expired" };
|
||||
}
|
||||
if (
|
||||
request.requestedBySenderId &&
|
||||
params.senderId?.trim() &&
|
||||
request.requestedBySenderId !== params.senderId.trim()
|
||||
) {
|
||||
return { status: "expired" };
|
||||
}
|
||||
pendingRequests.delete(params.approvalId);
|
||||
if (params.decision === "deny") {
|
||||
log.info(
|
||||
`plugin binding denied plugin=${request.pluginId} root=${request.pluginRoot} channel=${request.conversation.channel} account=${request.conversation.accountId} conversation=${request.conversation.conversationId}`,
|
||||
);
|
||||
return { status: "denied", request };
|
||||
}
|
||||
if (params.decision === "allow-always") {
|
||||
await addPersistentApproval({
|
||||
pluginRoot: request.pluginRoot,
|
||||
pluginId: request.pluginId,
|
||||
pluginName: request.pluginName,
|
||||
channel: request.conversation.channel,
|
||||
accountId: request.conversation.accountId,
|
||||
approvedAt: Date.now(),
|
||||
});
|
||||
}
|
||||
const binding = await bindConversationNow({
|
||||
identity: {
|
||||
pluginId: request.pluginId,
|
||||
pluginName: request.pluginName,
|
||||
pluginRoot: request.pluginRoot,
|
||||
},
|
||||
conversation: request.conversation,
|
||||
summary: request.summary,
|
||||
detachHint: request.detachHint,
|
||||
});
|
||||
log.info(
|
||||
`plugin binding approved plugin=${request.pluginId} root=${request.pluginRoot} decision=${params.decision} channel=${request.conversation.channel} account=${request.conversation.accountId} conversation=${request.conversation.conversationId}`,
|
||||
);
|
||||
return {
|
||||
status: "approved",
|
||||
binding,
|
||||
request,
|
||||
decision: params.decision,
|
||||
};
|
||||
}
|
||||
|
||||
export function buildPluginBindingResolvedText(params: PluginBindingResolveResult): string {
|
||||
if (params.status === "expired") {
|
||||
return "That plugin bind approval expired. Retry the bind command.";
|
||||
}
|
||||
if (params.status === "denied") {
|
||||
return `Denied plugin bind request for ${params.request.pluginName ?? params.request.pluginId}.`;
|
||||
}
|
||||
const summarySuffix = params.request.summary?.trim() ? ` ${params.request.summary.trim()}` : "";
|
||||
if (params.decision === "allow-always") {
|
||||
return `Allowed ${params.request.pluginName ?? params.request.pluginId} to bind this conversation.${summarySuffix}`;
|
||||
}
|
||||
return `Allowed ${params.request.pluginName ?? params.request.pluginId} to bind this conversation once.${summarySuffix}`;
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
reset() {
|
||||
pendingRequests.clear();
|
||||
approvalsCache = null;
|
||||
approvalsLoaded = false;
|
||||
getPluginBindingGlobalState().fallbackNoticeBindingIds.clear();
|
||||
},
|
||||
};
|
||||
@@ -5,6 +5,27 @@ export function createMockPluginRegistry(
|
||||
hooks: Array<{ hookName: string; handler: (...args: unknown[]) => unknown }>,
|
||||
): PluginRegistry {
|
||||
return {
|
||||
plugins: [
|
||||
{
|
||||
id: "test-plugin",
|
||||
name: "Test Plugin",
|
||||
source: "test",
|
||||
origin: "workspace",
|
||||
enabled: true,
|
||||
status: "loaded",
|
||||
toolNames: [],
|
||||
hookNames: [],
|
||||
channelIds: [],
|
||||
providerIds: [],
|
||||
gatewayMethods: [],
|
||||
cliCommands: [],
|
||||
services: [],
|
||||
commands: [],
|
||||
httpRoutes: 0,
|
||||
hookCount: hooks.length,
|
||||
configSchema: false,
|
||||
},
|
||||
],
|
||||
hooks: hooks as never[],
|
||||
typedHooks: hooks.map((h) => ({
|
||||
pluginId: "test-plugin",
|
||||
|
||||
@@ -19,6 +19,9 @@ import type {
|
||||
PluginHookBeforePromptBuildEvent,
|
||||
PluginHookBeforePromptBuildResult,
|
||||
PluginHookBeforeCompactionEvent,
|
||||
PluginHookInboundClaimContext,
|
||||
PluginHookInboundClaimEvent,
|
||||
PluginHookInboundClaimResult,
|
||||
PluginHookLlmInputEvent,
|
||||
PluginHookLlmOutputEvent,
|
||||
PluginHookBeforeResetEvent,
|
||||
@@ -66,6 +69,9 @@ export type {
|
||||
PluginHookAgentEndEvent,
|
||||
PluginHookBeforeCompactionEvent,
|
||||
PluginHookBeforeResetEvent,
|
||||
PluginHookInboundClaimContext,
|
||||
PluginHookInboundClaimEvent,
|
||||
PluginHookInboundClaimResult,
|
||||
PluginHookAfterCompactionEvent,
|
||||
PluginHookMessageContext,
|
||||
PluginHookMessageReceivedEvent,
|
||||
@@ -108,6 +114,25 @@ export type HookRunnerOptions = {
|
||||
catchErrors?: boolean;
|
||||
};
|
||||
|
||||
export type PluginTargetedInboundClaimOutcome =
|
||||
| {
|
||||
status: "handled";
|
||||
result: PluginHookInboundClaimResult;
|
||||
}
|
||||
| {
|
||||
status: "missing_plugin";
|
||||
}
|
||||
| {
|
||||
status: "no_handler";
|
||||
}
|
||||
| {
|
||||
status: "declined";
|
||||
}
|
||||
| {
|
||||
status: "error";
|
||||
error: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Get hooks for a specific hook name, sorted by priority (higher first).
|
||||
*/
|
||||
@@ -120,6 +145,14 @@ function getHooksForName<K extends PluginHookName>(
|
||||
.toSorted((a, b) => (b.priority ?? 0) - (a.priority ?? 0));
|
||||
}
|
||||
|
||||
function getHooksForNameAndPlugin<K extends PluginHookName>(
|
||||
registry: PluginRegistry,
|
||||
hookName: K,
|
||||
pluginId: string,
|
||||
): PluginHookRegistration<K>[] {
|
||||
return getHooksForName(registry, hookName).filter((hook) => hook.pluginId === pluginId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a hook runner for a specific registry.
|
||||
*/
|
||||
@@ -196,6 +229,12 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
|
||||
throw new Error(msg, { cause: params.error });
|
||||
};
|
||||
|
||||
const sanitizeHookError = (error: unknown): string => {
|
||||
const raw = error instanceof Error ? error.message : String(error);
|
||||
const firstLine = raw.split("\n")[0]?.trim();
|
||||
return firstLine || "unknown error";
|
||||
};
|
||||
|
||||
/**
|
||||
* Run a hook that doesn't return a value (fire-and-forget style).
|
||||
* All handlers are executed in parallel for performance.
|
||||
@@ -263,6 +302,123 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a sequential claim hook where the first `{ handled: true }` result wins.
|
||||
*/
|
||||
async function runClaimingHook<K extends PluginHookName, TResult extends { handled: boolean }>(
|
||||
hookName: K,
|
||||
event: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[0],
|
||||
ctx: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[1],
|
||||
): Promise<TResult | undefined> {
|
||||
const hooks = getHooksForName(registry, hookName);
|
||||
if (hooks.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
logger?.debug?.(`[hooks] running ${hookName} (${hooks.length} handlers, first-claim wins)`);
|
||||
|
||||
for (const hook of hooks) {
|
||||
try {
|
||||
const handlerResult = await (
|
||||
hook.handler as (event: unknown, ctx: unknown) => Promise<TResult | void>
|
||||
)(event, ctx);
|
||||
if (handlerResult?.handled) {
|
||||
return handlerResult;
|
||||
}
|
||||
} catch (err) {
|
||||
handleHookError({ hookName, pluginId: hook.pluginId, error: err });
|
||||
}
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async function runClaimingHookForPlugin<
|
||||
K extends PluginHookName,
|
||||
TResult extends { handled: boolean },
|
||||
>(
|
||||
hookName: K,
|
||||
pluginId: string,
|
||||
event: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[0],
|
||||
ctx: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[1],
|
||||
): Promise<TResult | undefined> {
|
||||
const hooks = getHooksForNameAndPlugin(registry, hookName, pluginId);
|
||||
if (hooks.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
logger?.debug?.(
|
||||
`[hooks] running ${hookName} for ${pluginId} (${hooks.length} handlers, targeted)`,
|
||||
);
|
||||
|
||||
for (const hook of hooks) {
|
||||
try {
|
||||
const handlerResult = await (
|
||||
hook.handler as (event: unknown, ctx: unknown) => Promise<TResult | void>
|
||||
)(event, ctx);
|
||||
if (handlerResult?.handled) {
|
||||
return handlerResult;
|
||||
}
|
||||
} catch (err) {
|
||||
handleHookError({ hookName, pluginId: hook.pluginId, error: err });
|
||||
}
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async function runClaimingHookForPluginOutcome<
|
||||
K extends PluginHookName,
|
||||
TResult extends { handled: boolean },
|
||||
>(
|
||||
hookName: K,
|
||||
pluginId: string,
|
||||
event: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[0],
|
||||
ctx: Parameters<NonNullable<PluginHookRegistration<K>["handler"]>>[1],
|
||||
): Promise<
|
||||
| { status: "handled"; result: TResult }
|
||||
| { status: "missing_plugin" }
|
||||
| { status: "no_handler" }
|
||||
| { status: "declined" }
|
||||
| { status: "error"; error: string }
|
||||
> {
|
||||
const pluginLoaded = registry.plugins.some(
|
||||
(plugin) => plugin.id === pluginId && plugin.status === "loaded",
|
||||
);
|
||||
if (!pluginLoaded) {
|
||||
return { status: "missing_plugin" };
|
||||
}
|
||||
|
||||
const hooks = getHooksForNameAndPlugin(registry, hookName, pluginId);
|
||||
if (hooks.length === 0) {
|
||||
return { status: "no_handler" };
|
||||
}
|
||||
|
||||
logger?.debug?.(
|
||||
`[hooks] running ${hookName} for ${pluginId} (${hooks.length} handlers, targeted outcome)`,
|
||||
);
|
||||
|
||||
let firstError: string | null = null;
|
||||
for (const hook of hooks) {
|
||||
try {
|
||||
const handlerResult = await (
|
||||
hook.handler as (event: unknown, ctx: unknown) => Promise<TResult | void>
|
||||
)(event, ctx);
|
||||
if (handlerResult?.handled) {
|
||||
return { status: "handled", result: handlerResult };
|
||||
}
|
||||
} catch (err) {
|
||||
firstError ??= sanitizeHookError(err);
|
||||
handleHookError({ hookName, pluginId: hook.pluginId, error: err });
|
||||
}
|
||||
}
|
||||
|
||||
if (firstError) {
|
||||
return { status: "error", error: firstError };
|
||||
}
|
||||
return { status: "declined" };
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Agent Hooks
|
||||
// =========================================================================
|
||||
@@ -384,6 +540,47 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
|
||||
// Message Hooks
|
||||
// =========================================================================
|
||||
|
||||
/**
|
||||
* Run inbound_claim hook.
|
||||
* Allows plugins to claim an inbound event before commands/agent dispatch.
|
||||
*/
|
||||
async function runInboundClaim(
|
||||
event: PluginHookInboundClaimEvent,
|
||||
ctx: PluginHookInboundClaimContext,
|
||||
): Promise<PluginHookInboundClaimResult | undefined> {
|
||||
return runClaimingHook<"inbound_claim", PluginHookInboundClaimResult>(
|
||||
"inbound_claim",
|
||||
event,
|
||||
ctx,
|
||||
);
|
||||
}
|
||||
|
||||
async function runInboundClaimForPlugin(
|
||||
pluginId: string,
|
||||
event: PluginHookInboundClaimEvent,
|
||||
ctx: PluginHookInboundClaimContext,
|
||||
): Promise<PluginHookInboundClaimResult | undefined> {
|
||||
return runClaimingHookForPlugin<"inbound_claim", PluginHookInboundClaimResult>(
|
||||
"inbound_claim",
|
||||
pluginId,
|
||||
event,
|
||||
ctx,
|
||||
);
|
||||
}
|
||||
|
||||
async function runInboundClaimForPluginOutcome(
|
||||
pluginId: string,
|
||||
event: PluginHookInboundClaimEvent,
|
||||
ctx: PluginHookInboundClaimContext,
|
||||
): Promise<PluginTargetedInboundClaimOutcome> {
|
||||
return runClaimingHookForPluginOutcome<"inbound_claim", PluginHookInboundClaimResult>(
|
||||
"inbound_claim",
|
||||
pluginId,
|
||||
event,
|
||||
ctx,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run message_received hook.
|
||||
* Runs in parallel (fire-and-forget).
|
||||
@@ -734,6 +931,9 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
|
||||
runAfterCompaction,
|
||||
runBeforeReset,
|
||||
// Message hooks
|
||||
runInboundClaim,
|
||||
runInboundClaimForPlugin,
|
||||
runInboundClaimForPluginOutcome,
|
||||
runMessageReceived,
|
||||
runMessageSending,
|
||||
runMessageSent,
|
||||
|
||||
201
src/plugins/interactive.test.ts
Normal file
201
src/plugins/interactive.test.ts
Normal file
@@ -0,0 +1,201 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
clearPluginInteractiveHandlers,
|
||||
dispatchPluginInteractiveHandler,
|
||||
registerPluginInteractiveHandler,
|
||||
} from "./interactive.js";
|
||||
|
||||
describe("plugin interactive handlers", () => {
|
||||
beforeEach(() => {
|
||||
clearPluginInteractiveHandlers();
|
||||
});
|
||||
|
||||
it("routes Telegram callbacks by namespace and dedupes callback ids", async () => {
|
||||
const handler = vi.fn(async () => ({ handled: true }));
|
||||
expect(
|
||||
registerPluginInteractiveHandler("codex-plugin", {
|
||||
channel: "telegram",
|
||||
namespace: "codex",
|
||||
handler,
|
||||
}),
|
||||
).toEqual({ ok: true });
|
||||
|
||||
const baseParams = {
|
||||
channel: "telegram" as const,
|
||||
data: "codex:resume:thread-1",
|
||||
callbackId: "cb-1",
|
||||
ctx: {
|
||||
accountId: "default",
|
||||
callbackId: "cb-1",
|
||||
conversationId: "-10099:topic:77",
|
||||
parentConversationId: "-10099",
|
||||
senderId: "user-1",
|
||||
senderUsername: "ada",
|
||||
threadId: 77,
|
||||
isGroup: true,
|
||||
isForum: true,
|
||||
auth: { isAuthorizedSender: true },
|
||||
callbackMessage: {
|
||||
messageId: 55,
|
||||
chatId: "-10099",
|
||||
messageText: "Pick a thread",
|
||||
},
|
||||
},
|
||||
respond: {
|
||||
reply: vi.fn(async () => {}),
|
||||
editMessage: vi.fn(async () => {}),
|
||||
editButtons: vi.fn(async () => {}),
|
||||
clearButtons: vi.fn(async () => {}),
|
||||
deleteMessage: vi.fn(async () => {}),
|
||||
},
|
||||
};
|
||||
|
||||
const first = await dispatchPluginInteractiveHandler(baseParams);
|
||||
const duplicate = await dispatchPluginInteractiveHandler(baseParams);
|
||||
|
||||
expect(first).toEqual({ matched: true, handled: true, duplicate: false });
|
||||
expect(duplicate).toEqual({ matched: true, handled: true, duplicate: true });
|
||||
expect(handler).toHaveBeenCalledTimes(1);
|
||||
expect(handler).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "telegram",
|
||||
conversationId: "-10099:topic:77",
|
||||
callback: expect.objectContaining({
|
||||
namespace: "codex",
|
||||
payload: "resume:thread-1",
|
||||
chatId: "-10099",
|
||||
messageId: 55,
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects duplicate namespace registrations", () => {
|
||||
const first = registerPluginInteractiveHandler("plugin-a", {
|
||||
channel: "telegram",
|
||||
namespace: "codex",
|
||||
handler: async () => ({ handled: true }),
|
||||
});
|
||||
const second = registerPluginInteractiveHandler("plugin-b", {
|
||||
channel: "telegram",
|
||||
namespace: "codex",
|
||||
handler: async () => ({ handled: true }),
|
||||
});
|
||||
|
||||
expect(first).toEqual({ ok: true });
|
||||
expect(second).toEqual({
|
||||
ok: false,
|
||||
error: 'Interactive handler namespace "codex" already registered by plugin "plugin-a"',
|
||||
});
|
||||
});
|
||||
|
||||
it("routes Discord interactions by namespace and dedupes interaction ids", async () => {
|
||||
const handler = vi.fn(async () => ({ handled: true }));
|
||||
expect(
|
||||
registerPluginInteractiveHandler("codex-plugin", {
|
||||
channel: "discord",
|
||||
namespace: "codex",
|
||||
handler,
|
||||
}),
|
||||
).toEqual({ ok: true });
|
||||
|
||||
const baseParams = {
|
||||
channel: "discord" as const,
|
||||
data: "codex:approve:thread-1",
|
||||
interactionId: "ix-1",
|
||||
ctx: {
|
||||
accountId: "default",
|
||||
interactionId: "ix-1",
|
||||
conversationId: "channel-1",
|
||||
parentConversationId: "parent-1",
|
||||
guildId: "guild-1",
|
||||
senderId: "user-1",
|
||||
senderUsername: "ada",
|
||||
auth: { isAuthorizedSender: true },
|
||||
interaction: {
|
||||
kind: "button" as const,
|
||||
messageId: "message-1",
|
||||
values: ["allow"],
|
||||
},
|
||||
},
|
||||
respond: {
|
||||
acknowledge: vi.fn(async () => {}),
|
||||
reply: vi.fn(async () => {}),
|
||||
followUp: vi.fn(async () => {}),
|
||||
editMessage: vi.fn(async () => {}),
|
||||
clearComponents: vi.fn(async () => {}),
|
||||
},
|
||||
};
|
||||
|
||||
const first = await dispatchPluginInteractiveHandler(baseParams);
|
||||
const duplicate = await dispatchPluginInteractiveHandler(baseParams);
|
||||
|
||||
expect(first).toEqual({ matched: true, handled: true, duplicate: false });
|
||||
expect(duplicate).toEqual({ matched: true, handled: true, duplicate: true });
|
||||
expect(handler).toHaveBeenCalledTimes(1);
|
||||
expect(handler).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "discord",
|
||||
conversationId: "channel-1",
|
||||
interaction: expect.objectContaining({
|
||||
namespace: "codex",
|
||||
payload: "approve:thread-1",
|
||||
messageId: "message-1",
|
||||
values: ["allow"],
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not consume dedupe keys when a handler throws", async () => {
|
||||
const handler = vi
|
||||
.fn(async () => ({ handled: true }))
|
||||
.mockRejectedValueOnce(new Error("boom"))
|
||||
.mockResolvedValueOnce({ handled: true });
|
||||
expect(
|
||||
registerPluginInteractiveHandler("codex-plugin", {
|
||||
channel: "telegram",
|
||||
namespace: "codex",
|
||||
handler,
|
||||
}),
|
||||
).toEqual({ ok: true });
|
||||
|
||||
const baseParams = {
|
||||
channel: "telegram" as const,
|
||||
data: "codex:resume:thread-1",
|
||||
callbackId: "cb-throw",
|
||||
ctx: {
|
||||
accountId: "default",
|
||||
callbackId: "cb-throw",
|
||||
conversationId: "-10099:topic:77",
|
||||
parentConversationId: "-10099",
|
||||
senderId: "user-1",
|
||||
senderUsername: "ada",
|
||||
threadId: 77,
|
||||
isGroup: true,
|
||||
isForum: true,
|
||||
auth: { isAuthorizedSender: true },
|
||||
callbackMessage: {
|
||||
messageId: 55,
|
||||
chatId: "-10099",
|
||||
messageText: "Pick a thread",
|
||||
},
|
||||
},
|
||||
respond: {
|
||||
reply: vi.fn(async () => {}),
|
||||
editMessage: vi.fn(async () => {}),
|
||||
editButtons: vi.fn(async () => {}),
|
||||
clearButtons: vi.fn(async () => {}),
|
||||
deleteMessage: vi.fn(async () => {}),
|
||||
},
|
||||
};
|
||||
|
||||
await expect(dispatchPluginInteractiveHandler(baseParams)).rejects.toThrow("boom");
|
||||
await expect(dispatchPluginInteractiveHandler(baseParams)).resolves.toEqual({
|
||||
matched: true,
|
||||
handled: true,
|
||||
duplicate: false,
|
||||
});
|
||||
expect(handler).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
366
src/plugins/interactive.ts
Normal file
366
src/plugins/interactive.ts
Normal file
@@ -0,0 +1,366 @@
|
||||
import { createDedupeCache } from "../infra/dedupe.js";
|
||||
import {
|
||||
detachPluginConversationBinding,
|
||||
getCurrentPluginConversationBinding,
|
||||
requestPluginConversationBinding,
|
||||
} from "./conversation-binding.js";
|
||||
import type {
|
||||
PluginInteractiveDiscordHandlerContext,
|
||||
PluginInteractiveButtons,
|
||||
PluginInteractiveDiscordHandlerRegistration,
|
||||
PluginInteractiveHandlerRegistration,
|
||||
PluginInteractiveTelegramHandlerRegistration,
|
||||
PluginInteractiveTelegramHandlerContext,
|
||||
} from "./types.js";
|
||||
|
||||
type RegisteredInteractiveHandler = PluginInteractiveHandlerRegistration & {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
pluginRoot?: string;
|
||||
};
|
||||
|
||||
type InteractiveRegistrationResult = {
|
||||
ok: boolean;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
type InteractiveDispatchResult =
|
||||
| { matched: false; handled: false; duplicate: false }
|
||||
| { matched: true; handled: boolean; duplicate: boolean };
|
||||
|
||||
type TelegramInteractiveDispatchContext = Omit<
|
||||
PluginInteractiveTelegramHandlerContext,
|
||||
| "callback"
|
||||
| "respond"
|
||||
| "channel"
|
||||
| "requestConversationBinding"
|
||||
| "detachConversationBinding"
|
||||
| "getCurrentConversationBinding"
|
||||
> & {
|
||||
callbackMessage: {
|
||||
messageId: number;
|
||||
chatId: string;
|
||||
messageText?: string;
|
||||
};
|
||||
};
|
||||
|
||||
type DiscordInteractiveDispatchContext = Omit<
|
||||
PluginInteractiveDiscordHandlerContext,
|
||||
| "interaction"
|
||||
| "respond"
|
||||
| "channel"
|
||||
| "requestConversationBinding"
|
||||
| "detachConversationBinding"
|
||||
| "getCurrentConversationBinding"
|
||||
> & {
|
||||
interaction: Omit<
|
||||
PluginInteractiveDiscordHandlerContext["interaction"],
|
||||
"data" | "namespace" | "payload"
|
||||
>;
|
||||
};
|
||||
|
||||
const interactiveHandlers = new Map<string, RegisteredInteractiveHandler>();
|
||||
const callbackDedupe = createDedupeCache({
|
||||
ttlMs: 5 * 60_000,
|
||||
maxSize: 4096,
|
||||
});
|
||||
|
||||
function toRegistryKey(channel: string, namespace: string): string {
|
||||
return `${channel.trim().toLowerCase()}:${namespace.trim()}`;
|
||||
}
|
||||
|
||||
function normalizeNamespace(namespace: string): string {
|
||||
return namespace.trim();
|
||||
}
|
||||
|
||||
function validateNamespace(namespace: string): string | null {
|
||||
if (!namespace.trim()) {
|
||||
return "Interactive handler namespace cannot be empty";
|
||||
}
|
||||
if (!/^[A-Za-z0-9._-]+$/.test(namespace.trim())) {
|
||||
return "Interactive handler namespace must contain only letters, numbers, dots, underscores, and hyphens";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function resolveNamespaceMatch(
|
||||
channel: string,
|
||||
data: string,
|
||||
): { registration: RegisteredInteractiveHandler; namespace: string; payload: string } | null {
|
||||
const trimmedData = data.trim();
|
||||
if (!trimmedData) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const separatorIndex = trimmedData.indexOf(":");
|
||||
const namespace =
|
||||
separatorIndex >= 0 ? trimmedData.slice(0, separatorIndex) : normalizeNamespace(trimmedData);
|
||||
const registration = interactiveHandlers.get(toRegistryKey(channel, namespace));
|
||||
if (!registration) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
registration,
|
||||
namespace,
|
||||
payload: separatorIndex >= 0 ? trimmedData.slice(separatorIndex + 1) : "",
|
||||
};
|
||||
}
|
||||
|
||||
export function registerPluginInteractiveHandler(
|
||||
pluginId: string,
|
||||
registration: PluginInteractiveHandlerRegistration,
|
||||
opts?: { pluginName?: string; pluginRoot?: string },
|
||||
): InteractiveRegistrationResult {
|
||||
const namespace = normalizeNamespace(registration.namespace);
|
||||
const validationError = validateNamespace(namespace);
|
||||
if (validationError) {
|
||||
return { ok: false, error: validationError };
|
||||
}
|
||||
const key = toRegistryKey(registration.channel, namespace);
|
||||
const existing = interactiveHandlers.get(key);
|
||||
if (existing) {
|
||||
return {
|
||||
ok: false,
|
||||
error: `Interactive handler namespace "${namespace}" already registered by plugin "${existing.pluginId}"`,
|
||||
};
|
||||
}
|
||||
if (registration.channel === "telegram") {
|
||||
interactiveHandlers.set(key, {
|
||||
...registration,
|
||||
namespace,
|
||||
channel: "telegram",
|
||||
pluginId,
|
||||
pluginName: opts?.pluginName,
|
||||
pluginRoot: opts?.pluginRoot,
|
||||
});
|
||||
} else {
|
||||
interactiveHandlers.set(key, {
|
||||
...registration,
|
||||
namespace,
|
||||
channel: "discord",
|
||||
pluginId,
|
||||
pluginName: opts?.pluginName,
|
||||
pluginRoot: opts?.pluginRoot,
|
||||
});
|
||||
}
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
export function clearPluginInteractiveHandlers(): void {
|
||||
interactiveHandlers.clear();
|
||||
callbackDedupe.clear();
|
||||
}
|
||||
|
||||
export function clearPluginInteractiveHandlersForPlugin(pluginId: string): void {
|
||||
for (const [key, value] of interactiveHandlers.entries()) {
|
||||
if (value.pluginId === pluginId) {
|
||||
interactiveHandlers.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function dispatchPluginInteractiveHandler(params: {
|
||||
channel: "telegram";
|
||||
data: string;
|
||||
callbackId: string;
|
||||
ctx: TelegramInteractiveDispatchContext;
|
||||
respond: {
|
||||
reply: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise<void>;
|
||||
editMessage: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise<void>;
|
||||
editButtons: (params: { buttons: PluginInteractiveButtons }) => Promise<void>;
|
||||
clearButtons: () => Promise<void>;
|
||||
deleteMessage: () => Promise<void>;
|
||||
};
|
||||
}): Promise<InteractiveDispatchResult>;
|
||||
export async function dispatchPluginInteractiveHandler(params: {
|
||||
channel: "discord";
|
||||
data: string;
|
||||
interactionId: string;
|
||||
ctx: DiscordInteractiveDispatchContext;
|
||||
respond: PluginInteractiveDiscordHandlerContext["respond"];
|
||||
}): Promise<InteractiveDispatchResult>;
|
||||
export async function dispatchPluginInteractiveHandler(params: {
|
||||
channel: "telegram" | "discord";
|
||||
data: string;
|
||||
callbackId?: string;
|
||||
interactionId?: string;
|
||||
ctx: TelegramInteractiveDispatchContext | DiscordInteractiveDispatchContext;
|
||||
respond:
|
||||
| {
|
||||
reply: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise<void>;
|
||||
editMessage: (params: {
|
||||
text: string;
|
||||
buttons?: PluginInteractiveButtons;
|
||||
}) => Promise<void>;
|
||||
editButtons: (params: { buttons: PluginInteractiveButtons }) => Promise<void>;
|
||||
clearButtons: () => Promise<void>;
|
||||
deleteMessage: () => Promise<void>;
|
||||
}
|
||||
| PluginInteractiveDiscordHandlerContext["respond"];
|
||||
}): Promise<InteractiveDispatchResult> {
|
||||
const match = resolveNamespaceMatch(params.channel, params.data);
|
||||
if (!match) {
|
||||
return { matched: false, handled: false, duplicate: false };
|
||||
}
|
||||
|
||||
const dedupeKey =
|
||||
params.channel === "telegram" ? params.callbackId?.trim() : params.interactionId?.trim();
|
||||
if (dedupeKey && callbackDedupe.peek(dedupeKey)) {
|
||||
return { matched: true, handled: true, duplicate: true };
|
||||
}
|
||||
|
||||
let result:
|
||||
| ReturnType<PluginInteractiveTelegramHandlerRegistration["handler"]>
|
||||
| ReturnType<PluginInteractiveDiscordHandlerRegistration["handler"]>;
|
||||
if (params.channel === "telegram") {
|
||||
const pluginRoot = match.registration.pluginRoot;
|
||||
const { callbackMessage, ...handlerContext } = params.ctx as TelegramInteractiveDispatchContext;
|
||||
result = (
|
||||
match.registration as RegisteredInteractiveHandler &
|
||||
PluginInteractiveTelegramHandlerRegistration
|
||||
).handler({
|
||||
...handlerContext,
|
||||
channel: "telegram",
|
||||
callback: {
|
||||
data: params.data,
|
||||
namespace: match.namespace,
|
||||
payload: match.payload,
|
||||
messageId: callbackMessage.messageId,
|
||||
chatId: callbackMessage.chatId,
|
||||
messageText: callbackMessage.messageText,
|
||||
},
|
||||
respond: params.respond as PluginInteractiveTelegramHandlerContext["respond"],
|
||||
requestConversationBinding: async (bindingParams) => {
|
||||
if (!pluginRoot) {
|
||||
return {
|
||||
status: "error",
|
||||
message: "This interaction cannot bind the current conversation.",
|
||||
};
|
||||
}
|
||||
return requestPluginConversationBinding({
|
||||
pluginId: match.registration.pluginId,
|
||||
pluginName: match.registration.pluginName,
|
||||
pluginRoot,
|
||||
requestedBySenderId: handlerContext.senderId,
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: handlerContext.accountId,
|
||||
conversationId: handlerContext.conversationId,
|
||||
parentConversationId: handlerContext.parentConversationId,
|
||||
threadId: handlerContext.threadId,
|
||||
},
|
||||
binding: bindingParams,
|
||||
});
|
||||
},
|
||||
detachConversationBinding: async () => {
|
||||
if (!pluginRoot) {
|
||||
return { removed: false };
|
||||
}
|
||||
return detachPluginConversationBinding({
|
||||
pluginRoot,
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: handlerContext.accountId,
|
||||
conversationId: handlerContext.conversationId,
|
||||
parentConversationId: handlerContext.parentConversationId,
|
||||
threadId: handlerContext.threadId,
|
||||
},
|
||||
});
|
||||
},
|
||||
getCurrentConversationBinding: async () => {
|
||||
if (!pluginRoot) {
|
||||
return null;
|
||||
}
|
||||
return getCurrentPluginConversationBinding({
|
||||
pluginRoot,
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: handlerContext.accountId,
|
||||
conversationId: handlerContext.conversationId,
|
||||
parentConversationId: handlerContext.parentConversationId,
|
||||
threadId: handlerContext.threadId,
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
} else {
|
||||
const pluginRoot = match.registration.pluginRoot;
|
||||
result = (
|
||||
match.registration as RegisteredInteractiveHandler &
|
||||
PluginInteractiveDiscordHandlerRegistration
|
||||
).handler({
|
||||
...(params.ctx as DiscordInteractiveDispatchContext),
|
||||
channel: "discord",
|
||||
interaction: {
|
||||
...(params.ctx as DiscordInteractiveDispatchContext).interaction,
|
||||
data: params.data,
|
||||
namespace: match.namespace,
|
||||
payload: match.payload,
|
||||
},
|
||||
respond: params.respond as PluginInteractiveDiscordHandlerContext["respond"],
|
||||
requestConversationBinding: async (bindingParams) => {
|
||||
if (!pluginRoot) {
|
||||
return {
|
||||
status: "error",
|
||||
message: "This interaction cannot bind the current conversation.",
|
||||
};
|
||||
}
|
||||
const handlerContext = params.ctx as DiscordInteractiveDispatchContext;
|
||||
return requestPluginConversationBinding({
|
||||
pluginId: match.registration.pluginId,
|
||||
pluginName: match.registration.pluginName,
|
||||
pluginRoot,
|
||||
requestedBySenderId: handlerContext.senderId,
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: handlerContext.accountId,
|
||||
conversationId: handlerContext.conversationId,
|
||||
parentConversationId: handlerContext.parentConversationId,
|
||||
},
|
||||
binding: bindingParams,
|
||||
});
|
||||
},
|
||||
detachConversationBinding: async () => {
|
||||
if (!pluginRoot) {
|
||||
return { removed: false };
|
||||
}
|
||||
const handlerContext = params.ctx as DiscordInteractiveDispatchContext;
|
||||
return detachPluginConversationBinding({
|
||||
pluginRoot,
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: handlerContext.accountId,
|
||||
conversationId: handlerContext.conversationId,
|
||||
parentConversationId: handlerContext.parentConversationId,
|
||||
},
|
||||
});
|
||||
},
|
||||
getCurrentConversationBinding: async () => {
|
||||
if (!pluginRoot) {
|
||||
return null;
|
||||
}
|
||||
const handlerContext = params.ctx as DiscordInteractiveDispatchContext;
|
||||
return getCurrentPluginConversationBinding({
|
||||
pluginRoot,
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: handlerContext.accountId,
|
||||
conversationId: handlerContext.conversationId,
|
||||
parentConversationId: handlerContext.parentConversationId,
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
const resolved = await result;
|
||||
if (dedupeKey) {
|
||||
callbackDedupe.check(dedupeKey);
|
||||
}
|
||||
|
||||
return {
|
||||
matched: true,
|
||||
handled: resolved?.handled ?? true,
|
||||
duplicate: false,
|
||||
};
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
} from "./config-state.js";
|
||||
import { discoverOpenClawPlugins } from "./discovery.js";
|
||||
import { initializeGlobalHookRunner } from "./hook-runner-global.js";
|
||||
import { clearPluginInteractiveHandlers } from "./interactive.js";
|
||||
import { loadPluginManifestRegistry } from "./manifest-registry.js";
|
||||
import { isPathInside, safeStatSync } from "./path-safety.js";
|
||||
import { createPluginRegistry, type PluginRecord, type PluginRegistry } from "./registry.js";
|
||||
@@ -317,6 +318,7 @@ function createPluginRecord(params: {
|
||||
description?: string;
|
||||
version?: string;
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
origin: PluginRecord["origin"];
|
||||
workspaceDir?: string;
|
||||
enabled: boolean;
|
||||
@@ -328,6 +330,7 @@ function createPluginRecord(params: {
|
||||
description: params.description,
|
||||
version: params.version,
|
||||
source: params.source,
|
||||
rootDir: params.rootDir,
|
||||
origin: params.origin,
|
||||
workspaceDir: params.workspaceDir,
|
||||
enabled: params.enabled,
|
||||
@@ -653,6 +656,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
|
||||
|
||||
// Clear previously registered plugin commands before reloading
|
||||
clearPluginCommands();
|
||||
clearPluginInteractiveHandlers();
|
||||
|
||||
// Lazily initialize the runtime so startup paths that discover/skip plugins do
|
||||
// not eagerly load every channel runtime dependency.
|
||||
@@ -782,6 +786,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
|
||||
description: manifestRecord.description,
|
||||
version: manifestRecord.version,
|
||||
source: candidate.source,
|
||||
rootDir: candidate.rootDir,
|
||||
origin: candidate.origin,
|
||||
workspaceDir: candidate.workspaceDir,
|
||||
enabled: false,
|
||||
@@ -806,6 +811,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
|
||||
description: manifestRecord.description,
|
||||
version: manifestRecord.version,
|
||||
source: candidate.source,
|
||||
rootDir: candidate.rootDir,
|
||||
origin: candidate.origin,
|
||||
workspaceDir: candidate.workspaceDir,
|
||||
enabled: enableState.enabled,
|
||||
|
||||
@@ -13,6 +13,7 @@ import { resolveUserPath } from "../utils.js";
|
||||
import { registerPluginCommand } from "./commands.js";
|
||||
import { normalizePluginHttpPath } from "./http-path.js";
|
||||
import { findOverlappingPluginHttpRoute } from "./http-route-overlap.js";
|
||||
import { registerPluginInteractiveHandler } from "./interactive.js";
|
||||
import { normalizeRegisteredProvider } from "./provider-validation.js";
|
||||
import type { PluginRuntime } from "./runtime/types.js";
|
||||
import { defaultSlotIdForKey } from "./slots.js";
|
||||
@@ -47,17 +48,21 @@ import type {
|
||||
|
||||
export type PluginToolRegistration = {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
factory: OpenClawPluginToolFactory;
|
||||
names: string[];
|
||||
optional: boolean;
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
};
|
||||
|
||||
export type PluginCliRegistration = {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
register: OpenClawPluginCliRegistrar;
|
||||
commands: string[];
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
};
|
||||
|
||||
export type PluginHttpRouteRegistration = {
|
||||
@@ -71,15 +76,19 @@ export type PluginHttpRouteRegistration = {
|
||||
|
||||
export type PluginChannelRegistration = {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
plugin: ChannelPlugin;
|
||||
dock?: ChannelDock;
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
};
|
||||
|
||||
export type PluginProviderRegistration = {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
provider: ProviderPlugin;
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
};
|
||||
|
||||
export type PluginHookRegistration = {
|
||||
@@ -87,18 +96,23 @@ export type PluginHookRegistration = {
|
||||
entry: HookEntry;
|
||||
events: string[];
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
};
|
||||
|
||||
export type PluginServiceRegistration = {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
service: OpenClawPluginService;
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
};
|
||||
|
||||
export type PluginCommandRegistration = {
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
command: OpenClawPluginCommandDefinition;
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
};
|
||||
|
||||
export type PluginRecord = {
|
||||
@@ -108,6 +122,7 @@ export type PluginRecord = {
|
||||
description?: string;
|
||||
kind?: PluginKind;
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
origin: PluginOrigin;
|
||||
workspaceDir?: string;
|
||||
enabled: boolean;
|
||||
@@ -212,10 +227,12 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
}
|
||||
registry.tools.push({
|
||||
pluginId: record.id,
|
||||
pluginName: record.name,
|
||||
factory,
|
||||
names: normalized,
|
||||
optional,
|
||||
source: record.source,
|
||||
rootDir: record.rootDir,
|
||||
});
|
||||
};
|
||||
|
||||
@@ -443,9 +460,11 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
record.channelIds.push(id);
|
||||
registry.channels.push({
|
||||
pluginId: record.id,
|
||||
pluginName: record.name,
|
||||
plugin,
|
||||
dock: normalized.dock,
|
||||
source: record.source,
|
||||
rootDir: record.rootDir,
|
||||
});
|
||||
};
|
||||
|
||||
@@ -473,8 +492,10 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
record.providerIds.push(id);
|
||||
registry.providers.push({
|
||||
pluginId: record.id,
|
||||
pluginName: record.name,
|
||||
provider: normalizedProvider,
|
||||
source: record.source,
|
||||
rootDir: record.rootDir,
|
||||
});
|
||||
};
|
||||
|
||||
@@ -509,9 +530,11 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
record.cliCommands.push(...commands);
|
||||
registry.cliRegistrars.push({
|
||||
pluginId: record.id,
|
||||
pluginName: record.name,
|
||||
register: registrar,
|
||||
commands,
|
||||
source: record.source,
|
||||
rootDir: record.rootDir,
|
||||
});
|
||||
};
|
||||
|
||||
@@ -533,8 +556,10 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
record.services.push(id);
|
||||
registry.services.push({
|
||||
pluginId: record.id,
|
||||
pluginName: record.name,
|
||||
service,
|
||||
source: record.source,
|
||||
rootDir: record.rootDir,
|
||||
});
|
||||
};
|
||||
|
||||
@@ -551,7 +576,10 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
}
|
||||
|
||||
// Register with the plugin command system (validates name and checks for duplicates)
|
||||
const result = registerPluginCommand(record.id, command);
|
||||
const result = registerPluginCommand(record.id, command, {
|
||||
pluginName: record.name,
|
||||
pluginRoot: record.rootDir,
|
||||
});
|
||||
if (!result.ok) {
|
||||
pushDiagnostic({
|
||||
level: "error",
|
||||
@@ -565,8 +593,10 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
record.commands.push(name);
|
||||
registry.commands.push({
|
||||
pluginId: record.id,
|
||||
pluginName: record.name,
|
||||
command,
|
||||
source: record.source,
|
||||
rootDir: record.rootDir,
|
||||
});
|
||||
};
|
||||
|
||||
@@ -640,6 +670,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
version: record.version,
|
||||
description: record.description,
|
||||
source: record.source,
|
||||
rootDir: record.rootDir,
|
||||
config: params.config,
|
||||
pluginConfig: params.pluginConfig,
|
||||
runtime: registryParams.runtime,
|
||||
@@ -653,6 +684,20 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
registerGatewayMethod: (method, handler) => registerGatewayMethod(record, method, handler),
|
||||
registerCli: (registrar, opts) => registerCli(record, registrar, opts),
|
||||
registerService: (service) => registerService(record, service),
|
||||
registerInteractiveHandler: (registration) => {
|
||||
const result = registerPluginInteractiveHandler(record.id, registration, {
|
||||
pluginName: record.name,
|
||||
pluginRoot: record.rootDir,
|
||||
});
|
||||
if (!result.ok) {
|
||||
pushDiagnostic({
|
||||
level: "warn",
|
||||
pluginId: record.id,
|
||||
source: record.source,
|
||||
message: result.error ?? "interactive handler registration failed",
|
||||
});
|
||||
}
|
||||
},
|
||||
registerCommand: (command) => registerCommand(record, command),
|
||||
registerContextEngine: (id, factory) => {
|
||||
if (id === defaultSlotIdForKey("contextEngine")) {
|
||||
|
||||
@@ -7,7 +7,18 @@ import { monitorDiscordProvider } from "../../../extensions/discord/src/monitor.
|
||||
import { probeDiscord } from "../../../extensions/discord/src/probe.js";
|
||||
import { resolveDiscordChannelAllowlist } from "../../../extensions/discord/src/resolve-channels.js";
|
||||
import { resolveDiscordUserAllowlist } from "../../../extensions/discord/src/resolve-users.js";
|
||||
import { sendMessageDiscord, sendPollDiscord } from "../../../extensions/discord/src/send.js";
|
||||
import {
|
||||
createThreadDiscord,
|
||||
deleteMessageDiscord,
|
||||
editChannelDiscord,
|
||||
editMessageDiscord,
|
||||
pinMessageDiscord,
|
||||
sendDiscordComponentMessage,
|
||||
sendMessageDiscord,
|
||||
sendPollDiscord,
|
||||
sendTypingDiscord,
|
||||
unpinMessageDiscord,
|
||||
} from "../../../extensions/discord/src/send.js";
|
||||
import { monitorIMessageProvider } from "../../../extensions/imessage/src/monitor.js";
|
||||
import { probeIMessage } from "../../../extensions/imessage/src/probe.js";
|
||||
import { sendMessageIMessage } from "../../../extensions/imessage/src/send.js";
|
||||
@@ -29,7 +40,17 @@ import {
|
||||
} from "../../../extensions/telegram/src/audit.js";
|
||||
import { monitorTelegramProvider } from "../../../extensions/telegram/src/monitor.js";
|
||||
import { probeTelegram } from "../../../extensions/telegram/src/probe.js";
|
||||
import { sendMessageTelegram, sendPollTelegram } from "../../../extensions/telegram/src/send.js";
|
||||
import {
|
||||
deleteMessageTelegram,
|
||||
editMessageReplyMarkupTelegram,
|
||||
editMessageTelegram,
|
||||
pinMessageTelegram,
|
||||
renameForumTopicTelegram,
|
||||
sendMessageTelegram,
|
||||
sendPollTelegram,
|
||||
sendTypingTelegram,
|
||||
unpinMessageTelegram,
|
||||
} from "../../../extensions/telegram/src/send.js";
|
||||
import { resolveTelegramToken } from "../../../extensions/telegram/src/token.js";
|
||||
import { resolveEffectiveMessagesConfig, resolveHumanDelayConfig } from "../../agents/identity.js";
|
||||
import { handleSlackAction } from "../../agents/tools/slack-actions.js";
|
||||
@@ -113,6 +134,8 @@ import {
|
||||
upsertChannelPairingRequest,
|
||||
} from "../../pairing/pairing-store.js";
|
||||
import { buildAgentSessionKey, resolveAgentRoute } from "../../routing/resolve-route.js";
|
||||
import { createDiscordTypingLease } from "./runtime-discord-typing.js";
|
||||
import { createTelegramTypingLease } from "./runtime-telegram-typing.js";
|
||||
import { createRuntimeWhatsApp } from "./runtime-whatsapp.js";
|
||||
import type { PluginRuntime } from "./types.js";
|
||||
|
||||
@@ -207,9 +230,33 @@ export function createRuntimeChannel(): PluginRuntime["channel"] {
|
||||
probeDiscord,
|
||||
resolveChannelAllowlist: resolveDiscordChannelAllowlist,
|
||||
resolveUserAllowlist: resolveDiscordUserAllowlist,
|
||||
sendComponentMessage: sendDiscordComponentMessage,
|
||||
sendMessageDiscord,
|
||||
sendPollDiscord,
|
||||
monitorDiscordProvider,
|
||||
typing: {
|
||||
pulse: sendTypingDiscord,
|
||||
start: async ({ channelId, accountId, cfg, intervalMs }) =>
|
||||
await createDiscordTypingLease({
|
||||
channelId,
|
||||
accountId,
|
||||
cfg,
|
||||
intervalMs,
|
||||
pulse: async ({ channelId, accountId, cfg }) =>
|
||||
void (await sendTypingDiscord(channelId, {
|
||||
accountId,
|
||||
cfg,
|
||||
})),
|
||||
}),
|
||||
},
|
||||
conversationActions: {
|
||||
editMessage: editMessageDiscord,
|
||||
deleteMessage: deleteMessageDiscord,
|
||||
pinMessage: pinMessageDiscord,
|
||||
unpinMessage: unpinMessageDiscord,
|
||||
createThread: createThreadDiscord,
|
||||
editChannel: editChannelDiscord,
|
||||
},
|
||||
},
|
||||
slack: {
|
||||
listDirectoryGroupsLive: listSlackDirectoryGroupsLive,
|
||||
@@ -230,6 +277,33 @@ export function createRuntimeChannel(): PluginRuntime["channel"] {
|
||||
sendPollTelegram,
|
||||
monitorTelegramProvider,
|
||||
messageActions: telegramMessageActions,
|
||||
typing: {
|
||||
pulse: sendTypingTelegram,
|
||||
start: async ({ to, accountId, cfg, intervalMs, messageThreadId }) =>
|
||||
await createTelegramTypingLease({
|
||||
to,
|
||||
accountId,
|
||||
cfg,
|
||||
intervalMs,
|
||||
messageThreadId,
|
||||
pulse: async ({ to, accountId, cfg, messageThreadId }) =>
|
||||
await sendTypingTelegram(to, {
|
||||
accountId,
|
||||
cfg,
|
||||
messageThreadId,
|
||||
}),
|
||||
}),
|
||||
},
|
||||
conversationActions: {
|
||||
editMessage: editMessageTelegram,
|
||||
editReplyMarkup: editMessageReplyMarkupTelegram,
|
||||
clearReplyMarkup: async (chatIdInput, messageIdInput, opts = {}) =>
|
||||
await editMessageReplyMarkupTelegram(chatIdInput, messageIdInput, [], opts),
|
||||
deleteMessage: deleteMessageTelegram,
|
||||
renameTopic: renameForumTopicTelegram,
|
||||
pinMessage: pinMessageTelegram,
|
||||
unpinMessage: unpinMessageTelegram,
|
||||
},
|
||||
},
|
||||
signal: {
|
||||
probeSignal,
|
||||
|
||||
57
src/plugins/runtime/runtime-discord-typing.test.ts
Normal file
57
src/plugins/runtime/runtime-discord-typing.test.ts
Normal file
@@ -0,0 +1,57 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { createDiscordTypingLease } from "./runtime-discord-typing.js";
|
||||
|
||||
describe("createDiscordTypingLease", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("pulses immediately and keeps leases independent", async () => {
|
||||
vi.useFakeTimers();
|
||||
const pulse = vi.fn(async () => undefined);
|
||||
|
||||
const leaseA = await createDiscordTypingLease({
|
||||
channelId: "123",
|
||||
intervalMs: 2_000,
|
||||
pulse,
|
||||
});
|
||||
const leaseB = await createDiscordTypingLease({
|
||||
channelId: "123",
|
||||
intervalMs: 2_000,
|
||||
pulse,
|
||||
});
|
||||
|
||||
expect(pulse).toHaveBeenCalledTimes(2);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
expect(pulse).toHaveBeenCalledTimes(4);
|
||||
|
||||
leaseA.stop();
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
expect(pulse).toHaveBeenCalledTimes(5);
|
||||
|
||||
await leaseB.refresh();
|
||||
expect(pulse).toHaveBeenCalledTimes(6);
|
||||
|
||||
leaseB.stop();
|
||||
});
|
||||
|
||||
it("swallows background pulse failures", async () => {
|
||||
vi.useFakeTimers();
|
||||
const pulse = vi
|
||||
.fn<(params: { channelId: string; accountId?: string; cfg?: unknown }) => Promise<void>>()
|
||||
.mockResolvedValueOnce(undefined)
|
||||
.mockRejectedValueOnce(new Error("boom"));
|
||||
|
||||
const lease = await createDiscordTypingLease({
|
||||
channelId: "123",
|
||||
intervalMs: 2_000,
|
||||
pulse,
|
||||
});
|
||||
|
||||
await expect(vi.advanceTimersByTimeAsync(2_000)).resolves.toBe(vi);
|
||||
expect(pulse).toHaveBeenCalledTimes(2);
|
||||
|
||||
lease.stop();
|
||||
});
|
||||
});
|
||||
62
src/plugins/runtime/runtime-discord-typing.ts
Normal file
62
src/plugins/runtime/runtime-discord-typing.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import { logWarn } from "../../logger.js";
|
||||
|
||||
export type CreateDiscordTypingLeaseParams = {
|
||||
channelId: string;
|
||||
accountId?: string;
|
||||
cfg?: ReturnType<typeof import("../../config/config.js").loadConfig>;
|
||||
intervalMs?: number;
|
||||
pulse: (params: {
|
||||
channelId: string;
|
||||
accountId?: string;
|
||||
cfg?: ReturnType<typeof import("../../config/config.js").loadConfig>;
|
||||
}) => Promise<void>;
|
||||
};
|
||||
|
||||
const DEFAULT_DISCORD_TYPING_INTERVAL_MS = 8_000;
|
||||
|
||||
export async function createDiscordTypingLease(params: CreateDiscordTypingLeaseParams): Promise<{
|
||||
refresh: () => Promise<void>;
|
||||
stop: () => void;
|
||||
}> {
|
||||
const intervalMs =
|
||||
typeof params.intervalMs === "number" && Number.isFinite(params.intervalMs)
|
||||
? Math.max(1_000, Math.floor(params.intervalMs))
|
||||
: DEFAULT_DISCORD_TYPING_INTERVAL_MS;
|
||||
|
||||
let stopped = false;
|
||||
let timer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
const pulse = async () => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
await params.pulse({
|
||||
channelId: params.channelId,
|
||||
accountId: params.accountId,
|
||||
cfg: params.cfg,
|
||||
});
|
||||
};
|
||||
|
||||
await pulse();
|
||||
|
||||
timer = setInterval(() => {
|
||||
// Background lease refreshes must never escape as unhandled rejections.
|
||||
void pulse().catch((err) => {
|
||||
logWarn(`plugins: discord typing pulse failed: ${String(err)}`);
|
||||
});
|
||||
}, intervalMs);
|
||||
timer.unref?.();
|
||||
|
||||
return {
|
||||
refresh: async () => {
|
||||
await pulse();
|
||||
},
|
||||
stop: () => {
|
||||
stopped = true;
|
||||
if (timer) {
|
||||
clearInterval(timer);
|
||||
timer = null;
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
83
src/plugins/runtime/runtime-telegram-typing.test.ts
Normal file
83
src/plugins/runtime/runtime-telegram-typing.test.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { createTelegramTypingLease } from "./runtime-telegram-typing.js";
|
||||
|
||||
describe("createTelegramTypingLease", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("pulses immediately and keeps leases independent", async () => {
|
||||
vi.useFakeTimers();
|
||||
const pulse = vi.fn(async () => undefined);
|
||||
|
||||
const leaseA = await createTelegramTypingLease({
|
||||
to: "telegram:123",
|
||||
intervalMs: 2_000,
|
||||
pulse,
|
||||
});
|
||||
const leaseB = await createTelegramTypingLease({
|
||||
to: "telegram:123",
|
||||
intervalMs: 2_000,
|
||||
pulse,
|
||||
});
|
||||
|
||||
expect(pulse).toHaveBeenCalledTimes(2);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
expect(pulse).toHaveBeenCalledTimes(4);
|
||||
|
||||
leaseA.stop();
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
expect(pulse).toHaveBeenCalledTimes(5);
|
||||
|
||||
await leaseB.refresh();
|
||||
expect(pulse).toHaveBeenCalledTimes(6);
|
||||
|
||||
leaseB.stop();
|
||||
});
|
||||
|
||||
it("swallows background pulse failures", async () => {
|
||||
vi.useFakeTimers();
|
||||
const pulse = vi
|
||||
.fn<
|
||||
(params: {
|
||||
to: string;
|
||||
accountId?: string;
|
||||
cfg?: unknown;
|
||||
messageThreadId?: number;
|
||||
}) => Promise<unknown>
|
||||
>()
|
||||
.mockResolvedValueOnce(undefined)
|
||||
.mockRejectedValueOnce(new Error("boom"));
|
||||
|
||||
const lease = await createTelegramTypingLease({
|
||||
to: "telegram:123",
|
||||
intervalMs: 2_000,
|
||||
pulse,
|
||||
});
|
||||
|
||||
await expect(vi.advanceTimersByTimeAsync(2_000)).resolves.toBe(vi);
|
||||
expect(pulse).toHaveBeenCalledTimes(2);
|
||||
|
||||
lease.stop();
|
||||
});
|
||||
|
||||
it("falls back to the default interval for non-finite values", async () => {
|
||||
vi.useFakeTimers();
|
||||
const pulse = vi.fn(async () => undefined);
|
||||
|
||||
const lease = await createTelegramTypingLease({
|
||||
to: "telegram:123",
|
||||
intervalMs: Number.NaN,
|
||||
pulse,
|
||||
});
|
||||
|
||||
expect(pulse).toHaveBeenCalledTimes(1);
|
||||
await vi.advanceTimersByTimeAsync(3_999);
|
||||
expect(pulse).toHaveBeenCalledTimes(1);
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(pulse).toHaveBeenCalledTimes(2);
|
||||
|
||||
lease.stop();
|
||||
});
|
||||
});
|
||||
60
src/plugins/runtime/runtime-telegram-typing.ts
Normal file
60
src/plugins/runtime/runtime-telegram-typing.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { logWarn } from "../../logger.js";
|
||||
|
||||
export type CreateTelegramTypingLeaseParams = {
|
||||
to: string;
|
||||
accountId?: string;
|
||||
cfg?: OpenClawConfig;
|
||||
intervalMs?: number;
|
||||
messageThreadId?: number;
|
||||
pulse: (params: {
|
||||
to: string;
|
||||
accountId?: string;
|
||||
cfg?: OpenClawConfig;
|
||||
messageThreadId?: number;
|
||||
}) => Promise<unknown>;
|
||||
};
|
||||
|
||||
export async function createTelegramTypingLease(params: CreateTelegramTypingLeaseParams): Promise<{
|
||||
refresh: () => Promise<void>;
|
||||
stop: () => void;
|
||||
}> {
|
||||
const intervalMs =
|
||||
typeof params.intervalMs === "number" && Number.isFinite(params.intervalMs)
|
||||
? Math.max(1_000, Math.floor(params.intervalMs))
|
||||
: 4_000;
|
||||
let stopped = false;
|
||||
|
||||
const refresh = async () => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
await params.pulse({
|
||||
to: params.to,
|
||||
accountId: params.accountId,
|
||||
cfg: params.cfg,
|
||||
messageThreadId: params.messageThreadId,
|
||||
});
|
||||
};
|
||||
|
||||
await refresh();
|
||||
|
||||
const timer = setInterval(() => {
|
||||
// Background lease refreshes must never escape as unhandled rejections.
|
||||
void refresh().catch((err) => {
|
||||
logWarn(`plugins: telegram typing pulse failed: ${String(err)}`);
|
||||
});
|
||||
}, intervalMs);
|
||||
timer.unref?.();
|
||||
|
||||
return {
|
||||
refresh,
|
||||
stop: () => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
stopped = true;
|
||||
clearInterval(timer);
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -94,9 +94,30 @@ export type PluginRuntimeChannel = {
|
||||
probeDiscord: typeof import("../../../extensions/discord/src/probe.js").probeDiscord;
|
||||
resolveChannelAllowlist: typeof import("../../../extensions/discord/src/resolve-channels.js").resolveDiscordChannelAllowlist;
|
||||
resolveUserAllowlist: typeof import("../../../extensions/discord/src/resolve-users.js").resolveDiscordUserAllowlist;
|
||||
sendComponentMessage: typeof import("../../../extensions/discord/src/send.js").sendDiscordComponentMessage;
|
||||
sendMessageDiscord: typeof import("../../../extensions/discord/src/send.js").sendMessageDiscord;
|
||||
sendPollDiscord: typeof import("../../../extensions/discord/src/send.js").sendPollDiscord;
|
||||
monitorDiscordProvider: typeof import("../../../extensions/discord/src/monitor.js").monitorDiscordProvider;
|
||||
typing: {
|
||||
pulse: typeof import("../../../extensions/discord/src/send.js").sendTypingDiscord;
|
||||
start: (params: {
|
||||
channelId: string;
|
||||
accountId?: string;
|
||||
cfg?: ReturnType<typeof import("../../config/config.js").loadConfig>;
|
||||
intervalMs?: number;
|
||||
}) => Promise<{
|
||||
refresh: () => Promise<void>;
|
||||
stop: () => void;
|
||||
}>;
|
||||
};
|
||||
conversationActions: {
|
||||
editMessage: typeof import("../../../extensions/discord/src/send.js").editMessageDiscord;
|
||||
deleteMessage: typeof import("../../../extensions/discord/src/send.js").deleteMessageDiscord;
|
||||
pinMessage: typeof import("../../../extensions/discord/src/send.js").pinMessageDiscord;
|
||||
unpinMessage: typeof import("../../../extensions/discord/src/send.js").unpinMessageDiscord;
|
||||
createThread: typeof import("../../../extensions/discord/src/send.js").createThreadDiscord;
|
||||
editChannel: typeof import("../../../extensions/discord/src/send.js").editChannelDiscord;
|
||||
};
|
||||
};
|
||||
slack: {
|
||||
listDirectoryGroupsLive: typeof import("../../../extensions/slack/src/directory-live.js").listSlackDirectoryGroupsLive;
|
||||
@@ -117,6 +138,39 @@ export type PluginRuntimeChannel = {
|
||||
sendPollTelegram: typeof import("../../../extensions/telegram/src/send.js").sendPollTelegram;
|
||||
monitorTelegramProvider: typeof import("../../../extensions/telegram/src/monitor.js").monitorTelegramProvider;
|
||||
messageActions: typeof import("../../channels/plugins/actions/telegram.js").telegramMessageActions;
|
||||
typing: {
|
||||
pulse: typeof import("../../../extensions/telegram/src/send.js").sendTypingTelegram;
|
||||
start: (params: {
|
||||
to: string;
|
||||
accountId?: string;
|
||||
cfg?: ReturnType<typeof import("../../config/config.js").loadConfig>;
|
||||
intervalMs?: number;
|
||||
messageThreadId?: number;
|
||||
}) => Promise<{
|
||||
refresh: () => Promise<void>;
|
||||
stop: () => void;
|
||||
}>;
|
||||
};
|
||||
conversationActions: {
|
||||
editMessage: typeof import("../../../extensions/telegram/src/send.js").editMessageTelegram;
|
||||
editReplyMarkup: typeof import("../../../extensions/telegram/src/send.js").editMessageReplyMarkupTelegram;
|
||||
clearReplyMarkup: (
|
||||
chatIdInput: string | number,
|
||||
messageIdInput: string | number,
|
||||
opts?: {
|
||||
token?: string;
|
||||
accountId?: string;
|
||||
verbose?: boolean;
|
||||
api?: Partial<import("grammy").Bot["api"]>;
|
||||
retry?: import("../../infra/retry.js").RetryConfig;
|
||||
cfg?: ReturnType<typeof import("../../config/config.js").loadConfig>;
|
||||
},
|
||||
) => Promise<{ ok: true; messageId: string; chatId: string }>;
|
||||
deleteMessage: typeof import("../../../extensions/telegram/src/send.js").deleteMessageTelegram;
|
||||
renameTopic: typeof import("../../../extensions/telegram/src/send.js").renameForumTopicTelegram;
|
||||
pinMessage: typeof import("../../../extensions/telegram/src/send.js").pinMessageTelegram;
|
||||
unpinMessage: typeof import("../../../extensions/telegram/src/send.js").unpinMessageTelegram;
|
||||
};
|
||||
};
|
||||
signal: {
|
||||
probeSignal: typeof import("../../../extensions/signal/src/probe.js").probeSignal;
|
||||
|
||||
@@ -19,7 +19,12 @@ import { startPluginServices } from "./services.js";
|
||||
function createRegistry(services: OpenClawPluginService[]) {
|
||||
const registry = createEmptyPluginRegistry();
|
||||
for (const service of services) {
|
||||
registry.services.push({ pluginId: "plugin:test", service, source: "test" });
|
||||
registry.services.push({
|
||||
pluginId: "plugin:test",
|
||||
service,
|
||||
source: "test",
|
||||
rootDir: "/plugins/test-plugin",
|
||||
});
|
||||
}
|
||||
return registry;
|
||||
}
|
||||
@@ -116,7 +121,9 @@ describe("startPluginServices", () => {
|
||||
await handle.stop();
|
||||
|
||||
expect(mockedLogger.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("plugin service failed (service-start-fail):"),
|
||||
expect.stringContaining(
|
||||
"plugin service failed (service-start-fail, plugin=plugin:test, root=/plugins/test-plugin):",
|
||||
),
|
||||
);
|
||||
expect(mockedLogger.warn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("plugin service stop failed (service-stop-fail):"),
|
||||
|
||||
@@ -54,7 +54,11 @@ export async function startPluginServices(params: {
|
||||
stop: service.stop ? () => service.stop?.(serviceContext) : undefined,
|
||||
});
|
||||
} catch (err) {
|
||||
log.error(`plugin service failed (${service.id}): ${String(err)}`);
|
||||
const error = err as Error;
|
||||
const stack = error?.stack?.trim();
|
||||
log.error(
|
||||
`plugin service failed (${service.id}, plugin=${entry.pluginId}, root=${entry.rootDir ?? "unknown"}): ${error?.message ?? String(err)}${stack ? `\n${stack}` : ""}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import type { TopLevelComponents } from "@buape/carbon";
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import type { StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import type { Api, Model } from "@mariozechner/pi-ai";
|
||||
@@ -511,8 +512,48 @@ export type PluginCommandContext = {
|
||||
accountId?: string;
|
||||
/** Thread/topic id if available */
|
||||
messageThreadId?: number;
|
||||
requestConversationBinding: (
|
||||
params?: PluginConversationBindingRequestParams,
|
||||
) => Promise<PluginConversationBindingRequestResult>;
|
||||
detachConversationBinding: () => Promise<{ removed: boolean }>;
|
||||
getCurrentConversationBinding: () => Promise<PluginConversationBinding | null>;
|
||||
};
|
||||
|
||||
export type PluginConversationBindingRequestParams = {
|
||||
summary?: string;
|
||||
detachHint?: string;
|
||||
};
|
||||
|
||||
export type PluginConversationBinding = {
|
||||
bindingId: string;
|
||||
pluginId: string;
|
||||
pluginName?: string;
|
||||
pluginRoot: string;
|
||||
channel: string;
|
||||
accountId: string;
|
||||
conversationId: string;
|
||||
parentConversationId?: string;
|
||||
threadId?: string | number;
|
||||
boundAt: number;
|
||||
summary?: string;
|
||||
detachHint?: string;
|
||||
};
|
||||
|
||||
export type PluginConversationBindingRequestResult =
|
||||
| {
|
||||
status: "bound";
|
||||
binding: PluginConversationBinding;
|
||||
}
|
||||
| {
|
||||
status: "pending";
|
||||
approvalId: string;
|
||||
reply: ReplyPayload;
|
||||
}
|
||||
| {
|
||||
status: "error";
|
||||
message: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Result returned by a plugin command handler.
|
||||
*/
|
||||
@@ -547,6 +588,111 @@ export type OpenClawPluginCommandDefinition = {
|
||||
handler: PluginCommandHandler;
|
||||
};
|
||||
|
||||
export type PluginInteractiveChannel = "telegram" | "discord";
|
||||
|
||||
export type PluginInteractiveButtons = Array<
|
||||
Array<{ text: string; callback_data: string; style?: "danger" | "success" | "primary" }>
|
||||
>;
|
||||
|
||||
export type PluginInteractiveTelegramHandlerResult = {
|
||||
handled?: boolean;
|
||||
} | void;
|
||||
|
||||
export type PluginInteractiveTelegramHandlerContext = {
|
||||
channel: "telegram";
|
||||
accountId: string;
|
||||
callbackId: string;
|
||||
conversationId: string;
|
||||
parentConversationId?: string;
|
||||
senderId?: string;
|
||||
senderUsername?: string;
|
||||
threadId?: number;
|
||||
isGroup: boolean;
|
||||
isForum: boolean;
|
||||
auth: {
|
||||
isAuthorizedSender: boolean;
|
||||
};
|
||||
callback: {
|
||||
data: string;
|
||||
namespace: string;
|
||||
payload: string;
|
||||
messageId: number;
|
||||
chatId: string;
|
||||
messageText?: string;
|
||||
};
|
||||
respond: {
|
||||
reply: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise<void>;
|
||||
editMessage: (params: { text: string; buttons?: PluginInteractiveButtons }) => Promise<void>;
|
||||
editButtons: (params: { buttons: PluginInteractiveButtons }) => Promise<void>;
|
||||
clearButtons: () => Promise<void>;
|
||||
deleteMessage: () => Promise<void>;
|
||||
};
|
||||
requestConversationBinding: (
|
||||
params?: PluginConversationBindingRequestParams,
|
||||
) => Promise<PluginConversationBindingRequestResult>;
|
||||
detachConversationBinding: () => Promise<{ removed: boolean }>;
|
||||
getCurrentConversationBinding: () => Promise<PluginConversationBinding | null>;
|
||||
};
|
||||
|
||||
export type PluginInteractiveDiscordHandlerResult = {
|
||||
handled?: boolean;
|
||||
} | void;
|
||||
|
||||
export type PluginInteractiveDiscordHandlerContext = {
|
||||
channel: "discord";
|
||||
accountId: string;
|
||||
interactionId: string;
|
||||
conversationId: string;
|
||||
parentConversationId?: string;
|
||||
guildId?: string;
|
||||
senderId?: string;
|
||||
senderUsername?: string;
|
||||
auth: {
|
||||
isAuthorizedSender: boolean;
|
||||
};
|
||||
interaction: {
|
||||
kind: "button" | "select" | "modal";
|
||||
data: string;
|
||||
namespace: string;
|
||||
payload: string;
|
||||
messageId?: string;
|
||||
values?: string[];
|
||||
fields?: Array<{ id: string; name: string; values: string[] }>;
|
||||
};
|
||||
respond: {
|
||||
acknowledge: () => Promise<void>;
|
||||
reply: (params: { text: string; ephemeral?: boolean }) => Promise<void>;
|
||||
followUp: (params: { text: string; ephemeral?: boolean }) => Promise<void>;
|
||||
editMessage: (params: { text?: string; components?: TopLevelComponents[] }) => Promise<void>;
|
||||
clearComponents: (params?: { text?: string }) => Promise<void>;
|
||||
};
|
||||
requestConversationBinding: (
|
||||
params?: PluginConversationBindingRequestParams,
|
||||
) => Promise<PluginConversationBindingRequestResult>;
|
||||
detachConversationBinding: () => Promise<{ removed: boolean }>;
|
||||
getCurrentConversationBinding: () => Promise<PluginConversationBinding | null>;
|
||||
};
|
||||
|
||||
export type PluginInteractiveTelegramHandlerRegistration = {
|
||||
channel: "telegram";
|
||||
namespace: string;
|
||||
handler: (
|
||||
ctx: PluginInteractiveTelegramHandlerContext,
|
||||
) => Promise<PluginInteractiveTelegramHandlerResult> | PluginInteractiveTelegramHandlerResult;
|
||||
};
|
||||
|
||||
export type PluginInteractiveDiscordHandlerRegistration = {
|
||||
channel: "discord";
|
||||
namespace: string;
|
||||
handler: (
|
||||
ctx: PluginInteractiveDiscordHandlerContext,
|
||||
) => Promise<PluginInteractiveDiscordHandlerResult> | PluginInteractiveDiscordHandlerResult;
|
||||
};
|
||||
|
||||
export type PluginInteractiveHandlerRegistration =
|
||||
| PluginInteractiveTelegramHandlerRegistration
|
||||
| PluginInteractiveDiscordHandlerRegistration;
|
||||
|
||||
export type OpenClawPluginHttpRouteAuth = "gateway" | "plugin";
|
||||
export type OpenClawPluginHttpRouteMatch = "exact" | "prefix";
|
||||
|
||||
@@ -611,6 +757,7 @@ export type OpenClawPluginApi = {
|
||||
version?: string;
|
||||
description?: string;
|
||||
source: string;
|
||||
rootDir?: string;
|
||||
config: OpenClawConfig;
|
||||
pluginConfig?: Record<string, unknown>;
|
||||
runtime: PluginRuntime;
|
||||
@@ -630,6 +777,7 @@ export type OpenClawPluginApi = {
|
||||
registerCli: (registrar: OpenClawPluginCliRegistrar, opts?: { commands?: string[] }) => void;
|
||||
registerService: (service: OpenClawPluginService) => void;
|
||||
registerProvider: (provider: ProviderPlugin) => void;
|
||||
registerInteractiveHandler: (registration: PluginInteractiveHandlerRegistration) => void;
|
||||
/**
|
||||
* Register a custom command that bypasses the LLM agent.
|
||||
* Plugin commands are processed before built-in commands and before agent invocation.
|
||||
@@ -673,6 +821,7 @@ export type PluginHookName =
|
||||
| "before_compaction"
|
||||
| "after_compaction"
|
||||
| "before_reset"
|
||||
| "inbound_claim"
|
||||
| "message_received"
|
||||
| "message_sending"
|
||||
| "message_sent"
|
||||
@@ -699,6 +848,7 @@ export const PLUGIN_HOOK_NAMES = [
|
||||
"before_compaction",
|
||||
"after_compaction",
|
||||
"before_reset",
|
||||
"inbound_claim",
|
||||
"message_received",
|
||||
"message_sending",
|
||||
"message_sent",
|
||||
@@ -907,6 +1057,37 @@ export type PluginHookMessageContext = {
|
||||
conversationId?: string;
|
||||
};
|
||||
|
||||
export type PluginHookInboundClaimContext = PluginHookMessageContext & {
|
||||
parentConversationId?: string;
|
||||
senderId?: string;
|
||||
messageId?: string;
|
||||
};
|
||||
|
||||
export type PluginHookInboundClaimEvent = {
|
||||
content: string;
|
||||
body?: string;
|
||||
bodyForAgent?: string;
|
||||
transcript?: string;
|
||||
timestamp?: number;
|
||||
channel: string;
|
||||
accountId?: string;
|
||||
conversationId?: string;
|
||||
parentConversationId?: string;
|
||||
senderId?: string;
|
||||
senderName?: string;
|
||||
senderUsername?: string;
|
||||
threadId?: string | number;
|
||||
messageId?: string;
|
||||
isGroup: boolean;
|
||||
commandAuthorized?: boolean;
|
||||
wasMentioned?: boolean;
|
||||
metadata?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type PluginHookInboundClaimResult = {
|
||||
handled: boolean;
|
||||
};
|
||||
|
||||
// message_received hook
|
||||
export type PluginHookMessageReceivedEvent = {
|
||||
from: string;
|
||||
@@ -1163,6 +1344,10 @@ export type PluginHookHandlerMap = {
|
||||
event: PluginHookBeforeResetEvent,
|
||||
ctx: PluginHookAgentContext,
|
||||
) => Promise<void> | void;
|
||||
inbound_claim: (
|
||||
event: PluginHookInboundClaimEvent,
|
||||
ctx: PluginHookInboundClaimContext,
|
||||
) => Promise<PluginHookInboundClaimResult | void> | PluginHookInboundClaimResult | void;
|
||||
message_received: (
|
||||
event: PluginHookMessageReceivedEvent,
|
||||
ctx: PluginHookMessageContext,
|
||||
|
||||
175
src/plugins/wired-hooks-inbound-claim.test.ts
Normal file
175
src/plugins/wired-hooks-inbound-claim.test.ts
Normal file
@@ -0,0 +1,175 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createHookRunner } from "./hooks.js";
|
||||
import { createMockPluginRegistry } from "./hooks.test-helpers.js";
|
||||
|
||||
describe("inbound_claim hook runner", () => {
|
||||
it("stops at the first handler that claims the event", async () => {
|
||||
const first = vi.fn().mockResolvedValue({ handled: true });
|
||||
const second = vi.fn().mockResolvedValue({ handled: true });
|
||||
const registry = createMockPluginRegistry([
|
||||
{ hookName: "inbound_claim", handler: first },
|
||||
{ hookName: "inbound_claim", handler: second },
|
||||
]);
|
||||
const runner = createHookRunner(registry);
|
||||
|
||||
const result = await runner.runInboundClaim(
|
||||
{
|
||||
content: "who are you",
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "123:topic:77",
|
||||
isGroup: true,
|
||||
},
|
||||
{
|
||||
channelId: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "123:topic:77",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({ handled: true });
|
||||
expect(first).toHaveBeenCalledTimes(1);
|
||||
expect(second).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("continues to the next handler when a higher-priority handler throws", async () => {
|
||||
const logger = {
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
const failing = vi.fn().mockRejectedValue(new Error("boom"));
|
||||
const succeeding = vi.fn().mockResolvedValue({ handled: true });
|
||||
const registry = createMockPluginRegistry([
|
||||
{ hookName: "inbound_claim", handler: failing },
|
||||
{ hookName: "inbound_claim", handler: succeeding },
|
||||
]);
|
||||
const runner = createHookRunner(registry, { logger });
|
||||
|
||||
const result = await runner.runInboundClaim(
|
||||
{
|
||||
content: "hi",
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "123",
|
||||
isGroup: false,
|
||||
},
|
||||
{
|
||||
channelId: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "123",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({ handled: true });
|
||||
expect(logger.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("inbound_claim handler from test-plugin failed: Error: boom"),
|
||||
);
|
||||
expect(succeeding).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("can target a single plugin when core already owns the binding", async () => {
|
||||
const first = vi.fn().mockResolvedValue({ handled: true });
|
||||
const second = vi.fn().mockResolvedValue({ handled: true });
|
||||
const registry = createMockPluginRegistry([
|
||||
{ hookName: "inbound_claim", handler: first },
|
||||
{ hookName: "inbound_claim", handler: second },
|
||||
]);
|
||||
registry.typedHooks[1].pluginId = "other-plugin";
|
||||
const runner = createHookRunner(registry);
|
||||
|
||||
const result = await runner.runInboundClaimForPlugin(
|
||||
"test-plugin",
|
||||
{
|
||||
content: "who are you",
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
isGroup: true,
|
||||
},
|
||||
{
|
||||
channelId: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({ handled: true });
|
||||
expect(first).toHaveBeenCalledTimes(1);
|
||||
expect(second).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("reports missing_plugin when the bound plugin is not loaded", async () => {
|
||||
const registry = createMockPluginRegistry([]);
|
||||
registry.plugins = [];
|
||||
const runner = createHookRunner(registry);
|
||||
|
||||
const result = await runner.runInboundClaimForPluginOutcome(
|
||||
"missing-plugin",
|
||||
{
|
||||
content: "who are you",
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
isGroup: true,
|
||||
},
|
||||
{
|
||||
channelId: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({ status: "missing_plugin" });
|
||||
});
|
||||
|
||||
it("reports no_handler when the plugin is loaded but has no targeted hooks", async () => {
|
||||
const registry = createMockPluginRegistry([]);
|
||||
const runner = createHookRunner(registry);
|
||||
|
||||
const result = await runner.runInboundClaimForPluginOutcome(
|
||||
"test-plugin",
|
||||
{
|
||||
content: "who are you",
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
isGroup: true,
|
||||
},
|
||||
{
|
||||
channelId: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({ status: "no_handler" });
|
||||
});
|
||||
|
||||
it("reports error when a targeted handler throws and none claim the event", async () => {
|
||||
const logger = {
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
const failing = vi.fn().mockRejectedValue(new Error("boom"));
|
||||
const registry = createMockPluginRegistry([{ hookName: "inbound_claim", handler: failing }]);
|
||||
const runner = createHookRunner(registry, { logger });
|
||||
|
||||
const result = await runner.runInboundClaimForPluginOutcome(
|
||||
"test-plugin",
|
||||
{
|
||||
content: "who are you",
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
isGroup: true,
|
||||
},
|
||||
{
|
||||
channelId: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:1",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toEqual({ status: "error", error: "boom" });
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user