Skip to content

Instantly share code, notes, and snippets.

@lxcid
Last active August 2, 2016 01:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lxcid/eba382c6936c66e514c6858025c9278a to your computer and use it in GitHub Desktop.
Save lxcid/eba382c6936c66e514c6858025c9278a to your computer and use it in GitHub Desktop.
DataBuffer + DataPipe
//
// DataBuffer.swift
// WebSocket
//
// Created by Stan Chang Khin Boon on 30/7/16.
// Copyright © 2016 Trifia. All rights reserved.
//
/// DataBuffer uses a serial dispatch queue to manage its in and out operations asynchornously.
/// The use of dispatch queue ensure that in/out operations are thread safe.
/// But order of in/out operations are often important and care must taken when
/// scheduling in/out operations concurrently so as to ensure a deterministic behavior.
final class DataBuffer {
enum ReadResult {
case NoOperation
case Consume(bytes: Int)
}
typealias ReadHandler = (data: Data) -> ReadResult
typealias CompletionHandler = () -> Void
let serialQueue: DispatchQueue
var data: Data
init(serialQueue optSerialQueue: DispatchQueue? = nil) {
self.serialQueue = optSerialQueue ?? DispatchQueue(label: "com.trifia.networker.databuffer", attributes: [ .serial ], target: nil)
self.data = Data()
}
func asyncIn(data: Data, completionHandler: CompletionHandler? = nil) {
self.serialQueue.async {
self.in(data: data)
completionHandler?()
}
}
func asyncOut(handler: ReadHandler, completionHandler: CompletionHandler? = nil) {
self.serialQueue.async {
self.out(handler: handler)
completionHandler?()
}
}
func `in`(data: Data) {
dispatchPrecondition(condition: .onQueue(self.serialQueue))
self.data.append(data)
}
func out(handler: ReadHandler) {
dispatchPrecondition(condition: .onQueue(self.serialQueue))
let result = handler(data: self.data)
switch result {
case .NoOperation:
break // noop
case .Consume(bytes: let bytes):
self.data = self.data.subdata(in: Range(uncheckedBounds: (0, bytes)))
}
}
}
/// DataPipe wraps a DataBuffer, turning in operations into out events.
/// NOTE: Dispatch source was considered for coalescing out events, but because
/// it only promise that at least an out event is enqueued but not ensuring all in
/// operations are flush when the last out event is executed. This behavior might be
/// surprising for developer. We might consider other way to coalescing out events
/// in the future… e.g. Counting the in and scheduling out when its not 0…
final class DataPipe {
let dataBuffer: DataBuffer
var readHandler: DataBuffer.ReadHandler?
var serialQueue: DispatchQueue {
return self.dataBuffer.serialQueue
}
init(serialQueue optSerialQueue: DispatchQueue? = nil) {
let serialQueue = optSerialQueue ?? DispatchQueue(label: "com.trifia.networker.datapipe", attributes: [ .serial ], target: nil)
let dataBuffer = DataBuffer(serialQueue: serialQueue)
self.dataBuffer = dataBuffer
}
func asyncIn(data: Data) {
self.dataBuffer.asyncIn(data: data) {
self.flush()
}
}
func `in`(data: Data, flush: Bool = true) {
self.dataBuffer.in(data: data)
if flush {
self.flush()
}
}
func flush() {
guard let readHandler = self.readHandler else {
return
}
self.dataBuffer.out(handler: readHandler)
}
func asyncFlush() {
self.serialQueue.async {
self.flush()
}
}
}
//
// TCPSession.swift
// WebSocket
//
// Created by Stan Chang Khin Boon on 29/7/16.
// Copyright © 2016 Trifia. All rights reserved.
//
import CoreFoundation
import Foundation
protocol TCPSessionDelegate : class {
func TCPSession(session: TCPSession, didReceiveData data: Data) -> DataBuffer.ReadResult
}
/// TCP Session represents a complete life cycle of a TCP socket. It is stateful. But it is not meant to be recycled.
class TCPSession {
let host: String
let port: UInt32
let readStream: CFReadStream
let writeStream: CFWriteStream
let readQueue: DispatchQueue
let writeQueue: DispatchQueue
var readPipe: DataPipe
var writePipe: DataPipe
weak var delegate: TCPSessionDelegate?
init(host: String, port: UInt32, secure: Bool) throws {
guard !secure else {
throw Error.NotImplemented
}
var unmanagedReadStream: Unmanaged<CFReadStream>?
var unmanagedWriteStream: Unmanaged<CFWriteStream>?
CFStreamCreatePairWithSocketToHost(kCFAllocatorDefault, host, UInt32(port), &unmanagedReadStream, &unmanagedWriteStream)
guard let readStream = unmanagedReadStream?.takeRetainedValue() else {
throw Error.NoReadStream
}
guard let writeStream = unmanagedWriteStream?.takeRetainedValue() else {
throw Error.NoWriteStream
}
self.host = host
self.port = port
self.readStream = readStream
self.writeStream = writeStream
self.readQueue = DispatchQueue(label: "com.trifia.websocket.connection.read", attributes: [ .serial ], target: nil)
self.writeQueue = DispatchQueue(label: "com.trifia.websocket.connection.write", attributes: [ .serial ], target: nil)
self.readPipe = DataPipe(serialQueue: self.readQueue)
self.writePipe = DataPipe(serialQueue: self.writeQueue)
let commonStreamEvents: CFStreamEventType = [
.openCompleted,
.errorOccurred,
.endEncountered
]
var context = CFStreamClientContext(version: CFIndex(0), info: Unmanaged.passUnretained(self).toOpaque(), retain: nil, release: nil, copyDescription: nil)
guard CFReadStreamSetClient(self.readStream, commonStreamEvents.union(.hasBytesAvailable).rawValue, read, &context) else {
throw Error.NoReadStream
}
guard CFWriteStreamSetClient(self.writeStream, commonStreamEvents.union(.canAcceptBytes).rawValue, write, &context) else {
throw Error.NoWriteStream
}
CFReadStreamSetDispatchQueue(self.readStream, self.readQueue)
CFWriteStreamSetDispatchQueue(self.writeStream, self.writeQueue)
self.writePipe.readHandler = { [weak self] (data: Data) -> DataBuffer.ReadResult in
return self?._writeHandler(data: data) ?? .NoOperation
}
self.readPipe.readHandler = { [weak self] (data: Data) -> DataBuffer.ReadResult in
guard let strongSelf = self, let delegate = strongSelf.delegate else {
return .NoOperation
}
return delegate.TCPSession(session: strongSelf, didReceiveData: data)
}
}
func connect() throws {
guard CFReadStreamOpen(readStream) else {
throw Error.NoReadStream
}
guard CFWriteStreamOpen(writeStream) else {
throw Error.NoWriteStream
}
}
func asyncSend(data: Data) {
self.writePipe.asyncIn(data: data)
}
func flush() {
self.writePipe.flush()
}
func _writeHandler(data: Data) -> DataBuffer.ReadResult {
dispatchPrecondition(condition: .onQueue(self.writeQueue))
var totalNumberOfBytesWritten = 0
while (CFWriteStreamCanAcceptBytes(self.writeStream) && (totalNumberOfBytesWritten < data.count)) {
let numberOfBytesWritten = data.withUnsafeBytes { (bufferPtr: UnsafeMutablePointer<UInt8>) -> CFIndex in
return CFWriteStreamWrite(self.writeStream, bufferPtr.advanced(by: totalNumberOfBytesWritten), CFIndex(data.count - totalNumberOfBytesWritten))
}
if numberOfBytesWritten > 0 {
totalNumberOfBytesWritten += numberOfBytesWritten
} else if numberOfBytesWritten < 0 {
// TODO: (stan@trifia.com) Encountered error. We should log…
} else {
// Noop
}
}
if totalNumberOfBytesWritten > 0 {
return .Consume(bytes: totalNumberOfBytesWritten)
} else {
return .NoOperation
}
}
}
extension TCPSession {
enum Error : ErrorProtocol {
case NoReadStream
case NoWriteStream
case NotImplemented
}
}
func read(_ readStream: CFReadStream?, _ event: CFStreamEventType, _ optContext: UnsafeMutablePointer<Void>?) {
guard let context = optContext else {
return
}
let session = Unmanaged<TCPSession>.fromOpaque(context).takeUnretainedValue()
if event.contains(.hasBytesAvailable) {
var readCount = 0
while (CFReadStreamHasBytesAvailable(session.readStream)) {
let bufferCount = 1024
guard var buffer = Data(count: bufferCount) else {
return
}
let optData = buffer.withUnsafeMutableBytes { (bufferPtr: UnsafeMutablePointer<UInt8>) -> Data? in
let numberOfBytesRead = CFReadStreamRead(readStream, bufferPtr, bufferCount)
if numberOfBytesRead > 0 {
let range = Range(uncheckedBounds: (0, numberOfBytesRead))
let subdata = buffer.subdata(in: range)
buffer.resetBytes(in: range)
return subdata
} else if numberOfBytesRead < 0 {
// TODO: (stan@trifia.com) Encountered error. We should log…
return nil
} else {
return nil
}
}
if let data = optData {
session.readPipe.in(data: data, flush: false)
readCount += 1
}
}
if readCount > 0 {
session.readPipe.flush()
}
} else if event.contains(.openCompleted) {
} else if event.contains(.errorOccurred) {
} else if event.contains(.endEncountered) {
}
}
func write(_ writeStream: CFWriteStream?, _ event: CFStreamEventType, _ optContext: UnsafeMutablePointer<Void>?) {
guard let context = optContext else {
return
}
let session = Unmanaged<TCPSession>.fromOpaque(context).takeUnretainedValue()
if event.contains(.canAcceptBytes) {
session.flush()
} else if event.contains(.openCompleted) {
} else if event.contains(.errorOccurred) {
} else if event.contains(.endEncountered) {
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment