Matrix: harden thread binding persistence

This commit is contained in:
Gustavo Madeira Santana
2026-03-09 04:50:02 -04:00
parent 8273a755e9
commit 72f532dc45
2 changed files with 188 additions and 50 deletions

View File

@@ -8,9 +8,12 @@ import {
__testing,
} from "../../../../src/infra/outbound/session-binding-service.js";
import { setMatrixRuntime } from "../runtime.js";
import { resolveMatrixStoragePaths } from "./client/storage.js";
import {
createMatrixThreadBindingManager,
resetMatrixThreadBindingsForTests,
setMatrixThreadBindingIdleTimeoutBySessionKey,
setMatrixThreadBindingMaxAgeBySessionKey,
} from "./thread-bindings.js";
const sendMessageMatrixMock = vi.hoisted(() =>
@@ -30,6 +33,12 @@ vi.mock("./send.js", async () => {
describe("matrix thread bindings", () => {
let stateDir: string;
const auth = {
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
} as const;
beforeEach(async () => {
stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "matrix-thread-bindings-"));
@@ -46,12 +55,7 @@ describe("matrix thread bindings", () => {
it("creates child Matrix thread bindings from a top-level room context", async () => {
await createMatrixThreadBindingManager({
accountId: "ops",
auth: {
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
auth,
client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
@@ -87,12 +91,7 @@ describe("matrix thread bindings", () => {
it("posts intro messages inside existing Matrix threads for current placement", async () => {
await createMatrixThreadBindingManager({
accountId: "ops",
auth: {
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
auth,
client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
@@ -138,12 +137,7 @@ describe("matrix thread bindings", () => {
try {
await createMatrixThreadBindingManager({
accountId: "ops",
auth: {
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
auth,
client: {} as never,
idleTimeoutMs: 1_000,
maxAgeMs: 0,
@@ -184,12 +178,7 @@ describe("matrix thread bindings", () => {
it("sends threaded farewell messages when bindings are unbound", async () => {
await createMatrixThreadBindingManager({
accountId: "ops",
auth: {
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
auth,
client: {} as never,
idleTimeoutMs: 1_000,
maxAgeMs: 0,
@@ -226,4 +215,110 @@ describe("matrix thread bindings", () => {
}),
);
});
it("updates lifecycle windows by session key and refreshes activity", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));
try {
const manager = await createMatrixThreadBindingManager({
accountId: "ops",
auth,
client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
enableSweeper: false,
});
await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
},
placement: "current",
});
const original = manager.listBySessionKey("agent:ops:subagent:child")[0];
expect(original).toBeDefined();
const idleUpdated = setMatrixThreadBindingIdleTimeoutBySessionKey({
accountId: "ops",
targetSessionKey: "agent:ops:subagent:child",
idleTimeoutMs: 2 * 60 * 60 * 1000,
});
vi.setSystemTime(new Date("2026-03-06T12:00:00.000Z"));
const maxAgeUpdated = setMatrixThreadBindingMaxAgeBySessionKey({
accountId: "ops",
targetSessionKey: "agent:ops:subagent:child",
maxAgeMs: 6 * 60 * 60 * 1000,
});
expect(idleUpdated).toHaveLength(1);
expect(idleUpdated[0]?.metadata?.idleTimeoutMs).toBe(2 * 60 * 60 * 1000);
expect(maxAgeUpdated).toHaveLength(1);
expect(maxAgeUpdated[0]?.metadata?.maxAgeMs).toBe(6 * 60 * 60 * 1000);
expect(maxAgeUpdated[0]?.boundAt).toBe(original?.boundAt);
expect(maxAgeUpdated[0]?.metadata?.lastActivityAt).toBe(
Date.parse("2026-03-06T12:00:00.000Z"),
);
expect(manager.listBySessionKey("agent:ops:subagent:child")[0]?.maxAgeMs).toBe(
6 * 60 * 60 * 1000,
);
expect(manager.listBySessionKey("agent:ops:subagent:child")[0]?.lastActivityAt).toBe(
Date.parse("2026-03-06T12:00:00.000Z"),
);
} finally {
vi.useRealTimers();
}
});
it("flushes pending touch persistence on stop", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));
try {
const manager = await createMatrixThreadBindingManager({
accountId: "ops",
auth,
client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
enableSweeper: false,
});
const binding = await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
},
placement: "current",
});
const touchedAt = Date.parse("2026-03-06T12:00:00.000Z");
getSessionBindingService().touch(binding.bindingId, touchedAt);
manager.stop();
vi.useRealTimers();
const bindingsPath = path.join(
resolveMatrixStoragePaths({
...auth,
env: process.env,
}).rootDir,
"thread-bindings.json",
);
await vi.waitFor(async () => {
const raw = await fs.readFile(bindingsPath, "utf-8");
const parsed = JSON.parse(raw) as {
bindings?: Array<{ lastActivityAt?: number }>;
};
expect(parsed.bindings?.[0]?.lastActivityAt).toBe(touchedAt);
});
} finally {
vi.useRealTimers();
}
});
});

View File

@@ -232,14 +232,27 @@ async function loadBindingsFromDisk(filePath: string, accountId: string) {
return loaded;
}
async function persistBindings(filePath: string, accountId: string): Promise<void> {
const bindings = [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()]
.filter((entry) => entry.accountId === accountId)
.sort((a, b) => a.boundAt - b.boundAt);
await writeJsonFileAtomically(filePath, {
function toStoredBindingsState(
bindings: MatrixThreadBindingRecord[],
): StoredMatrixThreadBindingState {
return {
version: STORE_VERSION,
bindings,
} satisfies StoredMatrixThreadBindingState);
bindings: [...bindings].sort((a, b) => a.boundAt - b.boundAt),
};
}
async function persistBindingsSnapshot(
filePath: string,
bindings: MatrixThreadBindingRecord[],
): Promise<void> {
await writeJsonFileAtomically(filePath, toStoredBindingsState(bindings));
}
async function persistBindings(filePath: string, accountId: string): Promise<void> {
await persistBindingsSnapshot(
filePath,
[...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter((entry) => entry.accountId === accountId),
);
}
function setBindingRecord(record: MatrixThreadBindingRecord): void {
@@ -360,6 +373,16 @@ export async function createMatrixThreadBindingManager(params: {
}
const persist = async () => await persistBindings(filePath, params.accountId);
const persistSafely = (reason: string, bindings?: MatrixThreadBindingRecord[]) => {
void persistBindingsSnapshot(
filePath,
bindings ?? listBindingsForAccount(params.accountId),
).catch((err) => {
params.logVerboseMessage?.(
`matrix: failed persisting thread bindings account=${params.accountId} action=${reason}: ${String(err)}`,
);
});
};
const defaults = {
idleTimeoutMs: params.idleTimeoutMs,
maxAgeMs: params.maxAgeMs,
@@ -371,10 +394,32 @@ export async function createMatrixThreadBindingManager(params: {
}
persistTimer = setTimeout(() => {
persistTimer = null;
void persist();
persistSafely("delayed-touch");
}, delayMs);
persistTimer.unref?.();
};
const updateBindingsBySessionKey = (input: {
targetSessionKey: string;
update: (entry: MatrixThreadBindingRecord, now: number) => MatrixThreadBindingRecord;
persistReason: string;
}): MatrixThreadBindingRecord[] => {
const targetSessionKey = input.targetSessionKey.trim();
if (!targetSessionKey) {
return [];
}
const now = Date.now();
const nextBindings = listBindingsForAccount(params.accountId)
.filter((entry) => entry.targetSessionKey === targetSessionKey)
.map((entry) => input.update(entry, now));
if (nextBindings.length === 0) {
return [];
}
for (const entry of nextBindings) {
setBindingRecord(entry);
}
persistSafely(input.persistReason);
return nextBindings;
};
const manager: MatrixThreadBindingManager = {
accountId: params.accountId,
@@ -414,30 +459,26 @@ export async function createMatrixThreadBindingManager(params: {
return nextRecord;
},
setIdleTimeoutBySessionKey: ({ targetSessionKey, idleTimeoutMs }) => {
const nextBindings = listBindingsForAccount(params.accountId)
.filter((entry) => entry.targetSessionKey === targetSessionKey.trim())
.map((entry) => ({
return updateBindingsBySessionKey({
targetSessionKey,
persistReason: "idle-timeout-update",
update: (entry, now) => ({
...entry,
idleTimeoutMs: Math.max(0, Math.floor(idleTimeoutMs)),
}));
for (const entry of nextBindings) {
setBindingRecord(entry);
}
void persist();
return nextBindings;
lastActivityAt: now,
}),
});
},
setMaxAgeBySessionKey: ({ targetSessionKey, maxAgeMs }) => {
const nextBindings = listBindingsForAccount(params.accountId)
.filter((entry) => entry.targetSessionKey === targetSessionKey.trim())
.map((entry) => ({
return updateBindingsBySessionKey({
targetSessionKey,
persistReason: "max-age-update",
update: (entry, now) => ({
...entry,
maxAgeMs: Math.max(0, Math.floor(maxAgeMs)),
}));
for (const entry of nextBindings) {
setBindingRecord(entry);
}
void persist();
return nextBindings;
lastActivityAt: now,
}),
});
},
stop: () => {
if (sweepTimer) {
@@ -446,6 +487,7 @@ export async function createMatrixThreadBindingManager(params: {
if (persistTimer) {
clearTimeout(persistTimer);
persistTimer = null;
persistSafely("shutdown-flush");
}
unregisterSessionBindingAdapter({
channel: "matrix",
@@ -631,6 +673,7 @@ export async function createMatrixThreadBindingManager(params: {
}),
);
}, THREAD_BINDINGS_SWEEP_INTERVAL_MS);
sweepTimer.unref?.();
}
MANAGERS_BY_ACCOUNT_ID.set(params.accountId, manager);