Skip to content

Instantly share code, notes, and snippets.

@Peter42
Created Feb 27, 2017
Embed
What would you like to do?
IoT POST Request Processing Demo Server
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from json import loads, dumps
from threading import Thread
class IOTHandler(BaseHTTPRequestHandler):
    """Request handler for the IoT POST demo server.

    GET and HEAD answer with a tiny static HTML page. POST expects a JSON
    document in the request body, pretty-prints it to stdout and replies
    200; malformed requests are answered with 400.
    """

    def _send_success_headers(self, content_type="application/json", code=200):
        """Send the status line plus a Content-Type header and end the headers.

        Despite its name this helper is also used for error replies by
        passing ``code=400``.
        """
        self.send_response(code)
        self.send_header('Content-Type', content_type)
        self.end_headers()

    def do_HEAD(self):
        """Reply 200 with an HTML content type and no body."""
        self._send_success_headers(content_type='text/html')

    def do_GET(self):
        """Reply with the static HTML landing page."""
        self.do_HEAD()
        # Bytes literal: the same object type as a plain str on Python 2,
        # and the type wfile requires on Python 3.
        self.wfile.write(b"<html><body>IoT POST Request Processing Demo Server</body></html>")

    def do_POST(self):
        """Read a JSON body, pretty-print it, and reply 200 (or 400 on error).

        A 400 is sent when the Content-Length header is missing, negative or
        not an integer, and when the body is not valid JSON.
        """
        # A client may send a non-numeric Content-Length; treat that as a bad
        # request instead of letting the ValueError escape the handler.
        try:
            content_length = int(self.headers.get('content-length', -1))
        except (TypeError, ValueError):
            print("Error parsing Content-Length header")
            self._send_success_headers(code=400)
            return

        # no post data check
        if content_length < 0:
            self._send_success_headers(code=400)
            return

        try:
            raw_body = self.rfile.read(content_length)
            body = loads(raw_body)
        except ValueError:
            print("Error parsing posted json")
            self._send_success_headers(code=400)
            return

        print(dumps(body, indent=4))
        self._send_success_headers()
def main(port=8000):
    """Serve IOTHandler on all interfaces at *port* until Ctrl+C is pressed.

    The serve loop runs in a daemon thread while the main thread waits on it,
    so that a KeyboardInterrupt is delivered reliably. On interrupt the serve
    loop is asked to stop before the listening socket is closed.
    """
    print("Starting server on port: " + str(port))
    address = ('', port)
    server = HTTPServer(address, IOTHandler)
    # sometimes [CTRL] + [C] can't interrupt serve_forever, so we start a new thread as a workaround
    thread = Thread(target=server.serve_forever)
    thread.daemon = True
    try:
        thread.start()
        # Join with a timeout so the main thread keeps waking up and can
        # receive the KeyboardInterrupt.
        while thread.is_alive():
            thread.join(1000)
    except KeyboardInterrupt:
        print("Stopping server")
        # shutdown() waits for serve_forever() to return; only call it when
        # the serving thread is actually running, otherwise it would block.
        if thread.is_alive():
            server.shutdown()
    finally:
        # Release the listening socket however the loop ended.
        server.server_close()
if __name__ == "__main__":
    # Command line: an optional single argument giving the port to listen on.
    import sys

    argv = sys.argv
    usage = "Usage: " + argv[0] + " [port]"
    if len(argv) > 2:
        print(usage)
        # sys.exit rather than the site-provided exit(), which is meant for
        # interactive sessions and is missing when site is not loaded.
        sys.exit(1)
    if len(argv) == 2:
        try:
            port = int(argv[1])
        except ValueError:
            # A non-numeric port is a usage error, not a crash.
            print(usage)
            sys.exit(1)
        main(port=port)
    else:
        main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment