Skip to content

Instantly share code, notes, and snippets.

Last active September 26, 2023 13:32
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
Script to publish mqtt discovery for wink relay running with
import json
import logging
import getpass
import argparse
import paho.mqtt.client as mqtt # paho-mqtt
# Home Assistant MQTT discovery messages for one relay.
#
# Maps each discovery config topic to the config payload published on it.
# Every "%MQTT_PREFIX%" placeholder (in topics and in values) is substituted
# with the relay's real prefix before publishing, and each payload dict is
# serialised to JSON at publish time.
#
# NOTE(review): Home Assistant's MQTT device-trigger schema also appears to
# require "automation_type" and "device" on the device_automation entries;
# confirm whether those keys were lost from this copy of the script.
payloads = {
    # Button 1 (index 0): short press / long press triggers.
    "homeassistant/device_automation/B0C1/%MQTT_PREFIX%/config": {
        "topic": "%MQTT_PREFIX%/buttons/0/click/1",
        "subtype": "button_1",
        "type": "button_short_press",
    },
    "homeassistant/device_automation/B0H/%MQTT_PREFIX%/config": {
        "topic": "%MQTT_PREFIX%/buttons/0/held/1",
        "subtype": "button_1",
        "type": "button_long_press",
    },
    # Button 2 (index 1): short press / long press triggers.
    "homeassistant/device_automation/B1C1/%MQTT_PREFIX%/config": {
        "topic": "%MQTT_PREFIX%/buttons/1/click/1",
        "subtype": "button_2",
        "type": "button_short_press",
    },
    "homeassistant/device_automation/B1H/%MQTT_PREFIX%/config": {
        "topic": "%MQTT_PREFIX%/buttons/1/held/1",
        "subtype": "button_2",
        "type": "button_long_press",
    },
    # Switches: the two relays and the screen.
    "homeassistant/switch/%MQTT_PREFIX%_top/config": {
        "name": "%MQTT_PREFIX%_top",
        "state_topic": "%MQTT_PREFIX%/relays/0/state",
        "command_topic": "%MQTT_PREFIX%/relays/0",
        "payload_on": "ON",
        "payload_off": "OFF",
    },
    "homeassistant/switch/%MQTT_PREFIX%_bottom/config": {
        "name": "%MQTT_PREFIX%_bottom",
        "state_topic": "%MQTT_PREFIX%/relays/1/state",
        "command_topic": "%MQTT_PREFIX%/relays/1",
        "payload_on": "ON",
        "payload_off": "OFF",
    },
    "homeassistant/switch/%MQTT_PREFIX%_screen/config": {
        "name": "%MQTT_PREFIX%_screen",
        "state_topic": "%MQTT_PREFIX%/screen/state",
        "command_topic": "%MQTT_PREFIX%/screen",
        "payload_on": "ON",
        "payload_off": "OFF",
    },
    # Sensors: temperature, humidity and proximity.
    "homeassistant/sensor/%MQTT_PREFIX%_Temp/config": {
        "name": "%MQTT_PREFIX%_Temp",
        "device_class": "temperature",
        "state_topic": "%MQTT_PREFIX%/sensors/temperature",
        "unit_of_measurement": "°C",
        "value_template": "{{value | round(1)}}",
    },
    "homeassistant/sensor/%MQTT_PREFIX%_Humidity/config": {
        "name": "%MQTT_PREFIX%_Humidity",
        "device_class": "humidity",
        "state_topic": "%MQTT_PREFIX%/sensors/humidity",
        "unit_of_measurement": "%",
        "value_template": "{{value | round(1)}}",
    },
    "homeassistant/sensor/%MQTT_PREFIX%_Proximity/config": {
        "name": "%MQTT_PREFIX%_Proximity",
        "state_topic": "%MQTT_PREFIX%/proximity/trigger",
    },
}
def get_password(args: argparse.Namespace) -> None:
    """Prompt for the MQTT password when a user was given without one.

    Mutates ``args`` in place: if ``args.user`` is truthy and
    ``args.password`` is ``None``, the password is read interactively with
    ``getpass.getpass()`` and stored on ``args.password``. Otherwise
    ``args`` is left untouched.

    :param args: parsed command-line arguments carrying ``user`` and ``password``
    """
    # `is None` (not falsiness) so an explicitly passed empty password is kept.
    if args.user and args.password is None:
        logging.debug('user passed but password not passed, asking for one')
        args.password = getpass.getpass()
def get_client(args: argparse.Namespace) -> mqtt.Client:
    """Create an MQTT client and connect it to the configured broker.

    Credentials are only set when ``args.user`` is truthy. The client is
    connected to ``args.MQTT_SERVER`` on ``args.port`` before being returned.

    :param args: parsed command-line arguments (``MQTT_SERVER``, ``port``,
        ``user``, ``password``)
    :return: the connected client
    """
    # paho-mqtt 2.x expects a callback API version when constructing a
    # Client; 1.x has no such enum. No callbacks are registered here, so the
    # chosen version only serves to satisfy the constructor.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is not None:
        client = mqtt.Client(api_versions.VERSION2)
    else:
        client = mqtt.Client()

    if args.user:
        logging.debug('Setting username / password')
        client.username_pw_set(args.user, args.password)

    # NOTE(review): the log call around this message was garbled in the
    # source; INFO level is an assumption.
    logging.info(f'Connecting to {args.MQTT_SERVER}:{args.port}')
    client.connect(args.MQTT_SERVER, args.port)
    return client
def replace_strings(s: str, replacements: dict) -> str:
    """Return ``s`` with every key of ``replacements`` replaced by its value.

    Replacements are applied one after another in the dict's iteration
    order, so a later replacement also sees the text produced by earlier
    ones.

    :param s: the string to process
    :param replacements: mapping of substring -> replacement text
    :return: the string after all replacements have been applied
    """
    for old, new in replacements.items():
        s = s.replace(old, new)
    return s
def replace_dicts(d: "dict[str,str]", replacements: dict) -> dict:
    """Apply ``replace_strings`` to every key and value of ``d``.

    :param d: mapping whose keys and values are both strings
    :param replacements: mapping of substring -> replacement text
    :return: a new dict; ``d`` itself is not modified
    """
    return {
        replace_strings(k, replacements): replace_strings(v, replacements)
        for k, v in d.items()
    }
def get_payloads(replacements: dict) -> dict:
    """Build the discovery payloads with all placeholders substituted.

    Works from the module-level ``payloads`` template: the replacements are
    applied to each topic (outer key) and to every key and value of the
    config dict published on it.

    :param replacements: mapping of placeholder -> replacement text
    :return: a new ``{topic: config_dict}`` mapping; the template is not modified
    """
    return {
        replace_strings(k, replacements): replace_dicts(v, replacements)
        for k, v in payloads.items()
    }
def send_payloads(client: mqtt.Client, payloads: dict, dryrun=False, delete=False, retain=True):
    """Publish (or delete) each discovery config on its topic.

    :param client: connected MQTT client used for publishing
    :param payloads: k,v pairs, where k is the topic and value is the payload to publish
    :param dryrun: do not publish the payloads (useful for debugging)
    :param delete: send blank payload to delete the config
    :param retain: retain messages in mqtt broker
    """
    for topic, payload in payloads.items():
        if delete:
            # The blank payload must stay a truly empty string; running it
            # through json.dumps would publish '""' instead.
            payload = ""
        else:
            payload = json.dumps(payload)
        # NOTE(review): the log call around this message was garbled in the
        # source; INFO level is an assumption.
        logging.info(f"Sending: {topic} retain={retain}\n{payload}")
        if not dryrun:
            client.publish(topic, payload, retain=retain)
def setup_logging():
    """Configure the root logger for the script.

    NOTE(review): the original body of this function is missing from the
    source, so this is a minimal stand-in. INFO level is an assumption;
    confirm the intended level and format.
    """
    logging.basicConfig(level=logging.INFO)
def go(args: argparse.Namespace):
    """Connect to the broker and publish the discovery configs.

    Substitutes ``args.MQTT_PREFIX`` for the "%MQTT_PREFIX%" placeholder and
    forwards the ``dryrun``, ``delete`` and ``retain`` flags to
    ``send_payloads``.

    :param args: parsed command-line arguments
    """
    client = get_client(args)
    discovery = get_payloads({"%MQTT_PREFIX%": args.MQTT_PREFIX})
    send_payloads(client, discovery, args.dryrun, args.delete, args.retain)
def main(name):
    """Command-line entry point.

    Does nothing unless ``name`` is ``'__main__'``, so it is meant to be
    invoked as ``main(__name__)``. Otherwise it parses the command line and
    runs the publisher.

    :param name: the calling module's ``__name__``
    """
    # import guard
    if name != '__main__':
        return

    # main code
    parser = argparse.ArgumentParser(description="publish HA discovery topics for wink relays")
    parser.add_argument('MQTT_SERVER', help="mqtt server to publish to")
    parser.add_argument('MQTT_PREFIX', help="Relay's mqtt prefix")
    parser.add_argument('--port', default=1883, type=int, help='mqtt port default: %(default)s')
    parser.add_argument('--user', help='if mqtt server requires a auth')
    parser.add_argument('--password', help='password to login with, if ommitted and user passed, password is requested on the command line')
    parser.add_argument('--dryrun', action='store_true', help='Do not publish any topics, just log what would have happened')
    parser.add_argument('--delete', action="store_true", help='publish empty configs to delete devices previously configured')
    parser.add_argument('--no-retain', action='store_false', default=True, dest='retain', help='Do not set retain flag for config messages on mqtt')
    args = parser.parse_args()

    # NOTE(review): everything after parse_args() is missing from the
    # source. These calls are reconstructed from the otherwise-unused
    # helpers; the password must be resolved before go() connects.
    setup_logging()
    get_password(args)
    go(args)
Copy link

Hey this is awesome, I was just about to try and write something similar for my setup.
How are you integrating this in home assistant? Are you just using AppDaemon?

Copy link

pykler commented Apr 18, 2022

This just publishes some messages to the topics that Home Assistant listens to for MQTT discovery. You just need to set up HA with MQTT.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment