Last active
April 22, 2018 17:40
-
-
Save ls0f/2bf398b70e359f3ecf36 to your computer and use it in GitHub Desktop.
a simple gevent task queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding:utf-8
# gevent's monkey patching is applied first, before the remaining modules
# below are imported, so the statement order in this preamble is deliberate.
from gevent import monkey
monkey.patch_all()
import logging
import gevent
# NOTE: importing this module configures the root logger at DEBUG level.
logging.basicConfig(level=logging.DEBUG)
# Module-wide logger used by GeventTaskQueue for its debug trace output.
LOGGER = logging.getLogger("GeventTaskQueue")
from gevent.queue import Queue, Empty
class GeventTaskQueue(object):
    """A small producer/consumer task queue running on gevent greenlets.

    A *worker* callable is registered with ``register_worker`` and is invoked
    as ``worker(*args)`` for every queued task.  Tasks are either queued up
    front with ``add_task`` or, when ``work_forever`` is true, produced
    continuously by a *boss* callable registered with ``register_boss``.

    Optional tunables may be passed as extra keyword arguments to the
    constructor; they are stored as instance attributes:

    * ``max_task_queue`` (default 50): the boss stops producing while the
      queue holds at least this many tasks.
    * ``boss_sleep_time`` (default 5): seconds the boss sleeps when the
      queue is full.
    * ``worker_sleep_time`` (default 3): seconds an idle worker sleeps
      before polling the queue again (``work_forever`` mode only).
    """

    def __init__(self, worker_nums=5, work_forever=False, **kwargs):
        """Create an empty queue.

        :param worker_nums: number of worker greenlets spawned by ``start``.
        :param work_forever: when true, workers keep polling an empty queue
            and the boss (if registered) keeps producing tasks; when false,
            each worker returns as soon as it finds the queue empty.
        :param kwargs: extra attributes set on the instance (see class doc).
        """
        self.task_queue = Queue()
        self.worker = None
        self.boss = None
        self.worker_nums = worker_nums
        self.work_forever = work_forever
        self.init_kwargs(**kwargs)

    def init_kwargs(self, **kwargs):
        """Store every keyword argument as an attribute of the instance."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def add_task(self, args):
        """Queue one task.

        :param args: tuple or list of positional arguments that will be
            passed to the worker as ``worker(*args)``.
        :raises AssertionError: if ``args`` is neither a tuple nor a list.
        """
        if not isinstance(args, (tuple, list)):
            # Raised explicitly (instead of using ``assert``) so the check
            # still runs under ``python -O``; the exception type is the one
            # callers have always seen.
            raise AssertionError(
                "task args must be a tuple or a list, got %r" % (args,))
        LOGGER.debug("add task %s", args)
        self.task_queue.put_nowait(args)

    def set_worker_nums(self, worker_nums):
        """Change how many worker greenlets ``start`` will spawn."""
        self.worker_nums = worker_nums

    def register_worker(self, func):
        """Register the callable that is invoked with each task's args."""
        self.worker = func

    def register_boss(self, func, *args, **kwargs):
        """Register the task producer.

        ``func(*args, **kwargs)`` is called repeatedly by the boss greenlet
        and should return an iterable of task argument lists/tuples.
        """
        def _produce():
            return func(*args, **kwargs)
        self.boss = _produce

    def _boss(self):
        """Boss greenlet body: keep the queue topped up while running forever.

        Returns immediately when no boss is registered; does nothing when
        ``work_forever`` is false.
        """
        if self.boss is None:
            return
        while self.work_forever:
            max_task_queue = getattr(self, "max_task_queue", 50)
            if self.task_queue.qsize() >= max_task_queue:
                boss_sleep_time = getattr(self, "boss_sleep_time", 5)
                LOGGER.debug("boss will sleep %ss", boss_sleep_time)
                gevent.sleep(boss_sleep_time)
                continue
            # Tolerate a producer that returns None when it has nothing.
            for args in self.boss() or ():
                self.add_task(args)
            # Greenlets are cooperative: without an explicit yield here a
            # producer that returns no tasks would spin in this loop and
            # starve every worker.
            gevent.sleep(0)

    def _worker(self, worker_no):
        """Worker greenlet body: pull tasks and run the registered worker.

        :param worker_no: label used in log messages.

        A task whose worker call raises is logged with its traceback and
        skipped, so one failing task does not take the greenlet (and the
        tasks it would have processed afterwards) down with it.
        """
        while True:
            try:
                args = self.task_queue.get_nowait()
            except Empty:
                if not self.work_forever:
                    LOGGER.debug("%s will return", worker_no)
                    return
                sleep_time = getattr(self, "worker_sleep_time", 3)
                LOGGER.debug("%s will sleep %s s", worker_no, sleep_time)
                gevent.sleep(sleep_time)
                continue
            LOGGER.debug("%s get task %s", worker_no, args)
            try:
                self.worker(*args)
            except Exception:
                LOGGER.exception("%s failed task %s", worker_no, args)
                continue
            LOGGER.debug("%s done task %s", worker_no, args)

    def start(self):
        """Spawn the boss and worker greenlets and block until all finish.

        :raises Exception: if no worker has been registered.
        """
        if self.worker is None:
            raise Exception("you don't register a worker")
        greenlets = [gevent.spawn(self._boss)]
        greenlets.extend(
            gevent.spawn(self._worker, "work %s" % i)
            for i in range(1, self.worker_nums + 1)
        )
        gevent.joinall(greenlets)
def main():
    """Demonstrate both operating modes of GeventTaskQueue.

    First a finite run: four pre-queued tasks are drained by two workers and
    ``start`` returns once the queue is empty.  Then an endless run: a boss
    keeps producing random tasks for three workers, so that second ``start``
    call is not expected to return.
    """
    import random

    def boss():
        # One task per call: sleep for a random 1-9 seconds.
        return [[random.randint(1, 9)]]

    def worker(i):
        # Simulated work: just sleep cooperatively for ``i`` seconds.
        gevent.sleep(i)

    # work_forever=False
    finite_queue = GeventTaskQueue(worker_nums=2, work_forever=False)
    for seconds in (1, 2, 3, 4):
        finite_queue.add_task([seconds])
    finite_queue.register_worker(worker)
    finite_queue.start()

    # work_forever=True
    endless_queue = GeventTaskQueue(worker_nums=3, work_forever=True)
    endless_queue.register_boss(boss)
    endless_queue.register_worker(worker)
    endless_queue.start()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment