Last active
July 13, 2022 04:49
-
-
Save osteele/926b5229eb51c090fa65597f833da379 to your computer and use it in GitHub Desktop.
List connected BLE devices
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import sys | |
from collections import Counter, defaultdict | |
from io import StringIO | |
import asyncclick as click | |
from bleak import BleakClient, BleakError, BleakScanner | |
# Debug switch: when True, main() prints a one-line summary of the discovered
# device names (with a count for names seen more than once) after scanning.
print_device_names = False
@click.command()
@click.option("--alphabetical", default=False, is_flag=True, help="Sort by name")
@click.option("--name-filter")
async def main(name_filter, alphabetical=False):
    """Scan for BLE devices and list the services each one exposes.

    Every discovered device (optionally restricted to those whose name
    contains ``name_filter``) is queried concurrently through
    ``get_device_services``. Successful results are printed as a
    service/characteristic tree; failures are grouped by error label and
    summarised once every device has been processed.

    Args:
        name_filter: Substring a device name must contain for the device to
            be included; falsy to include every device.
        alphabetical: When true, print devices in name order. Otherwise
            print each device as soon as its query finishes.

    Exits with status 1 (after printing "Device not found") when no device
    is left after filtering.
    """
    # A device may have no name; fall back to "" so that sorting and
    # substring filtering cannot raise TypeError on None.
    devices = sorted(await BleakScanner.discover(), key=lambda d: d.name or "")
    if print_device_names:
        names = Counter(d.name or "Unknown" for d in devices)
        print(
            "Device names:",
            ", ".join(
                f"{name} (x{count})" if count > 1 else name
                for name, count in names.items()
            ),
        )
    if name_filter:
        devices = [d for d in devices if name_filter in (d.name or "")]
    if not devices:
        print("Device not found")
        sys.exit(1)

    errors = defaultdict(list)
    sep = ""
    # All queries start immediately and run concurrently; only the order in
    # which their results are consumed differs between the two modes.
    tasks = [asyncio.create_task(get_device_services(device)) for device in devices]
    if alphabetical:
        # `devices` is sorted by name, so awaiting the tasks in list order
        # yields name order. (asyncio.wait returns an unordered set, which
        # would lose that ordering.)
        ordered = tasks
    else:
        ordered = asyncio.as_completed(tasks)
    for awaitable in ordered:
        result = await awaitable
        device = result["device"]
        if "error" in result:
            errors[result["error"]].append(result)
            continue
        print(f"{sep}Device: {device.name} ({device.address}):")
        print("Services:")
        for svc in result["services"]:
            print(f" {name_or_description(svc)}:")
            # Looping (rather than printing a joined string) avoids a stray
            # blank line for a service with no characteristics.
            for char in svc.characteristics:
                print(f" {name_or_description(char)}")
        sep = "\n"

    # Failure summary: one heading per error label, then the affected devices.
    for error, results in errors.items():
        failed = [result["device"] for result in results]
        print(f"{error}:")
        print(
            " ",
            ", ".join(
                device.address
                if not device.name or device.name == "Unknown"
                else f"{device.name} ({device.address})"
                for device in failed
            ),
        )
async def get_device_services(device):
    """Connect to ``device`` and fetch its GATT services.

    Args:
        device: A discovered device object; only its ``address`` attribute
            is read here, and the object is echoed back in the result.

    Returns:
        A dict that always has a ``device`` key. On success it also has
        ``services`` (the value returned by ``client.get_services()``) and
        ``output`` (always an empty string, kept so the result shape is
        unchanged). On failure it has ``error`` instead: ``"Disconnected"``
        for a ``BleakError`` and ``"Cancelled"`` for a timeout.
    """
    try:
        async with BleakClient(device.address) as client:
            svcs = await client.get_services()
            return dict(device=device, services=svcs, output="")
    except BleakError:
        return dict(device=device, error="Disconnected")
    except asyncio.TimeoutError:
        # The original tuple listed the same TimeoutError class twice.
        # The "Cancelled" label is kept so the grouped error output is
        # unchanged for existing users.
        return dict(device=device, error="Cancelled")
def name_or_description(char):
    """Return a display label for a service or characteristic.

    The object's ``description`` is used unless it is the placeholder
    string "Unknown", in which case the ``uuid`` is returned instead.
    """
    label = char.description
    if label == "Unknown":
        return char.uuid
    return label
# Invoke the command-line entry point only when this file is run as a script
# (not when it is imported as a module).
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
anyio==3.5.0
asyncclick==8.0.3.2
asyncio==3.4.3
bleak==0.14.2
pyobjc==8.4.1
# dev
black==22.3.0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment