-
-
Save shish/c754815f3712775efc6bbd6b616631b3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from bluepy.btle import Scanner, DefaultDelegate, ScanEntry | |
import datetime | |
import paho.mqtt.client as mqtt | |
import json | |
import struct | |
from time import sleep | |
import sys | |
import argparse | |
import random | |
import logging | |
import urllib | |
from colors import color | |
log = logging.getLogger("ble2mqtt") | |
def caddr(addr): | |
return color(addr, fg=int(addr[-2:], 16)) | |
service_to_parser = {} | |
class ScanDelegate(DefaultDelegate): | |
def __init__(self, args): | |
super().__init__() | |
self.args = args | |
def handleDiscovery(self, dev, isNewDev, isNewData): | |
# find all the possible UUIDs that this device is advertising under | |
services = [ | |
str(x) | |
for x in ( | |
(dev.getValue(ScanEntry.COMPLETE_16B_SERVICES) or []) | |
+ (dev.getValue(ScanEntry.INCOMPLETE_16B_SERVICES) or []) | |
+ (dev.getValue(ScanEntry.COMPLETE_32B_SERVICES) or []) | |
+ (dev.getValue(ScanEntry.INCOMPLETE_32B_SERVICES) or []) | |
+ (dev.getValue(ScanEntry.COMPLETE_128B_SERVICES) or []) | |
+ (dev.getValue(ScanEntry.INCOMPLETE_128B_SERVICES) or []) | |
) | |
] | |
data = None | |
# SwitchBot | |
if "cba20d00-224d-11e6-9fb8-0002a5d5c51b" in services: | |
data = parse_switchbot(dev) | |
# ThermoBeacon | |
elif "0000fff0-0000-1000-8000-00805f9b34fb" in services: | |
data = parse_thermobeacon(dev) | |
# UniFi Cloud Key Gen2 | |
elif "45caadb4-6de8-4466-9680-313f1a692594" in services: | |
# What data is this thing broadcasting?? | |
pass | |
else: | |
log.debug( | |
"{}: Unknown beacon ({}dB): {} / {}".format( | |
caddr(dev.addr), | |
dev.rssi, | |
repr(dev.getScanData()), | |
) | |
) | |
if data: | |
data["time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
data["signal_strength"] = dev.rssi | |
msg_data = json.dumps(data) | |
log.info(caddr(dev.addr) + ": " + msg_data) | |
if self.args.mqtt: | |
url = urllib.parse.urlparse(self.args.mqtt) | |
c = mqtt.Client("ble2mqtt-%08X" % random.randint(0, 2 ** 32)) | |
if url.username and url.password: | |
c.username_pw_set(url.username, url.password) | |
c.connect(url.hostname, url.port or 1883) | |
c.publish(self.args.topic + "/" + dev.addr, msg_data, 1) | |
def parse_switchbot(dev): | |
""" | |
Decoding taken from sbm2mqtt | |
""" | |
binvalue = dev.getValue(ScanEntry.SERVICE_DATA_16B) | |
# Check for model "T" (54) in 16b service data | |
if not binvalue or len(binvalue) != 8 or binvalue[2] != 0x54: | |
return | |
# Get temperature and related characteristics | |
# Absolute value of temp | |
temperature = (binvalue[6] & 0b01111111) + ((binvalue[5] & 0b00001111) / 10) | |
if not (binvalue[6] & 0b10000000): # Is temp negative? | |
temperature = -temperature | |
if not (binvalue[7] & 0b10000000): # C or F? | |
temp_scale = "C" | |
else: | |
temp_scale = "F" | |
# Convert to F | |
temperature = round(temperature * 1.8 + 32, 1) | |
# Get other info | |
humidity = binvalue[7] & 0b01111111 | |
battery = binvalue[4] & 0b01111111 | |
return { | |
"temperature": temperature, | |
"humidity": humidity, | |
"battery": battery, | |
"temperature_scale": temp_scale, | |
} | |
def parse_thermobeacon(dev): | |
data = dev.getValue(ScanEntry.MANUFACTURER) | |
header = data[0:10] | |
if header != b"\x10\x00\x00\x00\x3d\x0c\x00\x00\x0d\x02": | |
log.warning("%s: Surprise ThermoBeacon header:", caddr(dev.addr), header) | |
unknown_1 = data[10] | |
if data[11] == 0x01: | |
a, b, c, d, e = struct.unpack("<hhhhh", data[12:22]) | |
return { | |
"unknown_1": unknown_1, | |
"unknown_2": a, | |
"unknown_3": b, | |
"min_temperature:": c / 16, | |
"unknown_5": d, | |
"unknown_6": e, | |
} | |
elif data[11] == 0x0B: | |
a, b, c = struct.unpack("<hhI", data[12:20]) | |
return { | |
"unknown_1": unknown_1, | |
"temperature": a / 16, | |
"humidity": b / 16, | |
"uptime": c, # number of seconds since device reset | |
} | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"--mqtt", help="MQTT URL, eg mqtt://ble:pass@homeassistant:1234/" | |
) | |
parser.add_argument( | |
"--verbose", default=False, action="store_true", help="More detailed logs" | |
) | |
parser.add_argument("--topic", help="Root topic to publish to", default="ble2mqtt") | |
args = parser.parse_args() | |
logging.basicConfig( | |
format="%(asctime)-15s %(levelname)4.4s %(message)s", | |
level=logging.DEBUG if args.verbose else logging.WARNING, | |
) | |
scanner = Scanner().withDelegate(ScanDelegate(args)) | |
while True: | |
try: | |
scanner.scan(60.0) | |
except Exception as e: | |
log.exception(e) | |
sleep(10) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment