Last active
November 24, 2020 17:29
-
-
Save KerberosMorphy/33c63dc53879120722b7d5ebf0ea8b90 to your computer and use it in GitHub Desktop.
Class to sync data with S3 Bucket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from glob import iglob | |
from pathlib import PurePath | |
from os import mkdir, makedirs | |
from os.path import isfile, isdir | |
from typing import Any, Dict, Optional, List | |
from tqdm import tqdm | |
from boto3.session import Session as AWSSession | |
# Credentials / target bucket for script usage: fill these in before running
# this module directly (or pass a named profile to DataSet instead).
BUCKET_NAME: str = ""
AWS_ACCESS_KEY_ID: str = ""
AWS_SECRET_ACCESS_KEY: str = ""
class DataSet:
    """Sync a local data folder with a remote S3 bucket.

    The local mirror directory has the same name as the bucket; object keys
    map 1:1 onto paths below it.

    Parameters
    ----------
    bucket_name : str
        Name of the S3 bucket (and of the local mirror folder).
    aws_access_key_id : Optional[str], optional
        Explicit AWS key id; required when `profile` is not given.
    aws_secret_access_key : Optional[str], optional
        Explicit AWS secret key; required when `profile` is not given.
    region_name : str, optional
        AWS region, by default "us-east-1".
    profile : Optional[str], optional
        Named AWS profile; takes precedence over explicit credentials.
    """

    def __init__(self, bucket_name: str, aws_access_key_id: Optional[str]=None, aws_secret_access_key: Optional[str]=None, region_name: str="us-east-1", profile: Optional[str]=None) -> None:
        self.bucket_name: str = bucket_name
        if profile:
            self.session: AWSSession = AWSSession(profile_name=profile)
        else:
            # Raise (not assert): asserts are stripped under `python -O`.
            if not (aws_access_key_id and aws_secret_access_key):
                raise ValueError("You must give credentials")
            self.session = AWSSession(
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
                region_name=region_name,
            )
        # Ensure the local mirror folder exists (race-safe, nested-safe).
        makedirs(self.bucket_name, exist_ok=True)
        self.s3_client = self.session.client('s3')
        self.s3_resource = self.session.resource('s3')
        # Bookmark for the not-yet-implemented next() iteration API.
        self.last_file: Optional[str] = None

    def list_objects(self, prefix: Optional[str]=None, start_after: Optional[str]=None, max: Optional[int]=1000) -> Dict[str, Any]:
        """List objects from the S3 bucket.

        Parameters
        ----------
        prefix : Optional[str], optional
            Only list objects whose key starts with this, by default None
        start_after : Optional[str], optional
            Continue listing after a specific object's key; must be a
            complete file key, by default None
        max : Optional[int], optional
            Max number of objects per call, by default 1000.
            (Name kept for backward compatibility although it shadows the
            `max` builtin.)

        Returns
        -------
        Dict[str, Any]
            files : List[str]
                List of objects' keys
            is_next : bool
                True if there are more objects to list from the S3 bucket.
        """
        # Build the call once instead of branching over every combination;
        # falsy prefix/start_after ('' or None) are omitted, as before.
        kwargs: Dict[str, Any] = {'Bucket': self.bucket_name, 'MaxKeys': max}
        if prefix:
            kwargs['Prefix'] = prefix
        if start_after:
            kwargs['StartAfter'] = start_after
        response = self.s3_client.list_objects_v2(**kwargs)
        return {
            'files': [obj['Key'] for obj in response.get('Contents', [])],
            'is_next': response.get('IsTruncated', False),
        }

    def download(self):
        """Download objects from the S3 bucket.

        Download is skipped for any object that already exists locally.
        The destination folder has the same name as the S3 bucket.
        """
        is_token: bool = True
        remote_files: List[str] = []
        start_after: Optional[str] = None
        while is_token:
            res: Dict[str, Any] = self.list_objects(start_after=start_after)
            remote_files += res['files']
            is_token = res['is_next']
            if res['files']:
                # BUGFIX: advance the pagination cursor; previously
                # start_after was never updated, so a truncated listing
                # re-fetched the same page forever.
                start_after = res['files'][-1]
            else:
                break
            for file in res['files']:
                file_dest = PurePath(self.bucket_name, file)
                if not self.is_local(file):
                    tqdm.write(f' - Downloading {file_dest.name} from S3')
                    if not isdir(file_dest.parent):
                        makedirs(file_dest.parent)
                    self.s3_resource.meta.client.download_file(self.bucket_name, file, f"{file_dest}")
                else:
                    tqdm.write(f' - {file_dest.name} already in local')

    def is_local(self, file: str) -> bool:
        """Check whether the file already exists in the local folder.

        Parameters
        ----------
        file : str
            Should be the S3 bucket object's key

        Returns
        -------
        bool
            True if the file exists locally
        """
        return isfile(PurePath(self.bucket_name, file))

    def is_remote(self, file: str) -> bool:
        """Check whether the file already exists in the S3 bucket.

        Parameters
        ----------
        file : str
            File path without the leading folder name
            (file.split(self.bucket_name)[-1])

        Returns
        -------
        bool
            True if the file exists in the S3 bucket
        """
        return bool(self.list_objects(prefix=file)['files'])

    def next(self) -> Optional[bytes]:
        """Return the next file, for working with data on the fly.

        If the next file does not exist locally, it will be downloaded
        from S3 and then retrieved.

        Returns
        -------
        Optional[bytes]
            File in bytes, or None if there is no more data

        Raises
        ------
        NotImplementedError
            Method not finished yet
        """
        raise NotImplementedError('Not finished')

    def _relative_key(self, file_from: str) -> str:
        """Turn a local path under the mirror folder into an S3 object key.

        BUGFIX: the previous code used ``str.lstrip(f'{bucket}/')`` which
        strips a *character set*, not a prefix (e.g. bucket ``data`` with
        ``data/abc.txt`` produced key ``bc.txt``).
        """
        return PurePath(file_from).relative_to(self.bucket_name).as_posix()

    def upload(self):
        """Upload objects to the S3 bucket.

        Upload is skipped for any object that already exists in the bucket.
        The source folder must have the same name as the S3 bucket.
        """
        for file_from in tqdm(iglob(f'{self.bucket_name}/**/*.*', recursive=True)):
            if not isfile(file_from):
                continue
            rel = PurePath(self._relative_key(file_from))
            file_dest: str = rel.as_posix()
            if not self.is_remote(file_dest):
                tqdm.write(f' - Uploading {rel.name} to S3')
                self.s3_resource.meta.client.upload_file(file_from, self.bucket_name, file_dest)
            else:
                tqdm.write(f' - {rel.name} already in S3')
if __name__ == "__main__":
    dataset = DataSet(
        bucket_name=BUCKET_NAME,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    # Push local files to S3, then pull any missing objects back down; the
    # local mirror folder shares its name with the bucket and keeps the same
    # directory/file layout.
    dataset.upload()
    dataset.download()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment