Non-binary, AppDaemon-based presence detector for Home Assistant, based on https://philhawthorne.com/making-home-assistants-presence-detection-not-so-binary/
utils: | |
module: utils | |
class: utils | |
Presence: | |
file: /home/homeassistant/.appdaemon/storage/cache/presence | |
module: presence | |
class: Presence | |
dependencies: | |
- utils | |
debug: true | |
delays: | |
just_arrived: 15s | |
home: 5m | |
just_left: 20s | |
away: 15m | |
extended_away: 24h | |
persons: | |
- name: Sylvia | |
input_select_entity: input_select.sylvia_status | |
constrain_input_select: input_select.sylvia_status,Home,Just Arrived,Just Left,Away,Extended Away | |
group_trackers: | |
- sylvia_is_home | |
- name: Wesley | |
input_select_entity: input_select.wesley_status | |
constrain_input_select: input_select.wesley_status,Home,Just Arrived,Just Left,Away,Extended Away | |
group_trackers: | |
- wesley_is_home | |
manual_override_entity_id: input_boolean.presence_override |
import appdaemon.plugins.hass.hassapi as hass | |
import shelve | |
import time | |
import datetime as dt | |
db_key = 'presence_data'; | |
class Presence(hass.Hass):
    """Non-binary presence tracking for the people listed in the app config.

    Each person's trackers report ``home`` or something else (treated as
    ``not_home``). This app turns that into one of the statuses
    'Just Arrived', 'Home', 'Just Left', 'Away' and 'Extended Away' and
    writes it to the person's input_select entity. The last seen presence
    per person is stored in a shelve file under ``db_key``.
    """

    def initialize(self):
        """Read the configuration, open the shelve file and subscribe to state changes."""
        if 'file' not in self.args:
            self.debug_log("Error: missing 'file' configuration data")
            return
        self.run_in_timers = {}  # person name -> handle of the pending run_in timer
        self.utils = self.get_app('utils')
        self.populate_duration_for()
        self.populate_presence_config_dictionary()
        self.device_db = shelve.open(self.args['file'])
        if not isinstance(self.device_db.get(db_key), dict):
            self.device_db[db_key] = {}
        # Subscribe to presence changes
        for name, presence_config in self.presence_config_dictionary.items():
            if not isinstance(presence_config, Presence_Config):
                self.debug_log("Presence Data is invalid")
                continue  # nothing usable to subscribe to for this entry
            for device_tracker in presence_config.device_trackers or []:
                self.listen_state(self._presence_changed, "device_tracker." + device_tracker, name=name, type='device')
            for group_tracker in presence_config.group_trackers or []:
                self.listen_state(self._presence_changed, "group." + group_tracker, name=name, type='group')
            self.listen_state(self._status_changed, presence_config.input_select_entity, name=name)
            self.update_status(name)

    def terminate(self):
        """Close the shelve file so it is flushed and released when the app stops."""
        device_db = getattr(self, 'device_db', None)
        if device_db is not None:
            device_db.close()
            self.device_db = None

    def update_status(self, name):
        """Re-evaluate ``name``'s presence and set or schedule the matching status.

        Any pending timer for the person is cancelled first. Nothing else
        happens while the manual override is enabled or when ``name`` is not a
        configured person. Otherwise the current presence is compared with the
        stored one, the status is set or queued, and the presence is persisted.
        """
        self.clear_run_in_timer(name)  # Clear any timer for [name]
        if self.manual_override_is_enabled():
            self.debug_log("Manual override is enabled. Halting presence automation.")
            return
        if not isinstance(self.presence_config_dictionary.get(name), Presence_Config):
            self.debug_log("Updating status of {} failed. {} is not in list".format(name, name))
            return
        presence = self.get_presence(name)  # current home vs not_home
        presence_data = self.device_db[db_key].get(name, Presence_Data(name))
        next_status, delay = None, 0
        self.debug_log("Updating {}'s status w/ presence: {} -> {}".format(name, presence_data.last_state, presence))
        if presence != presence_data.last_state:
            # Person returned or left: queue the first transitional status.
            if presence == 'home':
                next_status, delay = 'Just Arrived', self.get_delay_for('just_arrived')
            else:
                next_status, delay = 'Just Left', self.get_delay_for('just_left')
        else:
            # Person stayed home/away: pick the status matching the elapsed time.
            duration_in_state = presence_data.get_state_duration_in_seconds()
            input_select_entity = self.presence_config_dictionary[name].input_select_entity
            current_status = self.get_state(input_select_entity)
            self.debug_log("Duration in state is {}".format(self.utils.seconds_to_str(duration_in_state)))
            if presence_data.last_state == 'home':  # still home
                if duration_in_state > self.get_delay_for('home'):
                    self.set_entity_value(name, 'Home')
                elif duration_in_state >= self.get_delay_for('just_arrived'):
                    if current_status != 'Just Arrived':
                        # Changing the status fires _status_changed, which queues the next step.
                        self.set_entity_value(name, 'Just Arrived')
                    else:
                        # Status is already right, so no change event will fire; queue the next step here.
                        next_status = 'Home'
                        delay = self.get_delay_for('home') - duration_in_state
                else:
                    next_status = 'Just Arrived'
                    delay = self.get_delay_for('just_arrived') - duration_in_state
            else:  # still away
                if duration_in_state >= self.get_delay_for('extended_away'):
                    self.set_entity_value(name, 'Extended Away')
                elif duration_in_state >= self.get_delay_for('away'):
                    if current_status != 'Away':
                        self.set_entity_value(name, 'Away')
                    else:
                        next_status = 'Extended Away'
                        delay = self.get_delay_for('extended_away') - duration_in_state
                elif duration_in_state >= self.get_delay_for('just_left'):
                    if current_status != 'Just Left':
                        self.set_entity_value(name, 'Just Left')
                    else:
                        next_status = 'Away'
                        delay = self.get_delay_for('away') - duration_in_state
                else:
                    next_status = 'Just Left'
                    delay = self.get_delay_for('just_left') - duration_in_state
        if next_status:
            self.run_in_timers[name] = self.run_in(self._set_entity_value_callback, delay, person_name=name, new_status=next_status)
            self.debug_log("Queued: Setting {}'s status to {} in {}.".format(name, next_status, self.utils.seconds_to_str(delay)))
        presence_data.set_state(presence)
        # Re-assign the whole dict: the shelf only stores values on assignment.
        new_data = self.device_db[db_key]
        new_data[name] = presence_data
        self.device_db[db_key] = new_data

    #################
    ### Callbacks ###
    #################
    def _set_entity_value_callback(self, kwargs):
        """Timer callback: apply the queued ``new_status`` to ``person_name``."""
        # The timer has fired, so its handle must not be cancelled later.
        self.run_in_timers.pop(kwargs['person_name'], None)
        self.set_entity_value(kwargs['person_name'], kwargs['new_status'])

    def _status_changed(self, entity, attribute, old, new, kwargs):
        """State callback for a person's input_select (usually a manual UI change).

        Cancels the pending timer and, for transitional statuses, queues the
        follow-up status after the matching ``duration_for`` interval.
        """
        name = kwargs['name']
        self.debug_log("Status changed for {} ({}): ({} -> {})".format(name, entity, old, new))
        if self.manual_override_is_enabled():
            return
        self.debug_log("Processing status changed routine for " + name)
        self.clear_run_in_timer(name)
        if new in ('Home', 'Extended Away'):
            return  # terminal statuses: nothing to schedule
        # status -> (follow-up status, key into self.duration_for)
        transitions = {
            'Just Arrived': ('Home', 'home'),
            'Just Left': ('Away', 'away'),
            'Away': ('Extended Away', 'extended_away'),
        }
        if new not in transitions:
            self.debug_log("Error: Invalid new status")
            return
        next_status, duration_key = transitions[new]
        delay = self.duration_for[duration_key]
        if delay:  # Restart timer for next transition
            input_select_entity = self.presence_config_dictionary[name].input_select_entity
            self.debug_log("Queued (Status Changed): Setting entity value {}\n from {} to {} in {}".format(input_select_entity, new, next_status, self.utils.seconds_to_str(delay)))
            # The keyword must be ``new_status``: that is what the callback reads.
            self.run_in_timers[name] = self.run_in(self._set_entity_value_callback, delay, person_name=name, new_status=next_status)

    def _presence_changed(self, entity, attribute, old, new, kwargs):
        """State callback for a device/group tracker; ``kwargs`` carries ``name`` and ``type``."""
        self.debug_log("Presence of type {} changed for:\n {} ({}): ({} -> {})".format(kwargs['type'], kwargs['name'], entity, old, new))
        self.update_status(kwargs['name'])

    ##############
    ### Timers ###
    ##############
    def clear_run_in_timer(self, name):
        """Cancel and forget the pending timer for ``name``, if there is one."""
        handle = self.run_in_timers.pop(name, None)
        if handle is not None:
            self.cancel_timer(handle)

    ################
    ### Set Data ###
    ################
    def set_entity_value(self, name, new_status):
        """Write ``new_status`` to ``name``'s input_select unless it already has that value."""
        input_select_entity = self.presence_config_dictionary[name].input_select_entity
        current_status = self.get_state(input_select_entity)
        if current_status != new_status:
            self.debug_log("Setting entity value {} from {} to {}".format(input_select_entity, current_status, new_status))
            # NOTE(review): set_state writes the state directly; consider the
            # input_select select_option service if option validation is wanted.
            self.set_state(input_select_entity, state=new_status)

    ################
    ### GET Data ###
    ################
    def get_presence(self, name):
        """Return 'home' if any device or group tracker of ``name`` is home, else 'not_home'.

        An unknown ``name`` is logged and reported as 'not_home'.
        """
        presence_config = self.presence_config_dictionary.get(name)
        if not isinstance(presence_config, Presence_Config):
            self.debug_log("Getting status of " + name + " failed. ")
            return 'not_home'
        for device_tracker in presence_config.device_trackers or []:
            if self.get_tracker_state("device_tracker." + device_tracker) == 'home':
                return 'home'
        for group_tracker in presence_config.group_trackers or []:
            if self.get_state("group." + group_tracker) == 'home':
                return 'home'
        return 'not_home'

    def manual_override_is_enabled(self):
        """Return True when the configured manual-override entity is switched on.

        The entity is read from ``manual_override_entity_id``; the older
        ``manual_override_switch_id`` key is accepted as a fallback.
        """
        entity_id = self.args.get('manual_override_entity_id') or self.args.get('manual_override_switch_id')
        if not entity_id:
            return False
        # Home Assistant reports booleans/switches as the string 'on'.
        return self.get_state(entity_id) in ('on', True)

    def get_delay_for(self, name):
        """Return the configured delay ``name`` in seconds (0 if unset, never negative)."""
        delays = self.args.get('delays') or {}
        if name not in delays:
            return 0
        return max(0, self.utils.convert_to_seconds(delays[name]))

    ##############################
    ### Initialization Methods ###
    ##############################
    def populate_duration_for(self):
        """Compute how long each status lasts before the next one applies.

        The configured delays are measured from the presence change; the
        durations stored here are the gaps between consecutive statuses,
        clamped at zero so a misordered config never yields a negative delay.
        """
        just_arrived = self.get_delay_for('just_arrived')
        home = self.get_delay_for('home')
        just_left = self.get_delay_for('just_left')
        away = self.get_delay_for('away')
        extended_away = self.get_delay_for('extended_away')
        self.duration_for = {
            'just_arrived': just_arrived,
            'home': max(0, home - just_arrived),
            'just_left': just_left,
            'away': max(0, away - just_left),
            'extended_away': max(0, extended_away - away),
        }

    def populate_presence_config_dictionary(self):
        """Build ``presence_config_dictionary`` (name -> Presence_Config) from ``persons``.

        Entries without a name, an input_select_entity or any tracker list are
        skipped with a debug message.

        Raises:
            ValueError: if two persons share the same name.
        """
        if 'persons' not in self.args:
            self.debug_log("Error: missing 'persons' configuration data")
        persons = self.args.get('persons') or []
        self.presence_config_dictionary = {}
        for config in persons:
            if 'name' not in config:
                self.debug_log("name is missing from config")
                continue
            if config['name'] in self.presence_config_dictionary:
                self.debug_log("name needs to be unique. Skipping person: " + config['name'])
                raise ValueError("name needs to be unique. Skipping person: " + config['name'])
            if 'input_select_entity' not in config:
                self.debug_log("input_select_entity is missing from config")
                continue
            if {'device_trackers', 'group_trackers'}.isdisjoint(config):
                self.debug_log("Either Device trackers or group trackers are required to populate presence configuration.")
                self.debug_log(config['name'] + " cannot be tracked.")
                continue
            presence_config = Presence_Config(config['input_select_entity'])
            if 'device_trackers' in config:
                presence_config.device_trackers = config['device_trackers']
            if 'group_trackers' in config:
                presence_config.group_trackers = config['group_trackers']
            self.presence_config_dictionary[config['name']] = presence_config

    ######################
    ### Helper Methods ###
    ######################
    def debug_log(self, msg):
        """Log ``msg`` only when the ``debug`` option is enabled (absent means disabled)."""
        if self.args.get('debug'):
            self.log(msg)
############### | |
### Classes ### | |
############### | |
class Presence_Config:
    """Per-person configuration: the status input_select and the trackers feeding it."""

    def __init__(self, input_select_entity, device_trackers=None, group_trackers=None):
        """Store the entity id and tracker lists.

        Args:
            input_select_entity: entity id of the person's status input_select.
            device_trackers: tracker names without the ``device_tracker.`` prefix.
            group_trackers: group names without the ``group.`` prefix.
        """
        self.input_select_entity = input_select_entity  # Input select
        # Fresh lists per instance: a mutable default would be shared by all instances.
        self.device_trackers = [] if device_trackers is None else device_trackers
        self.group_trackers = [] if group_trackers is None else group_trackers
class Presence_Data:
    """Last known presence of one person and when it changed / was last refreshed."""

    def __init__(self, name='Default name', current_state='home', last_state_change=None, last_update=None):
        """Create a record; timestamps left as None default to the moment of construction."""
        # Resolve "now" per call: a datetime.now() default would be frozen at definition time.
        now = dt.datetime.now()
        self.name = name
        self.last_state = current_state
        self.last_state_change = now if last_state_change is None else last_state_change
        self.last_update = now if last_update is None else last_update

    def set_state(self, new_state):
        """Record ``new_state``; the change timestamp moves only if the state differs."""
        now = dt.datetime.now()
        if self.last_state != new_state:
            self.last_state_change = now
            self.last_state = new_state
        self.last_update = now

    def get_time_since_last_update(self):
        """Return the timedelta since ``set_state`` was last called."""
        return dt.datetime.now() - self.last_update

    def get_state_duration(self):
        """Return the timedelta spent in the current state."""
        return dt.datetime.now() - self.last_state_change

    def get_state_duration_in_seconds(self):
        """Return the time spent in the current state as float seconds."""
        return self.get_state_duration().total_seconds()
import appdaemon.plugins.hass.hassapi as hass | |
import shelve | |
import time | |
import datetime as dt | |
db_key = 'presence_data'; | |
class Presence(hass.Hass):
    """Non-binary presence tracking for the people listed in the app config.

    Each person's trackers report ``home`` or something else (treated as
    ``not_home``). This app turns that into one of the statuses
    'Just Arrived', 'Home', 'Just Left', 'Away' and 'Extended Away' and
    writes it to the person's input_select entity. The last seen presence
    per person is stored in a shelve file under ``db_key``.
    """

    def initialize(self):
        """Read the configuration, open the shelve file and subscribe to state changes."""
        if 'file' not in self.args:
            self.debug_log("Error: missing 'file' configuration data")
            return
        self.run_in_timers = {}  # person name -> handle of the pending run_in timer
        self.utils = self.get_app('utils')
        self.populate_duration_for()
        self.populate_presence_config_dictionary()
        self.device_db = shelve.open(self.args['file'])
        if not isinstance(self.device_db.get(db_key), dict):
            self.device_db[db_key] = {}
        # Subscribe to presence changes
        for name, presence_config in self.presence_config_dictionary.items():
            if not isinstance(presence_config, Presence_Config):
                self.debug_log("Presence Data is invalid")
                continue  # nothing usable to subscribe to for this entry
            for device_tracker in presence_config.device_trackers or []:
                self.listen_state(self._presence_changed, "device_tracker." + device_tracker, name=name, type='device')
            for group_tracker in presence_config.group_trackers or []:
                self.listen_state(self._presence_changed, "group." + group_tracker, name=name, type='group')
            self.listen_state(self._status_changed, presence_config.input_select_entity, name=name)
            self.update_status(name)

    def terminate(self):
        """Close the shelve file so it is flushed and released when the app stops."""
        device_db = getattr(self, 'device_db', None)
        if device_db is not None:
            device_db.close()
            self.device_db = None

    def update_status(self, name):
        """Re-evaluate ``name``'s presence and set or schedule the matching status.

        Any pending timer for the person is cancelled first. Nothing else
        happens while the manual override is enabled or when ``name`` is not a
        configured person. Otherwise the current presence is compared with the
        stored one, the status is set or queued, and the presence is persisted.
        """
        self.clear_run_in_timer(name)  # Clear any timer for [name]
        if self.manual_override_is_enabled():
            self.debug_log("Manual override is enabled. Halting presence automation.")
            return
        if not isinstance(self.presence_config_dictionary.get(name), Presence_Config):
            self.debug_log("Updating status of {} failed. {} is not in list".format(name, name))
            return
        presence = self.get_presence(name)  # current home vs not_home
        presence_data = self.device_db[db_key].get(name, Presence_Data(name))
        next_status, delay = None, 0
        self.debug_log("Updating {}'s status w/ presence: {} -> {}".format(name, presence_data.last_state, presence))
        if presence != presence_data.last_state:
            # Person returned or left: queue the first transitional status.
            if presence == 'home':
                next_status, delay = 'Just Arrived', self.get_delay_for('just_arrived')
            else:
                next_status, delay = 'Just Left', self.get_delay_for('just_left')
        else:
            # Person stayed home/away: pick the status matching the elapsed time.
            duration_in_state = presence_data.get_state_duration_in_seconds()
            input_select_entity = self.presence_config_dictionary[name].input_select_entity
            current_status = self.get_state(input_select_entity)
            self.debug_log("Duration in state is {}".format(self.utils.seconds_to_str(duration_in_state)))
            if presence_data.last_state == 'home':  # still home
                if duration_in_state > self.get_delay_for('home'):
                    self.set_entity_value(name, 'Home')
                elif duration_in_state >= self.get_delay_for('just_arrived'):
                    if current_status != 'Just Arrived':
                        # Changing the status fires _status_changed, which queues the next step.
                        self.set_entity_value(name, 'Just Arrived')
                    else:
                        # Status is already right, so no change event will fire; queue the next step here.
                        next_status = 'Home'
                        delay = self.get_delay_for('home') - duration_in_state
                else:
                    next_status = 'Just Arrived'
                    delay = self.get_delay_for('just_arrived') - duration_in_state
            else:  # still away
                if duration_in_state >= self.get_delay_for('extended_away'):
                    self.set_entity_value(name, 'Extended Away')
                elif duration_in_state >= self.get_delay_for('away'):
                    if current_status != 'Away':
                        self.set_entity_value(name, 'Away')
                    else:
                        next_status = 'Extended Away'
                        delay = self.get_delay_for('extended_away') - duration_in_state
                elif duration_in_state >= self.get_delay_for('just_left'):
                    if current_status != 'Just Left':
                        self.set_entity_value(name, 'Just Left')
                    else:
                        next_status = 'Away'
                        delay = self.get_delay_for('away') - duration_in_state
                else:
                    next_status = 'Just Left'
                    delay = self.get_delay_for('just_left') - duration_in_state
        if next_status:
            self.run_in_timers[name] = self.run_in(self._set_entity_value_callback, delay, person_name=name, new_status=next_status)
            self.debug_log("Queued: Setting {}'s status to {} in {}.".format(name, next_status, self.utils.seconds_to_str(delay)))
        presence_data.set_state(presence)
        # Re-assign the whole dict: the shelf only stores values on assignment.
        new_data = self.device_db[db_key]
        new_data[name] = presence_data
        self.device_db[db_key] = new_data

    #################
    ### Callbacks ###
    #################
    def _set_entity_value_callback(self, kwargs):
        """Timer callback: apply the queued ``new_status`` to ``person_name``."""
        # The timer has fired, so its handle must not be cancelled later.
        self.run_in_timers.pop(kwargs['person_name'], None)
        self.set_entity_value(kwargs['person_name'], kwargs['new_status'])

    def _status_changed(self, entity, attribute, old, new, kwargs):
        """State callback for a person's input_select (usually a manual UI change).

        Cancels the pending timer and, for transitional statuses, queues the
        follow-up status after the matching ``duration_for`` interval.
        """
        name = kwargs['name']
        self.debug_log("Status changed for {} ({}): ({} -> {})".format(name, entity, old, new))
        if self.manual_override_is_enabled():
            return
        self.debug_log("Processing status changed routine for " + name)
        self.clear_run_in_timer(name)
        if new in ('Home', 'Extended Away'):
            return  # terminal statuses: nothing to schedule
        # status -> (follow-up status, key into self.duration_for)
        transitions = {
            'Just Arrived': ('Home', 'home'),
            'Just Left': ('Away', 'away'),
            'Away': ('Extended Away', 'extended_away'),
        }
        if new not in transitions:
            self.debug_log("Error: Invalid new status")
            return
        next_status, duration_key = transitions[new]
        delay = self.duration_for[duration_key]
        if delay:  # Restart timer for next transition
            input_select_entity = self.presence_config_dictionary[name].input_select_entity
            self.debug_log("Queued (Status Changed): Setting entity value {}\n from {} to {} in {}".format(input_select_entity, new, next_status, self.utils.seconds_to_str(delay)))
            self.run_in_timers[name] = self.run_in(self._set_entity_value_callback, delay, person_name=name, new_status=next_status)

    def _presence_changed(self, entity, attribute, old, new, kwargs):
        """State callback for a device/group tracker; ``kwargs`` carries ``name`` and ``type``."""
        # ``name`` and ``type`` arrive inside kwargs, not as positional parameters.
        self.debug_log("Presence of type {} changed for:\n {} ({}): ({} -> {})".format(kwargs['type'], kwargs['name'], entity, old, new))
        self.update_status(kwargs['name'])

    ##############
    ### Timers ###
    ##############
    def clear_run_in_timer(self, name):
        """Cancel and forget the pending timer for ``name``, if there is one."""
        handle = self.run_in_timers.pop(name, None)
        if handle is not None:
            self.cancel_timer(handle)

    ################
    ### Set Data ###
    ################
    def set_entity_value(self, name, new_status):
        """Write ``new_status`` to ``name``'s input_select unless it already has that value."""
        input_select_entity = self.presence_config_dictionary[name].input_select_entity
        current_status = self.get_state(input_select_entity)
        if current_status != new_status:
            self.debug_log("Setting entity value {} from {} to {}".format(input_select_entity, current_status, new_status))
            # NOTE(review): set_state writes the state directly; consider the
            # input_select select_option service if option validation is wanted.
            self.set_state(input_select_entity, state=new_status)

    ################
    ### GET Data ###
    ################
    def get_presence(self, name):
        """Return 'home' if any device or group tracker of ``name`` is home, else 'not_home'.

        An unknown ``name`` is logged and reported as 'not_home'.
        """
        presence_config = self.presence_config_dictionary.get(name)
        if not isinstance(presence_config, Presence_Config):
            self.debug_log("Getting status of " + name + " failed. ")
            return 'not_home'
        for device_tracker in presence_config.device_trackers or []:
            if self.get_tracker_state("device_tracker." + device_tracker) == 'home':
                return 'home'
        for group_tracker in presence_config.group_trackers or []:
            if self.get_state("group." + group_tracker) == 'home':
                return 'home'
        return 'not_home'

    def manual_override_is_enabled(self):
        """Return True when the configured manual-override entity is switched on.

        The entity is read from ``manual_override_entity_id``; the older
        ``manual_override_switch_id`` key is accepted as a fallback.
        """
        entity_id = self.args.get('manual_override_entity_id') or self.args.get('manual_override_switch_id')
        if not entity_id:
            return False
        # Home Assistant reports booleans/switches as the string 'on'.
        return self.get_state(entity_id) in ('on', True)

    def get_delay_for(self, name):
        """Return the configured delay ``name`` in seconds (0 if unset, never negative)."""
        delays = self.args.get('delays') or {}
        if name not in delays:
            return 0
        return max(0, self.utils.convert_to_seconds(delays[name]))

    ##############################
    ### Initialization Methods ###
    ##############################
    def populate_duration_for(self):
        """Compute how long each status lasts before the next one applies.

        The configured delays are measured from the presence change; the
        durations stored here are the gaps between consecutive statuses,
        clamped at zero so a misordered config never yields a negative delay.
        """
        just_arrived = self.get_delay_for('just_arrived')
        home = self.get_delay_for('home')
        just_left = self.get_delay_for('just_left')
        away = self.get_delay_for('away')
        extended_away = self.get_delay_for('extended_away')
        self.duration_for = {
            'just_arrived': just_arrived,
            'home': max(0, home - just_arrived),
            'just_left': just_left,
            'away': max(0, away - just_left),
            'extended_away': max(0, extended_away - away),
        }

    def populate_presence_config_dictionary(self):
        """Build ``presence_config_dictionary`` (name -> Presence_Config) from ``persons``.

        Entries without a name, an input_select_entity or any tracker list are
        skipped with a debug message.

        Raises:
            ValueError: if two persons share the same name.
        """
        if 'persons' not in self.args:
            self.debug_log("Error: missing 'persons' configuration data")
        persons = self.args.get('persons') or []
        self.presence_config_dictionary = {}
        for config in persons:
            if 'name' not in config:
                self.debug_log("name is missing from config")
                continue
            if config['name'] in self.presence_config_dictionary:
                self.debug_log("name needs to be unique. Skipping person: " + config['name'])
                raise ValueError("name needs to be unique. Skipping person: " + config['name'])
            if 'input_select_entity' not in config:
                self.debug_log("input_select_entity is missing from config")
                continue
            if {'device_trackers', 'group_trackers'}.isdisjoint(config):
                self.debug_log("Either Device trackers or group trackers are required to populate presence configuration.")
                self.debug_log(config['name'] + " cannot be tracked.")
                continue
            presence_config = Presence_Config(config['input_select_entity'])
            if 'device_trackers' in config:
                presence_config.device_trackers = config['device_trackers']
            if 'group_trackers' in config:
                presence_config.group_trackers = config['group_trackers']
            self.presence_config_dictionary[config['name']] = presence_config

    ######################
    ### Helper Methods ###
    ######################
    def debug_log(self, msg):
        """Log ``msg`` only when the ``debug`` option is enabled (absent means disabled)."""
        if self.args.get('debug'):
            self.log(msg)
############### | |
### Classes ### | |
############### | |
class Presence_Config:
    """Per-person configuration: the status input_select and the trackers feeding it."""

    def __init__(self, input_select_entity, device_trackers=None, group_trackers=None):
        """Store the entity id and tracker lists.

        Args:
            input_select_entity: entity id of the person's status input_select.
            device_trackers: tracker names without the ``device_tracker.`` prefix.
            group_trackers: group names without the ``group.`` prefix.
        """
        self.input_select_entity = input_select_entity  # Input select
        # Fresh lists per instance: a mutable default would be shared by all instances.
        self.device_trackers = [] if device_trackers is None else device_trackers
        self.group_trackers = [] if group_trackers is None else group_trackers
class Presence_Data:
    """Last known presence of one person and when it changed / was last refreshed."""

    def __init__(self, name='Default name', current_state='home', last_state_change=None, last_update=None):
        """Create a record; timestamps left as None default to the moment of construction."""
        # Resolve "now" per call: a datetime.now() default would be frozen at definition time.
        now = dt.datetime.now()
        self.name = name
        self.last_state = current_state
        self.last_state_change = now if last_state_change is None else last_state_change
        self.last_update = now if last_update is None else last_update

    def set_state(self, new_state):
        """Record ``new_state``; the change timestamp moves only if the state differs."""
        now = dt.datetime.now()
        if self.last_state != new_state:
            self.last_state_change = now
            self.last_state = new_state
        self.last_update = now

    def get_time_since_last_update(self):
        """Return the timedelta since ``set_state`` was last called."""
        return dt.datetime.now() - self.last_update

    def get_state_duration(self):
        """Return the timedelta spent in the current state."""
        return dt.datetime.now() - self.last_state_change

    def get_state_duration_in_seconds(self):
        """Return the time spent in the current state as float seconds."""
        return self.get_state_duration().total_seconds()
import appdaemon.appapi as appapi | |
import json | |
import math | |
from datetime import datetime, timedelta | |
seconds_per_unit = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800} | |
class utils(appapi.AppDaemon):
    """Small time-conversion helpers shared with other apps."""

    def convert_to_seconds(self, time):
        """Convert a duration such as '15s', '5m' or '24h' to whole seconds.

        The last character selects the unit from ``seconds_per_unit``; the
        rest must be an integer. Plain numbers (e.g. an unquoted YAML ``300``)
        and digit-only strings are taken as seconds.

        Raises:
            KeyError: if the unit suffix is not in ``seconds_per_unit``.
            ValueError: if the numeric part is not an integer.
        """
        if isinstance(time, (int, float)) and not isinstance(time, bool):
            return int(time)
        text = str(time).strip()
        if text.isdigit():
            return int(text)
        return int(text[:-1]) * seconds_per_unit[text[-1]]

    def seconds_to_str(self, seconds):
        """Return ``seconds`` rounded up to a whole second as a ``timedelta``.

        Despite the name, the result is a timedelta; callers format it into
        strings, which renders it as e.g. ``0:05:00``.
        """
        return timedelta(seconds=math.ceil(seconds))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment