Skip to content

Instantly share code, notes, and snippets.

@rpavlik
Last active Aug 1, 2021
Embed
What would you like to do?
air quality on funhouse/clue
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
# SPDX-FileCopyrightText: 2021, Ryan Pavlik <ryan.pavlik@gmail.com>
# SPDX-License-Identifier: MIT
try:
from typing import Optional
except ImportError:
pass
from pmhelper import PMDATA_ROWS
from netstuff import NetStuff
import time
import board
import microcontroller
from adafruit_pm25.i2c import PM25_I2C
from adafruit_scd30 import SCD30
from adafruit_funhouse import FunHouse
from adafruit_simple_text_display import SimpleTextDisplay
from dew_point import apply_temp_compensation_to_relative_humidity, compute_dew_point
# Shared I2C bus for the external sensors.
# Alternative kept for reference (slower bus clock):
#   i2c = busio.I2C(board.SCL, board.SDA, frequency=50000)
i2c = board.I2C()
funhouse = FunHouse(default_bg=None)

# Offset added to the onboard sensor's temperature reading in the main loop.
TEMP_ADJUST = -14

# Both external sensors are optional; they stay None when not detected.
pm25 = None  # type: Optional[PM25_I2C]
scd = None  # type: Optional[SCD30]

# Probe for the SCD30 CO2 sensor. A ValueError anywhere in the probe
# (including setting the interval) marks the sensor as unavailable.
try:
    scd = SCD30(i2c)
    have_scd = True
    scd.measurement_interval = 5
except ValueError:
    have_scd = False

# Probe for the PM2.5 sensor. On ValueError ``pm25`` simply remains None and
# the main loop skips the particulate readout.
try:
    pm25 = PM25_I2C(i2c)
except ValueError:
    pass
def c_to_f(temp):
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    scaled = temp * 1.8
    return scaled + 32
def ljust(s, n):
    """Return ``s`` padded on the right with spaces to a width of ``n``.

    A string that is already ``n`` characters or longer comes back unchanged,
    because multiplying a string by a non-positive count yields "".
    """
    pad_count = n - len(s)
    return s + " " * pad_count
if have_scd:
    # Tell it our ambient pressure on startup for best data
    scd.ambient_pressure = funhouse.peripherals.pressure

# Highest CO2 value seen so far, and the text last shown for the CO2 row.
max_co2 = 0  # type: int
last_co2 = "No CO2 meas"

# When not None, the main loop decrements this every pass and resets the
# microcontroller once it reaches zero.
reset_countdown = None  # type: Optional[int]

# Network/MQTT helper that publishes the readings.
netstuff = NetStuff(funhouse)
class AutoOffScreen:
    """Decide whether the display should be lit, based on the PIR sensor.

    The screen starts on for ``initial_duration`` seconds. Every poll that
    sees motion pushes the switch-off time out to ``duration`` seconds from
    now; once that time passes without motion the screen is reported as off.
    """

    def __init__(self, duration=10, initial_duration=60) -> None:
        # Monotonic timestamp at which to switch off, or None while off.
        self.turn_off = None  # type: Optional[int]
        self.duration = duration
        self.set_turn_off(time.monotonic() + initial_duration)

    def set_turn_off(self, off_time):
        """Turn the screen on and schedule switch-off no earlier than ``off_time``.

        An already-scheduled later switch-off time is kept.
        """
        self.on = True
        if not self.turn_off:
            self.turn_off = off_time
        else:
            self.turn_off = max(self.turn_off, off_time)

    def poll(self):
        """Update the on/off state and return True when the screen should be on."""
        now = time.monotonic()
        if funhouse.peripherals.pir_sensor:
            # turn on/push out turn-off time
            self.set_turn_off(now + self.duration)
            return True
        timed_out = self.on and self.turn_off is not None and now >= self.turn_off
        if timed_out:
            self.on = False
            self.turn_off = None
        return self.on
# Motion-driven display power state.
screen = AutoOffScreen()

# Text rows shown on the built-in display.
data_display = SimpleTextDisplay(text_scale=2)

# Reference value written to the SCD30 during forced recalibration
# (displayed to the user as ppm).
SCD30_CAL_VALUE = 400
def update_during_recalibrate():
    """Wait for a fresh SCD30 sample, then show its CO2 value on display row 4.

    Polls ``scd.data_available`` every half second until it is true; there is
    no timeout.
    """
    while True:
        if scd.data_available:
            break
        time.sleep(0.5)
    co2_text = "CO2 meas: {:.1f}".format(scd.CO2)
    data_display[4].text = co2_text
    data_display.show()
def recalibrate():
    """Walk the user through a forced recalibration of the SCD30.

    Sequence, driven by the select button:

    1. Show the instructions and wait for select to be released.
    2. Refresh the sensor's ambient pressure, then wait for a second press
       (confirmation) and for that press to be released.
    3. Count down 15 seconds while showing live CO2 readings.
    4. Write ``SCD30_CAL_VALUE`` as the forced recalibration reference.
    5. Set the global ``reset_countdown`` so the main loop resets the board
       shortly afterwards.
    """
    global reset_countdown

    # Replace the normal readout with the instructions; rows are touched in
    # ascending order, exactly as a row-by-row assignment would do.
    prompt_rows = (
        "",
        "",
        "",
        "Will recal to {}ppm".format(SCD30_CAL_VALUE),
        "Must be outside!",
        "Release to continue",
        "",
        "",
        "",
    )
    for row, text in enumerate(prompt_rows):
        data_display[row].text = text
    data_display.show()

    # Wait for the press that started recalibration to end.
    while funhouse.peripherals.button_sel:
        time.sleep(0.5)

    scd.ambient_pressure = funhouse.peripherals.pressure
    update_during_recalibrate()

    data_display[5].text = "Press select again to confirm"
    data_display.show()
    while not funhouse.peripherals.button_sel:
        update_during_recalibrate()

    data_display[5].text = "Release and move away!"
    data_display.show()
    while funhouse.peripherals.button_sel:
        update_during_recalibrate()

    # Give the user time to step away before the reference is written.
    calibrate_at = time.monotonic() + 15
    left = calibrate_at - time.monotonic()
    while left > 0:
        # "{:.1f}" (one decimal place); the previous "{:1f}" was a width of 1
        # with the default six decimals.
        data_display[5].text = "Calibrate in {:.1f}".format(left)
        update_during_recalibrate()
        left = calibrate_at - time.monotonic()

    data_display[5].text = "Calibrating!"
    data_display.show()
    scd.forced_recalibration_reference = SCD30_CAL_VALUE
    time.sleep(5)

    data_display[5].text = "Calibrated, reset to resume normal ops"
    data_display.show()
    update_during_recalibrate()

    # The main loop counts this down and then resets the microcontroller.
    reset_countdown = 2
# Main loop: refresh readings, update the display rows, publish via netstuff,
# then idle for about five seconds while watching for motion.
while True:
    show_display = screen.poll()

    # Read the barometer once so the displayed and published values agree.
    pressure = funhouse.peripherals.pressure
    data_display[0].text = "Pressure: {:.3f}hPa".format(pressure)
    netstuff.pressure = pressure

    # PM25 sensor stuff
    if pm25 is not None:
        try:
            aqdata = pm25.read()
        except RuntimeError:
            print("Unable to read from sensor, retrying...")
            continue
        # Particulate rows occupy display rows 3 and up.
        for row, row_details in enumerate(PMDATA_ROWS, 3):
            row_text = "{}: {}".format(
                row_details.raw_label, aqdata[row_details.raw_key]
            )
            if row_details.pm_label:
                pm_text = "{}: {}".format(
                    row_details.pm_label, aqdata[row_details.pm_key]
                )
                row_text = ljust(row_text, 10) + pm_text
            data_display[row].text = row_text
        netstuff.pm25 = aqdata["pm25 standard"]

    # Onboard temp/humidity sensor
    temp, humidity = (
        funhouse.peripherals.temperature,
        funhouse.peripherals.relative_humidity,
    )
    humidity = apply_temp_compensation_to_relative_humidity(temp, humidity, TEMP_ADJUST)
    temp = temp + TEMP_ADJUST
    temp_source = "AHT"

    # Pending reset (after recalibration or an SCD30 failure).
    if reset_countdown is not None:
        reset_countdown -= 1
        if reset_countdown <= 0:
            microcontroller.reset()

    # CO2 sensor
    if scd is not None:
        try:
            if scd.data_available:
                if not funhouse.peripherals.captouch6:
                    # captouch 6 is for forcing display of onboard funhouse aht temp/humidity
                    temp, humidity = scd.temperature, scd.relative_humidity
                    temp_source = "SCD"
                # Read CO2 once so the display, the published value and the
                # running maximum all come from the same sample.
                co2 = scd.CO2
                last_co2 = "CO2: {:.1f} PPM".format(co2)
                netstuff.co2 = co2
                max_co2 = max(max_co2, co2)

            # Holding select shows the maximum and also starts recalibration.
            if funhouse.peripherals.button_sel:
                data_display[2].text = "Max CO2: {:.1f} PPM".format(max_co2)
                # Handle recalibration
                recalibrate()
            else:
                data_display[2].text = last_co2
        except ValueError:
            print("oops")
            scd = None
            data_display[2].text = "SCD crashed!"
            reset_countdown = 10

    # NOTE(review): netstuff.temp and netstuff.humidity are never assigned in
    # this loop, so NetStuff never publishes them - confirm this is intended.

    # Let's display dewpoint instead of less-useful RH% and temp
    dew_point = compute_dew_point(temp, humidity)
    # data_display[1].text = "{:.1f}C, {:.1f}%RH".format(temp, humidity)
    data_display[1].text = "Dew Pt {:.1f}F ({})".format(c_to_f(dew_point), temp_source)

    netstuff.poll()

    # Handle display power and sleep
    if show_display:
        funhouse.display.brightness = 0.5
        data_display.show()
    else:
        funhouse.display.brightness = 0

    # Sleep in 0.1 s slices so motion can wake the display promptly.
    for _ in range(50):
        time.sleep(0.1)
        show_display = screen.poll()
        if show_display:
            funhouse.display.brightness = 0.5
            data_display.show()
# Copyright 2021, Ryan Pavlik <ryan.pavlik@gmail.com>
# SPDX-License-Identifier: Unlicense
import time
import board
from adafruit_clue import clue
from adafruit_pm25.i2c import PM25_I2C
from adafruit_scd30 import SCD30
from pmhelper import PMDATA_ROWS
# Shared I2C bus; the PM2.5 sensor is required on this board variant.
i2c = board.I2C()
pm25 = PM25_I2C(i2c)

# The SCD30 CO2 sensor is optional: a ValueError during the probe means
# CO2, temperature and humidity are not shown.
try:
    scd = SCD30(i2c)
    have_scd = True
    scd.measurement_interval = 5
except ValueError:
    have_scd = False
def c_to_f(temp):
    """Return ``temp`` (degrees Celsius) expressed in degrees Fahrenheit."""
    fahrenheit = temp * 1.8 + 32
    return fahrenheit
# Text rows shown on the CLUE's built-in display, at double size.
clue_data = clue.simple_text_display(text_scale=2)
def ljust(s, n):
    """Left-justify ``s`` in a field of ``n`` characters using spaces.

    Strings of length ``n`` or more are returned as-is.
    """
    missing = n - len(s)
    padding = " " * missing
    return s + padding
if have_scd:
    # Tell it our ambient pressure on startup for best data
    scd.ambient_pressure = clue.pressure

# Running maximum CO2 value and the most recent CO2 display text.
max_co2 = 0
last_co2 = "No CO2 meas"
class AutoOffScreen:
    """Decide whether the display should be lit, based on the proximity sensor.

    The screen starts on for ``initial_duration`` seconds. Each poll with a
    proximity reading above 3 extends the on-time to ``duration`` seconds from
    now; after that time passes the screen is reported as off.
    """

    def __init__(self, duration=10, initial_duration=60) -> None:
        # Monotonic timestamp at which to switch off, or None while off.
        self.turn_off = None
        self.duration = duration
        self.set_turn_off(time.monotonic() + initial_duration)

    def set_turn_off(self, off_time):
        """Turn the screen on and schedule switch-off no earlier than ``off_time``."""
        self.on = True
        if not self.turn_off:
            self.turn_off = off_time
        else:
            self.turn_off = max(self.turn_off, off_time)

    def poll(self):
        """Update the on/off state and return True when the screen should be on."""
        now = time.monotonic()
        if clue.proximity > 3:
            self.set_turn_off(now + self.duration)
            return True
        # ``turn_off`` is only compared while the screen is on.
        timed_out = self.on and now >= self.turn_off
        if timed_out:
            self.on = False
            self.turn_off = None
        return self.on
# Proximity-driven display power state.
screen = AutoOffScreen()

# Main loop: refresh readings and display rows, then idle for about five
# seconds while watching the proximity sensor.
while True:
    show_display = screen.poll()

    clue_data[0].text = "Pressure: {:.3f}hPa".format(clue.pressure)

    try:
        aqdata = pm25.read()
    except RuntimeError:
        print("Unable to read from sensor, retrying...")
        continue

    # Particulate rows occupy display rows 3 and up.
    for row, row_details in enumerate(PMDATA_ROWS, 3):
        row_text = "{}: {}".format(row_details.raw_label, aqdata[row_details.raw_key])
        if row_details.pm_label:
            pm_text = "{}: {}".format(row_details.pm_label, aqdata[row_details.pm_key])
            row_text = ljust(row_text, 10) + pm_text
        clue_data[row].text = row_text

    if have_scd and scd.data_available:
        clue_data[1].text = "{:.1f}F, {:.1f}%RH".format(
            c_to_f(scd.temperature), scd.relative_humidity
        )
        last_co2 = "CO2: {:.1f} PPM".format(scd.CO2)
        max_co2 = max(max_co2, scd.CO2)

    # Button A swaps the CO2 row to the running maximum while held.
    if clue.button_a:
        clue_data[2].text = "Max CO2: {:.1f} PPM".format(max_co2)
    else:
        clue_data[2].text = last_co2

    if show_display:
        clue.display.brightness = 0.5
        clue_data.show()
    else:
        clue.display.brightness = 0

    # Sleep in 0.1 s slices so the display can wake promptly.
    for _ in range(50):
        time.sleep(0.1)
        show_display = screen.poll()
        if show_display:
            clue.display.brightness = 0.5
            clue_data.show()
# SPDX-FileCopyrightText: 2021 Ryan Pavlik <ryan.pavlik@gmail.com>
#
# SPDX-License-Identifier: MIT
import math
def compute_dew_point(temperature, relative_humidity):
    """Estimate the dew point from air temperature and relative humidity.

    Uses a Magnus-type approximation with coefficients 17.625 and 243.04.

    :param temperature: air temperature (presumably degrees Celsius, which is
        what these coefficients are normally used with - confirm at callers)
    :param relative_humidity: relative humidity in percent; must be greater
        than zero because its logarithm is taken
    :return: dew point in the same unit as ``temperature``
    """
    log_rh = math.log(relative_humidity / 100.0)
    temp_term = (17.625 * temperature) / (243.04 + temperature)
    return 243.04 * (log_rh + temp_term) / (17.625 - log_rh - temp_term)
def compute_relative_humidity(temperature, dew_point):
    """Estimate relative humidity (percent) from temperature and dew point.

    This is the inverse of the dew point approximation, using the same
    17.625 / 243.04 coefficients.

    :param temperature: air temperature
    :param dew_point: dew point, in the same unit as ``temperature``
    :return: relative humidity in percent
    """
    at_dew_point = math.exp((17.625 * dew_point) / (243.04 + dew_point))
    at_temperature = math.exp((17.625 * temperature) / (243.04 + temperature))
    return 100 * (at_dew_point / at_temperature)
def apply_temp_compensation_to_relative_humidity(
    temperature, relative_humidity, temp_adjust
):
    """Re-express a humidity reading for a corrected temperature.

    The dew point implied by the raw ``temperature`` / ``relative_humidity``
    pair is held fixed, and the relative humidity is recomputed at
    ``temperature + temp_adjust``.

    :param temperature: raw temperature reading
    :param relative_humidity: raw relative humidity in percent
    :param temp_adjust: offset added to ``temperature``
    :return: relative humidity in percent at the adjusted temperature
    """
    implied_dew_point = compute_dew_point(temperature, relative_humidity)
    adjusted_temperature = temperature + temp_adjust
    return compute_relative_humidity(adjusted_temperature, implied_dew_point)
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
# SPDX-FileCopyrightText: Copyright (c) 2021 Melissa LeBlanc-Williams for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Ryan Pavlik <ryan.pavlik@gmail.com>
#
# SPDX-License-Identifier: MIT
#
import time
# pylint: disable=unused-argument
def connected(client):
    """MQTT connect callback: subscribe to the feeds this device listens to."""
    print("Connected to Adafruit IO! Subscribing...")
    for feed in ("buzzer", "neopixels"):
        client.subscribe(feed)
def subscribe(client, userdata, topic, granted_qos):
    """MQTT subscribe callback: log the topic and the granted QOS level."""
    notice = "Subscribed to {0} with QOS level {1}".format(topic, granted_qos)
    print(notice)
def disconnected(client):
    """MQTT disconnect callback: log that the connection was lost."""
    notice = "Disconnected from Adafruit IO!"
    print(notice)
class NetStuff:
    """Adafruit IO (MQTT) glue for a FunHouse board.

    The caller assigns the latest readings to ``temp``, ``humidity``,
    ``pressure`` and ``co2`` and calls :meth:`poll` regularly. Roughly every
    ten seconds ``poll`` publishes whichever readings are not None, plus the
    PIR state when it has changed since it was last published.
    """

    def __init__(self, funhouse):
        """Connect to Adafruit IO through ``funhouse.network``.

        :param funhouse: the FunHouse object whose ``network`` and
            ``peripherals`` are used
        """
        self.funhouse = funhouse
        # PIR state most recently published, or None before the first publish.
        self.last_pir = None
        # Initialize a new MQTT Client object
        self.funhouse.network.init_io_mqtt()
        self.funhouse.network.on_mqtt_connect = connected
        self.funhouse.network.on_mqtt_disconnect = disconnected
        self.funhouse.network.on_mqtt_subscribe = subscribe
        self.funhouse.network.on_mqtt_message = self.message
        print("Connecting to Adafruit IO...")
        self.funhouse.network.mqtt_connect()
        self.sensorwrite_timestamp = time.monotonic()
        # Latest readings supplied by the caller; None means "nothing to send".
        self.temp = None
        self.humidity = None
        self.pressure = None
        self.co2 = None
        # Assigned by the caller as well, but not published by poll().
        self.pm25 = None

    def message(self, client, feed_id, payload):
        """Handle a message from a subscribed feed.

        ``buzzer``: a payload of 1 plays a short tone.
        ``neopixels``: the payload is parsed as a hex color after dropping
        its first character, and fills the dotstars.

        Payloads that cannot be parsed are reported and ignored, so a bad
        message cannot raise out of the MQTT loop.
        """
        print("Feed {0} received new value: {1}".format(feed_id, payload))
        if feed_id == "buzzer":
            try:
                beep_requested = int(payload) == 1
            except (TypeError, ValueError):
                print("Ignoring invalid buzzer payload")
                return
            if beep_requested:
                self.funhouse.peripherals.play_tone(2000, 0.25)
        elif feed_id == "neopixels":
            print(payload)
            try:
                color = int(payload[1:], 16)
            except (TypeError, ValueError):
                print("Ignoring invalid neopixels payload")
                return
            self.funhouse.peripherals.dotstars.fill(color)

    def poll(self):
        """Service MQTT and publish the current readings if they are due."""
        self.funhouse.network.mqtt_loop()
        # every 10 seconds, write temp/hum/press
        now = time.monotonic()
        if (now - self.sensorwrite_timestamp) <= 10:
            return

        # The LED is lit for the duration of the publish burst.
        self.funhouse.peripherals.led = True
        print("Sending data to adafruit IO!")
        # Compare against None so a legitimate reading of 0 is still sent.
        if self.temp is not None:
            self.funhouse.network.mqtt_publish("temperature", self.temp)
        if self.humidity is not None:
            self.funhouse.network.mqtt_publish("humidity", int(self.humidity))
        if self.pressure is not None:
            self.funhouse.network.mqtt_publish("pressure", self.pressure)
        if self.co2 is not None:
            self.funhouse.network.mqtt_publish("co2", int(self.co2))
        self.sensorwrite_timestamp = now

        # Send PIR only if changed! The state is read once and stored on the
        # instance so the next poll can compare against it.
        pir_state = self.funhouse.peripherals.pir_sensor
        if self.last_pir is None or self.last_pir != pir_state:
            self.last_pir = pir_state
            self.funhouse.network.mqtt_publish("pir", "%d" % pir_state)
        self.funhouse.peripherals.led = False
# Copyright 2021, Ryan Pavlik <ryan.pavlik@gmail.com>
# SPDX-License-Identifier: Unlicense
from collections import namedtuple
# One display row: the raw particle-count label/key, plus the PM rating
# label/key when a rating exists for that particle size (otherwise None).
_PMData = namedtuple("PMData", ["raw_label", "raw_key", "pm_label", "pm_key"])

# Particle sizes that also have a "PM" rating.
_avail_pm_ratings = {"PM1.0", "PM2.5", "PM10.0"}

PMDATA_ROWS = []
for size in ("0.3", "0.5", "1.0", "2.5", "5.0", "10.0"):
    # We have raw data for all sizes
    raw_label = "{}um".format(size)
    raw_key = "particles {}".format(raw_label.replace(".", ""))

    # We don't have a "PM" value for all sizes
    pm_label = "PM{}".format(size)
    if pm_label not in _avail_pm_ratings:
        # Nothing for this size, so wipe out the label
        pm_label = None
        pm_key = None
    else:
        pm_key = "{} standard".format(pm_label.replace(".", "").lower())

    PMDATA_ROWS.append(_PMData(raw_label, raw_key, pm_label, pm_key))
# SPDX-FileCopyrightText: 2020 Kattni Rembor for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Ryan Pavlik <ryan.pavlik@gmail.com>
#
# SPDX-License-Identifier: MIT
import board
class SimpleTextDisplay:
    """Easily display lines of text on a displayio display."""

    # Color variables available for import.
    RED = (255, 0, 0)
    YELLOW = (255, 255, 0)
    ORANGE = (255, 150, 0)
    GREEN = (0, 255, 0)
    TEAL = (0, 255, 120)
    CYAN = (0, 255, 255)
    BLUE = (0, 0, 255)
    PURPLE = (180, 0, 255)
    MAGENTA = (255, 0, 150)
    WHITE = (255, 255, 255)
    BLACK = (0, 0, 0)

    GOLD = (255, 222, 30)
    PINK = (242, 90, 255)
    AQUA = (50, 255, 255)
    JADE = (0, 255, 40)
    AMBER = (255, 100, 0)
    VIOLET = (255, 0, 255)
    SKY = (0, 180, 255)

    RAINBOW = (RED, ORANGE, YELLOW, GREEN, BLUE, PURPLE)

    def __init__(  # pylint: disable=too-many-arguments
        self,
        title=None,
        title_color=0xFFFFFF,
        title_scale=1,
        text_scale=1,
        font=None,
        colors=None,
        display=None,
    ):
        """Display lines of text on a displayio display. Lines of text are created in order as shown
        in the example below. If you skip a number, the line will be shown blank on the display,
        e.g. if you include ``[0]`` and ``[2]``, the second line on the display will be empty, and
        the text specified for lines 0 and 2 will be displayed on the first and third line.
        Remember, Python begins counting at 0, so the first line on the display is 0 in the code.

        Setup occurs before the loop. For data to be dynamically updated on the display, you must
        include the data call in the loop by using ``.text =``. For example, if setup is saved as
        ``clue_data = simple_text_display()`` then ``clue_data[0].text = clue.proximity`` must be
        inside the ``while True:`` loop for the proximity data displayed to update as the
        values change. You must call ``show()`` at the end of the list for anything to display.
        See example below for usage.

        :param str title: The title displayed above the data. Set ``title="Title text"`` to provide
                          a title. Defaults to None.
        :param title_color: The color of the title. Not necessary if no title is provided. Defaults
                            to white (255, 255, 255).
        :param int title_scale: Scale the size of the title. Not necessary if no title is provided.
                                Defaults to 1.
        :param int text_scale: Scale the size of the data lines. Scales the title as well.
                               Defaults to 1.
        :param str font: The font to use to display the title and data. Defaults to built in
                         ``terminalio.FONT``.
        :param colors: A list of colors for the lines of data on the display. If you provide a
                       single color, all lines will be that color. Otherwise it will cycle through
                       the list you provide if the list is less than the number of lines displayed.
                       Default colors are used if ``colors`` is not set. For example, if creating
                       two lines of data, ``colors=((255, 255, 255), (255, 0, 0))`` would set the
                       first line white and the second line red, and if you created four lines of
                       data with the same setup, it would alternate white and red.
        :param display: The display to use. If not specified, will try to use the built-in one via
                        ``board``

        :raises ValueError: if ``title`` is longer than 60 characters.

        .. image :: ../docs/_static/display_funhouse_data.jpg
          :alt: Display Data demo

        This example displays three lines with acceleration, gyro and magnetic data on the display.
        Remember to call ``show()`` after the list to update the display.

        .. code-block:: python

          from adafruit_funhouse import FunHouse
          from simple_text_display import SimpleTextDisplay

          funhouse = FunHouse()
          sensor_data = SimpleTextDisplay(title="Sensor Data!", title_scale=2)

          while True:
              sensor_data[0].text = "Temperature: {:.2f} degrees C".format(
                  funhouse.peripherals.temperature
              )
              sensor_data[1].text = "Humidity: {:.2f}% RH".format(
                  funhouse.peripherals.relative_humidity
              )
              sensor_data[2].text = "Pressure: {:.2f} hPa".format(funhouse.peripherals.pressure)
              sensor_data.show()
        """
        # pylint: disable=import-outside-toplevel
        import displayio
        import terminalio
        from adafruit_display_text import label

        # pylint: enable=import-outside-toplevel

        if not colors:
            colors = (
                SimpleTextDisplay.VIOLET,
                SimpleTextDisplay.GREEN,
                SimpleTextDisplay.RED,
                SimpleTextDisplay.CYAN,
                SimpleTextDisplay.ORANGE,
                SimpleTextDisplay.BLUE,
                SimpleTextDisplay.MAGENTA,
                SimpleTextDisplay.SKY,
                SimpleTextDisplay.YELLOW,
                SimpleTextDisplay.PURPLE,
            )

        self._colors = colors
        self._label = label
        if display is None:
            display = board.DISPLAY
        self._display = display
        self._font = terminalio.FONT
        if font:
            self._font = font

        self.text_group = displayio.Group(max_size=20, scale=text_scale)

        if title:
            # Reject titles longer than the 60 glyphs reserved for the label.
            if len(title) > 60:
                raise ValueError("Title must be 60 characters or less.")

            title = label.Label(
                self._font,
                text=title,
                max_glyphs=60,
                color=title_color,
                scale=title_scale,
            )
            title.x = 0
            title.y = 8
            # Data lines start below the title.
            self._y = title.y + 18

            self.text_group.append(title)
        else:
            self._y = 3

        # Line 0 always exists; further lines are created on demand.
        self._lines = []
        for num in range(1):
            self._lines.append(self.add_text_line(color=colors[num % len(colors)]))

    def __getitem__(self, item):
        """Fetch the Nth text line label, creating any missing lines up to it.

        Each newly created line takes the color at its own index in the color
        list (wrapping around), so colors cycle per line regardless of the
        order in which lines are first accessed.
        """
        while len(self._lines) <= item:
            line_index = len(self._lines)
            self._lines.append(
                self.add_text_line(
                    color=self._colors[line_index % len(self._colors)]
                )
            )
        return self._lines[item]

    def add_text_line(self, color=0xFFFFFF):
        """Adds a line on the display of the specified color and returns the label object."""
        text_label = self._label.Label(self._font, text="", max_glyphs=45, color=color)
        text_label.x = 0
        text_label.y = self._y
        # The next line goes 13 pixels (before scaling) further down.
        self._y = text_label.y + 13
        self.text_group.append(text_label)

        return text_label

    def show(self):
        """Call show() to display the data list."""
        self._display.show(self.text_group)

    def show_terminal(self):
        """Revert to terminalio screen."""
        self._display.show(None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment