Skip to content

Instantly share code, notes, and snippets.

Created September 8, 2019 06:36
Show Gist options
  • Save DIRKMJK/d02ef2f5845228a3df96c5cf543bf4a4 to your computer and use it in GitHub Desktop.
Save DIRKMJK/d02ef2f5845228a3df96c5cf543bf4a4 to your computer and use it in GitHub Desktop.
import time
from pathlib import Path
import requests
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import numpy as np
import geopy.distance
OSM = Path('../data/osm')
COUNTRIES = '../data/ne_10m_admin_0_countries/ne_10m_admin_0_countries.shp'
BASE_URL = '{}&key={}'
ELEVATION = '../data/elevation.txt'
PREC = 5
MILE = 1.609344
GET_ELEVATION = input('Get new elevation data? y/[n]')
def length(linestring):
"""Calculate length of linestring"""
length = 0
coords = linestring.coords
coords = [(lat, lon) for (lon, lat) in coords]
for i in range(len(coords) - 1):
start = coords[i]
end = coords[i + 1]
length += geopy.distance.distance(start, end).m
return length
def convert_speed(speed):
"""Convert mph to kph"""
if pd.isnull(speed):
return speed
if 'mph' in speed:
speed = speed.replace('mph', '').strip()
speed = MILE * float(speed)
return float(speed)
def find_country(line):
"""Find country line is in"""
for i in countries.index:
geom = countries.geometry[i]
if line.within(geom):
return countries.ADMIN[i]
return None
def find_shapes(line, geo):
"""Find multipolygons point is in"""
coords = line.coords
start = Point(coords[0])
end = Point(coords[-1])
matches = []
for i in geo.index:
multipolygon = geo.loc[i, 'geometry']
for polygon in list(multipolygon):
if start.within(polygon) or end.within(polygon):
return matches
def add_geo(var, geo, gdf_to_geo):
"""Lookup geo characteristics"""
for i in gdf[ == 'Netherlands'].index:
geo_indices = gdf_to_geo[i]
values = geo.loc[geo_indices, var]
values = [v for v in values if pd.notnull(v)]
gdf.loc[i, var] = '|'.join(values)
def location_query(elevation, n):
"""Create n-length query for locations with no elevation data yet"""
subset = elevation[pd.isnull(elevation.elevation)].head(n)
coords = ['{:f},{:f}'.format(c[0], c[1]) for c in zip(, subset.lon)]
query = '|'.join(coords)
return subset.index, query
def request_elevation(elevation, n):
"""Request elevation for next n locations"""
index, query = location_query(elevation, n)
url = BASE_URL.format(query, API_KEY)
r = requests.get(url)
if r.status_code != 200:
results = r.json()['results']
for i, result in enumerate(results):
ind = index[i]
elevation.loc[ind, 'lat_g'] = result['location']['lat']
elevation.loc[ind, 'lon_g'] = result['location']['lng']
elevation.loc[ind, 'elevation'] = result['elevation']
elevation.loc[ind, 'resolution'] = result['resolution']
return elevation, r
def lookup_elevation(geometry):
"""Look up elevation for start or end of edge"""
coords = geometry.coords
elevations = []
resolutions = []
key_start = f'{round(coords[0][0], PREC)},{round(coords[0][1], PREC)}'
if key_start in elev_dict:
key_end = f'{round(coords[-1][0], PREC)},{round(coords[-1][1], PREC)}'
if key_end in elev_dict:
if not elevations:
return None, None, None
return min(elevations), max(elevations), np.mean(resolutions)
def create_elevation(gdf):
"""Create df with elevation for start and and points of ways"""
gdf['start'] = x: x.coords[0])
gdf['end'] = x: x.coords[-1])
start_and_end_coords = set(list(gdf.start) + list(gdf.end))
elevation = pd.DataFrame(start_and_end_coords)
elevation.columns = ['lon', 'lat']
for c in ['lat_g', 'lon_g', 'elevation', 'resolution']:
if c not in elevation.columns:
elevation[c] = None
count = 0
todo = sum(pd.isnull(elevation.elevation))
n = 10
while count < todo:
count += n
if count % 500 == 0:
elevation, r = request_elevation(elevation, n)
if count % 100 == 0:
if len(r.json()['results']) < n:
elevation.to_csv(ELEVATION, index=False)
return elevation
# Merge OSM data into gdf and create additional variables
gdfs = [gpd.read_file(p) for p in OSM.glob('*.json')]
lans = [p.stem for p in OSM.glob('*.json')]
for i, gdf in enumerate(gdfs):
gdfs[i]['lan'] = lans[i]
gdf = pd.concat(gdfs, sort=False)
gdf = gdf.reset_index()
gdf['length_m'] = gdf.geometry.apply(length)
gdf['max_speed'] = gdf.maxspeed.apply(convert_speed)
# Add countries (shapefile from Natural Earth)
countries = gpd.read_file(COUNTRIES)
gdf['country'] = gdf.geometry.apply(find_country)
# Add geology for NL (from WUR Geomorfologie map)
geo = gpd.read_file('../data/geomorfologie2017/Geomorfologie2017_v2.gml')
geo = geo.to_crs({'init': 'epsg:4326'})
gdf_to_geo = {}
for i, row in gdf[ == 'Netherlands'].iterrows():
gdf_to_geo[i] = find_shapes(row.geometry, geo)
columns_geo = [c for c in geo.columns
if 'omschrijving' in c or c.endswith('naam')]
for c in columns_geo:
add_geo(c, geo, gdf_to_geo)
# Add elevation data
if GET_ELEVATION.lower() in ['y', 'yes']:
elevation = create_elevation(gdf)
elevation = pd.read_csv(ELEVATION)
elev_dict = {f'{round(row.lon, PREC)},{round(, PREC)}':
(row.elevation, row.resolution)
for _, row in elevation.iterrows()}
gdf['elev_min'] = x: lookup_elevation(x)[0])
gdf['elev_max'] = x: lookup_elevation(x)[1])
gdf['elev_mean'] = (gdf.elev_min + gdf.elev_max) / 2
gdf['resolution'] = x: lookup_elevation(x)[2])
gdf['gradient'] = 100 * (gdf.elev_max - gdf.elev_min) / gdf.length_m
# Store data
keep = [
'geometry', 'name', 'lan', 'oneway', 'length_m', 'max_speed',
'country', 'elev_min', 'elev_max', 'elev_mean', 'resolution',
geojson_data = gdf[keep].to_json()
with open('../map/data.js', 'w') as f:
f.write('var data = {};\n'.format(geojson_data))
f.write('var languages = {};'.format(list(set(gdf.lan))))
keep += columns_geo
rename_cols = {
'BRO_Relief_omschrijving': 'relief',
'BRO_Vormsubgroep_omschrijving': 'vormsubgroep',
'GENESE_omschrijving_kort': 'genese',
'Bovengrond_omschrijving': 'bovengrond',
'BovengrondReliëf_omschrijving': 'bovengr_rlf',
'Geologischtijdperk_vormingsproces_eindigt_naam': 'geo_eind',
'Geologischtijdperk_vormingsproces_begint_naam': 'geo_begin',
'BRO_Landvormgroep_omschrijving': 'landvormgroep',
geo['holleweg'] = x:
'Holle weg' in str(x))
holleweg_geojson = geo[geo.holleweg][['geometry']].to_json()
with open('../map/geo.js', 'w') as f:
f.write('var holleweg = {};'.format(holleweg_geojson))
import requests
import json
'nl': 'Holleweg',
'en': 'Holloway',
'fr': 'Chemin Creux',
'de': 'Hohlweg',
'gl': 'Corredoira',
'da': 'Hulvejen',
'no': 'Hulvei',
'sv': 'Hålväg'
URL = ""
query = "[out:json]; way[name~'.*{}']; out geom;"
def query_op(name):
"""Query overpass api for street name"""
params = {'data': query.format(name)}
response = requests.get(URL, params=params)
return response
def to_geojson(data):
"""Convert overpass response to geojson"""
geojson = {
'type': 'FeatureCollection',
'features': []
for element in data['elements']:
coordinates = [[v for k, v in coords.items()]
for coords in element['geometry']]
coordinates = [[v2, v1] for [v1, v2] in coordinates]
feature = {
'type': 'Feature',
'geometry': {
'type': 'LineString',
'coordinates': coordinates
'properties': element['tags']
return geojson
def main():
"""Query overpass for all street names and store results"""
for lan, name in NAMES.items():
response = query_op(name)
if response.status_code != 200:
geojson = to_geojson(response.json())
with open('../data/osm/{}.json'.format(lan), 'w') as f:
if __name__ == '__main__':
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment