diff --git a/extensions/matrix/src/matrix/thread-bindings.test.ts b/extensions/matrix/src/matrix/thread-bindings.test.ts index 5d4eb70d983..aeabc3edd5d 100644 --- a/extensions/matrix/src/matrix/thread-bindings.test.ts +++ b/extensions/matrix/src/matrix/thread-bindings.test.ts @@ -8,9 +8,12 @@ import { __testing, } from "../../../../src/infra/outbound/session-binding-service.js"; import { setMatrixRuntime } from "../runtime.js"; +import { resolveMatrixStoragePaths } from "./client/storage.js"; import { createMatrixThreadBindingManager, resetMatrixThreadBindingsForTests, + setMatrixThreadBindingIdleTimeoutBySessionKey, + setMatrixThreadBindingMaxAgeBySessionKey, } from "./thread-bindings.js"; const sendMessageMatrixMock = vi.hoisted(() => @@ -30,6 +33,12 @@ vi.mock("./send.js", async () => { describe("matrix thread bindings", () => { let stateDir: string; + const auth = { + accountId: "ops", + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + } as const; beforeEach(async () => { stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "matrix-thread-bindings-")); @@ -46,12 +55,7 @@ describe("matrix thread bindings", () => { it("creates child Matrix thread bindings from a top-level room context", async () => { await createMatrixThreadBindingManager({ accountId: "ops", - auth: { - accountId: "ops", - homeserver: "https://matrix.example.org", - userId: "@bot:example.org", - accessToken: "token", - }, + auth, client: {} as never, idleTimeoutMs: 24 * 60 * 60 * 1000, maxAgeMs: 0, @@ -87,12 +91,7 @@ describe("matrix thread bindings", () => { it("posts intro messages inside existing Matrix threads for current placement", async () => { await createMatrixThreadBindingManager({ accountId: "ops", - auth: { - accountId: "ops", - homeserver: "https://matrix.example.org", - userId: "@bot:example.org", - accessToken: "token", - }, + auth, client: {} as never, idleTimeoutMs: 24 * 60 * 60 * 1000, maxAgeMs: 0, @@ -138,12 +137,7 @@ describe("matrix thread bindings", () => { try { await createMatrixThreadBindingManager({ accountId: "ops", - auth: { - accountId: "ops", - homeserver: "https://matrix.example.org", - userId: "@bot:example.org", - accessToken: "token", - }, + auth, client: {} as never, idleTimeoutMs: 1_000, maxAgeMs: 0, @@ -184,12 +178,7 @@ describe("matrix thread bindings", () => { it("sends threaded farewell messages when bindings are unbound", async () => { await createMatrixThreadBindingManager({ accountId: "ops", - auth: { - accountId: "ops", - homeserver: "https://matrix.example.org", - userId: "@bot:example.org", - accessToken: "token", - }, + auth, client: {} as never, idleTimeoutMs: 1_000, maxAgeMs: 0, @@ -226,4 +215,110 @@ describe("matrix thread bindings", () => { }), ); }); + + it("updates lifecycle windows by session key and refreshes activity", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z")); + try { + const manager = await createMatrixThreadBindingManager({ + accountId: "ops", + auth, + client: {} as never, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + conversation: { + channel: "matrix", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }, + placement: "current", + }); + const original = manager.listBySessionKey("agent:ops:subagent:child")[0]; + expect(original).toBeDefined(); + + const idleUpdated = setMatrixThreadBindingIdleTimeoutBySessionKey({ + accountId: "ops", + targetSessionKey: "agent:ops:subagent:child", + idleTimeoutMs: 2 * 60 * 60 * 1000, + }); + vi.setSystemTime(new Date("2026-03-06T12:00:00.000Z")); + const maxAgeUpdated = setMatrixThreadBindingMaxAgeBySessionKey({ + accountId: "ops", + targetSessionKey: "agent:ops:subagent:child", + maxAgeMs: 6 * 60 * 60 * 1000, + }); + + expect(idleUpdated).toHaveLength(1); + expect(idleUpdated[0]?.metadata?.idleTimeoutMs).toBe(2 * 60 * 60 * 1000); + expect(maxAgeUpdated).toHaveLength(1); + expect(maxAgeUpdated[0]?.metadata?.maxAgeMs).toBe(6 * 60 * 60 * 1000); + expect(maxAgeUpdated[0]?.boundAt).toBe(original?.boundAt); + expect(maxAgeUpdated[0]?.metadata?.lastActivityAt).toBe( + Date.parse("2026-03-06T12:00:00.000Z"), + ); + expect(manager.listBySessionKey("agent:ops:subagent:child")[0]?.maxAgeMs).toBe( + 6 * 60 * 60 * 1000, + ); + expect(manager.listBySessionKey("agent:ops:subagent:child")[0]?.lastActivityAt).toBe( + Date.parse("2026-03-06T12:00:00.000Z"), + ); + } finally { + vi.useRealTimers(); + } + }); + + it("flushes pending touch persistence on stop", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z")); + try { + const manager = await createMatrixThreadBindingManager({ + accountId: "ops", + auth, + client: {} as never, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + enableSweeper: false, + }); + const binding = await getSessionBindingService().bind({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + conversation: { + channel: "matrix", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }, + placement: "current", + }); + const touchedAt = Date.parse("2026-03-06T12:00:00.000Z"); + getSessionBindingService().touch(binding.bindingId, touchedAt); + + manager.stop(); + vi.useRealTimers(); + + const bindingsPath = path.join( + resolveMatrixStoragePaths({ + ...auth, + env: process.env, + }).rootDir, + "thread-bindings.json", + ); + await vi.waitFor(async () => { + const raw = await fs.readFile(bindingsPath, "utf-8"); + const parsed = JSON.parse(raw) as { + bindings?: Array<{ lastActivityAt?: number }>; + }; + expect(parsed.bindings?.[0]?.lastActivityAt).toBe(touchedAt); + }); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/extensions/matrix/src/matrix/thread-bindings.ts b/extensions/matrix/src/matrix/thread-bindings.ts index ca617efdb5d..e2a877b277c 100644 --- a/extensions/matrix/src/matrix/thread-bindings.ts +++ b/extensions/matrix/src/matrix/thread-bindings.ts @@ -232,14 +232,27 @@ async function loadBindingsFromDisk(filePath: string, accountId: string) { return loaded; } -async function persistBindings(filePath: string, accountId: string): Promise { - const bindings = [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()] - .filter((entry) => entry.accountId === accountId) - .sort((a, b) => a.boundAt - b.boundAt); - await writeJsonFileAtomically(filePath, { +function toStoredBindingsState( + bindings: MatrixThreadBindingRecord[], +): StoredMatrixThreadBindingState { + return { version: STORE_VERSION, - bindings, - } satisfies StoredMatrixThreadBindingState); + bindings: [...bindings].sort((a, b) => a.boundAt - b.boundAt), + }; +} + +async function persistBindingsSnapshot( + filePath: string, + bindings: MatrixThreadBindingRecord[], +): Promise { + await writeJsonFileAtomically(filePath, toStoredBindingsState(bindings)); +} + +async function persistBindings(filePath: string, accountId: string): Promise { + await persistBindingsSnapshot( + filePath, + [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter((entry) => entry.accountId === accountId), + ); } function setBindingRecord(record: MatrixThreadBindingRecord): void { @@ -360,6 +373,16 @@ export async function createMatrixThreadBindingManager(params: { } const persist = async () => await persistBindings(filePath, params.accountId); + const persistSafely = (reason: string, bindings?: MatrixThreadBindingRecord[]) => { + void persistBindingsSnapshot( + filePath, + bindings ?? listBindingsForAccount(params.accountId), + ).catch((err) => { + params.logVerboseMessage?.( + `matrix: failed persisting thread bindings account=${params.accountId} action=${reason}: ${String(err)}`, + ); + }); + }; const defaults = { idleTimeoutMs: params.idleTimeoutMs, maxAgeMs: params.maxAgeMs, @@ -371,10 +394,32 @@ export async function createMatrixThreadBindingManager(params: { } persistTimer = setTimeout(() => { persistTimer = null; - void persist(); + persistSafely("delayed-touch"); }, delayMs); persistTimer.unref?.(); }; + const updateBindingsBySessionKey = (input: { + targetSessionKey: string; + update: (entry: MatrixThreadBindingRecord, now: number) => MatrixThreadBindingRecord; + persistReason: string; + }): MatrixThreadBindingRecord[] => { + const targetSessionKey = input.targetSessionKey.trim(); + if (!targetSessionKey) { + return []; + } + const now = Date.now(); + const nextBindings = listBindingsForAccount(params.accountId) + .filter((entry) => entry.targetSessionKey === targetSessionKey) + .map((entry) => input.update(entry, now)); + if (nextBindings.length === 0) { + return []; + } + for (const entry of nextBindings) { + setBindingRecord(entry); + } + persistSafely(input.persistReason); + return nextBindings; + }; const manager: MatrixThreadBindingManager = { accountId: params.accountId, @@ -414,30 +459,26 @@ export async function createMatrixThreadBindingManager(params: { return nextRecord; }, setIdleTimeoutBySessionKey: ({ targetSessionKey, idleTimeoutMs }) => { - const nextBindings = listBindingsForAccount(params.accountId) - .filter((entry) => entry.targetSessionKey === targetSessionKey.trim()) - .map((entry) => ({ + return updateBindingsBySessionKey({ + targetSessionKey, + persistReason: "idle-timeout-update", + update: (entry, now) => ({ ...entry, idleTimeoutMs: Math.max(0, Math.floor(idleTimeoutMs)), - })); - for (const entry of nextBindings) { - setBindingRecord(entry); - } - void persist(); - return nextBindings; + lastActivityAt: now, + }), + }); }, setMaxAgeBySessionKey: ({ targetSessionKey, maxAgeMs }) => { - const nextBindings = listBindingsForAccount(params.accountId) - .filter((entry) => entry.targetSessionKey === targetSessionKey.trim()) - .map((entry) => ({ + return updateBindingsBySessionKey({ + targetSessionKey, + persistReason: "max-age-update", + update: (entry, now) => ({ ...entry, maxAgeMs: Math.max(0, Math.floor(maxAgeMs)), - })); - for (const entry of nextBindings) { - setBindingRecord(entry); - } - void persist(); - return nextBindings; + lastActivityAt: now, + }), + }); }, stop: () => { if (sweepTimer) { @@ -446,6 +487,7 @@ export async function createMatrixThreadBindingManager(params: { if (persistTimer) { clearTimeout(persistTimer); persistTimer = null; + persistSafely("shutdown-flush"); } unregisterSessionBindingAdapter({ channel: "matrix", @@ -631,6 +673,7 @@ export async function createMatrixThreadBindingManager(params: { }), ); }, THREAD_BINDINGS_SWEEP_INTERVAL_MS); + sweepTimer.unref?.(); } MANAGERS_BY_ACCOUNT_ID.set(params.accountId, manager);