Skip to content

Instantly share code, notes, and snippets.

@dstandish
Last active April 14, 2018 21:56
Show Gist options
  • Save dstandish/1bcee760e393cec48352c3a076288fc0 to your computer and use it in GitHub Desktop.
Save dstandish/1bcee760e393cec48352c3a076288fc0 to your computer and use it in GitHub Desktop.
python s3 client wrapper to simplify list, delete, copy, and upload operations; example extending boto3
"""
This module provides a boto3 s3 client factory get_client(), which returns an s3 client that has been augmented by some
additional functionality defined in the ClientWrap class, also present in this module.
ClientWrap adds a few wrapper methods that simplify simple list / delete / copy operations by (1) handling paging and
batching and (2) dealing only with keys instead of more detailed object metadata.
get_client() also makes it easy to specify a default bucket for the client, so that you don't need to specify the
bucket in each call.
To use, you just need to make sure that you've set up your credentials e.g. with environment variables or with
credentials file. See http://boto3.readthedocs.io/en/latest/guide/configuration.html for details.
Usage example:
# create a client
c = get_client(default_bucket='blahblahblahbucket')
# get a list of keys with supplied prefix
keys = list(c.list_objects_wrap(prefix='inbound/face'))
for key in keys:
    print(key)
keys = ['inbound/facebooktest/facebook.order_api_beta_20161027T103418.tsv.gz']
c.copy_objects_wrap(keys, 'abc123/test', ('inbound/facebooktest', ''), dry_run=True)
>> Copying 1 files
>> {'CopySource': {'Bucket': None, 'Key': 'inbound/facebooktest/facebook.order_api_beta_20161027T103418.tsv.gz'},
'Bucket': None, 'Key': 'abc123/test/facebook.order_api_beta_20161027T103418.tsv.gz'}
# delete keys
c.delete_objects_wrap(keys=keys)
# can still use native boto3 client methods
c.list_objects_v2(MaxKeys=1)
Use at your own risk.
"""
import glob
import boto3
from os import path as p
from math import ceil
import re
def get_client(default_bucket=None, profile_name='default', **kwargs):
    """
    Returns a boto3 s3 client object augmented with functionality defined in ClientWrap class.

    :param default_bucket: bucket name used for any s3 call whose ``Bucket`` parameter is missing or None.
        It is also used for the ``Bucket`` entry of a dict-style ``CopySource`` parameter when that entry
        is missing or None, so copies that omit the source bucket work against the default bucket.
    :param profile_name: profile name passed to ``boto3.Session``.
        NOTE(review): an explicitly named profile presumably has to exist in the shared config/credentials
        files; pass ``profile_name=None`` when relying on environment variables only -- confirm against
        the boto3 configuration docs.
    :param kwargs: any additional keyword arguments are forwarded to ``boto3.Session``.
    :rtype: ClientWrap | pyboto3.s3
    """
    def add_custom_class(base_classes, **kwargs):
        # Insert ClientWrap as the first base class of the generated client class.
        base_classes.insert(0, ClientWrap)

    def add_default_bucket(params, **kwargs):
        if params.get('Bucket') is None:
            params['Bucket'] = default_bucket
        # copy_object carries a second bucket inside CopySource; without this the
        # wrapper methods would send CopySource={'Bucket': None, ...} whenever the
        # caller relies on the default bucket.
        copy_source = params.get('CopySource')
        if (default_bucket is not None
                and isinstance(copy_source, dict)
                and copy_source.get('Bucket') is None):
            # Build a new dict rather than mutating the caller's object.
            params['CopySource'] = dict(copy_source, Bucket=default_bucket)

    session = boto3.Session(profile_name=profile_name, **kwargs)
    # Must be registered before the client is created so the class hook fires.
    session.events.register('creating-client-class.s3', add_custom_class)
    client = session.client('s3')
    event_system = client.meta.events
    event_system.register('provide-client-params.s3.*', add_default_bucket)
    return client
class ClientWrap(object):
    """
    Convenience wrappers around s3 client list / copy / delete / upload calls.

    The methods call client methods on ``self`` (``list_objects_v2``, ``copy_object``,
    ``delete_objects``, ``upload_file``, ``put_object``), so this class is meant to be
    combined with a class that provides them. The wrappers deal only in keys and take
    care of paging and batching. Every ``bucket`` argument defaults to None and is
    passed through to the underlying call unchanged.
    """

    def __init__(self, *args, **kwargs):
        super(ClientWrap, self).__init__(*args, **kwargs)

    @staticmethod
    def join_key(path, *paths):
        """
        Join key components with '/', collapsing repeated slashes and dropping a leading slash.

        Components that are None are skipped, so an unset prefix can be passed straight through.

        :param path: first key component (may be '' or None)
        :param paths: further key components
        :rtype: str
        """
        parts = [part for part in (path,) + paths if part is not None]
        val = '/'.join(parts)
        val = re.sub(r'/+', '/', val)
        return re.sub(r'^/', '', val)

    def list_objects_wrap(self, prefix=None, bucket=None):
        """
        Returns generator of keys matching prefix on bucket.

        Follows ``NextContinuationToken`` until the listing is exhausted. A prefix of
        None, '' or '/' is not sent, so the whole bucket is listed.

        :type prefix: unicode|str
        :type bucket: unicode|str
        :type self: pyboto3.s3
        :rtype Generator[str]
        """
        payload = {'Bucket': bucket}
        if prefix is not None and prefix not in ('', '/'):
            payload['Prefix'] = prefix
        while True:
            response = self.list_objects_v2(**payload)
            for obj in response.get('Contents', []):
                yield obj['Key']
            if 'NextContinuationToken' not in response:
                break
            payload['ContinuationToken'] = response['NextContinuationToken']

    def copy_objects_wrap(self, keys, new_prefix, replace=None, src_bucket=None, tgt_bucket=None, dry_run=False):
        """
        Copy each key to ``new_prefix``, optionally rewriting part of the key first.

        TODO: You may need to switch this to batch if you try to move too many objects in one call

        :param keys: iterable of keys to copy
        :param new_prefix: prefix where objects will be moved to
        :param replace: tuple ('search', 'replace') to toss out some portion of the old prefix.
            Both parts are normalised with join_key; None (the default) leaves keys unchanged.
        :param src_bucket: bucket currently holding the objects
        :param tgt_bucket: destination bucket
        :param dry_run: if true, will print out the planned request but not send
        :type self: pyboto3.s3 | ClientWrap
        """
        # Materialise so generators (e.g. from list_objects_wrap) can be counted.
        keys = list(keys)
        print('Copying %s files' % len(keys))

        # Normalise the search/replace pair once; it does not depend on the key.
        search = substitute = None
        if replace:
            search = self.join_key('', replace[0])
            substitute = self.join_key('', replace[1])

        # Build every request before sending any, so a bad key fails up front.
        copy_configs = []
        for key in keys:
            # An empty search string would make str.replace insert text between
            # every character, so it is treated as "no replacement".
            new_key = key.replace(search, substitute) if search else key
            copy_configs.append(
                {
                    'Bucket': tgt_bucket,
                    'CopySource': {'Bucket': src_bucket, 'Key': key},
                    'Key': self.join_key(new_prefix, new_key),
                }
            )

        for copy_config in copy_configs:
            if dry_run:
                print(copy_config)
            else:
                self.copy_object(**copy_config)

    def delete_objects_wrap(self, keys, bucket=None):
        """
        Delete the given keys in batches of 500 via ``delete_objects``.

        :param keys: iterable containing keys to delete
        :type keys: list[str]
        :param bucket: bucket name
        :type bucket: str|unicode
        :type self: pyboto3.s3|ClientWrap
        """
        files_to_delete = [{'Key': key} for key in keys]
        print('Deleting %s files' % len(files_to_delete))
        # S3 DeleteObjects is documented to accept at most 1000 keys per request;
        # 500 stays under that limit.
        batch_size = 500
        deleted = 0
        batch_starts = range(0, len(files_to_delete), batch_size)
        for batch_number, start in enumerate(batch_starts, 1):
            curr_batch = files_to_delete[start:start + batch_size]
            print('Batch %s (%s files)' % (batch_number, len(curr_batch)))
            for entry in curr_batch:
                print('Deleting %s' % entry['Key'])
            self.delete_objects(
                Bucket=bucket,
                Delete={'Objects': curr_batch},
            )
            deleted += len(curr_batch)
        print('Done. Deleted %s files' % deleted)

    def move_objects_wrap(self, keys, new_prefix, replace=None, src_bucket=None, tgt_bucket=None):
        """
        Copy + delete

        Copies every key with copy_objects_wrap, then deletes the originals from
        ``src_bucket`` with delete_objects_wrap.

        :type self: pyboto3.s3|ClientWrap
        """
        # Materialise once: a generator would be exhausted by the copy step and
        # leave nothing for the delete step.
        keys = list(keys)
        self.copy_objects_wrap(
            keys=keys,
            new_prefix=new_prefix,
            replace=replace,
            src_bucket=src_bucket,
            tgt_bucket=tgt_bucket
        )
        self.delete_objects_wrap(
            keys=keys,
            bucket=src_bucket
        )

    def upload_files_wrap(self, file_glob, target_prefix=None, bucket=None):
        """
        Takes a glob and loops through, calling client.upload_file()

        Each file is uploaded under ``target_prefix`` using its base name; with no
        prefix the key is just the base name.

        :type self: pyboto3.s3|ClientWrap
        """
        for filename in glob.glob(file_glob):
            key = self.join_key(target_prefix, p.basename(filename))
            print('uploading %s' % key)
            self.upload_file(filename, bucket, key)

    def mkdir_wrap(self, key, bucket=None):
        """
        Will create an empty file at key.

        The key is normalised with join_key and given a trailing '/'.

        :type self: pyboto3.s3|ClientWrap
        """
        self.put_object(Bucket=bucket, Key=self.join_key('', key + '/'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment