Mirror of https://github.com/openclaw/openclaw.git, synced 2026-04-02 04:41:11 +00:00.
feat(voice): add talk mode latency tracing
This commit is contained in:
@@ -82,6 +82,7 @@ final class TalkModeManager: NSObject {
|
||||
private var noiseFloorSamples: [Double] = []
|
||||
private var noiseFloor: Double?
|
||||
private var noiseFloorReady: Bool = false
|
||||
private var activeLatencyTrace: TalkTurnLatencyTrace?
|
||||
|
||||
private var chatSubscribedSessionKeys = Set<String>()
|
||||
private var incrementalSpeechQueue: [String] = []
|
||||
@@ -199,6 +200,7 @@ final class TalkModeManager: NSObject {
|
||||
self.stopRecognition()
|
||||
self.stopSpeaking()
|
||||
self.lastInterruptedAtSeconds = nil
|
||||
self.activeLatencyTrace = nil
|
||||
let pendingPTT = self.pttCompletion != nil
|
||||
let pendingCaptureId = self.activePTTCaptureId ?? UUID().uuidString
|
||||
self.pttTimeoutTask?.cancel()
|
||||
@@ -244,6 +246,7 @@ final class TalkModeManager: NSObject {
|
||||
self.stopRecognition()
|
||||
self.stopSpeaking()
|
||||
self.lastInterruptedAtSeconds = nil
|
||||
self.activeLatencyTrace = nil
|
||||
TalkSystemSpeechSynthesizer.shared.stop()
|
||||
|
||||
do {
|
||||
@@ -668,6 +671,7 @@ final class TalkModeManager: NSObject {
|
||||
self.lastTranscript = trimmed
|
||||
guard !trimmed.isEmpty else { return }
|
||||
GatewayDiagnostics.log("talk speech: final transcript chars=\(trimmed.count)")
|
||||
self.beginLatencyTrace(transcriptChars: trimmed.count)
|
||||
self.loggedPartialThisCycle = false
|
||||
if self.captureMode == .pushToTalk, self.pttAutoStopEnabled, self.isPushToTalkActive {
|
||||
_ = await self.endPushToTalk()
|
||||
@@ -735,6 +739,7 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
|
||||
private func processTranscript(_ transcript: String, restartAfter: Bool) async {
|
||||
self.ensureLatencyTrace(transcriptChars: transcript.count)
|
||||
self.isListening = false
|
||||
self.captureMode = .idle
|
||||
self.statusText = "Thinking…"
|
||||
@@ -742,6 +747,10 @@ final class TalkModeManager: NSObject {
|
||||
self.lastHeard = nil
|
||||
self.stopRecognition()
|
||||
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.processStartAt,
|
||||
stage: "process.start",
|
||||
fields: ["restartAfter=\(restartAfter)"])
|
||||
GatewayDiagnostics.log("talk: process transcript chars=\(transcript.count) restartAfter=\(restartAfter)")
|
||||
await self.reloadConfig()
|
||||
let prompt = self.buildPrompt(transcript: transcript)
|
||||
@@ -749,6 +758,7 @@ final class TalkModeManager: NSObject {
|
||||
self.statusText = "Gateway not connected"
|
||||
self.logger.warning("finalize: gateway not connected")
|
||||
GatewayDiagnostics.log("talk: abort gateway not connected")
|
||||
self.finishLatencyTrace(outcome: "gateway_offline")
|
||||
if restartAfter {
|
||||
await self.start()
|
||||
}
|
||||
@@ -759,10 +769,18 @@ final class TalkModeManager: NSObject {
|
||||
let startedAt = Date().timeIntervalSince1970
|
||||
let sessionKey = self.mainSessionKey
|
||||
await self.subscribeChatIfNeeded(sessionKey: sessionKey)
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.chatSendStartAt,
|
||||
stage: "chat.send.start",
|
||||
fields: ["session=\(sessionKey)", "promptChars=\(prompt.count)"])
|
||||
self.logger.info(
|
||||
"chat.send start sessionKey=\(sessionKey, privacy: .public) chars=\(prompt.count, privacy: .public)")
|
||||
GatewayDiagnostics.log("talk: chat.send start sessionKey=\(sessionKey) chars=\(prompt.count)")
|
||||
let runId = try await self.sendChat(prompt, gateway: gateway)
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.chatSendAckAt,
|
||||
stage: "chat.send.ack",
|
||||
fields: ["runId=\(runId)"])
|
||||
self.logger.info("chat.send ok runId=\(runId, privacy: .public)")
|
||||
GatewayDiagnostics.log("talk: chat.send ok runId=\(runId)")
|
||||
let shouldIncremental = self.shouldUseIncrementalTTS()
|
||||
@@ -779,10 +797,12 @@ final class TalkModeManager: NSObject {
|
||||
self.logger.warning(
|
||||
"chat completion timeout runId=\(runId, privacy: .public); attempting history fallback")
|
||||
GatewayDiagnostics.log("talk: chat completion timeout runId=\(runId)")
|
||||
self.logLatency(stage: "chat.completion.timeout", fields: ["runId=\(runId)"])
|
||||
} else if completion == .aborted {
|
||||
self.statusText = "Aborted"
|
||||
self.logger.warning("chat completion aborted runId=\(runId, privacy: .public)")
|
||||
GatewayDiagnostics.log("talk: chat completion aborted runId=\(runId)")
|
||||
self.finishLatencyTrace(outcome: "chat_aborted", fields: ["runId=\(runId)"])
|
||||
streamingTask?.cancel()
|
||||
await self.finishIncrementalSpeech()
|
||||
await self.start()
|
||||
@@ -791,6 +811,7 @@ final class TalkModeManager: NSObject {
|
||||
self.statusText = "Chat error"
|
||||
self.logger.warning("chat completion error runId=\(runId, privacy: .public)")
|
||||
GatewayDiagnostics.log("talk: chat completion error runId=\(runId)")
|
||||
self.finishLatencyTrace(outcome: "chat_error", fields: ["runId=\(runId)"])
|
||||
streamingTask?.cancel()
|
||||
await self.finishIncrementalSpeech()
|
||||
await self.start()
|
||||
@@ -811,6 +832,7 @@ final class TalkModeManager: NSObject {
|
||||
self.statusText = "No reply"
|
||||
self.logger.warning("assistant text timeout runId=\(runId, privacy: .public)")
|
||||
GatewayDiagnostics.log("talk: assistant text timeout runId=\(runId)")
|
||||
self.finishLatencyTrace(outcome: "assistant_timeout", fields: ["runId=\(runId)"])
|
||||
streamingTask?.cancel()
|
||||
await self.finishIncrementalSpeech()
|
||||
await self.start()
|
||||
@@ -824,10 +846,12 @@ final class TalkModeManager: NSObject {
|
||||
} else {
|
||||
await self.playAssistant(text: assistantText)
|
||||
}
|
||||
self.finishLatencyTrace(outcome: "spoken", fields: ["runId=\(runId)"])
|
||||
} catch {
|
||||
self.statusText = "Talk failed: \(error.localizedDescription)"
|
||||
self.logger.error("finalize failed: \(error.localizedDescription, privacy: .public)")
|
||||
GatewayDiagnostics.log("talk: failed error=\(error.localizedDescription)")
|
||||
self.finishLatencyTrace(outcome: "error", fields: ["source=process_transcript"])
|
||||
}
|
||||
|
||||
if restartAfter {
|
||||
@@ -977,6 +1001,10 @@ final class TalkModeManager: NSObject {
|
||||
self.statusText = "Generating voice…"
|
||||
self.isSpeaking = true
|
||||
self.lastSpokenText = cleaned
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstSpeakAttemptAt,
|
||||
stage: "tts.segment.first_start",
|
||||
fields: ["chars=\(cleaned.count)", "mode=single"])
|
||||
|
||||
do {
|
||||
let started = Date()
|
||||
@@ -1001,6 +1029,10 @@ final class TalkModeManager: NSObject {
|
||||
|
||||
if canUseElevenLabs, let voiceId, let apiKey {
|
||||
GatewayDiagnostics.log("talk tts: provider=elevenlabs voiceId=\(voiceId)")
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstTTSRequestAt,
|
||||
stage: "tts.request.first",
|
||||
fields: ["mode=single", "textChars=\(cleaned.count)"])
|
||||
let desiredOutputFormat = (directive?.outputFormat ?? self.defaultOutputFormat)?
|
||||
.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
let requestedOutputFormat = (desiredOutputFormat?.isEmpty == false) ? desiredOutputFormat : nil
|
||||
@@ -1033,7 +1065,11 @@ final class TalkModeManager: NSObject {
|
||||
let request = makeRequest(outputFormat: outputFormat)
|
||||
|
||||
let client = ElevenLabsTTSClient(apiKey: apiKey)
|
||||
let stream = client.streamSynthesize(voiceId: voiceId, request: request)
|
||||
let rawStream = client.streamSynthesize(voiceId: voiceId, request: request)
|
||||
let stream = self.instrumentTTSStreamFirstChunk(
|
||||
rawStream,
|
||||
mode: "single",
|
||||
textChars: cleaned.count)
|
||||
|
||||
if self.interruptOnSpeech {
|
||||
do {
|
||||
@@ -1049,22 +1085,37 @@ final class TalkModeManager: NSObject {
|
||||
let result: StreamingPlaybackResult
|
||||
if let sampleRate {
|
||||
self.lastPlaybackWasPCM = true
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstPlaybackRequestAt,
|
||||
stage: "tts.playback.request.first",
|
||||
fields: ["player=pcm", "sampleRate=\(Int(sampleRate))", "mode=single"])
|
||||
var playback = await self.pcmPlayer.play(stream: stream, sampleRate: sampleRate)
|
||||
if !playback.finished, playback.interruptedAt == nil {
|
||||
let mp3Format = ElevenLabsTTSClient.validatedOutputFormat("mp3_44100")
|
||||
self.logger.warning("pcm playback failed; retrying mp3")
|
||||
self.lastPlaybackWasPCM = false
|
||||
let mp3Stream = client.streamSynthesize(
|
||||
let mp3RawStream = client.streamSynthesize(
|
||||
voiceId: voiceId,
|
||||
request: makeRequest(outputFormat: mp3Format))
|
||||
let mp3Stream = self.instrumentTTSStreamFirstChunk(
|
||||
mp3RawStream,
|
||||
mode: "single-mp3-retry",
|
||||
textChars: cleaned.count)
|
||||
playback = await self.mp3Player.play(stream: mp3Stream)
|
||||
}
|
||||
result = playback
|
||||
} else {
|
||||
self.lastPlaybackWasPCM = false
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstPlaybackRequestAt,
|
||||
stage: "tts.playback.request.first",
|
||||
fields: ["player=mp3", "mode=single"])
|
||||
result = await self.mp3Player.play(stream: stream)
|
||||
}
|
||||
let duration = Date().timeIntervalSince(started)
|
||||
self.logLatency(
|
||||
stage: "tts.playback.finished",
|
||||
fields: ["mode=single", "finished=\(result.finished)", "durationMs=\(Int(duration * 1000))"])
|
||||
self.logger.info("elevenlabs stream finished=\(result.finished, privacy: .public) dur=\(duration, privacy: .public)s")
|
||||
if !result.finished, let interruptedAt = result.interruptedAt {
|
||||
self.lastInterruptedAtSeconds = interruptedAt
|
||||
@@ -1072,6 +1123,10 @@ final class TalkModeManager: NSObject {
|
||||
} else {
|
||||
self.logger.warning("tts unavailable; falling back to system voice (missing key or voiceId)")
|
||||
GatewayDiagnostics.log("talk tts: provider=system (missing key or voiceId)")
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstSystemSpeakCallAt,
|
||||
stage: "tts.system.request.first",
|
||||
fields: ["reason=missing_key_or_voice", "textChars=\(cleaned.count)"])
|
||||
if self.interruptOnSpeech {
|
||||
do {
|
||||
try self.startRecognition()
|
||||
@@ -1081,7 +1136,15 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
}
|
||||
self.statusText = "Speaking (System)…"
|
||||
try await TalkSystemSpeechSynthesizer.shared.speak(text: cleaned, language: language)
|
||||
try await TalkSystemSpeechSynthesizer.shared.speak(
|
||||
text: cleaned,
|
||||
language: language,
|
||||
onStart: { [weak self] in
|
||||
self?.markLatencyAnchorIfNeeded(
|
||||
\.firstSystemSpeechStartAt,
|
||||
stage: "tts.system.did_start",
|
||||
fields: ["mode=single"])
|
||||
})
|
||||
}
|
||||
} catch {
|
||||
self.logger.error(
|
||||
@@ -1098,7 +1161,19 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
self.statusText = "Speaking (System)…"
|
||||
let language = ElevenLabsTTSClient.validatedLanguage(directive?.language)
|
||||
try await TalkSystemSpeechSynthesizer.shared.speak(text: cleaned, language: language)
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstSystemSpeakCallAt,
|
||||
stage: "tts.system.request.first",
|
||||
fields: ["reason=elevenlabs_error", "textChars=\(cleaned.count)"])
|
||||
try await TalkSystemSpeechSynthesizer.shared.speak(
|
||||
text: cleaned,
|
||||
language: language,
|
||||
onStart: { [weak self] in
|
||||
self?.markLatencyAnchorIfNeeded(
|
||||
\.firstSystemSpeechStartAt,
|
||||
stage: "tts.system.did_start",
|
||||
fields: ["mode=single"])
|
||||
})
|
||||
} catch {
|
||||
self.statusText = "Speak failed: \(error.localizedDescription)"
|
||||
self.logger.error("system voice failed: \(error.localizedDescription, privacy: .public)")
|
||||
@@ -1117,6 +1192,8 @@ final class TalkModeManager: NSObject {
|
||||
let interruptedAt = self.lastPlaybackWasPCM
|
||||
? self.pcmPlayer.stop()
|
||||
: self.mp3Player.stop()
|
||||
let interruptedLabel = interruptedAt.map { String(format: "%.3f", $0) } ?? "nil"
|
||||
self.logLatency(stage: "tts.interrupt", fields: ["offsetSec=\(interruptedLabel)"])
|
||||
if storeInterruption {
|
||||
self.lastInterruptedAtSeconds = interruptedAt
|
||||
}
|
||||
@@ -1214,6 +1291,10 @@ final class TalkModeManager: NSObject {
|
||||
guard !trimmed.isEmpty else { return }
|
||||
self.incrementalSpeechQueue.append(trimmed)
|
||||
self.incrementalSpeechUsed = true
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.assistantFirstSegmentQueuedAt,
|
||||
stage: "assistant.segment.first_queued",
|
||||
fields: ["chars=\(trimmed.count)"])
|
||||
if self.incrementalSpeechTask == nil {
|
||||
self.startIncrementalSpeechTask()
|
||||
}
|
||||
@@ -1240,6 +1321,10 @@ final class TalkModeManager: NSObject {
|
||||
while !Task.isCancelled {
|
||||
guard !self.incrementalSpeechQueue.isEmpty else { break }
|
||||
let segment = self.incrementalSpeechQueue.removeFirst()
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstSpeakAttemptAt,
|
||||
stage: "tts.segment.first_start",
|
||||
fields: ["chars=\(segment.count)"])
|
||||
self.statusText = "Speaking…"
|
||||
self.isSpeaking = true
|
||||
self.lastSpokenText = segment
|
||||
@@ -1373,11 +1458,19 @@ final class TalkModeManager: NSObject {
|
||||
return nil
|
||||
}
|
||||
if let chunks = prefetch.chunks, !chunks.isEmpty {
|
||||
self.logLatency(
|
||||
stage: "tts.prefetch.hit",
|
||||
fields: ["segmentChars=\(segment.count)", "chunks=\(chunks.count)"])
|
||||
let prefetched = IncrementalPrefetchedAudio(chunks: chunks, outputFormat: prefetch.outputFormat)
|
||||
self.incrementalSpeechPrefetch = nil
|
||||
return prefetched
|
||||
}
|
||||
let waitStartedAt = Date()
|
||||
self.logLatency(stage: "tts.prefetch.wait", fields: ["segmentChars=\(segment.count)"])
|
||||
await prefetch.task.value
|
||||
self.logLatency(
|
||||
stage: "tts.prefetch.wait.done",
|
||||
fields: ["segmentChars=\(segment.count)", "waitMs=\(Self.elapsedMs(since: waitStartedAt))"])
|
||||
guard let completed = self.incrementalSpeechPrefetch else { return nil }
|
||||
guard completed.context == context, completed.segment == segment else { return nil }
|
||||
guard let chunks = completed.chunks, !chunks.isEmpty else { return nil }
|
||||
@@ -1435,6 +1528,10 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
guard agentEvent.runId == runId, agentEvent.stream == "assistant" else { continue }
|
||||
guard let text = agentEvent.data["text"]?.value as? String else { continue }
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.assistantFirstStreamTextAt,
|
||||
stage: "assistant.stream.first_text",
|
||||
fields: ["runId=\(runId)", "chars=\(text.count)"])
|
||||
let segments = self.incrementalSpeechBuffer.ingest(text: text, isFinal: false)
|
||||
if let lang = self.incrementalSpeechBuffer.directive?.language {
|
||||
self.incrementalSpeechLanguage = ElevenLabsTTSClient.validatedLanguage(lang)
|
||||
@@ -1549,53 +1646,94 @@ final class TalkModeManager: NSObject {
|
||||
} else {
|
||||
await self.updateIncrementalContextIfNeeded()
|
||||
guard let resolvedContext = self.incrementalSpeechContext else {
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstSystemSpeakCallAt,
|
||||
stage: "tts.system.request.first",
|
||||
fields: ["reason=incremental_context_missing", "textChars=\(text.count)"])
|
||||
try? await TalkSystemSpeechSynthesizer.shared.speak(
|
||||
text: text,
|
||||
language: self.incrementalSpeechLanguage)
|
||||
language: self.incrementalSpeechLanguage,
|
||||
onStart: { [weak self] in
|
||||
self?.markLatencyAnchorIfNeeded(
|
||||
\.firstSystemSpeechStartAt,
|
||||
stage: "tts.system.did_start",
|
||||
fields: ["mode=incremental"])
|
||||
})
|
||||
return
|
||||
}
|
||||
context = resolvedContext
|
||||
}
|
||||
|
||||
guard context.canUseElevenLabs, let apiKey = context.apiKey, let voiceId = context.voiceId else {
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstSystemSpeakCallAt,
|
||||
stage: "tts.system.request.first",
|
||||
fields: ["reason=incremental_context_unavailable", "textChars=\(text.count)"])
|
||||
try? await TalkSystemSpeechSynthesizer.shared.speak(
|
||||
text: text,
|
||||
language: self.incrementalSpeechLanguage)
|
||||
language: self.incrementalSpeechLanguage,
|
||||
onStart: { [weak self] in
|
||||
self?.markLatencyAnchorIfNeeded(
|
||||
\.firstSystemSpeechStartAt,
|
||||
stage: "tts.system.did_start",
|
||||
fields: ["mode=incremental"])
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
let client = ElevenLabsTTSClient(apiKey: apiKey)
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstTTSRequestAt,
|
||||
stage: "tts.request.first",
|
||||
fields: ["mode=incremental", "textChars=\(text.count)"])
|
||||
let request = self.makeIncrementalTTSRequest(
|
||||
text: text,
|
||||
context: context,
|
||||
outputFormat: context.outputFormat)
|
||||
let stream: AsyncThrowingStream<Data, Error>
|
||||
if let prefetchedAudio, !prefetchedAudio.chunks.isEmpty {
|
||||
stream = Self.makeBufferedAudioStream(chunks: prefetchedAudio.chunks)
|
||||
stream = self.instrumentTTSStreamFirstChunk(
|
||||
Self.makeBufferedAudioStream(chunks: prefetchedAudio.chunks),
|
||||
mode: "incremental-prefetched",
|
||||
textChars: text.count)
|
||||
} else {
|
||||
stream = client.streamSynthesize(voiceId: voiceId, request: request)
|
||||
stream = self.instrumentTTSStreamFirstChunk(
|
||||
client.streamSynthesize(voiceId: voiceId, request: request),
|
||||
mode: "incremental-live",
|
||||
textChars: text.count)
|
||||
}
|
||||
let playbackFormat = prefetchedAudio?.outputFormat ?? context.outputFormat
|
||||
let sampleRate = TalkTTSValidation.pcmSampleRate(from: playbackFormat)
|
||||
let result: StreamingPlaybackResult
|
||||
if let sampleRate {
|
||||
self.lastPlaybackWasPCM = true
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstPlaybackRequestAt,
|
||||
stage: "tts.playback.request.first",
|
||||
fields: ["player=pcm", "sampleRate=\(Int(sampleRate))", "mode=incremental"])
|
||||
var playback = await self.pcmPlayer.play(stream: stream, sampleRate: sampleRate)
|
||||
if !playback.finished, playback.interruptedAt == nil {
|
||||
self.logger.warning("pcm playback failed; retrying mp3")
|
||||
self.lastPlaybackWasPCM = false
|
||||
let mp3Format = ElevenLabsTTSClient.validatedOutputFormat("mp3_44100")
|
||||
let mp3Stream = client.streamSynthesize(
|
||||
voiceId: voiceId,
|
||||
request: self.makeIncrementalTTSRequest(
|
||||
text: text,
|
||||
context: context,
|
||||
outputFormat: mp3Format))
|
||||
let mp3Stream = self.instrumentTTSStreamFirstChunk(
|
||||
client.streamSynthesize(
|
||||
voiceId: voiceId,
|
||||
request: self.makeIncrementalTTSRequest(
|
||||
text: text,
|
||||
context: context,
|
||||
outputFormat: mp3Format)),
|
||||
mode: "incremental-mp3-retry",
|
||||
textChars: text.count)
|
||||
playback = await self.mp3Player.play(stream: mp3Stream)
|
||||
}
|
||||
result = playback
|
||||
} else {
|
||||
self.lastPlaybackWasPCM = false
|
||||
self.markLatencyAnchorIfNeeded(
|
||||
\.firstPlaybackRequestAt,
|
||||
stage: "tts.playback.request.first",
|
||||
fields: ["player=mp3", "mode=incremental"])
|
||||
result = await self.mp3Player.play(stream: stream)
|
||||
}
|
||||
if !result.finished, let interruptedAt = result.interruptedAt {
|
||||
@@ -1606,6 +1744,8 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
|
||||
private struct IncrementalSpeechBuffer {
|
||||
private static let softBoundaryMinChars = 72
|
||||
|
||||
private(set) var latestText: String = ""
|
||||
private(set) var directive: TalkDirective?
|
||||
private var spokenOffset: Int = 0
|
||||
@@ -1698,8 +1838,9 @@ private struct IncrementalSpeechBuffer {
|
||||
}
|
||||
|
||||
if !inCodeBlock {
|
||||
buffer.append(chars[idx])
|
||||
if Self.isBoundary(chars[idx]) {
|
||||
let currentChar = chars[idx]
|
||||
buffer.append(currentChar)
|
||||
if Self.isBoundary(currentChar) || Self.isSoftBoundary(currentChar, bufferedChars: buffer.count) {
|
||||
lastBoundary = idx + 1
|
||||
bufferAtBoundary = buffer
|
||||
inCodeBlockAtBoundary = inCodeBlock
|
||||
@@ -1726,6 +1867,10 @@ private struct IncrementalSpeechBuffer {
|
||||
/// True for characters that always terminate a speakable segment:
/// sentence-ending punctuation or a newline.
private static func isBoundary(_ ch: Character) -> Bool {
    switch ch {
    case ".", "!", "?", "\n":
        return true
    default:
        return false
    }
}
|
||||
|
||||
/// True when `ch` is whitespace and at least `softBoundaryMinChars` characters
/// have accumulated, so it is acceptable to split here even without terminal
/// punctuation.
private static func isSoftBoundary(_ ch: Character, bufferedChars: Int) -> Bool {
    guard ch.isWhitespace else { return false }
    return bufferedChars >= Self.softBoundaryMinChars
}
|
||||
}
|
||||
|
||||
extension TalkModeManager {
|
||||
@@ -1998,6 +2143,114 @@ extension TalkModeManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts a fresh latency trace for the current talk turn, anchored at the
/// moment the final speech transcript was accepted, and logs the
/// "speech.final" stage.
private func beginLatencyTrace(transcriptChars: Int) {
    let anchor = Date()
    let traceId = String(UUID().uuidString.prefix(8)).lowercased()
    self.activeLatencyTrace = TalkTurnLatencyTrace(id: traceId, speechFinalAt: anchor)
    self.logLatency(
        stage: "speech.final",
        at: anchor,
        fields: ["transcriptChars=\(transcriptChars)"])
}
|
||||
|
||||
/// Creates a synthetic latency trace when a turn reaches processing without
/// one having been started (e.g. a path that bypassed `speech.final`).
/// Does nothing if a trace is already active.
private func ensureLatencyTrace(transcriptChars: Int) {
    if self.activeLatencyTrace != nil { return }
    let anchor = Date()
    self.activeLatencyTrace = TalkTurnLatencyTrace(
        id: String(UUID().uuidString.prefix(8)).lowercased(),
        speechFinalAt: anchor)
    self.logLatency(
        stage: "speech.final.synthetic",
        at: anchor,
        fields: ["transcriptChars=\(transcriptChars)"])
}
|
||||
|
||||
/// Closes the active latency trace, if any: logs a terminal "turn.complete"
/// stage tagged with `outcome` (plus any extra fields) and clears the trace.
private func finishLatencyTrace(outcome: String, fields: [String] = []) {
    if self.activeLatencyTrace == nil { return }
    var completionFields = ["outcome=\(outcome)"]
    completionFields.append(contentsOf: fields)
    self.logLatency(stage: "turn.complete", fields: completionFields)
    self.activeLatencyTrace = nil
}
|
||||
|
||||
/// Records `timestamp` into the trace slot at `keyPath` the first time its
/// pipeline stage is reached; later calls for an already-set slot are ignored
/// so each anchor reflects the earliest occurrence. Also emits the stage line.
private func markLatencyAnchorIfNeeded(
    _ keyPath: WritableKeyPath<TalkTurnLatencyTrace, Date?>,
    stage: String,
    at timestamp: Date = Date(),
    fields: [String] = []
) {
    guard var trace = self.activeLatencyTrace, trace[keyPath: keyPath] == nil else { return }
    trace[keyPath: keyPath] = timestamp
    // TalkTurnLatencyTrace is a value type: write the mutated copy back.
    self.activeLatencyTrace = trace
    self.logLatency(stage: stage, at: timestamp, fields: fields)
}
|
||||
|
||||
/// Emits one diagnostics line for `stage`, tagged with the trace id and the
/// elapsed milliseconds since each anchor recorded so far. Does nothing when
/// no trace is active.
private func logLatency(stage: String, at timestamp: Date = Date(), fields: [String] = []) {
    guard let trace = self.activeLatencyTrace else { return }
    // Optional anchors are reported in pipeline order; unset ones are skipped.
    let anchors: [(String, Date?)] = [
        ("sinceProcessMs", trace.processStartAt),
        ("sinceChatSendMs", trace.chatSendStartAt),
        ("sinceAssistantStreamMs", trace.assistantFirstStreamTextAt),
        ("sinceFirstSpeakAttemptMs", trace.firstSpeakAttemptAt),
    ]
    var parts = [
        "trace=\(trace.id)",
        "stage=\(stage)",
        "sinceSpeechFinalMs=\(Self.elapsedMs(since: trace.speechFinalAt, now: timestamp))",
    ]
    for (label, anchor) in anchors {
        guard let anchor else { continue }
        parts.append("\(label)=\(Self.elapsedMs(since: anchor, now: timestamp))")
    }
    parts.append(contentsOf: fields)
    GatewayDiagnostics.log("talk latency: \(parts.joined(separator: " "))")
}
|
||||
|
||||
/// Wraps a TTS byte stream so the first delivered chunk records the
/// `firstTTSChunkAt` latency anchor (hopping to the main actor) before being
/// forwarded. Every chunk, normal completion, and error passes through
/// unchanged; cancellation of the relay finishes the stream cleanly, and
/// consumer termination cancels the relay task.
private func instrumentTTSStreamFirstChunk(
    _ stream: AsyncThrowingStream<Data, Error>,
    mode: String,
    textChars: Int
) -> AsyncThrowingStream<Data, Error>
{
    AsyncThrowingStream { continuation in
        let pump = Task {
            var deliveredFirstChunk = false
            do {
                for try await data in stream {
                    if !deliveredFirstChunk {
                        deliveredFirstChunk = true
                        // Anchor before forwarding so downstream playback time
                        // is not folded into the "first chunk" measurement.
                        await MainActor.run { [weak self] in
                            self?.markLatencyAnchorIfNeeded(
                                \.firstTTSChunkAt,
                                stage: "tts.chunk.first",
                                fields: [
                                    "mode=\(mode)",
                                    "bytes=\(data.count)",
                                    "textChars=\(textChars)",
                                ])
                        }
                    }
                    continuation.yield(data)
                }
                continuation.finish()
            } catch is CancellationError {
                // Cancellation ends the stream without surfacing an error.
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
        continuation.onTermination = { _ in
            pump.cancel()
        }
    }
}
|
||||
|
||||
/// Whole milliseconds elapsed from `startedAt` to `now`, rounded to the
/// nearest integer and clamped at zero so clock adjustments never produce a
/// negative reading.
private static func elapsedMs(since startedAt: Date, now: Date = Date()) -> Int {
    let milliseconds = (now.timeIntervalSince(startedAt) * 1000).rounded()
    return milliseconds > 0 ? Int(milliseconds) : 0
}
|
||||
|
||||
static func configureAudioSession() throws {
|
||||
let session = AVAudioSession.sharedInstance()
|
||||
// Prefer `.spokenAudio` for STT; it tends to preserve speech energy better than `.voiceChat`.
|
||||
@@ -2150,4 +2403,20 @@ private struct IncrementalPrefetchedAudio {
|
||||
let outputFormat: String?
|
||||
}
|
||||
|
||||
/// Per-turn timeline of latency anchors for talk mode. Each optional field is
/// set at most once (via `markLatencyAnchorIfNeeded`) when its pipeline stage
/// is first reached; `nil` means that stage was never observed for this turn.
private struct TalkTurnLatencyTrace {
    /// Short random identifier correlating all "talk latency" log lines of one turn.
    let id: String
    /// Base anchor for the whole trace: when the final transcript was accepted
    /// (or synthesized via `ensureLatencyTrace`).
    let speechFinalAt: Date
    /// Transcript processing began ("process.start").
    var processStartAt: Date?
    /// Chat send initiated ("chat.send.start").
    var chatSendStartAt: Date?
    /// Chat send acknowledged with a run id ("chat.send.ack").
    var chatSendAckAt: Date?
    /// First assistant stream text received ("assistant.stream.first_text").
    var assistantFirstStreamTextAt: Date?
    /// First incremental speech segment queued ("assistant.segment.first_queued").
    var assistantFirstSegmentQueuedAt: Date?
    /// First attempt to speak a segment ("tts.segment.first_start").
    var firstSpeakAttemptAt: Date?
    /// First ElevenLabs TTS request issued ("tts.request.first").
    var firstTTSRequestAt: Date?
    /// First TTS audio chunk received ("tts.chunk.first").
    var firstTTSChunkAt: Date?
    /// First playback request to a player ("tts.playback.request.first").
    var firstPlaybackRequestAt: Date?
    /// First call into the system speech synthesizer ("tts.system.request.first").
    var firstSystemSpeakCallAt: Date?
    /// System synthesizer actually started speaking ("tts.system.did_start").
    var firstSystemSpeechStartAt: Date?
}
|
||||
|
||||
// swiftlint:enable type_body_length file_length
|
||||
|
||||
28
apps/ios/Tests/TalkModeIncrementalSpeechBufferTests.swift
Normal file
28
apps/ios/Tests/TalkModeIncrementalSpeechBufferTests.swift
Normal file
@@ -0,0 +1,28 @@
|
||||
import Testing
|
||||
@testable import OpenClaw
|
||||
|
||||
@MainActor
@Suite struct TalkModeIncrementalSpeechBufferTests {
    /// A long partial with no terminal punctuation should still be split at a
    /// whitespace soft boundary once enough characters have accumulated.
    @Test func emitsSoftBoundaryBeforeTerminalPunctuation() {
        let manager = TalkModeManager(allowSimulatorCapture: true)
        manager._test_incrementalReset()

        let streamedText =
            "We start speaking earlier by splitting this long stream chunk at a whitespace boundary before punctuation arrives"
        let emitted = manager._test_incrementalIngest(streamedText, isFinal: false)

        #expect(emitted.count == 1)
        #expect(emitted[0].count >= 72)
        #expect(emitted[0].count < streamedText.count)
    }

    /// Short partials with no punctuation stay buffered and emit nothing.
    @Test func keepsShortChunkBufferedWithoutPunctuation() {
        let manager = TalkModeManager(allowSimulatorCapture: true)
        manager._test_incrementalReset()

        let fragment = "short chunk without punctuation"
        let emitted = manager._test_incrementalIngest(fragment, isFinal: false)

        #expect(emitted.isEmpty)
    }
}
|
||||
@@ -12,6 +12,7 @@ public final class TalkSystemSpeechSynthesizer: NSObject {
|
||||
private let synth = AVSpeechSynthesizer()
|
||||
private var speakContinuation: CheckedContinuation<Void, Error>?
|
||||
private var currentUtterance: AVSpeechUtterance?
|
||||
private var didStartCallback: (() -> Void)?
|
||||
private var currentToken = UUID()
|
||||
private var watchdog: Task<Void, Never>?
|
||||
|
||||
@@ -26,17 +27,23 @@ public final class TalkSystemSpeechSynthesizer: NSObject {
|
||||
self.currentToken = UUID()
|
||||
self.watchdog?.cancel()
|
||||
self.watchdog = nil
|
||||
self.didStartCallback = nil
|
||||
self.synth.stopSpeaking(at: .immediate)
|
||||
self.finishCurrent(with: SpeakError.canceled)
|
||||
}
|
||||
|
||||
public func speak(text: String, language: String? = nil) async throws {
|
||||
public func speak(
|
||||
text: String,
|
||||
language: String? = nil,
|
||||
onStart: (() -> Void)? = nil
|
||||
) async throws {
|
||||
let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !trimmed.isEmpty else { return }
|
||||
|
||||
self.stop()
|
||||
let token = UUID()
|
||||
self.currentToken = token
|
||||
self.didStartCallback = onStart
|
||||
|
||||
let utterance = AVSpeechUtterance(string: trimmed)
|
||||
if let language, let voice = AVSpeechSynthesisVoice(language: language) {
|
||||
@@ -85,6 +92,7 @@ public final class TalkSystemSpeechSynthesizer: NSObject {
|
||||
|
||||
private func finishCurrent(with error: Error?) {
|
||||
self.currentUtterance = nil
|
||||
self.didStartCallback = nil
|
||||
let cont = self.speakContinuation
|
||||
self.speakContinuation = nil
|
||||
if let error {
|
||||
@@ -96,6 +104,17 @@ public final class TalkSystemSpeechSynthesizer: NSObject {
|
||||
}
|
||||
|
||||
extension TalkSystemSpeechSynthesizer: AVSpeechSynthesizerDelegate {
|
||||
/// Delegate callback fired when an utterance actually begins speaking.
/// Hops to the main actor, then invokes and clears the one-shot
/// `didStartCallback` so it cannot fire twice for the same utterance.
public nonisolated func speechSynthesizer(
    _ synthesizer: AVSpeechSynthesizer,
    didStart utterance: AVSpeechUtterance)
{
    Task { @MainActor in
        guard let handler = self.didStartCallback else { return }
        self.didStartCallback = nil
        handler()
    }
}
|
||||
|
||||
public nonisolated func speechSynthesizer(
|
||||
_ synthesizer: AVSpeechSynthesizer,
|
||||
didFinish utterance: AVSpeechUtterance)
|
||||
|
||||
Reference in New Issue
Block a user