Created
March 27, 2020 20:13
-
-
Save nicolasbock/d13154c966edd721c8f4dd6ee23e36dd to your computer and use it in GitHub Desktop.
rabbitmq tester
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import pika | |
import amqp | |
import sys | |
def parse_command_line(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"BROKER", | |
help="The IP address or hostname of the broker", | |
nargs="+") | |
parser.add_argument( | |
"--queue", | |
type=str, | |
default="test_queue") | |
parser.add_argument( | |
"--send", | |
metavar="MSG", | |
help="Send MSG to queue") | |
parser.add_argument( | |
"--get", | |
help="Get one message from queue", | |
action="store_true") | |
parser.add_argument( | |
"--list", | |
help="List messages in queue", | |
action="store_true") | |
return parser.parse_args() | |
def open_connection(broker, queue_name): | |
try: | |
connection = pika.BlockingConnection(pika.ConnectionParameters( | |
host=broker, | |
virtual_host="tester", | |
credentials=pika.PlainCredentials('tester', 'linux') | |
)) | |
except pika.exceptions.AMQPConnectionError: | |
print("connection failure for %s" % broker) | |
return None, None | |
channel = connection.channel() | |
channel.queue_declare(queue_name, durable=False) | |
return connection, channel | |
def main(options): | |
for broker in options.BROKER: | |
connection, channel = open_connection(broker, options.queue) | |
if connection is None: | |
continue | |
if options.send: | |
print("sending one message") | |
channel.basic_publish(exchange="", | |
routing_key=options.queue, | |
body=options.send) | |
break | |
elif options.get: | |
print("getting one message") | |
method_frame, header_frame, body = channel.basic_get(options.queue) | |
if method_frame: | |
print("message %s %s '%s'" % (method_frame, message_frame, body.decode("utf-8"))) | |
channel.basic_ack(method_frame.delivery_tag) | |
else: | |
print("no message received") | |
elif options.list: | |
print("message list on broker %s" % options.BROKER) | |
messages = [] | |
while True: | |
method_frame, header_frame, body = channel.basic_get(options.queue) | |
if method_frame: | |
messages.append((method_frame, header_frame, body)) | |
channel.basic_ack(method_frame.delivery_tag) | |
print("message %s %s body: '%s'" % (method_frame, header_frame, body.decode("utf-8"))) | |
else: | |
break | |
for _, _, body in messages: | |
channel.basic_publish(exchange="", | |
routing_key=options.queue, | |
body=body) | |
connection.close() | |
if __name__ == "__main__": | |
main(parse_command_line()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment