Skip to content

Instantly share code, notes, and snippets.

@vovasty
Created October 6, 2016 18:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vovasty/c1378f035bfd3a20c7dd2d6941150f59 to your computer and use it in GitHub Desktop.
Save vovasty/c1378f035bfd3a20c7dd2d6941150f59 to your computer and use it in GitHub Desktop.
import ZeroMQ
import CZeroMQ
import Foundation
import Venice
public struct ZeroMqError : Error, CustomStringConvertible {
public let description: String
static var lastError: Error {
let description = String(validatingUTF8: zmq_strerror(zmq_errno()))!
return ZeroMqError(description: description)
}
}
public final class VeniceContext {
let context: UnsafeMutableRawPointer?
public init() throws {
context = zmq_ctx_new()
if context == nil {
throw ZeroMqError.lastError
}
}
deinit {
zmq_ctx_term(context)
}
public func terminate() throws {
if zmq_ctx_term(context) == -1 {
throw ZeroMqError.lastError
}
}
func setOption(_ option: Int32, value: Int32) {
zmq_ctx_set(context, option, value)
}
func getOption(_ option: Int32) -> Int32 {
return zmq_ctx_get(context, option)
}
public func socket(_ type: SocketType) throws -> VeniceSocket {
guard let socket = zmq_socket(context, type.rawValue) else {
throw ZeroMqError.lastError
}
return VeniceSocket(socket: socket)
}
}
public class VeniceSocket {
let socket: UnsafeMutableRawPointer
init(socket: UnsafeMutableRawPointer) {
self.socket = socket
}
deinit {
zmq_close(socket)
}
func getOption(_ option: Int32, value: UnsafeMutableRawPointer, length: UnsafeMutablePointer<Int>) throws {
if zmq_getsockopt(socket, option, value, length) == -1 {
throw ZeroMqError.lastError
}
}
public func bind(_ endpoint: String) throws {
if zmq_bind(socket, endpoint) == -1 {
throw ZeroMqError.lastError
}
}
public func connect(_ endpoint: String) throws {
if zmq_connect(socket, endpoint) == -1 {
throw ZeroMqError.lastError
}
}
public func sendMessage(_ message: Message, mode: SendMode = []) throws {
var m = mode
m.update(with: .DontWait)
try wait {
return zmq_msg_send(&message.message, socket, Int32(mode.rawValue))
}
}
func send(_ buffer: UnsafeMutableRawPointer, length: Int, mode: SendMode = []) throws {
var m = mode
m.update(with: .DontWait)
try wait {
return zmq_send(socket, buffer, length, Int32(mode.rawValue))
}
}
public func send(_ data: Data, mode: SendMode = []) throws {
var data = data
let _ = try data.withUnsafeMutableBytes { bytes in
try self.send(bytes, length: data.count, mode: mode)
}
}
public func receiveMessage(_ mode: ReceiveMode = []) throws -> Message {
var m = mode
m.update(with: .DontWait)
let message = try Message()
try wait {
return zmq_msg_recv(&message.message, socket, Int32(m.rawValue))
}
return message
}
public func receive(_ bufferSize: Int = 1024, mode: ReceiveMode = []) throws -> Data {
var m = mode
m.update(with: .DontWait)
var data = Data(count: bufferSize)
let result = try wait {
data.withUnsafeMutableBytes { bytes in
zmq_recv(socket, bytes, bufferSize, Int32(mode.rawValue))
}
}
let bufferEnd = min(Int(result), bufferSize)
return Data(data[0 ..< bufferEnd])
}
public func close() throws {
if zmq_close(socket) == -1 {
throw ZeroMqError.lastError
}
}
fileprivate func wait1<T>(_ closure: () throws -> (Int32, T?)) throws -> T? {
let result = try closure()
guard result.0 == -1 else { return result.1 }
guard zmq_errno() == EAGAIN else { throw ZeroMqError.lastError }
var value: Int32 = 0
var length = strideof(Int32.self)
try getOption(ZMQ_FD, value: &value, length: &length)
let fd = value
while true {
let events = try poll(fd, events: .read, deadline: -1)
guard events.contains(.read) else {
throw ZeroMqVeniceError(description: "Unable to poll")
}
let result = try closure()
guard result.0 == -1 else { return result.1 }
guard zmq_errno() == EAGAIN else { throw ZeroMqError.lastError }
}
}
@discardableResult
fileprivate func wait(_ closure: () throws -> (Int32)) throws -> Int32 {
let result = try closure()
guard result == -1 else { return result }
guard zmq_errno() == EAGAIN else { throw ZeroMqError.lastError }
var value: Int32 = 0
var length = strideof(Int32.self)
try getOption(ZMQ_FD, value: &value, length: &length)
let fd = value
while true {
let events = try poll(fd, events: .read, deadline: -1)
guard events.contains(.read) else {
throw ZeroMqVeniceError(description: "Unable to poll")
}
let result = try closure()
guard result == -1 else { return result }
guard zmq_errno() == EAGAIN else { throw ZeroMqError.lastError }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment