mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 16:01:01 +00:00
fix: support Google Meet realtime barge-in (#73834)
Replay #73834 onto current main and preserve provider-side interruption when Google Meet detects a local human barge-in. Thanks @shhtheonlyperson.
This commit is contained in:
@@ -357,6 +357,9 @@ describe("google-meet plugin", () => {
|
||||
"coreaudio",
|
||||
"BlackHole 2ch",
|
||||
],
|
||||
bargeInRmsThreshold: 650,
|
||||
bargeInPeakThreshold: 2500,
|
||||
bargeInCooldownMs: 900,
|
||||
},
|
||||
voiceCall: {
|
||||
enabled: true,
|
||||
@@ -375,6 +378,48 @@ describe("google-meet plugin", () => {
|
||||
expect(resolveGoogleMeetConfig({}).realtime.instructions).toContain("openclaw_agent_consult");
|
||||
});
|
||||
|
||||
it("declares barge-in config metadata in the plugin entry and manifest", () => {
|
||||
const manifest = JSON.parse(
|
||||
readFileSync(new URL("./openclaw.plugin.json", import.meta.url), "utf8"),
|
||||
) as {
|
||||
uiHints?: Record<string, unknown>;
|
||||
configSchema?: {
|
||||
properties?: {
|
||||
chrome?: {
|
||||
properties?: Record<string, unknown>;
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
const entry = plugin as unknown as {
|
||||
configSchema: {
|
||||
uiHints?: Record<string, unknown>;
|
||||
};
|
||||
};
|
||||
|
||||
expect(entry.configSchema.uiHints).toMatchObject({
|
||||
"chrome.bargeInInputCommand": expect.objectContaining({ advanced: true }),
|
||||
"chrome.bargeInRmsThreshold": expect.objectContaining({ advanced: true }),
|
||||
"chrome.bargeInPeakThreshold": expect.objectContaining({ advanced: true }),
|
||||
"chrome.bargeInCooldownMs": expect.objectContaining({ advanced: true }),
|
||||
});
|
||||
expect(manifest.uiHints).toMatchObject({
|
||||
"chrome.bargeInInputCommand": expect.objectContaining({ advanced: true }),
|
||||
"chrome.bargeInRmsThreshold": expect.objectContaining({ advanced: true }),
|
||||
"chrome.bargeInPeakThreshold": expect.objectContaining({ advanced: true }),
|
||||
"chrome.bargeInCooldownMs": expect.objectContaining({ advanced: true }),
|
||||
});
|
||||
expect(manifest.configSchema?.properties?.chrome?.properties).toMatchObject({
|
||||
bargeInInputCommand: expect.objectContaining({
|
||||
type: "array",
|
||||
items: { type: "string" },
|
||||
}),
|
||||
bargeInRmsThreshold: expect.objectContaining({ type: "number", default: 650 }),
|
||||
bargeInPeakThreshold: expect.objectContaining({ type: "number", default: 2500 }),
|
||||
bargeInCooldownMs: expect.objectContaining({ type: "number", default: 900 }),
|
||||
});
|
||||
});
|
||||
|
||||
it("resolves the realtime consult agent id", () => {
|
||||
expect(
|
||||
resolveGoogleMeetConfig({
|
||||
@@ -1345,6 +1390,53 @@ describe("google-meet plugin", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("checks a configured local barge-in command in setup status", async () => {
|
||||
const originalPlatform = process.platform;
|
||||
Object.defineProperty(process, "platform", { value: "darwin" });
|
||||
try {
|
||||
const { tools } = setup(
|
||||
{
|
||||
defaultTransport: "chrome",
|
||||
chrome: {
|
||||
bargeInInputCommand: ["missing-barge-capture"],
|
||||
},
|
||||
},
|
||||
{
|
||||
runCommandWithTimeoutHandler: async (argv) => {
|
||||
if (argv[0] === "/usr/sbin/system_profiler") {
|
||||
return { code: 0, stdout: "BlackHole 2ch", stderr: "" };
|
||||
}
|
||||
if (argv[0] === "/bin/sh" && argv.at(-1) === "missing-barge-capture") {
|
||||
return { code: 1, stdout: "", stderr: "" };
|
||||
}
|
||||
return { code: 0, stdout: "", stderr: "" };
|
||||
},
|
||||
},
|
||||
);
|
||||
const tool = tools[0] as {
|
||||
execute: (
|
||||
id: string,
|
||||
params: unknown,
|
||||
) => Promise<{ details: { ok?: boolean; checks?: unknown[] } }>;
|
||||
};
|
||||
|
||||
const result = await tool.execute("id", { action: "setup_status", transport: "chrome" });
|
||||
|
||||
expect(result.details.ok).toBe(false);
|
||||
expect(result.details.checks).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
id: "chrome-local-audio-commands",
|
||||
ok: false,
|
||||
message: "Chrome audio command missing: missing-barge-capture",
|
||||
}),
|
||||
]),
|
||||
);
|
||||
} finally {
|
||||
Object.defineProperty(process, "platform", { value: originalPlatform });
|
||||
}
|
||||
});
|
||||
|
||||
it("skips local Chrome audio prerequisites for observe-only setup status", async () => {
|
||||
const originalPlatform = process.platform;
|
||||
Object.defineProperty(process, "platform", { value: "darwin" });
|
||||
@@ -2398,6 +2490,7 @@ describe("google-meet plugin", () => {
|
||||
connect: vi.fn(async () => {}),
|
||||
sendAudio,
|
||||
setMediaTimestamp: vi.fn(),
|
||||
handleBargeIn: vi.fn(),
|
||||
submitToolResult: vi.fn(),
|
||||
acknowledgeMark: vi.fn(),
|
||||
close: vi.fn(),
|
||||
@@ -2517,7 +2610,7 @@ describe("google-meet plugin", () => {
|
||||
});
|
||||
expect(sendAudio).toHaveBeenCalledWith(Buffer.from([1, 2, 3]));
|
||||
expect(outputStdinWrites).toEqual([Buffer.from([4, 5])]);
|
||||
expect(outputProcess.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
expect(outputProcess.kill).toHaveBeenCalledWith("SIGKILL");
|
||||
expect(replacementOutputStdinWrites).toEqual([Buffer.from([6, 7])]);
|
||||
outputProcess.emit("error", new Error("stale output process failed after clear"));
|
||||
expect(bridge.close).not.toHaveBeenCalled();
|
||||
@@ -2570,7 +2663,114 @@ describe("google-meet plugin", () => {
|
||||
await handle.stop();
|
||||
expect(bridge.close).toHaveBeenCalled();
|
||||
expect(inputProcess.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
expect(outputProcess.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
expect(replacementOutputProcess.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
});
|
||||
|
||||
it("uses a local barge-in input command to clear active Chrome playback", async () => {
|
||||
let callbacks:
|
||||
| {
|
||||
onAudio: (audio: Buffer) => void;
|
||||
}
|
||||
| undefined;
|
||||
const sendAudio = vi.fn();
|
||||
const bridge = {
|
||||
connect: vi.fn(async () => {}),
|
||||
sendAudio,
|
||||
setMediaTimestamp: vi.fn(),
|
||||
handleBargeIn: vi.fn(),
|
||||
submitToolResult: vi.fn(),
|
||||
acknowledgeMark: vi.fn(),
|
||||
close: vi.fn(),
|
||||
isConnected: vi.fn(() => true),
|
||||
};
|
||||
const provider: RealtimeVoiceProviderPlugin = {
|
||||
id: "openai",
|
||||
label: "OpenAI",
|
||||
autoSelectOrder: 1,
|
||||
resolveConfig: ({ rawConfig }) => rawConfig,
|
||||
isConfigured: () => true,
|
||||
createBridge: (req) => {
|
||||
callbacks = req;
|
||||
return bridge;
|
||||
},
|
||||
};
|
||||
const inputStdout = new PassThrough();
|
||||
const bargeInStdout = new PassThrough();
|
||||
const outputStdin = new Writable({
|
||||
write(_chunk, _encoding, done) {
|
||||
done();
|
||||
},
|
||||
});
|
||||
const replacementOutputStdin = new Writable({
|
||||
write(_chunk, _encoding, done) {
|
||||
done();
|
||||
},
|
||||
});
|
||||
const makeProcess = (stdio: {
|
||||
stdin?: { write(chunk: unknown): unknown } | null;
|
||||
stdout?: { on(event: "data", listener: (chunk: unknown) => void): unknown } | null;
|
||||
}): TestBridgeProcess => {
|
||||
const proc = new EventEmitter() as unknown as TestBridgeProcess;
|
||||
proc.stdin = stdio.stdin;
|
||||
proc.stdout = stdio.stdout;
|
||||
proc.stderr = new PassThrough();
|
||||
proc.killed = false;
|
||||
proc.kill = vi.fn(() => {
|
||||
proc.killed = true;
|
||||
return true;
|
||||
});
|
||||
return proc;
|
||||
};
|
||||
const outputProcess = makeProcess({ stdin: outputStdin, stdout: null });
|
||||
const inputProcess = makeProcess({ stdout: inputStdout, stdin: null });
|
||||
const bargeInProcess = makeProcess({ stdout: bargeInStdout, stdin: null });
|
||||
const replacementOutputProcess = makeProcess({ stdin: replacementOutputStdin, stdout: null });
|
||||
const spawnMock = vi
|
||||
.fn()
|
||||
.mockReturnValueOnce(outputProcess)
|
||||
.mockReturnValueOnce(inputProcess)
|
||||
.mockReturnValueOnce(bargeInProcess)
|
||||
.mockReturnValueOnce(replacementOutputProcess);
|
||||
|
||||
const handle = await startCommandRealtimeAudioBridge({
|
||||
config: resolveGoogleMeetConfig({
|
||||
chrome: {
|
||||
bargeInInputCommand: ["capture-human"],
|
||||
bargeInRmsThreshold: 10,
|
||||
bargeInPeakThreshold: 10,
|
||||
bargeInCooldownMs: 1,
|
||||
},
|
||||
realtime: { provider: "openai", model: "gpt-realtime" },
|
||||
}),
|
||||
fullConfig: {} as never,
|
||||
runtime: {} as never,
|
||||
meetingSessionId: "meet-1",
|
||||
inputCommand: ["capture-meet"],
|
||||
outputCommand: ["play-meet"],
|
||||
logger: noopLogger,
|
||||
providers: [provider],
|
||||
spawn: spawnMock,
|
||||
});
|
||||
|
||||
callbacks?.onAudio(Buffer.alloc(48_000));
|
||||
inputStdout.write(Buffer.from([1, 2, 3, 4]));
|
||||
bargeInStdout.write(Buffer.from([0xff, 0x7f, 0xff, 0x7f]));
|
||||
|
||||
expect(spawnMock).toHaveBeenNthCalledWith(3, "capture-human", [], {
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
expect(bridge.handleBargeIn).toHaveBeenCalled();
|
||||
expect(outputProcess.kill).toHaveBeenCalledWith("SIGKILL");
|
||||
expect(sendAudio).not.toHaveBeenCalledWith(Buffer.from([1, 2, 3, 4]));
|
||||
expect(handle.getHealth()).toMatchObject({
|
||||
clearCount: 1,
|
||||
suppressedInputBytes: 4,
|
||||
});
|
||||
|
||||
await handle.stop();
|
||||
expect(inputProcess.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
expect(bargeInProcess.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
expect(replacementOutputProcess.kill).toHaveBeenCalledWith("SIGTERM");
|
||||
});
|
||||
|
||||
it("pipes paired-node command-pair audio through the realtime provider", async () => {
|
||||
|
||||
@@ -91,6 +91,26 @@ const googleMeetConfigSchema = {
|
||||
help: "Command that reads assistant audio from stdin in chrome.audioFormat.",
|
||||
advanced: true,
|
||||
},
|
||||
"chrome.bargeInInputCommand": {
|
||||
label: "Barge-In Input Command",
|
||||
help: "Optional Gateway-hosted microphone command that writes signed 16-bit little-endian mono PCM for human interruption detection while assistant playback is active.",
|
||||
advanced: true,
|
||||
},
|
||||
"chrome.bargeInRmsThreshold": {
|
||||
label: "Barge-In RMS Threshold",
|
||||
help: "RMS level on chrome.bargeInInputCommand that counts as a human interruption.",
|
||||
advanced: true,
|
||||
},
|
||||
"chrome.bargeInPeakThreshold": {
|
||||
label: "Barge-In Peak Threshold",
|
||||
help: "Peak level on chrome.bargeInInputCommand that counts as a human interruption.",
|
||||
advanced: true,
|
||||
},
|
||||
"chrome.bargeInCooldownMs": {
|
||||
label: "Barge-In Cooldown (ms)",
|
||||
help: "Minimum delay between repeated barge-in clears.",
|
||||
advanced: true,
|
||||
},
|
||||
"chrome.audioBridgeCommand": { label: "Audio Bridge Command", advanced: true },
|
||||
"chrome.audioBridgeHealthCommand": {
|
||||
label: "Audio Bridge Health Command",
|
||||
|
||||
@@ -65,6 +65,26 @@
|
||||
"help": "Command that reads assistant audio from stdin in chrome.audioFormat.",
|
||||
"advanced": true
|
||||
},
|
||||
"chrome.bargeInInputCommand": {
|
||||
"label": "Barge-In Input Command",
|
||||
"help": "Optional Gateway-hosted microphone command that writes signed 16-bit little-endian mono PCM for human interruption detection while assistant playback is active.",
|
||||
"advanced": true
|
||||
},
|
||||
"chrome.bargeInRmsThreshold": {
|
||||
"label": "Barge-In RMS Threshold",
|
||||
"help": "RMS level on chrome.bargeInInputCommand that counts as a human interruption.",
|
||||
"advanced": true
|
||||
},
|
||||
"chrome.bargeInPeakThreshold": {
|
||||
"label": "Barge-In Peak Threshold",
|
||||
"help": "Peak level on chrome.bargeInInputCommand that counts as a human interruption.",
|
||||
"advanced": true
|
||||
},
|
||||
"chrome.bargeInCooldownMs": {
|
||||
"label": "Barge-In Cooldown (ms)",
|
||||
"help": "Minimum delay between repeated barge-in clears.",
|
||||
"advanced": true
|
||||
},
|
||||
"chrome.audioFormat": {
|
||||
"label": "Audio Format",
|
||||
"help": "Command-pair audio format. PCM16 24 kHz is the default Chrome/Meet path; G.711 mu-law 8 kHz remains available for legacy command pairs.",
|
||||
@@ -294,6 +314,24 @@
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"bargeInInputCommand": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"bargeInRmsThreshold": {
|
||||
"type": "number",
|
||||
"default": 650
|
||||
},
|
||||
"bargeInPeakThreshold": {
|
||||
"type": "number",
|
||||
"default": 2500
|
||||
},
|
||||
"bargeInCooldownMs": {
|
||||
"type": "number",
|
||||
"default": 900
|
||||
},
|
||||
"audioBridgeCommand": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
|
||||
@@ -35,6 +35,10 @@ export type GoogleMeetConfig = {
|
||||
waitForInCallMs: number;
|
||||
audioInputCommand?: string[];
|
||||
audioOutputCommand?: string[];
|
||||
bargeInInputCommand?: string[];
|
||||
bargeInRmsThreshold: number;
|
||||
bargeInPeakThreshold: number;
|
||||
bargeInCooldownMs: number;
|
||||
audioBridgeCommand?: string[];
|
||||
audioBridgeHealthCommand?: string[];
|
||||
};
|
||||
@@ -152,6 +156,9 @@ export const LEGACY_GOOGLE_MEET_AUDIO_OUTPUT_COMMAND = [
|
||||
] as const;
|
||||
|
||||
export const DEFAULT_GOOGLE_MEET_CHROME_AUDIO_FORMAT: GoogleMeetChromeAudioFormat = "pcm16-24khz";
|
||||
/** Default for chrome.bargeInRmsThreshold: RMS level on chrome.bargeInInputCommand that counts as a human interruption. */
export const DEFAULT_GOOGLE_MEET_BARGE_IN_RMS_THRESHOLD = 650;
|
||||
/** Default for chrome.bargeInPeakThreshold: peak level on chrome.bargeInInputCommand that counts as a human interruption. */
export const DEFAULT_GOOGLE_MEET_BARGE_IN_PEAK_THRESHOLD = 2500;
|
||||
/** Default for chrome.bargeInCooldownMs: minimum delay, in milliseconds, between repeated barge-in clears. */
export const DEFAULT_GOOGLE_MEET_BARGE_IN_COOLDOWN_MS = 900;
|
||||
|
||||
export const DEFAULT_GOOGLE_MEET_REALTIME_INSTRUCTIONS = `You are joining a private Google Meet as an OpenClaw agent. Keep spoken replies brief and natural. When a question needs deeper reasoning, current information, or tools, call ${REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME} before answering.`;
|
||||
export const DEFAULT_GOOGLE_MEET_REALTIME_INTRO_MESSAGE = "Say exactly: I'm here and listening.";
|
||||
@@ -175,6 +182,9 @@ export const DEFAULT_GOOGLE_MEET_CONFIG: GoogleMeetConfig = {
|
||||
waitForInCallMs: 20_000,
|
||||
audioInputCommand: [...DEFAULT_GOOGLE_MEET_AUDIO_INPUT_COMMAND],
|
||||
audioOutputCommand: [...DEFAULT_GOOGLE_MEET_AUDIO_OUTPUT_COMMAND],
|
||||
bargeInRmsThreshold: DEFAULT_GOOGLE_MEET_BARGE_IN_RMS_THRESHOLD,
|
||||
bargeInPeakThreshold: DEFAULT_GOOGLE_MEET_BARGE_IN_PEAK_THRESHOLD,
|
||||
bargeInCooldownMs: DEFAULT_GOOGLE_MEET_BARGE_IN_COOLDOWN_MS,
|
||||
},
|
||||
chromeNode: {},
|
||||
twilio: {},
|
||||
@@ -411,6 +421,19 @@ export function resolveGoogleMeetConfigWithEnv(
|
||||
audioOutputCommand: configuredAudioOutputCommand ?? [
|
||||
...defaultAudioOutputCommand(audioFormat),
|
||||
],
|
||||
bargeInInputCommand: resolveStringArray(chrome.bargeInInputCommand),
|
||||
bargeInRmsThreshold: resolveNumber(
|
||||
chrome.bargeInRmsThreshold,
|
||||
DEFAULT_GOOGLE_MEET_CONFIG.chrome.bargeInRmsThreshold,
|
||||
),
|
||||
bargeInPeakThreshold: resolveNumber(
|
||||
chrome.bargeInPeakThreshold,
|
||||
DEFAULT_GOOGLE_MEET_CONFIG.chrome.bargeInPeakThreshold,
|
||||
),
|
||||
bargeInCooldownMs: resolveNumber(
|
||||
chrome.bargeInCooldownMs,
|
||||
DEFAULT_GOOGLE_MEET_CONFIG.chrome.bargeInCooldownMs,
|
||||
),
|
||||
audioBridgeCommand: resolveStringArray(chrome.audioBridgeCommand),
|
||||
audioBridgeHealthCommand: resolveStringArray(chrome.audioBridgeHealthCommand),
|
||||
},
|
||||
|
||||
@@ -63,6 +63,23 @@ function splitCommand(argv: string[]): { command: string; args: string[] } {
|
||||
return { command, args };
|
||||
}
|
||||
|
||||
/**
 * Measure the loudness of a chunk of signed 16-bit little-endian PCM audio.
 *
 * Samples are decoded two bytes at a time starting at offset 0; a trailing
 * odd byte that does not form a full sample is ignored.
 *
 * @param audio Raw PCM16LE bytes.
 * @returns `rms`, the root mean square of the decoded sample values, and
 *   `peak`, the largest absolute sample value. Both are 0 when the buffer
 *   holds no complete sample.
 */
function readPcm16Stats(audio: Buffer): { rms: number; peak: number } {
  const sampleCount = Math.floor(audio.byteLength / 2);
  if (sampleCount === 0) {
    return { rms: 0, peak: 0 };
  }
  let sumSquares = 0;
  let peak = 0;
  for (let index = 0; index < sampleCount; index += 1) {
    const sample = audio.readInt16LE(index * 2);
    sumSquares += sample * sample;
    // Math.abs(-32768) is 32768, so peak can exceed the positive int16 maximum.
    peak = Math.max(peak, Math.abs(sample));
  }
  return { rms: Math.sqrt(sumSquares / sampleCount), peak };
}
|
||||
|
||||
export function resolveGoogleMeetRealtimeAudioFormat(config: GoogleMeetConfig) {
|
||||
return config.chrome.audioFormat === "g711-ulaw-8khz"
|
||||
? REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ
|
||||
@@ -117,6 +134,48 @@ export async function startCommandRealtimeAudioBridge(params: {
|
||||
let lastOutputBytes = 0;
|
||||
let lastClearAt: string | undefined;
|
||||
let clearCount = 0;
|
||||
let suppressedInputBytes = 0;
|
||||
let lastSuppressedInputAt: string | undefined;
|
||||
let suppressInputUntil = 0;
|
||||
let lastOutputAtMs = 0;
|
||||
let lastOutputPlayableUntilMs = 0;
|
||||
let bargeInInputProcess: BridgeProcess | undefined;
|
||||
|
||||
// Extend the input-suppression window to cover the estimated playback time of
// an outgoing audio chunk plus a fixed 900 ms tail. The same deadline is
// recorded as the latest moment at which playback may still be audible.
const suppressInputForOutput = (audio: Buffer) => {
  // 8 bytes/ms for the g711-ulaw-8khz format, 48 bytes/ms for any other format.
  const usesUlaw = params.config.chrome.audioFormat === "g711-ulaw-8khz";
  const bytesPerMillisecond = usesUlaw ? 8 : 48;
  const playbackMs = Math.ceil(audio.byteLength / bytesPerMillisecond);
  const deadline = Date.now() + playbackMs + 900;
  // Deadlines only ever move forward; a short chunk never shrinks the window.
  suppressInputUntil = Math.max(suppressInputUntil, deadline);
  lastOutputPlayableUntilMs = Math.max(lastOutputPlayableUntilMs, deadline);
};
|
||||
|
||||
// Ask a bridge child process to stop.
//
// - A process already marked `killed` is left alone unless the caller asks
//   for SIGKILL explicitly.
// - SIGKILL is sent immediately with no follow-up.
// - Any other signal is followed, one second later, by SIGKILL if the process
//   has not emitted `exit` in the meantime.
//
// Errors thrown by `kill` are swallowed on purpose: the process may already
// be gone, and termination is best-effort.
const terminateProcess = (proc: BridgeProcess, signal: NodeJS.Signals = "SIGTERM") => {
  if (proc.killed && signal !== "SIGKILL") {
    return;
  }
  if (signal === "SIGKILL") {
    // No escalation follows a SIGKILL, so there is nothing to track: skip the
    // exit listener that only the grace-period path needs.
    try {
      proc.kill(signal);
    } catch {
      // Process may already have exited.
    }
    return;
  }
  let exited = false;
  let graceTimer: ReturnType<typeof setTimeout> | undefined;
  proc.on("exit", () => {
    exited = true;
    // The process is gone; cancel the pending SIGKILL escalation.
    if (graceTimer !== undefined) {
      clearTimeout(graceTimer);
    }
  });
  try {
    proc.kill(signal);
  } catch {
    return;
  }
  graceTimer = setTimeout(() => {
    if (!exited) {
      try {
        proc.kill("SIGKILL");
      } catch {
        // Process may have exited after the grace check.
      }
    }
  }, 1000);
  // Do not let the grace timer keep the event loop alive.
  graceTimer.unref?.();
};
|
||||
|
||||
const stop = async () => {
|
||||
if (stopped) {
|
||||
@@ -130,8 +189,11 @@ export async function startCommandRealtimeAudioBridge(params: {
|
||||
`[google-meet] realtime voice bridge close ignored: ${formatErrorMessage(error)}`,
|
||||
);
|
||||
}
|
||||
inputProcess.kill("SIGTERM");
|
||||
outputProcess.kill("SIGTERM");
|
||||
terminateProcess(inputProcess);
|
||||
terminateProcess(outputProcess);
|
||||
if (bargeInInputProcess) {
|
||||
terminateProcess(bargeInInputProcess);
|
||||
}
|
||||
};
|
||||
|
||||
const fail = (label: string) => (error: Error) => {
|
||||
@@ -169,10 +231,69 @@ export async function startCommandRealtimeAudioBridge(params: {
|
||||
attachOutputProcessHandlers(outputProcess);
|
||||
clearCount += 1;
|
||||
lastClearAt = new Date().toISOString();
|
||||
suppressInputUntil = 0;
|
||||
lastOutputPlayableUntilMs = 0;
|
||||
params.logger.debug?.(
|
||||
`[google-meet] cleared realtime audio output buffer by restarting playback command`,
|
||||
);
|
||||
previousOutput.kill("SIGTERM");
|
||||
terminateProcess(previousOutput, "SIGKILL");
|
||||
};
|
||||
// Spawn the optional chrome.bargeInInputCommand and watch its stdout for loud
// audio while assistant output is (or was very recently) playing. A loud chunk
// lifts input suppression, notifies the realtime bridge, and clears local
// playback if the bridge did not already trigger a clear. Does nothing when no
// barge-in command is configured.
const startHumanBargeInMonitor = () => {
  const monitorArgv = params.config.chrome.bargeInInputCommand;
  if (!monitorArgv) {
    return;
  }
  const { command: executable, args: executableArgs } = splitCommand(monitorArgv);
  let lastBargeInAt = 0;
  const monitor = spawnFn(executable, executableArgs, {
    stdio: ["ignore", "pipe", "pipe"],
  });
  bargeInInputProcess = monitor;

  monitor.stdout?.on("data", (chunk) => {
    // Nothing to interrupt until the assistant has produced some output.
    if (stopped || lastOutputAtMs === 0) {
      return;
    }
    const now = Date.now();
    const playbackDeadline = Math.max(lastOutputPlayableUntilMs, suppressInputUntil);
    const playbackExpired = now > playbackDeadline;
    const outputIsStale = now - lastOutputAtMs > 1000;
    if (playbackExpired && outputIsStale) {
      return;
    }
    // Rate-limit repeated clears.
    const sinceLastBargeIn = now - lastBargeInAt;
    if (sinceLastBargeIn < params.config.chrome.bargeInCooldownMs) {
      return;
    }
    const pcm = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    const stats = readPcm16Stats(pcm);
    // Either threshold being reached counts as an interruption.
    const tooQuiet =
      stats.rms < params.config.chrome.bargeInRmsThreshold &&
      stats.peak < params.config.chrome.bargeInPeakThreshold;
    if (tooQuiet) {
      return;
    }
    lastBargeInAt = now;
    suppressInputUntil = 0;
    // Compare clearCount before and after notifying the bridge so playback is
    // cleared exactly once whether or not the bridge cleared it.
    const clearsBefore = clearCount;
    bridge?.handleBargeIn({ audioPlaybackActive: true });
    if (clearCount === clearsBefore) {
      clearOutputPlayback();
    }
    const roundedRms = Math.round(stats.rms);
    params.logger.debug?.(
      `[google-meet] human barge-in detected by local input (rms=${roundedRms}, peak=${stats.peak})`,
    );
  });

  monitor.stderr?.on("data", (chunk) => {
    params.logger.debug?.(`[google-meet] barge-in input: ${String(chunk).trim()}`);
  });

  // A failing or exiting monitor is only logged; it does not stop the bridge.
  monitor.on("error", (error) => {
    params.logger.warn(`[google-meet] human barge-in input failed: ${formatErrorMessage(error)}`);
  });
  monitor.on("exit", (code, signal) => {
    if (stopped) {
      return;
    }
    params.logger.debug?.(
      `[google-meet] human barge-in input exited (${code ?? signal ?? "done"})`,
    );
  });
};
|
||||
inputProcess.on("error", fail("audio input command"));
|
||||
inputProcess.on("exit", (code, signal) => {
|
||||
@@ -204,8 +325,10 @@ export async function startCommandRealtimeAudioBridge(params: {
|
||||
audioSink: {
|
||||
isOpen: () => !stopped,
|
||||
sendAudio: (audio) => {
|
||||
lastOutputAtMs = Date.now();
|
||||
lastOutputAt = new Date().toISOString();
|
||||
lastOutputBytes += audio.byteLength;
|
||||
suppressInputForOutput(audio);
|
||||
outputProcess.stdin?.write(audio);
|
||||
},
|
||||
clearAudio: clearOutputPlayback,
|
||||
@@ -256,10 +379,16 @@ export async function startCommandRealtimeAudioBridge(params: {
|
||||
realtimeReady = true;
|
||||
},
|
||||
});
|
||||
startHumanBargeInMonitor();
|
||||
|
||||
inputProcess.stdout?.on("data", (chunk) => {
|
||||
const audio = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
|
||||
if (!stopped && audio.byteLength > 0) {
|
||||
if (Date.now() < suppressInputUntil) {
|
||||
lastSuppressedInputAt = new Date().toISOString();
|
||||
suppressedInputBytes += audio.byteLength;
|
||||
return;
|
||||
}
|
||||
lastInputAt = new Date().toISOString();
|
||||
lastInputBytes += audio.byteLength;
|
||||
bridge?.sendAudio(Buffer.from(audio));
|
||||
@@ -281,8 +410,10 @@ export async function startCommandRealtimeAudioBridge(params: {
|
||||
audioOutputActive: lastOutputBytes > 0,
|
||||
lastInputAt,
|
||||
lastOutputAt,
|
||||
lastSuppressedInputAt,
|
||||
lastInputBytes,
|
||||
lastOutputBytes,
|
||||
suppressedInputBytes,
|
||||
lastClearAt,
|
||||
clearCount,
|
||||
bridgeClosed: stopped,
|
||||
|
||||
@@ -133,7 +133,11 @@ function evaluateSpeechReadiness(session: GoogleMeetSession): {
|
||||
function collectChromeAudioCommands(config: GoogleMeetConfig): string[] {
|
||||
const commands = config.chrome.audioBridgeCommand
|
||||
? [config.chrome.audioBridgeCommand[0]]
|
||||
: [config.chrome.audioInputCommand?.[0], config.chrome.audioOutputCommand?.[0]];
|
||||
: [
|
||||
config.chrome.audioInputCommand?.[0],
|
||||
config.chrome.audioOutputCommand?.[0],
|
||||
config.chrome.bargeInInputCommand?.[0],
|
||||
];
|
||||
return [...new Set(commands.filter((value): value is string => Boolean(value?.trim())))];
|
||||
}
|
||||
|
||||
|
||||
@@ -40,9 +40,11 @@ export type GoogleMeetChromeHealth = {
|
||||
audioOutputActive?: boolean;
|
||||
lastInputAt?: string;
|
||||
lastOutputAt?: string;
|
||||
lastSuppressedInputAt?: string;
|
||||
lastClearAt?: string;
|
||||
lastInputBytes?: number;
|
||||
lastOutputBytes?: number;
|
||||
suppressedInputBytes?: number;
|
||||
consecutiveInputErrors?: number;
|
||||
lastInputError?: string;
|
||||
clearCount?: number;
|
||||
|
||||
@@ -66,6 +66,9 @@ type FakeWebSocketInstance = InstanceType<typeof FakeWebSocket>;
|
||||
type SentRealtimeEvent = {
|
||||
type: string;
|
||||
audio?: string;
|
||||
item_id?: string;
|
||||
content_index?: number;
|
||||
audio_end_ms?: number;
|
||||
session?: {
|
||||
input_audio_format?: string;
|
||||
output_audio_format?: string;
|
||||
@@ -279,4 +282,56 @@ describe("buildOpenAIRealtimeVoiceProvider", () => {
|
||||
expect(socket.terminated).toBe(false);
|
||||
expect(onClose).toHaveBeenCalledWith("completed");
|
||||
});
|
||||
|
||||
it("truncates externally interrupted playback after an immediate mark acknowledgement", async () => {
|
||||
const provider = buildOpenAIRealtimeVoiceProvider();
|
||||
const onAudio = vi.fn();
|
||||
const onClearAudio = vi.fn();
|
||||
let bridge: ReturnType<typeof provider.createBridge>;
|
||||
bridge = provider.createBridge({
|
||||
providerConfig: { apiKey: "sk-test" }, // pragma: allowlist secret
|
||||
onAudio,
|
||||
onClearAudio,
|
||||
onMark: () => bridge.acknowledgeMark(),
|
||||
});
|
||||
const connecting = bridge.connect();
|
||||
const socket = FakeWebSocket.instances[0];
|
||||
if (!socket) {
|
||||
throw new Error("expected bridge to create a websocket");
|
||||
}
|
||||
|
||||
socket.readyState = FakeWebSocket.OPEN;
|
||||
socket.emit("open");
|
||||
await connecting;
|
||||
socket.emit("message", Buffer.from(JSON.stringify({ type: "session.updated" })));
|
||||
|
||||
bridge.setMediaTimestamp(1000);
|
||||
socket.emit(
|
||||
"message",
|
||||
Buffer.from(JSON.stringify({ type: "response.created", response: { id: "resp_1" } })),
|
||||
);
|
||||
socket.emit(
|
||||
"message",
|
||||
Buffer.from(
|
||||
JSON.stringify({
|
||||
type: "response.audio.delta",
|
||||
item_id: "item_1",
|
||||
delta: Buffer.from("assistant audio").toString("base64"),
|
||||
}),
|
||||
),
|
||||
);
|
||||
bridge.setMediaTimestamp(1240);
|
||||
|
||||
bridge.handleBargeIn?.({ audioPlaybackActive: true });
|
||||
|
||||
expect(onAudio).toHaveBeenCalledTimes(1);
|
||||
expect(onClearAudio).toHaveBeenCalledTimes(1);
|
||||
expect(parseSent(socket)).toContainEqual({ type: "response.cancel" });
|
||||
expect(parseSent(socket)).toContainEqual({
|
||||
type: "conversation.item.truncate",
|
||||
item_id: "item_1",
|
||||
content_index: 0,
|
||||
audio_end_ms: 240,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
} from "openclaw/plugin-sdk/proxy-capture";
|
||||
import type {
|
||||
RealtimeVoiceAudioFormat,
|
||||
RealtimeVoiceBargeInOptions,
|
||||
RealtimeVoiceBridge,
|
||||
RealtimeVoiceBrowserSession,
|
||||
RealtimeVoiceBrowserSessionCreateRequest,
|
||||
@@ -77,6 +78,10 @@ type RealtimeEvent = {
|
||||
item_id?: string;
|
||||
call_id?: string;
|
||||
name?: string;
|
||||
response?: {
|
||||
id?: string;
|
||||
status?: string;
|
||||
};
|
||||
error?: unknown;
|
||||
};
|
||||
|
||||
@@ -141,6 +146,7 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
|
||||
private pendingAudio: Buffer[] = [];
|
||||
private markQueue: string[] = [];
|
||||
private responseStartTimestamp: number | null = null;
|
||||
private responseActive = false;
|
||||
private latestMediaTimestamp = 0;
|
||||
private lastAssistantItemId: string | null = null;
|
||||
private toolCallBuffers = new Map<string, { name: string; callId: string; args: string }>();
|
||||
@@ -216,10 +222,6 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
|
||||
return;
|
||||
}
|
||||
this.markQueue.shift();
|
||||
if (this.markQueue.length === 0) {
|
||||
this.responseStartTimestamp = null;
|
||||
this.lastAssistantItemId = null;
|
||||
}
|
||||
}
|
||||
|
||||
close(): void {
|
||||
@@ -483,18 +485,23 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
|
||||
}
|
||||
return;
|
||||
|
||||
case "response.created":
|
||||
this.responseActive = true;
|
||||
return;
|
||||
|
||||
case "response.audio.delta": {
|
||||
if (!event.delta) {
|
||||
return;
|
||||
}
|
||||
const audio = base64ToBuffer(event.delta);
|
||||
this.config.onAudio(audio);
|
||||
if (this.responseStartTimestamp === null) {
|
||||
if (event.item_id && event.item_id !== this.lastAssistantItemId) {
|
||||
this.lastAssistantItemId = event.item_id;
|
||||
this.responseStartTimestamp = this.latestMediaTimestamp;
|
||||
} else if (this.responseStartTimestamp === null) {
|
||||
this.responseStartTimestamp = this.latestMediaTimestamp;
|
||||
}
|
||||
if (event.item_id) {
|
||||
this.lastAssistantItemId = event.item_id;
|
||||
}
|
||||
this.responseActive = true;
|
||||
this.sendMark();
|
||||
return;
|
||||
}
|
||||
@@ -527,6 +534,10 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
|
||||
}
|
||||
return;
|
||||
|
||||
case "response.done":
|
||||
this.responseActive = false;
|
||||
return;
|
||||
|
||||
case "response.function_call_arguments.delta": {
|
||||
const key = event.item_id ?? "unknown";
|
||||
const existing = this.toolCallBuffers.get(key);
|
||||
@@ -576,21 +587,29 @@ class OpenAIRealtimeVoiceBridge implements RealtimeVoiceBridge {
|
||||
}
|
||||
}
|
||||
|
||||
private handleBargeIn(): void {
|
||||
if (this.markQueue.length > 0 && this.responseStartTimestamp !== null) {
|
||||
const elapsedMs = this.latestMediaTimestamp - this.responseStartTimestamp;
|
||||
if (this.lastAssistantItemId) {
|
||||
this.sendEvent({
|
||||
type: "conversation.item.truncate",
|
||||
item_id: this.lastAssistantItemId,
|
||||
content_index: 0,
|
||||
audio_end_ms: Math.max(0, elapsedMs),
|
||||
});
|
||||
}
|
||||
handleBargeIn(options?: RealtimeVoiceBargeInOptions): void {
|
||||
const assistantItemId = this.lastAssistantItemId;
|
||||
const responseStartTimestamp = this.responseStartTimestamp;
|
||||
const shouldInterruptProvider =
|
||||
responseStartTimestamp !== null &&
|
||||
assistantItemId !== null &&
|
||||
(this.markQueue.length > 0 || options?.audioPlaybackActive === true);
|
||||
if (options?.audioPlaybackActive === true && this.responseActive) {
|
||||
this.sendEvent({ type: "response.cancel" });
|
||||
}
|
||||
if (shouldInterruptProvider) {
|
||||
const elapsedMs = this.latestMediaTimestamp - responseStartTimestamp;
|
||||
this.sendEvent({
|
||||
type: "conversation.item.truncate",
|
||||
item_id: assistantItemId,
|
||||
content_index: 0,
|
||||
audio_end_ms: Math.max(0, elapsedMs),
|
||||
});
|
||||
this.config.onClearAudio();
|
||||
this.markQueue = [];
|
||||
this.lastAssistantItemId = null;
|
||||
this.responseStartTimestamp = null;
|
||||
this.responseActive = false;
|
||||
return;
|
||||
}
|
||||
this.config.onClearAudio();
|
||||
|
||||
Reference in New Issue
Block a user