Created
July 1, 2011 18:07
-
-
Save drakeguan/1059090 to your computer and use it in GitHub Desktop.
concurrent map
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## {{{ http://code.activestate.com/recipes/577360/ (r1) | |
import threading | |
def concurrent_map(func, data): | |
""" | |
Similar to the bultin function map(). But spawn a thread for each argument | |
and apply `func` concurrently. | |
Note: unlike map(), we cannot take an iterable argument. `data` should be an | |
indexable sequence. | |
""" | |
N = len(data) | |
result = [None] * N | |
# wrapper to dispose the result in the right slot | |
def task_wrapper(i): | |
result[i] = func(data[i]) | |
threads = [threading.Thread(target=task_wrapper, args=(i,)) for i in xrange(N)] | |
for t in threads: | |
t.start() | |
for t in threads: | |
t.join() | |
return result | |
## end of http://code.activestate.com/recipes/577360/ }}} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def dmap1(func, data): | |
"""distributed map (v1) derived from concurrent_map with supporting of iterable argument. | |
""" | |
result = {} | |
# wrapper to dispose the result in the right slot | |
def task_wrapper(i, d): | |
result[i] = func(d) | |
threads = [threading.Thread(target=task_wrapper, args=(i, d)) for (i, d) in enumerate(data)] | |
for t in threads: | |
t.start() | |
for t in threads: | |
t.join() | |
return [result[i] for i in xrange(len(result))] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def dmap2(func, *data): | |
"""distributed map (v2) derived from dmap1 with supporting of multiple arguments. | |
""" | |
result = {} | |
# wrapper to dispose the result in the right slot | |
def task_wrapper(i, t): | |
result[i] = func(*list(t)) | |
threads = [threading.Thread(target=task_wrapper, args=(i, t)) for (i, t) in enumerate(zip(*data))] | |
for t in threads: | |
t.start() | |
for t in threads: | |
t.join() | |
return [result[i] for i in xrange(len(result))] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
blog: http://drakeguan.org/node/639