fix(hooks): standardize outbound routing metadata

This commit is contained in:
Vincent Koc
2026-04-22 10:52:22 -07:00
parent b0f6c54645
commit e593122465
12 changed files with 174 additions and 113 deletions

View File

@@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Hooks/Slack: standardize shared message hook routing fields (`threadId` / `replyToId`) and stop Slack outbound delivery from re-running `message_sending` inside the channel adapter, so plugins like thread-ownership make one outbound routing decision per reply. Thanks @vincentkoc.
- Gateway/restart: preserve group and channel chat context when resuming an agent turn after a Gateway restart, so continuation replies keep the same prompt, routing, and tool-status behavior as the original conversation.
- Gateway/pairing: shared-secret loopback CLI clients now silently auto-approve `metadata-upgrade` pairing (platform / device family refresh) instead of being disconnected with `1008 pairing required`. This matches the scope-upgrade and role-upgrade behavior added in #69431 and unblocks non-interactive CLI automation when a paired-device record has a stale platform string (e.g. device key replicated across hosts, install migrated between OSes, or platform-string format changed between OpenClaw versions). Browser / Control-UI clients keep the existing approval-required flow for metadata changes.
- Gateway/pairing: treat any forwarded-header evidence (`Forwarded`, `X-Forwarded-*`, or `X-Real-IP`) as proxied WebSocket traffic before pairing locality checks, so reverse-proxy topologies cannot use the loopback shared-secret helper auto-pairing path.

View File

@@ -190,6 +190,8 @@ Hook guard semantics to keep in mind:
- `before_install`: `{ block: false }` is treated as no decision.
- `message_sending`: `{ cancel: true }` is terminal and stops lower-priority handlers.
- `message_sending`: `{ cancel: false }` is treated as no decision.
- `message_received`: prefer the typed `threadId` field when you need inbound thread/topic routing. Keep `metadata` for channel-specific extras.
- `message_sending`: prefer typed `replyToId` / `threadId` routing fields over channel-specific metadata keys.
The `/approve` command handles both exec and plugin approvals with bounded fallback: when an exec approval id is not found, OpenClaw retries the same id through plugin approvals. Plugin approval forwarding can be configured independently via `approvals.plugin` in config.

View File

@@ -460,6 +460,8 @@ AI CLI backend such as `codex-cli`.
- `reply_dispatch`: returning `{ handled: true, ... }` is terminal. Once any handler claims dispatch, lower-priority handlers and the default model dispatch path are skipped.
- `message_sending`: returning `{ cancel: true }` is terminal. Once any handler sets it, lower-priority handlers are skipped.
- `message_sending`: returning `{ cancel: false }` is treated as no decision (same as omitting `cancel`), not as an override.
- `message_received`: use the typed `threadId` field when you need inbound thread/topic routing. Keep `metadata` for channel-specific extras.
- `message_sending`: use typed `replyToId` / `threadId` routing fields before falling back to channel-specific `metadata`.
### API object fields

View File

@@ -1,20 +1,11 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const sendMessageSlackMock = vi.hoisted(() => vi.fn());
const hasHooksMock = vi.hoisted(() => vi.fn());
const runMessageSendingMock = vi.hoisted(() => vi.fn());
vi.mock("./send.js", () => ({
sendMessageSlack: (...args: unknown[]) => sendMessageSlackMock(...args),
}));
vi.mock("openclaw/plugin-sdk/plugin-runtime", () => ({
getGlobalHookRunner: () => ({
hasHooks: (...args: unknown[]) => hasHooksMock(...args),
runMessageSending: (...args: unknown[]) => runMessageSendingMock(...args),
}),
}));
let slackOutbound: typeof import("./outbound-adapter.js").slackOutbound;
({ slackOutbound } = await import("./outbound-adapter.js"));
@@ -30,9 +21,6 @@ describe("slackOutbound", () => {
beforeEach(() => {
sendMessageSlackMock.mockReset();
hasHooksMock.mockReset();
runMessageSendingMock.mockReset();
hasHooksMock.mockReturnValue(false);
});
it("sends payload media first, then finalizes with blocks", async () => {
@@ -127,37 +115,4 @@ describe("slackOutbound", () => {
);
expect(result).toEqual({ channel: "slack", messageId: "m-blocks" });
});
it("cancels sendMedia when message_sending hooks block it", async () => {
hasHooksMock.mockReturnValue(true);
runMessageSendingMock.mockResolvedValue({ cancel: true });
const result = await slackOutbound.sendMedia!({
cfg,
to: "C123",
text: "caption",
mediaUrl: "https://example.com/image.png",
accountId: "default",
replyToId: "1712000000.000001",
});
expect(runMessageSendingMock).toHaveBeenCalledWith(
{
to: "C123",
content: "caption",
metadata: {
threadTs: "1712000000.000001",
channelId: "C123",
mediaUrl: "https://example.com/image.png",
},
},
{ channelId: "slack", accountId: "default" },
);
expect(sendMessageSlackMock).not.toHaveBeenCalled();
expect(result).toMatchObject({
channel: "slack",
messageId: "cancelled-by-hook",
meta: { cancelled: true },
});
});
});

View File

@@ -12,14 +12,12 @@ import {
resolveOutboundSendDep,
type OutboundIdentity,
} from "openclaw/plugin-sdk/outbound-runtime";
import { getGlobalHookRunner } from "openclaw/plugin-sdk/plugin-runtime";
import {
resolvePayloadMediaUrls,
sendPayloadMediaSequenceAndFinalize,
sendTextMediaPayload,
} from "openclaw/plugin-sdk/reply-payload";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import { resolveSlackAccount } from "./accounts.js";
import { parseSlackBlocksInput } from "./blocks-input.js";
import {
buildSlackInteractiveBlocks,
@@ -64,40 +62,6 @@ function resolveSlackSendIdentity(identity?: OutboundIdentity): SlackSendIdentit
return { username, iconUrl, iconEmoji };
}
async function applySlackMessageSendingHooks(params: {
cfg: NonNullable<NonNullable<Parameters<SlackSendFn>[2]>["cfg"]>;
to: string;
text: string;
threadTs?: string;
accountId?: string;
mediaUrl?: string;
}): Promise<{ cancelled: boolean; text: string }> {
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("message_sending")) {
return { cancelled: false, text: params.text };
}
const account = resolveSlackAccount({
cfg: params.cfg,
accountId: params.accountId,
});
const hookResult = await hookRunner.runMessageSending(
{
to: params.to,
content: params.text,
metadata: {
threadTs: params.threadTs,
channelId: params.to,
...(params.mediaUrl ? { mediaUrl: params.mediaUrl } : {}),
},
},
{ channelId: "slack", accountId: account.accountId },
);
if (hookResult?.cancel) {
return { cancelled: true, text: params.text };
}
return { cancelled: false, text: hookResult?.content ?? params.text };
}
async function sendSlackOutboundMessage(params: {
cfg: NonNullable<NonNullable<Parameters<SlackSendFn>[2]>["cfg"]>;
to: string;
@@ -119,28 +83,10 @@ async function sendSlackOutboundMessage(params: {
const send =
resolveOutboundSendDep<SlackSendFn>(params.deps, "slack") ??
(await loadSlackSendRuntime()).sendMessageSlack;
const threadTs =
params.replyToId ?? (params.threadId != null ? String(params.threadId) : undefined);
const hookResult = await applySlackMessageSendingHooks({
cfg: params.cfg,
to: params.to,
text: params.text,
threadTs,
mediaUrl: params.mediaUrl,
accountId: params.accountId ?? undefined,
});
if (hookResult.cancelled) {
return {
messageId: "cancelled-by-hook",
channelId: params.to,
meta: { cancelled: true },
};
}
const slackIdentity = resolveSlackSendIdentity(params.identity);
const result = await send(params.to, hookResult.text, {
const result = await send(params.to, params.text, {
cfg: params.cfg,
threadTs,
threadTs: params.replyToId ?? (params.threadId != null ? String(params.threadId) : undefined),
accountId: params.accountId ?? undefined,
...(params.mediaUrl
? {

View File

@@ -0,0 +1,123 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { deliverOutboundPayloads } from "../../../src/infra/outbound/deliver.js";
import {
initializeGlobalHookRunner,
resetGlobalHookRunner,
} from "../../../src/plugins/hook-runner-global.js";
import { addTestHook } from "../../../src/plugins/hooks.test-helpers.js";
import { createEmptyPluginRegistry } from "../../../src/plugins/registry.js";
import {
releasePinnedPluginChannelRegistry,
setActivePluginRegistry,
} from "../../../src/plugins/runtime.js";
import type { PluginHookRegistration } from "../../../src/plugins/types.js";
import {
createOutboundTestPlugin,
createTestRegistry,
} from "../../../src/test-utils/channel-plugins.js";
import { slackOutbound } from "./outbound-adapter.js";
import type { OpenClawConfig } from "./runtime-api.js";
const sendMessageSlackMock = vi.hoisted(() => vi.fn());
vi.mock("./send.runtime.js", () => ({
sendMessageSlack: sendMessageSlackMock,
}));
const cfg: OpenClawConfig = {
channels: {
slack: {
botToken: "xoxb-test",
appToken: "xapp-test",
accounts: {
default: {
botToken: "xoxb-default",
appToken: "xapp-default",
},
},
},
},
};
describe("slack outbound shared hook wiring", () => {
beforeEach(() => {
sendMessageSlackMock.mockReset();
sendMessageSlackMock.mockResolvedValue({ messageId: "m1", channelId: "C123" });
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "slack",
plugin: createOutboundTestPlugin({ id: "slack", outbound: slackOutbound }),
source: "test",
},
]),
);
resetGlobalHookRunner();
});
afterEach(() => {
resetGlobalHookRunner();
releasePinnedPluginChannelRegistry();
});
it("fires message_sending once with shared routing fields", async () => {
const hookRegistry = createEmptyPluginRegistry();
const handler = vi.fn().mockResolvedValue(undefined);
addTestHook({
registry: hookRegistry,
pluginId: "thread-ownership",
hookName: "message_sending",
handler: handler as PluginHookRegistration["handler"],
});
initializeGlobalHookRunner(hookRegistry);
await deliverOutboundPayloads({
cfg,
channel: "slack",
to: "C123",
payloads: [{ text: "hello" }],
accountId: "default",
replyToId: "1712000000.000001",
});
expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
to: "C123",
content: "hello",
replyToId: "1712000000.000001",
}),
expect.objectContaining({
channelId: "slack",
accountId: "default",
conversationId: "C123",
}),
);
expect(sendMessageSlackMock).toHaveBeenCalledTimes(1);
});
it("respects cancel from the shared hook without a second adapter pass", async () => {
const hookRegistry = createEmptyPluginRegistry();
const handler = vi.fn().mockResolvedValue({ cancel: true });
addTestHook({
registry: hookRegistry,
pluginId: "thread-ownership",
hookName: "message_sending",
handler: handler as PluginHookRegistration["handler"],
});
initializeGlobalHookRunner(hookRegistry);
const result = await deliverOutboundPayloads({
cfg,
channel: "slack",
to: "C123",
payloads: [{ text: "hello" }],
accountId: "default",
replyToId: "1712000000.000001",
});
expect(handler).toHaveBeenCalledTimes(1);
expect(sendMessageSlackMock).not.toHaveBeenCalled();
expect(result).toEqual([]);
});
});

View File

@@ -676,11 +676,15 @@ export async function deliverReplies(params: {
}
const rawContent = reply.text || "";
const replyToId =
params.replyToMode === "off" ? undefined : resolveTelegramReplyId(reply.replyToId);
if (hasMessageSendingHooks) {
const hookResult = await hookRunner?.runMessageSending(
{
to: params.chatId,
content: rawContent,
replyToId,
threadId: params.thread?.id,
metadata: {
channel: "telegram",
mediaUrls: mediaList,
@@ -705,8 +709,6 @@ export async function deliverReplies(params: {
try {
const deliveredCountBeforeReply = progress.deliveredCount;
const replyToId =
params.replyToMode === "off" ? undefined : resolveTelegramReplyId(reply.replyToId);
const telegramData = reply.channelData?.telegram as TelegramReplyChannelData | undefined;
const replyMarkup = buildInlineKeyboard(telegramData?.buttons);
let firstDeliveredMessageId: number | undefined;

View File

@@ -46,14 +46,14 @@ describe("thread-ownership plugin", () => {
async function sendSlackThreadMessage() {
return await hooks.message_sending(
{ content: "hello", metadata: { threadTs: "1234.5678", channelId: "C123" }, to: "C123" },
{ content: "hello", replyToId: "1234.5678", metadata: { channelId: "C123" }, to: "C123" },
{ channelId: "slack", conversationId: "C123" },
);
}
it("allows non-slack channels", async () => {
const result = await hooks.message_sending(
{ content: "hello", metadata: { threadTs: "1234.5678", channelId: "C123" }, to: "C123" },
{ content: "hello", replyToId: "1234.5678", metadata: { channelId: "C123" }, to: "C123" },
{ channelId: "discord", conversationId: "C123" },
);
@@ -119,13 +119,17 @@ describe("thread-ownership plugin", () => {
it("tracks @-mentions and skips ownership check for mentioned threads", async () => {
// Simulate receiving a message that @-mentions the agent.
await hooks.message_received(
{ content: "Hey @TestBot help me", metadata: { threadTs: "9999.0001", channelId: "C456" } },
{
content: "Hey @TestBot help me",
threadId: "9999.0001",
metadata: { channelId: "C456" },
},
{ channelId: "slack", conversationId: "C456" },
);
// Now send in the same thread -- should skip the ownership HTTP call.
const result = await hooks.message_sending(
{ content: "Sure!", metadata: { threadTs: "9999.0001", channelId: "C456" }, to: "C456" },
{ content: "Sure!", replyToId: "9999.0001", metadata: { channelId: "C456" }, to: "C456" },
{ channelId: "slack", conversationId: "C456" },
);
@@ -136,7 +140,7 @@ describe("thread-ownership plugin", () => {
it("ignores @-mentions on non-slack channels", async () => {
// Use a unique thread key so module-level state from other tests doesn't interfere.
await hooks.message_received(
{ content: "Hey @TestBot", metadata: { threadTs: "7777.0001", channelId: "C999" } },
{ content: "Hey @TestBot", threadId: "7777.0001", metadata: { channelId: "C999" } },
{ channelId: "discord", conversationId: "C999" },
);
@@ -146,7 +150,7 @@ describe("thread-ownership plugin", () => {
);
await hooks.message_sending(
{ content: "Sure!", metadata: { threadTs: "7777.0001", channelId: "C999" }, to: "C999" },
{ content: "Sure!", replyToId: "7777.0001", metadata: { channelId: "C999" }, to: "C999" },
{ channelId: "slack", conversationId: "C999" },
);
@@ -155,12 +159,16 @@ describe("thread-ownership plugin", () => {
it("tracks bot user ID mentions via <@U999> syntax", async () => {
await hooks.message_received(
{ content: "Hey <@U999> help", metadata: { threadTs: "8888.0001", channelId: "C789" } },
{
content: "Hey <@U999> help",
threadId: "8888.0001",
metadata: { channelId: "C789" },
},
{ channelId: "slack", conversationId: "C789" },
);
const result = await hooks.message_sending(
{ content: "On it!", metadata: { threadTs: "8888.0001", channelId: "C789" }, to: "C789" },
{ content: "On it!", replyToId: "8888.0001", metadata: { channelId: "C789" }, to: "C789" },
{ channelId: "slack", conversationId: "C789" },
);

View File

@@ -20,6 +20,10 @@ type ThreadOwnershipMessageSendingResult = { cancel: true } | undefined;
const mentionedThreads = new Map<string, number>();
const MENTION_TTL_MS = 5 * 60 * 1000;
function resolveThreadToken(value: unknown): string {
return typeof value === "string" || typeof value === "number" ? String(value) : "";
}
function cleanExpiredMentions(): void {
const now = Date.now();
for (const [key, ts] of mentionedThreads) {
@@ -72,7 +76,10 @@ export default definePluginEntry({
}
const text = event.content ?? "";
const threadTs = (event.metadata?.threadTs as string) ?? "";
const threadTs =
resolveThreadToken(event.threadId) ||
resolveThreadToken(event.metadata?.threadId) ||
resolveThreadToken(event.metadata?.threadTs);
const channelId = (event.metadata?.channelId as string) ?? ctx.conversationId ?? "";
if (!threadTs || !channelId) {
return;
@@ -92,7 +99,11 @@ export default definePluginEntry({
return undefined;
}
const threadTs = (event.metadata?.threadTs as string) ?? "";
const threadTs =
resolveThreadToken(event.replyToId) ||
resolveThreadToken(event.threadId) ||
resolveThreadToken(event.metadata?.threadId) ||
resolveThreadToken(event.metadata?.threadTs);
const channelId = (event.metadata?.channelId as string) ?? event.to;
if (!threadTs) {
return undefined;

View File

@@ -280,6 +280,7 @@ export function toPluginMessageReceivedEvent(
from: canonical.from,
content: canonical.content,
timestamp: canonical.timestamp,
threadId: canonical.threadId,
metadata: {
to: canonical.to,
provider: canonical.provider,

View File

@@ -567,6 +567,8 @@ async function applyMessageSendingHook(params: {
to: string;
channel: Exclude<OutboundChannel, "none">;
accountId?: string;
replyToId?: string | null;
threadId?: string | number | null;
}): Promise<{
cancelled: boolean;
payload: ReplyPayload;
@@ -584,6 +586,8 @@ async function applyMessageSendingHook(params: {
{
to: params.to,
content: params.payloadSummary.text,
replyToId: params.payload.replyToId ?? params.replyToId ?? undefined,
threadId: params.threadId ?? undefined,
metadata: {
channel: params.channel,
accountId: params.accountId,
@@ -593,6 +597,7 @@ async function applyMessageSendingHook(params: {
{
channelId: params.channel,
accountId: params.accountId ?? undefined,
conversationId: params.to,
},
);
if (sendingResult?.cancel) {
@@ -827,6 +832,8 @@ async function deliverOutboundPayloadsCore(
to,
channel,
accountId,
replyToId: params.replyToId,
threadId: params.threadId,
});
if (hookResult.cancelled) {
continue;

View File

@@ -35,12 +35,15 @@ export type PluginHookMessageReceivedEvent = {
from: string;
content: string;
timestamp?: number;
threadId?: string | number;
metadata?: Record<string, unknown>;
};
export type PluginHookMessageSendingEvent = {
to: string;
content: string;
replyToId?: string | number;
threadId?: string | number;
metadata?: Record<string, unknown>;
};