Skip to content

Instantly share code, notes, and snippets.

@hardenchant
Last active March 27, 2022 13:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hardenchant/1a7376302344c82c034e9af65b90faf6 to your computer and use it in GitHub Desktop.
Save hardenchant/1a7376302344c82c034e9af65b90faf6 to your computer and use it in GitHub Desktop.
Power on speakers over Zigbee and send a Telegram notification when a device connects to (or disconnects from) Bluetooth
#!/usr/bin/python
import json
import logging
import logging.handlers
import os
import signal
import sys
import dbus
import dbus.mainloop.glib
import dbus.service
import paho.mqtt.publish as publish
import requests
from gi.repository import GLib as glibobject
# Root logger setup: every record carries a timestamp, its level and the
# function/line that emitted it.
logging.basicConfig(
    format="[%(asctime)s] %(levelname)s [%(funcName)s:%(lineno)d] %(message)s",
    level=logging.INFO,
)

# Runtime settings are read from the environment once, at import time.
# A variable that is not set falls back to an empty string.
ZIGBEE_PILOT_ADDR = os.environ.get('ZIGBEE_PILOT_ADDR', '')
TG_TOKEN = os.environ.get('TG_TOKEN', '')
TG_CHAT = os.environ.get('TG_CHAT', '')
# Single GLib main loop shared by shutdown(), which quits it, and main(), which runs it.
mainloop = glibobject.MainLoop()
def shutdown(signum, frame):
    """
    Signal handler that stops the GLib main loop
    Args:
        signum: number of the received signal (not used)
        frame: stack frame that was active when the signal arrived (not used)
    Returns: nothing
    """
    mainloop.quit()
def is_any_device_connected() -> bool:
    """
    Ask BlueZ for every object it manages and report whether at least one of
    them exposes org.bluez.Device1 with its Connected property set
    Returns: True if some bluetooth device is connected right now, else False
    """
    bluez_root = dbus.SystemBus().get_object('org.bluez', '/')
    object_manager = dbus.Interface(bluez_root, 'org.freedesktop.DBus.ObjectManager')
    managed = object_manager.GetManagedObjects()
    # Objects without a Device1 interface (adapters, the root object, ...)
    # yield an empty dict here and therefore never count as connected.
    return any(
        interfaces.get('org.bluez.Device1', {}).get('Connected', None) == 1
        for interfaces in managed.values()
    )
def telegram_notify(message, timeout=10):
    """
    Notify by tg about connections.
    Delivery is best effort: every failure is logged and never raised.
    Args:
        message: text message to send
        timeout: seconds to wait for the Telegram API before giving up
    Returns: nothing
    """
    if not TG_TOKEN or not TG_CHAT:
        # Without credentials the request can only fail, so skip the round trip.
        logging.warning('TG message not sent: TG_TOKEN or TG_CHAT is not set')
        return
    try:
        # requests applies no timeout unless one is given. This function is
        # called from the D-Bus signal callback, so an unresponsive API must
        # not be allowed to block it indefinitely.
        response = requests.post(
            f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
            data={
                'chat_id': TG_CHAT,
                'text': message,
            },
            timeout=timeout,
        )
    except requests.RequestException as e:
        logging.error(f'TG message not sent: {e}')
        return
    # requests does not raise on a non-2xx answer by itself. Report status and
    # body instead of calling raise_for_status(), whose message would embed
    # the request URL (and with it the bot token) in the log.
    if not response.ok:
        logging.error(f'TG message not sent: HTTP {response.status_code}: {response.text}')
def sound_switch_power(power_on=True):
    """
    Power on/off speakers over zigbee protocol by publishing a "set" message
    for ZIGBEE_PILOT_ADDR to the MQTT broker on localhost
    Args:
        power_on: truthy publishes state "ON", falsy publishes state "OFF"
    Returns: nothing
    """
    payload = {"state_l4": "ON" if power_on else "OFF"}
    topic = f'zigbee2mqtt/{ZIGBEE_PILOT_ADDR}/set'
    publish.single(topic, json.dumps(payload), hostname='localhost')
    logging.info(f'Message {payload} sent to mqtt addr {ZIGBEE_PILOT_ADDR}')
def _describe_device(path):
    """Return (name, address) of the BlueZ device at path, falling back to the path itself."""
    try:
        device_object = dbus.SystemBus().get_object('org.bluez', path)
        device_properties_interface = dbus.Interface(device_object, 'org.freedesktop.DBus.Properties')
        # One GetAll instead of one Get per property: a missing optional
        # property then shows up as an absent key rather than a D-Bus error.
        device = device_properties_interface.GetAll('org.bluez.Device1')
    except dbus.DBusException as e:
        # E.g. the device object was removed before it could be queried.
        logging.warning(f'Unable to read properties of {path}: {e}')
        return path, 'unknown address'
    name = device.get('Name', device.get('Alias', path))
    address = device.get('Address', 'unknown address')
    return name, address


def device_property_changed(interface, properties, invalidated, path):
    """ Check for changes in org.bluez object tree
    When the Connected property of a device changed, check for any device
    connected, turn on/off power accordingly and notify about the change.
    Changes of other interfaces or other device properties are ignored.
    Args:
        interface(string) : name of the dbus interface where changes occurred
        properties(dict) : list of all parameters changed and their new value
        invalidated(array) : list of properties invalidated
        path(string) : path of the dbus object that triggered the call
    """
    if interface != 'org.bluez.Device1':
        return
    # Device1 emits PropertiesChanged for unrelated properties as well; only a
    # Connected change can alter the power state or deserves a notification.
    if 'Connected' not in properties:
        return

    if is_any_device_connected():
        logging.info('Connected devices exists, sound is powered on')
        sound_switch_power()
    else:
        logging.info('Connected devices not exists, sound is powered off')
        sound_switch_power(power_on=False)

    name, address = _describe_device(path)
    if properties['Connected']:
        logging.info(f'{name} ({address}) connected')
        telegram_notify(f'{name} ({address}) connected to rpi bluetooth')
    else:
        logging.info(f'{name} ({address}) disconnected')
        telegram_notify(f'{name} ({address}) disconnected from rpi bluetooth')
def main():
    """
    Entry point: subscribe to BlueZ property changes and run the main loop.
    Always leaves through sys.exit: status 0 after the loop stopped normally
    (SIGTERM or Ctrl-C), status 1 if the system bus is unavailable or the
    loop failed.
    """
    # shut down on a TERM signal
    signal.signal(signal.SIGTERM, shutdown)
    logging.info('Started')

    # Get the system bus. The GLib main loop has to be made the default one
    # before connecting so that signals are delivered through it.
    try:
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()
    except Exception as e:
        logging.error(f'Unable to get the system dbus: "{e}". Exiting. Is dbus running?')
        sys.exit(1)

    # listen for PropertiesChanged signal on org.bluez
    bus.add_signal_receiver(
        device_property_changed,
        bus_name='org.bluez',
        signal_name='PropertiesChanged',
        dbus_interface='org.freedesktop.DBus.Properties',
        path_keyword='path',
    )

    try:
        mainloop.run()
    except KeyboardInterrupt:
        pass
    except Exception:
        # A bare except here would also intercept SystemExit; catch real
        # errors only and keep the traceback in the log.
        logging.exception('Unable to run the gobject main loop')
        sys.exit(1)

    logging.info('Shutting down')
    sys.exit(0)
# Start the service only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment