Skip to content

Instantly share code, notes, and snippets.

@deepanshululla
Last active January 2, 2023 16:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deepanshululla/0d6bc812112c0d6566d5b77efc6c5fc1 to your computer and use it in GitHub Desktop.
Save deepanshululla/0d6bc812112c0d6566d5b77efc6c5fc1 to your computer and use it in GitHub Desktop.
import socket
import threading
class ThreadedServer:
    """Minimal threaded TCP server that answers each client message with ``+PONG``.

    The listening socket is created as soon as the object is constructed.
    Every accepted connection is served on its own thread.

    Args:
        host_name: Host name or address to bind to.
        port_num: TCP port to listen on.
        reuse_port: Whether to request ``SO_REUSEPORT`` on the listening socket.
            Ignored on platforms that do not provide that option.
    """

    def __init__(self, host_name="localhost", port_num=6379, reuse_port=True):
        self.host_name = host_name
        self.port_num = port_num
        self.reuse_port = reuse_port
        self.init_server()

    def init_server(self):
        """Create the listening socket bound to ``(host_name, port_num)``."""
        # socket.create_server raises ValueError when reuse_port is requested
        # on a platform without SO_REUSEPORT, so only ask for it where it exists.
        reuse_port = bool(self.reuse_port) and hasattr(socket, "SO_REUSEPORT")
        self.socket = socket.create_server(
            (self.host_name, self.port_num), reuse_port=reuse_port
        )
        # The socket is deliberately left in blocking mode: accept() on a
        # non-blocking socket raises BlockingIOError whenever no client is
        # pending, and this server relies on accept() waiting instead.
        print(f"Listening on {(self.host_name, self.port_num)}")

    def accept(self):
        """Block until a client connects and return ``(conn, addr)``."""
        conn, addr = self.socket.accept()
        # conn is left blocking too, so each handler thread simply waits in recv().
        print(f"Connected to {addr}")
        return conn, addr

    def __reply_to_ping(self, conn, addr):
        """Reply ``+PONG`` to every chunk received on *conn* until the peer leaves.

        The connection is always closed when this method returns.
        """
        with conn:  # release the client socket however the loop ends
            while True:
                try:
                    data = conn.recv(1024)  # wait for client to send data
                    if not data:
                        # recv() returns b"" once the peer has closed its side;
                        # without this check the loop would keep sending PONGs.
                        break
                    self.__send(conn, b"+PONG\r\n")
                except ConnectionError:
                    break  # stop serving if the connection was reset or broken
        print(f"Disconnected from {addr}")

    def __send(self, conn, data):
        """Send all of *data* on *conn*."""
        conn.sendall(data)

    def listen_for_connections(self):
        """Accept clients forever, serving each one on a dedicated thread."""
        while True:
            conn, addr = self.accept()
            # Daemon threads do not keep the interpreter alive, so the process
            # can exit (e.g. on Ctrl-C) while handlers are blocked in recv().
            threading.Thread(
                target=self.__reply_to_ping,
                args=(conn, addr),
                daemon=True,
            ).start()

    def __del__(self):
        # self.socket is missing when init_server() failed before assigning it.
        listener = getattr(self, "socket", None)
        if listener is not None:
            listener.close()
if __name__ == "__main__":
    # Script entry point: bind with the default host/port and serve clients
    # until the process is stopped.
    server = ThreadedServer()
    server.listen_for_connections()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment