diff --git a/extensions/msteams/src/pending-uploads-fs.test.ts b/extensions/msteams/src/pending-uploads-fs.test.ts index 74bdb0a97c5..630198c8699 100644 --- a/extensions/msteams/src/pending-uploads-fs.test.ts +++ b/extensions/msteams/src/pending-uploads-fs.test.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { prepareFileConsentActivityFs } from "./file-consent-helpers.js"; import { @@ -50,6 +51,7 @@ async function cleanupTempDirs(): Promise { describe("msteams pending uploads (fs-backed)", () => { beforeEach(() => { + resetPluginStateStoreForTests(); setMSTeamsRuntime(msteamsRuntimeStub); clearPendingUploads(); }); @@ -105,18 +107,12 @@ describe("msteams pending uploads (fs-backed)", () => { { env }, ); - // Confirm the backing file actually exists on disk with expected shape + // Confirm SQLite-backed plugin state was created instead of a new JSON store. const storePath = path.join(stateDir, "msteams-pending-uploads.json"); - const raw = await fs.promises.readFile(storePath, "utf-8"); - const parsed = JSON.parse(raw) as { - version: number; - uploads: Record; - }; - expect(parsed.version).toBe(1); - expect(parsed.uploads["upload-x"]?.filename).toBe("secret.bin"); - expect(Buffer.from(parsed.uploads["upload-x"].bufferBase64, "base64").toString("utf8")).toBe( - "top secret", - ); + await expect(fs.promises.access(storePath)).rejects.toThrow(); + await expect( + fs.promises.access(path.join(stateDir, "state", "openclaw.sqlite")), + ).resolves.toBeUndefined(); // Second "process": reader using the same state dir const reader = await getPendingUploadFs("upload-x", { env }); @@ -124,6 +120,26 @@ describe("msteams pending uploads (fs-backed)", () => { expect(reader?.filename).toBe("secret.bin"); }); + it("stores multi-megabyte uploads by chunking payload bytes", async () => { + const stateDir = await makeTempStateDir(); + const env = makeEnv(stateDir); + const payload = Buffer.alloc(6 * 1024 * 1024, 7); + + await storePendingUploadFs( + { + id: "upload-large", + buffer: payload, + filename: "large.bin", + conversationId: "19:conv@thread.v2", + }, + { env }, + ); + + const reader = await getPendingUploadFs("upload-large", { env }); + expect(reader?.buffer.equals(payload)).toBe(true); + expect(reader?.filename).toBe("large.bin"); + }); + it("removes persisted entries", async () => { const stateDir = await makeTempStateDir(); const env = makeEnv(stateDir); @@ -204,14 +220,85 @@ describe("msteams pending uploads (fs-backed)", () => { // Should not throw and should treat as empty expect(await getPendingUploadFs("anything", { env })).toBeUndefined(); + await expect(fs.promises.access(storePath)).rejects.toThrow(); - await fs.promises.writeFile(storePath, JSON.stringify({ version: 2, uploads: {} }), "utf-8"); - expect(await getPendingUploadFs("anything", { env })).toBeUndefined(); + const secondStateDir = await makeTempStateDir(); + const secondEnv = makeEnv(secondStateDir); + const secondStorePath = path.join(secondStateDir, "msteams-pending-uploads.json"); + await fs.promises.writeFile( + secondStorePath, + JSON.stringify({ version: 2, uploads: {} }), + "utf-8", + ); + expect(await getPendingUploadFs("anything", { env: secondEnv })).toBeUndefined(); + await expect(fs.promises.access(secondStorePath)).rejects.toThrow(); + }); + + it("imports a legacy JSON file that appears after an empty migration marker", async () => { + const stateDir = await makeTempStateDir(); + const env = makeEnv(stateDir); + const storePath = path.join(stateDir, "msteams-pending-uploads.json"); + + expect(await getPendingUploadFs("upload-late", { env })).toBeUndefined(); + await fs.promises.writeFile( + storePath, + `${JSON.stringify({ + version: 1, + uploads: { + "upload-late": { + id: "upload-late", + bufferBase64: Buffer.from("late payload").toString("base64"), + filename: "late.txt", + conversationId: "19:conv@thread.v2", + createdAt: Date.now(), + }, + }, + })}\n`, + "utf-8", + ); + + const loaded = await getPendingUploadFs("upload-late", { env }); + expect(loaded?.filename).toBe("late.txt"); + expect(loaded?.buffer.toString("utf8")).toBe("late payload"); + await expect(fs.promises.access(storePath)).rejects.toThrow(); + }); + + it("skips malformed legacy upload rows while importing valid rows", async () => { + const stateDir = await makeTempStateDir(); + const env = makeEnv(stateDir); + const storePath = path.join(stateDir, "msteams-pending-uploads.json"); + await fs.promises.writeFile( + storePath, + `${JSON.stringify({ + version: 1, + uploads: { + broken: { + id: "broken", + filename: "broken.txt", + conversationId: "19:conv@thread.v2", + createdAt: Date.now(), + }, + valid: { + id: "valid", + bufferBase64: Buffer.from("valid payload").toString("base64"), + filename: "valid.txt", + conversationId: "19:conv@thread.v2", + createdAt: Date.now(), + }, + }, + })}\n`, + "utf-8", + ); + + expect(await getPendingUploadFs("broken", { env })).toBeUndefined(); + const loaded = await getPendingUploadFs("valid", { env }); + expect(loaded?.buffer.toString("utf8")).toBe("valid payload"); }); }); describe("prepareFileConsentActivityFs end-to-end", () => { beforeEach(() => { + resetPluginStateStoreForTests(); setMSTeamsRuntime(msteamsRuntimeStub); clearPendingUploads(); }); diff --git a/extensions/msteams/src/pending-uploads-fs.ts b/extensions/msteams/src/pending-uploads-fs.ts index ba80a5f274a..b4aad1e0b7f 100644 --- a/extensions/msteams/src/pending-uploads-fs.ts +++ b/extensions/msteams/src/pending-uploads-fs.ts @@ -1,29 +1,30 @@ -/** - * Filesystem-backed pending upload store for the FileConsentCard flow. - * - * The CLI `message send --media` path runs in a different process from the - * gateway's bot monitor that receives the `fileConsent/invoke` callback. - * An in-memory `pending-uploads.ts` store cannot bridge those processes, so - * when the user clicks "Allow" the monitor handler's lookup misses and the - * user sees "card action not supported". - * - * This FS store persists pending uploads to a JSON file (with the file buffer - * base64-encoded) so any process that shares the OpenClaw state dir can read - * them back. The in-memory store in `pending-uploads.ts` is still the fast - * path for same-process flows (for example the messenger reply path); this FS - * store is a cross-process fallback. - */ - +import { createHash } from "node:crypto"; +import fs from "node:fs/promises"; +import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { getMSTeamsRuntime } from "./runtime.js"; +import { + resolveMSTeamsSqliteStateEnv, + toPluginJsonValue, + withMSTeamsSqliteMutationLock, +} from "./sqlite-state.js"; import { resolveMSTeamsStorePath } from "./storage.js"; -import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js"; +import { readJsonFile } from "./store-fs.js"; /** TTL for persisted pending uploads (matches in-memory store). */ const PENDING_UPLOAD_TTL_MS = 5 * 60 * 1000; /** Cap to avoid unbounded growth if a process crashes mid-flow. */ const MAX_PENDING_UPLOADS = 100; +const MAX_CHUNKS_PER_UPLOAD = 3072; +const MAX_PENDING_UPLOAD_CHUNK_ROWS = 45_000; +const RAW_CHUNK_BYTES = 36 * 1024; +const PENDING_UPLOAD_META_MAX_ENTRIES = MAX_PENDING_UPLOADS + 100; const STORE_FILENAME = "msteams-pending-uploads.json"; +const PENDING_UPLOAD_META_NAMESPACE = "pending-uploads"; +const PENDING_UPLOAD_CHUNKS_NAMESPACE = "pending-upload-chunks"; +const PENDING_UPLOAD_MIGRATIONS_NAMESPACE = "pending-upload-migrations"; +const PENDING_UPLOAD_LOCK_FILENAME = "msteams-pending-uploads.sqlite.lock"; type PendingUploadFsRecord = { id: string; @@ -53,6 +54,21 @@ type PendingUploadStoreData = { const empty: PendingUploadStoreData = { version: 1, uploads: {} }; +type PendingUploadMetaRecord = Omit & { + chunkCount: number; + byteLength: number; +}; + +type PendingUploadChunkRecord = { + id: string; + index: number; + dataBase64: string; +}; + +type PendingUploadMigrationMarker = { + importedAt: string; +}; + type PendingUploadsFsOptions = { env?: NodeJS.ProcessEnv; homedir?: () => string; @@ -61,7 +77,7 @@ type PendingUploadsFsOptions = { ttlMs?: number; }; -function resolveFilePath(options: PendingUploadsFsOptions | undefined): string { +function resolveLegacyFilePath(options: PendingUploadsFsOptions | undefined): string { return resolveMSTeamsStorePath({ filename: STORE_FILENAME, env: options?.env, @@ -71,6 +87,60 @@ function resolveFilePath(options: PendingUploadsFsOptions | undefined): string { }); } +function createMetaStore( + options: PendingUploadsFsOptions | undefined, +): PluginStateKeyedStore { + return getMSTeamsRuntime().state.openKeyedStore({ + namespace: PENDING_UPLOAD_META_NAMESPACE, + maxEntries: PENDING_UPLOAD_META_MAX_ENTRIES, + env: resolveMSTeamsSqliteStateEnv(options), + }); +} + +function createChunkStore( + options: PendingUploadsFsOptions | undefined, +): PluginStateKeyedStore { + return getMSTeamsRuntime().state.openKeyedStore({ + namespace: PENDING_UPLOAD_CHUNKS_NAMESPACE, + maxEntries: MAX_PENDING_UPLOAD_CHUNK_ROWS, + env: resolveMSTeamsSqliteStateEnv(options), + }); +} + +function createMigrationStore( + options: PendingUploadsFsOptions | undefined, +): PluginStateKeyedStore { + return getMSTeamsRuntime().state.openKeyedStore({ + namespace: PENDING_UPLOAD_MIGRATIONS_NAMESPACE, + maxEntries: 100, + env: resolveMSTeamsSqliteStateEnv(options), + }); +} + +function buildUploadKey(id: string): string { + return `upload:${createHash("sha256").update(id).digest("hex")}`; +} + +function buildMetaKey(id: string): string { + return `${buildUploadKey(id)}:meta`; +} + +function buildChunkKey(id: string, index: number): string { + return `${buildUploadKey(id)}:chunk:${String(index).padStart(4, "0")}`; +} + +function buildMigrationKey(filePath: string): string { + return `legacy-json:${createHash("sha256").update(filePath).digest("hex")}`; +} + +function buildMigrationContentKey(filePath: string, value: unknown): string { + return `legacy-json-content:${createHash("sha256") + .update(filePath) + .update("\0") + .update(JSON.stringify(value) ?? "undefined") + .digest("hex")}`; +} + function pruneExpired( uploads: Record, nowMs: number, @@ -98,10 +168,13 @@ function pruneToLimit( return Object.fromEntries(keep); } -function recordToUpload(record: PendingUploadFsRecord): PendingUploadFs { +function recordToUpload( + record: PendingUploadFsRecord | PendingUploadMetaRecord, + buffer: Buffer, +): PendingUploadFs { return { id: record.id, - buffer: Buffer.from(record.bufferBase64, "base64"), + buffer, filename: record.filename, contentType: record.contentType, conversationId: record.conversationId, @@ -123,13 +196,220 @@ function isValidStore(value: unknown): value is PendingUploadStoreData { ); } -async function readStore(filePath: string, ttlMs: number): Promise { - const { value } = await readJsonFile(filePath, empty); - if (!isValidStore(value)) { - return { version: 1, uploads: {} }; +function normalizeLegacyUploadRecord(value: unknown): PendingUploadFsRecord | null { + if (!value || typeof value !== "object") { + return null; + } + const record = value as Partial; + if ( + typeof record.id !== "string" || + !record.id || + typeof record.bufferBase64 !== "string" || + typeof record.filename !== "string" || + !record.filename || + typeof record.conversationId !== "string" || + !record.conversationId || + typeof record.createdAt !== "number" || + !Number.isFinite(record.createdAt) + ) { + return null; + } + return { + id: record.id, + bufferBase64: record.bufferBase64, + filename: record.filename, + contentType: typeof record.contentType === "string" ? record.contentType : undefined, + conversationId: record.conversationId, + consentCardActivityId: + typeof record.consentCardActivityId === "string" ? record.consentCardActivityId : undefined, + createdAt: record.createdAt, + }; +} + +function normalizeLegacyUploads(value: unknown): Record { + if (!isValidStore(value)) { + return {}; + } + const uploads: Record = {}; + for (const record of Object.values(value.uploads)) { + const normalized = normalizeLegacyUploadRecord(record); + if (normalized) { + uploads[normalized.id] = normalized; + } + } + return uploads; +} + +async function deleteUploadRows( + id: string, + metaStore: PluginStateKeyedStore, + chunkStore: PluginStateKeyedStore, +): Promise { + const existing = await metaStore.lookup(buildMetaKey(id)); + await metaStore.delete(buildMetaKey(id)); + if (!existing) { + return; + } + const chunkCount = existing.chunkCount; + for (let index = 0; index < chunkCount; index += 1) { + await chunkStore.delete(buildChunkKey(id, index)); + } +} + +async function registerUploadRows( + record: PendingUploadFsRecord, + metaStore: PluginStateKeyedStore, + chunkStore: PluginStateKeyedStore, + ttlMs: number, + overwrite: boolean, +): Promise { + const buffer = Buffer.from(record.bufferBase64, "base64"); + const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CHUNK_BYTES)); + if (chunkCount > MAX_CHUNKS_PER_UPLOAD) { + throw new Error( + `Microsoft Teams pending upload ${record.id} exceeds SQLite chunk limit (${chunkCount}/${MAX_CHUNKS_PER_UPLOAD})`, + ); + } + if (overwrite) { + await deleteUploadRows(record.id, metaStore, chunkStore); + } else if (await metaStore.lookup(buildMetaKey(record.id))) { + return; + } + await pruneUploadStore(metaStore, chunkStore, ttlMs, chunkCount); + for (let index = 0; index < chunkCount; index += 1) { + const chunk = buffer.subarray(index * RAW_CHUNK_BYTES, (index + 1) * RAW_CHUNK_BYTES); + await chunkStore.register( + buildChunkKey(record.id, index), + toPluginJsonValue({ + id: record.id, + index, + dataBase64: chunk.toString("base64"), + }), + ); + } + await metaStore.register( + buildMetaKey(record.id), + toPluginJsonValue({ + id: record.id, + filename: record.filename, + contentType: record.contentType, + conversationId: record.conversationId, + consentCardActivityId: record.consentCardActivityId, + createdAt: record.createdAt, + chunkCount, + byteLength: buffer.byteLength, + }), + ); +} + +async function importLegacyStore( + options: PendingUploadsFsOptions | undefined, + metaStore: PluginStateKeyedStore, + chunkStore: PluginStateKeyedStore, + ttlMs: number, +): Promise { + const legacyFilePath = resolveLegacyFilePath(options); + const migrationStore = createMigrationStore(options); + const migrationKey = buildMigrationKey(legacyFilePath); + const imported = (await migrationStore.lookup(migrationKey)) !== undefined; + const { value, exists } = await readJsonFile(legacyFilePath, empty); + if (!exists) { + if (!imported) { + await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() }); + } + return; + } + const contentKey = buildMigrationContentKey(legacyFilePath, value); + if (await migrationStore.lookup(contentKey)) { + return; + } + const legacy = pruneToLimit(pruneExpired(normalizeLegacyUploads(value), Date.now(), ttlMs)); + for (const record of Object.values(legacy)) { + await registerUploadRows(record, metaStore, chunkStore, ttlMs, false); + } + await migrationStore.register(contentKey, { importedAt: new Date().toISOString() }); + if (!imported) { + await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() }); + } + await fs.rm(legacyFilePath, { force: true }).catch(() => {}); +} + +async function withPendingUploadLock( + options: PendingUploadsFsOptions | undefined, + run: () => Promise, +): Promise { + return await withMSTeamsSqliteMutationLock(options, PENDING_UPLOAD_LOCK_FILENAME, run); +} + +async function ensureLegacyImported( + options: PendingUploadsFsOptions | undefined, + metaStore: PluginStateKeyedStore, + chunkStore: PluginStateKeyedStore, + ttlMs: number, +): Promise { + await withPendingUploadLock(options, () => + importLegacyStore(options, metaStore, chunkStore, ttlMs), + ); +} + +async function readUploadRows( + id: string, + metaStore: PluginStateKeyedStore, + chunkStore: PluginStateKeyedStore, +): Promise { + const meta = await metaStore.lookup(buildMetaKey(id)); + if (!meta) { + return undefined; + } + const chunks: Buffer[] = []; + for (let index = 0; index < meta.chunkCount; index += 1) { + const chunk = await chunkStore.lookup(buildChunkKey(id, index)); + if (!chunk || chunk.id !== id || chunk.index !== index) { + return undefined; + } + chunks.push(Buffer.from(chunk.dataBase64, "base64")); + } + return recordToUpload(meta, Buffer.concat(chunks, meta.byteLength)); +} + +async function pruneUploadStore( + metaStore: PluginStateKeyedStore, + chunkStore: PluginStateKeyedStore, + ttlMs: number, + extraChunkRows = 0, +): Promise { + const rows = await metaStore.entries(); + const liveRows = []; + const now = Date.now(); + let liveChunkRows = 0; + for (const row of rows) { + if (now - row.value.createdAt > ttlMs) { + await deleteUploadRows(row.value.id, metaStore, chunkStore); + continue; + } + liveChunkRows += row.value.chunkCount; + liveRows.push(row); + } + if ( + liveRows.length <= MAX_PENDING_UPLOADS && + liveChunkRows + extraChunkRows <= MAX_PENDING_UPLOAD_CHUNK_ROWS + ) { + return; + } + const sorted = liveRows.toSorted( + (a, b) => a.value.createdAt - b.value.createdAt || a.value.id.localeCompare(b.value.id), + ); + for (const row of sorted) { + if ( + liveRows.length <= MAX_PENDING_UPLOADS && + liveChunkRows + extraChunkRows <= MAX_PENDING_UPLOAD_CHUNK_ROWS + ) { + break; + } + await deleteUploadRows(row.value.id, metaStore, chunkStore); + liveChunkRows -= row.value.chunkCount; + liveRows.pop(); } - const uploads = pruneToLimit(pruneExpired(value.uploads, Date.now(), ttlMs)); - return { version: 1, uploads }; } /** @@ -149,20 +429,26 @@ export async function storePendingUploadFs( options?: PendingUploadsFsOptions, ): Promise { const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS; - const filePath = resolveFilePath(options); - await withFileLock(filePath, empty, async () => { - const store = await readStore(filePath, ttlMs); - store.uploads[upload.id] = { - id: upload.id, - bufferBase64: upload.buffer.toString("base64"), - filename: upload.filename, - contentType: upload.contentType, - conversationId: upload.conversationId, - consentCardActivityId: upload.consentCardActivityId, - createdAt: Date.now(), - }; - store.uploads = pruneToLimit(pruneExpired(store.uploads, Date.now(), ttlMs)); - await writeJsonFile(filePath, store); + const metaStore = createMetaStore(options); + const chunkStore = createChunkStore(options); + await withPendingUploadLock(options, async () => { + await importLegacyStore(options, metaStore, chunkStore, ttlMs); + await registerUploadRows( + { + id: upload.id, + bufferBase64: upload.buffer.toString("base64"), + filename: upload.filename, + contentType: upload.contentType, + conversationId: upload.conversationId, + consentCardActivityId: upload.consentCardActivityId, + createdAt: Date.now(), + }, + metaStore, + chunkStore, + ttlMs, + true, + ); + await pruneUploadStore(metaStore, chunkStore, ttlMs); }); } @@ -177,16 +463,18 @@ export async function getPendingUploadFs( return undefined; } const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS; - const filePath = resolveFilePath(options); - const store = await readStore(filePath, ttlMs); - const record = store.uploads[id]; - if (!record) { + const metaStore = createMetaStore(options); + const chunkStore = createChunkStore(options); + await ensureLegacyImported(options, metaStore, chunkStore, ttlMs); + const upload = await readUploadRows(id, metaStore, chunkStore); + if (!upload) { return undefined; } - if (Date.now() - record.createdAt > ttlMs) { + if (Date.now() - upload.createdAt > ttlMs) { + await removePendingUploadFs(id, options); return undefined; } - return recordToUpload(record); + return upload; } /** @@ -201,14 +489,11 @@ export async function removePendingUploadFs( return; } const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS; - const filePath = resolveFilePath(options); - await withFileLock(filePath, empty, async () => { - const store = await readStore(filePath, ttlMs); - if (!(id in store.uploads)) { - return; - } - delete store.uploads[id]; - await writeJsonFile(filePath, store); + const metaStore = createMetaStore(options); + const chunkStore = createChunkStore(options); + await withPendingUploadLock(options, async () => { + await importLegacyStore(options, metaStore, chunkStore, ttlMs); + await deleteUploadRows(id, metaStore, chunkStore); }); } @@ -222,14 +507,17 @@ export async function setPendingUploadActivityIdFs( options?: PendingUploadsFsOptions, ): Promise { const ttlMs = options?.ttlMs ?? PENDING_UPLOAD_TTL_MS; - const filePath = resolveFilePath(options); - await withFileLock(filePath, empty, async () => { - const store = await readStore(filePath, ttlMs); - const record = store.uploads[id]; - if (!record) { + const metaStore = createMetaStore(options); + const chunkStore = createChunkStore(options); + await withPendingUploadLock(options, async () => { + await importLegacyStore(options, metaStore, chunkStore, ttlMs); + const record = await metaStore.lookup(buildMetaKey(id)); + if (!record || Date.now() - record.createdAt > ttlMs) { return; } - record.consentCardActivityId = activityId; - await writeJsonFile(filePath, store); + await metaStore.register( + buildMetaKey(id), + toPluginJsonValue({ ...record, consentCardActivityId: activityId }), + ); }); } diff --git a/extensions/msteams/src/sso-token-store.test.ts b/extensions/msteams/src/sso-token-store.test.ts index 204bbf6f22e..c9d2d9a6939 100644 --- a/extensions/msteams/src/sso-token-store.test.ts +++ b/extensions/msteams/src/sso-token-store.test.ts @@ -1,10 +1,22 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { describe, expect, it } from "vitest"; +import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { setMSTeamsRuntime } from "./runtime.js"; import { createMSTeamsSsoTokenStoreFs } from "./sso-token-store.js"; +import { msteamsRuntimeStub } from "./test-support/runtime.js"; + +describe("msteams sso token store (plugin state)", () => { + beforeEach(() => { + resetPluginStateStoreForTests(); + setMSTeamsRuntime(msteamsRuntimeStub); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); -describe("msteams sso token store (fs)", () => { it("keeps distinct tokens when connectionName and userId contain the legacy delimiter", async () => { const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-")); const storePath = path.join(stateDir, "msteams-sso-tokens.json"); @@ -29,10 +41,10 @@ describe("msteams sso token store (fs)", () => { expect(await store.get(first)).toEqual(first); expect(await store.get(second)).toEqual(second); - const raw = JSON.parse(await fs.readFile(storePath, "utf8")) as { - tokens: Record; - }; - expect(Object.keys(raw.tokens)).toHaveLength(2); + await expect(fs.access(storePath)).rejects.toThrow(); + await expect( + fs.access(path.join(stateDir, "state", "openclaw.sqlite")), + ).resolves.toBeUndefined(); }); it("loads legacy flat-key files by rebuilding keys from stored token payloads", async () => { @@ -70,5 +82,90 @@ describe("msteams sso token store (fs)", () => { token: "token-1", updatedAt: "2026-04-10T00:00:00.000Z", }); + await expect(fs.access(storePath)).rejects.toThrow(); + }); + + it("keeps plugin-state keys bounded for long Teams identifiers", async () => { + const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-long-")); + const store = createMSTeamsSsoTokenStoreFs({ stateDir }); + const token = { + connectionName: `conn-${"c".repeat(1000)}`, + userId: `user-${"u".repeat(2000)}`, + token: "token-long", + updatedAt: "2026-04-10T00:00:00.000Z", + } as const; + + await store.save(token); + expect(await store.get(token)).toEqual(token); + expect(await store.remove(token)).toBe(true); + expect(await store.get(token)).toBeNull(); + }); + + it("imports a legacy token file that appears after an empty migration marker", async () => { + const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-late-")); + const storePath = path.join(stateDir, "msteams-sso-tokens.json"); + const store = createMSTeamsSsoTokenStoreFs({ storePath }); + + expect(await store.get({ connectionName: "conn", userId: "user-late" })).toBeNull(); + await fs.writeFile( + storePath, + `${JSON.stringify({ + version: 1, + tokens: { + late: { + connectionName: "conn", + userId: "user-late", + token: "token-late", + updatedAt: "2026-04-10T00:00:00.000Z", + }, + }, + })}\n`, + "utf8", + ); + + expect(await store.get({ connectionName: "conn", userId: "user-late" })).toEqual({ + connectionName: "conn", + userId: "user-late", + token: "token-late", + updatedAt: "2026-04-10T00:00:00.000Z", + }); + await expect(fs.access(storePath)).rejects.toThrow(); + }); + + it("does not resurrect removed tokens when a migrated legacy file cannot be deleted", async () => { + const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-msteams-sso-stale-")); + const storePath = path.join(stateDir, "msteams-sso-tokens.json"); + await fs.writeFile( + storePath, + `${JSON.stringify({ + version: 1, + tokens: { + stale: { + connectionName: "conn", + userId: "user-stale", + token: "token-stale", + updatedAt: "2026-04-10T00:00:00.000Z", + }, + }, + })}\n`, + "utf8", + ); + const originalRm = fs.rm; + vi.spyOn(fs, "rm").mockImplementation(async (target, options) => { + if (target === storePath) { + throw new Error("cannot remove"); + } + return await originalRm(target, options); + }); + + const store = createMSTeamsSsoTokenStoreFs({ storePath }); + expect(await store.get({ connectionName: "conn", userId: "user-stale" })).toEqual({ + connectionName: "conn", + userId: "user-stale", + token: "token-stale", + updatedAt: "2026-04-10T00:00:00.000Z", + }); + expect(await store.remove({ connectionName: "conn", userId: "user-stale" })).toBe(true); + expect(await store.get({ connectionName: "conn", userId: "user-stale" })).toBeNull(); }); }); diff --git a/extensions/msteams/src/sso-token-store.ts b/extensions/msteams/src/sso-token-store.ts index 21fba4a12b6..ec9ff006afe 100644 --- a/extensions/msteams/src/sso-token-store.ts +++ b/extensions/msteams/src/sso-token-store.ts @@ -1,5 +1,5 @@ /** - * File-backed store for Bot Framework OAuth SSO tokens. + * SQLite-backed store for Bot Framework OAuth SSO tokens. * * Tokens are keyed by (connectionName, userId). `userId` should be the * stable AAD object ID (`activity.from.aadObjectId`) when available, @@ -11,8 +11,17 @@ * valid token without reaching back into Bot Framework every turn. */ +import { createHash } from "node:crypto"; +import fs from "node:fs/promises"; +import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { getMSTeamsRuntime } from "./runtime.js"; +import { + resolveMSTeamsSqliteStateEnv, + toPluginJsonValue, + withMSTeamsSqliteMutationLock, +} from "./sqlite-state.js"; import { resolveMSTeamsStorePath } from "./storage.js"; -import { readJsonFile, withFileLock, writeJsonFile } from "./store-fs.js"; +import { readJsonFile } from "./store-fs.js"; type MSTeamsSsoStoredToken = { /** Connection name from the Bot Framework OAuth connection setting. */ @@ -40,13 +49,54 @@ type SsoStoreData = { }; const STORE_FILENAME = "msteams-sso-tokens.json"; +const SSO_TOKENS_NAMESPACE = "sso-tokens"; +const SSO_TOKEN_MIGRATIONS_NAMESPACE = "sso-token-migrations"; +const SSO_TOKEN_LOCK_FILENAME = "msteams-sso-tokens.sqlite.lock"; +const MAX_SSO_TOKENS = 5000; const STORE_KEY_VERSION_PREFIX = "v2:"; function makeKey(connectionName: string, userId: string): string { - return `${STORE_KEY_VERSION_PREFIX}${Buffer.from( - JSON.stringify([connectionName, userId]), - "utf8", - ).toString("base64url")}`; + return `${STORE_KEY_VERSION_PREFIX}${createHash("sha256") + .update(JSON.stringify([connectionName, userId])) + .digest("hex")}`; +} + +function buildMigrationKey(filePath: string): string { + return `legacy-json:${createHash("sha256").update(filePath).digest("hex")}`; +} + +function buildMigrationContentKey(filePath: string, value: unknown): string { + return `legacy-json-content:${createHash("sha256") + .update(filePath) + .update("\0") + .update(JSON.stringify(value) ?? "undefined") + .digest("hex")}`; +} + +function createTokenStore(params?: { + env?: NodeJS.ProcessEnv; + homedir?: () => string; + stateDir?: string; + storePath?: string; +}): PluginStateKeyedStore { + return getMSTeamsRuntime().state.openKeyedStore({ + namespace: SSO_TOKENS_NAMESPACE, + maxEntries: MAX_SSO_TOKENS, + env: resolveMSTeamsSqliteStateEnv(params), + }); +} + +function createMigrationStore(params?: { + env?: NodeJS.ProcessEnv; + homedir?: () => string; + stateDir?: string; + storePath?: string; +}): PluginStateKeyedStore<{ importedAt: string }> { + return getMSTeamsRuntime().state.openKeyedStore<{ importedAt: string }>({ + namespace: SSO_TOKEN_MIGRATIONS_NAMESPACE, + maxEntries: 100, + env: resolveMSTeamsSqliteStateEnv(params), + }); } function normalizeStoredToken(value: unknown): MSTeamsSsoStoredToken | null { @@ -89,60 +139,81 @@ export function createMSTeamsSsoTokenStoreFs(params?: { stateDir?: string; storePath?: string; }): MSTeamsSsoTokenStore { - const filePath = resolveMSTeamsStorePath({ + const legacyFilePath = resolveMSTeamsStorePath({ filename: STORE_FILENAME, env: params?.env, homedir: params?.homedir, stateDir: params?.stateDir, storePath: params?.storePath, }); - const empty: SsoStoreData = { version: 1, tokens: {} }; + const tokenStore = createTokenStore(params); + const migrationStore = createMigrationStore(params); + const migrationKey = buildMigrationKey(legacyFilePath); + let legacyImportPromise: Promise | null = null; - const readStore = async (): Promise => { - const { value } = await readJsonFile(filePath, empty); - if (!isSsoStoreData(value)) { - return { version: 1, tokens: {} }; + const importLegacyStore = async (): Promise => { + const imported = (await migrationStore.lookup(migrationKey)) !== undefined; + const { value, exists } = await readJsonFile(legacyFilePath, empty); + const contentKey = exists ? buildMigrationContentKey(legacyFilePath, value) : null; + if (contentKey && (await migrationStore.lookup(contentKey))) { + return; } - const tokens: Record = {}; - for (const stored of Object.values(value.tokens)) { - const normalized = normalizeStoredToken(stored); - if (!normalized) { - continue; + if (exists && isSsoStoreData(value)) { + for (const stored of Object.values(value.tokens)) { + const normalized = normalizeStoredToken(stored); + if (!normalized) { + continue; + } + await tokenStore.registerIfAbsent( + makeKey(normalized.connectionName, normalized.userId), + toPluginJsonValue(normalized), + ); } - tokens[makeKey(normalized.connectionName, normalized.userId)] = normalized; } - return { - version: 1, - tokens, - }; + if (contentKey) { + await migrationStore.register(contentKey, { importedAt: new Date().toISOString() }); + } + if (!imported) { + await migrationStore.register(migrationKey, { importedAt: new Date().toISOString() }); + } + if (exists) { + await fs.rm(legacyFilePath, { force: true }).catch(() => {}); + } + }; + + const ensureLegacyImported = async (): Promise => { + if (!legacyImportPromise) { + legacyImportPromise = withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, () => + importLegacyStore(), + ).finally(() => { + legacyImportPromise = null; + }); + } + await legacyImportPromise; }; return { async get({ connectionName, userId }) { - const store = await readStore(); - return store.tokens[makeKey(connectionName, userId)] ?? null; + await ensureLegacyImported(); + return (await tokenStore.lookup(makeKey(connectionName, userId))) ?? null; }, async save(token) { - await withFileLock(filePath, empty, async () => { - const store = await readStore(); - const key = makeKey(token.connectionName, token.userId); - store.tokens[key] = { ...token }; - await writeJsonFile(filePath, store); + await withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, async () => { + await importLegacyStore(); + await tokenStore.register( + makeKey(token.connectionName, token.userId), + toPluginJsonValue({ ...token }), + ); }); }, async remove({ connectionName, userId }) { let removed = false; - await withFileLock(filePath, empty, async () => { - const store = await readStore(); - const key = makeKey(connectionName, userId); - if (store.tokens[key]) { - delete store.tokens[key]; - removed = true; - await writeJsonFile(filePath, store); - } + await withMSTeamsSqliteMutationLock(params, SSO_TOKEN_LOCK_FILENAME, async () => { + await importLegacyStore(); + removed = await tokenStore.delete(makeKey(connectionName, userId)); }); return removed; }, diff --git a/extensions/voice-call/index.ts b/extensions/voice-call/index.ts index f24ca207739..998ea3d078c 100644 --- a/extensions/voice-call/index.ts +++ b/extensions/voice-call/index.ts @@ -300,6 +300,7 @@ export default definePluginEntry({ coreConfig: api.config as CoreConfig, fullConfig: api.config, agentRuntime: api.runtime.agent, + stateRuntime: api.runtime.state, ttsRuntime: api.runtime.tts, logger: api.logger, }); @@ -811,6 +812,7 @@ export default definePluginEntry({ program, config, ensureRuntime, + stateRuntime: api.runtime.state, logger: api.logger, }), { commands: ["voicecall"] }, diff --git a/extensions/voice-call/src/cli.ts b/extensions/voice-call/src/cli.ts index b6bff84bcbc..92fc670c9e4 100644 --- a/extensions/voice-call/src/cli.ts +++ b/extensions/voice-call/src/cli.ts @@ -18,6 +18,8 @@ import { } from "openclaw/plugin-sdk/string-coerce-runtime"; import { sleep } from "../api.js"; import { validateProviderConfig, type VoiceCallConfig } from "./config.js"; +import { getCallHistoryFromStore } from "./manager/store.js"; +import { setVoiceCallStateRuntime, type VoiceCallStateRuntime } from "./runtime-state.js"; import type { VoiceCallRuntime } from "./runtime.js"; import { resolveUserPath } from "./utils.js"; import { resolveWebhookExposureStatus } from "./webhook-exposure.js"; @@ -426,9 +428,15 @@ export function registerVoiceCallCli(params: { program: Command; config: VoiceCallConfig; ensureRuntime: () => Promise; + stateRuntime?: VoiceCallStateRuntime["state"]; logger: Logger; }) { - const { program, config, ensureRuntime, logger } = params; + const { program, config, ensureRuntime, stateRuntime } = params; + const ensureHistoryStateRuntime = (): void => { + if (stateRuntime) { + setVoiceCallStateRuntime({ state: stateRuntime }); + } + }; const root = program .command("voicecall") .description("Voice call utilities") @@ -745,43 +753,68 @@ export function registerVoiceCallCli(params: { const since = parseVoiceCallIntOption(options.since, "--since", { min: 0 }); const pollMs = parseVoiceCallIntOption(options.poll, "--poll", { min: 50 }); - if (!fs.existsSync(file)) { - logger.error(`No log file at ${file}`); - process.exit(1); - } - - const initial = fs.readFileSync(file, "utf8"); - const lines = initial.split("\n").filter(Boolean); - for (const line of lines.slice(Math.max(0, lines.length - since))) { - writeStdoutLine(line); - } - - let offset = Buffer.byteLength(initial, "utf8"); - - for (;;) { - try { - const stat = fs.statSync(file); - if (stat.size < offset) { - offset = 0; + const tailSqliteHistory = async (initialLimit: number): Promise => { + ensureHistoryStateRuntime(); + const seen = new Set(); + const printCall = (call: unknown): void => { + const line = JSON.stringify(call); + if (!seen.has(line)) { + seen.add(line); + writeStdoutLine(line); } - if (stat.size > offset) { - const fd = fs.openSync(file, "r"); - try { - const buf = Buffer.alloc(stat.size - offset); - fs.readSync(fd, buf, 0, buf.length, offset); - offset = stat.size; - const text = buf.toString("utf8"); - for (const line of text.split("\n").filter(Boolean)) { - writeStdoutLine(line); - } - } finally { - fs.closeSync(fd); - } + }; + if (initialLimit > 0) { + for (const call of await getCallHistoryFromStore(path.dirname(file), initialLimit)) { + printCall(call); } - } catch { - // ignore and retry } - await sleep(pollMs); + for (;;) { + try { + for (const call of await getCallHistoryFromStore(path.dirname(file), 1000)) { + printCall(call); + } + } catch { + // ignore and retry + } + await sleep(pollMs); + } + }; + + if (fs.existsSync(file) && path.basename(file) !== "calls.jsonl") { + const initial = fs.readFileSync(file, "utf8"); + const lines = initial.split("\n").filter(Boolean); + for (const line of lines.slice(Math.max(0, lines.length - since))) { + writeStdoutLine(line); + } + + let offset = Buffer.byteLength(initial, "utf8"); + for (;;) { + try { + const stat = fs.statSync(file); + if (stat.size < offset) { + offset = 0; + } + if (stat.size > offset) { + const fd = fs.openSync(file, "r"); + try { + const buf = Buffer.alloc(stat.size - offset); + fs.readSync(fd, buf, 0, buf.length, offset); + offset = stat.size; + const text = buf.toString("utf8"); + for (const line of text.split("\n").filter(Boolean)) { + writeStdoutLine(line); + } + } finally { + fs.closeSync(fd); + } + } + } catch { + // ignore and retry + } + await sleep(pollMs); + } + } else { + await tailSqliteHistory(since); } }); @@ -794,41 +827,50 @@ export function registerVoiceCallCli(params: { const file = options.file; const last = parseVoiceCallIntOption(options.last, "--last", { min: 1 }); - if (!fs.existsSync(file)) { - throw new Error("No log file at " + file); + if (fs.existsSync(file) && path.basename(file) !== "calls.jsonl") { + const content = fs.readFileSync(file, "utf8"); + const calls = content + .split("\n") + .filter(Boolean) + .slice(-last) + .map((line) => { + try { + const parsed = JSON.parse(line) as { call?: unknown }; + return (parsed.call ?? parsed) as { metadata?: Record }; + } catch { + return null; + } + }) + .filter((call): call is { metadata?: Record } => call !== null); + writeVoiceCallLatencySummary(calls); + } else { + ensureHistoryStateRuntime(); + writeVoiceCallLatencySummary(await getCallHistoryFromStore(path.dirname(file), last)); } - - const content = fs.readFileSync(file, "utf8"); - const lines = content.split("\n").filter(Boolean).slice(-last); - - const turnLatencyMs: number[] = []; - const listenWaitMs: number[] = []; - - for (const line of lines) { - try { - const parsed = JSON.parse(line) as { - metadata?: { lastTurnLatencyMs?: unknown; lastTurnListenWaitMs?: unknown }; - }; - const latency = parsed.metadata?.lastTurnLatencyMs; - const listenWait = parsed.metadata?.lastTurnListenWaitMs; - if (typeof latency === "number" && Number.isFinite(latency)) { - turnLatencyMs.push(latency); - } - if (typeof listenWait === "number" && Number.isFinite(listenWait)) { - listenWaitMs.push(listenWait); - } - } catch { - // ignore malformed JSON lines - } - } - - writeStdoutJson({ - recordsScanned: lines.length, - turnLatency: summarizeSeries(turnLatencyMs), - listenWait: summarizeSeries(listenWaitMs), - }); }); + function writeVoiceCallLatencySummary(calls: Array<{ metadata?: Record }>) { + const turnLatencyMs: number[] = []; + const listenWaitMs: number[] = []; + + for (const call of calls) { + const latency = call.metadata?.lastTurnLatencyMs; + const listenWait = call.metadata?.lastTurnListenWaitMs; + if (typeof latency === "number" && Number.isFinite(latency)) { + turnLatencyMs.push(latency); + } + if (typeof listenWait === "number" && Number.isFinite(listenWait)) { + listenWaitMs.push(listenWait); + } + } + + writeStdoutJson({ + recordsScanned: calls.length, + turnLatency: summarizeSeries(turnLatencyMs), + listenWait: summarizeSeries(listenWaitMs), + }); + } + root .command("expose") .description("Enable/disable Tailscale serve/funnel for the webhook") diff --git a/extensions/voice-call/src/manager/store.test.ts b/extensions/voice-call/src/manager/store.test.ts new file mode 100644 index 00000000000..5a68a1cd027 --- /dev/null +++ b/extensions/voice-call/src/manager/store.test.ts @@ -0,0 +1,189 @@ +import fs from "node:fs"; +import path from "node:path"; +import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + createTestStorePath, + makePersistedCall, + writeCallsToStore, +} from "../manager.test-harness.js"; +import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "../runtime-state.js"; +import { CallRecordSchema } from "../types.js"; +import { + flushPendingCallRecordWritesForTest, + getCallHistoryFromStore, + loadActiveCallsFromStore, + persistCallRecord, +} from "./store.js"; + +function installStateRuntime(): void { + setVoiceCallStateRuntime({ + state: { + resolveStateDir: () => "", + openKeyedStore: (() => { + throw new Error("openKeyedStore is not used by voice-call store tests"); + }) as never, + openSyncKeyedStore: (options: OpenKeyedStoreOptions) => + createPluginStateSyncKeyedStoreForTests("voice-call", options), + }, + }); +} + +describe("voice-call call record store", () => { + beforeEach(() => { + resetPluginStateStoreForTests(); + installStateRuntime(); + }); + + afterEach(() => { + vi.useRealTimers(); + clearVoiceCallStateRuntime(); + resetPluginStateStoreForTests(); + }); + + it("migrates legacy JSONL records into SQLite-backed plugin state", async () => { + const storePath = createTestStorePath(); + const call = CallRecordSchema.parse( + makePersistedCall({ callId: "call-legacy", processedEventIds: ["evt-1"] }), + ); + writeCallsToStore(storePath, [call]); + + const restored = loadActiveCallsFromStore(storePath); + expect(restored.activeCalls.get("call-legacy")?.providerCallId).toBe(call.providerCallId); + expect(restored.processedEventIds.has("evt-1")).toBe(true); + expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false); + expect(fs.existsSync(path.join(storePath, "state", "openclaw.sqlite"))).toBe(true); + + const history = await getCallHistoryFromStore(storePath); + expect(history).toHaveLength(1); + expect(history[0]?.callId).toBe("call-legacy"); + }); + + it("persists new call snapshots without recreating the JSONL log", async () => { + const storePath = createTestStorePath(); + const call = CallRecordSchema.parse( + makePersistedCall({ callId: "call-sqlite", transcript: [] }), + ); + + persistCallRecord(storePath, call); + await flushPendingCallRecordWritesForTest(); + + expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false); + const restored = loadActiveCallsFromStore(storePath); + expect(restored.activeCalls.get("call-sqlite")?.providerCallId).toBe(call.providerCallId); + }); + + it("imports fallback JSONL writes created after the migration marker", async () => { + const storePath = createTestStorePath(); + const sqliteCall = CallRecordSchema.parse(makePersistedCall({ callId: "call-sqlite" })); + const fallbackCall = CallRecordSchema.parse(makePersistedCall({ callId: "call-fallback" })); + + persistCallRecord(storePath, sqliteCall); + writeCallsToStore(storePath, [fallbackCall]); + + const restored = loadActiveCallsFromStore(storePath); + expect(restored.activeCalls.has("call-sqlite")).toBe(true); + expect(restored.activeCalls.get("call-fallback")?.providerCallId).toBe( + fallbackCall.providerCallId, + ); + expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false); + }); + + it("reads the JSONL fallback when SQLite state cannot open", () => { + const storePath = createTestStorePath(); + const call = CallRecordSchema.parse(makePersistedCall({ callId: "call-jsonl" })); + writeCallsToStore(storePath, [call]); + setVoiceCallStateRuntime({ + state: { + resolveStateDir: () => "", + openKeyedStore: (() => { + throw new Error("openKeyedStore is not used by voice-call store tests"); + }) as never, + openSyncKeyedStore: (() => { + throw new Error("sqlite unavailable"); + }) as never, + }, + }); + + const restored = loadActiveCallsFromStore(storePath); + expect(restored.activeCalls.get("call-jsonl")?.providerCallId).toBe(call.providerCallId); + }); + + it("keeps oversized fallback records readable when they exceed SQLite chunk budget", async () => { + const storePath = createTestStorePath(); + const call = CallRecordSchema.parse( + makePersistedCall({ + callId: "call-large", + transcript: [ + { + timestamp: Date.now(), + speaker: "user", + text: "x".repeat(2 * 1024 * 1024), + isFinal: true, + }, + ], + }), + ); + + persistCallRecord(storePath, call); + await flushPendingCallRecordWritesForTest(); + + const restored = loadActiveCallsFromStore(storePath); + expect(restored.activeCalls.get("call-large")?.providerCallId).toBe(call.providerCallId); + expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(true); + }); + + it("does not let an older fallback record override a newer SQLite snapshot", async () => { + const storePath = createTestStorePath(); + const olderFallback = CallRecordSchema.parse( + makePersistedCall({ + callId: "call-mixed", + state: "answered", + transcript: [ + { + timestamp: Date.now(), + speaker: "user", + text: "x".repeat(2 * 1024 * 1024), + isFinal: true, + }, + ], + }), + ); + const newerSqlite = CallRecordSchema.parse( + makePersistedCall({ + callId: "call-mixed", + state: "completed", + endedAt: Date.now(), + endReason: "completed", + }), + ); + + persistCallRecord(storePath, olderFallback); + await flushPendingCallRecordWritesForTest(); + persistCallRecord(storePath, newerSqlite); + + const restored = loadActiveCallsFromStore(storePath); + expect(restored.activeCalls.has("call-mixed")).toBe(false); + }); + + it("replays same-millisecond snapshots in write order", () => { + vi.useFakeTimers({ now: new Date("2026-05-31T10:00:00.000Z") }); + const storePath = createTestStorePath(); + const first = CallRecordSchema.parse( + makePersistedCall({ callId: "call-order", state: "ringing" }), + ); + const second = CallRecordSchema.parse( + makePersistedCall({ callId: "call-order", state: "answered" }), + ); + + persistCallRecord(storePath, first); + persistCallRecord(storePath, second); + + const restored = loadActiveCallsFromStore(storePath); + expect(restored.activeCalls.get("call-order")?.state).toBe("answered"); + }); +}); diff --git a/extensions/voice-call/src/manager/store.ts b/extensions/voice-call/src/manager/store.ts index 004325f5a47..3d569a0b2c6 100644 --- a/extensions/voice-call/src/manager/store.ts +++ b/extensions/voice-call/src/manager/store.ts @@ -1,16 +1,341 @@ +import { createHash, randomUUID } from "node:crypto"; +import fs from "node:fs"; import path from "node:path"; +import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import { appendRegularFile, privateFileStore, privateFileStoreSync, } from "openclaw/plugin-sdk/security-runtime"; +import { getOptionalVoiceCallStateRuntime } from "../runtime-state.js"; import { CallRecordSchema, TerminalStates, type CallId, type CallRecord } from "../types.js"; const pendingPersistWrites = new Set>(); +const CALL_RECORD_EVENTS_NAMESPACE = "call-record-events"; +const CALL_RECORD_EVENT_CHUNKS_NAMESPACE = "call-record-event-chunks"; +const CALL_RECORD_MIGRATIONS_NAMESPACE = "call-record-migrations"; +const CALL_RECORD_JSONL_MIGRATION_KEY = "calls-jsonl-v1"; +const MAX_CALL_RECORD_EVENTS = 1000; +const CALL_RECORD_EVENT_META_MAX_ENTRIES = MAX_CALL_RECORD_EVENTS + 100; +const MAX_CHUNKS_PER_CALL_RECORD_EVENT = 48; +const CALL_RECORD_CHUNK_MAX_ENTRIES = + MAX_CALL_RECORD_EVENTS * MAX_CHUNKS_PER_CALL_RECORD_EVENT + MAX_CHUNKS_PER_CALL_RECORD_EVENT; +const RAW_CHUNK_BYTES = 36 * 1024; +let callRecordEventSequence = 0; + +type CallRecordEventMeta = { + chunkCount: number; + byteLength: number; + persistedAt?: number; + sequence?: number; +}; + +type CallRecordEventChunk = { + index: number; + dataBase64: string; +}; + +type CallRecordMigrationMarker = { + importedAt: string; +}; + +type PersistedCallRecord = { + call: CallRecord; + persistedAt: number; + sequence: number; + orderKey: string; +}; + +type CallRecordStateStores = { + events: PluginStateSyncKeyedStore; + chunks: PluginStateSyncKeyedStore; + migrations: PluginStateSyncKeyedStore; +}; + +function resolveCallLogPath(storePath: string): string { + return path.join(storePath, "calls.jsonl"); +} + +function resolvePluginStateEnv(storePath: string): NodeJS.ProcessEnv { + return { ...process.env, OPENCLAW_STATE_DIR: storePath }; +} + +function createCallRecordStateStores(storePath: string): CallRecordStateStores | null { + const runtime = getOptionalVoiceCallStateRuntime(); + if (!runtime) { + return null; + } + const env = resolvePluginStateEnv(storePath); + return { + events: runtime.state.openSyncKeyedStore({ + namespace: CALL_RECORD_EVENTS_NAMESPACE, + maxEntries: CALL_RECORD_EVENT_META_MAX_ENTRIES, + env, + }), + chunks: runtime.state.openSyncKeyedStore({ + namespace: CALL_RECORD_EVENT_CHUNKS_NAMESPACE, + maxEntries: CALL_RECORD_CHUNK_MAX_ENTRIES, + env, + }), + migrations: runtime.state.openSyncKeyedStore({ + namespace: CALL_RECORD_MIGRATIONS_NAMESPACE, + maxEntries: 100, + env, + }), + }; +} + +function tryCreateCallRecordStateStores(storePath: string): CallRecordStateStores | null { + try { + return createCallRecordStateStores(storePath); + } catch (err) { + console.error("[voice-call] Failed to open SQLite call record store:", err); + return null; + } +} + +function buildChunkKey(eventKey: string, index: number): string { + return `${eventKey}:chunk:${String(index).padStart(4, "0")}`; +} + +function buildJsonlEventKey(line: string, index: number): string { + return `jsonl:${String(index).padStart(8, "0")}:${createHash("sha256").update(line).digest("hex")}`; +} + +function nextCallRecordOrder(): { persistedAt: number; sequence: number } { + const sequence = callRecordEventSequence; + callRecordEventSequence = (callRecordEventSequence + 1) % 1_000_000; + return { persistedAt: Date.now(), sequence }; +} + +function buildNewEventKey(order: { persistedAt: number; sequence: number }): string { + return `event:${order.persistedAt.toString(36)}:${String(order.sequence).padStart(6, "0")}:${randomUUID()}`; +} + +function parseEventKeySequence(key: string): number { + const match = /^event:[^:]+:(\d+):/.exec(key); + return match ? Number.parseInt(match[1], 10) : 0; +} + +function parseCallRecordLine(line: string, sequence = 0): PersistedCallRecord | null { + if (!line.trim()) { + return null; + } + try { + const parsed = JSON.parse(line); + if (parsed && typeof parsed === "object" && (parsed as { version?: unknown }).version === 2) { + const envelope = parsed as { + call?: unknown; + persistedAt?: unknown; + sequence?: unknown; + }; + const call = CallRecordSchema.parse(envelope.call); + return { + call, + persistedAt: + typeof envelope.persistedAt === "number" && Number.isFinite(envelope.persistedAt) + ? envelope.persistedAt + : 0, + sequence: + typeof envelope.sequence === "number" && Number.isFinite(envelope.sequence) + ? envelope.sequence + : sequence, + orderKey: "", + }; + } + return { + call: CallRecordSchema.parse(parsed), + persistedAt: 0, + sequence, + orderKey: "", + }; + } catch { + return null; + } +} + +function registerCallRecordEvent( + stores: CallRecordStateStores, + eventKey: string, + call: CallRecord, + order?: { persistedAt: number; sequence: number }, +): void { + const serialized = JSON.stringify(call); + const buffer = Buffer.from(serialized, "utf8"); + const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CHUNK_BYTES)); + if (chunkCount > MAX_CHUNKS_PER_CALL_RECORD_EVENT) { + throw new Error( + `voice-call record exceeds SQLite chunk limit (${chunkCount}/${MAX_CHUNKS_PER_CALL_RECORD_EVENT})`, + ); + } + for (let index = 0; index < chunkCount; index += 1) { + const chunk = buffer.subarray(index * RAW_CHUNK_BYTES, (index + 1) * RAW_CHUNK_BYTES); + stores.chunks.register(buildChunkKey(eventKey, index), { + index, + dataBase64: chunk.toString("base64"), + }); + } + stores.events.register(eventKey, { + chunkCount, + byteLength: buffer.byteLength, + persistedAt: order?.persistedAt, + sequence: order?.sequence, + }); + pruneCallRecordEvents(stores); +} + +function deleteCallRecordEventRows(stores: CallRecordStateStores, eventKey: string): void { + const meta = stores.events.lookup(eventKey); + stores.events.delete(eventKey); + if (!meta) { + return; + } + for (let index = 0; index < meta.chunkCount; index += 1) { + stores.chunks.delete(buildChunkKey(eventKey, index)); + } +} + +function pruneCallRecordEvents(stores: CallRecordStateStores): void { + const rows = stores.events.entries(); + if (rows.length <= MAX_CALL_RECORD_EVENTS) { + return; + } + const sorted = rows.toSorted((a, b) => a.createdAt - b.createdAt || a.key.localeCompare(b.key)); + for (const row of sorted.slice(0, rows.length - MAX_CALL_RECORD_EVENTS)) { + deleteCallRecordEventRows(stores, row.key); + } +} + +function registerCallRecordEventIfAbsent( + stores: CallRecordStateStores, + eventKey: string, + record: PersistedCallRecord, +): void { + if (!stores.events.lookup(eventKey)) { + registerCallRecordEvent(stores, eventKey, record.call, { + persistedAt: record.persistedAt, + sequence: record.sequence, + }); + } +} + +function readCallRecordEvent(stores: CallRecordStateStores, eventKey: string): CallRecord | null { + const meta = stores.events.lookup(eventKey); + if (!meta) { + return null; + } + const chunks: Buffer[] = []; + for (let index = 0; index < meta.chunkCount; index += 1) { + const chunk = stores.chunks.lookup(buildChunkKey(eventKey, index)); + if (!chunk || chunk.index !== index) { + return null; + } + chunks.push(Buffer.from(chunk.dataBase64, "base64")); + } + const serialized = Buffer.concat(chunks, meta.byteLength).toString("utf8"); + return parseCallRecordLine(serialized)?.call ?? null; +} + +function ensureLegacyCallLogImported( + storePath: string, + stores: CallRecordStateStores, +): PersistedCallRecord[] { + const imported = stores.migrations.lookup(CALL_RECORD_JSONL_MIGRATION_KEY) !== undefined; + const logPath = resolveCallLogPath(storePath); + const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath)); + if (content === null) { + if (!imported) { + stores.migrations.register(CALL_RECORD_JSONL_MIGRATION_KEY, { + importedAt: new Date().toISOString(), + }); + } + return []; + } + + const fallbackCalls: PersistedCallRecord[] = []; + { + let index = 0; + let importFailed = false; + for (const line of content.split("\n")) { + const parsed = parseCallRecordLine(line, index); + if (!parsed) { + index += 1; + continue; + } + // Fallback JSONL writes can appear after the migration marker if SQLite + // persistence had a transient failure. Stable keys make the importer + // idempotent if the legacy file cannot be removed. + try { + registerCallRecordEventIfAbsent(stores, buildJsonlEventKey(line, index), parsed); + } catch (err) { + importFailed = true; + fallbackCalls.push({ + ...parsed, + orderKey: `jsonl:${String(index).padStart(8, "0")}`, + }); + console.error("[voice-call] Failed to import persisted call record:", err); + } + index += 1; + } + if (!importFailed) { + try { + fs.rmSync(logPath, { force: true }); + } catch { + // Import already completed; leave an unreadable legacy log in place. + } + } + } + if (!imported) { + stores.migrations.register(CALL_RECORD_JSONL_MIGRATION_KEY, { + importedAt: new Date().toISOString(), + }); + } + return fallbackCalls; +} + +function readCallRecordEvents(storePath: string, stores: CallRecordStateStores): CallRecord[] { + const fallbackCalls = ensureLegacyCallLogImported(storePath, stores); + const sqliteCalls: PersistedCallRecord[] = stores.events + .entries() + .toSorted((a, b) => a.createdAt - b.createdAt || a.key.localeCompare(b.key)) + .map((entry) => { + const call = readCallRecordEvent(stores, entry.key); + return call + ? { + call, + persistedAt: entry.value.persistedAt ?? entry.createdAt, + sequence: entry.value.sequence ?? parseEventKeySequence(entry.key), + orderKey: entry.key, + } + : null; + }) + .filter((entry): entry is PersistedCallRecord => entry !== null); + return [...sqliteCalls, ...fallbackCalls] + .toSorted( + (a, b) => + a.persistedAt - b.persistedAt || + a.sequence - b.sequence || + a.orderKey.localeCompare(b.orderKey), + ) + .map((entry) => entry.call); +} export function persistCallRecord(storePath: string, call: CallRecord): void { - const logPath = path.join(storePath, "calls.jsonl"); - const line = `${JSON.stringify(call)}\n`; + const stores = tryCreateCallRecordStateStores(storePath); + if (stores) { + try { + void ensureLegacyCallLogImported(storePath, stores); + const order = nextCallRecordOrder(); + registerCallRecordEvent(stores, buildNewEventKey(order), call, order); + return; + } catch (err) { + console.error("[voice-call] Failed to persist call record:", err); + } + } + + const logPath = resolveCallLogPath(storePath); + const order = nextCallRecordOrder(); + const line = `${JSON.stringify({ version: 2, ...order, call })}\n`; // Fire-and-forget async write to avoid blocking event loop. const write = appendRegularFile({ filePath: logPath, @@ -36,9 +361,17 @@ export function loadActiveCallsFromStore(storePath: string): { processedEventIds: Set; rejectedProviderCallIds: Set; } { - const logPath = path.join(storePath, "calls.jsonl"); - const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath)); - if (content === null) { + const stores = tryCreateCallRecordStateStores(storePath); + let calls: CallRecord[]; + try { + calls = stores + ? readCallRecordEvents(storePath, stores) + : readCallRecordsFromLegacyLog(storePath); + } catch (err) { + console.error("[voice-call] Failed to read SQLite call records:", err); + calls = readCallRecordsFromLegacyLog(storePath); + } + if (calls.length === 0) { return { activeCalls: new Map(), providerCallIdMap: new Map(), @@ -46,19 +379,9 @@ export function loadActiveCallsFromStore(storePath: string): { rejectedProviderCallIds: new Set(), }; } - const lines = content.split("\n"); - const callMap = new Map(); - for (const line of lines) { - if (!line.trim()) { - continue; - } - try { - const call = CallRecordSchema.parse(JSON.parse(line)); - callMap.set(call.callId, call); - } catch { - // Skip invalid lines. - } + for (const call of calls) { + callMap.set(call.callId, call); } const activeCalls = new Map(); @@ -86,7 +409,18 @@ export async function getCallHistoryFromStore( storePath: string, limit = 50, ): Promise { - const logPath = path.join(storePath, "calls.jsonl"); + if (limit <= 0) { + return []; + } + const stores = tryCreateCallRecordStateStores(storePath); + if (stores) { + try { + return readCallRecordEvents(storePath, stores).slice(-limit); + } catch (err) { + console.error("[voice-call] Failed to read SQLite call history:", err); + } + } + const logPath = resolveCallLogPath(storePath); const content = await privateFileStore(storePath).readTextIfExists(path.basename(logPath)); if (content === null) { return []; @@ -94,14 +428,24 @@ export async function getCallHistoryFromStore( const lines = content.trim().split("\n").filter(Boolean); const calls: CallRecord[] = []; - for (const line of lines.slice(-limit)) { - try { - const parsed = CallRecordSchema.parse(JSON.parse(line)); - calls.push(parsed); - } catch { - // Skip invalid lines. + for (const [index, line] of lines.slice(-limit).entries()) { + const parsed = parseCallRecordLine(line, index); + if (parsed) { + calls.push(parsed.call); } } return calls; } + +function readCallRecordsFromLegacyLog(storePath: string): CallRecord[] { + const logPath = resolveCallLogPath(storePath); + const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath)); + if (content === null) { + return []; + } + return content + .split("\n") + .map((line, index) => parseCallRecordLine(line, index)?.call ?? null) + .filter((call): call is CallRecord => call !== null); +} diff --git a/extensions/voice-call/src/runtime-state.ts b/extensions/voice-call/src/runtime-state.ts new file mode 100644 index 00000000000..1d5a8e25fbc --- /dev/null +++ b/extensions/voice-call/src/runtime-state.ts @@ -0,0 +1,14 @@ +import { createPluginRuntimeStore, type PluginRuntime } from "openclaw/plugin-sdk/runtime-store"; + +export type VoiceCallStateRuntime = Pick; + +const { + setRuntime: setVoiceCallStateRuntime, + clearRuntime: clearVoiceCallStateRuntime, + tryGetRuntime: getOptionalVoiceCallStateRuntime, +} = createPluginRuntimeStore({ + pluginId: "voice-call-state", + errorMessage: "Voice Call state runtime not initialized", +}); + +export { clearVoiceCallStateRuntime, getOptionalVoiceCallStateRuntime, setVoiceCallStateRuntime }; diff --git a/extensions/voice-call/src/runtime.ts b/extensions/voice-call/src/runtime.ts index 26d86144bb7..8ae8b32c82e 100644 --- a/extensions/voice-call/src/runtime.ts +++ b/extensions/voice-call/src/runtime.ts @@ -24,6 +24,7 @@ import type { TwilioProvider } from "./providers/twilio.js"; import { buildRealtimeVoiceInstructions } from "./realtime-agent-context.js"; import { resolveRealtimeFastContextConsult } from "./realtime-fast-context.js"; import { resolveVoiceResponseModel } from "./response-model.js"; +import { setVoiceCallStateRuntime, type VoiceCallStateRuntime } from "./runtime-state.js"; import type { TelephonyTtsRuntime } from "./telephony-tts.js"; import { createTelephonyTtsProvider } from "./telephony-tts.js"; import { startTunnel, type TunnelResult } from "./tunnel.js"; @@ -265,10 +266,19 @@ export async function createVoiceCallRuntime(params: { coreConfig: CoreConfig; fullConfig?: OpenClawConfig; agentRuntime: CoreAgentDeps; + stateRuntime?: VoiceCallStateRuntime["state"]; ttsRuntime?: TelephonyTtsRuntime; logger?: Logger; }): Promise { - const { config: rawConfig, coreConfig, fullConfig, agentRuntime, ttsRuntime, logger } = params; + const { + config: rawConfig, + coreConfig, + fullConfig, + agentRuntime, + stateRuntime, + ttsRuntime, + logger, + } = params; const log = logger ?? { info: console.log, warn: console.warn, @@ -295,6 +305,9 @@ export async function createVoiceCallRuntime(params: { } const provider = await resolveProvider(config); + if (stateRuntime) { + setVoiceCallStateRuntime({ state: stateRuntime }); + } const manager = new CallManager(config); const realtimeProvider = config.realtime.enabled ? await resolveRealtimeProvider({