Created
April 9, 2021 09:32
-
-
Save xenatisch/a54a273c4ee65c3e3da1240263d620c3 to your computer and use it in GitHub Desktop.
Rolling change calculations - UK Coronavirus Dashboard
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin python3 | |
# Imports | |
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Python: | |
import logging | |
# 3rd party: | |
from pandas import DataFrame, to_datetime | |
from numpy import NaN | |
# Internal: | |
try: | |
from __app__.utilities import func_logger | |
except ImportError: | |
from utilities import func_logger | |
from utilities.latest_data import get_latest_csv | |
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
# Header | |
__author__ = "Pouria Hadjibagheri" | |
__copyright__ = "Copyright (c) 2020, Public Health England" | |
__license__ = "MIT" | |
__version__ = "0.0.1" | |
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
__all__ = [ | |
'change_by_sum' | |
] | |
def col2datetime(d, col, format): | |
d.loc[:, col] = to_datetime(d.loc[:, col], format=format) | |
return d | |
def datetime2str(d, col, format): | |
d.loc[:, col] = d.loc[:, col].map(lambda x: x.strftime(format)) | |
return d | |
def get_directions(d, col): | |
d.loc[(d[col] < 0), col] = -1 | |
d.loc[(d[col] > 0), col] = 1 | |
d.loc[:, col] = d.loc[:, col].replace({-1: "DOWN", 0: "SAME", 1: "UP"}) | |
return d | |
def replace_all_zero(d): | |
if d.sum() == 0: | |
d[:] = NaN | |
return d | |
def calculate_percentage_change(d): | |
fraction = (d.iloc[7] / (d.iloc[0] or 1)) - 1 | |
if fraction == -1: | |
fraction = 0 | |
return fraction * 100 | |
@func_logger("change and direction by rolling sum calculation") | |
def change_by_sum(data: DataFrame, metrics, min_sum_allowed=None, min_sum_sub=None) -> DataFrame: | |
""" | |
Parameters | |
---------- | |
data : DataFrame | |
metrics : Iterable[str] | |
min_sum_allowed : int | |
min_sum_sub : int | |
All values in rolling sum that are smaller than ``min_sum_allowed`` are substituted | |
with ``min_sum_sub``. The latter is expected to be smaller than the former to prevent | |
conflicts. At the end of the process, all calculated columns carrying ``min_sum_sub``, | |
including the metric column, are substituted with ``NaN`` - . | |
Returns | |
------- | |
DataFrame | |
""" | |
metrics = set(metrics).intersection(data.columns) | |
data.sort_values( | |
["areaType", "areaCode", "date"], | |
ascending=[True, True, True], | |
inplace=True | |
) | |
logging.info(">> Starting to calculate the rolling metrics for") | |
date_fmt = "%Y-%m-%d" | |
date = "date" | |
unique_loc_qualifiers = ["areaType", "areaCode"] | |
unique_record_qualifiers = [*unique_loc_qualifiers, date] | |
for col_name in metrics: | |
rolling_sum_cols = [*unique_record_qualifiers, col_name] | |
rolling_sum = f"{col_name}RollingSum" | |
change = f"{col_name}Change" | |
direction = f"{col_name}Direction" | |
change_percentage = f"{col_name}ChangePercentage" | |
logging.info(f"\t{col_name}") | |
d = data.loc[:, rolling_sum_cols] | |
d.loc[:, col_name] = d.loc[:, col_name].astype(float) | |
if rolling_sum not in data.columns: | |
df_rsum = ( | |
d | |
.loc[:, rolling_sum_cols] | |
.pipe(col2datetime, col=date, format=date_fmt) | |
.groupby(unique_loc_qualifiers) | |
.rolling(7, on=date) | |
.sum() | |
.rename(columns={col_name: rolling_sum}) | |
.reset_index() | |
.loc[:, [*unique_record_qualifiers, rolling_sum]] | |
.pipe(datetime2str, col=date, format=date_fmt) | |
.set_index(unique_record_qualifiers) | |
) | |
logging.info("\t\tCalculated rolling sum") | |
try: | |
data.date = data.date.map(lambda x: x.strftime(date_fmt)) | |
except AttributeError: | |
# Already string | |
pass | |
if min_sum_allowed is not None: | |
df_rsum.loc[df_rsum[rolling_sum] < min_sum_allowed, rolling_sum] = min_sum_sub | |
data = ( | |
data | |
.set_index(unique_record_qualifiers) | |
.join(df_rsum, on=unique_record_qualifiers) | |
.reset_index() | |
) | |
logging.info("\t\tJoined rolling sum to dataset") | |
data.loc[:, rolling_sum] = ( | |
data | |
.groupby(unique_loc_qualifiers)[rolling_sum] | |
.apply(replace_all_zero) | |
) | |
logging.info(f"\t\tGrouped data by {unique_loc_qualifiers}") | |
df_tmp = data.loc[:, [*unique_record_qualifiers, rolling_sum]] | |
df_tmp = df_tmp.assign(**{ | |
change: ( | |
df_tmp | |
.pipe(col2datetime, col=date, format=date_fmt) | |
.loc[:, [*unique_loc_qualifiers, rolling_sum]] | |
.groupby(unique_loc_qualifiers) | |
.diff(periods=7) | |
), | |
direction: ( | |
df_tmp | |
.pipe(col2datetime, col=date, format=date_fmt) | |
.loc[:, [*unique_loc_qualifiers, rolling_sum]] | |
.groupby(unique_loc_qualifiers) | |
.diff(periods=7) | |
.pipe(get_directions, col=rolling_sum) | |
) | |
}) | |
logging.info("\t\tCalculated rolling change (diff)") | |
percentage_value = ( | |
df_tmp | |
.pipe(col2datetime, col=date, format=date_fmt) | |
.loc[:, [*unique_record_qualifiers, rolling_sum]] | |
.groupby(unique_loc_qualifiers) | |
.rolling(window=8, on=date)[rolling_sum] | |
.apply(calculate_percentage_change) | |
.round(1) | |
.to_frame(change_percentage) | |
) | |
logging.info("\t\tCalculated percentage change") | |
df_tmp = ( | |
df_tmp | |
.join(percentage_value, on=unique_record_qualifiers) | |
.pipe(datetime2str, col=date, format=date_fmt) | |
.set_index(unique_record_qualifiers) | |
.loc[:, [change, direction, change_percentage]] | |
) | |
logging.info("\t\tJoined percentage to other rolling figures") | |
data = ( | |
data | |
.join(df_tmp, on=unique_record_qualifiers) | |
.reset_index(drop=True) | |
) | |
logging.info("\t\tJoined rolling figures to main dataset") | |
data.loc[ | |
data.loc[:, col_name].isnull(), | |
[rolling_sum, change, direction, change_percentage] | |
] = NaN | |
logging.info("\t\tFinalised the data") | |
if min_sum_allowed is not None: | |
data.loc[ | |
data[rolling_sum] == min_sum_sub, | |
[rolling_sum, change, direction, change_percentage, col_name] | |
] = NaN | |
return data |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment