Skip to content

Instantly share code, notes, and snippets.

@nivhty
Last active August 18, 2020 04:34
Show Gist options
  • Save nivhty/ce069ec37e1908f9899424602fd47d9c to your computer and use it in GitHub Desktop.
Save nivhty/ce069ec37e1908f9899424602fd47d9c to your computer and use it in GitHub Desktop.
Python script that reads the SDS011 air quality sensor and outputs the readings to a 16x2 I2C LCD
#!/usr/bin/python -u
# coding=utf-8
# "DATASHEET": http://cl.ly/ekot
#AQI
from __future__ import print_function, division
import serial, struct, sys, time, json, subprocess, smbus
# Non-zero makes construct_command() and read_response() hex-dump every frame.
DEBUG = 0
# Command identifiers; construct_command() writes the chosen one as the third
# byte of the outgoing frame.
CMD_MODE = 2
CMD_QUERY_DATA = 4
CMD_DEVICE_ID = 5
CMD_SLEEP = 6
CMD_FIRMWARE = 7
CMD_WORKING_PERIOD = 8
# Values sent as the second data byte of a CMD_MODE frame (see cmd_set_mode).
MODE_ACTIVE = 0
MODE_QUERY = 1
# Value main() passes to cmd_set_working_period().
PERIOD_CONTINUOUS = 0
# Serial link to the sensor; opened at import time and shared by every
# cmd_* helper through the module-level name `ser`.
ser = serial.Serial()
ser.port = "/dev/ttyUSB0"
ser.baudrate = 9600
ser.open()
# Discard anything already buffered before the first command is sent.
ser.flushInput()
# NOTE(review): these two module-level names are not referenced by any
# function in this file (read_response uses its own local `byte`).
byte, data = 0, ""
# Function
def dump(d, prefix=''):
    """Print the bytes of ``d`` as space-separated two-digit hex values.

    The data is routed through ``bytearray`` so the same code handles a
    Python 2 ``str`` and a Python 3 ``bytes`` object; ``x.encode('hex')``
    only exists on Python 2. The printed text is unchanged.

    Args:
        d: Raw frame bytes (``str`` on Python 2, ``bytes`` or ``bytearray``
            on Python 3).
        prefix: Text printed immediately before the hex dump.
    """
    print(prefix + ' '.join('{:02x}'.format(b) for b in bytearray(d)))
def construct_command(cmd, data=None):
    """Build the 19-byte command frame sent to the sensor.

    Frame layout: ``0xAA 0xB4 cmd``, 12 data bytes (zero padded),
    ``0xFF 0xFF``, checksum, ``0xAB``.

    Args:
        cmd: Command identifier (one of the ``CMD_*`` constants), 0..255.
        data: Up to 12 data byte values. The sequence is copied, so neither
            the caller's list nor a shared default is ever modified.

    Returns:
        The frame as a byte string (``str`` on Python 2, ``bytes`` on
        Python 3), ready to hand to ``ser.write``.

    Raises:
        ValueError: If more than 12 data bytes are supplied, or a value does
            not fit in one byte.
    """
    payload = list(data) if data is not None else []
    if len(payload) > 12:
        raise ValueError(
            "at most 12 data bytes allowed, got {}".format(len(payload)))
    payload += [0] * (12 - len(payload))
    # The two fixed 0xFF bytes add 510 to the byte sum, i.e. -2 modulo 256.
    checksum = (sum(payload) + cmd - 2) % 256
    frame = bytearray([0xaa, 0xb4, cmd])
    frame += bytearray(payload)
    frame += bytearray([0xff, 0xff, checksum, 0xab])
    ret = bytes(frame)
    if DEBUG:
        dump(ret, '> ')
    return ret
def process_data(d):
    """Decode a 10-byte measurement frame into ``[pm25, pm10]``.

    Bytes 2-3 and 4-5 are little-endian 16-bit readings that are divided by
    10 (per the SDS011 datasheet the results are in micrograms per cubic
    metre). Byte 8 is the checksum over bytes 2..7 and byte 9 is the 0xAB
    tail; both are verified here, matching what ``process_version`` does.

    Args:
        d: The complete frame including the 0xAA header (``str`` on
            Python 2, ``bytes`` on Python 3).

    Returns:
        ``[pm25, pm10]`` as floats, or ``[]`` when the frame has the wrong
        length, a bad checksum or a bad tail byte. ``[]`` is the same
        "no data" value ``cmd_query_data`` already returns.
    """
    if len(d) != 10:
        return []
    pm25_raw, pm10_raw, checksum, tail = struct.unpack('<HHxxBB', d[2:])
    # bytearray yields ints on both Python 2 and Python 3.
    expected = sum(bytearray(d[2:8])) % 256
    if checksum != expected or tail != 0xab:
        return []
    return [pm25_raw / 10.0, pm10_raw / 10.0]
def process_version(d):
    """Print the date and ID fields of a firmware-version response.

    Bytes 3..5 are printed as Y/M/D, bytes 6-7 as a little-endian 16-bit ID,
    byte 8 is the checksum over bytes 2..7 and byte 9 must be the 0xAB tail.
    The checksum is summed through ``bytearray`` so the function works with
    a Python 2 ``str`` as well as Python 3 ``bytes``.

    Args:
        d: The complete 10-byte response frame including the 0xAA header.
    """
    year, month, day, device_id, checksum, tail = struct.unpack(
        '<BBBHBB', d[3:])
    expected = sum(bytearray(d[2:8])) % 256
    status = "OK" if (checksum == expected and tail == 0xab) else "NOK"
    print("Y: {}, M: {}, D: {}, ID: {}, CRC={}".format(
        year, month, day, hex(device_id), status))
def read_response():
    """Read one 10-byte response frame from the serial port.

    Bytes are discarded until the 0xAA header is seen, then the remaining
    nine bytes are read. The header is compared against a bytes literal so
    the loop also terminates on Python 3, where ``ser.read`` returns
    ``bytes`` and would never equal the text literal ``"\\xaa"``.

    Returns:
        The header byte followed by the nine bytes read after it.
    """
    header = b""
    while header != b"\xaa":
        header = ser.read(size=1)
    body = ser.read(size=9)
    if DEBUG:
        # Only the nine bytes after the header are dumped.
        dump(body, '< ')
    return header + body
def cmd_set_mode(mode=MODE_QUERY):
    """Send a CMD_MODE frame carrying ``mode`` and consume the reply.

    Args:
        mode: ``MODE_ACTIVE`` or ``MODE_QUERY``; sent as the second data
            byte. The first data byte is always 0x1 (presumably the
            datasheet's "set" flag -- confirm against the protocol spec).
    """
    frame = construct_command(CMD_MODE, [0x1, mode])
    ser.write(frame)
    # The reply is read only to keep the stream in sync; it is discarded.
    read_response()
def cmd_query_data():
    """Request one measurement and return it decoded.

    Returns:
        Whatever ``process_data`` returns for the reply when the reply's
        second byte is 0xC0, otherwise an empty list.
    """
    ser.write(construct_command(CMD_QUERY_DATA))
    d = read_response()
    # Slice rather than index: d[1] is an int on Python 3 but a one-character
    # str on Python 2, while d[1:2] is a byte string on both.
    if d[1:2] == b"\xc0":
        return process_data(d)
    return []
def cmd_set_sleep(sleep):
    """Send a CMD_SLEEP frame and consume the reply.

    Args:
        sleep: Truthy sends 0 as the second data byte, falsy sends 1.
    """
    if sleep:
        mode = 0
    else:
        mode = 1
    frame = construct_command(CMD_SLEEP, [0x1, mode])
    ser.write(frame)
    # The reply is read only to keep the stream in sync; it is discarded.
    read_response()
def cmd_set_working_period(period):
    """Send a CMD_WORKING_PERIOD frame carrying ``period`` and consume the reply.

    Args:
        period: Value sent as the second data byte (``PERIOD_CONTINUOUS``
            is what ``main`` uses).
    """
    frame = construct_command(CMD_WORKING_PERIOD, [0x1, period])
    ser.write(frame)
    # The reply is read only to keep the stream in sync; it is discarded.
    read_response()
def cmd_firmware_ver():
    """Send a CMD_FIRMWARE frame and print the decoded reply."""
    request = construct_command(CMD_FIRMWARE)
    ser.write(request)
    # process_version() prints the decoded fields; nothing is returned.
    process_version(read_response())
def cmd_set_id(id):
    """Send a CMD_DEVICE_ID frame carrying a 16-bit ``id`` and consume the reply.

    Args:
        id: Integer whose low 16 bits are sent; the low byte goes in data
            position 11 and the high byte in position 12, after ten zero
            bytes.
    """
    high_byte = (id >> 8) & 0xff
    low_byte = id & 0xff
    payload = [0] * 10
    payload.extend([low_byte, high_byte])
    ser.write(construct_command(CMD_DEVICE_ID, payload))
    # The reply is read only to keep the stream in sync; it is discarded.
    read_response()
# LCD FUNCTIONS
# LCD
# Device parameters for the I2C LCD
I2C_ADDR = 0x27 # I2C device address every bus.write_byte() call targets
LCD_WIDTH = 16 # Characters written per line; lcd_string pads/truncates to this
# Values passed as the `mode` argument of lcd_byte()
LCD_CHR = 1 # Mode - Sending data
LCD_CMD = 0 # Mode - Sending command
# Command bytes lcd_string() sends to select a line before writing text
LCD_LINE_1 = 0x80 # LCD RAM address for the 1st line
LCD_LINE_2 = 0xC0 # LCD RAM address for the 2nd line
LCD_LINE_3 = 0x94 # LCD RAM address for the 3rd line (not used by main)
LCD_LINE_4 = 0xD4 # LCD RAM address for the 4th line (not used by main)
# OR-ed into every byte lcd_byte() writes
LCD_BACKLIGHT = 0x08 # On
#LCD_BACKLIGHT = 0x00 # Off
ENABLE = 0b00000100 # Enable bit, set then cleared by lcd_toggle_enable()
# Timing constants (seconds, passed to time.sleep)
E_PULSE = 0.0005
E_DELAY = 0.0005
# Open the I2C interface at import time; shared through the module-level `bus`
#bus = smbus.SMBus(0) # Rev 1 Pi uses 0
bus = smbus.SMBus(1) # Rev 2 Pi uses 1
def lcd_init():
    """Send the fixed initialisation command sequence to the display."""
    init_sequence = (
        0x33,  # 110011 Initialise
        0x32,  # 110010 Initialise
        0x06,  # 000110 Cursor move direction
        0x0C,  # 001100 Display On, Cursor Off, Blink Off
        0x28,  # 101000 Data length, number of lines, font size
        0x01,  # 000001 Clear display
    )
    for command in init_sequence:
        lcd_byte(command, LCD_CMD)
    time.sleep(E_DELAY)
def lcd_byte(bits, mode):
    """Write one byte to the display as two 4-bit transfers.

    Args:
        bits: The byte to send.
        mode: ``LCD_CHR`` (1) for data, ``LCD_CMD`` (0) for a command.
    """
    # Bits common to both transfers: the mode flag plus the backlight bit.
    flags = mode | LCD_BACKLIGHT
    upper_nibble = bits & 0xF0
    lower_nibble = (bits << 4) & 0xF0
    # The upper four bits go out first, then the lower four.
    for nibble in (upper_nibble, lower_nibble):
        value = flags | nibble
        bus.write_byte(I2C_ADDR, value)
        lcd_toggle_enable(value)
def lcd_toggle_enable(bits):
    """Pulse the enable bit: write ``bits`` with ENABLE set, then cleared.

    Args:
        bits: The byte most recently written to the bus by the caller.
    """
    time.sleep(E_DELAY)
    steps = (
        (bits | ENABLE, E_PULSE),   # enable high, hold for the pulse width
        (bits & ~ENABLE, E_DELAY),  # enable low, then settle
    )
    for value, pause in steps:
        bus.write_byte(I2C_ADDR, value)
        time.sleep(pause)
def lcd_string(message, line):
    """Write ``message`` to one display line.

    The text is converted with ``str``, right-padded with spaces to
    ``LCD_WIDTH`` and anything beyond ``LCD_WIDTH`` characters is dropped.

    Args:
        message: Value to display.
        line: Line-select command byte, e.g. ``LCD_LINE_1``.
    """
    text = str(message).ljust(LCD_WIDTH, " ")
    lcd_byte(line, LCD_CMD)
    for char in text[:LCD_WIDTH]:
        lcd_byte(ord(char), LCD_CHR)
# Calculate function
def calcAQIpm25(pm25):
    """Convert a PM2.5 reading to an AQI value by piecewise-linear interpolation.

    Readings are clamped to the table range (0 .. 500.4) before
    interpolating, so a reading above the top breakpoint gives 500 and a
    negative one gives 0 instead of raising ``UnboundLocalError``. Results
    for in-range readings are unchanged.

    Args:
        pm25: PM2.5 reading, in the same unit as the breakpoint table.

    Returns:
        The interpolated AQI passed through ``round``.
    """
    # (concentration, AQI) pairs; consecutive pairs bound one segment.
    points = (
        (0, 0),
        (12, 50),
        (35.4, 100),
        (55.4, 150),
        (150.4, 200),
        (250.4, 300),
        (350.4, 400),
        (500.4, 500),
    )
    pm = min(max(pm25, points[0][0]), points[-1][0])
    # Each segment owns [low, high); a reading equal to the very top
    # breakpoint never breaks out, leaving the last segment selected.
    for (pm_lo, aqi_lo), (pm_hi, aqi_hi) in zip(points, points[1:]):
        if pm < pm_hi:
            break
    # float() keeps true division even without the __future__ import.
    slope = float(aqi_hi - aqi_lo) / (pm_hi - pm_lo)
    return round(slope * (pm - pm_lo) + aqi_lo)
def calcAQIpm10(pm10):
    """Convert a PM10 reading to an AQI value by piecewise-linear interpolation.

    Readings are clamped to the table range (0 .. 604) before interpolating,
    so a reading above the top breakpoint gives 500 and a negative one gives
    0 instead of raising ``UnboundLocalError``. Results for in-range
    readings are unchanged.

    Args:
        pm10: PM10 reading, in the same unit as the breakpoint table.

    Returns:
        The interpolated AQI passed through ``round``.
    """
    # (concentration, AQI) pairs; consecutive pairs bound one segment.
    points = (
        (0, 0),
        (54, 50),
        (154, 100),
        (254, 150),
        (354, 200),
        (424, 300),
        (504, 400),
        (604, 500),
    )
    pm = min(max(pm10, points[0][0]), points[-1][0])
    # Each segment owns [low, high); a reading equal to the very top
    # breakpoint never breaks out, leaving the last segment selected.
    for (pm_lo, aqi_lo), (pm_hi, aqi_hi) in zip(points, points[1:]):
        if pm < pm_hi:
            break
    # float() keeps true division even without the __future__ import.
    slope = float(aqi_hi - aqi_lo) / (pm_hi - pm_lo)
    return round(slope * (pm - pm_lo) + aqi_lo)
def air_quality(index):
    """Map an AQI value to its colour-band name.

    Bands are selected by upper bound only, so every number gets a label:
    fractional values between bands (e.g. 50.5) fall into the next band up,
    anything above 300 is "Maroon" and anything at or below 50 is "Green".
    Integer inputs in 0..500 map exactly as before.

    Args:
        index: AQI value.

    Returns:
        One of "Green", "Yellow", "Orange", "Red", "Purple", "Maroon".
    """
    bands = (
        (50, "Green"),
        (100, "Yellow"),
        (150, "Orange"),
        (200, "Red"),
        (300, "Purple"),
    )
    for upper_bound, level in bands:
        if index <= upper_bound:
            return level
    return "Maroon"
# ==========================================================
# Main
# AQI MAIN
def main():
    """Initialise the LCD and sensor, then display readings forever.

    Line 1 shows ``<pm2.5>|<AQI>|<band>`` and line 2 the same for PM10.
    Each AQI is computed once per reading and reused for both the number
    and the band name.

    NOTE(review): the original nesting was lost with the indentation. The
    2-second pause is placed at loop level so it runs after every query,
    including ones that returned no data -- confirm this matches the
    intended pacing.
    """
    lcd_init()
    cmd_set_sleep(0)
    cmd_firmware_ver()
    cmd_set_working_period(PERIOD_CONTINUOUS)
    cmd_set_mode(MODE_QUERY)
    while True:
        cmd_set_sleep(0)
        for _ in range(15):
            values = cmd_query_data()
            if values is not None and len(values) == 2:
                pm25, pm10 = values
                aqi_pm25 = calcAQIpm25(pm25)
                aqi_pm10 = calcAQIpm10(pm10)
                # Write to lcd
                lcd_string(
                    str(pm25) + "|" + str(aqi_pm25) + "|"
                    + air_quality(aqi_pm25),
                    LCD_LINE_1)
                lcd_string(
                    str(pm10) + "|" + str(aqi_pm10) + "|"
                    + air_quality(aqi_pm10),
                    LCD_LINE_2)
            time.sleep(2)
if __name__ == '__main__':
    # Run until interrupted; on the way out always clear the display and
    # release the serial port that was opened at import time.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the endless loop in main().
        pass
    finally:
        try:
            lcd_byte(0x01, LCD_CMD)  # 0x01 = clear display
        finally:
            # Close the port even if clearing the LCD raised.
            ser.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment