Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Python线程池
# coding=utf-8
"""
线程池.
:copyright: (c) 2015 by fangpeng.
:license: MIT, see LICENSE for more details.
"""
import sys
import Queue
import threading
class Worker(threading.Thread):
    """Daemon thread that executes tasks pulled from a shared queue.

    Every item on ``in_queue`` is a ``(command, callback, args, kwargs)``
    tuple.  ``'process'`` runs ``callback(*args, **kwargs)`` and puts the
    return value on ``out_queue``; ``'stop'`` ends the thread; any other
    command is reported on ``err_queue`` as a ``ValueError``.
    """

    def __init__(self, in_queue, out_queue, err_queue):
        """Initialise the worker thread and start it immediately.

        :param in_queue: queue of tasks waiting to be executed
        :param out_queue: queue receiving the results of executed tasks
        :param err_queue: queue receiving error information of failed tasks
        """
        threading.Thread.__init__(self)
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.daemon = True
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.err_queue = err_queue
        self.start()

    def run(self):
        """Process tasks from ``in_queue`` until a ``'stop'`` command arrives."""
        while True:
            command, callback, args, kwargs = self.in_queue.get()
            if command == 'stop':
                break
            try:
                if command != 'process':
                    raise ValueError('Unknown command %r' % command)
                # The callback runs inside the try block so that a failing
                # task is reported on err_queue instead of killing the thread.
                result = callback(*args, **kwargs)
            except Exception:
                self.report_error()
            else:
                self.out_queue.put(result)

    def dismiss(self):
        """Queue a ``'stop'`` command on ``in_queue``.

        The queue is shared, so the command ends whichever worker
        dequeues it, which is not necessarily this one.
        """
        command = 'stop'
        self.in_queue.put((command, None, None, None))

    def report_error(self):
        """Put the ``(type, value)`` of the current exception on ``err_queue``."""
        self.err_queue.put(sys.exc_info()[:2])
class ThreadPool(object):
    """Manage a pool of worker threads fed from one shared task queue."""

    # Upper bound on the number of worker threads a pool will create.
    max_threads = 32

    def __init__(self, num_threads, pool_size=0):
        """Create the three queues and spawn the worker threads.

        :param num_threads: number of worker threads; capped at
            ``ThreadPool.max_threads``
        :param pool_size: maximum size of each queue; 0 means unbounded
        """
        num_threads = min(num_threads, ThreadPool.max_threads)
        self.in_queue = Queue.Queue(pool_size)
        self.out_queue = Queue.Queue(pool_size)
        self.err_queue = Queue.Queue(pool_size)
        # Workers keyed by creation index; each one starts itself.
        self.workers = {}
        for i in range(num_threads):
            worker = Worker(self.in_queue, self.out_queue, self.err_queue)
            self.workers[i] = worker

    def add_task(self, callback, *args, **kwargs):
        """Queue ``callback(*args, **kwargs)`` for execution by a worker."""
        command = "process"
        self.in_queue.put((command, callback, args, kwargs))

    def _get_results(self, queue):
        """Yield every item currently in ``queue`` without waiting.

        Iteration ends as soon as the queue is found empty.
        """
        while True:
            try:
                item = queue.get_nowait()
            except Queue.Empty:
                # A plain return ends the generator; raising StopIteration
                # here would turn into RuntimeError under PEP 479.
                return
            yield item

    def get_task(self):
        """Return the next task result, blocking until one is available."""
        return self.out_queue.get()

    def show_results(self):
        """Print every result currently waiting in ``out_queue``."""
        for result in self._get_results(self.out_queue):
            # Single-argument form prints the same text on Python 2 and 3.
            print('Result: %s' % (result,))

    def show_errors(self):
        """Print every ``(type, value)`` error waiting in ``err_queue``."""
        for etyp, err in self._get_results(self.err_queue):
            print('Error: %s %s' % (etyp, err))

    def destroy(self):
        """Stop and join all worker threads.

        Stop commands are queued behind any pending tasks, so this waits
        for those tasks to be processed first.  Calling it again is a no-op.
        """
        workers = list(self.workers.values())
        # Order matters: first ask every thread to stop...
        for worker in workers:
            worker.dismiss()
        # ...then wait for each of them to terminate.
        for worker in workers:
            worker.join()
        # Drop the now-unused thread objects; an empty dict (rather than
        # deleting the attribute) keeps a second destroy() call safe.
        self.workers = {}
if __name__ == '__main__':
    import time

    def stuff(arg):
        """Demo task: sleep for 90 seconds, then print the argument."""
        time.sleep(90)
        # Single-argument form prints the same text on Python 2 and 3.
        print('thread: %s' % (arg,))

    # Three workers, one queued task.
    pool = ThreadPool(3)
    pool.add_task(stuff, 'ts')
    # Join and destroy all threads; the stop commands are queued behind
    # the task, so this returns only after the task has run.
    pool.destroy()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment