@piraka9011
Last active August 18, 2022 15:53
Transfer objects from AWS S3 to Wasabi S3 using boto3. Run `list_keys.py` first, then `move_keys.py`.
list_keys.py

import boto3
from tqdm import tqdm

# Estimate taken from the S3 console; only used to size the progress bar
NUM_OBJECTS = 17292115


def list_objects_parallel(bucket, prefix):
    objects = []
    pbar = tqdm(total=NUM_OBJECTS)
    s3_client = boto3.client("s3")
    paginator = s3_client.get_paginator("list_objects_v2")
    pagination_config = {
        # 1000 is the maximum page size the S3 API allows anyway
        "PageSize": 1000,
    }
    # Page through every object under the prefix and record its key
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, PaginationConfig=pagination_config):
        # "Contents" is missing when a page (or the whole prefix) is empty
        results = [f"{key['Key']}\n" for key in page.get("Contents", [])]
        objects.extend(results)
        pbar.update(len(results))
    return objects


if __name__ == "__main__":
    bucket_name = "my-bucket-name"
    key_prefix = "key-prefix/"
    output_filename = "all_keys.txt"
    all_keys = list_objects_parallel(bucket_name, key_prefix)
    with open(output_filename, "w") as fd:
        fd.writelines(all_keys)
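
The `NUM_OBJECTS` constant only sizes the progress bar. If you would rather not read the estimate off the S3 console, the daily `NumberOfObjects` CloudWatch metric can supply it programmatically. A minimal sketch, assuming the bucket lives in `us-west-2`; the `count_objects_estimate` helper is illustrative and not part of the gist:

from datetime import datetime, timedelta

import boto3


def count_objects_estimate(bucket):
    """Return the most recent daily NumberOfObjects datapoint for the bucket."""
    # Hypothetical helper, not part of the original gist.
    cloudwatch = boto3.client("cloudwatch", region_name="us-west-2")
    response = cloudwatch.get_metric_statistics(
        Namespace="AWS/S3",
        MetricName="NumberOfObjects",
        Dimensions=[
            {"Name": "BucketName", "Value": bucket},
            {"Name": "StorageType", "Value": "AllStorageTypes"},
        ],
        StartTime=datetime.utcnow() - timedelta(days=2),
        EndTime=datetime.utcnow(),
        Period=86400,
        Statistics=["Average"],
    )
    datapoints = sorted(response["Datapoints"], key=lambda d: d["Timestamp"])
    return int(datapoints[-1]["Average"]) if datapoints else 0

The metric is published roughly once a day, so treat the value as an estimate, which is all the progress bar needs.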
move_keys.py

from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache, partial
from itertools import chain
import multiprocessing

import boto3
from botocore.errorfactory import ClientError
from tqdm import tqdm


@lru_cache()
def aws_s3_client():
    # Source (AWS) client. Update the region and keys accordingly.
    return boto3.client(
        "s3",
        region_name="us-west-2",
        aws_access_key_id="***",
        aws_secret_access_key="***",
    )


@lru_cache()
def wasabi_s3_client():
    # Destination (Wasabi) client. Update the endpoint, region, and keys accordingly.
    return boto3.client(
        "s3",
        endpoint_url="https://s3.us-west-1.wasabisys.com",
        region_name="us-west-1",
        aws_access_key_id="***",
        aws_secret_access_key="***",
    )


def chunks(l, n):
    """Yield n striped chunks from l."""
    for i in range(0, n):
        yield l[i::n]


def move_file(key, bucket, src_client, dest_client):
    try:
        # Skip keys that already exist in the destination bucket.
        # head_object raises a ClientError (404) when the key is missing,
        # which is the signal to copy it over. Remove the head_object call
        # if you want to copy unconditionally.
        dest_client.head_object(Bucket=bucket, Key=key)
    except ClientError:
        original_object = src_client.get_object(
            Bucket=bucket,
            Key=key,
        )
        dest_client.put_object(
            Bucket=bucket, Key=key, Body=original_object["Body"].read()
        )


def process_chunk(chunk, bucket, max_workers=32):
    aws_s3 = aws_s3_client()
    wasabi_s3 = wasabi_s3_client()
    _move_file = partial(move_file, bucket=bucket, src_client=aws_s3, dest_client=wasabi_s3)
    failed_moves = []
    keys, position = chunk
    with tqdm(total=len(keys), position=position) as pbar:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(_move_file, key): key
                for key in keys
            }
            for future in as_completed(futures):
                if future.exception():
                    failed_key = futures[future]
                    failed_moves.append(failed_key)
                    pbar.set_postfix_str(f"Failed: {failed_key}")
                pbar.update(1)
    return failed_moves


if __name__ == "__main__":
    bucket_name = "my-bucket-name"
    # Adjust based on the output of `nproc` and your CPU's threads per core (`lscpu`)
    num_proc = 126
    max_workers = 32
    filename = "all_keys.txt"
    with open(filename, "r") as fd:
        lines = fd.readlines()
    # Strip the trailing newline (\n) and surrounding whitespace
    lines = [line.strip() for line in lines]
    line_chunks = list(chunks(lines, num_proc))
    # Pair each chunk with an index to position its progress bar
    idx = range(len(line_chunks))
    iterables = list(zip(line_chunks, idx))
    _process_chunk = partial(process_chunk, bucket=bucket_name, max_workers=max_workers)
    with multiprocessing.Pool(num_proc) as p:
        result = list(p.imap(_process_chunk, iterables))
    result = list(chain(*result))
    if len(result) > 0:
        failed_moves = [f"{item}\n" for item in result]
        with open("failed_keys.txt", "w") as fd:
            fd.writelines(failed_moves)
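
Any keys that fail to copy are written to `failed_keys.txt`. One way to recover is to feed that file back through the same `move_file` helper. A minimal sequential sketch, assuming it is saved next to `move_keys.py`; the retry script itself is illustrative, not part of the gist:

# Hypothetical retry script, not part of the original gist.
# Reuses the clients and move_file from move_keys.py; the __main__ guard
# there keeps the full transfer from running again on import.
from move_keys import aws_s3_client, move_file, wasabi_s3_client

bucket_name = "my-bucket-name"

with open("failed_keys.txt", "r") as fd:
    failed_keys = [line.strip() for line in fd if line.strip()]

still_failing = []
for key in failed_keys:
    try:
        move_file(key, bucket_name, aws_s3_client(), wasabi_s3_client())
    except Exception:
        still_failing.append(key)

# Overwrite failed_keys.txt with whatever still did not copy
with open("failed_keys.txt", "w") as fd:
    fd.writelines(f"{key}\n" for key in still_failing)

If the failure list is large, the same chunking and ThreadPoolExecutor setup from `process_chunk` applies unchanged.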