Skip to content

Instantly share code, notes, and snippets.

@bencord0
Created August 31, 2012 09:09
Show Gist options
  • Save bencord0/3550592 to your computer and use it in GitHub Desktop.
Save bencord0/3550592 to your computer and use it in GitHub Desktop.
A multithreaded implementation of Python's map().
def concurrent_map(func, *iterm, **kwargs):
    """
    Threaded version of map().

    Calls ``func`` once per "row" of arguments, where a row is built by
    taking one item from each iterable in ``iterm``. Every call runs in
    its own thread, and the function returns only after all threads have
    finished. Shorter iterables are padded with ``None``; the caller's
    iterables are never modified.

    With inspiration from twisted, exceptions are treated as return
    values: if a call raises an ``Exception``, the exception instance is
    placed in the result list at that call's position.

    Keyword arguments:
        pause:  seconds to sleep after starting each thread (default 0).
        daemon: value assigned to each thread's ``daemon`` attribute
                (default False).

    Returns a list of results in the same order as the inputs.

    Usage:
    >>> my_data = [1, 2, 3, 4, 5]
    >>> def my_func(x):
    ...     return x * x
    ...
    >>> concurrent_map(my_func, my_data)
    [1, 4, 9, 16, 25]
    >>> results = concurrent_map(lambda x, y: x + y, [1, 3, 5, 7, 9], [2, 4, 6, 8])
    >>> results[:4]
    [3, 7, 11, 15]
    >>> isinstance(results[4], TypeError)
    True
    """
    # Imports are here. If you are copy/pasting this into other code,
    # this won't fill your namespace. If already imported, no harm.
    # Refactor to your needs.
    import threading
    from time import sleep
    try:
        from itertools import zip_longest
    except ImportError:  # Python 2 spells it izip_longest
        from itertools import izip_longest as zip_longest

    pause = kwargs.get('pause', 0)
    daemon = kwargs.get('daemon', False)

    # zip_longest pads short inputs with None without touching the
    # caller's objects, and it accepts any iterable (tuples, generators,
    # ...), not just lists.
    args_list = list(zip_longest(*iterm))

    # One slot per call; each worker writes only its own slot, so the
    # output order matches the input order regardless of finish order.
    results = [None] * len(args_list)
    results_lock = threading.Lock()  # protect the shared results list

    def func_wrapper(index, args):
        # Treat exceptions as return values so one failing call does not
        # lose the results of the others.
        try:
            result = func(*args)
        except Exception as e:
            result = e
        with results_lock:
            results[index] = result

    threads = []
    for index, args in enumerate(args_list):
        thread = threading.Thread(target=func_wrapper, args=(index, args))
        thread.daemon = daemon
        threads.append(thread)
        thread.start()
        if pause:
            sleep(pause)

    # Wait for every worker before handing the results back.
    for thread in threads:
        thread.join()

    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment