@lloesche · Created August 15, 2023 19:24
Python async server that uses multiple CPU cores while sharing a single socket: the parent process binds one listening socket and spawns a pool of worker processes, each running its own asyncio event loop and accepting connections on that shared socket.
#!/usr/bin/env python3
import signal
import socket
import multiprocessing
import asyncio
import logging
import argparse
from threading import Event
from asyncio import StreamReader, StreamWriter
log_format = "%(asctime)s|%(levelname)5s|%(process)d|%(threadName)10s %(message)s"
logging.basicConfig(level=logging.WARNING, format=log_format)
log = logging.getLogger("multicoreserver")
log.setLevel(logging.DEBUG)

# Canned HTTP response; the blank line separates the headers from the body.
HTTP_RESPONSE = b"""HTTP/1.1 200 OK
Content-Type: text/plain
Connection: close

Hello, World!
"""


async def handle_client(reader: StreamReader, writer: StreamWriter) -> None:
    # Read the request and answer any GET with the canned response.
    request: bytes = await reader.read(4096)
    if request.startswith(b"GET"):
        writer.write(HTTP_RESPONSE)
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def worker_task(sock: socket.socket, command_queue: multiprocessing.Queue) -> None:
    # Each worker runs its own asyncio server on the shared listening socket.
    try:
        server: asyncio.AbstractServer = await asyncio.start_server(handle_client, sock=sock)
    except Exception as e:
        log.error(f"Error while starting server: {e}")
        return

    try:
        # Poll the command queue until a shutdown command arrives.
        while True:
            if not command_queue.empty():
                try:
                    command = command_queue.get()
                except EOFError:
                    break
                except Exception as e:
                    log.error(f"Error while getting command from queue: {e}")
                    continue
                if command == "shutdown":
                    break
            await asyncio.sleep(1)
    finally:
        server.close()
        await server.wait_closed()


def worker(sock: socket.socket, command_queue: multiprocessing.Queue) -> None:
    # Ignore SIGINT in workers; the parent coordinates shutdown via the queue.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(worker_task(sock, command_queue))
    finally:
        loop.close()


def main() -> None:
    parser = argparse.ArgumentParser(description="Multi-core HTTP Server")
    parser.add_argument("-p", "--port", type=int, default=8080, help="Port to listen on. (default: 8080)")
    parser.add_argument("-n", "--num-workers", type=int, default=16, help="Number of worker processes. (default: 16)")
    args = parser.parse_args()
    port = args.port
    num_workers = args.num_workers

    command_queue = multiprocessing.Queue()
    shutdown_event = Event()

    log.info(f"Starting server on port {port}...")
    # Create and bind the listening socket once in the parent; every worker
    # process inherits it and accepts connections on it.
    with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)  # accept IPv4 and IPv6
        sock.bind(("::", port))
        sock.listen(5)

        processes = []

        def start_new_process():
            try:
                p: multiprocessing.Process = multiprocessing.Process(target=worker, args=(sock, command_queue))
                p.start()
            except Exception as e:
                log.error(f"Error while starting worker process: {e}")
            else:
                log.debug(f"Started worker process with PID {p.pid}")
                processes.append(p)

        for _ in range(num_workers):
            start_new_process()

        def signal_handler(signum, frame):
            shutdown_event.set()

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Supervise the workers: restart any that die until shutdown is requested.
        while not shutdown_event.is_set():
            for p in list(processes):
                if not p.is_alive():
                    log.warning(f"Worker process with PID {p.pid} died. Restarting it...")
                    processes.remove(p)
                    start_new_process()
            shutdown_event.wait(timeout=1)

        log.info("Shutting down...")
        try:
            for _ in processes:
                command_queue.put("shutdown")
        except Exception as e:
            log.error(f"Error while sending shutdown command to worker processes: {e}")

        for p in processes:
            p.join(timeout=5)
            if p.is_alive():
                log.warning(f"Worker process with PID {p.pid} did not shut down in time. Forcefully terminating it...")
                p.terminate()

        log.info("Shutdown complete")


if __name__ == "__main__":
    main()
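
To exercise the server, here is a minimal sketch of an asyncio test client, assuming the server is already running locally on the default port 8080 (host, port, and the request line are illustrative, not part of the gist):

# test_client.py - minimal sketch; assumes the server above is listening on localhost:8080
import asyncio


async def fetch() -> None:
    reader, writer = await asyncio.open_connection("localhost", 8080)
    writer.write(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
    await writer.drain()
    response = await reader.read()  # the server closes the connection, so read until EOF
    print(response.decode(errors="replace"))
    writer.close()
    await writer.wait_closed()


asyncio.run(fetch())

Because every worker process accepts on the same inherited listening socket, the kernel spreads incoming connections across the per-process asyncio event loops, letting the server use multiple CPU cores behind a single port.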