Skip to content

Instantly share code, notes, and snippets.

@mattwang44
Last active April 2, 2024 07:56
Show Gist options
  • Star 20 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save mattwang44/0c2e0e244b9e5f901f3881d5f1e85d3a to your computer and use it in GitHub Desktop.
Save mattwang44/0c2e0e244b9e5f901f3881d5f1e85d3a to your computer and use it in GitHub Desktop.
Asynchronous download from AWS S3 using aiobotocore, followed by a multiprocessing variant and a synchronous botocore version
import os
import asyncio
import aiobotocore
import io
from PIL import Image
# AWS credentials are read from the process environment. Indexing
# os.environ raises KeyError at import time if either variable is unset.
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
async def getS3Obj(client, s3_bucket, s3_key):
    """Fetch one object from S3 and return its content.

    Args:
        client: S3 client whose ``get_object`` call is awaitable.
        s3_bucket: Name of the bucket that holds the object.
        s3_key: Key of the object inside the bucket.

    Returns:
        The result of awaiting ``read()`` on the response body.
    """
    resp = await client.get_object(Bucket=s3_bucket, Key=s3_key)
    body = resp['Body']
    # Enter the body as an async context manager so the underlying
    # connection is released even when read() raises part-way through.
    async with body:
        return await body.read()
async def downloadImg(client, s3_bucket, s3_key, save_path):
    """Download one S3 object, decode it as an image and write it to disk.

    Args:
        client: S3 client handed through to ``getS3Obj``.
        s3_bucket: Bucket that holds the object.
        s3_key: Key of the object inside the bucket.
        save_path: Destination passed to ``Image.save``.
    """
    raw_bytes = await getS3Obj(client, s3_bucket, s3_key)
    # Decoding and saving are plain (non-awaited) PIL calls.
    Image.open(io.BytesIO(raw_bytes)).save(save_path)
async def go(loop, download_list):
    """Open one S3 client and download every listed file concurrently.

    Args:
        loop: Event loop handed to ``aiobotocore.get_session``.
        download_list: Iterable of argument tuples; each one is unpacked
            after ``client`` into ``downloadImg`` (bucket, key, save path).
    """
    # NOTE(review): the top-level ``aiobotocore.get_session`` helper and its
    # ``loop`` keyword appear to exist only in older aiobotocore releases;
    # confirm against the pinned version.
    session = aiobotocore.get_session(loop=loop)
    credentials = {
        'aws_secret_access_key': AWS_SECRET_ACCESS_KEY,
        'aws_access_key_id': AWS_ACCESS_KEY_ID,
    }
    async with session.create_client('s3', **credentials) as client:
        # All coroutines share the one client and are awaited together.
        downloads = [downloadImg(client, *entry) for entry in download_list]
        await asyncio.gather(*downloads)
if __name__ == "__main__":
    # Each entry is (bucket, key, local save path), matching the
    # positional parameters of downloadImg after ``client``.
    download_list = [
        ("s3_bucket1", "s3_key1", "save_path1"),
        ("s3_bucket2", "s3_key2", "save_path2"),
        ("s3_bucket3", "s3_key3", "save_path3"),
    ]
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(go(loop, download_list))
    finally:
        # Close the loop explicitly so its resources are released even
        # when a download raises.
        loop.close()
import os
import asyncio
import aiobotocore
import multiprocessing as mp
import io
from PIL import Image
# Credentials for the S3 client, taken from environment variables of the
# same names. A missing variable raises KeyError when the module is imported.
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
async def getS3Obj(client, s3_bucket, s3_key):
    """Request a single S3 object and return what its body reads as.

    Args:
        client: S3 client exposing an awaitable ``get_object``.
        s3_bucket: Bucket name.
        s3_key: Object key within the bucket.

    Returns:
        The value produced by awaiting ``read()`` on the response body.
    """
    resp = await client.get_object(Bucket=s3_bucket, Key=s3_key)
    body = resp['Body']
    # The body is used as an async context manager so that the connection
    # behind it is always handed back, including when read() fails.
    async with body:
        return await body.read()
async def downloadImg(client, s3_bucket, s3_key, save_path):
    """Fetch an object from S3, open it with PIL and save it locally.

    Args:
        client: S3 client forwarded to ``getS3Obj``.
        s3_bucket: Bucket name.
        s3_key: Object key within the bucket.
        save_path: Target given to ``Image.save``.
    """
    payload = await getS3Obj(client, s3_bucket, s3_key)
    buffer = io.BytesIO(payload)
    # PIL work below is synchronous; only the fetch above is awaited.
    Image.open(buffer).save(save_path)
async def go(loop, download_list):
    """Download all files of one chunk concurrently through a single client.

    Args:
        loop: Event loop passed on to ``aiobotocore.get_session``.
        download_list: Iterable of tuples unpacked into ``downloadImg``
            after the client argument (bucket, key, save path).
    """
    # NOTE(review): ``aiobotocore.get_session(loop=...)`` looks tied to an
    # older aiobotocore API; verify it against the installed release.
    session = aiobotocore.get_session(loop=loop)
    client_kwargs = {
        'aws_secret_access_key': AWS_SECRET_ACCESS_KEY,
        'aws_access_key_id': AWS_ACCESS_KEY_ID,
    }
    async with session.create_client('s3', **client_kwargs) as client:
        coroutines = [downloadImg(client, *item) for item in download_list]
        # gather() awaits every download; an empty list completes at once.
        await asyncio.gather(*coroutines)
def singleProcess(download_list_):
    """Download one chunk of files on a dedicated event loop.

    This is the function mapped over the chunks by the process pool, so a
    worker may call it several times.

    Args:
        download_list_: The chunk of (bucket, key, save path) tuples to
            hand to ``go``.
    """
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(go(loop, download_list_))
    finally:
        # A new loop is created on every call; close it so repeated calls
        # in the same worker process do not accumulate open loops.
        loop.close()
if __name__ == "__main__":
    # Each entry is (bucket, key, local save path).
    download_list = [
        ("s3_bucket1", "s3_key1", "save_path1"),
        ("s3_bucket2", "s3_key2", "save_path2"),
        ("s3_bucket3", "s3_key3", "save_path3"),
    ]
    # number of processes
    no_mp = 8  # or no_mp = mp.cpu_count()
    # upper bound on the number of chunks (tasks) handed to the pool
    no_tasks = 16
    # Stride-slice the list into chunks. Capping the chunk count at the
    # number of entries avoids empty chunks, each of which would still
    # start an event loop and an S3 client in a worker for nothing.
    no_chunks = min(no_tasks, len(download_list))
    download_list_chunk = [
        download_list[i::no_tasks] for i in range(no_chunks)
    ]
    with mp.Pool(no_mp) as p:
        p.map(singleProcess, download_list_chunk)
import os
import botocore.session
import io
from PIL import Image
# Both credentials must be present in the environment: os.environ[...] is a
# plain lookup and raises KeyError during import when a variable is missing.
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
def getS3Obj(client, s3_bucket, s3_key):
    """Fetch one object from S3 synchronously and return its content.

    Args:
        client: S3 client providing ``get_object``.
        s3_bucket: Name of the bucket that holds the object.
        s3_key: Key of the object inside the bucket.

    Returns:
        Whatever ``read()`` on the response body returns.
    """
    resp = client.get_object(Bucket=s3_bucket, Key=s3_key)
    body = resp['Body']
    try:
        return body.read()
    finally:
        # Always close the streaming body so its connection is released,
        # including when read() raises.
        body.close()
def downloadImg(client, s3_bucket, s3_key, save_path):
    """Download one S3 object, open it as an image and save it.

    Args:
        client: S3 client forwarded to ``getS3Obj``.
        s3_bucket: Bucket that holds the object.
        s3_key: Key of the object inside the bucket.
        save_path: Destination passed to ``Image.save``.
    """
    content = getS3Obj(client, s3_bucket, s3_key)
    # Wrap the downloaded content in an in-memory file for PIL.
    Image.open(io.BytesIO(content)).save(save_path)
def go(download_list):
    """Download every listed file sequentially with one botocore client.

    Args:
        download_list: Iterable of tuples unpacked into ``downloadImg``
            after the client argument (bucket, key, save path).
    """
    session = botocore.session.get_session()
    # The synchronous botocore client is created as a plain object here:
    # to my knowledge it does not implement the context-manager protocol,
    # so using it in a ``with`` statement fails before any download starts.
    client = session.create_client(
        's3',
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
    )
    try:
        # Plain loop: the downloads are side effects, so there is no
        # result list worth building.
        for file_ in download_list:
            downloadImg(client, *file_)
    finally:
        # Older botocore releases may not provide close(); call it only
        # when it is available.
        close = getattr(client, 'close', None)
        if close is not None:
            close()
if __name__ == "__main__":
    # Each tuple is unpacked into downloadImg as
    # (bucket, key, local save path).
    download_list = [
        ("s3_bucket1", "s3_key1", "save_path1"),
        ("s3_bucket2", "s3_key2", "save_path2"),
        ("s3_bucket3", "s3_key3", "save_path3"),
    ]
    go(download_list)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment