Skip to content

Instantly share code, notes, and snippets.

@nicolasbock
Created March 27, 2020 20:13
Show Gist options
  • Save nicolasbock/d13154c966edd721c8f4dd6ee23e36dd to your computer and use it in GitHub Desktop.
Save nicolasbock/d13154c966edd721c8f4dd6ee23e36dd to your computer and use it in GitHub Desktop.
rabbitmq tester
#!/usr/bin/env python
import argparse
import pika
import amqp
import sys
def parse_command_line(argv=None):
    """Parse the command line of the RabbitMQ tester.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, which
            is what the script entry point relies on.

    Returns:
        argparse.Namespace: with the attributes ``BROKER`` (list of one or
        more broker addresses), ``queue`` (queue name, ``"test_queue"`` by
        default), ``send`` (message string or ``None``), ``get`` (bool) and
        ``list`` (bool).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "BROKER",
        help="The IP address or hostname of the broker",
        nargs="+")
    parser.add_argument(
        "--queue",
        type=str,
        default="test_queue",
        help="The name of the queue to use (default: %(default)s)")
    parser.add_argument(
        "--send",
        metavar="MSG",
        help="Send MSG to queue")
    parser.add_argument(
        "--get",
        help="Get one message from queue",
        action="store_true")
    parser.add_argument(
        "--list",
        help="List messages in queue",
        action="store_true")
    return parser.parse_args(argv)
def open_connection(broker, queue_name):
    """Connect to a broker and declare the test queue on a new channel.

    The connection uses the virtual host ``tester`` and the plain
    credentials ``tester`` / ``linux`` hard-coded below.

    Args:
        broker: Host name or IP address passed to pika as ``host``.
        queue_name: Name of the queue to declare (non-durable).

    Returns:
        A ``(connection, channel)`` tuple, or ``(None, None)`` when pika
        raises ``AMQPConnectionError`` while connecting (a message is
        printed in that case).

    Raises:
        Any exception raised while opening the channel or declaring the
        queue is propagated after the connection has been closed.
    """
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=broker,
            virtual_host="tester",
            credentials=pika.PlainCredentials('tester', 'linux')
        ))
    except pika.exceptions.AMQPConnectionError:
        print("connection failure for %s" % broker)
        return None, None
    try:
        channel = connection.channel()
        channel.queue_declare(queue_name, durable=False)
    except Exception:
        # The caller never sees this connection, so it must be closed here
        # or it would leak when channel setup fails.
        if connection.is_open:
            connection.close()
        raise
    return connection, channel
def main(options):
    """Run the requested action against the brokers in ``options.BROKER``.

    Brokers that cannot be connected to are skipped. Exactly one of the
    actions is performed per broker, checked in this order:

    * ``options.send``: publish the message to the queue through the
      default exchange, then stop after the first broker that accepted a
      connection.
    * ``options.get``: fetch, print and acknowledge a single message.
    * ``options.list``: drain the queue, printing every message, then
      publish the drained messages back to the queue.

    Args:
        options: Namespace as returned by ``parse_command_line`` with the
            attributes ``BROKER``, ``queue``, ``send``, ``get`` and ``list``.
    """
    for broker in options.BROKER:
        connection, channel = open_connection(broker, options.queue)
        if connection is None:
            continue
        try:
            if options.send:
                print("sending one message")
                channel.basic_publish(exchange="",
                                      routing_key=options.queue,
                                      body=options.send)
                # Only the first reachable broker gets the message; the
                # ``finally`` below still closes the connection.
                break
            elif options.get:
                print("getting one message")
                method_frame, header_frame, body = channel.basic_get(
                    options.queue)
                if method_frame:
                    # errors="replace" keeps a non-UTF-8 body from raising
                    # before the message is acknowledged.
                    print("message %s %s '%s'" % (
                        method_frame, header_frame,
                        body.decode("utf-8", errors="replace")))
                    channel.basic_ack(method_frame.delivery_tag)
                else:
                    print("no message received")
            elif options.list:
                print("message list on broker %s" % broker)
                messages = []
                while True:
                    method_frame, header_frame, body = channel.basic_get(
                        options.queue)
                    if not method_frame:
                        break
                    messages.append((method_frame, header_frame, body))
                    channel.basic_ack(method_frame.delivery_tag)
                    # The message is already acked at this point, so the
                    # decode must not raise or the drained messages would
                    # never be published back.
                    print("message %s %s body: '%s'" % (
                        method_frame, header_frame,
                        body.decode("utf-8", errors="replace")))
                # Put the drained messages back, keeping their original
                # properties so that listing does not alter them.
                for _, properties, body in messages:
                    channel.basic_publish(exchange="",
                                          routing_key=options.queue,
                                          body=body,
                                          properties=properties)
        finally:
            # Guard against closing a connection that already went away,
            # which would mask the original exception.
            if connection.is_open:
                connection.close()
# Script entry point: parse the command line, then run the requested action.
if __name__ == "__main__":
    cli_options = parse_command_line()
    main(cli_options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment