Skip to content

Instantly share code, notes, and snippets.

@quentusrex
Created March 26, 2013 16:45
Show Gist options
  • Save quentusrex/5246965 to your computer and use it in GitHub Desktop.
Save quentusrex/5246965 to your computer and use it in GitHub Desktop.
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <hiredis.h>
// Number of subscriber threads spawned by main().
#define NUM_THREADS 50

// Globals shared between main() and the subscriber threads.

// Host handed to redisConnect().
char *server = "192.168.100.219";

// TCP port handed to redisConnect().
int port = 6379;

// Counter bumped by subscriber() while holding subscriber_block.
int ready = 0;

// Mutex taken around accesses to `ready`; initialised in main().
pthread_mutex_t subscriber_block;
// Print a human-readable rendering of a hiredis reply to stdout.
//
// Status, error and string replies print their text, integer replies print
// their value, nil replies print a marker, and array replies print a header
// line followed by every element (recursively). Any other reply type prints
// "Unknown reply type".
//
// r may be NULL: a placeholder line is printed instead of dereferencing it,
// so the result of a command can be passed in without a prior check.
void print_reply(redisReply *r) {
    size_t i; // same type as r->elements, avoids a signed/unsigned compare

    if (r == NULL) {
        printf("Reply(null)\n");
        return;
    }

    switch (r->type) {
    case REDIS_REPLY_STATUS:
        printf("Reply(status): %s\n", r->str);
        break;
    case REDIS_REPLY_ERROR:
        printf("Reply(error): %s\n", r->str);
        break;
    case REDIS_REPLY_INTEGER:
        printf("Reply(int): %lld\n", r->integer);
        break;
    case REDIS_REPLY_STRING:
        printf("Reply(str): %s\n", r->str);
        break;
    case REDIS_REPLY_NIL:
        printf("Reply(nil)\n");
        break;
    case REDIS_REPLY_ARRAY:
        printf("Reply(arra):\n");
        for (i = 0; i < r->elements; i++) {
            print_reply(r->element[i]);
        }
        break;
    default:
        printf("Unknown reply type\n");
        break;
    }
}
void *subscriber(void *data) {
redisContext *c = NULL;
redisReply *r = NULL;
int x = 0;
printf("Thread %d\n", (int) data);
c = redisConnect(server, port);
if ( c != NULL && c->err) {
printf("Error: %s\n", c->errstr);
goto err;
}
r = redisCommand(c, "SUBSCRIBE FOO");
freeReplyObject(r);
pthread_mutex_lock(&subscriber_block);
ready += 1;
pthread_mutex_unlock(&subscriber_block);
while( redisGetReply(c, (void **)&r) == REDIS_OK ) {
if( strcmp(r->element[2]->str, "test") == 0 ) {
// printf("Got the test command\n");
} else if( strcmp(r->element[2]->str, "quit") == 0 ) {
printf("Got the quit command\n");
goto err;
} else {
printf("NOT THE QUIT\n");
}
freeReplyObject(r);
}
printf("Error\n");
err:
printf("Thread %d exiting\n", (int) data);
freeReplyObject(r);
redisFree(c);
pthread_exit(NULL);
}
// Start NUM_THREADS subscriber threads, wait until `ready` has reached
// NUM_THREADS, publish "test" and then "quit" on channel FOO, and join all
// threads.
//
// Returns 0 on success. Returns 1 if the Redis connection, a thread
// creation or a PUBLISH command fails; returning from main ends the
// process, including any subscriber threads that are still running.
int main(void) {
    redisContext *c = NULL;
    redisReply *r = NULL;
    pthread_t threads[NUM_THREADS];
    long t;
    int all_ready = 0;

    pthread_mutex_init(&subscriber_block, NULL);

    c = redisConnect(server, port);
    if (c == NULL) {
        printf("Error: cannot allocate redis context\n");
        goto err;
    }
    if (c->err) {
        printf("Error: %s\n", c->errstr);
        goto err;
    }

    for (t = 0; t < NUM_THREADS; t++) {
        // The thread index travels inside the pointer value itself.
        int rc = pthread_create(&threads[t], NULL, subscriber,
                                (void *)(intptr_t) t);
        if (rc) {
            printf("Error creating thread\n");
            goto err;
        }
    }

    // Poll `ready` under the mutex. pthread_mutex_lock() returns 0 on
    // success, so its result must not be used as the loop condition, and
    // the mutex has to be released after every check so the subscriber
    // threads can take it to update the counter. The loop only ends once
    // `ready` has reached NUM_THREADS.
    while (!all_ready) {
        pthread_mutex_lock(&subscriber_block);
        all_ready = (ready >= NUM_THREADS);
        pthread_mutex_unlock(&subscriber_block);
        if (!all_ready) {
            sched_yield();
        }
    }

    sleep(1); // extra grace period before the first publish
    printf("Main is unblocked now\n");

    r = redisCommand(c, "PUBLISH FOO %s", "test");
    if (r == NULL) {
        printf("Error: %s\n", c->errstr);
        goto err;
    }
    print_reply(r);
    freeReplyObject(r);
    r = NULL;

    sleep(5);

    r = redisCommand(c, "PUBLISH FOO %s", "quit");
    if (r == NULL) {
        printf("Error: %s\n", c->errstr);
        goto err;
    }
    print_reply(r);
    freeReplyObject(r);
    r = NULL;

    redisFree(c);
    c = NULL;

    for (t = 0; t < NUM_THREADS; t++) {
        printf("Joining %ld\n", t);
        pthread_join(threads[t], NULL);
    }

    pthread_mutex_destroy(&subscriber_block);
    return 0;

err:
    if (r) {
        freeReplyObject(r);
        r = NULL;
    }
    if (c) {
        redisFree(c);
        c = NULL;
    }
    return 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment