Skip to content

Instantly share code, notes, and snippets.

@tapanpandita
Created October 8, 2016 03:56
Show Gist options
  • Save tapanpandita/46d2e2f63c7425547a865cb6298a172f to your computer and use it in GitHub Desktop.
Save tapanpandita/46d2e2f63c7425547a865cb6298a172f to your computer and use it in GitHub Desktop.
Transaction aware celery abstract task
class TransactionAwareTask(Task):
'''
Task class which is aware of django db transactions and only executes tasks
after transaction has been committed
'''
abstract = True
def apply_async(self, *args, **kwargs):
'''
Unlike the default task in celery, this task does not return an async
result
'''
transaction.on_commit(
lambda: super(TransactionAwareTask, self).apply_async(
*args, **kwargs))
@alexche8
Copy link

How transaction variable appeared there?

@marksweb
Copy link

marksweb commented Dec 4, 2017

I was just wondering that as well @alexche8

But it's as simple as importing transaction so I'm doing this in celery.py;

from django.db import transaction

from celery import Celery, Task

app = Celery('project')

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


class TransactionAwareTask(Task):
    """
    Task class which is aware of django db transactions and only executes
    tasks after transaction has been committed
    """
    abstract = True

    def apply_async(self, *args, **kwargs):
        """
        Unlike the default task in celery, this task does not return an async
        result
        """
        transaction.on_commit(
            lambda: super(TransactionAwareTask, self).apply_async(
                *args, **kwargs)
        )

@tamhv
Copy link

tamhv commented May 4, 2018

celery beat can't start with these tasks

[2018-05-04 18:57:42,588: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2018-05-04 18:57:42,589: INFO/MainProcess] Writing entries...
[2018-05-04 18:57:42,606: DEBUG/MainProcess] DatabaseScheduler: Fetching database schedule
[2018-05-04 18:57:42,614: DEBUG/MainProcess] Current schedule:
<ModelEntry: testing myapp.celerytasks.app_hello(*[], **{}) <freq: 10.00 seconds>>
...
File "/Users/me/env/lib/python3.5/site-packages/celery/beat.py", line 227, in apply_entry
[2018-05-04 18:58:00,046: WARNING/MainProcess] debug('%s sent. id->%s', entry.task, result.id)
[2018-05-04 18:58:00,046: WARNING/MainProcess] AttributeError
[2018-05-04 18:58:00,046: WARNING/MainProcess] :
[2018-05-04 18:58:00,046: WARNING/MainProcess] 'NoneType' object has no attribute 'id'

celery=4.1.0
    def apply_entry(self, entry, producer=None):
        info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
        try:
            result = self.apply_async(entry, producer=producer, advance=False)
        except Exception as exc:  # pylint: disable=broad-except
            error('Message Error: %s\n%s',
                  exc, traceback.format_stack(), exc_info=True)
        else:
227           debug('%s sent. id->%s', entry.task, result.id)

apply_async need return something

@luiscastillocr
Copy link

luiscastillocr commented Oct 10, 2018

class TransactionAwareTask(Task):
    '''
    Task class which is aware of Django DB transactions and
    only executes tasks after the transaction has been committed
    '''
    
    abstract = True

    def apply_async(self, *args, **kwargs):
        cnx = transaction.get_connection()
        if not cnx.in_atomic_block:
            # https://medium.com/gitux/speed-up-django-transaction-hooks-tests-6de4a558ef96
            return super(TransactionAwareTask, self).apply_async(*args, **kwargs)

        # Unlike the default task in celery, this task does not return an async result
        transaction.on_commit(lambda: super(TransactionAwareTask, self).apply_async(*args, **kwargs))

I would like to contribute with some few code lines, the idea is to make it work with transactional and not transactional tests

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment