matrix-js: improve thread context and auto-threading

This commit is contained in:
Gustavo Madeira Santana
2026-03-08 16:36:14 -04:00
parent a670c21ab4
commit 3eb6c4c8ec
11 changed files with 610 additions and 2 deletions

View File

@@ -104,6 +104,56 @@ describe("matrix directory", () => {
).toBe("off");
});
it("only exposes real Matrix thread ids in tool context", () => {
expect(
matrixPlugin.threading?.buildToolContext?.({
context: {
To: "room:!room:example.org",
ReplyToId: "$reply",
},
hasRepliedRef: { value: false },
}),
).toEqual({
currentChannelId: "room:!room:example.org",
currentThreadTs: undefined,
hasRepliedRef: { value: false },
});
expect(
matrixPlugin.threading?.buildToolContext?.({
context: {
To: "room:!room:example.org",
ReplyToId: "$reply",
MessageThreadId: "$thread",
},
hasRepliedRef: { value: true },
}),
).toEqual({
currentChannelId: "room:!room:example.org",
currentThreadTs: "$thread",
hasRepliedRef: { value: true },
});
});
it("exposes Matrix direct user id in dm tool context", () => {
expect(
matrixPlugin.threading?.buildToolContext?.({
context: {
From: "matrix:@alice:example.org",
To: "room:!dm:example.org",
ChatType: "direct",
MessageThreadId: "$thread",
},
hasRepliedRef: { value: false },
}),
).toEqual({
currentChannelId: "room:!dm:example.org",
currentThreadTs: "$thread",
currentDirectUserId: "@alice:example.org",
hasRepliedRef: { value: false },
});
});
it("resolves group mention policy from account config", () => {
const cfg = {
channels: {

View File

@@ -71,6 +71,26 @@ function normalizeMatrixMessagingTarget(raw: string): string | undefined {
return stripped || undefined;
}
function resolveMatrixDirectUserId(params: {
from?: string;
to?: string;
chatType?: string;
}): string | undefined {
if (params.chatType !== "direct") {
return undefined;
}
const from = params.from?.trim();
const to = params.to?.trim();
if (!from || !to || !/^room:/i.test(to)) {
return undefined;
}
const normalized = from
.replace(/^matrix:/i, "")
.replace(/^user:/i, "")
.trim();
return normalized.startsWith("@") ? normalized : undefined;
}
function resolveAvatarInput(input: ChannelSetupInput): string | undefined {
const avatarUrl = (input as ChannelSetupInput & { avatarUrl?: string }).avatarUrl;
const trimmed = avatarUrl?.trim();
@@ -181,7 +201,12 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
return {
currentChannelId: currentTarget?.trim() || undefined,
currentThreadTs:
context.MessageThreadId != null ? String(context.MessageThreadId) : context.ReplyToId,
context.MessageThreadId != null ? String(context.MessageThreadId) : undefined,
currentDirectUserId: resolveMatrixDirectUserId({
from: context.From,
to: context.To,
chatType: context.ChatType,
}),
hasRepliedRef,
};
},

View File

@@ -390,6 +390,131 @@ describe("matrix monitor handler pairing account scope", () => {
);
});
it("records thread starter context for inbound thread replies", async () => {
const recordInboundSession = vi.fn(async () => {});
const finalizeInboundContext = vi.fn((ctx) => ctx);
const handler = createMatrixRoomMessageHandler({
client: {
getUserId: async () => "@bot:example.org",
getEvent: async () => ({
event_id: "$root",
sender: "@alice:example.org",
type: EventType.RoomMessage,
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "Root topic",
},
}),
} as never,
core: {
channel: {
pairing: {
readAllowFromStore: async () => [] as string[],
upsertPairingRequest: async () => ({ code: "ABCDEFGH", created: false }),
},
commands: {
shouldHandleTextCommands: () => false,
},
text: {
hasControlCommand: () => false,
resolveMarkdownTableMode: () => "preserve",
},
routing: {
resolveAgentRoute: () => ({
agentId: "ops",
channel: "matrix-js",
accountId: "ops",
sessionKey: "agent:ops:main",
mainSessionKey: "agent:ops:main",
matchedBy: "binding.account",
}),
},
session: {
resolveStorePath: () => "/tmp/session-store",
readSessionUpdatedAt: () => undefined,
recordInboundSession,
},
reply: {
resolveEnvelopeFormatOptions: () => ({}),
formatAgentEnvelope: ({ body }: { body: string }) => body,
finalizeInboundContext,
createReplyDispatcherWithTyping: () => ({
dispatcher: {},
replyOptions: {},
markDispatchIdle: () => {},
}),
resolveHumanDelayConfig: () => undefined,
dispatchReplyFromConfig: async () => ({
queuedFinal: false,
counts: { final: 0, block: 0, tool: 0 },
}),
},
reactions: {
shouldAckReaction: () => false,
},
},
} as never,
cfg: {} as never,
accountId: "ops",
runtime: {
error: () => {},
} as never,
logger: {
info: () => {},
warn: () => {},
} as never,
logVerboseMessage: () => {},
allowFrom: [],
mentionRegexes: [],
groupPolicy: "open",
replyToMode: "off",
threadReplies: "inbound",
dmEnabled: true,
dmPolicy: "open",
textLimit: 8_000,
mediaMaxBytes: 10_000_000,
startupMs: 0,
startupGraceMs: 0,
directTracker: {
isDirectMessage: async () => false,
},
getRoomInfo: async () => ({ altAliases: [] }),
getMemberDisplayName: async (_roomId, userId) =>
userId === "@alice:example.org" ? "Alice" : "sender",
});
await handler("!room:example.org", {
type: EventType.RoomMessage,
sender: "@user:example.org",
event_id: "$reply1",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "follow up",
"m.relates_to": {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
"m.mentions": { room: true },
},
} as MatrixRawEvent);
expect(finalizeInboundContext).toHaveBeenCalledWith(
expect.objectContaining({
MessageThreadId: "$root",
ThreadStarterBody: "Matrix thread root $root from Alice:\nRoot topic",
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:main",
}),
);
});
it("enqueues system events for reactions on bot-authored messages", async () => {
const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness();

View File

@@ -36,6 +36,7 @@ import { resolveMentions } from "./mentions.js";
import { handleInboundMatrixReaction } from "./reaction-events.js";
import { deliverMatrixReplies } from "./replies.js";
import { resolveMatrixRoomConfig } from "./rooms.js";
import { createMatrixThreadContextResolver } from "./thread-context.js";
import { resolveMatrixThreadRootId, resolveMatrixThreadTarget } from "./threads.js";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
import { EventType, RelationType } from "./types.js";
@@ -108,6 +109,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
expiresAtMs: number;
} | null = null;
const pairingReplySentAtMsBySender = new Map<string, number>();
const resolveThreadContext = createMatrixThreadContextResolver({
client,
getMemberDisplayName,
logVerboseMessage,
});
const readStoreAllowFrom = async (): Promise<string[]> => {
const now = Date.now();
@@ -523,6 +529,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
threadRootId,
isThreadRoot: false, // Raw event payload does not carry explicit thread-root metadata.
});
const threadContext = threadRootId
? await resolveThreadContext({ roomId, threadRootId })
: undefined;
const route = core.channel.routing.resolveAgentRoute({
cfg,
@@ -575,6 +584,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
MessageSid: messageId,
ReplyToId: threadTarget ? undefined : (replyToEventId ?? undefined),
MessageThreadId: threadTarget,
ThreadStarterBody: threadContext?.threadStarterBody,
Timestamp: eventTs ?? undefined,
MediaPath: media?.path,
MediaType: media?.contentType,

View File

@@ -0,0 +1,106 @@
import { describe, expect, it, vi } from "vitest";
import {
createMatrixThreadContextResolver,
summarizeMatrixThreadStarterEvent,
} from "./thread-context.js";
import type { MatrixRawEvent } from "./types.js";
describe("matrix thread context", () => {
it("summarizes thread starter events from body text", () => {
expect(
summarizeMatrixThreadStarterEvent({
event_id: "$root",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: " Thread starter body ",
},
} as MatrixRawEvent),
).toBe("Thread starter body");
});
it("resolves and caches thread starter context", async () => {
const getEvent = vi.fn(async () => ({
event_id: "$root",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "Root topic",
},
}));
const getMemberDisplayName = vi.fn(async () => "Alice");
const resolveThreadContext = createMatrixThreadContextResolver({
client: {
getEvent,
} as never,
getMemberDisplayName,
logVerboseMessage: () => {},
});
await expect(
resolveThreadContext({
roomId: "!room:example.org",
threadRootId: "$root",
}),
).resolves.toEqual({
threadStarterBody: "Matrix thread root $root from Alice:\nRoot topic",
});
await resolveThreadContext({
roomId: "!room:example.org",
threadRootId: "$root",
});
expect(getEvent).toHaveBeenCalledTimes(1);
expect(getMemberDisplayName).toHaveBeenCalledTimes(1);
});
it("does not cache thread starter fetch failures", async () => {
const getEvent = vi
.fn()
.mockRejectedValueOnce(new Error("temporary failure"))
.mockResolvedValueOnce({
event_id: "$root",
sender: "@alice:example.org",
type: "m.room.message",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "Recovered topic",
},
});
const getMemberDisplayName = vi.fn(async () => "Alice");
const resolveThreadContext = createMatrixThreadContextResolver({
client: {
getEvent,
} as never,
getMemberDisplayName,
logVerboseMessage: () => {},
});
await expect(
resolveThreadContext({
roomId: "!room:example.org",
threadRootId: "$root",
}),
).resolves.toEqual({
threadStarterBody: "Matrix thread root $root",
});
await expect(
resolveThreadContext({
roomId: "!room:example.org",
threadRootId: "$root",
}),
).resolves.toEqual({
threadStarterBody: "Matrix thread root $root from Alice:\nRecovered topic",
});
expect(getEvent).toHaveBeenCalledTimes(2);
expect(getMemberDisplayName).toHaveBeenCalledTimes(1);
});
});

View File

@@ -0,0 +1,107 @@
import type { MatrixClient } from "../sdk.js";
import type { MatrixRawEvent } from "./types.js";
const MAX_TRACKED_THREAD_STARTERS = 256;
const MAX_THREAD_STARTER_BODY_LENGTH = 500;
type MatrixThreadContext = {
threadStarterBody?: string;
};
function trimMaybeString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
function truncateThreadStarterBody(value: string): string {
if (value.length <= MAX_THREAD_STARTER_BODY_LENGTH) {
return value;
}
return `${value.slice(0, MAX_THREAD_STARTER_BODY_LENGTH - 3)}...`;
}
export function summarizeMatrixThreadStarterEvent(event: MatrixRawEvent): string | undefined {
const content = event.content as { body?: unknown; msgtype?: unknown };
const body = trimMaybeString(content.body);
if (body) {
return truncateThreadStarterBody(body);
}
const msgtype = trimMaybeString(content.msgtype);
if (msgtype) {
return `Matrix ${msgtype} message`;
}
const eventType = trimMaybeString(event.type);
return eventType ? `Matrix ${eventType} event` : undefined;
}
function formatMatrixThreadStarterBody(params: {
threadRootId: string;
senderName?: string;
senderId?: string;
summary?: string;
}): string {
const senderLabel = params.senderName ?? params.senderId ?? "unknown sender";
const lines = [`Matrix thread root ${params.threadRootId} from ${senderLabel}:`];
if (params.summary) {
lines.push(params.summary);
}
return lines.join("\n");
}
export function createMatrixThreadContextResolver(params: {
client: MatrixClient;
getMemberDisplayName: (roomId: string, userId: string) => Promise<string>;
logVerboseMessage: (message: string) => void;
}) {
const cache = new Map<string, MatrixThreadContext>();
const remember = (key: string, value: MatrixThreadContext): MatrixThreadContext => {
cache.set(key, value);
if (cache.size > MAX_TRACKED_THREAD_STARTERS) {
const oldest = cache.keys().next().value;
if (typeof oldest === "string") {
cache.delete(oldest);
}
}
return value;
};
return async (input: { roomId: string; threadRootId: string }): Promise<MatrixThreadContext> => {
const cacheKey = `${input.roomId}:${input.threadRootId}`;
const cached = cache.get(cacheKey);
if (cached) {
return cached;
}
const rootEvent = await params.client
.getEvent(input.roomId, input.threadRootId)
.catch((err) => {
params.logVerboseMessage(
`matrix: failed resolving thread root room=${input.roomId} id=${input.threadRootId}: ${String(err)}`,
);
return null;
});
if (!rootEvent) {
return {
threadStarterBody: `Matrix thread root ${input.threadRootId}`,
};
}
const rawEvent = rootEvent as MatrixRawEvent;
const senderId = trimMaybeString(rawEvent.sender);
const senderName =
senderId &&
(await params.getMemberDisplayName(input.roomId, senderId).catch(() => undefined));
return remember(cacheKey, {
threadStarterBody: formatMatrixThreadStarterBody({
threadRootId: input.threadRootId,
senderId,
senderName,
summary: summarizeMatrixThreadStarterEvent(rawEvent),
}),
});
};
}