Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Upload valhalla routing tiles to s3
#!/usr/bin/env python3
import os
from urllib.parse import urlparse
from multiprocessing.pool import ThreadPool
from functools import partial
import gzip
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.exception import S3ResponseError
import boto
import click
tile_count = 0
upload_count = 0
max_retries = 3
def upload_walk(bucket, key_prefix, headers, root_dir, walkee, progress=True):
root, dirs, files = walkee
file_dir = root.replace(root_dir, "")
for f in files:
base, ext = os.path.splitext(f)
if ext != ".gph":
continue
key_path = key_prefix + file_dir + "/" + f
upload_tile(bucket, headers, os.path.join(root, f), key_path, progress=progress)
def upload_tile(bucket, headers, file_path, key_path, progress=True, retries=0):
print("Uploading " + file_path + " to " + key_path)
try:
k = Key(bucket)
k.key = key_path
for key, value in headers.items():
k.set_metadata(key, value)
with open(file_path, "rb") as f:
k.set_contents_from_string(gzip.compress(f.read()))
global upload_count
upload_count += 1
if progress and upload_count % 10 == 0:
print("%i/%i" % (upload_count, tile_count))
except Exception as e:
print(e)
if retries < max_retries:
upload_tile(bucket, headers, file_path, key_path, progress=progress, retries=retries + 1)
else:
raise Exception("Too Many upload failures")
@click.command()
@click.argument('tile_dir', type=click.Path(exists=True), required=True)
@click.argument('s3_url', required=True)
@click.option('--threads', default=10,
help="Number of simultaneous uploads")
def upload(tile_dir, s3_url, threads):
base_url = urlparse(s3_url)
conn = S3Connection(calling_format=boto.s3.connection.OrdinaryCallingFormat())
bucket = conn.get_bucket(base_url.netloc)
key_prefix = base_url.path.lstrip("/")
headers = {
"Content-Encoding":"gzip",
"Content-Type": "application/octet-stream"
}
print("uploading tiles from %s to s3://%s/%s" % (tile_dir, bucket.name, key_prefix))
global tile_count
tile_count = sum([len([f for f in files if f.endswith("gph")]) for root, dirs, files in os.walk(tile_dir)])
root = os.path.abspath(tile_dir)
if not root.endswith("/"):
root = root + "/"
pool = ThreadPool(threads)
func = partial(upload_walk, bucket, key_prefix, headers, root)
pool.map(func, os.walk(tile_dir))
if __name__ == '__main__':
upload()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment