Skip to content

Instantly share code, notes, and snippets.

@dspezia
Created November 26, 2012 18:26
Show Gist options
  • Save dspezia/4149768 to your computer and use it in GitHub Desktop.
Save dspezia/4149768 to your computer and use it in GitHub Desktop.
Example of Redis zset polling daemon
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include "hiredis.h"
#include "async.h"
#include "adapters/ae.h"
#include "sha1.h"
struct Singleton {
int n;
int *port;
redisAsyncContext **servers;
aeEventLoop *loop;
char luasha1[48];
} singleton;
const char *LuaCmd =
"local res = redis.call('ZRANGEBYSCORE',KEYS[1], 0, ARGV[1], 'LIMIT', 0, 10 ) "
"if #res > 0 then "
" redis.call( 'ZREMRANGEBYRANK', KEYS[1], 0, #res-1 ) "
" return res "
"else "
" return false "
"end ";
void sha1hex(char *digest, const char *script, size_t len) {
SHA1_CTX ctx;
unsigned char hash[20];
char *cset = "0123456789abcdef";
int j;
SHA1Init(&ctx);
SHA1Update(&ctx,(unsigned char*)script,len);
SHA1Final(hash,&ctx);
for (j = 0; j < 20; j++) {
digest[j*2] = cset[((hash[j]&0xF0)>>4)];
digest[j*2+1] = cset[(hash[j]&0xF)];
}
digest[40] = '\0';
}
void dequeuedItem(redisAsyncContext *c, void *r, void *privdata) {
int i;
redisReply *reply = r;
if (reply == NULL) return;
switch( reply->type ) {
case REDIS_REPLY_ARRAY:
for ( i=0; i<reply->elements; ++i ) {
printf("Expired: %s\n", reply->element[i]->str );
redisAsyncCommand( c, NULL, NULL, "DEL %s", reply->element[i]->str );
}
if ( i>0 )
redisAsyncCommand( c, dequeuedItem, NULL, "EVALSHA %s 1 to_be_expired %ld", singleton.luasha1, time(NULL) );
break;
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
printf("ERror: %s\n",reply->str );
break;
case REDIS_REPLY_NIL:
break;
default:
printf("Error\n");
break;
}
}
int mainLoop( struct aeEventLoop *loop, long long id, void *clientData) {
time_t t = time(NULL);
for ( int i=0; i<singleton.n; ++i ) {
if ( singleton.servers[i] != NULL ) {
redisAsyncCommand( singleton.servers[i], dequeuedItem, NULL, "EVALSHA %s 1 to_be_expired %ld", singleton.luasha1, t );
}
}
fflush(stdout);
return 1000+rand()%1000;
}
void connectCallback(const redisAsyncContext *c, int status) {
if ( status != REDIS_OK )
{
printf("Error: %s\n", c->errstr);
for (int i=0; i<singleton.n; ++i )
if ( singleton.servers[i] == c )
singleton.servers[i] = NULL;
}
else
printf("connected...\n");
}
void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
}
printf("disconnected...\n");
for (int i=0; i<singleton.n; ++i )
if ( singleton.servers[i] == c )
singleton.servers[i] = NULL;
}
void checkConnections()
{
for ( int i=0; i<singleton.n; ++i ) {
if ( singleton.servers[i] == NULL ) {
printf("Connecting %d...\n",singleton.port[i] );
singleton.servers[i] = redisAsyncConnect("127.0.0.1", singleton.port[i] );
if ( singleton.servers[i]->err ) { perror("redisAsyncConnect"); exit( -1 ); }
redisAeAttach( singleton.loop, singleton.servers[i] );
redisAsyncSetConnectCallback( singleton.servers[i],connectCallback);
redisAsyncSetDisconnectCallback( singleton.servers[i],disconnectCallback);
redisAsyncCommand( singleton.servers[i], NULL, NULL, "SCRIPT LOAD %s", LuaCmd );
}
}
}
int reconnectIfNeeded( struct aeEventLoop *loop, long long id, void *clientData) {
checkConnections();
return 1000;
}
int main ( int argc, char *argv[] ) {
srand(time(NULL));
signal(SIGPIPE, SIG_IGN);
memset( &singleton, '\0', sizeof(struct Singleton) );
singleton.n = argc - 1;
singleton.servers = (redisAsyncContext **) malloc( singleton.n*sizeof( redisAsyncContext *) );
if ( !singleton.servers ) { perror("malloc"); exit( -1 ); }
singleton.port = (int *) malloc( singleton.n*sizeof(int) );
if ( !singleton.port ) { perror("malloc"); exit( -1 ); }
singleton.loop = aeCreateEventLoop(256);
memset( singleton.servers, '\0', singleton.n*sizeof( redisAsyncContext *) );
memset( singleton.port, '\0', singleton.n*sizeof(int) );
for ( int i=0; i<singleton.n; ++i )
singleton.port[i] = atoi(argv[i+1]);
sha1hex( singleton.luasha1, LuaCmd, strlen(LuaCmd) );
checkConnections();
aeCreateTimeEvent( singleton.loop,5,reconnectIfNeeded, NULL, NULL );
aeCreateTimeEvent( singleton.loop,5,mainLoop, NULL, NULL );
aeMain(singleton.loop);
return 0;
}
#!/usr/bin/env python
import time
for x in range(0,1000):
print "multi"
print 'set c%06d dummydata%d' % ( x,x )
print 'zadd to_be_expired %ld c%06d' % ( time.time()+20,x)
print "exec"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment