Skip to content

Instantly share code, notes, and snippets.

@evanreichard
Last active March 26, 2023 18:16
Show Gist options
  • Save evanreichard/2bc07840d589f0ec2714b09428a959c7 to your computer and use it in GitHub Desktop.
Save evanreichard/2bc07840d589f0ec2714b09428a959c7 to your computer and use it in GitHub Desktop.
Sonoff Tasmota RF Bridge Demultiplexer
# NOTE:
#   The following is a _subset_ of HA's "automations.yaml".
#   Every message on the RF bridge's RESULT or LWT topic is forwarded, as the
#   full trigger object, to the `rfbridge_demux` python_script.
- alias: '[RFBridge] Demultiplexer'
  trigger:
    # RF bridge results; the script reads RfReceived.Data from the JSON payload.
    - platform: mqtt
      topic: tele/RFBridge/RESULT
    # Bridge availability; the script reacts to the "Online" payload.
    - platform: mqtt
      topic: tele/RFBridge/LWT
  action:
    - service: python_script.rfbridge_demux
      # `data` accepts templates; `data_template` is the deprecated spelling.
      data:
        payload: '{{trigger}}'
# NOTE:
#   The following is a _subset_ of HA's "configuration.yaml".
#   Used to normalize MQTT topics: each binary sensor below listens on the
#   'home/<name>' topic that the rfbridge_demux script publishes to.

# Enables the python_script integration that runs rfbridge_demux.
python_script:

mqtt:
  binary_sensor:
    - name: Half Bath Water Sensor
      state_topic: 'home/half_bath_water_sensor'
      device_class: moisture
      off_delay: 10

    - name: Laundry Room Water Sensor
      state_topic: 'home/laundry_room_water_sensor'
      device_class: moisture
      off_delay: 10

    - name: HVAC Drip Pan Water Sensor
      state_topic: 'home/hvac_drip_pan_water_sensor'
      device_class: moisture
      off_delay: 10

    - name: Guest Bath Sink Water Sensor
      state_topic: 'home/guest_bath_sink_water_sensor'
      device_class: moisture
      off_delay: 10

    - name: Master Bath Sink Water Sensor
      state_topic: 'home/master_bath_sink_water_sensor'
      device_class: moisture
      off_delay: 10
# https://community.home-assistant.io/t/sonoff-rf-bridge-strategies-for-receiving-data/108181
# Sensor lookup table, keyed by the RF code the bridge reports:
#   '<PAYLOAD>': ['<PUBLISH_TOPIC>', '<STATE>', '<RETAIN>']
#
#   PAYLOAD        code compared against RfReceived.Data
#   PUBLISH_TOPIC  suffix of the 'home/<...>' topic and of the
#                  'binary_sensor.<...>' entity id
#   STATE          payload published when the code is received
#   RETAIN         value handed to mqtt.publish as 'retain'
sensors = {
    'F5ACFB': ['half_bath_water_sensor', 'ON', 'false'],
    'F58AFB': ['laundry_room_water_sensor', 'ON', 'false'],
    'F4AEFB': ['hvac_drip_pan_water_sensor', 'ON', 'false'],
    'F819FB': ['guest_bath_sink_water_sensor', 'ON', 'false'],
    'F4A5FB': ['master_bath_sink_water_sensor', 'ON', 'false'],
}
"""
Initial Entrypoint
"""
def process_data():
    """
    Route the MQTT trigger passed in ``data['payload']`` by its topic.

    * ``tele/RFBridge/RESULT``: when the JSON payload contains
      ``RfReceived.Data``, that code is handed to ``set_update_state``.
      Results without it are ignored.
    * ``tele/RFBridge/LWT``: 'Online' runs ``set_init_state``; 'Offline' is
      logged at info level; any other payload is logged as a warning.

    Triggers for any other topic are ignored.
    """
    payload = data.get('payload', {})
    topic = str(payload.get('topic'))
    if topic == "tele/RFBridge/RESULT":
        received = payload.get('payload_json', {}).get('RfReceived', {})
        sensor_id = received.get('Data')
        if sensor_id is not None:
            set_update_state(sensor_id)
    elif topic == "tele/RFBridge/LWT":
        status = payload.get('payload')
        if status == 'Online':
            set_init_state()
        elif status == 'Offline':
            # 'Offline' is an expected LWT value; there is nothing to publish,
            # but it must not be reported as an unknown status.
            logger.info('<rfbridge_demux> LWT Status: Offline')
        else:
            logger.warning('<rfbridge_demux> LWT Status Unknown: {}'.format(status))
"""
On initial load we subscribe to "tele/RFBridge/LWT". This only returns
"Online" or "Offline". If we're "Online" set sensors to OFF. This is
acceptable because these sensors continuously send updates if they're ON.
"""
def set_init_state():
    """
    Publish OFF for every configured sensor whose state is still 'unknown'.

    For each entry in ``sensors`` the entity ``binary_sensor.<PUBLISH_TOPIC>``
    is looked up and handled by its current state:

    * entity not found -> warn and continue with the next sensor
    * 'unknown'        -> publish 'OFF' on ``home/<PUBLISH_TOPIC>``
    * 'on' / 'off'     -> leave untouched
    * anything else    -> warn
    """
    for sensor in sensors.values():
        name = sensor[0]
        entity = hass.states.get('binary_sensor.{}'.format(name))
        if entity is None:
            # No such entity: skip it rather than fail on `.state`, which
            # would stop the loop before the remaining sensors are handled.
            logger.warning('<rfbridge_demux> Sensor {} Not Found -> Skipping'.format(name))
            continue

        sensor_state = entity.state
        logger.info('<rfbridge_demux> Sensor {} State: {}'.format(name, sensor_state))

        # LWT Online & Current Unknown State -> Set OFF
        if sensor_state == 'unknown':
            logger.info('<rfbridge_demux> Sensor {} State Unknown -> Setting: OFF'.format(name))
            service_data = {
                'topic': 'home/{}'.format(name),
                'payload': 'OFF',
                'retain': '{}'.format(sensor[2]),
                'qos': 0
            }
            hass.services.call('mqtt', 'publish', service_data, False)
        elif sensor_state in ['on', 'off']:
            logger.info('<rfbridge_demux> Sensor {} State Known -> Skipping'.format(name))
        else:
            logger.warning('<rfbridge_demux> Sensor {} State Unknown: {}'.format(name, sensor_state))
"""
Given a Sensor ID, publish the new state. If the Sensor ID is unknown,
then publish an unknown state and print a warning.
"""
def set_update_state(sensor_id):
    """
    Publish the MQTT message that corresponds to a received RF code.

    A code present in ``sensors`` is republished on ``home/<PUBLISH_TOPIC>``
    with its configured state and retain value. Any other code is logged as
    a warning and published as the payload of ``home/unknown``.
    """
    if sensor_id not in sensors:
        logger.warning('<rfbridge_demux> Unknown Sensor: {}'.format(sensor_id))
        topic = 'home/unknown'
        message = '{}'.format(sensor_id)
        retain = 'false'
    else:
        entry = sensors[sensor_id]
        logger.info('<rfbridge_demux> Updating Sensor: {}'.format(entry))
        topic = 'home/{}'.format(entry[0])
        message = '{}'.format(entry[1])
        retain = '{}'.format(entry[2])

    # Both branches publish through the same mqtt.publish call.
    publish_args = {
        'topic': topic,
        'payload': message,
        'retain': retain,
        'qos': 0
    }
    hass.services.call('mqtt', 'publish', publish_args, False)
# Script body: handle the trigger that the automation passed in as
# data['payload'] (RESULT -> sensor update, LWT -> initial state).
process_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment