Created
May 30, 2012 06:31
-
-
Save blackwithwhite666/2834098 to your computer and use it in GitHub Desktop.
Cython - gevent - thrift server. Prototype.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# cython: profile=True | |
from gevent import socket | |
import logging | |
import struct | |
from libcpp.string cimport string | |
cimport cython | |
from cpython cimport bool | |
from gevent.hub import get_hub | |
from gevent.event import Event | |
# States of the Connection state machine (see the Connection docstring).
cdef enum ConnectionStates:
    WAIT_LEN = 0      # connection is reading the request length
    WAIT_MESSAGE = 1  # connection is reading the request body
    WAIT_PROCESS = 2  # whole request was read; waiting for the reply
    SEND_ANSWER = 3   # connection is sending the answer (length included)
    CLOSED = 4        # socket was closed; connection should be dropped
def socket_exception(func):
    """Decorator: close the object when the wrapped method hits socket.error.

    The wrapped callable must be a method of an object exposing ``close()``.
    If ``func`` raises ``socket.error`` the error is swallowed, ``self.close()``
    is called and the wrapper returns ``None``; otherwise the result of
    ``func`` is returned unchanged.
    """
    # Imported locally so the decorator does not depend on a file-level import.
    import functools

    # wraps() keeps the original name/docstring visible on the wrapper.
    @functools.wraps(func)
    def inner(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except socket.error:
            self.close()
    return inner
@cython.final
cdef class Connection(object):
    """Length-prefixed (framed) connection driven by gevent I/O watchers.

    The connection is always in exactly one of these states:
        WAIT_LEN --- connection is reading the 4-byte request length.
        WAIT_MESSAGE --- connection is reading the request body.
        WAIT_PROCESS --- connection has just read the whole request and
            waits for set_reply() to be called.
        SEND_ANSWER --- connection is sending the answer string (including
            the length of the answer).
        CLOSED --- socket was closed and the connection should be dropped.

    Typical use: call get_request() to block until a full frame arrived
    (or the connection was closed), then call set_reply() with the answer.
    """

    cdef object socket          # client socket passed to __init__
    cdef object format          # struct.Struct('!i'): the frame length prefix
    cdef ConnectionStates status
    cdef size_t len             # body size of the frame currently being read
    cdef string *message        # buffer for incoming / outgoing bytes
    cdef object event           # set when a request is ready or on close
    cdef object read_watcher
    cdef object write_watcher

    def __cinit__(self):
        self.message = new string()
        self.len = 0
        self.status = WAIT_LEN

    def __init__(self, client_socket):
        """Wrap *client_socket* and start waiting for the first request."""
        self.socket = client_socket
        self.format = struct.Struct('!i')
        self.event = Event()
        loop = get_hub().loop
        # NOTE(review): 1 and 2 are presumably the loop's READ / WRITE event
        # masks -- confirm against the gevent loop API.
        self.read_watcher = loop.io(self.socket.fileno(), 1)
        self.write_watcher = loop.io(self.socket.fileno(), 2)
        self.read_watcher.priority = loop.MAXPRI
        self.write_watcher.priority = loop.MAXPRI
        self.start_listen_read()

    def __dealloc__(self):
        del self.message

    # ``except *`` lets Python exceptions propagate out of these cdef void
    # methods; without it they would be printed and dropped, so the
    # try/except blocks in on_readable / on_writable could never see them.
    cdef inline void _read_len(self) except *:
        """Read (the rest of) the 4-byte frame length.

        Switches to WAIT_MESSAGE once a positive length has been read and
        closes the connection on EOF or on a non-positive length.
        """
        cdef int read_length
        cdef long frame_size
        chunk = self.socket.recv(4 - self.message.size())
        read_length = len(chunk)
        if read_length == 0:
            # if we read 0 bytes and self.message is empty, it means the
            # client closed the connection
            if self.message.size() != 0:
                logging.error("can't read frame size from socket")
            self.close()
            return
        self.message.append(<char *>chunk, read_length)
        if self.message.size() == 4:
            # Unpack into a signed local first: self.len is an unsigned
            # size_t, so a negative value could never be detected there.
            frame_size = self.format.unpack(self.content())[0]
            if frame_size < 0:
                logging.error("negative frame size, it seems client"
                              " doesn't use FramedTransport")
                self.close()
            elif frame_size == 0:
                logging.error("empty frame, it's really strange")
                self.close()
            else:
                self.len = frame_size
                self.message.clear()
                self.status = WAIT_MESSAGE

    cdef void read(self) except *:
        """Read available data from the socket and switch state."""
        cdef int read_length = 0
        assert self.is_readable()
        if self.status == WAIT_LEN:
            self._read_len()
            # go back to the main loop here for simplicity instead of
            # falling through, even though there is a good chance that
            # the message is already available
        elif self.status == WAIT_MESSAGE:
            chunk = self.socket.recv(self.len - self.message.size())
            read_length = len(chunk)
            if read_length == 0:
                logging.error("can't read frame from socket (get %d of %d bytes)" %
                              (self.message.size(), self.len))
                self.close()
                return
            self.message.append(<char *>chunk, read_length)
            if self.message.size() == self.len:
                self.status = WAIT_PROCESS

    cdef void write(self) except *:
        """Write buffered data to the socket and switch state.

        Goes back to WAIT_LEN once the whole buffer was sent; otherwise
        keeps only the unsent tail in the buffer.
        """
        cdef string rest
        assert self.is_writeable()
        sent = self.socket.send(self.content())
        if sent == self.message.size():
            self.status = WAIT_LEN
            self.message.clear()
            self.len = 0
        else:
            rest = self.message.substr(sent, self.message.size() - sent)
            self.message.assign(rest)

    cdef void ready(self, all_ok, message) except *:
        """Store the processing result and switch state.

        The connection ends up in one of three states:
            WAIT_LEN if the request was oneway (empty *message*).
            SEND_ANSWER if the request was processed in the normal way.
            CLOSED if *all_ok* is false.
        """
        assert self.is_ready()
        if not all_ok:
            self.close()
            return
        self.len = 0
        if len(message) == 0:
            # it was a oneway request, do not write answer
            self.message.clear()
            self.status = WAIT_LEN
        else:
            reply = self.format.pack(len(message)) + message
            self.message.assign(<char *>reply, len(reply))
            self.status = SEND_ANSWER

    cpdef close(self):
        "Closes connection"
        self.status = CLOSED
        self.stop_listen_read()
        self.stop_listen_write()
        self.socket.close()

    @cython.profile(False)
    cpdef inline bool is_writeable(self):
        "Returns True if connection is sending an answer."
        return self.status == SEND_ANSWER

    @cython.profile(False)
    cpdef inline bool is_readable(self):
        "Returns True if connection is waiting for request data."
        return self.status == WAIT_LEN or self.status == WAIT_MESSAGE

    @cython.profile(False)
    cpdef inline bool is_closed(self):
        "Returns True if connection is closed."
        return self.status == CLOSED

    @cython.profile(False)
    cpdef inline bool is_ready(self):
        "Returns True if a whole request was read and awaits a reply."
        return self.status == WAIT_PROCESS

    cpdef inline bytes content(self):
        "Returns a bytes copy of the current buffer."
        cdef bytes s = self.message.c_str()[:self.message.size()]
        return s

    cdef inline void start_listen_read(self):
        self.read_watcher.start(self.on_readable)

    cdef inline void start_listen_write(self):
        self.write_watcher.start(self.on_writable)

    cdef inline void stop_listen_read(self):
        self.read_watcher.stop()

    cdef inline void stop_listen_write(self):
        self.write_watcher.stop()

    cpdef on_readable(self):
        """Read watcher callback: read data, wake the waiter when done."""
        assert self.is_readable()
        try:
            self.read()
            if self.is_ready() or self.is_closed():
                self.stop_listen_read()
                self.event.set()
            else:
                assert self.is_readable()
        except Exception:
            # logging.exception records the traceback of the real failure
            logging.exception("invalid state after read")
            self.close()
            self.event.set()

    cpdef on_writable(self):
        """Write watcher callback: send data, resume reading when done."""
        assert self.is_writeable()
        try:
            self.write()
            if self.is_readable():
                self.stop_listen_write()
                self.start_listen_read()
            elif self.is_closed():
                self.stop_listen_write()
                # wake a greenlet blocked in get_request(), as on_readable does
                self.event.set()
            else:
                assert self.is_writeable()
        except Exception:
            logging.exception("invalid state after write")
            self.close()
            # without this a greenlet blocked in get_request() never wakes up
            self.event.set()

    cpdef get_request(self):
        """Block until a request is ready or the connection is closed.

        Returns the current buffer content; callers should check
        is_closed() before using it.
        """
        self.event.wait()
        self.event.clear()
        assert self.is_ready() or self.is_closed()
        return self.content()

    cpdef set_reply(self, content, is_successed=True):
        """Hand over the reply for the current request.

        An empty *content* means a oneway request (nothing is sent);
        a false *is_successed* closes the connection.
        """
        assert self.is_ready()
        self.ready(is_successed, content)
        if self.is_writeable():
            self.start_listen_write()
        elif self.is_readable():
            self.start_listen_read()
        else:
            self.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# cython: profile=True | |
from .connection import Connection | |
from gevent.pool import Pool | |
from gevent.server import StreamServer | |
from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory | |
from thrift.transport import TTransport | |
import logging | |
class TGeventStreamServer(StreamServer):
    """gevent StreamServer that serves a thrift processor.

    Each accepted socket is wrapped in a Connection; requests are read from
    it one at a time, run through the processor, and answered.
    """

    def __init__(self, listener, processor, inputProtocolFactory=None,
                 outputProtocolFactory=None, pool_size=None):
        """Create the server.

        listener -- passed through to StreamServer.
        processor -- object whose process(iprot, oprot) handles one request.
        inputProtocolFactory -- defaults to TBinaryProtocolFactory().
        outputProtocolFactory -- defaults to the input protocol factory.
        pool_size -- size of the gevent Pool used to spawn handlers
            (1024 when omitted or zero).
        """
        self.processor = processor
        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
        self.out_protocol = outputProtocolFactory or self.in_protocol
        self.pool = Pool(size=pool_size or 1024)
        StreamServer.__init__(self, listener, spawn=self.pool)

    def process(self, connection):
        """Wait for one request on *connection*, process it and reply."""
        content = connection.get_request()
        if connection.is_closed():
            return
        itransport = TTransport.TMemoryBuffer(content)
        otransport = TTransport.TMemoryBuffer()
        iprot = self.in_protocol.getProtocol(itransport)
        oprot = self.out_protocol.getProtocol(otransport)
        try:
            self.processor.process(iprot, oprot)
        except Exception as exc:
            logging.exception(exc)
            # a failed reply makes the connection close itself
            connection.set_reply('', is_successed=False)
        else:
            connection.set_reply(otransport.getvalue())

    def handle(self, socket, address):
        """Serve requests on *socket* until the connection is closed."""
        connection = Connection(socket)
        try:
            while not connection.is_closed():
                self.process(connection)
        finally:
            # make sure the connection is closed even if process() raised
            if not connection.is_closed():
                connection.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from distutils.core import setup | |
from distutils.extension import Extension | |
from Cython.Distutils import build_ext | |
import zmq | |
# The single C++ extension built from the Cython connection module.
ext_modules = [
    Extension(
        "gevent_thrift.connection",
        sources=["gevent_thrift/connection.pyx"],
        language="c++",
    ),
]

# build_ext from Cython.Distutils compiles the .pyx source.
setup(
    name='gevent_thrift',
    ext_modules=ext_modules,
    cmdclass={'build_ext': build_ext},
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from broker import * | |
from broker.ttypes import * | |
from thrift import Thrift | |
from thrift.protocol import TBinaryProtocol | |
from thrift.transport import TSocket, TTransport | |
import sys | |
import time | |
# timeit.default_timer already picks the best timer for the platform
# (time.clock on Windows / time.time elsewhere on Python 2, perf_counter on
# Python 3), and time.clock no longer exists on recent Python versions.
from itertools import repeat
from timeit import default_timer as _timer

host = "localhost"
port = 9090
delta = 5  # the final timed batch must last at least this many seconds

# Init thrift connection and protocol handlers
transport = TSocket.TSocket(host, port)
transport = TTransport.TFramedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)

# Set client to our Example
client = Broker.Client(protocol)

# Connect to server
transport.open()
try:
    task = Task('reverse', 'test word')
    elapsed = 0
    iterations = 1
    # Double the batch size until a single batch takes at least `delta` seconds
    while elapsed < delta:
        iterations *= 2
        t = _timer()
        for _ in repeat(None, iterations):
            client.execute(task)
        elapsed = _timer() - t
    print('%s objects passed through connection in %s seconds'
          % (iterations, elapsed))
    print('average number/sec: %s' % (iterations / elapsed))
finally:
    # Close connection, even when a call failed midway
    transport.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from broker import Broker | |
from broker.ttypes import Result | |
from gevent_thrift.server import TGeventStreamServer | |
class BrokerHandler(object):
    """Handler for the example server.

    Every execute() call answers with the same fixed payload, whatever
    task it receives.
    """

    # Fixed reply payload: 256 '0' characters.
    a = '0' * 256

    def execute(self, task):
        """Return a Result built from the fixed payload; *task* is ignored."""
        return Result(self.a)
# Address the example server listens on.
listener = ('localhost', 9090)
handler = BrokerHandler()
processor = Broker.Processor(handler)
server = TGeventStreamServer(listener, processor)

# Only block in serve_forever() when run as a script, not on import.
if __name__ == '__main__':
    print('Starting server')
    server.serve_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment