Skip to content

Instantly share code, notes, and snippets.

@hadisfr
Created August 12, 2021 21:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hadisfr/01895e40b5262636c9a8b48eda44a030 to your computer and use it in GitHub Desktop.
Save hadisfr/01895e40b5262636c9a8b48eda44a030 to your computer and use it in GitHub Desktop.
Plot graphs and animations of COVID-19 pandemic data, combining UNICEF population data with the Our World in Data dataset at https://github.com/owid/covid-19-data
#!/usr/bin/env python3
import subprocess
import re
from os import path
import pandas as pd
import geopandas as gpd
import numpy as np
import seaborn as sns
from tqdm import tqdm
from matplotlib import rcParams, pyplot as plt, dates as mdates
# Matplotlib defaults applied to every figure this script draws.
# "none" asks the SVG backend to write text as text instead of converting it to outlines.
rcParams["svg.fonttype"] = "none"
# Use Times New Roman for all figure text.
rcParams['font.family'] = 'Times New Roman'
# --- Run configuration ------------------------------------------------------
# When True, metrics are divided by population (per million people; vaccination
# metrics are expressed as a percentage instead).
per_capita = True
# When True, the plotted value is log10 of the metric.
log = False
# Image that ffmpeg composites over the centre of the rendered map animation.
overlay_path = "overlay.png"

# Earliest date ("YYYY-MM-DD") kept for each metric; earlier rows are dropped.
first_date = {
    "new_cases": "2020-01-23",
    "new_cases_smoothed": "2020-01-23",
    "new_deaths": "2020-01-23",
    "new_deaths_smoothed": "2020-01-23",
    "new_tests": "2020-01-01",
    "new_tests_smoothed": "2020-01-01",
    "people_vaccinated": "2020-12-04",
    "people_vaccinated_smoothed": "2020-12-04",
    "people_fully_vaccinated": "2020-12-27",
    "people_fully_vaccinated_smoothed": "2020-12-27",
}

# Alternative country selections (three-letter `iso_a3` codes) for the line
# charts. They used to be successive re-assignments of `countries`, of which
# only the last one took effect; keeping them as named presets makes the active
# choice explicit without losing the alternatives.
_country_presets = {
    "four": {"IRN", "RUS", "TUR", "USA"},
    "wide": ["IRN", "ISL", "ARE", "SWE", "GBR", "RUS", "QAT", "CHN", "PER", "HUN", "ITA", "NLD", "TUR", "USA"],
    "selected": ["IRN", "ARE", "CHN", "PER", "HUN", "ITA", "NLD", "USA", "CUB"],
    "iran": {"IRN"},
}
# Countries drawn by plot_timeline_graph.
countries = _country_presets["iran"]
def from_unicef(src_key="Total population (thousands)", dst_key="population",
                csv_path="UNICEF_latest.csv"):
    """Load one indicator from a UNICEF CSV export.

    Args:
        src_key: Value of the ``Indicator`` column to keep.
        dst_key: Name given to the ``Value`` column in the result.
        csv_path: CSV file to read; defaults to the file the script has
            always used, so existing callers are unaffected.

    Returns:
        DataFrame with two columns, ``iso_a3`` (taken from ``LOCATION``) and
        ``dst_key`` (taken from ``Value``), holding only the rows whose
        ``Indicator`` equals ``src_key``. The original row index is kept.
    """
    table = pd.read_csv(csv_path)
    # Select matching rows and the two needed columns in a single step.
    selected = table.loc[table["Indicator"] == src_key, ["LOCATION", "Value"]]
    return selected.rename(columns={"LOCATION": "iso_a3", "Value": dst_key})
def get_world():
    """Build the world map table with a population figure per country.

    Reads the ``naturalearth_lowres`` dataset through geopandas, patches the
    ISO codes of a few entries, merges in the UNICEF total population (given
    in thousands) and falls back to the dataset's own ``pop_est`` where UNICEF
    has no value.

    Returns:
        GeoDataFrame of the map rows with ``pop_est > 0`` other than
        Antarctica, with an added integer ``population`` column.
    """
    # NOTE(review): `gpd.datasets` is reported as deprecated/removed in recent
    # geopandas releases — confirm the installed version still provides it.
    world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))

    # fix for bug https://github.com/geopandas/geopandas/issues/1041
    iso_by_name = {
        'France': 'FRA',
        'Norway': 'NOR',
        'N. Cyprus': 'CYP',
        'Somaliland': 'SOM',
        'Kosovo': 'RKS',
    }
    for country_name, iso_code in iso_by_name.items():
        world.loc[world['name'] == country_name, 'iso_a3'] = iso_code

    pop_df = from_unicef("Total population (thousands)", "population")
    world = pd.merge(world, pop_df, on="iso_a3", how="left")
    # UNICEF reports thousands; convert to head count.
    world["population"] = world["population"] * 1000
    world["population"] = world["population"].fillna(world["pop_est"])
    # int64 instead of int32: int32 tops out at 2,147,483,647, which leaves
    # almost no headroom above the largest national populations.
    world["population"] = world["population"].astype("int64")
    world = world[(world["pop_est"] > 0) & (world["name"] != "Antarctica")]
    print("World DS:\n%s\n" % world)
    return world
def get_covid(csv_path="owid-covid-data.csv"):
    """Load the OWID COVID-19 CSV and align its country key with the map data.

    Args:
        csv_path: CSV file to read; defaults to the file name the script has
            always used, so existing callers are unaffected.

    Returns:
        DataFrame with every column of the CSV, with ``iso_code`` renamed to
        ``iso_a3``. The column list and the frame itself are printed.
    """
    df = pd.read_csv(csv_path)
    print("COVID-19 DS Columns:\n%s\n" % "\n".join("\t%s" % c for c in df.columns))
    df = df.rename(columns={"iso_code": "iso_a3"})
    print("COVID-19 DS:\n%s\n" % df)
    return df
def compare_countries(world, covid_df):
    """Print the country codes that appear in only one of the two datasets.

    Args:
        world: Table with an ``iso_a3`` column (the map data).
        covid_df: Table with an ``iso_a3`` column (the COVID-19 data).

    The codes are printed sorted so the report is identical from run to run;
    printing a raw set would follow hash order, which varies between runs.
    """
    world_countries = set(world["iso_a3"])
    covid_df_countries = set(covid_df["iso_a3"])
    # key=str keeps the sort working even if a code is missing (NaN).
    print("missing COVID-19 data:\t%s" % sorted(world_countries - covid_df_countries, key=str))
    print("missing country data:\t%s" % sorted(covid_df_countries - world_countries, key=str))
def interpolate_covid(covid_per_col, col):
    """Fill gaps in one metric so every country has a value on every date.

    Args:
        covid_per_col: Long-format frame with ``iso_a3``, ``date`` and ``col``
            columns, at most one row per (date, country) pair.
        col: Name of the metric column to fill.

    Returns:
        Long-format frame with columns ``date``, ``iso_a3`` and ``col``,
        containing one row for every (country, date) combination. Gaps are
        filled by pandas' default linear interpolation; whatever is still
        missing afterwards (e.g. before a country's first report) becomes 0.
    """
    # One row per date, one column per country. Scalar arguments give plain
    # (single-level) columns, which keeps the melt below straightforward.
    wide = covid_per_col.pivot(index="date", columns="iso_a3", values=col)
    # Interpolate while `date` is still the index, so only numeric columns are
    # involved (interpolating an object-dtype column is deprecated in pandas).
    wide = wide.interpolate().fillna(0)
    return wide.reset_index().melt(id_vars="date", var_name="iso_a3", value_name=col)
def filter_by_date(covid_per_col, col):
    """Drop the rows dated before the configured start of a metric.

    Args:
        covid_per_col: Frame with a ``date`` column of "YYYY-MM-DD" strings.
        col: Metric name; its start date is looked up in ``first_date``.

    Returns:
        Tuple ``(filtered_frame, dates)`` where ``dates`` is the sorted list of
        distinct dates left after filtering (empty if nothing is left).

    Raises:
        KeyError: If ``col`` has no entry in ``first_date``.
    """
    start = first_date[col]
    # Plain string comparison; this orders correctly for "YYYY-MM-DD" values.
    covid_per_col = covid_per_col[covid_per_col["date"] >= start]
    dates = sorted(set(covid_per_col["date"]))
    if dates:
        print("\nfrom %s to %s (%d days)\n" % (dates[0], dates[-1], len(dates)))
    else:
        # Previously this case crashed with IndexError on dates[0].
        print("\nno rows on or after %s\n" % start)
    return covid_per_col, dates
def fill_new_col(df, col):
    """Add a display-ready version of a metric column to ``df``.

    The new column's name doubles as the chart title: ``col`` is title-cased,
    "New" becomes "Daily", "COVID-19 " is prefixed, the unit suffixes implied
    by the module-level ``per_capita`` / ``log`` switches are appended, and a
    "Smoothed" word is moved to a trailing "(Smoothed)".

    The values are the metric with negatives set to 0, optionally scaled by
    ``df["population"]`` (per million, or percent for vaccination metrics) and
    optionally converted to log10.

    Args:
        df: Frame holding ``col`` (and ``population`` when ``per_capita`` is
            set). It is modified in place.
        col: Name of the source metric column.

    Returns:
        Tuple ``(df, new_column_name)``.
    """
    is_vaccination = "vaccin" in col.lower()

    # Build the human-readable column name / title.
    label = "COVID-19 " + col.title().replace("_", " ").replace("New", "Daily")
    if per_capita:
        label += " (%)" if is_vaccination else " per Million"
    if log:
        label += " (log10)"
    label = re.sub(r"(.*) Smoothed(.*)", r"\1\2 (Smoothed)", label)

    # Build the values.
    values = df[col]
    values = values.mask(values < 0, 0)
    if per_capita:
        scale = 100 if is_vaccination else 1_000_000
        values = values / df["population"] * scale
    if log:
        # log10(0) is undefined: blank out zeros first, then back-fill them
        # (together with any other missing value) after taking the log.
        # NOTE(review): the source lost its indentation; placing the back-fill
        # inside this branch is a reconstruction — confirm.
        values = np.log10(values.replace(0, np.nan))
        filler = np.nanmin(values) if per_capita else 0
        values = values.fillna(filler)

    df[label] = values
    return df, label
def _encode_movie(frames_dir, col):
    """Encode the PNG frames in ``frames_dir`` into ``<col>.mp4`` and ``<col>_final.mp4``."""
    intermediate_movie_path = path.join(frames_dir, "%s.mp4" % col)
    final_movie_path = path.join(frames_dir, "%s_final.mp4" % col)
    # Arguments are passed as a list (no shell), so paths never need quoting;
    # ffmpeg expands the glob pattern itself via `-pattern_type glob`.
    encode = subprocess.run([
        "ffmpeg", "-pattern_type", "glob", "-i", path.join(frames_dir, "*.png"),
        "-pix_fmt", "yuv420p", intermediate_movie_path,
    ])
    if encode.returncode != 0:
        # Without the intermediate movie the overlay step cannot succeed.
        print("ffmpeg failed for %s (exit code %d); skipping overlay" % (col, encode.returncode))
        return
    # Composite the overlay image (`overlay_path`) centred over the movie.
    subprocess.run([
        "ffmpeg", "-i", intermediate_movie_path, "-i", overlay_path,
        "-filter_complex", "overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2",
        "-qscale:v", "0", "-codec:a", "copy", final_movie_path,
    ])
def plot_map_timeline(world, covid_df):
    """Render one world-map frame per day for each metric and encode them as movies.

    For every metric, ``results/<metric>/`` is recreated from scratch, one PNG
    per date is written into it, and ffmpeg then produces ``<metric>.mp4`` and
    an overlaid ``<metric>_final.mp4`` in the same directory.

    Args:
        world: Map table with ``iso_a3``, geometry and ``population``.
        covid_df: COVID-19 table with ``iso_a3``, ``date`` and metric columns.
    """
    import os
    import shutil

    # A single figure is reused for every frame and cleared after each save.
    fig = plt.figure(figsize=(15, 5))
    try:
        for col in ["new_cases", "new_deaths", "new_tests", "people_vaccinated", "people_fully_vaccinated"]:
            print(col)
            path_prefix = path.join("results", col)
            # Start from an empty directory so stale frames never end up in the movie.
            shutil.rmtree(path_prefix, ignore_errors=True)
            os.makedirs(path_prefix, exist_ok=True)

            covid_per_col = covid_df[["iso_a3", "date", col]]
            covid_per_col, dates = filter_by_date(covid_per_col, col)
            covid_per_col = interpolate_covid(covid_per_col, col)

            for date in tqdm(dates):
                covid_per_date = covid_per_col[covid_per_col["date"] == date]
                # The merge builds a new table, so `world` itself is not modified.
                world_per_date = pd.merge(world, covid_per_date, on="iso_a3", how="left")
                world_per_date, new_col = fill_new_col(world_per_date, col)
                ax = fig.add_subplot(1, 1, 1)
                world_per_date.plot(column=new_col, ax=ax, legend=True)
                plt.axis("off")
                plt.title("%s (%s)" % (new_col, date))
                plt.savefig(path.join(path_prefix, "%s_%s.png" % (col, date)))
                plt.clf()

            _encode_movie(path_prefix, col)
    finally:
        # Release the figure; it was previously left open.
        plt.close(fig)
def plot_timeline_graph(world, covid_df):
    """Draw one line chart per metric for the configured ``countries``.

    Each chart is written to ``results/<metric>.svg``. CHN is left out of the
    vaccination charts, and when USA is among the countries the value on
    2021-01-20 is marked and labelled "Joe Biden".

    Args:
        world: Map table providing ``iso_a3`` and ``population``.
        covid_df: COVID-19 table with ``iso_a3``, ``date`` and metric columns.
    """
    import os

    # Equivalent to the former set_theme() + set(font=...) pair: in seaborn
    # `set` is an alias of `set_theme`, so the second call re-applied the
    # defaults together with the font.
    sns.set_theme(font="Times New Roman")

    path_prefix = path.join("results")
    # The charts may be produced without the map step having created the directory.
    os.makedirs(path_prefix, exist_ok=True)

    for col in ["new_cases_smoothed", "new_deaths_smoothed", "new_tests_smoothed", "new_cases", "new_deaths", "new_tests", "people_vaccinated", "people_fully_vaccinated"]:
        print(col)
        covid_per_col = covid_df[["iso_a3", "date", col]]
        covid_per_col = covid_per_col[covid_per_col["iso_a3"].isin(countries)]
        if "vaccin" in col.lower():
            covid_per_col = covid_per_col[covid_per_col["iso_a3"] != "CHN"]
        covid_per_col, dates = filter_by_date(covid_per_col, col)
        covid_per_col = interpolate_covid(covid_per_col, col)
        covid_per_col = pd.merge(covid_per_col, world, on="iso_a3", how="left")
        covid_per_col, new_col = fill_new_col(covid_per_col, col)
        covid_per_col["date"] = pd.to_datetime(covid_per_col["date"], format='%Y-%m-%d')

        fig, ax = plt.subplots(figsize=(8, 4.5))
        plt.title(new_col, fontweight="bold")
        ax.xaxis.set_major_locator(mdates.AutoDateLocator())
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y %b'))
        fig.autofmt_xdate()
        covid_per_col["Country"] = covid_per_col["iso_a3"]
        sns.lineplot(data=covid_per_col, x="date", y=new_col, hue="Country")

        if "USA" in countries:
            biden_date = "2021-01-20"
            on_that_day = (covid_per_col["date"] == pd.Timestamp(biden_date)) & (covid_per_col["iso_a3"] == "USA")
            usa_values = covid_per_col.loc[on_that_day, new_col]
            # Take the value explicitly and skip the marker when the date is
            # outside the data; float() on the Series failed in that case.
            if not usa_values.empty:
                biden_y = float(usa_values.iloc[0])
                plt.scatter(np.datetime64(biden_date), biden_y)
                ax.annotate("Joe Biden", (np.datetime64(biden_date), biden_y))

        plt.tight_layout()
        plt.savefig(path.join(path_prefix, "%s.svg" % (col)))
        # Close each figure once saved; they were previously all kept open.
        plt.close(fig)
def main():
    """Load both datasets, report mismatched country codes, then render all outputs."""
    # Only these three map columns are used downstream.
    world = get_world()[["iso_a3", "geometry", "population"]]
    covid_df = get_covid()
    compare_countries(world, covid_df)
    # Map animations first, then the per-metric line charts.
    for render in (plot_map_timeline, plot_timeline_graph):
        render(world, covid_df)
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment