Skip to content

Instantly share code, notes, and snippets.

@bgloh
Last active June 9, 2017 03:50
Show Gist options
  • Save bgloh/f6aec0788f3796d22c407de2e12194ab to your computer and use it in GitHub Desktop.
Save bgloh/f6aec0788f3796d22c407de2e12194ab to your computer and use it in GitHub Desktop.
bluepy sensortag.py
from bluepy.btle import UUID, Peripheral, DefaultDelegate, AssignedNumbers
import struct
import math
def _TI_UUID(val):
return UUID("%08X-0451-4000-b000-000000000000" % (0xF0000000+val))
# Sensortag versions
AUTODETECT = "-"
SENSORTAG_V1 = "v1"
SENSORTAG_2650 = "CC2650"
class SensorBase:
# Derived classes should set: svcUUID, ctrlUUID, dataUUID
sensorOn = struct.pack("B", 0x01)
sensorOff = struct.pack("B", 0x00)
def __init__(self, periph):
self.periph = periph
self.service = None
self.ctrl = None
self.data = None
def enable(self):
if self.service is None:
self.service = self.periph.getServiceByUUID(self.svcUUID)
if self.ctrl is None:
self.ctrl = self.service.getCharacteristics(self.ctrlUUID) [0]
if self.data is None:
self.data = self.service.getCharacteristics(self.dataUUID) [0]
if self.sensorOn is not None:
self.ctrl.write(self.sensorOn,withResponse=True)
def read(self):
return self.data.read()
def disable(self):
if self.ctrl is not None:
self.ctrl.write(self.sensorOff)
# Derived class should implement _formatData()
def calcPoly(coeffs, x):
return coeffs[0] + (coeffs[1]*x) + (coeffs[2]*x*x)
class IRTemperatureSensor(SensorBase):
svcUUID = _TI_UUID(0xAA00)
dataUUID = _TI_UUID(0xAA01)
ctrlUUID = _TI_UUID(0xAA02)
zeroC = 273.15 # Kelvin
tRef = 298.15
Apoly = [1.0, 1.75e-3, -1.678e-5]
Bpoly = [-2.94e-5, -5.7e-7, 4.63e-9]
Cpoly = [0.0, 1.0, 13.4]
def __init__(self, periph):
SensorBase.__init__(self, periph)
self.S0 = 6.4e-14
def read(self):
'''Returns (ambient_temp, target_temp) in degC'''
# See http://processors.wiki.ti.com/index.php/SensorTag_User_Guide#IR_Temperature_Sensor
(rawVobj, rawTamb) = struct.unpack('<hh', self.data.read())
tAmb = rawTamb / 128.0
Vobj = 1.5625e-7 * rawVobj
tDie = tAmb + self.zeroC
S = self.S0 * calcPoly(self.Apoly, tDie-self.tRef)
Vos = calcPoly(self.Bpoly, tDie-self.tRef)
fObj = calcPoly(self.Cpoly, Vobj-Vos)
tObj = math.pow( math.pow(tDie,4.0) + (fObj/S), 0.25 )
return (tAmb, tObj - self.zeroC)
# IO service added
class IOservice(SensorBase):
svcUUID = _TI_UUID(0xAA64)
dataUUID = _TI_UUID(0xAA65)
ctrlUUID = _TI_UUID(0xAA66)
sensorOn = None
remoteIoReady = struct.pack('B',0x01)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def enable(self):
SensorBase.enable(self)
# self.ctrl.write(self.remoteIoReady)
# value: 1 = red, 2 = green, 3 = red + green , 4 = buzzer
# 5 = red + buzzer, 6 = green + buzzer, 7 = all
def write(self,value):
ioCommand = struct.pack('B',value)
self.data.write(ioCommand)
self.ctrl.write(self.remoteIoReady)
class IRTemperatureSensorTMP007(SensorBase):
svcUUID = _TI_UUID(0xAA00)
dataUUID = _TI_UUID(0xAA01)
ctrlUUID = _TI_UUID(0xAA02)
SCALE_LSB = 0.03125;
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (ambient_temp, target_temp) in degC'''
# http://processors.wiki.ti.com/index.php/CC2650_SensorTag_User's_Guide?keyMatch=CC2650&tisearch=Search-EN
(rawTobj, rawTamb) = struct.unpack('<hh', self.data.read())
tObj = (rawTobj >> 2) * self.SCALE_LSB;
tAmb = (rawTamb >> 2) * self.SCALE_LSB;
return (tAmb, tObj)
class AccelerometerSensor(SensorBase):
svcUUID = _TI_UUID(0xAA10)
dataUUID = _TI_UUID(0xAA11)
ctrlUUID = _TI_UUID(0xAA12)
def __init__(self, periph):
SensorBase.__init__(self, periph)
if periph.firmwareVersion.startswith("1.4 "):
self.scale = 64.0
else:
self.scale = 16.0
def read(self):
'''Returns (x_accel, y_accel, z_accel) in units of g'''
x_y_z = struct.unpack('bbb', self.data.read())
return tuple([ (val/self.scale) for val in x_y_z ])
class MovementSensorMPU9250(SensorBase):
svcUUID = _TI_UUID(0xAA80)
dataUUID = _TI_UUID(0xAA81)
ctrlUUID = _TI_UUID(0xAA82)
sensorOn = None
GYRO_XYZ = 7
ACCEL_XYZ = 7 << 3
MAG_XYZ = 1 << 6
ACCEL_RANGE_2G = 0 << 8
ACCEL_RANGE_4G = 1 << 8
ACCEL_RANGE_8G = 2 << 8
ACCEL_RANGE_16G = 3 << 8
def __init__(self, periph):
SensorBase.__init__(self, periph)
self.ctrlBits = 0
def enable(self, bits):
SensorBase.enable(self)
self.ctrlBits |= bits
self.ctrl.write( struct.pack("<H", self.ctrlBits) )
def disable(self, bits):
self.ctrlBits &= ~bits
self.ctrl.write( struct.pack("<H", self.ctrlBits) )
def rawRead(self):
dval = self.data.read()
return struct.unpack("<hhhhhhhhh", dval)
class AccelerometerSensorMPU9250:
def __init__(self, sensor_):
self.sensor = sensor_
self.bits = self.sensor.ACCEL_XYZ | self.sensor.ACCEL_RANGE_4G
self.scale = 8.0/32768.0 # TODO: why not 4.0, as documented?
def enable(self):
self.sensor.enable(self.bits)
def disable(self):
self.sensor.disable(self.bits)
def read(self):
'''Returns (x_accel, y_accel, z_accel) in units of g'''
rawVals = self.sensor.rawRead()[3:6]
return tuple([ v*self.scale for v in rawVals ])
class HumiditySensor(SensorBase):
svcUUID = _TI_UUID(0xAA20)
dataUUID = _TI_UUID(0xAA21)
ctrlUUID = _TI_UUID(0xAA22)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (ambient_temp, rel_humidity)'''
(rawT, rawH) = struct.unpack('<HH', self.data.read())
temp = -46.85 + 175.72 * (rawT / 65536.0)
RH = -6.0 + 125.0 * ((rawH & 0xFFFC)/65536.0)
return (temp, RH)
class HumiditySensorHDC1000(SensorBase):
svcUUID = _TI_UUID(0xAA20)
dataUUID = _TI_UUID(0xAA21)
ctrlUUID = _TI_UUID(0xAA22)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (ambient_temp, rel_humidity)'''
(rawT, rawH) = struct.unpack('<HH', self.data.read())
temp = -40.0 + 165.0 * (rawT / 65536.0)
RH = 100.0 * (rawH/65536.0)
return (temp, RH)
class MagnetometerSensor(SensorBase):
svcUUID = _TI_UUID(0xAA30)
dataUUID = _TI_UUID(0xAA31)
ctrlUUID = _TI_UUID(0xAA32)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (x, y, z) in uT units'''
x_y_z = struct.unpack('<hhh', self.data.read())
return tuple([ 1000.0 * (v/32768.0) for v in x_y_z ])
# Revisit - some absolute calibration is needed
class MagnetometerSensorMPU9250:
def __init__(self, sensor_):
self.sensor = sensor_
self.scale = 4912.0 / 32760
# Reference: MPU-9250 register map v1.4
def enable(self):
self.sensor.enable(self.sensor.MAG_XYZ)
def disable(self):
self.sensor.disable(self.sensor.MAG_XYZ)
def read(self):
'''Returns (x_mag, y_mag, z_mag) in units of uT'''
rawVals = self.sensor.rawRead()[6:9]
return tuple([ v*self.scale for v in rawVals ])
class BarometerSensor(SensorBase):
svcUUID = _TI_UUID(0xAA40)
dataUUID = _TI_UUID(0xAA41)
ctrlUUID = _TI_UUID(0xAA42)
calUUID = _TI_UUID(0xAA43)
sensorOn = None
def __init__(self, periph):
SensorBase.__init__(self, periph)
def enable(self):
SensorBase.enable(self)
self.calChr = self.service.getCharacteristics(self.calUUID) [0]
# Read calibration data
self.ctrl.write( struct.pack("B", 0x02), True )
(c1,c2,c3,c4,c5,c6,c7,c8) = struct.unpack("<HHHHhhhh", self.calChr.read())
self.c1_s = c1/float(1 << 24)
self.c2_s = c2/float(1 << 10)
self.sensPoly = [ c3/1.0, c4/float(1 << 17), c5/float(1<<34) ]
self.offsPoly = [ c6*float(1<<14), c7/8.0, c8/float(1<<19) ]
self.ctrl.write( struct.pack("B", 0x01), True )
def read(self):
'''Returns (ambient_temp, pressure_millibars)'''
(rawT, rawP) = struct.unpack('<hH', self.data.read())
temp = (self.c1_s * rawT) + self.c2_s
sens = calcPoly( self.sensPoly, float(rawT) )
offs = calcPoly( self.offsPoly, float(rawT) )
pres = (sens * rawP + offs) / (100.0 * float(1<<14))
return (temp,pres)
class BarometerSensorBMP280(SensorBase):
svcUUID = _TI_UUID(0xAA40)
dataUUID = _TI_UUID(0xAA41)
ctrlUUID = _TI_UUID(0xAA42)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
(tL,tM,tH,pL,pM,pH) = struct.unpack('<BBBBBB', self.data.read())
temp = (tH*65536 + tM*256 + tL) / 100.0
press = (pH*65536 + pM*256 + pL) / 100.0
return (temp, press)
class GyroscopeSensor(SensorBase):
svcUUID = _TI_UUID(0xAA50)
dataUUID = _TI_UUID(0xAA51)
ctrlUUID = _TI_UUID(0xAA52)
sensorOn = struct.pack("B",0x07)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (x,y,z) rate in deg/sec'''
x_y_z = struct.unpack('<hhh', self.data.read())
return tuple([ 250.0 * (v/32768.0) for v in x_y_z ])
class GyroscopeSensorMPU9250:
def __init__(self, sensor_):
self.sensor = sensor_
self.scale = 500.0/65536.0
def enable(self):
self.sensor.enable(self.sensor.GYRO_XYZ)
def disable(self):
self.sensor.disable(self.sensor.GYRO_XYZ)
def read(self):
'''Returns (x_gyro, y_gyro, z_gyro) in units of degrees/sec'''
rawVals = self.sensor.rawRead()[0:3]
return tuple([ v*self.scale for v in rawVals ])
class KeypressSensor(SensorBase):
svcUUID = UUID(0xFFE0)
dataUUID = UUID(0xFFE1)
ctrlUUID = None
sensorOn = None
def __init__(self, periph):
SensorBase.__init__(self, periph)
def enable(self):
SensorBase.enable(self)
self.char_descr = self.service.getDescriptors(forUUID=0x2902)[0]
self.char_descr.write(struct.pack('<bb', 0x01, 0x00), True)
def disable(self):
self.char_descr.write(struct.pack('<bb', 0x00, 0x00), True)
class OpticalSensorOPT3001(SensorBase):
svcUUID = _TI_UUID(0xAA70)
dataUUID = _TI_UUID(0xAA71)
ctrlUUID = _TI_UUID(0xAA72)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns value in lux'''
raw = struct.unpack('<h', self.data.read()) [0]
m = raw & 0xFFF;
e = (raw & 0xF000) >> 12;
return 0.01 * (m << e)
class BatterySensor(SensorBase):
svcUUID = UUID("0000180f-0000-1000-8000-00805f9b34fb")
dataUUID = UUID("00002a19-0000-1000-8000-00805f9b34fb")
ctrlUUID = None
sensorOn = None
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns the battery level in percent'''
val = ord(self.data.read())
return val
class SensorTag(Peripheral):
def __init__(self,addr,version=AUTODETECT):
Peripheral.__init__(self,addr)
if version==AUTODETECT:
svcs = self.discoverServices()
if _TI_UUID(0xAA70) in svcs:
version = SENSORTAG_2650
else:
version = SENSORTAG_V1
fwVers = self.getCharacteristics(uuid=AssignedNumbers.firmwareRevisionString)
if len(fwVers) >= 1:
self.firmwareVersion = fwVers[0].read().decode("utf-8")
else:
self.firmwareVersion = u''
if version==SENSORTAG_V1:
self.IRtemperature = IRTemperatureSensor(self)
self.accelerometer = AccelerometerSensor(self)
self.humidity = HumiditySensor(self)
self.magnetometer = MagnetometerSensor(self)
self.barometer = BarometerSensor(self)
self.gyroscope = GyroscopeSensor(self)
self.keypress = KeypressSensor(self)
self.lightmeter = None
elif version==SENSORTAG_2650:
self._mpu9250 = MovementSensorMPU9250(self)
self.IRtemperature = IRTemperatureSensorTMP007(self)
self.accelerometer = AccelerometerSensorMPU9250(self._mpu9250)
self.humidity = HumiditySensorHDC1000(self)
self.magnetometer = MagnetometerSensorMPU9250(self._mpu9250)
self.barometer = BarometerSensorBMP280(self)
self.gyroscope = GyroscopeSensorMPU9250(self._mpu9250)
self.keypress = KeypressSensor(self)
self.lightmeter = OpticalSensorOPT3001(self)
self.battery = BatterySensor(self)
self.ioService = IOservice(self)
class KeypressDelegate(DefaultDelegate):
BUTTON_L = 0x02
BUTTON_R = 0x01
ALL_BUTTONS = (BUTTON_L | BUTTON_R)
_button_desc = {
BUTTON_L : "Left button",
BUTTON_R : "Right button",
ALL_BUTTONS : "Both buttons"
}
def __init__(self):
DefaultDelegate.__init__(self)
self.lastVal = 0
def handleNotification(self, hnd, data):
# NB: only one source of notifications at present
# so we can ignore 'hnd'.
val = struct.unpack("B", data)[0]
down = (val & ~self.lastVal) & self.ALL_BUTTONS
if down != 0:
self.onButtonDown(down)
up = (~val & self.lastVal) & self.ALL_BUTTONS
if up != 0:
self.onButtonUp(up)
self.lastVal = val
def onButtonUp(self, but):
print ( "** " + self._button_desc[but] + " UP")
def onButtonDown(self, but):
print ( "** " + self._button_desc[but] + " DOWN")
def main():
import time
import sys
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('host', action='store',help='MAC of BT device')
parser.add_argument('-n', action='store', dest='count', default=0,
type=int, help="Number of times to loop data")
parser.add_argument('-t',action='store',type=float, default=5.0, help='time between polling')
parser.add_argument('-T','--temperature', action="store_true",default=False)
parser.add_argument('-A','--accelerometer', action='store_true',
default=False)
parser.add_argument('-H','--humidity', action='store_true', default=False)
parser.add_argument('-M','--magnetometer', action='store_true',
default=False)
parser.add_argument('-B','--barometer', action='store_true', default=False)
parser.add_argument('-G','--gyroscope', action='store_true', default=False)
parser.add_argument('-K','--keypress', action='store_true', default=False)
parser.add_argument('-L','--light', action='store_true', default=False)
parser.add_argument('-P','--battery', action='store_true', default=False)
parser.add_argument('--all', action='store_true', default=False)
arg = parser.parse_args(sys.argv[1:])
print('Connecting to ' + arg.host)
tag = SensorTag(arg.host)
# Enabling selected sensors
if arg.temperature or arg.all:
tag.IRtemperature.enable()
if arg.humidity or arg.all:
tag.humidity.enable()
if arg.barometer or arg.all:
tag.barometer.enable()
if arg.accelerometer or arg.all:
tag.accelerometer.enable()
if arg.magnetometer or arg.all:
tag.magnetometer.enable()
if arg.gyroscope or arg.all:
tag.gyroscope.enable()
if arg.battery or arg.all:
tag.battery.enable()
if arg.keypress or arg.all:
tag.keypress.enable()
tag.setDelegate(KeypressDelegate())
if arg.light and tag.lightmeter is None:
print("Warning: no lightmeter on this device")
if (arg.light or arg.all) and tag.lightmeter is not None:
tag.lightmeter.enable()
# Some sensors (e.g., temperature, accelerometer) need some time for initialization.
# Not waiting here after enabling a sensor, the first read value might be empty or incorrect.
time.sleep(1.0)
counter=1
while True:
if arg.temperature or arg.all:
print('Temp: ', tag.IRtemperature.read())
if arg.humidity or arg.all:
print("Humidity: ", tag.humidity.read())
if arg.barometer or arg.all:
print("Barometer: ", tag.barometer.read())
if arg.accelerometer or arg.all:
print("Accelerometer: ", tag.accelerometer.read())
if arg.magnetometer or arg.all:
print("Magnetometer: ", tag.magnetometer.read())
if arg.gyroscope or arg.all:
print("Gyroscope: ", tag.gyroscope.read())
if (arg.light or arg.all) and tag.lightmeter is not None:
print("Light: ", tag.lightmeter.read())
if arg.battery or arg.all:
print("Battery: ", tag.battery.read())
if counter >= arg.count and arg.count != 0:
break
counter += 1
tag.waitForNotifications(arg.t)
tag.disconnect()
del tag
if __name__ == "__main__":
main()
@bgloh
Copy link
Author

bgloh commented Jun 9, 2017

ioService is added.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment