Skip to content

Instantly share code, notes, and snippets.

@blazs
Last active June 29, 2022 08:07
Show Gist options
  • Save blazs/4fc78807a96976cc455f49fc0fb28738 to your computer and use it in GitHub Desktop.
Save blazs/4fc78807a96976cc455f49fc0fb28738 to your computer and use it in GitHub Desktop.
A proof-of-concept implementation of algorithms and formulas for maintaining entropy of elements over a data stream.
"""
A proof-of-concept implementation of algorithms and formulas described in [1].
[1] https://arxiv.org/abs/1403.6348
Blaz Sovdat (blaz.sovdat@gmail.com)
"""
import collections
import math
import random
def log2(p):
return math.log(p, 2) if p > 0 else 0
CountChange = collections.namedtuple('CountChange', ('label', 'change'))
class EntropyHolder:
def __init__(self, labels=[]):
self.counts_ = collections.defaultdict(int)
self.entropy_ = 0
self.sum_ = 0
def __len__(self):
return len(self.counts_)
def update(self, count_changes):
r = sum([change for _, change in count_changes])
residual = self._compute_residual(count_changes)
self.entropy_ = self.sum_ * (self.entropy_ - log2(self.sum_ / (self.sum_ + r))) / (self.sum_ + r) - residual
self._update_counts(count_changes)
return self.entropy_
def _compute_residual(self, count_changes):
r = sum([change for _, change in count_changes])
residual = 0
for label, change in count_changes:
p_new = (self.counts_[label] + change) / (self.sum_ + r)
p_old = self.counts_[label] / (self.sum_ + r)
residual += p_new * log2(p_new) - p_old * log2(p_old)
return residual
def _update_counts(self, count_changes):
for label, change in count_changes:
self.sum_ += change
self.counts_[label] += change
def entropy(self):
return self.entropy_
def count(self, label):
return self.counts_[self.label2index[label]]
def total_counts(self):
return self.sum_
def naive_entropy(counts):
s = sum(counts)
return sum([-(r/s) * log2(r/s) for r in counts])
if __name__ == '__main__':
print(naive_entropy([1, 1]))
print(naive_entropy([1, 1, 1, 1]))
entropy = EntropyHolder()
freq = collections.defaultdict(int)
for _ in range(100):
index = random.randint(0, 5)
entropy.update([CountChange(index, 1)])
freq[index] += 1
print(naive_entropy(freq.values()))
print(entropy.entropy())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment