-
-
Save eguven/69ef059a6f94e438f0bc9c9b6326c27e to your computer and use it in GitHub Desktop.
RabbitMQ consumer that logs to console and file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# Copyright 2019, Bayes Esports Solutions GmbH, all rights reserved.
# Requirements:
#   kombu (pip install kombu)
# Usage:
#   $ export RABBITMQ_CONSUME_URI=amqps://{user}:{password}@{host}/{vhost}
#   $ export RABBITMQ_QUEUE_NAME=foobar.outbox
#   $ python logging_consumer.py
import json
import logging
import os
import sys
import time

from kombu import Connection, Queue, disable_insecure_serializers
from kombu.mixins import ConsumerMixin
# Console logging: timestamp, level and message for everything at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)

# Module-level logger used by the classes below.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Called once at import time, before any consumer is created.
disable_insecure_serializers()
class WorkerConfig(object):
    """Settings for the consumer, read from environment variables.

    Variables read:
        RABBITMQ_CONSUME_URI: required; ``KeyError`` if missing.
        RABBITMQ_QUEUE_NAME: required; ``KeyError`` if missing.
        PREFETCH: optional integer, defaults to 10.
        STOP_AFTER_MESSAGES: optional integer; unset or non-integer
            values leave ``stop_after`` as ``None``.
        SAVEFILE: optional output path, defaults to
            ``consumer-<unix timestamp>.json``; stored as an absolute
            path with ``~`` expanded.
    """

    def __init__(self):
        env = os.environ

        self.rabbitmq_consume_uri = env['RABBITMQ_CONSUME_URI']
        self.queue = env['RABBITMQ_QUEUE_NAME']
        self.prefetch_count = int(env.get('PREFETCH', '10'))

        # An empty or non-numeric value fails int() and means "no limit".
        raw_limit = env.get('STOP_AFTER_MESSAGES', '')
        try:
            self.stop_after = int(raw_limit)
        except ValueError:
            self.stop_after = None

        default_name = 'consumer-{}.json'.format(int(time.time()))
        requested_path = env.get('SAVEFILE', default_name)
        self.savefile = os.path.abspath(os.path.expanduser(requested_path))
        logger.info('Savefile is %s', self.savefile)
class Worker(ConsumerMixin):
    """RabbitMQ consumer that consumes messages and saves them into a file, 1 message per line.

    Stops after consuming `config.stop_after` number of messages if provided.

    Args:
        config: a ``WorkerConfig`` instance. When omitted (``None``), a new
            ``WorkerConfig`` is built from the environment.
    """

    def __init__(self, config=None):
        self.config = config if config is not None else WorkerConfig()
        # Read everything from self.config: the `config` argument itself may
        # be None, in which case attribute access on it would fail.
        self.connection = Connection(self.config.rabbitmq_consume_uri, heartbeat=30)
        self.queue = Queue(self.config.queue, no_declare=True)
        self.stop_after = self.config.stop_after
        self.message_counter = 0

    def get_consumers(self, Consumer, channel):
        """Return the single consumer for the configured queue.

        The consumer dispatches to `on_message` and uses the configured
        prefetch count.
        """
        return [
            Consumer(
                queues=[self.queue],
                callbacks=[self.on_message],
                prefetch_count=self.config.prefetch_count,
            ),
        ]

    @staticmethod
    def _body_to_text(body):
        """Return `body` as text suitable for writing to the savefile.

        `str` bodies are returned unchanged. Byte payloads are decoded as
        UTF-8 (undecodable bytes are replaced rather than raising). Any other
        object, e.g. a dict or list from an already-deserialized payload, is
        dumped as JSON.
        """
        if isinstance(body, str):
            return body
        if isinstance(body, (bytes, bytearray)):
            return bytes(body).decode('utf-8', errors='replace')
        return json.dumps(body)

    def on_message(self, body, message):
        """Log the message, append it to the savefile and acknowledge it.

        Exits the process with status 0 once `stop_after` messages have been
        consumed (when a limit is configured).
        """
        logger.info('Received message body with length: %d\n%s', len(body), body)
        with open(self.config.savefile, 'a', encoding='utf-8') as f:
            f.write(self._body_to_text(body))
            f.write('\n')
        # Ack only after the file has been written and closed.
        message.ack()
        self.message_counter += 1
        if self.stop_after is not None and self.message_counter == self.stop_after:
            sys.exit(0)
def main():
    """Build the configuration from the environment and run the consumer."""
    worker = Worker(config=WorkerConfig())
    try:
        worker.run()
    except KeyboardInterrupt:
        # Ctrl-C has already unwound run(); just record the stop flag.
        worker.should_stop = True


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment