Last active
June 20, 2023 02:19
-
-
Save magiskboy/5c511a30ab2220d8f4c84d8e56eabf34 to your computer and use it in GitHub Desktop.
The simple HTTP server is based on socket programming
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from urllib.parse import parse_qs | |
import json | |
class Headers(dict): | |
def __setitem__(self, k, v): | |
super().__setitem__(k.lower(), v) | |
def __getitem__(self, k): | |
return super().__getitem__(k.lower()) | |
def get(self, key): | |
return super().get(key.lower()) | |
@classmethod | |
def from_list(cls, headers) -> "Headers": | |
h = cls() | |
for k, v in headers: | |
h.__setitem__(k, v) | |
return h | |
class Request: | |
def __init__(self): | |
self.url = "" | |
self.schema = "" | |
self.host = "" | |
self.port = 80 | |
self.path = "" | |
self.args = {} | |
self.headers = Headers() | |
self.body = b"" | |
self.method = "" | |
self._environments = {} | |
@classmethod | |
def from_environments(cls, environments: dict) -> "Request": | |
headers = Headers.from_list(environments.get("headers", [])) | |
request = cls() | |
request.url = environments.get("url").decode("utf-8") | |
request.schema = environments.get("url.schema", b"http").decode("utf-8") | |
request.host = environments.get("url.host") or headers.get("host") | |
if isinstance(request.host, bytes): | |
request.host = request.host.decode("utf-8") | |
request.port = environments.get("url.port") or 80 | |
request.path = environments.get("url.path").decode("utf-8") | |
request.args = parse_qs(environments.get("url.query", "")) | |
request.headers = Headers() | |
for k, v in headers.items(): | |
request.headers[k.decode("utf-8")] = v.decode("utf-8") | |
request.body = environments.get("body") | |
request.method = environments.get("method", "get").upper().decode("utf-8") | |
request._environments = environments | |
return request | |
@property | |
def text(self) -> str: | |
return self.body.decode("utf-8") | |
@property | |
def json(self) -> dict: | |
if self.headers.get("content-type") == "application/json": | |
try: | |
return json.loads(self.body) | |
except Exception: | |
return {} | |
raise ValueError("content-type of response must be application/json") | |
def __str__(self): | |
return f"<Request: {self.method} {self.path}>" | |
class Response: | |
def __init__( | |
self, | |
content: str | dict | list = "", | |
status_code: int = 200, | |
headers: Headers = None | |
): | |
if isinstance(content, str): | |
self.content = content | |
elif isinstance(content, (dict, list)): | |
self.content = json.dumps(content) | |
else: | |
self.content = "" | |
self.status_code = status_code | |
self.headers = headers | |
def __str__(self): | |
return f"<Response {self.status_code}>" | |
class Application: | |
def __init__(self): | |
self._endpoints = {} | |
def add_endpoint(self, url: str, method: str, handler): | |
self._endpoints[(url, method.lower())] = handler | |
def get(self, url: str): | |
def deco(func): | |
self.add_endpoint(url, "GET", func) | |
return deco | |
def post(self, url: str): | |
def deco(func): | |
self.add_endpoint(url, "POST", func) | |
return deco | |
def put(self, url: str): | |
def deco(func): | |
self.add_endpoint(url, "PUT", func) | |
return deco | |
def patch(self, url: str): | |
def deco(func): | |
self.add_endpoint(url, "PATCH", func) | |
return deco | |
def delete(self, url: str): | |
def deco(func): | |
self.add_endpoint(url, "DELETE", func) | |
return deco | |
def get_handler_for(self, path: str, method: str): | |
return self._endpoints.get((path, method.lower())) | |
async def __call__(self, environment): | |
request = Request.from_environments(environment) | |
# routing | |
handler = self.get_handler_for(request.path, request.method) | |
if handler: | |
response = await handler(request) | |
else: | |
response = Response("Not Found", 404) | |
return self.make_http_response_message(response) | |
def make_http_response_message(self, response: Response) -> bytes: | |
lines = [] | |
lines.append(f"HTTP/1.1 {response.status_code} OK\r\n") | |
if response.headers: | |
for k, v in response.headers.items(): | |
lines.append(f"{k}:{v}\r\n") | |
lines.append("\r\n") | |
if response.content: | |
lines.append(response.content) | |
lines.append("\r\n") | |
lines.append("\r\n") | |
return "".join(lines) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from server import Server | |
from application import Application, Response | |
app = Application() | |
index_html = """ | |
<html> | |
<head> | |
<title>Fib</title> | |
</head> | |
<body> | |
<div> | |
<div id="fib">0</div> | |
<input type="number" id="number" /> | |
<button onclick="sendValue()">Calculate</button> | |
</div> | |
<script> | |
function sendValue() { | |
const value = parseInt(document.getElementById("number").value); | |
console.log({value}); | |
fetch("/fib", { | |
"method": "POST", | |
"headers": { | |
"content-type": "application/json", | |
}, | |
"body": JSON.stringify({ | |
"value": value, | |
}) | |
}).then(response => { | |
document.getElementById("fib").innerHTML = response.value; | |
}).catch(e => console.error(e)); | |
} | |
</script> | |
</body> | |
</html> | |
""" | |
@app.get("/") | |
async def homepage(request): | |
return Response(index_html, 200, {"content-type": "text/html"}) | |
@app.post("/fib") | |
async def greet(request): | |
data = request.json | |
a, b = 0, 1 | |
for i in range(data["value"]): | |
a = a + b | |
b = a - b | |
return Response({"value": a}, 201, {"content-type": "application/json"}) | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
s = Server(app) | |
loop.create_task(s.run()) | |
loop.run_forever() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from queue import LifoQueue | |
int_pattern = "([\d]+)" | |
str_pattern = "([\w]+)" | |
def parse_url_pattern(pattern): | |
variables = [] | |
matcher = r"" | |
stack = LifoQueue() | |
stop_append = False | |
for token in pattern: | |
if token == '{': | |
stack.put(token) | |
stop_append = True | |
elif token == '}': | |
var_name = "" | |
while True: | |
char = stack.get() | |
if char == '{': | |
break | |
var_name = char + var_name | |
name, op_type = var_name.split(":") | |
variables.append((name, int if op_type == "int" else str)) | |
matcher += int_pattern if op_type == "int" else str_pattern | |
stop_append = False | |
else: | |
stack.put(token) | |
if not stop_append: | |
matcher += token | |
return variables, matcher | |
def parse_url(url, variables, matcher): | |
gs = re.match(matcher, url) | |
args = {} | |
if gs: | |
for i, var in enumerate(variables): | |
name, type = var | |
args[name] = type(gs.group(i+1)) | |
return args | |
if __name__ == "__main__": | |
path = "/users/{id:int}/info/{field_name:str}" | |
variables, matcher = parse_url_pattern(path) | |
url = "/users/1/info/username" | |
args = parse_url(url, variables, matcher) | |
print(args) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import asyncio | |
from urllib.parse import unquote | |
import httptools | |
class Protocol: | |
def __init__(self, app): | |
self.environment = { | |
"url": b"", | |
"url.path": b"", | |
"url.port": None, | |
"url.host": b"", | |
"url.schema": b"", | |
"url.query": b"", | |
"headers": [], | |
"body": b"", | |
"method": b"", | |
"http_version": b"", | |
"http_keep_alive": False, | |
"http_upgrade": False | |
} | |
self._content_length = -1 | |
self.parser = httptools.HttpRequestParser(self) | |
self.app = app | |
self.read_done = False | |
def on_message_complete(self): | |
self.environment["method"] = self.parser.get_method() | |
self.environment["http_version"] = self.parser.get_http_version()\ | |
.encode("utf-8") | |
self.environment["http_keep_alive"] = self.parser.should_keep_alive() | |
self.environment["http_upgrade"] = self.parser.should_upgrade() | |
self.read_done = True | |
self.t = asyncio.get_running_loop().create_task(self.on_request_done()) | |
def on_url(self, url: bytes): | |
self.environment["url"] += url | |
def on_headers_complete(self): | |
url = unquote(self.environment["url"].decode("utf-8")) | |
o = httptools.parse_url(url.encode("utf-8")) | |
self.environment["url.schema"] = o.schema or b"http" | |
self.environment["url.host"] = o.host or b"" | |
self.environment["url.port"] = o.port or 80 | |
self.environment["url.path"] = o.path or b"" | |
self.environment["url.query"] = o.query or b"" | |
for k, v in self.environment["headers"]: | |
if k.lower() == b"content-length": | |
self._content_length = int(v.decode("utf-8")) | |
break | |
def on_header(self, name: bytes, value: bytes): | |
self.environment["headers"].append((name, value)) | |
def on_body(self, body: bytes): | |
self.environment["body"] += body | |
def on_data(self, data: bytes): | |
self.parser.feed_data(data) | |
async def on_request_done(self): | |
response = await self.app(self.environment) | |
return response | |
async def process(self): | |
data = await self.t | |
return data | |
class Server: | |
def __init__(self, app): | |
self.app = app | |
self.loop = asyncio.get_event_loop() | |
async def run(self, host = "127.0.0.1", port = 5000): | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
s.setblocking(False) | |
s.bind((host, port)) | |
s.listen(1000) | |
print(f"Start server at {host}:{port}") | |
while True: | |
client, address = await self.loop.sock_accept(s) | |
self.loop.create_task(self.handle_client(client)) | |
async def handle_client(self, client: socket.socket): | |
protocol = Protocol(self.app) | |
with client: | |
while not protocol.read_done: | |
data = await self.loop.sock_recv(client, 64) | |
protocol.on_data(data) | |
data = await protocol.process() | |
await self.loop.sock_sendall(client, data.encode("utf-8")) | |
client.close() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment