From fda0141a01b31f023995168b38eaad5d64bef04d Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 25 May 2026 23:13:27 +0100 Subject: [PATCH] Refactor realtime voice turn context tracking (#86650) * refactor: share realtime turn context tracking * chore: track realtime voice sdk api baseline * fix: preserve pruned realtime turn handle state --- CHANGELOG.md | 1 + .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/plugins/sdk-migration.md | 2 +- docs/plugins/sdk-subpaths.md | 2 +- extensions/discord/src/voice/realtime.ts | 129 ++++-------- scripts/lib/plugin-sdk-doc-metadata.ts | 3 + src/plugin-sdk/realtime-voice.ts | 6 + src/talk/turn-context-tracker.test.ts | 124 +++++++++++ src/talk/turn-context-tracker.ts | 194 ++++++++++++++++++ 9 files changed, 369 insertions(+), 96 deletions(-) create mode 100644 src/talk/turn-context-tracker.test.ts create mode 100644 src/talk/turn-context-tracker.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index af23b8dff66..cbd19f9c389 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai ### Changes +- Voice: expose shared realtime turn-context tracking through the realtime voice SDK and reuse it for Discord speaker attribution and wake-name context recovery. - Voice: expose shared realtime consult question matching, speakable-result extraction, and forced-consult coordination through the realtime voice SDK, then reuse it in Gateway Talk, Voice Call, and Discord voice paths. - Voice: share activation-name matching and consult-transcript screening through the realtime voice SDK so Discord, browser voice, and meeting surfaces can reuse one implementation. - Cron: default `cron.maxConcurrentRuns` to 8 so scheduled automations and their isolated agent turns can make progress in parallel without explicit configuration. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 5c7d9c4e775..b2a821d0e94 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -374f1fec7d6fa8c00865dcb58b68d89ec10e85e81ef536c5746167a83d10bcc7 plugin-sdk-api-baseline.json -ffc6a2faf381d1bb118845e010b2798397c3d41fff400f52ee57b6dc197c8af3 plugin-sdk-api-baseline.jsonl +88b179d6ae301fdfbb5104533b66cdac629a76c1afb33faa1548162cee22dc79 plugin-sdk-api-baseline.json +30c981ed0987cc72335f3f16aff264e9a0f1c903b1cc03f347fa97db9468366e plugin-sdk-api-baseline.jsonl diff --git a/docs/plugins/sdk-migration.md b/docs/plugins/sdk-migration.md index d3ae046b5fc..f1906880a6c 100644 --- a/docs/plugins/sdk-migration.md +++ b/docs/plugins/sdk-migration.md @@ -626,7 +626,7 @@ releases. | `plugin-sdk/speech` | Speech helpers | Speech provider types plus provider-facing directive, registry, validation helpers, and OpenAI-compatible TTS builder | | `plugin-sdk/speech-core` | Shared speech core | Speech provider types, registry, directives, normalization | | `plugin-sdk/realtime-transcription` | Realtime transcription helpers | Provider types, registry helpers, and shared WebSocket session helper | - | `plugin-sdk/realtime-voice` | Realtime voice helpers | Provider types, registry/resolution helpers, bridge session helpers, shared agent talk-back queues, active-run voice control, transcript/event health, echo suppression, consult question matching, forced-consult coordination, and fast context consult helpers | + | `plugin-sdk/realtime-voice` | Realtime voice helpers | Provider types, registry/resolution helpers, bridge session helpers, shared agent talk-back queues, active-run voice control, transcript/event health, echo suppression, consult question matching, forced-consult coordination, turn-context tracking, and fast context consult helpers | | `plugin-sdk/image-generation` | Image-generation helpers | Image generation provider types plus image asset/data URL helpers and the OpenAI-compatible image provider builder | | `plugin-sdk/image-generation-core` | Shared image-generation core | Image-generation types, failover, auth, and registry helpers | | `plugin-sdk/music-generation` | Music-generation helpers | Music-generation provider/request/result types | diff --git a/docs/plugins/sdk-subpaths.md b/docs/plugins/sdk-subpaths.md index ddffdb1e02e..1033ba9e4b7 100644 --- a/docs/plugins/sdk-subpaths.md +++ b/docs/plugins/sdk-subpaths.md @@ -331,7 +331,7 @@ focused channel/runtime subpaths, `config-contracts`, `string-coerce-runtime`, | `plugin-sdk/speech-core` | Shared speech provider types, registry, directive, normalization, and speech helper exports | | `plugin-sdk/realtime-transcription` | Realtime transcription provider types, registry helpers, and shared WebSocket session helper | | `plugin-sdk/realtime-bootstrap-context` | Realtime profile bootstrap helper for bounded `IDENTITY.md`, `USER.md`, and `SOUL.md` context injection | - | `plugin-sdk/realtime-voice` | Realtime voice provider types and registry helpers | + | `plugin-sdk/realtime-voice` | Realtime voice provider types, registry helpers, and shared realtime voice behavior helpers | | `plugin-sdk/image-generation` | Image generation provider types plus image asset/data URL helpers and the OpenAI-compatible image provider builder | | `plugin-sdk/image-generation-core` | Shared image-generation types, failover, auth, and registry helpers | | `plugin-sdk/music-generation` | Music generation provider/request/result types | diff --git a/extensions/discord/src/voice/realtime.ts b/extensions/discord/src/voice/realtime.ts index c0682bc7876..f4f0e8acb6c 100644 --- a/extensions/discord/src/voice/realtime.ts +++ b/extensions/discord/src/voice/realtime.ts @@ -7,6 +7,7 @@ import { controlRealtimeVoiceAgentRun, createRealtimeVoiceAgentTalkbackQueue, createRealtimeVoiceBridgeSession, + createRealtimeVoiceTurnContextTracker, matchRealtimeVoiceActivationName, matchRealtimeVoiceConsultQuestions, normalizeSupportedRealtimeVoiceActivationName, @@ -26,6 +27,8 @@ import { type RealtimeVoiceBridgeSession, type RealtimeVoiceProviderConfig, type RealtimeVoiceToolCallEvent, + type RealtimeVoiceTurnContextHandle, + type RealtimeVoiceTurnContextTracker, sortRealtimeVoiceActivationNames, type RealtimeVoiceActivationNameTranscriptResult, } from "openclaw/plugin-sdk/realtime-voice"; @@ -94,18 +97,18 @@ type DiscordRealtimeSpeakerContext = VoiceRealtimeSpeakerContext & { userId: str type DiscordRealtimeVoiceConfig = NonNullable["realtime"]; -type PendingSpeakerTurn = { - context: DiscordRealtimeSpeakerContext; - hasAudio: boolean; +type PendingSpeakerTurnStats = { inputDiscordBytes: number; inputRealtimeBytes: number; inputChunks: number; interruptedPlayback: boolean; - closed: boolean; - startedAt: number; - lastAudioAt?: number; }; +type PendingSpeakerTurn = RealtimeVoiceTurnContextHandle< + DiscordRealtimeSpeakerContext, + PendingSpeakerTurnStats +>; + type PendingAgentProxyConsultContext = { context: DiscordRealtimeSpeakerContext; question: string; @@ -126,11 +129,6 @@ type RecentAgentProxyConsultContext = { result?: RecentAgentProxyConsultResult; }; -type RecentIgnoredWakeNameSpeakerContext = { - context: DiscordRealtimeSpeakerContext; - createdAt: number; -}; - function formatRealtimeLogPreview(text: string): string { const oneLine = text.replace(/\s+/g, " ").trim(); if (oneLine.length <= DISCORD_REALTIME_LOG_PREVIEW_CHARS) { @@ -366,8 +364,15 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { private wakeNames: string[] = []; private pendingAgentProxyConsultContexts: PendingAgentProxyConsultContext[] = []; private recentAgentProxyConsultContexts: RecentAgentProxyConsultContext[] = []; - private recentIgnoredWakeNameSpeakerContext: RecentIgnoredWakeNameSpeakerContext | undefined; - private readonly pendingSpeakerTurns: PendingSpeakerTurn[] = []; + private readonly speakerTurns: RealtimeVoiceTurnContextTracker< + DiscordRealtimeSpeakerContext, + PendingSpeakerTurnStats + > = createRealtimeVoiceTurnContextTracker( + { + limit: DISCORD_REALTIME_PENDING_SPEAKER_CONTEXT_LIMIT, + ignoredContextTtlMs: DISCORD_REALTIME_IGNORED_WAKE_NAME_CONTEXT_TTL_MS, + }, + ); private outputAudioTimestampMs = 0; private outputAudioDiscordBytes = 0; private outputAudioRealtimeBytes = 0; @@ -573,8 +578,7 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { this.clearForcedConsultTimers(); this.pendingAgentProxyConsultContexts = []; this.recentAgentProxyConsultContexts = []; - this.recentIgnoredWakeNameSpeakerContext = undefined; - this.pendingSpeakerTurns.length = 0; + this.speakerTurns.clear(); this.queuedExactSpeechMessages = []; this.exactSpeechResponseActive = false; this.exactSpeechAudioStarted = false; @@ -613,29 +617,25 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { beginSpeakerTurn(context: VoiceRealtimeSpeakerContext, userId: string): VoiceRealtimeSpeakerTurn { this.resetPartialWakeNameTracking(); - const turn: PendingSpeakerTurn = { - context: { ...context, userId }, - hasAudio: false, - inputDiscordBytes: 0, - inputRealtimeBytes: 0, - inputChunks: 0, - interruptedPlayback: false, - closed: false, - startedAt: Date.now(), - }; - this.pendingSpeakerTurns.push(turn); - logger.info( - `discord voice: realtime speaker turn opened guild=${this.params.entry.guildId} channel=${this.params.entry.channelId} user=${userId} speaker=${context.speakerLabel} owner=${context.senderIsOwner} pendingTurns=${this.pendingSpeakerTurns.length}`, + const turn = this.speakerTurns.open( + { ...context, userId }, + { + inputDiscordBytes: 0, + inputRealtimeBytes: 0, + inputChunks: 0, + interruptedPlayback: false, + }, + ); + logger.info( + `discord voice: realtime speaker turn opened guild=${this.params.entry.guildId} channel=${this.params.entry.channelId} user=${userId} speaker=${context.speakerLabel} owner=${context.senderIsOwner} pendingTurns=${this.speakerTurns.size()}`, ); - this.prunePendingSpeakerTurns(); return { sendInputAudio: (discordPcm48kStereo) => this.sendInputAudioForTurn(turn, discordPcm48kStereo), close: () => { this.sendRealtimeTrailingSilenceForTurn(turn); this.logSpeakerTurnClosed(turn); - turn.closed = true; - this.prunePendingSpeakerTurns(); + this.speakerTurns.close(turn); }, }; } @@ -644,13 +644,12 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { if (!this.bridge || this.stopped) { return; } - turn.hasAudio = true; + this.speakerTurns.markAudio(turn); const realtimePcm = convertDiscordPcm48kStereoToRealtimePcm24kMono(discordPcm48kStereo); if (realtimePcm.length > 0) { turn.inputDiscordBytes += discordPcm48kStereo.length; turn.inputRealtimeBytes += realtimePcm.length; turn.inputChunks += 1; - turn.lastAudioAt = Date.now(); if (turn.inputChunks === 1) { logger.info( `discord voice: realtime input audio started guild=${this.params.entry.guildId} channel=${this.params.entry.channelId} user=${turn.context.userId} speaker=${turn.context.speakerLabel} discordBytes=${discordPcm48kStereo.length} realtimeBytes=${realtimePcm.length} outputAudioMs=${Math.floor(this.outputAudioTimestampMs)} outputActive=${this.isOutputAudioActive()}`, @@ -1456,79 +1455,25 @@ export class DiscordRealtimeVoiceSession implements VoiceRealtimeSession { } private consumePendingSpeakerContext(): DiscordRealtimeSpeakerContext | undefined { - this.prunePendingSpeakerTurns(); - this.expireClosedSpeakerTurnsBeforeLaterAudio(); - const index = this.pendingSpeakerTurns.findIndex((turn) => turn.hasAudio); - if (index < 0) { - return undefined; - } - const [turn] = this.pendingSpeakerTurns.splice(index, 1); - this.prunePendingSpeakerTurns(); - return turn?.context; + return this.speakerTurns.consumeAudioContext(); } private rememberIgnoredWakeNameSpeakerContext( context: DiscordRealtimeSpeakerContext | undefined, ): void { - if (!context) { - return; - } - this.recentIgnoredWakeNameSpeakerContext = { - context, - createdAt: Date.now(), - }; + this.speakerTurns.rememberIgnoredContext(context); } private consumeRecentIgnoredWakeNameSpeakerContext(): DiscordRealtimeSpeakerContext | undefined { - const recent = this.recentIgnoredWakeNameSpeakerContext; - this.recentIgnoredWakeNameSpeakerContext = undefined; - if ( - !recent || - Date.now() - recent.createdAt > DISCORD_REALTIME_IGNORED_WAKE_NAME_CONTEXT_TTL_MS - ) { - return undefined; - } - return recent.context; + return this.speakerTurns.consumeIgnoredContext(); } private peekPendingSpeakerTurn(): PendingSpeakerTurn | undefined { - this.prunePendingSpeakerTurns(); - this.expireClosedSpeakerTurnsBeforeLaterAudio(); - return this.pendingSpeakerTurns.find((turn) => turn.hasAudio); + return this.speakerTurns.peekAudioTurn(); } private hasPendingSpeakerAudioContext(): boolean { - this.prunePendingSpeakerTurns(); - this.expireClosedSpeakerTurnsBeforeLaterAudio(); - return this.pendingSpeakerTurns.some((turn) => turn.hasAudio); - } - - private prunePendingSpeakerTurns(): void { - for (let index = this.pendingSpeakerTurns.length - 1; index >= 0; index -= 1) { - const turn = this.pendingSpeakerTurns[index]; - if (turn?.closed && !turn.hasAudio) { - this.pendingSpeakerTurns.splice(index, 1); - } - } - while (this.pendingSpeakerTurns.length > DISCORD_REALTIME_PENDING_SPEAKER_CONTEXT_LIMIT) { - const completedIndex = this.pendingSpeakerTurns.findIndex((turn) => turn.closed); - this.pendingSpeakerTurns.splice(Math.max(completedIndex, 0), 1); - } - } - - private expireClosedSpeakerTurnsBeforeLaterAudio(): void { - let hasLaterAudio = false; - for (let index = this.pendingSpeakerTurns.length - 1; index >= 0; index -= 1) { - const turn = this.pendingSpeakerTurns[index]; - if (!turn?.hasAudio) { - continue; - } - if (turn.closed && hasLaterAudio) { - this.pendingSpeakerTurns.splice(index, 1); - continue; - } - hasLaterAudio = true; - } + return this.speakerTurns.hasAudioContext(); } private rememberRecentAgentProxyConsultContext( diff --git a/scripts/lib/plugin-sdk-doc-metadata.ts b/scripts/lib/plugin-sdk-doc-metadata.ts index 826bea06590..9fabc40e0e8 100644 --- a/scripts/lib/plugin-sdk-doc-metadata.ts +++ b/scripts/lib/plugin-sdk-doc-metadata.ts @@ -104,6 +104,9 @@ export const pluginSdkDocMetadata = { "speech-core": { category: "provider", }, + "realtime-voice": { + category: "provider", + }, "tts-runtime": { category: "runtime", }, diff --git a/src/plugin-sdk/realtime-voice.ts b/src/plugin-sdk/realtime-voice.ts index 666ce999ce2..6ab2068e2bf 100644 --- a/src/plugin-sdk/realtime-voice.ts +++ b/src/plugin-sdk/realtime-voice.ts @@ -84,6 +84,12 @@ export { type RealtimeVoiceForcedConsultNativeRecentOptions, type RealtimeVoiceForcedConsultTimer, } from "../talk/forced-consult-coordinator.js"; +export { + createRealtimeVoiceTurnContextTracker, + type RealtimeVoiceTurnContextHandle, + type RealtimeVoiceTurnContextTracker, + type RealtimeVoiceTurnContextTrackerOptions, +} from "../talk/turn-context-tracker.js"; export { buildRealtimeVoiceAgentConsultChatMessage, buildRealtimeVoiceAgentConsultPolicyInstructions, diff --git a/src/talk/turn-context-tracker.test.ts b/src/talk/turn-context-tracker.test.ts new file mode 100644 index 00000000000..bbe1bc59209 --- /dev/null +++ b/src/talk/turn-context-tracker.test.ts @@ -0,0 +1,124 @@ +import { describe, expect, it } from "vitest"; +import { createRealtimeVoiceTurnContextTracker } from "./turn-context-tracker.js"; + +describe("realtime voice turn context tracker", () => { + it("consumes audio contexts and prunes silent closed turns", () => { + const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>(); + const silent = tracker.open({ id: "silent" }); + const spoken = tracker.open({ id: "spoken" }); + + tracker.close(silent); + tracker.markAudio(spoken); + + expect(tracker.size()).toBe(1); + expect(tracker.consumeAudioContext()).toEqual({ id: "spoken" }); + expect(tracker.consumeAudioContext()).toBeUndefined(); + }); + + it("marks consumed handles closed when callers close them later", () => { + const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>(); + const turn = tracker.open({ id: "speaker" }); + tracker.markAudio(turn); + + expect(tracker.consumeAudioContext()).toEqual({ id: "speaker" }); + tracker.close(turn); + + expect(turn.closed).toBe(true); + }); + + it("ignores handles from another tracker", () => { + const first = createRealtimeVoiceTurnContextTracker<{ id: string }>(); + const second = createRealtimeVoiceTurnContextTracker<{ id: string }>(); + const firstTurn = first.open({ id: "first" }); + + second.markAudio(firstTurn); + second.close(firstTurn); + + expect(firstTurn.hasAudio).toBe(false); + expect(firstTurn.closed).toBe(false); + expect(first.consumeAudioContext()).toBeUndefined(); + }); + + it("drops closed audio turns that are older than later audio", () => { + const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>(); + const older = tracker.open({ id: "older" }); + tracker.markAudio(older); + tracker.close(older); + const later = tracker.open({ id: "later" }); + tracker.markAudio(later); + + expect(tracker.consumeAudioContext()).toEqual({ id: "later" }); + expect(tracker.consumeAudioContext()).toBeUndefined(); + }); + + it("retains caller-owned turn stats on peeked audio turns", () => { + const tracker = createRealtimeVoiceTurnContextTracker< + { id: string }, + { chunks: number; interruptedPlayback: boolean } + >(); + const turn = tracker.open({ id: "speaker" }, { chunks: 0, interruptedPlayback: false }); + + tracker.markAudio(turn); + turn.chunks += 1; + + expect(tracker.peekAudioTurn()).toMatchObject({ + context: { id: "speaker" }, + chunks: 1, + interruptedPlayback: false, + hasAudio: true, + }); + }); + + it("bounds retained turn handles", () => { + const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>({ limit: 2 }); + const first = tracker.open({ id: "first" }); + tracker.markAudio(first); + tracker.close(first); + const second = tracker.open({ id: "second" }); + tracker.markAudio(second); + const third = tracker.open({ id: "third" }); + tracker.markAudio(third); + + expect(tracker.consumeAudioContext()).toEqual({ id: "second" }); + expect(tracker.consumeAudioContext()).toEqual({ id: "third" }); + expect(tracker.consumeAudioContext()).toBeUndefined(); + }); + + it("allows a zero turn limit", () => { + const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>({ limit: 0 }); + const turn = tracker.open({ id: "discarded" }); + + tracker.markAudio(turn); + + expect(tracker.size()).toBe(0); + expect(turn.hasAudio).toBe(true); + expect(tracker.consumeAudioContext()).toBeUndefined(); + }); + + it("consumes recently ignored contexts once before the ttl expires", () => { + let now = 1_000; + const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>({ + ignoredContextTtlMs: 500, + now: () => now, + }); + + tracker.rememberIgnoredContext({ id: "recent" }); + + now = 1_400; + expect(tracker.consumeIgnoredContext()).toEqual({ id: "recent" }); + expect(tracker.consumeIgnoredContext()).toBeUndefined(); + }); + + it("expires ignored contexts after the ttl", () => { + let now = 1_000; + const tracker = createRealtimeVoiceTurnContextTracker<{ id: string }>({ + ignoredContextTtlMs: 500, + now: () => now, + }); + + tracker.rememberIgnoredContext({ id: "old" }); + now = 1_501; + + expect(tracker.consumeIgnoredContext()).toBeUndefined(); + }); +}); diff --git a/src/talk/turn-context-tracker.ts b/src/talk/turn-context-tracker.ts new file mode 100644 index 00000000000..b56e09691b7 --- /dev/null +++ b/src/talk/turn-context-tracker.ts @@ -0,0 +1,194 @@ +const DEFAULT_REALTIME_VOICE_TURN_CONTEXT_LIMIT = 32; +const DEFAULT_REALTIME_VOICE_IGNORED_CONTEXT_TTL_MS = 10_000; + +export type RealtimeVoiceTurnContextTrackerOptions = { + limit?: number; + ignoredContextTtlMs?: number; + now?: () => number; +}; + +export type RealtimeVoiceTurnContextHandle< + TContext, + TExtra extends object = Record, +> = TExtra & { + id: string; + context: TContext; + hasAudio: boolean; + closed: boolean; + startedAt: number; + lastAudioAt?: number; +}; + +type RealtimeVoiceTurnContextOpenArgs = keyof TExtra extends never + ? [extra?: TExtra] + : [extra: TExtra]; + +export type RealtimeVoiceTurnContextTracker< + TContext, + TExtra extends object = Record, +> = { + open( + context: TContext, + ...extra: RealtimeVoiceTurnContextOpenArgs + ): RealtimeVoiceTurnContextHandle; + markAudio(handle: RealtimeVoiceTurnContextHandle): void; + close(handle: RealtimeVoiceTurnContextHandle): void; + consumeAudioContext(): TContext | undefined; + peekAudioTurn(): RealtimeVoiceTurnContextHandle | undefined; + hasAudioContext(): boolean; + rememberIgnoredContext(context: TContext | undefined): void; + consumeIgnoredContext(): TContext | undefined; + size(): number; + clear(): void; +}; + +type RecentIgnoredContext = { + context: TContext; + createdAt: number; +}; + +function normalizeNonNegativeInteger(value: number | undefined, fallback: number): number { + if (value === undefined || !Number.isFinite(value)) { + return fallback; + } + return Math.max(0, Math.floor(value)); +} + +export function createRealtimeVoiceTurnContextTracker< + TContext, + TExtra extends object = Record, +>( + options: RealtimeVoiceTurnContextTrackerOptions = {}, +): RealtimeVoiceTurnContextTracker { + const turns: RealtimeVoiceTurnContextHandle[] = []; + let recentIgnoredContext: RecentIgnoredContext | undefined; + let nextId = 0; + const owner = Symbol("realtimeVoiceTurnContextTracker"); + const now = options.now ?? Date.now; + const limit = normalizeNonNegativeInteger( + options.limit, + DEFAULT_REALTIME_VOICE_TURN_CONTEXT_LIMIT, + ); + const ignoredContextTtlMs = normalizeNonNegativeInteger( + options.ignoredContextTtlMs, + DEFAULT_REALTIME_VOICE_IGNORED_CONTEXT_TTL_MS, + ); + + const prune = () => { + for (let index = turns.length - 1; index >= 0; index -= 1) { + const turn = turns[index]; + if (turn?.closed && !turn.hasAudio) { + turns.splice(index, 1); + } + } + while (turns.length > limit) { + const completedIndex = turns.findIndex((turn) => turn.closed); + turns.splice(Math.max(completedIndex, 0), 1); + } + }; + + const expireClosedTurnsBeforeLaterAudio = () => { + let hasLaterAudio = false; + for (let index = turns.length - 1; index >= 0; index -= 1) { + const turn = turns[index]; + if (!turn?.hasAudio) { + continue; + } + if (turn.closed && hasLaterAudio) { + turns.splice(index, 1); + continue; + } + hasLaterAudio = true; + } + }; + + const prepareForAudioContextRead = () => { + prune(); + expireClosedTurnsBeforeLaterAudio(); + }; + + const owns = (handle: RealtimeVoiceTurnContextHandle) => + ( + handle as RealtimeVoiceTurnContextHandle & { + [owner]?: true; + } + )[owner] === true; + + return { + open(context, ...extra) { + const startedAt = now(); + const handle: RealtimeVoiceTurnContextHandle = { + ...(extra[0] ?? ({} as TExtra)), + [owner]: true, + id: `realtime-turn:${startedAt}:${++nextId}`, + context, + hasAudio: false, + closed: false, + startedAt, + }; + turns.push(handle); + prune(); + return handle; + }, + markAudio(handle) { + if (!owns(handle)) { + return; + } + handle.hasAudio = true; + handle.lastAudioAt = now(); + if (!turns.includes(handle)) { + return; + } + }, + close(handle) { + if (!owns(handle)) { + return; + } + handle.closed = true; + if (!turns.includes(handle)) { + return; + } + prune(); + }, + consumeAudioContext() { + prepareForAudioContextRead(); + const index = turns.findIndex((turn) => turn.hasAudio); + if (index < 0) { + return undefined; + } + const [turn] = turns.splice(index, 1); + prune(); + return turn?.context; + }, + peekAudioTurn() { + prepareForAudioContextRead(); + return turns.find((turn) => turn.hasAudio); + }, + hasAudioContext() { + prepareForAudioContextRead(); + return turns.some((turn) => turn.hasAudio); + }, + rememberIgnoredContext(context) { + if (!context) { + return; + } + recentIgnoredContext = { context, createdAt: now() }; + }, + consumeIgnoredContext() { + const recent = recentIgnoredContext; + recentIgnoredContext = undefined; + if (!recent || now() - recent.createdAt > ignoredContextTtlMs) { + return undefined; + } + return recent.context; + }, + size() { + prune(); + return turns.length; + }, + clear() { + turns.length = 0; + recentIgnoredContext = undefined; + }, + }; +}