Skip to content

Instantly share code, notes, and snippets.

@jam182
Last active October 23, 2015 14:37
Show Gist options
  • Save jam182/45e0825f8bbe1874a3df to your computer and use it in GitHub Desktop.
Save jam182/45e0825f8bbe1874a3df to your computer and use it in GitHub Desktop.
Echo server that uses only sockets and selects. No threads, asyncio etc.
"""Daniele Giancola 2015
This program is implemented in python3.5.
The idea behind this server is to use non-blocking sockets,
and to schedule callbacks with 'select' when the sockets are ready
to be read or written.
According to the documentation, DefaultSelector() is an alias
for the most efficient implementation (epoll, select, kqueue, iocp, etc.)
based on the current platform.
https://docs.python.org/3/library/selectors.html#selectors.DefaultSelector
The default selector is used to register a mapping, objects to callbacks:
(file descriptor, event) -> (callback).
When one of the objects (file descriptor or object with fileno() method)
becomes ready for the event it was registered for, the 'select' method will
return it in a list of (key, events) tuples (one for each ready object).
For each of these object the callback can be retrieved and finally
be executed being sure it can properly use the sockets for the operations
it needs.
In order to keep the registry clean, when a connection is closed or is lost,
the socket will be unregistered.
In order to send complete lines (that end with \n), a buffer to store
incomplete lines is kept for each client.
This buffer is used to attach to its content the next chunk of data received
until there is a \n. Then the message can be finally sent.
For instance when the server is first initialized, the socket on which
it will listen to, for incoming connections, is set to non-blocking.
However it is only registered in the selector to wait until it is ready
to be read. Only when the select decides that the socket is ready, the event
is returned so the callback to accept an incoming connection is executed.
Basically the event loop returns the socket with the callback for each event,
and it executes the callback passing the socket as an argument.
Simple Usage:
Server:
$ python3.5 echo_server.py
Echo Server Started
Client:
$ ncat localhost 8888
hello
hello
"""
import selectors
import socket
from functools import partial
from collections import deque
class EchoServer:
"""Asynchronous server that echos every message it receives.
It echos line by line.
It works without threads, only sockets and selects.
It can concurrently accept and receive messages from multiple clients.
Examples - Start the server and connect with multiple clients:
Server:
>>> server = EchoServer()
>>> server
EchoServer(host='localhost', port=8888)
>>> server.run()
Echo Server Started
Client:
$ ncat localhost 8888
hello
hello
"""
def __init__(self, host='localhost', port=8888, backlog=5):
"""Start up the server on the specified address:port.
Get the most efficient implementation for the current platform of
the selector.
Set the socket to non blocking mode and register it
to the select system-call for the EVENT_READ event, so to be notified
when the socket is ready to be read and the server
can accept connections"""
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.setblocking(False)
self.sock.bind((host, port))
self.sock.listen(backlog)
self.sel = selectors.DefaultSelector()
self.sel.register(self.sock, selectors.EVENT_READ, self._accept)
self.clients_buffered_line = dict()
self.clients_echos = dict()
def __repr__(self):
host, port = self.sock.getsockname()
cls = self.__class__.__name__
return '{0}(host={1}, port={2})'.format(cls, host, port)
def run(self):
"""Simple event loop that calls all the callbacks
for those sockets who registered to require some action
on a particular event happening."""
while True:
events = self.sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def _accept(self, sock):
"""When a socket is ready to accept a new connection,
this callback is executed. For each client a buffer to receive
incomplete lines is initialized. Finally register the socket
to be called back whenever it is ready to receive with the _read()"""
conn, addr = sock.accept()
conn.setblocking(False)
self.clients_buffered_line[conn] = ''
self.sel.register(conn, selectors.EVENT_READ, self._read)
def _read(self, sock, size=4096):
"""When the socket is ready to receive, get data as long as
there is any and make sure to echo every line back to the client."""
try:
data = sock.recv(size)
if data:
self._echo(sock, data.decode())
else:
self._unregister_client(sock)
except ConnectionResetError: # Unexpected connection loss
self._unregister_client(sock)
def _echo(self, sock, data):
"""This function makes sure to compose a complete line
for each client and sends it back to it."""
lines = data.split('\n')
lines[0] = ''.join([self.clients_buffered_line[sock], lines[0]])
for line in lines[:-1]:
msg = ''.join([line, '\n']).encode()
self._add_writer_callback(msg, sock)
if len(lines) > 1:
self.sel.modify(sock, selectors.EVENT_WRITE, self._schedule_send)
self.clients_buffered_line[sock] = lines.pop()
def _add_writer_callback(self, msg, sock):
"""Create a callback to send a line and add it
to the queue"""
callback = partial(self._send_line, data=msg)
if sock not in self.clients_echos:
self.clients_echos[sock] = deque()
self.clients_echos[sock].append(callback)
def _send_line(self, sock, data):
"""Send a full line. If the client cannot receive the
whole packet, schedule the remaining data to be sent for
when the client socket is ready to be written on again.
When done start listening again for new data to echo."""
data_sent = sock.send(data)
if data_sent < len(data):
callback = partial(self._send_line, data=data[data_sent:])
self.sel.modify(sock, selectors.EVENT_WRITE, callback)
else:
self.sel.modify(sock, selectors.EVENT_READ, self._read)
def _schedule_send(self, sock):
"""Whenever the socket is ready for write operations,
call send. Register for writing event if there is more than
one line to be sent"""
callback = self.clients_echos[sock].popleft()
try:
callback(sock)
if self.clients_echos[sock]:
self.sel.modify(sock, selectors.EVENT_WRITE,
self._schedule_send)
else:
del self.clients_echos[sock]
except (BlockingIOError, ConnectionResetError):
self._unregister_client(sock)
def _unregister_client(self, sock):
"""When a client is done sending data, the server can clean up by
removing its buffer and unregistering its socket from the
select"""
if sock in self.clients_buffered_line:
del self.clients_buffered_line[sock]
if sock in self.clients_echos:
del self.clients_echos[sock]
try:
self.sel.unregister(sock)
except ValueError:
pass # File Descriptor -1. Error, close connection.
finally:
sock.close()
def close(self):
"""Close nicely when the server exits"""
clients = self.clients_buffered_line.copy()
for client in clients:
self._unregister_client(client)
self.sel.close()
self.sock.close()
def start_sever():
"""Start the server from the command line"""
server = EchoServer()
try:
print('Echo Server Started')
server.run()
except KeyboardInterrupt:
print('Server Closing')
finally:
server.close()
if __name__ == '__main__':
start_sever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment