Skip to content

Instantly share code, notes, and snippets.

@Zverik
Created September 20, 2017 13:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Zverik/3ca9b4fcc2cbd7b4567e720952df0dd5 to your computer and use it in GitHub Desktop.
Save Zverik/3ca9b4fcc2cbd7b4567e720952df0dd5 to your computer and use it in GitHub Desktop.
Add stop_area relations to a map
#!/usr/bin/env python3
import json
from lxml import etree
import sys
import kdtree
import math
QUERY = """
[out:json][timeout:25][bbox:{{bbox}}];
(
node["railway"="subway_entrance"];
node["station"="subway"];
node["station"="light_rail"];
node["public_transport"="stop_position"]["train"="yes"];
node["public_transport"="stop_position"]["subway"="yes"];
way["station"="subway"];
relation["station"="subway"];
way["railway"="platform"];
relation["railway"="platform"];
);
out meta center qt;
rel(bn);out meta;
"""
class StationWrapper:
def __init__(self, st):
self.coords = (st['lon'], st['lat'])
self.station = st
def __len__(self):
return 2
def __getitem__(self, i):
return self.coords[i]
def distance(self, other):
"""Calculate distance in meters."""
dx = math.radians(self[0] - other['lon']) * math.cos(
0.5 * math.radians(self[1] + other['lat']))
dy = math.radians(self[1] - other['lat'])
return 6378137 * math.sqrt(dx*dx + dy*dy)
if len(sys.argv) < 2:
print('Read a JSON from Overpass and output JOSM OSM XML with added stop_area relations')
print('Usage: {} <export.json> [output.osm]'.format(sys.argv[0]))
sys.exit(1)
with open(sys.argv[1], 'r') as f:
src = json.load(f)['elements']
# Create a kd-tree out of subway stations
stations = kdtree.create(dimensions=2)
for el in src:
if 'tags' in el and el['tags'].get('station', None) in ('subway', 'light_rail'):
stations.add(StationWrapper(el))
# Populate a list of nearby subway exits and platforms for each station
MAX_DISTANCE = 300 # meters
stop_areas = {}
for el in src:
if 'tags' not in el:
continue
if (el['tags'].get('railway', None) not in ('subway_entrance', 'platform') and
el['tags'].get('public_transport', None) not in ('platform', 'stop_position')):
continue
coords = el.get('center', el)
station = stations.search_nn((coords['lon'], coords['lat']))[0].data
if station.distance(coords) < MAX_DISTANCE:
k = (station.station['id'], station.station['tags']['name'])
# Disregard exits and platforms that are differently named
if el['tags'].get('name', k[1]) == k[1]:
if k not in stop_areas:
stop_areas[k] = [station.station]
stop_areas[k].append(el)
# Find existing stop_area relations for stations and remove these stations
for el in src:
if el['type'] == 'relation' and el['tags'].get('public_transport', None) == 'stop_area':
found = False
for m in el['members']:
if found:
break
if m['type'] == 'node': # Assuming all stations are nodes
for st in stop_areas:
if m['ref'] == st[0]:
del stop_areas[st]
found = True
break
# Create OSM XML for new stop_area relations
root = etree.Element('osm', version='0.6')
rid = -1
for st, members in stop_areas.items():
rel = etree.SubElement(root, 'relation', id=str(rid))
rid -= 1
etree.SubElement(rel, 'tag', k='type', v='public_transport')
etree.SubElement(rel, 'tag', k='public_transport', v='stop_area')
etree.SubElement(rel, 'tag', k='name', v=st[1])
for m in members:
if m['tags'].get('railway', m['tags'].get('public_transport', None)) == 'platform':
role = 'platform'
elif m['tags'].get('public_transport', None) == 'stop_position':
role = 'stop'
else:
role = ''
etree.SubElement(rel, 'member', ref=str(m['id']), type=m['type'], role=role)
# Add all downloaded elements
for el in src:
obj = etree.SubElement(root, el['type'])
for a in ('id', 'type', 'user', 'uid', 'version', 'changeset', 'timestamp', 'lat', 'lon'):
if a in el:
obj.set(a, str(el[a]))
if 'tags' in el:
for k, v in el['tags'].items():
etree.SubElement(obj, 'tag', k=k, v=v)
if 'members' in el:
for m in el['members']:
etree.SubElement(obj, 'member', ref=str(m['ref']),
type=m['type'], role=m.get('role', ''))
if 'nodes' in el:
for n in el['nodes']:
etree.SubElement(obj, 'nd', ref=str(n))
result = etree.tostring(root, pretty_print=True)
if len(sys.argv) < 3:
print(result.decode('utf-8'))
else:
with open(sys.argv[2], 'wb') as f:
f.write(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment