|
from openrgb import OpenRGBClient |
|
from openrgb.utils import RGBColor, DeviceType |
|
import paho.mqtt.client as mqtt |
|
import json |
|
import time |
|
import argparse |
|
|
|
# Name of the Home Assistant light entity; also the last segment of base_topic.
ENTITY_NAME = "pc"

# Command line: an optional OpenRGB port plus the required MQTT broker address.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--openrgb_port",
    help="The port the OpenRGB server is listening on",
    default=6742,
    type=int,
)
parser.add_argument(
    "mqtt_address",
    # The value is passed unchanged to client.connect(), so it is described
    # as a host rather than a URL.
    help="Hostname or IP address of the MQTT broker",
    type=str,
)
args = parser.parse_args()

# Prefix shared by the config, state and set topics of this light.
base_topic = "homeassistant/light/" + ENTITY_NAME

# Current light state; published on "<base_topic>/state" and merged with
# incoming commands by on_message.
state = {
    "state": "ON",
    "brightness": 255,
    "color": {"r": 255, "g": 255, "b": 255},
}
|
|
|
def on_connect(client, userdata, flags, rc):
    """Print the broker's answer to the connection attempt.

    Attached to the MQTT client as its ``on_connect`` callback. Only ``rc``
    is used; the other parameters are accepted to match the callback
    signature.
    """
    outcome = mqtt.connack_string(rc)
    print(f"Connection returned result: {outcome}")
|
|
|
def get_real_color(state):
    """Return the state's RGB colour dimmed by its brightness.

    ``state`` must provide ``"brightness"`` and a ``"color"`` mapping with
    ``"r"``, ``"g"`` and ``"b"`` entries. Each channel is multiplied by
    ``brightness / 255`` and truncated with ``int``.

    Returns:
        A three-item list ``[r, g, b]`` of ints.
    """
    scale = state["brightness"] / 255
    rgb = state["color"]
    return [int(rgb[channel] * scale) for channel in ("r", "g", "b")]
|
|
|
def on_message(client, userdata, message):
    """Apply a command received on the subscribed ``/set`` topic.

    The JSON payload is merged over the module-level ``state``, the merged
    state is published on ``<base_topic>/state``, and the light is set to
    the brightness-scaled colour, or to black when the merged state is
    ``"OFF"``.

    Payloads that are not UTF-8, not valid JSON, or not a JSON object are
    reported and ignored instead of raising out of the callback.

    Args:
        client: MQTT client the message arrived on; used to publish state.
        userdata: Unused.
        message: Received message; ``payload``, ``topic``, ``qos`` and
            ``retain`` are read.
    """
    global state

    try:
        text = message.payload.decode("utf-8")
    except UnicodeDecodeError as err:
        print("ignoring undecodable payload:", err)
        return

    print("message received ", text)
    print("message topic=", message.topic)
    print("message qos=", message.qos)
    print("message retain flag=", message.retain)

    try:
        msg = json.loads(text)
    except ValueError as err:  # json.JSONDecodeError subclasses ValueError
        print("ignoring malformed JSON payload:", err)
        return
    if not isinstance(msg, dict):
        # Only a JSON object can be merged into the state mapping.
        print("ignoring non-object JSON payload:", msg)
        return

    # Shallow merge: keys present in the command override the stored ones.
    state = {**state, **msg}
    client.publish(base_topic + "/state", payload=json.dumps(state), qos=0, retain=False)

    # Decide from the merged state so a command without a "state" key
    # (e.g. brightness only) keeps the previous on/off value.
    if state["state"] == "OFF":
        final_col = [0, 0, 0]
    else:
        final_col = get_real_color(state)
    light.set_color(RGBColor(final_col[0], final_col[1], final_col[2]))
|
|
|
|
|
def Initialise_clients(cname):
    """Create an MQTT client wired to this module's callbacks.

    Args:
        cname: Client id presented to the broker.

    Returns:
        A client created with ``clean_session=False`` and with
        ``on_connect`` / ``on_message`` attached. It is not connected yet.
    """
    # paho-mqtt 2.x expects a callback API version as the first constructor
    # argument; VERSION1 keeps the (client, userdata, flags, rc) on_connect
    # signature used in this file. paho-mqtt 1.x has no such enum, so the
    # classic constructor call is used there.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt.Client(client_id=cname, clean_session=False)
    else:
        client = mqtt.Client(api_versions.VERSION1, client_id=cname, clean_session=False)

    # Callback assignment.
    client.on_connect = on_connect
    client.on_message = on_message

    # Bookkeeping attributes initialised on the client object; kept for any
    # code that may read them.
    client.topic_ack = []
    client.run_flag = False
    client.running_loop = False
    client.subscribe_flag = False
    client.bad_connection_flag = False
    client.connected_flag = False
    client.disconnect_flag = False
    return client
|
|
|
# MQTT client whose id is derived from the entity name.
client = Initialise_clients(f"{ENTITY_NAME}_rgb")
# Uncomment if the broker requires credentials:
# client.username_pw_set("user", password="password")

# OpenRGB connection on this machine; the first motherboard device is the light.
rgb_client = OpenRGBClient('127.0.0.1', args.openrgb_port, 'mqtt')
light = rgb_client.get_devices_by_type(DeviceType.MOTHERBOARD)[0]

# Connect to the broker and listen for commands on the set topic.
client.connect(args.mqtt_address)
client.subscribe(f"{base_topic}/set")

# Announce the light on the config topic, then publish its current state.
config = {
    "~": base_topic,
    "name": ENTITY_NAME,
    "unique_id": f"{ENTITY_NAME}_light",
    "cmd_t": "~/set",
    "stat_t": "~/state",
    "schema": "json",
    "brightness": "true",
    "rgb": "true",
}
client.publish(f"{base_topic}/config", payload=json.dumps(config), qos=0, retain=False)
client.publish(f"{base_topic}/state", payload=json.dumps(state), qos=0, retain=False)

# Set initial light
c = get_real_color(state)
light.set_color(RGBColor(*c))

# Process MQTT traffic until the process is stopped.
client.loop_forever()
Hmm. I'd try MQTT Explorer or a similar tool to check whether the MQTT entries are actually being created. Otherwise, since I'm using the paho-mqtt library directly, check its documentation.
If you figure it out, let me know and I'll update the code.