matrix-js: improve thread context and auto-threading

This commit is contained in:
Gustavo Madeira Santana
2026-03-08 16:36:14 -04:00
parent 149729f56b
commit e4d041757d
11 changed files with 610 additions and 2 deletions

View File

@@ -225,6 +225,16 @@ Inbound SAS requests are auto-confirmed by the bot device, so once the user conf
in their Matrix client, verification completes without requiring a manual OpenClaw tool step.
Verification protocol/system notices are not forwarded to the agent chat pipeline, so they do not produce `NO_REPLY`.
## Threads
Matrix-js supports native Matrix threads for both automatic replies and message-tool sends.
- `threadReplies: "off"` keeps replies top-level.
- `threadReplies: "inbound"` replies inside a thread only when the inbound message was already in that thread.
- `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message.
- Inbound threaded messages include the thread root message as extra agent context.
- Message-tool sends now auto-inherit the current Matrix thread when the target is the same room, or the same DM user target, unless an explicit `threadId` is provided.
## Reactions
Matrix-js supports outbound reaction actions, inbound reaction notifications, and inbound ack reactions.

View File

@@ -104,6 +104,56 @@ describe("matrix directory", () => {
).toBe("off");
});
it("only exposes real Matrix thread ids in tool context", () => {
expect(
matrixPlugin.threading?.buildToolContext?.({
context: {
To: "room:!room:example.org",
ReplyToId: "$reply",
},
hasRepliedRef: { value: false },
}),
).toEqual({
currentChannelId: "room:!room:example.org",
currentThreadTs: undefined,
hasRepliedRef: { value: false },
});
expect(
matrixPlugin.threading?.buildToolContext?.({
context: {
To: "room:!room:example.org",
ReplyToId: "$reply",
MessageThreadId: "$thread",
},
hasRepliedRef: { value: true },
}),
).toEqual({
currentChannelId: "room:!room:example.org",
currentThreadTs: "$thread",
hasRepliedRef: { value: true },
});
});
it("exposes Matrix direct user id in dm tool context", () => {
expect(
matrixPlugin.threading?.buildToolContext?.({
context: {
From: "matrix:@alice:example.org",
To: "room:!dm:example.org",
ChatType: "direct",
MessageThreadId: "$thread",
},
hasRepliedRef: { value: false },
}),
).toEqual({
currentChannelId: "room:!dm:example.org",
currentThreadTs: "$thread",
currentDirectUserId: "@alice:example.org",
hasRepliedRef: { value: false },
});
});
it("resolves group mention policy from account config", () => {
const cfg = {
channels: {

View File

@@ -71,6 +71,26 @@ function normalizeMatrixMessagingTarget(raw: string): string | undefined {
return stripped || undefined;
}
function resolveMatrixDirectUserId(params: {
from?: string;
to?: string;
chatType?: string;
}): string | undefined {
if (params.chatType !== "direct") {
return undefined;
}
const from = params.from?.trim();
const to = params.to?.trim();
if (!from || !to || !/^room:/i.test(to)) {
return undefined;
}
const normalized = from
.replace(/^matrix:/i, "")
.replace(/^user:/i, "")
.trim();
return normalized.startsWith("@") ? normalized : undefined;
}
function resolveAvatarInput(input: ChannelSetupInput): string | undefined {
const avatarUrl = (input as ChannelSetupInput & { avatarUrl?: string }).avatarUrl;
const trimmed = avatarUrl?.trim();
@@ -181,7 +201,12 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
return {
currentChannelId: currentTarget?.trim() || undefined,
currentThreadTs:
context.MessageThreadId != null ? String(context.MessageThreadId) : context.ReplyToId,
context.MessageThreadId != null ? String(context.MessageThreadId) : undefined,
currentDirectUserId: resolveMatrixDirectUserId({
from: context.From,
to: context.To,
chatType: context.ChatType,
}),
hasRepliedRef,
};
},

View File

@@ -390,6 +390,131 @@ describe("matrix monitor handler pairing account scope", () => {
);
});
it("records thread starter context for inbound thread replies", async () => {
const recordInboundSession = vi.fn(async () => {});
const finalizeInboundContext = vi.fn((ctx) => ctx);
const handler = createMatrixRoomMessageHandler({
client: {
getUserId: async () => "@bot:example.org",
getEvent: async () => ({
event_id: "$root",
sender: "@alice:example.org",
type: EventType.RoomMessage,
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "Root topic",
},
}),
} as never,
core: {
channel: {
pairing: {
readAllowFromStore: async () => [] as string[],
upsertPairingRequest: async () => ({ code: "ABCDEFGH", created: false }),
},
commands: {
shouldHandleTextCommands: () => false,
},
text: {
hasControlCommand: () => false,
resolveMarkdownTableMode: () => "preserve",
},
routing: {
resolveAgentRoute: () => ({
agentId: "ops",
channel: "matrix-js",
accountId: "ops",
sessionKey: "agent:ops:main",
mainSessionKey: "agent:ops:main",
matchedBy: "binding.account",
}),
},
session: {
resolveStorePath: () => "/tmp/session-store",
readSessionUpdatedAt: () => undefined,
recordInboundSession,
},
reply: {
resolveEnvelopeFormatOptions: () => ({}),
formatAgentEnvelope: ({ body }: { body: string }) => body,
finalizeInboundContext,
createReplyDispatcherWithTyping: () => ({
dispatcher: {},
replyOptions: {},
markDispatchIdle: () => {},
}),
resolveHumanDelayConfig: () => undefined,
dispatchReplyFromConfig: async () => ({
queuedFinal: false,
counts: { final: 0, block: 0, tool: 0 },
}),
},
reactions: {
shouldAckReaction: () => false,
},
},
} as never,
cfg: {} as never,
accountId: "ops",
runtime: {
error: () => {},
} as never,
logger: {
info: () => {},
warn: () => {},
} as never,
logVerboseMessage: () => {},
allowFrom: [],
mentionRegexes: [],
groupPolicy: "open",
replyToMode: "off",
threadReplies: "inbound",
dmEnabled: true,
dmPolicy: "open",
textLimit: 8_000,
mediaMaxBytes: 10_000_000,
startupMs: 0,
startupGraceMs: 0,
directTracker: {
isDirectMessage: async () => false,
},
getRoomInfo: async () => ({ altAliases: [] }),
getMemberDisplayName: async (_roomId, userId) =>
userId === "@alice:example.org" ? "Alice" : "sender",
});
await handler("!room:example.org", {
type: EventType.RoomMessage,
sender: "@user:example.org",
event_id: "$reply1",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "follow up",
"m.relates_to": {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
"m.mentions": { room: true },
},
} as MatrixRawEvent);
expect(finalizeInboundContext).toHaveBeenCalledWith(
expect.objectContaining({
MessageThreadId: "$root",
ThreadStarterBody: "Matrix thread root $root from Alice:\nRoot topic",
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:main",
}),
);
});
it("enqueues system events for reactions on bot-authored messages", async () => {
const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness();

View File

@@ -36,6 +36,7 @@ import { resolveMentions } from "./mentions.js";
import { handleInboundMatrixReaction } from "./reaction-events.js";
import { deliverMatrixReplies } from "./replies.js";
import { resolveMatrixRoomConfig } from "./rooms.js";
import { createMatrixThreadContextResolver } from "./thread-context.js";
import { resolveMatrixThreadRootId, resolveMatrixThreadTarget } from "./threads.js";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
import { EventType, RelationType } from "./types.js";
@@ -108,6 +109,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
expiresAtMs: number;
} | null = null;
const pairingReplySentAtMsBySender = new Map<string, number>();
const resolveThreadContext = createMatrixThreadContextResolver({
client,
getMemberDisplayName,
logVerboseMessage,
});
const readStoreAllowFrom = async (): Promise<string[]> => {
const now = Date.now();
@@ -523,6 +529,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
threadRootId,
isThreadRoot: false, // Raw event payload does not carry explicit thread-root metadata.
});
const threadContext = threadRootId
? await resolveThreadContext({ roomId, threadRootId })
: undefined;
const route = core.channel.routing.resolveAgentRoute({
cfg,
@@ -575,6 +584,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
MessageSid: messageId,
ReplyToId: threadTarget ? undefined : (replyToEventId ?? undefined),
MessageThreadId: threadTarget,
ThreadStarterBody: threadContext?.threadStarterBody,
Timestamp: eventTs ?? undefined,
MediaPath: media?.path,
MediaType: media?.contentType,

View File

@@ -0,0 +1,106 @@
import { describe, expect, it, vi } from "vitest";
import {
createMatrixThreadContextResolver,
summarizeMatrixThreadStarterEvent,
} from "./thread-context.js";
import type { MatrixRawEvent } from "./types.js";
describe("matrix thread context", () => {
it("summarizes thread starter events from body text", () => {
expect(
summarizeMatrixThreadStarterEvent({
event_id: "$root",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: " Thread starter body ",
},
} as MatrixRawEvent),
).toBe("Thread starter body");
});
it("resolves and caches thread starter context", async () => {
const getEvent = vi.fn(async () => ({
event_id: "$root",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "Root topic",
},
}));
const getMemberDisplayName = vi.fn(async () => "Alice");
const resolveThreadContext = createMatrixThreadContextResolver({
client: {
getEvent,
} as never,
getMemberDisplayName,
logVerboseMessage: () => {},
});
await expect(
resolveThreadContext({
roomId: "!room:example.org",
threadRootId: "$root",
}),
).resolves.toEqual({
threadStarterBody: "Matrix thread root $root from Alice:\nRoot topic",
});
await resolveThreadContext({
roomId: "!room:example.org",
threadRootId: "$root",
});
expect(getEvent).toHaveBeenCalledTimes(1);
expect(getMemberDisplayName).toHaveBeenCalledTimes(1);
});
it("does not cache thread starter fetch failures", async () => {
const getEvent = vi
.fn()
.mockRejectedValueOnce(new Error("temporary failure"))
.mockResolvedValueOnce({
event_id: "$root",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "Recovered topic",
},
});
const getMemberDisplayName = vi.fn(async () => "Alice");
const resolveThreadContext = createMatrixThreadContextResolver({
client: {
getEvent,
} as never,
getMemberDisplayName,
logVerboseMessage: () => {},
});
await expect(
resolveThreadContext({
roomId: "!room:example.org",
threadRootId: "$root",
}),
).resolves.toEqual({
threadStarterBody: "Matrix thread root $root",
});
await expect(
resolveThreadContext({
roomId: "!room:example.org",
threadRootId: "$root",
}),
).resolves.toEqual({
threadStarterBody: "Matrix thread root $root from Alice:\nRecovered topic",
});
expect(getEvent).toHaveBeenCalledTimes(2);
expect(getMemberDisplayName).toHaveBeenCalledTimes(1);
});
});

View File

@@ -0,0 +1,107 @@
import type { MatrixClient } from "../sdk.js";
import type { MatrixRawEvent } from "./types.js";
const MAX_TRACKED_THREAD_STARTERS = 256;
const MAX_THREAD_STARTER_BODY_LENGTH = 500;
type MatrixThreadContext = {
threadStarterBody?: string;
};
function trimMaybeString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
function truncateThreadStarterBody(value: string): string {
if (value.length <= MAX_THREAD_STARTER_BODY_LENGTH) {
return value;
}
return `${value.slice(0, MAX_THREAD_STARTER_BODY_LENGTH - 3)}...`;
}
export function summarizeMatrixThreadStarterEvent(event: MatrixRawEvent): string | undefined {
const content = event.content as { body?: unknown; msgtype?: unknown };
const body = trimMaybeString(content.body);
if (body) {
return truncateThreadStarterBody(body);
}
const msgtype = trimMaybeString(content.msgtype);
if (msgtype) {
return `Matrix ${msgtype} message`;
}
const eventType = trimMaybeString(event.type);
return eventType ? `Matrix ${eventType} event` : undefined;
}
function formatMatrixThreadStarterBody(params: {
threadRootId: string;
senderName?: string;
senderId?: string;
summary?: string;
}): string {
const senderLabel = params.senderName ?? params.senderId ?? "unknown sender";
const lines = [`Matrix thread root ${params.threadRootId} from ${senderLabel}:`];
if (params.summary) {
lines.push(params.summary);
}
return lines.join("\n");
}
export function createMatrixThreadContextResolver(params: {
client: MatrixClient;
getMemberDisplayName: (roomId: string, userId: string) => Promise<string>;
logVerboseMessage: (message: string) => void;
}) {
const cache = new Map<string, MatrixThreadContext>();
const remember = (key: string, value: MatrixThreadContext): MatrixThreadContext => {
cache.set(key, value);
if (cache.size > MAX_TRACKED_THREAD_STARTERS) {
const oldest = cache.keys().next().value;
if (typeof oldest === "string") {
cache.delete(oldest);
}
}
return value;
};
return async (input: { roomId: string; threadRootId: string }): Promise<MatrixThreadContext> => {
const cacheKey = `${input.roomId}:${input.threadRootId}`;
const cached = cache.get(cacheKey);
if (cached) {
return cached;
}
const rootEvent = await params.client
.getEvent(input.roomId, input.threadRootId)
.catch((err) => {
params.logVerboseMessage(
`matrix: failed resolving thread root room=${input.roomId} id=${input.threadRootId}: ${String(err)}`,
);
return null;
});
if (!rootEvent) {
return {
threadStarterBody: `Matrix thread root ${input.threadRootId}`,
};
}
const rawEvent = rootEvent as MatrixRawEvent;
const senderId = trimMaybeString(rawEvent.sender);
const senderName =
senderId &&
(await params.getMemberDisplayName(input.roomId, senderId).catch(() => undefined));
return remember(cacheKey, {
threadStarterBody: formatMatrixThreadStarterBody({
threadRootId: input.threadRootId,
senderId,
senderName,
summary: summarizeMatrixThreadStarterEvent(rawEvent),
}),
});
};
}

View File

@@ -273,6 +273,12 @@ export type ChannelThreadingToolContext = {
currentChannelProvider?: ChannelId;
currentThreadTs?: string;
currentMessageId?: string | number;
/**
* Optional direct-chat participant identifier for channels whose outbound
* tool targets can address either the backing conversation id or the direct
* participant id.
*/
currentDirectUserId?: string;
replyToMode?: "off" | "first" | "all";
hasRepliedRef?: { value: boolean };
/**

View File

@@ -71,6 +71,49 @@ export function resolveTelegramAutoThreadId(params: {
return context.currentThreadTs;
}
function normalizeMatrixThreadTarget(raw: string): string | undefined {
let normalized = raw.trim();
if (!normalized) {
return undefined;
}
if (normalized.toLowerCase().startsWith("matrix:")) {
normalized = normalized.slice("matrix:".length).trim();
}
normalized = normalized.replace(/^(room|channel|user):/i, "").trim();
return normalized || undefined;
}
function normalizeMatrixDirectUserTarget(raw: string): string | undefined {
const normalized = normalizeMatrixThreadTarget(raw);
return normalized?.startsWith("@") ? normalized : undefined;
}
export function resolveMatrixAutoThreadId(params: {
to: string;
toolContext?: ChannelThreadingToolContext;
}): string | undefined {
const context = params.toolContext;
if (!context?.currentThreadTs || !context.currentChannelId) {
return undefined;
}
const target = normalizeMatrixThreadTarget(params.to);
const currentChannel = normalizeMatrixThreadTarget(context.currentChannelId);
if (!target || !currentChannel) {
return undefined;
}
if (target.toLowerCase() !== currentChannel.toLowerCase()) {
const directTarget = normalizeMatrixDirectUserTarget(params.to);
const currentDirectUserId = normalizeMatrixDirectUserTarget(context.currentDirectUserId ?? "");
if (!directTarget || !currentDirectUserId) {
return undefined;
}
if (directTarget.toLowerCase() !== currentDirectUserId.toLowerCase()) {
return undefined;
}
}
return context.currentThreadTs;
}
function resolveAttachmentMaxBytes(params: {
cfg: OpenClawConfig;
channel: ChannelId;

View File

@@ -1,4 +1,5 @@
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { matrixPlugin } from "../../../extensions/matrix-js/src/channel.js";
import { slackPlugin } from "../../../extensions/slack/src/channel.js";
import { telegramPlugin } from "../../../extensions/telegram/src/channel.js";
import type { OpenClawConfig } from "../../config/config.js";
@@ -49,6 +50,15 @@ const telegramConfig = {
},
} as OpenClawConfig;
const matrixConfig = {
channels: {
"matrix-js": {
homeserver: "https://matrix.example.org",
accessToken: "matrix-test",
},
},
} as OpenClawConfig;
async function runThreadingAction(params: {
cfg: OpenClawConfig;
actionParams: Record<string, unknown>;
@@ -80,23 +90,42 @@ const defaultTelegramToolContext = {
currentThreadTs: "42",
} as const;
const defaultMatrixToolContext = {
currentChannelId: "room:!room:example.org",
currentThreadTs: "$thread",
} as const;
const defaultMatrixDmToolContext = {
currentChannelId: "room:!dm:example.org",
currentThreadTs: "$thread",
currentDirectUserId: "@alice:example.org",
} as const;
let createPluginRuntime: typeof import("../../plugins/runtime/index.js").createPluginRuntime;
let setMatrixRuntime: typeof import("../../../extensions/matrix-js/src/runtime.js").setMatrixRuntime;
let setSlackRuntime: typeof import("../../../extensions/slack/src/runtime.js").setSlackRuntime;
let setTelegramRuntime: typeof import("../../../extensions/telegram/src/runtime.js").setTelegramRuntime;
describe("runMessageAction threading auto-injection", () => {
beforeAll(async () => {
({ createPluginRuntime } = await import("../../plugins/runtime/index.js"));
({ setMatrixRuntime } = await import("../../../extensions/matrix-js/src/runtime.js"));
({ setSlackRuntime } = await import("../../../extensions/slack/src/runtime.js"));
({ setTelegramRuntime } = await import("../../../extensions/telegram/src/runtime.js"));
});
beforeEach(() => {
const runtime = createPluginRuntime();
setMatrixRuntime(runtime);
setSlackRuntime(runtime);
setTelegramRuntime(runtime);
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix-js",
source: "test",
plugin: matrixPlugin,
},
{
pluginId: "slack",
source: "test",
@@ -221,4 +250,96 @@ describe("runMessageAction threading auto-injection", () => {
expect(call?.replyToId).toBe("777");
expect(call?.ctx?.params?.replyTo).toBe("777");
});
it.each([
{
name: "injects threadId for bare room id",
target: "!room:example.org",
expectedThreadId: "$thread",
},
{
name: "injects threadId for room target prefix",
target: "room:!room:example.org",
expectedThreadId: "$thread",
},
{
name: "injects threadId for matrix room target",
target: "matrix:room:!room:example.org",
expectedThreadId: "$thread",
},
{
name: "skips threadId when target room differs",
target: "!other:example.org",
expectedThreadId: undefined,
},
] as const)("matrix auto-threading: $name", async (testCase) => {
mockHandledSendAction();
const call = await runThreadingAction({
cfg: matrixConfig,
actionParams: {
channel: "matrix-js",
target: testCase.target,
message: "hi",
},
toolContext: defaultMatrixToolContext,
});
expect(call?.ctx?.params?.threadId).toBe(testCase.expectedThreadId);
if (testCase.expectedThreadId !== undefined) {
expect(call?.threadId).toBe(testCase.expectedThreadId);
}
});
it("uses explicit matrix threadId when provided", async () => {
mockHandledSendAction();
const call = await runThreadingAction({
cfg: matrixConfig,
actionParams: {
channel: "matrix-js",
target: "room:!room:example.org",
message: "hi",
threadId: "$explicit",
},
toolContext: defaultMatrixToolContext,
});
expect(call?.threadId).toBe("$explicit");
expect(call?.ctx?.params?.threadId).toBe("$explicit");
});
it("injects threadId for matching Matrix dm user target", async () => {
mockHandledSendAction();
const call = await runThreadingAction({
cfg: matrixConfig,
actionParams: {
channel: "matrix-js",
target: "user:@alice:example.org",
message: "hi",
},
toolContext: defaultMatrixDmToolContext,
});
expect(call?.threadId).toBe("$thread");
expect(call?.ctx?.params?.threadId).toBe("$thread");
});
it("skips threadId for different Matrix dm user target", async () => {
mockHandledSendAction();
const call = await runThreadingAction({
cfg: matrixConfig,
actionParams: {
channel: "matrix-js",
target: "user:@bob:example.org",
message: "hi",
},
toolContext: defaultMatrixDmToolContext,
});
expect(call?.threadId).toBeUndefined();
expect(call?.ctx?.params?.threadId).toBeUndefined();
});
});

View File

@@ -35,6 +35,7 @@ import {
parseComponentsParam,
readBooleanParam,
resolveAttachmentMediaPolicy,
resolveMatrixAutoThreadId,
resolveSlackAutoThreadId,
resolveTelegramAutoThreadId,
} from "./message-action-params.js";
@@ -78,7 +79,11 @@ function resolveAndApplyOutboundThreadId(
ctx.channel === "telegram" && !threadId
? resolveTelegramAutoThreadId({ to: ctx.to, toolContext: ctx.toolContext })
: undefined;
const resolved = threadId ?? slackAutoThreadId ?? telegramAutoThreadId;
const matrixAutoThreadId =
ctx.channel === "matrix-js" && !threadId
? resolveMatrixAutoThreadId({ to: ctx.to, toolContext: ctx.toolContext })
: undefined;
const resolved = threadId ?? slackAutoThreadId ?? telegramAutoThreadId ?? matrixAutoThreadId;
// Write auto-resolved threadId back into params so downstream dispatch
// (plugin `readStringParam(params, "threadId")`) picks it up.
if (resolved && !params.threadId) {