Created
May 21, 2022 09:26
-
-
Save mhaberler/701eddf6ccd6200f747bbaabdd6f1fd8 to your computer and use it in GitHub Desktop.
Drive a DSD TECH BLE relay board from Python using bleak
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Connect by BLEDevice | |
""" | |
import asyncio | |
import platform | |
import sys | |
from bleak import BleakClient, BleakScanner | |
from bleak.exc import BleakError | |
from bleak.backends.device import BLEDevice | |
from bleak.backends.scanner import AdvertisementData | |
# GATT characteristic that main() subscribes to for notifications and writes
# relay command frames to.
RELAY_UUID = "0000ffe1-0000-1000-8000-00805f9b34fb"

# Not referenced by the code in this file; kept so the name still resolves.
XX_UUID = '0000ffe0-0000-1000-8000-00805f9b34fb'

# UUID that match_dsd() looks for among a device's advertised service UUIDs.
# It must stay distinct from RELAY_UUID: a second assignment used to rebind
# this name to the ffe1 characteristic value, so the advertisement check was
# comparing against the characteristic UUID instead of the ffe0 service UUID.
SVC_UUID = "0000ffe0-0000-1000-8000-00805f9b34fb"
# Channel 1 ON: A00101A2 | |
# Channel 1 OFF: A00100A1 | |
# Channel 2 ON: A00201A3 | |
# Channel 2 OFF: A00200A2 | |
# Channel 3 ON: A00301A4 | |
# Channel 3 OFF: A00300A3 | |
# Channel 4 ON: A00401A5 | |
# Channel 4 OFF: A00400A4 | |
# channel: 1..4 | |
# value: 0,1 | |
def cmd(channel: int, value) -> bytearray:
    """Build the 4-byte command frame that switches one relay channel.

    The frame is ``0xA0``, the channel number, the state, and a checksum
    byte equal to the sum of the first three bytes truncated to 8 bits
    (for example channel 1 ON is ``A0 01 01 A2``).

    Args:
        channel: Relay channel number (1..4 on this board).
        value: Desired state; converted with ``int()``, so ``True``/``1``
            means on and ``False``/``0`` means off.

    Returns:
        A ``bytearray`` containing the frame.
    """
    state = int(value)
    # Keep the checksum within one byte; without the mask a large channel
    # number would make bytearray() raise ValueError for a value above 255.
    checksum = (0xA0 + channel + state) & 0xFF
    return bytearray([0xA0, channel, state, checksum])
# Default device identifier, used when no address is given on the command
# line. Darwin gets a UUID-style identifier; every other platform gets a
# MAC-style address.
if platform.system() == "Darwin":
    # Alternative identifier tried earlier: "B9EA5233-37EF-4DD6-87A8-2A875E821C46"
    ADDRESS = "D1ABC82C-8B12-617A-00A0-1FD612307DD8"
else:
    ADDRESS = "24:71:89:cc:09:05"

# Substring looked for in a scanned device's name by match_dsd().
DSD = 'DSD TECH'

# First relay channel toggled by main().
channel = 1
def notification_handler(sender, data):
    """Print a received notification as ``<sender>: <data>``."""
    line = f"{sender}: {data}"
    print(line)
def match_dsd(device: BLEDevice, adv: AdvertisementData) -> bool:
    """Scanner filter: decide whether an advertising device is the relay.

    A device is accepted when ``SVC_UUID`` is among its advertised service
    UUIDs, or when its name contains the ``DSD`` marker string. Every
    advertisement seen is printed for diagnostics.

    Args:
        device: The device that sent the advertisement.
        adv: The advertisement payload for that device.

    Returns:
        ``True`` if the device matches either criterion, otherwise ``False``.
    """
    print(f"Ad: {device} {adv}")
    if SVC_UUID in adv.service_uuids:
        print("SVC_UUID match")
        return True
    # A device may advertise without a name; substitute an empty string so
    # the substring test cannot raise TypeError on None.
    name = device.name or ""
    if DSD in name:
        print(f"DSD TECH match, {adv.service_uuids=}")
        return True
    return False
async def main(ble_address: str) -> None:
    """Find the relay, connect to it, and toggle two relay channels forever.

    The device is discovered with ``match_dsd`` as a scan filter. After
    connecting, notifications on ``RELAY_UUID`` are routed to
    ``notification_handler`` and the relay state is flipped in an endless
    loop: channel ``channel`` first, then channel 2, with a 0.5 s pause
    after each write.

    Args:
        ble_address: Address of the intended device. Discovery is done by
            advertisement filter rather than by address, so this value is
            only reported in the error message.

    Raises:
        BleakError: If the scan finds no device accepted by ``match_dsd``.
    """
    device = await BleakScanner.find_device_by_filter(match_dsd)
    if not device:
        # The lookup is filter-based, so say so instead of claiming an
        # address lookup failed.
        raise BleakError(
            "No DSD TECH relay matched the scan filter "
            f"(requested address {ble_address} is not used for discovery)."
        )

    async with BleakClient(device) as client:
        print(f"Connected to: {client}")
        print(f"device: {device=}")

        await client.start_notify(RELAY_UUID, notification_handler)
        print(f"notification started: {client}")

        on = False
        while True:
            on = not on
            await client.write_gatt_char(RELAY_UUID, cmd(channel, on))
            await asyncio.sleep(0.5)
            await client.write_gatt_char(RELAY_UUID, cmd(2, on))
            await asyncio.sleep(0.5)
if __name__ == "__main__":
    # Exactly one command-line argument overrides the built-in ADDRESS;
    # any other argument count falls back to the default.
    cli_args = sys.argv[1:]
    if len(cli_args) == 1:
        target = cli_args[0]
    else:
        target = ADDRESS
    asyncio.run(main(target))
# try: | |
# loop=asyncio.get_event_loop() | |
# loop.run_until_complete(main()) | |
# except asyncio.CancelledError: | |
# # task is cancelled on disconnect, so we ignore this error | |
# pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment