Created
October 20, 2023 09:07
-
-
Save steven-geo/6ff2827bdf228d1f8375acff21bb5c9d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# MicroPython code for the Raspberry Pi Pico to read the Ikea Vindriktning
# PM1006 Air Quality Sensor
# http://www.jdscompany.co.kr/download.asp?gubun=07&filename=PM1006_LED_PARTICLE_SENSOR_MODULE_SPECIFICATIONS.pdf
#
# Signal   PM1006   Pico
# +5V      +5V      VSYS
# GND      GND      GND
# TX       REST     GP1 (UART0 RX)
# NOTE: Add a favicon.ico to the root of the Pico's filesystem if you want an icon shown for the web interface
import _thread | |
import network | |
import socket | |
import time | |
from machine import Pin | |
from machine import UART | |
# Lock object shared between threads (allocated here; nothing in the
# visible code acquires it).
threadlocker = _thread.allocate_lock()

# Most recent decoded sensor reading, served by the web thread. Holds this
# placeholder string until the first valid packet has been decoded.
vindriktning_readings = "none available"

# Serial link to the PM1006:
# data bit: 8, stop bit: 1, check bit: none, baud rate: 9600bps
uart0 = UART(0, tx=Pin(0), rx=Pin(1), baudrate=9600)

# WiFi credentials - replace with the details of your own network
ssid, password = "mySSID", "mySecurePa33w0rd"
def vindriktning_validpacket(data):
    """Return True when *data* passes every PM1006 packet check.

    The checks run in this order and stop at the first failure:
    non-empty, exactly 20 bytes long, checksum, header. Every failure
    except an empty/None packet prints a warning.
    """
    if not data:  # nothing was read - reject without a warning
        return False
    if len(data) != 20:  # a packet is always 20 bytes long
        print(f"WARNING: Invalid PM1006 Packet Length of {len(data)}")
        return False
    if not vindriktning_checksum(data):
        print(f"WARNING: PM1006 Checksum Invalid")
        return False
    if not vindriktning_headerok(data):
        print(f"WARNING: PM1006 Header Does not match")
        return False
    return True
def vindriktning_headerok(d):
    """Return True when *d* starts with the bytes 0x16, 0x11, 0x0B."""
    expected_header = (0x16, 0x11, 0x0B)
    # all() stops at the first mismatch, so later bytes are only read
    # when the earlier ones matched.
    return all(d[pos] == value for pos, value in enumerate(expected_header))
def vindriktning_checksum(data):
    """Return True when the trailing checksum byte of *data* is valid.

    The last byte of a packet is the checksum of all preceding bytes: it is
    chosen so that the sum of every byte in the packet, checksum included,
    is a multiple of 256.

    The check is driven purely by the packet contents, so it no longer
    assumes the checksum sits at index 19, and a packet whose bytes sum to
    zero is handled correctly.

    Args:
        data: the raw packet as a bytes-like sequence of ints (0-255).

    Returns:
        True if the checksum matches, False otherwise (including for an
        empty or None packet).
    """
    if not data:  # nothing to verify
        return False
    # Summing the whole packet and keeping the low 8 bits is equivalent to
    # comparing (256 - sum(payload)) % 256 against the final byte.
    return (sum(data) & 0xFF) == 0
def vindriktning_decode(data):
    """Decode a PM1006 packet into a dict with 'pm025' and 'status' keys.

    'pm025' is the 16-bit big-endian value held in bytes 5 and 6, divided
    by 10; 'status' is the traffic-light colour for that value. The PM1.0
    (bytes 9-10) and PM10 (bytes 13-14) fields are not officially
    supported and are therefore not decoded.
    """
    high_byte = data[5]
    low_byte = data[6]
    pm25 = int((high_byte << 8) | low_byte) / 10
    return {
        'pm025': pm25,
        'status': vindriktning_trafficlights(pm25),
    }
def vindriktning_trafficlights(pm25):
    """Map a PM2.5 value to "Green", "Amber" or "Red".

    Thresholds are based on the Ikea VINDRIKTNING manual: values up to and
    including 3.5 are Green, up to and including 8.5 are Amber, anything
    else is Red.
    """
    upper_limits = ((3.5, "Green"), (8.5, "Amber"))
    for limit, colour in upper_limits:
        if pm25 <= limit:
            return colour
    return "Red"
def wifi_connect():
    """Join the configured WiFi network and return a listening socket.

    Connects with the module-level ``ssid`` and ``password``, then polls the
    interface status up to 6 times, 2 seconds apart. Polling stops early once
    the status is negative or at least 3. A final status other than 3 is
    treated as a failed connection.

    Returns:
        A socket bound to 0.0.0.0 port 80, listening with a backlog of 1.

    Raises:
        RuntimeError: if the interface status is not 3 after waiting.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(ssid, password)

    for _attempt in range(6):
        if station.status() < 0 or station.status() >= 3:
            break
        print("INFO: WIFI Waiting for connection...")
        time.sleep(2)

    if station.status() != 3:
        raise RuntimeError("INFO: WIFI Connection failed")
    ip_config = station.ifconfig()
    print(f"INFO: WIFI connected - IP={ip_config[0]}")

    bind_addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
    listener = socket.socket()
    listener.bind(bind_addr)
    listener.listen(1)
    print(f"INFO: WIFI Listening on port {str(bind_addr[1])}")
    return listener
def serve_file(client, filename):
    """Send the contents of *filename* to *client* in chunks of 1024 bytes.

    The file is opened in binary mode and each chunk is passed to
    ``client.sendall`` until a read returns no more data.
    """
    with open(filename, 'rb') as source:
        chunk = source.read(1024)
        while len(chunk) != 0:
            client.sendall(chunk)
            chunk = source.read(1024)
def wifi_thread():
    """Serve HTTP requests on the module-level ``wifi_socket`` forever.

    Routes:
        /reading      - the latest ``vindriktning_readings`` encoded as JSON
        /favicon.ico  - the contents of the local file "favicon.ico"
        anything else - a short usage hint

    Each connection is handled to completion and then closed. An OSError
    while handling a connection (including a missing favicon.ico) is
    reported and the loop moves on to the next client.
    """
    # Imported locally so this function brings its own dependency with it.
    import json

    print("INFO: Starting Wifi Thread")
    while True:
        # Reset per iteration so the cleanup below never touches a socket
        # left over from an earlier request when accept() itself fails.
        cl = None
        try:
            cl, addr = wifi_socket.accept()
            request = cl.recv(1024)
            print(f"INFO: Client connected from {str(addr)}")
            # str() of the received bytes is its repr; splitting that on
            # whitespace still leaves the request path as the second token.
            request = str(request).split()
            path = ""
            if len(request) > 2:
                path = request[1]
            if path == "/favicon.ico":
                # The file content is the whole body; no text response is
                # sent afterwards.
                cl.send('HTTP/1.0 200 OK\r\nContent-type: image/icon\r\n\r\n')
                serve_file(cl, "favicon.ico")
            else:
                if path == "/reading":
                    cl.send('HTTP/1.0 200 OK\r\nContent-type: application/json\r\n\r\n')
                    # Emit real JSON to match the advertised content type.
                    response = json.dumps(vindriktning_readings)
                else:
                    cl.send('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
                    response = "use /reading to obtain information"
                cl.send(response)
        except OSError:
            print('INFO: Connection closed')
        finally:
            if cl is not None:
                cl.close()
def uart_thread():
    """Read PM1006 packets from ``uart0`` forever and publish the readings.

    Whatever ``uart0.read()`` returns is validated. Each valid packet is
    decoded, stored in the module-level ``vindriktning_readings`` and
    printed. Anything that fails validation is dropped.
    """
    global vindriktning_readings
    print("INFO: Listening to PM1006 Sensor")
    while True:
        packet = uart0.read()
        if not vindriktning_validpacket(packet):
            continue
        vindriktning_readings = vindriktning_decode(packet)
        print(vindriktning_readings)
# Connect to WiFi first; the returned listening socket is read by
# wifi_thread() through this module-level name.
wifi_socket = wifi_connect()

# The sensor reader runs in a newly started thread...
thread = _thread.start_new_thread(uart_thread, ())

# ...while the web server loop runs in the calling thread.
wifi_thread()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment