public
Created

Example Redis loader

  • Download Gist
gistfile1.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
import multiprocessing, os, sys, time
from itertools import izip
import redis
 
rr = None  # Per-process Redis connection, created lazily by process_chunk()


def process_chunk(chunk):
    """Load one chunk of CSV lines into Redis and return the line count.

    Each line is split on commas and paired with the module-level ``cols``
    list to form a record.  For every name ``f`` in the module-level
    ``fields`` list, the hash ``"<hhmi>:<f>"`` has its field ``record[f]``
    incremented by ``int(record['data'])``.  All increments for the chunk
    are queued on a single non-transactional pipeline and sent together.

    ``cols`` and ``fields`` are read as globals; they must be defined in
    the module before this function runs in a worker.

    Args:
        chunk: iterable of comma-separated text lines.

    Returns:
        Number of lines processed from ``chunk``.

    Raises:
        KeyError: if a record has no ``'hhmi'`` or ``'data'`` column, or is
            missing one of ``fields``.
        ValueError: if the ``'data'`` column is not an integer.
    """
    global rr
    if rr is None:
        rr = redis.Redis()  # Create one connection per process
        print('PID %d' % os.getpid())

    pl = rr.pipeline(transaction=False)
    lc = 0
    for line in chunk:
        # Strip the line terminator first; otherwise the last column's value
        # would carry a trailing newline into the Redis hash field name.
        sample = dict(zip(cols, line.rstrip('\r\n').split(',')))
        key = "%s" % (sample['hhmi'],)
        bps = int(sample['data'])
        for f in fields:
            pl.hincrby("%s:%s" % (key, f), sample[f], bps)
        lc += 1

    pl.execute()
    return lc
 
def grouper(n, iterable):
    """Yield successive tuples of up to ``n`` items taken from ``iterable``.

    Every chunk has exactly ``n`` items except possibly the last, which
    holds whatever remains.  (Zipping ``n`` references to one iterator
    would silently discard that trailing partial chunk.)

    Args:
        n: maximum chunk size; values below 1 produce no chunks.
        iterable: any iterable to split.

    Yields:
        Tuples of consecutive items, in input order.
    """
    from itertools import islice

    if n < 1:
        return
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, n))
        if not chunk:
            return
        yield chunk
if __name__ == '__main__':
    # Script entry point: split 0000.csv into 100-line chunks, fan them out
    # to a pool of 6 worker processes, and report lines processed per second.

    # NOTE: process_chunk() reads `cols` and `fields` as module globals, so
    # both are assigned here before the worker pool is created.
    cols = ''  # TODO: comma-separated list of all columns in the data set / header
    cols = cols.split(',')
    fields = ''  # TODO: comma-separated list of fields to parse
    fields = fields.split(',')

    pool = multiprocessing.Pool(6)
    try:
        # 'rU' = universal-newline text mode (Python 2); the file is closed
        # as soon as all lines have been read into memory.
        with open('0000.csv', 'rU') as csv_file:
            jobs = list(grouper(100, csv_file.readlines()))
        print('%d chunks' % len(jobs))

        stime = time.time()
        rc = pool.map(process_chunk, jobs)
        rtime = (time.time() - stime) + .001  # Avoid div/0
    finally:
        # Release the worker processes even if loading or mapping fails.
        pool.close()
        pool.join()

    lc = sum(rc)  # each worker call returns its processed line count
    print("END: %d/s" % int(lc / rtime))

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.