Skip to content

Instantly share code, notes, and snippets.

@minskmaz
Created February 1, 2014 15:56
Show Gist options
  • Save minskmaz/8754062 to your computer and use it in GitHub Desktop.
Save minskmaz/8754062 to your computer and use it in GitHub Desktop.
Pika/tx publisher (broken - don't use)
import pika
from pika.adapters.twisted_connection import TwistedConnection
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from pika import BasicProperties
from twisted.internet.task import LoopingCall
from twisted.python import log
import logging
from twisted.internet import task, protocol, reactor, defer
#from platform.utils.error_handling import genericErrback
#from platform.utils.persist import persist_published_rmq
class RMQPublish(object):
    """Publish a message to RabbitMQ through pika's Twisted adapter.

    An instance stores the broker address and credentials. Calling the
    instance with a message dict opens a new TCP connection, opens a
    channel on it and publishes the message.

    The message dict must provide the keys read by ``publish``:
    ``content_type``, ``delivery_mode``, ``exchange``, ``routing_key``
    and ``body``.

    NOTE(review): nothing in this class closes the connection opened by
    ``__call__``; every call leaves one connection behind. Confirm whether
    callers are expected to clean up.
    """

    def __init__(self, rmquser, rmqpass, rmqhost, rmqport):
        """Store the broker settings and build the pika connection parameters.

        :param rmquser: user name passed to ``pika.PlainCredentials``.
        :param rmqpass: password passed to ``pika.PlainCredentials``.
        :param rmqhost: broker host, used for both the pika parameters and
            the TCP connect in ``__call__``.
        :param rmqport: broker port, used the same way as ``rmqhost``.
        """
        self.rmquser = rmquser
        self.rmqpass = rmqpass
        self.rmqhost = rmqhost
        self.rmqport = rmqport
        self.credentials = pika.PlainCredentials(rmquser, rmqpass)
        self.parameters = pika.ConnectionParameters(
            host=self.rmqhost,
            port=self.rmqport,
            virtual_host='/',
            credentials=self.credentials,
        )

    def on_connected(self, connection):
        """Open a channel on *connection*.

        :returns: the Deferred returned by ``connection.channel()``, with
            ``got_channel`` attached so it fires with the channel.
        """
        d = connection.channel()
        d.addCallback(self.got_channel)
        return d

    def got_channel(self, channel):
        """Remember *channel* on the instance and pass it down the chain."""
        self.channel = channel
        return channel

    def publish(self, channel, msg):
        """Publish *msg* on *channel*.

        :param channel: object exposing ``basic_publish``.
        :param msg: dict with ``content_type``, ``delivery_mode``,
            ``exchange``, ``routing_key`` and ``body`` keys.
        :returns: a Deferred for the publish call. Failures are logged at
            CRITICAL level and then swallowed, so callbacks added by the
            caller also run after a failed publish.
        :raises KeyError: if *msg* lacks one of the required keys.
        """
        properties = BasicProperties(
            content_type=msg['content_type'],
            delivery_mode=msg['delivery_mode'],
        )
        # maybeDeferred copes with basic_publish returning either a Deferred
        # or a plain value, and turns a synchronous exception into a Failure
        # instead of letting it escape before the errback is attached.
        d = defer.maybeDeferred(
            channel.basic_publish,
            exchange=msg['exchange'],
            routing_key=msg['routing_key'],
            body=msg['body'],
            properties=properties,
        )
        d.addErrback(lambda err: log.msg(str(err), level=logging.CRITICAL))
        return d

    # HERE WE LOG THE MESSAGE
    def after_publish(self, d, msg):
        """Callback run after a publish attempt; currently a placeholder.

        :param d: the result delivered by the publish Deferred (unused).
        :param msg: the message dict that was published.

        Messages whose routing key is ``'log'`` are skipped; for all others
        the persistence call is still commented out, so nothing happens.
        """
        if msg['routing_key'] != 'log':
            #persist_published_rmq(msg)
            pass

    def end_looping_call(self, lc):
        """Stop the looping call *lc* if it is still running.

        ``LoopingCall.stop`` asserts that the loop is running, so a loop
        that already terminated (for example because the looped function
        raised) must not be stopped a second time.
        """
        if lc.running:
            lc.stop()

    def push_msg(self, channel, msg=None):
        """Start a ``LoopingCall`` that publishes *msg* every 0.2 seconds.

        :param channel: channel handed to ``publish``.
        :param msg: message dict handed to ``publish``.
        :returns: the started ``LoopingCall``; the caller is responsible
            for stopping it (``__call__`` does so via ``end_looping_call``).
        """
        def push():
            d = self.publish(channel, msg)
            d.addCallback(self.after_publish, msg)
            # Returning the Deferred lets LoopingCall wait for this publish
            # to finish before it schedules the next one.
            return d

        lc = LoopingCall(push)
        # start() returns a Deferred that fails if push() raises; log that
        # failure instead of leaving it unhandled.
        lc.start(0.2).addErrback(log.err)
        return lc

    def __call__(self, msg=None):
        """Connect to the broker, open a channel and publish *msg*.

        The looping call started by ``push_msg`` is stopped by the very
        next callback, so in this flow only the invocation that
        ``LoopingCall.start`` performs immediately takes place.

        :param msg: message dict (see ``publish``); defaults to an empty
            dict, which ``publish`` rejects with ``KeyError``.
        :returns: the Deferred driving the connect/publish chain, so callers
            can attach their own callbacks and errbacks.
        """
        if msg is None:
            msg = {}
        cc = protocol.ClientCreator(
            reactor,
            TwistedProtocolConnection,
            self.parameters,
        )
        d = cc.connectTCP(self.rmqhost, self.rmqport)
        # Wait for the AMQP handshake: ``ready`` is the protocol's own
        # Deferred. (Parameter renamed so it no longer shadows the imported
        # ``protocol`` module.)
        d.addCallback(lambda conn: conn.ready)
        d.addCallback(self.on_connected)
        d.addCallback(self.push_msg, msg=msg)
        d.addCallback(self.end_looping_call)
        #d.addErrback(genericErrback)
        return d
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment