mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:30:44 +00:00
fix: voice-call CLI gateway delegation path actionable regressions (#75459)
Fix voice-call CLI gateway delegation by returning protocol-shaped errors and running delegated continue turns through operation-id polling instead of one long Gateway RPC.\n\nThanks @serrurco and @DougButdorf.
This commit is contained in:
@@ -317,8 +317,9 @@ describe("voice-call plugin", () => {
|
||||
expect(createVoiceCallRuntime).not.toHaveBeenCalled();
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
false,
|
||||
undefined,
|
||||
expect.objectContaining({
|
||||
error: expect.stringContaining("TWILIO_ACCOUNT_SID"),
|
||||
message: expect.stringContaining("TWILIO_ACCOUNT_SID"),
|
||||
}),
|
||||
);
|
||||
});
|
||||
@@ -534,7 +535,7 @@ describe("voice-call plugin", () => {
|
||||
});
|
||||
expect(callGatewayFromCliMock).toHaveBeenCalledWith(
|
||||
"voicecall.start",
|
||||
{ json: true, timeout: "5000" },
|
||||
{ json: true, timeout: "35000" },
|
||||
{ to: "+1", message: "Hello", mode: "conversation" },
|
||||
{ progress: false },
|
||||
);
|
||||
@@ -545,6 +546,145 @@ describe("voice-call plugin", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("responds with protocol errors for delegated gateway failures", async () => {
|
||||
const { methods } = setup({ provider: "mock" });
|
||||
const handler = methods.get("voicecall.start") as
|
||||
| ((ctx: {
|
||||
params: Record<string, unknown>;
|
||||
respond: ReturnType<typeof vi.fn>;
|
||||
}) => Promise<void>)
|
||||
| undefined;
|
||||
const respond = vi.fn();
|
||||
|
||||
await handler?.({ params: {}, respond });
|
||||
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
false,
|
||||
undefined,
|
||||
expect.objectContaining({
|
||||
code: "INVALID_REQUEST",
|
||||
message: "to required",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("starts and polls delegated gateway continue operations", async () => {
|
||||
callGatewayFromCliMock
|
||||
.mockResolvedValueOnce({
|
||||
operationId: "op-1",
|
||||
status: "pending",
|
||||
pollTimeoutMs: 180000,
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
operationId: "op-1",
|
||||
status: "completed",
|
||||
result: { success: true, transcript: "gateway hello" },
|
||||
});
|
||||
const program = new Command();
|
||||
const stdout = captureStdout();
|
||||
await registerVoiceCallCli(program, {
|
||||
provider: "mock",
|
||||
transcriptTimeoutMs: 120000,
|
||||
tts: { timeoutMs: 30000 },
|
||||
});
|
||||
|
||||
try {
|
||||
await program.parseAsync(
|
||||
["voicecall", "continue", "--call-id", "call-1", "--message", "Hello"],
|
||||
{
|
||||
from: "user",
|
||||
},
|
||||
);
|
||||
expect(callGatewayFromCliMock).toHaveBeenCalledWith(
|
||||
"voicecall.continue.start",
|
||||
{ json: true, timeout: "35000" },
|
||||
{ callId: "call-1", message: "Hello" },
|
||||
{ progress: false },
|
||||
);
|
||||
expect(callGatewayFromCliMock).toHaveBeenCalledWith(
|
||||
"voicecall.continue.result",
|
||||
{ json: true, timeout: "5000" },
|
||||
{ operationId: "op-1" },
|
||||
{ progress: false },
|
||||
);
|
||||
expect(createVoiceCallRuntime).not.toHaveBeenCalled();
|
||||
expect(stdout.output()).toContain('"transcript": "gateway hello"');
|
||||
} finally {
|
||||
stdout.restore();
|
||||
}
|
||||
});
|
||||
|
||||
it("gateway continue operations return pending then completed results", async () => {
|
||||
let finishContinue: ((value: { success: true; transcript: string }) => void) | undefined;
|
||||
const continuePromise = new Promise<{ success: true; transcript: string }>((resolve) => {
|
||||
finishContinue = resolve;
|
||||
});
|
||||
runtimeStub.manager.continueCall = vi.fn(
|
||||
async () => await continuePromise,
|
||||
) as VoiceCallRuntime["manager"]["continueCall"];
|
||||
const { methods } = setup({
|
||||
provider: "mock",
|
||||
transcriptTimeoutMs: 120000,
|
||||
tts: { timeoutMs: 30000 },
|
||||
});
|
||||
const start = methods.get("voicecall.continue.start") as
|
||||
| ((ctx: {
|
||||
params: Record<string, unknown>;
|
||||
respond: ReturnType<typeof vi.fn>;
|
||||
}) => Promise<void>)
|
||||
| undefined;
|
||||
const result = methods.get("voicecall.continue.result") as
|
||||
| ((ctx: {
|
||||
params: Record<string, unknown>;
|
||||
respond: ReturnType<typeof vi.fn>;
|
||||
}) => Promise<void>)
|
||||
| undefined;
|
||||
const startRespond = vi.fn();
|
||||
|
||||
await start?.({
|
||||
params: { callId: "call-1", message: "Hello" },
|
||||
respond: startRespond,
|
||||
});
|
||||
const startPayload = startRespond.mock.calls[0]?.[1] as
|
||||
| { operationId?: string; pollTimeoutMs?: number }
|
||||
| undefined;
|
||||
expect(startPayload).toEqual(
|
||||
expect.objectContaining({
|
||||
operationId: expect.any(String),
|
||||
status: "pending",
|
||||
pollTimeoutMs: 180000,
|
||||
}),
|
||||
);
|
||||
expect(runtimeStub.manager.continueCall).toHaveBeenCalledWith("call-1", "Hello");
|
||||
|
||||
const pendingRespond = vi.fn();
|
||||
await result?.({
|
||||
params: { operationId: startPayload?.operationId },
|
||||
respond: pendingRespond,
|
||||
});
|
||||
expect(pendingRespond).toHaveBeenCalledWith(
|
||||
true,
|
||||
expect.objectContaining({ status: "pending" }),
|
||||
);
|
||||
|
||||
finishContinue?.({ success: true, transcript: "gateway hello" });
|
||||
await continuePromise;
|
||||
await Promise.resolve();
|
||||
|
||||
const completedRespond = vi.fn();
|
||||
await result?.({
|
||||
params: { operationId: startPayload?.operationId },
|
||||
respond: completedRespond,
|
||||
});
|
||||
expect(completedRespond).toHaveBeenCalledWith(
|
||||
true,
|
||||
expect.objectContaining({
|
||||
status: "completed",
|
||||
result: { success: true, transcript: "gateway hello" },
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("CLI setup prints human-readable checks by default", async () => {
|
||||
const program = new Command();
|
||||
const stdout = captureStdout();
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import { ErrorCodes, errorShape } from "openclaw/plugin-sdk/gateway-runtime";
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { Type } from "typebox";
|
||||
import {
|
||||
@@ -19,6 +20,7 @@ import {
|
||||
type VoiceCallConfig,
|
||||
} from "./src/config.js";
|
||||
import type { CoreConfig } from "./src/core-bridge.js";
|
||||
import { createVoiceCallContinueOperationStore } from "./src/gateway-continue-operation.js";
|
||||
|
||||
const voiceCallConfigSchema = {
|
||||
parse(value: unknown): VoiceCallConfig {
|
||||
@@ -203,6 +205,10 @@ export default definePluginEntry({
|
||||
}
|
||||
|
||||
const runtimeState = getVoiceCallRuntimeGlobalState();
|
||||
const continueOperationStore = createVoiceCallContinueOperationStore({
|
||||
config,
|
||||
coreConfig: api.config as CoreConfig,
|
||||
});
|
||||
|
||||
const ensureRuntime = async (): Promise<VoiceCallRuntime> => {
|
||||
if (!config.enabled) {
|
||||
@@ -258,8 +264,16 @@ export default definePluginEntry({
|
||||
}
|
||||
};
|
||||
|
||||
const sendError = (respond: (ok: boolean, payload?: unknown) => void, err: unknown) => {
|
||||
respond(false, { error: formatErrorMessage(err) });
|
||||
const respondError = (
|
||||
respond: GatewayRequestHandlerOptions["respond"],
|
||||
message: string,
|
||||
code: (typeof ErrorCodes)[keyof typeof ErrorCodes] = ErrorCodes.UNAVAILABLE,
|
||||
) => {
|
||||
respond(false, undefined, errorShape(code, message));
|
||||
};
|
||||
|
||||
const sendError = (respond: GatewayRequestHandlerOptions["respond"], err: unknown) => {
|
||||
respondError(respond, formatErrorMessage(err));
|
||||
};
|
||||
|
||||
const resolveCallMessageRequest = async (params: GatewayRequestHandlerOptions["params"]) => {
|
||||
@@ -271,6 +285,7 @@ export default definePluginEntry({
|
||||
const rt = await ensureRuntime();
|
||||
return { rt, callId, message } as const;
|
||||
};
|
||||
|
||||
const initiateCallAndRespond = async (params: {
|
||||
rt: VoiceCallRuntime;
|
||||
respond: GatewayRequestHandlerOptions["respond"];
|
||||
@@ -285,7 +300,7 @@ export default definePluginEntry({
|
||||
dtmfSequence: params.dtmfSequence,
|
||||
});
|
||||
if (!result.success) {
|
||||
params.respond(false, { error: result.error || "initiate failed" });
|
||||
respondError(params.respond, result.error || "initiate failed");
|
||||
return;
|
||||
}
|
||||
params.respond(true, { callId: result.callId, initiated: true });
|
||||
@@ -306,12 +321,16 @@ export default definePluginEntry({
|
||||
}) => {
|
||||
const request = await resolveCallMessageRequest(params.requestParams);
|
||||
if ("error" in request) {
|
||||
params.respond(false, { error: request.error });
|
||||
respondError(
|
||||
params.respond,
|
||||
request.error ?? "callId and message required",
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
);
|
||||
return;
|
||||
}
|
||||
const result = await params.action(request);
|
||||
if (!result.success) {
|
||||
params.respond(false, { error: result.error || params.failure });
|
||||
respondError(params.respond, result.error || params.failure);
|
||||
return;
|
||||
}
|
||||
params.respond(
|
||||
@@ -328,13 +347,13 @@ export default definePluginEntry({
|
||||
try {
|
||||
const message = normalizeOptionalString(params?.message) ?? "";
|
||||
if (!message) {
|
||||
respond(false, { error: "message required" });
|
||||
respondError(respond, "message required", ErrorCodes.INVALID_REQUEST);
|
||||
return;
|
||||
}
|
||||
const rt = await ensureRuntime();
|
||||
const to = normalizeOptionalString(params?.to) ?? rt.config.toNumber;
|
||||
if (!to) {
|
||||
respond(false, { error: "to required" });
|
||||
respondError(respond, "to required", ErrorCodes.INVALID_REQUEST);
|
||||
return;
|
||||
}
|
||||
const mode =
|
||||
@@ -369,13 +388,58 @@ export default definePluginEntry({
|
||||
},
|
||||
);
|
||||
|
||||
api.registerGatewayMethod(
|
||||
"voicecall.continue.start",
|
||||
async ({ params, respond }: GatewayRequestHandlerOptions) => {
|
||||
try {
|
||||
const request = await resolveCallMessageRequest(params);
|
||||
if ("error" in request) {
|
||||
respondError(
|
||||
respond,
|
||||
request.error ?? "callId and message required",
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
);
|
||||
return;
|
||||
}
|
||||
respond(true, continueOperationStore.start(request));
|
||||
} catch (err) {
|
||||
sendError(respond, err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
api.registerGatewayMethod(
|
||||
"voicecall.continue.result",
|
||||
async ({ params, respond }: GatewayRequestHandlerOptions) => {
|
||||
try {
|
||||
const operationId = normalizeOptionalString(params?.operationId) ?? "";
|
||||
if (!operationId) {
|
||||
respondError(respond, "operationId required", ErrorCodes.INVALID_REQUEST);
|
||||
return;
|
||||
}
|
||||
const operation = continueOperationStore.read(operationId);
|
||||
if (!operation.ok) {
|
||||
respondError(respond, operation.error, ErrorCodes.INVALID_REQUEST);
|
||||
return;
|
||||
}
|
||||
respond(true, operation.payload);
|
||||
} catch (err) {
|
||||
sendError(respond, err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
api.registerGatewayMethod(
|
||||
"voicecall.speak",
|
||||
async ({ params, respond }: GatewayRequestHandlerOptions) => {
|
||||
try {
|
||||
const request = await resolveCallMessageRequest(params);
|
||||
if ("error" in request) {
|
||||
respond(false, { error: request.error });
|
||||
respondError(
|
||||
respond,
|
||||
request.error ?? "callId and message required",
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (request.rt.config.realtime.enabled) {
|
||||
@@ -390,7 +454,7 @@ export default definePluginEntry({
|
||||
}
|
||||
const result = await request.rt.manager.speak(request.callId, request.message);
|
||||
if (!result.success) {
|
||||
respond(false, { error: result.error || "speak failed" });
|
||||
respondError(respond, result.error || "speak failed");
|
||||
return;
|
||||
}
|
||||
respond(true, { success: true });
|
||||
@@ -407,13 +471,13 @@ export default definePluginEntry({
|
||||
const callId = normalizeOptionalString(params?.callId) ?? "";
|
||||
const digits = normalizeOptionalString(params?.digits) ?? "";
|
||||
if (!callId || !digits) {
|
||||
respond(false, { error: "callId and digits required" });
|
||||
respondError(respond, "callId and digits required", ErrorCodes.INVALID_REQUEST);
|
||||
return;
|
||||
}
|
||||
const rt = await ensureRuntime();
|
||||
const result = await rt.manager.sendDtmf(callId, digits);
|
||||
if (!result.success) {
|
||||
respond(false, { error: result.error || "dtmf failed" });
|
||||
respondError(respond, result.error || "dtmf failed");
|
||||
return;
|
||||
}
|
||||
respond(true, { success: true });
|
||||
@@ -429,13 +493,13 @@ export default definePluginEntry({
|
||||
try {
|
||||
const callId = normalizeOptionalString(params?.callId) ?? "";
|
||||
if (!callId) {
|
||||
respond(false, { error: "callId required" });
|
||||
respondError(respond, "callId required", ErrorCodes.INVALID_REQUEST);
|
||||
return;
|
||||
}
|
||||
const rt = await ensureRuntime();
|
||||
const result = await rt.manager.endCall(callId);
|
||||
if (!result.success) {
|
||||
respond(false, { error: result.error || "end failed" });
|
||||
respondError(respond, result.error || "end failed");
|
||||
return;
|
||||
}
|
||||
respond(true, { success: true });
|
||||
@@ -476,7 +540,7 @@ export default definePluginEntry({
|
||||
const message = normalizeOptionalString(params?.message) ?? "";
|
||||
const dtmfSequence = normalizeOptionalString(params?.dtmfSequence);
|
||||
if (!to) {
|
||||
respond(false, { error: "to required" });
|
||||
respondError(respond, "to required", ErrorCodes.INVALID_REQUEST);
|
||||
return;
|
||||
}
|
||||
const rt = await ensureRuntime();
|
||||
|
||||
@@ -38,6 +38,8 @@ type VoiceCallGatewayMethod =
|
||||
| "voicecall.initiate"
|
||||
| "voicecall.start"
|
||||
| "voicecall.continue"
|
||||
| "voicecall.continue.start"
|
||||
| "voicecall.continue.result"
|
||||
| "voicecall.speak"
|
||||
| "voicecall.dtmf"
|
||||
| "voicecall.end"
|
||||
@@ -45,7 +47,10 @@ type VoiceCallGatewayMethod =
|
||||
|
||||
type VoiceCallGatewayCallResult = { ok: true; payload: unknown } | { ok: false; error: unknown };
|
||||
|
||||
const VOICE_CALL_GATEWAY_TIMEOUT_MS = "5000";
|
||||
const VOICE_CALL_GATEWAY_DEFAULT_TIMEOUT_MS = 5000;
|
||||
const VOICE_CALL_GATEWAY_OPERATION_TIMEOUT_MS = 30000;
|
||||
const VOICE_CALL_GATEWAY_TRANSCRIPT_BUFFER_MS = 10000;
|
||||
const VOICE_CALL_GATEWAY_POLL_INTERVAL_MS = 1000;
|
||||
|
||||
const voiceCallCliDeps = {
|
||||
callGatewayFromCli,
|
||||
@@ -83,11 +88,16 @@ function isGatewayUnavailableForLocalFallback(err: unknown): boolean {
|
||||
async function callVoiceCallGateway(
|
||||
method: VoiceCallGatewayMethod,
|
||||
params?: Record<string, unknown>,
|
||||
opts?: { timeoutMs?: number },
|
||||
): Promise<VoiceCallGatewayCallResult> {
|
||||
try {
|
||||
const timeoutMs =
|
||||
typeof opts?.timeoutMs === "number" && Number.isFinite(opts.timeoutMs)
|
||||
? Math.max(1, Math.ceil(opts.timeoutMs))
|
||||
: VOICE_CALL_GATEWAY_DEFAULT_TIMEOUT_MS;
|
||||
const payload = await voiceCallCliDeps.callGatewayFromCli(
|
||||
method,
|
||||
{ json: true, timeout: VOICE_CALL_GATEWAY_TIMEOUT_MS },
|
||||
{ json: true, timeout: String(timeoutMs) },
|
||||
params,
|
||||
{ progress: false },
|
||||
);
|
||||
@@ -100,6 +110,94 @@ async function callVoiceCallGateway(
|
||||
}
|
||||
}
|
||||
|
||||
function resolveGatewayOperationTimeoutMs(config: VoiceCallConfig): number {
|
||||
return Math.max(VOICE_CALL_GATEWAY_OPERATION_TIMEOUT_MS, config.ringTimeoutMs + 5000);
|
||||
}
|
||||
|
||||
function resolveGatewayContinueTimeoutMs(config: VoiceCallConfig): number {
|
||||
return (
|
||||
config.transcriptTimeoutMs +
|
||||
VOICE_CALL_GATEWAY_OPERATION_TIMEOUT_MS +
|
||||
VOICE_CALL_GATEWAY_TRANSCRIPT_BUFFER_MS
|
||||
);
|
||||
}
|
||||
|
||||
function isUnknownGatewayMethod(err: unknown, method: VoiceCallGatewayMethod): boolean {
|
||||
return formatErrorMessage(err).includes(`unknown method: ${method}`);
|
||||
}
|
||||
|
||||
function readGatewayOperationId(payload: unknown): string {
|
||||
if (isRecord(payload) && typeof payload.operationId === "string" && payload.operationId) {
|
||||
return payload.operationId;
|
||||
}
|
||||
throw new Error("voicecall gateway response missing operationId");
|
||||
}
|
||||
|
||||
function readGatewayPollTimeoutMs(payload: unknown, fallbackTimeoutMs: number): number {
|
||||
if (isRecord(payload) && typeof payload.pollTimeoutMs === "number") {
|
||||
return Math.max(1, Math.ceil(payload.pollTimeoutMs));
|
||||
}
|
||||
return fallbackTimeoutMs;
|
||||
}
|
||||
|
||||
function readCompletedContinueResult(
|
||||
payload: unknown,
|
||||
):
|
||||
| { status: "pending" }
|
||||
| { status: "completed"; result: unknown }
|
||||
| { status: "failed"; error: string } {
|
||||
if (!isRecord(payload)) {
|
||||
throw new Error("voicecall gateway response missing operation status");
|
||||
}
|
||||
if (payload.status === "pending") {
|
||||
return { status: "pending" };
|
||||
}
|
||||
if (payload.status === "failed") {
|
||||
return {
|
||||
status: "failed",
|
||||
error: typeof payload.error === "string" ? payload.error : "continue failed",
|
||||
};
|
||||
}
|
||||
if (payload.status === "completed") {
|
||||
return { status: "completed", result: payload.result };
|
||||
}
|
||||
throw new Error("voicecall gateway response has unknown operation status");
|
||||
}
|
||||
|
||||
async function pollVoiceCallContinueGateway(params: {
|
||||
operationId: string;
|
||||
timeoutMs: number;
|
||||
}): Promise<unknown> {
|
||||
const deadlineMs = Date.now() + params.timeoutMs;
|
||||
|
||||
while (Date.now() <= deadlineMs) {
|
||||
const gateway = await callVoiceCallGateway(
|
||||
"voicecall.continue.result",
|
||||
{ operationId: params.operationId },
|
||||
{ timeoutMs: VOICE_CALL_GATEWAY_DEFAULT_TIMEOUT_MS },
|
||||
);
|
||||
if (!gateway.ok) {
|
||||
throw new Error(
|
||||
`gateway unavailable while waiting for voicecall continue result: ${formatErrorMessage(
|
||||
gateway.error,
|
||||
)}`,
|
||||
);
|
||||
}
|
||||
const result = readCompletedContinueResult(gateway.payload);
|
||||
if (result.status === "completed") {
|
||||
return result.result;
|
||||
}
|
||||
if (result.status === "failed") {
|
||||
throw new Error(result.error);
|
||||
}
|
||||
await sleep(
|
||||
Math.min(VOICE_CALL_GATEWAY_POLL_INTERVAL_MS, Math.max(1, deadlineMs - Date.now())),
|
||||
);
|
||||
}
|
||||
|
||||
throw new Error("voicecall continue timed out waiting for gateway operation");
|
||||
}
|
||||
|
||||
function resolveMode(input: string): "off" | "serve" | "funnel" {
|
||||
const raw = normalizeOptionalLowercaseString(input) ?? "";
|
||||
if (raw === "serve" || raw === "off") {
|
||||
@@ -252,17 +350,24 @@ function writeGatewayCallId(payload: unknown): void {
|
||||
|
||||
async function initiateCallViaGatewayOrRuntime(params: {
|
||||
ensureRuntime: () => Promise<VoiceCallRuntime>;
|
||||
config: VoiceCallConfig;
|
||||
method: "voicecall.initiate" | "voicecall.start";
|
||||
to?: string;
|
||||
message?: string;
|
||||
mode?: string;
|
||||
}) {
|
||||
const mode = resolveCallMode(params.mode);
|
||||
const gateway = await callVoiceCallGateway(params.method, {
|
||||
...(params.to ? { to: params.to } : {}),
|
||||
...(params.message ? { message: params.message } : {}),
|
||||
...(mode ? { mode } : {}),
|
||||
});
|
||||
const gateway = await callVoiceCallGateway(
|
||||
params.method,
|
||||
{
|
||||
...(params.to ? { to: params.to } : {}),
|
||||
...(params.message ? { message: params.message } : {}),
|
||||
...(mode ? { mode } : {}),
|
||||
},
|
||||
{
|
||||
timeoutMs: resolveGatewayOperationTimeoutMs(params.config),
|
||||
},
|
||||
);
|
||||
if (gateway.ok) {
|
||||
writeGatewayCallId(gateway.payload);
|
||||
return;
|
||||
@@ -355,11 +460,17 @@ export function registerVoiceCallCli(params: {
|
||||
return;
|
||||
}
|
||||
const mode = resolveCallMode(options.mode) ?? "notify";
|
||||
const gateway = await callVoiceCallGateway("voicecall.start", {
|
||||
to: options.to,
|
||||
...(options.message ? { message: options.message } : {}),
|
||||
mode,
|
||||
});
|
||||
const gateway = await callVoiceCallGateway(
|
||||
"voicecall.start",
|
||||
{
|
||||
to: options.to,
|
||||
...(options.message ? { message: options.message } : {}),
|
||||
mode,
|
||||
},
|
||||
{
|
||||
timeoutMs: resolveGatewayOperationTimeoutMs(config),
|
||||
},
|
||||
);
|
||||
let callId: unknown;
|
||||
if (gateway.ok) {
|
||||
callId = isRecord(gateway.payload) ? gateway.payload.callId : undefined;
|
||||
@@ -402,6 +513,7 @@ export function registerVoiceCallCli(params: {
|
||||
.action(async (options: { message: string; to?: string; mode?: string }) => {
|
||||
await initiateCallViaGatewayOrRuntime({
|
||||
ensureRuntime,
|
||||
config,
|
||||
method: "voicecall.initiate",
|
||||
to: options.to,
|
||||
message: options.message,
|
||||
@@ -422,6 +534,7 @@ export function registerVoiceCallCli(params: {
|
||||
.action(async (options: { to: string; message?: string; mode?: string }) => {
|
||||
await initiateCallViaGatewayOrRuntime({
|
||||
ensureRuntime,
|
||||
config,
|
||||
method: "voicecall.start",
|
||||
to: options.to,
|
||||
message: options.message,
|
||||
@@ -435,11 +548,45 @@ export function registerVoiceCallCli(params: {
|
||||
.requiredOption("--call-id <id>", "Call ID")
|
||||
.requiredOption("--message <text>", "Message to speak")
|
||||
.action(async (options: { callId: string; message: string }) => {
|
||||
const gateway = await callVoiceCallGateway("voicecall.continue", {
|
||||
callId: options.callId,
|
||||
message: options.message,
|
||||
});
|
||||
let gateway: VoiceCallGatewayCallResult;
|
||||
try {
|
||||
gateway = await callVoiceCallGateway(
|
||||
"voicecall.continue.start",
|
||||
{
|
||||
callId: options.callId,
|
||||
message: options.message,
|
||||
},
|
||||
{
|
||||
timeoutMs: resolveGatewayOperationTimeoutMs(config),
|
||||
},
|
||||
);
|
||||
} catch (err) {
|
||||
if (!isUnknownGatewayMethod(err, "voicecall.continue.start")) {
|
||||
throw err;
|
||||
}
|
||||
gateway = await callVoiceCallGateway(
|
||||
"voicecall.continue",
|
||||
{
|
||||
callId: options.callId,
|
||||
message: options.message,
|
||||
},
|
||||
{
|
||||
timeoutMs: resolveGatewayContinueTimeoutMs(config),
|
||||
},
|
||||
);
|
||||
}
|
||||
if (gateway.ok) {
|
||||
if (isRecord(gateway.payload) && typeof gateway.payload.operationId === "string") {
|
||||
const result = await pollVoiceCallContinueGateway({
|
||||
operationId: readGatewayOperationId(gateway.payload),
|
||||
timeoutMs: readGatewayPollTimeoutMs(
|
||||
gateway.payload,
|
||||
resolveGatewayContinueTimeoutMs(config),
|
||||
),
|
||||
});
|
||||
writeStdoutJson(result);
|
||||
return;
|
||||
}
|
||||
writeStdoutJson(gateway.payload);
|
||||
return;
|
||||
}
|
||||
|
||||
200
extensions/voice-call/src/gateway-continue-operation.ts
Normal file
200
extensions/voice-call/src/gateway-continue-operation.ts
Normal file
@@ -0,0 +1,200 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import type { VoiceCallConfig } from "./config.js";
|
||||
import type { CoreConfig } from "./core-bridge.js";
|
||||
import type { VoiceCallRuntime } from "./runtime.js";
|
||||
import { TELEPHONY_DEFAULT_TTS_TIMEOUT_MS } from "./telephony-tts.js";
|
||||
|
||||
const VOICE_CALL_CONTINUE_OPERATION_BUFFER_MS = 30000;
|
||||
const VOICE_CALL_CONTINUE_OPERATION_CLEANUP_MS = 5 * 60 * 1000;
|
||||
|
||||
type VoiceCallContinueOperation =
|
||||
| {
|
||||
operationId: string;
|
||||
status: "pending";
|
||||
callId: string;
|
||||
startedAtMs: number;
|
||||
pollTimeoutMs: number;
|
||||
}
|
||||
| {
|
||||
operationId: string;
|
||||
status: "completed";
|
||||
callId: string;
|
||||
startedAtMs: number;
|
||||
completedAtMs: number;
|
||||
pollTimeoutMs: number;
|
||||
result: { success: true; transcript?: string };
|
||||
}
|
||||
| {
|
||||
operationId: string;
|
||||
status: "failed";
|
||||
callId: string;
|
||||
startedAtMs: number;
|
||||
completedAtMs: number;
|
||||
pollTimeoutMs: number;
|
||||
error: string;
|
||||
};
|
||||
|
||||
export type VoiceCallContinueOperationStartPayload = {
|
||||
operationId: string;
|
||||
status: "pending";
|
||||
pollTimeoutMs: number;
|
||||
};
|
||||
|
||||
export type VoiceCallContinueOperationResultPayload =
|
||||
| {
|
||||
operationId: string;
|
||||
status: "pending";
|
||||
pollTimeoutMs: number;
|
||||
}
|
||||
| {
|
||||
operationId: string;
|
||||
status: "completed";
|
||||
result: { success: true; transcript?: string };
|
||||
}
|
||||
| {
|
||||
operationId: string;
|
||||
status: "failed";
|
||||
error: string;
|
||||
};
|
||||
|
||||
export type VoiceCallContinueOperationRequest = {
|
||||
rt: VoiceCallRuntime;
|
||||
callId: string;
|
||||
message: string;
|
||||
};
|
||||
|
||||
export function createVoiceCallContinueOperationStore(params: {
|
||||
config: VoiceCallConfig;
|
||||
coreConfig: CoreConfig;
|
||||
}) {
|
||||
const operations = new Map<string, VoiceCallContinueOperation>();
|
||||
|
||||
const resolvePollTimeoutMs = (rt: VoiceCallRuntime): number => {
|
||||
const ttsTimeoutMs =
|
||||
rt.config.tts?.timeoutMs ??
|
||||
params.config.tts?.timeoutMs ??
|
||||
params.coreConfig.messages?.tts?.timeoutMs ??
|
||||
TELEPHONY_DEFAULT_TTS_TIMEOUT_MS;
|
||||
return (
|
||||
(rt.config.transcriptTimeoutMs ?? params.config.transcriptTimeoutMs) +
|
||||
ttsTimeoutMs +
|
||||
VOICE_CALL_CONTINUE_OPERATION_BUFFER_MS
|
||||
);
|
||||
};
|
||||
|
||||
const scheduleCleanup = (operationId: string) => {
|
||||
const timer = setTimeout(() => {
|
||||
operations.delete(operationId);
|
||||
}, VOICE_CALL_CONTINUE_OPERATION_CLEANUP_MS);
|
||||
timer.unref?.();
|
||||
};
|
||||
|
||||
const start = (
|
||||
request: VoiceCallContinueOperationRequest,
|
||||
): VoiceCallContinueOperationStartPayload => {
|
||||
const operationId = randomUUID();
|
||||
const startedAtMs = Date.now();
|
||||
const pollTimeoutMs = resolvePollTimeoutMs(request.rt);
|
||||
operations.set(operationId, {
|
||||
operationId,
|
||||
status: "pending",
|
||||
callId: request.callId,
|
||||
startedAtMs,
|
||||
pollTimeoutMs,
|
||||
});
|
||||
|
||||
void request.rt.manager
|
||||
.continueCall(request.callId, request.message)
|
||||
.then((result) => {
|
||||
const current = operations.get(operationId);
|
||||
if (!current || current.status !== "pending") {
|
||||
return;
|
||||
}
|
||||
if (!result.success) {
|
||||
operations.set(operationId, {
|
||||
operationId,
|
||||
status: "failed",
|
||||
callId: request.callId,
|
||||
startedAtMs,
|
||||
completedAtMs: Date.now(),
|
||||
pollTimeoutMs,
|
||||
error: result.error || "continue failed",
|
||||
});
|
||||
return;
|
||||
}
|
||||
operations.set(operationId, {
|
||||
operationId,
|
||||
status: "completed",
|
||||
callId: request.callId,
|
||||
startedAtMs,
|
||||
completedAtMs: Date.now(),
|
||||
pollTimeoutMs,
|
||||
result: { success: true, transcript: result.transcript },
|
||||
});
|
||||
})
|
||||
.catch((err) => {
|
||||
const current = operations.get(operationId);
|
||||
if (!current || current.status !== "pending") {
|
||||
return;
|
||||
}
|
||||
operations.set(operationId, {
|
||||
operationId,
|
||||
status: "failed",
|
||||
callId: request.callId,
|
||||
startedAtMs,
|
||||
completedAtMs: Date.now(),
|
||||
pollTimeoutMs,
|
||||
error: formatErrorMessage(err),
|
||||
});
|
||||
})
|
||||
.finally(() => {
|
||||
scheduleCleanup(operationId);
|
||||
});
|
||||
|
||||
return { operationId, status: "pending", pollTimeoutMs };
|
||||
};
|
||||
|
||||
const read = (
|
||||
operationId: string,
|
||||
):
|
||||
| { ok: true; payload: VoiceCallContinueOperationResultPayload }
|
||||
| { ok: false; error: string } => {
|
||||
const operation = operations.get(operationId);
|
||||
if (!operation) {
|
||||
return { ok: false, error: "operation not found" };
|
||||
}
|
||||
if (operation.status === "pending") {
|
||||
return {
|
||||
ok: true,
|
||||
payload: {
|
||||
operationId,
|
||||
status: "pending",
|
||||
pollTimeoutMs: operation.pollTimeoutMs,
|
||||
},
|
||||
};
|
||||
}
|
||||
if (operation.status === "failed") {
|
||||
operations.delete(operationId);
|
||||
return {
|
||||
ok: true,
|
||||
payload: {
|
||||
operationId,
|
||||
status: "failed",
|
||||
error: operation.error,
|
||||
},
|
||||
};
|
||||
}
|
||||
operations.delete(operationId);
|
||||
return {
|
||||
ok: true,
|
||||
payload: {
|
||||
operationId,
|
||||
status: "completed",
|
||||
result: operation.result,
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
return { start, read };
|
||||
}
|
||||
@@ -24,7 +24,7 @@ export type TelephonyTtsProvider = {
|
||||
synthesizeForTelephony: (text: string) => Promise<Buffer>;
|
||||
};
|
||||
|
||||
const TELEPHONY_DEFAULT_TTS_TIMEOUT_MS = 8000;
|
||||
export const TELEPHONY_DEFAULT_TTS_TIMEOUT_MS = 8000;
|
||||
|
||||
export function createTelephonyTtsProvider(params: {
|
||||
coreConfig: CoreConfig;
|
||||
|
||||
Reference in New Issue
Block a user