Skip to content

Instantly share code, notes, and snippets.

Created Aug 24, 2020
What would you like to do?
# -*- coding: utf-8 -*-
# Copyright (C) 2019, Zato Source s.r.o.
# Licensed under LGPLv3, see LICENSE.txt for terms and conditions.
from __future__ import absolute_import, division, print_function, unicode_literals
# stdlib
from logging import getLogger
# Boto
from boto.s3.bucket import Bucket
from boto.s3.connection import NoHostProvided, S3Connection, OrdinaryCallingFormat
from boto.s3.key import Key
# Zato
from zato.common import ZATO_NONE
from zato.common.util import parse_extra_into_dict
from zato.server.connection.queue import Wrapper
# Minio support
from urllib.parse import urlparse
logger = getLogger(__name__)
class _S3Connection(object):
    """ A boto S3Connection kept together with the Zato-level defaults applied when storing keys.

    NOTE(review): this class was restored from a copy of the file that had lost its indentation
    and a few lines. Statements marked 'reconstructed' below must be confirmed against the original.
    """
    def __init__(self, **kwargs):
        """ Pops Zato-specific options off kwargs and hands everything that is left to boto's S3Connection.

        The keys bucket, content_type, metadata, encrypt_at_rest and storage_class are required,
        a missing one raises KeyError.
        """
        self.zato_default_bucket = kwargs.pop('bucket')
        self.zato_content_type = kwargs.pop('content_type')
        self.zato_metadata = kwargs.pop('metadata')

        # Any truthy flag is stored as 'AES256', anything else as None.
        encrypt_at_rest = kwargs.pop('encrypt_at_rest')
        self.zato_encrypt_at_rest = 'AES256' if encrypt_at_rest else None

        self.zato_storage_class = kwargs.pop('storage_class')

        # Only boto's own keyword arguments remain at this point.
        self.impl = S3Connection(**kwargs)

    def sanity_check(self):
        """ Issues a request over the underlying connection so that a wrong configuration raises an exception.
        """
        # NOTE(review): reconstructed - the body of this method was missing. get_canonical_user_id
        # is boto's authenticated round-trip to the service; confirm it is the call originally used.
        self.impl.get_canonical_user_id()

    def set(self, key, value, bucket=ZATO_NONE, content_type=ZATO_NONE, metadata=ZATO_NONE,
            storage_class=ZATO_NONE, encrypt_at_rest=ZATO_NONE):
        """ Stores value under key, using the connection's defaults for every option left at ZATO_NONE.

        key - name of the object to create
        value - contents to store under that name
        bucket, content_type, metadata, storage_class, encrypt_at_rest - per-call overrides
        of the defaults given to __init__
        """
        _bucket = Bucket(self.impl, bucket if bucket != ZATO_NONE else self.zato_default_bucket)
        _key = Key(_bucket)

        _key.content_type = content_type if content_type != ZATO_NONE else self.zato_content_type

        # The default metadata goes through parse_extra_into_dict first - presumably because it is kept
        # as text in the configuration (TODO confirm) - whereas a per-call value is used as given.
        _key.metadata.update(metadata if metadata != ZATO_NONE else parse_extra_into_dict(self.zato_metadata, False))

        # NOTE(review): reconstructed - only ' = key' survived of this assignment; 'name' is boto's
        # attribute for the object's key name.
        _key.name = key
        _key.storage_class = storage_class if storage_class != ZATO_NONE else self.zato_storage_class

        # NOTE(review): reconstructed - the head of this call was missing; set_contents_from_string
        # is the boto Key method accepting (string, encrypt_key=...).
        _key.set_contents_from_string(
            value, encrypt_key=(encrypt_at_rest if encrypt_at_rest != ZATO_NONE else self.zato_encrypt_at_rest))
class S3Wrapper(Wrapper):
    """ Wraps a queue of connections to AWS S3.

    NOTE(review): this class was restored from a copy of the file that had lost its indentation
    and a few lines. Statements marked 'reconstructed' below must be confirmed against the original.
    """
    def __init__(self, config, server):
        """ Copies config.address to config.auth_url and initializes the base wrapper under the name 'AWS S3'.
        """
        config.auth_url = config.address
        super(S3Wrapper, self).__init__(config, 'AWS S3', server)

    def add_client(self):
        """ Builds a new _S3Connection out of the wrapper's configuration, checks it and adds it to the queue.

        If the server's misc configuration sets aws_minio to True, host, port and is_secure are taken
        from config.address. Problems with that Minio configuration are only logged as warnings,
        in which case the connection is created with the non-Minio settings.
        """
        misc = self.server.fs_server_config.misc

        # Pre-defining the host as it can be set by server config or by the Minio-related block below
        host = misc.aws_host or NoHostProvided

        # Dict used for passing Minio-related keyword arguments to _S3Connection's constructor.
        # If aws_minio is not defined, this does not affect the original behaviour.
        minio_kwargs = {}

        # NOTE(review): reconstructed - the 'try:' line was missing, so its exact position is an assumption.
        try:
            # Check if the server configuration defines the aws_minio flag. If the flag is true,
            # parse the config.address URL for host, port and is_secure (required for a Minio connection).
            # The comparison with True is kept as it was so that non-boolean values are still rejected.
            if "aws_minio" in misc and misc.aws_minio == True: # noqa: E712
                minio_addr = urlparse(self.config.address)

                # Validates that config.address contains the fields required for creating a Minio connection
                if not all([minio_addr.scheme, minio_addr.hostname, minio_addr.port]):
                    raise ValueError("Invalid address:{}".format(self.config.address))

                # Replaces the host with the hostname taken from config.address.
                # minio_kwargs is not used for it, as the host parameter is already in use by the original implementation.
                host = minio_addr.hostname

                minio_kwargs = {
                    # calling_format is required for boto2 - OrdinaryCallingFormat puts the bucket
                    # in the URL path instead of in the hostname.
                    "calling_format": OrdinaryCallingFormat(),
                    # NOTE(review): reconstructed - this entry was missing although the port is validated above.
                    "port": minio_addr.port,
                    "is_secure": minio_addr.scheme == "https",
                }
        except Exception as ex: # For now, only warn about possible issues with the Minio configuration
            logger.warning("Minio config failed, ex:{}".format(ex))

        conn = _S3Connection(aws_access_key_id=self.config.username, aws_secret_access_key=self.config.password,
            suppress_consec_slashes=self.config.suppr_cons_slashes, content_type=self.config.content_type,
            metadata=self.config.metadata_ or {}, bucket=self.config.bucket, encrypt_at_rest=self.config.encrypt_at_rest,
            storage_class=self.config.storage_class, host=host, **minio_kwargs)

        # Sanity check - no exception here means the config is correct.
        # NOTE(review): reconstructed - the call itself was missing below the comment above.
        conn.sanity_check()

        # NOTE(review): reconstructed - presumably the base Wrapper exposes its queue as self.client
        # with a put_client method; verify against zato.server.connection.queue.
        self.client.put_client(conn)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment