Skip to content

Instantly share code, notes, and snippets.

@goloveychuk
Created April 27, 2016 18:41
Show Gist options
  • Save goloveychuk/ac251335b3a5a5ba6b2d003d94edc1f2 to your computer and use it in GitHub Desktop.
Save goloveychuk/ac251335b3a5a5ba6b2d003d94edc1f2 to your computer and use it in GitHub Desktop.
import ZeroMQ
import Venice
import Foundation
import C7
/// Shared context from which every `ZmqConnection` in this file creates its
/// socket. Failure to create it is treated as unrecoverable and traps.
let context: Context = try! Context()
/// Payload-free error thrown by `ZmqConnectionRequest.send(_:)` when a message
/// cannot be sent.
struct Error: ErrorProtocol {}
/// Alias for `ProtobufSerializer`, which is declared outside this file.
// NOTE(review): presumably the serializer applied to ZeroMQ payloads; nothing
// in this file references `ZmqSerializer` — confirm against callers.
public typealias ZmqSerializer = ProtobufSerializer
/// Base class for the ZeroMQ connections in this file.
///
/// Owns a `ZeroMQ.Socket` created from the file-level `context` and reads
/// multipart messages from it after waiting on the socket's file descriptor
/// with `Venice.poll`.
public class ZmqConnection {
    /// Underlying socket, created in `init(address:type:)`.
    let socket: ZeroMQ.Socket

    /// Endpoint handed to `socket.connect` by `connect()`.
    let address: String

    /// Result of the most recent successful `Venice.poll` on `fd`.
    ///
    /// While this is non-nil, `read()` skips polling and goes straight to
    /// checking the socket's pending events; it is reset to `nil` once the
    /// socket reports no pending input.
    // NOTE(review): presumably needed because the ZeroMQ descriptor only
    // signals state changes (edge-triggered), so the socket must be drained
    // before waiting again — confirm against the ZMQ_FD documentation.
    var polled: Venice.PollEvent? = nil

    /// File descriptor of `socket`. Traps if the socket cannot provide one.
    var fd: Int32 {
        return try! socket.getFileDescriptor()
    }

    /// Creates the socket of the given `type` from the shared `context`.
    ///
    /// - Parameters:
    ///   - address: Endpoint later used by `connect()`.
    ///   - type: Kind of ZeroMQ socket to create.
    ///
    /// Traps if the socket cannot be created.
    init(address: String, type: SocketType) {
        self.address = address
        socket = try! context.socket(type)
    }

    /// Connects the socket to `address`.
    ///
    /// - Throws: Whatever `socket.connect` throws.
    public func connect() throws {
        try socket.connect(address)
    }

    /// Reads one multipart message from the socket.
    ///
    /// - Returns: The frames of one message in arrival order, or `nil` when
    ///   no message is available: the poll did not report readability, the
    ///   socket reports no pending input, or no frame could be received.
    ///
    /// The signature is non-throwing, so any error raised by polling or by
    /// the socket traps via `try!`.
    func read() -> [Data]? {
        if polled == nil {
            let event = try! Venice.poll(fd, for: .reading)
            guard event.contains(.reading) else {
                // Leave `polled` unset so the next call polls again instead
                // of acting on a stale, non-readable poll result.
                return nil
            }
            polled = event
        }

        // A missing event set is treated like "nothing pending" rather than
        // force-unwrapped.
        guard let events = try! socket.getEvents() else {
            polled = nil
            return nil
        }
        guard events.contains(.In) else {
            // Socket drained: wait on the descriptor again next time.
            polled = nil
            return nil
        }

        var parts: [Data] = []
        while let part = try! socket.receive() {
            parts.append(part)
            if !(try! socket.getReceiveMore()) {
                break
            }
        }
        // Never hand callers an empty message; they index into the frames.
        return parts.isEmpty ? nil : parts
    }
}
/// Subscriber connection: a `.Sub` socket whose incoming messages are handed
/// to `onReceive` by `start_loop()`.
public class ZmqConnectionSubscribe: ZmqConnection {
    /// Handler invoked by `start_loop()` with the frames of each message read.
    /// Messages read while this is `nil` are dropped.
    public var onReceive: (([Data]) -> Void)?

    /// Creates a subscriber socket for `address` (not yet connected).
    public init(address: String) {
        super.init(address: address, type: .Sub)
    }

    /// Passes `subscribe` to the socket's subscribe option.
    ///
    /// Traps if the socket rejects the option; the signature is non-throwing.
    public func subscribe(_ subscribe: Data) {
        try! socket.setSubscribe(subscribe)
    }

    /// Reads messages forever and forwards each one to `onReceive`.
    ///
    /// Never returns. A missing handler no longer crashes the loop; the
    /// message is simply discarded.
    public func start_loop() {
        while true {
            guard let frames = read() else {
                continue
            }
            onReceive?(frames)
        }
    }
}
/// Request connection: a `.Dealer` socket that tags every outgoing message
/// with a unique id frame and routes replies back to the caller through a
/// per-request channel.
public class ZmqConnectionRequest: ZmqConnection {
    /// Channels of requests still awaiting a reply, keyed by message id.
    // NOTE(review): entries are only removed when a matching reply arrives in
    // `start_loop()`; requests that never get a reply stay here indefinitely.
    var requests: [String: Channel<[Data]>] = [:]

    /// Creates a dealer socket for `address` (not yet connected).
    public convenience init(address: String) {
        self.init(address: address, type: .Dealer)
    }

    /// Sends `data` as one multipart message and returns the channel on which
    /// `start_loop()` will deliver the reply.
    ///
    /// The frames on the wire are: a fresh UUID string, an empty frame, then
    /// every element of `data`. All frames are sent with `.DontWait`.
    ///
    /// - Parameter data: Payload frames; must not be empty.
    /// - Returns: Receiving end of a one-slot channel for the reply frames.
    /// - Throws: `Error` when `data` is empty or when the socket reports that
    ///   a frame was not sent; any error thrown by the socket is propagated.
    public func send(_ data: [Data]) throws -> ReceivingChannel<[Data]> {
        // Validate before touching the socket so an empty payload cannot
        // leave a half-written multipart message behind.
        guard let lastPart = data.last else {
            throw Error()
        }

        let msgId = NSUUID().uuidString
        guard try socket.sendString(msgId, mode: [.SendMore, .DontWait]) else {
            throw Error()
        }
        // Empty delimiter frame between the id and the payload.
        guard try socket.send([], mode: [.SendMore, .DontWait]) else {
            throw Error()
        }
        for part in data.dropLast() {
            guard try socket.send(part, mode: [.SendMore, .DontWait]) else {
                throw Error()
            }
        }
        // The final frame is sent without `.SendMore` to close the message.
        guard try socket.send(lastPart, mode: .DontWait) else {
            throw Error()
        }

        let ch = Channel<[Data]>(bufferSize: 1)
        requests[msgId] = ch
        return ch.receivingChannel
    }

    /// Reads replies forever and delivers each one to the channel of the
    /// request whose id matches the reply's first frame.
    ///
    /// Never returns. Replies whose id frame is missing, cannot be decoded as
    /// a string, or matches no pending request are logged with "no request"
    /// and dropped instead of crashing the loop.
    public func start_loop() {
        while true {
            guard var frames = read() else {
                continue
            }
            guard !frames.isEmpty else {
                continue
            }

            let idFrame = frames.removeFirst()
            guard let reqStr = try? String(data: idFrame) else {
                // An undecodable id cannot match any pending request.
                print("no request")
                continue
            }
            guard let pending = requests.removeValue(forKey: reqStr) else {
                print("no request")
                continue
            }

            // Drop the delimiter frame that follows the id, if the peer sent one.
            if !frames.isEmpty {
                frames.removeFirst()
            }
            pending.send(frames)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment