mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-14 10:41:23 +00:00
fix(matrix): align DM room session routing
This commit is contained in:
@@ -200,6 +200,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Agents/video generation: accept `agents.defaults.videoGenerationModel` in strict config validation and `openclaw config set/get`, so gateways using `video_generate` no longer fail to boot after enabling a video model.
|
||||
- Discord/image generation: persist volatile workspace-generated media into durable outbound media before final reply delivery so generated image replies stop failing with missing local workspace paths.
|
||||
- Matrix: move legacy top-level `avatarUrl` into the default account during multi-account promotion and keep env-backed account setup avatar config persisted. (#61437) Thanks @gumadeiras.
|
||||
- Matrix/DM sessions: add `channels.matrix.dm.sessionScope`, shared-session collision notices, and aligned outbound session reuse so separate Matrix DM rooms can keep distinct context when configured. (#61373) Thanks @gumadeiras.
|
||||
|
||||
## 2026.4.2
|
||||
|
||||
|
||||
@@ -152,6 +152,7 @@ This is a practical baseline config with DM pairing, room allowlist, and E2EE en
|
||||
|
||||
dm: {
|
||||
policy: "pairing",
|
||||
sessionScope: "per-room",
|
||||
threadReplies: "off",
|
||||
},
|
||||
|
||||
@@ -522,12 +523,15 @@ The repair flow does not delete old rooms automatically. It only picks the healt
|
||||
|
||||
Matrix supports native Matrix threads for both automatic replies and message-tool sends.
|
||||
|
||||
- `dm.sessionScope: "per-user"` (default) keeps Matrix DM routing sender-scoped, so multiple DM rooms can share one session when they resolve to the same peer.
|
||||
- `dm.sessionScope: "per-room"` isolates each Matrix DM room into its own session key while still using normal DM auth and allowlist checks.
|
||||
- `threadReplies: "off"` keeps replies top-level and keeps inbound threaded messages on the parent session.
|
||||
- `threadReplies: "inbound"` replies inside a thread only when the inbound message was already in that thread.
|
||||
- `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message and routes that conversation through the matching thread-scoped session from the first triggering message.
|
||||
- `dm.threadReplies` overrides the top-level setting for DMs only. For example, you can keep room threads isolated while keeping DMs flat.
|
||||
- Inbound threaded messages include the thread root message as extra agent context.
|
||||
- Message-tool sends now auto-inherit the current Matrix thread when the target is the same room, or the same DM user target, unless an explicit `threadId` is provided.
|
||||
- When OpenClaw sees a Matrix DM room collide with another DM room on the same shared Matrix DM session, it posts a one-time `m.notice` in that room with the `/focus` escape hatch when thread bindings are enabled and the `dm.sessionScope` hint.
|
||||
- Runtime thread bindings are supported for Matrix. `/focus`, `/unfocus`, `/agents`, `/session idle`, `/session max-age`, and thread-bound `/acp spawn` now work in Matrix rooms and DMs.
|
||||
- Top-level Matrix room/DM `/focus` creates a new Matrix thread and binds it to the target session when `threadBindings.spawnSubagentSessions=true`.
|
||||
- Running `/focus` or `/acp spawn --thread here` inside an existing Matrix thread binds that current thread instead.
|
||||
@@ -842,8 +846,9 @@ Live directory lookup uses the logged-in Matrix account:
|
||||
- `mediaMaxMb`: media size cap in MB for Matrix media handling. It applies to outbound sends and inbound media processing.
|
||||
- `autoJoin`: invite auto-join policy (`always`, `allowlist`, `off`). Default: `off`.
|
||||
- `autoJoinAllowlist`: rooms/aliases allowed when `autoJoin` is `allowlist`. Alias entries are resolved to room IDs during invite handling; OpenClaw does not trust alias state claimed by the invited room.
|
||||
- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`, `threadReplies`).
|
||||
- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`, `sessionScope`, `threadReplies`).
|
||||
- `dm.allowFrom` entries should be full Matrix user IDs unless you already resolved them through live directory lookup.
|
||||
- `dm.sessionScope`: `per-user` (default) or `per-room`. Use `per-room` when you want each Matrix DM room to keep separate context even if the peer is the same.
|
||||
- `dm.threadReplies`: DM-only thread policy override (`off`, `inbound`, `always`). It overrides the top-level `threadReplies` setting for both reply placement and session isolation in DMs.
|
||||
- `execApprovals`: Matrix-native exec approval delivery (`enabled`, `approvers`, `target`, `agentFilter`, `sessionFilter`).
|
||||
- `execApprovals.approvers`: Matrix user IDs allowed to approve exec requests. Optional when `dm.allowFrom` already identifies the approvers.
|
||||
|
||||
@@ -655,6 +655,7 @@ Matrix is extension-backed and configured under `channels.matrix`.
|
||||
- `sessionFilter`: optional session key patterns (substring or regex).
|
||||
- `target`: where to send approval prompts. `"dm"` (default), `"channel"` (originating room), or `"both"`.
|
||||
- Per-account overrides: `channels.matrix.accounts.<id>.execApprovals`.
|
||||
- `channels.matrix.dm.sessionScope` controls how Matrix DMs group into sessions: `per-user` (default) shares by routed peer, while `per-room` isolates each DM room.
|
||||
- Matrix status probes and live directory lookups use the same proxy policy as runtime traffic.
|
||||
- Full Matrix configuration, targeting rules, and setup examples are documented in [Matrix](/channels/matrix).
|
||||
|
||||
|
||||
@@ -31,6 +31,18 @@ describe("MatrixConfigSchema SecretInput", () => {
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it("accepts dm sessionScope overrides", () => {
|
||||
const result = MatrixConfigSchema.safeParse({
|
||||
homeserver: "https://matrix.example.org",
|
||||
accessToken: "token",
|
||||
dm: {
|
||||
policy: "pairing",
|
||||
sessionScope: "per-room",
|
||||
},
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it("accepts room-level account assignments", () => {
|
||||
const result = MatrixConfigSchema.safeParse({
|
||||
homeserver: "https://matrix.example.org",
|
||||
|
||||
@@ -104,6 +104,7 @@ export const MatrixConfigSchema = z.object({
|
||||
autoJoinAllowlist: AllowFromListSchema,
|
||||
groupAllowFrom: AllowFromListSchema,
|
||||
dm: buildNestedDmConfigSchema({
|
||||
sessionScope: z.enum(["per-user", "per-room"]).optional(),
|
||||
threadReplies: z.enum(["off", "inbound", "always"]).optional(),
|
||||
}),
|
||||
execApprovals: matrixExecApprovalsSchema,
|
||||
|
||||
@@ -31,6 +31,7 @@ type MatrixHandlerTestHarnessOptions = {
|
||||
replyToMode?: ReplyToMode;
|
||||
threadReplies?: "off" | "inbound" | "always";
|
||||
dmThreadReplies?: "off" | "inbound" | "always";
|
||||
dmSessionScope?: "per-user" | "per-room";
|
||||
streaming?: "partial" | "off";
|
||||
blockStreamingEnabled?: boolean;
|
||||
dmEnabled?: boolean;
|
||||
@@ -214,6 +215,7 @@ export function createMatrixHandlerTestHarness(
|
||||
replyToMode: options.replyToMode ?? "off",
|
||||
threadReplies: options.threadReplies ?? "inbound",
|
||||
dmThreadReplies: options.dmThreadReplies,
|
||||
dmSessionScope: options.dmSessionScope,
|
||||
streaming: options.streaming ?? "off",
|
||||
blockStreamingEnabled: options.blockStreamingEnabled ?? false,
|
||||
dmEnabled: options.dmEnabled ?? true,
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import {
|
||||
__testing as sessionBindingTesting,
|
||||
registerSessionBindingAdapter,
|
||||
@@ -682,6 +685,114 @@ describe("matrix monitor handler pairing account scope", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("posts a one-time notice when another Matrix DM room already owns the shared DM session", async () => {
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-shared-notice-"));
|
||||
const storePath = path.join(tempDir, "sessions.json");
|
||||
fs.writeFileSync(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
"agent:ops:main": {
|
||||
sessionId: "sess-main",
|
||||
updatedAt: Date.now(),
|
||||
deliveryContext: {
|
||||
channel: "matrix",
|
||||
to: "room:!other:example.org",
|
||||
accountId: "ops",
|
||||
},
|
||||
},
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
const sendNotice = vi.fn(async () => "$notice");
|
||||
|
||||
try {
|
||||
const { handler } = createMatrixHandlerTestHarness({
|
||||
isDirectMessage: true,
|
||||
resolveStorePath: () => storePath,
|
||||
client: {
|
||||
sendMessage: sendNotice,
|
||||
},
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!dm:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$dm1",
|
||||
body: "follow up",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(sendNotice).toHaveBeenCalledWith(
|
||||
"!dm:example.org",
|
||||
expect.objectContaining({
|
||||
msgtype: "m.notice",
|
||||
body: expect.stringContaining("channels.matrix.dm.sessionScope"),
|
||||
}),
|
||||
);
|
||||
|
||||
await handler(
|
||||
"!dm:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$dm2",
|
||||
body: "again",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(sendNotice).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
fs.rmSync(tempDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("skips the shared-session notice when Matrix DMs are isolated per room", async () => {
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-room-scope-"));
|
||||
const storePath = path.join(tempDir, "sessions.json");
|
||||
fs.writeFileSync(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
"agent:ops:main": {
|
||||
sessionId: "sess-main",
|
||||
updatedAt: Date.now(),
|
||||
deliveryContext: {
|
||||
channel: "matrix",
|
||||
to: "room:!other:example.org",
|
||||
accountId: "ops",
|
||||
},
|
||||
},
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
const sendNotice = vi.fn(async () => "$notice");
|
||||
|
||||
try {
|
||||
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
|
||||
isDirectMessage: true,
|
||||
dmSessionScope: "per-room",
|
||||
resolveStorePath: () => storePath,
|
||||
client: {
|
||||
sendMessage: sendNotice,
|
||||
},
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!dm:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$dm1",
|
||||
body: "follow up",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(sendNotice).not.toHaveBeenCalled();
|
||||
expect(recordInboundSession).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:ops:matrix:channel:!dm:example.org",
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
fs.rmSync(tempDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("uses stable room ids instead of room-declared aliases in group context", async () => {
|
||||
const { handler, finalizeInboundContext } = createMatrixHandlerTestHarness({
|
||||
isDirectMessage: false,
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
import { resolveControlCommandGate } from "openclaw/plugin-sdk/command-auth";
|
||||
import { resolveChannelContextVisibilityMode } from "openclaw/plugin-sdk/config-runtime";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveChannelContextVisibilityMode,
|
||||
resolveSessionStoreEntry,
|
||||
} from "openclaw/plugin-sdk/config-runtime";
|
||||
import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runtime";
|
||||
import { evaluateSupplementalContextVisibility } from "openclaw/plugin-sdk/security-runtime";
|
||||
import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js";
|
||||
@@ -68,6 +72,7 @@ import { isMatrixVerificationRoomMessage } from "./verification-utils.js";
|
||||
const ALLOW_FROM_STORE_CACHE_TTL_MS = 30_000;
|
||||
const PAIRING_REPLY_COOLDOWN_MS = 5 * 60_000;
|
||||
const MAX_TRACKED_PAIRING_REPLY_SENDERS = 512;
|
||||
const MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES = 512;
|
||||
type MatrixAllowBotsMode = "off" | "mentions" | "all";
|
||||
|
||||
export type MatrixMonitorHandlerParams = {
|
||||
@@ -88,6 +93,8 @@ export type MatrixMonitorHandlerParams = {
|
||||
threadReplies: "off" | "inbound" | "always";
|
||||
/** DM-specific threadReplies override. Falls back to threadReplies when absent. */
|
||||
dmThreadReplies?: "off" | "inbound" | "always";
|
||||
/** DM session grouping behavior. */
|
||||
dmSessionScope?: "per-user" | "per-room";
|
||||
streaming: "partial" | "off";
|
||||
blockStreamingEnabled: boolean;
|
||||
dmEnabled: boolean;
|
||||
@@ -163,6 +170,103 @@ function resolveMatrixInboundBodyText(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function trimMaybeString(value: unknown): string | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = value.trim();
|
||||
return trimmed.length > 0 ? trimmed : undefined;
|
||||
}
|
||||
|
||||
function rememberTrackedRoom(set: Set<string>, roomId: string): void {
|
||||
set.add(roomId);
|
||||
if (set.size > MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES) {
|
||||
const oldest = set.keys().next().value;
|
||||
if (typeof oldest === "string") {
|
||||
set.delete(oldest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function extractMatrixRoomIdFromTarget(value: unknown): string | undefined {
|
||||
const trimmed = trimMaybeString(value);
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
return trimmed.startsWith("room:") ? trimMaybeString(trimmed.slice("room:".length)) : undefined;
|
||||
}
|
||||
|
||||
function resolveMatrixSharedDmContextNotice(params: {
|
||||
storePath: string;
|
||||
sessionKey: string;
|
||||
roomId: string;
|
||||
accountId: string;
|
||||
dmSessionScope?: "per-user" | "per-room";
|
||||
sentRooms: Set<string>;
|
||||
logVerboseMessage: (message: string) => void;
|
||||
}): string | null {
|
||||
if ((params.dmSessionScope ?? "per-user") === "per-room") {
|
||||
return null;
|
||||
}
|
||||
if (params.sentRooms.has(params.roomId)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const store = loadSessionStore(params.storePath);
|
||||
const existing = resolveSessionStoreEntry({
|
||||
store,
|
||||
sessionKey: params.sessionKey,
|
||||
}).existing as
|
||||
| {
|
||||
deliveryContext?: { channel?: unknown; to?: unknown; accountId?: unknown };
|
||||
origin?: { provider?: unknown; to?: unknown; accountId?: unknown };
|
||||
lastChannel?: unknown;
|
||||
lastTo?: unknown;
|
||||
lastAccountId?: unknown;
|
||||
}
|
||||
| undefined;
|
||||
if (!existing) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const priorChannel =
|
||||
trimMaybeString(existing.deliveryContext?.channel) ??
|
||||
trimMaybeString(existing.lastChannel) ??
|
||||
trimMaybeString(existing.origin?.provider);
|
||||
if (priorChannel && priorChannel !== "matrix") {
|
||||
return null;
|
||||
}
|
||||
|
||||
const priorAccountId =
|
||||
trimMaybeString(existing.deliveryContext?.accountId) ??
|
||||
trimMaybeString(existing.lastAccountId) ??
|
||||
trimMaybeString(existing.origin?.accountId);
|
||||
if (priorAccountId && priorAccountId !== params.accountId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const priorRoomId =
|
||||
extractMatrixRoomIdFromTarget(existing.deliveryContext?.to) ??
|
||||
extractMatrixRoomIdFromTarget(existing.lastTo) ??
|
||||
extractMatrixRoomIdFromTarget(existing.origin?.to);
|
||||
if (!priorRoomId || priorRoomId === params.roomId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return [
|
||||
"This Matrix DM is sharing a session with another Matrix DM room.",
|
||||
"Use /focus here for a one-off isolated thread session when thread bindings are enabled, or set",
|
||||
"channels.matrix.dm.sessionScope to per-room to isolate each Matrix DM room.",
|
||||
].join(" ");
|
||||
} catch (err) {
|
||||
params.logVerboseMessage(
|
||||
`matrix: failed checking shared DM session notice room=${params.roomId} (${String(err)})`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function resolveMatrixPendingHistoryText(params: {
|
||||
mentionPrecheckText: string;
|
||||
content: RoomMessageEventContent;
|
||||
@@ -214,6 +318,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
replyToMode,
|
||||
threadReplies,
|
||||
dmThreadReplies,
|
||||
dmSessionScope,
|
||||
streaming,
|
||||
blockStreamingEnabled,
|
||||
dmEnabled,
|
||||
@@ -252,6 +357,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
});
|
||||
const roomHistoryTracker = createRoomHistoryTracker();
|
||||
const roomIngressTails = new Map<string, Promise<void>>();
|
||||
const sharedDmContextNoticeRooms = new Set<string>();
|
||||
|
||||
const readStoreAllowFrom = async (): Promise<string[]> => {
|
||||
const now = Date.now();
|
||||
@@ -672,6 +778,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
roomId,
|
||||
senderId,
|
||||
isDirectMessage,
|
||||
dmSessionScope,
|
||||
threadId: thread.threadId,
|
||||
eventTs: eventTs ?? undefined,
|
||||
resolveAgentRoute: core.channel.routing.resolveAgentRoute,
|
||||
@@ -1023,6 +1130,17 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
storePath,
|
||||
sessionKey: _route.sessionKey,
|
||||
});
|
||||
const sharedDmContextNotice = isDirectMessage
|
||||
? resolveMatrixSharedDmContextNotice({
|
||||
storePath,
|
||||
sessionKey: _route.sessionKey,
|
||||
roomId,
|
||||
accountId: _route.accountId,
|
||||
dmSessionScope,
|
||||
sentRooms: sharedDmContextNoticeRooms,
|
||||
logVerboseMessage,
|
||||
})
|
||||
: null;
|
||||
const body = core.channel.reply.formatAgentEnvelope({
|
||||
channel: "Matrix",
|
||||
from: envelopeFrom,
|
||||
@@ -1090,6 +1208,20 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
},
|
||||
});
|
||||
|
||||
if (sharedDmContextNotice) {
|
||||
rememberTrackedRoom(sharedDmContextNoticeRooms, roomId);
|
||||
client
|
||||
.sendMessage(roomId, {
|
||||
msgtype: "m.notice",
|
||||
body: sharedDmContextNotice,
|
||||
})
|
||||
.catch((err) => {
|
||||
logVerboseMessage(
|
||||
`matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n");
|
||||
logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`);
|
||||
|
||||
|
||||
@@ -207,6 +207,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
const dmEnabled = dmConfig?.enabled ?? true;
|
||||
const dmPolicyRaw = dmConfig?.policy ?? "pairing";
|
||||
const dmPolicy = allowlistOnly && dmPolicyRaw !== "disabled" ? "allowlist" : dmPolicyRaw;
|
||||
const dmSessionScope = dmConfig?.sessionScope ?? "per-user";
|
||||
const textLimit = core.channel.text.resolveTextChunkLimit(cfg, "matrix", effectiveAccountId);
|
||||
const globalGroupChatHistoryLimit = (
|
||||
cfg.messages as { groupChat?: { historyLimit?: number } } | undefined
|
||||
@@ -271,6 +272,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
replyToMode,
|
||||
threadReplies,
|
||||
dmThreadReplies,
|
||||
dmSessionScope,
|
||||
streaming,
|
||||
blockStreamingEnabled,
|
||||
dmEnabled,
|
||||
|
||||
@@ -169,6 +169,7 @@ export async function handleInboundMatrixReaction(params: {
|
||||
roomId: params.roomId,
|
||||
senderId: params.senderId,
|
||||
isDirectMessage: params.isDirectMessage,
|
||||
dmSessionScope: accountConfig.dm?.sessionScope ?? "per-user",
|
||||
threadId: thread.threadId,
|
||||
eventTs: params.event.origin_server_ts,
|
||||
resolveAgentRoute: params.core.channel.routing.resolveAgentRoute,
|
||||
|
||||
@@ -17,13 +17,19 @@ const baseCfg = {
|
||||
},
|
||||
} satisfies OpenClawConfig;
|
||||
|
||||
function resolveDmRoute(cfg: OpenClawConfig) {
|
||||
function resolveDmRoute(
|
||||
cfg: OpenClawConfig,
|
||||
opts: {
|
||||
dmSessionScope?: "per-user" | "per-room";
|
||||
} = {},
|
||||
) {
|
||||
return resolveMatrixInboundRoute({
|
||||
cfg,
|
||||
accountId: "ops",
|
||||
roomId: "!dm:example.org",
|
||||
senderId: "@alice:example.org",
|
||||
isDirectMessage: true,
|
||||
dmSessionScope: opts.dmSessionScope,
|
||||
resolveAgentRoute,
|
||||
});
|
||||
}
|
||||
@@ -97,6 +103,33 @@ describe("resolveMatrixInboundRoute", () => {
|
||||
expect(route.sessionKey).toBe("agent:room-agent:main");
|
||||
});
|
||||
|
||||
it("can isolate Matrix DMs per room without changing agent selection", () => {
|
||||
const cfg = {
|
||||
...baseCfg,
|
||||
bindings: [
|
||||
{
|
||||
agentId: "sender-agent",
|
||||
match: {
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
peer: { kind: "direct", id: "@alice:example.org" },
|
||||
},
|
||||
},
|
||||
],
|
||||
} satisfies OpenClawConfig;
|
||||
|
||||
const { route, configuredBinding } = resolveDmRoute(cfg, {
|
||||
dmSessionScope: "per-room",
|
||||
});
|
||||
|
||||
expect(configuredBinding).toBeNull();
|
||||
expect(route.agentId).toBe("sender-agent");
|
||||
expect(route.matchedBy).toBe("binding.peer");
|
||||
expect(route.sessionKey).toBe("agent:sender-agent:matrix:channel:!dm:example.org");
|
||||
expect(route.mainSessionKey).toBe("agent:sender-agent:main");
|
||||
expect(route.lastRoutePolicy).toBe("session");
|
||||
});
|
||||
|
||||
it("lets configured ACP room bindings override DM parent-peer routing", () => {
|
||||
const cfg = {
|
||||
...baseCfg,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing";
|
||||
import { buildAgentSessionKey, deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing";
|
||||
import {
|
||||
getSessionBindingService,
|
||||
resolveAgentIdFromSessionKey,
|
||||
@@ -10,12 +10,34 @@ import { resolveMatrixThreadSessionKeys } from "./threads.js";
|
||||
|
||||
type MatrixResolvedRoute = ReturnType<PluginRuntime["channel"]["routing"]["resolveAgentRoute"]>;
|
||||
|
||||
function resolveMatrixDmSessionKey(params: {
|
||||
accountId: string;
|
||||
agentId: string;
|
||||
roomId: string;
|
||||
dmSessionScope?: "per-user" | "per-room";
|
||||
fallbackSessionKey: string;
|
||||
}): string {
|
||||
if (params.dmSessionScope !== "per-room") {
|
||||
return params.fallbackSessionKey;
|
||||
}
|
||||
return buildAgentSessionKey({
|
||||
agentId: params.agentId,
|
||||
channel: "matrix",
|
||||
accountId: params.accountId,
|
||||
peer: {
|
||||
kind: "channel",
|
||||
id: params.roomId,
|
||||
},
|
||||
}).toLowerCase();
|
||||
}
|
||||
|
||||
export function resolveMatrixInboundRoute(params: {
|
||||
cfg: CoreConfig;
|
||||
accountId: string;
|
||||
roomId: string;
|
||||
senderId: string;
|
||||
isDirectMessage: boolean;
|
||||
dmSessionScope?: "per-user" | "per-room";
|
||||
threadId?: string;
|
||||
eventTs?: number;
|
||||
resolveAgentRoute: PluginRuntime["channel"]["routing"]["resolveAgentRoute"];
|
||||
@@ -98,21 +120,39 @@ export function resolveMatrixInboundRoute(params: {
|
||||
}
|
||||
: baseRoute;
|
||||
|
||||
const dmSessionKey = params.isDirectMessage
|
||||
? resolveMatrixDmSessionKey({
|
||||
accountId: params.accountId,
|
||||
agentId: effectiveRoute.agentId,
|
||||
roomId: params.roomId,
|
||||
dmSessionScope: params.dmSessionScope,
|
||||
fallbackSessionKey: effectiveRoute.sessionKey,
|
||||
})
|
||||
: effectiveRoute.sessionKey;
|
||||
const routeWithDmScope =
|
||||
dmSessionKey === effectiveRoute.sessionKey
|
||||
? effectiveRoute
|
||||
: {
|
||||
...effectiveRoute,
|
||||
sessionKey: dmSessionKey,
|
||||
lastRoutePolicy: "session" as const,
|
||||
};
|
||||
|
||||
// When no binding overrides the session key, isolate threads into their own sessions.
|
||||
if (!configuredBinding && !configuredSessionKey && params.threadId) {
|
||||
const threadKeys = resolveMatrixThreadSessionKeys({
|
||||
baseSessionKey: effectiveRoute.sessionKey,
|
||||
baseSessionKey: routeWithDmScope.sessionKey,
|
||||
threadId: params.threadId,
|
||||
parentSessionKey: effectiveRoute.sessionKey,
|
||||
parentSessionKey: routeWithDmScope.sessionKey,
|
||||
});
|
||||
return {
|
||||
route: {
|
||||
...effectiveRoute,
|
||||
...routeWithDmScope,
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey,
|
||||
mainSessionKey: threadKeys.parentSessionKey ?? routeWithDmScope.sessionKey,
|
||||
lastRoutePolicy: deriveLastRoutePolicy({
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey,
|
||||
mainSessionKey: threadKeys.parentSessionKey ?? routeWithDmScope.sessionKey,
|
||||
}),
|
||||
},
|
||||
configuredBinding,
|
||||
@@ -121,7 +161,7 @@ export function resolveMatrixInboundRoute(params: {
|
||||
}
|
||||
|
||||
return {
|
||||
route: effectiveRoute,
|
||||
route: routeWithDmScope,
|
||||
configuredBinding,
|
||||
runtimeBindingId: null,
|
||||
};
|
||||
|
||||
135
extensions/matrix/src/session-route.test.ts
Normal file
135
extensions/matrix/src/session-route.test.ts
Normal file
@@ -0,0 +1,135 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import type { OpenClawConfig } from "./runtime-api.js";
|
||||
import { resolveMatrixOutboundSessionRoute } from "./session-route.js";
|
||||
|
||||
const tempDirs = new Set<string>();
|
||||
|
||||
function createTempStore(entries: Record<string, unknown>): string {
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-session-route-"));
|
||||
tempDirs.add(tempDir);
|
||||
const storePath = path.join(tempDir, "sessions.json");
|
||||
fs.writeFileSync(storePath, JSON.stringify(entries), "utf8");
|
||||
return storePath;
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
for (const tempDir of tempDirs) {
|
||||
fs.rmSync(tempDir, { recursive: true, force: true });
|
||||
}
|
||||
tempDirs.clear();
|
||||
});
|
||||
|
||||
describe("resolveMatrixOutboundSessionRoute", () => {
|
||||
it("reuses the current DM room session for same-user sends when Matrix DMs are per-room", () => {
|
||||
const storePath = createTempStore({
|
||||
"agent:main:matrix:channel:!dm:example.org": {
|
||||
sessionId: "sess-1",
|
||||
updatedAt: Date.now(),
|
||||
chatType: "direct",
|
||||
origin: {
|
||||
chatType: "direct",
|
||||
from: "matrix:@alice:example.org",
|
||||
to: "room:!dm:example.org",
|
||||
accountId: "ops",
|
||||
},
|
||||
deliveryContext: {
|
||||
channel: "matrix",
|
||||
to: "room:!dm:example.org",
|
||||
accountId: "ops",
|
||||
},
|
||||
},
|
||||
});
|
||||
const cfg = {
|
||||
session: {
|
||||
store: storePath,
|
||||
},
|
||||
channels: {
|
||||
matrix: {
|
||||
dm: {
|
||||
sessionScope: "per-room",
|
||||
},
|
||||
},
|
||||
},
|
||||
} satisfies OpenClawConfig;
|
||||
|
||||
const route = resolveMatrixOutboundSessionRoute({
|
||||
cfg,
|
||||
agentId: "main",
|
||||
accountId: "ops",
|
||||
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
|
||||
target: "@alice:example.org",
|
||||
resolvedTarget: {
|
||||
to: "@alice:example.org",
|
||||
kind: "user",
|
||||
source: "normalized",
|
||||
},
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:matrix:channel:!dm:example.org",
|
||||
baseSessionKey: "agent:main:matrix:channel:!dm:example.org",
|
||||
peer: { kind: "channel", id: "!dm:example.org" },
|
||||
chatType: "direct",
|
||||
from: "matrix:@alice:example.org",
|
||||
to: "room:!dm:example.org",
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to user-scoped routing when the current session is for another DM peer", () => {
|
||||
const storePath = createTempStore({
|
||||
"agent:main:matrix:channel:!dm:example.org": {
|
||||
sessionId: "sess-1",
|
||||
updatedAt: Date.now(),
|
||||
chatType: "direct",
|
||||
origin: {
|
||||
chatType: "direct",
|
||||
from: "matrix:@bob:example.org",
|
||||
to: "room:!dm:example.org",
|
||||
accountId: "ops",
|
||||
},
|
||||
deliveryContext: {
|
||||
channel: "matrix",
|
||||
to: "room:!dm:example.org",
|
||||
accountId: "ops",
|
||||
},
|
||||
},
|
||||
});
|
||||
const cfg = {
|
||||
session: {
|
||||
store: storePath,
|
||||
},
|
||||
channels: {
|
||||
matrix: {
|
||||
dm: {
|
||||
sessionScope: "per-room",
|
||||
},
|
||||
},
|
||||
},
|
||||
} satisfies OpenClawConfig;
|
||||
|
||||
const route = resolveMatrixOutboundSessionRoute({
|
||||
cfg,
|
||||
agentId: "main",
|
||||
accountId: "ops",
|
||||
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
|
||||
target: "@alice:example.org",
|
||||
resolvedTarget: {
|
||||
to: "@alice:example.org",
|
||||
kind: "user",
|
||||
source: "normalized",
|
||||
},
|
||||
});
|
||||
|
||||
expect(route).toMatchObject({
|
||||
sessionKey: "agent:main:main",
|
||||
baseSessionKey: "agent:main:main",
|
||||
peer: { kind: "direct", id: "@alice:example.org" },
|
||||
chatType: "direct",
|
||||
from: "matrix:@alice:example.org",
|
||||
to: "room:@alice:example.org",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,29 +1,125 @@
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
} from "openclaw/plugin-sdk/config-runtime";
|
||||
import {
|
||||
buildChannelOutboundSessionRoute,
|
||||
stripChannelTargetPrefix,
|
||||
stripTargetKindPrefix,
|
||||
type ChannelOutboundSessionRouteParams,
|
||||
} from "openclaw/plugin-sdk/channel-core";
|
||||
import { normalizeAccountId } from "openclaw/plugin-sdk/account-id";
|
||||
import { resolveMatrixAccountConfig } from "./matrix/account-config.js";
|
||||
import { resolveMatrixDirectUserId, resolveMatrixTargetIdentity } from "./matrix/target-ids.js";
|
||||
|
||||
function resolveMatrixDmSessionScope(
|
||||
params: Pick<ChannelOutboundSessionRouteParams, "cfg" | "accountId">,
|
||||
): "per-user" | "per-room" {
|
||||
return (
|
||||
resolveMatrixAccountConfig({
|
||||
cfg: params.cfg,
|
||||
accountId: params.accountId,
|
||||
}).dm?.sessionScope ?? "per-user"
|
||||
);
|
||||
}
|
||||
|
||||
function resolveMatrixRoomTargetId(value: unknown): string | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const target = resolveMatrixTargetIdentity(value);
|
||||
return target?.kind === "room" && target.id.startsWith("!") ? target.id : undefined;
|
||||
}
|
||||
|
||||
function resolveMatrixSessionAccountId(value: unknown): string | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = value.trim();
|
||||
return trimmed ? normalizeAccountId(trimmed) : undefined;
|
||||
}
|
||||
|
||||
function resolveMatrixCurrentDmRoomId(params: {
|
||||
cfg: ChannelOutboundSessionRouteParams["cfg"];
|
||||
agentId: string;
|
||||
accountId?: string | null;
|
||||
currentSessionKey?: string;
|
||||
targetUserId: string;
|
||||
}): string | undefined {
|
||||
const sessionKey = params.currentSessionKey?.trim();
|
||||
if (!sessionKey) {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
const storePath = resolveStorePath(params.cfg.session?.store, {
|
||||
agentId: params.agentId,
|
||||
});
|
||||
const store = loadSessionStore(storePath);
|
||||
const existing = resolveSessionStoreEntry({
|
||||
store,
|
||||
sessionKey,
|
||||
}).existing;
|
||||
if (!existing) {
|
||||
return undefined;
|
||||
}
|
||||
const currentAccountId =
|
||||
resolveMatrixSessionAccountId(
|
||||
existing.deliveryContext?.accountId ?? existing.lastAccountId ?? existing.origin?.accountId,
|
||||
) ?? undefined;
|
||||
if (!currentAccountId || currentAccountId !== normalizeAccountId(params.accountId)) {
|
||||
return undefined;
|
||||
}
|
||||
const currentUserId = resolveMatrixDirectUserId({
|
||||
from: existing.origin?.from,
|
||||
to: existing.deliveryContext?.to ?? existing.lastTo ?? existing.origin?.to,
|
||||
chatType: existing.origin?.chatType ?? existing.chatType,
|
||||
});
|
||||
if (!currentUserId || currentUserId !== params.targetUserId) {
|
||||
return undefined;
|
||||
}
|
||||
return resolveMatrixRoomTargetId(
|
||||
existing.deliveryContext?.to ?? existing.lastTo ?? existing.origin?.to,
|
||||
);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
export function resolveMatrixOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
|
||||
const stripped = stripChannelTargetPrefix(params.target, "matrix");
|
||||
const isUser =
|
||||
params.resolvedTarget?.kind === "user" || stripped.startsWith("@") || /^user:/i.test(stripped);
|
||||
const rawId = stripTargetKindPrefix(stripped);
|
||||
if (!rawId) {
|
||||
const target =
|
||||
resolveMatrixTargetIdentity(params.resolvedTarget?.to ?? params.target) ??
|
||||
resolveMatrixTargetIdentity(params.target);
|
||||
if (!target) {
|
||||
return null;
|
||||
}
|
||||
const roomScopedDmId =
|
||||
target.kind === "user" && resolveMatrixDmSessionScope(params) === "per-room"
|
||||
? resolveMatrixCurrentDmRoomId({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
accountId: params.accountId,
|
||||
currentSessionKey: params.currentSessionKey,
|
||||
targetUserId: target.id,
|
||||
})
|
||||
: undefined;
|
||||
const peer =
|
||||
roomScopedDmId !== undefined
|
||||
? { kind: "channel" as const, id: roomScopedDmId }
|
||||
: {
|
||||
kind: target.kind === "user" ? ("direct" as const) : ("channel" as const),
|
||||
id: target.id,
|
||||
};
|
||||
const chatType = target.kind === "user" ? "direct" : "channel";
|
||||
const from = target.kind === "user" ? `matrix:${target.id}` : `matrix:channel:${target.id}`;
|
||||
const to = `room:${roomScopedDmId ?? target.id}`;
|
||||
|
||||
return buildChannelOutboundSessionRoute({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "matrix",
|
||||
accountId: params.accountId,
|
||||
peer: {
|
||||
kind: isUser ? "direct" : "channel",
|
||||
id: rawId,
|
||||
},
|
||||
chatType: isUser ? "direct" : "channel",
|
||||
from: isUser ? `matrix:${rawId}` : `matrix:channel:${rawId}`,
|
||||
to: `room:${rawId}`,
|
||||
peer,
|
||||
chatType,
|
||||
from,
|
||||
to,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -16,6 +16,12 @@ export type MatrixDmConfig = {
|
||||
policy?: DmPolicy;
|
||||
/** Allowlist for DM senders (matrix user IDs or "*"). */
|
||||
allowFrom?: Array<string | number>;
|
||||
/**
|
||||
* How Matrix DMs map to sessions.
|
||||
* - `per-user` (default): all DM rooms with the same routed peer share one DM session.
|
||||
* - `per-room`: each Matrix DM room gets its own session key.
|
||||
*/
|
||||
sessionScope?: "per-user" | "per-room";
|
||||
/** Per-DM thread reply behavior override (off|inbound|always). Overrides top-level threadReplies for direct messages. */
|
||||
threadReplies?: "off" | "inbound" | "always";
|
||||
};
|
||||
|
||||
@@ -534,6 +534,7 @@ export type ChannelMessagingAdapter = {
|
||||
agentId: string;
|
||||
accountId?: string | null;
|
||||
target: string;
|
||||
currentSessionKey?: string;
|
||||
resolvedTarget?: {
|
||||
to: string;
|
||||
kind: ChannelDirectoryEntryKind | "channel";
|
||||
|
||||
@@ -502,6 +502,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
|
||||
accountId,
|
||||
toolContext: input.toolContext,
|
||||
agentId,
|
||||
currentSessionKey: input.sessionKey,
|
||||
dryRun,
|
||||
resolvedTarget,
|
||||
resolveAutoThreadId: getChannelPlugin(channel)?.threading?.resolveAutoThreadId,
|
||||
|
||||
@@ -47,6 +47,7 @@ export async function prepareOutboundMirrorRoute(params: {
|
||||
accountId?: string | null;
|
||||
toolContext?: ChannelThreadingToolContext;
|
||||
agentId?: string;
|
||||
currentSessionKey?: string;
|
||||
dryRun?: boolean;
|
||||
resolvedTarget?: ResolvedMessagingTarget;
|
||||
resolveAutoThreadId?: ResolveAutoThreadId;
|
||||
@@ -80,6 +81,7 @@ export async function prepareOutboundMirrorRoute(params: {
|
||||
agentId: params.agentId,
|
||||
accountId: params.accountId,
|
||||
target: params.to,
|
||||
currentSessionKey: params.currentSessionKey,
|
||||
resolvedTarget: params.resolvedTarget,
|
||||
replyToId,
|
||||
threadId: resolvedThreadId,
|
||||
|
||||
@@ -26,6 +26,7 @@ export type ResolveOutboundSessionRouteParams = {
|
||||
agentId: string;
|
||||
accountId?: string | null;
|
||||
target: string;
|
||||
currentSessionKey?: string;
|
||||
resolvedTarget?: ResolvedMessagingTarget;
|
||||
replyToId?: string | null;
|
||||
threadId?: string | number | null;
|
||||
|
||||
Reference in New Issue
Block a user