Skip to content

Instantly share code, notes, and snippets.

@EggieCode
Created February 18, 2016 09:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save EggieCode/5f41187a0db5f320b7bf to your computer and use it in GitHub Desktop.
Save EggieCode/5f41187a0db5f320b7bf to your computer and use it in GitHub Desktop.
Finger socket in python3 with asyncio
#!/usr/bin/python3
#SERVER
import asyncio
import ssl
class EchoServerClientProtocol(asyncio.Protocol):
def __init__(self, *args, **kwargs):
self.data = bytearray()
self.queue = asyncio.Queue()
super().__init__(*args, **kwargs)
def connection_made(self, transport):
self.transport = transport
print("Connection from ", self.transport.get_extra_info("peername"))
def data_received(self, data):
print(data);
self.data += data
index = self.data.find("\r\n".encode())
if index != -1:
message = self.data[:index].decode()
self.data = self.data[index + 2:]
print("Message found: %s" % (message))
self.write("Writing a finger Python3 Async IO Server")
self.write()
self.write("No methods yet! Even not for `%s`"% message)
self.write()
self.write("Try: finger tilburg@graph.no")
self.write("Greetz, EggieCode.org")
self.close()
def write(self, message = ""):
message += "\r\n"
self.transport.write(message.encode("ascii"))
def close(self):
self.transport.close()
def connection_lost(self, exc):
pass
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '', 79)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment