@mcourteaux
Created April 25, 2022 10:00
Starting point for a chunking algorithm to replace TS Flex chunking.
import pandas as pd
import numpy as np
from typing import Dict, List, Union, Tuple, Optional
df1 = pd.read_parquet("smartphone_acceleration.parquet")
df2 = pd.read_parquet("wearable_acceleration.parquet")
df1.set_index("timestamp", inplace=True)
df2.set_index("timestamp", inplace=True)
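# Give each source its own column suffix ("_A"/"_B") so that columns from the
# two DataFrames stay distinguishable when they are combined later.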
df1.rename(columns={str(x): str(x) + "_A" for x in df1.columns}, inplace=True)
df2.rename(columns={str(x): str(x) + "_B" for x in df2.columns}, inplace=True)
def my_chunk_data(data: Dict[str, pd.DataFrame], fs_dict: Dict[str, float]):
    """Simple chunking algorithm for multiple time-overlapping DataFrames.

    Returns a list of ChunkGroup structs, each of which carries a beginning
    and ending timestamp, together with a dictionary of the corresponding
    sections of the given DataFrames. Every ChunkGroup contains a region of
    data where _all_ given DataFrames contained data. This means: this
    algorithm makes chunks based on the intersection of all time ranges found
    in the given DataFrames. A time range is given by a sequence of samples
    which are close enough together to be considered continuous. This sense of
    "close enough" is determined by fs_dict, which gives the expected sampling
    rate of the samples (i.e. rows) of each DataFrame.
    """
    from datetime import timedelta

    assert len(data) == len(fs_dict)

    _DEBUG = False
    def chunk(series, max_gap):
        # Mark every row whose time difference to the previous row exceeds
        # max_gap: such a row starts a new chunk.
        gaps = series.index.to_series().diff() > timedelta(seconds=max_gap)
        gaps.iloc[0] = True
        gaps.reset_index(drop=True, inplace=True)
        boundaries: List[int] = gaps.index[gaps].to_list()
        # Close the last chunk at the end of the series, so the final row is
        # included as well.
        boundaries.append(len(series))
        chunks = []
        for (i_begin, i_end) in zip(boundaries, boundaries[1:]):
            chunks.append(series.iloc[i_begin:i_end])
        return chunks
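    # Illustrative example (hypothetical numbers): for a 32 Hz signal the gap
    # threshold used below is 1.5 / 32 ≈ 0.047 s. Timestamps at
    #   0.000, 0.031, 0.062, 0.500, 0.531 s
    # would therefore be split into two chunks, [0.000 .. 0.062] and
    # [0.500 .. 0.531], because the 0.438 s jump exceeds that threshold.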
    keys = list(data.keys())
    # Split every input DataFrame into contiguous chunks. A gap larger than
    # 1.5 sample periods (1.5 / fs) breaks the data into separate chunks.
    chunked: Dict[str, List[pd.DataFrame]] = {
        name: chunk(data[name], 1.5 / fs_dict[name]) for name in keys
    }

    # We need a concept like "chunk groups": a collection of chunks from
    # different series that match together. Those will be dicts too.
    if _DEBUG:
        for k in chunked.keys():
            print("Base chunks for %s: %d" % (k, len(chunked[k])))
    def ts_intersect(ref_b, ref_e, test_b, test_e):
        # "FullyRight"/"FullyLeft": the test range lies entirely to the right/
        # left of the reference range; otherwise return the overlap interval.
        if ref_e <= test_b: return "FullyRight"
        if test_e <= ref_b: return "FullyLeft"
        return max(ref_b, test_b), min(ref_e, test_e)
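    # Example (hypothetical intervals): with a reference range of 10:00-10:05,
    # a test range of 10:06-10:08 is "FullyRight", 09:50-09:55 is "FullyLeft",
    # and 10:03-10:07 yields the overlap (10:03, 10:05).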
    class ChunkGroup:
        def __init__(self, ts_begin: pd.Timestamp, ts_end: pd.Timestamp, df_dict: Dict[str, pd.DataFrame]):
            self.ts_begin = ts_begin
            self.ts_end = ts_end
            self.df_dict = df_dict

        def __str__(self):
            return "ChunkGroup(ts_begin=%s, ts_end=%s, df_dict=%s)" % (self.ts_begin, self.ts_end, self.df_dict)

        def __repr__(self):
            return "ChunkGroup(ts_begin=%s, ts_end=%s, df_dict=%s)" % (self.ts_begin, self.ts_end, self.df_dict.keys())

    def split_chunk_group(ch_group, ts):
        chg_A = ChunkGroup(ch_group.ts_begin, ts, {k: ch_group.df_dict[k][:ts] for k in ch_group.df_dict.keys()})
        chg_B = ChunkGroup(ts, ch_group.ts_end, {k: ch_group.df_dict[k][ts:] for k in ch_group.df_dict.keys()})
        return chg_A, chg_B
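    # Note on split_chunk_group: label slicing on a DatetimeIndex is inclusive
    # on both ends, so a row that falls exactly on the split timestamp ts
    # would appear in both halves. The splits below happen at chunk boundaries
    # padded by half a sample period, so this is unlikely, but worth knowing.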
    def merge_chunks(
            merged_chunks: List[ChunkGroup],
            remaining_chunks: List[pd.DataFrame],
            remaining_key: str):
        if _DEBUG:
            print("merge_chunks(... (len=%d), ... (len=%d), key=%s)" % (len(merged_chunks), len(remaining_chunks), remaining_key))
        idx_m = 0
        while idx_m < len(merged_chunks):
            # We selected a resulting chunk. This one needs to be either cut
            # down, or even split and re-added to the result list.
            merged = merged_chunks[idx_m]
            if _DEBUG:
                print(" chunk:", merged.ts_begin, "-->", merged.ts_end)
            idx_r = 0
            found_overlap = False
            while idx_r < len(remaining_chunks):
                rch = remaining_chunks[idx_r]
                # Pad the chunk of the remaining key by half a sample period
                # on both sides before testing for overlap.
                rch_b = rch.index[0] - timedelta(seconds=0.5 / fs_dict[remaining_key])
                rch_e = rch.index[-1] + timedelta(seconds=0.5 / fs_dict[remaining_key])
                if _DEBUG:
                    print(" ", "remaining", rch_b, "-->", rch_e)
                intersection = ts_intersect(merged.ts_begin, merged.ts_end, rch_b, rch_e)
                if intersection == "FullyLeft":
                    # This tested chunk is fully left of the reference, so it
                    # will never be merged. To speed up the rest of the
                    # algorithm, we can delete it from the list.
                    if _DEBUG:
                        print(" ", intersection)
                    del remaining_chunks[idx_r]  # TODO instead of slow del, just move the index starting point up
                    continue
                elif intersection == "FullyRight":
                    if _DEBUG:
                        print(" ", intersection)
                    # This chunk might be merged in the future. Do nothing here.
                    # However, all next chunks are even more to the right, so
                    # we can stop this loop.
                    break
                else:
                    # There is an actual intersection. We need to cut out the
                    # intersection from this chunk, and cut out this
                    # intersection from ALL the elements of the currently
                    # merged chunk group.
                    is_b, is_e = intersection
                    if _DEBUG:
                        print(" ", "intersection:", is_b, "->", is_e, "=", is_e - is_b)
                    to_insert = rch[is_b:is_e]
                    if is_b > merged.ts_begin:
                        if _DEBUG:
                            print(" ", "Shrink front")
                        merged.ts_begin = is_b
                        merged.df_dict = {k: merged.df_dict[k][merged.ts_begin:] for k in merged.df_dict.keys()}
                    if is_e < merged.ts_end:
                        if _DEBUG:
                            print(" ", "Split back")
                        del merged_chunks[idx_m]
                        chg_A, chg_B = split_chunk_group(merged, is_e)
                        merged_chunks.insert(idx_m, chg_A)
                        merged_chunks.insert(idx_m + 1, chg_B)
                        merged = chg_A
                    # Insert into the chunk group.
                    merged.df_dict[remaining_key] = to_insert
                    # We have found an overlapping group of chunks and
                    # processed it. Let's keep it simple and go to the next
                    # chunk group.
                    found_overlap = True
                    break
                idx_r += 1
            # If none of the remaining chunks overlapped, this chunk group is
            # unmatched. This means we will delete this chunk group.
            if not found_overlap:
                del merged_chunks[idx_m]
            else:
                idx_m += 1
    # Convert the DataFrame chunks of the first key to the "merged chunks"
    # format: one ChunkGroup per chunk, spanning that chunk's first and last
    # timestamp. Then fold in the chunks of every other key, one key at a
    # time; each pass intersects the current chunk groups with that key's
    # chunks.
    merged_chunks = [ChunkGroup(ch.index[0], ch.index[-1], {keys[0]: ch}) for ch in chunked[keys[0]]]
    for key in keys[1:]:
        merge_chunks(merged_chunks, chunked[key], key)
    return merged_chunks
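
# Illustrative sketch (not part of the original data flow): a tiny synthetic
# example of what my_chunk_data computes. Signal "A" has a gap in the middle,
# signal "B" only covers part of the timeline, so the chunk groups should
# cover only the regions where both signals have contiguous data. All names
# and numbers here are made up for demonstration purposes.
def _demo_my_chunk_data():
    fs = 4.0  # 4 Hz -> one sample every 0.25 s
    t0 = pd.Timestamp("2022-04-25 10:00:00")
    # "A": 0..10 s and 20..30 s (a gap of about 10 s in between).
    idx_a = t0 + pd.to_timedelta(
        list(np.arange(0, 10, 1 / fs)) + list(np.arange(20, 30, 1 / fs)), unit="s"
    )
    # "B": 5..25 s, continuous.
    idx_b = t0 + pd.to_timedelta(np.arange(5, 25, 1 / fs), unit="s")
    demo_a = pd.DataFrame({"x_A": np.random.randn(len(idx_a))}, index=idx_a)
    demo_b = pd.DataFrame({"x_B": np.random.randn(len(idx_b))}, index=idx_b)
    demo_chunks = my_chunk_data(data={"A": demo_a, "B": demo_b}, fs_dict={"A": fs, "B": fs})
    # Expected: two chunk groups, roughly 5..10 s and 20..25 s after t0.
    for cg in demo_chunks:
        print("demo chunk group:", cg.ts_begin, "-->", cg.ts_end)

# _demo_my_chunk_data()  # Uncomment to try the algorithm without the parquet files.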
# TS Flex chunking (note that TS Flex chunking returns a different structure;
# the printing code below cannot be used to inspect its result).
# import tsflex
# import tsflex.chunking
# chunks = tsflex.chunking.chunk_data(data={"A": df1, "B": df2}, fs_dict={"A": 4, "B": 4})

# Sane chunking
chunks = my_chunk_data(data={"A": df1, "B": df2}, fs_dict={"A": 32, "B": 32})
print()
print()
print("Found", len(chunks), "chunks")
for chunk in chunks:
    print(chunk.ts_begin, "-->", chunk.ts_end)
    for k in chunk.df_dict.keys():
        print(k, ":", chunk.df_dict[k].index[0], "->", chunk.df_dict[k].index[-1])
    print()
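
# One possible way to consume the chunk groups (a sketch, not from the
# original gist): since the columns were suffixed "_A"/"_B" above, the
# per-source DataFrames of a chunk group can be joined on their time index
# into a single aligned DataFrame per chunk.
for chunk in chunks:
    combined = pd.concat(list(chunk.df_dict.values()), axis=1)
    print("combined chunk", chunk.ts_begin, "-->", chunk.ts_end, "shape:", combined.shape)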