Skip to content

Instantly share code, notes, and snippets.

@macieksk
Created March 16, 2011 03:52
Show Gist options
  • Save macieksk/871985 to your computer and use it in GitHub Desktop.
Save macieksk/871985 to your computer and use it in GitHub Desktop.
Parallel list processing in web2py with multiprocessing, where each worker process gets its own database access, plus a usage example (keywords: multiprocessing, threading, pool, db, web2py).
#import threading
from multiprocessing import Process,Queue,Pool
from gluon.shell import exec_environment
def parallelListDB(fun, lst, nthreads=2, request=None,
                   timeout=10, aggregate=lambda lst: it.chain(*lst)):
    """Apply ``fun`` to contiguous slices of ``lst`` in parallel worker processes.

    ``lst`` is split into at most ``nthreads`` contiguous, near-equal slices;
    the first ``len(lst) % nthreads`` slices get one extra element.  Each
    non-empty slice is processed by its own ``multiprocessing.Process``.  That
    process builds a fresh web2py model environment from
    ``applications/welcome/models/db.py`` and calls ``fun(i, ee.db, sublst)``.
    Each return value is sent back through a ``multiprocessing.Queue``.

    The worker target is a nested function.  This relies on the ``fork``
    start method (the POSIX default on Python 2); a nested function cannot be
    pickled for ``spawn``.

    :param fun: callable ``fun(i, db, sublst)``.  ``i`` is the slice index,
        ``db`` is the ``db`` object of the per-process model environment, and
        ``sublst`` is the slice.  Its return value is put on a
        multiprocessing queue, so it must be picklable.
    :param lst: sliceable sequence of work items.
    :param nthreads: maximum number of worker processes.  It is converted with
        ``int``, and values below 1 are treated as 1.
    :param request: the web2py ``request`` passed to ``exec_environment``;
        required.
    :param timeout: seconds to block on each queue read before re-checking
        whether any worker is still alive.
    :param aggregate: callable applied to the list of collected results.
        Results arrive in completion order, not slice order.  The default
        chains them with ``it.chain``.  NOTE(review): ``it`` is presumably
        ``itertools`` imported at module level; it is not visible here.
    :returns: ``aggregate(res)``, where ``res`` holds the worker results.  If
        a worker exits without reporting (e.g. ``fun`` raised in the child),
        its result is missing.  This is logged, not raised.
    :raises ValueError: if ``request`` is None.
    """
    if request is None:
        raise ValueError("parallelListDB requires a web2py request")
    try:
        from queue import Empty  # Python 3
    except ImportError:
        from Queue import Empty  # Python 2
    q = Queue()

    def procf(i, sublst):
        # Runs in the child process.
        imidlog.log("started procf %d with len(sublst)=%d" % (i, len(sublst)))
        # Each child builds its own model environment (and therefore its own
        # db object) instead of reusing the parent's across the fork.
        ee = exec_environment('applications/welcome/models/db.py',
                              request=request)
        res = fun(i, ee.db, sublst)
        q.put(res)
        imidlog.log("procf %d finished" % i)

    def assignSubLst(n, nthr):
        """Return ``nthr`` contiguous (start, stop) pairs that cover range(n)."""
        a, r = divmod(n, nthr)  # integer bounds on both Python 2 and 3
        bounds = []
        start = 0
        for k in range(nthr):
            stop = start + a + (1 if k < r else 0)
            bounds.append((start, stop))
            start = stop
        return bounds

    thrs = []
    for i, (fr, to) in enumerate(assignSubLst(len(lst), max(1, int(nthreads)))):
        imidlog.log("assigning sublist %d:%d" % (fr, to))
        sublst = lst[fr:to]
        if len(sublst) > 0:
            mt = Process(target=procf, args=(i, sublst))
            mt.start()
            thrs.append(mt)

    # Drain the queue BEFORE joining.  A child that has put data waits on
    # exit until that data is written to the pipe, so joining first can hang.
    res = []
    while len(res) < len(thrs):
        # Liveness is sampled before reading.  If every worker had already
        # exited, whatever it queued was flushed to the pipe before exit, so a
        # non-blocking get() either returns it or proves nothing is left.
        anyAlive = any(t.is_alive() for t in thrs)
        try:
            res.append(q.get(block=anyAlive, timeout=timeout))
        except Empty:
            if not anyAlive:
                break
            imidlog.log("got=%d/%d alives=%d"
                        % (len(res), len(thrs),
                           sum(t.is_alive() for t in thrs)))
    if len(res) < len(thrs):
        imidlog.log("parallelListDB: only %d of %d workers returned a result"
                    % (len(res), len(thrs)))
    # Reap the children so they do not linger as zombie processes.
    for t in thrs:
        t.join(timeout)
    return aggregate(res)
def giveL2rs(db, request, patExps, probeData, chip_versions):
    """Collect the probe l2r values of every patient experiment in parallel.

    The work is fanned out with ``parallelListDB``.  Each worker uses its own
    ``db`` object, supplied by ``parallelListDB``, and reads the
    ``probe_experiment.l2r`` column of one ``acgh_experiment``, ordered by
    ``probe_id``.

    The worker results are re-ordered by slice index and chained into
    ``seqofseq``.  ``seqofseq`` is an iterator of
    ``(l2rs, acgh_experiment id, chip_version id)`` tuples in the order of
    ``patExps``.  Experiments whose ``auth_user.email`` is the hard-coded
    address below are skipped.

    NOTE(review): ``probeData`` and ``chip_versions`` are not used in the
    visible part.  The function body continues past the ``#[...]`` marker,
    which the original author elided.
    """
    nexp = len(patExps)

    def tupleForExpId(db, cnt, patexp):
        """Return ``(l2rs, expId, chip_version id)`` for one patient experiment.

        ``db`` is the calling worker's own db object, which deliberately
        shadows the outer ``db``.  ``cnt`` is the experiment's index in
        ``patExps`` and is only used for progress logging.
        """
        expId = patexp.acgh_experiment.id
        # executesql returns one row tuple per record.  Only l2r is selected,
        # so each row is a 1-tuple; unpack it with ``(v,)``.  This is
        # portable, unlike the Python-2-only ``lambda (v,): v``.
        l2rs = [v for (v,) in db.executesql(
            db(db.probe_experiment.acgh_exp_id == expId)._select(
                db.probe_experiment.l2r,
                orderby=db.probe_experiment.probe_id))]
        imidlog.log("processing acgh_exp %d/%d" % (cnt, nexp))
        return (l2rs, expId, patexp.chip_version.id)

    seqofseq = parallelListDB(
        # Each worker tags its result with its slice index ``i``.
        # NOTE(review): experiments owned by this hard-coded account are
        # excluded; consider turning the address into a parameter.
        lambda i, db, sublst: (i, [tupleForExpId(db, cnt, pe)
                                   for cnt, pe in sublst
                                   if pe.auth_user.email != "macieksk@gmail.com"]),
        list(enumerate(patExps)),
        nthreads=Config.maxConcurrentJobs,
        request=request,
        # Sorting by slice index restores submission order, because the
        # slices are contiguous and numbered in order.  Drop this aggregate
        # if the order of the results does not matter.
        aggregate=lambda res: it.chain(*(lst for i, lst in
                                         sorted(res, key=lambda r: r[0])))
    )
    #[...]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment