Last active
January 2, 2023 16:57
-
-
Save deepanshululla/a4e378dae2295d298c7d0ebc48a04697 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import selectors | |
import socket | |
import types | |
class ServerWithSelect:
    """Single-threaded TCP server that multiplexes clients with ``selectors``.

    Every chunk of data received from a client is answered with ``+PONG\\r\\n``.
    The default port (6379) and the reply format suggest a minimal Redis
    ``PING`` responder; the request itself is never parsed.

    Args:
        host_name: Interface to bind the listening socket to.
        port_num: TCP port to listen on.
        reuse_port: Request ``SO_REUSEPORT`` on the listening socket. Ignored
            on platforms where the option does not exist, because
            ``socket.create_server`` would otherwise raise ``ValueError``.
    """

    # Reply queued for every chunk of data read from a client.
    _PONG_REPLY = b"+PONG\r\n"

    def __init__(self, host_name="localhost", port_num=6379, reuse_port=True):
        self.host_name = host_name
        self.port_num = port_num
        self.reuse_port = reuse_port
        self.selector = selectors.DefaultSelector()
        self.__init_server()

    def __init_server(self):
        """Create the non-blocking listening socket and register it for reads."""
        # create_server() rejects reuse_port=True when SO_REUSEPORT is missing
        # (e.g. on Windows), so only request it where the platform has it.
        reuse_port = bool(self.reuse_port) and hasattr(socket, "SO_REUSEPORT")
        sock = socket.create_server(
            (self.host_name, self.port_num), reuse_port=reuse_port
        )
        print(f"Listening on {(self.host_name, self.port_num)}")
        sock.setblocking(False)
        # data=None is the marker listen_for_connections() uses to tell the
        # listening socket apart from client connections.
        self.selector.register(sock, selectors.EVENT_READ, data=None)

    def listen_for_connections(self):
        """Run the event loop forever, accepting and servicing connections."""
        while True:
            events = self.selector.select(timeout=None)
            for key, mask in events:
                if key.data is None:
                    self.__accept_wrapper(key)
                else:
                    self.__service_connection(key, mask)

    def __accept_wrapper(self, key):
        """Accept one pending client and register it with the selector."""
        sock = key.fileobj
        print("Listening for Connections")
        try:
            conn, addr = sock.accept()
        except (BlockingIOError, ConnectionAbortedError):
            # The client went away between select() and accept().
            return
        print(f"Connected by {addr}")
        conn.setblocking(False)
        data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
        # Watch for reads only: a socket is almost always writable, so also
        # watching EVENT_WRITE while idle would make select() return at once
        # on every iteration and spin the loop.
        self.selector.register(conn, selectors.EVENT_READ, data=data)

    def __service_connection(self, key, mask):
        """Handle the ready events in *mask* for one client connection.

        On readable: read a chunk and queue one reply, or close the
        connection on EOF / connection error. On writable: flush as much of
        the queued output as the socket accepts.
        """
        sock = key.fileobj
        data = key.data
        if mask & selectors.EVENT_READ:
            try:
                recv_data = sock.recv(1024)
            except BlockingIOError:
                recv_data = None  # spurious wakeup, nothing to read yet
            except ConnectionError:
                recv_data = b""  # peer reset: treat like an orderly close
            if recv_data:
                # Queue the reply itself so partial sends are accounted for
                # against the bytes that are actually written.
                data.outb += self._PONG_REPLY
            elif recv_data is not None:
                self.__close_connection(sock, data)
                return  # the socket is closed; it must not be written to
        if mask & selectors.EVENT_WRITE and data.outb:
            try:
                sent = sock.send(data.outb)
            except BlockingIOError:
                sent = 0
            except ConnectionError:
                self.__close_connection(sock, data)
                return
            data.outb = data.outb[sent:]
        self.__update_interest(sock, data)

    def __update_interest(self, sock, data):
        """Watch *sock* for writability only while output is pending."""
        events = selectors.EVENT_READ
        if data.outb:
            events |= selectors.EVENT_WRITE
        self.selector.modify(sock, events, data=data)

    def __close_connection(self, sock, data):
        """Unregister and close a client socket."""
        print(f"Closing connection to {data.addr}")
        self.selector.unregister(sock)
        sock.close()

    def __del__(self):
        # __init__ may not have run (or may have failed early), so the
        # attribute is not guaranteed to exist.
        selector = getattr(self, "selector", None)
        if selector is not None:
            selector.close()
if __name__ == "__main__":
    # Script entry point: bind with the default host/port and serve forever.
    server = ServerWithSelect()
    server.listen_for_connections()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment