Skip to content

Instantly share code, notes, and snippets.

@ryang14
Last active January 31, 2022 00:17
Show Gist options
  • Save ryang14/10ee177cac9740ea31f4a2310b7949ba to your computer and use it in GitHub Desktop.
Save ryang14/10ee177cac9740ea31f4a2310b7949ba to your computer and use it in GitHub Desktop.
CircuitPython HTTP server
class HTTPRequest:
def __init__(self, method="", path="", req_bytearray=None):
if req_bytearray is None:
self.method = method
self.path = path
else:
# Parse request data from byte array
lines = req_bytearray.decode("utf8").split('\r\n')
request = lines[0].split(' ')
self.method = request[0]
self.path = request[1]
# Format the request to use as a route key
def __repr__(self):
return "{} {}".format(self.method, self.path)
class HTTPResponse:
def __init__(self, status, content_type, body):
self.status = status
self.content_type = content_type
self.body = body
# Common MIME types from https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types
self.mime_types = {
".aac": "audio/aac",
".abw": "application/x-abiword",
".arc": "application/x-freearc",
".avi": "video/x-msvideo",
".azw": "application/vnd.amazon.ebook",
".bin": "application/octet-stream",
".bmp": "image/bmp",
".bz": "application/x-bzip",
".bz2": "application/x-bzip2",
".csh": "application/x-csh",
".css": "text/css",
".csv": "text/csv",
".doc": "application/msword",
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".eot": "application/vnd.ms-fontobject",
".epub": "application/epub+zip",
".gz": "application/gzip",
".gif": "image/gif",
".html": "text/html",
".htm": "text/html",
".ico": "image/vnd.microsoft.icon",
".ics": "text/calendar",
".jar": "application/java-archive",
".jpeg .jpg": "image/jpeg",
".js": "text/javascript",
".json": "application/json",
".jsonld": "application/ld+json",
".mid": "audio/midi",
".midi": "audio/midi",
".mjs": "text/javascript",
".mp3": "audio/mpeg",
".cda": "application/x-cdf",
".mp4": "video/mp4",
".mpeg": "video/mpeg",
".mpkg": "application/vnd.apple.installer+xml",
".odp": "application/vnd.oasis.opendocument.presentation",
".ods": "application/vnd.oasis.opendocument.spreadsheet",
".odt": "application/vnd.oasis.opendocument.text",
".oga": "audio/ogg",
".ogv": "video/ogg",
".ogx": "application/ogg",
".opus": "audio/opus",
".otf": "font/otf",
".png": "image/png",
".pdf": "application/pdf",
".php": "application/x-httpd-php",
".ppt": "application/vnd.ms-powerpoint",
".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
".rar": "application/vnd.rar",
".rtf": "application/rtf",
".sh": "application/x-sh",
".svg": "image/svg+xml",
".swf": "application/x-shockwave-flash",
".tar": "application/x-tar",
".tiff": "image/tiff",
".tif": "image/tiff",
".ts": "video/mp2t",
".ttf": "font/ttf",
".txt": "text/plain",
".vsd": "application/vnd.visio",
".wav": "audio/wav",
".weba": "audio/webm",
".webm": "video/webm",
".webp": "image/webp",
".woff": "font/woff",
".woff2": "font/woff2",
".xhtml": "application/xhtml+xml",
".xls": "application/vnd.ms-excel",
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
".xml": "application/xml",
".xul": "application/vnd.mozilla.xul+xml",
".zip": "application/zip",
".7z": "application/x-7z-compressed"
}
# Format the response with an HTTP header
def __repr__(self):
response_string = '''HTTP/1.1 {}
Content-Type: {}
Content-Length: {}
Connection: close
{}'''
return response_string.format(self.status, self.content_type, len(self.body), self.body)
# Use a file extention to set the MIME content type
def set_mime_from_extention(self, extention):
if extention in self.mime_types:
self.content_type = self.mime_types[extention]
else:
self.content_type = "text/plain"
class HTTPServer:
def __init__(self, socketpool):
self.__pool = socketpool
self.routes = dict()
# Add a route
# Example:
# @server.route(path, method)
# def route_func(request, response):
# res.status = "200 OK"
# res.body = "hello world"
def route(self, path, method="GET"):
def route_decorator(func):
self.routes[repr(HTTPRequest(method, path))] = func
return func
return route_decorator
def listen(self, host, port=80):
self.host = host
self.port = port
# Setup socket
self.__sock = self.__pool.socket(
self.__pool.AF_INET, self.__pool.SOCK_STREAM)
self.__sock.bind((self.host, self.port))
self.__sock.listen(1)
while True:
# Accept connection
print("Accepting connections")
conn, addr = self.__sock.accept()
with conn:
buff = bytearray(128)
# If reading fails, close connection and retry
try:
conn.recvfrom_into(buff)
except:
conn.close()
continue
request = HTTPRequest(req_bytearray=buff)
# Setup default not found response
response = HTTPResponse(
status="404 Not Found", body='404', content_type="text/plain")
# If a route exists for this request, call it. Otherwise try to serve a file
if repr(request) in self.routes:
self.routes[repr(request)](request, response)
else:
self.handle_file(request, response)
# Send response
bytes_sent = conn.send(repr(response))
while bytes_sent < len(repr(response)):
bytes_sent += conn.send(repr(response)[bytes_sent:])conn.send(repr(response))
conn.close()
def handle_file(self, request, response):
if request.method == "GET":
try:
# Read in the file
with open(request.path, "r") as file:
response.body = file.read()
# Set the response type
response.set_mime_from_extention(request.path.split('.')[-1])
except:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment