Created
February 13, 2019 17:33
-
-
Save nikoheikkila/dabbdc6f13e7ef49901cb14f81c72512 to your computer and use it in GitHub Desktop.
MQ example with Python and Pika
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pika import BlockingConnection | |
from pika.exceptions import AMQPConnectionError, AMQPChannelError | |
class MQ: | |
def __init__(self): | |
self.connection = BlockingConnection() | |
self.channel = self.connection.channel() | |
def close_connection(self): | |
self.connection.close() | |
def basic_cancel(self): | |
self.channel.cancel() | |
class Publisher(MQ): | |
def __init__(self): | |
super().__init__() | |
def publish(self, exchange, routing_key, message): | |
body = b"{message}" | |
self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) | |
self.close_connection() | |
class Consumer(MQ): | |
def __init__(self, consume_limit=0): | |
super().__init__() | |
self.consume_limit consume_limit | |
def on_close_callback(self): | |
self.basic_cancel() | |
self.close_connection() | |
@retry(AMQPConnectionError, delay=5, jitter=(1, 3)) | |
def consume(self, exchange): | |
self.channel.basic_consume(exchange, self.on_close_callback) | |
try: | |
self.channel.start_consuming() | |
except ConnectionClosedByBroker | |
pass | |
def basic_ack(self, delivery_tag): | |
if self.channel.is_open: | |
return self.channel.basic_ack(delivery_tag) | |
raise AMQPChannelError(f"Channel with tag {delivery_tag} is already closed.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment