Skip to content

Instantly share code, notes, and snippets.

@artemrys
Last active January 20, 2019 20:16
Show Gist options
  • Save artemrys/4e2984b0266d99f45200f3d92790c3a0 to your computer and use it in GitHub Desktop.
Save artemrys/4e2984b0266d99f45200f3d92790c3a0 to your computer and use it in GitHub Desktop.
Celery External Call using RabbitMQ
import json
from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue
queue = Queue("input.queue", Exchange("default"), "input.key")
app = Celery(broker="amqp://")
# Decalring the general input message handler
class InputMessageHandler(object):
def handle(self, body):
body_json = json.loads(body)
_type = body_json["type"]
if _type == "ETL":
ETLMessageHandler().handle(body_json)
# Declaring the ETL message handler
class ETLMessageHandler(object):
def handle(self, body):
print("Working on ETL for message: {0}".format(body))
# Calling out your Celery tasks here
# Declaring the bootstep for our purposes
class InputMessageConsumerStep(bootsteps.ConsumerStep):
def get_consumers(self, channel):
return [Consumer(channel,
queues=[queue],
callbacks=[self.handle_message],
accept=["json"])]
def handle_message(self, body, message):
InputMessageHandler().handle(body)
message.ack()
app.steps["consumer"].add(InputMessageConsumerStep)
if __name__ == "__main__":
app.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment