Skip to content

Instantly share code, notes, and snippets.

@johnfink8
Created April 9, 2012 01:48
Show Gist options
  • Save johnfink8/2340771 to your computer and use it in GitHub Desktop.
Save johnfink8/2340771 to your computer and use it in GitHub Desktop.
Process a list of inputs with a given function, using multiple parallel processes.
'''
ItemProcessor
Written by: John Fink
Takes a list of objects, a function, and optional extra arguments for that
function, then calls the function (with those extra arguments) on each
element of the input list in parallel worker processes. Results can be
retrieved with .result() as a list of (input, return value) tuples; by
default .result() first blocks until the entire list has been processed.
Results appear in completion order, which need not match input order.

Sample usage:

    tasks = [1, 2, 3, 4, 5, 6]

    def test(num, val):
        return num > val

    print(itemprocessor.ItemProcessor(tasks).start(test, 3).result())
    # [(1, False), (2, False), (3, False), (4, True), (5, True), (6, True)]
    # (possibly in a different order)
'''
import multiprocessing
def _item_worker(tasks, results, failures, func, args, kwargs):
    """Worker-process loop: drain *tasks*, recording the outcome of each one.

    Runs in a child process. It pulls tasks without blocking until the shared
    queue is empty and calls ``func(task, *args, **kwargs)`` on each one. On
    success it appends ``(task, return_value)`` to *results*; if the call (or
    storing its result) raises, it appends ``(task, traceback_text)`` to
    *failures* instead.

    This is a module-level function rather than a bound method, so the
    "spawn" start method can pickle it by reference. A bound method would
    force the whole ItemProcessor, including its Manager, to be pickled.
    """
    import traceback
    try:
        from queue import Empty
    except ImportError:  # Python 2
        from Queue import Empty

    while True:
        try:
            task = tasks.get_nowait()
        except Empty:
            return  # Queue drained: nothing left for this worker to do.
        try:
            results.append((task, func(task, *args, **kwargs)))
        except Exception:
            # Record the failure and keep going. A dead worker would leave
            # this task unacknowledged and make join() block forever.
            failures.append((task, traceback.format_exc()))
        finally:
            # Acknowledge every task exactly once, on success or failure, so
            # tasks.join() in the parent process can return.
            tasks.task_done()


class ItemProcessor(object):
    """Apply a function to every item of a list using parallel worker processes.

    Items are placed on a Manager-backed queue. Worker processes pull from it
    and append ``(item, func(item, *args, **kwargs))`` tuples to a shared list.
    Items and return values pass through the Manager, so they must be
    picklable.
    """

    def __init__(self, task_list, process_count=10):
        """Queue up every item of *task_list* for processing.

        :param task_list: any iterable of items; it is consumed immediately.
        :param process_count: maximum number of worker processes that
            start() will spawn.
        """
        self.m = multiprocessing.Manager()
        self.tasks = self.m.Queue()
        self.results = self.m.list()
        # (item, traceback text) for every call that raised; see errors().
        self.failures = self.m.list()
        self.process_count = process_count
        self._processes = []
        self._task_count = 0
        for task in task_list:
            self.tasks.put(task)
            self._task_count += 1

    def join(self):
        """Block until every queued item has been processed, then reap workers.

        If items are queued but start() was never called, this blocks
        indefinitely, because nothing consumes the queue.
        """
        self.tasks.join()
        # Once the queue is empty, every worker's next get_nowait() raises
        # Empty and the worker returns, so these joins do not hang. Joining
        # reaps the children instead of leaving them as zombies.
        for proc in self._processes:
            proc.join()

    def start(self, func, *args, **kwargs):
        """Spawn worker processes that call ``func(item, *args, **kwargs)``.

        At most ``process_count`` workers are started, and never more than
        the number of queued items (extra workers would exit immediately).

        :returns: self, so calls can be chained: ``.start(f, x).result()``.
        """
        worker_args = (self.tasks, self.results, self.failures,
                       func, args, kwargs)
        for _ in range(min(self.process_count, self._task_count)):
            proc = multiprocessing.Process(target=_item_worker,
                                           args=worker_args)
            proc.start()
            self._processes.append(proc)
        return self

    def result(self, join=True):
        """Return the collected ``(item, return_value)`` tuples.

        :param join: if true (the default), first block until all items are
            processed; otherwise return whatever has finished so far.
        :returns: a new list in completion order, which need not match input
            order. Items whose call raised are omitted; see errors().
        """
        if join:
            self.join()
        return list(self.results)

    def errors(self):
        """Return ``(item, traceback_text)`` for every call that raised so far."""
        return list(self.failures)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment