@yaniv-aknin
Created April 23, 2012 17:31
s3lint: a simple lint tool to update S3 content-type and cache-control headers
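The script lists every key under the given s3:// URLs and, for keys whose Content-Type (or, when --cache-control is given, Cache-Control) header does not match what the mimetypes module predicts from the key name, copies the key onto itself with corrected metadata. Assuming the script is saved as s3lint.py and credentials are available in AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY, an invocation might look like this (the bucket and prefix are placeholders):

./s3lint.py --cache-control 'max-age=3600' s3://example-bucket/static/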
boto==2.3.0
gevent==0.13.7
greenlet==0.3.4
wsgiref==0.1.2
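These are the dependency pins the gist was written against (Python 2-era boto and gevent); installing them into a matching environment with pip is assumed:

pip install -r requirements.txt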
#!/usr/bin/env python
from __future__ import print_function
import logging
import sys
import argparse
from os import environ
import re
from collections import namedtuple
from mimetypes import guess_type
from gevent import monkey, queue, spawn
import boto
monkey.patch_all(thread=False)  # cooperative sockets for gevent; real threads are left alone
logger = logging.getLogger('s3lint')
# see: http://docs.amazonwebservices.com/AmazonS3/latest/dev/BucketRestrictions.html
S3_URL_PATTERN = re.compile(r'^s3://(?P<bucket>[a-z][a-z0-9-]+[a-z](\.[a-z][a-z0-9-]+[a-z])*)/(?P<key_prefix>.*)$')
S3URL = namedtuple('S3URL', 'bucket, key_prefix')
options = None

def parse_s3_url(s3url):
    match = S3_URL_PATTERN.match(s3url)
    if not match:
        raise ValueError('invalid s3 url: %r' % (s3url,))
    return S3URL(match.groupdict()['bucket'], match.groupdict()['key_prefix'])

def parse_arguments(argv):
    global options
    parser = argparse.ArgumentParser()
    parser.add_argument('-l', '--loglevel', choices=('debug', 'info', 'warning', 'error', 'critical'), default='debug')
    parser.add_argument('-w', '--workers', default=8, type=int)
    parser.add_argument('--cache-control')
    parser.add_argument('--aws-id', default=environ.get('AWS_ACCESS_KEY_ID', None))
    parser.add_argument('--aws-secret', default=environ.get('AWS_SECRET_ACCESS_KEY', None))
    parser.add_argument('s3urls', nargs='+')
    options = parser.parse_args(argv[1:])
    if not all((options.aws_id, options.aws_secret)):
        parser.error('AWS ID and secret missing from environment and command-line')
    try:
        options.s3urls = [parse_s3_url(url) for url in options.s3urls]
    except ValueError as error:
        parser.error(str(error))
    return options

def setup_logging(level_name, *logger_names):
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT, None))
    for logger_name in logger_names:
        logger = logging.getLogger(logger_name)
        logger.addHandler(handler)
        logger.setLevel(getattr(logging, level_name.upper()))

def format_key(key):
    return 's3://%s/%s' % (key.bucket.name, key.name)

def lint_key(key):
    key = key.bucket.lookup(key.name)  # re-fetch the key so its metadata headers are populated
    if is_key_metadata_correct(key):
        return
    metadata = dict(key.metadata)
    metadata['Content-Type'] = expected_content_type(key)
    if options.cache_control:
        metadata['Cache-Control'] = options.cache_control
    # copying a key onto itself is the S3 idiom for rewriting its metadata in place
    key.copy(key.bucket, key.name, preserve_acl=True, metadata=metadata)
    logger.info('fixed %s' % (format_key(key),))

def expected_content_type(key):
    expected_type, expected_encoding = guess_type(key.name)
    return expected_type or 'application/octet-stream'

def is_key_metadata_correct(key):
    if key.content_type is None or expected_content_type(key) not in key.content_type:
        logger.debug('%s mimetype %s is incorrect, expected %s', format_key(key), key.content_type,
                     expected_content_type(key))
        return False
    if options.cache_control is not None and key.cache_control != options.cache_control:
        logger.debug('%s cache-control %r is incorrect, expected %r', format_key(key), key.cache_control,
                     options.cache_control)
        return False
    return True

def handle_job(jobs):
    while True:
        key = jobs.get()
        logger.debug('examining key %s', format_key(key))
        lint_key(key)
        jobs.task_done()

def main():
    setup_logging(options.loglevel, logger.name)
    conn = boto.connect_s3(options.aws_id, options.aws_secret)
    jobs = queue.JoinableQueue()
    for worker in range(options.workers):
        spawn(handle_job, jobs)
    for s3url in options.s3urls:
        bucket = conn.get_bucket(s3url.bucket)
        for key in bucket.list(prefix=s3url.key_prefix):
            if key.name.endswith('/'):  # skip "directory" placeholder keys
                continue
            jobs.put(key)
    jobs.join()

if __name__ == '__main__':
    parse_arguments(sys.argv)
    main()
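
The correction the linter applies hinges on mimetypes.guess_type, falling back to application/octet-stream when nothing can be guessed from the key name. A minimal standalone sketch of that expectation logic (the filenames are only illustrative):

from __future__ import print_function
from mimetypes import guess_type

def expected_content_type(name):
    # guess_type returns (type, encoding); ignore the encoding and fall back to a generic binary type
    guessed, _encoding = guess_type(name)
    return guessed or 'application/octet-stream'

for name in ('index.html', 'logo.png', 'archive.tar.gz', 'no-extension'):
    print(name, '->', expected_content_type(name))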