Skip to content

Instantly share code, notes, and snippets.

@ergo70
Last active December 6, 2018 14:51
Show Gist options
  • Save ergo70/9ced3c07ec0181c91f6391a5562f56bb to your computer and use it in GitHub Desktop.
Save ergo70/9ced3c07ec0181c91f6391a5562f56bb to your computer and use it in GitHub Desktop.
import time
import struct
import os
import uuid
import json
from boto3 import resource
from boto3.dynamodb.conditions import Key
from bluepy.btle import UUID, Peripheral, Scanner
from decimal import Decimal
from datetime import datetime
from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException
ENABLE = b'\x01'
DISABLE = b'\x00'
TEST = b'\x02'
lux_data_uuid = UUID("f000aa71-0451-4000-b000-000000000000")
lux_enable_uuid = UUID("f000aa72-0451-4000-b000-000000000000")
temp_relH_data_uuid = UUID("f000aa21-0451-4000-b000-000000000000")
temp_relH_enable_uuid = UUID("f000aa22-0451-4000-b000-000000000000")
POST_data_uuid = UUID("f000aa65-0451-4000-b000-000000000000")
POST_mode_uuid = UUID("f000aa66-0451-4000-b000-000000000000")
battery_level_uuid = UUID("2a19")
ACCESS_KEY_ID = os.environ['ACCESS_KEY_ID']
SECRET_ACCESS_KEY = os.environ['SECRET_ACCESS_KEY']
REGION = os.environ['AWS_REGION']
TABLE = os.environ['TABLE']
dynamodb_resource = resource('dynamodb', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
table = dynamodb_resource.Table(TABLE)
host = os.environ['ENDPOINT']
rootCAPath = os.environ['ROOT_CA_FILE']
certificatePath = os.environ['CERT_FILE']
privateKeyPath = os.environ['KEY_FILE']
clientId = os.environ['THING_NAME']
topic = 'foo/bar'
thingName = clientId
sleep_interval = int(os.environ['SLEEP'])
ble_interface = int(os.environ['BLE_INTERFACE'])
MAX_DISCOVERY_RETRIES = 10
GROUP_CA_PATH = "./groupCA/"
backOffCore = ProgressiveBackOffCore()
# Discover GGCs
discoveryInfoProvider = DiscoveryInfoProvider()
discoveryInfoProvider.configureEndpoint(host)
discoveryInfoProvider.configureCredentials(rootCAPath, certificatePath, privateKeyPath)
discoveryInfoProvider.configureTimeout(10) # 10 sec
# General message notification callback
def customOnMessage(message):
print('Received message on topic %s: %s\n' % (message.topic, message.payload))
def connectGG():
retryCount = MAX_DISCOVERY_RETRIES
discovered = False
groupCA = None
coreInfo = None
while retryCount != 0:
try:
discoveryInfo = discoveryInfoProvider.discover(thingName)
caList = discoveryInfo.getAllCas()
coreList = discoveryInfo.getAllCores()
# We only pick the first ca and core info
groupId, ca = caList[0]
coreInfo = coreList[0]
print("Discovered GGC: %s from Group: %s" % (coreInfo.coreThingArn, groupId))
print("Now we persist the connectivity/identity information...")
groupCA = GROUP_CA_PATH + groupId + "_CA_" + str(uuid.uuid4()) + ".crt"
if not os.path.exists(GROUP_CA_PATH):
os.makedirs(GROUP_CA_PATH)
groupCAFile = open(groupCA, "w")
groupCAFile.write(ca)
groupCAFile.close()
discovered = True
print("Now proceed to the connecting flow...")
break
except DiscoveryInvalidRequestException as e:
print("Invalid discovery request detected!")
print("Type: %s" % str(type(e)))
print("Error message: %s" % e.message)
print("Stopping...")
break
except BaseException as e:
print("Error in discovery!")
print("Type: %s" % str(type(e)))
print("Error message: %s" % e.message)
retryCount -= 1
print("\n%d/%d retries left\n" % (retryCount, MAX_DISCOVERY_RETRIES))
print("Backing off...\n")
backOffCore.backOff()
if not discovered:
print("Discovery failed after %d retries. Exiting...\n" % (MAX_DISCOVERY_RETRIES))
sys.exit(-1)
# Iterate through all connection options for the core and use the first successful one
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
myAWSIoTMQTTClient.configureCredentials(groupCA, privateKeyPath, certificatePath)
myAWSIoTMQTTClient.onMessage = customOnMessage
connected = False
for connectivityInfo in coreInfo.connectivityInfoList:
currentHost = connectivityInfo.host
currentPort = connectivityInfo.port
print("Trying to connect to core at %s:%d" % (currentHost, currentPort))
myAWSIoTMQTTClient.configureEndpoint(currentHost, currentPort)
try:
myAWSIoTMQTTClient.connect()
connected = True
break
except BaseException as e:
print("Error in connect!")
print("Type: %s" % str(type(e)))
print("Error message: %s" % e.message)
if not connected:
print("Cannot connect to core %s. Exiting..." % coreInfo.coreThingArn)
sys.exit(-2)
return myAWSIoTMQTTClient
def detect(interface):
sensors = []
s = Scanner(interface)
s.scan()
devices = s.getDevices()
for dev in devices:
for (adtype, desc, value) in dev.getScanData():
if value == 'CC2650 SensorTag':
print (" %s = %s" % (desc, value))
print ("Device %s (%s), RSSI=%d dB" % (dev.addr, dev.addrType, dev.rssi))
sensors.append((dev.addr, dev.addrType))
#print(sensors)
return sensors
def connect(sensors):
connections = [Peripheral(sensor[0], sensor[1]) for sensor in sensors]
POST(connections)
for connection in connections:
ch = connection.getCharacteristics(uuid=lux_enable_uuid)[0]
ch.write(ENABLE, True)
ch = connection.getCharacteristics(uuid=temp_relH_enable_uuid)[0]
ch.write(ENABLE, True)
return connections
def POST(connections):
for connection in connections:
ch = connection.getCharacteristics(uuid=POST_mode_uuid)[0]
ch.write(TEST, True)
ch = connection.getCharacteristics(uuid=POST_data_uuid)[0]
if (ch.supportsRead()):
onebyte = ch.read()
if onebyte in [b'\x7e', b'\x7f']:
print("Sensor "+ connection.addr + " OK")
else:
print("Sensor "+ connection.addr +" fail: " + str(onebyte))
ch = connection.getCharacteristics(uuid=POST_mode_uuid)[0]
ch.write(DISABLE, True)
def disconnect(connections):
for connection in connections:
addr = connection.addr
connection.disconnect()
print("Sensor "+ addr + " disconnected")
def read_battery(connection):
p = connection
try:
ch = p.getCharacteristics(uuid=battery_level_uuid)[0]
#print(ch)
if (ch.supportsRead()):
#while 1:
onebyte = ch.read()
battery_level = struct.unpack('B',onebyte)[0]
print(str(battery_level) + ' %')
#time.sleep(1)
finally:
#p.disconnect()
pass
return battery_level
def read_lux(connection):
p = connection
try:
ch = p.getCharacteristics(uuid=lux_data_uuid)[0]
#print(ch)
if (ch.supportsRead()):
#while 1:
twobytes = ch.read()
rawdata = struct.unpack('H',twobytes)[0]
m = rawdata & 0x0FFF
e = (rawdata & 0xF000) >> 12
if e == 0:
e = 1
else:
e = 2 << (e - 1)
lux = round(m * (0.01 * e),1)
print(str(lux) + ' Lux')
#time.sleep(1)
finally:
#p.disconnect()
pass
return lux
def read_temp_relH(connection):
p = connection
try:
ch = p.getCharacteristics(uuid=temp_relH_data_uuid)[0]
#print(ch)
if (ch.supportsRead()):
#while 1:
fourbytes = ch.read()
#print(fourbytes)
rawdata = struct.unpack('HH',fourbytes)
#print(rawdata)
rawdata_temp = rawdata[0]
rawdata_relH = rawdata[1]
rawdata_relH &= 0b1111111111111100
temp = round((rawdata_temp / 65536.0) * 165.0 - 42.4,1)
print(str(temp) + ' °C')
relH = round((rawdata_relH / 65536.0) * 100.0,1)
print(str(relH) + ' % rH')
finally:
#p.disconnect()
pass
return temp, relH
def save_to_dynamoDB(ts, sensor_addr, lux, celsius, rH, battery):
col_dict = {"measurement": ts + '|' + sensor_addr, "sensor": sensor_addr, "ts": ts, "lux": Decimal(str(lux)), "celsius": Decimal(str(celsius)), "rH": Decimal(str(rH)), "battery": Decimal(str(battery))}
response = table.put_item(Item=col_dict)
print(response)
def save_to_GG(MQTTClient, ts, sensor_addr, lux, celsius, rH, battery):
col_dict = {"measurement" : {"sensor": sensor_addr, "ts": ts, "data": {"lux": lux, "celsius": celsius, "rH": rH, "battery": battery}}}
if MQTTClient:
messageJson = json.dumps(col_dict, sort_keys=True)
MQTTClient.publish(topic, messageJson, 0)
print('Published topic %s: %s\n' % (topic, messageJson))
try:
connections = connect(detect(ble_interface))
MQTTClient = connectGG()
while 1:
for connection in connections:
print("Reading sensor: "+str(connection.addr))
lux = read_lux(connection)
celsius, rH = read_temp_relH(connection)
battery = read_battery(connection)
ts = str(datetime.utcnow())
save_to_dynamoDB(ts, str(connection.addr), lux, celsius, rH, battery)
save_to_GG(MQTTClient, ts, str(connection.addr), lux, celsius, rH, battery)
time.sleep(sleep_interval)
finally:
disconnect(connections)
MQTTClient.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment