Skip to content

Instantly share code, notes, and snippets.

@steven-geo
Created October 20, 2023 09:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save steven-geo/6ff2827bdf228d1f8375acff21bb5c9d to your computer and use it in GitHub Desktop.
Save steven-geo/6ff2827bdf228d1f8375acff21bb5c9d to your computer and use it in GitHub Desktop.
# MicroPython code for the Raspberry Pi Pico to read the Ikea Vindriktning
# PM1006 Air Quality Sensor
# http://www.jdscompany.co.kr/download.asp?gubun=07&filename=PM1006_LED_PARTICLE_SENSOR_MODULE_SPECIFICATIONS.pdf
#
# Wiring:
#   Signal   PM1006   Pico
#   +5V      +5V      VSYS
#   GND      GND      GND
#   TX       REST     GP1 (UART0 RX)
# NOTE: Add a favicon.ico to the root of the Raspberry Pi Pico's filesystem if you want an icon shown for the web interface.
import _thread
import network
import socket
import time
from machine import Pin
from machine import UART
# Lock allocated for cross-thread coordination; nothing in the visible code
# acquires or releases it.
threadlocker = _thread.allocate_lock()
# Most recent decoded sensor reading. Written by uart_thread() and read by
# wifi_thread(); holds this placeholder string until a valid packet is decoded.
vindriktning_readings = "none available"
# vindriktning - data bit: 8, Stop bit: 1, Check bit: none, Baud rate: 9600bps
# The visible code only ever reads from this UART (RX on GP1).
uart0 = UART(0, baudrate=9600, tx=Pin(0), rx=Pin(1))
# WIFI Setup
# Placeholder credentials - replace with the real network name and password.
ssid = "mySSID"
password = "mySecurePa33w0rd"
def vindriktning_validpacket(data):
    """Return True when *data* looks like a complete, intact PM1006 frame.

    Checks are applied in order and the first failure wins:
      1. data is present (None / empty is rejected silently),
      2. data is exactly 20 bytes long,
      3. the checksum verifies (vindriktning_checksum),
      4. the header matches (vindriktning_headerok).

    Every failure except the first prints a WARNING line.
    """
    if not data:
        # Nothing was read - just exit without logging.
        return False
    if len(data) != 20:
        print(f"WARNING: Invalid PM1006 Packet Length of {len(data)}")
        return False
    if not vindriktning_checksum(data):
        print("WARNING: PM1006 Checksum Invalid")
        return False
    if not vindriktning_headerok(data):
        print("WARNING: PM1006 Header Does not match")
        return False
    return True
def vindriktning_headerok(d):
    """Return True when the first three bytes of *d* are 0x16, 0x11, 0x0B.

    Bytes are compared left to right and the comparison stops at the first
    mismatch, so later positions are only indexed when earlier ones matched.
    """
    expected_header = (0x16, 0x11, 0x0B)
    return all(d[pos] == value for pos, value in enumerate(expected_header))
def vindriktning_checksum(data):
    """Return True when the last byte of *data* is a valid checksum.

    The last byte must equal the two's complement (modulo 256) of the sum of
    all preceding bytes, i.e. the sum of every byte in the frame, including
    the checksum itself, is 0 modulo 256.

    The checksum position is taken from the end of *data*, so the bytes summed
    and the byte compared always agree whatever the length. Empty or None
    input returns False instead of raising.
    """
    if not data:
        return False
    payload_sum = sum(data[:-1])
    # (-sum) & 0xFF is the value that brings the total back to 0 mod 256.
    return ((-payload_sum) & 0xFF) == data[-1]
def vindriktning_decode(data):
    """Decode a PM1006 frame into a dict with 'pm025' and 'status' keys.

    'pm025' is the big-endian 16-bit value held in bytes 5 and 6, divided
    by 10. 'status' is the traffic-light colour that
    vindriktning_trafficlights() assigns to that value.

    Bytes 9-10 and 13-14 were previously tried as PM1.0 / PM10 readings but
    are left out because they are not officially supported.
    """
    raw_pm25 = (data[5] << 8) | data[6]
    pm25 = int(raw_pm25) / 10
    return {
        'pm025': pm25,
        'status': vindriktning_trafficlights(pm25),
    }
def vindriktning_trafficlights(pm25):
    """Map a PM2.5 value to "Green", "Amber" or "Red".

    Values up to and including 3.5 are "Green", values up to and including
    8.5 are "Amber", and everything else is "Red".
    """
    # These are based on the Ikea VINDRIKTNING Manual
    upper_limits = ((3.5, "Green"), (8.5, "Amber"))
    for limit, colour in upper_limits:
        if pm25 <= limit:
            return colour
    return "Red"
def wifi_connect():
    """Join the configured WiFi network and open the HTTP listening socket.

    Uses the module-level ``ssid`` and ``password``. The link status is
    polled up to 6 times, 2 seconds apart, before giving up.

    Returns:
        A socket bound to 0.0.0.0 port 80, listening with a backlog of 1.

    Raises:
        RuntimeError: the WLAN status is not 3 once polling has finished.
        OSError: the socket could not be configured, bound or put into
            listening mode (the socket is closed before re-raising).
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(ssid, password)

    # Stop waiting as soon as the status is negative or reaches 3.
    # NOTE(review): on the Pico W negative values are presumably failure
    # codes and 3 presumably means "got IP" - confirm against the port docs.
    for _ in range(6):
        link_status = wlan.status()
        if link_status < 0 or link_status >= 3:
            break
        print("INFO: WIFI Waiting for connection...")
        time.sleep(2)

    if wlan.status() != 3:
        raise RuntimeError("INFO: WIFI Connection failed")
    print(f"INFO: WIFI connected - IP={wlan.ifconfig()[0]}")

    addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
    wifi_socket = socket.socket()
    try:
        # Allow the port to be re-bound straight away after a soft reset
        # instead of failing with EADDRINUSE.
        wifi_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        wifi_socket.bind(addr)
        wifi_socket.listen(1)
    except OSError:
        # Do not leak the socket when setup fails part-way through.
        wifi_socket.close()
        raise
    print(f"INFO: WIFI Listening on port {str(addr[1])}")
    return wifi_socket
def serve_file(client, filename):
    """Stream the contents of *filename* to *client* in 1024-byte chunks.

    The file is opened in binary mode and each chunk is passed to
    ``client.sendall``; the loop ends at the first empty read. Errors from
    opening the file or from sending are not handled here.
    """
    with open(filename, 'rb') as source:
        chunk = source.read(1024)
        while len(chunk) != 0:
            client.sendall(chunk)
            chunk = source.read(1024)
def wifi_thread():
    """Serve sensor readings over HTTP forever; never returns.

    Accepts one client at a time on the module-level ``wifi_socket`` and
    answers according to the request path:

      /reading      -> the current ``vindriktning_readings`` encoded as JSON
      /favicon.ico  -> the contents of the local file "favicon.ico"
      anything else -> a short plain hint pointing at /reading

    The client socket is always closed after the request, whether it
    succeeded or failed. An OSError while handling a client is logged and the
    loop carries on with the next connection.
    """
    import json  # function-scope import: only this block needs it

    print("INFO: Starting Wifi Thread")
    while True:
        client = None  # lets cleanup tell whether accept() succeeded
        try:
            client, addr = wifi_socket.accept()
            request = client.recv(1024)
            print(f"INFO: Client connected from {str(addr)}")

            # Request line is "<METHOD> <PATH> <VERSION>". Compare as bytes
            # so undecodable input from the network cannot raise here.
            parts = request.split()
            path = parts[1] if len(parts) >= 2 else b""

            if path == b"/reading":
                client.sendall(b'HTTP/1.0 200 OK\r\nContent-type: application/json\r\n\r\n')
                # json.dumps gives real JSON (double quotes); str() of a
                # dict is a Python repr that JSON clients cannot parse.
                client.sendall(json.dumps(vindriktning_readings).encode())
            elif path == b"/favicon.ico":
                client.sendall(b'HTTP/1.0 200 OK\r\nContent-type: image/icon\r\n\r\n')
                # The file is the whole body - nothing else may follow it.
                serve_file(client, "favicon.ico")
            else:
                client.sendall(b'HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
                client.sendall(b"use /reading to obtain information")
        except OSError:
            print('INFO: Connection closed')
        finally:
            if client is not None:
                client.close()
def uart_thread():
    """Read the PM1006 sensor forever and publish each valid reading.

    Whatever ``uart0.read()`` returns is validated with
    vindriktning_validpacket(); a valid packet is decoded, stored in the
    module-level ``vindriktning_readings`` and printed. Anything else is
    dropped and the loop reads again. Never returns.
    """
    # Thread - Serial Reading and processing
    global vindriktning_readings
    print("INFO: Listening to PM1006 Sensor")
    while True:
        packet = uart0.read()
        if not vindriktning_validpacket(packet):
            continue
        vindriktning_readings = vindriktning_decode(packet)
        print(vindriktning_readings)
# Join the WiFi network and open the listening socket first. wifi_connect()
# raises RuntimeError when the connection fails, which stops the script here.
wifi_socket = wifi_connect()
# The sensor reader runs on the newly started thread...
thread = _thread.start_new_thread(uart_thread,())
# ...while the HTTP server loop runs on the main thread. It loops forever,
# so nothing placed after this call would run.
wifi_thread()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment