refactor: migrate voice-call call logs through doctor (#88731)

This commit is contained in:
Peter Steinberger
2026-05-31 19:43:03 +01:00
committed by GitHub
parent 2f449285b9
commit 3ff86f3350
7 changed files with 710 additions and 236 deletions

View File

@@ -0,0 +1,212 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import {
createPluginStateKeyedStoreForTests,
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import type {
OpenKeyedStoreOptions,
PluginDoctorStateMigrationContext,
} from "openclaw/plugin-sdk/runtime-doctor";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { stateMigrations } from "./doctor-contract-api.js";
import {
createTestStorePath,
makePersistedCall,
writeLegacyCallsJsonl,
} from "./src/manager.test-harness.js";
import { getCallHistoryFromStore, loadActiveCallsFromStore } from "./src/manager/store.js";
import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "./src/runtime-state.js";
function createDoctorContext(env: NodeJS.ProcessEnv): PluginDoctorStateMigrationContext {
return {
openPluginStateKeyedStore<T>(options: OpenKeyedStoreOptions) {
return createPluginStateKeyedStoreForTests<T>("voice-call", {
...options,
env: options.env ?? env,
});
},
};
}
function installStateRuntime(): void {
setVoiceCallStateRuntime({
state: {
resolveStateDir: () => "",
openKeyedStore: (() => {
throw new Error("openKeyedStore is not used by voice-call doctor tests");
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
},
});
}
describe("voice-call doctor state migration", () => {
let stateDir = "";
let storePath = "";
let env: NodeJS.ProcessEnv;
beforeEach(async () => {
resetPluginStateStoreForTests();
stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-voice-call-doctor-"));
storePath = createTestStorePath();
env = { ...process.env, HOME: stateDir, OPENCLAW_STATE_DIR: stateDir };
installStateRuntime();
});
afterEach(async () => {
clearVoiceCallStateRuntime();
resetPluginStateStoreForTests();
await fs.rm(stateDir, { recursive: true, force: true });
await fs.rm(storePath, { recursive: true, force: true });
});
it("imports legacy calls.jsonl into plugin state", async () => {
const sourcePath = path.join(storePath, "calls.jsonl");
const call = makePersistedCall({
callId: "call-doctor",
providerCallId: "provider-doctor",
processedEventIds: ["evt-doctor"],
});
writeLegacyCallsJsonl(storePath, [
{
version: 2,
persistedAt: 1000,
sequence: 0,
call,
},
]);
const migration = stateMigrations[0];
const config = {
plugins: {
entries: {
"@openclaw/voice-call": {
config: { store: storePath },
},
},
},
};
await expect(
migration.detectLegacyState({
config,
env,
stateDir,
oauthDir: path.join(stateDir, "oauth"),
context: createDoctorContext(env),
}),
).resolves.toMatchObject({
preview: [expect.stringContaining("1 record")],
});
const result = await migration.migrateLegacyState({
config,
env,
stateDir,
oauthDir: path.join(stateDir, "oauth"),
context: createDoctorContext(env),
});
expect(result.warnings).toEqual([]);
expect(result.changes).toEqual([
expect.stringContaining("Migrated 1 Voice Call call-log record"),
expect.stringContaining("Archived Voice Call call-log legacy source"),
]);
await expect(fs.access(sourcePath)).rejects.toThrow();
await expect(fs.access(`${sourcePath}.migrated`)).resolves.toBeUndefined();
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.get("call-doctor")?.providerCallId).toBe("provider-doctor");
expect(restored.processedEventIds.has("evt-doctor")).toBe(true);
const history = await getCallHistoryFromStore(storePath);
expect(history).toHaveLength(1);
expect(history[0]?.callId).toBe("call-doctor");
});
it("imports the newest legacy call records when the JSONL log is over capacity", async () => {
const calls = Array.from({ length: 1002 }, (_, index) =>
makePersistedCall({
callId: `call-${index}`,
providerCallId: `provider-${index}`,
}),
);
writeLegacyCallsJsonl(storePath, calls);
const config = {
plugins: {
entries: {
"@openclaw/voice-call": {
config: { store: storePath },
},
},
},
};
const result = await stateMigrations[0].migrateLegacyState({
config,
env,
stateDir,
oauthDir: path.join(stateDir, "oauth"),
context: createDoctorContext(env),
});
expect(result.warnings).toEqual([
expect.stringContaining("Pruned 2 older Voice Call call-log records"),
]);
expect(result.changes).toEqual([
expect.stringContaining("Migrated 1000 Voice Call call-log records"),
expect.stringContaining("Archived Voice Call call-log legacy source"),
]);
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.has("call-0")).toBe(false);
expect(restored.activeCalls.has("call-1")).toBe(false);
expect(restored.activeCalls.get("call-1001")?.providerCallId).toBe("provider-1001");
const history = await getCallHistoryFromStore(storePath, 1000);
expect(history).toHaveLength(1000);
expect(history[0]?.callId).toBe("call-2");
expect(history.at(-1)?.callId).toBe("call-1001");
});
it("leaves malformed mixed legacy logs in place after importing valid records", async () => {
const sourcePath = path.join(storePath, "calls.jsonl");
const call = makePersistedCall({
callId: "call-valid",
providerCallId: "provider-valid",
});
await fs.mkdir(path.dirname(sourcePath), { recursive: true });
await fs.writeFile(sourcePath, `${JSON.stringify(call)}\n{not json}\n`);
const config = {
plugins: {
entries: {
"@openclaw/voice-call": {
config: { store: storePath },
},
},
},
};
const result = await stateMigrations[0].migrateLegacyState({
config,
env,
stateDir,
oauthDir: path.join(stateDir, "oauth"),
context: createDoctorContext(env),
});
expect(result.changes).toEqual([
expect.stringContaining("Migrated 1 Voice Call call-log record"),
]);
expect(result.warnings).toEqual([
"Skipped malformed Voice Call call-log line 2",
"Left Voice Call call-log source in place because migration was incomplete",
]);
await expect(fs.access(sourcePath)).resolves.toBeUndefined();
await expect(fs.access(`${sourcePath}.migrated`)).rejects.toThrow();
expect(loadActiveCallsFromStore(storePath).activeCalls.has("call-valid")).toBe(true);
});
});

View File

@@ -0,0 +1,313 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type {
PluginDoctorStateMigration,
PluginStateKeyedStore,
} from "openclaw/plugin-sdk/runtime-doctor";
import {
buildVoiceCallLegacyJsonlEventKey,
CALL_RECORD_CHUNK_MAX_ENTRIES,
CALL_RECORD_EVENT_CHUNKS_NAMESPACE,
CALL_RECORD_EVENT_META_MAX_ENTRIES,
CALL_RECORD_EVENTS_NAMESPACE,
MAX_CALL_RECORD_EVENTS,
MAX_CHUNKS_PER_CALL_RECORD_EVENT,
prepareVoiceCallRecordForStorage,
parseVoiceCallRecordLine,
RAW_CALL_RECORD_CHUNK_BYTES,
resolveVoiceCallLegacyCallLogPath,
} from "./src/manager/store.js";
import type { CallRecord } from "./src/types.js";
type CallRecordEventMeta = {
chunkCount: number;
byteLength: number;
persistedAt?: number;
sequence?: number;
};
type CallRecordEventChunk = {
index: number;
dataBase64: string;
};
type PreparedLegacyCallRecord = {
eventKey: string;
lineNumber: number;
chunks: CallRecordEventChunk[];
meta: CallRecordEventMeta;
};
function resolveHome(env: NodeJS.ProcessEnv): string {
return env.HOME?.trim() || os.homedir();
}
function resolveUserPath(input: string, env: NodeJS.ProcessEnv): string {
const trimmed = input.trim();
if (!trimmed) {
return trimmed;
}
if (trimmed.startsWith("~")) {
return path.resolve(trimmed.replace(/^~(?=$|[\\/])/, resolveHome(env)));
}
return path.resolve(trimmed);
}
function getVoiceCallConfigStore(config: PluginDoctorStateMigrationParams["config"]): string {
for (const pluginId of ["voice-call", "@openclaw/voice-call"]) {
const rawConfig = config.plugins?.entries?.[pluginId]?.config;
if (!rawConfig || typeof rawConfig !== "object" || Array.isArray(rawConfig)) {
continue;
}
const store = (rawConfig as { store?: unknown }).store;
if (typeof store === "string" && store.trim()) {
return store.trim();
}
}
return "";
}
type PluginDoctorStateMigrationParams = Parameters<
PluginDoctorStateMigration["detectLegacyState"]
>[0];
function resolveVoiceCallStorePath(params: {
config: PluginDoctorStateMigrationParams["config"];
env: NodeJS.ProcessEnv;
}): string {
const configuredStore = getVoiceCallConfigStore(params.config);
if (configuredStore) {
return resolveUserPath(configuredStore, params.env);
}
return path.join(resolveHome(params.env), ".openclaw", "voice-calls");
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
function buildChunkKey(eventKey: string, index: number): string {
return `${eventKey}:chunk:${String(index).padStart(4, "0")}`;
}
function prepareChunks(call: CallRecord): {
chunks: CallRecordEventChunk[];
meta: CallRecordEventMeta;
} {
const serialized = JSON.stringify(prepareVoiceCallRecordForStorage(call));
const buffer = Buffer.from(serialized, "utf8");
const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CALL_RECORD_CHUNK_BYTES));
if (chunkCount > MAX_CHUNKS_PER_CALL_RECORD_EVENT) {
throw new Error(
`voice-call record exceeds SQLite chunk limit (${chunkCount}/${MAX_CHUNKS_PER_CALL_RECORD_EVENT})`,
);
}
const chunks: CallRecordEventChunk[] = [];
for (let index = 0; index < chunkCount; index += 1) {
const chunk = buffer.subarray(
index * RAW_CALL_RECORD_CHUNK_BYTES,
(index + 1) * RAW_CALL_RECORD_CHUNK_BYTES,
);
chunks.push({ index, dataBase64: chunk.toString("base64") });
}
return {
chunks,
meta: {
chunkCount,
byteLength: buffer.byteLength,
},
};
}
async function readLegacyCallRecords(filePath: string): Promise<{
entries: PreparedLegacyCallRecord[];
warnings: string[];
}> {
let content = "";
try {
content = await fs.readFile(filePath, "utf8");
} catch {
return { entries: [], warnings: [] };
}
const entries: PreparedLegacyCallRecord[] = [];
const warnings: string[] = [];
let index = 0;
for (const line of content.split("\n")) {
const parsed = parseVoiceCallRecordLine(line, index);
if (!parsed) {
if (line.trim()) {
warnings.push(`Skipped malformed Voice Call call-log line ${index + 1}`);
}
index += 1;
continue;
}
try {
const prepared = prepareChunks(parsed.call);
entries.push({
eventKey: buildVoiceCallLegacyJsonlEventKey(line, index),
lineNumber: index + 1,
chunks: prepared.chunks,
meta: {
...prepared.meta,
persistedAt: parsed.persistedAt,
sequence: parsed.sequence,
},
});
} catch (err) {
warnings.push(`Skipped Voice Call call-log line ${index + 1}: ${String(err)}`);
}
index += 1;
}
return { entries, warnings };
}
async function archiveLegacySource(params: {
filePath: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated Voice Call call-log source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived Voice Call call-log legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving Voice Call call-log legacy source: ${String(err)}`);
}
}
async function selectEntriesForImport(params: {
entries: PreparedLegacyCallRecord[];
eventStore: PluginStateKeyedStore<CallRecordEventMeta>;
chunkStore: PluginStateKeyedStore<CallRecordEventChunk>;
warnings: string[];
}): Promise<{ existingEventKeys: Set<string>; entries: PreparedLegacyCallRecord[] }> {
const existingEventKeys = new Set((await params.eventStore.entries()).map((entry) => entry.key));
const missingEntries = params.entries.filter((entry) => !existingEventKeys.has(entry.eventKey));
const existingChunks = await params.chunkStore.entries();
let eventRoom = Math.max(0, MAX_CALL_RECORD_EVENTS - existingEventKeys.size);
let chunkRoom = Math.max(0, CALL_RECORD_CHUNK_MAX_ENTRIES - existingChunks.length);
const selected: PreparedLegacyCallRecord[] = [];
let pruned = 0;
for (const entry of missingEntries.toReversed()) {
if (eventRoom <= 0 || entry.chunks.length > chunkRoom) {
pruned++;
continue;
}
selected.push(entry);
eventRoom--;
chunkRoom -= entry.chunks.length;
}
if (pruned > 0) {
params.warnings.push(
`Pruned ${pruned} older Voice Call call-log ${pruned === 1 ? "record" : "records"} during migration because plugin state keeps the newest ${MAX_CALL_RECORD_EVENTS} records`,
);
}
return { existingEventKeys, entries: selected.toReversed() };
}
async function importLegacyCallRecords(params: {
entries: PreparedLegacyCallRecord[];
eventStore: PluginStateKeyedStore<CallRecordEventMeta>;
chunkStore: PluginStateKeyedStore<CallRecordEventChunk>;
warnings: string[];
}): Promise<number> {
const selected = await selectEntriesForImport(params);
let imported = 0;
for (const entry of selected.entries) {
if (selected.existingEventKeys.has(entry.eventKey)) {
continue;
}
try {
for (const chunk of entry.chunks) {
await params.chunkStore.register(buildChunkKey(entry.eventKey, chunk.index), chunk);
}
await params.eventStore.register(entry.eventKey, entry.meta);
selected.existingEventKeys.add(entry.eventKey);
imported++;
} catch (err) {
params.warnings.push(
`Failed migrating Voice Call call-log line ${entry.lineNumber}: ${String(err)}`,
);
}
}
return imported;
}
export const stateMigrations: PluginDoctorStateMigration[] = [
{
id: "voice-call-calls-jsonl-to-plugin-state",
label: "Voice Call call log",
async detectLegacyState(params) {
const storePath = resolveVoiceCallStorePath(params);
const filePath = resolveVoiceCallLegacyCallLogPath(storePath);
const { entries } = await readLegacyCallRecords(filePath);
if (entries.length === 0) {
return null;
}
return {
preview: [
`- Voice Call call log: ${entries.length} ${entries.length === 1 ? "record" : "records"} -> plugin state (${CALL_RECORD_EVENTS_NAMESPACE})`,
],
};
},
async migrateLegacyState(params) {
const changes: string[] = [];
const warnings: string[] = [];
const storePath = resolveVoiceCallStorePath(params);
const filePath = resolveVoiceCallLegacyCallLogPath(storePath);
const { entries, warnings: readWarnings } = await readLegacyCallRecords(filePath);
warnings.push(...readWarnings);
if (entries.length === 0) {
return { changes, warnings };
}
const env = { ...params.env, OPENCLAW_STATE_DIR: storePath };
const eventStore = params.context.openPluginStateKeyedStore<CallRecordEventMeta>({
namespace: CALL_RECORD_EVENTS_NAMESPACE,
maxEntries: CALL_RECORD_EVENT_META_MAX_ENTRIES,
env,
});
const chunkStore = params.context.openPluginStateKeyedStore<CallRecordEventChunk>({
namespace: CALL_RECORD_EVENT_CHUNKS_NAMESPACE,
maxEntries: CALL_RECORD_CHUNK_MAX_ENTRIES,
env,
});
const imported = await importLegacyCallRecords({
entries,
eventStore,
chunkStore,
warnings,
});
if (imported > 0) {
changes.push(
`Migrated ${imported} Voice Call call-log ${imported === 1 ? "record" : "records"} -> plugin state`,
);
}
if (
warnings.some(
(warning) =>
warning.startsWith("Failed migrating Voice Call") ||
warning.startsWith("Skipped malformed Voice Call call-log line") ||
warning.startsWith("Skipped Voice Call call-log line") ||
warning.startsWith("Skipped Voice Call call-log migration"),
)
) {
warnings.push("Left Voice Call call-log source in place because migration was incomplete");
return { changes, warnings };
}
await archiveLegacySource({ filePath, changes, warnings });
return { changes, warnings };
},
},
];

View File

@@ -1,4 +1,9 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime";
import {
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { VoiceCallConfigSchema } from "./config.js";
import { CallManager } from "./manager.js";
import {
@@ -8,6 +13,20 @@ import {
writeCallsToStore,
} from "./manager.test-harness.js";
import { flushPendingCallRecordWritesForTest, loadActiveCallsFromStore } from "./manager/store.js";
import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "./runtime-state.js";
function installStateRuntime(): void {
setVoiceCallStateRuntime({
state: {
resolveStateDir: () => "",
openKeyedStore: (() => {
throw new Error("openKeyedStore is not used by voice-call restore tests");
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
},
});
}
function requireSingleActiveCall(manager: CallManager) {
const activeCalls = manager.getActiveCalls();
@@ -32,9 +51,16 @@ function requireSingleHangupCall(provider: FakeProvider) {
}
describe("CallManager verification on restore", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
installStateRuntime();
});
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
clearVoiceCallStateRuntime();
resetPluginStateStoreForTests();
});
async function initializeManager(params?: {

View File

@@ -1,9 +1,14 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime";
import { createPluginStateSyncKeyedStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { VoiceCallConfigSchema } from "./config.js";
import { CallManager } from "./manager.js";
import { persistCallRecord } from "./manager/store.js";
import type { VoiceCallProvider } from "./providers/base.js";
import { getOptionalVoiceCallStateRuntime, setVoiceCallStateRuntime } from "./runtime-state.js";
import { CallRecordSchema } from "./types.js";
import type {
GetCallStatusInput,
GetCallStatusResult,
@@ -72,6 +77,22 @@ export function createTestStorePath(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-voice-call-test-"));
}
export function installVoiceCallStateRuntimeForTests(): void {
if (getOptionalVoiceCallStateRuntime()) {
return;
}
setVoiceCallStateRuntime({
state: {
resolveStateDir: () => "",
openKeyedStore: (() => {
throw new Error("openKeyedStore is not used by voice-call manager tests");
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
},
});
}
export async function createManagerHarness(
configOverrides: Record<string, unknown> = {},
provider = new FakeProvider(),
@@ -85,6 +106,7 @@ export async function createManagerHarness(
fromNumber: "+15550000000",
...configOverrides,
});
installVoiceCallStateRuntimeForTests();
const manager = new CallManager(config, createTestStorePath());
await manager.initialize(provider, "https://example.com/voice/webhook");
return { manager, provider };
@@ -101,6 +123,13 @@ export function markCallAnswered(manager: CallManager, callId: string, eventId:
}
export function writeCallsToStore(storePath: string, calls: Record<string, unknown>[]): void {
fs.mkdirSync(storePath, { recursive: true });
for (const call of calls) {
persistCallRecord(storePath, CallRecordSchema.parse(call));
}
}
export function writeLegacyCallsJsonl(storePath: string, calls: Record<string, unknown>[]): void {
fs.mkdirSync(storePath, { recursive: true });
const logPath = path.join(storePath, "calls.jsonl");
const lines = calls.map((c) => JSON.stringify(c)).join("\n") + "\n";

View File

@@ -9,7 +9,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
createTestStorePath,
makePersistedCall,
writeCallsToStore,
writeLegacyCallsJsonl,
} from "../manager.test-harness.js";
import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "../runtime-state.js";
import { CallRecordSchema } from "../types.js";
@@ -45,22 +45,20 @@ describe("voice-call call record store", () => {
resetPluginStateStoreForTests();
});
it("migrates legacy JSONL records into SQLite-backed plugin state", async () => {
it("does not import legacy JSONL records at runtime", async () => {
const storePath = createTestStorePath();
const call = CallRecordSchema.parse(
makePersistedCall({ callId: "call-legacy", processedEventIds: ["evt-1"] }),
);
writeCallsToStore(storePath, [call]);
writeLegacyCallsJsonl(storePath, [call]);
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.get("call-legacy")?.providerCallId).toBe(call.providerCallId);
expect(restored.processedEventIds.has("evt-1")).toBe(true);
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false);
expect(fs.existsSync(path.join(storePath, "state", "openclaw.sqlite"))).toBe(true);
expect(restored.activeCalls.has("call-legacy")).toBe(false);
expect(restored.processedEventIds.has("evt-1")).toBe(false);
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(true);
const history = await getCallHistoryFromStore(storePath);
expect(history).toHaveLength(1);
expect(history[0]?.callId).toBe("call-legacy");
expect(history).toEqual([]);
});
it("persists new call snapshots without recreating the JSONL log", async () => {
@@ -77,26 +75,10 @@ describe("voice-call call record store", () => {
expect(restored.activeCalls.get("call-sqlite")?.providerCallId).toBe(call.providerCallId);
});
it("imports fallback JSONL writes created after the migration marker", async () => {
const storePath = createTestStorePath();
const sqliteCall = CallRecordSchema.parse(makePersistedCall({ callId: "call-sqlite" }));
const fallbackCall = CallRecordSchema.parse(makePersistedCall({ callId: "call-fallback" }));
persistCallRecord(storePath, sqliteCall);
writeCallsToStore(storePath, [fallbackCall]);
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.has("call-sqlite")).toBe(true);
expect(restored.activeCalls.get("call-fallback")?.providerCallId).toBe(
fallbackCall.providerCallId,
);
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false);
});
it("reads the JSONL fallback when SQLite state cannot open", () => {
it("does not read the JSONL fallback when SQLite state cannot open", () => {
const storePath = createTestStorePath();
const call = CallRecordSchema.parse(makePersistedCall({ callId: "call-jsonl" }));
writeCallsToStore(storePath, [call]);
writeLegacyCallsJsonl(storePath, [call]);
setVoiceCallStateRuntime({
state: {
resolveStateDir: () => "",
@@ -110,19 +92,21 @@ describe("voice-call call record store", () => {
});
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.get("call-jsonl")?.providerCallId).toBe(call.providerCallId);
expect(restored.activeCalls.has("call-jsonl")).toBe(false);
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(true);
});
it("keeps oversized fallback records readable when they exceed SQLite chunk budget", async () => {
it("persists oversized records in SQLite without creating a JSONL fallback", async () => {
const storePath = createTestStorePath();
const call = CallRecordSchema.parse(
makePersistedCall({
callId: "call-large",
metadata: { mode: "conversation", numberRouteKey: "+15550000001" },
transcript: [
{
timestamp: Date.now(),
speaker: "user",
text: "x".repeat(2 * 1024 * 1024),
text: "x".repeat(3 * 1024 * 1024),
isFinal: true,
},
],
@@ -133,41 +117,15 @@ describe("voice-call call record store", () => {
await flushPendingCallRecordWritesForTest();
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.get("call-large")?.providerCallId).toBe(call.providerCallId);
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(true);
});
it("does not let an older fallback record override a newer SQLite snapshot", async () => {
const storePath = createTestStorePath();
const olderFallback = CallRecordSchema.parse(
makePersistedCall({
callId: "call-mixed",
state: "answered",
transcript: [
{
timestamp: Date.now(),
speaker: "user",
text: "x".repeat(2 * 1024 * 1024),
isFinal: true,
},
],
}),
);
const newerSqlite = CallRecordSchema.parse(
makePersistedCall({
callId: "call-mixed",
state: "completed",
endedAt: Date.now(),
endReason: "completed",
}),
);
persistCallRecord(storePath, olderFallback);
await flushPendingCallRecordWritesForTest();
persistCallRecord(storePath, newerSqlite);
const restored = loadActiveCallsFromStore(storePath);
expect(restored.activeCalls.has("call-mixed")).toBe(false);
const restoredCall = restored.activeCalls.get("call-large");
expect(restoredCall?.providerCallId).toBe(call.providerCallId);
expect(restoredCall?.transcript).toEqual([]);
expect(restoredCall?.metadata).toMatchObject({
mode: "conversation",
numberRouteKey: "+15550000001",
voiceCallPersistence: { transcriptTruncated: true },
});
expect(fs.existsSync(path.join(storePath, "calls.jsonl"))).toBe(false);
});
it("replays same-millisecond snapshots in write order", () => {

View File

@@ -1,26 +1,17 @@
import { createHash, randomUUID } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import {
appendRegularFile,
privateFileStore,
privateFileStoreSync,
} from "openclaw/plugin-sdk/security-runtime";
import { getOptionalVoiceCallStateRuntime } from "../runtime-state.js";
import { CallRecordSchema, TerminalStates, type CallId, type CallRecord } from "../types.js";
const pendingPersistWrites = new Set<Promise<void>>();
const CALL_RECORD_EVENTS_NAMESPACE = "call-record-events";
const CALL_RECORD_EVENT_CHUNKS_NAMESPACE = "call-record-event-chunks";
const CALL_RECORD_MIGRATIONS_NAMESPACE = "call-record-migrations";
const CALL_RECORD_JSONL_MIGRATION_KEY = "calls-jsonl-v1";
const MAX_CALL_RECORD_EVENTS = 1000;
const CALL_RECORD_EVENT_META_MAX_ENTRIES = MAX_CALL_RECORD_EVENTS + 100;
const MAX_CHUNKS_PER_CALL_RECORD_EVENT = 48;
const CALL_RECORD_CHUNK_MAX_ENTRIES =
export const CALL_RECORD_EVENTS_NAMESPACE = "call-record-events";
export const CALL_RECORD_EVENT_CHUNKS_NAMESPACE = "call-record-event-chunks";
export const MAX_CALL_RECORD_EVENTS = 1000;
export const CALL_RECORD_EVENT_META_MAX_ENTRIES = MAX_CALL_RECORD_EVENTS + 100;
export const MAX_CHUNKS_PER_CALL_RECORD_EVENT = 48;
export const CALL_RECORD_CHUNK_MAX_ENTRIES =
MAX_CALL_RECORD_EVENTS * MAX_CHUNKS_PER_CALL_RECORD_EVENT + MAX_CHUNKS_PER_CALL_RECORD_EVENT;
const RAW_CHUNK_BYTES = 36 * 1024;
export const RAW_CALL_RECORD_CHUNK_BYTES = 47 * 1024;
let callRecordEventSequence = 0;
type CallRecordEventMeta = {
@@ -35,11 +26,7 @@ type CallRecordEventChunk = {
dataBase64: string;
};
type CallRecordMigrationMarker = {
importedAt: string;
};
type PersistedCallRecord = {
export type PersistedCallRecord = {
call: CallRecord;
persistedAt: number;
sequence: number;
@@ -49,10 +36,9 @@ type PersistedCallRecord = {
type CallRecordStateStores = {
events: PluginStateSyncKeyedStore<CallRecordEventMeta>;
chunks: PluginStateSyncKeyedStore<CallRecordEventChunk>;
migrations: PluginStateSyncKeyedStore<CallRecordMigrationMarker>;
};
function resolveCallLogPath(storePath: string): string {
export function resolveVoiceCallLegacyCallLogPath(storePath: string): string {
return path.join(storePath, "calls.jsonl");
}
@@ -77,11 +63,6 @@ function createCallRecordStateStores(storePath: string): CallRecordStateStores |
maxEntries: CALL_RECORD_CHUNK_MAX_ENTRIES,
env,
}),
migrations: runtime.state.openSyncKeyedStore<CallRecordMigrationMarker>({
namespace: CALL_RECORD_MIGRATIONS_NAMESPACE,
maxEntries: 100,
env,
}),
};
}
@@ -98,7 +79,7 @@ function buildChunkKey(eventKey: string, index: number): string {
return `${eventKey}:chunk:${String(index).padStart(4, "0")}`;
}
function buildJsonlEventKey(line: string, index: number): string {
export function buildVoiceCallLegacyJsonlEventKey(line: string, index: number): string {
return `jsonl:${String(index).padStart(8, "0")}:${createHash("sha256").update(line).digest("hex")}`;
}
@@ -117,7 +98,7 @@ function parseEventKeySequence(key: string): number {
return match ? Number.parseInt(match[1], 10) : 0;
}
function parseCallRecordLine(line: string, sequence = 0): PersistedCallRecord | null {
export function parseVoiceCallRecordLine(line: string, sequence = 0): PersistedCallRecord | null {
if (!line.trim()) {
return null;
}
@@ -154,22 +135,70 @@ function parseCallRecordLine(line: string, sequence = 0): PersistedCallRecord |
}
}
function countCallRecordChunks(call: CallRecord): number {
return Math.max(
1,
Math.ceil(Buffer.byteLength(JSON.stringify(call), "utf8") / RAW_CALL_RECORD_CHUNK_BYTES),
);
}
export function prepareVoiceCallRecordForStorage(call: CallRecord): CallRecord {
if (countCallRecordChunks(call) <= MAX_CHUNKS_PER_CALL_RECORD_EVENT) {
return call;
}
const transcriptEntries = call.transcript.length;
const metadata = {
...call.metadata,
voiceCallPersistence: {
transcriptTruncated: true,
originalTranscriptEntries: transcriptEntries,
},
};
const candidateInputs = [
{ transcript: call.transcript.slice(-20), metadata },
{ transcript: [], metadata },
{
transcript: [],
metadata: {
voiceCallPersistence: {
transcriptTruncated: true,
originalTranscriptEntries: transcriptEntries,
metadataTruncated: true,
},
},
},
];
for (const candidateInput of candidateInputs) {
const candidate = CallRecordSchema.parse({
...call,
...candidateInput,
});
if (countCallRecordChunks(candidate) <= MAX_CHUNKS_PER_CALL_RECORD_EVENT) {
return candidate;
}
}
return call;
}
function registerCallRecordEvent(
stores: CallRecordStateStores,
eventKey: string,
call: CallRecord,
order?: { persistedAt: number; sequence: number },
): void {
const serialized = JSON.stringify(call);
const serialized = JSON.stringify(prepareVoiceCallRecordForStorage(call));
const buffer = Buffer.from(serialized, "utf8");
const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CHUNK_BYTES));
const chunkCount = Math.max(1, Math.ceil(buffer.byteLength / RAW_CALL_RECORD_CHUNK_BYTES));
if (chunkCount > MAX_CHUNKS_PER_CALL_RECORD_EVENT) {
throw new Error(
`voice-call record exceeds SQLite chunk limit (${chunkCount}/${MAX_CHUNKS_PER_CALL_RECORD_EVENT})`,
);
}
for (let index = 0; index < chunkCount; index += 1) {
const chunk = buffer.subarray(index * RAW_CHUNK_BYTES, (index + 1) * RAW_CHUNK_BYTES);
const chunk = buffer.subarray(
index * RAW_CALL_RECORD_CHUNK_BYTES,
(index + 1) * RAW_CALL_RECORD_CHUNK_BYTES,
);
stores.chunks.register(buildChunkKey(eventKey, index), {
index,
dataBase64: chunk.toString("base64"),
@@ -206,19 +235,6 @@ function pruneCallRecordEvents(stores: CallRecordStateStores): void {
}
}
function registerCallRecordEventIfAbsent(
stores: CallRecordStateStores,
eventKey: string,
record: PersistedCallRecord,
): void {
if (!stores.events.lookup(eventKey)) {
registerCallRecordEvent(stores, eventKey, record.call, {
persistedAt: record.persistedAt,
sequence: record.sequence,
});
}
}
function readCallRecordEvent(stores: CallRecordStateStores, eventKey: string): CallRecord | null {
const meta = stores.events.lookup(eventKey);
if (!meta) {
@@ -233,68 +249,10 @@ function readCallRecordEvent(stores: CallRecordStateStores, eventKey: string): C
chunks.push(Buffer.from(chunk.dataBase64, "base64"));
}
const serialized = Buffer.concat(chunks, meta.byteLength).toString("utf8");
return parseCallRecordLine(serialized)?.call ?? null;
return parseVoiceCallRecordLine(serialized)?.call ?? null;
}
function ensureLegacyCallLogImported(
storePath: string,
stores: CallRecordStateStores,
): PersistedCallRecord[] {
const imported = stores.migrations.lookup(CALL_RECORD_JSONL_MIGRATION_KEY) !== undefined;
const logPath = resolveCallLogPath(storePath);
const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath));
if (content === null) {
if (!imported) {
stores.migrations.register(CALL_RECORD_JSONL_MIGRATION_KEY, {
importedAt: new Date().toISOString(),
});
}
return [];
}
const fallbackCalls: PersistedCallRecord[] = [];
{
let index = 0;
let importFailed = false;
for (const line of content.split("\n")) {
const parsed = parseCallRecordLine(line, index);
if (!parsed) {
index += 1;
continue;
}
// Fallback JSONL writes can appear after the migration marker if SQLite
// persistence had a transient failure. Stable keys make the importer
// idempotent if the legacy file cannot be removed.
try {
registerCallRecordEventIfAbsent(stores, buildJsonlEventKey(line, index), parsed);
} catch (err) {
importFailed = true;
fallbackCalls.push({
...parsed,
orderKey: `jsonl:${String(index).padStart(8, "0")}`,
});
console.error("[voice-call] Failed to import persisted call record:", err);
}
index += 1;
}
if (!importFailed) {
try {
fs.rmSync(logPath, { force: true });
} catch {
// Import already completed; leave an unreadable legacy log in place.
}
}
}
if (!imported) {
stores.migrations.register(CALL_RECORD_JSONL_MIGRATION_KEY, {
importedAt: new Date().toISOString(),
});
}
return fallbackCalls;
}
function readCallRecordEvents(storePath: string, stores: CallRecordStateStores): CallRecord[] {
const fallbackCalls = ensureLegacyCallLogImported(storePath, stores);
function readCallRecordEvents(stores: CallRecordStateStores): CallRecord[] {
const sqliteCalls: PersistedCallRecord[] = stores.events
.entries()
.toSorted((a, b) => a.createdAt - b.createdAt || a.key.localeCompare(b.key))
@@ -310,7 +268,7 @@ function readCallRecordEvents(storePath: string, stores: CallRecordStateStores):
: null;
})
.filter((entry): entry is PersistedCallRecord => entry !== null);
return [...sqliteCalls, ...fallbackCalls]
return sqliteCalls
.toSorted(
(a, b) =>
a.persistedAt - b.persistedAt ||
@@ -321,38 +279,21 @@ function readCallRecordEvents(storePath: string, stores: CallRecordStateStores):
}
export function persistCallRecord(storePath: string, call: CallRecord): void {
const stores = tryCreateCallRecordStateStores(storePath);
if (stores) {
try {
void ensureLegacyCallLogImported(storePath, stores);
const order = nextCallRecordOrder();
registerCallRecordEvent(stores, buildNewEventKey(order), call, order);
return;
} catch (err) {
console.error("[voice-call] Failed to persist call record:", err);
try {
const stores = createCallRecordStateStores(storePath);
if (!stores) {
throw new Error("Voice Call state runtime not initialized");
}
const order = nextCallRecordOrder();
registerCallRecordEvent(stores, buildNewEventKey(order), call, order);
} catch (err) {
console.error("[voice-call] Failed to persist call record:", err);
throw err;
}
const logPath = resolveCallLogPath(storePath);
const order = nextCallRecordOrder();
const line = `${JSON.stringify({ version: 2, ...order, call })}\n`;
// Fire-and-forget async write to avoid blocking event loop.
const write = appendRegularFile({
filePath: logPath,
content: line,
rejectSymlinkParents: true,
})
.catch((err) => {
console.error("[voice-call] Failed to persist call record:", err);
})
.finally(() => {
pendingPersistWrites.delete(write);
});
pendingPersistWrites.add(write);
}
export async function flushPendingCallRecordWritesForTest(): Promise<void> {
await Promise.allSettled(pendingPersistWrites);
await Promise.resolve();
}
export function loadActiveCallsFromStore(storePath: string): {
@@ -362,14 +303,11 @@ export function loadActiveCallsFromStore(storePath: string): {
rejectedProviderCallIds: Set<string>;
} {
const stores = tryCreateCallRecordStateStores(storePath);
let calls: CallRecord[];
let calls: CallRecord[] = [];
try {
calls = stores
? readCallRecordEvents(storePath, stores)
: readCallRecordsFromLegacyLog(storePath);
calls = stores ? readCallRecordEvents(stores) : [];
} catch (err) {
console.error("[voice-call] Failed to read SQLite call records:", err);
calls = readCallRecordsFromLegacyLog(storePath);
}
if (calls.length === 0) {
return {
@@ -415,37 +353,10 @@ export async function getCallHistoryFromStore(
const stores = tryCreateCallRecordStateStores(storePath);
if (stores) {
try {
return readCallRecordEvents(storePath, stores).slice(-limit);
return readCallRecordEvents(stores).slice(-limit);
} catch (err) {
console.error("[voice-call] Failed to read SQLite call history:", err);
}
}
const logPath = resolveCallLogPath(storePath);
const content = await privateFileStore(storePath).readTextIfExists(path.basename(logPath));
if (content === null) {
return [];
}
const lines = content.trim().split("\n").filter(Boolean);
const calls: CallRecord[] = [];
for (const [index, line] of lines.slice(-limit).entries()) {
const parsed = parseCallRecordLine(line, index);
if (parsed) {
calls.push(parsed.call);
}
}
return calls;
}
function readCallRecordsFromLegacyLog(storePath: string): CallRecord[] {
const logPath = resolveCallLogPath(storePath);
const content = privateFileStoreSync(storePath).readTextIfExists(path.basename(logPath));
if (content === null) {
return [];
}
return content
.split("\n")
.map((line, index) => parseCallRecordLine(line, index)?.call ?? null)
.filter((call): call is CallRecord => call !== null);
return [];
}

View File

@@ -1,11 +1,30 @@
import { afterEach, describe, expect, it } from "vitest";
import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime";
import {
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { VoiceCallConfigSchema, type VoiceCallConfig } from "./config.js";
import { CallManager } from "./manager.js";
import { createTestStorePath, FakeProvider } from "./manager.test-harness.js";
import { flushPendingCallRecordWritesForTest } from "./manager/store.js";
import { clearVoiceCallStateRuntime, setVoiceCallStateRuntime } from "./runtime-state.js";
import type { WebhookContext, WebhookParseOptions } from "./types.js";
import { VoiceCallWebhookServer } from "./webhook.js";
function installStateRuntime(): void {
setVoiceCallStateRuntime({
state: {
resolveStateDir: () => "",
openKeyedStore: (() => {
throw new Error("openKeyedStore is not used by voice-call webhook lifecycle tests");
}) as never,
openSyncKeyedStore: (options: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests("voice-call", options),
},
});
}
const createConfig = (overrides: Partial<VoiceCallConfig> = {}): VoiceCallConfig => {
const base = VoiceCallConfigSchema.parse({
enabled: true,
@@ -118,8 +137,14 @@ class RejectInboundReplayWithHangupFailureProvider extends RejectInboundReplayPr
}
describe("Voice-call webhook hangup-once lifecycle", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
installStateRuntime();
});
afterEach(() => {
// Each test uses an isolated store path, so only server cleanup is needed.
clearVoiceCallStateRuntime();
resetPluginStateStoreForTests();
});
it("hangs up a rejected inbound replay only once across duplicate webhook delivery", async () => {