Skip to content

Instantly share code, notes, and snippets.

@bcg
Created April 7, 2011 14:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bcg/907853 to your computer and use it in GitHub Desktop.
Save bcg/907853 to your computer and use it in GitHub Desktop.
Hipster C + ZeroMQ + Redis MQ
zmqd.c = 45k rps
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include "zapi.h"
/* Messages sent since the last progress line; main prints and zeroes it
 * every 10000 sends. */
int count = 0;
/*
 * Load generator: connects a ZMQ_PUSH socket to tcp://127.0.0.1:15000 and
 * sends single-frame "test" messages in an endless loop, printing the
 * running count to stdout every 10000 messages and then resetting it.
 *
 * Returns 1 if the context, socket or connection cannot be set up.
 */
int main(void) {
    zctx_t *ctx = zctx_new();
    if (ctx == NULL) {
        fprintf(stderr, "Error: cannot create ZeroMQ context\n");
        return 1;
    }

    /* The I/O thread count must be configured before the first socket is
     * created on the context; set afterwards it is ignored. */
    zctx_set_iothreads (ctx, 10);

    void *s = zctx_socket_new (ctx, ZMQ_PUSH);
    if (s == NULL) {
        fprintf(stderr, "Error: cannot create PUSH socket\n");
        zctx_destroy(&ctx);
        return 1;
    }
    if (zmq_connect (s, "tcp://127.0.0.1:15000") != 0) {
        fprintf(stderr, "Error: cannot connect to tcp://127.0.0.1:15000\n");
        zctx_destroy(&ctx);
        return 1;
    }

    while (1) {
        /* zmsg_send takes ownership of the message (and its frame) and
         * nulls the pointer, so nothing is freed here. */
        zmsg_t *msg = zmsg_new ();
        zframe_t *frame = zframe_new ("test", 4);
        zmsg_push (msg, frame);
        zmsg_send (&msg, s);

        count++;
        if ((count % 10000) == 0) {
            printf("%d\n", count);
            count = 0;
        }
    }

    /* Not reached while the loop above has no exit condition; kept so the
     * cleanup is already in place if one is added. */
    zctx_destroy(&ctx);
    return 0;
}
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "zapi.h"
#include "hiredis/hiredis.h"
/* Strings handled since the last report; incremented by on_readable,
 * printed and zeroed by report. */
int count = 0;
/* Strings popped from incoming messages that are waiting to be pushed to
 * Redis; allocated in main with zlist_new. */
zlist_t *redis_buffer = NULL;
/*
 * Timer handler: prints the number of strings counted since the previous
 * call and resets the counter.
 *
 * loop, s, arg: unused.
 * Returns 0 so the reactor keeps running (the original fell off the end of
 * a non-void function, leaving the handler status indeterminate).
 */
int report(zloop_t *loop, void *s, void *arg) {
    (void) loop;
    (void) s;
    (void) arg;

    printf("T:1s - %d\n", count);
    count = 0;
    return 0;
}
/* A flush is triggered once redis_buffer holds more than this many strings. */
enum { REDIS_BATCH_LIMIT = 1000 };

/*
 * Drains redis_buffer into Redis as one pipeline: every buffered string is
 * queued as "LPUSH q <string>" and freed, then one reply per queued command
 * is read back. Reading the replies is what actually writes the pipeline to
 * the server.
 *
 * Returns 0 on success, -1 if a command could not be queued or a reply
 * could not be read (the error text is written to stderr).
 */
static int flush_redis_buffer(redisContext *redis) {
    int queued = 0;
    int rc = 0;

    while (zlist_size(redis_buffer) > 0) {
        char *item = zlist_pop(redis_buffer);
        if (item == NULL) {
            continue;
        }
        if (redisAppendCommand(redis, "LPUSH q %s", item) == REDIS_OK) {
            queued++;
        } else {
            fprintf(stderr, "Error: %s\n", redis->errstr);
            rc = -1;
        }
        /* hiredis copies the formatted command into its own output buffer,
         * so the string is ours to release. */
        free(item);
    }

    for (int k = 0; k < queued; k++) {
        redisReply *reply = NULL;
        if (redisGetReply(redis, (void **) &reply) != REDIS_OK) {
            fprintf(stderr, "Error: %s\n", redis->errstr);
            return -1;
        }
        if (reply != NULL) {
            freeReplyObject(reply);
        }
    }
    return rc;
}

/*
 * Socket handler: receives one message from s, moves each of its frames (as
 * a string) into redis_buffer, and flushes the buffer to Redis whenever it
 * grows past REDIS_BATCH_LIMIT entries. count is incremented once per string.
 *
 * loop:  unused.
 * s:     socket to receive from.
 * redis: connected hiredis context used for the flush.
 *
 * Returns 0 to keep the reactor running, or -1 when no message could be
 * received or the flush to Redis failed.
 */
int on_readable(zloop_t *loop, void *s, redisContext *redis) {
    (void) loop;

    /* zmsg_recv allocates the message itself; no zmsg_new beforehand. */
    zmsg_t *msg = zmsg_recv (s);
    if (msg == NULL) {
        return -1;
    }

    int rc = 0;
    char *str;
    /* Pop until the message is empty rather than counting against a size
     * that shrinks with every pop. */
    while ((str = zmsg_popstr (msg)) != NULL) {
        zlist_append(redis_buffer, str);   /* buffer now owns str */
        count++;
        if (zlist_size(redis_buffer) > (size_t) REDIS_BATCH_LIMIT
            && flush_redis_buffer(redis) != 0) {
            rc = -1;
            break;
        }
    }

    zmsg_destroy (&msg);
    return rc;
}
/*
 * Consumer: connects to Redis at 127.0.0.1:6379, binds a ZMQ_PULL socket on
 * tcp://*:15000, and runs a reactor that calls on_readable for incoming
 * messages and report on a 1000 ms timer.
 *
 * Returns 1 if Redis, the ZeroMQ context/socket, or the bind fails;
 * 0 after the reactor stops and everything has been released.
 */
int main(void) {
    /* Validate the Redis connection before creating anything else.
     * redisConnect may return NULL, so test the pointer before ->err. */
    redisContext *redis = redisConnect("127.0.0.1", 6379);
    if (redis == NULL || redis->err) {
        fprintf(stderr, "Error: %s\n",
                redis ? redis->errstr : "cannot allocate redis context");
        if (redis != NULL) {
            redisFree(redis);
        }
        return 1;
    }

    zctx_t *ctx = zctx_new();
    if (ctx == NULL) {
        fprintf(stderr, "Error: cannot create ZeroMQ context\n");
        redisFree(redis);
        return 1;
    }

    /* The I/O thread count must be configured before the first socket is
     * created on the context; set afterwards it is ignored. */
    zctx_set_iothreads (ctx, 10);

    void *s = zctx_socket_new (ctx, ZMQ_PULL);
    if (s == NULL || zmq_bind (s, "tcp://*:15000") != 0) {
        fprintf(stderr, "Error: cannot bind PULL socket to tcp://*:15000\n");
        zctx_destroy(&ctx);
        redisFree(redis);
        return 1;
    }

    redis_buffer = zlist_new();
    assert (redis_buffer);

    zloop_t *loop = zloop_new ();
    assert (loop);
    zloop_reader (loop, s, on_readable, redis);
    /* 1000 ms period, 100000 repetitions. */
    zloop_timer (loop, 1000, 100000, report, NULL);
    zloop_start (loop);

    /* Release any strings still buffered when the reactor stopped, then the
     * list itself. */
    while (zlist_size(redis_buffer) > 0) {
        free(zlist_pop(redis_buffer));
    }
    zlist_destroy (&redis_buffer);

    redisFree(redis);
    zloop_destroy (&loop);
    assert (loop == NULL);
    zctx_destroy(&ctx);
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment