-
-
Save dfrankow/f91aefd683ece8e696c26e183d696c29 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
"""A simple HTTP server with REST and json for python 3. | |
addrecord takes utf8-encoded URL parameters | |
getrecord returns utf8-encoded json. | |
""" | |
import argparse | |
import json | |
import re | |
import threading | |
from email.message import EmailMessage | |
from http import HTTPStatus | |
from http.server import BaseHTTPRequestHandler, HTTPServer | |
from urllib import parse | |
def _parse_header(content_type):
    """Split a Content-Type header value into its MIME type and parameters.

    Stand-in for ``cgi.parse_header``, which is deprecated; the parsing is
    delegated to the standard ``email`` package.

    Args:
        content_type: Raw header value, e.g.
            ``"application/json; charset=utf-8"``.

    Returns:
        A ``(mime_type, params)`` pair, where ``mime_type`` is the value
        reported by ``EmailMessage.get_content_type()`` and ``params`` is the
        mapping of header parameters (e.g. ``{"charset": "utf-8"}``).
    """
    message = EmailMessage()
    # Assigning the header lets the email package parse it for us.
    message["content-type"] = content_type
    return message.get_content_type(), message["content-type"].params
class LocalData(object):
    """In-memory record store shared by every request handler."""

    # Maps a record id (the last path segment of the request URL) to the
    # parsed request body. It lives on the class, so all handlers in this
    # process see the same dict and its contents are lost when the process
    # exits.
    records = {}
class HTTPRequestHandler(BaseHTTPRequestHandler):
    """REST-style handler backed by the in-memory ``LocalData.records`` store.

    Routes:
        POST /api/v1/addrecord/<id>  store the request body under ``<id>``
        GET  /api/v1/getrecord/<id>  return the stored record as JSON
        GET  /api/v1/shutdown        stop the server

    Routes are matched as path prefixes, so a URL that merely *contains* a
    route somewhere else (for example in its query string) is rejected.
    """

    def _send_status(self, status, message=None):
        """Send a body-less response: status line plus terminated headers."""
        self.send_response(status, message)
        self.end_headers()

    def do_POST(self):
        """Store the request body under the id taken from the URL path.

        Responds 403 for any other path, 400 when the Content-Type is not
        ``application/json`` or the body cannot be read, and 200 on success.
        """
        if not self.path.startswith("/api/v1/addrecord"):
            self._send_status(HTTPStatus.FORBIDDEN)
            return

        content_type = self.headers.get("content-type")
        ctype = _parse_header(content_type)[0] if content_type else None
        if ctype != "application/json":
            self._send_status(
                HTTPStatus.BAD_REQUEST, "Bad Request: must give data"
            )
            return

        # A missing, non-numeric or negative Content-Length must not crash
        # the handler (or make rfile.read() block); answer 400 instead.
        try:
            length = int(self.headers.get("content-length"))
        except (TypeError, ValueError):
            length = -1
        if length < 0:
            self._send_status(
                HTTPStatus.BAD_REQUEST, "Bad Request: invalid Content-Length"
            )
            return

        try:
            body = self.rfile.read(length).decode("utf8")
        except UnicodeDecodeError:
            self._send_status(
                HTTPStatus.BAD_REQUEST, "Bad Request: body must be utf8"
            )
            return

        # NOTE: the body is parsed as URL-encoded parameters even though the
        # declared Content-Type has to be application/json; existing clients
        # rely on this, so it is kept as is.
        data = parse.parse_qs(body, keep_blank_values=True)
        record_id = self.path.split("/")[-1]
        LocalData.records[record_id] = data
        print("addrecord %s: %s" % (record_id, data))
        self._send_status(HTTPStatus.OK)

    def do_GET(self):
        """Serve a stored record as JSON, or shut the server down.

        Responds 404 for an unknown record id and 400 for an unknown path.
        """
        if self.path.startswith("/api/v1/shutdown"):
            # Must shut down from another thread or we'll hang.
            threading.Thread(target=self.server.shutdown).start()
            # Send out a 200 before we go.
            self._send_status(HTTPStatus.OK)
        elif self.path.startswith("/api/v1/getrecord"):
            record_id = self.path.split("/")[-1]
            try:
                record = LocalData.records[record_id]
            except KeyError:
                self._send_status(
                    HTTPStatus.NOT_FOUND, "Not Found: record does not exist"
                )
                return

            # Return json, even though it came in as POST URL params.
            payload = json.dumps(record)
            print("getrecord %s: %s" % (record_id, payload))
            body = payload.encode("utf8")

            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            # Headers are terminated exactly once, before the body; ending
            # them again afterwards would append a stray CRLF to the JSON.
            self.end_headers()
            self.wfile.write(body)
        else:
            self._send_status(HTTPStatus.BAD_REQUEST)
def main(argv=None):
    """Parse the command line and serve HTTP requests until shut down.

    Args:
        argv: Argument list to parse (``[port, ip]``). Defaults to ``None``,
            in which case argparse reads ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="HTTP Server")
    parser.add_argument("port", type=int, help="Listening port for HTTP Server")
    parser.add_argument("ip", help="HTTP Server IP")
    args = parser.parse_args(argv)

    # The context manager closes the listening socket once serve_forever()
    # returns, instead of leaving it open until the process exits.
    with HTTPServer((args.ip, args.port), HTTPRequestHandler) as server:
        print("HTTP Server Running...........")
        server.serve_forever()


if __name__ == "__main__":
    main()
@loomsen Sure. I ended up moving to flask. I find it more straightforward. See https://gist.github.com/dfrankow/14fa994d7edcbe2e88f54823b90b41a3.
Thanks, very useful!
Great! Thanks for letting me know.
Again, just FYI, I decided I liked flask better. See link above.
cgi is deprecated and slated for removal in 3.13
Here's a replacement for cgi.parse_header
from email.message import EmailMessage
def parse_header(content_type):
    """Return ``(mime_type, params)`` for a Content-Type header value.

    Replacement for the deprecated ``cgi.parse_header``.
    """
    m = EmailMessage()
    m['content-type'] = content_type
    return m.get_content_type(), m['content-type'].params
Thanks @iwconfig, I added your changes. I'd still likely use flask. :)
Yeah, I would too. :)
Hello dfrankow. Maybe it would be better to use the status codes from Python's HTTPStatus module? For example here. It could look like this:
from http import HTTPStatus
...
self.send_response(HTTPStatus.OK)
Also, in this case, your code will not require comments such as this
What do you think?
Hello dfrankow. Maybe it would be better to use codes from HTTPStatus python module?
Good idea. Done.
Thank you very much for your effort 👍