@boldfield
Last active August 15, 2021 18:02
Python script used to analyze Washington State COVID breakthrough data
#!/usr/bin/env python
import csv
import gspread
import hashlib
import math
import os
import re
from datetime import datetime, timedelta
from oauth2client.service_account import ServiceAccountCredentials
import PyPDF2 as pypdf
from time import sleep
ASCII_A = 65
ASCII_Z = 90
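# Regexes that pull the reporting window, the cumulative breakthrough case count,
# the hospitalization percentage, and the death count out of the flattened text of
# each DOH breakthrough-report PDF. Counts are matched as bare digit runs, so the
# source text is assumed not to use thousands separators.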
date_pattern = re.compile(
    r".*At a Glance \(\s?data from ([\w]+ [\d]{1,2}, [\d]{4})\s?-?\s?([\w]+ [\d]{1,2}, [\d]{4})\s?\).*"
)
case_count_pattern = re.compile(
    r".*\s([\d]+)\s+SARS-CoV-2\s?vaccine\s?breakthrough\s?cases\s?have\s?been\s?identified.*"
)
hospitalized_pct_pattern = re.compile(
    r".*\s([\d]{1,2})% were hospitalized.*"
)
death_count_pattern = re.compile(
    r".*\s([\d]+) people died of COVID-related illness.*"
)
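# Column sets for each CSV file and spreadsheet tab written below.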
BREAKTHROUGH_CSV_KEYS = [
    "period_start_date",
    "period_end_date",
    "days_since_last_report",
    "case_count",
    "period_change_case_count",
    "daily_case_count",
    "hospitalized_count",
    "period_change_hospitalized_count",
    "daily_hospitalized_count",
    "death_count",
    "period_change_death_count",
    "daily_death_count",
    "interpolated",
]
BREAKTHROUGH_AFTER_COVID_CSV_KEYS = [
    "period_start_date",
    "period_end_date",
    "days_since_last_report",
    "pre_delta_case_count",
    "pre_delta_period_change_case_count",
    "pre_delta_daily_case_count",
    "pre_delta_daily_case_count_per_100k",
    "pre_delta_hospitalized_count",
    "pre_delta_period_change_hospitalized_count",
    "pre_delta_daily_hospitalized_count",
    "pre_delta_daily_hospitalized_count_per_100k",
    "pre_delta_death_count",
    "pre_delta_period_change_death_count",
    "pre_delta_daily_death_count",
    "pre_delta_daily_death_count_per_100k",
    "post_delta_case_count",
    "post_delta_period_change_case_count",
    "post_delta_daily_case_count",
    "post_delta_daily_case_count_per_100k",
    "post_delta_hospitalized_count",
    "post_delta_period_change_hospitalized_count",
    "post_delta_daily_hospitalized_count",
    "post_delta_daily_hospitalized_count_per_100k",
    "post_delta_death_count",
    "post_delta_period_change_death_count",
    "post_delta_daily_death_count",
    "post_delta_daily_death_count_per_100k",
    "normalized_period_date",
]
EPI_CSV_KEYS = [
    "period_start_date",
    "period_end_date",
    "days_since_last_report",
    "case_count",
    "period_change_case_count",
    "daily_case_count",
]
VACCINE_CSV_KEYS = [
    "record_date",
    "total_pop",
    "unvaccinated_pop",
    "vaccinated_pop",
    "period_change_vaccination",  # spelling fixed; matches the key set in process_vaccine_data()
]
TEST_CSV_KEYS = [
    "record_date",
    "days_since_last_record",
    "total_tests",
    "period_change_tests",
    "total_positive_tests",
    "period_change_positive_tests",
    "total_negative_tests",
    "period_change_negative_tests",
]
AGGREGATE_CSV_KEYS = [
    "period_start_date",
    "period_end_date",
    "days_since_last_report",
    "case_count",
    "period_change_case_count",
    "daily_case_count",
    "unvaccinated_case_count",
    "unvaccinated_period_change_case_count",
    "unvaccinated_daily_case_count",
    "unvaccinated_case_pct_cohort",
    "unvaccinated_case_per_100k",
    "unvaccinated_period_change_case_pct_cohort",
    "unvaccinated_period_change_case_per_100k",
    "unvaccinated_daily_case_pct_cohort",
    "unvaccinated_daily_case_per_100k",
    "vaccinated_case_count",
    "vaccinated_period_change_case_count",
    "vaccinated_daily_case_count",
    "vaccinated_case_pct_cohort",
    "vaccinated_case_per_100k",
    "vaccinated_period_change_case_pct_cohort",
    "vaccinated_period_change_case_per_100k",
    "vaccinated_daily_case_pct_cohort",
    "vaccinated_daily_case_per_100k",
]
PRELIM_CSV_KEYS = [
    "date",
    "case_count",
    "case_pct_pop",
    "period_change_case_count",
    "period_change_case_pct_pop",
    "period_change_case_per_100k",
    "daily_case_count",
    "daily_case_pct_pop",
    "daily_case_per_100k",
]
SHIFTED_PRELIM_CSV_KEYS = [
    "sop_observation_date",
    "sop_shifted_period_start_date",
    "sop_shifted_period_end_date",
    "sop_days_since_last_report",
    "sop_case_count",
    "sop_case_pct_pop",
    "sop_case_per_100k",
    "sop_period_change_case_count",
    "sop_period_change_case_pct_pop",
    "sop_period_change_case_per_100k",
    "sop_daily_case_count",
    "sop_daily_case_pct_pop",
    "sop_daily_case_per_100k",
]
# Identical column set to TEST_CSV_KEYS; kept as an alias rather than a duplicate list.
TEST_DATA_CSV_KEYS = TEST_CSV_KEYS
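# File-system layout and data sources: new report PDFs are read from unprocessed/,
# previously seen PDFs are expected under pdf/ (used only for de-duplication), and
# all generated CSVs are written to processed/.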
G_CREDS = "{}/.google/google-creds.json".format(os.environ.get("HOME"))
G_SHEET_ID = "<SHEET_ID>"
WORK_ROOT = os.path.dirname(os.path.abspath(__file__))
UNPROCESSED_DIR = "{}/unprocessed".format(WORK_ROOT)
PROCESSED_DIR = "{}/processed".format(WORK_ROOT)
PDF_DIR = "{}/pdf".format(WORK_ROOT)
RAW_EPI_DATA = "{}/EpiCurve_Count_Cases_Hospitalizations_Deaths.csv".format(UNPROCESSED_DIR)
RAW_VACCINE_DATA = "{}/trends_in_number_of_covid19_vaccinations_in_wa.csv".format(UNPROCESSED_DIR)
RAW_TEST_DATA = "{}/PUBLIC_Tests_by_Specimen_Collection.csv".format(UNPROCESSED_DIR)
BREAKTHROUGH_DATA_CSV = "{}/breakthrough-data.csv".format(PROCESSED_DIR)
DELTA_ADJUSTED_BREAKTHROUGH_DATA_CSV = "{}/delta-breakthrough-data.csv".format(PROCESSED_DIR)
EPI_DATA_CSV = "{}/epi-data.csv".format(PROCESSED_DIR)
VACCINE_DATA_CSV = "{}/vaccine-data.csv".format(PROCESSED_DIR)
AGGREGATE_DATA_CSV = "{}/aggregate-data.csv".format(PROCESSED_DIR)
EARLY_DATA_CSV = "{}/early-data.csv".format(PROCESSED_DIR)
SHIFTED_EARLY_DATA_CSV = "{}/rezeroed-early-data.csv".format(PROCESSED_DIR)
TEST_DATA_CSV = "{}/test-data.csv".format(PROCESSED_DIR)
GDOC_TAB_MAP = {
    "Intro": 0,
    "Analysis": 1,
    "Aggregate": 2,
    "Breakthrough Data": 3,
    "Delta Adjusted Breakthrough Data": 4,
    "Epi Data": 5,
    "Vaccine Data": 6,
    "Early Data": 7,
    "Re-Zeroed Data": 8,
    "Test Data": 9,
}
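# Statewide population used as the rate denominator, the cutoff separating the
# pre- and post-Delta series, and the pause between Sheets API writes.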
WA_POP = 7656200
DELTA_BREAK_POINT = datetime(year=2021, month=6, day=21)
API_RATE_LIMIT_SLEEP = 5
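# Authorize a gspread client with the service-account credentials at G_CREDS.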
def get_google_client():
    scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
    creds = ServiceAccountCredentials.from_json_keyfile_name(G_CREDS, scope)
    return gspread.authorize(creds)
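# Compare MD5 digests of PDFs in unprocessed/ against the archive in pdf/,
# deleting empty downloads and separating genuinely new files from duplicates.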
def identify_unprocessed_files():
    unprocessed_files = {}
    processed_files = {}
    dupe_files = {}
    for fname in os.listdir(UNPROCESSED_DIR):
        if os.stat("{}/{}".format(UNPROCESSED_DIR, fname)).st_size == 0:
            print("Deleting empty file: {}".format(fname))
            os.remove("{}/{}".format(UNPROCESSED_DIR, fname))
            continue
        if os.path.splitext(fname)[-1].lower() != ".pdf":
            continue
        with open("{}/{}".format(UNPROCESSED_DIR, fname), 'rb') as fp:
            md5 = hashlib.md5(fp.read()).hexdigest()
        unprocessed_files[md5] = fname
    for fname in os.listdir(PDF_DIR):
        with open("{}/{}".format(PDF_DIR, fname), 'rb') as fp:
            md5 = hashlib.md5(fp.read()).hexdigest()
        processed_files[md5] = fname
    for unprocessed_md5 in list(unprocessed_files.keys()):
        if unprocessed_md5 in processed_files:
            dupe_files[unprocessed_md5] = unprocessed_files[unprocessed_md5]
            del unprocessed_files[unprocessed_md5]
    return unprocessed_files, dupe_files
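# Minimal error hook: report which PDF failed to parse (process_pdf re-raises).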
def handle_processing_exception(pdir, fname, ex):
    print("Encountered an exception while processing {}: {}".format(fname, ex))
def process_shit_date(dstring):
    return datetime.strptime(dstring, '%B %d, %Y')
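# Extract the reporting window and headline statistics from one report PDF.
# PyPDF2 scatters newlines through the extracted text, so it is flattened
# before the regexes run.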
def process_pdf(pdir, fname):
    with open(os.path.join(pdir, fname), 'rb') as fp:
        pdfp = pypdf.PdfFileReader(fp)
        text = ""
        for p in range(pdfp.numPages):
            text += pdfp.getPage(p).extractText()
    stripped_text = text.replace('\n', "")
    try:
        start_date_raw, end_date_raw = date_pattern.match(stripped_text).groups()
        start_date, end_date = process_shit_date(start_date_raw), process_shit_date(end_date_raw)
        case_count = int(case_count_pattern.match(stripped_text).groups()[0])
        hospitalized_pct = int(hospitalized_pct_pattern.match(stripped_text).groups()[0])
        # The reports publish hospitalizations as a percentage of cases, so the
        # count is reconstructed (rounded down) from that percentage.
        hospitalized_count = int(math.floor(float(case_count) * hospitalized_pct / 100.0))
        death_count = int(death_count_pattern.match(stripped_text).groups()[0])
    except Exception as e:
        handle_processing_exception(pdir, fname, e)
        # Re-raise: falling through to the return would reference unbound locals
        # and mask the original parsing error.
        raise
    return start_date, end_date, case_count, hospitalized_count, death_count
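# Build one row per previously unseen report, deriving period-over-period and
# per-day deltas from the prior row, then fill any multi-week gaps by
# interpolation.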
def process_breakthrough_data(unprocessed_files, processed_dates):
    data = []
    for fname in sorted(unprocessed_files.values()):
        start_date, end_date, case_count, hospitalized_count, death_count = process_pdf(UNPROCESSED_DIR, fname)
        if end_date not in processed_dates:
            row = {
                "_period_start_date": start_date,
                "_period_end_date": end_date,
                "period_start_date": start_date.strftime("%Y/%m/%d"),
                "period_end_date": end_date.strftime("%Y/%m/%d"),
                "days_since_last_report": 0,
                "case_count": case_count,
                "period_change_case_count": 0,
                "daily_case_count": 0,
                "hospitalized_count": hospitalized_count,
                "period_change_hospitalized_count": 0,
                "daily_hospitalized_count": 0,
                "death_count": death_count,
                "period_change_death_count": 0,
                "daily_death_count": 0,
            }
            if data:
                row["days_since_last_report"] = (end_date - data[-1]["_period_end_date"]).days
                row["period_change_case_count"] = case_count - data[-1]["case_count"]
                row["daily_case_count"] = int(float(case_count - data[-1]["case_count"]) / float(row["days_since_last_report"]))
                row["period_change_hospitalized_count"] = hospitalized_count - data[-1]["hospitalized_count"]
                row["daily_hospitalized_count"] = int(float(hospitalized_count - data[-1]["hospitalized_count"]) / float(row["days_since_last_report"]))
                row["period_change_death_count"] = death_count - data[-1]["death_count"]
                row["daily_death_count"] = int(float(death_count - data[-1]["death_count"]) / float(row["days_since_last_report"]))
            data.append(row)
            processed_dates.append(end_date)
    data, processed_dates = interpolate_missing_data(data)
    return data, processed_dates
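# Reports are expected every `day_step` days; when a gap is longer, split the
# period's change evenly across day_step-sized synthetic periods and flag all
# but the last as interpolated.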
def interpolate_missing_data(data, day_step=7):
    adjusted_data = []
    adjusted_processed_dates = []
    for row in data:
        if (row["days_since_last_report"] != 0) and (row["days_since_last_report"] != day_step):
            # Gaps are assumed to be multiples of day_step; // keeps num_steps an
            # int so range() works under Python 3. The per-step increment is
            # derived from the period *change* (the original divided the
            # cumulative count by the gap, and reused the case step for
            # hospitalizations), so the final interpolated row lands on the
            # report's actual totals.
            num_steps = row["days_since_last_report"] // day_step
            new_period_case_step = row["period_change_case_count"] / num_steps
            new_period_hospitalized_step = row["period_change_hospitalized_count"] / num_steps
            new_period_death_step = row["period_change_death_count"] / num_steps
            for i in range(num_steps):
                adjusted_data.append({
                    "_period_start_date": adjusted_data[-1]["_period_start_date"] + timedelta(days=day_step),
                    "_period_end_date": adjusted_data[-1]["_period_end_date"] + timedelta(days=day_step),
                    "period_start_date": (adjusted_data[-1]["_period_start_date"] + timedelta(days=day_step)).strftime("%Y/%m/%d"),
                    "period_end_date": (adjusted_data[-1]["_period_end_date"] + timedelta(days=day_step)).strftime("%Y/%m/%d"),
                    "days_since_last_report": day_step,
                    # adjusted_data[-1] is re-read after every append, so a single
                    # step increment (not step * (i + 1)) accumulates correctly.
                    "case_count": adjusted_data[-1]["case_count"] + new_period_case_step,
                    "period_change_case_count": new_period_case_step,
                    "daily_case_count": new_period_case_step / day_step,
                    "hospitalized_count": adjusted_data[-1]["hospitalized_count"] + new_period_hospitalized_step,
                    "period_change_hospitalized_count": new_period_hospitalized_step,
                    "daily_hospitalized_count": new_period_hospitalized_step / day_step,
                    "death_count": adjusted_data[-1]["death_count"] + new_period_death_step,
                    "period_change_death_count": new_period_death_step,
                    "daily_death_count": new_period_death_step / day_step,
                    # Only the final step corresponds to a real report.
                    "interpolated": i < (num_steps - 1),
                })
                adjusted_processed_dates.append(adjusted_data[-1]["_period_end_date"])
        else:
            row["interpolated"] = False
            # Likely intended: days since the first report's end date. The original
            # expression subtracted a value from itself (always 0) and a trailing
            # comma made the result a one-element tuple.
            row["normalized_period_date"] = 0 if not adjusted_data else (row["_period_end_date"] - adjusted_data[0]["_period_end_date"]).days
            adjusted_data.append(row)
            adjusted_processed_dates.append(adjusted_data[-1]["_period_end_date"])
    return adjusted_data, adjusted_processed_dates
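# Look up the vaccinated population recorded for a given report end date.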
def vaccinated_pop_for_date(vac_data, date_str):
    # filter() returns a lazy iterator in Python 3 and cannot be indexed, so
    # take the first matching record with next() instead.
    return next(d for d in vac_data if d["record_date"] == date_str)["vaccinated_pop"]
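# Split the breakthrough series at DELTA_BREAK_POINT into pre- and post-Delta
# columns, normalizing daily rates per 100k vaccinated people so the two
# periods can be compared side by side.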
def process_breakthrough_after_delta_data(breakthrough_data, vac_data):
    data = []
    # Wrap the filters in list() so they can be indexed and len()-ed under Python 3.
    pre_delta = list(filter(lambda d: d["_period_end_date"] < DELTA_BREAK_POINT, breakthrough_data))
    post_delta = list(filter(lambda d: d["_period_end_date"] >= DELTA_BREAK_POINT, breakthrough_data))
    for i in range(len(pre_delta)):
        vac_pop = vaccinated_pop_for_date(vac_data, pre_delta[i]["period_end_date"])
        data.append({
            "_period_start_date": pre_delta[i]["_period_start_date"],
            "_period_end_date": pre_delta[i]["_period_end_date"],
            "period_start_date": pre_delta[i]["period_start_date"],
            "period_end_date": pre_delta[i]["period_end_date"],
            "days_since_last_report": pre_delta[i]["days_since_last_report"],
            "pre_delta_case_count": pre_delta[i]["case_count"],
            "pre_delta_period_change_case_count": pre_delta[i]["period_change_case_count"],
            "pre_delta_daily_case_count": pre_delta[i]["daily_case_count"],
            "pre_delta_daily_case_count_per_100k": float(pre_delta[i]["daily_case_count"]) / vac_pop * 100000.,
            "pre_delta_hospitalized_count": pre_delta[i]["hospitalized_count"],
            "pre_delta_period_change_hospitalized_count": pre_delta[i]["period_change_hospitalized_count"],
            "pre_delta_daily_hospitalized_count": pre_delta[i]["daily_hospitalized_count"],
            "pre_delta_daily_hospitalized_count_per_100k": float(pre_delta[i]["daily_hospitalized_count"]) / vac_pop * 100000.,
            "pre_delta_death_count": pre_delta[i]["death_count"],
            "pre_delta_period_change_death_count": pre_delta[i]["period_change_death_count"],
            "pre_delta_daily_death_count": pre_delta[i]["daily_death_count"],
            "pre_delta_daily_death_count_per_100k": float(pre_delta[i]["daily_death_count"]) / vac_pop * 100000.,
            "post_delta_case_count": "",
            "post_delta_period_change_case_count": "",
            "post_delta_daily_case_count": "",
            "post_delta_daily_case_count_per_100k": "",
            "post_delta_hospitalized_count": "",
            "post_delta_period_change_hospitalized_count": "",
            "post_delta_daily_hospitalized_count": "",
            "post_delta_daily_hospitalized_count_per_100k": "",
            "post_delta_death_count": "",
            "post_delta_period_change_death_count": "",
            "post_delta_daily_death_count": "",
            "post_delta_daily_death_count_per_100k": "",
            "normalized_period_date": 0 if not data else (pre_delta[i]["_period_end_date"] - data[0]["_period_end_date"]).days,
        })
    for i in range(len(post_delta)):
        vac_pop = vaccinated_pop_for_date(vac_data, post_delta[i]["period_end_date"])
        this_row = {
            "post_delta_case_count": post_delta[i]["case_count"],
            "post_delta_period_change_case_count": post_delta[i]["period_change_case_count"],
            "post_delta_daily_case_count": post_delta[i]["daily_case_count"],
            "post_delta_daily_case_count_per_100k": float(post_delta[i]["daily_case_count"]) / vac_pop * 100000.,
            "post_delta_hospitalized_count": post_delta[i]["hospitalized_count"],
            "post_delta_period_change_hospitalized_count": post_delta[i]["period_change_hospitalized_count"],
            "post_delta_daily_hospitalized_count": post_delta[i]["daily_hospitalized_count"],
            "post_delta_daily_hospitalized_count_per_100k": float(post_delta[i]["daily_hospitalized_count"]) / vac_pop * 100000.,
            "post_delta_death_count": post_delta[i]["death_count"],
            "post_delta_period_change_death_count": post_delta[i]["period_change_death_count"],
            "post_delta_daily_death_count": post_delta[i]["daily_death_count"],
            "post_delta_daily_death_count_per_100k": float(post_delta[i]["daily_death_count"]) / vac_pop * 100000.,
        }
        if i >= len(data):
            # The post-Delta series is longer than the pre-Delta series: extend
            # the timeline with synthetic periods past the last pre-Delta report.
            intervals_past = i - len(data)
            _period_start_date = pre_delta[0]["_period_start_date"]
            _period_end_date = pre_delta[-1]["_period_end_date"] + timedelta(days=(intervals_past * post_delta[i]["days_since_last_report"]))
            data.append({
                "_period_start_date": _period_start_date,
                "_period_end_date": _period_end_date,
                "period_start_date": _period_start_date.strftime("%Y/%m/%d"),
                "period_end_date": _period_end_date.strftime("%Y/%m/%d"),
                "days_since_last_report": post_delta[i]["days_since_last_report"],
                "pre_delta_case_count": "",
                "pre_delta_period_change_case_count": "",
                "pre_delta_daily_case_count": "",
                "pre_delta_daily_case_count_per_100k": "",
                "pre_delta_hospitalized_count": "",
                "pre_delta_period_change_hospitalized_count": "",
                "pre_delta_daily_hospitalized_count": "",
                "pre_delta_daily_hospitalized_count_per_100k": "",
                "pre_delta_death_count": "",
                "pre_delta_period_change_death_count": "",
                "pre_delta_daily_death_count": "",
                "pre_delta_daily_death_count_per_100k": "",
                # Without this key, print_data() and update_sheet() hit a
                # KeyError on these extended rows.
                "normalized_period_date": (_period_end_date - data[0]["_period_end_date"]).days,
            })
        data[i].update(this_row)
    return data
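# Accumulate statewide case counts from the raw epi CSV into the same reporting
# periods as the breakthrough data (the file is re-read once per period, which
# is slow but simple).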
def process_epi_data(global_start_date, processed_dates):
    data = []
    global_start_date_str = start_date_str = global_start_date.strftime("%Y/%m/%d")
    start_date = global_start_date
    prev_case_count = 0
    case_count = 0
    for date in processed_dates:
        end_date_str = date.strftime("%Y/%m/%d")
        with open(RAW_EPI_DATA, 'r') as fp:
            csv_reader = csv.DictReader(fp, delimiter=',')
            for row in csv_reader:
                if row["County"] != "Statewide":
                    continue
                if row["Earliest Specimen Collection Date"] <= start_date_str or row["Earliest Specimen Collection Date"] > end_date_str:
                    continue
                case_count += int(row["Total Cases"])
        period_change_case_count = 0 if prev_case_count == 0 else case_count - prev_case_count
        daily_case_count = period_change_case_count / (date - start_date).days
        data.append({
            "period_start_date": global_start_date_str,
            "period_end_date": end_date_str,
            "days_since_last_report": (date - start_date).days if data else 0,
            "case_count": case_count,
            "period_change_case_count": period_change_case_count,
            "daily_case_count": daily_case_count,
        })
        start_date_str = end_date_str
        start_date = date
        prev_case_count = case_count
    return data
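# Roll the daily full-vaccination counts up to each report end date, tracking
# the vaccinated/unvaccinated split of the state population.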
def process_vaccine_data(processed_dates):
    data = []
    prev_vac_count = 0
    vac_count = 0
    start_date_str = ''
    for date in processed_dates:
        end_date_str = date.strftime("%Y-%m-%d")
        with open(RAW_VACCINE_DATA, 'r') as fp:
            csv_reader = csv.DictReader(fp, delimiter=',')
            for row in csv_reader:
                if row["Date"] == "N/A":
                    continue
                #r_date = (datetime.strptime(row["Date"], "%Y-%m-%d") + timedelta(days=14)).strftime("%Y-%m-%d")
                r_date = row["Date"]
                if (r_date < start_date_str) or (r_date > end_date_str):
                    continue
                vac_count += int(row["Daily Count of People Fully Vaccinated"]) if row["Daily Count of People Fully Vaccinated"] else 0
        start_date_str = end_date_str
        period_change_vac_count = 0 if prev_vac_count == 0 else vac_count - prev_vac_count
        prev_vac_count = vac_count
        data.append({
            "record_date": date.strftime("%Y/%m/%d"),
            "total_pop": WA_POP,
            "unvaccinated_pop": WA_POP - vac_count,
            "vaccinated_pop": vac_count,
            "period_change_vaccination": period_change_vac_count,
        })
    return data
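# Join the breakthrough, epi, and vaccination series by row index (all three
# are aligned on the same report dates) and compute per-cohort rates: percent
# of cohort and cases per 100k for vaccinated vs. unvaccinated residents.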
def aggregate_data(breakthrough_data, epi_data, vac_data):
    data = []
    for i in range(len(breakthrough_data)):
        unvac_case_count = epi_data[i]["case_count"] - breakthrough_data[i]["case_count"]
        unvac_case_period_change_count = epi_data[i]["period_change_case_count"] - breakthrough_data[i]["period_change_case_count"]
        unvac_case_daily_count = epi_data[i]["daily_case_count"] - breakthrough_data[i]["daily_case_count"]
        unvac_case_pct = float(unvac_case_count) / float(vac_data[i]["unvaccinated_pop"]) * 100
        unvac_case_period_change_pct = float(unvac_case_period_change_count) / float(vac_data[i]["unvaccinated_pop"]) * 100
        unvac_case_daily_pct = float(unvac_case_daily_count) / float(vac_data[i]["unvaccinated_pop"]) * 100
        unvac_case_100k = float(unvac_case_count) / float(vac_data[i]["unvaccinated_pop"]) * 100000
        unvac_case_period_change_100k = float(unvac_case_period_change_count) / float(vac_data[i]["unvaccinated_pop"]) * 100000
        unvac_case_daily_100k = float(unvac_case_daily_count) / float(vac_data[i]["unvaccinated_pop"]) * 100000
        vac_case_pct = float(breakthrough_data[i]["case_count"]) / float(vac_data[i]["vaccinated_pop"]) * 100
        vac_case_period_change_pct = float(breakthrough_data[i]["period_change_case_count"]) / float(vac_data[i]["vaccinated_pop"]) * 100
        vac_case_daily_pct = float(breakthrough_data[i]["daily_case_count"]) / float(vac_data[i]["vaccinated_pop"]) * 100
        vac_case_100k = float(breakthrough_data[i]["case_count"]) / float(vac_data[i]["vaccinated_pop"]) * 100000
        vac_case_period_change_100k = float(breakthrough_data[i]["period_change_case_count"]) / float(vac_data[i]["vaccinated_pop"]) * 100000
        vac_case_daily_100k = float(breakthrough_data[i]["daily_case_count"]) / float(vac_data[i]["vaccinated_pop"]) * 100000
        data.append({
            "period_start_date": breakthrough_data[i]["period_start_date"],
            "period_end_date": breakthrough_data[i]["period_end_date"],
            "days_since_last_report": breakthrough_data[i]["days_since_last_report"],
            "case_count": epi_data[i]["case_count"],
            "period_change_case_count": epi_data[i]["period_change_case_count"],
            "daily_case_count": epi_data[i]["daily_case_count"],
            "unvaccinated_case_count": unvac_case_count,
            "unvaccinated_period_change_case_count": unvac_case_period_change_count,
            "unvaccinated_daily_case_count": unvac_case_daily_count,
            "unvaccinated_case_pct_cohort": unvac_case_pct,
            "unvaccinated_case_per_100k": unvac_case_100k,
            "unvaccinated_period_change_case_pct_cohort": unvac_case_period_change_pct,
            "unvaccinated_period_change_case_per_100k": unvac_case_period_change_100k,
            "unvaccinated_daily_case_pct_cohort": unvac_case_daily_pct,
            "unvaccinated_daily_case_per_100k": unvac_case_daily_100k,
            "vaccinated_case_count": breakthrough_data[i]["case_count"],
            "vaccinated_period_change_case_count": breakthrough_data[i]["period_change_case_count"],
            "vaccinated_daily_case_count": breakthrough_data[i]["daily_case_count"],
            "vaccinated_case_pct_cohort": vac_case_pct,
            "vaccinated_case_per_100k": vac_case_100k,
            "vaccinated_period_change_case_pct_cohort": vac_case_period_change_pct,
            "vaccinated_period_change_case_per_100k": vac_case_period_change_100k,
            "vaccinated_daily_case_pct_cohort": vac_case_daily_pct,
            "vaccinated_daily_case_per_100k": vac_case_daily_100k,
        })
    return data
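# Accumulate statewide test results, emitting a row at each report date.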
def process_test_data(period_dates):
    data = []
    positive_count = 0
    negative_count = 0
    with open(RAW_TEST_DATA, 'r') as fp:
        csv_reader = csv.DictReader(fp, delimiter=',')
        for row in csv_reader:
            if row["County"] != "Statewide":
                continue
            date = datetime.strptime(row["Day"], '%Y-%m-%d')
            if date < period_dates[0]:
                continue
            if date > period_dates[-1]:
                break
            positive_count += int(row["Positive"])
            negative_count += int(row["Negative"])
            if date not in period_dates:
                continue
            data.append({
                "_record_date": date,
                "record_date": date.strftime("%Y/%m/%d"),
                "days_since_last_record": 0 if not data else (date - data[-1]["_record_date"]).days,
                "total_tests": positive_count + negative_count,
                "period_change_tests": 0 if not data else (positive_count + negative_count) - data[-1]["total_tests"],
                "total_positive_tests": positive_count,
                "period_change_positive_tests": 0 if not data else positive_count - data[-1]["total_positive_tests"],
                "total_negative_tests": negative_count,
                "period_change_negative_tests": 0 if not data else negative_count - data[-1]["total_negative_tests"],
            })
    return data
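# Weekly case progression from the start of the raw epi data until just past
# 2021/01/17, presumably the eve of the first breakthrough reporting period.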
def progression_until_start():
    case_count = 0
    prev_case_count = 0
    date = None
    data = []
    with open(RAW_EPI_DATA, 'r') as fp:
        csv_reader = csv.DictReader(fp, delimiter=',')
        for row in csv_reader:
            if row["County"] != "Statewide":
                continue
            if date is None:
                date = datetime.strptime(row["Earliest Specimen Collection Date"], '%Y/%m/%d')
                stop_date = date + timedelta(days=6)
            case_count += int(row["Total Cases"])
            if row["Earliest Specimen Collection Date"] >= stop_date.strftime('%Y/%m/%d'):
                case_pct_pop = float(case_count) / float(WA_POP) * 100.
                period_change_case_count = 0 if prev_case_count == 0 else case_count - prev_case_count
                period_change_case_pct_pop = float(period_change_case_count) / float(WA_POP) * 100.
                period_change_case_100k = float(period_change_case_count) / float(WA_POP) * 100000
                daily_case_count = period_change_case_count / (stop_date - date).days
                daily_case_pct_pop = float(daily_case_count) / float(WA_POP) * 100.
                daily_case_100k = float(daily_case_count) / float(WA_POP) * 100000
                data.append({
                    "date": date.strftime('%Y/%m/%d'),
                    "case_count": case_count,
                    "case_pct_pop": case_pct_pop,
                    "period_change_case_count": period_change_case_count,
                    "period_change_case_pct_pop": period_change_case_pct_pop,
                    "period_change_case_per_100k": period_change_case_100k,
                    "daily_case_count": daily_case_count,
                    "daily_case_pct_pop": daily_case_pct_pop,
                    "daily_case_per_100k": daily_case_100k,
                })
                if row["Earliest Specimen Collection Date"] > "2021/01/17":
                    break
                prev_case_count = case_count
                date = None
    return data
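# Shift the early-epidemic series forward so its first observation lands on
# `new_zero_date`, then resample it onto the aggregate report dates. The "sop"
# prefix on the output columns presumably stands for start-of-pandemic.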
def shift_and_norm_prelim_data(raw_data, new_zero_date, agg_dates):
    agg_dates_processing = list(agg_dates)
    dt_shift = new_zero_date - datetime.strptime(raw_data[0]["date"], '%Y/%m/%d')
    case_count = 0
    prev_case_count = 0
    prev_dt = None
    data = []
    for row in raw_data:
        dt = datetime.strptime(row["date"], '%Y/%m/%d') + dt_shift
        if prev_dt is None:
            prev_dt = dt
        case_count += row["period_change_case_count"]
        if dt >= agg_dates_processing[0]:
            case_pct_pop = float(case_count) / float(WA_POP) * 100.
            case_100k = float(case_count) / float(WA_POP) * 100000
            period_change_case_count = 0 if prev_case_count == 0 else case_count - prev_case_count
            period_change_case_pct_pop = float(period_change_case_count) / float(WA_POP) * 100.
            period_change_case_100k = float(period_change_case_count) / float(WA_POP) * 100000
            daily_case_count = period_change_case_count / (dt - prev_dt).days
            daily_case_pct_pop = float(daily_case_count) / float(WA_POP) * 100.
            daily_case_100k = float(daily_case_count) / float(WA_POP) * 100000
            data.append({
                "sop_observation_date": row["date"],
                "sop_shifted_period_start_date": new_zero_date.strftime("%Y/%m/%d"),
                "sop_shifted_period_end_date": dt.strftime("%Y/%m/%d"),
                "sop_days_since_last_report": (dt - prev_dt).days if data else 0,
                "sop_case_count": case_count,
                "sop_case_pct_pop": case_pct_pop,
                "sop_case_per_100k": case_100k,
                "sop_period_change_case_count": period_change_case_count,
                "sop_period_change_case_pct_pop": period_change_case_pct_pop,
                "sop_period_change_case_per_100k": period_change_case_100k,
                "sop_daily_case_count": daily_case_count,
                "sop_daily_case_pct_pop": daily_case_pct_pop,
                "sop_daily_case_per_100k": daily_case_100k,
            })
            prev_case_count = case_count
            prev_dt = dt
            agg_dates_processing.pop(0)
            if not agg_dates_processing:
                break
    return data
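# Dump a dataset to stdout as a header line plus comma-separated rows.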
def print_data(data, keys):
    out_str = ', '.join(["{{{}}}".format(x) for x in keys])
    print(', '.join(keys))
    for row in data:
        print(out_str.format(**row))
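# Write a dataset to CSV, ignoring helper keys (the underscore-prefixed
# datetime fields) that aren't in the column list.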
def write_csv(data, keys, fname):
    # newline='' prevents the csv module from inserting blank rows on Windows.
    with open(fname, 'w', newline='') as fp:
        writer = csv.DictWriter(fp, keys, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(data)
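# Write the header into row 2 and data starting at row 3, one row per request,
# sleeping between writes to stay under the Sheets API rate limit.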
def update_sheet(tab, data, columns):
    # Start inserting data from the second row to allow for additional
    # detail about the data to be added to the 1st row.
    column_count = len(columns)
    last_col_name = int_column_to_ascii_column(column_count)
    # The header range must end at row 2; the original used column_count as the
    # row number, selecting far more cells than the header needs.
    cells = tab.range("A2:{}2".format(last_col_name))
    for i in range(column_count):
        cells[i].value = columns[i]
    tab.update_cells(cells)
    sleep(API_RATE_LIMIT_SLEEP)
    for i in range(len(data)):
        row = data[i]
        data_list = [row[k] for k in columns]
        cells = tab.range("A{}:{}{}".format(
            i + 3,
            last_col_name,
            i + 3,
        ))
        # Use a separate index so the outer row counter isn't shadowed.
        for j in range(column_count):
            cells[j].value = data_list[j]
        tab.update_cells(cells)
        sleep(API_RATE_LIMIT_SLEEP)
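# Convert a 1-based column count to an A1-style column letter,
# e.g. 3 -> "C" and 29 -> "AC".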
def int_column_to_ascii_column(column_count):
    ascii_int = ASCII_A + column_count - 1
    if ascii_int <= ASCII_Z:
        return chr(ascii_int)
    # Integer division (//) is required under Python 3; chr() rejects floats.
    # This two-letter form only holds through column 52 ("AZ"), which covers
    # the widest sheet written here (29 columns).
    return "{}{}".format(
        chr(ASCII_A + (ascii_int // ASCII_Z) - 1),
        chr(ASCII_A + (ascii_int % ASCII_Z) - 1),
    )
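# Pipeline: parse new report PDFs, derive the breakthrough/epi/vaccine/test
# datasets, print and persist each as CSV, then push everything to the
# configured Google Sheet.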
def main():
    processed_dates = []
    unprocessed_files, dupe_files = identify_unprocessed_files()
    breakthrough_data, processed_dates = process_breakthrough_data(unprocessed_files, processed_dates)
    print_data(breakthrough_data, BREAKTHROUGH_CSV_KEYS)
    epi_data = process_epi_data(breakthrough_data[0]["_period_start_date"], processed_dates)
    print_data(epi_data, EPI_CSV_KEYS)
    vac_data = process_vaccine_data(processed_dates)
    print_data(vac_data, VACCINE_CSV_KEYS)
    breakthrough_after_delta = process_breakthrough_after_delta_data(breakthrough_data, vac_data)
    print_data(breakthrough_after_delta, BREAKTHROUGH_AFTER_COVID_CSV_KEYS)
    agg_data = aggregate_data(breakthrough_data, epi_data, vac_data)
    print_data(agg_data, AGGREGATE_CSV_KEYS)
    prelim_data = progression_until_start()
    print_data(prelim_data, PRELIM_CSV_KEYS)
    shifted_prelim_data = shift_and_norm_prelim_data(prelim_data, breakthrough_data[0]["_period_start_date"], processed_dates)
    print_data(shifted_prelim_data, SHIFTED_PRELIM_CSV_KEYS)
    test_data = process_test_data(processed_dates)
    print_data(test_data, TEST_DATA_CSV_KEYS)
    write_csv(breakthrough_data, BREAKTHROUGH_CSV_KEYS, BREAKTHROUGH_DATA_CSV)
    write_csv(epi_data, EPI_CSV_KEYS, EPI_DATA_CSV)
    write_csv(vac_data, VACCINE_CSV_KEYS, VACCINE_DATA_CSV)
    write_csv(agg_data, AGGREGATE_CSV_KEYS, AGGREGATE_DATA_CSV)
    write_csv(prelim_data, PRELIM_CSV_KEYS, EARLY_DATA_CSV)
    write_csv(shifted_prelim_data, SHIFTED_PRELIM_CSV_KEYS, SHIFTED_EARLY_DATA_CSV)
    write_csv(breakthrough_after_delta, BREAKTHROUGH_AFTER_COVID_CSV_KEYS, DELTA_ADJUSTED_BREAKTHROUGH_DATA_CSV)
    write_csv(test_data, TEST_DATA_CSV_KEYS, TEST_DATA_CSV)
    gclient = get_google_client()
    sheet = gclient.open_by_key(G_SHEET_ID)
    # For every dataset, pop the first element before uploading: all of its
    # rates are zero, which would put bad data in the sheet.
    print("Updating sheet: Breakthrough Data")
    breakthrough_data.pop(0)
    update_sheet(sheet.get_worksheet(GDOC_TAB_MAP["Breakthrough Data"]), breakthrough_data, BREAKTHROUGH_CSV_KEYS)
    print("Updating sheet: Delta Adjusted Breakthrough Data")
    breakthrough_after_delta.pop(0)
    update_sheet(sheet.get_worksheet(GDOC_TAB_MAP["Delta Adjusted Breakthrough Data"]), breakthrough_after_delta, BREAKTHROUGH_AFTER_COVID_CSV_KEYS)
    print("Updating sheet: Epi Data")
    epi_data.pop(0)
    update_sheet(sheet.get_worksheet(GDOC_TAB_MAP["Epi Data"]), epi_data, EPI_CSV_KEYS)
    print("Updating sheet: Vaccine Data")
    vac_data.pop(0)
    update_sheet(sheet.get_worksheet(GDOC_TAB_MAP["Vaccine Data"]), vac_data, VACCINE_CSV_KEYS)
    print("Updating sheet: Aggregate")
    agg_data.pop(0)
    update_sheet(sheet.get_worksheet(GDOC_TAB_MAP["Aggregate"]), agg_data, AGGREGATE_CSV_KEYS)
    print("Updating sheet: Early Data")
    prelim_data.pop(0)
    update_sheet(sheet.get_worksheet(GDOC_TAB_MAP["Early Data"]), prelim_data, PRELIM_CSV_KEYS)
    print("Updating sheet: Re-Zeroed Data")
    shifted_prelim_data.pop(0)
    update_sheet(sheet.get_worksheet(GDOC_TAB_MAP["Re-Zeroed Data"]), shifted_prelim_data, SHIFTED_PRELIM_CSV_KEYS)
    print("Updating sheet: Test Data")
    test_data.pop(0)
    update_sheet(sheet.get_worksheet(GDOC_TAB_MAP["Test Data"]), test_data, TEST_DATA_CSV_KEYS)
if __name__ == "__main__":
    main()