Let's test the integration between Zope transactions and Celery tasks.
First let's turn off celery's asynchronous behavior so it's easier to check when tasks have been queued:

    >>> from celery import Celery
    >>> celery = Celery('example.celery')
    >>> celery.conf.CELERY_ALWAYS_EAGER = True
    >>> celery.conf.BROKER_BACKEND = 'memory'

Okay, we're going to try out a deferred log task that will record the fact that it's been called in a global we can easily check:

    >>> result = []
    >>> @celery.task
    ... def log(*args):
    ...     global result
    ...     result = list(args)

We can schedule this task within a transaction:

    >>> import transaction
    >>> txn = transaction.begin()
    >>> res = log.delay('1')

But the task is not tied to the transaction, so it gets executed even if the transaction gets aborted:

    >>> transaction.abort()
    >>> result
    ['1']

That's a problem if the transaction hits a conflict error and gets retried, because the same task will get queued once per attempt! So let's be smarter and schedule the task from within an after-commit hook, which will only queue it if the commit succeeded:

    >>> def schedule_task(task, *args, **kw):
    ...     def hook(success):
    ...         if success:
    ...             task.delay(*args, **kw)
    ...     transaction.get().addAfterCommitHook(hook)
    >>> schedule_task(log, '2')

Now if we commit the transaction the task gets queued:

    >>> transaction.commit()
    >>> result
    ['2']

But if we set things up the same way and abort the transaction, the task will not get queued:

    >>> result = []
    >>> schedule_task(log, '3')
    >>> transaction.abort()
    >>> result
    []

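
To make the retry problem concrete, here is a minimal sketch that compares queueing a task directly with queueing it from an after-commit hook while a commit is retried after conflicts. It uses only the standard library. `ToyTransaction`, `ToyConflict` and `run_with_retries` are hypothetical stand-ins invented for this illustration, not the real `transaction` package or Celery, and the toy only assumes that hooks are called with a success flag after a commit attempt and are dropped on abort.

```python
class ToyConflict(Exception):
    """Hypothetical stand-in for a commit-time conflict error."""


class ToyTransaction:
    """Toy model of a transaction with after-commit hooks (an assumption,
    not the real transaction package)."""

    def __init__(self, conflicts=0):
        self._hooks = []
        self._conflicts = conflicts  # how many commit attempts will fail

    def addAfterCommitHook(self, hook):
        self._hooks.append(hook)

    def _run_hooks(self, success):
        # Hooks are one-shot: clear the list before calling them.
        hooks, self._hooks = self._hooks, []
        for hook in hooks:
            hook(success)

    def commit(self):
        if self._conflicts > 0:
            self._conflicts -= 1
            self._run_hooks(False)  # commit failed: hooks see success=False
            raise ToyConflict()
        self._run_hooks(True)

    def abort(self):
        self._hooks = []  # pending hooks are dropped without being called


def run_with_retries(txn, work, attempts=3):
    """Run work and commit, retrying the whole unit of work on conflict."""
    for _ in range(attempts):
        try:
            work(txn)
            txn.commit()
            return
        except ToyConflict:
            txn.abort()
    raise ToyConflict()


queued_directly = []   # models calling task.delay() inside the transaction
queued_on_commit = []  # models calling task.delay() from the hook


def work(txn):
    queued_directly.append('task')

    def hook(success):
        if success:
            queued_on_commit.append('task')

    txn.addAfterCommitHook(hook)


# Two conflicts, then a successful commit on the third attempt.
run_with_retries(ToyTransaction(conflicts=2), work)
print(len(queued_directly), len(queued_on_commit))
```

In this toy run the directly queued list grows on every attempt, while the hook-based list only grows on the attempt whose commit succeeds.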