feat(matrix): thread-isolated sessions and per-chat-type threadReplies (#57995)

Merged via squash.

Prepared head SHA: 9ed96dd063
Co-authored-by: teconomix <6959299+teconomix@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Teconomix
2026-03-31 04:45:32 +02:00
committed by GitHub
parent d859746862
commit 697dddbeb6
20 changed files with 564 additions and 67 deletions

View File

@@ -18,4 +18,16 @@ describe("MatrixConfigSchema SecretInput", () => {
});
expect(result.success).toBe(true);
});
it("accepts dm threadReplies overrides", () => {
const result = MatrixConfigSchema.safeParse({
homeserver: "https://matrix.example.org",
accessToken: "token",
dm: {
policy: "pairing",
threadReplies: "off",
},
});
expect(result.success).toBe(true);
});
});

View File

@@ -1,6 +1,5 @@
import {
AllowFromListSchema,
buildNestedDmConfigSchema,
DmPolicySchema,
GroupPolicySchema,
MarkdownConfigSchema,
@@ -84,7 +83,14 @@ export const MatrixConfigSchema = z.object({
autoJoin: z.enum(["always", "allowlist", "off"]).optional(),
autoJoinAllowlist: AllowFromListSchema,
groupAllowFrom: AllowFromListSchema,
dm: buildNestedDmConfigSchema(),
dm: z
.object({
enabled: z.boolean().optional(),
policy: DmPolicySchema.optional(),
allowFrom: AllowFromListSchema,
threadReplies: z.enum(["off", "inbound", "always"]).optional(),
})
.optional(),
groups: z.object({}).catchall(matrixRoomSchema).optional(),
rooms: z.object({}).catchall(matrixRoomSchema).optional(),
actions: matrixActionSchema,

View File

@@ -51,9 +51,39 @@ describe("createMatrixRoomMessageHandler inbound body formatting", () => {
ThreadStarterBody: "Matrix thread root $thread-root from Alice:\nRoot topic",
}),
);
// Thread messages get thread-scoped session keys (thread isolation feature).
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:main",
sessionKey: "agent:ops:main:thread:$thread-root",
}),
);
});
it("starts the thread-scoped session from the triggering message when threadReplies is always", async () => {
const { handler, finalizeInboundContext, recordInboundSession } =
createMatrixHandlerTestHarness({
isDirectMessage: false,
threadReplies: "always",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$thread-root",
body: "@room start thread",
mentions: { room: true },
}),
);
expect(finalizeInboundContext).toHaveBeenCalledWith(
expect.objectContaining({
MessageThreadId: "$thread-root",
ReplyToId: undefined,
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:main:thread:$thread-root",
}),
);
});

View File

@@ -30,6 +30,7 @@ type MatrixHandlerTestHarnessOptions = {
groupPolicy?: "open" | "allowlist" | "disabled";
replyToMode?: ReplyToMode;
threadReplies?: "off" | "inbound" | "always";
dmThreadReplies?: "off" | "inbound" | "always";
streaming?: "partial" | "off";
dmEnabled?: boolean;
dmPolicy?: "pairing" | "allowlist" | "open" | "disabled";
@@ -211,6 +212,7 @@ export function createMatrixHandlerTestHarness(
groupPolicy: options.groupPolicy ?? "open",
replyToMode: options.replyToMode ?? "off",
threadReplies: options.threadReplies ?? "inbound",
dmThreadReplies: options.dmThreadReplies,
streaming: options.streaming ?? "off",
dmEnabled: options.dmEnabled ?? true,
dmPolicy: options.dmPolicy ?? "open",

View File

@@ -71,6 +71,7 @@ function createReactionHarness(params?: {
targetSender?: string;
isDirectMessage?: boolean;
senderName?: string;
client?: NonNullable<Parameters<typeof createMatrixHandlerTestHarness>[0]>["client"];
}) {
return createMatrixHandlerTestHarness({
cfg: params?.cfg,
@@ -79,6 +80,7 @@ function createReactionHarness(params?: {
readAllowFromStore: vi.fn(async () => params?.storeAllowFrom ?? []),
client: {
getEvent: async () => ({ sender: params?.targetSender ?? "@bot:example.org" }),
...params?.client,
},
isDirectMessage: params?.isDirectMessage,
getMemberDisplayName: async () => params?.senderName ?? "sender",
@@ -626,6 +628,53 @@ describe("matrix monitor handler pairing account scope", () => {
ThreadStarterBody: "Matrix thread root $root from Alice:\nRoot topic",
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:main:thread:$root",
}),
);
});
it("keeps threaded DMs flat when dm threadReplies is off", async () => {
const { handler, finalizeInboundContext, recordInboundSession } =
createMatrixHandlerTestHarness({
threadReplies: "always",
dmThreadReplies: "off",
isDirectMessage: true,
client: {
getEvent: async (_roomId, eventId) =>
eventId === "$root"
? createMatrixTextMessageEvent({
eventId: "$root",
sender: "@alice:example.org",
body: "Root topic",
})
: ({ sender: "@bot:example.org" } as never),
},
getMemberDisplayName: async (_roomId, userId) =>
userId === "@alice:example.org" ? "Alice" : "sender",
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$reply1",
body: "follow up",
relatesTo: {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
}),
);
expect(finalizeInboundContext).toHaveBeenCalledWith(
expect.objectContaining({
MessageThreadId: undefined,
ReplyToId: "$root",
ThreadStarterBody: "Matrix thread root $root from Alice:\nRoot topic",
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:main",
@@ -1006,6 +1055,88 @@ describe("matrix monitor handler pairing account scope", () => {
);
});
it("keeps threaded DM reaction notifications on the flat session when dm threadReplies is off", async () => {
const { handler, enqueueSystemEvent } = createReactionHarness({
cfg: {
channels: {
matrix: {
threadReplies: "always",
dm: { threadReplies: "off" },
},
},
},
isDirectMessage: true,
client: {
getEvent: async () =>
createMatrixTextMessageEvent({
eventId: "$reply1",
sender: "@bot:example.org",
body: "follow up",
relatesTo: {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
}),
},
});
await handler(
"!dm:example.org",
createMatrixReactionEvent({
eventId: "$reaction-thread",
targetEventId: "$reply1",
key: "🎯",
}),
);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Matrix reaction added: 🎯 by sender on msg $reply1",
{
sessionKey: "agent:ops:main",
contextKey: "matrix:reaction:add:!dm:example.org:$reply1:@user:example.org:🎯",
},
);
});
it("routes thread-root reaction notifications to the thread session when threadReplies is always", async () => {
const { handler, enqueueSystemEvent } = createReactionHarness({
cfg: {
channels: {
matrix: {
threadReplies: "always",
},
},
},
isDirectMessage: false,
client: {
getEvent: async () =>
createMatrixTextMessageEvent({
eventId: "$root",
sender: "@bot:example.org",
body: "start thread",
}),
},
});
await handler(
"!room:example.org",
createMatrixReactionEvent({
eventId: "$reaction-root",
targetEventId: "$root",
key: "🧵",
}),
);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Matrix reaction added: 🧵 by sender on msg $root",
{
sessionKey: "agent:ops:main:thread:$root",
contextKey: "matrix:reaction:add:!room:example.org:$root:@user:example.org:🧵",
},
);
});
it("ignores reactions that do not target bot-authored messages", async () => {
const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness({
targetSender: "@other:example.org",

View File

@@ -53,7 +53,7 @@ import { createMatrixThreadContextResolver } from "./thread-context.js";
import {
resolveMatrixReplyToEventId,
resolveMatrixThreadRootId,
resolveMatrixThreadTarget,
resolveMatrixThreadRouting,
} from "./threads.js";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
import { EventType, RelationType } from "./types.js";
@@ -80,6 +80,8 @@ export type MatrixMonitorHandlerParams = {
groupPolicy: "open" | "allowlist" | "disabled";
replyToMode: ReplyToMode;
threadReplies: "off" | "inbound" | "always";
/** DM-specific threadReplies override. Falls back to threadReplies when absent. */
dmThreadReplies?: "off" | "inbound" | "always";
streaming: "partial" | "off";
dmEnabled: boolean;
dmPolicy: "open" | "pairing" | "allowlist" | "disabled";
@@ -196,6 +198,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
groupPolicy,
replyToMode,
threadReplies,
dmThreadReplies,
streaming,
dmEnabled,
dmPolicy,
@@ -630,6 +633,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const _messageId = event.event_id ?? "";
const _threadRootId = resolveMatrixThreadRootId({ event, content });
const thread = resolveMatrixThreadRouting({
isDirectMessage,
threadReplies,
dmThreadReplies,
messageId: _messageId,
threadRootId: _threadRootId,
});
const {
route: _route,
configuredBinding: _configuredBinding,
@@ -640,8 +650,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
roomId,
senderId,
isDirectMessage,
messageId: _messageId,
threadRootId: _threadRootId,
threadId: thread.threadId,
eventTs: eventTs ?? undefined,
resolveAgentRoute: core.channel.routing.resolveAgentRoute,
});
@@ -850,6 +859,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
messageId: _messageId,
triggerSnapshot,
threadRootId: _threadRootId,
thread,
};
};
const ingressResult =
@@ -899,17 +909,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
messageId: _messageId,
triggerSnapshot,
threadRootId: _threadRootId,
thread,
} = resolvedIngressResult;
// Keep the per-room ingress gate focused on ordering-sensitive state updates.
// Prompt/session enrichment below can run concurrently after the history snapshot is fixed.
const replyToEventId = resolveMatrixReplyToEventId(event.content as RoomMessageEventContent);
const threadTarget = resolveMatrixThreadTarget({
threadReplies,
messageId: _messageId,
threadRootId: _threadRootId,
isThreadRoot: false,
});
const threadTarget = thread.threadId;
const threadContext = _threadRootId
? await resolveThreadContext({ roomId, threadRootId: _threadRootId })
: undefined;

View File

@@ -186,6 +186,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
const groupPolicy = allowlistOnly && groupPolicyRaw === "open" ? "allowlist" : groupPolicyRaw;
const replyToMode = opts.replyToMode ?? accountConfig.replyToMode ?? "off";
const threadReplies = accountConfig.threadReplies ?? "inbound";
const dmThreadReplies = accountConfig.dm?.threadReplies;
const threadBindingIdleTimeoutMs = resolveThreadBindingIdleTimeoutMsForChannel({
cfg,
channel: "matrix",
@@ -244,6 +245,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
groupPolicy,
replyToMode,
threadReplies,
dmThreadReplies,
streaming,
dmEnabled,
dmPolicy,

View File

@@ -5,7 +5,7 @@ import { resolveMatrixAccountConfig } from "../accounts.js";
import { extractMatrixReactionAnnotation } from "../reaction-common.js";
import type { MatrixClient } from "../sdk.js";
import { resolveMatrixInboundRoute } from "./route.js";
import { resolveMatrixThreadRootId } from "./threads.js";
import { resolveMatrixThreadRootId, resolveMatrixThreadRouting } from "./threads.js";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
export type MatrixReactionNotificationMode = "off" | "own";
@@ -73,14 +73,24 @@ export async function handleInboundMatrixReaction(params: {
content: targetContent,
})
: undefined;
const accountConfig = resolveMatrixAccountConfig({
cfg: params.cfg,
accountId: params.accountId,
});
const thread = resolveMatrixThreadRouting({
isDirectMessage: params.isDirectMessage,
threadReplies: accountConfig.threadReplies ?? "inbound",
dmThreadReplies: accountConfig.dm?.threadReplies,
messageId: reaction.eventId,
threadRootId,
});
const { route, runtimeBindingId } = resolveMatrixInboundRoute({
cfg: params.cfg,
accountId: params.accountId,
roomId: params.roomId,
senderId: params.senderId,
isDirectMessage: params.isDirectMessage,
messageId: reaction.eventId,
threadRootId,
threadId: thread.threadId,
eventTs: params.event.origin_server_ts,
resolveAgentRoute: params.core.channel.routing.resolveAgentRoute,
});

View File

@@ -24,7 +24,6 @@ function resolveDmRoute(cfg: OpenClawConfig) {
roomId: "!dm:example.org",
senderId: "@alice:example.org",
isDirectMessage: true,
messageId: "$msg1",
resolveAgentRoute,
});
}
@@ -187,3 +186,54 @@ describe("resolveMatrixInboundRoute", () => {
expect(touch).not.toHaveBeenCalled();
});
});
describe("resolveMatrixInboundRoute thread-isolated sessions", () => {
beforeEach(() => {
sessionBindingTesting.resetSessionBindingAdaptersForTests();
setActivePluginRegistry(
createTestRegistry([{ pluginId: "matrix", source: "test", plugin: matrixPlugin }]),
);
});
it("scopes session key to thread when a thread id is provided", () => {
const { route } = resolveMatrixInboundRoute({
cfg: baseCfg as never,
accountId: "ops",
roomId: "!room:example.org",
senderId: "@alice:example.org",
isDirectMessage: false,
threadId: "$thread-root",
resolveAgentRoute,
});
expect(route.sessionKey).toContain(":thread:$thread-root");
expect(route.mainSessionKey).not.toContain(":thread:");
});
it("preserves mixed-case matrix thread ids in session keys", () => {
const { route } = resolveMatrixInboundRoute({
cfg: baseCfg as never,
accountId: "ops",
roomId: "!room:example.org",
senderId: "@alice:example.org",
isDirectMessage: false,
threadId: "$AbC123:example.org",
resolveAgentRoute,
});
expect(route.sessionKey).toContain(":thread:$AbC123:example.org");
});
it("does not scope session key when thread id is absent", () => {
const { route } = resolveMatrixInboundRoute({
cfg: baseCfg as never,
accountId: "ops",
roomId: "!room:example.org",
senderId: "@alice:example.org",
isDirectMessage: false,
resolveAgentRoute,
});
expect(route.sessionKey).not.toContain(":thread:");
});
});

View File

@@ -5,6 +5,7 @@ import {
type PluginRuntime,
} from "../../runtime-api.js";
import type { CoreConfig } from "../../types.js";
import { resolveMatrixThreadSessionKeys } from "./threads.js";
type MatrixResolvedRoute = ReturnType<PluginRuntime["channel"]["routing"]["resolveAgentRoute"]>;
@@ -14,8 +15,7 @@ export function resolveMatrixInboundRoute(params: {
roomId: string;
senderId: string;
isDirectMessage: boolean;
messageId: string;
threadRootId?: string;
threadId?: string;
eventTs?: number;
resolveAgentRoute: PluginRuntime["channel"]["routing"]["resolveAgentRoute"];
}): {
@@ -40,12 +40,8 @@ export function resolveMatrixInboundRoute(params: {
}
: undefined,
});
const bindingConversationId =
params.threadRootId && params.threadRootId !== params.messageId
? params.threadRootId
: params.roomId;
const bindingParentConversationId =
bindingConversationId === params.roomId ? undefined : params.roomId;
const bindingConversationId = params.threadId ?? params.roomId;
const bindingParentConversationId = params.threadId ? params.roomId : undefined;
const sessionBindingService = getSessionBindingService();
const runtimeBinding = sessionBindingService.resolveByConversation({
channel: "matrix",
@@ -80,19 +76,39 @@ export function resolveMatrixInboundRoute(params: {
: null;
const configuredSessionKey = configuredBinding?.record.targetSessionKey?.trim();
const effectiveRoute =
configuredBinding && configuredSessionKey
? {
...baseRoute,
sessionKey: configuredSessionKey,
agentId:
resolveAgentIdFromSessionKey(configuredSessionKey) ||
configuredBinding.spec.agentId ||
baseRoute.agentId,
matchedBy: "binding.channel" as const,
}
: baseRoute;
// When no binding overrides the session key, isolate threads into their own sessions.
if (!configuredBinding && !configuredSessionKey && params.threadId) {
const threadKeys = resolveMatrixThreadSessionKeys({
baseSessionKey: effectiveRoute.sessionKey,
threadId: params.threadId,
parentSessionKey: effectiveRoute.sessionKey,
});
return {
route: {
...effectiveRoute,
sessionKey: threadKeys.sessionKey,
mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey,
},
configuredBinding,
runtimeBindingId: null,
};
}
return {
route:
configuredBinding && configuredSessionKey
? {
...baseRoute,
sessionKey: configuredSessionKey,
agentId:
resolveAgentIdFromSessionKey(configuredSessionKey) ||
configuredBinding.spec.agentId ||
baseRoute.agentId,
matchedBy: "binding.channel",
}
: baseRoute,
route: effectiveRoute,
configuredBinding,
runtimeBindingId: null,
};

View File

@@ -0,0 +1,68 @@
import { describe, expect, it } from "vitest";
import { resolveMatrixThreadRouting } from "./threads.js";
describe("resolveMatrixThreadRouting", () => {
it("keeps sessions flat when threadReplies is off", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: false,
threadReplies: "off",
messageId: "$reply1",
threadRootId: "$root",
}),
).toEqual({
threadId: undefined,
});
});
it("uses the inbound thread root when replies arrive inside an existing thread", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: false,
threadReplies: "inbound",
messageId: "$reply1",
threadRootId: "$root",
}),
).toEqual({
threadId: "$root",
});
});
it("keeps top-level inbound messages flat when threadReplies is inbound", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: false,
threadReplies: "inbound",
messageId: "$root",
}),
).toEqual({
threadId: undefined,
});
});
it("uses the triggering message as the thread id when threadReplies is always", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: false,
threadReplies: "always",
messageId: "$root",
}),
).toEqual({
threadId: "$root",
});
});
it("lets dm.threadReplies override room threading behavior", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: true,
threadReplies: "always",
dmThreadReplies: "off",
messageId: "$reply1",
threadRootId: "$root",
}),
).toEqual({
threadId: undefined,
});
});
});

View File

@@ -1,6 +1,26 @@
import { resolveThreadSessionKeys } from "openclaw/plugin-sdk/routing";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
import { RelationType } from "./types.js";
export type MatrixThreadReplies = "off" | "inbound" | "always";
export type MatrixThreadRouting = {
threadId?: string;
};
export function resolveMatrixThreadSessionKeys(params: {
baseSessionKey: string;
threadId?: string | null;
parentSessionKey?: string;
useSuffix?: boolean;
}): { sessionKey: string; parentSessionKey?: string } {
return resolveThreadSessionKeys({
...params,
// Matrix event IDs are opaque and case-sensitive; keep the exact thread root.
normalizeThreadId: (threadId) => threadId,
});
}
function resolveMatrixRelatedReplyToEventId(relates: unknown): string | undefined {
if (!relates || typeof relates !== "object") {
return undefined;
@@ -17,25 +37,33 @@ function resolveMatrixRelatedReplyToEventId(relates: unknown): string | undefine
return undefined;
}
export function resolveMatrixThreadTarget(params: {
threadReplies: "off" | "inbound" | "always";
export function resolveMatrixThreadRouting(params: {
isDirectMessage: boolean;
threadReplies: MatrixThreadReplies;
dmThreadReplies?: MatrixThreadReplies;
messageId: string;
threadRootId?: string;
isThreadRoot?: boolean;
}): string | undefined {
const { threadReplies, messageId, threadRootId } = params;
if (threadReplies === "off") {
return undefined;
}
}): MatrixThreadRouting {
const effectiveThreadReplies =
params.isDirectMessage && params.dmThreadReplies !== undefined
? params.dmThreadReplies
: params.threadReplies;
const messageId = params.messageId.trim();
const threadRootId = params.threadRootId?.trim();
const isThreadRoot = params.isThreadRoot === true;
const hasInboundThread = Boolean(threadRootId && threadRootId !== messageId && !isThreadRoot);
if (threadReplies === "inbound") {
return hasInboundThread ? threadRootId : undefined;
}
if (threadReplies === "always") {
return threadRootId ?? messageId;
}
return undefined;
const inboundThreadId =
threadRootId && threadRootId !== messageId && !isThreadRoot ? threadRootId : undefined;
const threadId =
effectiveThreadReplies === "off"
? undefined
: effectiveThreadReplies === "inbound"
? inboundThreadId
: (inboundThreadId ?? (messageId || undefined));
return {
threadId,
};
}
export function resolveMatrixThreadRootId(params: {

View File

@@ -10,6 +10,8 @@ export type MatrixDmConfig = {
policy?: DmPolicy;
/** Allowlist for DM senders (matrix user IDs or "*"). */
allowFrom?: Array<string | number>;
/** Per-DM thread reply behavior override (off|inbound|always). Overrides top-level threadReplies for direct messages. */
threadReplies?: "off" | "inbound" | "always";
};
export type MatrixRoomConfig = {