Last active
March 21, 2021 03:43
-
-
Save orelogo/60a9b24ad70240a5b0aa391b5977d841 to your computer and use it in GitHub Desktop.
Read data from Plantower PMS5003 Particulate Matter (PM) Air Quality Sensor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial | |
import datetime | |
from collections import namedtuple | |
from os import path | |
AirQuality = namedtuple('AirQuality', [ | |
'pm1_standard', # PM 1.0 concentration in μg/m^3, corrected to standard atmophere conditions | |
'pm25_standard', # PM 2.5 concentration in μg/m^3, corrected to standard atmophere conditions | |
'pm10_standard', # PM 10 concentration in μg/m^3, corrected to standard atmophere conditions | |
'pm1_ambient', # PM 10 concentration in μg/m^3, in the current ambient conditions | |
'pm25_ambient', # PM 10 concentration in μg/m^3, in the current ambient conditions | |
'pm10_ambient', # PM 10 concentration in μg/m^3, in the current ambient conditions | |
# number of particles with diameter >0.3 μm in 0.1 L (0.0001 m^3) of air | |
'particles_03', | |
# number of particles with diameter >0.5 μm in 0.1 L (0.0001 m^3) of air | |
'particles_05', | |
# number of particles with diameter >1.0 μm in 0.1 L (0.0001 m^3) of air | |
'particles_1', | |
# number of particles with diameter >2.5 μm in 0.1 L (0.0001 m^3) of air | |
'particles_25', | |
# number of particles with diameter >5.0 μm in 0.1 L (0.0001 m^3) of air | |
'particles_5', | |
# number of particles with diameter >10.0 μm in 0.1 L (0.0001 m^3) of air | |
'particles_10', | |
]) | |
BYTE_COUNT = 32 | |
FIRST_BYTE = 0x42 | |
SECOND_BYTE = 0x4d | |
FRAME_LENGTH = 28 | |
def verify_data(data: bytes) -> bool: | |
if not data: | |
print("No data received from sensor") | |
return False | |
frame_length = int.from_bytes(data[2:4], byteorder='big') | |
check_sum_expected = int.from_bytes(data[30:32], byteorder='big') | |
check_sum_calculated = 0 | |
for i in range(0, BYTE_COUNT - 2): | |
check_sum_calculated += data[i] | |
if (data[0] != FIRST_BYTE | |
or data[1] != SECOND_BYTE | |
or frame_length != FRAME_LENGTH | |
or check_sum_calculated != check_sum_expected): | |
print("Unexpected data received from sensor") | |
return False | |
return True | |
serial_port=serial.Serial("/dev/ttyAMA0", baudrate = 9600, timeout = 3.0) | |
abs_log_path=path.join(path.dirname(__file__), "pms5003_log.txt") | |
log=open(abs_log_path, "a+") | |
try: | |
while True: | |
data=serial_port.read(BYTE_COUNT) | |
if not verify_data(data): | |
print("Terminating connection with sensor") | |
break | |
air_quality=AirQuality( | |
pm1_standard = int.from_bytes(data[4:6], byteorder='big'), | |
pm25_standard = int.from_bytes(data[6:8], byteorder='big'), | |
pm10_standard = int.from_bytes(data[8:10], byteorder='big'), | |
pm1_ambient = int.from_bytes(data[10:12], byteorder='big'), | |
pm25_ambient = int.from_bytes(data[12:14], byteorder='big'), | |
pm10_ambient = int.from_bytes(data[14:16], byteorder='big'), | |
particles_03 = int.from_bytes(data[16:18], byteorder='big'), | |
particles_05 = int.from_bytes(data[18:20], byteorder='big'), | |
particles_1 = int.from_bytes(data[20:22], byteorder='big'), | |
particles_25 = int.from_bytes(data[22:24], byteorder='big'), | |
particles_5 = int.from_bytes(data[24:26], byteorder='big'), | |
particles_10 = int.from_bytes(data[26:28], byteorder='big'), | |
) | |
log.write(f"{str(datetime.datetime.now())} - {air_quality} \n") | |
log.flush() | |
print(f"{str(datetime.datetime.now())} - {air_quality}") | |
finally: | |
print("Closing program") | |
log.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment