@sjakthol
Created January 11, 2019 20:20
Python code snippet for listing bucket objects in parallel.
#!/usr/bin/env python3
"""List S3 bucket objects in parallel.

This module contains a parallel implementation of the S3 list_objects_v2()
API call. The implementation lists objects under every distinct prefix
in parallel. Hence, a speedup is achieved if objects are spread under
multiple distinct prefixes.
"""
import argparse
import concurrent.futures
import functools
import json
import logging
import multiprocessing
import threading
import time

import botocore
import botocore.session

LOGGER = logging.getLogger('list_objects')


@functools.lru_cache()
def _get_s3_client(_):
    """Get a thread-specific S3 client.

    The cache is keyed on the calling thread object so that each
    thread gets its own client instance.

    Args:
        _ (threading.Thread): the calling thread (used only as a cache key)

    Returns:
        (S3.Client) botocore S3 client
    """
    session = botocore.session.get_session()
    return session.create_client('s3')


def _list_objects_v2(Bucket, Prefix='/', Delimiter='/'):  # pylint: disable=invalid-name
    """List ALL objects of a bucket under the given prefix.

    Args:
        :Bucket (str): the name of the bucket to list
        :Prefix (str, optional): a prefix of the bucket to list (Default: '/')
        :Delimiter (str, optional): delimiter used to separate directories in S3 (Default: '/')

    Returns:
        obj: The list of objects and directories under the given Prefix::

            {
                'Contents': [{
                    'Key': 'prefix/file.json',
                    'LastModified': datetime.datetime(2018, 12, 13, 14, 15, 16),
                    'ETag': '"58bcd9641b1176ea012b6377eb5ce050"',
                    'Size': 262756,
                    'StorageClass': 'STANDARD'
                }],
                'CommonPrefixes': [{
                    'Prefix': 'prefix/another/'
                }]
            }
    """
    s3_client = _get_s3_client(threading.current_thread())
    paginator = s3_client.get_paginator('list_objects_v2')

    objects = []
    prefixes = []
    LOGGER.debug('Starting to list s3://%s/%s', Bucket, Prefix)
    for resp in paginator.paginate(Bucket=Bucket, Prefix=Prefix, Delimiter=Delimiter):
        objects.extend(resp.get('Contents', []))
        prefixes.extend(resp.get('CommonPrefixes', []))

    return {'Contents': objects, 'CommonPrefixes': prefixes}


def list_objects_parallel(Bucket, Prefix='/', Delimiter='/', Parallelism=None):  # pylint: disable=invalid-name
    """List objects of a bucket in parallel.

    The bucket must have a directory structure for speedups to be
    realized (each common prefix is listed in parallel).

    Args:
        :Bucket (str): the name of the bucket to list
        :Prefix (str, optional): a prefix of the bucket to list (Default: '/')
        :Delimiter (str, optional): delimiter used to separate directories in S3 (Default: '/')
        :Parallelism (int, optional): the number of threads to use (Default: 10x CPU count)

    Returns:
        obj: The list of objects under the given bucket / prefix::

            {
                'Contents': [{
                    'Key': 'prefix/file.json',
                    'LastModified': datetime.datetime(2018, 12, 13, 14, 15, 16),
                    'ETag': '"58bcd9641b1176ea012b6377eb5ce050"',
                    'Size': 262756,
                    'StorageClass': 'STANDARD'
                }]
            }
    """
    objects = []
    tasks = set()

    if not Parallelism:
        # Heavily oversubscribe the CPU as these operations are mostly
        # network-bound
        Parallelism = multiprocessing.cpu_count() * 10

    with concurrent.futures.ThreadPoolExecutor(max_workers=Parallelism) as tpe:
        # Seed the work queue with the root prefix; each completed listing
        # may discover new common prefixes that are submitted as new tasks.
        tasks.add(tpe.submit(_list_objects_v2, Bucket=Bucket, Prefix=Prefix, Delimiter=Delimiter))
        while tasks:
            done, _ = concurrent.futures.wait(tasks, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                res = future.result()
                objects.extend(res['Contents'])
                for prefix in res['CommonPrefixes']:
                    tasks.add(
                        tpe.submit(_list_objects_v2, Bucket=Bucket, Prefix=prefix['Prefix'], Delimiter=Delimiter))

            tasks = tasks - done

    return {'Contents': objects}


def main():
    """Entrypoint"""
    parser = argparse.ArgumentParser(description='''
        Parallel S3 bucket listing utility.
    ''')
    parser.add_argument('-b', '--bucket', type=str, required=True)
    # Default to the empty prefix; passing None to list_objects_v2() would
    # fail botocore's parameter validation.
    parser.add_argument('-p', '--prefix', type=str, default='')
    parser.add_argument('-l', '--log-level', type=str, default='INFO')
    args = parser.parse_args()

    logging.basicConfig(level=args.log_level.upper())

    start = time.time()
    res = list_objects_parallel(args.bucket, args.prefix)
    logging.info('Finished in %s seconds', time.time() - start)
    for obj in res.get('Contents', []):
        print(json.dumps(obj, default=str))


if __name__ == '__main__':
    main()
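
For reference, a minimal usage sketch when importing the helper from another module. This assumes the gist is saved as list_objects.py on the import path and AWS credentials are configured; the bucket and prefix names are placeholders.

# Usage sketch (not part of the gist itself); 'my-bucket' and 'data/'
# are placeholder values.
from list_objects import list_objects_parallel

result = list_objects_parallel('my-bucket', Prefix='data/')
total_size = sum(obj['Size'] for obj in result['Contents'])
print(len(result['Contents']), 'objects,', total_size, 'bytes')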
@nvinayvarma189
I was looking for exactly something like this. It helped me reduce a retrieval task from 38 sec (using bucket.obj.filter) to 9 sec. Thank you very much 🙏
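
For comparison, the sequential baseline the commenter presumably refers to is boto3's resource API (bucket.objects.filter). A rough sketch, with placeholder bucket and prefix names:

# Sequential baseline using boto3's resource API; iterating the
# collection pages through the bucket one ListObjects call at a time.
import boto3

bucket = boto3.resource('s3').Bucket('my-bucket')
objects = list(bucket.objects.filter(Prefix='data/'))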
