Skip to content

Instantly share code, notes, and snippets.

@hintjens
Created September 9, 2010 19:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hintjens/572417 to your computer and use it in GitHub Desktop.
Save hintjens/572417 to your computer and use it in GitHub Desktop.
/* testclient.c program to emit one 2-part message to testpub.c */
#include "zhelpers.h"
/*
 * Test client entry point.
 *
 * Connects a REQ socket to tcp://127.0.0.1:4001, sends one two-part
 * message ("COMMAND" followed by "DATA"), prints the reply and exits.
 * The s_sendmore/s_send/s_recv helpers come from zhelpers.h.
 *
 * Returns 0 on completion.
 */
int main (int argc, char **argv) {
    (void) argc;
    (void) argv;

    void *ctx = zmq_init (1);
    assert (ctx);
    void *client = zmq_socket (ctx, ZMQ_REQ);
    assert (client);

    // Keep the call outside assert(): assert() expands to nothing when
    // NDEBUG is defined, which would silently drop the connect itself.
    int rc = zmq_connect (client, "tcp://127.0.0.1:4001");
    assert (rc == 0);
    (void) rc;

    // First frame is flagged "more to follow", second frame ends the message
    s_sendmore (client, "COMMAND");
    s_send (client, "DATA");

    // The reply string is owned by us and must be freed.
    // NOTE(review): s_recv is defined in zhelpers.h (not shown here); the
    // NULL check is defensive in case it can return NULL on failure.
    char *reply = s_recv (client);
    if (reply) {
        printf ("Received reply: %s\n", reply);
        free (reply);
    }

    // Close the socket before terminating the context it belongs to
    zmq_close (client);
    zmq_term (ctx);
    return 0;
}
/* testpub.c file to receive 2-part message from testclient.c */
#include "zhelpers.h"
/*
 * Thread body: runs a ZMQ_QUEUE device between the client-facing
 * frontend and the worker-facing backend.
 *
 * arg is the ZeroMQ context shared with the creating thread.
 * Returns NULL if zmq_device ever returns.
 */
static void *
queue_thread (void *arg) {
    void *context = arg;
    int rc;

    // Frontend (XREP) accepts client requests over TCP port 4001
    void *frontend = zmq_socket (context, ZMQ_XREP);
    assert (frontend);
    // Binds are performed outside assert() so they still happen when
    // NDEBUG is defined and assert() expands to nothing.
    rc = zmq_bind (frontend, "tcp://*:4001");
    assert (rc == 0);

    // Backend (XREQ) hands requests to workers over the ipc://worker endpoint
    void *backend = zmq_socket (context, ZMQ_XREQ);
    assert (backend);
    rc = zmq_bind (backend, "ipc://worker");
    assert (rc == 0);
    (void) rc;

    // Shuffle requests to the workers and their replies back to the clients.
    // NOTE(review): per the zmq_device documentation this call blocks while
    // the device runs, so the return below is normally not reached.
    zmq_device (ZMQ_QUEUE, frontend, backend);
    return NULL;
}
/*
 * Thread body: serves two-part requests arriving on ipc://worker.
 *
 * For each request the worker prints both parts, pushes a three-part
 * message ("msg", part1, part2) to ipc://collector, and answers the
 * requester with "OK".
 *
 * arg is the ZeroMQ context shared with the creating thread.
 * Returns NULL once receiving fails; otherwise loops forever.
 */
static void *
worker_thread (void *arg) {
    void *context = arg;
    int rc;

    // REP socket receives requests forwarded by the queue device
    void *worker = zmq_socket (context, ZMQ_REP);
    assert (worker);
    // Connects are performed outside assert() so they still happen when
    // NDEBUG is defined and assert() expands to nothing.
    rc = zmq_connect (worker, "ipc://worker");
    assert (rc == 0);

    // PUSH socket forwards every request to the collector in main
    void *collector = zmq_socket (context, ZMQ_PUSH);
    assert (collector);
    rc = zmq_connect (collector, "ipc://collector");
    assert (rc == 0);
    (void) rc;

    while (1) {
        // NOTE(review): s_recv is defined in zhelpers.h (not shown here);
        // the NULL checks are defensive so that a failed receive is never
        // passed to printf("%s") or the send helpers.
        char *part1 = s_recv (worker);
        if (!part1)
            break;
        char *part2 = s_recv (worker);
        if (!part2) {
            free (part1);
            break;
        }
        printf ("Worker got [%s][%s]\n", part1, part2);

        // Forward as one three-part message: key, then both request parts
        s_sendmore (collector, "msg");
        s_sendmore (collector, part1);
        s_send (collector, part2);
        free (part1);
        free (part2);

        // A REP socket must reply before it can receive the next request
        s_send (worker, "OK");
    }
    zmq_close (collector);
    zmq_close (worker);
    return NULL;
}
int main (int argc, char **argv){
void *context = zmq_init (1);
assert (context);
// Start child threads
pthread_t thread;
assert (pthread_create (&thread, NULL, queue_thread, context) == 0);
assert (pthread_create (&thread, NULL, worker_thread, context) == 0);
// Create collector for worker results
void *collector = zmq_socket (context, ZMQ_PULL);
assert (collector);
assert (zmq_bind (collector, "ipc://collector") == 0);
// Forward messages from workers to our pub socket
// We don't actually have a pub socket
while (1) {
char *key = s_recv (collector);
char *part1 = s_recv (collector);
char *part2 = s_recv (collector);
printf ("Main got: [%s][%s][%s]\n", key, part1, part2);
free (key);
free (part1);
free (part2);
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment