Skip to content

Instantly share code, notes, and snippets.

@Peter42
Created February 27, 2017 09:43
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Embed
What would you like to do?
IoT POST Request Processing Demo Server
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from json import loads, dumps
from threading import Thread
class IOTHandler(BaseHTTPRequestHandler):
def _send_success_headers(self, content_type="application/json", code=200):
self.send_response(code)
self.send_header('Content-Type', content_type)
self.end_headers()
def do_HEAD(self):
self._send_success_headers(content_type='text/html')
def do_GET(self):
self.do_HEAD()
self.wfile.write("<html><body>IoT POST Request Processing Demo Server</body></html>")
def do_POST(self):
content_length = int(self.headers.getheader('content-length', -1))
# no post data check
if content_length < 0:
self._send_success_headers(code=400)
return
try:
raw_body = self.rfile.read(content_length)
body = loads(raw_body)
print dumps(body, indent=4)
self._send_success_headers()
except ValueError:
print "Error parsing posted json"
self._send_success_headers(code=400)
def main(port=8000):
print "Starting server on port: " + str(port)
address = ('', port)
server = HTTPServer(address, IOTHandler)
try:
# sometimes [CTRL] + [C] can't interupt serve_forever, so we start a new thread as a workaround
thread = Thread(target = server.serve_forever)
thread.daemon = True
thread.start()
while thread.isAlive():
thread.join(1000)
except KeyboardInterrupt:
print "Stopping server"
server.server_close()
if __name__ == "__main__":
from sys import argv
if len(argv) > 2:
print "Usage: " + argv[0] + " [port]"
exit(1)
if len(argv) == 2:
main(port=int(argv[1]))
else:
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment