Skip to content

Instantly share code, notes, and snippets.

@yottatsa
Created January 30, 2019 16:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yottatsa/2395de4ea665fecabf1dfb63796a546b to your computer and use it in GitHub Desktop.
Save yottatsa/2395de4ea665fecabf1dfb63796a546b to your computer and use it in GitHub Desktop.
systemd service notification for Home Assistant
import os
import logging
from datetime import timedelta
from homeassistant.components.binary_sensor import BinarySensorDevice
from homeassistant.util import Throttle
# Third-party package this platform depends on; it is imported lazily inside
# Notifier.__init__ rather than at module import time.
REQUIREMENTS = ["sdnotify"]
__version__ = "0.1"
# Module-level logger used by get_scan_interval() and Notifier._notify().
_LOGGER = logging.getLogger(__name__)
def get_scan_interval():
    """Derive the polling interval from the systemd watchdog timeout.

    Reads the ``WATCHDOG_USEC`` environment variable (a timeout expressed in
    microseconds), converts it to seconds and divides it by four so that
    several keep-alives fit inside one watchdog period.

    Returns:
        A quarter of the watchdog timeout in seconds (float), or ``None``
        when ``WATCHDOG_USEC`` is unset, is not an integer, or is not
        positive. Callers are expected to substitute their own default.
    """
    watchdog_usec = os.environ.get("WATCHDOG_USEC")
    if watchdog_usec is None:
        return None

    _LOGGER.debug("WATCHDOG_USEC=%s", watchdog_usec)
    try:
        usec = int(watchdog_usec)
    except ValueError:
        # This function is called while the module is being imported; a
        # malformed value must not prevent the module from loading.
        _LOGGER.warning("Ignoring non-integer WATCHDOG_USEC=%r", watchdog_usec)
        return None

    if usec <= 0:
        # A zero or negative timeout cannot yield a usable interval.
        _LOGGER.warning("Ignoring non-positive WATCHDOG_USEC=%r", watchdog_usec)
        return None

    watchdog_sec = usec / 1000 / 1000
    return watchdog_sec / 4
# Interval handed to Throttle on Notifier.async_update. It is computed once at
# import time from get_scan_interval(); a falsy result (None or 0) falls back
# to 5 seconds.
SCAN_INTERVAL = timedelta(seconds=get_scan_interval() or 5)
async def async_setup_platform(
        hass, config, async_add_entities, discovery_info=None):
    """Set up the platform by registering a single Notifier entity.

    ``config`` and ``discovery_info`` are accepted but not used here.
    The second argument to ``async_add_entities`` is ``True``
    (presumably "update before add" -- confirm against the Home Assistant
    entity platform API).
    """
    entities = [Notifier(hass)]
    async_add_entities(entities, True)
class Notifier(BinarySensorDevice):
    """Binary sensor that forwards readiness and keep-alive messages to sdnotify.

    The sensor reports "on" once ``READY=1`` has been sent.
    """

    def __init__(self, hass):
        """Create the sdnotify client; ``hass`` is accepted but not stored."""
        # Imported here rather than at module level, as in the original code.
        from sdnotify import SystemdNotifier

        self.notifier = SystemdNotifier()
        self.ready = False

    @property
    def name(self):
        """Return the display name of this sensor."""
        return "systemd Service Notification"

    @property
    def is_on(self):
        """Return True once the READY message has been sent."""
        return self.ready

    @property
    def should_poll(self):
        """Poll only while the sdnotify client holds a socket."""
        return self._has_socket()

    def _has_socket(self):
        """Tell whether the sdnotify client exposes a non-None socket."""
        return self.notifier.socket is not None

    def _notify(self, message):
        """Send ``message`` through sdnotify, then log it at debug level."""
        self.notifier.notify(message)
        _LOGGER.debug("sdnotify: %s", message)

    @Throttle(SCAN_INTERVAL)
    async def async_update(self):
        """Send READY=1 on the first run and WATCHDOG=1 on every run.

        Does nothing when the sdnotify client has no socket. The method is
        wrapped in ``Throttle(SCAN_INTERVAL)``.
        """
        if not self._has_socket():
            return

        if not self.ready:
            # Announce readiness exactly once, before the first keep-alive.
            self._notify("READY=1")
            self.ready = True

        self._notify("WATCHDOG=1")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment