refactor: centralize plugin gateway message dispatch

This commit is contained in:
Peter Steinberger
2026-04-28 03:28:51 +01:00
parent 7bf08e7344
commit 35685e9960
2 changed files with 103 additions and 85 deletions

View File

@@ -2,7 +2,11 @@ import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { jsonResult } from "../../agents/tools/common.js";
import { dispatchChannelMessageAction } from "../../channels/plugins/message-action-dispatch.js";
import type { ChannelMessageActionContext, ChannelPlugin } from "../../channels/plugins/types.js";
import type {
ChannelMessageActionContext,
ChannelMessageActionName,
ChannelPlugin,
} from "../../channels/plugins/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { getActivePluginRegistry, setActivePluginRegistry } from "../../plugins/runtime.js";
import { createTestRegistry } from "../../test-utils/channel-plugins.js";
@@ -98,6 +102,39 @@ function createPollForwardingPlugin(params: {
};
}
function createGatewayActionPlugin(params: {
pluginId: string;
label: string;
blurb: string;
actions: ChannelMessageActionName[];
gatewayActions?: ChannelMessageActionName[];
capabilities?: ChannelPlugin["capabilities"];
messaging?: ChannelPlugin["messaging"];
handleAction: ChannelActionHandler;
}): ChannelPlugin {
const actions = new Set(params.actions);
const gatewayActions = new Set(params.gatewayActions ?? params.actions);
return {
id: params.pluginId,
meta: {
id: params.pluginId,
label: params.label,
selectionLabel: params.label,
docsPath: `/channels/${params.pluginId}`,
blurb: params.blurb,
},
capabilities: params.capabilities ?? { chatTypes: ["direct"] },
config: createAlwaysConfiguredPluginConfig(),
messaging: params.messaging,
actions: {
describeMessageTool: () => ({ actions: params.actions }),
supportsAction: ({ action }) => actions.has(action),
resolveExecutionMode: ({ action }) => (gatewayActions.has(action) ? "gateway" : "local"),
handleAction: params.handleAction,
},
};
}
async function executePluginAction(params: {
action: "send" | "poll";
ctx: Pick<
@@ -318,24 +355,14 @@ describe("runMessageAction plugin dispatch", () => {
local: true,
}),
);
const gatewayPlugin: ChannelPlugin = {
id: "gatewaychat",
meta: {
id: "gatewaychat",
label: "Gateway Chat",
selectionLabel: "Gateway Chat",
docsPath: "/channels/gatewaychat",
blurb: "Gateway Chat reaction test plugin.",
},
const gatewayPlugin = createGatewayActionPlugin({
pluginId: "gatewaychat",
label: "Gateway Chat",
blurb: "Gateway Chat reaction test plugin.",
actions: ["react"],
capabilities: { chatTypes: ["direct"], reactions: true },
config: createAlwaysConfiguredPluginConfig(),
actions: {
describeMessageTool: () => ({ actions: ["react"] }),
supportsAction: ({ action }) => action === "react",
resolveExecutionMode: ({ action }) => (action === "react" ? "gateway" : "local"),
handleAction,
},
};
handleAction,
});
setActivePluginRegistry(
createTestRegistry([
{
@@ -414,29 +441,18 @@ describe("runMessageAction plugin dispatch", () => {
it("routes gateway-executed plugin sends through gateway RPC instead of local dispatch", async () => {
const handleAction = vi.fn(async () => jsonResult({ ok: true, local: true }));
const gatewayPlugin: ChannelPlugin = {
id: "gatewaychat",
meta: {
id: "gatewaychat",
label: "Gateway Chat",
selectionLabel: "Gateway Chat",
docsPath: "/channels/gatewaychat",
blurb: "Gateway Chat send test plugin.",
},
capabilities: { chatTypes: ["direct"] },
config: createAlwaysConfiguredPluginConfig(),
const gatewayPlugin = createGatewayActionPlugin({
pluginId: "gatewaychat",
label: "Gateway Chat",
blurb: "Gateway Chat send test plugin.",
actions: ["send"],
messaging: {
targetResolver: {
looksLikeId: () => true,
},
},
actions: {
describeMessageTool: () => ({ actions: ["send"] }),
supportsAction: ({ action }) => action === "send",
resolveExecutionMode: ({ action }) => (action === "send" ? "gateway" : "local"),
handleAction,
},
};
handleAction,
});
setActivePluginRegistry(
createTestRegistry([
{
@@ -1003,19 +1019,18 @@ describe("runMessageAction plugin dispatch", () => {
it("routes gateway-executed plugin polls through gateway RPC instead of local dispatch", async () => {
const handleAction = vi.fn(async () => jsonResult({ ok: true, local: true }));
const pollGatewayPlugin = createPollForwardingPlugin({
const pollGatewayPlugin = createGatewayActionPlugin({
pluginId: "pollchat",
label: "Poll Chat",
blurb: "Poll chat gateway forwarding test plugin.",
actions: ["poll"],
messaging: {
targetResolver: {
looksLikeId: () => true,
},
},
handleAction,
});
const baseActions = pollGatewayPlugin.actions!;
pollGatewayPlugin.actions = {
describeMessageTool: baseActions.describeMessageTool,
supportsAction: baseActions.supportsAction,
handleAction: baseActions.handleAction,
resolveExecutionMode: ({ action }) => (action === "poll" ? "gateway" : "local"),
};
setActivePluginRegistry(
createTestRegistry([
{

View File

@@ -366,7 +366,7 @@ type ResolvedActionContext = {
abortSignal?: AbortSignal;
};
async function maybeCallGatewayPluginMessageAction(params: {
async function runGatewayPluginMessageActionOrNull(params: {
cfg: OpenClawConfig;
params: Record<string, unknown>;
channel: ChannelId;
@@ -376,7 +376,8 @@ async function maybeCallGatewayPluginMessageAction(params: {
gateway?: MessageActionRunnerGateway;
input: RunMessageActionParams;
agentId?: string;
}): Promise<{ payload: unknown } | null> {
result: (payload: unknown) => MessageActionRunResult;
}): Promise<MessageActionRunResult | null> {
if (params.dryRun || !params.gateway) {
return null;
}
@@ -388,26 +389,25 @@ async function maybeCallGatewayPluginMessageAction(params: {
if (executionMode !== "gateway") {
return null;
}
return {
payload: await callGatewayMessageAction<unknown>({
gateway: params.gateway,
actionParams: {
channel: params.channel,
action: params.action,
params: params.params,
accountId: params.accountId ?? undefined,
requesterSenderId: params.input.requesterSenderId ?? undefined,
senderIsOwner: params.input.senderIsOwner,
sessionKey: params.input.sessionKey,
sessionId: params.input.sessionId,
agentId: params.agentId,
toolContext: params.input.toolContext,
idempotencyKey: await resolveGatewayActionIdempotencyKey(
normalizeOptionalString(params.params.idempotencyKey),
),
},
}),
};
const payload = await callGatewayMessageAction<unknown>({
gateway: params.gateway,
actionParams: {
channel: params.channel,
action: params.action,
params: params.params,
accountId: params.accountId ?? undefined,
requesterSenderId: params.input.requesterSenderId ?? undefined,
senderIsOwner: params.input.senderIsOwner,
sessionKey: params.input.sessionKey,
sessionId: params.input.sessionId,
agentId: params.agentId,
toolContext: params.input.toolContext,
idempotencyKey: await resolveGatewayActionIdempotencyKey(
normalizeOptionalString(params.params.idempotencyKey),
),
},
});
return params.result(payload);
}
function resolveGateway(input: RunMessageActionParams): MessageActionRunnerGateway | undefined {
@@ -636,7 +636,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
mergedMediaUrls.length > 0 ? mergedMediaUrls : mediaUrl ? [mediaUrl] : undefined;
throwIfAborted(abortSignal);
const gatewayPluginAction = await maybeCallGatewayPluginMessageAction({
const gatewayPluginAction = await runGatewayPluginMessageActionOrNull({
cfg,
params,
channel,
@@ -646,17 +646,18 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
gateway,
input,
agentId,
});
if (gatewayPluginAction) {
return {
result: (payload) => ({
kind: "send",
channel,
action,
to,
handledBy: "plugin",
payload: gatewayPluginAction.payload,
payload,
dryRun,
};
}),
});
if (gatewayPluginAction) {
return gatewayPluginAction;
}
const send = await executeSendAction({
@@ -743,7 +744,7 @@ async function handlePollAction(ctx: ResolvedActionContext): Promise<MessageActi
preferPresentation: false,
});
const gatewayPluginAction = await maybeCallGatewayPluginMessageAction({
const gatewayPluginAction = await runGatewayPluginMessageActionOrNull({
cfg,
params,
channel,
@@ -753,17 +754,18 @@ async function handlePollAction(ctx: ResolvedActionContext): Promise<MessageActi
gateway,
input,
agentId,
});
if (gatewayPluginAction) {
return {
result: (payload) => ({
kind: "poll",
channel,
action,
to,
handledBy: "plugin",
payload: gatewayPluginAction.payload,
payload,
dryRun,
};
}),
});
if (gatewayPluginAction) {
return gatewayPluginAction;
}
const poll = await executePollAction({
@@ -850,7 +852,7 @@ async function handlePluginAction(ctx: ResolvedActionContext): Promise<MessageAc
if (!plugin?.actions?.handleAction) {
throw new Error(`Channel ${channel} is unavailable for message actions (plugin not loaded).`);
}
const gatewayPluginAction = await maybeCallGatewayPluginMessageAction({
const gatewayPluginAction = await runGatewayPluginMessageActionOrNull({
cfg,
params,
channel,
@@ -860,17 +862,18 @@ async function handlePluginAction(ctx: ResolvedActionContext): Promise<MessageAc
gateway,
input,
agentId,
});
if (gatewayPluginAction) {
// Gateway-owned actions must execute where the live channel runtime exists.
return {
result: (payload) => ({
kind: "action",
channel,
action,
handledBy: "plugin",
payload: gatewayPluginAction.payload,
payload,
dryRun,
};
}),
});
if (gatewayPluginAction) {
// Gateway-owned actions must execute where the live channel runtime exists.
return gatewayPluginAction;
}
const handled = await dispatchChannelMessageAction({