Let's test the integration between Zope transactions and Celery tasks.
First, let's turn off Celery's asynchronous behavior so that tasks run as soon as they are queued, which makes it easy to check when that happens:
>>> from celery import Celery
>>> celery = Celery('example.celery')
>>> celery.conf.CELERY_ALWAYS_EAGER = True
>>> celery.conf.BROKER_BACKEND = 'memory'
Okay, we're going to try out a deferred log task that records its arguments in a global we can easily check:
>>> result = []
>>> @celery.task
... def log(*args):
...     global result
...     result = list(args)
We can schedule this task within a transaction:
>>> import transaction
>>> txn = transaction.begin()
>>> res = log.delay('1')
But it will get executed even if the transaction gets aborted:
>>> transaction.abort()
>>> result
['1']
That's a problem if the transaction hits a conflict error and gets retried, because the same task gets queued again on every attempt! So let's be smarter and schedule the task from an after-commit hook that queues it only if the commit succeeded:
>>> def schedule_task(task, *args, **kw):
...     def hook(success):
...         if success:
...             task.delay(*args, **kw)
...     transaction.get().addAfterCommitHook(hook)
>>> schedule_task(log, '2')
Now if we commit the transaction, the task gets queued:
>>> transaction.commit()
>>> result
['2']
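
To make the `success` argument a bit more concrete, here is a rough pure-Python model of the hook behavior that `schedule_task` relies on. It is only a sketch: `ToyTransaction`, `add_after_commit_hook` and the `fail` switch are made-up names for illustration, not part of the transaction package, and the real machinery does a good deal more.

```python
class ToyTransaction:
    """Hypothetical stand-in for a transaction; not the real implementation."""

    def __init__(self):
        self._after_commit = []

    def add_after_commit_hook(self, hook):
        # Remember the hook until this transaction commits or aborts.
        self._after_commit.append(hook)

    def commit(self, fail=False):
        # 'fail' is an illustrative switch that simulates a failed commit.
        hooks, self._after_commit = self._after_commit, []
        for hook in hooks:
            hook(not fail)  # hooks receive True on success, False on failure

    def abort(self):
        # Aborting drops pending hooks without calling them.
        self._after_commit = []


calls = []

txn = ToyTransaction()
txn.add_after_commit_hook(calls.append)
txn.commit()            # successful commit: hook sees True

txn.add_after_commit_hook(calls.append)
txn.commit(fail=True)   # failed commit: hook sees False

txn.add_after_commit_hook(calls.append)
txn.abort()             # abort: hook is never called

print(calls)  # [True, False]
```

In this model the `if success:` guard is what protects against a failed commit, while an abort never reaches the hook at all.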
But if we set things up the same way and abort the transaction, the task will not get queued:
>>> result = []
>>> schedule_task(log, '3')
>>> transaction.abort()
>>> result
[]
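
The retry problem can also be seen without Celery or a database. The following is a minimal simulation, assuming a retry loop that reruns the work after each conflict; `ConflictError`, `run_with_retries` and `work` are hypothetical names used only for this sketch.

```python
class ConflictError(Exception):
    """Hypothetical stand-in for a database write conflict."""


def run_with_retries(work, attempts=3):
    """Call work(defer) until one attempt commits without a conflict."""
    for _ in range(attempts):
        pending = []  # after-commit callbacks registered by this attempt
        try:
            work(pending.append)
        except ConflictError:
            continue  # abort: this attempt's callbacks are discarded
        for callback in pending:  # commit succeeded: run the callbacks
            callback()
        return
    raise ConflictError("gave up after %d attempts" % attempts)


queued_eagerly = []    # what happens with a bare task.delay()
queued_on_commit = []  # what happens with an after-commit hook
state = {"calls": 0}


def work(defer):
    state["calls"] += 1
    queued_eagerly.append("task")                   # queued right away
    defer(lambda: queued_on_commit.append("task"))  # queued only on commit
    if state["calls"] < 3:
        raise ConflictError("simulated conflict")   # first two attempts fail


run_with_retries(work)
print(len(queued_eagerly), len(queued_on_commit))  # 3 1
```

The eager variant queues the task once per attempt, while the deferred variant queues it once, for the attempt that actually committed.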