Created
August 16, 2010 13:04
-
-
Save imatix/526909 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
// Task sink in C - design 2
// Adds pub-sub flow to send kill signal to workers
//
#include <zmq.h> | |
#include <unistd.h> | |
#include <string.h> | |
#include <stdio.h> | |
#include <time.h> | |
#include <sys/time.h> | |
int main (int argc, char *argv[]) | |
{ | |
void *context; // ØMQ context for our process | |
void *input; // Socket for input | |
void *control; // Socket for worker control | |
int task_nbr; | |
struct timeval | |
tstart, tend, tdiff; | |
int total_msec = 0; // Total calculated cost in msecs | |
zmq_msg_t message; | |
// Prepare our context and sockets | |
context = zmq_init (1); | |
input = zmq_socket (context, ZMQ_PULL); | |
zmq_bind (input, "tcp://*:5558"); | |
control = zmq_socket (context, ZMQ_PUB); | |
zmq_bind (control, "tcp://*:5559"); | |
// Wait for start of batch | |
zmq_msg_init (&message); | |
zmq_recv (input, &message, 0); | |
zmq_msg_close (&message); | |
// Start our clock now | |
gettimeofday (&tstart, NULL); | |
// Process 100 confirmations | |
for (task_nbr = 0; task_nbr < 100; task_nbr++) { | |
zmq_msg_init (&message); | |
zmq_recv (input, &message, 0); | |
zmq_msg_close (&message); | |
if ((task_nbr / 10) * 10 == task_nbr) | |
printf (":"); | |
else | |
printf ("."); | |
fflush (stdout); | |
} | |
// Calculate and report duration of batch | |
gettimeofday (&tend, NULL); | |
if (tend.tv_usec < tstart.tv_usec) { | |
tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1; | |
tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec; | |
} else { | |
tdiff.tv_sec = tend.tv_sec - tstart.tv_sec; | |
tdiff.tv_usec = tend.tv_usec - tstart.tv_usec; | |
} | |
total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000; | |
printf ("Total elapsed time: %d msec\n", total_msec); | |
return 0; | |
// Send kill signal to workers | |
zmq_msg_init_data (&message, "KILL", 5, NULL, NULL); | |
zmq_send (control, &message, 0); | |
zmq_msg_close (&message); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
// Task ventilator in C
// Binds PUSH socket to tcp://*:5557
// Sends batch of tasks to workers via that socket
//
#include <zmq.h> | |
#include <unistd.h> | |
#include <string.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <time.h> | |
// Random integer in the range [0, num).
// The argument and the whole expansion are parenthesized so that
// expressions such as within (a + b) scale the full value, and the
// arithmetic is done in double so large random () results are not
// rounded up to the point where the quotient reaches num.
// NOTE(review): assumes random () never returns more than RAND_MAX;
// confirm on the target platform.
#define within(num) ((int) ((double) (num) * random () / (RAND_MAX + 1.0)))
int main (int argc, char *argv[]) | |
{ | |
void *context; // ØMQ context for our process | |
void *output; // Socket for output | |
int task_nbr; | |
int total_msec = 0; // Total expected cost in msecs | |
zmq_msg_t message; | |
// Initialize random number generator | |
srandom ((unsigned) time (NULL)); | |
// Prepare our context and socket | |
context = zmq_init (1); | |
output = zmq_socket (context, ZMQ_PUSH); | |
zmq_bind (output, "tcp://*:5557"); | |
printf ("Press Enter when the workers are ready: "); | |
getchar (); | |
printf ("Sending tasks to workers...\n"); | |
// The first message is "0" and signals start of batch | |
zmq_msg_init_data (&message, "0", 2, NULL, NULL); | |
zmq_send (output, &message, 0); | |
zmq_msg_close (&message); | |
// Send 100 tasks | |
for (task_nbr = 0; task_nbr < 100; task_nbr++) { | |
int workload; | |
// Random workload from 1 to 100msecs | |
workload = within (100) + 1; | |
total_msec += workload; | |
zmq_msg_init_size (&message, 10); | |
sprintf ((char *) zmq_msg_data (&message), "%d", workload); | |
zmq_send (output, &message, 0); | |
zmq_msg_close (&message); | |
} | |
printf ("Total expected cost: %d msec\n", total_msec); | |
sleep (1); // Give 0MQ time to deliver tasks | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
// Task worker in C - design 2
// Adds pub-sub flow to receive and respond to kill signal
//
#include <zmq.h> | |
#include <unistd.h> | |
#include <string.h> | |
#include <stdio.h> | |
#include <time.h> | |
int main (int argc, char *argv[]) | |
{ | |
void *context; // ØMQ context for our process | |
void *input, *output; // Sockets for input and output | |
void *control; // Socket for control input | |
// Prepare our context and sockets | |
context = zmq_init (1); | |
input = zmq_socket (context, ZMQ_PULL); | |
zmq_connect (input, "tcp://localhost:5557"); | |
output = zmq_socket (context, ZMQ_PUSH); | |
zmq_connect (output, "tcp://localhost:5558"); | |
control = zmq_socket (context, ZMQ_SUB); | |
zmq_connect (output, "tcp://localhost:5559"); | |
zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); | |
// Process messages from input and control sockets | |
// We prioritize traffic from the task ventilator | |
while (1) { | |
int rc; | |
struct timespec t; | |
zmq_msg_t message; | |
// Process any waiting tasks | |
for (rc = 0; !rc; ) { | |
zmq_msg_init (&message); | |
if ((rc = zmq_recv (input, &message, ZMQ_NOBLOCK)) == 0) { | |
// Process task | |
int workload; // Workload in msecs | |
struct timespec t; | |
sscanf ((char *) zmq_msg_data (&message), "%d", &workload); | |
t.tv_sec = 0; | |
t.tv_nsec = workload * 1000000; | |
// Do the work | |
nanosleep (&t, NULL); | |
// Send results to collector | |
zmq_msg_init (&message); | |
zmq_send (output, &message, 0); | |
// Simple progress indicator for the viewer | |
printf ("."); | |
fflush (stdout); | |
} | |
zmq_msg_close (&message); | |
if (rc == -1 && errno != EAGAIN) | |
break; | |
} | |
#if 0 | |
// Process any waiting control commands | |
for (rc = 0; !rc; ) { | |
zmq_msg_init (&message); | |
if ((rc = zmq_recv (control, &message, ZMQ_NOBLOCK)) == 0) { | |
// Process control command | |
// Assume any command means KILL | |
return 0; | |
} | |
zmq_msg_close (&message); | |
if (rc == -1 && errno != EAGAIN) | |
break; | |
} | |
// Sleep for 1 msec | |
t.tv_sec = 0; | |
t.tv_nsec = 1000000; | |
nanosleep (&t, NULL); | |
#endif | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment