mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-19 05:01:15 +00:00
fix(matrix): harden startup auth bootstrap (#61383)
Merged via squash.
Prepared head SHA: d8011a9308
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
committed by
GitHub
parent
1373ac6c9e
commit
8d88c27f19
@@ -241,6 +241,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Discord/media: raise the default inbound and outbound media cap to `100MB` so Discord matches Telegram more closely and larger attachments stop failing on the old low default.
|
||||
- Matrix: keep direct transport requests on the pinned dispatcher by routing them through undici runtime fetch, so Matrix clients resume syncing on newer runtimes without dropping the validated address binding. (#61595) Thanks @gumadeiras.
|
||||
- Plugins/facades: resolve globally installed bundled-plugin runtime facades from registry roots so bundled channels like LINE still boot when the winning plugin install lives under the global extensions directory with an encoded scoped folder name. (#61297) Thanks @openperf.
|
||||
- Matrix: avoid failing startup when token auth already knows the user ID but still needs optional device metadata, retry transient auth bootstrap requests, and backfill missing device IDs after startup while keeping unknown-device storage reuse conservative until metadata is repaired. (#61383) Thanks @gumadeiras.
|
||||
|
||||
## 2026.4.2
|
||||
|
||||
|
||||
@@ -16,7 +16,9 @@ function createLookupFn(addresses: Array<{ address: string; family: number }>):
|
||||
}
|
||||
|
||||
const saveMatrixCredentialsMock = vi.hoisted(() => vi.fn());
|
||||
const saveBackfilledMatrixDeviceIdMock = vi.hoisted(() => vi.fn(async () => "saved"));
|
||||
const touchMatrixCredentialsMock = vi.hoisted(() => vi.fn());
|
||||
const repairCurrentTokenStorageMetaDeviceIdMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("./credentials-read.js", () => ({
|
||||
loadMatrixCredentials: vi.fn(() => null),
|
||||
@@ -24,11 +26,21 @@ vi.mock("./credentials-read.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("./credentials-write.runtime.js", () => ({
|
||||
saveBackfilledMatrixDeviceId: saveBackfilledMatrixDeviceIdMock,
|
||||
saveMatrixCredentials: saveMatrixCredentialsMock,
|
||||
touchMatrixCredentials: touchMatrixCredentialsMock,
|
||||
}));
|
||||
|
||||
vi.mock("./client/storage.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("./client/storage.js")>("./client/storage.js");
|
||||
return {
|
||||
...actual,
|
||||
repairCurrentTokenStorageMetaDeviceId: repairCurrentTokenStorageMetaDeviceIdMock,
|
||||
};
|
||||
});
|
||||
|
||||
const {
|
||||
backfillMatrixAuthDeviceIdAfterStartup,
|
||||
getMatrixScopedEnvVarNames,
|
||||
resolveImplicitMatrixAccountId,
|
||||
resolveMatrixConfig,
|
||||
@@ -741,7 +753,9 @@ describe("resolveMatrixAuth", () => {
|
||||
vi.mocked(readModule.credentialsMatchConfig).mockReset();
|
||||
vi.mocked(readModule.credentialsMatchConfig).mockReturnValue(false);
|
||||
saveMatrixCredentialsMock.mockReset();
|
||||
saveBackfilledMatrixDeviceIdMock.mockReset().mockResolvedValue("saved");
|
||||
touchMatrixCredentialsMock.mockReset();
|
||||
repairCurrentTokenStorageMetaDeviceIdMock.mockReset().mockReturnValue(true);
|
||||
ensureMatrixSdkLoggingConfiguredMock.mockReset();
|
||||
matrixDoRequestMock.mockReset();
|
||||
setMatrixAuthClientDepsForTest({
|
||||
@@ -1098,6 +1112,253 @@ describe("resolveMatrixAuth", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("retries token whoami when startup auth hits a transient network error", async () => {
|
||||
matrixDoRequestMock
|
||||
.mockRejectedValueOnce(
|
||||
Object.assign(new TypeError("fetch failed"), {
|
||||
cause: Object.assign(new Error("read ECONNRESET"), {
|
||||
code: "ECONNRESET",
|
||||
}),
|
||||
}),
|
||||
)
|
||||
.mockResolvedValue({
|
||||
user_id: "@bot:example.org",
|
||||
device_id: "DEVICE123",
|
||||
});
|
||||
|
||||
const cfg = {
|
||||
channels: {
|
||||
matrix: {
|
||||
homeserver: "https://matrix.example.org",
|
||||
accessToken: "tok-123",
|
||||
},
|
||||
},
|
||||
} as CoreConfig;
|
||||
|
||||
const auth = await resolveMatrixAuth({
|
||||
cfg,
|
||||
env: {} as NodeJS.ProcessEnv,
|
||||
});
|
||||
|
||||
expect(matrixDoRequestMock).toHaveBeenCalledTimes(2);
|
||||
expect(auth).toMatchObject({
|
||||
userId: "@bot:example.org",
|
||||
deviceId: "DEVICE123",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not call whoami when token auth already has a userId and only deviceId is missing", async () => {
|
||||
matrixDoRequestMock.mockRejectedValue(new Error("whoami should not be called"));
|
||||
|
||||
const cfg = {
|
||||
channels: {
|
||||
matrix: {
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
encryption: true,
|
||||
},
|
||||
},
|
||||
} as CoreConfig;
|
||||
|
||||
const auth = await resolveMatrixAuth({
|
||||
cfg,
|
||||
env: {} as NodeJS.ProcessEnv,
|
||||
});
|
||||
|
||||
expect(matrixDoRequestMock).not.toHaveBeenCalled();
|
||||
expect(auth).toMatchObject({
|
||||
accountId: "default",
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
deviceId: undefined,
|
||||
encryption: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("retries password login when startup auth hits a transient network error", async () => {
|
||||
matrixDoRequestMock
|
||||
.mockRejectedValueOnce(
|
||||
Object.assign(new TypeError("fetch failed"), {
|
||||
cause: Object.assign(new Error("socket hang up"), {
|
||||
code: "ECONNRESET",
|
||||
}),
|
||||
}),
|
||||
)
|
||||
.mockResolvedValue({
|
||||
access_token: "tok-123",
|
||||
user_id: "@bot:example.org",
|
||||
device_id: "DEVICE123",
|
||||
});
|
||||
|
||||
const cfg = {
|
||||
channels: {
|
||||
matrix: {
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
password: "secret", // pragma: allowlist secret
|
||||
},
|
||||
},
|
||||
} as CoreConfig;
|
||||
|
||||
const auth = await resolveMatrixAuth({
|
||||
cfg,
|
||||
env: {} as NodeJS.ProcessEnv,
|
||||
});
|
||||
|
||||
expect(matrixDoRequestMock).toHaveBeenCalledTimes(2);
|
||||
expect(auth).toMatchObject({
|
||||
accessToken: "tok-123",
|
||||
deviceId: "DEVICE123",
|
||||
});
|
||||
});
|
||||
|
||||
it("best-effort backfills a missing deviceId after startup", async () => {
|
||||
matrixDoRequestMock.mockResolvedValue({
|
||||
user_id: "@bot:example.org",
|
||||
device_id: "DEVICE123",
|
||||
});
|
||||
|
||||
const deviceId = await backfillMatrixAuthDeviceIdAfterStartup({
|
||||
auth: {
|
||||
accountId: "default",
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
},
|
||||
env: {} as NodeJS.ProcessEnv,
|
||||
});
|
||||
|
||||
expect(matrixDoRequestMock).toHaveBeenCalledWith("GET", "/_matrix/client/v3/account/whoami");
|
||||
expect(saveBackfilledMatrixDeviceIdMock).toHaveBeenCalledWith(
|
||||
{
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
deviceId: "DEVICE123",
|
||||
},
|
||||
expect.any(Object),
|
||||
"default",
|
||||
);
|
||||
expect(repairCurrentTokenStorageMetaDeviceIdMock).toHaveBeenCalledWith({
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
accountId: "default",
|
||||
deviceId: "DEVICE123",
|
||||
env: expect.any(Object),
|
||||
});
|
||||
expect(repairCurrentTokenStorageMetaDeviceIdMock.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
saveBackfilledMatrixDeviceIdMock.mock.invocationCallOrder[0],
|
||||
);
|
||||
expect(deviceId).toBe("DEVICE123");
|
||||
});
|
||||
|
||||
it("skips deviceId backfill when auth already includes it", async () => {
|
||||
const deviceId = await backfillMatrixAuthDeviceIdAfterStartup({
|
||||
auth: {
|
||||
accountId: "default",
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
deviceId: "DEVICE123",
|
||||
},
|
||||
env: {} as NodeJS.ProcessEnv,
|
||||
});
|
||||
|
||||
expect(matrixDoRequestMock).not.toHaveBeenCalled();
|
||||
expect(saveMatrixCredentialsMock).not.toHaveBeenCalled();
|
||||
expect(saveBackfilledMatrixDeviceIdMock).not.toHaveBeenCalled();
|
||||
expect(repairCurrentTokenStorageMetaDeviceIdMock).not.toHaveBeenCalled();
|
||||
expect(deviceId).toBe("DEVICE123");
|
||||
});
|
||||
|
||||
it("fails before saving repaired credentials when storage metadata repair fails", async () => {
|
||||
matrixDoRequestMock.mockResolvedValue({
|
||||
user_id: "@bot:example.org",
|
||||
device_id: "DEVICE123",
|
||||
});
|
||||
repairCurrentTokenStorageMetaDeviceIdMock.mockReturnValue(false);
|
||||
|
||||
await expect(
|
||||
backfillMatrixAuthDeviceIdAfterStartup({
|
||||
auth: {
|
||||
accountId: "default",
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
},
|
||||
env: {} as NodeJS.ProcessEnv,
|
||||
}),
|
||||
).rejects.toThrow("Matrix deviceId backfill failed to repair current-token storage metadata");
|
||||
expect(saveBackfilledMatrixDeviceIdMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("skips stale deviceId backfill writes after newer credentials take over", async () => {
|
||||
matrixDoRequestMock.mockResolvedValue({
|
||||
user_id: "@bot:example.org",
|
||||
device_id: "DEVICE123",
|
||||
});
|
||||
vi.mocked(requireCredentialsReadModule().loadMatrixCredentials).mockReturnValue({
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-new",
|
||||
deviceId: "DEVICE999",
|
||||
createdAt: "2026-03-01T00:00:00.000Z",
|
||||
});
|
||||
|
||||
const deviceId = await backfillMatrixAuthDeviceIdAfterStartup({
|
||||
auth: {
|
||||
accountId: "default",
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-old",
|
||||
},
|
||||
env: {} as NodeJS.ProcessEnv,
|
||||
});
|
||||
|
||||
expect(deviceId).toBeUndefined();
|
||||
expect(repairCurrentTokenStorageMetaDeviceIdMock).not.toHaveBeenCalled();
|
||||
expect(saveBackfilledMatrixDeviceIdMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("skips persistence when startup backfill is aborted before whoami resolves", async () => {
|
||||
let resolveWhoami:
|
||||
| ((value: { user_id: string; device_id: string }) => void)
|
||||
| undefined;
|
||||
matrixDoRequestMock.mockImplementation(
|
||||
() =>
|
||||
new Promise((resolve) => {
|
||||
resolveWhoami = resolve;
|
||||
}),
|
||||
);
|
||||
const abortController = new AbortController();
|
||||
const backfillPromise = backfillMatrixAuthDeviceIdAfterStartup({
|
||||
auth: {
|
||||
accountId: "default",
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
},
|
||||
env: {} as NodeJS.ProcessEnv,
|
||||
abortSignal: abortController.signal,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(resolveWhoami).toBeTypeOf("function");
|
||||
});
|
||||
abortController.abort();
|
||||
resolveWhoami?.({
|
||||
user_id: "@bot:example.org",
|
||||
device_id: "DEVICE123",
|
||||
});
|
||||
|
||||
await expect(backfillPromise).resolves.toBeUndefined();
|
||||
expect(repairCurrentTokenStorageMetaDeviceIdMock).not.toHaveBeenCalled();
|
||||
expect(saveBackfilledMatrixDeviceIdMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("resolves file-backed accessToken SecretRefs during Matrix auth", async () => {
|
||||
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "matrix-secret-ref-"));
|
||||
const secretPath = path.join(tempDir, "token.txt");
|
||||
|
||||
@@ -2,6 +2,7 @@ export type { MatrixAuth } from "./client/types.js";
|
||||
export { isBunRuntime } from "./client/runtime.js";
|
||||
export { getMatrixScopedEnvVarNames } from "../env-vars.js";
|
||||
export {
|
||||
backfillMatrixAuthDeviceIdAfterStartup,
|
||||
hasReadyMatrixEnvAuth,
|
||||
resolveMatrixEnvAuthReadiness,
|
||||
resolveMatrixConfigForAccount,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { PinnedDispatcherPolicy } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { formatErrorMessage, type PinnedDispatcherPolicy } from "openclaw/plugin-sdk/infra-runtime";
|
||||
import { coerceSecretRef } from "openclaw/plugin-sdk/provider-auth";
|
||||
import { retryAsync } from "openclaw/plugin-sdk/retry-runtime";
|
||||
import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input";
|
||||
import {
|
||||
requiresExplicitMatrixDefaultAccount,
|
||||
@@ -25,7 +26,9 @@ import {
|
||||
normalizeOptionalAccountId,
|
||||
ssrfPolicyFromDangerouslyAllowPrivateNetwork,
|
||||
} from "./config-runtime-api.js";
|
||||
import { repairCurrentTokenStorageMetaDeviceId } from "./storage.js";
|
||||
import type { MatrixAuth, MatrixResolvedConfig } from "./types.js";
|
||||
import type { MatrixStoredCredentials } from "../credentials-read.js";
|
||||
|
||||
type MatrixAuthClientDeps = {
|
||||
MatrixClient: typeof import("../sdk.js").MatrixClient;
|
||||
@@ -46,6 +49,9 @@ let matrixCredentialsReadDepsPromise: Promise<MatrixCredentialsReadDeps> | undef
|
||||
let matrixSecretInputDepsPromise: Promise<MatrixSecretInputDeps> | undefined;
|
||||
let matrixAuthClientDepsForTest: MatrixAuthClientDeps | undefined;
|
||||
|
||||
const MATRIX_AUTH_REQUEST_RETRY_RE =
|
||||
/\b(fetch failed|econnreset|econnrefused|enotfound|etimedout|ehostunreach|enetunreach|eai_again|und_err_|socket hang up|network|headers timeout|body timeout|connect timeout)\b/i;
|
||||
|
||||
export function setMatrixAuthClientDepsForTest(
|
||||
deps?:
|
||||
| {
|
||||
@@ -87,6 +93,67 @@ async function loadMatrixSecretInputDeps(): Promise<MatrixSecretInputDeps> {
|
||||
return await matrixSecretInputDepsPromise;
|
||||
}
|
||||
|
||||
function shouldRetryMatrixAuthRequest(err: unknown): boolean {
|
||||
return MATRIX_AUTH_REQUEST_RETRY_RE.test(formatErrorMessage(err));
|
||||
}
|
||||
|
||||
function isAbortSignalTriggered(signal?: AbortSignal): boolean {
|
||||
return signal?.aborted === true;
|
||||
}
|
||||
|
||||
function credentialsMatchBackfillAuthLineage(params: {
|
||||
stored: MatrixStoredCredentials | null;
|
||||
auth: Pick<MatrixAuth, "homeserver" | "userId" | "accessToken">;
|
||||
}): boolean {
|
||||
if (!params.stored) {
|
||||
return true;
|
||||
}
|
||||
return (
|
||||
params.stored.homeserver === params.auth.homeserver &&
|
||||
params.stored.userId === params.auth.userId &&
|
||||
params.stored.accessToken === params.auth.accessToken
|
||||
);
|
||||
}
|
||||
|
||||
async function retryMatrixAuthRequest<T>(label: string, run: () => Promise<T>): Promise<T> {
|
||||
return await retryAsync(run, {
|
||||
attempts: 3,
|
||||
minDelayMs: 250,
|
||||
maxDelayMs: 1_500,
|
||||
jitter: 0.1,
|
||||
label,
|
||||
shouldRetry: (err) => shouldRetryMatrixAuthRequest(err),
|
||||
});
|
||||
}
|
||||
|
||||
async function fetchMatrixWhoamiIdentity(params: {
|
||||
homeserver: string;
|
||||
accessToken: string;
|
||||
userId?: string;
|
||||
ssrfPolicy?: MatrixResolvedConfig["ssrfPolicy"];
|
||||
dispatcherPolicy?: PinnedDispatcherPolicy;
|
||||
}): Promise<{
|
||||
user_id?: string;
|
||||
device_id?: string;
|
||||
}> {
|
||||
const { MatrixClient, ensureMatrixSdkLoggingConfigured } = await loadMatrixAuthClientDeps();
|
||||
ensureMatrixSdkLoggingConfigured();
|
||||
const tempClient = new MatrixClient(params.homeserver, params.accessToken, {
|
||||
userId: params.userId,
|
||||
ssrfPolicy: params.ssrfPolicy,
|
||||
dispatcherPolicy: params.dispatcherPolicy,
|
||||
});
|
||||
return (await retryMatrixAuthRequest("matrix auth whoami", async () => {
|
||||
return (await tempClient.doRequest("GET", "/_matrix/client/v3/account/whoami")) as {
|
||||
user_id?: string;
|
||||
device_id?: string;
|
||||
};
|
||||
})) as {
|
||||
user_id?: string;
|
||||
device_id?: string;
|
||||
};
|
||||
}
|
||||
|
||||
function readEnvSecretRefFallback(params: {
|
||||
value: unknown;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
@@ -741,28 +808,22 @@ export async function resolveMatrixAuth(params?: {
|
||||
? cachedCredentials?.deviceId || resolved.deviceId
|
||||
: resolved.deviceId;
|
||||
|
||||
if (!userId || !knownDeviceId) {
|
||||
// Fetch whoami when we need to resolve userId and/or deviceId from token auth.
|
||||
const { MatrixClient, ensureMatrixSdkLoggingConfigured } = await loadMatrixAuthClientDeps();
|
||||
ensureMatrixSdkLoggingConfigured();
|
||||
const tempClient = new MatrixClient(homeserver, accessToken, {
|
||||
if (!userId) {
|
||||
// Only block startup on whoami when token auth still needs the user ID.
|
||||
// A missing device ID alone is optional and should not force a network round-trip.
|
||||
const whoami = await fetchMatrixWhoamiIdentity({
|
||||
homeserver,
|
||||
accessToken,
|
||||
userId,
|
||||
ssrfPolicy: resolved.ssrfPolicy,
|
||||
dispatcherPolicy: resolved.dispatcherPolicy,
|
||||
});
|
||||
const whoami = (await tempClient.doRequest("GET", "/_matrix/client/v3/account/whoami")) as {
|
||||
user_id?: string;
|
||||
device_id?: string;
|
||||
};
|
||||
if (!userId) {
|
||||
const fetchedUserId = whoami.user_id?.trim();
|
||||
if (!fetchedUserId) {
|
||||
throw new Error("Matrix whoami did not return user_id");
|
||||
}
|
||||
userId = fetchedUserId;
|
||||
}
|
||||
if (!knownDeviceId) {
|
||||
knownDeviceId = whoami.device_id?.trim() || resolved.deviceId;
|
||||
const fetchedUserId = whoami.user_id?.trim();
|
||||
if (!fetchedUserId) {
|
||||
throw new Error("Matrix whoami did not return user_id");
|
||||
}
|
||||
userId = fetchedUserId;
|
||||
knownDeviceId = knownDeviceId || whoami.device_id?.trim() || resolved.deviceId;
|
||||
}
|
||||
|
||||
const shouldRefreshCachedCredentials =
|
||||
@@ -847,12 +908,18 @@ export async function resolveMatrixAuth(params?: {
|
||||
ssrfPolicy: resolved.ssrfPolicy,
|
||||
dispatcherPolicy: resolved.dispatcherPolicy,
|
||||
});
|
||||
const login = (await loginClient.doRequest("POST", "/_matrix/client/v3/login", undefined, {
|
||||
type: "m.login.password",
|
||||
identifier: { type: "m.id.user", user: resolved.userId },
|
||||
password,
|
||||
device_id: resolved.deviceId,
|
||||
initial_device_display_name: resolved.deviceName ?? "OpenClaw Gateway",
|
||||
const login = (await retryMatrixAuthRequest("matrix auth login", async () => {
|
||||
return (await loginClient.doRequest("POST", "/_matrix/client/v3/login", undefined, {
|
||||
type: "m.login.password",
|
||||
identifier: { type: "m.id.user", user: resolved.userId },
|
||||
password,
|
||||
device_id: resolved.deviceId,
|
||||
initial_device_display_name: resolved.deviceName ?? "OpenClaw Gateway",
|
||||
})) as {
|
||||
access_token?: string;
|
||||
user_id?: string;
|
||||
device_id?: string;
|
||||
};
|
||||
})) as {
|
||||
access_token?: string;
|
||||
user_id?: string;
|
||||
@@ -894,3 +961,71 @@ export async function resolveMatrixAuth(params?: {
|
||||
|
||||
return auth;
|
||||
}
|
||||
|
||||
export async function backfillMatrixAuthDeviceIdAfterStartup(params: {
|
||||
auth: MatrixAuth;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
abortSignal?: AbortSignal;
|
||||
}): Promise<string | undefined> {
|
||||
const knownDeviceId = params.auth.deviceId?.trim();
|
||||
if (knownDeviceId) {
|
||||
return knownDeviceId;
|
||||
}
|
||||
if (isAbortSignalTriggered(params.abortSignal)) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const whoami = await fetchMatrixWhoamiIdentity({
|
||||
homeserver: params.auth.homeserver,
|
||||
accessToken: params.auth.accessToken,
|
||||
userId: params.auth.userId,
|
||||
ssrfPolicy: params.auth.ssrfPolicy,
|
||||
dispatcherPolicy: params.auth.dispatcherPolicy,
|
||||
});
|
||||
const deviceId = whoami.device_id?.trim();
|
||||
if (!deviceId) {
|
||||
return undefined;
|
||||
}
|
||||
if (isAbortSignalTriggered(params.abortSignal)) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const env = params.env ?? process.env;
|
||||
const { loadMatrixCredentials } = await loadMatrixCredentialsReadDeps();
|
||||
if (
|
||||
!credentialsMatchBackfillAuthLineage({
|
||||
stored: loadMatrixCredentials(env, params.auth.accountId),
|
||||
auth: params.auth,
|
||||
})
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const repairedStorageMeta = repairCurrentTokenStorageMetaDeviceId({
|
||||
homeserver: params.auth.homeserver,
|
||||
userId: params.auth.userId,
|
||||
accessToken: params.auth.accessToken,
|
||||
accountId: params.auth.accountId,
|
||||
deviceId,
|
||||
env: params.env,
|
||||
});
|
||||
if (!repairedStorageMeta) {
|
||||
throw new Error("Matrix deviceId backfill failed to repair current-token storage metadata");
|
||||
}
|
||||
if (isAbortSignalTriggered(params.abortSignal)) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const credentialsWriter = await import("../credentials-write.runtime.js");
|
||||
const saved = await credentialsWriter.saveBackfilledMatrixDeviceId(
|
||||
{
|
||||
homeserver: params.auth.homeserver,
|
||||
userId: params.auth.userId,
|
||||
accessToken: params.auth.accessToken,
|
||||
deviceId,
|
||||
},
|
||||
env,
|
||||
params.auth.accountId,
|
||||
);
|
||||
return saved === "saved" ? deviceId : undefined;
|
||||
}
|
||||
|
||||
@@ -100,6 +100,31 @@ describe("FileBackedMatrixSyncStore", () => {
|
||||
expect(secondStore.hasSavedSyncFromCleanShutdown()).toBe(false);
|
||||
});
|
||||
|
||||
it("claims current-token storage ownership when sync state is persisted", async () => {
|
||||
const storagePath = createStoragePath();
|
||||
const rootDir = path.dirname(storagePath);
|
||||
fs.writeFileSync(
|
||||
path.join(rootDir, "storage-meta.json"),
|
||||
JSON.stringify({
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accountId: "default",
|
||||
accessTokenHash: "token-hash",
|
||||
deviceId: null,
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const store = new FileBackedMatrixSyncStore(storagePath);
|
||||
await store.setSyncData(createSyncResponse("claimed-token"));
|
||||
await store.flush();
|
||||
|
||||
const meta = JSON.parse(fs.readFileSync(path.join(rootDir, "storage-meta.json"), "utf8")) as {
|
||||
currentTokenStateClaimed?: boolean;
|
||||
};
|
||||
expect(meta.currentTokenStateClaimed).toBe(true);
|
||||
});
|
||||
|
||||
it("only treats sync state as restart-safe after a clean shutdown persist", async () => {
|
||||
const storagePath = createStoragePath();
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { readFileSync } from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import {
|
||||
Category,
|
||||
MemoryStore,
|
||||
@@ -12,6 +13,7 @@ import {
|
||||
import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
|
||||
import { createAsyncLock } from "../async-lock.js";
|
||||
import { LogService } from "../sdk/logger.js";
|
||||
import { claimCurrentTokenStorageState } from "./storage.js";
|
||||
|
||||
const STORE_VERSION = 1;
|
||||
const PERSIST_DEBOUNCE_MS = 250;
|
||||
@@ -278,6 +280,9 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
|
||||
try {
|
||||
await this.persistLock(async () => {
|
||||
await writeJsonFileAtomically(this.storagePath, payload);
|
||||
claimCurrentTokenStorageState({
|
||||
rootDir: path.dirname(this.storagePath),
|
||||
});
|
||||
});
|
||||
} catch (err) {
|
||||
this.dirty = true;
|
||||
|
||||
@@ -5,7 +5,9 @@ import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { resolveMatrixAccountStorageRoot } from "../../../runtime-api.js";
|
||||
import { installMatrixTestRuntime } from "../../test-runtime.js";
|
||||
import {
|
||||
claimCurrentTokenStorageState,
|
||||
maybeMigrateLegacyStorage,
|
||||
repairCurrentTokenStorageMetaDeviceId,
|
||||
resolveMatrixStateFilePath,
|
||||
resolveMatrixStoragePaths,
|
||||
} from "./storage.js";
|
||||
@@ -408,8 +410,8 @@ describe("matrix client storage paths", () => {
|
||||
expect(fs.existsSync(path.join(legacyRoot, "bot-storage.json"))).toBe(true);
|
||||
});
|
||||
|
||||
it("reuses an existing token-hash storage root after the access token changes", () => {
|
||||
setupStateDir();
|
||||
it("keeps the canonical current-token storage root when deviceId is still unknown", () => {
|
||||
const stateDir = setupStateDir();
|
||||
const oldStoragePaths = seedExistingStorageRoot({
|
||||
accessToken: "secret-token-old",
|
||||
});
|
||||
@@ -417,10 +419,16 @@ describe("matrix client storage paths", () => {
|
||||
const rotatedStoragePaths = resolveDefaultStoragePaths({
|
||||
accessToken: "secret-token-new",
|
||||
});
|
||||
const canonicalPaths = resolveMatrixAccountStorageRoot({
|
||||
stateDir,
|
||||
homeserver: defaultStorageAuth.homeserver,
|
||||
userId: defaultStorageAuth.userId,
|
||||
accessToken: "secret-token-new",
|
||||
});
|
||||
|
||||
expect(rotatedStoragePaths.rootDir).toBe(oldStoragePaths.rootDir);
|
||||
expect(rotatedStoragePaths.tokenHash).toBe(oldStoragePaths.tokenHash);
|
||||
expect(rotatedStoragePaths.storagePath).toBe(oldStoragePaths.storagePath);
|
||||
expect(rotatedStoragePaths.rootDir).toBe(canonicalPaths.rootDir);
|
||||
expect(rotatedStoragePaths.tokenHash).toBe(canonicalPaths.tokenHash);
|
||||
expect(rotatedStoragePaths.rootDir).not.toBe(oldStoragePaths.rootDir);
|
||||
});
|
||||
|
||||
it("reuses an existing token-hash storage root for the same device after the access token changes", () => {
|
||||
@@ -447,7 +455,7 @@ describe("matrix client storage paths", () => {
|
||||
expect(rotatedStoragePaths.storagePath).toBe(oldStoragePaths.storagePath);
|
||||
});
|
||||
|
||||
it("prefers a populated older token-hash storage root over a newer empty root", () => {
|
||||
it("does not reuse a populated older token-hash root while deviceId is unknown", () => {
|
||||
const stateDir = setupStateDir();
|
||||
const oldStoragePaths = seedExistingStorageRoot({
|
||||
accessToken: "secret-token-old",
|
||||
@@ -465,8 +473,9 @@ describe("matrix client storage paths", () => {
|
||||
accessToken: "secret-token-new",
|
||||
});
|
||||
|
||||
expect(resolvedPaths.rootDir).toBe(oldStoragePaths.rootDir);
|
||||
expect(resolvedPaths.tokenHash).toBe(oldStoragePaths.tokenHash);
|
||||
expect(resolvedPaths.rootDir).toBe(newerCanonicalPaths.rootDir);
|
||||
expect(resolvedPaths.tokenHash).toBe(newerCanonicalPaths.tokenHash);
|
||||
expect(resolvedPaths.rootDir).not.toBe(oldStoragePaths.rootDir);
|
||||
});
|
||||
|
||||
it("does not reuse a populated sibling storage root from a different device", () => {
|
||||
@@ -486,4 +495,140 @@ describe("matrix client storage paths", () => {
|
||||
});
|
||||
expectCanonicalRootForNewDevice(stateDir);
|
||||
});
|
||||
|
||||
it("keeps the current-token storage root stable after deviceId backfill when startup claimed state there", () => {
|
||||
const stateDir = setupStateDir();
|
||||
const canonicalPaths = resolveMatrixAccountStorageRoot({
|
||||
stateDir,
|
||||
homeserver: defaultStorageAuth.homeserver,
|
||||
userId: defaultStorageAuth.userId,
|
||||
accessToken: "secret-token-new",
|
||||
});
|
||||
fs.mkdirSync(canonicalPaths.rootDir, { recursive: true });
|
||||
writeJson(canonicalPaths.rootDir, "storage-meta.json", {
|
||||
homeserver: defaultStorageAuth.homeserver,
|
||||
userId: defaultStorageAuth.userId,
|
||||
accountId: "default",
|
||||
accessTokenHash: canonicalPaths.tokenHash,
|
||||
deviceId: null,
|
||||
});
|
||||
writeJson(canonicalPaths.rootDir, "thread-bindings.json", {
|
||||
version: 1,
|
||||
bindings: [
|
||||
{
|
||||
accountId: "default",
|
||||
conversationId: "$thread-new",
|
||||
targetKind: "subagent",
|
||||
targetSessionKey: "agent:ops:subagent:new",
|
||||
boundAt: 1,
|
||||
lastActivityAt: 1,
|
||||
},
|
||||
],
|
||||
});
|
||||
expect(
|
||||
claimCurrentTokenStorageState({
|
||||
rootDir: canonicalPaths.rootDir,
|
||||
}),
|
||||
).toBe(true);
|
||||
const oldStoragePaths = seedExistingStorageRoot({
|
||||
accessToken: "secret-token-old",
|
||||
deviceId: "DEVICE123",
|
||||
storageMeta: {
|
||||
homeserver: defaultStorageAuth.homeserver,
|
||||
userId: defaultStorageAuth.userId,
|
||||
accountId: "default",
|
||||
accessTokenHash: resolveDefaultStoragePaths({ accessToken: "secret-token-old" }).tokenHash,
|
||||
deviceId: "DEVICE123",
|
||||
},
|
||||
});
|
||||
fs.mkdirSync(oldStoragePaths.cryptoPath, { recursive: true });
|
||||
writeJson(oldStoragePaths.rootDir, "startup-verification.json", {
|
||||
deviceId: "DEVICE123",
|
||||
});
|
||||
|
||||
repairCurrentTokenStorageMetaDeviceId({
|
||||
homeserver: defaultStorageAuth.homeserver,
|
||||
userId: defaultStorageAuth.userId,
|
||||
accessToken: "secret-token-new",
|
||||
accountId: "default",
|
||||
deviceId: "DEVICE123",
|
||||
env: createMigrationEnv(stateDir),
|
||||
});
|
||||
|
||||
const repairedMeta = JSON.parse(
|
||||
fs.readFileSync(path.join(canonicalPaths.rootDir, "storage-meta.json"), "utf8"),
|
||||
) as { deviceId?: string | null };
|
||||
|
||||
expect(repairedMeta.deviceId).toBe("DEVICE123");
|
||||
const startupPaths = resolveDefaultStoragePaths({
|
||||
accessToken: "secret-token-new",
|
||||
});
|
||||
expect(startupPaths.rootDir).toBe(canonicalPaths.rootDir);
|
||||
const restartedPaths = resolveDefaultStoragePaths({
|
||||
accessToken: "secret-token-new",
|
||||
deviceId: "DEVICE123",
|
||||
});
|
||||
expect(restartedPaths.rootDir).toBe(canonicalPaths.rootDir);
|
||||
});
|
||||
|
||||
it("does not keep the current-token storage root sticky when only marker files exist after backfill", () => {
|
||||
const stateDir = setupStateDir();
|
||||
const canonicalPaths = resolveMatrixAccountStorageRoot({
|
||||
stateDir,
|
||||
homeserver: defaultStorageAuth.homeserver,
|
||||
userId: defaultStorageAuth.userId,
|
||||
accessToken: "secret-token-new",
|
||||
});
|
||||
fs.mkdirSync(canonicalPaths.rootDir, { recursive: true });
|
||||
writeJson(canonicalPaths.rootDir, "storage-meta.json", {
|
||||
homeserver: defaultStorageAuth.homeserver,
|
||||
userId: defaultStorageAuth.userId,
|
||||
accountId: "default",
|
||||
accessTokenHash: canonicalPaths.tokenHash,
|
||||
deviceId: null,
|
||||
});
|
||||
writeJson(canonicalPaths.rootDir, "startup-verification.json", {
|
||||
deviceId: "DEVICE123",
|
||||
});
|
||||
const oldStoragePaths = seedExistingStorageRoot({
|
||||
accessToken: "secret-token-old",
|
||||
deviceId: "DEVICE123",
|
||||
storageMeta: {
|
||||
homeserver: defaultStorageAuth.homeserver,
|
||||
userId: defaultStorageAuth.userId,
|
||||
accountId: "default",
|
||||
accessTokenHash: resolveDefaultStoragePaths({ accessToken: "secret-token-old" }).tokenHash,
|
||||
deviceId: "DEVICE123",
|
||||
},
|
||||
});
|
||||
fs.mkdirSync(oldStoragePaths.cryptoPath, { recursive: true });
|
||||
writeJson(oldStoragePaths.rootDir, "thread-bindings.json", {
|
||||
version: 1,
|
||||
bindings: [
|
||||
{
|
||||
accountId: "default",
|
||||
conversationId: "$thread-old",
|
||||
targetKind: "subagent",
|
||||
targetSessionKey: "agent:ops:subagent:old",
|
||||
boundAt: 1,
|
||||
lastActivityAt: 1,
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
repairCurrentTokenStorageMetaDeviceId({
|
||||
homeserver: defaultStorageAuth.homeserver,
|
||||
userId: defaultStorageAuth.userId,
|
||||
accessToken: "secret-token-new",
|
||||
accountId: "default",
|
||||
deviceId: "DEVICE123",
|
||||
env: createMigrationEnv(stateDir),
|
||||
});
|
||||
|
||||
const restartedPaths = resolveDefaultStoragePaths({
|
||||
accessToken: "secret-token-new",
|
||||
deviceId: "DEVICE123",
|
||||
});
|
||||
expect(restartedPaths.rootDir).toBe(oldStoragePaths.rootDir);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -34,6 +34,8 @@ type StoredRootMetadata = {
|
||||
accountId?: string;
|
||||
accessTokenHash?: string;
|
||||
deviceId?: string | null;
|
||||
currentTokenStateClaimed?: boolean;
|
||||
createdAt?: string;
|
||||
};
|
||||
|
||||
function resolveLegacyStoragePaths(env: NodeJS.ProcessEnv = process.env): {
|
||||
@@ -121,6 +123,12 @@ function readStoredRootMetadata(rootDir: string): StoredRootMetadata {
|
||||
if (typeof parsed.deviceId === "string" && parsed.deviceId.trim()) {
|
||||
metadata.deviceId = parsed.deviceId.trim();
|
||||
}
|
||||
if (parsed.currentTokenStateClaimed === true) {
|
||||
metadata.currentTokenStateClaimed = true;
|
||||
}
|
||||
if (typeof parsed.createdAt === "string" && parsed.createdAt.trim()) {
|
||||
metadata.createdAt = parsed.createdAt.trim();
|
||||
}
|
||||
} catch {
|
||||
// ignore missing or malformed storage metadata
|
||||
}
|
||||
@@ -198,6 +206,27 @@ function resolvePreferredMatrixStorageRoot(params: {
|
||||
mtimeMs: resolveStorageRootMtimeMs(params.canonicalRootDir),
|
||||
};
|
||||
|
||||
// Without a confirmed device identity, reusing a populated sibling root after
|
||||
// token rotation can silently bind this run to the wrong Matrix device state.
|
||||
if (!params.deviceId?.trim()) {
|
||||
return {
|
||||
rootDir: best.rootDir,
|
||||
tokenHash: best.tokenHash,
|
||||
};
|
||||
}
|
||||
|
||||
const canonicalMetadata = readStoredRootMetadata(params.canonicalRootDir);
|
||||
if (
|
||||
canonicalMetadata.accessTokenHash === params.canonicalTokenHash &&
|
||||
canonicalMetadata.deviceId?.trim() === params.deviceId.trim() &&
|
||||
canonicalMetadata.currentTokenStateClaimed === true
|
||||
) {
|
||||
return {
|
||||
rootDir: best.rootDir,
|
||||
tokenHash: best.tokenHash,
|
||||
};
|
||||
}
|
||||
|
||||
let siblingEntries: fs.Dirent[] = [];
|
||||
try {
|
||||
siblingEntries = fs.readdirSync(parentDir, { withFileTypes: true });
|
||||
@@ -429,25 +458,86 @@ function rollbackLegacyMoves(moved: LegacyMoveRecord[]): string | null {
|
||||
return null;
|
||||
}
|
||||
|
||||
function writeStoredRootMetadata(
|
||||
metaPath: string,
|
||||
payload: {
|
||||
homeserver?: string;
|
||||
userId?: string;
|
||||
accountId: string;
|
||||
accessTokenHash?: string;
|
||||
deviceId: string | null;
|
||||
currentTokenStateClaimed: boolean;
|
||||
createdAt: string;
|
||||
},
|
||||
): boolean {
|
||||
try {
|
||||
fs.mkdirSync(path.dirname(metaPath), { recursive: true });
|
||||
fs.writeFileSync(metaPath, JSON.stringify(payload, null, 2), "utf-8");
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export function writeStorageMeta(params: {
|
||||
storagePaths: MatrixStoragePaths;
|
||||
homeserver: string;
|
||||
userId: string;
|
||||
accountId?: string | null;
|
||||
deviceId?: string | null;
|
||||
}): void {
|
||||
try {
|
||||
const payload = {
|
||||
homeserver: params.homeserver,
|
||||
userId: params.userId,
|
||||
accountId: params.accountId ?? DEFAULT_ACCOUNT_KEY,
|
||||
accessTokenHash: params.storagePaths.tokenHash,
|
||||
deviceId: params.deviceId ?? null,
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
fs.mkdirSync(params.storagePaths.rootDir, { recursive: true });
|
||||
fs.writeFileSync(params.storagePaths.metaPath, JSON.stringify(payload, null, 2), "utf-8");
|
||||
} catch {
|
||||
// ignore meta write failures
|
||||
}
|
||||
currentTokenStateClaimed?: boolean;
|
||||
}): boolean {
|
||||
const existing = readStoredRootMetadata(params.storagePaths.rootDir);
|
||||
return writeStoredRootMetadata(params.storagePaths.metaPath, {
|
||||
homeserver: params.homeserver,
|
||||
userId: params.userId,
|
||||
accountId: params.accountId ?? DEFAULT_ACCOUNT_KEY,
|
||||
accessTokenHash: params.storagePaths.tokenHash,
|
||||
deviceId: params.deviceId ?? null,
|
||||
currentTokenStateClaimed:
|
||||
params.currentTokenStateClaimed ?? existing.currentTokenStateClaimed === true,
|
||||
createdAt: existing.createdAt ?? new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
export function claimCurrentTokenStorageState(params: { rootDir: string }): boolean {
|
||||
const metadata = readStoredRootMetadata(params.rootDir);
|
||||
if (!metadata.accessTokenHash?.trim()) {
|
||||
return false;
|
||||
}
|
||||
return writeStoredRootMetadata(path.join(params.rootDir, STORAGE_META_FILENAME), {
|
||||
homeserver: metadata.homeserver,
|
||||
userId: metadata.userId,
|
||||
accountId: metadata.accountId ?? DEFAULT_ACCOUNT_KEY,
|
||||
accessTokenHash: metadata.accessTokenHash,
|
||||
deviceId: metadata.deviceId ?? null,
|
||||
currentTokenStateClaimed: true,
|
||||
createdAt: metadata.createdAt ?? new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
export function repairCurrentTokenStorageMetaDeviceId(params: {
|
||||
homeserver: string;
|
||||
userId: string;
|
||||
accessToken: string;
|
||||
accountId?: string | null;
|
||||
deviceId: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
stateDir?: string;
|
||||
}): boolean {
|
||||
const storagePaths = resolveMatrixStoragePaths({
|
||||
homeserver: params.homeserver,
|
||||
userId: params.userId,
|
||||
accessToken: params.accessToken,
|
||||
accountId: params.accountId,
|
||||
env: params.env,
|
||||
stateDir: params.stateDir,
|
||||
});
|
||||
return writeStorageMeta({
|
||||
storagePaths,
|
||||
homeserver: params.homeserver,
|
||||
userId: params.userId,
|
||||
accountId: params.accountId,
|
||||
deviceId: params.deviceId,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type {
|
||||
saveBackfilledMatrixDeviceId as saveBackfilledMatrixDeviceIdType,
|
||||
saveMatrixCredentials as saveMatrixCredentialsType,
|
||||
touchMatrixCredentials as touchMatrixCredentialsType,
|
||||
} from "./credentials.js";
|
||||
@@ -10,6 +11,13 @@ export async function saveMatrixCredentials(
|
||||
return runtime.saveMatrixCredentials(...args);
|
||||
}
|
||||
|
||||
export async function saveBackfilledMatrixDeviceId(
|
||||
...args: Parameters<typeof saveBackfilledMatrixDeviceIdType>
|
||||
): ReturnType<typeof saveBackfilledMatrixDeviceIdType> {
|
||||
const runtime = await import("./credentials.js");
|
||||
return runtime.saveBackfilledMatrixDeviceId(...args);
|
||||
}
|
||||
|
||||
export async function touchMatrixCredentials(
|
||||
...args: Parameters<typeof touchMatrixCredentialsType>
|
||||
): ReturnType<typeof touchMatrixCredentialsType> {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import fs from "node:fs";
|
||||
import fsPromises from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
@@ -8,6 +9,7 @@ import {
|
||||
loadMatrixCredentials,
|
||||
clearMatrixCredentials,
|
||||
resolveMatrixCredentialsPath,
|
||||
saveBackfilledMatrixDeviceId,
|
||||
saveMatrixCredentials,
|
||||
touchMatrixCredentials,
|
||||
} from "./credentials.js";
|
||||
@@ -84,6 +86,135 @@ describe("matrix credentials storage", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("backfill updates deviceId when credentials still match the same auth lineage", async () => {
|
||||
setupStateDir();
|
||||
await saveMatrixCredentials(
|
||||
{
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
},
|
||||
{},
|
||||
"default",
|
||||
);
|
||||
|
||||
await expect(
|
||||
saveBackfilledMatrixDeviceId(
|
||||
{
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-123",
|
||||
deviceId: "DEVICE123",
|
||||
},
|
||||
{},
|
||||
"default",
|
||||
),
|
||||
).resolves.toBe("saved");
|
||||
|
||||
expect(loadMatrixCredentials({}, "default")).toMatchObject({
|
||||
accessToken: "tok-123",
|
||||
deviceId: "DEVICE123",
|
||||
});
|
||||
});
|
||||
|
||||
it("backfill skips when newer credentials already changed the token", async () => {
|
||||
setupStateDir();
|
||||
await saveMatrixCredentials(
|
||||
{
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-new",
|
||||
deviceId: "DEVICE999",
|
||||
},
|
||||
{},
|
||||
"default",
|
||||
);
|
||||
|
||||
await expect(
|
||||
saveBackfilledMatrixDeviceId(
|
||||
{
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-old",
|
||||
deviceId: "DEVICE123",
|
||||
},
|
||||
{},
|
||||
"default",
|
||||
),
|
||||
).resolves.toBe("skipped");
|
||||
|
||||
expect(loadMatrixCredentials({}, "default")).toMatchObject({
|
||||
accessToken: "tok-new",
|
||||
deviceId: "DEVICE999",
|
||||
});
|
||||
});
|
||||
|
||||
it("serializes stale backfill writes behind newer credential saves", async () => {
|
||||
setupStateDir();
|
||||
await saveMatrixCredentials(
|
||||
{
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-old",
|
||||
},
|
||||
{},
|
||||
"default",
|
||||
);
|
||||
|
||||
let releaseFirstWrite: (() => void) | undefined;
|
||||
let firstWriteStarted = false;
|
||||
const originalRename = fsPromises.rename.bind(fsPromises);
|
||||
const renameSpy = vi
|
||||
.spyOn(fsPromises, "rename")
|
||||
.mockImplementation(async (...args: Parameters<typeof fsPromises.rename>) => {
|
||||
if (!firstWriteStarted) {
|
||||
firstWriteStarted = true;
|
||||
await new Promise<void>((resolve) => {
|
||||
releaseFirstWrite = resolve;
|
||||
});
|
||||
}
|
||||
await originalRename(...args);
|
||||
});
|
||||
|
||||
try {
|
||||
const staleBackfillPromise = saveBackfilledMatrixDeviceId(
|
||||
{
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-old",
|
||||
deviceId: "DEVICE123",
|
||||
},
|
||||
{},
|
||||
"default",
|
||||
);
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(firstWriteStarted).toBe(true);
|
||||
});
|
||||
|
||||
const newerSavePromise = saveMatrixCredentials(
|
||||
{
|
||||
homeserver: "https://matrix.example.org",
|
||||
userId: "@bot:example.org",
|
||||
accessToken: "tok-new",
|
||||
deviceId: "DEVICE999",
|
||||
},
|
||||
{},
|
||||
"default",
|
||||
);
|
||||
|
||||
releaseFirstWrite?.();
|
||||
await Promise.all([staleBackfillPromise, newerSavePromise]);
|
||||
|
||||
expect(loadMatrixCredentials({}, "default")).toMatchObject({
|
||||
accessToken: "tok-new",
|
||||
deviceId: "DEVICE999",
|
||||
});
|
||||
} finally {
|
||||
renameSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("migrates legacy matrix credential files on read", async () => {
|
||||
const stateDir = setupStateDir({
|
||||
channels: {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { writeJsonFileAtomically } from "../runtime-api.js";
|
||||
import { createAsyncLock, type AsyncLock } from "./async-lock.js";
|
||||
import { loadMatrixCredentials, resolveMatrixCredentialsPath } from "./credentials-read.js";
|
||||
import type { MatrixStoredCredentials } from "./credentials-read.js";
|
||||
|
||||
@@ -11,35 +12,84 @@ export {
|
||||
} from "./credentials-read.js";
|
||||
export type { MatrixStoredCredentials } from "./credentials-read.js";
|
||||
|
||||
const credentialWriteLocks = new Map<string, AsyncLock>();
|
||||
|
||||
function withCredentialWriteLock<T>(credPath: string, fn: () => Promise<T>): Promise<T> {
|
||||
let withLock = credentialWriteLocks.get(credPath);
|
||||
if (!withLock) {
|
||||
withLock = createAsyncLock();
|
||||
credentialWriteLocks.set(credPath, withLock);
|
||||
}
|
||||
return withLock(fn);
|
||||
}
|
||||
|
||||
async function writeMatrixCredentialsUnlocked(params: {
|
||||
credPath: string;
|
||||
credentials: Omit<MatrixStoredCredentials, "createdAt" | "lastUsedAt">;
|
||||
existing: MatrixStoredCredentials | null;
|
||||
}): Promise<void> {
|
||||
const now = new Date().toISOString();
|
||||
const toSave: MatrixStoredCredentials = {
|
||||
...params.credentials,
|
||||
createdAt: params.existing?.createdAt ?? now,
|
||||
lastUsedAt: now,
|
||||
};
|
||||
await writeJsonFileAtomically(params.credPath, toSave);
|
||||
}
|
||||
|
||||
export async function saveMatrixCredentials(
|
||||
credentials: Omit<MatrixStoredCredentials, "createdAt" | "lastUsedAt">,
|
||||
env: NodeJS.ProcessEnv = process.env,
|
||||
accountId?: string | null,
|
||||
): Promise<void> {
|
||||
const credPath = resolveMatrixCredentialsPath(env, accountId);
|
||||
await withCredentialWriteLock(credPath, async () => {
|
||||
await writeMatrixCredentialsUnlocked({
|
||||
credPath,
|
||||
credentials,
|
||||
existing: loadMatrixCredentials(env, accountId),
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const existing = loadMatrixCredentials(env, accountId);
|
||||
const now = new Date().toISOString();
|
||||
export async function saveBackfilledMatrixDeviceId(
|
||||
credentials: Omit<MatrixStoredCredentials, "createdAt" | "lastUsedAt">,
|
||||
env: NodeJS.ProcessEnv = process.env,
|
||||
accountId?: string | null,
|
||||
): Promise<"saved" | "skipped"> {
|
||||
const credPath = resolveMatrixCredentialsPath(env, accountId);
|
||||
return await withCredentialWriteLock(credPath, async () => {
|
||||
const existing = loadMatrixCredentials(env, accountId);
|
||||
if (
|
||||
existing &&
|
||||
(existing.homeserver !== credentials.homeserver ||
|
||||
existing.userId !== credentials.userId ||
|
||||
existing.accessToken !== credentials.accessToken)
|
||||
) {
|
||||
return "skipped";
|
||||
}
|
||||
|
||||
const toSave: MatrixStoredCredentials = {
|
||||
...credentials,
|
||||
createdAt: existing?.createdAt ?? now,
|
||||
lastUsedAt: now,
|
||||
};
|
||||
|
||||
await writeJsonFileAtomically(credPath, toSave);
|
||||
await writeMatrixCredentialsUnlocked({
|
||||
credPath,
|
||||
credentials,
|
||||
existing,
|
||||
});
|
||||
return "saved";
|
||||
});
|
||||
}
|
||||
|
||||
export async function touchMatrixCredentials(
|
||||
env: NodeJS.ProcessEnv = process.env,
|
||||
accountId?: string | null,
|
||||
): Promise<void> {
|
||||
const existing = loadMatrixCredentials(env, accountId);
|
||||
if (!existing) {
|
||||
return;
|
||||
}
|
||||
|
||||
existing.lastUsedAt = new Date().toISOString();
|
||||
const credPath = resolveMatrixCredentialsPath(env, accountId);
|
||||
await writeJsonFileAtomically(credPath, existing);
|
||||
await withCredentialWriteLock(credPath, async () => {
|
||||
const existing = loadMatrixCredentials(env, accountId);
|
||||
if (!existing) {
|
||||
return;
|
||||
}
|
||||
|
||||
existing.lastUsedAt = new Date().toISOString();
|
||||
await writeJsonFileAtomically(credPath, existing);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -57,7 +57,9 @@ const hoisted = vi.hoisted(() => {
|
||||
const releaseSharedClientInstance = vi.fn(async () => true);
|
||||
const setActiveMatrixClient = vi.fn();
|
||||
const setMatrixRuntime = vi.fn();
|
||||
const backfillMatrixAuthDeviceIdAfterStartup = vi.fn(async () => undefined);
|
||||
return {
|
||||
backfillMatrixAuthDeviceIdAfterStartup,
|
||||
callOrder,
|
||||
accountConfig,
|
||||
client,
|
||||
@@ -222,6 +224,7 @@ vi.mock("../active-client.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("../client.js", () => ({
|
||||
backfillMatrixAuthDeviceIdAfterStartup: hoisted.backfillMatrixAuthDeviceIdAfterStartup,
|
||||
isBunRuntime: () => false,
|
||||
resolveMatrixAuth: vi.fn(async () => ({
|
||||
accountId: "default",
|
||||
@@ -370,6 +373,7 @@ describe("monitorMatrixProvider", () => {
|
||||
hoisted.inboundDeduper.releaseEvent.mockReset();
|
||||
hoisted.inboundDeduper.flush.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.inboundDeduper.stop.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.backfillMatrixAuthDeviceIdAfterStartup.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn());
|
||||
Object.values(hoisted.logger).forEach((mock) => mock.mockReset());
|
||||
});
|
||||
@@ -408,6 +412,28 @@ describe("monitorMatrixProvider", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("starts monitoring without waiting for best-effort deviceId backfill", async () => {
|
||||
hoisted.backfillMatrixAuthDeviceIdAfterStartup.mockImplementation(
|
||||
() => new Promise<undefined>(() => {}),
|
||||
);
|
||||
|
||||
const abortController = new AbortController();
|
||||
const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal });
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(hoisted.callOrder).toContain("start-client");
|
||||
expect(hoisted.backfillMatrixAuthDeviceIdAfterStartup).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(hoisted.backfillMatrixAuthDeviceIdAfterStartup).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
abortSignal: abortController.signal,
|
||||
}),
|
||||
);
|
||||
|
||||
abortController.abort();
|
||||
await expect(monitorPromise).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("cleans up thread bindings and shared clients when startup fails", async () => {
|
||||
hoisted.state.startClientError = new Error("start failed");
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import { resolveMatrixAccountConfig } from "../account-config.js";
|
||||
import { resolveConfiguredMatrixBotUserIds } from "../accounts.js";
|
||||
import { setActiveMatrixClient } from "../active-client.js";
|
||||
import {
|
||||
backfillMatrixAuthDeviceIdAfterStartup,
|
||||
isBunRuntime,
|
||||
resolveMatrixAuth,
|
||||
resolveMatrixAuthContext,
|
||||
@@ -351,6 +352,13 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
|
||||
// Shared client is already started via resolveSharedMatrixClient.
|
||||
logger.info(`matrix: logged in as ${auth.userId}`);
|
||||
void backfillMatrixAuthDeviceIdAfterStartup({
|
||||
auth,
|
||||
env: process.env,
|
||||
abortSignal: opts.abortSignal,
|
||||
}).catch((err) => {
|
||||
logVerboseMessage(`matrix: failed to backfill deviceId after startup (${String(err)})`);
|
||||
});
|
||||
|
||||
execApprovalsHandler = new MatrixExecApprovalHandler({
|
||||
client,
|
||||
|
||||
@@ -6,7 +6,11 @@ import { getSessionBindingService, __testing } from "openclaw/plugin-sdk/convers
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { PluginRuntime } from "../../runtime-api.js";
|
||||
import { setMatrixRuntime } from "../runtime.js";
|
||||
import { resolveMatrixStateFilePath, resolveMatrixStoragePaths } from "./client/storage.js";
|
||||
import {
|
||||
resolveMatrixStateFilePath,
|
||||
resolveMatrixStoragePaths,
|
||||
writeStorageMeta,
|
||||
} from "./client/storage.js";
|
||||
import {
|
||||
createMatrixThreadBindingManager,
|
||||
resetMatrixThreadBindingsForTests,
|
||||
@@ -370,7 +374,7 @@ describe("matrix thread bindings", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("reloads persisted bindings after the Matrix access token changes", async () => {
|
||||
it("does not reload persisted bindings after the Matrix access token changes while deviceId is unknown", async () => {
|
||||
const initialAuth = {
|
||||
...auth,
|
||||
accessToken: "token-old",
|
||||
@@ -400,6 +404,108 @@ describe("matrix thread bindings", () => {
|
||||
},
|
||||
placement: "current",
|
||||
});
|
||||
const initialStoragePaths = resolveMatrixStoragePaths({
|
||||
...initialAuth,
|
||||
env: process.env,
|
||||
});
|
||||
writeStorageMeta({
|
||||
storagePaths: initialStoragePaths,
|
||||
homeserver: initialAuth.homeserver,
|
||||
userId: initialAuth.userId,
|
||||
accountId: initialAuth.accountId,
|
||||
deviceId: null,
|
||||
});
|
||||
|
||||
initialManager.stop();
|
||||
resetMatrixThreadBindingsForTests();
|
||||
__testing.resetSessionBindingAdaptersForTests();
|
||||
|
||||
await createMatrixThreadBindingManager({
|
||||
accountId: "ops",
|
||||
auth: rotatedAuth,
|
||||
client: {} as never,
|
||||
idleTimeoutMs: 24 * 60 * 60 * 1000,
|
||||
maxAgeMs: 0,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
expect(
|
||||
getSessionBindingService().resolveByConversation({
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
}),
|
||||
).toBeNull();
|
||||
|
||||
const initialBindingsPath = path.join(initialStoragePaths.rootDir, "thread-bindings.json");
|
||||
const rotatedBindingsPath = path.join(
|
||||
resolveMatrixStoragePaths({
|
||||
...rotatedAuth,
|
||||
env: process.env,
|
||||
}).rootDir,
|
||||
"thread-bindings.json",
|
||||
);
|
||||
expect(rotatedBindingsPath).not.toBe(initialBindingsPath);
|
||||
});
|
||||
|
||||
it("reloads persisted bindings after the Matrix access token changes when deviceId is known", async () => {
|
||||
const initialAuth = {
|
||||
...auth,
|
||||
accessToken: "token-old",
|
||||
deviceId: "DEVICE123",
|
||||
};
|
||||
const rotatedAuth = {
|
||||
...auth,
|
||||
accessToken: "token-new",
|
||||
deviceId: "DEVICE123",
|
||||
};
|
||||
|
||||
const initialManager = await createMatrixThreadBindingManager({
|
||||
accountId: "ops",
|
||||
auth: initialAuth,
|
||||
client: {} as never,
|
||||
idleTimeoutMs: 24 * 60 * 60 * 1000,
|
||||
maxAgeMs: 0,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
await getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
},
|
||||
placement: "current",
|
||||
});
|
||||
const initialStoragePaths = resolveMatrixStoragePaths({
|
||||
...initialAuth,
|
||||
env: process.env,
|
||||
});
|
||||
writeStorageMeta({
|
||||
storagePaths: initialStoragePaths,
|
||||
homeserver: initialAuth.homeserver,
|
||||
userId: initialAuth.userId,
|
||||
accountId: initialAuth.accountId,
|
||||
deviceId: initialAuth.deviceId,
|
||||
});
|
||||
const initialBindingsPath = path.join(initialStoragePaths.rootDir, "thread-bindings.json");
|
||||
await vi.waitFor(async () => {
|
||||
const persistedRaw = await fs.readFile(initialBindingsPath, "utf-8");
|
||||
expect(JSON.parse(persistedRaw)).toMatchObject({
|
||||
version: 1,
|
||||
bindings: [
|
||||
expect.objectContaining({
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
}),
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
initialManager.stop();
|
||||
resetMatrixThreadBindingsForTests();
|
||||
@@ -425,13 +531,6 @@ describe("matrix thread bindings", () => {
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
});
|
||||
|
||||
const initialBindingsPath = path.join(
|
||||
resolveMatrixStoragePaths({
|
||||
...initialAuth,
|
||||
env: process.env,
|
||||
}).rootDir,
|
||||
"thread-bindings.json",
|
||||
);
|
||||
const rotatedBindingsPath = path.join(
|
||||
resolveMatrixStoragePaths({
|
||||
...rotatedAuth,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import path from "node:path";
|
||||
import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
|
||||
import { resolveAgentIdFromSessionKey } from "openclaw/plugin-sdk/routing";
|
||||
import {
|
||||
@@ -6,7 +7,7 @@ import {
|
||||
type SessionBindingAdapter,
|
||||
unregisterSessionBindingAdapter,
|
||||
} from "openclaw/plugin-sdk/thread-bindings-runtime";
|
||||
import { resolveMatrixStateFilePath } from "./client/storage.js";
|
||||
import { claimCurrentTokenStorageState, resolveMatrixStateFilePath } from "./client/storage.js";
|
||||
import type { MatrixAuth } from "./client/types.js";
|
||||
import type { MatrixClient } from "./sdk.js";
|
||||
import { sendMessageMatrix } from "./send.js";
|
||||
@@ -131,6 +132,9 @@ async function persistBindingsSnapshot(
|
||||
bindings: MatrixThreadBindingRecord[],
|
||||
): Promise<void> {
|
||||
await writeJsonFileAtomically(filePath, toStoredBindingsState(bindings));
|
||||
claimCurrentTokenStorageState({
|
||||
rootDir: path.dirname(filePath),
|
||||
});
|
||||
}
|
||||
|
||||
function buildMatrixBindingIntroText(params: {
|
||||
|
||||
Reference in New Issue
Block a user