@marazmiki
Forked from skonik/upload_to_s3.py
Created July 23, 2022 05:49
Asyncio S3 Multipart Upload
# Details: https://skonik.me/uploading-large-file-to-s3-using-aiobotocore/
import asyncio
import math
import os

import aiofiles
# aiobotocore >= 1.0 exposes get_session via aiobotocore.session;
# older releases used aiobotocore.get_session() instead.
from aiobotocore.session import get_session

AWS_S3_HOST = 'http://localhost:9000'
AWS_SECRET_ACCESS_KEY = 'SECRET_KEY'
AWS_ACCESS_KEY_ID = 'ACCESS_KEY'
# ~10 MB per part; S3 requires every part except the last to be at least 5 MiB.
AWS_MULTIPART_BYTES_PER_CHUNK = 10000000
AWS_S3_BUCKET_NAME = 'test'
# We have to keep track of the uploaded parts.
# https://github.com/boto/boto3/issues/50#issuecomment-72079954
part_info = {
    'Parts': []
}

# The file object is shared across coroutines and the async file library
# uses threads under the hood, so access to it is guarded by a lock to
# avoid data races on the shared file position.
file_shared_lock = asyncio.Lock()

async def upload_chunk(client, file,
                       upload_id, chunk_number,
                       bytes_per_chunk, source_size, key):
    offset = chunk_number * bytes_per_chunk
    remaining_bytes = source_size - offset
    bytes_to_read = min(bytes_per_chunk, remaining_bytes)
    part_number = chunk_number + 1

    # Hold the lock only while seeking and reading, so uploads of
    # different parts can still run concurrently.
    async with file_shared_lock:
        await file.seek(offset)
        chunk = await file.read(bytes_to_read)

    resp = await client.upload_part(
        Bucket=AWS_S3_BUCKET_NAME,
        Body=chunk,
        UploadId=upload_id,
        PartNumber=part_number,
        Key=key
    )

    global part_info
    part_info['Parts'].append(
        {
            'PartNumber': part_number,
            'ETag': resp['ETag']
        }
    )

async def begin_multipart_upload(from_local_path, to_s3_folder_path,
                                 host=AWS_S3_HOST,
                                 aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                 aws_access_key_id=AWS_ACCESS_KEY_ID,
                                 bytes_per_chunk=AWS_MULTIPART_BYTES_PER_CHUNK):
    filename = os.path.basename(from_local_path)
    key = '{}/{}'.format(to_s3_folder_path, filename)

    session = get_session()
    async with session.create_client(
            's3', endpoint_url=host,
            aws_secret_access_key=aws_secret_access_key,
            aws_access_key_id=aws_access_key_id
    ) as client:
        source_size = os.stat(from_local_path).st_size
        chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
        print('chunks_count: ', chunks_count)

        create_multipart_upload_resp = await client.create_multipart_upload(
            ACL='bucket-owner-full-control',
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
        )
        upload_id = create_multipart_upload_resp['UploadId']

        # Schedule one upload_chunk() coroutine per part and run them concurrently.
        tasks = []
        async with aiofiles.open(from_local_path, mode='rb') as file:
            for chunk_number in range(chunks_count):
                tasks.append(
                    upload_chunk(
                        client=client,
                        file=file,
                        chunk_number=chunk_number,
                        bytes_per_chunk=bytes_per_chunk,
                        key=key, upload_id=upload_id,
                        source_size=source_size
                    )
                )
            await asyncio.gather(*tasks)
        list_parts_resp = await client.list_parts(
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
            UploadId=upload_id
        )

        # Parts must be sorted by PartNumber in ascending order,
        # otherwise the API rejects the complete_multipart_upload request.
        part_info['Parts'] = sorted(part_info['Parts'], key=lambda k: k['PartNumber'])
        print(part_info['Parts'][0])
        print('COMPLETED ', len(part_info['Parts']))

        if len(list_parts_resp['Parts']) == chunks_count:
            print('Done uploading file')
            await client.complete_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id,
                MultipartUpload=part_info
            )
            return True
        else:
            print('Aborted uploading file.')
            await client.abort_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id
            )
            return False

if __name__ == '__main__':
    # asyncio.run() supersedes the deprecated get_event_loop()/run_until_complete() pattern.
    asyncio.run(begin_multipart_upload('./large.txt', to_s3_folder_path='large'))
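
Not part of the original gist: a minimal companion sketch of how the script might be exercised end to end against the local MinIO endpoint it assumes. It creates the target bucket, runs begin_multipart_upload, and then verifies the result with head_object; create_bucket and head_object are standard S3 client calls, while the module name upload_to_s3, the key 'large/large.txt', and the expectation that the bucket does not already exist are assumptions that simply mirror the gist's filename and default arguments.

# Companion sketch (assumptions: the gist is saved as upload_to_s3.py, MinIO is
# listening on http://localhost:9000, and the bucket does not already exist).
import asyncio

from aiobotocore.session import get_session

from upload_to_s3 import (AWS_ACCESS_KEY_ID, AWS_S3_BUCKET_NAME, AWS_S3_HOST,
                          AWS_SECRET_ACCESS_KEY, begin_multipart_upload)


async def main():
    session = get_session()
    async with session.create_client(
            's3', endpoint_url=AWS_S3_HOST,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            aws_access_key_id=AWS_ACCESS_KEY_ID
    ) as client:
        # Make sure the target bucket exists before starting the multipart upload.
        await client.create_bucket(Bucket=AWS_S3_BUCKET_NAME)

    uploaded = await begin_multipart_upload('./large.txt', to_s3_folder_path='large')
    if not uploaded:
        return

    async with session.create_client(
            's3', endpoint_url=AWS_S3_HOST,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            aws_access_key_id=AWS_ACCESS_KEY_ID
    ) as client:
        # Confirm the object landed under the expected key and check its size.
        head = await client.head_object(Bucket=AWS_S3_BUCKET_NAME,
                                        Key='large/large.txt')
        print('uploaded size:', head['ContentLength'])


if __name__ == '__main__':
    asyncio.run(main())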