diff --git a/docs/channels/matrix.md b/docs/channels/matrix.md index cbd09faca48..4d9d0fa0e4f 100644 --- a/docs/channels/matrix.md +++ b/docs/channels/matrix.md @@ -403,6 +403,28 @@ Remove stale OpenClaw-managed devices with: openclaw matrix devices prune-stale ``` +### Direct Room Repair + +If direct-message state gets out of sync, OpenClaw can end up with stale `m.direct` mappings that point at old solo rooms instead of the live DM. Inspect the current mapping for a peer with: + +```bash +openclaw matrix direct inspect --user-id @alice:example.org +``` + +Repair it with: + +```bash +openclaw matrix direct repair --user-id @alice:example.org +``` + +Repair keeps the Matrix-specific logic inside the plugin: + +- it prefers a strict 1:1 DM that is already mapped in `m.direct` +- otherwise it falls back to any currently joined strict 1:1 DM with that user +- if no healthy DM exists, it creates a fresh direct room and rewrites `m.direct` to point at it + +The repair flow does not delete old rooms automatically. It only picks the healthy DM and updates the mapping so new Matrix sends, verification notices, and other direct-message flows target the right room again. + ## Threads Matrix supports native Matrix threads for both automatic replies and message-tool sends. diff --git a/extensions/matrix/src/cli.ts b/extensions/matrix/src/cli.ts index 101ae091ecf..ac81b4a5efd 100644 --- a/extensions/matrix/src/cli.ts +++ b/extensions/matrix/src/cli.ts @@ -6,6 +6,7 @@ import { } from "openclaw/plugin-sdk/matrix"; import { matrixPlugin } from "./channel.js"; import { resolveMatrixAccount, resolveMatrixAccountConfig } from "./matrix/accounts.js"; +import { withResolvedActionClient, withStartedActionClient } from "./matrix/actions/client.js"; import { listMatrixOwnDevices, pruneMatrixStaleGatewayDevices } from "./matrix/actions/devices.js"; import { updateMatrixOwnProfile } from "./matrix/actions/profile.js"; import { @@ -21,6 +22,11 @@ import { resolveMatrixAuthContext } from "./matrix/client.js"; import { setMatrixSdkConsoleLogging, setMatrixSdkLogMode } from "./matrix/client/logging.js"; import { resolveMatrixConfigPath, updateMatrixAccountConfig } from "./matrix/config-update.js"; import { isOpenClawManagedMatrixDevice } from "./matrix/device-health.js"; +import { + inspectMatrixDirectRooms, + repairMatrixDirectRooms, + type MatrixDirectRoomCandidate, +} from "./matrix/direct-management.js"; import { applyMatrixProfileUpdate, type MatrixProfileUpdateResult } from "./profile-update.js"; import { getMatrixRuntime } from "./runtime.js"; import type { CoreConfig } from "./types.js"; @@ -309,6 +315,87 @@ async function addMatrixAccount(params: { }; } +function printDirectRoomCandidate(room: MatrixCliDirectRoomCandidate): void { + const members = + room.joinedMembers === null ? "unavailable" : room.joinedMembers.join(", ") || "none"; + console.log( + `- ${room.roomId} [${room.source}] strict=${room.strict ? "yes" : "no"} joined=${members}`, + ); +} + +function printDirectRoomInspection(result: MatrixCliDirectRoomInspection): void { + printAccountLabel(result.accountId); + console.log(`Peer: ${result.remoteUserId}`); + console.log(`Self: ${result.selfUserId ?? "unknown"}`); + console.log(`Active direct room: ${result.activeRoomId ?? "none"}`); + console.log( + `Mapped rooms: ${result.mappedRoomIds.length ? result.mappedRoomIds.join(", ") : "none"}`, + ); + console.log( + `Discovered strict rooms: ${result.discoveredStrictRoomIds.length ? result.discoveredStrictRoomIds.join(", ") : "none"}`, + ); + if (result.mappedRooms.length > 0) { + console.log("Mapped room details:"); + for (const room of result.mappedRooms) { + printDirectRoomCandidate(room); + } + } +} + +async function inspectMatrixDirectRoom(params: { + accountId: string; + userId: string; +}): Promise { + return await withResolvedActionClient( + { accountId: params.accountId }, + async (client) => { + const inspection = await inspectMatrixDirectRooms({ + client, + remoteUserId: params.userId, + }); + return { + accountId: params.accountId, + remoteUserId: inspection.remoteUserId, + selfUserId: inspection.selfUserId, + mappedRoomIds: inspection.mappedRoomIds, + mappedRooms: inspection.mappedRooms.map(toCliDirectRoomCandidate), + discoveredStrictRoomIds: inspection.discoveredStrictRoomIds, + activeRoomId: inspection.activeRoomId, + }; + }, + "persist", + ); +} + +async function repairMatrixDirectRoom(params: { + accountId: string; + userId: string; +}): Promise { + const cfg = getMatrixRuntime().config.loadConfig() as CoreConfig; + const account = resolveMatrixAccount({ cfg, accountId: params.accountId }); + return await withStartedActionClient({ accountId: params.accountId }, async (client) => { + const repaired = await repairMatrixDirectRooms({ + client, + remoteUserId: params.userId, + encrypted: account.config.encryption === true, + }); + return { + accountId: params.accountId, + remoteUserId: repaired.remoteUserId, + selfUserId: repaired.selfUserId, + mappedRoomIds: repaired.mappedRoomIds, + mappedRooms: repaired.mappedRooms.map(toCliDirectRoomCandidate), + discoveredStrictRoomIds: repaired.discoveredStrictRoomIds, + activeRoomId: repaired.activeRoomId, + encrypted: account.config.encryption === true, + createdRoomId: repaired.createdRoomId, + changed: repaired.changed, + directContentBefore: repaired.directContentBefore, + directContentAfter: repaired.directContentAfter, + }; + }); +} + type MatrixCliProfileSetResult = MatrixProfileUpdateResult; async function setMatrixProfile(params: { @@ -386,6 +473,40 @@ type MatrixCliVerificationStatus = { pendingVerifications: number; }; +type MatrixCliDirectRoomCandidate = { + roomId: string; + source: "account-data" | "joined"; + strict: boolean; + joinedMembers: string[] | null; +}; + +type MatrixCliDirectRoomInspection = { + accountId: string; + remoteUserId: string; + selfUserId: string | null; + mappedRoomIds: string[]; + mappedRooms: MatrixCliDirectRoomCandidate[]; + discoveredStrictRoomIds: string[]; + activeRoomId: string | null; +}; + +type MatrixCliDirectRoomRepair = MatrixCliDirectRoomInspection & { + encrypted: boolean; + createdRoomId: string | null; + changed: boolean; + directContentBefore: Record; + directContentAfter: Record; +}; + +function toCliDirectRoomCandidate(room: MatrixDirectRoomCandidate): MatrixCliDirectRoomCandidate { + return { + roomId: room.roomId, + source: room.source, + strict: room.strict, + joinedMembers: room.joinedMembers, + }; +} + function resolveBackupStatus(status: { backupVersion: string | null; backup?: MatrixCliBackupStatus; @@ -706,6 +827,71 @@ export function registerMatrixCli(params: { program: Command }): void { }, ); + const direct = root.command("direct").description("Inspect and repair Matrix direct-room state"); + + direct + .command("inspect") + .description("Inspect direct-room mappings for a Matrix user") + .requiredOption("--user-id ", "Peer Matrix user ID") + .option("--account ", "Account ID (for multi-account setups)") + .option("--verbose", "Show detailed diagnostics") + .option("--json", "Output as JSON") + .action( + async (options: { userId: string; account?: string; verbose?: boolean; json?: boolean }) => { + const accountId = resolveMatrixCliAccountId(options.account); + await runMatrixCliCommand({ + verbose: options.verbose === true, + json: options.json === true, + run: async () => + await inspectMatrixDirectRoom({ + accountId, + userId: options.userId, + }), + onText: (result) => { + printDirectRoomInspection(result); + }, + errorPrefix: "Direct room inspection failed", + }); + }, + ); + + direct + .command("repair") + .description("Repair Matrix direct-room mappings for a Matrix user") + .requiredOption("--user-id ", "Peer Matrix user ID") + .option("--account ", "Account ID (for multi-account setups)") + .option("--verbose", "Show detailed diagnostics") + .option("--json", "Output as JSON") + .action( + async (options: { userId: string; account?: string; verbose?: boolean; json?: boolean }) => { + const accountId = resolveMatrixCliAccountId(options.account); + await runMatrixCliCommand({ + verbose: options.verbose === true, + json: options.json === true, + run: async () => + await repairMatrixDirectRoom({ + accountId, + userId: options.userId, + }), + onText: (result, verbose) => { + printDirectRoomInspection(result); + console.log(`Encrypted room creation: ${result.encrypted ? "enabled" : "disabled"}`); + console.log(`Created room: ${result.createdRoomId ?? "none"}`); + console.log(`m.direct updated: ${result.changed ? "yes" : "no"}`); + if (verbose) { + console.log( + `m.direct before: ${JSON.stringify(result.directContentBefore[result.remoteUserId] ?? [])}`, + ); + console.log( + `m.direct after: ${JSON.stringify(result.directContentAfter[result.remoteUserId] ?? [])}`, + ); + } + }, + errorPrefix: "Direct room repair failed", + }); + }, + ); + const verify = root.command("verify").description("Device verification for Matrix E2EE"); verify diff --git a/extensions/matrix/src/matrix/actions/client.test.ts b/extensions/matrix/src/matrix/actions/client.test.ts index 20d9d03e816..1f2d3070b7c 100644 --- a/extensions/matrix/src/matrix/actions/client.test.ts +++ b/extensions/matrix/src/matrix/actions/client.test.ts @@ -12,7 +12,7 @@ const { getMatrixRuntimeMock, getActiveMatrixClientMock, resolveSharedMatrixClientMock, - stopSharedClientForAccountMock, + stopSharedClientInstanceMock, isBunRuntimeMock, resolveMatrixAuthContextMock, } = matrixClientResolverMocks; @@ -32,7 +32,7 @@ vi.mock("../client.js", () => ({ })); vi.mock("../client/shared.js", () => ({ - stopSharedClientForAccount: (...args: unknown[]) => stopSharedClientForAccountMock(...args), + stopSharedClientInstance: (...args: unknown[]) => stopSharedClientInstanceMock(...args), })); vi.mock("../send.js", () => ({ @@ -74,9 +74,7 @@ describe("action client helpers", () => { const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value; expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1); expect(sharedClient.stop).toHaveBeenCalledTimes(1); - expect(stopSharedClientForAccountMock).toHaveBeenCalledWith( - expect.objectContaining({ userId: "@bot:example.org" }), - ); + expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); expect(result).toBe("ok"); }); @@ -97,9 +95,7 @@ describe("action client helpers", () => { expect(sharedClient.prepareForOneOff).not.toHaveBeenCalled(); expect(sharedClient.stop).not.toHaveBeenCalled(); expect(sharedClient.stopAndPersist).toHaveBeenCalledTimes(1); - expect(stopSharedClientForAccountMock).toHaveBeenCalledWith( - expect.objectContaining({ userId: "@bot:example.org" }), - ); + expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); }); it("reuses active monitor client when available", async () => { @@ -201,9 +197,7 @@ describe("action client helpers", () => { expect(result).toBe("ok"); expect(sharedClient.stop).toHaveBeenCalledTimes(1); expect(sharedClient.stopAndPersist).not.toHaveBeenCalled(); - expect(stopSharedClientForAccountMock).toHaveBeenCalledWith( - expect.objectContaining({ userId: "@bot:example.org" }), - ); + expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); }); it("stops shared action clients when the wrapped call throws", async () => { diff --git a/extensions/matrix/src/matrix/client-bootstrap.ts b/extensions/matrix/src/matrix/client-bootstrap.ts index 6a89239ea81..93c700abef6 100644 --- a/extensions/matrix/src/matrix/client-bootstrap.ts +++ b/extensions/matrix/src/matrix/client-bootstrap.ts @@ -2,7 +2,7 @@ import { getMatrixRuntime } from "../runtime.js"; import type { CoreConfig } from "../types.js"; import { getActiveMatrixClient } from "./active-client.js"; import { isBunRuntime, resolveMatrixAuthContext, resolveSharedMatrixClient } from "./client.js"; -import { stopSharedClientForAccount } from "./client/shared.js"; +import { stopSharedClientInstance } from "./client/shared.js"; import type { MatrixClient } from "./sdk.js"; type ResolvedRuntimeMatrixClient = { @@ -78,7 +78,7 @@ async function resolveRuntimeMatrixClient(opts: { } else { client.stop(); } - stopSharedClientForAccount(authContext.resolved); + stopSharedClientInstance(client); }, }; } diff --git a/extensions/matrix/src/matrix/client-resolver.test-helpers.ts b/extensions/matrix/src/matrix/client-resolver.test-helpers.ts index b4247cdfe99..a6ee7b86d9b 100644 --- a/extensions/matrix/src/matrix/client-resolver.test-helpers.ts +++ b/extensions/matrix/src/matrix/client-resolver.test-helpers.ts @@ -6,7 +6,7 @@ type MatrixClientResolverMocks = { getMatrixRuntimeMock: Mock<() => unknown>; getActiveMatrixClientMock: Mock<(...args: unknown[]) => MatrixClient | null>; resolveSharedMatrixClientMock: Mock<(...args: unknown[]) => Promise>; - stopSharedClientForAccountMock: Mock<(...args: unknown[]) => void>; + stopSharedClientInstanceMock: Mock<(...args: unknown[]) => void>; isBunRuntimeMock: Mock<() => boolean>; resolveMatrixAuthContextMock: Mock< (params: { cfg: unknown; accountId?: string | null }) => unknown @@ -18,7 +18,7 @@ export const matrixClientResolverMocks: MatrixClientResolverMocks = { getMatrixRuntimeMock: vi.fn(), getActiveMatrixClientMock: vi.fn(), resolveSharedMatrixClientMock: vi.fn(), - stopSharedClientForAccountMock: vi.fn(), + stopSharedClientInstanceMock: vi.fn(), isBunRuntimeMock: vi.fn(() => false), resolveMatrixAuthContextMock: vi.fn(), }; @@ -44,7 +44,7 @@ export function primeMatrixClientResolverMocks(params?: { getMatrixRuntimeMock, getActiveMatrixClientMock, resolveSharedMatrixClientMock, - stopSharedClientForAccountMock, + stopSharedClientInstanceMock, isBunRuntimeMock, resolveMatrixAuthContextMock, } = matrixClientResolverMocks; @@ -70,7 +70,7 @@ export function primeMatrixClientResolverMocks(params?: { }); getActiveMatrixClientMock.mockReturnValue(null); isBunRuntimeMock.mockReturnValue(false); - stopSharedClientForAccountMock.mockReset(); + stopSharedClientInstanceMock.mockReset(); resolveMatrixAuthContextMock.mockImplementation( ({ cfg: explicitCfg, diff --git a/extensions/matrix/src/matrix/client.ts b/extensions/matrix/src/matrix/client.ts index 23d4011f9f3..57395b22c24 100644 --- a/extensions/matrix/src/matrix/client.ts +++ b/extensions/matrix/src/matrix/client.ts @@ -10,4 +10,8 @@ export { validateMatrixHomeserverUrl, } from "./client/config.js"; export { createMatrixClient } from "./client/create-client.js"; -export { resolveSharedMatrixClient, stopSharedClientForAccount } from "./client/shared.js"; +export { + resolveSharedMatrixClient, + stopSharedClientForAccount, + stopSharedClientInstance, +} from "./client/shared.js"; diff --git a/extensions/matrix/src/matrix/client/shared.test.ts b/extensions/matrix/src/matrix/client/shared.test.ts index 43919cb712b..ab12a182961 100644 --- a/extensions/matrix/src/matrix/client/shared.test.ts +++ b/extensions/matrix/src/matrix/client/shared.test.ts @@ -18,6 +18,7 @@ import { resolveSharedMatrixClient, stopSharedClient, stopSharedClientForAccount, + stopSharedClientInstance, } from "./shared.js"; function authFor(accountId: string): MatrixAuth { @@ -122,6 +123,26 @@ describe("resolveSharedMatrixClient", () => { expect(poeClient.stop).toHaveBeenCalledTimes(1); }); + it("drops stopped shared clients by instance so the next resolve recreates them", async () => { + const mainAuth = authFor("main"); + const firstMainClient = createMockClient("main-first"); + const secondMainClient = createMockClient("main-second"); + + resolveMatrixAuthMock.mockResolvedValue(mainAuth); + createMatrixClientMock + .mockResolvedValueOnce(firstMainClient) + .mockResolvedValueOnce(secondMainClient); + + const first = await resolveSharedMatrixClient({ accountId: "main", startClient: false }); + stopSharedClientInstance(first as unknown as import("../sdk.js").MatrixClient); + const second = await resolveSharedMatrixClient({ accountId: "main", startClient: false }); + + expect(first).toBe(firstMainClient); + expect(second).toBe(secondMainClient); + expect(firstMainClient.stop).toHaveBeenCalledTimes(1); + expect(createMatrixClientMock).toHaveBeenCalledTimes(2); + }); + it("reuses the effective implicit account instead of keying it as default", async () => { const poeAuth = authFor("ops"); const poeClient = createMockClient("ops"); diff --git a/extensions/matrix/src/matrix/client/shared.ts b/extensions/matrix/src/matrix/client/shared.ts index c92a995fc37..dca912133eb 100644 --- a/extensions/matrix/src/matrix/client/shared.ts +++ b/extensions/matrix/src/matrix/client/shared.ts @@ -193,3 +193,15 @@ export function stopSharedClientForAccount(auth: MatrixAuth): void { sharedClientStates.delete(key); sharedClientPromises.delete(key); } + +export function stopSharedClientInstance(client: MatrixClient): void { + for (const [key, state] of sharedClientStates.entries()) { + if (state.client !== client) { + continue; + } + state.client.stop(); + sharedClientStates.delete(key); + sharedClientPromises.delete(key); + return; + } +} diff --git a/extensions/matrix/src/matrix/direct-management.test.ts b/extensions/matrix/src/matrix/direct-management.test.ts new file mode 100644 index 00000000000..34407fef864 --- /dev/null +++ b/extensions/matrix/src/matrix/direct-management.test.ts @@ -0,0 +1,139 @@ +import { describe, expect, it, vi } from "vitest"; +import { inspectMatrixDirectRooms, repairMatrixDirectRooms } from "./direct-management.js"; +import type { MatrixClient } from "./sdk.js"; +import { EventType } from "./send/types.js"; + +function createClient(overrides: Partial = {}): MatrixClient { + return { + getUserId: vi.fn(async () => "@bot:example.org"), + getAccountData: vi.fn(async () => undefined), + getJoinedRooms: vi.fn(async () => [] as string[]), + getJoinedRoomMembers: vi.fn(async () => [] as string[]), + setAccountData: vi.fn(async () => undefined), + createDirectRoom: vi.fn(async () => "!created:example.org"), + ...overrides, + } as unknown as MatrixClient; +} + +describe("inspectMatrixDirectRooms", () => { + it("prefers strict mapped rooms over discovered rooms", async () => { + const client = createClient({ + getAccountData: vi.fn(async () => ({ + "@alice:example.org": ["!dm:example.org", "!shared:example.org"], + })), + getJoinedRooms: vi.fn(async () => ["!dm:example.org", "!shared:example.org"]), + getJoinedRoomMembers: vi.fn(async (roomId: string) => + roomId === "!dm:example.org" + ? ["@bot:example.org", "@alice:example.org"] + : ["@bot:example.org", "@alice:example.org", "@mallory:example.org"], + ), + }); + + const result = await inspectMatrixDirectRooms({ + client, + remoteUserId: "@alice:example.org", + }); + + expect(result.activeRoomId).toBe("!dm:example.org"); + expect(result.mappedRooms).toEqual([ + expect.objectContaining({ roomId: "!dm:example.org", strict: true }), + expect.objectContaining({ roomId: "!shared:example.org", strict: false }), + ]); + }); + + it("falls back to discovered strict joined rooms when m.direct is stale", async () => { + const client = createClient({ + getAccountData: vi.fn(async () => ({ + "@alice:example.org": ["!stale:example.org"], + })), + getJoinedRooms: vi.fn(async () => ["!stale:example.org", "!fresh:example.org"]), + getJoinedRoomMembers: vi.fn(async (roomId: string) => + roomId === "!fresh:example.org" + ? ["@bot:example.org", "@alice:example.org"] + : ["@bot:example.org", "@alice:example.org", "@mallory:example.org"], + ), + }); + + const result = await inspectMatrixDirectRooms({ + client, + remoteUserId: "@alice:example.org", + }); + + expect(result.activeRoomId).toBe("!fresh:example.org"); + expect(result.discoveredStrictRoomIds).toEqual(["!fresh:example.org"]); + }); +}); + +describe("repairMatrixDirectRooms", () => { + it("repoints m.direct to an existing strict joined room", async () => { + const setAccountData = vi.fn(async () => undefined); + const client = createClient({ + getAccountData: vi.fn(async () => ({ + "@alice:example.org": ["!stale:example.org"], + })), + getJoinedRooms: vi.fn(async () => ["!stale:example.org", "!fresh:example.org"]), + getJoinedRoomMembers: vi.fn(async (roomId: string) => + roomId === "!fresh:example.org" + ? ["@bot:example.org", "@alice:example.org"] + : ["@bot:example.org", "@alice:example.org", "@mallory:example.org"], + ), + setAccountData, + }); + + const result = await repairMatrixDirectRooms({ + client, + remoteUserId: "@alice:example.org", + encrypted: true, + }); + + expect(result.activeRoomId).toBe("!fresh:example.org"); + expect(result.createdRoomId).toBeNull(); + expect(setAccountData).toHaveBeenCalledWith( + EventType.Direct, + expect.objectContaining({ + "@alice:example.org": ["!fresh:example.org", "!stale:example.org"], + }), + ); + }); + + it("creates a fresh direct room when no healthy DM exists", async () => { + const createDirectRoom = vi.fn(async () => "!created:example.org"); + const setAccountData = vi.fn(async () => undefined); + const client = createClient({ + getJoinedRooms: vi.fn(async () => ["!shared:example.org"]), + getJoinedRoomMembers: vi.fn(async () => [ + "@bot:example.org", + "@alice:example.org", + "@mallory:example.org", + ]), + createDirectRoom, + setAccountData, + }); + + const result = await repairMatrixDirectRooms({ + client, + remoteUserId: "@alice:example.org", + encrypted: true, + }); + + expect(createDirectRoom).toHaveBeenCalledWith("@alice:example.org", { encrypted: true }); + expect(result.createdRoomId).toBe("!created:example.org"); + expect(setAccountData).toHaveBeenCalledWith( + EventType.Direct, + expect.objectContaining({ + "@alice:example.org": ["!created:example.org"], + }), + ); + }); + + it("rejects unqualified Matrix user ids", async () => { + const client = createClient(); + + await expect( + repairMatrixDirectRooms({ + client, + remoteUserId: "alice", + }), + ).rejects.toThrow('Matrix user IDs must be fully qualified (got "alice")'); + }); +}); diff --git a/extensions/matrix/src/matrix/direct-management.ts b/extensions/matrix/src/matrix/direct-management.ts new file mode 100644 index 00000000000..3f1c4cb5009 --- /dev/null +++ b/extensions/matrix/src/matrix/direct-management.ts @@ -0,0 +1,211 @@ +import { + isStrictDirectMembership, + isStrictDirectRoom, + readJoinedMatrixMembers, +} from "./direct-room.js"; +import type { MatrixClient } from "./sdk.js"; +import { EventType, type MatrixDirectAccountData } from "./send/types.js"; +import { isMatrixQualifiedUserId } from "./target-ids.js"; + +export type MatrixDirectRoomCandidate = { + roomId: string; + joinedMembers: string[] | null; + strict: boolean; + source: "account-data" | "joined"; +}; + +export type MatrixDirectRoomInspection = { + selfUserId: string | null; + remoteUserId: string; + mappedRoomIds: string[]; + mappedRooms: MatrixDirectRoomCandidate[]; + discoveredStrictRoomIds: string[]; + activeRoomId: string | null; +}; + +export type MatrixDirectRoomRepairResult = MatrixDirectRoomInspection & { + createdRoomId: string | null; + changed: boolean; + directContentBefore: MatrixDirectAccountData; + directContentAfter: MatrixDirectAccountData; +}; + +async function readMatrixDirectAccountData(client: MatrixClient): Promise { + try { + const direct = (await client.getAccountData(EventType.Direct)) as MatrixDirectAccountData; + return direct && typeof direct === "object" && !Array.isArray(direct) ? direct : {}; + } catch { + return {}; + } +} + +function normalizeRemoteUserId(remoteUserId: string): string { + const normalized = remoteUserId.trim(); + if (!isMatrixQualifiedUserId(normalized)) { + throw new Error(`Matrix user IDs must be fully qualified (got "${remoteUserId}")`); + } + return normalized; +} + +function normalizeMappedRoomIds(direct: MatrixDirectAccountData, remoteUserId: string): string[] { + const current = direct[remoteUserId]; + if (!Array.isArray(current)) { + return []; + } + const seen = new Set(); + const normalized: string[] = []; + for (const value of current) { + const roomId = typeof value === "string" ? value.trim() : ""; + if (!roomId || seen.has(roomId)) { + continue; + } + seen.add(roomId); + normalized.push(roomId); + } + return normalized; +} + +function normalizeRoomIdList(values: readonly string[]): string[] { + const seen = new Set(); + const normalized: string[] = []; + for (const value of values) { + const roomId = value.trim(); + if (!roomId || seen.has(roomId)) { + continue; + } + seen.add(roomId); + normalized.push(roomId); + } + return normalized; +} + +async function classifyDirectRoomCandidate(params: { + client: MatrixClient; + roomId: string; + remoteUserId: string; + selfUserId: string | null; + source: "account-data" | "joined"; +}): Promise { + const joinedMembers = await readJoinedMatrixMembers(params.client, params.roomId); + return { + roomId: params.roomId, + joinedMembers, + strict: + joinedMembers !== null && + isStrictDirectMembership({ + selfUserId: params.selfUserId, + remoteUserId: params.remoteUserId, + joinedMembers, + }), + source: params.source, + }; +} + +function buildNextDirectContent(params: { + directContent: MatrixDirectAccountData; + remoteUserId: string; + roomId: string; +}): MatrixDirectAccountData { + const current = normalizeMappedRoomIds(params.directContent, params.remoteUserId); + const nextRooms = normalizeRoomIdList([params.roomId, ...current]); + return { + ...params.directContent, + [params.remoteUserId]: nextRooms, + }; +} + +export async function inspectMatrixDirectRooms(params: { + client: MatrixClient; + remoteUserId: string; +}): Promise { + const remoteUserId = normalizeRemoteUserId(params.remoteUserId); + const selfUserId = (await params.client.getUserId().catch(() => null))?.trim() || null; + const directContent = await readMatrixDirectAccountData(params.client); + const mappedRoomIds = normalizeMappedRoomIds(directContent, remoteUserId); + const mappedRooms = await Promise.all( + mappedRoomIds.map( + async (roomId) => + await classifyDirectRoomCandidate({ + client: params.client, + roomId, + remoteUserId, + selfUserId, + source: "account-data", + }), + ), + ); + const mappedStrict = mappedRooms.find((room) => room.strict); + + let joinedRooms: string[] = []; + if (!mappedStrict && typeof params.client.getJoinedRooms === "function") { + try { + const resolved = await params.client.getJoinedRooms(); + joinedRooms = Array.isArray(resolved) ? resolved : []; + } catch { + joinedRooms = []; + } + } + const discoveredStrictRoomIds: string[] = []; + for (const roomId of normalizeRoomIdList(joinedRooms)) { + if (mappedRoomIds.includes(roomId)) { + continue; + } + if ( + await isStrictDirectRoom({ + client: params.client, + roomId, + remoteUserId, + selfUserId, + }) + ) { + discoveredStrictRoomIds.push(roomId); + } + } + + return { + selfUserId, + remoteUserId, + mappedRoomIds, + mappedRooms, + discoveredStrictRoomIds, + activeRoomId: mappedStrict?.roomId ?? discoveredStrictRoomIds[0] ?? null, + }; +} + +export async function repairMatrixDirectRooms(params: { + client: MatrixClient; + remoteUserId: string; + encrypted?: boolean; +}): Promise { + const remoteUserId = normalizeRemoteUserId(params.remoteUserId); + const directContentBefore = await readMatrixDirectAccountData(params.client); + const inspected = await inspectMatrixDirectRooms({ + client: params.client, + remoteUserId, + }); + const activeRoomId = + inspected.activeRoomId ?? + (await params.client.createDirectRoom(remoteUserId, { + encrypted: params.encrypted === true, + })); + const createdRoomId = inspected.activeRoomId ? null : activeRoomId; + const directContentAfter = buildNextDirectContent({ + directContent: directContentBefore, + remoteUserId, + roomId: activeRoomId, + }); + const changed = + JSON.stringify(directContentAfter[remoteUserId] ?? []) !== + JSON.stringify(directContentBefore[remoteUserId] ?? []); + if (changed) { + await params.client.setAccountData(EventType.Direct, directContentAfter); + } + return { + ...inspected, + activeRoomId, + createdRoomId, + changed, + directContentBefore, + directContentAfter, + }; +} diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts index 4824c8ebd12..eba37159cc4 100644 --- a/extensions/matrix/src/matrix/monitor/index.test.ts +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -14,7 +14,7 @@ const hoisted = vi.hoisted(() => { debug: vi.fn(), }; const stopThreadBindingManager = vi.fn(); - const stopSharedClientForAccount = vi.fn(); + const stopSharedClientInstance = vi.fn(); const setActiveMatrixClient = vi.fn(); return { callOrder, @@ -23,7 +23,7 @@ const hoisted = vi.hoisted(() => { resolveTextChunkLimit, setActiveMatrixClient, startClientError, - stopSharedClientForAccount, + stopSharedClientInstance, stopThreadBindingManager, }; }); @@ -123,7 +123,7 @@ vi.mock("../client.js", () => ({ hoisted.callOrder.push("start-client"); return hoisted.client; }), - stopSharedClientForAccount: hoisted.stopSharedClientForAccount, + stopSharedClientInstance: hoisted.stopSharedClientInstance, })); vi.mock("../config-update.js", () => ({ @@ -203,7 +203,7 @@ describe("monitorMatrixProvider", () => { hoisted.startClientError = null; hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000); hoisted.setActiveMatrixClient.mockReset(); - hoisted.stopSharedClientForAccount.mockReset(); + hoisted.stopSharedClientInstance.mockReset(); hoisted.stopThreadBindingManager.mockReset(); Object.values(hoisted.logger).forEach((mock) => mock.mockReset()); }); @@ -245,7 +245,7 @@ describe("monitorMatrixProvider", () => { await expect(monitorMatrixProvider()).rejects.toThrow("start failed"); expect(hoisted.stopThreadBindingManager).toHaveBeenCalledTimes(1); - expect(hoisted.stopSharedClientForAccount).toHaveBeenCalledTimes(1); + expect(hoisted.stopSharedClientInstance).toHaveBeenCalledTimes(1); expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(1, hoisted.client, "default"); expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(2, null, "default"); }); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index c05270a7ed2..e87aef44702 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -17,7 +17,7 @@ import { resolveMatrixAuth, resolveMatrixAuthContext, resolveSharedMatrixClient, - stopSharedClientForAccount, + stopSharedClientInstance, } from "../client.js"; import { createMatrixThreadBindingManager } from "../thread-bindings.js"; import { registerMatrixAutoJoin } from "./auto-join.js"; @@ -139,7 +139,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi try { threadBindingManager?.stop(); } finally { - stopSharedClientForAccount(auth); + stopSharedClientInstance(client); setActiveMatrixClient(null, auth.accountId); } }; diff --git a/extensions/matrix/src/matrix/sdk.ts b/extensions/matrix/src/matrix/sdk.ts index b1fe8683b2b..1a8c220d6ec 100644 --- a/extensions/matrix/src/matrix/sdk.ts +++ b/extensions/matrix/src/matrix/sdk.ts @@ -501,6 +501,30 @@ export class MatrixClient { } } + async createDirectRoom( + remoteUserId: string, + opts: { encrypted?: boolean } = {}, + ): Promise { + const initialState = opts.encrypted + ? [ + { + type: "m.room.encryption", + state_key: "", + content: { + algorithm: "m.megolm.v1.aes-sha2", + }, + }, + ] + : undefined; + const result = await this.client.createRoom({ + invite: [remoteUserId], + is_direct: true, + preset: "trusted_private_chat", + initial_state: initialState, + }); + return result.room_id; + } + async sendMessage(roomId: string, content: MessageEventContent): Promise { const sent = await this.client.sendMessage(roomId, content as never); return sent.event_id; diff --git a/extensions/matrix/src/matrix/send/client.test.ts b/extensions/matrix/src/matrix/send/client.test.ts index 61bcd897618..5366aae5958 100644 --- a/extensions/matrix/src/matrix/send/client.test.ts +++ b/extensions/matrix/src/matrix/send/client.test.ts @@ -9,7 +9,7 @@ const { getMatrixRuntimeMock, getActiveMatrixClientMock, resolveSharedMatrixClientMock, - stopSharedClientForAccountMock, + stopSharedClientInstanceMock, isBunRuntimeMock, resolveMatrixAuthContextMock, } = matrixClientResolverMocks; @@ -25,7 +25,7 @@ vi.mock("../client.js", () => ({ })); vi.mock("../client/shared.js", () => ({ - stopSharedClientForAccount: (...args: unknown[]) => stopSharedClientForAccountMock(...args), + stopSharedClientInstance: (...args: unknown[]) => stopSharedClientInstanceMock(...args), })); vi.mock("../../runtime.js", () => ({ @@ -63,9 +63,7 @@ describe("withResolvedMatrixClient", () => { const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value; expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1); expect(sharedClient.stop).toHaveBeenCalledTimes(1); - expect(stopSharedClientForAccountMock).toHaveBeenCalledWith( - expect.objectContaining({ userId: "@bot:example.org" }), - ); + expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); expect(result).toBe("ok"); }); @@ -134,8 +132,6 @@ describe("withResolvedMatrixClient", () => { ).rejects.toThrow("boom"); expect(sharedClient.stop).toHaveBeenCalledTimes(1); - expect(stopSharedClientForAccountMock).toHaveBeenCalledWith( - expect.objectContaining({ userId: "@bot:example.org" }), - ); + expect(stopSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); }); }); diff --git a/extensions/matrix/src/matrix/send/targets.ts b/extensions/matrix/src/matrix/send/targets.ts index e71e6d958a4..e27ff99a59e 100644 --- a/extensions/matrix/src/matrix/send/targets.ts +++ b/extensions/matrix/src/matrix/send/targets.ts @@ -1,3 +1,4 @@ +import { inspectMatrixDirectRooms } from "../direct-management.js"; import { isStrictDirectRoom } from "../direct-room.js"; import type { MatrixClient } from "../sdk.js"; import { isMatrixQualifiedUserId, normalizeMatrixResolvableTarget } from "../target-ids.js"; @@ -92,36 +93,16 @@ async function resolveDirectRoomId(client: MatrixClient, userId: string): Promis directRoomCache.delete(trimmed); } - // 1) Fast path: use account data (m.direct) for *this* logged-in user (the bot). - try { - const directContent = (await client.getAccountData(EventType.Direct)) as Record< - string, - string[] | undefined - >; - const list = Array.isArray(directContent?.[trimmed]) ? directContent[trimmed] : []; - for (const roomId of list) { - if (await isStrictDirectRoom({ client, roomId, remoteUserId: trimmed, selfUserId })) { - setDirectRoomCached(client, trimmed, roomId); - return roomId; - } + const inspection = await inspectMatrixDirectRooms({ + client, + remoteUserId: trimmed, + }); + if (inspection.activeRoomId) { + setDirectRoomCached(client, trimmed, inspection.activeRoomId); + if (inspection.mappedRoomIds[0] !== inspection.activeRoomId) { + await persistDirectRoom(client, trimmed, inspection.activeRoomId); } - } catch { - // Ignore and fall back. - } - - // 2) Fallback: look for an existing joined room that is actually a 1:1 with the user. - // Many clients only maintain m.direct for *their own* account data, so relying on it is brittle. - try { - const rooms = await client.getJoinedRooms(); - for (const roomId of rooms) { - if (await isStrictDirectRoom({ client, roomId, remoteUserId: trimmed, selfUserId })) { - setDirectRoomCached(client, trimmed, roomId); - await persistDirectRoom(client, trimmed, roomId); - return roomId; - } - } - } catch { - // Ignore and fall back. + return inspection.activeRoomId; } throw new Error(`No direct room found for ${trimmed} (m.direct missing)`);