Skip to content

Instantly share code, notes, and snippets.

@magiskboy
Last active June 20, 2023 02:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save magiskboy/5c511a30ab2220d8f4c84d8e56eabf34 to your computer and use it in GitHub Desktop.
Save magiskboy/5c511a30ab2220d8f4c84d8e56eabf34 to your computer and use it in GitHub Desktop.
A simple HTTP server built on socket programming
from urllib.parse import parse_qs
import json
class Headers(dict):
    """Dictionary of HTTP headers with case-insensitive keys.

    Every key is lower-cased on write and on lookup, so
    ``h["Content-Type"]`` and ``h["content-type"]`` address the same entry.
    Keys may be ``str`` or ``bytes``; both provide ``lower()``.
    """

    def __init__(self, *args, **kwargs):
        # Route initial data through ``update`` so constructor arguments are
        # normalised exactly like later assignments.
        super().__init__()
        self.update(*args, **kwargs)

    def __setitem__(self, k, v):
        super().__setitem__(k.lower(), v)

    def __getitem__(self, k):
        return super().__getitem__(k.lower())

    def __contains__(self, k):
        try:
            lowered = k.lower()
        except AttributeError:
            # Keys without ``lower()`` can never have been stored.
            return False
        return super().__contains__(lowered)

    def get(self, key, default=None):
        """Return the value for ``key`` (case-insensitive) or ``default``."""
        return super().get(key.lower(), default)

    def update(self, *args, **kwargs):
        """Merge mappings/pairs into the headers, lower-casing each key."""
        for k, v in dict(*args, **kwargs).items():
            self[k] = v

    @classmethod
    def from_list(cls, headers) -> "Headers":
        """Build a ``Headers`` instance from an iterable of ``(name, value)`` pairs."""
        h = cls()
        for k, v in headers:
            h[k] = v
        return h
class Request:
    """An HTTP request with every textual field decoded to ``str``.

    Instances are normally built with :meth:`from_environments` from the
    environment dictionary assembled by the server's protocol parser.
    """

    def __init__(self):
        self.url = ""
        self.schema = ""
        self.host = ""
        self.port = 80
        self.path = ""
        self.args = {}
        self.headers = Headers()
        self.body = b""
        self.method = ""
        self._environments = {}

    @staticmethod
    def _text(value) -> str:
        """Return ``value`` as ``str``: bytes are UTF-8 decoded, ``None`` becomes ``""``."""
        if value is None:
            return ""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8")
        return str(value)

    @classmethod
    def from_environments(cls, environments: dict) -> "Request":
        """Create a request from a parser environment dictionary.

        Values may be ``bytes`` or ``str``; missing entries fall back to the
        defaults set in ``__init__`` (method falls back to ``GET``, schema to
        ``http``, port to ``80``).
        """
        text = cls._text
        request = cls()

        # Decode headers first so the Host lookup below uses str keys; the
        # raw header names are bytes and would never match a str key.
        for name, value in environments.get("headers", []):
            request.headers[text(name)] = text(value)

        request.url = text(environments.get("url"))
        request.schema = text(environments.get("url.schema") or b"http")
        request.host = (
            text(environments.get("url.host"))
            or request.headers.get("host")
            or ""
        )
        request.port = environments.get("url.port") or 80
        request.path = text(environments.get("url.path"))
        # Parse the query as text so keys and values are str like every
        # other request field.
        request.args = parse_qs(text(environments.get("url.query")))
        request.body = environments.get("body") or b""
        request.method = text(environments.get("method") or b"get").upper()
        request._environments = environments
        return request

    @property
    def text(self) -> str:
        """The request body decoded as UTF-8."""
        return self.body.decode("utf-8")

    @property
    def json(self) -> dict:
        """The request body parsed as JSON.

        Returns an empty dict when the body is not valid JSON.

        Raises:
            ValueError: if the request's media type is not ``application/json``.
        """
        content_type = self.headers.get("content-type") or ""
        # Ignore parameters such as "; charset=utf-8" and letter case.
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type != "application/json":
            raise ValueError("content-type of request must be application/json")
        try:
            return json.loads(self.body)
        except (ValueError, TypeError):
            # Best effort: malformed or undecodable payloads yield {}.
            return {}

    def __str__(self):
        return f"<Request: {self.method} {self.path}>"
class Response:
    """An HTTP response: a text body, a status code and optional headers.

    Args:
        content: ``str`` is used as is, ``bytes`` is decoded as UTF-8, and
            ``dict``/``list`` is serialised to JSON. Any other value results
            in an empty body.
        status_code: numeric HTTP status code.
        headers: mapping of header names to values, or ``None`` for none.
    """

    def __init__(
        self,
        content: "str | bytes | dict | list" = "",
        status_code: int = 200,
        headers: "Headers | dict | None" = None,
    ):
        if isinstance(content, str):
            self.content = content
        elif isinstance(content, (bytes, bytearray)):
            # Previously bytes payloads were silently replaced by "".
            self.content = bytes(content).decode("utf-8")
        elif isinstance(content, (dict, list)):
            self.content = json.dumps(content)
        else:
            self.content = ""
        self.status_code = status_code
        self.headers = headers

    def __str__(self):
        return f"<Response {self.status_code}>"
class Application:
    """Minimal async web application: exact-match routing plus response rendering."""

    def __init__(self):
        # Maps (path, lower-cased method) -> handler.
        self._endpoints = {}

    def add_endpoint(self, url: str, method: str, handler):
        """Register ``handler`` for ``url`` and ``method`` (case-insensitive)."""
        self._endpoints[(url, method.lower())] = handler

    def _route(self, url: str, method: str):
        """Return a decorator that registers a handler and hands it back."""
        def deco(func):
            self.add_endpoint(url, method, func)
            # Return the handler so the decorated name is not rebound to None.
            return func
        return deco

    def get(self, url: str):
        """Decorator registering a GET handler for ``url``."""
        return self._route(url, "GET")

    def post(self, url: str):
        """Decorator registering a POST handler for ``url``."""
        return self._route(url, "POST")

    def put(self, url: str):
        """Decorator registering a PUT handler for ``url``."""
        return self._route(url, "PUT")

    def patch(self, url: str):
        """Decorator registering a PATCH handler for ``url``."""
        return self._route(url, "PATCH")

    def delete(self, url: str):
        """Decorator registering a DELETE handler for ``url``."""
        return self._route(url, "DELETE")

    def get_handler_for(self, path: str, method: str):
        """Return the handler registered for ``path``/``method``, or ``None``."""
        return self._endpoints.get((path, method.lower()))

    async def __call__(self, environment):
        """Handle one request environment and return the raw response text."""
        request = Request.from_environments(environment)
        # routing
        handler = self.get_handler_for(request.path, request.method)
        if handler:
            response = await handler(request)
        else:
            response = Response("Not Found", 404)
        return self.make_http_response_message(response)

    def make_http_response_message(self, response: "Response") -> str:
        """Render ``response`` as an HTTP/1.1 message.

        The message is returned as ``str``; the caller encodes it before
        writing to the socket. A ``content-length`` header is added unless the
        response already carries one, so clients know where the body ends.
        """
        from http import HTTPStatus

        status = response.status_code
        try:
            reason = HTTPStatus(status).phrase
        except ValueError:
            # Non-standard status code: no registered reason phrase.
            reason = "Unknown"

        headers = response.headers or {}
        body = response.content or ""
        # 1xx, 204 and 304 responses carry no message body.
        bodyless = isinstance(status, int) and (
            100 <= status < 200 or status in (204, 304)
        )

        lines = [f"HTTP/1.1 {status} {reason}\r\n"]
        for k, v in headers.items():
            lines.append(f"{k}:{v}\r\n")
        has_length = any(str(k).lower() == "content-length" for k in headers)
        if not bodyless and not has_length:
            # Length is counted in bytes of the UTF-8 encoding sent on the wire.
            lines.append(f"content-length:{len(body.encode('utf-8'))}\r\n")
        lines.append("\r\n")
        if not bodyless:
            lines.append(body)
        return "".join(lines)
import asyncio
from server import Server
from application import Application, Response
# Application instance that the route decorators below register handlers on.
app = Application()

# Demo page: posts the entered number to /fib as JSON and shows the result.
# The fetch() Response object has no ``value`` attribute, so the script must
# parse the JSON body first and read ``value`` from the parsed data.
index_html = """
<html>
<head>
<title>Fib</title>
</head>
<body>
<div>
<div id="fib">0</div>
<input type="number" id="number" />
<button onclick="sendValue()">Calculate</button>
</div>
<script>
function sendValue() {
const value = parseInt(document.getElementById("number").value);
console.log({value});
fetch("/fib", {
"method": "POST",
"headers": {
"content-type": "application/json",
},
"body": JSON.stringify({
"value": value,
})
}).then(response => response.json()).then(data => {
document.getElementById("fib").innerHTML = data.value;
}).catch(e => console.error(e));
}
</script>
</body>
</html>
"""
@app.get("/")
async def homepage(request):
    """Serve the Fibonacci demo page as an HTML response with status 200."""
    html_headers = {"content-type": "text/html"}
    return Response(index_html, 200, html_headers)
@app.post("/fib")
async def greet(request):
    """Return the n-th Fibonacci number for the JSON payload ``{"value": n}``.

    Responds with 201 and ``{"value": F(n)}`` on success, or with 400 and an
    ``error`` message when the payload is not JSON or ``value`` is not a
    non-negative integer.
    """
    json_headers = {"content-type": "application/json"}

    try:
        data = request.json
    except ValueError as exc:
        # request.json raises ValueError when the content type is not JSON.
        return Response({"error": str(exc)}, 400, json_headers)

    count = data.get("value") if isinstance(data, dict) else None
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(count, bool) or not isinstance(count, int) or count < 0:
        return Response(
            {"error": "value must be a non-negative integer"},
            400,
            json_headers,
        )

    # NOTE(review): the loop runs synchronously inside the coroutine, so very
    # large values will stall other requests; consider capping the input.
    a, b = 0, 1
    for _ in range(count):
        a, b = b, a + b
    return Response({"value": a}, 201, json_headers)
if __name__ == "__main__":
    # Create and install the loop explicitly: calling get_event_loop() with no
    # current loop is deprecated. Installing it first also lets Server pick up
    # this same loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    s = Server(app)
    try:
        # Drive the server coroutine directly so startup failures (for
        # example the port already being in use) propagate instead of being
        # lost inside a detached task while the loop idles forever.
        loop.run_until_complete(s.run())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the development server.
        pass
    finally:
        loop.close()
import re
from queue import LifoQueue
# Regex fragments substituted for "{name:int}" and "{name:str}" placeholders.
# Raw strings avoid the invalid "\d" / "\w" escape sequences.
int_pattern = r"([\d]+)"
str_pattern = r"([\w]+)"


def parse_url_pattern(pattern):
    """Compile a route template into ``(variables, matcher)``.

    ``pattern`` is a path template such as
    ``"/users/{id:int}/info/{field_name:str}"``. Each ``{name:type}``
    placeholder becomes one capture group; ``int`` selects ``int_pattern``
    and every other type (or a missing type, as in ``{name}``) selects
    ``str_pattern``. Literal text is regex-escaped so characters such as
    ``.`` only match themselves.

    Returns:
        A tuple ``(variables, matcher)`` where ``variables`` is a list of
        ``(name, converter)`` pairs in placeholder order and ``matcher`` is
        the regular expression source string.

    Raises:
        ValueError: if braces are unbalanced or nested, or a placeholder is
            malformed (empty name, more than one ``:``).
    """
    variables = []
    matcher_parts = []
    placeholder = None  # characters collected since an unmatched "{", else None

    for token in pattern:
        if token == "{":
            if placeholder is not None:
                raise ValueError(f"nested '{{' in URL pattern {pattern!r}")
            placeholder = []
        elif token == "}":
            if placeholder is None:
                raise ValueError(f"unbalanced '}}' in URL pattern {pattern!r}")
            name, _, op_type = "".join(placeholder).partition(":")
            if not name or ":" in op_type:
                raise ValueError(f"malformed placeholder in URL pattern {pattern!r}")
            is_int = op_type == "int"
            variables.append((name, int if is_int else str))
            matcher_parts.append(int_pattern if is_int else str_pattern)
            placeholder = None
        elif placeholder is not None:
            placeholder.append(token)
        else:
            matcher_parts.append(re.escape(token))

    if placeholder is not None:
        raise ValueError(f"unterminated '{{' in URL pattern {pattern!r}")
    return variables, "".join(matcher_parts)
def parse_url(url, variables, matcher):
    """Extract typed path arguments from ``url``.

    Args:
        url: the request path to match.
        variables: list of ``(name, converter)`` pairs, one per capture group
            of ``matcher``, in group order.
        matcher: regular expression source whose groups capture the variables.

    Returns:
        A dict mapping each variable name to its converted value, or an empty
        dict when ``url`` does not match. The whole ``url`` must match, so
        trailing text after the route is rejected rather than ignored.
    """
    match = re.fullmatch(matcher, url)
    if match is None:
        return {}
    return {
        name: convert(match.group(index))
        for index, (name, convert) in enumerate(variables, start=1)
    }
if __name__ == "__main__":
    # Demo: compile a route template, then extract the typed arguments from a
    # concrete URL and print them.
    route_variables, route_matcher = parse_url_pattern(
        "/users/{id:int}/info/{field_name:str}"
    )
    print(parse_url("/users/1/info/username", route_variables, route_matcher))
import socket
import asyncio
from urllib.parse import unquote
import httptools
class Protocol:
    """Per-connection HTTP request parser state.

    Registers itself as the callback object of an ``httptools``
    ``HttpRequestParser``, collects the parsed pieces into ``environment``
    and, once the message is complete, schedules the application call as a
    task that :meth:`process` awaits.
    """

    def __init__(self, app):
        self.environment = {
            "url": b"",
            "url.path": b"",
            "url.port": None,
            "url.host": b"",
            "url.schema": b"",
            "url.query": b"",
            "headers": [],
            "body": b"",
            "method": b"",
            "http_version": b"",
            "http_keep_alive": False,
            "http_upgrade": False
        }
        self._content_length = -1
        self.parser = httptools.HttpRequestParser(self)
        self.app = app
        self.read_done = False
        # Task running the application; created in on_message_complete().
        self.t = None

    def on_message_complete(self):
        """Record request metadata and start the application call."""
        self.environment["method"] = self.parser.get_method()
        self.environment["http_version"] = self.parser.get_http_version()\
            .encode("utf-8")
        self.environment["http_keep_alive"] = self.parser.should_keep_alive()
        self.environment["http_upgrade"] = self.parser.should_upgrade()
        self.read_done = True
        self.t = asyncio.get_running_loop().create_task(self.on_request_done())

    def on_url(self, url: bytes):
        """Append a chunk of the request target."""
        self.environment["url"] += url

    def on_headers_complete(self):
        """Split the request target into its components."""
        # Parse the raw target first: percent-decoding beforehand would turn
        # escaped delimiters (%3F, %26, %20, ...) into real ones and corrupt
        # or break the split.
        o = httptools.parse_url(self.environment["url"])
        raw_path = o.path or b""
        self.environment["url.schema"] = o.schema or b"http"
        self.environment["url.host"] = o.host or b""
        self.environment["url.port"] = o.port or 80
        self.environment["url.path"] = unquote(
            raw_path.decode("utf-8", "replace")
        ).encode("utf-8")
        # The query stays percent-encoded; parse_qs decodes it when the
        # request object is built, and decoding twice would mangle values.
        self.environment["url.query"] = o.query or b""
        for k, v in self.environment["headers"]:
            if k.lower() == b"content-length":
                self._content_length = int(v.decode("utf-8"))
                break

    def on_header(self, name: bytes, value: bytes):
        """Record one header as a ``(name, value)`` pair."""
        self.environment["headers"].append((name, value))

    def on_body(self, body: bytes):
        """Append a chunk of the request body."""
        self.environment["body"] += body

    def on_data(self, data: bytes):
        """Feed raw socket data to the parser, which fires the callbacks above."""
        self.parser.feed_data(data)

    async def on_request_done(self):
        """Invoke the application with the collected environment."""
        response = await self.app(self.environment)
        return response

    async def process(self):
        """Wait for the application's response to the parsed request.

        Raises:
            RuntimeError: if called before a complete request was parsed.
        """
        if self.t is None:
            raise RuntimeError("process() called before the request was fully parsed")
        return await self.t
class Server:
    """Asyncio TCP server that answers one HTTP request per connection."""

    # Maximum number of bytes requested from the socket per read.
    RECV_SIZE = 65536

    def __init__(self, app):
        self.app = app
        # Bound to the running loop when run() starts.
        self.loop = None
        # Strong references to in-flight client tasks; the event loop itself
        # only keeps weak references to tasks.
        self._tasks = set()

    async def run(self, host = "127.0.0.1", port = 5000):
        """Listen on ``host``:``port`` and serve connections until cancelled."""
        self.loop = loop = asyncio.get_running_loop()
        # The context manager closes the listening socket when the server
        # stops or fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.setblocking(False)
            s.bind((host, port))
            s.listen(1000)
            print(f"Start server at {host}:{port}")
            while True:
                client, _address = await loop.sock_accept(s)
                task = loop.create_task(self.handle_client(client))
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)

    async def handle_client(self, client: socket.socket):
        """Read one request from ``client``, send the response, then close."""
        loop = asyncio.get_running_loop()
        protocol = Protocol(self.app)
        with client:
            while not protocol.read_done:
                data = await loop.sock_recv(client, self.RECV_SIZE)
                if not data:
                    # Peer closed before sending a full request; without this
                    # check the loop would spin forever on empty reads.
                    return
                try:
                    protocol.on_data(data)
                except httptools.HttpParserError:
                    await loop.sock_sendall(
                        client,
                        b"HTTP/1.1 400 Bad Request\r\ncontent-length:0\r\n\r\n",
                    )
                    return
            data = await protocol.process()
            await loop.sock_sendall(client, data.encode("utf-8"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment