# Try to compress it, but you can only store random seeds
# Disclaimer: The exclusive use of random seeds to compress data in a LOSSLESS fashion is a bad idea.
# ( !!! much work for few gains !!! )
# With that out of the way, let's do it anyway...
#
# This "programming joke exercise" took me more time than expected,
# but it was at least somewhat interesting despite the expected outcome.
# The solution is not yet 100% complete and there are performance improvements not yet implemented,
# but I doubt I will have the time and motivation to improve it.
import enum
import math
import struct
import sys
from collections import Counter
import random
from multiprocessing import Pool
import numpy as np
import multiprocessing as mp  # the original imported mp via joblib's private _multiprocessing_helpers
import ckmeans as ckm
class RandomBasedXCompressor:
DEFAULT_BUFFER_SIZE = 8192
"The used buffer size when reading file to get symbol counts or decompressing."
SEED_PROC_STATE = enum.Enum('SEED_PROC_STATE', ['header', 'tail', 'output'])
"""
header: process seed metadata
tail: process seed data.
output: write byte to file.
"""
# TODO (solution completeness and efficacy)
# when all 256 symbols are used don't make a list, just use the freqs
# ( current solution will halt if storing seeds is worse than symbols and no alternative is implemented )
# TODO (simplify solution and slight temporal performance improvement)
# use a byte with 4 items of metadata followed by their seed chunk bytes.
# easier and faster to parse than current alternating pattern, as there is no need
# to take into account the offset of the items within the read bytes
# TODO (solution completeness)
# consider forcing the seed search with a maximum of 3 seed bytes (max iteration = 16777215) and forcing chunk size 3 or 4
# then, if a seed is not found, the seed metadata stores the number of seed bytes instead of (seed bytes - 1)
# and the value 00 marks that the following 3 or 4 bytes are raw input bytes (no seed used)
# TODO (temporal performance enhancement)
# consider processing multiple data chunks in parallel instead of using multiple threads for a single seed search
# Using a single thread pool, instead of using one per search should speed up the compression
# TODO (simplify usage & solution completeness)
# Guess an acceptable number of splits for frequency chunks given the number of symbols and stop_mse
# ( compute some samples and find a function that somewhat fits the data )
def __init__(self):
self.verbose = False
self.__thread_found_seed: bool = False
"Used to stop parallel threads when a solution is found. " \
"Is used in more granular methods, so if these methods are 'manually' called it may require a 'manual' reset. " \
"Can also be set to True 'manually' to forcebly stop all threads."
self.max_threads: int = mp.cpu_count()
"The maximum number of threads to use on tasks that allow parallel processing. " \
"Set to mp.cpu_count() by default."
self.starting_number_of_chunks_for_symbols: int = 0
"[Deprecated] When generating the symbols' seed list using chunks of equal size, " \
"set the initial number of chunks to be used to speedup symbol generation. " \
"If it is not possible to generate seeds for every chunk, the program will increment the number of chunks. " \
"When using the default value of 0, an initial guess is estimated. " \
"( note: it may be possible to obtain a valid seed list with a number of chunks lower than the initial guess )"
self.smart_split_max_chunk_length = 15
"When generating the symbols' seed list via smart split, " \
"keep splitting the symbol chunks until every chunk's length is equal or below this value. " \
"The bigger the value, the harder is to find a seed. For better context take into account that: " \
"12! < max unsigned integer < 13!, so do not use a max length much higher than 12"
self.stop_mse = 0
"When generating frequencies' seeds, stop the search when mse is lower than the stop_mse. " \
"When using the default value of 0, the search will continue until max iteration is reached."
self.number_of_freq_seed_chunk = 3
"The number of seeds (and complementary metadata) stored when compressing. " \
"The higher the number, the easier it is to get a good approximation of the real freq. distribution."
self.source_data_chunks_length = 3 # TODO consider using 4 instead ???
"Ratio of bytes to seeds. 3 means that for each 3 bytes a seed is generated." \
" Note that, in the current implementation, a seed has 1 to 4 bytes plus 2 bits of metadata."
self.symbols_chunk_seed_max_iterations = 500000000
self.freq_chunk_seed_max_iterations = 20000000
self.data_chunk_seed_max_iterations = 500000000  # use 16777216 (2**24, since range's right side is exclusive) for a 3-byte maximum
def find_best_seed_to_match_frequency(self, data, range_start, range_end, total=0):
"""
:param data: List with the number occurrences.
May be a sublist of the original list, in which case the total parameter should be specified.
:param range_start: start value for the seed search (inclusive)
:param range_end: end value for the seed search (exclusive)
:param total: the total number of occurrences, only needed if data is a subset of the entire data in order to
properly compute the frequencies.
:return: seed, max frequency, mse
"""
if total == 0:
total = sum(data)
frequencies = [x / total for x in data]
max_frequency = max(frequencies)
best_seed = 0
min_mse = float('inf')
for seed in range(range_start, range_end):
if self.__thread_found_seed:
break
random.seed(seed)
generated = [random.random() * max_frequency for _ in range(len(data))]
mse = sum((x - y) ** 2 for x, y in zip(generated, frequencies))
if mse < min_mse:
best_seed = seed
min_mse = mse
if min_mse < self.stop_mse:
self.verbose and print(
f'freq match stopped at mse = {min_mse}, lower than stop value of {self.stop_mse}')
self.__thread_found_seed = True
return best_seed, max_frequency, min_mse
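# A minimal usage sketch (hypothetical values; the exact seed depends on Python's PRNG):
# comp = RandomBasedXCompressor()
# seed, max_freq, mse = comp.find_best_seed_to_match_frequency([10, 20, 30, 40], 0, 100000)
# here max_freq == 0.4 and seed is the value in [0, 100000) whose random.random()
# stream, scaled by max_freq, best matches [0.1, 0.2, 0.3, 0.4] by MSE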
def find_best_seed_to_match_frequency_auto_single(self, data, max_iteration=sys.maxsize, total=None):
"""
Similar to find_best_seed_to_match_frequency, but will schedule the search on multiple threads for a faster search.
:param data: List with the number occurrences.
May be a sublist of the original list, in which case the total parameter should be specified.
:param max_iteration: The number of seeds to check.
:param total: the total number of occurrences, only needed if data is a subset of the entire data in order to
properly compute the frequencies.
:return: seed, max frequency, mse
"""
if total is None:
total = sum(data)
for arg, name in [(max_iteration, "max_iteration"), (total, "total")]:
if not type(arg) == int:
raise TypeError(f"{name} must be an integer.")
if arg < 1:
raise ValueError(f"{name} must be a positive number greater than zero.")
range_size, rest = divmod(max_iteration, self.max_threads)
p = Pool(self.max_threads)
args = [(data, i * range_size, i * range_size + range_size, total) for i in range(0, self.max_threads - 1)]
args.append((data, (self.max_threads - 1) * range_size, max_iteration, total))
self.__thread_found_seed = False # only used if self.stop_mse > 0
with p:
results = p.starmap(self.find_best_seed_to_match_frequency, args)
self.__thread_found_seed = False # reset to avoid potential conflicts with more granular functions
return min(results, key=lambda r: r[2])
def find_best_seeds_to_match_frequency_chunks(self, data, chunks=1, max_iteration=sys.maxsize):
"""
Splits the data list into multiple chunks and finds a seed for each one.
:param data: List with the number occurrences.
May be a sublist of the original list, in which case the total parameter should be specified.
:param chunks: The number of chunks that compose the frequency list. If omitted, no split is done, and only one
seed will be searched to match the entire list
:param max_iteration: The number of seeds to check.
:return: list of ( seed, max frequency, mse ) tuples.
"""
if not type(chunks) == int:
raise TypeError("chunks must be an integer.")
if chunks < 1:
raise ValueError("chunks must be an integer greater than zero.")
if chunks == 1:
return [self.find_best_seed_to_match_frequency_auto_single(data, max_iteration=max_iteration)]
total = sum(data)
range_size, rest = divmod(len(data), chunks)
split_data = [data[x * range_size: (x + 1) * range_size] for x in range(0, chunks - 1)]
split_data.append(data[(chunks - 1) * range_size:])
return [self.find_best_seed_to_match_frequency_auto_single(s, max_iteration, total) for s in split_data]
@staticmethod
def generate_frequencies_from_seed(seed, max_freq, number_of_symbols):
"""
Obtain the approximate frequencies using a given seed and max frequency value
:param seed:
:param max_freq:
:param number_of_symbols:
:return: List of frequencies.
"""
random.seed(seed)
return [random.random() * max_freq for _ in range(number_of_symbols)]
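# Determinism sketch: the same (seed, max_freq) pair always regenerates the same
# approximate frequencies, which is what makes storing only seeds viable, e.g.:
# a = RandomBasedXCompressor.generate_frequencies_from_seed(42, 0.4, 4)
# b = RandomBasedXCompressor.generate_frequencies_from_seed(42, 0.4, 4)
# assert a == b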
def rebuild_frequency_table(self, seed_maxfreq_pairs, number_of_symbols):
"""
Rebuilds the list of approximate frequencies.
:param seed_maxfreq_pairs: list of (seeds, max frequencies) tuples.
:param number_of_symbols: the number of frequencies.
:return: a list with the approximate frequencies.
"""
len_pairs = len(seed_maxfreq_pairs)
range_size, rest = divmod(number_of_symbols, len_pairs)
freqs = []
for seed, max_freq in seed_maxfreq_pairs[:-1]:
freqs += self.generate_frequencies_from_seed(seed, max_freq, range_size)
seed, max_freq = seed_maxfreq_pairs[-1]
freqs += self.generate_frequencies_from_seed(seed, max_freq, range_size + rest)
return freqs
def rebuild_data_chunk(self, seed, cumulative_distribution, symbols):
random.seed(seed)
random_values = [random.random() for i in range(self.source_data_chunks_length)]
chunk = bytearray([next(symbols[x] for x, val in enumerate(cumulative_distribution) if val > r)
for r in random_values])
return chunk
def find_seed_2gen_sequence(self, sequence, range_start, range_end, freqs, symbols):
"""
Searches for a seed that produces the given sequence, exactly as it is (lossless).
:param sequence: target sequence to match.
:param range_start: seeds to test lower bound (inclusive).
:param range_end: seeds to test upper bound (exclusive).
:param freqs: the frequency of the symbols. Its order should match the symbols order.
:param symbols: the symbols used to generate the sequence.
:return:
"""
cfreqs = self.get_cumulative_distribution(freqs)
# sequence = sequence.encode('utf-8')
for seed in range(range_start, range_end):
if self.__thread_found_seed:
return 'Stopped since other thread found a valid seed'
random.seed(seed)
valid = True
for symb in sequence:
rand = random.random()
symbol_index = next(x for x, val in enumerate(cfreqs) if val > rand)
if symb != symbols[symbol_index]: # !!! different types RN, correct this
valid = False
break
if valid:
self.__thread_found_seed = True
return seed
return 'no seed found'
def get_cumulative_distribution(self, freqs):
cfreqs = np.cumsum(freqs)
cfreqs = cfreqs / max(cfreqs) # fix cumsum, last won't be 1 since the freqs is an approximation
return cfreqs
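# Example (assumed freqs): [0.1, 0.2, 0.3] -> cumsum [0.1, 0.3, 0.6]
# -> normalized [0.1667, 0.5, 1.0], so the last entry is exactly 1 even though
# the seed-generated frequencies only approximate the real distribution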
def find_seed_2gen_sequence_p(self, sequence, freqs, symbols, max_iteration=sys.maxsize):
"""
Parallel version of find_seed_2gen_sequence.
:param sequence:
:param freqs:
:param symbols:
:param max_iteration:
:return:
"""
range_size, rest = divmod(max_iteration, self.max_threads)
p = Pool(self.max_threads)
args = [(sequence, i * range_size, i * range_size + range_size, freqs, symbols) for i in
range(0, self.max_threads - 1)]
args.append((sequence, (self.max_threads - 1) * range_size, max_iteration, freqs, symbols))
self.__thread_found_seed = False
with p:
results = p.starmap(self.find_seed_2gen_sequence, args)
self.__thread_found_seed = False # reset to avoid potential conflicts with more granular functions
return next((val for x, val in enumerate(results) if isinstance(val, int)), 'no seed found')
# TODO consider generating the symbol list from a random seed too? unlike the freqs, the order is irrelevant.
# the code should be very similar...
# might help reduce the solution space for the seed, when generating the sequence...
# ... actually, on second thought, this solution requires it...
def find_best_seed_to_match_symbols(self, range_start, range_end, symbols):
# symbols must be sorted for this to work!
# upper bound is used as stop criteria to allow arbitrary length (harder match though)
lower_bound = symbols[0]
upper_bound = symbols[-1]
solution_space = [*range(lower_bound + 1, upper_bound + 1)]
symbols = symbols[1:] # ignore first element
for seed in range(range_start, range_end):
if self.__thread_found_seed:
return 'Stopped since other thread found a valid seed'
random.seed(seed)
# shuffle and pick nth elements until upper bound to avoid duplicates and storing size of symbol
# upper bound is used as stop symbol, in order to allow for arbitrary length chunks
generated = random.sample(solution_space, k=len(symbols))
if generated[len(symbols) - 1] == upper_bound:  # this check is required to allow for arbitrary length chunks
generated.sort()
if generated == symbols:
self.__thread_found_seed = True
return seed, lower_bound, upper_bound
return 'no seed found'
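# Sketch of the scheme (hypothetical chunk): for sorted symbols [65, 66, 70],
# the bounds 65 and 70 are stored verbatim and the search scans seeds until
# random.sample(range(66, 71), k=2) ends exactly on 70 and, once sorted,
# equals [66, 70]; the whole chunk then collapses to the tuple (seed, 65, 70)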
def find_best_seed_to_match_symbols_p(self, symbols, max_iteration=sys.maxsize):
range_size, rest = divmod(max_iteration, self.max_threads)
p = Pool(self.max_threads)
args = [(i * range_size, i * range_size + range_size, symbols) for i in
range(0, self.max_threads - 1)]
args.append(((self.max_threads - 1) * range_size, max_iteration, symbols))
self.__thread_found_seed = False
with p:
results = p.starmap(self.find_best_seed_to_match_symbols, args)
self.__thread_found_seed = False # reset to avoid potential conflicts with more granular functions
return next((val for x, val in enumerate(results) if not isinstance(val, str)), 'no seed found')
def find_best_seed_to_match_symbols_recursive_split_p(self, symbols, max_iteration=sys.maxsize,
start_chunk_count=1):
"""
Split symbols into equal length chunks and find the seeds that generate them.
If a seed is not found for a chunk, the number of chunks is increased by 1.
:param symbols:
:param max_iteration: The maximum number of iterations when searching for a seed.
:param start_chunk_count: The starting number of chunks, useful to speedup the process. If set to zero, an initial guess is computed.
:return: list of seeds.
"""
if not type(start_chunk_count) == int:
raise TypeError("start_chunk_count must be an integer")
if start_chunk_count == 0:
# try to guess a good starting chunk count
clusters = ckm.ckmeans(symbols)
self.verbose and print(f"Auto clustering results: {clusters}")
start_chunk_count = clusters.k
# ret = self.find_best_seed_to_match_symbols_p(symbols, max_iteration)
symbols = np.array(symbols)
ret = []
# note that, storing seeds as integers, 4 bytes are required, plus 2 additional bytes for the min and max,
# so the number of seeds must be less than 1/6th of the length in order to use less space than actually
# storing the entire symbol table...
# I could optimize by adding a 2-bit prefix, thus allowing for byte, short, int and long seeds...
# but that would be taking the joke too far... this is already a "pretty" complete solution...
if start_chunk_count > len(symbols) // (4 + 2) + 1:
self.verbose and print(f'Max chunk size of length {math.ceil(len(symbols) / start_chunk_count)}. There is '
f'no gain in generating symbols from seeds.')
return ret
# print(range(start_chunk_count, len(symbols) // (4 + 2) + 1))
for number_of_chunks in range(start_chunk_count, len(symbols) // (4 + 2) + 1):
# print(number_of_chunks)
symbol_chunks = np.array_split(symbols, number_of_chunks)
found = True  # if one chunk fails, this is set to False and we restart with a higher split count
for chunk in symbol_chunks:
self.verbose and print(f"processing symbol chunk: {chunk}")
single_ret = self.find_best_seed_to_match_symbols_p(list(chunk), max_iteration)
self.verbose and print(f"symbol chunk seed: {single_ret}")
if type(single_ret) == str:
found = False
break
ret.append(single_ret)
if found:
break
ret = []
return ret
def find_best_seed_to_match_symbols_recursive_smart_split_p(self, symbols, max_iteration=sys.maxsize):
"""
Split symbols into variable length chunks and find the seeds that generate them.
If a seed is not found for one of the chunks, the number of chunks is increased by 1.
The initial number of chunks is chosen via initial guesses, until a guess where no chunk has a length bigger
than smart_split_max_chunk_length is made.
If smart_split_max_chunk_length is set to zero then only one initial guess is made.
:param symbols:
:param max_iteration: The maximum number of iterations when searching for a seed.
:return: list of seeds.
"""
clusters = ckm.ckmeans(symbols)
if self.smart_split_max_chunk_length > 0:
for i in range(len(symbols) // self.smart_split_max_chunk_length * 2):
if Counter(clusters.clustering).most_common(1)[0][1] > self.smart_split_max_chunk_length:
clusters = ckm.ckmeans(symbols, (clusters.k + 1, clusters.k + 1))
else:
break
self.verbose and self.smart_split_max_chunk_length <= 0 and print('Ignoring max symbols chunk length for '
'smart split.')
self.verbose and self.smart_split_max_chunk_length > 0 and print(f'Max symbols chunk length for smart split '
f'is set to: {self.smart_split_max_chunk_length}')
starting_number_of_chunks = clusters.k
for ik in range(starting_number_of_chunks, starting_number_of_chunks + 10):
if len(symbols) <= 6 * clusters.k:
return 'len(symbols) <= 6 * clusters.k. There is no gain in generating symbols from seeds.'
self.verbose and print(f'Symbols grouped in {clusters.k} clusters, '
f'with max chunk length of {Counter(clusters.clustering).most_common(1)[0][1]}')
symbol_chunks = []
for i in range(clusters.k):
symbol_chunks.append([])
for i in range(len(symbols)):
symbol_chunks[clusters.clustering[i]].append(symbols[i])
ret = []
found = True
for chunk in symbol_chunks:
single_ret = self.find_best_seed_to_match_symbols_p(list(chunk), max_iteration)
if type(single_ret) == str:
found = False
break
ret.append(single_ret)
if not found:
clusters = ckm.ckmeans(symbols, (clusters.k + 1, clusters.k + 1))
self.verbose and print(f'Failed to generate seeds for {clusters.k} clusters')
continue
return ret
return f'Failed to find a list of seeds via smart split for k in [{starting_number_of_chunks},{starting_number_of_chunks + 10}]'
@staticmethod
def rebuild_symbol_table(seeds):
# the stored lower bound is prepended directly; the upper bound marks where the generated sample stops
symbol_table = []
for seed, lower_bound, upper_bound in seeds:
symbol_table.append(lower_bound)
solution_space = [*range(lower_bound + 1, upper_bound + 1)]
random.seed(seed)
generated = random.sample(solution_space, k=len(solution_space))
stop_index = generated.index(upper_bound)
generated = generated[0: stop_index + 1]
generated.sort()
symbol_table.extend(generated)
return symbol_table
@staticmethod
def byte_length(decimal):
number_of_bytes = 0
while decimal != 0:
number_of_bytes += 1
decimal = decimal >> 8
return number_of_bytes
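# e.g. byte_length(0) == 0, byte_length(255) == 1,
# byte_length(2330) == 2 and byte_length(2 ** 24) == 4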
@staticmethod
def fill_with_byte(byte, byte_bit_index, target, target_bit_index):
#print(f"B> {'{:08b}'.format(byte)} {'{:08b}'.format(target)} {byte_bit_index} {target_bit_index}")
byte |= ((target << target_bit_index) & 0x00FF) >> byte_bit_index
#print(f"A> {'{:08b}'.format(byte)} {'{:08b}'.format(target)} {byte_bit_index} {target_bit_index}")
new_byte_bit_index = min(byte_bit_index + 8 - target_bit_index, 8) % 8
new_target_bit_index = min(target_bit_index + 8 - byte_bit_index, 8) % 8
return byte, new_byte_bit_index, new_target_bit_index
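# Worked example (assumed inputs): fill_with_byte(0b00000000, 2, 0b10110100, 0)
# copies the 6 high bits of the target into the free bits of byte, returning
# (0b00101101, 0, 6): the output byte is full (index wrapped to 0) and 6 of the
# target's 8 bits were consumed, so a follow-up call with target_bit_index=6
# carries the remaining 2 bits into the next output byte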
@staticmethod
def fill_with_bit_pair(byte, byte_bit_index, bit1, bit2):
# important: this only works because it is always possible to fill with both bits;
# if the index could be odd this wouldn't work, it is not a generalized solution
#print(f"bef. b1 {bit1} b2 {bit2} >> {'{:08b}'.format(byte)} ... {byte_bit_index}")
byte |= ((bit1 * 2 + bit2) << 6) >> byte_bit_index
#print(f"aft. b1 {bit1} b2 {bit2} >> {'{:08b}'.format(byte)}")
return byte, (byte_bit_index + 2) % 8
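# Worked example (assumed inputs): fill_with_bit_pair(0b00000000, 0, 0, 1)
# packs the pair '01' into the two most significant bits, returning (0b01000000, 2);
# a second call fill_with_bit_pair(0b01000000, 2, 1, 1) yields (0b01110000, 4)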
@staticmethod
def write_byte(byte_2_write, output_file):
#print("{:08b}".format(byte_2_write))
output_file.write(byte_2_write.to_bytes(length=1, byteorder='big'))
byte_2_write = 0
return byte_2_write
@staticmethod
def write_symbol_frequency_metadata(output_file, seed_maxfreq_pairs):
output_file.write(len(seed_maxfreq_pairs).to_bytes(length=1, byteorder='big', signed=False))
for seed, max_freq in seed_maxfreq_pairs:
output_file.write(struct.pack('>I', seed))
output_file.write(struct.pack('>f', max_freq))
# output_file.write(max_freq.to_bytes(length=4, byteorder='big'))
@staticmethod
def write_symbol_seeds(output_file, symbol_seeds):
output_file.write(len(symbol_seeds).to_bytes(length=1, byteorder='big', signed=False))
for seed in symbol_seeds:
output_file.write(seed[0].to_bytes(length=4, byteorder='big', signed=False))
output_file.write(seed[1].to_bytes(length=1, byteorder='big', signed=False))
output_file.write(seed[2].to_bytes(length=1, byteorder='big', signed=False))
@staticmethod
def display_work_done(step, total_steps):
print("\r", end="")
if not total_steps:
print(f"{step}/??? = ??%", end="")
else:
print(f"{step}/{total_steps}", " = {:.1%}".format(step / total_steps), end="")
def compress(self, input_filepath, output_filepath):
counts, symbols = self.get_symbol_count(input_filepath)
# TODO
# first check if the range is complete or a defined interval
# if not, try to find a seed to generate the symbols
# (also try via the complement if there are more used symbols than unused)
# TODO
# consider storing the list of symbols in some specific cases
# *The current solution only finds seeds to generate the list of symbols
symbol_seeds = self.find_best_seed_to_match_symbols_recursive_smart_split_p(symbols)
self.verbose and print(f'Symbol generation seed(s): {symbol_seeds}')
if isinstance(symbol_seeds, str) or len(symbol_seeds) == 0:
print('Failed to generate symbol seeds')
return
# find seed for frequency table
freq_seeds = self.find_best_seeds_to_match_frequency_chunks(
data=counts, chunks=3, max_iteration=self.freq_chunk_seed_max_iterations)
self.verbose and print(f'Frequency generation seed(s): {freq_seeds}')
# get the frequencies that are going to be obtained from the stored freq seeds.
# this is required, since these are the ones used when decompressing, not the real frequencies.
# this could have been returned by the find-seed function, but should make no difference performance-wise.
seed_maxfreq_pairs = [(seed, max_freq) for seed, max_freq, _ in freq_seeds] # remove mse from results
counts_sum = sum(counts)
gen_freqs = self.rebuild_frequency_table(seed_maxfreq_pairs, len(counts))
self.verbose and print(f"True frequency table:\n{[x / sum(counts) for x in counts]}")
self.verbose and print(f"Seed generated frequency table:\n{gen_freqs}")
del counts
# save to file & delete original
# store order:
# n1 = number of seeds to gen symbols (1 byte)
# ( seed, min, max ) x n1 -> n1*(4+1+1) bytes
# n2 = number of seeds to gen freqs (1 byte)
# ( seed, maxfreq ) x n2 -> n2*(4+4) bytes
# data chunk seeds until EOF (2 metadata bits + 1 to 4 seed bytes each)
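# Worked header-size example (hypothetical counts): with n1 = 2 symbol seeds and
# n2 = 3 frequency seeds, the header costs 1 + 2*(4+1+1) + 1 + 3*(4+4) = 38 bytes
# before the first data chunk seed is written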
with open(output_filepath, "xb") as output_file:
self.write_symbol_seeds(output_file, symbol_seeds)
del symbol_seeds
self.write_symbol_frequency_metadata(output_file, seed_maxfreq_pairs)
del seed_maxfreq_pairs
self.compute_and_write_data_chunks_seeds(gen_freqs, input_filepath, output_file, symbols, counts_sum)
# TODO (potential minor improvement)
# 1st and 2nd steps could use less space by storing
# the minimum number of needed bytes required for a given seed,
# using 2 bits as metadata, as is done in step 3.
# However, as the file grows, this improvement becomes negligible.
# It would also require a bit based state machine to write/read all the data in an organized fashion
# as the next steps may be offset in an even amount of bits.
return
def compute_and_write_data_chunks_seeds(self, frequency_table, filepath, output_file, symbols, total_to_read=0):
# TODO a metadata bit or escape code is necessary when no match is found.
# It is also needed for the last segment of the file if it is smaller than the chunk length.
# The current solution will skip the last chunk, so it is not truly lossless.
# TODO cumsum only needs to be computed here once. Lower granularity function(s) need to be fixed for this too...
# the first 2 bits tell how many bytes are needed to store the seed value;
# if the seed is 2330, for example,
# 01 00001001 00011010 is stored
chunk_size = self.source_data_chunks_length
read_bytes = 0
self.display_work_done(read_bytes, total_to_read)
with open(filepath, mode="rb") as input_file:
buffer = input_file.read(chunk_size)
byte_2_write = 0
write_bit_ptr = 0
while len(buffer) == chunk_size: # TODO change this when last segment can be properly read
data_chunk_seed = self.find_seed_2gen_sequence_p(buffer, frequency_table, symbols,
self.data_chunk_seed_max_iterations)
seed_byte_len: int = int(self.byte_length(data_chunk_seed))
#print(f'seq seed:{data_chunk_seed} len:{seed_byte_len}')
byte_2_write, write_bit_ptr = self.fill_with_bit_pair(byte_2_write,
write_bit_ptr,
(seed_byte_len - 1) // 2,
(seed_byte_len - 1) % 2)
if write_bit_ptr == 0:
byte_2_write = self.write_byte(byte_2_write, output_file)
target_ptr = 0
buffer = data_chunk_seed.to_bytes(length=4, byteorder='big', signed=False)
for c in range(4 - seed_byte_len, 4):
target = buffer[c]
while True:
byte_2_write, write_bit_ptr, target_ptr = \
self.fill_with_byte(byte_2_write, write_bit_ptr, target, target_ptr)
if write_bit_ptr == 0:
byte_2_write = self.write_byte(byte_2_write, output_file)
if target_ptr == 0:
break
buffer = input_file.read(chunk_size)
# update display with how many chunks were read
read_bytes += chunk_size
self.display_work_done(read_bytes, total_to_read)
if write_bit_ptr != 0:  # flush the final, partially filled byte (zero padded); the original discarded it
self.write_byte(byte_2_write, output_file)
@staticmethod
def get_symbol_count(filepath):
symbol_counter = Counter()
with open(filepath, mode="rb") as input_file:
buffer = input_file.read(RandomBasedXCompressor.DEFAULT_BUFFER_SIZE)
while len(buffer):
for c in buffer:
symbol_counter[c] += 1
buffer = input_file.read(RandomBasedXCompressor.DEFAULT_BUFFER_SIZE)
symbols = []
counts = []
for k, v in sorted(symbol_counter.items(), key=lambda item: item[0]):
counts.append(v)
symbols.append(k)
return counts, symbols
def decompress(self, input_filepath, output_filepath):
with open(input_filepath, mode="rb") as input_file:
# get symbols
number_of_symbol_seeds = struct.unpack('>B', input_file.read(1))[0]
symbol_metadata = [struct.unpack('>IBB', input_file.read(6)) for i in range(number_of_symbol_seeds)]
symbols = self.rebuild_symbol_table(symbol_metadata)
del symbol_metadata
self.verbose and print(f"Symbols:\n{symbols}")
# get freq table
number_of_freq_seeds = struct.unpack('>B', input_file.read(1))[0]
freq_pairs = [struct.unpack('>If', input_file.read(8)) for i in range(number_of_freq_seeds)]
frequencies = self.rebuild_frequency_table(freq_pairs, len(symbols))
cumulative_distribution = self.get_cumulative_distribution(frequencies)
self.verbose and print(f"Read frequencies:\n{frequencies}")
self.verbose and print(f"Cumulative distribution table:\n{cumulative_distribution}")
del freq_pairs
del frequencies
# get sequence
number_of_restored_bytes = 0
with open(output_filepath, mode="xb") as output_file:
while True:
read_data = input_file.read(self.DEFAULT_BUFFER_SIZE)
if not read_data:
break
bit_ptr = 0 # in the read byte
v_byte = 0
v_bit_ptr = 0 # in the byte to be written
read_data_index = 0
chunk_seed_size = 0
seed_bytes = []
state = self.SEED_PROC_STATE.header
while read_data_index < len(read_data):
self.display_work_done(number_of_restored_bytes, 0)
r_byte = read_data[read_data_index] # read byte, without any offsets
if state is self.SEED_PROC_STATE.header:
chunk_seed_size = ((r_byte & (3 << (6 - bit_ptr))) >> (6 - bit_ptr)) + 1
#print(f"seed size: {chunk_seed_size}")
bit_ptr = (bit_ptr + 2) % 8
seed_bytes = []
state = self.SEED_PROC_STATE.tail
elif state is self.SEED_PROC_STATE.tail:
v_byte, v_bit_ptr, bit_ptr = self.fill_with_byte(v_byte, v_bit_ptr, r_byte, bit_ptr)
if v_bit_ptr == 0:
seed_bytes.append(v_byte)
v_byte = 0
if len(seed_bytes) == chunk_seed_size:
state = self.SEED_PROC_STATE.output
if state is self.SEED_PROC_STATE.output:
seed = int.from_bytes(seed_bytes, byteorder='big', signed=False)
#print(f"seed: {seed}")
data_chunk = self.rebuild_data_chunk(seed, cumulative_distribution, symbols)
output_file.write(bytes(data_chunk))
state = self.SEED_PROC_STATE.header
if bit_ptr == 0: # should be reset by the state
read_data_index += 1
number_of_restored_bytes += 1 # update display
return
if __name__ == '__main__':
comp = RandomBasedXCompressor()
#comp.smart_split_max_chunk_length = 12
comp.max_threads = 6
comp.verbose = True
#comp.compress("sample.txt", "sample3.rand")
#comp.decompress("./sample3.rand", "./sample3_rec.txt")
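# A lower-level round-trip sketch (assumed toy data; the symbols are the byte
# values of 'a', 'b', 'c' with a uniform distribution):
# freqs = [1 / 3, 1 / 3, 1 / 3]
# seed = comp.find_seed_2gen_sequence_p(b'abc', freqs, [97, 98, 99], 1000000)
# if isinstance(seed, int):
#     cdist = comp.get_cumulative_distribution(freqs)
#     print(comp.rebuild_data_chunk(seed, cdist, [97, 98, 99]))  # bytearray(b'abc')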