Last active
May 14, 2017 18:22
-
-
Save thehesiod/281b539147affedd5cc942880c5fa311 to your computer and use it in GitHub Desktop.
libzmq bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <unistd.h> | |
#include <sys/types.h> | |
#include <errno.h> | |
#include "zmq.h" | |
#include <stdio.h> | |
#include <sys/wait.h> | |
#include <stdlib.h> | |
#include <stdexcept> | |
#include <time.h> | |
#include <string> | |
#include <vector> | |
#include <thread> | |
#include <assert.h> | |
#include <chrono> | |
#include <iostream> | |
#include <mutex> | |
#include <string.h> | |
/* | |
docker run --rm -ti debian | |
apt-get update | |
apt-get install -y clang wget libsodium13 libunwind8 libpgm-5.1-0 vim | |
cd /tmp | |
wget http://download.opensuse.org/repositories/network:/messaging:/zeromq:/git-stable/Debian_8.0/amd64/libzmq5_4.2.2+git20170510-0_amd64.deb | |
wget http://download.opensuse.org/repositories/network:/messaging:/zeromq:/git-stable/Debian_8.0/amd64/libzmq3-dev_4.2.2+git20170510-0_amd64.deb | |
dpkg -i libzmq3-dev_4.2.2+git20170510-0_amd64.deb libzmq5_4.2.2+git20170510-0_amd64.deb | |
clang++ -std=c++11 -lzmq -lpthread test.cpp -o test | |
*/ | |
// Signature of a worker entry point run in a forked child or thread.
typedef void (*WorkerFuncPtr)();

#define NUM_PROCS 6
#define NUM_ITERS 4
// BUG FIX: parenthesized so the macro expands safely inside larger
// expressions (e.g. `x / NUM_TERMINATORS` previously became `x / 6 * 4`).
#define NUM_TERMINATORS (NUM_PROCS * NUM_ITERS)
#define TEMP_PATH "/tmp/foobar"
#define SOCKET_NAME "ipc:///tmp/foobar"
// Returns true when a filesystem entry exists at `name` (access(2) F_OK probe).
inline bool path_exists(const char* name) {
    const bool missing = (access(name, F_OK) == -1);
    return !missing;
}
class zmq_error : public std::exception | |
{ | |
public: | |
zmq_error () : errnum (zmq_errno ()) {} | |
virtual const char *what () const noexcept | |
{ | |
return zmq_strerror (errnum); | |
} | |
private: | |
int errnum; | |
}; | |
void ensure_rc(int rc) | |
{ | |
if (rc != 0) | |
{ | |
zmq_error err; | |
printf("Error occurred %s\n", err.what()); | |
throw err; | |
} | |
} | |
class zmq_message { | |
friend class socket_t; | |
public: | |
inline zmq_message () | |
{ | |
ensure_rc(zmq_msg_init(&msg)); | |
} | |
inline zmq_message(const void *data_, size_t size_) | |
{ | |
ensure_rc(zmq_msg_init_size(&msg, size_)); | |
memmove(data(), data_, size_); | |
} | |
inline ~zmq_message() noexcept | |
{ | |
int rc = zmq_msg_close (&msg); | |
assert(rc == 0); | |
} | |
inline void *data() noexcept | |
{ | |
return zmq_msg_data (&msg); | |
} | |
inline size_t size() const noexcept | |
{ | |
return zmq_msg_size (const_cast<zmq_msg_t*>(&msg)); | |
} | |
zmq_msg_t msg; | |
private: | |
zmq_message (const zmq_message&); | |
void operator = (const zmq_message&); | |
}; | |
class zmq_context | |
{ | |
public: | |
zmq_context() { | |
int io_threads_ = 1; | |
int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT; | |
ptr = zmq_ctx_new(); | |
if(!ptr) | |
throw zmq_error(); | |
ensure_rc(zmq_ctx_set(ptr, ZMQ_IO_THREADS, io_threads_)); | |
ensure_rc(zmq_ctx_set(ptr, ZMQ_MAX_SOCKETS, max_sockets_)); | |
} | |
void close() | |
{ | |
if(!ptr) | |
return; | |
int rc = zmq_ctx_destroy(ptr); | |
assert(rc == 0); | |
ptr = NULL; | |
} | |
~zmq_context() | |
{ | |
close(); | |
} | |
void* ptr; | |
}; | |
class zmq_socket_t | |
{ | |
public: | |
inline zmq_socket_t(zmq_context& context_, int type_) | |
{ | |
ptr = zmq_socket(context_.ptr, type_); | |
if (ptr == NULL) | |
throw zmq_error(); | |
} | |
inline void bind(const char* addr) | |
{ | |
ensure_rc(zmq_bind(ptr, addr)); | |
} | |
inline void connect (const char *addr_) | |
{ | |
ensure_rc(zmq_connect(ptr, addr_)); | |
} | |
inline bool recv(zmq_message& msg_, int flags_ = 0) | |
{ | |
int nbytes = zmq_msg_recv (&(msg_.msg), ptr, flags_); | |
if (nbytes >= 0) | |
return true; | |
if (zmq_errno () == EAGAIN) | |
return false; | |
throw zmq_error(); | |
} | |
inline void close() noexcept | |
{ | |
if(ptr == NULL) | |
return ; | |
int rc = zmq_close (ptr); | |
assert(rc == 0); | |
ptr = 0; | |
} | |
~zmq_socket_t() noexcept | |
{ | |
close(); | |
} | |
void* ptr; | |
}; | |
void ensure_send(void* socket, zmq_message& msg) | |
{ | |
// pyzmq uses a poller to only send when we can, see: http://api.zeromq.org/4-2:zmq-poll | |
int rc = 0; | |
while(1) | |
{ | |
int val = 0; | |
size_t val_size = sizeof(val); | |
rc = zmq_getsockopt(socket, ZMQ_EVENTS, &val, &val_size); | |
if (rc != 0) | |
{ | |
printf("Error setting ZMQ_SNDHWM %d %d\n", rc, zmq_errno()); | |
throw std::runtime_error("Error sending message"); | |
} | |
if(val & POLL_OUT) | |
{ | |
break; | |
} | |
std::this_thread::sleep_for(std::chrono::milliseconds(100)); | |
} | |
int nbytes = zmq_msg_send(&(msg.msg), socket, ZMQ_DONTWAIT); | |
if (nbytes < 0) | |
{ | |
int rc = zmq_errno(); | |
if (rc != 0) | |
{ | |
std::cout << "Error sending message rc: " << rc << std::endl; | |
throw std::runtime_error("Error sending message"); | |
} | |
} | |
} | |
void ensure_hwm(void* socket) | |
{ | |
int val = 2; | |
int rc = zmq_setsockopt(socket, ZMQ_SNDHWM, &val, sizeof(val)); | |
if (rc != 0) | |
{ | |
std::cout << "Error setting ZMQ_SNDHWM" << rc << " " << zmq_errno() << std::endl; | |
throw std::runtime_error("Error setting ZMQ_SNDHWM"); | |
} | |
rc = zmq_setsockopt(socket, ZMQ_RCVHWM, &val, sizeof(val)); | |
if (rc != 0) | |
{ | |
std::cout << "Error setting ZMQ_RCVHWM " << rc << " " << zmq_errno() << std::endl; | |
throw std::runtime_error("Error setting ZMQ_RCVHWM"); | |
} | |
val = -1; | |
rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val)); | |
if (rc != 0) | |
{ | |
std::cout << "Error setting ZMQ_LINGER " << rc << " " << zmq_errno() << std::endl; | |
throw std::runtime_error("Error setting ZMQ_LINGER"); | |
} | |
} | |
void sender() | |
{ | |
std::string obj_bytes(229139, 'a'); | |
std::string term_bytes(41, 't'); | |
try | |
{ | |
void* context = zmq_ctx_new(); | |
void* socket = zmq_socket(context, ZMQ_PUSH); | |
ensure_hwm(socket); | |
int rc = zmq_connect(socket, SOCKET_NAME); | |
if (rc != 0) | |
{ | |
std::cout << "Error connecting to socket rc:" << rc << " " << zmq_errno() <<std::endl; | |
throw std::runtime_error("Error connecting to socket"); | |
} | |
for(int i = 0; i < NUM_ITERS; ++i) | |
{ | |
std::cout << "Sndr iter: " << i << "/" << NUM_ITERS << std::endl; | |
for(int j = 0; j < 5; ++j) | |
{ | |
zmq_message msg(obj_bytes.c_str(), obj_bytes.length()); | |
ensure_send(socket, msg); | |
} | |
std::cout << "Sending terminator" << std::endl; | |
zmq_message msg2(term_bytes.c_str(), term_bytes.length()); | |
ensure_send(socket, msg2); | |
} | |
rc = zmq_close (socket); | |
if(rc != 0) | |
{ | |
std::cout << "Error destroying context rc:" << rc << " " << zmq_errno() <<std::endl; | |
throw std::runtime_error("Error closing socket"); | |
} | |
rc = zmq_ctx_destroy(context); | |
if(rc != 0) | |
{ | |
std::cout << "Error destroying context rc:" << rc << " " << zmq_errno() <<std::endl; | |
throw std::runtime_error("Error destroying context"); | |
} | |
std::cout << "Exiting sender" << std::endl; | |
} | |
catch(std::exception& e) | |
{ | |
std::cout << "Error in sender " << e.what() << std::endl; | |
throw; | |
} | |
} | |
// Receiver worker: binds a PULL socket on SOCKET_NAME, signals readiness by
// unlocking `mutex`, then drains messages until NUM_TERMINATORS terminator
// messages (first byte 't') have arrived. Throws on any receive error.
void receiver(std::mutex* mutex)
{
    try
    {
        zmq_context context;
        zmq_socket_t socket(context, ZMQ_PULL);
        ensure_hwm(socket.ptr);
        socket.bind(SOCKET_NAME);
        std::cout << "Receiver ready" << std::endl;
        // NOTE(review): this unlocks a mutex that main() locked on a different
        // thread — undefined behavior for std::mutex (unlock must be called by
        // the owning thread). A condition_variable or promise/future would be
        // the correct readiness signal; confirm before reuse.
        mutex->unlock();
        int total_received = 0;
        int num_terminators = NUM_TERMINATORS;
        while(1)
        {
            zmq_message msg;
            // Blocking receive (flags default to 0), so false/EAGAIN is not
            // expected here; treat it as an error.
            bool success = socket.recv(msg);
            if(!success) {
                throw std::runtime_error("Error receiving message");
            }
            char* data = static_cast<char*>(msg.data());
            total_received += 1;
            // Terminator messages are the 't'-filled payloads from sender().
            if(data[0] == 't')
            {
                num_terminators -= 1;
            }
            std::cout << "Rcv: " << data[0] << " size:" << msg.size() << " rcv'd: " << total_received << " waiting terms: " << num_terminators << "/" << NUM_TERMINATORS << std::endl;
            if(num_terminators == 0)
            {
                break;
            }
            // Throttle consumption so sender-side back-pressure kicks in.
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        socket.close();
        context.close();
        std::cout << "Total received: " << total_received << std::endl;
    }
    catch(std::exception& e)
    {
        std::cout << "Error in receiver " << e.what() << std::endl;
        throw;
    }
}
// Forks the process: the child runs funcPtr() and exits with status 0, the
// parent returns the child's pid. Throws std::runtime_error if fork fails.
pid_t fork_method(WorkerFuncPtr funcPtr)
{
    pid_t childPID = fork();
    if(childPID < 0) // fork failed
    {
        throw std::runtime_error("Error forking");
    }

    if(childPID == 0) // child process
    {
        // Debug hook: flip `wait` to true under a debugger to pause the child
        // before it runs. volatile keeps the loop from being optimized away.
        volatile bool wait = false;
        while(wait)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
        funcPtr();
        exit(0); // never returns to the caller in the child
    }

    return childPID; // parent process
}
void forked_senders() | |
{ | |
std::vector<pid_t> workers; | |
for(int i = 0; i < NUM_PROCS; ++i) | |
{ | |
workers.push_back(fork_method(sender)); | |
} | |
std::cout << "Waiting for sends to complete" << std::endl; | |
for (auto pid : workers) { | |
int status = 0; | |
while (-1 == waitpid(pid, &status, 0)); | |
if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { | |
std::cout << "Process " << pid << " failed with status: " << status << std::endl; | |
exit(1); | |
} | |
} | |
} | |
void threaded_senders() | |
{ | |
std::vector<std::thread> workers; | |
for(int i = 0; i < NUM_PROCS; ++i) | |
{ | |
workers.push_back(std::thread(sender)); | |
} | |
std::cout << "Waiting for sends to complete" << std::endl; | |
for (auto& worker : workers) { | |
worker.join(); | |
} | |
} | |
// Entry point: removes any stale IPC socket file, starts the receiver thread,
// waits for it to signal readiness, runs the threaded senders, then joins.
int main(int argc, const char * argv[])
{
    try {
        // Remove a leftover IPC endpoint file so bind() does not fail.
        if(path_exists(TEMP_PATH)) {
            unlink(TEMP_PATH);
        }
        // NOTE(review): locking here and having the receiver thread unlock is
        // undefined behavior for std::mutex (only the owning thread may
        // unlock). The author's own comment below acknowledges an event-style
        // primitive (condition_variable / promise) should be used instead.
        std::mutex mutex;
        mutex.lock();
        std::thread rcvThread(receiver, &mutex);
        // wait for receiver to be ready, should use event
        mutex.lock();
        threaded_senders();
        std::cout << "Waiting to receive all terminators" << std::endl;
        rcvThread.join();
    } catch (std::exception& e) {
        std::cout << "Error occurred " << e.what() << std::endl;
    }
    return 0;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment