Skip to content

Instantly share code, notes, and snippets.

@VaslD
Created June 26, 2023 15:57
Show Gist options
  • Save VaslD/a32b2ab63f0afcfd5760c3855989c7a2 to your computer and use it in GitHub Desktop.
Save VaslD/a32b2ab63f0afcfd5760c3855989c7a2 to your computer and use it in GitHub Desktop.
import Foundation
import Network
public typealias TCPClient = TCPConnection
/// TCP 节点(客户端)连接
///
/// 由于 TCP 是全双工通讯,服务端也需要作为节点建立连接。
public final class TCPConnection: Sendable {
let queue: DispatchQueue
let connection: NWConnection
public init(_ connection: NWConnection) {
let pointer = Unmanaged.passUnretained(connection).toOpaque()
self.queue = DispatchQueue(label: "NWConnection (\(pointer))")
self.connection = connection
}
public init?(host: String, port: Int) {
guard let unsigned = UInt16(exactly: port), let port = NWEndpoint.Port(rawValue: unsigned) else {
return nil
}
self.queue = DispatchQueue(label: "NWConnection (\(host):\(port))")
self.connection = NWConnection(host: NWEndpoint.Host(host), port: port, using: .tcp)
}
public var state: NWConnection.State {
self.connection.state
}
public var peer: NWEndpoint {
self.connection.endpoint
}
public var localEndpoint: NWEndpoint? {
self.connection.currentPath?.localEndpoint
}
public var remoteEndpoint: NWEndpoint? {
self.connection.currentPath?.remoteEndpoint
}
/// 建立连接
///
/// 此方法需要 `await`,将在连接成功或异常后返回。当连接正在建立时,继续(并发)调用此方法将立即导致 `CancellationError`
/// 错误。在已建立的连接上调用此方法无效。
public func connect() async throws {
guard self.connection.stateUpdateHandler == nil else {
throw CancellationError()
}
guard self.connection.state != .ready else {
return
}
return try await withUnsafeThrowingContinuation { continuation in
self.connection.stateUpdateHandler = {
switch $0 {
case .ready:
self.connection.stateUpdateHandler = nil
continuation.resume()
case let .failed(error):
self.connection.stateUpdateHandler = nil
continuation.resume(throwing: error)
case .cancelled:
self.connection.stateUpdateHandler = nil
continuation.resume(throwing: CancellationError())
case let .waiting(error):
self.connection.stateUpdateHandler = nil
self.connection.cancel()
continuation.resume(throwing: error)
default:
break
}
}
self.connection.start(queue: self.queue)
}
}
/// 断开连接
public func disconnect() {
self.connection.cancel()
}
/// 发送一条消息
public func send(_ data: some DataProtocol) async throws {
try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation<Void, Error>) in
self.connection.send(content: data, completion: .contentProcessed {
if let error = $0 {
continuation.resume(throwing: error)
return
}
continuation.resume()
})
}
}
/// 发送终止消息
///
/// 如需断开连接,提前调用此方法非必需;但一旦调用此方法,连接不可用于发送其他消息。
public func finish() async throws {
try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation<Void, Error>) in
self.connection.send(content: nil, contentContext: .finalMessage, completion: .contentProcessed {
if let error = $0 {
continuation.resume(throwing: error)
return
}
continuation.resume()
})
}
}
/// 获取接收到消息的异步流
///
/// ```swift
/// for try await message in connection.receive() {
/// // Read message…
/// }
/// ```
///
/// 如果接收过程中连接层出现异常,异步流将被错误中断,通常抛出
/// `NWError`。取决于错误原因,重复调用此方法可能能够从异常中恢复、继续接收消息,也可能会立即再次出错。建议查询
/// ``state`` 和关联错误以决定代码如何继续。
public func receive() -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream {
try Task.checkCancellation()
return try await withUnsafeThrowingContinuation { continuation in
self.connection.receive(minimumIncompleteLength: 0, maximumLength: .max) {
_ = $1
if let error = $3 {
continuation.resume(throwing: error)
return
}
if $2 {
self.connection.cancel()
}
continuation.resume(returning: $0)
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment