Skip to content

Instantly share code, notes, and snippets.

@imatix
Created August 16, 2010 13:04
Show Gist options
  • Save imatix/526909 to your computer and use it in GitHub Desktop.
Save imatix/526909 to your computer and use it in GitHub Desktop.
//
// Task sink in C - design 2
// Adds pub-sub flow to send kill signal to workers
//
#include <zmq.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <time.h>
#include <sys/time.h>
// Task sink entry point.
//
// Binds a PULL socket on tcp://*:5558 and a PUB socket on tcp://*:5559.
// Receives one message that marks the start of the batch, then times the
// arrival of 100 further messages, printing ':' for every tenth message and
// '.' for the others. After reporting the elapsed time it publishes "KILL"
// on the control socket.
//
// argc/argv are unused. Returns 0 on success, 1 if the sockets cannot be
// set up.
int main (int argc, char *argv[])
{
    void *context;              // ØMQ context for our process
    void *input;                // Socket for input
    void *control;              // Socket for worker control
    int task_nbr;
    struct timeval tstart, tend, tdiff;
    int total_msec = 0;         // Total calculated cost in msecs
    int status = 0;
    zmq_msg_t message;

    (void) argc;
    (void) argv;

    // Prepare our context and sockets
    context = zmq_init (1);
    input = zmq_socket (context, ZMQ_PULL);
    control = zmq_socket (context, ZMQ_PUB);
    if (context == NULL || input == NULL || control == NULL
    ||  zmq_bind (input, "tcp://*:5558") != 0
    ||  zmq_bind (control, "tcp://*:5559") != 0) {
        fprintf (stderr, "sink: cannot set up sockets: %s\n",
                 zmq_strerror (zmq_errno ()));
        status = 1;
    }
    if (status == 0) {
        // Wait for start of batch
        zmq_msg_init (&message);
        zmq_recv (input, &message, 0);
        zmq_msg_close (&message);

        // Start our clock now
        gettimeofday (&tstart, NULL);

        // Process 100 confirmations
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {
            zmq_msg_init (&message);
            zmq_recv (input, &message, 0);
            zmq_msg_close (&message);
            if (task_nbr % 10 == 0)
                printf (":");
            else
                printf (".");
            fflush (stdout);
        }
        // Calculate and report duration of batch
        gettimeofday (&tend, NULL);
        if (tend.tv_usec < tstart.tv_usec) {
            // Borrow one second so the microsecond part stays non-negative
            tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;
            tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;
        } else {
            tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;
            tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;
        }
        total_msec = (int) (tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000);
        printf ("Total elapsed time: %d msec\n", total_msec);

        // Send kill signal to workers. This must happen before we return;
        // previously an early "return 0" made this code unreachable.
        zmq_msg_init_data (&message, "KILL", 5, NULL, NULL);
        zmq_send (control, &message, 0);
        zmq_msg_close (&message);

        sleep (1);              // Give 0MQ time to deliver the signal
    }
    // Release sockets and context on every path
    if (control != NULL)
        zmq_close (control);
    if (input != NULL)
        zmq_close (input);
    if (context != NULL)
        zmq_term (context);
    return status;
}
//
// Task ventilator in C
// Binds PUSH socket to tcp://*:5557
// Sends batch of tasks to workers via that socket
//
#include <zmq.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
// Random integer in the half-open range [0, num), scaled from random ().
// The argument and the whole expansion are parenthesized so expressions
// such as within (a + b) expand correctly, and the arithmetic is done in
// double: a float cannot represent large random () values exactly and
// could round the result up to num itself.
#define within(num) ((int) ((double) (num) * random () / (RAND_MAX + 1.0)))
// Task ventilator entry point.
//
// Binds a PUSH socket on tcp://*:5557, waits for the operator to press
// Enter, sends the message "0" to signal start of batch, then sends 100
// tasks. Each task is a 10-byte message holding a random workload
// (within (100) + 1 msec) as decimal text. Prints the summed workload.
//
// argc/argv are unused. Returns 0 on success, 1 if the socket cannot be
// set up.
int main (int argc, char *argv[])
{
    void *context;              // ØMQ context for our process
    void *output;               // Socket for output
    int task_nbr;
    int total_msec = 0;         // Total expected cost in msecs
    int status = 0;
    zmq_msg_t message;

    (void) argc;
    (void) argv;

    // Initialize random number generator
    srandom ((unsigned) time (NULL));

    // Prepare our context and socket
    context = zmq_init (1);
    output = zmq_socket (context, ZMQ_PUSH);
    if (context == NULL || output == NULL
    ||  zmq_bind (output, "tcp://*:5557") != 0) {
        fprintf (stderr, "ventilator: cannot set up socket: %s\n",
                 zmq_strerror (zmq_errno ()));
        status = 1;
    }
    if (status == 0) {
        printf ("Press Enter when the workers are ready: ");
        fflush (stdout);        // Prompt has no newline; show it before blocking
        getchar ();
        printf ("Sending tasks to workers...\n");

        // The first message is "0" and signals start of batch
        zmq_msg_init_data (&message, "0", 2, NULL, NULL);
        zmq_send (output, &message, 0);
        zmq_msg_close (&message);

        // Send 100 tasks
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {
            // Random workload from 1 to 100msecs
            int workload = within (100) + 1;
            total_msec += workload;

            zmq_msg_init_size (&message, 10);
            // Bounded write: never exceed the message buffer
            snprintf ((char *) zmq_msg_data (&message),
                      zmq_msg_size (&message), "%d", workload);
            zmq_send (output, &message, 0);
            zmq_msg_close (&message);
        }
        printf ("Total expected cost: %d msec\n", total_msec);
        sleep (1);              // Give 0MQ time to deliver tasks
    }
    // Release socket and context on every path
    if (output != NULL)
        zmq_close (output);
    if (context != NULL)
        zmq_term (context);
    return status;
}
//
// Task worker in C - design 2
// Adds pub-sub flow to receive and respond to kill signal
//
#include <zmq.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <time.h>
// Task worker entry point.
//
// Connects a PULL socket to tcp://localhost:5557 (tasks), a PUSH socket to
// tcp://localhost:5558 (results) and a SUB socket, subscribed to
// everything, to tcp://localhost:5559 (control). Each task carries a
// workload in msec as decimal text; the worker sleeps for that long, sends
// an empty message to the results socket and prints '.'. Any message on the
// control socket ends the worker.
//
// argc/argv are unused. Returns 0 after a control message, 1 on a socket
// setup or receive error.
int main (int argc, char *argv[])
{
    void *context;              // ØMQ context for our process
    void *input, *output;       // Sockets for input and output
    void *control;              // Socket for control input
    int running = 1;
    int status = 0;

    (void) argc;
    (void) argv;

    // Prepare our context and sockets
    context = zmq_init (1);
    input = zmq_socket (context, ZMQ_PULL);
    output = zmq_socket (context, ZMQ_PUSH);
    control = zmq_socket (context, ZMQ_SUB);
    // The control endpoint must be connected on the control socket; it was
    // previously connected on the output socket by mistake.
    if (context == NULL || input == NULL || output == NULL || control == NULL
    ||  zmq_connect (input, "tcp://localhost:5557") != 0
    ||  zmq_connect (output, "tcp://localhost:5558") != 0
    ||  zmq_connect (control, "tcp://localhost:5559") != 0
    ||  zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0) != 0) {
        fprintf (stderr, "worker: cannot set up sockets: %s\n",
                 zmq_strerror (zmq_errno ()));
        status = 1;
        running = 0;
    }
    // Process messages from input and control sockets
    // We prioritize traffic from the task ventilator
    while (running) {
        int rc;
        struct timespec pause;
        zmq_msg_t message;

        // Process any waiting tasks
        for (;;) {
            char text [32];
            size_t size;
            int workload = 0;   // Workload in msecs
            struct timespec work;

            zmq_msg_init (&message);
            rc = zmq_recv (input, &message, ZMQ_NOBLOCK);
            if (rc != 0) {
                int err = errno;    // Save before other calls can change it
                zmq_msg_close (&message);
                if (err != EAGAIN) {
                    fprintf (stderr, "worker: receive failed: %s\n",
                             zmq_strerror (err));
                    status = 1;
                    running = 0;
                }
                break;          // Nothing (more) waiting on the task socket
            }
            // Copy the payload so it is NUL-terminated before parsing
            size = zmq_msg_size (&message);
            if (size >= sizeof text)
                size = sizeof text - 1;
            memcpy (text, zmq_msg_data (&message), size);
            text [size] = '\0';
            // Release the received task before the message is reused
            zmq_msg_close (&message);

            if (sscanf (text, "%d", &workload) != 1 || workload < 0)
                workload = 0;

            // Do the work; split so tv_nsec stays below one second
            work.tv_sec = workload / 1000;
            work.tv_nsec = (long) (workload % 1000) * 1000000L;
            nanosleep (&work, NULL);

            // Send results to collector
            zmq_msg_init (&message);
            zmq_send (output, &message, 0);
            zmq_msg_close (&message);

            // Simple progress indicator for the viewer
            printf (".");
            fflush (stdout);
        }
        if (!running)
            break;

        // Process any waiting control command
        // Assume any command means KILL
        zmq_msg_init (&message);
        rc = zmq_recv (control, &message, ZMQ_NOBLOCK);
        if (rc == 0) {
            zmq_msg_close (&message);
            break;
        }
        else {
            int err = errno;
            zmq_msg_close (&message);
            if (err != EAGAIN) {
                fprintf (stderr, "worker: control receive failed: %s\n",
                         zmq_strerror (err));
                status = 1;
                break;
            }
        }
        // Sleep for 1 msec so the polling loop does not spin the CPU
        pause.tv_sec = 0;
        pause.tv_nsec = 1000000;
        nanosleep (&pause, NULL);
    }
    // Release sockets and context on every path
    if (control != NULL)
        zmq_close (control);
    if (output != NULL)
        zmq_close (output);
    if (input != NULL)
        zmq_close (input);
    if (context != NULL)
        zmq_term (context);
    return status;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment