Skip to content

Instantly share code, notes, and snippets.

@Sylph Sylph/apps.yaml Secret
Last active Mar 12, 2019

Embed
What would you like to do?
Non binary appdaemon based presence detector for HomeAssistant based on https://philhawthorne.com/making-home-assistants-presence-detection-not-so-binary/
utils:
module: utils
class: utils
Presence:
file: /home/homeassistant/.appdaemon/storage/cache/presence
module: presence
class: Presence
dependencies:
- utils
debug: true
delays:
just_arrived: 15s
home: 5m
just_left: 20s
away: 15m
extended_away: 24h
persons:
- name: Sylvia
input_select_entity: input_select.sylvia_status
constrain_input_select: input_select.sylvia_status,Home,Just Arrived,Just left,Away,Extended Away
group_trackers:
- sylvia_is_home
- name: Wesley
input_select_entity: input_select.wesley_status
constrain_input_select: input_select.wesley_status,Home,Just Arrived,Just left,Away,Extended Away
group_trackers:
- wesley_is_home
manual_override_entity_id: input_boolean.presence_override
import appdaemon.plugins.hass.hassapi as hass
import shelve
import time
import datetime as dt
db_key = 'presence_data';
class Presence(hass.Hass):
def initialize(self):
if ('file' not in self.args):
self.debug_log("Error: missing 'file' configuration data")
return
self.run_in_timers = {}
self.utils = self.get_app('utils')
self.populate_duration_for()
self.populate_presence_config_dictionary()
self.device_db = shelve.open(self.args['file'])
if (db_key not in self.device_db or not isinstance(self.device_db[db_key], type({}))):
self.device_db[db_key] = {}
# Subscribe to presence changes
for name, presence_config in self.presence_config_dictionary.items():
if not isinstance(presence_config, Presence_Config):
self.debug_log("Presence Data is invalid")
if presence_config.device_trackers:
for device_tracker in presence_config.device_trackers:
self.listen_state(self._presence_changed, "device_tracker." + device_tracker, name=name, type='device')
if presence_config.group_trackers:
for group_tracker in presence_config.group_trackers:
self.listen_state(self._presence_changed, "group." + group_tracker, name=name, type='group')
self.listen_state(self._status_changed, presence_config.input_select_entity, name=name)
self.update_status(name)
def update_status(self, name):
self.clear_run_in_timer(name) # Clear any timer for [name]
if (self.manual_override_is_enabled()):
self.debug_log("Manual override is enabled. Halting presence automation.")
return
if (not name in self.presence_config_dictionary or not isinstance(self.presence_config_dictionary[name], Presence_Config)):
self.debug_log("Updating status of {} failed. {} is not in list".format(name, name))
return
presence = self.get_presence(name) #current home vs not_home
presence_data = self.device_db[db_key].get(name, Presence_Data(name))
delays = self.args['delays'] if 'delays' in self.args else None
next_status, delay = None, 0
self.debug_log("Updating {}'s status w/ presence: {} -> {}".format(name, presence_data.last_state, presence))
#Prepare new values
if (presence != presence_data.last_state): #If person returned or left
delays_config_name=0
if (presence == 'home'): #Updated state
delays_config_name, next_status = 'just_arrived', 'Just Arrived'
else:
delays_config_name, next_status = 'just_left', 'Just Left'
delay = self.get_delay_for(delays_config_name)
else: #stayed home/away
duration_in_state = presence_data.get_state_duration_in_seconds()
input_select_entity = self.presence_config_dictionary[name].input_select_entity
current_status = self.get_state(input_select_entity)
self.debug_log("Duration in state is {}".format(self.utils.seconds_to_str(duration_in_state)))
if (presence_data.last_state == 'home'): #still home
if (duration_in_state > self.get_delay_for('home')):
self.set_entity_value(name, 'Home')
elif (duration_in_state >= self.get_delay_for('just_arrived')):
if current_status != 'Just Arrived': #We have automation to handle the next step
self.set_entity_value(name, 'Just Arrived')
else: #Otherwise, we start a routine to start the automation routine to change this.
next_status = 'Home'
delay = self.get_delay_for('home') - duration_in_state
else:
next_status = 'Just Arrived'
delay = self.get_delay_for('just_arrived') - duration_in_state
else: #still away
if (duration_in_state >= self.get_delay_for('extended_away')):
self.set_entity_value(name, 'Extended Away')
elif (duration_in_state >= self.get_delay_for('away')):
if current_status != 'Away': #We have automation to handle the next step
self.set_entity_value(name, 'Away')
else: #Otherwise, we start a routine to change this.
next_status = 'Extended Away'
delay = self.get_delay_for('extended_away') - duration_in_state
elif (duration_in_state >= self.get_delay_for('just_left')):
if current_status != 'Just Left': #We have automation to handle the next step
self.set_entity_value(name, 'Just Left')
else: #Otherwise, we start a routine to start the automation routine to change this.
next_status = 'Away'
delay = self.get_delay_for('away') - duration_in_state
else:
next_status = 'Just Left'
delay = self.get_delay_for('just_left') - duration_in_state
if (next_status):
self.run_in_timers[name] = self.run_in(self._set_entity_value_callback, delay, person_name=name, new_status=next_status)
self.debug_log("Queued: Setting {}'s status to {} in {}.".format(name, next_status, self.utils.seconds_to_str(delay)))
presence_data.set_state(presence)
new_data = self.device_db[db_key]
new_data[name] = presence_data
self.device_db[db_key] = new_data
#################
### Callbacks ###
#################
def _set_entity_value_callback(self, kwargs):
self.set_entity_value(kwargs['person_name'], kwargs['new_status'])
#If status just changed. This is usually done manually in the UI.
def _status_changed(self, entity, attribute, old, new, kwargs):
name = kwargs['name']
self.debug_log("Status changed for {} ({}): ({} -> {})".format(name, entity, old, new))
if (self.manual_override_is_enabled()):
return
self.debug_log("Processing status changed routine for " + name)
self.clear_run_in_timer(name)
if new in ['Home', 'Extended Away']:
return
elif new == 'Just Arrived':
next_status, delay = 'Home', self.duration_for['home']
elif new == 'Just Left':
next_status, delay = 'Away', self.duration_for['away']
elif new == 'Away':
next_status, delay = 'Extended Away', self.duration_for['extended_away']
else:
self.debug_log("Error: Invalid new status")
if (delay): #Restart timer for next transition
input_select_entity = self.presence_config_dictionary[name].input_select_entity
self.debug_log("Queued (Status Changed): Setting entity value {}\n from {} to {} in {}".format(input_select_entity, new, next_status, self.utils.seconds_to_str(delay)))
self.run_in_timers[name] = self.run_in(self._set_entity_value_callback, delay, person_name=name, next_status=next_status)
def _presence_changed(self, entity, attribute, old, new, kwargs):
self.debug_log("Presence of type {} changed for:\n {} ({}): ({} -> {})".format(kwargs['type'], kwargs['name'], entity, old, new))
self.update_status(kwargs['name']);
##############
### Timers ###
##############
def clear_run_in_timer(self, name):
if (name in self.run_in_timers):
self.cancel_timer(self.run_in_timers.get(name))
################
### Set Data ###
################
def set_entity_value(self, name, new_status):
input_select_entity = self.presence_config_dictionary[name].input_select_entity
current_status = self.get_state(input_select_entity)
if (current_status != new_status):
self.debug_log("Setting entity value {} from {} to {}".format(input_select_entity, current_status, new_status))
self.set_state(input_select_entity, state=new_status)
################
### GET Data ###
################
def get_presence(self, name): #Return home/not_home based on device/group trackers. Considered home if any device is home.
if (not name in self.presence_config_dictionary or not isinstance(self.presence_config_dictionary[name], Presence_Config)):
self.debug_log("Getting status of " + name + " failed. ")
presence_config = self.presence_config_dictionary[name]
if presence_config.device_trackers:
for device_tracker in presence_config.device_trackers:
if (self.get_tracker_state("device_tracker." + device_tracker) == 'home'):
return 'home'
break
if presence_config.group_trackers:
for group_tracker in presence_config.group_trackers:
if (self.get_state("group." + group_tracker) == 'home'):
return 'home'
return 'not_home'
def manual_override_is_enabled(self): #Return True/False
return ('manual_override_switch_id' in self.args and \
self.get_state(self.args['manual_override_entity_id']) == True)
def get_delay_for(self, name):
delays = self.args['delays'] if 'delays' in self.args else None
delay = self.utils.convert_to_seconds(delays[name]) \
if (delays and name in delays) else 0
return max(0, delay)
##############################
### Initialization Methods ###
##############################
def populate_duration_for(self): # How long is person away/home to be considered....
self.duration_for = {}
self.duration_for['just_arrived'] = self.get_delay_for('just_arrived')
self.duration_for['home'] = self.get_delay_for('home')-self.get_delay_for('just_arrived')
self.duration_for['just_left'] = self.get_delay_for('just_left')
self.duration_for['away'] = self.get_delay_for('away')-self.get_delay_for('just_left')
self.duration_for['extended_away'] = self.get_delay_for('extended_away')-self.get_delay_for('away')
def populate_presence_config_dictionary(self):
if ('persons' not in self.args):
self.debug_log("Error: missing 'persons' configuration data")
self.args['persons'] = []
persons = self.args['persons']
self.presence_config_dictionary = {}
for config in persons:
if ('name' not in config):
self.debug_log("name is missing from config")
continue
if config['name'] in self.presence_config_dictionary:
self.debug_log("name needs to be unique. Skipping person: " + config['name'])
raise ValueError("name needs to be unique. Skipping person: " + config['name'])
continue
if ('input_select_entity' not in config):
self.debug_log("input_select_entity is missing from config")
continue
if ({'device_trackers','group_trackers'}.isdisjoint(config)):
self.debug_log("Either Device trackers or group trackers are required to populate presence configuration.")
self.debug_log(config['name'] + " cannot be tracked.")
continue
presence_config = Presence_Config(config['input_select_entity'])
if 'device_trackers' in config:
presence_config.device_trackers = config['device_trackers']
if 'group_trackers' in config:
presence_config.group_trackers = config['group_trackers']
self.presence_config_dictionary[config['name']] = presence_config
######################
### Helper Methods ###
######################
def debug_log(self, msg):
if (self.args['debug'] == 1):
self.log(msg)
###############
### Classes ###
###############
class Presence_Config:
def __init__(self, input_select_entity, device_trackers=[], group_trackers=[]):
self.input_select_entity = input_select_entity #Input select
self.device_trackers = device_trackers
self.group_trackers = group_trackers
class Presence_Data:
def __init__(self, name='Default name', current_state = 'home', last_state_change = dt.datetime.now(), last_update = dt.datetime.now()):
self.name = name
self.last_state = current_state
self.last_state_change = last_state_change
self.last_update = last_update
def set_state(self, new_state):
now = dt.datetime.now()
if (self.last_state != new_state):
self.last_state_change = now
self.last_state = new_state
self.last_update = now
def get_time_since_last_update(self):
return (dt.datetime.now() - self.last_update)
def get_state_duration(self):
return (dt.datetime.now() - self.last_state_change)
def get_state_duration_in_seconds(self):
return self.get_state_duration().total_seconds()
import appdaemon.plugins.hass.hassapi as hass
import shelve
import time
import datetime as dt
db_key = 'presence_data';
class Presence(hass.Hass):
def initialize(self):
if ('file' not in self.args):
self.debug_log("Error: missing 'file' configuration data")
return
self.run_in_timers = {}
self.utils = self.get_app('utils')
self.populate_duration_for()
self.populate_presence_config_dictionary()
self.device_db = shelve.open(self.args['file'])
if (db_key not in self.device_db or not isinstance(self.device_db[db_key], type({}))):
self.device_db[db_key] = {}
# Subscribe to presence changes
for name, presence_config in self.presence_config_dictionary.items():
if not isinstance(presence_config, Presence_Config):
self.debug_log("Presence Data is invalid")
if presence_config.device_trackers:
for device_tracker in presence_config.device_trackers:
self.listen_state(self._presence_changed, "device_tracker." + device_tracker, name=name, type='device')
if presence_config.group_trackers:
for group_tracker in presence_config.group_trackers:
self.listen_state(self._presence_changed, "group." + group_tracker, name=name, type='group')
self.listen_state(self._status_changed, presence_config.input_select_entity, name=name)
self.update_status(name)
def update_status(self, name):
self.clear_run_in_timer(name) # Clear any timer for [name]
if (self.manual_override_is_enabled()):
self.debug_log("Manual override is enabled. Halting presence automation.")
return
if (not name in self.presence_config_dictionary or not isinstance(self.presence_config_dictionary[name], Presence_Config)):
self.debug_log("Updating status of {} failed. {} is not in list".format(name, name))
return
presence = self.get_presence(name) #current home vs not_home
presence_data = self.device_db[db_key].get(name, Presence_Data(name))
delays = self.args['delays'] if 'delays' in self.args else None
next_status, delay = None, 0
self.debug_log("Updating {}'s status w/ presence: {} -> {}".format(name, presence_data.last_state, presence))
#Prepare new values
if (presence != presence_data.last_state): #If person returned or left
delays_config_name=0
if (presence == 'home'): #Updated state
delays_config_name, next_status = 'just_arrived', 'Just Arrived'
else:
delays_config_name, next_status = 'just_left', 'Just Left'
delay = self.get_delay_for(delays_config_name)
else: #stayed home/away
duration_in_state = presence_data.get_state_duration_in_seconds()
input_select_entity = self.presence_config_dictionary[name].input_select_entity
current_status = self.get_state(input_select_entity)
self.debug_log("Duration in state is {}".format(self.utils.seconds_to_str(duration_in_state)))
if (presence_data.last_state == 'home'): #still home
if (duration_in_state > self.get_delay_for('home')):
self.set_entity_value(name, 'Home')
elif (duration_in_state >= self.get_delay_for('just_arrived')):
if current_status != 'Just Arrived': #We have automation to handle the next step
self.set_entity_value(name, 'Just Arrived')
else: #Otherwise, we start a routine to start the automation routine to change this.
next_status = 'Home'
delay = self.get_delay_for('home') - duration_in_state
else:
next_status = 'Just Arrived'
delay = self.get_delay_for('just_arrived') - duration_in_state
else: #still away
if (duration_in_state >= self.get_delay_for('extended_away')):
self.set_entity_value(name, 'Extended Away')
elif (duration_in_state >= self.get_delay_for('away')):
if current_status != 'Away': #We have automation to handle the next step
self.set_entity_value(name, 'Away')
else: #Otherwise, we start a routine to change this.
next_status = 'Extended Away'
delay = self.get_delay_for('extended_away') - duration_in_state
elif (duration_in_state >= self.get_delay_for('just_left')):
if current_status != 'Just Left': #We have automation to handle the next step
self.set_entity_value(name, 'Just Left')
else: #Otherwise, we start a routine to start the automation routine to change this.
next_status = 'Away'
delay = self.get_delay_for('away') - duration_in_state
else:
next_status = 'Just Left'
delay = self.get_delay_for('just_left') - duration_in_state
if (next_status):
self.run_in_timers[name] = self.run_in(self._set_entity_value_callback, delay, person_name=name, new_status=next_status)
self.debug_log("Queued: Setting {}'s status to {} in {}.".format(name, next_status, self.utils.seconds_to_str(delay)))
presence_data.set_state(presence)
new_data = self.device_db[db_key]
new_data[name] = presence_data
self.device_db[db_key] = new_data
#################
### Callbacks ###
#################
def _set_entity_value_callback(self, kwargs):
self.set_entity_value(kwargs['person_name'], kwargs['new_status'])
#If status just changed. This is usually done manually in the UI.
def _status_changed(self, entity, attribute, old, new, kwargs):
name = kwargs['name']
self.debug_log("Status changed for {} ({}): ({} -> {})".format(name, entity, old, new))
if (self.manual_override_is_enabled()):
return
self.debug_log("Processing status changed routine for " + name)
self.clear_run_in_timer(name)
if new in ['Home', 'Extended Away']:
return
elif new == 'Just Arrived':
next_status, delay = 'Home', self.duration_for['home']
elif new == 'Just Left':
next_status, delay = 'Away', self.duration_for['away']
elif new == 'Away':
next_status, delay = 'Extended Away', self.duration_for['extended_away']
else:
self.debug_log("Error: Invalid new status")
if (delay): #Restart timer for next transition
input_select_entity = self.presence_config_dictionary[name].input_select_entity
self.debug_log("Queued (Status Changed): Setting entity value {}\n from {} to {} in {}".format(input_select_entity, new, next_status, self.utils.seconds_to_str(delay)))
self.run_in_timers[name] = self.run_in(self._set_entity_value_callback, delay, person_name=name, new_status=next_status)
def _presence_changed(self, entity, attribute, old, new, name, type, kwargs):
self.debug_log("Presence of type {} changed for:\n {} ({}): ({} -> {})".format(type, name, entity, old, new))
self.update_status(kwargs['name']);W
##############
### Timers ###
##############
def clear_run_in_timer(self, name):
if (name in self.run_in_timers):
self.cancel_timer(self.run_in_timers.get(name))
################
### Set Data ###
################
def set_entity_value(self, name, new_status):
input_select_entity = self.presence_config_dictionary[name].input_select_entity
current_status = self.get_state(input_select_entity)
if (current_status != new_status):
self.debug_log("Setting entity value {} from {} to {}".format(input_select_entity, current_status, new_status))
self.set_state(input_select_entity, state=new_status)
################
### GET Data ###
################
def get_presence(self, name): #Return home/not_home based on device/group trackers. Considered home if any device is home.
if (not name in self.presence_config_dictionary or not isinstance(self.presence_config_dictionary[name], Presence_Config)):
self.debug_log("Getting status of " + name + " failed. ")
presence_config = self.presence_config_dictionary[name]
if presence_config.device_trackers:
for device_tracker in presence_config.device_trackers:
if (self.get_tracker_state("device_tracker." + device_tracker) == 'home'):
return 'home'
break
if presence_config.group_trackers:
for group_tracker in presence_config.group_trackers:
if (self.get_state("group." + group_tracker) == 'home'):
return 'home'
return 'not_home'
def manual_override_is_enabled(self): #Return True/False
return ('manual_override_switch_id' in self.args and \
self.get_state(self.args['manual_override_entity_id']) == True)
def get_delay_for(self, name):
delays = self.args['delays'] if 'delays' in self.args else None
delay = self.utils.convert_to_seconds(delays[name]) \
if (delays and name in delays) else 0
return max(0, delay)
##############################
### Initialization Methods ###
##############################
def populate_duration_for(self): # How long is person away/home to be considered....
self.duration_for = {}
self.duration_for['just_arrived'] = self.get_delay_for('just_arrived')
self.duration_for['home'] = self.get_delay_for('home')-self.get_delay_for('just_arrived')
self.duration_for['just_left'] = self.get_delay_for('just_left')
self.duration_for['away'] = self.get_delay_for('away')-self.get_delay_for('just_left')
self.duration_for['extended_away'] = self.get_delay_for('extended_away')-self.get_delay_for('away')
def populate_presence_config_dictionary(self):
if ('persons' not in self.args):
self.debug_log("Error: missing 'persons' configuration data")
self.args['persons'] = []
persons = self.args['persons']
self.presence_config_dictionary = {}
for config in persons:
if ('name' not in config):
self.debug_log("name is missing from config")
continue
if config['name'] in self.presence_config_dictionary:
self.debug_log("name needs to be unique. Skipping person: " + config['name'])
raise ValueError("name needs to be unique. Skipping person: " + config['name'])
continue
if ('input_select_entity' not in config):
self.debug_log("input_select_entity is missing from config")
continue
if ({'device_trackers','group_trackers'}.isdisjoint(config)):
self.debug_log("Either Device trackers or group trackers are required to populate presence configuration.")
self.debug_log(config['name'] + " cannot be tracked.")
continue
presence_config = Presence_Config(config['input_select_entity'])
if 'device_trackers' in config:
presence_config.device_trackers = config['device_trackers']
if 'group_trackers' in config:
presence_config.group_trackers = config['group_trackers']
self.presence_config_dictionary[config['name']] = presence_config
######################
### Helper Methods ###
######################
def debug_log(self, msg):
if (self.args['debug'] == 1):
self.log(msg)
###############
### Classes ###
###############
class Presence_Config:
def __init__(self, input_select_entity, device_trackers=[], group_trackers=[]):
self.input_select_entity = input_select_entity #Input select
self.device_trackers = device_trackers
self.group_trackers = group_trackers
class Presence_Data:
def __init__(self, name='Default name', current_state = 'home', last_state_change = dt.datetime.now(), last_update = dt.datetime.now()):
self.name = name
self.last_state = current_state
self.last_state_change = last_state_change
self.last_update = last_update
def set_state(self, new_state):
now = dt.datetime.now()
if (self.last_state != new_state):
self.last_state_change = now
self.last_state = new_state
self.last_update = now
def get_time_since_last_update(self):
return (dt.datetime.now() - self.last_update)
def get_state_duration(self):
return (dt.datetime.now() - self.last_state_change)
def get_state_duration_in_seconds(self):
return self.get_state_duration().total_seconds()
import appdaemon.appapi as appapi
import json
import math
from datetime import datetime, timedelta
seconds_per_unit = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
class utils(appapi.AppDaemon):
def convert_to_seconds(self, time):
return int(time[:-1]) * seconds_per_unit[time[-1]]
def seconds_to_str(self, seconds):
return timedelta(seconds=math.ceil(seconds))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.