Created
April 25, 2022 10:00
-
-
Save mcourteaux/fc6edd7feaa180da224782933a7b6079 to your computer and use it in GitHub Desktop.
Starting point for chunking algorithm to replace TS Flex chunking.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
from typing import Dict, List, Union, Tuple, Optional | |
def _load_indexed(path, suffix):
    """Read a parquet file, index it by "timestamp" and suffix its columns.

    The suffix keeps the column names of the two recordings apart.
    """
    frame = pd.read_parquet(path).set_index("timestamp")
    renames = {str(col): f"{col!s}{suffix}" for col in frame.columns}
    return frame.rename(columns=renames)


# Smartphone recording gets the "_A" suffix, wearable recording "_B".
df1 = _load_indexed("smartphone_acceleration.parquet", "_A")
df2 = _load_indexed("wearable_acceleration.parquet", "_B")
def my_chunk_data(data: Dict[str, pd.DataFrame], fs_dict: Dict[str, float]) -> list:
    """Chunk several time-indexed DataFrames on their common time ranges.

    Every DataFrame is first split into continuous runs: consecutive rows stay
    in the same run as long as they are at most ``1.5 / fs`` seconds apart,
    where ``fs`` is the expected sampling rate taken from ``fs_dict``. The runs
    of the first DataFrame seed the result. The runs of every further
    DataFrame are then intersected with it, so each returned ChunkGroup covers
    a time range in which *all* DataFrames have a run. For that intersection
    the runs of every DataFrame but the first are widened by half a sample
    period on both sides.

    Args:
        data: DataFrames keyed by name. The index of each is expected to be a
            chronologically sorted datetime index; this is relied upon but not
            checked.
        fs_dict: Expected sampling rate in rows per second for every key of
            ``data``.

    Returns:
        A list of ChunkGroup objects. Each has ``ts_begin`` and ``ts_end``
        timestamps and a ``df_dict`` mapping every key of ``data`` to the rows
        of that DataFrame inside the range. Because of the half-period
        widening, the frame of a non-first key can be empty when its widened
        run overlaps the range without a sample falling inside it.

    Raises:
        ValueError: If ``data`` and ``fs_dict`` do not have the same keys.
    """
    import logging
    from datetime import timedelta

    # Explicit check instead of ``assert``: asserts vanish under ``python -O``.
    if set(data) != set(fs_dict):
        raise ValueError("data and fs_dict must have exactly the same keys")

    log = logging.getLogger(__name__)

    class ChunkGroup:
        """A time range plus, per key, the rows of that DataFrame inside it."""

        def __init__(self, ts_begin: pd.Timestamp, ts_end: pd.Timestamp, df_dict: Dict[str, pd.DataFrame]):
            self.ts_begin = ts_begin
            self.ts_end = ts_end
            self.df_dict = df_dict

        def __str__(self):
            return "ChunkGroup(ts_begin=%s, ts_end=%s, df_dict=%s)" % (self.ts_begin, self.ts_end, self.df_dict)

        def __repr__(self):
            # Only the keys: a repr holding whole DataFrames is unreadable.
            return "ChunkGroup(ts_begin=%s, ts_end=%s, df_dict=%s)" % (self.ts_begin, self.ts_end, self.df_dict.keys())

    def split_on_gaps(frame, max_gap):
        """Split ``frame`` into runs whose rows are at most ``max_gap`` seconds apart."""
        if len(frame) == 0:
            return []
        # True at every row that lies more than max_gap after its predecessor.
        # The first row compares against NaT and is therefore never flagged.
        is_gap = frame.index.to_series().diff() > timedelta(seconds=max_gap)
        run_starts = is_gap.to_numpy().nonzero()[0].tolist()
        # Row 0 opens the first run and len(frame) closes the last one. Using
        # the position of the last row as the closing bound would drop that
        # row, because the slices below are half-open.
        bounds = [0, *run_starts, len(frame)]
        return [frame.iloc[lo:hi] for lo, hi in zip(bounds, bounds[1:])]

    def split_group(group, ts):
        """Cut ``group`` at ``ts`` into a left and a right ChunkGroup."""
        left = ChunkGroup(group.ts_begin, ts, {k: df.loc[:ts] for k, df in group.df_dict.items()})
        right = ChunkGroup(ts, group.ts_end, {k: df.loc[ts:] for k, df in group.df_dict.items()})
        return left, right

    def merge_key(groups, runs, key):
        """Intersect ``groups`` (modified in place) with the runs of ``key``."""
        log.debug("merge key=%s: %d groups, %d runs", key, len(groups), len(runs))
        half_period = timedelta(seconds=0.5 / fs_dict[key])
        # Runs before this position end before the current group starts, and
        # hence before every later group too. Advancing a cursor replaces
        # repeatedly deleting the head of the list.
        first_run = 0
        idx = 0
        while idx < len(groups):
            group = groups[idx]
            log.debug("  group: %s --> %s", group.ts_begin, group.ts_end)
            matched = False
            while first_run < len(runs):
                run = runs[first_run]
                run_begin = run.index[0] - half_period
                run_end = run.index[-1] + half_period
                if group.ts_end <= run_begin:
                    # The run starts after this group ends. It may match a
                    # later group, and all following runs are later still.
                    break
                if run_end <= group.ts_begin:
                    # The run ends before this group starts: never usable again.
                    first_run += 1
                    continue
                # Real overlap: restrict the group to the common range.
                begin = max(group.ts_begin, run_begin)
                end = min(group.ts_end, run_end)
                log.debug("    intersection: %s -> %s", begin, end)
                if begin > group.ts_begin:
                    group.ts_begin = begin
                    group.df_dict = {k: df.loc[begin:] for k, df in group.df_dict.items()}
                if end < group.ts_end:
                    # The group reaches past the run. Keep the overlapping part
                    # and queue the rest right behind it, so it can still be
                    # matched against the following runs.
                    group, rest = split_group(group, end)
                    groups[idx:idx + 1] = [group, rest]
                group.df_dict[key] = run.loc[begin:end]
                matched = True
                break
            if matched:
                idx += 1
            else:
                # No run of this key overlaps the group, so it cannot be part
                # of the intersection of all keys.
                del groups[idx]

    keys = list(data)
    if not keys:
        return []
    runs_by_key = {key: split_on_gaps(data[key], 1.5 / fs_dict[key]) for key in keys}

    # The runs of the first key seed the result; every other key narrows it.
    first_key, *other_keys = keys
    groups = [ChunkGroup(run.index[0], run.index[-1], {first_key: run}) for run in runs_by_key[first_key]]
    for key in other_keys:
        merge_key(groups, runs_by_key[key], key)
    return groups
# TS Flex chunking. Note that TS Flex chunking returns something different, so
# the printing code below cannot be used to inspect its result.
#import tsflex | |
#import tsflex.chunking | |
#chunks = tsflex.chunking.chunk_data(data={"A": df1, "B": df2}, fs_dict={"A": 4, "B": 4}) | |
# Sane chunking | |
# Chunk both recordings on their common time ranges; both are passed with an
# expected sampling rate of 32.
chunks = my_chunk_data(
    data={"A": df1, "B": df2},
    fs_dict={"A": 32, "B": 32},
)

# Two blank lines before the report.
print(end="\n\n")
print("Found", len(chunks), "chunks")
for group in chunks:
    print(group.ts_begin, "-->", group.ts_end)
    # First and last timestamp actually present in each frame of the group.
    for name, frame in group.df_dict.items():
        print(name, ":", frame.index[0], "->", frame.index[-1])
    print()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment