Skip to content

Instantly share code, notes, and snippets.

@jdmaturen
Forked from imlucas/thrift_gevent.py
Created March 3, 2011 23:23
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jdmaturen/853845 to your computer and use it in GitHub Desktop.
Save jdmaturen/853845 to your computer and use it in GitHub Desktop.
import logging
import gevent
from gevent.queue import Queue
from thrift.server.TServer import TServer
from thrift.transport.TTransport import TTransportException
class TGEventServer(TServer):
    """Thrift server that serves every accepted connection in its own greenlet."""

    def serve(self):
        """Listen on the server transport and spawn one greenlet per client.

        Loops forever: each accepted client is handed to
        ``_process_socket`` in a freshly spawned greenlet so the accept
        loop is immediately free to take the next connection.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            gevent.spawn(self._process_socket, client)

    def _process_socket(self, client):
        """Greenlet body: process requests from a single client until it ends.

        Wraps ``client`` with the configured transport and protocol
        factories, then calls the processor repeatedly until an exception
        is raised. Both transports are always closed afterwards.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # A transport-level failure is treated as the ordinary end of
            # the connection, so it is deliberately not reported.
            pass
        except Exception as exc:
            # Anything else is unexpected: record it instead of silently
            # dropping it, but do not let it escape the greenlet.
            logging.exception(exc)
        finally:
            # Nested so the output transport is closed even when closing
            # the input transport fails or the greenlet is being killed.
            try:
                itrans.close()
            finally:
                otrans.close()
class TGEventPoolServer(TServer):
    """Gevent socket server with a fixed pool of worker greenlets.

    Accepted clients are placed on a shared queue; each worker greenlet
    takes one client at a time from that queue and serves it until the
    connection ends.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the server.

        Positional arguments are forwarded unchanged to
        ``TServer.__init__``. Keyword arguments are accepted for call
        compatibility but are not used.
        """
        TServer.__init__(self, *args)
        # Accepted clients waiting for a free worker greenlet.
        self.clients = Queue()
        # Number of worker greenlets started by serve(); see setNumGreenlets.
        self.greenlets = 10

    def setNumGreenlets(self, num):
        """Set the number of worker greenlets that should be created.

        Only affects workers started by a later call to ``serve``.
        """
        self.greenlets = num

    def serveGreenlet(self):
        """Worker loop: take clients from the shared queue and serve them.

        Runs forever. Any exception raised while fetching or serving a
        client is logged so that the worker keeps running.
        """
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as exc:
                logging.exception(exc)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible.

        Wraps ``client`` with the configured transport and protocol
        factories, then calls the processor repeatedly until an exception
        is raised. Both transports are always closed afterwards.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # A transport-level failure is treated as the ordinary end of
            # the connection, so it is deliberately not reported.
            pass
        except Exception as exc:
            logging.exception(exc)
        finally:
            # Nested so the output transport is closed even when closing
            # the input transport fails or the greenlet is being killed.
            try:
                itrans.close()
            finally:
                otrans.close()

    def serve(self):
        """Start the worker greenlets, then feed accepted clients to the queue.

        Loops forever accepting connections. Failures to spawn a worker or
        to accept a client are logged and do not stop the server.
        """
        for _ in range(self.greenlets):
            try:
                gevent.spawn(self.serveGreenlet)
            except Exception as exc:
                logging.exception(exc)

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                self.clients.put(client)
            except Exception as exc:
                logging.exception(exc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment