Skip to content

Instantly share code, notes, and snippets.

@rileypeterson
Last active January 31, 2020 07:01
Show Gist options
  • Save rileypeterson/5b7b78bbe4180da635dc1f85134ca8be to your computer and use it in GitHub Desktop.
Save rileypeterson/5b7b78bbe4180da635dc1f85134ca8be to your computer and use it in GitHub Desktop.
Bottom Up Algorithm Demo
import random
import tempfile
import os
import numpy as np
import matplotlib.pyplot as plt
def r_sqr(yi, fi):
ss_res = np.sum((yi - fi) ** 2)
ss_tot = np.sum((yi - np.mean(yi)) ** 2)
if ss_tot == 0:
return 1
r_squared = 1 - (ss_res / ss_tot)
return r_squared
class Segment(object):
""" A line segment """
DEFAULT_ERROR = np.nan
def __init__(self, x1, y1, x2, y2, error=DEFAULT_ERROR):
self._ind = 0
self.x1 = x1
self.y1 = y1
self.x2 = x2
self.y2 = y2
self.error = error
assert self.x1 != self.x2, "The start and end x coordinates of segment cannot be equivalent"
self.m = (self.y2 - self.y1) / (self.x2 - self.x1)
self.b = self.y1 - self.m*self.x1
def interpolate(self, x):
""" The y value for an arbitrary point in the segment """
x = x[(self.x1 <= x) & (x <= self.x2)]
return self.m*x + self.b
def merge(self, s, x, y):
y = y[(self.x1 <= x) & (x <= s.x2)]
x = x[(self.x1 <= x) & (x <= s.x2)]
x_bar = np.mean(x)
y_bar = np.mean(y)
m = np.sum((x - x_bar)*(y - y_bar)) / np.sum((x - x_bar)**2)
b = y_bar - m*x_bar
f = lambda x_: m*x_ + b
return Segment(self.x1, f(self.x1), s.x2, f(s.x2), self.DEFAULT_ERROR)
# def merge(self, s):
# """ Merge two segments and return a new one. """
# return Segment(self.x1, self.y1, s.x2, s.y2, self.DEFAULT_ERROR)
def __str__(self):
""" Return a readable representation of the segment """
return """<{} x1:{:.2f}, y1:{:.2f}, x2:{:.2f}, y2:{:.2f}, err:{:.2f}>"""\
.format(self._ind, self.x1, self.y1,
self.x2, self.y2,
self.error)
def __repr__(self):
return self.__str__()
class BottomUpJoiner(object):
def __init__(self, x, y):
self.x = x
self.y = y
self.total_err = 1
self.merged_ind = None
# Initialize all segments
self.segments = []
for i in range(len(self.x)-1):
s = Segment(self.x[i], self.y[i], self.x[i+1], self.y[i+1])
s._ind = i
self.segments.append(s)
def y_in_segment(self, s):
return self.y[(s.x1 <= self.x) & (self.x <= s.x2)]
def update_merge_costs(self):
# Calculate the merge cost for each segment
# Error for merging segments 1 and 2 lies in segment 1 (Left bias)
for i in range(len(self.segments) - 1):
s1, s2 = self.segments[i], self.segments[i+1]
merged_segment = s1.merge(s2, self.x, self.y)
yi = self.y_in_segment(merged_segment)
fi = merged_segment.interpolate(self.x)
self.segments[i].error = np.sum((yi - fi)**2)
def merge_least_costly_segment(self):
ind = np.nanargmin([s.error for s in self.segments])
left_ind = ind - 1
if left_ind < 0:
left_ind = None
right_ind = min(ind + 1, len(self.segments) - 1)
if right_ind == len(self.segments) - 1:
right_ind = None
self.merged_ind = ind
s1, s2 = self.segments[ind], self.segments[ind + 1]
self.segments[ind] = s1.merge(s2, self.x, self.y)
self.segments.pop(ind + 1)
for i in range(ind, len(self.segments)):
self.segments[i]._ind -= 1
self.segments[ind]._ind = ind
if left_ind is not None:
self.segments[left_ind].y2 = self.segments[left_ind + 1].y1
if right_ind is not None:
self.segments[right_ind].y1 = self.segments[right_ind - 1].y2
def compute_total_error(self):
fi = []
for i, s in enumerate(self.segments):
vals = list(s.interpolate(self.x))
if i != 0:
vals = vals[1:]
fi = fi + vals
assert len(fi) == len(self.y), "Lengths must be equal"
self.total_err = r_sqr(self.y, np.array(fi))
print("{:.4f}".format(self.total_err))
if __name__ == '__main__':
x = np.array(range(100))
np.random.seed(2)
f = lambda x: np.round(-0.02*x**2+0.02*x+200+20*(np.random.rand(100,) - 0.5))
y = f(x)
b = BottomUpJoiner(x, y)
print(b.segments)
it = 0
with tempfile.TemporaryDirectory() as tmpdirname:
while len(b.segments) > 3:
b.update_merge_costs()
b.merge_least_costly_segment()
b.compute_total_error()
# if it % 5 == 0:
plt.close('all')
fig = plt.figure(figsize=(10, 10))
ax = plt.gca()
plt.plot(b.x, b.y)
for s in b.segments:
plt.plot([s.x1, s.x2], [s.y1, s.y2])
plt.ylim(bottom=-5, top=225)
plt.tight_layout()
plt.savefig('{}/{:0>3}.png'.format(tmpdirname, it))
it += 1
os.system('convert -delay 25 -loop 0 {}/*.png out.gif'.format(tmpdirname))
print(b.segments)
plt.show()
@rileypeterson
Copy link
Author

Epic bottom up smoothing with smart interpolation!

out

@rileypeterson
Copy link
Author

Inspired by: https://www.ics.uci.edu/~pazzani/Publications/survey.pdf
And for when link inevitably breaks:
Segmenting Time Series: A Survey and Novel Approach
Eamonn Keogh Selina Chu David Hart Michael Pazzani
Department of Information and Computer Science
University of California, Irvine, California 92697 USA
{eamonn, selina, dhart, pazzani}@ics.uci.edu

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment