Created
October 19, 2012 17:33
-
-
Save akorobov/3919513 to your computer and use it in GitHub Desktop.
test/demo code to receive messages from given rabbitmq's queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ***** BEGIN LICENSE BLOCK ***** | |
# | |
# For copyright and licensing please refer to COPYING. | |
# | |
# ***** END LICENSE BLOCK ***** | |
""" | |
Example of simple consumer using SSL. Acks each message as it arrives. | |
""" | |
from ssl import CERT_REQUIRED, PROTOCOL_SSLv3 | |
import sys | |
import logging | |
# Detect if we're running in a git repo | |
from os.path import exists, normpath | |
if exists(normpath('../pika')): | |
sys.path.insert(0, '..') | |
from pika.adapters import SelectConnection | |
from pika.connection import ConnectionParameters | |
#import pika.log as log | |
# Module-level state shared by the callbacks below. `connection` and
# `channel` start out unset; `queue_name` defaults to 'test' and may be
# replaced from the command line when the file is run as a script.
connection = channel = None
queue_name = "test"
def on_connected(connection):
    """Connection-open callback: announce the connection and request a channel.

    `connection` is the object handed to this callback; it is asked to open
    a channel, with `on_channel_open` registered as the follow-up callback.
    """
    print("demo_receive: Connected to RabbitMQ")
    connection.channel(on_channel_open)
def on_channel_open(channel_):
    """Channel-open callback: remember the channel and declare the queue.

    The received channel is stored in the module-level `channel` so later
    callbacks can reach it. The queue named by `queue_name` is then declared
    as durable, non-exclusive and not auto-deleted, with `on_queue_declared`
    registered as the completion callback.
    """
    global channel
    print("demo_receive: Received our Channel")
    channel = channel_
    channel_.queue_declare(
        queue=queue_name,
        durable=True,
        exclusive=False,
        auto_delete=False,
        callback=on_queue_declared,
    )
def on_queue_declared(frame):
    """Queue-declared callback: report the reply frame and start consuming.

    Registers `handle_delivery` as the consumer for `queue_name` on the
    module-level `channel`.
    """
    message = "demo_receive: Queue Declared %s" % frame
    print(message)
    channel.basic_consume(handle_delivery, queue=queue_name)
# Running total of delivered messages, and the interval (in messages) at
# which that total is reported.
count = 0
step = 5000


def handle_delivery(channel, method_frame, header_frame, body):
    """Consumer callback: count the message, report progress, and ack it.

    Individual messages are not printed; a single progress line is emitted
    each time the running total reaches a multiple of `step`. Every message
    is acknowledged on the channel it arrived on, using the delivery tag
    from `method_frame`. `header_frame` and `body` are accepted but unused.
    """
    global count
    count = count + 1
    if count % step == 0:
        print("received %d messages" % count)
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
if __name__ == '__main__':
    # Usage: <script> [queue_name] [host]
    logging.basicConfig(
        level=logging.INFO,
        format=('%(levelname) -10s %(asctime)s %(name) -35s '
                '%(funcName) -30s %(lineno) -5d: %(message)s'))

    # Client-certificate settings; change these to your own cert paths.
    # Uses certs as generated from http://www.rabbitmq.com/ssl.html
    ssl_options = {"ca_certs": "test.pem",
                   "certfile": "test.pem",
                   "keyfile": "test-key.pem",
                   "cert_reqs": CERT_REQUIRED}

    # Optional positional arguments override the defaults. A missing or
    # empty argument falls back to the default value.
    queue_name = sys.argv[1] if len(sys.argv) > 1 and sys.argv[1] else 'test'
    host = sys.argv[2] if len(sys.argv) > 2 and sys.argv[2] else '127.0.0.1'

    # Connect to RabbitMQ over SSL on port 5671. For a non-SSL connection,
    # use ConnectionParameters(host, 5672, ssl=False) instead.
    ssl_parameters = ConnectionParameters(host, 5671,
                                          ssl=True,
                                          ssl_options=ssl_options)
    connection = SelectConnection(ssl_parameters, on_connected)

    # Loop until CTRL-C
    try:
        # Start our blocking loop
        connection.ioloop.start()
    except KeyboardInterrupt:
        # Close the connection
        connection.close()
        # Loop until the connection is closed
        connection.ioloop.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment