Skip to content

Instantly share code, notes, and snippets.

@thomasluce
Created December 30, 2010 19:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thomasluce/760174 to your computer and use it in GitHub Desktop.
Save thomasluce/760174 to your computer and use it in GitHub Desktop.
gcc -g3 -o queue -I/opt/local/include -lzmq queue.c
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>
#include <assert.h>
char in_connect[1000] = "";
char out_connect[1000] = "";
void *context;
void *in_socket;
void *out_socket;
void push_on_queue(char *message, void *queue) {
if(strcmp(out_connect, "") != 0) {
zmq_msg_t out_msg;
if(zmq_msg_init_size(&out_msg, strlen(message)) == -1) {
fprintf(stderr, "Error creating output queue mesage, %d\n", errno);
exit(errno);
}
memcpy(zmq_msg_data(&out_msg), message, strlen(message));
if(zmq_send(queue, &out_msg, 0) == -1) {
fprintf(stderr, "Error sending: %d\n", errno);
exit(errno);
}
if(zmq_msg_close(&out_msg) == -1) {
fprintf(stderr, "Error closing message: %d\n", errno);
exit(errno);
}
} else {
printf("%s", message);
fflush(stdout);
}
}
void usage() {
printf("Usage: ./queue [-i INPUT] [-o OUTPUT]\nWhere INPUT and OUTPUT are formatted as connection strings (urls)\n");
exit(0);
}
char *get_line(char line[], size_t s) {
if(strcmp(in_connect, "") == 0) {
return fgets(line, s, stdin);
} else {
zmq_msg_t in_msg;
if(zmq_msg_init(&in_msg) == -1) {
fprintf(stderr, "Error creating input queue mesage, %d\n", errno);
exit(errno);
}
if(zmq_recv(in_socket, &in_msg, 0) == 0) {
int size = zmq_msg_size (&in_msg);
memcpy (line, zmq_msg_data (&in_msg), size);
if(zmq_msg_close(&in_msg) == -1) {
fprintf(stderr, "Error closing, %d\n", errno);
exit(errno);
}
line[size] = 0;
return (line);
} else {
if(zmq_msg_close(&in_msg) == -1) {
fprintf(stderr, "Error closing, %d\n", errno);
exit(errno);
}
return NULL;
}
}
}
int main(int argc, char *argv[]) {
int c;
while((c = getopt(argc, argv, "i:o:h")) != -1) {
switch(c) {
case 'i':
strcpy(in_connect, optarg);
break;
case 'o':
strcpy(out_connect, optarg);
break;
case 'h':
usage();
break;
default:
printf("Bad argument: %c\n", c);
exit(1);
break;
}
}
context = zmq_init(1);
if(!context) {
fprintf(stderr, "Error creating queue context, %d\n", errno);
return errno;
}
in_socket = zmq_socket(context, ZMQ_PULL);
out_socket = zmq_socket(context, ZMQ_PUSH);
//Set up input
if(strcmp(in_connect, "") != 0) {
if(!in_socket) {
fprintf(stderr, "Error creating input queue socket, %d\n", errno);
return errno;
}
if(zmq_connect(in_socket, in_connect) == -1) {
fprintf(stderr, "Error connecting to input queue, %d\n", errno);
return errno;
}
}
// Set up output
if(strcmp(out_connect, "") != 0) {
if(!out_socket) {
fprintf(stderr, "Error creating output queue socket, %d\n", errno);
return errno;
}
if(zmq_bind(out_socket, out_connect) == -1) {
fprintf(stderr, "Error binding to output queue, %d, %s\n", errno, out_connect);
return errno;
}
}
char item[1000];
while(get_line(item, 1000)) {
push_on_queue(item, out_socket);
}
if(strcmp(out_connect, "") != 0) {
zmq_close(&out_socket);
}
if(strcmp(in_connect, "") != 0) {
zmq_close(&in_socket);
}
zmq_term(&context);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment