Created
April 22, 2017 03:37
-
-
Save akrzos/9d841feff51050c12913faf39634383a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- encoding: utf-8 -*- | |
# | |
# Copyright © 2016 Red Hat, Inc. | |
# Copyright © 2014-2015 eNovance | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); you may | |
# not use this file except in compliance with the License. You may obtain | |
# a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
# License for the specific language governing permissions and limitations | |
# under the License. | |
import functools | |
import itertools | |
import uuid | |
from concurrent import futures | |
import jsonpatch | |
from oslo_utils import dictutils | |
from oslo_utils import strutils | |
import pecan | |
from pecan import rest | |
import pyparsing | |
import six | |
from six.moves.urllib import parse as urllib_parse | |
from stevedore import extension | |
import voluptuous | |
import webob.exc | |
import werkzeug.http | |
from gnocchi import aggregates | |
from gnocchi import archive_policy | |
from gnocchi import indexer | |
from gnocchi import json | |
from gnocchi import resource_type | |
from gnocchi import storage | |
from gnocchi.storage import incoming | |
from gnocchi import utils | |
def arg_to_list(value): | |
if isinstance(value, list): | |
return value | |
elif value: | |
return [value] | |
return [] | |
def abort(status_code, detail='', headers=None, comment=None, **kw): | |
"""Like pecan.abort, but make sure detail is a string.""" | |
if status_code == 404 and not detail: | |
raise RuntimeError("http code 404 must have 'detail' set") | |
if isinstance(detail, Exception): | |
detail = six.text_type(detail) | |
return pecan.abort(status_code, detail, headers, comment, **kw) | |
def enforce(rule, target): | |
"""Return the user and project the request should be limited to. | |
:param rule: The rule name | |
:param target: The target to enforce on. | |
""" | |
creds = pecan.request.auth_helper.get_auth_info(pecan.request.headers) | |
if not isinstance(target, dict): | |
if hasattr(target, "jsonify"): | |
target = target.jsonify() | |
else: | |
target = target.__dict__ | |
# Flatten dict | |
target = dict(dictutils.flatten_dict_to_keypairs(d=target, separator='.')) | |
if not pecan.request.policy_enforcer.enforce(rule, target, creds): | |
abort(403) | |
def set_resp_location_hdr(location): | |
location = '%s%s' % (pecan.request.script_name, location) | |
# NOTE(sileht): according the pep-3333 the headers must be | |
# str in py2 and py3 even this is not the same thing in both | |
# version | |
# see: http://legacy.python.org/dev/peps/pep-3333/#unicode-issues | |
if six.PY2 and isinstance(location, six.text_type): | |
location = location.encode('utf-8') | |
location = urllib_parse.quote(location) | |
pecan.response.headers['Location'] = location | |
def deserialize(expected_content_types=None): | |
if expected_content_types is None: | |
expected_content_types = ("application/json", ) | |
mime_type, options = werkzeug.http.parse_options_header( | |
pecan.request.headers.get('Content-Type')) | |
if mime_type not in expected_content_types: | |
abort(415) | |
try: | |
params = json.load(pecan.request.body_file) | |
except Exception as e: | |
abort(400, "Unable to decode body: " + six.text_type(e)) | |
return params | |
def deserialize_and_validate(schema, required=True, | |
expected_content_types=None): | |
try: | |
return voluptuous.Schema(schema, required=required)( | |
deserialize(expected_content_types=expected_content_types)) | |
except voluptuous.Error as e: | |
abort(400, "Invalid input: %s" % e) | |
def PositiveOrNullInt(value): | |
value = int(value) | |
if value < 0: | |
raise ValueError("Value must be positive") | |
return value | |
def PositiveNotNullInt(value): | |
value = int(value) | |
if value <= 0: | |
raise ValueError("Value must be positive and not null") | |
return value | |
def Timespan(value): | |
return utils.to_timespan(value).total_seconds() | |
def get_header_option(name, params): | |
type, options = werkzeug.http.parse_options_header( | |
pecan.request.headers.get('Accept')) | |
try: | |
return strutils.bool_from_string( | |
options.get(name, params.pop(name, 'false')), | |
strict=True) | |
except ValueError as e: | |
method = 'Accept' if name in options else 'query' | |
abort( | |
400, | |
"Unable to parse %s value in %s: %s" | |
% (name, method, six.text_type(e))) | |
def get_history(params): | |
return get_header_option('history', params) | |
def get_details(params): | |
return get_header_option('details', params) | |
RESOURCE_DEFAULT_PAGINATION = ['revision_start:asc', | |
'started_at:asc'] | |
METRIC_DEFAULT_PAGINATION = ['id:asc'] | |
THREADS = utils.get_default_workers() | |
def get_pagination_options(params, default): | |
max_limit = pecan.request.conf.api.max_limit | |
limit = params.get('limit', max_limit) | |
marker = params.get('marker') | |
sorts = params.get('sort', default) | |
if not isinstance(sorts, list): | |
sorts = [sorts] | |
try: | |
limit = PositiveNotNullInt(limit) | |
except ValueError: | |
abort(400, "Invalid 'limit' value: %s" % params.get('limit')) | |
limit = min(limit, max_limit) | |
return {'limit': limit, | |
'marker': marker, | |
'sorts': sorts} | |
def ValidAggMethod(value): | |
value = six.text_type(value) | |
if value in archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS_VALUES: | |
return value | |
raise ValueError("Invalid aggregation method") | |
class ArchivePolicyController(rest.RestController): | |
def __init__(self, archive_policy): | |
self.archive_policy = archive_policy | |
@pecan.expose('json') | |
def get(self): | |
ap = pecan.request.indexer.get_archive_policy(self.archive_policy) | |
if ap: | |
enforce("get archive policy", ap) | |
return ap | |
abort(404, indexer.NoSuchArchivePolicy(self.archive_policy)) | |
@pecan.expose('json') | |
def patch(self): | |
ap = pecan.request.indexer.get_archive_policy(self.archive_policy) | |
if not ap: | |
abort(404, indexer.NoSuchArchivePolicy(self.archive_policy)) | |
enforce("update archive policy", ap) | |
body = deserialize_and_validate(voluptuous.Schema({ | |
voluptuous.Required("definition"): | |
voluptuous.All([{ | |
"granularity": Timespan, | |
"points": PositiveNotNullInt, | |
"timespan": Timespan}], voluptuous.Length(min=1)), | |
})) | |
# Validate the data | |
try: | |
ap_items = [archive_policy.ArchivePolicyItem(**item) for item in | |
body['definition']] | |
except ValueError as e: | |
abort(400, e) | |
try: | |
return pecan.request.indexer.update_archive_policy( | |
self.archive_policy, ap_items) | |
except indexer.UnsupportedArchivePolicyChange as e: | |
abort(400, e) | |
@pecan.expose() | |
def delete(self): | |
# NOTE(jd) I don't think there's any point in fetching and passing the | |
# archive policy here, as the rule is probably checking the actual role | |
# of the user, not the content of the AP. | |
enforce("delete archive policy", {}) | |
try: | |
pecan.request.indexer.delete_archive_policy(self.archive_policy) | |
except indexer.NoSuchArchivePolicy as e: | |
abort(404, e) | |
except indexer.ArchivePolicyInUse as e: | |
abort(400, e) | |
class ArchivePoliciesController(rest.RestController): | |
@pecan.expose() | |
def _lookup(self, archive_policy, *remainder): | |
return ArchivePolicyController(archive_policy), remainder | |
@pecan.expose('json') | |
def post(self): | |
# NOTE(jd): Initialize this one at run-time because we rely on conf | |
conf = pecan.request.conf | |
enforce("create archive policy", {}) | |
ArchivePolicySchema = voluptuous.Schema({ | |
voluptuous.Required("name"): six.text_type, | |
voluptuous.Required("back_window", default=0): PositiveOrNullInt, | |
voluptuous.Required( | |
"aggregation_methods", | |
default=set(conf.archive_policy.default_aggregation_methods)): | |
[ValidAggMethod], | |
voluptuous.Required("definition"): | |
voluptuous.All([{ | |
"granularity": Timespan, | |
"points": PositiveNotNullInt, | |
"timespan": Timespan, | |
}], voluptuous.Length(min=1)), | |
}) | |
body = deserialize_and_validate(ArchivePolicySchema) | |
# Validate the data | |
try: | |
ap = archive_policy.ArchivePolicy.from_dict(body) | |
except ValueError as e: | |
abort(400, e) | |
enforce("create archive policy", ap) | |
try: | |
ap = pecan.request.indexer.create_archive_policy(ap) | |
except indexer.ArchivePolicyAlreadyExists as e: | |
abort(409, e) | |
location = "/archive_policy/" + ap.name | |
set_resp_location_hdr(location) | |
pecan.response.status = 201 | |
return ap | |
@pecan.expose('json') | |
def get_all(self): | |
enforce("list archive policy", {}) | |
return pecan.request.indexer.list_archive_policies() | |
class ArchivePolicyRulesController(rest.RestController): | |
@pecan.expose('json') | |
def post(self): | |
enforce("create archive policy rule", {}) | |
ArchivePolicyRuleSchema = voluptuous.Schema({ | |
voluptuous.Required("name"): six.text_type, | |
voluptuous.Required("metric_pattern"): six.text_type, | |
voluptuous.Required("archive_policy_name"): six.text_type, | |
}) | |
body = deserialize_and_validate(ArchivePolicyRuleSchema) | |
enforce("create archive policy rule", body) | |
try: | |
ap = pecan.request.indexer.create_archive_policy_rule( | |
body['name'], body['metric_pattern'], | |
body['archive_policy_name'] | |
) | |
except indexer.ArchivePolicyRuleAlreadyExists as e: | |
abort(409, e) | |
location = "/archive_policy_rule/" + ap.name | |
set_resp_location_hdr(location) | |
pecan.response.status = 201 | |
return ap | |
@pecan.expose('json') | |
def get_one(self, name): | |
ap = pecan.request.indexer.get_archive_policy_rule(name) | |
if ap: | |
enforce("get archive policy rule", ap) | |
return ap | |
abort(404, indexer.NoSuchArchivePolicyRule(name)) | |
@pecan.expose('json') | |
def get_all(self): | |
enforce("list archive policy rule", {}) | |
return pecan.request.indexer.list_archive_policy_rules() | |
@pecan.expose() | |
def delete(self, name): | |
# NOTE(jd) I don't think there's any point in fetching and passing the | |
# archive policy rule here, as the rule is probably checking the actual | |
# role of the user, not the content of the AP rule. | |
enforce("delete archive policy rule", {}) | |
try: | |
pecan.request.indexer.delete_archive_policy_rule(name) | |
except indexer.NoSuchArchivePolicyRule as e: | |
abort(404, e) | |
except indexer.ArchivePolicyRuleInUse as e: | |
abort(400, e) | |
def MeasuresListSchema(measures): | |
try: | |
times = utils.to_timestamps((m['timestamp'] for m in measures)) | |
except TypeError: | |
abort(400, "Invalid format for measures") | |
except ValueError as e: | |
abort(400, "Invalid input for timestamp: %s" % e) | |
try: | |
values = [float(i['value']) for i in measures] | |
except Exception: | |
abort(400, "Invalid input for a value") | |
return (storage.Measure(t, v) for t, v in six.moves.zip( | |
times.tolist(), values)) | |
class MetricController(rest.RestController): | |
_custom_actions = { | |
'measures': ['POST', 'GET'] | |
} | |
def __init__(self, metric): | |
self.metric = metric | |
mgr = extension.ExtensionManager(namespace='gnocchi.aggregates', | |
invoke_on_load=True) | |
self.custom_agg = dict((x.name, x.obj) for x in mgr) | |
def enforce_metric(self, rule): | |
enforce(rule, json.to_primitive(self.metric)) | |
@pecan.expose('json') | |
def get_all(self): | |
self.enforce_metric("get metric") | |
return self.metric | |
@pecan.expose() | |
def post_measures(self): | |
self.enforce_metric("post measures") | |
params = deserialize() | |
if not isinstance(params, list): | |
abort(400, "Invalid input for measures") | |
if params: | |
pecan.request.storage.incoming.add_measures( | |
self.metric, MeasuresListSchema(params)) | |
pecan.response.status = 202 | |
@pecan.expose('json') | |
def get_measures(self, start=None, stop=None, aggregation='mean', | |
granularity=None, resample=None, refresh=False, **param): | |
self.enforce_metric("get measures") | |
if not (aggregation | |
in archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS | |
or aggregation in self.custom_agg): | |
msg = '''Invalid aggregation value %(agg)s, must be one of %(std)s | |
or %(custom)s''' | |
abort(400, msg % dict( | |
agg=aggregation, | |
std=archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS, | |
custom=str(self.custom_agg.keys()))) | |
if start is not None: | |
try: | |
start = utils.to_datetime(start) | |
except Exception: | |
abort(400, "Invalid value for start") | |
if stop is not None: | |
try: | |
stop = utils.to_datetime(stop) | |
except Exception: | |
abort(400, "Invalid value for stop") | |
if resample: | |
if not granularity: | |
abort(400, 'A granularity must be specified to resample') | |
try: | |
resample = Timespan(resample) | |
except ValueError as e: | |
abort(400, e) | |
if strutils.bool_from_string(refresh): | |
pecan.request.storage.process_new_measures( | |
pecan.request.indexer, [six.text_type(self.metric.id)], True) | |
try: | |
if aggregation in self.custom_agg: | |
measures = self.custom_agg[aggregation].compute( | |
pecan.request.storage, self.metric, | |
start, stop, **param) | |
else: | |
measures = pecan.request.storage.get_measures( | |
self.metric, start, stop, aggregation, | |
Timespan(granularity) if granularity is not None else None, | |
resample) | |
# Replace timestamp keys by their string versions | |
return [(timestamp.isoformat(), offset, v) | |
for timestamp, offset, v in measures] | |
except (storage.MetricDoesNotExist, | |
storage.GranularityDoesNotExist, | |
storage.AggregationDoesNotExist) as e: | |
abort(404, e) | |
except aggregates.CustomAggFailure as e: | |
abort(400, e) | |
@pecan.expose() | |
def delete(self): | |
self.enforce_metric("delete metric") | |
try: | |
pecan.request.indexer.delete_metric(self.metric.id) | |
except indexer.NoSuchMetric as e: | |
abort(404, e) | |
class MetricsController(rest.RestController): | |
@pecan.expose() | |
def _lookup(self, id, *remainder): | |
try: | |
metric_id = uuid.UUID(id) | |
except ValueError: | |
abort(404, indexer.NoSuchMetric(id)) | |
metrics = pecan.request.indexer.list_metrics( | |
id=metric_id, details=True) | |
if not metrics: | |
abort(404, indexer.NoSuchMetric(id)) | |
return MetricController(metrics[0]), remainder | |
_MetricSchema = voluptuous.Schema({ | |
"archive_policy_name": six.text_type, | |
"name": six.text_type, | |
voluptuous.Optional("unit"): | |
voluptuous.All(six.text_type, voluptuous.Length(max=31)), | |
}) | |
# NOTE(jd) Define this method as it was a voluptuous schema – it's just a | |
# smarter version of a voluptuous schema, no? | |
@classmethod | |
def MetricSchema(cls, definition): | |
# First basic validation | |
definition = cls._MetricSchema(definition) | |
archive_policy_name = definition.get('archive_policy_name') | |
name = definition.get('name') | |
if name and '/' in name: | |
abort(400, "'/' is not supported in metric name") | |
if archive_policy_name is None: | |
try: | |
ap = pecan.request.indexer.get_archive_policy_for_metric(name) | |
except indexer.NoArchivePolicyRuleMatch: | |
# NOTE(jd) Since this is a schema-like function, we | |
# should/could raise ValueError, but if we do so, voluptuous | |
# just returns a "invalid value" with no useful message – so we | |
# prefer to use abort() to make sure the user has the right | |
# error message | |
abort(400, "No archive policy name specified " | |
"and no archive policy rule found matching " | |
"the metric name %s" % name) | |
else: | |
definition['archive_policy_name'] = ap.name | |
creator = pecan.request.auth_helper.get_current_user( | |
pecan.request.headers) | |
enforce("create metric", { | |
"creator": creator, | |
"archive_policy_name": archive_policy_name, | |
"name": name, | |
"unit": definition.get('unit'), | |
}) | |
return definition | |
@pecan.expose('json') | |
def post(self): | |
creator = pecan.request.auth_helper.get_current_user( | |
pecan.request.headers) | |
body = deserialize_and_validate(self.MetricSchema) | |
try: | |
m = pecan.request.indexer.create_metric( | |
uuid.uuid4(), | |
creator, | |
name=body.get('name'), | |
unit=body.get('unit'), | |
archive_policy_name=body['archive_policy_name']) | |
except indexer.NoSuchArchivePolicy as e: | |
abort(400, e) | |
set_resp_location_hdr("/metric/" + str(m.id)) | |
pecan.response.status = 201 | |
return m | |
@staticmethod | |
@pecan.expose('json') | |
def get_all(**kwargs): | |
# Compat with old user/project API | |
provided_user_id = kwargs.get('user_id') | |
provided_project_id = kwargs.get('project_id') | |
if provided_user_id is None and provided_project_id is None: | |
provided_creator = kwargs.get('creator') | |
else: | |
provided_creator = ( | |
(provided_user_id or "") | |
+ ":" | |
+ (provided_project_id or "") | |
) | |
try: | |
enforce("list all metric", {}) | |
except webob.exc.HTTPForbidden: | |
enforce("list metric", {}) | |
creator = pecan.request.auth_helper.get_current_user( | |
pecan.request.headers) | |
if provided_creator and creator != provided_creator: | |
abort(403, "Insufficient privileges to filter by user/project") | |
attr_filter = {} | |
if provided_creator is not None: | |
attr_filter['creator'] = provided_creator | |
attr_filter.update(get_pagination_options( | |
kwargs, METRIC_DEFAULT_PAGINATION)) | |
try: | |
return pecan.request.indexer.list_metrics(**attr_filter) | |
except indexer.IndexerException as e: | |
abort(400, e) | |
_MetricsSchema = voluptuous.Schema({ | |
six.text_type: voluptuous.Any(utils.UUID, | |
MetricsController.MetricSchema), | |
}) | |
def MetricsSchema(data): | |
# NOTE(jd) Before doing any kind of validation, copy the metric name | |
# into the metric definition. This is required so we have the name | |
# available when doing the metric validation with its own MetricSchema, | |
# and so we can do things such as applying archive policy rules. | |
if isinstance(data, dict): | |
for metric_name, metric_def in six.iteritems(data): | |
if isinstance(metric_def, dict): | |
metric_def['name'] = metric_name | |
return _MetricsSchema(data) | |
class NamedMetricController(rest.RestController): | |
def __init__(self, resource_id, resource_type): | |
self.resource_id = resource_id | |
self.resource_type = resource_type | |
@pecan.expose() | |
def _lookup(self, name, *remainder): | |
details = True if pecan.request.method == 'GET' else False | |
m = pecan.request.indexer.list_metrics(details=details, | |
name=name, | |
resource_id=self.resource_id) | |
if m: | |
return MetricController(m[0]), remainder | |
resource = pecan.request.indexer.get_resource(self.resource_type, | |
self.resource_id) | |
if resource: | |
abort(404, indexer.NoSuchMetric(name)) | |
else: | |
abort(404, indexer.NoSuchResource(self.resource_id)) | |
@pecan.expose() | |
def post(self): | |
resource = pecan.request.indexer.get_resource( | |
self.resource_type, self.resource_id) | |
if not resource: | |
abort(404, indexer.NoSuchResource(self.resource_id)) | |
enforce("update resource", resource) | |
metrics = deserialize_and_validate(MetricsSchema) | |
try: | |
pecan.request.indexer.update_resource( | |
self.resource_type, self.resource_id, metrics=metrics, | |
append_metrics=True, | |
create_revision=False) | |
except (indexer.NoSuchMetric, | |
indexer.NoSuchArchivePolicy, | |
ValueError) as e: | |
abort(400, e) | |
except indexer.NamedMetricAlreadyExists as e: | |
abort(409, e) | |
except indexer.NoSuchResource as e: | |
abort(404, e) | |
@pecan.expose('json') | |
def get_all(self): | |
resource = pecan.request.indexer.get_resource( | |
self.resource_type, self.resource_id) | |
if not resource: | |
abort(404, indexer.NoSuchResource(self.resource_id)) | |
enforce("get resource", resource) | |
return pecan.request.indexer.list_metrics(resource_id=self.resource_id) | |
class ResourceHistoryController(rest.RestController): | |
def __init__(self, resource_id, resource_type): | |
self.resource_id = resource_id | |
self.resource_type = resource_type | |
@pecan.expose('json') | |
def get(self, **kwargs): | |
details = get_details(kwargs) | |
pagination_opts = get_pagination_options( | |
kwargs, RESOURCE_DEFAULT_PAGINATION) | |
resource = pecan.request.indexer.get_resource( | |
self.resource_type, self.resource_id) | |
if not resource: | |
abort(404, indexer.NoSuchResource(self.resource_id)) | |
enforce("get resource", resource) | |
try: | |
# FIXME(sileht): next API version should returns | |
# {'resources': [...], 'links': [ ... pagination rel ...]} | |
return pecan.request.indexer.list_resources( | |
self.resource_type, | |
attribute_filter={"=": {"id": self.resource_id}}, | |
details=details, | |
history=True, | |
**pagination_opts | |
) | |
except indexer.IndexerException as e: | |
abort(400, e) | |
def etag_precondition_check(obj): | |
etag, lastmodified = obj.etag, obj.lastmodified | |
# NOTE(sileht): Checks and order come from rfc7232 | |
# in webob, the '*' and the absent of the header is handled by | |
# if_match.__contains__() and if_none_match.__contains__() | |
# and are identique... | |
if etag not in pecan.request.if_match: | |
abort(412) | |
elif (not pecan.request.environ.get("HTTP_IF_MATCH") | |
and pecan.request.if_unmodified_since | |
and pecan.request.if_unmodified_since < lastmodified): | |
abort(412) | |
if etag in pecan.request.if_none_match: | |
if pecan.request.method in ['GET', 'HEAD']: | |
abort(304) | |
else: | |
abort(412) | |
elif (not pecan.request.environ.get("HTTP_IF_NONE_MATCH") | |
and pecan.request.if_modified_since | |
and (pecan.request.if_modified_since >= | |
lastmodified) | |
and pecan.request.method in ['GET', 'HEAD']): | |
abort(304) | |
def etag_set_headers(obj): | |
pecan.response.etag = obj.etag | |
pecan.response.last_modified = obj.lastmodified | |
def AttributesPath(value): | |
if value.startswith("/attributes"): | |
return value | |
raise ValueError("Only attributes can be modified") | |
ResourceTypeJsonPatchSchema = voluptuous.Schema([{ | |
"op": voluptuous.Any("add", "remove"), | |
"path": AttributesPath, | |
voluptuous.Optional("value"): dict, | |
}]) | |
class ResourceTypeController(rest.RestController): | |
def __init__(self, name): | |
self._name = name | |
@pecan.expose('json') | |
def get(self): | |
try: | |
rt = pecan.request.indexer.get_resource_type(self._name) | |
except indexer.NoSuchResourceType as e: | |
abort(404, e) | |
enforce("get resource type", rt) | |
return rt | |
@pecan.expose('json') | |
def patch(self): | |
# NOTE(sileht): should we check for "application/json-patch+json" | |
# Content-Type ? | |
try: | |
rt = pecan.request.indexer.get_resource_type(self._name) | |
except indexer.NoSuchResourceType as e: | |
abort(404, e) | |
enforce("update resource type", rt) | |
# Ensure this is a valid jsonpatch dict | |
patch = deserialize_and_validate( | |
ResourceTypeJsonPatchSchema, | |
expected_content_types=["application/json-patch+json"]) | |
# Add new attributes to the resource type | |
rt_json_current = rt.jsonify() | |
try: | |
rt_json_next = jsonpatch.apply_patch(rt_json_current, patch) | |
except jsonpatch.JsonPatchException as e: | |
abort(400, e) | |
del rt_json_next['state'] | |
# Validate that the whole new resource_type is valid | |
schema = pecan.request.indexer.get_resource_type_schema() | |
try: | |
rt_json_next = voluptuous.Schema(schema.for_update, required=True)( | |
rt_json_next) | |
except voluptuous.Error as e: | |
abort(400, "Invalid input: %s" % e) | |
# Get only newly formatted and deleted attributes | |
add_attrs = {k: v for k, v in rt_json_next["attributes"].items() | |
if k not in rt_json_current["attributes"]} | |
del_attrs = [k for k in rt_json_current["attributes"] | |
if k not in rt_json_next["attributes"]] | |
if not add_attrs and not del_attrs: | |
# NOTE(sileht): just returns the resource, the asked changes | |
# just do nothing | |
return rt | |
try: | |
add_attrs = schema.attributes_from_dict(add_attrs) | |
except resource_type.InvalidResourceAttribute as e: | |
abort(400, "Invalid input: %s" % e) | |
try: | |
return pecan.request.indexer.update_resource_type( | |
self._name, add_attributes=add_attrs, | |
del_attributes=del_attrs) | |
except indexer.NoSuchResourceType as e: | |
abort(400, e) | |
@pecan.expose() | |
def delete(self): | |
try: | |
pecan.request.indexer.get_resource_type(self._name) | |
except indexer.NoSuchResourceType as e: | |
abort(404, e) | |
enforce("delete resource type", resource_type) | |
try: | |
pecan.request.indexer.delete_resource_type(self._name) | |
except (indexer.NoSuchResourceType, | |
indexer.ResourceTypeInUse) as e: | |
abort(400, e) | |
class ResourceTypesController(rest.RestController): | |
@pecan.expose() | |
def _lookup(self, name, *remainder): | |
return ResourceTypeController(name), remainder | |
@pecan.expose('json') | |
def post(self): | |
schema = pecan.request.indexer.get_resource_type_schema() | |
body = deserialize_and_validate(schema) | |
body["state"] = "creating" | |
try: | |
rt = schema.resource_type_from_dict(**body) | |
except resource_type.InvalidResourceAttribute as e: | |
abort(400, "Invalid input: %s" % e) | |
enforce("create resource type", body) | |
try: | |
rt = pecan.request.indexer.create_resource_type(rt) | |
except indexer.ResourceTypeAlreadyExists as e: | |
abort(409, e) | |
set_resp_location_hdr("/resource_type/" + rt.name) | |
pecan.response.status = 201 | |
return rt | |
@pecan.expose('json') | |
def get_all(self, **kwargs): | |
enforce("list resource type", {}) | |
try: | |
return pecan.request.indexer.list_resource_types() | |
except indexer.IndexerException as e: | |
abort(400, e) | |
def ResourceSchema(schema): | |
base_schema = { | |
voluptuous.Optional('started_at'): utils.to_datetime, | |
voluptuous.Optional('ended_at'): utils.to_datetime, | |
voluptuous.Optional('user_id'): voluptuous.Any(None, six.text_type), | |
voluptuous.Optional('project_id'): voluptuous.Any(None, six.text_type), | |
voluptuous.Optional('metrics'): MetricsSchema, | |
} | |
base_schema.update(schema) | |
return base_schema | |
class ResourceController(rest.RestController): | |
def __init__(self, resource_type, id): | |
self._resource_type = resource_type | |
creator = pecan.request.auth_helper.get_current_user( | |
pecan.request.headers) | |
try: | |
self.id = utils.ResourceUUID(id, creator) | |
except ValueError: | |
abort(404, indexer.NoSuchResource(id)) | |
self.metric = NamedMetricController(str(self.id), self._resource_type) | |
self.history = ResourceHistoryController(str(self.id), | |
self._resource_type) | |
@pecan.expose('json') | |
def get(self): | |
resource = pecan.request.indexer.get_resource( | |
self._resource_type, self.id, with_metrics=True) | |
if resource: | |
enforce("get resource", resource) | |
etag_precondition_check(resource) | |
etag_set_headers(resource) | |
return resource | |
abort(404, indexer.NoSuchResource(self.id)) | |
@pecan.expose('json') | |
def patch(self): | |
resource = pecan.request.indexer.get_resource( | |
self._resource_type, self.id, with_metrics=True) | |
if not resource: | |
abort(404, indexer.NoSuchResource(self.id)) | |
enforce("update resource", resource) | |
etag_precondition_check(resource) | |
body = deserialize_and_validate( | |
schema_for(self._resource_type), | |
required=False) | |
if len(body) == 0: | |
etag_set_headers(resource) | |
return resource | |
for k, v in six.iteritems(body): | |
if k != 'metrics' and getattr(resource, k) != v: | |
create_revision = True | |
break | |
else: | |
if 'metrics' not in body: | |
# No need to go further, we assume the db resource | |
# doesn't change between the get and update | |
return resource | |
create_revision = False | |
try: | |
resource = pecan.request.indexer.update_resource( | |
self._resource_type, | |
self.id, | |
create_revision=create_revision, | |
**body) | |
except (indexer.NoSuchMetric, | |
indexer.NoSuchArchivePolicy, | |
ValueError) as e: | |
abort(400, e) | |
except indexer.NoSuchResource as e: | |
abort(404, e) | |
etag_set_headers(resource) | |
return resource | |
@pecan.expose() | |
def delete(self): | |
resource = pecan.request.indexer.get_resource( | |
self._resource_type, self.id) | |
if not resource: | |
abort(404, indexer.NoSuchResource(self.id)) | |
enforce("delete resource", resource) | |
etag_precondition_check(resource) | |
try: | |
pecan.request.indexer.delete_resource(self.id) | |
except indexer.NoSuchResource as e: | |
abort(404, e) | |
def schema_for(resource_type): | |
resource_type = pecan.request.indexer.get_resource_type(resource_type) | |
return ResourceSchema(resource_type.schema) | |
def ResourceUUID(value, creator): | |
try: | |
return utils.ResourceUUID(value, creator) | |
except ValueError as e: | |
raise voluptuous.Invalid(e) | |
def ResourceID(value, creator): | |
return (six.text_type(value), ResourceUUID(value, creator)) | |
class ResourcesController(rest.RestController): | |
def __init__(self, resource_type): | |
self._resource_type = resource_type | |
@pecan.expose() | |
def _lookup(self, id, *remainder): | |
return ResourceController(self._resource_type, id), remainder | |
@pecan.expose('json') | |
def post(self): | |
# NOTE(sileht): we need to copy the dict because when change it | |
# and we don't want that next patch call have the "id" | |
schema = dict(schema_for(self._resource_type)) | |
creator = pecan.request.auth_helper.get_current_user( | |
pecan.request.headers) | |
schema["id"] = functools.partial(ResourceID, creator=creator) | |
body = deserialize_and_validate(schema) | |
body["original_resource_id"], body["id"] = body["id"] | |
target = { | |
"resource_type": self._resource_type, | |
} | |
target.update(body) | |
enforce("create resource", target) | |
rid = body['id'] | |
del body['id'] | |
try: | |
resource = pecan.request.indexer.create_resource( | |
self._resource_type, rid, creator, | |
**body) | |
except (ValueError, | |
indexer.NoSuchMetric, | |
indexer.NoSuchArchivePolicy) as e: | |
abort(400, e) | |
except indexer.ResourceAlreadyExists as e: | |
abort(409, e) | |
set_resp_location_hdr("/resource/" | |
+ self._resource_type + "/" | |
+ six.text_type(resource.id)) | |
etag_set_headers(resource) | |
pecan.response.status = 201 | |
return resource | |
@pecan.expose('json') | |
def get_all(self, **kwargs): | |
details = get_details(kwargs) | |
history = get_history(kwargs) | |
pagination_opts = get_pagination_options( | |
kwargs, RESOURCE_DEFAULT_PAGINATION) | |
policy_filter = pecan.request.auth_helper.get_resource_policy_filter( | |
pecan.request.headers, "list resource", self._resource_type) | |
try: | |
# FIXME(sileht): next API version should returns | |
# {'resources': [...], 'links': [ ... pagination rel ...]} | |
return pecan.request.indexer.list_resources( | |
self._resource_type, | |
attribute_filter=policy_filter, | |
details=details, | |
history=history, | |
**pagination_opts | |
) | |
except indexer.IndexerException as e: | |
abort(400, e) | |
@pecan.expose('json') | |
def delete(self, **kwargs): | |
# NOTE(sileht): Don't allow empty filter, this is going to delete | |
# the entire database. | |
attr_filter = deserialize_and_validate(ResourceSearchSchema) | |
# the voluptuous checks everything, but it is better to | |
# have this here. | |
if not attr_filter: | |
abort(400, "caution: the query can not be empty, or it will \ | |
delete entire database") | |
policy_filter = pecan.request.auth_helper.get_resource_policy_filter( | |
pecan.request.headers, | |
"delete resources", self._resource_type) | |
if policy_filter: | |
attr_filter = {"and": [policy_filter, attr_filter]} | |
try: | |
delete_num = pecan.request.indexer.delete_resources( | |
self._resource_type, attribute_filter=attr_filter) | |
except indexer.IndexerException as e: | |
abort(400, e) | |
return {"deleted": delete_num} | |
class ResourcesByTypeController(rest.RestController): | |
@pecan.expose('json') | |
def get_all(self): | |
return dict( | |
(rt.name, | |
pecan.request.application_url + '/resource/' + rt.name) | |
for rt in pecan.request.indexer.list_resource_types()) | |
@pecan.expose() | |
def _lookup(self, resource_type, *remainder): | |
try: | |
pecan.request.indexer.get_resource_type(resource_type) | |
except indexer.NoSuchResourceType as e: | |
abort(404, e) | |
return ResourcesController(resource_type), remainder | |
class InvalidQueryStringSearchAttrFilter(Exception): | |
def __init__(self, reason): | |
super(InvalidQueryStringSearchAttrFilter, self).__init__( | |
"Invalid filter: %s" % reason) | |
class QueryStringSearchAttrFilter(object): | |
uninary_operators = ("not", ) | |
binary_operator = (u">=", u"<=", u"!=", u">", u"<", u"=", u"==", u"eq", | |
u"ne", u"lt", u"gt", u"ge", u"le", u"in", u"like", u"≠", | |
u"≥", u"≤") | |
multiple_operators = (u"and", u"or", u"∧", u"∨") | |
operator = pyparsing.Regex(u"|".join(binary_operator)) | |
null = pyparsing.Regex("None|none|null").setParseAction( | |
pyparsing.replaceWith(None)) | |
boolean = "False|True|false|true" | |
boolean = pyparsing.Regex(boolean).setParseAction( | |
lambda t: t[0].lower() == "true") | |
hex_string = lambda n: pyparsing.Word(pyparsing.hexnums, exact=n) | |
uuid_string = pyparsing.Combine( | |
hex_string(8) + (pyparsing.Optional("-") + hex_string(4)) * 3 + | |
pyparsing.Optional("-") + hex_string(12)) | |
number = r"[+-]?\d+(:?\.\d*)?(:?[eE][+-]?\d+)?" | |
number = pyparsing.Regex(number).setParseAction(lambda t: float(t[0])) | |
identifier = pyparsing.Word(pyparsing.alphas, pyparsing.alphanums + "_") | |
quoted_string = pyparsing.QuotedString('"') | pyparsing.QuotedString("'") | |
comparison_term = pyparsing.Forward() | |
in_list = pyparsing.Group( | |
pyparsing.Suppress('[') + | |
pyparsing.Optional(pyparsing.delimitedList(comparison_term)) + | |
pyparsing.Suppress(']'))("list") | |
comparison_term << (null | boolean | uuid_string | identifier | number | | |
quoted_string | in_list) | |
condition = pyparsing.Group(comparison_term + operator + comparison_term) | |
expr = pyparsing.infixNotation(condition, [ | |
("not", 1, pyparsing.opAssoc.RIGHT, ), | |
("and", 2, pyparsing.opAssoc.LEFT, ), | |
("∧", 2, pyparsing.opAssoc.LEFT, ), | |
("or", 2, pyparsing.opAssoc.LEFT, ), | |
("∨", 2, pyparsing.opAssoc.LEFT, ), | |
]) | |
@classmethod | |
def _parsed_query2dict(cls, parsed_query): | |
result = None | |
while parsed_query: | |
part = parsed_query.pop() | |
if part in cls.binary_operator: | |
result = {part: {parsed_query.pop(): result}} | |
elif part in cls.multiple_operators: | |
if result.get(part): | |
result[part].append( | |
cls._parsed_query2dict(parsed_query.pop())) | |
else: | |
result = {part: [result]} | |
elif part in cls.uninary_operators: | |
result = {part: result} | |
elif isinstance(part, pyparsing.ParseResults): | |
kind = part.getName() | |
if kind == "list": | |
res = part.asList() | |
else: | |
res = cls._parsed_query2dict(part) | |
if result is None: | |
result = res | |
elif isinstance(result, dict): | |
list(result.values())[0].append(res) | |
else: | |
result = part | |
return result | |
@classmethod | |
def parse(cls, query): | |
try: | |
parsed_query = cls.expr.parseString(query, parseAll=True)[0] | |
except pyparsing.ParseException as e: | |
raise InvalidQueryStringSearchAttrFilter(six.text_type(e)) | |
return cls._parsed_query2dict(parsed_query) | |
def ResourceSearchSchema(v): | |
return _ResourceSearchSchema()(v) | |
def _ResourceSearchSchema(): | |
user = pecan.request.auth_helper.get_current_user( | |
pecan.request.headers) | |
_ResourceUUID = functools.partial(ResourceUUID, creator=user) | |
return voluptuous.Schema( | |
voluptuous.All( | |
voluptuous.Length(min=0, max=1), | |
{ | |
voluptuous.Any( | |
u"=", u"==", u"eq", | |
u"<", u"lt", | |
u">", u"gt", | |
u"<=", u"≤", u"le", | |
u">=", u"≥", u"ge", | |
u"!=", u"≠", u"ne", | |
u"in", | |
u"like", | |
): voluptuous.All( | |
voluptuous.Length(min=1, max=1), | |
voluptuous.Any( | |
{"id": voluptuous.Any( | |
[_ResourceUUID], _ResourceUUID), | |
voluptuous.Extra: voluptuous.Extra})), | |
voluptuous.Any( | |
u"and", u"∨", | |
u"or", u"∧", | |
u"not", | |
): voluptuous.All( | |
[ResourceSearchSchema], voluptuous.Length(min=1) | |
) | |
} | |
) | |
) | |
class SearchResourceTypeController(rest.RestController): | |
def __init__(self, resource_type): | |
self._resource_type = resource_type | |
@staticmethod | |
def parse_and_validate_qs_filter(query): | |
try: | |
attr_filter = QueryStringSearchAttrFilter.parse(query) | |
except InvalidQueryStringSearchAttrFilter as e: | |
raise abort(400, e) | |
return voluptuous.Schema(ResourceSearchSchema, | |
required=True)(attr_filter) | |
def _search(self, **kwargs): | |
if pecan.request.body: | |
attr_filter = deserialize_and_validate(ResourceSearchSchema) | |
elif kwargs.get("filter"): | |
attr_filter = self.parse_and_validate_qs_filter(kwargs["filter"]) | |
else: | |
attr_filter = None | |
details = get_details(kwargs) | |
history = get_history(kwargs) | |
pagination_opts = get_pagination_options( | |
kwargs, RESOURCE_DEFAULT_PAGINATION) | |
policy_filter = pecan.request.auth_helper.get_resource_policy_filter( | |
pecan.request.headers, "search resource", self._resource_type) | |
if policy_filter: | |
if attr_filter: | |
attr_filter = {"and": [ | |
policy_filter, | |
attr_filter | |
]} | |
else: | |
attr_filter = policy_filter | |
return pecan.request.indexer.list_resources( | |
self._resource_type, | |
attribute_filter=attr_filter, | |
details=details, | |
history=history, | |
**pagination_opts) | |
@pecan.expose('json') | |
def post(self, **kwargs): | |
try: | |
return self._search(**kwargs) | |
except indexer.IndexerException as e: | |
abort(400, e) | |
class SearchResourceController(rest.RestController): | |
@pecan.expose() | |
def _lookup(self, resource_type, *remainder): | |
try: | |
pecan.request.indexer.get_resource_type(resource_type) | |
except indexer.NoSuchResourceType as e: | |
abort(404, e) | |
return SearchResourceTypeController(resource_type), remainder | |
def _MetricSearchSchema(v): | |
"""Helper method to indirect the recursivity of the search schema""" | |
return SearchMetricController.MetricSearchSchema(v) | |
def _MetricSearchOperationSchema(v): | |
"""Helper method to indirect the recursivity of the search schema""" | |
return SearchMetricController.MetricSearchOperationSchema(v) | |
class SearchMetricController(rest.RestController): | |
MetricSearchOperationSchema = voluptuous.Schema( | |
voluptuous.All( | |
voluptuous.Length(min=1, max=1), | |
{ | |
voluptuous.Any( | |
u"=", u"==", u"eq", | |
u"<", u"lt", | |
u">", u"gt", | |
u"<=", u"≤", u"le", | |
u">=", u"≥", u"ge", | |
u"!=", u"≠", u"ne", | |
u"%", u"mod", | |
u"+", u"add", | |
u"-", u"sub", | |
u"*", u"×", u"mul", | |
u"/", u"÷", u"div", | |
u"**", u"^", u"pow", | |
): voluptuous.Any( | |
float, int, | |
voluptuous.All( | |
[float, int, | |
voluptuous.Any(_MetricSearchOperationSchema)], | |
voluptuous.Length(min=2, max=2), | |
), | |
), | |
}, | |
) | |
) | |
MetricSearchSchema = voluptuous.Schema( | |
voluptuous.Any( | |
MetricSearchOperationSchema, | |
voluptuous.All( | |
voluptuous.Length(min=1, max=1), | |
{ | |
voluptuous.Any( | |
u"and", u"∨", | |
u"or", u"∧", | |
u"not", | |
): [_MetricSearchSchema], | |
} | |
) | |
) | |
) | |
@pecan.expose('json') | |
def post(self, metric_id, start=None, stop=None, aggregation='mean', | |
granularity=None): | |
granularity = [Timespan(g) | |
for g in arg_to_list(granularity or [])] | |
metrics = pecan.request.indexer.list_metrics( | |
ids=arg_to_list(metric_id)) | |
for metric in metrics: | |
enforce("search metric", metric) | |
if not pecan.request.body: | |
abort(400, "No query specified in body") | |
query = deserialize_and_validate(self.MetricSearchSchema) | |
if start is not None: | |
try: | |
start = utils.to_datetime(start) | |
except Exception: | |
abort(400, "Invalid value for start") | |
if stop is not None: | |
try: | |
stop = utils.to_datetime(stop) | |
except Exception: | |
abort(400, "Invalid value for stop") | |
try: | |
return { | |
str(metric.id): values | |
for metric, values in six.iteritems( | |
pecan.request.storage.search_value( | |
metrics, query, start, stop, aggregation, | |
granularity | |
) | |
) | |
} | |
except storage.InvalidQuery as e: | |
abort(400, e) | |
except storage.GranularityDoesNotExist as e: | |
abort(400, e) | |
class ResourcesMetricsMeasuresBatchController(rest.RestController): | |
@pecan.expose('json') | |
def post(self, create_metrics=False): | |
creator = pecan.request.auth_helper.get_current_user( | |
pecan.request.headers) | |
MeasuresBatchSchema = voluptuous.Schema( | |
{functools.partial(ResourceID, creator=creator): | |
{six.text_type: MeasuresListSchema}} | |
) | |
body = deserialize_and_validate(MeasuresBatchSchema) | |
known_metrics = [] | |
unknown_metrics = [] | |
unknown_resources = [] | |
body_by_rid = {} | |
for original_resource_id, resource_id in body: | |
body_by_rid[resource_id] = body[(original_resource_id, | |
resource_id)] | |
names = body[(original_resource_id, resource_id)].keys() | |
metrics = pecan.request.indexer.list_metrics( | |
names=names, resource_id=resource_id) | |
known_names = [m.name for m in metrics] | |
if strutils.bool_from_string(create_metrics): | |
already_exists_names = [] | |
for name in names: | |
if name not in known_names: | |
metric = MetricsController.MetricSchema({ | |
"name": name | |
}) | |
try: | |
m = pecan.request.indexer.create_metric( | |
uuid.uuid4(), | |
creator=creator, | |
resource_id=resource_id, | |
name=metric.get('name'), | |
unit=metric.get('unit'), | |
archive_policy_name=metric[ | |
'archive_policy_name']) | |
except indexer.NamedMetricAlreadyExists as e: | |
already_exists_names.append(e.metric) | |
except indexer.NoSuchResource: | |
unknown_resources.append({ | |
'resource_id': six.text_type(resource_id), | |
'original_resource_id': original_resource_id}) | |
except indexer.IndexerException as e: | |
# This catch NoSuchArchivePolicy, which is unlikely | |
# be still possible | |
abort(400, e) | |
else: | |
known_metrics.append(m) | |
if already_exists_names: | |
# Add metrics created in the meantime | |
known_names.extend(already_exists_names) | |
known_metrics.extend( | |
pecan.request.indexer.list_metrics( | |
names=already_exists_names, | |
resource_id=resource_id) | |
) | |
elif len(names) != len(metrics): | |
unknown_metrics.extend( | |
["%s/%s" % (six.text_type(resource_id), m) | |
for m in names if m not in known_names]) | |
known_metrics.extend(metrics) | |
if unknown_resources: | |
abort(400, {"cause": "Unknown resources", | |
"detail": unknown_resources}) | |
if unknown_metrics: | |
abort(400, "Unknown metrics: %s" % ", ".join( | |
sorted(unknown_metrics))) | |
for metric in known_metrics: | |
enforce("post measures", metric) | |
storage = pecan.request.storage.incoming | |
# Batch Patch | |
import struct | |
import datetime | |
from gnocchi.storage.common import ceph as gnocchi_ceph | |
with gnocchi_ceph.rados.WriteOpCtx() as op: | |
for metric in known_metrics: | |
measures = list(body_by_rid[metric.resource_id][metric.name]) | |
data = struct.pack("<" + storage._MEASURE_SERIAL_FORMAT * len(measures), *list(itertools.chain.from_iterable(measures))) | |
name = "_".join((storage.MEASURE_PREFIX, str(metric.id), str(uuid.uuid4()), datetime.datetime.utcnow().strftime("%Y%m%d_%H:%M:%S"))) | |
storage.ioctx.write_full(name, data) | |
storage.ioctx.set_omap(op, (name,), (b"",)) | |
storage.ioctx.operate_write_op(op, storage.MEASURE_PREFIX, flags=storage.OMAP_WRITE_FLAGS) | |
# End Batch patch | |
pecan.response.status = 202 | |
class MetricsMeasuresBatchController(rest.RestController): | |
# NOTE(sileht): we don't allow to mix both formats | |
# to not have to deal with id collision that can | |
# occurs between a metric_id and a resource_id. | |
# Because while json allow duplicate keys in dict payload | |
# only the last key will be retain by json python module to | |
# build the python dict. | |
MeasuresBatchSchema = voluptuous.Schema( | |
{utils.UUID: MeasuresListSchema} | |
) | |
@pecan.expose() | |
def post(self): | |
body = deserialize_and_validate(self.MeasuresBatchSchema) | |
metrics = pecan.request.indexer.list_metrics(ids=body.keys()) | |
if len(metrics) != len(body): | |
missing_metrics = sorted(set(body) - set(m.id for m in metrics)) | |
abort(400, "Unknown metrics: %s" % ", ".join( | |
six.moves.map(str, missing_metrics))) | |
for metric in metrics: | |
enforce("post measures", metric) | |
storage = pecan.request.storage.incoming | |
with futures.ThreadPoolExecutor(max_workers=THREADS) as executor: | |
list(executor.map(lambda x: storage.add_measures(*x), | |
((metric, body[metric.id]) for metric in | |
metrics))) | |
pecan.response.status = 202 | |
class SearchController(object): | |
resource = SearchResourceController() | |
metric = SearchMetricController() | |
class AggregationResourceController(rest.RestController): | |
def __init__(self, resource_type, metric_name): | |
self.resource_type = resource_type | |
self.metric_name = metric_name | |
@pecan.expose('json') | |
def post(self, start=None, stop=None, aggregation='mean', | |
reaggregation=None, granularity=None, needed_overlap=100.0, | |
groupby=None, fill=None, refresh=False, resample=None): | |
# First, set groupby in the right format: a sorted list of unique | |
# strings. | |
groupby = sorted(set(arg_to_list(groupby))) | |
# NOTE(jd) Sort by groupby so we are sure we do not return multiple | |
# groups when using itertools.groupby later. | |
try: | |
resources = SearchResourceTypeController( | |
self.resource_type)._search(sort=groupby) | |
except indexer.InvalidPagination: | |
abort(400, "Invalid groupby attribute") | |
except indexer.IndexerException as e: | |
abort(400, e) | |
if resources is None: | |
return [] | |
if not groupby: | |
metrics = list(filter(None, | |
(r.get_metric(self.metric_name) | |
for r in resources))) | |
return AggregationController.get_cross_metric_measures_from_objs( | |
metrics, start, stop, aggregation, reaggregation, | |
granularity, needed_overlap, fill, refresh, resample) | |
def groupper(r): | |
return tuple((attr, r[attr]) for attr in groupby) | |
results = [] | |
for key, resources in itertools.groupby(resources, groupper): | |
metrics = list(filter(None, | |
(r.get_metric(self.metric_name) | |
for r in resources))) | |
results.append({ | |
"group": dict(key), | |
"measures": AggregationController.get_cross_metric_measures_from_objs( # noqa | |
metrics, start, stop, aggregation, reaggregation, | |
granularity, needed_overlap, fill, refresh, resample) | |
}) | |
return results | |
class AggregationController(rest.RestController): | |
_custom_actions = { | |
'metric': ['GET'], | |
} | |
@pecan.expose() | |
def _lookup(self, object_type, resource_type, key, metric_name, | |
*remainder): | |
if object_type != "resource" or key != "metric": | |
# NOTE(sileht): we want the raw 404 message here | |
# so use directly pecan | |
pecan.abort(404) | |
try: | |
pecan.request.indexer.get_resource_type(resource_type) | |
except indexer.NoSuchResourceType as e: | |
abort(404, e) | |
return AggregationResourceController(resource_type, | |
metric_name), remainder | |
@staticmethod | |
def get_cross_metric_measures_from_objs(metrics, start=None, stop=None, | |
aggregation='mean', | |
reaggregation=None, | |
granularity=None, | |
needed_overlap=100.0, fill=None, | |
refresh=False, resample=None): | |
try: | |
needed_overlap = float(needed_overlap) | |
except ValueError: | |
abort(400, 'needed_overlap must be a number') | |
if start is not None: | |
try: | |
start = utils.to_datetime(start) | |
except Exception: | |
abort(400, "Invalid value for start") | |
if stop is not None: | |
try: | |
stop = utils.to_datetime(stop) | |
except Exception: | |
abort(400, "Invalid value for stop") | |
if (aggregation | |
not in archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS): | |
abort( | |
400, | |
'Invalid aggregation value %s, must be one of %s' | |
% (aggregation, | |
archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS)) | |
for metric in metrics: | |
enforce("get metric", metric) | |
number_of_metrics = len(metrics) | |
if number_of_metrics == 0: | |
return [] | |
if granularity is not None: | |
try: | |
granularity = Timespan(granularity) | |
except ValueError as e: | |
abort(400, e) | |
if resample: | |
if not granularity: | |
abort(400, 'A granularity must be specified to resample') | |
try: | |
resample = Timespan(resample) | |
except ValueError as e: | |
abort(400, e) | |
if fill is not None: | |
if granularity is None: | |
abort(400, "Unable to fill without a granularity") | |
try: | |
fill = float(fill) | |
except ValueError as e: | |
if fill != 'null': | |
abort(400, "fill must be a float or \'null\': %s" % e) | |
try: | |
if strutils.bool_from_string(refresh): | |
pecan.request.storage.process_new_measures( | |
pecan.request.indexer, | |
[six.text_type(m.id) for m in metrics], True) | |
if number_of_metrics == 1: | |
# NOTE(sileht): don't do the aggregation if we only have one | |
# metric | |
measures = pecan.request.storage.get_measures( | |
metrics[0], start, stop, aggregation, | |
granularity, resample) | |
else: | |
measures = pecan.request.storage.get_cross_metric_measures( | |
metrics, start, stop, aggregation, | |
reaggregation, resample, granularity, needed_overlap, fill) | |
# Replace timestamp keys by their string versions | |
return [(timestamp.isoformat(), offset, v) | |
for timestamp, offset, v in measures] | |
except storage.MetricUnaggregatable as e: | |
abort(400, ("One of the metrics being aggregated doesn't have " | |
"matching granularity: %s") % str(e)) | |
except storage.MetricDoesNotExist as e: | |
abort(404, e) | |
except storage.AggregationDoesNotExist as e: | |
abort(404, e) | |
@pecan.expose('json') | |
def get_metric(self, metric=None, start=None, stop=None, | |
aggregation='mean', reaggregation=None, granularity=None, | |
needed_overlap=100.0, fill=None, | |
refresh=False, resample=None): | |
# Check RBAC policy | |
metric_ids = arg_to_list(metric) | |
metrics = pecan.request.indexer.list_metrics(ids=metric_ids) | |
missing_metric_ids = (set(metric_ids) | |
- set(six.text_type(m.id) for m in metrics)) | |
if missing_metric_ids: | |
# Return one of the missing one in the error | |
abort(404, storage.MetricDoesNotExist( | |
missing_metric_ids.pop())) | |
return self.get_cross_metric_measures_from_objs( | |
metrics, start, stop, aggregation, reaggregation, | |
granularity, needed_overlap, fill, refresh, resample) | |
class CapabilityController(rest.RestController): | |
@staticmethod | |
@pecan.expose('json') | |
def get(): | |
aggregation_methods = set( | |
archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS) | |
return dict(aggregation_methods=aggregation_methods, | |
dynamic_aggregation_methods=[ | |
ext.name for ext in extension.ExtensionManager( | |
namespace='gnocchi.aggregates') | |
]) | |
class StatusController(rest.RestController): | |
@staticmethod | |
@pecan.expose('json') | |
def get(details=True): | |
enforce("get status", {}) | |
try: | |
report = pecan.request.storage.incoming.measures_report( | |
strutils.bool_from_string(details)) | |
except incoming.ReportGenerationError: | |
abort(503, 'Unable to generate status. Please retry.') | |
report_dict = {"storage": {"summary": report['summary']}} | |
if 'details' in report: | |
report_dict["storage"]["measures_to_process"] = report['details'] | |
return report_dict | |
class MetricsBatchController(object): | |
measures = MetricsMeasuresBatchController() | |
class ResourcesMetricsBatchController(object): | |
measures = ResourcesMetricsMeasuresBatchController() | |
class ResourcesBatchController(object): | |
metrics = ResourcesMetricsBatchController() | |
class BatchController(object): | |
metrics = MetricsBatchController() | |
resources = ResourcesBatchController() | |
class V1Controller(object): | |
def __init__(self): | |
self.sub_controllers = { | |
"search": SearchController(), | |
"archive_policy": ArchivePoliciesController(), | |
"archive_policy_rule": ArchivePolicyRulesController(), | |
"metric": MetricsController(), | |
"batch": BatchController(), | |
"resource": ResourcesByTypeController(), | |
"resource_type": ResourceTypesController(), | |
"aggregation": AggregationController(), | |
"capabilities": CapabilityController(), | |
"status": StatusController(), | |
} | |
for name, ctrl in self.sub_controllers.items(): | |
setattr(self, name, ctrl) | |
@pecan.expose('json') | |
def index(self): | |
return { | |
"version": "1.0", | |
"links": [ | |
{"rel": "self", | |
"href": pecan.request.application_url} | |
] + [ | |
{"rel": name, | |
"href": pecan.request.application_url + "/" + name} | |
for name in sorted(self.sub_controllers) | |
] | |
} | |
class VersionsController(object): | |
@staticmethod | |
@pecan.expose('json') | |
def index(): | |
return { | |
"versions": [ | |
{ | |
"status": "CURRENT", | |
"links": [ | |
{ | |
"rel": "self", | |
"href": pecan.request.application_url + "/v1/" | |
} | |
], | |
"id": "v1.0", | |
"updated": "2015-03-19" | |
} | |
] | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment