@koizumihiroo
Created March 22, 2020 00:14
PMID_32046819
Script output: rows give the posterior 2.5%, 50% (median), and 97.5% percentiles of each
Weibull incubation-period quantile (columns, in days).
       0.025    0.050    0.500     0.950     0.975     0.990
2.5%   1.269371 1.774799 5.528946  8.637324  9.118343  9.657112
50%    2.100401 2.660046 6.348415 10.329638 11.071968 11.923006
97.5%  2.973586 3.539892 7.457479 13.962904 15.366447 17.026429
"""
CAUTION:
The porpose of this script is just to translate original R script into Python script; there is
no guarantee of accuracy of contents.
Original Article:
CC BY 4.0:
Greatly appreciated the effort of the publishing as open source license.
https://www.eurosurveillance.org/content/10.2807/1560-7917.ES.2020.25.5.2000062
https://www.ncbi.nlm.nih.gov/pubmed/32046819
Supplementary Material:
https://www.eurosurveillance.org/deliver/fulltext/eurosurveillance/25/5/20-00062_BAKER_SupplementMaterial.zip
"""

import zipfile
from pathlib import Path

import numpy as np
import pandas as pd
import pystan
import requests
from scipy import stats
from scipy.special import gamma
from typing_extensions import Final

# PSIS-LOO (https://mc-stan.org/loo/, https://github.com/avehtari/PSIS):
# download psis.py into the working directory so the import below can find it
psis_py = requests.get(
    'https://raw.githubusercontent.com/avehtari/PSIS/master/py/psis.py')
Path('psis.py').write_bytes(psis_py.content)
from psis import psisloo  # noqa: E402  (must come after the download above)
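# Note (added): psisloo(log_lik) returns a triple (loo, loos, ks): the total PSIS-LOO
# estimate of expected log predictive density, its pointwise contributions, and the
# Pareto k diagnostics (k values above roughly 0.7 signal unreliable estimates).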

# CONST
supp_zip: Final[str] = 'https://www.eurosurveillance.org/deliver/fulltext/eurosurveillance/25/5/20-00062_BAKER_SupplementMaterial.zip'
tsv_pathname: Final[str] = './20-00062_BAKER_SupplementMaterial/BAKER_Supplementary material S1_data.tsv'
seed: Final[int] = 1234


def main() -> None:
    download_extract_tsv(Path('supp.zip'))
    data = manipulate_tsv(Path(tsv_pathname))
    stanfit = run_stan(data)
    out = make_output(stanfit)
    print(out)


def download_extract_tsv(zip_path: Path) -> None:
    # download data
    zip_file = requests.get(supp_zip)
    zip_path.write_bytes(zip_file.content)
    # unzip
    zipfile.ZipFile(zip_path).extractall()


def manipulate_tsv(tsv_path: Path) -> pd.DataFrame:
    # load data
    data = pd.read_table(
        tsv_path,
        parse_dates=['reporting date', 'exposure_start', 'exposure_end', 'symptom_onset'])
    data.columns = [x.replace(' ', '_').strip() for x in data.columns]

    def to_diffdays(df_: pd.DataFrame, colname: str) -> pd.Series:
        # days elapsed since 2019-12-31
        base_date = pd.to_datetime('2019-12-31')
        return (df_[colname] - base_date).dt.days

    # pipeline-style manipulation
    data2 = data.assign(
        reporting_date=lambda df_: to_diffdays(df_, 'reporting_date'),
        tSymptomOnset=lambda df_: to_diffdays(df_, 'symptom_onset'),
        tStartExposure=lambda df_: to_diffdays(df_, 'exposure_start'),
        tEndExposure=lambda df_: to_diffdays(df_, 'exposure_end')
    ).assign(
        # missing exposure start: fall back to 21 days before the earliest symptom onset
        tStartExposure=lambda df_: np.where(pd.isna(df_['tStartExposure']),
                                            np.min(df_['tSymptomOnset'] - 21),
                                            df_['tStartExposure']),
        # exposure cannot end after symptom onset
        tEndExposure=lambda df_: np.where(df_['tEndExposure'] > df_['tSymptomOnset'],
                                          df_['tSymptomOnset'],
                                          df_['tEndExposure'])
    )
    return data2


def run_stan(data: pd.DataFrame) -> ...:
    """
    The concrete return type is unclear: pystan compiles the model into an anonymous
    module at runtime, so its StanFit4Model class cannot be imported for annotation.
    ```
    (Pdb) p type(stanfit)
    <class 'stanfit4anon_model_df568c9723cbf759ca5759b36acb4c6f_2883001500315052076.StanFit4Model'>
    ```
    """
    input_data = {
        'N': len(data),
        'tStartExposure': data['tStartExposure'].to_numpy(),
        'tEndExposure': data['tEndExposure'].to_numpy(),
        'tSymptomOnset': data['tSymptomOnset'].to_numpy(),
    }
    # Stan model code
    model_code = """
    data {
      int<lower = 1> N;
      vector[N] tStartExposure;
      vector[N] tEndExposure;
      vector[N] tSymptomOnset;
    }
    parameters {
      real<lower = 0> alphaInc;  // Shape parameter of Weibull-distributed incubation period
      real<lower = 0> sigmaInc;  // Scale parameter of Weibull-distributed incubation period
      vector<lower = 0, upper = 1>[N] uE;  // Uniform value for sampling between start and end exposure
    }
    transformed parameters {
      vector[N] tE;  // infection moment
      tE = tStartExposure + uE .* (tEndExposure - tStartExposure);
    }
    model {
      // Contribution to likelihood of incubation period
      target += weibull_lpdf(tSymptomOnset - tE | alphaInc, sigmaInc);
    }
    generated quantities {
      // likelihood for calculation of looIC
      vector[N] log_lik;
      for (i in 1:N) {
        log_lik[i] = weibull_lpdf(tSymptomOnset[i] - tE[i] | alphaInc, sigmaInc);
      }
    }
    """
    model = pystan.StanModel(model_code=model_code)
    stanfit = model.sampling(
        data=input_data,
        warmup=4000,
        iter=14000,
        chains=8,
        seed=seed  # added for reproducibility
    )
    return stanfit
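

# A minimal convergence-check sketch (added here, not part of the original script),
# assuming PyStan 2.x: the fit object's stansummary() reports n_eff and Rhat.
# It is not called from main(); run it manually on the result of run_stan() if needed.
def check_convergence(stanfit) -> None:
    # Rhat close to 1 and a large effective sample size indicate well-mixed chains
    print(stanfit.stansummary(pars=['alphaInc', 'sigmaInc']))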


def make_output(stanfit) -> pd.DataFrame:
    ext = stanfit.extract()
    # PSIS-LOO as a check of model fit
    loo, loos, ks = psisloo(ext['log_lik'])
    print(f"loo: {loo}")
    alpha = ext['alphaInc']
    sigma = ext['sigmaInc']
    probs = [2.5, 50, 97.5]
    # posterior median and 95% CI of the mean incubation period
    print(np.percentile(sigma * gamma(1 + 1 / alpha), probs))
    # posterior median and 95% CI of the sd of the incubation period
    print(np.percentile(np.sqrt(sigma**2 * (gamma(1 + 2 / alpha) - (gamma(1 + 1 / alpha))**2)),
                        probs))
    qs = [0.025, 0.05, 0.5, 0.95, 0.975, 0.99]
    # Equivalent quantile calls (see the sketch after this function):
    #   Python: stats.weibull_min.ppf(0.025, c=4.13336, loc=0, scale=7.317828)
    #   R:      qweibull(p = 0.025, shape = 4.13336, scale = 7.317828)
    # posterior median and 95% CI of the incubation-period quantiles
    percentiles = np.array([
        np.percentile(stats.weibull_min.ppf(x, c=alpha, loc=0, scale=sigma), probs)
        for x in qs
    ])
    out = pd.DataFrame(
        percentiles.T,
        index=["2.5%", "50%", "97.5%"],
        columns=qs)
    return out
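

# Added sketch, not in the original script: the R/Python correspondence noted in the
# comments inside make_output(), written out as a small check. R's
# qweibull(p, shape, scale) corresponds to SciPy's weibull_min.ppf(p, c=shape, loc=0,
# scale=scale); the numbers are the example point estimates quoted in that comment.
def weibull_quantile_check() -> None:
    median = stats.weibull_min.ppf(0.5, c=4.13336, loc=0, scale=7.317828)
    # equivalent R call: qweibull(p = 0.5, shape = 4.13336, scale = 7.317828)
    print(f"median incubation period at these point estimates: {median:.2f} days")  # ~6.70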
if __name__ == "__main__":
main()