Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Script to publish Home Assistant MQTT discovery messages for a Wink Relay running https://github.com/jimpastos/wink-relay-manager
import json
import logging
import getpass
import argparse
import paho.mqtt.client as mqtt # paho-mqtt
# Home Assistant MQTT discovery messages, keyed by the discovery config topic
# each one is published to. The "%MQTT_PREFIX%" token, in keys and values
# alike, is a placeholder that is replaced with the relay's MQTT prefix
# before anything is published.
payloads = {
    # ---- Hardware buttons, exposed as device-automation triggers ----
    # NOTE(review): Home Assistant's MQTT device-trigger documentation lists a
    # `device` mapping as required; none is included here - confirm these
    # triggers are accepted by the Home Assistant version in use.
    "homeassistant/device_automation/B0C1/%MQTT_PREFIX%/config": {
        "automation_type": "trigger",
        "topic": "%MQTT_PREFIX%/buttons/0/click/1",
        "subtype": "button_1",
        "type": "button_short_press",
    },
    "homeassistant/device_automation/B0H/%MQTT_PREFIX%/config": {
        "automation_type": "trigger",
        "topic": "%MQTT_PREFIX%/buttons/0/held/1",
        "subtype": "button_1",
        "type": "button_long_press",
    },
    "homeassistant/device_automation/B1C1/%MQTT_PREFIX%/config": {
        "automation_type": "trigger",
        "topic": "%MQTT_PREFIX%/buttons/1/click/1",
        "subtype": "button_2",
        "type": "button_short_press",
    },
    "homeassistant/device_automation/B1H/%MQTT_PREFIX%/config": {
        "automation_type": "trigger",
        "topic": "%MQTT_PREFIX%/buttons/1/held/1",
        "subtype": "button_2",
        "type": "button_long_press",
    },

    # ---- Switches: the two relays and the screen ----
    "homeassistant/switch/%MQTT_PREFIX%_top/config": {
        "name": "%MQTT_PREFIX%_top",
        "state_topic": "%MQTT_PREFIX%/relays/0/state",
        "command_topic": "%MQTT_PREFIX%/relays/0",
        "payload_on": "ON",
        "payload_off": "OFF",
    },
    "homeassistant/switch/%MQTT_PREFIX%_bottom/config": {
        "name": "%MQTT_PREFIX%_bottom",
        "state_topic": "%MQTT_PREFIX%/relays/1/state",
        "command_topic": "%MQTT_PREFIX%/relays/1",
        "payload_on": "ON",
        "payload_off": "OFF",
    },
    "homeassistant/switch/%MQTT_PREFIX%_screen/config": {
        "name": "%MQTT_PREFIX%_screen",
        "state_topic": "%MQTT_PREFIX%/screen/state",
        "command_topic": "%MQTT_PREFIX%/screen",
        "payload_on": "ON",
        "payload_off": "OFF",
    },

    # ---- Sensors ----
    "homeassistant/sensor/%MQTT_PREFIX%_Temp/config": {
        "name": "%MQTT_PREFIX%_Temp",
        "device_class": "temperature",
        "state_topic": "%MQTT_PREFIX%/sensors/temperature",
        "unit_of_measurement": "°C",
        "value_template": "{{value | round(1)}}",
    },
    "homeassistant/sensor/%MQTT_PREFIX%_Humidity/config": {
        "name": "%MQTT_PREFIX%_Humidity",
        "device_class": "humidity",
        "state_topic": "%MQTT_PREFIX%/sensors/humidity",
        "unit_of_measurement": "%",
        "value_template": "{{value | round(1)}}",
    },
    "homeassistant/sensor/%MQTT_PREFIX%_Proximity/config": {
        "name": "%MQTT_PREFIX%_Proximity",
        "state_topic": "%MQTT_PREFIX%/proximity/trigger",
    },
}
def get_password(args: argparse.Namespace):
    """Prompt for the MQTT password when a user was given without one.

    The value read from the terminal is stored on ``args.password``.
    ``args`` is left untouched when no user was supplied or when a password
    is already present.

    :param args: parsed command line arguments; reads ``user`` and ``password``
    """
    # Nothing to ask for: either no authentication is wanted or the password
    # was already supplied.
    if not args.user or args.password is not None:
        return
    logging.debug('user passed but password not passed, asking for one')
    args.password = getpass.getpass()
def get_client(args: argparse.Namespace) -> mqtt.Client:
    """Create an MQTT client and connect it to the broker named in ``args``.

    Works with both paho-mqtt 1.x and 2.x. Any exception raised by
    ``connect()`` is not caught here and propagates to the caller.

    :param args: parsed command line arguments; reads ``MQTT_SERVER``,
        ``port``, ``user`` and ``password``
    :return: the client, after ``connect()`` has been called on it
    """
    # paho-mqtt 2.x expects a callback API version as the first constructor
    # argument, while 1.x does not have the enum at all. No callbacks are
    # registered on this client, so the version picked has no further effect.
    api_versions = getattr(mqtt, 'CallbackAPIVersion', None)
    if api_versions is None:
        client = mqtt.Client()
    else:
        client = mqtt.Client(api_versions.VERSION2)

    if args.user:
        logging.debug('Setting username / password')
        client.username_pw_set(args.user, args.password)

    logging.info(f'Connecting to {args.MQTT_SERVER}:{args.port}')
    client.connect(args.MQTT_SERVER, args.port)
    return client
def replace_strings(s: str, replacements: dict) -> str:
    """Return ``s`` with every key of ``replacements`` swapped for its value.

    Substitutions are applied one after another in the mapping's iteration
    order, so a later pattern can also match text inserted by an earlier one.

    :param s: text to process
    :param replacements: mapping of search string to replacement string
    :return: the text after all substitutions
    """
    result = s
    for placeholder in replacements:
        result = result.replace(placeholder, replacements[placeholder])
    return result
def replace_dicts(d: dict, replacements: dict) -> dict:
    """Return a copy of ``d`` with ``replacements`` applied to keys and values.

    String values are processed with :func:`replace_strings`. Nested
    dictionaries and lists/tuples are processed recursively (sequences come
    back as lists), and every other value (numbers, booleans, ``None``) is
    copied through unchanged, so a payload is not limited to flat
    string-to-string mappings.

    :param d: mapping with string keys
    :param replacements: mapping of search string to replacement string
    :return: a new dictionary; ``d`` itself is not modified
    """
    def substitute(value):
        # Only strings can contain placeholders; containers are walked so
        # that strings nested inside them are handled as well.
        if isinstance(value, str):
            return replace_strings(value, replacements)
        if isinstance(value, dict):
            return replace_dicts(value, replacements)
        if isinstance(value, (list, tuple)):
            return [substitute(item) for item in value]
        return value

    return {
        replace_strings(k, replacements): substitute(v)
        for k, v in d.items()
    }
def get_payloads(replacements: dict) -> dict:
    """Build the discovery messages with all placeholders filled in.

    Walks the module-level ``payloads`` table and applies ``replacements``
    to every topic and to every config dictionary.

    :param replacements: mapping of placeholder to the text that replaces it
    :return: new dictionary of topic -> config; ``payloads`` is not modified
    """
    resolved = {}
    for topic_template, config_template in payloads.items():
        topic = replace_strings(topic_template, replacements)
        resolved[topic] = replace_dicts(config_template, replacements)
    return resolved
def send_payloads(client: mqtt.Client, payloads: dict, dryrun=False, delete=False, retain=True):
    '''
    Publish (or, on a dry run, only log) one message per entry of ``payloads``.

    A publish that the client reports as failed is logged as an error and the
    remaining messages are still attempted.

    :param client: connected mqtt client used for publishing (unused when dryrun is set)
    :param payloads: k,v pairs, where k is the topic and value is the payload to publish
    :param dryrun: do not publish the payloads (useful for debugging)
    :param delete: send blank payload to delete the config
    :param retain: retain messages in mqtt broker
    '''
    for topic, payload in payloads.items():
        # An empty payload on a discovery topic removes the previously
        # published config; otherwise the config is sent as JSON.
        body = "" if delete else json.dumps(payload)
        logging.info(f"Sending: {topic} retain={retain}\n{body}")
        if dryrun:
            continue
        result = client.publish(topic, body, retain=retain)
        # publish() reports failures through the return code rather than by
        # raising, so it has to be checked explicitly.
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            logging.error(f"Failed to publish {topic}: {mqtt.error_string(result.rc)}")
def setup_logging():
    """Configure logging via ``logging.basicConfig`` at INFO level.

    With this threshold, DEBUG messages emitted through the ``logging``
    module are not shown.
    """
    threshold = logging.INFO
    logging.basicConfig(level=threshold)
def go(args: argparse.Namespace):
    """Run the whole workflow: connect, build the messages, publish them.

    :param args: parsed command line arguments; reads ``MQTT_PREFIX``,
        ``dryrun``, ``delete`` and ``retain`` here and is passed on to the
        password and connection helpers
    """
    setup_logging()
    get_password(args)
    # Note: the broker connection is made even when --dryrun is given.
    client = get_client(args)
    try:
        discovery = get_payloads({"%MQTT_PREFIX%": args.MQTT_PREFIX})
        send_payloads(client, discovery, args.dryrun, args.delete, args.retain)
    finally:
        # End the session explicitly instead of leaving the connection to be
        # dropped when the process exits.
        client.disconnect()
def _build_parser() -> argparse.ArgumentParser:
    """Create the command line parser for this script."""
    parser = argparse.ArgumentParser(
        description="publish HA discovery topics for wink relays",
    )

    # Positional arguments: where to publish and which relay to describe.
    parser.add_argument('MQTT_SERVER', help="mqtt server to publish to")
    parser.add_argument('MQTT_PREFIX', help="Relay's mqtt prefix")

    # Connection options.
    parser.add_argument(
        '--port',
        default=1883,
        type=int,
        help='mqtt port default: %(default)s',
    )
    parser.add_argument('--user', help='if mqtt server requires a auth')
    parser.add_argument(
        '--password',
        help='password to login with, if ommitted and user passed, password is requested on the command line',
    )

    # Publishing behaviour.
    parser.add_argument(
        '--dryrun',
        action='store_true',
        help='Do not publish any topics, just log what would have happened',
    )
    parser.add_argument(
        '--delete',
        action="store_true",
        help='publish empty configs to delete devices previously configured',
    )
    parser.add_argument(
        '--no-retain',
        action='store_false',
        default=True,
        dest='retain',
        help='Do not set retain flag for config messages on mqtt',
    )
    return parser


def main(name):
    """Script entry point; does nothing unless ``name`` is ``'__main__'``.

    :param name: the module's ``__name__``; acts as the import guard so that
        importing this file does not parse arguments or publish anything
    """
    # import guard
    if name == '__main__':
        # main code
        args = _build_parser().parse_args()
        go(args)


main(__name__)
@ChocolateNinja
Copy link

ChocolateNinja commented Apr 4, 2022

Hey this is awesome, I was just about to try and write something similar for my setup.
How are you integrating this in home assistant? Are you just using AppDaemon?

@pykler
Copy link
Author

pykler commented Apr 18, 2022

This just publishes some messages to the topics that Home Assistant listens to for MQTT discovery. You just need to set up Home Assistant with MQTT.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment