Skip to content

Instantly share code, notes, and snippets.

@rkachowski
Created October 18, 2021 21:05
Show Gist options
  • Save rkachowski/f61eada9846e9062fdb94e9319153bd1 to your computer and use it in GitHub Desktop.
Save rkachowski/f61eada9846e9062fdb94e9319153bd1 to your computer and use it in GitHub Desktop.
get data from dht, co2 meter and noise level sensor
import paho.mqtt.client as mqtt
import Adafruit_DHT
import usb.core
import statistics
import requests
import sdnotify
import sys, fcntl, time, os, argparse, socket
from time import sleep
# --- MQTT settings -------------------------------------------------------
broker = "kitchenpi.lan"       # host passed to client.connect()
topic = "stats"                # prefix for every published reading
lwt = "alive/stats_relay"      # topic used for the last-will message

# --- sampling / filtering ------------------------------------------------
delay = 5                      # passed to sleep() at the end of each loop pass
buffer_size = 5                # number of samples kept per rolling window
min_deviation = 5              # constant added to the outlier threshold in app()

# Rolling windows: humidity, temperature and sound level respectively.
h, t, s = [], [], []

# systemd notifier used by notify() to send the WATCHDOG=1 keep-alive.
n = sdnotify.SystemdNotifier()
def app(list, val, size=None, deviation=None):
    """Push ``val`` into a rolling window unless it looks like an outlier.

    While the window holds fewer than ``size`` samples every value is
    accepted. Once it is full, a value whose distance from the window mean
    exceeds ``2 * stdev + deviation`` is rejected and the most recent
    accepted sample is returned instead.

    Args:
        list: the window, mutated in place (name kept for existing callers).
        val: the new sample, or None when the sensor read failed.
        size: window length; defaults to the module-level ``buffer_size``.
        deviation: constant added to the threshold; defaults to the
            module-level ``min_deviation``.

    Returns:
        ``val`` when it was accepted, otherwise the last accepted sample
        (None if there is none yet).
    """
    samples = list  # local alias so the body does not read like the builtin
    if size is None:
        size = buffer_size
    if deviation is None:
        deviation = min_deviation

    # A missing reading must never enter the window: it would break the
    # mean/stdev computation for every later call.
    if val is None:
        return samples[-1] if samples else None

    if len(samples) < size:
        samples.append(val)
        return val

    # stdev() needs at least two points; a one-element window has no spread.
    spread = statistics.stdev(samples) if len(samples) > 1 else 0.0
    # NOTE(review): a rejected sample is not stored, so after a genuine step
    # change larger than the threshold every later sample is rejected too.
    if abs(statistics.mean(samples) - val) > spread * 2 + deviation:
        return samples[-1]

    samples.pop(0)
    samples.append(val)
    return val
# USB device (vendor 0x16c0, product 0x05dc) that stat_loop() polls for the
# sound level.
dev = usb.core.find(idVendor=0x16c0, idProduct=0x5dc)
if dev is None:
    # find() gives None when no matching device is attached; fail here with a
    # clear message rather than with an AttributeError inside the loop.
    raise ValueError("sound level meter (USB 16c0:05dc) not found")
def on_connect(client, userdata, flags, rc):
    """paho-mqtt connect callback: report the outcome and register the will.

    Args:
        client: the paho client the callback fired for.
        userdata: user data attached to the client (unused).
        flags: broker response flags (unused).
        rc: connection result code; 0 means the broker accepted the
            connection, 3 means the server is unavailable.
    """
    if rc != 0:
        # Any non-zero code is a refused connection; just report it.
        if rc == 3:
            print("Server unavailable")
        else:
            print("mqtt connection refused, rc=%s" % rc)
        return

    print("connected to mqtt broker")
    # NOTE(review): paho documents that a will must be set before connect()
    # to take effect, so this presumably only applies to later reconnects.
    client.will_set(lwt, "dead", qos=1, retain=True)
client = mqtt.Client("stats_relay")
client.on_connect = on_connect
# The last will is sent to the broker as part of connecting, so it has to be
# registered before connect() for the first session to carry it.
client.will_set(lwt, "dead", qos=1, retain=True)
client.connect(broker, 1883, 60)
# Run the network loop in a background thread so stat_loop() can block freely.
client.loop_start()
def notify():
    """Ping the external healthcheck URL, then pet the systemd watchdog.

    The ping is best-effort: a slow or unreachable healthcheck service must
    not stall or kill the sensor loop, so the request is bounded by a timeout
    and request errors are only reported.
    """
    try:
        requests.get(
            url="https://hc-ping.com/53816436-ddba-490d-b113-8a7d9017c629",
            timeout=10,
        )
    except requests.RequestException as err:
        print("healthcheck ping failed: %s" % err)
    n.notify("WATCHDOG=1")
# data[i] is moved to position _CO2_SHUFFLE[i] in the first phase.
_CO2_SHUFFLE = (2, 4, 0, 7, 1, 6, 5, 3)

# The fixed bytes 48 74 65 6D 70 39 39 65 (ASCII "Htemp99e") with the two
# nibbles of every byte swapped. Constant, so computed once at import time.
_CO2_MAGIC_SWAPPED = tuple(
    ((b >> 4) | (b << 4)) & 0xff
    for b in (0x48, 0x74, 0x65, 0x6D, 0x70, 0x39, 0x39, 0x65)
)


def decrypt_co2(key, data):
    """Decode one 8-byte report from the CO2 meter.

    Args:
        key: sequence of 8 byte values (ints), the key sent to the device.
        data: sequence of at least 8 byte values (ints) read from the device.

    Returns:
        A list of 8 ints in the range 0-255.

    Raises:
        IndexError: if ``key`` or ``data`` has fewer than 8 elements.
    """
    # Phase 1: undo the byte permutation.
    phase1 = [0] * 8
    for src, dst in enumerate(_CO2_SHUFFLE):
        phase1[dst] = data[src]

    # Phase 2: XOR with the key.
    phase2 = [phase1[i] ^ key[i] for i in range(8)]

    # Phase 3: each byte is shifted right by 3 and its top 3 bits are filled
    # with the low 3 bits of the preceding byte (index -1 wraps to the last).
    phase3 = [
        ((phase2[i] >> 3) | (phase2[i - 1] << 5)) & 0xff
        for i in range(8)
    ]

    # Phase 4: subtract the nibble-swapped constant, modulo 256.
    return [
        (0x100 + p - m) & 0xff
        for p, m in zip(phase3, _CO2_MAGIC_SWAPPED)
    ]
def _co2_frame_ok(frame):
    """Return True when a CO2 report has the 0x0d marker and a valid checksum."""
    return frame[4] == 0x0d and (sum(frame[:3]) & 0xff) == frame[3]


def stat_loop():
    """Poll all sensors forever and publish every reading over MQTT.

    Each pass reads one 8-byte report from the CO2 meter on /dev/hidraw0,
    one DHT22 sample on pin 18 and one sound level from the USB meter,
    publishes them under ``topic``, pings the watchdogs via notify() and
    then sleeps for ``delay`` seconds. Never returns.
    """
    key = [0xc4, 0xc6, 0xc0, 0x92, 0x40, 0x23, 0xdc, 0x96]
    fp = open("/dev/hidraw0", "a+b", 0)
    HIDIOCSFEATURE_9 = 0xC0094806
    # Feature report: a zero byte followed by the 8 key bytes. Built through
    # bytearray so it is a byte string on both Python 2 and Python 3.
    set_report = bytes(bytearray([0x00] + key))
    fcntl.ioctl(fp, HIDIOCSFEATURE_9, set_report)

    values = {}  # latest value seen per CO2-meter opcode
    plaintext_notice_shown = False

    while True:
        # --- CO2 meter -----------------------------------------------------
        # bytearray() yields ints on Python 2 and 3 alike.
        data = list(bytearray(fp.read(8)))
        if _co2_frame_ok(data):
            # The report already validates as-is, so it is used undecoded.
            decrypted = data
            if not plaintext_notice_shown:
                print("Info: data not encrypted")
                plaintext_notice_shown = True
        else:
            decrypted = decrypt_co2(key, data)

        if not _co2_frame_ok(decrypted):
            print("co2 device Checksum error")
        else:
            op = decrypted[0]
            values[op] = decrypted[1] << 8 | decrypted[2]
            # Publish once both opcode 0x50 (used as CO2) and 0x42 (used as
            # temperature) have been seen at least once.
            if (0x50 in values) and (0x42 in values):
                co2 = values[0x50]
                tmp = (values[0x42] / 16.0 - 273.15)
                client.publish(topic + "/co2", co2)
                client.publish(topic + "/co2_temp", tmp)
                print("co2:  %s  co2_temp :  %s" % (co2, tmp))

        # --- DHT22 ---------------------------------------------------------
        humidity, temperature = Adafruit_DHT.read_retry(Adafruit_DHT.DHT22, 18)
        print("raw dht22 temperature:  %s  humidity :  %s"
              % (temperature, humidity))
        # Presumably read_retry yields None values when no read succeeded
        # (confirm against the Adafruit_DHT docs); keep those out of the
        # filter windows and off the bus.
        if humidity is not None and temperature is not None:
            humidity = app(h, humidity)
            temperature = app(t, temperature)
            client.publish(topic + "/humidity", humidity)
            client.publish(topic + "/temperature", temperature)

        # --- sound level meter --------------------------------------------
        ret = dev.ctrl_transfer(0xC0, 4, 0, 0, 200)
        # 10-bit reading (low byte plus two bits of the next byte), scaled by
        # 0.1 with a +30 offset.
        dB = (ret[0] + ((ret[1] & 3) * 256)) * 0.1 + 30
        client.publish(topic + "/raw_sound_level", dB)
        s.append(dB)
        if len(s) > buffer_size:
            s.pop(0)
        dB = statistics.mean(s)
        client.publish(topic + "/sound_level", dB)

        print("dht22 temperature:  %s  humidity :  %s  db level:  %s"
              % (temperature, humidity, dB))
        notify()
        sleep(delay)
# Only enter the endless polling loop when run as a script, so the module can
# be imported (e.g. to reuse decrypt_co2) without blocking forever.
if __name__ == "__main__":
    stat_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment