Skip to content

Instantly share code, notes, and snippets.

@devbis
Created September 20, 2021 21:13
Show Gist options
  • Save devbis/1e0e8ac430b410eb4a87481940b779b7 to your computer and use it in GitHub Desktop.
Save devbis/1e0e8ac430b410eb4a87481940b779b7 to your computer and use it in GitHub Desktop.
import asyncio
import logging
import os
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.exc import BleakDBusError
logger = logging.getLogger(__name__)
BLUETOOTH_RESTARTING = asyncio.Lock()
def device_detection_callback(device: BLEDevice, advertisement_data):
logger.info(f'{device} {advertisement_data}')
async def scan_devices_task():
while True:
scanner = None
try:
logger.info(" --> scan start")
scanner = BleakScanner()
scanner.register_detection_callback(device_detection_callback)
try:
try:
await asyncio.wait_for(scanner.start(), 10)
except asyncio.TimeoutError:
logger.error('Scanner start failed with timeout')
await asyncio.sleep(3)
devices = scanner.discovered_devices
logger.info(f'found {len(devices)} devices: {devices}')
finally:
try:
await scanner.stop()
except Exception as e:
print(e)
scanner._rules.clear()
if scanner._bus:
scanner._bus.remove_message_handler(scanner._parse_msg)
except KeyboardInterrupt:
if scanner:
scanner._callback = None
del scanner
raise
except asyncio.IncompleteReadError:
if scanner:
scanner._callback = None
del scanner
raise
except BleakDBusError as e:
logger.exception(str(e))
except Exception as e:
print(e)
if scanner:
scanner._callback = None
del scanner
await asyncio.sleep(1)
async def restart_ble(interface='hci0'):
while True:
if BLUETOOTH_RESTARTING.locked():
await asyncio.sleep(9)
return
async with BLUETOOTH_RESTARTING:
logger.warning(f'--> hciconfig down')
proc = await asyncio.create_subprocess_exec(
'hciconfig', interface, 'down',
)
# logger.warning(f'--> wait for proc')
await proc.wait()
await asyncio.sleep(2)
logger.warning(f'--> bluetoothd restart')
if os.path.exists('/etc/openwrt_release'):
proc = await asyncio.create_subprocess_exec(
'/etc/init.d/bluetoothd', 'restart',
)
else:
proc = await asyncio.create_subprocess_exec(
'hciconfig', interface, 'reset',
)
# logger.warning(f'--> wait for proc')
await proc.wait()
await asyncio.sleep(3)
logger.warning(f'--> hciconfig up')
proc = await asyncio.create_subprocess_exec(
'hciconfig', interface, 'up',
)
# logger.warning(f'--> wait for proc')
await proc.wait()
await asyncio.sleep(15)
async def amain():
logging.basicConfig(level=logging.INFO)
logger.info("start")
await asyncio.gather(
restart_ble(),
scan_devices_task(),
return_exceptions=True,
)
asyncio.run(amain())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment