Skip to content

Instantly share code, notes, and snippets.

@alexklibisz
Created February 12, 2019 01:58
Show Gist options
  • Save alexklibisz/986527d9ca8707df69e396ee24893d09 to your computer and use it in GitHub Desktop.
Using a threadpool to speedup S3 downloads in Python
from concurrent.futures.thread import ThreadPoolExecutor
from pprint import pprint
from time import time
import boto3
import os
import botocore
import shell as shell  # NOTE(review): appears unused and is not stdlib/boto3 — likely an accidental IDE auto-import; confirm before removing
bucket = "aft-vbi-pds" # Source bucket.
prefix = "bin-images/000" # Source prefix.
out_dir = "/tmp/bin-images" # Directory to save downloaded files.
def s3obj_to_file(s3_pointer, client=None, src_bucket=None, src_prefix=None, dest_dir=None):
    """Download one S3 object and save it to a local file.

    Takes an S3 pointer (an entry from the ``Contents`` list returned by
    ``list_objects``) and writes the object's bytes to ``dest_dir``, naming
    the file after the object key with ``src_prefix`` stripped.

    The new keyword parameters default to the module-level globals, so the
    existing call sites (``s3obj_to_file(obj)``) behave exactly as before,
    while tests can inject a fake client and temp directory.
    """
    client = s3 if client is None else client
    src_bucket = bucket if src_bucket is None else src_bucket
    src_prefix = prefix if src_prefix is None else src_prefix
    dest_dir = out_dir if dest_dir is None else dest_dir
    key = s3_pointer["Key"]
    data = client.get_object(Bucket=src_bucket, Key=key).get("Body").read()
    # Plain string concatenation on purpose: the stripped key may start with
    # "/", which os.path.join would misread as an absolute path.
    fname = "%s/%s" % (dest_dir, key.replace(src_prefix, ""))
    with open(fname, "wb") as fp:
        fp.write(data)
def cleanup(target_dir=None):
    """Delete ``target_dir`` (defaults to the module-level ``out_dir``) and
    recreate it empty, so each timing run starts from a clean slate.
    """
    import shutil  # local import keeps this fix self-contained
    target_dir = out_dir if target_dir is None else target_dir
    # shutil/os instead of os.system("rm -rf ..."/"mkdir -p ..."): no
    # string-built shell command (no injection surface if the path ever
    # becomes user-supplied) and portable beyond POSIX shells.
    shutil.rmtree(target_dir, ignore_errors=True)
    os.makedirs(target_dir, exist_ok=True)
# Setup the s3 client. If you don't give it this type of config, it will print warnings at runtime.
# The config is just the number of parallel downloads that the s3 client can handle.
# Pool size scales with core count so the ThreadPoolExecutor below (default
# workers = cpu_count * 5 on Python 3.5-3.7) never starves for connections.
cfg = botocore.client.Config(max_pool_connections=os.cpu_count() * 5)
s3 = boto3.client("s3", config=cfg)
# Get the list of objects. Note that this does not actually download the data, just pointers to the objects.
# NOTE(review): list_objects returns at most one page of keys; if the prefix
# can hold more than ~1000 objects, switch to a paginator — confirm.
objects = s3.list_objects(Bucket=bucket, Prefix=prefix).get("Contents")
print("Found %d objects" % (len(objects)))
# Make directory where they get saved.
cleanup()
# Baseline timing: fetch every object one at a time with an ordinary loop.
t0 = time()
for pointer in objects:
    s3obj_to_file(pointer)
print("Elapsed time serial: %.3lf seconds" % (time() - t0))
print("Found %d objects in the output directory" % (len(os.listdir(out_dir))))
os.system("du -hs %s" % out_dir)
# Clean up again to make sure we're not cheating...
cleanup()
# Now do the same thing but using a threadpool. S3 downloads are I/O-bound,
# so threads overlap the network waits despite the GIL.
t0 = time()
with ThreadPoolExecutor() as tpex:
    # Executor.map returns a lazy iterator; if it is never consumed, any
    # exception raised in a worker is silently discarded. Draining it with
    # list(...) makes download failures propagate here instead.
    list(tpex.map(s3obj_to_file, objects))
print("Elapsed time with threads: %.3lf seconds" % (time() - t0))
print("Found %d objects in the output directory" % (len(os.listdir(out_dir))))
os.system("du -hs %s" % out_dir)
# This is the output from running on my macbook pro with 8 cores (40 threads).
Found 99 objects
Elapsed time serial: 7.937 seconds
Found 99 objects in the output directory
5.5M /tmp/bin-images
Elapsed time with threads: 1.331 seconds
Found 99 objects in the output directory
5.5M /tmp/bin-images
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment