Skip to content

Instantly share code, notes, and snippets.

@tkyaji
Last active July 8, 2024 09:55
Show Gist options
  • Save tkyaji/307b638975de8b565cb3e4e331fad398 to your computer and use it in GitHub Desktop.
Save tkyaji/307b638975de8b565cb3e4e331fad398 to your computer and use it in GitHub Desktop.
import sys
import time
import argparse
import boto3
import urllib.parse
def normalize_file_size(file_bytes):
    """Scale a byte count to the largest 1024-based unit that fits.

    Args:
        file_bytes: Size in bytes.

    Returns:
        A ``(value, unit)`` tuple. ``unit`` is one of ``'GB'``, ``'MB'``,
        ``'KB'`` or ``'B'``. Scaled values are rounded to two decimals;
        sizes below 1024 bytes are returned unchanged with unit ``'B'``.
    """
    # Ordered from the largest unit down so the first match wins.
    units = (
        ('GB', 1024 * 1024 * 1024),
        ('MB', 1024 * 1024),
        ('KB', 1024),
    )
    for unit, factor in units:
        # ">=" (not ">") so an exact boundary such as 1024 bytes is reported
        # as 1.0 KB instead of falling through to the next smaller unit.
        if file_bytes >= factor:
            return round(file_bytes / factor, 2), unit
    return file_bytes, 'B'
def get_file_size_str(file_bytes):
    """Return *file_bytes* formatted as ``'<value> <unit>'``.

    The value and unit come from ``normalize_file_size``.
    """
    size_and_unit = normalize_file_size(file_bytes)
    return '{} {}'.format(*size_and_unit)
def get_file_size_per_unit(file_bytes, unit):
    """Express a byte count in the requested unit.

    Args:
        file_bytes: Size in bytes.
        unit: ``'GB'``, ``'MB'`` or ``'KB'``; any other value means bytes.

    Returns:
        ``file_bytes`` divided by the unit's 1024-based factor and rounded
        to two decimals, or ``file_bytes`` unchanged for any other unit.
    """
    divisors = (
        ('GB', 1024 * 1024 * 1024),
        ('MB', 1024 * 1024),
        ('KB', 1024),
    )
    for name, divisor in divisors:
        if unit == name:
            return round(file_bytes / divisor, 2)
    # Unrecognised unit: hand the raw byte count back untouched.
    return file_bytes
def download_progress_callback_main(key, downloaded, file_size, file_size_u, file_unit, download_start_tm):
    """Redraw the one-line download progress bar on stdout.

    The line ends with a carriage return instead of a newline, so each call
    overwrites the previous one.

    Args:
        key: Label printed at the start of the line (the object key).
        downloaded: Number of bytes transferred so far.
        file_size: Total size of the object in bytes.
        file_size_u: Total size already converted to ``file_unit``.
        file_unit: Unit label used for both the downloaded and total figures.
        download_start_tm: ``time.time()`` value taken when the download began.
    """
    bar_width = 50
    downloaded_u = get_file_size_per_unit(downloaded, file_unit)
    if file_size > 0:
        # Clamp so a byte count above the total cannot overflow the bar.
        done = min(bar_width, int((bar_width * downloaded) / file_size))
    else:
        # A zero-byte object is complete by definition; this also avoids
        # dividing by zero.
        done = bar_width
    # The five trailing spaces wipe leftovers from a previously longer line.
    print((' {0} : [{1}>{2}] | {3}{4} / {5}{6} | {7}sec' + (' ' * 5)).format(
        key,
        '=' * done,
        ' ' * (bar_width - done),
        downloaded_u,
        file_unit,
        file_size_u,
        file_unit,
        int(time.time() - download_start_tm),
    ), end='\r')
def main():
    """Download one object from an S3-compatible endpoint, showing progress.

    Command-line arguments:
        --url         ``<scheme>://<host>/<bucket>/<key>``; the first path
                      segment is used as the bucket, the rest as the key.
        --access-key  Access key passed to the S3 client.
        --secret-key  Secret key passed to the S3 client.
        -o/--out-file Destination path; defaults to the last path segment
                      of the key.

    On failure the exception is printed to stderr, a failure message is
    printed, and the process exits with status 1.
    """
    # Local import: only the progress-counter lock below needs it.
    import threading

    parser = argparse.ArgumentParser()
    parser.add_argument('--url', required=True)
    parser.add_argument('--access-key', required=True)
    parser.add_argument('--secret-key', required=True)
    parser.add_argument('-o', '--out-file')
    args = parser.parse_args()

    # Split the URL into the endpoint (scheme + host) and bucket/key path.
    url_parts = urllib.parse.urlsplit(args.url)
    endpoint_url = urllib.parse.urlunparse((url_parts.scheme, url_parts.netloc, '', '', '', ''))
    path_parts = url_parts.path.lstrip('/').split('/')
    bucket_name = path_parts[0]
    key = '/'.join(path_parts[1:])

    out_file = args.out_file
    if out_file is None:
        out_file = key.split('/')[-1]

    try:
        s3_client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=args.access_key,
            aws_secret_access_key=args.secret_key,
        )

        # Fetch the total size up front so the progress bar has a denominator.
        head = s3_client.head_object(Bucket=bucket_name, Key=key)
        file_size = head.get('ContentLength')
        file_size_u, file_unit = normalize_file_size(file_size)

        download_start_tm = time.time()
        downloaded = 0
        # boto3 may invoke the callback from several transfer threads, so the
        # running total and the redraw are serialised.
        progress_lock = threading.Lock()

        def download_progress_callback(chunk):
            """Add the newly transferred byte count and redraw the bar."""
            nonlocal downloaded
            with progress_lock:
                downloaded += chunk
                download_progress_callback_main(key, downloaded, file_size, file_size_u, file_unit, download_start_tm)

        s3_client.download_file(Bucket=bucket_name, Key=key, Filename=out_file, Callback=download_progress_callback)
        print('\nSuccessfully downloaded.', f'=> {out_file}')

    except Exception as ex:
        print(ex, file=sys.stderr)
        print('\nFailed to download.', args.url)
        # Signal the failure to the calling shell instead of exiting with 0.
        sys.exit(1)
# Run the downloader only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment