Skip to content

Instantly share code, notes, and snippets.

@toshke
Created February 16, 2018 02:49
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save toshke/e96b454099e27600ee68f86e68c29b22 to your computer and use it in GitHub Desktop.
Save toshke/e96b454099e27600ee68f86e68c29b22 to your computer and use it in GitHub Desktop.
copy from s3 to s3
import boto3
import os
import zipfile
import glob
import logging
import shutil
# Use the root logger (getLogger() with no name) and emit INFO-level messages,
# which the copy steps below use to report each transfer.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class S3CopyLogic:
    """Copy data between S3 locations by staging it on local disk.

    ``src`` and ``dst`` are dicts describing S3 locations:

    * ``src`` needs ``Bucket`` plus ``Key`` for the ``'object'`` and
      ``'object-zip'`` copy types, or ``Bucket`` plus ``Prefix`` for
      ``'sync'``.
    * ``dst`` needs ``Bucket`` and may carry ``Prefix``; every staged file is
      uploaded beneath that prefix (a missing or empty prefix uploads to the
      bucket root).

    Files are staged under ``/tmp/cache/<aws_request_id>``, where the request
    id is read from ``context.aws_request_id``.
    """

    def __init__(self, context, type, src, dst):
        """Store the copy parameters and derive the local staging paths.

        Args:
            context: Object exposing ``aws_request_id``.
            type: One of ``'object'``, ``'object-zip'`` or ``'sync'``.
            src: Source location dict (see class docstring).
            dst: Destination location dict (see class docstring).
        """
        self.context = context
        self.type = type
        self.src = src
        self.dst = dst
        # Set by download_object(); full local path of the downloaded object.
        self.local_filename = None
        self.local_download_path = f"/tmp/cache/{self.context.aws_request_id}"
        self.local_prefix_unzip = f"{self.local_download_path}/unpacked"
        self.local_prefix = f"{self.local_download_path}/upload"

    def copy(self):
        """Run the copy selected by ``self.type``.

        Raises:
            ValueError: If ``self.type`` is not a supported copy type.
        """
        # Start from an empty staging directory for this request id.
        shutil.rmtree(self.local_download_path, ignore_errors=True)
        if self.type == 'object-zip':
            self.download_object_unpack_zip_upload()
        elif self.type == 'object':
            self.download_object_upload()
        elif self.type == 'sync':
            self.download_prefix_upload_prefix()
        else:
            # Raising a plain string is itself a TypeError in Python 3, so
            # wrap the message in a real exception.
            raise ValueError(f"{self.type} type not supported")

    def download_object_unpack_zip_upload(self):
        """Download the source zip, extract it and upload its contents."""
        self.download_object()
        self.unpack_zip()
        self.upload(self.local_prefix_unzip)

    def download_object_upload(self):
        """Download the source object and upload it under the destination."""
        self.download_object()
        self.upload(self.local_download_path)

    def download_prefix_upload_prefix(self):
        """Download everything under the source prefix and upload it."""
        self.download_prefix()
        self.upload(self.local_download_path)

    def _list_source_keys(self):
        """Return every object key under the source prefix, following pages."""
        client = boto3.client('s3')
        request = {'Bucket': self.src['Bucket'], 'Prefix': self.src['Prefix']}
        keys = []
        while True:
            resp = client.list_objects_v2(**request)
            # 'Contents' is absent when the listing page has no objects.
            keys.extend(item['Key'] for item in resp.get('Contents', []))
            if not resp.get('IsTruncated'):
                return keys
            request['ContinuationToken'] = resp['NextContinuationToken']

    def _local_path_for_key(self, key):
        """Map a source object key to its path in the staging directory."""
        prefix = self.src['Prefix']
        # Strip only the leading prefix; the same text may legitimately
        # reappear deeper inside the key.
        relative = key[len(prefix):] if key.startswith(prefix) else key
        relative = relative.lstrip('/')
        if not relative:
            # The prefix named a single object exactly; keep its file name.
            relative = os.path.basename(key)
        return f"{self.local_download_path}/{relative}"

    # Download whole bucket prefix
    def download_prefix(self):
        """Download every object under the source prefix to the staging dir."""
        bucket = boto3.resource('s3').Bucket(self.src['Bucket'])
        for key in self._list_source_keys():
            if key.endswith('/'):
                # Directory placeholder objects have no file content to store.
                continue
            local_path = self._local_path_for_key(key)
            logger.info("s3://%s/%s -> %s", self.src['Bucket'], key, local_path)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            bucket.download_file(key, local_path)

    # Download S3 object to lambda /tmp under current request
    def download_object(self):
        """Download the single source object into the staging directory."""
        base_name = os.path.basename(self.src['Key'])
        self.local_filename = f"{self.local_download_path}/{base_name}"
        os.makedirs(os.path.dirname(self.local_filename), exist_ok=True)
        s3 = boto3.resource('s3')
        logger.info(
            "s3://%s/%s -> %s",
            self.src['Bucket'], self.src['Key'], self.local_filename,
        )
        s3.Bucket(self.src['Bucket']).download_file(
            self.src['Key'], self.local_filename
        )

    # Unpack downloaded zip archive
    def unpack_zip(self):
        """Extract the downloaded archive into ``self.local_prefix_unzip``."""
        os.makedirs(self.local_prefix_unzip, exist_ok=True)
        logger.info("Unpack %s to %s", self.local_filename, self.local_prefix_unzip)
        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(self.local_filename, 'r') as archive:
            archive.extractall(self.local_prefix_unzip)

    def _destination_key(self, path, local_path):
        """Build the destination key for ``local_path`` staged under ``path``."""
        relative = os.path.relpath(local_path, path).replace(os.sep, '/')
        prefix = self.dst.get('Prefix') or ''
        if prefix and not prefix.endswith('/'):
            prefix += '/'
        return prefix + relative

    # Upload files to destination
    def upload(self, path):
        """Upload every file below ``path`` to the destination bucket.

        Keys are the destination prefix followed by each file's path relative
        to ``path``.
        """
        bucket = boto3.resource('s3').Bucket(self.dst['Bucket'])
        logger.info("Uploading from %s", path)
        # os.walk also visits dotfiles and hidden directories, which a
        # '**/*' glob pattern silently skips.
        for directory, _subdirs, filenames in os.walk(path):
            for filename in filenames:
                local_path = os.path.join(directory, filename)
                destination_key = self._destination_key(path, local_path)
                logger.info(
                    "%s -> s3://%s/%s",
                    local_path, self.dst['Bucket'], destination_key,
                )
                bucket.upload_file(local_path, destination_key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment