Skip to content

Instantly share code, notes, and snippets.

@joffilyfe
Last active August 6, 2020 13:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joffilyfe/88f6f111a46f9aeb663fccdc96d835fa to your computer and use it in GitHub Desktop.
Save joffilyfe/88f6f111a46f9aeb663fccdc96d835fa to your computer and use it in GitHub Desktop.
import abc
import os
import re
import shutil
import logging
import tempfile
import zipfile
import threading
from datetime import datetime
from ftplib import FTP, all_errors
from minio import Minio
from minio.error import ResponseError, NoSuchBucket
LOGGER = logging.getLogger(__name__)
class AsyncFTP(threading.Thread):
def __init__(
self, local_file_path, server, user, password, remote_path, timeout=60
):
threading.Thread.__init__(self)
self.local_file_path = local_file_path
self.server = server
self.user = user
self.password = password
self.remote_path = remote_path
self.timeout = timeout
def run(self):
LOGGER.info("FTP.START")
try:
ftp = FTP(self.server, self.user, self.password, self.timeout)
except all_errors as e:
LOGGER.info(e)
return
try:
if self.remote_path:
ftp.cwd(self.remote_path)
LOGGER.info("ftp " + self.local_file_path)
remote_name = os.path.basename(self.local_file_path)
with open(self.local_file_path, "rb") as f:
try:
LOGGER.info("FTP.STOR %s - start" % remote_name)
ftp.storbinary("STOR {}".format(remote_name), f)
LOGGER.info("FTP.STOR %s - end" % remote_name)
except all_errors:
LOGGER.info(
"FTP: Unable to send %s to %s"
% (self.local_file_path, remote_name),
exc_info=True,
)
except all_errors as e:
LOGGER.exception("all_errors as e")
finally:
ftp.close()
LOGGER.info("FTP.END")
class Exporter(abc.ABC):
"""Abstract Class that should be used as template"""
@abc.abstractmethod
def export(self, file_path: str) -> dict:
raise NotImplementedError
class FTPExporter(Exporter):
def __init__(
self, host, user, password, destination_dir_path, ftp_service: object = AsyncFTP
):
self.host = host
self.user = user
self.password = password
self.destination_dir_path = destination_dir_path
self.ftp_service = ftp_service
def export(self, zip_file: str):
try:
background = self.ftp_service(
zip_file, self.host, self.user, self.password, self.destination_dir_path
)
except (PermissionError, OSError) as exception:
LOGGER.error(
"%s - Unable to move zip file '%s' to destination '%s'. "
"The following exception was raised: '%s'",
self.__class__.__name__,
zip_file,
self.destination_dir_path,
exception,
)
else:
background.start()
class FileSystemExporter(Exporter):
"""Copy a file to destination"""
def __init__(self, destination_dir_path: str):
self.destination_dir_path = destination_dir_path
if not os.path.exists(self.destination_dir_path):
os.makedirs(self.destination_dir_path)
def export(self, zip_file: str):
"""Execute the proccess of export file to destination"""
try:
shutil.copy(zip_file, self.destination_dir_path)
except (PermissionError, OSError) as exception:
LOGGER.error(
"%s - Unable to move zip file '%s' to destination '%s'. "
"The following exception was raised: '%s'",
self.__class__.__name__,
zip_file,
self.destination_dir_path,
exception,
)
class MinioExporter(Exporter):
"""Sends a file to a Min.io object storage"""
def __init__(
self, host, access_key, secret_key, secure=False, bucket_name="xmlconverter",
):
self.bucket_name = bucket_name
self.POLICY_READ_ONLY = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": ["*"]},
"Action": ["s3:GetBucketLocation", "s3:ListBucket"],
"Resource": [f"arn:aws:s3:::{self.bucket_name}"],
},
{
"Effect": "Allow",
"Principal": {"AWS": ["*"]},
"Action": ["s3:GetObject"],
"Resource": [f"arn:aws:s3:::{self.bucket_name}/*"],
},
],
}
self._client = Minio(
host, access_key=access_key, secret_key=secret_key, secure=secure,
)
def export(self, file_path: str):
"""Put a object to the configurated bucket"""
object_name = os.path.basename(file_path)
self._client.fput_object(
self.bucket_name, object_name=object_name, file_path=file_path
)
LOGGER.info("ENVIANDO ARQUIVO PARA O MINIO %s", file_path)
class TemporaryZipFile:
"""Compress a source folder and generate a temporary Zip file"""
def __init__(self, package_source_dir_path: str):
self.package_source_dir_path = package_source_dir_path
def _create_temporary_zip_file(
self, directory_files_path: str, zip_filename: str
) -> str:
"""Makes a compressed file with directory source files and return a
path to the tempraty zip file."""
destination_directory_path = tempfile.mkdtemp()
file_path = os.path.join(destination_directory_path, zip_filename)
with zipfile.ZipFile(file_path, "w") as zf:
LOGGER.debug(
"%s - Creating zip file '%s' with files from %s.",
self.__class__.__name__,
file_path,
directory_files_path,
)
for item in os.listdir(directory_files_path):
path = os.path.join(directory_files_path, item)
zf.write(path, arcname=item)
LOGGER.debug(
"%s - Zip file '%s' created.", self.__class__.__name__, file_path,
)
return file_path
def get_zip_file(self):
root_dir, dir_base_name = (
os.path.dirname(self.package_source_dir_path),
os.path.basename(self.package_source_dir_path),
)
# preppend current datetime to the package name
package_file_name = os.path.join(
"%s_%s" % (re.sub(r"[\W\s]", "-", str(datetime.now())), dir_base_name,)
)
final_path = os.path.join(root_dir, package_file_name)
try:
zip_file = self._create_temporary_zip_file(
directory_files_path=self.package_source_dir_path,
zip_filename="{}.zip".format(final_path),
)
except (IOError, PermissionError) as e:
LOGGER.exception(
"%s - Unable to create zip for package '%s'. The following "
"exception was raised",
self.__class__.__name__,
package_file_name,
)
raise e
else:
return zip_file
def __enter__(self):
return self.get_zip_file()
def __exit__(self, type, value, traceback):
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment