Skip to content

Instantly share code, notes, and snippets.

@jsbain
Created June 24, 2018 05:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save jsbain/f734504daa44d0cb0621ba47c2caabb0 to your computer and use it in GitHub Desktop.
Save jsbain/f734504daa44d0cb0621ba47c2caabb0 to your computer and use it in GitHub Desktop.
multicb.py
import cb, threading
class Service(object):
'''wrapper for a cb.Service. Can index characteristics by
s[uuid] or s[index]
'''
def __init__(self,service):
self.primary=service.primary
self.uuid=service.uuid
self.characteristics={}
self.service=service
def update(self,s):
for c in s.characteristics:
self.characteristics[c.uuid]=Characteristic(c)
def __getitem__(self,key):
if isinstance(key,int):
return list(self.characteristics.values())[key]
else:
return self.characteristics[key]
class CharacteristicProperties(object):
'''Parse property into a string'''
def __init__(self,prop):
self.authenticated_signed_writes=bool(prop & cb.CH_PROP_AUTHENTICATED_SIGNED_WRITES)
self.broadcast=bool(prop&cb.CH_PROP_BROADCAST)
self.extended=bool(prop&cb.CH_PROP_EXTENDED_PROPERTIES)
self.indicate=bool(prop&cb.CH_PROP_INDICATE)
self.indicate_encryption=bool(prop&cb.CH_PROP_INDICATE_ENCRYPTION_REQUIRED)
self.notify=bool(prop&cb.CH_PROP_NOTIFY)
self.notify_encryption=bool(prop&cb.CH_PROP_NOTIFY_ENCRYPTION_REQUIRED)
self.read=bool(prop&cb.CH_PROP_READ)
self.write=bool(prop&cb.CH_PROP_WRITE)
self.write_noresponse=bool(prop&cb.CH_PROP_WRITE_WITHOUT_RESPONSE)
def __repr__(self):
return ''.join(['r' * self.read,
'w' * self.write,
'W' * self.write_noresponse,
'n' * self.notify,
'N' * self.notify_encryption,
'i' * self.indicate,
'I' * self.indicate_encryption,
'b' * self.broadcast,
'x' * self.extended,
'a' * self.authenticated_signed_writes])
class Characteristic(object):
def __init__(self,c):
self.characteristic=c
self.notifying=c.notifying
self.properties=CharacteristicProperties(c.properties)
self.uuid=c.uuid
self.value=c.value
def update_value(self,c):
self.notifying=c.notifying
self.properties=CharacteristicProperties(c.properties)
self.uuid=c.uuid
self.value=c.value
#PERIPHERAL STATES
DISCOVERED=0
CONNECTING=1
CONNECTED=2
class Peripheral(object):
'''
Simple wrapper for a peripheral, that is actually kept up to date.
p[service][char] returns last read value of service/characteristic
'''
def __init__(self, p):
self.peripheral=p
self.name=p.name
self.services={}
self.uuid=p.uuid
self.state=p.state
self.connectionstate=DISCOVERED
def __getitem__(self,key):
'''return the service indicated by key, which can either be uuid, or index'''
if isinstance(key,int):
return list(self.services.values())[key]
else:
return self.services[key]
def found_services(self,p):
for s in p.services:
if s.uuid not in self.services:
self.services[s.uuid]=Service(s)
def connect(self):
cb.connect_peripheral(self.peripheral)
self.connectionstate=CONNECTING
def __repr__(p):
r=''
r+=f'Peripheral: {p.name} {p.uuid} {p.state}\n'
if p.connectionstate==DISCOVERED:
r+='\tDiscovered\n'
if p.connectionstate==CONNECTING:
r+='\tConnecting\n'
if p.connectionstate==CONNECTED:
r+='\tConnected\n'
for s in p.services.values():
r+=f'\tService: {s.uuid} {"Primary" * s.primary}\n'
for c in s.characteristics.values():
r+=f'\t\t: {c.uuid} {c.properties} {"Notifying"*c.notifying} = {c.value}\n'
return r
def run_async(func):
''' simple thread decorator'''
from threading import Thread
from functools import wraps
@wraps(func)
def async_func(*args, **kwargs):
func_hl = Thread(target = func, args = args, kwargs = kwargs)
func_hl.start()
return func_hl
return async_func
timeout=1.0
class MultiManager(object):
def __init__(self):
self.peripherals={}
self.evt = threading.Event()
def scan(self):
cb.set_central_delegate(self)
cb.set_verbose(False)
try:
cb.scan_for_peripherals()
except KeyboardInterrupt:
cb.stop_scan()
@run_async
def update_all(self):
'''cycle through each known characteristic, and read the value.
try to read it, waiting for did_update_value so that we can keeptrack of what is happening
'''
for p in self.peripherals.values():
for s in p.services.values():
for c in s.characteristics.values():
self.current_char_read=c
p.peripheral.read_characteristic_value(c.characteristic)
read=self.evt.wait(timeout)
self.evt.clear()
if not read:
print(f'*** {p.name} {p.uuid} could not read {c.uuid}')
def did_fail_to_connect_peripheral(self, p, error):
if p.uuid in self.peripherals:
self.peripherals[p.uuid].connectionstate=DISCOVERED
self.peripherals[p.uuid].state=p.state
print('Failed to connect: %s' % (error,))
def did_disconnect_peripheral(self, p, error):
if p.uuid in self.peripherals:
print('Disconnected, error: %s' % (error,))
self.peripherals[p.uuid].connectionstate=DISCOVERED
self.peripherals[p.uuid].state=p.state
def did_discover_services(self,p,error):
print(f'discovered {p.name} {p.uuid} services {len(p.services)}')
self.peripherals[p.uuid].found_services(p)
for s in p.services:
if len(self[p.uuid][s.uuid].characteristics)==0:
print(f'discovering characteristics for {s.uuid}')
p.discover_characteristics(s)
def did_discover_peripheral(self,p):
if p.uuid not in self.peripherals:
print(f'discovered {p.name} {p.uuid}')
self.peripherals[p.uuid]=Peripheral(p)
self.peripherals[p.uuid].connect()
def did_connect_peripheral(self,p):
print(f'connected {p.uuid}')
self.peripherals[p.uuid].state=p.state
self.peripherals[p.uuid].connectionstate=CONNECTED
p.discover_services()
def did_discover_characteristics(self,s,error):
print(f'discovered characteristics {s.uuid}')
for p in self.peripherals.values():
if s.uuid in p.services:
p[s.uuid].update(s)
def did_update_value(self,c,error):
#print(f'Updated value {c.uuid} {c.value}')
self.current_char_read.update_value(c)
self.evt.set()
def __repr__(self):
r=''
for p in self.peripherals.values():
r+=p.__repr__()
return r
def __getitem__(self,index):
''' return peripheral by index or uuid'''
if isinstance(index,str):
return self.peripherals[index]
elif isinstance(index,int):
return list(self.peripherals.values())[index]
m=MultiManager()
m.scan()
import time
m.stop=False
def stop():
m.stop=True
@run_async
def check_status():
while True:
if m.stop:
return
if len(m.peripherals)>0:
print(f'***polling characteristics for {len(m.peripherals)} peripherals***')
m.update_all()
time.sleep(3)
print(m)
time.sleep(3)
check_status()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment