Created April 25, 2022 10:00
Starting point for a chunking algorithm intended to replace the tsflex chunking.
import pandas as pd
import numpy as np
from typing import Dict, List, Union, Tuple, Optional
# Load both recordings and index each one by its "timestamp" column.
df1, df2 = (
    pd.read_parquet(path).set_index("timestamp")
    for path in ("smartphone_acceleration.parquet", "wearable_acceleration.parquet")
)
# Tag every column with a per-source suffix so the frames stay distinguishable.
df1 = df1.rename(columns={str(col): str(col) + "_A" for col in df1.columns})
df2 = df2.rename(columns={str(col): str(col) + "_B" for col in df2.columns})
class ChunkGroup:
    """A time window together with the matching slice of each DataFrame.

    ts_begin / ts_end -- bounds of the window (pandas Timestamps).
    df_dict -- maps each data key to the rows of its DataFrame inside the window.
    """

    def __init__(self, ts_begin: pd.Timestamp, ts_end: pd.Timestamp, df_dict: Dict[str, pd.DataFrame]):
        self.ts_begin = ts_begin
        self.ts_end = ts_end
        self.df_dict = df_dict

    def __str__(self):
        return "ChunkGroup(ts_begin=%s, ts_end=%s, df_dict=%s)" % (self.ts_begin, self.ts_end, self.df_dict)

    def __repr__(self):
        return "ChunkGroup(ts_begin=%s, ts_end=%s, df_dict=%s)" % (self.ts_begin, self.ts_end, self.df_dict.keys())


def _chunk_contiguous(frame: pd.DataFrame, max_gap: float) -> List[pd.DataFrame]:
    """Split *frame* into runs of rows whose consecutive timestamps are at most
    *max_gap* seconds apart. Every row ends up in exactly one run."""
    if frame.empty:
        return []
    # Label slicing later on requires a monotonic index.
    if not frame.index.is_monotonic_increasing:
        frame = frame.sort_index()
    deltas = frame.index.to_series().diff()  # first entry is NaT -> compares False
    starts = np.flatnonzero((deltas > pd.Timedelta(seconds=max_gap)).to_numpy())
    # Positional boundaries; len(frame) as the final exclusive end keeps the last row.
    bounds = [0, *starts.tolist(), len(frame)]
    return [frame.iloc[b:e] for b, e in zip(bounds, bounds[1:])]


def _ts_intersect(ref_b, ref_e, test_b, test_e) -> Union[str, Tuple[pd.Timestamp, pd.Timestamp]]:
    """Intersect [ref_b, ref_e] with [test_b, test_e].

    Returns "FullyRight" if the test range lies entirely after the reference,
    "FullyLeft" if it lies entirely before it, else the (begin, end) overlap.
    """
    if ref_e <= test_b:
        return "FullyRight"
    if test_e <= ref_b:
        return "FullyLeft"
    return max(ref_b, test_b), min(ref_e, test_e)


def _split_chunk_group(group: ChunkGroup, ts: pd.Timestamp) -> Tuple[ChunkGroup, ChunkGroup]:
    """Split *group* at *ts* into a head [ts_begin, ts] and a tail [ts, ts_end].

    A row stamped exactly at *ts* goes to the head only, so it is never
    emitted twice.
    """
    head = ChunkGroup(group.ts_begin, ts, {k: df.loc[:ts] for k, df in group.df_dict.items()})
    tail = ChunkGroup(ts, group.ts_end, {k: df.loc[df.index > ts] for k, df in group.df_dict.items()})
    return head, tail


def _merge_chunks(
        groups: List[ChunkGroup],
        remaining: List[pd.DataFrame],
        key: str,
        half_period: pd.Timedelta,
        debug: bool) -> List[ChunkGroup]:
    """Intersect time-ordered *groups* with the time-ordered chunks of *key*.

    Returns new groups that each also carry a ``key`` entry. Groups without any
    overlapping chunk are dropped. A group that extends past an overlapping
    chunk is split, and its tail is matched against the following chunks.
    """
    if debug:
        print("merge_chunks(... (len=%d), ... (len=%d), key=%s)" % (len(groups), len(remaining), key))
    result: List[ChunkGroup] = []
    # remaining[:r] ends before every group still to be processed (both lists
    # are time-ordered), so we advance a pointer instead of deleting.
    r = 0
    for group in groups:
        current: Optional[ChunkGroup] = group
        while current is not None:
            if debug:
                print(" chunk:", current.ts_begin, "-->", current.ts_end)
            overlap = None
            while r < len(remaining):
                rch = remaining[r]
                # Pad by half a sample period so the chunk covers its samples' span.
                rch_b = rch.index[0] - half_period
                rch_e = rch.index[-1] + half_period
                if debug:
                    print(" ", "remaining", rch_b, "-->", rch_e)
                intersection = _ts_intersect(current.ts_begin, current.ts_end, rch_b, rch_e)
                if intersection == "FullyLeft":
                    # Ends before this (and every later) group: never needed again.
                    if debug:
                        print(" ", intersection)
                    r += 1
                    continue
                if intersection == "FullyRight":
                    # Starts after this group; later chunks lie even further right.
                    if debug:
                        print(" ", intersection)
                else:
                    overlap = intersection
                break
            if overlap is None:
                break  # unmatched: drop this group (and nothing of it remains)
            is_b, is_e = overlap
            rch = remaining[r]
            if debug:
                print(" ", "intersection:", is_b, "->", is_e, "=", is_e - is_b)
            if is_b > current.ts_begin:
                if debug:
                    print(" ", "Shrink front")
                current = ChunkGroup(is_b, current.ts_end,
                                     {k: df.loc[is_b:] for k, df in current.df_dict.items()})
            tail = None
            if is_e < current.ts_end:
                if debug:
                    print(" ", "Split back")
                current, tail = _split_chunk_group(current, is_e)
            current.df_dict[key] = rch.loc[is_b:is_e]
            result.append(current)
            current = tail  # the tail (if any) is matched against later chunks
    return result


def my_chunk_data(data: Dict[str, pd.DataFrame], fs_dict: Dict[str, float], *,
                  debug: Optional[bool] = None) -> List[ChunkGroup]:
    """Simple chunking algorithm for multiple time-overlapping DataFrames.

    Returns a list of ChunkGroup objects. Each has a beginning and an ending
    timestamp, together with a dictionary of the corresponding sections of the
    given DataFrames. Every ChunkGroup covers a region where *all* given
    DataFrames contain data: this algorithm makes chunks from the intersection
    of the time ranges found in each DataFrame. A time range is a sequence of
    samples close enough together to be considered continuous: consecutive
    samples more than 1.5 expected sample periods apart start a new range. The
    expected sampling rate (rows per second) of each DataFrame is read from
    *fs_dict*.

    Parameters:
        data: maps a key to a DataFrame indexed by timestamps.
        fs_dict: maps each key of *data* to its expected sampling rate in Hz.
        debug: print tracing output. When None, a module-level ``_DEBUG``
            flag is used if one is defined (otherwise False).

    Returns:
        Time-ordered ChunkGroups. Each has one non-empty DataFrame slice per key.

    Raises:
        ValueError: if a key of *data* has no positive rate in *fs_dict*.
    """
    missing = [k for k in data if k not in fs_dict]
    if missing:
        raise ValueError("fs_dict is missing sampling rates for: %s" % missing)
    bad_rates = [k for k in data if not fs_dict[k] > 0]
    if bad_rates:
        raise ValueError("sampling rates must be positive for: %s" % bad_rates)
    if debug is None:
        debug = bool(globals().get("_DEBUG", False))
    if not data:
        return []

    keys = list(data)
    chunked: Dict[str, List[pd.DataFrame]] = {
        name: _chunk_contiguous(data[name], 1.5 / fs_dict[name]) for name in keys
    }
    if debug:
        for name, chunks in chunked.items():
            print("Base chunks for %s: %d" % (name, len(chunks)))

    # Seed the chunk groups with the chunks of the first key, then intersect
    # with each further key in turn.
    first = keys[0]
    groups = [ChunkGroup(ch.index[0], ch.index[-1], {first: ch}) for ch in chunked[first]]
    for key in keys[1:]:
        half_period = pd.Timedelta(seconds=0.5 / fs_dict[key])
        groups = _merge_chunks(groups, chunked[key], key, half_period, debug)

    # An overlap may lie entirely within the half-sample padding and so contain
    # no rows for some key; such groups do not satisfy "all frames have data".
    return [g for g in groups if all(len(df) > 0 for df in g.df_dict.values())]
# tsflex chunking, kept for comparison. It returns a differently structured
# result, so the printing code below cannot be reused to inspect it:
#import tsflex
#import tsflex.chunking
#chunks = tsflex.chunking.chunk_data(data={"A": df1, "B": df2}, fs_dict={"A": 4, "B": 4})

# Our own chunking; both recordings are given an expected rate of 32 samples/s.
chunks = my_chunk_data(
    data={"A": df1, "B": df2},
    fs_dict={"A": 32, "B": 32},
)
print("Found", len(chunks), "chunks")
for chunk in chunks:
    print(chunk.ts_begin, "-->", chunk.ts_end)
    # Show the first and last timestamp of each source inside this chunk.
    for k, part in chunk.df_dict.items():
        print(k, ":", part.index[0], "->", part.index[-1])
