Skip to content

Instantly share code, notes, and snippets.

@svyatogor
Last active January 17, 2025 19:02
Show Gist options
  • Save svyatogor/7839d00303998a9fa37eb48494dd680f to your computer and use it in GitHub Desktop.
Save svyatogor/7839d00303998a9fa37eb48494dd680f to your computer and use it in GitHub Desktop.
Convert SmartIR Broadlink commands to Tuya
@BenJamesAndo
Copy link

Try upgrading to Python 3.10 or newer. It seems the .py file isn't compatible with some older versions.
I'm running Python 3.11.6 and it works fine for me.

@rafareusch
Copy link

Try upgrading to Python 3.10 or newer. It seems the .py file isn't compatible with some older versions. I'm running Python 3.11.6 and it works fine for me.

You are correct!
Updated to python 3.12 and the script worked.

@rafareusch the command should simply be python broadlink_to_tuya.py 1400.json remove > 1400tuya.json

But this command is needed to direct the output to another file, and it works fine!

Thanks!! Awesome work you guys did!

@manherna
Copy link

Hi! I am trying to execute your script with python 3.13 and SmartIR file "1137.json". I am getting this error:

  File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 174, in <module>
    print(process_commands(sys.argv[1]))
          ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^
  File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 168, in process_commands
    data['commands'] = process_commands_recursively(data.get('commands', {}))
                       ~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 162, in process_commands_recursively
    processed_commands[key] = process_commands_recursively(value)
                              ~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
  File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 162, in process_commands_recursively
    processed_commands[key] = process_commands_recursively(value)
                              ~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
  File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 162, in process_commands_recursively
    processed_commands[key] = process_commands_recursively(value)
                              ~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
  File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 160, in process_commands_recursively
    processed_commands[key] = encode_ir(value)
                              ~~~~~~~~~^^^^^^^
  File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 19, in encode_ir
    compress(out := io.BytesIO(), payload, level = 2)
    ~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 100, in compress
    emit_distance_block(out, c[0], c[1])
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/Users/A200199775/tempo/broadlink_to_tuya.py", line 44, in emit_distance_block
    assert length < 256
           ^^^^^^^^^^^^
AssertionError 

Any clue why this may be happening?

@mildsunrise
Copy link

thanks, it was a little bug in my compression code. should be fixed in the latest revision of my gist.
@svyatogor please port these fixes into this gist when you have time :)

@manherna
Copy link

I managed to get it working using compression level 1 and 0. But 2 or 3 would result in the aforementioned error

@mildsunrise
Copy link

it is still failing even with the fix? in that case please send me a sample input and I'll take a deeper look

@manherna
Copy link

Hi! I ported the changes to @svyatogor 's code and it works with all levels of compression! New code would be:

import io
import base64
import json
import sys
from bisect import bisect
from struct import pack, unpack
from math import ceil

BRDLNK_UNIT = 269 / 8192

# MAIN API
filter = lambda x: [i for i in x if i<65535]

def encode_ir(command: str) -> str:
	# command = "JgC8AXE5DioPDg0PDQ8OKw0PDg4ODw0PDSsODw0rDisNDw0sDSsOKw0rDisNDwwQDSwMEA0QDBAMEAwQDRAMLAwtDSsOKwwQDBANEAwQDBANEAwQDBAMEA0QDBAMEAwQDRAMEAwQDRAMEAwQDBANEAwQDBAMEA4PDSsODw0PDQ8ODg4PDQ8NAAPNcjgOKg8ODg4ODg8qDg4PDQ8NDw4OKg8ODioPKg4ODykPKg8pDyoPKg4ODw0PKg4ODw0PDg4ODg4PDQ8ODg4PDQ8ODg4ODg8NDw4ODg8NDw0PDg4ODw0PDg4ODg4PDQ8qDw0PDQ8ODioPKg4ODyoODg8NDw0PDg4ODw0PDg4ODg4PDQ8ODg4PDQ8ODg4OKg8ODioPDQ8ODg4PDQ8ODg4ODg8NDw4ODg8NDw0PDg4ODw0PDg4ODg4PDQ8ODg4PDQ8ODg4ODg8NDw4ODg8NDw0PDg4ODw0PDg4ODg4PDQ8ODg4PDQ8NDw4ODg8NDw4ODg4ODw0PDg4ODg4PDQ8ODg4PKg4qDw0PDg4ODg4PDg4ODg4ODg8ODg4NDw4ODg8NDw0PDg8NDw0rDisNKw4rDg4OKw0rDgANBQAAAAAAAAAAAAAAAA=="
  signal = filter(get_raw_from_broadlink(base64.b64decode(command).hex()))

  payload = b''.join(pack('<H', t) for t in signal)
  compress(out := io.BytesIO(), payload, level = 2)
  payload = out.getvalue()
  return base64.encodebytes(payload).decode('ascii').replace('\n', '')


# COMPRESSION

def emit_literal_blocks(out: io.FileIO, data: bytes):
	for i in range(0, len(data), 32):
		emit_literal_block(out, data[i:i+32])

def emit_literal_block(out: io.FileIO, data: bytes):
	length = len(data) - 1
	assert 0 <= length < (1 << 5)
	out.write(bytes([length]))
	out.write(data)

def emit_distance_block(out: io.FileIO, length: int, distance: int):
	distance -= 1
	assert 0 <= distance < (1 << 13)
	length -= 2
	assert length > 0
	block = bytearray()
	if length >= 7:
		assert length - 7 < (1 << 8)
		block.append(length - 7)
		length = 7
	block.insert(0, length << 5 | distance >> 8)
	block.append(distance & 0xFF)
	out.write(block)

def compress(out: io.FileIO, data: bytes, level=2):
	'''
	Takes a byte string and outputs a compressed "Tuya stream".
	Implemented compression levels:
	0 - copy over (no compression, 3.1% overhead)
	1 - eagerly use first length-distance pair found (linear)
	2 - eagerly use best length-distance pair found
	3 - optimal compression (n^3)
	'''
	if level == 0:
		return emit_literal_blocks(out, data)

	W = 2**13 # window size
	L = 255+9 # maximum length
	distance_candidates = lambda: range(1, min(pos, W) + 1)

	def find_length_for_distance(start: int) -> int:
		length = 0
		limit = min(L, len(data) - pos)
		while length < limit and data[pos + length] == data[start + length]:
			length += 1
		return length
	find_length_candidates = lambda: \
		( (find_length_for_distance(pos - d), d) for d in distance_candidates() )
	find_length_cheap = lambda: \
		next((c for c in find_length_candidates() if c[0] >= 3), None)
	find_length_max = lambda: \
		max(find_length_candidates(), key=lambda c: (c[0], -c[1]), default=None)

	if level >= 2:
		suffixes = []; next_pos = 0
		key = lambda n: data[n:]
		find_idx = lambda n: bisect(suffixes, key(n), key=key)
		def distance_candidates():
			nonlocal next_pos
			while next_pos <= pos:
				if len(suffixes) == W:
					suffixes.pop(find_idx(next_pos - W))
				suffixes.insert(idx := find_idx(next_pos), next_pos)
				next_pos += 1
			idxs = (idx+i for i in (+1,-1)) # try +1 first
			return (pos - suffixes[i] for i in idxs if 0 <= i < len(suffixes))

	if level <= 2:
		find_length = { 1: find_length_cheap, 2: find_length_max }[level]
		block_start = pos = 0
		while pos < len(data):
			if (c := find_length()) and c[0] >= 3:
				emit_literal_blocks(out, data[block_start:pos])
				emit_distance_block(out, c[0], c[1])
				pos += c[0]
				block_start = pos
			else:
				pos += 1
		emit_literal_blocks(out, data[block_start:pos])
		return

	# use topological sort to find shortest path
	predecessors = [(0, None, None)] + [None] * len(data)
	def put_edge(cost, length, distance):
		npos = pos + length
		cost += predecessors[pos][0]
		current = predecessors[npos]
		if not current or cost < current[0]:
			predecessors[npos] = cost, length, distance
	for pos in range(len(data)):
		if c := find_length_max():
			for l in range(3, c[0] + 1):
				put_edge(2 if l < 9 else 3, l, c[1])
		for l in range(1, min(32, len(data) - pos) + 1):
			put_edge(1 + l, l, 0)

	# reconstruct path, emit blocks
	blocks = []; pos = len(data)
	while pos > 0:
		_, length, distance = predecessors[pos]
		pos -= length
		blocks.append((pos, length, distance))
	for pos, length, distance in reversed(blocks):
		if not distance:
			emit_literal_block(out, data[pos:pos + length])
		else:
			emit_distance_block(out, length, distance)

def get_raw_from_broadlink(string):
  dec = []
  unit = BRDLNK_UNIT  # 32.84ms units, or 2^-15s
  length = int(string[6:8] + string[4:6], 16)  # Length of payload in little endian

  i = 8
  while i < length * 2 + 8:  # IR Payload
    hex_value = string[i:i+2]
    if hex_value == "00":
      hex_value = string[i+2:i+4] + string[i+4:i+6]  # Quick & dirty big-endian conversion
      i += 4
    dec.append(ceil(int(hex_value, 16) / unit))  # Will be lower than initial value due to former round()
    i += 2

  return dec


def process_commands(filename):
  with open(filename, 'r') as file:
    data = json.load(file)

  def process_commands_recursively(commands):
    processed_commands = {}
    for key, value in commands.items():
      if isinstance(value, str):
        processed_commands[key] = encode_ir(value)
      elif isinstance(value, dict):
        processed_commands[key] = process_commands_recursively(value)
      else:
        processed_commands[key] = value

    return processed_commands

  data['commands'] = process_commands_recursively(data.get('commands', {}))
  data['supportedController'] = 'MQTT'
  data['commandsEncoding'] = 'Raw'
  return json.dumps(data, indent=2)


print(process_commands(sys.argv[1]))

@gianlucasullazzo
Copy link

gianlucasullazzo commented Aug 21, 2024

Hi guys, is there a way to do the opposite? I changed my IR blaster from tuya to broadlink and I need some solution to convert code from old format to the new one.

@MusiCode1
Copy link

This is so genius!
Why is this code not integrated into SmartIR?

@BenJamesAndo
Copy link

This is so genius! Why is this code not integrated into SmartIR?

I did see on the SmartIR fork that the author is thinking about making a dynamic converter litinoveweedle/SmartIR#122 (comment)

@nazmibojan
Copy link

Hi guys, I got this error when convert 4180.json

Traceback (most recent call last):
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 173, in <module>
    print(process_commands(sys.argv[1]))
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 167, in process_commands
    data['commands'] = process_commands_recursively(data.get('commands', {}))
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 159, in process_commands_recursively
    processed_commands[key] = encode_ir(value)
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 16, in encode_ir
    signal = filter(get_raw_from_broadlink(base64.b64decode(command).hex()))
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 145, in get_raw_from_broadlink
    dec.append(ceil(int(hex_value, 16) / unit))  # Will be lower than initial value due to former round()
ValueError: invalid literal for int() with base 16: ''

Do you have any idea why did this happened?

@nazmibojan
Copy link

Hi guys, I got this error when convert 4180.json

Traceback (most recent call last):
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 173, in <module>
    print(process_commands(sys.argv[1]))
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 167, in process_commands
    data['commands'] = process_commands_recursively(data.get('commands', {}))
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 159, in process_commands_recursively
    processed_commands[key] = encode_ir(value)
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 16, in encode_ir
    signal = filter(get_raw_from_broadlink(base64.b64decode(command).hex()))
  File "/home/nazmi/Documents/ferbos/code/irremote/broadlink_to_tuya.py", line 145, in get_raw_from_broadlink
    dec.append(ceil(int(hex_value, 16) / unit))  # Will be lower than initial value due to former round()
ValueError: invalid literal for int() with base 16: ''

Do you have any idea why did this happened?

Apologize for the mistake, 4180.json is using Xiaomi controller, not broadlink.

@nakata5321
Copy link

Hi guys, thank you for your code!
I came form SmartIR repo to here and i'm working with Z06/UFO-R11 and convert to this IR remote from broadlink. To work properly, i have to change one line at the and of the code:
data['supportedController'] = 'MQTT' -> data['supportedController'] = 'UFOR11'
Just leave it here for the history.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment