Skip to content

Instantly share code, notes, and snippets.

@jbylund
Created March 9, 2016 21:58
Show Gist options
  • Save jbylund/9b80e88e5b12434feeff to your computer and use it in GitHub Desktop.
Save jbylund/9b80e88e5b12434feeff to your computer and use it in GitHub Desktop.
#!/usr/bin/python
"""
Something similar to a multiprocessing pool, but designed for heterogeneously sized tasks: each task declares how many worker slots it occupies.
"""
import math
import multiprocessing
import random
import sys
import time
import traceback
import Queue
class Worker(multiprocessing.Process):
    """Process that runs a single HetPool task.

    When run, it calls ``func(*args, **kwargs)``. Whether or not that call
    raises, it then puts every slot id in ``workers_used`` back on
    ``workers_queue``. It also puts exactly one value on ``output_queue``:
    the call's return value, or ``None`` if the call raised.
    """

    def __init__(self, workers_queue, output_queue, workers_used, func, args, kwargs):
        """Record the task; nothing executes until ``start()``/``run()``.

        workers_queue -- queue that the reserved slot ids are returned to
        output_queue  -- queue that receives this task's single result
        workers_used  -- iterable of slot ids reserved for this task
        func, args, kwargs -- the call to perform
        """
        self.workers_queue = workers_queue
        self.output_queue = output_queue
        self.workers_used = workers_used
        self.func = func
        self.args = args
        self.kwargs = kwargs
        super(Worker, self).__init__()

    def run(self):
        """Execute the task, release its slots and publish its result.

        Returns the task's return value, or ``None`` if it raised. Exceptions
        derived from ``Exception`` are reported on stderr, not propagated.
        """
        retval = None
        try:
            retval = self.func(*self.args, **self.kwargs)
        except Exception:
            # Print the full traceback (type, message and stack), not just
            # str(exc); otherwise a failure inside the child is nearly
            # impossible to diagnose.
            traceback.print_exc()
        finally:
            # Always hand the slots back and always emit exactly one result,
            # even on failure, so anyone waiting on either queue can proceed.
            for worker_id in self.workers_used:
                self.workers_queue.put(worker_id)
            self.output_queue.put(retval)
        return retval
class HetPool(object):
    """Pool-like runner for tasks that need different amounts of capacity.

    The pool owns ``total_workers`` slots, identified by integer ids. Each task
    reserves some number of slots and runs in its own, freshly started Worker
    process. The Worker puts its slot ids back when it finishes, so processes
    are not reused.
    """

    def __init__(self, workers=multiprocessing.cpu_count()):
        """Create a pool with ``workers`` slots (default: CPU count at import)."""
        self.total_workers = workers
        # Ids of currently free slots; Workers put theirs back when done.
        self.available_workers = multiprocessing.Queue()
        # One entry per finished task: its return value, or None if it raised.
        self.output = multiprocessing.Queue()
        for worker_id in xrange(self.total_workers):
            self.available_workers.put(worker_id)

    def imap_unordered(self, func, iterable):
        """Run ``func`` for each task in ``iterable``; yield results as they arrive.

        Each item of ``iterable`` is ``(task_cost, (args, kwargs))``. The task
        runs as ``func(*args, **kwargs)`` in a new Worker once ``task_cost``
        slots are free. Results come back in roughly completion order, not
        input order, and a task that raised yields ``None``. Every started
        task's result is yielded before the generator finishes.

        A ``task_cost`` larger than ``total_workers`` is capped at
        ``total_workers``, so the task occupies the whole pool. Without the
        cap, waiting for a slot that can never exist would block forever.

        NOTE(review): the final drain blocks until every started Worker has
        reported. A child killed before reaching its ``finally`` would hang it.
        """
        pending = 0  # tasks started whose result has not been yielded yet
        for task_cost, task_args in iterable:
            needed = min(task_cost, self.total_workers)
            task_workers = []
            while len(task_workers) < needed:
                # Blocks until a running task gives a slot back.
                task_workers.append(self.available_workers.get())
                # While waiting for capacity, hand back any result already in.
                try:
                    result = self.output.get(False)
                except Queue.Empty:
                    pass
                else:
                    pending -= 1
                    yield result
            worker = Worker(self.available_workers, self.output, task_workers, func, *task_args)
            worker.start()
            pending += 1
        # Input exhausted: collect every result not yet yielded. Each started
        # Worker puts exactly one value on the output queue.
        while pending > 0:
            result = self.output.get()
            pending -= 1
            yield result
def super_sleep(sleep_time):
    """Sleep for ``sleep_time`` seconds, then return that same duration.

    The demo uses it as a task whose results are easy to sum.
    """
    duration = sleep_time
    time.sleep(duration)
    return duration
def main():
mypool = HetPool()
tasks_list = ((random.randint(1, 3), ((random.random() * 3,), dict())) for x in xrange(20))
start_time = time.time()
total_sleep_time = 0
for sleep_time in mypool.imap_unordered(super_sleep, tasks_list):
total_sleep_time += sleep_time
print time.time() - start_time, total_sleep_time, sleep_time
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment