Skip to content

Instantly share code, notes, and snippets.

@mvexel
Forked from emacsen/pyxbot.py
Created May 12, 2012 18:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mvexel/2668003 to your computer and use it in GitHub Desktop.
Save mvexel/2668003 to your computer and use it in GitHub Desktop.
OSM Tiger expansion code
#!/usr/bin/env python
"""This is the base library that can be used to run various OSM bots
which are implemented as plugins"""
import codecs
import os
import sys
from xml.sax import make_parser
from xml.sax.handler import ContentHandler
from xml.sax.saxutils import escape
# Bot identity; both values are written into the <delete> wrapper element
# produced by OSMHandler.deleteElement.
BOTNAME = 'TIGER name expansion 2012'
VERSION = '0.1'
# Console verbosity switches. These are only defaults: the command-line
# handling at the bottom of the file may overwrite them.
DEBUG = False
QUIET = False
class OSMHandler(ContentHandler):
    """Base SAX handler that filters OSM XML and writes selected objects.

    The handler collects each node, way or relation (attributes, tags,
    node references and members) and, when the closing tag is reached,
    asks selectElement() whether the object is of interest.  Selected
    objects are passed to transformElement() and then written to numbered
    ``<file_prefix>/<file_prefix>_NNNN.osm`` files.

    Subclasses are expected to override selectElement() and
    transformElement().
    """

    # Characters that must be replaced inside a double-quoted XML attribute
    # value, in addition to the &, < and > that escape() always handles.
    _ATTR_ENTITIES = {
        '"': '&quot;',
        '\n': '&#10;',
        '\r': '&#13;',
        '\t': '&#9;',
    }

    def __init__(self, file_prefix):
        """Set up the handler.

        file_prefix is used both as the output directory name and as the
        prefix of every output file name.
        """
        # ContentHandler is an old-style class on Python 2, so super()
        # cannot be used here.
        ContentHandler.__init__(self)
        self.path = file_prefix
        self.file_prefix = file_prefix
        self.object_counter = 0
        self.clear()
        self.max_objects_per_file = 10000
        self.file_counter = 0
        self.out = None
        # Statistics and bookkeeping; the base class only initialises
        # these, it is up to subclasses to maintain them.
        self.roads = 0
        self.total_fixed = 0
        self.fixed = False
        self.tried_to_fix = 0
        self.unrecognized_tags = set()
        self.unrecognized_direction_tags = set()
        self.ambigious_expansions = []

    def _open(self):
        """Open the next numbered output file and write the XML header."""
        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        fname = os.path.join(
            self.path,
            "%s_%04d.osm" % (self.file_prefix, self.file_counter))
        self.out = codecs.open(fname, 'w', encoding='utf-8')
        self.out.write('<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n')
        self.out.write('<osm version="0.6" generator="pyxbot">\n')

    def _close(self):
        """Finish the current output file and advance the file counter."""
        self.out.write('</osm>\n')
        self.out.close()
        self.out = None
        self.object_counter = 0
        self.file_counter = self.file_counter + 1

    @classmethod
    def _quote(cls, value):
        """Return value as text that is safe inside a "..." attribute."""
        # '%s' formatting accepts any object (as the original code did)
        # and yields text on both Python 2 and Python 3.
        return escape('%s' % (value,), cls._ATTR_ENTITIES)

    @classmethod
    def _format_attrs(cls, attrs):
        """Render a mapping as space separated name="value" pairs."""
        return ' '.join('%s="%s"' % (key, cls._quote(val))
                        for key, val in attrs.items())

    def _tag_lines(self):
        """Return one <tag/> line per entry of self.tags."""
        return [' <tag k="%s" v="%s" />\n' % (self._quote(key),
                                               self._quote(val))
                for key, val in self.tags.items()]

    def _write_element(self, element, children):
        """Write the current object as <element>.

        children is a list of already formatted child lines; when it is
        empty a single self-closing tag is written instead of an
        open/close pair.
        """
        attr_text = self._format_attrs(self.attrs)
        if not children:
            self.out.write('<%s %s />\n' % (element, attr_text))
            return
        self.out.write('<%s %s >\n' % (element, attr_text))
        for line in children:
            self.out.write(line)
        self.out.write('</%s>\n' % element)

    # The output methods escape values but do not validate the data.
    def _emit_node(self):
        "Output a node"
        self._write_element('node', self._tag_lines())

    def _emit_way(self):
        "Output a way"
        nd_lines = [' <nd ref="%s" />\n' % self._quote(nodeid)
                    for nodeid in self.nodes]
        self._write_element('way', nd_lines + self._tag_lines())

    def _emit_relation(self):
        "Output a relation"
        member_lines = [' <member %s />\n' % self._format_attrs(member)
                        for member in self.members]
        self._write_element('relation', member_lines + self._tag_lines())

    def emit(self):
        "Output the current element"
        if self.name == 'node':
            self._emit_node()
        elif self.name == 'way':
            self._emit_way()
        elif self.name == 'relation':
            self._emit_relation()

    def clear(self):
        "Initialize the state machine"
        self.name = None
        self.tags = {}
        self.nodes = []
        self.members = []
        self.attrs = {}

    def startElement(self, name, attrs):
        "This function is called at the start of the element (as per SAX)"
        if name in ('node', 'way', 'relation'):
            self.name = name
            # Copy: the mapping is kept until the closing tag is seen.
            self.attrs = attrs.copy()
        elif name == 'tag':
            self.tags[attrs.get('k')] = attrs.get('v')
        elif name == 'member':
            self.members.append(attrs.copy())
        elif name == 'nd':
            self.nodes.append(attrs.get('ref'))

    def selectElement(self):
        """Select whether or not we care about the OSM object (True or
        False). Override this function in your handler"""
        return False

    def transformElement(self):
        """Transform the element. Override this function in your
        handler"""
        pass

    def deleteElement(self):
        """Write the current element wrapped in a <delete> element.
        Please use with caution!"""
        self.out.write('<delete version="%s" generator="%s">\n' %
                       (VERSION, BOTNAME))
        self.emit()
        self.out.write('</delete>\n')

    def endElement(self, name):
        """As per the SAX handler, this method is where any work is
        done. You may want to override it, but probably not"""
        # If there's no open output, we need to open it
        if self.out is None:
            self._open()
        if name == 'way':
            self.nodes = tuple(self.nodes)
        elif name == 'relation':
            self.members = tuple(self.members)
        if name in ('node', 'way', 'relation'):
            if self.selectElement():
                self.transformElement()
                self.emit()
                self.object_counter = self.object_counter + 1
                # ">=" so that a file never holds more than
                # max_objects_per_file objects.
                if self.object_counter >= self.max_objects_per_file:
                    self._close()
            self.clear()

    def endDocument(self):
        """SAX end-of-document callback: finish the last output file so
        that it gets its closing </osm> tag and is flushed to disk."""
        if self.out is not None:
            self._close()
# Abbreviated road type (as found in tiger:name_type) -> expanded word.
road_types = {
    'Aly': 'Alley',
    'Ave': 'Avenue',
    'Blvd': 'Boulevard',
    'Br': 'Branch',
    'Brg': 'Bridge',
    'Byp': 'Bypass',
    'Cir': 'Circle',
    'Cres': 'Crescent',
    'Ct': 'Court',
    'Ctr': 'Center',
    'Cv': 'Cove',
    'Dr': 'Drive',
    'Expy': 'Expressway',
    'Fwy': 'Freeway',
    'Hwy': 'Highway',
    'Ln': 'Lane',
    'Mal': 'Mall',
    'Pky': 'Parkway',
    'Pl': 'Place',
    'Plz': 'Plaza',
    'Rd': 'Road',
    'Rte': 'Route',
    'Sq': 'Square',
    'St': 'Street',
    'Ter': 'Terrace',
    'Thwy': 'Throughway',
    # 'Trce' abbreviates Trace; Terrace is 'Ter' above.
    'Trce': 'Trace',
    'Trl': 'Trail',
    'Wkwy': 'Walkway',
    'Xing': 'Crossing'}

# Road types that are skipped silently: they are neither expanded nor
# reported as unrecognized. Only the keys matter; the values are unused.
ignore_road_types = {
    'Way': None,
    'Run': None,
    'Path': None,
    'Spur': None,
    'Pike': None,
    'Ramp': None,
    'Loop': None,
    'Square': None,
    'Walk': None,
    'Pass': None,
    'Avenue': None,
    'Row': None}

# Abbreviated compass direction (as found in the tiger:name_direction_*
# tags) -> expanded word.
directions = {
    'N': 'North',
    'S': 'South',
    'E': 'East',
    'W': 'West',
    'NE': 'Northeast',
    'NW': 'Northwest',
    'SE': 'Southeast',
    'SW': 'Southwest'}
class TigerRoadExpansionHandler(OSMHandler):
    """Expand abbreviations in the names of TIGER-tagged highways.

    A way is considered when it has ``highway``, ``tiger:name_base`` and
    ``name`` tags.  The abbreviations given by ``tiger:name_type``,
    ``tiger:name_direction_prefix`` and ``tiger:name_direction_suffix``
    are replaced in the ``name`` tag by their long forms from the
    road_types and directions tables.
    """

    # Per-way decisions made by selectElement(); the class-level defaults
    # keep transformElement() safe if it is ever called first.
    road_type = None
    dir_tag_prefix = None
    dir_tag_suffix = None

    def expand_road_type(self):
        """Replace the abbreviated road type that follows the base name.

        Only the part of the name after tiger:name_base is searched.  If
        the abbreviation is not found there, the name is recorded as
        ambiguous instead of being changed.
        """
        short_name = self.road_type
        long_name = road_types[short_name]
        name = self.tags['name']
        basename = self.tags['tiger:name_base']
        if basename not in name:
            # Someone has modified the name.
            return
        split_at = name.index(basename) + len(basename)
        rest = name[split_at:].split()
        if short_name in rest:
            rest[rest.index(short_name)] = long_name
            name = name[:split_at] + ' ' + ' '.join(rest)
            self.tags['name'] = name
            self.fixed = True
            if DEBUG:
                print("fixed to " + name)
        else:
            self.ambigious_expansions.append(name)

    def expand_direction_prefix(self):
        """Expand the direction abbreviation at the start of the name."""
        short_direction = self.dir_tag_prefix
        long_direction = directions[short_direction]
        # Let's assume the prefix is always the first thing in the name
        name_list = self.tags['name'].split()
        if name_list[0] == short_direction:
            name_list[0] = long_direction
            self.tags['name'] = ' '.join(name_list)
            self.fixed = True
            if DEBUG:
                print("fixed to " + ' '.join(name_list))

    def expand_direction_suffix(self):
        """Expand the direction abbreviation at the end of the name."""
        short_direction = self.dir_tag_suffix
        long_direction = directions[short_direction]
        # Let's assume the suffix is always the last thing in the name
        name_list = self.tags['name'].split()
        if name_list[-1] == short_direction:
            name_list[-1] = long_direction
            self.tags['name'] = ' '.join(name_list)
            self.fixed = True
            if DEBUG:
                print("fixed to " + " ".join(name_list))

    def _usable_direction(self, direction, name, name_words):
        """Return direction if it can be expanded in name, else None.

        Unknown abbreviations are recorded in
        unrecognized_direction_tags; names containing the abbreviation
        more than twice are recorded as ambiguous.
        """
        if not direction:
            return None
        if direction not in directions:
            self.unrecognized_direction_tags.add(direction)
            return None
        occurrences = name_words.count(direction)
        if occurrences > 2:
            self.ambigious_expansions.append(name)
            return None
        if occurrences < 1:
            return None
        return direction

    def selectElement(self):
        """Return True when the current way has something to expand.

        As a side effect this stores the abbreviations to expand in
        self.road_type, self.dir_tag_prefix and self.dir_tag_suffix and
        updates the statistics counters.
        """
        tags = self.tags
        # Eliminate most objects straight away
        if not (self.name == 'way' and 'highway' in tags and
                'tiger:name_base' in tags):
            return False
        if 'name' not in tags:
            return False
        name = tags['name']
        namel = name.split()
        self.roads += 1
        if DEBUG and self.roads % 1000 == 0:
            print(str(self.roads) + "...")
        # If we have a name_type that we haven't seen, store it.
        # If the name is ambiguous, store it.
        road_type = tags.get('tiger:name_type')
        if road_type:
            if road_type in ignore_road_types:
                # Deliberately ignored; must not be reported as
                # unrecognized, hence the elif chain below.
                road_type = None
            elif road_type not in road_types:
                self.unrecognized_tags.add(road_type)
                road_type = None
            elif namel.count(road_type) > 2:
                self.ambigious_expansions.append(name)
                road_type = None
            elif namel.count(road_type) < 1:
                road_type = None
        self.road_type = road_type
        # Same with the direction prefix and suffix tags
        dir_tag_prefix = self._usable_direction(
            tags.get('tiger:name_direction_prefix'), name, namel)
        self.dir_tag_prefix = dir_tag_prefix
        dir_tag_suffix = self._usable_direction(
            tags.get('tiger:name_direction_suffix'), name, namel)
        self.dir_tag_suffix = dir_tag_suffix
        if road_type or dir_tag_suffix or dir_tag_prefix:
            self.tried_to_fix += 1
            if DEBUG:
                print("trying to fix " + name)
            return True
        return False

    def transformElement(self):
        """Apply the expansions chosen by selectElement() to the name."""
        # Reset per way; otherwise one successful fix would make every
        # later way count towards total_fixed.
        self.fixed = False
        if self.road_type:
            self.expand_road_type()
        if self.dir_tag_prefix:
            self.expand_direction_prefix()
        if self.dir_tag_suffix:
            self.expand_direction_suffix()
        if self.fixed:
            self.total_fixed += 1
# Command-line entry point: parse the OSM file named on the command line
# (or stdin when the name is "-") and print a summary of what was done.
parser = make_parser()
handler = TigerRoadExpansionHandler("expansions")
parser.setContentHandler(handler)
if len(sys.argv) < 2:
    print("usage: tiger.py filename [--quiet|--debug] (use - for stdin)")
    # Missing argument is a usage error, so exit with a non-zero status.
    sys.exit(2)
flags = sys.argv[2:]
if flags:
    QUIET = '--quiet' in flags
    DEBUG = '--debug' in flags
if DEBUG:
    print("Debugging output enabled.")
    sys.stdout.flush()
fname = sys.argv[1]
if fname == '-':
    # Prefer the underlying byte stream where there is one (Python 3) so
    # that the XML parser does the decoding itself.
    fd = getattr(sys.stdin, 'buffer', sys.stdin)
    parser.parse(fd)
else:
    fd = open(fname, 'rb')
    try:
        parser.parse(fd)
    finally:
        fd.close()
if not QUIET:
    print("%d total roads" % handler.roads)
    print("%d roads we tried to fix" % handler.tried_to_fix)
    print("%d fixed" % handler.total_fixed)
    print("%d unrecognized tags" % len(handler.unrecognized_tags))
    print("%d ambigious road names" % len(handler.ambigious_expansions))
    print("")
    print("Ambigious Names")
    print("================")
    for n in handler.ambigious_expansions:
        print(n)
    print("")
    print("Unrecognized Tags")
    print("=================")
    for n in handler.unrecognized_tags:
        print(n)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment