Skip to content

Instantly share code, notes, and snippets.

@manleyroberts
Last active April 6, 2021 02:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save manleyroberts/91c896a72e2790898beddde1fca22a28 to your computer and use it in GitHub Desktop.
Save manleyroberts/91c896a72e2790898beddde1fca22a28 to your computer and use it in GitHub Desktop.
LMC3403-viz-tutorial
name: ipymaps
channels:
- conda-forge
dependencies:
- geojson
- geopy
- ipyleaflet>=0.11.1
- ipympl
- voila
- pip
- pip:
- "-r requirements.txt"
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Starter Code obtained under MIT License, as below:
MIT License
Copyright (c) 2019 deeplook
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
import re
import os
import time
import json
import urllib
import datetime
import textwrap
import threading
import random
import requests
import geojson
import geohash
import osm2geojson
import pandas as pd
import geopandas as gpd
import openlocationcode.openlocationcode as olc
from shapely import wkt
from geopy.geocoders import Nominatim
from ipywidgets import Text, HTML, Layout
from ipyleaflet.leaflet import ControlException
from ipyleaflet import (Map, Marker, GeoJSON, Polyline, Icon, GeoData, WidgetControl,
FullScreenControl, LayersControl, ZoomControl, basemaps)
import ipyleaflet
def is_urlencoded(text):
    """Tell whether ``text`` contains at least one layer of URL-encoding.

    The string is percent-decoded once and compared with the input: any
    difference means the input was encoded. A ``True`` result says nothing
    about whether the decoded string is itself still encoded, so callers
    that need a fully decoded value have to loop.
    """
    decoded_once = urllib.parse.unquote(text)
    return decoded_once != text
# Provide a default identifier for a shape that can be used for demos.
# NOTE: this downloads the country outlines when the module is imported.
# The timeout keeps an unreachable server from hanging the import forever,
# and raise_for_status() stops an HTTP error page from being parsed as GeoJSON.
url = 'https://raw.githubusercontent.com/johan/world.geo.json/master/countries.geo.json'
_countries_response = requests.get(url, timeout=30)
_countries_response.raise_for_status()
countries = geojson.loads(_countries_response.content)
del url, _countries_response
def bbox(coord_list):
    """Find the bounding box of a collection of coordinates.

    Args:
        coord_list: Iterable of coordinates; only the first two components
            of each coordinate are used. One-shot iterables (generators,
            iterators) are accepted.

    Returns:
        ``[lower_left, upper_right]`` where ``lower_left`` is
        ``[min_first, min_second]`` and ``upper_right`` is
        ``[max_first, max_second]``.

    Raises:
        ValueError: If ``coord_list`` is empty.
    """
    # Materialise once so every pass below sees the same data, even when
    # the caller hands in a generator.
    coords = list(coord_list)
    if not coords:
        raise ValueError('Cannot compute a bounding box of an empty coordinate list')
    firsts = [c[0] for c in coords]
    seconds = [c[1] for c in coords]
    # min()/max() are linear; sorting the whole list is not needed.
    ll = [min(firsts), min(seconds)]
    ur = [max(firsts), max(seconds)]
    return [ll, ur]
def overpass(query: str, fmt='json', url=None):
    """Run a query on the OSM Overpass API.

    Args:
        query: Overpass QL query *without* the leading ``[out:...]``
            setting; it is prepended here according to ``fmt``.
        fmt: One of ``'json'``, ``'geojson'`` or ``'xml'``.
        url: Server endpoint; defaults to
            ``https://overpass-api.de/api/interpreter``.

    Returns:
        The raw XML text for ``fmt='xml'``, the decoded JSON object for
        ``fmt='json'``, or a ``geojson`` object for ``fmt='geojson'``.

    Raises:
        ValueError: If ``fmt`` is not supported, or if the server answers
            with an HTTP error status (same convention as ``wikidata()``).
    """
    if fmt in ('json', 'geojson'):
        # GeoJSON is produced locally from the server's JSON output.
        data = '[out:json];' + query
    elif fmt == 'xml':
        data = '[out:xml];' + query
    else:
        raise ValueError('Format must be one of: json xml geojson')

    url = url or 'https://overpass-api.de/api/interpreter'
    resp = requests.get(url, params={'data': data})
    # Fail loudly instead of trying to parse (or returning) an error page.
    if resp.status_code >= 400:
        raise ValueError(resp.reason)

    text = resp.text
    if fmt == 'json':
        return json.loads(text)
    if fmt == 'geojson':
        gj = osm2geojson.json2geojson(text)
        # Round-trip through JSON to get proper ``geojson`` objects.
        return geojson.loads(json.dumps(gj))
    return text
def wikidata(query: str, url=None):
    """Send a SparQL query to the Wikidata query service and decode the JSON reply.

    The query travels as a single text line, so it must not contain
    comments introduced by the ``#`` character, or the server rejects it as
    a bad request.

    When no ``url`` is given, ``https://query.wikidata.org/sparql`` is used.
    A response with an HTTP status of 400 or above raises ``ValueError``
    carrying the response's reason phrase.
    """
    # Strip every layer of URL-encoding so the query is sent in plain form.
    plain_query = query
    while is_urlencoded(plain_query):
        plain_query = urllib.parse.unquote(plain_query)

    endpoint = url if url else 'https://query.wikidata.org/sparql'
    response = requests.get(
        endpoint,
        params={'query': plain_query, 'format': 'json'},
        headers={'User-agent': 'Mozilla/5.0'},
    )
    if response.status_code < 400:
        return response.json()
    raise ValueError(response.reason)
def find_wktdata(js):
    """Return the first variable of type ``geosparql#wktLiteral`` in a SparQL JSON result.

    Only the first result row is inspected.

    Args:
        js: Decoded SparQL JSON result with ``head.vars`` and
            ``results.bindings`` entries.

    Returns:
        The variable name, or ``None`` when the result set is empty or the
        first row holds no WKT literal.
    """
    wkt_literal = 'http://www.opengis.net/ont/geosparql#wktLiteral'
    bindings = js['results']['bindings']
    if not bindings:
        # An empty result set has no row to inspect.
        return None
    first_row = bindings[0]
    for var in js['head']['vars']:
        # A variable listed in the head may be absent from a given row,
        # hence the tolerant lookup.
        if first_row.get(var, {}).get('datatype') == wkt_literal:
            return var
    return None
class MapController:
    """
    A map controller executing text entries in a search field on a map.

    Each submitted line is interpreted by ``run_query()`` and the result is
    either used to re-centre the map or added to it as a new layer.

    TODO: The various functions accessing external sources should become
    some kind of plugins (e.g. Geocoding, OSM, Wikidata).
    """

    # Sample entries executed one after another by the ``/test`` command.
    DEFAULT_TEST_INPUT = (
        'Paris',
        'Buenos Aires',
        '52.5 13.4',
        'Hong Kong',
        '9F4MGC22+22',
        'https://raw.githubusercontent.com/johan/world.geo.json/master/countries.geo.json',
        'http://commons.wikimedia.org/data/main/Data:Canada.map',
        'http://overpass-api.de/api/interpreter?data=node["amenity"="post_box"](52.52, 13.35, 52.54, 13.45);',
        'https://query.wikidata.org/#SELECT DISTINCT ?airport ?airportLabel ?coor WHERE { ?airport wdt:P31 wd:Q1248784; ?range wd:Q31; wdt:P625 ?coor. SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } }',
    )

    def __init__(self, a_map, a_text, test_input=None, geocoder=None, globs=None):
        """Wire the controller to a map and to its search text field.

        Args:
            a_map: The map widget to control.
            a_text: The text widget whose submissions are executed.
            test_input: Lines run by the ``/test`` command; defaults to
                ``DEFAULT_TEST_INPUT``.
            geocoder: Object with a ``geocode(text)`` method; defaults to
                a ``Nominatim`` geocoder.
            globs: Namespace (name -> object) whose entries exposing
                ``__geo_interface__`` can be shown by typing their name.
        """
        self.geocoder = geocoder or Nominatim(user_agent="ipymaps")
        self.globs = globs or {}
        self.a_map = a_map
        self.a_text = a_text
        # Name for the next layer; set by run_query(), consumed by text_changed().
        self.layer_name = ''
        self.a_text.on_submit(self.text_changed)
        self.test_input = test_input or list(self.DEFAULT_TEST_INPUT)
        # setup status text field and controller
        self.status_tx = Text('', layout=Layout(width='500px'))
        self.status_wc = WidgetControl(widget=self.status_tx, position='bottomleft')

    def _take_layer_name(self, default):
        "Return the pending layer name (or ``default``) and clear the pending one."
        name = self.layer_name or default
        self.layer_name = ''
        return name

    @staticmethod
    def _read_local_file(url):
        "Read the text of the file behind a ``file://`` URL, closing it afterwards."
        with open(url[len('file://'):], encoding='utf-8') as fh:
            return fh.read()

    def text_changed(self, widget):
        """
        This is called whenever the user hits enter in the search text field.

        ``/test`` runs the test sequence. Otherwise the text is executed via
        ``run_query()``: a tuple result re-centres the map, a GeoDataFrame
        or any object with ``__geo_interface__`` is added as a layer, and
        anything else is ignored.
        """
        # Timestamp (UTC, second resolution) used as the fallback layer name.
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()[:19]

        # commands
        if widget.value == '/test':
            self.test()
            return

        # locations and shapes
        res = self.run_query(widget.value)
        if isinstance(res, tuple):
            self.a_map.center = res
            # show_status() sleeps, so run it in a thread to keep the UI responsive.
            status_thread = threading.Thread(
                target=self.show_status, args=(str(res), 10))
            status_thread.start()
        elif isinstance(res, gpd.geodataframe.GeoDataFrame):
            # Must be tested before the generic ``__geo_interface__`` branch
            # so that data frames get the styled GeoData layer.
            gd = GeoData(
                geo_dataframe=res,
                style={'color': 'black', 'fillColor': '#3366cc', 'opacity': 0.05,
                       'weight': 1.9, 'dashArray': '2', 'fillOpacity': 0.6},
                hover_style={'fillColor': 'red', 'fillOpacity': 0.2},
                name=self._take_layer_name(now))
            self.a_map.add_layer(gd)
        elif hasattr(res, '__geo_interface__'):
            # jump to center of data
            ll, ur = bbox(list(geojson.utils.coords(res)))
            center = [(ur[0] + ll[0]) / 2, (ur[1] + ll[1]) / 2]
            # The centre is computed as (x, y) but the map takes the reverse order.
            self.a_map.center = list(reversed(center))
            self.a_map.add_layer(
                GeoJSON(data=res, name=self._take_layer_name(now)))

    def add_osmdata(self, query, url=None):
        """Run an OSM Overpass query and return the result as GeoJSON.

        ``out;`` is appended to the query, and a ``{bounds}`` placeholder in
        it is replaced with the current map bounds. Because ``str.format``
        is used for this, any other curly braces in the query must be
        doubled.
        """
        ll, ur = self.a_map.bounds
        b = ll + ur
        query = (query + 'out;').format(bounds=b)
        return overpass(query, fmt='geojson', url=url)

    def add_wikidata(self, query):
        """Make a Wikidata SparQL query and return a GeoPandas DataFrame.

        The first WKT-typed variable of the result becomes the geometry
        column.

        Raises:
            ValueError: If the result contains no WKT literal to use as
                geometry.
        """
        js = wikidata(query)
        wkt_col = find_wktdata(js)
        if wkt_col is None:
            raise ValueError('Wikidata result contains no WKT geometry column')
        columns = js['head']['vars']
        rows = [[b[var]['value'] for var in columns]
                for b in js['results']['bindings']]
        df = pd.DataFrame(rows, columns=columns)
        df['geometry'] = df[wkt_col].apply(wkt.loads)
        df = df.drop(labels=[wkt_col], axis=1)
        return gpd.GeoDataFrame(df, geometry='geometry')

    def add_pluscode(self, query):
        """Convert a Google Pluscode (or Open Location Code) to a lat/lon.

        Returns ``(None, None)`` when ``query`` is not a valid code.
        """
        if not olc.isValid(query):
            return None, None
        lat, lon = olc.decode(query).latlng()
        return lat, lon

    def add_geojson(self, url):
        "Load a GeoJSON string from the given URL, local or remote."
        if url.startswith('file://'):
            gj = self._read_local_file(url)
        else:
            gj = requests.get(url).content
        return geojson.loads(gj)

    def add_geojson_map(self, url):
        """Load a map file from the given URL, local or remote.

        The file is a JSON document whose ``data`` entry holds the GeoJSON.
        """
        if url.startswith('file://'):
            js = self._read_local_file(url)
        else:
            js = requests.get(url).text
        data = json.loads(js)['data']
        return geojson.loads(json.dumps(data))

    def show_status(self, msg, period=3):
        """Add a temporary status line widget for some time to the map.

        Blocks for ``period`` seconds, so call it from a separate thread.
        """
        self.status_tx.value = msg
        try:
            self.a_map.add_control(self.status_wc)
        except ControlException:
            # Best effort: failing to add the control is not fatal.
            pass
        time.sleep(period)
        try:
            self.a_map.remove_control(self.status_wc)
        except ControlException:
            # Best effort: failing to remove the control is not fatal.
            pass

    def test(self, period=5):
        """
        Execute a sequence of test input lines, pausing ``period`` seconds
        after each one.
        """
        for line in self.test_input:
            self.a_text.value = line
            self.text_changed(self.a_text)
            time.sleep(period)

    def run_query(self, text):
        """
        Run a query as given by the ``text`` string.

        The interpretations are tried in this order: name of a geo object
        in ``globs``, plus code, "lat lon" pair, Overpass URL, Wikidata
        URL, URL of a GeoJSON file, URL of a ``.map`` file, address to
        geocode, geohash.

        Returns:
            A ``(lat, lon)`` tuple, a GeoJSON-like object, a GeoDataFrame,
            or ``None`` when nothing matched.

        Raises:
            ValueError: If an Overpass URL has a query string without a
                ``data`` parameter.
        """
        text = text.strip()
        parsed_url = urllib.parse.urlparse(text)

        # GeoJSON objects in passed namespace
        if text in self.globs:
            obj = self.globs[text]
            if hasattr(obj, '__geo_interface__'):
                return obj

        # pluscodes
        # Every valid plus code contains the '+' separator, so do not even
        # consult the decoder for text without one.
        if '+' in text and re.match(r'[A-Z0-9\+]', text):
            try:
                lat, lon = self.add_pluscode(text)
                if lat is not None and lon is not None:
                    return (lat, lon)
            except ValueError:
                pass

        # lat lon
        try:
            # Consecutive separators (e.g. ", ") yield empty strings; drop them.
            lat, lon = [v for v in re.split('[/,;: ]', text) if v]
            return (float(lat), float(lon))
        except ValueError:
            # Not exactly two values, or not numbers.
            pass

        netloc = parsed_url.netloc.lower()

        # OSM Overpass Query
        if 'overpass' in netloc and parsed_url.query:
            query_dict = urllib.parse.parse_qs(parsed_url.query)
            data = query_dict.get('data')
            if not data:
                raise ValueError("Overpass URL is missing the 'data' query parameter")
            names = query_dict.get('name')
            if names:
                self.layer_name = names[0]
            url = urllib.parse.urlunparse([
                parsed_url.scheme, parsed_url.netloc, parsed_url.path, '', '', ''])
            # add ; since the terminal ; is filtered away by urllib.parse.parse_qs() !
            return self.add_osmdata(data[0] + ';', url=url)

        # Wikidata SparQL Query (the query itself is the URL fragment)
        if 'wikidata' in netloc:
            if parsed_url.query:
                names = urllib.parse.parse_qs(parsed_url.query).get('name')
                if names:
                    self.layer_name = names[0]
            return self.add_wikidata(parsed_url.fragment)

        path = parsed_url.path.lower()

        # URL pointing to some GeoJSON string
        if path.endswith(('.geojson', '.geo.json')):
            gj = self.add_geojson(text)
            self.layer_name = os.path.basename(text)
            return gj

        # URL pointing to some .map file containing GeoJSON
        if path.endswith('.map'):
            gj = self.add_geojson_map(text)
            self.layer_name = os.path.basename(text)
            return gj

        # address to run geolocation for
        loc = self.geocoder.geocode(text)
        if loc:
            return loc.latitude, loc.longitude

        # try decoding geohash value
        try:
            return geohash.decode(text)
        except ValueError:
            return None
def getMap():
    """Build the demo map.

    The map shows 50 randomly placed markers (each one a solar or a
    battery icon), random green/black connections between markers that
    are close to each other, a search field driven by ``MapController``,
    and zoom, full-screen and layer controls.

    Returns:
        The configured ``Map`` widget.
    """
    marker_count = 50
    solar_share = 0.6          # probability that a marker is a solar icon
    max_dist_sq = 100          # squared lat/lon distance below which markers may connect
    link_probability = 0.02    # chance per draw that a close pair gets a line

    bm = basemaps.OpenStreetMap['Mapnik']
    m = Map(center=[55, 40], zoom=5,
            zoom_control=False,
            basemap=bm,
            layout=Layout(height='600px'))
    m.layers[0].name = bm['name']

    markers = [
        Marker(
            location=(random.uniform(50, 60), random.uniform(25, 55)),
            draggable=False,
            icon=Icon(icon_url='solar.png'
                      if random.uniform(0.0, 1.0) < solar_share
                      else 'battery.png'),
        )
        for _ in range(marker_count)
    ]
    for marker in markers:
        m.add_layer(marker)

    # Every ordered pair of distinct markers is considered, so a pair can
    # end up with a line in either direction (or both).
    lines = []
    for marker_a in markers:
        for marker_b in markers:
            if marker_a is marker_b:
                continue
            d_lat = marker_a.location[0] - marker_b.location[0]
            d_lon = marker_a.location[1] - marker_b.location[1]
            if d_lat ** 2 + d_lon ** 2 >= max_dist_sq:
                continue
            # Two independent draws: first for a green line, then, if that
            # failed, for a black one.
            if random.uniform(0.0, 1.0) < link_probability:
                color = "green"
            elif random.uniform(0.0, 1.0) < link_probability:
                color = "black"
            else:
                continue
            lines.append(Polyline(
                locations=[marker_a.location, marker_b.location],
                color=color,
                fill=False))
    for line in lines:
        m.add_layer(line)

    # setup search text field and controller
    search_tx = Text('', layout=Layout(width='500px'))
    wc = WidgetControl(widget=search_tx, position='topleft')
    # The controller registers itself as the text field's submit handler in
    # its constructor, so no reference to it is needed here. Passing
    # globals() lets module-level geo objects be shown by typing their name.
    MapController(m, search_tx, globs=globals())

    # add controls
    m.add_control(wc)
    m.add_control(ZoomControl(position='topleft'))
    m.add_control(FullScreenControl(position='topleft'))
    m.add_control(LayersControl(position='topright'))
    return m
python-geohash
geopandas
openlocationcode
osm2geojson
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment