Created
May 27, 2020 18:09
-
-
Save Cactus64k/3214e7dc9b845fadb3ce2efddf889f6e to your computer and use it in GitHub Desktop.
Micropython class for Web server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import gc | |
HTTP_STATUS = """ | |
<html> | |
<head> | |
<meta http-equiv="content-type" content="text/html; charset=UTF-8"> | |
<title>ESP Web Server:{status} Status Code</title> | |
</head> | |
<body bgcolor="white"> | |
<center><h1>{status} Status Code</h1></center> | |
<hr> | |
<center>ESP Web Server</center> | |
</body> | |
</html>""" | |
class WebServer(object): | |
def __del__(self): | |
if self.socket is not None: | |
self.socket.close() | |
self.socket = None | |
if self.handlers is not None: | |
self.handlers.clear() | |
self.handlers = None | |
def __init__(self, host="0.0.0.0", port=80, queue=16): | |
try: | |
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.socket.bind((host, port)) | |
self.socket.listen(queue) | |
self.conn = None | |
self.host = host | |
self.port = port | |
self.handlers = dict() | |
print("Starting web server on {}:{}".format(host, port)) | |
except OSError as msg: | |
print("ERROR: {}".format(msg)) | |
self.socket.close() | |
self.socket = None | |
raise Exception("Failed to initialize socket") | |
def __send_data(self, status=200, data="", data_type="text/plain"): | |
data = data.encode("utf-8") | |
self.conn.send(b"HTTP/1.1 {}\r\n".format(status)) | |
self.conn.send(b"Server: esp8266\r\n") | |
self.conn.send(b"Connection: close\r\n") | |
self.conn.send(b"Content-Type: {}\r\n".format(data_type)) | |
self.conn.send(b"Content-Length: {}\r\n".format(len(data))) | |
self.conn.send(b"\r\n") | |
self.conn.sendall(data) | |
self.conn.close() | |
def __handle_request(self, method, path, params): | |
status = 0 | |
data = "" | |
data_type = "text/html" | |
try: | |
if path in self.handlers: | |
status, data, data_type = self.handlers[path](method, path, params) | |
else: | |
status = 404 | |
except Exception as e: | |
print("ERROR: failed to handle url {} {}".format(path, e)) | |
status = 500 | |
if status != 200: | |
data = HTTP_STATUS.format(status = status) | |
return status, data, data_type | |
def add_handler(self, url, func): | |
self.handlers[url] = func | |
def remove_handler(self, url): | |
del self.handlers[url] | |
def start_server(self): | |
try: | |
while True: | |
self.conn, addr = self.socket.accept() | |
request_data = self.conn.recv(1024).decode("utf-8") | |
request, other = request_data.split("\r\n", 1) # get first string | |
print("{}:{} {}".format(addr[0], addr[1], request)) | |
method, url, proto = request.split(" ") # GET /style.css HTTP/1.1 | |
params = dict() | |
try: | |
path, params_line = url.split("?", 1) # GET /?led=on HTTP/1.1 | |
for parm in params_line.split("&"): | |
key, val = parm.split("=", 1) | |
params[key] = val | |
except ValueError: | |
path = url | |
status, data, data_type = self.__handle_request(method, path, params) | |
self.__send_data(status, data, data_type) | |
gc.collect() | |
except Exception as e: | |
print("ERROR: in main web server loop {}".format(e)) | |
finally: | |
self.socket.close() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment