Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
S3 Utilities
import contextlib
import os
import tempfile
half_lambda_memory = 10**6 * (
int(os.getenv('AWS_LAMBDA_FUNCITON_MEMORY_SIZE', '0')) / 2)
@contextlib.contextmanager
def buffered_s3_writer(Bucket, Key, ExtraArgs=None, buffering=-1, temp_dir='/tmp'):
"""fileobj that writes output after buffering to `temp_dir` """
buffer_bytes = (half_lambda_memory or 10**6*64) if buffering == -1 else buffering
ExtraArgs = ExtraArgs or {}
ExtraArgs.setdefault('ACL', 'bucket-owner-full-control')
if 'ContentType' not in ExtraArgs:
ContentType = mimetypes.guess_type(Key)[0] or 'text/plain'
with tempfile.SpooledTemporaryFile(max_size=buffer_bytes, dir=temp_dir) as f:
yield f
f.seek(0)
s3.upload_fileobj(f, Bucket, Key, ExtraArgs=ExtraArgs)
import boto3
def keys(Bucket, Prefix='', StartAfter='', Delimiter='/'):
Prefix = Prefix[1:] if Prefix.startswith(Delimiter) else Prefix
if not StartAfter:
del StartAfter
if Prefix.endswith(Delimiter):
StartAfter = Prefix
del Delimiter
for page in boto3.client('s3').get_paginator('list_objects_v2').paginate(
**locals()):
for content in page.get('Contents', ()):
yield content['Key']
def folders(Bucket, Prefix='', Delimiter='/'):
Prefix = Prefix[1:] if Prefix.startswith(Delimiter) else Prefix
for page in boto3.client('s3').get_paginator('list_objects_v2').paginate(
**locals()):
for prefix in page.get('CommonPrefixes', []):
yield prefix['Prefix']
"""Remove a bucket (possibly with versioning enabled)
This requires at a minimum:
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:DeleteBucket
- s3:GetBucketVersioning
- s3:ListBucket
- s3:PutBucketVersioning
- s3:PutLifecycleConfiguration
Resource: !Sub 'arn:${AWS::Partition}:s3:::${Bucket}'
- Effect: Allow
Action:
- s3:DeleteObject
Resource: !Sub 'arn:${AWS::Partition}:s3:::${Bucket}/*'
TODO: catch non-fatal missing permission
"""
import itertools
import boto3
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return itertools.zip_longest(*args, fillvalue=fillvalue)
def s3_versioned_keys(Bucket):
for page in boto3.client('s3').get_paginator(
'list_object_versions').paginate(Bucket=Bucket):
contents = itertools.chain(page.get('Versions', ()),
page.get('DeleteMarkers', ()))
for content in contents:
yield content['Key'], content['VersionId']
def remove_bucket(bucket_name):
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
versioning = bucket.Versioning()
if versioning.status == 'Enabled':
versioning.suspend()
bucket.LifecycleConfiguration().put(
LifecycleConfiguration={
"Rules": [{
"Status": "Enabled",
"Prefix": "",
"AbortIncompleteMultipartUpload": {
"DaysAfterInitiation": 1
},
"NoncurrentVersionExpiration": {
"NoncurrentDays": 1
},
"Expiration": {
"Date": "2000-01-01T00:00:00.000Z"
}
}]
})
key_version_batches = grouper(s3_versioned_keys(bucket_name), 1000,
(None, None))
for batch in key_version_batches:
bucket.delete_objects(
Delete={
'Quiet': True,
'Objects': tuple({
'Key': k,
'VersionId': v
} for k, v in batch if k)
})
bucket.delete()
import mimetypes
import urllib
import boto3
def url_to_s3(url, Bucket, ExtraArgs=None, **kwargs):
"""download a url target to an S3 Bucket
override destination with `Key`
also accepts boto3 `ExtraArgs`, `Callback` and `Config`
"""
_kwargs = kwargs.copy()
_kwargs['Bucket'] = Bucket
if 'Key' not in _kwargs:
_kwargs['Key'] = urllib.parse.urlparse(url).path[1:] or 'index'
ExtraArgs = ExtraArgs or {}
ExtraArgs.setdefault('ACL', 'bucket-owner-full-control')
with urllib.request.urlopen(url) as resp:
if 'ContentType' not in ExtraArgs:
ContentType = resp.getheader('content-type',
None) or mimetypes.guess_type(
_kwargs['Key'])[0]
if ContentType is not None:
ExtraArgs['ContentType'] = ContentType
boto3.client('s3').upload_fileobj(resp, ExtraArgs=ExtraArgs, **_kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment