Created
December 28, 2018 08:39
-
-
Save vimiix/14075aa492e59049a89f1bcbb2022926 to your computer and use it in GitHub Desktop.
自定义线程池
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Python的线程池实现
import Queue | |
import threading | |
import sys | |
import time | |
# 替我们工作的线程池中的线程
class MyThread(threading.Thread):
    """Pool worker: a daemon thread that runs jobs taken from a work queue.

    The thread starts itself at the end of ``__init__``. Each job is a
    ``(func, args, kwargs)`` tuple; the worker calls ``func(*args, **kwargs)``
    and puts the string ``"<result> | <thread name>"`` on the result queue.
    The worker exits once the work queue has stayed empty for ``timeout``
    seconds.

    Args:
        workQueue: queue of ``(func, args, kwargs)`` job tuples.
        resultQueue: queue that receives one result string per finished job.
        timeout: seconds to wait on an empty work queue before exiting.
        **kwargs: passed to ``threading.Thread`` as its ``kwargs`` argument;
            ``run`` below does not read them.
    """

    def __init__(self, workQueue, resultQueue, timeout=30, **kwargs):
        threading.Thread.__init__(self, kwargs=kwargs)
        # How long the thread waits on the work queue before it finishes.
        self.timeout = timeout
        # ``daemon`` replaces the deprecated setDaemon(); must be set
        # before start().
        self.daemon = True
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.start()

    def run(self):
        """Process jobs until the work queue stays empty for ``timeout`` s.

        Any exception raised while unpacking or running a job is printed via
        ``sys.exc_info()`` and re-raised, which ends this worker thread.
        """
        while True:
            # Only the queue read is guarded by Queue.Empty, so a job that
            # itself raises Queue.Empty is reported as a failure instead of
            # being mistaken for "no more work".
            try:
                job = self.workQueue.get(timeout=self.timeout)
            except Queue.Empty:
                # Work queue stayed empty: finish this thread.
                break
            try:
                func, args, kwargs = job
                # Unpack the stored arguments; passing the tuple and dict
                # themselves would hand the job two wrong positionals.
                res = func(*args, **kwargs)
                # Format instead of concatenating so non-string results
                # (including None) do not raise TypeError.
                self.resultQueue.put("%s | %s" % (res, self.name))
            except BaseException:
                print(sys.exc_info())
                raise
class ThreadPool:
    """Fixed-size pool of ``MyThread`` workers sharing two queues.

    Jobs added with ``add_job`` go onto ``workQueue``; workers put their
    results onto ``resultQueue``.

    Args:
        num_of_threads: number of worker threads created (and started) when
            the pool is constructed.
    """

    def __init__(self, num_of_threads=10):
        self.workQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.threads = []
        self.__createThreadPool(num_of_threads)

    def __createThreadPool(self, num_of_threads):
        """Create ``num_of_threads`` workers bound to the pool's queues."""
        for _ in range(num_of_threads):
            self.threads.append(MyThread(self.workQueue, self.resultQueue))

    def wait_for_complete(self):
        """Block until every worker thread has finished.

        Workers are popped from ``self.threads`` as they are waited on, so
        the list is empty when this returns.
        """
        while self.threads:
            thread = self.threads.pop()
            # is_alive() is the supported spelling (Python 2.6+); the old
            # isAlive() alias was removed in Python 3.9.
            if thread.is_alive():
                thread.join()

    def add_job(self, func, *args, **kwargs):
        """Queue ``func`` to be called with ``args`` and ``kwargs``."""
        self.workQueue.put((func, args, kwargs))
def todo_func(*args, **kwargs):
    """Demo job: ignore all arguments, sleep for one second, return None."""
    delay_seconds = 1
    time.sleep(delay_seconds)
def test():
    """Demo driver: push 50 jobs through a 10-thread pool and print results."""
    print('start testing')
    pool = ThreadPool(10)
    job_count = 50
    for index in range(job_count):
        # Pause 0.2 s before each submission.
        time.sleep(0.2)
        pool.add_job(todo_func, index, index * 0.001)
    pool.wait_for_complete()
    # Report and drain whatever the workers produced.
    results = pool.resultQueue
    print("result Queue's length == %d " % results.qsize())
    while results.qsize() > 0:
        print(results.get())
    print('end testing')
# Run the demo only when this file is executed as a script.
if "__main__" == __name__:
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment