Skip to content

Instantly share code, notes, and snippets.

@HNJAMeindersma
Last active February 16, 2024 21:21
Show Gist options
  • Save HNJAMeindersma/36583dde8e0eb8e97e2cff2e7d9d2836 to your computer and use it in GitHub Desktop.
Save HNJAMeindersma/36583dde8e0eb8e97e2cff2e7d9d2836 to your computer and use it in GitHub Desktop.
Goal is to control switching HDMI sources on my TV, unfortunately my TV's API is hardly working so I'll need HDMI-CEC. Since LibreELEC-11.0 has no default package manager, thus PIP packages cannot easily be installed (like paho.mqtt). And considering cec-client is not functioning properly on my system, thus the more difficult cec-ctl had to be u…
[Unit]
Description=HDMI-CEC HTTP Bridge
After=multi-user.target
[Service]
Type=simple
Restart=always
ExecStart=/usr/bin/python3 /storage/LibreELEC-11.0_HDMI-CEC_HTTP_bridge.py
[Install]
WantedBy=multi-user.target
from http.server import BaseHTTPRequestHandler, HTTPServer
import re
import subprocess
import json
# Function: add value to dictionary
def nested_set(dic, keys, value):
for key in keys[:-1]:
dic = dic.setdefault(key, {})
dic[keys[-1]] = value
# Setup HTTP request handler
class S(BaseHTTPRequestHandler):
# Handle GET requests
def do_GET(self):
# Handle /topology
if self.path == '/topology':
# Create empty dictionary
topo_dict = {
'logical': {},
'physical': {},
'name': {}
}
topo_dict_list = []
# --- 1: GET SELF ---
# Get own CEC info
topo_command_input = 'cec-ctl --logical-address'
topo_command_output = subprocess.check_output(topo_command_input, shell=True)
# Sanitize output
topo_output_string = topo_command_output.decode() # Decode to string
topo_output_string = topo_output_string.replace('\t', '') # Remove tabs
topo_output_string = re.sub(' +', ' ', topo_output_string) # Remove double spaces
# Create empty item dictionary
topo_dict_self = {'host':'true'}
# Split output to lines
topo_lines = list(filter(None, topo_output_string.split('\n')))
# Run through lines
for topo_line in topo_lines:
# Check if key/value pair
topo_split_line = topo_line.split(' : ')
if len(topo_split_line) > 1:
# Sanitize 'Logical Address'
if topo_split_line[0].strip() == 'Logical Address':
topo_dict_self[topo_split_line[0].strip()] = topo_split_line[1].split('(')[0].strip().replace('\'', '')
# Correct self 'OSD Name'
elif topo_split_line[0].strip() == 'OSD Name':
topo_dict_self[topo_split_line[0].strip()] = subprocess.check_output('cat /etc/os-release | grep "NAME" | grep -v "PRETTY_NAME" | cut -d = -f 2 | tr -d \'"\' | tac | tr \'\n\' \' \'', shell=True).decode().strip()
# Save value to dictionary
else:
topo_dict_self[topo_split_line[0].strip()] = topo_split_line[1].strip().replace('\'', '')
# Place item dictionary inside dictionary
nested_set(topo_dict, ['logical', topo_dict_self.get('Logical Address')], topo_dict_self)
nested_set(topo_dict, ['physical', topo_dict_self.get('Physical Address')], topo_dict_self)
nested_set(topo_dict, ['name', topo_dict_self.get('Vendor ID') + ' - ' + topo_dict_self.get('OSD Name')], topo_dict_self)
nested_set(topo_dict, ['host'], topo_dict_self)
topo_dict_list.append(topo_dict_self)
# --- 2: GET OTHERS ---
# Get CEC topology
topo_command_input = 'cec-ctl --show-topology --skip-info'
topo_command_output = subprocess.check_output(topo_command_input, shell=True)
# Sanitize output
topo_output_string = topo_command_output.decode() # Decode to string
topo_output_string = topo_output_string.split('\n\n')[0] # Remove topology summary
topo_output_string = topo_output_string.replace('\t', '') # Remove tabs
topo_output_string = re.sub(' +', ' ', topo_output_string) # Remove double spaces
# Only continue if any results
if topo_output_string.strip() != 'Topology:':
# Split output to items
topo_items = list(filter(None, topo_output_string.split('System Information for device ')))
# Run through items
for topo_item in topo_items:
# Create empty item dictionary
topo_dict_item = {}
# Split item to lines
topo_lines = list(filter(None, topo_item.split('\n')))
# Extract logical address
topo_dict_item['Logical Address'] = topo_item.split(' ')[0].strip()
# Run through lines
for topo_line in topo_lines:
# Check if key/value pair
topo_split_line = topo_line.split(' : ')
if len(topo_split_line) > 1:
# Save value to dictionary
topo_dict_item[topo_split_line[0].strip()] = topo_split_line[1].strip().replace('\'', '')
# Place item dictionary inside dictionary
nested_set(topo_dict, ['logical', topo_dict_item.get('Logical Address')], topo_dict_item)
nested_set(topo_dict, ['physical', topo_dict_item.get('Physical Address')], topo_dict_item)
nested_set(topo_dict, ['name', topo_dict_item.get('Vendor ID') + ' - ' + topo_dict_item.get('OSD Name')], topo_dict_item)
topo_dict_list.append(topo_dict_item)
# --- 3: FINISH & SEND ---
# Add list to dictionary
topo_dict['list'] = topo_dict_list
# Convert dictionary to JSON string
topo_json_string = json.dumps(topo_dict, indent = None)
# Send response
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(topo_json_string.encode('utf-8'))
# Handle /state/logical/$
elif self.path.startswith('/state/logical/'):
if self.path.split('/')[3].isdigit():
output = subprocess.check_output('cec-ctl --to ' + self.path.split('/')[3] + ' --give-device-power-status | grep "pwr-state" | cut -d ":" -f2 | cut -d "(" -f1 | tr -d " "', shell=True)
if not output.strip():
self.send_response(500)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('Empty CEC response!'.encode('utf-8'))
else:
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(output.strip())
else:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('Incomplete request!'.encode('utf-8'))
# Not found
else:
self.send_response(404)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('GET endpoint not found'.encode('utf-8'))
# Handle POST requests
def do_POST(self):
# Check POST data size
content_length = int(self.headers['Content-Length'])
if content_length > 255:
self.send_response(507)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('POST body to large!'.encode('utf-8'))
# Retrieve POST data
else:
post_data = self.rfile.read(content_length).decode()
# Handle /state/logical/$
if self.path.startswith('/state/logical/'):
if self.path.split('/')[3].isdigit():
if post_data == 'on':
subprocess.check_output('cec-ctl --to ' + self.path.split('/')[3] + ' --image-view-on', shell=True)
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('on'.encode('utf-8'))
elif post_data == 'standby':
subprocess.check_output('cec-ctl --to ' + self.path.split('/')[3] + ' --standby', shell=True)
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('standby'.encode('utf-8'))
else:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('POST body invalid!'.encode('utf-8'))
else:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('Incomplete request!'.encode('utf-8'))
# Handle /source/logical/$
elif self.path.startswith('/source/logical/'):
if self.path.split('/')[3].isdigit():
#allowed_input = re.compile(r'^[0-9]\.[0-9]\.[0-9]\.[0-9]$')
allowed_input = re.compile(r'^\d{1}\.\d{1}\.\d{1}\.\d{1}$')
if (allowed_input.match(post_data) and content_length == 7):
input = 'cec-ctl --to ' + self.path.split('/')[3] + ' --image-view-on --active-source phys-addr=' + str(post_data) + ' | grep "phys-addr" | cut -d ":" -f2 | tr -d " "'
output = subprocess.check_output(input, shell=True)
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(output.strip())
else:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('POST body invalid!'.encode('utf-8'))
else:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('Incomplete request!'.encode('utf-8'))
# Handle /keypress/logical/$
elif self.path.startswith('/keypress/logical/'):
if self.path.split('/')[3].isdigit():
allowed_input = re.compile(r'^[a-zA-Z0-9-]*$')
if (allowed_input.match(post_data) and content_length < 64):
input = 'cec-ctl --to 0 --user-control-pressed ui-cmd=' + str(post_data) + ' | grep \'ui-cmd\' | cut -d ":" -f2 | cut -d "(" -f1 | tr -d " "'
output = subprocess.check_output(input, shell=True)
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(output.strip())
else:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('POST body invalid!'.encode('utf-8'))
else:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('Incomplete request!'.encode('utf-8'))
# Not found
else:
self.send_response(404)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write('POST endpoint not found'.encode('utf-8'))
# Setup HTTP server
def run(server_class=HTTPServer, handler_class=S, port=8232):
server_address = ('', port)
httpd = server_class(server_address, handler_class)
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
# Run HTTP server
if __name__ == '__main__':
from sys import argv
if len(argv) == 2:
run(port=int(argv[1]))
else:
run()
String Woonkamer_CEC_topology "CEC topology" <text> { channel="http:url:woonkamer_cec:cec_topology" }
Switch Woonkamer_CEC_online "Online" <switch> { channel="network:servicedevice:woonkamer_cec:online" }
Number:Time Woonkamer_CEC_latency "Latency" <time> { channel="network:servicedevice:woonkamer_cec:latency" }
DateTime Woonkamer_CEC_last_seen "Last seen" <time> { channel="network:servicedevice:woonkamer_cec:lastseen" }
String Woonkamer_CEC_logical_address_TV "CEC logical address: TV" <text> { channel="http:url:woonkamer_cec:cec_logical_address_tv" }
String Woonkamer_CEC_physical_address_TV "CEC physical address: TV" <text> { channel="http:url:woonkamer_cec:cec_physical_address_tv" }
Switch Woonkamer_CEC_state_TV "CEC state: TV" <switch> { autoupdate="true" }
String Woonkamer_CEC_source_TV "CEC source: TV" <receiver> { autoupdate="true" }
String Woonkamer_CEC_keypress_TV "CEC key press: TV" <mediacontrol> { autoupdate="true" }
String Woonkamer_CEC_logical_address_Kodi "CEC logical address: Kodi" <text> { channel="http:url:woonkamer_cec:cec_logical_address_kodi" }
String Woonkamer_CEC_physical_address_Kodi "CEC physical address: Kodi" <text> { channel="http:url:woonkamer_cec:cec_physical_address_kodi" }
Switch Woonkamer_CEC_state_Kodi "CEC state: Kodi" <switch> { channel="network:servicedevice:woonkamer_cec:online" }
String Woonkamer_CEC_keypress_Kodi "CEC key press: Kodi" <mediacontrol> { autoupdate="true" }
String Woonkamer_CEC_logical_address_PlayStation4 "CEC logical address: PlayStation 4" <text> { channel="http:url:woonkamer_cec:cec_logical_address_playstation4" }
String Woonkamer_CEC_physical_address_PlayStation4 "CEC physical address: PlayStation 4" <text> { channel="http:url:woonkamer_cec:cec_physical_address_playstation4" }
Switch Woonkamer_CEC_state_PlayStation4 "CEC state: PlayStation 4" <switch> { autoupdate="true" }
String Woonkamer_CEC_keypress_PlayStation4 "CEC key press: PlayStation 4" <mediacontrol> { autoupdate="true" }
String Woonkamer_CEC_logical_address_Chromecast "CEC logical address: Chromecast" <text> { channel="http:url:woonkamer_cec:cec_logical_address_chromecast" }
String Woonkamer_CEC_physical_address_Chromecast "CEC physical address: Chromecast" <text> { channel="http:url:woonkamer_cec:cec_physical_address_chromecast" }
Switch Woonkamer_CEC_state_Chromecast "CEC state: Chromecast" <switch> { autoupdate="true" }
String Woonkamer_CEC_keypress_Chromecast "CEC key press: Chromecast" <mediacontrol> { autoupdate="true" }
// --- Config ---
val WoonkamerCEChost = "192.168.x.x."
val WoonkamerCECport = "8232"
val WoonkamerCECtimeout = 5000
// --- TV ---
rule "CEC state status: TV"
when
Time cron "0/5 * * * * *"
then
val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/state/logical/" + Woonkamer_CEC_logical_address_TV.state
val output = sendHttpGetRequest(url, newHashMap(), WoonkamerCECtimeout)
if (output == 'on' || output == 'to-on') Woonkamer_CEC_state_TV.postUpdate(ON)
if (output == 'standby' || output == 'to-standby') Woonkamer_CEC_state_TV.postUpdate(OFF)
end
rule "CEC state command: TV"
when
Item Woonkamer_CEC_state_TV received command
then
Thread::sleep(50)
val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/state/logical/" + Woonkamer_CEC_logical_address_TV.state
val command = if (Woonkamer_CEC_state_TV.state == ON) "on" else "standby"
sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
end
rule "CEC source command: TV"
when
Item Woonkamer_CEC_source_TV received command
then
if (Woonkamer_CEC_state_TV.state != ON) {
Woonkamer_CEC_state_TV.sendCommand(ON)
Thread::sleep(WoonkamerCECtimeout)
}
Thread::sleep(50)
val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/source/logical/" + Woonkamer_CEC_logical_address_TV.state
var command = ""
if (Woonkamer_CEC_source_TV.state == "Kodi") {
command = "" + Woonkamer_CEC_physical_address_Kodi.state
} else if (Woonkamer_CEC_source_TV.state == "PlayStation 4") {
command = "" + Woonkamer_CEC_physical_address_PlayStation4.state
} else if (Woonkamer_CEC_source_TV.state == "Chromecast") {
command = "" + Woonkamer_CEC_physical_address_Chromecast.state
}
sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
Thread::sleep(50)
Woonkamer_CEC_source_TV.postUpdate("")
end
rule "CEC key press command: TV"
when
Item Woonkamer_CEC_keypress_TV received command
then
Thread::sleep(50)
val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/keypress/logical/" + Woonkamer_CEC_logical_address_TV.state
val command = "" + Woonkamer_CEC_keypress_TV.state
sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
Thread::sleep(50)
Woonkamer_CEC_keypress_TV.postUpdate("")
end
// --- Kodi ---
rule "CEC key press command: Kodi"
when
Item Woonkamer_CEC_keypress_Kodi received command
then
Thread::sleep(50)
val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/keypress/logical/" + Woonkamer_CEC_logical_address_Kodi.state
val command = "" + Woonkamer_CEC_keypress_Kodi.state
sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
Thread::sleep(50)
Woonkamer_CEC_keypress_Kodi.postUpdate("")
end
// --- PlayStation 4 ---
rule "CEC state status: PlayStation 4"
when
Time cron "0/5 * * * * *"
then
val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/state/logical/" + Woonkamer_CEC_logical_address_PlayStation4.state
val output = sendHttpGetRequest(url, newHashMap(), WoonkamerCECtimeout)
if (output == 'on' || output == 'to-on') Woonkamer_CEC_state_PlayStation4.postUpdate(ON)
if (output == 'standby' || output == 'to-standby') Woonkamer_CEC_state_PlayStation4.postUpdate(OFF)
end
rule "CEC key press command: PlayStation 4"
when
Item Woonkamer_CEC_keypress_PlayStation4 received command
then
Thread::sleep(50)
val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/keypress/logical/" + Woonkamer_CEC_logical_address_PlayStation4.state
val command = "" + Woonkamer_CEC_keypress_PlayStation4.state
sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
Thread::sleep(50)
Woonkamer_CEC_keypress_PlayStation4.postUpdate("")
end
// --- Chromecast ---
rule "CEC state status: Chromecast"
when
Time cron "0/5 * * * * *"
then
val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/state/logical/" + Woonkamer_CEC_logical_address_Chromecast.state
val output = sendHttpGetRequest(url, newHashMap(), WoonkamerCECtimeout)
if (output == 'on' || output == 'to-on') Woonkamer_CEC_state_Chromecast.postUpdate(ON)
if (output == 'standby' || output == 'to-standby') Woonkamer_CEC_state_Chromecast.postUpdate(OFF)
end
rule "CEC key press command: Chromecast"
when
Item Woonkamer_CEC_keypress_Chromecast received command
then
Thread::sleep(50)
val url = "http://" + WoonkamerCEChost + ":" + WoonkamerCECport + "/keypress/logical/" + Woonkamer_CEC_logical_address_Chromecast.state
val command = "" + Woonkamer_CEC_keypress_Chromecast.state
sendHttpPostRequest(url, "text/plain", command, newHashMap(), WoonkamerCECtimeout)
Thread::sleep(50)
Woonkamer_CEC_keypress_Chromecast.postUpdate("")
end
Thing http:url:woonkamer_cec "CEC" @ "Woonkamer" [
baseURL = "http://192.168.x.x:8232",
refresh = "300",
timeout = "20000",
bufferSize = "9192",
stateMethod = "GET",
contentType = "application/json",
ignoreSSLErrors = "true"
] {
Channels:
Type string : cec_topology "CEC topology" [
stateExtension = "/topology",
mode = "READONLY"
]
Type string : cec_logical_address_tv "CEC logical address: TV" [
stateExtension = "/topology",
stateTransformation = "JSONPATH:$.['name'].['0x00903e (Philips) - TV'].['Logical Address']",
mode = "READONLY"
]
Type string : cec_physical_address_tv "CEC physical address: TV" [
stateExtension = "/topology",
stateTransformation = "JSONPATH:$.['name'].['0x00903e (Philips) - TV'].['Physical Address']",
mode = "READONLY"
]
Type string : cec_logical_address_kodi "CEC logical address: Kodi" [
stateExtension = "/topology",
stateTransformation = "JSONPATH:$.['name'].['0x001582 (Pulse-Eight) - LibreELEC'].['Logical Address']",
mode = "READONLY"
]
Type string : cec_physical_address_kodi "CEC physical address: Kodi" [
stateExtension = "/topology",
stateTransformation = "JSONPATH:$.['name'].['0x001582 (Pulse-Eight) - LibreELEC'].['Physical Address']",
mode = "READONLY"
]
Type string : cec_logical_address_playstation4 "CEC logical address: PlayStation 4" [
stateExtension = "/topology",
stateTransformation = "JSONPATH:$.['name'].['0x080046 (Sony) - PlayStation 4'].['Logical Address']",
mode = "READONLY"
]
Type string : cec_physical_address_playstation4 "CEC physical address: PlayStation 4" [
stateExtension = "/topology",
stateTransformation = "JSONPATH:$.['name'].['0x080046 (Sony) - PlayStation 4'].['Physical Address']",
mode = "READONLY"
]
Type string : cec_logical_address_chromecast "CEC logical address: Chromecast" [
stateExtension = "/topology",
stateTransformation = "JSONPATH:$.['name'].['0x001a11 (Google) - Chromecast'].['Logical Address']",
mode = "READONLY"
]
Type string : cec_physical_address_chromecast "CEC physical address: Chromecast" [
stateExtension = "/topology",
stateTransformation = "JSONPATH:$.['name'].['0x001a11 (Google) - Chromecast'].['Physical Address']",
mode = "READONLY"
]
}
Thing network:servicedevice:woonkamer_cec "CEC" @ "Woonkamer" [
hostname = "192.168.x.x",
port = 8232,
retry = 1,
timeout = 5000,
refreshInterval = 5000
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment