Matrix: harden config threading and binding cleanup

This commit is contained in:
Gustavo Madeira Santana
2026-03-12 00:27:52 +00:00
parent 228237e016
commit cd77210b2f
22 changed files with 456 additions and 81 deletions

View File

@@ -180,6 +180,31 @@ describe("resolveActionClient", () => {
);
});
it("uses explicit cfg instead of loading runtime config", async () => {
const explicitCfg = {
channels: {
matrix: {
defaultAccount: "ops",
},
},
};
await resolveActionClient({
cfg: explicitCfg,
accountId: "ops",
});
expect(getMatrixRuntimeMock).not.toHaveBeenCalled();
expect(resolveMatrixAuthContextMock).toHaveBeenCalledWith({
cfg: explicitCfg,
accountId: "ops",
});
expect(resolveMatrixAuthMock).toHaveBeenCalledWith({
cfg: explicitCfg,
accountId: "ops",
});
});
it("stops one-off action clients after wrapped calls succeed", async () => {
const oneOffClient = createMockMatrixClient();
createMatrixClientMock.mockResolvedValue(oneOffClient);

View File

@@ -21,6 +21,7 @@ export async function resolveActionClient(
): Promise<MatrixActionClient> {
return await resolveRuntimeMatrixClient({
client: opts.client,
cfg: opts.cfg,
timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
onResolved: async (client, context) => {

View File

@@ -24,6 +24,7 @@ export async function sendMatrixMessage(
} = {},
) {
return await sendMessageMatrix(to, content, {
cfg: opts.cfg,
mediaUrl: opts.mediaUrl,
replyToId: opts.replyToId,
threadId: opts.threadId,

View File

@@ -1,3 +1,4 @@
import type { CoreConfig } from "../../types.js";
import {
MATRIX_ANNOTATION_RELATION_TYPE,
MATRIX_REACTION_EVENT_TYPE,
@@ -46,6 +47,7 @@ export type RoomTopicEventContent = {
export type MatrixActionClientOpts = {
client?: MatrixClient;
cfg?: CoreConfig;
timeoutMs?: number;
accountId?: string | null;
readiness?: "none" | "prepared" | "started";

View File

@@ -75,4 +75,29 @@ describe("matrix verification actions", () => {
"Matrix encryption is not available (enable channels.matrix.accounts.ops.encryption=true)",
);
});
it("uses explicit cfg instead of runtime config when crypto is unavailable", async () => {
const explicitCfg = {
channels: {
matrix: {
accounts: {
ops: {
encryption: false,
},
},
},
},
};
loadConfigMock.mockImplementation(() => {
throw new Error("verification actions should not reload runtime config when cfg is provided");
});
withStartedActionClientMock.mockImplementation(async (_opts, run) => {
return await run({ crypto: null });
});
await expect(listMatrixVerifications({ cfg: explicitCfg, accountId: "ops" })).rejects.toThrow(
"Matrix encryption is not available (enable channels.matrix.accounts.ops.encryption=true)",
);
expect(loadConfigMock).not.toHaveBeenCalled();
});
});

View File

@@ -9,7 +9,7 @@ function requireCrypto(
opts: MatrixActionClientOpts,
): NonNullable<import("../sdk.js").MatrixClient["crypto"]> {
if (!client.crypto) {
const cfg = getMatrixRuntime().config.loadConfig() as CoreConfig;
const cfg = opts.cfg ?? (getMatrixRuntime().config.loadConfig() as CoreConfig);
throw new Error(formatMatrixEncryptionUnavailableError(cfg, opts.accountId));
}
return client.crypto;

View File

@@ -27,6 +27,7 @@ export function ensureMatrixNodeRuntime() {
export async function resolveRuntimeMatrixClient(opts: {
client?: MatrixClient;
cfg?: CoreConfig;
timeoutMs?: number;
accountId?: string | null;
onResolved?: MatrixResolvedClientHook;
@@ -37,7 +38,7 @@ export async function resolveRuntimeMatrixClient(opts: {
return { client: opts.client, stopOnDone: false };
}
const cfg = getMatrixRuntime().config.loadConfig() as CoreConfig;
const cfg = opts.cfg ?? (getMatrixRuntime().config.loadConfig() as CoreConfig);
const authContext = resolveMatrixAuthContext({
cfg,
accountId: opts.accountId,

View File

@@ -336,7 +336,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
created
? pairingReply
: `${pairingReply}\n\nPairing request is still pending approval. Reusing existing code.`,
{ client },
{
client,
cfg,
accountId,
},
);
} catch (err) {
logVerboseMessage(`matrix pairing reply failed for ${senderId}: ${String(err)}`);
@@ -692,6 +696,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload: ReplyPayload) => {
await deliverMatrixReplies({
cfg,
replies: [payload],
roomId,
client,

View File

@@ -13,10 +13,13 @@ import { setMatrixRuntime } from "../../runtime.js";
import { deliverMatrixReplies } from "./replies.js";
describe("deliverMatrixReplies", () => {
const cfg = { channels: { matrix: {} } };
const loadConfigMock = vi.fn(() => ({}));
const resolveMarkdownTableModeMock = vi.fn(() => "code");
const resolveMarkdownTableModeMock = vi.fn<(params: unknown) => string>(() => "code");
const convertMarkdownTablesMock = vi.fn((text: string) => text);
const resolveChunkModeMock = vi.fn(() => "length");
const resolveChunkModeMock = vi.fn<
(cfg: unknown, channel: unknown, accountId?: unknown) => string
>(() => "length");
const chunkMarkdownTextWithModeMock = vi.fn((text: string) => [text]);
const runtimeStub = {
@@ -25,9 +28,10 @@ describe("deliverMatrixReplies", () => {
},
channel: {
text: {
resolveMarkdownTableMode: () => resolveMarkdownTableModeMock(),
resolveMarkdownTableMode: (params: unknown) => resolveMarkdownTableModeMock(params),
convertMarkdownTables: (text: string) => convertMarkdownTablesMock(text),
resolveChunkMode: () => resolveChunkModeMock(),
resolveChunkMode: (cfg: unknown, channel: unknown, accountId?: unknown) =>
resolveChunkModeMock(cfg, channel, accountId),
chunkMarkdownTextWithMode: (text: string) => chunkMarkdownTextWithModeMock(text),
},
},
@@ -51,6 +55,7 @@ describe("deliverMatrixReplies", () => {
chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("|"));
await deliverMatrixReplies({
cfg,
replies: [
{ text: "first-a|first-b", replyToId: "reply-1" },
{ text: "second", replyToId: "reply-2" },
@@ -76,6 +81,7 @@ describe("deliverMatrixReplies", () => {
it("keeps replyToId on every reply when replyToMode=all", async () => {
await deliverMatrixReplies({
cfg,
replies: [
{
text: "caption",
@@ -112,6 +118,7 @@ describe("deliverMatrixReplies", () => {
chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("|"));
await deliverMatrixReplies({
cfg,
replies: [{ text: "hello|thread", replyToId: "reply-thread" }],
roomId: "room:3",
client: {} as MatrixClient,
@@ -129,4 +136,44 @@ describe("deliverMatrixReplies", () => {
expect.objectContaining({ replyToId: undefined, threadId: "thread-77" }),
);
});
it("uses supplied cfg for chunking and send delivery without reloading runtime config", async () => {
const explicitCfg = {
channels: {
matrix: {
accounts: {
ops: {
chunkMode: "newline",
},
},
},
},
};
loadConfigMock.mockImplementation(() => {
throw new Error("deliverMatrixReplies should not reload runtime config when cfg is provided");
});
await deliverMatrixReplies({
cfg: explicitCfg,
replies: [{ text: "hello", replyToId: "reply-1" }],
roomId: "room:4",
client: {} as MatrixClient,
runtime: runtimeEnv,
textLimit: 4000,
replyToMode: "all",
accountId: "ops",
});
expect(loadConfigMock).not.toHaveBeenCalled();
expect(resolveChunkModeMock).toHaveBeenCalledWith(explicitCfg, "matrix", "ops");
expect(sendMessageMatrixMock).toHaveBeenCalledWith(
"room:4",
"hello",
expect.objectContaining({
cfg: explicitCfg,
accountId: "ops",
replyToId: "reply-1",
}),
);
});
});

View File

@@ -1,9 +1,15 @@
import type { MarkdownTableMode, ReplyPayload, RuntimeEnv } from "openclaw/plugin-sdk/matrix";
import type {
MarkdownTableMode,
OpenClawConfig,
ReplyPayload,
RuntimeEnv,
} from "openclaw/plugin-sdk/matrix";
import { getMatrixRuntime } from "../../runtime.js";
import type { MatrixClient } from "../sdk.js";
import { sendMessageMatrix } from "../send.js";
export async function deliverMatrixReplies(params: {
cfg: OpenClawConfig;
replies: ReplyPayload[];
roomId: string;
client: MatrixClient;
@@ -15,11 +21,10 @@ export async function deliverMatrixReplies(params: {
tableMode?: MarkdownTableMode;
}): Promise<void> {
const core = getMatrixRuntime();
const cfg = core.config.loadConfig();
const tableMode =
params.tableMode ??
core.channel.text.resolveMarkdownTableMode({
cfg,
cfg: params.cfg,
channel: "matrix",
accountId: params.accountId,
});
@@ -29,7 +34,7 @@ export async function deliverMatrixReplies(params: {
}
};
const chunkLimit = Math.min(params.textLimit, 4000);
const chunkMode = core.channel.text.resolveChunkMode(cfg, "matrix", params.accountId);
const chunkMode = core.channel.text.resolveChunkMode(params.cfg, "matrix", params.accountId);
let hasReplied = false;
for (const reply of params.replies) {
const hasMedia = Boolean(reply?.mediaUrl) || (reply?.mediaUrls?.length ?? 0) > 0;
@@ -68,6 +73,7 @@ export async function deliverMatrixReplies(params: {
}
await sendMessageMatrix(params.roomId, trimmed, {
client: params.client,
cfg: params.cfg,
replyToId: replyToIdForReply,
threadId: params.threadId,
accountId: params.accountId,
@@ -85,6 +91,7 @@ export async function deliverMatrixReplies(params: {
const caption = first ? text : "";
await sendMessageMatrix(params.roomId, caption, {
client: params.client,
cfg: params.cfg,
mediaUrl,
replyToId: replyToIdForReply,
threadId: params.threadId,

View File

@@ -8,6 +8,7 @@ const loadWebMediaMock = vi.fn().mockResolvedValue({
contentType: "image/png",
kind: "image",
});
const loadConfigMock = vi.fn(() => ({}));
const getImageMetadataMock = vi.fn().mockResolvedValue(null);
const resizeToJpegMock = vi.fn();
const resolveTextChunkLimitMock = vi.fn<
@@ -16,7 +17,7 @@ const resolveTextChunkLimitMock = vi.fn<
const runtimeStub = {
config: {
loadConfig: () => ({}),
loadConfig: () => loadConfigMock(),
},
media: {
loadWebMedia: (...args: unknown[]) => loadWebMediaMock(...args),
@@ -72,6 +73,7 @@ describe("sendMessageMatrix media", () => {
contentType: "image/png",
kind: "image",
});
loadConfigMock.mockReset().mockReturnValue({});
getImageMetadataMock.mockReset().mockResolvedValue(null);
resizeToJpegMock.mockReset();
resolveTextChunkLimitMock.mockReset().mockReturnValue(4000);
@@ -203,6 +205,36 @@ describe("sendMessageMatrix media", () => {
size: Buffer.from("thumb").byteLength,
});
});
it("uses explicit cfg for media sends instead of runtime loadConfig fallbacks", async () => {
const { client } = makeClient();
const explicitCfg = {
channels: {
matrix: {
accounts: {
ops: {
mediaMaxMb: 1,
},
},
},
},
};
loadConfigMock.mockImplementation(() => {
throw new Error("sendMessageMatrix should not reload runtime config when cfg is provided");
});
await sendMessageMatrix("room:!room:example", "caption", {
client,
cfg: explicitCfg,
accountId: "ops",
mediaUrl: "file:///tmp/photo.png",
});
expect(loadConfigMock).not.toHaveBeenCalled();
expect(loadWebMediaMock).toHaveBeenCalledWith("file:///tmp/photo.png", 1024 * 1024);
expect(resolveTextChunkLimitMock).toHaveBeenCalledWith(explicitCfg, "matrix", "ops");
});
});
describe("sendMessageMatrix threads", () => {
@@ -215,6 +247,7 @@ describe("sendMessageMatrix threads", () => {
beforeEach(() => {
vi.clearAllMocks();
loadConfigMock.mockReset().mockReturnValue({});
setMatrixRuntime(runtimeStub);
});
@@ -261,6 +294,7 @@ describe("voteMatrixPoll", () => {
beforeEach(() => {
vi.clearAllMocks();
loadConfigMock.mockReset().mockReturnValue({});
setMatrixRuntime(runtimeStub);
});
@@ -404,6 +438,7 @@ describe("sendTypingMatrix", () => {
beforeEach(() => {
vi.clearAllMocks();
loadConfigMock.mockReset().mockReturnValue({});
setMatrixRuntime(runtimeStub);
});

View File

@@ -1,5 +1,6 @@
import type { PollInput } from "openclaw/plugin-sdk/matrix";
import { getMatrixRuntime } from "../runtime.js";
import type { CoreConfig } from "../types.js";
import { buildPollStartContent, M_POLL_START } from "./poll-types.js";
import { buildMatrixReactionContent } from "./reaction-common.js";
import type { MatrixClient } from "./sdk.js";
@@ -34,6 +35,7 @@ export { resolveMatrixRoomId } from "./send/targets.js";
type MatrixClientResolveOpts = {
client?: MatrixClient;
cfg?: CoreConfig;
timeoutMs?: number;
accountId?: string | null;
};
@@ -51,7 +53,12 @@ function normalizeMatrixClientResolveOpts(
if (isMatrixClient(opts)) {
return { client: opts };
}
return { client: opts.client, timeoutMs: opts.timeoutMs, accountId: opts.accountId };
return {
client: opts.client,
cfg: opts.cfg,
timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
};
}
export async function sendMessageMatrix(
@@ -66,12 +73,13 @@ export async function sendMessageMatrix(
return await withResolvedMatrixClient(
{
client: opts.client,
cfg: opts.cfg,
timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
},
async (client) => {
const roomId = await resolveMatrixRoomId(client, to);
const cfg = getCore().config.loadConfig();
const cfg = opts.cfg ?? getCore().config.loadConfig();
const tableMode = getCore().channel.text.resolveMarkdownTableMode({
cfg,
channel: "matrix",
@@ -100,7 +108,7 @@ export async function sendMessageMatrix(
let lastMessageId = "";
if (opts.mediaUrl) {
const maxBytes = resolveMediaMaxBytes(opts.accountId);
const maxBytes = resolveMediaMaxBytes(opts.accountId, cfg);
const media = await getCore().media.loadWebMedia(opts.mediaUrl, maxBytes);
const uploaded = await uploadMediaMaybeEncrypted(client, roomId, media.buffer, {
contentType: media.contentType,
@@ -189,6 +197,7 @@ export async function sendPollMatrix(
return await withResolvedMatrixClient(
{
client: opts.client,
cfg: opts.cfg,
timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
},
@@ -252,6 +261,7 @@ export async function reactMatrixMessage(
await withResolvedMatrixClient(
{
client: clientOpts.client,
cfg: clientOpts.cfg,
timeoutMs: clientOpts.timeoutMs,
accountId: clientOpts.accountId ?? undefined,
},

View File

@@ -106,6 +106,31 @@ describe("resolveMatrixClient", () => {
);
});
it("uses explicit cfg instead of loading runtime config", async () => {
const explicitCfg = {
channels: {
matrix: {
defaultAccount: "ops",
},
},
};
await resolveMatrixClient({
cfg: explicitCfg,
accountId: "ops",
});
expect(getMatrixRuntimeMock).not.toHaveBeenCalled();
expect(resolveMatrixAuthContextMock).toHaveBeenCalledWith({
cfg: explicitCfg,
accountId: "ops",
});
expect(resolveMatrixAuthMock).toHaveBeenCalledWith({
cfg: explicitCfg,
accountId: "ops",
});
});
it("stops one-off matrix clients after wrapped sends succeed", async () => {
const oneOffClient = createMockMatrixClient();
createMatrixClientMock.mockResolvedValue(oneOffClient);

View File

@@ -9,9 +9,12 @@ import type { MatrixClient } from "../sdk.js";
const getCore = () => getMatrixRuntime();
export function resolveMediaMaxBytes(accountId?: string | null): number | undefined {
const cfg = getCore().config.loadConfig() as CoreConfig;
const matrixCfg = resolveMatrixAccountConfig({ cfg, accountId });
export function resolveMediaMaxBytes(
accountId?: string | null,
cfg?: CoreConfig,
): number | undefined {
const resolvedCfg = cfg ?? (getCore().config.loadConfig() as CoreConfig);
const matrixCfg = resolveMatrixAccountConfig({ cfg: resolvedCfg, accountId });
const mediaMaxMb = typeof matrixCfg.mediaMaxMb === "number" ? matrixCfg.mediaMaxMb : undefined;
if (typeof mediaMaxMb === "number") {
return mediaMaxMb * 1024 * 1024;
@@ -21,11 +24,13 @@ export function resolveMediaMaxBytes(accountId?: string | null): number | undefi
export async function resolveMatrixClient(opts: {
client?: MatrixClient;
cfg?: CoreConfig;
timeoutMs?: number;
accountId?: string | null;
}): Promise<{ client: MatrixClient; stopOnDone: boolean }> {
return await resolveRuntimeMatrixClient({
client: opts.client,
cfg: opts.cfg,
timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
onResolved: async (client, context) => {
@@ -45,6 +50,7 @@ function stopResolvedMatrixClient(resolved: ResolvedRuntimeMatrixClient): void {
export async function withResolvedMatrixClient<T>(
opts: {
client?: MatrixClient;
cfg?: CoreConfig;
timeoutMs?: number;
accountId?: string | null;
},

View File

@@ -1,3 +1,4 @@
import type { CoreConfig } from "../../types.js";
import {
MATRIX_ANNOTATION_RELATION_TYPE,
MATRIX_REACTION_EVENT_TYPE,
@@ -85,6 +86,7 @@ export type MatrixSendResult = {
export type MatrixSendOpts = {
client?: import("../sdk.js").MatrixClient;
cfg?: CoreConfig;
mediaUrl?: string;
accountId?: string;
replyToId?: string;

View File

@@ -16,12 +16,31 @@ import {
setMatrixThreadBindingMaxAgeBySessionKey,
} from "./thread-bindings.js";
const pluginSdkActual = vi.hoisted(() => ({
writeJsonFileAtomically: null as null | ((filePath: string, value: unknown) => Promise<void>),
}));
const sendMessageMatrixMock = vi.hoisted(() =>
vi.fn(async (_to: string, _message: string, opts?: { threadId?: string }) => ({
messageId: opts?.threadId ? "$reply" : "$root",
roomId: "!room:example",
})),
);
const writeJsonFileAtomicallyMock = vi.hoisted(() =>
vi.fn<(filePath: string, value: unknown) => Promise<void>>(),
);
vi.mock("openclaw/plugin-sdk/matrix", async () => {
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/matrix")>(
"openclaw/plugin-sdk/matrix",
);
pluginSdkActual.writeJsonFileAtomically = actual.writeJsonFileAtomically;
return {
...actual,
writeJsonFileAtomically: (filePath: string, value: unknown) =>
writeJsonFileAtomicallyMock(filePath, value),
};
});
vi.mock("./send.js", async () => {
const actual = await vi.importActual<typeof import("./send.js")>("./send.js");
@@ -63,6 +82,10 @@ describe("matrix thread bindings", () => {
__testing.resetSessionBindingAdaptersForTests();
resetMatrixThreadBindingsForTests();
sendMessageMatrixMock.mockClear();
writeJsonFileAtomicallyMock.mockReset();
writeJsonFileAtomicallyMock.mockImplementation(async (filePath: string, value: unknown) => {
await pluginSdkActual.writeJsonFileAtomically?.(filePath, value);
});
setMatrixRuntime({
state: {
resolveStateDir: () => stateDir,
@@ -193,6 +216,109 @@ describe("matrix thread bindings", () => {
}
});
it("persists a batch of expired bindings once per sweep", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-08T12:00:00.000Z"));
try {
await createMatrixThreadBindingManager({
accountId: "ops",
auth,
client: {} as never,
idleTimeoutMs: 1_000,
maxAgeMs: 0,
});
await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:first",
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$thread-1",
parentConversationId: "!room:example",
},
placement: "current",
});
await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:second",
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$thread-2",
parentConversationId: "!room:example",
},
placement: "current",
});
writeJsonFileAtomicallyMock.mockClear();
await vi.advanceTimersByTimeAsync(61_000);
await vi.waitFor(() => {
expect(writeJsonFileAtomicallyMock).toHaveBeenCalledTimes(1);
});
await vi.waitFor(async () => {
const persistedRaw = await fs.readFile(resolveBindingsFilePath(), "utf-8");
expect(JSON.parse(persistedRaw)).toMatchObject({
version: 1,
bindings: [],
});
});
} finally {
vi.useRealTimers();
}
});
it("logs and survives sweeper persistence failures", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-08T12:00:00.000Z"));
const logVerboseMessage = vi.fn();
try {
await createMatrixThreadBindingManager({
accountId: "ops",
auth,
client: {} as never,
idleTimeoutMs: 1_000,
maxAgeMs: 0,
logVerboseMessage,
});
await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
},
placement: "current",
});
writeJsonFileAtomicallyMock.mockClear();
writeJsonFileAtomicallyMock.mockRejectedValueOnce(new Error("disk full"));
await vi.advanceTimersByTimeAsync(61_000);
await vi.waitFor(() => {
expect(logVerboseMessage).toHaveBeenCalledWith(
expect.stringContaining("failed auto-unbinding expired bindings"),
);
});
expect(
getSessionBindingService().resolveByConversation({
channel: "matrix",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
}),
).toBeNull();
} finally {
vi.useRealTimers();
}
});
it("sends threaded farewell messages when bindings are unbound", async () => {
await createMatrixThreadBindingManager({
accountId: "ops",

View File

@@ -248,13 +248,6 @@ async function persistBindingsSnapshot(
await writeJsonFileAtomically(filePath, toStoredBindingsState(bindings));
}
async function persistBindings(filePath: string, accountId: string): Promise<void> {
await persistBindingsSnapshot(
filePath,
[...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter((entry) => entry.accountId === accountId),
);
}
function setBindingRecord(record: MatrixThreadBindingRecord): void {
BINDINGS_BY_ACCOUNT_CONVERSATION.set(resolveBindingKey(record), record);
}
@@ -372,12 +365,20 @@ export async function createMatrixThreadBindingManager(params: {
setBindingRecord(record);
}
const persist = async () => await persistBindings(filePath, params.accountId);
let persistQueue: Promise<void> = Promise.resolve();
const enqueuePersist = (bindings?: MatrixThreadBindingRecord[]) => {
const snapshot = bindings ?? listBindingsForAccount(params.accountId);
const next = persistQueue
.catch(() => {})
.then(async () => {
await persistBindingsSnapshot(filePath, snapshot);
});
persistQueue = next;
return next;
};
const persist = async () => await enqueuePersist();
const persistSafely = (reason: string, bindings?: MatrixThreadBindingRecord[]) => {
void persistBindingsSnapshot(
filePath,
bindings ?? listBindingsForAccount(params.accountId),
).catch((err) => {
void enqueuePersist(bindings).catch((err) => {
params.logVerboseMessage?.(
`matrix: failed persisting thread bindings account=${params.accountId} action=${reason}: ${String(err)}`,
);
@@ -503,17 +504,18 @@ export async function createMatrixThreadBindingManager(params: {
};
let sweepTimer: NodeJS.Timeout | null = null;
const unbindRecords = async (records: MatrixThreadBindingRecord[], reason: string) => {
const removeRecords = (records: MatrixThreadBindingRecord[]) => {
if (records.length === 0) {
return [];
}
const removed = records
return records
.map((record) => removeBindingRecord(record))
.filter((record): record is MatrixThreadBindingRecord => Boolean(record));
if (removed.length === 0) {
return [];
}
await persist();
};
const sendFarewellMessages = async (
removed: MatrixThreadBindingRecord[],
reason: string | ((record: MatrixThreadBindingRecord) => string | undefined),
) => {
await Promise.all(
removed.map(async (record) => {
await sendFarewellMessage({
@@ -522,10 +524,18 @@ export async function createMatrixThreadBindingManager(params: {
record,
defaultIdleTimeoutMs: defaults.idleTimeoutMs,
defaultMaxAgeMs: defaults.maxAgeMs,
reason,
reason: typeof reason === "function" ? reason(record) : reason,
});
}),
);
};
const unbindRecords = async (records: MatrixThreadBindingRecord[], reason: string) => {
const removed = removeRecords(records);
if (removed.length === 0) {
return [];
}
await persist();
await sendFarewellMessages(removed, reason);
return removed.map((record) => toSessionBindingRecord(record, defaults));
};
@@ -664,14 +674,29 @@ export async function createMatrixThreadBindingManager(params: {
if (expired.length === 0) {
return;
}
void Promise.all(
expired.map(async ({ record, lifecycle }) => {
params.logVerboseMessage?.(
`matrix: auto-unbinding ${record.conversationId} due to ${lifecycle.reason}`,
);
await unbindRecords([record], lifecycle.reason);
}),
const reasonByBindingKey = new Map(
expired.map(({ record, lifecycle }) => [resolveBindingKey(record), lifecycle.reason]),
);
void (async () => {
const removed = removeRecords(expired.map(({ record }) => record));
if (removed.length === 0) {
return;
}
for (const record of removed) {
const reason = reasonByBindingKey.get(resolveBindingKey(record));
params.logVerboseMessage?.(
`matrix: auto-unbinding ${record.conversationId} due to ${reason}`,
);
}
await persist();
await sendFarewellMessages(removed, (record) =>
reasonByBindingKey.get(resolveBindingKey(record)),
);
})().catch((err) => {
params.logVerboseMessage?.(
`matrix: failed auto-unbinding expired bindings account=${params.accountId}: ${String(err)}`,
);
});
}, THREAD_BINDINGS_SWEEP_INTERVAL_MS);
sweepTimer.unref?.();
}

View File

@@ -7,11 +7,12 @@ export const matrixOutbound: ChannelOutboundAdapter = {
chunker: (text, limit) => getMatrixRuntime().channel.text.chunkMarkdownText(text, limit),
chunkerMode: "markdown",
textChunkLimit: 4000,
sendText: async ({ to, text, deps, replyToId, threadId, accountId }) => {
sendText: async ({ cfg, to, text, deps, replyToId, threadId, accountId }) => {
const send = deps?.sendMatrix ?? sendMessageMatrix;
const resolvedThreadId =
threadId !== undefined && threadId !== null ? String(threadId) : undefined;
const result = await send(to, text, {
cfg,
replyToId: replyToId ?? undefined,
threadId: resolvedThreadId,
accountId: accountId ?? undefined,
@@ -22,11 +23,12 @@ export const matrixOutbound: ChannelOutboundAdapter = {
roomId: result.roomId,
};
},
sendMedia: async ({ to, text, mediaUrl, deps, replyToId, threadId, accountId }) => {
sendMedia: async ({ cfg, to, text, mediaUrl, deps, replyToId, threadId, accountId }) => {
const send = deps?.sendMatrix ?? sendMessageMatrix;
const resolvedThreadId =
threadId !== undefined && threadId !== null ? String(threadId) : undefined;
const result = await send(to, text, {
cfg,
mediaUrl,
replyToId: replyToId ?? undefined,
threadId: resolvedThreadId,
@@ -38,10 +40,11 @@ export const matrixOutbound: ChannelOutboundAdapter = {
roomId: result.roomId,
};
},
sendPoll: async ({ to, poll, threadId, accountId }) => {
sendPoll: async ({ cfg, to, poll, threadId, accountId }) => {
const resolvedThreadId =
threadId !== undefined && threadId !== null ? String(threadId) : undefined;
const result = await sendPollMatrix(to, poll, {
cfg,
threadId: resolvedThreadId,
accountId: accountId ?? undefined,
});

View File

@@ -19,13 +19,14 @@ export type MatrixProfileUpdateResult = {
};
export async function applyMatrixProfileUpdate(params: {
cfg?: CoreConfig;
account?: string;
displayName?: string;
avatarUrl?: string;
avatarPath?: string;
}): Promise<MatrixProfileUpdateResult> {
const runtime = getMatrixRuntime();
const cfg = runtime.config.loadConfig() as CoreConfig;
const persistedCfg = runtime.config.loadConfig() as CoreConfig;
const accountId = normalizeAccountId(params.account);
const displayName = params.displayName?.trim() || null;
const avatarUrl = params.avatarUrl?.trim() || null;
@@ -35,6 +36,7 @@ export async function applyMatrixProfileUpdate(params: {
}
const synced = await updateMatrixOwnProfile({
cfg: params.cfg,
accountId,
displayName: displayName ?? undefined,
avatarUrl: avatarUrl ?? undefined,
@@ -42,7 +44,7 @@ export async function applyMatrixProfileUpdate(params: {
});
const persistedAvatarUrl =
synced.uploadedAvatarSource && synced.resolvedAvatarUrl ? synced.resolvedAvatarUrl : avatarUrl;
const updated = updateMatrixAccountConfig(cfg, accountId, {
const updated = updateMatrixAccountConfig(persistedCfg, accountId, {
name: displayName ?? undefined,
avatarUrl: persistedAvatarUrl ?? undefined,
});

View File

@@ -76,6 +76,7 @@ describe("handleMatrixAction pollVote", () => {
});
it("parses snake_case vote params and forwards normalized selectors", async () => {
const cfg = {} as CoreConfig;
const result = await handleMatrixAction(
{
action: "pollVote",
@@ -87,10 +88,11 @@ describe("handleMatrixAction pollVote", () => {
poll_option_index: "2",
poll_option_indexes: ["1", "bogus"],
},
{} as CoreConfig,
cfg,
);
expect(mocks.voteMatrixPoll).toHaveBeenCalledWith("!room:example", "$poll", {
cfg,
accountId: "main",
optionIds: ["a2", "a1"],
optionIndexes: [1, 2],
@@ -118,6 +120,7 @@ describe("handleMatrixAction pollVote", () => {
});
it("passes account-scoped opts to add reactions", async () => {
const cfg = { channels: { matrix: { actions: { reactions: true } } } } as CoreConfig;
await handleMatrixAction(
{
action: "react",
@@ -126,15 +129,17 @@ describe("handleMatrixAction pollVote", () => {
messageId: "$msg",
emoji: "👍",
},
{ channels: { matrix: { actions: { reactions: true } } } } as CoreConfig,
cfg,
);
expect(mocks.reactMatrixMessage).toHaveBeenCalledWith("!room:example", "$msg", "👍", {
cfg,
accountId: "ops",
});
});
it("passes account-scoped opts to remove reactions", async () => {
const cfg = { channels: { matrix: { actions: { reactions: true } } } } as CoreConfig;
await handleMatrixAction(
{
action: "react",
@@ -144,16 +149,18 @@ describe("handleMatrixAction pollVote", () => {
emoji: "👍",
remove: true,
},
{ channels: { matrix: { actions: { reactions: true } } } } as CoreConfig,
cfg,
);
expect(mocks.removeMatrixReactions).toHaveBeenCalledWith("!room:example", "$msg", {
cfg,
accountId: "ops",
emoji: "👍",
});
});
it("passes account-scoped opts and limit to reaction listing", async () => {
const cfg = { channels: { matrix: { actions: { reactions: true } } } } as CoreConfig;
const result = await handleMatrixAction(
{
action: "reactions",
@@ -162,10 +169,11 @@ describe("handleMatrixAction pollVote", () => {
message_id: "$msg",
limit: "5",
},
{ channels: { matrix: { actions: { reactions: true } } } } as CoreConfig,
cfg,
);
expect(mocks.listMatrixReactions).toHaveBeenCalledWith("!room:example", "$msg", {
cfg,
accountId: "ops",
limit: 5,
});
@@ -176,6 +184,7 @@ describe("handleMatrixAction pollVote", () => {
});
it("passes account-scoped opts to message sends", async () => {
const cfg = { channels: { matrix: { actions: { messages: true } } } } as CoreConfig;
await handleMatrixAction(
{
action: "sendMessage",
@@ -184,10 +193,11 @@ describe("handleMatrixAction pollVote", () => {
content: "hello",
threadId: "$thread",
},
{ channels: { matrix: { actions: { messages: true } } } } as CoreConfig,
cfg,
);
expect(mocks.sendMatrixMessage).toHaveBeenCalledWith("room:!room:example", "hello", {
cfg,
accountId: "ops",
mediaUrl: undefined,
replyToId: undefined,
@@ -196,21 +206,26 @@ describe("handleMatrixAction pollVote", () => {
});
it("passes account-scoped opts to pin listing", async () => {
const cfg = { channels: { matrix: { actions: { pins: true } } } } as CoreConfig;
await handleMatrixAction(
{
action: "listPins",
accountId: "ops",
roomId: "!room:example",
},
{ channels: { matrix: { actions: { pins: true } } } } as CoreConfig,
cfg,
);
expect(mocks.listMatrixPins).toHaveBeenCalledWith("!room:example", {
cfg,
accountId: "ops",
});
});
it("passes account-scoped opts to member and room info actions", async () => {
const memberCfg = {
channels: { matrix: { actions: { memberInfo: true } } },
} as CoreConfig;
await handleMatrixAction(
{
action: "memberInfo",
@@ -218,27 +233,31 @@ describe("handleMatrixAction pollVote", () => {
userId: "@u:example",
roomId: "!room:example",
},
{ channels: { matrix: { actions: { memberInfo: true } } } } as CoreConfig,
memberCfg,
);
const roomCfg = { channels: { matrix: { actions: { channelInfo: true } } } } as CoreConfig;
await handleMatrixAction(
{
action: "channelInfo",
accountId: "ops",
roomId: "!room:example",
},
{ channels: { matrix: { actions: { channelInfo: true } } } } as CoreConfig,
roomCfg,
);
expect(mocks.getMatrixMemberInfo).toHaveBeenCalledWith("@u:example", {
cfg: memberCfg,
accountId: "ops",
roomId: "!room:example",
});
expect(mocks.getMatrixRoomInfo).toHaveBeenCalledWith("!room:example", {
cfg: roomCfg,
accountId: "ops",
});
});
it("persists self-profile updates through the shared profile helper", async () => {
const cfg = { channels: { matrix: { actions: { profile: true } } } } as CoreConfig;
const result = await handleMatrixAction(
{
action: "setProfile",
@@ -246,10 +265,11 @@ describe("handleMatrixAction pollVote", () => {
display_name: "Ops Bot",
avatar_url: "mxc://example/avatar",
},
{ channels: { matrix: { actions: { profile: true } } } } as CoreConfig,
cfg,
);
expect(mocks.applyMatrixProfileUpdate).toHaveBeenCalledWith({
cfg,
account: "ops",
displayName: "Ops Bot",
avatarUrl: "mxc://example/avatar",
@@ -265,16 +285,18 @@ describe("handleMatrixAction pollVote", () => {
});
it("accepts local avatar paths for self-profile updates", async () => {
const cfg = { channels: { matrix: { actions: { profile: true } } } } as CoreConfig;
await handleMatrixAction(
{
action: "setProfile",
accountId: "ops",
path: "/tmp/avatar.jpg",
},
{ channels: { matrix: { actions: { profile: true } } } } as CoreConfig,
cfg,
);
expect(mocks.applyMatrixProfileUpdate).toHaveBeenCalledWith({
cfg,
account: "ops",
displayName: undefined,
avatarUrl: undefined,

View File

@@ -133,7 +133,10 @@ export async function handleMatrixAction(
const action = readStringParam(params, "action", { required: true });
const accountId = readStringParam(params, "accountId") ?? undefined;
const isActionEnabled = createActionGate(resolveMatrixAccountConfig({ cfg, accountId }).actions);
const clientOpts = accountId ? { accountId } : {};
const clientOpts = {
cfg,
...(accountId ? { accountId } : {}),
};
if (reactionActions.has(action)) {
if (!isActionEnabled("reactions")) {
@@ -147,17 +150,17 @@ export async function handleMatrixAction(
});
if (remove || isEmpty) {
const result = await removeMatrixReactions(roomId, messageId, {
accountId,
...clientOpts,
emoji: remove ? emoji : undefined,
});
return jsonResult({ ok: true, removed: result.removed });
}
await reactMatrixMessage(roomId, messageId, emoji, { accountId });
await reactMatrixMessage(roomId, messageId, emoji, clientOpts);
return jsonResult({ ok: true, added: emoji });
}
const limit = readNumberParam(params, "limit", { integer: true });
const reactions = await listMatrixReactions(roomId, messageId, {
accountId,
...clientOpts,
limit: limit ?? undefined,
});
return jsonResult({ ok: true, reactions });
@@ -177,7 +180,7 @@ export async function handleMatrixAction(
...(optionIndex !== undefined ? [optionIndex] : []),
];
const result = await voteMatrixPoll(roomId, pollId, {
accountId,
...clientOpts,
optionIds,
optionIndexes,
});
@@ -270,6 +273,7 @@ export async function handleMatrixAction(
readStringParam(params, "path") ??
readStringParam(params, "filePath");
const result = await applyMatrixProfileUpdate({
cfg,
account: accountId,
displayName: readStringParam(params, "displayName") ?? readStringParam(params, "name"),
avatarUrl: readStringParam(params, "avatarUrl"),
@@ -312,12 +316,12 @@ export async function handleMatrixAction(
if (action === "encryptionStatus") {
const includeRecoveryKey = params.includeRecoveryKey === true;
const status = await getMatrixEncryptionStatus({ includeRecoveryKey, accountId });
const status = await getMatrixEncryptionStatus({ includeRecoveryKey, ...clientOpts });
return jsonResult({ ok: true, status });
}
if (action === "verificationStatus") {
const includeRecoveryKey = params.includeRecoveryKey === true;
const status = await getMatrixVerificationStatus({ includeRecoveryKey, accountId });
const status = await getMatrixVerificationStatus({ includeRecoveryKey, ...clientOpts });
return jsonResult({ ok: true, status });
}
if (action === "verificationBootstrap") {
@@ -327,7 +331,7 @@ export async function handleMatrixAction(
const result = await bootstrapMatrixVerification({
recoveryKey: recoveryKey ?? undefined,
forceResetCrossSigning: params.forceResetCrossSigning === true,
accountId,
...clientOpts,
});
return jsonResult({ ok: result.success, result });
}
@@ -337,12 +341,12 @@ export async function handleMatrixAction(
readStringParam(params, "key", { trim: false });
const result = await verifyMatrixRecoveryKey(
readStringParam({ recoveryKey }, "recoveryKey", { required: true, trim: false }),
{ accountId },
clientOpts,
);
return jsonResult({ ok: result.success, result });
}
if (action === "verificationBackupStatus") {
const status = await getMatrixRoomKeyBackupStatus({ accountId });
const status = await getMatrixRoomKeyBackupStatus(clientOpts);
return jsonResult({ ok: true, status });
}
if (action === "verificationBackupRestore") {
@@ -351,12 +355,12 @@ export async function handleMatrixAction(
readStringParam(params, "key", { trim: false });
const result = await restoreMatrixRoomKeyBackup({
recoveryKey: recoveryKey ?? undefined,
accountId,
...clientOpts,
});
return jsonResult({ ok: result.success, result });
}
if (action === "verificationList") {
const verifications = await listMatrixVerifications({ accountId });
const verifications = await listMatrixVerifications(clientOpts);
return jsonResult({ ok: true, verifications });
}
if (action === "verificationRequest") {
@@ -369,14 +373,14 @@ export async function handleMatrixAction(
userId: userId ?? undefined,
deviceId: deviceId ?? undefined,
roomId: roomId ?? undefined,
accountId,
...clientOpts,
});
return jsonResult({ ok: true, verification });
}
if (action === "verificationAccept") {
const verification = await acceptMatrixVerification(
readStringParam({ requestId }, "requestId", { required: true }),
{ accountId },
clientOpts,
);
return jsonResult({ ok: true, verification });
}
@@ -385,7 +389,7 @@ export async function handleMatrixAction(
const code = readStringParam(params, "code");
const verification = await cancelMatrixVerification(
readStringParam({ requestId }, "requestId", { required: true }),
{ reason: reason ?? undefined, code: code ?? undefined, accountId },
{ reason: reason ?? undefined, code: code ?? undefined, ...clientOpts },
);
return jsonResult({ ok: true, verification });
}
@@ -399,14 +403,14 @@ export async function handleMatrixAction(
}
const verification = await startMatrixVerification(
readStringParam({ requestId }, "requestId", { required: true }),
{ method: "sas", accountId },
{ method: "sas", ...clientOpts },
);
return jsonResult({ ok: true, verification });
}
if (action === "verificationGenerateQr") {
const qr = await generateMatrixVerificationQr(
readStringParam({ requestId }, "requestId", { required: true }),
{ accountId },
clientOpts,
);
return jsonResult({ ok: true, ...qr });
}
@@ -418,35 +422,35 @@ export async function handleMatrixAction(
const verification = await scanMatrixVerificationQr(
readStringParam({ requestId }, "requestId", { required: true }),
readStringParam({ qrDataBase64 }, "qrDataBase64", { required: true }),
{ accountId },
clientOpts,
);
return jsonResult({ ok: true, verification });
}
if (action === "verificationSas") {
const sas = await getMatrixVerificationSas(
readStringParam({ requestId }, "requestId", { required: true }),
{ accountId },
clientOpts,
);
return jsonResult({ ok: true, sas });
}
if (action === "verificationConfirm") {
const verification = await confirmMatrixVerificationSas(
readStringParam({ requestId }, "requestId", { required: true }),
{ accountId },
clientOpts,
);
return jsonResult({ ok: true, verification });
}
if (action === "verificationMismatch") {
const verification = await mismatchMatrixVerificationSas(
readStringParam({ requestId }, "requestId", { required: true }),
{ accountId },
clientOpts,
);
return jsonResult({ ok: true, verification });
}
if (action === "verificationConfirmQr") {
const verification = await confirmMatrixVerificationReciprocateQr(
readStringParam({ requestId }, "requestId", { required: true }),
{ accountId },
clientOpts,
);
return jsonResult({ ok: true, verification });
}

View File

@@ -55,6 +55,7 @@ type SendMatrixMessage = (
text: string,
opts?: {
cfg?: OpenClawConfig;
accountId?: string;
mediaUrl?: string;
replyToId?: string;
threadId?: string;