Created
February 16, 2021 08:15
-
-
Save mckabi/6ac5ba62dad0923bde5b3d40ff8bf722 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import contextlib | |
import gzip | |
import sys | |
from itertools import zip_longest | |
from urllib.parse import urlparse | |
import boto3 | |
from botocore.exceptions import NoCredentialsError | |
# Sentinel filename meaning "use standard output" instead of a real file.
STDOUT = '-'
# Ask for interactive confirmation before combining more than this many objects.
DEFAULT_LIMIT = 2000
DEFAULT_FILENAME = STDOUT
# Shared boto3 S3 resource for the whole script.
s3 = boto3.resource('s3')
# Lazy cache of the account's bucket listing; populated by get_bucket().
buckets = None
def put_error(message, file=sys.stderr):
    """Print *message* to *file* (standard error by default)."""
    print(message, file=file)
def print_progress(iteration, total, prefix='', suffix='', decimals=1, length=None):
    """Render a single-line terminal progress bar.

    Adapted from https://stackoverflow.com/a/34325723 — call repeatedly
    inside a loop; a trailing newline is printed once iteration == total.

    iteration: current iteration (int)
    total: total iterations (int)
    prefix / suffix: text placed before / after the bar (str)
    decimals: decimal places shown in the percentage (int)
    length: bar width in characters; None fills an 80-column line
    """
    fraction = iteration / float(total)
    percent = f'{100 * fraction:.{decimals}f}'
    if length is None:
        # Reserve room for the percentage figure plus "% " and a space.
        length = 80 - len(percent) - 3
    done = int(length * iteration // total)
    bar = '#' * done + '-' * (length - done)
    # '\r' at both ends keeps the cursor on the same line between calls.
    print(f'\r{prefix}{bar} {percent}% {suffix}', end='\r')
    if iteration == total:
        print()
@contextlib.contextmanager
def output(filename):
    """Yield a writable binary stream for *filename*.

    An empty filename or the STDOUT sentinel ('-') yields sys.stdout's
    binary buffer, which is left open; anything else is opened in append
    mode ('ab') and closed on exit.
    """
    use_stdout = not filename or filename == STDOUT
    fd = sys.stdout.buffer if use_stdout else open(filename, 'ab')
    try:
        yield fd
    finally:
        # BUG FIX: the original tested `fd is not sys.stdout`, which is
        # always true (fd is sys.stdout.buffer in the stdout case), so the
        # process's stdout buffer was closed after the first use.
        if not use_stdout:
            fd.close()
def grouper(iterable, n, fillvalue=None):
    """Yield *iterable* in fixed-size tuples of length *n*.

    The final tuple is padded with *fillvalue*:
    grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    """
    # A single shared iterator repeated n times: zip_longest draws from it
    # round-robin, which carves off consecutive n-sized chunks.
    shared = iter(iterable)
    return zip_longest(*([shared] * n), fillvalue=fillvalue)
def get_bucket(name):
    """Return the s3.Bucket called *name* if it is visible to this account.

    Raises NoCredentialsError (from boto3) when credentials are missing,
    and ValueError when the bucket is not in the account's bucket listing.
    """
    global buckets
    bucket = s3.Bucket(name)
    if buckets is None:
        # Cache the full bucket listing once per process. The original
        # wrapped this in `try/except NoCredentialsError: raise`, a no-op
        # handler — NoCredentialsError propagates unchanged either way.
        buckets = list(s3.buckets.all())
    if bucket in buckets:
        return bucket
    raise ValueError(f'Not found bucket {name!r}')
class S3URL:
    """A parsed s3://BUCKET/KEY location.

    Accepts either a full s3:// URL or a bare BUCKET/KEY string; a string
    with no scheme separator gets 's3://' prepended before parsing.
    Raises ValueError when the result lacks an s3 scheme, bucket, or key.
    """

    def __init__(self, url):
        has_scheme = url.startswith('s3://') or '//' in url
        if not has_scheme:
            url = 's3://' + url
        self._parsed = urlparse(url, allow_fragments=False)
        is_valid = self.get('scheme') == 's3' and self.bucket and self.key
        if not is_valid:
            raise ValueError(f'Invalid S3URL: {url!r}')

    def __repr__(self):
        cls_name = type(self).__qualname__
        return f'<{cls_name} {self.url!r}>'

    def get(self, name):
        """Return the named component of the parsed URL, or None."""
        return getattr(self._parsed, name, None)

    @property
    def bucket(self):
        """The bucket name (the URL's network-location component)."""
        return self._parsed.netloc

    @property
    def key(self):
        """The object key: the URL path with leading slashes removed."""
        return self._parsed.path.lstrip('/')

    @property
    def url(self):
        """The normalized s3:// URL string."""
        return self._parsed.geturl()

    def get_object(self):
        """Return the boto3 Object resource for this bucket/key."""
        return s3.Object(self.bucket, self.key)
if __name__ == '__main__':
    import argparse
    import os
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-v', '--verbose', action='store_true',
        help='increase output verbosity')
    parser.add_argument(
        '-o', '--output', default=DEFAULT_FILENAME,
        help=f'output filename (default {DEFAULT_FILENAME} as stdout)')
    parser.add_argument(
        '--limit', type=int, default=DEFAULT_LIMIT,
        help=f'limit the count of file (default {DEFAULT_LIMIT})')
    parser.add_argument(
        '-d', '--delete', action='store_true',
        help='delete files after combine')
    parser.add_argument(
        '-c', '--check', action='store_true',
        help='only check files to be combine')
    parser.add_argument('location', help='S3 URI or BUCKET/PREFIX')
    args = parser.parse_args()
    def put_verbose(*message, file=sys.stdout):
        # Print only when -v/--verbose was given.
        global args
        if args.verbose:
            print(*message, file=file)
    # Decide where combined bytes go and where verbose chatter goes.
    verbose_file = sys.stdout
    if args.output.startswith('s3://'):
        # S3 target: combine into a local temp file first, upload at the end.
        orig_filename = S3URL(args.output)
        filename = f'.tmp.{orig_filename.key.replace("/", "_")}'
    elif not args.output or args.output == STDOUT:
        # Combined data goes to stdout, so route verbose output to stderr.
        verbose_file = sys.stderr
        filename = orig_filename = STDOUT
    else:
        filename = orig_filename = args.output
    if not args.location:
        parser.print_usage()
        sys.exit(2)
    location = S3URL(args.location)
    try:
        bucket = get_bucket(location.bucket)
    except (NoCredentialsError, ValueError) as error:
        put_error(f'AWS: {error!s}')
        sys.exit(2)
    prefix = location.key
    if not prefix:
        put_error(f'Not found prefix {prefix!r} on s3://{bucket.name}')
        sys.exit(2)
    elif '/' not in prefix:
        # Guard against accidentally combining an entire bucket.
        put_error(f'Too short prefix {prefix!r} on s3://{bucket.name}')
        sys.exit(2)
    put_verbose(bucket, prefix)
    # Sort by key so the parts are concatenated in lexicographic order.
    objects = sorted(bucket.objects.filter(Prefix=prefix), key=lambda o: o.key)
    object_count = len(objects)
    print(f'Found {object_count} files on s3://{bucket.name}/{prefix}*')
    if object_count > args.limit:
        # Interactive confirmation; empty input counts as "yes".
        confirm = input(f'There are more than {args.limit} files ({object_count}). continue? [Y/n]')
        if confirm not in ['', 'y', 'Y']:
            put_error('Break.')
            sys.exit()
    elif object_count < 1:
        sys.exit()
    if args.check:
        # --check: report the count only; no download, combine or delete.
        sys.exit()
    with output(filename) as fd:
        print_progress(0, object_count)
        for index, obj in enumerate(objects):
            put_verbose(f's3://{bucket.name}/{obj.key}', file=verbose_file)
            if obj.key.endswith('.gz'):
                # Decompress .gz parts so the combined output is plain bytes.
                with gzip.GzipFile(fileobj=obj.Object().get()['Body']) as gzd:
                    fd.write(gzd.read())
            else:
                bucket.download_fileobj(obj.key, fd)
            fd.flush()
            print_progress(index + 1, object_count)
    if args.delete:
        # delete_objects accepts at most 1000 keys per request; grouper pads
        # the last chunk with None, filtered out below.
        for objects_chunk in grouper(objects, 1000):
            objects_to_delete = [{'Key': obj.key} for obj in objects_chunk if obj is not None]
            s3.meta.client.delete_objects(
                Bucket=bucket.name,
                Delete={'Objects': objects_to_delete}
            )
        print(f'Delete {len(objects)} files on s3://{bucket.name}/{prefix}*')
    if isinstance(orig_filename, S3URL):
        # Upload the locally combined temp file to the requested S3 target.
        # NOTE(review): the handle passed to put() is never explicitly closed,
        # and output() opens in append mode, so a rerun after a failed upload
        # would append to the leftover temp file — confirm intended.
        obj = orig_filename.get_object()
        obj.put(Body=open(filename, 'rb'))
        os.remove(filename)
        print(f'Write to {orig_filename.url}')
    elif orig_filename != STDOUT:
        print(f'Write to {orig_filename}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment