Skip to content

Instantly share code, notes, and snippets.

@james-morrison-mowi james-morrison-mowi/py_fetch.py Secret
Last active Jun 28, 2019

Embed
What would you like to do?
Calculate distance from land for a give latitude & longitude at specific angles ( default UTM zone is for West coast of Scotland )
import geopandas as gpd
import geopy
import geopy.distance
import numpy as np
import pandas as pd
import shapely
from scipy.spatial import distance
from shapely.geometry import LineString, MultiLineString, Point
def get_crs_string(epsg):
return {"init":"epsg:"+str(epsg)}
utm29n_epsg = 32629
utm29n = get_crs_string(utm29n_epsg)
wgs84_epsg = 4326
wgs84 = get_crs_string(wgs84_epsg)
def read_land_polygons(land_polygons_path:str) -> gpd.GeoSeries:
#land_polygons derived from https://osmdata.openstreetmap.de/data/land-polygons.html
return gpd.GeoDataFrame(geometry=gpd.read_file(land_polygons_path).geometry)
def is_this_point_on_land(latitude: float, longitude: float,
land_polygons: gpd.GeoSeries) -> bool:
intersects = np.sum(land_polygons.intersects(Point(longitude,
latitude)).values)
if intersects > 0:
return True
return False
def calc_fetch(latitude: list, longitude: list,
land_polygons: gpd.GeoSeries,
distance_km: float = 500,
angle_increment: int = 1,
utm=utm29n_epsg) -> dict:
all_points_closest_intersects = {}
for lat, lon in zip(latitude, longitude):
on_land = is_this_point_on_land(lat, lon, land_polygons)
if not on_land:
radiating_lines = generate_line_strings(lat,lon, distance_km,
angle_increment, utm)
land_polygon_subset = subset_utm_land_polygon(lat, lon, land_polygons,
buffer_distance_km=distance_km)
closest_intersects = get_closest_intersects(lat, lon,
radiating_lines,
land_polygon_subset)
point = Point(lon,lat)
all_points_closest_intersects[point] = closest_intersects
return all_points_closest_intersects
def get_closest_intersects(lat: float, lon: float,
radiating_lines: gpd.GeoSeries,
land_polygons: gpd.GeoDataFrame):
line_intersects = line_polygon_intersects(radiating_lines, land_polygons)
first_intersects = get_first_intersections_by_angle( line_intersects )
return closest_first_intersection(first_intersects, lat, lon)
def generate_line_strings(latitude: float, longitude: float,
distance: float = 500,
angle_increment: int = 1,
utm=utm29n_epsg) -> gpd.GeoSeries:
"""given an origin create radiating linestrings"""
linestrings = []
origin = geopy.Point(latitude, longitude)
angles = []
for angle in range(0, 360, angle_increment):
destination = geopy.distance.geodesic(kilometers=distance).destination(origin,
angle)
linestrings.append(LineString([(origin.longitude,
origin.latitude),
(destination.longitude,
destination.latitude)]))
angles.append(angle)
line_strings = gpd.GeoDataFrame(angles, geometry=linestrings)
line_strings.rename({0:'angle_degrees'}, axis=1, inplace=True)
line_strings.set_index('angle_degrees', inplace=True, drop=True)
line_strings.crs = wgs84
radiating_lines_utm = line_strings.to_crs(epsg=utm)
return radiating_lines_utm
def line_polygon_intersects(line_strings: gpd.GeoSeries,
land_polygons: gpd.GeoSeries) -> dict:
line_dict = {}
for angle, record in line_strings.iterrows():
print("Angle:", angle)
intersections = []
for poly_index, poly in enumerate(land_polygons.geometry):
intersects = record.geometry.intersects(poly)
if intersects:
intersection = record.geometry.intersection(poly)
intersections.append(intersection)
if len(intersections) == 0:
intersections.append(record.geometry.coords[-1])
line_dict[angle] = intersections
return line_dict
def get_first_intersections_by_angle(line_dict:dict) -> dict:
intersections = {}
for angle, intersect in line_dict.items():
angle_intersections = []
for line in intersect:
if isinstance(line,MultiLineString):
for line in line:
angle_intersections.append(line.coords[0])
elif isinstance(line, tuple):
angle_intersections.append(line)
else:
angle_intersections.append(line.coords[0])
intersections[angle] = angle_intersections
return intersections
def closest_first_intersection(intersections: dict,
latitude: float,
longitude: float,
utm=utm29n_epsg) -> dict:
closest_intersections = {}
origin = gpd.GeoSeries(Point(longitude, latitude))
origin.crs = wgs84
origin = origin.to_crs(epsg=utm)
for angle, intersection in intersections.items():
points = []
for point in intersection:
points.append(Point(point))
if len(points) > 0 :
intersection_x = []
intersection_y = []
for point in points:
intersection_x.append(point.x)
intersection_y.append(point.y)
distances = []
for index in range(len(intersection_x)):
XA=[(origin.geometry.x.values[0], origin.geometry.y.values[0])]
XB=[(intersection_x[index], intersection_y[index])]
distances.append(distance.cdist(XA, XB)[0][0])
distances = pd.Series(distances)
point = points[distances.idxmin()]
minumum_distance = distances.min()
closest_intersections[angle] = ( minumum_distance, point )
closest_points = gpd.GeoDataFrame(closest_intersections).transpose()
closest_points.geometry = closest_points[1]
closest_points[0] = closest_points[0]/1000
closest_points.rename({0:'distance_km'}, axis=1, inplace=True)
closest_points.drop([1], axis=1, inplace=True)
closest_points.reset_index(inplace=True)
closest_points.rename({'index':'angle_degrees'},axis=1, inplace=True)
closest_points.crs = utm29n
return closest_points
def subset_utm_land_polygon(latitude: float, longitude: float,
utm_land_polygons: gpd.GeoDataFrame,
buffer_distance_km: float = 500,
buffer_resolution: float = 90,
utm = utm29n_epsg) -> gpd.GeoDataFrame:
origin = gpd.GeoSeries(Point(longitude, latitude))
origin.crs = wgs84
utm_origin = origin.to_crs(epsg=utm)
origin_buffer = utm_origin.buffer(distance= buffer_distance_km * 1000,
resolution= buffer_resolution)
buffer = gpd.GeoDataFrame(geometry=origin_buffer)
subset = gpd.overlay(buffer, utm_land_polygons)
return subset
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.