Skip to content

Instantly share code, notes, and snippets.

@jannau
Created December 13, 2016 07:59
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jannau/2e77b4379f6a0bacc7f518080ae4aee7 to your computer and use it in GitHub Desktop.
Save jannau/2e77b4379f6a0bacc7f518080ae4aee7 to your computer and use it in GitHub Desktop.
bluez gatt dbus example for eQ-3 BT Smart Thermostat
#!/usr/bin/env python3
import sys
from datetime import datetime
import dbus
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
from dbus.mainloop.glib import DBusGMainLoop
bus = None
mainloop = None
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
BLUEZ_DEVICE_IFACE = 'org.bluez.Device1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
EQ3BT_SERVICE_UUID = '3e135142-654f-9090-134a-a6ff5bb77046'
EQ3BT_WRITE_UUID = '3fa4585a-ce4a-3bad-db4b-b8df8179ea09'
EQ3BT_NOTIFY_UUID = 'd0e8434d-cd29-0996-af41-6c90f4e0eb2a'
# The objects that we interact with.
eq3bt_service = None
eq3bt_write_chrc = None
eq3bt_notify_chrc = None
def generic_error_cb(error):
print('D-Bus call failed: ' + str(error))
mainloop.quit()
def eq3bt_notify_reply_cb():
print('eq3 bt notifications enabled')
def eq3bt_write_reply_cb():
print('eq3 bt write')
def eq3bt_changed_cb(iface, changed_props, invalidated_props):
if iface != GATT_CHRC_IFACE:
return
if not len(changed_props):
return
value = changed_props.get('Value', None)
if not value or len(value) < 1:
return
print('New Notify: len {:d}, frame type: 0x{:02x}'.format(len(value),
value[0]))
if value[0] == 2 and len(value) >= 6:
print('eQ-3 BT status: mode: 0x{:02x}, valve state: {:d}, temp: {:.1f}'\
.format(value[2], value[3], value[5] / 2.0))
def start_client():
eq3bt_prop_iface = dbus.Interface(eq3bt_notify_chrc[0], DBUS_PROP_IFACE)
eq3bt_prop_iface.connect_to_signal("PropertiesChanged",
eq3bt_changed_cb)
eq3bt_notify_chrc[0].StartNotify(reply_handler=eq3bt_notify_reply_cb,
error_handler=generic_error_cb,
dbus_interface=GATT_CHRC_IFACE)
time = datetime.now()
value = []
value.append(dbus.Byte(0x03))
value.append(dbus.Byte(time.year % 100))
value.append(dbus.Byte(time.month))
value.append(dbus.Byte(time.day))
value.append(dbus.Byte(time.hour))
value.append(dbus.Byte(time.minute))
value.append(dbus.Byte(time.second))
eq3bt_write_chrc[0].WriteValue(value, {},
reply_handler=eq3bt_write_reply_cb,
error_handler=generic_error_cb,
dbus_interface=GATT_CHRC_IFACE)
def process_chrc(chrc_path):
chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
dbus_interface=DBUS_PROP_IFACE)
uuid = chrc_props['UUID']
if uuid == EQ3BT_WRITE_UUID:
global eq3bt_write_chrc
eq3bt_write_chrc = (chrc, chrc_props)
elif uuid == EQ3BT_NOTIFY_UUID:
global eq3bt_notify_chrc
eq3bt_notify_chrc = (chrc, chrc_props)
return True
def process_eq3bt_service(service_path, chrc_paths):
service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
service_props = service.GetAll(GATT_SERVICE_IFACE,
dbus_interface=DBUS_PROP_IFACE)
uuid = service_props['UUID']
if uuid != EQ3BT_SERVICE_UUID:
return False
print('eQ-3 BT Service found: ' + service_path)
# Process the characteristics.
for chrc_path in chrc_paths:
process_chrc(chrc_path)
global eq3bt_service
eq3bt_service = (service, service_props, service_path)
eq3bt_dev = bus.get_object(BLUEZ_SERVICE_NAME, service_props['Device'])
# connect the device
eq3bt_dev.Connect(dbus_interface=BLUEZ_DEVICE_IFACE)
return True
def interfaces_removed_cb(object_path, interfaces):
if not eq3bt_service:
return
if object_path == eq3bt_service[2]:
print('Service was removed')
mainloop.quit()
def main():
# Set up the main loop.
DBusGMainLoop(set_as_default=True)
global bus
bus = dbus.SystemBus()
global mainloop
mainloop = GObject.MainLoop()
om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
print('Getting objects...')
objects = om.GetManagedObjects()
chrcs = []
# List characteristics found
for path, interfaces in objects.items():
if GATT_CHRC_IFACE not in interfaces.keys():
continue
chrcs.append(path)
# List sevices found
for path, interfaces in objects.items():
if GATT_SERVICE_IFACE not in interfaces.keys():
continue
chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
if process_eq3bt_service(path, chrc_paths):
break
if not eq3bt_service:
print('No eQ-3 BT Smart Thermostat Service found')
sys.exit(1)
start_client()
mainloop.run()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment