Skip to content

Instantly share code, notes, and snippets.

@AlphaSheep
Last active September 22, 2023 13:07
Show Gist options
  • Save AlphaSheep/0e846c6b6835f8e56394c46b793043cc to your computer and use it in GitHub Desktop.
Save AlphaSheep/0e846c6b6835f8e56394c46b793043cc to your computer and use it in GitHub Desktop.
Averages with outliers
@timeit
def calc_averages_naive(times: List[float], window_size: int, cutoff_size: int) -> List[float]:
    """Return the trimmed average of every full sliding window over ``times``.

    Each window of ``window_size`` consecutive values is sorted, the
    ``cutoff_size`` smallest and ``cutoff_size`` largest values are dropped,
    and the remaining values are averaged.

    Args:
        times: Input values in their original order.
        window_size: Number of consecutive values in each window.
        cutoff_size: Number of values trimmed from each end of the sorted window.

    Returns:
        One average per window position (``len(times) - window_size + 1``
        values); empty when ``times`` is shorter than ``window_size``.

    Raises:
        ZeroDivisionError: If ``window_size == 2 * cutoff_size`` and at least
            one window exists.
    """
    divisor = window_size - (cutoff_size * 2)
    # Use an explicit upper bound: ``window[cutoff_size:-cutoff_size]`` would be
    # empty for ``cutoff_size == 0`` because ``-0`` is simply ``0``.
    keep_until = window_size - cutoff_size
    averages = []
    for i in range(len(times) - window_size + 1):
        window = sorted(times[i:i + window_size])
        averages.append(sum(window[cutoff_size:keep_until]) / divisor)
    return averages


averages_naive = calc_averages_naive(times, WINDOW_SIZE, CUTOFF_SIZE)
import math
# Number of synthetic samples to generate.
SAMPLE_SIZE: Final[int] = 100_000
# Number of consecutive samples in each sliding window.
WINDOW_SIZE: Final[int] = 1_000
# Fraction of a window that is trimmed from each end before averaging.
THRESHOLD: Final[float] = 0.05
# Count of values trimmed from each end of a sorted window (rounded up).
CUTOFF_SIZE: Final[int] = math.ceil(THRESHOLD * WINDOW_SIZE)
from numpy.random import normal
def generate_sample_data() -> List[float]:
    """Build ``SAMPLE_SIZE`` noisy samples that follow a decaying trend.

    The noise-free trend is 60 at ``x = 0`` and decays towards 20, halving
    its distance to 20 every ``1 / 0.00005`` samples. Each trend value is
    multiplied by ``1 + 0.2 * normal()``, one fresh draw per sample.

    Returns:
        The noisy samples for ``x = 0 .. SAMPLE_SIZE - 1``, in order.
    """
    start_level = 60
    final_level = 20
    decay_base = 0.5
    decay_rate = 0.00005

    def trend(x):
        # Exponential decay from start_level towards final_level.
        return final_level + (start_level - final_level) * (decay_base ** (decay_rate * x))

    def noise():
        # Multiplicative noise factor centred on 1.
        return 1 + 0.2 * normal()

    samples = []
    for x in range(SAMPLE_SIZE):
        samples.append(trend(x) * noise())
    return samples


times = generate_sample_data()
# pylint: disable=missing-docstring, wrong-import-order, wrong-import-position, unnecessary-lambda-assignment
from typing import List, Final
import pylab as pl
# ------------------------------------------------
# 0. Timing Decorator
# ------------------------------------------------
from time import perf_counter
import functools
def timeit(func):
    """Decorate ``func`` so that each call prints how long it took.

    The wrapper forwards all positional and keyword arguments, returns the
    wrapped function's result unchanged, and keeps its metadata via
    ``functools.wraps``. Nothing is printed if ``func`` raises.
    """
    def wrapper(*args, **kwargs):
        start = perf_counter()
        outcome = func(*args, **kwargs)
        end = perf_counter()
        print(f'{func.__name__} took {end - start:.6} seconds')
        return outcome

    return functools.wraps(func)(wrapper)
# ------------------------------------------------
# 1. Define some constants
# ------------------------------------------------
import math
# Number of synthetic samples to generate.
SAMPLE_SIZE: Final[int] = 100_000
# Number of consecutive samples in each sliding window.
WINDOW_SIZE: Final[int] = 1_000
# Fraction of a window that is trimmed from each end before averaging.
THRESHOLD: Final[float] = 0.05
# Count of values trimmed from each end of a sorted window (rounded up).
CUTOFF_SIZE: Final[int] = math.ceil(THRESHOLD * WINDOW_SIZE)
# ------------------------------------------------
# 2. Generate some data with a trend
# ------------------------------------------------
from numpy.random import normal
def generate_sample_data() -> List[float]:
    """Build ``SAMPLE_SIZE`` noisy samples that follow a decaying trend.

    The noise-free trend is 60 at ``x = 0`` and decays towards 20, halving
    its distance to 20 every ``1 / 0.00005`` samples. Each trend value is
    multiplied by ``1 + 0.2 * normal()``, one fresh draw per sample.

    Returns:
        The noisy samples for ``x = 0 .. SAMPLE_SIZE - 1``, in order.
    """
    start_level = 60
    final_level = 20
    decay_base = 0.5
    decay_rate = 0.00005

    def trend(x):
        # Exponential decay from start_level towards final_level.
        return final_level + (start_level - final_level) * (decay_base ** (decay_rate * x))

    def noise():
        # Multiplicative noise factor centred on 1.
        return 1 + 0.2 * normal()

    samples = []
    for x in range(SAMPLE_SIZE):
        samples.append(trend(x) * noise())
    return samples


times = generate_sample_data()
# ------------------------------------------------
# pl.plot(times, '.')
# pl.show()
# ------------------------------------------------
# 3. Naive Approach
# ------------------------------------------------
# Means
@timeit
def calc_means_naive(times: List[float], window_size: int) -> List[float]:
    """Return the plain mean of every full sliding window over ``times``.

    Every window is summed from scratch.

    Args:
        times: Input values in their original order.
        window_size: Number of consecutive values in each window.

    Returns:
        One mean per window position (``len(times) - window_size + 1``
        values); empty when ``times`` is shorter than ``window_size``.
    """
    window_count = len(times) - window_size + 1
    return [
        sum(times[start:start + window_size]) / window_size
        for start in range(window_count)
    ]


means_naive = calc_means_naive(times, WINDOW_SIZE)
# Averages
@timeit
def calc_averages_naive(times: List[float], window_size: int, cutoff_size: int) -> List[float]:
    """Return the trimmed average of every full sliding window over ``times``.

    Each window of ``window_size`` consecutive values is sorted, the
    ``cutoff_size`` smallest and ``cutoff_size`` largest values are dropped,
    and the remaining values are averaged.

    Args:
        times: Input values in their original order.
        window_size: Number of consecutive values in each window.
        cutoff_size: Number of values trimmed from each end of the sorted window.

    Returns:
        One average per window position (``len(times) - window_size + 1``
        values); empty when ``times`` is shorter than ``window_size``.

    Raises:
        ZeroDivisionError: If ``window_size == 2 * cutoff_size`` and at least
            one window exists.
    """
    divisor = window_size - (cutoff_size * 2)
    # Use an explicit upper bound: ``window[cutoff_size:-cutoff_size]`` would be
    # empty for ``cutoff_size == 0`` because ``-0`` is simply ``0``.
    keep_until = window_size - cutoff_size
    averages = []
    for i in range(len(times) - window_size + 1):
        window = sorted(times[i:i + window_size])
        averages.append(sum(window[cutoff_size:keep_until]) / divisor)
    return averages


averages_naive = calc_averages_naive(times, WINDOW_SIZE, CUTOFF_SIZE)
# ------------------------------------------------
# pl.plot(times, '.', alpha=0.1)
# pl.plot(range(WINDOW_SIZE-1, len(times)), means_naive, 'g')
# pl.plot(range(WINDOW_SIZE-1, len(times)), averages_naive, 'r')
# pl.show()
# ------------------------------------------------
# 4. A Smarter Approach for the mean
# ------------------------------------------------
from collections import deque


class MovingMeanWindow():
    """Fixed-size sliding window that keeps a running mean of its contents.

    The sum of the window is updated incrementally on every step instead of
    being recomputed from scratch.
    """

    def __init__(self, initial_window: List[float]) -> None:
        """Start the window with the values of ``initial_window``.

        Args:
            initial_window: Values that fill the first window, oldest first.
                The sequence is copied; the caller's list is never modified.
        """
        self._total: float = sum(initial_window)
        # A private deque copy: the caller's list stays untouched and dropping
        # the oldest value is O(1) instead of the O(n) of ``list.pop(0)``.
        self._window = deque(initial_window)
        self._size: int = len(initial_window)

    def next(self, value: float) -> float:
        """Slide the window forward by one value and return the new mean.

        Args:
            value: The value entering the window; the oldest value leaves.

        Returns:
            The mean of the window after the update.

        Raises:
            ZeroDivisionError: If the window was created from an empty sequence.
        """
        self._window.append(value)
        self._total += value - self._window.popleft()
        return self._total / self._size
@timeit
def calc_means_moving_window(times: List[float], window_size: int) -> List[float]:
    """Return the mean of every full sliding window using a running total.

    Args:
        times: Input values in their original order.
        window_size: Number of consecutive values in each window.

    Returns:
        One mean per window position (``len(times) - window_size + 1``
        values); empty when ``times`` is shorter than ``window_size``.
    """
    # Without a full first window there is nothing to average; returning the
    # mean of a partial window would disagree with calc_means_naive.
    if len(times) < window_size:
        return []
    initial_window = times[:window_size]
    # Compute the first mean before handing the slice to the window object,
    # which takes over the list from here on.
    initial_mean = sum(initial_window) / window_size
    window = MovingMeanWindow(initial_window)
    return [initial_mean] + [window.next(x) for x in times[window_size:]]


means_moving_window = calc_means_moving_window(times, WINDOW_SIZE)
# ------------------------------------------------
# pl.plot(times, '.', alpha=0.1)
# pl.plot(range(WINDOW_SIZE-1, len(times)), means_naive, 'g')
# pl.plot(range(WINDOW_SIZE-1, len(times)), averages_naive, 'r')
# pl.show()
# ------------------------------------------------
# 5. A Smarter Approach for the average
# ------------------------------------------------
from bisect import bisect_left
from collections import deque


class MovingAverageWindow():
    """Sliding window that maintains a trimmed average incrementally.

    The window keeps its values both in arrival order (to know which value
    leaves next) and in sorted order (to know which values are trimmed).
    Running sums of all values, of the ``cutoff_size`` smallest values and of
    the ``cutoff_size`` largest values are updated on every step, so the
    trimmed sum never has to be recomputed from scratch.
    """

    def __init__(self, initial_window: List[float], cutoff_size: int) -> None:
        """Start the window with the values of ``initial_window``.

        Args:
            initial_window: Values that fill the first window, oldest first.
                The sequence is copied; the caller's list is never modified.
            cutoff_size: Number of values trimmed from each end of the sorted
                window.
        """
        self._cutoff_size = cutoff_size
        # Private copy in arrival order; a deque makes dropping the oldest
        # value O(1) and keeps the caller's list untouched.
        self._values = deque(initial_window)
        self._sorted_values: List[float] = sorted(initial_window)
        self._total: float = sum(initial_window)
        # ``[-0:]`` selects the whole list, so a zero cutoff needs an explicit
        # empty upper tail.
        self._upper_total: float = (
            sum(self._sorted_values[-cutoff_size:]) if cutoff_size else 0.0
        )
        self._lower_total: float = sum(self._sorted_values[:cutoff_size])
        self._size: int = len(initial_window)

    def _insert_new_value(self, value_to_add: float) -> None:
        """Add ``value_to_add`` to the window and update the running sums.

        After this call the window temporarily holds one value more than its
        nominal size.
        """
        # Boundary members of the trimmed tails *before* the insertion; one of
        # them is pushed out of its tail if the new value lands inside it.
        low_to_drop = self._sorted_values[self._cutoff_size - 1]
        high_to_drop = self._sorted_values[-self._cutoff_size]
        self._values.append(value_to_add)
        insert_pos = bisect_left(self._sorted_values, value_to_add)
        self._sorted_values.insert(insert_pos, value_to_add)
        if insert_pos < self._cutoff_size:
            self._lower_total += value_to_add - low_to_drop
        if insert_pos >= (len(self._values) - self._cutoff_size):
            self._upper_total += value_to_add - high_to_drop
        self._total += value_to_add

    def _remove_old_value(self) -> None:
        """Drop the oldest value from the window and update the running sums."""
        value_to_remove = self._values.popleft()
        # First sorted entry equal to the leaving value.
        remove_pos = bisect_left(self._sorted_values, value_to_remove)
        # Values that move into a trimmed tail if the leaving value was in it.
        next_lowest = self._sorted_values[self._cutoff_size]
        next_highest = self._sorted_values[-self._cutoff_size - 1]
        if remove_pos < self._cutoff_size:
            self._lower_total += next_lowest - value_to_remove
        if remove_pos >= (len(self._values) - self._cutoff_size):
            self._upper_total += next_highest - value_to_remove
        del self._sorted_values[remove_pos]
        self._total -= value_to_remove

    def next(self, value_to_add: float) -> float:
        """Slide the window forward by one value and return the trimmed average.

        Args:
            value_to_add: The value entering the window; the oldest value leaves.

        Returns:
            The average of the window's values after excluding the
            ``cutoff_size`` smallest and ``cutoff_size`` largest ones.
        """
        self._insert_new_value(value_to_add)
        self._remove_old_value()
        middle_total = self._total - self._upper_total - self._lower_total
        divisor = len(self._values) - (self._cutoff_size * 2)
        return middle_total / divisor
@timeit
def calc_averages_moving_window(
    times: List[float], window_size: int, cutoff_size: int
) -> List[float]:
    """Return the trimmed average of every full sliding window, incrementally.

    Args:
        times: Input values in their original order.
        window_size: Number of consecutive values in each window.
        cutoff_size: Number of values trimmed from each end of the sorted window.

    Returns:
        One average per window position (``len(times) - window_size + 1``
        values); empty when ``times`` is shorter than ``window_size``.
    """
    # Without a full first window there is nothing to average; returning the
    # average of a partial window would disagree with calc_averages_naive.
    if len(times) < window_size:
        return []
    divisor = window_size - (cutoff_size * 2)
    initial_window = times[:window_size]
    # Explicit upper bound: ``[cutoff_size:-cutoff_size]`` is empty when
    # ``cutoff_size == 0`` because ``-0`` is simply ``0``.
    kept = sorted(initial_window)[cutoff_size:window_size - cutoff_size]
    initial_average = sum(kept) / divisor
    window = MovingAverageWindow(initial_window, cutoff_size)
    return [initial_average] + [window.next(x) for x in times[window_size:]]


averages_moving_window = calc_averages_moving_window(times, WINDOW_SIZE, CUTOFF_SIZE)
# ------------------------------------------------
# Plot the raw samples together with all four result series; the naive
# results are solid lines, the moving-window results dashed lines.
pl.plot(times, '.', alpha=0.25)
pl.plot(range(WINDOW_SIZE-1, len(times)), means_naive, 'g')
pl.plot(range(WINDOW_SIZE-1, len(times)), averages_naive, 'r')
pl.plot(range(WINDOW_SIZE-1, len(times)), means_moving_window, 'g--')
pl.plot(range(WINDOW_SIZE-1, len(times)), averages_moving_window, 'r--')
pl.show()
# ------------------------------------------------
# Check that the moving-window results match the naive ones. ``zip`` stops at
# the shorter input, so the lengths are compared first; otherwise a result
# list that is too short would pass unnoticed.
assert len(means_naive) == len(means_moving_window)
for naive, better in zip(means_naive, means_moving_window):
    assert abs(naive - better) < 1e-8
assert len(averages_naive) == len(averages_moving_window)
for naive, better in zip(averages_naive, averages_moving_window):
    assert abs(naive - better) < 1e-8
@timeit
def calc_means_naive(times: List[float], window_size: int) -> List[float]:
    """Return the plain mean of every full sliding window over ``times``.

    Every window is summed from scratch.

    Args:
        times: Input values in their original order.
        window_size: Number of consecutive values in each window.

    Returns:
        One mean per window position (``len(times) - window_size + 1``
        values); empty when ``times`` is shorter than ``window_size``.
    """
    window_count = len(times) - window_size + 1
    return [
        sum(times[start:start + window_size]) / window_size
        for start in range(window_count)
    ]


means_naive = calc_means_naive(times, WINDOW_SIZE)
from bisect import bisect_left
from collections import deque


class MovingAverageWindow():
    """Sliding window that maintains a trimmed average incrementally.

    The window keeps its values both in arrival order (to know which value
    leaves next) and in sorted order (to know which values are trimmed).
    Running sums of all values, of the ``cutoff_size`` smallest values and of
    the ``cutoff_size`` largest values are updated on every step, so the
    trimmed sum never has to be recomputed from scratch.
    """

    def __init__(self, initial_window: List[float], cutoff_size: int) -> None:
        """Start the window with the values of ``initial_window``.

        Args:
            initial_window: Values that fill the first window, oldest first.
                The sequence is copied; the caller's list is never modified.
            cutoff_size: Number of values trimmed from each end of the sorted
                window.
        """
        self._cutoff_size = cutoff_size
        # Private copy in arrival order; a deque makes dropping the oldest
        # value O(1) and keeps the caller's list untouched.
        self._values = deque(initial_window)
        self._sorted_values: List[float] = sorted(initial_window)
        self._total: float = sum(initial_window)
        # ``[-0:]`` selects the whole list, so a zero cutoff needs an explicit
        # empty upper tail.
        self._upper_total: float = (
            sum(self._sorted_values[-cutoff_size:]) if cutoff_size else 0.0
        )
        self._lower_total: float = sum(self._sorted_values[:cutoff_size])
        self._size: int = len(initial_window)

    def _insert_new_value(self, value_to_add: float) -> None:
        """Add ``value_to_add`` to the window and update the running sums.

        After this call the window temporarily holds one value more than its
        nominal size.
        """
        # Boundary members of the trimmed tails *before* the insertion; one of
        # them is pushed out of its tail if the new value lands inside it.
        low_to_drop = self._sorted_values[self._cutoff_size - 1]
        high_to_drop = self._sorted_values[-self._cutoff_size]
        self._values.append(value_to_add)
        insert_pos = bisect_left(self._sorted_values, value_to_add)
        self._sorted_values.insert(insert_pos, value_to_add)
        if insert_pos < self._cutoff_size:
            self._lower_total += value_to_add - low_to_drop
        if insert_pos >= (len(self._values) - self._cutoff_size):
            self._upper_total += value_to_add - high_to_drop
        self._total += value_to_add

    def _remove_old_value(self) -> None:
        """Drop the oldest value from the window and update the running sums."""
        value_to_remove = self._values.popleft()
        # First sorted entry equal to the leaving value.
        remove_pos = bisect_left(self._sorted_values, value_to_remove)
        # Values that move into a trimmed tail if the leaving value was in it.
        next_lowest = self._sorted_values[self._cutoff_size]
        next_highest = self._sorted_values[-self._cutoff_size - 1]
        if remove_pos < self._cutoff_size:
            self._lower_total += next_lowest - value_to_remove
        if remove_pos >= (len(self._values) - self._cutoff_size):
            self._upper_total += next_highest - value_to_remove
        del self._sorted_values[remove_pos]
        self._total -= value_to_remove

    def next(self, value_to_add: float) -> float:
        """Slide the window forward by one value and return the trimmed average.

        Args:
            value_to_add: The value entering the window; the oldest value leaves.

        Returns:
            The average of the window's values after excluding the
            ``cutoff_size`` smallest and ``cutoff_size`` largest ones.
        """
        self._insert_new_value(value_to_add)
        self._remove_old_value()
        middle_total = self._total - self._upper_total - self._lower_total
        divisor = len(self._values) - (self._cutoff_size * 2)
        return middle_total / divisor
@timeit
def calc_averages_moving_window(
    times: List[float], window_size: int, cutoff_size: int
) -> List[float]:
    """Return the trimmed average of every full sliding window, incrementally.

    Args:
        times: Input values in their original order.
        window_size: Number of consecutive values in each window.
        cutoff_size: Number of values trimmed from each end of the sorted window.

    Returns:
        One average per window position (``len(times) - window_size + 1``
        values); empty when ``times`` is shorter than ``window_size``.
    """
    # Without a full first window there is nothing to average; returning the
    # average of a partial window would disagree with calc_averages_naive.
    if len(times) < window_size:
        return []
    divisor = window_size - (cutoff_size * 2)
    initial_window = times[:window_size]
    # Explicit upper bound: ``[cutoff_size:-cutoff_size]`` is empty when
    # ``cutoff_size == 0`` because ``-0`` is simply ``0``.
    kept = sorted(initial_window)[cutoff_size:window_size - cutoff_size]
    initial_average = sum(kept) / divisor
    window = MovingAverageWindow(initial_window, cutoff_size)
    return [initial_average] + [window.next(x) for x in times[window_size:]]


averages_moving_window = calc_averages_moving_window(times, WINDOW_SIZE, CUTOFF_SIZE)
@timeit
def calc_means_moving_window(times: List[float], window_size: int) -> List[float]:
    """Return the mean of every full sliding window using a running total.

    Args:
        times: Input values in their original order.
        window_size: Number of consecutive values in each window.

    Returns:
        One mean per window position (``len(times) - window_size + 1``
        values); empty when ``times`` is shorter than ``window_size``.
    """
    # Without a full first window there is nothing to average; returning the
    # mean of a partial window would disagree with calc_means_naive.
    if len(times) < window_size:
        return []
    initial_window = times[:window_size]
    # Compute the first mean before handing the slice to the window object,
    # which takes over the list from here on.
    initial_mean = sum(initial_window) / window_size
    window = MovingMeanWindow(initial_window)
    return [initial_mean] + [window.next(x) for x in times[window_size:]]


means_moving_window = calc_means_moving_window(times, WINDOW_SIZE)
from collections import deque


class MovingMeanWindow():
    """Fixed-size sliding window that keeps a running mean of its contents.

    The sum of the window is updated incrementally on every step instead of
    being recomputed from scratch.
    """

    def __init__(self, initial_window: List[float]) -> None:
        """Start the window with the values of ``initial_window``.

        Args:
            initial_window: Values that fill the first window, oldest first.
                The sequence is copied; the caller's list is never modified.
        """
        self._total: float = sum(initial_window)
        # A private deque copy: the caller's list stays untouched and dropping
        # the oldest value is O(1) instead of the O(n) of ``list.pop(0)``.
        self._window = deque(initial_window)
        self._size: int = len(initial_window)

    def next(self, value: float) -> float:
        """Slide the window forward by one value and return the new mean.

        Args:
            value: The value entering the window; the oldest value leaves.

        Returns:
            The mean of the window after the update.

        Raises:
            ZeroDivisionError: If the window was created from an empty sequence.
        """
        self._window.append(value)
        self._total += value - self._window.popleft()
        return self._total / self._size
from time import perf_counter
import functools
def timeit(func):
    """Decorate ``func`` so that each call prints how long it took.

    The wrapper forwards all positional and keyword arguments, returns the
    wrapped function's result unchanged, and keeps its metadata via
    ``functools.wraps``. Nothing is printed if ``func`` raises.
    """
    def wrapper(*args, **kwargs):
        start = perf_counter()
        outcome = func(*args, **kwargs)
        end = perf_counter()
        print(f'{func.__name__} took {end - start:.6} seconds')
        return outcome

    return functools.wraps(func)(wrapper)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment