Last active
June 21, 2022 01:31
-
-
Save westurner/98453d572284ef7fa1ff to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function, division | |
""" | |
avgs.py | |
various implementations of 'streaming' simple "cumulative" / "momentary" mean | |
""" | |
import sys | |
try: | |
from itertools import izip, count | |
except ImportError: | |
from itertools import count | |
izip = zip | |
xrange = range | |
from functools import reduce | |
def simpleavg(itr):
    """
    calculate simple mean

    Args:
        itr (iterable): iterable of numbers; either a sized container
            (list, tuple, range, ...) or a one-shot iterator/generator
    Returns:
        float: simple mean (true division; on Python 2 this relies on the
            ``from __future__ import division`` at the top of this file)
    Raises:
        ZeroDivisionError: if ``itr`` is empty
    """
    try:
        n = len(itr)
    except TypeError:
        # Generators and other unsized iterables have no len(); materialize
        # them once so the values can be both counted and summed.
        itr = list(itr)
        n = len(itr)
    return sum(itr) / n
def simpleavg_iter(itr):
    """
    calculate momentary mean (cummean) from an iterable

    Reference implementation: every value seen so far is kept in a list and
    the mean of that whole list is recomputed with ``simpleavg`` for each
    new value.

    Args:
        itr (iterable): iterable of numbers
    Yields:
        tuple: (i, value, momentary mean)
    """
    seen = []
    position = 0
    for number in itr:
        seen.append(number)
        # mean over everything consumed so far, including ``number``
        yield position, number, simpleavg(seen)
        position += 1
def streamavg_running(itr):
    """
    calculate continuous momentary mean (cummean) from a (streaming) iterable

    Only the current mean is kept between items, so memory use does not grow
    with the length of the input.

    Args:
        itr (iterable): iterable of numbers
    Yields:
        tuple: (i, value, momentary mean); nothing is yielded for an
            empty input
    """
    # iter() returns an iterator unchanged, so this is safe for both
    # containers and iterators on Python 2 and 3.
    itr = iter(itr)
    try:
        avg = next(itr)
    except StopIteration:
        # Empty input: finish cleanly. Letting StopIteration escape a
        # generator becomes a RuntimeError under PEP 479 (Python 3.7+).
        return
    yield 0, avg, avg
    for i, val in enumerate(itr, 1):
        # ``i`` is zero-based, so ``val`` is the (i + 1)-th sample
        weight = 1.0 / (i + 1)
        # ~= avg = (avg*i + val) / (i + 1)
        avg = avg - (avg * weight) + (weight * val)
        yield i, val, avg
def streamavg_running_segment(itr, avg=None, seqpos=0):
    """
    calculate continuous momentary mean (cummean) from an iterable;
    optionally from ``avg``, starting at ``seqpos``

    Args:
        itr (iterable): iterable of numbers
        avg (number): (optional) current average; when ``None`` the first
            item of ``itr`` is consumed and used as the starting average.
            ``0`` is a valid current average.
        seqpos (int): (optional) zero-based sequence position of the
            starting average
            example: [x, y, z] -> (0, 1, 2)
    Yields:
        tuple: (i, value, cumulative (momentary) mean). The first tuple is
            ``(seqpos, avg, avg)``: the starting average stands in for the
            value at ``seqpos``. Nothing is yielded when ``avg`` is ``None``
            and ``itr`` is empty.
    """
    itr = iter(itr)
    # ``is None`` rather than truthiness: ``avg or next(itr)`` would treat a
    # legitimate average of 0 as missing and swallow the first item.
    if avg is None:
        try:
            avg = next(itr)
        except StopIteration:
            # Empty input and no starting average: finish cleanly instead
            # of leaking StopIteration (a RuntimeError under PEP 479).
            return
    yield seqpos, avg, avg
    for i, val in enumerate(itr, seqpos + 1):
        # ``i`` is zero-based, so ``val`` is the (i + 1)-th sample overall
        weight = 1.0 / (i + 1)
        avg = avg - (avg * weight) + (weight * val)
        yield i, val, avg
# Shared test fixtures. Materialized as lists because Python 3 ``range``
# objects do not support ``+``, which the tests use to concatenate the
# segments; on Python 2 ``range`` already returned a list.
TESTDATA = list(range(0, 1000, 10))
# TESTDATA split into three consecutive segments
TESTDATA_SEGS = (
    list(range(0, 500, 10)),
    list(range(500, 600, 10)),
    list(range(600, 1000, 10)),
)
import unittest | |
import operator | |
class TestStreamAvgs(unittest.TestCase):
    """
    tests for the cumulative mean implementations in this module
    """

    def _assert_rows_almost_equal(self, first, second):
        """
        assert that two lists of (i, value, mean) tuples agree

        Positions and values must match exactly; the means are compared
        with ``assertAlmostEqual`` because the implementations use
        different floating point arithmetic to reach the same mean.
        """
        self.assertEqual(len(first), len(second))
        for (i1, val1, avg1), (i2, val2, avg2) in zip(first, second):
            self.assertEqual((i1, val1), (i2, val2))
            self.assertAlmostEqual(avg1, avg2)

    def test_00_testdata(self):
        """
        test that the segments concatenate back to the full test data
        """
        # list() so that ``+`` also works when the fixtures are Python 3
        # range objects, which do not support concatenation
        joined = reduce(operator.add, [list(seg) for seg in TESTDATA_SEGS])
        self.assertEqual(joined, list(TESTDATA))

    def test_10_simpleavg_iter(self):
        for x in simpleavg_iter(TESTDATA):
            print(x)

    def test_20_streamavg_runing(self):
        for x in streamavg_running(TESTDATA):
            print(x)

    def test_30_streamavg_runing_segment(self):
        for x in streamavg_running_segment(TESTDATA):
            print(x)

    def test_50_sanity(self):
        one = list(simpleavg_iter(TESTDATA))
        two = list(streamavg_running(TESTDATA))
        self._assert_rows_almost_equal(one, two)

    def test_50_sanity2(self):
        """
        test that each method produces equivalent output
        """
        for input_ in TESTDATA_SEGS:
            one = list(simpleavg_iter(input_))
            two = list(streamavg_running(input_))
            thr = list(streamavg_running_segment(input_))
            self._assert_rows_almost_equal(one, two)
            self._assert_rows_almost_equal(two, thr)

    def test_71_streamavg_running_segment(self):
        """
        test that we can start in the middle of an iterator
        """
        # previous = [0, 100, 200]  # 300 / 3 = 100
        current_average = 100
        seqpos = 3 - 1  # zero-based
        itr = [200, 100, 0]  # 300 / 3 = 100
        expected_averages = [
            300 / 3,
            500 / 4,
            600 / 5,
            600 / 6,
        ]
        expected_final_average = 100
        output = list(
            streamavg_running_segment(itr, avg=current_average, seqpos=seqpos))
        print(output)
        averages = [x[-1] for x in output]
        self.assertEqual(len(averages), len(expected_averages))
        for got, want in zip(averages, expected_averages):
            # running means are floats: compare with a tolerance
            self.assertAlmostEqual(got, want)
        self.assertAlmostEqual(output[-1][-1], expected_final_average)

    def tst_61_pandas_numpy(self):
        # NOTE: the name does not start with ``test``, so unittest's default
        # loader does not collect it; it needs pandas and numpy to run.
        import pandas as pd
        import numpy as np
        itr = [0, 100, 200, 200, 100, 0]
        df = pd.DataFrame({'seq': itr})
        df['n'] = df.index + 1
        df['cumsum'] = np.cumsum(df['seq'])
        df['cummean'] = df['cumsum'] / (df.index + 1)
        print(df)
        assert df['seq'].mean() == 100
def main():
    """
    run tests if -t is in sys.argv,
    print cummean from streamavg_running for each line in stdin

    Blank (whitespace-only) lines on stdin are skipped; any other line
    must parse as a float.

    Returns:
        int: 0 after stdin has been processed
    Raises:
        ValueError: if a non-blank line is not a valid float
    """
    if '-t' in sys.argv:
        # drop our own flag so that unittest does not try to interpret it
        sys.argv.remove('-t')
        import unittest
        # unittest.main() normally exits the process itself when done
        return unittest.main()
    # float() tolerates surrounding whitespace and the trailing newline;
    # blank lines are filtered out because float('') raises ValueError
    itr = (float(line) for line in sys.stdin if line.strip())
    for output in streamavg_running(itr):
        print(output[-1])
    return 0
# Script entry point: hand main()'s return value to the shell as exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Can `avg` be zero? Should be