Skip to content

Instantly share code, notes, and snippets.

@airalcorn2
Created September 26, 2021 16:53
Show Gist options
  • Save airalcorn2/190b2240fa252de5a5fe9dac02d67142 to your computer and use it in GitHub Desktop.
Save airalcorn2/190b2240fa252de5a5fe9dac02d67142 to your computer and use it in GitHub Desktop.
Creates a video showing COVID-19 cases per million in U.S. counties over time.
import cv2
import geopandas
import io
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import time
import zipfile
from datetime import datetime, timedelta
from PIL import Image
# Load county population data.
url_population = "http://www2.census.gov/programs-surveys/popest/datasets/2010-2019/counties/totals/co-est2019-alldata.csv"
df_population = pd.read_csv(url_population, encoding="ISO-8859-1")
df_population["STATE"] = df_population["STATE"].apply(
lambda state_fips: str(state_fips).zfill(2)
)
df_population["COUNTY"] = df_population["COUNTY"].apply(
lambda county_fips: str(county_fips).zfill(3)
)
df_population["FIPS"] = df_population["STATE"] + df_population["COUNTY"]
# Load county map.
# From: https://www.census.gov/geographies/mapping-files/time-series/geo/tiger-line-file.html.
shapefile = "tl_2020_us_county"
try:
gdf_counties = geopandas.read_file(f"{shapefile}/{shapefile}.shp")
except:
zip_file_url = (
"https://www2.census.gov/geo/tiger/TIGER2020/COUNTY/tl_2020_us_county.zip"
)
r = requests.get(zip_file_url)
z = zipfile.ZipFile(io.BytesIO(r.content))
with zipfile.ZipFile(io.BytesIO(r.content)) as zip_ref:
zip_ref.extractall(f"{shapefile}")
gdf_counties = geopandas.read_file(f"{shapefile}/{shapefile}.shp")
gdf_counties.to_crs("EPSG:3395", inplace=True)
# Load county COVID-19 data.
url = "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv"
df_counties = pd.read_csv(url)
df_counties.sort_values(["fips", "date"], inplace=True)
# Calculate daily new cases from cumulative counts.
df_counties["new_cases"] = df_counties["cases"].diff()
mask = df_counties["fips"] != df_counties["fips"].shift(1)
df_counties["new_cases"][mask] = np.nan
df_counties["new_cases"] = df_counties["new_cases"].apply(lambda x: x if x > 0 else 0)
# Replace missing FIPS with -1 and covert FIPS values to strings.
df_counties["fips"] = df_counties["fips"].replace(np.nan, -1)
df_counties["fips"] = df_counties["fips"].apply(lambda fips: str(int(fips)).zfill(5))
# Find maximum 7-day cases per million.
current_day = datetime.strptime(df_counties["date"].min(), "%Y-%m-%d").date()
print(current_day)
end_day = datetime.strptime(df_counties["date"].max(), "%Y-%m-%d").date()
cases_per_mil = []
while current_day <= end_day:
current_day_str = current_day.strftime("%Y-%m-%d")
if current_day_str.split("-")[-1] == "01":
print(current_day_str, flush=True)
# Compute new cases over the past week.
start_day = current_day - timedelta(days=7)
start_day_str = start_day.strftime("%Y-%m-%d")
df_counties_past_week = df_counties[
(df_counties["date"] > start_day_str) & (df_counties["date"] <= current_day_str)
]
df_counties_past_week = (
df_counties_past_week.groupby("fips")["new_cases"].sum().reset_index()
)
# Combine COVID-19 data with population data.
combined = df_counties_past_week.merge(
df_population[["FIPS", "POPESTIMATE2019"]],
left_on="fips",
right_on="FIPS",
how="left",
)
combined["new_cases_per_mil"] = (
1000000 * combined["new_cases"] / combined["POPESTIMATE2019"]
)
combined["new_cases_per_mil"].replace(np.nan, 0, inplace=True)
current_day += timedelta(days=1)
cases_per_mil.extend(list(combined["new_cases_per_mil"].values))
print(np.quantile(cases_per_mil, [0.1, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 0.999]))
max_thresh = np.quantile(cases_per_mil, 0.99)
# Create COVID-19 cases per million video.
current_day = datetime.strptime(df_counties["date"].min(), "%Y-%m-%d").date()
print(current_day)
imgs = []
start_time = time.time()
while current_day <= end_day:
current_day_str = current_day.strftime("%Y-%m-%d")
if current_day_str.split("-")[-1] == "01":
print(current_day_str, flush=True)
# Compute new cases over the past week.
start_day = current_day - timedelta(days=7)
start_day_str = start_day.strftime("%Y-%m-%d")
df_counties_past_week = df_counties[
(df_counties["date"] > start_day_str) & (df_counties["date"] <= current_day_str)
]
df_counties_past_week = (
df_counties_past_week.groupby("fips")["new_cases"].sum().reset_index()
)
# Combine COVID-19 data with population data.
combined = df_counties_past_week.merge(
df_population[["FIPS", "POPESTIMATE2019"]],
left_on="fips",
right_on="FIPS",
how="left",
)
combined["new_cases_per_mil"] = (
1000000 * combined["new_cases"] / combined["POPESTIMATE2019"]
)
combined["new_cases_per_mil"].replace(np.nan, 0, inplace=True)
combined["new_cases_per_mil"] = combined["new_cases_per_mil"].apply(
lambda x: x if x < max_thresh else max_thresh
)
# Combine COVID-19 per million data with counties map.
final_gdf = gdf_counties.merge(
combined[["FIPS", "new_cases_per_mil"]],
left_on="GEOID",
right_on="FIPS",
how="left",
)
# Plot.
ax = final_gdf.plot(
column="new_cases_per_mil", legend=True, vmin=0, vmax=max_thresh
)
plt.xlim(-1.397e7, -7.41e6)
plt.ylim(2.71e6, 6.39e6)
plt.title(current_day_str)
plt.axis("off")
plt.tight_layout()
ax.figure.canvas.draw()
imgs.append(
Image.frombytes(
"RGB",
ax.figure.canvas.get_width_height(),
ax.figure.canvas.tostring_rgb(),
)
)
plt.close()
current_day += timedelta(days=1)
print(time.time() - start_time)
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
video = cv2.VideoWriter("covid19.mp4", fourcc, 7.5, (640, 480))
for img in imgs:
cv2_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
video.write(cv2_img)
cv2.destroyAllWindows()
video.release()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment