Skip to content

Instantly share code, notes, and snippets.

@roman-yepishev
Last active March 4, 2016 13:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save roman-yepishev/5de31e0ae7446b46b7ff to your computer and use it in GitHub Desktop.
Save roman-yepishev/5de31e0ae7446b46b7ff to your computer and use it in GitHub Desktop.
Autonumbering magic for Boston
#!/usr/bin/python3
import os
import sys
import xml.sax
import xml.sax.saxutils
import xml.sax.xmlreader
import sqlite3
from shapely.geometry import (Point, MultiPoint, LineString, Polygon, CAP_STYLE)
from shapely.geometry.polygon import LinearRing
# Divisor that turns a distance in meters into a distance in degrees
# (see BuildingLocator.get_buildings). Presumably the approximate number
# of meters in one degree of latitude -- TODO confirm.
METERS_IN_DEG = 111325
# Default value for the street/building distance threshold used by the
# script entry point, expressed in meters.
BUFFER_DEFAULT = 500 #meters
class BoundedPolygon(Polygon):
    """Polygon that can describe its own bounding box as another polygon."""

    def bounds_polygon(self):
        """Return a plain ``Polygon`` built from this geometry's ``bounds``.

        ``self.bounds`` is indexed as a 4-item sequence ``b`` and the ring is
        emitted in the order ``(b[0], b[1])``, ``(b[2], b[1])``,
        ``(b[2], b[3])``, ``(b[0], b[3])``.
        """
        box = self.bounds
        x_first, y_first = box[0], box[1]
        x_second, y_second = box[2], box[3]
        corners = [
            (x_first, y_first),
            (x_second, y_first),
            (x_second, y_second),
            (x_first, y_second),
        ]
        return Polygon(corners)
class StreetLocator(object):
    """Assembles a street geometry out of the ways stored in an OSM cache."""

    def __init__(self, cache):
        """Remember *cache*.

        ``get_street`` reads ``cache['way'][way_id]['tags']``,
        ``cache['way'][way_id]['nodes']`` and
        ``cache['node'][node_id]['location']`` from it.
        """
        self._cache = cache

    def get_street(self, street):
        """Return a ``LineString`` through all nodes of the ways named *street*.

        A way is selected when it carries a ``highway`` tag and its ``name``
        tag equals *street* ignoring case. The node lists of all selected
        ways are concatenated in cache iteration order before the line is
        built. One progress line is printed per selected way.
        """
        ways = self._cache['way']
        node_ids = []
        # A single street can be split across several ways.
        for way_id in ways:
            tags = ways[way_id]['tags']
            name_matches = (
                'name' in tags and tags['name'].lower() == street.lower()
            )
            if not (name_matches and 'highway' in tags):
                continue
            segment = ways[way_id]['nodes']
            print("Found street segment: {} ({} points)".format(
                way_id, len(segment)))
            node_ids.extend(segment)

        locations = [
            self._cache['node'][node_id]['location'] for node_id in node_ids
        ]
        return LineString(MultiPoint(locations))
class BuildingLocator(object):
    """Selects the buildings in an OSM cache that lie close to a geometry."""

    def __init__(self, cache):
        """Remember *cache*; ``get_buildings`` reads its 'way' and 'node' maps."""
        self._cache = cache

    def get_buildings(self, street, buf):
        """Return ``(way_id, outline)`` pairs for buildings near *street*.

        Every way that has a ``building`` tag is considered. Its outline is
        the bounding-box polygon of its node locations, and it is kept when
        ``street.distance(outline)`` does not exceed ``buf / METERS_IN_DEG``.
        A summary line with the kept/considered counts is printed.
        """
        threshold = buf / METERS_IN_DEG
        ways = self._cache['way']
        candidates = {
            way_id: ways[way_id]['nodes']
            for way_id in ways
            if 'building' in ways[way_id]['tags']
        }

        nearby = []
        for way_id, node_ids in candidates.items():
            locations = [
                self._cache['node'][node_id]['location']
                for node_id in node_ids
            ]
            # Work with the bounding box of the footprint, not its exact shape.
            outline = BoundedPolygon(MultiPoint(locations)).bounds_polygon()
            if street.distance(outline) <= threshold:
                nearby.append((way_id, outline))

        print("Matched {} buildings out of {} on the map".format(
            len(nearby), len(candidates)))
        return nearby
class PropertyDatabase(object):
    """Reads addresses from the ``Property_Assessment_2015`` SQLite table."""

    def __init__(self, path):
        """Open the SQLite database stored at *path*."""
        self._conn = sqlite3.connect(path)

    def match(self, buildings, st_name, st_suf, osm_street):
        """Assign database house numbers to the buildings that contain them.

        Args:
            buildings: list of ``(way_id, outline)`` pairs; ``outline`` must
                offer ``contains(point)``. It is iterated once per database
                row, so it must be re-iterable.
            st_name: value compared against ``TRIM(ST_NAME)``.
            st_suf: value compared against ``TRIM(ST_NAME_SUF)``.
            osm_street: value written into the ``addr:street`` tag.

        Returns:
            A list of ``{'_type': 'way', 'id': way_id, 'tags': {...}}``
            dicts, one per (row, containing building) pair. Rows with a
            blank house number produce a ``fixme`` tag instead of an address.

        Rows whose location falls in no building are reported with a
        ``MISS`` line; a summary line is printed at the end.
        """
        cursor = self._conn.cursor()
        try:
            cursor.execute('''
                SELECT DISTINCT Location, ST_NUM FROM Property_Assessment_2015
                WHERE TRIM(ST_NAME)=? AND TRIM(ST_NAME_SUF)=?
            ''', (st_name, st_suf))
            rows = cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()

        # Keep the rows in a list rather than a dict keyed by Point: keying
        # by geometry depends on the geometry type's hashing/equality rules
        # and could silently merge different house numbers that share one
        # coordinate.
        addresses = []
        for raw_location, raw_number in rows:
            # Location is stored as text of the form "(a, b)".
            coords = [
                float(part)
                for part in raw_location.lstrip('(').rstrip(')').split(',')
            ]
            addresses.append((Point(coords), raw_number))

        matched = 0
        result = []
        for location, raw_number in addresses:
            # ST_NUM may be NULL or numeric; normalise it to stripped text.
            housenumber = '' if raw_number is None else str(raw_number).strip()
            if housenumber != '':
                tags = {
                    'addr:housenumber': housenumber,
                    'addr:street': osm_street
                }
            else:
                tags = {
                    'fixme': 'City has no housenumber for this building'
                }

            hit = False
            for way, outline in buildings:
                if not outline.contains(location):
                    continue
                result.append({
                    '_type': 'way',
                    'id': way,
                    # Each result owns its tag dict.
                    'tags': dict(tags)
                })
                hit = True
                matched += 1
            if not hit:
                print("MISS: {} ({})".format(raw_number, location))

        print("Matched {} buildings out of {} found in database".format(
            matched, len(addresses)))
        return result
class JOSMGenerator(xml.sax.saxutils.XMLGenerator):
    """SAX handler that re-emits a document while merging tags into elements.

    Elements whose name and ``id`` attribute match an update get
    ``action="modify"`` and a ``user`` attribute, and the update's tags are
    appended as ``<tag k=... v=.../>`` children just before the element is
    closed.

    NOTE(review): ``<tag>`` children already present in the input are passed
    through unchanged, so an element that already has one of the keys will
    end up with that key twice -- confirm whether inputs can contain them.
    """

    def __init__(self, out, updates, user='ryebread'):
        """Index *updates* by element type and id, then set up the writer.

        Args:
            out: stream the XML is written to.
            updates: iterable of dicts with ``'_type'``, ``'id'`` and
                ``'tags'`` keys.
            user: value written into the ``user`` attribute of modified
                elements.
        """
        self._updates = {}
        self._updating = None
        self._user = user
        for item in updates:
            by_id = self._updates.setdefault(item['_type'], {})
            if item['id'] in by_id:
                # Flag the tags that will actually be written. Setting the
                # marker on the duplicate item would lose it, because the
                # duplicate is never stored.
                by_id[item['id']]['fixme'] = \
                    'different address maps to the same building'
            else:
                # Copy so the caller's dict is not modified by the flag.
                by_id[item['id']] = dict(item['tags'])
        super().__init__(out, encoding='UTF-8', short_empty_elements=True)

    def startElement(self, name, attrs):
        """Write the start tag, marking the element when it has an update."""
        pending = self._updates.get(name)
        if pending is not None and 'id' in attrs and attrs['id'] in pending:
            self._updating = attrs['id']
            attrs = dict(attrs)
            attrs['action'] = 'modify'
            attrs['user'] = self._user
            attrs = xml.sax.xmlreader.AttributesImpl(attrs)
        super().startElement(name, attrs)

    def endElement(self, name):
        """Append the pending update's tags, then write the end tag."""
        if name in self._updates and self._updating is not None:
            tags = self._updates[name][self._updating]
            self._updating = None
            # Synthesize one <tag/> child per merged tag.
            for key, value in tags.items():
                super().startElement('tag', {'k': key, 'v': value})
                super().endElement('tag')
        super().endElement(name)
class OSMContentHandler(xml.sax.ContentHandler):
    """SAX handler that collects nodes, ways and relations into ``cache``.

    ``cache`` maps ``'node'``, ``'way'`` and ``'relation'`` to dicts keyed by
    the element's ``id`` attribute (kept as a string). Nodes get a
    ``'location'`` point, ways get a ``'nodes'`` list of referenced ids, and
    every entry receives a ``'tags'`` dict when its element is closed.
    """

    def __init__(self):
        """Start with an empty cache and no element in progress."""
        # Let the base class set up its own state as well.
        super().__init__()
        self._id = None
        self._tags = {}
        self.cache = {
            'node': {},
            'way': {},
            'relation': {},
        }

    def startElement(self, name, attrs):
        """Record the element that is being opened."""
        # Any element carrying an id becomes the current element.
        if 'id' in attrs:
            self._id = attrs['id']

        if name == 'node':
            # The point is built as (lat, lon), latitude first.
            self.cache['node'][self._id] = {
                'location': Point(float(attrs['lat']), float(attrs['lon']))
            }
        elif name == 'way':
            self.cache['way'][self._id] = {'nodes': []}
        elif name == 'relation':
            self.cache['relation'][self._id] = {}
        elif name == 'tag':
            key = attrs['k'].strip()
            value = attrs['v'].strip()
            # Tags whose key or value is blank after stripping are dropped.
            if key != '' and value != '':
                self._tags[key] = value
        elif name == 'nd':
            # Node reference belonging to the way currently being read.
            self.cache['way'][self._id]['nodes'].append(attrs['ref'])

    def endElement(self, name):
        """Attach the collected tags when a node/way/relation is closed."""
        if name in self.cache:
            self.cache[name][self._id]['tags'] = self._tags
            self._tags = {}
            self._id = None
class OSMReader(object):
    """Parses an OSM XML file into the cache built by OSMContentHandler."""

    def __init__(self):
        """Create the SAX parser reused by every ``read`` call."""
        self._parser = xml.sax.make_parser()

    def read(self, path):
        """Parse the file at *path* and return the handler's ``cache`` dict."""
        handler = OSMContentHandler()
        self._parser.setContentHandler(handler)
        # Hand the parser raw bytes so it decodes the document using the
        # XML's own encoding declaration rather than the locale default.
        with open(path, 'rb') as fh:
            self._parser.parse(fh)
        return handler.cache
class JOSMWriter(object):
    """Copies an OSM XML file to a new file while merging tag updates."""

    def __init__(self, input_path, output_path):
        """Remember the source file path and the destination file path."""
        self._input = input_path
        self._output = output_path

    def merge(self, updates):
        """Stream the input through JOSMGenerator into the output file.

        Args:
            updates: iterable of update dicts, passed to JOSMGenerator
                unchanged.
        """
        # Input is read as bytes so the parser applies the document's own
        # encoding. Output is written as UTF-8 because the generator is
        # constructed with encoding='UTF-8' and declares that in the XML
        # header.
        with open(self._input, 'rb') as input_fh, \
                open(self._output, 'w', encoding='utf-8') as output_fh:
            generator = JOSMGenerator(output_fh, updates)
            parser = xml.sax.make_parser()
            parser.setContentHandler(generator)
            parser.parse(input_fh)
if __name__ == "__main__":
    # Arguments: OSM file, street name as tagged in OSM, then the street
    # name and street suffix as stored in the property database.
    if len(sys.argv) < 5:
        sys.exit(
            "Usage: {} OSM_FILE OSM_STREET_NAME DB_STREET_NAME "
            "DB_STREET_SUFFIX".format(sys.argv[0]))

    buf = BUFFER_DEFAULT
    path = sys.argv[1]
    street_name = sys.argv[2]
    property_street_name = sys.argv[3]
    property_street_name_suf = sys.argv[4]

    # The result is written next to the input as "<name>-numbered.osm".
    output = os.path.join(os.path.dirname(path),
                          os.path.basename(path).rsplit('.', 1)[0] +
                          '-numbered.osm')

    cache = OSMReader().read(path)

    locator = StreetLocator(cache)
    street = locator.get_street(street_name)

    locator = BuildingLocator(cache)
    buildings = locator.get_buildings(street, buf)

    # The database file is looked up relative to the working directory.
    db = PropertyDatabase('Property_Assessment_2015.sqlite')
    updates = db.match(buildings,
                       property_street_name,
                       property_street_name_suf,
                       street_name)

    writer = JOSMWriter(path, output)
    writer.merge(updates)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment