Skip to content

Instantly share code, notes, and snippets.

@rm-you
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rm-you/5974777f0b49c9239c45 to your computer and use it in GitHub Desktop.
Save rm-you/5974777f0b49c9239c45 to your computer and use it in GitHub Desktop.
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
import six
from barbicanclient import base
from barbicanclient.openstack.common.timeutils import parse_isotime
LOG = logging.getLogger(__name__)
class ImmutableException(Exception):
    """Raised when a caller tries to modify a secret that cannot change.

    :param attribute: accepted for interface compatibility; the value is
                      not used when building the error message.
    """

    def __init__(self, attribute=None):
        # The message is fixed regardless of which attribute was touched.
        Exception.__init__(self, "Secrets are immutable!")
class Secret(object):
    """
    Secrets are used to keep track of the data stored in Barbican.

    An instance is in one of two states:

    * unsaved (no ``secret_ref``): attributes may be changed freely and
      ``save()`` posts the secret through the API object;
    * saved (``secret_ref`` set): every attribute setter raises
      ``ImmutableException`` and the payload is fetched on first access.
    """
    entity = 'secrets'

    # Compiled once for the class instead of on every construction. The
    # pattern is unanchored: a UUID anywhere in the last path segment passes.
    _UUID_RE = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-'
                          '[0-9a-f]{4}-[0-9a-f]{12}', re.I)

    def __init__(self, api=None, name=None, expiration=None, algorithm=None,
                 bit_length=None, mode=None, payload=None,
                 payload_content_type=None, payload_content_encoding=None,
                 secret_ref=None, created=None, updated=None, from_list=None,
                 content_types=None, status='INACTIVE'):
        """
        Build a secret from keyword arguments or by fetching its metadata.

        When ``secret_ref`` is given and ``from_list`` is falsy, the ref is
        validated, ``api.get(secret_ref)`` is called, and every other
        attribute is taken from the response (the remaining keyword
        arguments are ignored). Otherwise the attributes are taken from the
        keyword arguments as supplied.

        :param api: object used for ``get``/``get_raw``/``post`` calls
        :param secret_ref: The href for the secret
        :param from_list: truthy when the keyword arguments already hold
                          the secret's metadata, so no fetch is needed
        :param content_types: mapping of content types; when non-empty, its
                              first value overrides ``payload_content_type``
        :param expiration: timestamp string, parsed with ``parse_isotime``
        :param created: timestamp string, parsed with ``parse_isotime``
        :param updated: timestamp string, parsed with ``parse_isotime``
        :raises ValueError: if ``secret_ref`` has to be fetched but its last
                            path segment does not contain a UUID
        """
        self._api = api
        self._secret_ref = secret_ref
        if secret_ref and not from_list:
            self._validate_secret_ref(secret_ref)
            self._load_from_response(self._api.get(secret_ref))
        else:
            self._name = name
            self._algorithm = algorithm
            self._bit_length = bit_length
            self._mode = mode
            self._payload = payload
            if content_types:
                self._payload_content_type = self._first_value(content_types)
            else:
                self._payload_content_type = payload_content_type
            self._payload_content_encoding = payload_content_encoding
            self._expiration = self._parse_time(expiration)
            self._created = self._parse_time(created)
            self._updated = self._parse_time(updated)
            self._content_types = content_types
            self._status = status

    @classmethod
    def _validate_secret_ref(cls, secret_ref):
        """Raise ValueError unless the ref's last path segment has a UUID."""
        try:
            path = six.moves.urllib.parse.urlparse(secret_ref).path
            last_segment = path.rstrip('/').split('/')[-1]
        except (AttributeError, TypeError, ValueError):
            # Not a parseable text URL; report it like any other bad ref.
            raise ValueError('secret incorrectly specified.')
        if not cls._UUID_RE.search(last_segment):
            raise ValueError('secret incorrectly specified.')

    @staticmethod
    def _parse_time(value):
        """Parse a timestamp string; empty or missing values give None."""
        return parse_isotime(value) if value else None

    @staticmethod
    def _first_value(mapping):
        """Return the first value of a non-empty mapping.

        ``mapping.values()[0]`` only works on Python 2, where ``values()``
        returns a list; iterating works on both Python 2 and 3.
        """
        return next(iter(mapping.values()))

    def _load_from_response(self, resp):
        """Populate every attribute from a secret metadata response."""
        self._name = resp['name']
        self._algorithm = resp['algorithm']
        self._bit_length = resp['bit_length']
        self._mode = resp['mode']
        self._expiration = self._parse_time(resp.get('expiration'))
        self._created = self._parse_time(resp.get('created'))
        self._updated = self._parse_time(resp.get('updated'))
        self._status = resp['status']
        # The payload is not part of the metadata; it is fetched lazily by
        # the ``payload`` property.
        self._payload = None
        self._payload_content_encoding = None
        content_types = resp.get('content_types')
        if content_types:
            self._content_types = content_types
            self._payload_content_type = self._first_value(content_types)
        else:
            self._content_types = None
            self._payload_content_type = None

    @property
    def secret_ref(self):
        """The href for the secret, or None while it is unsaved."""
        return self._secret_ref

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._check_save()
        self._name = value

    @property
    def expiration(self):
        return self._expiration

    @expiration.setter
    def expiration(self, value):
        # NOTE(review): the constructor parses expiration strings, but this
        # setter stores the value as given - confirm which form save() needs.
        self._check_save()
        self._expiration = value

    @property
    def algorithm(self):
        return self._algorithm

    @algorithm.setter
    def algorithm(self, value):
        self._check_save()
        self._algorithm = value

    @property
    def bit_length(self):
        return self._bit_length

    @bit_length.setter
    def bit_length(self, value):
        self._check_save()
        self._bit_length = value

    @property
    def mode(self):
        return self._mode

    @mode.setter
    def mode(self, value):
        self._check_save()
        self._mode = value

    @property
    def payload(self):
        """
        The secret's payload.

        For a saved secret with no payload held locally, the payload is
        fetched through the API on access. An unsaved secret has nothing to
        fetch, so its locally held value (possibly None) is returned as is.

        :raises ValueError: if a fetch is needed but the secret has no
                            content types or no 'default' content type
        """
        if not self._payload and self._secret_ref:
            self._fetch_payload()
        return self._payload

    @payload.setter
    def payload(self, value):
        self._check_save()
        self._payload = value

    @property
    def payload_content_type(self):
        return self._payload_content_type

    @payload_content_type.setter
    def payload_content_type(self, value):
        self._check_save()
        self._payload_content_type = value

    @property
    def payload_content_encoding(self):
        return self._payload_content_encoding

    @payload_content_encoding.setter
    def payload_content_encoding(self, value):
        self._check_save()
        self._payload_content_encoding = value

    @property
    def created(self):
        return self._created

    @property
    def updated(self):
        return self._updated

    @property
    def content_types(self):
        """The stored content types, or a 'default'-only fallback mapping."""
        if self._content_types:
            return self._content_types
        return {u'default': self.payload_content_type}

    @property
    def status(self):
        return self._status

    def _check_save(self):
        """Raise ImmutableException if this secret already has a ref."""
        if self._secret_ref:
            raise ImmutableException()

    def _fetch_payload(self):
        """Fetch the payload using the 'default' content type."""
        if self._content_types is None:
            raise ValueError('Secret has no encrypted data to decrypt.')
        if 'default' not in self._content_types:
            raise ValueError("Must specify decrypt content-type as "
                             "secret does not specify a 'default' "
                             "content-type.")
        content_type = self._content_types['default']
        headers = {'Accept': content_type}
        self._payload = self._api.get_raw(self._secret_ref, headers)

    def save(self):
        """
        Post this secret through the API and record the returned ref.

        :returns: the secret_ref from the response, or None if the response
                  was empty
        :raises ImmutableException: if the secret already has a secret_ref
        """
        self._check_save()
        secret_dict = {
            'name': self.name,
            # Read the stored value directly: an unsaved secret has nothing
            # on the server to fetch.
            'payload': self._payload,
            'payload_content_type': self.payload_content_type,
            'payload_content_encoding': self.payload_content_encoding,
            'algorithm': self.algorithm,
            'mode': self.mode,
            'bit_length': self.bit_length,
            'expiration': self.expiration,
        }
        base.BaseEntityManager.remove_empty_keys(secret_dict)
        # NOTE(review): this writes the request body, payload included, to
        # the debug log - confirm that is acceptable.
        LOG.debug("Request body: %s", secret_dict)
        resp = self._api.post(self.entity, secret_dict)
        # Save, store secret_ref and return
        if resp:
            self._secret_ref = resp['secret_ref']
        return self.secret_ref

    def __str__(self):
        return ("Secret:\n"
                "    href: {0}\n"
                "    name: {1}\n"
                "    created: {2}\n"
                "    status: {3}\n"
                "    content types: {4}\n"
                "    algorithm: {5}\n"
                "    bit length: {6}\n"
                "    mode: {7}\n"
                "    expiration: {8}\n"
                .format(self.secret_ref, self.name, self.created,
                        self.status, self.content_types, self.algorithm,
                        self.bit_length, self.mode, self.expiration)
                )

    def __repr__(self):
        return 'Secret(name="{0}")'.format(self.name)
class SecretManager(base.BaseEntityManager):
    """Entry point for creating, fetching, listing and deleting secrets."""

    def __init__(self, api):
        super(SecretManager, self).__init__(api, 'secrets')

    def Secret(self, **kwargs):
        """
        Returns a new Secret object bound to this manager's API object.
        :param kwargs: passed through to the Secret constructor
        """
        return Secret(api=self.api, **kwargs)

    def get(self, secret_ref):
        """
        Returns a Secret object with metadata about the secret.
        :param secret_ref: The href for the secret
        """
        return Secret(api=self.api, secret_ref=secret_ref)

    def list(self, limit=10, offset=0, name=None, algorithm=None,
             mode=None, bits=0):
        """
        List all secrets for the tenant
        :param limit: Max number of secrets returned
        :param offset: Offset secrets to begin list
        :param name: if set, sent as the 'name' query parameter
        :param algorithm: if set, sent as the 'alg' query parameter
        :param mode: if set, sent as the 'mode' query parameter
        :param bits: if greater than zero, sent as the 'bits' query parameter
        :returns: list of Secret metadata objects
        """
        LOG.debug('Listing secrets - offset %s limit %s', offset, limit)
        href = '{0}/{1}'.format(self.api.base_url, self.entity)
        params = {'limit': limit, 'offset': offset}
        if name:
            params['name'] = name
        if algorithm:
            params['alg'] = algorithm
        if mode:
            params['mode'] = mode
        # Guard against None first: ``None > 0`` raises on Python 3.
        if bits and bits > 0:
            params['bits'] = bits
        resp = self.api.get(href, params)
        # from_list=True tells Secret to use the metadata already present in
        # each list entry rather than issuing one extra GET per secret.
        return [Secret(api=self.api, from_list=True, **entry)
                for entry in resp['secrets']]

    def delete(self, secret_ref):
        """
        Deletes a secret
        :param secret_ref: The href for the secret
        :raises ValueError: if secret_ref is empty
        """
        if not secret_ref:
            raise ValueError('secret_ref is required.')
        self.api.delete(secret_ref)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment