Last active
March 6, 2024 01:26
-
-
Save Recursing/f87d502df7426d5883dd0940726ccacd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import time # For tracking how long everything is taking | |
import os | |
import matplotlib.pyplot as plt # For plotting | |
import matplotlib.ticker as ticker | |
import geopandas as gpd # For geographic data | |
import pandas as pd # For population data | |
from shapely.geometry import Polygon # Used to filter out empty polygons | |
# Geographic dataframe | |
# https://www.istat.it/storage/cartografia/confini_amministrativi/generalizzati/Limiti01012022_g.zip | |
gdf = gpd.read_file("Limiti01012022_g/Com01012022_g/Com01012022_g_WGS84.shp") | |
def load_demographic_data(): | |
# Demographic dataframe | |
# https://demo.istat.it/app/?i=P02 it changed since last time, now it's in "Area download" and it's one file per province | |
dfs = [] | |
for fname in sorted(os.listdir()): | |
if fname.startswith("P2_2022_it_") and fname.endswith(".csv"): | |
dfs.append(pd.read_csv(fname, skiprows=1, sep=";")) | |
return pd.concat(dfs) | |
df = load_demographic_data() | |
# Only keep the interesting columns | |
df = df[ | |
["Codice comune", "Morti - Totale", "Popolazione censita al 31 dicembre - Totale"] | |
] | |
# Rename columns | |
df.columns = ["Codice comune", "Morti", "Popolazione"] | |
# Merge geographic data with demographic data | |
gdf = gdf.merge(df, left_on="PRO_COM", right_on="Codice comune") | |
gdf.set_index("PRO_COM") | |
city_name = "Busto Arsizio" | |
# Center everything with the centroid of the city | |
city_centroid = gdf[gdf.COMUNE == city_name].centroid.values[0] | |
gdf["geometry"] = gdf["geometry"].translate(-city_centroid.x, -city_centroid.y) | |
city_centroid = gdf[gdf.COMUNE == city_name].centroid.values[0] | |
# Distance used for coloring | |
gdf["Distance"] = gdf.centroid.distance(city_centroid) | |
print( | |
f"Sanity check, should be close to 0,0 : {city_centroid.x:.2E}, {city_centroid.y:.2E}" | |
) | |
# Track time elapsed | |
t0 = time() | |
# List of radiuses and death numbers for final plot | |
radiuses = [] | |
deaths = [] | |
populations = [] | |
# Scale ticks by 1000, so meters become kilometers, and deaths are in thousands | |
# See https://stackoverflow.com/a/17816809/2699176 | |
scale = 1000 | |
ticker_formatter = ticker.FuncFormatter(lambda x, _pos: f"{x/scale:g}") | |
# working dataframe with only rows which intersect the circle | |
cut_df = gdf | |
# Go from bigger to smaller circles, so we can reuse the filtered dataframe | |
# at each iteration, and only look at rows inside the previous circle | |
for radius in range(100000, 0, -500): | |
# Intersect the dataframe with a circle | |
# centered in the chosen city with the given radius | |
circle = city_centroid.buffer(radius) | |
cut_df["geometry"] = cut_df["geometry"].intersection(circle) | |
# Remove empty polygons, because they raise an error | |
cut_df = cut_df[cut_df["geometry"] != Polygon()] | |
# Calculate the amount of deaths inside the intersected area | |
# Assuming uniform population density in the city | |
cut_df["Fraction"] = cut_df.area / cut_df["Shape_Area"] | |
cut_df["Deaths_inside"] = cut_df.Fraction * cut_df.Morti | |
cut_df["Pop_inside"] = cut_df.Fraction * cut_df.Popolazione | |
# Create pyplot plot coloring by distance | |
cut_df.plot(column="Distance") | |
# Write plot title | |
# total_deaths need rounding, because they have been multiplied by | |
# the fraction of the area inside the circle | |
total_deaths = round(cut_df.Deaths_inside.sum()) | |
header = f"Radius: {radius/1000:.0f}km Deaths: {total_deaths:,}" | |
plt.title(header) | |
# Write names of 10 cities with most deaths inside the circle | |
top_10 = cut_df.sort_values(by="Deaths_inside")[-10:] | |
for _, row in top_10.iterrows(): | |
plt.annotate( | |
text=f"{row['COMUNE']}\n{row['Deaths_inside']:.0f}", | |
xy=row["geometry"].centroid.coords[:][0], | |
horizontalalignment="center", | |
size=6, | |
) | |
# Set axes tickers and labels | |
axes = plt.gca() | |
axes.yaxis.set_major_formatter(ticker_formatter) | |
axes.xaxis.set_major_formatter(ticker_formatter) | |
axes.set_ylabel("Distance (km)") | |
axes.set_xlabel("Distance (km)") | |
plt.savefig(f"Deaths_{radius//500:0>3}.png", dpi=200) | |
plt.close() | |
print(f"Plotted {radius=} Time elapsed: {time()-t0:.2f}s") | |
radiuses.append(radius) | |
deaths.append(total_deaths) | |
total_population = round(cut_df.Pop_inside.sum()) | |
populations.append(total_population) | |
# Final plot: deaths and population vs radius | |
fig, ax1 = plt.subplots() | |
plt.title(f"Population: {populations[0]} Deaths: {deaths[0]}") | |
color = "tab:red" | |
ax1.set_xlabel("Distance (km)") | |
ax1.set_ylabel("Deaths (thousands)", color=color) | |
plt.plot(radiuses, deaths, color=color) | |
ax1.tick_params(axis="y", labelcolor=color) | |
ax1.yaxis.set_major_formatter(ticker_formatter) | |
color = "tab:blue" | |
ax2 = ax1.twinx() | |
ax2.set_ylabel("Population (thousands)", color=color) | |
ax2.plot(radiuses, populations, color=color) | |
ax2.tick_params(axis="y", labelcolor=color) | |
ax2.yaxis.set_major_formatter(ticker_formatter) | |
axes = plt.gca() | |
axes.xaxis.set_major_formatter(ticker_formatter) | |
fig.tight_layout() | |
plt.savefig("PopDeathsVsRadius.png", dpi=200) | |
print(f"Total time: {time() - t0:.2f}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
geopandas==0.14.3 | |
matplotlib==3.8.3 | |
pandas==2.2.1 | |
shapely==2.0.3 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment