Last active
September 2, 2019 13:27
-
-
Save dlashua/6a3473ea27a94dd4eba40fed7c9a6272 to your computer and use it in GitHub Desktop.
alexaalarms.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import appdaemon.plugins.hass.hassapi as hass | |
import datetime | |
import re | |
import pprint | |
from alexapy import AlexaLogin, AlexaAPI | |
BASE_DIR = "/conf/apps/" | |
ALEXA_DIR = BASE_DIR + "alexa_files/" | |
UPDATE_INTERVAL = 60 * 1 | |
DEVICE_UPDATE_INTERVAL = 60 * 30 | |
def alexa_path(x): | |
return ALEXA_DIR + x | |
class AlexaAlarms(hass.Hass): | |
def terminate(self): | |
self.deactivate_all_alarms() | |
def initialize(self): | |
self._email = self.args.get('email', None) | |
self._password = self.args.get('password', None) | |
self._url = self.args.get('url', None) | |
self._skip = self.args.get('skip', []) | |
self._ready = False | |
self._alarms = {} | |
self._timer_n = None | |
self._timer_d = None | |
self._devices = None | |
self.listen_event(self.captcha_cb, 'alexaalarms.captcha') | |
self.login = AlexaLogin( | |
self._url, | |
self._email, | |
self._password, | |
alexa_path, | |
False) | |
self.test_login() | |
def get_entity_id(self, entity): | |
return "sensor.{}_alarm".format(entity) | |
def get_entity_by_name(self, name): | |
return re.sub(r'([^\w])', '_', name).lower() | |
def get_entity_by_sn(self, sn): | |
return self.get_entity_by_name(self.get_name_by_sn(sn)) | |
def get_name_by_sn(self, sn): | |
try: | |
for d in self._devices: | |
if d['serialNumber'] == sn: | |
return d['accountName'] | |
except Exception: | |
return None | |
return None | |
def get_all_entities(self): | |
ret = [] | |
for d in self._devices: | |
if "serialNumber" not in d: | |
continue | |
if "accountName" not in d: | |
continue | |
if "capabilities" not in d: | |
continue | |
if "TIMERS_AND_ALARMS" not in d["capabilities"]: | |
continue | |
entity = self.get_entity_by_name(d["accountName"]) | |
ret.append(entity) | |
return ret | |
def set_alarm_state(self, entity, data): | |
if data['alarm'] is not None: | |
state = str(data['alarm']) | |
attributes = { | |
"timestamp": data['alarm'].timestamp() | |
} | |
else: | |
state = False | |
attributes = { | |
"timestamp": None | |
} | |
self.log("{}: State: {}".format( | |
self.get_entity_id(entity), | |
state)) | |
self.set_state( | |
self.get_entity_id(entity), | |
state=state, | |
attributes=attributes) | |
self._alarms[entity] = data | |
def deactivate_alarm(self, entity): | |
self.log("{}: State: {}".format( | |
self.get_entity_id(entity), | |
"unavailable")) | |
self.set_state( | |
self.get_entity_id(entity), | |
state='unavailable', | |
attributes={}) | |
del self._alarms[entity] | |
def deactivate_all_alarms(self): | |
for alarm_entity in list(self._alarms.keys()): | |
self.deactivate_alarm(alarm_entity) | |
def update_notifications(self): | |
self.log('Updating Notifications', level="DEBUG") | |
self.cancel_timer(self._timer_n) | |
if not self._ready: | |
self.log("Not Ready. (retry 10s)", level="WARNING") | |
self._timer_n = self.run_in(self.update_notifications_tcb, 10) | |
return | |
notifications = AlexaAPI.get_notifications(self.login) | |
alarms = {} | |
for a in self.get_all_entities(): | |
if a in self._skip: | |
continue | |
alarms[a] = { | |
"entity": a, | |
"alarm": None | |
} | |
for n in notifications: | |
entity = self.get_entity_by_sn(n['deviceSerialNumber']) | |
if entity in self._skip: | |
continue | |
alarm_time_obj = None | |
if n['status'] != "ON": | |
continue | |
if n['originalDate'] is None: | |
continue | |
if n['originalTime'] is None: | |
continue | |
alarm_time_string = n['originalDate'] + " " + n['originalTime'] | |
alarm_time_obj = datetime.datetime.strptime( | |
alarm_time_string, | |
"%Y-%m-%d %H:%M:%S.%f" | |
) | |
data = { | |
"entity": entity, | |
"alarm": alarm_time_obj, | |
} | |
self.log("ALARM SEEN FOR {}: {}".format(entity, n), level="DEBUG") | |
if entity in alarms and alarms[entity]['alarm'] is not None: | |
if data['alarm'] is not None: | |
if alarms[entity]['alarm'] > data['alarm']: | |
alarms[entity] = data | |
else: | |
alarms[entity] = data | |
self.log( | |
"ALARMS: {}".format( | |
pprint.pformat(alarms, indent=4)), | |
level="DEBUG") | |
for alarm_entity in list(self._alarms.keys()): | |
if alarm_entity not in alarms: | |
self.deactivate_alarm(alarm_entity) | |
for alarm_entity in alarms.keys(): | |
if (alarm_entity in self._alarms | |
and alarms[alarm_entity] == self._alarms[alarm_entity]): | |
pass | |
else: | |
self.set_alarm_state(alarm_entity, alarms[alarm_entity]) | |
self._timer_n = self.run_in(self.update_notifications_tcb, UPDATE_INTERVAL) | |
def update_notifications_tcb(self, kwargs): | |
self.update_notifications() | |
def update_notifications_ecb(self, event_name, data, kwargs): | |
self.update_notifications() | |
def update_devices_tcb(self, kwargs): | |
self.update_devices() | |
def update_devices(self): | |
self.log('Updating Devices', level="DEBUG") | |
self.cancel_timer(self._timer_d) | |
if not self._ready: | |
self.log("Not Ready. (retry 10s)", level="WARNING") | |
self._timer_d = self.run_in(self.update_devices_tcb, 10) | |
return | |
self._devices = AlexaAPI.get_devices(self.login) | |
self.log(pprint.pformat(self._devices, indent=4)) | |
self.log(self.get_all_entities(), level="INFO") | |
self._timer_d = self.run_in(self.update_devices_tcb, DEVICE_UPDATE_INTERVAL) | |
def make_ready(self): | |
if self._ready: | |
return | |
self.listen_event( | |
self.update_notifications_ecb, | |
"alexaalarms.update") | |
self._ready = True | |
self.update_devices() | |
self.update_notifications() | |
def captcha_cb(self, event_name, data, kwargs): | |
if "captcha" in data: | |
self.login.login(captcha=data['captcha']) | |
self.test_login() | |
def test_login(self): | |
if ('login_successful' in self.login.status | |
and self.login.status['login_successful']): | |
self.make_ready() | |
return | |
self._ready = False | |
if ('captcha_required' in self.login.status | |
and self.login.status['captcha_required']): | |
self.log("Captcha Required: {}".format( | |
self.login.status['captcha_image_url']), | |
level="ERROR") | |
return | |
self.log('login probably failed. {}'.format(self.login.status)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment