Skip to content

Instantly share code, notes, and snippets.

@guerbai
Last active June 2, 2019 04:01
Show Gist options
  • Save guerbai/682a7c714c88edf2baea8656336d8ab8 to your computer and use it in GitHub Desktop.
Save guerbai/682a7c714c88edf2baea8656336d8ab8 to your computer and use it in GitHub Desktop.
python并发几种方式的演示
# -*- coding: utf-8 -*-
# import os
# print 'Process (%s) start...' % os.getpid()
# pid = os.fork()
# if pid==0:
# print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
# else:
# print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
# from multiprocessing import Process
# import os
# import time
# # 子进程要执行的代码
# def run_proc(name):
# time.sleep(2)
# print 'Run child process %s (%s)...' % (name, os.getpid())
# if __name__=='__main__':
# print 'Parent process %s.' % os.getpid()
# p = Process(target=run_proc, args=('test',))
# print 'Process will start.'
# p.start()
# p.join()
# print 'Process end.'
#----------------------------------------------------------------------------------------------------------------------
# from multiprocessing import Pool
# import os, time, random
# def long_time_task(name):
# # while True:
# # pass
# print 'Run task %s (%s)...' % (name, os.getpid())
# start = time.time()
# time.sleep(10)
# # time.sleep(random.random() * 3)
# end = time.time()
# print 'Task %s runs %0.2f seconds.' % (name, (end - start))
# if __name__=='__main__':
# print 'Parent process %s.' % os.getpid()
# p = Pool(5) # 并不是几核心就只能跑几个进程,可以更多。
# for i in range(5):
# p.apply_async(long_time_task, args=(i,))
# print 'Waiting for all subprocesses done...'
# p.close()
# p.join()
# print 'All subprocesses done.'
# -------------------------------------------------------------------------------------------------------------------
# from multiprocessing import Process, Queue
# import os, time, random
# # 写数据进程执行的代码:
# senddata = ["a", "b", "c", "d", "e"]
# def write(q):
# for value in senddata:
# print 'Put %s to queue...' % value
# q.put(value)
# time.sleep(1)
# # time.sleep(random.random())
# # 读数据进程执行的代码:
# def read(q):
# while True:
# value = q.get(True)
# time.sleep(2)
# print 'Get %s from queue.' % value
# if __name__=='__main__':
# # 父进程创建Queue,并传给各个子进程:
# q = Queue()
# pw = Process(target=write, args=(q,))
# pr = Process(target=read, args=(q,))
# pr2 = Process(target=read, args=(q,))
# # 启动子进程pw,写入:
# pw.start()
# # 启动子进程pr,读取:
# pr.start()
# pr2.start()
# # 等待pw结束:
# pw.join()
# # pr进程里是死循环,无法等待其结束,只能强行终止:
# pr.terminate()
# pr2.terminate()
常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。
join()方法可以等待子进程结束后主进程再继续往下运行,通常用于进程间的同步。
几个cpu
more /proc/cpuinfo |grep "physical id"|uniq|wc -l
每个cpu是几核(假设cpu配置相同)
more /proc/cpuinfo |grep "physical id"|grep "0"|wc -l
cat /proc/cpuinfo | grep processor
我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程。
由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1,Thread-2……
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
我们定义了一个共享变量balance,初始值为0,并且启动两个线程,先存后取,理论上结果应该为0,但是,由于线程的调度是由操作系统决定的,当t1、t2交替执行时,只要循环次数足够多,balance的结果就不一定是0了。
究其原因,是因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。
如果我们要确保balance计算正确,就要给change_it()上一把锁,当某个线程开始执行change_it()时,我们说,该线程因为获得了锁,因此其他线程不能同时执行change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()来实现:
锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。
如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。
所以,多任务一旦多到一个限度,就会消耗掉系统所有的资源,结果效率急剧下降,所有任务都做不好。
计算密集型任务同时进行的数量应当等于CPU的核心数。
对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。
所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
但协程的特点在于是一个线程执行,最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
由于gevent是基于IO切换的协程,所以最神奇的是,我们编写的Web App代码,不需要引入gevent的包,也不需要改任何代码,仅仅在部署的时候,用一个支持gevent的WSGI服务器,立刻就获得了数倍的性能提升。
这里选择Nginx,它可以处理静态资源,同时作为反向代理把动态请求交给gunicorn处理。gunicorn负责调用我们的Python代码,
http://xiaorui.cc/2014/11/22/%E7%94%A8gunicorn%E5%92%8Cgevent%E6%8F%90%E9%AB%98python-web%E6%A1%86%E6%9E%B6%E7%9A%84%E6%80%A7%E8%83%BD/
https://blog.tonyseek.com/post/the-context-mechanism-of-flask/
import time
import threading
import Queue
num_worker_threads = 8
class MyThread(threading.Thread):
def __init__(self, func, args, name = ''):
threading.Thread.__init__(self)
self.name = name
self.func = func
self.args = args
def getResult(self):
return self.res
def run(self):
print self.args
self.res = apply(self.func, self.args)
def do_work(item):
print str(item)+'done!'
time.sleep(2)
def worker(name):
while True:
if not q.empty():
item = q.get()
do_work(item)
q.task_done()
print 'task done by {}'.format(name)
def source():
a = 0
while True:
q.put(a)
print 'in source put {}'.format(str(a))
a += 1
time.sleep(0.5)
q = Queue.Queue()
producer = threading.Thread(target = source)
producer.daemon = True
producer.start()
for i in range(num_worker_threads):
t = MyThread(worker, ('Thread - {}'.format(i),))
#t = threading.Thread(target=worker, name = 'Threade - {}'.format(i))
t.daemon = True
t.start()
#q.join()
producer.join()
# -*- coding: utf-8 -*-
# import time, threading
# # 新线程执行的代码:
# def loop():
# print 'thread %s is running...' % threading.current_thread().name
# n = 0
# while n < 5:
# n = n + 1
# print 'thread %s >>> %s' % (threading.current_thread().name, n)
# time.sleep(1)
# print 'thread %s ended.' % threading.current_thread().name
# print 'thread %s is running...' % threading.current_thread().name
# t = threading.Thread(target=loop, name='LoopThread')
# t.start()
# # t.join()
# print 'thread %s ended.' % threading.current_thread().name
# ------------------------------------------------------------------------------------------------------------------------
# import time, threading
# # 假定这是你的银行存款:
# balance = 0
# lock = threading.Lock()
# def change_it(n):
# # 先存后取,结果应该为0:
# global balance
# balance = balance + n
# balance = balance - n
# def run_thread(n):
# for i in range(100000):
# # 先要获取锁:
# lock.acquire()
# try:
# # 放心地改吧:
# change_it(n)
# finally:
# # 改完了一定要释放锁:
# lock.release()
# def run_thread(n):
# for i in range(100000):
# change_it(n)
# t1 = threading.Thread(target=run_thread, args=(5,))
# t2 = threading.Thread(target=run_thread, args=(8,))
# t1.start()
# t2.start()
# t1.join()
# t2.join()
# print balance
# balance = 0
# lock = threading.Lock()
# ---------------------------------------------------------------------------------------------------------------------
# import threading, multiprocessing
# def loop():
# x = 0
# while True:
# x = x ^ 1
# # for i in range(multiprocessing.cpu_count()):
# for i in range(100):
# t = threading.Thread(target=loop)
# t.start()
# -*- coding: utf-8 -*-
import time
# def consumer():
# r = ''
# while True:
# n = yield r
# if not n:
# return
# print('[CONSUMER] Consuming %s...' % n)
# time.sleep(1)
# r = '200 OK'
# def produce(c):
# c.next()
# n = 0
# while n < 5:
# n = n + 1
# print('[PRODUCER] Producing %s...' % n)
# r = c.send(n)
# print('[PRODUCER] Consumer return: %s' % r)
# c.close()
# if __name__=='__main__':
# c = consumer()
# produce(c)
# ------------------------------------------------------------------------------------------------------------------
# from gevent import monkey; monkey.patch_socket()
# import gevent
# import time
# def f(n):
# for i in range(n):
# print gevent.getcurrent(), i
# gevent.sleep(1)
# g1 = gevent.spawn(f, 5)
# g2 = gevent.spawn(f, 5)
# g3 = gevent.spawn(f, 5)
# g1.join()
# g2.join()
# g3.join()
# ---------------------------------------------------------------------------------------------------------------------------
# from gevent import monkey; monkey.patch_all()
# import gevent
# import urllib2
# def f(url):
# print('GET: %s' % url)
# print time.time()
# resp = urllib2.urlopen(url)
# data = resp.read()
# print('%d bytes received from %s.' % (len(data), url))
# gevent.joinall([
# gevent.spawn(f, 'https://www.python.org/'),
# gevent.spawn(f, 'https://www.baidu.com/'),
# gevent.spawn(f, 'https://www.bing.com/'),
# ])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment