S3 Python util/example code
import logging
from typing import Iterable, Dict, BinaryIO

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
from botocore.exceptions import ClientError

KB = 1024
MB = KB * KB

# TODO: extract this into configuration
transfer_config = TransferConfig(multipart_threshold=8 * MB,
                                 multipart_chunksize=8 * MB)

logger = logging.getLogger(__name__)

class S3:
    def __init__(self, endpoint, access_key, secret_key):
        self.s3 = boto3.client('s3',
                               endpoint_url=endpoint,
                               use_ssl=False,
                               config=Config(
                                   # 's3' selects legacy SigV2 signing, which some
                                   # S3-compatible services still require.
                                   signature_version='s3',
                                   proxies={'http': None}
                               ),
                               aws_access_key_id=access_key,
                               aws_secret_access_key=secret_key,
                               )

    def set_transition_rule(self, bucket, prefix='', date='2020-01-01', _class='GLACIER'):
        self.s3.put_bucket_lifecycle_configuration(
            Bucket=bucket,
            LifecycleConfiguration={
                'Rules': [
                    {
                        'Transitions': [
                            {
                                'Date': date,
                                'StorageClass': _class
                            },
                        ],
                        'ID': 'transition-rule',
                        'Prefix': prefix,
                        'Status': 'Enabled',
                    },
                ]
            }
        )

    def create_bucket(self, bucket, ignore_conflict: bool = True):
        try:
            self.s3.create_bucket(Bucket=bucket, CreateBucketConfiguration={
                'LocationConstraint': 'ap-northeast-1'})
        except ClientError as e:
            if ignore_conflict and e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
                logger.warning(f'bucket: {bucket} BucketAlreadyOwnedByYou.')
            else:
                raise

    # TODO: deprecated. Use upload_fileobj instead.
    def put_object(self, bucket, key, f):
        with f as data:
            self.s3.upload_fileobj(data, bucket, key, Config=transfer_config)

    def upload_fileobj(self, bucket, key, f, storage_class=None):
        extra_args = None
        if storage_class:
            extra_args = {
                'StorageClass': storage_class
            }
        self.s3.upload_fileobj(f, bucket, key, Config=transfer_config, ExtraArgs=extra_args)

    def copy(self, src_bucket, src_key, dst_bucket, dst_key, storage_class=None):
        copy_source = {
            'Bucket': src_bucket,
            'Key': src_key
        }
        extra_args = None
        if storage_class:
            extra_args = {
                'StorageClass': storage_class
            }
        # The managed copy handles multipart transfers for large objects and
        # applies the storage class via ExtraArgs, so no follow-up copy_object
        # call is needed.
        self.s3.copy(copy_source, dst_bucket, dst_key, Config=transfer_config, ExtraArgs=extra_args)

    def get_object(self, bucket, key) -> Dict:
        return self.s3.get_object(Bucket=bucket, Key=key)

    def get_object_content(self, bucket, key) -> str:
        resp = self.get_object(bucket, key)
        return resp['Body'].read().decode("utf-8")

    def get_object_content_as_stream(self, bucket, key) -> BinaryIO:
        resp = self.get_object(bucket, key)
        return resp['Body']

    def head_object(self, bucket, key) -> Dict:
        return self.s3.head_object(Bucket=bucket, Key=key)

    def is_exist(self, bucket, key) -> bool:
        try:
            self.s3.head_object(Bucket=bucket, Key=key)
        except ClientError as e:
            if e.response['ResponseMetadata']['HTTPStatusCode'] == 404:
                return False
            else:
                raise
        return True

    def is_standard_class(self, bucket, key) -> bool:
        return 'STANDARD' == self.get_key_storage_class(bucket, key)

    def is_ia_class(self, bucket, key) -> bool:
        return 'STANDARD_IA' == self.get_key_storage_class(bucket, key)

    def is_archived(self, bucket, key) -> bool:
        return 'GLACIER' == self.get_key_storage_class(bucket, key)

    def get_key_storage_class(self, bucket, key) -> str:
        """
        Provides the storage class of the object. Amazon S3 returns the
        StorageClass header for all objects except S3 Standard storage
        class objects, so a missing header means STANDARD.
        """
        resp = self.s3.head_object(Bucket=bucket, Key=key)
        if 'StorageClass' in resp:
            return resp['StorageClass']
        else:
            return 'STANDARD'

    def delete_object(self, bucket, key, ignore_error: bool = True):
        try:
            self.s3.delete_object(Bucket=bucket, Key=key)
        except ClientError:
            if ignore_error:
                logger.warning(f'bucket: {bucket} key: {key} delete failed, ignoring.')
            else:
                raise

    def set_to_restore(self, bucket, key, suppress_restore_already_in_progress: bool = True):
        try:
            response = self.s3.restore_object(
                Bucket=bucket,
                Key=key,
                RestoreRequest={
                    'Days': 1
                }
            )
            # S3 answers 202 Accepted when a new restore request is queued.
            assert response['ResponseMetadata']['HTTPStatusCode'] == 202
        except ClientError as e:
            if suppress_restore_already_in_progress and e.response['Error']['Code'] == 'RestoreAlreadyInProgress':
                logger.warning(f'bucket: {bucket} key: {key} RestoreAlreadyInProgress.')
            else:
                raise

    def is_restored(self, bucket, key) -> bool:
        if not self.is_archived(bucket, key):
            return False
        try:
            # Probe with a download: GET on a GLACIER object that has not been
            # restored yet fails with 403 InvalidObjectState.
            self.s3.download_file(Bucket=bucket, Key=key, Filename='/dev/null')
            return True
        except ClientError as e:
            if e.response['ResponseMetadata']['HTTPStatusCode'] == 403:
                return False
            else:
                raise

    def list_keys(self, bucket, prefix='', storage_class=None) -> Iterable[str]:
        response = self.s3.list_objects(
            Bucket=bucket,
            Prefix=prefix,
        )
        if 'Contents' not in response:
            # An empty listing has no 'Contents' key; nothing to yield.
            return
        if response['IsTruncated']:
            raise Exception('More than 1000 keys in the bucket')
        for item in response['Contents']:
            if not storage_class or item['StorageClass'] == storage_class:
                yield item['Key']
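
# Usage sketch: a minimal example of driving the class above. The endpoint,
# credentials, and bucket/key names are placeholders, assuming an
# S3-compatible service reachable over plain HTTP.
if __name__ == '__main__':
    import io

    s3 = S3('http://localhost:9000', 'my-access-key', 'my-secret-key')
    s3.create_bucket('demo-bucket')

    # Upload from an in-memory stream, then read it back.
    s3.upload_fileobj('demo-bucket', 'hello.txt', io.BytesIO(b'hello world'))
    assert s3.is_exist('demo-bucket', 'hello.txt')
    print(s3.get_object_content('demo-bucket', 'hello.txt'))

    # List keys together with each object's storage class.
    for key in s3.list_keys('demo-bucket'):
        print(key, s3.get_key_storage_class('demo-bucket', key))

    # Archive/restore round trip: transition objects to GLACIER via a
    # lifecycle rule, then request a restore once the transition has run.
    s3.set_transition_rule('demo-bucket', date='2020-01-01')
    if s3.is_archived('demo-bucket', 'hello.txt'):
        s3.set_to_restore('demo-bucket', 'hello.txt')
        print(s3.is_restored('demo-bucket', 'hello.txt'))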