fix(matrix): centralize thread routing

This commit is contained in:
Gustavo Madeira Santana
2026-03-30 21:31:53 -04:00
parent 257b160ebe
commit 6ad4c7f347
12 changed files with 202 additions and 142 deletions

View File

@@ -26,7 +26,7 @@ Docs: https://docs.openclaw.ai
- Flows/tasks: route one-task ACP and subagent updates through a parent flow owner context, so detached work can emerge back through the intended parent thread/session instead of speaking only as a raw child task.
- Matrix/history: add optional room history context for Matrix group triggers via `channels.matrix.historyLimit`, with per-agent watermarks and retry-safe snapshots so failed trigger retries do not drift into newer room messages. (#57022) thanks @chain710.
- Diffs: skip unused viewer-versus-file SSR preload work so `diffs` view-only and file-only runs do less render work while keeping mode outputs aligned. (#57909) thanks @gumadeiras.
- Matrix/threads: add per-DM `threadReplies` overrides and keep thread session isolation aligned with the effective room or DM thread policy. (#57995) thanks @teconomix.
- Matrix/threads: add per-DM `threadReplies` overrides and keep thread session isolation aligned with the effective room or DM thread policy from the triggering message onward. (#57995) thanks @teconomix.
### Fixes

View File

@@ -504,7 +504,7 @@ Matrix supports native Matrix threads for both automatic replies and message-too
- `threadReplies: "off"` keeps replies top-level and keeps inbound threaded messages on the parent session.
- `threadReplies: "inbound"` replies inside a thread only when the inbound message was already in that thread.
- `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message.
- `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message and routes that conversation through the matching thread-scoped session from the first triggering message.
- `dm.threadReplies` overrides the top-level setting for DMs only. For example, you can keep room threads isolated while keeping DMs flat.
- Inbound threaded messages include the thread root message as extra agent context.
- Message-tool sends now auto-inherit the current Matrix thread when the target is the same room, or the same DM user target, unless an explicit `threadId` is provided.

View File

@@ -18,4 +18,16 @@ describe("MatrixConfigSchema SecretInput", () => {
});
expect(result.success).toBe(true);
});
it("accepts dm threadReplies overrides", () => {
const result = MatrixConfigSchema.safeParse({
homeserver: "https://matrix.example.org",
accessToken: "token",
dm: {
policy: "pairing",
threadReplies: "off",
},
});
expect(result.success).toBe(true);
});
});

View File

@@ -1,6 +1,5 @@
import {
AllowFromListSchema,
buildNestedDmConfigSchema,
DmPolicySchema,
GroupPolicySchema,
MarkdownConfigSchema,

View File

@@ -59,6 +59,35 @@ describe("createMatrixRoomMessageHandler inbound body formatting", () => {
);
});
it("starts the thread-scoped session from the triggering message when threadReplies is always", async () => {
const { handler, finalizeInboundContext, recordInboundSession } =
createMatrixHandlerTestHarness({
isDirectMessage: false,
threadReplies: "always",
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$thread-root",
body: "@room start thread",
mentions: { room: true },
}),
);
expect(finalizeInboundContext).toHaveBeenCalledWith(
expect.objectContaining({
MessageThreadId: "$thread-root",
ReplyToId: undefined,
}),
);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:main:thread:$thread-root",
}),
);
});
it("records formatted poll results for inbound poll response events", async () => {
const { handler, finalizeInboundContext, recordInboundSession } =
createMatrixHandlerTestHarness({

View File

@@ -1099,6 +1099,44 @@ describe("matrix monitor handler pairing account scope", () => {
);
});
it("routes thread-root reaction notifications to the thread session when threadReplies is always", async () => {
const { handler, enqueueSystemEvent } = createReactionHarness({
cfg: {
channels: {
matrix: {
threadReplies: "always",
},
},
},
isDirectMessage: false,
client: {
getEvent: async () =>
createMatrixTextMessageEvent({
eventId: "$root",
sender: "@bot:example.org",
body: "start thread",
}),
},
});
await handler(
"!room:example.org",
createMatrixReactionEvent({
eventId: "$reaction-root",
targetEventId: "$root",
key: "🧵",
}),
);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Matrix reaction added: 🧵 by sender on msg $root",
{
sessionKey: "agent:ops:main:thread:$root",
contextKey: "matrix:reaction:add:!room:example.org:$root:@user:example.org:🧵",
},
);
});
it("ignores reactions that do not target bot-authored messages", async () => {
const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness({
targetSender: "@other:example.org",

View File

@@ -51,11 +51,9 @@ import { resolveMatrixRoomConfig } from "./rooms.js";
import { resolveMatrixInboundRoute } from "./route.js";
import { createMatrixThreadContextResolver } from "./thread-context.js";
import {
resolveMatrixEffectiveThreadReplies,
resolveMatrixReplyToEventId,
resolveMatrixThreadSessionId,
resolveMatrixThreadRootId,
resolveMatrixThreadTarget,
resolveMatrixThreadRouting,
} from "./threads.js";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
import { EventType, RelationType } from "./types.js";
@@ -635,10 +633,12 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const _messageId = event.event_id ?? "";
const _threadRootId = resolveMatrixThreadRootId({ event, content });
const effectiveThreadReplies = resolveMatrixEffectiveThreadReplies({
const thread = resolveMatrixThreadRouting({
isDirectMessage,
threadReplies,
dmThreadReplies,
messageId: _messageId,
threadRootId: _threadRootId,
});
const {
route: _route,
@@ -650,9 +650,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
roomId,
senderId,
isDirectMessage,
messageId: _messageId,
threadRootId: _threadRootId,
effectiveThreadReplies,
threadId: thread.threadId,
eventTs: eventTs ?? undefined,
resolveAgentRoute: core.channel.routing.resolveAgentRoute,
});
@@ -861,7 +859,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
messageId: _messageId,
triggerSnapshot,
threadRootId: _threadRootId,
effectiveThreadReplies,
thread,
};
};
const ingressResult =
@@ -911,23 +909,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
messageId: _messageId,
triggerSnapshot,
threadRootId: _threadRootId,
effectiveThreadReplies,
thread,
} = resolvedIngressResult;
// Keep the per-room ingress gate focused on ordering-sensitive state updates.
// Prompt/session enrichment below can run concurrently after the history snapshot is fixed.
const replyToEventId = resolveMatrixReplyToEventId(event.content as RoomMessageEventContent);
const threadTarget = resolveMatrixThreadTarget({
threadReplies: effectiveThreadReplies,
messageId: _messageId,
threadRootId: _threadRootId,
isThreadRoot: false,
});
const threadSessionId = resolveMatrixThreadSessionId({
effectiveThreadReplies,
messageId: _messageId,
threadRootId: _threadRootId,
});
const threadTarget = thread.threadId;
const threadContext = _threadRootId
? await resolveThreadContext({ roomId, threadRootId: _threadRootId })
: undefined;
@@ -982,7 +970,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
Surface: "matrix" as const,
WasMentioned: isRoom ? wasMentioned : undefined,
MessageSid: _messageId,
ReplyToId: threadTarget || threadSessionId ? undefined : (replyToEventId ?? undefined),
ReplyToId: threadTarget ? undefined : (replyToEventId ?? undefined),
ReplyToBody: replyContext?.replyToBody,
ReplyToSender: replyContext?.replyToSender,
MessageThreadId: threadTarget,

View File

@@ -5,7 +5,7 @@ import { resolveMatrixAccountConfig } from "../accounts.js";
import { extractMatrixReactionAnnotation } from "../reaction-common.js";
import type { MatrixClient } from "../sdk.js";
import { resolveMatrixInboundRoute } from "./route.js";
import { resolveMatrixEffectiveThreadReplies, resolveMatrixThreadRootId } from "./threads.js";
import { resolveMatrixThreadRootId, resolveMatrixThreadRouting } from "./threads.js";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
export type MatrixReactionNotificationMode = "off" | "own";
@@ -77,10 +77,12 @@ export async function handleInboundMatrixReaction(params: {
cfg: params.cfg,
accountId: params.accountId,
});
const effectiveThreadReplies = resolveMatrixEffectiveThreadReplies({
const thread = resolveMatrixThreadRouting({
isDirectMessage: params.isDirectMessage,
threadReplies: accountConfig.threadReplies ?? "inbound",
dmThreadReplies: accountConfig.dm?.threadReplies,
messageId: reaction.eventId,
threadRootId,
});
const { route, runtimeBindingId } = resolveMatrixInboundRoute({
cfg: params.cfg,
@@ -88,9 +90,7 @@ export async function handleInboundMatrixReaction(params: {
roomId: params.roomId,
senderId: params.senderId,
isDirectMessage: params.isDirectMessage,
messageId: reaction.eventId,
threadRootId,
effectiveThreadReplies,
threadId: thread.threadId,
eventTs: params.event.origin_server_ts,
resolveAgentRoute: params.core.channel.routing.resolveAgentRoute,
});

View File

@@ -24,8 +24,6 @@ function resolveDmRoute(cfg: OpenClawConfig) {
roomId: "!dm:example.org",
senderId: "@alice:example.org",
isDirectMessage: true,
messageId: "$msg1",
effectiveThreadReplies: "inbound",
resolveAgentRoute,
});
}
@@ -197,16 +195,14 @@ describe("resolveMatrixInboundRoute thread-isolated sessions", () => {
);
});
it("scopes session key to thread when threadRootId is present and differs from messageId", () => {
it("scopes session key to thread when a thread id is provided", () => {
const { route } = resolveMatrixInboundRoute({
cfg: baseCfg as never,
accountId: "ops",
roomId: "!room:example.org",
senderId: "@alice:example.org",
isDirectMessage: false,
messageId: "$reply1",
threadRootId: "$thread-root",
effectiveThreadReplies: "inbound",
threadId: "$thread-root",
resolveAgentRoute,
});
@@ -214,68 +210,16 @@ describe("resolveMatrixInboundRoute thread-isolated sessions", () => {
expect(route.mainSessionKey).not.toContain(":thread:");
});
it("does not scope session key when threadRootId equals messageId", () => {
it("does not scope session key when thread id is absent", () => {
const { route } = resolveMatrixInboundRoute({
cfg: baseCfg as never,
accountId: "ops",
roomId: "!room:example.org",
senderId: "@alice:example.org",
isDirectMessage: false,
messageId: "$thread-root",
threadRootId: "$thread-root",
effectiveThreadReplies: "inbound",
resolveAgentRoute,
});
expect(route.sessionKey).not.toContain(":thread:");
});
it("does not scope session key when threadRootId is absent", () => {
const { route } = resolveMatrixInboundRoute({
cfg: baseCfg as never,
accountId: "ops",
roomId: "!room:example.org",
senderId: "@alice:example.org",
isDirectMessage: false,
messageId: "$msg1",
effectiveThreadReplies: "inbound",
resolveAgentRoute,
});
expect(route.sessionKey).not.toContain(":thread:");
});
it("keeps room sessions flat when threadReplies is off", () => {
const { route } = resolveMatrixInboundRoute({
cfg: baseCfg as never,
accountId: "ops",
roomId: "!room:example.org",
senderId: "@alice:example.org",
isDirectMessage: false,
messageId: "$reply1",
threadRootId: "$thread-root",
effectiveThreadReplies: "off",
resolveAgentRoute,
});
expect(route.sessionKey).not.toContain(":thread:");
expect(route.mainSessionKey).not.toContain(":thread:");
});
it("keeps DM sessions flat when dm.threadReplies overrides threading off", () => {
const { route } = resolveMatrixInboundRoute({
cfg: baseCfg as never,
accountId: "ops",
roomId: "!dm:example.org",
senderId: "@alice:example.org",
isDirectMessage: true,
messageId: "$reply1",
threadRootId: "$thread-root",
effectiveThreadReplies: "off",
resolveAgentRoute,
});
expect(route.sessionKey).not.toContain(":thread:");
expect(route.mainSessionKey).not.toContain(":thread:");
});
});

View File

@@ -6,7 +6,6 @@ import {
type PluginRuntime,
} from "../../runtime-api.js";
import type { CoreConfig } from "../../types.js";
import { resolveMatrixThreadSessionId } from "./threads.js";
type MatrixResolvedRoute = ReturnType<PluginRuntime["channel"]["routing"]["resolveAgentRoute"]>;
@@ -16,9 +15,7 @@ export function resolveMatrixInboundRoute(params: {
roomId: string;
senderId: string;
isDirectMessage: boolean;
messageId: string;
threadRootId?: string;
effectiveThreadReplies: "off" | "inbound" | "always";
threadId?: string;
eventTs?: number;
resolveAgentRoute: PluginRuntime["channel"]["routing"]["resolveAgentRoute"];
}): {
@@ -43,12 +40,8 @@ export function resolveMatrixInboundRoute(params: {
}
: undefined,
});
const bindingConversationId =
params.threadRootId && params.threadRootId !== params.messageId
? params.threadRootId
: params.roomId;
const bindingParentConversationId =
bindingConversationId === params.roomId ? undefined : params.roomId;
const bindingConversationId = params.threadId ?? params.roomId;
const bindingParentConversationId = params.threadId ? params.roomId : undefined;
const sessionBindingService = getSessionBindingService();
const runtimeBinding = sessionBindingService.resolveByConversation({
channel: "matrix",
@@ -97,15 +90,10 @@ export function resolveMatrixInboundRoute(params: {
: baseRoute;
// When no binding overrides the session key, isolate threads into their own sessions.
const threadId = resolveMatrixThreadSessionId({
effectiveThreadReplies: params.effectiveThreadReplies,
messageId: params.messageId,
threadRootId: params.threadRootId,
});
if (!configuredBinding && !configuredSessionKey && threadId) {
if (!configuredBinding && !configuredSessionKey && params.threadId) {
const threadKeys = resolveThreadSessionKeys({
baseSessionKey: effectiveRoute.sessionKey,
threadId,
threadId: params.threadId,
parentSessionKey: effectiveRoute.sessionKey,
});
return {

View File

@@ -0,0 +1,73 @@
import { describe, expect, it } from "vitest";
import { resolveMatrixThreadRouting } from "./threads.js";
describe("resolveMatrixThreadRouting", () => {
it("keeps sessions flat when threadReplies is off", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: false,
threadReplies: "off",
messageId: "$reply1",
threadRootId: "$root",
}),
).toEqual({
effectiveThreadReplies: "off",
threadId: undefined,
});
});
it("uses the inbound thread root when replies arrive inside an existing thread", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: false,
threadReplies: "inbound",
messageId: "$reply1",
threadRootId: "$root",
}),
).toEqual({
effectiveThreadReplies: "inbound",
threadId: "$root",
});
});
it("keeps top-level inbound messages flat when threadReplies is inbound", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: false,
threadReplies: "inbound",
messageId: "$root",
}),
).toEqual({
effectiveThreadReplies: "inbound",
threadId: undefined,
});
});
it("uses the triggering message as the thread id when threadReplies is always", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: false,
threadReplies: "always",
messageId: "$root",
}),
).toEqual({
effectiveThreadReplies: "always",
threadId: "$root",
});
});
it("lets dm.threadReplies override room threading behavior", () => {
expect(
resolveMatrixThreadRouting({
isDirectMessage: true,
threadReplies: "always",
dmThreadReplies: "off",
messageId: "$reply1",
threadRootId: "$root",
}),
).toEqual({
effectiveThreadReplies: "off",
threadId: undefined,
});
});
});

View File

@@ -1,7 +1,12 @@
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
import { RelationType } from "./types.js";
type MatrixThreadReplies = "off" | "inbound" | "always";
export type MatrixThreadReplies = "off" | "inbound" | "always";
export type MatrixThreadRouting = {
effectiveThreadReplies: MatrixThreadReplies;
threadId?: string;
};
function resolveMatrixRelatedReplyToEventId(relates: unknown): string | undefined {
if (!relates || typeof relates !== "object") {
@@ -19,50 +24,34 @@ function resolveMatrixRelatedReplyToEventId(relates: unknown): string | undefine
return undefined;
}
export function resolveMatrixEffectiveThreadReplies(params: {
export function resolveMatrixThreadRouting(params: {
isDirectMessage: boolean;
threadReplies: MatrixThreadReplies;
dmThreadReplies?: MatrixThreadReplies;
}): MatrixThreadReplies {
return params.isDirectMessage && params.dmThreadReplies !== undefined
? params.dmThreadReplies
: params.threadReplies;
}
export function resolveMatrixThreadSessionId(params: {
effectiveThreadReplies: MatrixThreadReplies;
messageId: string;
threadRootId?: string;
isThreadRoot?: boolean;
}): string | undefined {
if (params.effectiveThreadReplies === "off") {
return undefined;
}
}): MatrixThreadRouting {
const effectiveThreadReplies =
params.isDirectMessage && params.dmThreadReplies !== undefined
? params.dmThreadReplies
: params.threadReplies;
const messageId = params.messageId.trim();
const threadRootId = params.threadRootId?.trim();
const isThreadRoot = params.isThreadRoot === true;
return params.threadRootId && params.threadRootId !== params.messageId && !isThreadRoot
? params.threadRootId
: undefined;
}
const inboundThreadId =
threadRootId && threadRootId !== messageId && !isThreadRoot ? threadRootId : undefined;
const threadId =
effectiveThreadReplies === "off"
? undefined
: effectiveThreadReplies === "inbound"
? inboundThreadId
: (inboundThreadId ?? (messageId || undefined));
export function resolveMatrixThreadTarget(params: {
threadReplies: MatrixThreadReplies;
messageId: string;
threadRootId?: string;
isThreadRoot?: boolean;
}): string | undefined {
const { threadReplies, messageId, threadRootId } = params;
if (threadReplies === "off") {
return undefined;
}
const isThreadRoot = params.isThreadRoot === true;
const hasInboundThread = Boolean(threadRootId && threadRootId !== messageId && !isThreadRoot);
if (threadReplies === "inbound") {
return hasInboundThread ? threadRootId : undefined;
}
if (threadReplies === "always") {
return threadRootId ?? messageId;
}
return undefined;
return {
effectiveThreadReplies,
threadId,
};
}
export function resolveMatrixThreadRootId(params: {