Skip to content

Instantly share code, notes, and snippets.

@evindunn
Last active September 28, 2021 20:38
Show Gist options
  • Save evindunn/082b7e22aba2e5104de3ed19e87f38ba to your computer and use it in GitHub Desktop.
Save evindunn/082b7e22aba2e5104de3ed19e87f38ba to your computer and use it in GitHub Desktop.
DHT11 Temperature Sensor as HTTP Service
#!/usr/bin/env python3
from argparse import ArgumentParser
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from json import dumps as json_dumps
from threading import Thread, RLock
from time import sleep
import Adafruit_DHT
GPIO_PIN_DEFAULT = 4
class Handler(BaseHTTPRequestHandler):
CODE_INTERNAL_SERVER_ERROR = 500
CODE_OK = 200
CONTENT_TYPE = "application/json;charset=utf-8"
protocol_version = "HTTP/1.1"
error_content_type = CONTENT_TYPE
error_message_format = json_dumps({"message": "%(explain)s"})
def do_GET(self):
self.send_response(Handler.CODE_OK)
self.send_header("Content-Type", Handler.CONTENT_TYPE)
response = json_dumps(self.server.sensor_data).encode("utf-8")
self.send_header("Content-Length", len(response))
self.end_headers()
self.wfile.write(response)
class Server(ThreadingHTTPServer):
SENSOR_POLL_INTERVAL = 1
allow_reuse_address = True
def __init__(self, address, handler, gpio_pin):
super().__init__(address, handler)
self._sensor_data = {"humidity": 0, "temperature": 0}
self._sensor_lock = RLock()
self._sensor_thread = Thread(
target=Server.poll_sensor,
args=(self._sensor_lock, gpio_pin, self._sensor_data),
daemon=True
)
self._sensor_thread.start()
@property
def sensor_data(self):
with self._sensor_lock:
return self._sensor_data
@staticmethod
def poll_sensor(lock, gpio_pin, data):
while True:
humidity, temperature = Adafruit_DHT.read_retry(Adafruit_DHT.DHT11, gpio_pin)
with lock:
data["humidity"] = humidity
data["temperature"] = temperature
sleep(Server.SENSOR_POLL_INTERVAL)
if __name__ == "__main__":
parser = ArgumentParser(description="Temperature and Humidity Server")
parser.add_argument("port", type=int, help="The port to the the server on")
parser.add_argument("--gpio-pin", type=int, help="The GPIO pin that the sensor is connected to", default=GPIO_PIN_DEFAULT)
args = parser.parse_args()
with Server(("0.0.0.0", args.port), Handler, args.gpio_pin) as httpd:
print("Server running on port {}...".format(args.port))
try:
httpd.serve_forever()
except KeyboardInterrupt:
httpd.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment