Created
April 23, 2021 21:47
-
-
Save AustinBrunkhorst/df443cc58edf2c188da95280bd019736 to your computer and use it in GitHub Desktop.
SNOOZ Command Line Controller
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
from datetime import datetime | |
from threading import RLock | |
import bluepy.btle | |
# uuid of the service that controls snooz | |
SNOOZ_SERVICE_UUID = "729f0608496a47fea1243a62aaa3fbc0" | |
# uuid of the characteristic that reads snooz state | |
READ_STATE_UUID = "80c37f00-cc16-11e4-8830-0800200c9a66" | |
# uuid of the characteristic that writes snooz state | |
WRITE_STATE_UUID = "90759319-1668-44da-9ef3-492d593bd1e5" | |
# sequence of bytes to write when connected to the device | |
CONNECTION_SEQUENCE = [ | |
[0x07], | |
[0x06, 0xae, 0xf6, 0x74, 0x5a, 0x01, 0x18, 0x02, 0x24], | |
[0x07], | |
[0x0b, 0x31, 0xd2, 0x08, 0x00], | |
[0x0c], | |
[0x10], | |
] | |
# length in bytes of the read characteristic | |
STATE_UPDATE_LENGTH = 20 | |
# bytes that turn on the snooz | |
COMMAND_TURN_ON = [0x02, 0x01] | |
# bytes that turn off the snooz | |
COMMAND_TURN_OFF = [0x02, 0x00] | |
# interval to retry connecting | |
CONNECTION_RETRY_INTERVAL = 5 | |
# timeout for waiting on notifications | |
NOTIFICATION_TIMEOUT = 15 | |
# maximum age for a state update job | |
MAX_QUEUED_STATE_AGE = 10 | |
# maximum number of queued items | |
MAX_QUEUED_STATE_COUNT = 2 | |
class SnoozeDevice(): | |
on = False | |
percentage = 0 | |
connected = False | |
_running = False | |
_run_task = None | |
_sleep_task = None | |
_on_state_change = None | |
_state_lock = None | |
_sleep_lock = None | |
_queued_state = [] | |
def __init__(self, loop, on_state_change): | |
self._state_lock = RLock() | |
self._sleep_lock = RLock() | |
self.loop = loop | |
self._on_state_change = on_state_change | |
self._device = bluepy.btle.Peripheral() | |
def stop(self): | |
self._running = False | |
if self._run_task is not None and not self._run_task.cancelled(): | |
self._run_task.cancel() | |
self._run_task = None | |
if self.connected and self._device: | |
try: | |
self._device.disconnect() | |
except: | |
pass | |
self._cancel_sleep() | |
def start(self, address: str): | |
self._running = True | |
self._run_task = self.loop.create_task(self._async_run(address)) | |
async def _async_run(self, address: str): | |
print("[listening for {}]".format(address)) | |
while True: | |
if not self._running: | |
return | |
try: | |
if self.connected: | |
self._flush_queued_state() | |
self._update_state() | |
else: | |
self._device.connect(address) | |
self._init_connection() | |
except bluepy.btle.BTLEDisconnectError: | |
self._on_disconnected() | |
finally: | |
self._flush_queued_state() | |
await self._sleep() | |
def set_on(self, on: bool): | |
self._write_state(COMMAND_TURN_ON if on else COMMAND_TURN_OFF) | |
def set_percentage(self, percentage: int): | |
if percentage < 0 or percentage > 100: | |
raise Exception("Invalid percentage {}".format(percentage)) | |
self._write_state([0x01, percentage]) | |
def queue_state(self, state_writer: callable): | |
with self._state_lock: | |
self._queued_state.append((datetime.now(), state_writer)) | |
self._cancel_sleep() | |
def _flush_queued_state(self): | |
while self.connected: | |
task = None | |
with self._state_lock: | |
now = datetime.now() | |
# remove stale jobs | |
queued_state = [ | |
(created, task) for (created, task) in self._queued_state | |
if (now - created).seconds <= MAX_QUEUED_STATE_AGE | |
][-MAX_QUEUED_STATE_COUNT:] | |
if len(queued_state) == 0: | |
return | |
(_, task) = queued_state[0] | |
self._queued_state = queued_state | |
if task: | |
task() | |
if self.connected: | |
with self._state_lock: | |
self._queued_state.pop(0) | |
def _init_connection(self): | |
self.connected = True | |
service = self._device.getServiceByUUID(SNOOZ_SERVICE_UUID) | |
self._reader = service.getCharacteristics(READ_STATE_UUID)[0] | |
self._writer = service.getCharacteristics(WRITE_STATE_UUID)[0] | |
for bytes in CONNECTION_SEQUENCE: | |
self._write_state(bytes) | |
# load initial state | |
self._update_state() | |
def _on_disconnected(self): | |
print("[disconnected]") | |
was_connected = self.connected | |
self.connected = False | |
self._reader = None | |
self._writer = None | |
if was_connected: | |
self._on_state_change() | |
async def _sleep(self): | |
seconds = NOTIFICATION_TIMEOUT if self.connected else CONNECTION_RETRY_INTERVAL | |
with self._state_lock: | |
# use a small timeout when we have queued jobs | |
if len(self._queued_state) > 0: | |
seconds = 0.5 | |
with self._sleep_lock: | |
self._sleep_task = self.loop.create_task(asyncio.sleep(seconds)) | |
try: | |
await self._sleep_task | |
except asyncio.CancelledError: | |
pass | |
with self._sleep_lock: | |
self._sleep_task = None | |
def _cancel_sleep(self): | |
with self._sleep_lock: | |
if self._sleep_task: | |
self._sleep_task.cancel() | |
self._sleep_task = None | |
def _update_state(self): | |
if self.connected and self._reader: | |
try: | |
self._on_receive_state(self._reader.read()) | |
except: | |
self._on_disconnected() | |
def _on_receive_state(self, data: bytearray): | |
# malformed or unexpected data | |
if len(data) != STATE_UPDATE_LENGTH: | |
return | |
on = data[1] == 0x01 | |
percentage = data[0] | |
# state not changed | |
if on == self.on and percentage == self.percentage: | |
return | |
self.on = on | |
self.percentage = percentage | |
self._on_state_change() | |
def _write_state(self, state: list): | |
if self._writer == None: | |
return | |
try: | |
self._writer.write(bytearray(state), withResponse=True) | |
except: | |
self._on_disconnected() | |
import asyncio as aio | |
@aio.coroutine | |
def main(loop): | |
def on_state_change(): | |
print("[state change]: connected = {}, on = {}, speed = {}".format(device.connected, device.on, device.percentage)) | |
address = yield from loop.run_in_executor(None, input, "SNOOZ Bluetooth Address: ") | |
device = SnoozeDevice(loop, on_state_change) | |
device.start(address) | |
while True: | |
command = yield from loop.run_in_executor(None, input, "[quit | on | off | set {0-100}]:\n") | |
if command == "quit": | |
break | |
elif command == "": | |
continue | |
elif command == "on": | |
print("[turning on]") | |
def turn_on(): | |
device.set_on(True) | |
device.queue_state(turn_on) | |
elif command == "off": | |
print("[turning off]") | |
def turn_off(): | |
device.set_on(False) | |
device.queue_state(turn_off) | |
elif command.startswith("set "): | |
percentage = int(command.replace("set ", ""), base=10) | |
print("[setting speed to {}]".format(percentage)) | |
def set_level(): | |
device.set_percentage(percentage) | |
device.queue_state(set_level) | |
device.stop() | |
loop = aio.get_event_loop() | |
loop.run_until_complete(main(loop)) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment