Skip to content

Instantly share code, notes, and snippets.

@drakeguan
Created July 1, 2011 18:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drakeguan/1059090 to your computer and use it in GitHub Desktop.
Save drakeguan/1059090 to your computer and use it in GitHub Desktop.
concurrent map
## {{{ http://code.activestate.com/recipes/577360/ (r1)
import threading
def concurrent_map(func, data):
    """Apply `func` to every element of `data` concurrently.

    Similar to the builtin function map(), but one thread is spawned per
    element and `func` is applied in those threads.

    Note: unlike map(), an arbitrary iterable is not accepted. `data` must
    support len() and integer indexing.

    Args:
        func: callable taking a single argument.
        data: indexable sequence of arguments.

    Returns:
        A list with func(data[i]) at position i.

    Raises:
        Exception: if `func` raised in one or more threads, the exception
            from the lowest-indexed failing element is re-raised here
            after every thread has been joined.
    """
    count = len(data)
    results = [None] * count
    errors = {}

    # Each worker writes only to its own slot, so results keep input order.
    def task_wrapper(index):
        try:
            results[index] = func(data[index])
        except Exception as exc:
            # An exception escaping a thread is not seen by the caller;
            # record it so it can be re-raised after joining.
            errors[index] = exc

    threads = [
        threading.Thread(target=task_wrapper, args=(index,))
        for index in range(count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    if errors:
        raise errors[min(errors)]
    return results
## end of http://code.activestate.com/recipes/577360/ }}}
def dmap1(func, data):
    """Distributed map (v1): concurrent map over any iterable.

    Derived from concurrent_map, but `data` may be an arbitrary iterable.
    One thread is spawned per element and `func` is applied in it.

    Args:
        func: callable taking a single argument.
        data: iterable of arguments.

    Returns:
        A list of the results of func, in the iteration order of `data`.

    Raises:
        Exception: if `func` raised in one or more threads, the exception
            from the lowest-indexed failing element is re-raised here
            after every thread has been joined.
    """
    results = {}
    errors = {}

    # Results are keyed by position so they can be put back in input order.
    def task_wrapper(index, item):
        try:
            results[index] = func(item)
        except Exception as exc:
            # Without this a failing worker would leave a hole in
            # `results` and the list build below would fail with an
            # unrelated KeyError (or silently return a shorter list).
            errors[index] = exc

    threads = [
        threading.Thread(target=task_wrapper, args=(index, item))
        for index, item in enumerate(data)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    if errors:
        raise errors[min(errors)]
    return [results[index] for index in range(len(threads))]
def dmap2(func, *data):
    """Distributed map (v2): concurrent map over several iterables.

    Derived from dmap1, with support for multiple arguments. The iterables
    are combined with zip(), so processing stops at the shortest one. One
    thread is spawned per argument tuple and `func` is called with the
    tuple unpacked as positional arguments.

    Args:
        func: callable taking as many positional arguments as there are
            iterables in `data`.
        *data: iterables supplying the arguments.

    Returns:
        A list of the results of func, in zip() order.

    Raises:
        Exception: if `func` raised in one or more threads, the exception
            from the lowest-indexed failing argument tuple is re-raised
            here after every thread has been joined.
    """
    results = {}
    errors = {}

    # Results are keyed by position so they can be put back in input order.
    def task_wrapper(index, args):
        try:
            results[index] = func(*args)
        except Exception as exc:
            # Without this a failing worker would leave a hole in
            # `results` and the list build below would fail with an
            # unrelated KeyError (or silently return a shorter list).
            errors[index] = exc

    threads = [
        threading.Thread(target=task_wrapper, args=(index, args))
        for index, args in enumerate(zip(*data))
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    if errors:
        raise errors[min(errors)]
    return [results[index] for index in range(len(threads))]
@drakeguan
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment