Created
April 27, 2016 18:41
-
-
Save goloveychuk/ac251335b3a5a5ba6b2d003d94edc1f2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ZeroMQ | |
import Venice | |
import Foundation | |
import C7 | |
/// Process-wide ZeroMQ context from which every socket in this file is created.
/// Creation failure traps at startup because of `try!`.
let context: Context = try! Context()
/// Payload-free error thrown when a non-blocking send reports that the frame was not sent.
struct Error: ErrorProtocol {}
/// Public alias for `ProtobufSerializer`, which is declared outside this file.
public typealias ZmqSerializer = ProtobufSerializer
/// Base class that owns one ZeroMQ socket and reads multipart messages from it,
/// using `Venice.poll` on the socket's file descriptor to wait for readability.
public class ZmqConnection {
    /// Underlying ZeroMQ socket, created from the file-level `context`.
    let socket: ZeroMQ.Socket
    /// Endpoint handed to `socket.connect` by `connect()`.
    let address: String
    /// Result of the last successful readability poll.
    /// `nil` means the next `read()` must poll the descriptor again.
    var polled: Venice.PollEvent? = nil

    /// File descriptor reported by the socket. Traps if the lookup throws.
    var fd: Int32 {
        return try! socket.getFileDescriptor()
    }

    /// Creates a socket of the given type on the shared `context`.
    /// Traps if the socket cannot be created.
    ///
    /// - Parameters:
    ///   - address: Endpoint to connect to later via `connect()`.
    ///   - type: ZeroMQ socket type to create.
    init(address: String, type: SocketType) {
        self.address = address
        socket = try! context.socket(type)
    }

    /// Connects the socket to `address`.
    ///
    /// - Throws: Whatever `socket.connect` throws.
    public func connect() throws {
        try socket.connect(address)
    }

    /// Reads one multipart message if one is available.
    ///
    /// Polls the descriptor only when no earlier poll result is cached; the cache is
    /// kept while the socket keeps reporting `.In`, so consecutive calls drain pending
    /// messages without polling again.
    /// NOTE(review): this matches the edge-triggered behaviour of the ZeroMQ
    /// descriptor — confirm against the ZeroMQ documentation.
    ///
    /// - Returns: The frames of one message, or `nil` when nothing could be read.
    func read() -> [Data]? {
        if polled == nil {
            let pollResult = try! Venice.poll(fd, for: .reading)
            guard pollResult.contains(.reading) else {
                // Do not cache a result without `.reading`: a cached value would make
                // every later call skip polling and spin on stale state.
                return nil
            }
            polled = pollResult
        }

        // Treat missing socket events like "no input" instead of trapping on a force unwrap.
        guard let events = try! socket.getEvents() else {
            polled = nil
            return nil
        }
        guard events.contains(.In) else {
            polled = nil
            return nil
        }

        var frames: [Data] = []
        while let frame = try! socket.receive() {
            frames.append(frame)
            if !(try! socket.getReceiveMore()) {
                break
            }
        }

        // `receive()` produced nothing although `.In` was reported: surface it as
        // "no message" rather than handing callers an empty array to index into.
        guard !frames.isEmpty else {
            polled = nil
            return nil
        }
        return frames
    }
}
/// Subscriber connection: a `.Sub` socket whose incoming messages are forwarded
/// to the `onReceive` callback.
public class ZmqConnectionSubscribe: ZmqConnection {
    /// Callback invoked by `start_loop()` with the frames of every received message.
    public var onReceive: ([Data] -> Void)?

    /// Creates a `.Sub` connection for `address`.
    public init(address: String) {
        super.init(address: address, type: .Sub)
    }

    /// Sets the subscribe option on the socket with the given value.
    /// Traps if the socket rejects the option.
    public func subscribe(_ subscribe: Data) {
        try! socket.setSubscribe(subscribe)
    }

    /// Runs forever, handing every message returned by `read()` to `onReceive`.
    /// Messages that arrive while no callback is installed are dropped instead of
    /// trapping on a force-unwrapped `nil` handler.
    public func start_loop() {
        while true {
            guard let message = read() else {
                continue
            }
            onReceive?(message)
        }
    }
}
/// Request connection: a `.Dealer` socket that tags each outgoing message with a
/// generated id and routes the matching reply to a per-request channel.
public class ZmqConnectionRequest: ZmqConnection {
    /// Channels of requests still awaiting a reply, keyed by message id.
    var requests: [String: Channel<[Data]>] = [:]

    /// Creates a `.Dealer` connection for `address`.
    public convenience init(address: String) {
        self.init(address: address, type: .Dealer)
    }

    /// Sends `data` as one multipart message laid out as
    /// `[message id, empty frame, data...]`, using non-blocking sends.
    ///
    /// - Parameter data: Payload frames; must contain at least one frame.
    /// - Returns: A channel on which `start_loop()` delivers the reply frames.
    /// - Throws: `Error` when `data` is empty or when a send reports the frame was
    ///   not sent; also rethrows whatever the socket throws.
    public func send(_ data: [Data]) throws -> ReceivingChannel<[Data]> {
        // An empty payload used to trap on the range `0..<-1` and on `data.last!`.
        guard let lastFrame = data.last else {
            throw Error()
        }

        let msgId = NSUUID().uuidString
        guard try socket.sendString(msgId, mode: [.SendMore, .DontWait]) else {
            throw Error()
        }
        guard try socket.send([], mode: [.SendMore, .DontWait]) else {
            throw Error()
        }
        // Every frame except the last is flagged `.SendMore`.
        for frame in data.dropLast() {
            guard try socket.send(frame, mode: [.SendMore, .DontWait]) else {
                throw Error()
            }
        }
        guard try socket.send(lastFrame, mode: .DontWait) else {
            throw Error()
        }

        let replyChannel = Channel<[Data]>(bufferSize: 1)
        requests[msgId] = replyChannel
        return replyChannel.receivingChannel
    }

    /// Runs forever, matching each received message to its pending request by the id
    /// in the first frame and delivering the remaining frames (minus the delimiter
    /// frame) on that request's channel. Unknown or undecodable ids are logged and
    /// the message is dropped.
    public func start_loop() {
        while true {
            guard var frames = read() else {
                continue
            }
            // Defensive: never index into an empty message.
            guard !frames.isEmpty else {
                continue
            }

            let idFrame = frames.removeFirst()
            // A non-decodable id cannot belong to any pending request.
            guard let requestId = try? String(data: idFrame) else {
                print("no request")
                continue
            }
            guard let replyChannel = requests.removeValue(forKey: requestId) else {
                print("no request")
                continue
            }

            // Drop the frame that follows the id (the delimiter), if the reply has one.
            if !frames.isEmpty {
                frames.removeFirst()
            }
            replyChannel.send(frames)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment