Skip to content

Instantly share code, notes, and snippets.

@darach
Last active August 29, 2015 14:05
Show Gist options
  • Save darach/7c4d074edac22dd9d25c to your computer and use it in GitHub Desktop.
Save darach/7c4d074edac22dd9d25c to your computer and use it in GitHub Desktop.
0MQ Slow Joiner
# Toolchain selection. LD and ARFLAGS are empty, and AR/LD/OBJS/ARFLAGS are
# not referenced by any rule visible in this Makefile.
CC      = gcc
LD      =
AR      = ar
ARFLAGS =

# Object files produced by the compile rules (one per peer).
OBJS = zpub.o \
       zsub.o

# Libraries handed to the linker for both peers.
LIBS = -lzmq \
       -lczmq \
       -lwjelement \
       -lwjreader #-lpthreads

# Compile flags: header search paths into the locally cloned dependencies,
# followed by optimisation, debug-info, language-standard and warning options.
CFLAGS = -Ideps/libzmq/include \
         -Ideps/czmq/include \
         -Ideps/wjelement/include \
         -O2 -funroll-loops -g -std=c99 -pedantic -Wall -fPIC

# Library search paths into the build trees of the cloned dependencies.
LDFLAGS = -Ldeps/libzmq/src/.libs \
          -Ldeps/czmq/src/.libs \
          -Ldeps/wjelement/src/wjelement \
          -Ldeps/wjelement/src/wjreader/
# Compile the publisher source into its object file ($< expands to zpub.c).
zpub.o: zpub.c
	$(CC) $(CFLAGS) -c $<
# Compile the subscriber source into its object file ($< expands to zsub.c).
zsub.o: zsub.c
	$(CC) $(CFLAGS) -c $<
# Remove build outputs: the wjelement build products (when that dependency has
# been fetched and configured), the local object files and both executables.
#
# The sub-clean is guarded so that `make clean` still works on a fresh checkout
# where deps/ does not exist yet, and it uses `-C` instead of a cd/make/cd
# sequence: each recipe line runs in its own shell, and backslash-continued
# commands without `;` or `&&` collapse into a single `cd` invocation.
clean:
	@if [ -f deps/wjelement/Makefile ]; then \
		$(MAKE) -C deps/wjelement clean ; \
	fi
	/bin/rm -f *.o
	/bin/rm -f zpub
	/bin/rm -f zsub
# Full reset: run `clean` first, then delete the fetched dependency tree so the
# next `get-deps` clones and builds everything again.
purge: clean
	/bin/rm -r -f deps
# Fetch and build the third-party dependencies under deps/.
#
# Each dependency is skipped when its directory already exists. Otherwise it is
# cloned and built in place. The clone/configure/build steps are chained with
# `&&` so that a failure stops that chain and fails the target instead of being
# silently ignored. All clones use https:// because GitHub no longer serves the
# unauthenticated git:// protocol.
get-deps:
	@if [ ! -d deps ]; then \
		mkdir deps ; \
		echo "Dependencies missing. Charging." ; \
	fi
	@if [ -e deps/wjelement ]; then \
		echo "Dependency 'deps/wjelement' already bound." ; \
	else \
		echo Git cloning deps/wjelement && \
		git clone https://github.com/netmail-open/wjelement deps/wjelement && \
		echo Building deps/wjelement && \
		cd deps/wjelement && \
		cmake -Wno-dev . && \
		make ; \
	fi
	@if [ -e deps/libzmq ]; then \
		echo "Dependency 'deps/libzmq' already bound." ; \
	else \
		echo Git cloning deps/libzmq && \
		git clone https://github.com/zeromq/libzmq deps/libzmq && \
		echo Building deps/libzmq && \
		cd deps/libzmq && \
		./autogen.sh && \
		./configure && \
		make ; \
	fi
	@if [ -e deps/czmq ]; then \
		echo "Dependency 'deps/czmq' already bound." ; \
	else \
		echo Git cloning deps/czmq && \
		git clone https://github.com/zeromq/czmq deps/czmq && \
		echo Building deps/czmq && \
		cd deps/czmq && \
		./autogen.sh && \
		./configure && \
		make ; \
	fi
# Link the subscriber. get-deps stays first in the prerequisite list so the
# dependency headers and libraries exist before zsub.o is compiled.
# $(LIBS) must follow the object file: the linker resolves symbols left to
# right, so libraries named before zsub.o can be dropped (e.g. with
# --as-needed or static archives), leaving undefined references.
zsub: get-deps zsub.o
	@$(CC) $(LDFLAGS) -o zsub zsub.o $(LIBS)
# Link the publisher. get-deps stays first in the prerequisite list so the
# dependency headers and libraries exist before zpub.o is compiled.
# $(LIBS) must follow the object file: the linker resolves symbols left to
# right, so libraries named before zpub.o can be dropped (e.g. with
# --as-needed or static archives), leaving undefined references.
zpub: get-deps zpub.o
	@$(CC) $(LDFLAGS) -o zpub zpub.o $(LIBS)
# Build both peers. `all` is not the first rule in this Makefile, so it is
# named explicitly as the default goal; otherwise a bare `make` would only
# build the first target (zpub.o).
.DEFAULT_GOAL := all
all: zsub zpub
# None of these targets produce a file of the same name.
.PHONY: all clean purge get-deps
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <zmq.h>
/*
 * zpub: ZeroMQ publisher used to demonstrate the "slow joiner" effect.
 *
 * Usage: zpub <zeromq-pub-url> <channel> <delay-us> <count>
 *
 * Binds a PUB socket to <zeromq-pub-url> and publishes <count> two-frame
 * messages: the first frame is <channel>, the second is
 * "<n> bottles of beer on the wall" with n counting up from 0. The process
 * sleeps <delay-us> microseconds after each message.
 *
 * Returns 0 on success, 1 on bad arguments or on any ZeroMQ failure.
 */
int main (int argc, char **argv)
{
    if (argc != 5)
    {
        fprintf(stderr, "Usage:\n\n %s <zeromq-pub-url> <channel> <delay-us> <count>", argv[0]);
        fprintf(stderr, "\n\n example: $ %s tcp://*:4001 MyChannel 10000 1000\n", argv[0]);
        fflush(stderr);
        return 1;
    }

    const char *zurl = argv[1];
    const char *channel_name = argv[2];

    /* strtol instead of atoi so that garbage and negative delays are rejected
     * rather than silently turned into 0 or a huge unsigned sleep. */
    char *end = NULL;
    errno = 0;
    const long delay_arg = strtol(argv[3], &end, 10);
    if (errno != 0 || end == argv[3] || *end != '\0' || delay_arg < 0 || delay_arg > INT_MAX)
    {
        fprintf(stderr, "Invalid <delay-us>: %s\n", argv[3]);
        return 1;
    }
    const int delay_us = (int) delay_arg;

    /* strtoul accepts a leading '-' and negates the result, so reject it
     * explicitly along with trailing junk and out-of-range values. */
    errno = 0;
    const unsigned long int count = strtoul(argv[4], &end, 10);
    if (errno != 0 || end == argv[4] || *end != '\0' || strchr(argv[4], '-') != NULL)
    {
        fprintf(stderr, "Invalid <count>: %s\n", argv[4]);
        return 1;
    }

    printf("zurl: %s\n", zurl);
    printf("channel: %s\n", channel_name);
    printf("delay_us: %i\n", delay_us);
    printf("count: %lu\n", count);

    // Prepare our context and publisher
    void *context = zmq_ctx_new ();
    if (context == NULL)
    {
        fprintf(stderr, "zmq_ctx_new failed: %s\n", zmq_strerror(zmq_errno()));
        return 1;
    }

    void *publisher = zmq_socket (context, ZMQ_PUB);
    if (publisher == NULL)
    {
        fprintf(stderr, "zmq_socket failed: %s\n", zmq_strerror(zmq_errno()));
        zmq_ctx_destroy (context);
        return 1;
    }

    if (zmq_bind (publisher, zurl) != 0)
    {
        fprintf(stderr, "zmq_bind(%s) failed: %s\n", zurl, zmq_strerror(zmq_errno()));
        zmq_close (publisher);
        zmq_ctx_destroy (context);
        return 1;
    }
    printf("Listening on %s\n", zurl);

    int rc = 0;
    unsigned long int tx = 0;
    const size_t channel_len = strlen(channel_name);

    /* A counted for-loop sends exactly <count> messages; a count of 0 sends
     * nothing instead of underflowing the unsigned counter. */
    for (unsigned long int current = 0; current < count; current++)
    {
        char buf[256];
        snprintf(buf, sizeof buf, "%lu bottles of beer on the wall", current);

        /* Frame 1: channel name (subscription prefix); frame 2: payload. */
        const int head = zmq_send(publisher, channel_name, channel_len, ZMQ_SNDMORE);
        const int body = (head == -1) ? -1 : zmq_send(publisher, buf, strlen(buf), 0);
        if (head == -1 || body == -1)
        {
            /* Do not fold the -1 error sentinel into the byte total. */
            fprintf(stderr, "zmq_send failed: %s\n", zmq_strerror(zmq_errno()));
            rc = 1;
            break;
        }

        usleep(delay_us);
        tx += (unsigned long int) head + (unsigned long int) body;
    }

    printf("Total bytes sent: %lu", tx);

    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return rc;
}
#include <limits.h>
#include <stdlib.h>
#include <zmq.h>
/* Value passed for the subscriber's ZMQ_RCVHWM socket option in main().
 * libzmq documents 0 as "no limit" on queued inbound messages.
 * static: only used within this translation unit. */
static const int rcvhwm = 0;
/*
 * zsub: ZeroMQ subscriber used to demonstrate the "slow joiner" effect.
 *
 * Usage: zsub <zeromq-pub-url> <channel>
 *
 * Connects a SUB socket to <zeromq-pub-url>, subscribes to messages whose
 * first frame starts with <channel>, and then reads two-frame messages
 * (channel frame, data frame) until a receive fails. For every message it
 * prints the channel, the data and the smallest leading number seen so far in
 * any data frame.
 *
 * Returns 0 after the receive loop ends, 1 on bad arguments or when the
 * socket cannot be set up.
 */
int main(int argc, char **argv)
{
    if (argc != 3)
    {
        fprintf(stderr, "Usage:\n\n %s <zeromq-pub-url> <channel>", argv[0]);
        fprintf(stderr, "\n\n example: $ %s tcp://localhost:4001 MyChannel\n", argv[0]);
        fflush(stderr);
        return 1;
    }

    /* Declared before first use (they were previously printed before being
     * declared, which does not compile). */
    const char *zurl = argv[1];
    const char *channel_name = argv[2];

    printf("zurl: %s\n", zurl);
    printf("channel: %s\n", channel_name);

    unsigned long int min = ULONG_MAX;

    printf("Connecting to %s\n", zurl);
    void *context = zmq_ctx_new ();
    if (context == NULL)
    {
        fprintf(stderr, "zmq_ctx_new failed: %s\n", zmq_strerror(zmq_errno()));
        return 1;
    }

    void *subscriber = zmq_socket (context, ZMQ_SUB);
    if (subscriber == NULL)
    {
        fprintf(stderr, "zmq_socket failed: %s\n", zmq_strerror(zmq_errno()));
        zmq_ctx_destroy (context);
        return 1;
    }

    /* The subscription filter is the channel name's bytes and their length.
     * Passing the address of the pointer with length 0 would subscribe to
     * every message regardless of channel. */
    if (zmq_setsockopt (subscriber, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0
        || zmq_connect (subscriber, zurl) != 0
        || zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, channel_name, strlen(channel_name)) != 0)
    {
        fprintf(stderr, "Subscriber setup for %s failed: %s\n", zurl, zmq_strerror(zmq_errno()));
        zmq_close (subscriber);
        zmq_ctx_destroy (context);
        return 1;
    }

    while (1) {
        /* Frame 1: channel name. zmq_recv returns the full frame length even
         * when it truncated the copy, so clamp before NUL-terminating. */
        char channel[256];
        int size = zmq_recv(subscriber, channel, sizeof channel - 1, 0);
        if (size == -1)
        {
            break;
        }
        if (size > (int) sizeof channel - 1)
        {
            size = (int) sizeof channel - 1;
        }
        channel[size] = 0;

        /* Frame 2: payload text. */
        char data[1024];
        size = zmq_recv(subscriber, data, sizeof data - 1, 0);
        if (size == -1)
        {
            break;
        }
        if (size > (int) sizeof data - 1)
        {
            size = (int) sizeof data - 1;
        }
        data[size] = 0;

        /* The payload starts with a decimal number; track the smallest one
         * seen so far. */
        unsigned long int as_num = strtoul(data, NULL, 10);
        if (as_num < min)
        {
            min = as_num;
        }
        printf ("[%s] %s (%lu)\n", channel, data, min);
    }

    // A receive failed (e.g. the ZeroMQ context was terminated); clean up.
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}
@darach
Copy link
Author

darach commented Aug 19, 2014

Sample usage once the 0mq peers are built:

Run a number of subscribers:

$ DYLD_LIBRARY_PATH=deps/libzmq/src/.libs:deps/czmq/src/.libs ./zsub "tcp://localhost:5001" Channel

Run a publisher:

$ DYLD_LIBRARY_PATH=deps/libzmq/src/.libs:deps/czmq/src/.libs ./zpub "tcp://*:5001" Channel 1000 1000000

Related content from the ZeroMQ guide: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver

The guide suggests:

  • Reversing bind/connect in pub/sub may help
    Observation: Messages may still be lost. As in the previous case, the higher the message rate, the more messages are lost (in proportion to the rate).
  • Synchronized pub/sub.
    Observation: This introduces a synchronization protocol using the REQ-REP pattern. It may be better to use messaging with guaranteed delivery instead. The guide's example, for instance, requires a priori knowledge of the number of subscribers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment