Skip to content

Instantly share code, notes, and snippets.

@vgoklani
Created January 30, 2013 01:10
Show Gist options
  • Save vgoklani/4669710 to your computer and use it in GitHub Desktop.
Save vgoklani/4669710 to your computer and use it in GitHub Desktop.
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
import sys, time, json, logging
import pika
hostname = 'localhost'
logging.getLogger('pika').setLevel(logging.ERROR)
class RabbitMQ(object):
def __init__(self, hostname, queue, exchange = ''):
self.hostname = hostname
self.queue = queue
self.exchange = ''
self.connection = None
self.channel = None
self.connect()
def connect(self):
sys.stdout.write('\nOpening RabbitMQ connection via %s for %s' \
% (self.hostname, self.queue))
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.hostname))
self.channel = self.connection.channel()
self.channel.queue_declare(queue = self.queue, durable = True)
def __del__(self):
sys.stdout.write('\nClosing RabbitMQ connection via %s for %s' \
% (self.hostname, self.queue))
self.connection.close()
# producer
def publish(self, messages, display_output=False):
for message in messages:
self.channel.basic_publish(exchange=self.exchange, routing_key = self.queue,
body = message, properties = pika.BasicProperties(delivery_mode = 2))
if display_output == True:
sys.stdout.write('\n[x] Sent! %r' % (message,))
def callback(self, ch, method, properties, body, display_output=True):
message = json.loads(body)
if display_output == True:
sys.stdout.write('\n[x] Received %r' % (message))
ch.basic_ack(delivery_tag = method.delivery_tag)
time.sleep(1)
# use carefully, as the consumers will have to restarted!
def flush(self):
self.channel.queue_delete(queue=self.queue)
self.connect()
# consumer
def consumer(self, callback=None):
if callback is None:
callback = self.callback
sys.stdout.write('\n[*] Waiting for messages. To exit press CTRL+C\n\n')
try:
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(callback, queue=self.queue)
self.channel.start_consuming()
except KeyboardInterrupt:
print 'RabbitMQ -> keyboard interruption!!!!!'
self.channel.stop_consuming()
finally:
self.connection.close()
def flush(queue):
queue.flush()
queue = RabbitMQ(hostname, queue.queue)
def test_producer(queue=None):
if queue is None:
queue = RabbitMQ(hostname, 'test')
messages = [json.dumps( {i : 'message #' + str(i)} ) for i in range(1000)]
queue.publish(messages, display_output=True)
def test_consumer(queue=None):
if queue is None:
queue = RabbitMQ(hostname, 'test')
queue.consumer()#callback)
def test():
queue = RabbitMQ(hostname, 'test')
test_producer(queue)
time.sleep(5)
test_consumer(queue)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment