Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
from bluepy.sensortag import SensorTag
from bluepy.btle import BTLEDisconnectError
import influxdb
import schedule
import datetime
import sys, time
# MAC address of the SensorTag to poll -- change this to your own device.
SENSORTAG_MAC_ADDRESS = '54:6C:0E:79:13:06'

# Open the SensorTag connection as soon as the module is loaded; the
# resulting handle is shared by the functions below.
print(f'Connecting to {SENSORTAG_MAC_ADDRESS}')
stag = SensorTag(SENSORTAG_MAC_ADDRESS)
print('Connected.')

# InfluxDB client for the 'sensortag' database on the local server.
db = influxdb.InfluxDBClient(host='localhost', port=8086, database='sensortag')
def get_data(stag, warmup_seconds=1.0):
    """Take one reading from each SensorTag sensor used by this script.

    The humidity, barometer, light meter and battery sensors are enabled,
    given ``warmup_seconds`` to initialise, read once, and disabled again.
    If enabling or reading fails, the sensors are still disabled on a
    best-effort basis before the original exception is re-raised, so a
    failed reading does not leave them switched on.

    Args:
        stag: Object exposing ``humidity``, ``barometer``, ``lightmeter``
            and ``battery`` attributes, each providing ``enable()``,
            ``read()`` and ``disable()``.
        warmup_seconds: Seconds to wait between enabling the sensors and
            reading them. Defaults to 1.0.

    Returns:
        dict with the keys ``temperature`` and ``humidity`` (items 0 and 1
        of the humidity sensor reading), ``barometer`` (item 1 of the
        barometer reading), ``lightmeter`` and ``battery``.

    Raises:
        Any exception raised by the sensor calls is propagated unchanged.
    """
    sensors = (stag.humidity, stag.barometer, stag.lightmeter, stag.battery)
    try:
        # Turn on sensors
        for sensor in sensors:
            sensor.enable()

        # Waiting for sensor initialization
        time.sleep(warmup_seconds)

        # Read sensor values
        humid_sensor_value = stag.humidity.read()
        data = {
            'temperature': humid_sensor_value[0],
            'humidity': humid_sensor_value[1],
            'barometer': stag.barometer.read()[1],
            'lightmeter': stag.lightmeter.read(),
            'battery': stag.battery.read(),
        }
    except BaseException:
        # Enabling or reading failed: still try to turn every sensor off,
        # but never let a cleanup error replace the original exception,
        # because callers catch specific exception types.
        for sensor in sensors:
            try:
                sensor.disable()
            except Exception:
                pass  # deliberate best-effort cleanup
        raise

    # Turn off sensors
    for sensor in sensors:
        sensor.disable()
    return data
def write_to_influxdb(data):
    """Write one set of sensor readings to InfluxDB.

    A single point is written to the ``sensortag`` measurement through the
    module-level ``db`` client. The point is tagged with the address of the
    module-level ``stag`` (colons removed) and stamped with the current
    UTC time.

    Args:
        data: dict of field name -> value, stored as the point's fields.
    """
    point = {
        'measurement': 'sensortag',
        'tags': {
            'macaddr': stag.addr.replace(':', ''),
        },
        # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated and
        # returns a naive value whose zone has to be guessed by the reader.
        'time': datetime.datetime.now(datetime.timezone.utc),
        'fields': data,
    }
    db.write_points([point])
def on_minute():
    """Scheduled job: read the SensorTag once, print and store the values.

    Does nothing when the module-level ``stag`` is None. If the SensorTag
    disconnects while being read or written out, the process exits with
    status 1.
    """
    if stag is None:
        return
    try:
        readings = get_data(stag)
        print(readings)
        write_to_influxdb(readings)
    except BTLEDisconnectError:
        # When disconnected, exit
        sys.exit(1)
def main():
    """Register the once-a-minute job and run the scheduler forever."""
    every_minute = schedule.every(1).minutes
    every_minute.do(on_minute)

    # Poll the scheduler once per second; this loop never returns.
    while True:
        schedule.run_pending()
        time.sleep(1)
# Start the polling loop only when this file is run as a script.
if '__main__' == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment