fix(android): stop operator chat subscription

This commit is contained in:
Ayaan Zaidi
2026-05-24 18:15:53 +05:30
parent a72b11d29a
commit 5c15859759
4 changed files with 8 additions and 47 deletions

View File

@@ -490,7 +490,6 @@ class NodeRuntime(
scope = scope,
session = operatorSession,
json = json,
supportsChatSubscribe = false,
).also {
it.applyMainSessionKey(_mainSessionKey.value)
}
@@ -502,7 +501,6 @@ class NodeRuntime(
context = appContext,
scope = scope,
session = operatorSession,
supportsChatSubscribe = false,
isConnected = { operatorConnected },
onBeforeSpeak = { micCapture.pauseForTts() },
onAfterSpeak = { micCapture.resumeAfterTts() },
@@ -610,7 +608,6 @@ class NodeRuntime(
context = appContext,
scope = scope,
session = operatorSession,
supportsChatSubscribe = true,
isConnected = { operatorConnected },
onBeforeSpeak = { micCapture.pauseForTts() },
onAfterSpeak = { micCapture.resumeAfterTts() },
@@ -1150,7 +1147,7 @@ class NodeRuntime(
NodeForegroundService.setVoiceCaptureMode(appContext, VoiceCaptureMode.TalkMode)
talkMode.ttsOnAllResponses = true
talkMode.setPlaybackEnabled(speakerEnabled.value)
talkMode.ensureChatSubscribed()
talkMode.refreshConfig()
externalAudioCaptureActive.value = true
}
@@ -1222,7 +1219,7 @@ class NodeRuntime(
}
// Tapping mic on interrupts any active TTS (barge-in).
stopVoicePlayback()
scope.launch { talkMode.ensureChatSubscribed() }
scope.launch { talkMode.refreshConfig() }
micCapture.setMicEnabled(true)
externalAudioCaptureActive.value = true
}
@@ -1235,7 +1232,7 @@ class NodeRuntime(
NodeForegroundService.setVoiceCaptureMode(appContext, VoiceCaptureMode.TalkMode)
talkMode.ttsOnAllResponses = true
talkMode.setPlaybackEnabled(speakerEnabled.value)
scope.launch { talkMode.ensureChatSubscribed() }
scope.launch { talkMode.refreshConfig() }
talkMode.setEnabled(true)
externalAudioCaptureActive.value = true
}

View File

@@ -22,7 +22,6 @@ class ChatController(
private val scope: CoroutineScope,
private val session: GatewaySession,
private val json: Json,
private val supportsChatSubscribe: Boolean,
) {
private var appliedMainSessionKey = "main"
private val _sessionKey = MutableStateFlow("main")
@@ -302,10 +301,6 @@ class ChatController(
val key = _sessionKey.value
try {
if (supportsChatSubscribe) {
session.sendNodeEvent("chat.subscribe", """{"sessionKey":"$key"}""")
}
val historyJson = session.request("chat.history", """{"sessionKey":"$key"}""")
val history = parseHistory(historyJson, sessionKey = key, previousMessages = _messages.value)
_messages.value = mergeOptimisticMessages(incoming = history.messages, optimistic = optimisticMessagesByRunId.values)

View File

@@ -91,7 +91,6 @@ class TalkModeManager internal constructor(
private val context: Context,
private val scope: CoroutineScope,
private val session: GatewaySession,
private val supportsChatSubscribe: Boolean,
private val isConnected: () -> Boolean,
private val onBeforeSpeak: suspend () -> Unit = {},
private val onAfterSpeak: suspend () -> Unit = {},
@@ -104,8 +103,7 @@ class TalkModeManager internal constructor(
private const val realtimeSampleRateHz = 24_000
private const val realtimeAudioFrameMs = 100
private const val listenWatchdogMs = 12_000L
private const val chatFinalWaitWithSubscribeMs = 45_000L
private const val chatFinalWaitWithoutSubscribeMs = 6_000L
private const val chatFinalWaitMs = 45_000L
private const val maxCachedRunCompletions = 128
private const val maxConversationEntries = 40
private const val realtimePlaybackBufferMs = 240
@@ -158,7 +156,6 @@ class TalkModeManager internal constructor(
private val completedRunsLock = Any()
private val completedRunStates = LinkedHashMap<String, Boolean>()
private val completedRunTexts = LinkedHashMap<String, String>()
private var chatSubscribedSessionKey: String? = null
private var configLoaded = false
private var executionMode = TalkModeExecutionMode.Native
private val startGeneration = AtomicLong(0L)
@@ -216,11 +213,6 @@ class TalkModeManager internal constructor(
}
}
suspend fun ensureChatSubscribed() {
reloadConfig()
subscribeChatIfNeeded(session = session, sessionKey = mainSessionKey.ifBlank { "main" })
}
fun setMainSessionKey(sessionKey: String?) {
val trimmed = sessionKey?.trim().orEmpty()
if (trimmed.isEmpty()) return
@@ -372,7 +364,6 @@ class TalkModeManager internal constructor(
scope.launch {
try {
reloadConfig()
subscribeChatIfNeeded(session = session, sessionKey = mainSessionKey.ifBlank { "main" })
val startedAt = System.currentTimeMillis().toDouble() / 1000.0
val prompt = buildPrompt(command)
val runId = sendChat(prompt, session)
@@ -590,7 +581,6 @@ class TalkModeManager internal constructor(
_statusText.value = "Off"
stopRealtimeRelay()
stopSpeaking()
chatSubscribedSessionKey = null
pendingRunId = null
pendingFinal?.cancel()
pendingFinal = null
@@ -1590,7 +1580,6 @@ class TalkModeManager internal constructor(
try {
val startedAt = System.currentTimeMillis().toDouble() / 1000.0
subscribeChatIfNeeded(session = session, sessionKey = mainSessionKey)
Log.d(tag, "chat.send start sessionKey=${mainSessionKey.ifBlank { "main" }} chars=${prompt.length}")
val runId = sendChat(prompt, session)
Log.d(tag, "chat.send ok runId=$runId")
@@ -1649,23 +1638,6 @@ class TalkModeManager internal constructor(
return payload
}
private suspend fun subscribeChatIfNeeded(
session: GatewaySession,
sessionKey: String,
) {
if (!supportsChatSubscribe) return
val key = sessionKey.trim()
if (key.isEmpty()) return
if (chatSubscribedSessionKey == key) return
val sent = session.sendNodeEvent("chat.subscribe", """{"sessionKey":"$key"}""")
if (sent) {
chatSubscribedSessionKey = key
Log.d(tag, "chat.subscribe ok sessionKey=$key")
} else {
Log.w(tag, "chat.subscribe failed sessionKey=$key")
}
}
private fun buildPrompt(transcript: String): String {
val lines =
mutableListOf(
@@ -1719,10 +1691,9 @@ class TalkModeManager internal constructor(
consumeRunCompletion(runId)?.let { return it }
val timeoutMs = if (supportsChatSubscribe) chatFinalWaitWithSubscribeMs else chatFinalWaitWithoutSubscribeMs
val result =
try {
withTimeout(timeoutMs) { deferred.await() }
withTimeout(chatFinalWaitMs) { deferred.await() }
} catch (_: TimeoutCancellationException) {
false
}

View File

@@ -418,22 +418,21 @@ class TalkModeManagerTest {
@Test
@OptIn(ExperimentalCoroutinesApi::class)
fun chatFinalWaitWithoutSubscribeUsesShortTimeout() =
fun chatFinalWaitUsesGatewayEventTimeout() =
runTest {
val manager = createManager(scope = this, supportsChatSubscribe = false)
val manager = createManager(scope = this)
setPrivateField(manager, "pendingRunId", "run-missing-final")
setPrivateField(manager, "pendingFinal", CompletableDeferred<Boolean>())
assertFalse(manager.waitForChatFinal("run-missing-final"))
assertEquals(6_000, currentTime)
assertEquals(45_000, currentTime)
}
private fun createManager(
talkSpeakClient: TalkSpeechSynthesizing = TalkSpeakClient(),
talkAudioPlayer: TalkAudioPlaying? = null,
scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default),
supportsChatSubscribe: Boolean = false,
isConnected: () -> Boolean = { true },
onStoppedByRelay: () -> Unit = {},
): TalkModeManager {
@@ -452,7 +451,6 @@ class TalkModeManagerTest {
context = app,
scope = scope,
session = session,
supportsChatSubscribe = supportsChatSubscribe,
isConnected = isConnected,
onStoppedByRelay = onStoppedByRelay,
talkSpeakClient = talkSpeakClient,