Skip to content

Instantly share code, notes, and snippets.

@KerberosMorphy
Last active November 24, 2020 17:29
Show Gist options
  • Save KerberosMorphy/33c63dc53879120722b7d5ebf0ea8b90 to your computer and use it in GitHub Desktop.
Save KerberosMorphy/33c63dc53879120722b7d5ebf0ea8b90 to your computer and use it in GitHub Desktop.
Class to sync data with S3 Bucket
from glob import iglob
from pathlib import PurePath
from os import mkdir, makedirs
from os.path import isfile, isdir
from typing import Any, Dict, Optional, List
from tqdm import tqdm
from boto3.session import Session as AWSSession
# Fill these in before running: target S3 bucket name and AWS credentials.
# (Leave credentials empty and pass `profile=` to DataSet to use a named
# AWS CLI profile instead.)
BUCKET_NAME: str = ''
AWS_ACCESS_KEY_ID: str = ''
AWS_SECRET_ACCESS_KEY: str = ''
class DataSet:
    """Synchronise a local folder with a remote S3 bucket.

    Local files are kept under a folder named after the bucket, so local
    relative paths mirror S3 object keys.
    """

    def __init__(self, bucket_name: str, aws_access_key_id: Optional[str]=None, aws_secret_access_key: Optional[str]=None, region_name: str="us-east-1", profile: Optional[str]=None) -> None:
        """Create the AWS session and the local mirror folder.

        Parameters
        ----------
        bucket_name : str
            Name of the S3 bucket; also used as the local folder name.
        aws_access_key_id : Optional[str], optional
            AWS access key, required when no profile is given, by default None
        aws_secret_access_key : Optional[str], optional
            AWS secret key, required when no profile is given, by default None
        region_name : str, optional
            AWS region, by default "us-east-1"
        profile : Optional[str], optional
            Named AWS CLI profile; takes precedence over explicit keys,
            by default None

        Raises
        ------
        ValueError
            If neither a profile nor a complete key pair is provided.
        """
        self.bucket_name: str = bucket_name
        if profile:
            self.session: AWSSession = AWSSession(profile_name=profile)
        else:
            # `assert` is stripped under `python -O`; validate explicitly.
            if not (aws_access_key_id and aws_secret_access_key):
                raise ValueError("You must give credentials")
            self.session = AWSSession(aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name)
        # Local mirror folder shares the bucket's name.
        if not isdir(PurePath(self.bucket_name)):
            mkdir(PurePath(self.bucket_name))
        self.s3_client = self.session.client('s3')
        self.s3_resource = self.session.resource('s3')
        # Last object key seen by download(); None until a page is fetched.
        self.last_file: Optional[str] = None

    def list_objects(self, prefix: Optional[str]=None, start_after: Optional[str]=None, max: Optional[int]=1000) -> Dict[str, Any]:
        """List object keys from the S3 bucket.

        Parameters
        ----------
        prefix : Optional[str], optional
            Only list objects whose key starts with this, by default None
        start_after : Optional[str], optional
            Continue listing after a specific object's key; must be a
            complete file key, by default None
        max : Optional[int], optional
            Max number of objects per call (parameter name kept for
            backward compatibility although it shadows the builtin),
            by default 1000

        Returns
        -------
        Dict[str, Any]
            files : List[str]
                List of objects' keys.
            is_next : bool
                True if there are more objects to list from the S3 bucket.
        """
        # Build the call arguments once instead of enumerating every
        # parameter combination in a branch ladder.
        kwargs: Dict[str, Any] = {'Bucket': self.bucket_name, 'MaxKeys': max}
        if prefix:
            kwargs['Prefix'] = prefix
        if start_after:
            kwargs['StartAfter'] = start_after
        response = self.s3_client.list_objects_v2(**kwargs)
        return {'files': [obj['Key'] for obj in response.get('Contents', [])],
                'is_next': response.get('IsTruncated', False)}

    def download(self):
        """Download objects from the S3 bucket.

        A download is skipped when the object already exists locally.
        The destination folder has the same name as the S3 bucket.
        """
        remote_files: List[str] = []
        start_after: Optional[str] = None
        is_next: bool = True
        while is_next:
            res: Dict[str, Any] = self.list_objects(start_after=start_after)
            files: List[str] = res['files']
            remote_files += files
            if not files:
                break
            # Advance the pagination cursor. The original never updated
            # `start_after`, which re-fetched the first page forever on
            # buckets with more than one page (>1000 objects).
            start_after = files[-1]
            self.last_file = start_after
            is_next = res['is_next']
            for file in files:
                file_dest = PurePath(self.bucket_name, file)
                if not self.is_local(file):
                    tqdm.write(f' - Downloading {file_dest.name} from S3')
                    if not isdir(file_dest.parent):
                        makedirs(file_dest.parent)
                    self.s3_resource.meta.client.download_file(self.bucket_name, file, f"{file_dest}")
                else:
                    tqdm.write(f' - {file_dest.name} already in local')

    def is_local(self, file: str) -> bool:
        """Check whether a file already exists in the local folder.

        Parameters
        ----------
        file : str
            Should be the S3 bucket object's key.

        Returns
        -------
        bool
            True if the file exists locally.
        """
        return isfile(PurePath(self.bucket_name, file))

    def is_remote(self, file: str) -> bool:
        """Check whether a file already exists in the S3 bucket.

        Parameters
        ----------
        file : str
            File path without the leading bucket folder name.

        Returns
        -------
        bool
            True if the file exists in the S3 bucket.
        """
        # Exact-match the key: a bare prefix-truthiness test would report
        # 'a.txt' as present when only 'a.txt.bak' exists remotely.
        return file in self.list_objects(prefix=file)['files']

    def next(self) -> Optional[bytes]:
        """Next file.

        Allow working with data on the fly: if the next file does not
        exist locally, it will be downloaded from S3 then retrieved.

        Returns
        -------
        Optional[bytes]
            File in bytes, or None if there is no more data.

        Raises
        ------
        NotImplementedError
            Method not finished yet.
        """
        raise NotImplementedError('Not finished')

    def upload(self):
        """Upload local files to the S3 bucket.

        An upload is skipped when the object already exists in the bucket.
        The source folder must have the same name as the S3 bucket.
        """
        local_prefix = f'{self.bucket_name}/'
        for file_from in tqdm(iglob(f'{self.bucket_name}/**/*.*', recursive=True)):
            # str.lstrip strips a *character set*, not a prefix: with a
            # bucket named 'data', 'data/dataset.csv' was mangled into
            # 'set.csv'. Slice the prefix off instead.
            if file_from.startswith(local_prefix):
                relative = file_from[len(local_prefix):]
            else:
                relative = file_from
            file: PurePath = PurePath(relative)
            file_dest: str = file.as_posix().lstrip('/')
            if isfile(file_from):
                if not self.is_remote(file_dest):
                    tqdm.write(f' - Uploading {file.name} to S3')
                    self.s3_resource.meta.client.upload_file(file_from, self.bucket_name, file_dest)
                else:
                    tqdm.write(f' - {file.name} already in S3')
if __name__ == "__main__":
    data = DataSet(bucket_name=BUCKET_NAME, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    # Sync with the local folder named after bucket_name; local files keep
    # the same folder path / file name as their S3 object keys.
    data.upload()
    data.download()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment