From 3eb6c4c8ecc35948ee57ce8a3f718c670e16fa28 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sun, 8 Mar 2026 16:36:14 -0400 Subject: [PATCH] matrix-js: improve thread context and auto-threading --- docs/channels/matrix-js.md | 10 ++ .../matrix-js/src/channel.directory.test.ts | 50 +++++++ extensions/matrix-js/src/channel.ts | 27 +++- .../src/matrix/monitor/handler.test.ts | 125 ++++++++++++++++++ .../matrix-js/src/matrix/monitor/handler.ts | 10 ++ .../src/matrix/monitor/thread-context.test.ts | 106 +++++++++++++++ .../src/matrix/monitor/thread-context.ts | 107 +++++++++++++++ src/channels/plugins/types.core.ts | 6 + src/infra/outbound/message-action-params.ts | 43 ++++++ .../message-action-runner.threading.test.ts | 121 +++++++++++++++++ src/infra/outbound/message-action-runner.ts | 7 +- 11 files changed, 610 insertions(+), 2 deletions(-) create mode 100644 extensions/matrix-js/src/matrix/monitor/thread-context.test.ts create mode 100644 extensions/matrix-js/src/matrix/monitor/thread-context.ts diff --git a/docs/channels/matrix-js.md b/docs/channels/matrix-js.md index 1fca30a6202..39734a56f36 100644 --- a/docs/channels/matrix-js.md +++ b/docs/channels/matrix-js.md @@ -225,6 +225,16 @@ Inbound SAS requests are auto-confirmed by the bot device, so once the user conf in their Matrix client, verification completes without requiring a manual OpenClaw tool step. Verification protocol/system notices are not forwarded to the agent chat pipeline, so they do not produce `NO_REPLY`. +## Threads + +Matrix-js supports native Matrix threads for both automatic replies and message-tool sends. + +- `threadReplies: "off"` keeps replies top-level. +- `threadReplies: "inbound"` replies inside a thread only when the inbound message was already in that thread. +- `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message. +- Inbound threaded messages include the thread root message as extra agent context. +- Message-tool sends now auto-inherit the current Matrix thread when the target is the same room, or the same DM user target, unless an explicit `threadId` is provided. + ## Reactions Matrix-js supports outbound reaction actions, inbound reaction notifications, and inbound ack reactions. diff --git a/extensions/matrix-js/src/channel.directory.test.ts b/extensions/matrix-js/src/channel.directory.test.ts index 18b02e5c0f7..ec11e136eb7 100644 --- a/extensions/matrix-js/src/channel.directory.test.ts +++ b/extensions/matrix-js/src/channel.directory.test.ts @@ -104,6 +104,56 @@ describe("matrix directory", () => { ).toBe("off"); }); + it("only exposes real Matrix thread ids in tool context", () => { + expect( + matrixPlugin.threading?.buildToolContext?.({ + context: { + To: "room:!room:example.org", + ReplyToId: "$reply", + }, + hasRepliedRef: { value: false }, + }), + ).toEqual({ + currentChannelId: "room:!room:example.org", + currentThreadTs: undefined, + hasRepliedRef: { value: false }, + }); + + expect( + matrixPlugin.threading?.buildToolContext?.({ + context: { + To: "room:!room:example.org", + ReplyToId: "$reply", + MessageThreadId: "$thread", + }, + hasRepliedRef: { value: true }, + }), + ).toEqual({ + currentChannelId: "room:!room:example.org", + currentThreadTs: "$thread", + hasRepliedRef: { value: true }, + }); + }); + + it("exposes Matrix direct user id in dm tool context", () => { + expect( + matrixPlugin.threading?.buildToolContext?.({ + context: { + From: "matrix:@alice:example.org", + To: "room:!dm:example.org", + ChatType: "direct", + MessageThreadId: "$thread", + }, + hasRepliedRef: { value: false }, + }), + ).toEqual({ + currentChannelId: "room:!dm:example.org", + currentThreadTs: "$thread", + currentDirectUserId: "@alice:example.org", + hasRepliedRef: { value: false }, + }); + }); + it("resolves group mention policy from account config", () => { const cfg = { channels: { diff --git a/extensions/matrix-js/src/channel.ts b/extensions/matrix-js/src/channel.ts index a625f2cfc49..866c81a5e10 100644 --- a/extensions/matrix-js/src/channel.ts +++ b/extensions/matrix-js/src/channel.ts @@ -71,6 +71,26 @@ function normalizeMatrixMessagingTarget(raw: string): string | undefined { return stripped || undefined; } +function resolveMatrixDirectUserId(params: { + from?: string; + to?: string; + chatType?: string; +}): string | undefined { + if (params.chatType !== "direct") { + return undefined; + } + const from = params.from?.trim(); + const to = params.to?.trim(); + if (!from || !to || !/^room:/i.test(to)) { + return undefined; + } + const normalized = from + .replace(/^matrix:/i, "") + .replace(/^user:/i, "") + .trim(); + return normalized.startsWith("@") ? normalized : undefined; +} + function resolveAvatarInput(input: ChannelSetupInput): string | undefined { const avatarUrl = (input as ChannelSetupInput & { avatarUrl?: string }).avatarUrl; const trimmed = avatarUrl?.trim(); @@ -181,7 +201,12 @@ export const matrixPlugin: ChannelPlugin = { return { currentChannelId: currentTarget?.trim() || undefined, currentThreadTs: - context.MessageThreadId != null ? String(context.MessageThreadId) : context.ReplyToId, + context.MessageThreadId != null ? String(context.MessageThreadId) : undefined, + currentDirectUserId: resolveMatrixDirectUserId({ + from: context.From, + to: context.To, + chatType: context.ChatType, + }), hasRepliedRef, }; }, diff --git a/extensions/matrix-js/src/matrix/monitor/handler.test.ts b/extensions/matrix-js/src/matrix/monitor/handler.test.ts index ceb73a41c30..ed070b28a5d 100644 --- a/extensions/matrix-js/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix-js/src/matrix/monitor/handler.test.ts @@ -390,6 +390,131 @@ describe("matrix monitor handler pairing account scope", () => { ); }); + it("records thread starter context for inbound thread replies", async () => { + const recordInboundSession = vi.fn(async () => {}); + const finalizeInboundContext = vi.fn((ctx) => ctx); + + const handler = createMatrixRoomMessageHandler({ + client: { + getUserId: async () => "@bot:example.org", + getEvent: async () => ({ + event_id: "$root", + sender: "@alice:example.org", + type: EventType.RoomMessage, + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "Root topic", + }, + }), + } as never, + core: { + channel: { + pairing: { + readAllowFromStore: async () => [] as string[], + upsertPairingRequest: async () => ({ code: "ABCDEFGH", created: false }), + }, + commands: { + shouldHandleTextCommands: () => false, + }, + text: { + hasControlCommand: () => false, + resolveMarkdownTableMode: () => "preserve", + }, + routing: { + resolveAgentRoute: () => ({ + agentId: "ops", + channel: "matrix-js", + accountId: "ops", + sessionKey: "agent:ops:main", + mainSessionKey: "agent:ops:main", + matchedBy: "binding.account", + }), + }, + session: { + resolveStorePath: () => "/tmp/session-store", + readSessionUpdatedAt: () => undefined, + recordInboundSession, + }, + reply: { + resolveEnvelopeFormatOptions: () => ({}), + formatAgentEnvelope: ({ body }: { body: string }) => body, + finalizeInboundContext, + createReplyDispatcherWithTyping: () => ({ + dispatcher: {}, + replyOptions: {}, + markDispatchIdle: () => {}, + }), + resolveHumanDelayConfig: () => undefined, + dispatchReplyFromConfig: async () => ({ + queuedFinal: false, + counts: { final: 0, block: 0, tool: 0 }, + }), + }, + reactions: { + shouldAckReaction: () => false, + }, + }, + } as never, + cfg: {} as never, + accountId: "ops", + runtime: { + error: () => {}, + } as never, + logger: { + info: () => {}, + warn: () => {}, + } as never, + logVerboseMessage: () => {}, + allowFrom: [], + mentionRegexes: [], + groupPolicy: "open", + replyToMode: "off", + threadReplies: "inbound", + dmEnabled: true, + dmPolicy: "open", + textLimit: 8_000, + mediaMaxBytes: 10_000_000, + startupMs: 0, + startupGraceMs: 0, + directTracker: { + isDirectMessage: async () => false, + }, + getRoomInfo: async () => ({ altAliases: [] }), + getMemberDisplayName: async (_roomId, userId) => + userId === "@alice:example.org" ? "Alice" : "sender", + }); + + await handler("!room:example.org", { + type: EventType.RoomMessage, + sender: "@user:example.org", + event_id: "$reply1", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "follow up", + "m.relates_to": { + rel_type: "m.thread", + event_id: "$root", + "m.in_reply_to": { event_id: "$root" }, + }, + "m.mentions": { room: true }, + }, + } as MatrixRawEvent); + + expect(finalizeInboundContext).toHaveBeenCalledWith( + expect.objectContaining({ + MessageThreadId: "$root", + ThreadStarterBody: "Matrix thread root $root from Alice:\nRoot topic", + }), + ); + expect(recordInboundSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:ops:main", + }), + ); + }); + it("enqueues system events for reactions on bot-authored messages", async () => { const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness(); diff --git a/extensions/matrix-js/src/matrix/monitor/handler.ts b/extensions/matrix-js/src/matrix/monitor/handler.ts index 26423ffd45d..9fbe8f7a83e 100644 --- a/extensions/matrix-js/src/matrix/monitor/handler.ts +++ b/extensions/matrix-js/src/matrix/monitor/handler.ts @@ -36,6 +36,7 @@ import { resolveMentions } from "./mentions.js"; import { handleInboundMatrixReaction } from "./reaction-events.js"; import { deliverMatrixReplies } from "./replies.js"; import { resolveMatrixRoomConfig } from "./rooms.js"; +import { createMatrixThreadContextResolver } from "./thread-context.js"; import { resolveMatrixThreadRootId, resolveMatrixThreadTarget } from "./threads.js"; import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js"; import { EventType, RelationType } from "./types.js"; @@ -108,6 +109,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam expiresAtMs: number; } | null = null; const pairingReplySentAtMsBySender = new Map(); + const resolveThreadContext = createMatrixThreadContextResolver({ + client, + getMemberDisplayName, + logVerboseMessage, + }); const readStoreAllowFrom = async (): Promise => { const now = Date.now(); @@ -523,6 +529,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam threadRootId, isThreadRoot: false, // Raw event payload does not carry explicit thread-root metadata. }); + const threadContext = threadRootId + ? await resolveThreadContext({ roomId, threadRootId }) + : undefined; const route = core.channel.routing.resolveAgentRoute({ cfg, @@ -575,6 +584,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam MessageSid: messageId, ReplyToId: threadTarget ? undefined : (replyToEventId ?? undefined), MessageThreadId: threadTarget, + ThreadStarterBody: threadContext?.threadStarterBody, Timestamp: eventTs ?? undefined, MediaPath: media?.path, MediaType: media?.contentType, diff --git a/extensions/matrix-js/src/matrix/monitor/thread-context.test.ts b/extensions/matrix-js/src/matrix/monitor/thread-context.test.ts new file mode 100644 index 00000000000..f09655bf4c7 --- /dev/null +++ b/extensions/matrix-js/src/matrix/monitor/thread-context.test.ts @@ -0,0 +1,106 @@ +import { describe, expect, it, vi } from "vitest"; +import { + createMatrixThreadContextResolver, + summarizeMatrixThreadStarterEvent, +} from "./thread-context.js"; +import type { MatrixRawEvent } from "./types.js"; + +describe("matrix thread context", () => { + it("summarizes thread starter events from body text", () => { + expect( + summarizeMatrixThreadStarterEvent({ + event_id: "$root", + sender: "@alice:example.org", + type: "m.room.message", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: " Thread starter body ", + }, + } as MatrixRawEvent), + ).toBe("Thread starter body"); + }); + + it("resolves and caches thread starter context", async () => { + const getEvent = vi.fn(async () => ({ + event_id: "$root", + sender: "@alice:example.org", + type: "m.room.message", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "Root topic", + }, + })); + const getMemberDisplayName = vi.fn(async () => "Alice"); + const resolveThreadContext = createMatrixThreadContextResolver({ + client: { + getEvent, + } as never, + getMemberDisplayName, + logVerboseMessage: () => {}, + }); + + await expect( + resolveThreadContext({ + roomId: "!room:example.org", + threadRootId: "$root", + }), + ).resolves.toEqual({ + threadStarterBody: "Matrix thread root $root from Alice:\nRoot topic", + }); + + await resolveThreadContext({ + roomId: "!room:example.org", + threadRootId: "$root", + }); + + expect(getEvent).toHaveBeenCalledTimes(1); + expect(getMemberDisplayName).toHaveBeenCalledTimes(1); + }); + + it("does not cache thread starter fetch failures", async () => { + const getEvent = vi + .fn() + .mockRejectedValueOnce(new Error("temporary failure")) + .mockResolvedValueOnce({ + event_id: "$root", + sender: "@alice:example.org", + type: "m.room.message", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "Recovered topic", + }, + }); + const getMemberDisplayName = vi.fn(async () => "Alice"); + const resolveThreadContext = createMatrixThreadContextResolver({ + client: { + getEvent, + } as never, + getMemberDisplayName, + logVerboseMessage: () => {}, + }); + + await expect( + resolveThreadContext({ + roomId: "!room:example.org", + threadRootId: "$root", + }), + ).resolves.toEqual({ + threadStarterBody: "Matrix thread root $root", + }); + + await expect( + resolveThreadContext({ + roomId: "!room:example.org", + threadRootId: "$root", + }), + ).resolves.toEqual({ + threadStarterBody: "Matrix thread root $root from Alice:\nRecovered topic", + }); + + expect(getEvent).toHaveBeenCalledTimes(2); + expect(getMemberDisplayName).toHaveBeenCalledTimes(1); + }); +}); diff --git a/extensions/matrix-js/src/matrix/monitor/thread-context.ts b/extensions/matrix-js/src/matrix/monitor/thread-context.ts new file mode 100644 index 00000000000..08a81a0d782 --- /dev/null +++ b/extensions/matrix-js/src/matrix/monitor/thread-context.ts @@ -0,0 +1,107 @@ +import type { MatrixClient } from "../sdk.js"; +import type { MatrixRawEvent } from "./types.js"; + +const MAX_TRACKED_THREAD_STARTERS = 256; +const MAX_THREAD_STARTER_BODY_LENGTH = 500; + +type MatrixThreadContext = { + threadStarterBody?: string; +}; + +function trimMaybeString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed || undefined; +} + +function truncateThreadStarterBody(value: string): string { + if (value.length <= MAX_THREAD_STARTER_BODY_LENGTH) { + return value; + } + return `${value.slice(0, MAX_THREAD_STARTER_BODY_LENGTH - 3)}...`; +} + +export function summarizeMatrixThreadStarterEvent(event: MatrixRawEvent): string | undefined { + const content = event.content as { body?: unknown; msgtype?: unknown }; + const body = trimMaybeString(content.body); + if (body) { + return truncateThreadStarterBody(body); + } + const msgtype = trimMaybeString(content.msgtype); + if (msgtype) { + return `Matrix ${msgtype} message`; + } + const eventType = trimMaybeString(event.type); + return eventType ? `Matrix ${eventType} event` : undefined; +} + +function formatMatrixThreadStarterBody(params: { + threadRootId: string; + senderName?: string; + senderId?: string; + summary?: string; +}): string { + const senderLabel = params.senderName ?? params.senderId ?? "unknown sender"; + const lines = [`Matrix thread root ${params.threadRootId} from ${senderLabel}:`]; + if (params.summary) { + lines.push(params.summary); + } + return lines.join("\n"); +} + +export function createMatrixThreadContextResolver(params: { + client: MatrixClient; + getMemberDisplayName: (roomId: string, userId: string) => Promise; + logVerboseMessage: (message: string) => void; +}) { + const cache = new Map(); + + const remember = (key: string, value: MatrixThreadContext): MatrixThreadContext => { + cache.set(key, value); + if (cache.size > MAX_TRACKED_THREAD_STARTERS) { + const oldest = cache.keys().next().value; + if (typeof oldest === "string") { + cache.delete(oldest); + } + } + return value; + }; + + return async (input: { roomId: string; threadRootId: string }): Promise => { + const cacheKey = `${input.roomId}:${input.threadRootId}`; + const cached = cache.get(cacheKey); + if (cached) { + return cached; + } + + const rootEvent = await params.client + .getEvent(input.roomId, input.threadRootId) + .catch((err) => { + params.logVerboseMessage( + `matrix: failed resolving thread root room=${input.roomId} id=${input.threadRootId}: ${String(err)}`, + ); + return null; + }); + if (!rootEvent) { + return { + threadStarterBody: `Matrix thread root ${input.threadRootId}`, + }; + } + + const rawEvent = rootEvent as MatrixRawEvent; + const senderId = trimMaybeString(rawEvent.sender); + const senderName = + senderId && + (await params.getMemberDisplayName(input.roomId, senderId).catch(() => undefined)); + return remember(cacheKey, { + threadStarterBody: formatMatrixThreadStarterBody({ + threadRootId: input.threadRootId, + senderId, + senderName, + summary: summarizeMatrixThreadStarterEvent(rawEvent), + }), + }); + }; +} diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index 3bf3c07ddc6..2dd857ac7f6 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -273,6 +273,12 @@ export type ChannelThreadingToolContext = { currentChannelProvider?: ChannelId; currentThreadTs?: string; currentMessageId?: string | number; + /** + * Optional direct-chat participant identifier for channels whose outbound + * tool targets can address either the backing conversation id or the direct + * participant id. + */ + currentDirectUserId?: string; replyToMode?: "off" | "first" | "all"; hasRepliedRef?: { value: boolean }; /** diff --git a/src/infra/outbound/message-action-params.ts b/src/infra/outbound/message-action-params.ts index 037a7806f16..bdcdd89af7c 100644 --- a/src/infra/outbound/message-action-params.ts +++ b/src/infra/outbound/message-action-params.ts @@ -71,6 +71,49 @@ export function resolveTelegramAutoThreadId(params: { return context.currentThreadTs; } +function normalizeMatrixThreadTarget(raw: string): string | undefined { + let normalized = raw.trim(); + if (!normalized) { + return undefined; + } + if (normalized.toLowerCase().startsWith("matrix:")) { + normalized = normalized.slice("matrix:".length).trim(); + } + normalized = normalized.replace(/^(room|channel|user):/i, "").trim(); + return normalized || undefined; +} + +function normalizeMatrixDirectUserTarget(raw: string): string | undefined { + const normalized = normalizeMatrixThreadTarget(raw); + return normalized?.startsWith("@") ? normalized : undefined; +} + +export function resolveMatrixAutoThreadId(params: { + to: string; + toolContext?: ChannelThreadingToolContext; +}): string | undefined { + const context = params.toolContext; + if (!context?.currentThreadTs || !context.currentChannelId) { + return undefined; + } + const target = normalizeMatrixThreadTarget(params.to); + const currentChannel = normalizeMatrixThreadTarget(context.currentChannelId); + if (!target || !currentChannel) { + return undefined; + } + if (target.toLowerCase() !== currentChannel.toLowerCase()) { + const directTarget = normalizeMatrixDirectUserTarget(params.to); + const currentDirectUserId = normalizeMatrixDirectUserTarget(context.currentDirectUserId ?? ""); + if (!directTarget || !currentDirectUserId) { + return undefined; + } + if (directTarget.toLowerCase() !== currentDirectUserId.toLowerCase()) { + return undefined; + } + } + return context.currentThreadTs; +} + function resolveAttachmentMaxBytes(params: { cfg: OpenClawConfig; channel: ChannelId; diff --git a/src/infra/outbound/message-action-runner.threading.test.ts b/src/infra/outbound/message-action-runner.threading.test.ts index b668aea14b5..14aba71a07d 100644 --- a/src/infra/outbound/message-action-runner.threading.test.ts +++ b/src/infra/outbound/message-action-runner.threading.test.ts @@ -1,4 +1,5 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { matrixPlugin } from "../../../extensions/matrix-js/src/channel.js"; import { slackPlugin } from "../../../extensions/slack/src/channel.js"; import { telegramPlugin } from "../../../extensions/telegram/src/channel.js"; import type { OpenClawConfig } from "../../config/config.js"; @@ -49,6 +50,15 @@ const telegramConfig = { }, } as OpenClawConfig; +const matrixConfig = { + channels: { + "matrix-js": { + homeserver: "https://matrix.example.org", + accessToken: "matrix-test", + }, + }, +} as OpenClawConfig; + async function runThreadingAction(params: { cfg: OpenClawConfig; actionParams: Record; @@ -80,23 +90,42 @@ const defaultTelegramToolContext = { currentThreadTs: "42", } as const; +const defaultMatrixToolContext = { + currentChannelId: "room:!room:example.org", + currentThreadTs: "$thread", +} as const; + +const defaultMatrixDmToolContext = { + currentChannelId: "room:!dm:example.org", + currentThreadTs: "$thread", + currentDirectUserId: "@alice:example.org", +} as const; + let createPluginRuntime: typeof import("../../plugins/runtime/index.js").createPluginRuntime; +let setMatrixRuntime: typeof import("../../../extensions/matrix-js/src/runtime.js").setMatrixRuntime; let setSlackRuntime: typeof import("../../../extensions/slack/src/runtime.js").setSlackRuntime; let setTelegramRuntime: typeof import("../../../extensions/telegram/src/runtime.js").setTelegramRuntime; describe("runMessageAction threading auto-injection", () => { beforeAll(async () => { ({ createPluginRuntime } = await import("../../plugins/runtime/index.js")); + ({ setMatrixRuntime } = await import("../../../extensions/matrix-js/src/runtime.js")); ({ setSlackRuntime } = await import("../../../extensions/slack/src/runtime.js")); ({ setTelegramRuntime } = await import("../../../extensions/telegram/src/runtime.js")); }); beforeEach(() => { const runtime = createPluginRuntime(); + setMatrixRuntime(runtime); setSlackRuntime(runtime); setTelegramRuntime(runtime); setActivePluginRegistry( createTestRegistry([ + { + pluginId: "matrix-js", + source: "test", + plugin: matrixPlugin, + }, { pluginId: "slack", source: "test", @@ -221,4 +250,96 @@ describe("runMessageAction threading auto-injection", () => { expect(call?.replyToId).toBe("777"); expect(call?.ctx?.params?.replyTo).toBe("777"); }); + + it.each([ + { + name: "injects threadId for bare room id", + target: "!room:example.org", + expectedThreadId: "$thread", + }, + { + name: "injects threadId for room target prefix", + target: "room:!room:example.org", + expectedThreadId: "$thread", + }, + { + name: "injects threadId for matrix room target", + target: "matrix:room:!room:example.org", + expectedThreadId: "$thread", + }, + { + name: "skips threadId when target room differs", + target: "!other:example.org", + expectedThreadId: undefined, + }, + ] as const)("matrix auto-threading: $name", async (testCase) => { + mockHandledSendAction(); + + const call = await runThreadingAction({ + cfg: matrixConfig, + actionParams: { + channel: "matrix-js", + target: testCase.target, + message: "hi", + }, + toolContext: defaultMatrixToolContext, + }); + + expect(call?.ctx?.params?.threadId).toBe(testCase.expectedThreadId); + if (testCase.expectedThreadId !== undefined) { + expect(call?.threadId).toBe(testCase.expectedThreadId); + } + }); + + it("uses explicit matrix threadId when provided", async () => { + mockHandledSendAction(); + + const call = await runThreadingAction({ + cfg: matrixConfig, + actionParams: { + channel: "matrix-js", + target: "room:!room:example.org", + message: "hi", + threadId: "$explicit", + }, + toolContext: defaultMatrixToolContext, + }); + + expect(call?.threadId).toBe("$explicit"); + expect(call?.ctx?.params?.threadId).toBe("$explicit"); + }); + + it("injects threadId for matching Matrix dm user target", async () => { + mockHandledSendAction(); + + const call = await runThreadingAction({ + cfg: matrixConfig, + actionParams: { + channel: "matrix-js", + target: "user:@alice:example.org", + message: "hi", + }, + toolContext: defaultMatrixDmToolContext, + }); + + expect(call?.threadId).toBe("$thread"); + expect(call?.ctx?.params?.threadId).toBe("$thread"); + }); + + it("skips threadId for different Matrix dm user target", async () => { + mockHandledSendAction(); + + const call = await runThreadingAction({ + cfg: matrixConfig, + actionParams: { + channel: "matrix-js", + target: "user:@bob:example.org", + message: "hi", + }, + toolContext: defaultMatrixDmToolContext, + }); + + expect(call?.threadId).toBeUndefined(); + expect(call?.ctx?.params?.threadId).toBeUndefined(); + }); }); diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index c703cd34d24..418143493f1 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -35,6 +35,7 @@ import { parseComponentsParam, readBooleanParam, resolveAttachmentMediaPolicy, + resolveMatrixAutoThreadId, resolveSlackAutoThreadId, resolveTelegramAutoThreadId, } from "./message-action-params.js"; @@ -78,7 +79,11 @@ function resolveAndApplyOutboundThreadId( ctx.channel === "telegram" && !threadId ? resolveTelegramAutoThreadId({ to: ctx.to, toolContext: ctx.toolContext }) : undefined; - const resolved = threadId ?? slackAutoThreadId ?? telegramAutoThreadId; + const matrixAutoThreadId = + ctx.channel === "matrix-js" && !threadId + ? resolveMatrixAutoThreadId({ to: ctx.to, toolContext: ctx.toolContext }) + : undefined; + const resolved = threadId ?? slackAutoThreadId ?? telegramAutoThreadId ?? matrixAutoThreadId; // Write auto-resolved threadId back into params so downstream dispatch // (plugin `readStringParam(params, "threadId")`) picks it up. if (resolved && !params.threadId) {