Skip to content

Instantly share code, notes, and snippets.

@stoensin
Last active November 14, 2022 12:15
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stoensin/ad7a0a726db48251422caf97dd54c1b4 to your computer and use it in GitHub Desktop.
Save stoensin/ad7a0a726db48251422caf97dd54c1b4 to your computer and use it in GitHub Desktop.
python因为其全局解释器锁GIL而无法通过线程实现真正的平行计算。 IO密集型:读取文件,读取网络套接字频繁。 计算密集型:大量消耗CPU的数学与逻辑运算,也就是我们这里说的平行计算。 而concurrent.futures模块,可以利用multiprocessing实现真正的平行计算。 核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。
# 求最大公约数
def gcd(pair):
a, b = pair
low = min(a, b)
for i in range(low, 0, -1):
if a % i == 0 and b % i == 0:
return i
numbers = [
(1963309, 2265973), (1879675, 2493670), (2030677, 3814172),
(1551645, 2229620), (1988912, 4736670), (2198964, 7876293)
]
import time
from concurrent.futures import ThreadPoolExecutor, Executor
start = time.time()
pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print ('Took %.3f seconds.' % (end - start) )
#Took 2.840 seconds.
import time
from concurrent.futures import ProcessPoolExecutor, Executor
start = time.time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print 'Took %.3f seconds.' % (end - start)
# Took 1.861 seconds. 进程更高效
'''
函数gcd是一个CPU计算密集型函数,因为GIL的原因,多线程是无法提升效率的。同时,线程启动的时候,有一定的开销,与线程池进行通信,也会有开销,所以这个程序使用了多线程反而更慢了。由于GIL限制,建议:IO密集的任务,用ThreadPoolExecutor;CPU密集任务,用ProcessPoolExcutor。
**************************** * **************************
在两个CPU核心的机器上运行多进程程序,比其他两个版本都快。这是因为,ProcessPoolExecutor类会利用multiprocessing模块所提供的底层机制,完成下列操作:
1)把numbers列表中的每一项输入数据都传给map。
2)用pickle模块对数据进行序列化,将其变成二进制形式。
3)通过本地套接字,将序列化之后的数据从煮解释器所在的进程,发送到子解释器所在的进程。
4)在子进程中,用pickle对二进制数据进行反序列化,将其还原成python对象。
5)引入包含gcd函数的python模块。
6)各个子进程并行的对各自的输入数据进行计算。
7)对运行的结果进行序列化操作,将其转变成字节。
8)将这些字节通过socket复制到主进程之中。
9)主进程对这些字节执行反序列化操作,将其还原成python对象。
10)最后,把每个子进程所求出的计算结果合并到一份列表之中,并返回给调用者。
multiprocessing开销比较大,原因就在于:主进程和子进程之间通信,必须进行序列化和反序列化的操作。
'''
from concurrent import futures
import threading
import time
def task(n):
print("Launching task {}".format(n))
time.sleep(n)
print('{}: done with {}'.format(threading.current_thread().name, n))
return n
with futures.ThreadPoolExecutor(max_workers=5) as ex:
results = ex.map(task, range(1, 6), timeout=3)
outputs = []
try:
for i in results:
outputs.append(i)
except futures._base.TimeoutError:
print("TIMEOUT")
print(outputs)
'''
Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5
'''
# coding: utf8
import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait
# 分割子任务
def each_task(n):
# 按公式计算圆周率
s = 0.0
for i in range(n):
s += 1.0/(i+1)/(i+1)
pi = math.sqrt(6*s)
# os.getpid可以获得子进程号
sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))
return pi
def run(process_num, *ns): # 输入多个n值,分成多个子任务来计算结果
# 实例化进程池,process_num个进程
executor = ProcessPoolExecutor(process_num)
start = time.time()
fs = [] # future列表
for n in ns:
fs.append(executor.submit(each_task, int(n))) # 提交任务
wait(fs) # 等待计算结束
end = time.time()
duration = end - start
print "total cost: %.2fs" % duration
executor.shutdown() # 销毁进程池
if __name__ == '__main__':
fire.Fire(run)
'''
concurrent提供了两种并发模型,一个是多线程ThreadPoolExecutor,一个是多进程ProcessPoolExecutor。对于IO密集型任务宜使用多线程模型。对于计算密集型任务应该使用多进程模型。
为什么要这样选择呢?是因为Python GIL的存在让Python虚拟机在进行运算时无法有效利用多核心。对于纯计算任务,它永远最多只能榨干单个CPU核心。如果要突破这个瓶颈,就必须fork出多个子进程来分担计算任务。而对于IO密集型任务,CPU使用率往往是极低的,使用多线程虽然会加倍CPU使用率,但是还远远到不了饱和(100%)的地步,在单核心可以应付整体计算的前提下,自然是应该选择资源占用少的模式,也就是多线程模式。
'''
import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait
# 分割子任务
def each_task(index):
time.sleep(1) # 睡1s,模拟IO
print "thread %s square %d" % (threading.current_thread().ident, index)
return index * index # 返回结果
def run(thread_num, task_num):
# 实例化线程池,thread_num个线程
executor = ThreadPoolExecutor(thread_num)
start = time.time()
fs = [] # future列表
for i in range(task_num):
fs.append(executor.submit(each_task, i)) # 提交任务
wait(fs) # 等待计算结束
end = time.time()
duration = end - start
s = sum([f.result() for f in fs]) # 求和
print "total result=%s cost: %.2fs" % (s, duration)
executor.shutdown() # 销毁线程池
if __name__ == '__main__':
fire.Fire(run)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment