From 974ddcecaba9368779dca0d210485af83f4c72d1 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sat, 14 Mar 2026 19:22:38 +0000 Subject: [PATCH] Matrix: fix one-off client and media-send regressions --- .../src/actions.account-propagation.test.ts | 24 +++++ extensions/matrix/src/actions.ts | 4 +- .../matrix/src/matrix/actions/client.test.ts | 51 +++++----- .../matrix/src/matrix/actions/messages.ts | 2 +- .../matrix/src/matrix/client-bootstrap.ts | 14 +-- .../matrix/client-resolver.test-helpers.ts | 16 ++-- extensions/matrix/src/matrix/client.ts | 2 + .../matrix/src/matrix/client/shared.test.ts | 42 +++++++++ extensions/matrix/src/matrix/client/shared.ts | 92 ++++++++++++++++--- extensions/matrix/src/matrix/send.ts | 2 +- .../matrix/src/matrix/send/client.test.ts | 31 ++++--- extensions/matrix/src/tool-actions.test.ts | 23 +++++ extensions/matrix/src/tool-actions.ts | 4 +- .../pi-embedded-runner/compaction-failures.ts | 24 +++++ .../sanitize-session-history.thinking.test.ts | 39 ++++++++ src/channels/plugins/setup-helpers.test.ts | 38 ++++++++ src/channels/plugins/setup-helpers.ts | 29 ++++-- 17 files changed, 351 insertions(+), 86 deletions(-) create mode 100644 src/agents/pi-embedded-runner/compaction-failures.ts create mode 100644 src/agents/pi-embedded-runner/sanitize-session-history.thinking.test.ts diff --git a/extensions/matrix/src/actions.account-propagation.test.ts b/extensions/matrix/src/actions.account-propagation.test.ts index c74c9ca7e35..0675fb2e440 100644 --- a/extensions/matrix/src/actions.account-propagation.test.ts +++ b/extensions/matrix/src/actions.account-propagation.test.ts @@ -155,4 +155,28 @@ describe("matrixMessageActions account propagation", () => { { mediaLocalRoots: ["/tmp/openclaw-matrix-test"] }, ); }); + + it("allows media-only sends without requiring a message body", async () => { + await matrixMessageActions.handleAction?.( + createContext({ + action: "send", + accountId: "ops", + params: { + to: "room:!room:example", + media: "file:///tmp/photo.png", + }, + }), + ); + + expect(mocks.handleMatrixAction).toHaveBeenCalledWith( + expect.objectContaining({ + action: "sendMessage", + accountId: "ops", + content: undefined, + mediaUrl: "file:///tmp/photo.png", + }), + expect.any(Object), + { mediaLocalRoots: undefined }, + ); + }); }); diff --git a/extensions/matrix/src/actions.ts b/extensions/matrix/src/actions.ts index 97c46867683..618eda64231 100644 --- a/extensions/matrix/src/actions.ts +++ b/extensions/matrix/src/actions.ts @@ -114,11 +114,11 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { if (action === "send") { const to = readStringParam(params, "to", { required: true }); + const mediaUrl = readStringParam(params, "media", { trim: false }); const content = readStringParam(params, "message", { - required: true, + required: !mediaUrl, allowEmpty: true, }); - const mediaUrl = readStringParam(params, "media", { trim: false }); const replyTo = readStringParam(params, "replyTo"); const threadId = readStringParam(params, "threadId"); return await dispatch({ diff --git a/extensions/matrix/src/matrix/actions/client.test.ts b/extensions/matrix/src/matrix/actions/client.test.ts index c6ec7bac6fd..23e0c7a6667 100644 --- a/extensions/matrix/src/matrix/actions/client.test.ts +++ b/extensions/matrix/src/matrix/actions/client.test.ts @@ -11,8 +11,8 @@ const { loadConfigMock, getMatrixRuntimeMock, getActiveMatrixClientMock, - resolveSharedMatrixClientMock, - removeSharedClientInstanceMock, + acquireSharedMatrixClientMock, + releaseSharedClientInstanceMock, isBunRuntimeMock, resolveMatrixAuthContextMock, } = matrixClientResolverMocks; @@ -26,13 +26,13 @@ vi.mock("../active-client.js", () => ({ })); vi.mock("../client.js", () => ({ - resolveSharedMatrixClient: resolveSharedMatrixClientMock, + acquireSharedMatrixClient: acquireSharedMatrixClientMock, isBunRuntime: () => isBunRuntimeMock(), resolveMatrixAuthContext: resolveMatrixAuthContextMock, })); vi.mock("../client/shared.js", () => ({ - removeSharedClientInstance: (...args: unknown[]) => removeSharedClientInstanceMock(...args), + releaseSharedClientInstance: (...args: unknown[]) => releaseSharedClientInstanceMock(...args), })); vi.mock("../send.js", () => ({ @@ -65,37 +65,35 @@ describe("action client helpers", () => { const result = await withResolvedActionClient({ accountId: "default" }, async () => "ok"); expect(getActiveMatrixClientMock).toHaveBeenCalledWith("default"); - expect(resolveSharedMatrixClientMock).toHaveBeenCalledTimes(1); - expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({ + expect(acquireSharedMatrixClientMock).toHaveBeenCalledTimes(1); + expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({ cfg: {}, timeoutMs: undefined, accountId: "default", + startClient: false, }); - const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value; + const sharedClient = await acquireSharedMatrixClientMock.mock.results[0]?.value; expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1); - expect(sharedClient.stop).toHaveBeenCalledTimes(1); - expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); + expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop"); expect(result).toBe("ok"); }); it("skips one-off room preparation when readiness is disabled", async () => { await withResolvedActionClient({ accountId: "default", readiness: "none" }, async () => {}); - const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value; + const sharedClient = await acquireSharedMatrixClientMock.mock.results[0]?.value; expect(sharedClient.prepareForOneOff).not.toHaveBeenCalled(); expect(sharedClient.start).not.toHaveBeenCalled(); - expect(sharedClient.stop).toHaveBeenCalledTimes(1); + expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop"); }); it("starts one-off clients when started readiness is required", async () => { await withStartedActionClient({ accountId: "default" }, async () => {}); - const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value; + const sharedClient = await acquireSharedMatrixClientMock.mock.results[0]?.value; expect(sharedClient.start).toHaveBeenCalledTimes(1); expect(sharedClient.prepareForOneOff).not.toHaveBeenCalled(); - expect(sharedClient.stop).not.toHaveBeenCalled(); - expect(sharedClient.stopAndPersist).toHaveBeenCalledTimes(1); - expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); + expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "persist"); }); it("reuses active monitor client when available", async () => { @@ -108,7 +106,7 @@ describe("action client helpers", () => { }); expect(result).toBe("ok"); - expect(resolveSharedMatrixClientMock).not.toHaveBeenCalled(); + expect(acquireSharedMatrixClientMock).not.toHaveBeenCalled(); expect(activeClient.stop).not.toHaveBeenCalled(); }); @@ -155,10 +153,11 @@ describe("action client helpers", () => { await withResolvedActionClient({}, async () => {}); expect(getActiveMatrixClientMock).toHaveBeenCalledWith("ops"); - expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({ + expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({ cfg: loadConfigMock(), timeoutMs: undefined, accountId: "ops", + startClient: false, }); }); @@ -178,16 +177,17 @@ describe("action client helpers", () => { cfg: explicitCfg, accountId: "ops", }); - expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({ + expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({ cfg: explicitCfg, timeoutMs: undefined, accountId: "ops", + startClient: false, }); }); it("stops shared action clients after wrapped calls succeed", async () => { const sharedClient = createMockMatrixClient(); - resolveSharedMatrixClientMock.mockResolvedValue(sharedClient); + acquireSharedMatrixClientMock.mockResolvedValue(sharedClient); const result = await withResolvedActionClient({ accountId: "default" }, async (client) => { expect(client).toBe(sharedClient); @@ -195,14 +195,12 @@ describe("action client helpers", () => { }); expect(result).toBe("ok"); - expect(sharedClient.stop).toHaveBeenCalledTimes(1); - expect(sharedClient.stopAndPersist).not.toHaveBeenCalled(); - expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); + expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop"); }); it("stops shared action clients when the wrapped call throws", async () => { const sharedClient = createMockMatrixClient(); - resolveSharedMatrixClientMock.mockResolvedValue(sharedClient); + acquireSharedMatrixClientMock.mockResolvedValue(sharedClient); await expect( withResolvedActionClient({ accountId: "default" }, async () => { @@ -210,13 +208,12 @@ describe("action client helpers", () => { }), ).rejects.toThrow("boom"); - expect(sharedClient.stop).toHaveBeenCalledTimes(1); - expect(sharedClient.stopAndPersist).not.toHaveBeenCalled(); + expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop"); }); it("resolves room ids before running wrapped room actions", async () => { const sharedClient = createMockMatrixClient(); - resolveSharedMatrixClientMock.mockResolvedValue(sharedClient); + acquireSharedMatrixClientMock.mockResolvedValue(sharedClient); resolveMatrixRoomIdMock.mockResolvedValue("!room:example.org"); const result = await withResolvedRoomAction( @@ -230,6 +227,6 @@ describe("action client helpers", () => { expect(resolveMatrixRoomIdMock).toHaveBeenCalledWith(sharedClient, "room:#ops:example.org"); expect(result).toBe("!room:example.org"); - expect(sharedClient.stop).toHaveBeenCalledTimes(1); + expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop"); }); }); diff --git a/extensions/matrix/src/matrix/actions/messages.ts b/extensions/matrix/src/matrix/actions/messages.ts index 03b9ba847c8..728b5d1dfec 100644 --- a/extensions/matrix/src/matrix/actions/messages.ts +++ b/extensions/matrix/src/matrix/actions/messages.ts @@ -16,7 +16,7 @@ import { export async function sendMatrixMessage( to: string, - content: string, + content: string | undefined, opts: MatrixActionClientOpts & { mediaUrl?: string; replyToId?: string; diff --git a/extensions/matrix/src/matrix/client-bootstrap.ts b/extensions/matrix/src/matrix/client-bootstrap.ts index 37b3387c72b..d52006ebe90 100644 --- a/extensions/matrix/src/matrix/client-bootstrap.ts +++ b/extensions/matrix/src/matrix/client-bootstrap.ts @@ -1,8 +1,8 @@ import { getMatrixRuntime } from "../runtime.js"; import type { CoreConfig } from "../types.js"; import { getActiveMatrixClient } from "./active-client.js"; -import { isBunRuntime, resolveMatrixAuthContext, resolveSharedMatrixClient } from "./client.js"; -import { removeSharedClientInstance } from "./client/shared.js"; +import { acquireSharedMatrixClient, isBunRuntime, resolveMatrixAuthContext } from "./client.js"; +import { releaseSharedClientInstance } from "./client/shared.js"; import type { MatrixClient } from "./sdk.js"; type ResolvedRuntimeMatrixClient = { @@ -63,22 +63,18 @@ async function resolveRuntimeMatrixClient(opts: { return { client: active, stopOnDone: false }; } - const client = await resolveSharedMatrixClient({ + const client = await acquireSharedMatrixClient({ cfg, timeoutMs: opts.timeoutMs, accountId: authContext.accountId, + startClient: false, }); await opts.onResolved?.(client, { preparedByDefault: true }); return { client, stopOnDone: true, cleanup: async (mode) => { - if (mode === "persist") { - await client.stopAndPersist(); - } else { - client.stop(); - } - removeSharedClientInstance(client); + await releaseSharedClientInstance(client, mode); }, }; } diff --git a/extensions/matrix/src/matrix/client-resolver.test-helpers.ts b/extensions/matrix/src/matrix/client-resolver.test-helpers.ts index 06605e0afed..ef90b3863dd 100644 --- a/extensions/matrix/src/matrix/client-resolver.test-helpers.ts +++ b/extensions/matrix/src/matrix/client-resolver.test-helpers.ts @@ -5,8 +5,8 @@ type MatrixClientResolverMocks = { loadConfigMock: Mock<() => unknown>; getMatrixRuntimeMock: Mock<() => unknown>; getActiveMatrixClientMock: Mock<(...args: unknown[]) => MatrixClient | null>; - resolveSharedMatrixClientMock: Mock<(...args: unknown[]) => Promise>; - removeSharedClientInstanceMock: Mock<(...args: unknown[]) => void>; + acquireSharedMatrixClientMock: Mock<(...args: unknown[]) => Promise>; + releaseSharedClientInstanceMock: Mock<(...args: unknown[]) => Promise>; isBunRuntimeMock: Mock<() => boolean>; resolveMatrixAuthContextMock: Mock< (params: { cfg: unknown; accountId?: string | null }) => unknown @@ -17,8 +17,8 @@ export const matrixClientResolverMocks: MatrixClientResolverMocks = { loadConfigMock: vi.fn(() => ({})), getMatrixRuntimeMock: vi.fn(), getActiveMatrixClientMock: vi.fn(), - resolveSharedMatrixClientMock: vi.fn(), - removeSharedClientInstanceMock: vi.fn(), + acquireSharedMatrixClientMock: vi.fn(), + releaseSharedClientInstanceMock: vi.fn(), isBunRuntimeMock: vi.fn(() => false), resolveMatrixAuthContextMock: vi.fn(), }; @@ -43,8 +43,8 @@ export function primeMatrixClientResolverMocks(params?: { loadConfigMock, getMatrixRuntimeMock, getActiveMatrixClientMock, - resolveSharedMatrixClientMock, - removeSharedClientInstanceMock, + acquireSharedMatrixClientMock, + releaseSharedClientInstanceMock, isBunRuntimeMock, resolveMatrixAuthContextMock, } = matrixClientResolverMocks; @@ -70,7 +70,7 @@ export function primeMatrixClientResolverMocks(params?: { }); getActiveMatrixClientMock.mockReturnValue(null); isBunRuntimeMock.mockReturnValue(false); - removeSharedClientInstanceMock.mockReset(); + releaseSharedClientInstanceMock.mockReset().mockResolvedValue(true); resolveMatrixAuthContextMock.mockImplementation( ({ cfg: explicitCfg, @@ -88,7 +88,7 @@ export function primeMatrixClientResolverMocks(params?: { }, }), ); - resolveSharedMatrixClientMock.mockResolvedValue(client); + acquireSharedMatrixClientMock.mockResolvedValue(client); return client; } diff --git a/extensions/matrix/src/matrix/client.ts b/extensions/matrix/src/matrix/client.ts index ccf35b58d70..3d6a9f4ff92 100644 --- a/extensions/matrix/src/matrix/client.ts +++ b/extensions/matrix/src/matrix/client.ts @@ -11,7 +11,9 @@ export { } from "./client/config.js"; export { createMatrixClient } from "./client/create-client.js"; export { + acquireSharedMatrixClient, removeSharedClientInstance, + releaseSharedClientInstance, resolveSharedMatrixClient, stopSharedClientForAccount, stopSharedClientInstance, diff --git a/extensions/matrix/src/matrix/client/shared.test.ts b/extensions/matrix/src/matrix/client/shared.test.ts index ab12a182961..c7e7d3e1a97 100644 --- a/extensions/matrix/src/matrix/client/shared.test.ts +++ b/extensions/matrix/src/matrix/client/shared.test.ts @@ -15,6 +15,8 @@ vi.mock("./create-client.js", () => ({ })); import { + acquireSharedMatrixClient, + releaseSharedClientInstance, resolveSharedMatrixClient, stopSharedClient, stopSharedClientForAccount, @@ -174,6 +176,46 @@ describe("resolveSharedMatrixClient", () => { ); }); + it("honors startClient false even when the caller acquires a shared lease", async () => { + const mainAuth = authFor("main"); + const mainClient = createMockClient("main"); + + resolveMatrixAuthMock.mockResolvedValue(mainAuth); + createMatrixClientMock.mockResolvedValue(mainClient); + + const client = await acquireSharedMatrixClient({ accountId: "main", startClient: false }); + + expect(client).toBe(mainClient); + expect(mainClient.start).not.toHaveBeenCalled(); + }); + + it("keeps shared clients alive until the last one-off lease releases", async () => { + const mainAuth = authFor("main"); + const mainClient = { + ...createMockClient("main"), + stopAndPersist: vi.fn(async () => undefined), + }; + + resolveMatrixAuthMock.mockResolvedValue(mainAuth); + createMatrixClientMock.mockResolvedValue(mainClient); + + const first = await acquireSharedMatrixClient({ accountId: "main", startClient: false }); + const second = await acquireSharedMatrixClient({ accountId: "main", startClient: false }); + + expect(first).toBe(mainClient); + expect(second).toBe(mainClient); + + expect( + await releaseSharedClientInstance(mainClient as unknown as import("../sdk.js").MatrixClient), + ).toBe(false); + expect(mainClient.stop).not.toHaveBeenCalled(); + + expect( + await releaseSharedClientInstance(mainClient as unknown as import("../sdk.js").MatrixClient), + ).toBe(true); + expect(mainClient.stop).toHaveBeenCalledTimes(1); + }); + it("rejects mismatched explicit account ids when auth is already resolved", async () => { await expect( resolveSharedMatrixClient({ diff --git a/extensions/matrix/src/matrix/client/shared.ts b/extensions/matrix/src/matrix/client/shared.ts index a7b1a14ce96..dc3186d2682 100644 --- a/extensions/matrix/src/matrix/client/shared.ts +++ b/extensions/matrix/src/matrix/client/shared.ts @@ -12,6 +12,7 @@ type SharedMatrixClientState = { started: boolean; cryptoReady: boolean; startPromise: Promise | null; + leases: number; }; const sharedClientStates = new Map(); @@ -48,9 +49,24 @@ async function createSharedMatrixClient(params: { started: false, cryptoReady: false, startPromise: null, + leases: 0, }; } +function findSharedClientStateByInstance(client: MatrixClient): SharedMatrixClientState | null { + for (const state of sharedClientStates.values()) { + if (state.client === client) { + return state; + } + } + return null; +} + +function deleteSharedClientState(state: SharedMatrixClientState): void { + sharedClientStates.delete(state.key); + sharedClientPromises.delete(state.key); +} + async function ensureSharedClientStarted(params: { state: SharedMatrixClientState; timeoutMs?: number; @@ -92,7 +108,7 @@ async function ensureSharedClientStarted(params: { } } -export async function resolveSharedMatrixClient( +async function resolveSharedMatrixClientState( params: { cfg?: CoreConfig; env?: NodeJS.ProcessEnv; @@ -101,7 +117,7 @@ export async function resolveSharedMatrixClient( startClient?: boolean; accountId?: string | null; } = {}, -): Promise { +): Promise { const requestedAccountId = normalizeOptionalAccountId(params.accountId); if (params.auth && requestedAccountId && requestedAccountId !== params.auth.accountId) { throw new Error( @@ -135,7 +151,7 @@ export async function resolveSharedMatrixClient( encryption: auth.encryption, }); } - return existingState.client; + return existingState; } const existingPromise = sharedClientPromises.get(key); @@ -149,7 +165,7 @@ export async function resolveSharedMatrixClient( encryption: auth.encryption, }); } - return pending.client; + return pending; } const creationPromise = createSharedMatrixClient({ @@ -169,12 +185,41 @@ export async function resolveSharedMatrixClient( encryption: auth.encryption, }); } - return created.client; + return created; } finally { sharedClientPromises.delete(key); } } +export async function resolveSharedMatrixClient( + params: { + cfg?: CoreConfig; + env?: NodeJS.ProcessEnv; + timeoutMs?: number; + auth?: MatrixAuth; + startClient?: boolean; + accountId?: string | null; + } = {}, +): Promise { + const state = await resolveSharedMatrixClientState(params); + return state.client; +} + +export async function acquireSharedMatrixClient( + params: { + cfg?: CoreConfig; + env?: NodeJS.ProcessEnv; + timeoutMs?: number; + auth?: MatrixAuth; + startClient?: boolean; + accountId?: string | null; + } = {}, +): Promise { + const state = await resolveSharedMatrixClientState(params); + state.leases += 1; + return state.client; +} + export function stopSharedClient(): void { for (const state of sharedClientStates.values()) { state.client.stop(); @@ -190,20 +235,16 @@ export function stopSharedClientForAccount(auth: MatrixAuth): void { return; } state.client.stop(); - sharedClientStates.delete(key); - sharedClientPromises.delete(key); + deleteSharedClientState(state); } export function removeSharedClientInstance(client: MatrixClient): boolean { - for (const [key, state] of sharedClientStates.entries()) { - if (state.client !== client) { - continue; - } - sharedClientStates.delete(key); - sharedClientPromises.delete(key); - return true; + const state = findSharedClientStateByInstance(client); + if (!state) { + return false; } - return false; + deleteSharedClientState(state); + return true; } export function stopSharedClientInstance(client: MatrixClient): void { @@ -212,3 +253,24 @@ export function stopSharedClientInstance(client: MatrixClient): void { } client.stop(); } + +export async function releaseSharedClientInstance( + client: MatrixClient, + mode: "stop" | "persist" = "stop", +): Promise { + const state = findSharedClientStateByInstance(client); + if (!state) { + return false; + } + state.leases = Math.max(0, state.leases - 1); + if (state.leases > 0) { + return false; + } + deleteSharedClientState(state); + if (mode === "persist") { + await client.stopAndPersist(); + } else { + client.stop(); + } + return true; +} diff --git a/extensions/matrix/src/matrix/send.ts b/extensions/matrix/src/matrix/send.ts index 411fdbc10d9..f0fcf75c6f7 100644 --- a/extensions/matrix/src/matrix/send.ts +++ b/extensions/matrix/src/matrix/send.ts @@ -63,7 +63,7 @@ function normalizeMatrixClientResolveOpts( export async function sendMessageMatrix( to: string, - message: string, + message: string | undefined, opts: MatrixSendOpts = {}, ): Promise { const trimmedMessage = message?.trim() ?? ""; diff --git a/extensions/matrix/src/matrix/send/client.test.ts b/extensions/matrix/src/matrix/send/client.test.ts index 8f29130876e..5b5a17d23bb 100644 --- a/extensions/matrix/src/matrix/send/client.test.ts +++ b/extensions/matrix/src/matrix/send/client.test.ts @@ -8,8 +8,8 @@ import { const { getMatrixRuntimeMock, getActiveMatrixClientMock, - resolveSharedMatrixClientMock, - removeSharedClientInstanceMock, + acquireSharedMatrixClientMock, + releaseSharedClientInstanceMock, isBunRuntimeMock, resolveMatrixAuthContextMock, } = matrixClientResolverMocks; @@ -19,13 +19,13 @@ vi.mock("../active-client.js", () => ({ })); vi.mock("../client.js", () => ({ - resolveSharedMatrixClient: (...args: unknown[]) => resolveSharedMatrixClientMock(...args), + acquireSharedMatrixClient: (...args: unknown[]) => acquireSharedMatrixClientMock(...args), isBunRuntime: () => isBunRuntimeMock(), resolveMatrixAuthContext: resolveMatrixAuthContextMock, })); vi.mock("../client/shared.js", () => ({ - removeSharedClientInstance: (...args: unknown[]) => removeSharedClientInstanceMock(...args), + releaseSharedClientInstance: (...args: unknown[]) => releaseSharedClientInstanceMock(...args), })); vi.mock("../../runtime.js", () => ({ @@ -54,16 +54,16 @@ describe("withResolvedMatrixClient", () => { const result = await withResolvedMatrixClient({ accountId: "default" }, async () => "ok"); expect(getActiveMatrixClientMock).toHaveBeenCalledWith("default"); - expect(resolveSharedMatrixClientMock).toHaveBeenCalledTimes(1); - expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({ + expect(acquireSharedMatrixClientMock).toHaveBeenCalledTimes(1); + expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({ cfg: {}, timeoutMs: undefined, accountId: "default", + startClient: false, }); - const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value; + const sharedClient = await acquireSharedMatrixClientMock.mock.results[0]?.value; expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1); - expect(sharedClient.stop).toHaveBeenCalledTimes(1); - expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); + expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop"); expect(result).toBe("ok"); }); @@ -77,7 +77,7 @@ describe("withResolvedMatrixClient", () => { }); expect(result).toBe("ok"); - expect(resolveSharedMatrixClientMock).not.toHaveBeenCalled(); + expect(acquireSharedMatrixClientMock).not.toHaveBeenCalled(); expect(activeClient.stop).not.toHaveBeenCalled(); }); @@ -91,10 +91,11 @@ describe("withResolvedMatrixClient", () => { await withResolvedMatrixClient({}, async () => {}); expect(getActiveMatrixClientMock).toHaveBeenCalledWith("ops"); - expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({ + expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({ cfg: {}, timeoutMs: undefined, accountId: "ops", + startClient: false, }); }); @@ -114,16 +115,17 @@ describe("withResolvedMatrixClient", () => { cfg: explicitCfg, accountId: "ops", }); - expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({ + expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({ cfg: explicitCfg, timeoutMs: undefined, accountId: "ops", + startClient: false, }); }); it("stops shared matrix clients when wrapped sends fail", async () => { const sharedClient = createMockMatrixClient(); - resolveSharedMatrixClientMock.mockResolvedValue(sharedClient); + acquireSharedMatrixClientMock.mockResolvedValue(sharedClient); await expect( withResolvedMatrixClient({ accountId: "default" }, async () => { @@ -131,7 +133,6 @@ describe("withResolvedMatrixClient", () => { }), ).rejects.toThrow("boom"); - expect(sharedClient.stop).toHaveBeenCalledTimes(1); - expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient); + expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop"); }); }); diff --git a/extensions/matrix/src/tool-actions.test.ts b/extensions/matrix/src/tool-actions.test.ts index bcb5380e845..d917f33090f 100644 --- a/extensions/matrix/src/tool-actions.test.ts +++ b/extensions/matrix/src/tool-actions.test.ts @@ -207,6 +207,29 @@ describe("handleMatrixAction pollVote", () => { }); }); + it("accepts media-only message sends", async () => { + const cfg = { channels: { matrix: { actions: { messages: true } } } } as CoreConfig; + await handleMatrixAction( + { + action: "sendMessage", + accountId: "ops", + to: "room:!room:example", + mediaUrl: "file:///tmp/photo.png", + }, + cfg, + { mediaLocalRoots: ["/tmp/openclaw-matrix-test"] }, + ); + + expect(mocks.sendMatrixMessage).toHaveBeenCalledWith("room:!room:example", undefined, { + cfg, + accountId: "ops", + mediaUrl: "file:///tmp/photo.png", + mediaLocalRoots: ["/tmp/openclaw-matrix-test"], + replyToId: undefined, + threadId: undefined, + }); + }); + it("passes mediaLocalRoots to profile updates", async () => { const cfg = { channels: { matrix: { actions: { profile: true } } } } as CoreConfig; await handleMatrixAction( diff --git a/extensions/matrix/src/tool-actions.ts b/extensions/matrix/src/tool-actions.ts index de191513cbf..2003789e502 100644 --- a/extensions/matrix/src/tool-actions.ts +++ b/extensions/matrix/src/tool-actions.ts @@ -195,11 +195,11 @@ export async function handleMatrixAction( switch (action) { case "sendMessage": { const to = readStringParam(params, "to", { required: true }); + const mediaUrl = readStringParam(params, "mediaUrl"); const content = readStringParam(params, "content", { - required: true, + required: !mediaUrl, allowEmpty: true, }); - const mediaUrl = readStringParam(params, "mediaUrl"); const replyToId = readStringParam(params, "replyToId") ?? readStringParam(params, "replyTo"); const threadId = readStringParam(params, "threadId"); diff --git a/src/agents/pi-embedded-runner/compaction-failures.ts b/src/agents/pi-embedded-runner/compaction-failures.ts new file mode 100644 index 00000000000..e3186b648f1 --- /dev/null +++ b/src/agents/pi-embedded-runner/compaction-failures.ts @@ -0,0 +1,24 @@ +import { EventEmitter } from "node:events"; +import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js"; +import { isCompactionFailureError } from "../pi-embedded-helpers.js"; +import { log } from "./logger.js"; +import { describeUnknownError } from "./utils.js"; + +const compactionFailureEmitter = new EventEmitter(); + +export type CompactionFailureListener = (reason: string) => void; + +export function onUnhandledCompactionFailure(cb: CompactionFailureListener): () => void { + compactionFailureEmitter.on("failure", cb); + return () => compactionFailureEmitter.off("failure", cb); +} + +registerUnhandledRejectionHandler((reason) => { + const message = describeUnknownError(reason); + if (!isCompactionFailureError(message)) { + return false; + } + log.error(`Auto-compaction failed (unhandled): ${message}`); + compactionFailureEmitter.emit("failure", message); + return true; +}); diff --git a/src/agents/pi-embedded-runner/sanitize-session-history.thinking.test.ts b/src/agents/pi-embedded-runner/sanitize-session-history.thinking.test.ts new file mode 100644 index 00000000000..55701ef8669 --- /dev/null +++ b/src/agents/pi-embedded-runner/sanitize-session-history.thinking.test.ts @@ -0,0 +1,39 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { describe, expect, it } from "vitest"; +import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js"; +import { sanitizeSessionHistory } from "./transcript-hygiene.js"; + +describe("sanitizeSessionHistory thinking stripping", () => { + it("strips latest thinking blocks for non-Anthropic providers that reject replayed thinking", async () => { + const sm = SessionManager.inMemory(); + const messages: AgentMessage[] = [ + makeAgentAssistantMessage({ + provider: "github-copilot", + model: "claude-3.7-sonnet", + api: "openai-completions", + content: [ + { type: "thinking", thinking: "internal" }, + { type: "text", text: "final answer" }, + ], + timestamp: 1, + }), + ]; + + const sanitized = await sanitizeSessionHistory({ + messages, + modelApi: "openai-completions", + provider: "github-copilot", + modelId: "claude-3.7-sonnet", + sessionManager: sm, + sessionId: "test", + }); + + expect(sanitized).not.toBe(messages); + const assistant = sanitized.find( + (message): message is Extract => + Boolean(message && typeof message === "object" && message.role === "assistant"), + ); + expect(assistant?.content).toEqual([{ type: "text", text: "final answer" }]); + }); +}); diff --git a/src/channels/plugins/setup-helpers.test.ts b/src/channels/plugins/setup-helpers.test.ts index 4111986e175..1abe3e47825 100644 --- a/src/channels/plugins/setup-helpers.test.ts +++ b/src/channels/plugins/setup-helpers.test.ts @@ -5,6 +5,7 @@ import { applySetupAccountConfigPatch, createEnvPatchedAccountSetupAdapter, createPatchedAccountSetupAdapter, + moveSingleAccountChannelSectionToDefaultAccount, prepareScopedSetupConfig, } from "./setup-helpers.js"; @@ -163,6 +164,43 @@ describe("createPatchedAccountSetupAdapter", () => { }); }); +describe("moveSingleAccountChannelSectionToDefaultAccount", () => { + it("promotes legacy Matrix keys into the sole named account when defaultAccount is unset", () => { + const next = moveSingleAccountChannelSectionToDefaultAccount({ + cfg: asConfig({ + channels: { + matrix: { + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + accounts: { + main: { + enabled: true, + }, + }, + }, + }, + }), + channelKey: "matrix", + }); + + expect(next.channels?.matrix).toMatchObject({ + accounts: { + main: { + enabled: true, + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + }, + }, + }); + expect(next.channels?.matrix?.accounts?.default).toBeUndefined(); + expect(next.channels?.matrix?.homeserver).toBeUndefined(); + expect(next.channels?.matrix?.userId).toBeUndefined(); + expect(next.channels?.matrix?.accessToken).toBeUndefined(); + }); +}); + describe("createEnvPatchedAccountSetupAdapter", () => { it("rejects env mode for named accounts and requires credentials otherwise", () => { const adapter = createEnvPatchedAccountSetupAdapter({ diff --git a/src/channels/plugins/setup-helpers.ts b/src/channels/plugins/setup-helpers.ts index 99332591bf0..c622769af0b 100644 --- a/src/channels/plugins/setup-helpers.ts +++ b/src/channels/plugins/setup-helpers.ts @@ -447,16 +447,33 @@ export function resolveSingleAccountPromotionTarget(params: { if (params.channelKey !== "matrix") { return DEFAULT_ACCOUNT_ID; } - const normalizedDefaultAccount = normalizeAccountId(params.channel.defaultAccount); - if (normalizedDefaultAccount === DEFAULT_ACCOUNT_ID) { + const accounts = params.channel.accounts ?? {}; + const normalizedDefaultAccount = + typeof params.channel.defaultAccount === "string" && params.channel.defaultAccount.trim() + ? normalizeAccountId(params.channel.defaultAccount) + : undefined; + if (normalizedDefaultAccount) { + if ( + normalizedDefaultAccount !== DEFAULT_ACCOUNT_ID && + accounts[normalizedDefaultAccount] && + typeof accounts[normalizedDefaultAccount] === "object" + ) { + return normalizedDefaultAccount; + } return DEFAULT_ACCOUNT_ID; } - const accounts = params.channel.accounts ?? {}; + const namedAccounts = Object.entries(accounts).filter( + ([accountId, value]) => accountId && typeof value === "object" && value, + ); + if (namedAccounts.length === 1) { + return namedAccounts[0][0]; + } if ( - accounts[normalizedDefaultAccount] && - typeof accounts[normalizedDefaultAccount] === "object" + namedAccounts.length > 1 && + accounts[DEFAULT_ACCOUNT_ID] && + typeof accounts[DEFAULT_ACCOUNT_ID] === "object" ) { - return normalizedDefaultAccount; + return DEFAULT_ACCOUNT_ID; } return DEFAULT_ACCOUNT_ID; }