# multicb.py
# Gist: jsbain/f734504daa44d0cb0621ba47c2caabb0
# Created June 24, 2018 05:57
import threading
import time

import cb
class Service(object):
    '''Thin wrapper around a cb.Service.

    Wrapped characteristics live in self.characteristics (uuid -> wrapper)
    and can be fetched with s[uuid] or s[index].
    '''

    def __init__(self, service):
        # Keep the underlying service object next to the copied fields.
        self.service = service
        self.characteristics = {}
        self.primary = service.primary
        self.uuid = service.uuid

    def update(self, s):
        '''Wrap each characteristic of s and store it under its uuid.'''
        self.characteristics.update(
            (raw.uuid, Characteristic(raw)) for raw in s.characteristics
        )

    def __getitem__(self, key):
        '''Return a characteristic by position (int key) or by uuid.'''
        if not isinstance(key, int):
            return self.characteristics[key]
        ordered = list(self.characteristics.values())
        return ordered[key]
class CharacteristicProperties(object):
    '''Split a characteristic property bitmask into boolean attributes.

    repr() gives a compact string with one letter per flag that is set.
    '''

    def __init__(self, prop):
        def has(mask):
            # True when the given cb.CH_PROP_* bit is present in prop.
            return bool(prop & mask)

        self.authenticated_signed_writes = has(cb.CH_PROP_AUTHENTICATED_SIGNED_WRITES)
        self.broadcast = has(cb.CH_PROP_BROADCAST)
        self.extended = has(cb.CH_PROP_EXTENDED_PROPERTIES)
        self.indicate = has(cb.CH_PROP_INDICATE)
        self.indicate_encryption = has(cb.CH_PROP_INDICATE_ENCRYPTION_REQUIRED)
        self.notify = has(cb.CH_PROP_NOTIFY)
        self.notify_encryption = has(cb.CH_PROP_NOTIFY_ENCRYPTION_REQUIRED)
        self.read = has(cb.CH_PROP_READ)
        self.write = has(cb.CH_PROP_WRITE)
        self.write_noresponse = has(cb.CH_PROP_WRITE_WITHOUT_RESPONSE)

    def __repr__(self):
        # (letter, flag) pairs in output order; letter * flag yields the
        # letter when the flag is True and '' when it is False.
        letters = (
            ('r', self.read),
            ('w', self.write),
            ('W', self.write_noresponse),
            ('n', self.notify),
            ('N', self.notify_encryption),
            ('i', self.indicate),
            ('I', self.indicate_encryption),
            ('b', self.broadcast),
            ('x', self.extended),
            ('a', self.authenticated_signed_writes),
        )
        return ''.join(letter * flag for letter, flag in letters)
class Characteristic(object):
    '''Cached copy of a characteristic's fields plus the wrapped object.'''

    def __init__(self, c):
        self.characteristic = c
        # The initial field values are copied exactly as a later refresh would.
        self.update_value(c)

    def update_value(self, c):
        '''Re-copy notifying, properties, uuid and value from c.'''
        self.notifying = c.notifying
        self.properties = CharacteristicProperties(c.properties)
        self.uuid = c.uuid
        self.value = c.value
# PERIPHERAL STATES
# Values stored in Peripheral.connectionstate; this is tracked separately
# from the `state` attribute copied from the wrapped peripheral.
DISCOVERED = 0
CONNECTING = 1
CONNECTED = 2


class Peripheral(object):
    '''
    Simple wrapper for a peripheral, that is actually kept up to date.
    p[service] returns a wrapped service by uuid or index, and
    p[service][char] returns the wrapped characteristic, whose .value
    holds the last value copied into it.
    '''

    def __init__(self, p):
        self.peripheral = p
        self.name = p.name
        self.services = {}
        self.uuid = p.uuid
        self.state = p.state
        self.connectionstate = DISCOVERED

    def __getitem__(self, key):
        '''return the service indicated by key, which can either be uuid, or index'''
        if isinstance(key, int):
            return list(self.services.values())[key]
        return self.services[key]

    def found_services(self, p):
        '''Wrap every service of p that is not already known; existing wrappers are kept.'''
        for s in p.services:
            if s.uuid not in self.services:
                self.services[s.uuid] = Service(s)

    def connect(self):
        '''Ask cb to connect to the wrapped peripheral and mark it CONNECTING.

        The state is set before the request is issued so that a delegate
        callback that runs before this method returns cannot have the state
        it records overwritten by CONNECTING. If the request itself raises,
        the previous state is restored and the exception propagates.
        '''
        previous = self.connectionstate
        self.connectionstate = CONNECTING
        try:
            cb.connect_peripheral(self.peripheral)
        except Exception:
            self.connectionstate = previous
            raise

    def __repr__(self):
        '''Multi-line summary: peripheral, connection state, services, characteristics.'''
        labels = {
            DISCOVERED: '\tDiscovered\n',
            CONNECTING: '\tConnecting\n',
            CONNECTED: '\tConnected\n',
        }
        parts = [f'Peripheral: {self.name} {self.uuid} {self.state}\n']
        # An unrecognised connectionstate contributes no line.
        parts.append(labels.get(self.connectionstate, ''))
        for s in self.services.values():
            parts.append(f'\tService: {s.uuid} {"Primary" * s.primary}\n')
            for c in s.characteristics.values():
                parts.append(
                    f'\t\t: {c.uuid} {c.properties} {"Notifying"*c.notifying} = {c.value}\n'
                )
        return ''.join(parts)
def run_async(func):
    '''Decorator: each call starts func in a new thread and returns that Thread.'''
    import functools
    import threading

    @functools.wraps(func)
    def launcher(*args, **kwargs):
        worker = threading.Thread(target=func, args=args, kwargs=kwargs)
        worker.start()
        # The caller gets the started thread, e.g. to join() it.
        return worker

    return launcher
# Seconds update_all() waits for did_update_value after requesting each read.
timeout = 1.0


class MultiManager(object):
    '''Keeps Peripheral wrappers for every peripheral seen while scanning.

    scan() registers this object as the cb central delegate; the did_*
    methods are the delegate callbacks and update self.peripherals
    (uuid -> Peripheral). m[uuid] or m[index] returns a wrapped peripheral.
    '''

    def __init__(self):
        self.peripherals = {}
        # Set by did_update_value to wake the update_all thread.
        self.evt = threading.Event()
        # Characteristic wrapper whose read is currently pending (None if idle).
        self.current_char_read = None

    def scan(self):
        '''Register as central delegate and start scanning for peripherals.'''
        cb.set_central_delegate(self)
        cb.set_verbose(False)
        try:
            cb.scan_for_peripherals()
        except KeyboardInterrupt:
            cb.stop_scan()

    @run_async
    def update_all(self):
        '''cycle through each known characteristic, and read the value.
        try to read it, waiting for did_update_value so that we can keep track of what is happening
        '''
        # Iterate over snapshots: delegate callbacks add peripherals,
        # services and characteristics while this thread is running, and
        # iterating the live dicts could raise "dictionary changed size
        # during iteration".
        for p in list(self.peripherals.values()):
            for s in list(p.services.values()):
                for c in list(s.characteristics.values()):
                    # Clear before requesting the read so that a reply that
                    # arrived after an earlier timeout cannot satisfy this wait.
                    self.evt.clear()
                    self.current_char_read = c
                    p.peripheral.read_characteristic_value(c.characteristic)
                    read = self.evt.wait(timeout)
                    if not read:
                        print(f'*** {p.name} {p.uuid} could not read {c.uuid}')
        # No read is pending any more; late replies are ignored.
        self.current_char_read = None

    def did_fail_to_connect_peripheral(self, p, error):
        '''Put a known peripheral back into the DISCOVERED state and report the error.'''
        if p.uuid in self.peripherals:
            self.peripherals[p.uuid].connectionstate = DISCOVERED
            self.peripherals[p.uuid].state = p.state
        print('Failed to connect: %s' % (error,))

    def did_disconnect_peripheral(self, p, error):
        '''Put a known peripheral back into the DISCOVERED state after a disconnect.'''
        if p.uuid in self.peripherals:
            print('Disconnected, error: %s' % (error,))
            self.peripherals[p.uuid].connectionstate = DISCOVERED
            self.peripherals[p.uuid].state = p.state

    def did_discover_services(self, p, error):
        '''Record newly found services and discover characteristics for the empty ones.'''
        print(f'discovered {p.name} {p.uuid} services {len(p.services)}')
        known = self.peripherals[p.uuid]
        known.found_services(p)
        for s in p.services:
            if not known.services[s.uuid].characteristics:
                print(f'discovering characteristics for {s.uuid}')
                p.discover_characteristics(s)

    def did_discover_peripheral(self, p):
        '''Wrap a peripheral the first time it is seen and start connecting to it.'''
        if p.uuid not in self.peripherals:
            print(f'discovered {p.name} {p.uuid}')
            self.peripherals[p.uuid] = Peripheral(p)
            self.peripherals[p.uuid].connect()

    def did_connect_peripheral(self, p):
        '''Mark the peripheral CONNECTED and start service discovery.'''
        print(f'connected {p.uuid}')
        self.peripherals[p.uuid].state = p.state
        self.peripherals[p.uuid].connectionstate = CONNECTED
        p.discover_services()

    def did_discover_characteristics(self, s, error):
        '''Store the characteristics of s on the matching service wrapper(s).'''
        print(f'discovered characteristics {s.uuid}')
        candidates = [p.services[s.uuid]
                      for p in list(self.peripherals.values())
                      if s.uuid in p.services]
        # The callback does not say which peripheral s belongs to, and
        # several peripherals can expose the same service uuid. Prefer the
        # wrapper that was built from this exact service object.
        # NOTE(review): whether cb passes the same object here is not
        # visible in this file; when it does not, every same-uuid service
        # is updated, as before.
        exact = [w for w in candidates if w.service is s]
        for wrapper in exact or candidates:
            wrapper.update(s)

    def did_update_value(self, c, error):
        '''Copy a freshly read value into the pending wrapper and wake update_all.'''
        #print(f'Updated value {c.uuid} {c.value}')
        pending = getattr(self, 'current_char_read', None)
        # Ignore updates when no read is pending, or when they belong to a
        # different characteristic than the pending one (for example a
        # reply to a read that already timed out); copying them would put
        # another characteristic's uuid and value into this wrapper.
        if pending is None or pending.uuid != c.uuid:
            return
        pending.update_value(c)
        self.evt.set()

    def __repr__(self):
        '''Concatenated repr of every known peripheral.'''
        return ''.join(repr(p) for p in list(self.peripherals.values()))

    def __getitem__(self, index):
        ''' return peripheral by index or uuid'''
        # Same convention as Peripheral and Service: an int is a position,
        # anything else is looked up as a uuid key (KeyError if unknown).
        if isinstance(index, int):
            return list(self.peripherals.values())[index]
        return self.peripherals[index]
# Module-level manager: scanning starts as soon as the script runs, and `m`
# stays available at module level for inspection.
m = MultiManager()
m.scan()
m.stop = False


def stop():
    '''Ask the check_status loop to exit at its next iteration.'''
    m.stop = True


@run_async
def check_status():
    '''Poll all characteristics periodically and print the manager state.

    Runs in its own thread until stop() is called. A new update_all poll is
    only started once the previous one has finished, because concurrent
    polls would share the manager's event and current_char_read.
    '''
    poller = None
    while not m.stop:
        if m.peripherals:
            if poller is None or not poller.is_alive():
                print(f'***polling characteristics for {len(m.peripherals)} peripherals***')
                poller = m.update_all()
            time.sleep(3)
            print(m)
        # Always sleep at loop level so an empty peripheral list cannot
        # turn this into a busy loop.
        time.sleep(3)


check_status()
# (GitHub page footer: sign up or sign in to comment on this gist.)