#!/usr/bin/env python3
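"""
Combine S3 objects that share a key prefix into a single output: a local file,
stdout ('-'), or another S3 object, optionally deleting the source objects
afterwards. Objects whose keys end in '.gz' are decompressed before being
appended; everything else is copied verbatim.

Gist by @mckabi, created 2021-02-16. This summary and the illustrative
comments below are editorial sketches inferred from the code; bucket, key,
and script names used in examples are placeholders, not part of the gist.
"""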
import contextlib
import gzip
import sys
from itertools import zip_longest
from urllib.parse import urlparse

import boto3
from botocore.exceptions import NoCredentialsError

STDOUT = '-'
DEFAULT_LIMIT = 2000
DEFAULT_FILENAME = STDOUT

s3 = boto3.resource('s3')
buckets = None


def put_error(message, file=sys.stderr):
    print(message, file=file)


def print_progress(iteration, total, prefix='', suffix='', decimals=1, length=None):
    """
    Call in a loop to create a terminal progress bar.
    https://stackoverflow.com/a/34325723

    @params:
        iteration - current iteration (int)
        total     - total iterations (int)
        prefix    - prefix string (str, '')
        suffix    - suffix string (str, '')
        decimals  - positive number of decimals in percent complete (int, 1)
        length    - character length of bar (int, 80)
    """
    percent = ('{0:.' + str(decimals) + 'f}').format(100 * (iteration / float(total)))
    if length is None:
        length = 80 - len(percent) - 3
    filled_length = int(length * iteration // total)
    bar = '#' * filled_length + '-' * (length - filled_length)
    print(f'\r{prefix}{bar} {percent}% {suffix}', end='\r')
    if iteration == total:
        print()
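
# Illustrative call (values are placeholders): print_progress(3, 10) redraws the
# current terminal line as an 80-column bar ending in ' 30.0% '.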


@contextlib.contextmanager
def output(filename):
    # Yield a binary writable stream: an append-mode file, or stdout's buffer
    # when writing to '-' (stdout).
    fd = open(filename, 'ab') if filename and filename != STDOUT else sys.stdout.buffer
    try:
        yield fd
    finally:
        # Only close real files; never close the stdout buffer.
        if fd is not sys.stdout.buffer:
            fd.close()


def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks or blocks."""
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


def get_bucket(name):
    """Return the Bucket resource for `name`, verifying that it exists."""
    global buckets
    bucket = s3.Bucket(name)
    if buckets is None:
        # Listing buckets also surfaces NoCredentialsError early.
        buckets = list(s3.buckets.all())
    if bucket in buckets:
        return bucket
    raise ValueError(f'bucket {name!r} not found')


class S3URL:
    def __init__(self, url):
        # Accept both 's3://bucket/key' and bare 'bucket/key' forms.
        if not url.startswith('s3://') and '//' not in url:
            url = 's3://' + url
        self._parsed = urlparse(url, allow_fragments=False)
        if not all(['s3' == self.get('scheme'), self.bucket, self.key]):
            raise ValueError(f'invalid S3 URL: {url!r}')

    def __repr__(self):
        return f'<{type(self).__qualname__} {self.url!r}>'

    def get(self, name):
        return getattr(self._parsed, name, None)

    @property
    def bucket(self):
        return self._parsed.netloc

    @property
    def key(self):
        return self._parsed.path.lstrip('/')

    @property
    def url(self):
        return self._parsed.geturl()

    def get_object(self):
        return s3.Object(self.bucket, self.key)
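
# Illustrative parses (bucket and key are placeholders, not from the gist):
#   S3URL('s3://my-bucket/logs/2021/02/')  ->  bucket='my-bucket', key='logs/2021/02/'
#   S3URL('my-bucket/logs/2021/02/')       ->  same result; __init__ prepends 's3://'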


if __name__ == '__main__':
    import argparse
    import os

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-v', '--verbose', action='store_true',
        help='increase output verbosity')
    parser.add_argument(
        '-o', '--output', default=DEFAULT_FILENAME,
        help=f'output filename (default {DEFAULT_FILENAME}, i.e. stdout)')
    parser.add_argument(
        '--limit', type=int, default=DEFAULT_LIMIT,
        help=f'limit the number of files (default {DEFAULT_LIMIT})')
    parser.add_argument(
        '-d', '--delete', action='store_true',
        help='delete the source files after combining')
    parser.add_argument(
        '-c', '--check', action='store_true',
        help='only check which files would be combined')
    parser.add_argument('location', help='S3 URI or BUCKET/PREFIX')
    args = parser.parse_args()

    def put_verbose(*message, file=sys.stdout):
        if args.verbose:
            print(*message, file=file)

    # Resolve where the combined output goes: another S3 object (via a local
    # temp file), stdout, or a local file.
    verbose_file = sys.stdout
    if args.output.startswith('s3://'):
        orig_filename = S3URL(args.output)
        filename = f'.tmp.{orig_filename.key.replace("/", "_")}'
    elif not args.output or args.output == STDOUT:
        verbose_file = sys.stderr
        filename = orig_filename = STDOUT
    else:
        filename = orig_filename = args.output

    if not args.location:
        parser.print_usage()
        sys.exit(2)

    location = S3URL(args.location)
    try:
        bucket = get_bucket(location.bucket)
    except (NoCredentialsError, ValueError) as error:
        put_error(f'AWS: {error!s}')
        sys.exit(2)

    prefix = location.key
    if not prefix:
        put_error(f'Missing prefix on s3://{bucket.name}')
        sys.exit(2)
    elif '/' not in prefix:
        put_error(f'Prefix {prefix!r} is too short on s3://{bucket.name}')
        sys.exit(2)

    put_verbose(bucket, prefix)
    objects = sorted(bucket.objects.filter(Prefix=prefix), key=lambda o: o.key)
    object_count = len(objects)
    print(f'Found {object_count} files on s3://{bucket.name}/{prefix}*')
    if object_count > args.limit:
        confirm = input(f'There are more than {args.limit} files ({object_count}). Continue? [Y/n] ')
        if confirm not in ['', 'y', 'Y']:
            put_error('Aborted.')
            sys.exit()
    elif object_count < 1:
        sys.exit()
    if args.check:
        sys.exit()

    with output(filename) as fd:
        print_progress(0, object_count)
        for index, obj in enumerate(objects):
            put_verbose(f's3://{bucket.name}/{obj.key}', file=verbose_file)
            if obj.key.endswith('.gz'):
                # Decompress gzipped objects before appending them.
                with gzip.GzipFile(fileobj=obj.Object().get()['Body']) as gzd:
                    fd.write(gzd.read())
            else:
                bucket.download_fileobj(obj.key, fd)
            fd.flush()
            print_progress(index + 1, object_count)

    if args.delete:
        # DeleteObjects accepts at most 1000 keys per request, hence the chunking.
        for objects_chunk in grouper(objects, 1000):
            objects_to_delete = [{'Key': obj.key} for obj in objects_chunk if obj is not None]
            s3.meta.client.delete_objects(
                Bucket=bucket.name,
                Delete={'Objects': objects_to_delete},
            )
        print(f'Deleted {len(objects)} files on s3://{bucket.name}/{prefix}*')

    if isinstance(orig_filename, S3URL):
        # The output was an S3 URL: upload the local temp file, then remove it.
        obj = orig_filename.get_object()
        with open(filename, 'rb') as body:
            obj.put(Body=body)
        os.remove(filename)
        print(f'Wrote to {orig_filename.url}')
    elif orig_filename != STDOUT:
        print(f'Wrote to {orig_filename}')
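
# Illustrative invocations (script name, bucket, and prefix are placeholders):
#   ./combine-s3.py my-bucket/logs/2021/02/ -o combined.log
#   ./combine-s3.py s3://my-bucket/logs/2021/02/ -o s3://my-bucket/merged/2021-02.log --delete
#   ./combine-s3.py my-bucket/logs/2021/02/ --check --verbose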