Skip to content

Instantly share code, notes, and snippets.

@binh-bk
Last active January 30, 2019 11:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save binh-bk/1ec38f082e31a28949d6e0c4b07ecc14 to your computer and use it in GitHub Desktop.
Save binh-bk/1ec38f082e31a28949d6e0c4b07ecc14 to your computer and use it in GitHub Desktop.
read and log data from dust sensor PM2.5 Plantower PMS7003
#! /usr/bin/python2
"""
Binh Nguyen, December 01, 2018
forked from Mark Benson's gist
Read a Plantower PMS7003 serial sensor data.
English datasheet: https://www.pdf-archive.com/2017/04/12/plantower-pms-7003-sensor-data-sheet/plantower-pms-7003-sensor-data-sheet.pdf
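The sensor payload is 32 bytes: two start bytes (0x42 0x4D), 28 bytes of
frame length, readings, version and error code, and a 2-byte checksum.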
"""
import serial
import time
import os
# --- Log file settings ---------------------------------------------------
logFile = 'PMS7003_header.csv'
# Column names written once at the top of a fresh log file.
headers = 'time,pm1.0_ac,pm2.5_ac,pm10_ac,pm1.0_cf,pm2.5_cf,pm10_cf\n'

# --- Sampling loop state -------------------------------------------------
intervalSampling = 59  # seconds between two logged samples
lastTime = 0  # time.time() of the last successful log; 0 triggers a read right away
keep_going = True  # condition of the main polling loop
debug = False  # module-level flag; not read by the code visible in this file

# --- Serial connection ---------------------------------------------------
physicalPort = '/dev/ttyUSB0'
# Opened at import time with pyserial's default settings.
serialPort = serial.Serial(port=physicalPort)
# Prepare the log file before sampling starts.
#   * New file (or one that does not begin with the banner line): write the
#     banner followed by the CSV column headers.
#   * Existing log: append a timestamped row of "None" values that marks the
#     start of a new logging session.
with open(logFile, 'a+') as f:
    # Opening with 'a+' can leave the read position at end-of-file (Python 3
    # always does; with Python 2 it depends on the C library), in which case
    # readline() returns '' and the headers would be re-added on every run.
    # Rewind explicitly so the first line is really inspected.
    f.seek(0)
    head_ = f.readline().lower()
    # Move back to the end before writing; a seek is required when switching
    # from reading to writing on the same stream.
    f.seek(0, 2)
    if not head_.startswith("data captured"):
        f.write('Data captured Python Script directly from PMS7003 >> USB >>PC\n')
        f.write(headers)
        print('add header')
    else:
        time_ = time.strftime('%x %X', time.localtime())
        sprtor = '{},None,None,None,None,None,None\n'.format(time_)
        f.write(sprtor)
def log_data(logFile, msg):
    """Append one timestamped record to the log file.

    Args:
        logFile: Path of the log file; it is opened in append mode.
        msg: Comma-separated readings terminated by a newline. Anything that
            is not a ``str`` (for example ``None``) is rejected.

    Returns:
        ``None`` when the record was written, ``-1`` when ``msg`` was rejected
        or the write failed. Callers compare the result against ``None``.
    """
    # Validate explicitly instead of with ``assert``: assertions are removed
    # under ``python -O`` and produce an empty error message. Checking first
    # also means the log file is not touched for a bad message.
    if not isinstance(msg, str):
        print('Error! message must be str, got {}'.format(type(msg)))
        return -1

    timestamp = time.strftime('%x %X', time.localtime())
    # Failures to open the file propagate to the caller; only write errors
    # are reported through the -1 return value.
    with open(logFile, 'a') as f:
        try:
            f.write(','.join((timestamp, msg)))
        except (IOError, OSError) as e:
            print('Error! {}'.format(e))
            return -1
    print('Log data successfully')
    return None
def _pms7003_word(frame, offset):
    """Return the big-endian 16-bit value stored at frame[offset:offset + 2]."""
    return (frame[offset] << 8) | frame[offset + 1]


def read_pms7003(debug=False):
    """Read one frame from the PMS7003 and format it as a CSV fragment.

    The input buffer is flushed, the function waits briefly for fresh data,
    synchronises on the start bytes 0x42 0x4D and then decodes the 30 bytes
    that follow them.

    Args:
        debug: When true, also print every decoded field (mass
            concentrations, particle counts, version, error code and frame
            length). The PM2.5 atmospheric value is always printed.

    Returns:
        A newline-terminated string
        ``pm1.0_ac,pm2.5_ac,pm10_ac,pm1.0_cf,pm2.5_cf,pm10_cf``, or ``None``
        when fewer than 32 bytes were waiting or the frame was incomplete.
        A checksum mismatch only prints a warning; the values are still
        returned.
    """
    # important !!! reset buffer for interval reading to clear up old data
    serialPort.reset_input_buffer()
    time.sleep(0.7)  # wait for new data to populate
    if serialPort.in_waiting < 32:
        return None

    # Check that we are reading the payload from the correct place (i.e. the
    # start bytes). Comparing against byte literals works for both Python 2
    # str and Python 3 bytes, and an empty read simply does not match.
    while not (serialPort.read() == b'\x42' and serialPort.read() == b'\x4d'):
        time.sleep(0.7)
        print('waiting for the head')

    # Read the remaining payload. bytearray yields integers when indexed on
    # both Python 2 and Python 3, so no ord() calls are needed.
    frame = bytearray(serialPort.read(30))
    if len(frame) != 30:
        print('Error! incomplete frame: got {} of 30 bytes'.format(len(frame)))
        return None

    frameLength = _pms7003_word(frame, 0)
    # Standard particulate values in ug/m3
    concPM1_0_CF1 = _pms7003_word(frame, 2)
    concPM2_5_CF1 = _pms7003_word(frame, 4)
    concPM10_0_CF1 = _pms7003_word(frame, 6)
    # Atmospheric particulate values in ug/m3
    concPM1_0_ATM = _pms7003_word(frame, 8)
    concPM2_5_ATM = _pms7003_word(frame, 10)
    concPM10_0_ATM = _pms7003_word(frame, 12)
    # Raw counts per 0.1l
    rawGt0_3um = _pms7003_word(frame, 14)
    rawGt0_5um = _pms7003_word(frame, 16)
    rawGt1_0um = _pms7003_word(frame, 18)
    rawGt2_5um = _pms7003_word(frame, 20)
    rawGt5_0um = _pms7003_word(frame, 22)
    rawGt10_0um = _pms7003_word(frame, 24)
    # Misc data
    version = frame[26]
    errorCode = frame[27]
    payloadChecksum = _pms7003_word(frame, 28)

    # The checksum covers every byte before the two checksum bytes: both
    # start bytes plus frame[0] .. frame[27]. (Stopping at index 26 would
    # leave out the error-code byte.)
    inputChecksum = 0x42 + 0x4d + sum(frame[:28])

    if debug:
        print("PMS7003 Sensor Data:")
        print("PM1.0 = " + str(concPM1_0_CF1) + " ug/m3")
        print("PM2.5 = " + str(concPM2_5_CF1) + " ug/m3")
        print("PM10 = " + str(concPM10_0_CF1) + " ug/m3")
        print("PM1 Atmospheric concentration = " + str(concPM1_0_ATM) + " ug/m3")
    print("PM2.5 AC= " + str(concPM2_5_ATM) + " ug/m3")
    if debug:
        print("PM10 Atmospheric concentration = " + str(concPM10_0_ATM) + " ug/m3")
        print("Count: 0.3um = " + str(rawGt0_3um) + " per 0.1l")
        print("Count: 0.5um = " + str(rawGt0_5um) + " per 0.1l")
        print("Count: 1.0um = " + str(rawGt1_0um) + " per 0.1l")
        print("Count: 2.5um = " + str(rawGt2_5um) + " per 0.1l")
        print("Count: 5.0um = " + str(rawGt5_0um) + " per 0.1l")
        print("Count: 10um = " + str(rawGt10_0um) + " per 0.1l")
        print("Version = " + str(version))
        print("Error Code = " + str(errorCode))
        print("Frame length = " + str(frameLength))

    if inputChecksum != payloadChecksum:
        print("Warning! Checksums don't match!")
        print("Calculated Checksum = " + str(inputChecksum))
        print("Payload checksum = " + str(payloadChecksum))

    # Column order matches the CSV header: the three atmospheric values
    # first, then the three CF=1 values.
    readings = (
        concPM1_0_ATM, concPM2_5_ATM, concPM10_0_ATM,
        concPM1_0_CF1, concPM2_5_CF1, concPM10_0_CF1,
    )
    log = ','.join(str(value) for value in readings) + '\n'
    print('Returned log {}'.format(log))
    return log
# Polling loop: take a reading once the sampling interval has elapsed and
# log it. lastTime only advances after a successful log, so a failed read
# or write is retried on the next pass instead of waiting a full interval.
while keep_going:
    if time.time() - lastTime <= intervalSampling:
        # Not due yet: idle briefly rather than spinning.
        time.sleep(0.1)
        continue
    msg = read_pms7003()
    status = log_data(logFile, msg)
    print('Status {}'.format(status))
    if status is None:
        lastTime = time.time()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment