mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-20 22:40:58 +00:00
Matrix: fix one-off client and media-send regressions
This commit is contained in:
@@ -155,4 +155,28 @@ describe("matrixMessageActions account propagation", () => {
|
||||
{ mediaLocalRoots: ["/tmp/openclaw-matrix-test"] },
|
||||
);
|
||||
});
|
||||
|
||||
it("allows media-only sends without requiring a message body", async () => {
|
||||
await matrixMessageActions.handleAction?.(
|
||||
createContext({
|
||||
action: "send",
|
||||
accountId: "ops",
|
||||
params: {
|
||||
to: "room:!room:example",
|
||||
media: "file:///tmp/photo.png",
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(mocks.handleMatrixAction).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
action: "sendMessage",
|
||||
accountId: "ops",
|
||||
content: undefined,
|
||||
mediaUrl: "file:///tmp/photo.png",
|
||||
}),
|
||||
expect.any(Object),
|
||||
{ mediaLocalRoots: undefined },
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -114,11 +114,11 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
|
||||
|
||||
if (action === "send") {
|
||||
const to = readStringParam(params, "to", { required: true });
|
||||
const mediaUrl = readStringParam(params, "media", { trim: false });
|
||||
const content = readStringParam(params, "message", {
|
||||
required: true,
|
||||
required: !mediaUrl,
|
||||
allowEmpty: true,
|
||||
});
|
||||
const mediaUrl = readStringParam(params, "media", { trim: false });
|
||||
const replyTo = readStringParam(params, "replyTo");
|
||||
const threadId = readStringParam(params, "threadId");
|
||||
return await dispatch({
|
||||
|
||||
@@ -11,8 +11,8 @@ const {
|
||||
loadConfigMock,
|
||||
getMatrixRuntimeMock,
|
||||
getActiveMatrixClientMock,
|
||||
resolveSharedMatrixClientMock,
|
||||
removeSharedClientInstanceMock,
|
||||
acquireSharedMatrixClientMock,
|
||||
releaseSharedClientInstanceMock,
|
||||
isBunRuntimeMock,
|
||||
resolveMatrixAuthContextMock,
|
||||
} = matrixClientResolverMocks;
|
||||
@@ -26,13 +26,13 @@ vi.mock("../active-client.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("../client.js", () => ({
|
||||
resolveSharedMatrixClient: resolveSharedMatrixClientMock,
|
||||
acquireSharedMatrixClient: acquireSharedMatrixClientMock,
|
||||
isBunRuntime: () => isBunRuntimeMock(),
|
||||
resolveMatrixAuthContext: resolveMatrixAuthContextMock,
|
||||
}));
|
||||
|
||||
vi.mock("../client/shared.js", () => ({
|
||||
removeSharedClientInstance: (...args: unknown[]) => removeSharedClientInstanceMock(...args),
|
||||
releaseSharedClientInstance: (...args: unknown[]) => releaseSharedClientInstanceMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../send.js", () => ({
|
||||
@@ -65,37 +65,35 @@ describe("action client helpers", () => {
|
||||
const result = await withResolvedActionClient({ accountId: "default" }, async () => "ok");
|
||||
|
||||
expect(getActiveMatrixClientMock).toHaveBeenCalledWith("default");
|
||||
expect(resolveSharedMatrixClientMock).toHaveBeenCalledTimes(1);
|
||||
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
expect(acquireSharedMatrixClientMock).toHaveBeenCalledTimes(1);
|
||||
expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
cfg: {},
|
||||
timeoutMs: undefined,
|
||||
accountId: "default",
|
||||
startClient: false,
|
||||
});
|
||||
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
|
||||
const sharedClient = await acquireSharedMatrixClientMock.mock.results[0]?.value;
|
||||
expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1);
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop");
|
||||
expect(result).toBe("ok");
|
||||
});
|
||||
|
||||
it("skips one-off room preparation when readiness is disabled", async () => {
|
||||
await withResolvedActionClient({ accountId: "default", readiness: "none" }, async () => {});
|
||||
|
||||
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
|
||||
const sharedClient = await acquireSharedMatrixClientMock.mock.results[0]?.value;
|
||||
expect(sharedClient.prepareForOneOff).not.toHaveBeenCalled();
|
||||
expect(sharedClient.start).not.toHaveBeenCalled();
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop");
|
||||
});
|
||||
|
||||
it("starts one-off clients when started readiness is required", async () => {
|
||||
await withStartedActionClient({ accountId: "default" }, async () => {});
|
||||
|
||||
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
|
||||
const sharedClient = await acquireSharedMatrixClientMock.mock.results[0]?.value;
|
||||
expect(sharedClient.start).toHaveBeenCalledTimes(1);
|
||||
expect(sharedClient.prepareForOneOff).not.toHaveBeenCalled();
|
||||
expect(sharedClient.stop).not.toHaveBeenCalled();
|
||||
expect(sharedClient.stopAndPersist).toHaveBeenCalledTimes(1);
|
||||
expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "persist");
|
||||
});
|
||||
|
||||
it("reuses active monitor client when available", async () => {
|
||||
@@ -108,7 +106,7 @@ describe("action client helpers", () => {
|
||||
});
|
||||
|
||||
expect(result).toBe("ok");
|
||||
expect(resolveSharedMatrixClientMock).not.toHaveBeenCalled();
|
||||
expect(acquireSharedMatrixClientMock).not.toHaveBeenCalled();
|
||||
expect(activeClient.stop).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -155,10 +153,11 @@ describe("action client helpers", () => {
|
||||
await withResolvedActionClient({}, async () => {});
|
||||
|
||||
expect(getActiveMatrixClientMock).toHaveBeenCalledWith("ops");
|
||||
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
cfg: loadConfigMock(),
|
||||
timeoutMs: undefined,
|
||||
accountId: "ops",
|
||||
startClient: false,
|
||||
});
|
||||
});
|
||||
|
||||
@@ -178,16 +177,17 @@ describe("action client helpers", () => {
|
||||
cfg: explicitCfg,
|
||||
accountId: "ops",
|
||||
});
|
||||
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
cfg: explicitCfg,
|
||||
timeoutMs: undefined,
|
||||
accountId: "ops",
|
||||
startClient: false,
|
||||
});
|
||||
});
|
||||
|
||||
it("stops shared action clients after wrapped calls succeed", async () => {
|
||||
const sharedClient = createMockMatrixClient();
|
||||
resolveSharedMatrixClientMock.mockResolvedValue(sharedClient);
|
||||
acquireSharedMatrixClientMock.mockResolvedValue(sharedClient);
|
||||
|
||||
const result = await withResolvedActionClient({ accountId: "default" }, async (client) => {
|
||||
expect(client).toBe(sharedClient);
|
||||
@@ -195,14 +195,12 @@ describe("action client helpers", () => {
|
||||
});
|
||||
|
||||
expect(result).toBe("ok");
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(sharedClient.stopAndPersist).not.toHaveBeenCalled();
|
||||
expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop");
|
||||
});
|
||||
|
||||
it("stops shared action clients when the wrapped call throws", async () => {
|
||||
const sharedClient = createMockMatrixClient();
|
||||
resolveSharedMatrixClientMock.mockResolvedValue(sharedClient);
|
||||
acquireSharedMatrixClientMock.mockResolvedValue(sharedClient);
|
||||
|
||||
await expect(
|
||||
withResolvedActionClient({ accountId: "default" }, async () => {
|
||||
@@ -210,13 +208,12 @@ describe("action client helpers", () => {
|
||||
}),
|
||||
).rejects.toThrow("boom");
|
||||
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(sharedClient.stopAndPersist).not.toHaveBeenCalled();
|
||||
expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop");
|
||||
});
|
||||
|
||||
it("resolves room ids before running wrapped room actions", async () => {
|
||||
const sharedClient = createMockMatrixClient();
|
||||
resolveSharedMatrixClientMock.mockResolvedValue(sharedClient);
|
||||
acquireSharedMatrixClientMock.mockResolvedValue(sharedClient);
|
||||
resolveMatrixRoomIdMock.mockResolvedValue("!room:example.org");
|
||||
|
||||
const result = await withResolvedRoomAction(
|
||||
@@ -230,6 +227,6 @@ describe("action client helpers", () => {
|
||||
|
||||
expect(resolveMatrixRoomIdMock).toHaveBeenCalledWith(sharedClient, "room:#ops:example.org");
|
||||
expect(result).toBe("!room:example.org");
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -16,7 +16,7 @@ import {
|
||||
|
||||
export async function sendMatrixMessage(
|
||||
to: string,
|
||||
content: string,
|
||||
content: string | undefined,
|
||||
opts: MatrixActionClientOpts & {
|
||||
mediaUrl?: string;
|
||||
replyToId?: string;
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import { getMatrixRuntime } from "../runtime.js";
|
||||
import type { CoreConfig } from "../types.js";
|
||||
import { getActiveMatrixClient } from "./active-client.js";
|
||||
import { isBunRuntime, resolveMatrixAuthContext, resolveSharedMatrixClient } from "./client.js";
|
||||
import { removeSharedClientInstance } from "./client/shared.js";
|
||||
import { acquireSharedMatrixClient, isBunRuntime, resolveMatrixAuthContext } from "./client.js";
|
||||
import { releaseSharedClientInstance } from "./client/shared.js";
|
||||
import type { MatrixClient } from "./sdk.js";
|
||||
|
||||
type ResolvedRuntimeMatrixClient = {
|
||||
@@ -63,22 +63,18 @@ async function resolveRuntimeMatrixClient(opts: {
|
||||
return { client: active, stopOnDone: false };
|
||||
}
|
||||
|
||||
const client = await resolveSharedMatrixClient({
|
||||
const client = await acquireSharedMatrixClient({
|
||||
cfg,
|
||||
timeoutMs: opts.timeoutMs,
|
||||
accountId: authContext.accountId,
|
||||
startClient: false,
|
||||
});
|
||||
await opts.onResolved?.(client, { preparedByDefault: true });
|
||||
return {
|
||||
client,
|
||||
stopOnDone: true,
|
||||
cleanup: async (mode) => {
|
||||
if (mode === "persist") {
|
||||
await client.stopAndPersist();
|
||||
} else {
|
||||
client.stop();
|
||||
}
|
||||
removeSharedClientInstance(client);
|
||||
await releaseSharedClientInstance(client, mode);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -5,8 +5,8 @@ type MatrixClientResolverMocks = {
|
||||
loadConfigMock: Mock<() => unknown>;
|
||||
getMatrixRuntimeMock: Mock<() => unknown>;
|
||||
getActiveMatrixClientMock: Mock<(...args: unknown[]) => MatrixClient | null>;
|
||||
resolveSharedMatrixClientMock: Mock<(...args: unknown[]) => Promise<MatrixClient>>;
|
||||
removeSharedClientInstanceMock: Mock<(...args: unknown[]) => void>;
|
||||
acquireSharedMatrixClientMock: Mock<(...args: unknown[]) => Promise<MatrixClient>>;
|
||||
releaseSharedClientInstanceMock: Mock<(...args: unknown[]) => Promise<boolean>>;
|
||||
isBunRuntimeMock: Mock<() => boolean>;
|
||||
resolveMatrixAuthContextMock: Mock<
|
||||
(params: { cfg: unknown; accountId?: string | null }) => unknown
|
||||
@@ -17,8 +17,8 @@ export const matrixClientResolverMocks: MatrixClientResolverMocks = {
|
||||
loadConfigMock: vi.fn(() => ({})),
|
||||
getMatrixRuntimeMock: vi.fn(),
|
||||
getActiveMatrixClientMock: vi.fn(),
|
||||
resolveSharedMatrixClientMock: vi.fn(),
|
||||
removeSharedClientInstanceMock: vi.fn(),
|
||||
acquireSharedMatrixClientMock: vi.fn(),
|
||||
releaseSharedClientInstanceMock: vi.fn(),
|
||||
isBunRuntimeMock: vi.fn(() => false),
|
||||
resolveMatrixAuthContextMock: vi.fn(),
|
||||
};
|
||||
@@ -43,8 +43,8 @@ export function primeMatrixClientResolverMocks(params?: {
|
||||
loadConfigMock,
|
||||
getMatrixRuntimeMock,
|
||||
getActiveMatrixClientMock,
|
||||
resolveSharedMatrixClientMock,
|
||||
removeSharedClientInstanceMock,
|
||||
acquireSharedMatrixClientMock,
|
||||
releaseSharedClientInstanceMock,
|
||||
isBunRuntimeMock,
|
||||
resolveMatrixAuthContextMock,
|
||||
} = matrixClientResolverMocks;
|
||||
@@ -70,7 +70,7 @@ export function primeMatrixClientResolverMocks(params?: {
|
||||
});
|
||||
getActiveMatrixClientMock.mockReturnValue(null);
|
||||
isBunRuntimeMock.mockReturnValue(false);
|
||||
removeSharedClientInstanceMock.mockReset();
|
||||
releaseSharedClientInstanceMock.mockReset().mockResolvedValue(true);
|
||||
resolveMatrixAuthContextMock.mockImplementation(
|
||||
({
|
||||
cfg: explicitCfg,
|
||||
@@ -88,7 +88,7 @@ export function primeMatrixClientResolverMocks(params?: {
|
||||
},
|
||||
}),
|
||||
);
|
||||
resolveSharedMatrixClientMock.mockResolvedValue(client);
|
||||
acquireSharedMatrixClientMock.mockResolvedValue(client);
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
@@ -11,7 +11,9 @@ export {
|
||||
} from "./client/config.js";
|
||||
export { createMatrixClient } from "./client/create-client.js";
|
||||
export {
|
||||
acquireSharedMatrixClient,
|
||||
removeSharedClientInstance,
|
||||
releaseSharedClientInstance,
|
||||
resolveSharedMatrixClient,
|
||||
stopSharedClientForAccount,
|
||||
stopSharedClientInstance,
|
||||
|
||||
@@ -15,6 +15,8 @@ vi.mock("./create-client.js", () => ({
|
||||
}));
|
||||
|
||||
import {
|
||||
acquireSharedMatrixClient,
|
||||
releaseSharedClientInstance,
|
||||
resolveSharedMatrixClient,
|
||||
stopSharedClient,
|
||||
stopSharedClientForAccount,
|
||||
@@ -174,6 +176,46 @@ describe("resolveSharedMatrixClient", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("honors startClient false even when the caller acquires a shared lease", async () => {
|
||||
const mainAuth = authFor("main");
|
||||
const mainClient = createMockClient("main");
|
||||
|
||||
resolveMatrixAuthMock.mockResolvedValue(mainAuth);
|
||||
createMatrixClientMock.mockResolvedValue(mainClient);
|
||||
|
||||
const client = await acquireSharedMatrixClient({ accountId: "main", startClient: false });
|
||||
|
||||
expect(client).toBe(mainClient);
|
||||
expect(mainClient.start).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps shared clients alive until the last one-off lease releases", async () => {
|
||||
const mainAuth = authFor("main");
|
||||
const mainClient = {
|
||||
...createMockClient("main"),
|
||||
stopAndPersist: vi.fn(async () => undefined),
|
||||
};
|
||||
|
||||
resolveMatrixAuthMock.mockResolvedValue(mainAuth);
|
||||
createMatrixClientMock.mockResolvedValue(mainClient);
|
||||
|
||||
const first = await acquireSharedMatrixClient({ accountId: "main", startClient: false });
|
||||
const second = await acquireSharedMatrixClient({ accountId: "main", startClient: false });
|
||||
|
||||
expect(first).toBe(mainClient);
|
||||
expect(second).toBe(mainClient);
|
||||
|
||||
expect(
|
||||
await releaseSharedClientInstance(mainClient as unknown as import("../sdk.js").MatrixClient),
|
||||
).toBe(false);
|
||||
expect(mainClient.stop).not.toHaveBeenCalled();
|
||||
|
||||
expect(
|
||||
await releaseSharedClientInstance(mainClient as unknown as import("../sdk.js").MatrixClient),
|
||||
).toBe(true);
|
||||
expect(mainClient.stop).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("rejects mismatched explicit account ids when auth is already resolved", async () => {
|
||||
await expect(
|
||||
resolveSharedMatrixClient({
|
||||
|
||||
@@ -12,6 +12,7 @@ type SharedMatrixClientState = {
|
||||
started: boolean;
|
||||
cryptoReady: boolean;
|
||||
startPromise: Promise<void> | null;
|
||||
leases: number;
|
||||
};
|
||||
|
||||
const sharedClientStates = new Map<string, SharedMatrixClientState>();
|
||||
@@ -48,9 +49,24 @@ async function createSharedMatrixClient(params: {
|
||||
started: false,
|
||||
cryptoReady: false,
|
||||
startPromise: null,
|
||||
leases: 0,
|
||||
};
|
||||
}
|
||||
|
||||
function findSharedClientStateByInstance(client: MatrixClient): SharedMatrixClientState | null {
|
||||
for (const state of sharedClientStates.values()) {
|
||||
if (state.client === client) {
|
||||
return state;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function deleteSharedClientState(state: SharedMatrixClientState): void {
|
||||
sharedClientStates.delete(state.key);
|
||||
sharedClientPromises.delete(state.key);
|
||||
}
|
||||
|
||||
async function ensureSharedClientStarted(params: {
|
||||
state: SharedMatrixClientState;
|
||||
timeoutMs?: number;
|
||||
@@ -92,7 +108,7 @@ async function ensureSharedClientStarted(params: {
|
||||
}
|
||||
}
|
||||
|
||||
export async function resolveSharedMatrixClient(
|
||||
async function resolveSharedMatrixClientState(
|
||||
params: {
|
||||
cfg?: CoreConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
@@ -101,7 +117,7 @@ export async function resolveSharedMatrixClient(
|
||||
startClient?: boolean;
|
||||
accountId?: string | null;
|
||||
} = {},
|
||||
): Promise<MatrixClient> {
|
||||
): Promise<SharedMatrixClientState> {
|
||||
const requestedAccountId = normalizeOptionalAccountId(params.accountId);
|
||||
if (params.auth && requestedAccountId && requestedAccountId !== params.auth.accountId) {
|
||||
throw new Error(
|
||||
@@ -135,7 +151,7 @@ export async function resolveSharedMatrixClient(
|
||||
encryption: auth.encryption,
|
||||
});
|
||||
}
|
||||
return existingState.client;
|
||||
return existingState;
|
||||
}
|
||||
|
||||
const existingPromise = sharedClientPromises.get(key);
|
||||
@@ -149,7 +165,7 @@ export async function resolveSharedMatrixClient(
|
||||
encryption: auth.encryption,
|
||||
});
|
||||
}
|
||||
return pending.client;
|
||||
return pending;
|
||||
}
|
||||
|
||||
const creationPromise = createSharedMatrixClient({
|
||||
@@ -169,12 +185,41 @@ export async function resolveSharedMatrixClient(
|
||||
encryption: auth.encryption,
|
||||
});
|
||||
}
|
||||
return created.client;
|
||||
return created;
|
||||
} finally {
|
||||
sharedClientPromises.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
export async function resolveSharedMatrixClient(
|
||||
params: {
|
||||
cfg?: CoreConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
timeoutMs?: number;
|
||||
auth?: MatrixAuth;
|
||||
startClient?: boolean;
|
||||
accountId?: string | null;
|
||||
} = {},
|
||||
): Promise<MatrixClient> {
|
||||
const state = await resolveSharedMatrixClientState(params);
|
||||
return state.client;
|
||||
}
|
||||
|
||||
export async function acquireSharedMatrixClient(
|
||||
params: {
|
||||
cfg?: CoreConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
timeoutMs?: number;
|
||||
auth?: MatrixAuth;
|
||||
startClient?: boolean;
|
||||
accountId?: string | null;
|
||||
} = {},
|
||||
): Promise<MatrixClient> {
|
||||
const state = await resolveSharedMatrixClientState(params);
|
||||
state.leases += 1;
|
||||
return state.client;
|
||||
}
|
||||
|
||||
export function stopSharedClient(): void {
|
||||
for (const state of sharedClientStates.values()) {
|
||||
state.client.stop();
|
||||
@@ -190,20 +235,16 @@ export function stopSharedClientForAccount(auth: MatrixAuth): void {
|
||||
return;
|
||||
}
|
||||
state.client.stop();
|
||||
sharedClientStates.delete(key);
|
||||
sharedClientPromises.delete(key);
|
||||
deleteSharedClientState(state);
|
||||
}
|
||||
|
||||
export function removeSharedClientInstance(client: MatrixClient): boolean {
|
||||
for (const [key, state] of sharedClientStates.entries()) {
|
||||
if (state.client !== client) {
|
||||
continue;
|
||||
}
|
||||
sharedClientStates.delete(key);
|
||||
sharedClientPromises.delete(key);
|
||||
return true;
|
||||
const state = findSharedClientStateByInstance(client);
|
||||
if (!state) {
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
deleteSharedClientState(state);
|
||||
return true;
|
||||
}
|
||||
|
||||
export function stopSharedClientInstance(client: MatrixClient): void {
|
||||
@@ -212,3 +253,24 @@ export function stopSharedClientInstance(client: MatrixClient): void {
|
||||
}
|
||||
client.stop();
|
||||
}
|
||||
|
||||
export async function releaseSharedClientInstance(
|
||||
client: MatrixClient,
|
||||
mode: "stop" | "persist" = "stop",
|
||||
): Promise<boolean> {
|
||||
const state = findSharedClientStateByInstance(client);
|
||||
if (!state) {
|
||||
return false;
|
||||
}
|
||||
state.leases = Math.max(0, state.leases - 1);
|
||||
if (state.leases > 0) {
|
||||
return false;
|
||||
}
|
||||
deleteSharedClientState(state);
|
||||
if (mode === "persist") {
|
||||
await client.stopAndPersist();
|
||||
} else {
|
||||
client.stop();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ function normalizeMatrixClientResolveOpts(
|
||||
|
||||
export async function sendMessageMatrix(
|
||||
to: string,
|
||||
message: string,
|
||||
message: string | undefined,
|
||||
opts: MatrixSendOpts = {},
|
||||
): Promise<MatrixSendResult> {
|
||||
const trimmedMessage = message?.trim() ?? "";
|
||||
|
||||
@@ -8,8 +8,8 @@ import {
|
||||
const {
|
||||
getMatrixRuntimeMock,
|
||||
getActiveMatrixClientMock,
|
||||
resolveSharedMatrixClientMock,
|
||||
removeSharedClientInstanceMock,
|
||||
acquireSharedMatrixClientMock,
|
||||
releaseSharedClientInstanceMock,
|
||||
isBunRuntimeMock,
|
||||
resolveMatrixAuthContextMock,
|
||||
} = matrixClientResolverMocks;
|
||||
@@ -19,13 +19,13 @@ vi.mock("../active-client.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("../client.js", () => ({
|
||||
resolveSharedMatrixClient: (...args: unknown[]) => resolveSharedMatrixClientMock(...args),
|
||||
acquireSharedMatrixClient: (...args: unknown[]) => acquireSharedMatrixClientMock(...args),
|
||||
isBunRuntime: () => isBunRuntimeMock(),
|
||||
resolveMatrixAuthContext: resolveMatrixAuthContextMock,
|
||||
}));
|
||||
|
||||
vi.mock("../client/shared.js", () => ({
|
||||
removeSharedClientInstance: (...args: unknown[]) => removeSharedClientInstanceMock(...args),
|
||||
releaseSharedClientInstance: (...args: unknown[]) => releaseSharedClientInstanceMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../../runtime.js", () => ({
|
||||
@@ -54,16 +54,16 @@ describe("withResolvedMatrixClient", () => {
|
||||
const result = await withResolvedMatrixClient({ accountId: "default" }, async () => "ok");
|
||||
|
||||
expect(getActiveMatrixClientMock).toHaveBeenCalledWith("default");
|
||||
expect(resolveSharedMatrixClientMock).toHaveBeenCalledTimes(1);
|
||||
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
expect(acquireSharedMatrixClientMock).toHaveBeenCalledTimes(1);
|
||||
expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
cfg: {},
|
||||
timeoutMs: undefined,
|
||||
accountId: "default",
|
||||
startClient: false,
|
||||
});
|
||||
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
|
||||
const sharedClient = await acquireSharedMatrixClientMock.mock.results[0]?.value;
|
||||
expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1);
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop");
|
||||
expect(result).toBe("ok");
|
||||
});
|
||||
|
||||
@@ -77,7 +77,7 @@ describe("withResolvedMatrixClient", () => {
|
||||
});
|
||||
|
||||
expect(result).toBe("ok");
|
||||
expect(resolveSharedMatrixClientMock).not.toHaveBeenCalled();
|
||||
expect(acquireSharedMatrixClientMock).not.toHaveBeenCalled();
|
||||
expect(activeClient.stop).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -91,10 +91,11 @@ describe("withResolvedMatrixClient", () => {
|
||||
await withResolvedMatrixClient({}, async () => {});
|
||||
|
||||
expect(getActiveMatrixClientMock).toHaveBeenCalledWith("ops");
|
||||
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
cfg: {},
|
||||
timeoutMs: undefined,
|
||||
accountId: "ops",
|
||||
startClient: false,
|
||||
});
|
||||
});
|
||||
|
||||
@@ -114,16 +115,17 @@ describe("withResolvedMatrixClient", () => {
|
||||
cfg: explicitCfg,
|
||||
accountId: "ops",
|
||||
});
|
||||
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
expect(acquireSharedMatrixClientMock).toHaveBeenCalledWith({
|
||||
cfg: explicitCfg,
|
||||
timeoutMs: undefined,
|
||||
accountId: "ops",
|
||||
startClient: false,
|
||||
});
|
||||
});
|
||||
|
||||
it("stops shared matrix clients when wrapped sends fail", async () => {
|
||||
const sharedClient = createMockMatrixClient();
|
||||
resolveSharedMatrixClientMock.mockResolvedValue(sharedClient);
|
||||
acquireSharedMatrixClientMock.mockResolvedValue(sharedClient);
|
||||
|
||||
await expect(
|
||||
withResolvedMatrixClient({ accountId: "default" }, async () => {
|
||||
@@ -131,7 +133,6 @@ describe("withResolvedMatrixClient", () => {
|
||||
}),
|
||||
).rejects.toThrow("boom");
|
||||
|
||||
expect(sharedClient.stop).toHaveBeenCalledTimes(1);
|
||||
expect(removeSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient);
|
||||
expect(releaseSharedClientInstanceMock).toHaveBeenCalledWith(sharedClient, "stop");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -207,6 +207,29 @@ describe("handleMatrixAction pollVote", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("accepts media-only message sends", async () => {
|
||||
const cfg = { channels: { matrix: { actions: { messages: true } } } } as CoreConfig;
|
||||
await handleMatrixAction(
|
||||
{
|
||||
action: "sendMessage",
|
||||
accountId: "ops",
|
||||
to: "room:!room:example",
|
||||
mediaUrl: "file:///tmp/photo.png",
|
||||
},
|
||||
cfg,
|
||||
{ mediaLocalRoots: ["/tmp/openclaw-matrix-test"] },
|
||||
);
|
||||
|
||||
expect(mocks.sendMatrixMessage).toHaveBeenCalledWith("room:!room:example", undefined, {
|
||||
cfg,
|
||||
accountId: "ops",
|
||||
mediaUrl: "file:///tmp/photo.png",
|
||||
mediaLocalRoots: ["/tmp/openclaw-matrix-test"],
|
||||
replyToId: undefined,
|
||||
threadId: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("passes mediaLocalRoots to profile updates", async () => {
|
||||
const cfg = { channels: { matrix: { actions: { profile: true } } } } as CoreConfig;
|
||||
await handleMatrixAction(
|
||||
|
||||
@@ -195,11 +195,11 @@ export async function handleMatrixAction(
|
||||
switch (action) {
|
||||
case "sendMessage": {
|
||||
const to = readStringParam(params, "to", { required: true });
|
||||
const mediaUrl = readStringParam(params, "mediaUrl");
|
||||
const content = readStringParam(params, "content", {
|
||||
required: true,
|
||||
required: !mediaUrl,
|
||||
allowEmpty: true,
|
||||
});
|
||||
const mediaUrl = readStringParam(params, "mediaUrl");
|
||||
const replyToId =
|
||||
readStringParam(params, "replyToId") ?? readStringParam(params, "replyTo");
|
||||
const threadId = readStringParam(params, "threadId");
|
||||
|
||||
24
src/agents/pi-embedded-runner/compaction-failures.ts
Normal file
24
src/agents/pi-embedded-runner/compaction-failures.ts
Normal file
@@ -0,0 +1,24 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js";
|
||||
import { isCompactionFailureError } from "../pi-embedded-helpers.js";
|
||||
import { log } from "./logger.js";
|
||||
import { describeUnknownError } from "./utils.js";
|
||||
|
||||
const compactionFailureEmitter = new EventEmitter();
|
||||
|
||||
export type CompactionFailureListener = (reason: string) => void;
|
||||
|
||||
export function onUnhandledCompactionFailure(cb: CompactionFailureListener): () => void {
|
||||
compactionFailureEmitter.on("failure", cb);
|
||||
return () => compactionFailureEmitter.off("failure", cb);
|
||||
}
|
||||
|
||||
registerUnhandledRejectionHandler((reason) => {
|
||||
const message = describeUnknownError(reason);
|
||||
if (!isCompactionFailureError(message)) {
|
||||
return false;
|
||||
}
|
||||
log.error(`Auto-compaction failed (unhandled): ${message}`);
|
||||
compactionFailureEmitter.emit("failure", message);
|
||||
return true;
|
||||
});
|
||||
@@ -0,0 +1,39 @@
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js";
|
||||
import { sanitizeSessionHistory } from "./transcript-hygiene.js";
|
||||
|
||||
describe("sanitizeSessionHistory thinking stripping", () => {
|
||||
it("strips latest thinking blocks for non-Anthropic providers that reject replayed thinking", async () => {
|
||||
const sm = SessionManager.inMemory();
|
||||
const messages: AgentMessage[] = [
|
||||
makeAgentAssistantMessage({
|
||||
provider: "github-copilot",
|
||||
model: "claude-3.7-sonnet",
|
||||
api: "openai-completions",
|
||||
content: [
|
||||
{ type: "thinking", thinking: "internal" },
|
||||
{ type: "text", text: "final answer" },
|
||||
],
|
||||
timestamp: 1,
|
||||
}),
|
||||
];
|
||||
|
||||
const sanitized = await sanitizeSessionHistory({
|
||||
messages,
|
||||
modelApi: "openai-completions",
|
||||
provider: "github-copilot",
|
||||
modelId: "claude-3.7-sonnet",
|
||||
sessionManager: sm,
|
||||
sessionId: "test",
|
||||
});
|
||||
|
||||
expect(sanitized).not.toBe(messages);
|
||||
const assistant = sanitized.find(
|
||||
(message): message is Extract<AgentMessage, { role: "assistant" }> =>
|
||||
Boolean(message && typeof message === "object" && message.role === "assistant"),
|
||||
);
|
||||
expect(assistant?.content).toEqual([{ type: "text", text: "final answer" }]);
|
||||
});
|
||||
});
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
applySetupAccountConfigPatch,
|
||||
createEnvPatchedAccountSetupAdapter,
|
||||
createPatchedAccountSetupAdapter,
|
||||
moveSingleAccountChannelSectionToDefaultAccount,
|
||||
prepareScopedSetupConfig,
|
||||
} from "./setup-helpers.js";
|
||||
|
||||
@@ -163,6 +164,43 @@ describe("createPatchedAccountSetupAdapter", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("moveSingleAccountChannelSectionToDefaultAccount", () => {
|
||||
it("promotes legacy Matrix keys into the sole named account when defaultAccount is unset", () => {
|
||||
const next = moveSingleAccountChannelSectionToDefaultAccount({
|
||||
cfg: asConfig({
|
||||
channels: {
|
||||
matrix: {
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "token",
|
||||
accounts: {
|
||||
main: {
|
||||
enabled: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
channelKey: "matrix",
|
||||
});
|
||||
|
||||
expect(next.channels?.matrix).toMatchObject({
|
||||
accounts: {
|
||||
main: {
|
||||
enabled: true,
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "token",
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(next.channels?.matrix?.accounts?.default).toBeUndefined();
|
||||
expect(next.channels?.matrix?.homeserver).toBeUndefined();
|
||||
expect(next.channels?.matrix?.userId).toBeUndefined();
|
||||
expect(next.channels?.matrix?.accessToken).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe("createEnvPatchedAccountSetupAdapter", () => {
|
||||
it("rejects env mode for named accounts and requires credentials otherwise", () => {
|
||||
const adapter = createEnvPatchedAccountSetupAdapter({
|
||||
|
||||
@@ -447,16 +447,33 @@ export function resolveSingleAccountPromotionTarget(params: {
|
||||
if (params.channelKey !== "matrix") {
|
||||
return DEFAULT_ACCOUNT_ID;
|
||||
}
|
||||
const normalizedDefaultAccount = normalizeAccountId(params.channel.defaultAccount);
|
||||
if (normalizedDefaultAccount === DEFAULT_ACCOUNT_ID) {
|
||||
const accounts = params.channel.accounts ?? {};
|
||||
const normalizedDefaultAccount =
|
||||
typeof params.channel.defaultAccount === "string" && params.channel.defaultAccount.trim()
|
||||
? normalizeAccountId(params.channel.defaultAccount)
|
||||
: undefined;
|
||||
if (normalizedDefaultAccount) {
|
||||
if (
|
||||
normalizedDefaultAccount !== DEFAULT_ACCOUNT_ID &&
|
||||
accounts[normalizedDefaultAccount] &&
|
||||
typeof accounts[normalizedDefaultAccount] === "object"
|
||||
) {
|
||||
return normalizedDefaultAccount;
|
||||
}
|
||||
return DEFAULT_ACCOUNT_ID;
|
||||
}
|
||||
const accounts = params.channel.accounts ?? {};
|
||||
const namedAccounts = Object.entries(accounts).filter(
|
||||
([accountId, value]) => accountId && typeof value === "object" && value,
|
||||
);
|
||||
if (namedAccounts.length === 1) {
|
||||
return namedAccounts[0][0];
|
||||
}
|
||||
if (
|
||||
accounts[normalizedDefaultAccount] &&
|
||||
typeof accounts[normalizedDefaultAccount] === "object"
|
||||
namedAccounts.length > 1 &&
|
||||
accounts[DEFAULT_ACCOUNT_ID] &&
|
||||
typeof accounts[DEFAULT_ACCOUNT_ID] === "object"
|
||||
) {
|
||||
return normalizedDefaultAccount;
|
||||
return DEFAULT_ACCOUNT_ID;
|
||||
}
|
||||
return DEFAULT_ACCOUNT_ID;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user