Skip to content

Instantly share code, notes, and snippets.

@rrealrangel
Created August 1, 2020 02:13
Show Gist options
  • Save rrealrangel/9ed1b3a17d4572f0a62794e06ee209d8 to your computer and use it in GitHub Desktop.
Save rrealrangel/9ed1b3a17d4572f0a62794e06ee209d8 to your computer and use it in GitHub Desktop.
Estimar la tendencia del número de nuevos casos diarios de COVID-19 en los municipios de Baja California usando los conjuntos de datos de la Base de Datos de COVID-19 de la Dirección General de Epidemiología de la Secretaría de Salud, disponible en https://bit.ly/2XvIsPX.
# -*- coding: utf-8 -*-
"""
GENERAR FIGURAS DE LA EVOLUCIÓN DE CASOS DE COVID-19 EN MÉXICO.
Created on Fri May 22 10:53:26 2020
@author: r.realrangel@gmail.com
"""
from pathlib import Path
import datetime as dt
import numpy as np
import pandas as pd
from matplotlib import dates as mdates
from matplotlib import pyplot as plt
from scipy.interpolate import interp1d
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity
from sklearn.model_selection import KFold
from statsmodels.nonparametric.smoothers_lowess import lowess
# %% Inputs
INP_DATE = dt.datetime(year=2020, month=7, day=31) # Fecha que se desea calcular.
INP_DATA_DIR = 'C:/Users/...' # Directorio en que se almacenan los archivos de la Base de Datos de COVID-19.
INP_OUTPUT_DIR = 'C:/Users/...' # Directorio donde se desea exportar el archivo PNG con la figura generada.
INP_DATA_CATALOGUE = 'C:/Users/...' # Ruta completa del archivo Catalogos_0412.xlsx, de la Base de Datos de COVID-19.
)
INP_MONTH_NUMBER = {
1: 'enero', 2: 'febrero', 3: 'marzo', 4: 'abril', 5: 'mayo', 6: 'junio',
7: 'julio', 8: 'agosto', 9: 'septiembre', 10: 'octubre', 11: 'noviembre',
12: 'diciembre'
}
INP_POPULATION = {
'Ensenada': 466814,
'Mexicali': 936826,
'Playas De Rosarito': 90668,
'Tecate': 101079,
'Tijuana': 1559683
}
INP_STATE_NUMBER = 2
INP_MUNICIPALITIES = 'each'
# %% FUNCTIONS.
def load_result(db, region, resultado, today, retraso=14):
"""Load results."""
df = db.loc[
(db['ENTIDAD_RES'] == region['state_numb']) &
(db['MUNICIPIO_RES'] == region['munic_numb']) &
(db['RESULTADO'] == resultado)
]
df = df.iloc[:, 0].groupby(by=df.index).count()
df.name = 'CASOS'
new_index = pd.date_range(start=df.index[0], end=today)
df = df.reindex(index=new_index).fillna(0)
df = df / (INP_POPULATION[region['munic_name']] / 100000)
df = df.loc[:today - dt.timedelta(days=retraso)]
return df
def tsdec_lowess(x, y, decomposition='mult', frac='optimal', it=100):
"""Estimate the trend of a signal with a LOWESS model."""
def kde_cdf(x, x_steps=500, bandwidth=np.linspace(0, 1, 41)[1:]):
"""
Kernel Density Estimation with Scikit-learn.
Source: https://bit.ly/2Ozk8Jf
"""
x_range = x.max() - x.min()
x_grid_min = x.min() - (x_range)
x_grid_max = x.max() + (x_range)
x_grid = np.linspace(x_grid_min, x_grid_max, x_steps)
grid = GridSearchCV(
estimator=KernelDensity(), param_grid={'bandwidth': bandwidth},
cv=5
)
grid.fit(x[:, None])
bandwidth_best = grid.best_params_['bandwidth']
# Mirror the data about the domain boundary
# Source: https://bit.ly/3frE8ZL
low_bound = 0
x_mir = np.concatenate((x, 2 * low_bound - x))
kde_mir = KernelDensity(bandwidth=bandwidth_best)
kde_mir.fit(x_mir[:, np.newaxis])
# score_samples() returns the log-likelihood of the samples
pdf_mir = np.exp(kde_mir.score_samples(x_grid[:, np.newaxis]))
pdf_mir[x_grid <= low_bound] = 0
pdf_mir = pdf_mir * 2
cdf_mir = pdf_mir.cumsum() * (
(x_grid.max() - x_grid.min()) / (len(x_grid) - 1)
)
f = interp1d(x_grid, cdf_mir)
return f(x)
def is_outlier_peak(x, thresh=0.05):
n = len(x)
diffs = []
for i in range(len(x)):
f = i - 2
t = i + 3
if i <= 1:
f = 0
elif i >= n - 2:
t = n - 1
ref = np.nanmean(x[f:t][x[f:t] != x[i]])
diffs.append(np.abs(x[i] - ref))
diffs = np.where(np.isnan(diffs), 0, diffs)
P = kde_cdf(x=np.array(diffs))
is_peak = P >= (1 - thresh)
return is_peak
def is_outlier_rate(x, thresh=0.05):
rates = np.abs(np.diff(a=x))
P = kde_cdf(x=np.array(rates))
high_rate = P >= (1 - thresh)
aux = True
for i in np.where(high_rate)[0]:
# The high rates in pair indices (2, 4, 6, etc.) correspond to
# the recuperation from the high rates in odd indices (1, 3, 5,
# etc.), so they are removed from the output.
high_rate[i] = aux
aux = not aux
return np.concatenate([[False], high_rate])
def _optimal_f(x, y, f_num=101, it=100):
# Define the optimal value for f.
f_grid = np.linspace(start=0, stop=1, num=f_num)
f_grid = f_grid[f_grid != 0]
rmse_f = {}
for f in f_grid:
rmse_i = []
for i in range(it):
kf = KFold(n_splits=3, shuffle=True, random_state=i)
rmse_k = []
for trn, tst in kf.split(y):
x_trn = x[trn]
y_trn = y[trn]
x_tst = x[tst]
y_tst = y[tst]
y_trn_hat = lowess(endog=y_trn, exog=x_trn, frac=f)[:, 1]
y_tst_hat = np.interp(
x=x_tst, xp=x_trn, fp=y_trn_hat, left=np.nan,
right=np.nan
)
rmse_k.append(
np.sqrt(np.nanmean((y_tst - y_tst_hat) ** 2))
)
rmse_i.append(np.mean(rmse_k))
rmse_f[f] = np.nanmean(rmse_i)
return [f for f in rmse_f if rmse_f[f] == min(rmse_f.values())][-1]
# Flag outliers.
# is_outlier = is_outlier_peak(y) | is_outlier_rate(y)
# x_cleaned = x[~ is_outlier]
# y_cleaned = y[~ is_outlier]
x_cleaned = x
y_cleaned = y
# Time series decomposition.
if frac == 'optimal':
frac_used = _optimal_f(x=x_cleaned, y=y_cleaned, f_num=21, it=100)
else:
frac_used = frac
trend = lowess(endog=y_cleaned, exog=x_cleaned, frac=frac_used)[:, 1]
trend = np.interp(x=x, xp=x_cleaned, fp=trend)
if decomposition == 'add':
residual = y - trend
elif decomposition == 'mult':
residual = y / trend
return (trend, residual)
# %% DATA.
dbcat_municip = pd.read_excel(
io=INP_DATA_CATALOGUE, sheet_name='Catálogo MUNICIPIOS', index_col=[-1, 0]
)
dbcat_states = pd.read_excel(
io=INP_DATA_CATALOGUE, sheet_name='Catálogo de ENTIDADES', index_col=0
)
input_files = sorted(list(Path(INP_DATA_DIR).glob(pattern='**/*.csv')))
file = [
i for i in input_files if i.stem.startswith(INP_DATE.strftime('%y%m%d'))
]
file = file[0]
db = pd.read_csv(file, encoding='latin')
db.index = pd.to_datetime(db['FECHA_SINTOMAS'])
db = db.loc[db['ENTIDAD_RES'] == INP_STATE_NUMBER, :]
aux_pos = {}
aux_pos_trend = {}
aux_pos_ratio = {}
if INP_MUNICIPALITIES == 'each':
# 999: Municipio no especificado.
munic_numb = sorted(
db['MUNICIPIO_RES'].unique()[db['MUNICIPIO_RES'].unique() != 999]
)
for i in munic_numb:
region = {'state_numb': INP_STATE_NUMBER, 'munic_numb': i}
region['state_name'] = dbcat_states.loc[
region['state_numb'], 'ENTIDAD_FEDERATIVA'
].title()
region['munic_name'] = dbcat_municip.loc[
(region['state_numb'], region['munic_numb']), 'MUNICIPIO'
].title()
pos = load_result(db=db, region=region, resultado=1, today=INP_DATE)
neg = load_result(db=db, region=region, resultado=2, today=INP_DATE)
sos = load_result(db=db, region=region, resultado=3, today=INP_DATE)
new_index = pd.date_range(start=pos.index[0], end=pos.index[-1])
neg = neg.reindex(index=new_index).fillna(value=0)
sos = sos.reindex(index=new_index).fillna(value=0)
pos_ratio = (pos / (pos + neg))
aux_pos_ratio[region['munic_name']] = np.nanmean(pos_ratio.iloc[-30:])
pos_modif = pos + (sos * aux_pos_ratio[region['munic_name']])
pos_trend, _ = tsdec_lowess(
x=np.arange(len(pos_modif)), y=pos_modif.values, decomposition='mult',
frac='optimal', it=100
)
aux_pos[region['munic_name']] = pd.Series(
data=pos_modif, index=pos_modif.index
)
aux_pos_trend[region['munic_name']] = pd.Series(
data=pos_trend, index=pos_modif.index
)
print("{}, ready!".format(region['munic_name']))
pos = pd.DataFrame(aux_pos).loc['2020-03-01':]
pos_trend = pd.DataFrame(aux_pos_trend).loc['2020-03-01':]
# %% Plot
plt.rcParams["figure.figsize"] = (3.9370, 5.9055)
fig, ax = plt.subplots(len(munic_numb), 1)
figure_title = (
'COVID-19 en los municipios de\nBaja California,\n'
'al {} de {} de {}'.format(
INP_DATE.strftime('%#d'), INP_MONTH_NUMBER[INP_DATE.month],
INP_DATE.strftime('%Y')
)
)
ax[0].set_title(
label=figure_title,
fontsize=14
)
for m, munic in enumerate(pos_trend.columns):
ax[m].grid(b=True, which='both')
ax[m].set_axisbelow(True)
ax[m].text(
x=0.01,
y=0.8,
s=munic,
transform=ax[m].transAxes
)
# Casos confirmados (observado).
ax[m].scatter(
x=pos.index, y=pos[munic], s=2, c='k', marker='.',
label='Casos diarios registrados', zorder=3
)
# Casos confirmados (tendencia).
ax[m].plot(
pos_trend.index, pos_trend[munic], color=[239/255, 71/255, 111/255],
lw=3, label='Evolución de la tendencia', zorder=2
)
# Jornada Nacional de Sana Distancia.
jnsd_i = dt.datetime(year=2020, month=3, day=23)
jnsd_f = dt.datetime(year=2020, month=6, day=1)
ax[m].axvspan(
xmin=jnsd_i, xmax=jnsd_f, color='0.9', zorder=-1,
label='Jornada Nacional de Sana Distancia'
)
# Formato de los ejes.
ax[m].set_xlim(dt.datetime(year=2020, month=3, day=1), INP_DATE)
myFmt = mdates.DateFormatter("%d-%m")
ax[m].xaxis.set_major_formatter(myFmt)
ax[m].xaxis.set_ticks(
pd.date_range(
start=dt.datetime(year=2020, month=3, day=1), end=INP_DATE,
freq='MS'
)
)
ax[m].set_ylim([0, 15])
ax[m].yaxis.set_ticks([0, 5, 10, 15])
if m < 4:
ax[m].tick_params(
# axis='y', # changes apply to the x-axis
which='both', # both major and minor ticks are affected
bottom=False, # ticks along the bottom edge are off
top=False, # ticks along the top edge are off
left=True,
labelbottom=False, # labels along the bottom edge are off
labelleft=True)
else:
ax[m].tick_params(
# axis='x', # changes apply to the x-axis
which='both', # both major and minor ticks are affected
bottom=True, # ticks along the bottom edge are off
top=False, # ticks along the top edge are off
left=True,
labelbottom=True, # labels along the bottom edge are off
labelleft=True)
ax[m].set_xlabel('Fecha de inicio de síntomas (dd-mm)')
ax[2].set_ylabel('Nuevos casos diarios por cada 100 000 habitantes')
handles, labels = ax[m].get_legend_handles_labels()
# sort both labels and handles by labels
labels, handles = zip(*sorted(zip(labels, handles), key=lambda t: t[0]))
ax[m].legend(handles, labels, loc=(0.01, -1.44), frameon=False)
disclaimer = [
'La información presentada se proporciona en las condiciones en que se '
'encuentra, sin que se\nasumala obligación de ofrecer ningún tipo de '
'garantía. El autor se limita a proporcionar la\ninformación en los '
'términos más precisos posibles derivada de la base de COVID-19, '
'publicada\npor la Dirección General de Epidemiología de la Secretaría de '
'Salud, disponible en\n'
'https://www.gob.mx/salud/documentos/datos-abiertos-152127 y '
'consultados el {}. Así\nmismo, el autor no será responsable de '
'las posibles imprecisiones, errores u omisiones\ncontenidos en dicha '
'base de datos, así como daños directos, indirectos o riesgos '
'financieros\nasociados al uso de esta.'.format(
INP_DATE.strftime('%d/%m/%Y')
)
]
for r, row in enumerate(disclaimer):
fig.text(
x=0.05,
y=-0.18 - (r * 0.015),
s=row,
ha='left',
fontdict=dict(size=5, color='gray'),
wrap=True
)
fig.savefig(
fname=INP_OUTPUT_DIR + '/tendencias_{date}.png'.format(
date=INP_DATE.strftime('%Y%m%d')
),
dpi=400,
bbox_inches="tight"
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment