Skip to content

Instantly share code, notes, and snippets.

@lucahammer
Created August 20, 2023 14:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lucahammer/24a60aadb16a53c200f2b9174b441fef to your computer and use it in GitHub Desktop.
Save lucahammer/24a60aadb16a53c200f2b9174b441fef to your computer and use it in GitHub Desktop.
Example of capturing data from a Xiaomi Mijia MCCGQ02HL Window/Door Sensor 2 and sending it via MQTT to Home Assistant.
# https://github.com/Ernst79/bleparser
# https://gist.github.com/freol35241/48ca1d5e388b6892990dfd7fd6b3d2a2
# Adapted by https://social.luca.run/@luca
# MIT License
# Copyright (c) 2021 freol35241
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Requires:
# - aioblescan
# - bleparser
# - paho-mqtt
import json
import asyncio
from textwrap import wrap
from collections import defaultdict
import aioblescan as aiobs
from bleparser import BleParser
import paho.mqtt.client as mqtt
SENSORS = [
'xx:xx:xx:xx:xx:xx',
'xx:xx:xx:xx:xx:xx'
]
MQTT_HOST = "xxx.xxx.xxx.xxx"
MQTT_PORT = 1883
MQTT_USER = ""
MQTT_PASS = ""
MQTT_SENSOR_BASE_TOPIC = "homeassistant"
# Setup MQTT connection
client = mqtt.Client()
# client.username_pw_set(MQTT_USER, MQTT_PASS)
client.connect(MQTT_HOST, MQTT_PORT)
client.loop_start() # Will handle reconnections automatically
# Setup parser
parser = BleParser(
report_unknown=False,
discovery=False,
filter_duplicates=True,
sensor_whitelist=[bytes.fromhex(
mac.replace(":", "").lower()) for mac in SENSORS],
aeskeys={b'\xe4\xaa\xecS\xbaN': b'~\x1d\xe6\xb9\\\x95\xe7\x85\xed\x13\xb0\x83\x9c\xb3*\x00',
b'\xe4\xaa\xecc4\xcf': b'\xc9\x8f>\x8c\\n\xec;1\xb58\xad\xc7c\xce\x00'} # bytes.fromhex('xxx')
)
SENSOR_BUFFER = defaultdict(dict)
# Define callback
def process_hci_events(data):
sensor_data, tracker_data = parser.parse_data(data)
if sensor_data:
mac = ':'.join(wrap(sensor_data.pop("mac"), 2))
print(mac)
print(sensor_data)
old = SENSOR_BUFFER[mac]
new = SENSOR_BUFFER[mac] = {**old, **sensor_data}
if set(new.keys()) == set(old.keys()):
# Buffer filled, lets publish!
mac = mac.replace(":", "").lower()
client.publish(f"ble/{mac}", json.dumps(new))
def publish_ha_discovery(mac, type, unit_of_measurement, icon, device_class=''):
if device_class == '':
device_class = type
ha_topic = f'{MQTT_SENSOR_BASE_TOPIC}/sensor/{mac}/{type}/config'
config_msg = {
'name': type,
'unique_id': f'{mac}_{type}',
'state_topic': f"ble/{mac}",
'value_template': "{{value_json."+type+"}}",
'ic': icon,
'device': {
'identifiers': mac,
'name': mac,
'mf': 'Xiaomi Mijia',
'mdl': 'MCCGQ02HL'
}
}
if unit_of_measurement:
config_msg['unit_of_measurement'] = unit_of_measurement
config_msg['device_class'] = device_class
client.publish(ha_topic, json.dumps(config_msg), retain=True, qos=0)
def ha_discovery():
for mac_address in SENSORS:
mac = mac_address.replace(":", "").lower()
publish_ha_discovery(mac, 'status', None,
"mdi:door")
publish_ha_discovery(mac, 'light', None,
"mdi:home-lightbulb-outline")
publish_ha_discovery(mac, 'battery', "%", "mdi:battery-50")
publish_ha_discovery(mac, 'rssi', "dBm",
"mdi:signal-variant", 'signal_strength')
# Tell Home Assistant about the sensors
ha_discovery()
# Get everything connected
loop = asyncio.get_event_loop()
# Setup socket and controller
socket = aiobs.create_bt_socket(0)
fac = getattr(loop, "_create_connection_transport")(
socket, aiobs.BLEScanRequester, None, None)
conn, btctrl = loop.run_until_complete(fac)
# Attach callback
btctrl.process = process_hci_events
loop.run_until_complete(btctrl.send_scan_request(0))
# Run forever
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment