Skip to content

Instantly share code, notes, and snippets.

@giorgib

giorgib/main.py Secret

Created February 12, 2024 10:24
Show Gist options
  • Save giorgib/a74f15e591979036c154fab5520b22c2 to your computer and use it in GitHub Desktop.
Save giorgib/a74f15e591979036c154fab5520b22c2 to your computer and use it in GitHub Desktop.
Sample code for reproducing BLE advertisement error on toradex AM62 modules
import asyncio
import logging
from typing import Collection
import dbus_next
from dbus_next.aio import MessageBus
from dbus_next.aio.proxy_object import ProxyObject
from dbus_next.constants import BusType
from dbus_next.errors import InvalidIntrospectionError
from dbus_next.service import ServiceInterface, method, dbus_property, PropertyAccess
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_PROPERTIES = "org.freedesktop.DBus.Properties"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
ADAPTER_INTERFACE = BLUEZ_SERVICE_NAME + ".Adapter1"
DEVICE_INTERFACE = BLUEZ_SERVICE_NAME + ".Device1"
ADVERTISING_MANAGER_INTERFACE = BLUEZ_SERVICE_NAME + ".LEAdvertisingManager1"
bus = None
advertisement = None
class Adapter:
"""A bluetooth adapter."""
_INTERFACE = "org.bluez.Adapter1"
_adapter_interface = None
def __init__(self, proxy: ProxyObject):
self._proxy = proxy
self._adapter_interface = proxy.get_interface(self._INTERFACE)
async def get_address(self) -> str:
"""Read the bluetooth address of this device."""
return await self._adapter_interface.get_address()
async def get_name(self) -> str:
"""Read the bluetooth hostname of this system."""
return await self._adapter_interface.get_name()
async def get_alias(self) -> str:
"""The user friendly name of the device."""
return await self._adapter_interface.get_alias()
async def set_alias(self, val: str):
"""Set the user friendly name for this device.
Changing the device hostname directly is preferred.
Writing an empty string will result in the alias resetting to the device hostname.
"""
await self._adapter_interface.set_alias(val)
async def get_powered(self) -> bool:
"""Indicates if the adapter is on or off."""
return await self._adapter_interface.get_powered()
async def set_powered(self, val: bool):
"""Turn this adapter on or off."""
await self._adapter_interface.set_powered(val)
@classmethod
async def get_all(cls, bus: MessageBus) -> Collection["Adapter"]:
"""Get a list of available Bluetooth adapters.
Args:
bus: The message bus used to query bluez.
Returns:
A list of available bluetooth adapters.
"""
adapter_nodes = (await bus.introspect("org.bluez", "/org/bluez")).nodes
adapters = []
for node in adapter_nodes:
try:
introspection = await bus.introspect(
"org.bluez", "/org/bluez/" + node.name
)
proxy = bus.get_proxy_object(
"org.bluez", "/org/bluez/" + node.name, introspection
)
adapters.append(cls(proxy))
except InvalidIntrospectionError:
pass
return adapters
@classmethod
async def get_first(cls, bus: MessageBus) -> "Adapter":
"""Gets the first adapter listed by bluez.
Args:
bus: The bus to use for adapter discovery.
Raises:
ValueError: Raised when no bluetooth adapters are available.
Returns:
The resulting adapter.
"""
adapters = await cls.get_all(bus)
if len(adapters) > 0:
return adapters[0]
else:
raise ValueError("No bluetooth adapters could be found.")
class Advertisement(ServiceInterface):
"""
org.bluez.LEAdvertisement1 interface implementation
"""
PATH_BASE = "/org/bluez/test/advertisement"
def __init__(self):
self.index = 0
self.discoverable = True
self.includes = []
self.tx_power = 0
self.advertising_type = "peripheral"
self.local_name = "sample_local_name"
super().__init__("org.bluez.LEAdvertisement1")
def get_path(self):
return self.PATH_BASE + str(self.index)
async def register(self, bus: MessageBus, adapter: Adapter = None):
if adapter is None:
adapter = await Adapter.get_first(bus)
await self.unregister(bus)
self.index += 1
path = self.get_path()
logger.info(f"Registering advertisement with path: {path}")
try:
bus.export(path, self)
logger.info("Advertisement exported")
except:
logger.info("Advertisement already exported")
interface = adapter._proxy.get_interface(ADVERTISING_MANAGER_INTERFACE)
try:
await interface.call_register_advertisement(path, {})
logger.info(f"Advertisement registered with path: {path}")
except Exception as e:
logger.info(f"Error while registering advertisement: {e}")
async def unregister(self, bus: MessageBus, adapter: Adapter = None):
path = self.get_path()
logger.info(f"Unregistering advertisement with path: {path}")
if adapter is None:
adapter = await Adapter.get_first(bus)
interface = adapter._proxy.get_interface(ADVERTISING_MANAGER_INTERFACE)
try:
bus.export(path, self)
logger.info("Advertisement exported")
except:
logger.info("Advertisement already exported")
try:
await interface.call_unregister_advertisement(path)
logger.info("Advertisement unregistered")
except:
logger.info("Advertisement not registered yet")
try:
bus.unexport(path)
logger.info("Advertisement unexported")
except:
logger.info("Advertisement not exported yet")
logger.info(f"Advertisement unregistered with path: {path}")
@dbus_property(access=PropertyAccess.READ)
def Type(self) -> "s":
return self.advertising_type
@dbus_property()
def Discoverable(self) -> "b":
return self.discoverable
@Discoverable.setter
def Discoverable(self, val: "b"):
if self.discoverable == val:
return
self.discoverable = val
self.emit_properties_changed({"Discoverable": self.discoverable})
@dbus_property()
def Includes(self) -> "as":
return self.includes
@Includes.setter
def Includes(self, val: "as"):
if self.includes == val:
return
self.includes = val
self.emit_properties_changed({"Includes": self.includes})
@dbus_property()
def LocalName(self) -> "s":
return self.local_name
@LocalName.setter
def LocalName(self, val: "s"):
if self.local_name == val:
return
self.local_name = val
self.emit_properties_changed({"LocalName": self.local_name})
@dbus_property()
def TxPower(self) -> "n":
return self.tx_power
@TxPower.setter
def TxPower(self, val: "n"):
if self.tx_power == val:
return
self.tx_power = val
self.emit_properties_changed({"TxPower": self.tx_power})
@method()
def Release(self):
logger.info(f"{self.path} Released!")
async def interfaces_added(path, interfaces):
global bus
global advertisement
if DEVICE_INTERFACE in interfaces:
properties = interfaces[DEVICE_INTERFACE]
if "Connected" in properties:
logger.info(f"Device connected: {path}")
await advertisement.register(bus)
async def interfaces_removed(path, interfaces):
global bus
global advertisement
if DEVICE_INTERFACE in interfaces:
logger.info(f"Device disconnected: {path}")
await advertisement.register(bus)
async def setup_interfaces_callback(bus: MessageBus):
introspection = await bus.introspect(BLUEZ_SERVICE_NAME, "/")
proxy_object = bus.get_proxy_object(BLUEZ_SERVICE_NAME, "/", introspection)
interface = proxy_object.get_interface(DBUS_OM_IFACE)
interface.on_interfaces_added(interfaces_added)
interface.on_interfaces_removed(interfaces_removed)
async def main():
global bus
global advertisement
bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
await setup_interfaces_callback(bus)
adapter = "/org/bluez/hci0"
introspection = await bus.introspect(BLUEZ_SERVICE_NAME, adapter)
adapter_obj = bus.get_proxy_object(BLUEZ_SERVICE_NAME, adapter, introspection)
adapter_props = adapter_obj.get_interface(DBUS_PROPERTIES)
await adapter_props.call_set(
ADAPTER_INTERFACE, "Powered", dbus_next.Variant("b", True)
)
advertisement = Advertisement()
await advertisement.register(bus)
logger.info("advertisement registered")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()
asyncio
uvloop
dbus-next
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment