|
# Byte sequence that terminates the encoded stream.
ETX_MARKER = b'\x11\x00\x00'

# The header consists of two 4-byte fields; decode() reads them as the
# decoded length followed by the encoded length.
HEADER_LENGTH = 4 + 4

# Shortest input decode() accepts: a header plus the terminating marker.
MINIMAL_ENCODED_LENGTH = len(ETX_MARKER) + HEADER_LENGTH
|
|
|
|
|
def read_int(data: bytes):
    """Interpret the first four bytes of ``data`` as a little-endian unsigned integer.

    When ``data`` holds fewer than four bytes, only the bytes present are
    used (an empty input yields 0).
    """
    leading = data[:4]
    return int.from_bytes(leading, byteorder='little')
|
|
|
def read_short(data: bytes):
    """Interpret the first two bytes of ``data`` as a little-endian unsigned integer.

    When ``data`` holds fewer than two bytes, only the bytes present are
    used (an empty input yields 0).
    """
    leading = data[:2]
    return int.from_bytes(leading, byteorder='little')
|
|
|
def read_byte(data: bytes):
    """Return the first byte of ``data`` as an integer in the range 0..255.

    Raises:
        IndexError: If ``data`` is empty.
    """
    head = data[:1]
    return head[0]
|
|
|
def read_most_significant_nibble(data: bytes):
    """Return the upper four bits of the first byte of ``data`` (0..15).

    Raises:
        IndexError: If ``data`` is empty.
    """
    first_byte = data[:1][0]
    return (first_byte & 0xF0) >> 4
|
|
|
def most_significant_nibble_matches(data: bytes, ones_mask: int, zeros_mask: int) -> bool:
    """Check the upper nibble of the first byte of ``data`` against two bit masks.

    Args:
        data: Byte string whose first byte is inspected.
        ones_mask: 4-bit mask of bits that must all be set in the nibble.
        zeros_mask: 4-bit mask of bits that must all be clear in the nibble.

    Returns:
        True when every ``ones_mask`` bit is set and every ``zeros_mask`` bit
        is clear in the nibble, False otherwise.

    Raises:
        AssertionError: If either mask lies outside 0b0000..0b1111.
        IndexError: If ``data`` is empty.
    """
    # The masks are validated before ``data`` is touched.
    assert 0b0000 <= ones_mask <= 0b1111
    assert 0b0000 <= zeros_mask <= 0b1111

    nibble = data[:1][0] >> 4
    required_bits_set = (nibble & ones_mask) == ones_mask
    # A bit is clear in the nibble exactly when it is set in the nibble's
    # 4-bit complement, so testing for "no overlap" is equivalent.
    forbidden_bits_clear = (nibble & zeros_mask) == 0
    return required_bits_set and forbidden_bits_clear
|
|
|
|
|
def _read_zero_run(data: bytes, enc_ptr: int) -> tuple:
    """Skip a run of zero bytes and read the byte that terminates it.

    Returns a ``(zero_bytes_count, terminator, enc_ptr)`` tuple in which
    ``enc_ptr`` points just past the terminating non-zero byte.  Running off
    the end of ``data`` raises IndexError.
    """
    zero_bytes_count = 0
    while data[enc_ptr] == 0:
        zero_bytes_count += 1
        enc_ptr += 1
    return zero_bytes_count, data[enc_ptr], enc_ptr + 1


def _copy_literal(data: bytes, enc_ptr: int, decoded_data: bytearray, dec_ptr: int, length: int) -> tuple:
    """Copy ``length`` raw bytes from the encoded stream into the output buffer.

    Returns the advanced ``(enc_ptr, dec_ptr)`` pair.  Both ranges are checked
    first: an unchecked slice assignment would silently shrink or grow the
    bytearray instead of failing.
    """
    if enc_ptr + length > len(data):
        raise AssertionError(
            f'Literal of {length} bytes at index {enc_ptr} exceeds encoded data (data length: {len(data)})')
    if dec_ptr + length > len(decoded_data):
        raise AssertionError(
            f'Literal of {length} bytes at index {dec_ptr} exceeds decoded data (data length: {len(decoded_data)})')
    decoded_data[dec_ptr:dec_ptr + length] = data[enc_ptr:enc_ptr + length]
    return enc_ptr + length, dec_ptr + length


def _copy_match(decoded_data: bytearray, dec_ptr: int, distance: int, length: int) -> int:
    """Append ``length`` bytes that start ``distance`` bytes behind ``dec_ptr``.

    Returns the advanced ``dec_ptr``.  The copy runs byte by byte because the
    source and destination ranges overlap whenever ``distance < length``.
    """
    src_ptr = dec_ptr - distance
    # Index 0 is a valid source; only a negative index is out of bounds (and
    # would otherwise wrap around to the end of the buffer).
    if src_ptr < 0:
        raise AssertionError(
            f'Out of bounds access to decoded data at index {src_ptr} (data length: {len(decoded_data)})')
    for _ in range(length):
        decoded_data[dec_ptr] = decoded_data[src_ptr]
        src_ptr += 1
        dec_ptr += 1
    return dec_ptr


def decode(data: bytes) -> bytes:
    """Decompress a header-prefixed, ETX-terminated encoded byte stream.

    Input layout: a ``HEADER_LENGTH``-byte header holding the decoded length
    and the encoded length (both 4-byte little-endian), followed by exactly
    "encoded length" bytes of symbols ending with ``ETX_MARKER``.

    Symbols are told apart by the upper nibble of their first byte:

    * ``0b0000`` - literal run.  A non-zero first byte ``b`` means ``b + 3``
      bytes; otherwise the length is ``18 + 255 * (zeros - 1) + t`` where
      ``zeros`` counts the leading zero bytes and ``t`` is the first non-zero
      byte after them.
    * any non-zero nibble before the first literal - initial literal of
      ``byte - 17`` bytes (the byte must be at least 18).
    * ``>= 0b0100`` - reference with length ``((b >> 5) & 7) + 1`` and an
      11-bit distance (plus one).
    * ``0b0010`` / ``0b0011`` - reference with length ``(b & 31) + 2``
      (zero-run extended when ``b & 31 == 0``) and a 14-bit distance (plus one).
    * ``0b0001`` - reference with length ``(b & 7) + 2`` (zero-run extended
      when ``b & 7 == 0``) and a distance of 16384 plus a 15-bit value.

    Every reference is followed by 0-3 literal bytes, the count being stored
    in the low two bits of the symbol byte (first form) or of the first
    distance byte (other two forms).

    NOTE(review): the layout looks like an LZO1X-style stream - confirm
    against the producer of this data.

    Args:
        data: Header plus encoded payload.

    Returns:
        The decoded bytes.  The object returned is the ``bytearray`` the
        decoder filled in, as before.

    Raises:
        AssertionError: If the input is too short, a header length does not
            match, or a symbol reaches outside the encoded or decoded buffer.
            Raised explicitly rather than via ``assert`` so that validation
            stays active under ``python -O``.
        IndexError: If the encoded data ends in the middle of a symbol, or a
            reference copy runs past the end of the decoded buffer.
    """
    if len(data) < MINIMAL_ENCODED_LENGTH:
        raise AssertionError(
            f'Encoded data too short ({len(data)}, expected {MINIMAL_ENCODED_LENGTH} bytes)')

    header = data[:HEADER_LENGTH]
    data = data[HEADER_LENGTH:]
    decoded_length = read_int(header)
    encoded_length = read_int(header[4:])
    if len(data) != encoded_length:
        raise AssertionError(
            f'Length of encoded data {len(data)} does not match length from the header {encoded_length}')

    decoded_data = bytearray(decoded_length)
    enc_ptr = 0
    dec_ptr = 0
    literal_encountered = False

    while True:
        if enc_ptr >= len(data):
            raise AssertionError(
                f'Out of bounds access to encoded data at index {enc_ptr} (data length: {len(data)})')
        if data[enc_ptr:enc_ptr + 3] == ETX_MARKER:
            break
        if dec_ptr >= len(decoded_data):
            raise AssertionError(
                f'Out of bounds access to decoded data at index {dec_ptr} (data length: {len(decoded_data)})')

        symbol = data[enc_ptr]
        nibble = symbol >> 4

        if nibble == 0b0000:
            # Literal run; the zero-run scan starts at the symbol byte itself.
            zero_bytes_count, length_byte, enc_ptr = _read_zero_run(data, enc_ptr)
            if zero_bytes_count == 0:
                literal_length = length_byte + 3
            else:
                literal_length = length_byte + 18 + (zero_bytes_count - 1) * 255
            enc_ptr, dec_ptr = _copy_literal(data, enc_ptr, decoded_data, dec_ptr, literal_length)
            literal_encountered = True
            continue

        if not literal_encountered:
            # Initial literal: the very first symbol carries its length directly.
            enc_ptr += 1
            if symbol < 18:
                raise AssertionError(
                    f'Unexpected length of initial literal: {symbol} (should be >= 18)')
            enc_ptr, dec_ptr = _copy_literal(data, enc_ptr, decoded_data, dec_ptr, symbol - 17)
            literal_encountered = True
            continue

        # Reference symbols.  A literal has always been seen by this point,
        # so no "literal before reference" check is needed.
        enc_ptr += 1
        if nibble >= 0b0100:
            length = ((symbol >> 5) & 0b111) + 1
            distance = (((symbol >> 2) & 0b111) | (data[enc_ptr] << 3)) + 1
            following_literal_length = symbol & 0b0011
            enc_ptr += 1
        else:
            if nibble >= 0b0010:
                length = symbol & 0b00011111
                extended_base = 31
                distance = 0
                distance_bias = 1
            else:  # nibble == 0b0001
                length = symbol & 0b111
                extended_base = 7
                # Bit 3 of the symbol becomes bit 14 of the distance.
                distance = ((symbol & 0b1000) >> 3) << 14
                distance_bias = 16384
            if length == 0:
                zero_bytes_count, length_byte, enc_ptr = _read_zero_run(data, enc_ptr)
                length = extended_base + length_byte + zero_bytes_count * 255
            length += 2
            buffer = data[enc_ptr]
            distance |= (buffer >> 2) | (data[enc_ptr + 1] << 6)
            distance += distance_bias
            following_literal_length = buffer & 0b0011
            enc_ptr += 2

        dec_ptr = _copy_match(decoded_data, dec_ptr, distance, length)
        enc_ptr, dec_ptr = _copy_literal(data, enc_ptr, decoded_data, dec_ptr, following_literal_length)

    if dec_ptr != decoded_length:
        raise AssertionError(
            f'Length of decoded data {dec_ptr} does not match length from the header {decoded_length}')
    return decoded_data