Skip to content

Instantly share code, notes, and snippets.

@pepijndevos
Created April 9, 2020 08:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pepijndevos/2cc4a32a9c600937e47db03786e14b4e to your computer and use it in GitHub Desktop.
Save pepijndevos/2cc4a32a9c600937e47db03786e14b4e to your computer and use it in GitHub Desktop.
Send readings from a DHT11 and an SGP30 to an InfluxDB database
import time
import board
import busio
import adafruit_sgp30
import adafruit_dht
from influxdb import InfluxDBClient
from math import exp
from queue import Queue
from threading import Thread, Event
# I2C bus on the board's SCL/SDA pins, clocked at 100 kHz.
i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
# SGP30 gas sensor, reached over the I2C bus created above.
sgp30 = adafruit_sgp30.Adafruit_SGP30(i2c)
# DHT11 temperature/humidity sensor wired to pin D4.
dhtDevice = adafruit_dht.DHT11(board.D4)
# Only the database name is passed to the client; every other connection
# parameter is left at the InfluxDBClient default.
dbname = "air"
client = InfluxDBClient(database=dbname)
# NOTE(review): not referenced anywhere else in this listing; run_sgp30 uses
# the literal "sgp30" instead.
measurement = "sgp30"
def getRelativeHumidity():
    """Return the mean DHT11 temperature and humidity of the last minute.

    Runs one query against the module-level ``client`` that averages the
    ``temperature`` and ``humidity`` fields of the ``dht11`` measurement
    over the window ``time > now() - 1m``.

    Returns:
        A ``(temperature, humidity)`` tuple read from the first result row.

    Raises:
        LookupError: if the query returns no rows, i.e. there is nothing to
            average in the last minute.
    """
    qr = client.query(
        'SELECT mean(temperature) as temperature, mean(humidity) as humidity '
        'FROM "dht11" WHERE time > now() - 1m'
    )
    # A bare next() would leak a message-less StopIteration when the result
    # is empty; callers that print the exception would then log a blank line.
    res = next(qr.get_points(), None)
    if res is None:
        raise LookupError('no "dht11" points found in the last minute')
    return res['temperature'], res['humidity']
def getAbsoluteHumidity(temperature, humidity):
    """Convert a temperature / relative-humidity pair to absolute humidity.

    Uses the approximation formula from the Sensirion SGP30 Driver
    Integration guide, chapter 3.15.

    Args:
        temperature: temperature in degrees Celsius (273.15 is added to
            obtain Kelvin).
        humidity: relative humidity in percent (divided by 100 below).

    Returns:
        The absolute humidity in g/m^3.
    """
    relative_fraction = humidity / 100.0
    saturation_factor = exp((17.62 * temperature) / (243.12 + temperature))
    kelvin = 273.15 + temperature
    # Same operation order as the one-line formula, so results are identical.
    vapour_term = relative_fraction * 6.112 * saturation_factor / kelvin
    return 216.7 * vapour_term  # [g/m^3]
# Print the SGP30 serial number words in hex at start-up.
print("SGP30 serial #", [hex(i) for i in sgp30.serial])
# Latest averaged DHT11 values: written by run_avg_humidity, read by run_sgp30.
temperature = 0
humidity = 0
# Set by run_avg_humidity when fresh averages are available; cleared by
# run_sgp30 once it has passed them on to the sensor.
temp_event = Event()
# Initialise the SGP30 air-quality measurement before any thread reads it.
sgp30.iaq_init()
# Disabled calls, kept for reference: humidity compensation is applied at
# runtime in run_sgp30, and no stored baseline is restored at start-up.
#sgp30.set_iaq_humidity(abshum)
#sgp30.set_iaq_baseline(0x8973, 0x8AAE)
# Points produced by the sensor threads and drained by the main loop.
measurements = Queue()
def run_sgp30():
    """Poll the SGP30 roughly once per second and queue its readings.

    Each cycle takes one air-quality measurement, puts it on the
    module-level ``measurements`` queue as an ``sgp30`` point and prints it.
    When ``temp_event`` is set, the current module-level ``temperature`` and
    ``humidity`` are converted to absolute humidity and handed to the sensor
    via ``set_iaq_humidity``, after which the event is cleared.

    Never returns; meant to be used as a thread target. Any exception raised
    during a cycle is printed and the loop continues after the sleep.
    """
    while True:
        try:
            iso = time.asctime(time.gmtime())
            # One iaq_measure() call returns both values from the same
            # sample. Reading the eCO2 and TVOC properties separately issues
            # two measurement commands per cycle.
            co2, tvoc = sgp30.iaq_measure()
            measurements.put({
                "measurement": "sgp30",
                "time": iso,
                "fields": {
                    "eCO2": co2,
                    "TVOC": tvoc,
                }
            })
            print("[%s] eCO2: %s, TVOC: %s" % (iso, co2, tvoc))
            if temp_event.is_set():
                # temperature/humidity are module globals that are only read
                # here, so no ``global`` declaration is needed.
                abshum = getAbsoluteHumidity(temperature, humidity)
                sgp30.set_iaq_humidity(abshum)
                temp_event.clear()
                iso = time.asctime(time.gmtime())
                print("[%s] Averag Absolute Humidity: %s" % (iso, abshum))
        except Exception as e:
            # Best effort: report the failure and keep polling.
            print(e)
        time.sleep(1)
def run_avg_humidity():
    """Refresh the shared temperature/humidity averages every ten seconds.

    Each cycle stores the result of ``getRelativeHumidity()`` in the
    module-level ``temperature`` and ``humidity``, sets ``temp_event`` to
    announce the new values and prints them. Never returns; any exception
    raised during a cycle is printed and the loop carries on after the sleep.
    """
    global temperature, humidity
    while True:
        try:
            averages = getRelativeHumidity()
            temperature, humidity = averages
            temp_event.set()
            stamp = time.asctime(time.gmtime())
            report = "[%s] Average Temp: %s, Humidity: %s" % (
                stamp,
                temperature,
                humidity,
            )
            print(report)
        except Exception as err:
            print(err)
        time.sleep(10)
def run_dht11():
    """Read the DHT11 every two seconds and queue the reading.

    Each cycle reads temperature and humidity from ``dhtDevice``, prints
    them and puts a ``dht11`` point on the module-level ``measurements``
    queue. Never returns; any exception raised during a cycle is printed and
    the loop carries on after the sleep.
    """
    while True:
        try:
            stamp = time.asctime(time.gmtime())
            # Local names on purpose: the module-level temperature/humidity
            # hold the averaged values and are not touched here.
            temp_reading = dhtDevice.temperature
            hum_reading = dhtDevice.humidity
            print("[%s] Temp: %s, Humidity: %s" % (stamp, temp_reading, hum_reading))
            point = {
                "measurement": "dht11",
                "time": stamp,
                "fields": {
                    "temperature": temp_reading,
                    "humidity": hum_reading,
                },
            }
            measurements.put(point)
        except Exception as err:
            print(err)
        time.sleep(2)
if __name__ == '__main__':
    # Start one daemon thread per producer; daemon threads do not keep the
    # process alive once this main loop exits.
    dht_thread = Thread(target=run_dht11, daemon=True)
    dht_thread.start()
    sgp_thread = Thread(target=run_sgp30, daemon=True)
    sgp_thread.start()
    avg_thread = Thread(target=run_avg_humidity, daemon=True)
    avg_thread.start()

    workers = (
        ("run_dht11", dht_thread),
        ("run_sgp30", sgp_thread),
        ("run_avg_humidity", avg_thread),
    )

    data = []
    while True:
        # Explicit liveness check: an ``assert`` would be stripped when the
        # script runs under ``python -O``.
        for label, worker in workers:
            if not worker.is_alive():
                raise RuntimeError("%s thread is no longer running" % label)
        try:
            # Block for the first point, then drain whatever else is queued.
            while True:
                data.append(measurements.get(timeout=60))
                if measurements.empty():
                    break
            print(data)
            client.write_points(data)
            # Only reached after a successful write: a failed batch stays in
            # ``data`` and is sent again together with the next one.
            data = []
        except Exception as e:
            print(e)
        time.sleep(10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment