Skip to content

Instantly share code, notes, and snippets.

@westurner
Last active June 21, 2022 01:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save westurner/98453d572284ef7fa1ff to your computer and use it in GitHub Desktop.
Save westurner/98453d572284ef7fa1ff to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from __future__ import print_function, division
"""
avgs.py
various implementations of 'streaming' simple "cumulative" / "momentary" mean
"""
import sys
try:
from itertools import izip, count
except ImportError:
from itertools import count
izip = zip
xrange = range
from functools import reduce
def simpleavg(itr):
    """
    calculate simple (arithmetic) mean

    Args:
        itr (iterable): iterable of numbers. Sized containers (list,
            tuple, range, ...) are used as-is; anything without
            ``len()`` (generators, plain iterators) is copied into a
            list first, because the values must be both summed and
            counted.
    Returns:
        float: simple mean (the file enables true division through
        ``from __future__ import division``)
    Raises:
        ZeroDivisionError: if ``itr`` is empty
    """
    try:
        n = len(itr)
    except TypeError:
        # one-shot iterables have no len(); materialise them exactly once
        itr = list(itr)
        n = len(itr)
    return sum(itr) / n
def simpleavg_iter(itr):
    """
    calculate the momentary mean (cummean) of an iterable the naive way

    Every value seen so far is kept in a list and the mean of that whole
    list is recomputed with ``simpleavg`` for each new value.

    Args:
        itr (iterable): iterable of numbers
    Yields:
        tuple: (i, value, momentary mean), where ``i`` is the zero-based
        position of ``value`` in ``itr``
    """
    seen = []
    position = 0
    for value in itr:
        seen.append(value)
        # mean over everything consumed up to and including ``value``
        yield position, value, simpleavg(seen)
        position += 1
def streamavg_running(itr):
    """
    calculate continuous momentary mean (cummean) from a (streaming) iterable

    Only the previous mean is kept between steps, so the input may be an
    arbitrarily long one-shot iterator.

    Args:
        itr (iterable): iterable of numbers
    Yields:
        tuple: (i, value, momentary mean), where ``i`` is the zero-based
        position of ``value``. The first tuple is ``(0, first, first)``.
        Nothing is yielded for an empty iterable.
    """
    # iter() returns an iterator unchanged, so no type check is needed
    itr = iter(itr)
    try:
        avg = next(itr)
    except StopIteration:
        # empty input: end the generator cleanly. Letting StopIteration
        # escape a generator body becomes RuntimeError under PEP 479.
        return
    yield 0, avg, avg
    for i, val in enumerate(itr, 1):
        # the new value contributes 1/(i+1) of the mean over i+1 values
        weight = 1.0 / (i + 1)
        newavg = avg - (avg * weight) + (weight * val)
        # ~= newavg = (avg*i + val) / (i + 1)
        yield i, val, newavg
        avg = newavg
def streamavg_running_segment(itr, avg=None, seqpos=0):
    """
    calculate continuous momentary mean (cummean) from an iterable;
    optionally continuing from ``avg``, starting at ``seqpos``

    Args:
        itr (iterable): iterable of numbers
        avg (number): (optional) current average to resume from. Any
            number is accepted, including ``0``; only ``None`` means
            "not given", in which case the first value of ``itr`` seeds
            the average.
        seqpos (int): (optional) zero-based sequence position that
            ``avg`` corresponds to
            example: [x, y, z] -> (0, 1, 2)
    Yields:
        tuple: (i, value, momentary mean). The first tuple is the seed
        state ``(seqpos, avg, avg)``; each later tuple is one value from
        ``itr`` with positions counting up from ``seqpos + 1``. Nothing
        is yielded when ``avg`` is None and ``itr`` is empty.
    """
    itr = iter(itr)
    if avg is None:
        # explicit None test: ``avg or next(itr)`` would throw away a
        # legitimate running average of 0
        try:
            avg = next(itr)
        except StopIteration:
            # nothing to seed from; end the generator cleanly rather
            # than letting StopIteration escape (RuntimeError, PEP 479)
            return
    yield seqpos, avg, avg
    i = seqpos
    for val in itr:
        i += 1
        # the new value contributes 1/(i+1) of the mean over i+1 values
        weight = 1.0 / (i + 1)
        newavg = avg - (avg * weight) + (weight * val)
        yield i, val, newavg
        avg = newavg
# Shared test fixtures: 0, 10, ..., 990 as one sequence, and the same
# numbers split into three consecutive segments.
# Stored as lists so the segments can be joined with ``+`` on Python 3,
# where range() returns a range object that does not support ``+``.
TESTDATA = list(range(0, 1000, 10))
TESTDATA_SEGS = (
    list(range(0, 500, 10)),
    list(range(500, 600, 10)),
    list(range(600, 1000, 10)),
)
import unittest
import operator
class TestStreamAvgs(unittest.TestCase):
    """
    compare the streaming mean generators against the naive
    ``simpleavg_iter`` implementation
    """

    def test_00_testdata(self):
        """
        test that the segments, joined in order, reproduce TESTDATA
        """
        # convert to lists first: on Python 3 range objects do not
        # support ``+``, so reducing them with operator.add would raise
        joined = reduce(operator.add, (list(seg) for seg in TESTDATA_SEGS))
        self.assertEqual(joined, list(TESTDATA))

    def test_10_simpleavg_iter(self):
        """smoke test: print every tuple from simpleavg_iter"""
        for x in simpleavg_iter(TESTDATA):
            print(x)

    def test_20_streamavg_runing(self):
        """smoke test: print every tuple from streamavg_running"""
        for x in streamavg_running(TESTDATA):
            print(x)

    def test_30_streamavg_runing_segment(self):
        """smoke test: print every tuple from streamavg_running_segment"""
        for x in streamavg_running_segment(TESTDATA):
            print(x)

    def test_50_sanity(self):
        """
        test that the naive and streaming methods agree on TESTDATA
        """
        one = list(simpleavg_iter(TESTDATA))
        two = list(streamavg_running(TESTDATA))
        self.assertEqual(one, two)

    def test_50_sanity2(self):
        """
        test that each method produces equivalent output
        """
        for input_ in TESTDATA_SEGS:
            one = list(simpleavg_iter(input_))
            two = list(streamavg_running(input_))
            thr = list(streamavg_running_segment(input_))
            self.assertEqual(one, two)
            self.assertEqual(two, thr)

    def test_71_streamavg_running_segment(self):
        """
        test that we can start in the middle of an iterator
        """
        # previous = [0, 100, 200]  # 300 / 3 = 100
        current_average = 100
        seqpos = 3 - 1  # zero-based
        itr = [200, 100, 0]  # 300 / 3 = 100
        expected_averages = [
            300 / 3,
            500 / 4,
            600 / 5,
            600 / 6,
        ]
        expected_final_average = 100
        output = list(
            streamavg_running_segment(itr, avg=current_average, seqpos=seqpos))
        print(output)
        self.assertEqual([x[-1] for x in output], expected_averages)
        self.assertEqual(output[-1][-1], expected_final_average)

    def tst_61_pandas_numpy(self):
        """
        build the cumulative mean with pandas/numpy for comparison

        The name does not start with ``test``, so unittest's default
        loader does not collect it -- presumably to keep pandas and
        numpy optional; rename to ``test_61_...`` to enable it.
        """
        import pandas as pd
        import numpy as np
        itr = [0, 100, 200, 200, 100, 0]
        df = pd.DataFrame({'seq': itr})
        df['n'] = df.index + 1
        df['cumsum'] = np.cumsum(df['seq'])
        df['cummean'] = df['cumsum'] / (df.index + 1)
        print(df)
        # assertEqual instead of a bare assert: still checked under -O
        self.assertEqual(df['seq'].mean(), 100)
def main():
    """
    run tests if -t is in sys.argv,
    otherwise print the cummean from streamavg_running for each number
    read from stdin (one number per line; blank lines are skipped)

    Returns:
        int: 0 after stdin has been processed. In test mode
        ``unittest.main()`` normally exits the interpreter itself.
    Raises:
        ValueError: if a non-blank line is not a valid float
    """
    if '-t' in sys.argv:
        # drop our own flag so unittest does not try to parse it
        sys.argv.remove('-t')
        import unittest
        return unittest.main()
    # skip blank lines (e.g. a trailing newline) instead of crashing on
    # float(''); float() itself ignores surrounding whitespace
    itr = (float(line) for line in sys.stdin if line.strip())
    for output in streamavg_running(itr):
        # output is (i, value, momentary mean); print only the mean
        print(output[-1])
    return 0
if __name__ == "__main__":
    # script entry point: hand main()'s return value to the shell as
    # the process exit status
    exit_status = main()
    sys.exit(exit_status)
@westurner
Copy link
Author

westurner commented Jun 21, 2022

avg = avg or next(itr)

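Can avg be zero? If so, `avg or next(itr)` treats it as missing and pulls a fresh value from the iterator instead. It should be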

next(itr) if avg is None else avg
# or
avg if avg is not None else next(itr)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment