Skip to content

Instantly share code, notes, and snippets.

@petrilli
Created January 13, 2015 06:16
Show Gist options
  • Save petrilli/ec17f7a070432ec155da to your computer and use it in GitHub Desktop.
Save petrilli/ec17f7a070432ec155da to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from celery.app import app_or_default
Task = app_or_default().create_task_cls
class TransactionalTask(Task):
"""A task whose execution is delayed until after the current transaction.
"""
abstract = True
@classmethod
def original_apply_async(cls, *args, **kwargs):
"""Shortcut method to reach real implementation
of celery.Task.apply_sync
"""
return super(TransactionalTask, cls).apply_async(cls, *args, **kwargs)
@classmethod
def apply_async(cls, *args, **kwargs):
_get_manager().append((cls, args, kwargs))
@petrilli
Copy link
Author

Currently, exploding this way when used as a class:

File "/Users/petrilli/src/pyramid_transactional_celery/pyramid_transactional_celery/transactional_task.py", line 89, in tpc_finish
cls.original_apply_async(_args, *_kwargs)
File "/Users/petrilli/src/pyramid_transactional_celery/pyramid_transactional_celery/transactional_task.py", line 113, in original_apply_async
return super(TransactionalTask, cls).apply_async(cls, _args, *_kwargs)
File "/Users/petrilli/.virtualenvs/celery/lib/python3.4/site-packages/celery/app/task.py", line 555, in apply_async
*_dict(self._get_exec_options(), *_options)
TypeError: _get_exec_options() missing 1 required positional argument: 'self'

@malinoff
Copy link

Can you remove @classmethods and try again?
Also, do you use it like below?

@app.task(base=TransactionalTask)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment