Skip to content

Instantly share code, notes, and snippets.

@dlashua
Last active September 2, 2019 13:27
Show Gist options
  • Save dlashua/6a3473ea27a94dd4eba40fed7c9a6272 to your computer and use it in GitHub Desktop.
Save dlashua/6a3473ea27a94dd4eba40fed7c9a6272 to your computer and use it in GitHub Desktop.
alexaalarms.py
import appdaemon.plugins.hass.hassapi as hass
import datetime
import re
import pprint
from alexapy import AlexaLogin, AlexaAPI
# Root directory of the AppDaemon apps; the Alexa helper files live below it.
BASE_DIR = "/conf/apps/"
# Directory prefix (with trailing slash) prepended to names by alexa_path().
ALEXA_DIR = BASE_DIR + "alexa_files/"
# Seconds between two notification (alarm) polls.
UPDATE_INTERVAL = 60 * 1
# Seconds between two refreshes of the device list.
DEVICE_UPDATE_INTERVAL = 60 * 30


def alexa_path(x):
    """Return the location of file name *x* inside ``ALEXA_DIR``.

    The name is appended verbatim to the directory prefix, so *x* must be a
    string and should not start with a slash.
    """
    return ALEXA_DIR + x
class AlexaAlarms(hass.Hass):
    """AppDaemon app publishing the next Alexa alarm of each device.

    Once the Alexa login succeeded, the app fetches the device list every
    ``DEVICE_UPDATE_INTERVAL`` seconds and the notifications every
    ``UPDATE_INTERVAL`` seconds.  For every device it keeps the earliest
    notification whose status is ``"ON"`` and writes it to the entity
    ``sensor.<device name>_alarm``.

    App arguments read from ``self.args``: ``email``, ``password``, ``url``
    and ``skip`` (entity names to ignore).

    Events listened to:
      * ``alexaalarms.captcha`` -- event data key ``captcha`` is passed to
        the login to retry it.
      * ``alexaalarms.update`` -- triggers an immediate notification poll.
    """

    def terminate(self):
        """Mark every published alarm sensor as unavailable."""
        self.deactivate_all_alarms()

    def initialize(self):
        """Read the app arguments, reset the state and try to log in."""
        self._email = self.args.get('email', None)
        self._password = self.args.get('password', None)
        self._url = self.args.get('url', None)
        self._skip = self.args.get('skip', [])
        self._ready = False
        # entity name -> {"entity": name, "alarm": datetime or None}
        self._alarms = {}
        # Handles of the pending notification / device refresh timers.
        self._timer_n = None
        self._timer_d = None
        self._devices = None
        # Handle of the 'alexaalarms.update' listener (registered only once).
        self._update_listener = None
        self.listen_event(self.captcha_cb, 'alexaalarms.captcha')
        # NOTE(review): the meaning of the trailing ``False`` is defined by
        # alexapy's AlexaLogin signature -- confirm against the installed
        # alexapy version.
        self.login = AlexaLogin(
            self._url,
            self._email,
            self._password,
            alexa_path,
            False)
        self.test_login()

    def get_entity_id(self, entity):
        """Return the sensor entity id used for entity name *entity*."""
        return "sensor.{}_alarm".format(entity)

    def get_entity_by_name(self, name):
        """Return *name* lower-cased with non-word characters as ``_``."""
        return re.sub(r'\W', '_', name).lower()

    def get_entity_by_sn(self, sn):
        """Return the entity name of the device with serial number *sn*.

        Returns ``None`` when no device with that serial number is known.
        """
        name = self.get_name_by_sn(sn)
        if name is None:
            return None
        return self.get_entity_by_name(name)

    def get_name_by_sn(self, sn):
        """Return the ``accountName`` of the device with serial *sn*.

        Returns ``None`` when the device list is not loaded, no device
        matches, or the matching device has no ``accountName``.
        """
        for device in self._devices or []:
            # .get() so that one entry without a serial number does not end
            # the search for the remaining devices.
            if device.get('serialNumber') == sn:
                return device.get('accountName')
        return None

    def get_all_entities(self):
        """Return the entity names of all devices that can hold alarms.

        A device counts when it has a ``serialNumber``, an ``accountName``
        and lists ``TIMERS_AND_ALARMS`` in its ``capabilities``.  Returns an
        empty list while the device list is not loaded.
        """
        return [
            self.get_entity_by_name(device["accountName"])
            for device in self._devices or []
            if "serialNumber" in device
            and "accountName" in device
            and "capabilities" in device
            and "TIMERS_AND_ALARMS" in device["capabilities"]
        ]

    def set_alarm_state(self, entity, data):
        """Publish *data* as the state of *entity* and remember it.

        *data* is a dict with the keys ``entity`` and ``alarm``; ``alarm`` is
        a ``datetime`` or ``None``.  Without an alarm the state is ``False``
        and the ``timestamp`` attribute is ``None``.
        """
        alarm = data['alarm']
        if alarm is not None:
            state = str(alarm)
            attributes = {"timestamp": alarm.timestamp()}
        else:
            state = False
            attributes = {"timestamp": None}
        entity_id = self.get_entity_id(entity)
        self.log("{}: State: {}".format(entity_id, state))
        self.set_state(entity_id, state=state, attributes=attributes)
        self._alarms[entity] = data

    def deactivate_alarm(self, entity):
        """Set the sensor of *entity* to ``unavailable`` and forget it."""
        entity_id = self.get_entity_id(entity)
        self.log("{}: State: {}".format(entity_id, "unavailable"))
        self.set_state(entity_id, state='unavailable', attributes={})
        # pop() instead of del: deactivating an untracked entity is harmless.
        self._alarms.pop(entity, None)

    def deactivate_all_alarms(self):
        """Deactivate every alarm sensor that is currently tracked."""
        # Iterate over a copy: deactivate_alarm() removes entries.
        for alarm_entity in list(self._alarms):
            self.deactivate_alarm(alarm_entity)

    def _cancel_timer(self, handle):
        """Cancel the timer *handle* unless no timer was scheduled yet."""
        if handle is not None:
            self.cancel_timer(handle)

    def _collect_alarms(self, notifications):
        """Return the earliest active alarm per entity.

        Every entity from get_all_entities() that is not skipped starts with
        ``alarm: None``.  A notification is used when its device is known and
        not skipped, its ``status`` is ``"ON"`` and it carries an
        ``originalDate`` and ``originalTime`` that can be parsed.
        """
        alarms = {
            entity: {"entity": entity, "alarm": None}
            for entity in self.get_all_entities()
            if entity not in self._skip
        }
        for n in notifications or []:
            entity = self.get_entity_by_sn(n.get('deviceSerialNumber'))
            # Notifications of devices missing from the device list cannot
            # be mapped to a sensor.
            if entity is None or entity in self._skip:
                continue
            if n.get('status') != "ON":
                continue
            original_date = n.get('originalDate')
            original_time = n.get('originalTime')
            if original_date is None or original_time is None:
                continue
            try:
                alarm_time = datetime.datetime.strptime(
                    original_date + " " + original_time,
                    "%Y-%m-%d %H:%M:%S.%f")
            except (TypeError, ValueError):
                self.log(
                    "Ignoring unparseable alarm time for {}: {!r} {!r}".format(
                        entity, original_date, original_time),
                    level="WARNING")
                continue
            self.log(
                "ALARM SEEN FOR {}: {}".format(entity, n), level="DEBUG")
            current = alarms.get(entity)
            # Keep the earliest alarm of each entity.
            if (current is None
                    or current['alarm'] is None
                    or alarm_time < current['alarm']):
                alarms[entity] = {"entity": entity, "alarm": alarm_time}
        return alarms

    def _publish_alarms(self, alarms):
        """Bring the published sensors in line with *alarms*."""
        for alarm_entity in list(self._alarms):
            if alarm_entity not in alarms:
                self.deactivate_alarm(alarm_entity)
        for alarm_entity, data in alarms.items():
            # Only write states that actually changed.
            if self._alarms.get(alarm_entity) != data:
                self.set_alarm_state(alarm_entity, data)

    def update_notifications(self):
        """Poll the notifications, publish the alarms and reschedule.

        While the app is not ready the poll is retried after 10 seconds.
        Otherwise the next poll is scheduled after ``UPDATE_INTERVAL``
        seconds even when this poll raises, so a single failure does not
        stop the polling.
        """
        self.log('Updating Notifications', level="DEBUG")
        self._cancel_timer(self._timer_n)
        if not self._ready:
            self.log("Not Ready. (retry 10s)", level="WARNING")
            self._timer_n = self.run_in(self.update_notifications_tcb, 10)
            return
        try:
            alarms = self._collect_alarms(
                AlexaAPI.get_notifications(self.login))
            self.log(
                "ALARMS: {}".format(pprint.pformat(alarms, indent=4)),
                level="DEBUG")
            self._publish_alarms(alarms)
        finally:
            self._timer_n = self.run_in(
                self.update_notifications_tcb, UPDATE_INTERVAL)

    def update_notifications_tcb(self, kwargs):
        """Timer callback: poll the notifications."""
        self.update_notifications()

    def update_notifications_ecb(self, event_name, data, kwargs):
        """Event callback for ``alexaalarms.update``: poll immediately."""
        self.update_notifications()

    def update_devices_tcb(self, kwargs):
        """Timer callback: refresh the device list."""
        self.update_devices()

    def update_devices(self):
        """Refresh the device list and reschedule the next refresh.

        While the app is not ready the refresh is retried after 10 seconds.
        Otherwise the next refresh is scheduled after
        ``DEVICE_UPDATE_INTERVAL`` seconds even when this refresh raises.
        """
        self.log('Updating Devices', level="DEBUG")
        self._cancel_timer(self._timer_d)
        if not self._ready:
            self.log("Not Ready. (retry 10s)", level="WARNING")
            self._timer_d = self.run_in(self.update_devices_tcb, 10)
            return
        try:
            self._devices = AlexaAPI.get_devices(self.login)
            self.log(pprint.pformat(self._devices, indent=4))
            self.log(self.get_all_entities(), level="INFO")
        finally:
            self._timer_d = self.run_in(
                self.update_devices_tcb, DEVICE_UPDATE_INTERVAL)

    def make_ready(self):
        """Switch to the ready state and start both polling cycles.

        Does nothing when the app is already ready.
        """
        if self._ready:
            return
        # The app can become ready again after a failed re-login; register
        # the manual update listener only the first time.
        if self._update_listener is None:
            self._update_listener = self.listen_event(
                self.update_notifications_ecb,
                "alexaalarms.update")
        self._ready = True
        self.update_devices()
        self.update_notifications()

    def captcha_cb(self, event_name, data, kwargs):
        """Event callback for ``alexaalarms.captcha``.

        Retries the login with the ``captcha`` value of the event data and
        re-evaluates the login status.  Events without ``captcha`` are
        ignored.
        """
        if "captcha" in data:
            self.login.login(captcha=data['captcha'])
            self.test_login()

    def test_login(self):
        """Evaluate ``self.login.status`` and update the ready state.

        A successful login makes the app ready.  Otherwise the app is marked
        not ready and either the captcha image URL is logged as an error or
        the raw status is logged.
        """
        status = self.login.status
        if status.get('login_successful'):
            self.make_ready()
            return
        self._ready = False
        if status.get('captcha_required'):
            self.log(
                "Captcha Required: {}".format(
                    status.get('captcha_image_url')),
                level="ERROR")
            return
        self.log('login probably failed. {}'.format(status))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment