From e593122465e7157ca9c2e4a8010a1eb78414a09c Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 22 Apr 2026 10:52:22 -0700 Subject: [PATCH] fix(hooks): standardize outbound routing metadata --- CHANGELOG.md | 1 + docs/plugins/building-plugins.md | 2 + docs/plugins/sdk-overview.md | 2 + extensions/slack/src/outbound-adapter.test.ts | 45 ------- extensions/slack/src/outbound-adapter.ts | 58 +-------- .../slack/src/outbound-delivery.test.ts | 123 ++++++++++++++++++ .../telegram/src/bot/delivery.replies.ts | 6 +- extensions/thread-ownership/index.test.ts | 24 ++-- extensions/thread-ownership/index.ts | 15 ++- src/hooks/message-hook-mappers.ts | 1 + src/infra/outbound/deliver.ts | 7 + src/plugins/hook-message.types.ts | 3 + 12 files changed, 174 insertions(+), 113 deletions(-) create mode 100644 extensions/slack/src/outbound-delivery.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 71f6e183465..c14472d21e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Hooks/Slack: standardize shared message hook routing fields (`threadId` / `replyToId`) and stop Slack outbound delivery from re-running `message_sending` inside the channel adapter, so plugins like thread-ownership make one outbound routing decision per reply. Thanks @vincentkoc. - Gateway/restart: preserve group and channel chat context when resuming an agent turn after a Gateway restart, so continuation replies keep the same prompt, routing, and tool-status behavior as the original conversation. - Gateway/pairing: shared-secret loopback CLI clients now silently auto-approve `metadata-upgrade` pairing (platform / device family refresh) instead of being disconnected with `1008 pairing required`. This matches the scope-upgrade and role-upgrade behavior added in #69431 and unblocks non-interactive CLI automation when a paired-device record has a stale platform string (e.g. device key replicated across hosts, install migrated between OSes, or platform-string format changed between OpenClaw versions). Browser / Control-UI clients keep the existing approval-required flow for metadata changes. - Gateway/pairing: treat any forwarded-header evidence (`Forwarded`, `X-Forwarded-*`, or `X-Real-IP`) as proxied WebSocket traffic before pairing locality checks, so reverse-proxy topologies cannot use the loopback shared-secret helper auto-pairing path. diff --git a/docs/plugins/building-plugins.md b/docs/plugins/building-plugins.md index 3f2903c1221..bf0929b3d61 100644 --- a/docs/plugins/building-plugins.md +++ b/docs/plugins/building-plugins.md @@ -190,6 +190,8 @@ Hook guard semantics to keep in mind: - `before_install`: `{ block: false }` is treated as no decision. - `message_sending`: `{ cancel: true }` is terminal and stops lower-priority handlers. - `message_sending`: `{ cancel: false }` is treated as no decision. +- `message_received`: prefer the typed `threadId` field when you need inbound thread/topic routing. Keep `metadata` for channel-specific extras. +- `message_sending`: prefer typed `replyToId` / `threadId` routing fields over channel-specific metadata keys. The `/approve` command handles both exec and plugin approvals with bounded fallback: when an exec approval id is not found, OpenClaw retries the same id through plugin approvals. Plugin approval forwarding can be configured independently via `approvals.plugin` in config. diff --git a/docs/plugins/sdk-overview.md b/docs/plugins/sdk-overview.md index 54f036b4d5b..a49accf7240 100644 --- a/docs/plugins/sdk-overview.md +++ b/docs/plugins/sdk-overview.md @@ -460,6 +460,8 @@ AI CLI backend such as `codex-cli`. - `reply_dispatch`: returning `{ handled: true, ... }` is terminal. Once any handler claims dispatch, lower-priority handlers and the default model dispatch path are skipped. - `message_sending`: returning `{ cancel: true }` is terminal. Once any handler sets it, lower-priority handlers are skipped. - `message_sending`: returning `{ cancel: false }` is treated as no decision (same as omitting `cancel`), not as an override. +- `message_received`: use the typed `threadId` field when you need inbound thread/topic routing. Keep `metadata` for channel-specific extras. +- `message_sending`: use typed `replyToId` / `threadId` routing fields before falling back to channel-specific `metadata`. ### API object fields diff --git a/extensions/slack/src/outbound-adapter.test.ts b/extensions/slack/src/outbound-adapter.test.ts index 5ec1b09e100..74b94a3019b 100644 --- a/extensions/slack/src/outbound-adapter.test.ts +++ b/extensions/slack/src/outbound-adapter.test.ts @@ -1,20 +1,11 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const sendMessageSlackMock = vi.hoisted(() => vi.fn()); -const hasHooksMock = vi.hoisted(() => vi.fn()); -const runMessageSendingMock = vi.hoisted(() => vi.fn()); vi.mock("./send.js", () => ({ sendMessageSlack: (...args: unknown[]) => sendMessageSlackMock(...args), })); -vi.mock("openclaw/plugin-sdk/plugin-runtime", () => ({ - getGlobalHookRunner: () => ({ - hasHooks: (...args: unknown[]) => hasHooksMock(...args), - runMessageSending: (...args: unknown[]) => runMessageSendingMock(...args), - }), -})); - let slackOutbound: typeof import("./outbound-adapter.js").slackOutbound; ({ slackOutbound } = await import("./outbound-adapter.js")); @@ -30,9 +21,6 @@ describe("slackOutbound", () => { beforeEach(() => { sendMessageSlackMock.mockReset(); - hasHooksMock.mockReset(); - runMessageSendingMock.mockReset(); - hasHooksMock.mockReturnValue(false); }); it("sends payload media first, then finalizes with blocks", async () => { @@ -127,37 +115,4 @@ describe("slackOutbound", () => { ); expect(result).toEqual({ channel: "slack", messageId: "m-blocks" }); }); - - it("cancels sendMedia when message_sending hooks block it", async () => { - hasHooksMock.mockReturnValue(true); - runMessageSendingMock.mockResolvedValue({ cancel: true }); - - const result = await slackOutbound.sendMedia!({ - cfg, - to: "C123", - text: "caption", - mediaUrl: "https://example.com/image.png", - accountId: "default", - replyToId: "1712000000.000001", - }); - - expect(runMessageSendingMock).toHaveBeenCalledWith( - { - to: "C123", - content: "caption", - metadata: { - threadTs: "1712000000.000001", - channelId: "C123", - mediaUrl: "https://example.com/image.png", - }, - }, - { channelId: "slack", accountId: "default" }, - ); - expect(sendMessageSlackMock).not.toHaveBeenCalled(); - expect(result).toMatchObject({ - channel: "slack", - messageId: "cancelled-by-hook", - meta: { cancelled: true }, - }); - }); }); diff --git a/extensions/slack/src/outbound-adapter.ts b/extensions/slack/src/outbound-adapter.ts index 9eaaf47435a..b51773306b4 100644 --- a/extensions/slack/src/outbound-adapter.ts +++ b/extensions/slack/src/outbound-adapter.ts @@ -12,14 +12,12 @@ import { resolveOutboundSendDep, type OutboundIdentity, } from "openclaw/plugin-sdk/outbound-runtime"; -import { getGlobalHookRunner } from "openclaw/plugin-sdk/plugin-runtime"; import { resolvePayloadMediaUrls, sendPayloadMediaSequenceAndFinalize, sendTextMediaPayload, } from "openclaw/plugin-sdk/reply-payload"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; -import { resolveSlackAccount } from "./accounts.js"; import { parseSlackBlocksInput } from "./blocks-input.js"; import { buildSlackInteractiveBlocks, @@ -64,40 +62,6 @@ function resolveSlackSendIdentity(identity?: OutboundIdentity): SlackSendIdentit return { username, iconUrl, iconEmoji }; } -async function applySlackMessageSendingHooks(params: { - cfg: NonNullable[2]>["cfg"]>; - to: string; - text: string; - threadTs?: string; - accountId?: string; - mediaUrl?: string; -}): Promise<{ cancelled: boolean; text: string }> { - const hookRunner = getGlobalHookRunner(); - if (!hookRunner?.hasHooks("message_sending")) { - return { cancelled: false, text: params.text }; - } - const account = resolveSlackAccount({ - cfg: params.cfg, - accountId: params.accountId, - }); - const hookResult = await hookRunner.runMessageSending( - { - to: params.to, - content: params.text, - metadata: { - threadTs: params.threadTs, - channelId: params.to, - ...(params.mediaUrl ? { mediaUrl: params.mediaUrl } : {}), - }, - }, - { channelId: "slack", accountId: account.accountId }, - ); - if (hookResult?.cancel) { - return { cancelled: true, text: params.text }; - } - return { cancelled: false, text: hookResult?.content ?? params.text }; -} - async function sendSlackOutboundMessage(params: { cfg: NonNullable[2]>["cfg"]>; to: string; @@ -119,28 +83,10 @@ async function sendSlackOutboundMessage(params: { const send = resolveOutboundSendDep(params.deps, "slack") ?? (await loadSlackSendRuntime()).sendMessageSlack; - const threadTs = - params.replyToId ?? (params.threadId != null ? String(params.threadId) : undefined); - const hookResult = await applySlackMessageSendingHooks({ - cfg: params.cfg, - to: params.to, - text: params.text, - threadTs, - mediaUrl: params.mediaUrl, - accountId: params.accountId ?? undefined, - }); - if (hookResult.cancelled) { - return { - messageId: "cancelled-by-hook", - channelId: params.to, - meta: { cancelled: true }, - }; - } - const slackIdentity = resolveSlackSendIdentity(params.identity); - const result = await send(params.to, hookResult.text, { + const result = await send(params.to, params.text, { cfg: params.cfg, - threadTs, + threadTs: params.replyToId ?? (params.threadId != null ? String(params.threadId) : undefined), accountId: params.accountId ?? undefined, ...(params.mediaUrl ? { diff --git a/extensions/slack/src/outbound-delivery.test.ts b/extensions/slack/src/outbound-delivery.test.ts new file mode 100644 index 00000000000..485db825937 --- /dev/null +++ b/extensions/slack/src/outbound-delivery.test.ts @@ -0,0 +1,123 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { deliverOutboundPayloads } from "../../../src/infra/outbound/deliver.js"; +import { + initializeGlobalHookRunner, + resetGlobalHookRunner, +} from "../../../src/plugins/hook-runner-global.js"; +import { addTestHook } from "../../../src/plugins/hooks.test-helpers.js"; +import { createEmptyPluginRegistry } from "../../../src/plugins/registry.js"; +import { + releasePinnedPluginChannelRegistry, + setActivePluginRegistry, +} from "../../../src/plugins/runtime.js"; +import type { PluginHookRegistration } from "../../../src/plugins/types.js"; +import { + createOutboundTestPlugin, + createTestRegistry, +} from "../../../src/test-utils/channel-plugins.js"; +import { slackOutbound } from "./outbound-adapter.js"; +import type { OpenClawConfig } from "./runtime-api.js"; + +const sendMessageSlackMock = vi.hoisted(() => vi.fn()); + +vi.mock("./send.runtime.js", () => ({ + sendMessageSlack: sendMessageSlackMock, +})); + +const cfg: OpenClawConfig = { + channels: { + slack: { + botToken: "xoxb-test", + appToken: "xapp-test", + accounts: { + default: { + botToken: "xoxb-default", + appToken: "xapp-default", + }, + }, + }, + }, +}; + +describe("slack outbound shared hook wiring", () => { + beforeEach(() => { + sendMessageSlackMock.mockReset(); + sendMessageSlackMock.mockResolvedValue({ messageId: "m1", channelId: "C123" }); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "slack", + plugin: createOutboundTestPlugin({ id: "slack", outbound: slackOutbound }), + source: "test", + }, + ]), + ); + resetGlobalHookRunner(); + }); + + afterEach(() => { + resetGlobalHookRunner(); + releasePinnedPluginChannelRegistry(); + }); + + it("fires message_sending once with shared routing fields", async () => { + const hookRegistry = createEmptyPluginRegistry(); + const handler = vi.fn().mockResolvedValue(undefined); + addTestHook({ + registry: hookRegistry, + pluginId: "thread-ownership", + hookName: "message_sending", + handler: handler as PluginHookRegistration["handler"], + }); + initializeGlobalHookRunner(hookRegistry); + + await deliverOutboundPayloads({ + cfg, + channel: "slack", + to: "C123", + payloads: [{ text: "hello" }], + accountId: "default", + replyToId: "1712000000.000001", + }); + + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + to: "C123", + content: "hello", + replyToId: "1712000000.000001", + }), + expect.objectContaining({ + channelId: "slack", + accountId: "default", + conversationId: "C123", + }), + ); + expect(sendMessageSlackMock).toHaveBeenCalledTimes(1); + }); + + it("respects cancel from the shared hook without a second adapter pass", async () => { + const hookRegistry = createEmptyPluginRegistry(); + const handler = vi.fn().mockResolvedValue({ cancel: true }); + addTestHook({ + registry: hookRegistry, + pluginId: "thread-ownership", + hookName: "message_sending", + handler: handler as PluginHookRegistration["handler"], + }); + initializeGlobalHookRunner(hookRegistry); + + const result = await deliverOutboundPayloads({ + cfg, + channel: "slack", + to: "C123", + payloads: [{ text: "hello" }], + accountId: "default", + replyToId: "1712000000.000001", + }); + + expect(handler).toHaveBeenCalledTimes(1); + expect(sendMessageSlackMock).not.toHaveBeenCalled(); + expect(result).toEqual([]); + }); +}); diff --git a/extensions/telegram/src/bot/delivery.replies.ts b/extensions/telegram/src/bot/delivery.replies.ts index 2d2e4d300d1..6b1737f4ac1 100644 --- a/extensions/telegram/src/bot/delivery.replies.ts +++ b/extensions/telegram/src/bot/delivery.replies.ts @@ -676,11 +676,15 @@ export async function deliverReplies(params: { } const rawContent = reply.text || ""; + const replyToId = + params.replyToMode === "off" ? undefined : resolveTelegramReplyId(reply.replyToId); if (hasMessageSendingHooks) { const hookResult = await hookRunner?.runMessageSending( { to: params.chatId, content: rawContent, + replyToId, + threadId: params.thread?.id, metadata: { channel: "telegram", mediaUrls: mediaList, @@ -705,8 +709,6 @@ export async function deliverReplies(params: { try { const deliveredCountBeforeReply = progress.deliveredCount; - const replyToId = - params.replyToMode === "off" ? undefined : resolveTelegramReplyId(reply.replyToId); const telegramData = reply.channelData?.telegram as TelegramReplyChannelData | undefined; const replyMarkup = buildInlineKeyboard(telegramData?.buttons); let firstDeliveredMessageId: number | undefined; diff --git a/extensions/thread-ownership/index.test.ts b/extensions/thread-ownership/index.test.ts index 8cb354a96c1..651cc57823c 100644 --- a/extensions/thread-ownership/index.test.ts +++ b/extensions/thread-ownership/index.test.ts @@ -46,14 +46,14 @@ describe("thread-ownership plugin", () => { async function sendSlackThreadMessage() { return await hooks.message_sending( - { content: "hello", metadata: { threadTs: "1234.5678", channelId: "C123" }, to: "C123" }, + { content: "hello", replyToId: "1234.5678", metadata: { channelId: "C123" }, to: "C123" }, { channelId: "slack", conversationId: "C123" }, ); } it("allows non-slack channels", async () => { const result = await hooks.message_sending( - { content: "hello", metadata: { threadTs: "1234.5678", channelId: "C123" }, to: "C123" }, + { content: "hello", replyToId: "1234.5678", metadata: { channelId: "C123" }, to: "C123" }, { channelId: "discord", conversationId: "C123" }, ); @@ -119,13 +119,17 @@ describe("thread-ownership plugin", () => { it("tracks @-mentions and skips ownership check for mentioned threads", async () => { // Simulate receiving a message that @-mentions the agent. await hooks.message_received( - { content: "Hey @TestBot help me", metadata: { threadTs: "9999.0001", channelId: "C456" } }, + { + content: "Hey @TestBot help me", + threadId: "9999.0001", + metadata: { channelId: "C456" }, + }, { channelId: "slack", conversationId: "C456" }, ); // Now send in the same thread -- should skip the ownership HTTP call. const result = await hooks.message_sending( - { content: "Sure!", metadata: { threadTs: "9999.0001", channelId: "C456" }, to: "C456" }, + { content: "Sure!", replyToId: "9999.0001", metadata: { channelId: "C456" }, to: "C456" }, { channelId: "slack", conversationId: "C456" }, ); @@ -136,7 +140,7 @@ describe("thread-ownership plugin", () => { it("ignores @-mentions on non-slack channels", async () => { // Use a unique thread key so module-level state from other tests doesn't interfere. await hooks.message_received( - { content: "Hey @TestBot", metadata: { threadTs: "7777.0001", channelId: "C999" } }, + { content: "Hey @TestBot", threadId: "7777.0001", metadata: { channelId: "C999" } }, { channelId: "discord", conversationId: "C999" }, ); @@ -146,7 +150,7 @@ describe("thread-ownership plugin", () => { ); await hooks.message_sending( - { content: "Sure!", metadata: { threadTs: "7777.0001", channelId: "C999" }, to: "C999" }, + { content: "Sure!", replyToId: "7777.0001", metadata: { channelId: "C999" }, to: "C999" }, { channelId: "slack", conversationId: "C999" }, ); @@ -155,12 +159,16 @@ describe("thread-ownership plugin", () => { it("tracks bot user ID mentions via <@U999> syntax", async () => { await hooks.message_received( - { content: "Hey <@U999> help", metadata: { threadTs: "8888.0001", channelId: "C789" } }, + { + content: "Hey <@U999> help", + threadId: "8888.0001", + metadata: { channelId: "C789" }, + }, { channelId: "slack", conversationId: "C789" }, ); const result = await hooks.message_sending( - { content: "On it!", metadata: { threadTs: "8888.0001", channelId: "C789" }, to: "C789" }, + { content: "On it!", replyToId: "8888.0001", metadata: { channelId: "C789" }, to: "C789" }, { channelId: "slack", conversationId: "C789" }, ); diff --git a/extensions/thread-ownership/index.ts b/extensions/thread-ownership/index.ts index 59a70d7498e..d0f0b81d9e2 100644 --- a/extensions/thread-ownership/index.ts +++ b/extensions/thread-ownership/index.ts @@ -20,6 +20,10 @@ type ThreadOwnershipMessageSendingResult = { cancel: true } | undefined; const mentionedThreads = new Map(); const MENTION_TTL_MS = 5 * 60 * 1000; +function resolveThreadToken(value: unknown): string { + return typeof value === "string" || typeof value === "number" ? String(value) : ""; +} + function cleanExpiredMentions(): void { const now = Date.now(); for (const [key, ts] of mentionedThreads) { @@ -72,7 +76,10 @@ export default definePluginEntry({ } const text = event.content ?? ""; - const threadTs = (event.metadata?.threadTs as string) ?? ""; + const threadTs = + resolveThreadToken(event.threadId) || + resolveThreadToken(event.metadata?.threadId) || + resolveThreadToken(event.metadata?.threadTs); const channelId = (event.metadata?.channelId as string) ?? ctx.conversationId ?? ""; if (!threadTs || !channelId) { return; @@ -92,7 +99,11 @@ export default definePluginEntry({ return undefined; } - const threadTs = (event.metadata?.threadTs as string) ?? ""; + const threadTs = + resolveThreadToken(event.replyToId) || + resolveThreadToken(event.threadId) || + resolveThreadToken(event.metadata?.threadId) || + resolveThreadToken(event.metadata?.threadTs); const channelId = (event.metadata?.channelId as string) ?? event.to; if (!threadTs) { return undefined; diff --git a/src/hooks/message-hook-mappers.ts b/src/hooks/message-hook-mappers.ts index 9be1920c157..e3a09756212 100644 --- a/src/hooks/message-hook-mappers.ts +++ b/src/hooks/message-hook-mappers.ts @@ -280,6 +280,7 @@ export function toPluginMessageReceivedEvent( from: canonical.from, content: canonical.content, timestamp: canonical.timestamp, + threadId: canonical.threadId, metadata: { to: canonical.to, provider: canonical.provider, diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 01f1c0e3c3c..a727590f490 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -567,6 +567,8 @@ async function applyMessageSendingHook(params: { to: string; channel: Exclude; accountId?: string; + replyToId?: string | null; + threadId?: string | number | null; }): Promise<{ cancelled: boolean; payload: ReplyPayload; @@ -584,6 +586,8 @@ async function applyMessageSendingHook(params: { { to: params.to, content: params.payloadSummary.text, + replyToId: params.payload.replyToId ?? params.replyToId ?? undefined, + threadId: params.threadId ?? undefined, metadata: { channel: params.channel, accountId: params.accountId, @@ -593,6 +597,7 @@ async function applyMessageSendingHook(params: { { channelId: params.channel, accountId: params.accountId ?? undefined, + conversationId: params.to, }, ); if (sendingResult?.cancel) { @@ -827,6 +832,8 @@ async function deliverOutboundPayloadsCore( to, channel, accountId, + replyToId: params.replyToId, + threadId: params.threadId, }); if (hookResult.cancelled) { continue; diff --git a/src/plugins/hook-message.types.ts b/src/plugins/hook-message.types.ts index ddf82384c4f..ddbf30b059c 100644 --- a/src/plugins/hook-message.types.ts +++ b/src/plugins/hook-message.types.ts @@ -35,12 +35,15 @@ export type PluginHookMessageReceivedEvent = { from: string; content: string; timestamp?: number; + threadId?: string | number; metadata?: Record; }; export type PluginHookMessageSendingEvent = { to: string; content: string; + replyToId?: string | number; + threadId?: string | number; metadata?: Record; };