Skip to content

Instantly share code, notes, and snippets.

@eguven
Last active October 26, 2022 14:34
Show Gist options
  • Save eguven/69ef059a6f94e438f0bc9c9b6326c27e to your computer and use it in GitHub Desktop.
Save eguven/69ef059a6f94e438f0bc9c9b6326c27e to your computer and use it in GitHub Desktop.
RabbitMQ consumer that logs to console and file
#!/usr/bin/env python
# Copyright 2019, Bayes Esports Solutions GmbH, all rights reserved.
# Requirements:
# kombu (pip install kombu)
# Usage:
# $ export RABBITMQ_CONSUME_URI=amqps://{user}:{password}@{host}/{vhost}
# $ export RABBITMQ_QUEUE_NAME=foobar.outbox
# $ python logging_consumer.py
import json
import logging
import os
import sys
import time

from kombu import Connection, Queue, disable_insecure_serializers
from kombu.mixins import ConsumerMixin
# Configure the root logger once at import time: INFO and above, each record
# prefixed with a timestamp and its level name.
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Module-level logger used by everything in this script.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Ask kombu to disable the serializers it treats as insecure. This runs at
# import time, i.e. before any Connection is created further down.
disable_insecure_serializers()
class WorkerConfig(object):
    """Container to hold some configuration.

    Every value is read from the process environment when the object is
    constructed:

    * ``RABBITMQ_CONSUME_URI`` (required): stored as ``rabbitmq_consume_uri``.
    * ``RABBITMQ_QUEUE_NAME`` (required): stored as ``queue``.
    * ``PREFETCH`` (optional, default ``10``): stored as the integer
      ``prefetch_count``.
    * ``STOP_AFTER_MESSAGES`` (optional): stored as the integer ``stop_after``;
      ``None`` when unset, empty, or not a valid integer.
    * ``SAVEFILE`` (optional): output path, stored as ``savefile`` after ``~``
      expansion and conversion to an absolute path. Defaults to
      ``consumer-<unix timestamp>.json`` in the current working directory.

    :raises KeyError: if a required variable is missing.
    :raises ValueError: if ``PREFETCH`` is set but is not an integer.
    """

    def __init__(self):
        env = os.environ
        self.rabbitmq_consume_uri = env['RABBITMQ_CONSUME_URI']
        self.queue = env['RABBITMQ_QUEUE_NAME']
        self.prefetch_count = int(env.get('PREFETCH', '10'))

        raw_stop_after = env.get('STOP_AFTER_MESSAGES', '')
        try:
            self.stop_after = int(raw_stop_after)
        except ValueError:
            self.stop_after = None
            # An unset/blank value simply means "no limit". A non-blank value
            # that does not parse is most likely a typo, so say so instead of
            # silently consuming forever.
            if raw_stop_after.strip():
                logger.warning(
                    'Ignoring invalid STOP_AFTER_MESSAGES=%r; no message limit will be applied',
                    raw_stop_after,
                )

        default_savefile = 'consumer-{}.json'.format(int(time.time()))
        savefile = env.get('SAVEFILE', default_savefile)
        self.savefile = os.path.abspath(os.path.expanduser(savefile))
        logger.info('Savefile is %s', self.savefile)
class Worker(ConsumerMixin):
    """RabbitMQ consumer that consumes messages and saves them into a file, 1 message per line.

    Stops after consuming `config.stop_after` number of messages if provided.
    """

    def __init__(self, config=None):
        """Create the broker connection and the queue handle.

        :param config: object exposing ``rabbitmq_consume_uri``, ``queue``,
            ``prefetch_count``, ``stop_after`` and ``savefile``. When omitted,
            a ``WorkerConfig`` is built from the environment.
        """
        self.config = config if config is not None else WorkerConfig()
        # Read every setting through self.config: the raw `config` argument is
        # None when the caller relies on the default.
        self.connection = Connection(self.config.rabbitmq_consume_uri, heartbeat=30)
        # no_declare=True: the queue is consumed as-is and is not declared by
        # this client (see kombu's Queue documentation).
        self.queue = Queue(self.config.queue, no_declare=True)
        self.stop_after = self.config.stop_after
        self.message_counter = 0

    def get_consumers(self, Consumer, channel):
        """Return the consumers to run: a single one bound to the configured queue.

        This is the hook kombu's ``ConsumerMixin`` expects subclasses to provide.
        """
        return [
            Consumer(
                queues=[self.queue],
                callbacks=[self.on_message],
                prefetch_count=self.config.prefetch_count,
            ),
        ]

    @staticmethod
    def _body_as_text(body):
        """Render a message body as text suitable for writing on one line.

        ``str`` bodies are returned unchanged. Byte bodies are decoded as
        UTF-8 (undecodable bytes are replaced). Anything else, e.g. a payload
        that kombu already deserialized from JSON, is dumped back to JSON.
        """
        if isinstance(body, str):
            return body
        if isinstance(body, (bytes, bytearray, memoryview)):
            return bytes(body).decode('utf-8', errors='replace')
        return json.dumps(body)

    def on_message(self, body, message):
        """Log one message, append it to the savefile, then acknowledge it.

        :param body: decoded message body as delivered by kombu.
        :param message: the kombu message object; acknowledged only after the
            body has been written, so a failed write leaves it unacknowledged.
        """
        text = self._body_as_text(body)
        logger.info('Received message body with length: %d\n%s', len(text), text)
        with open(self.config.savefile, 'a') as f:
            f.write(text)
            f.write('\n')
        message.ack()
        self.message_counter += 1
        if self.stop_after is not None and self.message_counter == self.stop_after:
            # Limit reached: terminate the process with a success exit code.
            sys.exit(0)
if __name__ == '__main__':
    # Script entry point: build a worker from environment-provided settings
    # and consume until it stops or the user interrupts it.
    worker = Worker(config=WorkerConfig())
    try:
        worker.run()
    except KeyboardInterrupt:
        # Ctrl-C: flag the worker as stopped and fall through so the script
        # ends without a traceback.
        worker.should_stop = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment