Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Easy-to-use socket servers for testing
# Copyright (C) 2011 Vinay Sajip. All rights reserved.
# Encapsulate socket servers into an easy-to-use package
try: # Python 2.X
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SocketServer import ThreadingUDPServer, DatagramRequestHandler, \
ThreadingTCPServer, StreamRequestHandler
except ImportError: # Python 3.X
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingUDPServer, DatagramRequestHandler, \
ThreadingTCPServer, StreamRequestHandler
raw_input = input
import threading
import time
class ControlMixin(object):
    """
    This mixin is used to start a server on a separate thread, and
    shut it down programmatically. Request handling is simplified - instead
    of needing to derive a suitable RequestHandler subclass, you just
    provide a callable which will be passed each received request to be
    processed.

    The mixin relies on ``serve_forever`` and ``shutdown`` being provided by
    the next class in the MRO, so it must be listed before the concrete
    server class in the bases.

    :param handler: A handler callable which will be called with a
                    single parameter - the request - in order to
                    process the request. Whether requests are processed
                    serially or concurrently is decided by the concrete
                    server class this mixin is combined with (the
                    standard library ``Threading*`` servers dispatch each
                    request to its own thread).
    :param poll_interval: The polling interval in seconds, passed to
                          ``serve_forever``.
    """

    def __init__(self, handler, poll_interval):
        self._thread = None
        self.poll_interval = poll_interval
        self._handler = handler
        # Set once the server thread has entered serve_forever(), cleared
        # again by stop().
        self._ready = threading.Event()

    def start(self):
        """
        Create a daemon thread to run the server, and start it.
        """
        self._thread = t = threading.Thread(target=self.serve_forever,
                                            args=(self.poll_interval,))
        # Thread.setDaemon() is deprecated; the ``daemon`` attribute works
        # on both Python 2.6+ and Python 3.
        t.daemon = True
        t.start()

    def serve_forever(self, poll_interval):
        """
        Flag the server as ready, then run the underlying server loop.

        :param poll_interval: The polling interval in seconds.
        """
        self._ready.set()
        super(ControlMixin, self).serve_forever(poll_interval)

    def stop(self):
        """
        Tell the server thread to stop, and wait for it to do so.

        Calling this on a server which has not been started (or which has
        already been stopped) is a no-op.
        """
        thread = self._thread
        if thread is None:
            # shutdown() waits for a running serve_forever() loop; with no
            # server thread it would block forever, so bail out early.
            return
        self.shutdown()
        thread.join()
        self._thread = None
        self._ready.clear()
class EasyHTTPServer(ControlMixin, HTTPServer):
    """
    An HTTP server which can be started and stopped through
    :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A callable invoked with a single argument - the
                    request handler instance - for every incoming
                    request, whatever its HTTP method.
    :param poll_interval: The polling interval in seconds (default 0.5).
    """

    def __init__(self, addr, handler, poll_interval=0.5):

        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
            """Route every ``do_<METHOD>`` lookup to the server's callable."""

            def __getattr__(self, name, default=None):
                # Only reached when normal attribute lookup fails; anything
                # other than a do_* name stays a genuine AttributeError.
                if not name.startswith('do_'):
                    raise AttributeError(name)
                return self.process_request

            def process_request(self):
                # Hand this handler object (the "request") to the callable
                # stored on the server by ControlMixin.
                self.server._handler(self)

        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
        ControlMixin.__init__(self, handler, poll_interval)
class EasyTCPServer(ControlMixin, ThreadingTCPServer):
    """
    A TCP server which can be started and stopped through
    :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A callable invoked with a single argument - the
                    request handler instance - for each connection.
    :param poll_interval: The polling interval in seconds (default 0.5).
    :param bind_and_activate: If True (the default), binds the server and
                              starts it listening. If False, you need to
                              call :meth:`server_bind` and
                              :meth:`server_activate` at some later time
                              before calling :meth:`start`, so that the
                              server will set up the socket and listen
                              on it.
    """

    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):

        class DelegatingTCPRequestHandler(StreamRequestHandler):
            """Forward each connection to the server's callable."""

            def handle(self):
                callback = self.server._handler
                callback(self)

        ThreadingTCPServer.__init__(
            self,
            addr,
            DelegatingTCPRequestHandler,
            bind_and_activate=bind_and_activate,
        )
        ControlMixin.__init__(self, handler, poll_interval)
class EasyUDPServer(ControlMixin, ThreadingUDPServer):
    """
    A UDP server which can be started and stopped through
    :class:`ControlMixin`.

    :param addr: A tuple with the IP address and port to listen on.
    :param handler: A callable invoked with a single argument - the
                    request handler instance - for each datagram.
    :param poll_interval: The polling interval for shutdown requests,
                          in seconds (default 0.5).
    :param bind_and_activate: If True (the default), binds the server and
                              starts it listening. If False, you need to
                              call :meth:`server_bind` and
                              :meth:`server_activate` at some later time
                              before calling :meth:`start`, so that the
                              server will set up the socket and listen
                              on it.
    """

    def __init__(self, addr, handler, poll_interval=0.5,
                 bind_and_activate=True):

        class DelegatingUDPRequestHandler(DatagramRequestHandler):
            """Forward each datagram to the server's callable."""

            def handle(self):
                callback = self.server._handler
                callback(self)

        ThreadingUDPServer.__init__(
            self,
            addr,
            DelegatingUDPRequestHandler,
            bind_and_activate=bind_and_activate,
        )
        ControlMixin.__init__(self, handler, poll_interval)
def main():
    """
    Demo entry point: run an HTTP, a TCP and a UDP server on ports 8090,
    8091 and 8092 respectively until enter is pressed.

    Every server answers each request with the current time, formatted
    with two decimal places.
    """
    def deleg(request):
        # Raw reply used for the TCP and UDP servers.
        s = '%.2f' % time.time()
        request.wfile.write(s.encode('utf-8'))

    def http_deleg(request):
        # HTTP clients need a status line and headers before the body;
        # writing the bare timestamp yields a malformed response.
        body = ('%.2f' % time.time()).encode('utf-8')
        request.send_response(200)
        request.send_header('Content-Type', 'text/plain; charset=utf-8')
        request.send_header('Content-Length', str(len(body)))
        request.end_headers()
        if request.command != 'HEAD':
            request.wfile.write(body)

    httpserver = EasyHTTPServer(('', 8090), http_deleg, 0.001)
    tcpserver = EasyTCPServer(('', 8091), deleg, 0.001)
    udpserver = EasyUDPServer(('', 8092), deleg, 0.001)
    httpserver.start()
    tcpserver.start()
    udpserver.start()
    try:
        raw_input('Servers started, press enter to stop:')
    finally:
        # Stop the servers even if the prompt is interrupted (Ctrl-C, EOF).
        # The try block starts only after all three servers are running.
        udpserver.stop()
        tcpserver.stop()
        httpserver.stop()
    print('All done.')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment