-
-
Save svyatogor/7839d00303998a9fa37eb48494dd680f to your computer and use it in GitHub Desktop.
import base64
import io
import json
import sys
from bisect import bisect, bisect_left
from math import ceil
from struct import pack, unpack
from typing import BinaryIO
# Divisor applied to every raw Broadlink duration in get_raw_from_broadlink().
BRDLNK_UNIT = 269 / 8192

# MAIN API


def filter(x):
    """Return the items of *x* that are strictly below 65535, in order.

    encode_ir() packs every surviving value as an unsigned 16-bit integer,
    so larger values are dropped here rather than passed on to pack().

    NOTE: the name shadows the ``filter`` builtin; it is kept because
    encode_ir() calls this function by that name.
    """
    return [i for i in x if i < 65535]
def encode_ir(command: str) -> str:
    """Convert one Base64 Broadlink IR command into a Base64 "Tuya stream".

    Steps:
      1. Base64-decode *command* and hand its hex dump to
         get_raw_from_broadlink() to obtain the list of durations.
      2. Drop every duration that filter() rejects (values >= 65535).
      3. Pack the remaining durations as little-endian unsigned 16-bit ints.
      4. Compress the packed bytes with compress() at level 2.
      5. Return the compressed bytes as single-line ASCII Base64.
    """
    signal = filter(get_raw_from_broadlink(base64.b64decode(command).hex()))
    payload = b''.join(pack('<H', t) for t in signal)

    out = io.BytesIO()
    compress(out, payload, level=2)

    # b64encode() never inserts line breaks, so no newline stripping is needed.
    return base64.b64encode(out.getvalue()).decode('ascii')
# COMPRESSION | |
def emit_literal_blocks(out: BinaryIO, data: bytes):
    """Write *data* to *out* as consecutive literal blocks of at most 32 bytes.

    Each slice is handed to emit_literal_block(). Empty *data* writes nothing.
    *out* is any writable binary stream (the caller in this file passes an
    io.BytesIO).
    """
    for i in range(0, len(data), 32):
        emit_literal_block(out, data[i:i + 32])
def emit_literal_block(out: BinaryIO, data: bytes):
    """Write a single literal block to *out*.

    The block is one header byte holding ``len(data) - 1`` followed by the
    bytes of *data* unchanged. *data* must therefore be 1 to 32 bytes long;
    this is checked with an assert.
    """
    length = len(data) - 1
    # The header stores the length in 5 bits, i.e. 0..31 for 1..32 bytes.
    assert 0 <= length < (1 << 5)
    out.write(bytes([length]))
    out.write(data)
def emit_distance_block(out: BinaryIO, length: int, distance: int):
    """Write one length-distance (back-reference) block to *out*.

    Layout of the two or three bytes written:
      * header: the top 3 bits hold ``length - 2`` capped at 7, the low
        5 bits hold the high bits of ``distance - 1``;
      * extension (only when ``length - 2 >= 7``): ``length - 2 - 7``;
      * the low 8 bits of ``distance - 1``.

    The asserts accept ``1 <= distance <= 8192`` and ``3 <= length <= 257``.
    NOTE(review): the extension byte stores ``length - 2 - 7``, so the upper
    length bound enforced here is tighter than one byte strictly requires.
    """
    stored_distance = distance - 1
    assert 0 <= stored_distance < (1 << 13)
    stored_length = length - 2
    assert stored_length > 0

    extension = b''
    if stored_length >= 7:
        assert stored_length < (1 << 8)
        extension = bytes([stored_length - 7])
        stored_length = 7  # 7 in the header signals "extension byte follows"

    header = stored_length << 5 | stored_distance >> 8
    out.write(bytes([header]) + extension + bytes([stored_distance & 0xFF]))
def compress(out: BinaryIO, data: bytes, level=2):
    '''
    Takes a byte string and outputs a compressed "Tuya stream".

    Implemented compression levels:
    0 - copy over (no compression, 3.1% overhead)
    1 - eagerly use first length-distance pair found (linear)
    2 - eagerly use best length-distance pair found
    3 - optimal compression (n^3)

    Any level above 2 takes the "optimal" path. Nothing is returned; all
    output is written to *out* through emit_literal_block(s)() and
    emit_distance_block().
    '''
    if level == 0:
        return emit_literal_blocks(out, data)

    W = 2**13  # window size
    # Maximum match length. emit_distance_block() asserts (length - 2) < 256,
    # so a longer match would abort the whole run with an AssertionError.
    L = 255 + 2

    # Levels below 2: every distance inside the window, nearest first.
    distance_candidates = lambda: range(1, min(pos, W) + 1)

    def find_length_for_distance(start: int) -> int:
        # Count how many bytes at `start` equal the bytes at `pos`.
        length = 0
        limit = min(L, len(data) - pos)
        while length < limit and data[pos + length] == data[start + length]:
            length += 1
        return length

    find_length_candidates = lambda: \
        ((find_length_for_distance(pos - d), d) for d in distance_candidates())
    find_length_cheap = lambda: \
        next((c for c in find_length_candidates() if c[0] >= 3), None)
    # Longest match wins; ties go to the smaller distance.
    find_length_max = lambda: \
        max(find_length_candidates(), key=lambda c: (c[0], -c[1]), default=None)

    if level >= 2:
        # Keep the start positions inside the window sorted by the suffix
        # they begin; only the sorted neighbours of the current position's
        # suffix are tried as match candidates.
        suffixes = []
        next_pos = 0
        key = lambda n: data[n:]
        find_idx = lambda n: bisect(suffixes, key(n), key=key)

        def distance_candidates():
            nonlocal next_pos
            while next_pos <= pos:
                if len(suffixes) == W:
                    # Evict the position that just left the window.
                    # bisect() (= bisect_right) would return the slot *after*
                    # that entry, so bisect_left is required to hit it.
                    suffixes.pop(
                        bisect_left(suffixes, key(next_pos - W), key=key))
                suffixes.insert(idx := find_idx(next_pos), next_pos)
                next_pos += 1
            idxs = (idx + i for i in (+1, -1))  # try +1 first
            return (pos - suffixes[i] for i in idxs if 0 <= i < len(suffixes))

    if level <= 2:
        find_length = {1: find_length_cheap, 2: find_length_max}[level]
        block_start = pos = 0
        while pos < len(data):
            if (c := find_length()) and c[0] >= 3:
                # Flush pending literals, then emit the back-reference.
                emit_literal_blocks(out, data[block_start:pos])
                emit_distance_block(out, c[0], c[1])
                pos += c[0]
                block_start = pos
            else:
                pos += 1
        emit_literal_blocks(out, data[block_start:pos])
        return

    # use topological sort to find shortest path
    # predecessors[p] = (cost in bytes to reach p, length of the block that
    # ends at p, its distance -- 0 for a literal block)
    predecessors = [(0, None, None)] + [None] * len(data)

    def put_edge(cost, length, distance):
        npos = pos + length
        cost += predecessors[pos][0]
        current = predecessors[npos]
        if not current or cost < current[0]:
            predecessors[npos] = cost, length, distance

    for pos in range(len(data)):
        if c := find_length_max():
            for l in range(3, c[0] + 1):
                # 2 bytes for a short match, 3 once the extension byte is needed
                put_edge(2 if l < 9 else 3, l, c[1])
        for l in range(1, min(32, len(data) - pos) + 1):
            put_edge(1 + l, l, 0)

    # reconstruct path, emit blocks
    blocks = []
    pos = len(data)
    while pos > 0:
        _, length, distance = predecessors[pos]
        pos -= length
        blocks.append((pos, length, distance))
    for pos, length, distance in reversed(blocks):
        if not distance:
            emit_literal_block(out, data[pos:pos + length])
        else:
            emit_distance_block(out, length, distance)
def get_raw_from_broadlink(string):
    """Extract the list of durations from a hex-encoded Broadlink packet.

    *string* is the packet as hexadecimal text. Hex characters 4..8 hold the
    payload length as a little-endian 16-bit value, and the payload itself
    starts at hex character 8. Every payload byte is one duration; a ``00``
    byte announces that the following two bytes hold a 16-bit big-endian
    duration instead.

    Each duration is divided by BRDLNK_UNIT and rounded up to an integer.
    NOTE(review): the results are presumably microseconds -- confirm against
    the Broadlink packet format.
    """
    payload_len = int(string[6:8] + string[4:6], 16)  # little-endian length
    end = 8 + payload_len * 2

    timings = []
    cursor = 8
    while cursor < end:  # IR payload
        token = string[cursor:cursor + 2]
        cursor += 2
        if token == "00":
            # Escape marker: the real value is the next two bytes, big-endian.
            token = string[cursor:cursor + 4]
            cursor += 4
        # Rounded up; will be lower than the initial value due to the
        # rounding applied when the packet was produced.
        timings.append(ceil(int(token, 16) / BRDLNK_UNIT))
    return timings
def process_commands(filename):
    """Re-encode every IR command in a JSON code file and return new JSON text.

    The file at *filename* is read as UTF-8 JSON. Inside its ``commands``
    mapping (an empty mapping when the key is absent), every string value is
    replaced by ``encode_ir(value)``, nested dicts are processed the same way
    recursively, and values of any other type are kept as they are.
    ``supportedController`` is then set to ``'MQTT'`` and
    ``commandsEncoding`` to ``'Raw'``.

    Returns the resulting document serialized with a two-space indent.
    """
    def convert(value):
        # Strings are IR codes, dicts are nested groups, the rest passes through.
        if isinstance(value, str):
            return encode_ir(value)
        if isinstance(value, dict):
            return {name: convert(child) for name, child in value.items()}
        return value

    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with open(filename, 'r', encoding='utf-8') as file:
        data = json.load(file)

    data['commands'] = {
        name: convert(child)
        for name, child in data.get('commands', {}).items()
    }
    data['supportedController'] = 'MQTT'
    data['commandsEncoding'] = 'Raw'
    return json.dumps(data, indent=2)
if __name__ == '__main__':
    # Script entry point: convert the JSON file named on the command line and
    # print the result. Guarded so that importing this module has no effect.
    if len(sys.argv) < 2:
        sys.exit(f'usage: {sys.argv[0]} <codes.json>')
    print(process_commands(sys.argv[1]))
Thanks so much for creating this! Very helpful indeed.
I replaced print(process_commands(sys.argv[1]))
with
output_filename = sys.argv[1].replace(".json", "_converted.json")
with open(output_filename, 'w') as output_file:
output_file.write(process_commands(sys.argv[1]))
this way it outputs a converted file instead of printing the conversion in the terminal window.
The converted codes work - tried using the mqtt.publish service:
service: mqtt.publish
data:
payload: >-
{"ir_code_to_send":"CIAifRGeAiUCYiABBIACbQaeIAMCgAJDIAsCiwZiYAcFQwK9AugBgBcAi6AbACVgF0ALwCNAK8BHAp4CBmA/gENAAYAjwDsCYgKqYEcEYgJPBtsgS0A3QHuAdwHsTUBXQAGAF4BjgAOAW8BT4AN7gE/gA2+AV4DDQEHgAa/gAavgAZPgAQdAiwK4m9shF8BDgJtA+0C7AIsgswHoAeEBA+ABG+EBD+ABT4FXgAeAi4BLwM9BN8BPQFtBNwCeIRdANYCzwHcB7E3gA7uBY+AB++ADY8AjAtsCySCn4QUnwD+Av0GfgaPgBbPBr8AfgAMBgAI="}
topic: zigbee2mqtt/Living room - aircon/set
However, the converted JSON will not work - z2m gives an error of "Invalid message 'null', skipping...".
In my setup, I need to send the full JSON payload and escape the double quotes.
So for instance, I tried this just on the off command and this works.
The "heat" -> "low" -> "16" does not (z2m gives an error of "Invalid message 'null', skipping...".)
"commands": {
"off": "{\"ir_code_to_send\": \"CIAinBFiAkMCYiABA4ACbQZACwCAYAMBbQZAB+ABDwAlICECiwaeoCMAniAPQBPgASPgAwuAP4BTAENgAcAbgDeAG0BPgCtAfwBtIFtAYwIoTiWggwKeAgbgAAeAG0ABgHvAC+ADVwJiAqqgcwAl4AgjwJ/ACwCe4AK34AGLgAMHYgJvnDcjfRHgAc9Ap8DvAYsGgS2Af4AJQSuAy+ADb8Ej4AWDgEvgAacABuACp0B7QAMAquAAb8FvA4ACzU3A70EtwXfAR4CbQJHhA6/gA6PhAU/gATvA7+ABE0CFwJfgARtAJ0Ib\"}",
"heat": {
"low": {
"16": "GmIi2RFiAr0CBgIlAoACqgYlAgUHgAJPBkMCniATAZ4CQAdAGQBiIAUAYiAHACVgFwCAYBEAQyADQAUAQ2A/QB1AH0ADAGIgDwFCB0A/ACVgQUApAIAgBYA/A8kBBQdARQBiIG9AE4BHAMhgCwMlAmVOQDFAWUArQF2AH4B3wC+AE0AHQCuAmwBDIE/AFUA7gB9AqYBt4Ak7QEOAs+ABOwDIIQMCrJy9YRcAgKEDAIsgGwDnIRNAG4EP4AE7wEuA24AbgR2AtwC9YQOAe0EHQOeBK+ABz4FrACUhA4ETgGeA50AvQSFBF+AB/4BfgCPhAStAL8BvgHsB6AGBqUAFgK2AswdDAs8D9ADnBuAFg4DDwM9BT0D7APpiD0C/",
This is my configuration YAML in HA:
smartir:
climate:
- platform: smartir
name: Living room - aircon
unique_id: ac_living_room
device_code: 1343
controller_data: zigbee2mqtt/Living room - aircon/set
Is there a way to modify this script to add this kind of output?
Much appreciated =).
I am a sysadmin and can do PowerShell and not python =).
@leviustinov You need to specify the MQTT topic as zigbee2mqtt/Living room - aircon/set/ir_code_to_send
, it will force z2m to put the payload in the correct field.
@leviustinov You need to specify the MQTT topic as
zigbee2mqtt/Living room - aircon/set/ir_code_to_send
, it will force z2m to put the payload in the correct field.
Works great, thanks!
In the meantime, knowing 0 python I ChatGPTed the fix for adding the strings and that works as well. Will be using what you mentioned though.
def encode_ir(command: str) -> str:
    """Variant of encode_ir() that wraps the code in a zigbee2mqtt payload.

    The Broadlink command is converted exactly as before (decode, filter,
    pack as little-endian 16-bit values, compress at level 2, Base64), and
    the result is returned as the JSON text
    ``{"ir_code_to_send": "<base64>"}``.
    """
    signal = filter(get_raw_from_broadlink(base64.b64decode(command).hex()))
    payload = b''.join(pack('<H', t) for t in signal)
    compress_out = io.BytesIO()
    compress(compress_out, payload, level=2)
    encoded_payload = base64.b64encode(compress_out.getvalue()).decode('ascii')
    # json.dumps() produces the same text as the hand-built string and takes
    # care of quoting.
    return json.dumps({"ir_code_to_send": encoded_payload})
Huge shoutout to @mildsunrise for their research on Tuya's format