@jcary741
Created August 16, 2022 17:55
Python upload files to S3 in parallel using concurrent.futures and boto3
"""
An example of how to use boto3 and a concurrent.futures process pool to upload files in parallel.
Bonus: when uploading files ending in .gz, the Content-Encoding metadata will be set automatically.
"""
import glob
import os
from concurrent.futures import ProcessPoolExecutor, as_completed

import boto3


def concurrent_s3_upload(files, bucket, prefix, metadata, remove_gz_extension=True):
    """
    Upload a list of files to S3 concurrently.

    :param files: list of files to upload
    :param bucket: S3 bucket to upload to
    :param prefix: S3 key prefix to upload under
    :param metadata: S3 metadata to apply to each file
    :param remove_gz_extension: whether to remove the .gz extension from the key and apply ContentEncoding=gzip
    """
    prefix = prefix.strip('/')
    batch_size = 1000  # to reduce memory usage, futures are submitted in batches
    offset = 0
    completed = 0
    file_futures = dict()
    failed = []
    # spawn slightly more workers than available CPUs to keep the bottleneck at IO
    with ProcessPoolExecutor(max_workers=int(os.cpu_count() * 1.5)) as executor:
        while offset < len(files):
            # submit futures for the next batch of files
            futures = []
            for file in files[offset:offset + batch_size]:
                future = executor.submit(_concurrent_s3_upload, file, bucket, prefix, metadata, remove_gz_extension)
                futures.append(future)
                # keep track of which file each future belongs to, so we can retry it if it fails
                file_futures[future] = file
            for future in as_completed(futures):
                try:
                    future.result()
                    completed += 1
                except Exception:
                    failed.append(file_futures[future])
                print(f'{completed}/{len(files)} uploaded. {len(failed)} failed, pending retry', end='\r')
            offset += batch_size
        # retry failed uploads once
        if not failed:
            print(f'{completed}/{len(files)} uploaded. 0 failed, no retry')
        else:
            print(f'retrying {len(failed)} failed uploads')
            futures = []
            for file in failed:
                future = executor.submit(_concurrent_s3_upload, file, bucket, prefix, metadata, remove_gz_extension)
                futures.append(future)
                # keep track of which file each future belongs to, so we can list it if it fails again
                file_futures[future] = file
            failed = []
            for future in as_completed(futures):
                try:
                    future.result()
                    completed += 1
                except Exception:
                    failed.append(file_futures[future])
                print(f'{completed}/{len(files)} uploaded, {len(failed)} failed', end='\r')
    if len(failed) > 0:
        print(f'{len(failed)} failed uploads:')
        for file in failed:
            print(file)

# per-worker boto3 resource, created lazily in each process on first use
s3 = None


def _concurrent_s3_upload(file, bucket, prefix, metadata=None, remove_gz_extension=True):
    global s3
    extra_args = {}
    if metadata is not None:
        extra_args['Metadata'] = metadata
    s3_path = f'{prefix}/{file}'
    # if the file has a .gz extension, drop it from the key and mark the object as gzip-encoded
    if remove_gz_extension and file.endswith('.gz'):
        s3_path = f'{prefix}/{file[:-3]}'
        # See also: s3transfer/manager.py:159
        extra_args['ContentEncoding'] = 'gzip'
    if s3_path.endswith('.json'):
        extra_args['ContentType'] = 'application/json'
    if s3 is None:
        s3 = boto3.resource('s3')
    s3.Bucket(bucket).upload_file(file, s3_path, ExtraArgs=extra_args)
    return f's3://{bucket}/{s3_path}'

if __name__ == '__main__':
    # guard the example driver so worker processes can import this module without re-running it
    os.chdir(os.path.dirname(os.path.realpath(__file__)))
    files = glob.glob('path/to/files/*.json.gz')
    concurrent_s3_upload(files, 'mybucket', '/custom/prefix/', {})
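
As a quick sanity check after a run, a separate snippet like this (not part of the original gist; the bucket name and object key are placeholders) can use head_object to confirm that the gzip Content-Encoding, the JSON Content-Type, and the custom metadata were applied to an uploaded object:

import boto3

s3_client = boto3.client('s3')
head = s3_client.head_object(Bucket='mybucket', Key='custom/prefix/path/to/files/example.json')
print(head.get('ContentEncoding'))  # expect 'gzip' for objects uploaded from .gz files
print(head.get('ContentType'))      # expect 'application/json' for keys ending in .json
print(head.get('Metadata'))         # the metadata dict passed to concurrent_s3_upload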