diff --git a/CHANGELOG.md b/CHANGELOG.md index d1969717c01..5d7ddf8e810 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai - Google Meet/Voice Call: make Twilio setup preflight honor explicit `--transport twilio` and fail local/private Voice Call webhook URLs, including IPv6 loopback and unique-local forms, before joins. Thanks @donkeykong91 and @PfanP. - Voice Call/Twilio: retry transient 21220 live-call TwiML updates and catch answered-path initial-greeting failures, so a fast answered callback no longer crashes the Gateway or drops the Twilio greeting/listen transition. (#74606) Thanks @Sivan22. - Voice Call/Twilio: register accepted media streams immediately but wait for realtime transcription readiness before speaking the initial greeting, so reconnect grace handling stays live while OpenAI STT startup is no longer starved by TTS. Fixes #75197. (#75257) Thanks @donkeykong91 and @PfanP. +- Voice Call CLI: run gateway-delegated `voicecall continue` through operation-id polling and protocol-shaped errors, so long conversational turns keep their transcript result without blocking a single Gateway RPC. (#75459) Thanks @serrurco and @DougButdorf. - Voice Call CLI: delegate operational `voicecall` commands to the running Gateway runtime and skip webhook startup during CLI-only plugin loading, preventing webhook port conflicts and `setup --json` hangs. Fixes #72345. Thanks @serrurco and @DougButdorf. - Agents/pi-embedded-runner: extract the `abortable` provider-call wrapper from `runEmbeddedAttempt` to module scope so its promise handlers no longer close over the run lexical context, releasing transcripts, tool buffers, and subscription callbacks when a provider call hangs past abort. (#74182) Thanks @cjboy007. - Docker: restore `python3` in the gateway runtime image after the slim-runtime switch. Fixes #75041. diff --git a/extensions/voice-call/index.test.ts b/extensions/voice-call/index.test.ts index 89f53220380..8f58d3e4939 100644 --- a/extensions/voice-call/index.test.ts +++ b/extensions/voice-call/index.test.ts @@ -317,8 +317,9 @@ describe("voice-call plugin", () => { expect(createVoiceCallRuntime).not.toHaveBeenCalled(); expect(respond).toHaveBeenCalledWith( false, + undefined, expect.objectContaining({ - error: expect.stringContaining("TWILIO_ACCOUNT_SID"), + message: expect.stringContaining("TWILIO_ACCOUNT_SID"), }), ); }); @@ -534,7 +535,7 @@ describe("voice-call plugin", () => { }); expect(callGatewayFromCliMock).toHaveBeenCalledWith( "voicecall.start", - { json: true, timeout: "5000" }, + { json: true, timeout: "35000" }, { to: "+1", message: "Hello", mode: "conversation" }, { progress: false }, ); @@ -545,6 +546,145 @@ describe("voice-call plugin", () => { } }); + it("responds with protocol errors for delegated gateway failures", async () => { + const { methods } = setup({ provider: "mock" }); + const handler = methods.get("voicecall.start") as + | ((ctx: { + params: Record; + respond: ReturnType; + }) => Promise) + | undefined; + const respond = vi.fn(); + + await handler?.({ params: {}, respond }); + + expect(respond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ + code: "INVALID_REQUEST", + message: "to required", + }), + ); + }); + + it("starts and polls delegated gateway continue operations", async () => { + callGatewayFromCliMock + .mockResolvedValueOnce({ + operationId: "op-1", + status: "pending", + pollTimeoutMs: 180000, + }) + .mockResolvedValueOnce({ + operationId: "op-1", + status: "completed", + result: { success: true, transcript: "gateway hello" }, + }); + const program = new Command(); + const stdout = captureStdout(); + await registerVoiceCallCli(program, { + provider: "mock", + transcriptTimeoutMs: 120000, + tts: { timeoutMs: 30000 }, + }); + + try { + await program.parseAsync( + ["voicecall", "continue", "--call-id", "call-1", "--message", "Hello"], + { + from: "user", + }, + ); + expect(callGatewayFromCliMock).toHaveBeenCalledWith( + "voicecall.continue.start", + { json: true, timeout: "35000" }, + { callId: "call-1", message: "Hello" }, + { progress: false }, + ); + expect(callGatewayFromCliMock).toHaveBeenCalledWith( + "voicecall.continue.result", + { json: true, timeout: "5000" }, + { operationId: "op-1" }, + { progress: false }, + ); + expect(createVoiceCallRuntime).not.toHaveBeenCalled(); + expect(stdout.output()).toContain('"transcript": "gateway hello"'); + } finally { + stdout.restore(); + } + }); + + it("gateway continue operations return pending then completed results", async () => { + let finishContinue: ((value: { success: true; transcript: string }) => void) | undefined; + const continuePromise = new Promise<{ success: true; transcript: string }>((resolve) => { + finishContinue = resolve; + }); + runtimeStub.manager.continueCall = vi.fn( + async () => await continuePromise, + ) as VoiceCallRuntime["manager"]["continueCall"]; + const { methods } = setup({ + provider: "mock", + transcriptTimeoutMs: 120000, + tts: { timeoutMs: 30000 }, + }); + const start = methods.get("voicecall.continue.start") as + | ((ctx: { + params: Record; + respond: ReturnType; + }) => Promise) + | undefined; + const result = methods.get("voicecall.continue.result") as + | ((ctx: { + params: Record; + respond: ReturnType; + }) => Promise) + | undefined; + const startRespond = vi.fn(); + + await start?.({ + params: { callId: "call-1", message: "Hello" }, + respond: startRespond, + }); + const startPayload = startRespond.mock.calls[0]?.[1] as + | { operationId?: string; pollTimeoutMs?: number } + | undefined; + expect(startPayload).toEqual( + expect.objectContaining({ + operationId: expect.any(String), + status: "pending", + pollTimeoutMs: 180000, + }), + ); + expect(runtimeStub.manager.continueCall).toHaveBeenCalledWith("call-1", "Hello"); + + const pendingRespond = vi.fn(); + await result?.({ + params: { operationId: startPayload?.operationId }, + respond: pendingRespond, + }); + expect(pendingRespond).toHaveBeenCalledWith( + true, + expect.objectContaining({ status: "pending" }), + ); + + finishContinue?.({ success: true, transcript: "gateway hello" }); + await continuePromise; + await Promise.resolve(); + + const completedRespond = vi.fn(); + await result?.({ + params: { operationId: startPayload?.operationId }, + respond: completedRespond, + }); + expect(completedRespond).toHaveBeenCalledWith( + true, + expect.objectContaining({ + status: "completed", + result: { success: true, transcript: "gateway hello" }, + }), + ); + }); + it("CLI setup prints human-readable checks by default", async () => { const program = new Command(); const stdout = captureStdout(); diff --git a/extensions/voice-call/index.ts b/extensions/voice-call/index.ts index c094b843c7a..a05f8fb4267 100644 --- a/extensions/voice-call/index.ts +++ b/extensions/voice-call/index.ts @@ -1,4 +1,5 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import { ErrorCodes, errorShape } from "openclaw/plugin-sdk/gateway-runtime"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; import { Type } from "typebox"; import { @@ -19,6 +20,7 @@ import { type VoiceCallConfig, } from "./src/config.js"; import type { CoreConfig } from "./src/core-bridge.js"; +import { createVoiceCallContinueOperationStore } from "./src/gateway-continue-operation.js"; const voiceCallConfigSchema = { parse(value: unknown): VoiceCallConfig { @@ -203,6 +205,10 @@ export default definePluginEntry({ } const runtimeState = getVoiceCallRuntimeGlobalState(); + const continueOperationStore = createVoiceCallContinueOperationStore({ + config, + coreConfig: api.config as CoreConfig, + }); const ensureRuntime = async (): Promise => { if (!config.enabled) { @@ -258,8 +264,16 @@ export default definePluginEntry({ } }; - const sendError = (respond: (ok: boolean, payload?: unknown) => void, err: unknown) => { - respond(false, { error: formatErrorMessage(err) }); + const respondError = ( + respond: GatewayRequestHandlerOptions["respond"], + message: string, + code: (typeof ErrorCodes)[keyof typeof ErrorCodes] = ErrorCodes.UNAVAILABLE, + ) => { + respond(false, undefined, errorShape(code, message)); + }; + + const sendError = (respond: GatewayRequestHandlerOptions["respond"], err: unknown) => { + respondError(respond, formatErrorMessage(err)); }; const resolveCallMessageRequest = async (params: GatewayRequestHandlerOptions["params"]) => { @@ -271,6 +285,7 @@ export default definePluginEntry({ const rt = await ensureRuntime(); return { rt, callId, message } as const; }; + const initiateCallAndRespond = async (params: { rt: VoiceCallRuntime; respond: GatewayRequestHandlerOptions["respond"]; @@ -285,7 +300,7 @@ export default definePluginEntry({ dtmfSequence: params.dtmfSequence, }); if (!result.success) { - params.respond(false, { error: result.error || "initiate failed" }); + respondError(params.respond, result.error || "initiate failed"); return; } params.respond(true, { callId: result.callId, initiated: true }); @@ -306,12 +321,16 @@ export default definePluginEntry({ }) => { const request = await resolveCallMessageRequest(params.requestParams); if ("error" in request) { - params.respond(false, { error: request.error }); + respondError( + params.respond, + request.error ?? "callId and message required", + ErrorCodes.INVALID_REQUEST, + ); return; } const result = await params.action(request); if (!result.success) { - params.respond(false, { error: result.error || params.failure }); + respondError(params.respond, result.error || params.failure); return; } params.respond( @@ -328,13 +347,13 @@ export default definePluginEntry({ try { const message = normalizeOptionalString(params?.message) ?? ""; if (!message) { - respond(false, { error: "message required" }); + respondError(respond, "message required", ErrorCodes.INVALID_REQUEST); return; } const rt = await ensureRuntime(); const to = normalizeOptionalString(params?.to) ?? rt.config.toNumber; if (!to) { - respond(false, { error: "to required" }); + respondError(respond, "to required", ErrorCodes.INVALID_REQUEST); return; } const mode = @@ -369,13 +388,58 @@ export default definePluginEntry({ }, ); + api.registerGatewayMethod( + "voicecall.continue.start", + async ({ params, respond }: GatewayRequestHandlerOptions) => { + try { + const request = await resolveCallMessageRequest(params); + if ("error" in request) { + respondError( + respond, + request.error ?? "callId and message required", + ErrorCodes.INVALID_REQUEST, + ); + return; + } + respond(true, continueOperationStore.start(request)); + } catch (err) { + sendError(respond, err); + } + }, + ); + + api.registerGatewayMethod( + "voicecall.continue.result", + async ({ params, respond }: GatewayRequestHandlerOptions) => { + try { + const operationId = normalizeOptionalString(params?.operationId) ?? ""; + if (!operationId) { + respondError(respond, "operationId required", ErrorCodes.INVALID_REQUEST); + return; + } + const operation = continueOperationStore.read(operationId); + if (!operation.ok) { + respondError(respond, operation.error, ErrorCodes.INVALID_REQUEST); + return; + } + respond(true, operation.payload); + } catch (err) { + sendError(respond, err); + } + }, + ); + api.registerGatewayMethod( "voicecall.speak", async ({ params, respond }: GatewayRequestHandlerOptions) => { try { const request = await resolveCallMessageRequest(params); if ("error" in request) { - respond(false, { error: request.error }); + respondError( + respond, + request.error ?? "callId and message required", + ErrorCodes.INVALID_REQUEST, + ); return; } if (request.rt.config.realtime.enabled) { @@ -390,7 +454,7 @@ export default definePluginEntry({ } const result = await request.rt.manager.speak(request.callId, request.message); if (!result.success) { - respond(false, { error: result.error || "speak failed" }); + respondError(respond, result.error || "speak failed"); return; } respond(true, { success: true }); @@ -407,13 +471,13 @@ export default definePluginEntry({ const callId = normalizeOptionalString(params?.callId) ?? ""; const digits = normalizeOptionalString(params?.digits) ?? ""; if (!callId || !digits) { - respond(false, { error: "callId and digits required" }); + respondError(respond, "callId and digits required", ErrorCodes.INVALID_REQUEST); return; } const rt = await ensureRuntime(); const result = await rt.manager.sendDtmf(callId, digits); if (!result.success) { - respond(false, { error: result.error || "dtmf failed" }); + respondError(respond, result.error || "dtmf failed"); return; } respond(true, { success: true }); @@ -429,13 +493,13 @@ export default definePluginEntry({ try { const callId = normalizeOptionalString(params?.callId) ?? ""; if (!callId) { - respond(false, { error: "callId required" }); + respondError(respond, "callId required", ErrorCodes.INVALID_REQUEST); return; } const rt = await ensureRuntime(); const result = await rt.manager.endCall(callId); if (!result.success) { - respond(false, { error: result.error || "end failed" }); + respondError(respond, result.error || "end failed"); return; } respond(true, { success: true }); @@ -476,7 +540,7 @@ export default definePluginEntry({ const message = normalizeOptionalString(params?.message) ?? ""; const dtmfSequence = normalizeOptionalString(params?.dtmfSequence); if (!to) { - respond(false, { error: "to required" }); + respondError(respond, "to required", ErrorCodes.INVALID_REQUEST); return; } const rt = await ensureRuntime(); diff --git a/extensions/voice-call/src/cli.ts b/extensions/voice-call/src/cli.ts index f0bd8c3a8a8..a555f728eaf 100644 --- a/extensions/voice-call/src/cli.ts +++ b/extensions/voice-call/src/cli.ts @@ -38,6 +38,8 @@ type VoiceCallGatewayMethod = | "voicecall.initiate" | "voicecall.start" | "voicecall.continue" + | "voicecall.continue.start" + | "voicecall.continue.result" | "voicecall.speak" | "voicecall.dtmf" | "voicecall.end" @@ -45,7 +47,10 @@ type VoiceCallGatewayMethod = type VoiceCallGatewayCallResult = { ok: true; payload: unknown } | { ok: false; error: unknown }; -const VOICE_CALL_GATEWAY_TIMEOUT_MS = "5000"; +const VOICE_CALL_GATEWAY_DEFAULT_TIMEOUT_MS = 5000; +const VOICE_CALL_GATEWAY_OPERATION_TIMEOUT_MS = 30000; +const VOICE_CALL_GATEWAY_TRANSCRIPT_BUFFER_MS = 10000; +const VOICE_CALL_GATEWAY_POLL_INTERVAL_MS = 1000; const voiceCallCliDeps = { callGatewayFromCli, @@ -83,11 +88,16 @@ function isGatewayUnavailableForLocalFallback(err: unknown): boolean { async function callVoiceCallGateway( method: VoiceCallGatewayMethod, params?: Record, + opts?: { timeoutMs?: number }, ): Promise { try { + const timeoutMs = + typeof opts?.timeoutMs === "number" && Number.isFinite(opts.timeoutMs) + ? Math.max(1, Math.ceil(opts.timeoutMs)) + : VOICE_CALL_GATEWAY_DEFAULT_TIMEOUT_MS; const payload = await voiceCallCliDeps.callGatewayFromCli( method, - { json: true, timeout: VOICE_CALL_GATEWAY_TIMEOUT_MS }, + { json: true, timeout: String(timeoutMs) }, params, { progress: false }, ); @@ -100,6 +110,94 @@ async function callVoiceCallGateway( } } +function resolveGatewayOperationTimeoutMs(config: VoiceCallConfig): number { + return Math.max(VOICE_CALL_GATEWAY_OPERATION_TIMEOUT_MS, config.ringTimeoutMs + 5000); +} + +function resolveGatewayContinueTimeoutMs(config: VoiceCallConfig): number { + return ( + config.transcriptTimeoutMs + + VOICE_CALL_GATEWAY_OPERATION_TIMEOUT_MS + + VOICE_CALL_GATEWAY_TRANSCRIPT_BUFFER_MS + ); +} + +function isUnknownGatewayMethod(err: unknown, method: VoiceCallGatewayMethod): boolean { + return formatErrorMessage(err).includes(`unknown method: ${method}`); +} + +function readGatewayOperationId(payload: unknown): string { + if (isRecord(payload) && typeof payload.operationId === "string" && payload.operationId) { + return payload.operationId; + } + throw new Error("voicecall gateway response missing operationId"); +} + +function readGatewayPollTimeoutMs(payload: unknown, fallbackTimeoutMs: number): number { + if (isRecord(payload) && typeof payload.pollTimeoutMs === "number") { + return Math.max(1, Math.ceil(payload.pollTimeoutMs)); + } + return fallbackTimeoutMs; +} + +function readCompletedContinueResult( + payload: unknown, +): + | { status: "pending" } + | { status: "completed"; result: unknown } + | { status: "failed"; error: string } { + if (!isRecord(payload)) { + throw new Error("voicecall gateway response missing operation status"); + } + if (payload.status === "pending") { + return { status: "pending" }; + } + if (payload.status === "failed") { + return { + status: "failed", + error: typeof payload.error === "string" ? payload.error : "continue failed", + }; + } + if (payload.status === "completed") { + return { status: "completed", result: payload.result }; + } + throw new Error("voicecall gateway response has unknown operation status"); +} + +async function pollVoiceCallContinueGateway(params: { + operationId: string; + timeoutMs: number; +}): Promise { + const deadlineMs = Date.now() + params.timeoutMs; + + while (Date.now() <= deadlineMs) { + const gateway = await callVoiceCallGateway( + "voicecall.continue.result", + { operationId: params.operationId }, + { timeoutMs: VOICE_CALL_GATEWAY_DEFAULT_TIMEOUT_MS }, + ); + if (!gateway.ok) { + throw new Error( + `gateway unavailable while waiting for voicecall continue result: ${formatErrorMessage( + gateway.error, + )}`, + ); + } + const result = readCompletedContinueResult(gateway.payload); + if (result.status === "completed") { + return result.result; + } + if (result.status === "failed") { + throw new Error(result.error); + } + await sleep( + Math.min(VOICE_CALL_GATEWAY_POLL_INTERVAL_MS, Math.max(1, deadlineMs - Date.now())), + ); + } + + throw new Error("voicecall continue timed out waiting for gateway operation"); +} + function resolveMode(input: string): "off" | "serve" | "funnel" { const raw = normalizeOptionalLowercaseString(input) ?? ""; if (raw === "serve" || raw === "off") { @@ -252,17 +350,24 @@ function writeGatewayCallId(payload: unknown): void { async function initiateCallViaGatewayOrRuntime(params: { ensureRuntime: () => Promise; + config: VoiceCallConfig; method: "voicecall.initiate" | "voicecall.start"; to?: string; message?: string; mode?: string; }) { const mode = resolveCallMode(params.mode); - const gateway = await callVoiceCallGateway(params.method, { - ...(params.to ? { to: params.to } : {}), - ...(params.message ? { message: params.message } : {}), - ...(mode ? { mode } : {}), - }); + const gateway = await callVoiceCallGateway( + params.method, + { + ...(params.to ? { to: params.to } : {}), + ...(params.message ? { message: params.message } : {}), + ...(mode ? { mode } : {}), + }, + { + timeoutMs: resolveGatewayOperationTimeoutMs(params.config), + }, + ); if (gateway.ok) { writeGatewayCallId(gateway.payload); return; @@ -355,11 +460,17 @@ export function registerVoiceCallCli(params: { return; } const mode = resolveCallMode(options.mode) ?? "notify"; - const gateway = await callVoiceCallGateway("voicecall.start", { - to: options.to, - ...(options.message ? { message: options.message } : {}), - mode, - }); + const gateway = await callVoiceCallGateway( + "voicecall.start", + { + to: options.to, + ...(options.message ? { message: options.message } : {}), + mode, + }, + { + timeoutMs: resolveGatewayOperationTimeoutMs(config), + }, + ); let callId: unknown; if (gateway.ok) { callId = isRecord(gateway.payload) ? gateway.payload.callId : undefined; @@ -402,6 +513,7 @@ export function registerVoiceCallCli(params: { .action(async (options: { message: string; to?: string; mode?: string }) => { await initiateCallViaGatewayOrRuntime({ ensureRuntime, + config, method: "voicecall.initiate", to: options.to, message: options.message, @@ -422,6 +534,7 @@ export function registerVoiceCallCli(params: { .action(async (options: { to: string; message?: string; mode?: string }) => { await initiateCallViaGatewayOrRuntime({ ensureRuntime, + config, method: "voicecall.start", to: options.to, message: options.message, @@ -435,11 +548,45 @@ export function registerVoiceCallCli(params: { .requiredOption("--call-id ", "Call ID") .requiredOption("--message ", "Message to speak") .action(async (options: { callId: string; message: string }) => { - const gateway = await callVoiceCallGateway("voicecall.continue", { - callId: options.callId, - message: options.message, - }); + let gateway: VoiceCallGatewayCallResult; + try { + gateway = await callVoiceCallGateway( + "voicecall.continue.start", + { + callId: options.callId, + message: options.message, + }, + { + timeoutMs: resolveGatewayOperationTimeoutMs(config), + }, + ); + } catch (err) { + if (!isUnknownGatewayMethod(err, "voicecall.continue.start")) { + throw err; + } + gateway = await callVoiceCallGateway( + "voicecall.continue", + { + callId: options.callId, + message: options.message, + }, + { + timeoutMs: resolveGatewayContinueTimeoutMs(config), + }, + ); + } if (gateway.ok) { + if (isRecord(gateway.payload) && typeof gateway.payload.operationId === "string") { + const result = await pollVoiceCallContinueGateway({ + operationId: readGatewayOperationId(gateway.payload), + timeoutMs: readGatewayPollTimeoutMs( + gateway.payload, + resolveGatewayContinueTimeoutMs(config), + ), + }); + writeStdoutJson(result); + return; + } writeStdoutJson(gateway.payload); return; } diff --git a/extensions/voice-call/src/gateway-continue-operation.ts b/extensions/voice-call/src/gateway-continue-operation.ts new file mode 100644 index 00000000000..f0f2a76d1fe --- /dev/null +++ b/extensions/voice-call/src/gateway-continue-operation.ts @@ -0,0 +1,200 @@ +import { randomUUID } from "node:crypto"; +import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import type { VoiceCallConfig } from "./config.js"; +import type { CoreConfig } from "./core-bridge.js"; +import type { VoiceCallRuntime } from "./runtime.js"; +import { TELEPHONY_DEFAULT_TTS_TIMEOUT_MS } from "./telephony-tts.js"; + +const VOICE_CALL_CONTINUE_OPERATION_BUFFER_MS = 30000; +const VOICE_CALL_CONTINUE_OPERATION_CLEANUP_MS = 5 * 60 * 1000; + +type VoiceCallContinueOperation = + | { + operationId: string; + status: "pending"; + callId: string; + startedAtMs: number; + pollTimeoutMs: number; + } + | { + operationId: string; + status: "completed"; + callId: string; + startedAtMs: number; + completedAtMs: number; + pollTimeoutMs: number; + result: { success: true; transcript?: string }; + } + | { + operationId: string; + status: "failed"; + callId: string; + startedAtMs: number; + completedAtMs: number; + pollTimeoutMs: number; + error: string; + }; + +export type VoiceCallContinueOperationStartPayload = { + operationId: string; + status: "pending"; + pollTimeoutMs: number; +}; + +export type VoiceCallContinueOperationResultPayload = + | { + operationId: string; + status: "pending"; + pollTimeoutMs: number; + } + | { + operationId: string; + status: "completed"; + result: { success: true; transcript?: string }; + } + | { + operationId: string; + status: "failed"; + error: string; + }; + +export type VoiceCallContinueOperationRequest = { + rt: VoiceCallRuntime; + callId: string; + message: string; +}; + +export function createVoiceCallContinueOperationStore(params: { + config: VoiceCallConfig; + coreConfig: CoreConfig; +}) { + const operations = new Map(); + + const resolvePollTimeoutMs = (rt: VoiceCallRuntime): number => { + const ttsTimeoutMs = + rt.config.tts?.timeoutMs ?? + params.config.tts?.timeoutMs ?? + params.coreConfig.messages?.tts?.timeoutMs ?? + TELEPHONY_DEFAULT_TTS_TIMEOUT_MS; + return ( + (rt.config.transcriptTimeoutMs ?? params.config.transcriptTimeoutMs) + + ttsTimeoutMs + + VOICE_CALL_CONTINUE_OPERATION_BUFFER_MS + ); + }; + + const scheduleCleanup = (operationId: string) => { + const timer = setTimeout(() => { + operations.delete(operationId); + }, VOICE_CALL_CONTINUE_OPERATION_CLEANUP_MS); + timer.unref?.(); + }; + + const start = ( + request: VoiceCallContinueOperationRequest, + ): VoiceCallContinueOperationStartPayload => { + const operationId = randomUUID(); + const startedAtMs = Date.now(); + const pollTimeoutMs = resolvePollTimeoutMs(request.rt); + operations.set(operationId, { + operationId, + status: "pending", + callId: request.callId, + startedAtMs, + pollTimeoutMs, + }); + + void request.rt.manager + .continueCall(request.callId, request.message) + .then((result) => { + const current = operations.get(operationId); + if (!current || current.status !== "pending") { + return; + } + if (!result.success) { + operations.set(operationId, { + operationId, + status: "failed", + callId: request.callId, + startedAtMs, + completedAtMs: Date.now(), + pollTimeoutMs, + error: result.error || "continue failed", + }); + return; + } + operations.set(operationId, { + operationId, + status: "completed", + callId: request.callId, + startedAtMs, + completedAtMs: Date.now(), + pollTimeoutMs, + result: { success: true, transcript: result.transcript }, + }); + }) + .catch((err) => { + const current = operations.get(operationId); + if (!current || current.status !== "pending") { + return; + } + operations.set(operationId, { + operationId, + status: "failed", + callId: request.callId, + startedAtMs, + completedAtMs: Date.now(), + pollTimeoutMs, + error: formatErrorMessage(err), + }); + }) + .finally(() => { + scheduleCleanup(operationId); + }); + + return { operationId, status: "pending", pollTimeoutMs }; + }; + + const read = ( + operationId: string, + ): + | { ok: true; payload: VoiceCallContinueOperationResultPayload } + | { ok: false; error: string } => { + const operation = operations.get(operationId); + if (!operation) { + return { ok: false, error: "operation not found" }; + } + if (operation.status === "pending") { + return { + ok: true, + payload: { + operationId, + status: "pending", + pollTimeoutMs: operation.pollTimeoutMs, + }, + }; + } + if (operation.status === "failed") { + operations.delete(operationId); + return { + ok: true, + payload: { + operationId, + status: "failed", + error: operation.error, + }, + }; + } + operations.delete(operationId); + return { + ok: true, + payload: { + operationId, + status: "completed", + result: operation.result, + }, + }; + }; + + return { start, read }; +} diff --git a/extensions/voice-call/src/telephony-tts.ts b/extensions/voice-call/src/telephony-tts.ts index 98e319943c5..9840374b689 100644 --- a/extensions/voice-call/src/telephony-tts.ts +++ b/extensions/voice-call/src/telephony-tts.ts @@ -24,7 +24,7 @@ export type TelephonyTtsProvider = { synthesizeForTelephony: (text: string) => Promise; }; -const TELEPHONY_DEFAULT_TTS_TIMEOUT_MS = 8000; +export const TELEPHONY_DEFAULT_TTS_TIMEOUT_MS = 8000; export function createTelephonyTtsProvider(params: { coreConfig: CoreConfig;