Created
August 24, 2020 11:21
-
-
Save zoop500/aa433c89280542e0f212aed662e21164 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Copyright (C) 2019, Zato Source s.r.o. https://zato.io | |
Licensed under LGPLv3, see LICENSE.txt for terms and conditions. | |
""" | |
from __future__ import absolute_import, division, print_function, unicode_literals | |
# stdlib | |
from logging import getLogger | |
# Boto | |
from boto.s3.bucket import Bucket | |
from boto.s3.connection import NoHostProvided, S3Connection, OrdinaryCallingFormat | |
from boto.s3.key import Key | |
# Zato | |
from zato.common import ZATO_NONE | |
from zato.common.util import parse_extra_into_dict | |
from zato.server.connection.queue import Wrapper | |
# Minio support | |
from urllib.parse import urlparse | |
logger = getLogger(__name__) | |
class _S3Connection(object): | |
def __init__(self, **kwargs): | |
self.zato_default_bucket = kwargs.pop('bucket') | |
self.zato_content_type = kwargs.pop('content_type') | |
self.zato_metadata = kwargs.pop('metadata') | |
encrypt_at_rest = kwargs.pop('encrypt_at_rest') | |
self.zato_encrypt_at_rest = 'AES256' if encrypt_at_rest else None | |
self.zato_storage_class = kwargs.pop('storage_class') | |
self.impl = S3Connection(**kwargs) | |
def sanity_check(self): | |
self.impl.get_canonical_user_id() | |
def set(self, key, value, bucket=ZATO_NONE, content_type=ZATO_NONE, metadata=ZATO_NONE, | |
storage_class=ZATO_NONE, encrypt_at_rest=ZATO_NONE): | |
_bucket = Bucket(self.impl, bucket if bucket != ZATO_NONE else self.zato_default_bucket) | |
_key = Key(_bucket) | |
_key.content_type = content_type if content_type != ZATO_NONE else self.zato_content_type | |
_key.metadata.update(metadata if metadata != ZATO_NONE else parse_extra_into_dict(self.zato_metadata, False)) | |
_key.name = key | |
_key.storage_class = storage_class if storage_class != ZATO_NONE else self.zato_storage_class | |
_key.set_contents_from_string( | |
value, encrypt_key=(encrypt_at_rest if encrypt_at_rest != ZATO_NONE else self.zato_encrypt_at_rest)) | |
class S3Wrapper(Wrapper): | |
""" Wraps a queue of connections to AWS S3. | |
""" | |
def __init__(self, config, server): | |
config.auth_url = config.address | |
super(S3Wrapper, self).__init__(config, 'AWS S3', server) | |
def add_client(self): | |
# Pre-defining the host as it can be setted by server config or by Minio related block below | |
host = self.server.fs_server_config.misc.aws_host or NoHostProvided | |
# Dict used for passing minio related keyword arguments to _S3Connection's constructor | |
# If minio_aws is not defined, this should not affect the original behaviour | |
minio_kwargs = {} | |
try: | |
# Check if the server configuration defines aws_minio flag | |
# If the flag is true, then parse the config.address URL for host,port and is_secure (Required for Minio connection) | |
if "aws_minio" in self.server.fs_server_config.misc and self.server.fs_server_config.misc.aws_minio == True: | |
minio_addr = urlparse(self.config.address) | |
# Validates that config.addres contains required fields for creating Minio connection | |
if not all([minio_addr.scheme, minio_addr.hostname, minio_addr.port]): | |
raise ValueError("Invalid address:{}".format(self.config.address)) | |
# Replaces the host with hostname taken from config.address | |
# minio_kwargs is not used, as the host parameter is already in use by original implementation | |
host = minio_addr.hostname | |
minio_kwargs = { | |
# calling_format is required for boto2 | |
# (https://github.com/minio/minio/issues/5422#issuecomment-358615603) | |
"calling_format": OrdinaryCallingFormat(), | |
"is_secure":minio_addr.scheme == "https", | |
"port":minio_addr.port | |
} | |
except Exception as ex: # For now, only warn about possible issues with the minio configuration | |
logger.warn("Minio config failed, ex:{}".format(ex)) | |
conn = _S3Connection(aws_access_key_id=self.config.username, aws_secret_access_key=self.config.password, | |
debug=self.config.debug_level, | |
suppress_consec_slashes=self.config.suppr_cons_slashes, content_type=self.config.content_type, | |
metadata=self.config.metadata_ or {}, bucket=self.config.bucket, encrypt_at_rest=self.config.encrypt_at_rest, | |
storage_class=self.config.storage_class, host=host, **minio_kwargs) | |
# Sanity check - no exception here means the config is correct. | |
conn.sanity_check() | |
self.client.put_client(conn) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment