Skip to content

Instantly share code, notes, and snippets.

@jmorton
Created February 25, 2017 22:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jmorton/333bf9d648526e55b07300aee9d592f7 to your computer and use it in GitHub Desktop.
Save jmorton/333bf9d648526e55b07300aee9d592f7 to your computer and use it in GitHub Desktop.
import pika
import msgpack
import logging
import sys
from . import spark
from .app import logger
# Module-level logger named after this module. Note that this rebinds the
# `logger` name imported from `.app` on the line above.
logger = logging.getLogger(__name__)
class MessagingException(Exception):
    """Exception type declared for errors in this messaging module.

    It adds nothing to ``Exception``; no code in the visible part of this
    module raises it.
    """
def open_connection(host, port, ssl):
    """Open a blocking pika connection.

    Args:
        host: First positional argument given to ``pika.ConnectionParameters``.
        port: Second positional argument given to ``pika.ConnectionParameters``.
        ssl: Forwarded as the ``ssl`` keyword of ``pika.ConnectionParameters``.

    Returns:
        The ``pika.BlockingConnection`` created from those parameters.
    """
    parameters = pika.ConnectionParameters(host, port, ssl=ssl)
    connection = pika.BlockingConnection(parameters)
    return connection
def close_connection(conn):
    """Close ``conn`` if it is a connection that is currently open.

    Args:
        conn: Object exposing ``is_open`` and ``close()``, or ``None``.

    Returns:
        ``True`` in every case, whether or not anything was closed.
    """
    needs_closing = conn is not None and conn.is_open
    if needs_closing:
        conn.close()
    return True
def open_channel(conn):
    """Create a channel on ``conn`` and set its QoS prefetch count to 1.

    Args:
        conn: Connection object exposing ``channel()``.

    Returns:
        The new channel, after ``basic_qos(prefetch_count=1)`` was applied.
    """
    new_channel = conn.channel()
    # prefetch_count=1: in AMQP terms a cap on un-acked deliveries; the exact
    # scope depends on the broker/pika version -- confirm if it matters.
    new_channel.basic_qos(prefetch_count=1)
    return new_channel
def consume(channel, queue, handler):
    """Register ``handler`` as a non-exclusive consumer of ``queue``.

    Args:
        channel: Channel object exposing ``basic_consume``.
        queue: Queue name, forwarded as the ``queue`` keyword.
        handler: Callback, passed as the first positional argument.

    Returns:
        Whatever ``channel.basic_consume`` returns.
    """
    options = {'queue': queue, 'exclusive': False}
    result = channel.basic_consume(handler, **options)
    return result
def decode(request):
    """Unpack a msgpack payload and turn top-level bytes into strings.

    Args:
        request: msgpack-encoded data; the unpacked value must provide
            ``.items()`` (i.e. behave like a mapping).

    Returns:
        A new dict in which every top-level key and value that was ``bytes``
        has been decoded as UTF-8. Other keys and values are kept as they
        are; nested containers are not traversed.
    """
    def as_text(item):
        # Only bytes are converted; anything else passes through untouched.
        return item.decode('utf-8') if isinstance(item, bytes) else item

    unpacked = msgpack.unpackb(request)
    return {as_text(key): as_text(value) for key, value in unpacked.items()}
def make_receiver(handle, sender):
    """Produce a function that handles messages.

    Args:
        handle: Callable invoked as ``handle(ch, request)`` where ``request``
            is the decoded message body; it must return an iterable of
            results.
        sender: Callable invoked as ``sender(ch, result)`` for each result.

    Returns:
        A callback with the ``(ch, method, properties, body)`` signature that
        decodes ``body``, sends every result produced by ``handle`` and then
        acknowledges the delivery.
    """
    def receiver(ch, method, properties, body):
        request = decode(body)
        # A plain loop: the sends are side effects, no list is needed.
        for result in handle(ch, request):
            sender(ch, result)
        # Acknowledge by delivery tag, and only once all results were sent,
        # so a failure above leaves the message un-acked.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return receiver
def encode(result):
    """Serialize ``result`` with msgpack so it can be sent as a message body.

    Args:
        result: Any value ``msgpack.packb`` accepts.

    Returns:
        The value returned by ``msgpack.packb(result)``.
    """
    packed = msgpack.packb(result)
    return packed
def make_sender(exchange, routing_key):
    """Build a publisher bound to ``exchange`` and ``routing_key``.

    Args:
        exchange: Exchange name used for every publish.
        routing_key: Routing key used for every publish.

    Returns:
        A function ``sender(ch, result)`` that encodes ``result`` and
        publishes it on channel ``ch``, returning what ``basic_publish``
        returns.
    """
    # Created once and shared by every publish. delivery_mode=2 is the AMQP
    # "persistent" delivery mode.
    message_properties = pika.BasicProperties(delivery_mode=2)

    def sender(ch, result):
        payload = encode(result)
        publish_args = {
            'exchange': exchange,
            'routing_key': routing_key,
            'body': payload,
            'properties': message_properties,
        }
        return ch.basic_publish(**publish_args)

    return sender
def worker(host, port, ssl, queue, exchange, routing_key, handler):
    """Create and start a consumer.

    Opens a connection and channel, wires ``handler`` to ``queue`` through a
    receiver that publishes results to ``exchange``/``routing_key``, and then
    consumes messages on the channel.

    Args:
        host: Broker host, passed to ``open_connection``.
        port: Broker port, passed to ``open_connection``.
        ssl: SSL setting, passed to ``open_connection``.
        queue: Name of the queue to consume from.
        exchange: Exchange results are published to.
        routing_key: Routing key results are published with.
        handler: Callable given to ``make_receiver`` to process each message.

    Returns:
        Whatever ``start_consuming()`` returns once consumption ends.

    Raises:
        SystemExit: With status 1 when any ``Exception`` occurs; the error is
            logged with its traceback first.
    """
    conn = None
    try:
        conn = open_connection(host, port, ssl)
        chan = open_channel(conn)
        sender = make_sender(exchange, routing_key)
        receiver = make_receiver(handler, sender)
        consume(chan, queue, receiver)
        # start_consuming lives on the channel, not on the value returned by
        # basic_consume.
        return chan.start_consuming()
    except Exception as e:
        logger.exception(e)
        sys.exit(1)
    finally:
        # Runs on normal return, on error and on interruption;
        # close_connection tolerates None and already-closed connections.
        close_connection(conn)
def run(cfg):
    """Start a worker.

    Args:
        cfg: Mapping that provides the keys ``rabbit-host``, ``rabbit-port``,
            ``rabbit-ssl``, ``rabbit-queue``, ``rabbit-exchange`` and
            ``rabbit-result-routing-key``.

    Returns:
        The result of ``worker(...)`` started with ``spark.run`` as handler.

    Raises:
        KeyError: If ``cfg`` is a dict and one of the keys above is missing.
    """
    host = cfg['rabbit-host']
    # No trailing comma here: it would turn the port into a 1-tuple.
    port = cfg['rabbit-port']
    ssl = cfg['rabbit-ssl']
    queue = cfg['rabbit-queue']
    exchange = cfg['rabbit-exchange']
    key = cfg['rabbit-result-routing-key']
    return worker(host, port, ssl, queue, exchange, key, spark.run)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment