Skip to content

Instantly share code, notes, and snippets.

@mattbierbaum
Last active September 12, 2016 19:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mattbierbaum/e9424854cfb256091139e9301079b7ed to your computer and use it in GitHub Desktop.
Save mattbierbaum/e9424854cfb256091139e9301079b7ed to your computer and use it in GitHub Desktop.
import os
import uuid
from celery import signals
from celery import Celery, Task
from celery.utils.log import get_task_logger
# Celery application object; 'simple_celery' is also the prefix used when the
# task names are built in load_tasks().
app = Celery('simple_celery')
# Module-level task logger, used by Worker for all of its log output.
logger = get_task_logger(__name__)
class Worker(object):
    """Stateful object whose public methods are exposed as Celery tasks.

    Each instance carries a random UUID and a call counter, and every call
    logs the process id, the UUID, the argument and the running counter, so
    the log output shows which instance in which process served a call.
    """

    def __init__(self):
        # Logged at CRITICAL so the message is emitted at any usual log level.
        logger.critical("INIT")
        self.calls = 0
        self.logger = logger
        self.uuid = uuid.uuid4()

    def _record_call(self, template, value):
        """Increment the call counter and log one line describing the call.

        ``template`` is a ``str.format`` template with four positional
        fields: pid, instance UUID, ``value`` and the updated call count.
        """
        self.calls += 1
        self.logger.critical(
            template.format(os.getpid(), self.uuid, value, self.calls)
        )

    def run(self, n):
        """Log the call and return ``2 * n``."""
        self._record_call('PID: {}, UUID: {}, RUN: {}, CALLS: {}', n)
        return 2*n

    def echo(self, text):
        """Log the call and return ``text`` unchanged."""
        self._record_call('PID: {}, UUID: {}, ECHO: {}, CALLS: {}', text)
        return text
class AbstractTask(Task):
    """Base task class that hands every task one shared ``Worker``.

    The worker is cached on ``AbstractTask`` itself (not on the instance or
    the subclass), so all tasks built on this base within one Python process
    see the same object once it has been created.
    """

    # Celery base-class marker: this class is a base for tasks, not a task.
    abstract = True
    # Cached Worker; stays None until the first access to ``obj``.
    _obj = None

    def new(self):
        """Create the object that backs the tasks."""
        return Worker()

    @property
    def obj(self):
        """Return the shared Worker, creating it on first access."""
        shared = AbstractTask._obj
        if shared is None:
            shared = self.new()
            AbstractTask._obj = shared
        return shared
# Names of the Worker methods that load_tasks() turns into Celery tasks.
registry = ['run', 'echo']
# Extra options forwarded to app.task(); with bind=True the task function
# receives the task instance as its first argument.
task_kwargs = {'bind': True}
def load_tasks():
    """Create one Celery task per entry in ``registry`` and return them.

    Each task is named ``simple_celery.<method>``, uses ``AbstractTask`` as
    its base class, and forwards its arguments to the method of the same
    name on the task's ``obj``.
    """
    def _placeholder(*args, **kwargs):
        # Only something to decorate; taskcall() never invokes it.
        pass

    return [
        app.task(
            base=AbstractTask,
            name='simple_celery.{}'.format(method_name),
            **task_kwargs
        )(taskcall(method_name)(_placeholder))
        for method_name in registry
    ]
def taskcall(obj_func):
    """Build a decorator that routes a bound task call to ``self.obj``.

    Args:
        obj_func: Name of the method to look up on the task's ``obj``.

    Returns:
        A decorator. The function it decorates is ignored; the returned
        callable takes the task instance as ``self`` and forwards all other
        arguments to ``getattr(self.obj, obj_func)``, returning its result.

    Raises:
        Whatever ``self.retry`` raises when the call is interrupted by
        ``SystemExit`` or ``KeyboardInterrupt``; any other exception from
        the method is logged and re-raised unchanged.
    """
    def task_wrapper(func):
        # ``func`` is only a placeholder to decorate; it is never called.
        def _wrap(self, *args, **kwargs):
            # ``self`` is the task instance, supplied by the `bind` option.
            try:
                return getattr(self.obj, obj_func)(*args, **kwargs)
            except (SystemExit, KeyboardInterrupt) as e:
                # retry() signals the retry by raising; the explicit
                # ``raise`` makes it obvious that control never continues.
                raise self.retry(exc=e)
            except Exception as e:
                self.obj.logger.exception(
                    '%r had an exception\n%r', obj_func, e
                )
                # Bare raise keeps the original traceback intact.
                raise
        return _wrap
    return task_wrapper
# Register the tasks as a side effect of importing this module.
_tasks = load_tasks()
# Also at import time: queue each task twice, addressed by its registered
# name. Presumably the repeats are there so the log shows whether the same
# Worker (UUID and call counter) serves successive calls.
app.send_task('simple_celery.run', kwargs={'n': 10})
app.send_task('simple_celery.run', kwargs={'n': 10})
app.send_task('simple_celery.echo', kwargs={'text': 'break, please'})
app.send_task('simple_celery.echo', kwargs={'text': 'break, please'})
@mattbierbaum
Copy link
Author

mattbierbaum commented Sep 2, 2016

Run with `celery -A simple_celery worker --concurrency=1`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment