Skip to content

Instantly share code, notes, and snippets.

@oktal
Last active August 29, 2015 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oktal/cfef32adff358e8f5d3b to your computer and use it in GitHub Desktop.
Save oktal/cfef32adff358e8f5d3b to your computer and use it in GitHub Desktop.
Multipart messaging protocol with nanomsg
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
#include <string>
#include <cstring>
#include <cstdio>
#include <iostream>
#include <vector>
#include <memory>
#include <limits>
namespace Nano {
const char* encodeMesage(const char* s) {
return s;
}
std::string encodeMessage(const std::string& str) {
return str;
}
template<typename Int>
typename std::enable_if<std::is_integral<Int>::value, std::string>::type
encodeMessage(Int val) {
return std::to_string(val);
}
template<typename Float>
typename std::enable_if<std::is_floating_point<Float>::value, std::string>::type
encodeMessage(Float val) {
return std::to_string(val);
}
namespace Multipart {
typedef uint16_t SizeType;
static constexpr size_t MaxParts = std::numeric_limits<SizeType>::max();
namespace details {
struct Packer {
std::vector<std::string> payload;
template<typename Head, typename... Tail> void pack(Head&& head, Tail&& ...rest)
{
payload.push_back(Nano::encodeMessage(std::forward<Head>(head)));
pack(rest...);
}
void pack()
{
}
};
}
std::pair<char *, size_t> pack(const std::vector<std::string>& message)
{
const auto partsCount = message.size();
if (partsCount > MaxParts)
throw std::invalid_argument("The message contains more parts than allowed");
size_t totalLen = sizeof(SizeType); /* The number of parts */
for (const auto& msg: message) {
if (msg.size() > MaxParts)
throw std::invalid_argument("The message is too big");
totalLen += (msg.size() + sizeof(SizeType)); /* Message size + bytes */
}
auto encodeSize = [](SizeType size, char *&buf) {
*buf++ = (size >> 8) & 0xFF;
*buf++ = size & 0xFF;
};
std::unique_ptr<char[]> buf(new char[totalLen]);
char* ptr = buf.get();
encodeSize(partsCount, ptr);
for (const auto& msg: message) {
encodeSize(msg.size(), ptr);
std::memcpy(ptr, msg.c_str(), msg.size());
ptr += msg.size();
}
return std::make_pair(buf.release(), totalLen);
}
std::vector<std::string> unpack(const char* buf)
{
auto decodeSize = [](const char* buf) -> SizeType {
return buf[0] << 8 | buf[1];
};
const SizeType parts = decodeSize(buf);
buf += 2;
std::vector<std::string> messages;
messages.reserve(parts);
for (SizeType i = 0; i < parts; ++i) {
const SizeType len = decodeSize(buf);
buf += 2;
messages.push_back(std::string(buf, buf + len));
buf += len;
}
return messages;
}
int send(int sock, const std::vector<std::string>& message) {
char* msg = 0;
size_t len = 0;
std::tie(msg, len) = pack(message);
int nbytes = nn_send(sock, msg, len, 0);
return nbytes;
}
int recv(int sock, std::vector<std::string>& message) {
void* buf = NULL;
int nbytes = nn_recv(sock, &buf, NN_MSG, 0);
if (nbytes > 0) {
message = unpack(static_cast<const char*>(buf));
}
return nbytes;
}
template<typename... Args>
int send(int sock, Args&& ...args) {
details::Packer packer;
packer.pack(args...);
char* msg = 0;
size_t len = 0;
std::tie(msg, len) = pack(packer.payload);
int nbytes = nn_send(sock, msg, len, 0);
return nbytes;
}
} // namespace Multipart
} // namespace Nano
void test_multipart(const std::string& url)
{
int sock1 = nn_socket(AF_SP, NN_REQ);
if (sock1 == -1)
perror("nn_socket");
if (nn_bind(sock1, url.c_str()) == -1)
perror("nn_bind");
int sock2 = nn_socket(AF_SP, NN_REP);
if (sock2 == -1)
perror("nn_socket");
if (nn_connect(sock2, url.c_str()) == -1)
perror("nn_connect");
int nbytes = Nano::Multipart::send(sock1, "Hello", std::string("World"), 21);
if (nbytes == -1)
perror("send_multipart");
else
std::cout << "Sent " << nbytes << " bytes" << std::endl;
std::vector<std::string> message;
nbytes = Nano::Multipart::recv(sock2, message);
if (nbytes == -1)
perror("recv_multipart");
else {
std::cout << "Received " << nbytes << " bytes" << std::endl;
for (const auto& msg: message) std::cout << msg << std::endl;
}
}
int main()
{
test_multipart("tcp://127.0.0.1:9090");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment