Last active
April 6, 2021 02:04
-
-
Save manleyroberts/91c896a72e2790898beddde1fca22a28 to your computer and use it in GitHub Desktop.
LMC3403-viz-tutorial
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name: ipymaps | |
channels: | |
- conda-forge | |
dependencies: | |
- geojson | |
- geopy | |
- ipyleaflet>=0.11.1 | |
- ipympl | |
- voila | |
- pip | |
- pip: | |
- "-r requirements.txt" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Starter Code obtained under MIT License, as below: | |
MIT License | |
Copyright (c) 2019 deeplook | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import os | |
import time | |
import json | |
import urllib | |
import datetime | |
import textwrap | |
import threading | |
import random | |
import requests | |
import geojson | |
import geohash | |
import osm2geojson | |
import pandas as pd | |
import geopandas as gpd | |
import openlocationcode.openlocationcode as olc | |
from shapely import wkt | |
from geopy.geocoders import Nominatim | |
from ipywidgets import Text, HTML, Layout | |
from ipyleaflet.leaflet import ControlException | |
from ipyleaflet import (Map, Marker, GeoJSON, Polyline, Icon, GeoData, WidgetControl, | |
FullScreenControl, LayersControl, ZoomControl, basemaps) | |
import ipyleaflet | |
def is_urlencoded(text): | |
"""Is this a URL-encoded string? | |
Find out by decoding and comparing to original. If they differ the | |
original is encoded, else the original isn't encoded. But that still | |
says nothing about whether the newly decoded version isn't still | |
encoded. | |
""" | |
return text != urllib.parse.unquote(text) | |
# provide a default identifer for a shape that can be used for demos | |
url = 'https://raw.githubusercontent.com/johan/world.geo.json/master/countries.geo.json' | |
countries = geojson.loads(requests.get(url).content) | |
del url | |
def bbox(coord_list): | |
"Find bounding box (lower-left and upper-right points) for a list of coordinates." | |
box = [] | |
for i in (0, 1): | |
res = sorted(coord_list, key=lambda x:x[i]) | |
box.append((res[0][i], res[-1][i])) | |
ll, ur = [box[0][0], box[1][0]], [box[0][1], box[1][1]] | |
return [ll, ur] | |
def overpass(query: str, fmt='json', url=None): | |
""" | |
Run a query on OSM Overpass API and return an XML string or a JSON/GeoJSON object. | |
The default server/path will be ``https://overpass-api.de/api/interpreter``. | |
""" | |
if fmt in ['json', 'geojson']: | |
data = '[out:json];' + query | |
elif fmt == 'xml': | |
data = '[out:xml];' + query | |
else: | |
raise ValueError('Format must be one of: json xml geojson') | |
url = url or 'https://overpass-api.de/api/interpreter' | |
params = dict(data=data) | |
text = requests.get(url, params=params).text | |
res = text | |
if fmt == 'json': | |
res = json.loads(text) | |
if fmt == 'geojson': | |
gj = osm2geojson.json2geojson(text) | |
res = geojson.loads(json.dumps(gj)) | |
return res | |
def wikidata(query: str, url=None): | |
""" | |
Run a query on Wikidata API in SparQL and return result as JSON object. | |
As the query will be sent in one text line it is important not to have | |
comments in it indicated by the # charater or it leads to a bad request. | |
The default server/path will be ``https://query.wikidata.org/sparql``. | |
""" | |
while is_urlencoded(query): | |
query = urllib.parse.unquote(query) | |
url = url or 'https://query.wikidata.org/sparql' | |
headers = {'User-agent': 'Mozilla/5.0'} | |
params = dict(query=query, format='json') | |
resp = requests.get(url, params=params, headers=headers) | |
if resp.status_code >= 400: | |
raise ValueError(resp.reason) | |
return resp.json() | |
def find_wktdata(js): | |
""" | |
Return the first variable name of type ``geosparql#wktLiteral`` from a SparQL JSON result. | |
""" | |
vars = js['head']['vars'] | |
b0 = js['results']['bindings'][0] | |
wkt_literal = 'http://www.opengis.net/ont/geosparql#wktLiteral' | |
for var in vars: | |
if b0[var].get('datatype', None) == wkt_literal: | |
return var | |
return None | |
class MapController: | |
""" | |
A map controller executing text entries in a search field on a map. | |
TODO: The various functions accessing external sources should become | |
some kind of plugins (e.g. Geocoding, OSM, Wikidata). | |
""" | |
def __init__(self, a_map, a_text, test_input=None, geocoder=None, globs=None): | |
self.geocoder = geocoder or Nominatim(user_agent="ipymaps") | |
self.globs = globs or {} | |
self.a_map = a_map | |
self.a_text = a_text | |
self.layer_name = '' | |
self.a_text.on_submit(self.text_changed) | |
self.test_input = test_input or re.split('\s*\n\s*', textwrap.dedent(''' | |
Paris | |
Buenos Aires | |
52.5 13.4 | |
Hong Kong | |
9F4MGC22+22 | |
https://raw.githubusercontent.com/johan/world.geo.json/master/countries.geo.json | |
http://commons.wikimedia.org/data/main/Data:Canada.map | |
http://overpass-api.de/api/interpreter?data=node["amenity"="post_box"](52.52, 13.35, 52.54, 13.45); | |
https://query.wikidata.org/#SELECT DISTINCT ?airport ?airportLabel ?coor WHERE { ?airport wdt:P31 wd:Q1248784; ?range wd:Q31; wdt:P625 ?coor. SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } | |
'''.strip())) | |
# setup status text field and controller | |
self.status_tx = Text('', layout=Layout(width='500px')) | |
self.status_wc = WidgetControl(widget=self.status_tx, position='bottomleft') | |
def text_changed(self, widget): | |
""" | |
This is called whenever the user hits enter in the search text field. | |
""" | |
now = datetime.datetime.utcnow().isoformat()[:19] | |
# commands | |
if widget.value == '/test': | |
self.test() | |
return | |
# locations and shapes | |
res = self.run_query(widget.value) | |
if type(res) == tuple: | |
self.a_map.center = res | |
t1 = threading.Thread(target=self.show_status, args=([str(res), 10])) | |
t1.start() | |
# t1.join() | |
elif type(res) == gpd.geodataframe.GeoDataFrame: | |
name = self.layer_name or now | |
gd = GeoData(geo_dataframe=res, | |
style={'color': 'black', 'fillColor': '#3366cc', 'opacity':0.05, | |
'weight':1.9, 'dashArray':'2', 'fillOpacity':0.6}, | |
hover_style={'fillColor': 'red', 'fillOpacity': 0.2}, | |
name=name) | |
self.a_map.add_layer(gd) | |
if self.layer_name: | |
self.layer_name = '' | |
elif hasattr(res, '__geo_interface__'): | |
data = res | |
# jump to center of data | |
ll, ur = bbox(list(geojson.utils.coords(data))) | |
center = [(ur[0]+ll[0])/2, (ur[1]+ll[1])/2] | |
self.a_map.center = list(reversed(center)) | |
name = self.layer_name or now | |
# name = os.path.basename(widget.value) | |
self.a_map.add_layer(GeoJSON(data=data, name=name)) | |
if self.layer_name: | |
self.layer_name = '' | |
def add_osmdata(self, query, url=None): | |
"""Load a GeoJSON string created from an OSM query. | |
Before execution the queries will be slightly extended to remain | |
within reasonable limits, both time and memory-wise. That means each | |
query will be prefixed with ``[timeout:300][maxsize:8388608];`` | |
limiting the max time and memory to 300 seconds and 8 Mb of memory. | |
And it will be suffixed with ``out;``. | |
""" | |
ll, ur = self.a_map.bounds | |
b = ll + ur | |
query = query + 'out;' | |
query = query.format(bounds=b) | |
return overpass(query, fmt='geojson', url=url) | |
def add_wikidata(self, query): | |
"""Make a Wikidata SparQL query and return a GeoPandas DataFrame. | |
""" | |
js = wikidata(query) | |
wkt_col = find_wktdata(js) | |
columns = js['head']['vars'] | |
arr = [[b[var]['value'] for var in columns] for b in js['results']['bindings']] | |
df = pd.DataFrame(arr, columns=columns) | |
df['geometry'] = df[wkt_col].apply(wkt.loads) | |
df = df.drop(labels=[wkt_col], axis=1) | |
gdf = gpd.GeoDataFrame(df, geometry='geometry') | |
return gdf | |
def add_pluscode(self, query): | |
"Convert a Google Pluscode (or Open Location Code) to a lat/lon." | |
if olc.isValid(query): | |
res = olc.decode(query) | |
lat, lon = res.latlng() | |
return lat, lon | |
else: | |
return None, None | |
def add_geojson(self, url): | |
"Load a GeoJSON string from the given URL, local or remote." | |
# load data | |
if url.startswith('file://'): | |
gj = open(url[7:]).read() | |
else: | |
gj = requests.get(url).content | |
return geojson.loads(gj) | |
def add_geojson_map(self, url): | |
"Load a map file from the given URL, local or remote." | |
# load data | |
if url.startswith('file://'): | |
js = open(url[7:]).read() | |
else: | |
js = requests.get(url).text | |
data = json.loads(js)['data'] | |
return geojson.loads(json.dumps(data)) | |
def show_status(self, msg, period=3): | |
"Add a temporary status line widget for some time to the map." | |
self.status_tx.value = msg | |
try: | |
self.a_map.add_control(self.status_wc) | |
except ControlException: | |
pass | |
time.sleep(period) | |
try: | |
self.a_map.remove_control(self.status_wc) | |
except ControlException: | |
pass | |
def test(self, period=5): | |
""" | |
Execute a sequence of test input lines. | |
""" | |
for line in self.test_input: | |
self.a_text.value = line | |
self.text_changed(self.a_text) | |
time.sleep(period) | |
def run_query(self, text): | |
""" | |
Run a query as given by the ``text`` string. | |
This must return eiter a lat/lon pair or a GeoJSON object for now. | |
""" | |
text = text.strip() | |
parsed_url = urllib.parse.urlparse(text) | |
# GeoJSON objects in passed namespace | |
if text in self.globs: | |
obj = self.globs[text] | |
if hasattr(obj, '__geo_interface__'): | |
return obj | |
# pluscodes | |
try: | |
if re.match('[A-Z0-9\+]', text): | |
lat, lon = self.add_pluscode(text) | |
if lat != None and lon != None: | |
return (lat, lon) | |
except ValueError: | |
pass | |
# lat lon | |
try: | |
values = re.split('[/,;: ]', text) | |
lat, lon = [v for v in values if v not in '/,;:'] | |
return tuple(list(map(float, (lat, lon)))) | |
except ValueError: | |
pass | |
# OSM Overpass Query | |
if 'overpass' in parsed_url.netloc.lower(): | |
if parsed_url.query: | |
query_dict = urllib.parse.parse_qs(parsed_url.query) | |
data = query_dict.get('data', None) | |
names = query_dict.get('name', None) | |
if names: | |
self.layer_name = names[0] | |
url = urllib.parse.urlunparse([ | |
parsed_url.scheme, parsed_url.netloc, parsed_url.path, '', '', '']) | |
# add ; since the terminal ; is filtered away by urllib.parse.parse_qs() ! | |
gj = self.add_osmdata(data[0] + ';', url=url) | |
return gj | |
# Wikidata SparQL Query | |
if 'wikidata' in parsed_url.netloc.lower(): | |
if parsed_url.query: | |
query_dict = urllib.parse.parse_qs(parsed_url.query) | |
names = query_dict.get('name', None) | |
if names: | |
self.layer_name = names[0] | |
gd = self.add_wikidata(parsed_url.fragment) | |
return gd | |
# URL pointing to some GeoJSON string | |
url = text | |
path = parsed_url.path.lower() | |
if path.endswith('.geojson') or path.endswith('.geo.json'): | |
gj = self.add_geojson(url) | |
self.layer_name = os.path.basename(url) | |
return gj | |
# URL pointing to some .map file containing GeoJSON | |
url = text | |
path = parsed_url.path.lower() | |
if path.endswith('.map'): | |
gj = self.add_geojson_map(url) | |
self.layer_name = os.path.basename(url) | |
return gj | |
# address to run geolocation for | |
loc = self.geocoder.geocode(text) | |
if loc: | |
center = loc.latitude, loc.longitude | |
return center | |
# try decoding geohash value | |
try: | |
return geohash.decode(text) | |
except ValueError: | |
pass | |
def getMap(): | |
bm = basemaps.OpenStreetMap['Mapnik'] | |
m = Map(center=[55, 40], zoom=5, | |
zoom_control=False, | |
basemap=bm, | |
layout=Layout(height='600px') | |
) | |
m.layers[0].name = bm['name'] | |
markers = [ | |
Marker(location=(random.uniform(50, 60), random.uniform(25, 55)), draggable=False, icon=(Icon(icon_url='solar.png') if random.uniform(0.0,1.0) < 0.6 else Icon(icon_url='battery.png'))) for i in range(50) | |
] | |
for marker in markers: | |
m.add_layer(marker); | |
print(random.uniform(0.0, 1.0)) | |
lines = [] | |
for marker_a in markers: | |
for marker_b in markers: | |
if marker_a != marker_b and (marker_a.location[0] - marker_b.location[0])**2 + (marker_a.location[1] - marker_b.location[1])**2 < 100: | |
if random.uniform(0.0,1.0) < 0.02: | |
lines.append(Polyline(locations=[ | |
marker_a.location, marker_b.location | |
], | |
color="green" , | |
fill=False)) | |
elif random.uniform(0.0,1.0) < 0.02: | |
lines.append(Polyline(locations=[ | |
marker_a.location, marker_b.location | |
], | |
color="black" , | |
fill=False)) | |
for line in lines: | |
m.add_layer(line) | |
# setup search search text field and controller | |
search_tx = Text('', layout=Layout(width='500px')) | |
wc = WidgetControl(widget=search_tx, position='topleft') | |
con = MapController(m, search_tx, globs=globals()) | |
# add controls | |
m.add_control(wc) | |
m.add_control(ZoomControl(position='topleft')) | |
m.add_control(FullScreenControl(position='topleft')) | |
layers_con = LayersControl(position='topright') | |
m.add_control(layers_con) | |
return m |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
python-geohash | |
geopandas | |
openlocationcode | |
osm2geojson |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment