Skip to content

Instantly share code, notes, and snippets.

@yxy
Created October 29, 2015 09:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yxy/b05fc301f88a4651c73b to your computer and use it in GitHub Desktop.
Save yxy/b05fc301f88a4651c73b to your computer and use it in GitHub Desktop.
simple thread pool workers
#-*- encoding: utf8 -*-
import pickle
import sys
from threading import Thread
# True when running under a Python 2 interpreter.
PY2 = sys.version_info[0] == 2

# The standard-library queue module was renamed between major versions:
# ``Queue`` on Python 2, ``queue`` on Python 3.
if not PY2:
    from queue import Queue
else:
    from Queue import Queue
class Worker(Thread):
    """Daemon thread that executes tasks pulled from a pool, forever.

    The worker starts itself on construction and then loops on
    ``pool.get_task()``. Every fetched task is acknowledged with
    ``task_done()`` on the pool's queue, whether it succeeded or failed.
    """

    def __init__(self, pool):
        """Bind the worker to *pool* and start running immediately.

        Args:
            pool: object exposing ``tasks`` (a queue providing
                ``task_done()``) and ``get_task()``, which returns a
                ``(func, args, kwargs)`` tuple.
        """
        self.pool = pool
        Thread.__init__(self)
        self.tasks = self.pool.tasks
        # Mark as daemon so idle workers never keep the interpreter alive.
        # The ``daemon`` attribute replaces the deprecated ``setDaemon()``.
        self.daemon = True
        self.start()

    def run(self):
        """Fetch and execute tasks in an endless loop.

        Exceptions raised by a task are reported on stderr and do not stop
        the worker.
        """
        while True:
            func, args, kwargs = self.pool.get_task()
            try:
                func(*args, **kwargs)
            except Exception as e:
                # Not every callable has ``__name__`` (functools.partial,
                # callable instances). Fall back to repr() so building the
                # report cannot itself raise and kill this worker.
                name = getattr(func, "__name__", None) or repr(func)
                sys.stderr.write(
                    "thread execute task {} args: {} kwargs: {} failed, exc: {}\n"
                    .format(name, args, kwargs, e))
            finally:
                # Always acknowledge the task so Queue.join() can return.
                self.tasks.task_done()
class ThreadPool(object):
    """Fixed-size pool of worker threads fed through a bounded queue."""

    def __init__(self, thread_num):
        """Create the task queue and start *thread_num* workers.

        Args:
            thread_num: number of worker threads to start. It is also used
                as the queue's ``maxsize``, so ``add_task`` blocks once
                that many tasks are waiting to be picked up.
        """
        self.tasks = Queue(thread_num)
        # Each Worker starts itself in its constructor; keep the references
        # instead of discarding them so the threads stay inspectable.
        self.workers = [Worker(self) for _ in range(thread_num)]

    def add_task(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution by a worker.

        The task is stored as a plain tuple. Tasks never leave this
        process, so no serialization is needed, and lambdas, closures and
        bound methods are all accepted. Blocks while the queue is full.
        """
        self.tasks.put((func, args, kwargs))

    def get_task(self):
        """Block until a task is available.

        Returns:
            The ``(func, args, kwargs)`` tuple stored by ``add_task``.
        """
        return self.tasks.get()

    def wait_completion(self):
        """Block until every queued task has been marked done."""
        self.tasks.join()
def test_func(tix=2):
    """Demo task: sleep *tix* seconds, print a marker, then always fail.

    Args:
        tix: number of seconds to sleep before printing.

    Raises:
        IOError: unconditionally, after the sleep and the print.
    """
    from time import sleep

    sleep(tix)
    print("sleep done")
    raise IOError("NOooooooooooooooo!")
def main():
    """Demo entry point.

    Queues ten one-second ``test_func`` tasks on a three-thread pool and
    blocks until the pool reports that all of them are done.
    """
    pool = ThreadPool(3)
    for _ in range(10):
        pool.add_task(test_func, tix=1)
    pool.wait_completion()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment