Skip to content

Instantly share code, notes, and snippets.

@kmatt
Last active Jan 11, 2019
Embed
What would you like to do?
Example file chunker and Redis loader
import multiprocessing, os, sys, time
from itertools import izip
import redis
rr = None  # Per-process Redis client, created lazily by process_chunk().


def process_chunk(chunk):
    """Aggregate one chunk of CSV lines into Redis hash counters.

    Each line is split on ',' and paired with the module-level ``cols``
    list to form a column -> value mapping. For every name in the
    module-level ``fields`` list, the hash ``"<hhmi>:<field>"`` has its
    entry ``sample[field]`` incremented by the integer value of the line's
    ``data`` column. All increments for the chunk are queued on one
    non-transactional pipeline and sent with a single ``execute()``.

    NOTE(review): ``cols`` and ``fields`` are assigned under the
    ``__main__`` guard, so worker processes only see them when they
    inherit the parent's globals (fork start method) -- confirm for the
    target platform.

    Args:
        chunk: iterable of raw CSV line strings.

    Returns:
        The number of lines processed from ``chunk``.

    Raises:
        KeyError: if a line has no ``hhmi`` / ``data`` column or lacks one
            of the names in ``fields``.
        ValueError: if the ``data`` column is not an integer.
    """
    global rr
    if rr is None:
        rr = redis.Redis()  # Create one connection per process
        print('PID %d' % os.getpid())

    pl = rr.pipeline(transaction=False)
    lc = 0
    for line in chunk:
        # Strip the line terminator first so the last column's value does
        # not carry a trailing newline into the Redis hash field name.
        values = line.rstrip('\r\n').split(',')
        sample = dict(zip(cols, values))
        key = "%s" % sample['hhmi']
        bps = int(sample['data'])
        for f in fields:
            pl.hincrby("%s:%s" % (key, f), sample[f], bps)
        lc += 1

    # One round trip per chunk instead of one per increment.
    pl.execute()
    return lc
def grouper(n, iterable):
    """Yield successive tuples of up to ``n`` items taken from ``iterable``.

    Every chunk has exactly ``n`` items except possibly the last one, which
    holds whatever remains. (Zipping ``n`` references to one iterator stops
    at the first incomplete group, which silently discarded up to ``n - 1``
    trailing items.)

    Args:
        n: chunk size; expected to be a positive integer.
        iterable: any iterable of items to split.

    Yields:
        Tuples of consecutive items, in input order.
    """
    # Local import: the file's header only brings in ``izip``.
    from itertools import islice

    source = iter(iterable)
    while True:
        chunk = tuple(islice(source, n))
        if not chunk:
            return
        yield chunk
if __name__ == '__main__':
    # Script entry point: split 0000.csv into 100-line chunks, fan them out
    # to a pool of 6 worker processes running process_chunk(), and report
    # the overall throughput in lines per second.

    # TODO: fill in the comma-separated list of all columns in the data
    # set / header.
    cols = ''
    cols = cols.split(',')
    # TODO: fill in the comma-separated list of fields to parse.
    fields = ''
    fields = fields.split(',')

    # The pool is created after cols/fields are assigned and before the
    # file is read, matching the original ordering.
    pool = multiprocessing.Pool(6)
    try:
        # Close the input file as soon as its lines have been chunked.
        with open('0000.csv', 'rU') as csv_file:
            jobs = list(grouper(100, csv_file.readlines()))
        print('%d chunks' % len(jobs))

        stime = time.time()
        rc = pool.map(process_chunk, jobs)
        rtime = (time.time() - stime) + .001  # Avoid div/0
    finally:
        # Always release the worker processes, even if a chunk failed.
        pool.close()
        pool.join()

    lc = sum(rc)
    print("END: %d/s" % int(lc / rtime))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment