Skip to content

Instantly share code, notes, and snippets.

@sboily
Created November 29, 2019 19:40
Show Gist options
  • Save sboily/05d77029f5345e46f70c7886475bbb41 to your computer and use it in GitHub Desktop.
Save sboily/05d77029f5345e46f70c7886475bbb41 to your computer and use it in GitHub Desktop.
import argparse
import kombu
import logging
from kombu.mixins import ConsumerMixin
from pprint import pformat
# Topic exchange named 'xivo'; the consumer's queue is bound to it.
EXCHANGE = kombu.Exchange('xivo', type='topic')

# Configure logging once at import time: INFO and above, with timestamp,
# process id, level and logger name in front of every message.
logging.basicConfig(
    format='%(asctime)s [%(process)d] (%(levelname)s) (%(name)s): %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class C(ConsumerMixin):
    """Consume messages from EXCHANGE and tally them per event name.

    ``stats`` maps each event name (the ``name`` field of a message body) to
    the number of messages seen with that name. The ``count`` key holds the
    total number of messages received.

    NOTE: ``count`` shares the dict with the event names, so an event that is
    literally named ``count`` would be added to the total's slot.
    """

    def __init__(self, connection, routing_key):
        """Remember the connection and binding key, and reset the tally.

        :param connection: kombu connection; ``ConsumerMixin`` reads it from
            ``self.connection``.
        :param routing_key: routing key used to bind the queue to EXCHANGE.
        """
        self.connection = connection
        self.routing_key = routing_key
        self.stats = dict(count=0)

    def get_consumers(self, Consumer, channel):
        """Return the single consumer used by ``ConsumerMixin.run()``.

        The queue is unnamed and exclusive, bound to EXCHANGE with the
        routing key given at construction time.

        :param Consumer: consumer factory supplied by ``ConsumerMixin``.
        :param channel: channel supplied by ``ConsumerMixin`` (unused here).
        """
        queue = kombu.Queue(
            exchange=EXCHANGE,
            routing_key=self.routing_key,
            exclusive=True,
        )
        return [Consumer(queue, callbacks=[self.on_message])]

    def on_message(self, body, message):
        """Count one received message and acknowledge it.

        A body that has no ``name`` entry, is not subscriptable by string, or
        carries an unhashable name is logged and counted only in the total.
        Letting the exception escape would stop the consumer loop and leave
        the message unacknowledged.

        :param body: decoded message payload, expected to be a mapping with
            a ``name`` entry.
        :param message: message object; ``ack()`` is called on it.
        """
        try:
            name = body['name']
            self.stats[name] = self.stats.get(name, 0) + 1
        except (KeyError, TypeError):
            logger.warning('Message without a usable event name: %r', body)
        self.stats['count'] += 1
        message.ack()

    def msg(self):
        """Return the live tally dict (not a copy)."""
        return self.stats
def main():
    """Consume from the RabbitMQ 'xivo' exchange until interrupted.

    Parses the command line (broker hostname, port, routing key), connects
    with the hard-coded ``guest:guest`` credentials, and runs the consumer.
    On Ctrl-C the per-event tally collected so far is printed.
    """
    # The text must be passed as ``description``: the first positional
    # argument of ArgumentParser is ``prog``, which would put this sentence
    # in the usage line in place of the program name.
    parser = argparse.ArgumentParser(
        description='read RabbitMQ xivo exchange')
    parser.add_argument('-n', '--hostname', help='RabbitMQ server',
                        default='localhost')
    parser.add_argument('-p', '--port', help='Port of RabbitMQ',
                        default='5672')
    parser.add_argument('-r', '--routing-key',
                        help='Routing key to bind on bus',
                        dest='routing_key', default='#')
    args = parser.parse_args()

    url_amqp = 'amqp://guest:guest@%s:%s//' % (args.hostname, args.port)
    with kombu.Connection(url_amqp) as conn:
        ws = C(conn, args.routing_key)
        try:
            ws.run()
        except KeyboardInterrupt:
            # One pre-formatted string prints identically on Python 2 and 3.
            # The two spaces after the colon reproduce what the original
            # Python 2 ``print a, b`` statement emitted.
            print('Message number:  %s' % pformat(ws.msg()))


# Run only when executed as a script, so importing the module has no side
# effect beyond the definitions above.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment