Skip to content

Instantly share code, notes, and snippets.

@fabware
Created April 23, 2011 03:29
Show Gist options
  • Save fabware/938226 to your computer and use it in GitHub Desktop.
Save fabware/938226 to your computer and use it in GitHub Desktop.
# ---------- clients.py ----------
class PikaPubSubSubscribeClient(object):
    """Callback-driven subscriber built on ``pika.SelectConnection``.

    Setup is a chain of callbacks, each one triggered by the previous step:
    ``connect`` -> ``on_connected`` -> ``on_channel_open`` ->
    ``on_exchange_declared`` -> ``on_queue_declared`` -> ``on_queue_bind``.
    The last step registers ``message_processor`` as the consumer callback.

    Nothing happens until the caller starts the connection's ioloop
    (``client.connection.ioloop.start()``) after calling ``connect()``.
    """

    def __init__(self, message_processor,
                 host,
                 port,
                 virtual_host,
                 username,
                 password,
                 exchange_name='ground',
                 exchange_type='fanout',
                 queue_name='',
                 routing_key=''):
        """Store the settings; no network activity happens here.

        Args:
            message_processor: callable handed to ``basic_consume`` as the
                consumer callback.
            host: broker host passed to ``pika.ConnectionParameters``.
            port: broker port passed to ``pika.ConnectionParameters``.
            virtual_host: virtual host passed to
                ``pika.ConnectionParameters``.
            username: user name for ``pika.PlainCredentials``.
            password: password for ``pika.PlainCredentials``.
            exchange_name: exchange to declare and bind the queue to.
            exchange_type: type used when declaring the exchange.
            queue_name: queue name to request when declaring the queue.
                The default ``''`` is sent as-is (presumably the broker then
                generates a name -- confirm against the AMQP spec).
            routing_key: routing key used when binding the queue.
        """
        self.message_processor = message_processor
        self.host = host
        self.port = port
        self.virtual_host = virtual_host
        self.username = username
        self.password = password
        self.exchange_name = exchange_name
        self.exchange_type = exchange_type
        self.connected = False
        self.connecting = False
        self.connection = None
        self.channel = None
        # ``queue_name`` is overwritten with the name reported by the broker
        # once the queue is declared, so the name the caller asked for is
        # kept separately.  Every (re)declare uses the requested name and
        # never sends a broker-reported name back to the broker.
        self._requested_queue_name = queue_name
        self.queue_name = queue_name
        self.routing_key = routing_key

    def connect(self):
        """Create the ``pika.SelectConnection`` and register callbacks.

        Does nothing when a connection attempt is already in progress or
        the client is already connected.  If building the connection
        raises, the ``connecting`` flag is cleared before the exception
        propagates so that a later retry is possible.
        """
        if self.connecting or self.connected:
            return
        self.connecting = True
        try:
            credentials = pika.PlainCredentials(self.username, self.password)
            param = pika.ConnectionParameters(host=self.host,
                                              port=self.port,
                                              virtual_host=self.virtual_host,
                                              credentials=credentials)
            srs = pika.reconnection_strategies.SimpleReconnectionStrategy()
            self.connection = pika.SelectConnection(
                param,
                on_open_callback=self.on_connected,
                reconnection_strategy=srs)
            self.connection.add_on_close_callback(self.on_closed)
        except Exception:
            # Do not leave the client stuck in the "connecting" state.
            self.connecting = False
            raise

    def on_connected(self, connection):
        """Connection-open callback: record the state and open a channel."""
        self.connected = True
        self.connecting = False
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        """Channel-open callback: declare the exchange on the new channel."""
        self.channel = channel
        self.channel.exchange_declare(exchange=self.exchange_name,
                                      type=self.exchange_type,
                                      auto_delete=False,
                                      durable=True,
                                      callback=self.on_exchange_declared)

    def on_exchange_declared(self, frame):
        """Exchange-declared callback: declare the queue.

        The name requested at construction time is passed along; the
        original code dropped it, so a caller-supplied ``queue_name`` had
        no effect.
        """
        self.channel.queue_declare(queue=self._requested_queue_name,
                                   exclusive=True,
                                   durable=True,
                                   auto_delete=False,
                                   callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        """Queue-declared callback: remember the queue name and bind it."""
        # Use the name from the declare reply for the bind and consume steps.
        self.queue_name = frame.method.queue
        self.channel.queue_bind(exchange=self.exchange_name,
                                queue=self.queue_name,
                                routing_key=self.routing_key,
                                callback=self.on_queue_bind)

    def on_queue_bind(self, frame):
        """Queue-bound callback: start consuming with ``no_ack=True``."""
        self.channel.basic_consume(self.message_processor,
                                   queue=self.queue_name,
                                   no_ack=True)

    def on_closed(self, connection):
        """Connection-closed callback: reset the connection state flags.

        This is registered through ``add_on_close_callback``, so the
        connection is presumably already closed when it runs.  ``close()``
        is therefore only called when the connection still reports itself
        open, rather than unconditionally on a closed connection.
        """
        self.connected = False
        self.connecting = False
        self.channel = None
        if getattr(connection, 'is_open', False):
            connection.close()
# ------ test_sub.py ------------
def process_user_msg(ch, method, properties, body):
    """Consumer callback: decode, print and count one delivered message.

    ``body`` is decoded with ``simplejson.loads`` and printed together with
    the module-level ``counter``, which is then incremented.  The ``ch``,
    ``method`` and ``properties`` arguments are accepted but not used.
    """
    global counter
    payload = simplejson.loads(body)
    line = "[%d]: %r" % (counter, payload)
    # A single parenthesised argument prints the same on Python 2 and 3.
    print(line)
    # Pause for a tenth of a second before counting the message.
    time.sleep(0.1)
    counter = counter + 1
if __name__ == '__main__':
    # Script entry point: subscribe to the 'ground_user' exchange and run
    # the connection's ioloop until it stops, fails, or is interrupted.
    import traceback

    client = PikaPubSubSubscribeClient(
        process_user_msg,
        settings.MESSAGER_DEFAULT_HOST,
        settings.MESSAGER_DEFAULT_PORT,
        settings.MESSAGER_DEFAULT_VIRTUALHOST,
        settings.MESSAGER_DEFAULT_USERNAME,
        settings.MESSAGER_DEFAULT_PASSWORD,
        exchange_name='ground_user',
    )
    needs_close = False
    try:
        client.connect()
        client.connection.ioloop.start()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the subscriber, so no traceback
        # is printed, but the connection still has to be closed.
        needs_close = True
    except Exception:
        traceback.print_exc()
        needs_close = True
    # connect() can fail before a connection object exists, in which case
    # client.connection is still None and there is nothing to close.
    if needs_close and client.connection is not None:
        client.connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment