Skip to content

Instantly share code, notes, and snippets.

@1ec5
Forked from emacsen/pyxbot.py
Created May 11, 2012 15:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 1ec5/2660570 to your computer and use it in GitHub Desktop.
Save 1ec5/2660570 to your computer and use it in GitHub Desktop.
OSM Tiger expansion code
#!/usr/bin/env python
"""This is the base library that can be used to run various OSM bots,
which are implemented as plugins."""
import io
import os
import sys
from xml.sax import make_parser
from xml.sax.handler import ContentHandler
from xml.sax.saxutils import quoteattr
class OSMHandler(ContentHandler):
    """Base SAX handler that copies selected OSM objects into .osm files.

    The handler accumulates one node, way or relation at a time (its
    attributes, tags, node references and members).  When that element
    ends, ``selectElement`` decides whether it is kept; kept elements are
    passed to ``transformElement`` and then written out.  Output goes to
    numbered files ``<prefix>_NNNN.osm`` inside a directory named after
    the prefix, holding at most ``max_objects_per_file`` objects each.

    You will want to override the ``selectElement`` and
    ``transformElement`` functions.
    """

    # Values written into the wrapper elements of the output.
    osm_version = '0.6'
    generator = 'pyxbot'

    def __init__(self, file_prefix):
        """Create a handler that writes into the directory ``file_prefix``.

        The last path component of ``file_prefix`` is also used as the
        stem of every output file name.
        """
        ContentHandler.__init__(self)
        self.path = file_prefix
        self.file_prefix = file_prefix
        self.object_counter = 0
        self.clear()
        self.max_objects_per_file = 10000
        self.file_counter = 0
        self.out = None
        # Counters and collections that handlers built on this class may
        # use for reporting; the base class itself never updates them.
        self.roads = 0
        self.fixed = 0
        self.unrecognized_tags = set()
        self.unrecognized_direction_tags = set()
        self.ambigious_expansions = []

    def _open(self):
        """Open the next numbered output file and write the XML header."""
        if not os.path.isdir(self.path):
            os.makedirs(self.path)
        stem = os.path.basename(os.path.normpath(self.file_prefix))
        file_name = u'%s_%04d.osm' % (stem, self.file_counter)
        # The header below promises UTF-8, so the encoding is forced here
        # instead of being left to the platform default.
        self.out = io.open(os.path.join(self.path, file_name), 'w',
                           encoding='utf-8')
        self.out.write(u'<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n')
        self.out.write(u'<osm version="%s" generator="%s">\n' %
                       (self.osm_version, self.generator))

    def _close(self):
        """Terminate and close the current output file, if one is open."""
        if self.out is None:
            return
        self.out.write(u'</osm>\n')
        self.out.close()
        self.out = None
        self.object_counter = 0
        self.file_counter = self.file_counter + 1

    @staticmethod
    def _format_attrs(attrs):
        """Render a mapping as escaped XML attributes (``k="v" k2="v2"``)."""
        return u' '.join([u'%s=%s' % (key, quoteattr(u'%s' % (value,)))
                          for key, value in attrs.items()])

    def _tag_lines(self):
        """Return one rendered ``<tag>`` line per entry of ``self.tags``."""
        return [u' <tag k=%s v=%s />\n' % (quoteattr(u'%s' % (key,)),
                                           quoteattr(u'%s' % (value,)))
                for key, value in self.tags.items()]

    def _emit_element(self, element, child_lines):
        """Write ``element`` with the current attributes and given children.

        An element without children is written in self-closing form.
        """
        attrs = self._format_attrs(self.attrs)
        if not child_lines:
            self.out.write(u'<%s %s />\n' % (element, attrs))
            return
        self.out.write(u'<%s %s >\n' % (element, attrs))
        for line in child_lines:
            self.out.write(line)
        self.out.write(u'</%s>\n' % element)

    # The output methods escape attribute values but otherwise don't do
    # any kind of data validation
    def _emit_node(self):
        "Output a node"
        self._emit_element(u'node', self._tag_lines())

    def _emit_way(self):
        "Output a way"
        refs = [u' <nd ref=%s />\n' % quoteattr(u'%s' % (nodeid,))
                for nodeid in self.nodes]
        self._emit_element(u'way', refs + self._tag_lines())

    def _emit_relation(self):
        "Output a relation"
        members = [u' <member %s />\n' % self._format_attrs(member)
                   for member in self.members]
        self._emit_element(u'relation', members + self._tag_lines())

    def emit(self):
        "Output the current element"
        if self.name == 'node':
            self._emit_node()
        elif self.name == 'way':
            self._emit_way()
        elif self.name == 'relation':
            self._emit_relation()

    def clear(self):
        "Initialize the state machine"
        self.name = None
        self.tags = {}
        self.nodes = []
        self.members = []
        self.attrs = {}

    def startElement(self, name, attrs):
        "This function is called at the start of the element (as per SAX)"
        if name in ('node', 'way', 'relation'):
            self.name = name
            self.attrs = attrs.copy()
        elif name == 'tag':
            self.tags[attrs.get('k')] = attrs.get('v')
        elif name == 'member':
            self.members.append(attrs.copy())
        elif name == 'nd':
            self.nodes.append(attrs.get('ref'))

    def selectElement(self):
        """Select whether or not we care about the OSM object (True or
        False). Override this function in your handler"""
        return False

    def transformElement(self):
        """Transform the element. Override this function in your
        handler"""
        pass

    def deleteElement(self):
        """Write the current element wrapped in a ``<delete>`` element.
        Please use with caution!"""
        self.out.write(u'<delete version="%s" generator="%s">\n' %
                       (self.osm_version, self.generator))
        self.emit()
        self.out.write(u'</delete>\n')

    def endElement(self, name):
        """As per the SAX handler, this method is where any work is
        done. You may want to override it, but probably not"""
        # Only complete nodes, ways and relations trigger any work; their
        # child elements were already collected by startElement.
        if name not in ('node', 'way', 'relation'):
            return
        if name == 'way':
            self.nodes = tuple(self.nodes)
        elif name == 'relation':
            self.members = tuple(self.members)
        # Open the output before the hooks run so that they may write to
        # self.out themselves (e.g. through deleteElement).
        if self.out is None:
            self._open()
        if self.selectElement():
            self.transformElement()
            self.emit()
            self.object_counter = self.object_counter + 1
            # Roll over as soon as the file holds the maximum number of
            # objects, so no file ever exceeds the limit.
            if self.object_counter >= self.max_objects_per_file:
                self._close()
        self.clear()

    def endDocument(self):
        """Finish the last output file when the input ends (as per SAX)."""
        self._close()
# Abbreviated road types, as found in the tiger:name_type tag, mapped to
# the full word that replaces them in a road name.
road_types = {
    'Aly': 'Alley',
    'Ave': 'Avenue',
    'Blvd': 'Boulevard',
    # 'Br' is the abbreviation for Branch; Bridge is 'Brg'.
    'Br': 'Branch',
    'Brg': 'Bridge',
    'Byp': 'Bypass',
    'Cir': 'Circle',
    'Cres': 'Crescent',
    'Ct': 'Court',
    'Ctr': 'Center',
    'Cv': 'Cove',
    'Dr': 'Drive',
    'Expy': 'Expressway',
    'Fwy': 'Freeway',
    'Hwy': 'Highway',
    'Ln': 'Lane',
    'Mal': 'Mall',
    'Pky': 'Parkway',
    'Pl': 'Place',
    'Plz': 'Plaza',
    'Rd': 'Road',
    'Rte': 'Route',
    'Sq': 'Square',
    'St': 'Street',
    'Ter': 'Terrace',
    'Thwy': 'Throughway',
    # 'Trce' is the abbreviation for Trace; Terrace is 'Ter'.
    'Trce': 'Trace',
    'Trl': 'Trail',
    'Wkwy': 'Walkway',
    'Xing': 'Crossing',
}
# Compass-direction abbreviations mapped to the full word that replaces
# them in a road name.
directions = dict(
    N='North',
    S='South',
    E='East',
    W='West',
    NE='Northeast',
    NW='Northwest',
    SE='Southeast',
    SW='Southwest',
)
class TigerRoadExpansionHandler(OSMHandler):
    """Expand abbreviated road types and directions in way names.

    A way is considered when it has ``highway``, ``tiger:name_base`` and
    ``name`` tags.  The abbreviations to expand are taken from the
    ``tiger:name_type``, ``tiger:name_direction_prefix`` and
    ``tiger:name_direction_suffix`` tags and are replaced in ``name`` by
    the full words from ``road_types`` and ``directions``.
    """

    def _usable_abbreviation(self, abbrev, known, unrecognized, name):
        """Return ``abbrev`` if it can safely be expanded, else None.

        An abbreviation missing from ``known`` is recorded in the
        ``unrecognized`` set.  One that occurs more than once among the
        words of the name is ambiguous: ``name`` is recorded in
        ``self.ambigious_expansions`` and nothing is expanded.  One that
        does not occur in the name at all is simply ignored.
        """
        if not abbrev:
            return None
        if abbrev not in known:
            unrecognized.add(abbrev)
            return None
        occurrences = self.namel.count(abbrev)
        if occurrences > 1:
            self.ambigious_expansions.append(name)
            return None
        if occurrences < 1:
            return None
        return abbrev

    def selectElement(self):
        """Return True if the current way's name has something to expand.

        As a side effect this stores the words of the name in
        ``self.namel`` and the abbreviations to expand in
        ``self.road_type``, ``self.dir_tag_prefix`` and
        ``self.dir_tag_suffix``, and updates the statistics counters.
        """
        tags = self.tags
        # Eliminate most objects straight away
        if not (self.name == 'way' and 'highway' in tags and
                'tiger:name_base' in tags and 'name' in tags):
            return False
        name = tags['name']
        self.roads += 1
        self.namel = name.split()
        # If we have a name_type that we haven't seen, store it.
        # If the name is ambiguous, store it.
        self.road_type = self._usable_abbreviation(
            tags.get('tiger:name_type'), road_types,
            self.unrecognized_tags, name)
        # Same with the direction prefix and suffix tags
        self.dir_tag_prefix = self._usable_abbreviation(
            tags.get('tiger:name_direction_prefix'), directions,
            self.unrecognized_direction_tags, name)
        self.dir_tag_suffix = self._usable_abbreviation(
            tags.get('tiger:name_direction_suffix'), directions,
            self.unrecognized_direction_tags, name)
        if self.road_type or self.dir_tag_prefix or self.dir_tag_suffix:
            self.fixed += 1
            return True
        return False

    def transformElement(self):
        """Rewrite the ``name`` tag with the selected abbreviations
        expanded."""
        namel = self.namel
        expansions = ((self.road_type, road_types),
                      (self.dir_tag_prefix, directions),
                      (self.dir_tag_suffix, directions))
        for abbrev, full_words in expansions:
            # The membership test covers prefix and suffix being the same
            # abbreviation: the single occurrence is already replaced by
            # the time the second one is processed.
            if abbrev and abbrev in namel:
                namel[namel.index(abbrev)] = full_words[abbrev]
        self.tags['name'] = ' '.join(namel)
def main(argv=None):
    """Expand the road names of the OSM file named on the command line.

    ``argv`` defaults to ``sys.argv``; its second item is the input file.
    The output is written by the handler under the "expansions" prefix
    and a summary is printed to standard output.  Returns the process
    exit status: 0 on success, 2 when the input file name is missing.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) < 2:
        prog = argv[0] if argv else 'pyxbot'
        sys.stderr.write("usage: %s FILE.osm\n" % prog)
        return 2
    fname = argv[1]
    parser = make_parser()
    handler = TigerRoadExpansionHandler("expansions")
    parser.setContentHandler(handler)
    try:
        # Binary mode lets the XML parser honour the encoding declared in
        # the input itself.
        with open(fname, 'rb') as fd:
            parser.parse(fd)
    finally:
        # Make sure the last output file is finalized: close it here if
        # the handler still has one open.
        if handler.out is not None:
            handler._close()
    print("%d total roads" % handler.roads)
    print("%d fixed roads" % handler.fixed)
    print("%d unrecognized tags" % len(handler.unrecognized_tags))
    print("%d ambigious road names" % len(handler.ambigious_expansions))
    ## print("Ambigious Names")
    ## print("================")
    ## for n in handler.ambigious_expansions:
    ##     print(n)
    ## print("")
    ## print("Unrecognized Tags")
    ## print("=================")
    ## for n in handler.unrecognized_tags:
    ##     print(n)
    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment