Skip to content

Instantly share code, notes, and snippets.

@lubieowoce
Created March 9, 2020 13:33
Show Gist options
  • Save lubieowoce/66a300abc9d6f012ced6a60ab050573e to your computer and use it in GitHub Desktop.
Save lubieowoce/66a300abc9d6f012ced6a60ab050573e to your computer and use it in GitHub Desktop.
import asyncio
import bleak
# UART-over-BLE identifiers; each link is the source of the UUIDs below it.
# https://os.mbed.com/teams/Bluetooth-Low-Energy/wiki/UART-access-over-BLE
# https://www.mgsuperlabs.co.in/estore/IoT-pHAT-with-header-for-Raspberry-Pi
RBL_SERVICE_UUID = "713D0000-503E-4C75-BA94-3148F18D941E"
RBL_CHAR_RX_UUID = "713D0003-503E-4C75-BA94-3148F18D941E"

# https://learn.adafruit.com/introducing-adafruit-ble-bluetooth-low-energy-friend/uart-service
ADA_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
ADA_CHAR_TX_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"

# Both service UUIDs side by side, RBL first.
SERVICE_UUIDS = [RBL_SERVICE_UUID, ADA_SERVICE_UUID]
# Note: the second entry is the UUID this file names ADA_CHAR_TX_UUID.
RX_CHARACTERISTIC_UUIDS = [RBL_CHAR_RX_UUID, ADA_CHAR_TX_UUID]

# NOTE(review): TX, RX and CHUNK_SIZE are not referenced by the code visible
# in this file -- confirm their meaning wherever they are used.
TX = 0x0002
RX = 0x0003

# Size of the board grid in cells; encode_board relies on both values.
BOARD_WIDTH = 11
BOARD_HEIGHT = 18

CHUNK_SIZE = 20
# Text drawing of the board: one line per row, one whitespace-separated cell
# per position.  Cell legend:
# S - start
# P - move
# E - end
# . - empty (encode_board leaves these cells out of its output)
#
# NOTE(review): this drawing has 21 rows while BOARD_HEIGHT is 18 -- confirm
# which of the two is intended.
#
# The trailing [1:-1] drops the newline right after the opening quotes and the
# one right before the closing quotes.
BOARD = """
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . P . . . . . P . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. P . . . . . . . P .
. . P . . . . . P . .
. . . P P P P P . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
. . . . . . . . . . .
"""[1:-1]
def pulse(s):
    """Return *s* with every 'P' turned into 'S' and every 'S' into 'P'.

    Any '#' already present in *s* also comes out as 'S'.
    """
    swap_table = str.maketrans({'P': 'S', 'S': 'P', '#': 'S'})
    return s.translate(swap_table)
def encode_board(s: str) -> str:
    """Encode a text drawing of the board as an ``l#...#`` command string.

    Each line of *s* is one board row made of whitespace-separated cells;
    ``'.'`` marks an empty cell.  The first line maps to the highest-numbered
    row, so a cell at text line ``row`` and position ``col`` gets the index
    ``(BOARD_HEIGHT - 1 - row) * BOARD_WIDTH + col``.  Every non-empty cell is
    emitted as its text immediately followed by that index, in ascending
    index order, joined by commas, e.g. ``'l#S0,E197#'``.

    Rows or cells missing from a short drawing count as empty, and blank
    rows past BOARD_HEIGHT are ignored.

    Raises:
        ValueError: a row past BOARD_HEIGHT contains a non-empty cell.
        IndexError: a row within the board has more than BOARD_WIDTH cells.
    """
    # Start from an all-empty grid so positions the drawing never mentions
    # are skipped on output instead of surfacing as None.
    board = [['.'] * BOARD_WIDTH for _ in range(BOARD_HEIGHT)]
    for row, line in enumerate(s.splitlines()):
        cells = line.split()
        if row >= BOARD_HEIGHT:
            # BOARD_HEIGHT - row - 1 would be negative here and silently wrap
            # around onto the top rows, so surplus rows must not be written.
            if any(cell != '.' for cell in cells):
                raise ValueError(
                    f'row {row} is outside the {BOARD_HEIGHT}-row board '
                    f'but is not empty: {line!r}'
                )
            continue
        target = board[BOARD_HEIGHT - row - 1]
        for col, char in enumerate(cells):
            target[col] = char
    flat = [cell for board_row in board for cell in board_row]
    marked = ','.join(cell + str(i) for i, cell in enumerate(flat) if cell != '.')
    return 'l#' + marked + '#'
def run(loop, coro):
    """Drive *coro* to completion on *loop* and hand back its result."""
    outcome = loop.run_until_complete(coro)
    return outcome
def chunks(xs, n):
    """Yield the items of *xs* grouped into lists of *n* items each.

    The last list is shorter when the items do not divide evenly, and nothing
    is yielded for an empty *xs*.  *n* must be at least 1; this is checked
    with an assert when iteration starts.
    """
    assert n >= 1
    pending = []
    for item in xs:
        pending.append(item)
        if len(pending) != n:
            continue
        yield pending
        # Fresh list each time so previously yielded groups are not mutated.
        pending = []
    if len(pending) > 0:
        yield pending
async def find_moonboards():
    """Scan for BLE devices and keep those whose name contains 'MoonBoard'.

    Returns:
        A list (possibly empty) of the devices reported by
        ``bleak.discover()`` whose ``name`` contains ``'MoonBoard'``.
    """
    devices = await bleak.discover()
    # A discovered device is not guaranteed to have a name; `in` on a None
    # name raises TypeError and would abort the whole scan, so unnamed
    # devices are simply skipped.
    return [d for d in devices if d.name and 'MoonBoard' in d.name]
async def moonboard_pulse():
    """Find a MoonBoard over BLE and write one fixed command to it.

    Uses the first device returned by ``find_moonboards()``.  Progress is
    reported with ``print``; the function returns None whether or not the
    command was sent.
    """
    found = await find_moonboards()
    if not found:
        print('no moonboards found')
        return

    board = found[0]
    print('found', board)

    event_loop = asyncio.get_event_loop()
    connection = bleak.BleakClient(board.address, loop=event_loop, address_type='public')
    async with connection as client:
        # NOTE(review): entering the client's context manager presumably
        # connects already (and leaving it disconnects) -- confirm that the
        # explicit connect()/disconnect() calls are still needed.
        connected = await client.connect()
        if not connected:
            print('not connected')
            return
        print('connected')
        await client.write_gatt_char(ADA_CHAR_TX_UUID, b'l#S15,E20#')
        await client.disconnect()
async def moonboard_pulse2(**kwargs):
    """Write one fixed command to the board at a hard-coded BLE address.

    Args:
        **kwargs: forwarded unchanged to ``bleak.BleakClient``.

    Progress is reported with ``print``; the function returns None whether or
    not the command was sent.
    """
    address = 'C5:D7:BD:EA:B3:F3'.lower()
    event_loop = asyncio.get_event_loop()
    connection = bleak.BleakClient(address, loop=event_loop, **kwargs)
    async with connection as client:
        # NOTE(review): entering the client's context manager presumably
        # connects already (and leaving it disconnects) -- confirm that the
        # explicit connect()/disconnect() calls are still needed.
        connected = await client.connect()
        if not connected:
            print('not connected')
            return
        print('connected')
        await client.write_gatt_char(ADA_CHAR_TX_UUID, b'l#S15,E20#')
        await client.disconnect()
async def scan_and_get_services():
    """Scan for BLE devices and list the GATT services of each one.

    Prints the discovered devices, then connects to each in turn and prints
    its service UUIDs.  A failure on one device is recorded in the result
    instead of aborting the scan of the remaining devices.

    Returns:
        None when the scan finds no devices.  Otherwise a tuple
        ``(devices, device_services)`` where ``devices`` maps address to
        device and ``device_services`` maps address to a dict with:

        * ``'services'`` -- None until fetched, then a one-element list
          holding the object returned by ``client.get_services()``;
        * ``'error'`` -- None, or the exception raised for that device.
    """
    devices = await bleak.discover()
    if not devices:
        print('no devices')
        return
    print('devices:')
    print(*devices, sep='\n')
    devices = {dev.address: dev for dev in devices}
    loop = asyncio.get_event_loop()
    device_services = {}
    for dev in devices.values():
        entry = {'services': None, 'error': None}
        device_services[dev.address] = entry
        try:
            async with bleak.BleakClient(dev.address, loop=loop) as client:
                services = await client.get_services()
                # API notes kept from the original author:
                #   await client.write_gatt_char(_uuid: str, data: bytearray, response: bool = False) -> None
                #   await services.get_service(_uuid) -> bleak.backends.service.BleakGATTService
                #   await services.get_characteristic(_uuid) -> bleak.backends.characteristic.BleakGATTCharacteristic
                #   await service.get_characteristic(_uuid) -> Optional[bleak.backends.characteristic.BleakGATTCharacteristic]
                print(dev)
                # 'services' already exists with the value None, so
                # setdefault('services', []) would hand back None and the
                # following .append would raise AttributeError -- which the
                # handler below would then record as a device error.  Store
                # the list directly instead.
                entry['services'] = [services]
                for ser in services:
                    print(ser.uuid)
                print()
        except Exception as err:
            # Best effort: remember why this device failed and move on.
            entry['error'] = err
    return devices, device_services
# if __name__ == '__main__':
# from pprint import pprint
# loop = asyncio.get_event_loop()
# devices, device_services = loop.run_until_complete(scan_and_get_services())
# pprint(device_services)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment