Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example: An S3 proxy client written in Python
"""
The server application uses AWS S3 in various places.
This utility provides a common place for interacting
with S3 and handles the authentication in a unified manner.
"""
import os.path
import logging
import boto3
from settings import S3_ACCESS_KEY, S3_SECRET_KEY
class S3Client():
def __init__(self, aws_access_key=S3_ACCESS_KEY, aws_secret_key=S3_SECRET_KEY):
self.aws_access_key = aws_access_key
self.aws_secret_key = aws_secret_key
self.client = self._client()
def list_folder_contents(self, bucket_name, folder_name=None, exclude_self=True):
"""
List the contents (keys) of the objects in the folder.
:param bucket_name: (str) name of the bucket to list
:param folder_name: (str) name of the folder to list the contents of. If omitted or None, will list all folders
:param exclude_self: (boolean) under some odd circumstances, the folder itself might get listed. Normally the method will remove that. This flag, set to false, will not try to eliminate it. Default is True
:returns: list of key names in folder
"""
if folder_name:
folder_name = os.path.join(folder_name, '') # ensure it ends in a slash
else:
folder_name = '' # non-prefixed -- all folders
objects = []
incomplete = True
continuation_token = None
while incomplete:
if continuation_token:
response = self.client.list_objects_v2(
Bucket=bucket_name,
Prefix=folder_name,
ContinuationToken=continuation_token,
)
else:
response = self.client.list_objects_v2(
Bucket=bucket_name,
Prefix=folder_name,
)
objects += response.get('Contents', [])
if response.get('isTruncated', False):
continuation_token = response['NextContinuationToken']
else:
incomplete = False
if exclude_self:
contents = [obj['Key'] for obj in objects if obj['Key'] != folder_name]
else:
contents = [obj['Key'] for obj in objects]
return contents
def move_object(self, source_bucket_name=None, source_name=None, target_name=None, target_bucket_name=None):
"""
Moving an object on S3 requires two steps:
1) copy to destination
2) delete from source
:param source_bucket_name: (str) name of bucket to copy from
:param source_name: (str) object key to copy from
:param target_name: (str) object key to copy to
:param target_bucket_name: (str) name of bucket to copy to. If None, use the source_bucket_name
:return None:
"""
if target_bucket_name is None:
target_bucket_name = source_bucket_name
response = self.client.copy_object(
Bucket=target_bucket_name,
Key=target_name,
CopySource={
'Bucket': source_bucket_name,
'Key': source_name
}
)
if response.get('CopyObjectResult', False):
# Assume it worked
response = self.client.delete_object(
Bucket=source_bucket_name,
Key=source_name
)
def upload_file(self, source_name, target_name, bucket_name):
"""
Uploads the source to the target in the bucket
:params source_name: (str) name of file to upload
:params target_name: (str) name of object on S3 (include any folder or path)
:params bucket_name: (str) bucket to receive file
:returns: None
"""
self.client.upload_file(
source_name,
bucket_name,
target_name
)
def download_file(self, source_name, target_name, bucket_name):
"""
Downloads the source object from the bucket into the target file. Note that the target_name paths should already exist.
:params source_name: (str) object key to download
:params target_name: (str) destination for download -- all paths to the base file must already exist
:params bucket_name: (str) name of bucket to download from
:returns: None
"""
self.client.download_file(
bucket_name,
source_name,
target_name
)
def _client(self):
return boto3.client(
's3',
aws_access_key_id=self.aws_access_key,
aws_secret_access_key=self.aws_secret_key)
import os.path
import unittest
from tempfile import TemporaryDirectory
from utils.s3_client import S3Client
TEST_BUCKET='tt-testing-s3-bucket'
class TestS3Client(unittest.TestCase):
def setUp(self):
self.client = S3Client()
def tearDown(self):
self.client = None
def test_list_bucket_contents(self):
contents = self.client.list_folder_contents(TEST_BUCKET)
self.assertNotEqual([], contents)
def test_list_bucket_contents_with_folder(self):
# create test folder and object
self.client.client.put_object(
Bucket=TEST_BUCKET,
Key='TEST/test_object',
Body=b'test body',
)
contents = self.client.list_folder_contents(TEST_BUCKET, folder_name='TEST')
self.assertEqual(['TEST/test_object'], contents )
# remove test folder and object
self.client.client.delete_object(
Bucket=TEST_BUCKET,
Key='TEST/test_object',
)
def test_list_bucket_contents_with_folder_include_self(self):
# create test folder
self.client.client.put_object(
Bucket=TEST_BUCKET,
Key='TEST/',
Body=b'',
)
# create test folder and object
self.client.client.put_object(
Bucket=TEST_BUCKET,
Key='TEST/test_object',
Body=b'test body',
)
contents = self.client.list_folder_contents(TEST_BUCKET, folder_name='TEST', exclude_self=False)
self.assertEqual(['TEST/', 'TEST/test_object'], contents )
# remove test folder and object
self.client.client.delete_object(
Bucket=TEST_BUCKET,
Key='TEST/test_object',
)
self.client.client.delete_object(
Bucket=TEST_BUCKET,
Key='TEST/'
)
def test_move_object(self):
# create source object
self.client.client.put_object(
Bucket=TEST_BUCKET,
Key='SOURCE/source_object',
Body=b'source body',
)
self.client.move_object(
source_bucket_name=TEST_BUCKET,
source_name='SOURCE/source_object',
target_name='TARGET/target_object',
)
source_list = self.client.list_folder_contents(TEST_BUCKET, folder_name='SOURCE')
self.assertEqual([], source_list)
target_list = self.client.list_folder_contents(TEST_BUCKET, folder_name='TARGET')
self.assertEqual(['TARGET/target_object'], target_list)
# clean up
self.client.client.delete_object(
Bucket=TEST_BUCKET,
Key='TARGET/target_object'
)
def test_upload_file(self):
with TemporaryDirectory() as tdir:
source_file=os.path.join(tdir, 'source.dat')
with open(source_file, 'w') as src:
src.write('test data')
target_file='TEST/target.dat'
self.client.upload_file(
source_file,
target_file,
TEST_BUCKET,
)
contents = self.client.list_folder_contents(
TEST_BUCKET,
'TEST'
)
self.assertIn(target_file, contents)
self.client.client.delete_object(
Bucket=TEST_BUCKET,
Key=target_file
)
def test_download_file(self):
source='SOURCE/source.dat'
# create an S3 object
self.client.client.put_object(
Bucket=TEST_BUCKET,
Key=source,
Body=b'source body',
)
with TemporaryDirectory() as tdir:
target_file = os.path.join(tdir, 'target.dat')
self.client.download_file(
source,
target_file,
TEST_BUCKET,
)
self.assertTrue(os.path.exists(target_file))
# clean up
self.client.client.delete_object(
Bucket=TEST_BUCKET,
Key=source,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.