-
-
Save svyatogor/7839d00303998a9fa37eb48494dd680f to your computer and use it in GitHub Desktop.
import io | |
import base64 | |
import json | |
import sys | |
from bisect import bisect | |
from struct import pack, unpack | |
from math import ceil | |
BRDLNK_UNIT = 269 / 8192 | |
# MAIN API | |
filter = lambda x: [i for i in x if i<65535] | |
def encode_ir(command: str) -> str: | |
# command = "JgC8AXE5DioPDg0PDQ8OKw0PDg4ODw0PDSsODw0rDisNDw0sDSsOKw0rDisNDwwQDSwMEA0QDBAMEAwQDRAMLAwtDSsOKwwQDBANEAwQDBANEAwQDBAMEA0QDBAMEAwQDRAMEAwQDRAMEAwQDBANEAwQDBAMEA4PDSsODw0PDQ8ODg4PDQ8NAAPNcjgOKg8ODg4ODg8qDg4PDQ8NDw4OKg8ODioPKg4ODykPKg8pDyoPKg4ODw0PKg4ODw0PDg4ODg4PDQ8ODg4PDQ8ODg4ODg8NDw4ODg8NDw0PDg4ODw0PDg4ODg4PDQ8qDw0PDQ8ODioPKg4ODyoODg8NDw0PDg4ODw0PDg4ODg4PDQ8ODg4PDQ8ODg4OKg8ODioPDQ8ODg4PDQ8ODg4ODg8NDw4ODg8NDw0PDg4ODw0PDg4ODg4PDQ8ODg4PDQ8ODg4ODg8NDw4ODg8NDw0PDg4ODw0PDg4ODg4PDQ8ODg4PDQ8NDw4ODg8NDw4ODg4ODw0PDg4ODg4PDQ8ODg4PKg4qDw0PDg4ODg4PDg4ODg4ODg8ODg4NDw4ODg8NDw0PDg8NDw0rDisNKw4rDg4OKw0rDgANBQAAAAAAAAAAAAAAAA==" | |
signal = filter(get_raw_from_broadlink(base64.b64decode(command).hex())) | |
payload = b''.join(pack('<H', t) for t in signal) | |
compress(out := io.BytesIO(), payload, level = 2) | |
payload = out.getvalue() | |
return base64.encodebytes(payload).decode('ascii').replace('\n', '') | |
# COMPRESSION | |
def emit_literal_blocks(out: io.FileIO, data: bytes): | |
for i in range(0, len(data), 32): | |
emit_literal_block(out, data[i:i+32]) | |
def emit_literal_block(out: io.FileIO, data: bytes): | |
length = len(data) - 1 | |
assert 0 <= length < (1 << 5) | |
out.write(bytes([length])) | |
out.write(data) | |
def emit_distance_block(out: io.FileIO, length: int, distance: int): | |
distance -= 1 | |
assert 0 <= distance < (1 << 13) | |
length -= 2 | |
assert length > 0 | |
block = bytearray() | |
if length >= 7: | |
assert length < (1 << 8) | |
block.append(length - 7) | |
length = 7 | |
block.insert(0, length << 5 | distance >> 8) | |
block.append(distance & 0xFF) | |
out.write(block) | |
def compress(out: io.FileIO, data: bytes, level=2): | |
''' | |
Takes a byte string and outputs a compressed "Tuya stream". | |
Implemented compression levels: | |
0 - copy over (no compression, 3.1% overhead) | |
1 - eagerly use first length-distance pair found (linear) | |
2 - eagerly use best length-distance pair found | |
3 - optimal compression (n^3) | |
''' | |
if level == 0: | |
return emit_literal_blocks(out, data) | |
W = 2**13 # window size | |
L = 256+9 # maximum length | |
distance_candidates = lambda: range(1, min(pos, W) + 1) | |
def find_length_for_distance(start: int) -> int: | |
length = 0 | |
limit = min(L, len(data) - pos) | |
while length < limit and data[pos + length] == data[start + length]: | |
length += 1 | |
return length | |
find_length_candidates = lambda: \ | |
( (find_length_for_distance(pos - d), d) for d in distance_candidates() ) | |
find_length_cheap = lambda: \ | |
next((c for c in find_length_candidates() if c[0] >= 3), None) | |
find_length_max = lambda: \ | |
max(find_length_candidates(), key=lambda c: (c[0], -c[1]), default=None) | |
if level >= 2: | |
suffixes = []; next_pos = 0 | |
key = lambda n: data[n:] | |
find_idx = lambda n: bisect(suffixes, key(n), key=key) | |
def distance_candidates(): | |
nonlocal next_pos | |
while next_pos <= pos: | |
if len(suffixes) == W: | |
suffixes.pop(find_idx(next_pos - W)) | |
suffixes.insert(idx := find_idx(next_pos), next_pos) | |
next_pos += 1 | |
idxs = (idx+i for i in (+1,-1)) # try +1 first | |
return (pos - suffixes[i] for i in idxs if 0 <= i < len(suffixes)) | |
if level <= 2: | |
find_length = { 1: find_length_cheap, 2: find_length_max }[level] | |
block_start = pos = 0 | |
while pos < len(data): | |
if (c := find_length()) and c[0] >= 3: | |
emit_literal_blocks(out, data[block_start:pos]) | |
emit_distance_block(out, c[0], c[1]) | |
pos += c[0] | |
block_start = pos | |
else: | |
pos += 1 | |
emit_literal_blocks(out, data[block_start:pos]) | |
return | |
# use topological sort to find shortest path | |
predecessors = [(0, None, None)] + [None] * len(data) | |
def put_edge(cost, length, distance): | |
npos = pos + length | |
cost += predecessors[pos][0] | |
current = predecessors[npos] | |
if not current or cost < current[0]: | |
predecessors[npos] = cost, length, distance | |
for pos in range(len(data)): | |
if c := find_length_max(): | |
for l in range(3, c[0] + 1): | |
put_edge(2 if l < 9 else 3, l, c[1]) | |
for l in range(1, min(32, len(data) - pos) + 1): | |
put_edge(1 + l, l, 0) | |
# reconstruct path, emit blocks | |
blocks = []; pos = len(data) | |
while pos > 0: | |
_, length, distance = predecessors[pos] | |
pos -= length | |
blocks.append((pos, length, distance)) | |
for pos, length, distance in reversed(blocks): | |
if not distance: | |
emit_literal_block(out, data[pos:pos + length]) | |
else: | |
emit_distance_block(out, length, distance) | |
def get_raw_from_broadlink(string): | |
dec = [] | |
unit = BRDLNK_UNIT # 32.84ms units, or 2^-15s | |
length = int(string[6:8] + string[4:6], 16) # Length of payload in little endian | |
i = 8 | |
while i < length * 2 + 8: # IR Payload | |
hex_value = string[i:i+2] | |
if hex_value == "00": | |
hex_value = string[i+2:i+4] + string[i+4:i+6] # Quick & dirty big-endian conversion | |
i += 4 | |
dec.append(ceil(int(hex_value, 16) / unit)) # Will be lower than initial value due to former round() | |
i += 2 | |
return dec | |
def process_commands(filename): | |
with open(filename, 'r') as file: | |
data = json.load(file) | |
def process_commands_recursively(commands): | |
processed_commands = {} | |
for key, value in commands.items(): | |
if isinstance(value, str): | |
processed_commands[key] = encode_ir(value) | |
elif isinstance(value, dict): | |
processed_commands[key] = process_commands_recursively(value) | |
else: | |
processed_commands[key] = value | |
return processed_commands | |
data['commands'] = process_commands_recursively(data.get('commands', {})) | |
data['supportedController'] = 'MQTT' | |
data['commandsEncoding'] = 'Raw' | |
return json.dumps(data, indent=2) | |
print(process_commands(sys.argv[1])) |
Try upgrading to Python 3.10 or newer. It seems the .py file isn't compatible with some older versions. I'm running Python 3.11.6 and it works fine for me.
You are correct!
Updated to python 3.12 and the script worked.
@rafareusch the command should simply be
python broadlink_to_tuya.py 1400.json
remove> 1400tuya.json
But this command is needed to direct the output to another file, and it works fine!
Thanks!! Awesome work you guys did!
Hi! I am trying to execute your script with python 3.13 and SmartIR file "1137.json". I am getting this error:
File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 174, in <module>
print(process_commands(sys.argv[1]))
~~~~~~~~~~~~~~~~^^^^^^^^^^^^^
File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 168, in process_commands
data['commands'] = process_commands_recursively(data.get('commands', {}))
~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 162, in process_commands_recursively
processed_commands[key] = process_commands_recursively(value)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 162, in process_commands_recursively
processed_commands[key] = process_commands_recursively(value)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 162, in process_commands_recursively
processed_commands[key] = process_commands_recursively(value)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 160, in process_commands_recursively
processed_commands[key] = encode_ir(value)
~~~~~~~~~^^^^^^^
File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 19, in encode_ir
compress(out := io.BytesIO(), payload, level = 2)
~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 100, in compress
emit_distance_block(out, c[0], c[1])
~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 44, in emit_distance_block
assert length < 256
^^^^^^^^^^^^
AssertionError
Any clue why this may be happening?
thanks, it was a little bug in my compression code. should be fixed in the latest revision of my gist.
@svyatogor please port these fixes into this gist when you have time :)
I managed to get it working using compression level 1 and 0. But 2 or 3 would result in the aforementioned error
it is still failing even with the fix? in that case please send me a sample input and I'll take a deeper look
Hi! I ported the changes to @svyatogor 's code and it works with all levels of compression! New code would be:
import io
import base64
import json
import sys
from bisect import bisect
from struct import pack, unpack
from math import ceil
BRDLNK_UNIT = 269 / 8192
# MAIN API
filter = lambda x: [i for i in x if i<65535]
def encode_ir(command: str) -> str:
# command = "JgC8AXE5DioPDg0PDQ8OKw0PDg4ODw0PDSsODw0rDisNDw0sDSsOKw0rDisNDwwQDSwMEA0QDBAMEAwQDRAMLAwtDSsOKwwQDBANEAwQDBANEAwQDBAMEA0QDBAMEAwQDRAMEAwQDRAMEAwQDBANEAwQDBAMEA4PDSsODw0PDQ8ODg4PDQ8NAAPNcjgOKg8ODg4ODg8qDg4PDQ8NDw4OKg8ODioPKg4ODykPKg8pDyoPKg4ODw0PKg4ODw0PDg4ODg4PDQ8ODg4PDQ8ODg4ODg8NDw4ODg8NDw0PDg4ODw0PDg4ODg4PDQ8qDw0PDQ8ODioPKg4ODyoODg8NDw0PDg4ODw0PDg4ODg4PDQ8ODg4PDQ8ODg4OKg8ODioPDQ8ODg4PDQ8ODg4ODg8NDw4ODg8NDw0PDg4ODw0PDg4ODg4PDQ8ODg4PDQ8ODg4ODg8NDw4ODg8NDw0PDg4ODw0PDg4ODg4PDQ8ODg4PDQ8NDw4ODg8NDw4ODg4ODw0PDg4ODg4PDQ8ODg4PKg4qDw0PDg4ODg4PDg4ODg4ODg8ODg4NDw4ODg8NDw0PDg8NDw0rDisNKw4rDg4OKw0rDgANBQAAAAAAAAAAAAAAAA=="
signal = filter(get_raw_from_broadlink(base64.b64decode(command).hex()))
payload = b''.join(pack('<H', t) for t in signal)
compress(out := io.BytesIO(), payload, level = 2)
payload = out.getvalue()
return base64.encodebytes(payload).decode('ascii').replace('\n', '')
# COMPRESSION
def emit_literal_blocks(out: io.FileIO, data: bytes):
for i in range(0, len(data), 32):
emit_literal_block(out, data[i:i+32])
def emit_literal_block(out: io.FileIO, data: bytes):
length = len(data) - 1
assert 0 <= length < (1 << 5)
out.write(bytes([length]))
out.write(data)
def emit_distance_block(out: io.FileIO, length: int, distance: int):
distance -= 1
assert 0 <= distance < (1 << 13)
length -= 2
assert length > 0
block = bytearray()
if length >= 7:
assert length - 7 < (1 << 8)
block.append(length - 7)
length = 7
block.insert(0, length << 5 | distance >> 8)
block.append(distance & 0xFF)
out.write(block)
def compress(out: io.FileIO, data: bytes, level=2):
'''
Takes a byte string and outputs a compressed "Tuya stream".
Implemented compression levels:
0 - copy over (no compression, 3.1% overhead)
1 - eagerly use first length-distance pair found (linear)
2 - eagerly use best length-distance pair found
3 - optimal compression (n^3)
'''
if level == 0:
return emit_literal_blocks(out, data)
W = 2**13 # window size
L = 255+9 # maximum length
distance_candidates = lambda: range(1, min(pos, W) + 1)
def find_length_for_distance(start: int) -> int:
length = 0
limit = min(L, len(data) - pos)
while length < limit and data[pos + length] == data[start + length]:
length += 1
return length
find_length_candidates = lambda: \
( (find_length_for_distance(pos - d), d) for d in distance_candidates() )
find_length_cheap = lambda: \
next((c for c in find_length_candidates() if c[0] >= 3), None)
find_length_max = lambda: \
max(find_length_candidates(), key=lambda c: (c[0], -c[1]), default=None)
if level >= 2:
suffixes = []; next_pos = 0
key = lambda n: data[n:]
find_idx = lambda n: bisect(suffixes, key(n), key=key)
def distance_candidates():
nonlocal next_pos
while next_pos <= pos:
if len(suffixes) == W:
suffixes.pop(find_idx(next_pos - W))
suffixes.insert(idx := find_idx(next_pos), next_pos)
next_pos += 1
idxs = (idx+i for i in (+1,-1)) # try +1 first
return (pos - suffixes[i] for i in idxs if 0 <= i < len(suffixes))
if level <= 2:
find_length = { 1: find_length_cheap, 2: find_length_max }[level]
block_start = pos = 0
while pos < len(data):
if (c := find_length()) and c[0] >= 3:
emit_literal_blocks(out, data[block_start:pos])
emit_distance_block(out, c[0], c[1])
pos += c[0]
block_start = pos
else:
pos += 1
emit_literal_blocks(out, data[block_start:pos])
return
# use topological sort to find shortest path
predecessors = [(0, None, None)] + [None] * len(data)
def put_edge(cost, length, distance):
npos = pos + length
cost += predecessors[pos][0]
current = predecessors[npos]
if not current or cost < current[0]:
predecessors[npos] = cost, length, distance
for pos in range(len(data)):
if c := find_length_max():
for l in range(3, c[0] + 1):
put_edge(2 if l < 9 else 3, l, c[1])
for l in range(1, min(32, len(data) - pos) + 1):
put_edge(1 + l, l, 0)
# reconstruct path, emit blocks
blocks = []; pos = len(data)
while pos > 0:
_, length, distance = predecessors[pos]
pos -= length
blocks.append((pos, length, distance))
for pos, length, distance in reversed(blocks):
if not distance:
emit_literal_block(out, data[pos:pos + length])
else:
emit_distance_block(out, length, distance)
def get_raw_from_broadlink(string):
dec = []
unit = BRDLNK_UNIT # 32.84ms units, or 2^-15s
length = int(string[6:8] + string[4:6], 16) # Length of payload in little endian
i = 8
while i < length * 2 + 8: # IR Payload
hex_value = string[i:i+2]
if hex_value == "00":
hex_value = string[i+2:i+4] + string[i+4:i+6] # Quick & dirty big-endian conversion
i += 4
dec.append(ceil(int(hex_value, 16) / unit)) # Will be lower than initial value due to former round()
i += 2
return dec
def process_commands(filename):
with open(filename, 'r') as file:
data = json.load(file)
def process_commands_recursively(commands):
processed_commands = {}
for key, value in commands.items():
if isinstance(value, str):
processed_commands[key] = encode_ir(value)
elif isinstance(value, dict):
processed_commands[key] = process_commands_recursively(value)
else:
processed_commands[key] = value
return processed_commands
data['commands'] = process_commands_recursively(data.get('commands', {}))
data['supportedController'] = 'MQTT'
data['commandsEncoding'] = 'Raw'
return json.dumps(data, indent=2)
print(process_commands(sys.argv[1]))
Hi guys, is there a way to do the opposite? I changed my IR blaster from tuya to broadlink and I need some solution to convert code from old format to the new one.
This is so genius!
Why is this code not integrated into SmartIR?
This is so genius! Why is this code not integrated into SmartIR?
I did see on the SmartIR fork that the author is thinking about making a dynamic converter litinoveweedle/SmartIR#122 (comment)
Hi guys, I got this error when convert 4180.json
Traceback (most recent call last):
File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 173, in <module>
print(process_commands(sys.argv[1]))
File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 167, in process_commands
data['commands'] = process_commands_recursively(data.get('commands', {}))
File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 159, in process_commands_recursively
processed_commands[key] = encode_ir(value)
File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 16, in encode_ir
signal = filter(get_raw_from_broadlink(base64.b64decode(command).hex()))
File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 145, in get_raw_from_broadlink
dec.append(ceil(int(hex_value, 16) / unit)) # Will be lower than initial value due to former round()
ValueError: invalid literal for int() with base 16: ''
Do you have any idea why did this happened?
Hi guys, I got this error when convert 4180.json
Traceback (most recent call last): File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 173, in <module> print(process_commands(sys.argv[1])) File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 167, in process_commands data['commands'] = process_commands_recursively(data.get('commands', {})) File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 159, in process_commands_recursively processed_commands[key] = encode_ir(value) File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 16, in encode_ir signal = filter(get_raw_from_broadlink(base64.b64decode(command).hex())) File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 145, in get_raw_from_broadlink dec.append(ceil(int(hex_value, 16) / unit)) # Will be lower than initial value due to former round() ValueError: invalid literal for int() with base 16: ''
Do you have any idea why did this happened?
Apologize for the mistake, 4180.json is using Xiaomi controller, not broadlink.
Hi guys, thank you for your code!
I came form SmartIR repo to here and i'm working with Z06/UFO-R11
and convert to this IR remote from broadlink. To work properly, i have to change one line at the and of the code:
data['supportedController'] = 'MQTT'
-> data['supportedController'] = 'UFOR11'
Just leave it here for the history.
Try upgrading to Python 3.10 or newer. It seems the .py file isn't compatible with some older versions.
I'm running Python 3.11.6 and it works fine for me.