Created
March 26, 2013 16:45
-
-
Save quentusrex/5246965 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <hiredis.h> | |
#include <pthread.h> | |
#include <semaphore.h> | |
#define NUM_THREADS 50 | |
// Globals | |
char *server = "192.168.100.219"; | |
int port = 6379, ready = 0; | |
pthread_mutex_t subscriber_block; | |
void print_reply(redisReply *r) { | |
int x = 0; | |
switch(r->type) { | |
case REDIS_REPLY_STATUS: | |
printf("Reply(status): %s\n", r->str); | |
break; | |
case REDIS_REPLY_ERROR: | |
printf("Reply(error): %s\n", r->str); | |
break; | |
case REDIS_REPLY_INTEGER: | |
printf("Reply(int): %lld\n", r->integer); | |
break; | |
case REDIS_REPLY_STRING: | |
printf("Reply(str): %s\n", r->str); | |
break; | |
case REDIS_REPLY_ARRAY: | |
printf("Reply(arra):\n"); | |
for( x = 0; x < r->elements; x++) { | |
print_reply(r->element[x]); | |
} | |
break; | |
default: | |
printf("Unknown reply type\n"); | |
} | |
} | |
void *subscriber(void *data) { | |
redisContext *c = NULL; | |
redisReply *r = NULL; | |
int x = 0; | |
printf("Thread %d\n", (int) data); | |
c = redisConnect(server, port); | |
if ( c != NULL && c->err) { | |
printf("Error: %s\n", c->errstr); | |
goto err; | |
} | |
r = redisCommand(c, "SUBSCRIBE FOO"); | |
freeReplyObject(r); | |
pthread_mutex_lock(&subscriber_block); | |
ready += 1; | |
pthread_mutex_unlock(&subscriber_block); | |
while( redisGetReply(c, (void **)&r) == REDIS_OK ) { | |
if( strcmp(r->element[2]->str, "test") == 0 ) { | |
// printf("Got the test command\n"); | |
} else if( strcmp(r->element[2]->str, "quit") == 0 ) { | |
printf("Got the quit command\n"); | |
goto err; | |
} else { | |
printf("NOT THE QUIT\n"); | |
} | |
freeReplyObject(r); | |
} | |
printf("Error\n"); | |
err: | |
printf("Thread %d exiting\n", (int) data); | |
freeReplyObject(r); | |
redisFree(c); | |
pthread_exit(NULL); | |
} | |
int main() { | |
redisContext *c = NULL, *cp = NULL; | |
redisReply *r = NULL; | |
pthread_t threads[NUM_THREADS]; | |
int rc; | |
long t; | |
pthread_mutex_init(&subscriber_block, NULL); | |
c = redisConnect(server, port); | |
if ( c != NULL && c->err) { | |
printf("Error: %s\n", c->errstr); | |
goto err; | |
} | |
for ( t = 0; t < NUM_THREADS; t++ ) { | |
rc = pthread_create(&threads[t], NULL, subscriber, (void *) t); | |
if ( rc) { | |
printf("Error creating thread\n"); | |
return 1; | |
} | |
// sleep(1); | |
} | |
while ( pthread_mutex_lock(&subscriber_block) && ready < NUM_THREADS ) { | |
pthread_mutex_unlock(&subscriber_block); | |
} | |
sleep(1); | |
printf("Main is unblocked now\n"); | |
r = redisCommand(c, "PUBLISH FOO %s", "test"); | |
print_reply(r); | |
freeReplyObject(r); | |
sleep(5); | |
r = redisCommand(c, "PUBLISH FOO %s", "quit"); | |
print_reply(r); | |
freeReplyObject(r); | |
r = NULL; | |
redisFree(c); | |
c = NULL; | |
for ( t = 0; t < NUM_THREADS; t++ ){ | |
void *status; | |
printf("Joining %d\n", t); | |
pthread_join(threads[t], &status); | |
} | |
return 0; | |
err: | |
if ( r ) { | |
freeReplyObject(r); | |
r = NULL; | |
} | |
if ( c ) { | |
redisFree(c); | |
c = NULL; | |
} | |
return 1; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment