from __future__ import print_function | |
import ctypes | |
import random | |
import itertools | |
class Bits: | |
@staticmethod | |
def float2bin(f): | |
val = ctypes.cast((ctypes.c_double * 1)(f), ctypes.POINTER(ctypes.c_ulonglong)).contents.value | |
return val | |
@staticmethod | |
def clzl(val): | |
"""Count leading zeroes. Assume `val` is a 64-bit integer""" | |
cnt = 0 | |
for i in range(0, 64): | |
if val & (1 << (63 - i)) != 0: | |
break | |
cnt += 1 | |
return cnt | |
@staticmethod | |
def ctzl(val): | |
"""Count trailing zeroes. Assume `val` is a 64-bit integer""" | |
cnt = 0 | |
for i in range(0, 64): | |
if val & (1 << i) != 0: | |
break | |
cnt += 1 | |
return cnt | |
class SimplePredictor: | |
"""Simple predictor that just assumes that the next value | |
will be the same as the previous one (aka delta modulation). | |
""" | |
def __init__(self): | |
self.__prev = 0l | |
def predict_next(self, hint): | |
"""Predict value. Parameter `hint` contains a value that should be | |
predicted by the model.""" | |
return self.__prev | |
def update(self, val): | |
self.__prev = val | |
def compresses_trailing(self): | |
return True | |
class FcmPredictor: | |
"""Predictor that uses 1st order finite context method""" | |
def __init__(self, table_size): | |
if (table_size & (table_size - 1)) != 0: | |
raise ValueError("`table_size` should be a power of two!") | |
self.__table = [0]*table_size | |
self.__mask = table_size - 1 | |
self.__last_hash = 0l | |
def predict_next(self, hint): | |
return self.__table[self.__last_hash] | |
def update(self, val): | |
self.__table[self.__last_hash] = val | |
shifted = (self.__last_hash << 5) & 0xFFFFFFFFFFFFFFFF | |
self.__last_hash = (shifted ^ (val >> 50)) & self.__mask; | |
def compresses_trailing(self): | |
return True | |
class DfcmPredictor: | |
"""Predictor that uses 1st order differential finite context method""" | |
def __init__(self, table_size): | |
if (table_size & (table_size - 1)) != 0: | |
raise ValueError("`table_size` should be a power of two!") | |
self.__table = [0]*table_size | |
self.__mask = table_size - 1 | |
self.__last_hash = 0l | |
self.__last_value = 0l | |
def predict_next(self, hint): | |
return self.__table[self.__last_hash] + self.__last_value | |
def update(self, val): | |
delta = val - self.__last_value | |
self.__table[self.__last_hash] = delta | |
shifted = (self.__last_hash << 5) & 0xFFFFFFFFFFFFFFFF | |
self.__last_hash = (shifted ^ (delta >> 50)) & self.__mask; | |
self.__last_value = val | |
def compresses_trailing(self): | |
return True | |
class CombinedPredictor: | |
def __init__(self, tabsize): | |
self.__fcm = SimplePredictor() | |
self.__dfcm = DfcmPredictor(tabsize) | |
def predict_next(self, hint): | |
fcm = self.__fcm.predict_next(hint) | |
dfcm = self.__dfcm.predict_next(hint) | |
diff_fcm = fcm ^ hint | |
diff_dfcm = dfcm ^ hint | |
lzfcm = Bits.clzl(diff_fcm) | |
lzdfcm = Bits.clzl(diff_dfcm) | |
if lzfcm > lzdfcm: | |
return fcm | |
else: | |
return dfcm | |
def update(self, val): | |
self.__fcm.update(val) | |
self.__dfcm.update(val) | |
def compresses_trailing(self): | |
return False | |
def analyze_sequence(seq, predictor): | |
"""Compreses the sequence and returns various stats | |
that describes how good sequence can be compressed | |
using this predictor""" | |
for fpvalue in seq: | |
actual = Bits.float2bin(fpvalue) | |
predicted = predictor.predict_next(actual) | |
delta = actual ^ predicted | |
predictor.update(actual) | |
trailing_zeros = Bits.ctzl(delta) | |
leading_zeros = Bits.clzl(delta) | |
bits = 64 | |
nleading = 0 | |
if predictor.compresses_trailing(): | |
if trailing_zeros > leading_zeros: | |
bits -= trailing_zeros | |
else: | |
bits -= leading_zeros | |
nleading = 1 | |
else: | |
bits -= leading_zeros | |
nleading = 1 if leading_zeros >= trailing_zeros else 0 | |
yield fpvalue, actual, delta, bits, nleading | |
class TestCases: | |
@staticmethod | |
def const(value): | |
"""Return same value""" | |
while True: | |
yield value | |
@staticmethod | |
def rwalk(mu, sigma): | |
"""Returns random walk sequence""" | |
value = 0.0 | |
while True: | |
value += random.gauss(mu, sigma) | |
yield value | |
@staticmethod | |
def finite_set(pswitch, N, arg=None): | |
"""Return infinite random sequence of flags (1, 2, ...). | |
Probability of the flag switch is defined by the `pswitch` argument (should be in 0-1 range). | |
Number of flags is defined by `N`.""" | |
flags = range(0, N) if arg is None else arg | |
curr = random.choice(flags) | |
while True: | |
e = random.random() | |
if e < pswitch: | |
curr = random.choice(flags) | |
yield float(curr) | |
@staticmethod | |
def pattern(N, arg=None): | |
"""Return the same pattern over and over again""" | |
flags = range(0, N) + [-1] if arg is None else arg | |
random.shuffle(flags) | |
while True: | |
for f in flags: | |
yield float(f) | |
@staticmethod | |
def randomly_growing(growthrate, step): | |
"""Randomly growing sequence. Value will be increased at each step with | |
at least once (propbability defined by the `growthrate` argument).""" | |
curr = 0.0 | |
while True: | |
curr += step | |
e = random.random() | |
while e < growthrate: | |
curr += step | |
e = random.random() | |
yield curr | |
@staticmethod | |
def steady_growing(growthrate, steps): | |
"""Steady growing sequence. Output value is increased periodically | |
by the same amount. Period is defined by `steps` arg.""" | |
assert(steps != 0) | |
curr = 0.0 | |
while True: | |
for i in range(0, steps): | |
yield curr | |
curr += growthrate | |
def take_n(seq, N): | |
"""Return sequence that contain first N elements of the | |
underlying sequence""" | |
return itertools.islice(seq, 0, N) | |
def main(): | |
limit = 500 | |
test_cases = [ | |
("Constant", TestCases.const(42.0)), | |
("Random walk", TestCases.rwalk(0.0, 0.01)), | |
("Random walk x10", TestCases.rwalk(0.0, 0.1)), | |
("Finite set (2)", TestCases.finite_set(0.05, 2)), | |
("Finite set (10)", TestCases.finite_set(0.05, 10)), | |
("Finite set (100)", TestCases.finite_set(0.05, 100)), | |
("Finite set (wide)", TestCases.finite_set(0.05, 0, [0, 10, 100, 1000])), | |
("Finite set (wide 2)", TestCases.finite_set(0.5, 0, [0, 10, 100, 1000])), | |
("Pattern (2)", TestCases.pattern(2)), | |
("Pattern (10)", TestCases.pattern(10)), | |
("Pattern (100)", TestCases.pattern(100)), | |
("Pattern (wide)", TestCases.pattern(0, [0, 10, 100, 1000])), | |
("Steady growth", TestCases.steady_growing(0.1, 1)), | |
("Steady growth int", TestCases.steady_growing(1, 1)), | |
("Slow random growth", TestCases.randomly_growing(0.01, 0.1)), | |
("Fast random growth", TestCases.randomly_growing(0.5, 0.1)), | |
("Slow rnd growth int", TestCases.randomly_growing(0.01, 10)), | |
("Fast rnd growth int", TestCases.randomly_growing(0.5, 10)), | |
] | |
pred_list = [ | |
("Delta modulation", lambda: SimplePredictor()), | |
("First order FCM", lambda: FcmPredictor(1 << 7)), | |
("First order DeltaFCM", lambda: DfcmPredictor(1 << 7)), | |
("Combined DFCM+Delta", lambda: CombinedPredictor(1 << 7)), | |
] | |
print("------------------------------------------------------------------------------------------------------------------") | |
print("{0: <30} | {1: <30} | {2: <20} | {3: <10} | {4: <10} |".format("Test case", "Predictor", | |
"Bits/elem", "N zeros", "Leading")) | |
print("-------------------------------|--------------------------------|----------------------|------------|------------|") | |
for test_name, test_seq in test_cases: | |
rowres = [] | |
for name, pred_generator in pred_list: | |
pred = pred_generator() | |
total_elements = 0 | |
total_bits = 0 | |
total_zeros = 0 | |
total_leading = 0 | |
seq = take_n(test_seq, limit) | |
res = analyze_sequence(seq, pred) | |
for fpvalue, actual, delta, bits, nleading in res: | |
total_elements += 1 | |
total_bits += bits | |
total_leading += nleading | |
if delta == 0: | |
total_zeros += 1 | |
rowres.append((test_name, name, total_bits, total_elements, total_zeros, total_leading)) | |
#rowres.sort(key=lambda tup: float(tup[2])/tup[3]) | |
for test_name, name, total_bits, total_eleents, total_zeros, total_leading in rowres: | |
print("{0: <30} | {1: <30} | {2: <20} | {3: <10} | {4: <10} |".format(test_name, name, | |
float(total_bits)/total_elements, | |
total_zeros, total_leading)) | |
print("-------------------------------|--------------------------------|----------------------|------------|------------|") | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment