Skip to content

Instantly share code, notes, and snippets.

@troolee
Last active June 4, 2019 17:06
Show Gist options
  • Save troolee/06cea54f5ec8d35db7ce575cf801ecbf to your computer and use it in GitHub Desktop.
Save troolee/06cea54f5ec8d35db7ce575cf801ecbf to your computer and use it in GitHub Desktop.
# pylint: disable=E0401,E1002
import array
import functools
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import pprint
try:
from gi.repository import GObject as gobject
except ImportError:
import gobject
def log(msg):
    """Write *msg* to standard output.

    Dicts and lists are pretty-printed with ``pprint.pprint``; any other
    value is printed as-is.
    """
    if isinstance(msg, (dict, list)):
        pprint.pprint(msg)
    else:
        # The parenthesised single-argument form behaves the same under the
        # Python 2 print statement and the Python 3 print function.
        print(msg)
# Builds a dbus.Array whose element signature is 's' (string).
DBusStringArray = functools.partial(dbus.Array, signature='s')


def DBusByteStringArray(x):
    """Return *x* as a ``dbus.Array`` of bytes (element signature 'y').

    *x* may be a byte string, a ``bytearray``, an iterable of integers in
    the range 0-255, or text.  Text is encoded as UTF-8 first, because
    ``array.array('B', ...)`` rejects text objects.
    """
    if not isinstance(x, (bytes, bytearray)) and hasattr(x, 'encode'):
        # Text object (``unicode`` on Python 2, ``str`` on Python 3).
        x = x.encode('utf-8')
    return dbus.Array(array.array('B', x), signature='y')
class DBusObjectBase(dbus.service.Object):
    """Shared base class for the D-Bus objects exported by this file.

    ``get_path`` reads ``self.path``, which this class does not set itself;
    the subclasses in this file assign it in their own ``__init__``.
    """

    def __init__(self, bus, path):
        """Forward *bus* and *path* to ``dbus.service.Object``."""
        super(DBusObjectBase, self).__init__(bus, path)

    def get_path(self):
        """Return ``self.path`` wrapped in a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)
class Characteristic(DBusObjectBase):
    """Base class for an object exported as ``org.bluez.GattCharacteristic1``.

    The object path is ``<service.path>/characteristic/<index>``.  The
    default ReadValue / WriteValue / StartNotify / StopNotify handlers
    reject the call with ``org.bluez.Error.NotSupported``; subclasses
    override the operations they actually implement.
    """

    def __init__(self, bus, index, uuid, flags, service):
        """Store the characteristic attributes and export the object.

        :param bus: D-Bus connection the object is exported on.
        :param index: value appended to the service path to form this path.
        :param uuid: characteristic UUID, reported as the 'UUID' property.
        :param flags: list reported as the 'Flags' property.
        :param service: owning service; must provide ``path`` and
            ``get_path()``.
        """
        # Computed first because it is passed to the base initialiser below.
        self.path = service.path + '/characteristic/' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        super(Characteristic, self).__init__(bus, self.path)

    def get_properties(self):
        """Return the property dict, keyed by D-Bus interface name."""
        return {
            'org.bluez.GattCharacteristic1': {
                'Service': self.service.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
                'Descriptors': dbus.Array(self.get_descriptor_paths(), signature='o')
            }
        }

    def add_descriptor(self, descriptor):
        """Attach *descriptor* to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all attached descriptors."""
        return [d.get_path() for d in self.descriptors]

    @dbus.service.method('org.freedesktop.DBus.Properties', in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Return every property of *interface*.

        :raises dbus.exceptions.DBusException: with name
            ``org.freedesktop.DBus.Error.InvalidArgs`` when *interface* is
            not implemented by this object.
        """
        properties = self.get_properties()
        if interface not in properties:
            raise dbus.exceptions.DBusException(
                'Unknown interface: %s' % interface,
                name='org.freedesktop.DBus.Error.InvalidArgs')
        return properties[interface]

    @dbus.service.method('org.bluez.GattCharacteristic1', in_signature='a{sv}', out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always fails with NotSupported."""
        log('??? Default ReadValue called, returning error')
        # Returning None here could not be marshalled as 'ay'; send the
        # error the log message announces instead.
        raise dbus.exceptions.DBusException(
            'ReadValue is not supported',
            name='org.bluez.Error.NotSupported')

    @dbus.service.method('org.bluez.GattCharacteristic1', in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always fails with NotSupported."""
        log('??? Default WriteValue called, returning error')
        raise dbus.exceptions.DBusException(
            'WriteValue is not supported',
            name='org.bluez.Error.NotSupported')

    @dbus.service.method('org.bluez.GattCharacteristic1')
    def StartNotify(self):
        """Default notify-start handler: always fails with NotSupported."""
        log('??? Default StartNotify called, returning error')
        raise dbus.exceptions.DBusException(
            'StartNotify is not supported',
            name='org.bluez.Error.NotSupported')

    @dbus.service.method('org.bluez.GattCharacteristic1')
    def StopNotify(self):
        """Default notify-stop handler: always fails with NotSupported."""
        log('??? Default StopNotify called, returning error')
        raise dbus.exceptions.DBusException(
            'StopNotify is not supported',
            name='org.bluez.Error.NotSupported')

    @dbus.service.signal('org.freedesktop.DBus.Properties', signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """Log the arguments; the signal decorator performs the emission."""
        log([interface, changed, invalidated])
        log('PropertiesChanged')
class Descriptor(DBusObjectBase):
    """Base class for an object exported as ``org.bluez.GattDescriptor1``.

    The object path is ``<characteristic.path>/descriptor/<index>``.  The
    default ReadValue / WriteValue handlers reject the call with
    ``org.bluez.Error.NotSupported``; subclasses override what they support.
    """

    def __init__(self, bus, index, uuid, flags, characteristic):
        """Store the descriptor attributes and export the object.

        :param bus: D-Bus connection the object is exported on.
        :param index: value appended to the characteristic path.
        :param uuid: descriptor UUID, reported as the 'UUID' property.
        :param flags: list reported as the 'Flags' property.
        :param characteristic: owning characteristic; must provide ``path``
            and ``get_path()``.
        """
        # Computed first because it is passed to the base initialiser below.
        self.path = characteristic.path + '/descriptor/' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.characteristic = characteristic
        super(Descriptor, self).__init__(bus, self.path)

    def get_properties(self):
        """Return the property dict, keyed by D-Bus interface name."""
        return {
            'org.bluez.GattDescriptor1': {
                'Characteristic': self.characteristic.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
            }
        }

    @dbus.service.method('org.freedesktop.DBus.Properties', in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Return every property of *interface*.

        :raises dbus.exceptions.DBusException: with name
            ``org.freedesktop.DBus.Error.InvalidArgs`` when *interface* is
            not implemented by this object.
        """
        properties = self.get_properties()
        if interface not in properties:
            raise dbus.exceptions.DBusException(
                'Unknown interface: %s' % interface,
                name='org.freedesktop.DBus.Error.InvalidArgs')
        return properties[interface]

    # ReadValue / WriteValue belong to the descriptor interface, not to
    # org.freedesktop.DBus.Properties; calls addressed to
    # org.bluez.GattDescriptor1 would otherwise never reach them.
    @dbus.service.method('org.bluez.GattDescriptor1', in_signature='a{sv}', out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always fails with NotSupported."""
        log('??? Default ReadValue called, returning error')
        raise dbus.exceptions.DBusException(
            'ReadValue is not supported',
            name='org.bluez.Error.NotSupported')

    @dbus.service.method('org.bluez.GattDescriptor1', in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always fails with NotSupported."""
        log('??? Default WriteValue called, returning error')
        raise dbus.exceptions.DBusException(
            'WriteValue is not supported',
            name='org.bluez.Error.NotSupported')
class ZeyeAdvertisement(DBusObjectBase):
    """LE advertisement object exported as ``org.bluez.LEAdvertisement1``.

    Only the 'Type' property is currently published; the commented-out
    fields below show further properties that are presently disabled.
    """

    def __init__(self, bus):
        """Export the advertisement at a fixed object path on *bus*."""
        self.bus = bus
        self.path = '/ws/zeye/bluez/advertisement/0'
        # self.service_uuids = DBusStringArray([
        #     'd9c3ab90-9f18-4fd8-b05f-93384e6b4988',
        # ])
        # self.manufacturer_data = dbus.Dictionary({
        #     0xffff: DBusByteStringArray('Zeye'),
        # }, signature='qv')
        # self.service_data = dbus.Dictionary({
        #     '9999': DBusByteStringArray('Test'),
        # }, signature='sv')
        # self.include_tx_power = True
        super(ZeyeAdvertisement, self).__init__(bus, self.path)

    def get_properties(self):
        """Return the property dict, keyed by D-Bus interface name."""
        return {
            'org.bluez.LEAdvertisement1': {
                'Type': 'peripheral',
                # 'ServiceUUIDs': self.service_uuids,
                # 'ManufacturerData': self.manufacturer_data,
                # 'ServiceData': self.service_data,
                # 'IncludeTxPower': self.include_tx_power,
            },
        }

    @dbus.service.method('org.freedesktop.DBus.Properties', in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Return every property of *interface*.

        :raises dbus.exceptions.DBusException: with name
            ``org.freedesktop.DBus.Error.InvalidArgs`` when *interface* is
            not implemented by this object.
        """
        log('Getting advertisement parameters (%s)...' % interface)
        properties = self.get_properties()
        if interface not in properties:
            # Returning None here could not be marshalled as 'a{sv}'.
            raise dbus.exceptions.DBusException(
                'Unknown interface: %s' % interface,
                name='org.freedesktop.DBus.Error.InvalidArgs')
        return properties[interface]

    @dbus.service.method('org.bluez.LEAdvertisement1', in_signature='', out_signature='')
    def Release(self):
        """Log that the advertisement was released."""
        log('%s: Released!' % self.path)
class ZeyeApplication(DBusObjectBase):
    """Root application object implementing ``org.freedesktop.DBus.ObjectManager``.

    Holds the list of services and reports them, together with their
    characteristics and descriptors, from ``GetManagedObjects``.
    """

    def __init__(self, bus):
        """Export the application at a fixed path and create its services."""
        self.path = '/ws/zeye/bluez/application/0'
        super(ZeyeApplication, self).__init__(bus, self.path)
        self.services = [ZeyeConfigService(bus)]

    @dbus.service.method('org.freedesktop.DBus.ObjectManager', out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return ``{object path: properties}`` for the whole object tree."""
        log('ZeyeApplication.GetManagedObjects')
        managed = {}
        for svc in self.services:
            # Flatten service -> characteristics -> descriptors, keeping the
            # parent-before-children order.
            exported = [svc]
            for chrc in svc.characteristics:
                exported.append(chrc)
                exported.extend(chrc.descriptors)
            for obj in exported:
                managed[obj.get_path()] = obj.get_properties()
        return managed
class ZeyeConfigService(DBusObjectBase):
    """Primary GATT service exported as ``org.bluez.GattService1``.

    Lives at a fixed object path and owns a single ``TestCharacteristic``.
    """

    def __init__(self, bus):
        """Export the service on *bus* and create its characteristics."""
        self.path = '/ws/zeye/bluez/service/0'
        self.uuid = '2e560001-f981-11e6-bc64-92361f002671'
        super(ZeyeConfigService, self).__init__(bus, self.path)
        self.bus = bus
        self.primary = True
        self.characteristics = [
            TestCharacteristic(bus, 0, self)
        ]

    def get_properties(self):
        """Return the property dict, keyed by D-Bus interface name."""
        return {
            'org.bluez.GattService1': {
                'UUID': self.uuid,
                'Primary': self.primary,
                'Characteristics': dbus.Array(
                    [c.get_path() for c in self.characteristics],
                    signature='o')
            }
        }

    @dbus.service.method('org.freedesktop.DBus.Properties', in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Return every property of *interface*.

        :raises dbus.exceptions.DBusException: with name
            ``org.freedesktop.DBus.Error.InvalidArgs`` when *interface* is
            not implemented by this object.
        """
        log('ZeyeConfigService.GetAll')
        properties = self.get_properties()
        if interface not in properties:
            raise dbus.exceptions.DBusException(
                'Unknown interface: %s' % interface,
                name='org.freedesktop.DBus.Error.InvalidArgs')
        return properties[interface]
class TestCharacteristic(Characteristic):
    """Demo characteristic whose value changes to a random word on a timer.

    Supports 'read' and 'notify'.  While notifications are enabled, every
    value change is announced through ``PropertiesChanged``.
    """

    # Pool of words the value is drawn from.
    variants = [
        'brennan',
        'aristoteles',
        'vedic',
        'zoanthropy',
        'mercurous',
        'solferino',
        'newshen',
        'zebrawood',
        'frumentaceous',
        'malmdy',
    ]

    def __init__(self, bus, index, service):
        """Export the characteristic and schedule the periodic value update."""
        Characteristic.__init__(
            self,
            bus,
            index,
            '2e560002-f981-11e6-bc64-92361f002671',
            ['read', 'notify'],  # 'write', 'writable-auxiliaries',
            service)
        self.value = 'Hello'
        self.notifying = False
        # Call update_value every 500 ms.
        gobject.timeout_add(500, self.update_value)

    def update_value(self):
        """Timer callback: pick a new random value and notify subscribers.

        Returns True so the timeout source keeps firing.
        """
        import random
        # random.choice is uniform over the pool; the former
        # ``randint(0, 1000) % len(...)`` slightly favoured the first entry.
        self.value = random.choice(self.variants)
        self.notify_value()
        return True

    def ReadValue(self, options):
        """Return the current value as a D-Bus byte array."""
        log('TestCharacteristic Read: ' + repr(self.value))
        return DBusByteStringArray(self.value)

    def StartNotify(self):
        """Enable notifications and immediately send the current value."""
        if self.notifying:
            return
        log('Start notifications')
        self.notifying = True
        self.notify_value()

    def StopNotify(self):
        """Disable notifications."""
        if not self.notifying:
            return
        log('Stop notifications')
        self.notifying = False

    def notify_value(self):
        """Emit ``PropertiesChanged`` for 'Value' when notifications are on."""
        if not self.notifying:
            return
        log('Notify Value Change: ' + repr(self.value))
        self.PropertiesChanged('org.bluez.GattCharacteristic1', {'Value': DBusByteStringArray(self.value)}, [])
def find_adapter(bus, supported_interfaces):
    """Return the path of a BlueZ object implementing the given interfaces.

    Queries the ``org.bluez`` ObjectManager at '/' and returns the path of
    the first managed object that exposes *every* interface named in
    *supported_interfaces*.  The caller talks to all of the requested
    interfaces on the returned object, so an object offering only some of
    them is not a usable match.

    :param bus: D-Bus connection used to reach ``org.bluez``.
    :param supported_interfaces: iterable of required interface names.
    :returns: the matching object path, or None when nothing matches or no
        interfaces were requested.
    """
    required = list(supported_interfaces)
    if not required:
        return None
    remote_om = dbus.Interface(
        bus.get_object('org.bluez', '/'),
        'org.freedesktop.DBus.ObjectManager')
    objects = remote_om.GetManagedObjects()
    for path, interfaces in objects.items():
        if all(name in interfaces for name in required):
            return path
    return None
def register_ad_cb():
    """Reply handler: log that the advertisement was registered."""
    message = 'Advertisement has been successfully registered.'
    log(message)
def register_ad_error_cb(error):
    """Error handler: log why registering the advertisement failed."""
    message = 'Failed to register advertisement: %s.' % error
    log(message)
def register_app_cb():
    """Reply handler: log that the GATT application was registered."""
    message = 'GATT application registered.'
    log(message)
def register_app_error_cb(error):
    """Error handler: log why registering the GATT application failed."""
    message = 'Failed to register application: %s.' % error
    log(message)
def main():
    """Run the BLE peripheral until interrupted with Ctrl-C.

    Finds a BlueZ adapter, powers it on, registers the GATT application and
    the advertisement, runs the GLib main loop, and on KeyboardInterrupt
    asks BlueZ to unregister both again.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    log('Getting BT adapter...')
    adapter = find_adapter(
        bus, ['org.bluez.LEAdvertisingManager1', 'org.bluez.GattManager1'])
    if adapter is None:
        # Without this guard None would be passed to bus.get_object() as
        # the object path and fail with an unrelated-looking error.
        log('No suitable BT adapter found.')
        return

    bluez = bus.get_object('org.bluez', adapter)
    adapter_props = dbus.Interface(bluez, 'org.freedesktop.DBus.Properties')
    log('-' * 80)
    log('Adapter : %s' % adapter_props.Get('org.bluez.Adapter1', 'Name'))
    log('Address : %s' % adapter_props.Get('org.bluez.Adapter1', 'Address'))
    log('-' * 80)

    log('Setting power on...')
    adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

    mainloop = gobject.MainLoop()

    log('Registering application...')
    app = ZeyeApplication(bus)
    service_manager = dbus.Interface(bluez, 'org.bluez.GattManager1')
    service_manager.RegisterApplication(
        app.get_path(),
        {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb, )

    log('Registering advertisement...')
    advertisement = ZeyeAdvertisement(bus)
    ad_manager = dbus.Interface(bluez, 'org.bluez.LEAdvertisingManager1')
    ad_manager.RegisterAdvertisement(
        advertisement.get_path(),
        {},
        reply_handler=register_ad_cb,
        error_handler=register_ad_error_cb, )

    log('Starting main loop...')
    try:
        mainloop.run()
    except KeyboardInterrupt:
        log('\nShutting down...')

    # NOTE(review): these calls are asynchronous and the main loop has
    # already stopped, so the handlers below may never run before the
    # process exits -- confirm whether blocking calls are wanted here.
    log('Unregistering application...')
    service_manager.UnregisterApplication(
        app.get_path(),
        reply_handler=lambda: log('Application has been successfully unregistered.'),
        error_handler=lambda e: log('Failed to unregister application: %s.' % e))

    log('Unregistering advertisement...')
    ad_manager.UnregisterAdvertisement(
        advertisement.get_path(),
        reply_handler=lambda: log('Advertisement has been successfully unregistered.'),
        error_handler=lambda e: log('Failed to unregister advertisement: %s.' % e))

    log('Bye.')
# Start the peripheral only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()
@HDDmiller
Copy link

Pavel, thank you for the example. I'm new to BLE and Python, and your example helped me send a byte array through a notification. There is very little documentation for that, but after studying your code I figured it out.

Thanks Again.

Mark

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment