Skip to content

Instantly share code, notes, and snippets.

@extrasleepy
Created March 22, 2022 20:31
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save extrasleepy/ef5221bf3bdf74fb20f78c39ed50d030 to your computer and use it in GitHub Desktop.
Save extrasleepy/ef5221bf3bdf74fb20f78c39ed50d030 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import requests
import RPi.GPIO as GPIO
import time
import colorsys
import sys
import ST7735
try:
# Transitional fix for breaking change in LTR559
from ltr559 import LTR559
ltr559 = LTR559()
except ImportError:
import ltr559
from bme280 import BME280
from pms5003 import PMS5003, ReadTimeoutError as pmsReadTimeoutError
from enviroplus import gas
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
from fonts.ttf import RobotoMedium as UserFont
import logging
# Log lines carry a millisecond-resolution timestamp and a padded level name.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
)
logging.info(
    "all-in-one.py - Displays readings from all of Enviro plus' sensors\n"
    "Press Ctrl+C to exit!\n"
)

# Sensor drivers: BME280 (temperature/pressure/humidity) and PMS5003
# (particulates).
bme280 = BME280()
pms5003 = PMS5003()

# LCD driver; begin() must be called before anything is drawn.
st7735 = ST7735.ST7735(
    port=0, cs=1, dc=9,
    backlight=12,
    rotation=270,
    spi_speed_hz=10000000,
)
st7735.begin()
WIDTH, HEIGHT = st7735.width, st7735.height

# Off-screen black canvas the size of the display, plus the drawing handle
# and font used to render onto it.
img = Image.new('RGB', (WIDTH, HEIGHT), color=(0, 0, 0))
draw = ImageDraw.Draw(img)
font_size = 20
font = ImageFont.truetype(UserFont, font_size)

# Module-level placeholder; display_text() builds its own local message.
message = ""

# Height of the text bar at the top of the screen; the graph starts below it.
top_pos = 25
# Displays data and text on the 0.96" LCD
def display_text(variable, data, unit):
    """Record a new reading for ``variable`` and redraw the whole screen.

    Args:
        variable: key into the module-level ``values`` history dict; its
            first four characters are used as the on-screen label.
        data: the numeric reading to append to the history.
        unit: unit string shown after the value.

    Side effects: updates ``values[variable]``, logs the formatted reading,
    draws onto the shared ``img`` canvas and pushes it to the LCD.
    """
    # Slide the window: drop the oldest sample, append the newest, so the
    # history keeps a constant length.
    history = values[variable][1:] + [data]
    values[variable] = history

    # Normalise every sample into (0, 1]; the +1 keeps the divisor non-zero
    # when all samples are equal.
    lowest = min(history)
    span = max(history) - lowest + 1
    scaled = [(sample - lowest + 1) / span for sample in history]

    label = "{}: {:.1f} {}".format(variable[:4], data, unit)
    logging.info(label)

    # Start from a white background.
    draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255))

    for x, level in enumerate(scaled):
        # Map the level onto a hue between red (high) and blue (low).
        hue = (1.0 - level) * 0.6
        rgb = tuple(int(channel * 255.0)
                    for channel in colorsys.hsv_to_rgb(hue, 1.0, 1.0))
        # One-pixel-wide coloured column below the text bar.
        draw.rectangle((x, top_pos, x + 1, HEIGHT), rgb)
        # Black marker tracing the line graph on top of the column.
        line_y = HEIGHT - (top_pos + (level * (HEIGHT - top_pos))) + top_pos
        draw.rectangle((x, line_y, x + 1, line_y + 1), (0, 0, 0))

    # Label goes in the top bar, in black, then the frame is sent to the LCD.
    draw.text((0, 0), label, font=font, fill=(0, 0, 0))
    st7735.display(img)
# Get the temperature of the CPU for compensation
def get_cpu_temperature(path="/sys/class/thermal/thermal_zone0/temp"):
    """Return the CPU temperature read from a sysfs thermal-zone file.

    Args:
        path: file holding the temperature as an integer number of
            thousandths of a degree. Defaults to thermal zone 0, which is
            what the original hard-coded.

    Returns:
        The file's integer value divided by 1000, as a float.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the file content is not an integer.
    """
    with open(path, "r") as f:
        # strip() makes the trailing newline explicit rather than relying on
        # int() tolerating surrounding whitespace.
        millidegrees = int(f.read().strip())
    return millidegrees / 1000.0
# Tuning factor for compensation. Decrease this number to adjust the
# temperature down, and increase to adjust up
factor = 2.25

# Rolling window of CPU temperatures, seeded with five copies of one reading.
cpu_temps = 5 * [get_cpu_temperature()]

delay = 0.5    # Debounce the proximity tap
mode = 0       # The starting mode
last_page = 0  # time.time() of the last mode change
light = 1      # NOTE: overwritten with 0 further down in this section

# One display mode per entry; the index in this list is the mode number.
variables = [
    "temperature", "pressure", "humidity", "light",
    "oxidised", "reduced", "nh3",
    "pm1", "pm25", "pm10",
]

# Per-variable history, one sample per screen column, initialised to 1.
values = {name: [1] * WIDTH for name in variables}

#----ubidots needs------------
GPIO.setmode(GPIO.BCM)

TOKEN = "YOUR TOKEN"               # Put your TOKEN here
DEVICE_LABEL = "pi"                # Put your device label here
VARIABLE_LABEL_1 = "light"         # Put your first variable label here
VARIABLE_LABEL_2 = "temperature"   # Put your second variable label here

light = 0
temperature = 0
def build_payload(variable_1, variable_2, value_1=None, value_2=0):
    """Build the dict that is sent to Ubidots as the request body.

    Args:
        variable_1: label (dict key) of the first variable.
        variable_2: label (dict key) of the second variable.
        value_1: reading for ``variable_1``. When omitted (``None``) the
            module-level ``data`` is used, i.e. whatever reading the main
            loop stored last -- this is the original behaviour.
        value_2: reading for ``variable_2``. Defaults to 0, the value the
            original hard-coded.

    Returns:
        ``{variable_1: value_1, variable_2: value_2}``.

    Raises:
        NameError: if ``value_1`` is omitted and the module-level ``data``
            has not been assigned yet.
    """
    if value_1 is None:
        # Backward-compatible fallback to the main loop's latest reading.
        value_1 = data
    payload = {variable_1: value_1, variable_2: value_2}
    # Print the labels actually used, so the line cannot disagree with the
    # payload if the labels are changed.
    print("{}={} {}={}".format(variable_1, value_1, variable_2, value_2))
    return payload
def post_request(payload):
    """POST ``payload`` to the Ubidots device endpoint, with retries.

    Up to 5 attempts are made, one second apart. An attempt fails when the
    request raises (connection error, timeout, ...) or when the response
    status is 400 or above.

    Args:
        payload: JSON-serialisable dict of variable label -> value.

    Returns:
        True if a response with status below 400 was received, else False.
    """
    # NOTE(review): plain HTTP sends the auth token unencrypted -- confirm
    # whether the HTTPS endpoint can be used on this device instead.
    url = "http://industrial.api.ubidots.com"
    url = "{}/api/v1.6/devices/{}".format(url, DEVICE_LABEL)
    headers = {"X-Auth-Token": TOKEN, "Content-Type": "application/json"}

    max_attempts = 5
    timeout_s = 10  # without a timeout a stalled connection blocks forever
    req = None
    for attempt in range(max_attempts):
        if attempt:
            # Pause between attempts only, never after the final outcome.
            time.sleep(1)
        try:
            req = requests.post(url=url, headers=headers, json=payload,
                                timeout=timeout_s)
        except requests.exceptions.RequestException as err:
            # A network failure counts as a failed attempt rather than
            # crashing the sensor loop.
            print("[ERROR] request failed: {}".format(err))
            req = None
            continue
        if req.status_code < 400:
            break

    # Processes results
    if req is not None:
        try:
            body = req.json()
        except ValueError:
            # Response body was not JSON; show the raw text instead.
            body = req.text
        print(req.status_code, body)

    if req is None or req.status_code >= 400:
        print("[ERROR] Could not send data after 5 attempts, please check "
              "your token credentials and internet connection")
        return False

    print("[INFO] request made properly, your device is updated")
    return True
def main():
    """Build the Ubidots payload for the two configured labels and send it.

    The payload is built before the "attempting" message is printed, so the
    output order is: payload line, attempt notice, request result, finish
    notice. The return value of post_request() is not used.
    """
    body = build_payload(VARIABLE_LABEL_1, VARIABLE_LABEL_2)
    print("[INFO] Attemping to send data")
    post_request(body)
    print("[INFO] finished")
#---------end of ubidots------------
# The main loop
# Mode number -> attribute of the gas reading displayed in that mode.
_GAS_FIELDS = {4: "oxidising", 5: "reducing", 6: "nh3"}
# Mode number -> particle size argument passed to pm_ug_per_m3().
_PM_SIZES = {7: 1.0, 8: 2.5, 9: 10}

try:
    while True:
        proximity = ltr559.get_proximity()

        # A close-enough tap, outside the debounce window, advances to the
        # next mode (wrapping around after the last one).
        if proximity > 1500 and time.time() - last_page > delay:
            mode = (mode + 1) % len(variables)
            last_page = time.time()

        # Exactly one branch runs per pass; `mode` is not changed below.
        if mode == 0:
            # temperature
            unit = "C"
            # Smooth out with some averaging to decrease jitter
            cpu_temps = cpu_temps[1:] + [get_cpu_temperature()]
            avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
            raw_temp = bme280.get_temperature()
            # Pull the raw reading away from the averaged CPU temperature.
            data = raw_temp - ((avg_cpu_temp - raw_temp) / factor)
            display_text(variables[mode], data, unit)
        elif mode == 1:
            # pressure
            unit = "hPa"
            data = bme280.get_pressure()
            display_text(variables[mode], data, unit)
        elif mode == 2:
            # humidity
            unit = "%"
            data = bme280.get_humidity()
            display_text(variables[mode], data, unit)
        elif mode == 3:
            # light: a fixed value of 1 is shown unless proximity is below 10
            unit = "Lux"
            data = ltr559.get_lux() if proximity < 10 else 1
            display_text(variables[mode], data, unit)
        elif mode in _GAS_FIELDS:
            # oxidised / reduced / nh3, shown in thousands of the raw value
            unit = "kO"
            data = getattr(gas.read_all(), _GAS_FIELDS[mode]) / 1000
            display_text(variables[mode], data, unit)
        elif mode in _PM_SIZES:
            # pm1 / pm25 / pm10
            unit = "ug/m3"
            try:
                reading = pms5003.read()
            except pmsReadTimeoutError:
                # Keep the previous `data` and skip the redraw this pass.
                logging.warning("Failed to read PMS5003")
            else:
                data = float(reading.pm_ug_per_m3(_PM_SIZES[mode]))
                display_text(variables[mode], data, unit)

        main()          # run ubidots function
        time.sleep(30)  # repeat every 30 seconds
# Exit cleanly
except KeyboardInterrupt:
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment