Celery & Zope transactions

Let's test integration between Zope transactions and celery tasks.

First let's turn off Celery's asynchronous behavior so tasks execute immediately and in-process, which makes it easy to check when they have been queued:

>>> from celery import Celery
>>> celery = Celery('example.celery')
>>> celery.conf.CELERY_ALWAYS_EAGER = True
>>> celery.conf.BROKER_BACKEND = 'memory'
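(Those are the uppercase setting names from the Celery versions current when this was written. On Celery 4 and later, the equivalent lowercase settings would look roughly like the following sketch; this block is illustrative and not part of the original doctest.)

from celery import Celery

celery = Celery('example.celery')
celery.conf.task_always_eager = True   # run tasks inline instead of sending them to a broker
celery.conf.broker_url = 'memory://'   # in-memory transport, handy for tests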

Okay, we're going to try out a deferred log task that records its arguments in a module-level global, so we can easily check whether it has run:

>>> result = []
>>> @celery.task
... def log(*args):
...     global result
...     result = list(args)

We can schedule this task within a transaction:

>>> import transaction
>>> txn = transaction.begin()
>>> res = log.delay('1')

But it will get executed even if the transaction gets aborted:

>>> transaction.abort()
>>> result
['1']

That's a problem if the transaction hits a conflict error and gets retried, because the same task will be queued once per attempt! So let's be smarter and schedule the task from within an after-commit hook, which only queues it if the commit succeeded:

>>> def schedule_task(task, *args, **kw):
...     def hook(success):
...         if success:
...             task.delay(*args, **kw)
...     transaction.get().addAfterCommitHook(hook)
>>> schedule_task(log, '2')

Now if we commit the transaction the task gets queued:

>>> transaction.commit()
>>> result
['2']

But if we set things up the same way and abort the transaction, the task will not get queued:

>>> result = []
>>> schedule_task(log, '3')
>>> transaction.abort()
>>> result
[]
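To avoid repeating the hook boilerplate at every call site, the same idea can be packaged as a task base class. The sketch below is not part of the original gist (the class and method names are made up); it just reuses the transaction.get().addAfterCommitHook API shown above:

import transaction
from celery import Task

class AfterCommitTask(Task):
    """Base class whose delay_after_commit() defers queuing the task
    until the current transaction has committed successfully."""
    abstract = True  # this base class itself is not a registered task

    def delay_after_commit(self, *args, **kw):
        def hook(success):
            # The hook is called after commit or abort; only queue on success.
            if success:
                self.delay(*args, **kw)
        transaction.get().addAfterCommitHook(hook)

A task defined with @celery.task(base=AfterCommitTask) could then be scheduled via log.delay_after_commit('4') instead of wrapping every call in the schedule_task helper.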