Created
September 9, 2010 19:39
-
-
Save hintjens/572417 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* testclient.c program to emit one 2-part message to testpub.c */ | |
#include "zhelpers.h" | |
int main (int argc, char **argv) { | |
void *ctx = zmq_init (1); | |
assert (ctx); | |
void *client = zmq_socket(ctx, ZMQ_REQ); | |
assert (client); | |
assert (zmq_connect (client, "tcp://127.0.0.1:4001") == 0); | |
s_sendmore (client, "COMMAND"); | |
s_send (client, "DATA"); | |
char *reply = s_recv (client); | |
printf ("Received reply: %s\n", reply); | |
free (reply); | |
zmq_term (context); | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* testpub.c file to recieve 2-part message from testclient.c */ | |
#include "zhelpers.h" | |
static void * | |
queue_thread (void *arg) { | |
// Shuffle requests to worker threads and back to main | |
// Frontend router speaks to mama clients | |
void *context = arg; | |
void *frontend = zmq_socket (context, ZMQ_XREP); | |
assert (frontend); | |
assert (zmq_bind (frontend, "tcp://*:4001") == 0); | |
// Backend dealer speaks to papa workers | |
void *backend = zmq_socket (context, ZMQ_XREQ); | |
assert (backend); | |
assert (zmq_bind (backend, "ipc://worker") == 0); | |
zmq_device (ZMQ_QUEUE, frontend, backend); | |
return NULL; | |
} | |
static void * | |
worker_thread (void *arg) { | |
void *context = arg; | |
void *worker = zmq_socket (context, ZMQ_REP); | |
assert (worker); | |
assert (zmq_connect (worker, "ipc://worker") == 0); | |
void *collector = zmq_socket (context, ZMQ_PUSH); | |
assert (collector); | |
assert (zmq_connect (collector, "ipc://collector") == 0); | |
while (1) { | |
char *part1 = s_recv (worker); | |
char *part2 = s_recv (worker); | |
printf ("Worker got [%s][%s]\n", part1, part2); | |
s_sendmore (collector, "msg"); | |
s_sendmore (collector, part1); | |
s_send (collector, part2); | |
free (part1); | |
free (part2); | |
s_send (worker, "OK"); | |
} | |
return NULL; | |
} | |
int main (int argc, char **argv){ | |
void *context = zmq_init (1); | |
assert (context); | |
// Start child threads | |
pthread_t thread; | |
assert (pthread_create (&thread, NULL, queue_thread, context) == 0); | |
assert (pthread_create (&thread, NULL, worker_thread, context) == 0); | |
// Create collector for worker results | |
void *collector = zmq_socket (context, ZMQ_PULL); | |
assert (collector); | |
assert (zmq_bind (collector, "ipc://collector") == 0); | |
// Forward messages from workers to our pub socket | |
// We don't actually have a pub socket | |
while (1) { | |
char *key = s_recv (collector); | |
char *part1 = s_recv (collector); | |
char *part2 = s_recv (collector); | |
printf ("Main got: [%s][%s][%s]\n", key, part1, part2); | |
free (key); | |
free (part1); | |
free (part2); | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment