mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-11 17:21:13 +00:00
fix(whatsapp): route react through gateway (#64638)
* fix(whatsapp): route react through gateway * fix(gateway): accept full message action tool context
This commit is contained in:
@@ -29,6 +29,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Google/Veo: stop sending the unsupported `numberOfVideos` request field so Gemini Developer API Veo runs do not fail before OpenClaw can complete the intended Google video generation path. (#64723) Thanks @velvet-shark.
|
||||
- QA/packaging: stop packaged CLI startup and completion cache generation from reading repo-only QA scenario markdown, ship the bundled QA scenario pack in npm releases, and keep `openclaw completion --write-state` working even if QA setup is broken. (#64648) Thanks @obviyus.
|
||||
- Codex/QA: keep Codex app-server coordination chatter out of visible replies, add a live QA leak scenario, and classify leaked harness meta text as a QA failure instead of a successful reply. Thanks @vincentkoc.
|
||||
- WhatsApp: route `message react` through the gateway-owned action path so reactions use the live WhatsApp listener in both DM and group chats, matching `message send` and `message poll`. Thanks @mcaxtr.
|
||||
|
||||
## 2026.4.10
|
||||
|
||||
|
||||
@@ -401,6 +401,60 @@ public struct AgentEvent: Codable, Sendable {
|
||||
}
|
||||
}
|
||||
|
||||
public struct MessageActionParams: Codable, Sendable {
|
||||
public let channel: String
|
||||
public let action: String
|
||||
public let params: [String: AnyCodable]
|
||||
public let accountid: String?
|
||||
public let requestersenderid: String?
|
||||
public let senderisowner: Bool?
|
||||
public let sessionkey: String?
|
||||
public let sessionid: String?
|
||||
public let agentid: String?
|
||||
public let toolcontext: [String: AnyCodable]?
|
||||
public let idempotencykey: String
|
||||
|
||||
public init(
|
||||
channel: String,
|
||||
action: String,
|
||||
params: [String: AnyCodable],
|
||||
accountid: String?,
|
||||
requestersenderid: String?,
|
||||
senderisowner: Bool?,
|
||||
sessionkey: String?,
|
||||
sessionid: String?,
|
||||
agentid: String?,
|
||||
toolcontext: [String: AnyCodable]?,
|
||||
idempotencykey: String)
|
||||
{
|
||||
self.channel = channel
|
||||
self.action = action
|
||||
self.params = params
|
||||
self.accountid = accountid
|
||||
self.requestersenderid = requestersenderid
|
||||
self.senderisowner = senderisowner
|
||||
self.sessionkey = sessionkey
|
||||
self.sessionid = sessionid
|
||||
self.agentid = agentid
|
||||
self.toolcontext = toolcontext
|
||||
self.idempotencykey = idempotencykey
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case channel
|
||||
case action
|
||||
case params
|
||||
case accountid = "accountId"
|
||||
case requestersenderid = "requesterSenderId"
|
||||
case senderisowner = "senderIsOwner"
|
||||
case sessionkey = "sessionKey"
|
||||
case sessionid = "sessionId"
|
||||
case agentid = "agentId"
|
||||
case toolcontext = "toolContext"
|
||||
case idempotencykey = "idempotencyKey"
|
||||
}
|
||||
}
|
||||
|
||||
public struct SendParams: Codable, Sendable {
|
||||
public let to: String
|
||||
public let message: String?
|
||||
|
||||
@@ -401,6 +401,60 @@ public struct AgentEvent: Codable, Sendable {
|
||||
}
|
||||
}
|
||||
|
||||
public struct MessageActionParams: Codable, Sendable {
|
||||
public let channel: String
|
||||
public let action: String
|
||||
public let params: [String: AnyCodable]
|
||||
public let accountid: String?
|
||||
public let requestersenderid: String?
|
||||
public let senderisowner: Bool?
|
||||
public let sessionkey: String?
|
||||
public let sessionid: String?
|
||||
public let agentid: String?
|
||||
public let toolcontext: [String: AnyCodable]?
|
||||
public let idempotencykey: String
|
||||
|
||||
public init(
|
||||
channel: String,
|
||||
action: String,
|
||||
params: [String: AnyCodable],
|
||||
accountid: String?,
|
||||
requestersenderid: String?,
|
||||
senderisowner: Bool?,
|
||||
sessionkey: String?,
|
||||
sessionid: String?,
|
||||
agentid: String?,
|
||||
toolcontext: [String: AnyCodable]?,
|
||||
idempotencykey: String)
|
||||
{
|
||||
self.channel = channel
|
||||
self.action = action
|
||||
self.params = params
|
||||
self.accountid = accountid
|
||||
self.requestersenderid = requestersenderid
|
||||
self.senderisowner = senderisowner
|
||||
self.sessionkey = sessionkey
|
||||
self.sessionid = sessionid
|
||||
self.agentid = agentid
|
||||
self.toolcontext = toolcontext
|
||||
self.idempotencykey = idempotencykey
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case channel
|
||||
case action
|
||||
case params
|
||||
case accountid = "accountId"
|
||||
case requestersenderid = "requesterSenderId"
|
||||
case senderisowner = "senderIsOwner"
|
||||
case sessionkey = "sessionKey"
|
||||
case sessionid = "sessionId"
|
||||
case agentid = "agentId"
|
||||
case toolcontext = "toolContext"
|
||||
case idempotencykey = "idempotencyKey"
|
||||
}
|
||||
}
|
||||
|
||||
public struct SendParams: Codable, Sendable {
|
||||
public let to: String
|
||||
public let message: String?
|
||||
|
||||
@@ -135,6 +135,7 @@ export const whatsappPlugin: ChannelPlugin<ResolvedWhatsAppAccount> =
|
||||
describeMessageTool: ({ cfg, accountId }) =>
|
||||
describeWhatsAppMessageActions({ cfg, accountId }),
|
||||
supportsAction: ({ action }) => action === "react",
|
||||
resolveExecutionMode: ({ action }) => (action === "react" ? "gateway" : "local"),
|
||||
handleAction: async ({ action, params, cfg, accountId, toolContext }) =>
|
||||
await (
|
||||
await loadWhatsAppChannelReactAction()
|
||||
|
||||
@@ -636,6 +636,7 @@ export type ChannelMessageActionAdapter = {
|
||||
params: ChannelMessageActionDiscoveryContext,
|
||||
) => ChannelMessageToolDiscovery | null | undefined;
|
||||
supportsAction?: (params: { action: ChannelMessageActionName }) => boolean;
|
||||
resolveExecutionMode?: (params: { action: ChannelMessageActionName }) => "local" | "gateway";
|
||||
resolveCliActionRequest?: (params: {
|
||||
action: ChannelMessageActionName;
|
||||
args: Record<string, unknown>;
|
||||
|
||||
@@ -115,6 +115,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"agents.files.get",
|
||||
],
|
||||
[WRITE_SCOPE]: [
|
||||
"message.action",
|
||||
"send",
|
||||
"poll",
|
||||
"agent",
|
||||
|
||||
@@ -8,6 +8,8 @@ import {
|
||||
type AgentIdentityResult,
|
||||
AgentIdentityResultSchema,
|
||||
AgentParamsSchema,
|
||||
type MessageActionParams,
|
||||
MessageActionParamsSchema,
|
||||
type AgentSummary,
|
||||
AgentSummarySchema,
|
||||
type AgentsFileEntry,
|
||||
@@ -305,6 +307,8 @@ export const validateConnectParams = ajv.compile<ConnectParams>(ConnectParamsSch
|
||||
export const validateRequestFrame = ajv.compile<RequestFrame>(RequestFrameSchema);
|
||||
export const validateResponseFrame = ajv.compile<ResponseFrame>(ResponseFrameSchema);
|
||||
export const validateEventFrame = ajv.compile<EventFrame>(EventFrameSchema);
|
||||
export const validateMessageActionParams =
|
||||
ajv.compile<MessageActionParams>(MessageActionParamsSchema);
|
||||
export const validateSendParams = ajv.compile(SendParamsSchema);
|
||||
export const validatePollParams = ajv.compile<PollParams>(PollParamsSchema);
|
||||
export const validateAgentParams = ajv.compile(AgentParamsSchema);
|
||||
@@ -553,6 +557,7 @@ export {
|
||||
ErrorShapeSchema,
|
||||
StateVersionSchema,
|
||||
AgentEventSchema,
|
||||
MessageActionParamsSchema,
|
||||
ChatEventSchema,
|
||||
SendParamsSchema,
|
||||
PollParamsSchema,
|
||||
|
||||
@@ -35,6 +35,51 @@ export const AgentEventSchema = Type.Object(
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const MessageActionToolContextSchema = Type.Object(
|
||||
{
|
||||
currentChannelId: Type.Optional(Type.String()),
|
||||
currentGraphChannelId: Type.Optional(Type.String()),
|
||||
currentChannelProvider: Type.Optional(Type.String()),
|
||||
currentThreadTs: Type.Optional(Type.String()),
|
||||
currentMessageId: Type.Optional(Type.Union([Type.String(), Type.Number()])),
|
||||
replyToMode: Type.Optional(
|
||||
Type.Union([
|
||||
Type.Literal("off"),
|
||||
Type.Literal("first"),
|
||||
Type.Literal("all"),
|
||||
Type.Literal("batched"),
|
||||
]),
|
||||
),
|
||||
hasRepliedRef: Type.Optional(
|
||||
Type.Object(
|
||||
{
|
||||
value: Type.Boolean(),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
),
|
||||
),
|
||||
skipCrossContextDecoration: Type.Optional(Type.Boolean()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const MessageActionParamsSchema = Type.Object(
|
||||
{
|
||||
channel: NonEmptyString,
|
||||
action: NonEmptyString,
|
||||
params: Type.Record(Type.String(), Type.Unknown()),
|
||||
accountId: Type.Optional(Type.String()),
|
||||
requesterSenderId: Type.Optional(Type.String()),
|
||||
senderIsOwner: Type.Optional(Type.Boolean()),
|
||||
sessionKey: Type.Optional(Type.String()),
|
||||
sessionId: Type.Optional(Type.String()),
|
||||
agentId: Type.Optional(Type.String()),
|
||||
toolContext: Type.Optional(MessageActionToolContextSchema),
|
||||
idempotencyKey: NonEmptyString,
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const SendParamsSchema = Type.Object(
|
||||
{
|
||||
to: NonEmptyString,
|
||||
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
AgentIdentityResultSchema,
|
||||
AgentParamsSchema,
|
||||
AgentWaitParamsSchema,
|
||||
MessageActionParamsSchema,
|
||||
PollParamsSchema,
|
||||
SendParamsSchema,
|
||||
WakeParamsSchema,
|
||||
@@ -205,6 +206,7 @@ export const ProtocolSchemas = {
|
||||
Snapshot: SnapshotSchema,
|
||||
ErrorShape: ErrorShapeSchema,
|
||||
AgentEvent: AgentEventSchema,
|
||||
MessageActionParams: MessageActionParamsSchema,
|
||||
SendParams: SendParamsSchema,
|
||||
PollParams: PollParamsSchema,
|
||||
AgentParams: AgentParamsSchema,
|
||||
|
||||
@@ -17,6 +17,7 @@ export type StateVersion = SchemaType<"StateVersion">;
|
||||
export type AgentEvent = SchemaType<"AgentEvent">;
|
||||
export type AgentIdentityParams = SchemaType<"AgentIdentityParams">;
|
||||
export type AgentIdentityResult = SchemaType<"AgentIdentityResult">;
|
||||
export type MessageActionParams = SchemaType<"MessageActionParams">;
|
||||
export type PollParams = SchemaType<"PollParams">;
|
||||
export type AgentWaitParams = SchemaType<"AgentWaitParams">;
|
||||
export type WakeParams = SchemaType<"WakeParams">;
|
||||
|
||||
@@ -120,6 +120,7 @@ const BASE_METHODS = [
|
||||
"gateway.identity.get",
|
||||
"system-presence",
|
||||
"system-event",
|
||||
"message.action",
|
||||
"send",
|
||||
"agent",
|
||||
"agent.identity.get",
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { jsonResult } from "../../agents/tools/common.js";
|
||||
import type { ChannelPlugin } from "../../channels/plugins/types.js";
|
||||
import { setActivePluginRegistry } from "../../plugins/runtime.js";
|
||||
import { createTestRegistry } from "../../test-utils/channel-plugins.js";
|
||||
import type { GatewayRequestContext } from "./types.js";
|
||||
@@ -153,6 +155,19 @@ async function runPollWithClient(
|
||||
return { respond };
|
||||
}
|
||||
|
||||
async function runMessageActionRequest(params: Record<string, unknown>) {
|
||||
const respond = vi.fn();
|
||||
await sendHandlers["message.action"]({
|
||||
params: params as never,
|
||||
respond,
|
||||
context: makeContext(),
|
||||
req: { type: "req", id: "1", method: "message.action" },
|
||||
client: null as never,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
return { respond };
|
||||
}
|
||||
|
||||
function expectDeliverySessionMirror(params: { agentId: string; sessionKey: string }) {
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
@@ -857,4 +872,85 @@ describe("gateway send mirroring", () => {
|
||||
expect.objectContaining({ channel: "slack" }),
|
||||
);
|
||||
});
|
||||
|
||||
it("dispatches message actions through the gateway for plugin-owned channels", async () => {
|
||||
const reactPlugin: ChannelPlugin = {
|
||||
id: "whatsapp",
|
||||
meta: {
|
||||
id: "whatsapp",
|
||||
label: "WhatsApp",
|
||||
selectionLabel: "WhatsApp",
|
||||
docsPath: "/channels/whatsapp",
|
||||
blurb: "WhatsApp action dispatch test plugin.",
|
||||
},
|
||||
capabilities: { chatTypes: ["direct"], reactions: true },
|
||||
config: {
|
||||
listAccountIds: () => ["default"],
|
||||
resolveAccount: () => ({ enabled: true }),
|
||||
isConfigured: () => true,
|
||||
},
|
||||
actions: {
|
||||
describeMessageTool: () => ({ actions: ["react"] }),
|
||||
supportsAction: ({ action }) => action === "react",
|
||||
handleAction: async ({ params, requesterSenderId, toolContext }) =>
|
||||
jsonResult({
|
||||
ok: true,
|
||||
messageId: params.messageId,
|
||||
requesterSenderId,
|
||||
currentMessageId: toolContext?.currentMessageId,
|
||||
currentGraphChannelId: toolContext?.currentGraphChannelId,
|
||||
replyToMode: toolContext?.replyToMode,
|
||||
hasRepliedRef: toolContext?.hasRepliedRef?.value,
|
||||
skipCrossContextDecoration: toolContext?.skipCrossContextDecoration,
|
||||
}),
|
||||
},
|
||||
};
|
||||
mocks.getChannelPlugin.mockReturnValue(reactPlugin);
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "whatsapp",
|
||||
source: "test",
|
||||
plugin: reactPlugin,
|
||||
},
|
||||
]),
|
||||
"send-test-message-action",
|
||||
);
|
||||
|
||||
const { respond } = await runMessageActionRequest({
|
||||
channel: "whatsapp",
|
||||
action: "react",
|
||||
params: {
|
||||
chatJid: "+15551234567",
|
||||
messageId: "wamid.1",
|
||||
emoji: "✅",
|
||||
},
|
||||
requesterSenderId: "trusted-user",
|
||||
toolContext: {
|
||||
currentGraphChannelId: "graph:team/chan",
|
||||
currentChannelProvider: "whatsapp",
|
||||
currentMessageId: "wamid.1",
|
||||
replyToMode: "first",
|
||||
hasRepliedRef: { value: true },
|
||||
skipCrossContextDecoration: true,
|
||||
},
|
||||
idempotencyKey: "idem-message-action",
|
||||
});
|
||||
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
true,
|
||||
{
|
||||
ok: true,
|
||||
messageId: "wamid.1",
|
||||
requesterSenderId: "trusted-user",
|
||||
currentMessageId: "wamid.1",
|
||||
currentGraphChannelId: "graph:team/chan",
|
||||
replyToMode: "first",
|
||||
hasRepliedRef: true,
|
||||
skipCrossContextDecoration: true,
|
||||
},
|
||||
undefined,
|
||||
{ channel: "whatsapp" },
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
|
||||
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
||||
import { normalizeChannelId } from "../../channels/plugins/index.js";
|
||||
import { dispatchChannelMessageAction } from "../../channels/plugins/message-action-dispatch.js";
|
||||
import { createOutboundSendDeps } from "../../cli/deps.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import { applyPluginAutoEnable } from "../../config/plugin-auto-enable.js";
|
||||
@@ -16,6 +17,7 @@ import { normalizeReplyPayloadsForDelivery } from "../../infra/outbound/payloads
|
||||
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
|
||||
import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js";
|
||||
import { resolveOutboundTarget } from "../../infra/outbound/targets.js";
|
||||
import { extractToolPayload } from "../../infra/outbound/tool-payload.js";
|
||||
import { normalizePollInput } from "../../polls.js";
|
||||
import {
|
||||
normalizeOptionalLowercaseString,
|
||||
@@ -26,6 +28,7 @@ import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
formatValidationErrors,
|
||||
validateMessageActionParams,
|
||||
validatePollParams,
|
||||
validateSendParams,
|
||||
} from "../protocol/index.js";
|
||||
@@ -34,7 +37,7 @@ import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
type InflightResult = {
|
||||
ok: boolean;
|
||||
payload?: Record<string, unknown>;
|
||||
payload?: unknown;
|
||||
error?: ReturnType<typeof errorShape>;
|
||||
meta?: Record<string, unknown>;
|
||||
};
|
||||
@@ -158,7 +161,7 @@ function buildGatewayDeliveryPayload(params: {
|
||||
function cacheGatewayDedupeSuccess(params: {
|
||||
context: GatewayRequestContext;
|
||||
dedupeKey: string;
|
||||
payload: Record<string, unknown>;
|
||||
payload: unknown;
|
||||
}) {
|
||||
params.context.dedupe.set(params.dedupeKey, {
|
||||
ts: Date.now(),
|
||||
@@ -180,6 +183,123 @@ function cacheGatewayDedupeFailure(params: {
|
||||
}
|
||||
|
||||
export const sendHandlers: GatewayRequestHandlers = {
|
||||
"message.action": async ({ params, respond, context }) => {
|
||||
const p = params;
|
||||
if (!validateMessageActionParams(p)) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
`invalid message.action params: ${formatValidationErrors(validateMessageActionParams.errors)}`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const request = p as {
|
||||
channel: string;
|
||||
action: string;
|
||||
params: Record<string, unknown>;
|
||||
accountId?: string;
|
||||
requesterSenderId?: string;
|
||||
senderIsOwner?: boolean;
|
||||
sessionKey?: string;
|
||||
sessionId?: string;
|
||||
agentId?: string;
|
||||
toolContext?: {
|
||||
currentChannelId?: string;
|
||||
currentChannelProvider?: string;
|
||||
currentThreadTs?: string;
|
||||
currentMessageId?: string | number;
|
||||
};
|
||||
idempotencyKey: string;
|
||||
};
|
||||
const idem = request.idempotencyKey;
|
||||
const dedupeKey = `message.action:${idem}`;
|
||||
const cached = context.dedupe.get(dedupeKey);
|
||||
if (cached) {
|
||||
respond(cached.ok, cached.payload, cached.error, {
|
||||
cached: true,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const inflightMap = getInflightMap(context);
|
||||
const inflight = inflightMap.get(dedupeKey);
|
||||
if (inflight) {
|
||||
const result = await inflight;
|
||||
const meta = result.meta ? { ...result.meta, cached: true } : { cached: true };
|
||||
respond(result.ok, result.payload, result.error, meta);
|
||||
return;
|
||||
}
|
||||
const resolvedChannel = await resolveRequestedChannel({
|
||||
requestChannel: request.channel,
|
||||
unsupportedMessage: (input) => `unsupported channel: ${input}`,
|
||||
rejectWebchatAsInternalOnly: true,
|
||||
});
|
||||
if ("error" in resolvedChannel) {
|
||||
respond(false, undefined, resolvedChannel.error);
|
||||
return;
|
||||
}
|
||||
const { cfg, channel } = resolvedChannel;
|
||||
const plugin = resolveOutboundChannelPlugin({ channel, cfg });
|
||||
if (!plugin?.actions?.handleAction) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
`Channel ${channel} does not support action ${request.action}.`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const work = (async (): Promise<InflightResult> => {
|
||||
try {
|
||||
const handled = await dispatchChannelMessageAction({
|
||||
channel,
|
||||
action: request.action as never,
|
||||
cfg,
|
||||
params: request.params,
|
||||
accountId: normalizeOptionalString(request.accountId) ?? undefined,
|
||||
requesterSenderId: normalizeOptionalString(request.requesterSenderId) ?? undefined,
|
||||
senderIsOwner: request.senderIsOwner,
|
||||
sessionKey: normalizeOptionalString(request.sessionKey) ?? undefined,
|
||||
sessionId: normalizeOptionalString(request.sessionId) ?? undefined,
|
||||
agentId: normalizeOptionalString(request.agentId) ?? undefined,
|
||||
toolContext: request.toolContext,
|
||||
dryRun: false,
|
||||
});
|
||||
if (!handled) {
|
||||
const error = errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
`Message action ${request.action} not supported for channel ${channel}.`,
|
||||
);
|
||||
cacheGatewayDedupeFailure({ context, dedupeKey, error });
|
||||
return { ok: false, error, meta: { channel } };
|
||||
}
|
||||
const payload = extractToolPayload(handled);
|
||||
cacheGatewayDedupeSuccess({ context, dedupeKey, payload });
|
||||
return {
|
||||
ok: true,
|
||||
payload,
|
||||
meta: { channel },
|
||||
};
|
||||
} catch (err) {
|
||||
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
|
||||
cacheGatewayDedupeFailure({ context, dedupeKey, error });
|
||||
return { ok: false, error, meta: { channel, error: formatForLog(err) } };
|
||||
}
|
||||
})();
|
||||
|
||||
inflightMap.set(dedupeKey, work);
|
||||
try {
|
||||
const result = await work;
|
||||
respond(result.ok, result.payload, result.error, result.meta);
|
||||
} finally {
|
||||
inflightMap.delete(dedupeKey);
|
||||
}
|
||||
},
|
||||
send: async ({ params, respond, context, client }) => {
|
||||
const p = params;
|
||||
if (!validateSendParams(p)) {
|
||||
|
||||
@@ -15,6 +15,8 @@ const mocks = vi.hoisted(() => ({
|
||||
resolveOutboundChannelPlugin: vi.fn(),
|
||||
executeSendAction: vi.fn(),
|
||||
executePollAction: vi.fn(),
|
||||
callGatewayLeastPrivilege: vi.fn(),
|
||||
randomIdempotencyKey: vi.fn(() => "idem-gateway-action"),
|
||||
}));
|
||||
|
||||
vi.mock("./channel-resolution.js", () => ({
|
||||
@@ -27,6 +29,11 @@ vi.mock("./outbound-send-service.js", () => ({
|
||||
executePollAction: mocks.executePollAction,
|
||||
}));
|
||||
|
||||
vi.mock("./message.gateway.runtime.js", () => ({
|
||||
callGatewayLeastPrivilege: mocks.callGatewayLeastPrivilege,
|
||||
randomIdempotencyKey: mocks.randomIdempotencyKey,
|
||||
}));
|
||||
|
||||
vi.mock("./outbound-session.js", () => ({
|
||||
ensureOutboundSessionEntry: vi.fn(async () => undefined),
|
||||
resolveOutboundSessionRoute: vi.fn(async () => null),
|
||||
@@ -145,6 +152,8 @@ describe("runMessageAction plugin dispatch", () => {
|
||||
async ({ ctx }: { ctx: Parameters<typeof executePluginAction>[0]["ctx"] }) =>
|
||||
await executePluginAction({ action: "poll", ctx }),
|
||||
);
|
||||
mocks.callGatewayLeastPrivilege.mockReset();
|
||||
mocks.randomIdempotencyKey.mockClear();
|
||||
});
|
||||
|
||||
describe("alias-based plugin action dispatch", () => {
|
||||
@@ -302,6 +311,107 @@ describe("runMessageAction plugin dispatch", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("routes gateway-executed plugin actions through gateway RPC instead of local dispatch", async () => {
|
||||
const handleAction = vi.fn(async () =>
|
||||
jsonResult({
|
||||
ok: true,
|
||||
local: true,
|
||||
}),
|
||||
);
|
||||
const gatewayPlugin: ChannelPlugin = {
|
||||
id: "whatsapp",
|
||||
meta: {
|
||||
id: "whatsapp",
|
||||
label: "WhatsApp",
|
||||
selectionLabel: "WhatsApp",
|
||||
docsPath: "/channels/whatsapp",
|
||||
blurb: "WhatsApp reaction test plugin.",
|
||||
},
|
||||
capabilities: { chatTypes: ["direct"], reactions: true },
|
||||
config: createAlwaysConfiguredPluginConfig(),
|
||||
actions: {
|
||||
describeMessageTool: () => ({ actions: ["react"] }),
|
||||
supportsAction: ({ action }) => action === "react",
|
||||
resolveExecutionMode: ({ action }) => (action === "react" ? "gateway" : "local"),
|
||||
handleAction,
|
||||
},
|
||||
};
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "whatsapp",
|
||||
source: "test",
|
||||
plugin: gatewayPlugin,
|
||||
},
|
||||
]),
|
||||
);
|
||||
mocks.callGatewayLeastPrivilege.mockResolvedValue({
|
||||
ok: true,
|
||||
added: "✅",
|
||||
});
|
||||
|
||||
const result = await runMessageAction({
|
||||
cfg: {
|
||||
channels: {
|
||||
whatsapp: {
|
||||
enabled: true,
|
||||
},
|
||||
},
|
||||
} as OpenClawConfig,
|
||||
action: "react",
|
||||
params: {
|
||||
channel: "whatsapp",
|
||||
to: "+15551234567",
|
||||
chatJid: "+15551234567",
|
||||
messageId: "wamid.1",
|
||||
emoji: "✅",
|
||||
},
|
||||
requesterSenderId: "trusted-user",
|
||||
sessionKey: "agent:alpha:main",
|
||||
sessionId: "session-123",
|
||||
agentId: "alpha",
|
||||
toolContext: {
|
||||
currentChannelProvider: "whatsapp",
|
||||
currentMessageId: "wamid.1",
|
||||
},
|
||||
gateway: {
|
||||
clientName: "cli",
|
||||
mode: "cli",
|
||||
},
|
||||
dryRun: false,
|
||||
});
|
||||
|
||||
expect(mocks.callGatewayLeastPrivilege).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
method: "message.action",
|
||||
params: expect.objectContaining({
|
||||
channel: "whatsapp",
|
||||
action: "react",
|
||||
requesterSenderId: "trusted-user",
|
||||
sessionKey: "agent:alpha:main",
|
||||
sessionId: "session-123",
|
||||
agentId: "alpha",
|
||||
toolContext: expect.objectContaining({
|
||||
currentChannelProvider: "whatsapp",
|
||||
currentMessageId: "wamid.1",
|
||||
}),
|
||||
idempotencyKey: "idem-gateway-action",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
expect(handleAction).not.toHaveBeenCalled();
|
||||
expect(result).toMatchObject({
|
||||
kind: "action",
|
||||
channel: "whatsapp",
|
||||
action: "react",
|
||||
handledBy: "plugin",
|
||||
payload: {
|
||||
ok: true,
|
||||
added: "✅",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("uses requester session channel policy for host-media reads", async () => {
|
||||
const handlePolicyCheckedAction = vi.fn(async ({ mediaAccess }) =>
|
||||
jsonResult({
|
||||
|
||||
@@ -26,7 +26,12 @@ import {
|
||||
normalizeOptionalLowercaseString,
|
||||
normalizeOptionalString,
|
||||
} from "../../shared/string-coerce.js";
|
||||
import { type GatewayClientMode, type GatewayClientName } from "../../utils/message-channel.js";
|
||||
import {
|
||||
GATEWAY_CLIENT_MODES,
|
||||
GATEWAY_CLIENT_NAMES,
|
||||
type GatewayClientMode,
|
||||
type GatewayClientName,
|
||||
} from "../../utils/message-channel.js";
|
||||
import { formatErrorMessage } from "../errors.js";
|
||||
import { throwIfAborted } from "./abort.js";
|
||||
import { resolveOutboundChannelPlugin } from "./channel-resolution.js";
|
||||
@@ -73,6 +78,15 @@ export type MessageActionRunnerGateway = {
|
||||
mode: GatewayClientMode;
|
||||
};
|
||||
|
||||
let messageActionGatewayRuntimePromise: Promise<
|
||||
typeof import("./message.gateway.runtime.js")
|
||||
> | null = null;
|
||||
|
||||
function loadMessageActionGatewayRuntime() {
|
||||
messageActionGatewayRuntimePromise ??= import("./message.gateway.runtime.js");
|
||||
return messageActionGatewayRuntimePromise;
|
||||
}
|
||||
|
||||
export type RunMessageActionParams = {
|
||||
cfg: OpenClawConfig;
|
||||
action: ChannelMessageActionName;
|
||||
@@ -150,6 +164,46 @@ export function getToolResult(
|
||||
return "toolResult" in result ? result.toolResult : undefined;
|
||||
}
|
||||
|
||||
function resolveGatewayActionOptions(gateway?: MessageActionRunnerGateway) {
|
||||
return {
|
||||
url: gateway?.url,
|
||||
token: gateway?.token,
|
||||
timeoutMs:
|
||||
typeof gateway?.timeoutMs === "number" && Number.isFinite(gateway.timeoutMs)
|
||||
? Math.max(1, Math.floor(gateway.timeoutMs))
|
||||
: 10_000,
|
||||
clientName: gateway?.clientName ?? GATEWAY_CLIENT_NAMES.CLI,
|
||||
clientDisplayName: gateway?.clientDisplayName,
|
||||
mode: gateway?.mode ?? GATEWAY_CLIENT_MODES.CLI,
|
||||
};
|
||||
}
|
||||
|
||||
async function callGatewayMessageAction<T>(params: {
|
||||
gateway?: MessageActionRunnerGateway;
|
||||
actionParams: Record<string, unknown>;
|
||||
}): Promise<T> {
|
||||
const { callGatewayLeastPrivilege } = await loadMessageActionGatewayRuntime();
|
||||
const gateway = resolveGatewayActionOptions(params.gateway);
|
||||
return await callGatewayLeastPrivilege<T>({
|
||||
url: gateway.url,
|
||||
token: gateway.token,
|
||||
method: "message.action",
|
||||
params: params.actionParams,
|
||||
timeoutMs: gateway.timeoutMs,
|
||||
clientName: gateway.clientName,
|
||||
clientDisplayName: gateway.clientDisplayName,
|
||||
mode: gateway.mode,
|
||||
});
|
||||
}
|
||||
|
||||
async function resolveGatewayActionIdempotencyKey(idempotencyKey?: string): Promise<string> {
|
||||
if (idempotencyKey) {
|
||||
return idempotencyKey;
|
||||
}
|
||||
const { randomIdempotencyKey } = await loadMessageActionGatewayRuntime();
|
||||
return randomIdempotencyKey();
|
||||
}
|
||||
|
||||
function collectActionMediaSourceHints(params: Record<string, unknown>): string[] {
|
||||
const sources: string[] = [];
|
||||
for (const key of ["media", "mediaUrl", "path", "filePath", "fileUrl"] as const) {
|
||||
@@ -702,6 +756,36 @@ async function handlePluginAction(ctx: ResolvedActionContext): Promise<MessageAc
|
||||
if (!plugin?.actions?.handleAction) {
|
||||
throw new Error(`Channel ${channel} is unavailable for message actions (plugin not loaded).`);
|
||||
}
|
||||
const executionMode = plugin.actions.resolveExecutionMode?.({ action }) ?? "local";
|
||||
if (executionMode === "gateway" && gateway) {
|
||||
// Gateway-owned actions must execute where the live channel runtime exists.
|
||||
const payload = await callGatewayMessageAction<unknown>({
|
||||
gateway,
|
||||
actionParams: {
|
||||
channel,
|
||||
action,
|
||||
params,
|
||||
accountId: accountId ?? undefined,
|
||||
requesterSenderId: input.requesterSenderId ?? undefined,
|
||||
senderIsOwner: input.senderIsOwner,
|
||||
sessionKey: input.sessionKey,
|
||||
sessionId: input.sessionId,
|
||||
agentId,
|
||||
toolContext: input.toolContext,
|
||||
idempotencyKey: await resolveGatewayActionIdempotencyKey(
|
||||
normalizeOptionalString(params.idempotencyKey),
|
||||
),
|
||||
},
|
||||
});
|
||||
return {
|
||||
kind: "action",
|
||||
channel,
|
||||
action,
|
||||
handledBy: "plugin",
|
||||
payload,
|
||||
dryRun,
|
||||
};
|
||||
}
|
||||
|
||||
const handled = await dispatchChannelMessageAction({
|
||||
channel,
|
||||
|
||||
Reference in New Issue
Block a user