feat(android): wire TalkModeManager into NodeRuntime for voice screen TTS

TalkModeManager is instantiated lazily in NodeRuntime and drives ElevenLabs
streaming TTS for all assistant responses when the voice screen is active.
MicCaptureManager continues to own STT and chat.send; TalkModeManager is
TTS-only (ttsOnAllResponses = true, setEnabled never called).

- talkMode.ttsOnAllResponses = true when mic is enabled or voice screen active
- Barge-in: tapping the mic button calls stopTts() before re-enabling mic
- Lifecycle: PostOnboardingTabs LaunchedEffect + VoiceTabScreen onDispose both
  call setVoiceScreenActive(false) so TTS stops cleanly on tab switch or
  app backgrounding
- applyMainSessionKey wires the session key into TalkModeManager so it
  subscribes to the correct chat session for TTS
This commit is contained in:
Greg Mousseau
2026-02-28 01:32:45 -05:00
committed by Ayaan Zaidi
parent f0fcecd7c1
commit 68db055f1a
6 changed files with 581 additions and 32 deletions

1
.gitignore vendored
View File

@@ -120,3 +120,4 @@ dist/protocol.schema.json
# Synthing # Synthing
**/.stfolder/ **/.stfolder/
apps/android/.kotlin/

View File

@@ -130,6 +130,10 @@ class MainViewModel(app: Application) : AndroidViewModel(app) {
runtime.setCanvasDebugStatusEnabled(value) runtime.setCanvasDebugStatusEnabled(value)
} }
fun setVoiceScreenActive(active: Boolean) {
runtime.setVoiceScreenActive(active)
}
fun setMicEnabled(enabled: Boolean) { fun setMicEnabled(enabled: Boolean) {
runtime.setMicEnabled(enabled) runtime.setMicEnabled(enabled)
} }

View File

@@ -387,11 +387,22 @@ class NodeRuntime(context: Context) {
val micIsSending: StateFlow<Boolean> val micIsSending: StateFlow<Boolean>
get() = micCapture.isSending get() = micCapture.isSending
private val talkMode: TalkModeManager by lazy {
TalkModeManager(
context = appContext,
scope = scope,
session = operatorSession,
supportsChatSubscribe = true,
isConnected = { operatorConnected },
)
}
private fun applyMainSessionKey(candidate: String?) { private fun applyMainSessionKey(candidate: String?) {
val trimmed = normalizeMainKey(candidate) ?: return val trimmed = normalizeMainKey(candidate) ?: return
if (isCanonicalMainSessionKey(_mainSessionKey.value)) return if (isCanonicalMainSessionKey(_mainSessionKey.value)) return
if (_mainSessionKey.value == trimmed) return if (_mainSessionKey.value == trimmed) return
_mainSessionKey.value = trimmed _mainSessionKey.value = trimmed
talkMode.setMainSessionKey(trimmed)
chat.applyMainSessionKey(trimmed) chat.applyMainSessionKey(trimmed)
} }
@@ -529,7 +540,14 @@ class NodeRuntime(context: Context) {
scope.launch { scope.launch {
prefs.talkEnabled.collect { enabled -> prefs.talkEnabled.collect { enabled ->
// MicCaptureManager handles STT + send to gateway.
// TalkModeManager plays TTS on assistant responses.
micCapture.setMicEnabled(enabled) micCapture.setMicEnabled(enabled)
if (enabled) {
// Mic on = user is on voice screen and wants TTS responses.
talkMode.ttsOnAllResponses = true
scope.launch { talkMode.ensureChatSubscribed() }
}
externalAudioCaptureActive.value = enabled externalAudioCaptureActive.value = enabled
} }
} }
@@ -637,8 +655,25 @@ class NodeRuntime(context: Context) {
prefs.setCanvasDebugStatusEnabled(value) prefs.setCanvasDebugStatusEnabled(value)
} }
fun setVoiceScreenActive(active: Boolean) {
if (!active) {
// User left voice screen — stop mic and TTS
talkMode.ttsOnAllResponses = false
talkMode.stopTts()
micCapture.setMicEnabled(false)
prefs.setTalkEnabled(false)
}
// Don't re-enable on active=true; mic toggle drives that
}
fun setMicEnabled(value: Boolean) { fun setMicEnabled(value: Boolean) {
prefs.setTalkEnabled(value) prefs.setTalkEnabled(value)
if (value) {
// Tapping mic on interrupts any active TTS (barge-in)
talkMode.stopTts()
talkMode.ttsOnAllResponses = true
scope.launch { talkMode.ensureChatSubscribed() }
}
micCapture.setMicEnabled(value) micCapture.setMicEnabled(value)
externalAudioCaptureActive.value = value externalAudioCaptureActive.value = value
} }
@@ -834,6 +869,7 @@ class NodeRuntime(context: Context) {
private fun handleGatewayEvent(event: String, payloadJson: String?) { private fun handleGatewayEvent(event: String, payloadJson: String?) {
micCapture.handleGatewayEvent(event, payloadJson) micCapture.handleGatewayEvent(event, payloadJson)
talkMode.handleGatewayEvent(event, payloadJson)
chat.handleGatewayEvent(event, payloadJson) chat.handleGatewayEvent(event, payloadJson)
} }

View File

@@ -31,6 +31,7 @@ import androidx.compose.material3.Surface
import androidx.compose.material3.Text import androidx.compose.material3.Text
import androidx.compose.runtime.Composable import androidx.compose.runtime.Composable
import androidx.compose.runtime.collectAsState import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.LaunchedEffect
import androidx.compose.runtime.getValue import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember import androidx.compose.runtime.remember
@@ -68,6 +69,11 @@ private enum class StatusVisual {
fun PostOnboardingTabs(viewModel: MainViewModel, modifier: Modifier = Modifier) { fun PostOnboardingTabs(viewModel: MainViewModel, modifier: Modifier = Modifier) {
var activeTab by rememberSaveable { mutableStateOf(HomeTab.Connect) } var activeTab by rememberSaveable { mutableStateOf(HomeTab.Connect) }
// Stop TTS when user navigates away from voice tab
LaunchedEffect(activeTab) {
viewModel.setVoiceScreenActive(activeTab == HomeTab.Voice)
}
val statusText by viewModel.statusText.collectAsState() val statusText by viewModel.statusText.collectAsState()
val isConnected by viewModel.isConnected.collectAsState() val isConnected by viewModel.isConnected.collectAsState()

View File

@@ -101,7 +101,11 @@ fun VoiceTabScreen(viewModel: MainViewModel) {
} }
} }
lifecycleOwner.lifecycle.addObserver(observer) lifecycleOwner.lifecycle.addObserver(observer)
onDispose { lifecycleOwner.lifecycle.removeObserver(observer) } onDispose {
lifecycleOwner.lifecycle.removeObserver(observer)
// Stop TTS when leaving the voice screen
viewModel.setVoiceScreenActive(false)
}
} }
val requestMicPermission = val requestMicPermission =

View File

@@ -5,6 +5,7 @@ import android.content.Context
import android.content.Intent import android.content.Intent
import android.content.pm.PackageManager import android.content.pm.PackageManager
import android.media.AudioAttributes import android.media.AudioAttributes
import android.media.AudioFocusRequest
import android.media.AudioFormat import android.media.AudioFormat
import android.media.AudioManager import android.media.AudioManager
import android.media.AudioTrack import android.media.AudioTrack
@@ -23,16 +24,19 @@ import androidx.core.content.ContextCompat
import ai.openclaw.android.gateway.GatewaySession import ai.openclaw.android.gateway.GatewaySession
import ai.openclaw.android.isCanonicalMainSessionKey import ai.openclaw.android.isCanonicalMainSessionKey
import ai.openclaw.android.normalizeMainKey import ai.openclaw.android.normalizeMainKey
import android.os.Build
import java.io.File
import java.net.HttpURLConnection import java.net.HttpURLConnection
import java.net.URL import java.net.URL
import java.util.UUID import java.util.UUID
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@@ -56,7 +60,12 @@ class TalkModeManager(
private const val tag = "TalkMode" private const val tag = "TalkMode"
private const val defaultModelIdFallback = "eleven_v3" private const val defaultModelIdFallback = "eleven_v3"
private const val defaultOutputFormatFallback = "pcm_24000" private const val defaultOutputFormatFallback = "pcm_24000"
private const val defaultTalkProvider = "elevenlabs" private const val defaultTalkProvider = "elevenlabs"
private const val silenceWindowMs = 500L
private const val listenWatchdogMs = 12_000L
private const val chatFinalWaitWithSubscribeMs = 45_000L
private const val chatFinalWaitWithoutSubscribeMs = 6_000L
private const val maxCachedRunCompletions = 128
internal data class TalkProviderConfigSelection( internal data class TalkProviderConfigSelection(
val provider: String, val provider: String,
@@ -140,26 +149,55 @@ class TalkModeManager(
private var defaultOutputFormat: String? = null private var defaultOutputFormat: String? = null
private var apiKey: String? = null private var apiKey: String? = null
private var voiceAliases: Map<String, String> = emptyMap() private var voiceAliases: Map<String, String> = emptyMap()
private var interruptOnSpeech: Boolean = true // Interrupt-on-speech is disabled by default: starting a SpeechRecognizer during
// TTS creates an audio session conflict on OxygenOS/OnePlus that causes AudioTrack
// write to return 0 and MediaPlayer to error. Can be enabled via gateway talk config.
private var activeProviderIsElevenLabs: Boolean = true
private var interruptOnSpeech: Boolean = false
private var voiceOverrideActive = false private var voiceOverrideActive = false
private var modelOverrideActive = false private var modelOverrideActive = false
private var mainSessionKey: String = "main" private var mainSessionKey: String = "main"
private var pendingRunId: String? = null @Volatile private var pendingRunId: String? = null
private var pendingFinal: CompletableDeferred<Boolean>? = null private var pendingFinal: CompletableDeferred<Boolean>? = null
private val completedRunsLock = Any()
private val completedRunStates = LinkedHashMap<String, Boolean>()
private val completedRunTexts = LinkedHashMap<String, String>()
private var chatSubscribedSessionKey: String? = null private var chatSubscribedSessionKey: String? = null
private var configLoaded = false private var configLoaded = false
@Volatile private var playbackEnabled = true @Volatile private var playbackEnabled = true
private val playbackGeneration = AtomicLong(0L) private val playbackGeneration = AtomicLong(0L)
private var ttsJob: Job? = null
private var player: MediaPlayer? = null private var player: MediaPlayer? = null
private var streamingSource: StreamingMediaDataSource? = null private var streamingSource: StreamingMediaDataSource? = null
private var pcmTrack: AudioTrack? = null private var pcmTrack: AudioTrack? = null
@Volatile private var pcmStopRequested = false @Volatile private var pcmStopRequested = false
@Volatile private var finalizeInFlight = false
private var listenWatchdogJob: Job? = null
private var systemTts: TextToSpeech? = null private var systemTts: TextToSpeech? = null
private var systemTtsPending: CompletableDeferred<Unit>? = null private var systemTtsPending: CompletableDeferred<Unit>? = null
private var systemTtsPendingId: String? = null private var systemTtsPendingId: String? = null
private var audioFocusRequest: AudioFocusRequest? = null
private val audioFocusListener = AudioManager.OnAudioFocusChangeListener { focusChange ->
when (focusChange) {
AudioManager.AUDIOFOCUS_LOSS,
AudioManager.AUDIOFOCUS_LOSS_TRANSIENT -> {
if (_isSpeaking.value) {
Log.d(tag, "audio focus lost; stopping TTS")
stopSpeaking(resetInterrupt = true)
}
}
else -> { /* regained or duck — ignore */ }
}
}
suspend fun ensureChatSubscribed() {
reloadConfig()
subscribeChatIfNeeded(session = session, sessionKey = mainSessionKey.ifBlank { "main" })
}
fun setMainSessionKey(sessionKey: String?) { fun setMainSessionKey(sessionKey: String?) {
val trimmed = sessionKey?.trim().orEmpty() val trimmed = sessionKey?.trim().orEmpty()
if (trimmed.isEmpty()) return if (trimmed.isEmpty()) return
@@ -179,10 +217,174 @@ class TalkModeManager(
} }
} }
/**
* Speak a wake-word command through TalkMode's full pipeline:
* chat.send → wait for final → read assistant text → TTS.
* Calls [onComplete] when done so the caller can disable TalkMode and re-arm VoiceWake.
*/
fun speakWakeCommand(command: String, onComplete: () -> Unit) {
scope.launch {
try {
reloadConfig()
subscribeChatIfNeeded(session = session, sessionKey = mainSessionKey.ifBlank { "main" })
val startedAt = System.currentTimeMillis().toDouble() / 1000.0
val prompt = buildPrompt(command)
val runId = sendChat(prompt, session)
val ok = waitForChatFinal(runId)
val assistant = consumeRunText(runId)
?: waitForAssistantText(session, startedAt, if (ok) 12_000 else 25_000)
if (!assistant.isNullOrBlank()) {
val playbackToken = playbackGeneration.incrementAndGet()
_statusText.value = "Speaking…"
playAssistant(assistant, playbackToken)
} else {
_statusText.value = "No reply"
}
} catch (err: Throwable) {
Log.w(tag, "speakWakeCommand failed: ${err.message}")
}
onComplete()
}
}
/** When true, play TTS for all final chat responses (even ones we didn't initiate). */
@Volatile var ttsOnAllResponses = false
// Streaming TTS: active session keyed by runId
private var streamingTts: ElevenLabsStreamingTts? = null
private var streamingFullText: String = ""
@Volatile private var lastHandledStreamingRunId: String? = null
private var drainingTts: ElevenLabsStreamingTts? = null
private fun stopActiveStreamingTts() {
streamingTts?.stop()
streamingTts = null
drainingTts?.stop()
drainingTts = null
streamingFullText = ""
}
/** Handle agent stream events — only speak assistant text, not tool calls or thinking. */
private fun handleAgentStreamEvent(payloadJson: String?) {
if (payloadJson.isNullOrBlank()) return
val payload = try {
json.parseToJsonElement(payloadJson).asObjectOrNull()
} catch (_: Throwable) { null } ?: return
// Only speak events for the active session — prevents TTS leaking from
// concurrent sessions/channels (privacy + correctness).
val eventSession = payload["sessionKey"]?.asStringOrNull()
val activeSession = mainSessionKey.ifBlank { "main" }
if (eventSession != null && eventSession != activeSession) return
val stream = payload["stream"]?.asStringOrNull() ?: return
if (stream != "assistant") return // Only speak assistant text
val data = payload["data"]?.asObjectOrNull() ?: return
if (data["type"]?.asStringOrNull() == "thinking") return // Skip thinking tokens
val text = data["text"]?.asStringOrNull()?.trim() ?: return
if (text.isEmpty()) return
if (!playbackEnabled) {
stopActiveStreamingTts()
return
}
// Start streaming session if not already active
if (streamingTts == null) {
if (!activeProviderIsElevenLabs) return // Non-ElevenLabs provider — skip streaming TTS
val voiceId = currentVoiceId ?: defaultVoiceId
val apiKey = this.apiKey
if (voiceId == null || apiKey == null) {
Log.w(tag, "streaming TTS: missing voiceId or apiKey")
return
}
val modelId = currentModelId ?: defaultModelId ?: ""
val streamModel = if (ElevenLabsStreamingTts.supportsStreaming(modelId)) {
modelId
} else {
"eleven_flash_v2_5"
}
val tts = ElevenLabsStreamingTts(
scope = scope,
voiceId = voiceId,
apiKey = apiKey,
modelId = streamModel,
outputFormat = "pcm_24000",
sampleRate = 24000,
)
streamingTts = tts
streamingFullText = ""
_isSpeaking.value = true
_statusText.value = "Speaking…"
tts.start()
Log.d(tag, "streaming TTS started for agent assistant text")
lastHandledStreamingRunId = null // will be set on final
}
val accepted = streamingTts?.sendText(text) ?: false
if (!accepted && streamingTts != null) {
Log.d(tag, "text diverged, restarting streaming TTS")
streamingTts?.stop()
streamingTts = null
// Restart with the new text
val voiceId2 = currentVoiceId ?: defaultVoiceId
val apiKey2 = this.apiKey
if (voiceId2 != null && apiKey2 != null) {
val modelId2 = currentModelId ?: defaultModelId ?: ""
val streamModel2 = if (ElevenLabsStreamingTts.supportsStreaming(modelId2)) modelId2 else "eleven_flash_v2_5"
val newTts = ElevenLabsStreamingTts(
scope = scope, voiceId = voiceId2, apiKey = apiKey2,
modelId = streamModel2, outputFormat = "pcm_24000", sampleRate = 24000,
)
streamingTts = newTts
streamingFullText = text
newTts.start()
newTts.sendText(streamingFullText)
Log.d(tag, "streaming TTS restarted with new text")
}
}
}
/** Called when chat final/error/aborted arrives — finish any active streaming TTS. */
private fun finishStreamingTts() {
streamingFullText = ""
val tts = streamingTts ?: return
// Null out immediately so the next response creates a fresh TTS instance.
// The drain coroutine below holds a reference to this instance for cleanup.
streamingTts = null
drainingTts = tts
tts.finish()
scope.launch {
delay(500)
while (tts.isPlaying.value) { delay(200) }
if (drainingTts === tts) drainingTts = null
_isSpeaking.value = false
_statusText.value = "Ready"
}
}
fun playTtsForText(text: String) {
val playbackToken = playbackGeneration.incrementAndGet()
ttsJob?.cancel()
ttsJob = scope.launch {
reloadConfig()
ensurePlaybackActive(playbackToken)
_isSpeaking.value = true
_statusText.value = "Speaking…"
playAssistant(text, playbackToken)
ttsJob = null
}
}
fun handleGatewayEvent(event: String, payloadJson: String?) { fun handleGatewayEvent(event: String, payloadJson: String?) {
if (ttsOnAllResponses) {
Log.d(tag, "gateway event: $event")
}
if (event == "agent" && ttsOnAllResponses) {
handleAgentStreamEvent(payloadJson)
return
}
if (event != "chat") return if (event != "chat") return
if (payloadJson.isNullOrBlank()) return if (payloadJson.isNullOrBlank()) return
val pending = pendingRunId ?: return
val obj = val obj =
try { try {
json.parseToJsonElement(payloadJson).asObjectOrNull() json.parseToJsonElement(payloadJson).asObjectOrNull()
@@ -190,13 +392,68 @@ class TalkModeManager(
null null
} ?: return } ?: return
val runId = obj["runId"].asStringOrNull() ?: return val runId = obj["runId"].asStringOrNull() ?: return
if (runId != pending) return
val state = obj["state"].asStringOrNull() ?: return val state = obj["state"].asStringOrNull() ?: return
if (state == "final") {
pendingFinal?.complete(true) // Only speak events for the active session — prevents TTS from other
pendingFinal = null // sessions/channels leaking into voice mode (privacy + correctness).
pendingRunId = null val eventSession = obj["sessionKey"]?.asStringOrNull()
val activeSession = mainSessionKey.ifBlank { "main" }
if (eventSession != null && eventSession != activeSession) return
// If this is a response we initiated, handle normally below.
// Otherwise, if ttsOnAllResponses, finish streaming TTS on terminal events.
val pending = pendingRunId
if (pending == null || runId != pending) {
if (ttsOnAllResponses && state in listOf("final", "error", "aborted")) {
// Skip if we already handled TTS for this run (multiple final events
// can arrive on different threads for the same run).
if (lastHandledStreamingRunId == runId) {
if (pending == null || runId != pending) return
}
lastHandledStreamingRunId = runId
val stts = streamingTts
if (stts != null) {
// Finish streaming and let the drain coroutine handle playback completion.
// Don't check hasReceivedAudio synchronously — audio may still be in flight
// from the WebSocket (EOS was just sent). The drain coroutine in finishStreamingTts
// waits for playback to complete; if ElevenLabs truly fails, the user just won't
// hear anything (silent failure is better than double-speaking with system TTS).
finishStreamingTts()
} else if (state == "final") {
// No streaming was active — fall back to non-streaming
val text = extractTextFromChatEventMessage(obj["message"])
if (!text.isNullOrBlank()) {
playTtsForText(text)
}
}
}
if (pending == null || runId != pending) return
} }
Log.d(tag, "chat event arrived runId=$runId state=$state pendingRunId=$pendingRunId")
val terminal =
when (state) {
"final" -> true
"aborted", "error" -> false
else -> null
} ?: return
// Cache text from final event so we never need to poll chat.history
if (terminal) {
val text = extractTextFromChatEventMessage(obj["message"])
if (!text.isNullOrBlank()) {
synchronized(completedRunsLock) {
completedRunTexts[runId] = text
while (completedRunTexts.size > maxCachedRunCompletions) {
completedRunTexts.entries.firstOrNull()?.let { completedRunTexts.remove(it.key) }
}
}
}
}
cacheRunCompletion(runId, terminal)
if (runId != pendingRunId) return
pendingFinal?.complete(terminal)
pendingFinal = null
pendingRunId = null
} }
fun setPlaybackEnabled(enabled: Boolean) { fun setPlaybackEnabled(enabled: Boolean) {
@@ -204,6 +461,7 @@ class TalkModeManager(
playbackEnabled = enabled playbackEnabled = enabled
if (!enabled) { if (!enabled) {
playbackGeneration.incrementAndGet() playbackGeneration.incrementAndGet()
stopActiveStreamingTts()
stopSpeaking() stopSpeaking()
} }
} }
@@ -258,6 +516,7 @@ class TalkModeManager(
private fun stop() { private fun stop() {
stopRequested = true stopRequested = true
finalizeInFlight = false
listeningMode = false listeningMode = false
restartJob?.cancel() restartJob?.cancel()
restartJob = null restartJob = null
@@ -270,6 +529,13 @@ class TalkModeManager(
stopSpeaking() stopSpeaking()
_usingFallbackTts.value = false _usingFallbackTts.value = false
chatSubscribedSessionKey = null chatSubscribedSessionKey = null
pendingRunId = null
pendingFinal?.cancel()
pendingFinal = null
synchronized(completedRunsLock) {
completedRunStates.clear()
completedRunTexts.clear()
}
mainHandler.post { mainHandler.post {
recognizer?.cancel() recognizer?.cancel()
@@ -290,6 +556,10 @@ class TalkModeManager(
putExtra(RecognizerIntent.EXTRA_PARTIAL_RESULTS, true) putExtra(RecognizerIntent.EXTRA_PARTIAL_RESULTS, true)
putExtra(RecognizerIntent.EXTRA_MAX_RESULTS, 3) putExtra(RecognizerIntent.EXTRA_MAX_RESULTS, 3)
putExtra(RecognizerIntent.EXTRA_CALLING_PACKAGE, context.packageName) putExtra(RecognizerIntent.EXTRA_CALLING_PACKAGE, context.packageName)
// Use cloud recognition — it handles natural speech and pauses better
// than on-device, which cuts off aggressively after short silences.
putExtra(RecognizerIntent.EXTRA_SPEECH_INPUT_COMPLETE_SILENCE_LENGTH_MILLIS, 2500L)
putExtra(RecognizerIntent.EXTRA_SPEECH_INPUT_POSSIBLY_COMPLETE_SILENCE_LENGTH_MILLIS, 1800L)
} }
if (markListening) { if (markListening) {
@@ -309,8 +579,8 @@ class TalkModeManager(
if (stopRequested) return@post if (stopRequested) return@post
try { try {
recognizer?.cancel() recognizer?.cancel()
val shouldListen = listeningMode val shouldListen = listeningMode && !finalizeInFlight
val shouldInterrupt = _isSpeaking.value && interruptOnSpeech val shouldInterrupt = _isSpeaking.value && interruptOnSpeech && shouldAllowSpeechInterrupt()
if (!shouldListen && !shouldInterrupt) return@post if (!shouldListen && !shouldInterrupt) return@post
startListeningInternal(markListening = shouldListen) startListeningInternal(markListening = shouldListen)
} catch (_: Throwable) { } catch (_: Throwable) {
@@ -338,6 +608,9 @@ class TalkModeManager(
if (isFinal) { if (isFinal) {
lastTranscript = trimmed lastTranscript = trimmed
// Don't finalize immediately — let the silence monitor trigger after
// silenceWindowMs. This allows the recognizer to fire onResults and
// still give the user a natural pause before we send.
} }
} }
@@ -359,7 +632,15 @@ class TalkModeManager(
val lastHeard = lastHeardAtMs ?: return val lastHeard = lastHeardAtMs ?: return
val elapsed = SystemClock.elapsedRealtime() - lastHeard val elapsed = SystemClock.elapsedRealtime() - lastHeard
if (elapsed < silenceWindowMs) return if (elapsed < silenceWindowMs) return
scope.launch { finalizeTranscript(transcript) } if (finalizeInFlight) return
finalizeInFlight = true
scope.launch {
try {
finalizeTranscript(transcript)
} finally {
finalizeInFlight = false
}
}
} }
private suspend fun finalizeTranscript(transcript: String) { private suspend fun finalizeTranscript(transcript: String) {
@@ -368,6 +649,16 @@ class TalkModeManager(
_statusText.value = "Thinking…" _statusText.value = "Thinking…"
lastTranscript = "" lastTranscript = ""
lastHeardAtMs = null lastHeardAtMs = null
// Release SpeechRecognizer before making the API call and playing TTS.
// Must use withContext(Main) — not post() — so we WAIT for destruction before
// proceeding. A fire-and-forget post() races with TTS startup: the recognizer
// stays alive, picks up TTS audio as speech (onBeginningOfSpeech), and the
// OS kills the AudioTrack write (returns 0) on OxygenOS/OnePlus devices.
withContext(Dispatchers.Main) {
recognizer?.cancel()
recognizer?.destroy()
recognizer = null
}
ensureConfigLoaded() ensureConfigLoaded()
val prompt = buildPrompt(transcript) val prompt = buildPrompt(transcript)
@@ -388,7 +679,9 @@ class TalkModeManager(
if (!ok) { if (!ok) {
Log.w(tag, "chat final timeout runId=$runId; attempting history fallback") Log.w(tag, "chat final timeout runId=$runId; attempting history fallback")
} }
val assistant = waitForAssistantText(session, startedAt, if (ok) 12_000 else 25_000) // Use text cached from the final event first — avoids chat.history polling
val assistant = consumeRunText(runId)
?: waitForAssistantText(session, startedAt, if (ok) 12_000 else 25_000)
if (assistant.isNullOrBlank()) { if (assistant.isNullOrBlank()) {
_statusText.value = "No reply" _statusText.value = "No reply"
Log.w(tag, "assistant text timeout runId=$runId") Log.w(tag, "assistant text timeout runId=$runId")
@@ -482,6 +775,36 @@ class TalkModeManager(
return result return result
} }
private fun cacheRunCompletion(runId: String, isFinal: Boolean) {
synchronized(completedRunsLock) {
completedRunStates[runId] = isFinal
while (completedRunStates.size > maxCachedRunCompletions) {
val first = completedRunStates.entries.firstOrNull() ?: break
completedRunStates.remove(first.key)
}
}
}
private fun consumeRunCompletion(runId: String): Boolean? {
synchronized(completedRunsLock) {
return completedRunStates.remove(runId)
}
}
private fun consumeRunText(runId: String): String? {
synchronized(completedRunsLock) {
return completedRunTexts.remove(runId)
}
}
private fun extractTextFromChatEventMessage(messageEl: JsonElement?): String? {
val msg = messageEl?.asObjectOrNull() ?: return null
val content = msg["content"] as? JsonArray ?: return null
return content.mapNotNull { entry ->
entry.asObjectOrNull()?.get("text")?.asStringOrNull()?.trim()
}.filter { it.isNotEmpty() }.joinToString("\n").takeIf { it.isNotBlank() }
}
private suspend fun waitForAssistantText( private suspend fun waitForAssistantText(
session: GatewaySession, session: GatewaySession,
sinceSeconds: Double, sinceSeconds: Double,
@@ -566,6 +889,7 @@ class TalkModeManager(
_isSpeaking.value = true _isSpeaking.value = true
lastSpokenText = cleaned lastSpokenText = cleaned
ensureInterruptListener() ensureInterruptListener()
requestAudioFocusForTts()
try { try {
val canUseElevenLabs = !voiceId.isNullOrBlank() && !apiKey.isNullOrEmpty() val canUseElevenLabs = !voiceId.isNullOrBlank() && !apiKey.isNullOrEmpty()
@@ -623,6 +947,7 @@ class TalkModeManager(
Log.w(tag, "system voice failed: ${fallbackErr.message ?: fallbackErr::class.simpleName}") Log.w(tag, "system voice failed: ${fallbackErr.message ?: fallbackErr::class.simpleName}")
} }
} finally { } finally {
_isSpeaking.value = false _isSpeaking.value = false
} }
} }
@@ -655,8 +980,14 @@ class TalkModeManager(
} }
} }
ensurePlaybackActive(playbackToken) // When falling back from PCM, rewrite format to MP3 and download to file.
streamAndPlayMp3(voiceId = voiceId, apiKey = apiKey, request = request, playbackToken = playbackToken) // File-based playback avoids custom DataSource races and is reliable across OEMs.
val mp3Request = if (request.outputFormat?.startsWith("pcm_") == true) {
request.copy(outputFormat = "mp3_44100_128")
} else {
request
}
streamAndPlayMp3(voiceId = voiceId, apiKey = apiKey, request = mp3Request, playbackToken = playbackToken)
} }
private suspend fun streamAndPlayMp3( private suspend fun streamAndPlayMp3(
@@ -677,7 +1008,7 @@ class TalkModeManager(
player.setAudioAttributes( player.setAudioAttributes(
AudioAttributes.Builder() AudioAttributes.Builder()
.setContentType(AudioAttributes.CONTENT_TYPE_SPEECH) .setContentType(AudioAttributes.CONTENT_TYPE_SPEECH)
.setUsage(AudioAttributes.USAGE_ASSISTANT) .setUsage(AudioAttributes.USAGE_MEDIA)
.build(), .build(),
) )
player.setOnPreparedListener { player.setOnPreparedListener {
@@ -724,6 +1055,74 @@ class TalkModeManager(
Log.d(tag, "play done") Log.d(tag, "play done")
} }
/**
* Download ElevenLabs audio to a temp file, then play from disk via MediaPlayer.
* Simpler and more reliable than streaming: avoids custom DataSource races and
* AudioTrack underrun issues on OxygenOS/OnePlus.
*/
private suspend fun streamAndPlayViaFile(voiceId: String, apiKey: String, request: ElevenLabsRequest) {
val tempFile = withContext(Dispatchers.IO) {
val file = File.createTempFile("tts_", ".mp3", context.cacheDir)
val conn = openTtsConnection(voiceId = voiceId, apiKey = apiKey, request = request)
try {
val payload = buildRequestPayload(request)
conn.outputStream.use { it.write(payload.toByteArray()) }
val code = conn.responseCode
if (code >= 400) {
val body = conn.errorStream?.readBytes()?.toString(Charsets.UTF_8) ?: ""
file.delete()
throw IllegalStateException("ElevenLabs failed: $code $body")
}
Log.d(tag, "elevenlabs http code=$code voiceId=$voiceId format=${request.outputFormat}")
// Manual loop so cancellation is honoured on every chunk.
// input.copyTo() is a single blocking call with no yield points; if the
// coroutine is cancelled mid-download the entire response would finish
// before cancellation was observed.
conn.inputStream.use { input ->
file.outputStream().use { out ->
val buf = ByteArray(8192)
var n: Int
while (input.read(buf).also { n = it } != -1) {
ensureActive()
out.write(buf, 0, n)
}
}
}
} catch (err: Throwable) {
file.delete()
throw err
} finally {
conn.disconnect()
}
file
}
try {
val player = MediaPlayer()
this.player = player
val finished = CompletableDeferred<Unit>()
player.setAudioAttributes(
AudioAttributes.Builder()
.setContentType(AudioAttributes.CONTENT_TYPE_SPEECH)
.setUsage(AudioAttributes.USAGE_MEDIA)
.build(),
)
player.setOnCompletionListener { finished.complete(Unit) }
player.setOnErrorListener { _, what, extra ->
finished.completeExceptionally(IllegalStateException("MediaPlayer error what=$what extra=$extra"))
true
}
player.setDataSource(tempFile.absolutePath)
withContext(Dispatchers.IO) { player.prepare() }
Log.d(tag, "file play start bytes=${tempFile.length()}")
player.start()
finished.await()
Log.d(tag, "file play done")
} finally {
try { cleanupPlayer() } catch (_: Throwable) {}
tempFile.delete()
}
}
private suspend fun streamAndPlayPcm( private suspend fun streamAndPlayPcm(
voiceId: String, voiceId: String,
apiKey: String, apiKey: String,
@@ -747,7 +1146,7 @@ class TalkModeManager(
AudioTrack( AudioTrack(
AudioAttributes.Builder() AudioAttributes.Builder()
.setContentType(AudioAttributes.CONTENT_TYPE_SPEECH) .setContentType(AudioAttributes.CONTENT_TYPE_SPEECH)
.setUsage(AudioAttributes.USAGE_ASSISTANT) .setUsage(AudioAttributes.USAGE_MEDIA)
.build(), .build(),
AudioFormat.Builder() AudioFormat.Builder()
.setSampleRate(sampleRate) .setSampleRate(sampleRate)
@@ -763,7 +1162,10 @@ class TalkModeManager(
throw IllegalStateException("AudioTrack init failed") throw IllegalStateException("AudioTrack init failed")
} }
pcmTrack = track pcmTrack = track
track.play() // Don't call track.play() yet — start the track only when the first audio
// chunk arrives from ElevenLabs (see streamPcm). OxygenOS/OnePlus kills an
// AudioTrack that underruns (no data written) for ~1+ seconds, causing
// write() to return 0. Deferring play() until first data avoids the underrun.
Log.d(tag, "pcm play start sampleRate=$sampleRate bufferSize=$bufferSize") Log.d(tag, "pcm play start sampleRate=$sampleRate bufferSize=$bufferSize")
try { try {
@@ -869,6 +1271,14 @@ class TalkModeManager(
} }
} }
/**
 * Stop any active TTS immediately — call when the user taps the mic to barge in.
 *
 * Cancels the streaming (PCM) playback first, then runs the common
 * [stopSpeaking] teardown with interrupt state reset, and finally forces the
 * UI state: speaking flag down, status text "Listening" so the mic UI can
 * take over right away.
 */
fun stopTts() {
    stopActiveStreamingTts()
    stopSpeaking(resetInterrupt = true)
    // stopSpeaking clears _isSpeaking on its own paths; re-clearing here makes
    // the flag unconditionally false even if a playback callback raced us.
    _isSpeaking.value = false
    _statusText.value = "Listening"
}
private fun stopSpeaking(resetInterrupt: Boolean = true) { private fun stopSpeaking(resetInterrupt: Boolean = true) {
pcmStopRequested = true pcmStopRequested = true
if (!_isSpeaking.value) { if (!_isSpeaking.value) {
@@ -878,6 +1288,7 @@ class TalkModeManager(
systemTtsPending?.cancel() systemTtsPending?.cancel()
systemTtsPending = null systemTtsPending = null
systemTtsPendingId = null systemTtsPendingId = null
abandonAudioFocus()
return return
} }
if (resetInterrupt) { if (resetInterrupt) {
@@ -891,6 +1302,57 @@ class TalkModeManager(
systemTtsPending = null systemTtsPending = null
systemTtsPendingId = null systemTtsPendingId = null
_isSpeaking.value = false _isSpeaking.value = false
abandonAudioFocus()
}
/**
 * Whether the barge-in speech recognizer may run right now.
 *
 * Suppressed while a transcript is being finalized ([finalizeInFlight]) so the
 * recognizer does not compete with TTS playback for the audio session.
 */
private fun shouldAllowSpeechInterrupt(): Boolean = !finalizeInFlight
/** Cancel and drop the listen-watchdog job, if one is pending. */
private fun clearListenWatchdog() {
    val pending = listenWatchdogJob ?: return
    listenWatchdogJob = null
    pending.cancel()
}
/**
 * Request transient-may-duck audio focus before starting TTS playback.
 *
 * Uses [AudioFocusRequest] on API 26+ (the request object is retained in
 * [audioFocusRequest] so [abandonAudioFocus] can release it) and the
 * deprecated stream-based call on older releases.
 *
 * @return true when focus was granted (or delayed on API 26+), or when no
 *   [AudioManager] is available — in that case playback proceeds anyway.
 */
private fun requestAudioFocusForTts(): Boolean {
    val audioManager =
        context.getSystemService(Context.AUDIO_SERVICE) as? AudioManager ?: return true
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
        val attributes = AudioAttributes.Builder()
            .setUsage(AudioAttributes.USAGE_MEDIA)
            .setContentType(AudioAttributes.CONTENT_TYPE_SPEECH)
            .build()
        val request = AudioFocusRequest.Builder(AudioManager.AUDIOFOCUS_GAIN_TRANSIENT_MAY_DUCK)
            .setAudioAttributes(attributes)
            .setOnAudioFocusChangeListener(audioFocusListener)
            .build()
        // Keep the request so abandonAudioFocusRequest() can match it later.
        audioFocusRequest = request
        val outcome = audioManager.requestAudioFocus(request)
        Log.d(tag, "audio focus request result=$outcome")
        return outcome == AudioManager.AUDIOFOCUS_REQUEST_GRANTED ||
            outcome == AudioManager.AUDIOFOCUS_REQUEST_DELAYED
    }
    @Suppress("DEPRECATION")
    val outcome = audioManager.requestAudioFocus(
        audioFocusListener,
        AudioManager.STREAM_MUSIC,
        AudioManager.AUDIOFOCUS_GAIN_TRANSIENT_MAY_DUCK,
    )
    return outcome == AudioManager.AUDIOFOCUS_REQUEST_GRANTED
}
/**
 * Release any audio focus held for TTS.
 *
 * API 26+: abandons the retained [audioFocusRequest] (if any) and clears it.
 * Older releases: uses the deprecated listener-based abandon call.
 * No-op when no [AudioManager] is available.
 */
private fun abandonAudioFocus() {
    val audioManager =
        context.getSystemService(Context.AUDIO_SERVICE) as? AudioManager ?: return
    if (Build.VERSION.SDK_INT < Build.VERSION_CODES.O) {
        @Suppress("DEPRECATION")
        audioManager.abandonAudioFocus(audioFocusListener)
        return
    }
    audioFocusRequest?.let { request ->
        audioManager.abandonAudioFocusRequest(request)
        Log.d(tag, "audio focus abandoned")
    }
    audioFocusRequest = null
}
} }
private fun cleanupPlayer() { private fun cleanupPlayer() {
@@ -980,14 +1442,15 @@ class TalkModeManager(
defaultModelId = model ?: defaultModelIdFallback defaultModelId = model ?: defaultModelIdFallback
if (!modelOverrideActive) currentModelId = defaultModelId if (!modelOverrideActive) currentModelId = defaultModelId
defaultOutputFormat = outputFormat ?: defaultOutputFormatFallback defaultOutputFormat = outputFormat ?: defaultOutputFormatFallback
apiKey = apiKey = key ?: envKey?.takeIf { it.isNotEmpty() }
if (activeProvider == defaultTalkProvider) { Log.d(tag, "reloadConfig apiKey=${if (apiKey != null) "set" else "null"} voiceId=$defaultVoiceId")
key ?: envKey?.takeIf { it.isNotEmpty() }
} else {
null
}
if (interrupt != null) interruptOnSpeech = interrupt if (interrupt != null) interruptOnSpeech = interrupt
if (activeProvider != defaultTalkProvider) { activeProviderIsElevenLabs = activeProvider == defaultTalkProvider
if (!activeProviderIsElevenLabs) {
// Clear ElevenLabs credentials so playAssistant won't attempt ElevenLabs calls
apiKey = null
defaultVoiceId = null
if (!voiceOverrideActive) currentVoiceId = null
Log.w(tag, "talk provider $activeProvider unsupported; using system voice fallback") Log.w(tag, "talk provider $activeProvider unsupported; using system voice fallback")
} else if (selection?.normalizedPayload == true) { } else if (selection?.normalizedPayload == true) {
Log.d(tag, "talk config provider=elevenlabs") Log.d(tag, "talk config provider=elevenlabs")
@@ -1025,8 +1488,10 @@ class TalkModeManager(
conn.outputStream.use { it.write(payload.toByteArray()) } conn.outputStream.use { it.write(payload.toByteArray()) }
val code = conn.responseCode val code = conn.responseCode
Log.d(tag, "elevenlabs http code=$code voiceId=$voiceId format=${request.outputFormat} keyLen=${apiKey.length}")
if (code >= 400) { if (code >= 400) {
val message = conn.errorStream?.readBytes()?.toString(Charsets.UTF_8) ?: "" val message = conn.errorStream?.readBytes()?.toString(Charsets.UTF_8) ?: ""
Log.w(tag, "elevenlabs error code=$code voiceId=$voiceId body=$message")
sink.fail() sink.fail()
throw IllegalStateException("ElevenLabs failed: $code $message") throw IllegalStateException("ElevenLabs failed: $code $message")
} }
@@ -1068,12 +1533,21 @@ class TalkModeManager(
throw IllegalStateException("ElevenLabs failed: $code $message") throw IllegalStateException("ElevenLabs failed: $code $message")
} }
var totalBytesWritten = 0L
var trackStarted = false
val buffer = ByteArray(8 * 1024) val buffer = ByteArray(8 * 1024)
conn.inputStream.use { input -> conn.inputStream.use { input ->
while (true) { while (true) {
if (pcmStopRequested || isPlaybackCancelled(null, playbackToken)) return@withContext if (pcmStopRequested || isPlaybackCancelled(null, playbackToken)) return@withContext
val read = input.read(buffer) val read = input.read(buffer)
if (read <= 0) break if (read <= 0) break
// Start the AudioTrack only when the first chunk is ready — avoids
// the ~1.4s underrun window while ElevenLabs prepares audio.
// OxygenOS kills a track that underruns for >1s (write() returns 0).
if (!trackStarted) {
track.play()
trackStarted = true
}
var offset = 0 var offset = 0
while (offset < read) { while (offset < read) {
if (pcmStopRequested || isPlaybackCancelled(null, playbackToken)) return@withContext if (pcmStopRequested || isPlaybackCancelled(null, playbackToken)) return@withContext
@@ -1098,6 +1572,20 @@ class TalkModeManager(
} }
} }
/**
 * Block (suspending, on the IO dispatcher) until [track] has played
 * [totalFrames] frames, a stop is requested, or a 15 s safety deadline passes.
 *
 * [AudioTrack.getPlaybackHeadPosition] returns an Int that wraps; masking the
 * sign-extended Long with 0xFFFFFFFF reads it as unsigned. Sleep length is
 * derived from the frames still owed, clamped to 12–120 ms per poll.
 */
private suspend fun waitForPcmDrain(track: AudioTrack, totalFrames: Long, sampleRate: Int) {
    if (totalFrames <= 0) return
    withContext(Dispatchers.IO) {
        // Hard cap so a stalled track can never wedge the caller.
        val deadline = SystemClock.elapsedRealtime() + 15_000
        while (!pcmStopRequested && SystemClock.elapsedRealtime() < deadline) {
            val framesPlayed = track.playbackHeadPosition.toLong() and 0xFFFFFFFFL
            if (framesPlayed >= totalFrames) break
            val framesPending = totalFrames - framesPlayed
            val napMs = ((framesPending * 1000L) / sampleRate.toLong()).coerceIn(12L, 120L)
            delay(napMs)
        }
    }
}
private fun openTtsConnection( private fun openTtsConnection(
voiceId: String, voiceId: String,
apiKey: String, apiKey: String,
@@ -1248,9 +1736,13 @@ class TalkModeManager(
} }
private fun ensureInterruptListener() { private fun ensureInterruptListener() {
if (!interruptOnSpeech || !_isEnabled.value) return if (!interruptOnSpeech || !_isEnabled.value || !shouldAllowSpeechInterrupt()) return
// Don't create a new recognizer when we just destroyed one for TTS (finalizeInFlight=true).
// Starting a new recognizer mid-TTS causes audio session conflict that kills AudioTrack
// writes (returns 0) and MediaPlayer on OxygenOS/OnePlus devices.
if (finalizeInFlight) return
mainHandler.post { mainHandler.post {
if (stopRequested) return@post if (stopRequested || finalizeInFlight) return@post
if (!SpeechRecognizer.isRecognitionAvailable(context)) return@post if (!SpeechRecognizer.isRecognitionAvailable(context)) return@post
try { try {
if (recognizer == null) { if (recognizer == null) {
@@ -1277,8 +1769,9 @@ class TalkModeManager(
val trimmed = preferred?.trim().orEmpty() val trimmed = preferred?.trim().orEmpty()
if (trimmed.isNotEmpty()) { if (trimmed.isNotEmpty()) {
val resolved = resolveVoiceAlias(trimmed) val resolved = resolveVoiceAlias(trimmed)
if (resolved != null) return resolved // If it resolves as an alias, use the alias target.
Log.w(tag, "unknown voice alias $trimmed") // Otherwise treat it as a direct voice ID (e.g. "21m00Tcm4TlvDq8ikWAM").
return resolved ?: trimmed
} }
fallbackVoiceId?.let { return it } fallbackVoiceId?.let { return it }
@@ -1354,7 +1847,12 @@ class TalkModeManager(
override fun onBufferReceived(buffer: ByteArray?) {} override fun onBufferReceived(buffer: ByteArray?) {}
override fun onEndOfSpeech() { override fun onEndOfSpeech() {
scheduleRestart() clearListenWatchdog()
// Don't restart while a transcript is being processed — the recognizer
// competing for audio resources kills AudioTrack PCM playback.
if (!finalizeInFlight) {
scheduleRestart()
}
} }
override fun onError(error: Int) { override fun onError(error: Int) {