Pandas + OpenTopoData.org
"""MIT Licence, 2023, Michael Feil """
import pandas as pd
import os
import requests
import math
import numpy as np
from typing import Callable
import time
CACHE_DIR = str(os.path.join(os.path.dirname(__file__), ".geocache"))
KEY_LAT = "in_latitude"
KEY_LNG = "in_longitude"
KEY_ELE = "in_elevation"


def _get_elevation_opentopodata_simple(
    df: pd.DataFrame,
    url: str = "https://api.opentopodata.org/v1/etopo1?",
    additonal_api_params="",
    sleep_between_request=1.0,
):
    """Queries opentopodata.org without prior validation or caching.

    params:
        df: pd.DataFrame with columns KEY_LAT="in_latitude", KEY_LNG="in_longitude"
        url: https://api.opentopodata.org/v1/{dataset}, see docs: https://www.opentopodata.org/api/
        additonal_api_params: e.g. "&interpolation=cubic", see docs: https://www.opentopodata.org/api/
    returns:
        df: pd.DataFrame with columns
            KEY_LAT="in_latitude", KEY_LNG="in_longitude", KEY_ELE="in_elevation"
    """
    # build the REST query: locations are joined as "lat1,lng1|lat2,lng2|..."
    location = "|".join(
        [f"{row[KEY_LAT]:.9f},{row[KEY_LNG]:.9f}" for _, row in df.iterrows()]
    )
    url_spec = url + "locations=" + location + additonal_api_params
    # send the query and parse the result
    result = requests.get(url_spec)
    time.sleep(sleep_between_request)  # be fair to the free API
    if result.status_code == 200:
        results = pd.json_normalize(result.json()["results"])
        s = df.copy()
        s[KEY_ELE] = results["elevation"].values
        return s
    else:
        print(result, result.json())
        return None
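

# Illustrative sketch (not part of the original gist): direct use of the raw
# helper above on a frame that already uses the KEY_LAT / KEY_LNG column names.
# Assumes network access to api.opentopodata.org; the coordinates reuse the
# Denver example from the test below, expected elevation is roughly 1600 m.
def _example_direct_query():
    """Hypothetical usage example, not called anywhere in this module."""
    points = pd.DataFrame([[39.742043, -104.991531]], columns=[KEY_LAT, KEY_LNG])
    # returns the same frame with an added KEY_ELE column, or None on HTTP errors
    return _get_elevation_opentopodata_simple(points)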


def get_elevation_with_cache(
    lng_lat: pd.DataFrame,
    key_lat: str = "in_latitude",
    key_lng: str = "in_longitude",
    query_batch_size: int = 99,
    cache_path: str = os.path.join(CACHE_DIR, "elevation_etopo1.pickle"),
    api_function: Callable = _get_elevation_opentopodata_simple,
    max_cache_size: int = 10_000_000,
):
    """Query the elevation service using lat/lng and return the elevations as a Series.

    Uses caching to cache_path, validates the input and rounds coordinates to
    9 decimal places.

    side effects:
        - rounds locations to 9 decimal places to improve caching with near duplicates
        - attempts to query the API a maximum of 4 times, with exponential backoff

    Args:
        lng_lat: pd.DataFrame with [key_lat, key_lng] columns, dtype float
        key_lat: str, column name
        key_lng: str, column name
        query_batch_size: int
        cache_path: str, path to cache, if "" then no cache
        max_cache_size: int, max number of elevations to cache
            default: 10_000_000

    Returns:
        pd.Series elevation, Float64

    Example:
        df = pd.DataFrame(
            [
                [39.742043, -104.991531, "Denver, approx 1.610m"],
                [49.37726, 10.19159, "Rothenburg ob der Tauber, approx 420m"],
            ],
            columns=["lat", "LONG", "name"],
        )
        df["ele"] = get_elevation_with_cache(
            df, key_lat="lat", key_lng="LONG"
        )

    Raises:
        ValueError: For incorrect input data
    """
    lng_lat = (
        lng_lat[[key_lat] + [key_lng]]
        .copy()
        .rename(columns={key_lat: KEY_LAT, key_lng: KEY_LNG})
    )
    if cache_path:
        os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    # checks
    if not all("float" in str(d).lower() for d in lng_lat.dtypes):
        raise ValueError(f"lng_lat.dtypes={lng_lat.dtypes} should be float")
    if lng_lat.isna().any().any():
        raise ValueError(
            f"lng_lat contains invalid NaN data: \n {lng_lat[lng_lat.isna().any(axis=1)]}."
        )
    # copy, then convert and round, then check the coordinate ranges
    _lon_lat = lng_lat.copy()
    lng_lat = lng_lat.astype("Float64").round(9)
    lng_lat[(lng_lat[KEY_LNG] > 180.0) | (lng_lat[KEY_LNG] <= -180.0)] = pd.NA
    lng_lat[(lng_lat[KEY_LAT] > 90.0) | (lng_lat[KEY_LAT] < -90.0)] = pd.NA
    if lng_lat.isna().any().any():
        raise ValueError(
            f"lng / lat contains invalid data: \n {_lon_lat[lng_lat.isna().any(axis=1)]}. \n"
            "After conversion, only WGS84 lng/lat e.g. "
            "pd.DataFrame([[51.958108, 7.300415]]) as float is valid."
        )
    lon_lat_in = lng_lat.copy()
    # lon_lat_in is treated as immutable from here on
    _cache_df = None
    lng_lat[KEY_ELE] = lng_lat[KEY_LAT] * pd.NA
    # read the cache if present, else just initialize the elevation column with NA's
    if cache_path and os.path.exists(cache_path):
        # merge elevations from the cache
        _cache_df = pd.read_pickle(cache_path).drop_duplicates()
        lng_lat = pd.merge(
            lng_lat,
            _cache_df.rename(columns={key_lat: KEY_LAT, key_lng: KEY_LNG}),
            on=[KEY_LAT] + [KEY_LNG],
            how="left",
            suffixes=("", "_r"),
        )
        lng_lat[KEY_ELE] = lng_lat[KEY_ELE].combine_first(lng_lat[KEY_ELE + "_r"])
        lng_lat = lng_lat[[KEY_LAT] + [KEY_LNG] + [KEY_ELE]]
    else:
        lng_lat[KEY_ELE] = lng_lat[KEY_LAT] * pd.NA
    unique_pos = lng_lat.drop_duplicates(subset=[KEY_LAT] + [KEY_LNG])
    uniq_pos_new = []
    update_set = unique_pos[unique_pos[KEY_ELE].isna() & ~unique_pos[KEY_LNG].isna()]
    if update_set.size:
        print("querying server for elevation data")
        for subset in np.array_split(
            update_set, math.ceil(update_set.shape[0] / query_batch_size)
        ):
            backoff = 0
            for _ in range(4):
                time.sleep(backoff)
                newpos = api_function(subset)
                if newpos is not None:
                    uniq_pos_new.append(newpos)
                    break
                else:
                    # exponential backoff: waits of 0, 2, 6, 38 seconds
                    backoff = 2 + backoff**2
        uniq_pos_new = pd.concat(uniq_pos_new, axis=0)
        # merge the new elevations on top of the existing positions
        uniq_pos_new = pd.merge(
            unique_pos,
            uniq_pos_new,
            on=[KEY_LAT] + [KEY_LNG],
            how="left",
            suffixes=("", "_r"),
        )
        uniq_pos_new[KEY_ELE] = uniq_pos_new[KEY_ELE + "_r"].combine_first(
            uniq_pos_new[KEY_ELE]
        )
        uniq_pos_new = uniq_pos_new[[KEY_LAT] + [KEY_LNG] + [KEY_ELE]]
    else:
        # all data was found in the cache
        uniq_pos_new = unique_pos
        print("not querying server for elevation data, only cache")
    merged_all = pd.merge(
        left=lon_lat_in,
        right=uniq_pos_new,
        on=[KEY_LAT] + [KEY_LNG],
        how="left",
        suffixes=("_x", ""),
    )[[KEY_LAT] + [KEY_LNG] + [KEY_ELE]]
    # restore the input order, not the order produced by batching / joins
    merged_all = merged_all.reindex(lon_lat_in.index)
    # done, merge the results into the cache and write it back
    if _cache_df is not None:
        all_data = pd.merge(
            merged_all,
            _cache_df.rename(columns={key_lat: KEY_LAT, key_lng: KEY_LNG}),
            on=[KEY_LAT] + [KEY_LNG],
            how="outer",
            suffixes=("", "_r"),
        )[[KEY_LAT] + [KEY_LNG] + [KEY_ELE] + [KEY_ELE + "_r"]]
        all_data[KEY_ELE] = all_data[KEY_ELE + "_r"].combine_first(all_data[KEY_ELE])
        all_data.drop([KEY_ELE + "_r"], axis=1, inplace=True)
        all_data.drop_duplicates().iloc[:max_cache_size].to_pickle(cache_path)
    elif cache_path:
        merged_all.drop_duplicates().iloc[:max_cache_size].to_pickle(cache_path)
    # return the elevation column
    return merged_all[KEY_ELE].astype("Float64")
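

# Illustrative sketch (not part of the original gist): end-to-end usage of the
# cached query, mirroring the Example in the docstring above. The cache file
# name "demo_elevation.pickle" is made up; any writable path works, and passing
# cache_path="" disables caching entirely.
def _example_cached_query():
    """Hypothetical usage example, not called anywhere in this module."""
    df = pd.DataFrame(
        [
            [39.742043, -104.991531, "Denver, approx 1.610m"],
            [49.37726, 10.19159, "Rothenburg ob der Tauber, approx 420m"],
        ],
        columns=["lat", "LONG", "name"],
    )
    # the first call hits the API; repeat calls are served from the pickle cache
    df["ele"] = get_elevation_with_cache(
        df,
        key_lat="lat",
        key_lng="LONG",
        cache_path=os.path.join(CACHE_DIR, "demo_elevation.pickle"),
    )
    return df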


def test_get_elevation():
    import tempfile
    import pytest

    df_base = pd.DataFrame(
        [
            [39.742043, -104.991531, "Denver, approx 1.610m"],
            [49.37726, 10.19159, "Rothenburg ob der Tauber, approx 420m"],
        ],
        columns=["lat", "LONG", "name"],
    )
    df_muc = pd.DataFrame(
        [
            [48.1351253, 11.5819806, "Munich, approx 520m"],
        ],
        columns=["lat", "LONG", "name"],
    )
    df = df_base.copy()
    df2 = df.copy()
    df3 = df.copy()
    fn = _get_elevation_opentopodata_simple
    with tempfile.NamedTemporaryFile(suffix=".pkl") as tempf:
        # start with an empty cache file
        pd.DataFrame([], columns=[KEY_LAT] + [KEY_LNG] + [KEY_ELE]).to_pickle(
            tempf.name
        )
        df["ele"] = get_elevation_with_cache(
            df,
            key_lat="lat",
            key_lng="LONG",
            api_function=fn,
            cache_path=tempf.name,
            query_batch_size=10,
        )
        _cache_df = pd.read_pickle(tempf.name)
        # test that all data was written to the cache
        assert (df["ele"] == _cache_df[KEY_ELE]).all()
        # the second call should read from the cache only
        df2["ele"] = get_elevation_with_cache(
            df2, key_lat="lat", key_lng="LONG", api_function=fn, cache_path=tempf.name
        )
        pd.testing.assert_frame_equal(df, df2)
        # test that the Munich data is additionally cached
        df_muc["ele"] = get_elevation_with_cache(
            df_muc,
            key_lat="lat",
            key_lng="LONG",
            api_function=fn,
            cache_path=tempf.name,
            query_batch_size=10,
        )
        _cache_df = pd.read_pickle(tempf.name)
        assert (
            pd.concat([df["ele"], df_muc["ele"]]).sort_values().values
            == _cache_df[KEY_ELE].sort_values().values
        ).all()
    # third call, without using the cache, issuing 2 API calls (query_batch_size=1)
    df3["ele"] = get_elevation_with_cache(
        df3,
        key_lat="lat",
        key_lng="LONG",
        api_function=fn,
        cache_path="",
        query_batch_size=1,
    )
    pd.testing.assert_frame_equal(df, df3)
    # fourth call: use the raw API helper directly, without caching or validation
    ele = fn(
        df[["LONG"] + ["lat"]].copy().rename(columns={"lat": KEY_LAT, "LONG": KEY_LNG})
    )
    # an out-of-range longitude must raise
    with pytest.raises(ValueError):
        df_invalid = df_base.copy()
        df_invalid["LONG"].iloc[0] = -191
        get_elevation_with_cache(
            df_invalid,
            key_lat="lat",
            key_lng="LONG",
            api_function=fn,
        )
    # missing values must raise
    with pytest.raises(ValueError):
        df_invalid = df_base.copy()
        df_invalid["LONG"].iloc[0] = pd.NA
        get_elevation_with_cache(
            df_invalid,
            key_lat="lat",
            key_lng="LONG",
            api_function=fn,
        )
    assert ((df["ele"].values - ele[KEY_ELE].values) <= 1).all()
    print(df)


if __name__ == "__main__":
    test_get_elevation()