Last active
August 29, 2015 14:25
-
-
Save duggan/a5fb47ae36bee229f413 to your computer and use it in GitHub Desktop.
Modify metadata headers for bucket objects parallellised on key prefixes. Based on Boto.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""Parallellized S3 metadata modifier.

Depends on boto (install with `pip install boto`).

usage: s3-metadata.py [-h] -a HEADER -b BUCKET [-p PREFIX] [-d]

optional arguments:
  -h, --help            show this help message and exit
  -a HEADER, --header HEADER
                        Header to add.
  -b BUCKET, --bucket BUCKET
                        Bucket to operate on.
  -p PREFIX, --prefix PREFIX
                        Key prefix(es).
  -d, --dryrun          Reports what changes would occur, but does not
                        perform them.

Also, you'll need to set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY as
environment variables.
"""
import os | |
import time | |
import signal | |
import argparse | |
from multiprocessing import Event | |
from multiprocessing import Process | |
import boto | |
from boto.s3.connection import S3Connection | |
from boto.s3.connection import OrdinaryCallingFormat | |
# Cross-process flag used to request an orderly stop from every worker.
shutdown = Event()

# Each worker reports progress once per this many keys handled.
HEARTBEAT = 100


# Installed for SIGTERM/SIGQUIT/SIGINT below: trips the shared shutdown
# flag so all processes wind down instead of being killed mid-copy.
def signal_handler(signum, frame):
    shutdown.set()
# Report a fatal problem and abort the whole script with a non-zero status.
def error_out(message):
    """Print *message* and exit with status code 1."""
    print(message)
    exit(1)
# Register some signals with our shutdown handler so Ctrl+C / kill requests
# stop the run gracefully instead of leaving worker processes behind.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGQUIT, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

# Create a command line parser for the variable options.
# We'll collect the credentials via environment variables so they stay out
# of shell history and process listings.
parser = argparse.ArgumentParser(
    description='Parallellized S3 metadata modifier.',
    epilog="Also, you'll need to set AWS_ACCESS_KEY_ID and "
           "AWS_SECRET_ACCESS_KEY as environment variables.")
parser.add_argument('-a', '--header', required=True,
                    help='Header to add.')
parser.add_argument('-b', '--bucket', required=True,
                    help='Bucket to operate on.')
parser.add_argument('-p', '--prefix', default='', required=False,
                    help='Key prefix(es).')
parser.add_argument('-d', '--dryrun', action='store_true',
                    help='Reports what changes would occur, but does not perform them.')
opts = parser.parse_args()

# Some basic sanity checking on the header format, not much though!
# BUG FIX: this previously called the undefined name `error_exit`, so a
# malformed --header raised a NameError instead of the intended usage error;
# the message also said "semi-colon" while the code splits on a colon.
if ':' not in opts.header:
    error_out("--header must be a colon separated string, like Content-Type:Foo")
header = opts.header.split(':')

# S3 bucket, just the bucket name is fine.
bucket = opts.bucket

# Split a comma separated list of prefixes into a list.
prefixes = [p.strip() for p in opts.prefix.split(',')]

# Dry run can be useful for making sure the change you want to happen
# is what will happen. Suggest testing first though and verifying results.
dry_run = opts.dryrun

# Credentials come from the environment; fail fast if either is missing.
access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
if not access_key_id:
    error_out("Missing AWS_ACCESS_KEY_ID environment variable.")
secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
if not secret_access_key:
    error_out("Missing AWS_SECRET_ACCESS_KEY environment variable.")

if opts.prefix:
    print("Processing with prefixes: %s" % ",".join(prefixes))
else:
    # Without prefixes there is only one worker, so no parallelism. Give
    # the operator a short countdown window to abort and re-run with -p.
    print("WARNING: no prefixes were specified, so no parallellization.")
    print("If you'd like to change that, hit Ctrl+C.")
    timeout = 5
    while not shutdown.is_set():
        if timeout <= 0:
            break
        print(timeout)
        time.sleep(1)
        timeout -= 1
    if shutdown.is_set():
        print("Stopping.")
        exit()

if dry_run:
    print("Beginning a DRY RUN (no data will be modified)...")
def worker(prefix):
    """Walk every key under *prefix* and apply the requested header.

    Runs in its own process: opens a fresh S3 connection, streams the key
    listing, and rewrites each object's metadata in place via a
    copy-onto-itself. Checks the shared shutdown flag between keys.
    """
    print("Starting worker for prefix '%s'" % prefix)
    try:
        conn = S3Connection(access_key_id, secret_access_key,
                            calling_format=OrdinaryCallingFormat())
        bkt = conn.get_bucket(bucket)
    except Exception as e:
        print("Error (worker '%s'): %s" % (prefix, e))
        return

    # Counter so we can offer periodic feedback on long runs.
    processed = 0
    for listed in bkt.list(prefix=prefix):
        # Interrupt between keys when a shutdown has been requested.
        if shutdown.is_set():
            return
        # Keys yielded by `list()` carry no metadata, so fetch the full key.
        key = bkt.get_key(listed.name)
        # Work on the key's existing metadata mapping so nothing already
        # set on the object gets truncated by the copy below.
        metadata = key.metadata
        # Add (or overwrite) the requested header.
        metadata[header[0]] = header[1]
        # If we don't carry Content-Type over explicitly, S3 (or boto)
        # unhelpfully resets Content-Type to the default octet-stream.
        metadata["Content-Type"] = key.content_type
        if dry_run:
            print("DRYRUN: modifying metadata for %s/%s - %s" % (bucket, key.name, metadata))
        else:
            key.copy(bucket, key, metadata=metadata, preserve_acl=True)
        processed += 1
        if processed % HEARTBEAT == 0:
            print("%d keys processed for %s..." % (processed, prefix))
    print("Finished processing for %s (%d keys)" % (prefix, processed))
# Keep a handle on every worker process so we can wait on them all at the
# end of the run.
jobs = []
for prefix in prefixes:
    proc = Process(target=worker, args=(prefix,))
    jobs.append(proc)
    proc.start()

# Poll the workers until they have all exited (or a shutdown is requested).
while not shutdown.is_set():
    any_alive = False
    for job in jobs:
        # Wait on each job with a short timeout so signal handling in the
        # parent stays responsive.
        job.join(1)
        if job.is_alive():
            any_alive = True
    # Once no process remains alive, trip the flag to leave the loop.
    if not any_alive:
        shutdown.set()
print("Finished all jobs.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
My use case: setting the
Cache-Control
header on objects so that CloudFlare (or presumably any other caching proxy) will perform a lot fewer requests to the S3 backend. The difference it has made so far (where the orange diverges from the blue towards the end):
The earlier stuff in the graph is
s3cmd
limping its way through the same process, failing with random errors and having to be restarted many times, never making it past the first few tens of thousands of objects. The more evenly distributed your keys, the better.
A version of this that automatically performs the parallellisation wouldn't be a huge stretch, but if you can find a good prefix distribution out of band it's probably a lot faster. Running something like this on the directory will give you a distribution based on the first character of each file:
Over a large enough number of files this will be pretty much identical to my example above, so you may want to go a little deeper than one character.
Another nice-to-have would be storing the progress and outputting a recovery artifact, such that, in the event of an error, you could specify
--recovery
and have the processes resume from that point.