Created
February 1, 2014 15:56
-
-
Save minskmaz/8754062 to your computer and use it in GitHub Desktop.
Pika/tx publisher (broken - don't use)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pika | |
from pika.adapters.twisted_connection import TwistedConnection | |
from pika.adapters.twisted_connection import TwistedProtocolConnection | |
from pika.connection import ConnectionParameters | |
from pika import BasicProperties | |
from twisted.internet.task import LoopingCall | |
from twisted.python import log | |
import logging | |
from twisted.internet import task, protocol, reactor, defer | |
#from platform.utils.error_handling import genericErrback | |
#from platform.utils.persist import persist_published_rmq | |
class RMQPublish(object): | |
def __init__(self, rmquser,rmqpass,rmqhost,rmqport): | |
self.rmquser = rmquser | |
self.rmqpass = rmqpass | |
self.rmqhost = rmqhost | |
self.rmqport = rmqport | |
self.credentials = \ | |
pika.PlainCredentials(rmquser,rmqpass) | |
self.parameters = \ | |
pika.ConnectionParameters( | |
host = self.rmqhost, | |
port = self.rmqport, | |
virtual_host = '/', | |
credentials = self.credentials | |
) | |
def on_connected(self, connection): | |
d = connection.channel() | |
d.addCallback(self.got_channel) | |
return d | |
def got_channel(self, channel): | |
self.channel = channel | |
return channel | |
def publish(self, channel, msg): | |
properties = BasicProperties( | |
content_type = msg['content_type'], | |
delivery_mode = msg['delivery_mode'] | |
) | |
d = channel.basic_publish( | |
exchange=msg['exchange'], | |
routing_key=msg['routing_key'], | |
body = msg['body'], | |
properties=properties) | |
d.addErrback(lambda err: log.msg(str(err),level=logging.CRITICAL)) | |
return d | |
# HERE WE LOG THE MESSAGE | |
def after_publish(self, d, msg): | |
if not msg['routing_key'] == 'log': | |
#persist_published_rmq(msg) | |
pass | |
def end_looping_call(self, lc): | |
lc.stop() | |
def push_msg(self, channel, msg=None): | |
def push(): | |
d = self.publish(channel, msg) | |
d.addCallback(self.after_publish, msg) | |
lc = LoopingCall(push) | |
lc.start(0.2) | |
return lc | |
def __call__(self, msg={}): | |
cc = \ | |
protocol.ClientCreator( | |
reactor, | |
TwistedProtocolConnection, | |
self.parameters | |
) | |
d = cc.connectTCP(self.rmqhost, self.rmqport) | |
d.addCallback(lambda protocol: protocol.ready) | |
d.addCallback(self.on_connected) | |
d.addCallback(self.push_msg, msg=msg) | |
d.addCallback(self.end_looping_call) | |
#d.addErrback(genericErrback) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment