Skip to content

Instantly share code, notes, and snippets.

@TD22057
Created March 14, 2022 02:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TD22057/d4da4be5134e38c895e347e5f676ec2c to your computer and use it in GitHub Desktop.
Save TD22057/d4da4be5134e38c895e347e5f676ec2c to your computer and use it in GitHub Desktop.
HASS blue iris notify
import hassapi as hass
import datetime as DT
import requests
__doc__ = """Send notification when the camera motion is triggered
apps.yaml config entry:
camera_notify:
module: camera_notify
class: CameraNotify
user: bi_user
password: bi_pass
service: telegram_bot/send_photo
url: http://192.168.10.9:81
cameras:
- sensor.camera_ai_front_door
- sensor.camera_ai_driveway_middle
- sensor.camera_ai_driveway_upper
- sensor.camera_ai_driveway_lower
temp_file: /tmp/camera_ai_motion-{}.jpg
max_temp_file: 20
disable: input_boolean.camera_motion_enable
"""
class CameraNotify(hass.Hass):
def initialize(self):
self.log("Starting camera notification")
self.url = self.args["url"]
self.user = self.args["user"]
self.password = self.args["password"]
self.notify = self.args["service"]
self.temp_file = self.args["temp_file"]
self.max_temp_file = self.args["max_temp_file"]
self.temp_count = 0
for s in self.args["cameras"]:
self.listen_state(self.motion_cb, s)
self.disable = self.args.get("disable", None)
#-------------------------------------------------------------------------
def motion_cb(self, entity, attr, old_state, new_state, kwargs):
self.log("Camera '%s' changed from '%s' to '%s'", entity, old_state,
new_state)
if self.disable:
disable_obj = self.get_entity(self.disable)
if disable_obj.get_state() == "off":
self.log("Camera motion disabled")
return
obj = self.get_entity(entity)
attr = obj.get_state(attribute="all")
self.log("Camera: %s", attr)
extra = attr.get("attributes", {})
type = extra.get("type", "Unknown")
memo = extra.get("memo", "Unknown")
alert_db = extra["alert_db"]
url = (f'{self.url}/alerts/{alert_db}&fulljpeg'
f'&user={self.user}&pw={self.password}')
dest = self.temp_file.format(self.temp_count)
self.temp_count += 1
if self.temp_count > self.max_temp_file:
self.temp_count = 0
self.download_file(url, dest)
self.call_service(self.notify, file=dest, caption=memo)
#-------------------------------------------------------------------------
def download_file(self, url, local_file):
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(local_file, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
#-------------------------------------------------------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment