Skip to content

Instantly share code, notes, and snippets.

@GaretJax
Created August 20, 2009 18:08
Show Gist options
  • Save GaretJax/171238 to your computer and use it in GitHub Desktop.
Save GaretJax/171238 to your computer and use it in GitHub Desktop.
A transport class and a factory for thrift clients
from tamqtransport import TAMQTransport
from thrift.protocol import TBinaryProtocol
class TAMQBinaryProtocol(TBinaryProtocol):
def readMessageEnd(self):
self.trans.notifyMessageEnd()
class TAMQClientFactory(object):
def __init__(self, channel, services_exchange='services',
responses_exchange='responses', protocol_class=None):
self.channel = channel
self.services_exchange = services_exchange
self.responses_exchange = responses_exchange
self.protocol_class = protocol_class or TAMQBinaryProtocol
self.channel.exchange_declare(self.services_exchange, "direct")
self.channel.exchange_declare(self.responses_exchange, "direct")
def get_client(self, client_class, routing_key):
queue, _, _ = self.channel.queue_declare(exclusive=True,
auto_delete=True)
channel.queue_bind(queue, self.responses_exchange, routing_key=queue)
transport = TAMQTransport(self.channel, queue, self.exchange,
routing_key)
protocol = self.protocol_class(transport)
return client_class(protocol)
from cStringIO import StringIO
from amqplib import client_0_8 as amqp
class TAMQPTransport(TTransportBase):
def __init__(self, channel, reply_queue, exchange, routing_key):
self.channel = channel
self.__wbuf = StringIO()
self.__rbuf = None
self.reply_to = reply_queue
self.exchange = exchange
self.routing_key = routing_key
channel.basic_consume(reply_queue, callback=self.incomingMessage,
no_ack=True)
def write(self, buf):
self.__wbuf.write(buf)
def flush(self):
msg = self.__wbuf.getvalue()
self.__wbuf = StringIO()
self.sendMessage(msg)
def sendMessage(self, message):
msg = amqp.Message(message, reply_to=self.reply_to,
application_headers={'thriftClientName' : self.routing_key})
self.channel.basic_publish(msg, self.exchange, self.routing_key)
def notifyMessageEnd(self):
self.__rbuf = None
def incomingMessage(self, msg):
self.__rbuf = StringIO()
self.__rbuf.write(msg.body)
self.__rbuf.seek(0)
def read(self, sz):
if not self.__rbuf:
self.channel.wait()
return self.__rbuf.read(sz)
from service_api import FluidDB, Parrot # Imaginary thrift services
from tamqpclientfactory import TAMQClientFactory
from amqplib import client_0_8 as amqp
connection = amqp.Connection()
channel = connection.channel()
factory = TAMQClientFactory(channel)
fluiddb = factory.get_client(FluidDB.Client, 'query_browser')
results = fluiddb.query("hey man, don't have looked at this now...")
parrot = factory.get_client(Parrot.Client, 'mypersonalparrot')
print parrot.feed()
channel.close()
connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment