Created
August 19, 2024 10:39
-
-
Save ggalmazor/758bba14035f323267d0c3b6970ff552 to your computer and use it in GitHub Desktop.
Ruby script that compares downsampling algorithms that keep local extrema
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
require 'gnuplot' | |
require 'narray' | |
# Function to calculate the perpendicular distance from a point to a line | |
def perpendicular_distance_pip(x0, y0, x1, y1, xp, yp) | |
num = ((y1 - y0) * xp - (x1 - x0) * yp + x1 * y0 - y1 * x0).abs | |
denom = Math.sqrt((y1 - y0) ** 2 + (x1 - x0) ** 2) | |
num / denom | |
end | |
# PIP downsampling function | |
def pip_downsample(time, series, n_out) | |
return [time, series] if n_out >= time.size | |
pip_time = [time[0], time[-1]] | |
pip_series = [series[0], series[-1]] | |
while pip_time.size < n_out | |
max_dist = 0 | |
index_to_add = nil | |
(0...time.size).each do |i| | |
next if pip_time.include?(time[i]) | |
prev_index = pip_time.rindex { |t| t < time[i] } | |
next_index = pip_time.index { |t| t > time[i] } | |
x0 = pip_time[prev_index] | |
y0 = pip_series[prev_index] | |
x1 = pip_time[next_index] | |
y1 = pip_series[next_index] | |
x = time[i] | |
y = series[i] | |
dist = perpendicular_distance_pip(x0, y0, x1, y1, x, y) | |
if dist > max_dist | |
max_dist = dist | |
index_to_add = i | |
end | |
end | |
pip_time.insert(pip_time.index { |t| t > time[index_to_add] }, time[index_to_add]) | |
pip_series.insert(pip_time.index(time[index_to_add]), series[index_to_add]) | |
end | |
[pip_time, pip_series] | |
end | |
# LTTB downsampling function | |
def lttb_downsample(time, series, n_out) | |
return [time, series] if n_out >= time.size | |
bucket_size = (time.size.to_f / (n_out - 2)).ceil | |
downsampled_time = [time[0]] | |
downsampled_series = [series[0]] | |
(1...(n_out - 1)).each do |i| | |
bucket_start = (i - 1) * bucket_size | |
bucket_end = [i * bucket_size, time.size].min | |
next if bucket_end <= bucket_start # Skip if the bucket is invalid or empty | |
avg_x = time[bucket_start...bucket_end].sum / (bucket_end - bucket_start) | |
avg_y = series[bucket_start...bucket_end].sum / (bucket_end - bucket_start) | |
max_area = -1 | |
selected_index = nil | |
(bucket_start...bucket_end).each do |j| | |
area = ((time[j] - downsampled_time[-1]) * (avg_y - downsampled_series[-1]) - | |
(avg_x - downsampled_time[-1]) * (series[j] - downsampled_series[-1])).abs | |
if area > max_area | |
max_area = area | |
selected_index = j | |
end | |
end | |
if selected_index | |
downsampled_time << time[selected_index] | |
downsampled_series << series[selected_index] | |
end | |
end | |
downsampled_time << time[-1] | |
downsampled_series << series[-1] | |
[downsampled_time, downsampled_series] | |
end | |
# Function to find local maxima and minima | |
def find_extremes_top_k(time, series) | |
peaks = [] | |
troughs = [] | |
(1...(series.size - 1)).each do |i| | |
if series[i] > series[i - 1] && series[i] > series[i + 1] | |
peaks << i | |
elsif series[i] < series[i - 1] && series[i] < series[i + 1] | |
troughs << i | |
end | |
end | |
[peaks, troughs] | |
end | |
# Top-K Significant Extremes Downsampling function | |
def top_k_extremes_downsample(time, series, n_out) | |
peaks, troughs = find_extremes_top_k(time, series) | |
extremes = peaks + troughs | |
if extremes.size + 2 <= n_out | |
selected_indices = extremes | |
else | |
prominences = extremes.map do |index| | |
left = index - 1 | |
right = index + 1 | |
[(series[index] - series[left]).abs, (series[index] - series[right]).abs].min | |
end | |
selected_indices = extremes.zip(prominences) | |
.sort_by { |_, prominence| -prominence } | |
.take(n_out - 2) | |
.map(&:first) | |
end | |
selected_indices = [0] + selected_indices.sort + [series.size - 1] | |
[selected_indices.map { |i| time[i] }, selected_indices.map { |i| series[i] }] | |
end | |
# Function to find local maxima and minima | |
def find_extremes_eps(time, series) | |
peaks = [] | |
troughs = [] | |
(1...(series.size - 1)).each do |i| | |
if series[i] > series[i - 1] && series[i] > series[i + 1] | |
peaks << i | |
elsif series[i] < series[i - 1] && series[i] < series[i + 1] | |
troughs << i | |
end | |
end | |
[peaks, troughs] | |
end | |
# EPS (Extreme Point Sampling) Downsampling function | |
def eps_downsample(time, series, n_out) | |
peaks, troughs = find_extremes_eps(time, series) | |
extremes = peaks + troughs | |
if extremes.size + 2 <= n_out | |
selected_indices = extremes | |
else | |
selected_indices = extremes.sample(n_out - 2) | |
end | |
selected_indices = [0] + selected_indices.sort + [series.size - 1] | |
[selected_indices.map { |i| time[i] }, selected_indices.map { |i| series[i] }] | |
end | |
# Function to find local maxima and minima | |
def find_extremes_ler(time, series) | |
peaks = [] | |
troughs = [] | |
(1...(series.size - 1)).each do |i| | |
if series[i] > series[i - 1] && series[i] > series[i + 1] | |
peaks << i | |
elsif series[i] < series[i - 1] && series[i] < series[i + 1] | |
troughs << i | |
end | |
end | |
[peaks, troughs] | |
end | |
# LER (Local Extrema Retention) Downsampling function | |
def ler_downsample(time, series, n_out, threshold = 0.05) | |
peaks, troughs = find_extremes_ler(time, series) | |
extremes = peaks + troughs | |
prominences = extremes.map do |index| | |
left = index - 1 | |
right = index + 1 | |
[(series[index] - series[left]).abs, (series[index] - series[right]).abs].min | |
end | |
significant_extremes = extremes.zip(prominences) | |
.select { |_, prominence| prominence >= threshold } | |
.map(&:first) | |
if significant_extremes.size + 2 <= n_out | |
selected_indices = significant_extremes | |
else | |
selected_indices = significant_extremes.sample(n_out - 2) | |
end | |
selected_indices = [0] + selected_indices.sort + [series.size - 1] | |
[selected_indices.map { |i| time[i] }, selected_indices.map { |i| series[i] }] | |
end | |
# Plotting the original and downsampled series | |
def plot(time, series, time_downsampled, series_downsampled, title, output_file) | |
Gnuplot.open do |gp| | |
Gnuplot::Plot.new(gp) do |plot| | |
plot.terminal "pngcairo size 800,600" | |
plot.output output_file | |
plot.title title | |
plot.xlabel "Time" | |
plot.ylabel "Value" | |
plot.grid | |
plot.data << Gnuplot::DataSet.new([time.to_a, series.to_a]) do |ds| | |
ds.with = "lines" | |
ds.title = "Original Series" | |
ds.linewidth = 1 | |
ds.linecolor = "rgb '#FFB6C1'" | |
end | |
plot.data << Gnuplot::DataSet.new([time_downsampled, series_downsampled]) do |ds| | |
ds.with = "lines" | |
ds.title = "Downsampled Series" | |
ds.linewidth = 2 | |
ds.linecolor = "rgb 'red'" | |
end | |
end | |
end | |
end | |
def perpendicular_distance_rdp(point, line_start, line_end) | |
x0, y0 = point | |
x1, y1 = line_start | |
x2, y2 = line_end | |
num = ((y2 - y1) * x0 - (x2 - x1) * y0 + x2 * y1 - y2 * x1).abs | |
denom = Math.sqrt((y2 - y1) ** 2 + (x2 - x1) ** 2) | |
num / denom | |
end | |
# RDP (Ramer-Douglas-Peucker) Downsampling function | |
def rdp(time, series, epsilon) | |
return [[time[0], series[0]], [time[-1], series[-1]]] if time.size < 3 | |
line_start = [time[0], series[0]] | |
line_end = [time[-1], series[-1]] | |
max_distance = 0 | |
index_of_max = 0 | |
(1...time.size - 1).each do |i| | |
point = [time[i], series[i]] | |
distance = perpendicular_distance_rdp(point, line_start, line_end) | |
if distance > max_distance | |
max_distance = distance | |
index_of_max = i | |
end | |
end | |
if max_distance > epsilon | |
left_results = rdp(time[0..index_of_max], series[0..index_of_max], epsilon) | |
right_results = rdp(time[index_of_max..-1], series[index_of_max..-1], epsilon) | |
left_results[0...-1] + right_results | |
else | |
[[time[0], series[0]], [time[-1], series[-1]]] | |
end | |
end | |
# Function to generate a synthetic time series resembling stock prices | |
def generate_synthetic_series(length: 100, start_price: 100.0, volatility: 0.02, trend: 0.001) | |
series = NArray.float(length) | |
series[0] = start_price | |
(1...length).each do |i| | |
random_walk = (rand - 0.5) * 2 * volatility | |
series[i] = series[i - 1] * (1 + trend + random_walk) | |
end | |
series | |
end | |
# Parameters for the synthetic series | |
length = 500 | |
start_price = 100.0 | |
volatility = 0.02 # Volatility factor, higher values lead to more fluctuations | |
trend = 0.001 # Upward trend, positive values create an upward trend, negative for downward | |
# Generate the synthetic time series | |
time = NArray.sfloat(length).indgen! | |
series = generate_synthetic_series(length: length, start_price: start_price, volatility: volatility, trend: trend) | |
n_out = 50 | |
# Downsample using PIP algorithm | |
pip_time, pip_series = pip_downsample(time.to_a, series.to_a, n_out) | |
plot(time, series, pip_time, pip_series, 'PIP (Perceptually Important Points) Downsampling', "pip.png") | |
# Downsample using LTTB algorithm | |
lttb_time, lttb_series = lttb_downsample(time.to_a, series.to_a, n_out) | |
plot(time, series, lttb_time, lttb_series, "LTTB (Largest-Triangle, Three Buckets) Downsampling", "lttb.png") | |
# Downsample using Top-K Significant Extremes algorithm | |
topk_time, topk_series = top_k_extremes_downsample(time.to_a, series.to_a, n_out) | |
plot(time, series, topk_time, topk_series, "Top-K Significant Extremes Downsampling", "topk.png") | |
# Downsample using EPS algorithm | |
eps_time, eps_series = eps_downsample(time.to_a, series.to_a, n_out) | |
plot(time, series, eps_time, eps_series, "EPS (Extreme Point Sampling) Downsampling", "eps.png") | |
# Downsample using LER algorithm | |
ler_time, ler_series = ler_downsample(time.to_a, series.to_a, n_out) | |
plot(time, series, ler_time, ler_series, "LER (Local Extrema Retention) Downsampling", "ler.png") | |
# Downsample using RDP algorithm with a chosen epsilon value | |
epsilon = 3 | |
rdp_result = rdp(time.to_a, series.to_a, epsilon) | |
rdp_time, rdp_series = rdp_result.transpose | |
plot(time, series, rdp_time, rdp_series, "RDP (Ramer-Douglas-Peucker) Downsampling", "rdp_epsilon_#{epsilon}.png") |
This gist is linked at a blog post in my personal webpage 👉 https://ggalmazor.com/blog/evaluating_downsampling_algorithms.html
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This code was generated by ChatGPT, with a few minor tweaks from me