Skip to content

Instantly share code, notes, and snippets.

@shish
Last active January 19, 2021 00:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shish/c754815f3712775efc6bbd6b616631b3 to your computer and use it in GitHub Desktop.
Save shish/c754815f3712775efc6bbd6b616631b3 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from bluepy.btle import Scanner, DefaultDelegate, ScanEntry
import datetime
import paho.mqtt.client as mqtt
import json
import struct
from time import sleep
import sys
import argparse
import random
import logging
import urllib
from colors import color
# Logger shared by every function in this script; its format and level are
# configured by logging.basicConfig() inside main().
log = logging.getLogger("ble2mqtt")
def caddr(addr):
    """Return ``addr`` colourised for log output.

    The foreground colour index is the integer value of the last two
    characters of ``addr`` parsed as hexadecimal, so a given address is
    always rendered in the same colour.
    """
    colour_index = int(addr[-2:], 16)
    return color(addr, fg=colour_index)
# NOTE(review): nothing in the visible code fills or reads this mapping; the
# service-UUID -> parser dispatch is hard-coded in ScanDelegate.handleDiscovery.
service_to_parser = {}
class ScanDelegate(DefaultDelegate):
    """Scan delegate that decodes known sensor beacons and forwards them.

    For every advertisement the advertised service UUIDs are collected, the
    matching ``parse_*`` function is called, and any decoded reading is
    logged and - when ``args.mqtt`` is set - published to that MQTT broker.
    """

    def __init__(self, args):
        """Store the parsed command-line arguments (``mqtt`` and ``topic`` are read)."""
        super().__init__()
        self.args = args

    @staticmethod
    def _advertised_services(dev):
        """Return every service UUID the device advertises, as strings."""
        fields = (
            ScanEntry.COMPLETE_16B_SERVICES,
            ScanEntry.INCOMPLETE_16B_SERVICES,
            ScanEntry.COMPLETE_32B_SERVICES,
            ScanEntry.INCOMPLETE_32B_SERVICES,
            ScanEntry.COMPLETE_128B_SERVICES,
            ScanEntry.INCOMPLETE_128B_SERVICES,
        )
        # getValue() yields None for absent fields, hence the ``or []``.
        return [
            str(uuid)
            for field in fields
            for uuid in (dev.getValue(field) or [])
        ]

    def _publish(self, addr, payload):
        """Publish ``payload`` to ``<topic>/<addr>`` on the configured broker."""
        # A bare ``import urllib`` does not guarantee that the ``parse``
        # submodule is loaded, so import it explicitly where it is used.
        from urllib.parse import urlparse

        url = urlparse(self.args.mqtt)
        # randrange() excludes the upper bound, so the id always fits in the
        # eight hex digits of the format (randint(0, 2 ** 32) could yield nine).
        c = mqtt.Client("ble2mqtt-%08X" % random.randrange(2 ** 32))
        if url.username and url.password:
            c.username_pw_set(url.username, url.password)
        c.connect(url.hostname, url.port or 1883)
        try:
            c.publish(self.args.topic + "/" + addr, payload, 1)
        finally:
            # A fresh client is created per message; close it cleanly rather
            # than leaving the connection to be dropped implicitly.
            c.disconnect()

    def handleDiscovery(self, dev, isNewDev, isNewData):
        """Decode one advertisement and log / publish the reading.

        ``isNewDev`` and ``isNewData`` are part of the delegate signature but
        are not used here: every advertisement is processed.
        """
        # find all the possible UUIDs that this device is advertising under
        services = self._advertised_services(dev)

        data = None
        # SwitchBot
        if "cba20d00-224d-11e6-9fb8-0002a5d5c51b" in services:
            data = parse_switchbot(dev)
        # ThermoBeacon
        elif "0000fff0-0000-1000-8000-00805f9b34fb" in services:
            data = parse_thermobeacon(dev)
        # UniFi Cloud Key Gen2
        elif "45caadb4-6de8-4466-9680-313f1a692594" in services:
            # What data is this thing broadcasting??
            pass
        else:
            # The previous str.format() call had four placeholders but only
            # three arguments, raising IndexError for every unknown beacon.
            # Lazy %-args supply all four values and skip the formatting
            # entirely when debug logging is disabled.
            log.debug(
                "%s: Unknown beacon (%sdB): %s / %r",
                caddr(dev.addr),
                dev.rssi,
                services,
                dev.getScanData(),
            )

        # Parsers return None (or nothing) when the packet is not a reading.
        if not data:
            return

        data["time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        data["signal_strength"] = dev.rssi
        msg_data = json.dumps(data)
        log.info("%s: %s", caddr(dev.addr), msg_data)
        if self.args.mqtt:
            self._publish(dev.addr, msg_data)
def parse_switchbot(dev):
    """
    Decode a SwitchBot meter reading from the 16-bit service data.

    Decoding taken from sbm2mqtt. Returns a dict with ``temperature``,
    ``humidity``, ``battery`` and ``temperature_scale``, or ``None`` when the
    service data is missing, is not 8 bytes long, or is not model "T".
    """
    payload = dev.getValue(ScanEntry.SERVICE_DATA_16B)

    # Only model "T" (0x54) packets of exactly 8 bytes are understood.
    is_meter_packet = payload and len(payload) == 8 and payload[2] == 0x54
    if not is_meter_packet:
        return None

    # Byte 6: top bit set means positive, low 7 bits are the whole degrees.
    # Byte 5: low nibble holds the tenths of a degree.
    magnitude = (payload[6] & 0x7F) + ((payload[5] & 0x0F) / 10)
    temperature = magnitude if payload[6] & 0x80 else -magnitude

    # Byte 7: top bit selects the display scale, low 7 bits are humidity.
    if payload[7] & 0x80:
        temp_scale = "F"
        # Convert to F
        temperature = round(temperature * 1.8 + 32, 1)
    else:
        temp_scale = "C"

    reading = {}
    reading["temperature"] = temperature
    reading["humidity"] = payload[7] & 0x7F
    reading["battery"] = payload[4] & 0x7F
    reading["temperature_scale"] = temp_scale
    return reading
def parse_thermobeacon(dev):
    """Decode a ThermoBeacon reading from the manufacturer data.

    Byte 11 selects the message layout: ``0x01`` carries five little-endian
    16-bit values (one of them a minimum temperature), ``0x0B`` carries the
    current temperature, humidity and uptime.

    Returns a dict of decoded fields, or ``None`` when the manufacturer data
    is missing, too short for its declared layout, or of an unknown type.
    """
    data = dev.getValue(ScanEntry.MANUFACTURER)
    # Need the 10-byte header plus bytes 10 and 11 before anything is indexed;
    # without this a missing/short packet raised and aborted the whole scan.
    if not data or len(data) < 12:
        return None

    header = data[0:10]
    if header != b"\x10\x00\x00\x00\x3d\x0c\x00\x00\x0d\x02":
        # The header value previously had no placeholder in the format string,
        # which made the logging call itself fail to format.
        log.warning("%s: Surprise ThermoBeacon header: %r", caddr(dev.addr), header)

    unknown_1 = data[10]
    msg_type = data[11]

    if msg_type == 0x01:
        if len(data) < 22:  # "<hhhhh" needs bytes 12..21
            return None
        a, b, c, d, e = struct.unpack("<hhhhh", data[12:22])
        return {
            "unknown_1": unknown_1,
            "unknown_2": a,
            "unknown_3": b,
            # NOTE(review): the trailing ':' in this key looks like a typo; it
            # is kept as-is because consumers may already rely on it.
            "min_temperature:": c / 16,
            "unknown_5": d,
            "unknown_6": e,
        }

    if msg_type == 0x0B:
        if len(data) < 20:  # "<hhI" needs bytes 12..19
            return None
        a, b, c = struct.unpack("<hhI", data[12:20])
        return {
            "unknown_1": unknown_1,
            "temperature": a / 16,
            "humidity": b / 16,
            "uptime": c,  # number of seconds since device reset
        }

    # Unrecognised message type: nothing to report.
    return None
def main():
    """Parse the command line, configure logging and scan for beacons forever.

    Scans run in 60-second windows. Any ``Exception`` raised by a scan is
    logged with its traceback and followed by a 10-second pause before the
    next attempt.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--mqtt",
        help="MQTT URL, eg mqtt://ble:pass@homeassistant:1234/",
    )
    cli.add_argument(
        "--verbose",
        default=False,
        action="store_true",
        help="More detailed logs",
    )
    cli.add_argument(
        "--topic",
        help="Root topic to publish to",
        default="ble2mqtt",
    )
    args = cli.parse_args()

    # --verbose switches from warnings-only to full debug output.
    if args.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.WARNING
    logging.basicConfig(
        format="%(asctime)-15s %(levelname)4.4s %(message)s",
        level=log_level,
    )

    scanner = Scanner()
    scanner = scanner.withDelegate(ScanDelegate(args))
    while True:
        try:
            scanner.scan(60.0)
        except Exception as e:
            log.exception(e)
            # Back off briefly before starting the next scan.
            sleep(10)
# Start scanning only when run as a script, not when imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment