Skip to content

Instantly share code, notes, and snippets.

@howardhamilton
Created October 3, 2018 20:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save howardhamilton/11d6ee0a8b0acb29b9ed82338ca1e849 to your computer and use it in GitHub Desktop.
Save howardhamilton/11d6ee0a8b0acb29b9ed82338ca1e849 to your computer and use it in GitHub Desktop.
Multiprocessing pattern
import multiprocessing
class Consumer(multiprocessing.Process):
    """Worker process that runs callables pulled from a shared task queue.

    Each item taken from ``task_queue`` is either a zero-argument callable
    (a task) or ``None`` (a "poison pill" telling this worker to stop).
    Exactly one entry is put on ``result_queue`` per task: the task's return
    value, or the exception instance if running the task failed.
    """

    def __init__(self, task_queue, result_queue):
        """Store the queues this worker reads tasks from and writes results to.

        Args:
            task_queue: Joinable queue providing ``get()`` and ``task_done()``.
            result_queue: Queue providing ``put()``.
        """
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Process tasks until a poison pill (``None``) is received."""
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            try:
                # Formatting the task is inside the try because a task's
                # __str__ may itself raise.
                print('%s: %s' % (proc_name, next_task))
                answer = next_task()  # carry out task
            except Exception as exc:
                # Forward the failure as this task's result so that whoever
                # counts results still receives one entry per task.
                answer = exc
            finally:
                # Always acknowledge the task; otherwise join() on the task
                # queue would block forever after a failure.
                self.task_queue.task_done()
            self.result_queue.put(answer)  # put answer in result queue
class Task(object):
    """Callable unit of work that can be placed on a task queue.

    Calling the instance runs ``foo()`` and returns its result.
    """

    def __init__(self, file_name=None):
        """Remember the label shown when the task is printed.

        Args:
            file_name: Value interpolated into ``str(task)``. Defaults to
                ``None`` so ``Task()`` keeps working without arguments.
        """
        # __str__ reads this attribute, so it must always exist.
        self.file_name = file_name

    def __call__(self):
        """Run the task and return whatever ``foo()`` returns."""
        return foo()

    def __str__(self):
        """Return a short human-readable description of the task."""
        return "Processing {}".format(self.file_name)
def foo():
    """Placeholder for the real work of a task; does nothing, returns None."""
    return None
if __name__ == "__main__":
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for worker in consumers:
        worker.start()

    # Enqueue jobs, keeping count so we know how many results to collect
    jobs = [Task()]
    num_jobs = len(jobs)
    for job in jobs:
        tasks.put(job)

    # Add poison pill for each consumer
    for _ in range(num_consumers):
        tasks.put(None)

    # Wait for all tasks (and poison pills) to be acknowledged
    tasks.join()

    # Print results: one is expected per enqueued job
    while num_jobs:
        result = results.get()
        print(result)
        num_jobs -= 1

    # Reap the worker processes. This happens only after the result queue
    # has been drained, since joining a process that still has queued items
    # to flush can block.
    for worker in consumers:
        worker.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment