Skip to content

Instantly share code, notes, and snippets.

@mbacvanski
Last active December 6, 2020 23:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mbacvanski/8d255433fe25adb431e310a687719ddb to your computer and use it in GitHub Desktop.
Save mbacvanski/8d255433fe25adb431e310a687719ddb to your computer and use it in GitHub Desktop.
Plantower PM2.5 particulate matter sensor script that calculates AQI
import time
import board
import busio
import datetime
from pytz import timezone, utc
import adafruit_pm25
reset_pin = None
# If you have a GPIO, its not a bad idea to connect it to the RESET pin
# This may solve some sensor drift issues over long periods of time (unverified)
# reset_pin = DigitalInOut(board.G0)
# reset_pin.direction = Direction.OUTPUT
# reset_pin.value = False
# For use with a computer running Windows:
# import serial
# uart = serial.Serial("COM30", baudrate=9600, timeout=1)
# For use with microcontroller board:
# (Connect the sensor TX pin to the board/computer RX pin)
# uart = busio.UART(board.TX, board.RX, baudrate=9600)
# For use with Raspberry Pi/Linux:
import serial
uart = serial.Serial("/dev/ttyS0", baudrate=9600, timeout=0.25)
# For use with USB-to-serial cable:
# import serial
# uart = serial.Serial("/dev/ttyUSB0", baudrate=9600, timeout=0.25)
# Connect to a PM2.5 sensor over UART
# pm25 = adafruit_pm25.PM25_UART(uart, reset_pin)
# Create library object, use 'slow' 100KHz frequency!
# i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
# Connect to a PM2.5 sensor over UART
pm25 = adafruit_pm25.PM25_UART(uart)
liz = [(0, 12, 0, 50),
(12.1, 35.4, 51, 100),
(35.5, 55.4, 101, 150),
(55.5, 150.4, 151, 200),
(150.5, 250.4, 201, 300),
(250.5, 350.4, 301, 400),
(350.5, 500.4, 401, 500)]
def calcAQI(pm25):
for r in liz:
(rlo, rhi, lo, hi) = r
if pm25 >= rlo and pm25 <= rhi:
return float(hi - lo) / (rhi - rlo) * (pm25 - rlo) + lo
return float('nan')
letters = { "a":[ "####", "# #", "####", "# #", "# #"], "b":[ "###", "# #", "###", "# #", "###"], "c":[ "###", "#", "#", "#", "###"], "d":[ "##", "# #", "# #", "# #", "##"], "e":[ "###", "#", "###", "#", "###"], "f":[ "###", "#", "###", "#", "#"], "g":[ "###", "# #", "###", " #", "###"], "h":[ "# #", "# #", "###", "# #", "# #"], "i":[ "###", " #", " #", " #", "###"], "j":[ "###", " #", " #", " #", "##"], "k":[ "# #", "##", "#", "##", "# #"], "l":[ "#", "#", "#", "#", "###"], "m":[ "# #", "###", "###", "# #", "# #"], "n":[ "###", "# #", "# #", "# #", "# #"], "o":[ "###", "# #", "# #", "# #", "###"], "p":[ "###", "# #", "###", "#", "#"], "q":[ "###", "# #", "# #", "# #", "####"], "r":[ "###", "# #", "##", "# #", "# #"], "s":[ "###", "#", "###", " #", "###"], "t":[ "###", " #", " #", " #", " #"], "u":[ "# #", "# #", "# #", "# #", "###"], "v":[ "# #", "# #", "# #", "# #", " #"], "w":[ "# #", "# #", "# #", "###", "###"], "x":[ "# #", " #", " #", " #", "# #"], "y":[ "# #", "# #", "###", " #", " #"], "z":[ "###", " #", " #", "#", "###"], " ":[ " "], "1":[ " #", "##", " #", " #", "###"], "2":[ "###", " #", "###", "#", "###"], "3":[ "###", " #", "###", " #", "###"], "4":[ "# #", "# #", "####", " #", " #"], "5":[ "####", "#", "####", " #", "####"], "6":[ "###", "#", "###", "# #", "###"], "7":[ "####", " # ", " #", " #", " #"], "8":[ "####", "# #", "####", "# #", "####"], "9":[ "###", "# #", "###", " #", "###"], "0":[ "####", "# #", "# #", "# #", "####"], "!":[ " # ", " # ", " # ", " ", " # "], "?":[ "###", " #", " ##", " ", " # "], ".":[ " ", " ", " ", " ", " # "], "]":[ " ", " ", " ", " #", " # "], "/":[ " #", " #", " # ", "# ", "# "], ":":[ " ", " # ", " ", " # ", " "], "@":[ "###", "# #", "## ", "# ", "###"], "'":[ " # ", " # ", " ", " ", " "], "#":[ " # ", "###", " # ", "###", " # "] }
def print_letters(text):
bigletters = []
for i in text:
bigletters.append(letters.get(i.lower(),letters[' ']))
output = ['']*5
for i in range(5):
for j in bigletters:
temp = ' '
try:
temp = j[i]
except:
pass
temp += ' '*(5-len(temp))
temp = temp.replace(' ',' ')
temp = temp.replace('#','█')
output[i] += temp
return '\n'.join(output)
print("Found PM2.5 sensor, reading data...")
while True:
try:
aqdata = pm25.read()
except RuntimeError as err:
print(err, "retrying...")
continue
print("---------------------------------------")
date_format = '%Y-%m-%d at %H:%M:%S PST'
print(datetime.datetime.now(tz=utc).astimezone(timezone('US/Pacific')).strftime(date_format))
print(print_letters("AQI:%.1f" % calcAQI(aqdata["pm25 standard"])))
#print("Concentration Units (standard)")
print("PM 1.0: %d\tPM2.5: %d\tPM10: %d" %(aqdata["pm10 standard"], aqdata["pm25 standard"], aqdata["pm100 standard"]))
# Concentration units (environmental): aqdata["pm10 env"], aqdata["pm25 env"], aqdata["pm100 env"]
print("Particles > 0.3um / 0.1L air:", aqdata["particles 03um"])
print("Particles > 0.5um / 0.1L air:", aqdata["particles 05um"])
print("Particles > 1.0um / 0.1L air:", aqdata["particles 10um"])
print("Particles > 2.5um / 0.1L air:", aqdata["particles 25um"])
print("Particles > 5.0um / 0.1L air:", aqdata["particles 50um"])
print("Particles > 10 um / 0.1L air:", aqdata["particles 100um"])
print("---------------------------------------")
time.sleep(3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment