Skip to content

Instantly share code, notes, and snippets.

@JRodez
Created November 22, 2023 10:00
Show Gist options
  • Save JRodez/c1a1a7ca1725abd4212f64412b94df83 to your computer and use it in GitHub Desktop.
Save JRodez/c1a1a7ca1725abd4212f64412b94df83 to your computer and use it in GitHub Desktop.
OneM2M notification server
# server.py
import http.server # Our http server handler for http requests
import socketserver # Establish the TCP Socket connections
import time
# TCP port on which the notification server listens.
PORT = 9999
class MyHttpRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler for a minimal notification receiver.

    GET requests are answered with a static HTML page. POST requests are
    treated as notifications: their headers are printed to stdout and the
    request is acknowledged with HTTP 200 plus the ``X-M2M-RSC`` and
    ``X-M2M-RI`` response headers.
    """

    def do_GET(self) -> None:
        """Reply with a static HTML page stating that no web page is served."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(bytes("<html><head><title>[ACME] Notification Server</title></head><body>This server doesn't provide a web page.</body></html>", "utf-8"))

    def do_POST(self) -> None:
        """Handle notification.

        Reads and discards the request body, prints the request headers to
        stdout, and always answers with HTTP 200 and ``X-M2M-RSC: 2000``.
        The request's ``X-M2M-RI`` value is echoed back when it is present.
        """
        # A missing, malformed or negative Content-Length is treated as an
        # empty body, so the request is still acknowledged instead of the
        # handler raising (or blocking on an unbounded read).
        try:
            length = max(int(self.headers.get('Content-Length') or 0), 0)
        except ValueError:
            length = 0
        request_id = self.headers.get('X-M2M-RI')

        # Consume the body so that it does not stay unread on the connection.
        # The payload itself is not displayed by this server.
        self.rfile.read(length)

        # Log the incoming request headers.
        print('### Received Notification (http)')
        print(self.headers)

        # Construct the response headers.
        # Always acknowledge the verification requests.
        self.send_response(200)
        self.send_header('X-M2M-RSC', '2000')
        if request_id is not None:
            # Echo the request identifier only when the sender supplied one;
            # otherwise the literal text "None" would be sent back.
            self.send_header('X-M2M-RI', request_id)
        self.end_headers()
# Handler class used for every incoming connection.
Handler = MyHttpRequestHandler

# Wait three seconds before binding the listening socket.
# NOTE(review): the reason for this delay is not visible here — presumably it
# gives another process time to start; confirm before removing.
time.sleep(3)

# Bind to all interfaces on PORT and serve until the process is interrupted;
# leaving the ``with`` block closes the server socket.
with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print("Http Server Serving at port", PORT)
    httpd.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment