mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-22 23:41:07 +00:00
Matrix: isolate thread binding manager stateDir reuse
This commit is contained in:
@@ -59,11 +59,12 @@ describe("matrix thread bindings", () => {
|
||||
accessToken: "token",
|
||||
} as const;
|
||||
|
||||
function resolveBindingsFilePath() {
|
||||
function resolveBindingsFilePath(customStateDir?: string) {
|
||||
return path.join(
|
||||
resolveMatrixStoragePaths({
|
||||
...auth,
|
||||
env: process.env,
|
||||
...(customStateDir ? { stateDir: customStateDir } : {}),
|
||||
}).rootDir,
|
||||
"thread-bindings.json",
|
||||
);
|
||||
@@ -432,6 +433,98 @@ describe("matrix thread bindings", () => {
|
||||
expect(rotatedBindingsPath).toBe(initialBindingsPath);
|
||||
});
|
||||
|
||||
it("replaces reused account managers when the bindings stateDir changes", async () => {
|
||||
const initialStateDir = stateDir;
|
||||
const replacementStateDir = await fs.mkdtemp(
|
||||
path.join(os.tmpdir(), "matrix-thread-bindings-replacement-"),
|
||||
);
|
||||
|
||||
const initialManager = await createMatrixThreadBindingManager({
|
||||
accountId: "ops",
|
||||
auth,
|
||||
client: {} as never,
|
||||
stateDir: initialStateDir,
|
||||
idleTimeoutMs: 24 * 60 * 60 * 1000,
|
||||
maxAgeMs: 0,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
await getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
},
|
||||
placement: "current",
|
||||
});
|
||||
|
||||
const replacementManager = await createMatrixThreadBindingManager({
|
||||
accountId: "ops",
|
||||
auth,
|
||||
client: {} as never,
|
||||
stateDir: replacementStateDir,
|
||||
idleTimeoutMs: 24 * 60 * 60 * 1000,
|
||||
maxAgeMs: 0,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
expect(replacementManager).not.toBe(initialManager);
|
||||
expect(replacementManager.listBindings()).toEqual([]);
|
||||
expect(
|
||||
getSessionBindingService().resolveByConversation({
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
}),
|
||||
).toBeNull();
|
||||
|
||||
await getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:ops:subagent:replacement",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread-2",
|
||||
parentConversationId: "!room:example",
|
||||
},
|
||||
placement: "current",
|
||||
});
|
||||
|
||||
await vi.waitFor(async () => {
|
||||
const replacementRaw = await fs.readFile(
|
||||
resolveBindingsFilePath(replacementStateDir),
|
||||
"utf-8",
|
||||
);
|
||||
expect(JSON.parse(replacementRaw)).toMatchObject({
|
||||
version: 1,
|
||||
bindings: [
|
||||
expect.objectContaining({
|
||||
conversationId: "$thread-2",
|
||||
parentConversationId: "!room:example",
|
||||
targetSessionKey: "agent:ops:subagent:replacement",
|
||||
}),
|
||||
],
|
||||
});
|
||||
});
|
||||
await vi.waitFor(async () => {
|
||||
const initialRaw = await fs.readFile(resolveBindingsFilePath(initialStateDir), "utf-8");
|
||||
expect(JSON.parse(initialRaw)).toMatchObject({
|
||||
version: 1,
|
||||
bindings: [
|
||||
expect.objectContaining({
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
}),
|
||||
],
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("updates lifecycle windows by session key and refreshes activity", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));
|
||||
|
||||
@@ -62,7 +62,12 @@ export type MatrixThreadBindingManager = {
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
const MANAGERS_BY_ACCOUNT_ID = new Map<string, MatrixThreadBindingManager>();
|
||||
type MatrixThreadBindingManagerCacheEntry = {
|
||||
filePath: string;
|
||||
manager: MatrixThreadBindingManager;
|
||||
};
|
||||
|
||||
const MANAGERS_BY_ACCOUNT_ID = new Map<string, MatrixThreadBindingManagerCacheEntry>();
|
||||
const BINDINGS_BY_ACCOUNT_CONVERSATION = new Map<string, MatrixThreadBindingRecord>();
|
||||
|
||||
function normalizeDurationMs(raw: unknown, fallback: number): number {
|
||||
@@ -354,17 +359,19 @@ export async function createMatrixThreadBindingManager(params: {
|
||||
`Matrix thread binding account mismatch: requested ${params.accountId}, auth resolved ${params.auth.accountId}`,
|
||||
);
|
||||
}
|
||||
const existing = MANAGERS_BY_ACCOUNT_ID.get(params.accountId);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const filePath = resolveBindingsPath({
|
||||
auth: params.auth,
|
||||
accountId: params.accountId,
|
||||
env: params.env,
|
||||
stateDir: params.stateDir,
|
||||
});
|
||||
const existingEntry = MANAGERS_BY_ACCOUNT_ID.get(params.accountId);
|
||||
if (existingEntry) {
|
||||
if (existingEntry.filePath === filePath) {
|
||||
return existingEntry.manager;
|
||||
}
|
||||
existingEntry.manager.stop();
|
||||
}
|
||||
const loaded = await loadBindingsFromDisk(filePath, params.accountId);
|
||||
for (const record of loaded) {
|
||||
setBindingRecord(record);
|
||||
@@ -499,7 +506,7 @@ export async function createMatrixThreadBindingManager(params: {
|
||||
channel: "matrix",
|
||||
accountId: params.accountId,
|
||||
});
|
||||
if (MANAGERS_BY_ACCOUNT_ID.get(params.accountId) === manager) {
|
||||
if (MANAGERS_BY_ACCOUNT_ID.get(params.accountId)?.manager === manager) {
|
||||
MANAGERS_BY_ACCOUNT_ID.delete(params.accountId);
|
||||
}
|
||||
for (const record of listBindingsForAccount(params.accountId)) {
|
||||
@@ -698,14 +705,17 @@ export async function createMatrixThreadBindingManager(params: {
|
||||
sweepTimer.unref?.();
|
||||
}
|
||||
|
||||
MANAGERS_BY_ACCOUNT_ID.set(params.accountId, manager);
|
||||
MANAGERS_BY_ACCOUNT_ID.set(params.accountId, {
|
||||
filePath,
|
||||
manager,
|
||||
});
|
||||
return manager;
|
||||
}
|
||||
|
||||
export function getMatrixThreadBindingManager(
|
||||
accountId: string,
|
||||
): MatrixThreadBindingManager | null {
|
||||
return MANAGERS_BY_ACCOUNT_ID.get(accountId) ?? null;
|
||||
return MANAGERS_BY_ACCOUNT_ID.get(accountId)?.manager ?? null;
|
||||
}
|
||||
|
||||
export function setMatrixThreadBindingIdleTimeoutBySessionKey(params: {
|
||||
@@ -713,7 +723,7 @@ export function setMatrixThreadBindingIdleTimeoutBySessionKey(params: {
|
||||
targetSessionKey: string;
|
||||
idleTimeoutMs: number;
|
||||
}): SessionBindingRecord[] {
|
||||
const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId);
|
||||
const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId)?.manager;
|
||||
if (!manager) {
|
||||
return [];
|
||||
}
|
||||
@@ -730,7 +740,7 @@ export function setMatrixThreadBindingMaxAgeBySessionKey(params: {
|
||||
targetSessionKey: string;
|
||||
maxAgeMs: number;
|
||||
}): SessionBindingRecord[] {
|
||||
const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId);
|
||||
const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId)?.manager;
|
||||
if (!manager) {
|
||||
return [];
|
||||
}
|
||||
@@ -743,7 +753,7 @@ export function setMatrixThreadBindingMaxAgeBySessionKey(params: {
|
||||
}
|
||||
|
||||
export function resetMatrixThreadBindingsForTests(): void {
|
||||
for (const manager of MANAGERS_BY_ACCOUNT_ID.values()) {
|
||||
for (const { manager } of MANAGERS_BY_ACCOUNT_ID.values()) {
|
||||
manager.stop();
|
||||
}
|
||||
MANAGERS_BY_ACCOUNT_ID.clear();
|
||||
|
||||
Reference in New Issue
Block a user