Skip to content

Instantly share code, notes, and snippets.

@csm10495
Created September 28, 2020 00:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save csm10495/b188b266a49610d6d4e3fd690663b183 to your computer and use it in GitHub Desktop.
Save csm10495/b188b266a49610d6d4e3fd690663b183 to your computer and use it in GitHub Desktop.
simple_compression.py
'''
Simple compression example
(C) - Charles Machalow - MIT License
Attempts to compress using a 'bit-compression' algorithm. Basically it goes
byte by byte trying to shrink each byte to the minimum number of bits needed to
represent each byte. If all 8 bits are needed, it will likely yield a larger than
original file. Often all 8 aren't needed if the file is something like ascii.
Some optimizations that would likely speed this up greatly:
- Stop going through strings to get binary representations (use math instead)
- Don't modify bit_strings, instead iterate along them
- Create a translation_table attribute that gives the table from byte to bit-length-correct bit string
(this is instead of computing it over and over at runtime, which is what it does now)
- Can split input data into multiple sections, then use threads/processes to compress instead of having
one thread go through the whole data.
'''
def bytes_to_number(data):
    '''
    Interpret data as an unsigned, little-endian integer and return it.
    '''
    as_unsigned_int = int.from_bytes(data, byteorder='little', signed=False)
    return as_unsigned_int
def bits_to_bytes(bit_string, bits_in_byte=8):
    '''
    Pack a string of '0'/'1' characters into a bytes object.

    An optional leading '0b' prefix is dropped. The string is then left-padded
    with '0' until its length is a multiple of bits_in_byte, and every group of
    bits_in_byte characters becomes one output byte.

    Raises ValueError if a group is not valid binary or does not fit in a
    single byte (possible when bits_in_byte is greater than 8).
    '''
    if bit_string.startswith('0b'):
        bit_string = bit_string[2:]

    # Pad once, up front, instead of prepending one '0' at a time.
    padding = -len(bit_string) % bits_in_byte
    bit_string = '0' * padding + bit_string

    # Walk the string by offset so it is never re-sliced or re-copied.
    return bytes(
        int(bit_string[start:start + bits_in_byte], 2)
        for start in range(0, len(bit_string), bits_in_byte)
    )
def bytes_to_bits(the_bytes, bits_in_byte=8):
    '''
    Expand each byte of the_bytes into a string of '0'/'1' characters.

    Every byte is converted on its own: its binary digits are left-padded with
    '0' until their count is a multiple of bits_in_byte, and the per-byte
    strings are concatenated in order. With the default of 8 this yields
    exactly eight characters per byte. With a smaller value a byte may expand
    to more than bits_in_byte characters (e.g. 0xFF with 3 gives 9 characters).
    '''
    per_byte_bits = []
    for value in the_bytes:
        digits = bin(value)[2:]
        # Compute the padding directly rather than prepending '0' in a loop.
        per_byte_bits.append('0' * (-len(digits) % bits_in_byte) + digits)
    # One join instead of repeated string concatenation.
    return ''.join(per_byte_bits)
def number_to_bytes(num, num_bytes=None):
    '''
    Serialize num as an unsigned, little-endian bytes object.

    When num_bytes is not given, the smallest whole number of bytes that can
    hold num's bits is used (which is zero bytes for the number 0).
    '''
    if num_bytes is not None:
        return num.to_bytes(num_bytes, 'little', signed=False)

    # Round the bit count up to a whole number of bytes.
    whole_bytes, leftover_bits = divmod(num.bit_length(), 8)
    if leftover_bits:
        whole_bytes += 1
    return num.to_bytes(whole_bytes, 'little', signed=False)
def inverted_dict(d):
    '''
    Return a new dict mapping each value of d back to its key.

    If several keys share a value, the key seen last in iteration order wins.
    '''
    flipped = {}
    for key, value in d.items():
        flipped[value] = key
    return flipped
def coerce_to_bytes(d):
    '''
    Return d encoded as UTF-8 if it is a str; otherwise return d unchanged.
    '''
    return d.encode('utf-8') if isinstance(d, str) else d
def coerce_to_byte(d):
    '''
    Convert a str or int into a bytes object; pass anything else through.

    - str: the UTF-8 encoding of its first character
    - int: a one-byte bytes object holding that value
    - any other type: returned unchanged
    '''
    if isinstance(d, str):
        first_char = d[0]
        return first_char.encode('utf-8')
    if isinstance(d, int):
        return bytes((d,))
    return d
class TranslationTable:
    '''
    Mapping from each original byte to the one-byte code that replaces it.

    A table is built either from the data that is about to be compressed or
    from a previously serialized table (with or without its header). Codes
    are handed out sequentially starting at 1, in order of first appearance.

    Serialized layout with header:
        Byte 0-3:        COMP
        Byte 4:          version = 1
        Byte 5-6:        length of table (little-endian)
        Byte 7:7+length: table, as (original byte, code byte) pairs
    '''
    VERSION = b'1'
    EYE_CATCHER = b'COMP'
    HEADER_LENGTH_SIZE_IN_BYTES = 2

    def __init__(self, translation_data=None, data_to_compress=None):
        '''
        Build the table from exactly one of the two sources.

        translation_data: serialized table bytes, optionally header-prefixed.
        data_to_compress: the raw data (bytes or str) to derive a table from.

        An empty source (or no source at all) yields an empty table.
        Raises ValueError if both sources are given.
        '''
        self.HEADER_LENGTH_IN_BYTES = len(self.VERSION) + len(self.EYE_CATCHER) + self.HEADER_LENGTH_SIZE_IN_BYTES
        if translation_data is not None and data_to_compress is not None:
            raise ValueError("Only one of translation_data/data_to_compress should be given.")

        # Always defined, so empty input (b'' or '') yields an empty table
        # instead of leaving the attribute unset.
        self._translations = {}
        if translation_data is not None:
            self._init_from_translation_data(translation_data)
        elif data_to_compress is not None:
            self._init_from_data_to_compress(data_to_compress)

    def __getitem__(self, key):
        '''Return the code (a bytes object) assigned to the original byte key.'''
        return self._translations[key]

    def __eq__(self, other):
        '''Two tables are equal when they hold the same byte-to-code mapping.'''
        if not isinstance(other, TranslationTable):
            return NotImplemented
        return self._translations == other._translations

    def _init_from_translation_data(self, translation_data):
        '''
        Load the mapping from serialized table bytes.

        If the data starts with the eye catcher, the header is parsed and only
        the table section is read; anything after it (the compressed payload)
        is ignored. Otherwise the whole input is treated as a raw table.

        Raises ValueError if the table section has an odd length.
        '''
        self._translations = {}
        if translation_data.startswith(self.EYE_CATCHER):
            # Derive the offsets from the header constants rather than
            # hard-coding them.
            length_start = len(self.EYE_CATCHER) + len(self.VERSION)
            table_start = length_start + self.HEADER_LENGTH_SIZE_IN_BYTES
            table_length = bytes_to_number(translation_data[length_start:table_start])
            # Skip the header and drop whatever follows the table.
            translation_data = translation_data[table_start:table_start + table_length]

        # Explicit check (not an assert) so it still runs under python -O.
        if len(translation_data) % 2 != 0:
            raise ValueError('translation_data should have an even length')

        for idx in range(0, len(translation_data), 2):
            actual_byte = coerce_to_byte(translation_data[idx])
            compressed_byte = coerce_to_byte(translation_data[idx + 1])
            self._translations[actual_byte] = compressed_byte

    def _init_from_data_to_compress(self, data_to_compress):
        '''
        Assign a code to every distinct byte in data_to_compress.

        Codes start at 1 and grow by one per newly seen byte.
        Raises ValueError if the data uses all 256 byte values, because no
        code that fits in one byte would be left for the last value.
        '''
        data_to_compress = coerce_to_bytes(data_to_compress)
        self._translations = {}
        for value in data_to_compress:
            actual_byte = coerce_to_byte(value)
            if actual_byte in self._translations:
                continue
            next_code = len(self._translations) + 1
            if next_code > 0xFF:
                raise ValueError(
                    'data_to_compress contains all 256 byte values; '
                    'no single-byte code is left to assign'
                )
            self._translations[actual_byte] = coerce_to_byte(next_code)

    def is_compressible(self):
        '''Return True when the table holds fewer than 255 entries.'''
        return len(self._translations) < 0xFF

    def get_item_length_in_bits(self):
        '''
        Return the number of bits needed to represent the largest code.

        An empty table needs 0 bits.
        '''
        largest_code = max((code[0] for code in self._translations.values()), default=0)
        return largest_code.bit_length()

    def get_translation_table_raw(self):
        '''Return the table serialized as (original byte, code byte) pairs.'''
        return b''.join(
            uncompressed + compressed
            for uncompressed, compressed in self._translations.items()
        )

    def get_translation_table_with_header(self):
        '''Return the serialized table prefixed by the eye catcher, version and length.'''
        raw = self.get_translation_table_raw()
        # Byte 0-3: COMP
        # Byte 4: version = 1
        # Byte 5-6: length of table
        # Byte 7:7+length: table
        ret_bytes = self.EYE_CATCHER + self.VERSION + number_to_bytes(len(raw), self.HEADER_LENGTH_SIZE_IN_BYTES) + raw
        return ret_bytes

    def get_inverted_translation_dict(self):
        '''Return a dict mapping each code back to its original byte.'''
        return inverted_dict(self._translations)
# Self-check that runs at import time: a table built from sample data must
# equal the tables rebuilt from its raw form and from its header-prefixed form.
test_data = b'abc'
t = TranslationTable(data_to_compress=test_data)
# Round trip through the raw (header-less) serialization.
t2 = TranslationTable(translation_data=t.get_translation_table_raw())
assert t2 == t
# Round trip through the serialization that includes the header.
t3 = TranslationTable(translation_data=t.get_translation_table_with_header())
assert t3 == t
class Compressor:
    '''
    Compress data by replacing every byte with a fixed-width bit code.

    The width is the number of bits needed for the largest code in the
    TranslationTable derived from the data.
    '''

    def __init__(self, raw_data):
        '''Store raw_data, encoding it as UTF-8 first if it is a str.'''
        self.raw_data = coerce_to_bytes(raw_data)

    def compress(self, add_table_header=True):
        '''
        Return the compressed form of the stored data as bytes.

        The fixed-width codes are concatenated and left-padded with zero bits
        to a whole number of bytes. When add_table_header is true, the
        serialized translation table (with header) is placed in front of the
        packed bits.
        '''
        translation_table = TranslationTable(data_to_compress=self.raw_data)
        item_bit_length = translation_table.get_item_length_in_bits()

        # Build each symbol's padded bit string once, keyed by the integer
        # byte value, instead of recomputing it for every input byte.
        bits_for_value = {
            actual_byte[0]: bin(code[0])[2:].rjust(item_bit_length, '0')
            for code, actual_byte in translation_table.get_inverted_translation_dict().items()
        }

        # One join instead of growing a string with repeated +=.
        bit_string = ''.join(bits_for_value[value] for value in self.raw_data)

        ret_bytes = bits_to_bytes(bit_string, 8)
        if add_table_header:
            ret_bytes = translation_table.get_translation_table_with_header() + ret_bytes
        return ret_bytes
# Self-check that runs at import time: the table parsed from the front of a
# compressed stream must equal the table built directly from the same data.
t4 = TranslationTable(translation_data=Compressor(test_data).compress())
assert t4 == t
class Uncompressor:
    '''
    Reverse Compressor.compress() for data that carries its table header.
    '''

    def __init__(self, compressed_data):
        '''Store compressed_data, encoding it as UTF-8 first if it is a str.'''
        self.compressed_data = coerce_to_bytes(compressed_data)

    def uncompress(self):
        '''
        Return the original bytes recovered from the stored compressed data.

        The translation table is read from the front of the data and the rest
        is decoded as a stream of fixed-width codes.

        Raises KeyError if the payload contains a code that is not in the table.
        '''
        translation_table = TranslationTable(translation_data=self.compressed_data)
        reverse_translation_dict = translation_table.get_inverted_translation_dict()
        if not reverse_translation_dict:
            # An empty table means there are no symbols to decode.
            return b''

        header_length = len(translation_table.get_translation_table_with_header())
        actual_compressed_data = self.compressed_data[header_length:]
        item_bit_length = translation_table.get_item_length_in_bits()

        # The compressor packs the code stream into 8-bit bytes, so codes can
        # straddle byte boundaries. Expand every byte to exactly 8 bits and
        # treat the result as one continuous bit stream.
        in_bits = bytes_to_bits(actual_compressed_data, 8)

        # The compressor left-pads the stream with zero bits to fill the first
        # byte. Table codes built by the compressor start at 1, so the first
        # real code contains a 1 bit: strip all leading zeros, then restore
        # just enough of them to realign on a code boundary.
        in_bits = in_bits.lstrip('0')
        in_bits = '0' * (-len(in_bits) % item_bit_length) + in_bits

        # Map every fixed-width code back to the byte it stands for.
        return b''.join(
            reverse_translation_dict[coerce_to_byte(int(in_bits[offset:offset + item_bit_length], 2))]
            for offset in range(0, len(in_bits), item_bit_length)
        )
# Self-check that runs at import time: compressing the sample data and then
# uncompressing it must give back the original bytes.
compressed = Compressor(test_data).compress()
uncompressed = Uncompressor(compressed).uncompress()
assert uncompressed == test_data
def compression_ratio(raw_data):
    '''
    Return compressed size divided by original size, both measured in bytes.

    A value below 1 means the compressed output (table header included) is
    smaller than the input. A str is measured by its UTF-8 encoding, the same
    form the compressor operates on, rather than by its character count.

    Raises ValueError if raw_data is empty, since the ratio is undefined.
    '''
    raw_bytes = coerce_to_bytes(raw_data)
    if not raw_bytes:
        raise ValueError('raw_data must not be empty')
    compressed = Compressor(raw_bytes).compress()
    return len(compressed) / len(raw_bytes)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment