Created
July 28, 2021 18:03
-
-
Save awesie/be58bb56593501b95f9a38ad3c507821 to your computer and use it in GitHub Desktop.
NXZ with compression (not entirely sure it works)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Example decompression script for the NXZ file format | |
# | |
# Author: | |
# Andrew Wesie <awesie@gmail.com> | |
# | |
# Algorithm details: | |
# Huffman encoding + LZ77 (64K window) | |
# 274 symbols | |
# 0x00 - 0xFF (literal bytes) | |
# 0x100 - 0x107 (LZ77 with match length 4 - 11) | |
# 0x108 - 0x10F (LZ77 with encoded size) | |
# 0x110 (rebuild Huffman codes) | |
# 0x111 (invalid) | |
# | |
# Implementation details: | |
# This implementation is written to be correct and easy to understand, not | |
# fast. | |
# | |
# The bit reading implementation is slow. It would be better to use a library | |
# like `bitarray' if you are using Python. | |
# | |
import math | |
import struct | |
# Match-length bounds used by the compressor's match search.
MIN_MATCH = 4
MAX_MATCH = 520

# Size of the symbol alphabet and of the LZ77 sliding window (in bytes).
NUM_SYMBOLS = 0x112
WINDOW_SIZE = 0x10000


def _offset_table(widths, base=0):
    """Build a list of (bits, offset) pairs with contiguous ranges.

    Entry k covers the values [offset_k, offset_k + 2**bits_k); each offset
    is the previous offset plus the size of the previous range, starting at
    ``base``.
    """
    table = []
    offset = base
    for width in widths:
        table.append((width, offset))
        offset += 1 << width
    return table


# Literals that are placed directly after the LZ77 symbols in the initial
# ordering; every other literal follows in ascending order.
_FRONT_LITERALS = (0x00, 0x20, 0x30, 0xFF)

# Initial symbol ordering: the sixteen LZ77 symbols, the four front literals,
# the remaining literal bytes in ascending order, then 0x110 and 0x111.
INITIAL_ALPHABET = (
    list(range(0x100, 0x110))
    + list(_FRONT_LITERALS)
    + [_byte for _byte in range(0x01, 0xFF) if _byte not in _FRONT_LITERALS]
    + [0x110, 0x111]
)

# (bits, offset) pairs: alphabet index = offset + a `bits`-wide value.
INITIAL_HUFFMAN_TABLE = _offset_table([2, 3, 3] + [4] * 10 + [5] * 3)

# (bits, offset) pairs for the encoded match lengths (symbols 0x108 - 0x10F).
LENGTH_HUFFMAN_TABLE = _offset_table(range(1, 9), base=0x08)

# (bits, offset) pairs for the upper part of the match distance.
DISTANCE_HUFFMAN_TABLE = _offset_table([0, 0, 1, 2, 3, 4, 5, 6])
def huff_index(tbl, value):
    """Return the index of the table entry whose range contains ``value``.

    ``tbl`` is a sequence of ``(bits, offset)`` pairs; entry ``k`` covers the
    half-open range ``[offset, offset + 2**bits)``.  The first matching entry
    wins.

    Raises:
        ValueError: if no entry covers ``value``.
    """
    for index, (bits, offset) in enumerate(tbl):
        if offset <= value < offset + (1 << bits):
            return index
    raise ValueError('Could not find %d in table' % value)
class NxzDecompress(object):
    """Decoder for an NXZ stream (adaptive symbol table + LZ77).

    The code runs unchanged on Python 2 and Python 3: input is normalised to
    a ``bytearray`` and the result is returned as ``bytes`` (which is ``str``
    on Python 2).
    """

    def __init__(self):
        self.alphabet = INITIAL_ALPHABET
        self.huffman = INITIAL_HUFFMAN_TABLE
        self.symbol_counts = [0] * NUM_SYMBOLS
        # Bit reader state: current byte (shifted as bits are consumed) and
        # the number of unread bits left in it.
        self.bits = 0
        self.bits_size = 0
        # Circular buffer holding the most recent WINDOW_SIZE output bytes.
        self.history = [0] * WINDOW_SIZE
        self.history_idx = 0

    def read_bit(self):
        """Return the next bit of the input, most significant bit first.

        Raises IndexError when the input is exhausted.
        """
        if self.bits_size == 0:
            self.bits = self.src[self.src_idx]
            self.src_idx += 1
            self.bits_size = 8
        out = (self.bits & 0x80) >> 7
        self.bits <<= 1
        self.bits_size -= 1
        return out

    def read_bits(self, count):
        """Read ``count`` bits and return them as an unsigned integer."""
        out = 0
        for _ in range(count):
            out = (out << 1) | self.read_bit()
        return out

    def rebuild_alphabet(self):
        """Re-order the alphabet by descending count, ties by descending symbol."""
        counts = self.symbol_counts
        self.alphabet = sorted(
            range(NUM_SYMBOLS),
            key=lambda sym: (counts[sym] << 16) + sym,
            reverse=True)

    def _emit(self, dst, byte):
        """Append one decoded byte to the output and to the history window."""
        dst.append(byte)
        self.history[self.history_idx % WINDOW_SIZE] = byte
        self.history_idx += 1

    def _read_huffman_table(self):
        """Parse the 16-entry (bits, offset) table that follows symbol 0x110.

        Each entry is stored as a run of zero bits terminated by a one bit;
        the run length is added to the width of the previous entry.
        """
        table = []
        bits = 0
        offset = 0
        for _ in range(16):
            while self.read_bit() == 0:
                bits += 1
            table.append((bits, offset))
            offset += 1 << bits
        self.huffman = table

    def decompress(self, src, dst_len):
        """Decode ``src`` until ``dst_len`` bytes have been produced.

        Args:
            src: the compressed payload (bytes-like).
            dst_len: number of bytes to produce.

        Returns:
            The decoded data as ``bytes``.

        Raises:
            Exception: on symbol 0x111 or above, or when a match would
                write past ``dst_len``.
            IndexError: when the input ends before ``dst_len`` bytes were
                produced.
        """
        self.src = bytearray(src)
        self.src_idx = 0
        # A bytearray keeps appends O(1); repeated string concatenation is
        # quadratic in the worst case.
        dst = bytearray()
        while len(dst) < dst_len:
            code = self.read_bits(4)
            bits, offset = self.huffman[code]
            symbol = self.alphabet[self.read_bits(bits) + offset]
            self.symbol_counts[symbol] += 1

            if symbol < 0x100:
                # literal byte
                self._emit(dst, symbol)
            elif symbol == 0x110:
                # rebuild the symbol ordering, age the counts, then read the
                # new table
                self.rebuild_alphabet()
                self.symbol_counts = [c // 2 for c in self.symbol_counts]
                self._read_huffman_table()
            elif symbol >= 0x111:
                raise Exception('Invalid symbol')
            else:
                # LZ77 match: symbols 0x100-0x107 carry the length directly,
                # 0x108-0x10F select a range and extra bits follow.
                length = 4
                if symbol < 0x108:
                    length += symbol - 0x100
                else:
                    bits, offset = LENGTH_HUFFMAN_TABLE[symbol - 0x108]
                    length += offset + self.read_bits(bits)
                code = self.read_bits(3)
                bits, offset = DISTANCE_HUFFMAN_TABLE[code]
                distance = (offset << 9) + self.read_bits(bits + 9)
                if len(dst) + length > dst_len:
                    raise Exception('Invalid LZ77 length')
                # Copy byte by byte: the source range may overlap the bytes
                # being written.
                start = self.history_idx - distance
                for step in range(length):
                    self._emit(dst, self.history[(start + step) % WINDOW_SIZE])
        return bytes(dst)
class NxzCompress(object):
    """Encoder producing an NXZ stream (adaptive symbol table + LZ77).

    The code runs unchanged on Python 2 and Python 3: input is normalised to
    a ``bytearray`` and output bytes are produced with ``struct.pack``.
    """

    def __init__(self):
        self.alphabet = INITIAL_ALPHABET
        self.huffman = INITIAL_HUFFMAN_TABLE
        self.symbol_counts = [0] * NUM_SYMBOLS
        # Bit writer state: bits collected so far and how many of them.
        self.bits = 0
        self.bits_size = 0
        # Output file object; set by compress().
        self.file = None
        # Circular buffer with the most recent WINDOW_SIZE input bytes.
        self.history = [0] * WINDOW_SIZE
        self.history_idx = 0
        # Maps each 4-byte sequence seen so far to the absolute positions
        # where it starts, oldest first.
        self.history_map = {}

    def write_bit(self, bit):
        """Queue one bit; a full byte is written to the output file."""
        self.bits = (self.bits << 1) | (1 if bit else 0)
        self.bits_size += 1
        if self.bits_size == 8:
            self.file.write(struct.pack('B', self.bits))
            self.bits = 0
            self.bits_size = 0

    def write_bits(self, value, bits):
        """Write the low ``bits`` bits of ``value``, most significant first."""
        for shift in range(bits - 1, -1, -1):
            self.write_bit((value >> shift) & 1)

    def flush(self):
        """Pad the pending partial byte with zero bits and write it out."""
        while self.bits_size != 0:
            self.write_bit(0)

    def rebuild_alphabet(self):
        """Re-order the alphabet by descending count, ties by descending symbol."""
        counts = self.symbol_counts
        self.alphabet = sorted(
            range(NUM_SYMBOLS),
            key=lambda sym: (counts[sym] << 16) + sym,
            reverse=True)

    def rebuild_table(self):
        """Recompute ``self.huffman`` from the current counts and alphabet.

        The alphabet is split into 16 groups of roughly equal probability,
        each holding a power-of-two number of alphabet slots.  The result is
        a list of 16 ``(bits, offset)`` pairs with non-decreasing ``bits``
        and contiguous offsets covering at least every alphabet index.
        """
        # The alphabet always lists every symbol once, so its length is the
        # number of slots that must be covered.
        num_symbols = len(self.alphabet)
        # every symbol must have a non-zero probability
        weights = [max(self.symbol_counts[sym], 1) for sym in self.alphabet]
        total_prob = sum(self.symbol_counts)

        tbl = []
        x = 0
        for i in range(15):
            threshold = total_prob // (16 - i)
            p = 0
            n = 0
            while p < threshold and x + n < num_symbols:
                p += weights[x + n]
                n += 1
            if n > 1:
                n -= 1
            # A group needs at least one slot.  With heavily skewed counts
            # the threshold drops to zero (or below) and the loop above
            # selects nothing, which previously crashed in math.log(0).
            n = max(n, 1)
            # ceil(log2(n)) computed on integers, avoiding float rounding.
            bits = (n - 1).bit_length()
            n = 1 << bits
            tbl.append(bits)
            total_prob -= sum(weights[x:x + n])
            x += n

        # The last group takes whatever is left (one slot if nothing is).
        remaining = num_symbols - x
        bits = (remaining - 1).bit_length() if remaining > 0 else 0
        tbl.append(bits)
        x += 1 << bits

        # If the table over-covers the alphabet, halve the first group whose
        # halving still leaves every slot covered.  Nothing to do when the
        # coverage is exact (previously math.log(0) raised here).
        extra = x - num_symbols
        if extra > 0:
            shrunk = extra.bit_length() - 1  # floor(log2(extra))
            for i in range(16):
                if tbl[i] == shrunk + 1:
                    tbl[i] = shrunk
                    break

        # ensure that bits are monotonic
        tbl.sort()
        self.huffman = []
        offset = 0
        for bits in tbl:
            self.huffman.append((bits, offset))
            offset += 1 << bits

    def find_match(self, s, start=0):
        """Find the longest earlier occurrence of the data at ``s[start:]``.

        Args:
            s: bytes-like lookahead data; ``s[start]`` is treated as the byte
                at position ``self.history_idx``.
            start: offset into ``s`` where the lookahead begins.

        Returns:
            ``(distance, length)`` of the best candidate, or ``(0, 0)`` when
            fewer than four bytes are available or nothing matches.  Among
            candidates of equal length the most recent one wins.
        """
        if not isinstance(s, bytearray):
            s = bytearray(s)
        avail = len(s) - start
        if avail < 4 or self.history_idx < 4:
            return (0, 0)

        distance = 0
        length = 0
        limit = min(MAX_MATCH, avail)
        oldest = self.history_idx - WINDOW_SIZE
        key = tuple(s[start:start + 4])
        # Positions are stored oldest first; walk them newest first.
        for idx in reversed(self.history_map.get(key, ())):
            if idx <= oldest:
                # this and all older candidates have left the window
                break
            n = 0
            while n < limit:
                pos = idx + n
                if pos >= self.history_idx:
                    # the match runs past the history into the lookahead
                    x = s[start + pos - self.history_idx]
                else:
                    x = self.history[pos % WINDOW_SIZE]
                if x != s[start + n]:
                    break
                n += 1
            if n > length:
                length = n
                distance = self.history_idx - idx
        return (distance, length)

    def append_history(self, s):
        """Append the bytes of ``s`` to the window and index each new 4-gram."""
        for c in bytearray(s):
            self.history[self.history_idx % WINDOW_SIZE] = c
            self.history_idx += 1
            if self.history_idx >= 4:
                key = tuple(
                    self.history[(self.history_idx - back) % WINDOW_SIZE]
                    for back in (4, 3, 2, 1))
                # O(1) append; find_match iterates in reverse to see the
                # newest position first.
                self.history_map.setdefault(key, []).append(
                    self.history_idx - 4)

    def output_table(self):
        """Write ``self.huffman`` as 16 unary-coded width increments.

        Each entry is written as (width - previous width) zero bits followed
        by a one bit.
        """
        base = 0
        for bits, offset in self.huffman:
            bits -= base
            assert bits >= 0
            self.write_bits(1, bits + 1)
            base += bits

    def output_symbol(self, sym):
        """Write ``sym`` as a 4-bit group number plus its index in the group."""
        sym = self.alphabet.index(sym)
        idx = huff_index(self.huffman, sym)
        bits, offset = self.huffman[idx]
        self.write_bits(idx, 4)
        self.write_bits(sym - offset, bits)

    def _write_distance(self, distance):
        """Write a match distance: 3-bit range selector plus the remainder."""
        idx = huff_index(DISTANCE_HUFFMAN_TABLE, distance >> 9)
        bits, offset = DISTANCE_HUFFMAN_TABLE[idx]
        self.write_bits(idx, 3)
        self.write_bits(distance - (offset << 9), bits + 9)

    def compress(self, src, dst):
        """Compress ``src`` and write the bit stream to the file ``dst``.

        The final partial byte stays buffered; call ``flush()`` afterwards.

        Args:
            src: the data to compress (bytes-like).
            dst: a binary file object opened for writing.
        """
        self.file = dst
        data = bytearray(src)
        size = len(data)
        i = 0
        cnt = 0
        while i < size:
            if cnt >= 4096:
                cnt = 0
                # rebuild huffman table
                self.output_symbol(0x110)
                self.symbol_counts[0x110] += 1
                self.rebuild_alphabet()
                self.rebuild_table()
                self.symbol_counts = [c // 2 for c in self.symbol_counts]
                self.output_table()

            distance, length = self.find_match(data, i)
            if i + 1 < size:
                # If starting one byte later gives a longer match, emit a
                # literal now.
                _, next_length = self.find_match(data, i + 1)
                if next_length > length:
                    length = 0

            if length < 4:
                length = 1
                symbol = data[i]
                self.output_symbol(symbol)
            elif length < 12:
                symbol = 0x100 + length - 4
                self.output_symbol(symbol)
                self._write_distance(distance)
            else:
                idx = huff_index(LENGTH_HUFFMAN_TABLE, length - 4)
                bits, offset = LENGTH_HUFFMAN_TABLE[idx]
                symbol = 0x108 + idx
                self.output_symbol(symbol)
                self.write_bits(length - 4 - offset, bits)
                self._write_distance(distance)

            self.append_history(data[i:i + length])
            i += length
            self.symbol_counts[symbol] += 1
            cnt += 1
if __name__ == '__main__':
    # Round-trip driver: compress argv[1] into argv[2] (prefixed with the
    # original length as a little-endian 32-bit integer), then decompress
    # argv[2] again and write the result to argv[3].
    import sys

    if len(sys.argv) < 4:
        sys.exit('usage: %s INPUT COMPRESSED_OUTPUT ROUNDTRIP_OUTPUT'
                 % sys.argv[0])

    # `with` closes every handle, including the read handles that were
    # previously left open.
    with open(sys.argv[1], 'rb') as f:
        src = f.read()

    with open(sys.argv[2], 'wb') as f:
        f.write(struct.pack('<I', len(src)))
        nxz = NxzCompress()
        nxz.compress(src, f)
        nxz.flush()

    with open(sys.argv[2], 'rb') as f:
        f.read(4)  # skip the length header
        packed = f.read()

    nxz = NxzDecompress()
    dst = nxz.decompress(packed, len(src))

    with open(sys.argv[3], 'wb') as f:
        f.write(dst)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment