Skip to content

Instantly share code, notes, and snippets.

@davisagli
Last active December 18, 2015 18:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davisagli/5824709 to your computer and use it in GitHub Desktop.
Save davisagli/5824709 to your computer and use it in GitHub Desktop.
Calling Salesforce as a celery task, from within a Zope transaction
from celery import Celery, Task
from requests.exceptions import ConnectionError
from simple_salesforce import Salesforce
import transaction
# Celery application object; tasks in this module are declared through it
# (see ``celery.task(...)`` below).
# The first argument is a placeholder -- replace 'path.to.this.module' with
# the real dotted path of this module.
# Both the broker and the result backend point at AMQP on localhost.
celery = Celery(
'path.to.this.module',
broker='amqp://localhost//',
backend='amqp://localhost//',
)
class AfterCommitTask(Task):
    """Task base class whose tasks are queued only after a commit.

    Calling ``apply_async`` on a task built on this base does not invoke
    the parent class's ``apply_async`` right away. It registers an
    after-commit hook on the current transaction instead, and the parent
    call happens only if the hook reports success.

    Ordering caveat: with celery's concurrency set above 1, tasks may run
    in parallel. If tasks must run in the order they were queued, either
    set CELERYD_CONCURRENCY = 1 or adapt this class to build a *chain* of
    tasks for each transaction.

    Meant for tasks scheduled from inside Zope.
    """

    # Flags this class as a base for other tasks rather than a task itself.
    abstract = True

    def apply_async(self, *args, **kw):
        """Register an after-commit hook that performs the real queueing.

        All positional and keyword arguments are forwarded unchanged to
        the parent class's ``apply_async`` when the hook fires.

        Returns None. ``apply_async`` normally returns a deferred result
        object, but none exists yet at the time this method returns.
        """
        def queue_if_committed(committed):
            # The hook is handed the commit outcome; do nothing unless
            # the transaction succeeded.
            if not committed:
                return
            super(AfterCommitTask, self).apply_async(*args, **kw)

        current_txn = transaction.get()
        current_txn.addAfterCommitHook(queue_if_committed)
def salesforce_task(func):
    """Decorator to help write tasks that call out to Salesforce.

    Wraps ``func`` so that a simple-salesforce client is instantiated and
    passed in as its first argument, then registers the wrapper as a
    celery task based on ``AfterCommitTask``.

    A ``ConnectionError`` raised while creating the client or while
    running ``func`` is treated as transient: celery is asked to retry
    the job.

    :param func: callable that takes the Salesforce client as its first
        argument, followed by the arguments the task is invoked with.
    :returns: the celery task object wrapping ``func``.
    """
    def new_func(*args, **kw):
        try:
            sf = Salesforce(
                username='FIXME',
                password='FIXME',
                security_token='FIXME',
                sandbox=True,
            )
            return func(sf, *args, **kw)
        except ConnectionError as e:
            # Handle transient network errors by telling celery to retry
            # the job. ``retry`` lives on the task object, not on the
            # plain wrapper function, so go through ``task`` (bound
            # below, before the wrapper can ever run).
            # NOTE(review): a retry presumably re-queues through the
            # task's ``apply_async``, which AfterCommitTask defers until a
            # transaction commit -- confirm that retries issued from a
            # worker process actually get queued.
            raise task.retry(exc=e)

    # Copy the identifying metadata by hand. functools.wraps is avoided on
    # purpose: it would also expose ``func``'s signature (including the
    # extra ``sf`` parameter) through ``__wrapped__``.
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__

    task = celery.task(base=AfterCommitTask)(new_func)
    return task
# Example of use...
# 1. Defining a task:
@salesforce_task
def do_upsert(sf, sobject_type, record_id, data):
    """Upsert one record to Salesforce through the REST API.

    Meant to be invoked as a deferred task.

    ``sf`` is the Salesforce client supplied by the ``salesforce_task``
    wrapper. ``sobject_type`` names the attribute looked up on ``sf``
    (for example 'Contact'); ``record_id`` and ``data`` are handed
    straight to that object's ``upsert`` method, whose result is returned.
    """
    return getattr(sf, sobject_type).upsert(record_id, data)
# 2. Using the task (from within a transaction somewhere).
# The ``sf`` client is not passed here: the salesforce_task wrapper creates
# it and prepends it to these arguments when the task runs.
# NOTE(review): presumably ``delay`` goes through ``apply_async``, so with
# AfterCommitTask the task is only queued once the transaction commits and
# this call returns None -- confirm against the celery version in use.
do_upsert.delay('Contact', '1234', {'FirstName': 'Harvey'})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment