Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
libzmq linger test
#include "testutil.hpp"
#define PAYLOAD_SIZE 229139
#define TERM_SIZE 41
#define NUM_WORKERS 6
#define NUM_ITERS 4
#define NUM_TERMS NUM_WORKERS * NUM_ITERS
void worker_task (void *ep)
{
char *my_endpoint = (char *)ep;
assert (my_endpoint);
char payload [PAYLOAD_SIZE];
memset (payload, 'a', PAYLOAD_SIZE);
char term [TERM_SIZE];
memset (term, 't', TERM_SIZE);
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sc = zmq_socket (ctx, ZMQ_PUSH);
assert (sc);
int val = 2;
int rc = zmq_setsockopt(sc, ZMQ_SNDHWM, &val, sizeof(val));
assert (rc == 0);
val = 2;
rc = zmq_setsockopt(sc, ZMQ_RCVHWM, &val, sizeof(val));
assert (rc == 0);
val = -1;
rc = zmq_setsockopt(sc, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0);
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
for (int i = 0; i < NUM_ITERS; ++i) {
zmq_msg_t msg;
for(int j = 0; j < 5; ++j) {
rc = zmq_msg_init_data (&msg, payload, PAYLOAD_SIZE, NULL, NULL);
assert (rc == 0);
while (1) {
rc = zmq_msg_send (&msg, sc, ZMQ_DONTWAIT);
if (rc == PAYLOAD_SIZE)
break;
else if (rc == -1 && errno == EAGAIN)
msleep (100);
else
assert (false);
}
}
rc = zmq_msg_init_data (&msg, term, TERM_SIZE, NULL, NULL);
assert (rc == 0);
rc = zmq_msg_send (&msg, sc, 0);
assert (rc == TERM_SIZE);
}
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
int main (void)
{
char my_endpoint[256];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PULL);
assert (sb);
int val = 2;
int rc = zmq_setsockopt(sb, ZMQ_SNDHWM, &val, sizeof(val));
assert (rc == 0);
val = 2;
rc = zmq_setsockopt(sb, ZMQ_RCVHWM, &val, sizeof(val));
assert (rc == 0);
val = -1;
rc = zmq_setsockopt(sb, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0);
rc = zmq_bind (sb, "ipc://*");
assert (rc == 0);
size_t len = sizeof(my_endpoint);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
void *workers [NUM_WORKERS];
for (int i = 0; i < NUM_WORKERS; ++i)
workers [i] = zmq_threadstart(&worker_task, my_endpoint);
for (int terms_left = NUM_TERMS; terms_left > 0; ) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_msg_recv (&msg, sb, 0);
assert (rc == PAYLOAD_SIZE || rc == TERM_SIZE);
if (*(char *)zmq_msg_data (&msg) == 't')
--terms_left;
zmq_msg_close (&msg);
msleep (100);
}
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
for (int i = 0; i < NUM_WORKERS; ++i)
zmq_threadclose(workers [i]);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.