Skip to content

Instantly share code, notes, and snippets.

@shirayu
Created September 8, 2023 11:46
Show Gist options
  • Save shirayu/28ed35686a3e0b38d25f4e6f0beb5379 to your computer and use it in GitHub Desktop.
Save shirayu/28ed35686a3e0b38d25f4e6f0beb5379 to your computer and use it in GitHub Desktop.
UD-CO2S にシリアル接続して結果をJSONで配信するウェブサーバー
#!/usr/bin/env python3
import argparse
import datetime
import json
import re
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
import serial
class RequestHandler(BaseHTTPRequestHandler):
    """Serve the most recent sensor reading as a JSON document.

    Every GET request, whatever its path, is answered with status 200 and
    the JSON encoding of the class attribute ``stat``.
    """

    # Latest payload to serve. It is shared by all handler instances and is
    # replaced wholesale by assigning to ``RequestHandler.stat``; it stays an
    # empty dict until the first assignment.
    stat: dict = {}

    def do_GET(self):
        """Reply with the current ``RequestHandler.stat`` encoded as JSON."""
        # Serialise first so a failure cannot happen after the 200 status
        # line has already been committed to the client.
        body = json.dumps(RequestHandler.stat).encode()
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        # Announce the body size so clients do not have to rely on the
        # connection closing to find the end of the response.
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
def fix(stat, offset: float = 4.5):
    """Correct the raw temperature and relative-humidity readings in place.

    The correction follows
    https://blog.mono0x.net/2023/09/03/ud-co2s-temperature-and-humidity/

    The temperature is lowered by ``offset`` and the relative humidity is
    rescaled by the ratio of the ``10 ** (7.5 * t / (t + 237.3))`` terms
    (a Tetens-style saturation vapour pressure approximation) evaluated at
    the raw and at the corrected temperature.

    Args:
        stat: Mapping with numeric ``"temperature"`` and ``"humidity"``
            entries (the 237.3 constant implies degrees Celsius). Other
            keys are left untouched.
        offset: Amount subtracted from the raw temperature. Defaults to
            4.5, the value that was previously hard-coded.

    Returns:
        The same ``stat`` object, updated in place.

    Raises:
        KeyError: If ``"temperature"`` or ``"humidity"`` is missing.
    """

    def vapour_factor(temp: float) -> float:
        # Temperature-dependent part of the saturation vapour pressure; the
        # constant prefactor cancels out in the ratio below.
        return 10 ** (7.5 * temp / (temp + 237.3))

    raw_temp: float = stat["temperature"]
    corrected_temp: float = raw_temp - offset
    stat["temperature"] = corrected_temp
    # The same amount of water vapour yields a higher relative humidity at
    # the lower, corrected temperature.
    stat["humidity"] = (
        stat["humidity"] * vapour_factor(raw_temp) / vapour_factor(corrected_temp)
    )
    return stat
def serial_reader(dev: str):
    """Read measurements from the serial device forever and publish them.

    Opens ``dev`` at 115200 baud with a 6 second read timeout, sends the
    ``STA`` command, and then parses every incoming line of the form
    ``CO2=<int>,HUM=<float>,TMP=<float>``. Each parsed reading is passed
    through ``fix`` and stored, together with the current Unix timestamp,
    in ``RequestHandler.stat``. Lines that do not match (including the empty
    result of a read timeout) are skipped.

    Args:
        dev: Path of the serial device, e.g. ``/dev/ttyACM0``.

    Raises:
        RuntimeError: If the device sends nothing back after ``STA``.
    """
    # Line format reference:
    # https://gist.github.com/oquno/d07f6dbf8cc760f2534d9914efe79801
    regex = re.compile(
        r"CO2=(?P<co2>\d+),HUM=(?P<hum>\d+\.\d+),TMP=(?P<tmp>-?\d+\.\d+)"
    )
    with serial.Serial(dev, 115200, timeout=6) as conn:
        conn.write("STA\r\n".encode())
        # errors="replace": a corrupted byte on the wire must not raise
        # UnicodeDecodeError and silently kill the reader thread.
        ret: str = conn.readline().decode(errors="replace").strip()
        if not ret:
            # Explicit exception rather than ``assert``, which is stripped
            # when Python runs with -O.
            raise RuntimeError(f"No response to STA command from {dev}")
        while True:
            line = conn.readline().decode(errors="replace").strip()
            m = regex.match(line)
            if m is None:
                continue
            stat = fix(
                {
                    "co2ppm": int(m.group("co2")),
                    "humidity": float(m.group("hum")),
                    "temperature": float(m.group("tmp")),
                }
            )
            # Rebind the whole dict in one assignment so the HTTP handler
            # never observes a half-updated reading.
            RequestHandler.stat = {
                "time": int(datetime.datetime.now().timestamp()),
                "stat": stat,
            }
    # NOTE(review): nothing tells the device to stop; presumably an "STP"
    # command would do so, but the loop above never ends normally -- confirm
    # before adding it.
def operation(
    *,
    path_in: Path,
    host: str = "localhost",
    port: int = 5605,
) -> None:
    """Run the HTTP server and the background serial reader until interrupted.

    Binds an ``HTTPServer`` using ``RequestHandler`` to ``(host, port)``,
    starts ``serial_reader`` on a daemon thread for the device at
    ``path_in``, and serves requests until ``KeyboardInterrupt``.

    Args:
        path_in: Path of the serial device to read from.
        host: Interface or host name to bind the HTTP server to.
        port: TCP port to listen on.
    """
    dev = str(path_in)
    server = HTTPServer((host, port), RequestHandler)
    try:
        # Start the thread that reads the serial connection. It is a daemon
        # so that it never keeps the process alive after the server stops.
        serial_thread = threading.Thread(
            target=serial_reader, args=(dev,), daemon=True
        )
        serial_thread.start()

        print(f"Starting server on http://{host}:{port}")
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server.
        pass
    finally:
        # Release the listening socket on every exit path, not only after
        # a KeyboardInterrupt.
        server.server_close()
def get_opts() -> argparse.Namespace:
    """Parse the command-line options.

    Returns:
        Namespace with ``input`` (``Path`` of the serial device), ``host``
        (address the HTTP server binds to) and ``port`` (TCP port).
    """
    oparser = argparse.ArgumentParser()
    oparser.add_argument(
        "--input",
        "-i",
        type=Path,
        default=Path("/dev/ttyACM0"),
        help="serial device to read from (default: %(default)s)",
    )
    oparser.add_argument(
        "--host",
        default="localhost",
        help="address to bind the HTTP server to (default: %(default)s)",
    )
    oparser.add_argument(
        "--port",
        type=int,
        default=5605,
        help="TCP port to listen on (default: %(default)s)",
    )
    return oparser.parse_args()


def main() -> None:
    """Entry point: parse the options and run the server."""
    opts = get_opts()
    operation(
        path_in=opts.input,
        host=opts.host,
        port=opts.port,
    )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment