Skip to content

Instantly share code, notes, and snippets.

@AustinBrunkhorst
Created April 23, 2021 21:47
Show Gist options
  • Save AustinBrunkhorst/df443cc58edf2c188da95280bd019736 to your computer and use it in GitHub Desktop.
Save AustinBrunkhorst/df443cc58edf2c188da95280bd019736 to your computer and use it in GitHub Desktop.
SNOOZ Command Line Controller
import asyncio
import logging
from datetime import datetime
from threading import RLock
import bluepy.btle
# uuid of the service that controls snooz
SNOOZ_SERVICE_UUID = "729f0608496a47fea1243a62aaa3fbc0"
# uuid of the characteristic that reads snooz state
READ_STATE_UUID = "80c37f00-cc16-11e4-8830-0800200c9a66"
# uuid of the characteristic that writes snooz state
WRITE_STATE_UUID = "90759319-1668-44da-9ef3-492d593bd1e5"
# sequence of bytes to write when connected to the device
CONNECTION_SEQUENCE = [
[0x07],
[0x06, 0xae, 0xf6, 0x74, 0x5a, 0x01, 0x18, 0x02, 0x24],
[0x07],
[0x0b, 0x31, 0xd2, 0x08, 0x00],
[0x0c],
[0x10],
]
# length in bytes of the read characteristic
STATE_UPDATE_LENGTH = 20
# bytes that turn on the snooz
COMMAND_TURN_ON = [0x02, 0x01]
# bytes that turn off the snooz
COMMAND_TURN_OFF = [0x02, 0x00]
# interval to retry connecting
CONNECTION_RETRY_INTERVAL = 5
# timeout for waiting on notifications
NOTIFICATION_TIMEOUT = 15
# maximum age for a state update job
MAX_QUEUED_STATE_AGE = 10
# maximum number of queued items
MAX_QUEUED_STATE_COUNT = 2
class SnoozeDevice():
on = False
percentage = 0
connected = False
_running = False
_run_task = None
_sleep_task = None
_on_state_change = None
_state_lock = None
_sleep_lock = None
_queued_state = []
def __init__(self, loop, on_state_change):
self._state_lock = RLock()
self._sleep_lock = RLock()
self.loop = loop
self._on_state_change = on_state_change
self._device = bluepy.btle.Peripheral()
def stop(self):
self._running = False
if self._run_task is not None and not self._run_task.cancelled():
self._run_task.cancel()
self._run_task = None
if self.connected and self._device:
try:
self._device.disconnect()
except:
pass
self._cancel_sleep()
def start(self, address: str):
self._running = True
self._run_task = self.loop.create_task(self._async_run(address))
async def _async_run(self, address: str):
print("[listening for {}]".format(address))
while True:
if not self._running:
return
try:
if self.connected:
self._flush_queued_state()
self._update_state()
else:
self._device.connect(address)
self._init_connection()
except bluepy.btle.BTLEDisconnectError:
self._on_disconnected()
finally:
self._flush_queued_state()
await self._sleep()
def set_on(self, on: bool):
self._write_state(COMMAND_TURN_ON if on else COMMAND_TURN_OFF)
def set_percentage(self, percentage: int):
if percentage < 0 or percentage > 100:
raise Exception("Invalid percentage {}".format(percentage))
self._write_state([0x01, percentage])
def queue_state(self, state_writer: callable):
with self._state_lock:
self._queued_state.append((datetime.now(), state_writer))
self._cancel_sleep()
def _flush_queued_state(self):
while self.connected:
task = None
with self._state_lock:
now = datetime.now()
# remove stale jobs
queued_state = [
(created, task) for (created, task) in self._queued_state
if (now - created).seconds <= MAX_QUEUED_STATE_AGE
][-MAX_QUEUED_STATE_COUNT:]
if len(queued_state) == 0:
return
(_, task) = queued_state[0]
self._queued_state = queued_state
if task:
task()
if self.connected:
with self._state_lock:
self._queued_state.pop(0)
def _init_connection(self):
self.connected = True
service = self._device.getServiceByUUID(SNOOZ_SERVICE_UUID)
self._reader = service.getCharacteristics(READ_STATE_UUID)[0]
self._writer = service.getCharacteristics(WRITE_STATE_UUID)[0]
for bytes in CONNECTION_SEQUENCE:
self._write_state(bytes)
# load initial state
self._update_state()
def _on_disconnected(self):
print("[disconnected]")
was_connected = self.connected
self.connected = False
self._reader = None
self._writer = None
if was_connected:
self._on_state_change()
async def _sleep(self):
seconds = NOTIFICATION_TIMEOUT if self.connected else CONNECTION_RETRY_INTERVAL
with self._state_lock:
# use a small timeout when we have queued jobs
if len(self._queued_state) > 0:
seconds = 0.5
with self._sleep_lock:
self._sleep_task = self.loop.create_task(asyncio.sleep(seconds))
try:
await self._sleep_task
except asyncio.CancelledError:
pass
with self._sleep_lock:
self._sleep_task = None
def _cancel_sleep(self):
with self._sleep_lock:
if self._sleep_task:
self._sleep_task.cancel()
self._sleep_task = None
def _update_state(self):
if self.connected and self._reader:
try:
self._on_receive_state(self._reader.read())
except:
self._on_disconnected()
def _on_receive_state(self, data: bytearray):
# malformed or unexpected data
if len(data) != STATE_UPDATE_LENGTH:
return
on = data[1] == 0x01
percentage = data[0]
# state not changed
if on == self.on and percentage == self.percentage:
return
self.on = on
self.percentage = percentage
self._on_state_change()
def _write_state(self, state: list):
if self._writer == None:
return
try:
self._writer.write(bytearray(state), withResponse=True)
except:
self._on_disconnected()
import asyncio as aio
@aio.coroutine
def main(loop):
def on_state_change():
print("[state change]: connected = {}, on = {}, speed = {}".format(device.connected, device.on, device.percentage))
address = yield from loop.run_in_executor(None, input, "SNOOZ Bluetooth Address: ")
device = SnoozeDevice(loop, on_state_change)
device.start(address)
while True:
command = yield from loop.run_in_executor(None, input, "[quit | on | off | set {0-100}]:\n")
if command == "quit":
break
elif command == "":
continue
elif command == "on":
print("[turning on]")
def turn_on():
device.set_on(True)
device.queue_state(turn_on)
elif command == "off":
print("[turning off]")
def turn_off():
device.set_on(False)
device.queue_state(turn_off)
elif command.startswith("set "):
percentage = int(command.replace("set ", ""), base=10)
print("[setting speed to {}]".format(percentage))
def set_level():
device.set_percentage(percentage)
device.queue_state(set_level)
device.stop()
loop = aio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment