Skip to content

Instantly share code, notes, and snippets.

@ericflo
Created June 17, 2010 19:27
Show Gist options
  • Save ericflo/442648 to your computer and use it in GitHub Desktop.
Save ericflo/442648 to your computer and use it in GitHub Desktop.
import uuid
import simplejson
import amqplib.client_0_8 as amqp
class SimpleCelery(object):
def __init__(self, task_map, host, port, user, password, exchange, vhost,
ssl=False, retries=5, routing_key='celery'):
self._task_map = task_map
self._host = host
self._port = port
self._user = user
self._password = password
self._exchange = exchange
self._vhost = vhost
self._ssl = ssl
self._retries = retries
self._routing_key = routing_key
def __getattr__(self, method):
if method not in self._task_map:
raise AttributeError("'%s' object has no attribute '%s'" % (
self.__class__.__name__, method))
return self._request(self._task_map[method])
def _request(self, task):
def _inner(*args, **kwargs):
msg_id = str(uuid.uuid1())
request = simplejson.dumps({
'task': task,
'id': msg_id,
'args': args,
'kwargs': kwargs,
'retries': self._retries,
})
msg = amqp.Message(request, delivery_mode=2,
content_type='application/json')
with amqp.Connection(host=self._host, ssl=self._ssl,
userid=self._user, password=self._password,
virtual_host=self._vhost) as conn:
with conn.channel() as ch:
ch.basic_publish(msg, exchange=self._exchange,
routing_key=self._routing_key)
return msg_id
return _inner
@ask
Copy link

ask commented Jun 18, 2010

Nice. What do you use it for? Have you seen celery.execute.send_task, not sure if that does what you want, but it might. ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment