Skip to content

Instantly share code, notes, and snippets.

@Voyz
Created October 23, 2022 14:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Voyz/454d55f8f76c2b3b65e52708c9da886e to your computer and use it in GitHub Desktop.
Save Voyz/454d55f8f76c2b3b65e52708c9da886e to your computer and use it in GitHub Desktop.
Vertices of a timeseries
def _drop_collinear(values):
    """Drop points that sit strictly between a lower and a higher neighbour.

    A point is kept when "greater than the previous point" and "greater
    than the next point" agree: both true (local maximum) or both false
    (local minimum, plateau, or an edge whose missing neighbour compares
    as false).
    """
    above_prev = values > values.shift(1)
    above_next = values > values.shift(-1)
    return values[above_prev == above_next]


def _drop_small_legs(values, threshold, mutual):
    """Drop points whose distance to their neighbours is below ``std / threshold``.

    With ``mutual=True`` both legs (to the previous and to the next point)
    must exceed the limit; otherwise one long leg is enough. The first and
    last points have a NaN leg, so they never survive a mutual pass.
    """
    leg_before = (values - values.shift(1)).abs()
    leg_after = (values - values.shift(-1)).abs()
    limit = values.std() / threshold
    if mutual:
        keep = (leg_before > limit) & (leg_after > limit)
    else:
        keep = (leg_before > limit) | (leg_after > limit)
    return values[keep]


def _alternate(peak_positions, valley_positions):
    """Return peaks and valleys filtered so that they strictly alternate.

    Vertices are visited in positional order; a vertex of the same kind as
    the last kept one is discarded, so the earliest of each run wins.
    """
    tagged = sorted(
        [(int(pos), True) for pos in peak_positions]
        + [(int(pos), False) for pos in valley_positions]
    )
    kept = {True: [], False: []}
    last_kind = None
    for position, is_peak in tagged:
        if is_peak == last_kind:
            continue
        kept[is_peak].append(position)
        last_kind = is_peak
    return kept[True], kept[False]


def timeseries_vertices(ser: pd.Series, distance=15, coeff=3):
    """Find the peaks and valleys of a simplified version of a time series.

    The series is first reduced to its significant turning points
    (alternating passes that remove points lying on monotonic runs and
    points with short legs), always keeping the first and last sample.
    The gaps are then filled by index-based interpolation, and
    ``scipy.signal.find_peaks`` is run on the resulting signal.

    Args:
        ser: Input series. It may be unnamed; the index must be usable by
            ``Series.interpolate('index')``.
        distance: Forwarded to ``find_peaks`` as ``distance``.
        coeff: The prominence threshold for the "prominent" results is
            ``downsampled.std() / coeff``.

    Returns:
        A 5-tuple ``(peaks, ppeaks, valleys, pvalleys, downsampled)``:

        * ``peaks`` / ``valleys`` -- positions found with only the
          ``distance`` constraint.
        * ``ppeaks`` / ``pvalleys`` -- positions found with the additional
          prominence and ``width=3`` constraints, with position 0 added as
          the opposite kind of the first vertex and then filtered so that
          peaks and valleys alternate. If either set is empty, they fall
          back to the start and end positions of ``ser`` (the higher end
          is the peak).
        * ``downsampled`` -- the simplified, interpolated series, indexed
          like ``ser``.

        All positions are integer offsets into ``ser``, not index labels.
        An empty ``ser`` yields four empty lists and an empty copy.
    """
    size = ser.shape[0]
    if size == 0:
        return [], [], [], [], ser.copy()

    # Work on a positional index so the result does not depend on the
    # series name or on the labels of the original index.
    work = pd.Series(ser.to_numpy(), index=pd.RangeIndex(size))
    work = _drop_collinear(work)
    work = _drop_small_legs(work, 10, False)
    work = _drop_collinear(work)
    work = _drop_small_legs(work, 15, True)
    work = _drop_small_legs(work, 10, True)
    work = _drop_collinear(work)

    # Keep the surviving turning points plus both ends; blank out the rest
    # and let interpolation draw straight segments between them.
    keep = np.zeros(size, dtype=bool)
    keep[np.asarray(work.index, dtype=np.intp)] = True
    keep[0] = True
    keep[-1] = True
    downsampled = ser.where(keep).interpolate('index').ffill().bfill()

    signal = downsampled.to_numpy(dtype=float)
    prominence = downsampled.std() / coeff
    peaks, _ = find_peaks(signal, distance=distance)
    ppeaks, _ = find_peaks(signal, distance=distance, prominence=prominence, width=3)
    valleys, _ = find_peaks(-signal, distance=distance)
    pvalleys, _ = find_peaks(-signal, distance=distance, prominence=prominence, width=3)

    if len(ppeaks) == 0 or len(pvalleys) == 0:
        # No usable prominent vertices: report the start and end instead.
        # The fallback valley belongs in pvalleys, leaving the raw
        # `valleys` result untouched.
        starts_high = ser.iloc[0] > ser.iloc[-1]
        ppeaks = [0 if starts_high else size - 1]
        pvalleys = [size - 1 if starts_high else 0]
    else:
        # Position 0 becomes the opposite kind of whichever vertex comes
        # first, so the sequence starts at the beginning of the series.
        if ppeaks[0] < pvalleys[0]:
            pvalleys = np.insert(pvalleys, 0, 0)
        else:
            ppeaks = np.insert(ppeaks, 0, 0)
        ppeaks, pvalleys = _alternate(ppeaks, pvalleys)

    return list(peaks), list(ppeaks), list(valleys), list(pvalleys), downsampled
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment