refactor: move plugin state stores to sqlite (#88609)

This commit is contained in:
Peter Steinberger
2026-05-31 13:37:11 +01:00
committed by GitHub
parent fd88f34a8f
commit e5c61383e5
10 changed files with 1356 additions and 209 deletions

View File

@@ -1,6 +1,7 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { prepareFileConsentActivityFs } from "./file-consent-helpers.js";
import {
@@ -50,6 +51,7 @@ async function cleanupTempDirs(): Promise<void> {
describe("msteams pending uploads (fs-backed)", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
setMSTeamsRuntime(msteamsRuntimeStub);
clearPendingUploads();
});
@@ -105,18 +107,12 @@ describe("msteams pending uploads (fs-backed)", () => {
{ env },
);
// Confirm the backing file actually exists on disk with expected shape
// Confirm SQLite-backed plugin state was created instead of a new JSON store.
const storePath = path.join(stateDir, "msteams-pending-uploads.json");
const raw = await fs.promises.readFile(storePath, "utf-8");
const parsed = JSON.parse(raw) as {
version: number;
uploads: Record<string, { bufferBase64: string; filename: string }>;
};
expect(parsed.version).toBe(1);
expect(parsed.uploads["upload-x"]?.filename).toBe("secret.bin");
expect(Buffer.from(parsed.uploads["upload-x"].bufferBase64, "base64").toString("utf8")).toBe(
"top secret",
);
await expect(fs.promises.access(storePath)).rejects.toThrow();
await expect(
fs.promises.access(path.join(stateDir, "state", "openclaw.sqlite")),
).resolves.toBeUndefined();
// Second "process": reader using the same state dir
const reader = await getPendingUploadFs("upload-x", { env });
@@ -124,6 +120,26 @@ describe("msteams pending uploads (fs-backed)", () => {
expect(reader?.filename).toBe("secret.bin");
});
it("stores multi-megabyte uploads by chunking payload bytes", async () => {
const stateDir = await makeTempStateDir();
const env = makeEnv(stateDir);
const payload = Buffer.alloc(6 * 1024 * 1024, 7);
await storePendingUploadFs(
{
id: "upload-large",
buffer: payload,
filename: "large.bin",
conversationId: "19:conv@thread.v2",
},
{ env },
);
const reader = await getPendingUploadFs("upload-large", { env });
expect(reader?.buffer.equals(payload)).toBe(true);
expect(reader?.filename).toBe("large.bin");
});
it("removes persisted entries", async () => {
const stateDir = await makeTempStateDir();
const env = makeEnv(stateDir);
@@ -204,14 +220,85 @@ describe("msteams pending uploads (fs-backed)", () => {
// Should not throw and should treat as empty
expect(await getPendingUploadFs("anything", { env })).toBeUndefined();
await expect(fs.promises.access(storePath)).rejects.toThrow();
await fs.promises.writeFile(storePath, JSON.stringify({ version: 2, uploads: {} }), "utf-8");
expect(await getPendingUploadFs("anything", { env })).toBeUndefined();
const secondStateDir = await makeTempStateDir();
const secondEnv = makeEnv(secondStateDir);
const secondStorePath = path.join(secondStateDir, "msteams-pending-uploads.json");
await fs.promises.writeFile(
secondStorePath,
JSON.stringify({ version: 2, uploads: {} }),
"utf-8",
);
expect(await getPendingUploadFs("anything", { env: secondEnv })).toBeUndefined();
await expect(fs.promises.access(secondStorePath)).rejects.toThrow();
});
it("imports a legacy JSON file that appears after an empty migration marker", async () => {
const stateDir = await makeTempStateDir();
const env = makeEnv(stateDir);
const storePath = path.join(stateDir, "msteams-pending-uploads.json");
expect(await getPendingUploadFs("upload-late", { env })).toBeUndefined();
await fs.promises.writeFile(
storePath,
`${JSON.stringify({
version: 1,
uploads: {
"upload-late": {
id: "upload-late",
bufferBase64: Buffer.from("late payload").toString("base64"),
filename: "late.txt",
conversationId: "19:conv@thread.v2",
createdAt: Date.now(),
},
},
})}\n`,
"utf-8",
);
const loaded = await getPendingUploadFs("upload-late", { env });
expect(loaded?.filename).toBe("late.txt");
expect(loaded?.buffer.toString("utf8")).toBe("late payload");
await expect(fs.promises.access(storePath)).rejects.toThrow();
});
it("skips malformed legacy upload rows while importing valid rows", async () => {
const stateDir = await makeTempStateDir();
const env = makeEnv(stateDir);
const storePath = path.join(stateDir, "msteams-pending-uploads.json");
await fs.promises.writeFile(
storePath,
`${JSON.stringify({
version: 1,
uploads: {
broken: {
id: "broken",
filename: "broken.txt",
conversationId: "19:conv@thread.v2",
createdAt: Date.now(),
},
valid: {
id: "valid",
bufferBase64: Buffer.from("valid payload").toString("base64"),
filename: "valid.txt",
conversationId: "19:conv@thread.v2",
createdAt: Date.now(),
},
},
})}\n`,
"utf-8",
);
expect(await getPendingUploadFs("broken", { env })).toBeUndefined();
const loaded = await getPendingUploadFs("valid", { env });
expect(loaded?.buffer.toString("utf8")).toBe("valid payload");
});
});
describe("prepareFileConsentActivityFs end-to-end", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
setMSTeamsRuntime(msteamsRuntimeStub);
clearPendingUploads();
});

View File

@@ -1,29 +1,30 @@
/**
* Filesystem-backed pending upload store for the FileConsentCard flow.
*
* The CLI `message send --media` path runs in a different process from the
* gateway's bot monitor that receives the `fileConsent/invoke` callback.
* An in-memory `pending-uploads.ts` store cannot bridge those processes, so
* when the user clicks "Allow" the monitor handler's lookup misses and the
* user sees "card action not supported".
*
* This FS store persists pending uploads to a JSON file (with the file buffer
* base64-encoded) so any process that shares the OpenClaw state dir can read
* them back. The in-memory store in `pending-uploads.ts` is still the fast
* path for same-process flows (for example the messenger reply path); this FS
* store is a cross-process fallback.
*/
import { createHash } from "node:crypto";
import fs from "node:fs/promises";
import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { getMSTeamsRuntime } from "./runtime.js";
import {
resolveMSTeamsSqliteStateEnv,
toPluginJsonValue,
withMSTeamsSqliteMutationLock,
} from "./sqlite-state.js";
import { resolveMSTeamsStorePath } from "./storage.js";
import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js";
import { readJsonFile } from "./store-fs.js";
/** TTL for persisted pending uploads (matches in-memory store). */
const PENDING_UPLOAD_TTL_MS = 5 * 60 * 1000;
/** Cap to avoid unbounded growth if a process crashes mid-flow. */
const MAX_PENDING_UPLOADS = 100;
const MAX_CHUNKS_PER_UPLOAD = 3072;
const MAX_PENDING_UPLOAD_CHUNK_ROWS = 45_000;
const RAW_CHUNK_BYTES = 36 * 1024;
const PENDING_UPLOAD_META_MAX_ENTRIES = MAX_PENDING_UPLOADS + 100;
const STORE_FILENAME = "msteams-pending-uploads.json";
const PENDING_UPLOAD_META_NAMESPACE = "pending-uploads";
const PENDING_UPLOAD_CHUNKS_NAMESPACE = "pending-upload-chunks";
const PENDING_UPLOAD_MIGRATIONS_NAMESPACE = "pending-upload-migrations";
const PENDING_UPLOAD_LOCK_FILENAME = "msteams-pending-uploads.sqlite.lock";
type PendingUploadFsRecord = {
id: string;
@@ -53,6 +54,21 @@ type PendingUploadStoreData = {
const empty: PendingUploadStoreData = { version: 1, uploads: {} };
type PendingUploadMetaRecord = Omit<PendingUploadFsRecord, "bufferBase64"> & {
chunkCount: number;
byteLength: number;
};
type PendingUploadChunkRecord = {
id: string;
index: number;
dataBase64: string;
};
type PendingUploadMigrationMarker = {
importedAt: string;
};
type PendingUploadsFsOptions = {
env?: NodeJS.ProcessEnv;
homedir?: () => string;
@@ -61,7 +77,7 @@ type PendingUploadsFsOptions = {
ttlMs?: number;
};
function resolveFilePath(options: PendingUploadsFsOptions | undefined): string {
function resolveLegacyFilePath(options: PendingUploadsFsOptions | undefined): string {
return resolveMSTeamsStorePath({
filename: STORE_FILENAME,
env: options?.env,
@@ -71,6 +87,60 @@ function resolveFilePath(options: PendingUploadsFsOptions | undefined): string {
});
}
function createMetaStore(
options: PendingUploadsFsOptions | undefined,
): PluginStateKeyedStore<PendingUploadMetaRecord> {
return getMSTeamsRuntime().state.openKeyedStore<PendingUploadMetaRecord>({
namespace: PENDING_UPLOAD_META_NAMESPACE,
maxEntries: PENDING_UPLOAD_META_MAX_ENTRIES,
env: resolveMSTeamsSqliteStateEnv(options),
});
}
function createChunkStore(
options: PendingUploadsFsOptions | undefined,
): PluginStateKeyedStore<PendingUploadChunkRecord> {
return getMSTeamsRuntime().state.openKeyedStore<PendingUploadChunkRecord>({
namespace: PENDING_UPLOAD_CHUNKS_NAMESPACE,
maxEntries: MAX_PENDING_UPLOAD_CHUNK_ROWS,
env: resolveMSTeamsSqliteStateEnv(options),
});
}
function createMigrationStore(
options: PendingUploadsFsOptions | undefined,
): PluginStateKeyedStore<PendingUploadMigrationMarker> {
return getMSTeamsRuntime().state.openKeyedStore<PendingUploadMigrationMarker>({
namespace: PENDING_UPLOAD_MIGRATIONS_NAMESPACE,
maxEntries: 100,
env: resolveMSTeamsSqliteStateEnv(options),
});
}
function buildUploadKey(id: string): string {
return `upload:${createHash("sha256").update(id).digest("hex")}`;
}
function buildMetaKey(id: string): string {
return `${buildUploadKey(id)}:meta`;
}
function buildChunkKey(id: string, index: number): string {
return `${buildUploadKey(id)}:chunk:${String(index).padStart(4, "0")}`;
}
function buildMigrationKey(filePath: string): string {
return `legacy-json:${createHash("sha256").update(filePath).digest("hex")}`;
}
function buildMigrationContentKey(filePath: string, value: unknown): string {
return `legacy-json-content:${createHash("sha256")
.update(filePath)
.update("\0")
.update(JSON.stringify(value) ?? "undefined")
.digest("hex")}`;
}
function pruneExpired(
uploads: Record<string, PendingUploadFsRecord>,
nowMs: number,
@@ -98,10 +168,13 @@ function pruneToLimit(
return Object.fromEntries(keep);
}
function recordToUpload(record: PendingUploadFsRecord): PendingUploadFs {
function recordToUpload(
record: PendingUploadFsRecord | PendingUploadMetaRecord,
buffer: Buffer,
): PendingUploadFs {
return {
id: record.id,
buffer: Buffer.from(record.bufferBase64, "base64"),
buffer,
filename: record.filename,
contentType: record.contentType,
conversationId: record.conversationId,
@@ -123,13 +196,220 @@ function isValidStore(value: unknown): value is PendingUploadStoreData {
);
}
async function readStore(filePath: string, ttlMs: number): Promise<PendingUploadStoreData> {
const { value } = await readJsonFile<unknown>(filePath, empty);
if (!isValidStore(value)) {
return { version: 1, uploads: {} };
function normalizeLegacyUploadRecord(value: unknown): PendingUploadFsRecord | null {
if (!value || typeof value !== "object") {
return null;
}
const record = value as Partial<PendingUploadFsRecord>;
if (
typeof record.id !== "string" ||
!record.id ||
typeof record.bufferBase64 !== "string" ||
typeof record.filename !== "string" ||
!record.filename ||
typeof record.conversationId !== "string" ||
!record.conversationId ||
typeof record.createdAt !== "number" ||
!Number.isFinite(record.createdAt)
) {
return null;
}
return {
id: record.id,
bufferBase64: record.bufferBase64,
filename: record.filename,
contentType: typeof record.contentType === "string" ? record.contentType : undefined,
conversationId: record.conversationId,
consentCardActivityId:
typeof record.consentCardActivityId === "string" ? record.consentCardActivityId : undefined,
createdAt: record.createdAt,
};
}
function normalizeLegacyUploads(value: unknown): Record<string, PendingUploadFsRecord> {
if (!isValidStore(value)) {
return {};
}
const uploads: Record<string, PendingUploadFsRecord> = {};
for (const record of Object.values(value.uploads)) {
const normalized = normalizeLegacyUploadRecord(record);
if (normalized) {
uploads[normalized.id] = normalized;
}
}
return uploads;
}
async function deleteUploadRows(
id: string,
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
chunkStore: PluginStateKeyedStore<PendingUploadChunkRecord>,
): Promise<void> {
const existing = await metaStore.lookup(buildMetaKey(id));
await metaStore.delete(buildMetaKey(id));
if (!existing) {
return;
}
const chunkCount = existing.chunkCount;
for (let index = 0; index < chunkCount; index += 1) {
await chunkStore.delete(buildChunkKey(id, index));
}
}
async function registerUploadRows(
record: PendingUploadFsRecord,
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
chunkStore: PluginStateKeyedStore<PendingUploadChunkRecord>,
ttlMs: number,
overwrite: boolean,
): Promise<void> {
const buffer = Buffer.from(record.bufferBase64, "base64");
const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CHUNK_BYTES));
if (chunkCount > MAX_CHUNKS_PER_UPLOAD) {
throw new Error(
`Microsoft Teams pending upload ${record.id} exceeds SQLite chunk limit (${chunkCount}/${MAX_CHUNKS_PER_UPLOAD})`,
);
}
if (overwrite) {
await deleteUploadRows(record.id, metaStore, chunkStore);
} else if (await metaStore.lookup(buildMetaKey(record.id))) {
return;
}
await pruneUploadStore(metaStore, chunkStore, ttlMs, chunkCount);
for (let index = 0; index < chunkCount; index += 1) {
const chunk = buffer.subarray(index * RAW_CHUNK_BYTES, (index + 1) * RAW_CHUNK_BYTES);
await chunkStore.register(
buildChunkKey(record.id, index),
toPluginJsonValue({
id: record.id,
index,
dataBase64: chunk.toString("base64"),
}),
);
}
await metaStore.register(
buildMetaKey(record.id),
toPluginJsonValue({
id: record.id,
filename: record.filename,
contentType: record.contentType,
conversationId: record.conversationId,
consentCardActivityId: record.consentCardActivityId,
createdAt: record.createdAt,
chunkCount,
byteLength: buffer.byteLength,
}),
);
}
async function importLegacyStore(
options: PendingUploadsFsOptions | undefined,
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
chunkStore: PluginStateKeyedStore<PendingUploadChunkRecord>,
ttlMs: number,
): Promise<void> {
const legacyFilePath = resolveLegacyFilePath(options);
const migrationStore = createMigrationStore(options);
const migrationKey = buildMigrationKey(legacyFilePath);
const imported = (await migrationStore.lookup(migrationKey)) !== undefined;
const { value, exists } = await readJsonFile<unknown>(legacyFilePath, empty);
if (!exists) {
if (!imported) {
await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() });
}
return;
}
const contentKey = buildMigrationContentKey(legacyFilePath, value);
if (await migrationStore.lookup(contentKey)) {
return;
}
const legacy = pruneToLimit(pruneExpired(normalizeLegacyUploads(value), Date.now(), ttlMs));
for (const record of Object.values(legacy)) {
await registerUploadRows(record, metaStore, chunkStore, ttlMs, false);
}
await migrationStore.register(contentKey, { importedAt: new Date().toISOString() });
if (!imported) {
await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() });
}
await fs.rm(legacyFilePath, { force: true }).catch(() => {});
}
async function withPendingUploadLock<T>(
options: PendingUploadsFsOptions | undefined,
run: () => Promise<T>,
): Promise<T> {
return await withMSTeamsSqliteMutationLock(options, PENDING_UPLOAD_LOCK_FILENAME, run);
}
async function ensureLegacyImported(
options: PendingUploadsFsOptions | undefined,
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
chunkStore: PluginStateKeyedStore<PendingUploadChunkRecord>,
ttlMs: number,
): Promise<void> {
await withPendingUploadLock(options, () =>
importLegacyStore(options, metaStore, chunkStore, ttlMs),
);
}
async function readUploadRows(
id: string,
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
chunkStore: PluginStateKeyedStore<PendingUploadChunkRecord>,
): Promise<PendingUploadFs | undefined> {
const meta = await metaStore.lookup(buildMetaKey(id));
if (!meta) {
return undefined;
}
const chunks: Buffer[] = [];
for (let index = 0; index < meta.chunkCount; index += 1) {
const chunk = await chunkStore.lookup(buildChunkKey(id, index));
if (!chunk || chunk.id !== id || chunk.index !== index) {
return undefined;
}
chunks.push(Buffer.from(chunk.dataBase64, "base64"));
}
return recordToUpload(meta, Buffer.concat(chunks, meta.byteLength));
}
async function pruneUploadStore(
metaStore: PluginStateKeyedStore<PendingUploadMetaRecord>,
chunkStore: PluginStateKeyedStore<PendingUploadChunkRecord>,
ttlMs: number,
extraChunkRows = 0,
): Promise<void> {
const rows = await metaStore.entries();
const liveRows = [];
const now = Date.now();
let liveChunkRows = 0;
for (const row of rows) {
if (now - row.value.createdAt > ttlMs) {
await deleteUploadRows(row.value.id, metaStore, chunkStore);
continue;
}
liveChunkRows += row.value.chunkCount;
liveRows.push(row);
}
if (
liveRows.length <= MAX_PENDING_UPLOADS &&
liveChunkRows + extraChunkRows <= MAX_PENDING_UPLOAD_CHUNK_ROWS
) {
return;
}
const sorted = liveRows.toSorted(
(a, b) => a.value.createdAt - b.value.createdAt || a.value.id.localeCompare(b.value.id),
);
for (const row of sorted) {
if (
liveRows.length <= MAX_PENDING_UPLOADS &&
liveChunkRows + extraChunkRows <= MAX_PENDING_UPLOAD_CHUNK_ROWS
) {
break;
}
await deleteUploadRows(row.value.id, metaStore, chunkStore);
liveChunkRows -= row.value.chunkCount;
liveRows.pop();
}
const uploads = pruneToLimit(pruneExpired(value.uploads, Date.now(), ttlMs));
return { version: 1, uploads };
}
/**
@@ -149,20 +429,26 @@ export async function storePendingUploadFs(
options?: PendingUploadsFsOptions,
): Promise<void> {
const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS;
const filePath = resolveFilePath(options);
await withFileLock(filePath, empty, async () => {
const store = await readStore(filePath, ttlMs);
store.uploads[upload.id] = {
id: upload.id,
bufferBase64: upload.buffer.toString("base64"),
filename: upload.filename,
contentType: upload.contentType,
conversationId: upload.conversationId,
consentCardActivityId: upload.consentCardActivityId,
createdAt: Date.now(),
};
store.uploads = pruneToLimit(pruneExpired(store.uploads, Date.now(), ttlMs));
await writeJsonFile(filePath, store);
const metaStore = createMetaStore(options);
const chunkStore = createChunkStore(options);
await withPendingUploadLock(options, async () => {
await importLegacyStore(options, metaStore, chunkStore, ttlMs);
await registerUploadRows(
{
id: upload.id,
bufferBase64: upload.buffer.toString("base64"),
filename: upload.filename,
contentType: upload.contentType,
conversationId: upload.conversationId,
consentCardActivityId: upload.consentCardActivityId,
createdAt: Date.now(),
},
metaStore,
chunkStore,
ttlMs,
true,
);
await pruneUploadStore(metaStore, chunkStore, ttlMs);
});
}
@@ -177,16 +463,18 @@ export async function getPendingUploadFs(
return undefined;
}
const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS;
const filePath = resolveFilePath(options);
const store = await readStore(filePath, ttlMs);
const record = store.uploads[id];
if (!record) {
const metaStore = createMetaStore(options);
const chunkStore = createChunkStore(options);
await ensureLegacyImported(options, metaStore, chunkStore, ttlMs);
const upload = await readUploadRows(id, metaStore, chunkStore);
if (!upload) {
return undefined;
}
if (Date.now() - record.createdAt > ttlMs) {
if (Date.now() - upload.createdAt > ttlMs) {
await removePendingUploadFs(id, options);
return undefined;
}
return recordToUpload(record);
return upload;
}
/**
@@ -201,14 +489,11 @@ export async function removePendingUploadFs(
return;
}
const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS;
const filePath = resolveFilePath(options);
await withFileLock(filePath, empty, async () => {
const store = await readStore(filePath, ttlMs);
if (!(id in store.uploads)) {
return;
}
delete store.uploads[id];
await writeJsonFile(filePath, store);
const metaStore = createMetaStore(options);
const chunkStore = createChunkStore(options);
await withPendingUploadLock(options, async () => {
await importLegacyStore(options, metaStore, chunkStore, ttlMs);
await deleteUploadRows(id, metaStore, chunkStore);
});
}
@@ -222,14 +507,17 @@ export async function setPendingUploadActivityIdFs(
options?: PendingUploadsFsOptions,
): Promise<void> {
const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS;
const filePath = resolveFilePath(options);
await withFileLock(filePath, empty, async () => {
const store = await readStore(filePath, ttlMs);
const record = store.uploads[id];
if (!record) {
const metaStore = createMetaStore(options);
const chunkStore = createChunkStore(options);
await withPendingUploadLock(options, async () => {
await importLegacyStore(options, metaStore, chunkStore, ttlMs);
const record = await metaStore.lookup(buildMetaKey(id));
if (!record || Date.now() - record.createdAt > ttlMs) {
return;
}
record.consentCardActivityId = activityId;
await writeJsonFile(filePath, store);
await metaStore.register(
buildMetaKey(id),
toPluginJsonValue({ ...record, consentCardActivityId: activityId }),
);
});
}

View File

@@ -1,10 +1,22 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { setMSTeamsRuntime } from "./runtime.js";
import { createMSTeamsSsoTokenStoreFs } from "./sso-token-store.js";
import { msteamsRuntimeStub } from "./test-support/runtime.js";
describe("msteams sso token store (plugin state)", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
setMSTeamsRuntime(msteamsRuntimeStub);
});
afterEach(() => {
vi.restoreAllMocks();
});
describe("msteams sso token store (fs)", () => {
it("keeps distinct tokens when connectionName and userId contain the legacy delimiter", async () => {
const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-"));
const storePath = path.join(stateDir, "msteams-sso-tokens.json");
@@ -29,10 +41,10 @@ describe("msteams sso token store (fs)", () => {
expect(await store.get(first)).toEqual(first);
expect(await store.get(second)).toEqual(second);
const raw = JSON.parse(await fs.readFile(storePath, "utf8")) as {
tokens: Record<string, unknown>;
};
expect(Object.keys(raw.tokens)).toHaveLength(2);
await expect(fs.access(storePath)).rejects.toThrow();
await expect(
fs.access(path.join(stateDir, "state", "openclaw.sqlite")),
).resolves.toBeUndefined();
});
it("loads legacy flat-key files by rebuilding keys from stored token payloads", async () => {
@@ -70,5 +82,90 @@ describe("msteams sso token store (fs)", () => {
token: "token-1",
updatedAt: "2026-04-10T00:00:00.000Z",
});
await expect(fs.access(storePath)).rejects.toThrow();
});
it("keeps plugin-state keys bounded for long Teams identifiers", async () => {
const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-long-"));
const store = createMSTeamsSsoTokenStoreFs({ stateDir });
const token = {
connectionName: `conn-${"c".repeat(1000)}`,
userId: `user-${"u".repeat(2000)}`,
token: "token-long",
updatedAt: "2026-04-10T00:00:00.000Z",
} as const;
await store.save(token);
expect(await store.get(token)).toEqual(token);
expect(await store.remove(token)).toBe(true);
expect(await store.get(token)).toBeNull();
});
it("imports a legacy token file that appears after an empty migration marker", async () => {
const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-late-"));
const storePath = path.join(stateDir, "msteams-sso-tokens.json");
const store = createMSTeamsSsoTokenStoreFs({ storePath });
expect(await store.get({ connectionName: "conn", userId: "user-late" })).toBeNull();
await fs.writeFile(
storePath,
`${JSON.stringify({
version: 1,
tokens: {
late: {
connectionName: "conn",
userId: "user-late",
token: "token-late",
updatedAt: "2026-04-10T00:00:00.000Z",
},
},
})}\n`,
"utf8",
);
expect(await store.get({ connectionName: "conn", userId: "user-late" })).toEqual({
connectionName: "conn",
userId: "user-late",
token: "token-late",
updatedAt: "2026-04-10T00:00:00.000Z",
});
await expect(fs.access(storePath)).rejects.toThrow();
});
it("does not resurrect removed tokens when a migrated legacy file cannot be deleted", async () => {
const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-stale-"));
const storePath = path.join(stateDir, "msteams-sso-tokens.json");
await fs.writeFile(
storePath,
`${JSON.stringify({
version: 1,
tokens: {
stale: {
connectionName: "conn",
userId: "user-stale",
token: "token-stale",
updatedAt: "2026-04-10T00:00:00.000Z",
},
},
})}\n`,
"utf8",
);
const originalRm = fs.rm;
vi.spyOn(fs, "rm").mockImplementation(async (target, options) => {
if (target === storePath) {
throw new Error("cannot remove");
}
return await originalRm(target, options);
});
const store = createMSTeamsSsoTokenStoreFs({ storePath });
expect(await store.get({ connectionName: "conn", userId: "user-stale" })).toEqual({
connectionName: "conn",
userId: "user-stale",
token: "token-stale",
updatedAt: "2026-04-10T00:00:00.000Z",
});
expect(await store.remove({ connectionName: "conn", userId: "user-stale" })).toBe(true);
expect(await store.get({ connectionName: "conn", userId: "user-stale" })).toBeNull();
});
});

View File

@@ -1,5 +1,5 @@
/**
* File-backed store for Bot Framework OAuth SSO tokens.
* SQLite-backed store for Bot Framework OAuth SSO tokens.
*
* Tokens are keyed by (connectionName, userId). `userId` should be the
* stable AAD object ID (`activity.from.aadObjectId`) when available,
@@ -11,8 +11,17 @@
* valid token without reaching back into Bot Framework every turn.
*/
import { createHash } from "node:crypto";
import fs from "node:fs/promises";
import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { getMSTeamsRuntime } from "./runtime.js";
import {
resolveMSTeamsSqliteStateEnv,
toPluginJsonValue,
withMSTeamsSqliteMutationLock,
} from "./sqlite-state.js";
import { resolveMSTeamsStorePath } from "./storage.js";
import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js";
import { readJsonFile } from "./store-fs.js";
type MSTeamsSsoStoredToken = {
/** Connection name from the Bot Framework OAuth connection setting. */
@@ -40,13 +49,54 @@ type SsoStoreData = {
};
const STORE_FILENAME = "msteams-sso-tokens.json";
const SSO_TOKENS_NAMESPACE = "sso-tokens";
const SSO_TOKEN_MIGRATIONS_NAMESPACE = "sso-token-migrations";
const SSO_TOKEN_LOCK_FILENAME = "msteams-sso-tokens.sqlite.lock";
const MAX_SSO_TOKENS = 5000;
const STORE_KEY_VERSION_PREFIX = "v2:";
function makeKey(connectionName: string, userId: string): string {
return `${STORE_KEY_VERSION_PREFIX}${Buffer.from(
JSON.stringify([connectionName, userId]),
"utf8",
).toString("base64url")}`;
return `${STORE_KEY_VERSION_PREFIX}${createHash("sha256")
.update(JSON.stringify([connectionName, userId]))
.digest("hex")}`;
}
function buildMigrationKey(filePath: string): string {
return `legacy-json:${createHash("sha256").update(filePath).digest("hex")}`;
}
function buildMigrationContentKey(filePath: string, value: unknown): string {
return `legacy-json-content:${createHash("sha256")
.update(filePath)
.update("\0")
.update(JSON.stringify(value) ?? "undefined")
.digest("hex")}`;
}
function createTokenStore(params?: {
env?: NodeJS.ProcessEnv;
homedir?: () => string;
stateDir?: string;
storePath?: string;
}): PluginStateKeyedStore<MSTeamsSsoStoredToken> {
return getMSTeamsRuntime().state.openKeyedStore<MSTeamsSsoStoredToken>({
namespace: SSO_TOKENS_NAMESPACE,
maxEntries: MAX_SSO_TOKENS,
env: resolveMSTeamsSqliteStateEnv(params),
});
}
function createMigrationStore(params?: {
env?: NodeJS.ProcessEnv;
homedir?: () => string;
stateDir?: string;
storePath?: string;
}): PluginStateKeyedStore<{ importedAt: string }> {
return getMSTeamsRuntime().state.openKeyedStore<{ importedAt: string }>({
namespace: SSO_TOKEN_MIGRATIONS_NAMESPACE,
maxEntries: 100,
env: resolveMSTeamsSqliteStateEnv(params),
});
}
function normalizeStoredToken(value: unknown): MSTeamsSsoStoredToken | null {
@@ -89,60 +139,81 @@ export function createMSTeamsSsoTokenStoreFs(params?: {
stateDir?: string;
storePath?: string;
}): MSTeamsSsoTokenStore {
const filePath = resolveMSTeamsStorePath({
const legacyFilePath = resolveMSTeamsStorePath({
filename: STORE_FILENAME,
env: params?.env,
homedir: params?.homedir,
stateDir: params?.stateDir,
storePath: params?.storePath,
});
const empty: SsoStoreData = { version: 1, tokens: {} };
const tokenStore = createTokenStore(params);
const migrationStore = createMigrationStore(params);
const migrationKey = buildMigrationKey(legacyFilePath);
let legacyImportPromise: Promise<void> | null = null;
const readStore = async (): Promise<SsoStoreData> => {
const { value } = await readJsonFile(filePath, empty);
if (!isSsoStoreData(value)) {
return { version: 1, tokens: {} };
const importLegacyStore = async (): Promise<void> => {
const imported = (await migrationStore.lookup(migrationKey)) !== undefined;
const { value, exists } = await readJsonFile<unknown>(legacyFilePath, empty);
const contentKey = exists ? buildMigrationContentKey(legacyFilePath, value) : null;
if (contentKey && (await migrationStore.lookup(contentKey))) {
return;
}
const tokens: Record<string, MSTeamsSsoStoredToken> = {};
for (const stored of Object.values(value.tokens)) {
const normalized = normalizeStoredToken(stored);
if (!normalized) {
continue;
if (exists && isSsoStoreData(value)) {
for (const stored of Object.values(value.tokens)) {
const normalized = normalizeStoredToken(stored);
if (!normalized) {
continue;
}
await tokenStore.registerIfAbsent(
makeKey(normalized.connectionName, normalized.userId),
toPluginJsonValue(normalized),
);
}
tokens[makeKey(normalized.connectionName, normalized.userId)] = normalized;
}
return {
version: 1,
tokens,
};
if (contentKey) {
await migrationStore.register(contentKey, { importedAt: new Date().toISOString() });
}
if (!imported) {
await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() });
}
if (exists) {
await fs.rm(legacyFilePath, { force: true }).catch(() => {});
}
};
const ensureLegacyImported = async (): Promise<void> => {
if (!legacyImportPromise) {
legacyImportPromise = withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, () =>
importLegacyStore(),
).finally(() => {
legacyImportPromise = null;
});
}
await legacyImportPromise;
};
return {
async get({ connectionName, userId }) {
const store = await readStore();
return store.tokens[makeKey(connectionName, userId)] ?? null;
await ensureLegacyImported();
return (await tokenStore.lookup(makeKey(connectionName, userId))) ?? null;
},
async save(token) {
await withFileLock(filePath, empty, async () => {
const store = await readStore();
const key = makeKey(token.connectionName, token.userId);
store.tokens[key] = { ...token };
await writeJsonFile(filePath, store);
await withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, async () => {
await importLegacyStore();
await tokenStore.register(
makeKey(token.connectionName, token.userId),
toPluginJsonValue({ ...token }),
);
});
},
async remove({ connectionName, userId }) {
let removed = false;
await withFileLock(filePath, empty, async () => {
const store = await readStore();
const key = makeKey(connectionName, userId);
if (store.tokens[key]) {
delete store.tokens[key];
removed = true;
await writeJsonFile(filePath, store);
}
await withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, async () => {
await importLegacyStore();
removed = await tokenStore.delete(makeKey(connectionName, userId));
});
return removed;
},

View File

@@ -300,6 +300,7 @@ export default definePluginEntry({
coreConfig: api.config as CoreConfig,
fullConfig: api.config,
agentRuntime: api.runtime.agent,
stateRuntime: api.runtime.state,
ttsRuntime: api.runtime.tts,
logger: api.logger,
});
@@ -811,6 +812,7 @@ export default definePluginEntry({
program,
config,
ensureRuntime,
stateRuntime: api.runtime.state,
logger: api.logger,
}),
{ commands: ["voicecall"] },

View File

@@ -18,6 +18,8 @@ import {
} from "openclaw/plugin-sdk/string-coerce-runtime";
import { sleep } from "../api.js";
import { validateProviderConfig, type VoiceCallConfig } from "./config.js";
import { getCallHistoryFromStore } from "./manager/store.js";
import { setVoiceCallStateRuntime, type VoiceCallStateRuntime } from "./runtime-state.js";
import type { VoiceCallRuntime } from "./runtime.js";
import { resolveUserPath } from "./utils.js";
import { resolveWebhookExposureStatus } from "./webhook-exposure.js";
@@ -426,9 +428,15 @@ export function registerVoiceCallCli(params: {
program: Command;
config: VoiceCallConfig;
ensureRuntime: () => Promise<VoiceCallRuntime>;
stateRuntime?: VoiceCallStateRuntime["state"];
logger: Logger;
}) {
const { program, config, ensureRuntime, logger } = params;
const { program, config, ensureRuntime, stateRuntime } = params;
const ensureHistoryStateRuntime = (): void => {
if (stateRuntime) {
setVoiceCallStateRuntime({ state: stateRuntime });
}
};
const root = program
.command("voicecall")
.description("Voice call utilities")
@@ -745,43 +753,68 @@ export function registerVoiceCallCli(params: {
const since = parseVoiceCallIntOption(options.since, "--since", { min: 0 });
const pollMs = parseVoiceCallIntOption(options.poll, "--poll", { min: 50 });
if (!fs.existsSync(file)) {
logger.error(`No log file at ${file}`);
process.exit(1);
}
const initial = fs.readFileSync(file, "utf8");
const lines = initial.split("\n").filter(Boolean);
for (const line of lines.slice(Math.max(0, lines.length - since))) {
writeStdoutLine(line);
}
let offset = Buffer.byteLength(initial, "utf8");
for (;;) {
try {
const stat = fs.statSync(file);
if (stat.size < offset) {
offset = 0;
const tailSqliteHistory = async (initialLimit: number): Promise<never> => {
ensureHistoryStateRuntime();
const seen = new Set<string>();
const printCall = (call: unknown): void => {
const line = JSON.stringify(call);
if (!seen.has(line)) {
seen.add(line);
writeStdoutLine(line);
}
if (stat.size > offset) {
const fd = fs.openSync(file, "r");
try {
const buf = Buffer.alloc(stat.size - offset);
fs.readSync(fd, buf, 0, buf.length, offset);
offset = stat.size;
const text = buf.toString("utf8");
for (const line of text.split("\n").filter(Boolean)) {
writeStdoutLine(line);
}
} finally {
fs.closeSync(fd);
}
};
if (initialLimit > 0) {
for (const call of await getCallHistoryFromStore(path.dirname(file), initialLimit)) {
printCall(call);
}
} catch {
// ignore and retry
}
await sleep(pollMs);
for (;;) {
try {
for (const call of await getCallHistoryFromStore(path.dirname(file), 1000)) {
printCall(call);
}
} catch {
// ignore and retry
}
await sleep(pollMs);
}
};
if (fs.existsSync(file) && path.basename(file) !== "calls.jsonl") {
const initial = fs.readFileSync(file, "utf8");
const lines = initial.split("\n").filter(Boolean);
for (const line of lines.slice(Math.max(0, lines.length - since))) {
writeStdoutLine(line);
}
let offset = Buffer.byteLength(initial, "utf8");
for (;;) {
try {
const stat = fs.statSync(file);
if (stat.size < offset) {
offset = 0;
}
if (stat.size > offset) {
const fd = fs.openSync(file, "r");
try {
const buf = Buffer.alloc(stat.size - offset);
fs.readSync(fd, buf, 0, buf.length, offset);
offset = stat.size;
const text = buf.toString("utf8");
for (const line of text.split("\n").filter(Boolean)) {
writeStdoutLine(line);
}
} finally {
fs.closeSync(fd);
}
}
} catch {
// ignore and retry
}
await sleep(pollMs);
}
} else {
await tailSqliteHistory(since);
}
});
@@ -794,41 +827,50 @@ export function registerVoiceCallCli(params: {
const file = options.file;
const last = parseVoiceCallIntOption(options.last, "--last", { min: 1 });
if (!fs.existsSync(file)) {
throw new Error("No log file at " + file);
if (fs.existsSync(file) && path.basename(file) !== "calls.jsonl") {
const content = fs.readFileSync(file, "utf8");
const calls = content
.split("\n")
.filter(Boolean)
.slice(-last)
.map((line) => {
try {
const parsed = JSON.parse(line) as { call?: unknown };
return (parsed.call ?? parsed) as { metadata?: Record<string, unknown> };
} catch {
return null;
}
})
.filter((call): call is { metadata?: Record<string, unknown> } => call !== null);
writeVoiceCallLatencySummary(calls);
} else {
ensureHistoryStateRuntime();
writeVoiceCallLatencySummary(await getCallHistoryFromStore(path.dirname(file), last));
}
const content = fs.readFileSync(file, "utf8");
const lines = content.split("\n").filter(Boolean).slice(-last);
const turnLatencyMs: number[] = [];
const listenWaitMs: number[] = [];
for (const line of lines) {
try {
const parsed = JSON.parse(line) as {
metadata?: { lastTurnLatencyMs?: unknown; lastTurnListenWaitMs?: unknown };
};
const latency = parsed.metadata?.lastTurnLatencyMs;
const listenWait = parsed.metadata?.lastTurnListenWaitMs;
if (typeof latency === "number" && Number.isFinite(latency)) {
turnLatencyMs.push(latency);
}
if (typeof listenWait === "number" && Number.isFinite(listenWait)) {
listenWaitMs.push(listenWait);
}
} catch {
// ignore malformed JSON lines
}
}
writeStdoutJson({
recordsScanned: lines.length,
turnLatency: summarizeSeries(turnLatencyMs),
listenWait: summarizeSeries(listenWaitMs),
});
});
function writeVoiceCallLatencySummary(calls: Array<{ metadata?: Record<string, unknown> }>) {
const turnLatencyMs: number[] = [];
const listenWaitMs: number[] = [];
for (const call of calls) {
const latency = call.metadata?.lastTurnLatencyMs;
const listenWait = call.metadata?.lastTurnListenWaitMs;
if (typeof latency === "number" && Number.isFinite(latency)) {
turnLatencyMs.push(latency);
}
if (typeof listenWait === "number" && Number.isFinite(listenWait)) {
listenWaitMs.push(listenWait);
}
}
writeStdoutJson({
recordsScanned: calls.length,
turnLatency: summarizeSeries(turnLatencyMs),
listenWait: summarizeSeries(listenWaitMs),
});
}
root
.command("expose")
.description("Enable/disable Tailscale serve/funnel for the webhook")

View File

@@ -0,0 +1,189 @@
import fs from "node:fs";
import path from "node:path";
import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime";
import {
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
createTestStorePath,
makePersistedCall,
writeCallsToStore,
} from "../manager.test-harness.js";
import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "../runtime-state.js";
import { CallRecordSchema } from "../types.js";
import {
flushPendingCallRecordWritesForTest,
getCallHistoryFromStore,
loadActiveCallsFromStore,
persistCallRecord,
} from "./store.js";
function installStateRuntime(): void {
setVoiceCallStateRuntime({
state: {
resolveStateDir: () => "",
openKeyedStore: (() => {
throw new Error("openKeyedStore is not used by voice-call store tests");
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
},
});
}
describe("voice-call call record store", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
installStateRuntime();
});
afterEach(() => {
vi.useRealTimers();
clearVoiceCallStateRuntime();
resetPluginStateStoreForTests();
});
it("migrates legacy JSONL records into SQLite-backed plugin state", async () => {
const storePath = createTestStorePath();
const call = CallRecordSchema.parse(
makePersistedCall({ callId: "call-legacy", processedEventIds: ["evt-1"] }),
);
writeCallsToStore(storePath, [call]);
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.get("call-legacy")?.providerCallId).toBe(call.providerCallId);
expect(restored.processedEventIds.has("evt-1")).toBe(true);
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false);
expect(fs.existsSync(path.join(storePath, "state", "openclaw.sqlite"))).toBe(true);
const history = await getCallHistoryFromStore(storePath);
expect(history).toHaveLength(1);
expect(history[0]?.callId).toBe("call-legacy");
});
it("persists new call snapshots without recreating the JSONL log", async () => {
const storePath = createTestStorePath();
const call = CallRecordSchema.parse(
makePersistedCall({ callId: "call-sqlite", transcript: [] }),
);
persistCallRecord(storePath, call);
await flushPendingCallRecordWritesForTest();
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false);
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.get("call-sqlite")?.providerCallId).toBe(call.providerCallId);
});
it("imports fallback JSONL writes created after the migration marker", async () => {
const storePath = createTestStorePath();
const sqliteCall = CallRecordSchema.parse(makePersistedCall({ callId: "call-sqlite" }));
const fallbackCall = CallRecordSchema.parse(makePersistedCall({ callId: "call-fallback" }));
persistCallRecord(storePath, sqliteCall);
writeCallsToStore(storePath, [fallbackCall]);
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.has("call-sqlite")).toBe(true);
expect(restored.activeCalls.get("call-fallback")?.providerCallId).toBe(
fallbackCall.providerCallId,
);
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false);
});
it("reads the JSONL fallback when SQLite state cannot open", () => {
const storePath = createTestStorePath();
const call = CallRecordSchema.parse(makePersistedCall({ callId: "call-jsonl" }));
writeCallsToStore(storePath, [call]);
setVoiceCallStateRuntime({
state: {
resolveStateDir: () => "",
openKeyedStore: (() => {
throw new Error("openKeyedStore is not used by voice-call store tests");
}) as never,
openSyncKeyedStore: (() => {
throw new Error("sqlite unavailable");
}) as never,
},
});
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.get("call-jsonl")?.providerCallId).toBe(call.providerCallId);
});
it("keeps oversized fallback records readable when they exceed SQLite chunk budget", async () => {
const storePath = createTestStorePath();
const call = CallRecordSchema.parse(
makePersistedCall({
callId: "call-large",
transcript: [
{
timestamp: Date.now(),
speaker: "user",
text: "x".repeat(2 * 1024 * 1024),
isFinal: true,
},
],
}),
);
persistCallRecord(storePath, call);
await flushPendingCallRecordWritesForTest();
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.get("call-large")?.providerCallId).toBe(call.providerCallId);
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(true);
});
it("does not let an older fallback record override a newer SQLite snapshot", async () => {
const storePath = createTestStorePath();
const olderFallback = CallRecordSchema.parse(
makePersistedCall({
callId: "call-mixed",
state: "answered",
transcript: [
{
timestamp: Date.now(),
speaker: "user",
text: "x".repeat(2 * 1024 * 1024),
isFinal: true,
},
],
}),
);
const newerSqlite = CallRecordSchema.parse(
makePersistedCall({
callId: "call-mixed",
state: "completed",
endedAt: Date.now(),
endReason: "completed",
}),
);
persistCallRecord(storePath, olderFallback);
await flushPendingCallRecordWritesForTest();
persistCallRecord(storePath, newerSqlite);
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.has("call-mixed")).toBe(false);
});
it("replays same-millisecond snapshots in write order", () => {
vi.useFakeTimers({ now: new Date("2026-05-31T10:00:00.000Z") });
const storePath = createTestStorePath();
const first = CallRecordSchema.parse(
makePersistedCall({ callId: "call-order", state: "ringing" }),
);
const second = CallRecordSchema.parse(
makePersistedCall({ callId: "call-order", state: "answered" }),
);
persistCallRecord(storePath, first);
persistCallRecord(storePath, second);
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.get("call-order")?.state).toBe("answered");
});
});

View File

@@ -1,16 +1,341 @@
import { createHash, randomUUID } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import {
appendRegularFile,
privateFileStore,
privateFileStoreSync,
} from "openclaw/plugin-sdk/security-runtime";
import { getOptionalVoiceCallStateRuntime } from "../runtime-state.js";
import { CallRecordSchema, TerminalStates, type CallId, type CallRecord } from "../types.js";
const pendingPersistWrites = new Set<Promise<void>>();
const CALL_RECORD_EVENTS_NAMESPACE = "call-record-events";
const CALL_RECORD_EVENT_CHUNKS_NAMESPACE = "call-record-event-chunks";
const CALL_RECORD_MIGRATIONS_NAMESPACE = "call-record-migrations";
const CALL_RECORD_JSONL_MIGRATION_KEY = "calls-jsonl-v1";
const MAX_CALL_RECORD_EVENTS = 1000;
const CALL_RECORD_EVENT_META_MAX_ENTRIES = MAX_CALL_RECORD_EVENTS + 100;
const MAX_CHUNKS_PER_CALL_RECORD_EVENT = 48;
const CALL_RECORD_CHUNK_MAX_ENTRIES =
MAX_CALL_RECORD_EVENTS * MAX_CHUNKS_PER_CALL_RECORD_EVENT + MAX_CHUNKS_PER_CALL_RECORD_EVENT;
const RAW_CHUNK_BYTES = 36 * 1024;
let callRecordEventSequence = 0;
type CallRecordEventMeta = {
chunkCount: number;
byteLength: number;
persistedAt?: number;
sequence?: number;
};
type CallRecordEventChunk = {
index: number;
dataBase64: string;
};
type CallRecordMigrationMarker = {
importedAt: string;
};
type PersistedCallRecord = {
call: CallRecord;
persistedAt: number;
sequence: number;
orderKey: string;
};
type CallRecordStateStores = {
events: PluginStateSyncKeyedStore<CallRecordEventMeta>;
chunks: PluginStateSyncKeyedStore<CallRecordEventChunk>;
migrations: PluginStateSyncKeyedStore<CallRecordMigrationMarker>;
};
function resolveCallLogPath(storePath: string): string {
return path.join(storePath, "calls.jsonl");
}
function resolvePluginStateEnv(storePath: string): NodeJS.ProcessEnv {
return { ...process.env, OPENCLAW_STATE_DIR: storePath };
}
function createCallRecordStateStores(storePath: string): CallRecordStateStores | null {
const runtime = getOptionalVoiceCallStateRuntime();
if (!runtime) {
return null;
}
const env = resolvePluginStateEnv(storePath);
return {
events: runtime.state.openSyncKeyedStore<CallRecordEventMeta>({
namespace: CALL_RECORD_EVENTS_NAMESPACE,
maxEntries: CALL_RECORD_EVENT_META_MAX_ENTRIES,
env,
}),
chunks: runtime.state.openSyncKeyedStore<CallRecordEventChunk>({
namespace: CALL_RECORD_EVENT_CHUNKS_NAMESPACE,
maxEntries: CALL_RECORD_CHUNK_MAX_ENTRIES,
env,
}),
migrations: runtime.state.openSyncKeyedStore<CallRecordMigrationMarker>({
namespace: CALL_RECORD_MIGRATIONS_NAMESPACE,
maxEntries: 100,
env,
}),
};
}
function tryCreateCallRecordStateStores(storePath: string): CallRecordStateStores | null {
try {
return createCallRecordStateStores(storePath);
} catch (err) {
console.error("[voice-call] Failed to open SQLite call record store:", err);
return null;
}
}
function buildChunkKey(eventKey: string, index: number): string {
return `${eventKey}:chunk:${String(index).padStart(4, "0")}`;
}
function buildJsonlEventKey(line: string, index: number): string {
return `jsonl:${String(index).padStart(8, "0")}:${createHash("sha256").update(line).digest("hex")}`;
}
function nextCallRecordOrder(): { persistedAt: number; sequence: number } {
const sequence = callRecordEventSequence;
callRecordEventSequence = (callRecordEventSequence + 1) % 1_000_000;
return { persistedAt: Date.now(), sequence };
}
function buildNewEventKey(order: { persistedAt: number; sequence: number }): string {
return `event:${order.persistedAt.toString(36)}:${String(order.sequence).padStart(6, "0")}:${randomUUID()}`;
}
function parseEventKeySequence(key: string): number {
const match = /^event:[^:]+:(\d+):/.exec(key);
return match ? Number.parseInt(match[1], 10) : 0;
}
function parseCallRecordLine(line: string, sequence = 0): PersistedCallRecord | null {
if (!line.trim()) {
return null;
}
try {
const parsed = JSON.parse(line);
if (parsed && typeof parsed === "object" && (parsed as { version?: unknown }).version === 2) {
const envelope = parsed as {
call?: unknown;
persistedAt?: unknown;
sequence?: unknown;
};
const call = CallRecordSchema.parse(envelope.call);
return {
call,
persistedAt:
typeof envelope.persistedAt === "number" && Number.isFinite(envelope.persistedAt)
? envelope.persistedAt
: 0,
sequence:
typeof envelope.sequence === "number" && Number.isFinite(envelope.sequence)
? envelope.sequence
: sequence,
orderKey: "",
};
}
return {
call: CallRecordSchema.parse(parsed),
persistedAt: 0,
sequence,
orderKey: "",
};
} catch {
return null;
}
}
function registerCallRecordEvent(
stores: CallRecordStateStores,
eventKey: string,
call: CallRecord,
order?: { persistedAt: number; sequence: number },
): void {
const serialized = JSON.stringify(call);
const buffer = Buffer.from(serialized, "utf8");
const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CHUNK_BYTES));
if (chunkCount > MAX_CHUNKS_PER_CALL_RECORD_EVENT) {
throw new Error(
`voice-call record exceeds SQLite chunk limit (${chunkCount}/${MAX_CHUNKS_PER_CALL_RECORD_EVENT})`,
);
}
for (let index = 0; index < chunkCount; index += 1) {
const chunk = buffer.subarray(index * RAW_CHUNK_BYTES, (index + 1) * RAW_CHUNK_BYTES);
stores.chunks.register(buildChunkKey(eventKey, index), {
index,
dataBase64: chunk.toString("base64"),
});
}
stores.events.register(eventKey, {
chunkCount,
byteLength: buffer.byteLength,
persistedAt: order?.persistedAt,
sequence: order?.sequence,
});
pruneCallRecordEvents(stores);
}
function deleteCallRecordEventRows(stores: CallRecordStateStores, eventKey: string): void {
const meta = stores.events.lookup(eventKey);
stores.events.delete(eventKey);
if (!meta) {
return;
}
for (let index = 0; index < meta.chunkCount; index += 1) {
stores.chunks.delete(buildChunkKey(eventKey, index));
}
}
function pruneCallRecordEvents(stores: CallRecordStateStores): void {
const rows = stores.events.entries();
if (rows.length <= MAX_CALL_RECORD_EVENTS) {
return;
}
const sorted = rows.toSorted((a, b) => a.createdAt - b.createdAt || a.key.localeCompare(b.key));
for (const row of sorted.slice(0, rows.length - MAX_CALL_RECORD_EVENTS)) {
deleteCallRecordEventRows(stores, row.key);
}
}
function registerCallRecordEventIfAbsent(
stores: CallRecordStateStores,
eventKey: string,
record: PersistedCallRecord,
): void {
if (!stores.events.lookup(eventKey)) {
registerCallRecordEvent(stores, eventKey, record.call, {
persistedAt: record.persistedAt,
sequence: record.sequence,
});
}
}
function readCallRecordEvent(stores: CallRecordStateStores, eventKey: string): CallRecord | null {
const meta = stores.events.lookup(eventKey);
if (!meta) {
return null;
}
const chunks: Buffer[] = [];
for (let index = 0; index < meta.chunkCount; index += 1) {
const chunk = stores.chunks.lookup(buildChunkKey(eventKey, index));
if (!chunk || chunk.index !== index) {
return null;
}
chunks.push(Buffer.from(chunk.dataBase64, "base64"));
}
const serialized = Buffer.concat(chunks, meta.byteLength).toString("utf8");
return parseCallRecordLine(serialized)?.call ?? null;
}
function ensureLegacyCallLogImported(
storePath: string,
stores: CallRecordStateStores,
): PersistedCallRecord[] {
const imported = stores.migrations.lookup(CALL_RECORD_JSONL_MIGRATION_KEY) !== undefined;
const logPath = resolveCallLogPath(storePath);
const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath));
if (content === null) {
if (!imported) {
stores.migrations.register(CALL_RECORD_JSONL_MIGRATION_KEY, {
importedAt: new Date().toISOString(),
});
}
return [];
}
const fallbackCalls: PersistedCallRecord[] = [];
{
let index = 0;
let importFailed = false;
for (const line of content.split("\n")) {
const parsed = parseCallRecordLine(line, index);
if (!parsed) {
index += 1;
continue;
}
// Fallback JSONL writes can appear after the migration marker if SQLite
// persistence had a transient failure. Stable keys make the importer
// idempotent if the legacy file cannot be removed.
try {
registerCallRecordEventIfAbsent(stores, buildJsonlEventKey(line, index), parsed);
} catch (err) {
importFailed = true;
fallbackCalls.push({
...parsed,
orderKey: `jsonl:${String(index).padStart(8, "0")}`,
});
console.error("[voice-call] Failed to import persisted call record:", err);
}
index += 1;
}
if (!importFailed) {
try {
fs.rmSync(logPath, { force: true });
} catch {
// Import already completed; leave an unreadable legacy log in place.
}
}
}
if (!imported) {
stores.migrations.register(CALL_RECORD_JSONL_MIGRATION_KEY, {
importedAt: new Date().toISOString(),
});
}
return fallbackCalls;
}
function readCallRecordEvents(storePath: string, stores: CallRecordStateStores): CallRecord[] {
const fallbackCalls = ensureLegacyCallLogImported(storePath, stores);
const sqliteCalls: PersistedCallRecord[] = stores.events
.entries()
.toSorted((a, b) => a.createdAt - b.createdAt || a.key.localeCompare(b.key))
.map((entry) => {
const call = readCallRecordEvent(stores, entry.key);
return call
? {
call,
persistedAt: entry.value.persistedAt ?? entry.createdAt,
sequence: entry.value.sequence ?? parseEventKeySequence(entry.key),
orderKey: entry.key,
}
: null;
})
.filter((entry): entry is PersistedCallRecord => entry !== null);
return [...sqliteCalls, ...fallbackCalls]
.toSorted(
(a, b) =>
a.persistedAt - b.persistedAt ||
a.sequence - b.sequence ||
a.orderKey.localeCompare(b.orderKey),
)
.map((entry) => entry.call);
}
export function persistCallRecord(storePath: string, call: CallRecord): void {
const logPath = path.join(storePath, "calls.jsonl");
const line = `${JSON.stringify(call)}\n`;
const stores = tryCreateCallRecordStateStores(storePath);
if (stores) {
try {
void ensureLegacyCallLogImported(storePath, stores);
const order = nextCallRecordOrder();
registerCallRecordEvent(stores, buildNewEventKey(order), call, order);
return;
} catch (err) {
console.error("[voice-call] Failed to persist call record:", err);
}
}
const logPath = resolveCallLogPath(storePath);
const order = nextCallRecordOrder();
const line = `${JSON.stringify({ version: 2, ...order, call })}\n`;
// Fire-and-forget async write to avoid blocking event loop.
const write = appendRegularFile({
filePath: logPath,
@@ -36,9 +361,17 @@ export function loadActiveCallsFromStore(storePath: string): {
processedEventIds: Set<string>;
rejectedProviderCallIds: Set<string>;
} {
const logPath = path.join(storePath, "calls.jsonl");
const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath));
if (content === null) {
const stores = tryCreateCallRecordStateStores(storePath);
let calls: CallRecord[];
try {
calls = stores
? readCallRecordEvents(storePath, stores)
: readCallRecordsFromLegacyLog(storePath);
} catch (err) {
console.error("[voice-call] Failed to read SQLite call records:", err);
calls = readCallRecordsFromLegacyLog(storePath);
}
if (calls.length === 0) {
return {
activeCalls: new Map(),
providerCallIdMap: new Map(),
@@ -46,19 +379,9 @@ export function loadActiveCallsFromStore(storePath: string): {
rejectedProviderCallIds: new Set(),
};
}
const lines = content.split("\n");
const callMap = new Map<CallId, CallRecord>();
for (const line of lines) {
if (!line.trim()) {
continue;
}
try {
const call = CallRecordSchema.parse(JSON.parse(line));
callMap.set(call.callId, call);
} catch {
// Skip invalid lines.
}
for (const call of calls) {
callMap.set(call.callId, call);
}
const activeCalls = new Map<CallId, CallRecord>();
@@ -86,7 +409,18 @@ export async function getCallHistoryFromStore(
storePath: string,
limit = 50,
): Promise<CallRecord[]> {
const logPath = path.join(storePath, "calls.jsonl");
if (limit <= 0) {
return [];
}
const stores = tryCreateCallRecordStateStores(storePath);
if (stores) {
try {
return readCallRecordEvents(storePath, stores).slice(-limit);
} catch (err) {
console.error("[voice-call] Failed to read SQLite call history:", err);
}
}
const logPath = resolveCallLogPath(storePath);
const content = await privateFileStore(storePath).readTextIfExists(path.basename(logPath));
if (content === null) {
return [];
@@ -94,14 +428,24 @@ export async function getCallHistoryFromStore(
const lines = content.trim().split("\n").filter(Boolean);
const calls: CallRecord[] = [];
for (const line of lines.slice(-limit)) {
try {
const parsed = CallRecordSchema.parse(JSON.parse(line));
calls.push(parsed);
} catch {
// Skip invalid lines.
for (const [index, line] of lines.slice(-limit).entries()) {
const parsed = parseCallRecordLine(line, index);
if (parsed) {
calls.push(parsed.call);
}
}
return calls;
}
function readCallRecordsFromLegacyLog(storePath: string): CallRecord[] {
const logPath = resolveCallLogPath(storePath);
const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath));
if (content === null) {
return [];
}
return content
.split("\n")
.map((line, index) => parseCallRecordLine(line, index)?.call ?? null)
.filter((call): call is CallRecord => call !== null);
}

View File

@@ -0,0 +1,14 @@
import { createPluginRuntimeStore, type PluginRuntime } from "openclaw/plugin-sdk/runtime-store";
export type VoiceCallStateRuntime = Pick<PluginRuntime, "state">;
const {
setRuntime: setVoiceCallStateRuntime,
clearRuntime: clearVoiceCallStateRuntime,
tryGetRuntime: getOptionalVoiceCallStateRuntime,
} = createPluginRuntimeStore<VoiceCallStateRuntime>({
pluginId: "voice-call-state",
errorMessage: "Voice Call state runtime not initialized",
});
export { clearVoiceCallStateRuntime, getOptionalVoiceCallStateRuntime, setVoiceCallStateRuntime };

View File

@@ -24,6 +24,7 @@ import type { TwilioProvider } from "./providers/twilio.js";
import { buildRealtimeVoiceInstructions } from "./realtime-agent-context.js";
import { resolveRealtimeFastContextConsult } from "./realtime-fast-context.js";
import { resolveVoiceResponseModel } from "./response-model.js";
import { setVoiceCallStateRuntime, type VoiceCallStateRuntime } from "./runtime-state.js";
import type { TelephonyTtsRuntime } from "./telephony-tts.js";
import { createTelephonyTtsProvider } from "./telephony-tts.js";
import { startTunnel, type TunnelResult } from "./tunnel.js";
@@ -265,10 +266,19 @@ export async function createVoiceCallRuntime(params: {
coreConfig: CoreConfig;
fullConfig?: OpenClawConfig;
agentRuntime: CoreAgentDeps;
stateRuntime?: VoiceCallStateRuntime["state"];
ttsRuntime?: TelephonyTtsRuntime;
logger?: Logger;
}): Promise<VoiceCallRuntime> {
const { config: rawConfig, coreConfig, fullConfig, agentRuntime, ttsRuntime, logger } = params;
const {
config: rawConfig,
coreConfig,
fullConfig,
agentRuntime,
stateRuntime,
ttsRuntime,
logger,
} = params;
const log = logger ?? {
info: console.log,
warn: console.warn,
@@ -295,6 +305,9 @@ export async function createVoiceCallRuntime(params: {
}
const provider = await resolveProvider(config);
if (stateRuntime) {
setVoiceCallStateRuntime({ state: stateRuntime });
}
const manager = new CallManager(config);
const realtimeProvider = config.realtime.enabled
? await resolveRealtimeProvider({