Skip to content

Instantly share code, notes, and snippets.

@vikas-git
Created December 5, 2019 06:49
Show Gist options
  • Save vikas-git/22aa3ca5164c37737f020389950a85bf to your computer and use it in GitHub Desktop.
Save vikas-git/22aa3ca5164c37737f020389950a85bf to your computer and use it in GitHub Desktop.
Find the distance between two points using the Google Maps API and plot it on a folium map.
# check attachment files
# find_distance.py
import os
from datetime import datetime
import googlemaps
from django.conf import settings
from packages.common_func import get_or_none, convert_float
from network.models import Distance, LatLong
class FindDistance(object):
    '''
    Resolve the road distance (in km) between two nodes.

    Distances already stored in the ``Distance`` table are reused; otherwise
    the nodes' coordinates are read from ``LatLong``, Google Maps is queried
    through ``GeoDist`` and the answer is stored for next time.
    '''

    def __init__(self):
        pass

    @staticmethod
    def _parse_distance_km(distance):
        '''
        Turn a Google Maps distance text into a number of kilometres.

        Parameters:
            distance: text such as "1,234 km" or "850 m" (a bare number is
                      accepted as well and returned unchanged as a float)
        Return:
            float, the distance in kilometres
        Raises:
            ValueError / TypeError when the text is not numeric
        '''
        # Thousands separators ("1,234 km") would break float().
        text = str(distance).strip().replace(',', '')
        if text.endswith(' km'):
            return float(text[:-len(' km')])
        if text.endswith(' m'):
            return float(text[:-len(' m')]) / 1000
        return float(text)

    def convert_distance_in_km(self, distance):
        '''
        Convert a Google Maps distance text to kilometres.

        Parameters:
            distance: text such as "12.5 km" or "500 m"
        Return:
            whatever ``convert_float`` returns for the parsed value; 0 is
            passed to it when the text cannot be parsed
        '''
        new_distance = 0
        try:
            new_distance = self._parse_distance_km(distance)
        except (TypeError, ValueError) as e:
            print(e)
        return convert_float(new_distance)

    def _lat_long_for(self, node):
        '''
        Return ``(latitude, longitude)`` for an active node, or None when the
        node has no usable coordinates.
        '''
        # NOTE(review): records returned by get_or_none are read with .get(),
        # as the rest of this class does - confirm they are dict-like.
        record = get_or_none(LatLong, node=node, status=1)
        if not record:
            return None
        latitude = record.get('latitude')
        longitude = record.get('longitude')
        # Compare against None/'' explicitly: 0 is a valid coordinate.
        if latitude in (None, '') or longitude in (None, ''):
            return None
        return (latitude, longitude)

    def get_distance(self, origin, destination):
        '''
        Return the distance in km between origin and destination.

        The database is searched first. Only when no stored record exists is
        the Google Maps API called, and a positive result is then saved to
        the database.

        Return:
            the distance in km, or 0 when it could not be determined
        '''
        distance_in_km = 0
        try:
            search_data = get_or_none(Distance, origin=origin, destination=destination)
            if search_data:
                return convert_float(search_data.get('distance', 0.0))

            origin_lat_long = self._lat_long_for(origin)
            destination_lat_long = self._lat_long_for(destination)
            if origin_lat_long is None or destination_lat_long is None:
                # Without both coordinates there is nothing to ask the API.
                return distance_in_km

            geo_dist = GeoDist(settings.GOOGLE_MAPS_API_KEY)
            response = geo_dist.find_dist_and_time(origin_lat_long, destination_lat_long) or {}
            if response.get('distance'):
                distance = self.convert_distance_in_km(response.get('distance'))
                if distance > 0:
                    distance_in_km = distance
                    try:
                        Distance(
                            id=self.get_next_document_id(),
                            origin=origin,
                            destination=destination,
                            distance=distance,
                            duration=response.get('duration'),
                        ).save()
                    except Exception as e:
                        # Failing to cache must not discard a valid distance.
                        print(e)
        except Exception as e:
            # Best effort: any lookup/API failure yields the default distance.
            print(e)
        return distance_in_km

    def get_next_document_id(self):
        '''
        Return the id to use for the next ``Distance`` record: the highest
        existing id plus one, or 1 when the table is empty.
        '''
        try:
            dis_obj = Distance.objects.latest('id')
        except Distance.DoesNotExist:
            # latest() raises instead of returning None on an empty table.
            return 1
        if dis_obj:
            return int(dis_obj.id) + 1
        return 1
