Skip to content

Instantly share code, notes, and snippets.

Created January 25, 2011 03:42
Show Gist options
  • Save jdmaturen/794474 to your computer and use it in GitHub Desktop.
Save jdmaturen/794474 to your computer and use it in GitHub Desktop.
Redis stats aggregator w/ Gevent
import gevent
from gevent import monkey
import hashlib
import os
import redis
from collections import defaultdict
from datetime import datetime
from gevent.pool import Pool
from time import time
def run(host):
    """Collect and reset the tracked counters stored on one Redis host.

    Reads the member names of the ``formspring:fisher:tracked_keys`` set and
    replaces each of those keys with 0 via GETSET in a single
    non-transactional pipeline, so every counter is read and reset in one
    step.

    Args:
        host: Host name or address handed to ``redis.Redis``.

    Returns:
        A list of ``(key, previous_value)`` pairs with the values converted
        to ``int``, or ``False`` when the connection fails, the existence
        check does not answer within one second, or the tracking set does
        not exist.
    """
    set_key = 'formspring:fisher:tracked_keys'
    r = redis.Redis(host)
    try:
        # with_timeout hands back timeout_value (False) instead of raising
        # when the check takes longer than one second.
        exists = gevent.with_timeout(1, r.exists, set_key, timeout_value=False)
    except redis.exceptions.ConnectionError:
        exists = False
    if not exists:
        print('Failure: %s' % host)
        return False

    keys = list(r.smembers(set_key))
    pipe = r.pipeline(transaction=False)
    for key in keys:
        pipe.getset(key, 0)
    vals = pipe.execute()
    # GETSET yields None for a key that had no previous value; count it as 0
    # rather than letting int(None) kill the whole host's result.
    return list(zip(keys, [int(val or 0) for val in vals]))
if __name__ == '__main__':
    # Script entry point: poll every host listed in settings.hosts in
    # parallel, sum the per-key counters returned by run(), then write a
    # one-line snapshot to settings.zenoss and append one row per key to
    # settings.aggregate.
    #
    # NOTE(review): `monkey` is imported at the top of the file but no
    # monkey.patch_all() call is visible; without it the redis socket calls
    # would presumably block the gevent loop and the timeouts used here
    # could not fire - confirm against the original script.
    import settings

    hosts = []
    with open(settings.hosts) as f:
        for line in f:
            # NOTE(review): the loop body was missing from the visible
            # source; one host per line is assumed - confirm.
            host = line.strip()
            if host:
                hosts.append(host)

    # not sure what a good max is here ... seems to work fine @ 200
    pool = Pool(min(200, len(hosts)))
    coros = []
    # Timeout(10, False) leaves the block silently after ten seconds, so
    # slow hosts are simply reported as unfinished below.
    with gevent.Timeout(10, False):
        for host in hosts:
            coros.append(pool.spawn(run, host))
        # Give the greenlets a chance to run; without waiting here almost
        # none of the jobs would be ready() when they are inspected.
        pool.join()

    results = defaultdict(int)
    finished = 0
    succ = 0
    for job in coros:
        if not job.ready():
            continue
        finished += 1
        # job.value is False for an unreachable host and None if run() raised.
        if job.value:
            succ += 1
            for k, v in job.value:
                results[k] += v
    print('%d %d %d' % (succ, finished, len(hosts)))

    results = sorted(
        (k.replace('formspring:fisher:', '').replace(' ', '_'), v)
        for k, v in results.items()
    )

    # Write the snapshot to a temporary file next to its destination and
    # rename it into place, so readers never see a half-written file and the
    # rename never has to cross a filesystem boundary.
    suffix = hashlib.md5(str(time()).encode('utf-8')).hexdigest()[:6]
    tmp = os.path.join(os.path.dirname(settings.zenoss), 'zenoss.%s' % suffix)
    with open(tmp, 'w') as f:
        f.write('timestamp:%d ' % time())
        f.write(' '.join(['%s:%d' % (k, v) for k, v in results]))
    os.rename(tmp, settings.zenoss)

    # Skip the append entirely when nothing was collected, otherwise a blank
    # line would be added to the aggregate log on every empty run.
    if results:
        # NOTE(review): the right-hand side of `now =` was truncated in the
        # visible source; datetime.now() is assumed - confirm the intended
        # timestamp format.
        now = datetime.now()
        with open(settings.aggregate, 'a') as f:
            f.write('\n'.join(["%s\t%s\t%s" % (now, k, v) for k, v in results]) + '\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment