Last active
July 31, 2020 07:36
-
-
Save prockenschaub/4fe4cd0ac995fef42cdacbb342d9ca77 to your computer and use it in GitHub Desktop.
Awkward-Pandas timeseries example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
import awkward1 as awk | |
# TODO: add typing | |
from typing import Type | |
def is_iloc_key(key):
    """
    Check whether a key can be forwarded to a positional ``.iloc`` indexer.

    Parameters
    ----------
    key : object
        A single indexing key, i.e. one element of a (possibly
        multi-dimensional) key.

    Returns
    -------
    bool
        True if ``key`` is an ``int``, a ``slice``, a callable, or a numpy
        array with an integer or boolean dtype; False otherwise.
    """
    if isinstance(key, (int, slice)):
        return True
    if isinstance(key, np.ndarray):
        # ``np.int`` and ``np.bool`` were plain aliases of the builtins and
        # have been removed from numpy. The abstract scalar types used here
        # match integer (signed/unsigned, any width) and boolean arrays.
        return (
            np.issubdtype(key.dtype, np.integer)
            or np.issubdtype(key.dtype, np.bool_)
        )
    return callable(key)
class TimeSeries(pd.Series):
    """
    An extension of the pandas.Series object that implements additional
    functionality if the underlying data structure is a timeseries (represented
    as an awkward array).
    """

    _metadata = ["name"]

    @property
    def _constructor(self) -> Type["TimeSeries"]:
        # Makes pandas operations return TimeSeries instead of pandas.Series.
        return TimeSeries

    def __init__(self, data=None, index=None, **kwargs):
        """
        Build a TimeSeries; arguments are forwarded to pandas.Series.

        Parameters
        ----------
        data
            Forwarded to pandas.Series as the data argument.
        index
            Forwarded to pandas.Series as ``index``.
        **kwargs
            Remaining pandas.Series keyword arguments. ``name`` is extracted
            and defaults to None.
        """
        name = kwargs.pop("name", None)
        super().__init__(data, index=index, name=name, **kwargs)

    @property
    def is_timedata(self):
        """
        Is the underlying stored data a time series

        Returns
        -------
        boolean
            True if the dtype of the series is the awkward extension dtype.
        """
        return isinstance(self.dtype, awk._connect._pandas.AwkwardDtype)

    def __getitem__(self, key):
        """
        Index series similar to pandas.Series. Also accepts a two-dimensional
        key, in which case the first dimension corresponds to the observations
        (like regular pandas.Series indexing) and the second dimension
        corresponds to time (only applied if underlying data is a timeseries).

        Only the ``start`` and ``stop`` attributes of the time key are used;
        a slice step is ignored.

        Returns
        -------
        TimeSeries
        """
        if isinstance(key, tuple) and len(key) == 2:
            obsv_key, time_key = key
            res = super().__getitem__(obsv_key)
            if self.is_timedata:
                return res.slice_time(time_key.start, time_key.stop)
            # Not a timeseries: the time key is dropped.
            return res

        return super().__getitem__(key)

    def slice_time(self, start, end):
        """
        Slice a time series across the time axis. Entries are selected by
        comparing the ``t`` field of the stored records with the bounds.

        Parameters
        ----------
        start : int or None
            minimum time step to include (inclusive); None means no lower
            bound
        end : int or None
            upper bound on the time step (exclusive); None means no upper
            bound

        Returns
        -------
        TimeSeries

        Raises
        ------
        TypeError
            If the underlying data is not a timeseries.
        """
        if not self.is_timedata:
            raise TypeError('The underlying data is not a timeseries type.')

        values = self._values
        times = values.t

        # Open-ended slices such as ``[:, :5]`` or ``[:, 3:]`` arrive with a
        # None bound; comparing against None would fail, so only build the
        # comparisons that are actually requested.
        mask = None
        if start is not None:
            mask = times >= start
        if end is not None:
            before_end = times < end
            mask = before_end if mask is None else mask & before_end

        if mask is not None:
            values = values[mask]

        return self._constructor(values, index=self.index, name=self.name)

    def summarise_over_time(self, awk_func):
        """
        Create a one-dimensional summary of each timeseries, for example the
        mean across time. The function is applied to the ``v`` field of the
        stored records along ``axis=1``.

        Parameters
        ----------
        awk_func : function
            a summary function implemented in awkward1, like awk.mean()

        Returns
        -------
        pandas.Series
            A plain series with the same index and name as this one.
        """
        return pd.Series(
            awk_func(self._values.v, axis=1).to_numpy(),
            index=self.index, name=self.name
        )
class TimeFrame(pd.DataFrame):
    """
    A TimeFrame object is a subclassed pandas.DataFrame that has one or more
    columns containing time series data.

    See Also
    --------
    sktime.container.TimeArray
    sktime.container.TimeSeries
    pd.DataFrame
    """

    @property
    def _constructor(self) -> Type["TimeFrame"]:
        # Makes pandas operations return TimeFrame instead of pandas.DataFrame.
        return TimeFrame

    # Single columns/rows extracted from the frame become TimeSeries objects.
    _constructor_sliced: Type[TimeSeries] = TimeSeries

    @property
    def _constructor_expanddim(self):
        raise NotImplementedError("Not supported for TimeFrames!")

    def __init__(self,
                 data=None,
                 index=None,
                 columns=None,
                 copy=False,
                 dtype=None):
        """
        Build a TimeFrame; arguments are forwarded to pandas.DataFrame.

        ``dtype`` is accepted (last, so existing positional calls keep
        working) because pandas may pass it when re-creating a frame through
        ``_constructor``.
        """
        super().__init__(data, index=index, columns=columns,
                         dtype=dtype, copy=copy)

    def __getitem__(self, key):
        """
        Extends pd.DataFrame.__getitem__ by prioritising calls that are
        compatible with .iloc by directly forwarding them. For example, the
        following leads to an error in pd.DataFrame but not in TimeFrame:

        > df[:2, 'x']

        Also allows for slicing in time by adding a third index:

        > df[:2, 'x', 1:5]

        Parameters
        ----------
        key
            Any input allowed for pd.DataFrame.__getitem__. Additionally allows
            for a 2-tuple (rows, columns) or a 3-tuple, where the third key
            relates to the time dimension. Only ``start`` and ``stop`` of the
            time key are used.

        Returns
        -------
        TimeFrame or TimeSeries
        """
        # Tuples with fewer than two entries cannot hold a (row, column) pair;
        # hand them to pandas like any other key.
        if not (isinstance(key, tuple) and len(key) >= 2):
            return super().__getitem__(key)

        row_key, col_key = key[0], key[1]
        rows_positional = is_iloc_key(row_key)
        cols_positional = is_iloc_key(col_key)

        if rows_positional and cols_positional:
            subset = self.iloc[row_key, col_key]
        elif rows_positional:
            # positional rows, label-based columns
            subset = self.iloc[row_key, :].loc[:, col_key]
        elif cols_positional:
            # label-based rows, positional columns
            subset = self.loc[row_key, :].iloc[:, col_key]
        else:
            subset = self.loc[row_key, col_key]

        if len(key) == 3:
            time_key = key[2]
            if isinstance(subset, TimeSeries):
                # A single column was selected, so there are no columns to
                # iterate over; slice the series itself when it holds
                # timeseries data.
                if subset.is_timedata:
                    subset = subset.slice_time(time_key.start, time_key.stop)
            else:
                for c in subset.columns.intersection(self.time_columns):
                    subset[c] = subset[c].slice_time(
                        time_key.start, time_key.stop
                    )

        return subset

    @property
    def time_columns(self):
        """
        Names of all columns that contain timeseries data

        Returns
        -------
        pandas.Index
        """
        is_time = [self[c].is_timedata for c in self.columns]
        return self.columns[is_time]
# -------------------------------------------------------------------------------------
# Example usage (very simple)
# Simulate 100 patient encounters with a random length of stay (drawn via
# np.random.randint(20, 80) hours). Heart rate and temperature are recorded
# once per hour as records with a time field 't' and a value field 'v'.
np.random.seed(42)

pat_ids = np.arange(100)
los_hours = np.random.randint(20, 80, 100)  # Length of stay

# One list of hourly records per patient.
heart_rate = [
    [{'t': hour, 'v': np.random.randint(20, 80)} for hour in range(stay)]
    for stay in los_hours
]
# Temperature series are up to 20 hours shorter or 19 hours longer than the stay.
temperature = [
    [
        {'t': hour, 'v': 36. + 2 * np.random.randn()}
        for hour in range(stay + np.random.randint(-20, 20))
    ]
    for stay in los_hours
]

heart_rate_awk = awk.Array(heart_rate)
temperature_awk = awk.Array(temperature)

df = TimeFrame(dict(
    id=pat_ids,
    len=los_hours,
    heart_rate=heart_rate_awk,
    temperature=temperature_awk,
))

# A TimeFrame can be sliced across patients, columns and the time index at once...
df[:2, ['heart_rate', 'temperature'], 1:5]

# ... and its TimeSeries columns provide simple functionality to
# manipulate/summarise timeseries
df['heart_rate'].summarise_over_time(awk.mean)

# The TimeFrame also knows meta-data, like which columns are timeseries and
# which are scalar columns
df.columns
df.time_columns

# After replacing a timeseries column by its summary, query the timeseries
# columns again
df['heart_rate'] = df['heart_rate'].summarise_over_time(awk.mean)
df.time_columns
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment