Skip to content

Instantly share code, notes, and snippets.

@evan-burke
Last active June 15, 2020 20:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save evan-burke/981c1cc258c0a7dbac3c55c7f3a47902 to your computer and use it in GitHub Desktop.
Save evan-burke/981c1cc258c0a7dbac3c55c7f3a47902 to your computer and use it in GitHub Desktop.
Detect repeated values in a series and assign an index to each sequence
import pandas as pd
# Use case: triggering an alert only if, say, monitoring is outside of a desired value for 4 hours in a row
def detect_sequential_failures(series, how_many):
    """Label every run of at least ``how_many`` consecutive True values.

    Typical usage::

        df['my_condition_evaluation'] = df['testcol'] < threshold
        df['failure_seq'] = detect_sequential_failures(
            df['my_condition_evaluation'], 3)

    Args:
        series: A pd.Series whose values are Python bools. True marks a row
            where the monitored condition failed.
        how_many: Minimum number of consecutive True values a run needs in
            order to be labelled.

    Returns:
        A nullable-integer (``Int64``, pandas 0.24+) pd.Series with the same
        length and index as ``series``. Rows that are False, or that belong
        to a run of True shorter than ``how_many``, are <NA>. Rows in a
        qualifying run hold that run's id. Ids count only qualifying runs,
        so they are consecutive and start at 1 (1-indexed so that no id is
        falsey).

    Raises:
        TypeError: If ``series`` is not a pd.Series, or if it contains a
            value that is not a bool. (TypeError is a subclass of Exception,
            so callers catching Exception are unaffected.)

    Runs in O(n) for a series of n rows.
    """
    if not isinstance(series, pd.Series):
        raise TypeError("input is not a pd.Series!")

    total = len(series)

    # Pass 1: record the half-open positional span [start, stop) of every run
    # of True. Positions come from enumerate(), not from the index labels, so
    # the function works for any index (datetime, filtered, duplicated, ...).
    runs = []
    run_start = None
    for pos, value in enumerate(series):
        if not isinstance(value, bool):
            raise TypeError("input series contains non-bool values!")
        if value:
            if run_start is None:
                run_start = pos
        elif run_start is not None:
            runs.append((run_start, pos))
            run_start = None
    # A run that reaches the final row is never closed by a False value.
    if run_start is not None:
        runs.append((run_start, total))

    # Pass 2: hand out ids to the runs that are long enough.
    labels = [None] * total
    seq_id = 0
    for start, stop in runs:
        length = stop - start
        if length >= how_many:
            seq_id += 1
            labels[start:stop] = [seq_id] * length

    # Nullable integer dtype keeps the ids integral alongside missing values.
    return pd.Series(labels, index=series.index, dtype=pd.Int64Dtype())
# Self-checks, executed when the module runs. Each case builds a boolean
# series by comparing readings against 50 and compares the labelled output
# with a hand-written expectation.

# how_many=4: only the leading run of four True values is long enough.
rates1 = pd.Series([25] * 4 + [100] * 4 + [25] * 2 + [100] * 2) < 50
r1_resp_four = pd.Series([1] * 4 + [None] * 8, dtype="Int64")
d4 = detect_sequential_failures(rates1, 4)
assert d4.equals(r1_resp_four)

# how_many=2: the leading run of four and the later run of two both qualify.
r1_resp_fourtwo = pd.Series(
    [1] * 4 + [None] * 4 + [2] * 2 + [None] * 2,
    dtype="Int64",
)
d42 = detect_sequential_failures(rates1, 2)
assert d42.equals(r1_resp_fourtwo)

# how_many=5: no run in the series is that long, so every row is missing.
r1_resp_5 = pd.Series([None] * 12, dtype="Int64")
d42 = detect_sequential_failures(rates1, 5)
assert d42.equals(r1_resp_5)

# how_many=4 with the qualifying run at the very end of the series.
rates2 = pd.Series([100] * 8 + [25] * 4) < 50
r2_resp_four = pd.Series([None] * 8 + [1] * 4, dtype="Int64")
d2 = detect_sequential_failures(rates2, 4)
assert d2.equals(r2_resp_four)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment