@Fonsan
Created May 30, 2016 21:01
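# Proof-of-concept extractor built on imposm.parser internals: it indexes the
# dense-node and way blobs of an OSM PBF file, keeps decoded coordinate blobs
# in an LRU cache, and streams ways with their node refs resolved to coordinates.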
import zlib
from imposm.parser.pbf.parser import PBFFile, PrimitiveBlockParser
import imposm.parser.pbf.parser
import imposm.parser.pbf
import StringIO
import cachetools
import bisect
import sys
import resource
import random
import gc
import multiprocessing
import functools
from collections import defaultdict
# def read_blob_data_no_open(filename, blob_pos, blob_size):
#     if type(filename) is str:
#         with open(filename, 'rb') as f:
#             f.seek(blob_pos)
#             blob_data = f.read(blob_size)
#     else:
#         f = filename
#         f.seek(blob_pos)
#         blob_data = f.read(blob_size)
#     blob = OSMPBF.Blob()
#     blob.ParseFromString(blob_data)
#     raw_data = blob.raw
#     if raw_data:
#         return raw_data
#     return zlib.decompress(blob.zlib_data)
# imposm.parser.pbf.parser.read_blob_data = read_blob_data_no_open

# class IOStr(str):
#     @classmethod
#     def from_file(cls, filename):
#         io_str = IOStr(filename)
#         with open(filename, 'rb') as read_file:
#             io_str.io = StringIO.StringIO(read_file.read())
#         return io_str
#     io = None
#     def seek(self, *args):
#         return self.io.seek(*args)
#     def read(self, *args):
#         return self.io.read(*args)
# Plain-Python snapshots of the protobuf primitive block structures.
class PrimitiveBlock(object):
    def __init__(self, primitive_block):
        self.primitivegroup = map(PrimitiveGroup, primitive_block.primitivegroup)
        self.granularity = primitive_block.granularity
        self.lat_offset = primitive_block.lat_offset
        self.lon_offset = primitive_block.lon_offset


class Dense(object):
    def __init__(self, dense):
        self.id = dense.id
        self.lat = dense.lat
        self.lon = dense.lon
        self.keys_vals = dense.keys_vals


class Way(object):
    def __init__(self, way):
        self.keys = way.keys
        self.vals = way.vals
        self.refs = way.refs


class Ways(object):
    def __init__(self, ways):
        self.ways = map(Way, ways)


class PrimitiveGroup(object):
    dense = None
    ways = None

    def __init__(self, group):
        if group.dense:
            self.dense = Dense(group.dense)
        if group.ways:
            self.ways = Ways(group.ways)
# PrimitiveBlockParser that is constructed from an already decoded blob payload
# instead of re-reading the blob from disk.
class SuperPrimitiveBlockParser(PrimitiveBlockParser):
    def __init__(self, data):
        primitive_block = imposm.parser.pbf.OSMPBF.PrimitiveBlock()
        primitive_block.ParseFromString(data)
        self.stringtable = imposm.parser.pbf.parser.decoded_stringtable(primitive_block.stringtable.s)
        self.primitive_block = primitive_block
        self.primitivegroup = self.primitive_block.primitivegroup
# Worker process: pulls blob offsets from `queue`, parses each blob and puts
# the resulting parser on `blob_queue`; a None item is the shutdown sentinel.
class BlobParserProcess(multiprocessing.Process):
    def __init__(self, queue, blob_queue, *args, **kw):
        multiprocessing.Process.__init__(self)
        self.daemon = True
        self.queue = queue
        self.blob_queue = blob_queue

    def run(self):
        while True:
            pos = self.queue.get()
            sys.stdout.flush()
            if pos is None:
                self.queue.task_done()
                self.blob_queue.put(None)
                break
            data = imposm.parser.pbf.parser.read_blob_data(pos['filename'], pos['blob_pos'], pos['blob_size'])
            blob = SuperPrimitiveBlockParser(data)
            self.blob_queue.put(blob)
            self.queue.task_done()
class CoordCollector(object):
    verbose = False
    chunk_size = 8000
    coord_blobs = None
    concurrency = 2

    def __init__(self, filename, max_ways=50000, max_coord_blobs=1000):
        self.max_ways = max_ways
        self.max_coord_blobs = max_coord_blobs
        self.filename = filename

    def log(self, msg):
        if self.verbose:
            sys.stderr.write("{}\t{mem:.1f}MB\n".format(msg, mem=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576))
            sys.stderr.flush()

    # Decode one coordinate blob into an {osm_id: coord} mapping.
    def nodes_by_id_from_blob(self, blob):
        start, end, data = blob
        nodes_by_id = {}
        for osm_id, tags, coord in SuperPrimitiveBlockParser(data).nodes():
            nodes_by_id[osm_id] = coord
        return nodes_by_id
    # Walk every blob in the file once, handing coordinate blobs (as
    # (first_id, last_id, raw_data) tuples) and way blobs (raw data) to the
    # given callbacks.
    def parse_file(self, coord_group_callback=lambda x: x, way_group_callback=lambda x: x):
        pbf_file = PBFFile(self.filename)
        for pos in pbf_file.blob_offsets():
            data = imposm.parser.pbf.parser.read_blob_data(pos['filename'], pos['blob_pos'], pos['blob_size'])
            parser = SuperPrimitiveBlockParser(data)
            groups = parser.primitivegroup
            first_dense_group = next((group for group in groups if group.dense), None)
            if first_dense_group:
                # Dense node ids are delta encoded, so id[0] is the first id in
                # the group and sum(id) is the last one.
                last_dense_group = next(group for group in reversed(groups) if group.dense)
                coord_group_callback((first_dense_group.dense.id[0], sum(last_dense_group.dense.id), data))
            else:
                for group in groups:
                    if group.ways:
                        way_group_callback(data)
        # for group in blob.primitivegroup:
        #     if group.dense:
        #         coord_group_callback((group.dense.id[0], sum(group.dense.id), group))
        #     elif group.ways:
        #         way_group_callback(group)
        # pool = []
        # queue = multiprocessing.JoinableQueue()
        # blob_queue = multiprocessing.JoinableQueue()
        # for _ in xrange(self.concurrency):
        #     proc = BlobParserProcess(queue, blob_queue)
        #     pool.append(proc)
        #     proc.start()
        # for pos in pbf_file.blob_offsets():
        #     queue.put(pos)
        # for proc in pool:
        #     queue.put(None)
        # queue.join()
        # running = self.concurrency
        # while True:
        #     blob = blob_queue.get()
        #     if blob is None:
        #         blob_queue.task_done()
        #         running -= 1
        #         if running == 0:
        #             break
        #         else:
        #             continue
        #     for group in blob.primitivegroup:
        #         if group.dense:
        #             coord_group_callback((group.dense.id[0], sum(group.dense.id), group))
        #         elif group.ways:
        #             way_group_callback(group)
        #     blob_queue.task_done()
        # for proc in pool:
        #     proc.join()
        # blob_queue.join()
    # Group coordinate blob indices that are referenced together by the given ways.
    def ref_deps(self, coord_groups, ways):
        ends = list(blob[1] for i, blob in enumerate(coord_groups))
        deps = {key: [key] for key in range(len(coord_groups))}
        appearances = defaultdict(int)
        for osm_id, tags, refs in ways:
            dep = set()
            for j, ref in enumerate(refs):
                index = bisect.bisect_left(ends, ref)
                dep.add(index)
            dep = list(dep)
            all_deps = set()
            for d in dep:
                all_deps.update(deps[d])
            all_deps = list(all_deps)
            for d in dep:
                deps[d] = all_deps
        r = set(tuple(v) for v in deps.values())
        self.log(r)
        raise Exception('foo')  # debugging stop: halts after logging the dependency groups
        return deps
    # Replace way node refs in place with coordinates looked up from cached blobs.
    def drain_ways(self, ref_deps, blob_cache, ways):
        for blob, nodes_by_id in blob_cache.iteritems():
            if blob in ref_deps:
                items = ref_deps[blob]
                for refs, j in items:
                    refs[j] = nodes_by_id[refs[j]]
                del ref_deps[blob]
        for blob, items in ref_deps.iteritems():
            self.log((blob[1]))
            nodes_by_id = blob_cache[blob]
            for refs, j in items:
                refs[j] = nodes_by_id[refs[j]]
        return ways
    def blob_cache(self, coord_groups):
        cache = cachetools.LRUCache(self.max_coord_blobs, missing=self.nodes_by_id_from_blob)
        prewarm_blob_count = min(self.max_coord_blobs, len(coord_groups))
        for blob in random.sample(coord_groups, prewarm_blob_count):
            cache[blob]  # touching the key decodes the blob via the missing= handler
        self.log('prewarmed cache, {} of {} blobs in memory'.format(prewarm_blob_count, len(coord_groups)))
        return cache
    def get_ways_from_data(self, data):
        return list(way for way in SuperPrimitiveBlockParser(data).ways())
    # Generator: index the file, then drain ways in chunks of at most max_ways,
    # resolving their coordinates before yielding.
    def ways(self):
        coord_groups = []
        way_groups = []
        self.log(len(list(PBFFile(self.filename).blob_offsets())))
        self.parse_file(coord_group_callback=coord_groups.append, way_group_callback=way_groups.append)
        self.log((len(way_groups), len(coord_groups)))
        self.log('indexed coords')
        blob_cache = self.blob_cache(coord_groups)
        ways = []
        for data in way_groups:
            ways += self.get_ways_from_data(data)
            if len(ways) + self.chunk_size > self.max_ways:
                self.log('draining {} ways'.format(len(ways)))
                for way in self.drain_ways(self.ref_deps(coord_groups, ways), blob_cache, ways):
                    yield way
                ways = []
        self.log('Final draining {} ways'.format(len(ways)))
        for way in self.drain_ways(self.ref_deps(coord_groups, ways), blob_cache, ways):
            yield way
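

# A minimal driving sketch, assuming this file is run directly with the path to
# a local .osm.pbf extract as its first argument; it only exercises parse_file,
# collecting the coordinate and way blob indexes through the two callbacks.
if __name__ == '__main__':
    collector = CoordCollector(sys.argv[1])
    collector.verbose = True
    coord_groups = []
    way_groups = []
    collector.parse_file(coord_group_callback=coord_groups.append,
                         way_group_callback=way_groups.append)
    collector.log('{} coord blobs, {} way blobs indexed'.format(len(coord_groups), len(way_groups)))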