Skip to content

Instantly share code, notes, and snippets.

@duggan
Last active August 29, 2015 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save duggan/a5fb47ae36bee229f413 to your computer and use it in GitHub Desktop.
Save duggan/a5fb47ae36bee229f413 to your computer and use it in GitHub Desktop.
Modify metadata headers for bucket objects parallellised on key prefixes. Based on Boto.
#!/usr/bin/env python
"""
Depends on boto (install with `pip install boto`).
usage: s3-metadata.py [-h] -a HEADER -b BUCKET [-p PREFIX] [-d]
Parallellized S3 metadata modifier.
optional arguments:
-h, --help show this help message and exit
-a HEADER, --header HEADER
Header to add.
-b BUCKET, --bucket BUCKET
Bucket to operate on.
-p PREFIX, --prefix PREFIX
Key prefix(es).
-d, --dryrun Reports what changes would occur, but does not perform
them.
Also, you'll need to set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY as
environment variables.
"""
import os
import time
import signal
import argparse
from multiprocessing import Event
from multiprocessing import Process
import boto
from boto.s3.connection import S3Connection
from boto.s3.connection import OrdinaryCallingFormat
# Event for signalling across processes.
shutdown = Event()
# Each process should output how much work its done
# every HEARTBEAT keys.
HEARTBEAT = 100
# Triggers the shutdown event on receipt of a signal.
def signal_handler(x,y):
shutdown.set()
# Exit with an error.
def error_out(message):
print(message)
exit(1)
# Register some signals with our shutdown handler.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGQUIT, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Create a command line parser for the variable options.
# We'll collect the credentials via environment variables.
parser = argparse.ArgumentParser(description='Parallellized S3 metadata modifier.',
epilog="Also, you'll need to set AWS_ACCESS_KEY_ID and "
"AWS_SECRET_ACCESS_KEY as environment variables.")
parser.add_argument('-a', '--header', required=True,
help='Header to add.')
parser.add_argument('-b', '--bucket', required=True,
help='Bucket to operate on.')
parser.add_argument('-p', '--prefix', default='', required=False,
help='Key prefix(es).')
parser.add_argument('-d', '--dryrun', action='store_true',
help='Reports what changes would occur, but does not perform them.')
opts = parser.parse_args()
# Some basic sanity checking on the header format, not much though!
if ':' not in opts.header:
error_exit("--header must be a semi-colon separated string, like Content-Type:Foo")
header = opts.header.split(':')
# S3 bucket, just the bucket name is fine
bucket = opts.bucket
# Split a comma separated list of prefixes into a list
prefixes = [p.strip() for p in opts.prefix.split(',')]
# Dry run can be useful for making sure the change you want to happen
# is what will happen. Suggest testing first though and verifying results.
dry_run = opts.dryrun
# Environment variables
access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
if not access_key_id:
error_out("Missing AWS_ACCESS_KEY_ID environment variable.")
secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
if not secret_access_key:
error_out("Missing AWS_SECRET_ACCESS_KEY environment variable.")
if len(opts.prefix):
print("Processing with prefixes: %s" % ",".join(prefixes))
else:
print("WARNING: no prefixes were specified, so no parallellization.")
print("If you'd like to change that, hit Ctrl+C.")
timeout = 5
while not shutdown.is_set():
if timeout <= 0:
break
print(timeout)
time.sleep(1)
timeout -= 1
if shutdown.is_set():
print("Stopping.")
exit()
if dry_run:
print("Beginning a DRY RUN (no data will be modified)...")
def worker(prefix):
print("Starting worker for prefix '%s'" % prefix)
try:
c = S3Connection(access_key_id, secret_access_key, calling_format=OrdinaryCallingFormat())
b = c.get_bucket(bucket)
except Exception as e:
print("Error (worker '%s'): %s" % (prefix, e))
return
# Start a counter so we can offer feedback
count = 0
for k in b.list(prefix=prefix):
if shutdown.is_set():
# Interrupt on a shutdown signal
return
# `list()` keys do not contain metadata, so we retrieve it.
key = b.get_key(k.name)
# Grab a copy of the original metadata, otherwise a copy would
# truncate it.
metadata = key.metadata
# Adds/replaces header in metadata
metadata[header[0]] = header[1]
# If we don't grab the Content-Type specifically, S3 (or boto)
# unhelpfully resets Content-Type to the default octet-stream
metadata["Content-Type"] = key.content_type
if not dry_run:
key.copy(bucket, key, metadata=metadata, preserve_acl=True)
else:
print("DRYRUN: modifying metadata for %s/%s - %s" % (bucket, key.name, metadata))
count += 1
if count % HEARTBEAT == 0:
print("%d keys processed for %s..." % (count, prefix))
print("Finished processing for %s (%d keys)" % (prefix, count))
# Put all our jobs in a list so we can terminate them cleanly at the end of the run.
jobs = []
for prefix in prefixes:
p = Process(target=worker, args=(prefix,))
jobs.append(p)
p.start()
# Intermittently check for what processes have finished executing.
while not shutdown.is_set():
alive = False
for job in jobs:
# Check with a one second timeout.
job.join(1)
if job.is_alive():
alive = True
# If at least one process is alive, leep running.
if not alive:
shutdown.set()
print("Finished all jobs.")
@duggan
Copy link
Author

duggan commented Jul 22, 2015

My use case: setting the Cache-Control header on objects so that CloudFlare (or presumably any other caching proxy) will perform a lot fewer requests to the S3 backend, e.g.:

python s3-metadata.py --header="Cache-Control:max-age=31536000" \
--prefix="0,1,2,3,4,5,6,7,8,9,a,A,b,B,c,C,d,D,e,E,f,F,g,G,h,H,i,I,j,J,k,K,l,L,m,M,n,N,o,O,p,P,q,Q,r,R,s,S,t,T,u,U,v,V,w,W,x,X,y,Y,z,Z" \
--bucket=my-bucket

The difference it has made so far (where the orange diverges from the blue towards the end):

screen shot 2015-07-22 at 23 53 09

The earlier stuff in the graph is s3cmd limping its way through the same process, failing with random errors and having to be restarted many times, never making it past the first few ten thousand objects or so.

The more evenly distributed your keys, the better.

A version of this that automatically performs the parallellisation wouldn't be a huge stretch, but if you can find a good prefix distribution out of band it's probably a lot faster. Running something like this on the directory will give you a distribution based on the first character of each file:

find . -type f -printf "%f\n" | cut -c1 | sort | uniq | xargs | sed 's/ /,/g'

Over a large enough number of files this will be pretty much identical to my example above, so you may want to go a little deeper than one character.

Another nice-to-have would be storing the progress and outputting a recovery artifact, such that, in the event of an error, you could specify --recovery and have the processes resume from that point.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment