Skip to content

Instantly share code, notes, and snippets.

@Amitjipathak
Last active June 15, 2023 13:33
Show Gist options
  • Save Amitjipathak/ee59c31f6e43c2455aaf4a29c241b9bb to your computer and use it in GitHub Desktop.
Save Amitjipathak/ee59c31f6e43c2455aaf4a29c241b9bb to your computer and use it in GitHub Desktop.
uploading cc2650stk sensor data to azure iot hub/stream analytics/powerBi using raspberrypi
from base64 import b64encode, b64decode
from hashlib import sha256
from urllib import quote_plus, urlencode
from hmac import HMAC
import requests
import time
import json
import struct
import Sensortag2650 as sensortag
# don't even bother starting until you see your SensorTag
# $ hcitool -i hci1 lescan
# LE Scan ...
# B0:B4:48:BD:B8:05 CC2650 SensorTag
my_sensor = '54:6C:0E:52:F8:FC'
tag = sensortag.SensorTag(my_sensor)
print ("Connected to SensorTag", my_sensor)
sensorOn = struct.pack("B", 0x01)
sensorbarcal = struct.pack("B", 0x02)
sensorMagOn = struct.pack("H", 0x0007)
sensorGyrOn = struct.pack("H", 0x0007)
sensorAccOn = struct.pack("H", 0x0038)
tag.IRtemperature.enable()
tag.humidity.enable()
tag.barometer.enable()
tag.accelerometer.enable()
tag.magnetometer.enable()
tag.gyroscope.enable()
tag.lightmeter.enable()
tag.battery.enable()
# Azure IoT Hub
URI = 'sensor.azure-devices.net' # your iothub
KEY = 'yourprimarykeystring' #fill your primary key string from shared access polices or go to iot devices and copy paste primary key string
IOT_DEVICE_ID = 'RaspberryPi'
POLICY = 'iothubowner'
def generate_sas_token():
expiry=3600
ttl = time.time() + expiry
sign_key = "%s\n%d" % ((quote_plus(URI)), int(ttl))
signature = b64encode(HMAC(b64decode(KEY), sign_key, sha256).digest())
rawtoken = {
'sr' : URI,
'sig': signature,
'se' : str(int(ttl))
}
rawtoken['skn'] = POLICY
return 'SharedAccessSignature ' + urlencode(rawtoken)
def send_message(token, message):
url = 'https://{0}/devices/{1}/messages/events?api-version=2016-11-14'.format(URI, IOT_DEVICE_ID)
headers = {
"Content-Type": "application/json",
"Authorization": token
}
data = json.dumps(message)
#print (data)
response = requests.post(url, data=data, headers=headers)
if __name__ == '__main__':
# 2. Generate SAS Token
token = generate_sas_token()
while True:
msgs = []
ambient_temp, target_temp= tag.IRtemperature.read()
x_accel, y_accel, z_accel = tag.accelerometer.read()
ambient_temp, rel_humidity = tag.humidity.read()
lux = tag.lightmeter.read()
ambient_temp, pressure_millibars = tag.barometer.read()
x_mag, y_mag, z_mag = tag.magnetometer.read()
x_gyro, y_gyro, z_gyro= tag.gyroscope.read()
battery =tag.battery.read()
data = {
"ambient_temp" : ambient_temp,
"target_temp" : target_temp,
"humidity" : rel_humidity,
"x_accel" : x_accel,
"y_accel" : y_accel,
"z_accel" : z_accel,
"lux" : lux,
"x_mag" : x_mag,
"y_mag" : y_mag,
"z_mag" : z_mag,
"battery" : battery,
"x_gyro" : x_gyro,
"y_gyro" : y_gyro,
"z_gyro" : z_gyro,
"millibars" : pressure_millibars,
# "tst" : int(time.time()),
}
payload = json.dumps(data,indent=2)
print (payload)
message = { "data": str(payload) }
send_message(token, message)
time.sleep(1)
tag.disconnect()
search and connect your bluetooth sensor follow this link https://elinux.org/RPi_Bluetooth_LE
(type in pi command shell/pi terminal) sudo hcitool lescan
gatttool -I -b 54:6C:0E:52:F8:FC
connect (after this step your sensortag is connected to raspberry pi)
now download bluepy libaray from this link https://github.com/IanHarvey/bluepy
after connecting your sensor tag to raspberry pi make sure to change the directory in pi terminal type 1.cd bluepy/bluepy 2.make
if you want to see sensor tag data in pi terminal then type python Sensortag2650.py -T -A -H -M -B -G -K -L -P 54:6C:0E:52:F8:FC
after performing above step you will get sensor data in pi terminal
now you can use either mqtt or rest api to send your data to azure iot hub i have used rest api to send it follow this https://gist.github.com/jpmens/4bbe165b19a2b0d457c6
put your code both Sensortag.py and SendIoThub.py in bluepy folder (otherwise you will get error like can not import bluepy.btle)
for rest api reference you can see https://www.taygan.co/blog/2018/03/12/streaming-sensor-data-in-real-time-with-azure-iot-hub
follow https://www.taygan.co/blog/2018/03/12/streaming-sensor-data-in-real-time-with-azure-iot-hub this link to create iothub in azure
now type python SendIoThub.py to your raspberrypi terminal
now your sensordata is sending to iot hub you can see your data in azure cloud/powershell and stream analytics job at both places
from bluepy.btle import UUID, Peripheral, DefaultDelegate, AssignedNumbers
import struct
import math
def _TI_UUID(val):
return UUID("%08X-0451-4000-b000-000000000000" % (0xF0000000+val))
# Sensortag versions
AUTODETECT = "-"
SENSORTAG_V1 = "v1"
SENSORTAG_2650 = "CC2650"
class SensorBase:
# Derived classes should set: svcUUID, ctrlUUID, dataUUID
sensorOn = struct.pack("B", 0x01)
sensorOff = struct.pack("B", 0x00)
def __init__(self, periph):
self.periph = periph
self.service = None
self.ctrl = None
self.data = None
def enable(self):
if self.service is None:
self.service = self.periph.getServiceByUUID(self.svcUUID)
if self.ctrl is None:
self.ctrl = self.service.getCharacteristics(self.ctrlUUID) [0]
if self.data is None:
self.data = self.service.getCharacteristics(self.dataUUID) [0]
if self.sensorOn is not None:
self.ctrl.write(self.sensorOn,withResponse=True)
def read(self):
return self.data.read()
def disable(self):
if self.ctrl is not None:
self.ctrl.write(self.sensorOff)
# Derived class should implement _formatData()
def calcPoly(coeffs, x):
return coeffs[0] + (coeffs[1]*x) + (coeffs[2]*x*x)
class IRTemperatureSensor(SensorBase):
svcUUID = _TI_UUID(0xAA00)
dataUUID = _TI_UUID(0xAA01)
ctrlUUID = _TI_UUID(0xAA02)
zeroC = 273.15 # Kelvin
tRef = 298.15
Apoly = [1.0, 1.75e-3, -1.678e-5]
Bpoly = [-2.94e-5, -5.7e-7, 4.63e-9]
Cpoly = [0.0, 1.0, 13.4]
def __init__(self, periph):
SensorBase.__init__(self, periph)
self.S0 = 6.4e-14
def read(self):
'''Returns (ambient_temp, target_temp) in degC'''
# See http://processors.wiki.ti.com/index.php/SensorTag_User_Guide#IR_Temperature_Sensor
(rawVobj, rawTamb) = struct.unpack('<hh', self.data.read())
tAmb = rawTamb / 128.0
Vobj = 1.5625e-7 * rawVobj
tDie = tAmb + self.zeroC
S = self.S0 * calcPoly(self.Apoly, tDie-self.tRef)
Vos = calcPoly(self.Bpoly, tDie-self.tRef)
fObj = calcPoly(self.Cpoly, Vobj-Vos)
tObj = math.pow( math.pow(tDie,4.0) + (fObj/S), 0.25 )
return (tAmb, tObj - self.zeroC)
class IRTemperatureSensorTMP007(SensorBase):
svcUUID = _TI_UUID(0xAA00)
dataUUID = _TI_UUID(0xAA01)
ctrlUUID = _TI_UUID(0xAA02)
SCALE_LSB = 0.03125;
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (ambient_temp, target_temp) in degC'''
# http://processors.wiki.ti.com/index.php/CC2650_SensorTag_User's_Guide?keyMatch=CC2650&tisearch=Search-EN
(rawTobj, rawTamb) = struct.unpack('<hh', self.data.read())
tObj = (rawTobj >> 2) * self.SCALE_LSB;
tAmb = (rawTamb >> 2) * self.SCALE_LSB;
return (tAmb, tObj)
class AccelerometerSensor(SensorBase):
svcUUID = _TI_UUID(0xAA10)
dataUUID = _TI_UUID(0xAA11)
ctrlUUID = _TI_UUID(0xAA12)
def __init__(self, periph):
SensorBase.__init__(self, periph)
if periph.firmwareVersion.startswith("1.4 "):
self.scale = 64.0
else:
self.scale = 16.0
def read(self):
'''Returns (x_accel, y_accel, z_accel) in units of g'''
x_y_z = struct.unpack('bbb', self.data.read())
return tuple([ (val/self.scale) for val in x_y_z ])
class MovementSensorMPU9250(SensorBase):
svcUUID = _TI_UUID(0xAA80)
dataUUID = _TI_UUID(0xAA81)
ctrlUUID = _TI_UUID(0xAA82)
sensorOn = None
GYRO_XYZ = 7
ACCEL_XYZ = 7 << 3
MAG_XYZ = 1 << 6
ACCEL_RANGE_2G = 0 << 8
ACCEL_RANGE_4G = 1 << 8
ACCEL_RANGE_8G = 2 << 8
ACCEL_RANGE_16G = 3 << 8
def __init__(self, periph):
SensorBase.__init__(self, periph)
self.ctrlBits = 0
def enable(self, bits):
SensorBase.enable(self)
self.ctrlBits |= bits
self.ctrl.write( struct.pack("<H", self.ctrlBits) )
def disable(self, bits):
self.ctrlBits &= ~bits
self.ctrl.write( struct.pack("<H", self.ctrlBits) )
def rawRead(self):
dval = self.data.read()
return struct.unpack("<hhhhhhhhh", dval)
class AccelerometerSensorMPU9250:
def __init__(self, sensor_):
self.sensor = sensor_
self.bits = self.sensor.ACCEL_XYZ | self.sensor.ACCEL_RANGE_4G
self.scale = 8.0/32768.0 # TODO: why not 4.0, as documented?
def enable(self):
self.sensor.enable(self.bits)
def disable(self):
self.sensor.disable(self.bits)
def read(self):
'''Returns (x_accel, y_accel, z_accel) in units of g'''
rawVals = self.sensor.rawRead()[3:6]
return tuple([ v*self.scale for v in rawVals ])
class HumiditySensor(SensorBase):
svcUUID = _TI_UUID(0xAA20)
dataUUID = _TI_UUID(0xAA21)
ctrlUUID = _TI_UUID(0xAA22)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (ambient_temp, rel_humidity)'''
(rawT, rawH) = struct.unpack('<HH', self.data.read())
temp = -46.85 + 175.72 * (rawT / 65536.0)
RH = -6.0 + 125.0 * ((rawH & 0xFFFC)/65536.0)
return (temp, RH)
class HumiditySensorHDC1000(SensorBase):
svcUUID = _TI_UUID(0xAA20)
dataUUID = _TI_UUID(0xAA21)
ctrlUUID = _TI_UUID(0xAA22)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (ambient_temp, rel_humidity)'''
(rawT, rawH) = struct.unpack('<HH', self.data.read())
temp = -40.0 + 165.0 * (rawT / 65536.0)
RH = 100.0 * (rawH/65536.0)
return (temp, RH)
class MagnetometerSensor(SensorBase):
svcUUID = _TI_UUID(0xAA30)
dataUUID = _TI_UUID(0xAA31)
ctrlUUID = _TI_UUID(0xAA32)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (x, y, z) in uT units'''
x_y_z = struct.unpack('<hhh', self.data.read())
return tuple([ 1000.0 * (v/32768.0) for v in x_y_z ])
# Revisit - some absolute calibration is needed
class MagnetometerSensorMPU9250:
def __init__(self, sensor_):
self.sensor = sensor_
self.scale = 4912.0 / 32760
# Reference: MPU-9250 register map v1.4
def enable(self):
self.sensor.enable(self.sensor.MAG_XYZ)
def disable(self):
self.sensor.disable(self.sensor.MAG_XYZ)
def read(self):
'''Returns (x_mag, y_mag, z_mag) in units of uT'''
rawVals = self.sensor.rawRead()[6:9]
return tuple([ v*self.scale for v in rawVals ])
class BarometerSensor(SensorBase):
svcUUID = _TI_UUID(0xAA40)
dataUUID = _TI_UUID(0xAA41)
ctrlUUID = _TI_UUID(0xAA42)
calUUID = _TI_UUID(0xAA43)
sensorOn = None
def __init__(self, periph):
SensorBase.__init__(self, periph)
def enable(self):
SensorBase.enable(self)
self.calChr = self.service.getCharacteristics(self.calUUID) [0]
# Read calibration data
self.ctrl.write( struct.pack("B", 0x02), True )
(c1,c2,c3,c4,c5,c6,c7,c8) = struct.unpack("<HHHHhhhh", self.calChr.read())
self.c1_s = c1/float(1 << 24)
self.c2_s = c2/float(1 << 10)
self.sensPoly = [ c3/1.0, c4/float(1 << 17), c5/float(1<<34) ]
self.offsPoly = [ c6*float(1<<14), c7/8.0, c8/float(1<<19) ]
self.ctrl.write( struct.pack("B", 0x01), True )
def read(self):
'''Returns (ambient_temp, pressure_millibars)'''
(rawT, rawP) = struct.unpack('<hH', self.data.read())
temp = (self.c1_s * rawT) + self.c2_s
sens = calcPoly( self.sensPoly, float(rawT) )
offs = calcPoly( self.offsPoly, float(rawT) )
pres = (sens * rawP + offs) / (100.0 * float(1<<14))
return (temp,pres)
class BarometerSensorBMP280(SensorBase):
svcUUID = _TI_UUID(0xAA40)
dataUUID = _TI_UUID(0xAA41)
ctrlUUID = _TI_UUID(0xAA42)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
(tL,tM,tH,pL,pM,pH) = struct.unpack('<BBBBBB', self.data.read())
temp = (tH*65536 + tM*256 + tL) / 100.0
press = (pH*65536 + pM*256 + pL) / 100.0
return (temp, press)
class GyroscopeSensor(SensorBase):
svcUUID = _TI_UUID(0xAA50)
dataUUID = _TI_UUID(0xAA51)
ctrlUUID = _TI_UUID(0xAA52)
sensorOn = struct.pack("B",0x07)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns (x,y,z) rate in deg/sec'''
x_y_z = struct.unpack('<hhh', self.data.read())
return tuple([ 250.0 * (v/32768.0) for v in x_y_z ])
class GyroscopeSensorMPU9250:
def __init__(self, sensor_):
self.sensor = sensor_
self.scale = 500.0/65536.0
def enable(self):
self.sensor.enable(self.sensor.GYRO_XYZ)
def disable(self):
self.sensor.disable(self.sensor.GYRO_XYZ)
def read(self):
'''Returns (x_gyro, y_gyro, z_gyro) in units of degrees/sec'''
rawVals = self.sensor.rawRead()[0:3]
return tuple([ v*self.scale for v in rawVals ])
class KeypressSensor(SensorBase):
svcUUID = UUID(0xFFE0)
dataUUID = UUID(0xFFE1)
ctrlUUID = None
sensorOn = None
def __init__(self, periph):
SensorBase.__init__(self, periph)
def enable(self):
SensorBase.enable(self)
self.char_descr = self.service.getDescriptors(forUUID=0x2902)[0]
self.char_descr.write(struct.pack('<bb', 0x01, 0x00), True)
def disable(self):
self.char_descr.write(struct.pack('<bb', 0x00, 0x00), True)
class OpticalSensorOPT3001(SensorBase):
svcUUID = _TI_UUID(0xAA70)
dataUUID = _TI_UUID(0xAA71)
ctrlUUID = _TI_UUID(0xAA72)
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns value in lux'''
raw = struct.unpack('<h', self.data.read()) [0]
m = raw & 0xFFF;
e = (raw & 0xF000) >> 12;
return 0.01 * (m << e)
class BatterySensor(SensorBase):
svcUUID = UUID("0000180f-0000-1000-8000-00805f9b34fb")
dataUUID = UUID("00002a19-0000-1000-8000-00805f9b34fb")
ctrlUUID = None
sensorOn = None
def __init__(self, periph):
SensorBase.__init__(self, periph)
def read(self):
'''Returns the battery level in percent'''
val = ord(self.data.read())
return val
class SensorTag(Peripheral):
def __init__(self,addr,version=AUTODETECT):
Peripheral.__init__(self,addr)
if version==AUTODETECT:
svcs = self.discoverServices()
if _TI_UUID(0xAA70) in svcs:
version = SENSORTAG_2650
else:
version = SENSORTAG_V1
fwVers = self.getCharacteristics(uuid=AssignedNumbers.firmwareRevisionString)
if len(fwVers) >= 1:
self.firmwareVersion = fwVers[0].read().decode("utf-8")
else:
self.firmwareVersion = u''
if version==SENSORTAG_V1:
self.IRtemperature = IRTemperatureSensor(self)
self.accelerometer = AccelerometerSensor(self)
self.humidity = HumiditySensor(self)
self.magnetometer = MagnetometerSensor(self)
self.barometer = BarometerSensor(self)
self.gyroscope = GyroscopeSensor(self)
self.keypress = KeypressSensor(self)
self.lightmeter = None
elif version==SENSORTAG_2650:
self._mpu9250 = MovementSensorMPU9250(self)
self.IRtemperature = IRTemperatureSensorTMP007(self)
self.accelerometer = AccelerometerSensorMPU9250(self._mpu9250)
self.humidity = HumiditySensorHDC1000(self)
self.magnetometer = MagnetometerSensorMPU9250(self._mpu9250)
self.barometer = BarometerSensorBMP280(self)
self.gyroscope = GyroscopeSensorMPU9250(self._mpu9250)
self.keypress = KeypressSensor(self)
self.lightmeter = OpticalSensorOPT3001(self)
self.battery = BatterySensor(self)
class KeypressDelegate(DefaultDelegate):
BUTTON_L = 0x02
BUTTON_R = 0x01
ALL_BUTTONS = (BUTTON_L | BUTTON_R)
_button_desc = {
BUTTON_L : "Left button",
BUTTON_R : "Right button",
ALL_BUTTONS : "Both buttons"
}
def __init__(self):
DefaultDelegate.__init__(self)
self.lastVal = 0
def handleNotification(self, hnd, data):
# NB: only one source of notifications at present
# so we can ignore 'hnd'.
val = struct.unpack("B", data)[0]
down = (val & ~self.lastVal) & self.ALL_BUTTONS
if down != 0:
self.onButtonDown(down)
up = (~val & self.lastVal) & self.ALL_BUTTONS
if up != 0:
self.onButtonUp(up)
self.lastVal = val
def onButtonUp(self, but):
print ( "** " + self._button_desc[but] + " UP")
def onButtonDown(self, but):
print ( "** " + self._button_desc[but] + " DOWN")
def main():
import time
import sys
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('host', action='store',help='MAC of BT device')
parser.add_argument('-n', action='store', dest='count', default=0,
type=int, help="Number of times to loop data")
parser.add_argument('-t',action='store',type=float, default=5.0, help='time between polling')
parser.add_argument('-T','--temperature', action="store_true",default=False)
parser.add_argument('-A','--accelerometer', action='store_true',
default=False)
parser.add_argument('-H','--humidity', action='store_true', default=False)
parser.add_argument('-M','--magnetometer', action='store_true',
default=False)
parser.add_argument('-B','--barometer', action='store_true', default=False)
parser.add_argument('-G','--gyroscope', action='store_true', default=False)
parser.add_argument('-K','--keypress', action='store_true', default=False)
parser.add_argument('-L','--light', action='store_true', default=False)
parser.add_argument('-P','--battery', action='store_true', default=False)
parser.add_argument('--all', action='store_true', default=False)
arg = parser.parse_args(sys.argv[1:])
print('Connecting to ' + arg.host)
tag = SensorTag(arg.host)
# Enabling selected sensors
if arg.temperature or arg.all:
tag.IRtemperature.enable()
if arg.humidity or arg.all:
tag.humidity.enable()
if arg.barometer or arg.all:
tag.barometer.enable()
if arg.accelerometer or arg.all:
tag.accelerometer.enable()
if arg.magnetometer or arg.all:
tag.magnetometer.enable()
if arg.gyroscope or arg.all:
tag.gyroscope.enable()
if arg.battery or arg.all:
tag.battery.enable()
if arg.keypress or arg.all:
tag.keypress.enable()
tag.setDelegate(KeypressDelegate())
if arg.light and tag.lightmeter is None:
print("Warning: no lightmeter on this device")
if (arg.light or arg.all) and tag.lightmeter is not None:
tag.lightmeter.enable()
# Some sensors (e.g., temperature, accelerometer) need some time for initialization.
# Not waiting here after enabling a sensor, the first read value might be empty or incorrect.
time.sleep(1.0)
counter=1
while True:
if arg.temperature or arg.all:
print('Temp: ', tag.IRtemperature.read())
if arg.humidity or arg.all:
print("Humidity: ", tag.humidity.read())
if arg.barometer or arg.all:
print("Barometer: ", tag.barometer.read())
if arg.accelerometer or arg.all:
print("Accelerometer: ", tag.accelerometer.read())
if arg.magnetometer or arg.all:
print("Magnetometer: ", tag.magnetometer.read())
if arg.gyroscope or arg.all:
print("Gyroscope: ", tag.gyroscope.read())
if (arg.light or arg.all) and tag.lightmeter is not None:
print("Light: ", tag.lightmeter.read())
if arg.battery or arg.all:
print("Battery: ", tag.battery.read())
if counter >= arg.count and arg.count != 0:
break
counter += 1
tag.waitForNotifications(arg.t)
tag.disconnect()
del tag
if __name__ == "__main__":
main()
@Transistor15
Copy link

hi, am working on something related to this project, would you mind we discus this ? hopping to here from you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment