@thehesiod
Last active May 14, 2017 18:22
libzmq bug
#include <unistd.h>
#include <sys/types.h>
#include <errno.h>
#include "zmq.h"
#include <stdio.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdexcept>
#include <time.h>
#include <string>
#include <vector>
#include <thread>
#include <assert.h>
#include <chrono>
#include <iostream>
#include <mutex>
#include <string.h>
/*
docker run --rm -ti debian
apt-get update
apt-get install -y clang wget libsodium13 libunwind8 libpgm-5.1-0 vim
cd /tmp
wget http://download.opensuse.org/repositories/network:/messaging:/zeromq:/git-stable/Debian_8.0/amd64/libzmq5_4.2.2+git20170510-0_amd64.deb
wget http://download.opensuse.org/repositories/network:/messaging:/zeromq:/git-stable/Debian_8.0/amd64/libzmq3-dev_4.2.2+git20170510-0_amd64.deb
dpkg -i libzmq3-dev_4.2.2+git20170510-0_amd64.deb libzmq5_4.2.2+git20170510-0_amd64.deb
clang++ -std=c++11 -lzmq -lpthread test.cpp -o test
*/
typedef void (*WorkerFuncPtr)();
#define NUM_PROCS 6
#define NUM_ITERS 4
#define NUM_TERMINATORS (NUM_PROCS * NUM_ITERS)
#define TEMP_PATH "/tmp/foobar"
#define SOCKET_NAME "ipc:///tmp/foobar"
inline bool path_exists(const char* name) {
    return access(name, F_OK) != -1;
}

class zmq_error : public std::exception
{
public:
    zmq_error () : errnum (zmq_errno ()) {}

    virtual const char *what () const noexcept
    {
        return zmq_strerror (errnum);
    }

private:
    int errnum;
};

void ensure_rc(int rc)
{
    if (rc != 0)
    {
        zmq_error err;
        printf("Error occurred %s\n", err.what());
        throw err;
    }
}

class zmq_message {
    friend class zmq_socket_t;

public:
    inline zmq_message ()
    {
        ensure_rc(zmq_msg_init(&msg));
    }

    inline zmq_message(const void *data_, size_t size_)
    {
        ensure_rc(zmq_msg_init_size(&msg, size_));
        memmove(data(), data_, size_);
    }

    inline ~zmq_message() noexcept
    {
        int rc = zmq_msg_close (&msg);
        assert(rc == 0);
    }

    inline void *data() noexcept
    {
        return zmq_msg_data (&msg);
    }

    inline size_t size() const noexcept
    {
        return zmq_msg_size (const_cast<zmq_msg_t*>(&msg));
    }

    zmq_msg_t msg;

private:
    zmq_message (const zmq_message&);
    void operator = (const zmq_message&);
};

class zmq_context
{
public:
    zmq_context() {
        int io_threads_ = 1;
        int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT;

        ptr = zmq_ctx_new();
        if(!ptr)
            throw zmq_error();

        ensure_rc(zmq_ctx_set(ptr, ZMQ_IO_THREADS, io_threads_));
        ensure_rc(zmq_ctx_set(ptr, ZMQ_MAX_SOCKETS, max_sockets_));
    }

    void close()
    {
        if(!ptr)
            return;

        int rc = zmq_ctx_destroy(ptr);
        assert(rc == 0);
        ptr = NULL;
    }

    ~zmq_context()
    {
        close();
    }

    void* ptr;
};

class zmq_socket_t
{
public:
    inline zmq_socket_t(zmq_context& context_, int type_)
    {
        ptr = zmq_socket(context_.ptr, type_);
        if (ptr == NULL)
            throw zmq_error();
    }

    inline void bind(const char* addr)
    {
        ensure_rc(zmq_bind(ptr, addr));
    }

    inline void connect (const char *addr_)
    {
        ensure_rc(zmq_connect(ptr, addr_));
    }

    inline bool recv(zmq_message& msg_, int flags_ = 0)
    {
        int nbytes = zmq_msg_recv (&(msg_.msg), ptr, flags_);
        if (nbytes >= 0)
            return true;
        if (zmq_errno () == EAGAIN)
            return false;
        throw zmq_error();
    }

    inline void close() noexcept
    {
        if(ptr == NULL)
            return;

        int rc = zmq_close (ptr);
        assert(rc == 0);
        ptr = 0;
    }

    ~zmq_socket_t() noexcept
    {
        close();
    }

    void* ptr;
};

void ensure_send(void* socket, zmq_message& msg)
{
    // pyzmq uses a poller to only send when we can, see: http://api.zeromq.org/4-2:zmq-poll
    int rc = 0;
    while(1)
    {
        int val = 0;
        size_t val_size = sizeof(val);
        rc = zmq_getsockopt(socket, ZMQ_EVENTS, &val, &val_size);
        if (rc != 0)
        {
            printf("Error getting ZMQ_EVENTS %d %d\n", rc, zmq_errno());
            throw std::runtime_error("Error getting ZMQ_EVENTS");
        }

        if(val & ZMQ_POLLOUT)
        {
            break;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    int nbytes = zmq_msg_send(&(msg.msg), socket, ZMQ_DONTWAIT);
    if (nbytes < 0)
    {
        int rc = zmq_errno();
        if (rc != 0)
        {
            std::cout << "Error sending message rc: " << rc << std::endl;
            throw std::runtime_error("Error sending message");
        }
    }
}
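
// The pyzmq comment in ensure_send() above refers to zmq_poll(); the helper below
// is a minimal, unused sketch of that approach: block in zmq_poll() until the
// socket is writable instead of spinning on ZMQ_EVENTS with a sleep. The function
// name and the 100ms timeout are illustrative, not part of the original repro.
static void ensure_send_with_poll(void* socket, zmq_message& msg)
{
    zmq_pollitem_t item;
    item.socket = socket;
    item.fd = 0;
    item.events = ZMQ_POLLOUT;
    item.revents = 0;

    while(1)
    {
        // Wait up to 100ms for the socket to become writable.
        int rc = zmq_poll(&item, 1, 100);
        if (rc < 0)
            throw zmq_error();
        if (item.revents & ZMQ_POLLOUT)
            break;
    }

    // Socket reported writable, so a non-blocking send should succeed.
    if (zmq_msg_send(&(msg.msg), socket, ZMQ_DONTWAIT) < 0)
        throw zmq_error();
}
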
void ensure_hwm(void* socket)
{
    int val = 2;
    int rc = zmq_setsockopt(socket, ZMQ_SNDHWM, &val, sizeof(val));
    if (rc != 0)
    {
        std::cout << "Error setting ZMQ_SNDHWM " << rc << " " << zmq_errno() << std::endl;
        throw std::runtime_error("Error setting ZMQ_SNDHWM");
    }

    rc = zmq_setsockopt(socket, ZMQ_RCVHWM, &val, sizeof(val));
    if (rc != 0)
    {
        std::cout << "Error setting ZMQ_RCVHWM " << rc << " " << zmq_errno() << std::endl;
        throw std::runtime_error("Error setting ZMQ_RCVHWM");
    }

    val = -1;
    rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
    if (rc != 0)
    {
        std::cout << "Error setting ZMQ_LINGER " << rc << " " << zmq_errno() << std::endl;
        throw std::runtime_error("Error setting ZMQ_LINGER");
    }
}

void sender()
{
    std::string obj_bytes(229139, 'a');
    std::string term_bytes(41, 't');

    try
    {
        void* context = zmq_ctx_new();
        void* socket = zmq_socket(context, ZMQ_PUSH);
        ensure_hwm(socket);

        int rc = zmq_connect(socket, SOCKET_NAME);
        if (rc != 0)
        {
            std::cout << "Error connecting to socket rc: " << rc << " " << zmq_errno() << std::endl;
            throw std::runtime_error("Error connecting to socket");
        }

        for(int i = 0; i < NUM_ITERS; ++i)
        {
            std::cout << "Sndr iter: " << i << "/" << NUM_ITERS << std::endl;

            for(int j = 0; j < 5; ++j)
            {
                zmq_message msg(obj_bytes.c_str(), obj_bytes.length());
                ensure_send(socket, msg);
            }

            std::cout << "Sending terminator" << std::endl;
            zmq_message msg2(term_bytes.c_str(), term_bytes.length());
            ensure_send(socket, msg2);
        }

        rc = zmq_close (socket);
        if(rc != 0)
        {
            std::cout << "Error closing socket rc: " << rc << " " << zmq_errno() << std::endl;
            throw std::runtime_error("Error closing socket");
        }

        rc = zmq_ctx_destroy(context);
        if(rc != 0)
        {
            std::cout << "Error destroying context rc: " << rc << " " << zmq_errno() << std::endl;
            throw std::runtime_error("Error destroying context");
        }

        std::cout << "Exiting sender" << std::endl;
    }
    catch(std::exception& e)
    {
        std::cout << "Error in sender " << e.what() << std::endl;
        throw;
    }
}

void receiver(std::mutex* mutex)
{
    try
    {
        zmq_context context;
        zmq_socket_t socket(context, ZMQ_PULL);
        ensure_hwm(socket.ptr);
        socket.bind(SOCKET_NAME);

        std::cout << "Receiver ready" << std::endl;
        mutex->unlock();

        int total_received = 0;
        int num_terminators = NUM_TERMINATORS;
        while(1)
        {
            zmq_message msg;
            bool success = socket.recv(msg);
            if(!success) {
                throw std::runtime_error("Error receiving message");
            }

            char* data = static_cast<char*>(msg.data());
            total_received += 1;
            if(data[0] == 't')
            {
                num_terminators -= 1;
            }

            std::cout << "Rcv: " << data[0] << " size:" << msg.size() << " rcv'd: " << total_received << " waiting terms: " << num_terminators << "/" << NUM_TERMINATORS << std::endl;
            if(num_terminators == 0)
            {
                break;
            }

            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        socket.close();
        context.close();
        std::cout << "Total received: " << total_received << std::endl;
    }
    catch(std::exception& e)
    {
        std::cout << "Error in receiver " << e.what() << std::endl;
        throw;
    }
}

pid_t fork_method(WorkerFuncPtr funcPtr)
{
    pid_t childPID = fork();
    if(childPID >= 0) // fork was successful
    {
        if(childPID == 0) // child process
        {
            bool wait = false;
            while(wait)
            {
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            }

            funcPtr();
            exit(0);
        }
        else // parent process
        {
            return childPID; // fall through
        }
    }
    else // fork failed
    {
        throw std::runtime_error("Error forking");
    }
}

void forked_senders()
{
    std::vector<pid_t> workers;
    for(int i = 0; i < NUM_PROCS; ++i)
    {
        workers.push_back(fork_method(sender));
    }

    std::cout << "Waiting for sends to complete" << std::endl;
    for (auto pid : workers) {
        int status = 0;
        while (-1 == waitpid(pid, &status, 0));

        if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
            std::cout << "Process " << pid << " failed with status: " << status << std::endl;
            exit(1);
        }
    }
}

void threaded_senders()
{
    std::vector<std::thread> workers;
    for(int i = 0; i < NUM_PROCS; ++i)
    {
        workers.push_back(std::thread(sender));
    }

    std::cout << "Waiting for sends to complete" << std::endl;
    for (auto& worker : workers) {
        worker.join();
    }
}
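
// Minimal sketch (not wired into main() below) of the "should use event" note:
// signal receiver readiness with a condition variable instead of double-locking a
// std::mutex. The ready_flag type is illustrative and is not part of the original
// repro; <condition_variable> would normally be included with the other headers.
#include <condition_variable>

struct ready_flag
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    // Called by the receiver once its socket is bound.
    void set()
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        cv.notify_all();
    }

    // Called by main() to block until the receiver is ready.
    void wait()
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return ready; });
    }
};
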
int main(int argc, const char * argv[])
{
    try {
        if(path_exists(TEMP_PATH)) {
            unlink(TEMP_PATH);
        }

        std::mutex mutex;
        mutex.lock();

        std::thread rcvThread(receiver, &mutex);

        // wait for receiver to be ready, should use event
        mutex.lock();

        threaded_senders();

        std::cout << "Waiting to receive all terminators" << std::endl;
        rcvThread.join();
    } catch (std::exception& e) {
        std::cout << "Error occurred " << e.what() << std::endl;
    }

    return 0;
}