-
-
Save Sylph/7a2776e71e335cb52a3dcb2b32ebc0bc to your computer and use it in GitHub Desktop.
Non-binary, AppDaemon-based presence detector for Home Assistant, based on https://philhawthorne.com/making-home-assistants-presence-detection-not-so-binary/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# AppDaemon app configuration for the presence detector.

# Helper app that provides the duration conversion used by Presence.
utils:
  module: utils
  class: utils

Presence:
  # Path of the shelve database in which per-person presence data is kept.
  file: /home/homeassistant/.appdaemon/storage/cache/presence
  module: presence
  class: Presence
  dependencies:
    - utils
  debug: true
  # Time a person must have been home / away before each status is reached.
  # Format: <integer><unit>, unit being one of s, m, h, d, w.
  delays:
    just_arrived: 15s
    home: 5m
    just_left: 20s
    away: 15m
    extended_away: 24h
  persons:
    - name: Sylvia
      input_select_entity: input_select.sylvia_status
      # NOTE(review): the app writes the status "Just Left" (capital L);
      # confirm the input_select options use the same spelling.
      constrain_input_select: input_select.sylvia_status,Home,Just Arrived,Just Left,Away,Extended Away
      group_trackers:
        - sylvia_is_home
    - name: Wesley
      input_select_entity: input_select.wesley_status
      constrain_input_select: input_select.wesley_status,Home,Just Arrived,Just Left,Away,Extended Away
      group_trackers:
        - wesley_is_home
  # While this entity is "on" the app stops changing statuses.
  manual_override_entity_id: input_boolean.presence_override
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import appdaemon.plugins.hass.hassapi as hass | |
import shelve | |
import time | |
import datetime as dt | |
# Key under which the per-person presence records are stored in the shelve file.
db_key = 'presence_data'


class Presence(hass.Hass):
    """Drive a multi-state presence ``input_select`` for each configured person.

    Each person moves through the statuses ``Just Arrived`` -> ``Home`` and
    ``Just Left`` -> ``Away`` -> ``Extended Away``. Transitions are driven by
    the person's device/group trackers and by timers whose lengths come from
    the ``delays`` section of the app configuration.
    """

    def initialize(self):
        """Read the configuration, open the data store and subscribe to state changes."""
        if 'file' not in self.args:
            self.debug_log("Error: missing 'file' configuration data")
            return

        self.run_in_timers = {}  # person name -> handle of the pending timer
        self.utils = self.get_app('utils')
        self.populate_duration_for()
        self.populate_presence_config_dictionary()

        self.device_db = shelve.open(self.args['file'])
        if not isinstance(self.device_db.get(db_key), dict):
            self.device_db[db_key] = {}

        # Subscribe to presence changes
        for name, presence_config in self.presence_config_dictionary.items():
            if not isinstance(presence_config, Presence_Config):
                self.debug_log("Presence Data is invalid")
                continue  # nothing usable to subscribe to

            for device_tracker in presence_config.device_trackers or []:
                self.listen_state(self._presence_changed, "device_tracker." + device_tracker,
                                  name=name, type='device')
            for group_tracker in presence_config.group_trackers or []:
                self.listen_state(self._presence_changed, "group." + group_tracker,
                                  name=name, type='group')
            self.listen_state(self._status_changed, presence_config.input_select_entity, name=name)
            self.update_status(name)

    def terminate(self):
        """Close the shelve store when the app is unloaded or reloaded."""
        device_db = getattr(self, 'device_db', None)  # absent if initialize() bailed out early
        if device_db is not None:
            device_db.close()

    def update_status(self, name):
        """Re-evaluate the status of person ``name`` and queue the next transition.

        Cancels any pending timer for the person, compares the current tracker
        presence with the stored one, sets the status entity and/or schedules
        the next status change, then persists the presence record.
        """
        self.clear_run_in_timer(name)  # Clear any timer for [name]
        if self.manual_override_is_enabled():
            self.debug_log("Manual override is enabled. Halting presence automation.")
            return

        presence_config = self.presence_config_dictionary.get(name)
        if not isinstance(presence_config, Presence_Config):
            self.debug_log("Updating status of {} failed. {} is not in list".format(name, name))
            return

        presence = self.get_presence(name)  # current home vs not_home
        presence_data = self.device_db[db_key].get(name, Presence_Data(name))
        next_status, delay = None, 0
        self.debug_log("Updating {}'s status w/ presence: {} -> {}".format(
            name, presence_data.last_state, presence))

        # Prepare new values
        if presence != presence_data.last_state:  # person returned or left
            if presence == 'home':
                delays_config_name, next_status = 'just_arrived', 'Just Arrived'
            else:
                delays_config_name, next_status = 'just_left', 'Just Left'
            delay = self.get_delay_for(delays_config_name)
        else:  # stayed home/away
            duration_in_state = presence_data.get_state_duration_in_seconds()
            current_status = self.get_state(presence_config.input_select_entity)
            self.debug_log("Duration in state is {}".format(
                self.utils.seconds_to_str(duration_in_state)))

            if presence_data.last_state == 'home':  # still home
                if duration_in_state > self.get_delay_for('home'):
                    self.set_entity_value(name, 'Home')
                elif duration_in_state >= self.get_delay_for('just_arrived'):
                    if current_status != 'Just Arrived':
                        # Setting the entity fires _status_changed, which queues the next step.
                        self.set_entity_value(name, 'Just Arrived')
                    else:
                        # Entity already correct, so no state event will fire: queue it here.
                        next_status = 'Home'
                        delay = self.get_delay_for('home') - duration_in_state
                else:
                    next_status = 'Just Arrived'
                    delay = self.get_delay_for('just_arrived') - duration_in_state
            else:  # still away
                if duration_in_state >= self.get_delay_for('extended_away'):
                    self.set_entity_value(name, 'Extended Away')
                elif duration_in_state >= self.get_delay_for('away'):
                    if current_status != 'Away':
                        self.set_entity_value(name, 'Away')
                    else:
                        next_status = 'Extended Away'
                        delay = self.get_delay_for('extended_away') - duration_in_state
                elif duration_in_state >= self.get_delay_for('just_left'):
                    if current_status != 'Just Left':
                        self.set_entity_value(name, 'Just Left')
                    else:
                        next_status = 'Away'
                        delay = self.get_delay_for('away') - duration_in_state
                else:
                    next_status = 'Just Left'
                    delay = self.get_delay_for('just_left') - duration_in_state

        if next_status:
            self.run_in_timers[name] = self.run_in(
                self._set_entity_value_callback, delay, person_name=name, new_status=next_status)
            self.debug_log("Queued: Setting {}'s status to {} in {}.".format(
                name, next_status, self.utils.seconds_to_str(delay)))

        presence_data.set_state(presence)
        # Re-assign the whole dict: shelve only persists on top-level assignment.
        new_data = self.device_db[db_key]
        new_data[name] = presence_data
        self.device_db[db_key] = new_data

    #################
    ### Callbacks ###
    #################
    def _set_entity_value_callback(self, kwargs):
        """Timer callback: apply the queued status (``person_name`` / ``new_status`` kwargs)."""
        # The timer has fired, so its handle is no longer worth cancelling.
        self.run_in_timers.pop(kwargs['person_name'], None)
        self.set_entity_value(kwargs['person_name'], kwargs['new_status'])

    # If status just changed. This is usually done manually in the UI.
    def _status_changed(self, entity, attribute, old, new, kwargs):
        """State callback for a status input_select: queue the follow-up status."""
        name = kwargs['name']
        self.debug_log("Status changed for {} ({}): ({} -> {})".format(name, entity, old, new))
        if self.manual_override_is_enabled():
            return

        self.debug_log("Processing status changed routine for " + name)
        self.clear_run_in_timer(name)
        if new in ('Home', 'Extended Away'):  # terminal statuses, nothing to queue
            return

        # status just entered -> (status that follows, key into self.duration_for)
        transitions = {
            'Just Arrived': ('Home', 'home'),
            'Just Left': ('Away', 'away'),
            'Away': ('Extended Away', 'extended_away'),
        }
        if new not in transitions:
            self.debug_log("Error: Invalid new status")
            return

        next_status, duration_key = transitions[new]
        delay = self.duration_for[duration_key]
        if delay:  # Restart timer for next transition
            input_select_entity = self.presence_config_dictionary[name].input_select_entity
            self.debug_log("Queued (Status Changed): Setting entity value {}\n from {} to {} in {}".format(
                input_select_entity, new, next_status, self.utils.seconds_to_str(delay)))
            # The callback reads kwargs['new_status'], so that is the key to pass.
            self.run_in_timers[name] = self.run_in(
                self._set_entity_value_callback, delay, person_name=name, new_status=next_status)

    def _presence_changed(self, entity, attribute, old, new, kwargs):
        """State callback for a tracker: re-evaluate the owning person's status."""
        self.debug_log("Presence of type {} changed for:\n {} ({}): ({} -> {})".format(
            kwargs['type'], kwargs['name'], entity, old, new))
        self.update_status(kwargs['name'])

    ##############
    ### Timers ###
    ##############
    def clear_run_in_timer(self, name):
        """Cancel and forget the pending timer of person ``name``, if there is one."""
        handle = self.run_in_timers.pop(name, None)
        if handle is not None:
            self.cancel_timer(handle)

    ################
    ### Set Data ###
    ################
    def set_entity_value(self, name, new_status):
        """Set the status entity of person ``name`` to ``new_status`` if it differs."""
        input_select_entity = self.presence_config_dictionary[name].input_select_entity
        current_status = self.get_state(input_select_entity)
        if current_status != new_status:
            self.debug_log("Setting entity value {} from {} to {}".format(
                input_select_entity, current_status, new_status))
            self.set_state(input_select_entity, state=new_status)

    ################
    ### GET Data ###
    ################
    def get_presence(self, name):
        """Return ``'home'`` if any tracker of person ``name`` is home, else ``'not_home'``.

        An unknown or invalid person is logged and reported as ``'not_home'``.
        """
        presence_config = self.presence_config_dictionary.get(name)
        if not isinstance(presence_config, Presence_Config):
            self.debug_log("Getting status of " + name + " failed. ")
            return 'not_home'

        for device_tracker in presence_config.device_trackers or []:
            if self.get_tracker_state("device_tracker." + device_tracker) == 'home':
                return 'home'
        for group_tracker in presence_config.group_trackers or []:
            if self.get_state("group." + group_tracker) == 'home':
                return 'home'
        return 'not_home'

    def manual_override_is_enabled(self):
        """Return True when the configured ``manual_override_entity_id`` entity is ``'on'``."""
        entity_id = self.args.get('manual_override_entity_id')
        if not entity_id:
            return False
        # Entity states are reported as strings, e.g. 'on' / 'off'.
        return self.get_state(entity_id) == 'on'

    def get_delay_for(self, name):
        """Return the configured delay ``name`` in seconds; 0 when it is not configured."""
        delays = self.args.get('delays') or {}
        if name not in delays:
            return 0
        return max(0, self.utils.convert_to_seconds(delays[name]))

    ##############################
    ### Initialization Methods ###
    ##############################
    def populate_duration_for(self):
        """Pre-compute how long each status lasts before the next one is due.

        The configured delays are measured from the moment of arrival or
        departure; the values stored here are measured from the previous status.
        """
        self.duration_for = {
            'just_arrived': self.get_delay_for('just_arrived'),
            'home': self.get_delay_for('home') - self.get_delay_for('just_arrived'),
            'just_left': self.get_delay_for('just_left'),
            'away': self.get_delay_for('away') - self.get_delay_for('just_left'),
            'extended_away': self.get_delay_for('extended_away') - self.get_delay_for('away'),
        }

    def populate_presence_config_dictionary(self):
        """Build ``self.presence_config_dictionary`` from the ``persons`` configuration.

        Entries lacking a name, a status entity or any tracker are logged and
        skipped.

        Raises:
            ValueError: if two persons share the same name.
        """
        if 'persons' not in self.args:
            self.debug_log("Error: missing 'persons' configuration data")
        persons = self.args.get('persons') or []

        self.presence_config_dictionary = {}
        for config in persons:
            if 'name' not in config:
                self.debug_log("name is missing from config")
                continue
            if config['name'] in self.presence_config_dictionary:
                self.debug_log("name needs to be unique. Skipping person: " + config['name'])
                raise ValueError("name needs to be unique. Skipping person: " + config['name'])
            if 'input_select_entity' not in config:
                self.debug_log("input_select_entity is missing from config")
                continue
            if {'device_trackers', 'group_trackers'}.isdisjoint(config):
                self.debug_log("Either Device trackers or group trackers are required to populate presence configuration.")
                self.debug_log(config['name'] + " cannot be tracked.")
                continue

            presence_config = Presence_Config(config['input_select_entity'])
            if 'device_trackers' in config:
                presence_config.device_trackers = config['device_trackers']
            if 'group_trackers' in config:
                presence_config.group_trackers = config['group_trackers']
            self.presence_config_dictionary[config['name']] = presence_config

    ######################
    ### Helper Methods ###
    ######################
    def debug_log(self, msg):
        """Log ``msg`` only when the ``debug`` option is enabled."""
        # YAML `true` compares equal to 1; a missing option means "off".
        if self.args.get('debug') == 1:
            self.log(msg)
############### | |
### Classes ### | |
############### | |
class Presence_Config:
    """Static configuration of one tracked person."""

    def __init__(self, input_select_entity, device_trackers=None, group_trackers=None):
        """Store the status entity and the tracker names of a person.

        Args:
            input_select_entity: entity id of the person's status input_select.
            device_trackers: tracker names without the ``device_tracker.`` prefix.
            group_trackers: group names without the ``group.`` prefix.
        """
        self.input_select_entity = input_select_entity  # Input select
        # A fresh list per instance; a `[]` default would be shared by all of them.
        self.device_trackers = [] if device_trackers is None else device_trackers
        self.group_trackers = [] if group_trackers is None else group_trackers
class Presence_Data:
    """Persisted presence record of one person: last state and its timestamps."""

    def __init__(self, name='Default name', current_state='home', last_state_change=None, last_update=None):
        """Create a record.

        Args:
            name: person the record belongs to.
            current_state: initial presence state, e.g. ``'home'``.
            last_state_change: when the state last changed; defaults to now.
            last_update: when the record was last updated; defaults to now.
        """
        # Resolve "now" per instance; a `dt.datetime.now()` default would be
        # evaluated only once, when the module is imported.
        now = dt.datetime.now()
        self.name = name
        self.last_state = current_state
        self.last_state_change = now if last_state_change is None else last_state_change
        self.last_update = now if last_update is None else last_update

    def set_state(self, new_state):
        """Record ``new_state``; the change timestamp moves only if the state differs."""
        now = dt.datetime.now()
        if self.last_state != new_state:
            self.last_state_change = now
        self.last_state = new_state
        self.last_update = now

    def get_time_since_last_update(self):
        """Return the ``timedelta`` elapsed since the record was last updated."""
        return dt.datetime.now() - self.last_update

    def get_state_duration(self):
        """Return the ``timedelta`` spent in the current state."""
        return dt.datetime.now() - self.last_state_change

    def get_state_duration_in_seconds(self):
        """Return the time spent in the current state as float seconds."""
        return self.get_state_duration().total_seconds()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import appdaemon.plugins.hass.hassapi as hass | |
import shelve | |
import time | |
import datetime as dt | |
# Key under which the per-person presence records are stored in the shelve file.
db_key = 'presence_data'


class Presence(hass.Hass):
    """Drive a multi-state presence ``input_select`` for each configured person.

    Each person moves through the statuses ``Just Arrived`` -> ``Home`` and
    ``Just Left`` -> ``Away`` -> ``Extended Away``. Transitions are driven by
    the person's device/group trackers and by timers whose lengths come from
    the ``delays`` section of the app configuration.
    """

    def initialize(self):
        """Read the configuration, open the data store and subscribe to state changes."""
        if 'file' not in self.args:
            self.debug_log("Error: missing 'file' configuration data")
            return

        self.run_in_timers = {}  # person name -> handle of the pending timer
        self.utils = self.get_app('utils')
        self.populate_duration_for()
        self.populate_presence_config_dictionary()

        self.device_db = shelve.open(self.args['file'])
        if not isinstance(self.device_db.get(db_key), dict):
            self.device_db[db_key] = {}

        # Subscribe to presence changes
        for name, presence_config in self.presence_config_dictionary.items():
            if not isinstance(presence_config, Presence_Config):
                self.debug_log("Presence Data is invalid")
                continue  # nothing usable to subscribe to

            for device_tracker in presence_config.device_trackers or []:
                self.listen_state(self._presence_changed, "device_tracker." + device_tracker,
                                  name=name, type='device')
            for group_tracker in presence_config.group_trackers or []:
                self.listen_state(self._presence_changed, "group." + group_tracker,
                                  name=name, type='group')
            self.listen_state(self._status_changed, presence_config.input_select_entity, name=name)
            self.update_status(name)

    def terminate(self):
        """Close the shelve store when the app is unloaded or reloaded."""
        device_db = getattr(self, 'device_db', None)  # absent if initialize() bailed out early
        if device_db is not None:
            device_db.close()

    def update_status(self, name):
        """Re-evaluate the status of person ``name`` and queue the next transition.

        Cancels any pending timer for the person, compares the current tracker
        presence with the stored one, sets the status entity and/or schedules
        the next status change, then persists the presence record.
        """
        self.clear_run_in_timer(name)  # Clear any timer for [name]
        if self.manual_override_is_enabled():
            self.debug_log("Manual override is enabled. Halting presence automation.")
            return

        presence_config = self.presence_config_dictionary.get(name)
        if not isinstance(presence_config, Presence_Config):
            self.debug_log("Updating status of {} failed. {} is not in list".format(name, name))
            return

        presence = self.get_presence(name)  # current home vs not_home
        presence_data = self.device_db[db_key].get(name, Presence_Data(name))
        next_status, delay = None, 0
        self.debug_log("Updating {}'s status w/ presence: {} -> {}".format(
            name, presence_data.last_state, presence))

        # Prepare new values
        if presence != presence_data.last_state:  # person returned or left
            if presence == 'home':
                delays_config_name, next_status = 'just_arrived', 'Just Arrived'
            else:
                delays_config_name, next_status = 'just_left', 'Just Left'
            delay = self.get_delay_for(delays_config_name)
        else:  # stayed home/away
            duration_in_state = presence_data.get_state_duration_in_seconds()
            current_status = self.get_state(presence_config.input_select_entity)
            self.debug_log("Duration in state is {}".format(
                self.utils.seconds_to_str(duration_in_state)))

            if presence_data.last_state == 'home':  # still home
                if duration_in_state > self.get_delay_for('home'):
                    self.set_entity_value(name, 'Home')
                elif duration_in_state >= self.get_delay_for('just_arrived'):
                    if current_status != 'Just Arrived':
                        # Setting the entity fires _status_changed, which queues the next step.
                        self.set_entity_value(name, 'Just Arrived')
                    else:
                        # Entity already correct, so no state event will fire: queue it here.
                        next_status = 'Home'
                        delay = self.get_delay_for('home') - duration_in_state
                else:
                    next_status = 'Just Arrived'
                    delay = self.get_delay_for('just_arrived') - duration_in_state
            else:  # still away
                if duration_in_state >= self.get_delay_for('extended_away'):
                    self.set_entity_value(name, 'Extended Away')
                elif duration_in_state >= self.get_delay_for('away'):
                    if current_status != 'Away':
                        self.set_entity_value(name, 'Away')
                    else:
                        next_status = 'Extended Away'
                        delay = self.get_delay_for('extended_away') - duration_in_state
                elif duration_in_state >= self.get_delay_for('just_left'):
                    if current_status != 'Just Left':
                        self.set_entity_value(name, 'Just Left')
                    else:
                        next_status = 'Away'
                        delay = self.get_delay_for('away') - duration_in_state
                else:
                    next_status = 'Just Left'
                    delay = self.get_delay_for('just_left') - duration_in_state

        if next_status:
            self.run_in_timers[name] = self.run_in(
                self._set_entity_value_callback, delay, person_name=name, new_status=next_status)
            self.debug_log("Queued: Setting {}'s status to {} in {}.".format(
                name, next_status, self.utils.seconds_to_str(delay)))

        presence_data.set_state(presence)
        # Re-assign the whole dict: shelve only persists on top-level assignment.
        new_data = self.device_db[db_key]
        new_data[name] = presence_data
        self.device_db[db_key] = new_data

    #################
    ### Callbacks ###
    #################
    def _set_entity_value_callback(self, kwargs):
        """Timer callback: apply the queued status (``person_name`` / ``new_status`` kwargs)."""
        # The timer has fired, so its handle is no longer worth cancelling.
        self.run_in_timers.pop(kwargs['person_name'], None)
        self.set_entity_value(kwargs['person_name'], kwargs['new_status'])

    # If status just changed. This is usually done manually in the UI.
    def _status_changed(self, entity, attribute, old, new, kwargs):
        """State callback for a status input_select: queue the follow-up status."""
        name = kwargs['name']
        self.debug_log("Status changed for {} ({}): ({} -> {})".format(name, entity, old, new))
        if self.manual_override_is_enabled():
            return

        self.debug_log("Processing status changed routine for " + name)
        self.clear_run_in_timer(name)
        if new in ('Home', 'Extended Away'):  # terminal statuses, nothing to queue
            return

        # status just entered -> (status that follows, key into self.duration_for)
        transitions = {
            'Just Arrived': ('Home', 'home'),
            'Just Left': ('Away', 'away'),
            'Away': ('Extended Away', 'extended_away'),
        }
        if new not in transitions:
            # Bail out: there is no follow-up status or delay to schedule.
            self.debug_log("Error: Invalid new status")
            return

        next_status, duration_key = transitions[new]
        delay = self.duration_for[duration_key]
        if delay:  # Restart timer for next transition
            input_select_entity = self.presence_config_dictionary[name].input_select_entity
            self.debug_log("Queued (Status Changed): Setting entity value {}\n from {} to {} in {}".format(
                input_select_entity, new, next_status, self.utils.seconds_to_str(delay)))
            self.run_in_timers[name] = self.run_in(
                self._set_entity_value_callback, delay, person_name=name, new_status=next_status)

    def _presence_changed(self, entity, attribute, old, new, kwargs):
        """State callback for a tracker: re-evaluate the owning person's status.

        ``name`` and ``type`` are the keyword arguments given to ``listen_state``
        and therefore arrive inside ``kwargs``, not as positional parameters.
        """
        self.debug_log("Presence of type {} changed for:\n {} ({}): ({} -> {})".format(
            kwargs['type'], kwargs['name'], entity, old, new))
        self.update_status(kwargs['name'])

    ##############
    ### Timers ###
    ##############
    def clear_run_in_timer(self, name):
        """Cancel and forget the pending timer of person ``name``, if there is one."""
        handle = self.run_in_timers.pop(name, None)
        if handle is not None:
            self.cancel_timer(handle)

    ################
    ### Set Data ###
    ################
    def set_entity_value(self, name, new_status):
        """Set the status entity of person ``name`` to ``new_status`` if it differs."""
        input_select_entity = self.presence_config_dictionary[name].input_select_entity
        current_status = self.get_state(input_select_entity)
        if current_status != new_status:
            self.debug_log("Setting entity value {} from {} to {}".format(
                input_select_entity, current_status, new_status))
            self.set_state(input_select_entity, state=new_status)

    ################
    ### GET Data ###
    ################
    def get_presence(self, name):
        """Return ``'home'`` if any tracker of person ``name`` is home, else ``'not_home'``.

        An unknown or invalid person is logged and reported as ``'not_home'``.
        """
        presence_config = self.presence_config_dictionary.get(name)
        if not isinstance(presence_config, Presence_Config):
            self.debug_log("Getting status of " + name + " failed. ")
            return 'not_home'

        for device_tracker in presence_config.device_trackers or []:
            if self.get_tracker_state("device_tracker." + device_tracker) == 'home':
                return 'home'
        for group_tracker in presence_config.group_trackers or []:
            if self.get_state("group." + group_tracker) == 'home':
                return 'home'
        return 'not_home'

    def manual_override_is_enabled(self):
        """Return True when the configured ``manual_override_entity_id`` entity is ``'on'``."""
        entity_id = self.args.get('manual_override_entity_id')
        if not entity_id:
            return False
        # Entity states are reported as strings, e.g. 'on' / 'off'.
        return self.get_state(entity_id) == 'on'

    def get_delay_for(self, name):
        """Return the configured delay ``name`` in seconds; 0 when it is not configured."""
        delays = self.args.get('delays') or {}
        if name not in delays:
            return 0
        return max(0, self.utils.convert_to_seconds(delays[name]))

    ##############################
    ### Initialization Methods ###
    ##############################
    def populate_duration_for(self):
        """Pre-compute how long each status lasts before the next one is due.

        The configured delays are measured from the moment of arrival or
        departure; the values stored here are measured from the previous status.
        """
        self.duration_for = {
            'just_arrived': self.get_delay_for('just_arrived'),
            'home': self.get_delay_for('home') - self.get_delay_for('just_arrived'),
            'just_left': self.get_delay_for('just_left'),
            'away': self.get_delay_for('away') - self.get_delay_for('just_left'),
            'extended_away': self.get_delay_for('extended_away') - self.get_delay_for('away'),
        }

    def populate_presence_config_dictionary(self):
        """Build ``self.presence_config_dictionary`` from the ``persons`` configuration.

        Entries lacking a name, a status entity or any tracker are logged and
        skipped.

        Raises:
            ValueError: if two persons share the same name.
        """
        if 'persons' not in self.args:
            self.debug_log("Error: missing 'persons' configuration data")
        persons = self.args.get('persons') or []

        self.presence_config_dictionary = {}
        for config in persons:
            if 'name' not in config:
                self.debug_log("name is missing from config")
                continue
            if config['name'] in self.presence_config_dictionary:
                self.debug_log("name needs to be unique. Skipping person: " + config['name'])
                raise ValueError("name needs to be unique. Skipping person: " + config['name'])
            if 'input_select_entity' not in config:
                self.debug_log("input_select_entity is missing from config")
                continue
            if {'device_trackers', 'group_trackers'}.isdisjoint(config):
                self.debug_log("Either Device trackers or group trackers are required to populate presence configuration.")
                self.debug_log(config['name'] + " cannot be tracked.")
                continue

            presence_config = Presence_Config(config['input_select_entity'])
            if 'device_trackers' in config:
                presence_config.device_trackers = config['device_trackers']
            if 'group_trackers' in config:
                presence_config.group_trackers = config['group_trackers']
            self.presence_config_dictionary[config['name']] = presence_config

    ######################
    ### Helper Methods ###
    ######################
    def debug_log(self, msg):
        """Log ``msg`` only when the ``debug`` option is enabled."""
        # YAML `true` compares equal to 1; a missing option means "off".
        if self.args.get('debug') == 1:
            self.log(msg)
############### | |
### Classes ### | |
############### | |
class Presence_Config:
    """Static configuration of one tracked person."""

    def __init__(self, input_select_entity, device_trackers=None, group_trackers=None):
        """Store the status entity and the tracker names of a person.

        Args:
            input_select_entity: entity id of the person's status input_select.
            device_trackers: tracker names without the ``device_tracker.`` prefix.
            group_trackers: group names without the ``group.`` prefix.
        """
        self.input_select_entity = input_select_entity  # Input select
        # Build the lists here: a `[]` default is created once and then shared.
        self.device_trackers = device_trackers if device_trackers is not None else []
        self.group_trackers = group_trackers if group_trackers is not None else []
class Presence_Data:
    """Persisted presence record of one person: last state and its timestamps."""

    def __init__(self, name='Default name', current_state='home', last_state_change=None, last_update=None):
        """Create a record.

        Args:
            name: person the record belongs to.
            current_state: initial presence state, e.g. ``'home'``.
            last_state_change: when the state last changed; defaults to now.
            last_update: when the record was last updated; defaults to now.
        """
        # Take the timestamp at construction time. Default argument values are
        # evaluated once at import, which would freeze "now" for every record.
        created = dt.datetime.now()
        self.name = name
        self.last_state = current_state
        self.last_state_change = created if last_state_change is None else last_state_change
        self.last_update = created if last_update is None else last_update

    def set_state(self, new_state):
        """Record ``new_state``; the change timestamp moves only if the state differs."""
        now = dt.datetime.now()
        if self.last_state != new_state:
            self.last_state_change = now
        self.last_state = new_state
        self.last_update = now

    def get_time_since_last_update(self):
        """Return the ``timedelta`` elapsed since the record was last updated."""
        return dt.datetime.now() - self.last_update

    def get_state_duration(self):
        """Return the ``timedelta`` spent in the current state."""
        return dt.datetime.now() - self.last_state_change

    def get_state_duration_in_seconds(self):
        """Return the time spent in the current state as float seconds."""
        return self.get_state_duration().total_seconds()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import appdaemon.appapi as appapi | |
import json | |
import math | |
from datetime import datetime, timedelta | |
# Number of seconds in each supported duration unit suffix.
seconds_per_unit = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


class utils(appapi.AppDaemon):
    """Duration helpers that other apps obtain through ``get_app('utils')``."""

    def convert_to_seconds(self, time):
        """Convert a duration such as ``'15s'``, ``'5m'`` or ``'24h'`` to whole seconds.

        Args:
            time: an integer followed by one unit letter from
                ``seconds_per_unit`` (case-insensitive, surrounding whitespace
                ignored), or a bare number / digit string taken as seconds.

        Returns:
            The duration in seconds as an ``int``.

        Raises:
            KeyError: if the unit letter is not in ``seconds_per_unit``.
            ValueError: if the part before the unit is not an integer.
        """
        # A config value written without a unit (e.g. `home: 300`) arrives as a number.
        if isinstance(time, (int, float)):
            return int(time)

        text = str(time).strip().lower()
        if text.isdigit():
            return int(text)
        return int(text[:-1]) * seconds_per_unit[text[-1]]

    def seconds_to_str(self, seconds):
        """Return ``seconds``, rounded up to a whole second, as a ``timedelta``.

        The result is meant to be formatted into log messages, where it renders
        as ``H:MM:SS``.
        """
        return timedelta(seconds=math.ceil(seconds))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment