Skip to content

Instantly share code, notes, and snippets.

@gnn
Last active February 23, 2021 11:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gnn/5b7e4476ac25013ff02d3d685c1b5811 to your computer and use it in GitHub Desktop.
Save gnn/5b7e4476ac25013ff02d3d685c1b5811 to your computer and use it in GitHub Desktop.
Filter CSV lines using a shapefile
#! /usr/bin/env python
from pathlib import Path
import csv
import json
from shapely.geometry import Point, shape
from shapely.prepared import prep
from egon.data import subprocess
from egon.data.db import credentials
state = "Schleswig-Holstein"
def target(source):
"""Generate the target path corresponding to a source path."""
return Path(source.stem + "." + state + source.suffix)
# Select the union of the geometries of Schleswig-Holstein from the
# database, convert their projection to the one used in the CSV file,
# output the result to stdout as a GeoJSON string and read it into a
# prepared shape for filtering.
db = credentials()
geojson = subprocess.run(
["ogr2ogr"]
+ ["-s_srs", "epsg:4326"]
+ ["-t_srs", "epsg:3035"]
+ ["-f", "GeoJSON"]
+ ["/vsistdout/"]
+ [
f"PG:host={db['HOST']} user='{db['POSTGRES_USER']}'"
f" password='{db['POSTGRES_PASSWORD']}' port={db['PORT']}"
f" dbname='{db['POSTGRES_DB']}'"
]
+ [
"-sql",
"SELECT ST_Union(geometry)"
" FROM boundaries.vg250_lan"
f" WHERE gen = '{state}'",
],
text=True,
)
features = json.loads(geojson.stdout)["features"]
assert (
len(features) == 1
), f"Found {len(features)} geometry features, expected exactly one."
schleswig_holstein = prep(shape(features[0]["geometry"]))
# This block actually filters lines in the source CSV file and copies
# the appropriate ones to the destination. While doing so, it keeps
# track of the value of the "Gitter_ID_100m" field of those lines, which
# where not discarded. These values are then used, to filter additional
# files, also having a "Gitter_ID_100m" field, but lacking "x_mp_100m"
# and "y_mp_100m" coordinate fields.
csv_file = Path("Zensus_Bevoelkerung_100m-Gitter.csv").resolve(strict=True)
with open(csv_file, mode="r", newline="") as input_lines:
rows = csv.DictReader(input_lines, delimiter=";")
gitter_ids = set()
with open(target(csv_file), mode="w", newline="") as destination:
output = csv.DictWriter(
destination, delimiter=";", fieldnames=rows.fieldnames
)
output.writeheader()
output.writerows(
gitter_ids.add(row["Gitter_ID_100m"]) or row
for row in rows
if schleswig_holstein.intersects(
Point(float(row["x_mp_100m"]), float(row["y_mp_100m"]))
)
)
for path in [
Path(p).resolve(strict=True)
for p in ["Geb100m.csv", "Haushalte100m.csv", "Wohnungen100m.csv"]
]:
with open(path, mode="r", newline="", encoding="iso-8859-1") as inputs:
rows = csv.DictReader(inputs, delimiter=",")
with open(
target(path),
mode="w",
newline="",
encoding="iso-8859-1",
) as destination:
output = csv.DictWriter(
destination, delimiter=",", fieldnames=rows.fieldnames
)
output.writeheader()
output.writerows(
row for row in rows if row["Gitter_ID_100m"] in gitter_ids
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment