Skip to content

Instantly share code, notes, and snippets.

@sporsh
Created October 23, 2012 19:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sporsh/3941124 to your computer and use it in GitHub Desktop.
Save sporsh/3941124 to your computer and use it in GitHub Desktop.
A Twisted example to demo deferreds, client/server protocols and endpoints
import time
from twisted.python import failure
from twisted.internet import reactor, defer, task
from twisted.internet.protocol import Protocol
# Define some messages for our protocol
# Every message is a single line whose first character identifies its type.
MSG_REQUEST = '>'  # incoming request; the only type _dispatch_message accepts
MSG_REQ_ACK = ':'  # written back as soon as a request has been assigned an id
MSG_REQ_SUCCEEDED = '='  # prefixes the result line of a request that succeeded
MSG_REQ_FAILED = '!'  # prefixes the error line of a request that failed
class ID10TRemoteProcedureCallProtocol(Protocol):
    """A trivial RPC-like protocol for handling concurrent requests

    Demonstrates some features of Twisted like reactor, deferreds, endpoints...

    Request messages start with '>' and a request name followed by arguments
        >request_name [[arguments]...]

    Responses are
        ': <request_id>' for acknowledged requests
        '= <request_id> <result>' results for a request
        '! <request_id> <error message>' for error message for a request

    For detailed help on a specific request, send a help request message:
        >help [request_name]

    To list requests we have available handlers for:
        >list
    """
    # NOTE: the docstring above is also served to clients as the generic help
    # text (ID10THelpHandlersMixin.handle_HELP falls back to it), so keep it
    # readable as protocol documentation.

    def __init__(self):
        # Counter used to label each request of this connection; the id is
        # echoed in the ack and in the final success/failure response.
        self._request_id = 0

    def connectionMade(self):
        """Log that a client connected."""
        print("Connection made")

    def connectionLost(self, reason):
        """Log why the connection went away.

        reason -- object whose ``value`` attribute describes the cause.
        """
        print("Connection lost: %s" % reason.value)

    def dataReceived(self, data):
        """Callback called whenever twisted receive data on the transport.

        Splits ``data`` on newlines and dispatches every piece.
        """
        # We don't know that data contains exactly whole messages, so
        # we should buffer and defragment messages more properly here...
        # ... but I'm lazy, so assume that we get whole messages:
        for message in data.split('\n'):
            self._dispatch_message(message.strip())

    def _dispatch_message(self, message):
        """Dispatches messages to correct handler.

        Empty messages are ignored. Messages starting with MSG_REQUEST are
        split on whitespace into a request name and its arguments; anything
        else (including a request marker without a name) is only logged.
        """
        # You'd normally want to implement better error handling here
        if not message:
            return
        try:
            parts = message[1:].split()
            # A bare '>' has no request name; report it as unknown instead of
            # letting parts[0] raise an IndexError.
            if message[0] == MSG_REQUEST and parts:
                self._handle_request(parts[0], parts[1:])
            else:
                print('Unknown message %r' % message)
        except Exception as e:
            # Boundary of the protocol: a broken message must not take the
            # connection down, so log the error and carry on.
            print('error: %s while dispatching message %r' % (e, message))

    def _handle_request(self, request_type, data):
        """Handle request of a certain type and given data

        request_type -- request name; looked up case-insensitively as a
                        ``handle_<NAME>`` attribute on this instance.
        data         -- list of string arguments passed to the handler.
        """
        self._request_id += 1
        # Snapshot the id: the counter may move on before the deferred fires.
        request_id = self._request_id

        # Send a message to the client that we started handling the request
        message = ' '.join([MSG_REQ_ACK, str(request_id)])
        self.transport.write(message + '\n')

        handler = getattr(self, 'handle_' + request_type.strip().upper(), None)
        if handler is None:
            # defer.fail() expects an exception (or Failure), not a string.
            deferred = defer.fail(
                ValueError("Unknown request type %r" % request_type))
        else:
            print("Handling request: %s %s" % (request_type, data))
            # maybeDeferred lets handlers return either a plain value or a
            # Deferred, and turns exceptions they raise into failures.
            deferred = defer.maybeDeferred(handler, *data)

        # Send the result when handling is done
        deferred.addCallback(self._request_succeeded, request_id)
        deferred.addErrback(self._request_failed, request_id)

    def _request_succeeded(self, result, request_id):
        """Sends a result back to the client when the request is done.

        This method is added as an callback to the deferred of the
        request handler, so that it is triggered automatically when the result
        is ready.
        """
        message = ' '.join([MSG_REQ_SUCCEEDED, str(request_id), str(result)])
        self.transport.write(message + '\n')

    def _request_failed(self, failure, request_id):
        """Sends a failure response to the client with a error message

        failure    -- the failure passed in by the deferred; note that this
                      parameter shadows the imported ``failure`` module here.
        request_id -- id of the request that failed.
        """
        print("[%i] Request failed: %s" % (request_id, failure.value))
        message = ' '.join([MSG_REQ_FAILED, str(request_id),
                            str(failure.value)])
        self.transport.write(message + '\n')

    # Always handle quit requests by dropping the connection
    def handle_QUIT(self, *_):
        """Close the connection
        >quit
        """
        return self.transport.loseConnection()
class ID10THelpHandlersMixin(object):
    """Handlers for self documenting the protocol
    """

    def handle_HELP(self, name=''):
        """Display help text for a request if it is defined
        >help [request]
        """
        # Unknown or missing request names fall back to the protocol class,
        # whose docstring describes the protocol as a whole.
        handler = (getattr(self, 'handle_' + name.strip().upper(), None)
                   or ID10TRemoteProcedureCallProtocol)
        # A handler without a docstring gets a short placeholder naming the
        # request that was asked about.
        return handler.__doc__ or "No help for %r" % name

    def handle_LIST(self):
        """List all request handlers
        >list
        """
        prefix = 'handle_'
        # Slice the prefix off; str.strip() would treat 'handle_' as a set of
        # characters and eat matching characters from both ends of the name.
        requests = [attr[len(prefix):].lower() for attr in dir(self)
                    if attr.startswith(prefix)]
        return 'Request handlers:\n' + '\n'.join(requests)
class ID10TSchedulingHandlersMixin(object):
    """Handlers contrasting a non-blocking wait with a blocking one.
    """

    def handle_SLEEP(self, seconds, result=None):
        """Sleep for n seconds, then return a result
        >sleep seconds [result]
        """
        # Fall back to a default message when no result text was supplied.
        payload = result if result else 'Slept for %s seconds' % seconds
        pending = defer.Deferred()
        # Ask the reactor to fire the deferred later instead of waiting here.
        reactor.callLater(float(seconds), pending.callback, payload)
        return pending

    def handle_BLOCK(self, seconds, result=None):
        """Block the reactor for n seconds, then return a result
        >block seconds [result]
        """
        # time.sleep() does not return until the delay has elapsed.
        time.sleep(float(seconds))
        if result:
            return defer.succeed(result)
        return defer.succeed('Blocked for %s seconds' % seconds)
class ID10TExtraHandlersMixin(object):
    """Mixin with super simple handlers just for fun
    """

    def handle_PING(self, *_):
        """Reply with PONG!, ignoring any arguments
        >ping
        """
        return 'PONG!'

    def handle_ECHO(self, *a):
        """Return the arguments joined by single spaces
        >echo [words...]
        """
        return ' '.join(a)

    def handle_SUM(self, *a):
        """Return the sum of the arguments, read as numbers
        >sum [numbers...]
        """
        return sum(float(i) for i in a)

    def handle_MIN(self, *a):
        """Return the smallest of the arguments, read as numbers
        >min number [numbers...]
        """
        if not a:
            raise ValueError("min requires at least one number")
        # Convert before comparing (as handle_SUM does): comparing the raw
        # strings would order '10' before '9', and a single string argument
        # would be iterated character by character.
        return min(float(i) for i in a)
if __name__ == '__main__':
    # Script entry point: serve the ID10T protocol over TCP.
    # Usage: <script> [port]
    import sys
    from twisted.internet import reactor
    from twisted.internet.endpoints import serverFromString
    from twisted.internet.protocol import Protocol, ServerFactory

    # Get port number to listen on from argv, or let twisted decide...
    port = int(sys.argv[1]) if len(sys.argv) == 2 else 0

    # Get a Twisted server endpoint object from a description string
    server = serverFromString(reactor, 'tcp:%i' % port)

    # Set up the ID10T protocol as we want to use it
    class ID10TRPCProtocol(ID10TRemoteProcedureCallProtocol,
                           ID10THelpHandlersMixin,
                           ID10TSchedulingHandlersMixin,
                           ID10TExtraHandlersMixin):
        pass

    # Start the ID10T server
    server_factory = ServerFactory()
    server_factory.protocol = ID10TRPCProtocol
    server_factory.stopFactory = reactor.stop

    def server_listen_callback(twisted_port):
        """Callback that is triggered when the server has started listening
        """
        print("Listening on port %s" % twisted_port.getHost().port)

    def server_listen_failed(failure):
        """Errback that is triggered if we fail to start the server
        """
        print(failure.value)
        # The deferred may already have failed before reactor.run() is
        # reached; calling reactor.stop() on a reactor that is not running
        # raises, so schedule the stop for when the reactor is running.
        reactor.callWhenRunning(reactor.stop)

    server_listen_deferred = server.listen(server_factory)
    # addCallbacks attaches both at the same level of the chain: exactly one
    # of them runs, so a handled failure never reaches the success callback.
    server_listen_deferred.addCallbacks(server_listen_callback,
                                        server_listen_failed)

    reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment