Last active
January 31, 2022 00:17
-
-
Save ryang14/10ee177cac9740ea31f4a2310b7949ba to your computer and use it in GitHub Desktop.
CircuitPython HTTP server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class HTTPRequest:
    """Minimal HTTP request: just the method and the path.

    An instance is built either from explicit ``method``/``path`` values
    (this is how route keys are created) or by parsing the raw bytes that
    were received from a client.
    """

    def __init__(self, method="", path="", req_bytearray=None):
        """Create a request.

        Args:
            method: HTTP method, e.g. ``"GET"``. Ignored when
                ``req_bytearray`` is given.
            path: Request path, e.g. ``"/index.html"``. Ignored when
                ``req_bytearray`` is given.
            req_bytearray: Raw request bytes. Only the request line (the
                first line) is parsed. If it cannot be decoded or does not
                contain at least ``METHOD PATH``, ``method`` and ``path``
                are both set to ``""`` instead of raising, so one malformed
                request cannot take down the caller.
        """
        if req_bytearray is None:
            self.method = method
            self.path = path
            return

        # Parse request data from byte array
        self.method = ""
        self.path = ""
        try:
            text = req_bytearray.decode("utf8")
        except UnicodeError:
            # Not valid UTF-8: leave the request empty so it matches no route.
            return

        # The request line ends at the first line break; accept bare "\n"
        # as well as the standard "\r\n".
        request_line = text.split("\n", 1)[0].rstrip("\r")
        parts = request_line.split(" ")
        if len(parts) >= 2:
            self.method = parts[0]
            self.path = parts[1]

    # Format the request to use as a route key
    def __repr__(self):
        """Return ``"METHOD PATH"``, the string used as the route key."""
        return "{} {}".format(self.method, self.path)
class HTTPResponse:
    """HTTP response with a status line, a content type and a text body.

    ``repr(response)`` produces the complete HTTP/1.1 message (status line,
    headers, blank line, body) ready to be sent to the client.
    """

    # Common MIME types from https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types
    # Kept on the class so the table is built once rather than once per
    # response; it is still reachable as ``response.mime_types``.
    mime_types = {
        ".aac": "audio/aac",
        ".abw": "application/x-abiword",
        ".arc": "application/x-freearc",
        ".avi": "video/x-msvideo",
        ".azw": "application/vnd.amazon.ebook",
        ".bin": "application/octet-stream",
        ".bmp": "image/bmp",
        ".bz": "application/x-bzip",
        ".bz2": "application/x-bzip2",
        ".csh": "application/x-csh",
        ".css": "text/css",
        ".csv": "text/csv",
        ".doc": "application/msword",
        ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        ".eot": "application/vnd.ms-fontobject",
        ".epub": "application/epub+zip",
        ".gz": "application/gzip",
        ".gif": "image/gif",
        ".html": "text/html",
        ".htm": "text/html",
        ".ico": "image/vnd.microsoft.icon",
        ".ics": "text/calendar",
        ".jar": "application/java-archive",
        # The source table listed these as one ".jpeg .jpg" key, which no
        # real extension can ever match.
        ".jpeg": "image/jpeg",
        ".jpg": "image/jpeg",
        ".js": "text/javascript",
        ".json": "application/json",
        ".jsonld": "application/ld+json",
        ".mid": "audio/midi",
        ".midi": "audio/midi",
        ".mjs": "text/javascript",
        ".mp3": "audio/mpeg",
        ".cda": "application/x-cdf",
        ".mp4": "video/mp4",
        ".mpeg": "video/mpeg",
        ".mpkg": "application/vnd.apple.installer+xml",
        ".odp": "application/vnd.oasis.opendocument.presentation",
        ".ods": "application/vnd.oasis.opendocument.spreadsheet",
        ".odt": "application/vnd.oasis.opendocument.text",
        ".oga": "audio/ogg",
        ".ogv": "video/ogg",
        ".ogx": "application/ogg",
        ".opus": "audio/opus",
        ".otf": "font/otf",
        ".png": "image/png",
        ".pdf": "application/pdf",
        ".php": "application/x-httpd-php",
        ".ppt": "application/vnd.ms-powerpoint",
        ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        ".rar": "application/vnd.rar",
        ".rtf": "application/rtf",
        ".sh": "application/x-sh",
        ".svg": "image/svg+xml",
        ".swf": "application/x-shockwave-flash",
        ".tar": "application/x-tar",
        ".tiff": "image/tiff",
        ".tif": "image/tiff",
        ".ts": "video/mp2t",
        ".ttf": "font/ttf",
        ".txt": "text/plain",
        ".vsd": "application/vnd.visio",
        ".wav": "audio/wav",
        ".weba": "audio/webm",
        ".webm": "video/webm",
        ".webp": "image/webp",
        ".woff": "font/woff",
        ".woff2": "font/woff2",
        ".xhtml": "application/xhtml+xml",
        ".xls": "application/vnd.ms-excel",
        ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        ".xml": "application/xml",
        ".xul": "application/vnd.mozilla.xul+xml",
        ".zip": "application/zip",
        ".7z": "application/x-7z-compressed",
    }

    def __init__(self, status, content_type, body):
        """Create a response.

        Args:
            status: Status code and reason phrase, e.g. ``"200 OK"``.
            content_type: Value for the ``Content-Type`` header.
            body: Response body; formatted into the message with
                ``str.format``, so it is expected to be a string.
        """
        self.status = status
        self.content_type = content_type
        self.body = body

    # Format the response with an HTTP header
    def __repr__(self):
        """Return the full HTTP/1.1 message for this response.

        Lines are terminated with CRLF and an empty line separates the
        headers from the body. ``Content-Length`` is the UTF-8 byte length
        of a string body, which differs from ``len(body)`` as soon as the
        body contains non-ASCII characters.
        """
        body = self.body
        if isinstance(body, str):
            content_length = len(body.encode("utf-8"))
        else:
            content_length = len(body)
        response_string = (
            "HTTP/1.1 {}\r\n"
            "Content-Type: {}\r\n"
            "Content-Length: {}\r\n"
            "Connection: close\r\n"
            "\r\n"
            "{}"
        )
        return response_string.format(
            self.status, self.content_type, content_length, body)

    # Use a file extention to set the MIME content type
    def set_mime_from_extention(self, extention):
        """Set ``content_type`` from a file extension.

        Args:
            extention: File extension with or without the leading dot, in
                any case (``".html"``, ``"html"`` and ``"HTML"`` are all
                accepted). Unknown extensions select ``"text/plain"``.
        """
        key = extention
        if isinstance(key, str):
            key = key.lower()
            # Callers often pass the text after the last "." without the dot.
            if key and not key.startswith("."):
                key = "." + key
        self.content_type = self.mime_types.get(key, "text/plain")
class HTTPServer:
    """Small blocking HTTP server built on a socket pool.

    Requests are matched against handlers registered with ``route``; when
    no handler matches, the request path is tried as a file to serve.
    """

    def __init__(self, socketpool):
        """Store the socket pool used to create the listening socket.

        Args:
            socketpool: Object providing ``socket()``, ``AF_INET`` and
                ``SOCK_STREAM``.
        """
        self.__pool = socketpool
        self.routes = dict()

    # Add a route
    # Example:
    # @server.route(path, method)
    # def route_func(request, response):
    #     response.status = "200 OK"
    #     response.body = "hello world"
    def route(self, path, method="GET"):
        """Return a decorator that registers a handler for ``method path``.

        The handler is called as ``func(request, response)`` and is
        expected to modify ``response`` in place. The decorated function is
        returned unchanged.
        """
        def route_decorator(func):
            self.routes[repr(HTTPRequest(method, path))] = func
            return func
        return route_decorator

    def listen(self, host, port=80):
        """Bind to ``(host, port)`` and serve requests forever.

        One connection is handled at a time. This method does not return.
        """
        self.host = host
        self.port = port

        # Setup socket
        self.__sock = self.__pool.socket(
            self.__pool.AF_INET, self.__pool.SOCK_STREAM)
        self.__sock.bind((self.host, self.port))
        self.__sock.listen(1)

        while True:
            # Accept connection
            print("Accepting connections")
            conn, _ = self.__sock.accept()

            # The context manager closes the connection on every exit path,
            # including the ``continue`` statements below.
            with conn:
                # Only the request line is needed, so a small buffer is used.
                buff = bytearray(128)

                # If reading fails, close connection and retry
                try:
                    received, _ = conn.recvfrom_into(buff)
                except OSError:
                    continue

                # Parse only the bytes actually received; a request that
                # cannot be parsed is dropped instead of stopping the server.
                try:
                    request = HTTPRequest(req_bytearray=buff[:received])
                except (IndexError, ValueError):
                    continue

                # Setup default not found response
                response = HTTPResponse(
                    status="404 Not Found", body='404', content_type="text/plain")

                # If a route exists for this request, call it. Otherwise try to serve a file
                handler = self.routes.get(repr(request))
                if handler is not None:
                    handler(request, response)
                else:
                    self.handle_file(request, response)

                # Send response
                try:
                    self._send_all(conn, repr(response).encode("utf-8"))
                except OSError:
                    # The client went away mid-response; keep serving others.
                    continue

    def _send_all(self, conn, payload):
        """Send every byte of ``payload``, retrying after partial sends."""
        sent = 0
        while sent < len(payload):
            sent += conn.send(payload[sent:])

    def handle_file(self, request, response):
        """Serve the file named by ``request.path`` for GET requests.

        On success ``response`` gets the file text as body, status
        ``"200 OK"`` and a content type chosen from the file extension. On
        any failure ``response`` is left untouched, so the caller's default
        (404) is what gets sent.
        """
        if request.method != "GET":
            return

        path = request.path

        # NOTE(security): the path comes straight from the client and is
        # opened relative to the filesystem root, so every readable file is
        # exposed. Parent-directory segments are rejected here; restricting
        # serving to a dedicated directory should be considered.
        if ".." in path.split("/"):
            return

        try:
            # Read in the file
            with open(path, "r") as file:
                body = file.read()
        except (OSError, ValueError, MemoryError):
            # Missing/unreadable file, undecodable (binary) content, an
            # invalid path, or a file too large to load: keep the 404.
            return

        response.body = body
        response.status = "200 OK"

        # Set the response type. The extension is taken from the last path
        # segment and passed with its leading dot, matching the MIME table
        # keys (e.g. ".html").
        filename = path.split("/")[-1]
        dot = filename.rfind(".")
        extention = filename[dot:].lower() if dot != -1 else ""
        response.set_mime_from_extention(extention)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment