mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-19 05:50:47 +00:00
Matrix: productize direct room repair
This commit is contained in:
@@ -403,6 +403,28 @@ Remove stale OpenClaw-managed devices with:
|
||||
openclaw matrix devices prune-stale
|
||||
```
|
||||
|
||||
### Direct Room Repair
|
||||
|
||||
If direct-message state gets out of sync, OpenClaw can end up with stale `m.direct` mappings that point at old solo rooms instead of the live DM. Inspect the current mapping for a peer with:
|
||||
|
||||
```bash
|
||||
openclaw matrix direct inspect --user-id @alice:example.org
|
||||
```
|
||||
|
||||
Repair it with:
|
||||
|
||||
```bash
|
||||
openclaw matrix direct repair --user-id @alice:example.org
|
||||
```
|
||||
|
||||
Repair keeps the Matrix-specific logic inside the plugin:
|
||||
|
||||
- it prefers a strict 1:1 DM that is already mapped in `m.direct`
|
||||
- otherwise it falls back to any currently joined strict 1:1 DM with that user
|
||||
- if no healthy DM exists, it creates a fresh direct room and rewrites `m.direct` to point at it
|
||||
|
||||
The repair flow does not delete old rooms automatically. It only picks the healthy DM and updates the mapping so new Matrix sends, verification notices, and other direct-message flows target the right room again.
|
||||
|
||||
## Threads
|
||||
|
||||
Matrix supports native Matrix threads for both automatic replies and message-tool sends.
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
} from "openclaw/plugin-sdk/matrix";
|
||||
import { matrixPlugin } from "./channel.js";
|
||||
import { resolveMatrixAccount, resolveMatrixAccountConfig } from "./matrix/accounts.js";
|
||||
import { withResolvedActionClient, withStartedActionClient } from "./matrix/actions/client.js";
|
||||
import { listMatrixOwnDevices, pruneMatrixStaleGatewayDevices } from "./matrix/actions/devices.js";
|
||||
import { updateMatrixOwnProfile } from "./matrix/actions/profile.js";
|
||||
import {
|
||||
@@ -21,6 +22,11 @@ import { resolveMatrixAuthContext } from "./matrix/client.js";
|
||||
import { setMatrixSdkConsoleLogging, setMatrixSdkLogMode } from "./matrix/client/logging.js";
|
||||
import { resolveMatrixConfigPath, updateMatrixAccountConfig } from "./matrix/config-update.js";
|
||||
import { isOpenClawManagedMatrixDevice } from "./matrix/device-health.js";
|
||||
import {
|
||||
inspectMatrixDirectRooms,
|
||||
repairMatrixDirectRooms,
|
||||
type MatrixDirectRoomCandidate,
|
||||
} from "./matrix/direct-management.js";
|
||||
import { applyMatrixProfileUpdate, type MatrixProfileUpdateResult } from "./profile-update.js";
|
||||
import { getMatrixRuntime } from "./runtime.js";
|
||||
import type { CoreConfig } from "./types.js";
|
||||
@@ -309,6 +315,87 @@ async function addMatrixAccount(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function printDirectRoomCandidate(room: MatrixCliDirectRoomCandidate): void {
|
||||
const members =
|
||||
room.joinedMembers === null ? "unavailable" : room.joinedMembers.join(", ") || "none";
|
||||
console.log(
|
||||
`- ${room.roomId} [${room.source}] strict=${room.strict ? "yes" : "no"} joined=${members}`,
|
||||
);
|
||||
}
|
||||
|
||||
function printDirectRoomInspection(result: MatrixCliDirectRoomInspection): void {
|
||||
printAccountLabel(result.accountId);
|
||||
console.log(`Peer: ${result.remoteUserId}`);
|
||||
console.log(`Self: ${result.selfUserId ?? "unknown"}`);
|
||||
console.log(`Active direct room: ${result.activeRoomId ?? "none"}`);
|
||||
console.log(
|
||||
`Mapped rooms: ${result.mappedRoomIds.length ? result.mappedRoomIds.join(", ") : "none"}`,
|
||||
);
|
||||
console.log(
|
||||
`Discovered strict rooms: ${result.discoveredStrictRoomIds.length ? result.discoveredStrictRoomIds.join(", ") : "none"}`,
|
||||
);
|
||||
if (result.mappedRooms.length > 0) {
|
||||
console.log("Mapped room details:");
|
||||
for (const room of result.mappedRooms) {
|
||||
printDirectRoomCandidate(room);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function inspectMatrixDirectRoom(params: {
|
||||
accountId: string;
|
||||
userId: string;
|
||||
}): Promise<MatrixCliDirectRoomInspection> {
|
||||
return await withResolvedActionClient(
|
||||
{ accountId: params.accountId },
|
||||
async (client) => {
|
||||
const inspection = await inspectMatrixDirectRooms({
|
||||
client,
|
||||
remoteUserId: params.userId,
|
||||
});
|
||||
return {
|
||||
accountId: params.accountId,
|
||||
remoteUserId: inspection.remoteUserId,
|
||||
selfUserId: inspection.selfUserId,
|
||||
mappedRoomIds: inspection.mappedRoomIds,
|
||||
mappedRooms: inspection.mappedRooms.map(toCliDirectRoomCandidate),
|
||||
discoveredStrictRoomIds: inspection.discoveredStrictRoomIds,
|
||||
activeRoomId: inspection.activeRoomId,
|
||||
};
|
||||
},
|
||||
"persist",
|
||||
);
|
||||
}
|
||||
|
||||
async function repairMatrixDirectRoom(params: {
|
||||
accountId: string;
|
||||
userId: string;
|
||||
}): Promise<MatrixCliDirectRoomRepair> {
|
||||
const cfg = getMatrixRuntime().config.loadConfig() as CoreConfig;
|
||||
const account = resolveMatrixAccount({ cfg, accountId: params.accountId });
|
||||
return await withStartedActionClient({ accountId: params.accountId }, async (client) => {
|
||||
const repaired = await repairMatrixDirectRooms({
|
||||
client,
|
||||
remoteUserId: params.userId,
|
||||
encrypted: account.config.encryption === true,
|
||||
});
|
||||
return {
|
||||
accountId: params.accountId,
|
||||
remoteUserId: repaired.remoteUserId,
|
||||
selfUserId: repaired.selfUserId,
|
||||
mappedRoomIds: repaired.mappedRoomIds,
|
||||
mappedRooms: repaired.mappedRooms.map(toCliDirectRoomCandidate),
|
||||
discoveredStrictRoomIds: repaired.discoveredStrictRoomIds,
|
||||
activeRoomId: repaired.activeRoomId,
|
||||
encrypted: account.config.encryption === true,
|
||||
createdRoomId: repaired.createdRoomId,
|
||||
changed: repaired.changed,
|
||||
directContentBefore: repaired.directContentBefore,
|
||||
directContentAfter: repaired.directContentAfter,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
type MatrixCliProfileSetResult = MatrixProfileUpdateResult;
|
||||
|
||||
async function setMatrixProfile(params: {
|
||||
@@ -386,6 +473,40 @@ type MatrixCliVerificationStatus = {
|
||||
pendingVerifications: number;
|
||||
};
|
||||
|
||||
type MatrixCliDirectRoomCandidate = {
|
||||
roomId: string;
|
||||
source: "account-data" | "joined";
|
||||
strict: boolean;
|
||||
joinedMembers: string[] | null;
|
||||
};
|
||||
|
||||
type MatrixCliDirectRoomInspection = {
|
||||
accountId: string;
|
||||
remoteUserId: string;
|
||||
selfUserId: string | null;
|
||||
mappedRoomIds: string[];
|
||||
mappedRooms: MatrixCliDirectRoomCandidate[];
|
||||
discoveredStrictRoomIds: string[];
|
||||
activeRoomId: string | null;
|
||||
};
|
||||
|
||||
type MatrixCliDirectRoomRepair = MatrixCliDirectRoomInspection & {
|
||||
encrypted: boolean;
|
||||
createdRoomId: string | null;
|
||||
changed: boolean;
|
||||
directContentBefore: Record<string, string[]>;
|
||||
directContentAfter: Record<string, string[]>;
|
||||
};
|
||||
|
||||
function toCliDirectRoomCandidate(room: MatrixDirectRoomCandidate): MatrixCliDirectRoomCandidate {
|
||||
return {
|
||||
roomId: room.roomId,
|
||||
source: room.source,
|
||||
strict: room.strict,
|
||||
joinedMembers: room.joinedMembers,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveBackupStatus(status: {
|
||||
backupVersion: string | null;
|
||||
backup?: MatrixCliBackupStatus;
|
||||
@@ -706,6 +827,71 @@ export function registerMatrixCli(params: { program: Command }): void {
|
||||
},
|
||||
);
|
||||
|
||||
const direct = root.command("direct").description("Inspect and repair Matrix direct-room state");
|
||||
|
||||
direct
|
||||
.command("inspect")
|
||||
.description("Inspect direct-room mappings for a Matrix user")
|
||||
.requiredOption("--user-id <id>", "Peer Matrix user ID")
|
||||
.option("--account <id>", "Account ID (for multi-account setups)")
|
||||
.option("--verbose", "Show detailed diagnostics")
|
||||
.option("--json", "Output as JSON")
|
||||
.action(
|
||||
async (options: { userId: string; account?: string; verbose?: boolean; json?: boolean }) => {
|
||||
const accountId = resolveMatrixCliAccountId(options.account);
|
||||
await runMatrixCliCommand({
|
||||
verbose: options.verbose === true,
|
||||
json: options.json === true,
|
||||
run: async () =>
|
||||
await inspectMatrixDirectRoom({
|
||||
accountId,
|
||||
userId: options.userId,
|
||||
}),
|
||||
onText: (result) => {
|
||||
printDirectRoomInspection(result);
|
||||
},
|
||||
errorPrefix: "Direct room inspection failed",
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
direct
|
||||
.command("repair")
|
||||
.description("Repair Matrix direct-room mappings for a Matrix user")
|
||||
.requiredOption("--user-id <id>", "Peer Matrix user ID")
|
||||
.option("--account <id>", "Account ID (for multi-account setups)")
|
||||
.option("--verbose", "Show detailed diagnostics")
|
||||
.option("--json", "Output as JSON")
|
||||
.action(
|
||||
async (options: { userId: string; account?: string; verbose?: boolean; json?: boolean }) => {
|
||||
const accountId = resolveMatrixCliAccountId(options.account);
|
||||
await runMatrixCliCommand({
|
||||
verbose: options.verbose === true,
|
||||
json: options.json === true,
|
||||
run: async () =>
|
||||
await repairMatrixDirectRoom({
|
||||
accountId,
|
||||
userId: options.userId,
|
||||
}),
|
||||
onText: (result, verbose) => {
|
||||
printDirectRoomInspection(result);
|
||||
console.log(`Encrypted room creation: ${result.encrypted ? "enabled" : "disabled"}`);
|
||||
console.log(`Created room: ${result.createdRoomId ?? "none"}`);
|
||||
console.log(`m.direct updated: ${result.changed ? "yes" : "no"}`);
|
||||
if (verbose) {
|
||||
console.log(
|
||||
`m.direct before: ${JSON.stringify(result.directContentBefore[result.remoteUserId] ?? [])}`,
|
||||
);
|
||||
console.log(
|
||||
`m.direct after: ${JSON.stringify(result.directContentAfter[result.remoteUserId] ?? [])}`,
|
||||
);
|
||||
}
|
||||
},
|
||||
errorPrefix: "Direct room repair failed",
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
const verify = root.command("verify").description("Device verification for Matrix E2EE");
|
||||
|
||||
verify
|
||||
|
||||
@@ -12,7 +12,7 @@ const {
|
||||
getMatrixRuntimeMock,
|
||||
getActiveMatrixClientMock,
|
||||
resolveSharedMatrixClientMock,
|
||||
stopSharedClientForAccountMock,
|
||||
stopSharedClientInstanceMock,
|
||||
isBunRuntimeMock,
|
||||
resolveMatrixAuthContextMock,
|
||||
} = matrixClientResolverMocks;
|
||||
@@ -32,7 +32,7 @@ vi.mock("../client.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("../client/shared.js", () => ({
|
||||
stopSharedClientForAccount: (...args: unknown[]) => stopSharedClientForAccountMock(...args),
|
||||
stopSharedClientInstance: (...args: unknown[]) => stopSharedClientInstanceMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../send.js", () => ({
|
||||
@@ -74,9 +74,7 @@ describe("action client helpers", () => {
|
||||
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
|
||||
expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1);
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(stopSharedClientForAccountMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ userId: "@bot:example.org" }),
|
||||
);
|
||||
expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
expect(result).toBe("ok");
|
||||
});
|
||||
|
||||
@@ -97,9 +95,7 @@ describe("action client helpers", () => {
|
||||
expect(sharedClient.prepareForOneOff).not.toHaveBeenCalled();
|
||||
expect(sharedClient.stop).not.toHaveBeenCalled();
|
||||
expect(sharedClient.stopAndPersist).toHaveBeenCalledTimes(1);
|
||||
expect(stopSharedClientForAccountMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ userId: "@bot:example.org" }),
|
||||
);
|
||||
expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
});
|
||||
|
||||
it("reuses active monitor client when available", async () => {
|
||||
@@ -201,9 +197,7 @@ describe("action client helpers", () => {
|
||||
expect(result).toBe("ok");
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(sharedClient.stopAndPersist).not.toHaveBeenCalled();
|
||||
expect(stopSharedClientForAccountMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ userId: "@bot:example.org" }),
|
||||
);
|
||||
expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
});
|
||||
|
||||
it("stops shared action clients when the wrapped call throws", async () => {
|
||||
|
||||
@@ -2,7 +2,7 @@ import { getMatrixRuntime } from "../runtime.js";
|
||||
import type { CoreConfig } from "../types.js";
|
||||
import { getActiveMatrixClient } from "./active-client.js";
|
||||
import { isBunRuntime, resolveMatrixAuthContext, resolveSharedMatrixClient } from "./client.js";
|
||||
import { stopSharedClientForAccount } from "./client/shared.js";
|
||||
import { stopSharedClientInstance } from "./client/shared.js";
|
||||
import type { MatrixClient } from "./sdk.js";
|
||||
|
||||
type ResolvedRuntimeMatrixClient = {
|
||||
@@ -78,7 +78,7 @@ async function resolveRuntimeMatrixClient(opts: {
|
||||
} else {
|
||||
client.stop();
|
||||
}
|
||||
stopSharedClientForAccount(authContext.resolved);
|
||||
stopSharedClientInstance(client);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ type MatrixClientResolverMocks = {
|
||||
getMatrixRuntimeMock: Mock<() => unknown>;
|
||||
getActiveMatrixClientMock: Mock<(...args: unknown[]) => MatrixClient | null>;
|
||||
resolveSharedMatrixClientMock: Mock<(...args: unknown[]) => Promise<MatrixClient>>;
|
||||
stopSharedClientForAccountMock: Mock<(...args: unknown[]) => void>;
|
||||
stopSharedClientInstanceMock: Mock<(...args: unknown[]) => void>;
|
||||
isBunRuntimeMock: Mock<() => boolean>;
|
||||
resolveMatrixAuthContextMock: Mock<
|
||||
(params: { cfg: unknown; accountId?: string | null }) => unknown
|
||||
@@ -18,7 +18,7 @@ export const matrixClientResolverMocks: MatrixClientResolverMocks = {
|
||||
getMatrixRuntimeMock: vi.fn(),
|
||||
getActiveMatrixClientMock: vi.fn(),
|
||||
resolveSharedMatrixClientMock: vi.fn(),
|
||||
stopSharedClientForAccountMock: vi.fn(),
|
||||
stopSharedClientInstanceMock: vi.fn(),
|
||||
isBunRuntimeMock: vi.fn(() => false),
|
||||
resolveMatrixAuthContextMock: vi.fn(),
|
||||
};
|
||||
@@ -44,7 +44,7 @@ export function primeMatrixClientResolverMocks(params?: {
|
||||
getMatrixRuntimeMock,
|
||||
getActiveMatrixClientMock,
|
||||
resolveSharedMatrixClientMock,
|
||||
stopSharedClientForAccountMock,
|
||||
stopSharedClientInstanceMock,
|
||||
isBunRuntimeMock,
|
||||
resolveMatrixAuthContextMock,
|
||||
} = matrixClientResolverMocks;
|
||||
@@ -70,7 +70,7 @@ export function primeMatrixClientResolverMocks(params?: {
|
||||
});
|
||||
getActiveMatrixClientMock.mockReturnValue(null);
|
||||
isBunRuntimeMock.mockReturnValue(false);
|
||||
stopSharedClientForAccountMock.mockReset();
|
||||
stopSharedClientInstanceMock.mockReset();
|
||||
resolveMatrixAuthContextMock.mockImplementation(
|
||||
({
|
||||
cfg: explicitCfg,
|
||||
|
||||
@@ -10,4 +10,8 @@ export {
|
||||
validateMatrixHomeserverUrl,
|
||||
} from "./client/config.js";
|
||||
export { createMatrixClient } from "./client/create-client.js";
|
||||
export { resolveSharedMatrixClient, stopSharedClientForAccount } from "./client/shared.js";
|
||||
export {
|
||||
resolveSharedMatrixClient,
|
||||
stopSharedClientForAccount,
|
||||
stopSharedClientInstance,
|
||||
} from "./client/shared.js";
|
||||
|
||||
@@ -18,6 +18,7 @@ import {
|
||||
resolveSharedMatrixClient,
|
||||
stopSharedClient,
|
||||
stopSharedClientForAccount,
|
||||
stopSharedClientInstance,
|
||||
} from "./shared.js";
|
||||
|
||||
function authFor(accountId: string): MatrixAuth {
|
||||
@@ -122,6 +123,26 @@ describe("resolveSharedMatrixClient", () => {
|
||||
expect(poeClient.stop).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("drops stopped shared clients by instance so the next resolve recreates them", async () => {
|
||||
const mainAuth = authFor("main");
|
||||
const firstMainClient = createMockClient("main-first");
|
||||
const secondMainClient = createMockClient("main-second");
|
||||
|
||||
resolveMatrixAuthMock.mockResolvedValue(mainAuth);
|
||||
createMatrixClientMock
|
||||
.mockResolvedValueOnce(firstMainClient)
|
||||
.mockResolvedValueOnce(secondMainClient);
|
||||
|
||||
const first = await resolveSharedMatrixClient({ accountId: "main", startClient: false });
|
||||
stopSharedClientInstance(first as unknown as import("../sdk.js").MatrixClient);
|
||||
const second = await resolveSharedMatrixClient({ accountId: "main", startClient: false });
|
||||
|
||||
expect(first).toBe(firstMainClient);
|
||||
expect(second).toBe(secondMainClient);
|
||||
expect(firstMainClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(createMatrixClientMock).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("reuses the effective implicit account instead of keying it as default", async () => {
|
||||
const poeAuth = authFor("ops");
|
||||
const poeClient = createMockClient("ops");
|
||||
|
||||
@@ -193,3 +193,15 @@ export function stopSharedClientForAccount(auth: MatrixAuth): void {
|
||||
sharedClientStates.delete(key);
|
||||
sharedClientPromises.delete(key);
|
||||
}
|
||||
|
||||
export function stopSharedClientInstance(client: MatrixClient): void {
|
||||
for (const [key, state] of sharedClientStates.entries()) {
|
||||
if (state.client !== client) {
|
||||
continue;
|
||||
}
|
||||
state.client.stop();
|
||||
sharedClientStates.delete(key);
|
||||
sharedClientPromises.delete(key);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
139
extensions/matrix/src/matrix/direct-management.test.ts
Normal file
139
extensions/matrix/src/matrix/direct-management.test.ts
Normal file
@@ -0,0 +1,139 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { inspectMatrixDirectRooms, repairMatrixDirectRooms } from "./direct-management.js";
|
||||
import type { MatrixClient } from "./sdk.js";
|
||||
import { EventType } from "./send/types.js";
|
||||
|
||||
function createClient(overrides: Partial<MatrixClient> = {}): MatrixClient {
|
||||
return {
|
||||
getUserId: vi.fn(async () => "@bot:example.org"),
|
||||
getAccountData: vi.fn(async () => undefined),
|
||||
getJoinedRooms: vi.fn(async () => [] as string[]),
|
||||
getJoinedRoomMembers: vi.fn(async () => [] as string[]),
|
||||
setAccountData: vi.fn(async () => undefined),
|
||||
createDirectRoom: vi.fn(async () => "!created:example.org"),
|
||||
...overrides,
|
||||
} as unknown as MatrixClient;
|
||||
}
|
||||
|
||||
describe("inspectMatrixDirectRooms", () => {
|
||||
it("prefers strict mapped rooms over discovered rooms", async () => {
|
||||
const client = createClient({
|
||||
getAccountData: vi.fn(async () => ({
|
||||
"@alice:example.org": ["!dm:example.org", "!shared:example.org"],
|
||||
})),
|
||||
getJoinedRooms: vi.fn(async () => ["!dm:example.org", "!shared:example.org"]),
|
||||
getJoinedRoomMembers: vi.fn(async (roomId: string) =>
|
||||
roomId === "!dm:example.org"
|
||||
? ["@bot:example.org", "@alice:example.org"]
|
||||
: ["@bot:example.org", "@alice:example.org", "@mallory:example.org"],
|
||||
),
|
||||
});
|
||||
|
||||
const result = await inspectMatrixDirectRooms({
|
||||
client,
|
||||
remoteUserId: "@alice:example.org",
|
||||
});
|
||||
|
||||
expect(result.activeRoomId).toBe("!dm:example.org");
|
||||
expect(result.mappedRooms).toEqual([
|
||||
expect.objectContaining({ roomId: "!dm:example.org", strict: true }),
|
||||
expect.objectContaining({ roomId: "!shared:example.org", strict: false }),
|
||||
]);
|
||||
});
|
||||
|
||||
it("falls back to discovered strict joined rooms when m.direct is stale", async () => {
|
||||
const client = createClient({
|
||||
getAccountData: vi.fn(async () => ({
|
||||
"@alice:example.org": ["!stale:example.org"],
|
||||
})),
|
||||
getJoinedRooms: vi.fn(async () => ["!stale:example.org", "!fresh:example.org"]),
|
||||
getJoinedRoomMembers: vi.fn(async (roomId: string) =>
|
||||
roomId === "!fresh:example.org"
|
||||
? ["@bot:example.org", "@alice:example.org"]
|
||||
: ["@bot:example.org", "@alice:example.org", "@mallory:example.org"],
|
||||
),
|
||||
});
|
||||
|
||||
const result = await inspectMatrixDirectRooms({
|
||||
client,
|
||||
remoteUserId: "@alice:example.org",
|
||||
});
|
||||
|
||||
expect(result.activeRoomId).toBe("!fresh:example.org");
|
||||
expect(result.discoveredStrictRoomIds).toEqual(["!fresh:example.org"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("repairMatrixDirectRooms", () => {
|
||||
it("repoints m.direct to an existing strict joined room", async () => {
|
||||
const setAccountData = vi.fn(async () => undefined);
|
||||
const client = createClient({
|
||||
getAccountData: vi.fn(async () => ({
|
||||
"@alice:example.org": ["!stale:example.org"],
|
||||
})),
|
||||
getJoinedRooms: vi.fn(async () => ["!stale:example.org", "!fresh:example.org"]),
|
||||
getJoinedRoomMembers: vi.fn(async (roomId: string) =>
|
||||
roomId === "!fresh:example.org"
|
||||
? ["@bot:example.org", "@alice:example.org"]
|
||||
: ["@bot:example.org", "@alice:example.org", "@mallory:example.org"],
|
||||
),
|
||||
setAccountData,
|
||||
});
|
||||
|
||||
const result = await repairMatrixDirectRooms({
|
||||
client,
|
||||
remoteUserId: "@alice:example.org",
|
||||
encrypted: true,
|
||||
});
|
||||
|
||||
expect(result.activeRoomId).toBe("!fresh:example.org");
|
||||
expect(result.createdRoomId).toBeNull();
|
||||
expect(setAccountData).toHaveBeenCalledWith(
|
||||
EventType.Direct,
|
||||
expect.objectContaining({
|
||||
"@alice:example.org": ["!fresh:example.org", "!stale:example.org"],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("creates a fresh direct room when no healthy DM exists", async () => {
|
||||
const createDirectRoom = vi.fn(async () => "!created:example.org");
|
||||
const setAccountData = vi.fn(async () => undefined);
|
||||
const client = createClient({
|
||||
getJoinedRooms: vi.fn(async () => ["!shared:example.org"]),
|
||||
getJoinedRoomMembers: vi.fn(async () => [
|
||||
"@bot:example.org",
|
||||
"@alice:example.org",
|
||||
"@mallory:example.org",
|
||||
]),
|
||||
createDirectRoom,
|
||||
setAccountData,
|
||||
});
|
||||
|
||||
const result = await repairMatrixDirectRooms({
|
||||
client,
|
||||
remoteUserId: "@alice:example.org",
|
||||
encrypted: true,
|
||||
});
|
||||
|
||||
expect(createDirectRoom).toHaveBeenCalledWith("@alice:example.org", { encrypted: true });
|
||||
expect(result.createdRoomId).toBe("!created:example.org");
|
||||
expect(setAccountData).toHaveBeenCalledWith(
|
||||
EventType.Direct,
|
||||
expect.objectContaining({
|
||||
"@alice:example.org": ["!created:example.org"],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects unqualified Matrix user ids", async () => {
|
||||
const client = createClient();
|
||||
|
||||
await expect(
|
||||
repairMatrixDirectRooms({
|
||||
client,
|
||||
remoteUserId: "alice",
|
||||
}),
|
||||
).rejects.toThrow('Matrix user IDs must be fully qualified (got "alice")');
|
||||
});
|
||||
});
|
||||
211
extensions/matrix/src/matrix/direct-management.ts
Normal file
211
extensions/matrix/src/matrix/direct-management.ts
Normal file
@@ -0,0 +1,211 @@
|
||||
import {
|
||||
isStrictDirectMembership,
|
||||
isStrictDirectRoom,
|
||||
readJoinedMatrixMembers,
|
||||
} from "./direct-room.js";
|
||||
import type { MatrixClient } from "./sdk.js";
|
||||
import { EventType, type MatrixDirectAccountData } from "./send/types.js";
|
||||
import { isMatrixQualifiedUserId } from "./target-ids.js";
|
||||
|
||||
export type MatrixDirectRoomCandidate = {
|
||||
roomId: string;
|
||||
joinedMembers: string[] | null;
|
||||
strict: boolean;
|
||||
source: "account-data" | "joined";
|
||||
};
|
||||
|
||||
export type MatrixDirectRoomInspection = {
|
||||
selfUserId: string | null;
|
||||
remoteUserId: string;
|
||||
mappedRoomIds: string[];
|
||||
mappedRooms: MatrixDirectRoomCandidate[];
|
||||
discoveredStrictRoomIds: string[];
|
||||
activeRoomId: string | null;
|
||||
};
|
||||
|
||||
export type MatrixDirectRoomRepairResult = MatrixDirectRoomInspection & {
|
||||
createdRoomId: string | null;
|
||||
changed: boolean;
|
||||
directContentBefore: MatrixDirectAccountData;
|
||||
directContentAfter: MatrixDirectAccountData;
|
||||
};
|
||||
|
||||
async function readMatrixDirectAccountData(client: MatrixClient): Promise<MatrixDirectAccountData> {
|
||||
try {
|
||||
const direct = (await client.getAccountData(EventType.Direct)) as MatrixDirectAccountData;
|
||||
return direct && typeof direct === "object" && !Array.isArray(direct) ? direct : {};
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeRemoteUserId(remoteUserId: string): string {
|
||||
const normalized = remoteUserId.trim();
|
||||
if (!isMatrixQualifiedUserId(normalized)) {
|
||||
throw new Error(`Matrix user IDs must be fully qualified (got "${remoteUserId}")`);
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function normalizeMappedRoomIds(direct: MatrixDirectAccountData, remoteUserId: string): string[] {
|
||||
const current = direct[remoteUserId];
|
||||
if (!Array.isArray(current)) {
|
||||
return [];
|
||||
}
|
||||
const seen = new Set<string>();
|
||||
const normalized: string[] = [];
|
||||
for (const value of current) {
|
||||
const roomId = typeof value === "string" ? value.trim() : "";
|
||||
if (!roomId || seen.has(roomId)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(roomId);
|
||||
normalized.push(roomId);
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
function normalizeRoomIdList(values: readonly string[]): string[] {
|
||||
const seen = new Set<string>();
|
||||
const normalized: string[] = [];
|
||||
for (const value of values) {
|
||||
const roomId = value.trim();
|
||||
if (!roomId || seen.has(roomId)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(roomId);
|
||||
normalized.push(roomId);
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
async function classifyDirectRoomCandidate(params: {
|
||||
client: MatrixClient;
|
||||
roomId: string;
|
||||
remoteUserId: string;
|
||||
selfUserId: string | null;
|
||||
source: "account-data" | "joined";
|
||||
}): Promise<MatrixDirectRoomCandidate> {
|
||||
const joinedMembers = await readJoinedMatrixMembers(params.client, params.roomId);
|
||||
return {
|
||||
roomId: params.roomId,
|
||||
joinedMembers,
|
||||
strict:
|
||||
joinedMembers !== null &&
|
||||
isStrictDirectMembership({
|
||||
selfUserId: params.selfUserId,
|
||||
remoteUserId: params.remoteUserId,
|
||||
joinedMembers,
|
||||
}),
|
||||
source: params.source,
|
||||
};
|
||||
}
|
||||
|
||||
function buildNextDirectContent(params: {
|
||||
directContent: MatrixDirectAccountData;
|
||||
remoteUserId: string;
|
||||
roomId: string;
|
||||
}): MatrixDirectAccountData {
|
||||
const current = normalizeMappedRoomIds(params.directContent, params.remoteUserId);
|
||||
const nextRooms = normalizeRoomIdList([params.roomId, ...current]);
|
||||
return {
|
||||
...params.directContent,
|
||||
[params.remoteUserId]: nextRooms,
|
||||
};
|
||||
}
|
||||
|
||||
export async function inspectMatrixDirectRooms(params: {
|
||||
client: MatrixClient;
|
||||
remoteUserId: string;
|
||||
}): Promise<MatrixDirectRoomInspection> {
|
||||
const remoteUserId = normalizeRemoteUserId(params.remoteUserId);
|
||||
const selfUserId = (await params.client.getUserId().catch(() => null))?.trim() || null;
|
||||
const directContent = await readMatrixDirectAccountData(params.client);
|
||||
const mappedRoomIds = normalizeMappedRoomIds(directContent, remoteUserId);
|
||||
const mappedRooms = await Promise.all(
|
||||
mappedRoomIds.map(
|
||||
async (roomId) =>
|
||||
await classifyDirectRoomCandidate({
|
||||
client: params.client,
|
||||
roomId,
|
||||
remoteUserId,
|
||||
selfUserId,
|
||||
source: "account-data",
|
||||
}),
|
||||
),
|
||||
);
|
||||
const mappedStrict = mappedRooms.find((room) => room.strict);
|
||||
|
||||
let joinedRooms: string[] = [];
|
||||
if (!mappedStrict && typeof params.client.getJoinedRooms === "function") {
|
||||
try {
|
||||
const resolved = await params.client.getJoinedRooms();
|
||||
joinedRooms = Array.isArray(resolved) ? resolved : [];
|
||||
} catch {
|
||||
joinedRooms = [];
|
||||
}
|
||||
}
|
||||
const discoveredStrictRoomIds: string[] = [];
|
||||
for (const roomId of normalizeRoomIdList(joinedRooms)) {
|
||||
if (mappedRoomIds.includes(roomId)) {
|
||||
continue;
|
||||
}
|
||||
if (
|
||||
await isStrictDirectRoom({
|
||||
client: params.client,
|
||||
roomId,
|
||||
remoteUserId,
|
||||
selfUserId,
|
||||
})
|
||||
) {
|
||||
discoveredStrictRoomIds.push(roomId);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
selfUserId,
|
||||
remoteUserId,
|
||||
mappedRoomIds,
|
||||
mappedRooms,
|
||||
discoveredStrictRoomIds,
|
||||
activeRoomId: mappedStrict?.roomId ?? discoveredStrictRoomIds[0] ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
export async function repairMatrixDirectRooms(params: {
|
||||
client: MatrixClient;
|
||||
remoteUserId: string;
|
||||
encrypted?: boolean;
|
||||
}): Promise<MatrixDirectRoomRepairResult> {
|
||||
const remoteUserId = normalizeRemoteUserId(params.remoteUserId);
|
||||
const directContentBefore = await readMatrixDirectAccountData(params.client);
|
||||
const inspected = await inspectMatrixDirectRooms({
|
||||
client: params.client,
|
||||
remoteUserId,
|
||||
});
|
||||
const activeRoomId =
|
||||
inspected.activeRoomId ??
|
||||
(await params.client.createDirectRoom(remoteUserId, {
|
||||
encrypted: params.encrypted === true,
|
||||
}));
|
||||
const createdRoomId = inspected.activeRoomId ? null : activeRoomId;
|
||||
const directContentAfter = buildNextDirectContent({
|
||||
directContent: directContentBefore,
|
||||
remoteUserId,
|
||||
roomId: activeRoomId,
|
||||
});
|
||||
const changed =
|
||||
JSON.stringify(directContentAfter[remoteUserId] ?? []) !==
|
||||
JSON.stringify(directContentBefore[remoteUserId] ?? []);
|
||||
if (changed) {
|
||||
await params.client.setAccountData(EventType.Direct, directContentAfter);
|
||||
}
|
||||
return {
|
||||
...inspected,
|
||||
activeRoomId,
|
||||
createdRoomId,
|
||||
changed,
|
||||
directContentBefore,
|
||||
directContentAfter,
|
||||
};
|
||||
}
|
||||
@@ -14,7 +14,7 @@ const hoisted = vi.hoisted(() => {
|
||||
debug: vi.fn(),
|
||||
};
|
||||
const stopThreadBindingManager = vi.fn();
|
||||
const stopSharedClientForAccount = vi.fn();
|
||||
const stopSharedClientInstance = vi.fn();
|
||||
const setActiveMatrixClient = vi.fn();
|
||||
return {
|
||||
callOrder,
|
||||
@@ -23,7 +23,7 @@ const hoisted = vi.hoisted(() => {
|
||||
resolveTextChunkLimit,
|
||||
setActiveMatrixClient,
|
||||
startClientError,
|
||||
stopSharedClientForAccount,
|
||||
stopSharedClientInstance,
|
||||
stopThreadBindingManager,
|
||||
};
|
||||
});
|
||||
@@ -123,7 +123,7 @@ vi.mock("../client.js", () => ({
|
||||
hoisted.callOrder.push("start-client");
|
||||
return hoisted.client;
|
||||
}),
|
||||
stopSharedClientForAccount: hoisted.stopSharedClientForAccount,
|
||||
stopSharedClientInstance: hoisted.stopSharedClientInstance,
|
||||
}));
|
||||
|
||||
vi.mock("../config-update.js", () => ({
|
||||
@@ -203,7 +203,7 @@ describe("monitorMatrixProvider", () => {
|
||||
hoisted.startClientError = null;
|
||||
hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000);
|
||||
hoisted.setActiveMatrixClient.mockReset();
|
||||
hoisted.stopSharedClientForAccount.mockReset();
|
||||
hoisted.stopSharedClientInstance.mockReset();
|
||||
hoisted.stopThreadBindingManager.mockReset();
|
||||
Object.values(hoisted.logger).forEach((mock) => mock.mockReset());
|
||||
});
|
||||
@@ -245,7 +245,7 @@ describe("monitorMatrixProvider", () => {
|
||||
await expect(monitorMatrixProvider()).rejects.toThrow("start failed");
|
||||
|
||||
expect(hoisted.stopThreadBindingManager).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.stopSharedClientForAccount).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.stopSharedClientInstance).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(1, hoisted.client, "default");
|
||||
expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(2, null, "default");
|
||||
});
|
||||
|
||||
@@ -17,7 +17,7 @@ import {
|
||||
resolveMatrixAuth,
|
||||
resolveMatrixAuthContext,
|
||||
resolveSharedMatrixClient,
|
||||
stopSharedClientForAccount,
|
||||
stopSharedClientInstance,
|
||||
} from "../client.js";
|
||||
import { createMatrixThreadBindingManager } from "../thread-bindings.js";
|
||||
import { registerMatrixAutoJoin } from "./auto-join.js";
|
||||
@@ -139,7 +139,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
try {
|
||||
threadBindingManager?.stop();
|
||||
} finally {
|
||||
stopSharedClientForAccount(auth);
|
||||
stopSharedClientInstance(client);
|
||||
setActiveMatrixClient(null, auth.accountId);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -501,6 +501,30 @@ export class MatrixClient {
|
||||
}
|
||||
}
|
||||
|
||||
async createDirectRoom(
|
||||
remoteUserId: string,
|
||||
opts: { encrypted?: boolean } = {},
|
||||
): Promise<string> {
|
||||
const initialState = opts.encrypted
|
||||
? [
|
||||
{
|
||||
type: "m.room.encryption",
|
||||
state_key: "",
|
||||
content: {
|
||||
algorithm: "m.megolm.v1.aes-sha2",
|
||||
},
|
||||
},
|
||||
]
|
||||
: undefined;
|
||||
const result = await this.client.createRoom({
|
||||
invite: [remoteUserId],
|
||||
is_direct: true,
|
||||
preset: "trusted_private_chat",
|
||||
initial_state: initialState,
|
||||
});
|
||||
return result.room_id;
|
||||
}
|
||||
|
||||
async sendMessage(roomId: string, content: MessageEventContent): Promise<string> {
|
||||
const sent = await this.client.sendMessage(roomId, content as never);
|
||||
return sent.event_id;
|
||||
|
||||
@@ -9,7 +9,7 @@ const {
|
||||
getMatrixRuntimeMock,
|
||||
getActiveMatrixClientMock,
|
||||
resolveSharedMatrixClientMock,
|
||||
stopSharedClientForAccountMock,
|
||||
stopSharedClientInstanceMock,
|
||||
isBunRuntimeMock,
|
||||
resolveMatrixAuthContextMock,
|
||||
} = matrixClientResolverMocks;
|
||||
@@ -25,7 +25,7 @@ vi.mock("../client.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("../client/shared.js", () => ({
|
||||
stopSharedClientForAccount: (...args: unknown[]) => stopSharedClientForAccountMock(...args),
|
||||
stopSharedClientInstance: (...args: unknown[]) => stopSharedClientInstanceMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../../runtime.js", () => ({
|
||||
@@ -63,9 +63,7 @@ describe("withResolvedMatrixClient", () => {
|
||||
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
|
||||
expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1);
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(stopSharedClientForAccountMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ userId: "@bot:example.org" }),
|
||||
);
|
||||
expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
expect(result).toBe("ok");
|
||||
});
|
||||
|
||||
@@ -134,8 +132,6 @@ describe("withResolvedMatrixClient", () => {
|
||||
).rejects.toThrow("boom");
|
||||
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(stopSharedClientForAccountMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ userId: "@bot:example.org" }),
|
||||
);
|
||||
expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { inspectMatrixDirectRooms } from "../direct-management.js";
|
||||
import { isStrictDirectRoom } from "../direct-room.js";
|
||||
import type { MatrixClient } from "../sdk.js";
|
||||
import { isMatrixQualifiedUserId, normalizeMatrixResolvableTarget } from "../target-ids.js";
|
||||
@@ -92,36 +93,16 @@ async function resolveDirectRoomId(client: MatrixClient, userId: string): Promis
|
||||
directRoomCache.delete(trimmed);
|
||||
}
|
||||
|
||||
// 1) Fast path: use account data (m.direct) for *this* logged-in user (the bot).
|
||||
try {
|
||||
const directContent = (await client.getAccountData(EventType.Direct)) as Record<
|
||||
string,
|
||||
string[] | undefined
|
||||
>;
|
||||
const list = Array.isArray(directContent?.[trimmed]) ? directContent[trimmed] : [];
|
||||
for (const roomId of list) {
|
||||
if (await isStrictDirectRoom({ client, roomId, remoteUserId: trimmed, selfUserId })) {
|
||||
setDirectRoomCached(client, trimmed, roomId);
|
||||
return roomId;
|
||||
}
|
||||
const inspection = await inspectMatrixDirectRooms({
|
||||
client,
|
||||
remoteUserId: trimmed,
|
||||
});
|
||||
if (inspection.activeRoomId) {
|
||||
setDirectRoomCached(client, trimmed, inspection.activeRoomId);
|
||||
if (inspection.mappedRoomIds[0] !== inspection.activeRoomId) {
|
||||
await persistDirectRoom(client, trimmed, inspection.activeRoomId);
|
||||
}
|
||||
} catch {
|
||||
// Ignore and fall back.
|
||||
}
|
||||
|
||||
// 2) Fallback: look for an existing joined room that is actually a 1:1 with the user.
|
||||
// Many clients only maintain m.direct for *their own* account data, so relying on it is brittle.
|
||||
try {
|
||||
const rooms = await client.getJoinedRooms();
|
||||
for (const roomId of rooms) {
|
||||
if (await isStrictDirectRoom({ client, roomId, remoteUserId: trimmed, selfUserId })) {
|
||||
setDirectRoomCached(client, trimmed, roomId);
|
||||
await persistDirectRoom(client, trimmed, roomId);
|
||||
return roomId;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Ignore and fall back.
|
||||
return inspection.activeRoomId;
|
||||
}
|
||||
|
||||
throw new Error(`No direct room found for ${trimmed} (m.direct missing)`);
|
||||
|
||||
Reference in New Issue
Block a user