Skip to content

Instantly share code, notes, and snippets.

@hrchu
Created February 24, 2022 02:34
Show Gist options
  • Save hrchu/5ab0a47b5d3c62df562e0ccb003d269b to your computer and use it in GitHub Desktop.
Save hrchu/5ab0a47b5d3c62df562e0ccb003d269b to your computer and use it in GitHub Desktop.
s3 python util/example code
import logging
from typing import Iterable, Dict, BinaryIO
import boto3
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
from botocore.exceptions import ClientError
# Binary size units used to express transfer thresholds below.
KB = 1 << 10
MB = KB ** 2

# TODO: extract this
# Shared settings for managed uploads/copies: both the multipart threshold
# and the multipart chunk size are set to 8 * MB.
transfer_config = TransferConfig(
    multipart_threshold=8 * MB,
    multipart_chunksize=8 * MB,
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class S3:
    """Thin convenience wrapper around a boto3 S3 client.

    Every method forwards to the client stored in ``self.s3``. Managed
    uploads and copies use the module-level ``transfer_config``.
    """

    def __init__(self, endpoint, access_key, secret_key):
        """Create the underlying boto3 S3 client.

        Args:
            endpoint: URL passed to boto3 as ``endpoint_url``.
            access_key: Value passed as ``aws_access_key_id``.
            secret_key: Value passed as ``aws_secret_access_key``.
        """
        # NOTE(review): SSL is disabled, the legacy 's3' signature version is
        # selected and the HTTP proxy is cleared -- presumably to talk to a
        # private S3-compatible endpoint; confirm before using against AWS.
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint,
            use_ssl=False,
            config=Config(
                signature_version='s3',
                proxies={'http': None},
            ),
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )

    def set_transition_rule(self, bucket, prefix='', date='2020-01-01', _class='GLACIER'):
        """Put a lifecycle configuration holding one date-based transition rule.

        Args:
            bucket: Bucket the lifecycle configuration is written to.
            prefix: Key prefix the rule applies to.
            date: Transition date sent as the rule's ``Date``.
            _class: Target storage class sent as ``StorageClass``.
        """
        rule = {
            'Transitions': [
                {
                    'Date': date,
                    'StorageClass': _class,
                },
            ],
            # The rule ID is the fixed literal 'string'.
            'ID': 'string',
            'Prefix': prefix,
            'Status': 'Enabled',
        }
        self.s3.put_bucket_lifecycle_configuration(
            Bucket=bucket,
            LifecycleConfiguration={'Rules': [rule]},
        )

    def create_bucket(self, bucket, ignore_conflict: bool = True, region: str = 'ap-northeast-1'):
        """Create ``bucket`` in ``region``.

        Args:
            bucket: Name of the bucket to create.
            ignore_conflict: When true, a ``BucketAlreadyOwnedByYou`` error is
                logged as a warning instead of being raised.
            region: Value sent as ``LocationConstraint``.

        Raises:
            ClientError: For any other failure, or for the conflict when
                ``ignore_conflict`` is false.
        """
        try:
            self.s3.create_bucket(
                Bucket=bucket,
                CreateBucketConfiguration={'LocationConstraint': region},
            )
        except ClientError as e:
            if ignore_conflict and e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
                logger.warning(f'bucket: {bucket} BucketAlreadyExists.')
            else:
                raise

    # TODO: deprecated. Use upload_fileobj instead.
    def put_object(self, bucket, key, f):
        """Upload ``f`` to ``bucket``/``key`` and close it afterwards.

        ``f`` is used as a context manager, so it is closed when the upload
        finishes or fails.
        """
        with f as data:
            self.upload_fileobj(bucket, key, data)

    def upload_fileobj(self, bucket, key, f, storage_class=None):
        """Upload the file-like object ``f`` to ``bucket``/``key``.

        Args:
            bucket: Destination bucket.
            key: Destination key.
            f: File-like object handed to the client's ``upload_fileobj``.
            storage_class: Optional storage class sent via ``ExtraArgs``.
        """
        extra_args = {'StorageClass': storage_class} if storage_class else None
        self.s3.upload_fileobj(f, bucket, key, Config=transfer_config, ExtraArgs=extra_args)

    def copy(self, src_bucket, src_key, dst_bucket, dst_key, storage_class=None):
        """Copy one object to ``dst_bucket``/``dst_key``.

        Args:
            src_bucket: Source bucket.
            src_key: Source key.
            dst_bucket: Destination bucket.
            dst_key: Destination key.
            storage_class: Storage class for the destination object; falls
                back to ``'STANDARD'`` when not given.
        """
        copy_source = {
            'Bucket': src_bucket,
            'Key': src_key,
        }
        # A single managed copy with an explicit storage class. Previously the
        # object was copied twice (managed copy, then copy_object), which
        # doubled the work and made the second call the limiting factor.
        extra_args = {'StorageClass': storage_class or 'STANDARD'}
        self.s3.copy(copy_source, dst_bucket, dst_key, ExtraArgs=extra_args, Config=transfer_config)

    def get_object(self, bucket, key) -> Dict:
        """Return the raw ``get_object`` response for ``bucket``/``key``."""
        return self.s3.get_object(Bucket=bucket, Key=key)

    def get_object_content(self, bucket, key) -> str:
        """Return the whole object body decoded as UTF-8 text."""
        resp = self.get_object(bucket, key)
        return resp['Body'].read().decode("utf-8")

    def get_object_content_as_stream(self, bucket, key) -> BinaryIO:
        """Return the response ``Body`` stream without reading it."""
        resp = self.get_object(bucket, key)
        return resp['Body']

    def head_object(self, bucket, key) -> Dict:
        """Return the raw ``head_object`` response for ``bucket``/``key``."""
        return self.s3.head_object(Bucket=bucket, Key=key)

    def is_exist(self, bucket, key) -> bool:
        """Return whether ``bucket``/``key`` exists.

        A ``head_object`` failure with HTTP status 404 means "does not exist";
        any other ``ClientError`` is re-raised.
        """
        try:
            self.s3.head_object(Bucket=bucket, Key=key)
        except ClientError as e:
            if e.response['ResponseMetadata']['HTTPStatusCode'] == 404:
                return False
            raise
        return True

    def is_standard_class(self, bucket, key) -> bool:
        """Return whether the object's storage class is ``STANDARD``."""
        return self.get_key_storage_class(bucket, key) == 'STANDARD'

    def is_ia_class(self, bucket, key) -> bool:
        """Return whether the object's storage class is ``STANDARD_IA``."""
        return self.get_key_storage_class(bucket, key) == 'STANDARD_IA'

    def is_archived(self, bucket, key) -> bool:
        """Return whether the object's storage class is ``GLACIER``."""
        return self.get_key_storage_class(bucket, key) == 'GLACIER'

    def get_key_storage_class(self, bucket, key) -> str:
        """Return the storage class reported by ``head_object``.

        Amazon S3 returns this header for all objects except for S3 Standard
        storage class objects, so a missing value is reported as
        ``'STANDARD'``.
        """
        resp = self.s3.head_object(Bucket=bucket, Key=key)
        return resp.get('StorageClass', 'STANDARD')

    def delete_object(self, bucket, key, ignore_error: bool = True):
        """Delete ``bucket``/``key``.

        Args:
            bucket: Bucket holding the object.
            key: Key of the object to delete.
            ignore_error: When true, a ``ClientError`` is logged as a warning
                and suppressed.

        Raises:
            ClientError: When the delete fails and ``ignore_error`` is false.
        """
        try:
            self.s3.delete_object(Bucket=bucket, Key=key)
        except ClientError:
            if not ignore_error:
                # Previously the error was dropped silently in this case.
                raise
            logger.warning(f'bucket: {bucket} key: {key} ignore_error.')

    def set_to_restore(self, bucket, key, suppress_restore_already_in_progress: bool = True,
                       days: int = 1):
        """Request a restore of an archived object.

        Args:
            bucket: Bucket holding the object.
            key: Key of the object to restore.
            suppress_restore_already_in_progress: When true, a
                ``RestoreAlreadyInProgress`` error is logged as a warning
                instead of being raised.
            days: Value sent as ``RestoreRequest['Days']``.

        Raises:
            ClientError: For any failure that is not suppressed.
            AssertionError: When the response status is neither 200 nor 202.
        """
        try:
            response = self.s3.restore_object(
                Bucket=bucket,
                Key=key,
                RestoreRequest={'Days': days},
            )
        except ClientError as e:
            if suppress_restore_already_in_progress and e.response['Error']['Code'] == 'RestoreAlreadyInProgress':
                logger.warning(f'bucket: {bucket} key: {key} RestoreAlreadyInProgress.')
                return
            raise
        # S3 answers 202 for a newly accepted restore and 200 when the object
        # was already restored; both are success. Raised explicitly (not via
        # ``assert``) so the check survives ``python -O``.
        status = response['ResponseMetadata']['HTTPStatusCode']
        if status not in (200, 202):
            raise AssertionError(f'unexpected restore_object status: {status}')

    def is_restored(self, bucket, key) -> bool:
        """Return whether an archived object can currently be read.

        Returns ``False`` when the object is not archived (see
        ``is_archived``) or when reading it fails with HTTP status 403; any
        other ``ClientError`` is re-raised.
        """
        if not self.is_archived(bucket, key):
            return False
        try:
            # Probe readability with a GET whose body is never consumed,
            # instead of downloading the whole object to '/dev/null'.
            resp = self.s3.get_object(Bucket=bucket, Key=key)
        except ClientError as e:
            if e.response['ResponseMetadata']['HTTPStatusCode'] == 403:
                return False
            raise
        resp['Body'].close()
        return True

    def list_keys(self, bucket, prefix='', storage_class=None) -> Iterable[str]:
        """Yield the keys in ``bucket`` that start with ``prefix``.

        All result pages are followed, and an empty listing yields nothing.

        Args:
            bucket: Bucket to list.
            prefix: Only keys starting with this prefix are listed.
            storage_class: When given, only keys whose listed storage class
                equals this value are yielded.
        """
        paginator = self.s3.get_paginator('list_objects')
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            # 'Contents' is absent from a page that holds no keys.
            for item in page.get('Contents', ()):
                if not storage_class or item.get('StorageClass', 'STANDARD') == storage_class:
                    yield item['Key']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment