Skip to content

Instantly share code, notes, and snippets.

@ov1d1u
Created February 13, 2023 13:43
Show Gist options
  • Save ov1d1u/df2fc630579ccdce66e655b17a2eebb3 to your computer and use it in GitHub Desktop.
Save ov1d1u/df2fc630579ccdce66e655b17a2eebb3 to your computer and use it in GitHub Desktop.
import queue
from homeassistant.components import bluetooth
from bleak import BleakClient, exc
# Pending writes per device: maps a Bluetooth address to its MessageQueue.
# Entries are added by ble_write and removed by process_device_queue.
queues = {}
# Maps a Bluetooth address to the value returned by task.create() for the
# most recently scheduled ble_delayed_disconnect task of that device.
disconnectTasks = {}
class MessageQueue(queue.Queue):
    """A ``queue.Queue`` whose ``get`` never raises ``queue.Empty``.

    ``get`` is non-blocking by default and reports an empty queue by
    returning ``None``, which lets callers drain it with
    ``while item := q.get(): ...``.
    """

    def get(self, block=False, timeout=None):
        """Return the next item, or ``None`` if none could be fetched.

        ``block`` and ``timeout`` are forwarded positionally to
        ``queue.Queue.get``; only the default of ``block`` and the handling
        of ``queue.Empty`` differ from the base class.
        """
        parent = super(MessageQueue, self)
        try:
            item = parent.get(block, timeout)
        except queue.Empty:
            item = None
        return item
class BLEMessage:
    """One queued write request for a BLE characteristic.

    Attributes:
        address: Bluetooth address of the target device.
        characteristic: Characteristic the data is written to.
        data: Payload as received from the caller.
        isHex: When true, ``data`` is decoded with ``bytes.fromhex`` before
            being written; otherwise it is encoded as text.
        repeats: Number of extra writes; the payload is written
            ``repeats + 1`` times.
        ttl: Upper bound compared against ``retryCount`` when deciding
            whether a failed message is queued again.
        retryCount: Number of failed attempts so far; starts at 0.
    """

    def __init__(self, address, characteristic, data, isHex, repeats, ttl):
        # Target of the write.
        self.address, self.characteristic = address, characteristic
        # Payload and how it has to be interpreted.
        self.data, self.isHex = data, isHex
        # Delivery policy.
        self.repeats, self.ttl = repeats, ttl
        # No attempt has failed yet.
        self.retryCount = 0
def ble_get_client(address):
    """Create a ``BleakClient`` for the device at *address*.

    The device is looked up through Home Assistant's ``bluetooth``
    component. An error is logged and ``None`` is returned when the address
    is not reported as present (connectable) or when no device object can
    be obtained for it. A new ``BleakClient`` is constructed on every
    successful call; the client is returned without connecting it.
    """
    if not bluetooth.async_address_present(hass, address, connectable=True):
        log.error("Device with address {0} is not present".format(address))
        return None

    ble_device = bluetooth.async_ble_device_from_address(hass, address)
    if ble_device:
        return BleakClient(ble_device)

    log.error("Device with address {0} is not available".format(address))
    return None
def ble_delayed_disconnect(client):
    """Disconnect *client* after a short grace period.

    ``process_device_queue`` schedules this function with ``task.create`` so
    that the connection stays open for a while after a write. The function
    waits on ``task.wait_until(timeout=5)``, removes the bookkeeping entry
    for the client's address from ``disconnectTasks`` and then disconnects.

    Args:
        client: Object exposing ``address`` and ``disconnect()`` (a
            ``BleakClient`` when called from this file).
    """
    task.wait_until(timeout=5)
    address = client.address
    log.info("Disconnecting from {0}".format(address))
    # Several delayed-disconnect tasks can exist for one address (one is
    # created per processed message and they share a single dict slot). A
    # plain ``del`` raised KeyError in every task but the first, which
    # aborted before the disconnect below and left the connection open.
    disconnectTasks.pop(address, None)
    client.disconnect()
def ble_write_client(client, msg):
    """Write the payload of *msg* to its characteristic on *client*.

    The payload is taken from ``msg.data``: decoded with ``bytes.fromhex``
    when ``msg.isHex`` is true, otherwise encoded with ``str.encode`` if it
    is text, or used as-is if it is not a ``str`` (e.g. already ``bytes``).
    It is written ``msg.repeats + 1`` times, with ``task.sleep(1)`` after
    every write.

    Args:
        client: Object exposing ``write_gatt_char(characteristic, data)``.
        msg: Message exposing ``data``, ``isHex``, ``repeats`` and
            ``characteristic``.

    Raises:
        ValueError: If ``msg.isHex`` is true and ``msg.data`` is not valid
            hexadecimal text.
    """
    payload = msg.data
    if msg.isHex:
        payload = bytes.fromhex(payload)
    elif isinstance(payload, str):
        # The original read an unassigned local here (``data.encode()``),
        # so every non-hex write failed with UnboundLocalError.
        payload = payload.encode()

    for _ in range(msg.repeats + 1):
        client.write_gatt_char(msg.characteristic, payload)
        task.sleep(1)
def process_device_queue(address):
    """Drain and deliver all queued messages for the device at *address*.

    For each message a client is obtained with ``ble_get_client``, connected
    if necessary, and the message is written with ``ble_write_client``. A
    message whose delivery raises is put back on the queue while its
    ``retryCount`` stays below its ``ttl``. After every attempt a
    ``ble_delayed_disconnect`` task is scheduled for the client and recorded
    in ``disconnectTasks``. Once the queue is empty, or if an unexpected
    error escapes the loop, the queue is removed from ``queues`` so that the
    next ``ble_write`` call starts a fresh one.

    Args:
        address: Bluetooth address whose queue in ``queues`` is processed.
    """
    messageQueue = queues.get(address)
    if messageQueue is None:
        # Nothing was queued for this address; there is nothing to process.
        return

    try:
        while msg := messageQueue.get():
            log.info("Got message of length {0} for charac: {1}, retry {2}".format(len(msg.data), msg.characteristic, msg.retryCount))
            client = ble_get_client(msg.address)
            if not client:
                log.error("BLE client with address {0} not found".format(msg.address))
                continue
            try:
                # NOTE(review): ble_get_client constructs a new BleakClient
                # on every call, so this client is presumably never
                # connected yet and the reuse branch below looks
                # unreachable - confirm.
                if not client.is_connected:
                    log.info("Connecting to {0}".format(msg.address))
                    client.connect()
                else:
                    # Cancel any previous disconnection task for this device
                    if prevTaskId := disconnectTasks.get(msg.address):
                        task.cancel(prevTaskId)
                    log.info("Reusing connection with {0}".format(msg.address))
                ble_write_client(client, msg)
            except Exception as e:  # (exc.BleakDBusError, TimeoutError) as e:
                # Best effort: log the failure and retry until ttl is reached.
                log.error("BLE exception: {0}".format(e))
                msg.retryCount += 1
                if msg.retryCount < msg.ttl:
                    messageQueue.put(msg)
            # Delay disconnection in case other requests are coming in
            disconnectTasks[address] = task.create(ble_delayed_disconnect, client)
    finally:
        # Always unregister the queue. If an error outside the inner ``try``
        # (e.g. in ble_get_client or task.create) skipped this step, later
        # ble_write calls would keep adding to a queue nobody processes.
        log.info('Clearing queue for device {0}'.format(address))
        queues.pop(address, None)
@service
def ble_write(address: str, characteristic: str, data: str, hex=False, repeats=0, ttl=1):
    """yaml
    name: BLE Write
    description: Write data to a BLE characteristic
    fields:
      address:
        description: Bluetooth address of the device
        example: 11:22:33:44:55:66
        required: true
        selector:
          text:
      characteristic:
        description: BLE characteristic to write onto
        example: 00002a19-0000-1000-8000-00805f9b34fb
        required: true
        selector:
          text:
      data:
        description: Data to write
        example: bf626d541868626d4e189a62724900ff
        required: true
        selector:
          text:
      hex:
        description: If data is in hex format
        example: true
        required: false
        selector:
          boolean:
      repeats:
        description: How many times should the write be repeated.
        example: 1
        required: false
        selector:
          number:
            min: 0
            max: 10
      ttl:
        description: How many times should try writing to the BLE device.
        example: 1
        required: false
        selector:
          number:
            min: 1
            max: 10
    """
    # The docstring above is the service description and must stay
    # machine-readable YAML, so the implementation notes live here.
    #
    # ``data`` is text: it is hex-decoded or encoded later by
    # ble_write_client, hence the ``str`` annotation.
    msg = BLEMessage(address, characteristic, data, hex, repeats, ttl)

    pending = queues.get(address)
    if pending is not None:
        # A queue is already registered for this device, which means a
        # process_device_queue call is draining it; just enqueue.
        pending.put(msg)
        return

    # First message for this device: register a queue, then process it.
    pending = MessageQueue()
    pending.put(msg)
    queues[address] = pending
    process_device_queue(address)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment