Last active
December 17, 2020 12:28
-
-
Save kimurakhs/b19d2c0a6eed9b7501cc44d9dbe18598 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bluepy.sensortag import SensorTag | |
from bluepy.btle import BTLEDisconnectError | |
import influxdb | |
import schedule | |
import datetime | |
import sys, time | |
SENSORTAG_MAC_ADDRESS = '54:6C:0E:79:13:06' # Change to your SensorTag MAC Address | |
# Connect to SensorTag | |
print('Connecting to ' + SENSORTAG_MAC_ADDRESS) | |
stag = SensorTag(SENSORTAG_MAC_ADDRESS) | |
print('Connected.') | |
db = influxdb.InfluxDBClient( | |
host='localhost', | |
port=8086, | |
database='sensortag' | |
) | |
def get_data(stag): | |
# Turn on sensors | |
stag.humidity.enable() | |
stag.barometer.enable() | |
stag.lightmeter.enable() | |
stag.battery.enable() | |
# Waiting for sensor initialization | |
time.sleep(1.0) | |
# Read sensor value | |
data = {} | |
humid_sensor_value = stag.humidity.read() | |
data['temperature'] = humid_sensor_value[0] | |
data['humidity'] = humid_sensor_value[1] | |
data['barometer'] = stag.barometer.read()[1] | |
data['lightmeter'] = stag.lightmeter.read() | |
data['battery'] = stag.battery.read() | |
# Turn off sensors | |
stag.humidity.disable() | |
stag.barometer.disable() | |
stag.lightmeter.disable() | |
stag.battery.disable() | |
return data | |
def write_to_influxdb(data): | |
json_body = [{ | |
'measurement': 'sensortag', | |
'tags': { | |
'macaddr': stag.addr.replace(':','') | |
}, | |
'time': datetime.datetime.utcnow(), | |
'fields': data | |
}] | |
db.write_points(json_body) | |
def on_minute(): | |
global stag | |
if stag is not None: | |
try: | |
data = get_data(stag) | |
print(data) | |
write_to_influxdb(data) | |
except BTLEDisconnectError: | |
# When disconnect, exit | |
sys.exit(1) | |
def main(): | |
schedule.every(1).minutes.do(on_minute) | |
while True: | |
schedule.run_pending() | |
time.sleep(1) | |
if __name__=='__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment