Skip to content

Instantly share code, notes, and snippets.

@koryk
Created October 28, 2023 13:02
Show Gist options
  • Save koryk/2b0ee913c74fc72a002b2eb9817a31b8 to your computer and use it in GitHub Desktop.
Save koryk/2b0ee913c74fc72a002b2eb9817a31b8 to your computer and use it in GitHub Desktop.
a few modifications from upstream, pipe output force json
# MIT License
#
# Copyright (c) 2018 Airthings AS
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# https://airthings.com
# ===============================
# Module import dependencies
# ===============================
from bluepy.btle import UUID, Peripheral, Scanner, DefaultDelegate
import sys
import time
import struct
import json
import tableprint
# ===============================
# Script guards for correct usage
# ===============================
if len(sys.argv) < 3:
print "ERROR: Missing input argument SN or SAMPLE-PERIOD."
print "USAGE: read_waveplus.py SN SAMPLE-PERIOD [pipe > yourfile.txt]"
print " where SN is the 10-digit serial number found under the magnetic backplate of your Wave Plus."
print " where SAMPLE-PERIOD is the time in seconds between reading the current values."
print " where [pipe > yourfile.txt] is optional and specifies that you want to pipe your results to yourfile.txt."
sys.exit(1)
if sys.argv[1].isdigit() is not True or len(sys.argv[1]) != 10:
print "ERROR: Invalid SN format."
print "USAGE: read_waveplus.py SN SAMPLE-PERIOD [pipe > yourfile.txt]"
print " where SN is the 10-digit serial number found under the magnetic backplate of your Wave Plus."
print " where SAMPLE-PERIOD is the time in seconds between reading the current values."
print " where [pipe > yourfile.txt] is optional and specifies that you want to pipe your results to yourfile.txt."
sys.exit(1)
if sys.argv[2].isdigit() is not True or int(sys.argv[2])<0:
print "ERROR: Invalid SAMPLE-PERIOD. Must be a numerical value larger than zero."
print "USAGE: read_waveplus.py SN SAMPLE-PERIOD [pipe > yourfile.txt]"
print " where SN is the 10-digit serial number found under the magnetic backplate of your Wave Plus."
print " where SAMPLE-PERIOD is the time in seconds between reading the current values."
print " where [pipe > yourfile.txt] is optional and specifies that you want to pipe your results to yourfile.txt."
sys.exit(1)
if len(sys.argv) > 3:
Mode = sys.argv[3].lower()
else:
Mode = 'terminal' # (default) print to terminal
if Mode!='pipe' and Mode!='terminal':
print "ERROR: Invalid piping method."
print "USAGE: read_waveplus.py SN SAMPLE-PERIOD [pipe > yourfile.txt]"
print " where SN is the 10-digit serial number found under the magnetic backplate of your Wave Plus."
print " where SAMPLE-PERIOD is the time in seconds between reading the current values."
print " where [pipe > yourfile.txt] is optional and specifies that you want to pipe your results to yourfile.txt."
sys.exit(1)
SerialNumber = int(sys.argv[1])
SamplePeriod = int(sys.argv[2])
# ====================================
# Utility functions for WavePlus class
# ====================================
def parseSerialNumber(ManuDataHexStr):
if (ManuDataHexStr == "None"):
SN = "Unknown"
else:
ManuData = bytearray.fromhex(ManuDataHexStr)
if (((ManuData[1] << 8) | ManuData[0]) == 0x0334):
SN = ManuData[2]
SN |= (ManuData[3] << 8)
SN |= (ManuData[4] << 16)
SN |= (ManuData[5] << 24)
else:
SN = "Unknown"
return SN
# ===============================
# Class WavePlus
# ===============================
class WavePlus():
def __init__(self, SerialNumber):
self.periph = None
self.curr_val_char = None
self.MacAddr = None
self.SN = SerialNumber
self.uuid = UUID("b42e2a68-ade7-11e4-89d3-123b93f75cba")
def connect(self):
# Auto-discover device on first connection
if (self.MacAddr is None):
scanner = Scanner().withDelegate(DefaultDelegate())
searchCount = 0
while self.MacAddr is None and searchCount < 50:
devices = scanner.scan(0.1) # 0.1 seconds scan period
searchCount += 1
for dev in devices:
ManuData = dev.getValueText(255)
SN = parseSerialNumber(ManuData)
if (SN == self.SN):
self.MacAddr = dev.addr # exits the while loop on next conditional check
break # exit for loop
if (self.MacAddr is None):
print "ERROR: Could not find device."
print "GUIDE: (1) Please verify the serial number."
print " (2) Ensure that the device is advertising."
print " (3) Retry connection."
sys.exit(1)
# Connect to device
if (self.periph is None):
self.periph = Peripheral(self.MacAddr)
if (self.curr_val_char is None):
self.curr_val_char = self.periph.getCharacteristics(uuid=self.uuid)[0]
def read(self):
if (self.curr_val_char is None):
print "ERROR: Devices are not connected."
sys.exit(1)
rawdata = self.curr_val_char.read()
rawdata = struct.unpack('BBBBHHHHHHHH', rawdata)
sensors = Sensors()
sensors.set(rawdata)
return sensors
def disconnect(self):
if self.periph is not None:
self.periph.disconnect()
self.periph = None
self.curr_val_char = None
# ===================================
# Class Sensor and sensor definitions
# ===================================
NUMBER_OF_SENSORS = 7
SENSOR_IDX_HUMIDITY = 0
SENSOR_IDX_RADON_SHORT_TERM_AVG = 1
SENSOR_IDX_RADON_LONG_TERM_AVG = 2
SENSOR_IDX_TEMPERATURE = 3
SENSOR_IDX_REL_ATM_PRESSURE = 4
SENSOR_IDX_CO2_LVL = 5
SENSOR_IDX_VOC_LVL = 6
class Sensors():
def __init__(self):
self.sensor_version = None
self.sensor_data = [None]*NUMBER_OF_SENSORS
self.sensor_units = ["%rH", "Bq/m3", "Bq/m3", "degC", "hPa", "ppm", "ppb"]
def set(self, rawData):
self.sensor_version = rawData[0]
if (self.sensor_version == 1):
self.sensor_data[SENSOR_IDX_HUMIDITY] = rawData[1]/2.0
self.sensor_data[SENSOR_IDX_RADON_SHORT_TERM_AVG] = self.conv2radon(rawData[4])
self.sensor_data[SENSOR_IDX_RADON_LONG_TERM_AVG] = self.conv2radon(rawData[5])
self.sensor_data[SENSOR_IDX_TEMPERATURE] = rawData[6]/100.0
self.sensor_data[SENSOR_IDX_REL_ATM_PRESSURE] = rawData[7]/50.0
self.sensor_data[SENSOR_IDX_CO2_LVL] = rawData[8]*1.0
self.sensor_data[SENSOR_IDX_VOC_LVL] = rawData[9]*1.0
else:
print "ERROR: Unknown sensor version.\n"
print "GUIDE: Contact Airthings for support.\n"
sys.exit(1)
def conv2radon(self, radon_raw):
radon = "N/A" # Either invalid measurement, or not available
if 0 <= radon_raw <= 16383:
radon = radon_raw
return radon
def getValue(self, sensor_index):
return self.sensor_data[sensor_index]
def getUnit(self, sensor_index):
return self.sensor_units[sensor_index]
try:
#---- Initialize ----#
waveplus = WavePlus(SerialNumber)
if (Mode=='terminal'):
print "\nPress ctrl+C to exit program\n"
print "Device serial number: %s" %(SerialNumber)
header = ['Humidity', 'Radon ST avg', 'Radon LT avg', 'Temperature', 'Pressure', 'CO2 level', 'VOC level']
if (Mode=='terminal'):
print tableprint.header(header, width=12)
count = 0
while True:
waveplus.connect()
# read values
sensors = waveplus.read()
# extract
humidity = str(sensors.getValue(SENSOR_IDX_HUMIDITY))
radon_st_avg = str(sensors.getValue(SENSOR_IDX_RADON_SHORT_TERM_AVG))
radon_lt_avg = str(sensors.getValue(SENSOR_IDX_RADON_LONG_TERM_AVG))
temperature = str(sensors.getValue(SENSOR_IDX_TEMPERATURE))
pressure = str(sensors.getValue(SENSOR_IDX_REL_ATM_PRESSURE))
CO2_lvl = str(sensors.getValue(SENSOR_IDX_CO2_LVL))
VOC_lvl = str(sensors.getValue(SENSOR_IDX_VOC_LVL))
# Print data
data = [humidity, radon_st_avg, radon_lt_avg, temperature, pressure, CO2_lvl, VOC_lvl]
if (Mode=='terminal'):
print tableprint.row(data, width=12)
elif (Mode=='pipe'):
print json.dumps(data)
waveplus.disconnect()
if count >= 2:
break
count += 1
time.sleep(SamplePeriod)
finally:
waveplus.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment