@Arrayly
Created April 21, 2023 15:00
RabbitMQ Test Connection Python
import pika
private_ip = "10.0.2.248"
# Construct the RabbitMQ connection string
rabbitmq_connection_string = f"amqp://sdkuser:sdkuserfa77e8704f07404ab4b923b1fdd23608@{private_ip}:5672/sdk"
# Establish a connection to RabbitMQ
connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_connection_string))
channel = connection.channel()
# Declare a test queue, publish a message, and consume the message
queue_name = "test_queue"
channel.queue_declare(queue=queue_name)
message = "Hello RabbitMQ!"
channel.basic_publish(exchange='', routing_key=queue_name, body=message)
print(f"Sent: {message}")
method_frame, _, body = channel.basic_get(queue=queue_name)
if method_frame:
    print(f"Received: {body.decode()}")
    channel.basic_ack(method_frame.delivery_tag)
else:
    print("No message received")
# Close the connection
connection.close()
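
The script above hard-codes the credentials in the connection string. A minimal sketch of one alternative, assuming the values come from environment variables: the `RABBITMQ_*` variable names and the `build_amqp_url` helper are hypothetical and not part of the original gist. Percent-encoding the user, password, and vhost keeps characters such as `@` or `/` from breaking the URL before it is passed to `pika.URLParameters`.

```python
import os
from urllib.parse import quote


def build_amqp_url(user, password, host, port=5672, vhost="sdk"):
    """Return an AMQP URL with user, password, and vhost percent-encoded.

    Hypothetical helper for illustration; the default vhost "sdk" and
    port 5672 mirror the values used in the script above.
    """
    return (
        f"amqp://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}"
    )


if __name__ == "__main__":
    # The RABBITMQ_* names are assumptions; use whatever your deployment defines.
    url = build_amqp_url(
        os.environ.get("RABBITMQ_USER", "guest"),
        os.environ.get("RABBITMQ_PASSWORD", "guest"),
        os.environ.get("RABBITMQ_HOST", "localhost"),
    )
    # Print only the host part so the password does not end up in logs.
    print("Connecting to", url.split("@", 1)[1])
```

The resulting string can be passed to `pika.URLParameters` in place of `rabbitmq_connection_string`.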