Skip to content

Instantly share code, notes, and snippets.

@ls0f
Created October 19, 2015 02:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ls0f/e2969ee868345a5af708 to your computer and use it in GitHub Desktop.
Save ls0f/e2969ee868345a5af708 to your computer and use it in GitHub Desktop.
模仿 celery 的gevent celery
#coding:utf-8
from gevent import monkey
monkey.patch_all()
import logging
import gevent
import datetime
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("GeventCelery")
from gevent.queue import Queue, Empty
from functools import wraps
class GeventCelery(object):
def __init__(self, *args, **kwargs):
self.task_queue = Queue()
self.init_kwargs(args, kwargs)
gevent.spawn(self._work)
def init_kwargs(self, *args, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def register(self, *args, **kwargs):
def wraps_1(func):
@wraps(func)
def wraps_2(*args2, **kwargs2):
# return func(*args2, **kwargs2)
self.task_queue.put_nowait((func, args2, kwargs2))
LOGGER.debug("get task {} {} {}".format(func.__name__, args2, kwargs2))
return wraps_2
return wraps_1
def _work(self):
while 1:
try:
func, args, kwargs = self.task_queue.get_nowait()
except Empty:
gevent.sleep(getattr(self, "sleep_time", 1))
continue
LOGGER.debug("will run {} {} {}".format(func.__name__, args, kwargs))
res = func(*args, **kwargs)
LOGGER.debug("run {} {} {} done, result is {}, "
"task_queue size is {}".format(func.__name__, args, kwargs, res, self.task_queue.qsize()))
GC = GeventCelery()
@GC.register()
def sleep(i):
gevent.sleep(i)
if __name__ == "__main__":
import random
while 1:
i = random.randint(1, 3)
print "begin sleep"
sleep(i)
print "done sleep"
gevent.sleep(0.1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment