Skip to content

Instantly share code, notes, and snippets.

@fabware
Created April 23, 2011 10:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fabware/938517 to your computer and use it in GitHub Desktop.
Save fabware/938517 to your computer and use it in GitHub Desktop.
--------- clients.py --------
class PikaPubSubPublishClient(object):
def __init__(self, host=settings.APP_MESSAGER_DEFAULT_HOST,
port=settings.APP_MESSAGER_DEFAULT_PORT,
virtual_host=settings.APP_MESSAGER_DEFAULT_VIRTUALHOST,
username=settings.APP_MESSAGER_DEFAULT_USERNAME,
password=settings.APP_MESSAGER_DEFAULT_PASSWORD,
exchange_name='ground',
exchange_type='fanout'):
self.host = host
self.port = port
self.virtual_host = virtual_host
self.username = username
self.password = password
self.exchange_name = exchange_name
self.exchange_type = exchange_type
self.connected = False
self.connecting = False
self.connection = None
self.channel = None
def connect(self):
if self.connecting:
return
self.connecting = True
credentials = pika.PlainCredentials(self.username, self.password)
param = pika.ConnectionParameters(host=self.host,
port=self.port,
virtual_host=self.virtual_host,
credentials=credentials)
srs = pika.reconnection_strategies.SimpleReconnectionStrategy()
self.connection = TornadoConnection(param,
on_open_callback=self.on_connected,
reconnection_strategy=srs)
self.connection.add_on_close_callback(self.on_closed)
def on_connected(self, connection):
self.connected = True
# self.connecting = False
self.connection = connection
self.connection.channel(self.on_channel_open)
def on_channel_open(self, channel):
self.channel = channel
self.channel.exchange_declare(exchange=self.exchange_name,
type=self.exchange_type,
auto_delete=False,
durable=True)
def on_closed(self, connection):
connection.close()
def publish_message(self, msg, routing_key=''):
if not self.channel:
print 'channel not established'
return
if settings.ENABLE_MESSAGING:
msg_str = simplejson.dumps(msg)
try:
properties = pika.BasicProperties(content_type="text/plain",
delivery_mode=2)
self.channel.basic_publish(exchange=self.exchange_name,
routing_key=routing_key,
body=msg_str,
properties=properties
)
print 'publish message, sent %s'%msg_str
except Exception, e:
logging.error('publish message error: %s'%e)
else:
print 'messaging not enabled'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment