Skip to content

Instantly share code, notes, and snippets.

@moteus
Created January 10, 2014 06:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save moteus/8347993 to your computer and use it in GitHub Desktop.
Save moteus/8347993 to your computer and use it in GitHub Desktop.
Binary zeromq message class.
#ifndef __BZMSG_H_INCLUDED__
#define __BZMSG_H_INCLUDED__
#include <zmq.hpp>
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <list>
#include <stdexcept>
#include <string>
#include <vector>
#include <boost/tr1/memory.hpp>
using std::tr1::shared_ptr;
#include <boost/static_assert.hpp>
#define STATIC_ASSERT BOOST_STATIC_ASSERT
#include <boost/cstdint.hpp>
using boost::uint8_t;
using boost::uint16_t;
using boost::uint32_t;
using boost::uint64_t;
using boost::int8_t;
using boost::int16_t;
using boost::int32_t;
using boost::int64_t;
// Multipart ZeroMQ message whose frames ("rows") are arbitrary byte strings.
//
// Rows are held through shared pointers in a std::list, so a row handed out by
// top()/back()/pop_*() stays alive after it is removed from the message, and a
// row passed to push_*(row_ptr) is shared with the caller, not copied.
class bzmsg{
public:
  typedef shared_ptr<bzmsg> pointer;

  // One frame of a message: an owned, resizable buffer of bytes.
  class row_t{
  public:
    typedef shared_ptr<row_t> pointer;

  public:
    // Creates an empty (zero-length) row.
    row_t(){
    }

    // Copies the bytes of rhs.
    row_t(const row_t &rhs)
      : row_(rhs.row_)
    {
    }

    // Builds a row from a NUL-terminated string; the terminator is NOT stored.
    // str must not be null.
    explicit row_t(const char *str){
      this->set(str);
    }

    // Builds a row from the characters of str (no terminator is added).
    explicit row_t(const std::string &str){
      this->set(str);
    }

    // Builds a row from len bytes starting at ptr.
    explicit row_t(const byte *ptr, size_t len){
      this->set(ptr, len);
    }

    // Builds a row holding a copy of the payload of msg.
    explicit row_t(const zmq::message_t &msg){
      this->set(msg);
    }

    row_t& operator= (const row_t &rhs){
      this->row_ = rhs.row_;
      return *this;
    }

    // Replaces the content with the characters of str, excluding the trailing
    // NUL. str must not be null.
    void set(const char *str){
      const size_t len = strlen(str);
      row_.assign(str, str + len);
    }

    // Replaces the content with the characters of str.
    void set(const std::string &str){
      row_.assign(str.begin(), str.end());
    }

    // Replaces the content with len bytes copied from ptr.
    void set(const byte *ptr, size_t len){
      row_.assign(ptr, ptr + len);
    }

    // Replaces the content with a copy of the payload of msg.
    void set(const zmq::message_t &msg){
      const size_t len = msg.size();
      const byte *ptr = static_cast<const byte*>(msg.data());
      row_.assign(ptr, ptr + len);
    }

    // Number of bytes stored.
    size_t size() const{
      return row_.size();
    }

    // Pointer to the first byte, or null when the row is empty.
    byte* data(){
      return size() ? &row_[0] : 0;
    }

    const byte* data() const{
      return size() ? &row_[0] : 0;
    }

    // True when the row is non-empty and its last byte is NUL, i.e. the buffer
    // can be handed out as a C string.
    bool is_str() const{
      return (size() > 0) && ('\0' == data()[size()-1]);
    }

    // True for a 17-byte row whose first byte is zero.
    // NOTE(review): presumably a binary UUID identity with a zero prefix - confirm.
    bool is_uuid() const{
      return (size() == 17) && (data()[0] == 0);
    }

    // True for a 34-byte NUL-terminated row that starts with '@'.
    // NOTE(review): presumably '@' + 32 hex digits + NUL - confirm.
    bool is_str_uuid() const{
      return (size() == 34) && (data()[0] == '@') && is_str();
    }

    // Returns the buffer as a C string. The row must already be NUL-terminated
    // (checked with assert only).
    const char* c_str() const{
      assert(is_str());
      return reinterpret_cast<const char*>(this->data());
    }

    char* c_str(){
      assert(is_str());
      return reinterpret_cast<char*>(this->data());
    }

    // Makes the row NUL-terminated (appending one zero byte if needed, which
    // changes size()) and returns it as a C string.
    char* to_str(){
      if(!is_str()){
        row_.push_back('\0');
      }
      assert(is_str());
      return this->c_str();
    }

    // Drops all bytes.
    void clear(){
      row_.clear();
    }

    // Compares the row with a C string. A single trailing NUL in the row is
    // ignored, so both "abc" and "abc\0" equal "abc". str must not be null.
    bool is_equal(const char *str)const{
      const size_t len = strlen(str);
      // we ignore last zero if row is string
      const size_t data_len = is_str() ? size()-1 : size();
      if(len != data_len) return false;
      return 0 == data_cmp(str, len);
    }

    // Exact byte-wise equality with the given buffer.
    bool is_equal(const byte *ptr, size_t bytes)const{
      if(bytes != size())
        return false;
      return 0 == data_cmp(ptr, bytes);
    }

    bool is_equal(const row_t &rhs)const{
      return is_equal(rhs.data(), rhs.size());
    }

    // True when the row begins with the characters of str. str must not be null.
    bool is_startwith(const char *str)const{
      const size_t len = strlen(str);
      if(size() < len)
        return false;
      return 0 == data_cmp(str, len);
    }

    // Lexicographic "less than" on raw bytes; when one buffer is a prefix of
    // the other, the shorter one is the smaller.
    bool is_less(const byte *ptr, size_t bytes)const{
      const size_t sz = size();
      // Parenthesised so that a function-like `min` macro cannot expand here.
      const int r = data_cmp(ptr, (std::min)(sz, bytes));
      if(r == 0) return sz < bytes;
      return r < 0;
    }

    bool is_less(const row_t &rhs)const{
      return is_less(rhs.data(), rhs.size());
    }

  private:
    // memcmp of the first `bytes` bytes of this row against ptr.
    // Requires bytes <= size(); BYTE_TYPE must be a one-byte type.
    template <typename BYTE_TYPE>
    int data_cmp(const BYTE_TYPE* ptr, size_t bytes)const{
      STATIC_ASSERT(sizeof(BYTE_TYPE) == 1);
      assert(bytes <= size());
      // data() (and possibly ptr) is null for an empty row; memcmp must not be
      // called with null pointers even for a zero length.
      if(bytes == 0)
        return 0;
      return memcmp(data(), ptr, bytes);
    }

  private:
    std::vector<byte> row_;

  public:
    // Strict-weak-ordering functor over row pointers, comparing the rows they
    // point to with is_less(). Both pointers must be non-null.
    struct is_less_fn{
      bool operator()(const row_t::pointer &lhs, const row_t::pointer &rhs)const{
        return lhs->is_less(*rhs);
      }
    };
  };

  typedef row_t::pointer     row_ptr;
  typedef std::list<row_ptr> rows_t;

public:
  // Number of rows (frames) in the message.
  size_t parts()const{
    return rows_.size();
  }

  // Removes all rows.
  void clear(){
    rows_.clear();
  }

  // push_front overloads: insert a new first row. The no-argument form inserts
  // an empty row; the row_ptr form shares the given row instead of copying it.
  void push_front(){
    rows_.push_front(row_ptr(new row_t()));
  }
  void push_front(row_ptr part){
    rows_.push_front(part);
  }
  void push_front(const char *part){
    rows_.push_front(row_ptr(new row_t(part)));
  }
  void push_front(const std::string &part){
    rows_.push_front(row_ptr(new row_t(part)));
  }
  void push_front(const byte *ptr, size_t len){
    rows_.push_front(row_ptr(new row_t(ptr, len)));
  }
  void push_front(const char *ptr, size_t len){
    rows_.push_front(row_ptr(new row_t(reinterpret_cast<const byte*>(ptr), len)));
  }
  void push_front(const zmq::message_t &msg){
    rows_.push_front(row_ptr(new row_t(msg)));
  }

  // push_back overloads: same as push_front, but append a new last row.
  void push_back(){
    rows_.push_back(row_ptr(new row_t()));
  }
  void push_back(row_ptr part){
    rows_.push_back(part);
  }
  void push_back(const char *part){
    rows_.push_back(row_ptr(new row_t(part)));
  }
  void push_back(const std::string &part){
    rows_.push_back(row_ptr(new row_t(part)));
  }
  void push_back(const byte *ptr, size_t len){
    rows_.push_back(row_ptr(new row_t(ptr, len)));
  }
  void push_back(const char *ptr, size_t len){
    rows_.push_back(row_ptr(new row_t(reinterpret_cast<const byte*>(ptr), len)));
  }
  void push_back(const zmq::message_t &msg){
    rows_.push_back(row_ptr(new row_t(msg)));
  }

  // Removes and returns the first row.
  // Throws std::out_of_range when the message is empty.
  row_ptr pop_front(){
    row_ptr part = top();   // performs the emptiness check
    rows_.pop_front();
    return part;
  }

  // Removes and returns the last row.
  // Throws std::out_of_range when the message is empty.
  row_ptr pop_back(){
    row_ptr part = back();  // performs the emptiness check
    rows_.pop_back();
    return part;
  }

  // Returns the first row without removing it.
  // Throws std::out_of_range when the message is empty.
  row_ptr top(){
    if(rows_.empty()){
      throw std::out_of_range("message is empty");
    }
    return rows_.front();
  }
  const row_ptr top()const{
    if(rows_.empty()){
      throw std::out_of_range("message is empty");
    }
    return rows_.front();
  }

  // Returns the last row without removing it.
  // Throws std::out_of_range when the message is empty.
  row_ptr back(){
    if(rows_.empty()){
      throw std::out_of_range("message is empty");
    }
    return rows_.back();
  }
  const row_ptr back()const{
    if(rows_.empty()){
      throw std::out_of_range("message is empty");
    }
    return rows_.back();
  }

  // Read-only iteration over the rows, front to back (or reversed).
  rows_t::const_iterator begin()const{
    return rows_.begin();
  }
  rows_t::const_reverse_iterator rbegin()const{
    return rows_.rbegin();
  }
  rows_t::const_iterator end()const{
    return rows_.end();
  }
  rows_t::const_reverse_iterator rend()const{
    return rows_.rend();
  }

  // Prepends an envelope: `addr` becomes the first row, followed by `delim`
  // when delim tests true (e.g. a non-null pointer). A and D may be any types
  // accepted by a push_front overload.
  template<typename A, typename D>
  void wrap(A addr, D delim){
    if(delim){
      push_front(delim);
    }
    push_front(addr);
  }

  // Prepends every row of `addr` (keeping their order), followed by `delim`
  // when it is non-null. `addr` is emptied in the process: its rows are moved,
  // not copied. `addr` must be non-null and must not be this message.
  void wrap(bzmsg::pointer addr, const char *delim){
    if(delim){
      push_front(delim);
    }
    // Popping from the back and pushing to the front preserves the order.
    while(addr->parts()){
      push_front(addr->pop_back());
    }
  }

  // Strips the envelope: moves leading rows into a new message until an empty
  // row (the delimiter) is met. The delimiter is consumed and not returned.
  // If there is no empty row, every row ends up in the result.
  bzmsg::pointer unwrap(){
    bzmsg::pointer addr(new bzmsg);
    while(parts()){
      row_ptr part = pop_front();
      if(part->size() == 0)
        break;
      addr->push_back(part);
    }
    return addr;
  }

  // First row, or a null pointer when the message is empty (never throws).
  row_ptr address(){
    return rows_.empty() ? row_ptr() : rows_.front();
  }
  const row_ptr address() const{
    return rows_.empty() ? row_ptr() : rows_.front();
  }

  // Last row, or a null pointer when the message is empty (never throws).
  row_ptr body(){
    return rows_.empty() ? row_ptr() : rows_.back();
  }
  const row_ptr body() const{
    return rows_.empty() ? row_ptr() : rows_.back();
  }

  // Receives one complete multipart message from `socket` into this object.
  //
  // Existing row objects are overwritten in place (so row pointers held
  // elsewhere observe the new content), extra rows are appended when the
  // incoming message is longer, and leftover rows are erased when it is
  // shorter. A false return from socket.recv() (EAGAIN) is retried in a tight
  // loop. Always returns true; errors surface as exceptions thrown by the
  // socket calls.
  bool recv(zmq::socket_t & socket){
    rows_t::iterator it = rows_.begin();
    // std::list::end() stays valid across the push_back() calls below.
    const rows_t::iterator en = rows_.end();
    while(true){
      zmq::message_t message(0);
      while(!socket.recv(&message, 0)){
        // EAGAIN: try again
      }
      if(it == en){
        push_back(message);
      }
      else{
        (*it)->set(message);
        ++it;
      }
      // ZMQ_RCVMORE is an int64_t in libzmq 2.x but a plain int in 3.x and
      // later, where only part of this variable is written. It must start
      // zeroed, otherwise stale bytes make the loop wait for a frame that
      // never arrives.
      int64_t more = 0;
      size_t more_size = sizeof(more);
      socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
      if(!more){
        break;
      }
    }
    // Drop rows left over from a previous, longer message (no-op if it == en).
    rows_.erase(it, en);
    return true;
  }

  // Sends all rows as one multipart message: each row is copied into a fresh
  // zmq::message_t and every row but the last is flagged ZMQ_SNDMORE. A false
  // return from socket.send() (EAGAIN) is retried in a tight loop. Sends
  // nothing for an empty message. Always returns true.
  bool send(zmq::socket_t & socket) const{
    for(rows_t::const_iterator it = rows_.begin(), en = rows_.end(); it != en;){
      const row_t &row = **it;
      const size_t len = row.size();
      zmq::message_t message(len);
      if(len != 0){  // row.data() is null for an empty row
        memcpy(message.data(), row.data(), len);
      }
      ++it;
      const int flags = (it == en) ? 0 : ZMQ_SNDMORE;
      while(!socket.send(message, flags)){
        // EAGAIN: try again
      }
    }
    return true;
  }

  // Writes a human-readable listing of all rows to ost (std::cerr by default).
  void dump(std::ostream&ost = std::cerr)const;

private:
  rows_t rows_;
};
// Writes one row to ost.
//
// If every byte is in the range 32..127 the row is written verbatim as
// characters; otherwise every byte is written as two lower-case hex digits.
// An empty row writes nothing. The stream's formatting flags and fill
// character are restored before returning, so a binary row does not leave the
// caller's stream in hex mode.
inline
std::ostream& operator << (std::ostream&ost,const bzmsg::row_t& r){
  const byte *data = r.data();
  const size_t size = r.size();

  // Decide between text and binary output.
  bool is_text = true;
  for (size_t i = 0; i < size; ++i) {
    if (data[i] < 32 || data[i] > 127) {
      is_text = false;
      break;
    }
  }

  if (is_text) {
    for (size_t i = 0; i < size; ++i) {
      ost << static_cast<char>(data[i]);
    }
    return ost;
  }

  // std::hex and the fill character are sticky: save them and put them back.
  const std::ios_base::fmtflags saved_flags = ost.flags();
  const char saved_fill = ost.fill('0');
  ost << std::hex;
  for (size_t i = 0; i < size; ++i) {
    // setw applies to the next insertion only, so it is repeated per byte.
    ost << std::setw(2) << static_cast<short int>(data[i]);
  }
  ost.fill(saved_fill);
  ost.flags(saved_flags);
  return ost;
}
// Stream insertion for a whole message. Forwards to bzmsg::dump(), so
// `ost << msg` produces exactly what `msg.dump(ost)` does.
inline
std::ostream& operator << (std::ostream&ost, const bzmsg& msg){
  return msg.dump(ost), ost;
}
// Writes a separator line followed by one line per row in the form
// "[NNN] <row>", where NNN is the row size as a zero-padded decimal number of
// at least three digits and <row> is produced by the row's operator<<.
//
// The size is always printed in decimal, whatever base the stream was left in,
// and the stream's formatting flags and fill character are restored on exit.
// Each line ends with std::endl, i.e. the stream is flushed per row.
inline
void bzmsg::dump(std::ostream&ost)const{
  const std::ios_base::fmtflags saved_flags = ost.flags();
  const char saved_fill = ost.fill();

  ost << "--------------------------------------" << std::endl;
  for(rows_t::const_iterator it = rows_.begin(), en = rows_.end(); it != en; ++it){
    const size_t size = (*it)->size();
    // Re-assert dec/fill on every row: printing a row may change them.
    ost << "[" << std::dec << std::setfill('0') << std::setw(3)
        << static_cast<int>(size) << "] "
        << **it << std::endl;
  }

  ost.fill(saved_fill);
  ost.flags(saved_flags);
}
// Namespace-scope shorthand for bzmsg::pointer (a shared_ptr<bzmsg>).
typedef bzmsg::pointer bzmsg_ptr;
#endif /* __BZMSG_H_INCLUDED__ */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment