Created
February 24, 2016 06:26
-
-
Save kurtbrose/419d706f4542361b4562 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Simple distributed, streaming algorithm for keeping counts on | |
an adaptive sample of data. | |
''' | |
import time | |
import hashlib | |
import json | |
import base64 | |
class HistrogramSample(object):
    '''
    Keeps a count, and first and last occurrences for randomly
    selected items from an infinite stream of strings.

    Never tracks more than maxsize items.  As more unique
    items are seen, it periodically reduces the fraction
    of data that is tracked.

    Whether an item is tracked depends only on the SHA-256 digest of its
    bytes and the current level ``leading_zeros``: a digest that compares
    greater than ``self.prefix`` (built by ``_leading_zeros``) is ignored.
    Raising the level tightens the threshold and evicts entries that no
    longer pass it.  Because the decision is a pure function of
    (item, level), samplers built independently with the same ``maxsize``
    can be merged.

    Attributes:
        n: number of ``add`` calls seen, including those of merged samplers.
        data: dict mapping the first 12 bytes of an item's digest to
            ``{'first3': [...], 'n': int, 'last3': [...]}`` -- up to three
            of the earliest ``time.time()`` stamps, the occurrence count,
            and up to three of the most recent stamps.
        maxsize: upper bound on ``len(data)``.
        leading_zeros: current sampling level.
        prefix: threshold byte string for ``leading_zeros``.
    '''

    def __init__(self, maxsize=512):
        self.n = 0
        self.data = {}
        self.maxsize = maxsize
        self.leading_zeros = 0
        self.prefix = _leading_zeros(0)

    def add(self, data):
        '''
        Record one occurrence of ``data`` (a byte string, hashed with SHA-256).

        Every call increments ``self.n``; only items whose digest passes the
        current threshold are stored.
        '''
        self.n += 1
        signature = hashlib.sha256(data).digest()
        if signature > self.prefix:
            return  # vast majority of calls return here
        signature = signature[:12]  # 12 bytes -> 1 in 2**96 collisions; plenty
        curt = time.time()
        entry = self.data.get(signature)
        if entry is None:
            entry = self.data[signature] = {'first3': [], 'n': 0, 'last3': []}
        if len(entry['first3']) < 3:
            entry['first3'].append(curt)
        entry['n'] += 1
        # Slide the window: drop the oldest of the last three, append newest.
        entry['last3'] = entry['last3'][-2:] + [curt]
        # Only a very tiny fraction of calls get here; at most 1 / maxsize of
        # stored updates cause a recompress.
        self._shrink_to_fit()

    def merge(self, other):
        '''
        Fold the counts and samples of ``other`` into this sampler in place.

        Both samplers end up judged by the stricter (higher) of their two
        levels, so the result is what a single sampler would hold.

        Raises:
            ValueError: if ``other.maxsize`` differs from ``self.maxsize``.
        '''
        if other.maxsize != self.maxsize:
            raise ValueError("algorithm is only distributed when maxsize is the same")
        self.n += other.n
        if other.leading_zeros > self.leading_zeros:
            self._set_level(other.leading_zeros)
        for sig, od in other.data.items():
            if sig > self.prefix:
                continue  # other sampled more loosely than we now do
            sd = self.data.get(sig)
            if sd is None:
                # copy the lists too, so the two samplers never share state
                self.data[sig] = dict(od, first3=list(od['first3']),
                                      last3=list(od['last3']))
            else:
                sd['n'] += od['n']
                sd['first3'] = sorted(sd['first3'] + od['first3'])[:3]
                sd['last3'] = sorted(sd['last3'] + od['last3'])[-3:]
        self._shrink_to_fit()

    def _set_level(self, leading_zeros):
        '''Switch to sampling level ``leading_zeros`` and evict failing entries.'''
        self.leading_zeros = leading_zeros
        self.prefix = _leading_zeros(leading_zeros)
        # Keys are 12-byte truncated digests; comparing them against prefix
        # gives the same answer as the full digest while len(prefix) < 12.
        evicted = [s for s in self.data if s > self.prefix]
        for sig in evicted:
            del self.data[sig]

    def _shrink_to_fit(self):
        '''Raise the level one step at a time until len(data) <= maxsize.'''
        while len(self.data) > self.maxsize:
            self._set_level(self.leading_zeros + 1)

    def to_json(self):
        '''
        Serialize the sampler to a JSON string.

        Digest keys are base64-encoded (and decoded to text) because JSON
        object keys must be strings.
        '''
        outp = {base64.b64encode(sig).decode('ascii'): entry
                for sig, entry in self.data.items()}
        return json.dumps({
            'n': self.n,
            'data': outp,
            'z0s': self.leading_zeros,
            'size': self.maxsize})

    @classmethod
    def from_json(cls, data):
        '''Rebuild a sampler from a string produced by ``to_json``.'''
        inp = json.loads(data)
        inst = cls()
        inst.n = inp['n']
        for k, v in inp['data'].items():
            inst.data[base64.b64decode(k)] = v
        inst.leading_zeros = inp['z0s']
        inst.maxsize = inp['size']
        inst.prefix = _leading_zeros(inst.leading_zeros)
        return inst
def _leading_zeros(n): | |
return b'\0' * (n / 8) + chr(0xFF >> n % 8) | |
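'''
A note on the binary threshold used here.
Every time the sample overflows, leading_zeros is incremented, which adds
one more leading 0 bit to the binary representation of the threshold
string returned by _leading_zeros().
Python byte strings compare lexicographically: byte by byte over the common
prefix, and when one string is a prefix of the other, the shorter sorts first.
So a single `digest > prefix` comparison rejects every digest with fewer than
leading_zeros leading 0 bits, keeping roughly a 2**-leading_zeros fraction of
distinct items without converting the digest to an integer.
'''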
def test():
    '''
    Smoke test for HistrogramSample.

    Checks:
      - the JSON round-trip;
      - per-item counts;
      - merging, including that the merged size stays within maxsize.

    Also reports the average cost of ``add`` in microseconds.
    '''
    import os
    hs = HistrogramSample()
    SIZE = 4096
    data = [os.urandom(32) for _ in range(SIZE)]
    s = time.time()
    for d in data:
        hs.add(d)
        hs.add(d)
        hs.add(d)
        hs.add(d)
    f = time.time()
    # Capture the number of timed adds now; merging below also grows hs.n.
    timed_adds = hs.n
    copy = HistrogramSample.from_json(hs.to_json())
    assert copy.__dict__ == hs.__dict__, "json round-trip failed"
    for sample in hs.data.values():
        assert sample['n'] == 4
    hs.merge(copy)
    for sample in hs.data.values():
        assert sample['n'] == 8  # merging an identical copy doubles counts
    data2 = [os.urandom(32) for _ in range(SIZE)]
    for d in data2:
        copy.add(d)
    hs.merge(copy)
    assert len(hs.data) <= hs.maxsize
    # single pre-formatted argument: prints the same on Python 2 and 3
    print("time per add %s microseconds" % ((f - s) / timed_adds * 1e6))


if __name__ == "__main__":
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment