S3 parallel put, updated to use Python 3 with logging replaced by print(); converted from https://github.com/mishudark/s3-parallel-put
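Example invocation (a sketch; the script filename, bucket name and source path are placeholders, and credentials are read from the AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment variables, or AWS_SECURITY_TOKEN when set):

    python3 s3-parallel-put.py --bucket=example-bucket --prefix=backups --processes=16 /data/to/upload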
#!/usr/bin/env python3
# Parallel uploads to Amazon AWS S3
#
# The MIT License (MIT)
#
# Copyright (c) 2011-2014 Tom Payne
# Copyright (c) 2022-2023 James Darbyshire (updated to Python 3)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Under Python 3 file content is handled as bytes, so in-memory buffers for
# compute_md5() and GzipFile must be BytesIO rather than StringIO.
from io import BytesIO
from fnmatch import fnmatch
from gzip import GzipFile
from itertools import chain, islice
import logging
from multiprocessing import JoinableQueue, Process
from optparse import OptionGroup, OptionParser
import os.path
import re
from ssl import SSLError
import sys
import tarfile
import time
from datetime import datetime
import magic  # provided by the python-magic package
import mimetypes
import boto
from boto.s3.connection import S3Connection
from boto.s3.acl import CannedACLStrings
from boto.utils import compute_md5
from boto.exception import BotoServerError
# Reference timestamp for the --ignore-files-older-than-days check.
current_time = time.time()
# Matches the "done" lines that putter() prints ("<source> -> <key>", or
# "<source> z> <key>" when gzipped) so that --resume can skip keys uploaded by
# a previous run. The original logging-based pattern
# (INFO:s3-parallel-put[putter-N]:...) would no longer match print() output.
DONE_RE = re.compile(r'\A\S+\s+[-z]>\s+(\S+)\s*\Z')
# These content types are amenable to compression
# WISHLIST more types means more internets
GZIP_CONTENT_TYPES = (
    'application/javascript',
    'application/x-javascript',
    'text/css',
    'text/html',
    'text/javascript',
)
GZIP_ALL = 'all'
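
# Infinite generator of func(*args, **kwargs); combined with islice() in
# main() to build a fixed-size list of putter Process objects.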
def repeatedly(func, *args, **kwargs):
    while True:
        yield func(*args, **kwargs)
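
# Caches the most recently opened file so that a putter reading many members
# out of the same tar archive does not reopen it for every key.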
class FileObjectCache(object):
    def __init__(self):
        self.name = None
        self.file_object = None

    def open(self, name, *args):
        if name != self.name:
            self.name = name
            self.file_object = open(self.name, *args)
        return self

    def __enter__(self):
        return self.file_object

    def __exit__(self, exc_type, exc_value, traceback):
        pass
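
# Lazily materializes the content, MD5 and size of a single key, reading from
# an in-memory blob, a tar member (filename/offset/size) or a file path.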
class Value(object):
    def __init__(self, file_object_cache, content=None, filename=None, md5=None, offset=None, path=None, size=None, bucket_name=None):
        self.file_object_cache = file_object_cache
        self.content = content
        self.filename = filename
        self.md5 = md5
        self.offset = offset
        self.path = path
        self.size = size
        self.bucket_name = bucket_name

    def get_content(self):
        if self.content is None:
            if self.filename:
                # Open the containing archive in binary mode so that byte
                # offsets and sizes reported by tarfile line up correctly.
                with self.file_object_cache.open(self.filename, 'rb') as file_object:
                    file_object.seek(self.offset)
                    self.content = file_object.read(self.size)
            elif self.path:
                with open(self.path, mode='rb') as file_object:
                    self.content = file_object.read()
            else:
                assert False
        return self.content

    def calculate_md5(self):
        if self.md5 is None:
            # get_content() returns bytes, so wrap it in BytesIO for boto.
            self.md5 = compute_md5(BytesIO(self.get_content()))
        return self.md5

    def get_size(self):
        if self.size is None:
            if self.content:
                self.size = len(self.content)
            elif self.path:
                self.size = os.stat(self.path).st_size
            else:
                assert False
        return self.size

    def should_copy_content(self):
        return self.bucket_name is None
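
# --include patterns take precedence over --exclude patterns; the
# --ignore-files-older-than-days check only applies to paths that resolve to
# directories on the local filesystem.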
def excluded(pathname, options):
    for glob in options.include:
        if fnmatch(pathname, glob):
            return False
    for glob in options.exclude:
        if fnmatch(pathname, glob):
            return True
    if options.ignore_files_older_than_days > 0:
        if os.path.isdir(pathname):
            creation_time = os.path.getctime(pathname)
            if (current_time - creation_time) // (24 * 3600) >= options.ignore_files_older_than_days:
                print('ignoring %s because crtime=%s is older than %s days' % (pathname, datetime.fromtimestamp(creation_time).strftime('%d.%m.%Y, %H:%M:%S'), options.ignore_files_older_than_days))
                return True
    return False
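
# Each walk_* generator yields (key_name, kwargs) pairs; the kwargs are used
# by the putter processes to construct a Value.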
def walk_filesystem(source, options):
    if os.path.isdir(source):
        for dirpath, dirnames, filenames in os.walk(source):
            if excluded(dirpath, options):
                continue
            for filename in filenames:
                abs_path = os.path.join(dirpath, filename)
                if not os.path.isfile(abs_path):
                    continue
                if excluded(filename, options):
                    continue
                rel_path = os.path.relpath(abs_path, source)
                key_name = '/'.join([options.prefix] + rel_path.split(os.sep))
                yield (key_name, dict(path=abs_path))
    elif os.path.isfile(source):
        if excluded(source, options):
            return
        key_name = os.path.normpath(os.path.join(options.prefix, source))
        yield (key_name, dict(path=source))
def walk_tar(source, options):
    try:
        tar_file = tarfile.open(source, 'r:')
        for tarinfo in tar_file:
            if tarinfo.isfile():
                path = tarinfo.name
                if excluded(path, options):
                    continue
                key_name = os.path.normpath(os.path.join(options.prefix, path))
                filename = source
                offset = tarinfo.offset_data
                size = tarinfo.size
                yield (key_name, dict(filename=filename, offset=offset, path=path, size=size))
            # http://blogs.oucs.ox.ac.uk/inapickle/2011/06/20/high-memory-usage-when-using-pythons-tarfile-module/
            tar_file.members = []
    except tarfile.ReadError:
        # Not a plain uncompressed tar: reopen with transparent decompression
        # and read member contents eagerly, since raw byte offsets are not
        # usable in a compressed archive.
        tar_file = tarfile.open(source)
        for tarinfo in tar_file:
            if tarinfo.isfile():
                path = tarinfo.name
                if excluded(path, options):
                    continue
                key_name = os.path.normpath(os.path.join(options.prefix, path))
                content = tar_file.extractfile(tarinfo).read()
                yield (key_name, dict(content=content, path=path))
def walk_s3(source, options):
    connection = S3Connection(host=options.host, is_secure=options.secure)
    for key in connection.get_bucket(source).list():
        if excluded(key.name, options):
            continue
        yield (
            key.name,
            dict(
                bucket_name=key.bucket.name,
                md5=key.etag,
                size=key.size,
                path='%s/%s' % (source, key.name)))
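
# Producer process: feeds (key_name, kwargs) pairs into put_queue, optionally
# skipping keys already recorded as done in --resume log files.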
def walker(walk, put_queue, sources, options):
    pairs = chain(*map(lambda source: walk(source, options), sources))
    if options.resume:
        done = set()
        for filename in options.resume:
            with open(filename) as file_object:
                for line in file_object:
                    match = DONE_RE.match(line)
                    if match:
                        done.add(match.group(1))
        pairs = ((key_name, args) for key_name, args in pairs if key_name not in done)
    if options.limit:
        pairs = islice(pairs, options.limit)
    for pair in pairs:
        put_queue.put(pair)
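
# Put modes: 'add' uploads only keys that do not exist yet, 'stupid' uploads
# unconditionally, 'update' uploads only when the MD5 differs, and 'copy'
# performs a server-side copy from another bucket.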
def put_add(bucket, key_name, value):
    key = bucket.get_key(key_name)
    if key is None:
        return bucket.new_key(key_name)
    else:
        return None

def put_stupid(bucket, key_name, value):
    return bucket.new_key(key_name)

def put_update(bucket, key_name, value):
    key = bucket.get_key(key_name)
    if key is None:
        return bucket.new_key(key_name)
    else:
        # Boto's md5 function actually returns a 3-tuple: (hexdigest, base64, size)
        value.calculate_md5()
        if key.etag == '"%s"' % value.md5[0]:
            return None
        else:
            return key

def put_copy(bucket, key_name, value):
    return bucket.copy_key(key_name, value.bucket_name, key_name)
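
# Consumer process: pulls (key_name, kwargs) pairs off put_queue until it
# receives a None sentinel, uploads each value, and reports sizes to
# stat_queue. On transient errors the work item is requeued.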
def putter(put, put_queue, stat_queue, options):
    connection, bucket = None, None
    file_object_cache = FileObjectCache()
    # Figure out which content types we want to gzip
    if not options.gzip_type:  # default
        gzip_content_types = GZIP_CONTENT_TYPES
    elif 'all' in options.gzip_type:
        gzip_content_types = GZIP_ALL
    else:
        gzip_content_types = options.gzip_type
    if 'guess' in gzip_content_types:
        # don't bother removing 'guess' from the list since nothing will match it
        gzip_content_types.extend(GZIP_CONTENT_TYPES)
    if options.gzip:
        print('These content types will be gzipped: %s' % str(gzip_content_types))
    while True:
        args = put_queue.get()
        if args is None:
            put_queue.task_done()
            break
        key_name, value_kwargs = args
        value = Value(file_object_cache, **value_kwargs)
        should_gzip = False
        try:
            if connection is None:
                connection = S3Connection(is_secure=options.secure, host=options.host)
            if bucket is None:
                bucket = connection.get_bucket(options.bucket)
            key = put(bucket, key_name, value)
            if key:
                if value.should_copy_content():
                    if options.headers:
                        headers = dict(tuple(header.split(':', 1)) for header in options.headers)
                    else:
                        headers = {}
                    content_type = None
                    if options.content_type:
                        if options.content_type == 'guess':
                            content_type = mimetypes.guess_type(value.path)[0]
                        elif options.content_type == 'magic':
                            content_type = mimetypes.guess_type(value.path)[0]
                            if content_type is None:
                                content_type = magic.from_file(value.path, mime=True)
                        else:
                            content_type = options.content_type
                        headers['Content-Type'] = content_type
                    content = value.get_content()
                    md5 = value.md5
                    should_gzip = options.gzip and (
                        (content_type and content_type in gzip_content_types) or
                        gzip_content_types == GZIP_ALL)
                    if should_gzip:
                        headers['Content-Encoding'] = 'gzip'
                        # Compress in memory; content is bytes, so use BytesIO.
                        bytes_io = BytesIO()
                        gzip_file = GzipFile(compresslevel=9, fileobj=bytes_io, mode='wb')
                        gzip_file.write(content)
                        gzip_file.close()
                        content = bytes_io.getvalue()
                        md5 = compute_md5(BytesIO(content))
                    if not options.dry_run:
                        key.set_contents_from_string(content, headers, md5=md5, policy=options.grant, encrypt_key=options.encrypt_key)
                print('%s %s> %s' % (
                    value.path, 'z' if should_gzip else '-', key.name))
                stat_queue.put(dict(size=value.get_size()))
            else:
                print('skipping %s -> %s' % (value.path, key_name))
        except SSLError as exc:
            # Transient TLS error: requeue the item and rebuild the connection.
            print('%s -> %s (%s)' % (value.path, key_name, exc))
            put_queue.put(args)
            connection, bucket = None, None
        except IOError as exc:
            print('%s -> %s (%s)' % (value.path, key_name, exc))
        except BotoServerError as exc:
            print('%s -> %s (%s)' % (value.path, key_name, exc))
            put_queue.put(args)
        put_queue.task_done()
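
# Statistics process: tallies counts and byte totals from stat_queue and
# prints a summary once it receives the None sentinel.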
def statter(stat_queue, start, options):
    count, total_size = 0, 0
    while True:
        kwargs = stat_queue.get()
        if kwargs is None:
            stat_queue.task_done()
            break
        count += 1
        total_size += kwargs.get('size', 0)
        stat_queue.task_done()
    duration = time.time() - start
    print('put %d bytes in %d files in %.1f seconds (%d bytes/s, %.1f files/s)' % (total_size, count, duration, total_size / duration, count / duration))
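
# Pipeline layout: one walker process produces work, options.processes putter
# processes consume it, and one statter process aggregates results; None
# sentinels shut each stage down in order.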
def main(argv):
    parser = OptionParser()
    group = OptionGroup(parser, 'S3 options')
    group.add_option('--bucket', metavar='BUCKET',
                     help='set bucket')
    group.add_option('--bucket_region', default='us-east-1',
                     help='set bucket region if not in us-east-1 (the default new bucket region)')
    group.add_option('--host', default='s3.amazonaws.com',
                     help='set AWS host name')
    group.add_option('--insecure', action='store_false', dest='secure',
                     help='use insecure connection')
    group.add_option('--secure', action='store_true', default=True, dest='secure',
                     help='use secure connection (default)')
    parser.add_option_group(group)
    group = OptionGroup(parser, 'Source options')
    group.add_option('--walk', choices=('filesystem', 'tar', 's3'), default='filesystem', metavar='MODE',
                     help='set walk mode (filesystem, tar or s3)')
    group.add_option('--exclude', action='append', default=[], metavar='PATTERN',
                     help='exclude files matching PATTERN')
    group.add_option('--include', action='append', default=[], metavar='PATTERN',
                     help="don't exclude files matching PATTERN")
    group.add_option('--ignore-files-older-than-days', default=0, type=int,
                     help='ignore files older than the given number of days')
    parser.add_option_group(group)
    group = OptionGroup(parser, 'Put options')
    group.add_option('--content-type', default='guess', metavar='CONTENT-TYPE',
                     help='set content type; use "guess" to guess based on file name, '
                          'or "magic" to guess by file name and libmagic')
    group.add_option('--gzip', action='store_true',
                     help='gzip values and set content encoding')
    group.add_option('--gzip-type', action='append', default=[],
                     help='if --gzip is set, sets which content types to gzip; defaults '
                          'to a list of known text content types, "all" will gzip everything. '
                          'Specify multiple times for multiple content types '
                          '[default: "guess"]')
    group.add_option('--put', choices=('add', 'stupid', 'update', 'copy'), default='update', metavar='MODE',
                     help='set put mode (add, stupid, update or copy)')
    group.add_option('--prefix', default='', metavar='PREFIX',
                     help='set key prefix')
    group.add_option('--resume', action='append', default=[], metavar='FILENAME',
                     help='resume from log file')
    group.add_option('--grant', metavar='GRANT', default=None, choices=CannedACLStrings,
                     help='a canned ACL policy to apply to each file uploaded; choices: %s' %
                          ', '.join(CannedACLStrings))
    group.add_option('--header', metavar='HEADER:VALUE', dest='headers', action='append',
                     help='extra headers to add to the file, can be specified multiple times')
    group.add_option('--encrypt-key', action='store_true', default=False, dest='encrypt_key',
                     help='use server side encryption')
    parser.add_option_group(group)
    group = OptionGroup(parser, 'Logging options')
    group.add_option('--log-filename', metavar='FILENAME',
                     help='set log filename')
    group.add_option('--quiet', '-q', action='count', default=0,
                     help='less output')
    group.add_option('--verbose', '-v', action='count', default=0,
                     help='more output')
    parser.add_option_group(group)
    group = OptionGroup(parser, 'Debug and performance tuning options')
    group.add_option('--dry-run', action='store_true',
                     help="don't write to S3")
    group.add_option('--limit', metavar='N', type=int,
                     help='set maximum number of keys to put')
    group.add_option('--processes', default=8, metavar='PROCESSES', type=int,
                     help='set number of putter processes')
    parser.add_option_group(group)
    options, args = parser.parse_args(argv[1:])
    logging.basicConfig(filename=options.log_filename, level=logging.INFO + 10 * (options.quiet - options.verbose))
    if len(args) < 1:
        print('missing source operand')
        return 1
    if not options.bucket:
        print('missing bucket')
        return 1
    if not options.bucket_region:
        options.bucket_region = 'us-east-1'
    # Sanity-check credentials and bucket access up front, before spawning any
    # worker processes.
    if 'AWS_SECURITY_TOKEN' in os.environ:
        connection = boto.s3.connect_to_region(
            options.bucket_region,
            security_token=os.environ.get('AWS_SECURITY_TOKEN'),
            is_secure=True,
            host=options.host,
            calling_format=boto.s3.connection.OrdinaryCallingFormat(),
        )
    else:
        connection = boto.s3.connect_to_region(
            options.bucket_region,
            aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
            is_secure=True,
            host=options.host,
            calling_format=boto.s3.connection.OrdinaryCallingFormat(),
        )
    # WARNING: this disables TLS certificate verification globally.
    import ssl
    if hasattr(ssl, '_create_unverified_context'):
        ssl._create_default_https_context = ssl._create_unverified_context
    bucket = connection.get_bucket(options.bucket)
    del bucket
    del connection
    start = time.time()
    put_queue = JoinableQueue(1024 * options.processes)
    stat_queue = JoinableQueue()
    walk = {'filesystem': walk_filesystem, 'tar': walk_tar, 's3': walk_s3}[options.walk]
    walker_process = Process(target=walker, args=(walk, put_queue, args, options))
    walker_process.start()
    put = {'add': put_add, 'stupid': put_stupid, 'update': put_update, 'copy': put_copy}[options.put]
    putter_processes = list(islice(repeatedly(Process, target=putter, args=(put, put_queue, stat_queue, options)), options.processes))
    for putter_process in putter_processes:
        putter_process.start()
    statter_process = Process(target=statter, args=(stat_queue, start, options))
    statter_process.start()
    walker_process.join()
    # One None sentinel per putter tells each consumer to exit.
    for putter_process in putter_processes:
        put_queue.put(None)
    put_queue.close()
    for putter_process in putter_processes:
        putter_process.join()
    stat_queue.put(None)
    stat_queue.close()
    statter_process.join()
    put_queue.join_thread()
    stat_queue.join_thread()

if __name__ == '__main__':
    sys.exit(main(sys.argv))