Skip to content

Instantly share code, notes, and snippets.

@Lazin
Created February 2, 2017 13:38
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Lazin/a28ba28b1a51a95e193e13575eef6509 to your computer and use it in GitHub Desktop.
Save Lazin/a28ba28b1a51a95e193e13575eef6509 to your computer and use it in GitHub Desktop.
from __future__ import print_function
import ctypes
import random
import itertools
class Bits:
    """Bit-level helpers for doubles and their 64-bit integer images."""

    @staticmethod
    def float2bin(f):
        """Return the raw bits of `f`, stored as a C double, as an unsigned integer."""
        # Reinterpret the double's storage as an unsigned 64-bit integer.
        storage = (ctypes.c_double * 1)(f)
        as_u64 = ctypes.cast(storage, ctypes.POINTER(ctypes.c_ulonglong))
        return as_u64.contents.value

    @staticmethod
    def clzl(val):
        """Count leading zeroes. Assume `val` is a 64-bit integer.

        Only the low 64 bits of `val` are inspected, so negative or wider
        integers are treated as their 64-bit two's complement image.
        Returns 64 when all of those bits are clear.
        """
        return 64 - (val & 0xFFFFFFFFFFFFFFFF).bit_length()

    @staticmethod
    def ctzl(val):
        """Count trailing zeroes. Assume `val` is a 64-bit integer.

        Only the low 64 bits of `val` are inspected. Returns 64 when all of
        those bits are clear.
        """
        low64 = val & 0xFFFFFFFFFFFFFFFF
        if low64 == 0:
            return 64
        # `x & -x` isolates the lowest set bit; its position is the answer.
        return (low64 & -low64).bit_length() - 1
class SimplePredictor:
    """Simple predictor that just assumes that the next value
    will be the same as the previous one (aka delta modulation).
    """

    def __init__(self):
        # Plain `0` instead of the Python 2-only `0l` literal, so the module
        # also parses on Python 3.
        self.__prev = 0

    def predict_next(self, hint):
        """Predict value. Parameter `hint` contains a value that should be
        predicted by the model. This predictor ignores it and returns the
        value most recently passed to `update` (0 before any update)."""
        return self.__prev

    def update(self, val):
        """Remember `val` as the prediction for the next value."""
        self.__prev = val

    def compresses_trailing(self):
        """Always True; `analyze_sequence` then lets trailing zero bits
        shorten the encoding as well."""
        return True
class FcmPredictor:
    """Predictor that uses 1st order finite context method"""

    def __init__(self, table_size):
        """Create a predictor with `table_size` slots.

        Raises ValueError unless `table_size` is a positive power of two.
        """
        # Zero and negative sizes pass the bit trick below, so reject them
        # explicitly; otherwise the table is empty and lookups fail later.
        if table_size <= 0 or (table_size & (table_size - 1)) != 0:
            raise ValueError("`table_size` should be a power of two!")
        self.__table = [0] * table_size
        self.__mask = table_size - 1
        self.__last_hash = 0

    def predict_next(self, hint):
        """Return the value stored for the current context; `hint` is ignored."""
        return self.__table[self.__last_hash]

    def update(self, val):
        """Store `val` under the current context, then advance the context hash."""
        self.__table[self.__last_hash] = val
        # Keep the shifted hash within 64 bits, then mix in `val >> 50`
        # (the top 14 bits when `val` is a 64-bit word).
        shifted = (self.__last_hash << 5) & 0xFFFFFFFFFFFFFFFF
        self.__last_hash = (shifted ^ (val >> 50)) & self.__mask

    def compresses_trailing(self):
        """Always True; `analyze_sequence` then lets trailing zero bits
        shorten the encoding as well."""
        return True
class DfcmPredictor:
    """Predictor that uses 1st order differential finite context method"""

    def __init__(self, table_size):
        """Create a predictor with `table_size` slots.

        Raises ValueError unless `table_size` is a positive power of two.
        """
        # Zero and negative sizes pass the bit trick below, so reject them
        # explicitly; otherwise the table is empty and lookups fail later.
        if table_size <= 0 or (table_size & (table_size - 1)) != 0:
            raise ValueError("`table_size` should be a power of two!")
        self.__table = [0] * table_size
        self.__mask = table_size - 1
        self.__last_hash = 0
        self.__last_value = 0

    def predict_next(self, hint):
        """Return last value plus the delta stored for the current context.

        The sum is reduced modulo 2**64 so the prediction is always a valid
        64-bit word; `hint` is ignored.
        """
        predicted = self.__table[self.__last_hash] + self.__last_value
        return predicted & 0xFFFFFFFFFFFFFFFF

    def update(self, val):
        """Record the delta from the previous value and advance the context hash."""
        # Deltas are kept as unsigned 64-bit words (two's complement), to
        # match the 64-bit masking already applied to the hash below.
        delta = (val - self.__last_value) & 0xFFFFFFFFFFFFFFFF
        self.__table[self.__last_hash] = delta
        shifted = (self.__last_hash << 5) & 0xFFFFFFFFFFFFFFFF
        self.__last_hash = (shifted ^ (delta >> 50)) & self.__mask
        self.__last_value = val

    def compresses_trailing(self):
        """Always True; `analyze_sequence` then lets trailing zero bits
        shorten the encoding as well."""
        return True
class CombinedPredictor:
    """Runs a `SimplePredictor` and a `DfcmPredictor` side by side and, for
    each value, returns whichever prediction shares more leading bits with
    the hint."""

    def __init__(self, tabsize):
        # NOTE: despite its name, `__fcm` holds the previous-value predictor.
        self.__fcm = SimplePredictor()
        self.__dfcm = DfcmPredictor(tabsize)

    def predict_next(self, hint):
        """Return the sub-prediction whose XOR with `hint` has more leading
        zeroes; ties go to the DFCM prediction."""
        guess_prev = self.__fcm.predict_next(hint)
        guess_dfcm = self.__dfcm.predict_next(hint)
        score_prev = Bits.clzl(guess_prev ^ hint)
        score_dfcm = Bits.clzl(guess_dfcm ^ hint)
        return guess_prev if score_prev > score_dfcm else guess_dfcm

    def update(self, val):
        """Feed `val` to both sub-predictors."""
        self.__fcm.update(val)
        self.__dfcm.update(val)

    def compresses_trailing(self):
        """Always False; `analyze_sequence` then counts only leading zeroes."""
        return False
def analyze_sequence(seq, predictor):
    """Run `predictor` over `seq` and yield per-value compression stats.

    For each float in `seq`, yields a tuple
    `(fpvalue, actual, delta, bits, nleading)` where `actual` is the value's
    bit image, `delta` is `actual` XOR the prediction, `bits` is 64 minus
    the zero run that was dropped, and `nleading` is 1 when the leading
    zero run is at least as long as the trailing one, else 0.
    """
    word_bits = 64
    for sample in seq:
        bit_image = Bits.float2bin(sample)
        residual = bit_image ^ predictor.predict_next(bit_image)
        predictor.update(bit_image)
        tail = Bits.ctzl(residual)
        head = Bits.clzl(residual)
        head_dominates = head >= tail
        # Trailing zeroes are dropped only when the predictor allows it and
        # that run is strictly longer than the leading one.
        if predictor.compresses_trailing() and not head_dominates:
            payload = word_bits - tail
        else:
            payload = word_bits - head
        yield sample, bit_image, residual, payload, (1 if head_dominates else 0)
class TestCases:
    """Infinite generators that produce synthetic float sequences."""

    @staticmethod
    def const(value):
        """Return same value"""
        while True:
            yield value

    @staticmethod
    def rwalk(mu, sigma):
        """Returns random walk sequence.

        Starts from 0.0 and adds a `random.gauss(mu, sigma)` step before
        each yielded value.
        """
        value = 0.0
        while True:
            value += random.gauss(mu, sigma)
            yield value

    @staticmethod
    def finite_set(pswitch, N, arg=None):
        """Return infinite random sequence of flags (0, 1, ..., N-1).

        Probability of the flag switch is defined by the `pswitch` argument
        (should be in 0-1 range). Number of flags is defined by `N`. When
        `arg` is given, flags are drawn from it and `N` is ignored.
        """
        flags = list(range(N)) if arg is None else arg
        curr = random.choice(flags)
        while True:
            if random.random() < pswitch:
                curr = random.choice(flags)
            yield float(curr)

    @staticmethod
    def pattern(N, arg=None):
        """Return the same pattern over and over again.

        The pattern is a random permutation of 0..N-1 plus -1, or of the
        values in `arg` when it is given (`N` is then ignored).
        """
        # Build a fresh list: `range + list` fails on Python 3, and shuffling
        # `arg` in place would reorder the caller's own sequence.
        flags = (list(range(N)) + [-1]) if arg is None else list(arg)
        random.shuffle(flags)
        while True:
            for f in flags:
                yield float(f)

    @staticmethod
    def randomly_growing(growthrate, step):
        """Randomly growing sequence. The value is increased by `step` at
        least once before each yield, and again for as long as a random draw
        stays below `growthrate`."""
        curr = 0.0
        while True:
            curr += step
            while random.random() < growthrate:
                curr += step
            yield curr

    @staticmethod
    def steady_growing(growthrate, steps):
        """Steady growing sequence. Output value is increased periodically
        by the same amount. Period is defined by `steps` arg.

        Raises ValueError (on first iteration) if `steps` is not positive.
        """
        # A non-positive period would loop forever without yielding anything.
        if steps <= 0:
            raise ValueError("`steps` should be a positive number!")
        curr = 0.0
        while True:
            for _ in range(steps):
                yield curr
            curr += growthrate
def take_n(seq, N):
    """Lazily yield at most the first `N` items of `seq`."""
    head = itertools.islice(seq, N)
    return head
def main():
    """Print a table of compression stats for every (test case, predictor) pair.

    Each test sequence is sampled once (`limit` values) and the same samples
    are fed to a fresh instance of every predictor, so the rows within one
    test case are directly comparable.
    """
    limit = 500
    test_cases = [
        ("Constant", TestCases.const(42.0)),
        ("Random walk", TestCases.rwalk(0.0, 0.01)),
        ("Random walk x10", TestCases.rwalk(0.0, 0.1)),
        ("Finite set (2)", TestCases.finite_set(0.05, 2)),
        ("Finite set (10)", TestCases.finite_set(0.05, 10)),
        ("Finite set (100)", TestCases.finite_set(0.05, 100)),
        ("Finite set (wide)", TestCases.finite_set(0.05, 0, [0, 10, 100, 1000])),
        ("Finite set (wide 2)", TestCases.finite_set(0.5, 0, [0, 10, 100, 1000])),
        ("Pattern (2)", TestCases.pattern(2)),
        ("Pattern (10)", TestCases.pattern(10)),
        ("Pattern (100)", TestCases.pattern(100)),
        ("Pattern (wide)", TestCases.pattern(0, [0, 10, 100, 1000])),
        ("Steady growth", TestCases.steady_growing(0.1, 1)),
        ("Steady growth int", TestCases.steady_growing(1, 1)),
        ("Slow random growth", TestCases.randomly_growing(0.01, 0.1)),
        ("Fast random growth", TestCases.randomly_growing(0.5, 0.1)),
        ("Slow rnd growth int", TestCases.randomly_growing(0.01, 10)),
        ("Fast rnd growth int", TestCases.randomly_growing(0.5, 10)),
    ]
    # Factories rather than instances: every test case needs fresh predictors.
    pred_list = [
        ("Delta modulation", lambda: SimplePredictor()),
        ("First order FCM", lambda: FcmPredictor(1 << 7)),
        ("First order DeltaFCM", lambda: DfcmPredictor(1 << 7)),
        ("Combined DFCM+Delta", lambda: CombinedPredictor(1 << 7)),
    ]
    row_format = "{0: <30} | {1: <30} | {2: <20} | {3: <10} | {4: <10} |"
    separator = "-------------------------------|--------------------------------|----------------------|------------|------------|"
    print("------------------------------------------------------------------------------------------------------------------")
    print(row_format.format("Test case", "Predictor", "Bits/elem", "N zeros", "Leading"))
    print(separator)
    for test_name, test_seq in test_cases:
        # Materialize once: the generators are stateful, so slicing the same
        # generator per predictor would hand each one different data.
        samples = list(take_n(test_seq, limit))
        rowres = []
        for name, pred_generator in pred_list:
            pred = pred_generator()
            total_elements = 0
            total_bits = 0
            total_zeros = 0
            total_leading = 0
            for _fpvalue, _actual, delta, bits, nleading in analyze_sequence(samples, pred):
                total_elements += 1
                total_bits += bits
                total_leading += nleading
                if delta == 0:
                    total_zeros += 1
            rowres.append((test_name, name, total_bits, total_elements, total_zeros, total_leading))
        for row_test, row_pred, row_bits, row_elements, row_zeros, row_leading in rowres:
            # Guard the average so an empty sample set does not divide by zero.
            bits_per_elem = float(row_bits) / row_elements if row_elements else 0.0
            print(row_format.format(row_test, row_pred, bits_per_elem, row_zeros, row_leading))
        print(separator)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment