Skip to content

Instantly share code, notes, and snippets.

@dexter3k
Last active August 31, 2021 18:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dexter3k/1b3b1ec250d810c9535cdca410dea101 to your computer and use it in GitHub Desktop.
Save dexter3k/1b3b1ec250d810c9535cdca410dea101 to your computer and use it in GitHub Desktop.
Data collection python module.
import sys
import asyncio
import logging
import bleak
from bleak import BleakClient
# NORDIC_UART_CHARACTERISTIC_RX = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
NORDIC_UART_CHARACTERISTIC_TX = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
DATAS = []
def callback(sender: str, data: bytearray):
global DATAS
devices = list(filter(lambda x: x['name'] == sender, DATAS))
if len(devices) == 0:
device = {'name': sender, 'outs': []}
DATAS += [device]
else:
device = devices[0]
first_name = data[0:6].hex()
first_level = int(data[6:7].hex(), 16)
first_count = int(data[7:9].hex(), 16)
data = data[9:]
second_name = data[0:6].hex()
second_level = int(data[6:7].hex(), 16)
second_count = int(data[7:9].hex(), 16)
outs = list(filter(lambda x: x['name'] == first_name, device['outs']))
if len(outs) == 0:
out1 = {'name': first_name, 'count': first_count, 'level': first_level}
device['outs'] += [out1]
else:
out1 = outs[0]
out1['count'] = first_count
out1['level'] = first_level
outs = list(filter(lambda x: x['name'] == second_name, device['outs']))
if len(outs) == 0:
out1 = {'name': second_name, 'count': second_count, 'level': second_level}
device['outs'] += [out1]
else:
out1 = outs[0]
out1['count'] = second_count
out1['level'] = second_level
text = f'{sender} =>'
for x in device['outs']:
name = x["name"]
name = ':'.join([name[i:i+2] for i in range(0, len(name), 2)])
text += f' {name}/{x["count"]:04x}/{x["level"]:02x}'
print(text)
with open(sys.argv[1], "a") as file:
file.write(text + "\n")
async def run(address, debug=False):
log = logging.getLogger(__name__)
if debug:
import sys
log.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
log.addHandler(h)
while True:
try:
async with BleakClient(address, timeout=100.0) as client:
await client.start_notify(NORDIC_UART_CHARACTERISTIC_TX, lambda _, d: callback(address, d))
await asyncio.sleep(5000.0)
except (bleak.exc.BleakError, RuntimeError, AttributeError, asyncio.exceptions.TimeoutError):
# print(f"retrying {address}")
pass
if __name__ == "__main__":
if len(sys.argv) == 0:
print("Please provide output filename as argument")
else:
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(asyncio.gather(*[
# run(("d6:ac:aa:8b:57:20"), True), # development board tag
# run(("fd:33:ed:f7:b4:0b"), True), # tag with dead battery
run(("c2:b6:ed:fd:75:88"), True),
run(("c9:0c:d4:28:6f:32"), True),
run(("df:8f:d1:70:f5:8b"), True),
run(("e6:e0:fa:e2:0f:b9"), True),
run(("e7:bc:aa:46:fd:19"), True),
run(("f2:d8:b7:af:70:e1"), True),
run(("f8:7e:71:a8:43:4e"), True),
run(("fb:ef:a9:53:4b:43"), True)
]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment