Skip to content

Instantly share code, notes, and snippets.

@nikoheikkila
Created February 13, 2019 17:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nikoheikkila/dabbdc6f13e7ef49901cb14f81c72512 to your computer and use it in GitHub Desktop.
Save nikoheikkila/dabbdc6f13e7ef49901cb14f81c72512 to your computer and use it in GitHub Desktop.
MQ example with Python and Pika
from pika import BlockingConnection
from pika.exceptions import (
    AMQPChannelError,
    AMQPConnectionError,
    ConnectionClosedByBroker,
)
class MQ:
    """Base class that owns one blocking AMQP connection and one channel on it."""

    def __init__(self):
        """Open a ``BlockingConnection`` with default parameters and a channel.

        If the channel cannot be opened, the connection is closed again
        before the error propagates so it is not leaked.
        """
        self.connection = BlockingConnection()
        try:
            self.channel = self.connection.channel()
        except Exception:
            # The caller never receives the instance in this case, so nobody
            # else could close the connection.
            self.connection.close()
            raise

    def close_connection(self):
        """Close the connection if it is still open; otherwise do nothing.

        The guard makes repeated calls safe, e.g. when the broker has
        already dropped the connection before cleanup runs.
        """
        if self.connection.is_open:
            self.connection.close()

    def basic_cancel(self):
        """Call ``cancel()`` on the channel."""
        self.channel.cancel()
class Publisher(MQ):
    """One-shot publisher: sends a single message, then closes its connection."""

    def publish(self, exchange, routing_key, message):
        """Publish ``message`` to ``exchange`` with ``routing_key`` and close.

        Args:
            exchange: Name of the exchange to publish to.
            routing_key: Routing key for the message.
            message: Payload. ``bytes`` are sent unchanged; anything else is
                converted with ``str()`` and encoded as UTF-8.

        The connection is closed afterwards even if publishing raises, so
        the instance cannot be reused for a second publish.
        """
        # The original sent the literal bytes b"{message}" rather than the
        # actual payload.
        if isinstance(message, bytes):
            body = message
        else:
            body = str(message).encode("utf-8")
        try:
            self.channel.basic_publish(
                exchange=exchange, routing_key=routing_key, body=body
            )
        finally:
            self.close_connection()
class Consumer(MQ):
    """Blocking consumer built on the connection/channel opened by ``MQ``."""

    def __init__(self, consume_limit=0):
        """Open the connection and remember ``consume_limit``.

        Args:
            consume_limit: Stored on the instance; no method in this class
                reads it.
        """
        super().__init__()
        self.consume_limit = consume_limit

    def on_close_callback(self, *_args, **_kwargs):
        """Cancel consumption and close the connection.

        ``consume`` registers this method as the channel's callback.  Extra
        arguments are accepted and ignored so that it can be invoked with
        whatever the channel passes to its callbacks, as well as with no
        arguments at all.
        """
        self.basic_cancel()
        self.close_connection()

    # NOTE(review): ``retry`` is not imported in the visible part of this
    # file; presumably it is the decorator from the third-party ``retry``
    # package -- confirm and import it, otherwise this class fails to load.
    @retry(AMQPConnectionError, delay=5, jitter=(1, 3))
    def consume(self, exchange):
        """Register the callback and block in ``start_consuming()``.

        Args:
            exchange: Passed as the first positional argument to
                ``basic_consume``.
                NOTE(review): pika expects a queue name there -- confirm
                what callers pass.
        """
        self.channel.basic_consume(exchange, self.on_close_callback)
        try:
            self.channel.start_consuming()
        except ConnectionClosedByBroker:
            # Deliberately ignored: a broker-initiated close simply ends
            # consumption instead of surfacing as an error.
            pass

    def basic_ack(self, delivery_tag):
        """Acknowledge the delivery identified by ``delivery_tag``.

        Returns:
            Whatever ``channel.basic_ack`` returns.

        Raises:
            AMQPChannelError: If the channel is no longer open.
        """
        if self.channel.is_open:
            return self.channel.basic_ack(delivery_tag)
        raise AMQPChannelError(f"Channel with tag {delivery_tag} is already closed.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment