Skip to content

Instantly share code, notes, and snippets.

@balloob
Created May 7, 2018 02:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save balloob/031461de91ab2b06ff1efb75623aa7d9 to your computer and use it in GitHub Desktop.
Save balloob/031461de91ab2b06ff1efb75623aa7d9 to your computer and use it in GitHub Desktop.
Basis for a component to prototype bluetooth discovery
"""Basis for a BLE discovery component.
To add to Home Assistant, copy this file to
<config>/custom_components/ble_discovery.py
Add to your configuration.yaml:
```
ble_discovery:

logger:
  default: info
  logs:
    custom_components.ble_discovery: debug
```
Lifecycle of component:
- Start scanning
- Discover device that broadcasts URL with device description
- Parse description, find available services
- Establish connection with device
- Create a new Home Assistant entity for each available service on device
- Connection lost with device: remove entities from Home Assistant
Left to implement are the parts that interact with BLE (marked with TODO):
- Discover GATT devices and retrieve URL with device description
- Connect to GATT devices
- Read boolean characteristic
- Write boolean characteristic
All todo parts are currently mocked so you can test this without BLE.
"""
import logging
import voluptuous as vol
from homeassistant.const import (
SERVICE_TURN_ON, SERVICE_TURN_OFF, EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP, ATTR_ENTITY_ID)
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.helpers import config_validation as cv
# Domain under which the entity component and its services are registered.
DOMAIN = 'ble_discovery'

_LOGGER = logging.getLogger(__name__)

# Validation schema shared by the turn_on / turn_off services: the payload
# may carry the entity ids the call applies to.
SERVICE_SCHEMA = vol.Schema({ATTR_ENTITY_ID: cv.entity_ids})
# MOCK: canned description used for every discovered device. It advertises
# two BinarySwitch services, one per LED, each with its own characteristic.
MOCK_DESCRIPTION = {
    'name': 'Fake device',
    'services': [
        {
            'type': 'BinarySwitch',
            'characteristic_id': 'characteristic_{}'.format(index),
            'name': 'LED {}'.format(index),
        }
        for index in (1, 2)
    ],
}
def setup(hass, config):
    """Set up the BLE discovery component.

    Creates the entity component and the BLE interface, registers the
    turn_on / turn_off services under this component's domain and ties
    scanning to the Home Assistant start and stop events.

    Args:
        hass: The Home Assistant instance.
        config: The configuration passed by Home Assistant; not used here.

    Returns:
        True, to indicate that the component was set up successfully.
    """
    component = EntityComponent(_LOGGER, DOMAIN, hass)
    ble_interface = BLEInterface(component)

    # Register services to turn device on/off
    def handle_service(service):
        """Handle calls to the turn_on / turn_off services."""
        turn_on = service.service == SERVICE_TURN_ON

        # entity_id is an optional key in SERVICE_SCHEMA, so a call without
        # it is valid and must not raise; it simply targets nothing.
        for entity_id in service.data.get(ATTR_ENTITY_ID, []):
            entity = component.get_entity(entity_id)
            if entity is None:
                # Unknown id, or the entity was removed after its device
                # disconnected. Skip it so the remaining ids are still
                # handled.
                _LOGGER.warning(
                    "Unable to find entity %s for service %s",
                    entity_id, service.service)
                continue

            if turn_on:
                entity.turn_on()
            else:
                entity.turn_off()

            # Entities are not polled, so refresh and push the state now.
            entity.update()
            entity.schedule_update_ha_state()

    hass.services.register(
        DOMAIN, SERVICE_TURN_OFF, handle_service, SERVICE_SCHEMA)
    hass.services.register(
        DOMAIN, SERVICE_TURN_ON, handle_service, SERVICE_SCHEMA)

    # Register to start/stop scanning when Home Assistant starts/stops
    hass.bus.listen(EVENT_HOMEASSISTANT_START,
                    lambda event: ble_interface.start_scan())
    hass.bus.listen(EVENT_HOMEASSISTANT_STOP,
                    lambda event: ble_interface.stop_scan())

    # Indicate that the component was setup successfully
    return True
class BLEInterface:
    """Stub for a BLE interface.

    Keeps track of discovered devices and creates or removes Home Assistant
    entities as connections are established or closed. Every part that
    should talk to real BLE hardware is marked with TODO and is currently
    mocked.
    """

    def __init__(self, component):
        """Initialize the interface.

        Args:
            component: Entity component that new entities are added to.
        """
        self.component = component
        # Maps device_id to a dict holding the device 'description' and,
        # once a connection has been established, its 'entities'.
        self.devices = {}
        # MOCK: keep track of values we are supposed to read/write
        self._mock_cache = {}

    def start_scan(self):
        """Start the Bluetooth scanning.

        Discover devices that are broadcasting a specific packet. This packet
        should contain a URL pointing at a device schema. Pass device ID and
        URL to self.device_discovered(device_id, url).

        The URL will be fetched and we will call self.connect(device_id) if we
        can handle it.
        """
        _LOGGER.debug("Start scanning")
        # TODO

        # MOCK: Imitate we discover a device
        self.device_discovered('mock_device',
                               'http://example.com/device_schema.json')

        # MOCK: Imitate we lose connection in 10 seconds
        # from threading import Timer
        # Timer(10, self.connection_closed, args=['mock_device']).start()

    def stop_scan(self):
        """Stop the bluetooth scanning."""
        _LOGGER.debug("Stop scanning")
        # TODO

    def connect(self, device_id):
        """Initialize a connection with a device.

        Should call self.connection_established(device_id) when successful.
        Once connected, should call self.connection_closed(device_id) when
        connection is lost.
        """
        _LOGGER.debug("Connecting to %s", device_id)
        # TODO

        # MOCK: Imitate a successful connection
        self.connection_established(device_id)

    def device_discovered(self, device_id, description_url):
        """Called from the scanning process when a new device is discovered.

        Devices that are already known are ignored. For a new device the
        description is stored and a connection attempt is started.
        """
        if device_id in self.devices:
            _LOGGER.debug("Discovered existing device %s", device_id)
            return

        _LOGGER.debug("Discovering new device %s: %s",
                      device_id, description_url)

        # If we wanted to fetch the url for real
        # import requests
        # description = requests.get(description_url).json()

        # MOCK: Stub description
        description = MOCK_DESCRIPTION

        # Store the description for the device and try to connect
        self.devices[device_id] = {
            'description': description
        }
        self.connect(device_id)

    def connection_established(self, device_id):
        """Called when a connection with a device has been established.

        Creates one entity per supported service in the device description,
        refreshes their state and adds them to Home Assistant.
        """
        _LOGGER.debug('Connection established with %s', device_id)

        device = self.devices.get(device_id)
        if device is None:
            # The device was never discovered, or was already closed again;
            # without a description there is nothing to create.
            _LOGGER.warning(
                'Connection established with unknown device %s', device_id)
            return

        entities = []
        for service in device['description']['services']:
            if service['type'] == 'BinarySwitch':
                entities.append(BinarySwitchEntity(self, device_id, service))
                # MOCK: Initialize a fake value
                cache_key = (device_id, service['characteristic_id'])
                self._mock_cache[cache_key] = False
            else:
                _LOGGER.warning('Unknown service type %s', service['type'])

        device['entities'] = entities

        # Update entities and add to Home Assistant
        for entity in entities:
            entity.update()
        self.component.add_entities(entities)

    def connection_closed(self, device_id):
        """Called when a connection with a device has been closed.

        Forgets the device and schedules removal of its entities. Unknown
        devices are ignored.
        """
        _LOGGER.debug('Connection closed with %s', device_id)

        connection = self.devices.pop(device_id, None)
        if connection is None:
            return

        # 'entities' is only stored once a connection was established, so a
        # device that was discovered but never connected has none to remove.
        for entity in connection.get('entities', []):
            self.component.hass.add_job(entity.async_remove)

    def read_boolean(self, device_id, characteristic_id):
        """Read a boolean characteristic from a device."""
        _LOGGER.debug('Read boolean %s.%s', device_id, characteristic_id)
        # TODO

        # MOCK: Return value from mock_cache
        return self._mock_cache[(device_id, characteristic_id)]

    def write_boolean(self, device_id, characteristic_id, value):
        """Write a boolean to a characteristic on a device."""
        _LOGGER.debug('Write boolean %s to %s.%s',
                      value, device_id, characteristic_id)
        # TODO

        # MOCK: Set value in mock_cache
        self._mock_cache[(device_id, characteristic_id)] = value
class BinarySwitchEntity(ToggleEntity):
    """Toggle entity backed by a single BinarySwitch service of a device."""

    def __init__(self, ble_interface, device_id, service):
        """Store the interface, device id and service; start out as off."""
        self._is_on = False
        self._service = service
        self._device_id = device_id
        self._ble_interface = ble_interface

    @property
    def name(self):
        """Return the name given in the service description."""
        return self._service['name']

    @property
    def is_on(self):
        """Return the state last read by update()."""
        return self._is_on

    @property
    def should_poll(self):
        """Tell Home Assistant not to poll this entity."""
        return False

    @property
    def device_state_attributes(self):
        """Return the device id and characteristic id as extra attributes."""
        return dict(
            device=self._device_id,
            characteristic=self._service['characteristic_id'],
        )

    def turn_on(self, **kwargs):
        """Write True to the characteristic of this service."""
        characteristic = self._service['characteristic_id']
        self._ble_interface.write_boolean(
            self._device_id, characteristic, True)

    def turn_off(self, **kwargs):
        """Write False to the characteristic of this service."""
        characteristic = self._service['characteristic_id']
        self._ble_interface.write_boolean(
            self._device_id, characteristic, False)

    def update(self):
        """Read the characteristic and remember it as the current state."""
        characteristic = self._service['characteristic_id']
        self._is_on = self._ble_interface.read_boolean(
            self._device_id, characteristic)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment