Created
March 22, 2020 00:14
-
-
Save koizumihiroo/df83bb72c94153acc4ea9049e395ea93 to your computer and use it in GitHub Desktop.
PMID_32046819
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
0.025 0.050 0.500 0.950 0.975 0.990 | |
2.5% 1.269371 1.774799 5.528946 8.637324 9.118343 9.657112 | |
50% 2.100401 2.660046 6.348415 10.329638 11.071968 11.923006 | |
97.5% 2.973586 3.539892 7.457479 13.962904 15.366447 17.026429 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
CAUTION: | |
The purpose of this script is just to translate the original R script into a Python script; there is | |
no guarantee of the accuracy of its contents. | |
Original Article: | |
CC BY 4.0: | |
The authors' effort in publishing under an open-source license is greatly appreciated. | |
https://www.eurosurveillance.org/content/10.2807/1560-7917.ES.2020.25.5.2000062 | |
https://www.ncbi.nlm.nih.gov/pubmed/32046819 | |
Supplementary Material: | |
https://www.eurosurveillance.org/deliver/fulltext/eurosurveillance/25/5/20-00062_BAKER_SupplementMaterial.zip | |
""" | |
import zipfile | |
from typing_extensions import Final | |
import numpy as np | |
import pandas as pd | |
import pystan | |
import scipy | |
from scipy import stats | |
from scipy.special import gamma | |
from pathlib import Path | |
import requests | |
# https://mc-stan.org/loo/ | |
# https://github.com/avehtari/PSIS | |
# download psis.py into this working directory | |
# Fetch the PSIS-LOO helper module so the `from psis import psisloo` below resolves.
# NOTE(review): this network download runs at import time, on every import of this file.
psis_py = requests.get(
    'https://raw.githubusercontent.com/avehtari/PSIS/master/py/psis.py')
# Save the helper next to this script so it is importable from the working directory.
Path('psis.py').write_bytes(psis_py.content)
from psis import psisloo
# CONST
# URL of the article's supplementary-material archive.
supp_zip: Final[str] = 'https://www.eurosurveillance.org/deliver/fulltext/eurosurveillance/25/5/20-00062_BAKER_SupplementMaterial.zip'
# Path of the data TSV inside the extracted archive.
tsv_pathname: Final[str] = './20-00062_BAKER_SupplementMaterial/BAKER_Supplementary material S1_data.tsv'
# RNG seed passed to Stan sampling for reproducibility.
seed: Final[int] = 1234
def main() -> None:
    """Entry point: fetch the supplement, fit the Stan model, print the summary table."""
    archive = Path('supp.zip')
    download_extract_tsv(archive)
    frame = manipulate_tsv(Path(tsv_pathname))
    fit = run_stan(frame)
    summary = make_output(fit)
    print(summary)
def download_extract_tsv(zip_path: Path) -> None:
    """Download the supplementary-material zip to ``zip_path`` and extract it.

    Side effects: one HTTP GET of ``supp_zip``, writes the archive to
    ``zip_path``, and unpacks its contents into the current working directory.

    :param zip_path: local filename for the downloaded archive
    """
    # download data
    zip_file = requests.get(supp_zip)
    zip_path.write_bytes(zip_file.content)
    # unzip; use a context manager so the archive's file handle is closed
    # (the original left the ZipFile open until garbage collection)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall()
def manipulate_tsv(tsv_path: Path) -> pd.DataFrame:
    """Load the supplementary TSV and convert all dates to integer day offsets.

    Dates become day counts relative to 2019-12-31 (day 0), matching the
    original R script.  Missing exposure-start dates are imputed as the
    earliest plausible infection moment (minimum symptom onset minus a 21-day
    maximum incubation window), and exposure ends are capped at symptom onset.

    :param tsv_path: path to 'BAKER_Supplementary material S1_data.tsv'
    :return: DataFrame with tStartExposure/tEndExposure/tSymptomOnset columns
    """
    # load data
    data = pd.read_table(tsv_path,
                         parse_dates=['reporting date', 'exposure_start', 'exposure_end', 'symptom_onset'])
    # Normalize column names: strip BEFORE replacing spaces — the original
    # stripped afterwards, so leading/trailing blanks in a header would have
    # become stray underscores that strip() could no longer remove.
    data.columns = [x.strip().replace(' ', '_') for x in data.columns]
    def to_diffdays(df_: pd.DataFrame, colname: str) -> pd.Series:
        """Days elapsed since 2019-12-31; NaN where the date is missing."""
        base_date = pd.to_datetime('2019-12-31')
        return (df_[colname] - base_date).dt.days
    # pipe-line style of manipulation
    data2 = data.assign(
        reporting_date=lambda df_: to_diffdays(df_, 'reporting_date'),
        tSymptomOnset=lambda df_: to_diffdays(df_, 'symptom_onset'),
        tStartExposure=lambda df_: to_diffdays(df_, 'exposure_start'),
        tEndExposure=lambda df_: to_diffdays(df_, 'exposure_end')
    ).assign(
        # unknown exposure start -> earliest onset minus the 21-day max incubation
        tStartExposure=lambda df_: np.where(pd.isna(df_['tStartExposure']),
                                            np.min(df_['tSymptomOnset'] - 21),
                                            df_['tStartExposure']),
        # exposure cannot end after symptoms started
        tEndExposure=lambda df_: np.where(df_['tEndExposure'] > df_['tSymptomOnset'],
                                          df_['tSymptomOnset'],
                                          df_['tEndExposure'])
    )
    return data2
def run_stan(data: pd.DataFrame) -> ...:
    """Compile the Weibull incubation-period model in Stan and sample from it.

    return type is unclear to me because I cannot access pystan.StanFit4Model object
    ```
    (Pdb) p type(stanfit)
    <class 'stanfit4anon_model_df568c9723cbf759ca5759b36acb4c6f_2883001500315052076.StanFit4Model'>
    ```
    """
    # Stan expects plain arrays; pull the three timing columns out of the frame.
    stan_data = {'N': len(data)}
    for column in ('tStartExposure', 'tEndExposure', 'tSymptomOnset'):
        stan_data[column] = data[column].to_numpy()
    # stan
    model_code = """
    data{
    int <lower = 1> N;
    vector[N] tStartExposure;
    vector[N] tEndExposure;
    vector[N] tSymptomOnset;
    }
    parameters{
    real<lower = 0> alphaInc; 	// Shape parameter of weibull distributed incubation period
    real<lower = 0> sigmaInc; 	// Scale parameter of weibull distributed incubation period
    vector<lower = 0, upper = 1>[N] uE;	// Uniform value for sampling between start and end exposure
    }
    transformed parameters{
    vector[N] tE; 	// infection moment
    tE = tStartExposure + uE .* (tEndExposure - tStartExposure);
    }
    model{
    // Contribution to likelihood of incubation period
    target += weibull_lpdf(tSymptomOnset -  tE  | alphaInc, sigmaInc);
    }
    generated quantities {
    // likelihood for calculation of looIC
    vector[N] log_lik;
    for (i in 1:N) {
    log_lik[i] = weibull_lpdf(tSymptomOnset[i] -  tE[i]  | alphaInc, sigmaInc);
    }
    }
    """
    compiled = pystan.StanModel(model_code=model_code)
    fitted = compiled.sampling(
        data=stan_data,
        warmup=4000,
        iter=14000,
        chains=8,
        seed=seed  # Added for reproducibility
    )
    return fitted
def make_output(stanfit) -> pd.DataFrame:
    """Summarise the posterior draws into the published percentile table.

    Prints the PSIS-LOO model-fit estimate and posterior summaries of the
    Weibull mean/sd, then returns the posterior median and 95% CI of the
    incubation-period percentiles (rows: 2.5%/50%/97.5% posterior quantiles;
    columns: incubation-period quantiles 0.025 ... 0.99).

    :param stanfit: fitted pystan model exposing ``['log_lik']`` and ``.extract()``
    :return: 3x6 DataFrame of incubation-period percentile estimates
    """
    loo, loos, ks = psisloo(stanfit['log_lik'])
    # see modelfit
    print(f"loo: {loo}")
    ext = stanfit.extract()
    alpha = ext['alphaInc']  # Weibull shape draws
    sigma = ext['sigmaInc']  # Weibull scale draws
    probs = [2.5, 50, 97.5]
    # posterior median and 95%CI of mean: E[X] = sigma * Gamma(1 + 1/alpha).
    # The original computed these summaries but discarded them (in R they
    # auto-print); print them so the script actually reports mean and sd.
    mean_summary = np.percentile(sigma * gamma(1 + 1 / alpha),
                                 probs)
    print(f"mean [2.5%, 50%, 97.5%]: {mean_summary}")
    # posterior median and 95%CI of sd: Var[X] = sigma^2 (Gamma(1+2/a) - Gamma(1+1/a)^2)
    sd_summary = np.percentile(np.sqrt(sigma**2 * (gamma(1 + 2 / alpha) - (gamma(1 + 1 / alpha))**2)),
                               probs)
    print(f"sd [2.5%, 50%, 97.5%]: {sd_summary}")
    qs = [0.025, 0.05, 0.5, 0.95, 0.975, 0.99]
    # Python: stats.weibull_min.ppf(0.025, c=4.13336, loc=0, scale=7.317828)
    # R:      qweibull(p = 0.025, shape = 4.13336, scale = 7.317828)
    # posterior median and 95%CI of percentiles
    percentiles = np.array([
        np.percentile(
            stats.weibull_min.ppf(x, c=alpha, loc=0, scale=sigma),
            probs)
        for x
        in qs
    ])
    out = pd.DataFrame(
        percentiles.T,
        index=["2.5%", "50%", "97.5%"],
        columns=qs)
    return out
# Script entry point: only run the pipeline when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment