"""
Using dask's multithreaded scheduler to speed up the download of multiple files
from an s3 bucket
"""
import os
from functools import partial

import botocore
import boto3
import dask
from dask.diagnostics import ProgressBar

# playing with the number of threads can increase / decrease the throughput
dask.config.set(scheduler='threads', num_workers=20)


def _s3_download(s3_client, path, bucket, key):
    """Wrapper that avoids crashing on objects that are not found.

    s3_client: s3 resource service client
    path: local directory in which to store the downloaded file
    bucket: bucket in which to find the file
    key: key of the file
    """
    try:
        s3_client.Bucket(bucket).download_file(
            key, os.path.join(path, key)
        )
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            print('The object does not exist')
        else:
            raise


def fetch_multiple(aws_access_key_id, aws_secret_access_key, bucket, keys,
                   path):
    """Initialise an s3 Session and download a list of files.

    aws_access_key_id: access key
    aws_secret_access_key: secret key
    bucket: s3 bucket where the files are stored
    keys: list of keys to download
    path: local directory in which to store the downloaded files
    """
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    s3 = session.resource('s3')
    _download = partial(_s3_download, s3, path, bucket)

    # build one delayed task per key; dask's threaded scheduler runs the
    # downloads concurrently when compute() is called
    delayed_futures = []
    for k in keys:
        delayed_futures.append(dask.delayed(_download)(k))

    with ProgressBar():
        dask.compute(*delayed_futures)
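

if __name__ == '__main__':
    # Minimal usage sketch: the credentials, bucket name, keys and local path
    # below are placeholders (assumptions), not values from the original gist.
    download_dir = '/tmp/s3_downloads'
    os.makedirs(download_dir, exist_ok=True)

    fetch_multiple(
        aws_access_key_id='YOUR_ACCESS_KEY_ID',
        aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
        bucket='my-example-bucket',
        keys=['file_1.csv', 'file_2.csv', 'file_3.csv'],
        path=download_dir,
    )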