-
-
Save giorgib/a74f15e591979036c154fab5520b22c2 to your computer and use it in GitHub Desktop.
Sample code for reproducing BLE advertisement error on toradex AM62 modules
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
from typing import Collection | |
import dbus_next | |
from dbus_next.aio import MessageBus | |
from dbus_next.aio.proxy_object import ProxyObject | |
from dbus_next.constants import BusType | |
from dbus_next.errors import InvalidIntrospectionError | |
from dbus_next.service import ServiceInterface, method, dbus_property, PropertyAccess | |
logging.basicConfig(level=logging.DEBUG) | |
logger = logging.getLogger(__name__) | |
BLUEZ_SERVICE_NAME = "org.bluez" | |
DBUS_PROPERTIES = "org.freedesktop.DBus.Properties" | |
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager" | |
ADAPTER_INTERFACE = BLUEZ_SERVICE_NAME + ".Adapter1" | |
DEVICE_INTERFACE = BLUEZ_SERVICE_NAME + ".Device1" | |
ADVERTISING_MANAGER_INTERFACE = BLUEZ_SERVICE_NAME + ".LEAdvertisingManager1" | |
bus = None | |
advertisement = None | |
class Adapter: | |
"""A bluetooth adapter.""" | |
_INTERFACE = "org.bluez.Adapter1" | |
_adapter_interface = None | |
def __init__(self, proxy: ProxyObject): | |
self._proxy = proxy | |
self._adapter_interface = proxy.get_interface(self._INTERFACE) | |
async def get_address(self) -> str: | |
"""Read the bluetooth address of this device.""" | |
return await self._adapter_interface.get_address() | |
async def get_name(self) -> str: | |
"""Read the bluetooth hostname of this system.""" | |
return await self._adapter_interface.get_name() | |
async def get_alias(self) -> str: | |
"""The user friendly name of the device.""" | |
return await self._adapter_interface.get_alias() | |
async def set_alias(self, val: str): | |
"""Set the user friendly name for this device. | |
Changing the device hostname directly is preferred. | |
Writing an empty string will result in the alias resetting to the device hostname. | |
""" | |
await self._adapter_interface.set_alias(val) | |
async def get_powered(self) -> bool: | |
"""Indicates if the adapter is on or off.""" | |
return await self._adapter_interface.get_powered() | |
async def set_powered(self, val: bool): | |
"""Turn this adapter on or off.""" | |
await self._adapter_interface.set_powered(val) | |
@classmethod | |
async def get_all(cls, bus: MessageBus) -> Collection["Adapter"]: | |
"""Get a list of available Bluetooth adapters. | |
Args: | |
bus: The message bus used to query bluez. | |
Returns: | |
A list of available bluetooth adapters. | |
""" | |
adapter_nodes = (await bus.introspect("org.bluez", "/org/bluez")).nodes | |
adapters = [] | |
for node in adapter_nodes: | |
try: | |
introspection = await bus.introspect( | |
"org.bluez", "/org/bluez/" + node.name | |
) | |
proxy = bus.get_proxy_object( | |
"org.bluez", "/org/bluez/" + node.name, introspection | |
) | |
adapters.append(cls(proxy)) | |
except InvalidIntrospectionError: | |
pass | |
return adapters | |
@classmethod | |
async def get_first(cls, bus: MessageBus) -> "Adapter": | |
"""Gets the first adapter listed by bluez. | |
Args: | |
bus: The bus to use for adapter discovery. | |
Raises: | |
ValueError: Raised when no bluetooth adapters are available. | |
Returns: | |
The resulting adapter. | |
""" | |
adapters = await cls.get_all(bus) | |
if len(adapters) > 0: | |
return adapters[0] | |
else: | |
raise ValueError("No bluetooth adapters could be found.") | |
class Advertisement(ServiceInterface): | |
""" | |
org.bluez.LEAdvertisement1 interface implementation | |
""" | |
PATH_BASE = "/org/bluez/test/advertisement" | |
def __init__(self): | |
self.index = 0 | |
self.discoverable = True | |
self.includes = [] | |
self.tx_power = 0 | |
self.advertising_type = "peripheral" | |
self.local_name = "sample_local_name" | |
super().__init__("org.bluez.LEAdvertisement1") | |
def get_path(self): | |
return self.PATH_BASE + str(self.index) | |
async def register(self, bus: MessageBus, adapter: Adapter = None): | |
if adapter is None: | |
adapter = await Adapter.get_first(bus) | |
await self.unregister(bus) | |
self.index += 1 | |
path = self.get_path() | |
logger.info(f"Registering advertisement with path: {path}") | |
try: | |
bus.export(path, self) | |
logger.info("Advertisement exported") | |
except: | |
logger.info("Advertisement already exported") | |
interface = adapter._proxy.get_interface(ADVERTISING_MANAGER_INTERFACE) | |
try: | |
await interface.call_register_advertisement(path, {}) | |
logger.info(f"Advertisement registered with path: {path}") | |
except Exception as e: | |
logger.info(f"Error while registering advertisement: {e}") | |
async def unregister(self, bus: MessageBus, adapter: Adapter = None): | |
path = self.get_path() | |
logger.info(f"Unregistering advertisement with path: {path}") | |
if adapter is None: | |
adapter = await Adapter.get_first(bus) | |
interface = adapter._proxy.get_interface(ADVERTISING_MANAGER_INTERFACE) | |
try: | |
bus.export(path, self) | |
logger.info("Advertisement exported") | |
except: | |
logger.info("Advertisement already exported") | |
try: | |
await interface.call_unregister_advertisement(path) | |
logger.info("Advertisement unregistered") | |
except: | |
logger.info("Advertisement not registered yet") | |
try: | |
bus.unexport(path) | |
logger.info("Advertisement unexported") | |
except: | |
logger.info("Advertisement not exported yet") | |
logger.info(f"Advertisement unregistered with path: {path}") | |
@dbus_property(access=PropertyAccess.READ) | |
def Type(self) -> "s": | |
return self.advertising_type | |
@dbus_property() | |
def Discoverable(self) -> "b": | |
return self.discoverable | |
@Discoverable.setter | |
def Discoverable(self, val: "b"): | |
if self.discoverable == val: | |
return | |
self.discoverable = val | |
self.emit_properties_changed({"Discoverable": self.discoverable}) | |
@dbus_property() | |
def Includes(self) -> "as": | |
return self.includes | |
@Includes.setter | |
def Includes(self, val: "as"): | |
if self.includes == val: | |
return | |
self.includes = val | |
self.emit_properties_changed({"Includes": self.includes}) | |
@dbus_property() | |
def LocalName(self) -> "s": | |
return self.local_name | |
@LocalName.setter | |
def LocalName(self, val: "s"): | |
if self.local_name == val: | |
return | |
self.local_name = val | |
self.emit_properties_changed({"LocalName": self.local_name}) | |
@dbus_property() | |
def TxPower(self) -> "n": | |
return self.tx_power | |
@TxPower.setter | |
def TxPower(self, val: "n"): | |
if self.tx_power == val: | |
return | |
self.tx_power = val | |
self.emit_properties_changed({"TxPower": self.tx_power}) | |
@method() | |
def Release(self): | |
logger.info(f"{self.path} Released!") | |
async def interfaces_added(path, interfaces): | |
global bus | |
global advertisement | |
if DEVICE_INTERFACE in interfaces: | |
properties = interfaces[DEVICE_INTERFACE] | |
if "Connected" in properties: | |
logger.info(f"Device connected: {path}") | |
await advertisement.register(bus) | |
async def interfaces_removed(path, interfaces): | |
global bus | |
global advertisement | |
if DEVICE_INTERFACE in interfaces: | |
logger.info(f"Device disconnected: {path}") | |
await advertisement.register(bus) | |
async def setup_interfaces_callback(bus: MessageBus): | |
introspection = await bus.introspect(BLUEZ_SERVICE_NAME, "/") | |
proxy_object = bus.get_proxy_object(BLUEZ_SERVICE_NAME, "/", introspection) | |
interface = proxy_object.get_interface(DBUS_OM_IFACE) | |
interface.on_interfaces_added(interfaces_added) | |
interface.on_interfaces_removed(interfaces_removed) | |
async def main(): | |
global bus | |
global advertisement | |
bus = await MessageBus(bus_type=BusType.SYSTEM).connect() | |
await setup_interfaces_callback(bus) | |
adapter = "/org/bluez/hci0" | |
introspection = await bus.introspect(BLUEZ_SERVICE_NAME, adapter) | |
adapter_obj = bus.get_proxy_object(BLUEZ_SERVICE_NAME, adapter, introspection) | |
adapter_props = adapter_obj.get_interface(DBUS_PROPERTIES) | |
await adapter_props.call_set( | |
ADAPTER_INTERFACE, "Powered", dbus_next.Variant("b", True) | |
) | |
advertisement = Advertisement() | |
await advertisement.register(bus) | |
logger.info("advertisement registered") | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) | |
loop.run_forever() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
asyncio | |
uvloop | |
dbus-next |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment