-
-
Save jk/dc58ef3759b1d61f021281531858a2a9 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*- | |
"""Test using the InfluxDB client.""" | |
import time | |
import colorsys | |
import os | |
import sys | |
import socket | |
import ST7735 | |
import ltr559 | |
from bme280 import BME280 | |
from enviroplus import gas | |
from subprocess import PIPE, Popen | |
from PIL import Image | |
from PIL import ImageDraw | |
from PIL import ImageFont | |
from influxdb import InfluxDBClient | |
try: | |
from smbus2 import SMBus | |
except ImportError: | |
from smbus import SMBus | |
print(""" | |
Press Ctrl+C to exit! | |
""") | |
# BME280 temperature/pressure/humidity sensor | |
bus = SMBus(1) | |
bme280 = BME280(i2c_dev=bus) | |
# Create ST7735 LCD display class | |
st7735 = ST7735.ST7735( | |
port=0, | |
cs=1, | |
dc=9, | |
backlight=12, | |
rotation=270, | |
spi_speed_hz=10000000 | |
) | |
# Initialize display | |
st7735.begin() | |
WIDTH = st7735.width | |
HEIGHT = st7735.height | |
# Set up canvas and font | |
img = Image.new('RGB', (WIDTH, HEIGHT), color=(0, 0, 0)) | |
draw = ImageDraw.Draw(img) | |
path = os.path.dirname(os.path.realpath(__file__)) | |
font = ImageFont.truetype(path + "/fonts/Asap/Asap-Bold.ttf", 20) | |
# Set up InfluxDB | |
influx = InfluxDBClient(host="influx.domain.tld", | |
username="username", | |
password="secret", | |
database="enviroplus") | |
influx_json_prototyp = [ | |
{ | |
"measurement": "enviroplus", | |
"tags": { | |
"host": "enviroplus" | |
}, | |
"fields": { | |
} | |
} | |
] | |
# The position of the top bar | |
top_pos = 25 | |
# Displays data and text on the 0.96" LCD | |
def display_text(variable, data, unit): | |
# Maintain length of list | |
values[variable] = values[variable][1:] + [data] | |
# Scale the values for the variable between 0 and 1 | |
colours = [(v - min(values[variable]) + 1) / (max(values[variable]) | |
- min(values[variable]) + 1) for v in values[variable]] | |
# Format the variable name and value | |
message = "{}: {:.1f} {}".format(variable[:4], data, unit) | |
print(message) | |
draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255)) | |
for i in range(len(colours)): | |
# Convert the values to colours from red to blue | |
colour = (1.0 - colours[i]) * 0.6 | |
r, g, b = [int(x * 255.0) for x in colorsys.hsv_to_rgb(colour, | |
1.0, 1.0)] | |
# Draw a 1-pixel wide rectangle of colour | |
draw.rectangle((i, top_pos, i+1, HEIGHT), (r, g, b)) | |
# Draw a line graph in black | |
line_y = HEIGHT - (top_pos + (colours[i] * (HEIGHT - top_pos)))\ | |
+ top_pos | |
draw.rectangle((i, line_y, i+1, line_y+1), (0, 0, 0)) | |
# Write the text at the top in black | |
draw.text((0, 0), message, font=font, fill=(0, 0, 0)) | |
st7735.display(img) | |
# Get the temperature of the CPU for compensation | |
def get_cpu_temperature(): | |
process = Popen(['vcgencmd', 'measure_temp'], stdout=PIPE) | |
output, _error = process.communicate() | |
output = output.decode() | |
return float(output[output.index('=') + 1:output.rindex("'")]) | |
# Get local IP address | |
def get_ip(): | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
try: | |
# doesn't even have to be reachable | |
s.connect(('10.255.255.255', 1)) | |
IP = s.getsockname()[0] | |
except: | |
IP = '127.0.0.1' | |
finally: | |
s.close() | |
return IP | |
# Tuning factor for compensation. Decrease this number to adjust the | |
# temperature down, and increase to adjust up | |
factor = 0.8 | |
cpu_temps = [0] * 5 | |
delay = 0.5 # Debounce the proximity tap | |
mode = 0 # The starting mode | |
last_page = 0 | |
light = 1 | |
font_size = 25 | |
text_colour = (128, 128, 128) | |
back_colour = (0, 0, 0) | |
size_x, size_y = draw.textsize(get_ip(), font) | |
# Calculate text position | |
x = (WIDTH - size_x) / 2 | |
y = (HEIGHT / 2) - (size_y / 2) | |
# Draw background rectangle and write text. | |
draw.rectangle((0, 0, 160, 80), back_colour) | |
draw.text((x, y), get_ip(), font=font, fill=text_colour) | |
st7735.display(img) | |
# Create a values dict to store the data | |
variables = ["temperature", | |
"pressure", | |
"humidity", | |
"light", | |
"oxidised", | |
"reduced", | |
"nh3"] | |
values = {} | |
for v in variables: | |
values[v] = [1] * WIDTH | |
# The main loop | |
try: | |
iterations = 0 | |
while True: | |
proximity = ltr559.get_proximity() | |
# Compensated Temperature | |
cpu_temp = get_cpu_temperature() | |
# Smooth out with some averaging to decrease jitter | |
cpu_temps = cpu_temps[1:] + [cpu_temp] | |
avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps)) | |
raw_temp = bme280.get_temperature() | |
compensated_temp = raw_temp - ((avg_cpu_temp - raw_temp) / factor) | |
# Change json | |
influx_json_prototyp[0]['fields']['ltr559.proximity'] = proximity | |
if proximity < 10: | |
influx_json_prototyp[0]['fields']['ltr559.lux'] = ltr559.get_lux() | |
else: | |
influx_json_prototyp[0]['fields']['ltr559.lux'] = 1.0 | |
if iterations >= 6: | |
influx_json_prototyp[0]['fields']['bme280.temperature.raw'] = bme280.get_temperature() | |
influx_json_prototyp[0]['fields']['bme280.temperature.compensated'] = compensated_temp | |
influx_json_prototyp[0]['fields']['cpu.temperature'] = get_cpu_temperature() | |
influx_json_prototyp[0]['fields']['bme280.pressure'] = bme280.get_pressure() | |
influx_json_prototyp[0]['fields']['bme280.humidity'] = bme280.get_humidity() | |
gas_data = gas.read_all() | |
influx_json_prototyp[0]['fields']['mics6814.oxidising'] = gas_data.oxidising | |
influx_json_prototyp[0]['fields']['mics6814.reducing'] = gas_data.reducing | |
influx_json_prototyp[0]['fields']['mics6814.nh3'] = gas_data.nh3 | |
if iterations >= 3: | |
print("Write points: {0}".format(influx_json_prototyp)) | |
influx.write_points(influx_json_prototyp, retention_policy="rp_1y") | |
else: | |
print("Skip iteration: " + str(iterations)) | |
time.sleep(10) | |
iterations += 1 | |
# Exit cleanly | |
except KeyboardInterrupt: | |
sys.exit(0) |
Hi, this is great! I have successfully implemented this with my PI.
The one thing that I noticed is that you are not importing any data from a PMS5003 sensor, is this something that you'd be willing and able to add for an upcoming release
Please let me know when you have a chance.
Thank you again for this!
@Esteidinger I don't have any plans to share it with a wider audience, nor will I share my grafana dashboards. I'm not doing anything fancy here for now.
@exchefinma Yeah, I'm able to do this since all the code is in all-in-one.py
from the examples, but since I'm not owning a PMS5003 and I'm already measuring particles with ESP8266+SDS011 I'm not motivated to support it my scripts, which I'm just writing for my own.
Thank you @jk. I've got it running and I really appreciate having it.
Hello,
I find this code very interesting. In addition to Enviro+, I also own a PMS5003 and having an integration would be fine.
Thank you
I find this code very interesting. In addition to Enviro+, I also own a PMS5003 and having an integration would be fine.
@FValent This isn't »a wish come true«. I share my code, nothing more. If you need something else implemented, you've to implement it on your own.
Great starting point !
Thanks for sharing this @jk, I've setup my influxdb and grafana but been always short in time to put together the code you've shared here. Very helpful thank you :-)
This is brilliant thank you.
If you choose to share this with a wider audience.. some comments.
I installed the influxdb python package with pip install influxdb
You have a hard coded value for retention_policy on line 204 which I had to remove to work in my setup.
I also need to figure out how to get these values into fahrenheit.
Do you have any grafana graphs you would like to share?