Skip to content

Instantly share code, notes, and snippets.

@aytekinar aytekinar/zmq.hpp
Created Apr 13, 2018

Embed
What would you like to do?
0MQ wrapper
#ifndef ZMQ_HPP_
#define ZMQ_HPP_
#include <zmq.h>
#include <algorithm>
#include <cstring>
#include <exception>
#include <iterator>
#include <memory>
#include <tuple>
#include <vector>
namespace zmq {
/* Version information */
/*
 * Query the version of the 0MQ library.
 *
 * Forwards to zmq_version(), which writes the major, minor and patch numbers
 * into the three output arguments.
 *
 * Declared inline because this header carries the definition: without it,
 * including the header from more than one translation unit yields
 * duplicate-symbol errors at link time.
 */
inline void version(int &major, int &minor, int &patch) {
  zmq_version(&major, &minor, &patch);
}
std::tuple<int, int, int> version() {
int major, minor, patch;
version(major, minor, patch);
return std::make_tuple(major, minor, patch);
}
/* Error handling through exceptions */
struct error : public std::exception {
error() noexcept : errno_{zmq_errno()} {}
const char *what() const noexcept override { return zmq_strerror(errno_); }
operator int() const noexcept { return errno_; }
private:
int errno_;
};
/* Forward declarations and definitions */
/* Wrapper types; each struct is defined further down in this header. */
struct context;
struct socket;
struct message;
struct poller;
/* Opaque declarations of the scoped enums with their underlying types; the
 * enumerator lists are given later in this header. */
enum struct context_opt : int;
enum struct socket_type : int;
enum struct socket_opt : int;
enum struct monitor_event : int;
enum struct poll_event : short;
/* Owner of a 0MQ context handle, held through a std::shared_ptr<void>. */
struct context {
/* Creates the handle with zmq_ctx_new(); throws zmq::error if it is null. */
context();
/* Sets a context option via zmq_ctx_set(); throws zmq::error on failure. */
void set(context_opt, int) const;
/* Returns the value from zmq_ctx_get(); throws zmq::error if it is negative. */
int get(context_opt) const;
/* Raw handle stored in ptr_. */
explicit operator void *() const noexcept;
/* True when ptr_ holds a non-null handle. */
explicit operator bool() const noexcept;
private:
/* The deleter installed by the constructor calls zmq_ctx_term(). */
std::shared_ptr<void> ptr_;
};
/* Owner of a single 0MQ socket handle, held through a std::unique_ptr whose
 * deleter (set by the constructor) calls zmq_close(). */
struct socket {
/* Opens a socket of the given type on the context; throws zmq::error when
 * zmq_socket() returns null. */
socket(const context &, socket_type);
/* Option accessors forwarding to zmq_setsockopt()/zmq_getsockopt() with
 * sizeof(T) as the value size; both throw zmq::error on failure. */
template <class T> void set(socket_opt, const T &) const;
template <class T> void get(socket_opt, T &) const;
/* Endpoint management; each call throws zmq::error on a non-zero result. */
void bind(const char *) const;
void connect(const char *) const;
void unbind(const char *) const;
void disconnect(const char *) const;
/* Sends every part of the message; the bool selects blocking (true) or
 * ZMQ_DONTWAIT (false). Returns the summed byte count. */
int send(message, bool = true) const;
/* Sends `count * sizeof(T)` bytes from a raw buffer with the given flags. */
template <class T> int send(std::size_t, const T *, int) const;
/* Receives a complete multipart message; the bool selects blocking (true)
 * or ZMQ_DONTWAIT (false). */
message receive(bool = true) const;
/* Receives into a raw buffer of `count * sizeof(T)` bytes with the given
 * flags. */
template <class T> int receive(std::size_t, T *, int) const;
/* Registers a monitor endpoint through zmq_socket_monitor() and returns a
 * pair socket connected to that endpoint. */
socket monitor(const context &, const char *, monitor_event) const;
socket monitor(const context &, const char *, int) const;
/* Raw socket handle stored in ptr_. */
explicit operator void *() const noexcept;
private:
std::unique_ptr<void, void (*)(void *)> ptr_;
};
/* A multipart message: an ordered std::vector of parts, each wrapping one
 * zmq_msg_t. */
struct message {
private:
/* One frame of a message; owns a zmq_msg_t and closes it on destruction. */
struct part {
/* Constructors */
/* Empty frame (zmq_msg_init). */
part() noexcept;
/* Frame with a payload of the given size in bytes (zmq_msg_init_size). */
explicit part(std::size_t);
/* Frame holding a byte-wise copy (memcpy of sizeof(T) bytes) of the value. */
template <class T> part(const T &);
/* Frame holding a byte-wise copy of `count` elements read from the pointer. */
template <class T> part(std::size_t, const T *);
/* Frame referring to a caller-provided buffer (zmq_msg_init_data); the free
 * function and hint are passed through to 0MQ. */
template <class T> part(std::size_t, T *, zmq_free_fn *, void * = nullptr);
/* Frame filled by std::copy from the iterator range. */
template <class Iter> part(Iter, Iter);
/* Copying and moving */
/* Copies go through zmq_msg_copy(), moves through zmq_msg_move(). */
part(const part &) noexcept;
part &operator=(const part &) noexcept;
part(part &&) noexcept;
part &operator=(part &&) noexcept;
/* Destruction */
~part();
/* Convenience */
/* Payload size in bytes (zmq_msg_size). */
std::size_t size() const noexcept;
/* Pointer to the payload (zmq_msg_data). */
explicit operator void *() noexcept;
void *data() noexcept;
/* True when zmq_msg_more() reports 1 for this frame. */
bool more() const noexcept;
/* Send/receive this frame on the socket with raw 0MQ flags; both return
 * the byte count and throw zmq::error on a negative result. */
int send(const socket &, int);
int receive(const socket &, int);
/* Copies payload elements into [first, last); returns the iterator one past
 * the last element written. */
template <class OutputIt> OutputIt read(OutputIt, OutputIt) noexcept;
/* Closes the frame and re-initialises it as empty. */
void clear() noexcept;
private:
zmq_msg_t msg_;
};
std::vector<part> parts_;
public:
/* Message without any parts. */
message() noexcept(noexcept(std::vector<part>()));
/* Message adopting the given parts. */
message(std::vector<part>) noexcept;
/* addpart overloads append one part built with the matching part
 * constructor and return the size in bytes of the appended part. */
std::size_t addpart();
std::size_t addpart(std::size_t);
template <class T> std::size_t addpart(const T &);
template <class T> std::size_t addpart(std::size_t, const T *);
template <class T>
std::size_t addpart(std::size_t, T *, zmq_free_fn *, void * = nullptr);
template <class Iter> std::size_t addpart(Iter, Iter);
/* Payload pointer of the part at the index, or nullptr if out of range. */
void *data(std::size_t) noexcept;
/* Number of parts. */
std::size_t numparts() const noexcept;
/* Sum of all part sizes in bytes. */
std::size_t size() const noexcept;
/* Size in bytes of the part at the index, or 0 if out of range. */
std::size_t size(std::size_t) const noexcept;
/* Unchecked access to a part. */
part &operator[](std::size_t);
const part &operator[](std::size_t) const;
/* Removes the last part. */
void pop_back();
/* Send/receive the whole message; the bool selects blocking (true) or
 * ZMQ_DONTWAIT (false). Both return the summed byte count. */
int send(const socket &, bool = true);
int receive(const socket &, bool = true);
/* Reads the part at the index into [first, last); returns the first
 * iterator unchanged when the index is out of range. */
template <class OutputIt>
OutputIt read(std::size_t, OutputIt, OutputIt) noexcept;
/* Drops all parts. */
void clear() noexcept;
};
/* Collects zmq_pollitem_t entries for sockets and polls them with zmq_poll(). */
struct poller {
private:
/* Read-only snapshot (a copy) of one poll entry. */
struct item {
explicit item(const zmq_pollitem_t) noexcept;
/* True when the entry's socket handle equals the given socket's handle. */
bool belongsto(const socket &) const noexcept;
/* True when any requested event bit is also set in the returned events. */
bool isready() const noexcept;
private:
const zmq_pollitem_t item_;
};
std::vector<zmq_pollitem_t> items_;
public:
/* Poller without any entries. */
poller() noexcept(noexcept(std::vector<zmq_pollitem_t>()));
/* Appends an entry for the socket; the short overload takes the raw event
 * mask. */
void additem(const socket &, poll_event);
void additem(const socket &, short);
/* Number of registered entries. */
std::size_t size() const noexcept;
/* Unchecked access; returns a copy of the entry wrapped in item. */
item operator[](std::size_t) const;
/* Calls zmq_poll() with the given timeout; throws zmq::error on a negative
 * result and otherwise returns zmq_poll()'s result. */
int poll(long);
/* Drops all entries. */
void clear() noexcept;
};
/* Proxy functionality */
/*
 * Run zmq_proxy() between `frontend` and `backend`.
 *
 * `frontend` and `backend` are dereferenced unconditionally and therefore
 * must not be null. `capture` may be null, in which case a null capture
 * handle is passed on. The result of zmq_proxy() is not reported.
 *
 * Declared inline because the definition lives in a header; a non-inline
 * definition breaks linking once the header is included in more than one
 * translation unit.
 */
inline void proxy(const socket *frontend, const socket *backend,
                  const socket *capture = nullptr) {
  void *f = static_cast<void *>(*frontend);
  void *b = static_cast<void *>(*backend);
  void *c = (capture == nullptr) ? nullptr : static_cast<void *>(*capture);
  zmq_proxy(f, b, c);
}
/*
 * Run zmq_proxy_steerable() between `frontend` and `backend`.
 *
 * `frontend` and `backend` are dereferenced unconditionally and therefore
 * must not be null. `capture` and `control` may each be null; a null handle
 * is then forwarded instead of dereferencing the null pointer (previously a
 * null `control` was dereferenced). The result of zmq_proxy_steerable() is
 * not reported.
 *
 * Declared inline because the definition lives in a header; a non-inline
 * definition breaks linking once the header is included in more than one
 * translation unit.
 */
inline void proxy(const socket *frontend, const socket *backend,
                  const socket *capture, const socket *control) {
  void *f = static_cast<void *>(*frontend);
  void *b = static_cast<void *>(*backend);
  void *cap = (capture == nullptr) ? nullptr : static_cast<void *>(*capture);
  void *con = (control == nullptr) ? nullptr : static_cast<void *>(*control);
  zmq_proxy_steerable(f, b, cap, con);
}
/* Implementations */
/* Option names accepted by context::set() and context::get(). Each enumerator
 * takes the value of the ZMQ_* macro it is guarded by and is declared only
 * when zmq.h defines that macro. */
enum struct context_opt : int {
#ifdef ZMQ_IO_THREADS
io_threads = ZMQ_IO_THREADS,
#endif
#ifdef ZMQ_MAX_SOCKETS
max_sockets = ZMQ_MAX_SOCKETS,
#endif
#ifdef ZMQ_SOCKET_LIMIT
socket_limit = ZMQ_SOCKET_LIMIT,
#endif
#ifdef ZMQ_THREAD_PRIORITY
thread_priority = ZMQ_THREAD_PRIORITY,
#endif
#ifdef ZMQ_THREAD_SCHED_POLICY
thread_sched_policy = ZMQ_THREAD_SCHED_POLICY,
#endif
#ifdef ZMQ_MAX_MSGSZ
max_msgsz = ZMQ_MAX_MSGSZ,
#endif
#ifdef ZMQ_MSG_T_SIZE
msg_t_size = ZMQ_MSG_T_SIZE,
#endif
#ifdef ZMQ_THREAD_AFFINITY_CPU_ADD
thread_affinity_cpu_add = ZMQ_THREAD_AFFINITY_CPU_ADD,
#endif
#ifdef ZMQ_THREAD_AFFINITY_CPU_REMOVE
thread_affinity_cpu_remove = ZMQ_THREAD_AFFINITY_CPU_REMOVE,
#endif
#ifdef ZMQ_THREAD_NAME_PREFIX
thread_name_prefix = ZMQ_THREAD_NAME_PREFIX,
#endif
#ifdef ZMQ_ZERO_COPY_RCV
zero_copy_rcv = ZMQ_ZERO_COPY_RCV,
#endif
};
/*
 * Create a new 0MQ context with zmq_ctx_new().
 *
 * The handle is stored in a shared_ptr whose deleter calls zmq_ctx_term() on
 * a non-null handle. Throws zmq::error when zmq_ctx_new() returned null.
 *
 * Declared inline because the definition lives in a header; a non-inline
 * definition breaks linking once the header is included in more than one
 * translation unit.
 */
inline context::context()
    : ptr_{zmq_ctx_new(), [](void *ptr) noexcept {
             if (ptr != nullptr)
               zmq_ctx_term(ptr);
           }} {
  if (!ptr_)
    throw error();
}
/*
 * Set the context option `name` to `value` through zmq_ctx_set().
 * Throws zmq::error when the call reports a non-zero result.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void context::set(context_opt name, int value) const {
  if (zmq_ctx_set(ptr_.get(), static_cast<int>(name), value) != 0)
    throw error();
}
/*
 * Read the context option `name` through zmq_ctx_get().
 * Returns the option value; throws zmq::error when the call reports a
 * negative result.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline int context::get(context_opt name) const {
  const int val = zmq_ctx_get(ptr_.get(), static_cast<int>(name));
  if (val < 0)
    throw error();
  return val;
}
/*
 * Raw context handle held by this object.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline context::operator void *() const noexcept { return ptr_.get(); }
/*
 * True when this object holds a non-null context handle.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline context::operator bool() const noexcept {
  return static_cast<bool>(ptr_);
}
/* Socket types accepted by the socket constructor. Each enumerator takes the
 * value of the ZMQ_* macro it is guarded by and is declared only when zmq.h
 * defines that macro. */
enum struct socket_type : int {
#ifdef ZMQ_PAIR
pair = ZMQ_PAIR,
#endif
#ifdef ZMQ_PUB
pub = ZMQ_PUB,
#endif
#ifdef ZMQ_SUB
sub = ZMQ_SUB,
#endif
#ifdef ZMQ_REQ
req = ZMQ_REQ,
#endif
#ifdef ZMQ_REP
rep = ZMQ_REP,
#endif
#ifdef ZMQ_DEALER
dealer = ZMQ_DEALER,
#endif
#ifdef ZMQ_ROUTER
router = ZMQ_ROUTER,
#endif
#ifdef ZMQ_PULL
pull = ZMQ_PULL,
#endif
#ifdef ZMQ_PUSH
push = ZMQ_PUSH,
#endif
#ifdef ZMQ_XPUB
xpub = ZMQ_XPUB,
#endif
#ifdef ZMQ_XSUB
xsub = ZMQ_XSUB,
#endif
#ifdef ZMQ_STREAM
stream = ZMQ_STREAM,
#endif
#ifdef ZMQ_SERVER
server = ZMQ_SERVER,
#endif
#ifdef ZMQ_CLIENT
client = ZMQ_CLIENT,
#endif
#ifdef ZMQ_RADIO
radio = ZMQ_RADIO,
#endif
#ifdef ZMQ_DISH
dish = ZMQ_DISH,
#endif
#ifdef ZMQ_GATHER
gather = ZMQ_GATHER,
#endif
#ifdef ZMQ_SCATTER
scatter = ZMQ_SCATTER,
#endif
#ifdef ZMQ_DGRAM
dgram = ZMQ_DGRAM,
#endif
};
/* Option names accepted by socket::set() and socket::get(). Each enumerator
 * takes the value of the ZMQ_* macro it is guarded by and is declared only
 * when zmq.h defines that macro. */
enum struct socket_opt : int {
#ifdef ZMQ_AFFINITY
affinity = ZMQ_AFFINITY,
#endif
#ifdef ZMQ_ROUTING_ID
routing_id = ZMQ_ROUTING_ID,
#endif
#ifdef ZMQ_SUBSCRIBE
subscribe = ZMQ_SUBSCRIBE,
#endif
#ifdef ZMQ_UNSUBSCRIBE
unsubscribe = ZMQ_UNSUBSCRIBE,
#endif
#ifdef ZMQ_RATE
rate = ZMQ_RATE,
#endif
#ifdef ZMQ_RECOVERY_IVL
recovery_ivl = ZMQ_RECOVERY_IVL,
#endif
#ifdef ZMQ_SNDBUF
sndbuf = ZMQ_SNDBUF,
#endif
#ifdef ZMQ_RCVBUF
rcvbuf = ZMQ_RCVBUF,
#endif
#ifdef ZMQ_RCVMORE
rcvmore = ZMQ_RCVMORE,
#endif
#ifdef ZMQ_FD
fd = ZMQ_FD,
#endif
#ifdef ZMQ_EVENTS
events = ZMQ_EVENTS,
#endif
#ifdef ZMQ_TYPE
type = ZMQ_TYPE,
#endif
#ifdef ZMQ_LINGER
linger = ZMQ_LINGER,
#endif
#ifdef ZMQ_RECONNECT_IVL
reconnect_ivl = ZMQ_RECONNECT_IVL,
#endif
#ifdef ZMQ_BACKLOG
backlog = ZMQ_BACKLOG,
#endif
#ifdef ZMQ_RECONNECT_IVL_MAX
reconnect_ivl_max = ZMQ_RECONNECT_IVL_MAX,
#endif
#ifdef ZMQ_MAXMSGSIZE
maxmsgsize = ZMQ_MAXMSGSIZE,
#endif
#ifdef ZMQ_SNDHWM
sndhwm = ZMQ_SNDHWM,
#endif
#ifdef ZMQ_RCVHWM
rcvhwm = ZMQ_RCVHWM,
#endif
#ifdef ZMQ_MULTICAST_HOPS
multicast_hops = ZMQ_MULTICAST_HOPS,
#endif
#ifdef ZMQ_RCVTIMEO
rcvtimeo = ZMQ_RCVTIMEO,
#endif
#ifdef ZMQ_SNDTIMEO
sndtimeo = ZMQ_SNDTIMEO,
#endif
#ifdef ZMQ_LAST_ENDPOINT
last_endpoint = ZMQ_LAST_ENDPOINT,
#endif
#ifdef ZMQ_ROUTER_MANDATORY
router_mandatory = ZMQ_ROUTER_MANDATORY,
#endif
#ifdef ZMQ_TCP_KEEPALIVE
tcp_keepalive = ZMQ_TCP_KEEPALIVE,
#endif
#ifdef ZMQ_TCP_KEEPALIVE_CNT
tcp_keepalive_cnt = ZMQ_TCP_KEEPALIVE_CNT,
#endif
#ifdef ZMQ_TCP_KEEPALIVE_IDLE
tcp_keepalive_idle = ZMQ_TCP_KEEPALIVE_IDLE,
#endif
#ifdef ZMQ_TCP_KEEPALIVE_INTVL
tcp_keepalive_intvl = ZMQ_TCP_KEEPALIVE_INTVL,
#endif
#ifdef ZMQ_IMMEDIATE
immediate = ZMQ_IMMEDIATE,
#endif
#ifdef ZMQ_XPUB_VERBOSE
xpub_verbose = ZMQ_XPUB_VERBOSE,
#endif
#ifdef ZMQ_ROUTER_RAW
router_raw = ZMQ_ROUTER_RAW,
#endif
#ifdef ZMQ_IPV6
ipv6 = ZMQ_IPV6,
#endif
#ifdef ZMQ_MECHANISM
mechanism = ZMQ_MECHANISM,
#endif
#ifdef ZMQ_PLAIN_SERVER
plain_server = ZMQ_PLAIN_SERVER,
#endif
#ifdef ZMQ_PLAIN_USERNAME
plain_username = ZMQ_PLAIN_USERNAME,
#endif
#ifdef ZMQ_PLAIN_PASSWORD
plain_password = ZMQ_PLAIN_PASSWORD,
#endif
#ifdef ZMQ_CURVE_SERVER
curve_server = ZMQ_CURVE_SERVER,
#endif
#ifdef ZMQ_CURVE_PUBLICKEY
curve_publickey = ZMQ_CURVE_PUBLICKEY,
#endif
#ifdef ZMQ_CURVE_SECRETKEY
curve_secretkey = ZMQ_CURVE_SECRETKEY,
#endif
#ifdef ZMQ_CURVE_SERVERKEY
curve_serverkey = ZMQ_CURVE_SERVERKEY,
#endif
#ifdef ZMQ_PROBE_ROUTER
probe_router = ZMQ_PROBE_ROUTER,
#endif
#ifdef ZMQ_REQ_CORRELATE
req_correlate = ZMQ_REQ_CORRELATE,
#endif
#ifdef ZMQ_REQ_RELAXED
req_relaxed = ZMQ_REQ_RELAXED,
#endif
#ifdef ZMQ_CONFLATE
conflate = ZMQ_CONFLATE,
#endif
#ifdef ZMQ_ZAP_DOMAIN
zap_domain = ZMQ_ZAP_DOMAIN,
#endif
#ifdef ZMQ_ROUTER_HANDOVER
router_handover = ZMQ_ROUTER_HANDOVER,
#endif
#ifdef ZMQ_TOS
tos = ZMQ_TOS,
#endif
#ifdef ZMQ_CONNECT_ROUTING_ID
connect_routing_id = ZMQ_CONNECT_ROUTING_ID,
#endif
#ifdef ZMQ_GSSAPI_SERVER
gssapi_server = ZMQ_GSSAPI_SERVER,
#endif
#ifdef ZMQ_GSSAPI_PRINCIPAL
gssapi_principal = ZMQ_GSSAPI_PRINCIPAL,
#endif
#ifdef ZMQ_GSSAPI_SERVICE_PRINCIPAL
gssapi_service_principal = ZMQ_GSSAPI_SERVICE_PRINCIPAL,
#endif
#ifdef ZMQ_GSSAPI_PLAINTEXT
gssapi_plaintext = ZMQ_GSSAPI_PLAINTEXT,
#endif
#ifdef ZMQ_HANDSHAKE_IVL
handshake_ivl = ZMQ_HANDSHAKE_IVL,
#endif
#ifdef ZMQ_SOCKS_PROXY
socks_proxy = ZMQ_SOCKS_PROXY,
#endif
#ifdef ZMQ_XPUB_NODROP
xpub_nodrop = ZMQ_XPUB_NODROP,
#endif
#ifdef ZMQ_BLOCKY
blocky = ZMQ_BLOCKY,
#endif
#ifdef ZMQ_XPUB_MANUAL
xpub_manual = ZMQ_XPUB_MANUAL,
#endif
#ifdef ZMQ_XPUB_WELCOME_MSG
xpub_welcome_msg = ZMQ_XPUB_WELCOME_MSG,
#endif
#ifdef ZMQ_STREAM_NOTIFY
stream_notify = ZMQ_STREAM_NOTIFY,
#endif
#ifdef ZMQ_INVERT_MATCHING
invert_matching = ZMQ_INVERT_MATCHING,
#endif
#ifdef ZMQ_HEARTBEAT_IVL
heartbeat_ivl = ZMQ_HEARTBEAT_IVL,
#endif
#ifdef ZMQ_HEARTBEAT_TTL
heartbeat_ttl = ZMQ_HEARTBEAT_TTL,
#endif
#ifdef ZMQ_HEARTBEAT_TIMEOUT
heartbeat_timeout = ZMQ_HEARTBEAT_TIMEOUT,
#endif
#ifdef ZMQ_XPUB_VERBOSER
xpub_verboser = ZMQ_XPUB_VERBOSER,
#endif
#ifdef ZMQ_CONNECT_TIMEOUT
connect_timeout = ZMQ_CONNECT_TIMEOUT,
#endif
#ifdef ZMQ_TCP_MAXRT
tcp_maxrt = ZMQ_TCP_MAXRT,
#endif
#ifdef ZMQ_THREAD_SAFE
thread_safe = ZMQ_THREAD_SAFE,
#endif
#ifdef ZMQ_MULTICAST_MAXTPDU
multicast_maxtpdu = ZMQ_MULTICAST_MAXTPDU,
#endif
#ifdef ZMQ_VMCI_BUFFER_SIZE
vmci_buffer_size = ZMQ_VMCI_BUFFER_SIZE,
#endif
#ifdef ZMQ_VMCI_BUFFER_MIN_SIZE
vmci_buffer_min_size = ZMQ_VMCI_BUFFER_MIN_SIZE,
#endif
#ifdef ZMQ_VMCI_BUFFER_MAX_SIZE
vmci_buffer_max_size = ZMQ_VMCI_BUFFER_MAX_SIZE,
#endif
#ifdef ZMQ_VMCI_CONNECT_TIMEOUT
vmci_connect_timeout = ZMQ_VMCI_CONNECT_TIMEOUT,
#endif
#ifdef ZMQ_USE_FD
use_fd = ZMQ_USE_FD,
#endif
#ifdef ZMQ_GSSAPI_PRINCIPAL_NAMETYPE
gssapi_principal_nametype = ZMQ_GSSAPI_PRINCIPAL_NAMETYPE,
#endif
#ifdef ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE
gssapi_service_principal_nametype = ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE,
#endif
#ifdef ZMQ_BINDTODEVICE
bindtodevice = ZMQ_BINDTODEVICE,
#endif
#ifdef ZMQ_ZAP_ENFORCE_DOMAIN
zap_enforce_domain = ZMQ_ZAP_ENFORCE_DOMAIN,
#endif
#ifdef ZMQ_LOOPBACK_FASTPATH
loopback_fastpath = ZMQ_LOOPBACK_FASTPATH,
#endif
#ifdef ZMQ_METADATA
metadata = ZMQ_METADATA,
#endif
};
/* Event selectors accepted by socket::monitor(). Each enumerator takes the
 * value of the ZMQ_EVENT_* macro it is guarded by and is declared only when
 * zmq.h defines that macro. */
enum struct monitor_event : int {
#ifdef ZMQ_EVENT_CONNECTED
event_connected = ZMQ_EVENT_CONNECTED,
#endif
#ifdef ZMQ_EVENT_CONNECT_DELAYED
event_connect_delayed = ZMQ_EVENT_CONNECT_DELAYED,
#endif
#ifdef ZMQ_EVENT_CONNECT_RETRIED
event_connect_retried = ZMQ_EVENT_CONNECT_RETRIED,
#endif
#ifdef ZMQ_EVENT_LISTENING
event_listening = ZMQ_EVENT_LISTENING,
#endif
#ifdef ZMQ_EVENT_BIND_FAILED
event_bind_failed = ZMQ_EVENT_BIND_FAILED,
#endif
#ifdef ZMQ_EVENT_ACCEPTED
event_accepted = ZMQ_EVENT_ACCEPTED,
#endif
#ifdef ZMQ_EVENT_ACCEPT_FAILED
event_accept_failed = ZMQ_EVENT_ACCEPT_FAILED,
#endif
#ifdef ZMQ_EVENT_CLOSED
event_closed = ZMQ_EVENT_CLOSED,
#endif
#ifdef ZMQ_EVENT_CLOSE_FAILED
event_close_failed = ZMQ_EVENT_CLOSE_FAILED,
#endif
#ifdef ZMQ_EVENT_DISCONNECTED
event_disconnected = ZMQ_EVENT_DISCONNECTED,
#endif
#ifdef ZMQ_EVENT_MONITOR_STOPPED
event_monitor_stopped = ZMQ_EVENT_MONITOR_STOPPED,
#endif
#ifdef ZMQ_EVENT_ALL
event_all = ZMQ_EVENT_ALL,
#endif
};
/*
 * Open a socket of `type` on context `ctx` with zmq_socket().
 *
 * The handle is stored in a unique_ptr whose deleter calls zmq_close().
 * Throws zmq::error when zmq_socket() returned null.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline socket::socket(const context &ctx, socket_type type)
    : ptr_{zmq_socket(static_cast<void *>(ctx), static_cast<int>(type)),
           [](void *ptr) noexcept { zmq_close(ptr); }} {
  if (!ptr_)
    throw error();
}
/*
 * Set the socket option `name` through zmq_setsockopt().
 *
 * The option value passed to 0MQ is the sizeof(T) bytes starting at the
 * address of `value`. Throws zmq::error when the call reports a non-zero
 * result.
 */
template <class T> void socket::set(socket_opt name, const T &value) const {
  const int option = static_cast<int>(name);
  if (zmq_setsockopt(ptr_.get(), option, &value, sizeof(T)) != 0)
    throw error();
}
/*
 * Read the socket option `name` into `value` through zmq_getsockopt().
 *
 * sizeof(T) is announced as the capacity of `value`; the length written back
 * by 0MQ is not reported to the caller. Throws zmq::error when the call
 * reports a non-zero result.
 */
template <class T> void socket::get(socket_opt name, T &value) const {
  const int option = static_cast<int>(name);
  std::size_t length = sizeof(T);
  if (zmq_getsockopt(ptr_.get(), option, &value, &length) != 0)
    throw error();
}
/*
 * Bind the socket to `endpoint` with zmq_bind().
 * Throws zmq::error when the call reports a non-zero result.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void socket::bind(const char *endpoint) const {
  if (zmq_bind(ptr_.get(), endpoint) != 0)
    throw error();
}
/*
 * Connect the socket to `endpoint` with zmq_connect().
 * Throws zmq::error when the call reports a non-zero result.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void socket::connect(const char *endpoint) const {
  if (zmq_connect(ptr_.get(), endpoint) != 0)
    throw error();
}
/*
 * Undo a previous bind to `endpoint` with zmq_unbind().
 * Throws zmq::error when the call reports a non-zero result.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void socket::unbind(const char *endpoint) const {
  if (zmq_unbind(ptr_.get(), endpoint) != 0)
    throw error();
}
/*
 * Disconnect the socket from `endpoint` with zmq_disconnect().
 * Throws zmq::error when the call reports a non-zero result.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void socket::disconnect(const char *endpoint) const {
  if (zmq_disconnect(ptr_.get(), endpoint) != 0)
    throw error();
}
/*
 * Send all parts of `msg` on this socket.
 *
 * `msg` is taken by value, so the caller's message object is not touched by
 * the send. `wait` selects a blocking send (true) or ZMQ_DONTWAIT (false).
 * Returns the byte count reported by message::send(); errors surface as
 * zmq::error from there.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline int socket::send(message msg, bool wait) const {
  return msg.send(*this, wait);
}
/*
 * Send `len` elements of type T starting at `data` as one message frame.
 *
 * `flags` is passed to 0MQ unchanged. Returns the byte count reported by
 * 0MQ; throws zmq::error when that count is negative.
 *
 * Uses zmq_send(), which copies the buffer into the outgoing message. The
 * previous zmq_send_const() call does not copy and assumes the buffer is
 * constant memory that stays valid, which an arbitrary caller-supplied
 * pointer (e.g. a local array) does not guarantee.
 */
template <class T>
int socket::send(std::size_t len, const T *data, int flags) const {
  const int bytes =
      zmq_send(static_cast<void *>(*this), data, len * sizeof(T), flags);
  if (bytes < 0)
    throw error();
  return bytes;
}
/*
 * Receive one complete (possibly multipart) message from this socket.
 *
 * `wait` selects a blocking receive (true) or ZMQ_DONTWAIT (false). Errors
 * surface as zmq::error from message::receive().
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message socket::receive(bool wait) const {
  message msg;
  msg.receive(*this, wait);
  return msg;
}
/*
 * Receive one message frame into a caller-provided buffer with zmq_recv().
 *
 * The buffer at `data` is announced with a capacity of `len * sizeof(T)`
 * bytes and `flags` is passed on unchanged. Returns the value reported by
 * zmq_recv(); throws zmq::error when that value is negative.
 */
template <class T>
int socket::receive(std::size_t len, T *data, int flags) const {
  void *const handle = static_cast<void *>(*this);
  const std::size_t capacity = len * sizeof(T);
  const int received = zmq_recv(handle, data, capacity, flags);
  if (received < 0)
    throw error();
  return received;
}
/*
 * Convenience overload: monitor a single event kind.
 * Converts `event` to its integer value and forwards to the int overload.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline socket socket::monitor(const context &ctx, const char *endpoint,
                              monitor_event event) const {
  return monitor(ctx, endpoint, static_cast<int>(event));
}
/*
 * Start monitoring this socket and return a socket that receives the events.
 *
 * Registers `endpoint` with zmq_socket_monitor() for the event mask `events`,
 * then creates a pair socket on `ctx` and connects it to the same endpoint.
 * Throws zmq::error when the registration reports a negative result; errors
 * from creating or connecting the listener surface as zmq::error as well.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline socket socket::monitor(const context &ctx, const char *endpoint,
                              int events) const {
  if (zmq_socket_monitor(static_cast<void *>(*this), endpoint, events) < 0)
    throw error();
  socket listener(ctx, socket_type::pair);
  listener.connect(endpoint);
  return listener;
}
/*
 * Raw socket handle held by this object.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline socket::operator void *() const noexcept { return ptr_.get(); }
/*
 * Construct an empty frame with zmq_msg_init().
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::part::part() noexcept { zmq_msg_init(&msg_); }
/*
 * Construct a frame with a payload of `size` bytes via zmq_msg_init_size().
 * Throws zmq::error when the call reports a non-zero result.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::part::part(std::size_t size) {
  if (zmq_msg_init_size(&msg_, size) != 0)
    throw error();
}
/*
 * Construct a frame holding a byte-wise copy of `value`.
 *
 * Delegates to the sizing constructor for sizeof(T) bytes and then copies
 * the bytes of `value` into the payload with memcpy.
 */
template <class T> message::part::part(const T &value) : part(sizeof(T)) {
  void *const payload = zmq_msg_data(&msg_);
  std::memcpy(payload, &value, sizeof(T));
}
/*
 * Construct a frame holding a byte-wise copy of `len` elements at `data`.
 *
 * Delegates to the sizing constructor for len * sizeof(T) bytes and then
 * copies that many bytes from `data` into the payload with memcpy.
 */
template <class T>
message::part::part(std::size_t len, const T *data) : part(len * sizeof(T)) {
  void *const payload = zmq_msg_data(&msg_);
  std::memcpy(payload, data, len * sizeof(T));
}
/*
 * Construct a frame that refers to the caller's buffer instead of copying.
 *
 * Hands `data` (len * sizeof(T) bytes), the free function `ffn` and `hint`
 * to zmq_msg_init_data(). Throws zmq::error when the call reports a non-zero
 * result.
 */
template <class T>
message::part::part(std::size_t len, T *data, zmq_free_fn *ffn, void *hint) {
  const std::size_t bytes = len * sizeof(T);
  if (zmq_msg_init_data(&msg_, data, bytes, ffn, hint) != 0)
    throw error();
}
/*
 * Construct a frame from the elements in [begin, end).
 *
 * The payload is sized as distance(begin, end) * sizeof(value_type) and the
 * elements are written into it with std::copy. Throws zmq::error when
 * zmq_msg_init_size() reports a non-zero result.
 */
template <class Iter> message::part::part(Iter begin, Iter end) {
  using traits = std::iterator_traits<Iter>;
  using size_type = typename traits::difference_type;
  using value_type = typename traits::value_type;
  const size_type bytes = std::distance(begin, end) * sizeof(value_type);
  if (zmq_msg_init_size(&msg_, bytes) != 0)
    throw error();
  std::copy(begin, end, static_cast<value_type *>(zmq_msg_data(&msg_)));
}
/*
 * Copy constructor: start as an empty frame, then zmq_msg_copy() from `rhs`.
 * The const_cast is needed because zmq_msg_copy() takes a non-const source.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::part::part(const part &rhs) noexcept : part() {
  zmq_msg_copy(&msg_, const_cast<zmq_msg_t *>(&rhs.msg_));
}
/*
 * Copy assignment: release the current content, then zmq_msg_copy() from
 * `rhs`.
 *
 * Self-assignment is a no-op. Without the guard, clear() would release the
 * payload before it is copied and the frame would end up empty.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::part &message::part::operator=(const part &rhs) noexcept {
  if (this != &rhs) {
    clear();
    zmq_msg_copy(&msg_, const_cast<zmq_msg_t *>(&rhs.msg_));
  }
  return *this;
}
/*
 * Move constructor: start as an empty frame, then zmq_msg_move() the content
 * of `rhs` into it.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::part::part(part &&rhs) noexcept : part() {
  zmq_msg_move(&msg_, &rhs.msg_);
}
/*
 * Move assignment: release the current content, then zmq_msg_move() the
 * content of `rhs` into this frame.
 *
 * Self-move is a no-op. Without the guard, clear() would release the payload
 * before it is moved and the frame would end up empty.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::part &message::part::operator=(part &&rhs) noexcept {
  if (this != &rhs) {
    clear();
    zmq_msg_move(&msg_, &rhs.msg_);
  }
  return *this;
}
/*
 * Release the frame with zmq_msg_close().
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::part::~part() { zmq_msg_close(&msg_); }
/*
 * Payload size in bytes, as reported by zmq_msg_size().
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline std::size_t message::part::size() const noexcept {
  return zmq_msg_size(&msg_);
}
/*
 * Pointer to the payload, as reported by zmq_msg_data().
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::part::operator void *() noexcept {
  return zmq_msg_data(&msg_);
}
/*
 * Pointer to the payload; named equivalent of the void* conversion.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void *message::part::data() noexcept {
  return static_cast<void *>(*this);
}
/*
 * True when zmq_msg_more() reports 1 for this frame.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline bool message::part::more() const noexcept {
  return zmq_msg_more(&msg_) == 1;
}
/*
 * Send this frame on socket `s` with zmq_msg_send() and the raw `flags`.
 * Returns the byte count reported by 0MQ; throws zmq::error when that count
 * is negative.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline int message::part::send(const socket &s, int flags) {
  const int bytes = zmq_msg_send(&msg_, static_cast<void *>(s), flags);
  if (bytes < 0)
    throw error();
  return bytes;
}
/*
 * Receive one frame from socket `s` into this part.
 *
 * Any previous content is released first. `flags` is passed to
 * zmq_msg_recv() unchanged. Returns the byte count reported by 0MQ; throws
 * zmq::error when that count is negative.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline int message::part::receive(const socket &s, int flags) {
  clear();
  const int bytes = zmq_msg_recv(&msg_, static_cast<void *>(s), flags);
  if (bytes < 0)
    throw error();
  return bytes;
}
/*
 * Copy payload elements into the destination range [begin, end).
 *
 * The payload is viewed as an array of the iterator's value_type. The number
 * of elements copied is the smaller of the destination length and the number
 * of whole elements in the payload. Returns the iterator one past the last
 * element written.
 */
template <class OutputIt>
OutputIt message::part::read(OutputIt begin, OutputIt end) noexcept {
  using value_type = typename std::iterator_traits<OutputIt>::value_type;
  const std::size_t room = std::distance(begin, end);
  const std::size_t stored = size() / sizeof(value_type);
  const std::size_t count = std::min(room, stored);
  void *const raw = static_cast<void *>(*this);
  value_type *const first = static_cast<value_type *>(raw);
  return std::copy(first, first + count, begin);
}
/*
 * Release the current content and leave the frame empty but usable
 * (zmq_msg_close() followed by zmq_msg_init()).
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void message::part::clear() noexcept {
  zmq_msg_close(&msg_);
  zmq_msg_init(&msg_);
}
/*
 * Construct a message without any parts.
 * Declared inline because this out-of-class defaulted definition lives in a
 * header (avoids duplicate-symbol link errors on multiple inclusion).
 */
inline message::message() noexcept(noexcept(std::vector<part>())) = default;
/*
 * Construct a message that takes over the given parts (moved into place).
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::message(std::vector<part> parts) noexcept
    : parts_{std::move(parts)} {}
/*
 * Append an empty part and return its size in bytes.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline std::size_t message::addpart() {
  parts_.emplace_back();
  return parts_.back().size();
}
/*
 * Append a part with a payload of `size` bytes and return the size of the
 * appended part. A failing allocation surfaces as zmq::error from the part
 * constructor.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline std::size_t message::addpart(std::size_t size) {
  parts_.emplace_back(size);
  return parts_.back().size();
}
template <class T> std::size_t message::addpart(const T &value) {
part msg = value;
parts_.push_back(std::move(msg));
return parts_.back().size();
}
template <class T>
std::size_t message::addpart(std::size_t len, const T *data) {
parts_.emplace_back(len, data);
return parts_.back().size();
}
template <class T>
std::size_t message::addpart(std::size_t len, T *data, zmq_free_fn *ffn,
void *hint) {
parts_.emplace_back(len, data, ffn, hint);
return parts_.back().size();
}
template <class Iter> std::size_t message::addpart(Iter begin, Iter end) {
parts_.emplace_back(begin, end);
return parts_.back().size();
}
/*
 * Payload pointer of part `pid`, or nullptr when `pid` is out of range.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void *message::data(std::size_t pid) noexcept {
  if (pid >= numparts())
    return nullptr;
  return static_cast<void *>(parts_[pid]);
}
/*
 * Number of parts currently held.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline std::size_t message::numparts() const noexcept { return parts_.size(); }
std::size_t message::size() const noexcept {
std::size_t size_ = 0;
for (const auto &part : parts_)
size_ += part.size();
return size_;
}
/*
 * Payload size in bytes of part `pid`, or 0 when `pid` is out of range.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline std::size_t message::size(std::size_t pid) const noexcept {
  if (pid >= numparts())
    return 0;
  return parts_[pid].size();
}
/*
 * Unchecked access to part `pid`; `pid` must be less than numparts().
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline message::part &message::operator[](std::size_t pid) {
  return parts_[pid];
}
/*
 * Unchecked read-only access to part `pid`; `pid` must be less than
 * numparts().
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline const message::part &message::operator[](std::size_t pid) const {
  return parts_[pid];
}
/*
 * Remove the last part. Does nothing when the message has no parts, because
 * std::vector::pop_back() on an empty vector is undefined behaviour.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void message::pop_back() {
  if (!parts_.empty())
    parts_.pop_back();
}
/*
 * Send all parts of this message on socket `s`, in order.
 *
 * `wait` selects a blocking send (true) or ZMQ_DONTWAIT (false). Every part
 * except the last is sent with ZMQ_SNDMORE added to the flags. Returns the
 * sum of the per-part byte counts (0 when there are no parts); a failing
 * part throws zmq::error from part::send().
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline int message::send(const socket &s, bool wait) {
  const int flags = wait ? 0 : ZMQ_DONTWAIT;
  const std::size_t len = parts_.size();
  int total = 0;
  for (std::size_t idx = 0; idx < len; ++idx) {
    const bool last = (idx + 1 == len);
    total += parts_[idx].send(s, last ? flags : (flags | ZMQ_SNDMORE));
  }
  return total;
}
/*
 * Receive one complete message from socket `s`, replacing the current parts.
 *
 * `wait` selects a blocking receive (true) or ZMQ_DONTWAIT (false). Frames
 * are received until one reports that no more frames follow. Returns the sum
 * of the per-frame byte counts; a failing frame throws zmq::error from
 * part::receive(), leaving the frames received so far in the message.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline int message::receive(const socket &s, bool wait) {
  clear();
  const int flags = wait ? 0 : ZMQ_DONTWAIT;
  int total = 0;
  part msgpart;
  bool more = true;
  while (more) {
    total += msgpart.receive(s, flags);
    /* Query the flag before the frame is moved into the vector. */
    more = msgpart.more();
    parts_.push_back(std::move(msgpart));
  }
  return total;
}
/*
 * Copy the payload of part `pid` into [begin, end) via part::read().
 * Returns `begin` unchanged when `pid` is out of range; otherwise returns
 * what part::read() returns.
 */
template <class OutputIt>
OutputIt message::read(std::size_t pid, OutputIt begin, OutputIt end) noexcept {
  if (pid >= numparts())
    return begin;
  return parts_[pid].read(begin, end);
}
/*
 * Drop all parts.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void message::clear() noexcept { parts_.clear(); }
/*
 * Event flags accepted by poller::additem(). Each enumerator takes the value
 * of the ZMQ_POLL* macro it is guarded by and is declared only when zmq.h
 * defines that macro.
 */
enum struct poll_event : short {
#ifdef ZMQ_POLLIN
  pollin = ZMQ_POLLIN,
#endif
#ifdef ZMQ_POLLOUT
  pollout = ZMQ_POLLOUT,
#endif
#ifdef ZMQ_POLLERR
  pollerr = ZMQ_POLLERR,
  /* Misspelled original name of pollerr; kept so existing code compiles. */
  poller = ZMQ_POLLERR,
#endif
#ifdef ZMQ_POLLPRI
  pollpri = ZMQ_POLLPRI,
#endif
};
/*
 * Wrap a copy of one poll entry.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline poller::item::item(const zmq_pollitem_t item) noexcept : item_{item} {}
/*
 * True when this entry's socket handle is the handle of `s`.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline bool poller::item::belongsto(const socket &s) const noexcept {
  return item_.socket == static_cast<void *>(s);
}
/*
 * True when at least one requested event bit is also set in the returned
 * events of this entry.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline bool poller::item::isready() const noexcept {
  return (item_.events & item_.revents) != 0;
}
/*
 * Construct a poller without any entries.
 * Declared inline because this out-of-class defaulted definition lives in a
 * header (avoids duplicate-symbol link errors on multiple inclusion).
 */
inline poller::poller() noexcept(noexcept(std::vector<zmq_pollitem_t>())) =
    default;
/*
 * Register socket `s` for a single event kind.
 * Converts `pe` to its short value and forwards to the raw-mask overload.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void poller::additem(const socket &s, poll_event pe) {
  additem(s, static_cast<short>(pe));
}
/*
 * Register socket `s` with the raw event mask `events`.
 *
 * Appends a poll entry whose socket handle is the handle of `s`, whose fd is
 * 0 and whose returned-events field starts at 0. The entry stores only the
 * raw handle.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void poller::additem(const socket &s, short events) {
  zmq_pollitem_t item;
  item.socket = static_cast<void *>(s);
  item.fd = 0;
  item.events = events;
  item.revents = 0;
  items_.push_back(item);
}
/*
 * Number of registered poll entries.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline std::size_t poller::size() const noexcept { return items_.size(); }
/*
 * Unchecked access to entry `idx`; `idx` must be less than size().
 * Returns a snapshot copy, so later poll() calls do not update it.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline poller::item poller::operator[](std::size_t idx) const {
  return item{items_[idx]};
}
/*
 * Poll all registered entries with zmq_poll().
 *
 * `timeout` is passed to zmq_poll() unchanged. Returns zmq_poll()'s result;
 * throws zmq::error when that result is negative.
 *
 * Uses items_.data() rather than &items_[0], which is undefined behaviour
 * when no entries are registered. The entry count is converted to int
 * explicitly because zmq_poll() takes an int.
 *
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline int poller::poll(long timeout) {
  const int count = static_cast<int>(items_.size());
  const int rc = zmq_poll(items_.data(), count, timeout);
  if (rc < 0)
    throw error();
  return rc;
}
/*
 * Drop all registered poll entries.
 * Declared inline because the definition lives in a header (avoids
 * duplicate-symbol link errors on multiple inclusion).
 */
inline void poller::clear() noexcept { items_.clear(); }
} // namespace zmq
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.