Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@davisagli
Last active April 24, 2022 11:19
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save davisagli/5824662 to your computer and use it in GitHub Desktop.
Save davisagli/5824662 to your computer and use it in GitHub Desktop.
Running Zope code as a celery task
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from Testing.makerequest import makerequest
from ZODB.POSException import ConflictError
from celery import Celery, Task
from email.Header import Header
from zope.app.publication.interfaces import BeforeTraverseEvent
from zope.component.hooks import setSite
from zope.event import notify
import email
import os
import sys
import transaction
import Zope2
# Celery application used to register every task in this module.  The
# message broker and the result backend both point at the AMQP server
# on localhost.
celery = Celery(
    'myproject.tasks',
    backend='amqp://localhost//',
    broker='amqp://localhost//',
)
class AfterCommitTask(Task):
    """Abstract task base that postpones queueing until after commit.

    Meant for tasks that are scheduled from code running inside Zope:
    ``apply_async`` does not queue the task right away, it registers an
    after-commit hook on the current transaction, and the task is only
    queued when that hook is called with a successful commit status.
    """

    abstract = True

    def apply_async(self, *args, **kw):
        """Register a hook that queues the task once the transaction commits.

        The arguments are forwarded unchanged to ``Task.apply_async`` from
        inside the hook.  Returns ``None``: ``apply_async`` normally returns
        a deferred result object, but none is available yet at this point.
        """
        def queue_after_commit(committed):
            # The hook is handed the commit status; when the commit did
            # not succeed the task is never queued.
            if not committed:
                return
            super(AfterCommitTask, self).apply_async(*args, **kw)

        transaction.get().addAfterCommitHook(queue_after_commit)
def zope_task(**task_kw):
    """Decorator of celery tasks that should be run in a Zope context.

    The decorated function is called with the traversed site (presumably
    a portal) as its first argument, followed by the task's own arguments.
    The site is located from the ``site_path`` keyword argument given when
    the task is called (default ``'Plone'``); that keyword is consumed by
    the wrapper and is not forwarded to the decorated function.

    The wrapper also takes care of initializing the Zope environment,
    running the task within a transaction, and retrying on ZODB conflict
    errors.

    ``task_kw`` is passed through to ``celery.task``; the resulting task
    uses ``AfterCommitTask`` as its base class.
    """
    def wrap(func):
        def new_func(*args, **kw):
            # Pop rather than get: ``site_path`` is addressed to this
            # wrapper, and the task functions in this file do not accept
            # it as a keyword argument.
            site_path = kw.pop('site_path', 'Plone')
            site_path = site_path.strip().strip('/')

            # This is a super ugly way of getting Zope to configure itself
            # from the main instance's zope.conf. XXX FIXME
            sys.argv = ['']
            os.environ['ZOPE_CONFIG'] = 'parts/client1/etc/zope.conf'
            app = makerequest(Zope2.app())

            transaction.begin()
            try:
                try:
                    # find site
                    site = app.unrestrictedTraverse(site_path)
                    # fire traversal event so various things get set up
                    notify(BeforeTraverseEvent(site, site.REQUEST))

                    # set up admin user
                    user = app.acl_users.getUserById('admin')
                    newSecurityManager(None, user)

                    # run the task
                    result = func(site, *args, **kw)

                    # commit transaction
                    transaction.commit()
                except ConflictError as e:
                    # On ZODB conflicts, retry using celery's mechanism.
                    # ``retry`` lives on the task object, not on the plain
                    # function, so it is looked up on ``task`` (bound below,
                    # before the task can ever run).
                    # NOTE(review): if retry() re-queues through
                    # AfterCommitTask.apply_async, the message is only sent
                    # by an after-commit hook on the then-current
                    # transaction -- confirm retries really get queued.
                    transaction.abort()
                    raise task.retry(exc=e)
                except:
                    # Any other failure: roll back, then let the original
                    # exception propagate unchanged.
                    transaction.abort()
                    raise
            finally:
                # Always drop the security context and site hook, and
                # close the ZODB connection opened for this run.
                noSecurityManager()
                setSite(None)
                app._p_jar.close()

            return result

        # Keep the original function name on the wrapper before it is
        # registered as a task.
        new_func.__name__ = func.__name__
        task = celery.task(base=AfterCommitTask, **task_kw)(new_func)
        return task
    return wrap
@zope_task()
def run_view(portal, view_path):
    """Look up the view at ``view_path`` below ``portal`` and call it.

    The lookup uses ``restrictedTraverse``; whatever the view returns is
    discarded.
    """
    portal.restrictedTraverse(view_path)()
@zope_task()
def send_mail(portal, subject, message, mfrom, mto):
    """Send ``message`` through the portal's ``MailHost``.

    ``message`` is parsed into an email message (unicode input is encoded
    as UTF-8 first) and given the utf-8 charset.  ``mfrom`` is only used
    for the ``Reply-To`` header; the sender passed to the mail host is
    built from the portal's ``email_from_name`` and ``email_from_address``.
    The mail is sent with ``immediate=True``.
    """
    raw = message.encode('utf8') if isinstance(message, unicode) else message
    msg = email.message_from_string(raw)
    msg.set_charset('utf-8')

    # The address supplied by the caller becomes the reply address ...
    msg['Reply-To'] = Header(mfrom, 'utf-8')
    # ... while the mail itself is sent from the portal's own address.
    sender = email.utils.formataddr(
        (portal.email_from_name, portal.email_from_address))

    portal.MailHost.send(
        msg, subject=subject, mfrom=sender, mto=mto,
        immediate=True, charset='utf-8')
# Examples of calling.  These are illustrative and are meant to be issued
# from code running inside a Zope transaction: the tasks use an
# after-commit hook, so nothing is queued until that transaction commits.
# Celery's shortcut for queueing a task is ``delay``; a Task has no
# ``defer`` method.
run_view.delay('@@foo')
send_mail.delay(u'Test message', u'test', u'test@example.com', u'test@example.com')
@yboussard
Copy link

cool I will test that ! Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment