Skip to content

Instantly share code, notes, and snippets.

@kurtbrose
Created February 24, 2016 06:26
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kurtbrose/419d706f4542361b4562 to your computer and use it in GitHub Desktop.
Save kurtbrose/419d706f4542361b4562 to your computer and use it in GitHub Desktop.
'''
Simple, mergeable (distributed) streaming algorithm for keeping counts on
an adaptively shrinking, hash-selected sample of the data.
'''
import time
import hashlib
import json
import base64
class HistrogramSample(object):
    '''
    Keeps a count, plus the first three and last three timestamps, for a
    hash-selected sample of items from an unbounded stream of strings.

    An item is tracked only when the SHA-256 digest of its bytes starts with
    at least ``leading_zeros`` zero bits. Never tracks more than maxsize
    items: whenever the sample overflows, ``leading_zeros`` is incremented,
    which evicts the tracked items that no longer qualify (about half, on
    average) and lowers the fraction of future items that will be tracked.
    Because selection depends only on the item's hash, samples built
    independently with the same maxsize can be combined with merge().

    NOTE: "Histrogram" is a misspelling kept so existing callers still work.
    '''

    def __init__(self, maxsize=512):
        if maxsize < 0:
            # No number of evictions can satisfy a negative bound, so the
            # shrink loop would never terminate.
            raise ValueError("maxsize must be non-negative")
        # Number of items observed (sampled or not), summed over merges.
        self.n = 0
        # 12-byte signature -> {'first3': [...], 'n': count, 'last3': [...]};
        # both timestamp lists hold time.time() values in ascending order.
        self.data = {}
        self.maxsize = maxsize
        # Minimum number of leading zero bits a signature needs to be tracked.
        self.leading_zeros = 0
        self.prefix = _leading_zeros(0)

    def _rejects(self, signature):
        '''
        Return True if *signature* has fewer than ``self.leading_zeros``
        leading zero bits, i.e. it falls outside the current sample.

        Only the first len(prefix) bytes are compared: comparing the whole,
        longer signature would also reject signatures that merely start
        with the prefix (e.g. every digest whose first byte is 0xff when
        leading_zeros == 0).
        '''
        return signature[:len(self.prefix)] > self.prefix

    def _raise_level(self, leading_zeros):
        '''Require *leading_zeros* zero bits and evict signatures that no longer qualify.'''
        self.leading_zeros = leading_zeros
        self.prefix = _leading_zeros(leading_zeros)
        evicted = [s for s in self.data if self._rejects(s)]
        for signature in evicted:
            del self.data[signature]

    def _shrink_to_fit(self):
        '''Raise the threshold one bit at a time until len(data) <= maxsize.'''
        # A single step is not guaranteed to evict anything, hence a loop.
        while len(self.data) > self.maxsize:
            self._raise_level(self.leading_zeros + 1)

    def add(self, data):
        '''
        Record one occurrence of *data*.

        *data* is hashed as bytes; text (unicode) is UTF-8 encoded first.
        '''
        self.n += 1
        if isinstance(data, type(u'')):
            data = data.encode('utf-8')
        signature = hashlib.sha256(data).digest()
        if self._rejects(signature):
            return  # vast majority of calls return here
        signature = signature[:12]  # 12 bytes -> 1 in 2**96 collisions; plenty
        now = time.time()
        entry = self.data.get(signature)
        if entry is None:
            entry = self.data[signature] = {'first3': [], 'n': 0, 'last3': []}
        if len(entry['first3']) < 3:
            entry['first3'].append(now)
        entry['n'] += 1
        # Keep the three most recent timestamps, oldest first.
        entry['last3'] = entry['last3'][-2:] + [now]
        # Only a very tiny fraction of calls overflow the sample:
        # at most 1 / maxsize of data updates will cause a recompress.
        self._shrink_to_fit()

    def merge(self, other):
        '''
        Fold the counts and timestamps of *other* into this sample.

        *other* is left unmodified. Raises ValueError if the two samples
        were built with different maxsize values.
        '''
        if other.maxsize != self.maxsize:
            raise ValueError("algorithm is only distributed when maxsize is the same")
        self.n += other.n
        # Both sides must be filtered at the same (stricter) threshold;
        # otherwise signatures that only one side still tracks would end up
        # with partial counts.
        if other.leading_zeros > self.leading_zeros:
            self._raise_level(other.leading_zeros)
        for signature, theirs in other.data.items():
            if self._rejects(signature):
                continue
            mine = self.data.get(signature)
            if mine is None:
                # Copy the timestamp lists so a later add() on one sample
                # cannot mutate the other's.
                mine = dict(theirs)
                mine['first3'] = list(theirs['first3'])
                mine['last3'] = list(theirs['last3'])
                self.data[signature] = mine
            else:
                mine['n'] += theirs['n']
                mine['first3'] = sorted(mine['first3'] + theirs['first3'])[:3]
                mine['last3'] = sorted(mine['last3'] + theirs['last3'])[-3:]
        self._shrink_to_fit()

    def to_json(self):
        '''Serialize to a JSON string; signatures are stored base64-encoded.'''
        # b64encode returns bytes on Python 3, which json cannot use as a key.
        encoded = {base64.b64encode(sig).decode('ascii'): entry
                   for sig, entry in self.data.items()}
        return json.dumps({
            'n': self.n,
            'data': encoded,
            'z0s': self.leading_zeros,
            'size': self.maxsize})

    @classmethod
    def from_json(cls, data):
        '''Rebuild a sample from a string produced by to_json().'''
        inst = cls()
        inp = json.loads(data)
        inst.n = inp['n']
        inst.data = {base64.b64decode(k): v for k, v in inp['data'].items()}
        inst.leading_zeros = inp['z0s']
        inst.maxsize = inp['size']
        inst.prefix = _leading_zeros(inst.leading_zeros)
        return inst
def _leading_zeros(n):
return b'\0' * (n / 8) + chr(0xFF >> n % 8)
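'''
A note on the binary counter used here.
Every "increment" (each time the sample overflows maxsize) raises by one the
number of leading 0 bits a signature's binary representation must have to
stay in the sample.
Python byte strings compare lexicographically: byte by byte by integer
value, and when one string is a prefix of the other, the shorter one sorts
first. So we can use this for an elegant comparison: a signature has at
least n leading zero bits exactly when its first len(prefix) bytes are
<= the prefix string built for n.
'''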
def test():
    '''
    Smoke test and micro-benchmark for HistrogramSample.

    Checks the JSON round trip, per-item counts before and after a merge,
    and that merging keeps the sample within maxsize; then prints the
    average wall-clock time of the timed add() calls.
    '''
    import os
    hs = HistrogramSample()
    SIZE = 4096
    data = [os.urandom(32) for _ in range(SIZE)]
    start = time.time()
    for d in data:
        hs.add(d)
        hs.add(d)
        hs.add(d)
        hs.add(d)
    finish = time.time()
    # Capture now: the merges below add other.n into hs.n, which would
    # otherwise shrink the reported per-add time.
    timed_adds = hs.n
    copy = HistrogramSample.from_json(hs.to_json())
    assert copy.__dict__ == hs.__dict__, "json round-trip failed"
    for sample in hs.data.values():
        assert sample['n'] == 4
    hs.merge(copy)
    for sample in hs.data.values():
        assert sample['n'] == 8
    data2 = [os.urandom(32) for _ in range(SIZE)]
    for d in data2:
        copy.add(d)
    hs.merge(copy)
    assert len(hs.data) <= hs.maxsize
    # Single-argument print() behaves the same on Python 2 and 3.
    print("time per add %s microseconds" % ((finish - start) / timed_adds * 1e6))
if __name__ == "__main__":
    # Run the self-test / benchmark only when executed as a script, not on import.
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment