Last active
March 27, 2022 13:07
-
-
Save hardenchant/1a7376302344c82c034e9af65b90faf6 to your computer and use it in GitHub Desktop.
Power on over zigbee and notify when you connect to bluetooth
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import json | |
import logging | |
import logging.handlers | |
import os | |
import signal | |
import sys | |
import dbus | |
import dbus.mainloop.glib | |
import dbus.service | |
import paho.mqtt.publish as publish | |
import requests | |
from gi.repository import GLib as glibobject | |
logging.basicConfig( | |
level=logging.INFO, | |
format="[%(asctime)s] %(levelname)s [%(funcName)s:%(lineno)d] %(message)s" | |
) | |
ZIGBEE_PILOT_ADDR = os.getenv('ZIGBEE_PILOT_ADDR', '') | |
TG_TOKEN = os.getenv('TG_TOKEN', '') | |
TG_CHAT = os.getenv('TG_CHAT', '') | |
mainloop = glibobject.MainLoop() | |
def shutdown(signum, frame): | |
mainloop.quit() | |
def is_any_device_connected() -> bool: | |
""" | |
Iterate over dbus all added bluetooth devices and check if connected now | |
Returns: True/False | |
""" | |
bus = dbus.SystemBus() | |
mngr = bus.get_object('org.bluez', '/') | |
obj_manager = dbus.Interface(mngr, 'org.freedesktop.DBus.ObjectManager') | |
objs = obj_manager.GetManagedObjects() | |
for obj in objs: | |
if objs[obj].get('org.bluez.Device1', {}).get('Connected', None) == 1: | |
return True | |
return False | |
def telegram_notify(message): | |
""" | |
Notify by tg about connections | |
Args: | |
message: text message to send | |
Returns: nothing | |
""" | |
bot_token = TG_TOKEN | |
chat_id = TG_CHAT | |
try: | |
requests.post( | |
f'https://api.telegram.org/bot{bot_token}/sendMessage', | |
data={ | |
'chat_id': chat_id, | |
'text': message | |
} | |
) | |
except Exception as e: | |
logging.error(f'TG message not sent: {e}') | |
def sound_switch_power(power_on=True): | |
""" | |
Power on/off speakers over zigbee protocol | |
Args: | |
power_on: | |
Returns: | |
""" | |
pilot_zigbee_addr = ZIGBEE_PILOT_ADDR | |
message = {"state_l4": "ON"} if power_on else {"state_l4": "OFF"} | |
publish.single(f'zigbee2mqtt/{pilot_zigbee_addr}/set', json.dumps(message), hostname='localhost') | |
logging.info(f'Message {message} sent to mqtt addr {pilot_zigbee_addr}') | |
def device_property_changed(interface, properties, invalidated, path): | |
""" Check for changes in org.bluez object tree | |
Check for any device connected and turn on/off power | |
Args: | |
interface(string) : name of the dbus interface where changes occured | |
properties(dict) : list of all parameters changed and their new value | |
invalidated(array) : list of properties invalidated | |
path(string) : path of the dbus object that triggered the call | |
""" | |
if interface != 'org.bluez.Device1': | |
return | |
if is_any_device_connected(): | |
logging.info(f'Connected devices exists, sound is powered on') | |
sound_switch_power() | |
else: | |
logging.info(f'Connected devices not exists, sound is powered off') | |
sound_switch_power(power_on=False) | |
bus = dbus.SystemBus() | |
device_object = bus.get_object('org.bluez', path) | |
device_properties_interface = dbus.Interface(device_object, 'org.freedesktop.DBus.Properties') | |
name = device_properties_interface.Get('org.bluez.Device1', 'Name') | |
address = device_properties_interface.Get('org.bluez.Device1', 'Address') | |
if properties.get(dbus.String('Connected')) == 1: | |
logging.info(f'{name} ({address}) connected') | |
telegram_notify(f'{name} ({address}) connected to rpi bluetooth') | |
elif properties.get(dbus.String('Connected')) == 0: | |
logging.info(f'{name} ({address}) disconnected') | |
telegram_notify(f'{name} ({address}) disconnected from rpi bluetooth') | |
def main(): | |
# shut down on a TERM signal | |
signal.signal(signal.SIGTERM, shutdown) | |
logging.info('Started') | |
# Get the system bus | |
try: | |
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | |
bus = dbus.SystemBus() | |
except Exception as e: | |
logging.error(f'Unable to get the system dbus: "{e}". Exiting. Is dbus running?') | |
sys.exit(1) | |
# listen for PropertyChanged signal on org.bluez | |
bus.add_signal_receiver( | |
device_property_changed, | |
bus_name='org.bluez', | |
signal_name='PropertiesChanged', | |
dbus_interface='org.freedesktop.DBus.Properties', | |
path_keyword='path' | |
) | |
try: | |
mainloop.run() | |
except KeyboardInterrupt: | |
pass | |
except: | |
logging.error('Unable to run the gobject main loop') | |
sys.exit(1) | |
logging.info('Shutting down') | |
sys.exit(0) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment