Skip to content

Instantly share code, notes, and snippets.

@tnarik
Last active June 10, 2021 15:10
Show Gist options
  • Save tnarik/4aa3c4a392613e078f2a2dd82033bfed to your computer and use it in GitHub Desktop.
Save tnarik/4aa3c4a392613e078f2a2dd82033bfed to your computer and use it in GitHub Desktop.
Bluezero testing - Central-Peripheral testing
#!/usr/bin/env python
import logging
from bluezero import adapter, central, dbus_tools
from bluezero.device import Device
import dbus
import colorama
colorama.init()
from colorama import Fore, Style
from peri import decode_ieee11073
dongle = None
logger = logging.getLogger('bluezero.central')
logger.setLevel(logging.DEBUG)
loggerGATT = logging.getLogger('bluezero.GATT')
loggerGATT.setLevel(logging.DEBUG)
# Using the default instead
def on_notify(iface, changed_props, invalidated_props):
print('on_notify', flush=True)
def known_devices():
device_list = []
mng_objs = dbus_tools.get_managed_objects()
for path in mng_objs:
address = mng_objs[path].get('org.bluez.Device1', {}).get('Address')
if address:
device_list.append(str(address))
return device_list
def central_debug(central_role):
# Ensure we start disconnected
central_role.disconnect()
for service in central_role.services_available:
print('service -> ',service)
print('sevices resolved: ',central_role.services_resolved)
#'0000{0}-0000-1000-8000-00805F9B34FB'.format(uuid)
characteristic = central_role.add_characteristic(srv_uuid='12341000-1234-1234-1234-123456789abc',
chrc_uuid='0000{0}-0000-1000-8000-00805F9B34FB'.format('2A1C'))
print('1. added Characteristics to monitor', characteristic)
# print('1. notifying: ',characteristic.notifying)
print('Services resolved: ',central_role.services_resolved)
if not central_role.connected:
print('connecting')
central_role.connect()
while not central_role.services_resolved:
sleep(0.5)
print('Connected: ', central_role.connected)
print('Services resolved: ',central_role.services_resolved)
print('1 methods: ',characteristic.characteristic_methods)
# current_reading = characteristic.value
# current_reading.pop(0) # Unit indication
# print('1 read value: {}{:.3f}\u00B0C{}'.format(
# Fore.RED,
# decode_ieee11073(bytes(current_reading)),
# Style.RESET_ALL
# ))
central_role.load_gatt()
print('loaded GATT')
print('2. added Characteristics to monitor', characteristic)
print('2. notifying: ',characteristic.notifying)
print('2 methods: ',characteristic.characteristic_methods)
current_reading = characteristic.value
current_reading.pop(0) # Unit indication
print('2 read value: {}{:.3f}\u00B0C{}'.format(
Fore.RED,
decode_ieee11073(bytes(current_reading)),
Style.RESET_ALL
))
characteristic.add_characteristic_cb(on_notify)
print('added Callback')
characteristic.start_notify()
print('starting notify')
central_role.run()
def on_properties_changed(interface, changed, invalidated, path):
# print(interface, changed, invalidated, path)
# print('.', end='')
device_address = dbus_tools.get_device_address_from_dbus_path(path)
dev = Device(
adapter_addr = dongle.address,
device_addr = device_address
)
# print('!!! ', dev.alias)
# Try to filter here
try:
name = dev.name
if not name.startswith('TNARIK'):
return
except dbus.exceptions.DBusException:
# If there is no name we cannot filter and we ignore
print('.', end='', flush=True)
return
print('!!!!!!!!! NICE, got ',name)
if name == 'TNARIK_PICAKE' or name == 'TNARIK_PIW':
print('stopping discovery')
dongle.stop_discovery()
dongle.mainloop.quit()
print('try to connect')
print(dongle)
central_role = central.Central(dev.address)
central_debug(central_role)
def on_device_found(device):
try:
name = device.name
if not name.startswith('TNARIK'):
return
except dbus.exceptions.DBusException:
# If there is no name we cannot filter and we ignore
print('>>> GOT ',device.alias)
return
print('==========', flush=True)
print('FOUND ',device)
print('name: ', name)
print('alias: ', device.alias)
try:
print('tx_power: ', device.tx_power)
except dbus.exceptions.DBusException:
pass
print('paired: ', device.paired)
print('connected: ', device.connected)
print('trusted: ', device.trusted)
print('blocked: ', device.blocked)
print('legacy_pairing: ', device.legacy_pairing)
if name == 'TNARIK_PICAKE' or name == 'TNARIK_PIW':
print('stopping discovery')
dongle.stop_discovery()
print('try to connect')
print(dongle)
central_role = central.Central(device.address, dongle.address)
def main(dongle):
print('address: ', dongle.address)
print('name: ', dongle.name)
print('alias: ', dongle.alias)
print('discovering: ', dongle.discovering)
if not dongle.powered:
dongle.powered = True
print('Now powered: ', dongle.powered)
print('Start discovering')
# dongle.on_device_found = on_device_found # Skipping the InterfacesAdded as it is not responsive enough
# Could also take a look at the list of known devices, but instead of that we are going for the PropertiesChanged
# print(known_devices())
dongle.bus.add_signal_receiver(on_properties_changed,
dbus_interface=dbus.PROPERTIES_IFACE,
signal_name='PropertiesChanged',
arg0='org.bluez.Device1',
path_keyword='path')
dongle.nearby_discovery(timeout=45)
# device = Central(None, adapter_address)
if __name__ == '__main__':
# Get the default adapter address and pass it to main
dongle = list(adapter.Adapter.available())[0]
main(dongle)
#!/usr/bin/env python
# Standard modules
import logging
import random
import struct
import math
import colorama
colorama.init()
from colorama import Fore, Style
import functools # patching
# Bluezero modules
from bluezero import adapter
from bluezero import peripheral
from bluezero import dbus_tools
from bluezero import async_tools
from bluezero.advertisement import Advertisement
from gi.repository import GLib
import dbus
from pprint import pprint
try:
from sense_hat import SenseHat
except ModuleNotFoundError as e:
print('Error: ', e)
SenseHat = None
from agent3 import Agent as Agent
import socket
deviceAdvertisedName = 'TNARIK_{}'.format(socket.gethostname().upper())
DBUS_BLUEZ_CUSTOMAGENT_PATH = "/lecafeautomatique/bluetooth/agent" #"/test/agent"
DBUS_BLUEZ_INTERFACE_AGENTMANAGER_NAME = "org.bluez.AgentManager1"
DBUS_BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_BLUEZ_PATH = "/org/bluez"
SERVICE_CUSTOM = '12341000-1234-1234-1234-123456789abc'
SERVICE_TEMPERATURE ='1809' # Let's use the Health Thermometer Service instead # Or not, on IOS I think this requires pairing...
CHARACTERISTIC_TEMPERATURE = '2A6E' # as 0.01˚C
CHARACTERISTIC_TEMPERATURE_MEASUREMENT = '2A1C' # Should be a float, therefore changing the format
DESCRIPTOR_CHARACTERISTIC_PRESENTATION_FORMAT = '2904' # DESCRIPTOR_CHARACTERISTIC PRESENTATION FORMAT
DESCRIPTOR_CLIENT_CHARACTERISTIC_CONFIGURATION = '2902'
MANUFACTURER_ID=0x01A9 # Faking Apple (0x004C) or Cannon (0x01A9)
# Adafruit docs are really useful, like most maker websites. See https://learn.adafruit.com/introduction-to-bluetooth-low-energy/
logger = logging.getLogger('localGATT')
logger.setLevel(logging.DEBUG)
def do_the_stick_clear(event):
if sense: sense.clear()
def do_the_stick_check(event):
if sense: sense.set_pixel(0,0,230,230,250)
if SenseHat is None:
sense = None
else:
try:
sense = SenseHat()
except OSError:
print("Issue setting up HAT. Allowing run")
sense = None
if sense:
sense.low_light = True
sense.clear()
sense.stick.direction_up = do_the_stick_clear
sense.stick.direction_down = do_the_stick_clear
sense.stick.direction_left = do_the_stick_clear
sense.stick.direction_right = do_the_stick_clear
sense.stick.direction_middle = do_the_stick_check
#Simple IEEE 11073 encoder/decoder with fix/guessed exponent
def encode_ieee11073(value, precision=None):
if precision is None:
# Try to provide the best precision
precision = 0
# num_decimals = int(math.log10(abs(value)))
mantissa = value * 10**precision
while (mantissa < 0x7FFFFE) and (mantissa > -0x800002):
precision += 1
mantissa = value * 10**precision
precision -= 1
mantissa = value * 10**precision
if (mantissa > 0x7FFFFE) or (mantissa < -0x800002):
# Cannot represent with that precision -> NRes
nre = bytearray.fromhex('800000')
nre.reverse()
return nre + struct.pack('<b', 0x00)
return int(mantissa).to_bytes(3, byteorder='little', signed=True) + struct.pack('<b', -precision)
def decode_ieee11073(value):
exponent = struct.unpack('<b', value[-1:])[0]
mantissa = int.from_bytes(value[:-1], byteorder='little', signed=True)
if exponent == 0x00:
# Cannot return NRes (not at this resolution, 0x800000) nor Reserved (0x800001)
if mantissa == 0x7FFFFF:
return float('NaN')
if mantissa == 0x7FFFFE:
return float('Inf')
if mantissa == -0x800002:
return float('-Inf')
if mantissa == -0x800000:
print("NRes")
return None
# print(hex(mantissa))
return (mantissa * (10 ** exponent))
def on_connect(device, thing):
logger.debug('connect {} to {}'.format(device.address, device.adapter))
# logger.debug('name: {}\nrssi: {}\nlegacy: {}\nmanu data: {}\npaired: {}\ntrusted: {}\n'
# .format(device.name, device.RSSI,
# device.legacy_pairing, device.manufacturer_data,
# device.paired, device.trusted))
if sense: sense.set_pixel(4,4,0,0,255)
print('set blue connect')
pass
def on_disconnect(selfaddr, peeraddr, thing):
'''
Not using the single parameter version (`device`) because in Peripheral mode Bluez doesn't seem to persis the device (at least sometimes)
'''
# selfaddr = device.adapter
# peeraddr = device.address
logger.debug('disconnect {} from {}'.format(peeraddr, selfaddr))
objectManager = dbus.Interface(dbus.SystemBus().get_object(DBUS_BLUEZ_SERVICE_NAME, '/'), "org.freedesktop.DBus.ObjectManager")
# print(objectManager.GetManagedObjects())
remote_path = None
if objectManager is not None:
for path, ifaces in objectManager.GetManagedObjects().items():
device = ifaces.get('org.bluez.Device1')
if device is None:
continue
else:
if device["Address"]:
print(device['Address'])
if device["Address"] == peeraddr:
print('THIS IS THE ONE')
obj = dbus.SystemBus().get_object(DBUS_BLUEZ_SERVICE_NAME, path)
print('Device Interface: ', dbus.Interface(obj, 'org.bluez.Device1'))
print('Device Interface path: ', dbus.Interface(obj, 'org.bluez.Device1').object_path)
print('----------')
remote_path=dbus.Interface(obj, 'org.bluez.Device1').object_path
# dbus.SystemBus().get_object(DBUS_BLUEZ_SERVICE_NAME, DBUS_BLUEZ_PATH)
# print('trusted {}'.format(device.trusted))
# print('paired {}'.format(device.paired))
# print('adapter {}'.format(device.adapter))
print("attempt removal/unbonding of {} aka {}".format(peeraddr, remote_path))
# print(device.remote_device_path)
if remote_path is not None:
try:
# print( '/org/bluez/hci0/dev_{}'.format(peeraddr.replace(':', '_')) )
# thing.dongle.remove_device('/org/bluez/hci0/dev_{}'.format(peeraddr.replace(':', '_'))) #device.remote_device_path)
# thing.dongle.remove_device(peeraddr)
thing.dongle.remove_device(remote_path)
print('Device removed in advance')
except Exception as e:
print(e)
print('Some Error ON ON_DISCONNECT')
pass
# pprint(device)
if sense: sense.set_pixel(0,4,255,0,0)
print('set red disconnect')
pass
notify_timer = None
def notify_callback(notifying, characteristic):
global notify_timer
print('notification timer: ',notify_timer)
if notifying:
if notify_timer is None:
if sense: sense.set_pixel(0,0,255,255,0)
print('set yellow NOTIFY')
notify_timer = GLib.timeout_add_seconds(2, refresh_value_temperature_measurement, characteristic)
else:
# This shouldn't be needed and it is just protecting for the case in which NOTIFY and INDICATE are BOTH active
# and triggering notification, due to some clients allowing it (nRF Connect Android)
if sense: sense.set_pixel(0,0,255,191,0)
print('set yellow NOTIFYING ALREADY')
else:
if sense: sense.set_pixel(0,0,255,0,0)
print('set red NOT NOTIFY')
# This is just to match the check above for some clients. In reality it would happen as the callback would auto terminate when moving out of notification mode
# and the problem with clients like nRF Connect Android is that notifications might not get disabled at all.
GLib.source_remove(notify_timer)
notify_timer = None
def write_value_temperature_measurement(value, options):
print('WRITTEN a value: ', value)
def read_value_temperature_measurement(visual_feedback = True):
value = (random.random()*40)-10
# logger.debug('temperature measurement is {:.3f}\u00B0C'.format(value))
temp = encode_ieee11073(value) # bytearray(struct.pack('<f', value))
logger.debug('Temperature measurement parsed as {}{:.3f}\u00B0C{}'.format(
Fore.RED,
decode_ieee11073(temp),
Style.RESET_ALL
))
# logger.debug('generated {}'.format(temp.hex()))
temp = bytearray(struct.pack('<B', 0x00)) + temp # Using Celsius (0x00) or Fahrenheit (0x01)
if visual_feedback:
if sense: sense.set_pixel(5,5,255,255,0)
print('set yellow READ')
return temp
def read_value_temperature(visual_feedback = True):
value = random.randrange(-10, 30)
logger.debug('temperature is {}\u00B0C'.format(value))
if visual_feedback:
# sense.set_pixel(5,5,255,0,0)
# print('set red read')
if sense: sense.clear()
if sense: sense.show_message('{} C'.format(value))
return list(int(value).to_bytes(2, byteorder='little', signed=True))
def refresh_value_temperature_measurement(characteristic):
logger.debug('Refreshing characteristic: {}'.format(characteristic.get_path()))
new_value = read_value_temperature_measurement(visual_feedback=False)
characteristic.set_value(new_value)
return characteristic.is_notifying
def main(adapter_address):
"""Creation of peripheral"""
# Example of the output from read_value
# logger.debug('CPU temperature is {}\u00B0C'.format(
# int.from_bytes(read_value(), byteorder='little', signed=True)))
current_reading = read_value_temperature_measurement()
current_reading.pop(0) # Unit indication
# logger.debug('Temperature measurement parsed as {:.2f}\u00B0C'.format(decode_ieee11073(current_reading)))
current_reading = read_value_temperature()
# logger.debug('Temperature parsed as {}\u00B0C'.format(int.from_bytes(current_reading, byteorder='little', signed=True)))
# Create peripheral
# Appearance comes from : https://specificationrefs.bluetooth.com/assigned-values/Appearance%20Values.pdf
thing = peripheral.Peripheral(adapter_address,
local_name=deviceAdvertisedName,
appearance=0x00C0 )
# 0x0086 - wearable computer size of a watch
# 0x00C2 - smart watch, in documentation, not in XML
# 0x00C0 - generic watch
# Additions to advertisement (company, where it mimics Apple, and some additional data)
#thing.advert.manufacturer_data(MANUFACTURER_ID, [0x00])#bytes('demonstrator', 'utf-8'))
# None of the below works the current advertisement interfaces uses an array, see: https://github.com/hadess/bluez/blob/master/doc/advertising-api.txt
#thing.advert.include_tx_power = dbus.Boolean(True)
#thing.advert.Set('org.bluez.LEAdvertisement1', 'IncludeTxPower', dbus.Boolean(True))
# Monkey patching
thing.advert.props['org.bluez.LEAdvertisement1']['Includes'] = []
#Patching thing.advert.GetAll
# def add_includes(interface_name):
# print('CALLING THE PATCHED wrapper')
# @functools.wraps(func)
# def add_includes_decorator(func):
# print('CALLING THE PATCHED')
# response = func(interface_name)
# # Our patch
# response['Includes'] = self.props[interface_name]['Includes']
# return response
# return add_includes_decorator
# def add_includes(func):
# print('CALLING THE PATCHED wrapper')
# @functools.wraps(func)
# def wrapper_getall(interface_name):
# print('CALLING THE PATCHED')
# response = func(interface_name)
# # Our patch
# response['Includes'] = self.props[interface_name]['Includes']
# return response
# return wrapper_getall
# thing.advert.GetAll = add_includes(thing.advert.GetAll)
old_getall = Advertisement.GetAll
@dbus.service.method('org.freedesktop.DBus.Properties',
in_signature='s',
out_signature='a{sv}')
def new_getall(self, interface_name):
# print('CALLING THE PATCHED 2')
response = old_getall(self, interface_name)
# Our patch
if 'Includes' in self.props[interface_name]:
if self.props[interface_name]['Includes'] is not None:
response['Includes'] = self.props[interface_name]['Includes']
return response
Advertisement.GetAll = new_getall
# pprint(thing.advert.GetAll('org.bluez.LEAdvertisement1'))
thing.advert.Set('org.bluez.LEAdvertisement1', 'Includes', dbus.Array(["tx-power"], signature='s'))
# Customization on the adapter itself
thing.dongle.alias = deviceAdvertisedName
debug_dongle(thing.dongle)
# Adding a second advertisement (EVEN BEFORE the first one). They can be seen on nRF Connect iOS, not on Android (local Name needs to be added, otherwise is lost)
# But this doesn't do what I thought (merging records), so I'll try the scan response approach
if False:
second_adv = Advertisement(2, 'peripheral')
second_adv.local_name = deviceAdvertisedName
# Manufacturer data as second adv
second_adv.manufacturer_data(MANUFACTURER_ID, bytes('demonstrator', 'utf-8'))
thing.ad_manager.register_advertisement(second_adv, {})
pprint(second_adv.GetAll('org.bluez.LEAdvertisement1'))
# Register Agent? shamelessly copying from https://github.com/myronww/python-bluetooth/blob/master/bluetooth_pin_agent
# Combined with https://github.com/pauloborges/bluez/blob/master/test/simple-agent
bluez_obj = dbus.SystemBus().get_object(DBUS_BLUEZ_SERVICE_NAME, DBUS_BLUEZ_PATH)
agent = Agent(dbus.SystemBus(), DBUS_BLUEZ_CUSTOMAGENT_PATH)
agent_manager = dbus.Interface(bluez_obj, DBUS_BLUEZ_INTERFACE_AGENTMANAGER_NAME)
resp = agent_manager.RegisterAgent(DBUS_BLUEZ_CUSTOMAGENT_PATH, Agent.AGENT_PAIRING_CAPABILITIES)
logger.debug("Pairing agent registered. resp={}".format(resp))
# This shouldn't be needed
if True:
resp = agent_manager.RequestDefaultAgent(DBUS_BLUEZ_CUSTOMAGENT_PATH)
logger.debug("Pairing agent set as default. resp={}".format(resp))
# Callbacks
thing.on_connect = lambda device: on_connect(device, thing)
thing.on_disconnect = lambda selfaddr, peeraddr: on_disconnect(selfaddr, peeraddr, thing)
# Appearance comes from https://www.bluetooth.com/specifications/assigned-numbers/ , in particular from https://specificationrefs.bluetooth.com/assigned-values/Appearance%20Values.pdf
# Services can be officially adopted (16bit UUIDs) or custom services (128bit)
# Characteristic and descriptor 16bit UUIDs are now at https://btprodspecificationrefs.blob.core.windows.net/assigned-values/16-bit%20UUID%20Numbers%20Document.pdf
##### NOT SURE HOW ALL OF THESE WORK
if True:
# Adding services: Temperature Measurement
thing.add_service(srv_id=1, uuid=SERVICE_CUSTOM, primary=True)
# Adding characteristics
thing.add_characteristic(srv_id=1, chr_id=1, uuid=CHARACTERISTIC_TEMPERATURE_MEASUREMENT,
value=[], notifying=False,
flags=['indicate', 'read'],
read_callback=read_value_temperature_measurement,
write_callback=write_value_temperature_measurement,
notify_callback=notify_callback
)
# As a note: should NOT use 'indicate' and 'notify' simultaneously
#, as a bad behaved client will end up allowing NOT disabling them
# (such as nrf Connect Android when tapping notify and then indicate)
# nrF Connect iOS doesn't seem to update.
# nrF Connect Android updates for indicate only
# The Client Characteristic Configuration Descriptor is taken care of automatically (it seems)
# thing.add_descriptor(srv_id=1, chr_id=1, dsc_id=1, uuid=DESCRIPTOR_CLIENT_CHARACTERISTIC_CONFIGURATION,
# value=[0x03, 0x00],
# flags=['read','write'])
# Adding services: Temperature
if False:
thing.add_service(srv_id=2, uuid=SERVICE_TEMPERATURE, primary=False)
# Adding characteristics
thing.add_characteristic(srv_id=2, chr_id=1, uuid=CHARACTERISTIC_TEMPERATURE,
value=[], notifying=False,
flags=['secure-read'],
read_callback=read_value_temperature,
write_callback=None,
notify_callback=None
)
# Adding descriptors: CHARACTERISTIC_TEMPERATURE
thing.add_descriptor(srv_id=2, chr_id=1, dsc_id=1, uuid=DESCRIPTOR_CHARACTERISTIC_PRESENTATION_FORMAT,
value=[0x0E, 0xFE, 0x2F, 0x27, 0x01, 0x00, 0x00],
flags=['read'])
# See some explanation about Characteristic Presentation Format here https://www.bluetooth.com/wp-content/uploads/Sitecore-Media-Library/Gatt/Xml/Descriptors/org.bluetooth.descriptor.gatt.characteristic_presentation_format.xml
# format: 0xE0 = 16bit signed int / 0x14 = 32bit float
# exponent (as signed int): 0xFE = -2 , 0x00 = 0 (BUT 0xFE is required if using `2A6E` as a characteristic
# Unit : Celsius 0x272F
# namespace 0x01 -> Bluetooth SIG Assigned Numbers
# description 0x00
# Publish peripheral and start event loop
print('main loop is : {}'.format(thing.mainloop))
thing.publish()
# async def sense_input():
# while True:
# pass
def debug_dongle(dongle):
print('address: ', dongle.address)
print('name: ', dongle.name)
print('alias: ', dongle.alias)
print('powered: ', dongle.powered)
print('pairable: ', dongle.pairable)
print('pairable timeout: ', dongle.pairabletimeout)
print('discoverable: ', dongle.discoverable)
print('discoverable timeout: ', dongle.discoverabletimeout)
print('discovering: ', dongle.discovering)
print('Powered: ', dongle.powered)
if not dongle.powered:
dongle.powered = True
print('Now powered: ', dongle.powered)
# dongle.pairable = False
if __name__ == '__main__':
# Get the default adapter address and pass it to main
print(list(adapter.Adapter.available())[0].address)
print(adapter.list_adapters())
debug_dongle(adapter.Adapter(adapter.list_adapters()[0]))
main(list(adapter.Adapter.available())[0].address)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment