Created
March 16, 2011 03:52
-
-
Save macieksk/871985 to your computer and use it in GitHub Desktop.
multiprocessing, threading, pool, with access to db, web2py, parallel list process with example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#import threading
from multiprocessing import Process, Queue, Pool
try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2

from gluon.shell import exec_environment
def _sublist_bounds(n, parts):
    """Split ``range(n)`` into ``parts`` contiguous ``(start, stop)`` slices.

    Slice lengths differ by at most one; the first ``n % parts`` slices get
    the extra element.  Slices are empty when ``parts > n``.
    """
    base, extra = divmod(n, parts)
    bounds = []
    start = 0
    for k in range(parts):
        stop = start + base + (1 if k < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds


def parallelListDB(fun, lst, nthreads=2, request=None,
                   timeout=10, aggregate=lambda lst: it.chain(*lst)):
    """Run ``fun`` over slices of ``lst`` in child processes, each with its own db.

    ``lst`` is cut into ``nthreads`` contiguous slices.  Every non-empty slice
    is handed to a child process which builds its own environment with
    ``exec_environment('applications/welcome/models/db.py', request=request)``
    and calls ``fun(i, ee.db, sublst)``, ``i`` being the slice index.  Each
    return value is sent back to the parent through a ``Queue``.

    Args:
        fun: callable ``fun(i, db, sublst)``; its return value must be
            transferable through a ``multiprocessing.Queue``.
        lst: sliceable sequence of work items.
        nthreads: number of slices / child processes (converted with ``int``).
        request: request object forwarded to ``exec_environment``; required.
        timeout: seconds to wait for a single result before re-checking
            whether any child is still alive; also bounds the final ``join``
            of each child.
        aggregate: callable applied to the list of collected results.
            NOTE(review): the default refers to a module-level name ``it``
            (presumably ``itertools``) that is not imported in the visible
            part of this file -- confirm.

    Returns:
        ``aggregate(results)``.  ``results`` holds the workers' return values
        in arrival order, which is not necessarily submission order.  A child
        that exits without delivering a result contributes nothing.

    Raises:
        ValueError: if ``request`` is None or ``nthreads`` is less than 1.

    NOTE(review): the worker is a closure, so this presumably relies on a
    process start method that does not pickle the target (fork) -- confirm.
    """
    # Explicit checks instead of ``assert``: asserts vanish under ``python -O``.
    if request is None:
        raise ValueError("parallelListDB requires a request object")
    nthr = int(nthreads)
    if nthr < 1:
        raise ValueError("nthreads must be at least 1, got %r" % (nthreads,))

    q = Queue()

    def procf(i, sublst):
        imidlog.log("started procf %d with len(sublst)=%d" % (i, len(sublst)))
        ee = exec_environment('applications/welcome/models/db.py',
                              request=request)
        q.put(fun(i, ee.db, sublst))
        imidlog.log("procf %d finished" % i)

    procs = []
    for i, (fr, to) in enumerate(_sublist_bounds(len(lst), nthr)):
        imidlog.log("assigning sublist %d:%d" % (fr, to))
        sublst = lst[fr:to]
        if len(sublst) > 0:
            proc = Process(target=procf, args=(i, sublst))
            proc.start()
            procs.append(proc)

    res = []
    while len(res) < len(procs):
        # Sample liveness BEFORE reading: if nobody was alive and the queue
        # is still empty afterwards, no further result can ever arrive.
        alive = sum(p.is_alive() for p in procs)
        imidlog.log("collected=%d/%d alives=%d" % (len(res), len(procs), alive))
        try:
            if alive:
                item = q.get(True, timeout)
            else:
                # Only drain what is already queued; do not wait.
                item = q.get(False)
        except Empty:
            if not alive:
                break
            continue
        res.append(item)

    if len(res) < len(procs):
        imidlog.log("only %d of %d workers returned a result"
                    % (len(res), len(procs)))

    # Reap the children; the queue is drained, so a join cannot block on an
    # unflushed result.  Bounded by ``timeout`` in case a child hangs on exit.
    for proc in procs:
        proc.join(timeout)

    return aggregate(res)
def giveL2rs(db,request,patExps,probeData,chip_versions):
    """Load the l2r values of every experiment in ``patExps`` in parallel.

    The experiments are enumerated and distributed over
    ``Config.maxConcurrentJobs`` workers through ``parallelListDB``.  Each
    kept experiment yields a tuple ``(l2rs, experiment id, chip version id)``;
    the per-worker lists are chained back together in submission order and
    bound to ``seqofseq``.

    The remainder of this function is elided in the source (``#[...]``).
    """
    total = len(patExps)

    def load_experiment(worker_db, position, pat_exp):
        # One experiment -> (l2r values ordered by probe_id, exp id, chip id).
        exp_id = pat_exp.acgh_experiment.id
        rows_for_exp = worker_db(
            worker_db.probe_experiment.acgh_exp_id == exp_id)
        sql = rows_for_exp._select(
            worker_db.probe_experiment.l2r,
            orderby=worker_db.probe_experiment.probe_id)
        # Each result row is a 1-tuple; unwrap it to the bare value.
        l2r_values = [value for (value,) in worker_db.executesql(sql)]
        imidlog.log("processing acgh_exp %d/%d"%(position,total))
        return (l2r_values, exp_id, pat_exp.chip_version.id)

    def process_chunk(i, worker_db, sublst):
        # Tag the chunk with its index so the parent can restore the order.
        kept = [load_experiment(worker_db, position, pat_exp)
                for position, pat_exp in sublst
                if pat_exp.auth_user.email!="macieksk@gmail.com"]
        return (i, kept)

    def in_submission_order(res):
        # Only needed because the results must come back in the same order
        # in which they were submitted.
        ordered = sorted(res, key=lambda tagged: tagged[0])
        return it.chain(*[chunk for _, chunk in ordered])

    seqofseq = parallelListDB(
        process_chunk,
        list(enumerate(patExps)),
        nthreads=Config.maxConcurrentJobs,
        request=request,
        aggregate=in_submission_order
    )
    #[...]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment