Skip to content

Instantly share code, notes, and snippets.

@miniyk2012
Created July 30, 2018 15:45
Show Gist options
  • Save miniyk2012/4a2edf98493d91c60af06232b6c69582 to your computer and use it in GitHub Desktop.
Save miniyk2012/4a2edf98493d91c60af06232b6c69582 to your computer and use it in GitHub Desktop.
numpy并行运算
from time import time
from handythread import foreach
from multiprocessing.pool import Pool
import numpy as np
import math
def f(x, size=10000000):
    """Pure-Python benchmark workload: ``math.exp`` over a list of ones.

    Args:
        x: Work-item value supplied by the caller (e.g. an index from
            ``range``). It is not used by the computation.
        size: Number of list elements to process. The default keeps the
            original workload of 10,000,000 elements; pass a smaller value
            for quick runs.

    Returns:
        None. The computed values are discarded.
    """
    y = [1] * size
    # The resulting list is thrown away; callers only time how long it takes.
    [math.exp(i) for i in y]
def g(x, size=10000000):
    """NumPy benchmark workload: ``np.exp`` over an array of ones.

    Args:
        x: Work-item value supplied by the caller (e.g. an index from
            ``range``). It is not used by the computation.
        size: Length of the array to process. The default keeps the original
            workload of 10,000,000 elements; pass a smaller value for quick
            runs.

    Returns:
        None. The computed array is discarded.
    """
    y = np.ones(size)
    # The resulting array is thrown away; callers only time how long it takes.
    np.exp(y)
def fornorm(f, l):
    """Serial baseline: call ``f`` once per element of ``l``, in order.

    Return values of ``f`` are ignored and nothing is returned.
    """
    for item in l:
        f(item)
# Maps a human-readable label to the measured wall-clock duration in seconds.
result = {}


def _record(label, job):
    """Run ``job()`` once and store its wall-clock duration in ``result[label]``."""
    start_time = time()
    job()
    end_time = time()
    result[label] = end_time - start_time


def main():
    """Time the ``g`` (NumPy) and ``f`` (pure Python) workloads three ways.

    Each workload is run serially via ``fornorm``, on two threads via
    ``foreach`` and on two processes via ``Pool.map``. The collected timings
    are printed as a dict at the end.
    """
    # Serial baselines.
    _record('norm g 100', lambda: fornorm(g, range(100)))
    _record('norm f 10', lambda: fornorm(f, range(10)))

    # Two worker threads.
    _record('2 threads g 100', lambda: foreach(g, range(100), threads=2))
    _record('2 threads f 10', lambda: foreach(f, range(10), threads=2))

    # Two worker processes. The pool is created before the clock starts so
    # that process start-up is not part of the measurement, and the ``with``
    # block shuts the workers down once both measurements are done.
    with Pool(2) as p:
        _record('2 processes g 100', lambda: p.map(g, range(100)))
        _record('2 processes f 10', lambda: p.map(f, range(10)))

    print(result)


# The guard is required for multiprocessing: with the "spawn" start method
# every worker re-imports this module, and unguarded top-level code would try
# to create a new pool (and rerun the benchmark) inside each worker.
if __name__ == '__main__':
    main()
import sys
import time
import threading
from itertools import count
def foreach(f,l,threads=3,return_=False):
    """
    Apply f to each element of l, in parallel.

    Parameters:
        f: callable taking one element of ``l``.
        l: iterable of work items. It is consumed lazily; worker threads
            pull the next item under a lock.
        threads: number of worker threads. With ``threads <= 1`` the items
            are processed serially in the calling thread.
        return_: when true, collect the results of ``f`` and return them as
            a list in the order of ``l``; otherwise return ``None``.

    Raises:
        The first exception recorded from a call to ``f``. Once a worker has
        recorded an exception no further items are started; calls already
        running in other threads are allowed to finish.
    """
    if threads <= 1:
        # Serial fallback: no threads, exceptions propagate directly.
        if return_:
            return [f(v) for v in l]
        for v in l:
            f(v)
        return None

    iteratorlock = threading.Lock()   # guards both the iterator and `exceptions`
    exceptions = []
    d = {}                            # item position -> result (only if return_)
    i = enumerate(l)

    def runall():
        while True:
            with iteratorlock:
                if exceptions:
                    # Another worker failed: stop picking up new items.
                    return
                try:
                    n, x = next(i)
                except StopIteration:
                    return
            try:
                if return_:
                    d[n] = f(x)
                else:
                    f(x)
            except BaseException as exc:
                # Catch everything on purpose: the exception is handed over
                # to the calling thread and re-raised there after join().
                with iteratorlock:
                    exceptions.append(exc)

    threadlist = [threading.Thread(target=runall) for j in range(threads)]
    for t in threadlist:
        t.start()
    for t in threadlist:
        t.join()

    if exceptions:
        # The instance still carries the worker's traceback (__traceback__).
        raise exceptions[0]
    if return_:
        # Results arrive in completion order; restore the input order.
        return [d[n] for n in sorted(d)]
    return None
def parallel_map(f,l,threads=3):
    """
    Map f over l using foreach with result collection switched on.

    Returns whatever ``foreach(f, l, threads=threads, return_=True)`` returns.
    """
    mapped = foreach(f, l, threads=threads, return_=True)
    return mapped
if __name__=='__main__':
    # Demo 1: every call prints its item and then sleeps, so the output
    # shows items being picked up by several threads at once.
    def f(x):
        print(x)
        time.sleep(0.5)
    foreach(f,range(10))

    # Demo 2: every call fails after printing its item; foreach is expected
    # to surface the failure in this (the calling) thread.
    def g(x):
        time.sleep(0.5)
        print(x)
        raise ValueError
    foreach(g,range(10))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment