Skip to content

Instantly share code, notes, and snippets.

@johnlpage
Last active March 4, 2022 13:02
Show Gist options
  • Save johnlpage/86214a12f2dbbc6a70ff02b5bdf7a23f to your computer and use it in GitHub Desktop.
Save johnlpage/86214a12f2dbbc6a70ff02b5bdf7a23f to your computer and use it in GitHub Desktop.
from bluepy.btle import Scanner, DefaultDelegate, Peripheral
from pprint import pprint
import time
import pymongo
import datetime
# GATT UUIDs used by read_temp() to locate the temperature service on a
# connected micro:bit and the characteristic inside it that holds the reading.
TEMPERATURE_SERVICE_UUID = "e95d6100-251d-470a-a062-fa1922dfa9a8"
TEMPERATURE_CHARACTERISTIC_UUID = "e95d9250-251d-470a-a062-fa1922dfa9a8"
class ScanDelegate(DefaultDelegate):
    """Scan delegate that deliberately ignores every discovery callback.

    The script only inspects the list returned by ``Scanner.scan()``, so
    nothing needs to happen while the scan is in progress.
    """

    def __init__(self):
        super().__init__()

    def handleDiscovery(self, dev, isNewDev, isNewData):
        """Do nothing when a device or new advertising data is seen."""
        return None
# The MongoDB connection string is kept out of the source: it is the first
# line of a file in the pi user's home directory.
with open("/home/pi/.mongo_uri.txt") as uri_file:
    uri = uri_file.readline().strip()

# Single shared client, used by send_temp_to_atlas().
mongoclient = pymongo.MongoClient(uri)
# Bluetooth address of each known micro:bit -> name of the room it sits in.
# Addresses missing from this table are recorded as "unknown" by
# send_temp_to_atlas().
# NOTE(review): this name shadows the builtin ``map``; it is kept because
# send_temp_to_atlas() looks the table up under this name.
map = {
    "c7:03:41:4f:6b:61": "Study",
    "d4:7d:86:27:03:d2": "Landing",
    "d4:26:d7:2d:dc:a7": "Utility",
    "d1:a0:81:06:f5:23": "GamesRoom",
    "dd:ec:c8:b2:0b:64": "Hallway",
    "d0:27:86:a2:95:e1": "LivingRoom",
}
# Per-device correction added to the raw reading before it is stored, keyed
# by Bluetooth address. Addresses missing from this table get no correction
# (send_temp_to_atlas() falls back to 0).
calibration = {
    "c7:03:41:4f:6b:61": -3,
    "d4:7d:86:27:03:d2": -4,
    "d4:26:d7:2d:dc:a7": -5,
    "d1:a0:81:06:f5:23": -3,
    "dd:ec:c8:b2:0b:64": -5,
    "d0:27:86:a2:95:e1": -6,
}
def send_temp_to_atlas(dev, temp):
    """Insert one calibrated temperature reading into ``energy.roomtemps``.

    Args:
        dev: Bluetooth address of the sensor, used as the key into the
            ``map`` (room name) and ``calibration`` (offset) tables.
        temp: Raw temperature value read from the sensor.
    """
    # NOTE(review): datetime.now() returns a naive local-time value, and
    # PyMongo stores naive datetimes as if they were UTC - confirm that is
    # what consumers of this collection expect.
    timestamp = datetime.datetime.now()
    room = map.get(dev, "unknown")
    adjusted = temp + calibration.get(dev, 0)

    reading = {"date": timestamp, "location": room, "temp": adjusted}
    mongoclient.energy.roomtemps.insert_one(reading)
# BT comms can fail randomly
def read_temp(dev):
    """Read the temperature from one micro:bit and store it, with retries.

    Makes up to five attempts to connect, read the temperature
    characteristic and hand the value to ``send_temp_to_atlas``. Any
    exception in an attempt is printed and the next attempt starts from a
    fresh connection. If every attempt fails the function returns without
    storing anything.

    Args:
        dev: Scan entry for the device; passed to ``Peripheral`` to connect,
            and its ``addr`` attribute identifies the sensor.
    """
    for _ in range(5):
        microbit = None
        try:
            microbit = Peripheral(dev)
            print("Connected")
            print("Getting Service Handle")
            tempService = microbit.getServiceByUUID(TEMPERATURE_SERVICE_UUID)
            print("Getting Characteristic Handle")
            characteristics = tempService.getCharacteristics(
                forUUID=TEMPERATURE_CHARACTERISTIC_UUID
            )
            print("Getting value")
            # The micro:bit temperature characteristic is a signed 8-bit
            # value, so decode as signed or sub-zero readings show up as 200+.
            temp = int.from_bytes(
                characteristics[0].read(), "big", signed=True
            )
            print(f"Device: {dev.addr} Temp: {temp}")
            send_temp_to_atlas(dev.addr, temp)
            return
        except Exception as e:
            print(e)
        finally:
            # Always release the connection, on success or failure, so a
            # retry (or the next device) does not start with a stale link.
            if microbit is not None:
                try:
                    microbit.disconnect()
                except Exception as e:
                    # A failed disconnect must not mask a stored reading
                    # or abort the remaining attempts.
                    print(e)
# Scan for any and all Bluetooth devices for 10 seconds.
scanner = Scanner().withDelegate(ScanDelegate())
devices = scanner.scan(10.0)
# Short pause between finishing the scan and opening connections
# (presumably lets the adapter settle - TODO confirm it is still needed).
time.sleep(2)
for dev in devices:
    # Each scan-data entry is unpacked as (adtype, desc, value); a device is
    # treated as a micro:bit when any value starts with "BBC micro:bit".
    for (adtype, desc, value) in dev.getScanData():
        if value.startswith("BBC micro:bit"):
            print(f"Microbit found {value}")
            read_temp(dev)
            # Take one reading per device per scan: without this, a device
            # with several matching fields would be read and stored
            # once per field.
            break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment