Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import sys, json, pika
from pika.connection import SimpleReconnectionStrategy
def msg_rcvd(channel, method, header, body):
    """Handle one delivery: print its payload and acknowledge it.

    Every delivery is acknowledged, including the ones that are discarded,
    so an unusable message is not left unacknowledged on the broker.

    Args:
        channel: Channel the delivery arrived on; used to send the ack.
        method: Delivery metadata; supplies ``delivery_tag``.
        header: Message properties; supplies ``content_type``.
        body: Raw payload. Expected to decode to a JSON object with a
            ``content`` member and a numeric ``time`` member.
    """
    if header.content_type != "application/json":
        print("Discarding message. Not JSON.")
        channel.basic_ack(delivery_tag=method.delivery_tag)
        return

    try:
        message = json.loads(body)
        text = "Received: %(content)s/%(time)d" % message
    except (ValueError, KeyError, TypeError):
        # ValueError: body is not parseable JSON (or not decodable text).
        # KeyError:   the object lacks "content" or "time".
        # TypeError:  the payload is not an object, or "time" is not numeric.
        # Without this guard the exception would skip the ack below.
        print("Discarding message. Malformed payload.")
        channel.basic_ack(delivery_tag=method.delivery_tag)
        return

    print(text)
    channel.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == "__main__":
    # Usage: <script> <amqp_host> <amqp_port>
    # Fail with a readable message instead of a bare IndexError when the
    # broker address is not supplied.
    if len(sys.argv) < 3:
        sys.stderr.write("Usage: %s <amqp_host> <amqp_port>\n" % sys.argv[0])
        sys.exit(1)

    AMQP_SERVER = sys.argv[1]
    AMQP_PORT = int(sys.argv[2])

    # Connection settings for the broker, using the "guest"/"guest"
    # account on the "/" virtual host.
    creds_broker = pika.PlainCredentials("guest", "guest")
    conn_params = pika.ConnectionParameters(AMQP_SERVER,
                                            port=AMQP_PORT,
                                            virtual_host="/",
                                            credentials=creds_broker,
                                            heartbeat=10)

    class CustomReconnectionStrategy(SimpleReconnectionStrategy):
        """Reconnection strategy that sets up the consumer on each open.

        Overrides ``on_connection_open`` to declare the exchange, queue,
        and binding and to register ``msg_rcvd`` as the consumer.
        """

        def on_connection_open(self, conn):
            """Declare the "cluster_test" topology and start consuming.

            Args:
                conn: The connection that has just been opened.
            """
            # NOTE(review): presumably resets the parent strategy's retry
            # state after a successful connect; confirm against the
            # installed pika version.
            self._reset()
            channel = conn.channel()
            channel.exchange_declare(exchange="cluster_test",
                                     type="direct",
                                     auto_delete=False)
            channel.queue_declare(queue="cluster_test", auto_delete=False)
            channel.queue_bind(queue="cluster_test",
                               exchange="cluster_test",
                               routing_key="cluster_test")
            # no_ack=False: msg_rcvd acknowledges each delivery itself.
            channel.basic_consume(msg_rcvd,
                                  queue="cluster_test",
                                  no_ack=False,
                                  consumer_tag="cluster_test")
            print("Ready for testing!")

    reconnect = CustomReconnectionStrategy()
    conn_broker = pika.AsyncoreConnection(conn_params,
                                          reconnection_strategy=reconnect)
    # Run the asyncore event loop that drives the connection.
    pika.asyncore_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment