Skip to content

Instantly share code, notes, and snippets.

@mhaberler
Created May 21, 2022 09:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mhaberler/701eddf6ccd6200f747bbaabdd6f1fd8 to your computer and use it in GitHub Desktop.
Save mhaberler/701eddf6ccd6200f747bbaabdd6f1fd8 to your computer and use it in GitHub Desktop.
drive DSD Tech BLE Relay via Python & bleak
"""
Connect by BLEDevice
"""
import asyncio
import platform
import sys
from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
RELAY_UUID = "0000ffe1-0000-1000-8000-00805f9b34fb"
XX_UUID = '0000ffe0-0000-1000-8000-00805f9b34fb'
SVC_UUID = "0000ffe0-0000-1000-8000-00805f9b34fb"
SVC_UUID ="0000ffe1-0000-1000-8000-00805f9b34fb"
# Channel 1 ON: A00101A2
# Channel 1 OFF: A00100A1
# Channel 2 ON: A00201A3
# Channel 2 OFF: A00200A2
# Channel 3 ON: A00301A4
# Channel 3 OFF: A00300A3
# Channel 4 ON: A00401A5
# Channel 4 OFF: A00400A4
# channel: 1..4
# value: 0,1
def cmd(channel, value):
return bytearray([0xA0, channel, int(value), 0xA0 + channel + int(value)])
ADDRESS = (
"24:71:89:cc:09:05"
if platform.system() != "Darwin"
# else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"
else "D1ABC82C-8B12-617A-00A0-1FD612307DD8"
)
channel = 1
DSD = 'DSD TECH'
def notification_handler(sender, data):
"""Simple notification handler which prints the data received."""
print("{0}: {1}".format(sender, data))
def match_dsd(device: BLEDevice, adv: AdvertisementData):
print(f"Ad: {device} {adv}")
if SVC_UUID in adv.service_uuids:
print("SVC_UUID match")
return True
if DSD in device.name:
print(f"DSD TECH match, {adv.service_uuids=}")
return True
async def main(ble_address: str):
# device = await BleakScanner.find_device_by_address(ble_address, timeout=20.0)
device = await BleakScanner.find_device_by_filter(match_dsd)
if not device:
raise BleakError(f"A device with address {ble_address} could not be found.")
async with BleakClient(device) as client:
print(f"Connected to: {client}")
print(f"device: {device=}")
# paired = await client.pair(protection_level=1)
# print(f"Paired with Ball: {paired}")
await client.start_notify(#client.service_uuids[0], notification_handler)
RELAY_UUID, notification_handler
) # ,force_indicate=True)
print(f"notification started: {client}")
on = False
while True:
on = not on
await client.write_gatt_char(RELAY_UUID, cmd(channel, on))
await asyncio.sleep(0.5)
await client.write_gatt_char(RELAY_UUID, cmd(2, on))
await asyncio.sleep(0.5)
if __name__ == "__main__":
asyncio.run(main(sys.argv[1] if len(sys.argv) == 2 else ADDRESS))
# try:
# loop=asyncio.get_event_loop()
# loop.run_until_complete(main())
# except asyncio.CancelledError:
# # task is cancelled on disconnect, so we ignore this error
# pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment