Skip to content

Instantly share code, notes, and snippets.

@jeanmonet
Forked from alimanfoo/find_runs.py
Last active December 4, 2021 17:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeanmonet/6757a3f38a15621d33b13179416eb8db to your computer and use it in GitHub Desktop.
Save jeanmonet/6757a3f38a15621d33b13179416eb8db to your computer and use it in GitHub Desktop.
Find runs of consecutive items in a numpy array.
# Python 3.9
from __future__ import annotations
import numpy as np
def find_runs(array: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Find runs of consecutive items in an array that are equal to each other.
Ie. find equal elements that are adjacent to each other.
https://gist.github.com/alimanfoo/c5977e87111abe8127453b21204c1065
"""
# ensure array
array = np.asanyarray(array)
if array.ndim != 1:
raise ValueError('only 1D array supported')
size = array.shape[0]
# handle empty array
if size == 0:
return np.array([]), np.array([]), np.array([])
else:
# find run starts
loc_run_start = np.empty(size, dtype=bool)
loc_run_start[0] = True # The first element is necessarily the start of a run
# Key: two consecutive elements not equal ==> True = 1
# Store result in loc_run_start where first item already set to True
np.not_equal(array[:-1], array[1:], out=loc_run_start[1:])
# Key: index of each element in loc_run_start that is not equal to the next element
# ^-- This means a new run starts at each such point.
# If "False" means element is equal to previous element, thus part of a run
run_starts = np.nonzero(loc_run_start)[0]
# find run values
run_values = array[loc_run_start]
# find run lengths
run_lengths = np.diff( # Diff of two consecutive elements (indices)
# ^-- Calculate the n-th discrete difference along the given axis.
np.append(run_starts, size)) # Appends the last element index (index of x[-1])
return run_values, run_starts, run_lengths
def find_runs_of_item(array: np.ndarray, item) -> tuple[np.ndarray, np.ndarray]:
"""
Find start positions and lengths of consecutive occurrences of a given element, inside an array.
>>> x = np.array([9, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 0, 1, 2, 3, 4, 5, 9, 9, 9, 9])
>>> run_starts, run_lengths = find_runs_of_item(x, 9)
>>> # (array([ 0, 11, 20]), array([2, 3, 4]))
>>> find_runs_of_item(x, 0)
>>> # (array([ 2, 14]), array([1, 1]))
>>> find_runs_of_item(x, 100)
>>> # (array([], dtype=int64), array([], dtype=int64))
"""
item_indexes = np.where(array == item)[0]
if item_indexes.size == 0:
# Empty result
return item_indexes, item_indexes
consecutive_diffs = np.diff(item_indexes)
run_interrupts = np.where(consecutive_diffs > 1)[0] + 1
run_interrupts = np.insert(run_interrupts,
[0, len(run_interrupts)], # where to insert
[0, len(item_indexes)]) # what to insert
run_starts = item_indexes[run_interrupts[:-1]]
run_lengths = np.diff(run_interrupts)
return run_starts, run_lengths
def find_run_of_item_with_min_length(array: np.ndarray, item, min_run_len: int = 2) -> tuple[np.ndarray, np.ndarray]:
"""
Find runs of a minimum length of a certain item inside a given array.
>>> x = np.array([9, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 0, 1, 2, 3, 4, 5, 9, 9, 9, 9])
>>> find_run_of_item_with_min_length(array, item=9, min_run_len=3)
>>> # (array([11, 20]), array([3, 4]))*
>>> find_run_of_item_with_min_length(array, 9, 6) # item not found or no run with such length
>>> # (array([], dtype=int64), array([], dtype=int64))
"""
run_starts, run_lengths = find_runs_of_item(array, item)
ix = np.where(run_lengths >= min_run_len)[0]
return run_starts[ix], run_lengths[ix]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment