Skip to content

Instantly share code, notes, and snippets.

@emacsen
Created May 11, 2012 00:15
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save emacsen/2656735 to your computer and use it in GitHub Desktop.
Save emacsen/2656735 to your computer and use it in GitHub Desktop.
OSM Tiger expansion code
from xml.sax.handler import ContentHandler
import os
import codecs
from xml.sax.saxutils import escape
class OSMHandler(ContentHandler):
    """Base SAX handler that filters OSM objects and writes them back out.

    The handler collects one node, way or relation at a time.  When the
    element ends, ``selectElement`` decides whether it is interesting and
    ``transformElement`` may modify it; if ``self.fixed`` is then truthy the
    object is serialised into a numbered ``.osm`` file inside the output
    directory.  A new file is started every ``max_objects_per_file`` objects.

    You will want to override the ``selectElement`` and ``transformElement``
    functions.
    """

    # Values written into the <osm> header by _open().  deleteElement()
    # previously referenced module globals VERSION / BOTNAME that are not
    # defined in this file; it now uses these same header values.
    API_VERSION = '0.6'
    GENERATOR = 'pyxbot'

    def __init__(self, file_prefix):
        """Set up the handler; ``file_prefix`` is the output directory."""
        ContentHandler.__init__(self)
        self.path = file_prefix
        self.file_prefix = file_prefix
        self.object_counter = 0
        self.clear()
        self.max_objects_per_file = 1000
        self.file_counter = 0
        self.out = None

    # ------------------------------------------------------------------
    # Output file handling
    # ------------------------------------------------------------------
    def _open(self):
        """Open the next numbered output file and write the XML header."""
        if not os.path.isdir(self.path):
            # makedirs so that a nested output path also works.
            os.makedirs(self.path)
        self.fname = self.path + '/' + "%04d.osm" % self.file_counter
        self.out = codecs.open(self.fname, 'w', "utf-8")
        self.out.write(u'<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n')
        self.out.write(u'<osm version="%s" generator="%s">\n'
                       % (self.API_VERSION, self.GENERATOR))

    def _close(self):
        """Finish the current output file and advance the file counter.

        Does nothing when no file is open, so it is safe to call it twice
        or at the end of a document that never opened a file.
        """
        if self.out is None:
            return
        self.out.write(u'</osm>\n')
        self.out.flush()
        self.out.close()
        self.out = None
        self.object_counter = 0
        self.file_counter = self.file_counter + 1

    # ------------------------------------------------------------------
    # Helpers for subclasses
    # ------------------------------------------------------------------
    def bump_version(self):
        """Increase the current object's ``version`` attribute by one."""
        self.attrs['version'] = str(int(self.attrs['version']) + 1)

    def remove_user_changeset(self):
        """Drop the attributes that identify the previous edit."""
        for key in ('changeset', 'uid', 'user', 'timestamp'):
            # Same test as before: only truthy values are removed.
            if self.attrs.get(key):
                del self.attrs[key]

    # ------------------------------------------------------------------
    # Serialisation.  The output methods don't do any kind of data
    # validation, but they do escape values so the result is valid XML.
    # ------------------------------------------------------------------
    @staticmethod
    def _escape_attr(value):
        """Escape ``value`` for use inside a double-quoted XML attribute."""
        return escape(value, {'"': '&quot;'})

    @classmethod
    def _format_attrs(cls, mapping):
        """Render ``mapping`` as space separated ``key="value"`` pairs."""
        return u' '.join(u'%s="%s"' % (key, cls._escape_attr(val))
                         for key, val in mapping.items())

    @classmethod
    def _format_tags(cls, tags):
        """Render ``tags`` as one ``<tag .../>`` line per entry."""
        return u''.join(u' <tag k="%s" v="%s" />\n'
                        % (cls._escape_attr(key), cls._escape_attr(val))
                        for key, val in tags.items())

    def _str_node(self):
        "Return a node as a string"
        attrs = self._format_attrs(self.attrs)
        if not self.tags:
            return u'<node %s />\n' % attrs
        return (u'<node %s >\n' % attrs
                + self._format_tags(self.tags)
                + u'</node>\n')

    def _str_way(self):
        "Output a way as a string"
        parts = [u'<way %s >\n' % self._format_attrs(self.attrs)]
        parts.extend(u' <nd ref="%s" />\n' % self._escape_attr(nodeid)
                     for nodeid in self.nodes)
        parts.append(self._format_tags(self.tags))
        parts.append(u'</way>\n')
        return u''.join(parts)

    def _str_relation(self):
        "Output a relation as a string"
        attrs = self._format_attrs(self.attrs)
        if not (self.members or self.tags):
            return u'<relation %s />\n' % attrs
        parts = [u'<relation %s >\n' % attrs]
        # The original looped over the undefined name ``members``.
        parts.extend(u' <member %s />\n' % self._format_attrs(member)
                     for member in self.members)
        parts.append(self._format_tags(self.tags))
        parts.append(u'</relation>\n')
        return u''.join(parts)

    def emit(self):
        """Write the current element to the open output file.

        Raises ValueError if the current element type is not node, way or
        relation.
        """
        serializers = {
            'node': self._str_node,
            'way': self._str_way,
            'relation': self._str_relation,
        }
        try:
            serialize = serializers[self.type]
        except KeyError:
            raise ValueError("Cannot emit element of type %r" % (self.type,))
        self.out.write(serialize())

    # ------------------------------------------------------------------
    # State machine
    # ------------------------------------------------------------------
    def clear(self):
        "Initialize the state machine"
        self.type = None
        self.tags = {}
        self.nodes = []
        self.members = []
        self.attrs = {}
        self.fixed = None

    def startElement(self, tag, attrs):
        "This function is called at the start of the element (as per SAX)"
        if tag in ('node', 'way', 'relation'):
            self.type = tag
            self.attrs = dict(attrs)
        elif tag == 'tag':
            self.tags[attrs.get('k')] = attrs.get('v')
        elif tag == 'member':
            self.members.append(dict(attrs))
        elif tag == 'nd':
            self.nodes.append(attrs.get('ref'))

    def selectElement(self):
        """Select whether or not we care about the OSM object (True or
        False).  Override this function in your handler"""
        return False

    def transformElement(self):
        """Transform the element.  Override this function in your
        handler"""
        pass

    def deleteElement(self):
        """Write the current element wrapped in a ``<delete>`` block.
        Please use with caution!"""
        if not self.out:
            self._open()
        self.out.write(u'<delete version="%s" generator="%s">\n'
                       % (self.API_VERSION, self.GENERATOR))
        self.emit()
        self.out.write(u'</delete>\n')

    def endElement(self, tag):
        """As per the SAX handler, this method is where any work is
        done.  You may want to override it, but probably not"""
        # If there's no open output, we need to open it
        if not self.out:
            self._open()
        if tag == 'way':
            self.nodes = tuple(self.nodes)
        elif tag == 'relation':
            self.members = tuple(self.members)
        if tag in ('node', 'way', 'relation'):
            if self.selectElement():
                self.transformElement()
                if self.fixed:
                    self.emit()
                    self.object_counter = self.object_counter + 1
                    if self.object_counter >= self.max_objects_per_file:
                        self._close()
            # Reset only once a whole object has been handled; clearing on
            # every closing <tag>/<nd>/<member> would lose collected state.
            self.clear()

    def endDocument(self):
        """Close the last output file when the input document ends."""
        self._close()
#!/usr/bin/env python
"""This is the base library that can be used to run various OSM bots
which are implemented as plugins"""
import sys
from xml.sax.handler import ContentHandler
from xml.sax import make_parser
from xml.sax.saxutils import quoteattr
import argparse
from pyxbot import OSMHandler
from os import remove
import codecs
def add_or_incr(dct, item):
    """Count one more occurrence of ``item`` in the dictionary ``dct``.

    ``dct`` maps items to integer counts and is modified in place: a
    missing ``item`` is stored with a count of 1, an existing one has its
    count increased by 1.  Nothing is returned.
    """
    # dict.has_key() no longer exists in Python 3; get() works everywhere.
    dct[item] = dct.get(item, 0) + 1
# Maps an abbreviated road type to the full word used when expanding names.
# Thank you https://www.usps.com/send/official-abbreviations.htm
road_types = {
    'Aly': 'Alley',
    'Anx': 'Annex',  ## From USPS
    'Arc': 'Arcade',  ## From USPS
    'Ave': 'Avenue',
    'Bch': 'Beach',  ## From USPS
    'Blf': 'Bluff',  ## From USPS
    'Blfs': 'Bluffs',  ## From USPS
    'Blvd': 'Boulevard',
    'Bnd': 'Bend',  ## From USPS
    # NOTE(review): the USPS table lists BR as "Branch" (BRG is "Bridge");
    # left as in the original, confirm against the data being processed.
    'Br': 'Bridge',
    'Brg': 'Bridge',
    'Byp': 'Bypass',
    'Byu': 'Bayou',  ## From USPS
    'Cir': 'Circle',
    'Cres': 'Crescent',
    'Cswy': 'Causeway',
    'Ct': 'Court',
    'Ctr': 'Center',
    'Cv': 'Cove',
    'Dr': 'Drive',
    'Expy': 'Expressway',
    'Expwy': 'Expressway',
    'FMRd': 'Farm to Market Road',
    'Fwy': 'Freeway',
    'Grd': 'Grade',
    'Hbr': 'Harbor',
    'Holw': 'Hollow',
    'Hwy': 'Highway',
    'Ln': 'Lane',
    'Lndg': 'Landing',
    'Mal': 'Mall',
    'Mtwy': 'Motorway',
    'Ovps': 'Overpass',
    'Pky': 'Parkway',
    'Pkwy': 'Parkway',
    'Pl': 'Place',
    'Plz': 'Plaza',
    'Rd': 'Road',
    'Rdg': 'Ridge',
    'RMRd': 'Ranch to Market Road',
    'Rte': 'Route',
    'Skwy': 'Skyway',
    'Sq': 'Square',
    'St': 'Street',
    'Ter': 'Terrace',
    'Tfwy': 'Trafficway',
    'Thfr': 'Thoroughfare',
    'Thwy': 'Thruway',
    'Tpke': 'Turnpike',
    'Trce': 'Trace',
    'Trl': 'Trail',
    'Tunl': 'Tunnel',
    'Unp': 'Underpass',
    'Wkwy': 'Walkway',
    'Xing': 'Crossing',
    ### NOT EXPANDED
    # These map to themselves so they count as known types without
    # changing the name.
    'Way': 'Way',
    'Walk': 'Walk',
    'Loop': 'Loop',
    'Oval': 'Oval',
    'Ramp': 'Ramp',
    'Row': 'Row',
    'Run': 'Run',
    'Pass': 'Pass',
    'Spur': 'Spur',
    'Path': 'Path',
    'Pike': 'Pike',
    'Rue': 'Rue',
    'Mall': 'Mall',
}
# Maps a compass-direction abbreviation to the full word.
directions = dict(
    N='North',
    S='South',
    E='East',
    W='West',
    NE='Northeast',
    NW='Northwest',
    SE='Southeast',
    SW='Southwest',
)
class TigerRoadExpansionHandler(OSMHandler):
    """Expand abbreviated road types and directions in way names.

    A way is considered when it has ``highway``, ``tiger:name_base`` and
    ``name`` tags.  The abbreviations given by ``tiger:name_type``,
    ``tiger:name_direction_prefix`` and ``tiger:name_direction_suffix`` are
    replaced in the name by their long forms from ``road_types`` and
    ``directions``.  Ways that cannot be expanded safely are recorded in
    ``checkme_ways`` for manual review.
    """

    def __init__(self, file_prefix):
        """Create the handler; ``file_prefix`` is the output directory."""
        OSMHandler.__init__(self, file_prefix)
        self.roads = 0        # ways that passed the tag filter
        self.num_fixed = 0    # ways whose name was actually changed
        self.checkme_ways = []
        self.unrecognized_tags = {}
        self.unrecognized_direction_tags = {}
        self.ambigious_expansions = {}
        # Per-way state filled in by selectElement for transformElement.
        self.namel = []
        self.road_type = None
        self.dir_tag_prefix = None
        self.dir_tag_suffix = None

    def _flag_for_review(self, reason):
        """Record the current way in ``checkme_ways`` with ``reason``."""
        self.checkme_ways.append({'name': self.tags.get('name'),
                                  'id': self.attrs['id'],
                                  'reason': reason})

    def _check_road_type(self, name):
        """Return the road type abbreviation to expand, or None.

        Unknown types, names containing the abbreviation more than once
        and names containing neither the abbreviation nor its long form
        are flagged for review.
        """
        road_type = self.tags.get('tiger:name_type')
        if not road_type:
            return None
        # If we have a name_type that we haven't seen, store it.
        if road_type not in road_types:
            add_or_incr(self.unrecognized_tags, road_type)
            self._flag_for_review('Unknown road_type (%s)' % road_type)
            return None
        occurrences = self.namel.count(road_type)
        # If the name is ambiguous, store it.
        if occurrences > 1:
            add_or_incr(self.ambigious_expansions, name)
            self._flag_for_review('Ambigious expansion')
            return None
        if occurrences < 1:
            # Nothing to expand.  Only flag the way when the long form is
            # missing too, i.e. the name disagrees with the TIGER tag.
            if road_types[road_type] not in self.namel:
                self._flag_for_review('Road type (%s) not in name'
                                      % road_type)
            return None
        return road_type

    def _check_direction(self, tag_key, name):
        """Return the direction abbreviation in ``tag_key`` to expand.

        Returns None when the tag is missing, is not a known direction, or
        does not occur exactly once as a word of the name.
        """
        direction = self.tags.get(tag_key)
        if not direction:
            return None
        if direction not in directions:
            add_or_incr(self.unrecognized_direction_tags, direction)
            return None
        occurrences = self.namel.count(direction)
        if occurrences > 1:
            add_or_incr(self.ambigious_expansions, name)
            return None
        if occurrences < 1:
            return None
        return direction

    def selectElement(self):
        """Return True when the current way has something to expand."""
        tags = self.tags
        # We only care about ways with highway=* tags that have
        # tiger:name_base
        if (self.type != 'way' or 'highway' not in tags
                or 'tiger:name_base' not in tags):
            return False
        # Of those, we only care about those with a name
        if 'name' not in tags:
            return False
        name = tags['name']
        self.roads += 1
        self.namel = name.split()
        self.road_type = self._check_road_type(name)
        self.dir_tag_prefix = self._check_direction(
            'tiger:name_direction_prefix', name)
        self.dir_tag_suffix = self._check_direction(
            'tiger:name_direction_suffix', name)
        return bool(self.road_type or self.dir_tag_suffix
                    or self.dir_tag_prefix)

    def _expand_direction(self, position, abbreviation):
        """Replace ``abbreviation`` in ``self.namel`` with its long form.

        ``position`` is ``'prefix'`` or ``'suffix'`` and is only used in
        the review message.  Returns False, after flagging the way, when
        the abbreviation is unknown or not a word of the name.
        """
        if abbreviation not in directions:
            self._flag_for_review(
                'Direction %s (%s) not in directions list'
                % (position, abbreviation))
            return False
        if abbreviation not in self.namel:
            self._flag_for_review('Direction %s (%s) not in name'
                                  % (position, abbreviation))
            return False
        self.namel[self.namel.index(abbreviation)] = directions[abbreviation]
        return True

    def transformElement(self):
        """Rewrite the name of the selected way with the expansions.

        Sets ``self.fixed`` when the name changed.  The way is left
        unchanged if a direction cannot be expanded.
        """
        name = self.tags['name']
        namel = self.namel
        if self.road_type:
            # selectElement guarantees exactly one occurrence.
            namel[namel.index(self.road_type)] = road_types[self.road_type]
        for position, abbreviation in (('prefix', self.dir_tag_prefix),
                                       ('suffix', self.dir_tag_suffix)):
            if abbreviation and not self._expand_direction(position,
                                                           abbreviation):
                return
        newname = ' '.join(namel)
        if newname != name:
            self.tags['name'] = newname
            self.bump_version()
            self.remove_user_changeset()
            self.fixed = True
            self.num_fixed += 1

    def endDocument(self):
        """Close the output and delete it if nothing was fixed."""
        if self.out is not None:
            self._close()
        # fname only exists once a file has been opened.
        fname = getattr(self, 'fname', None)
        if self.num_fixed == 0 and fname is not None:
            remove(fname)
def _csv_field(value):
    """Return ``value`` as text, quoted when it would break a CSV row."""
    text = u'%s' % (value,)
    if any(ch in text for ch in u',"\r\n'):
        text = u'"%s"' % text.replace(u'"', u'""')
    return text


def main():
    """Command line entry point of the Tiger expansion bot.

    Parses the OSM XML named by ``--input`` (``-`` for standard input),
    writes the fixed ways into the ``--outdir`` directory and, when some
    ways need manual review, lists them in the ``--checkways`` CSV file.
    Totals are available afterwards on the handler (``roads``,
    ``num_fixed``, ``unrecognized_tags``, ``ambigious_expansions``).
    """
    argparser = argparse.ArgumentParser(description="Tiger expansion bot")
    # Required: without it the original went on to call open(None).
    argparser.add_argument('--input', dest='infname', required=True,
                           help='The input filename')
    argparser.add_argument('--outdir', dest='outdirname',
                           default='processed', help='The output directory')
    argparser.add_argument('--checkways', dest='checkways_fname',
                           default='ways.csv',
                           help="Unfixable way csv file")
    args = argparser.parse_args()

    # Hand the parser raw bytes so it applies the encoding declared in the
    # XML instead of the locale's default text encoding.
    if args.infname == '-':
        source = getattr(sys.stdin, 'buffer', sys.stdin)
        owns_source = False
        args.infname = 'expansion'
    else:
        source = open(args.infname, 'rb')
        owns_source = True
    if not args.outdirname:
        args.outdirname = args.infname
    dirname = args.outdirname

    parser = make_parser()
    handler = TigerRoadExpansionHandler(dirname)
    parser.setContentHandler(handler)
    try:
        parser.parse(source)
    finally:
        if owns_source:
            source.close()

    if handler.checkme_ways:
        with codecs.open(args.checkways_fname, 'w', 'utf-8') as fd:
            fd.write(u'ID,Name,Reason\n')
            for way in handler.checkme_ways:
                fd.write(u"%s,%s,%s\n" % (_csv_field(way['id']),
                                          _csv_field(way['name']),
                                          _csv_field(way['reason'])))
# Run the bot only when this file is executed as a script; whatever main()
# returns becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment