Skip to content

Instantly share code, notes, and snippets.

@bojanpotocnik
Created June 16, 2020 08:10
Show Gist options
  • Save bojanpotocnik/2049e4914ea581d6814019992850743d to your computer and use it in GitHub Desktop.
Save bojanpotocnik/2049e4914ea581d6814019992850743d to your computer and use it in GitHub Desktop.
"""
Various utility classes for working with BlueZ via D-Bus
For this to work the following must be executed (installed) on most machines:
1. `sudo apt install bluez bluez-tools`
2. `sudo apt install python3-dbus libgirepository1.0-dev python3-gi`
3. `pip install PyGObject` globally or in the virtual environment
"""
import ctypes
import enum
import multiprocessing
import os
import re
import time
import warnings
from pathlib import Path
from typing import Optional, Iterable, Tuple, List, Union, Dict, Any, Iterator
import dbus
import dbus.mainloop.glib
import dbus.service
try:
# noinspection PyUnresolvedReferences,PyPackageRequirements
from gi.repository import GLib
except ModuleNotFoundError:
raise ModuleNotFoundError(f"Read the docstring in {os.path.realpath(__file__)}")
class _Singleton(type):
    """
    Metaclass for classes which have a single, module-level instance.

    The class statement must pass ``instance_name``: the name of the global variable
    of this module that holds (or will hold) the one instance. While that global is not
    set, calling the class creates and initializes a new object as usual. Once it is set,
    calling the class emits a warning and returns the existing object.
    """

    # noinspection PyInitNewSignature
    def __new__(mcs, class_name: str, parents: tuple, attributes: dict, instance_name: str) -> Any:
        # Create the original class and remember which module global holds its instance.
        cls = super().__new__(mcs, class_name, parents, attributes)
        cls._singleton_instance_name = instance_name
        return cls

    def __call__(cls, *args, **kwargs):
        # Intercept instantiation here instead of patching __new__ of the class: when __new__
        # returns an existing instance, type.__call__() still invokes __init__ on it, which
        # would reset the state of the singleton.
        existing = globals().get(cls._singleton_instance_name)
        if existing is not None:
            warnings.warn(f"The existing instance of this class shall be used"
                          f", `{Path(__file__).stem}.{cls._singleton_instance_name}`", stacklevel=2)
            return existing
        return super().__call__(*args, **kwargs)
class MainLoop(GLib.MainLoop):
    """Plain subclass of :class:`GLib.MainLoop` which adds no behavior of its own."""
class BlueZ(metaclass=_Singleton, instance_name='bluez'):
    """
    Utility class for interfacing BlueZ via D-Bus

    This is singleton because it is important that all related operations get the same instance of dbus.Bus.
    """

    SERVICE = "org.bluez"

    class Rejected(dbus.DBusException):
        """Exception carrying the D-Bus error name ``org.bluez.Error.Rejected``."""
        _dbus_error_name = "org.bluez.Error.Rejected"

    class Canceled(dbus.DBusException):
        """Exception carrying the D-Bus error name ``org.bluez.Error.Canceled``."""
        _dbus_error_name = "org.bluez.Error.Canceled"

    # noinspection PyTypeChecker
    def __init__(self):
        # Most of the interfaces cannot be used before mainloop is set and initialized,
        # therefore initialization cannot be done here, but lazily when used.
        # Also do not set default loop here because user might set its own loop or do
        # some special initialization later (note - this class is instantiated when
        # this module is imported).
        self._bus: Optional[dbus.Bus] = None
        # Used for manual mainloop iterations
        self._mainloop_context: Optional[GLib.MainContext] = None

    @property
    def bus(self) -> dbus.Bus:
        """System bus connection, created (together with the default mainloop) on first access."""
        if not self._bus:
            # Implicitly initialize mainloop if not yet initialized. If user would need a reference or
            # custom initialization, then the mainloop was most likely initialized prior to this call.
            if not dbus.get_default_main_loop():
                dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
            if not self._mainloop_context:
                # noinspection PyUnresolvedReferences
                self._mainloop_context = GLib.MainContext.default()
            self._bus = dbus.SystemBus()
        return self._bus

    def object(self, path: str) -> dbus.proxies.ProxyObject:
        """Return the proxy of the object at `path` exported by the BlueZ service."""
        return self.bus.get_object(self.SERVICE, path)

    def interface(self, path: str, interface: str) -> dbus.Interface:
        """Return `interface` of the BlueZ object at `path`."""
        return dbus.Interface(self.object(path), interface)

    @property
    def interfaces(self) -> Dict[dbus.ObjectPath, dbus.Dictionary]:
        """
        Return all interfaces of this service

        :return: Dictionary {path: interfaces}
        """
        return self.interface("/", "org.freedesktop.DBus.ObjectManager").GetManagedObjects()

    # NOTE: This method shadows the builtin `property` for the remainder of the class body,
    #       so any `@property` must be declared above this line.
    def property(self, of: dbus.Interface, which: str, allow_missing: bool = False) -> Optional[Any]:
        """
        Read a D-Bus property

        :param of: Interface owning the property; its `object_path` and `INTERFACE` attributes are used.
        :param which: Name of the property.
        :param allow_missing: If True, return None instead of raising when the service
                              replies with "No such property '<which>'".

        :return: Property value, or None if missing and `allow_missing` is True.

        :raises dbus.exceptions.DBusException: On any other D-Bus error.
        """
        try:
            return self.interface(of.object_path, "org.freedesktop.DBus.Properties").Get(of.INTERFACE, which)
        except dbus.exceptions.DBusException as e:
            if allow_missing and (e.get_dbus_message() == f"No such property '{which}'"):
                return None
            raise

    def mainloop_iterate(self, timeout: Union[None, float], until_idle: bool = False) -> Iterator[float]:
        """
        Run GLib.MainLoop

        Some operations require running mainloop (e.g. BLE device scan/discovery) to operate,
        but basic GLib.MainLoop.run() does not implement a timeout or check for arbitrary
        conditions, which is frequently required. This method extends the default mainloop
        operation in a form of iterator to enable additional checks on every loop.

        :param timeout: If not None then the iterations will stop after this many seconds,
                        irrespective of the other parameters. If 0 then one iteration will be
                        performed, yielding -1 and iterations stopped.
        :param until_idle: If True then stop the iterations as soon as none of the associated sources
                           have pending events (as soon as GLib.MainContext::pending() returns False),
                           even if timeout not reached. This is useful to execute some work queued for
                           execution in the main loop. It is usually False when waiting for some I/O
                           event to happen because if iterations are fast there can be a lot of idle
                           time - time with no events pending.

        :return: Iterator yielding:
                 Positive value representing remaining time [seconds] until timeout,
                 -1 if timeout was reached (no time remaining),
                 0 if `timeout` was None and therefore looping forever,
                 -2 if `until_idle` was True and there was no pending events (ignoring `timeout`).
                 The iterator will return (will be exhausted) after yielding negative value.
        """
        # One way to do it would be to run the mainloop in a separate thread and wait for
        # an event with timeout, but there is not much sense in spawning another thread if
        # we are then blocking here anyway.
        # Manually perform iterations of the main loop as it would be done in GLib.MainLoop.run().

        # The context is normally set when `bus` is first accessed, but this method can be
        # called before that - do not fail on None in such case.
        if self._mainloop_context is None:
            # noinspection PyUnresolvedReferences
            self._mainloop_context = GLib.MainContext.default()

        end_time = (time.perf_counter() + timeout) if (timeout is not None) else None

        while True:
            # Non-blocking iteration (may_block=False)
            # noinspection PyUnresolvedReferences
            self._mainloop_context.iteration(False)

            # Yield value of 0 means infinite iteration (no timeout)
            ret = 0
            # If timeout is used then remaining time is yielded or -1 for timeout
            if end_time is not None:
                ret = end_time - time.perf_counter()
                if ret <= 0:
                    ret = -1
            # Idle detection takes precedence over the timeout value
            # noinspection PyUnresolvedReferences
            if until_idle and (not self._mainloop_context.pending()):
                ret = -2

            yield ret

            if ret < 0:
                break

    def mainloop_run(self, until_idle: bool = False, timeout: Optional[float] = None) -> float:
        """
        Blocking version of :meth:`mainloop_iterate`

        :return: The last value yielded by :meth:`mainloop_iterate`.
        """
        ret = 0
        for ret in self.mainloop_iterate(timeout=timeout, until_idle=until_idle):
            pass
        return ret
# Created at import time. The name of this global ('bluez') is the `instance_name`
# given to the `_Singleton` metaclass in the `BlueZ` class statement.
bluez = BlueZ()
"""Single global instance of a BlueZ singleton class"""
class Device(dbus.Interface):
    """Proxy of a remote Bluetooth device (BlueZ `org.bluez.Device1` interface)"""

    INTERFACE = BlueZ.SERVICE + ".Device1"

    def __init__(self, path: str):
        """
        :param path: D-Bus object path of the device.
        """
        super().__init__(bluez.object(path), self.INTERFACE)

        # This class can be instantiated many times (e.g. during scan) - do not register listener for every instance!
        self._property_change_listener_added: bool = False
        # Match object returned when registering the listener, used to unregister it again
        self._property_change_match = None
        # States changed from PropertiesChanged listener
        self._connected = multiprocessing.Value(ctypes.c_bool)
        self._paired = multiprocessing.Value(ctypes.c_bool)

    def __del__(self):
        if self._property_change_listener_added:
            # Remove via the match object: remove_signal_receiver(handler) alone would be
            # looked up without the signal name/interface used on registration.
            self._property_change_match.remove()
            self._property_change_match = None
            self._property_change_listener_added = False

    def _listen_to_property_changes(self) -> None:
        """Register :meth:`_on_properties_changed` for PropertiesChanged signals (only once per instance)."""
        if self._property_change_listener_added:
            return

        self._property_change_match = bluez.bus.add_signal_receiver(self._on_properties_changed,
                                                                    dbus_interface="org.freedesktop.DBus.Properties",
                                                                    signal_name="PropertiesChanged",
                                                                    arg0=self.INTERFACE,
                                                                    path_keyword="path")
        self._property_change_listener_added = True

    def _on_properties_changed(self, interface: str, changed_properties: Dict[str, GLib.Variant],
                               _invalidated_properties: List[str], path: dbus.ObjectPath):
        """Handle PropertiesChanged signal, ignoring signals for other interfaces or other objects"""
        if (interface != self.INTERFACE) or (path != self.object_path):
            return

        # This is called for every received advertisement packet and usually contains at least RSSI key. However if
        # other values are present in the advertisement packet (e.g. ManufacturerData) they will also be reported as
        # changed for every received packet - even if they are the same. Therefore only act on actually relevant
        # changes.
        if 'Connected' in changed_properties:
            self._connected.value = changed_properties['Connected']
            print("Connected = ", self._connected.value)
            del changed_properties['Connected']

        if 'Paired' in changed_properties:
            self._paired.value = changed_properties['Paired']
            print("Paired = ", self._paired.value)
            del changed_properties['Paired']

        if 'ServicesResolved' in changed_properties:
            print("Resolved:", changed_properties['ServicesResolved'])
            del changed_properties['ServicesResolved']

        # Drop each key independently - missing RSSI shall not prevent removal of ManufacturerData
        changed_properties.pop('RSSI', None)
        changed_properties.pop('ManufacturerData', None)

        if changed_properties:
            print(changed_properties)

    @property
    def address(self) -> str:
        return bluez.property(self, "Address")

    @property
    def name(self) -> Optional[str]:
        return bluez.property(self, "Name", allow_missing=True)

    @property
    def rssi(self) -> Optional[int]:
        return bluez.property(self, "RSSI", allow_missing=True)

    @property
    def tx_power(self) -> Optional[int]:
        return bluez.property(self, "TxPower", allow_missing=True)

    @property
    def manufacturer_data(self) -> Dict[int, bytearray]:
        """Manufacturer data as {manufacturer ID: value}, empty dictionary if the property is missing or empty"""
        mds_raw = bluez.property(self, "ManufacturerData", allow_missing=True)
        if not mds_raw:
            return {}
        # Keys are 16 bits Manufacturer ID followed by its byte array value.
        return {md_id: bytearray(md_value) for md_id, md_value in mds_raw.items()}

    @property
    def connected(self) -> bool:
        return bluez.property(self, "Connected")

    @property
    def uuids(self) -> List[str]:
        return bluez.property(self, "UUIDs")

    @property
    def services(self) -> List[str]:
        # Reads the same "UUIDs" property as :attr:`uuids`
        return bluez.property(self, "UUIDs")

    @property
    def services_resolved(self) -> bool:
        return bluez.property(self, "ServicesResolved")

    @property
    def paired(self) -> bool:
        return bluez.property(self, "Paired")

    def connect(self, timeout: float = 30, authentication_delay: Optional[float] = 1) -> Tuple[bool, Optional[str]]:
        """
        Connect to BLE device

        :param timeout: Timeout [s] for all operations.
        :param authentication_delay: If this is not None, it is assumed that this device possibly has pairing
                                     enabled and connection will be rejected if not paired. However even if
                                     authentication is mandatory the devices will connect first - that's why
                                     this method waits `authentication_delay` seconds before performing
                                     check for connectable again.

        :return: Success status, optional error code if success is False.
        """
        finish = []

        self._listen_to_property_changes()

        self.Connect(
            reply_handler=lambda: finish.append("OK"),
            error_handler=lambda error: finish.append(error.get_dbus_message()),
            timeout=timeout
        )

        # First, wait for connect operation to finish
        for _ in bluez.mainloop_iterate(timeout=timeout):
            if finish:
                break

        if not finish:
            return False, "Timeout"
        if finish[0] != "OK":
            return False, finish[0]
        if not self.connected:
            return False, "Unknown"

        # Device can be connected here, but if pairing is mandatory it will be disconnected in few moments
        if authentication_delay:
            bluez.mainloop_run(timeout=authentication_delay)
            if not self.connected:
                return False, "Rejected"  # Authentication Failed

        return True, None

    def pair(self, timeout: float = 100) -> Tuple[bool, Optional[str]]:
        """
        This method will connect to the remote device, initiate pairing and then retrieve all SDP records
        (or GATT primary services). If the application has registered its own agent, then that specific
        agent will be used. Otherwise it will use the default agent.

        This method will block until pairing finishes (successfully or with error) or timeout is reached.

        Possible errors: org.bluez.Error.InvalidArguments
                         org.bluez.Error.Failed
                         org.bluez.Error.AlreadyExists
                         org.bluez.Error.AuthenticationCanceled
                         org.bluez.Error.AuthenticationFailed
                         org.bluez.Error.AuthenticationRejected
                         org.bluez.Error.AuthenticationTimeout
                         org.bluez.Error.ConnectionAttemptFailed

        :param timeout: Pairing timeout in seconds.

        :return: Success status, optional error code if success is False.
        """
        # Even if blocking, do not use self.Pair() without reply and error handles as this also
        # blocks the execution of the custom Agent methods (e.g. PasskeyRequest). This one
        # call dbus.connection.call_async(...) internally.
        reply = []

        self._listen_to_property_changes()

        self.Pair(
            reply_handler=lambda: reply.append("OK"),
            error_handler=lambda error: reply.append(error.get_dbus_message()),
            timeout=timeout
        )

        # First, wait for pair operation to finish
        for _ in bluez.mainloop_iterate(timeout=timeout):
            if reply:
                break

        if not reply:
            return False, "Timeout"
        if reply[0] != "OK":
            return False, reply[0]
        if not (self.connected and self.paired):
            return False, "Unknown"

        # Wait a bit more for device to disconnect if rejected
        bluez.mainloop_run(timeout=1)
        if not (self.connected and self.paired):
            return False, "Rejected"

        return True, None
class Adapter(dbus.Interface):
    """Proxy of a local Bluetooth adapter (BlueZ `org.bluez.Adapter1` interface)"""

    INTERFACE = BlueZ.SERVICE + ".Adapter1"

    def __init__(self, pattern: Optional[str] = None):
        """
        Get the BlueZ Adapter interface

        :param pattern: Path (e.g. "hci0" or just "0") or MAC address ("xx:xx:xx:xx:xx:xx") of the desired
                        adapter or None to use the first one.

        :raises dbus.DBusException: If no (matching) adapter is found.
        """
        obj = None
        for path, interfaces in bluez.interfaces.items():
            adapter = interfaces.get(self.INTERFACE)
            if not adapter:
                continue
            # Pattern matches either the (case-insensitive) address or the end of the object path
            if pattern and (adapter["Address"].lower() != pattern.lower()) and (not path.endswith(pattern)):
                continue
            obj = bluez.object(path)
            break
        if not obj:
            raise dbus.DBusException("Adapter " + (pattern + " " if pattern else "") + "not found")

        super().__init__(obj, self.INTERFACE)

    @property
    def discovering(self) -> bool:
        return bluez.property(self, "Discovering")

    @discovering.setter
    def discovering(self, enable: bool) -> None:
        """Start or stop discovering"""
        try:
            if enable:
                self.StartDiscovery()
            else:
                self.StopDiscovery()
        except dbus.exceptions.DBusException as e:
            error_name = e.get_dbus_name()
            if enable and (error_name == "org.bluez.Error.InProgress"):  # Operation already in progress
                return
            if (not enable) and (error_name == "org.bluez.Error.Failed"):  # No discovery started
                return
            raise

    @property
    def devices(self) -> Iterable[Tuple[dbus.ObjectPath, dbus.Dictionary]]:
        """Iterate over (path, Device1 properties) of all known devices located under the path of this adapter"""
        for path, interface in bluez.interfaces.items():
            if not path.startswith(self.object_path):
                continue
            device = interface.get(Device.INTERFACE)
            if not device:
                continue
            yield path, device

    def remove_device(self, device: Device) -> bool:
        """
        This removes the remote device object and also its pairing information

        :return: Whether the device was removed.
        """
        try:
            self.RemoveDevice(device.object_path)
            return True
        except dbus.exceptions.DBusException as e:
            if e.get_dbus_name() == "org.bluez.Error.DoesNotExist":
                return False
            raise

    def scan(self, uuid: Union[None, int, str] = None,
             mfd: Union[None, int] = None,
             rssi: Union[None, int] = None,
             pattern: Union[None, str, re.Pattern] = None,
             timeout: Union[None, float] = 30,
             duplicates: bool = False) -> Iterator[Device]:
        """
        Scan for BLE devices

        :param uuid: Only report devices advertising specified service UUID.
        :param mfd: Only report devices advertising manufacturer data longer or equal to specified number
                    of bytes (0 is valid, use None to disable this filter).
        :param rssi: Only report devices with received RSSI higher than this threshold value.
        :param pattern: String or regex pattern to match the device, tested in the following sequence:
                         - Device MAC address
                         - Device name, if advertised
                        String is compared with '==', compiled regex pattern with fullmatch().
        :param timeout: Scan timeout in seconds or None for no timeout.
        :param duplicates: Whether to report every advertisement packet (True) or only once for new devices (False).

        :return: Iterator returning advertisement packets as Devices, exhausted after timeout.

        :raises TypeError: If `pattern` is not a string, compiled regex pattern or None
                           (raised when the iteration starts).
        """
        if not isinstance(pattern, (str, re.Pattern, type(None))):
            raise TypeError(f"Pattern can only be string, compiled Regex pattern or None, not {type(pattern)}")

        devices: Dict[str, Device] = {}
        new_devices: List[str] = []

        def on_properties_changed(interface: str, _changed_properties: Dict[str, GLib.Variant],
                                  _invalidated_properties: List[str], path: dbus.ObjectPath):
            if interface != Device.INTERFACE:
                return
            if path in new_devices:
                # D-Bus paths are unique (per bus) so if this device is still pending to be yielded it makes no
                # sense to instantiate it again as it would only create another Python object using the exact
                # identical device and its existing properties.
                return

            # Just get an actual DBus device instance instead of parsing the data (it is already parsed internally).
            # If this device (path) is already cached do not just add it to new_devices again, check if it still
            # passes through all the filters.
            dev = devices.get(path)
            if dev is None:
                # Only create the proxy when not cached (a default argument of get() is always evaluated)
                dev = Device(path)
            matches = True

            # UUID and RSSI are already filtered by discovery filter

            # Manufacturer Data
            if matches and (mfd is not None):
                dev_mfd = dev.manufacturer_data
                if (dev_mfd is None) or all(len(v) < mfd for v in dev_mfd.values()):
                    matches = False

            # Address/Name
            if matches and (pattern is not None):
                addr = dev.address
                name = dev.name
                if isinstance(pattern, str):
                    if (pattern != addr) and (pattern != name):
                        matches = False
                elif isinstance(pattern, re.Pattern):
                    # Match if address matches or if name is advertised and matches. Device without
                    # the name and with non-matching address shall not pass the filter.
                    if not (pattern.fullmatch(addr) or (name and pattern.fullmatch(name))):
                        matches = False

            if matches:
                if path not in devices:
                    devices[path] = dev
                    new_devices.append(path)
                elif duplicates:
                    new_devices.append(path)
            elif path in devices:
                del devices[path]

        match = bluez.bus.add_signal_receiver(on_properties_changed,
                                              dbus_interface="org.freedesktop.DBus.Properties",
                                              signal_name="PropertiesChanged",
                                              arg0=Device.INTERFACE,
                                              path_keyword="path")
        try:
            discovery_filter = {
                # Only scan BLE devices
                "Transport": 'le',
                # Trigger PropertiesChanged signal for every received advertisement packet, use custom filter
                # for duplicates as this one would not report devices already seen before on the system
                "DuplicateData": True,
            }
            if uuid:
                discovery_filter["UUIDs"] = [f"0x{uuid:04x}" if isinstance(uuid, int) else uuid]
            if rssi is not None:
                discovery_filter["RSSI"] = dbus.Int16(rssi)

            self.SetDiscoveryFilter(discovery_filter)

            # Note: GLib.MainLoop must run for discovery to work
            self.discovering = True

            # Spin a mainloop until iterator is destroyed or timeout reached
            for _ in bluez.mainloop_iterate(timeout=timeout):
                if new_devices:
                    yield devices[new_devices.pop()]
        finally:
            # Executed on timeout, on error and also when the iterator is closed/destroyed (GeneratorExit).
            # Remove via the match object: remove_signal_receiver(handler) alone would be looked up
            # without the signal name/interface used on registration.
            match.remove()

        # BlueZ on Ubuntu removes device when advertising is stopped if it does not advertise Flags
        # with TX Power, because it cannot calculate RSSI to show it in the UI ('RSSI is nil'). But
        # it shows the RSSI while discovering, so device.rssi cannot be checked here.
        # Until better fix is found, leave discovery running in such case to prevent removal of devices.
        # self.discovering = False
        # # Spin a bit more to handle StopDiscovery properly
        # bluez.mainloop_run(until_idle=True, timeout=5)
class AgentManager(dbus.Interface, metaclass=_Singleton, instance_name='agentManager'):
    """Proxy of the `org.bluez.AgentManager1` interface of the "/org/bluez" object"""

    INTERFACE = BlueZ.SERVICE + ".AgentManager1"

    def __init__(self):
        manager_object = bluez.object("/org/bluez")
        super().__init__(manager_object, self.INTERFACE)

    def register_agent(self, agent: 'Agent', capability: 'Agent.Capability') -> None:
        """
        Register an agent handler.

        The object path of the agent is the path which will be called when
        user input is needed. Each application may register its own agent,
        which is then used for all actions triggered by that application.

        Registering an agent is optional - if the application does not
        register one, the default agent is used, which is a good idea in
        most cases. Only applications like a pairing wizard should register
        their own agent. Only one agent per application is supported.

        The capability reflects the input and output capabilities of the
        agent and its value is one of "DisplayOnly", "DisplayYesNo",
        "KeyboardOnly", "NoInputNoOutput" and "KeyboardDisplay". An empty
        string falls back to "KeyboardDisplay".

        Possible errors: org.bluez.Error.InvalidArguments
                         org.bluez.Error.AlreadyExists

        :param agent: Agent to register.
        :param capability: I/O capability, its `value` is sent to BlueZ.
        """
        capability_name = capability.value
        self.RegisterAgent(agent, capability_name)

    def unregister_agent(self, agent: 'Agent') -> bool:
        """
        Unregister a previously registered agent.

        The object path parameter must match the value used on registration.

        Possible errors: org.bluez.Error.DoesNotExist

        :return: True if unregistered, False if BlueZ replied with
                 org.bluez.Error.DoesNotExist. Other errors are re-raised.
        """
        try:
            self.UnregisterAgent(agent)
        except dbus.exceptions.DBusException as error:
            if error.get_dbus_name() == "org.bluez.Error.DoesNotExist":
                return False
            raise
        return True
# Created at import time. The name of this global ('agentManager') is the `instance_name`
# given to the `_Singleton` metaclass in the `AgentManager` class statement.
agentManager = AgentManager()
"""Single global instance of a AgentManager singleton class"""
class Agent(dbus.service.Object):
    """
    Pairing agent exported on the D-Bus, implementing the `org.bluez.Agent1` interface

    Every call from the service daemon is printed to the standard output. Confirmation and
    authorization requests are always accepted, passkey requests are rejected unless
    :attr:`passkey` is set.
    """

    INTERFACE = BlueZ.SERVICE + ".Agent1"

    @enum.unique
    class Capability(enum.Enum):
        DISPLAY_ONLY = "DisplayOnly"
        DISPLAY_YES_NO = "DisplayYesNo"
        KEYBOARD_ONLY = "KeyboardOnly"
        NO_IO = "NoInputNoOutput"
        KEYBOARD_DISPLAY = "KeyboardDisplay"

    def __init__(self, io_capabilities: Capability = Capability.KEYBOARD_DISPLAY, register: bool = True,
                 path: str = "/test/agent"):
        """
        :param io_capabilities: Capability reported to the agent manager when registering.
        :param register: Whether to register this agent immediately.
        :param path: D-Bus object path on which this agent is exported.
        """
        self.io_capabilities = io_capabilities
        super().__init__(bluez.bus, path, None)

        # 6-digit passkey for Passkey Entry pairing method, if supported
        self.passkey: Optional[int] = None
        # PIN code (1-16 characters) returned from RequestPinCode()
        self.pincode: str = "1234"

        if register:
            self.register()

    def __del__(self):
        try:
            self.unregister()
        except dbus.exceptions.DBusException:
            # Best-effort cleanup, the bus or the service can already be gone at this point.
            pass

    def register(self) -> None:
        agentManager.register_agent(self, self.io_capabilities)

    def unregister(self) -> None:
        agentManager.unregister_agent(self)

    # noinspection PyPep8Naming
    @dbus.service.method(INTERFACE, in_signature="", out_signature="")
    def Release(self) -> None:
        """
        This method gets called when the service daemon
        unregisters the agent. An agent can use it to do
        cleanup tasks. There is no need to unregister the
        agent, because when this method gets called it has
        already been unregistered.
        """
        print("Release()")
        # Agent was released - eventually quit mainloop?

    # noinspection PyPep8Naming
    @dbus.service.method(INTERFACE, in_signature="o", out_signature="s")
    def RequestPinCode(self, device: dbus.ObjectPath) -> dbus.String:
        """
        This method gets called when the service daemon
        needs to get the passkey for an authentication.

        The return value should be a string of 1-16 characters
        length. The string can be alphanumeric.

        Possible errors: org.bluez.Error.Rejected
                         org.bluez.Error.Canceled
        """
        pincode = self.pincode
        print(f"RequestPinCode({device})->{pincode}")
        return dbus.String(pincode)

    # noinspection PyPep8Naming
    @dbus.service.method(INTERFACE, in_signature="os", out_signature="")
    def DisplayPinCode(self, device: dbus.ObjectPath, pincode: dbus.String) -> None:
        """
        This method gets called when the service daemon
        needs to display a pincode for an authentication.

        An empty reply should be returned. When the pincode
        needs no longer to be displayed, the Cancel method
        of the agent will be called.

        This is used during the pairing process of keyboards
        that don't support Bluetooth 2.1 Secure Simple Pairing,
        in contrast to DisplayPasskey which is used for those
        that do.

        This method will only ever be called once since
        older keyboards do not support typing notification.

        Note that the PIN will always be a 6-digit number,
        zero-padded to 6 digits. This is for harmony with
        the later specification.

        Possible errors: org.bluez.Error.Rejected
                         org.bluez.Error.Canceled
        """
        print(f"DisplayPinCode({device}, pincode={pincode})")

    # noinspection PyPep8Naming
    @dbus.service.method(INTERFACE, in_signature="o", out_signature="u")
    def RequestPasskey(self, device: dbus.ObjectPath) -> dbus.UInt32:
        """
        This method gets called when the service daemon
        needs to get the passkey for an authentication.

        The return value should be a numeric value
        between 0-999999.

        Possible errors: org.bluez.Error.Rejected
                         org.bluez.Error.Canceled
        """
        print(f"RequestPasskey({device})->{self.passkey}")
        # No passkey configured - reject the request
        if self.passkey is None:
            raise bluez.Rejected("Connection rejected by user")
        return dbus.UInt32(self.passkey)

    # noinspection PyPep8Naming
    @dbus.service.method(INTERFACE, in_signature="ouq", out_signature="")
    def DisplayPasskey(self, device: dbus.ObjectPath, passkey: dbus.UInt32, entered: dbus.UInt16) -> None:
        """
        This method gets called when the service daemon
        needs to display a passkey for an authentication.

        The entered parameter indicates the number of already
        typed keys on the remote side.

        An empty reply should be returned. When the passkey
        needs no longer to be displayed, the Cancel method
        of the agent will be called.

        During the pairing process this method might be
        called multiple times to update the entered value.

        Note that the passkey will always be a 6-digit number,
        so the display should be zero-padded at the start if
        the value contains less than 6 digits.
        """
        print(f"DisplayPasskey({device}, passkey={passkey}, entered={entered})")

    # noinspection PyPep8Naming
    @dbus.service.method(INTERFACE, in_signature="ou", out_signature="")
    def RequestConfirmation(self, device: dbus.ObjectPath, passkey: dbus.UInt32) -> None:
        """
        This method gets called when the service daemon
        needs to confirm a passkey for an authentication.

        To confirm the value it should return an empty reply
        or an error in case the passkey is invalid.

        Note that the passkey will always be a 6-digit number,
        so the display should be zero-padded at the start if
        the value contains less than 6 digits.

        Possible errors: org.bluez.Error.Rejected
                         org.bluez.Error.Canceled
        """
        # Always confirmed in this implementation
        confirm = True
        print(f"RequestConfirmation({device}, passkey={passkey})->{confirm}")
        if not confirm:
            raise bluez.Rejected("Passkey does not match")

    # noinspection PyPep8Naming
    @dbus.service.method(INTERFACE, in_signature="o", out_signature="")
    def RequestAuthorization(self, device: dbus.ObjectPath) -> None:
        """
        This method gets called to request the user to
        authorize an incoming pairing attempt which
        would in other circumstances trigger the just-works
        model, or when the user plugged in a device that
        implements cable pairing. In the latter case, the
        device would not be connected to the adapter via
        Bluetooth yet.

        Possible errors: org.bluez.Error.Rejected
                         org.bluez.Error.Canceled
        """
        # Always authorized in this implementation
        authorize = True
        print(f"RequestAuthorization({device})->{authorize}")
        if not authorize:
            raise bluez.Rejected("Pairing rejected")

    # noinspection PyPep8Naming
    @dbus.service.method(INTERFACE, in_signature="os", out_signature="")
    def AuthorizeService(self, device: dbus.ObjectPath, uuid: dbus.String) -> None:
        """
        This method gets called when the service daemon
        needs to authorize a connection/service request.

        Possible errors: org.bluez.Error.Rejected
                         org.bluez.Error.Canceled
        """
        # Always authorized in this implementation
        authorize = True
        print(f"AuthorizeService({device}, uuid={uuid})->{authorize}")
        if not authorize:
            raise bluez.Rejected("Connection rejected by user")

    # noinspection PyPep8Naming
    @dbus.service.method(INTERFACE, in_signature="", out_signature="")
    def Cancel(self) -> None:
        """
        This method gets called to indicate that the agent
        request failed before a reply was returned.
        """
        print("Cancel()")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment