Because of its Global Interpreter Lock (GIL), Python cannot achieve true parallel computation with threads. IO-bound work means frequent reads from files or network sockets; CPU-bound work means heavy math and logic that consumes the CPU — the parallel computation discussed here. The concurrent.futures module can deliver true parallelism by building on multiprocessing. The core idea: concurrent.futures runs multiple Python interpreters in parallel as child processes, so a Python program can use multiple CPU cores to speed up execution. Because each child process is separate from the main interpreter, each has its own independent GIL, and each child can fully occupy one CPU core.
# Greatest common divisor (a CPU-bound function)
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [
    (1963309, 2265973), (1879675, 2493670), (2030677, 3814172),
    (1551645, 2229620), (1988912, 4736670), (2198964, 7876293)
]

import time
from concurrent.futures import ThreadPoolExecutor

start = time.time()
pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print('Took %.3f seconds.' % (end - start))
# Took 2.840 seconds.

from concurrent.futures import ProcessPoolExecutor

start = time.time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print('Took %.3f seconds.' % (end - start))
# Took 1.861 seconds. The process pool is faster here.
'''
gcd is a CPU-bound function, so because of the GIL, multithreading cannot speed it up. Starting threads also has a cost, as does communicating with the thread pool, which is why the multithreaded version is actually slower. Given the GIL, the guideline is: use ThreadPoolExecutor for IO-bound tasks and ProcessPoolExecutor for CPU-bound tasks.
****************************************************
On a machine with two CPU cores, the multiprocess version runs faster than both of the others. This is because the ProcessPoolExecutor class uses the low-level machinery provided by the multiprocessing module to perform the following steps:
 1) Pass each item of the numbers list to map.
 2) Serialize the data into binary form with the pickle module.
 3) Send the serialized data over a local socket from the main interpreter's process to a child interpreter's process.
 4) In the child process, deserialize the binary data with pickle, restoring it to Python objects.
 5) Import the Python module that contains the gcd function.
 6) Each child process computes on its own input data in parallel.
 7) Serialize the results, converting them into bytes.
 8) Copy those bytes back to the main process over the socket.
 9) The main process deserializes the bytes, restoring them to Python objects.
10) Finally, merge the results computed by each child process into one list and return it to the caller.
multiprocessing's overhead is relatively high precisely because every exchange between the main process and its children must go through this serialize/deserialize round-trip.
'''
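Steps 2 and 4 of the round-trip above can be reproduced manually with pickle; this sketch (data reused from the numbers list) shows what actually crosses the socket:

```python
# Manual illustration of the pickle round-trip (steps 2 and 4 above):
# arguments become bytes in the parent, then objects again in the child.
import pickle

pair = (1963309, 2265973)
payload = pickle.dumps(pair)       # step 2: serialize to bytes
assert isinstance(payload, bytes)  # this is what crosses the local socket
restored = pickle.loads(payload)   # step 4: deserialize in the child
assert restored == pair
```

This is also why arguments and return values passed through a ProcessPoolExecutor must be picklable — objects pickle cannot handle cannot cross the process boundary.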
from concurrent import futures
import threading
import time

def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n

with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    outputs = []
    try:
        for i in results:
            outputs.append(i)
    except futures.TimeoutError:  # public name; futures._base is private
        print("TIMEOUT")
    print(outputs)
'''
Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5
'''
# coding: utf8
import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait

# One subtask
def each_task(n):
    # Approximate pi from the Basel series: sum(1/i^2) = pi^2 / 6
    s = 0.0
    for i in range(n):
        s += 1.0 / (i + 1) / (i + 1)
    pi = math.sqrt(6 * s)
    # os.getpid() returns the child process id
    sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))
    return pi

def run(process_num, *ns):  # several n values, each computed as a subtask
    # Create a pool of process_num processes
    executor = ProcessPoolExecutor(process_num)
    start = time.time()
    fs = []  # list of futures
    for n in ns:
        fs.append(executor.submit(each_task, int(n)))  # submit a task
    wait(fs)  # wait for the computations to finish
    end = time.time()
    duration = end - start
    print("total cost: %.2fs" % duration)
    executor.shutdown()  # tear down the pool

if __name__ == '__main__':
    fire.Fire(run)
'''
concurrent.futures offers two concurrency models: multithreading via ThreadPoolExecutor and multiprocessing via ProcessPoolExecutor. Use the thread model for IO-bound tasks and the process model for CPU-bound tasks.
Why this split? Because of the GIL, the Python interpreter cannot make effective use of multiple cores while executing bytecode: a pure computation can at most saturate a single CPU core. To break through that ceiling, the work must be forked out to child processes. IO-bound tasks, by contrast, keep CPU usage very low; multithreading raises CPU usage, but nowhere near saturation (100%). Since a single core can handle the total computation, the lighter-weight option — multithreading — is the right choice.
'''
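The rule of thumb above can be captured in a small helper (a sketch — the function name and parameters are illustrative, not part of any library):

```python
# Sketch: choose the executor class from the workload type, per the
# guideline above. make_executor and its arguments are illustrative.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def make_executor(io_bound, workers=4):
    # IO-bound: threads are cheap and the GIL is released while waiting
    # on IO. CPU-bound: processes sidestep the GIL and use all cores.
    cls = ThreadPoolExecutor if io_bound else ProcessPoolExecutor
    return cls(max_workers=workers)

ex = make_executor(io_bound=True)
print(type(ex).__name__)  # ThreadPoolExecutor
ex.shutdown()
```

Both classes share the same Executor interface (`submit`, `map`, `shutdown`), so callers can switch models without changing the surrounding code.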
import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait

# One subtask
def each_task(index):
    time.sleep(1)  # sleep 1s to simulate IO
    print("thread %s square %d" % (threading.current_thread().ident, index))
    return index * index  # return the result

def run(thread_num, task_num):
    # Create a pool of thread_num threads
    executor = ThreadPoolExecutor(thread_num)
    start = time.time()
    fs = []  # list of futures
    for i in range(task_num):
        fs.append(executor.submit(each_task, i))  # submit a task
    wait(fs)  # wait for the computations to finish
    end = time.time()
    duration = end - start
    s = sum([f.result() for f in fs])  # sum the results
    print("total result=%s cost: %.2fs" % (s, duration))
    executor.shutdown()  # tear down the pool

if __name__ == '__main__':
    fire.Fire(run)
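The script above takes its parameters from the command line via fire; the same effect can be checked inline with a self-contained sketch (4 threads and 4 tasks are assumed values): because the 1-second sleeps overlap across threads, total wall time stays near 1s rather than task_num seconds.

```python
# Sketch: 4 tasks of 1s each on 4 threads overlap, so wall time is ~1s.
import time
from concurrent.futures import ThreadPoolExecutor, wait

def each_task(index):
    time.sleep(1)  # simulate IO
    return index * index

start = time.time()
with ThreadPoolExecutor(4) as ex:
    fs = [ex.submit(each_task, i) for i in range(4)]
    wait(fs)
duration = time.time() - start
total = sum(f.result() for f in fs)  # 0 + 1 + 4 + 9
print("total=%d cost=%.2fs" % (total, duration))
```

Run serially, the same four tasks would take about 4 seconds — the thread pool's win here comes entirely from overlapping the waits, not from parallel computation.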