Skip to content

Instantly share code, notes, and snippets.

@solidiquis
Last active May 9, 2024 19:06
Show Gist options
  • Save solidiquis/81aa5ec1d9784f281ead86c56c9e4b33 to your computer and use it in GitHub Desktop.
Save solidiquis/81aa5ec1d9784f281ead86c56c9e4b33 to your computer and use it in GitHub Desktop.
type TimeSeries = list[tuple[float, float]]
def normalize_time_series(time_series: TimeSeries, T_min: float, T_max: float) -> TimeSeries:
min_time = time_series[0][0]
max_time = time_series[-1][0]
return [
(normalize_time(t, min_time, max_time, T_min, T_max), v) for (t, v) in time_series
]
type AsOfJoinResult = list[tuple[float, float, float]]
def outer_as_of_join(time_series_a: TimeSeries, time_series_b: TimeSeries) -> AsOfJoinResult:
merged_time_series = []
i = 0
j = 0
while i < len(time_series_a) or j < len(time_series_b):
if i < len(time_series_a) and (j == len(time_series_b) or time_series_a[i][0] < time_series_b[j][0]):
timestamp_a, value_a = time_series_a[i]
closest_value_b = time_series_b[j - 1][1] if j > 0 else 0.0
merged_time_series.append((timestamp_a, value_a, closest_value_b))
i += 1
elif j < len(time_series_b) and (i == len(time_series_a) or time_series_b[j][0] <= time_series_a[i][0]):
timestamp_b, value_b = time_series_b[j]
closest_value_a = time_series_a[i - 1][1] if i > 0 else 0.0
merged_time_series.append((timestamp_b, closest_value_a, value_b))
j += 1
return merged_time_series
def normalize_time(
t: float,
t_min: float,
t_max: float,
T_min: float,
T_max: float
) -> float:
return (t - t_min) / (t_max - t_min) * (T_max - T_min)
if __name__ == "__main__":
channel_a_time_series = [
(0.0, 2.0),
(1.0, 6.0),
(4.0, 8.0),
(7.0, 10.0),
]
channel_b_time_series = [
(10.0, 1.0),
(12.0, 2.0),
(14.0, 3.0),
(20.0, 0.0),
(22.0, 0.0),
(24.0, 1.0),
]
print("----Channel A-----")
for val in channel_a_time_series:
print(val)
print()
print("----Channel B-----")
for val in channel_b_time_series:
print(val)
print()
common_time_range_min = 0.0
common_time_range_max = 100.0
# Normalize to common time scale for computation
channel_a_intermediate = normalize_time_series(channel_a_time_series, common_time_range_min, common_time_range_max)
channel_b_intermediate = normalize_time_series(channel_b_time_series, common_time_range_min, common_time_range_max)
intermediate = outer_as_of_join(channel_b_intermediate, channel_a_intermediate)
print("----JOINED AND NORMALIZED-----")
for v in intermediate:
print(v)
print()
transformed_time_series: TimeSeries = []
for (t, a, b) in intermediate:
transformed_time_series.append((t, a + b))
# Output time range... the "root" time range"
output_time_range_min = channel_a_time_series[0][0]
output_time_range_max = channel_a_time_series[-1][0]
# Normalize to output time scale
out = normalize_time_series(transformed_time_series, output_time_range_min, output_time_range_max)
print("----OUT-----")
for v in out:
print(v)
# OUTPUT
# ----Channel A-----
# (0.0, 2.0)
# (1.0, 6.0)
# (4.0, 8.0)
# (7.0, 10.0)
#
# ----Channel B-----
# (10.0, 1.0)
# (12.0, 2.0)
# (14.0, 3.0)
# (20.0, 0.0)
# (22.0, 0.0)
# (24.0, 1.0)
#
# ----JOINED AND NORMALIZED-----
# (0.0, 0.0, 2.0)
# (0.0, 1.0, 2.0)
# (14.285714285714285, 1.0, 6.0)
# (14.285714285714285, 2.0, 6.0)
# (28.57142857142857, 3.0, 6.0)
# (57.14285714285714, 3.0, 8.0)
# (71.42857142857143, 0.0, 8.0)
# (85.71428571428571, 0.0, 8.0)
# (100.0, 0.0, 10.0)
# (100.0, 1.0, 10.0)
#
# ----OUT-----
# (0.0, 2.0)
# (0.0, 3.0)
# (1.0, 7.0)
# (1.0, 8.0)
# (2.0, 9.0)
# (4.0, 11.0)
# (5.0, 8.0)
# (6.0, 8.0)
# (7.0, 10.0)
# (7.0, 11.0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment