Skip to content

Instantly share code, notes, and snippets.

@ls0f
Last active April 22, 2018 17:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ls0f/2bf398b70e359f3ecf36 to your computer and use it in GitHub Desktop.
Save ls0f/2bf398b70e359f3ecf36 to your computer and use it in GitHub Desktop.
a simple gevent task queue
#coding:utf-8
from gevent import monkey
monkey.patch_all()
import logging
import gevent
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("GeventTaskQueue")
from gevent.queue import Queue, Empty
class GeventTaskQueue(object):
def __init__(self, worker_nums=5, work_forever=False, **kwargs):
self.task_queue = Queue()
self.worker = None
self.boss = None
self.worker_nums = worker_nums
self.work_forever = work_forever
self.init_kwargs(**kwargs)
def init_kwargs(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def add_task(self, args):
assert isinstance(args, tuple) or isinstance(args, list)
LOGGER.debug("add task {}".format(args))
self.task_queue.put_nowait(args)
def set_worker_nums(self, worker_nums):
self.worker_nums = worker_nums
def register_worker(self, func):
self.worker = func
def register_boss(self, func, *args, **kwargs):
# func should return task args list
def _boss():
return func(*args, **kwargs)
self.boss = _boss
def _boss(self):
if self.boss is None:
return
while self.work_forever:
max_task_queue = getattr(self, "max_task_queue", 50)
if self.task_queue.qsize() >= max_task_queue:
boss_sleep_time = getattr(self, "boss_sleep_time", 5)
LOGGER.debug("boss will sleep {}s".format(boss_sleep_time))
gevent.sleep(boss_sleep_time)
else:
task_args = self.boss()
for args in task_args:
self.add_task(args)
def _worker(self, worker_no):
while 1:
try:
args = self.task_queue.get_nowait()
except Empty:
if self.work_forever:
sleep_time = getattr(self, "worker_sleep_time", 3)
LOGGER.debug("{} will sleep {} s".format(worker_no, sleep_time))
gevent.sleep(sleep_time)
continue
else:
LOGGER.debug("{} will return".format(worker_no))
return
LOGGER.debug("{} get task {}".format(worker_no, args))
self.worker(*args)
LOGGER.debug("{} done task {}".format(worker_no, args))
def start(self):
if self.worker is None:
raise Exception("you don't register a worker")
workers = [gevent.spawn(self._boss)] + \
[gevent.spawn(self._worker, "work %s" % i) for i in range(1, self.worker_nums + 1)]
gevent.joinall(workers)
def main():
import random
def worker(i):
gevent.sleep(i)
def boss():
return [[random.randint(1, 9)]]
# work_forever=False
TQ = GeventTaskQueue(worker_nums=2, work_forever=False)
for i in range(1, 5):
TQ.add_task([i])
TQ.register_worker(worker)
TQ.start()
# work_forever=True
TQ = GeventTaskQueue(worker_nums=3, work_forever=True)
TQ.register_boss(boss)
TQ.register_worker(worker)
TQ.start()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment