From 3ff86f335020951ce699a8a1618708103b67bb57 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 31 May 2026 19:43:03 +0100 Subject: [PATCH] refactor: migrate voice-call call logs through doctor (#88731) --- .../voice-call/doctor-contract-api.test.ts | 212 ++++++++++++ extensions/voice-call/doctor-contract-api.ts | 313 ++++++++++++++++++ .../voice-call/src/manager.restore.test.ts | 28 +- .../voice-call/src/manager.test-harness.ts | 29 ++ .../voice-call/src/manager/store.test.ts | 88 ++--- extensions/voice-call/src/manager/store.ts | 247 +++++--------- .../src/webhook.hangup-once.lifecycle.test.ts | 29 +- 7 files changed, 710 insertions(+), 236 deletions(-) create mode 100644 extensions/voice-call/doctor-contract-api.test.ts create mode 100644 extensions/voice-call/doctor-contract-api.ts diff --git a/extensions/voice-call/doctor-contract-api.test.ts b/extensions/voice-call/doctor-contract-api.test.ts new file mode 100644 index 00000000000..10ab8c380f8 --- /dev/null +++ b/extensions/voice-call/doctor-contract-api.test.ts @@ -0,0 +1,212 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { + createPluginStateKeyedStoreForTests, + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import type { + OpenKeyedStoreOptions, + PluginDoctorStateMigrationContext, +} from "openclaw/plugin-sdk/runtime-doctor"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { stateMigrations } from "./doctor-contract-api.js"; +import { + createTestStorePath, + makePersistedCall, + writeLegacyCallsJsonl, +} from "./src/manager.test-harness.js"; +import { getCallHistoryFromStore, loadActiveCallsFromStore } from "./src/manager/store.js"; +import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "./src/runtime-state.js"; + +function createDoctorContext(env: NodeJS.ProcessEnv): PluginDoctorStateMigrationContext { + return { + openPluginStateKeyedStore(options: OpenKeyedStoreOptions) { + return createPluginStateKeyedStoreForTests("voice-call", { + ...options, + env: options.env ?? env, + }); + }, + }; +} + +function installStateRuntime(): void { + setVoiceCallStateRuntime({ + state: { + resolveStateDir: () => "", + openKeyedStore: (() => { + throw new Error("openKeyedStore is not used by voice-call doctor tests"); + }) as never, + openSyncKeyedStore: (options: OpenKeyedStoreOptions) => + createPluginStateSyncKeyedStoreForTests("voice-call", options), + }, + }); +} + +describe("voice-call doctor state migration", () => { + let stateDir = ""; + let storePath = ""; + let env: NodeJS.ProcessEnv; + + beforeEach(async () => { + resetPluginStateStoreForTests(); + stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-voice-call-doctor-")); + storePath = createTestStorePath(); + env = { ...process.env, HOME: stateDir, OPENCLAW_STATE_DIR: stateDir }; + installStateRuntime(); + }); + + afterEach(async () => { + clearVoiceCallStateRuntime(); + resetPluginStateStoreForTests(); + await fs.rm(stateDir, { recursive: true, force: true }); + await fs.rm(storePath, { recursive: true, force: true }); + }); + + it("imports legacy calls.jsonl into plugin state", async () => { + const sourcePath = path.join(storePath, "calls.jsonl"); + const call = makePersistedCall({ + callId: "call-doctor", + providerCallId: "provider-doctor", + processedEventIds: ["evt-doctor"], + }); + writeLegacyCallsJsonl(storePath, [ + { + version: 2, + persistedAt: 1000, + sequence: 0, + call, + }, + ]); + + const migration = stateMigrations[0]; + const config = { + plugins: { + entries: { + "@openclaw/voice-call": { + config: { store: storePath }, + }, + }, + }, + }; + await expect( + migration.detectLegacyState({ + config, + env, + stateDir, + oauthDir: path.join(stateDir, "oauth"), + context: createDoctorContext(env), + }), + ).resolves.toMatchObject({ + preview: [expect.stringContaining("1 record")], + }); + + const result = await migration.migrateLegacyState({ + config, + env, + stateDir, + oauthDir: path.join(stateDir, "oauth"), + context: createDoctorContext(env), + }); + + expect(result.warnings).toEqual([]); + expect(result.changes).toEqual([ + expect.stringContaining("Migrated 1 Voice Call call-log record"), + expect.stringContaining("Archived Voice Call call-log legacy source"), + ]); + await expect(fs.access(sourcePath)).rejects.toThrow(); + await expect(fs.access(`${sourcePath}.migrated`)).resolves.toBeUndefined(); + + const restored = loadActiveCallsFromStore(storePath); + expect(restored.activeCalls.get("call-doctor")?.providerCallId).toBe("provider-doctor"); + expect(restored.processedEventIds.has("evt-doctor")).toBe(true); + + const history = await getCallHistoryFromStore(storePath); + expect(history).toHaveLength(1); + expect(history[0]?.callId).toBe("call-doctor"); + }); + + it("imports the newest legacy call records when the JSONL log is over capacity", async () => { + const calls = Array.from({ length: 1002 }, (_, index) => + makePersistedCall({ + callId: `call-${index}`, + providerCallId: `provider-${index}`, + }), + ); + writeLegacyCallsJsonl(storePath, calls); + + const config = { + plugins: { + entries: { + "@openclaw/voice-call": { + config: { store: storePath }, + }, + }, + }, + }; + const result = await stateMigrations[0].migrateLegacyState({ + config, + env, + stateDir, + oauthDir: path.join(stateDir, "oauth"), + context: createDoctorContext(env), + }); + + expect(result.warnings).toEqual([ + expect.stringContaining("Pruned 2 older Voice Call call-log records"), + ]); + expect(result.changes).toEqual([ + expect.stringContaining("Migrated 1000 Voice Call call-log records"), + expect.stringContaining("Archived Voice Call call-log legacy source"), + ]); + + const restored = loadActiveCallsFromStore(storePath); + expect(restored.activeCalls.has("call-0")).toBe(false); + expect(restored.activeCalls.has("call-1")).toBe(false); + expect(restored.activeCalls.get("call-1001")?.providerCallId).toBe("provider-1001"); + + const history = await getCallHistoryFromStore(storePath, 1000); + expect(history).toHaveLength(1000); + expect(history[0]?.callId).toBe("call-2"); + expect(history.at(-1)?.callId).toBe("call-1001"); + }); + + it("leaves malformed mixed legacy logs in place after importing valid records", async () => { + const sourcePath = path.join(storePath, "calls.jsonl"); + const call = makePersistedCall({ + callId: "call-valid", + providerCallId: "provider-valid", + }); + await fs.mkdir(path.dirname(sourcePath), { recursive: true }); + await fs.writeFile(sourcePath, `${JSON.stringify(call)}\n{not json}\n`); + + const config = { + plugins: { + entries: { + "@openclaw/voice-call": { + config: { store: storePath }, + }, + }, + }, + }; + const result = await stateMigrations[0].migrateLegacyState({ + config, + env, + stateDir, + oauthDir: path.join(stateDir, "oauth"), + context: createDoctorContext(env), + }); + + expect(result.changes).toEqual([ + expect.stringContaining("Migrated 1 Voice Call call-log record"), + ]); + expect(result.warnings).toEqual([ + "Skipped malformed Voice Call call-log line 2", + "Left Voice Call call-log source in place because migration was incomplete", + ]); + await expect(fs.access(sourcePath)).resolves.toBeUndefined(); + await expect(fs.access(`${sourcePath}.migrated`)).rejects.toThrow(); + expect(loadActiveCallsFromStore(storePath).activeCalls.has("call-valid")).toBe(true); + }); +}); diff --git a/extensions/voice-call/doctor-contract-api.ts b/extensions/voice-call/doctor-contract-api.ts new file mode 100644 index 00000000000..6086d1a167e --- /dev/null +++ b/extensions/voice-call/doctor-contract-api.ts @@ -0,0 +1,313 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import type { + PluginDoctorStateMigration, + PluginStateKeyedStore, +} from "openclaw/plugin-sdk/runtime-doctor"; +import { + buildVoiceCallLegacyJsonlEventKey, + CALL_RECORD_CHUNK_MAX_ENTRIES, + CALL_RECORD_EVENT_CHUNKS_NAMESPACE, + CALL_RECORD_EVENT_META_MAX_ENTRIES, + CALL_RECORD_EVENTS_NAMESPACE, + MAX_CALL_RECORD_EVENTS, + MAX_CHUNKS_PER_CALL_RECORD_EVENT, + prepareVoiceCallRecordForStorage, + parseVoiceCallRecordLine, + RAW_CALL_RECORD_CHUNK_BYTES, + resolveVoiceCallLegacyCallLogPath, +} from "./src/manager/store.js"; +import type { CallRecord } from "./src/types.js"; + +type CallRecordEventMeta = { + chunkCount: number; + byteLength: number; + persistedAt?: number; + sequence?: number; +}; + +type CallRecordEventChunk = { + index: number; + dataBase64: string; +}; + +type PreparedLegacyCallRecord = { + eventKey: string; + lineNumber: number; + chunks: CallRecordEventChunk[]; + meta: CallRecordEventMeta; +}; + +function resolveHome(env: NodeJS.ProcessEnv): string { + return env.HOME?.trim() || os.homedir(); +} + +function resolveUserPath(input: string, env: NodeJS.ProcessEnv): string { + const trimmed = input.trim(); + if (!trimmed) { + return trimmed; + } + if (trimmed.startsWith("~")) { + return path.resolve(trimmed.replace(/^~(?=$|[\\/])/, resolveHome(env))); + } + return path.resolve(trimmed); +} + +function getVoiceCallConfigStore(config: PluginDoctorStateMigrationParams["config"]): string { + for (const pluginId of ["voice-call", "@openclaw/voice-call"]) { + const rawConfig = config.plugins?.entries?.[pluginId]?.config; + if (!rawConfig || typeof rawConfig !== "object" || Array.isArray(rawConfig)) { + continue; + } + const store = (rawConfig as { store?: unknown }).store; + if (typeof store === "string" && store.trim()) { + return store.trim(); + } + } + return ""; +} + +type PluginDoctorStateMigrationParams = Parameters< + PluginDoctorStateMigration["detectLegacyState"] +>[0]; + +function resolveVoiceCallStorePath(params: { + config: PluginDoctorStateMigrationParams["config"]; + env: NodeJS.ProcessEnv; +}): string { + const configuredStore = getVoiceCallConfigStore(params.config); + if (configuredStore) { + return resolveUserPath(configuredStore, params.env); + } + return path.join(resolveHome(params.env), ".openclaw", "voice-calls"); +} + +async function fileExists(filePath: string): Promise { + try { + const stat = await fs.stat(filePath); + return stat.isFile(); + } catch { + return false; + } +} + +function buildChunkKey(eventKey: string, index: number): string { + return `${eventKey}:chunk:${String(index).padStart(4, "0")}`; +} + +function prepareChunks(call: CallRecord): { + chunks: CallRecordEventChunk[]; + meta: CallRecordEventMeta; +} { + const serialized = JSON.stringify(prepareVoiceCallRecordForStorage(call)); + const buffer = Buffer.from(serialized, "utf8"); + const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CALL_RECORD_CHUNK_BYTES)); + if (chunkCount > MAX_CHUNKS_PER_CALL_RECORD_EVENT) { + throw new Error( + `voice-call record exceeds SQLite chunk limit (${chunkCount}/${MAX_CHUNKS_PER_CALL_RECORD_EVENT})`, + ); + } + const chunks: CallRecordEventChunk[] = []; + for (let index = 0; index < chunkCount; index += 1) { + const chunk = buffer.subarray( + index * RAW_CALL_RECORD_CHUNK_BYTES, + (index + 1) * RAW_CALL_RECORD_CHUNK_BYTES, + ); + chunks.push({ index, dataBase64: chunk.toString("base64") }); + } + return { + chunks, + meta: { + chunkCount, + byteLength: buffer.byteLength, + }, + }; +} + +async function readLegacyCallRecords(filePath: string): Promise<{ + entries: PreparedLegacyCallRecord[]; + warnings: string[]; +}> { + let content = ""; + try { + content = await fs.readFile(filePath, "utf8"); + } catch { + return { entries: [], warnings: [] }; + } + const entries: PreparedLegacyCallRecord[] = []; + const warnings: string[] = []; + let index = 0; + for (const line of content.split("\n")) { + const parsed = parseVoiceCallRecordLine(line, index); + if (!parsed) { + if (line.trim()) { + warnings.push(`Skipped malformed Voice Call call-log line ${index + 1}`); + } + index += 1; + continue; + } + try { + const prepared = prepareChunks(parsed.call); + entries.push({ + eventKey: buildVoiceCallLegacyJsonlEventKey(line, index), + lineNumber: index + 1, + chunks: prepared.chunks, + meta: { + ...prepared.meta, + persistedAt: parsed.persistedAt, + sequence: parsed.sequence, + }, + }); + } catch (err) { + warnings.push(`Skipped Voice Call call-log line ${index + 1}: ${String(err)}`); + } + index += 1; + } + return { entries, warnings }; +} + +async function archiveLegacySource(params: { + filePath: string; + changes: string[]; + warnings: string[]; +}): Promise { + const archivedPath = `${params.filePath}.migrated`; + if (await fileExists(archivedPath)) { + params.warnings.push( + `Left migrated Voice Call call-log source in place because ${archivedPath} already exists`, + ); + return; + } + try { + await fs.rename(params.filePath, archivedPath); + params.changes.push(`Archived Voice Call call-log legacy source -> ${archivedPath}`); + } catch (err) { + params.warnings.push(`Failed archiving Voice Call call-log legacy source: ${String(err)}`); + } +} + +async function selectEntriesForImport(params: { + entries: PreparedLegacyCallRecord[]; + eventStore: PluginStateKeyedStore; + chunkStore: PluginStateKeyedStore; + warnings: string[]; +}): Promise<{ existingEventKeys: Set; entries: PreparedLegacyCallRecord[] }> { + const existingEventKeys = new Set((await params.eventStore.entries()).map((entry) => entry.key)); + const missingEntries = params.entries.filter((entry) => !existingEventKeys.has(entry.eventKey)); + const existingChunks = await params.chunkStore.entries(); + let eventRoom = Math.max(0, MAX_CALL_RECORD_EVENTS - existingEventKeys.size); + let chunkRoom = Math.max(0, CALL_RECORD_CHUNK_MAX_ENTRIES - existingChunks.length); + const selected: PreparedLegacyCallRecord[] = []; + let pruned = 0; + for (const entry of missingEntries.toReversed()) { + if (eventRoom <= 0 || entry.chunks.length > chunkRoom) { + pruned++; + continue; + } + selected.push(entry); + eventRoom--; + chunkRoom -= entry.chunks.length; + } + if (pruned > 0) { + params.warnings.push( + `Pruned ${pruned} older Voice Call call-log ${pruned === 1 ? "record" : "records"} during migration because plugin state keeps the newest ${MAX_CALL_RECORD_EVENTS} records`, + ); + } + return { existingEventKeys, entries: selected.toReversed() }; +} + +async function importLegacyCallRecords(params: { + entries: PreparedLegacyCallRecord[]; + eventStore: PluginStateKeyedStore; + chunkStore: PluginStateKeyedStore; + warnings: string[]; +}): Promise { + const selected = await selectEntriesForImport(params); + let imported = 0; + for (const entry of selected.entries) { + if (selected.existingEventKeys.has(entry.eventKey)) { + continue; + } + try { + for (const chunk of entry.chunks) { + await params.chunkStore.register(buildChunkKey(entry.eventKey, chunk.index), chunk); + } + await params.eventStore.register(entry.eventKey, entry.meta); + selected.existingEventKeys.add(entry.eventKey); + imported++; + } catch (err) { + params.warnings.push( + `Failed migrating Voice Call call-log line ${entry.lineNumber}: ${String(err)}`, + ); + } + } + return imported; +} + +export const stateMigrations: PluginDoctorStateMigration[] = [ + { + id: "voice-call-calls-jsonl-to-plugin-state", + label: "Voice Call call log", + async detectLegacyState(params) { + const storePath = resolveVoiceCallStorePath(params); + const filePath = resolveVoiceCallLegacyCallLogPath(storePath); + const { entries } = await readLegacyCallRecords(filePath); + if (entries.length === 0) { + return null; + } + return { + preview: [ + `- Voice Call call log: ${entries.length} ${entries.length === 1 ? "record" : "records"} -> plugin state (${CALL_RECORD_EVENTS_NAMESPACE})`, + ], + }; + }, + async migrateLegacyState(params) { + const changes: string[] = []; + const warnings: string[] = []; + const storePath = resolveVoiceCallStorePath(params); + const filePath = resolveVoiceCallLegacyCallLogPath(storePath); + const { entries, warnings: readWarnings } = await readLegacyCallRecords(filePath); + warnings.push(...readWarnings); + if (entries.length === 0) { + return { changes, warnings }; + } + const env = { ...params.env, OPENCLAW_STATE_DIR: storePath }; + const eventStore = params.context.openPluginStateKeyedStore({ + namespace: CALL_RECORD_EVENTS_NAMESPACE, + maxEntries: CALL_RECORD_EVENT_META_MAX_ENTRIES, + env, + }); + const chunkStore = params.context.openPluginStateKeyedStore({ + namespace: CALL_RECORD_EVENT_CHUNKS_NAMESPACE, + maxEntries: CALL_RECORD_CHUNK_MAX_ENTRIES, + env, + }); + const imported = await importLegacyCallRecords({ + entries, + eventStore, + chunkStore, + warnings, + }); + if (imported > 0) { + changes.push( + `Migrated ${imported} Voice Call call-log ${imported === 1 ? "record" : "records"} -> plugin state`, + ); + } + if ( + warnings.some( + (warning) => + warning.startsWith("Failed migrating Voice Call") || + warning.startsWith("Skipped malformed Voice Call call-log line") || + warning.startsWith("Skipped Voice Call call-log line") || + warning.startsWith("Skipped Voice Call call-log migration"), + ) + ) { + warnings.push("Left Voice Call call-log source in place because migration was incomplete"); + return { changes, warnings }; + } + await archiveLegacySource({ filePath, changes, warnings }); + return { changes, warnings }; + }, + }, +]; diff --git a/extensions/voice-call/src/manager.restore.test.ts b/extensions/voice-call/src/manager.restore.test.ts index 6676b175de4..6f4a31787a6 100644 --- a/extensions/voice-call/src/manager.restore.test.ts +++ b/extensions/voice-call/src/manager.restore.test.ts @@ -1,4 +1,9 @@ -import { afterEach, describe, expect, it, vi } from "vitest"; +import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { VoiceCallConfigSchema } from "./config.js"; import { CallManager } from "./manager.js"; import { @@ -8,6 +13,20 @@ import { writeCallsToStore, } from "./manager.test-harness.js"; import { flushPendingCallRecordWritesForTest, loadActiveCallsFromStore } from "./manager/store.js"; +import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "./runtime-state.js"; + +function installStateRuntime(): void { + setVoiceCallStateRuntime({ + state: { + resolveStateDir: () => "", + openKeyedStore: (() => { + throw new Error("openKeyedStore is not used by voice-call restore tests"); + }) as never, + openSyncKeyedStore: (options: OpenKeyedStoreOptions) => + createPluginStateSyncKeyedStoreForTests("voice-call", options), + }, + }); +} function requireSingleActiveCall(manager: CallManager) { const activeCalls = manager.getActiveCalls(); @@ -32,9 +51,16 @@ function requireSingleHangupCall(provider: FakeProvider) { } describe("CallManager verification on restore", () => { + beforeEach(() => { + resetPluginStateStoreForTests(); + installStateRuntime(); + }); + afterEach(() => { vi.useRealTimers(); vi.restoreAllMocks(); + clearVoiceCallStateRuntime(); + resetPluginStateStoreForTests(); }); async function initializeManager(params?: { diff --git a/extensions/voice-call/src/manager.test-harness.ts b/extensions/voice-call/src/manager.test-harness.ts index c992b789506..948956ee934 100644 --- a/extensions/voice-call/src/manager.test-harness.ts +++ b/extensions/voice-call/src/manager.test-harness.ts @@ -1,9 +1,14 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { createPluginStateSyncKeyedStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime"; import { VoiceCallConfigSchema } from "./config.js"; import { CallManager } from "./manager.js"; +import { persistCallRecord } from "./manager/store.js"; import type { VoiceCallProvider } from "./providers/base.js"; +import { getOptionalVoiceCallStateRuntime, setVoiceCallStateRuntime } from "./runtime-state.js"; +import { CallRecordSchema } from "./types.js"; import type { GetCallStatusInput, GetCallStatusResult, @@ -72,6 +77,22 @@ export function createTestStorePath(): string { return fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-voice-call-test-")); } +export function installVoiceCallStateRuntimeForTests(): void { + if (getOptionalVoiceCallStateRuntime()) { + return; + } + setVoiceCallStateRuntime({ + state: { + resolveStateDir: () => "", + openKeyedStore: (() => { + throw new Error("openKeyedStore is not used by voice-call manager tests"); + }) as never, + openSyncKeyedStore: (options: OpenKeyedStoreOptions) => + createPluginStateSyncKeyedStoreForTests("voice-call", options), + }, + }); +} + export async function createManagerHarness( configOverrides: Record = {}, provider = new FakeProvider(), @@ -85,6 +106,7 @@ export async function createManagerHarness( fromNumber: "+15550000000", ...configOverrides, }); + installVoiceCallStateRuntimeForTests(); const manager = new CallManager(config, createTestStorePath()); await manager.initialize(provider, "https://example.com/voice/webhook"); return { manager, provider }; @@ -101,6 +123,13 @@ export function markCallAnswered(manager: CallManager, callId: string, eventId: } export function writeCallsToStore(storePath: string, calls: Record[]): void { + fs.mkdirSync(storePath, { recursive: true }); + for (const call of calls) { + persistCallRecord(storePath, CallRecordSchema.parse(call)); + } +} + +export function writeLegacyCallsJsonl(storePath: string, calls: Record[]): void { fs.mkdirSync(storePath, { recursive: true }); const logPath = path.join(storePath, "calls.jsonl"); const lines = calls.map((c) => JSON.stringify(c)).join("\n") + "\n"; diff --git a/extensions/voice-call/src/manager/store.test.ts b/extensions/voice-call/src/manager/store.test.ts index 5a68a1cd027..c0a4a3ef490 100644 --- a/extensions/voice-call/src/manager/store.test.ts +++ b/extensions/voice-call/src/manager/store.test.ts @@ -9,7 +9,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createTestStorePath, makePersistedCall, - writeCallsToStore, + writeLegacyCallsJsonl, } from "../manager.test-harness.js"; import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "../runtime-state.js"; import { CallRecordSchema } from "../types.js"; @@ -45,22 +45,20 @@ describe("voice-call call record store", () => { resetPluginStateStoreForTests(); }); - it("migrates legacy JSONL records into SQLite-backed plugin state", async () => { + it("does not import legacy JSONL records at runtime", async () => { const storePath = createTestStorePath(); const call = CallRecordSchema.parse( makePersistedCall({ callId: "call-legacy", processedEventIds: ["evt-1"] }), ); - writeCallsToStore(storePath, [call]); + writeLegacyCallsJsonl(storePath, [call]); const restored = loadActiveCallsFromStore(storePath); - expect(restored.activeCalls.get("call-legacy")?.providerCallId).toBe(call.providerCallId); - expect(restored.processedEventIds.has("evt-1")).toBe(true); - expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false); - expect(fs.existsSync(path.join(storePath, "state", "openclaw.sqlite"))).toBe(true); + expect(restored.activeCalls.has("call-legacy")).toBe(false); + expect(restored.processedEventIds.has("evt-1")).toBe(false); + expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(true); const history = await getCallHistoryFromStore(storePath); - expect(history).toHaveLength(1); - expect(history[0]?.callId).toBe("call-legacy"); + expect(history).toEqual([]); }); it("persists new call snapshots without recreating the JSONL log", async () => { @@ -77,26 +75,10 @@ describe("voice-call call record store", () => { expect(restored.activeCalls.get("call-sqlite")?.providerCallId).toBe(call.providerCallId); }); - it("imports fallback JSONL writes created after the migration marker", async () => { - const storePath = createTestStorePath(); - const sqliteCall = CallRecordSchema.parse(makePersistedCall({ callId: "call-sqlite" })); - const fallbackCall = CallRecordSchema.parse(makePersistedCall({ callId: "call-fallback" })); - - persistCallRecord(storePath, sqliteCall); - writeCallsToStore(storePath, [fallbackCall]); - - const restored = loadActiveCallsFromStore(storePath); - expect(restored.activeCalls.has("call-sqlite")).toBe(true); - expect(restored.activeCalls.get("call-fallback")?.providerCallId).toBe( - fallbackCall.providerCallId, - ); - expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false); - }); - - it("reads the JSONL fallback when SQLite state cannot open", () => { + it("does not read the JSONL fallback when SQLite state cannot open", () => { const storePath = createTestStorePath(); const call = CallRecordSchema.parse(makePersistedCall({ callId: "call-jsonl" })); - writeCallsToStore(storePath, [call]); + writeLegacyCallsJsonl(storePath, [call]); setVoiceCallStateRuntime({ state: { resolveStateDir: () => "", @@ -110,19 +92,21 @@ describe("voice-call call record store", () => { }); const restored = loadActiveCallsFromStore(storePath); - expect(restored.activeCalls.get("call-jsonl")?.providerCallId).toBe(call.providerCallId); + expect(restored.activeCalls.has("call-jsonl")).toBe(false); + expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(true); }); - it("keeps oversized fallback records readable when they exceed SQLite chunk budget", async () => { + it("persists oversized records in SQLite without creating a JSONL fallback", async () => { const storePath = createTestStorePath(); const call = CallRecordSchema.parse( makePersistedCall({ callId: "call-large", + metadata: { mode: "conversation", numberRouteKey: "+15550000001" }, transcript: [ { timestamp: Date.now(), speaker: "user", - text: "x".repeat(2 * 1024 * 1024), + text: "x".repeat(3 * 1024 * 1024), isFinal: true, }, ], @@ -133,41 +117,15 @@ describe("voice-call call record store", () => { await flushPendingCallRecordWritesForTest(); const restored = loadActiveCallsFromStore(storePath); - expect(restored.activeCalls.get("call-large")?.providerCallId).toBe(call.providerCallId); - expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(true); - }); - - it("does not let an older fallback record override a newer SQLite snapshot", async () => { - const storePath = createTestStorePath(); - const olderFallback = CallRecordSchema.parse( - makePersistedCall({ - callId: "call-mixed", - state: "answered", - transcript: [ - { - timestamp: Date.now(), - speaker: "user", - text: "x".repeat(2 * 1024 * 1024), - isFinal: true, - }, - ], - }), - ); - const newerSqlite = CallRecordSchema.parse( - makePersistedCall({ - callId: "call-mixed", - state: "completed", - endedAt: Date.now(), - endReason: "completed", - }), - ); - - persistCallRecord(storePath, olderFallback); - await flushPendingCallRecordWritesForTest(); - persistCallRecord(storePath, newerSqlite); - - const restored = loadActiveCallsFromStore(storePath); - expect(restored.activeCalls.has("call-mixed")).toBe(false); + const restoredCall = restored.activeCalls.get("call-large"); + expect(restoredCall?.providerCallId).toBe(call.providerCallId); + expect(restoredCall?.transcript).toEqual([]); + expect(restoredCall?.metadata).toMatchObject({ + mode: "conversation", + numberRouteKey: "+15550000001", + voiceCallPersistence: { transcriptTruncated: true }, + }); + expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false); }); it("replays same-millisecond snapshots in write order", () => { diff --git a/extensions/voice-call/src/manager/store.ts b/extensions/voice-call/src/manager/store.ts index 3d569a0b2c6..7e9e5a34e5d 100644 --- a/extensions/voice-call/src/manager/store.ts +++ b/extensions/voice-call/src/manager/store.ts @@ -1,26 +1,17 @@ import { createHash, randomUUID } from "node:crypto"; -import fs from "node:fs"; import path from "node:path"; import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; -import { - appendRegularFile, - privateFileStore, - privateFileStoreSync, -} from "openclaw/plugin-sdk/security-runtime"; import { getOptionalVoiceCallStateRuntime } from "../runtime-state.js"; import { CallRecordSchema, TerminalStates, type CallId, type CallRecord } from "../types.js"; -const pendingPersistWrites = new Set>(); -const CALL_RECORD_EVENTS_NAMESPACE = "call-record-events"; -const CALL_RECORD_EVENT_CHUNKS_NAMESPACE = "call-record-event-chunks"; -const CALL_RECORD_MIGRATIONS_NAMESPACE = "call-record-migrations"; -const CALL_RECORD_JSONL_MIGRATION_KEY = "calls-jsonl-v1"; -const MAX_CALL_RECORD_EVENTS = 1000; -const CALL_RECORD_EVENT_META_MAX_ENTRIES = MAX_CALL_RECORD_EVENTS + 100; -const MAX_CHUNKS_PER_CALL_RECORD_EVENT = 48; -const CALL_RECORD_CHUNK_MAX_ENTRIES = +export const CALL_RECORD_EVENTS_NAMESPACE = "call-record-events"; +export const CALL_RECORD_EVENT_CHUNKS_NAMESPACE = "call-record-event-chunks"; +export const MAX_CALL_RECORD_EVENTS = 1000; +export const CALL_RECORD_EVENT_META_MAX_ENTRIES = MAX_CALL_RECORD_EVENTS + 100; +export const MAX_CHUNKS_PER_CALL_RECORD_EVENT = 48; +export const CALL_RECORD_CHUNK_MAX_ENTRIES = MAX_CALL_RECORD_EVENTS * MAX_CHUNKS_PER_CALL_RECORD_EVENT + MAX_CHUNKS_PER_CALL_RECORD_EVENT; -const RAW_CHUNK_BYTES = 36 * 1024; +export const RAW_CALL_RECORD_CHUNK_BYTES = 47 * 1024; let callRecordEventSequence = 0; type CallRecordEventMeta = { @@ -35,11 +26,7 @@ type CallRecordEventChunk = { dataBase64: string; }; -type CallRecordMigrationMarker = { - importedAt: string; -}; - -type PersistedCallRecord = { +export type PersistedCallRecord = { call: CallRecord; persistedAt: number; sequence: number; @@ -49,10 +36,9 @@ type PersistedCallRecord = { type CallRecordStateStores = { events: PluginStateSyncKeyedStore; chunks: PluginStateSyncKeyedStore; - migrations: PluginStateSyncKeyedStore; }; -function resolveCallLogPath(storePath: string): string { +export function resolveVoiceCallLegacyCallLogPath(storePath: string): string { return path.join(storePath, "calls.jsonl"); } @@ -77,11 +63,6 @@ function createCallRecordStateStores(storePath: string): CallRecordStateStores | maxEntries: CALL_RECORD_CHUNK_MAX_ENTRIES, env, }), - migrations: runtime.state.openSyncKeyedStore({ - namespace: CALL_RECORD_MIGRATIONS_NAMESPACE, - maxEntries: 100, - env, - }), }; } @@ -98,7 +79,7 @@ function buildChunkKey(eventKey: string, index: number): string { return `${eventKey}:chunk:${String(index).padStart(4, "0")}`; } -function buildJsonlEventKey(line: string, index: number): string { +export function buildVoiceCallLegacyJsonlEventKey(line: string, index: number): string { return `jsonl:${String(index).padStart(8, "0")}:${createHash("sha256").update(line).digest("hex")}`; } @@ -117,7 +98,7 @@ function parseEventKeySequence(key: string): number { return match ? Number.parseInt(match[1], 10) : 0; } -function parseCallRecordLine(line: string, sequence = 0): PersistedCallRecord | null { +export function parseVoiceCallRecordLine(line: string, sequence = 0): PersistedCallRecord | null { if (!line.trim()) { return null; } @@ -154,22 +135,70 @@ function parseCallRecordLine(line: string, sequence = 0): PersistedCallRecord | } } +function countCallRecordChunks(call: CallRecord): number { + return Math.max( + 1, + Math.ceil(Buffer.byteLength(JSON.stringify(call), "utf8") / RAW_CALL_RECORD_CHUNK_BYTES), + ); +} + +export function prepareVoiceCallRecordForStorage(call: CallRecord): CallRecord { + if (countCallRecordChunks(call) <= MAX_CHUNKS_PER_CALL_RECORD_EVENT) { + return call; + } + const transcriptEntries = call.transcript.length; + const metadata = { + ...call.metadata, + voiceCallPersistence: { + transcriptTruncated: true, + originalTranscriptEntries: transcriptEntries, + }, + }; + const candidateInputs = [ + { transcript: call.transcript.slice(-20), metadata }, + { transcript: [], metadata }, + { + transcript: [], + metadata: { + voiceCallPersistence: { + transcriptTruncated: true, + originalTranscriptEntries: transcriptEntries, + metadataTruncated: true, + }, + }, + }, + ]; + for (const candidateInput of candidateInputs) { + const candidate = CallRecordSchema.parse({ + ...call, + ...candidateInput, + }); + if (countCallRecordChunks(candidate) <= MAX_CHUNKS_PER_CALL_RECORD_EVENT) { + return candidate; + } + } + return call; +} + function registerCallRecordEvent( stores: CallRecordStateStores, eventKey: string, call: CallRecord, order?: { persistedAt: number; sequence: number }, ): void { - const serialized = JSON.stringify(call); + const serialized = JSON.stringify(prepareVoiceCallRecordForStorage(call)); const buffer = Buffer.from(serialized, "utf8"); - const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CHUNK_BYTES)); + const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CALL_RECORD_CHUNK_BYTES)); if (chunkCount > MAX_CHUNKS_PER_CALL_RECORD_EVENT) { throw new Error( `voice-call record exceeds SQLite chunk limit (${chunkCount}/${MAX_CHUNKS_PER_CALL_RECORD_EVENT})`, ); } for (let index = 0; index < chunkCount; index += 1) { - const chunk = buffer.subarray(index * RAW_CHUNK_BYTES, (index + 1) * RAW_CHUNK_BYTES); + const chunk = buffer.subarray( + index * RAW_CALL_RECORD_CHUNK_BYTES, + (index + 1) * RAW_CALL_RECORD_CHUNK_BYTES, + ); stores.chunks.register(buildChunkKey(eventKey, index), { index, dataBase64: chunk.toString("base64"), @@ -206,19 +235,6 @@ function pruneCallRecordEvents(stores: CallRecordStateStores): void { } } -function registerCallRecordEventIfAbsent( - stores: CallRecordStateStores, - eventKey: string, - record: PersistedCallRecord, -): void { - if (!stores.events.lookup(eventKey)) { - registerCallRecordEvent(stores, eventKey, record.call, { - persistedAt: record.persistedAt, - sequence: record.sequence, - }); - } -} - function readCallRecordEvent(stores: CallRecordStateStores, eventKey: string): CallRecord | null { const meta = stores.events.lookup(eventKey); if (!meta) { @@ -233,68 +249,10 @@ function readCallRecordEvent(stores: CallRecordStateStores, eventKey: string): C chunks.push(Buffer.from(chunk.dataBase64, "base64")); } const serialized = Buffer.concat(chunks, meta.byteLength).toString("utf8"); - return parseCallRecordLine(serialized)?.call ?? null; + return parseVoiceCallRecordLine(serialized)?.call ?? null; } -function ensureLegacyCallLogImported( - storePath: string, - stores: CallRecordStateStores, -): PersistedCallRecord[] { - const imported = stores.migrations.lookup(CALL_RECORD_JSONL_MIGRATION_KEY) !== undefined; - const logPath = resolveCallLogPath(storePath); - const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath)); - if (content === null) { - if (!imported) { - stores.migrations.register(CALL_RECORD_JSONL_MIGRATION_KEY, { - importedAt: new Date().toISOString(), - }); - } - return []; - } - - const fallbackCalls: PersistedCallRecord[] = []; - { - let index = 0; - let importFailed = false; - for (const line of content.split("\n")) { - const parsed = parseCallRecordLine(line, index); - if (!parsed) { - index += 1; - continue; - } - // Fallback JSONL writes can appear after the migration marker if SQLite - // persistence had a transient failure. Stable keys make the importer - // idempotent if the legacy file cannot be removed. - try { - registerCallRecordEventIfAbsent(stores, buildJsonlEventKey(line, index), parsed); - } catch (err) { - importFailed = true; - fallbackCalls.push({ - ...parsed, - orderKey: `jsonl:${String(index).padStart(8, "0")}`, - }); - console.error("[voice-call] Failed to import persisted call record:", err); - } - index += 1; - } - if (!importFailed) { - try { - fs.rmSync(logPath, { force: true }); - } catch { - // Import already completed; leave an unreadable legacy log in place. - } - } - } - if (!imported) { - stores.migrations.register(CALL_RECORD_JSONL_MIGRATION_KEY, { - importedAt: new Date().toISOString(), - }); - } - return fallbackCalls; -} - -function readCallRecordEvents(storePath: string, stores: CallRecordStateStores): CallRecord[] { - const fallbackCalls = ensureLegacyCallLogImported(storePath, stores); +function readCallRecordEvents(stores: CallRecordStateStores): CallRecord[] { const sqliteCalls: PersistedCallRecord[] = stores.events .entries() .toSorted((a, b) => a.createdAt - b.createdAt || a.key.localeCompare(b.key)) @@ -310,7 +268,7 @@ function readCallRecordEvents(storePath: string, stores: CallRecordStateStores): : null; }) .filter((entry): entry is PersistedCallRecord => entry !== null); - return [...sqliteCalls, ...fallbackCalls] + return sqliteCalls .toSorted( (a, b) => a.persistedAt - b.persistedAt || @@ -321,38 +279,21 @@ function readCallRecordEvents(storePath: string, stores: CallRecordStateStores): } export function persistCallRecord(storePath: string, call: CallRecord): void { - const stores = tryCreateCallRecordStateStores(storePath); - if (stores) { - try { - void ensureLegacyCallLogImported(storePath, stores); - const order = nextCallRecordOrder(); - registerCallRecordEvent(stores, buildNewEventKey(order), call, order); - return; - } catch (err) { - console.error("[voice-call] Failed to persist call record:", err); + try { + const stores = createCallRecordStateStores(storePath); + if (!stores) { + throw new Error("Voice Call state runtime not initialized"); } + const order = nextCallRecordOrder(); + registerCallRecordEvent(stores, buildNewEventKey(order), call, order); + } catch (err) { + console.error("[voice-call] Failed to persist call record:", err); + throw err; } - - const logPath = resolveCallLogPath(storePath); - const order = nextCallRecordOrder(); - const line = `${JSON.stringify({ version: 2, ...order, call })}\n`; - // Fire-and-forget async write to avoid blocking event loop. - const write = appendRegularFile({ - filePath: logPath, - content: line, - rejectSymlinkParents: true, - }) - .catch((err) => { - console.error("[voice-call] Failed to persist call record:", err); - }) - .finally(() => { - pendingPersistWrites.delete(write); - }); - pendingPersistWrites.add(write); } export async function flushPendingCallRecordWritesForTest(): Promise { - await Promise.allSettled(pendingPersistWrites); + await Promise.resolve(); } export function loadActiveCallsFromStore(storePath: string): { @@ -362,14 +303,11 @@ export function loadActiveCallsFromStore(storePath: string): { rejectedProviderCallIds: Set; } { const stores = tryCreateCallRecordStateStores(storePath); - let calls: CallRecord[]; + let calls: CallRecord[] = []; try { - calls = stores - ? readCallRecordEvents(storePath, stores) - : readCallRecordsFromLegacyLog(storePath); + calls = stores ? readCallRecordEvents(stores) : []; } catch (err) { console.error("[voice-call] Failed to read SQLite call records:", err); - calls = readCallRecordsFromLegacyLog(storePath); } if (calls.length === 0) { return { @@ -415,37 +353,10 @@ export async function getCallHistoryFromStore( const stores = tryCreateCallRecordStateStores(storePath); if (stores) { try { - return readCallRecordEvents(storePath, stores).slice(-limit); + return readCallRecordEvents(stores).slice(-limit); } catch (err) { console.error("[voice-call] Failed to read SQLite call history:", err); } } - const logPath = resolveCallLogPath(storePath); - const content = await privateFileStore(storePath).readTextIfExists(path.basename(logPath)); - if (content === null) { - return []; - } - const lines = content.trim().split("\n").filter(Boolean); - const calls: CallRecord[] = []; - - for (const [index, line] of lines.slice(-limit).entries()) { - const parsed = parseCallRecordLine(line, index); - if (parsed) { - calls.push(parsed.call); - } - } - - return calls; -} - -function readCallRecordsFromLegacyLog(storePath: string): CallRecord[] { - const logPath = resolveCallLogPath(storePath); - const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath)); - if (content === null) { - return []; - } - return content - .split("\n") - .map((line, index) => parseCallRecordLine(line, index)?.call ?? null) - .filter((call): call is CallRecord => call !== null); + return []; } diff --git a/extensions/voice-call/src/webhook.hangup-once.lifecycle.test.ts b/extensions/voice-call/src/webhook.hangup-once.lifecycle.test.ts index f467cdd3b35..04d4d25b179 100644 --- a/extensions/voice-call/src/webhook.hangup-once.lifecycle.test.ts +++ b/extensions/voice-call/src/webhook.hangup-once.lifecycle.test.ts @@ -1,11 +1,30 @@ -import { afterEach, describe, expect, it } from "vitest"; +import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { VoiceCallConfigSchema, type VoiceCallConfig } from "./config.js"; import { CallManager } from "./manager.js"; import { createTestStorePath, FakeProvider } from "./manager.test-harness.js"; import { flushPendingCallRecordWritesForTest } from "./manager/store.js"; +import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "./runtime-state.js"; import type { WebhookContext, WebhookParseOptions } from "./types.js"; import { VoiceCallWebhookServer } from "./webhook.js"; +function installStateRuntime(): void { + setVoiceCallStateRuntime({ + state: { + resolveStateDir: () => "", + openKeyedStore: (() => { + throw new Error("openKeyedStore is not used by voice-call webhook lifecycle tests"); + }) as never, + openSyncKeyedStore: (options: OpenKeyedStoreOptions) => + createPluginStateSyncKeyedStoreForTests("voice-call", options), + }, + }); +} + const createConfig = (overrides: Partial = {}): VoiceCallConfig => { const base = VoiceCallConfigSchema.parse({ enabled: true, @@ -118,8 +137,14 @@ class RejectInboundReplayWithHangupFailureProvider extends RejectInboundReplayPr } describe("Voice-call webhook hangup-once lifecycle", () => { + beforeEach(() => { + resetPluginStateStoreForTests(); + installStateRuntime(); + }); + afterEach(() => { - // Each test uses an isolated store path, so only server cleanup is needed. + clearVoiceCallStateRuntime(); + resetPluginStateStoreForTests(); }); it("hangs up a rejected inbound replay only once across duplicate webhook delivery", async () => {