mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-28 15:27:14 +00:00
Merged via squash.
Prepared head SHA: 3f5aedb265
Co-authored-by: ngutman <1540134+ngutman@users.noreply.github.com>
Co-authored-by: ngutman <1540134+ngutman@users.noreply.github.com>
Reviewed-by: @ngutman
995 lines
44 KiB
Swift
995 lines
44 KiB
Swift
import AVFAudio
|
|
import Foundation
|
|
import OpenClawChatUI
|
|
import OpenClawKit
|
|
import OpenClawProtocol
|
|
import OSLog
|
|
@preconcurrency import WebRTC
|
|
|
|
@MainActor
|
|
protocol TalkRealtimeWebRTCSessionDelegate: AnyObject {
|
|
func realtimeSession(_ session: TalkRealtimeWebRTCSession, didChangeStatus status: String)
|
|
func realtimeSession(_ session: TalkRealtimeWebRTCSession, didDetectInputSpeech active: Bool)
|
|
func realtimeSession(_ session: TalkRealtimeWebRTCSession, didReceiveUserTranscript text: String)
|
|
func realtimeSession(_ session: TalkRealtimeWebRTCSession, didReceiveAssistantTranscript text: String)
|
|
func realtimeSessionDidFinish(_ session: TalkRealtimeWebRTCSession)
|
|
}
|
|
|
|
@MainActor
|
|
final class TalkRealtimeWebRTCSession: NSObject {
|
|
private static let logger = Logger(subsystem: "ai.openclaw", category: "TalkRealtimeWebRTC")
|
|
private static let consultToolName = "openclaw_agent_consult"
|
|
private static let controlToolName = "openclaw_agent_control"
|
|
private static let defaultOfferURL = "https://api.openai.com/v1/realtime/calls"
|
|
private static let mediaStreamID = "openclaw-ios-realtime"
|
|
private static let audioTrackID = "openclaw-ios-audio"
|
|
private static let dataChannelLabel = "oai-events"
|
|
private static let toolCallTimeoutSeconds = 12
|
|
private static let toolResultTimeoutSeconds = 45
|
|
private static let agentWaitSliceSeconds = 3
|
|
private static let agentWaitRequestGraceSeconds = 15
|
|
private static let historyFallbackTimeoutSeconds = 5
|
|
private static let stillWorkingDelaySeconds = 6
|
|
private static let assistantPlaybackDrainGraceSeconds = 1.8
|
|
|
|
private let gateway: GatewayNodeSession
|
|
private let sessionKey: String
|
|
private weak var delegate: TalkRealtimeWebRTCSessionDelegate?
|
|
|
|
private var factory: RTCPeerConnectionFactory?
|
|
private var peerConnection: RTCPeerConnection?
|
|
private var dataChannel: RTCDataChannel?
|
|
private var session: TalkRealtimeClientSession?
|
|
private var toolBuffers: [String: ToolBuffer] = [:]
|
|
private var activeToolTasks: [String: Task<Void, Never>] = [:]
|
|
private var activeToolRunIds: [String: String] = [:]
|
|
private var stopped = false
|
|
private var timelineStartedAt = ProcessInfo.processInfo.systemUptime
|
|
private var seenRealtimeEventTypes: Set<String> = []
|
|
private var loggedFirstServerSpeech = false
|
|
private var loggedFirstAssistantSignal = false
|
|
private var assistantAudioActive = false
|
|
private var assistantAudioFinishTask: Task<Void, Never>?
|
|
|
|
private struct ToolBuffer {
|
|
var name: String
|
|
var callId: String
|
|
var args: String
|
|
}
|
|
|
|
private struct AgentWaitResponse: Decodable {
|
|
let runId: String?
|
|
let status: String?
|
|
let startedAt: Double?
|
|
let endedAt: Double?
|
|
let error: String?
|
|
let stopReason: String?
|
|
let timeoutPhase: String?
|
|
let providerStarted: Bool?
|
|
}
|
|
|
|
init(gateway: GatewayNodeSession, sessionKey: String, delegate: TalkRealtimeWebRTCSessionDelegate) {
|
|
self.gateway = gateway
|
|
self.sessionKey = sessionKey
|
|
self.delegate = delegate
|
|
super.init()
|
|
}
|
|
|
|
func start(
|
|
provider: String?,
|
|
model: String?,
|
|
voice: String?,
|
|
prefetchedSession: TalkRealtimeClientSession? = nil) async throws
|
|
{
|
|
self.timelineStartedAt = ProcessInfo.processInfo.systemUptime
|
|
self.seenRealtimeEventTypes.removeAll()
|
|
self.loggedFirstServerSpeech = false
|
|
self.loggedFirstAssistantSignal = false
|
|
self.assistantAudioActive = false
|
|
self.assistantAudioFinishTask?.cancel()
|
|
self.assistantAudioFinishTask = nil
|
|
self.stopped = false
|
|
self.trace(
|
|
"start provider=\(provider ?? "default") model=\(model ?? "default") "
|
|
+ "voice=\(voice ?? "default") sessionKey=\(self.sessionKey)")
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Connecting")
|
|
let session: TalkRealtimeClientSession
|
|
if let prefetchedSession {
|
|
self.trace(
|
|
"gateway talk.client.create skipped prefetched provider=\(prefetchedSession.provider) "
|
|
+ "transport=\(prefetchedSession.transport) model=\(prefetchedSession.model ?? "unknown") "
|
|
+ "voice=\(prefetchedSession.voice ?? "unknown")")
|
|
session = prefetchedSession
|
|
} else {
|
|
session = try await self.createClientSession(provider: provider, model: model, voice: voice)
|
|
}
|
|
let sessionModel = session.model ?? "unknown"
|
|
let sessionVoice = session.voice ?? "unknown"
|
|
Self.logger.info(
|
|
"realtime session provider=\(session.provider, privacy: .public) model=\(sessionModel, privacy: .public)")
|
|
Self.logger.info(
|
|
"realtime session voice=\(sessionVoice, privacy: .public) transport=\(session.transport, privacy: .public)")
|
|
try self.checkNotStopped()
|
|
guard session.isWebRTC else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 1, userInfo: [
|
|
NSLocalizedDescriptionKey: "Realtime provider returned unsupported transport \(session.transport)",
|
|
])
|
|
}
|
|
self.session = session
|
|
|
|
self.trace("configure audio session start")
|
|
try Self.configureAudioSession()
|
|
self.trace("configure audio session done")
|
|
RTCInitializeSSL()
|
|
let factory = RTCPeerConnectionFactory(
|
|
encoderFactory: RTCDefaultVideoEncoderFactory(),
|
|
decoderFactory: RTCDefaultVideoDecoderFactory())
|
|
self.factory = factory
|
|
|
|
let config = RTCConfiguration()
|
|
config.sdpSemantics = .unifiedPlan
|
|
config.continualGatheringPolicy = .gatherContinually
|
|
let constraints = RTCMediaConstraints(mandatoryConstraints: nil, optionalConstraints: nil)
|
|
guard let peer = factory.peerConnection(with: config, constraints: constraints, delegate: self) else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 2, userInfo: [
|
|
NSLocalizedDescriptionKey: "Failed to create WebRTC peer connection",
|
|
])
|
|
}
|
|
self.peerConnection = peer
|
|
|
|
let audioSource = factory.audioSource(with: constraints)
|
|
let audioTrack = factory.audioTrack(with: audioSource, trackId: Self.audioTrackID)
|
|
peer.add(audioTrack, streamIds: [Self.mediaStreamID])
|
|
|
|
let channelConfig = RTCDataChannelConfiguration()
|
|
let channel = peer.dataChannel(forLabel: Self.dataChannelLabel, configuration: channelConfig)
|
|
channel?.delegate = self
|
|
self.dataChannel = channel
|
|
|
|
let offer = try await createOffer(peer: peer)
|
|
self.trace("local offer created sdpBytes=\(offer.sdp.utf8.count)")
|
|
try self.checkNotStopped()
|
|
try await self.setLocalDescription(offer, peer: peer)
|
|
self.trace("local description set")
|
|
try self.checkNotStopped()
|
|
let answerSDP = try await exchangeOffer(offer.sdp, session: session)
|
|
self.trace("remote answer received sdpBytes=\(answerSDP.utf8.count)")
|
|
try self.checkNotStopped()
|
|
let answer = RTCSessionDescription(type: .answer, sdp: answerSDP)
|
|
try await setRemoteDescription(answer, peer: peer)
|
|
self.trace("remote description set")
|
|
try self.checkNotStopped()
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Listening")
|
|
}
|
|
|
|
func stop() {
|
|
let shouldNotify = !self.stopped
|
|
self.stopped = true
|
|
self.cancelActiveToolCalls()
|
|
self.toolBuffers.removeAll()
|
|
self.dataChannel?.close()
|
|
self.dataChannel = nil
|
|
self.peerConnection?.close()
|
|
self.peerConnection = nil
|
|
self.factory = nil
|
|
self.session = nil
|
|
self.assistantAudioActive = false
|
|
self.assistantAudioFinishTask?.cancel()
|
|
self.assistantAudioFinishTask = nil
|
|
if shouldNotify {
|
|
self.delegate?.realtimeSessionDidFinish(self)
|
|
}
|
|
}
|
|
|
|
private func checkNotStopped() throws {
|
|
if self.stopped {
|
|
throw CancellationError()
|
|
}
|
|
}
|
|
|
|
private func elapsedMs() -> Int {
|
|
max(0, Int((ProcessInfo.processInfo.systemUptime - self.timelineStartedAt) * 1000))
|
|
}
|
|
|
|
private func trace(_ message: String) {
|
|
GatewayDiagnostics.log("talk.timeline realtime +\(self.elapsedMs())ms \(message)")
|
|
Self.logger.info("timeline +\(self.elapsedMs(), privacy: .public)ms \(message, privacy: .public)")
|
|
}
|
|
|
|
func cancelResponse() {
|
|
self.sendRealtimeEvent(["type": "response.cancel"])
|
|
self.cancelActiveToolCalls()
|
|
}
|
|
|
|
private func cancelActiveToolCalls() {
|
|
let runIds = Array(Set(activeToolRunIds.values))
|
|
for task in self.activeToolTasks.values {
|
|
task.cancel()
|
|
}
|
|
self.activeToolTasks.removeAll()
|
|
self.activeToolRunIds.removeAll()
|
|
for runId in runIds {
|
|
Task { [gateway, sessionKey] in
|
|
let params = ["sessionKey": sessionKey, "runId": runId]
|
|
guard let data = try? JSONSerialization.data(withJSONObject: params),
|
|
let json = String(data: data, encoding: .utf8)
|
|
else { return }
|
|
_ = try? await gateway.request(method: "chat.abort", paramsJSON: json, timeoutSeconds: 5)
|
|
}
|
|
}
|
|
}
|
|
|
|
private func createClientSession(
|
|
provider: String?,
|
|
model: String?,
|
|
voice: String?) async throws -> TalkRealtimeClientSession
|
|
{
|
|
self.trace("gateway talk.client.create start")
|
|
let startedAt = ProcessInfo.processInfo.systemUptime
|
|
let params = TalkRealtimeClientCreateParams(provider: provider, model: model, voice: voice)
|
|
let data = try JSONEncoder().encode(params)
|
|
let json = String(data: data, encoding: .utf8)
|
|
let res = try await gateway.request(method: "talk.client.create", paramsJSON: json, timeoutSeconds: 12)
|
|
let session = try JSONDecoder().decode(TalkRealtimeClientSession.self, from: res)
|
|
let elapsed = Int((ProcessInfo.processInfo.systemUptime - startedAt) * 1000)
|
|
self.trace(
|
|
"gateway talk.client.create done elapsedMs=\(elapsed) "
|
|
+ "provider=\(session.provider) transport=\(session.transport) "
|
|
+ "model=\(session.model ?? "unknown") voice=\(session.voice ?? "unknown")")
|
|
return session
|
|
}
|
|
|
|
private func createOffer(peer: RTCPeerConnection) async throws -> RTCSessionDescription {
|
|
self.trace("local offer create start")
|
|
let constraints = RTCMediaConstraints(
|
|
mandatoryConstraints: [
|
|
"OfferToReceiveAudio": "true",
|
|
"OfferToReceiveVideo": "false",
|
|
],
|
|
optionalConstraints: nil)
|
|
return try await withCheckedThrowingContinuation { continuation in
|
|
peer.offer(for: constraints) { offer, error in
|
|
if let error {
|
|
continuation.resume(throwing: error)
|
|
} else if let offer {
|
|
continuation.resume(returning: offer)
|
|
} else {
|
|
continuation.resume(throwing: NSError(domain: "TalkRealtimeWebRTC", code: 3, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenAI realtime offer creation returned no SDP",
|
|
]))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private func setLocalDescription(_ description: RTCSessionDescription, peer: RTCPeerConnection) async throws {
|
|
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
|
|
peer.setLocalDescription(description) { error in
|
|
if let error {
|
|
continuation.resume(throwing: error)
|
|
} else {
|
|
continuation.resume()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private func setRemoteDescription(_ description: RTCSessionDescription, peer: RTCPeerConnection) async throws {
|
|
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
|
|
peer.setRemoteDescription(description) { error in
|
|
if let error {
|
|
continuation.resume(throwing: error)
|
|
} else {
|
|
continuation.resume()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private func exchangeOffer(_ sdp: String, session: TalkRealtimeClientSession) async throws -> String {
|
|
let rawURL = session.offerUrl ?? Self.defaultOfferURL
|
|
guard let url = URL(string: rawURL) else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 4, userInfo: [
|
|
NSLocalizedDescriptionKey: "Invalid OpenAI realtime offer URL",
|
|
])
|
|
}
|
|
var request = URLRequest(url: url)
|
|
request.httpMethod = "POST"
|
|
request.setValue("Bearer \(session.clientSecret)", forHTTPHeaderField: "Authorization")
|
|
request.setValue("application/sdp", forHTTPHeaderField: "Content-Type")
|
|
request.httpBody = sdp.data(using: .utf8)
|
|
for (key, value) in session.offerHeaders ?? [:] {
|
|
request.setValue(value, forHTTPHeaderField: key)
|
|
}
|
|
|
|
self.trace("openai webrtc offer exchange start urlHost=\(url.host ?? "unknown")")
|
|
let startedAt = ProcessInfo.processInfo.systemUptime
|
|
let (data, response) = try await URLSession.shared.data(for: request)
|
|
guard let http = response as? HTTPURLResponse else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 5, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenAI realtime offer returned a non-HTTP response",
|
|
])
|
|
}
|
|
let elapsed = Int((ProcessInfo.processInfo.systemUptime - startedAt) * 1000)
|
|
self.trace("openai webrtc offer exchange response status=\(http.statusCode) elapsedMs=\(elapsed)")
|
|
guard (200..<300).contains(http.statusCode) else {
|
|
let body = String(data: data, encoding: .utf8) ?? ""
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: http.statusCode, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenAI realtime offer failed: \(http.statusCode) \(body)",
|
|
])
|
|
}
|
|
guard let answer = String(data: data, encoding: .utf8),
|
|
!answer.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty
|
|
else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 6, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenAI realtime offer returned an empty SDP answer",
|
|
])
|
|
}
|
|
return answer
|
|
}
|
|
|
|
private func handleRealtimeEvent(_ event: TalkRealtimeServerEvent) {
|
|
if !self.seenRealtimeEventTypes.contains(event.type) {
|
|
self.seenRealtimeEventTypes.insert(event.type)
|
|
self.trace("event first type=\(event.type)")
|
|
}
|
|
if self.handleRealtimeAudioStateEvent(event) {
|
|
return
|
|
}
|
|
switch event.type {
|
|
case "conversation.input_transcript.delta",
|
|
"conversation.item.input_audio_transcription.delta":
|
|
if !self.loggedFirstServerSpeech {
|
|
self.loggedFirstServerSpeech = true
|
|
self.trace("server speech/transcript first delta")
|
|
}
|
|
if let text = event.delta ?? event.transcript {
|
|
self.delegate?.realtimeSession(self, didReceiveUserTranscript: text)
|
|
}
|
|
case "conversation.input_transcript.done",
|
|
"conversation.item.input_audio_transcription.completed":
|
|
if let text = event.transcript ?? event.text {
|
|
self.delegate?.realtimeSession(self, didReceiveUserTranscript: text)
|
|
}
|
|
case "conversation.output_transcript.delta",
|
|
"response.output_text.delta",
|
|
"response.audio_transcript.delta",
|
|
"response.output_audio_transcript.delta":
|
|
if !self.loggedFirstAssistantSignal {
|
|
self.loggedFirstAssistantSignal = true
|
|
self.trace("assistant first output signal type=\(event.type)")
|
|
}
|
|
if let text = event.delta ?? event.transcript ?? event.text {
|
|
self.delegate?.realtimeSession(self, didReceiveAssistantTranscript: text)
|
|
}
|
|
case "conversation.output_transcript.done",
|
|
"response.output_text.done",
|
|
"response.audio_transcript.done",
|
|
"response.output_audio_transcript.done":
|
|
if let text = event.transcript ?? event.text {
|
|
self.delegate?.realtimeSession(self, didReceiveAssistantTranscript: text)
|
|
}
|
|
case "response.function_call_arguments.delta":
|
|
self.bufferToolDelta(event)
|
|
case "response.output_item.added":
|
|
self.bufferToolMetadata(event)
|
|
case "response.function_call_arguments.done",
|
|
"response.output_item.done",
|
|
"conversation.item.done":
|
|
self.handleToolDone(event)
|
|
case "error":
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Realtime error")
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
|
|
private func handleRealtimeAudioStateEvent(_ event: TalkRealtimeServerEvent) -> Bool {
|
|
switch event.type {
|
|
case "response.audio.delta", "response.output_audio.delta", "conversation.output_audio.delta":
|
|
self.markAssistantAudioActive()
|
|
return true
|
|
case "response.created":
|
|
self.trace("response created")
|
|
self.markAssistantAudioActive()
|
|
return true
|
|
case "response.audio.done", "response.output_audio.done", "conversation.output_audio.done", "response.done":
|
|
self.scheduleAssistantAudioFinished()
|
|
return true
|
|
case "input_audio_buffer.speech_started":
|
|
if self.assistantAudioActive {
|
|
self.trace("input speech ignored while assistant audio active")
|
|
return true
|
|
}
|
|
if !self.loggedFirstServerSpeech {
|
|
self.loggedFirstServerSpeech = true
|
|
self.trace("server detected speech")
|
|
}
|
|
self.delegate?.realtimeSession(self, didDetectInputSpeech: true)
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Listening")
|
|
return true
|
|
case "input_audio_buffer.speech_stopped", "input_audio_buffer.committed":
|
|
self.delegate?.realtimeSession(self, didDetectInputSpeech: false)
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
private func markAssistantAudioActive() {
|
|
self.assistantAudioActive = true
|
|
self.assistantAudioFinishTask?.cancel()
|
|
self.assistantAudioFinishTask = nil
|
|
self.delegate?.realtimeSession(self, didDetectInputSpeech: false)
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Speaking")
|
|
}
|
|
|
|
private func scheduleAssistantAudioFinished() {
|
|
self.assistantAudioFinishTask?.cancel()
|
|
self.assistantAudioFinishTask = Task { @MainActor [weak self] in
|
|
try? await Task.sleep(
|
|
nanoseconds: UInt64(Self.assistantPlaybackDrainGraceSeconds * 1_000_000_000))
|
|
guard let self, !Task.isCancelled, !self.stopped else { return }
|
|
self.assistantAudioActive = false
|
|
self.assistantAudioFinishTask = nil
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Listening")
|
|
}
|
|
}
|
|
|
|
private func toolBufferKey(for event: TalkRealtimeServerEvent) -> String? {
|
|
event.resolvedItemId ?? event.resolvedCallId
|
|
}
|
|
|
|
private func bufferToolMetadata(_ event: TalkRealtimeServerEvent) {
|
|
guard Self.isSupportedToolName(event.resolvedName), let key = toolBufferKey(for: event) else { return }
|
|
var buffer = self.toolBuffers[key] ?? ToolBuffer(name: "", callId: "", args: "")
|
|
buffer.name = event.resolvedName ?? buffer.name
|
|
buffer.callId = event.resolvedCallId ?? buffer.callId
|
|
if let arguments = event.resolvedArguments, !arguments.isEmpty {
|
|
buffer.args = arguments
|
|
}
|
|
self.toolBuffers[key] = buffer
|
|
}
|
|
|
|
private func bufferToolDelta(_ event: TalkRealtimeServerEvent) {
|
|
guard let key = toolBufferKey(for: event) else { return }
|
|
var buffer = self.toolBuffers[key] ?? ToolBuffer(
|
|
name: event.resolvedName ?? "",
|
|
callId: event.resolvedCallId ?? "",
|
|
args: "")
|
|
buffer.name = buffer.name.isEmpty ? (event.resolvedName ?? "") : buffer.name
|
|
buffer.callId = buffer.callId.isEmpty ? (event.resolvedCallId ?? "") : buffer.callId
|
|
buffer.args += event.delta ?? ""
|
|
self.toolBuffers[key] = buffer
|
|
}
|
|
|
|
private func handleToolDone(_ event: TalkRealtimeServerEvent) {
|
|
guard let key = toolBufferKey(for: event) else { return }
|
|
let buffered = self.toolBuffers[key]
|
|
let name = buffered?.name.isEmpty == false ? buffered?.name : event.resolvedName
|
|
let callId = buffered?.callId.isEmpty == false ? buffered?.callId : event.resolvedCallId
|
|
let args = buffered?.args.isEmpty == false ? buffered?.args : event.resolvedArguments
|
|
guard Self.isSupportedToolName(name), let callId, !callId.isEmpty else { return }
|
|
guard args?.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty == false else {
|
|
self.bufferToolMetadata(event)
|
|
return
|
|
}
|
|
guard self.activeToolTasks[callId] == nil else { return }
|
|
self.toolBuffers.removeValue(forKey: key)
|
|
self.trace("tool call ready name=\(name ?? "unknown") callId=\(callId) argsBytes=\((args ?? "").utf8.count)")
|
|
self.assistantAudioActive = false
|
|
self.assistantAudioFinishTask?.cancel()
|
|
self.assistantAudioFinishTask = nil
|
|
self.delegate?.realtimeSession(
|
|
self,
|
|
didChangeStatus: name == Self.controlToolName ? "Updating OpenClaw" : "Asking OpenClaw")
|
|
let task = Task { @MainActor [weak self] in
|
|
guard let self else { return }
|
|
if name == Self.controlToolName {
|
|
await self.submitControlToolCall(callId: callId, argsJSON: args ?? "{}")
|
|
} else {
|
|
await self.submitConsultToolCall(callId: callId, argsJSON: args ?? "{}")
|
|
}
|
|
}
|
|
self.activeToolTasks[callId] = task
|
|
}
|
|
|
|
private static func isSupportedToolName(_ name: String?) -> Bool {
|
|
name == self.consultToolName || name == self.controlToolName
|
|
}
|
|
|
|
private func submitConsultToolCall(callId: String, argsJSON: String) async {
|
|
self.trace("tool call submit start callId=\(callId) argsBytes=\(argsJSON.utf8.count)")
|
|
let statusTask = Task { @MainActor [weak self] in
|
|
try? await Task.sleep(nanoseconds: UInt64(Self.stillWorkingDelaySeconds) * 1_000_000_000)
|
|
guard let self, !Task.isCancelled, !self.stopped else { return }
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Still asking OpenClaw")
|
|
}
|
|
defer {
|
|
statusTask.cancel()
|
|
self.activeToolTasks[callId] = nil
|
|
self.activeToolRunIds[callId] = nil
|
|
}
|
|
do {
|
|
let args = try Self.decodeJSONObject(argsJSON)
|
|
let params: [String: Any] = [
|
|
"sessionKey": sessionKey,
|
|
"callId": callId,
|
|
"name": Self.consultToolName,
|
|
"args": args,
|
|
]
|
|
let historySince = Date().timeIntervalSince1970
|
|
let data = try JSONSerialization.data(withJSONObject: params)
|
|
guard let json = String(data: data, encoding: .utf8) else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 7, userInfo: [
|
|
NSLocalizedDescriptionKey: "Failed to encode realtime tool call",
|
|
])
|
|
}
|
|
let stream = await gateway.subscribeServerEvents(bufferingNewest: 200)
|
|
self.trace("tool call gateway request start callId=\(callId)")
|
|
let requestStartedAt = ProcessInfo.processInfo.systemUptime
|
|
let res = try await gateway.request(
|
|
method: "talk.client.toolCall",
|
|
paramsJSON: json,
|
|
timeoutSeconds: Self.toolCallTimeoutSeconds)
|
|
let response = try JSONDecoder().decode(TalkRealtimeToolCallResponse.self, from: res)
|
|
let requestElapsed = Int((ProcessInfo.processInfo.systemUptime - requestStartedAt) * 1000)
|
|
guard let runId = response.runId ?? response.idempotencyKey else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 8, userInfo: [
|
|
NSLocalizedDescriptionKey: "Gateway did not return a realtime tool run id",
|
|
])
|
|
}
|
|
self.trace("tool call gateway request done callId=\(callId) runId=\(runId) elapsedMs=\(requestElapsed)")
|
|
self.activeToolRunIds[callId] = runId
|
|
if Task.isCancelled || self.stopped {
|
|
await self.abortChatRun(runId: runId)
|
|
return
|
|
}
|
|
let result = try await waitForChatResult(
|
|
runId: runId,
|
|
stream: stream,
|
|
since: historySince,
|
|
timeoutSeconds: Self.toolResultTimeoutSeconds)
|
|
if Task.isCancelled || self.stopped { return }
|
|
self.trace("tool call chat result ready callId=\(callId) runId=\(runId) chars=\(result.count)")
|
|
self.submitToolResult(callId: callId, result: ["result": result])
|
|
} catch is CancellationError {
|
|
return
|
|
} catch {
|
|
if Task.isCancelled || self.stopped { return }
|
|
Self.logger.error("realtime tool call failed: \(error.localizedDescription, privacy: .public)")
|
|
self.trace("tool call failed callId=\(callId) error=\(error.localizedDescription)")
|
|
if let runId = activeToolRunIds[callId] {
|
|
await self.abortChatRun(runId: runId)
|
|
}
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "OpenClaw unavailable")
|
|
let fallbackMessage = [
|
|
"OpenClaw consult did not finish quickly enough.",
|
|
"Give a brief spoken fallback from the realtime conversation",
|
|
"and ask the user to try again if they need OpenClaw-specific context.",
|
|
].joined(separator: " ")
|
|
self.submitToolResult(callId: callId, result: [
|
|
"error": fallbackMessage,
|
|
])
|
|
}
|
|
guard !Task.isCancelled, !self.stopped else { return }
|
|
if !self.assistantAudioActive {
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Listening")
|
|
}
|
|
}
|
|
|
|
private func submitControlToolCall(callId: String, argsJSON: String) async {
|
|
self.trace("control tool submit start callId=\(callId) argsBytes=\(argsJSON.utf8.count)")
|
|
defer { self.activeToolTasks[callId] = nil }
|
|
do {
|
|
let params = try Self.controlParams(sessionKey: self.sessionKey, argsJSON: argsJSON)
|
|
let data = try JSONSerialization.data(withJSONObject: params)
|
|
guard let json = String(data: data, encoding: .utf8) else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 19, userInfo: [
|
|
NSLocalizedDescriptionKey: "Failed to encode realtime control call",
|
|
])
|
|
}
|
|
let res = try await gateway.request(
|
|
method: "talk.client.steer",
|
|
paramsJSON: json,
|
|
timeoutSeconds: Self.toolCallTimeoutSeconds)
|
|
let message = Self.controlResultMessage(from: res) ?? "OpenClaw updated the active run."
|
|
self.trace("control tool gateway request done callId=\(callId) messageBytes=\(message.utf8.count)")
|
|
self.submitToolResult(callId: callId, result: ["result": message])
|
|
} catch is CancellationError {
|
|
return
|
|
} catch {
|
|
if Task.isCancelled || self.stopped { return }
|
|
Self.logger.error("realtime control tool failed: \(error.localizedDescription, privacy: .public)")
|
|
self.trace("control tool failed callId=\(callId) error=\(error.localizedDescription)")
|
|
self.submitToolResult(callId: callId, result: [
|
|
"error": "OpenClaw could not update the active run.",
|
|
])
|
|
}
|
|
guard !Task.isCancelled, !self.stopped else { return }
|
|
if !self.assistantAudioActive {
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Listening")
|
|
}
|
|
}
|
|
|
|
private static func controlParams(sessionKey: String, argsJSON: String) throws -> [String: Any] {
|
|
let args = try Self.decodeJSONObject(argsJSON)
|
|
let record = args as? [String: Any] ?? [:]
|
|
let text = Self.nonEmptyString(record["text"])
|
|
?? Self.nonEmptyString(record["message"])
|
|
?? Self.nonEmptyString(record["request"])
|
|
?? Self.nonEmptyString(record["query"])
|
|
guard let text else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 20, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenClaw control tool call missing text",
|
|
])
|
|
}
|
|
var params: [String: Any] = [
|
|
"sessionKey": sessionKey,
|
|
"text": text,
|
|
]
|
|
if let mode = Self.nonEmptyString(record["mode"]) {
|
|
params["mode"] = mode
|
|
}
|
|
return params
|
|
}
|
|
|
|
private static func nonEmptyString(_ value: Any?) -> String? {
|
|
guard let raw = value as? String else { return nil }
|
|
let trimmed = raw.trimmingCharacters(in: .whitespacesAndNewlines)
|
|
return trimmed.isEmpty ? nil : trimmed
|
|
}
|
|
|
|
private static func controlResultMessage(from data: Data) -> String? {
|
|
guard let object = try? JSONSerialization.jsonObject(with: data),
|
|
let record = object as? [String: Any]
|
|
else { return nil }
|
|
return Self.nonEmptyString(record["message"])
|
|
}
|
|
|
|
private func abortChatRun(runId: String) async {
|
|
let params = ["sessionKey": sessionKey, "runId": runId]
|
|
guard let data = try? JSONSerialization.data(withJSONObject: params),
|
|
let json = String(data: data, encoding: .utf8)
|
|
else { return }
|
|
_ = try? await self.gateway.request(method: "chat.abort", paramsJSON: json, timeoutSeconds: 5)
|
|
}
|
|
|
|
private static func decodeJSONObject(_ json: String) throws -> Any {
|
|
let trimmed = json.trimmingCharacters(in: .whitespacesAndNewlines)
|
|
guard !trimmed.isEmpty else { return [:] }
|
|
let data = Data(trimmed.utf8)
|
|
return try JSONSerialization.jsonObject(with: data)
|
|
}
|
|
|
|
private func waitForChatResult(
|
|
runId: String,
|
|
stream: AsyncStream<EventFrame>,
|
|
since: Double,
|
|
timeoutSeconds: Int = 120) async throws -> String
|
|
{
|
|
let currentSessionKey = self.sessionKey
|
|
return try await withThrowingTaskGroup(of: String.self) { group in
|
|
group.addTask { [runId, currentSessionKey] in
|
|
for await evt in stream {
|
|
guard evt.event == "chat", let payload = evt.payload else { continue }
|
|
guard let chatEvent = try? GatewayPayloadDecoding.decode(
|
|
payload,
|
|
as: OpenClawChatEventPayload.self)
|
|
else {
|
|
continue
|
|
}
|
|
guard chatEvent.runId == runId else { continue }
|
|
if let eventSessionKey = chatEvent.sessionKey,
|
|
!Self.matchesSessionKey(eventSessionKey, currentSessionKey)
|
|
{
|
|
continue
|
|
}
|
|
await MainActor.run {
|
|
self.trace("chat event runId=\(runId) state=\(chatEvent.state ?? "unknown")")
|
|
}
|
|
if chatEvent.state == "final" {
|
|
return OpenClawChatEventText.assistantText(from: chatEvent) ?? "OpenClaw finished with no text."
|
|
}
|
|
if chatEvent.state == "aborted" {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 9, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenClaw realtime tool call aborted",
|
|
])
|
|
}
|
|
if chatEvent.state == "error" {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 10, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenClaw realtime tool call failed",
|
|
])
|
|
}
|
|
}
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 11, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenClaw realtime tool event stream ended",
|
|
])
|
|
}
|
|
group.addTask { [gateway, sessionKey] in
|
|
try await Self.waitForAgentResult(
|
|
gateway: gateway,
|
|
sessionKey: sessionKey,
|
|
runId: runId,
|
|
since: since,
|
|
timeoutSeconds: timeoutSeconds)
|
|
}
|
|
group.addTask {
|
|
try await Task.sleep(nanoseconds: UInt64(timeoutSeconds) * 1_000_000_000)
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 12, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenClaw realtime tool call timed out",
|
|
])
|
|
}
|
|
guard let result = try await group.next() else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 13, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenClaw realtime tool call did not finish",
|
|
])
|
|
}
|
|
group.cancelAll()
|
|
return result
|
|
}
|
|
}
|
|
|
|
private nonisolated static func matchesSessionKey(_ incoming: String, _ current: String) -> Bool {
|
|
let incoming = incoming.trimmingCharacters(in: .whitespacesAndNewlines).lowercased()
|
|
let current = current.trimmingCharacters(in: .whitespacesAndNewlines).lowercased()
|
|
if incoming == current { return true }
|
|
return (incoming == "agent:main:main" && current == "main") ||
|
|
(incoming == "main" && current == "agent:main:main")
|
|
}
|
|
|
|
private static func waitForAgentResult(
|
|
gateway: GatewayNodeSession,
|
|
sessionKey: String,
|
|
runId: String,
|
|
since: Double,
|
|
timeoutSeconds: Int) async throws -> String
|
|
{
|
|
let deadline = Date().addingTimeInterval(TimeInterval(timeoutSeconds))
|
|
var sawProviderStart = false
|
|
while Date() < deadline {
|
|
let remaining = max(1, Int(ceil(deadline.timeIntervalSinceNow)))
|
|
let waitSeconds = min(Self.agentWaitSliceSeconds, remaining)
|
|
let wait = try await Self.agentWait(
|
|
gateway: gateway,
|
|
runId: runId,
|
|
timeoutSeconds: waitSeconds)
|
|
let status = wait.status?.lowercased() ?? "unknown"
|
|
if wait.startedAt != nil || wait.providerStarted == true {
|
|
sawProviderStart = true
|
|
}
|
|
GatewayDiagnostics.log(
|
|
"talk.timeline realtime agent.wait runId=\(runId) status=\(status) "
|
|
+
|
|
"phase=\(wait.timeoutPhase ?? "unknown") "
|
|
+
|
|
"providerStarted=\(wait.providerStarted.map(String.init) ?? "unknown")")
|
|
switch status {
|
|
case "ok":
|
|
if let text = try await Self.waitForAssistantTextFromHistory(
|
|
gateway: gateway,
|
|
sessionKey: sessionKey,
|
|
since: since,
|
|
timeoutSeconds: Self.historyFallbackTimeoutSeconds)
|
|
{
|
|
return text
|
|
}
|
|
case "error":
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 14, userInfo: [
|
|
NSLocalizedDescriptionKey: wait.error ?? "OpenClaw realtime tool call failed",
|
|
])
|
|
case "aborted", "cancelled", "canceled":
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 15, userInfo: [
|
|
NSLocalizedDescriptionKey: wait.stopReason ?? "OpenClaw realtime tool call aborted",
|
|
])
|
|
case "timeout":
|
|
break
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
let phase = sawProviderStart ? "provider" : "queue"
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 16, userInfo: [
|
|
NSLocalizedDescriptionKey: "OpenClaw realtime tool call timed out in \(phase)",
|
|
])
|
|
}
|
|
|
|
private static func agentWait(
|
|
gateway: GatewayNodeSession,
|
|
runId: String,
|
|
timeoutSeconds: Int) async throws -> AgentWaitResponse
|
|
{
|
|
let timeoutMs = max(1, timeoutSeconds) * 1000
|
|
let params: [String: Any] = [
|
|
"runId": runId,
|
|
"timeoutMs": timeoutMs,
|
|
]
|
|
let data = try JSONSerialization.data(withJSONObject: params)
|
|
guard let json = String(data: data, encoding: .utf8) else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 17, userInfo: [
|
|
NSLocalizedDescriptionKey: "Failed to encode OpenClaw wait request",
|
|
])
|
|
}
|
|
let response = try await gateway.request(
|
|
method: "agent.wait",
|
|
paramsJSON: json,
|
|
timeoutSeconds: timeoutSeconds + Self.agentWaitRequestGraceSeconds)
|
|
return try JSONDecoder().decode(AgentWaitResponse.self, from: response)
|
|
}
|
|
|
|
private static func waitForAssistantTextFromHistory(
|
|
gateway: GatewayNodeSession,
|
|
sessionKey: String,
|
|
since: Double,
|
|
timeoutSeconds: Int) async throws -> String?
|
|
{
|
|
let deadline = Date().addingTimeInterval(TimeInterval(timeoutSeconds))
|
|
while Date() < deadline {
|
|
if let text = try await Self.latestAssistantTextFromHistory(
|
|
gateway: gateway,
|
|
sessionKey: sessionKey,
|
|
since: since)
|
|
{
|
|
return text
|
|
}
|
|
try? await Task.sleep(nanoseconds: 300_000_000)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
private static func latestAssistantTextFromHistory(
|
|
gateway: GatewayNodeSession,
|
|
sessionKey: String,
|
|
since: Double) async throws -> String?
|
|
{
|
|
let params: [String: Any] = ["sessionKey": sessionKey]
|
|
let data = try JSONSerialization.data(withJSONObject: params)
|
|
guard let json = String(data: data, encoding: .utf8) else {
|
|
throw NSError(domain: "TalkRealtimeWebRTC", code: 18, userInfo: [
|
|
NSLocalizedDescriptionKey: "Failed to encode OpenClaw history request",
|
|
])
|
|
}
|
|
let response = try await gateway.request(method: "chat.history", paramsJSON: json, timeoutSeconds: 15)
|
|
let history = try JSONDecoder().decode(OpenClawChatHistoryPayload.self, from: response)
|
|
let messages = history.messages ?? []
|
|
let decoded: [OpenClawChatMessage] = messages.compactMap { item in
|
|
guard let data = try? JSONEncoder().encode(item) else { return nil }
|
|
return try? JSONDecoder().decode(OpenClawChatMessage.self, from: data)
|
|
}
|
|
let assistant = decoded.last { message in
|
|
guard message.role == "assistant" else { return false }
|
|
guard let timestamp = message.timestamp else { return false }
|
|
return TalkHistoryTimestamp.isAfter(timestamp, sinceSeconds: since)
|
|
}
|
|
guard let assistant else { return nil }
|
|
let text = assistant.content.compactMap(\.text).joined(separator: "\n")
|
|
let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines)
|
|
return trimmed.isEmpty ? nil : trimmed
|
|
}
|
|
|
|
private func submitToolResult(callId: String, result: [String: String]) {
|
|
guard let output = Self.encodeJSONString(result) else { return }
|
|
self.trace("tool result send callId=\(callId) outputBytes=\(output.utf8.count)")
|
|
self.sendRealtimeEvent([
|
|
"type": "conversation.item.create",
|
|
"item": [
|
|
"type": "function_call_output",
|
|
"call_id": callId,
|
|
"output": output,
|
|
],
|
|
])
|
|
self.sendRealtimeEvent(["type": "response.create"])
|
|
}
|
|
|
|
private static func encodeJSONString(_ value: Any) -> String? {
|
|
guard JSONSerialization.isValidJSONObject(value) else { return nil }
|
|
guard let data = try? JSONSerialization.data(withJSONObject: value) else { return nil }
|
|
return String(data: data, encoding: .utf8)
|
|
}
|
|
|
|
private func sendRealtimeEvent(_ event: [String: Any]) {
|
|
guard
|
|
let channel = dataChannel,
|
|
channel.readyState == .open,
|
|
let json = Self.encodeJSONString(event),
|
|
let data = json.data(using: .utf8)
|
|
else { return }
|
|
channel.sendData(RTCDataBuffer(data: data, isBinary: false))
|
|
if let type = event["type"] as? String {
|
|
self.trace("client event sent type=\(type)")
|
|
}
|
|
}
|
|
|
|
private static func configureAudioSession() throws {
|
|
let config = RTCAudioSessionConfiguration.webRTC()
|
|
config.category = AVAudioSession.Category.playAndRecord.rawValue
|
|
config.mode = AVAudioSession.Mode.default.rawValue
|
|
config.categoryOptions = [
|
|
.allowBluetoothHFP,
|
|
.defaultToSpeaker,
|
|
]
|
|
config.sampleRate = 48000
|
|
config.ioBufferDuration = 0.01
|
|
RTCAudioSessionConfiguration.setWebRTC(config)
|
|
|
|
let session = RTCAudioSession.sharedInstance()
|
|
session.lockForConfiguration()
|
|
defer { session.unlockForConfiguration() }
|
|
|
|
session.ignoresPreferredAttributeConfigurationErrors = true
|
|
try session.setConfiguration(config, active: true)
|
|
try? session.overrideOutputAudioPort(.speaker)
|
|
}
|
|
}
|
|
|
|
extension TalkRealtimeWebRTCSession: RTCPeerConnectionDelegate {
|
|
nonisolated func peerConnection(_: RTCPeerConnection, didChange _: RTCSignalingState) {}
|
|
nonisolated func peerConnection(_: RTCPeerConnection, didAdd stream: RTCMediaStream) {
|
|
Task { @MainActor in
|
|
self
|
|
.trace(
|
|
"remote stream added audioTracks=\(stream.audioTracks.count) "
|
|
+ "videoTracks=\(stream.videoTracks.count)")
|
|
}
|
|
}
|
|
|
|
nonisolated func peerConnection(_: RTCPeerConnection, didRemove _: RTCMediaStream) {}
|
|
nonisolated func peerConnectionShouldNegotiate(_: RTCPeerConnection) {}
|
|
nonisolated func peerConnection(_: RTCPeerConnection, didChange newState: RTCIceConnectionState) {
|
|
Task { @MainActor in
|
|
guard !self.stopped else { return }
|
|
switch newState {
|
|
case .connected, .completed:
|
|
if !self.assistantAudioActive {
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Listening")
|
|
}
|
|
case .disconnected:
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Reconnecting")
|
|
case .failed, .closed:
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Realtime disconnected")
|
|
self.stop()
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
nonisolated func peerConnection(_: RTCPeerConnection, didChange _: RTCIceGatheringState) {}
|
|
nonisolated func peerConnection(_: RTCPeerConnection, didGenerate _: RTCIceCandidate) {}
|
|
nonisolated func peerConnection(_: RTCPeerConnection, didRemove _: [RTCIceCandidate]) {}
|
|
nonisolated func peerConnection(_: RTCPeerConnection, didOpen dataChannel: RTCDataChannel) {
|
|
Task { @MainActor in
|
|
self.dataChannel = dataChannel
|
|
dataChannel.delegate = self
|
|
}
|
|
}
|
|
}
|
|
|
|
extension TalkRealtimeWebRTCSession: RTCDataChannelDelegate {
|
|
nonisolated func dataChannelDidChangeState(_ dataChannel: RTCDataChannel) {
|
|
Task { @MainActor in
|
|
guard !self.stopped else { return }
|
|
if dataChannel.readyState == .open {
|
|
if !self.assistantAudioActive {
|
|
self.delegate?.realtimeSession(self, didChangeStatus: "Listening")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
nonisolated func dataChannel(_: RTCDataChannel, didReceiveMessageWith buffer: RTCDataBuffer) {
|
|
guard !buffer.isBinary else { return }
|
|
let data = buffer.data
|
|
Task { @MainActor in
|
|
guard !self.stopped else { return }
|
|
do {
|
|
let event = try JSONDecoder().decode(TalkRealtimeServerEvent.self, from: data)
|
|
self.handleRealtimeEvent(event)
|
|
} catch {
|
|
Self.logger
|
|
.debug("ignored realtime event decode failure: \(error.localizedDescription, privacy: .public)")
|
|
}
|
|
}
|
|
}
|
|
}
|