class GeoDist:
    '''
    Get distance and duration between two places from Google Maps.
    '''

    def __init__(self, key='XXX'):
        '''
        Parameters:
            key: Google Maps API key used to build the client
        '''
        self.gmaps = googlemaps.Client(key=key)
        self.now = datetime.now()

    def find_dist_and_time(self, source, destination, mode='driving', transit_mode='bus'):
        '''
        Ask the Directions API for the route from source to destination.

        Parameters:
            source: origin accepted by ``gmaps.directions`` (e.g. (lat, long))
            destination: destination accepted by ``gmaps.directions``
            mode: travel mode, default 'driving'
            transit_mode: kept for backward compatibility; it is currently
                          not forwarded to the API
        Return:
            {'distance': <text>, 'duration': <text>} taken from the last leg
            of the last route returned, or {} when no route was found
        '''
        directions_result = self.gmaps.directions(
            source, destination, mode=mode, departure_time=datetime.now()
        )
        last_leg = None
        for route in directions_result or []:
            for leg in route['legs']:
                last_leg = leg
        if last_leg is None:
            # No route/leg came back: report "nothing found" instead of
            # failing on unbound locals.
            return {}
        return {
            'distance': last_leg['distance']['text'],
            'duration': last_leg['duration']['text'],
        }
# geovis.py
# -*- coding: utf-8 -*-
"""
Created on Tuesday 23-July-2019
@author: Vikas shrivastava
For more information, see the official folium documentation: https://python-visualization.github.io/folium/modules.html
"""
import random
import logging
import folium
import pandas as pd
import numpy as np
from collections import namedtuple
logger = logging.getLogger('TaskLog')
class GeoVis:
    '''
    Fluent helper for drawing points, connections and flow markers on a
    folium map. The map itself is available as ``self.m``; the drawing
    methods return ``self`` so calls can be chained.
    '''

    def __init__(self, center_lat=23, center_lon=77, zoom=5, width='100%', height='100%'):
        '''
        Create the map and set its center position.

        Parameters:
            center_lat: default(23),
            center_lon: default(77),
            zoom: int default(5),
            width: default('100%'),
            height: default('100%'),
        '''
        self.m = folium.Map(
            location=[center_lat, center_lon],
            zoom_start=zoom,
            width=width,
            height=height
        )

    def mark_points(self, df, lat_col='latitude', lon_col='longitude', color='green', **other_constraint_params):
        '''
        Draw one point per row of df.

        Parameters:
            df: DataFrame holding the points
            lat_col / lon_col: names of the coordinate columns
            color: color used when no color constraint applies
            other_constraint_params: optional keys
                label_fields (columns shown in the tooltip, default all),
                node_type_col + node_type_filters,
                color_node + color_filters,
                size_node + size_filters
        Errors are logged, not raised.
        '''
        try:
            label_fields = other_constraint_params.get('label_fields') or list(df.columns)
            node_type_col = other_constraint_params.get('node_type_col')
            color_col = other_constraint_params.get('color_node')
            size_col = other_constraint_params.get('size_node')
            response = self.apply_constraints(df, other_constraint_params)
            for _, row in df.iterrows():
                self.mark_point(
                    row[lat_col], row[lon_col],
                    self.make_html_for_tooltip(row, label_fields),
                    radius=4.5,
                    color=color,
                    node_type_params=response.get('node_type_params', {}),
                    node_type_col_val=row[node_type_col] if node_type_col else None,
                    color_params=response.get('color_params', {}),
                    color_col_val=row[color_col] if color_col else None,
                    size_params=response.get('size_params', {}),
                    size_col_val=row[size_col] if size_col else None,
                )
        except Exception as e:
            logger.error("mark_points() in packages/planning_script/geovis.py file : "+str(e))
        return self

    def mark_point(self, lat, lon, label='DC', radius=1.5, color='blue', weight=0, opacity=0.7,
                   point_type='circle_marker', **other_params):
        '''
        Draw a single point; rows with a missing coordinate are skipped.

        Parameters:
            lat, lon: coordinates of the point
            label: tooltip content
            radius, color, weight: drawing attributes (defaults used when no
                constraint overrides them)
            opacity: accepted for API compatibility; not used by this method
            point_type: 'circle', 'circle_marker' or 'marker'
            other_params: optional <x>_params / <x>_col_val pairs for
                x in node_type, color, size (see apply_constraints)
        '''
        if pd.isna(lat) or pd.isna(lon):
            return self

        caller = "mark_point()"
        point_type = self._constrained_value(
            other_params.get('node_type_params'), other_params.get('node_type_col_val'),
            point_type, caller)
        color = self._constrained_value(
            other_params.get('color_params'), other_params.get('color_col_val'),
            color, caller)
        radius = self._constrained_value(
            other_params.get('size_params'), other_params.get('size_col_val'),
            radius, caller, default_cast=float)

        params = {
            'location': [lat, lon],
            'tooltip': label,
            'color': color,
            'line_color': False,
            'fill': True,
            'fill_color': color,
            'fill_opacity': 0.3,
            'weight': weight,
            'radius': radius
        }
        try:
            if point_type == 'circle':
                folium.Circle(**params).add_to(self.m)
            elif point_type == 'circle_marker':
                folium.CircleMarker(**params).add_to(self.m)
            elif point_type == 'marker':
                params['icon'] = folium.Icon(color=color)
                folium.Marker(**params).add_to(self.m)
        except Exception as e:
            logger.error("mark_point() in packages/planning_script/geovis.py file : "+str(e))
        return self

    def connection_establish(self, points, color="#FF0000", weight=5, opacity=0.5):
        '''
        Connect a sequence of lat/long points with one line.

        Parameters :
            points : [(lat, long), (lat2, long2)]
            color : hexadecimal code(example: #CD5C5C), default: #FF0000
            weight : line weight, default 5
            opacity : line opacity, default 0.5
        '''
        folium.PolyLine(
            points,
            color=color,
            weight=weight,
            opacity=opacity
        ).add_to(self.m)
        return self

    def draw_connection(self, lat1, lon1, lat2, lon2, color='blue', weight=3, opacity=0.5, **other_params):
        '''
        Draw a line between two points; skipped when a coordinate is missing.

        Parameters:
            lat1, lon1, lat2, lon2: end points of the line
            color, weight, opacity: drawing attributes
            other_params: optional keys
                tooltip (shown as the line's popup),
                cate_color_var (when truthy, ``color`` is used as given),
                color_params + color_col_val,
                thickness_params + thickness_col_val
        '''
        if pd.isna(lat1) or pd.isna(lon1) or pd.isna(lat2) or pd.isna(lon2):
            return self

        caller = "draw_connection()"
        # A categorical color was already chosen by the caller; only apply
        # the numeric color constraint when that is not the case.
        if not other_params.get('cate_color_var'):
            color = self._constrained_value(
                other_params.get('color_params'), other_params.get('color_col_val'),
                color, caller)
        weight = self._constrained_value(
            other_params.get('thickness_params'), other_params.get('thickness_col_val'),
            weight, caller)

        folium.PolyLine(
            [[lat1, lon1], [lat2, lon2]],
            color=color,
            weight=weight,
            opacity=opacity,
            popup=other_params.get('tooltip', '')
        ).add_to(self.m)
        return self

    def connections(self, df, lat_col1='from_latitude', lon_col1='from_longitude', lat_col2='to_latitude',
                    lon_col2='to_longitude', color='blue', weight=2, opacity=0.5, **other_constraint_params):
        '''
        Draw one connection per row of df.

        Parameters:
            df: DataFrame holding the connections
            lat_col1, lon_col1, lat_col2, lon_col2: coordinate column names
            color, weight, opacity: defaults for every line
            other_constraint_params: optional keys
                label_fields (columns shown in the popup, default all),
                color_node + color_filters,
                thickness_node + thickness_filters,
                cate_color_var (when truthy the row's 'cate_color_code'
                column supplies the color)
        '''
        label_fields = other_constraint_params.get('label_fields') or list(df.columns)
        color_col = other_constraint_params.get('color_node')
        thickness_col = other_constraint_params.get('thickness_node')
        cate_color_var = other_constraint_params.get('cate_color_var')
        response = self.apply_constraints(df, other_constraint_params)
        for _, row in df.iterrows():
            self.draw_connection(
                row[lat_col1], row[lon_col1],
                row[lat_col2], row[lon_col2],
                color=row['cate_color_code'] if cate_color_var else color,
                weight=weight,
                opacity=opacity,
                tooltip=self.make_html_for_tooltip(row, label_fields),
                color_params=response.get('color_params', {}),
                color_col_val=row[color_col] if color_col else None,
                thickness_params=response.get('thickness_params', {}),
                thickness_col_val=row[thickness_col] if thickness_col else None,
                cate_color_var=cate_color_var
            )
        return self

    def get_bearing(self, p1, p2):
        '''
        Returns compass bearing from p1 to p2

        Parameters
            p1 : namedtuple with lat lon
            p2 : namedtuple with lat lon
        Return
            compass bearing of type float, in [0, 360)
        '''
        long_diff = np.radians(p2.lon - p1.lon)
        lat1 = np.radians(p1.lat)
        lat2 = np.radians(p2.lat)
        x = np.sin(long_diff) * np.cos(lat2)
        y = (np.cos(lat1) * np.sin(lat2) - (np.sin(lat1) * np.cos(lat2) * np.cos(long_diff)))
        bearing = np.degrees(np.arctan2(x, y))
        # arctan2 yields (-180, 180]; shift negatives to a compass bearing
        if bearing < 0:
            return bearing + 360
        return bearing

    def get_arrows(self, locations, color='blue', size=3, n_arrows=3):
        '''
        Get a list of evenly spaced markers to be plotted along a line.

        Parameters
            locations : list of lists of lat lons that represent the
                        start and end of the line.
                        eg [[41.1132, -96.1993],[41.3810, -95.8021]]
            color : fill color of the markers, default is 'blue'
            size : marker radius, default is 3
            n_arrows : number of markers to create. default is 3
        Return
            list of folium.RegularPolygonMarker
        '''
        Point = namedtuple('Point', field_names=['lat', 'lon'])
        p1 = Point(locations[0][0], locations[0][1])
        p2 = Point(locations[1][0], locations[1][1])

        # Markers are drawn unrotated (rotation=0); get_bearing(p1, p2) is
        # available if a direction-aware rotation is needed.
        # Evenly spaced positions; the first and last are discarded because
        # the end points are denoted by their own markers.
        arrow_lats = np.linspace(p1.lat, p2.lat, n_arrows + 2)[1:n_arrows+1]
        arrow_lons = np.linspace(p1.lon, p2.lon, n_arrows + 2)[1:n_arrows+1]

        return [
            folium.RegularPolygonMarker(location=points,
                                        fill_color=color, number_of_sides=1,
                                        radius=size, rotation=0, weight=1)
            for points in zip(arrow_lats, arrow_lons)
        ]

    def manage_arrows(self, lat1, lon1, lat2, lon2):
        '''
        Add flow markers between two points; skipped when a coordinate is
        missing.
        '''
        if pd.isna(lat1) or pd.isna(lon1) or pd.isna(lat2) or pd.isna(lon2):
            return self
        for arrow in self.get_arrows([[lat1, lon1], [lat2, lon2]]):
            arrow.add_to(self.m)
        return self

    def connection_flow_arrow(self, df, lat_col1='from_latitude', lon_col1='from_longitude',
                              lat_col2='to_latitude', lon_col2='to_longitude'):
        '''
        Add flow markers for every connection (row) of df.
        '''
        for _, row in df.iterrows():
            self.manage_arrows(
                row[lat_col1], row[lon_col1],
                row[lat_col2], row[lon_col2]
            )
        return self

    def manage_arrows_without_df(self, points):
        '''
        Add flow markers between each pair of consecutive points.

        Parameters:
            points: sequence of [lat, long] pairs
        '''
        for start, end in zip(points, points[1:]):
            for arrow in self.get_arrows([start, end]):
                arrow.add_to(self.m)
        return self

    # local function
    def make_html_for_tooltip(self, row, fields):
        '''
        Build the tooltip/popup HTML: one "<b>field</b>: value" line per field.

        NOTE(review): values are inserted without HTML escaping - confirm the
        data shown on the map is trusted.
        '''
        return ''.join(
            '<div><span><b>'+field +'</b>: <label>'+str(row[field])+'</label></br></span></div>'
            for field in fields
        )

    def apply_constraints(self, df, other_constraint_params=None):
        '''
        Prepare the constraint parameters for every constraint column that
        was requested.

        Parameters:
            df: DataFrame the constraint columns belong to
            other_constraint_params: dict with optional pairs
                node_type_col / node_type_filters,
                color_node / color_filters,
                size_node / size_filters,
                thickness_node / thickness_filters
        Return:
            dict with zero or more of the keys node_type_params,
            color_params, size_params, thickness_params
        '''
        other_constraint_params = other_constraint_params or {}
        # (column key, filters key, key in the response)
        constraint_keys = (
            ('node_type_col', 'node_type_filters', 'node_type_params'),
            ('color_node', 'color_filters', 'color_params'),
            ('size_node', 'size_filters', 'size_params'),
            ('thickness_node', 'thickness_filters', 'thickness_params'),
        )
        response = {}
        for col_key, filters_key, response_key in constraint_keys:
            col_name = other_constraint_params.get(col_key)
            if not col_name:
                continue
            des_col = dict(df[col_name].describe())
            response[response_key] = self.prepare_constraint_data(
                col_name, other_constraint_params.get(filters_key), des_col)
        return response

    def prepare_constraint_data(self, col_name, _filters, des_col):
        '''
        Bundle a column's min/max (taken from its describe() summary) with
        the filters to apply. Returns {} when col_name is empty.
        '''
        if not col_name:
            return {}
        return {
            'col_max_val': des_col.get('max', '100'),
            'col_min_val': des_col.get('min', '1'),
            '_filters': _filters
        }

    def _constrained_value(self, params, col_val, default, caller, default_cast=None):
        '''
        Resolve one drawing attribute from a constraint.

        Parameters:
            params: dict produced by prepare_constraint_data (may be empty)
            col_val: the row's value in the constraint column
            default: value to keep when no constraint applies
            caller: method name used in the log message
            default_cast: optional callable applied to default before it is
                handed to manage_various_values
        Return:
            the attribute value chosen by the filters, else default
        '''
        if not params or col_val is None:
            return default
        if isinstance(col_val, str) and not col_val.strip():
            return default
        try:
            number = float(col_val)
            if default_cast is not None:
                default = default_cast(default)
            # Work on a copy so the per-DataFrame params stay untouched.
            return self.manage_various_values(**dict(params, col_val=number, default=default))
        except Exception as e:
            logger.error(caller + " in packages/planning_script/geovis.py file : "+str(e))
            return default

    def manage_various_values(self, col_max_val, col_min_val, _filters, col_val, default):
        '''
        Map col_val to an attribute value.

        col_val is expressed as a percentage of the column's [min, max]
        range and the 'attr_value' of the first filter whose
        [min_val, max_val] contains that percentage is returned.

        Return:
            the matching filter's attr_value, or default when nothing
            matches, the range is empty, or the inputs are not numeric
        '''
        try:
            low = float(col_min_val)
            span = float(col_max_val) - low
            if span == 0:
                # Every value is identical: no percentage can be computed.
                return default
            x = ((float(col_val) - low) / span) * 100
            for _filter in _filters or []:
                if float(_filter.get('min_val', 0.0)) <= x <= float(_filter.get('max_val', 100.0)):
                    return _filter.get('attr_value')
        except Exception as e:
            logger.error("manage_various_values() in packages/planning_script/geovis.py file : "+str(e))
        return default
# def manage_color_for_connection(self, col_max_val, col_min_val, _filters, col_val):
# color_code = '#a94442'
# try:
# x = ((col_val - col_min_val)/(col_max_val-col_min_val)) * 100
# for _filter in _filters:
# if float(_filter.get('min_val', 0.0)) <= x <= float(_filter.get('max_val', 100.0)):
# color_code = _filter.get('color_code')
# break
# except Exception as e:
# print(e)
# return color_code
# def manage_marker_type(self, col_max_val, col_min_val, _filters, col_val):
# point_type = 'circle_marker'
# try:
# x = ((col_val - col_min_val)/(col_max_val-col_min_val)) * 100
# for _filter in _filters:
# if float(_filter.get('min_val', 0.0)) <= x <= float(_filter.get('max_val', 100.0)):
# point_type = _filter.get('icon_type')
# break
# except Exception as e:
# print(e)
# return point_type
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment