Skip to content

Instantly share code, notes, and snippets.

@rm-you
Created August 28, 2014 23:56
Show Gist options
  • Save rm-you/7130dbdc858569657423 to your computer and use it in GitHub Desktop.
Save rm-you/7130dbdc858569657423 to your computer and use it in GitHub Desktop.
A draft of an alternate formatting method for __str__() for containers
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
import prettytable
from barbicanclient import base
from barbicanclient import secrets
from barbicanclient.openstack.common.timeutils import parse_isotime
from barbican_cli.containers import ContainerFormatter
from barbican_cli.containers import CertificateContainerFormatter
from barbican_cli.containers import RSAContainerFormatter
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
def immutable_after_save(func):
    """
    Decorator that blocks a method once its object has been saved.

    The returned wrapper checks the instance's ``_container_ref`` attribute
    before delegating to ``func``. Positional and keyword arguments are both
    forwarded unchanged.

    :param func: Method whose first parameter is the instance (``self``)
    :returns: Wrapper carrying ``func``'s name and docstring
    :raises base.ImmutableException: (from the wrapper) when the instance's
        ``_container_ref`` is truthy
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # A truthy container_ref means the object was already stored.
        if self._container_ref:
            raise base.ImmutableException()
        # Forward keyword arguments too, so decorated methods can be called
        # with named parameters.
        return func(self, *args, **kwargs)
    return wrapper
class Container(ContainerFormatter):
    """
    Containers are used to keep track of the data stored in Barbican.

    A container holds a list of named secrets. Once it has a
    ``container_ref`` (i.e. after a successful ``save()``), the methods
    guarded by ``immutable_after_save`` raise instead of mutating it.
    """
    entity = 'containers'

    def __init__(self, api, name=None, type='generic', secret_refs=None,
                 consumers=None, status=None, created=None, updated=None,
                 container_ref=None):
        """
        Builds a container object from its individual fields.

        :param api: Client object used for requests and for building Secrets
        :param name: Name of the container
        :param type: Container type string
        :param secret_refs: List of dicts with 'name' and 'secret_ref' keys
        :param consumers: Consumers registered on the container
        :param status: Status of the container
        :param created: Creation timestamp string, parsed with parse_isotime
        :param updated: Update timestamp string, parsed with parse_isotime
        :param container_ref: Reference of an already stored container
        """
        self._api = api
        # Secret objects are built lazily from secret_refs; start empty.
        self._cached_secrets = []
        self._container_ref = container_ref
        self._name = name
        self._type = type
        self._secret_refs = secret_refs
        self._consumers = consumers
        self._status = status
        if created:
            self._created = parse_isotime(created)
        else:
            self._created = None
        if updated:
            self._updated = parse_isotime(updated)
        else:
            self._updated = None

    @property
    def container_ref(self):
        return self._container_ref

    @property
    def name(self):
        return self._name

    @name.setter
    @immutable_after_save
    def name(self, value):
        self._name = value

    @property
    def type(self):
        return self._type

    @type.setter
    @immutable_after_save
    def type(self, value):
        self._type = value

    @property
    def created(self):
        return self._created

    @property
    def updated(self):
        return self._updated

    @property
    def status(self):
        return self._status

    @property
    def consumers(self):
        return self._consumers

    @property
    def secret_refs(self):
        """
        Name/reference pairs for the secrets in this container.

        When Secret objects are cached, the stored list is rebuilt from them
        first; otherwise the list given at construction is returned as is.
        """
        if not self._cached_secrets:
            return self._secret_refs
        rebuilt = []
        for entry in self._cached_secrets:
            rebuilt.append({
                'name': entry.get('name'),
                'secret_ref': entry.get('secret').secret_ref,
            })
        self._secret_refs = rebuilt
        return self._secret_refs

    def _fill_secrets(self):
        """
        Rebuild the Secret cache from the stored secret_refs, if there are any.
        """
        if not self._secret_refs:
            return
        rebuilt = []
        for ref in self._secret_refs:
            rebuilt.append({
                'name': ref.get('name').lower(),
                'secret': self._api.secrets.Secret(
                    secret_ref=ref.get('secret_ref')),
            })
        self._cached_secrets = rebuilt

    @property
    def secrets(self, cache=True):
        """
        List of {'name': ..., 'secret': Secret} dicts for this container.

        The cache is filled from secret_refs when it is empty. As a property,
        normal attribute access always uses the default for ``cache``.
        """
        if not (self._cached_secrets and cache):
            self._fill_secrets()
        return self._cached_secrets

    @immutable_after_save
    def add(self, name, sec):
        """
        Append a Secret to the container under the lower-cased ``name``.

        :param name: Name to file the secret under
        :param sec: Secret object to add
        :raises ValueError: if ``sec`` is not a Secret instance
        """
        if not isinstance(sec, secrets.Secret):
            raise ValueError("Must provide a valid Secret object")
        entry = {'name': name.lower(), 'secret': sec}
        self._cached_secrets.append(entry)

    def _get_secrets_and_store_them_if_necessary(self):
        """
        Store every secret that has no secret_ref yet and collect the refs.

        :returns: List of dicts with 'name' and 'secret_ref' keys
        """
        LOG.debug("Storing secrets: {0}".format(self.secrets))
        collected = []
        for entry in self.secrets:
            secret = entry.get('secret')
            # Only secrets without a reference still need to be stored.
            if secret and not secret.secret_ref:
                secret.store()
            collected.append({'name': entry.get('name'),
                              'secret_ref': secret.secret_ref})
        return collected

    @immutable_after_save
    def save(self):
        """
        Store pending secrets, then post the container itself.

        :returns: The container_ref (updated from the response when one is
            returned)
        """
        stored_refs = self._get_secrets_and_store_them_if_necessary()
        payload = {
            'name': self.name,
            'type': self.type,
            'secret_refs': stored_refs,
        }
        body = base.filter_empty_keys(payload)
        LOG.debug("Request body: {0}".format(body))

        reply = self._api.post(self.entity, body)
        if reply:
            self._container_ref = reply['container_ref']
        return self.container_ref

    def _get_named_secret(self, name):
        """
        Return the first secret filed under ``name`` (case-insensitive
        on the argument), or None when there is no such entry.
        """
        matches = (entry.get('secret') for entry in self.secrets
                   if entry.get('name') == name.lower())
        return next(matches, None)

    def __str__(self):
        """Render the formatted entity as a two-column, left-aligned table."""
        column_names, data = self._get_formatted_entity(self)
        headers = ('Field', 'Value')

        table = prettytable.PrettyTable(field_names=('Field', 'Value'),
                                        print_empty=False)
        table.padding_width = 1
        for header in headers:
            table.align[header] = 'l'
        for row in zip(column_names, data):
            table.add_row(row)
        return table.get_string(fields=('Field', 'Value'))

    def __repr__(self):
        return 'Container(name="{0}")'.format(self.name)
class RSAContainer(RSAContainerFormatter, Container):
    """
    Typed container exposing its secrets as named attributes.

    Secrets are read and assigned through the ``public_key``,
    ``private_key`` and ``private_key_passphrase`` properties; the generic
    ``add()`` is disabled.
    """

    def __init__(self, api, **kwargs):
        """
        :param api: Client object handed on to the base initializer
        :param kwargs: Remaining fields, passed through unchanged
        """
        self._required_secrets = ["public_key", "private_key"]
        self._optional_secrets = ["private_key_passphrase"]
        super(RSAContainer, self).__init__(api=api, **kwargs)

    # Each setter goes through the parent class's add(), because add() on
    # this class is deliberately disabled.

    @property
    def public_key(self):
        return self._get_named_secret("public_key")

    @public_key.setter
    @immutable_after_save
    def public_key(self, value):
        super(RSAContainer, self).add("public_key", value)

    @property
    def private_key(self):
        return self._get_named_secret("private_key")

    @private_key.setter
    @immutable_after_save
    def private_key(self, value):
        super(RSAContainer, self).add("private_key", value)

    @property
    def private_key_passphrase(self):
        return self._get_named_secret("private_key_passphrase")

    @private_key_passphrase.setter
    @immutable_after_save
    def private_key_passphrase(self, value):
        super(RSAContainer, self).add("private_key_passphrase", value)

    def add(self, name, sec):
        """Always raises: secrets are set via the named properties instead."""
        raise NotImplementedError("`add()` is not implemented for "
                                  "Typed Containers")

    def __repr__(self):
        return 'RSAContainer(name="{0}")'.format(self.name)
class CertificateContainer(CertificateContainerFormatter, Container):
    """
    Typed container exposing its secrets as named attributes.

    Secrets are read and assigned through the ``certificate``,
    ``private_key``, ``private_key_passphrase`` and ``intermediates``
    properties; the generic ``add()`` is disabled.
    """

    def __init__(self, api, **kwargs):
        """
        :param api: Client object handed on to the base initializer
        :param kwargs: Remaining fields, passed through unchanged
        """
        self._required_secrets = ["certificate", "private_key"]
        self._optional_secrets = ["private_key_passphrase", "intermediates"]
        super(CertificateContainer, self).__init__(api=api, **kwargs)

    # Each setter goes through the parent class's add(), because add() on
    # this class is deliberately disabled.

    @property
    def certificate(self):
        return self._get_named_secret("certificate")

    @certificate.setter
    @immutable_after_save
    def certificate(self, value):
        super(CertificateContainer, self).add("certificate", value)

    @property
    def private_key(self):
        return self._get_named_secret("private_key")

    @private_key.setter
    @immutable_after_save
    def private_key(self, value):
        super(CertificateContainer, self).add("private_key", value)

    @property
    def private_key_passphrase(self):
        return self._get_named_secret("private_key_passphrase")

    @private_key_passphrase.setter
    @immutable_after_save
    def private_key_passphrase(self, value):
        super(CertificateContainer, self).add("private_key_passphrase", value)

    @property
    def intermediates(self):
        return self._get_named_secret("intermediates")

    @intermediates.setter
    @immutable_after_save
    def intermediates(self, value):
        super(CertificateContainer, self).add("intermediates", value)

    def add(self, name, sec):
        """Always raises: secrets are set via the named properties instead."""
        raise NotImplementedError("`add()` is not implemented for "
                                  "Typed Containers")

    def __repr__(self):
        return 'CertificateContainer(name="{0}")'.format(self.name)
class ContainerManager(base.BaseEntityManager):
    """
    Entry point for creating, fetching, listing and deleting containers,
    and for registering or removing consumers on them.
    """

    # Maps the container 'type' string to the class used to represent it.
    _container_map = {
        "generic": Container,
        "rsa": RSAContainer,
        "certificate": CertificateContainer
    }

    def __init__(self, api):
        super(ContainerManager, self).__init__(api, 'containers')

    def Container(self, container_ref=None, name=None, type=None,
                  secret_refs=None):
        """
        Factory method that either retrieves a Container from Barbican if
        given a container_ref, or creates a new Container if not, and returns
        the appropriately typed Container object.

        :param container_ref: If provided, will do a Container GET in Barbican
        :param name: Name to use if creating a new Container
        :param type: Type of Container to return if creating a new Container;
            when fetching, the type the stored container is expected to have
        :param secret_refs: Secret_refs to populate if creating a new Container
        :returns: Container object (possibly of a specific type)
        :raises TypeError: if the fetched container's type differs from
            ``type``, or is not a known container type
        :raises ValueError: if ``type`` is not a known type when creating
        """
        if container_ref:
            LOG.debug('Getting container - Container href: {0}'
                      .format(container_ref))
            base.validate_ref(container_ref, 'Container')
            response = self.api.get(container_ref)
            resp_type = response.get('type')
            if type and resp_type and resp_type.lower() != type.lower():
                raise TypeError('Container type "{0}" does not match expected '
                                'type "{1}".'.format(resp_type, type))
            # Look the class up case-insensitively, in line with the
            # comparison above and the lookup used for new containers below.
            lookup_key = resp_type.lower() if resp_type else resp_type
            container_type = self._container_map.get(lookup_key)
            if not container_type:
                # Must be raised: returning the exception object would hand
                # the caller a TypeError instance in place of a container.
                raise TypeError('Unknown container type "{0}."'
                                .format(resp_type))
            return container_type(api=self.api, **response)

        if not type:
            type = 'generic'
        container_type = self._container_map.get(type.lower())
        if container_type:
            return container_type(api=self.api, name=name, type=type,
                                  secret_refs=secret_refs)
        raise ValueError("Invalid Container type specified. Valid types: {0}"
                         .format(', '.join((self._container_map.keys()))))

    def RSAContainer(self, container_ref=None, name=None):
        """
        Fetch the RSA container at ``container_ref``, or build a new unsaved
        one named ``name`` when no reference is given.

        :param container_ref: The href of an existing container, if any
        :param name: Name for a new container
        :returns: RSAContainer object
        """
        if container_ref:
            return self.Container(container_ref=container_ref, name=name,
                                  type='rsa')
        return RSAContainer(api=self.api, type='rsa', name=name)

    def CertificateContainer(self, container_ref=None, name=None):
        """
        Fetch the certificate container at ``container_ref``, or build a new
        unsaved one named ``name`` when no reference is given.

        :param container_ref: The href of an existing container, if any
        :param name: Name for a new container
        :returns: CertificateContainer object
        """
        if container_ref:
            return self.Container(container_ref=container_ref, name=name,
                                  type='certificate')
        return CertificateContainer(api=self.api, type='certificate',
                                    name=name)

    def delete(self, container_ref):
        """
        Deletes a container

        :param container_ref: The href for the container
        :raises ValueError: if ``container_ref`` is empty
        """
        if not container_ref:
            raise ValueError('container_ref is required.')
        self.api.delete(container_ref)

    def list(self, limit=10, offset=0, name=None, type=None):
        """
        List all containers for the tenant

        :param limit: Max number of containers returned
        :param offset: Offset containers to begin list
        :param name: Name filter for the list
        :param type: Type filter for the list
        :returns: list of Container metadata objects
        """
        LOG.debug('Listing containers - offset {0} limit {1} name {2} type {3}'
                  .format(offset, limit, name, type))
        href = '{0}/{1}'.format(self.api.base_url, self.entity)
        params = {'limit': limit, 'offset': offset}
        if name:
            params['name'] = name
        # Each filter is sent only when it was actually supplied.
        if type:
            params['type'] = type

        response = self.api.get(href, params)

        return [self._container_map[s.get('type')](api=self.api, **s)
                for s in response.get('containers', [])]

    def register_consumer(self, container_ref, name, URL):
        """
        Add a consumer to the container

        :param container_ref: The href for the container
        :param name: Name of the consuming service
        :param URL: URL of the consuming resource
        :returns: A typed container object built from the response
        """
        LOG.debug('Creating consumer registration for container '
                  '{0} as {1}: {2}'.format(container_ref, name, URL))
        # NOTE(review): this path is relative, unlike the absolute href in
        # remove_consumer. Presumably api.post prepends the base URL, as
        # Container.save also posts the bare entity name - confirm.
        href = '{0}/{1}/consumers'.format(self.entity,
                                          container_ref.split('/')[-1])
        consumer_dict = dict()
        consumer_dict['name'] = name
        consumer_dict['URL'] = URL

        response = self.api.post(href, consumer_dict)
        return self._container_map[response.get('type')](api=self.api,
                                                         **response)

    def remove_consumer(self, container_ref, name, URL):
        """
        Remove a consumer from the container

        :param container_ref: The href for the container
        :param name: Name of the previously consuming service
        :param URL: URL of the previously consuming resource
        """
        LOG.debug('Deleting consumer registration for container '
                  '{0} as {1}: {2}'.format(container_ref, name, URL))
        href = '{0}/{1}/{2}/consumers'.format(self.api.base_url, self.entity,
                                              container_ref.split('/')[-1])
        consumer_dict = {
            'name': name,
            'URL': URL
        }

        self.api.delete(href, json=consumer_dict)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment