Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Script to publish mqtt discovery for wink relay running with https://github.com/jimpastos/wink-relay-manager
import json
import logging
import getpass
import argparse
import paho.mqtt.client as mqtt # paho-mqtt
payloads = {
"homeassistant/device_automation/B0C1/%MQTT_PREFIX%/config": {
"automation_type":"trigger",
"topic": "%MQTT_PREFIX%/buttons/0/click/1",
"subtype": "button_1",
"type": "button_short_press"
},
"homeassistant/device_automation/B0H/%MQTT_PREFIX%/config": {
"automation_type":"trigger",
"topic": "%MQTT_PREFIX%/buttons/0/held/1",
"subtype": "button_1",
"type": "button_long_press"
},
"homeassistant/device_automation/B1C1/%MQTT_PREFIX%/config": {
"automation_type":"trigger",
"topic": "%MQTT_PREFIX%/buttons/1/click/1",
"subtype": "button_2",
"type": "button_short_press"
},
"homeassistant/device_automation/B1H/%MQTT_PREFIX%/config": {
"automation_type":"trigger",
"topic": "%MQTT_PREFIX%/buttons/1/held/1",
"subtype": "button_2",
"type": "button_long_press"
},
"homeassistant/switch/%MQTT_PREFIX%_top/config": {
"name": "%MQTT_PREFIX%_top",
"state_topic": "%MQTT_PREFIX%/relays/0/state",
"command_topic": "%MQTT_PREFIX%/relays/0",
"payload_on": "ON",
"payload_off": "OFF"
},
"homeassistant/switch/%MQTT_PREFIX%_bottom/config": {
"name": "%MQTT_PREFIX%_bottom",
"state_topic": "%MQTT_PREFIX%/relays/1/state",
"command_topic": "%MQTT_PREFIX%/relays/1",
"payload_on": "ON",
"payload_off": "OFF"
},
"homeassistant/switch/%MQTT_PREFIX%_screen/config": {
"name": "%MQTT_PREFIX%_screen",
"state_topic": "%MQTT_PREFIX%/screen/state",
"command_topic": "%MQTT_PREFIX%/screen",
"payload_on": "ON",
"payload_off": "OFF"
},
"homeassistant/sensor/%MQTT_PREFIX%_Temp/config": {
"name": "%MQTT_PREFIX%_Temp",
"device_class": "temperature",
"state_topic": "%MQTT_PREFIX%/sensors/temperature",
"unit_of_measurement": "°C",
"value_template": "{{value | round(1)}}"
},
"homeassistant/sensor/%MQTT_PREFIX%_Humidity/config": {
"name": "%MQTT_PREFIX%_Humidity",
"device_class": "humidity",
"state_topic": "%MQTT_PREFIX%/sensors/humidity",
"unit_of_measurement": "%",
"value_template": "{{value | round(1)}}"
},
"homeassistant/sensor/%MQTT_PREFIX%_Proximity/config": {
"name": "%MQTT_PREFIX%_Proximity",
"state_topic": "%MQTT_PREFIX%/proximity/trigger"
},
}
def get_password(args: argparse.Namespace):
if args.user and args.password is None:
logging.debug('user passed but password not passed, asking for one')
args.password = getpass.getpass()
def get_client(args: argparse.Namespace) -> mqtt.Client:
client = mqtt.Client()
if args.user:
logging.debug('Setting username / password')
client.username_pw_set(args.user, args.password)
logging.info(f'Connecting to {args.MQTT_SERVER}:{args.port}')
client.connect(args.MQTT_SERVER, args.port)
return client
def replace_strings(s: str, replacements: dict) -> str:
for k,v in replacements.items():
s = s.replace(k, v)
return s
def replace_dicts(d: "dict[str,str]", replacements: dict) -> dict:
return {
replace_strings(k, replacements): replace_strings(v, replacements)
for k,v in d.items()
}
def get_payloads(replacements: dict) -> dict:
return {
replace_strings(k, replacements): replace_dicts(v, replacements)
for k,v in payloads.items()
}
def send_payloads(client: mqtt.Client, payloads: dict, dryrun=False, delete=False, retain=True):
'''
:param payloads: k,v pairs, where k is the topic and value is the payload to publish
:param dryrun: do not publish the payloads (usefule for debugging)
:param delete: send blank payload to delete the config
:param retain: retain messages in mqtt broker
'''
for topic, payload in payloads.items():
if delete:
payload = ""
else:
payload = json.dumps(payload)
logging.info(f"Sending: {topic} retain={retain}\n{payload}")
if not dryrun:
client.publish(topic, payload, retain=retain)
def setup_logging():
logging.basicConfig(level=logging.INFO)
def go(args: argparse.Namespace):
setup_logging()
get_password(args)
client = get_client(args)
payloads = get_payloads({"%MQTT_PREFIX%": args.MQTT_PREFIX})
send_payloads(client, payloads, args.dryrun, args.delete, args.retain)
def main(name):
# import gaurd
if name != '__main__':
return
# main code
parser = argparse.ArgumentParser(description="publish HA discovery topics for wink relays")
parser.add_argument('MQTT_SERVER', help="mqtt server to publish to")
parser.add_argument('MQTT_PREFIX', help="Relay's mqtt prefix")
parser.add_argument('--port', default=1883, type=int, help='mqtt port default: %(default)s')
parser.add_argument('--user', help='if mqtt server requires a auth')
parser.add_argument('--password', help='password to login with, if ommitted and user passed, password is requested on the command line')
parser.add_argument('--dryrun', action='store_true', help='Do not publish any topics, just log what would have happened')
parser.add_argument('--delete', action="store_true", help='publish empty configs to delete devices previously configured')
parser.add_argument('--no-retain', action='store_false', default=True, dest='retain', help='Do not set retain flag for config messages on mqtt')
args = parser.parse_args()
go(args)
main(__name__)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment