import Foundation import Testing @testable import OpenClawKit import OpenClawProtocol private extension NSLock { func withLock(_ body: () -> T) -> T { self.lock() defer { self.unlock() } return body() } } private final class FakeGatewayWebSocketTask: WebSocketTasking, @unchecked Sendable { private let lock = NSLock() private var _state: URLSessionTask.State = .suspended private var connectRequestId: String? private var receivePhase = 0 private var pendingReceiveHandler: (@Sendable (Result) -> Void)? var state: URLSessionTask.State { get { self.lock.withLock { self._state } } set { self.lock.withLock { self._state = newValue } } } func resume() { self.state = .running } func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { _ = (closeCode, reason) self.state = .canceling let handler = self.lock.withLock { () -> (@Sendable (Result) -> Void)? in defer { self.pendingReceiveHandler = nil } return self.pendingReceiveHandler } handler?(Result.failure(URLError(.cancelled))) } func send(_ message: URLSessionWebSocketTask.Message) async throws { let data: Data? = switch message { case let .data(d): d case let .string(s): s.data(using: .utf8) @unknown default: nil } guard let data else { return } if let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], obj["type"] as? String == "req", obj["method"] as? String == "connect", let id = obj["id"] as? String { self.lock.withLock { self.connectRequestId = id } } } func sendPing(pongReceiveHandler: @escaping @Sendable (Error?) -> Void) { pongReceiveHandler(nil) } func receive() async throws -> URLSessionWebSocketTask.Message { let phase = self.lock.withLock { () -> Int in let current = self.receivePhase self.receivePhase += 1 return current } if phase == 0 { return .data(Self.connectChallengeData(nonce: "nonce-1")) } for _ in 0..<50 { let id = self.lock.withLock { self.connectRequestId } if let id { return .data(Self.connectOkData(id: id)) } try await Task.sleep(nanoseconds: 1_000_000) } return .data(Self.connectOkData(id: "connect")) } func receive( completionHandler: @escaping @Sendable (Result) -> Void) { self.lock.withLock { self.pendingReceiveHandler = completionHandler } } func emitReceiveFailure() { let handler = self.lock.withLock { () -> (@Sendable (Result) -> Void)? in self._state = .canceling defer { self.pendingReceiveHandler = nil } return self.pendingReceiveHandler } handler?(Result.failure(URLError(.networkConnectionLost))) } private static func connectChallengeData(nonce: String) -> Data { let frame: [String: Any] = [ "type": "event", "event": "connect.challenge", "payload": ["nonce": nonce], ] return (try? JSONSerialization.data(withJSONObject: frame)) ?? Data() } private static func connectOkData(id: String) -> Data { let payload: [String: Any] = [ "type": "hello-ok", "protocol": 2, "server": [ "version": "test", "connId": "test", ], "features": [ "methods": [], "events": [], ], "snapshot": [ "presence": [["ts": 1]], "health": [:], "stateVersion": [ "presence": 0, "health": 0, ], "uptimeMs": 0, ], "policy": [ "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30_000, ], ] let frame: [String: Any] = [ "type": "res", "id": id, "ok": true, "payload": payload, ] return (try? JSONSerialization.data(withJSONObject: frame)) ?? Data() } } private final class FakeGatewayWebSocketSession: WebSocketSessioning, @unchecked Sendable { private let lock = NSLock() private var tasks: [FakeGatewayWebSocketTask] = [] private var makeCount = 0 func snapshotMakeCount() -> Int { self.lock.withLock { self.makeCount } } func latestTask() -> FakeGatewayWebSocketTask? { self.lock.withLock { self.tasks.last } } func makeWebSocketTask(url: URL) -> WebSocketTaskBox { _ = url return self.lock.withLock { self.makeCount += 1 let task = FakeGatewayWebSocketTask() self.tasks.append(task) return WebSocketTaskBox(task: task) } } } private actor SeqGapProbe { private var saw = false func mark() { self.saw = true } func value() -> Bool { self.saw } } struct GatewayNodeSessionTests { @Test func invokeWithTimeoutReturnsUnderlyingResponseBeforeTimeout() async { let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil) let response = await GatewayNodeSession.invokeWithTimeout( request: request, timeoutMs: 50, onInvoke: { req in #expect(req.id == "1") return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: "{}", error: nil) } ) #expect(response.ok == true) #expect(response.error == nil) #expect(response.payloadJSON == "{}") } @Test func invokeWithTimeoutReturnsTimeoutError() async { let request = BridgeInvokeRequest(id: "abc", command: "x", paramsJSON: nil) let response = await GatewayNodeSession.invokeWithTimeout( request: request, timeoutMs: 10, onInvoke: { _ in try? await Task.sleep(nanoseconds: 200_000_000) // 200ms return BridgeInvokeResponse(id: "abc", ok: true, payloadJSON: "{}", error: nil) } ) #expect(response.ok == false) #expect(response.error?.code == .unavailable) #expect(response.error?.message.contains("timed out") == true) } @Test func invokeWithTimeoutZeroDisablesTimeout() async { let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil) let response = await GatewayNodeSession.invokeWithTimeout( request: request, timeoutMs: 0, onInvoke: { req in try? await Task.sleep(nanoseconds: 5_000_000) return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: nil, error: nil) } ) #expect(response.ok == true) #expect(response.error == nil) } @Test func emitsSyntheticSeqGapAfterReconnectSnapshot() async throws { let session = FakeGatewayWebSocketSession() let gateway = GatewayNodeSession() let options = GatewayConnectOptions( role: "operator", scopes: ["operator.read"], caps: [], commands: [], permissions: [:], clientId: "openclaw-ios-test", clientMode: "ui", clientDisplayName: "iOS Test", includeDeviceIdentity: false) let stream = await gateway.subscribeServerEvents(bufferingNewest: 32) let probe = SeqGapProbe() let listenTask = Task { for await evt in stream { if evt.event == "seqGap" { await probe.mark() return } } } try await gateway.connect( url: URL(string: "ws://example.invalid")!, token: nil, password: nil, connectOptions: options, sessionBox: WebSocketSessionBox(session: session), onConnected: {}, onDisconnected: { _ in }, onInvoke: { req in BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: nil, error: nil) }) let firstTask = try #require(session.latestTask()) firstTask.emitReceiveFailure() try await waitUntil("reconnect socket created") { session.snapshotMakeCount() >= 2 } try await waitUntil("synthetic seqGap broadcast") { await probe.value() } listenTask.cancel() await gateway.disconnect() } }